o7planning

Java PriorityBlockingQueue Tutorial with Examples


1- PriorityBlockingQueue

PriorityBlockingQueue<E> is a class that represents a queue whose elements are ordered by priority, similar to the PriorityQueue<E> class. The difference is that it implements the BlockingQueue<E> interface, so it also provides the additional features specified by that interface.
You should first read the article about the BlockingQueue interface to understand its features in more detail, with basic examples.

public class PriorityBlockingQueue<E> extends AbstractQueue<E>
             implements BlockingQueue<E>, java.io.Serializable
The characteristics of PriorityBlockingQueue:
  • The queue is unbounded: it has no capacity limit.
  • Duplicate elements are allowed, but null elements are not allowed.
  • The elements are ordered by a supplied Comparator or, if none is supplied, by their natural order (depending on the constructor used). The head of the queue is the least element in that ordering.
  • PriorityBlockingQueue and its Iterator support all the optional methods specified in the Collection and Iterator interfaces.
Internally, PriorityBlockingQueue stores its elements in an array, which is replaced by a larger array when the number of elements in the queue outgrows it.
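The characteristics listed above can be checked with a small sketch. The class and method names here (CharacteristicsDemo, growPastInitialCapacity, rejectsNull) are made up for illustration; only the PriorityBlockingQueue calls come from the JDK.

```java
import java.util.concurrent.PriorityBlockingQueue;

class CharacteristicsDemo {

    // Adds far more elements than the initial capacity and returns the final size.
    static int growPastInitialCapacity() {
        PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>(2);
        for (int i = 0; i < 50; i++) {
            queue.put(i); // does not block: the queue is unbounded
        }
        return queue.size();
    }

    // Returns true if the queue rejects null with a NullPointerException.
    static boolean rejectsNull() {
        PriorityBlockingQueue<String> queue = new PriorityBlockingQueue<>();
        try {
            queue.add(null);
            return false;
        } catch (NullPointerException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(growPastInitialCapacity()); // 50
        System.out.println(rejectsNull());             // true

        PriorityBlockingQueue<String> queue = new PriorityBlockingQueue<>();
        queue.add("Rose");
        queue.add("Rose"); // duplicates are accepted
        System.out.println(queue.size());              // 2
        // An unbounded queue always reports Integer.MAX_VALUE here.
        System.out.println(queue.remainingCapacity() == Integer.MAX_VALUE); // true
    }
}
```

Note that "unbounded" only means the queue itself imposes no limit; an insertion can still fail if the JVM runs out of memory.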
The Iterator object obtained from the iterator() method and the Spliterator object obtained from the spliterator() method do not guarantee any particular order when traversing the elements of this queue. If you need an ordered traversal, consider using Arrays.sort(queue.toArray()).
Example:
PriorityBlockingQueue_traverse_ex1.java

// Create a queue that sorts its elements in natural order.
BlockingQueue<String> queue = new PriorityBlockingQueue<>();
queue.add("Rose");
queue.add("Lotus");
queue.add("Jasmine");
queue.add("Sunflower");
queue.add("Daisy");

System.out.println(queue); // [Daisy, Jasmine, Lotus, Sunflower, Rose] (Not ordered)
String[] array = new String[queue.size()];
queue.toArray(array);
Arrays.sort(array);
for(String flower: array)  {
    System.out.println(flower);
}
Output:

[Daisy, Jasmine, Lotus, Sunflower, Rose]
Daisy
Jasmine
Lotus
Rose
Sunflower
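If the queue does not need to be kept intact, another way to see the elements in priority order is to remove them one by one, because poll() always returns the current head. The following is a minimal sketch; PollOrderDemo and drainByPriority are hypothetical names used only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

class PollOrderDemo {

    // Removes every element with poll() and returns them in removal order.
    // The queue is empty afterwards.
    static List<String> drainByPriority(PriorityBlockingQueue<String> queue) {
        List<String> result = new ArrayList<>();
        String head;
        while ((head = queue.poll()) != null) {
            result.add(head);
        }
        return result;
    }

    public static void main(String[] args) {
        PriorityBlockingQueue<String> queue = new PriorityBlockingQueue<>();
        queue.add("Rose");
        queue.add("Lotus");
        queue.add("Jasmine");
        queue.add("Sunflower");
        queue.add("Daisy");

        System.out.println(drainByPriority(queue)); // [Daisy, Jasmine, Lotus, Rose, Sunflower]
        System.out.println(queue.isEmpty());        // true
    }
}
```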

2- Constructors


public PriorityBlockingQueue()
public PriorityBlockingQueue(int initialCapacity)  
public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator)  
public PriorityBlockingQueue(Collection<? extends E> c) 
See the detailed explanation of each constructor below.

3- Example

See also the BlockingQueue article for basic examples.
The Producer/Consumer model is a good example of using the BlockingQueue interface. Products created by producers are added to a queue before they are taken by consumers. In some cases a priority queue is necessary. For example, products with a short shelf life should have a higher priority so that consumers take them sooner.
Let's see a simple example.
The Product class represents products with name and shelf life information:
Product.java

package org.o7planning.priorityblockingqueue.ex;

public class Product {
    private String name;
    private int shelfLife;

    public Product(String name, int shelfLife) {
        this.name = name;
        this.shelfLife = shelfLife;
    }  
    public int getShelfLife() {
        return shelfLife;
    }
    @Override
    public String toString() {
        return this.name + ":" + this.shelfLife;
    }
}
The ProductComparator class is used to compare Product objects. The product with the greater shelf life is considered greater. The product with the smallest shelf life will be at the head of the queue. 
ProductComparator.java

package org.o7planning.priorityblockingqueue.ex;

import java.util.Comparator;

public class ProductComparator implements Comparator<Product> {

    @Override
    public int compare(Product o1, Product o2) {
        // Integer.compare avoids the overflow that plain subtraction can cause.
        return Integer.compare(o1.getShelfLife(), o2.getShelfLife());
    }
}
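As an aside, the same "smallest shelf life first" ordering can also be expressed without a dedicated comparator class, using Comparator.comparingInt. The sketch below is self-contained, so it uses a small stand-in class named Item instead of Product; Item, ComparingIntDemo, BY_SHELF_LIFE and headOf are names invented for this illustration.

```java
import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;

class ComparingIntDemo {

    // Minimal stand-in for the Product class (illustration only).
    static final class Item {
        final String name;
        final int shelfLife;

        Item(String name, int shelfLife) {
            this.name = name;
            this.shelfLife = shelfLife;
        }
        int getShelfLife() {
            return shelfLife;
        }
        @Override
        public String toString() {
            return name + ":" + shelfLife;
        }
    }

    // Orders items by shelf life, smallest first.
    static final Comparator<Item> BY_SHELF_LIFE = Comparator.comparingInt(Item::getShelfLife);

    // Puts the items into a queue and returns the text of the head element.
    static String headOf(Item... items) {
        PriorityBlockingQueue<Item> queue = new PriorityBlockingQueue<>(11, BY_SHELF_LIFE);
        for (Item item : items) {
            queue.add(item);
        }
        return String.valueOf(queue.peek());
    }

    public static void main(String[] args) {
        String head = headOf(new Item("Banana", 5), new Item("Banana", 2), new Item("Apple", 7));
        System.out.println(head); // Banana:2
    }
}
```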
Producer.java

package org.o7planning.priorityblockingqueue.ex;

import java.util.Random;
import java.util.concurrent.BlockingQueue;

public class Producer implements Runnable {  
    private final String producerName;
    private final BlockingQueue<Product> queue;
    private final int delay; // Seconds

    public Producer(String producerName, int delay, BlockingQueue<Product> q) {
        this.producerName = producerName;
        this.delay = delay;
        this.queue = q;
    }
    @Override
    public void run() {
        try {
            while (true) {
                Thread.sleep(this.delay * 1000); // 'delay' seconds.
                Product newProduct = this.produce();
                System.out.println("\n#" + this.producerName + " >> New Product: " + newProduct);
                this.queue.put(newProduct);
                // Printed results may not be sorted (***):
                System.out.println("  Current products in queue: " + this.queue + " (***)");
            }
        } catch (InterruptedException ex) {
            // Restore the interrupt flag and let the thread finish.
            Thread.currentThread().interrupt();
        }
    }
    private Product produce() {
        int shelfLife = new Random().nextInt(3) + 3;
        return new Product("Apple", shelfLife);
    }
}
Consumer.java

package org.o7planning.priorityblockingqueue.ex;

import java.util.concurrent.BlockingQueue;

public class Consumer implements Runnable {
    private String consumerName;
    private final BlockingQueue<Product> queue;

    public Consumer(String consumerName, BlockingQueue<Product> q) {
        this.consumerName = consumerName;
        this.queue = q;
    }
    @Override
    public void run() {
        try {
            while (true) {
                this.consume(queue.take());
            }
        } catch (InterruptedException ex) {
            // Restore the interrupt flag and let the thread finish.
            Thread.currentThread().interrupt();
        }
    }
    // Consuming a product takes 1 second.
    private void consume(Product x) throws InterruptedException {
        System.out.println(" --> " + this.consumerName + " >> Consuming: " + x);
        Thread.sleep(1 * 1000); // 1 second.
    }
}
Setup.java

package org.o7planning.priorityblockingqueue.ex;

import java.util.Comparator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;

public class Setup {
    public static void main(String[] args) {
        // Comparator
        Comparator<Product> comparator = new ProductComparator();
        // Create a PriorityBlockingQueue with an initial capacity of 100 (the queue itself is unbounded).
        BlockingQueue<Product> queue = new PriorityBlockingQueue<Product>(100, comparator);
        queue.add(new Product("Banana", 5));
        queue.add(new Product("Banana", 2));
        queue.add(new Product("Banana", 7));
        queue.add(new Product("Banana", 3));
        queue.add(new Product("Banana", 1));  
        
        Producer producer1 = new Producer("Producer 01", 2, queue);
        Producer producer2 = new Producer("Producer 02", 3, queue);
        
        Consumer consumer1 = new Consumer("Consumer 01", queue);
        Consumer consumer2 = new Consumer("Consumer 02", queue);
        Consumer consumer3 = new Consumer("Consumer 03", queue);

        // Starting Producer threads
        new Thread(producer1).start();
        new Thread(producer2).start();
        // Starting Consumer threads
        new Thread(consumer1).start();
    }
}
(***) Note: In this example we use System.out.println(queue) to print all the elements of the queue. However, the printed text is built from the queue's iterator, so it does not guarantee the order of the elements.
Output:

--> Consumer 01 >> Consuming: Banana:1
--> Consumer 01 >> Consuming: Banana:2

#Producer 01 >> New Product: Apple:4
Current products in queue: [Banana:3, Apple:4, Banana:7, Banana:5] (***)
--> Consumer 01 >> Consuming: Banana:3

#Producer 02 >> New Product: Apple:3
Current products in queue: [Apple:3, Apple:4, Banana:7, Banana:5] (***)
--> Consumer 01 >> Consuming: Apple:3

#Producer 01 >> New Product: Apple:5
Current products in queue: [Apple:4, Banana:5, Banana:7, Apple:5] (***)
--> Consumer 01 >> Consuming: Apple:4
--> Consumer 01 >> Consuming: Apple:5

#Producer 02 >> New Product: Apple:4
Current products in queue: [Apple:4, Banana:7, Banana:5] (***)

#Producer 01 >> New Product: Apple:5
Current products in queue: [Apple:4, Apple:5, Banana:5, Banana:7] (***)
--> Consumer 01 >> Consuming: Apple:4
--> Consumer 01 >> Consuming: Apple:5
...

4- PriorityBlockingQueue()


public PriorityBlockingQueue()
Creates a PriorityBlockingQueue with the default initial capacity (11). The elements of this queue are ordered by their natural order, which requires every element to implement the Comparable interface.
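A minimal sketch of what the natural-order requirement means in practice: Integer implements Comparable, so it works out of the box, while a class that does not implement Comparable is rejected with a ClassCastException when it is added. NaturalOrderDemo, Plain, smallestOf and rejectsNonComparable are hypothetical names for this illustration.

```java
import java.util.concurrent.PriorityBlockingQueue;

class NaturalOrderDemo {

    // A class that does NOT implement Comparable (illustration only).
    static final class Plain {
    }

    // Adds the values to a natural-order queue and returns the head.
    static Integer smallestOf(int... values) {
        PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
        for (int v : values) {
            queue.add(v);
        }
        return queue.peek();
    }

    // Returns true if adding non-Comparable elements fails with ClassCastException.
    static boolean rejectsNonComparable() {
        PriorityBlockingQueue<Object> queue = new PriorityBlockingQueue<>();
        try {
            queue.add(new Plain());
            queue.add(new Plain());
            return false;
        } catch (ClassCastException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(smallestOf(7, 3, 9));    // 3
        System.out.println(rejectsNonComparable()); // true
    }
}
```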

5- PriorityBlockingQueue(int)


public PriorityBlockingQueue(int initialCapacity)  
Creates a PriorityBlockingQueue with the specified initial capacity. The elements of this queue are ordered by their natural order, which requires every element to implement the Comparable interface.

6- PriorityBlockingQueue(int, Comparator)


public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator) 
Creates a PriorityBlockingQueue with the specified initial capacity and the provided Comparator to order its elements. If the comparator is null, the natural order of the elements is used, which requires every element to implement the Comparable interface.
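The effect of the comparator argument can be sketched as follows: a reversed comparator puts the largest element at the head, while null falls back to the natural order. ComparatorConstructorDemo and headWith are names chosen for this illustration only.

```java
import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;

class ComparatorConstructorDemo {

    // Builds a queue with the given comparator (may be null) and returns its head.
    static Integer headWith(Comparator<Integer> comparator) {
        PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>(11, comparator);
        queue.add(5);
        queue.add(1);
        queue.add(9);
        return queue.peek();
    }

    public static void main(String[] args) {
        // Reverse order: the largest element becomes the head.
        System.out.println(headWith(Comparator.reverseOrder())); // 9
        // Null comparator: natural order, the smallest element is the head.
        System.out.println(headWith(null));                      // 1
    }
}
```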

7- PriorityBlockingQueue(Collection)


public PriorityBlockingQueue(Collection<? extends E> c)
Creates a PriorityBlockingQueue containing all the elements of the specified Collection.
If the specified Collection is a SortedSet or a PriorityQueue, this PriorityBlockingQueue uses the same ordering (the same Comparator). Otherwise, the natural order of the elements is used, which requires every element to implement the Comparable interface.
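The difference between the two cases can be illustrated with a short sketch: a TreeSet built with a reversed comparator passes that ordering on to the queue, while a plain List does not carry any ordering. CollectionConstructorDemo and its two methods are hypothetical names for this example.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.TreeSet;
import java.util.concurrent.PriorityBlockingQueue;

class CollectionConstructorDemo {

    // Source is a SortedSet with a reversed comparator: the queue reuses it.
    static String headFromReverseSortedSet() {
        TreeSet<String> sorted = new TreeSet<>(Comparator.<String>reverseOrder());
        sorted.addAll(Arrays.asList("Rose", "Lotus", "Daisy"));
        PriorityBlockingQueue<String> queue = new PriorityBlockingQueue<>(sorted);
        return queue.peek();
    }

    // Source is a plain List: the queue falls back to natural order.
    static String headFromList() {
        List<String> list = Arrays.asList("Rose", "Lotus", "Daisy");
        PriorityBlockingQueue<String> queue = new PriorityBlockingQueue<>(list);
        return queue.peek();
    }

    public static void main(String[] args) {
        System.out.println(headFromReverseSortedSet()); // Rose
        System.out.println(headFromList());             // Daisy
    }
}
```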

8- Methods

The methods that are inherited from the BlockingQueue<E> interface:

void put(E e) throws InterruptedException;
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
E take() throws InterruptedException;
E poll(long timeout, TimeUnit unit) throws InterruptedException;
int remainingCapacity();
int drainTo(Collection<? super E> c);  
int drainTo(Collection<? super E> c, int maxElements);
See the article on BlockingQueue to learn how to use the above methods.
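As a quick illustration of two of these methods on a PriorityBlockingQueue, the sketch below shows poll with a timeout on an empty queue and drainTo with a maximum number of elements. BlockingMethodsDemo, pollEmptyQueue and drainSome are names invented for this example, and the 100 ms timeout is an arbitrary choice.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;

class BlockingMethodsDemo {

    // Waits up to 100 ms on an empty queue and returns what poll gives back.
    static String pollEmptyQueue() {
        BlockingQueue<String> queue = new PriorityBlockingQueue<>();
        try {
            return queue.poll(100, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            return null;
        }
    }

    // Moves at most 'max' elements from the queue into a new list.
    static List<Integer> drainSome(BlockingQueue<Integer> queue, int max) {
        List<Integer> target = new ArrayList<>();
        queue.drainTo(target, max);
        return target;
    }

    public static void main(String[] args) {
        System.out.println(pollEmptyQueue()); // null (nothing arrived in time)

        BlockingQueue<Integer> queue = new PriorityBlockingQueue<>(Arrays.asList(4, 1, 3, 2));
        List<Integer> drained = drainSome(queue, 2);
        System.out.println(drained.size()); // 2
        System.out.println(queue.size());   // 2
    }
}
```

Because the queue is unbounded, put and the timed offer never have to wait for space; only take and the timed poll can block, and only when the queue is empty.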
Methods that are inherited from the Queue<E> interface:

boolean add(E e);
boolean offer(E e);
E remove();
E poll();
E element();
E peek();
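These six methods come in pairs that differ only in how they report an empty queue: poll() and peek() return null, while remove() and element() throw NoSuchElementException. A minimal sketch, with QueueMethodsDemo and its method names chosen for illustration:

```java
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.concurrent.PriorityBlockingQueue;

class QueueMethodsDemo {

    // poll() and peek() signal an empty queue by returning null.
    static boolean nullOnEmpty() {
        Queue<String> queue = new PriorityBlockingQueue<>();
        return queue.poll() == null && queue.peek() == null;
    }

    // remove() and element() signal an empty queue with an exception.
    static boolean throwsOnEmpty() {
        Queue<String> queue = new PriorityBlockingQueue<>();
        try {
            queue.remove();
            return false;
        } catch (NoSuchElementException expected) {
            // fall through and check element() as well
        }
        try {
            queue.element();
            return false;
        } catch (NoSuchElementException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(nullOnEmpty());   // true
        System.out.println(throwsOnEmpty()); // true
    }
}
```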
The methods that are inherited from the Collection<E> interface:

int size();
boolean isEmpty();
boolean contains(Object o);
Iterator<E> iterator();
Object[] toArray();
<T> T[] toArray(T[] a);
boolean add(E e);
boolean remove(Object o);
boolean containsAll(Collection<?> c);  
boolean addAll(Collection<? extends E> c);
boolean removeAll(Collection<?> c);
boolean retainAll(Collection<?> c);
void clear();  
boolean equals(Object o);
int hashCode();

default <T> T[] toArray(IntFunction<T[]> generator)  
default boolean removeIf(Predicate<? super E> filter)
default Spliterator<E> spliterator()  
default Stream<E> stream()  
default Stream<E> parallelStream()
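A short sketch of two of the default methods: removeIf filters the queue in place, and stream() combined with sorted() gives an ordered snapshot without removing anything. CollectionMethodsDemo and sortedSnapshot are hypothetical names. Note that sorted() without arguments uses the natural order, so a queue created with a custom Comparator would need that comparator passed to sorted(...) to get the queue's own priority order.

```java
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.stream.Collectors;

class CollectionMethodsDemo {

    // Returns the queue's elements in natural order; the queue is left unchanged.
    static List<Integer> sortedSnapshot(PriorityBlockingQueue<Integer> queue) {
        return queue.stream().sorted().collect(Collectors.toList());
    }

    public static void main(String[] args) {
        PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
        for (int n : new int[] {5, 1, 4, 2, 3}) {
            queue.add(n);
        }

        // Remove all even numbers in place.
        queue.removeIf(n -> n % 2 == 0);

        System.out.println(sortedSnapshot(queue)); // [1, 3, 5]
        System.out.println(queue.size());          // 3
    }
}
```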