o7planning

Java TransferQueue Tutorial with Examples

  1. TransferQueue
  2. TransferQueue methods
  3. Example:
  4. getWaitingConsumerCount()
  5. hasWaitingConsumer()
  6. transfer(E)
  7. tryTransfer(E)
  8. tryTransfer(E, long, TimeUnit)

1. TransferQueue

As a subinterface of BlockingQueue, TransferQueue has all the features of its parent interface. In addition, it lets a Producer wait until a Consumer receives the "product" (element). TransferQueue is useful in some types of applications, such as messaging applications.
public interface TransferQueue<E> extends BlockingQueue<E>
Compared with BlockingQueue<E>, TransferQueue<E> provides a few more methods, including:
void transfer(E e) throws InterruptedException;
boolean tryTransfer(E e);
boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException;
boolean hasWaitingConsumer();
int getWaitingConsumerCount();
transfer(e):
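Transfers the element to a consumer, waiting if necessary. If a consumer is already waiting in BlockingQueue.take() or BlockingQueue.poll(timeout,unit), the element is handed over immediately; otherwise the element is added to this TransferQueue and the method waits until a consumer receives it.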
tryTransfer(e):
The tryTransfer(e) method transfers the element only if a consumer is already waiting to receive it via the BlockingQueue.take() or BlockingQueue.poll(timeout,unit) method. In that case the element is handed over immediately and the method returns true. Otherwise, the method returns false and the element is not added to the queue.
tryTransfer(e, timeout, unit):
The tryTransfer(e,timeout,unit) method transfers the element if a consumer receives it, via BlockingQueue.take() or BlockingQueue.poll(timeout,unit), before the specified waiting time elapses. In that case the method returns true. Otherwise, the method returns false and the element is not left in the queue.
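The difference between these methods is easiest to see when no consumer exists at all. The following is a minimal sketch, assuming LinkedTransferQueue as the implementation; the class name NoConsumerSketch and the method demo() are made up for illustration. transfer(e) is deliberately not called, because without a consumer it would block forever.

```java
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TransferQueue;

class NoConsumerSketch {

    // Calls put, tryTransfer and timed tryTransfer while no consumer thread exists.
    // Returns "sizeAfterPut,immediateResult,timedResult,finalSize".
    static String demo() throws InterruptedException {
        TransferQueue<String> queue = new LinkedTransferQueue<>();

        // put() enqueues the element and returns at once (the queue is unbounded).
        queue.put("A");
        int sizeAfterPut = queue.size();

        // No consumer is waiting: the element is rejected and not enqueued.
        boolean immediate = queue.tryTransfer("B");

        // Waits up to 100 ms for a consumer; none arrives, so "C" is withdrawn.
        boolean timed = queue.tryTransfer("C", 100, TimeUnit.MILLISECONDS);

        return sizeAfterPut + "," + immediate + "," + timed + "," + queue.size();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo()); // prints 1,false,false,1
    }
}
```

Only "A", which was added with put(e), remains in the queue; the two rejected elements leave no trace.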

2. TransferQueue methods

Methods defined in the TransferQueue<E> interface:
void transfer(E e) throws InterruptedException;
boolean tryTransfer(E e);
boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException;
boolean hasWaitingConsumer();
int getWaitingConsumerCount();
Methods that are inherited from the BlockingQueue<E> interface:
void put(E e) throws InterruptedException;
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
E take() throws InterruptedException;
E poll(long timeout, TimeUnit unit) throws InterruptedException;
int remainingCapacity();
int drainTo(Collection<? super E> c);  
int drainTo(Collection<? super E> c, int maxElements);
Methods that are inherited from the Queue<E> interface:
boolean add(E e);
boolean offer(E e);
E remove();
E poll();
E element();
E peek();
Methods that are inherited from the Collection<E> interface:
int size();
boolean isEmpty();
boolean contains(Object o);
Iterator<E> iterator();
Object[] toArray();
<T> T[] toArray(T[] a);
boolean add(E e);
boolean remove(Object o);
boolean containsAll(Collection<?> c);  
boolean addAll(Collection<? extends E> c);
boolean removeAll(Collection<?> c);
boolean retainAll(Collection<?> c);
void clear();  
boolean equals(Object o);
int hashCode();

default <T> T[] toArray(IntFunction<T[]> generator)  
default boolean removeIf(Predicate<? super E> filter)
default Spliterator<E> spliterator()  
default Stream<E> stream()  
default Stream<E> parallelStream()
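The inherited methods behave as they do on any other queue. The sketch below is a single-threaded illustration, assuming LinkedTransferQueue as the implementation (which is unbounded, so remainingCapacity() reports Integer.MAX_VALUE); the class name InheritedMethodsSketch is made up for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TransferQueue;

class InheritedMethodsSketch {

    // Exercises a few inherited methods and returns a one-line summary.
    static String demo() {
        TransferQueue<String> queue = new LinkedTransferQueue<>();
        queue.add("A");   // from Collection / Queue
        queue.offer("B"); // from Queue
        queue.offer("C");

        String head = queue.peek();    // looks at "A" without removing it
        String removed = queue.poll(); // removes and returns "A"

        List<String> rest = new ArrayList<>();
        queue.drainTo(rest);           // from BlockingQueue: moves "B" and "C"

        boolean unbounded = queue.remainingCapacity() == Integer.MAX_VALUE;
        return head + " " + removed + " " + rest + " " + queue.isEmpty() + " " + unbounded;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints A A [B, C] true true
    }
}
```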

3. Example:

In the example below, the Producer sends messages to the Consumer through the TransferQueue.transfer(e) method.
The output of this example shows that if all Consumers are busy consuming messages (which means no Consumer is in the waiting state), the TransferQueue.transfer(e) method blocks until a Consumer becomes available to receive the message.
TransferQueue_transfer_ex1.java
package org.o7planning.transferqueue.aa;

import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TransferQueue;

public class TransferQueue_transfer_ex1 {

    public static void main(String[] args) {
        TransferQueue<String> queue = new LinkedTransferQueue<String>();

        queue.add("NORMAL-MESSAGE 1");
        queue.add("NORMAL-MESSAGE 2");
        queue.add("NORMAL-MESSAGE 3");

        Producer producer1 = new Producer(queue);
        Consumer consumer1 = new Consumer(queue);
        Consumer consumer2 = new Consumer(queue);

        // Start..
        producer1.start();
        
        consumer1.start();
        consumer2.start();
    }
}

class Producer extends Thread {
    final TransferQueue<String> queue;
    private static int count = 1;

    Producer(TransferQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                String message = "IMPORTANT-MESSAGE " + count++;
                System.out.println("[PRODUCER] Transfering: " + message);
                this.queue.transfer(message);
                System.out.println("[PRODUCER] Transfered: " + message + " (**)");
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt(); // Restore the interrupt flag and stop.
        }
    }
}
class Consumer extends Thread {
    final TransferQueue<String> queue;

    Consumer(TransferQueue<String> queue) {
        this.queue = queue;
    }
    @Override
    public void run() {
        try {
            while (true) {
                String message = queue.take();
                this.longConsume(message);
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt(); // Restore the interrupt flag and stop.
        }
    }
    // Need 2 seconds to consume the message.
    private void longConsume(String message) throws InterruptedException  {
        System.out.println(" [CONSUMER] Consuming: " + message);
        Thread.sleep(2 * 1000); // 2 seconds.
        System.out.println(" [CONSUMER] Consumed: " + message);
    }
}
Output:
[PRODUCER] Transfering: IMPORTANT-MESSAGE 1
 [CONSUMER] Consuming: NORMAL-MESSAGE 1
 [CONSUMER] Consuming: NORMAL-MESSAGE 2
 [CONSUMER] Consumed: NORMAL-MESSAGE 1
 [CONSUMER] Consumed: NORMAL-MESSAGE 2
 [CONSUMER] Consuming: NORMAL-MESSAGE 3
 [CONSUMER] Consuming: IMPORTANT-MESSAGE 1
[PRODUCER] Transfered: IMPORTANT-MESSAGE 1 (**)
[PRODUCER] Transfering: IMPORTANT-MESSAGE 2
 [CONSUMER] Consumed: NORMAL-MESSAGE 3
 [CONSUMER] Consuming: IMPORTANT-MESSAGE 2
 [CONSUMER] Consumed: IMPORTANT-MESSAGE 1
[PRODUCER] Transfered: IMPORTANT-MESSAGE 2 (**)
[PRODUCER] Transfering: IMPORTANT-MESSAGE 3
[PRODUCER] Transfered: IMPORTANT-MESSAGE 3 (**)
[PRODUCER] Transfering: IMPORTANT-MESSAGE 4
 [CONSUMER] Consuming: IMPORTANT-MESSAGE 3
 [CONSUMER] Consumed: IMPORTANT-MESSAGE 2
 [CONSUMER] Consuming: IMPORTANT-MESSAGE 4
[PRODUCER] Transfered: IMPORTANT-MESSAGE 4 (**)
[PRODUCER] Transfering: IMPORTANT-MESSAGE 5
 [CONSUMER] Consumed: IMPORTANT-MESSAGE 3
 [CONSUMER] Consuming: IMPORTANT-MESSAGE 5
[PRODUCER] Transfered: IMPORTANT-MESSAGE 5 (**)
...
The following example shows how to use the TransferQueue.tryTransfer(e) method. In this example the Producer creates messages and tries to transfer them to the waiting Consumer.
The output of this example shows that many of the messages created by the Producer are dropped, because no Consumer is waiting at the moment the TransferQueue.tryTransfer(e) method is called.
TransferQueue_tryTransfer_ex1.java
package org.o7planning.transferqueue.bb;

import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TransferQueue;

public class TransferQueue_tryTransfer_ex1 {

    public static void main(String[] args) {
        TransferQueue<String> queue = new LinkedTransferQueue<String>();
        queue.add("NORMAL-MESSAGE 1");
        queue.add("NORMAL-MESSAGE 2");
        queue.add("NORMAL-MESSAGE 3");

        Producer producer1 = new Producer(queue);
        Consumer consumer1 = new Consumer(queue);
        Consumer consumer2 = new Consumer(queue);

        // Start..
        producer1.start();
        consumer1.start();
        consumer2.start();
    }
}
class Producer extends Thread {
    final TransferQueue<String> queue;
    private static int count = 1;

    public Producer(TransferQueue<String> queue) {
        this.queue = queue;
    }
    @Override
    public void run() {
        try {
            while (true) {
                String message = "IMPORTANT-MESSAGE " + count++;
                // Returns false (and the message is dropped) if no Consumer is waiting.
                this.queue.tryTransfer(message);
                Thread.sleep(1 * 1000); // 1 second.
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt(); // Restore the interrupt flag and stop.
        }
    }
}
class Consumer extends Thread {
    final TransferQueue<String> queue;

    Consumer(TransferQueue<String> queue) {
        this.queue = queue;
    }
    @Override
    public void run() {
        try {
            while (true) {
                String message = queue.take();
                System.out.println(">> " + message);
                Thread.sleep(3 * 1000); // 3 seconds
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt(); // Restore the interrupt flag and stop.
        }
    }
}
Output:
>> NORMAL-MESSAGE 1
>> NORMAL-MESSAGE 2
>> NORMAL-MESSAGE 3
>> IMPORTANT-MESSAGE 4
>> IMPORTANT-MESSAGE 7
>> IMPORTANT-MESSAGE 8
>> IMPORTANT-MESSAGE 10
>> IMPORTANT-MESSAGE 11
>> IMPORTANT-MESSAGE 13
>> IMPORTANT-MESSAGE 14
>> IMPORTANT-MESSAGE 16
>> IMPORTANT-MESSAGE 17
Messages created by the Producer that were dropped:
  • IMPORTANT-MESSAGE 1
  • IMPORTANT-MESSAGE 2
  • IMPORTANT-MESSAGE 3
  • IMPORTANT-MESSAGE 5
  • IMPORTANT-MESSAGE 6
  • ...

4. getWaitingConsumerCount()

int getWaitingConsumerCount();
Returns the estimated number of consumers waiting to receive an element from this TransferQueue via the BlockingQueue.take() or BlockingQueue.poll(timeout,unit) method.
The return value is an approximation of a momentary state of affairs, that may be inaccurate if consumers have completed or given up waiting. The value may be useful for monitoring and heuristics, but not for synchronization control. Implementations of this method are likely to be noticeably slower than those for hasWaitingConsumer().
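As an illustration of using the count for monitoring, the sketch below starts two consumers that block in take() on an empty queue and polls the estimate until it reaches 2. It assumes LinkedTransferQueue as the implementation; the class name WaitingConsumerCountSketch and the 5-second polling limit are choices made for this example only.

```java
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TransferQueue;

class WaitingConsumerCountSketch {

    // Starts two consumers blocked in take() and returns the count observed.
    static int demo() throws InterruptedException {
        TransferQueue<String> queue = new LinkedTransferQueue<>();

        Runnable consumer = () -> {
            try {
                queue.take(); // blocks: the queue is empty
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        };
        Thread c1 = new Thread(consumer);
        Thread c2 = new Thread(consumer);
        c1.start();
        c2.start();

        // Poll the estimate until both consumers wait, or give up after 5 seconds.
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        int observed = queue.getWaitingConsumerCount();
        while (observed < 2 && deadline - System.nanoTime() > 0) {
            Thread.sleep(10);
            observed = queue.getWaitingConsumerCount();
        }

        // Release both consumers so the threads can finish.
        queue.put("A");
        queue.put("B");
        c1.join();
        c2.join();
        return observed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Waiting consumers observed: " + demo());
    }
}
```

The polling loop is only there to give the consumer threads time to reach take(); it is a way to observe the value, not a synchronization mechanism.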

5. hasWaitingConsumer()

boolean hasWaitingConsumer();
Returns true if at least one consumer is waiting to receive an element via the BlockingQueue.take() or BlockingQueue.poll(timeout,unit) method. The return value represents a transient state of affairs.
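The transient nature of the value can be seen by reading it before, during and after a consumer's wait. This is a minimal sketch, assuming LinkedTransferQueue as the implementation; the class name HasWaitingConsumerSketch and the 5-second polling limit are made up for illustration.

```java
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TransferQueue;

class HasWaitingConsumerSketch {

    // Returns "before,during,after" readings of hasWaitingConsumer().
    static String demo() throws InterruptedException {
        TransferQueue<String> queue = new LinkedTransferQueue<>();

        // Nobody has called take() yet.
        boolean before = queue.hasWaitingConsumer();

        Thread consumer = new Thread(() -> {
            try {
                queue.take(); // blocks: the queue is empty
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        // Give the consumer time to reach take(), up to 5 seconds.
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        while (!queue.hasWaitingConsumer() && deadline - System.nanoTime() > 0) {
            Thread.sleep(10);
        }
        boolean during = queue.hasWaitingConsumer();

        // Hand an element to the consumer and wait for it to finish.
        queue.put("A");
        consumer.join();
        boolean after = queue.hasWaitingConsumer();

        return before + "," + during + "," + after;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo()); // expected: false,true,false
    }
}
```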

6. transfer(E)

void transfer(E e) throws InterruptedException;
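Transfers the element to a consumer, waiting if necessary. If a consumer is already waiting in BlockingQueue.take() or BlockingQueue.poll(timeout,unit), the element is handed over immediately; otherwise the element is added to this TransferQueue and the method waits until a consumer receives it.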
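The blocking behaviour can be checked without relying on timing. In the sketch below, a consumer sets a flag just before calling take(); because transfer(e) cannot return until the element has been received, the flag must already be set when transfer(e) returns. LinkedTransferQueue is assumed as the implementation, and the class name TransferBlocksSketch and the 300 ms delay are made up for this example.

```java
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TransferQueue;
import java.util.concurrent.atomic.AtomicBoolean;

class TransferBlocksSketch {

    // Returns true if the consumer had arrived by the time transfer() returned.
    static boolean demo() throws InterruptedException {
        TransferQueue<String> queue = new LinkedTransferQueue<>();
        AtomicBoolean consumerArrived = new AtomicBoolean(false);

        Thread consumer = new Thread(() -> {
            try {
                Thread.sleep(300);         // the consumer is busy for a while
                consumerArrived.set(true); // set just before asking for an element
                queue.take();
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        // Blocks until the consumer's take() has received "A".
        queue.transfer("A");
        boolean arrivedBeforeReturn = consumerArrived.get();

        consumer.join();
        return arrivedBeforeReturn;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Consumer arrived before transfer returned: " + demo());
    }
}
```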

7. tryTransfer(E)

boolean tryTransfer(E e);
The tryTransfer(e) method transfers the element only if a consumer is already waiting to receive it via the BlockingQueue.take() or BlockingQueue.poll(timeout,unit) method. In that case the element is handed over immediately and the method returns true. Otherwise, the method returns false and the element is not added to the queue.
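When a rejected element must not be lost, one possible pattern is to fall back to offer(e) whenever tryTransfer(e) returns false. The helper handOffOrEnqueue below is hypothetical (it is not part of the TransferQueue API), and the sketch assumes LinkedTransferQueue, whose offer(e) always succeeds because the queue is unbounded.

```java
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TransferQueue;

class TryTransferFallbackSketch {

    // Hypothetical helper: hand the message to a waiting consumer if there is one,
    // otherwise enqueue it normally. Returns true only for a direct hand-off.
    static boolean handOffOrEnqueue(TransferQueue<String> queue, String message) {
        if (queue.tryTransfer(message)) {
            return true;
        }
        queue.offer(message);
        return false;
    }

    // Returns "firstResult,secondResult,headOfQueue".
    static String demo() throws InterruptedException {
        TransferQueue<String> queue = new LinkedTransferQueue<>();
        Thread consumer = new Thread(() -> {
            try {
                queue.take(); // waits for exactly one message
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        // Give the consumer time to reach take(), up to 5 seconds.
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        while (!queue.hasWaitingConsumer() && deadline - System.nanoTime() > 0) {
            Thread.sleep(10);
        }

        boolean first = handOffOrEnqueue(queue, "A");  // a consumer is waiting
        consumer.join();
        boolean second = handOffOrEnqueue(queue, "B"); // nobody is waiting any more

        return first + "," + second + "," + queue.peek();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo()); // expected: true,false,B
    }
}
```

The first message goes straight to the waiting consumer; the second stays in the queue for whichever consumer asks next.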

8. tryTransfer(E, long, TimeUnit)

boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException;
The tryTransfer(e,timeout,unit) method transfers the element if a consumer receives it, via BlockingQueue.take() or BlockingQueue.poll(timeout,unit), before the specified waiting time elapses. In that case the method returns true. Otherwise, the method returns false and the element is not left in the queue.
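The sketch below shows the successful case: no consumer is waiting when the call starts, but one arrives before the timeout expires. It assumes LinkedTransferQueue as the implementation; the class name TimedTryTransferSketch, the 200 ms delay and the 5-second timeout are arbitrary values chosen for illustration.

```java
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TransferQueue;
import java.util.concurrent.atomic.AtomicReference;

class TimedTryTransferSketch {

    // Returns "delivered,receivedElement".
    static String demo() throws InterruptedException {
        TransferQueue<String> queue = new LinkedTransferQueue<>();
        AtomicReference<String> received = new AtomicReference<>();

        Thread consumer = new Thread(() -> {
            try {
                Thread.sleep(200);          // the consumer is busy at first
                received.set(queue.take()); // then asks for an element
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        // Waits up to 5 seconds; the consumer shows up after about 200 ms.
        boolean delivered = queue.tryTransfer("A", 5, TimeUnit.SECONDS);

        consumer.join();
        return delivered + "," + received.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo()); // expected: true,A
    }
}
```

With a timeout shorter than the consumer's delay, the call would return false instead and the consumer would stay blocked in take(), since the element is withdrawn from the queue.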