o7planning

Java Pipe.SinkChannel Tutorial with Examples

1- Pipe.SinkChannel

Suppose you are developing a multithreaded application with two independent threads, Thread-A and Thread-B. The question is:
  • How can data produced on Thread-A be transferred automatically to Thread-B?
Pipe.SinkChannel and Pipe.SourceChannel are two classes created to handle this situation. Every time data is written to the Pipe.SinkChannel, it becomes available for reading on the Pipe.SourceChannel. This is called the pipe effect. A pipe is unidirectional: data flows only from the sink channel to the source channel.
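The pipe effect can be seen in its smallest form without any extra threads. The following is only a sketch: the class name PipeRoundTrip and the method roundTrip are made up for illustration, and it assumes the payload is small enough to fit into the pipe's internal buffer (otherwise a single thread would block while writing).

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.charset.StandardCharsets;

public class PipeRoundTrip {

    // Hypothetical helper: writes the text into the sink end of a new pipe
    // and reads it back from the source end, all on the calling thread.
    // Assumption: the text is small enough to fit in the pipe's buffer.
    static String roundTrip(String text) throws IOException {
        Pipe pipe = Pipe.open();
        byte[] bytes = text.getBytes(StandardCharsets.UTF_8);

        try (Pipe.SinkChannel sink = pipe.sink();
             Pipe.SourceChannel source = pipe.source()) {

            // Write everything to the sink (writable) end.
            ByteBuffer out = ByteBuffer.wrap(bytes);
            while (out.hasRemaining()) {
                sink.write(out);
            }

            // The same bytes are now readable on the source end.
            ByteBuffer in = ByteBuffer.allocate(bytes.length);
            while (in.hasRemaining()) {
                if (source.read(in) == -1) {
                    break; // End of stream.
                }
            }
            in.flip();
            return StandardCharsets.UTF_8.decode(in).toString();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(roundTrip("Hello"));
    }
}
```

In a real application the two ends are normally used by different threads, so that a full pipe buffer cannot block the only thread able to drain it.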
The Pipe.SinkChannel class is an abstract class defined inside the Pipe class, and implements the WritableByteChannel and GatheringByteChannel interfaces. It acts as a writing channel.

public abstract static class SinkChannel extends AbstractSelectableChannel
                        implements WritableByteChannel, GatheringByteChannel
The Pipe.SourceChannel class is an abstract class defined inside the Pipe class, and implements the ReadableByteChannel and ScatteringByteChannel interfaces. It acts as a reading channel.

public abstract static class SourceChannel extends AbstractSelectableChannel
                        implements ReadableByteChannel, ScatteringByteChannel
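Because SinkChannel is a GatheringByteChannel and SourceChannel is a ScatteringByteChannel, several buffers can be written or read in one call. The sketch below illustrates this under a simplifying assumption: the reader already knows the byte length of each part. The class name PipeGatherScatter and the method transfer are hypothetical, and the data is again assumed to be small enough to fit in the pipe's buffer.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.charset.StandardCharsets;

public class PipeGatherScatter {

    // Hypothetical helper: sends two parts with one gathering write and
    // receives them into two separate buffers with a scattering read.
    // Assumption: the reader knows the byte length of each part in advance.
    static String[] transfer(String header, String body) throws IOException {
        byte[] h = header.getBytes(StandardCharsets.UTF_8);
        byte[] b = body.getBytes(StandardCharsets.UTF_8);
        long total = h.length + b.length;

        Pipe pipe = Pipe.open();
        try (Pipe.SinkChannel sink = pipe.sink();
             Pipe.SourceChannel source = pipe.source()) {

            // Gathering write: the buffers are drained in array order.
            ByteBuffer[] out = { ByteBuffer.wrap(h), ByteBuffer.wrap(b) };
            long written = 0;
            while (written < total) {
                written += sink.write(out);
            }

            // Scattering read: the first buffer is filled before the second.
            ByteBuffer[] in = { ByteBuffer.allocate(h.length), ByteBuffer.allocate(b.length) };
            long read = 0;
            while (read < total) {
                long n = source.read(in);
                if (n == -1) {
                    break; // End of stream.
                }
                read += n;
            }

            in[0].flip();
            in[1].flip();
            return new String[] {
                StandardCharsets.UTF_8.decode(in[0]).toString(),
                StandardCharsets.UTF_8.decode(in[1]).toString()
            };
        }
    }

    public static void main(String[] args) throws IOException {
        String[] parts = transfer("HEAD", "payload");
        System.out.println(parts[0]);
        System.out.println(parts[1]);
    }
}
```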

2- Examples

In this example, we will write messages to a Pipe.SinkChannel (controlled by ThreadA). They will automatically appear on the Pipe.SourceChannel (controlled by ThreadB).
Pipe_ex1.java

package org.o7planning.pipe.sinkchannel.ex;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;

public class Pipe_ex1 {

    public static void main(String[] args) throws IOException, InterruptedException {
        Pipe pipe = Pipe.open();

        ThreadA threadA = new ThreadA(pipe);
        ThreadB threadB = new ThreadB(pipe);

        threadA.start();
        threadB.start();
        threadA.join(); // Waits for this thread to die.
        threadB.join(); // Waits for this thread to die.
        System.out.println();
        System.out.println("Done!");
    }
}

// Writer thread: sends messages through the sink end of the pipe.
class ThreadA extends Thread {
    private Pipe pipe;

    public ThreadA(Pipe pipe) {
        this.pipe = pipe;
    }

    @Override
    public void run() {
        // Closing the sink channel signals end-of-stream to the reader.
        try (Pipe.SinkChannel skChannel = this.pipe.sink()) {
            String[] messages = new String[] { "Hello\n", "Hi\n", "Bye\n" };

            ByteBuffer buffer = ByteBuffer.allocate(512);

            for (String msg : messages) {
                // Set position = 0; limit = capacity;
                buffer.clear();
                buffer.put(msg.getBytes("UTF-8"));
                buffer.flip();
                while (buffer.hasRemaining()) {
                    skChannel.write(buffer);
                }
                Thread.sleep(1000);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

// Reader thread: receives messages from the source end of the pipe.
class ThreadB extends Thread {
    private Pipe pipe;

    public ThreadB(Pipe pipe) {
        this.pipe = pipe;
    }

    @Override
    public void run() {
        try (Pipe.SourceChannel srcChannel = this.pipe.source()) {
            ByteBuffer buffer = ByteBuffer.allocate(512);
            // Collects the bytes of the current line. It is kept across reads
            // so that a line split over two reads is not lost.
            ByteArrayOutputStream baos = new ByteArrayOutputStream();

            // read() returns -1 once the sink channel has been closed.
            while (srcChannel.read(buffer) != -1) {
                buffer.flip(); // Set limit = current position; position = 0;

                while (buffer.hasRemaining()) {
                    byte b = buffer.get();
                    if (b != '\n') {
                        baos.write(b);
                    } else {
                        String s = baos.toString("UTF-8");
                        System.out.println(s);
                        baos.reset(); // Start collecting the next line.
                    }
                }
                buffer.clear(); // Set position = 0; limit = capacity;
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Output:

Hello
Hi
Bye

Done!
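Both channel classes extend AbstractSelectableChannel, so the source channel can also be put into non-blocking mode and registered with a Selector instead of being read in a blocking loop. The following is a minimal sketch of that variant, not part of the example above: the class name PipeSelectorSketch and the methods readAll and demo are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;

public class PipeSelectorSketch {

    // Hypothetical helper: reads from the source channel until end-of-stream,
    // waiting on a Selector instead of blocking inside read().
    static String readAll(Pipe.SourceChannel source) throws IOException {
        ByteArrayOutputStream collected = new ByteArrayOutputStream();
        ByteBuffer buffer = ByteBuffer.allocate(512);

        try (Selector selector = Selector.open()) {
            source.configureBlocking(false);
            source.register(selector, SelectionKey.OP_READ);

            boolean open = true;
            while (open) {
                selector.select();              // Wait until the channel is readable.
                selector.selectedKeys().clear(); // Only one channel is registered.

                int n;
                while ((n = source.read(buffer)) > 0) {
                    buffer.flip();
                    collected.write(buffer.array(), 0, buffer.limit());
                    buffer.clear();
                }
                if (n == -1) {
                    open = false; // The sink channel was closed.
                }
            }
        }
        return new String(collected.toByteArray(), StandardCharsets.UTF_8);
    }

    // Hypothetical demo: a writer thread sends three messages and closes the sink.
    static String demo() throws Exception {
        Pipe pipe = Pipe.open();
        Thread writer = new Thread(() -> {
            try (Pipe.SinkChannel sink = pipe.sink()) {
                for (String msg : new String[] { "Hello\n", "Hi\n", "Bye\n" }) {
                    ByteBuffer out = ByteBuffer.wrap(msg.getBytes(StandardCharsets.UTF_8));
                    while (out.hasRemaining()) {
                        sink.write(out);
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
        writer.start();

        String result;
        try (Pipe.SourceChannel source = pipe.source()) {
            result = readAll(source);
        }
        writer.join();
        return result;
    }

    public static void main(String[] args) throws Exception {
        System.out.print(demo());
    }
}
```

A Selector pays off mainly when one thread has to watch several channels at once. For a single pipe, the blocking loop in ThreadB is simpler.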
