Producer and Consumer – Part I

This post deals with the implementation of a ring buffer (also called a circular buffer). The code already carries a synchronization mechanism (a semaphore intended to act as a mutex), but it is NOT in use in this part.

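The typical use for a ring buffer is to act as a ‘buffer’ (hence its name) between one or more producers and one or more consumers. The idea is to match the impedance between them, that is, to absorb short-term differences between the rate at which values are produced and the rate at which they are consumed.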
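To make the idea concrete, here is a minimal single-threaded sketch of a ring buffer. It is not the RingBuffer class exercised by the test code in this post. The class name SketchRingBuffer, the capacity argument, the head / tail / count fields and the use of IllegalStateException are all assumptions made for illustration.

```java
// Hypothetical sketch; NOT the RingBuffer class used by the test code in this post.
class SketchRingBuffer {

    private final int[] arr;    // fixed-size storage
    private int head  = 0;      // index of the oldest element
    private int tail  = 0;      // index where the next element is stored
    private int count = 0;      // number of elements currently stored

    public SketchRingBuffer(int capacity) {
        arr = new int[capacity];
    }

    public boolean isEmpty() { return count == 0; }

    public boolean isFull()  { return count == arr.length; }

    // Store a value at the tail; the index wraps around at the end of the array.
    public void put(int val) {
        if (isFull())
            throw new IllegalStateException("buffer is full");
        arr[tail] = val;
        tail = (tail + 1) % arr.length;
        count++;
    }

    // Remove and return the oldest value; the index wraps around as well.
    public int get() {
        if (isEmpty())
            throw new IllegalStateException("buffer is empty");
        int val = arr[head];
        head = (head + 1) % arr.length;
        count--;
        return val;
    }

    public static void main(String[] args) {
        SketchRingBuffer rb = new SketchRingBuffer(4);
        for (int i = 1; i <= 4; i++)
            rb.put(i);                      // buffer is now full
        System.out.println(rb.get());       // prints 1
        rb.put(5);                          // reuses slot 0
        while (!rb.isEmpty())
            System.out.println(rb.get());   // prints 2, 3, 4, 5 (one per line)
    }
}
```

The modulo on head and tail is what makes the array circular: once an index reaches the end of the array it continues at slot 0, so the storage is reused without moving any elements.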

Following is the test code written in Java using the Eclipse IDE:

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;

public class Solution {

    public static void main(String[] args) {

        // **** create a circular buffer ****
        RingBuffer rb = new RingBuffer();

        // **** for ease of use ****
        final int MAX_VAL = 1000;

        // **** first part: put and get on a single thread ****
        System.out.println("main <<< start sequential …");

        int retVal  = 0;
        int prevVal = 0;

        for (int val = 1; val <= MAX_VAL; val++) {

            // **** put ****
            try {
                rb.put(val);
            } catch (Exception e) {
                e.printStackTrace();
            }

            // **** get ****
            try {
                retVal = rb.get();
            } catch (Exception e) {
                e.printStackTrace();
            }

            // **** the value pulled must follow the previous one ****
            if (retVal != (prevVal + 1)) {
                System.out.println("main <<< out of sequence prevVal: " + prevVal + " retVal: " + retVal);
                System.exit(-1);
            }

            // **** remember the value just pulled ****
            prevVal = retVal;
        }

        System.out.println("main <<< done sequential\n");

        // **** second part: latch used to wait for both threads ****
        CountDownLatch latch = new CountDownLatch(2);

        // **** create a new circular buffer ****
        rb = new RingBuffer();

        // **** create a mutex (passed to the threads but not used yet) ****
        Semaphore mutex = new Semaphore(1);

        // **** start producer thread ****
        Producer producer = new Producer(rb, latch, mutex);
        producer.start();
        System.out.println("main <<< producer started …");

        // **** start consumer thread ****
        Consumer consumer = new Consumer(rb, latch, mutex);
        consumer.start();
        System.out.println("main <<< consumer started …");

        // **** wait for both threads to exit ****
        System.out.println("main <<< waiting for threads to be done …");
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("main <<< threads are done");
    }
}

There are two parts to the test. In the first part we loop putting sequential values in the range [1 : 1000] into the ring buffer. Each value is pulled out immediately and compared against the previously pulled value plus one, so the values should always be sequential. Following is a console screen capture that illustrates the first part of the test:

main <<< start sequential …

main <<< done sequential

I have run the code a few times and it seems to work well.

In the second part of the test we start two threads. The first thread acts as a Producer and the second thread acts as a Consumer. The producer puts sequential integers into the ring buffer while the consumer gets the integers. If all is well then the integers should arrive in sequential order. The Consumer checks for this and stops the program if the condition is not met.

At this time please ignore all references to the Semaphore, which is not being used.

Following is the code for the Producer thread:

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;

public class Producer extends Thread {

    // **** members ****
    private CountDownLatch     latch;
    private Semaphore          mutex;      // not used in this part
    private RingBuffer         rb;

    // **** constructor ****
    public Producer(RingBuffer rb, CountDownLatch latch, Semaphore mutex) {
        this.latch    = latch;
        this.mutex    = mutex;
        this.rb       = rb;
    }

    @Override
    public void run() {

        // **** for ease of use ****
        final int MAX_VAL = 1000;

        System.out.println("Producer <<< started");

        // **** loop putting integers in ascending order ****
        for (int val = 1; val <= MAX_VAL; val++) {

            // **** wait for space in buffer (busy wait) ****
            while (rb.isFull()) {}

            // **** put value in buffer ****
            try {
                rb.put(val);
            } catch (Exception e1) {
                e1.printStackTrace();
            }
        }

        // **** signal main that this thread is done ****
        latch.countDown();
        System.out.println("Producer <<< done");
    }
}

The code for the Consumer thread follows:

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;

public class Consumer extends Thread {

    // **** members ****
    private CountDownLatch     latch;
    private Semaphore          mutex;      // not used in this part
    private RingBuffer         rb;

    // **** constructor ****
    public Consumer(RingBuffer rb, CountDownLatch latch, Semaphore mutex) {
        this.latch    = latch;
        this.mutex    = mutex;
        this.rb       = rb;
    }

    @Override
    public void run() {

        // **** for ease of use ****
        final int MAX_VAL = 1000;

        System.out.println("Consumer <<< started");

        int prevVal = 0;
        int val     = 0;

        // **** loop until the last value has been consumed ****
        while (val < MAX_VAL) {

            // **** wait while buffer is empty (busy wait) ****
            while (rb.isEmpty()) {}

            // **** get the next value from the buffer ****
            try {
                val = rb.get();
            } catch (Exception e) {
                e.printStackTrace();
            }

            // **** the value pulled must follow the previous one ****
            if (val != prevVal + 1) {
                System.out.println("Consumer <<< out of sequence prevVal: " + prevVal + " val: " + val);
                rb.dump();
                System.exit(-1);
            }

            // **** remember the value just pulled ****
            prevVal = val;
        }

        // **** signal main that this thread is done ****
        latch.countDown();
        System.out.println("Consumer <<< done");
    }
}

Let’s run the test and display the entire contents of the console:

main <<< start sequential …

main <<< done sequential

main <<< producer started …

Producer <<< started

main <<< consumer started …

main <<< waiting for threads to be done …

Consumer <<< started

Consumer <<< out of sequence prevVal: 81 val: 0

<<<   h: 1

<<<   t: 2

<<< arr: 84 85 0 83

With the number of integers we are producing (i.e., 1,000) the error shows up most of the time. It tends to appear at different points in the sequence (e.g., 81, 82).

The reason for the error is that the Consumer and the Producer were accessing the buffer at the same time. The current implementation (disregarding the mutex) works well only when the Consumer and Producer are synchronized, as they are in the first part of the test, where each put completes before the matching get starts. In the case of the threads, the operating system decides when to start and stop executing a thread. A critical operation might be in progress (e.g., updating indices) when the thread is preempted. When it regains control, the state of the buffer might have changed.
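The following sketch replays one such interleaving by hand on a single thread, so the outcome is deterministic. It is only an illustration: the post does not show the order in which RingBuffer updates its indices and slots, so the class InterleavingSketch, its fields, and the split of put() into claimSlot() and storeValue() are assumptions chosen to show how a preemption between two steps can hand the consumer a slot that has not been written yet.

```java
// Hypothetical sketch; the order of the steps inside put() is an assumption,
// not the behavior of the RingBuffer class used in this post.
class InterleavingSketch {

    private final int[] arr = new int[4];   // all slots start at 0
    private int head  = 0;                  // next slot to read
    private int tail  = 0;                  // next slot to write
    private int count = 0;                  // number of stored elements

    // Producer, step 1: claim the slot and update the bookkeeping.
    int claimSlot() {
        int slot = tail;
        tail = (tail + 1) % arr.length;
        count++;
        return slot;
    }

    // Producer, step 2: store the value in the slot claimed earlier.
    void storeValue(int slot, int val) {
        arr[slot] = val;
    }

    // Consumer: a complete get().
    int get() {
        int val = arr[head];
        head = (head + 1) % arr.length;
        count--;
        return val;
    }

    // Replays: producer step 1, consumer get(), producer step 2.
    static int replay() {
        InterleavingSketch rb = new InterleavingSketch();
        int slot = rb.claimSlot();      // producer is "preempted" right after this
        int seen = rb.get();            // buffer looks non-empty, slot still holds 0
        rb.storeValue(slot, 1);         // producer resumes, too late
        return seen;
    }

    public static void main(String[] args) {
        System.out.println("consumer saw: " + replay());    // prints "consumer saw: 0"
    }
}
```

With real threads the scheduler picks the interleaving, which would explain why a failure like this appears at a different point in the sequence from run to run.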

I will stop at this point and let you think about possible solutions to this dilemma (spoiler alert: the unused mutex).
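For readers who want to check their thinking, here is one possible direction, sketched under assumptions. It uses a Semaphore with a single permit as a mutex so that the check and the buffer update happen as one step. The class MutexSketch and its run() method are hypothetical, and a java.util.ArrayDeque bounded to four elements stands in for the ring buffer, since the RingBuffer source is not part of this post.

```java
import java.util.ArrayDeque;
import java.util.concurrent.Semaphore;

// Hypothetical sketch; an ArrayDeque stands in for the post's RingBuffer.
class MutexSketch {

    static final int CAPACITY = 4;
    static final int MAX_VAL  = 1000;

    // Returns true if the consumer received 1 .. MAX_VAL in order.
    static boolean run() {
        ArrayDeque<Integer> buffer = new ArrayDeque<>();
        Semaphore mutex = new Semaphore(1);         // one permit: acts as a mutex
        boolean[] inOrder = { true };

        Thread producer = new Thread(() -> {
            int val = 1;
            while (val <= MAX_VAL) {
                mutex.acquireUninterruptibly();     // enter critical section
                try {
                    if (buffer.size() < CAPACITY)   // check and update together
                        buffer.addLast(val++);
                } finally {
                    mutex.release();                // leave critical section
                }
            }
        });

        Thread consumer = new Thread(() -> {
            int prevVal  = 0;
            int consumed = 0;
            while (consumed < MAX_VAL) {
                mutex.acquireUninterruptibly();
                try {
                    if (!buffer.isEmpty()) {
                        int val = buffer.removeFirst();
                        if (val != prevVal + 1)
                            inOrder[0] = false;
                        prevVal = val;
                        consumed++;
                    }
                } finally {
                    mutex.release();
                }
            }
        });

        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return inOrder[0];
    }

    public static void main(String[] args) {
        System.out.println("in order: " + run());
    }
}
```

Both threads still spin while the buffer is full or empty, as the threads in this post do. The only change is that every access to the shared buffer happens while holding the single permit, so a thread can no longer observe a half-finished update made by the other.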

If you have comments or questions regarding this or any other post in this blog, or have a topic you would like me to explore, please do not hesitate to send me a message. I will not use your name unless you explicitly allow me to do so.

John

john.canessa@gmail.com

Follow me on Twitter:  @john_canessa
