Producer and Consumer – Part III

This is the third and last part of this sequence. We will replace the mutex (implemented as a Semaphore) with the synchronized modifier on each method of the Ring Buffer class.

First let’s take a look at the output from the Eclipse IDE console:

main <<< start sequential …
main <<< done sequential
main <<< producer started …
Producer <<< started
main <<< consumer started …
main <<< waiting for threads to be done …
Consumer <<< started
Producer <<< done
Consumer <<< done
main <<< threads are done

I no longer display the numbers because the count has been increased to 100,000 values. The larger count gives many more opportunities for a lack of synchronization to cause a problem.

The Java code for the Producer follows:

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;

public class Producer extends Thread {

    // **** members ****
    private CountDownLatch latch;
    private Semaphore      mutex;    // no longer used; kept to preserve the structure
    private RingBuffer     rb;

    // **** constructor ****
    public Producer(RingBuffer rb, CountDownLatch latch, Semaphore mutex) {
        this.latch = latch;
        this.mutex = mutex;
        this.rb    = rb;
    }

    @Override
    public void run() {

        // **** for ease of use ****
        final int MAX_VAL = 100000;

        // **** ****
        System.out.println("Producer <<< started");

        // **** loop putting integers in ascending order ****
        for (int val = 1; val <= MAX_VAL; val++) {

            // **** spin until the value is in the buffer ****
            for (boolean done = false; !done; ) {

//              // **** get access to the mutex ****
//              try {
//                  mutex.acquire();
//              } catch (InterruptedException e) {
//                  e.printStackTrace();
//              }

                // **** check if the buffer is not full ****
                boolean full = rb.isFull();
                if (full) {

//                  // **** release the mutex ****
//                  mutex.release();

                    // **** ****
                    continue;
                } else {

                    // **** put the value in buffer ****
                    try {
                        rb.put(val);
                    } catch (Exception e1) {
                        e1.printStackTrace();
                    }

                    // **** flag we are done with the loop ****
                    done = true;
                }
            }

//          // **** release the mutex ****
//          mutex.release();
        }

        // **** ****
        latch.countDown();
        System.out.println("Producer <<< done");
    }
}

As you can tell, the use of the mutex has been commented out. The basic structure was kept.

The code for the Consumer thread follows:

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;

public class Consumer extends Thread {

    // **** members ****
    private CountDownLatch latch;
    private Semaphore      mutex;    // no longer used; kept to preserve the structure
    private RingBuffer     rb;

    // **** constructor ****
    public Consumer(RingBuffer rb, CountDownLatch latch, Semaphore mutex) {
        this.latch = latch;
        this.mutex = mutex;
        this.rb    = rb;
    }

    @Override
    public void run() {

        // **** for ease of use ****
        final int MAX_VAL = 100000;

        // **** ****
        System.out.println("Consumer <<< started");
//      System.out.print("Consumer <<< val: ");

        // **** ****
        int prevVal = 0;
        int val     = 0;
        while (val < MAX_VAL) {

            // **** wait for and get the next value from the buffer ****
            for (boolean done = false; !done; ) {

//              // **** get the mutex ****
//              try {
//                  mutex.acquire();
//              } catch (InterruptedException e1) {
//                  e1.printStackTrace();
//              }

                // **** check if the buffer is empty ****
                boolean empty = rb.isEmpty();

                // **** ****
                if (empty) {

//                  // **** release the mutex ****
//                  mutex.release();

                    // **** continue ****
                    continue;
                } else {

                    // **** get the next value from the buffer ****
                    try {
                        val = rb.get();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }

                    // **** flag we are done with the loop ****
                    done = true;
                }

//              // **** release the mutex ****
//              mutex.release();
            }

            // **** verify that the values arrive in sequence ****
            if (val != prevVal + 1) {
                System.out.println("Consumer <<< out of sequence prevVal: " + prevVal + " val: " + val);
                rb.dump();
                System.exit(-1);
            }
//          else {
//              System.out.printf("%3d ", val);
//          }

            // **** ****
            prevVal = val;
        }

        // **** ****
        latch.countDown();
        System.out.println("Consumer <<< done");
    }
}

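The code that uses the mutex has also been commented out. How can the Producer and Consumer exchange 100,000 values without running into problems? The answer is in the Ring Buffer class, where every method is declared synchronized, so only one thread at a time can execute any of them on a given RingBuffer object: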

public class RingBuffer {

    // **** ****
    private final int ARRAY_LEN = 4;

    // **** members ****
    private int[] arr = null;
    private int   h;    // index of the most recently written slot
    private int   t;    // index of the most recently read slot

    // **** constructor (Java does not allow synchronized on a constructor) ****
    public RingBuffer() {
        this.arr = new int[ARRAY_LEN];
        this.h   = 0;
        this.t   = 0;
    }

    // **** methods ****
    public synchronized void put(int val) throws Exception {

        // **** check if buffer is full ****
        if (isFull()) {
            throw new Exception("put <<< EXCEPTION buffer is full");
        }

        // **** increment head ****
        h++;
        h %= ARRAY_LEN;

        // **** store value in array ****
        arr[h] = val;
    }

    public synchronized int get() throws Exception {

        // **** check if buffer is empty ****
        if (isEmpty()) {
            throw new Exception("get <<< EXCEPTION buffer is empty");
        }

        // **** update the tail ****
        t++;
        t %= ARRAY_LEN;

        // **** get the value from the array ****
        int val = arr[t];
        arr[t] = 0;

        // **** return the value ****
        return val;
    }

    public synchronized boolean isEmpty() {
        return (t == h);
    }

    // **** full when advancing the head would land on the tail ****
    public synchronized boolean isFull() {
        if (h == (ARRAY_LEN - 1)) {
            if (t == 0) {
                return true;
            }
        } else {
            if ((h + 1) == t) {
                return true;
            }
        }
        return false;
    }

    public synchronized void dump() {
        System.out.println("<<<   h: " + h);
        System.out.println("<<<   t: " + t);
        System.out.print("<<< arr: ");
        for (int i : arr) {
            System.out.print(i + " ");
        }
        System.out.println();
    }
}

The tests pass. There is, however, something to note about how the threads use the buffer. Take a look at the following code from the Producer thread:

// **** loop putting integers in ascending order ****
for (int val = 1; val <= MAX_VAL; val++) {

    // **** spin until the value is in the buffer ****
    for (boolean done = false; !done; ) {

        // **** check if the buffer is not full ****
        boolean full = rb.isFull();
        if (full) {
            continue;
        } else {

            // **** put the value in buffer ****
            try {
                rb.put(val);
            } catch (Exception e1) {
                e1.printStackTrace();
            }

            // **** flag we are done with the loop ****
            done = true;
        }
    }
}

The code calls rb.isFull(), which is synchronized. Say it returns false, indicating that the buffer is not full. When the method returns, the lock on the RingBuffer object is released. The same code then calls rb.put(), which acquires the lock again. Between the two calls another thread may run. It cannot happen in this example because we only have a single producer, but a second producer could get the lock in that window and put a new element into the ring buffer, filling it. The original thread's call to rb.put() would then throw an exception. Each method is atomic on its own, but the sequence of two calls is not. This is something to think about.
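One way to close that gap is to fold the check and the update into a single synchronized method, so the lock is held across both. The following is a minimal sketch under that assumption, not the class used in this post; AtomicRingBuffer, tryPut() and tryGet() are hypothetical names introduced for illustration.

```java
// Hypothetical variant of the post's RingBuffer: the full/empty check and the
// update happen under one lock, so no other thread can run between them.
class AtomicRingBuffer {

    private final int[] arr;
    private int h = 0;   // index of the most recently written slot
    private int t = 0;   // index of the most recently read slot

    AtomicRingBuffer(int len) {
        arr = new int[len];
    }

    // Stores val and returns true, or returns false if the buffer is full.
    synchronized boolean tryPut(int val) {
        int next = (h + 1) % arr.length;
        if (next == t) {
            return false;            // full: one slot is always left unused
        }
        h = next;
        arr[h] = val;
        return true;
    }

    // Returns the oldest value, or null if the buffer is empty.
    synchronized Integer tryGet() {
        if (t == h) {
            return null;
        }
        t = (t + 1) % arr.length;
        return arr[t];
    }
}
```

With such a variant the producer's inner loop could shrink to something like while (!rb.tryPut(val)) { }, and no exception would be needed to report a full buffer.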

Following is the Consumer thread code:

// **** wait for and get the next value from the buffer ****
for (boolean done = false; !done; ) {

    // **** check if the buffer is empty ****
    boolean empty = rb.isEmpty();

    // **** ****
    if (empty) {
        continue;
    } else {

        // **** get the next value from the buffer ****
        try {
            val = rb.get();
        } catch (Exception e) {
            e.printStackTrace();
        }

        // **** flag we are done with the loop ****
        done = true;
    }
}

A similar scenario exists here. With multiple consumers, one thread checks that the buffer is not empty, and the lock is released when rb.isEmpty() returns. A second consumer then gets the last element in the buffer. When the original thread regains control and calls rb.get(), the buffer is empty and the call throws an exception.
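If the Ring Buffer class has to stay as it is, the caller can hold the lock across both calls instead. Synchronized instance methods lock on the object itself, and Java's intrinsic locks are reentrant, so a synchronized (rb) block in the consumer covers the check and the get as one unit. The sketch below is only an illustration: SharedRing is a trimmed stand-in for the post's RingBuffer, and takeIfAvailable() is a hypothetical helper.

```java
// Trimmed stand-in for the post's RingBuffer (same synchronized methods).
class SharedRing {

    private final int[] arr = new int[4];
    private int h = 0;
    private int t = 0;

    synchronized boolean isEmpty() { return t == h; }

    synchronized boolean isFull() { return (h + 1) % arr.length == t; }

    synchronized void put(int val) {
        if (isFull()) {
            throw new IllegalStateException("buffer is full");
        }
        h = (h + 1) % arr.length;
        arr[h] = val;
    }

    synchronized int get() {
        if (isEmpty()) {
            throw new IllegalStateException("buffer is empty");
        }
        t = (t + 1) % arr.length;
        return arr[t];
    }
}

class LockedConsumer {

    // Hypothetical helper: returns the next value, or null if the buffer is empty.
    static Integer takeIfAvailable(SharedRing rb) {
        synchronized (rb) {          // same monitor the synchronized methods use
            if (rb.isEmpty()) {
                return null;
            }
            return rb.get();         // reentrant: this thread already holds the lock
        }
    }
}
```

This only works if every thread that touches the buffer follows the same convention, which is why the pattern needs to be agreed on up front.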

Synchronization is something that needs to be thought out in much detail. Make sure you come up with a pattern that best suits your application and have every developer stick to it.
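As one example of such a pattern, and as an assumption on my part rather than something used in this series, the waiting can be moved into the buffer itself with wait() and notifyAll(). The threads then block instead of spinning, and the check and the update always happen under the same lock. BlockingRing and take() are hypothetical names.

```java
// Hypothetical blocking variant; not the class used in this post.
class BlockingRing {

    private final int[] arr;
    private int h = 0;   // index of the most recently written slot
    private int t = 0;   // index of the most recently read slot

    BlockingRing(int len) {
        arr = new int[len];
    }

    // Blocks while the buffer is full, then stores val.
    synchronized void put(int val) throws InterruptedException {
        while ((h + 1) % arr.length == t) {
            wait();                  // releases the lock while waiting
        }
        h = (h + 1) % arr.length;
        arr[h] = val;
        notifyAll();                 // wake any thread waiting in take()
    }

    // Blocks while the buffer is empty, then returns the oldest value.
    synchronized int take() throws InterruptedException {
        while (t == h) {
            wait();
        }
        t = (t + 1) % arr.length;
        int val = arr[t];
        notifyAll();                 // wake any thread waiting in put()
        return val;
    }
}
```

The condition is rechecked in a while loop because a woken thread may find that another thread has already changed the buffer. The standard library's java.util.concurrent.ArrayBlockingQueue offers the same behavior ready-made.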

If you have comments or questions regarding this or any other post in this blog send me a message. I will not use your name unless you explicitly allow me to do so.

John

john.canessa@gmail.com

Follow me on Twitter:  @john_canessa
