o7planning

Java BlockingQueue Tutorial with Examples


1- BlockingQueue

BlockingQueue is a subinterface of Queue. It adds operations that wait for an element to become available when the queue is empty, and wait for space to become available when the queue is full.

public interface BlockingQueue<E> extends Queue<E>
The difference between Queue and BlockingQueue is shown by the methods they provide:
Action  | Queue<E>: throws exception | Queue<E>: special value | BlockingQueue<E>: blocks | BlockingQueue<E>: times out
Insert  | boolean add(e)             | boolean offer(e)        | void put(e)              | boolean offer(e, time, unit)
Remove  | E remove()                 | E poll()                | E take()                 | E poll(time, unit)
Examine | E element()                | E peek()                | (not applicable)         | (not applicable)
take()/poll(time,unit)
As we know, the remove(), element(), poll() and peek() methods of the Queue interface return the element at the head of the queue. If the queue is empty, they return immediately: remove() and element() throw an exception, while poll() and peek() return null. That is often not enough in a multithreaded environment, where a consumer usually wants to wait for an element, so the BlockingQueue interface adds the methods take() and poll(timeout,unit).
  • take(): Returns the head element and removes it from the queue. If the queue is empty, the method will wait until an element is available in the queue.
  • poll(timeout,unit): Returns the head element and removes it from the queue. If the queue is empty, the method will wait for an element to be available for a specified amount of time. If the timeout ends with no available elements the method will return null.
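The difference between the two methods can be seen in a minimal sketch. The class name TakePollDemo and the helper pollWithTimeout are made up for this illustration, and LinkedBlockingQueue is just one possible implementation:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class TakePollDemo {

    // Waits up to timeoutMillis for an element; returns null on timeout.
    public static String pollWithTimeout(BlockingQueue<String> queue, long timeoutMillis) {
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return null;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();

        // The queue is empty, so poll waits about 200 ms and then returns null.
        System.out.println("poll on empty queue: " + pollWithTimeout(queue, 200));

        // A helper thread adds an element after a short delay.
        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(100);
                queue.put("hello");
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        // take() blocks until the helper thread has added the element.
        System.out.println("take: " + queue.take());
        producer.join();
    }
}
```

Note that a null result from poll(timeout,unit) always means "timed out", because a BlockingQueue never contains null elements.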
put(e)/offer(e,time,unit)
The add(e) and offer(e) methods of the Queue interface are used to add an element to the queue. If the queue is full, they return immediately: add(e) throws an IllegalStateException, while offer(e) returns false. The BlockingQueue interface provides the put(e) and offer(e,timeout,unit) methods for the same purpose, but they can wait for space to become available.
  • put(e): Inserts an element into the queue. If the queue is full, this method will wait until there is available space to insert.
  • offer(e,timeout,unit): Insert an element into the queue. If the queue is full, the method will wait for an available space to insert for a specified amount of time. If the timeout ends without any available space, no action will be taken and the method returns false.
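The following sketch illustrates these insert methods on a queue with room for a single element. The class name PutOfferDemo and the helper offerWithTimeout are assumptions made for this example only:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class PutOfferDemo {

    // Tries to insert for at most timeoutMillis; returns false if no space appeared.
    public static boolean offerWithTimeout(BlockingQueue<Integer> queue, int value, long timeoutMillis) {
        try {
            return queue.offer(value, timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1);
        queue.put(1); // space is available, so put returns at once

        // The queue is now full: offer(e) fails immediately, the timed offer fails after waiting.
        System.out.println("offer(2): " + queue.offer(2));
        System.out.println("offer(2, 200 ms): " + offerWithTimeout(queue, 2, 200));

        // A helper thread frees the slot after a short delay.
        Thread consumer = new Thread(() -> {
            try {
                Thread.sleep(100);
                queue.take();
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        queue.put(3); // blocks until the helper thread has taken element 1
        consumer.join();
        System.out.println("head after put(3): " + queue.peek());
    }
}
```

Both offer calls print false, and the last line prints 3, because put(3) can only complete after element 1 has been removed.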
[Figure: the class and interface hierarchy related to the BlockingQueue interface]
The Characteristics of BlockingQueue:
  • BlockingQueue does not accept null elements. If you try to add a null element to the queue, a NullPointerException will be thrown.
  • A BlockingQueue can be limited in capacity. The remainingCapacity() method returns the remaining capacity of this queue, or Integer.MAX_VALUE if the queue is not limited in capacity.
  • BlockingQueue is commonly used in Producer & Consumer type applications.
  • BlockingQueue is a descendant of the Collection interface, so the remove(e) method is also supported. However, such methods are usually not efficient and are intended for occasional use only, for example, to remove a defective product from the queue.
  • BlockingQueue implementations are thread-safe. All queuing methods take effect atomically. However, the bulk methods inherited from the Collection interface, such as addAll, containsAll, retainAll and removeAll, are not necessarily atomic; this depends on the class implementing the BlockingQueue interface. For example, calling addAll(aCollection) could throw an exception after adding only some of the elements of aCollection.
  • BlockingQueue does not support methods like "close" or "shutdown", which the Producer could use to signal that no more "products" will be added to the queue. The need for and use of such features tend to be implementation-dependent. A common solution is for the Producer to add a final, special "product" to the queue, which tells the Consumer that no more products will follow.
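The "final special product" idea from the last point could be sketched as follows. This is only one possible approach; the class name PoisonPillDemo, the method runPipeline and the POISON_PILL marker are hypothetical names chosen for this illustration:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class PoisonPillDemo {
    // Hypothetical end-of-stream marker. A distinct String instance is created on
    // purpose so that it can be recognized by reference, never by content.
    private static final String POISON_PILL = new String("__END__");

    // Sends the items through a small queue and returns what the consumer received.
    public static List<String> runPipeline(List<String> items) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
        List<String> consumed = new ArrayList<>();

        Thread producer = new Thread(() -> {
            try {
                for (String item : items) {
                    queue.put(item);
                }
                queue.put(POISON_PILL); // signal: nothing more will be produced
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    String item = queue.take();
                    if (item == POISON_PILL) { // reference comparison on purpose
                        break;
                    }
                    consumed.add(item);
                }
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join(); // after join(), the consumer's writes are visible here
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        return consumed;
    }

    public static void main(String[] args) {
        System.out.println(runPipeline(List.of("A", "B", "C"))); // [A, B, C]
    }
}
```

With several consumers on one queue, each consumer needs its own marker, so the producer would have to add one marker per consumer.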

2- BlockingQueue methods

List of methods of the BlockingQueue<E> interface:

void put(E e) throws InterruptedException;
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
E take() throws InterruptedException;
E poll(long timeout, TimeUnit unit) throws InterruptedException;
int remainingCapacity();
int drainTo(Collection<? super E> c);  
int drainTo(Collection<? super E> c, int maxElements);
Methods that are inherited from the Queue<E> interface:

boolean add(E e);
boolean offer(E e);
E remove();
E poll();
E element();
E peek();
The methods that are inherited from the Collection<E> interface:

int size();
boolean isEmpty();
boolean contains(Object o);
Iterator<E> iterator();
Object[] toArray();
<T> T[] toArray(T[] a);
boolean add(E e);
boolean remove(Object o);
boolean containsAll(Collection<?> c);  
boolean addAll(Collection<? extends E> c);
boolean removeAll(Collection<?> c);
boolean retainAll(Collection<?> c);
void clear();  
boolean equals(Object o);
int hashCode();

default <T> T[] toArray(IntFunction<T[]> generator)  
default boolean removeIf(Predicate<? super E> filter)
default Spliterator<E> spliterator()  
default Stream<E> stream()  
default Stream<E> parallelStream() 

3- Example:

The Producer/Consumer model is a good example of using the BlockingQueue interface. Products created by producers are added to a queue before they are taken out by consumers.
  • Producer threads call the BlockingQueue.put(e) method to add products to a BlockingQueue. If the queue is full, the put(e) method will wait until there is available space.
  • Consumer threads call the BlockingQueue.take() method to retrieve products from the queue. If the queue is empty, this method will wait until a product is available.
See the full code of the example:
The Product class simulates a product.
Product.java

package org.o7planning.blockingqueue.ex;

public class Product {
    private String name;
    private int serial;

    public Product(String name, int serial) {
        this.name = name;
        this.serial = serial;
    }
    public String getInfo() {
        return "Product: " + this.name + ". Serial: " + this.serial;
    }
}
The Consumer class simulates the consumer.
Consumer.java

package org.o7planning.blockingqueue.ex;

import java.util.concurrent.BlockingQueue;

public class Consumer implements Runnable {
    private String consumerName;
    private final BlockingQueue<Product> queue;

    public Consumer(String consumerName, BlockingQueue<Product> q) {
        this.consumerName = consumerName;
        this.queue = q;
    }
    @Override
    public void run() {
        try {
            while (true) {
                this.consume(queue.take());
            }
        } catch (InterruptedException ex) {
            // Restore the interrupt status and let the thread end.
            Thread.currentThread().interrupt();
        }
    }
    private void consume(Product x) {
        System.out.println(" --> " + this.consumerName + " >> Consume: " + x.getInfo());
    }
}
The Producer class simulates the producer.
Producer.java

package org.o7planning.blockingqueue.ex;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class Producer implements Runnable {
    // Shared by all producer threads, so it must be incremented atomically.
    private static final AtomicInteger serial = new AtomicInteger(1);

    private final String producerName;
    private final BlockingQueue<Product> queue;
    private final int delay; // Seconds

    public Producer(String producerName, int delay, BlockingQueue<Product> q) {
        this.producerName = producerName;
        this.delay = delay;
        this.queue = q;
    }
    @Override
    public void run() {
        try {
            while (true) {
                Thread.sleep(this.delay * 1000L); // Sleep for 'delay' seconds.
                this.queue.put(this.produce());
            }
        } catch (InterruptedException ex) {
            // Restore the interrupt status and let the thread end.
            Thread.currentThread().interrupt();
        }
    }
    private Product produce() {
        System.out.println("#" + this.producerName + " >> Create a new product!");
        return new Product("IPhone", serial.getAndIncrement());
    }
}
The Setup class is used to operate the Producer/Consumer system:
Setup.java

package org.o7planning.blockingqueue.ex;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class Setup {
    public static void main(String[] args) {
        // Create a BlockingQueue with a capacity of 5.
        BlockingQueue<Product> q = new ArrayBlockingQueue<Product>(5);
        Producer producer1 = new Producer("Producer 01", 2, q);
        Producer producer2 = new Producer("Producer 02", 1, q);
        Consumer consumer1 = new Consumer("Consumer 01", q);
        Consumer consumer2 = new Consumer("Consumer 02", q);
        Consumer consumer3 = new Consumer("Consumer 03", q);

        // Starting the threads
        new Thread(producer1).start();
        new Thread(producer2).start();
        new Thread(consumer1).start();
        new Thread(consumer2).start();
        new Thread(consumer3).start();
    }
}
Run the above example and you will get output similar to the following (the exact order of the lines depends on thread scheduling):

#Producer 02 >> Create a new product!
 --> Consumer 03 >> Consume: Product: IPhone. Serial: 1
#Producer 01 >> Create a new product!
 --> Consumer 01 >> Consume: Product: IPhone. Serial: 2
#Producer 02 >> Create a new product!
 --> Consumer 02 >> Consume: Product: IPhone. Serial: 3
#Producer 02 >> Create a new product!
 --> Consumer 03 >> Consume: Product: IPhone. Serial: 4
#Producer 01 >> Create a new product!
 --> Consumer 01 >> Consume: Product: IPhone. Serial: 5
#Producer 02 >> Create a new product!
 --> Consumer 02 >> Consume: Product: IPhone. Serial: 6
#Producer 02 >> Create a new product!
 --> Consumer 03 >> Consume: Product: IPhone. Serial: 7
...

4- drainTo(Collection<? super E>)


int drainTo(Collection<? super E> c); 
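Removes all available elements from this BlockingQueue and adds them to the specified Collection, returning the number of elements transferred. This method does not wait for elements to arrive, and it may be more efficient than calling poll() or remove() multiple times.
The drainTo(Collection) method does not guarantee all-or-nothing behavior. If adding an element to the Collection fails, elements may end up in neither, either or both collections when the exception is thrown. Draining a queue to itself throws an IllegalArgumentException, and the behavior is undefined if the Collection is modified while the operation is in progress.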
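A minimal sketch of drainTo(Collection) in use. The class name DrainToDemo and the helper drainAll are made up for this example:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class DrainToDemo {

    // Moves everything currently in the queue into a new list, without blocking.
    public static <T> List<T> drainAll(BlockingQueue<T> queue) {
        List<T> drained = new ArrayList<>();
        queue.drainTo(drained);
        return drained;
    }

    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        queue.add(1);
        queue.add(2);
        queue.add(3);

        List<Integer> drained = drainAll(queue);
        System.out.println("drained: " + drained);            // drained: [1, 2, 3]
        System.out.println("left in queue: " + queue.size()); // left in queue: 0
    }
}
```

If the queue is empty, drainTo returns 0 immediately, so drainAll then returns an empty list.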

5- drainTo(Collection<? super E>, int)


int drainTo(Collection<? super E> c, int maxElements);
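Removes at most maxElements available elements from this BlockingQueue and adds them to the specified Collection, returning the number of elements transferred. This method does not wait for elements to arrive, and it may be more efficient than calling poll() or remove() multiple times.
As with drainTo(Collection), there is no all-or-nothing guarantee: if adding an element to the Collection fails, elements may end up in neither, either or both collections when the exception is thrown.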
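One possible use of drainTo(Collection, int) is to process elements in batches. Since drainTo never waits, the sketch below first calls take() to wait for one element and then drains whatever else is ready. The class name BatchDrainDemo and the method nextBatch are hypothetical, and maxBatchSize is assumed to be at least 1:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class BatchDrainDemo {

    // Returns between 1 and maxBatchSize elements, waiting only for the first one.
    public static <T> List<T> nextBatch(BlockingQueue<T> queue, int maxBatchSize)
            throws InterruptedException {
        List<T> batch = new ArrayList<>(maxBatchSize);
        batch.add(queue.take());                // wait until at least one element exists
        queue.drainTo(batch, maxBatchSize - 1); // add what is already there, without waiting
        return batch;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue =
                new LinkedBlockingQueue<>(Arrays.asList("a", "b", "c", "d", "e"));

        System.out.println(nextBatch(queue, 3)); // [a, b, c]
        System.out.println(nextBatch(queue, 3)); // [d, e]
    }
}
```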

6- offer(E, long, TimeUnit)


boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
Inserts the specified element into the queue. If the queue is full, the method will wait for space to become available, for at most the specified amount of time. If the timeout ends without any available space, no action will be taken and the method returns false.
Example:

queue.offer(e, 5, TimeUnit.HOURS); // 5 hours.

7- poll(long, TimeUnit)


E poll(long timeout, TimeUnit unit) throws InterruptedException;
Returns the head element and removes it from the queue. If the queue is empty, the method will wait for an element to become available, for at most the specified amount of time. If the timeout ends with no available elements, the method will return null.
Example:

E e = queue.poll(2, TimeUnit.HOURS); // Waits up to 2 hours.

8- put(E e)


void put(E e) throws InterruptedException;
Inserts an element into the queue. If the queue is full, this method will wait until there is available space to insert.

9- remainingCapacity()


int remainingCapacity();
Returns the remaining capacity of this queue, or Integer.MAX_VALUE if the queue is not limited in capacity.
The ArrayBlockingQueue class lets you create a BlockingQueue with a specified maximum number of elements.
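The sketch below compares a bounded and an unbounded queue. The class name RemainingCapacityDemo and the helper hasRoomFor are assumptions for this example; PriorityBlockingQueue is used as the unbounded queue because it has no capacity limit:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;

public class RemainingCapacityDemo {

    // Only a hint: another thread may insert or remove right after this check.
    public static boolean hasRoomFor(BlockingQueue<?> queue, int count) {
        return queue.remainingCapacity() >= count;
    }

    public static void main(String[] args) {
        BlockingQueue<String> bounded = new ArrayBlockingQueue<>(3);
        bounded.add("a");
        System.out.println(bounded.remainingCapacity()); // 2
        System.out.println(hasRoomFor(bounded, 3));      // false

        BlockingQueue<String> unbounded = new PriorityBlockingQueue<>();
        unbounded.add("a");
        System.out.println(unbounded.remainingCapacity() == Integer.MAX_VALUE); // true
    }
}
```

In a multithreaded program, the result of remainingCapacity() cannot tell you for certain whether the next insert will succeed, because other threads may change the queue in the meantime.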

10- take()


E take() throws InterruptedException;
Returns the head element and removes it from the queue. If the queue is empty, the method will wait until an element is available in the queue.