Producer and Consumer

Hope you are doing well. I fully understand that not all countries are moving quickly to vaccinate most of their population against COVID-19. Hopefully the efforts have already started and will be completed soon. Here in the USA it seems that every state is moving at its own pace. That said, a large number of people have received at least the first of two jabs. The youngest I know of is 21 years old. The world needs to get as much of its population vaccinated as possible to achieve herd immunity. If this is not done the virus will continue to mutate and the threat of COVID-19 might remain for years or decades.

It seems that Israel is leading the vaccination process. As in many other countries, there are some people who do not wish to get vaccinated. The prime minister of Israel has made some clever and, from what I understand, legal changes to motivate all citizens and residents to get a COVID-19 vaccine.

In a nutshell the idea is that the good of the majority needs to prevail. People will not be forced to get a vaccine, and some cannot for medical reasons. Besides that, people need to show a vaccine passport to work where there is contact with others. If a person wishes to attend events and places where they could possibly transmit COVID-19, they need to have taken a COVID test in the previous 48 hours. The COVID tests used to be free but in the near future people will need to pay for a test. So if a person without a real medical issue does not get the vaccine but wishes to mix and mingle with others, they will have to pay for and get a COVID test every other day. I guess Israel will be one of the countries that will reach herd immunity in the very near future. In my humble opinion, we in the USA need to start thinking about each other and work for a common goal.

Enough of chit-chat and let’s get to the main subject of this post.

Today I am not going to discuss and solve a problem posted on a web site. Instead I will specify my own problem and implement it in increasingly efficient ways over a set of passes in the immediate future. Hopefully if you encounter a similar problem you will have good insight into what needs to be done to make it more efficient.

The following diagram illustrates the problem at hand.

On the producer / server side we are provided a file name as input. We need to read the file in consecutive blocks, perform some processing on each block and then send the processed data over a socket to the consumer / client. The input file contains a number of objects. We need to process and send the objects to the client. After each object we need to wait for an ACK from the client to let us know that all is well so far. We do not need to be concerned with failures for this problem. In a production environment that is a completely different situation which will require retransmission(s).

On the consumer / client side we perform some initializations. We need to create a number of files with different names. Each file will hold a single object. After we receive a complete file we need to send an ACK to the producer indicating all is well so far. TCP is quite resilient: it detects corrupted or lost segments and retransmits them. That said, something can always go wrong outside the protocol, so the producer needs to be informed that each object was received and properly stored by the client so it can flag the object as received (the flagging itself is outside the scope of the problem requirements).

Let’s spend some time looking at the diagram and figuring out if we have everything needed to start with an MVP (Minimum Viable Product) implementation. I will wait…

…seems like you are back. While you were thinking I was also looking at the requirements and needed some additional information / clarification.

Q1. Is there a name for the file requested by the server?
A1. The client should be called with the name of the file
    holding the objects needed by the client.

Q2. How does the client find out the number of objects?
A2. The client will request from the server the number
    and sizes of all objects in the specified file
    at initialization.

Q3. How does the client find out the size of each object?
A3. The client will request from the server the number
    and sizes of all objects in the specified file
    at initialization.

Q4. Is there a name for each object sent by the server?
A4. The client will request from the server the number
    and sizes of all objects in the specified file
    at initialization.

Q5. What type of processing is done by the server on each object?
A5. The server will decrypt each object before sending it to the
    client.

Q6. Is there a recommended block size for data transfers?
A6. The recommended size for data packets is 1024 * 1024 bytes.
    The recommended size of ACK packets is 1024 bytes.
    
Additional comments:

o If a socket issue is encountered by the client or server the respective
implementation should exit. We are not expecting the ends to recover at
this time. Please keep in mind that with production code recovery is a must!!!

I came up with six questions for which I wanted answers before diving into a first implementation to generate an MVP. Note that I always like to develop code using a TDD approach. In this pass we will get to a version of the code which is not an MVP yet, but we are very close to it.

It seems that our team will have to update the diagram in order to take into account the initial exchange of data. The client will request a set of objects by the name of the file that holds them, which is provided to the client as input. The server will return the number of objects and their sizes: a 32-bit integer with the number of objects, followed by one 32-bit integer per object with its size.
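The initial exchange described above could look roughly like the following sketch. It is not the code from this post: the class and method names are made up for illustration, the byte order is whatever DataOutputStream.writeInt produces (big-endian), and an in-memory buffer stands in for the socket streams.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;

public class ObjectHeaderSketch {

    // Producer side: object count first, then one 32-bit size per object.
    public static void writeHeader(DataOutputStream dos, int[] sizes) throws IOException {
        dos.writeInt(sizes.length);
        for (int size : sizes)
            dos.writeInt(size);
        dos.flush();
    }

    // Consumer side: read the count, then that many sizes.
    public static int[] readHeader(DataInputStream dis) throws IOException {
        int count = dis.readInt();
        int[] sizes = new int[count];
        for (int i = 0; i < count; i++)
            sizes[i] = dis.readInt();
        return sizes;
    }

    // In-memory round trip standing in for the socket streams.
    public static int[] roundTrip(int[] sizes) {
        try {
            ByteArrayOutputStream wire = new ByteArrayOutputStream();
            writeHeader(new DataOutputStream(wire), sizes);
            return readHeader(new DataInputStream(new ByteArrayInputStream(wire.toByteArray())));
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }

    public static void main(String[] args) {
        // hypothetical sizes for three objects
        System.out.println(Arrays.toString(roundTrip(new int[] { 1048576, 2048, 77 })));
    }
}
```

Keep in mind that a signed 32-bit size limits each object to just under 2 GiB.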

After successfully receiving an object, the client will return an ACK with the zero-based object number followed by the number of bytes received. Of course the number of bytes received should match the expected number of bytes based on the size of the object.
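A possible layout for the ACK, as a sketch only: the two values are stored as 32-bit integers at the start of a zero-filled 1024-byte packet (the ACK size recommended in A6). The class and method names are hypothetical, and using 32 bits for the byte count is an assumption that follows from the 32-bit object sizes.

```java
import java.nio.ByteBuffer;

public class AckSketch {

    // Recommended ACK packet size from the requirements (A6).
    public static final int ACK_PACKET_SIZE = 1024;

    // Consumer side: zero-based object number, then bytes received; the rest stays zero.
    public static byte[] encodeAck(int objectNumber, int bytesReceived) {
        ByteBuffer buf = ByteBuffer.allocate(ACK_PACKET_SIZE);  // zero-filled, big-endian
        buf.putInt(objectNumber);
        buf.putInt(bytesReceived);
        return buf.array();
    }

    // Producer side: check the ACK against what was sent for this object.
    public static boolean ackMatches(byte[] ack, int expectedObject, int expectedSize) {
        ByteBuffer buf = ByteBuffer.wrap(ack);
        return buf.getInt() == expectedObject && buf.getInt() == expectedSize;
    }

    public static void main(String[] args) {
        byte[] ack = encodeAck(2, 2048);                        // hypothetical object 2, 2048 bytes
        System.out.println("ack length: " + ack.length);
        System.out.println("matches:    " + ackMatches(ack, 2, 2048));
        System.out.println("mismatch:   " + ackMatches(ack, 2, 2047));
    }
}
```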

Problems with the sockets will cause the client and / or server to exit.

It seems that we have enough information to generate our MVP and give it a try. The software will be developed using the programming language Java on a Windows 10 computer using the VSCode IDE. We will generate test code that will start the producer and consumer programs. The producer should come up and wait for the request from the consumer. When the consumer is started it should send the request to the producer with the name of the file holding the objects. In the initial implementation there will be no need for decrypting the objects.

We need to develop two different programs. One will be the Producer which will have access to the files containing a set of objects. The other is the Consumer which will request from the Producer all the objects so it can write them to individual files. The Producer and Consumer may both run on the same computer, but in general they will be running on separate ones. Today we will test on a single computer. In the next installment we will get to our MVP and will run on separate computers. Our interest is to explore performance and see if we can reduce the time it takes the client to receive all objects embedded in the specified file.

I started by generating a program which would start the Producer and Consumer, implemented as Jar files. I wrote a few lines and decided that such a feature was not important or required; so for the time being, we will start the Producer and the Consumer from the command line. That way when we are ready we can test both programs on the same or on different computers by just copying the code and running it from a command prompt.

Before we look at the code, let’s see what happens when we run the Producer and Consumer Jars.

# ***** ****
C:\>cd C:\Users\johnc\workspace8\ProducerAndConsumer\build-jar

# **** running consumer without producer ****
C:\Users\johnc\workspace8\ProducerAndConsumer\build-jar>java -cp ProducerAndConsumer.jar; com.canessa.producerconsumer.Consumer
consumer <<< producer ip [0.0.0.0]:
consumer <<< ip ==>0.0.0.0<==
consumer <<< producer port [51515]:
consumer <<< port: 51515
consumer <<< producer input file [E:\DICOM\005056891b354c11ed076009e73fd878]:
consumer <<< inputFile ==>E:\DICOM\005056891b354c11ed076009e73fd878<==
consumer <<< ex: java.net.ConnectException: Connection refused: connect

In this case I decided to start the Consumer without first starting the Producer. Our program prompts for the Producer IP and we accept the default, 0.0.0.0. (Strictly speaking 0.0.0.0 is the wildcard address rather than the loopback address 127.0.0.1, but it reached the local machine in these runs.) The program then requests the port on which the server is listening. We accept the default which, like the IP, is defined as a constant. The program then prompts for the full path to the file of interest. Once again, we use the constant. The Consumer fails to connect to the Producer because the Producer was not started. This is the expected behavior.

# **** ****
C:\>cd C:\Users\johnc\workspace8\ProducerAndConsumer\build-jar

# **** run Producer Jar ****
C:\Users\johnc\workspace8\ProducerAndConsumer\build-jar>java -cp ProducerAndConsumer.jar; com.canessa.producerconsumer.Producer
producer <<< ip [0.0.0.0]:
producer <<< ip ==>0.0.0.0<==
producer <<< producer port [51515]:
producer <<< port: 51515
producer <<< inputFile [E:\DICOM\005056891b354c11ed076009e73fd878]:
producer <<< inputFile ==>E:\DICOM\005056891b354c11ed076009e73fd878<==
producer <<< version: 1.0.00
producer <<< waiting for client connection...

producer <<< received client connection
producer <<< BEFORE reading request...
producer <<<  AFTER reading request dataLen: 458752
producer <<< fileSize: 426608152
producer <<< duration: 344 ms.
producer <<< bytesSent: 426608152

We start the Producer on our local machine. It prompts for the IP and port to use. Note that at this time we also prompt for the file to serve. This is a convenience until our Consumer is able to send the name of the file over the socket connection. The server displays its version number, which is not required at this time. We will discuss versions in a future version of the program.

The Producer then waits, listening for Consumer requests. In a future implementation the Consumer will provide the name of the file of interest. At some point we started the Consumer on the same machine. After we answered the Consumer prompts, the Producer took over and sent the entire file without parsing individual objects. The idea is to make sure the plumbing is operational before implementing the next set of requirements.

Note that the Producer reads and sends the contents of the specified file. We are using a 1024 * 1024 byte data buffer. Since the file size is not a multiple of the buffer size, the last buffer is smaller. Our base code is able to handle smaller transfer buffers. This is good since files will not all be of the same size. When done the program displays the size of the file, the time it took to read and send the contents of the entire file, and the number of bytes it sent to the Consumer. It seems that all the bytes in the 426MB file were transferred.

# **** ****
C:\>cd C:\Users\johnc\workspace8\ProducerAndConsumer\build-jar

# **** run Consumer Jar ****
C:\Users\johnc\workspace8\ProducerAndConsumer\build-jar>java -cp ProducerAndConsumer.jar; com.canessa.producerconsumer.Consumer
consumer <<< producer ip [0.0.0.0]:
consumer <<< ip ==>0.0.0.0<==
consumer <<< producer port [51515]:
consumer <<< port: 51515
consumer <<< producer input file [E:\DICOM\005056891b354c11ed076009e73fd878]:
consumer <<< inputFile ==>E:\DICOM\005056891b354c11ed076009e73fd878<==
consumer <<< BEFORE send request...
consumer <<<  AFTER send request
producer <<< duration: 375 ms.
consumer <<< bytesReceived: 426608152

We now see on a separate console what the Consumer did. After entering the name of the requested file (which as we discussed before was not sent to the Producer at this time) the Consumer sent a request which triggered the Producer to send the contents of the requested file and the Consumer to read each packet and write it to a single local file. At this time we are just implementing a copy of a file between the Producer and the Consumer.

When done the Consumer displays the time it took to receive and store the 426MB file.

# **** contents of ProducerAndConsumer.jar ****
C:\Users\johnc\workspace8\ProducerAndConsumer\build-jar>jar -vtf ProducerAndConsumer.jar
     0 Thu Mar 25 15:09:54 CDT 2021 META-INF/
    60 Thu Mar 25 15:09:54 CDT 2021 META-INF/MANIFEST.MF
     0 Thu Mar 25 15:09:54 CDT 2021 build-jar/
     0 Thu Mar 25 15:09:54 CDT 2021 build-jar/com/
     0 Thu Mar 25 15:09:54 CDT 2021 build-jar/com/canessa/
     0 Thu Mar 25 15:09:54 CDT 2021 build-jar/com/canessa/producerconsumer/
  1336 Thu Mar 25 15:09:54 CDT 2021 build-jar/com/canessa/producerconsumer/Consumer.class
   851 Thu Mar 25 15:09:54 CDT 2021 build-jar/com/canessa/producerconsumer/Producer.class
  1605 Thu Mar 25 15:09:54 CDT 2021 build-jar/com/canessa/producerconsumer/ProducerAndConsumer.class
     0 Thu Mar 25 14:50:36 CDT 2021 com/
     0 Thu Mar 25 14:58:20 CDT 2021 com/canessa/
     0 Thu Mar 25 14:55:00 CDT 2021 com/canessa/producerconsumer/
   908 Thu Mar 25 14:56:42 CDT 2021 com/canessa/producerconsumer/Consumer.java
   997 Thu Mar 25 14:58:34 CDT 2021 com/canessa/producerconsumer/Producer.java
  1410 Thu Mar 25 15:09:32 CDT 2021 com/canessa/producerconsumer/ProducerAndConsumer.java
  
# **** start Producer.main with no arguments ****
C:\Users\johnc\workspace8\ProducerAndConsumer\build-jar>java -cp ProducerAndConsumer.jar; com.canessa.producerconsumer.Producer
producer <<< started!!!
version <<< 1.0.00
producer <<< sleeping...
producer <<< sleeping...
producer <<< sleeping...
producer <<< sleeping...
producer <<< sleeping...
producer <<< sleeping...
producer <<< sleeping...
producer <<< shutdown!!!

# **** start Consumer.main with two arguments ****
C:\Users\johnc\workspace8\ProducerAndConsumer\build-jar>java -cp ProducerAndConsumer.jar; com.canessa.producerconsumer.Consumer choco cake
consumer <<< args.length: 2
consumer <<<        args: [choco, cake]
consumer <<< inputFile ==>choco<==
consumer <<< shutting down!!!

As I was developing the code I wanted to make sure things were working as desired. I generated the ProducerAndConsumer.jar, Producer.jar and Consumer.jar files. This was done using the “Jar Builder” plugin for VSCode by Aslam Anver. You can find it in the Extension repository for VSCode. We first display the contents of the ProducerAndConsumer.jar file. As you can see it holds all three classes. I then ran a very early implementation of the Producer. It just looped and exited. I like to develop code in steps. It seems to get to functional and robust code with minimal extra effort. I then experimented with the Consumer, passing it arguments. This was done in preparation for being able to start the Producer and the Consumer from the ProducerAndConsumer.jar application. Since such a feature is not a requirement, I did some simple experimentation or proof of concept, and left it there.

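package com.canessa.producerconsumer;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Shared constants, validation helpers and an experimental test scaffold.
 */
public class ProducerAndConsumer {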


    // **** constants ****
    public static final String INPUT_FILE  = "E:\\DICOM\\005056891b354c11ed076009e73fd878";
    public static final int PRODUCER_PORT  = 51515;
    public static final String PRODUCER_IP = "0.0.0.0";
    public static final int IO_BUFFER_SIZE = (1024 * 1024);
	
	
    /**
     * Test scaffold.
     * 
     * Starts the producer and consumer programs in this computer.
     * It starts the programs with the specified arguments.
     * 
     * @throws IOException
     */
    public static void main(String[] args) throws IOException {

        // **** display a prompt ****
        System.out.print("main >>> input file [" + INPUT_FILE + "]: ");

        // **** open a buffered reader ****
        BufferedReader br = new BufferedReader(new InputStreamReader(System.in));

        // **** read name of file to be processed by producer ****
        String inputFile = br.readLine().trim();

        // **** close the buffered reader ****
        br.close();

        // **** check if we should use the default input file ****
        if (inputFile.equals("")) {
            inputFile = INPUT_FILE;
        }

        // ???? ????
        System.out.println("main <<< inputFile ==>" + inputFile + "<==");

        
        // **** start producer ****
        Process prod = Runtime.getRuntime().exec("java -cp C:\\Users\\johnc\\workspace8\\ProducerAndConsumer\\ProducerAndConsumer.jar; com.canessa.producerconsumer.Producer");


        // **** start consumer ****
        Process cons = Runtime.getRuntime().exec("java -cp C:\\Users\\johnc\\workspace8\\ProducerAndConsumer\\ProducerAndConsumer.jar; com.canessa.producerconsumer.Consumer");


    }
}

This is the code that contains the constants and, at this time, a couple of methods we use to validate the IP address and port number entered by the user.

The test scaffold starts the Producer and Consumer. Note that at this time I did not redirect the input and output. I just experimented with one at a time and left it for the future (if needed).

As the code stands at this time, it is totally useless. Please disregard it.

    /**
     * Check if an IPv4 address in dotted-decimal notation is valid.
     */
    public static boolean isValidIPAddress(String ip) {
    
        // **** sanity checks ****
        if (ip == null || ip.length() == 0)
            return false;

        // **** regex for digit from 0 to 255 ****
        String zeroTo255 = "(\\d{1,2}|(0|1)\\d{2}|2[0-4]\\d|25[0-5])";

        // **** regex for a digit from 0 to 255 followed by a dot and repeat 4 times ****
        String regex = zeroTo255 + "\\."
                        + zeroTo255 + "\\."
                        + zeroTo255 + "\\."
                        + zeroTo255;

        // **** compile the regex ****
        Pattern p = Pattern.compile(regex);

        // **** match the ip against the regex ****
        Matcher m = p.matcher(ip);

        // **** return if the ip is valid or not ****
        return m.matches();
    }

This method is used to validate the IP entered by users.
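To see how the regex behaves, here is a small self-contained sketch that copies the method above into a throwaway class (IpCheckSketch is a made-up name) and tries a few inputs. Note that the pattern accepts leading zeros, so a string like 01.2.3.4 passes.

```java
import java.util.regex.Pattern;

public class IpCheckSketch {

    // Same regex as isValidIPAddress above, compiled once.
    private static final String ZERO_TO_255 = "(\\d{1,2}|(0|1)\\d{2}|2[0-4]\\d|25[0-5])";
    private static final Pattern IPV4 = Pattern.compile(
            ZERO_TO_255 + "\\." + ZERO_TO_255 + "\\." + ZERO_TO_255 + "\\." + ZERO_TO_255);

    public static boolean isValidIPAddress(String ip) {
        if (ip == null || ip.length() == 0)
            return false;
        return IPV4.matcher(ip).matches();      // matches() requires the whole string to match
    }

    public static void main(String[] args) {
        String[] samples = { "0.0.0.0", "192.168.1.10", "256.1.1.1", "1.2.3", "1.2.3.4.5", "01.2.3.4" };
        for (String s : samples)
            System.out.println(s + " ==> " + isValidIPAddress(s));
    }
}
```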

    /**
     * Check if TCP/IP port is valid.
     * 
     * Ports 0 through 1023 are defined as well-known ports.
     * Registered ports are from 1024 to 49151. 
     * The remainder of the ports from 49152 to 65535 can be 
     * used dynamically by applications.
     */
    public static boolean isValidPortNumber(int port) {

        // **** accept only the dynamic range; 65535 is the largest valid port ****
        return port >= 49152 && port <= 65535;
    }

This method verifies that the port number is in the proper range. I put in the comments some information on how some ranges are reserved for different applications and servers.

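package com.canessa.producerconsumer;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

/**
 * Producer (server): reads the input file and sends it to the Consumer.
 */
public class Producer {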

    // **** class members ****
    public int      port;
    public String   ip;
    public String   inputFile;


    /**
     * Constructor
     */
    public Producer(String ip, int port, String inputFile) {
        this.ip         = ip;
        this.port       = port;
        this.inputFile  = inputFile;
    }

    /**
     * Software version
     */
    public String version() {
        return "1.0.00";
    }
	
	:::: :::: ::::
	
}

This is the code for the Producer. At this point I just started with a base class and then implemented the code directly in the main method. After the MVP is up and running I will refactor it so it is portable and extensible.

    /**
     * This is the core code for the producer.
     * 
     * @throws InterruptedException
     * @throws IOException
     */
    public static void main(String[] args) throws InterruptedException, IOException {
        
        // **** initialization ****
        String ip                   = ProducerAndConsumer.PRODUCER_IP;
        int port                    = ProducerAndConsumer.PRODUCER_PORT;
        String inputFile            = ProducerAndConsumer.INPUT_FILE;
        byte[] data                 = new byte[ProducerAndConsumer.IO_BUFFER_SIZE];
        String buffer               = "";

        ServerSocket serverSocket   = null;
        Socket clientSocket         = null;
    
        // **** open a buffered reader ****
        BufferedReader br = new BufferedReader(new InputStreamReader(System.in));

        // **** prompt and get the IP ****
        System.out.print("producer <<< ip [" + ip + "]: ");
        buffer = br.readLine().trim();
        if (!buffer.equals(""))
            ip = buffer;

        // **** check if ip is NOT valid ****
        if (!ProducerAndConsumer.isValidIPAddress(ip)) {
            System.out.println("producer <<< ip ==>" + ip + "<== is NOT valid!!!");
            System.exit(-1);
        }

        // ???? ????
        System.out.println("producer <<< ip ==>" + ip + "<==");

        // **** prompt and get the port ****
        System.out.print("producer <<< producer port [" + port + "]: ");
        buffer = br.readLine().trim();
        if (!buffer.equals(""))
            port = Integer.parseInt(buffer);

        // **** check if port is NOT valid ****
        if (!ProducerAndConsumer.isValidPortNumber(port)) {
            System.out.println("producer <<< port: " + port + " is NOT valid!!!");
            System.exit(-1);
        }

        // ???? ????
        System.out.println("producer <<< port: " + port);

        // **** prompt and get the input file name ****
        System.out.print("producer <<< inputFile [" + inputFile + "]: ");
        buffer = br.readLine().trim();
        if (!buffer.equals(""))
            inputFile = buffer;

        // ???? ????
        System.out.println("producer <<< inputFile ==>" + inputFile + "<==");

        // **** close the buffered reader ****
        br.close();

        // **** create a producer ****
        Producer producer = new Producer(ip, port, inputFile);

        // **** get and display producer version ****
        System.out.println("producer <<< version: " + producer.version());

        // **** open server socket and accept client (consumer) connection ****
        try {

            // **** server socket ****
            serverSocket    = new ServerSocket(port);

            // ???? ????
            System.out.println("producer <<< waiting for client connection...");

            // **** accept client (consumer) connection ****
            clientSocket    = serverSocket.accept();

            // ???? ????
            System.out.println("producer <<< received client connection");
        } catch (Exception ex) {
            System.err.println("producer <<< ex: " + ex);
            System.exit(-1);
        }

        // **** start timer ****
        long startTime = System.currentTimeMillis();

        // **** to read from client (consumer) socket ****
        DataInputStream dis = new DataInputStream(new BufferedInputStream(clientSocket.getInputStream()));

        // **** to write to client (consumer) socket ****
        DataOutputStream dos = new DataOutputStream(new BufferedOutputStream(clientSocket.getOutputStream()));

        // ???? ????
        System.out.println("producer <<< BEFORE reading request...");

        // **** read consumer (client) request
        //      (a single read may return fewer bytes than the consumer sent) ****
        int dataLen = dis.read(data, 0, data.length);

        // ???? ????
        System.out.println("producer <<<  AFTER reading request dataLen: " + dataLen);

        // **** check if input file does NOT exist ****
        File f = new File(inputFile);
        if (!f.exists() || f.isDirectory()) { 
            System.err.println("producer <<< inputFile ==>" + inputFile + "<== NOT available!!!");
            System.exit(-1);
        }

        // **** get input file size ****
        Path path = Paths.get(inputFile);
        long fileSize = Files.size(path);
        if (fileSize <= 0) {
            System.err.println("producer <<< unexpected fileSize: " + fileSize);
            System.exit(-1);
        }

        // ???? ????
        System.out.println("producer <<< fileSize: " + fileSize);

        // **** open input file ****
        InputStream inStream = new FileInputStream(inputFile);

        // **** loop reading and sending data to consumer (client) ****
        int bytesToRead     = 0;
        long bytesSent      = 0;
        while (bytesSent < fileSize) {

            // **** determine number of bytes to read ****
            if (fileSize - bytesSent > ProducerAndConsumer.IO_BUFFER_SIZE)
                bytesToRead = ProducerAndConsumer.IO_BUFFER_SIZE;
            else
                bytesToRead = (int)(fileSize - bytesSent);

            // **** read data from the local file ****
            int len = inStream.read(data, 0, bytesToRead);

            // **** stop if the file ended early (read returns -1 at end of file) ****
            if (len < 0)
                break;

            // **** send data to consumer (client) ****
            dos.write(data, 0, len);

            // **** update the number of bytes sent to consumer (client) ****
            bytesSent += len;

            // ???? ????
            // System.out.println("producer <<< bytesSent: " + bytesSent);
        }

        // **** end timer and compute duration ****
        long endTime    = System.currentTimeMillis();
        long duration   = (endTime - startTime);

        // ???? ????
        System.out.println("producer <<< duration: " + duration + " ms.");

        // ???? ????
        System.out.println("producer <<< bytesSent: " + bytesSent);

        // **** close input stream ****
        inStream.close();

        // **** close the data output stream first so any buffered bytes are flushed
        //      (closing either socket stream also closes the socket) ****
        dos.close();

        // **** close the data input stream ****
        dis.close();

        // **** close client socket ****
        clientSocket.close();

        // **** close server socket ****
        serverSocket.close();
    }

This is the Producer. We start by assigning some default values to a set of variables. Note that at this time we assign the name of the input file. That name should come from the Consumer when a request is made.

We then ask the user for a set of values to override the defaults. These are useful for testing. When we are all said and done we should only leave the necessary arguments.

We then create a producer object. We use it to get the version of the code. Note that at this point we are not using the version at all. The version can be used to support previous versions when adding new features that need code not available in those versions. That way our Consumer and Producer can indicate the mismatch to the user via a message or an entry in a log file.

The Producer needs to create a server socket in order to accept client (consumer) requests. After the socket is created we block waiting for a Consumer request.

As soon as a request is received we start a timer. We are interested in determining how long it takes to complete a request. In particular we will run multiple requests for the same file in order to obtain the average time it takes to process the given request.

We then read the contents of the request packet. We display the length of the packet but do not take action yet on its contents. When we complete our MVP we will be getting in this packet the name of the file we need to process.
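One thing to watch when we start using the request contents: in the run shown earlier the Producer reported dataLen: 458752 although the Consumer wrote a full 1024 * 1024 byte request, because a single read on a stream may return only part of the data. The sketch below illustrates the difference between read and DataInputStream.readFully. The ChunkedStream class is a made-up stand-in for a socket stream that hands out a limited number of bytes per call; it is not part of the code in this post.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

public class ReadRequestSketch {

    // Stand-in for a socket stream: returns at most 'limit' bytes per read call.
    static class ChunkedStream extends InputStream {
        private final InputStream in;
        private final int limit;

        ChunkedStream(byte[] data, int limit) {
            this.in = new ByteArrayInputStream(data);
            this.limit = limit;
        }

        @Override
        public int read() throws IOException {
            return in.read();
        }

        @Override
        public int read(byte[] b, int off, int len) throws IOException {
            return in.read(b, off, Math.min(len, limit));
        }
    }

    // Returns { bytes from one read() call, bytes obtained with readFully() }.
    public static int[] demo(int requestSize, int limit) {
        try {
            byte[] request = new byte[requestSize];
            byte[] data = new byte[requestSize];

            DataInputStream once = new DataInputStream(new ChunkedStream(request, limit));
            int partial = once.read(data, 0, data.length);      // may stop short

            DataInputStream full = new DataInputStream(new ChunkedStream(request, limit));
            full.readFully(data, 0, data.length);               // loops until filled or EOFException

            return new int[] { partial, data.length };
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }

    public static void main(String[] args) {
        int[] result = demo(1000, 300);
        System.out.println("single read: " + result[0] + " bytes");
        System.out.println("readFully:   " + result[1] + " bytes");
    }
}
```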

Assuming that we received the name of the input file, we need to check that the file is available. If it is not, in this simple case we display a message and exit. In practice, the Producer should release any resources it has used and go back to listening for the next request.
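A rough sketch of that "release and go back to listening" behavior, using try-with-resources so the socket is closed on every path. This is not the Producer code from this post: ServeLoopSketch, serve and runDemo are made-up names, and the array of file names stands in for the names that will eventually arrive in each request.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class ServeLoopSketch {

    // Serves one connection per entry in inputFiles; returns how many files were sent.
    public static int serve(ServerSocket server, String[] inputFiles) {
        int served = 0;
        for (String inputFile : inputFiles) {
            // try-with-resources closes the socket and stream however the block ends
            try (Socket client = server.accept();
                 OutputStream out = client.getOutputStream()) {
                Path path = Paths.get(inputFile);
                if (!Files.isRegularFile(path)) {
                    System.err.println("producer <<< inputFile ==>" + inputFile + "<== NOT available!!!");
                    continue;                       // back to accept() for the next request
                }
                Files.copy(path, out);
                served++;
            } catch (IOException ex) {
                System.err.println("producer <<< ex: " + ex);
            }
        }
        return served;
    }

    // Demo: the first request names a missing file, the second an existing one.
    public static int runDemo() {
        try (ServerSocket server = new ServerSocket(0)) {       // port 0: any free port
            Path existing = Files.createTempFile("objects", ".bin");
            Files.write(existing, new byte[] { 1, 2, 3 });
            String missing = existing.toString() + ".missing";
            int port = server.getLocalPort();
            Thread clients = new Thread(() -> {
                for (int i = 0; i < 2; i++) {
                    try (Socket s = new Socket("127.0.0.1", port);
                         InputStream in = s.getInputStream()) {
                        while (in.read() != -1) { /* drain until the producer closes */ }
                    } catch (IOException ex) {
                        System.err.println("consumer <<< ex: " + ex);
                    }
                }
            });
            clients.start();
            int served = serve(server, new String[] { missing, existing.toString() });
            clients.join();
            Files.delete(existing);
            return served;
        } catch (IOException | InterruptedException ex) {
            return -1;
        }
    }

    public static void main(String[] args) {
        System.out.println("served: " + runDemo());
    }
}
```

In the demo the missing file does not stop the loop; the second request is still served.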

We then get the size of the input file. At this point we are going to transfer the complete file. In the actual program we will need to parse the input file and get the names, offsets and lengths of the individual encrypted objects. We need such information to read the objects, decrypt and then send them to the Consumer.

We then open the input file.

A loop is used to read and send the contents of the file. In the final MVP we will have additional instructions to get to the proper offset in the input file and then read the data, decrypt and send to the Consumer.
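Getting to the proper offset could be done with RandomAccessFile, as in the sketch below. The offset and length would come from parsing the input file, whose format has not been described yet, so the values here are hypothetical, and readObject is a made-up helper rather than code from this post. Decryption is left out.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;

public class ObjectReadSketch {

    // Reads 'length' bytes starting at 'offset' in the input file.
    public static byte[] readObject(String inputFile, long offset, int length) throws IOException {
        try (RandomAccessFile raf = new RandomAccessFile(inputFile, "r")) {
            raf.seek(offset);                   // position at the start of the object
            byte[] object = new byte[length];
            raf.readFully(object);              // EOFException if the file is too short
            return object;
        }
    }

    // Demo on a ten byte temporary file: read three bytes starting at offset 4.
    public static byte[] demo() {
        try {
            Path tmp = Files.createTempFile("objects", ".bin");
            Files.write(tmp, new byte[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 });
            byte[] object = readObject(tmp.toString(), 4, 3);
            Files.delete(tmp);
            return object;
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(demo()));
    }
}
```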

At this point we just read a chunk of no more than 1024 * 1024 bytes and send it to the Consumer. Note that we need to determine the actual number of bytes to read because the last chunk will only rarely (about 1 chance in a million) be exactly 1024 * 1024 bytes.
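For the file used in the test run (426608152 bytes) the arithmetic works out as in this small sketch. The class name is made up; the buffer size is the IO_BUFFER_SIZE constant from the code above.

```java
public class ChunkMathSketch {

    public static final int IO_BUFFER_SIZE = 1024 * 1024;

    // Number of chunks that fill the buffer completely.
    public static long fullChunks(long fileSize) {
        return fileSize / IO_BUFFER_SIZE;
    }

    // Size of the final, partial chunk (0 when the file is an exact multiple).
    public static int lastChunk(long fileSize) {
        return (int) (fileSize % IO_BUFFER_SIZE);
    }

    public static void main(String[] args) {
        long fileSize = 426608152L;             // fileSize reported by the Producer run
        System.out.println("full chunks: " + fullChunks(fileSize));   // 406
        System.out.println("last chunk:  " + lastChunk(fileSize));    // 886296
    }
}
```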

As soon as we are done sending the data we compute the duration and display it.

We then proceed to close / release all the resources we have used. As soon as the program exits all resources would be freed anyway, but keep in mind that the program may fail at different points, and in the future it should get back to listening for the next request.

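package com.canessa.producerconsumer;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.Socket;
import java.util.Arrays;

/**
 * Consumer (client): requests a file from the Producer and stores what it receives.
 */
public class Consumer {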


    /**
     * Core code for the consumer.
     * @throws IOException
     */
    public static void main(String[] args) throws IOException {
        
        // **** initialization ****
        String ip           = ProducerAndConsumer.PRODUCER_IP;
        int port            = ProducerAndConsumer.PRODUCER_PORT;
        String inputFile    = ProducerAndConsumer.INPUT_FILE;
        byte[] data         = new byte[ProducerAndConsumer.IO_BUFFER_SIZE];
        String buffer       = "";

        Socket clientSocket = null;

        // **** open a buffered reader ****
        BufferedReader br = new BufferedReader(new InputStreamReader(System.in));

        // **** prompt and get the IP ****
        System.out.print("consumer <<< producer ip [" + ip + "]: ");
        buffer = br.readLine().trim();
        if (!buffer.equals(""))
            ip = buffer;

        // **** check if ip is NOT valid ****
        if (!ProducerAndConsumer.isValidIPAddress(ip)) {
            System.out.println("consumer <<< ip ==>" + ip + "<== is NOT valid!!!");
            System.exit(-1);
        }

        // ???? ????
        System.out.println("consumer <<< ip ==>" + ip + "<==");

        // **** prompt and get the port ****
        System.out.print("consumer <<< producer port [" + port + "]: ");
        buffer = br.readLine().trim();
        if (!buffer.equals(""))
            port = Integer.parseInt(buffer);

        // **** check if port is NOT valid ****
        if (!ProducerAndConsumer.isValidPortNumber(port)) {
            System.out.println("consumer <<< port: " + port + " is NOT valid!!!");
            System.exit(-1);
        }

        // ???? ????
        System.out.println("consumer <<< port: " + port);

        // **** prompt and get the input file name ****
        System.out.print("consumer <<< producer input file [" + inputFile + "]: ");
        buffer = br.readLine().trim();
        if (!buffer.equals(""))
            inputFile = buffer;

        // ???? ????
        System.out.println("consumer <<< inputFile ==>" + inputFile + "<==");

        // **** close the buffered reader ****
        br.close();

        // **** start timer ****
        long startTime = System.currentTimeMillis();

        // **** open client socket ****
        try {
            clientSocket = new Socket(ip, port);
        } catch (Exception ex) {
            System.err.println("consumer <<< ex: " + ex.toString());
            System.exit(-1);
        }

        // **** to read data from server (producer) socket ****
        DataInputStream dis = new DataInputStream(new BufferedInputStream(clientSocket.getInputStream()));

        // **** to write data to server (producer) socket ****
        DataOutputStream dos = new DataOutputStream(new BufferedOutputStream(clientSocket.getOutputStream()));

        // ???? ????
        System.out.println("consumer <<< BEFORE send request...");

        // **** send request to producer (input file name) ****
        try {
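            Arrays.fill(data, (byte)0);

            // **** copy the encoded bytes; their count can differ from inputFile.length() ****
            byte[] name = inputFile.getBytes(java.nio.charset.StandardCharsets.UTF_8);
            System.arraycopy(name, 0, data, 0, name.length);
            dos.write(data, 0, ProducerAndConsumer.IO_BUFFER_SIZE);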
        } catch (Exception ex) {
            System.err.println("consumer <<< ex: " + ex.toString());
            System.exit(-1);
        }

        // ???? ????
        System.out.println("consumer <<<  AFTER send request");

        // **** open local file to write data ****
        String outputFile       = "c:\\temp\\_received_file";
        OutputStream outStream  = new FileOutputStream(outputFile);
    
        // **** loop receiving data from producer (server) ****
        boolean done        = false;
        long bytesReceived  = 0;
        while (!done) {

            // **** read the next chunk of data from the producer (server) ****
            int len = dis.read(data, 0, ProducerAndConsumer.IO_BUFFER_SIZE);

            // **** check if we are done receiving data from the producer (server) ****
            if (len == -1) {
                done = true;
                continue;
            }

            // **** count these bytes ****
            bytesReceived += (long)len;

            // ???? ????
            // System.out.println("consumer <<< bytesReceived: " + bytesReceived);

            // **** write data to the output file ****
            outStream.write(data, 0, len);
        }

        // **** end timer and compute duration ****
        long endTime    = System.currentTimeMillis();
        long duration   = (endTime - startTime);

        // ???? ????
        System.out.println("consumer <<< duration: " + duration + " ms.");

        // ???? ????
        System.out.println("consumer <<< bytesReceived: " + bytesReceived);

        // **** close local file ****
        outStream.close();

        // **** close the data input stream ****
        dis.close();

        // **** close the data output stream ****
        dos.close();

        // **** close client socket ****
        clientSocket.close();
    }
}

The Consumer mirrors the Producer. We define a set of variables and assign them default values. Note that we could also have passed these values to the Consumer as command line arguments.

We then prompt for possible changes to some of the values. This is quite similar to what we did on the Producer side.

When we are ready with the variables, we open a client socket that will be used to send the request to the Producer and then receive the data sent by the Producer.

We then prepare and send the request to the Producer. The request carries the name of the input file. You can verify this on the Producer by displaying the contents of the request packet. As we saw in the Producer code, we are not using the contents of the request packet yet; the Producer uses the value of a constant instead.

Also note that the request is sent in a 1024 * 1024 byte buffer, which is far more than a file name needs. We should be using something smaller, i.e., 1024 bytes at most.
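The smaller request mentioned above could be sketched as follows. This is only an illustration, not the code in the project: the RequestCodec class and the sample file name are hypothetical, and it assumes both sides agree to use the length-prefixed format written by DataOutputStream.writeUTF() (a 2-byte length followed by the encoded name) instead of a fixed-size buffer.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical helper (not part of the original project).
class RequestCodec {

    // **** encode the file name as a 2-byte length followed by the name bytes ****
    static byte[] encode(String fileName) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream dos = new DataOutputStream(bytes)) {
            dos.writeUTF(fileName);
        }
        return bytes.toByteArray();
    }

    // **** decode a request produced by encode() ****
    static String decode(byte[] request) throws IOException {
        try (DataInputStream dis = new DataInputStream(new ByteArrayInputStream(request))) {
            return dis.readUTF();
        }
    }

    public static void main(String[] args) throws IOException {
        // **** sample file name (assumption for illustration only) ****
        byte[] request = encode("c:\\temp\\input_file");
        System.out.println("request <<< length: " + request.length);
        System.out.println("request <<< decoded ==>" + decode(request) + "<==");
    }
}
```

On a real socket the Consumer would call dos.writeUTF(inputFile) followed by dos.flush(), and the Producer would call dis.readUTF(). The request then takes a few dozen bytes instead of a megabyte.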

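We now enter a loop in which we receive a packet from the Producer and write it to the file that we opened before entering the loop. The loop ends when read() returns -1, which signals that the Producer has closed its end of the connection.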
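The receive loop described above can be isolated into a small helper that works with any pair of streams. This is a minimal sketch; the StreamCopy class name and the bufferSize parameter are assumptions made for illustration and do not appear in the project.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

// Hypothetical helper (not part of the original project).
class StreamCopy {

    // **** copy all bytes from in to out; return the number of bytes copied ****
    static long copy(InputStream in, OutputStream out, int bufferSize) throws IOException {
        if (bufferSize <= 0)
            throw new IllegalArgumentException("bufferSize must be positive");

        byte[] buffer   = new byte[bufferSize];
        long total      = 0;
        int len;

        // **** read() returns -1 when the other side closes the stream ****
        while ((len = in.read(buffer, 0, buffer.length)) != -1) {

            // **** a read may return fewer bytes than requested; write only what arrived ****
            out.write(buffer, 0, len);
            total += len;
        }

        // **** push any buffered bytes to the destination ****
        out.flush();
        return total;
    }
}
```

In the Consumer this would be called with the socket input stream and the file output stream. Keeping the loop in one place makes it easy to test without a network connection.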

As we mentioned, we will receive the entire file at this time. When our software is completed we will be receiving a set of objects which we need to save in individual files.

We also timed the execution, so when we exit the loop we need to compute the duration.
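Since the goal of the upcoming passes is efficiency, it may help to turn the duration into a throughput figure. The sketch below is an assumption on my part, not code from the project: the TransferStats class is hypothetical, and it uses System.nanoTime() because that clock is intended for measuring elapsed time.

```java
// Hypothetical helper (not part of the original project).
class TransferStats {

    // **** throughput in MiB (1024 * 1024 bytes) per second ****
    static double mibPerSecond(long bytes, long elapsedNanos) {
        if (elapsedNanos <= 0)
            return 0.0;

        double seconds = elapsedNanos / 1_000_000_000.0;
        return (bytes / (1024.0 * 1024.0)) / seconds;
    }

    public static void main(String[] args) throws InterruptedException {

        // **** start timer ****
        long start = System.nanoTime();

        // **** stand-in for the receive loop (assumption for illustration only) ****
        Thread.sleep(50);
        long bytesReceived = 4L * 1024 * 1024;

        // **** end timer and report throughput ****
        long elapsed = System.nanoTime() - start;
        System.out.println("consumer <<< throughput: " + mibPerSecond(bytesReceived, elapsed) + " MiB/s");
    }
}
```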

Note that the number of bytes sent by the Producer, which matches the entire size of the input file, has been received by the Consumer.

We then close / release the resources we used. In a client you might think there is no need to explicitly close resources because the program will end and exit. This may or may not be true. The client (Consumer) might also be a Producer to a third party that provides the input file names, and the new files might then be used and later deleted based on creation time. The point I am trying to get across is that no matter how well you think the requirements are specified, you, the principal software engineer or system architect, need to understand how the software will and may be used in the context of the system.
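If the Consumer ends up living inside a longer running process, one way to make sure the socket and the file are always released, even when an exception is thrown, is try-with-resources. The following is a sketch under stated assumptions: the ConsumerSketch class and the receiveToFile() method are hypothetical, the 64 KB buffer is an arbitrary choice, and the request step is left out so that only the resource handling is shown.

```java
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper (not part of the original project).
class ConsumerSketch {

    // **** connect, receive until the producer closes the connection, save to outputFile ****
    static long receiveToFile(String host, int port, Path outputFile) throws IOException {

        // **** resources are closed in reverse order when the block exits, normally or not ****
        try (Socket socket = new Socket(host, port);
             InputStream in = new BufferedInputStream(socket.getInputStream());
             OutputStream out = new BufferedOutputStream(Files.newOutputStream(outputFile))) {

            byte[] buffer   = new byte[64 * 1024];
            long total      = 0;
            int len;

            // **** loop receiving data from the producer (server) ****
            while ((len = in.read(buffer)) != -1) {
                out.write(buffer, 0, len);
                total += len;
            }

            // **** closing out flushes any buffered bytes to the file ****
            return total;
        }
    }
}
```

With this shape there are no separate close() calls to forget, and a failure while reading does not leave the output file or the socket open.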

Hope you enjoyed solving this problem as much as I did. The entire code for this project can be found in my GitHub repository.

If you have comments or questions regarding this, or any other post in this blog, please do not hesitate to leave me a note below. I will reply as soon as possible.

Keep on reading and experimenting. It is one of the best ways to learn, become proficient, refresh your knowledge and enhance your developer toolset.

One last thing, many thanks to all subscribers to this blog!!!

Regards;

John
