RabbitMQ – Work Queues

In this post we will experiment with a work queue using the RabbitMQ middleware software on a Windows 10 machine. After we cover the basics, we will integrate RabbitMQ with Docker and a group of microservices. It is extremely important to read and experiment until all the concepts are clearly understood.

With that said, let’s talk about work queues. In this example we will create a Work Queue that will be used to distribute time-consuming tasks among multiple workers. I have been experimenting with one, two and three worker processes.

The main idea behind Work Queues (aka: Task Queues) is to avoid doing a resource-intensive task immediately and having to wait for it to complete. Instead, we schedule the task to be done later: we encapsulate the task as a message and send it to a queue. A worker process running in the background pops the task off the queue and eventually executes the job. When we run many workers, the tasks are shared among them, as we will see in the example documented in this post.
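The producer/worker split described above can be illustrated without a broker. The following is a minimal in-process analogy in plain Java, assuming a LinkedBlockingQueue stands in for the RabbitMQ queue and threads stand in for worker processes. The class name InProcessWorkQueue and the STOP sentinel used to shut the workers down are hypothetical, and unlike RabbitMQ nothing here survives a crash.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// In-process analogy of a work queue (no RabbitMQ): the producer enqueues
// task messages and moves on; background worker threads take and run them.
class InProcessWorkQueue {

    // Hypothetical sentinel telling a worker thread to stop.
    private static final String STOP = "__STOP__";

    // Runs every task on 'workers' background threads; returns the tasks processed.
    static List<String> run(List<String> tasks, int workers) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> processed = Collections.synchronizedList(new ArrayList<>());

        List<Thread> threads = new ArrayList<>();
        for (int w = 0; w < workers; w++) {
            Thread t = new Thread(() -> {
                try {
                    while (true) {
                        String task = queue.take();     // blocks until a message arrives
                        if (STOP.equals(task)) return;  // no more work for this thread
                        processed.add(task);            // stand-in for the real work
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            t.start();
            threads.add(t);
        }

        // The producer only enqueues; it does not wait for any task to finish.
        for (String task : tasks) queue.put(task);
        // One stop message per worker, queued after all real tasks (FIFO order).
        for (int w = 0; w < workers; w++) queue.put(STOP);

        // Join only so this demo can report what was processed.
        for (Thread t : threads) t.join();
        return processed;
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> done = run(Arrays.asList("Hello.", "Hello..", "Hello..."), 2);
        System.out.println(done.size() + " tasks processed by 2 workers");
    }
}
```

Because the stop messages are queued after all real tasks, every task is taken by some worker before any thread exits.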

This concept is especially useful in web applications, where it’s impossible to handle a complex task during a short HTTP request window. One of the advantages of using a Task Queue is the ability to easily parallelize work: if we are building up a backlog of work, we can simply add more workers, which makes scaling horizontally quite easy. In a future post we will use this feature with microservices.
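As a back-of-the-envelope illustration of why adding workers helps, assume equal-length tasks, no overhead, and a queue that always hands the next task to an idle worker; a backlog then drains in ceil(tasks / workers) rounds. The helper below (BacklogEstimate.drainSeconds is a hypothetical name) is only that arithmetic, not something RabbitMQ provides.

```java
// Back-of-the-envelope model (an assumption, not a RabbitMQ feature): with
// equal-length tasks and no overhead, a backlog drains in
// ceil(tasks / workers) * secondsPerTask seconds.
class BacklogEstimate {

    static int drainSeconds(int tasks, int secondsPerTask, int workers) {
        if (workers <= 0) throw new IllegalArgumentException("need at least one worker");
        int rounds = (tasks + workers - 1) / workers;   // integer ceiling division
        return rounds * secondsPerTask;
    }

    public static void main(String[] args) {
        // four 10-second tasks, like the ones sent later in this post
        for (int workers = 1; workers <= 3; workers++) {
            System.out.println(workers + " worker(s): " + drainSeconds(4, 10, workers) + " s");
        }
    }
}
```

For four 10-second tasks this gives 40 s with one worker and 20 s with two or three; the third worker does not help here because four tasks still need two rounds.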

By default, RabbitMQ will send each message to the next consumer, in sequence.  On average every consumer will get the same number of messages.  This way of distributing messages is called round-robin.
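Round-robin dispatch can be simulated in a few lines: message i goes to consumer i % consumers. The RoundRobinDemo class below is a hypothetical simulation, not RabbitMQ code. Note that the Worker class in this post also calls basicQos(1), which tells RabbitMQ not to give a worker a new message until it has acknowledged the previous one, so in the real run the split also depends on which worker is free.

```java
import java.util.Arrays;

// Simulation (not RabbitMQ code) of round-robin dispatch: each message goes
// to the next consumer in sequence, wrapping around at the end.
class RoundRobinDemo {

    // Returns, for each consumer, how many of the messages it received.
    static int[] dispatch(int messages, int consumers) {
        int[] received = new int[consumers];
        for (int i = 0; i < messages; i++) {
            received[i % consumers]++;   // next consumer in sequence
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(dispatch(4, 2)));   // [2, 2]
        System.out.println(Arrays.toString(dispatch(5, 3)));   // [2, 2, 1]
    }
}
```

With four tasks and two workers, as in the experiment in this post, each worker would get two messages; when the count does not divide evenly, the counts differ by at most one.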

But we do not wish to lose tasks. If a worker dies, we would like the task it was working on to be delivered to another worker.

In order to make sure a message is never lost, RabbitMQ supports message acknowledgments.  An ack(nowledgement) is sent back by the consumer to tell RabbitMQ that a particular message has been received, processed and that RabbitMQ is free to delete it.
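To make the acknowledgment idea concrete, here is a toy in-memory model, not the RabbitMQ implementation: a delivered message stays "unacked" until the consumer acks it, and if the consumer dies first the message goes back on the queue. The class AckModel and its methods are hypothetical; putting requeued messages back at the head of the queue is a simplification of what the broker does. The main method replays the experiment described later in this post.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of manual acknowledgments (hypothetical, not RabbitMQ code).
class AckModel {
    private final Deque<String> ready = new ArrayDeque<>();              // waiting messages
    private final Map<String, List<String>> unacked = new HashMap<>();   // per consumer
    private final List<String> completed = new ArrayList<>();            // acked messages

    void publish(String msg) { ready.addLast(msg); }

    // Hands the next ready message to a consumer, or returns null if none.
    String deliver(String consumer) {
        String msg = ready.pollFirst();
        if (msg != null) {
            unacked.computeIfAbsent(consumer, c -> new ArrayList<>()).add(msg);
        }
        return msg;
    }

    // Consumer finished the work: the broker may now forget the message.
    void ack(String consumer, String msg) {
        unacked.get(consumer).remove(msg);
        completed.add(msg);
    }

    // Consumer died: requeue everything it had not acknowledged yet.
    void consumerDied(String consumer) {
        List<String> pending = unacked.remove(consumer);
        if (pending == null) return;
        for (int i = pending.size() - 1; i >= 0; i--) {
            ready.addFirst(pending.get(i));   // simplification: back to the head
        }
    }

    List<String> completed() { return completed; }
    int readyCount() { return ready.size(); }

    public static void main(String[] args) {
        AckModel broker = new AckModel();
        for (int i = 1; i <= 4; i++) broker.publish("task" + i);

        broker.deliver("worker1");                    // worker1 starts task1
        String current = broker.deliver("worker2");   // worker2 starts task2
        broker.consumerDied("worker1");               // killed before acking: task1 requeued
        broker.ack("worker2", current);

        // worker2 keeps taking work until the queue is empty
        String next;
        while ((next = broker.deliver("worker2")) != null) {
            broker.ack("worker2", next);
        }
        System.out.println(broker.completed());       // [task2, task1, task3, task4]
    }
}
```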

In this post, we will be sending strings that stand for complex tasks. We do not have a real-world task, like images to be resized or PDF files to be rendered, so we will fake it by pretending we are busy, using the Thread.sleep() method. We will take the number of dots in the string as its complexity; every dot will account for one second of “work”. For example, the message “Hello.....” will create a task that takes five seconds because the string contains five dots.
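The dot-counting convention can be checked in isolation. The sketch below (TaskComplexity.seconds is a hypothetical helper) counts dots the same way doWork() in the Worker class does, but returns the number of seconds instead of sleeping.

```java
// Each '.' in the message stands for one second of simulated work.
// Mirrors the loop in doWork() without actually sleeping.
class TaskComplexity {

    // Number of seconds of simulated work for a message.
    static int seconds(String message) {
        int dots = 0;
        for (char ch : message.toCharArray()) {
            if (ch == '.') dots++;
        }
        return dots;
    }

    public static void main(String[] args) {
        System.out.println(seconds("Hello....."));        // 5
        System.out.println(seconds("Hello.........."));   // 10, the task used in the run below
    }
}
```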

The code for this post was developed in Java using the Eclipse IDE, as a Maven project. We will build workqueues.jar and run it from two separate batch files, which follow:

C:\Temp>type worker.bat
java -jar workqueues.jar

And

C:\Temp>type newtask.bat
echo %1
java -jar workqueues.jar %1

The software takes advantage of RabbitMQ's ability to redeliver incomplete tasks to other worker processes. You can read more about this feature here.

In this setup we have three consoles. Two run worker.bat and the third runs newtask.bat. With the workers up and running, we send four tasks. Partway through processing the first task it received, we kill the first worker. Normally RabbitMQ would have delivered two of the jobs / messages to each worker process; instead, the remaining worker ends up processing all four, including the one that was interrupted. Pretty cool, isn't it?

The following screen capture shows the process used to send four messages to RabbitMQ. Half should be processed by each worker.

C:\Temp>newtask Hello..........     <==== 1

C:\Temp>echo Hello..........
Hello..........

C:\Temp>java -jar workqueues.jar Hello..........
main <<< args.length: 1
arg ==>Hello..........<==
main <<< arg ==>Hello..........<==
postWork <<< [x] Sent message ==> Hello..........<==
main <<< done

C:\Temp>newtask Hello..........     <==== 2

C:\Temp>echo Hello..........
Hello..........

C:\Temp>java -jar workqueues.jar Hello..........
main <<< args.length: 1
arg ==>Hello..........<==
main <<< arg ==>Hello..........<==
postWork <<< [x] Sent message ==> Hello..........<==
main <<< done

C:\Temp>newtask Hello..........     <==== 3

C:\Temp>echo Hello..........
Hello..........

C:\Temp>java -jar workqueues.jar Hello..........
main <<< args.length: 1
arg ==>Hello..........<==
main <<< arg ==>Hello..........<==
postWork <<< [x] Sent message ==> Hello..........<==
main <<< done

C:\Temp>newtask Hello..........     <==== 4

C:\Temp>echo Hello..........
Hello..........

C:\Temp>java -jar workqueues.jar Hello..........
main <<< args.length: 1
arg ==>Hello..........<==
main <<< arg ==>Hello..........<==
postWork <<< [x] Sent message ==> Hello..........<==
main <<< done

Following is the output for the first worker process:

C:\Temp>java -jar workqueues.jar
main <<< args.length: 0
processWork <<< [*] waiting for messages (to exit press CTRL+C)
main <<< done
processWork <<< [x] received ==> Hello..........<==
doWork <<< working 0 ...
doWork <<< working 1 ...
doWork <<< working 2 ...
doWork <<< working 3 ...
doWork <<< working 4 ...
doWork <<< working 5 ...
doWork <<< working 6 ...
doWork <<< working 7 ...
Terminate batch job (Y/N)? y    <==== killed

C:\Temp>

The output for the second worker follows:

processWork <<< [x] received ==> Hello..........<==
doWork <<< working 0 ...
doWork <<< working 1 ...
doWork <<< working 2 ...
doWork <<< working 3 ...
doWork <<< working 4 ...
doWork <<< working 5 ...
doWork <<< working 6 ...
doWork <<< working 7 ...
doWork <<< working 8 ...
doWork <<< working 9 ...
doWork <<< done !!!
processWork <<< [x] Done        <==== 1
processWork <<< [x] received ==> Hello..........<==
doWork <<< working 0 ...
doWork <<< working 1 ...
doWork <<< working 2 ...
doWork <<< working 3 ...
doWork <<< working 4 ...
doWork <<< working 5 ...
doWork <<< working 6 ...
doWork <<< working 7 ...
doWork <<< working 8 ...
doWork <<< working 9 ...
doWork <<< done !!!
processWork <<< [x] Done        <==== 2
processWork <<< [x] received ==> Hello..........<==
doWork <<< working 0 ...
doWork <<< working 1 ...
doWork <<< working 2 ...
doWork <<< working 3 ...
doWork <<< working 4 ...
doWork <<< working 5 ...
doWork <<< working 6 ...
doWork <<< working 7 ...
doWork <<< working 8 ...
doWork <<< working 9 ...
doWork <<< done !!!
processWork <<< [x] Done        <==== 3
processWork <<< [x] received ==> Hello..........<==
doWork <<< working 0 ...
doWork <<< working 1 ...
doWork <<< working 2 ...
doWork <<< working 3 ...
doWork <<< working 4 ...
doWork <<< working 5 ...
doWork <<< working 6 ...
doWork <<< working 7 ...
doWork <<< working 8 ...
doWork <<< working 9 ...
doWork <<< done !!!
processWork <<< [x] Done        <==== 4

As you can see, we killed the first worker process about eight seconds into its first (ten-second) message. What is interesting to note is that no messages were lost: all four were processed by the remaining worker.

The code for the pom.xml file follows:

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.canessa.workqueues</groupId>
  <artifactId>WorkQueues</artifactId>
  <version>0.0.1-SNAPSHOT</version>

  <name>WorkQueues</name>
  <description>A simple WorkQueues.</description>
  <!-- FIXME change it to the project's website -->
  <url>http://www.example.com</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <!-- Java 8 is required: Worker uses a lambda and amqp-client 5.x needs JDK 8 -->
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>

    <!-- **** project specific dependencies **** -->
    <dependency>
      <groupId>com.rabbitmq</groupId>
      <artifactId>amqp-client</artifactId>
      <version>5.5.1</version>
    </dependency>

    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-simple</artifactId>
      <version>1.7.25</version>
    </dependency>

  </dependencies>

  <build>
    <pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
      <plugins>
        <plugin>
          <artifactId>maven-clean-plugin</artifactId>
          <version>3.1.0</version>
        </plugin>
        <plugin>
          <artifactId>maven-site-plugin</artifactId>
          <version>3.7.1</version>
        </plugin>
        <plugin>
          <artifactId>maven-project-info-reports-plugin</artifactId>
          <version>3.0.0</version>
        </plugin>
        <!-- see http://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
        <plugin>
          <artifactId>maven-resources-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>3.8.0</version>
        </plugin>
        <plugin>
          <artifactId>maven-surefire-plugin</artifactId>
          <version>2.22.1</version>
        </plugin>
        <plugin>
          <artifactId>maven-jar-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-install-plugin</artifactId>
          <version>2.5.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-deploy-plugin</artifactId>
          <version>2.8.2</version>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>

  <reporting>
    <plugins>
      <plugin>
        <artifactId>maven-project-info-reports-plugin</artifactId>
      </plugin>
    </plugins>
  </reporting>
</project>

The Java code for the App class which implements main() follows:

package com.canessa.workqueues.WorkQueues;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/*
 * Entry point: with no arguments run as a worker; with one argument post it as a new task.
 */
public class App 
{
	
	/*
	 * main entry point
	 */
    public static void main( String[] args) throws IOException, TimeoutException {
    	
    	// **** display the calling arguments ****
    	System.out.println("main <<< args.length: " + args.length);
    	for (String arg : args)
    		System.out.println("arg ==>" + arg + "<==");
    	
    	// **** proceed based on the calling arguments ****
    	if (args.length == 0) {
    		Worker worker = new Worker();
    		
    		worker.processWork();
    	}
    	
    	// **** add new task to the queue ****
    	else if (args.length == 1) {
    		
    		// **** ****
    		NewTask newTask = new NewTask();
    		
    		// **** ****
    		String arg = args[0];
    		System.out.println("main <<< arg ==>" + arg + "<==");
    		
    		newTask.postWork(arg);
    	}
    	
    	// **** invalid set of arguments ****
    	else {
    		System.err.println("main <<< UNEXPECTED args.length: " + args.length);
    	}
    	
    	// **** ****
    	System.out.println("main <<< done");

    }
}

The code for the NewTask class follows:

package com.canessa.workqueues.WorkQueues;


import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;


/*
 * Publishes task messages to the durable task_queue.
 */
public class NewTask {
	
	// **** name for our queue ****
    private final static String TASK_QUEUE_NAME = "task_queue";
    
	/*
	 * constructor
	 */
	public NewTask() {
	}
	
	/*
	 * Send the specified task to the queue as a persistent message.
	 */
	public void postWork(String task) throws IOException, TimeoutException {
		
    	// **** connect to RabbitMQ ****
        ConnectionFactory factory = new ConnectionFactory();
        
        // **** default host to use for connections ****
        factory.setHost("localhost");
        
        // **** open a connection and a channel (both closed automatically by try-with-resources) ****
        try (Connection connection = factory.newConnection();
        	Channel channel = connection.createChannel()) {
        	
        	// **** declare a durable queue ****
        	boolean durable = true;
        	channel.queueDeclare(	TASK_QUEUE_NAME,		// name of queue
        							durable,				// durable
        							false,					// exclusive
        							false,					// auto delete
        							null);					// construction arguments for the queue
        	
    		// **** ****
    		String message = " " + task;
     		
        	// **** send persistent messages ****
			channel.basicPublish	("",					// exchange the exchange to publish the message to
									TASK_QUEUE_NAME,		// routingKey the routing key
									MessageProperties.PERSISTENT_TEXT_PLAIN,
									message.getBytes("UTF-8"));
			
			// **** ****
			System.out.println("postWork <<< [x] Sent message ==>" + message + "<==");
        }

	}

}

The code for the Worker class follows:

package com.canessa.workqueues.WorkQueues;


import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

/*
 * Consumes task messages from task_queue, one at a time, acknowledging each when done.
 */
public class Worker {
	
	// **** name for our queue ****
    private final static String TASK_QUEUE_NAME = "task_queue";
    
	/*
	 * constructor
	 */
	public Worker() {
	}
	
	/*
	 * Wait for tasks and process them; messages are acknowledged manually.
	 */
	public void processWork() throws IOException, TimeoutException {
		
    	// **** connect to RabbitMQ ****
        ConnectionFactory factory = new ConnectionFactory();
        
        // **** default host to use for connections ****
        factory.setHost("localhost");
        
        // **** open a connection to the broker ****
        final Connection connection = factory.newConnection();
        
        // **** create a channel (left open so the consumer keeps running) ****
        final Channel channel = connection.createChannel();
        
    	// **** declare a durable queue ****
        boolean durable = true;
    	channel.queueDeclare(	TASK_QUEUE_NAME,		// name of queue
    							durable,				// durable
    							false,					// exclusive
    							false,					// auto delete
    							null);					// construction arguments for the queue
    	
    	// **** ****
        System.out.println("processWork <<< [*] waiting for messages (to exit press CTRL+C)");

        // **** do not give more than one message to a worker at a time ****
        int prefetchCount = 1;
        channel.basicQos(prefetchCount);

        // **** callback invoked for each message delivered to this worker ****
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {

            // **** decode the message body ****
            String message = new String(delivery.getBody(), "UTF-8");

            // **** ****
            System.out.println("processWork <<< [x] received ==>" + message + "<==");

            // **** do the (fake) work ****
            try {
                doWork(message);
            } finally {

                // **** ****
                System.out.println("processWork <<< [x] Done");

                // **** acknowledge the message so RabbitMQ can delete it ****
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        };

        // **** manual acknowledgments: unacked messages are redelivered if this worker dies ****
        boolean autoAck = false;
        channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
	}
	
	/*
	 * fake task to simulate execution time
	 */
	private static void doWork(String task) {
		
		// **** loop working ****
		int i = 0;
	    for (char ch: task.toCharArray()) {
	        if (ch == '.') {
	        	try {
	        		System.out.println("doWork <<< working " + (i++) + " ...");
		        	Thread.sleep(1000);
	        	} catch (InterruptedException _ignored) {
	        		Thread.currentThread().interrupt();
	        	}
	        }
	    }
	 
	    // **** we are done ****
	    System.out.println("doWork <<< done !!!");
	}
}

If interested, the entire code for this project is available in my GitHub repository.

The code is based on examples from the RabbitMQ web site that I have been experimenting with.

If you have comments / questions regarding this or any other post in this blog, or if you need additional help with some software development project, please leave me a note below. The notes will not appear until they are approved. Requests are never displayed.

Keep on learning and having fun developing the best possible software.

John

Follow me on Twitter:  @john_canessa
