Threads – Part II

In the previous post in this series, Threads – Part I, we ended up with a fully sequential program. In this post we will modify it to add two threads and a queue. The main program will create a queue, a receiver thread and a processor thread, and will then start both threads. The receiver thread will generate requests and the processor thread will fulfill them. It is a simple approach, but it contains some issues. Those issues will be addressed in a following post.

There are at least two ways to create a thread in Java. We will cover both in this post. I strongly recommend that, when possible, the entire development team use a single approach to create threads. The resulting code is easier to understand and maintain.

The first approach to create a thread is to extend the Java Thread class. The second approach is to implement the Runnable interface. Following is the Java 8 test program that starts the receiver and processor threads in our modified program:

package canessa.john.threads;

import java.util.LinkedList;

import canessa.john.threads.ReceiveProcess.ThreadType;

public class Solution {

	// **** constants ****
	
	static final int MAX_REQUESTS = 7;
	
	/**
	 * Test program implements main thread.
	 */
	public static void main(String[] args) {

		// **** tell user we started the main thread ****
		
		System.out.println("main <<< started ...");
		
		// **** record the time the main thread started ****
		
		long beginTime = System.currentTimeMillis();	

		// **** create a queue to insert and get client requests ****

		LinkedList<String> requestQ = new LinkedList<String>();

		// **** simulate receiving client requests ****

		Receiver receiverThread = new Receiver(requestQ, MAX_REQUESTS);
		receiverThread.start();

		// **** simulate processing client requests ****

		Processor processor 	= new Processor(requestQ, MAX_REQUESTS);
		Thread processThread 	= new Thread(processor);
		processThread.start();

		// **** wait for the process thread to be done ****
		
		System.out.println("main <<< waiting for processThread to de done ...");
		try {
			processThread.join(1000 * MAX_REQUESTS);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		
		// **** check if the process thread is still alive ****
		
		if (!processThread.isAlive()) {
			System.out.println("main <<< processThread done !!!");			
		} else {
			
			// **** tell the process thread to shutdown ****
			
			processor.shutDown();
			
			// **** wait for the process thread to shutdown ****
			
			while (processThread.isAlive()) {
				System.out.println("main <<< waiting for processor to shutdown ...");
				try {
					Thread.sleep(1000);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		}
	
		// **** tell user we are done with the main thread ****
		
		long endTime = System.currentTimeMillis();
		System.out.println("main <<< duration: " + (endTime - beginTime) + " ms - bye bye !!!");
	}
}
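
Stripped of the queue and the shutdown logic, the two ways of creating a thread used in the test program reduce to the following minimal sketch. The class and method names (TwoWaysToStartThreads, ExtendsThread, ImplementsRunnable, startBoth) are made up for illustration and are not part of the code in this series; the counter is only there so the caller can confirm that both threads ran.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class TwoWaysToStartThreads {

    // Counts how many run() methods executed (illustration only).
    static final AtomicInteger ran = new AtomicInteger(0);

    // Approach 1: extend Thread and override run().
    static class ExtendsThread extends Thread {
        @Override
        public void run() {
            ran.incrementAndGet();
        }
    }

    // Approach 2: implement Runnable and hand the object to a Thread.
    static class ImplementsRunnable implements Runnable {
        @Override
        public void run() {
            ran.incrementAndGet();
        }
    }

    // Starts one thread of each kind, waits for both and
    // returns the number of run() methods that executed.
    static int startBoth() {
        ran.set(0);
        Thread first = new ExtendsThread();                    // the object is the thread
        Thread second = new Thread(new ImplementsRunnable());  // the object is wrapped by a thread
        first.start();
        second.start();
        try {
            first.join();
            second.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return ran.get();
    }

    public static void main(String[] args) {
        System.out.println("threads that ran: " + startBoth());
    }
}
```

In both cases the work goes into run() and the thread begins executing only when start() is called. Calling run() directly would execute the code in the calling thread.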

We first create a request queue. The receiver thread is meant to populate the queue faster than the process thread can service the requests (MAX_DELAY is 500 ms in the Receiver class and 1500 ms in the Processor class). The main thread then creates and starts the receiver thread and the process thread. The main program waits up to 1000 * MAX_REQUESTS milliseconds for the process thread to complete before calling it quits; that way all pending requests should be addressed. In practice we should first ask the receiver thread to stop receiving requests and then allow the process thread to complete servicing all pending requests before shutting down. That is how the storage server is designed and implemented, but for the sake of brevity we will skip some of those operations.
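
The shutdown order described above (first stop receiving, then let the processor drain what is pending) could be sketched as follows. This is not the storage server code and not the code in this post: the class OrderlyShutdown, the helper pause() and the two flags are hypothetical, and the sketch assumes a ConcurrentLinkedQueue so that both threads can touch the queue safely.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class OrderlyShutdown {

    // Hypothetical helper: sleep and restore the interrupt flag if interrupted.
    static void pause(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Runs a receiver and a processor for a short while, shuts them down
    // in order and returns { requests produced, requests processed }.
    static int[] runAndShutDown() {
        final ConcurrentLinkedQueue<String> requestQ = new ConcurrentLinkedQueue<>();
        final AtomicBoolean stopReceiving = new AtomicBoolean(false);
        final AtomicBoolean receiverDone = new AtomicBoolean(false);
        final AtomicInteger produced = new AtomicInteger(0);
        final AtomicInteger processed = new AtomicInteger(0);

        Thread receiver = new Thread(() -> {
            while (!stopReceiving.get()) {
                requestQ.add("REQ_" + produced.incrementAndGet());
                pause(1);
            }
        });

        Thread processor = new Thread(() -> {
            while (true) {
                // Read the flag BEFORE polling so a late request is never missed.
                boolean noMoreRequests = receiverDone.get();
                String request = requestQ.poll();
                if (request != null) {
                    processed.incrementAndGet();
                } else if (noMoreRequests) {
                    break;      // queue is empty and nothing else will arrive
                } else {
                    pause(1);   // queue is empty for now; wait for more
                }
            }
        });

        receiver.start();
        processor.start();
        pause(50);

        try {
            stopReceiving.set(true);    // step 1: stop accepting new requests
            receiver.join();
            receiverDone.set(true);     // step 2: let the processor drain and exit
            processor.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] { produced.get(), processed.get() };
    }

    public static void main(String[] args) {
        int[] counts = runAndShutDown();
        System.out.println("produced: " + counts[0] + " processed: " + counts[1]);
    }
}
```

The point of the ordering is that the processor only exits once it knows no more requests can arrive and the queue is empty, so the two counts match.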

The updated code for the Receiver class follows:

package canessa.john.threads;

import java.util.LinkedList;
import java.util.Random;

public class Receiver extends Thread {

	// **** constants ****
	
	final long MAX_DELAY = (500 * 1);
	
	// **** members ****
	
	Random rand 				= null;
	int	id						= 1;
	LinkedList<String> requestQ = null;
	int	maxRequests				= 0;
	Boolean shutDown			= false;
	
	/**
	 * Constructor
	 */
	public Receiver() {
		this.rand = new Random(MAX_DELAY);
	}
		
	/**
	 * Constructor
	 */
	public Receiver(LinkedList<String> requestQ, int maxRequests) {
		this.rand 			= new Random(MAX_DELAY);
		this.requestQ 		= new LinkedList<String>();
		this.maxRequests 	= maxRequests;
	}

	/**
	 * Delay, create request and insert it into the queue.
	 */
	public void run() {
		String request = "";
		
		// **** loop until done ****
		
		for (int i = 0; i < this.maxRequests; i++) {
			
			// **** delay for a few waiting for the next request to arrive ****
			
			long delay = Math.abs(rand.nextLong()) % MAX_DELAY;
			try {
				Thread.sleep(delay);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}

			// **** generate a new request ****

			request = "REQ_" + Integer.toString(this.id++);
			System.out.println("receive <<< request ==>" + request + "<==");

			// **** enqueue the request ****
			
			requestQ.add(request);
			System.out.println("receive <<< requestQ.size: " + requestQ.size());
		}
		
		// **** tell user the receiver thread is done ****
		
		System.out.println("receive <<< done !!!");
	}

	/**
	 * Wait for next request from client (sequential method).
	 */
	public String nextRequest() {
		String request = "";
		
		// **** delay until next request arrives ****
		
		long delay = Math.abs(rand.nextLong()) % MAX_DELAY;
		try {
			Thread.sleep(delay);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		
		// **** generate a client request ****

		request = "REQ_" + Integer.toString(this.id++);
		System.out.println("nextRequest <<< request ==>" + request + "<==");

		// **** return new client request ****
		
		return request;
	}
}

The updated code for the Processor class follows:

package canessa.john.threads;

import java.util.LinkedList;
import java.util.Random;

public class Processor implements Runnable {

	// **** constants ****
	
	final long MAX_DELAY = (500 * 3);
	
	// **** members ****
	
	Random rand 				= null;
	LinkedList<String> requestQ	= null;
	int maxRequests				= 0;
	Boolean shutDown			= false;
	
	/**
	 * Constructor
	 */
	public Processor() {
		this.rand = new Random(MAX_DELAY);
	}
	
	/**
	 * Constructor
	 */
	public Processor(LinkedList<String> requestQ, int maxRequests) {
		this.rand 			= new Random(MAX_DELAY);
		this.requestQ 		= requestQ;
		this.maxRequests	= maxRequests;
	}

	/**
	 * Wait for client requests and process them until done or told to shut down.
	 */
	@Override
	public void run() {

		// **** loop processing incoming requests ****
		
		for (int i = 0; (i < this.maxRequests) && !shutDown; ) {
			
			// **** wait for a client request to arrive ****

			while (requestQ.isEmpty() && !shutDown) {
				System.out.println("process <<< requestQ.isEmpty ...");
				try {
					Thread.sleep(200);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
			
			// **** check if we were told to shutdown ****
			
			if (shutDown) {
				System.out.println("process <<< was told to shutdown; bye bye !!!");
				return;
			}

			// **** get the next request from the queue ****
			
			String request = requestQ.remove();
			System.out.println("process <<< request ==>" + request + "<==");
					
			// **** delay processing this request ****
			
			long delay = Math.abs(rand.nextLong()) % MAX_DELAY;
			try {
				Thread.sleep(delay);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			
			// **** display we are done processing this request ****
			
			System.out.println("process <<< done processing request ==>" + request + "<==");
		
			// **** increment the number of requests processed ****
		
			i++;
		}
		
		// **** we are done processing requests ****
		
		System.out.println("process <<< done !!!");
	}

	/**
	 * Process request
	 */
	public void process(String request) {
		
		// **** delay processing request ****
		
		long delay = Math.abs(rand.nextLong()) % MAX_DELAY;
		try {
			Thread.sleep(delay);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		
		// **** display we are done processing this request ****
		
		System.out.println("process <<< done processing request ==>" + request + "<==");
	}
	
	/**
	 * Tell the thread to shutdown.
	 */
	public void shutDown() {
		this.shutDown = true;
		System.out.println("shutDown <<< need to shutdown");
	}
}

We will cover in detail the use of the shutDown Boolean variable and the shutDown() method in the Processor class after we have had an opportunity to check out the output from the console.

Following is a screen capture from the console of the Eclipse IDE executing the test code:

main <<< started ...
main <<< waiting for processThread to de done ...
process <<< requestQ.isEmpty ...
process <<< requestQ.isEmpty ...
receive <<< request ==>REQ_1<==
receive <<< requestQ.size: 1
process <<< requestQ.isEmpty ...
receive <<< request ==>REQ_2<==
receive <<< requestQ.size: 2
process <<< requestQ.isEmpty ...
receive <<< request ==>REQ_3<==
receive <<< requestQ.size: 3
process <<< requestQ.isEmpty ...
process <<< requestQ.isEmpty ...
receive <<< request ==>REQ_4<==
receive <<< requestQ.size: 4
receive <<< request ==>REQ_5<==
receive <<< requestQ.size: 5
process <<< requestQ.isEmpty ...
receive <<< request ==>REQ_6<==
receive <<< requestQ.size: 6
receive <<< request ==>REQ_7<==
receive <<< requestQ.size: 7
receive <<< done !!!
process <<< requestQ.isEmpty ...
process <<< requestQ.isEmpty ...
process <<< requestQ.isEmpty ...
process <<< requestQ.isEmpty ...
process <<< requestQ.isEmpty ...
process <<< requestQ.isEmpty ...
process <<< requestQ.isEmpty ...
process <<< requestQ.isEmpty ...
process <<< requestQ.isEmpty ...
process <<< requestQ.isEmpty ...
process <<< requestQ.isEmpty ...
process <<< requestQ.isEmpty ...
process <<< requestQ.isEmpty ...
process <<< requestQ.isEmpty ...
process <<< requestQ.isEmpty ...
process <<< requestQ.isEmpty ...
process <<< requestQ.isEmpty ...
process <<< requestQ.isEmpty ...
process <<< requestQ.isEmpty ...
process <<< requestQ.isEmpty ...
process <<< requestQ.isEmpty ...
process <<< requestQ.isEmpty ...
process <<< requestQ.isEmpty ...
process <<< requestQ.isEmpty ...
process <<< requestQ.isEmpty ...
process <<< requestQ.isEmpty ...
process <<< requestQ.isEmpty ...
process <<< requestQ.isEmpty ...
shutDown <<< need to shutdown
main <<< waiting for processor to shutdown ...
process <<< was told to shutdown; bye bye !!!
main <<< duration: 8004 ms - bye bye !!!

Something is definitely wrong with our program. The receive and process threads are up and running. We can tell that the receive thread is operating because it is creating requests and its output indicates that they are being placed into the request queue. On the other hand, the process thread keeps telling us that there are no requests pending in the request queue.

Our program is flawed, but the Java language is not the culprit. Java passes object references by value, so the main program hands the same request queue object to both constructors; no copy of the queue is made. The problem is in the second Receiver constructor: it ignores its requestQ argument and assigns a new LinkedList to this.requestQ. The receiver thread therefore inserts requests into its own private queue, while the process thread holds the queue created by the main program, which stays empty. This is the reason the process thread does not see client requests and the main thread had to tell it to shut down. Note that even when both threads share a single queue, LinkedList is not synchronized, so access from two threads needs additional protection.
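
To check what actually happens to a queue object handed to a constructor, here is a minimal single-threaded sketch. The class names (SharedQueueDemo, KeepsReference, AllocatesNew) are hypothetical; the two nested classes only mirror the two constructor styles that appear in the Processor and Receiver listings, one storing the argument and the other allocating a new LinkedList.

```java
import java.util.LinkedList;

public class SharedQueueDemo {

    // Stores the reference it was given (the Processor constructor does this).
    static class KeepsReference {
        final LinkedList<String> queue;

        KeepsReference(LinkedList<String> queue) {
            this.queue = queue;
        }
    }

    // Ignores the argument and allocates its own list (the Receiver constructor does this).
    static class AllocatesNew {
        final LinkedList<String> queue;

        AllocatesNew(LinkedList<String> queue) {
            this.queue = new LinkedList<String>();
        }
    }

    // Adds one request through the helper object and returns the size
    // of the queue as seen by the caller that created it.
    static int sizeSeenByCaller(boolean keepReference) {
        LinkedList<String> requestQ = new LinkedList<String>();
        LinkedList<String> held = keepReference
                ? new KeepsReference(requestQ).queue
                : new AllocatesNew(requestQ).queue;
        held.add("REQ_1");
        return requestQ.size();
    }

    public static void main(String[] args) {
        System.out.println("reference kept: " + sizeSeenByCaller(true));   // prints 1
        System.out.println("new list made:  " + sizeSeenByCaller(false));  // prints 0
    }
}
```

When the reference is kept, the caller sees the added request because both variables point at the same object. When a new list is allocated, the caller's queue stays empty, which matches the console output above.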

If you look at the Java documentation, the Thread class contains a stop() method that has been deprecated. The main reason is that stop() pulls the rug out from under a thread. The thread might be in a state in which resources are lost, or stopping it might create other issues with critical / monitor regions. The way we addressed stopping the process thread was to set a shutdown flag and have the thread monitor it. I use this approach on all threads, in different operating systems and programming languages, to make sure the thread being stopped performs a clean exit.
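
The flag approach can be condensed into the following sketch. The names (CooperativeShutdown, Worker, stopWorker) are hypothetical. One deliberate difference from the Processor listing is that the flag is declared volatile; that is my assumption about how to make the write from the main thread reliably visible to the worker, not something the code in this post does.

```java
public class CooperativeShutdown {

    static class Worker implements Runnable {

        // volatile so that a write from another thread is visible to the loop
        private volatile boolean shutDown = false;
        private volatile boolean exitedCleanly = false;

        @Override
        public void run() {
            while (!shutDown) {
                try {
                    Thread.sleep(10);   // stands in for one unit of work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            // the thread reaches this point on its own, so it can
            // release resources here before returning
            exitedCleanly = true;
        }

        // Ask the thread to stop; it exits at the next check of the flag.
        void shutDown() {
            this.shutDown = true;
        }

        boolean exitedCleanly() {
            return exitedCleanly;
        }
    }

    // Starts a worker, waits a bounded time, then asks it to stop.
    static boolean stopWorker() {
        Worker worker = new Worker();
        Thread thread = new Thread(worker);
        thread.start();
        try {
            thread.join(50);        // bounded wait; the worker is still looping
            worker.shutDown();      // request a clean exit
            thread.join();          // wait for the worker to finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !thread.isAlive() && worker.exitedCleanly();
    }

    public static void main(String[] args) {
        System.out.println("clean exit: " + stopWorker());
    }
}
```

The worker is never terminated from the outside. It notices the request at a point of its own choosing and runs its cleanup code before returning from run().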

Getting back to the screen capture, the wait for the process thread to exit times out after seven seconds (1000 * MAX_REQUESTS milliseconds), so the program sets the shutdown flag in the process thread. The main thread then sleeps for one second while the process thread determines that it was asked to shut down and exits. That is why the main thread completes the program in about eight seconds.

In the next post in this series I will suggest an approach that will address the request queue issue.

If you have comments or questions regarding this or any other post in this blog, please do not hesitate to send me a message. I will reply as soon as possible and will not use your name unless you explicitly allow me to do so.

Regards;

John

john.canessa@gmail.com

Follow me on Twitter:  @john_canessa
