Producer and Consumer – Part 2

Yesterday the temperature in the Twin Cities of Minneapolis and St. Paul reached 72F which is quite pleasant for this time of the year. Today we are expecting 42F for a high. That is a drop of 30F! At least the day is forecasted to be sunny.

I also received a message from Gleves48061@hotmail.com regarding my post Revenue Milestones in Java. Not sure what it was all about so I will not comment on it at this time. If I could get an updated comment, I will review and reply as needed. Thanks.

Today we will continue our work on the Producer and Consumer system. We will add a mechanism for the Producer to extract the names of the objects in the merged file and send the information to the Consumer. On the consumer side we will include a mechanism to receive the names of the objects and their associated sizes. This is illustrated in the following diagram:

When the Producer receives a request with the name of a file, it opens the file requested by the Consumer and extracts the following information:

o Number of objects which we refer to as count of entries.
o Name for each object (GUID).
o Offset into the merged file.
o Length of the object.

Since we are planning of sending each object at a time and the Consumer will store them as separate files, it needs to know the total number of objects it will be receiving.

We also need to send names for each object so the Consumer can create such files with the contents of the objects.

The offset to the start of each object is information that we will be useful to the Producer.

The size / length of each object will be required by both the Producer and Consumer so we will pass it to the Consumer.

# **** ****
cd C:\Users\johnc\workspace8\ProducerAndConsumer\build-jar

# **** run Producer Jar ****
java -cp ProducerAndConsumer.jar; com.canessa.producerconsumer.Producer
producer <<< ip [0.0.0.0]:
producer <<< ip ==>0.0.0.0<==
producer <<< producer port [51515]:
producer <<< port: 51515
producer <<< version: 1.0.00
producer <<< waiting for client connection...
producer <<< received client connection
producer <<< BEFORE reading request...
producer <<<  AFTER reading request dataLen: 851968
producer <<< inputFile ==>E:\DICOM\005056891b354c11ed076009e73fd878<==
producer <<< fileSize: 426608152
producer <<< markSupported: false
readMergedFileInfo <<< skipped: 8
readMergedFileInfo <<< entryCount: 9
readMergedFileInfo <<< len: 40 guid ==>005056891b354c11ed076009e73fd879<==
readMergedFileInfo <<< len: 8 offset: 572
readMergedFileInfo <<< len: 8 length: 442980
readMergedFileInfo <<< arr[0]: 005056891b354c11ed076009e73fd879 (572, 442980)
readMergedFileInfo <<< len: 40 guid ==>005056891b354c11ed076009e73fd87a<==
readMergedFileInfo <<< len: 8 offset: 444576
readMergedFileInfo <<< len: 8 length: 1676850
readMergedFileInfo <<< arr[1]: 005056891b354c11ed076009e73fd87a (444576, 1676850)
readMergedFileInfo <<< len: 40 guid ==>005056891b354c11ed076009e73fd87b<==
readMergedFileInfo <<< len: 8 offset: 2122450
readMergedFileInfo <<< len: 8 length: 122700674
readMergedFileInfo <<< arr[2]: 005056891b354c11ed076009e73fd87b (2122450, 122700674)
readMergedFileInfo <<< len: 40 guid ==>005056891b354c11ed076009e73fd87c<==
readMergedFileInfo <<< len: 8 offset: 124824148
readMergedFileInfo <<< len: 8 length: 1670062
readMergedFileInfo <<< arr[3]: 005056891b354c11ed076009e73fd87c (124824148, 1670062)
readMergedFileInfo <<< len: 40 guid ==>005056891b354c11ed076009e73fd87d<==
readMergedFileInfo <<< len: 8 offset: 126495234
readMergedFileInfo <<< len: 8 length: 112339530
readMergedFileInfo <<< arr[4]: 005056891b354c11ed076009e73fd87d (126495234, 112339530)
readMergedFileInfo <<< len: 40 guid ==>005056891b354c11ed076009e73fd87e<==
readMergedFileInfo <<< len: 8 offset: 238835788
readMergedFileInfo <<< len: 8 length: 1488978
readMergedFileInfo <<< arr[5]: 005056891b354c11ed076009e73fd87e (238835788, 1488978)
readMergedFileInfo <<< len: 40 guid ==>005056891b354c11ed076009e73fd87f<==
readMergedFileInfo <<< len: 8 offset: 240325790
readMergedFileInfo <<< len: 8 length: 95431248
readMergedFileInfo <<< arr[6]: 005056891b354c11ed076009e73fd87f (240325790, 95431248)
readMergedFileInfo <<< len: 40 guid ==>005056891b354c11ed076009e73fd880<==
readMergedFileInfo <<< len: 8 offset: 335758062
readMergedFileInfo <<< len: 8 length: 1478244
readMergedFileInfo <<< arr[7]: 005056891b354c11ed076009e73fd880 (335758062, 1478244)
readMergedFileInfo <<< len: 40 guid ==>005056891b354c11ed076009e73fd881<==
readMergedFileInfo <<< len: 8 offset: 337237330
readMergedFileInfo <<< len: 8 length: 89368774
readMergedFileInfo <<< arr[8]: 005056891b354c11ed076009e73fd881 (337237330, 89368774)
sendObjectList <<< entryCount: 9
sendObjectList <<< data.length: 160
sendObjectList <<< data.length: 160
sendObjectList <<< data.length: 160
sendObjectList <<< data.length: 160
sendObjectList <<< data.length: 160
sendObjectList <<< data.length: 160
sendObjectList <<< data.length: 160
sendObjectList <<< data.length: 160
sendObjectList <<< data.length: 160
producer <<< duration: 438 ms.
producer <<< bytesSent: 426608152

The producer is started and we provide the TCP/IP and port number in which it will be listening for a request from the Consumer. It then waits for a request.

When the request is received, the name of the merged file is extracted. The file size is obtained.

Disregard the markSupported flag for now. We will discuss it when we take a look at the code.

The readMergedFileInfo() method seems to collect from the merged file the number of objects followed by the names, offsets and lengths. Such information seems to be placed in an array.

The next step seems to be a call to the sendObjectList() method which appears to send the count of objects and the name, offset and length of each.

At this point we seem to be sending the entire contents of the file to the Consumer. That will probably change on our next pass of the Producer MVP.

# **** ****
cd C:\Users\johnc\workspace8\ProducerAndConsumer\build-jar

# **** run Consumer Jar ****
java -cp ProducerAndConsumer.jar; com.canessa.producerconsumer.Consumer
consumer <<< producer ip [0.0.0.0]:
consumer <<< ip ==>0.0.0.0<==
consumer <<< producer port [51515]:
consumer <<< port: 51515
consumer <<< producer input file [E:\DICOM\005056891b354c11ed076009e73fd878]:
consumer <<< inputFile ==>E:\DICOM\005056891b354c11ed076009e73fd878<==
consumer <<< BEFORE send request...
consumer <<< str ==>E:\DICOM\005056891b354c11ed076009e73fd878<==
consumer <<<  AFTER send request
receiveObjectList <<< len: 4
receiveObjectList <<< entryCount: 9
receiveObjectList <<< len: 160
receiveObjectList <<< len: 160
receiveObjectList <<< len: 160
receiveObjectList <<< len: 160
receiveObjectList <<< len: 160
receiveObjectList <<< len: 160
receiveObjectList <<< len: 160
receiveObjectList <<< len: 160
receiveObjectList <<< len: 160
consumer <<< arr.length: 9
consumer <<< arr[0]: 005056891b354c11ed076009e73fd879 (572, 442980)
consumer <<< arr[1]: 005056891b354c11ed076009e73fd87a (444576, 1676850)
consumer <<< arr[2]: 005056891b354c11ed076009e73fd87b (2122450, 122700674)
consumer <<< arr[3]: 005056891b354c11ed076009e73fd87c (124824148, 1670062)
consumer <<< arr[4]: 005056891b354c11ed076009e73fd87d (126495234, 112339530)
consumer <<< arr[5]: 005056891b354c11ed076009e73fd87e (238835788, 1488978)
consumer <<< arr[6]: 005056891b354c11ed076009e73fd87f (240325790, 95431248)
consumer <<< arr[7]: 005056891b354c11ed076009e73fd880 (335758062, 1478244)
consumer <<< arr[8]: 005056891b354c11ed076009e73fd881 (337237330, 89368774)
producer <<< duration: 453 ms.
consumer <<< bytesReceived: 426608152

On the Consumer side the changes from the previous implementation seem to start with the receiveObjectList() method. After the Consumer sends the request it expects to receive information about the number of objects, the names and lengths of the objects the Producer will be sending its way. After receiving the information for the files the objects seem to be place into an array. The records are displayed. They appear to match the data after being received and the data sent by the Producer. The entire file is then sent from the Producer to the Consumer. The number of bytes seems to match the file size and the number of bytes sent by the Producer. The duration of the transfer is at about half second. Will see in the next pass once we transfer the contents of each object at a time what is the overhead. Remember that we still need to decrypt the objects so they can be accessed by other applications on the Consumer side.

    /**
     * Test scaffold.
     * 
     * Starts the producer and consumer programs in this computer.
     * It starts the programs with the specified arguments.
     * 
     * @throws IOException
     */
    public static void main(String[] args) throws IOException {

        // **** display a prompt ****
        System.out.print("main >>> input file [" + INPUT_FILE + "]: ");

        // **** open a buffered reader ****
        BufferedReader br = new BufferedReader(new InputStreamReader(System.in));

        // **** read name of file to be processed by producer ****
        String inputFile = br.readLine().trim();

        // **** close the buffered reader ****
        br.close();

        // **** check if we should use the default input file ****
        if (inputFile.equals("")) {
            inputFile = INPUT_FILE;
        }

        // ???? ????
        System.out.println("main <<< inputFile ==>" + inputFile + "<==");

        // **** start producer ****
        Process prod = Runtime.getRuntime().exec("java -cp C:\\Users\\johnc\\workspace8\\ProducerAndConsumer\\ProducerAndConsumer.jar; com.canessa.producerconsumer.Producer");

        // **** start consumer ****
        Process cons = Runtime.getRuntime().exec("java -cp C:\\Users\\johnc\\workspace8\\ProducerAndConsumer\\ProducerAndConsumer.jar; com.canessa.producerconsumer.Consumer");
    }

The main program has not been changed from the previous pass. Not sure if we will make any changes while getting the Producer and Consumer to meet the stated requirements.

    // **** constants ****
    public static final String INPUT_FILE  = "E:\\DICOM\\005056891b354c11ed076009e73fd878";
    public static final int PRODUCER_PORT  = 51515;
    public static final String PRODUCER_IP = "0.0.0.0";
    public static final int IO_BUFFER_SIZE = (1024 * 1024);

The constants have not changed since the last pass. At some point we will specify a different IP for the Producer. That will be when we test the Producer and Consumer on separate machines. At this point we have only been experimenting on a single computer.

    /**
     * Check if TCP/IP IP is valid.
     */
    public static boolean isValidIPAddress(String ip) {
    
        // **** sanity checks ****
        if (ip == null || ip.length() == 0)
            return false;

        // **** regex for digit from 0 to 255 ****
        String zeroTo255 = "(\\d{1,2}|(0|1)\\d{2}|2[0-4]\\d|25[0-5])";

        // **** regex for a digit from 0 to 255 followed by a dot and repeat 4 times ****
        String regex = zeroTo255 + "\\."
                        + zeroTo255 + "\\."
                        + zeroTo255 + "\\."
                        + zeroTo255;

        // **** compile the regex ****
        Pattern p = Pattern.compile(regex);

        // **** match the pattern ****
        Matcher m = p.matcher(ip);

        // **** return true if the ip is valid ****
        return m.matches();
    }

This method is used to verify that the format of the IPs entered by the user is correct.

    /**
     * Check if TCP/IP port is valid.
     * 
     * Ports 0 through 1023 are defined as well-known ports.
     * Registered ports are from 1024 to 49151. 
     * The remainder of the ports from 49152 to 65535 can be 
     * used dynamically by applications.
     */
    public static boolean isValidPortNumber(int port) {
        if (port < 49152)
            return false;
        else 
            return true;
    }

This is the method used to verify that the value of the port entered by the user is correct.

/**
 * MergeDirEntry class to keep track of GUID, 
 * offset and length of objects.
 */
class MergeDirEntry implements Serializable {


    // **** ****
    private static final long serialVersionUID = 7326474381142349468L;


    // **** ****
    public String guid;
    public long offset;
    public long length;


    /**
     * Constructor
     */
    public MergeDirEntry(String guid, long offset, long length) {
        this.guid   = guid;
        this.offset = offset;
        this.length = length;
    }


    /**
     * String representation
     */
    @Override
    public String toString() {
        return guid + " " + "(" + offset + ", " + length + ")"; 
    }
}

This class is new to the implementation. It is used to hold information for the object name, offset and length. The values are obtained from the merged file specified by the Consumer to the Producer.

We added a Constructor() and a toString() methods for ease of use.

    /**
     * This is the main code for the Producer.
     * 
     * @throws InterruptedException
     * @throws IOException
     */
    public static void main(String[] args) throws InterruptedException, IOException {
        
        // **** initialization ****
        String ip                   = ProducerAndConsumer.PRODUCER_IP;
        int port                    = ProducerAndConsumer.PRODUCER_PORT;
        byte[] data                 = new byte[ProducerAndConsumer.IO_BUFFER_SIZE];
        String buffer               = "";

        ServerSocket serverSocket   = null;
        Socket clientSocket         = null;
    
        // **** open a buffered reader ****
        BufferedReader br = new BufferedReader(new InputStreamReader(System.in));

        // **** prompt and get the IP ****
        System.out.print("producer <<< ip [" + ip + "]: ");
        buffer = br.readLine().trim();
        if (!buffer.equals(""))
            ip = buffer;

        // **** check if ip is NOT valied ****
        if (!ProducerAndConsumer.isValidIPAddress(ip)) {
            System.out.println("producer <<< ip ==>" + ip + "<== is NOT valid!!!");
            System.exit(-1);
        }

        // ???? ????
        System.out.println("producer <<< ip ==>" + ip + "<==");

        // **** prompt and get the port ****
        System.out.print("producer <<< producer port [" + port + "]: ");
        buffer = br.readLine().trim();
        if (!buffer.equals(""))
            port = Integer.parseInt(buffer);

        // **** check if port NOT valid ****
        if (!ProducerAndConsumer.isValidPortNumber(port)) {
            System.out.println("producer <<< port: " + port + " is NOT valid!!!");
            System.exit(-1);
        }

        // ???? ????
        System.out.println("producer <<< port: " + port);

        // **** close the buffered reader ****
        br.close();

        // **** create a producer ****
        Producer producer = new Producer(ip, port);

        // **** get and display producer version ****
        System.out.println("producer <<< version: " + producer.version());

        // **** open server socket and accept client (consumer) connection ****
        try {

            // **** server socket ****
            serverSocket    = new ServerSocket(port);

            // ???? ????
            System.out.println("producer <<< waiting for client connection...");

            // **** accept client (consumer) connection ****
            clientSocket    = serverSocket.accept();

            // ???? ????
            System.out.println("producer <<< received client connection");
        } catch (Exception ex) {
            System.err.println("producer <<< ex: " + ex);
            System.exit(-1);
        }

        // **** start timer ****
        long startTime = System.currentTimeMillis();

        // **** to read from client (consumer) socket ****
        DataInputStream dis = new DataInputStream(new BufferedInputStream(clientSocket.getInputStream()));

        // **** to write to client (consumer) socket ****
        DataOutputStream dos = new DataOutputStream(new BufferedOutputStream(clientSocket.getOutputStream()));

        // ???? ????
        System.out.println("producer <<< BEFORE reading request...");

        // **** read consumer (client) request ****
        int dataLen = dis.read(data, 0, data.length);

        // ???? ????
        System.out.println("producer <<<  AFTER reading request dataLen: " + dataLen);

        // **** get the input file name ****
        String inputFile = new String(data, "UTF-8").trim(); 

        // ???? ????
        System.out.println("producer <<< inputFile ==>" + inputFile + "<==");

        // **** check if input file does NOT exist ****
        File f = new File(inputFile);
        if (!f.exists() || f.isDirectory()) { 
            System.err.println("producer <<< inputFile ==>" + inputFile + "<== NOT available!!!");
            System.exit(-1);
        }

        // **** get input file size ****
        Path path = Paths.get(inputFile);
        long fileSize = Files.size(path);
        if (fileSize <= 0) {
            System.err.println("producer <<< unexpected fileSize: " + fileSize);
            System.exit(-1);
        }

        // ???? ????
        System.out.println("producer <<< fileSize: " + fileSize);

        // **** open input file ****
        InputStream inStream = new FileInputStream(inputFile);

        // **** check if marks are supported ****
        boolean markSupported = inStream.markSupported();

        // ???? ????
        System.out.println("producer <<< markSupported: " + markSupported);

        // **** NOT SUPORTED ****
        // inStream.mark(1024);

        // **** extract information for each object that will follow ****
        MergeDirEntry[] arr =  producer.readMergedFileInfo(inStream);

        // **** NOT SUPPORTED ****
        // inStream.reset();

        // **** close stream ****
        inStream.close();

        // **** open stream ****
        inStream = new FileInputStream(inputFile);

        // **** send to Consumer information for each object that will follow ****
        producer.sendObjectList(dos, arr);

        
        // **** loop reading and sending data to the Consumer (client) ****
        int bytesToRead     = 0;
        long bytesSent      = 0;
        while (bytesSent < fileSize) {

            // **** determine number of bytes to read ****
            if (fileSize - bytesSent > ProducerAndConsumer.IO_BUFFER_SIZE)
                bytesToRead = ProducerAndConsumer.IO_BUFFER_SIZE;
            else
                bytesToRead = (int)(fileSize - bytesSent);

            // **** read data from the local file ****
            int len = inStream.read(data, 0, bytesToRead);

            // **** send data to consumer (client) ****
            dos.write(data, 0, len);

            // **** update the number of bytes sent to consumer (client) ****
            bytesSent += len;

            // ???? ????
            // System.out.println("producer <<< bytesSent: " + bytesSent);
        }

        // **** end timer and compute duration ****
        long endTime    = System.currentTimeMillis();
        long duration   = (endTime - startTime);

        // ???? ????
        System.out.println("producer <<< duration: " + duration + " ms.");

        // ???? ????
        System.out.println("producer <<< bytesSent: " + bytesSent);

        // **** close input stream ****
        inStream.close();

        // **** close the data input stream ****
        dis.close();

        // **** close the data output stream ****
        dos.close();

        // **** close client socket ****
        clientSocket.close();

        // **** close server socket ****
        serverSocket.close();
    }

This is the updated Producer code. Note that if you compare the code to the previous version the first change is that the Producer reads the data contained in the request sent by the Consumer. The data holds the name of the file to open. In practice the Consumer would only send the name and the path would be generated by the Producer. In this case we just access a file in the file system. The file might be stored in a storage server.

Once we get the name of the file we check if the file exists and get the size. We are still sending the entire file from the Producer to the Consumer.

We open a stream to the input file. Since we need to move around in the file I figured we could set marks in the stream and be able to jump around. It seems that marks are not supported in this Java implementation.

We then call the readMergedFileInfo() method to collect the information about the embedded objects. With that in hand we call the sendObjectList() method to send the information to the Consumer.

After receiving such information the Consumer will be aware of the number of files it would be receiving, the names that it should assign and the length of each file. Note that the offset is of no use for the Consumer.

From there on, we just send the entire contents of the file to the Consumer. In the next pass we will make the necessary enhancements to send the proper files but they will be encrypted. I am thinking that perhaps the Consumer should decrypt the objects. It would make sense because the data in the socket transfer would not be encrypted. This is something to think about.

    /**
     * Read from the merged file information regarding the embeded objects.
     * 
     * @throws IOException
     */
    public MergeDirEntry[] readMergedFileInfo(InputStream inStream) throws IOException {

        // **** initialization ****
        byte[] bytes = new byte[64];

        // **** skip signature (8 bytes) ****
        long skipped = inStream.skip(8);

        // ???? ????
        System.out.println("readMergedFileInfo <<< skipped: " + skipped);

        // **** read bitfile count ****
        int len = inStream.read(bytes, 0, 4);

        // **** convert bytes to integer ****
        int entryCount =    ((bytes[3] & 0xff) << 24) |
                            ((bytes[2] & 0xff) << 16) |
                            ((bytes[1] & 0xff) << 8) |
                             (bytes[0] & 0xff);

        // ???? ????
        System.out.println("readMergedFileInfo <<< entryCount: " + entryCount);

        // **** allocate array ****
        MergeDirEntry[] arr = new MergeDirEntry[entryCount];

        // **** loop reading info populating the array ****
        for (int i = 0; i < entryCount; i++) {

            // **** read GUID ****
            len = inStream.read(bytes, 0, 40);

            // **** get the GUID ****
            String guid = new String(bytes, "UTF-8").trim(); 

            // ???? ????
            System.out.println("readMergedFileInfo <<< len: " + len + " guid ==>" + guid + "<==");

            // **** read offset ****
            len = inStream.read(bytes, 0, 8);

            // **** convert buffer to long ****
            long offset =   ((bytes[7] & 0xff) << 56) |
                            ((bytes[6] & 0xff) << 48) |
                            ((bytes[5] & 0xff) << 40) |
                            ((bytes[4] & 0xff) << 32) |
                            ((bytes[3] & 0xff) << 24) |
                            ((bytes[2] & 0xff) << 16) |
                            ((bytes[1] & 0xff) << 8) |
                             (bytes[0] & 0xff);

            // ???? ????
            System.out.println("readMergedFileInfo <<< len: " + len + " offset: " + offset);

            // **** read length ****
            len = inStream.read(bytes, 0, 8);

            // **** convert buffer to long ****
            long length =   ((bytes[7] & 0xff) << 56) |
                            ((bytes[6] & 0xff) << 48) |
                            ((bytes[5] & 0xff) << 40) |
                            ((bytes[4] & 0xff) << 32) |
                            ((bytes[3] & 0xff) << 24) |
                            ((bytes[2] & 0xff) << 16) |
                            ((bytes[1] & 0xff) << 8) |
                             (bytes[0] & 0xff);

            // ???? ????
            System.out.println("readMergedFileInfo <<< len: " + len + " length: " + length);

            // **** ****
            arr[i] = new MergeDirEntry(guid, offset, length);

            // ???? ????
            System.out.println("readMergedFileInfo <<< arr[" + i + "]: " + arr[i].toString());
        }

        // **** return the array ****
        return arr;
    }

The readMergedFileInfo() method is used by the Producer to read the number of objects, their associated name, offsets and lengths from the input file.

We start by getting to the count in the file. We read the count and display it.

With the cont on hand we initialize an array that will hold elements with the name, offset and length.

We enter a loop that reads data from the input file, populates an object, and places the object in the array. When done, the method returns the array with the information per object.

    /**
     * Send to Consumer a list of object names, offsets and lengths.
     * 
     * @throws IOException
     */
    public void sendObjectList(DataOutputStream dos, MergeDirEntry[] arr) throws IOException {

        // ***** initialization ****
        int entryCount  = arr.length;
        byte[] data     = new byte[ProducerAndConsumer.IO_BUFFER_SIZE];

        // ???? ????
        System.out.println("sendObjectList <<< entryCount: " + entryCount);

        // **** ****
        data[0] = (byte)(entryCount & 0xff);
        data[1] = (byte)((entryCount << 8) & 0xff);
        data[2] = (byte)((entryCount << 16) & 0xff);
        data[3] = (byte)((entryCount << 24) & 0xff);

        // **** send the number of merge dir entries to the Consumer ****
        dos.write(data, 0, 4);

        // **** loop sending the merge dir entries to the Consumer ****
        for (int i = 0; i < entryCount; i++) {

            // **** ****
            ByteArrayOutputStream bos   = new ByteArrayOutputStream();
            ObjectOutputStream oos      = new ObjectOutputStream(bos);

            // **** ****
            oos.writeObject(arr[i]);
            oos.flush();
            data = bos.toByteArray();

            // ???? ????
            System.out.println("sendObjectList <<< data.length: " + data.length);
            
            // **** send the entry to the Consumer ****
            dos.write(data, 0, data.length);
        }
    }

This method is used by the Producer to send the data in the array we generated in the previous method to the Consumer. The idea is simple. We serialize the current entry. The serialized data is then sent to the Consumer via the socket stream.

    /**
     * This is the main code for the Consumer.
     * @throws IOException
     * @throws ClassNotFoundException
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException {
        
        // **** initialization ****
        String ip           = ProducerAndConsumer.PRODUCER_IP;
        int port            = ProducerAndConsumer.PRODUCER_PORT;
        String inputFile    = ProducerAndConsumer.INPUT_FILE;
        byte[] data         = new byte[ProducerAndConsumer.IO_BUFFER_SIZE];
        String buffer       = "";

        Socket clientSocket = null;

        // **** open a buffered reader ****
        BufferedReader br = new BufferedReader(new InputStreamReader(System.in));

        // **** prompt and get the IP ****
        System.out.print("consumer <<< producer ip [" + ip + "]: ");
        buffer = br.readLine().trim();
        if (!buffer.equals(""))
            ip = buffer;

        // **** check if ip is NOT valied ****
        if (!ProducerAndConsumer.isValidIPAddress(ip)) {
            System.out.println("consumer <<< ip ==>" + ip + "<== is NOT valid!!!");
            System.exit(-1);
        }

        // ???? ????
        System.out.println("consumer <<< ip ==>" + ip + "<==");

        // **** prompt and get the port ****
        System.out.print("consumer <<< producer port [" + port + "]: ");
        buffer = br.readLine().trim();
        if (!buffer.equals(""))
            port = Integer.parseInt(buffer);

        // **** check if port NOT valied ****
        if (!ProducerAndConsumer.isValidPortNumber(port)) {
            System.out.println("consumer <<< port: " + port + " is NOT valid!!!");
            System.exit(-1);
        }

        // ???? ????
        System.out.println("consumer <<< port: " + port);

        // **** prompt and get the input file name ****
        System.out.print("consumer <<< producer input file [" + inputFile + "]: ");
        buffer = br.readLine().trim();
        if (!buffer.equals(""))
            inputFile = buffer;

        // ???? ????
        System.out.println("consumer <<< inputFile ==>" + inputFile + "<==");

        // **** close the buffered reader ****
        br.close();

        // **** start timer ****
        long startTime = System.currentTimeMillis();

        // **** open client socket ****
        try {
            clientSocket = new Socket(ip, port);
        } catch (Exception ex) {
            System.err.println("consumer <<< ex: " + ex.toString());
            System.exit(-1);
        }

        // **** to read data from server (producer) socket ****
        DataInputStream dis = new DataInputStream(new BufferedInputStream(clientSocket.getInputStream()));

        // **** to write data to server (producer) socket ****
        DataOutputStream dos = new DataOutputStream(new BufferedOutputStream(clientSocket.getOutputStream()));

        // ???? ????
        System.out.println("consumer <<< BEFORE send request...");

        // **** send request to producer (input file name) ****
        try {
            Arrays.fill(data, (byte)0);
            System.arraycopy(inputFile.getBytes(), 0, data, 0, inputFile.length());

            // ???? ????
            String str = new String(data, "UTF-8").trim(); 
            System.out.println("consumer <<< str ==>" + str + "<==");

            dos.write(data, 0, ProducerAndConsumer.IO_BUFFER_SIZE);
        } catch (Exception ex) {
            System.err.println("consumer <<< ex: " + ex.toString());
            System.exit(-1);
        }

        // ???? ????
        System.out.println("consumer <<<  AFTER send request");

        // **** create consumer ****
        Consumer consumer = new Consumer();

        // **** receive information for each object that will follow ****
        MergeDirEntry arr[] = consumer.receiveObjectList(dis);

        // ???? ????
        System.out.println("consumer <<< arr.length: " + arr.length);
        for (int i = 0; i < arr.length; i++)
            System.out.println("consumer <<< arr[" + i + "]: " + arr[i].toString());

            
        // **** open local file to write data ****
        String outputFile       = "c:\\temp\\_received_file";
        OutputStream outStream  = new FileOutputStream(outputFile);
    
        // **** loop receiving data from producer (server) ****
        boolean done        = false;
        long bytesReceived  = 0;
        while (!done) {

            // **** ****
            int len = dis.read(data, 0, ProducerAndConsumer.IO_BUFFER_SIZE);

            // **** check if we are done receiving data from the producer (server) ****
            if (len == -1) {
                done = true;
                continue;
            }

            // **** count these bytes ****
            bytesReceived += (long)len;

            // ???? ????
            // System.out.println("consumer <<< bytesReceived: " + bytesReceived);

            // **** write data to the output file ****
            outStream.write(data, 0, len);
        }

        // **** end timer and compute duration ****
        long endTime    = System.currentTimeMillis();
        long duration   = (endTime - startTime);

        // ???? ????
        System.out.println("producer <<< duration: " + duration + " ms.");

        // ???? ????
        System.out.println("consumer <<< bytesReceived: " + bytesReceived);

        // **** close local file ****
        outStream.close();

        // **** close the data input stream ****
        dis.close();

        // **** close the data output stream ****
        dos.close();

        // **** close client socket ****
        clientSocket.close();
    }

This is the main procedure for the Consumer. Note that the first few lines the code has not changed from the previous pass.

The first change is that we now send the name of the input file to the Producer.

We now create a Consumer object. We could have used a simple function, but when done we would like to have an implementation using an object oriented approach.

The receiveObjectList() method is used to receive the list of records describing the object that the Consumer will receive. The objects are placed in an array. The contents of the array are displayed.

Note that the data regarding the list of objects displayed by the Producer matches the one displayed by the Consumer. It seems that we were able to transfer such data across the network.

    /**
     * Receive from Producer merged file information regarding 
     * the embeded objects.
     * 
     * @throws IOException
     * @throws ClassNotFoundException
     */
    public MergeDirEntry[] receiveObjectList(DataInputStream dis) throws IOException, ClassNotFoundException {

        // **** ****
        byte[] bytes = new byte[ProducerAndConsumer.IO_BUFFER_SIZE];

        // **** receive the number of objects ****
        int len = dis.read(bytes, 0, 4);

        // ???? ????
        System.out.println("receiveObjectList <<< len: " + len);

        // **** convert bytes to integer ****
        int entryCount =    ((bytes[3] & 0xff) << 24) |
                            ((bytes[2] & 0xff) << 16) |
                            ((bytes[1] & 0xff) << 8) |
                             (bytes[0] & 0xff);

        // ???? ????
        System.out.println("receiveObjectList <<< entryCount: " + entryCount);

        // **** create the array ****
        MergeDirEntry[] arr = new MergeDirEntry[entryCount];

        // **** loop receiving the array of entries  ****
        for (int i = 0; i < entryCount; i++) {

            // **** receive a merged dir entry ****
            len = dis.read(bytes, 0, 160);

            // ???? ????
            System.out.println("receiveObjectList <<< len: " + len);

            // **** ****
            ByteArrayInputStream bis    = new ByteArrayInputStream(bytes);
            ObjectInputStream ois       = new ObjectInputStream(bis);

            // **** ****
            arr[i] = (MergeDirEntry)ois.readObject();
        }

        // **** return array of entries ****
        return arr;
    }

This method is used by the Consumer to receive the list of objects that the Producer will transfer.

We start by receiving and parsing the number of elements in the list. We create an array and enter a loop. In the loop we receive a merged directory entry per pass. We get back the object from the serial data we received. The object is then inserted into the array.

When done, the method returns the array with the files information. Note that the offset is included but the Consumer does not have much use for it.

We are getting closer to have our first release of the MVP. In the next two steps we will transfer the objects and decrypt them. AT that point we would have our MVP. I would like to note that between code and some documentation, I have spent about 8 hours of work so far. Based on this pace we should have all done and tested in no more than two days. We will see if this is the case in the next couple days. Obviously there are time limitations per day in which I can work on this small but interesting project.

Hope you enjoyed solving this problem as much as I did. The entire code for this project can be found in my GitHub repository.

If you have comments or questions regarding this, or any other post in this blog, please do not hesitate and leave me a note below. I will reply as soon as possible.

Keep on reading and experimenting. It is one of the best ways to learn, become proficient, refresh your knowledge and enhance your developer toolset.

One last thing, many thanks to all subscribers to this blog!!!  If you have not subscribed yet, then please take a minute or so to do so.

Regards;

John

Leave a Reply

Your email address will not be published.

This site uses Akismet to reduce spam. Learn how your comment data is processed.