Threads – Part IV

In this post we will try to address an issue that was hidden in the previous post Threads – Part III in this sequence. Let’s first review the states of a thread in Java (and for that matter in most languages and operating systems).

In Java a thread may be in one of the following states:

State Description
NEW A thread that has not yet started is in this state.
RUNNABLE A thread executing in the Java virtual machine is in this state.
BLOCKED A thread that is blocked waiting for a monitor lock is in this state.
WAITING A thread that is waiting indefinitely for another thread to perform a particular action is in this state.
TIMED_WAITING A thread that is waiting for another thread to perform an action for up to a specified waiting time is in this state.
TERMIANTED A thread that has exited is in this state.

A feature of threads is their priority. The priority of a thread is used by the scheduler to make it run. The higher the priority the often the scheduler will select it to run. In the storage server most threads run at the next higher priority than normal.  That is done in order to facilitate the storage server to complete tasks as soon as possible when client applications / servers share the same computer with it.

Following is a screen capture from the console of the Eclipse IDE:

main <<< started ...
main <<<    min-norm-max: 1-5-10
main <<< currentPriority: 5
main <<< waiting for processThread to de done ...
ReceiveThread <<< started ...
ProcessThread <<< started ...
ProcessThread <<< prevReq: 203 curReq: 205 exit !!!
main <<< processThread done !!!
main <<< duration: 16 ms - bye bye !!!
ReceiveThread <<< done !!!

It seems that (this should be clear in the actual code) we are tracking the request number in the ProcessThread. At some point something happened and the request sequence was out of order.

The following is another screen capture from the console of the IDE:

main <<< started ...
main <<<    min-norm-max: 1-5-10
main <<< currentPriority: 5
main <<< waiting for processThread to de done ...
ReceiveThread <<< started ...
ProcessThread <<< started ...
ProcessThread <<< EXCEPTION requestQ.remove() prevReq: 495 exit !!!
main <<< processThread done !!!
main <<< duration: 0 ms - bye bye !!!
ReceiveThread <<< done !!!

In this case an exception was thrown when the ProcessThread attempted to remove a request from the queue.

Following is the last screen capture from the console of the IDE that I will show in this post:

main <<< started ...
main <<<    min-norm-max: 1-5-10
main <<< currentPriority: 5
main <<< waiting for processThread to de done ...
ReceiveThread <<< started ...
ProcessThread <<< started ...
ReceiveThread <<< done !!!
shutDown <<< need to shutdown
main <<< waiting for processor to shutdown d: 0 ...
main <<< waiting for processor to shutdown d: 1 ...
main <<< waiting for processor to shutdown d: 2 ...
main <<< waiting for processor to shutdown d: 3 ...
main <<< waiting for processor to shutdown d: 4 ...
main <<< waiting for processor to shutdown d: 5 ...
main <<< waiting for processor to shutdown d: 6 ...
main <<< waiting for processor to shutdown d: 7 ...
main <<< waiting for processor to shutdown d: 8 ...
main <<< waiting for processor to shutdown d: 9 ...
main <<< waiting for processor to shutdown d: 10 ...
main <<< duration: 12130 ms - bye bye !!!

In this case the main thread starts both producer and consumer threads. The receiver thread completes. The process thread is not finishing so the main program sends a shutdown request and waits for the thread to exit. The ProcessThread is in bad shape and is not able to exit. The main program exists after waiting for it to complete.

As you can see from the previous set of screen captures bad things tend to happen when more than one thread accesses a single resource simultaneously. In this case the issue is with the request queue.

Following is the test code written in Java 8:

package canessa.john.threads;

import java.util.LinkedList;

import canessa.john.threads.ReceiveProcess.ThreadType;

public class Solution {

	// **** constants ****
	
	static final int MAX_REQUESTS 	= 1000;		// was: 7, 97, 1000, 10000
	static final int MAX_DELAY		= 11;
	
	/**
	 * Test program implements main thread.
	 */
	public static void main(String[] args) {

		// **** tell user we started the main thread ****
		
		System.out.println("main <<< started ...");

		// **** thread priority info ****
		
		System.out.println("main <<<    min-norm-max: " + Thread.MIN_PRIORITY + "-" + Thread.NORM_PRIORITY + "-" + Thread.MAX_PRIORITY);
		System.out.println("main <<< currentPriority: " + Thread.currentThread().getPriority());
		
		// **** get the starting time ****
		
		long beginTime = System.currentTimeMillis();	

		// **** create a queue to insert and get client requests ****

		LinkedList<String> requestQ = new LinkedList<String>();

		// **** simulate receiving client requests ****
		
		ReceiveProcess receive 	= new ReceiveProcess(requestQ, ThreadType.RECEIVE_TYPE, MAX_REQUESTS);
		Thread receiveThread 	= new Thread(receive);
		receiveThread.start();

		// **** simulate processing client requests ****
		
		ReceiveProcess process 	= new ReceiveProcess(requestQ, ThreadType.PROCESS_TYPE, MAX_REQUESTS);
		Thread processThread 	= new Thread(process);
		processThread.start();
		
		// **** wait for the process thread to be done ****
		
		System.out.println("main <<< waiting for processThread to de done ...");
		try {
			processThread.join(MAX_REQUESTS);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		
		// **** check if the process thread is still alive ****
		
		if (!processThread.isAlive()) {
			System.out.println("main <<< processThread done !!!");			
		} else {
						
			// **** request the process thread to shutdown ****
			
			process.shutDown();	
			
			// **** wait for the process thread to shutdown ****
			
			for (int d = 0; (d < MAX_DELAY) && processThread.isAlive(); d++) {
				System.out.println("main <<< waiting for processor to shutdown d: " + d + " ...");
				try {
					Thread.sleep(1000);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		}
	
		// **** get the end time ****
		
		long endTime = System.currentTimeMillis();
		
		// **** display a final message ****
		
		System.out.println("main <<< duration: " + (endTime - beginTime) + " ms - bye bye !!!");
	}
}

This example does not alter the priorities of the threads. For completion I just displayed some priority information in the main thread.

Following is the code for the ReceiveProcess class:

package canessa.john.threads;

import java.util.LinkedList;
import java.util.Random;

/**
 * 
 */
public class ReceiveProcess implements Runnable {
	
	// **** constants ****
	
	final long THREAD_DELAY = 1;		// was: 1, 2, 3, 1000;

	// **** types of threads ****
	
	public enum ThreadType {
		UNKNOWN_TYPE,
		RECEIVE_TYPE,
		PROCESS_TYPE
	}
	
	// **** members ****
	
	LinkedList<String> requestQ = null;
	ThreadType threadType		= ThreadType.UNKNOWN_TYPE;
	Boolean	shutDown			= false;
	int maxRequests				= 0;
	Random rand 				= null;
	int	id						= 1;
	
	/**
	 * Constructor
	 */
	public ReceiveProcess(LinkedList<String> requestQ, ThreadType threadType) {
		this.requestQ 	= requestQ;
		this.threadType	= threadType;
		this.rand 		= new Random(THREAD_DELAY);
		this.shutDown 	= false;
	}
	
	/**
	 * Constructor
	 */
	public ReceiveProcess(LinkedList<String> requestQ, ThreadType threadType, int maxRequests) {
		this.requestQ 		= requestQ;
		this.threadType		= threadType;
		this.maxRequests	= maxRequests;
		this.rand 			= new Random(THREAD_DELAY);
		this.shutDown		= false;
	}

	/**
	 * Receives client requests and post them into a queue.
	 */
	public void ReceiveThread() {
		String request = "";

		System.out.println("ReceiveThread <<< started ...");

		// **** loop receiving requests ****
		
		for (int i = 0; (i < this.maxRequests) && !shutDown; i++) {

			// **** generate a new request ****

			request = "REQ_" + Integer.toString(this.id++);

			// **** enqueue the request ****
			
			requestQ.add(request);

			// **** delay for a few waiting for the next request to arrive ****
			
			long delay = Math.abs(rand.nextLong()) % THREAD_DELAY;
			try {
				Thread.sleep(delay);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		
		// **** done receiving requests ****
		
		System.out.println("ReceiveThread <<< done !!!");
	}

	/**
	 * Gets requests from the queue and process them.
	 */
	public void ProcessThread() {
		System.out.println("ProcessThread <<< started ...");
		
		int prevReq 	= 0;
		int curReq		= 0;
		String request	= "";
		
		// **** loop processing incoming requests ****
		
		for (int i = 0; (i < this.maxRequests) && !this.shutDown; ) {

			// **** check if we were told to shutdown ****
			
			if (this.shutDown) {
				System.out.println("ProcessThread <<< was told to shutdown; exit !!!");
				return;
			}
		
			// **** check if the request queue is empty ****
			
			if (requestQ.isEmpty()) {
				continue;
			}

			// **** get the next request from the queue ****
			
			try {
				request = requestQ.remove();				
			} catch (Exception e) {
				System.out.println("ProcessThread <<< EXCEPTION requestQ.remove() prevReq: " + prevReq + " exit !!!");
				return;
			}
		
			// **** extract the request number ****
			
			String str = request.substring("REQ_".length());
			curReq = Integer.parseInt(str);
			
			// **** determine if request out of order ****
			
			if ((prevReq + 1) != curReq) {
				System.out.println("ProcessThread <<< prevReq: " + prevReq + " curReq: " + curReq + " exit !!!");
				return;
			}
			
			// **** ****
			
			prevReq = curReq;
			
			// **** increment the number of requests processed ****
		
			i++;
		}
		
		// **** done processing requests ****
		
		System.out.println("ProcessThread <<< prevReq: " + prevReq + " curReq: " + curReq + " done !!!");
	}

	/**
	 * 
	 */
	@Override
	public void run() {
		if (this.threadType == ThreadType.RECEIVE_TYPE) {
			ReceiveThread();
		} else if (this.threadType == ThreadType.PROCESS_TYPE) {
			ProcessThread();
		} else {
			throw new IllegalArgumentException();
		}
	}
	
	/**
	 * Request the thread to shutdown.
	 */
	public void shutDown() {
		this.shutDown = true;
		System.out.println("shutDown <<< need to shutdown");
	}
}

You will note that some modifications have been done in order to increase the chances of the code to fail. This was done by changing the delays and the order in which the request thread generates and posts requests.

What we need to take away from this example is that in many situations a flawed implementation may not exhibit problems easily. It may occasional fail when under stress. These issues are hard to find and costly to resolve. The best approach is to have qualified and experienced developers architecting, designing, testing and implementing the software.

On the next post we will address the issue. Please note that the main program is used to serve as a test. In practice one would use a framework or class to keep tests away from the main program. For that approach one may use the Dependency-Injection Principle (DIP) and JUnit.

If you have comments or questions regarding this or any other post in this blog, please feel free and send me a message. I will reply and not use your name unless you explicitly allow me to do so.

John

john.canessa@gmail.com

Follow me on Twitter:  @john_canessa

Leave a Reply

Your email address will not be published. Required fields are marked *