Producer and Consumer – Part 3

It is Friday once again. The cleaning crew is at home. I have been listening to the vacuum for a few hours. They should be done shortly.

I have to get a COVID-19 test this afternoon. On Monday I will be having a procedure, and the clinic requires a COVID test taken within the previous 72 hours, even if you have already received the complete dose of a vaccine. I was going to get this procedure done a couple of years ago, but between some travel plans and then COVID I did not have an opportunity to get it done.

Today I asked my wife if we could have Asian food from Trader Joe’s for lunch. She suggested noodles with an Italian sauce we made last week. We were able to fill five one-pint containers and put them in the outside freezer. The sauce turned out very good. We made it last Saturday when my wife’s brother and his wife had lunch with us. We used a few pounds of chuck and Italian pureed tomatoes. Hopefully the cleaning crew will leave soon so we can have an early lunch.

In this post I will continue working on the Producer and Consumer project. If interested, please take a look at the previous posts, Producer and Consumer and Producer and Consumer – Part 2.

In this pass we continue to work towards our MVP version. Last time we left the system in a state in which the names, offsets and lengths of the encrypted objects were properly obtained by the Producer, sent to the Consumer, and received by it.

In this pass we will make sure we can transfer the objects from the Producer and receive them in the Consumer. We will be able to check names and sizes but will not be able to access the objects since they will be received encrypted. Decrypting the objects will be the last task we need to complete to get our MVP ready.

You can read about the requirements in the previous posts. In this pass we will check the code that is used to implement the Producer and Consumer. We will not look at all the code in the project because some things have not been updated. I believe that the code shown in this post has been updated since the last pass. If you wish to get the entire code and give it a run, you will need an input file. I am not able to provide the files I have been using for testing. In the next and last post of this series I will provide a test file and if I have time I will write a test method to generate encrypted test files.

    /**
     * This is the main code for the Producer.
     * 
     * @throws InterruptedException
     * @throws IOException
     */
    public static void main(String[] args) throws InterruptedException, IOException {
        
        // **** initialization ****
        String ip                   = ProducerAndConsumer.PRODUCER_IP;
        int port                    = ProducerAndConsumer.PRODUCER_PORT;
        byte[] data                 = new byte[ProducerAndConsumer.IO_BUFFER_SIZE];
        String buffer               = "";

        ServerSocket serverSocket   = null;
        Socket clientSocket         = null;
    
        // **** open a buffered reader ****
        BufferedReader br = new BufferedReader(new InputStreamReader(System.in));

        // **** prompt and get the IP ****
        System.out.print("producer <<< ip [" + ip + "]: ");
        buffer = br.readLine().trim();
        if (!buffer.equals(""))
            ip = buffer;

        // **** check if ip is NOT valid ****
        if (!ProducerAndConsumer.isValidIPAddress(ip)) {
            System.out.println("producer <<< ip ==>" + ip + "<== is NOT valid!!!");
            System.exit(-1);
        }

        // ???? ????
        System.out.println("producer <<< ip ==>" + ip + "<==");

        // **** prompt and get the port ****
        System.out.print("producer <<< producer port [" + port + "]: ");
        buffer = br.readLine().trim();
        if (!buffer.equals(""))
            port = Integer.parseInt(buffer);

        // **** check if port is NOT valid ****
        if (!ProducerAndConsumer.isValidPortNumber(port)) {
            System.out.println("producer <<< port: " + port + " is NOT valid!!!");
            System.exit(-1);
        }

        // ???? ????
        System.out.println("producer <<< port: " + port);

        // **** close the buffered reader ****
        br.close();

        // **** create a producer ****
        Producer producer = new Producer(ip, port);

        // **** get and display producer version ****
        System.out.println("producer <<< version: " + producer.version());

        // **** open server socket and accept client (consumer) connection ****
        try {

            // **** server socket ****
            serverSocket    = new ServerSocket(port);

            // ???? ????
            System.out.println("producer <<< waiting for client connection...");

            // **** accept client (consumer) connection ****
            clientSocket    = serverSocket.accept();

            // ???? ????
            System.out.println("producer <<< received client connection");
        } catch (Exception ex) {
            System.err.println("producer <<< ex: " + ex);
            System.exit(-1);
        }

        // **** start timer ****
        long startTime = System.currentTimeMillis();

        // **** to read from client (consumer) socket ****
        DataInputStream dis = new DataInputStream(new BufferedInputStream(clientSocket.getInputStream()));

        // **** to write to client (consumer) socket ****
        DataOutputStream dos = new DataOutputStream(new BufferedOutputStream(clientSocket.getOutputStream()));

        // ???? ????
        System.out.println("producer <<< BEFORE reading request...");

        // **** read consumer (client) request ****
        int dataLen = dis.read(data, 0, data.length);

        // ???? ????
        System.out.println("producer <<<  AFTER reading request dataLen: " + dataLen);

        // **** get the input file name ****
        String inputFile = new String(data, "UTF-8").trim(); 

        // ???? ????
        System.out.println("producer <<< inputFile ==>" + inputFile + "<==");

        // **** check if input file does NOT exist ****
        File f = new File(inputFile);
        if (!f.exists() || f.isDirectory()) { 
            System.err.println("producer <<< inputFile ==>" + inputFile + "<== NOT available!!!");
            System.exit(-1);
        }

        // **** get input file size ****
        Path path = Paths.get(inputFile);
        long fileSize = Files.size(path);
        if (fileSize <= 0) {
            System.err.println("producer <<< unexpected fileSize: " + fileSize);
            System.exit(-1);
        }

        // ???? ????
        System.out.println("producer <<< fileSize: " + fileSize);

        // **** open input file ****
        InputStream inStream = new FileInputStream(inputFile);

        // **** [1] extract information for each object that will follow ****
        MergeDirEntry[] arr =  producer.readMergedFileInfo(inStream);

        // **** close input stream ****
        inStream.close();

        // **** open input stream ****
        inStream = new FileInputStream(inputFile);

        // **** send to Consumer information for each object that will follow ****
        producer.sendObjectList(dos, arr);

        // **** for ease of use ****
        int entryCount = arr.length;

        // ???? ????
        System.out.println("producer <<< entryCount: " + entryCount);

        // **** [2] set the file position ****
        long filePos = 0;

        // **** loop sending file objects to Consumer ****
        for (int i = 0; i < entryCount; i++) {

            // **** for ease of use ****
            String guid = arr[i].guid;
            long length = arr[i].length;

            // ???? ????
            System.out.println("producer <<< length: " + length);

            // **** number of bytes to skip ****
            long skip = 0;
            if (i == 0) {
                skip = arr[i].offset;
            } else {
                skip = arr[i].offset - (arr[i - 1].offset + arr[i - 1].length);
            }

            // **** get to the specified offset ****
            long skipped = inStream.skip(skip);

            // ???? ????
            System.out.println("producer <<< skip: " + skip + " skipped: " + skipped);

            // **** loop reading and sending data to the Consumer (client) ****
            int bytesToRead     = 0;
            long bytesSent      = 0;
            while (bytesSent < length) {

                // **** determine number of bytes to read ****
                if (length - bytesSent >= ProducerAndConsumer.IO_BUFFER_SIZE)
                    bytesToRead = ProducerAndConsumer.IO_BUFFER_SIZE;
                else
                    bytesToRead = (int)(length - bytesSent);

                // ???? ????
                // System.out.println("producer <<< bytesToRead: " + bytesToRead);

                // **** read data from the local file ****
                int len = inStream.read(data, 0, bytesToRead);
                if (len < 0)
                    throw new IOException("producer <<< unexpected end of input file");

                // ???? ????
                // System.out.println("producer <<<         len: " + len);

                // **** send data to consumer (client) ****
                dos.write(data, 0, len);

                // **** update the number of bytes sent to consumer (client) ****
                bytesSent += len;

                // ???? ????
                // System.out.println("producer <<<   bytesSent: " + bytesSent);
            }

            // ???? ????
            System.out.println( "producer <<< guid ==>" + guid + "<== bytesSent: " + bytesSent + " length: " + length);

            // **** update the file position ****
            filePos += (skip + bytesSent);

            // ???? ????
            System.out.println("producer <<< filePos: " + filePos);
        }

        // ???? ????
        System.out.println("producer <<< diff: " + (fileSize - filePos));

        // **** [3] end timer and compute duration ****
        long endTime    = System.currentTimeMillis();
        long duration   = (endTime - startTime);

        // ???? ????
        System.out.println("producer <<< duration: " + duration + " ms.");

        // **** close input stream ****
        inStream.close();

        // **** close the data input stream ****
        dis.close();

        // **** close the data output stream ****
        dos.close();

        // **** close client socket ****
        clientSocket.close();

        // **** close server socket ****
        serverSocket.close();
    }

We start by looking at the Producer main code. Not much has changed in the initialization or in the reception of the request from the Consumer. I flagged with a number in brackets, in the comments, the code or methods that have changed.

The first set of changes was some cleaning in the method that collects information about the objects. We will go over this method shortly.

The next change was made in the main loop which is used to send the contents of each object to the Consumer. The filePos variable is used to verify that we are sending all the data for each object.

In the loop we determine the number of bytes we need to skip in order to get to the start of an object.
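The skip computation can be checked on its own. The following is a minimal sketch, not the project code: the class name SkipCalc and the method gaps() are names I made up for illustration, and the offsets and lengths are the first three entries from the run shown later in this post.

```java
import java.util.Arrays;

final class SkipCalc {

    // Hypothetical helper: number of bytes to skip before each object
    // when the file is read sequentially from the start.
    static long[] gaps(long[] offsets, long[] lengths) {
        long[] skips = new long[offsets.length];
        long pos = 0;                           // current position in the file
        for (int i = 0; i < offsets.length; i++) {
            skips[i] = offsets[i] - pos;        // distance to the start of object i
            pos = offsets[i] + lengths[i];      // position after reading object i
        }
        return skips;
    }

    public static void main(String[] args) {
        long[] offsets = { 1084L, 445088L, 2122962L };
        long[] lengths = { 442980L, 1676850L, 122700674L };

        // prints "[1084, 1024, 1024]"
        System.out.println(Arrays.toString(gaps(offsets, lengths)));
    }
}
```

Tracking the current position gives the same result as subtracting the end of the previous object, which is what the loop in the Producer does.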

After skipping to the start of the encrypted object, we read data from the input file and send it to the Consumer.
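One thing to keep in mind is that InputStream.read() may return fewer bytes than requested, or -1 at the end of the stream. A possible way to write the read-and-send loop so that it copes with both cases is sketched below. The class StreamCopy and the method copyExactly() are hypothetical names, not part of the project.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

final class StreamCopy {

    // Copy exactly 'length' bytes from 'in' to 'out' using 'buf' as the I/O buffer.
    static long copyExactly(InputStream in, OutputStream out, long length, byte[] buf)
            throws IOException {
        long copied = 0;
        while (copied < length) {

            // never ask for more than the buffer holds or the object has left
            int want = (int) Math.min(buf.length, length - copied);

            // read() may return fewer bytes than requested
            int len = in.read(buf, 0, want);
            if (len < 0)
                throw new EOFException("stream ended after " + copied + " of " + length + " bytes");

            out.write(buf, 0, len);
            copied += len;
        }
        return copied;
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        long n = copyExactly(new ByteArrayInputStream(new byte[10]), sink, 7, new byte[4]);
        System.out.println("copied: " + n);     // prints "copied: 7"
    }
}
```

The same helper could serve the Consumer, with the socket as the input and the local file as the output.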

After all the data for an object has been sent, we update the filePos variable to account for the object we just sent.

When all objects are sent, we determine the amount of data that remains unread in the input file. We use this to make sure that 1536 bytes are left, which accounts for administrative data that is of no concern to us for this software.
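As a quick sanity check, the arithmetic behind that number can be reproduced with the values for the last object in the run shown later in this post. This is only a sketch; the class TrailingBytes and the method remaining() are hypothetical names.

```java
final class TrailingBytes {

    // Bytes left in the file after the last object has been read.
    static long remaining(long fileSize, long lastOffset, long lastLength) {
        return fileSize - (lastOffset + lastLength);
    }

    public static void main(String[] args) {

        // file size, and offset and length of the ninth object, from the run log
        long diff = remaining(426608152L, 337237842L, 89368774L);

        System.out.println("diff: " + diff);    // prints "diff: 1536"
    }
}
```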

We also compute and display the time it took to process the input file. The time is expressed in milliseconds.

    /**
     * Read from the merged file information regarding the embedded objects.
     * 
     * @throws IOException
     */
    public MergeDirEntry[] readMergedFileInfo(InputStream inStream) throws IOException {

        // **** initialization ****
        byte[] bytes = new byte[64];

        // **** [1] skip signature (8 bytes) ****
        long skip = 8;
        long skipped = inStream.skip(skip);

        // ???? ????
        System.out.println("readMergedFileInfo <<< skip: " + skip + " skipped: " + skipped);

        // **** read bitfile count ****
        int len = inStream.read(bytes, 0, 4);

        // **** convert bytes to integer ****
        int entryCount =    ((bytes[3] & 0xff) << 24) |
                            ((bytes[2] & 0xff) << 16) |
                            ((bytes[1] & 0xff) << 8) |
                             (bytes[0] & 0xff);

        // ???? ????
        System.out.println("readMergedFileInfo <<< entryCount: " + entryCount);

        // **** allocate array ****
        MergeDirEntry[] arr = new MergeDirEntry[entryCount];

        // **** loop reading info populating the array ****
        for (int i = 0; i < entryCount; i++) {

            // **** read GUID ****
            len = inStream.read(bytes, 0, 40);

            // **** get the GUID ****
            String guid = new String(bytes, "UTF-8").trim(); 

            // ???? ????
            System.out.println("readMergedFileInfo <<< len: " + len + " guid ==>" + guid + "<==");

            // **** read offset ****
            len = inStream.read(bytes, 0, 8);

            // **** convert buffer to long (0xffL keeps the shifts in 64 bits) ****
            long offset =   ((bytes[7] & 0xffL) << 56) |
                            ((bytes[6] & 0xffL) << 48) |
                            ((bytes[5] & 0xffL) << 40) |
                            ((bytes[4] & 0xffL) << 32) |
                            ((bytes[3] & 0xffL) << 24) |
                            ((bytes[2] & 0xffL) << 16) |
                            ((bytes[1] & 0xffL) << 8) |
                             (bytes[0] & 0xffL);

            // **** [2] take into account the CAS_MERGE_BITFILE data structure ****
            offset += 512;

            // ???? ????
            System.out.println("readMergedFileInfo <<< len: " + len + " offset: " + offset);

            // **** read length ****
            len = inStream.read(bytes, 0, 8);

            // **** convert buffer to long (0xffL keeps the shifts in 64 bits) ****
            long length =   ((bytes[7] & 0xffL) << 56) |
                            ((bytes[6] & 0xffL) << 48) |
                            ((bytes[5] & 0xffL) << 40) |
                            ((bytes[4] & 0xffL) << 32) |
                            ((bytes[3] & 0xffL) << 24) |
                            ((bytes[2] & 0xffL) << 16) |
                            ((bytes[1] & 0xffL) << 8) |
                             (bytes[0] & 0xffL);


            // ???? ????
            System.out.println("readMergedFileInfo <<< len: " + len + " length: " + length);

            // **** populate the array entry ****
            arr[i] = new MergeDirEntry(guid, offset, length);

            // ???? ????
            System.out.println("readMergedFileInfo <<< arr[" + i + "]: " + arr[i].toString());
        }

        // **** return the array ****
        return arr;
    }

This method was slightly modified to provide the number of bytes to skip in a variable. Perhaps in the future we might want to use an argument to the method to specify the number of bytes to skip.
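Note that InputStream.skip() is allowed to skip fewer bytes than requested, which is why the code displays both values. If we wanted to insist on the full count, a helper along the following lines could be used. This is a sketch under my own naming (SkipFully and skipFully() are not in the project).

```java
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

final class SkipFully {

    // Skip exactly n bytes or throw if the stream ends first.
    static void skipFully(InputStream in, long n) throws IOException {
        long remaining = n;
        while (remaining > 0) {
            long skipped = in.skip(remaining);
            if (skipped > 0) {
                remaining -= skipped;
                continue;
            }

            // skip() made no progress; read one byte to tell EOF from a stall
            if (in.read() < 0)
                throw new EOFException("stream ended with " + remaining + " bytes left to skip");
            remaining--;
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] bytes = new byte[12];
        for (int i = 0; i < bytes.length; i++)
            bytes[i] = (byte) i;

        InputStream in = new ByteArrayInputStream(bytes);
        skipFully(in, 8);                                   // skip the 8-byte signature
        System.out.println("next byte: " + in.read());      // prints "next byte: 8"
    }
}
```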

We also added 512 bytes to the offset of the data in order to skip a data structure which is not relevant for our purpose. If this were production code, we would read and act on the contents of the data structures we are skipping.
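The counts, offsets and lengths in the file are decoded with the least significant byte first (little-endian). As an alternative to the manual shifts, java.nio.ByteBuffer can do the conversion. The sketch below uses hypothetical names (LittleEndian, readInt(), readLong()), and the raw value 572 is my assumption, derived from the first offset in the run log (1084) minus the 512 bytes we add.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

final class LittleEndian {

    // Decode 4 little-endian bytes starting at 'off'.
    static int readInt(byte[] b, int off) {
        return ByteBuffer.wrap(b, off, 4).order(ByteOrder.LITTLE_ENDIAN).getInt();
    }

    // Decode 8 little-endian bytes starting at 'off'.
    static long readLong(byte[] b, int off) {
        return ByteBuffer.wrap(b, off, 8).order(ByteOrder.LITTLE_ENDIAN).getLong();
    }

    public static void main(String[] args) {

        // 572 == 0x023C stored least significant byte first
        byte[] raw = { 0x3C, 0x02, 0, 0, 0, 0, 0, 0 };

        long offset = readLong(raw, 0) + 512;           // account for the skipped structure
        System.out.println("offset: " + offset);        // prints "offset: 1084"
    }
}
```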

    /**
     * Send to Consumer a list of object names, offsets and lengths.
     * 
     * @throws IOException
     */
    public void sendObjectList(DataOutputStream dos, MergeDirEntry[] arr) throws IOException {

        // ***** initialization ****
        int entryCount  = arr.length;
        byte[] data     = new byte[ProducerAndConsumer.IO_BUFFER_SIZE];

        // ???? ????
        System.out.println("sendObjectList <<< entryCount: " + entryCount);

        // **** encode the entry count as 4 little-endian bytes ****
        data[0] = (byte)(entryCount & 0xff);
        data[1] = (byte)((entryCount >>> 8) & 0xff);
        data[2] = (byte)((entryCount >>> 16) & 0xff);
        data[3] = (byte)((entryCount >>> 24) & 0xff);

        // **** send the number of merge dir entries to the Consumer ****
        dos.write(data, 0, 4);

        // **** loop sending the merge dir entries to the Consumer ****
        for (int i = 0; i < entryCount; i++) {

            // **** streams used to serialize the entry ****
            ByteArrayOutputStream bos   = new ByteArrayOutputStream();
            ObjectOutputStream oos      = new ObjectOutputStream(bos);

            // **** serialize the entry into a byte array ****
            oos.writeObject(arr[i]);
            oos.flush();
            data = bos.toByteArray();

            // ???? ????
            System.out.println("sendObjectList <<< data.length: " + data.length);
            
            // **** send the entry to the Consumer ****
            dos.write(data, 0, data.length);
        }
    }

As I look at the code for the sendObjectList() method, I do not believe we made changes since the previous post. I decided to show the code since it is directly invoked by the Producer.

    /**
     * This is the main code for the Consumer.
     * @throws IOException
     * @throws ClassNotFoundException
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException {
        
        // **** initialization ****
        String ip           = ProducerAndConsumer.PRODUCER_IP;
        int port            = ProducerAndConsumer.PRODUCER_PORT;
        String inputFile    = ProducerAndConsumer.INPUT_FILE;
        byte[] data         = new byte[ProducerAndConsumer.IO_BUFFER_SIZE];
        String buffer       = "";

        Socket clientSocket = null;

        // **** open a buffered reader ****
        BufferedReader br = new BufferedReader(new InputStreamReader(System.in));

        // **** prompt and get the IP ****
        System.out.print("consumer <<< producer ip [" + ip + "]: ");
        buffer = br.readLine().trim();
        if (!buffer.equals(""))
            ip = buffer;

        // **** check if ip is NOT valid ****
        if (!ProducerAndConsumer.isValidIPAddress(ip)) {
            System.out.println("consumer <<< ip ==>" + ip + "<== is NOT valid!!!");
            System.exit(-1);
        }

        // ???? ????
        System.out.println("consumer <<< ip ==>" + ip + "<==");

        // **** prompt and get the port ****
        System.out.print("consumer <<< producer port [" + port + "]: ");
        buffer = br.readLine().trim();
        if (!buffer.equals(""))
            port = Integer.parseInt(buffer);

        // **** check if port is NOT valid ****
        if (!ProducerAndConsumer.isValidPortNumber(port)) {
            System.out.println("consumer <<< port: " + port + " is NOT valid!!!");
            System.exit(-1);
        }

        // ???? ????
        System.out.println("consumer <<< port: " + port);

        // **** prompt and get the input file name ****
        System.out.print("consumer <<< producer input file [" + inputFile + "]: ");
        buffer = br.readLine().trim();
        if (!buffer.equals(""))
            inputFile = buffer;

        // ???? ????
        System.out.println("consumer <<< inputFile ==>" + inputFile + "<==");

        // **** close the buffered reader ****
        br.close();

        // **** start timer ****
        long startTime = System.currentTimeMillis();

        // **** open client socket ****
        try {
            clientSocket = new Socket(ip, port);
        } catch (Exception ex) {
            System.err.println("consumer <<< ex: " + ex.toString());
            System.exit(-1);
        }

        // **** to read data from server (producer) socket ****
        DataInputStream dis = new DataInputStream(new BufferedInputStream(clientSocket.getInputStream()));

        // **** to write data to server (producer) socket ****
        DataOutputStream dos = new DataOutputStream(new BufferedOutputStream(clientSocket.getOutputStream()));

        // ???? ????
        System.out.println("consumer <<< BEFORE send request...");

        // **** send request to producer (input file name) ****
        try {
            Arrays.fill(data, (byte)0);
            byte[] nameBytes = inputFile.getBytes("UTF-8");
            System.arraycopy(nameBytes, 0, data, 0, nameBytes.length);

            // ???? ????
            String str = new String(data, "UTF-8").trim(); 
            System.out.println("consumer <<< str ==>" + str + "<==");

            dos.write(data, 0, ProducerAndConsumer.IO_BUFFER_SIZE);
        } catch (Exception ex) {
            System.err.println("consumer <<< ex: " + ex.toString());
            System.exit(-1);
        }

        // ???? ????
        System.out.println("consumer <<<  AFTER send request");

        // **** create consumer ****
        Consumer consumer = new Consumer();

        // **** receive information for each object that will follow ****
        MergeDirEntry arr[] = consumer.receiveObjectList(dis);

        // ???? ????
        System.out.println("consumer <<< arr.length: " + arr.length);
        for (int i = 0; i < arr.length; i++)
            System.out.println("consumer <<< arr[" + i + "]: " + arr[i].toString());

        // **** for ease of use ****
        int entryCount = arr.length;

        // ???? ????
        System.out.println("consumer <<< entryCount: " + entryCount);

        // **** loop once per file object to receive ****
        for (int i = 0; i < entryCount; i++) {

            // **** [1] for ease of use ****
            String guid = arr[i].guid;
            long length = arr[i].length;

            // **** name for file object ****
            String outputFile = "c:\\temp\\Folder2\\" + guid;

            // ???? ????
            System.out.println("consumer <<< outputFile ==>" + outputFile + "<==");

            // **** open local file to write data ****
            OutputStream outStream = new FileOutputStream(outputFile);
        
            // **** loop receiving data from Producer (server) ****
            int bytesToReceive  = 0;
            long bytesReceived  = 0;
            while (bytesReceived < length) {

                // **** [2] determine number of bytes to receive ****
                if (length - bytesReceived >= ProducerAndConsumer.IO_BUFFER_SIZE)
                    bytesToReceive = ProducerAndConsumer.IO_BUFFER_SIZE;
                else
                    bytesToReceive = (int)(length - bytesReceived);

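                // **** receive data from Producer (server) ****
                int len = dis.read(data, 0, bytesToReceive);
                if (len < 0)
                    throw new IOException("consumer <<< connection closed before the object was complete");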

                // **** count the number of bytes received ****
                bytesReceived += (long)len;

                // ???? ????
                // System.out.println("consumer <<< bytesReceived: " + bytesReceived);

                // **** write data to the output file ****
                outStream.write(data, 0, len);
            }

            // **** close local file ****
            outStream.close();

            // ???? ????
            System.out.println("consumer <<< bytesReceived: " + bytesReceived);
        }

        // **** end timer and compute duration ****
        long endTime    = System.currentTimeMillis();
        long duration   = (endTime - startTime);

        // ???? ????
        System.out.println("consumer <<< duration: " + duration + " ms.");

        // **** close the data input stream ****
        dis.close();

        // **** close the data output stream ****
        dos.close();

        // **** close client socket ****
        clientSocket.close();
    }

This is the main code for the Consumer. Several changes have been made since it now receives and acts on the names of the objects by writing their contents to files in the file system.

The main loop starts by generating the path to the folder in which the files with the objects will be stored. In the next pass we should pass this folder as an argument to the Consumer. We should also have a default value.
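One possible way to take the folder as an argument with a default is sketched below. The class OutputFolder, the method resolve() and the idea of using the first command line argument are my assumptions for illustration; the default is the folder that is currently hard coded in the Consumer.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

final class OutputFolder {

    // default taken from the path currently hard coded in the Consumer
    static final String DEFAULT_FOLDER = "c:\\temp\\Folder2";

    // Build the output path for an object; args[0] (if present) overrides the default.
    static Path resolve(String[] args, String guid) {
        String folder = DEFAULT_FOLDER;
        if (args.length > 0 && !args[0].trim().isEmpty())
            folder = args[0].trim();
        return Paths.get(folder).resolve(guid);
    }

    public static void main(String[] args) {
        Path outputFile = resolve(args, "005056891b354c11ed076009e73fd879");
        System.out.println("outputFile ==>" + outputFile + "<==");
    }
}
```

The returned Path can be handed to Files.newOutputStream() or converted with toString() for the FileOutputStream used in the Consumer.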

Note that once we have the full path we open a stream to write the object to the file system.

The way we track bytes to be received has been updated from the previous pass. When done receiving the contents of the object we close the output stream.

At this time we could send an ACK / NAK to the Producer to indicate if all went well or not. If it did not, we could retry the file that had the issue. In our MVP we will not add such a feature since it was not required. In production code we would have to implement retries even if the requirements document does not call for such a feature explicitly.
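To give an idea of what such an exchange might look like, here is a minimal sketch of a one-byte status reply. Nothing here exists in the project: the class AckNak, the methods sendStatus() and readStatus(), and the use of the ASCII ACK (0x06) and NAK (0x15) codes are all assumptions. The Producer would also have to be changed to read the reply after each object, and the retry itself is not shown.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

final class AckNak {

    static final int ACK = 0x06;    // assumed convention: ASCII ACK
    static final int NAK = 0x15;    // assumed convention: ASCII NAK

    // Consumer side: report whether the object arrived complete.
    static void sendStatus(DataOutputStream dos, long expected, long received) throws IOException {
        dos.writeByte(expected == received ? ACK : NAK);
        dos.flush();
    }

    // Producer side: true if the Consumer acknowledged the object.
    static boolean readStatus(DataInputStream dis) throws IOException {
        int status = dis.readUnsignedByte();
        if (status != ACK && status != NAK)
            throw new IOException("unexpected status byte: " + status);
        return status == ACK;
    }

    public static void main(String[] args) throws IOException {

        // in-memory streams stand in for the socket streams
        ByteArrayOutputStream wire = new ByteArrayOutputStream();
        sendStatus(new DataOutputStream(wire), 442980L, 442980L);

        DataInputStream dis = new DataInputStream(new ByteArrayInputStream(wire.toByteArray()));
        System.out.println("acknowledged: " + readStatus(dis));     // prints "acknowledged: true"
    }
}
```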

    /**
     * Receive from Producer merged file information regarding 
     * the embedded objects.
     * 
     * @throws IOException
     * @throws ClassNotFoundException
     */
    public MergeDirEntry[] receiveObjectList(DataInputStream dis) throws IOException, ClassNotFoundException {

        // **** ****
        byte[] bytes = new byte[ProducerAndConsumer.IO_BUFFER_SIZE];

        // **** receive the number of objects ****
        int len = dis.read(bytes, 0, 4);

        // ???? ????
        System.out.println("receiveObjectList <<< len: " + len);

        // **** convert bytes to integer ****
        int entryCount =    ((bytes[3] & 0xff) << 24) |
                            ((bytes[2] & 0xff) << 16) |
                            ((bytes[1] & 0xff) << 8) |
                             (bytes[0] & 0xff);

        // ???? ????
        System.out.println("receiveObjectList <<< entryCount: " + entryCount);

        // **** create the array ****
        MergeDirEntry[] arr = new MergeDirEntry[entryCount];

        // **** loop receiving the array of entries  ****
        for (int i = 0; i < entryCount; i++) {

            // **** receive a merged dir entry ****
            len = dis.read(bytes, 0, 160);

            // ???? ????
            System.out.println("receiveObjectList <<< len: " + len);

            // **** streams used to deserialize the entry ****
            ByteArrayInputStream bis    = new ByteArrayInputStream(bytes);
            ObjectInputStream ois       = new ObjectInputStream(bis);

            // **** deserialize the merged dir entry ****
            arr[i] = (MergeDirEntry)ois.readObject();
        }

        // **** return array of entries ****
        return arr;
    }

This is another method that, as I go through it, I realize has not changed since the last pass. I decided to leave it here so you can refer to it while looking at the Consumer code.
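The method reads 160 bytes per entry, which matches the size of the serialized entries in the run shown in this post but is not something the serialization format promises. A possible alternative, sketched below, is to prefix each serialized entry with its size. The names Framing, writeFrame() and readFrame() are hypothetical, and note that writeInt() / readInt() use big-endian order, unlike the entry count in our code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

final class Framing {

    // Sender side: write the size of the serialized object followed by its bytes.
    static void writeFrame(DataOutputStream dos, Serializable obj) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj);
        }
        byte[] payload = bos.toByteArray();
        dos.writeInt(payload.length);
        dos.write(payload);
    }

    // Receiver side: read the size, then exactly that many bytes, then deserialize.
    static Object readFrame(DataInputStream dis) throws IOException, ClassNotFoundException {
        int size = dis.readInt();
        if (size < 0)
            throw new IOException("invalid frame size: " + size);
        byte[] payload = new byte[size];
        dis.readFully(payload);         // blocks until all bytes arrive or throws at EOF
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(payload))) {
            return ois.readObject();
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException {

        // a String stands in for a MergeDirEntry; in-memory streams stand in for the socket
        ByteArrayOutputStream wire = new ByteArrayOutputStream();
        writeFrame(new DataOutputStream(wire), "005056891b354c11ed076009e73fd879");

        DataInputStream dis = new DataInputStream(new ByteArrayInputStream(wire.toByteArray()));
        System.out.println("received ==>" + readFrame(dis) + "<==");
    }
}
```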

# **** ****
cd C:\Users\johnc\workspace8\ProducerAndConsumer\build-jar

# **** run Producer Jar ****
java -cp ProducerAndConsumer.jar; com.canessa.producerconsumer.Producer
producer <<< ip [0.0.0.0]:
producer <<< ip ==>0.0.0.0<==
producer <<< producer port [51515]:
producer <<< port: 51515
producer <<< version: 1.0.00
producer <<< waiting for client connection...
producer <<< received client connection
producer <<< BEFORE reading request...
producer <<<  AFTER reading request dataLen: 1048576
producer <<< inputFile ==>E:\DICOM\005056891b354c11ed076009e73fd878<==
producer <<< fileSize: 426608152
readMergedFileInfo <<< skip: 8 skipped: 8
readMergedFileInfo <<< entryCount: 9
readMergedFileInfo <<< len: 40 guid ==>005056891b354c11ed076009e73fd879<==
readMergedFileInfo <<< len: 8 offset: 1084
readMergedFileInfo <<< len: 8 length: 442980
readMergedFileInfo <<< arr[0]: 005056891b354c11ed076009e73fd879 (1084, 442980)
readMergedFileInfo <<< len: 40 guid ==>005056891b354c11ed076009e73fd87a<==
readMergedFileInfo <<< len: 8 offset: 445088
readMergedFileInfo <<< len: 8 length: 1676850
readMergedFileInfo <<< arr[1]: 005056891b354c11ed076009e73fd87a (445088, 1676850)
readMergedFileInfo <<< len: 40 guid ==>005056891b354c11ed076009e73fd87b<==
readMergedFileInfo <<< len: 8 offset: 2122962
readMergedFileInfo <<< len: 8 length: 122700674
readMergedFileInfo <<< arr[2]: 005056891b354c11ed076009e73fd87b (2122962, 122700674)
readMergedFileInfo <<< len: 40 guid ==>005056891b354c11ed076009e73fd87c<==
readMergedFileInfo <<< len: 8 offset: 124824660
readMergedFileInfo <<< len: 8 length: 1670062
readMergedFileInfo <<< arr[3]: 005056891b354c11ed076009e73fd87c (124824660, 1670062)
readMergedFileInfo <<< len: 40 guid ==>005056891b354c11ed076009e73fd87d<==
readMergedFileInfo <<< len: 8 offset: 126495746
readMergedFileInfo <<< len: 8 length: 112339530
readMergedFileInfo <<< arr[4]: 005056891b354c11ed076009e73fd87d (126495746, 112339530)
readMergedFileInfo <<< len: 40 guid ==>005056891b354c11ed076009e73fd87e<==
readMergedFileInfo <<< len: 8 offset: 238836300
readMergedFileInfo <<< len: 8 length: 1488978
readMergedFileInfo <<< arr[5]: 005056891b354c11ed076009e73fd87e (238836300, 1488978)
readMergedFileInfo <<< len: 40 guid ==>005056891b354c11ed076009e73fd87f<==
readMergedFileInfo <<< len: 8 offset: 240326302
readMergedFileInfo <<< len: 8 length: 95431248
readMergedFileInfo <<< arr[6]: 005056891b354c11ed076009e73fd87f (240326302, 95431248)
readMergedFileInfo <<< len: 40 guid ==>005056891b354c11ed076009e73fd880<==
readMergedFileInfo <<< len: 8 offset: 335758574
readMergedFileInfo <<< len: 8 length: 1478244
readMergedFileInfo <<< arr[7]: 005056891b354c11ed076009e73fd880 (335758574, 1478244)
readMergedFileInfo <<< len: 40 guid ==>005056891b354c11ed076009e73fd881<==
readMergedFileInfo <<< len: 8 offset: 337237842
readMergedFileInfo <<< len: 8 length: 89368774
readMergedFileInfo <<< arr[8]: 005056891b354c11ed076009e73fd881 (337237842, 89368774)
sendObjectList <<< entryCount: 9
sendObjectList <<< data.length: 160
sendObjectList <<< data.length: 160
sendObjectList <<< data.length: 160
sendObjectList <<< data.length: 160
sendObjectList <<< data.length: 160
sendObjectList <<< data.length: 160
sendObjectList <<< data.length: 160
sendObjectList <<< data.length: 160
sendObjectList <<< data.length: 160
producer <<< entryCount: 9
producer <<< length: 442980
producer <<< skip: 1084 skipped: 1084
producer <<< guid ==>005056891b354c11ed076009e73fd879<== bytesSent: 442980 length: 442980
producer <<< filePos: 444064
producer <<< length: 1676850
producer <<< skip: 1024 skipped: 1024
producer <<< guid ==>005056891b354c11ed076009e73fd87a<== bytesSent: 1676850 length: 1676850
producer <<< filePos: 2121938
producer <<< length: 122700674
producer <<< skip: 1024 skipped: 1024
producer <<< guid ==>005056891b354c11ed076009e73fd87b<== bytesSent: 122700674 length: 122700674
producer <<< filePos: 124823636
producer <<< length: 1670062
producer <<< skip: 1024 skipped: 1024
producer <<< guid ==>005056891b354c11ed076009e73fd87c<== bytesSent: 1670062 length: 1670062
producer <<< filePos: 126494722
producer <<< length: 112339530
producer <<< skip: 1024 skipped: 1024
producer <<< guid ==>005056891b354c11ed076009e73fd87d<== bytesSent: 112339530 length: 112339530
producer <<< filePos: 238835276
producer <<< length: 1488978
producer <<< skip: 1024 skipped: 1024
producer <<< guid ==>005056891b354c11ed076009e73fd87e<== bytesSent: 1488978 length: 1488978
producer <<< filePos: 240325278
producer <<< length: 95431248
producer <<< skip: 1024 skipped: 1024
producer <<< guid ==>005056891b354c11ed076009e73fd87f<== bytesSent: 95431248 length: 95431248
producer <<< filePos: 335757550
producer <<< length: 1478244
producer <<< skip: 1024 skipped: 1024
producer <<< guid ==>005056891b354c11ed076009e73fd880<== bytesSent: 1478244 length: 1478244
producer <<< filePos: 337236818
producer <<< length: 89368774
producer <<< skip: 1024 skipped: 1024
producer <<< guid ==>005056891b354c11ed076009e73fd881<== bytesSent: 89368774 length: 89368774
producer <<< filePos: 426606616
producer <<< diff: 1536
producer <<< duration: 592 ms.

From a command prompt on a Windows machine we invoke the Producer from the specified Jar. We accept the defaults at the IP and port prompts and the Producer comes up. It is now waiting for a client (Consumer) connection.

When the request from the Consumer is received, we read it and extract the name for the input file. We also display the size in order to make sure that the processing steps are working as designed.

The Producer extracts information about each object and sends it to the Consumer. Note that in this example we have nine objects embedded in the specified file. For each object we have the name, the offset from the top of the file and the length of the object.

We then send the objects to the Consumer.

To send each object, we first move to the object's offset in the file and then read exactly the number of bytes given by its length. The data is sent to the Consumer in blocks of up to 1024 * 1024 bytes each; the last block of an object is usually smaller.
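The skip-then-send step can be sketched as follows. This is a minimal sketch and not the code from the project: the class name BlockSender and the methods skipFully and sendObject are hypothetical, and the input is assumed to be any InputStream positioned inside the input file.

```java
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

// Hypothetical helper; the names are illustrative and not taken from the project.
final class BlockSender {

    // Block size mentioned in the text: 1024 * 1024 bytes.
    static final int BLOCK_SIZE = 1024 * 1024;

    // Skips exactly n bytes. InputStream.skip() may skip fewer bytes than
    // requested, so we loop until the full count has been consumed.
    static void skipFully(InputStream in, long n) throws IOException {
        while (n > 0) {
            long skipped = in.skip(n);
            if (skipped <= 0) {
                // Nothing skipped: read one byte to tell "slow" from "end of stream".
                if (in.read() < 0) {
                    throw new EOFException("end of input while skipping");
                }
                skipped = 1;
            }
            n -= skipped;
        }
    }

    // Copies exactly length bytes from in to out, in blocks of at most
    // BLOCK_SIZE bytes, and returns the number of bytes sent.
    static long sendObject(InputStream in, OutputStream out, long length) throws IOException {
        byte[] block = new byte[BLOCK_SIZE];
        long bytesSent = 0;
        while (bytesSent < length) {
            int toRead = (int) Math.min(block.length, length - bytesSent);
            int n = in.read(block, 0, toRead);
            if (n < 0) {
                throw new EOFException("end of input after " + bytesSent + " bytes");
            }
            out.write(block, 0, n);
            bytesSent += n;
        }
        out.flush();
        return bytesSent;
    }
}
```

The returned count plays the role of the bytesSent value in the log, which should always equal the object length.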

Note that for each object we display some ancillary data (file position, length, bytes skipped and bytes sent) that is used to get to the objects and tells us whether all is proceeding as expected.
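The numbers in the log can be cross-checked with a little arithmetic. Reading the output, each filePos appears to be the end of the previous object, and the next object starts 1024 bytes (the skip value) after it. That interpretation is an assumption drawn from the log, not something stated by the project, and the class name OffsetCheck is hypothetical. A minimal sketch:

```java
// Hypothetical checker; it only encodes the pattern observed in the log.
final class OffsetCheck {

    // Bytes skipped before each object, as shown by "skip: 1024" in the log.
    static final long SKIP = 1024;

    // Returns true when every object starts exactly SKIP bytes after the
    // end of the previous object (offset + length of the previous entry).
    static boolean consistent(long[] offsets, long[] lengths) {
        for (int i = 1; i < offsets.length; i++) {
            long endOfPrevious = offsets[i - 1] + lengths[i - 1]; // logged as filePos
            if (offsets[i] != endOfPrevious + SKIP) {
                return false;
            }
        }
        return true;
    }
}
```

For example, the object ending in 87e starts at 238836300 and is 1488978 bytes long, so it ends at 240325278, which is the filePos logged next. Adding 1024 gives 240326302, the offset of the object ending in 87f.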

When the Producer is done sending the objects to the Consumer we display the time it took the software to process the request.

# **** ****
cd C:\Users\johnc\workspace8\ProducerAndConsumer\build-jar

# **** run Consumer Jar ****
java -cp ProducerAndConsumer.jar; com.canessa.producerconsumer.Consumer
consumer <<< producer ip [0.0.0.0]:
consumer <<< ip ==>0.0.0.0<==
consumer <<< producer port [51515]:
consumer <<< port: 51515
consumer <<< producer input file [E:\DICOM\005056891b354c11ed076009e73fd878]:
consumer <<< inputFile ==>E:\DICOM\005056891b354c11ed076009e73fd878<==
consumer <<< BEFORE send request...
consumer <<< str ==>E:\DICOM\005056891b354c11ed076009e73fd878<==
consumer <<<  AFTER send request
receiveObjectList <<< len: 4
receiveObjectList <<< entryCount: 9
receiveObjectList <<< len: 160
receiveObjectList <<< len: 160
receiveObjectList <<< len: 160
receiveObjectList <<< len: 160
receiveObjectList <<< len: 160
receiveObjectList <<< len: 160
receiveObjectList <<< len: 160
receiveObjectList <<< len: 160
receiveObjectList <<< len: 160
consumer <<< arr.length: 9
consumer <<< arr[0]: 005056891b354c11ed076009e73fd879 (1084, 442980)
consumer <<< arr[1]: 005056891b354c11ed076009e73fd87a (445088, 1676850)
consumer <<< arr[2]: 005056891b354c11ed076009e73fd87b (2122962, 122700674)
consumer <<< arr[3]: 005056891b354c11ed076009e73fd87c (124824660, 1670062)
consumer <<< arr[4]: 005056891b354c11ed076009e73fd87d (126495746, 112339530)
consumer <<< arr[5]: 005056891b354c11ed076009e73fd87e (238836300, 1488978)
consumer <<< arr[6]: 005056891b354c11ed076009e73fd87f (240326302, 95431248)
consumer <<< arr[7]: 005056891b354c11ed076009e73fd880 (335758574, 1478244)
consumer <<< arr[8]: 005056891b354c11ed076009e73fd881 (337237842, 89368774)
consumer <<< entryCount: 9
consumer <<< outputFile ==>c:\temp\Folder2\005056891b354c11ed076009e73fd879<==
consumer <<< bytesReceived: 442980
consumer <<< outputFile ==>c:\temp\Folder2\005056891b354c11ed076009e73fd87a<==
consumer <<< bytesReceived: 1676850
consumer <<< outputFile ==>c:\temp\Folder2\005056891b354c11ed076009e73fd87b<==
consumer <<< bytesReceived: 122700674
consumer <<< outputFile ==>c:\temp\Folder2\005056891b354c11ed076009e73fd87c<==
consumer <<< bytesReceived: 1670062
consumer <<< outputFile ==>c:\temp\Folder2\005056891b354c11ed076009e73fd87d<==
consumer <<< bytesReceived: 112339530
consumer <<< outputFile ==>c:\temp\Folder2\005056891b354c11ed076009e73fd87e<==
consumer <<< bytesReceived: 1488978
consumer <<< outputFile ==>c:\temp\Folder2\005056891b354c11ed076009e73fd87f<==
consumer <<< bytesReceived: 95431248
consumer <<< outputFile ==>c:\temp\Folder2\005056891b354c11ed076009e73fd880<==
consumer <<< bytesReceived: 1478244
consumer <<< outputFile ==>c:\temp\Folder2\005056891b354c11ed076009e73fd881<==
consumer <<< bytesReceived: 89368774
producer <<< duration: 607 ms.

# **** extracted objects ****
C:\>dir c:\temp\Folder2
04/02/2021  09:53 AM           442,980 005056891b354c11ed076009e73fd879
04/02/2021  09:53 AM         1,676,850 005056891b354c11ed076009e73fd87a
04/02/2021  09:53 AM       122,700,674 005056891b354c11ed076009e73fd87b
04/02/2021  09:53 AM         1,670,062 005056891b354c11ed076009e73fd87c
04/02/2021  09:53 AM       112,339,530 005056891b354c11ed076009e73fd87d
04/02/2021  09:53 AM         1,488,978 005056891b354c11ed076009e73fd87e
04/02/2021  09:53 AM        95,431,248 005056891b354c11ed076009e73fd87f
04/02/2021  09:53 AM         1,478,244 005056891b354c11ed076009e73fd880
04/02/2021  09:53 AM        89,368,774 005056891b354c11ed076009e73fd881

On a command line we run the Consumer Jar.

As we have seen in the previous pass, the Consumer prompts for the Producer IP, port and input file. When ready, the Consumer sends a request to the Producer. The Producer then sends the Consumer the list of objects it will be sending. For each object we receive the name, the offset into the input file (which the Consumer does not need) and the length.

As we loop through the list we build the output file name for each object and write the received bytes to the file system. The size of each received object is displayed. In every case the number of received bytes matches the expected length.
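The receiving side of that loop could look like the sketch below. It is not the project's code: the class ObjectReceiver, the method receiveObject and the 64 KB buffer size are assumptions made for illustration. The important point is that we read exactly the announced length, so the bytes of the next object stay in the stream.

```java
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper; the names and buffer size are illustrative only.
final class ObjectReceiver {

    // Reads exactly length bytes from in, writes them to outputFile and
    // returns the number of bytes received.
    static long receiveObject(InputStream in, Path outputFile, long length) throws IOException {
        byte[] buffer = new byte[64 * 1024];
        long bytesReceived = 0;
        try (OutputStream out = Files.newOutputStream(outputFile)) {
            while (bytesReceived < length) {
                // Never ask for more than what is left of this object.
                int toRead = (int) Math.min(buffer.length, length - bytesReceived);
                int n = in.read(buffer, 0, toRead);
                if (n < 0) {
                    throw new EOFException("stream closed after " + bytesReceived + " bytes");
                }
                out.write(buffer, 0, n);
                bytesReceived += n;
            }
        }
        return bytesReceived;
    }
}
```

Calling this once per entry in the object list, with the socket input stream and the length from the entry, yields one file per object. The returned value corresponds to the bytesReceived line in the output.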

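When all is said and done, we display the number of milliseconds the Consumer took to send the request, receive the data and write the objects to the file system. Note that in the output above this line is still labeled producer, even though it is printed by the Consumer.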

We also display the contents of the folder used to store the objects. Note that the names and sizes match what the Consumer expected and what the Producer obtained from the input file.

Hope you enjoyed solving this problem as much as I did. The entire code for this project can be found in my GitHub repository.

If you have comments or questions regarding this, or any other post in this blog, please do not hesitate to leave me a note below. I will reply as soon as possible.

Keep on reading and experimenting. It is one of the best ways to learn, become proficient, refresh your knowledge and enhance your developer toolset.

One last thing, many thanks to all subscribers to this blog!!!

Regards;

John
