o7planning

Java Pipe.SourceChannel Tutorial with Examples

  1. Pipe.SourceChannel
  2. Examples

1. Pipe.SourceChannel

Suppose you are developing a Multithreading application, and you have 2 independent Threads, Thread-A and Thread-B. The question is:
  • What to do so that when data appears on Thread-A it automatically transfers to Thread-B?
Pipe.SinkChannel and Pipe.SourceChannel are two classes created to handle the situation mentioned above. Every time data is written to Pipe.SinkChannel, they will automatically appear on Pipe.SourceChannel, this is called the pipe effect.
The Pipe.SinkChannel class is an abstract class defined inside the Pipe class and implements the WritableByteChannel and GatheringByteChannel interfaces. It acts as a writing channel.
public abstract static class SinkChannel extends AbstractSelectableChannel
                        implements WritableByteChannel, GatheringByteChannel
The Pipe.SourceChannel class is an abstract class defined inside the Pipe class and implements the ReadableByteChannel and ScatteringByteChannel interfaces. It acts as a reading channel.
public abstract static class SourceChannel extends AbstractSelectableChannel
                        implements ReadableByteChannel, ScatteringByteChannel

2. Examples

In this example, we will write messages to a Pipe.SinkChannel (controlled by ThreadA). They will automatically appear on the Pipe.SourceChannel (controlled by ThreadB).
Pipe_ex1.java
package org.o7planning.pipe.sourcechannel.ex;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;

public class Pipe_ex1 {

    public static void main(String[] args) throws IOException, InterruptedException {
        Pipe pipe = Pipe.open();

        ThreadA threadA = new ThreadA(pipe);
        ThreadB threadB = new ThreadB(pipe);

        threadA.start();
        threadB.start();
        threadA.join(); // Waits for this thread to die.
        threadB.join(); // Waits for this thread to die.
        System.out.println();
        System.out.println("Done!");
    }
}

//
class ThreadA extends Thread {
    private Pipe pipe;

    public ThreadA(Pipe pipe) {
        this.pipe = pipe;
    }

    @Override
    public void run() {
        try (Pipe.SinkChannel skChannel = this.pipe.sink();) { // try
            String[] messages = new String[] { "Hello\n", "Hi\n", "Bye\n" };

            ByteBuffer buffer = ByteBuffer.allocate(512);

            for (String msg : messages) {
                // Set position = 0; limit = capacity;
                buffer.clear();
                buffer.put(msg.getBytes("UTF-8"));
                buffer.flip();
                while (buffer.hasRemaining()) {
                    skChannel.write(buffer);
                }
                Thread.sleep(1000);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

//
class ThreadB extends Thread {
    private Pipe pipe;

    public ThreadB(Pipe pipe) {
        this.pipe = pipe;
    }

    @Override
    public void run() {
        try (Pipe.SourceChannel srcChannel = this.pipe.source();) { // try
            ByteBuffer buffer = ByteBuffer.allocate(512);

            while (srcChannel.read(buffer) != -1) {
                buffer.flip(); // Set limit = current position; position = 0;
                ByteArrayOutputStream baos = new ByteArrayOutputStream();

                while (buffer.hasRemaining()) {
                    byte b = buffer.get();
                    if (b != '\n') {
                        baos.write(b);
                    } else {
                        String s = baos.toString("UTF-8");
                        System.out.println(s);
                    }
                }
                buffer.clear(); // Set position =0; limit = capacity;
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Output: