Threads – Part III

This post continues the series on threads. To take a look at the previous post please click on the following link: Threads – Part II. Just to recap, on the previous post we implemented an approach that did not work with Java. In this post we will address the issue with a design that seems to work well. It still has at least one issue which we will deal with on the next post.

Let’s first take a look at the source code for the test program written in Java 8:

package canessa.john.threads;

import java.util.LinkedList;

import canessa.john.threads.ReceiveProcess.ThreadType;

public class Solution {

	// **** constants ****
	
	static final int MAX_REQUESTS = 7;
	
	/**
	 * Test program implements main thread.
	 */
	public static void main(String[] args) {

		// **** tell user we started the main thread ****
		
		System.out.println("main <<< started ...");
		
		// **** ****
		
		long beginTime = System.currentTimeMillis();	

		// **** create a queue to insert and get client requests ****

		LinkedList<String> requestQ = new LinkedList<String>();

//		// **** simulate receiving client requests ****
//
//		Receiver receiverThread = new Receiver(requestQ, MAX_REQUESTS);
//		receiverThread.start();
//
//		// **** simulate processing client requests ****
//
//		Processor processor 	= new Processor(requestQ, MAX_REQUESTS);
//		Thread processThread 	= new Thread(processor);
//		processThread.start();

		// **** simulate receiving client requests ****
		
		ReceiveProcess receive 	= new ReceiveProcess(requestQ, ThreadType.RECEIVE_TYPE, MAX_REQUESTS);
		Thread receiveThread 	= new Thread(receive);
		receiveThread.start();

		// **** simulate processing client requests ****
		
		ReceiveProcess process 	= new ReceiveProcess(requestQ, ThreadType.PROCESS_TYPE, MAX_REQUESTS);
		Thread processThread 	= new Thread(process);
		processThread.start();

		// **** wait for the process thread to be done ****
		
		System.out.println("main <<< waiting for processThread to de done ...");
		try {
			processThread.join(1500 * MAX_REQUESTS);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		
		// **** check if the process thread is still alive ****
		
		if (!processThread.isAlive()) {
			System.out.println("main <<< processThread done !!!");			
		} else {
						
			// **** request the process thread to shutdown ****
			
			process.shutDown();	
			
			// **** wait for the process thread to shutdown ****
			
			while (processThread.isAlive()) {
				System.out.println("main <<< waiting for processor to shutdown ...");
				try {
					Thread.sleep(1000);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		}
	
		// **** done with the main thread ****
		
		long endTime = System.currentTimeMillis();
		System.out.println("main <<< duration: " + (endTime - beginTime) + " ms - bye bye !!!");
	}
}

The previous (and flawed code) has been commented out. In this scenario we are starting two threads that belong to the ReceiveProcess class. The rest of the code seems to be quite similar to what we had before. After the main process has started both threads it waits for the processThread to be done. If it is not done it requests it to shut down. It waits for the request to complete and the main process exists. In this post we will not encounter the situation in which the main thread needs to kill the processThread. Feel free to modify the counts and delays to make that happen.

The code for the ReceiveProcess class follows:

package canessa.john.threads;

import java.util.LinkedList;
import java.util.Random;

/**
 * 
 */
public class ReceiveProcess implements Runnable {
	
	// **** constants ****
	
	final long THREAD_DELAY = 1000;

	// **** types of threads ****
	
	public enum ThreadType {
		UNKNOWN_TYPE,
		RECEIVE_TYPE,
		PROCESS_TYPE
	}
	
	// **** members ****
	
	LinkedList<String> requestQ = null;
	ThreadType threadType		= ThreadType.UNKNOWN_TYPE;
	Boolean	shutDown			= false;
	int maxRequests				= 0;
	Random rand 				= null;
	int	id						= 1;
	
	/**
	 * Constructor
	 */
	public ReceiveProcess(LinkedList<String> requestQ, ThreadType threadType) {
		this.requestQ 	= requestQ;
		this.threadType	= threadType;
		this.rand 		= new Random(THREAD_DELAY);

	}
	
	/**
	 * Constructor
	 */
	public ReceiveProcess(LinkedList<String> requestQ, ThreadType threadType, int maxRequests) {
		this.requestQ 		= requestQ;
		this.threadType		= threadType;
		this.maxRequests	= maxRequests;
		this.rand 			= new Random(THREAD_DELAY);
	}

	/**
	 * Receives client requests and post them into a queue.
	 */
	public void ReceiveThread() {
		String request = "";

		System.out.println("ReceiveThread <<< started");

		// **** loop receiving requests ****
		
		for (int i = 0; (i < this.maxRequests) && !shutDown; i++) {

			// **** delay for a few waiting for the next request to arrive ****
			
			long delay = Math.abs(rand.nextLong()) % THREAD_DELAY;
			try {
				Thread.sleep(delay);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}

			// **** generate a new request ****

			request = "REQ_" + Integer.toString(this.id++);
			System.out.println("ReceiveThread <<< request ==>" + request + "<==");

			// **** enqueue the request ****
			
			requestQ.add(request);
			System.out.println("ReceiveThread <<< requestQ.size: " + requestQ.size());
		}
		
		// **** done processing requests ****
		
		System.out.println("ReceiveThread <<< done !!!");
	}

	/**
	 * Gets requests from the queue and process them.
	 */
	public void ProcessThread() {
		System.out.println("ProcessThread <<< started");
		
		// **** loop processing incoming requests ****
		
		for (int i = 0; (i < this.maxRequests) && !shutDown; ) {
			
			// **** wait for a client request to arrive ****

			while (requestQ.isEmpty() && !shutDown) {
				try {
					Thread.sleep(100);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
			
			// **** check if we were told to shutdown ****
			
			if (shutDown) {
				System.out.println("ProcessThread <<< was told to shutdown; bye bye !!!");
				return;
			}

			// **** get the next request from the queue ****
			
			String request = requestQ.remove();
			System.out.println("ProcessThread <<< request ==>" + request + "<==");
					
			// **** delay processing this request ****
			
			long delay = Math.abs(rand.nextLong()) % (THREAD_DELAY * 3);
			try {
				Thread.sleep(delay);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			
			// **** display we are done processing this request ****
			
			System.out.println("ProcessThread <<< done processing request ==>" + request + "<==");
		
			// **** increment the number of requests processed ****
		
			i++;
		}
		
		// **** done processing requests ****
		
		System.out.println("ProcessThread <<< done !!!");
	}

	/**
	 * 
	 */
	@Override
	public void run() {
		if (this.threadType == ThreadType.RECEIVE_TYPE) {
			ReceiveThread();
		} else if (this.threadType == ThreadType.PROCESS_TYPE) {
			ProcessThread();
		} else {
			throw new IllegalArgumentException();
		}
	}
	
	/**
	 * Request the thread to shutdown.
	 */
	public void shutDown() {
		this.shutDown = true;
		System.out.println("shutDown <<< need to shutdown");
	}
}

The ReceiveProcess class implements the Runnable interface. The constructors are used to instantiate the threads while specifying the type. One is a receiver and the other is the processor. The actual thread for each type of operation is implemented with the ReceiveThread() and ProcessThread() methods.

Following is a screen capture of the console of the Eclipse IDE showing a run of the test:

main <<< started ...
main <<< waiting for processThread to de done ...
ReceiveThread <<< started
ProcessThread <<< started
ReceiveThread <<< request ==>REQ_1<==
ReceiveThread <<< requestQ.size: 1
ProcessThread <<< request ==>REQ_1<==
ReceiveThread <<< request ==>REQ_2<==
ReceiveThread <<< requestQ.size: 1
ProcessThread <<< done processing request ==>REQ_1<==
ProcessThread <<< request ==>REQ_2<==
ReceiveThread <<< request ==>REQ_3<==
ReceiveThread <<< requestQ.size: 1
ProcessThread <<< done processing request ==>REQ_2<==
ProcessThread <<< request ==>REQ_3<==
ProcessThread <<< done processing request ==>REQ_3<==
ReceiveThread <<< request ==>REQ_4<==
ReceiveThread <<< requestQ.size: 1
ProcessThread <<< request ==>REQ_4<==
ReceiveThread <<< request ==>REQ_5<==
ReceiveThread <<< requestQ.size: 1
ProcessThread <<< done processing request ==>REQ_4<==
ProcessThread <<< request ==>REQ_5<==
ReceiveThread <<< request ==>REQ_6<==
ReceiveThread <<< requestQ.size: 1
ReceiveThread <<< request ==>REQ_7<==
ReceiveThread <<< requestQ.size: 2
ReceiveThread <<< done !!!
ProcessThread <<< done processing request ==>REQ_5<==
ProcessThread <<< request ==>REQ_6<==
ProcessThread <<< done processing request ==>REQ_6<==
ProcessThread <<< request ==>REQ_7<==
ProcessThread <<< done processing request ==>REQ_7<==
ProcessThread <<< done !!!
main <<< processThread done !!!
main <<< duration: 9950 ms - bye bye !!!

The main thread starts the ReceiveThread, the processThread and then patiently waits for the processThread to de done. After the receiveThread and processThread start the receive thread starts generating requests and inserting them into the request queue. The process thread periodically polls the request queue, displays the number of requests, removes the oldest one and processes it. Given that in our example processing takes three times longer than receiving, the number of elements in the queue increase with time.

The process thread finishes processing the last one and exits. At that time the main thread displays a message and exits. All seems well, but it is not. We will perform a different test in the same code and then will see if we can address it.

If you have comments or questions regarding this or any other post in this blog, please do not hesitate and send me a message. Will reply as soon as possible and will not use your name unless you explicitly allow me to do so.

Regards;

John

john.canessa@gmail.com

Follow me on Twitter:  @john_canessa

Leave a Reply

Your email address will not be published. Required fields are marked *

This site uses Akismet to reduce spam. Learn how your comment data is processed.