openplanning

Hướng dẫn và ví dụ Java PriorityBlockingQueue

Xem thêm các chuyên mục:

Hãy theo dõi chúng tôi trên Fanpage để nhận được thông báo mỗi khi có bài viết mới. Facebook

1- PriorityBlockingQueue

PriorityBlockingQueue<E> là một lớp đại diện cho một hàng đợi với các phần tử được sắp xếp theo thứ tự ưu tiên, tương tự như lớp PriorityQueue<E>. Điều khác biệt là nó thi hành interface BlockingQueue<E>, vì vậy có thêm các tính năng được đặc tả bởi interface này.
Bạn nên xem trước bài viết về interface BlockingQueue để hiểu chi tiết về các tính năng của interface này với các ví dụ cơ bản.

public class PriorityBlockingQueue<E> extends AbstractQueue<E>
             implements BlockingQueue<E>, java.io.Serializable
Các đặc điểm của PriorityBlockingQueue:
  • Là một hàng đợi với dung lượng không bị giới hạn.
  • Cho phép các phần tử trùng lặp nhưng không cho phép các phần tử null.
  • Các phần tử được xắp xếp tăng dần theo một Comparator (bộ so sánh) được cung cấp hoặc các phần tử được sắp xếp theo thứ tự tự nhiên của chúng (Tuỳ thuộc vào constructor được sử dụng).
  • PriorityBlockingQueueIterator của nó hỗ trợ tất cả các phương thức tuỳ chọn được đặc tả trong interface CollectionIterator.
Về cơ bản, PriorityBlockingQueue quản lý một mảng nội bộ để lưu trữ các phần tử, mảng này có thể được thay thế bởi một mảng mới nếu số lượng phần tử của hàng đợi tăng thêm.
Đối tượng Iterator thu được từ phương thức iterator() và đối tượng Spliterator thu được từ phương thức spliterator() không đảm bảo rằng việc đi qua các phần tử của hàng đợi này theo một thứ tự. Nếu muốn, hãy cân nhắc sử dụng Arrays.sort(queue.toArray()).
Ví dụ:
PriorityBlockingQueue_traverse_ex1.java

// Create a queue that sorts its elements in natural order.
BlockingQueue<String> queue = new PriorityBlockingQueue<>();
queue.add("Rose");
queue.add("Lotus");
queue.add("Jasmine");
queue.add("Sunflower");
queue.add("Daisy");

System.out.println(queue); // [Daisy, Jasmine, Lotus, Sunflower, Rose] (Not ordered)
String[] array = new String[queue.size()];
queue.toArray(array);
Arrays.sort(array);
for(String flower: array)  {
    System.out.println(flower);
}
Output:

[Daisy, Jasmine, Lotus, Sunflower, Rose]
Daisy
Jasmine
Lotus
Rose
Sunflower

2- Constructors


public PriorityBlockingQueue()
public PriorityBlockingQueue(int initialCapacity)  
public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator)  
public PriorityBlockingQueue(Collection<? extends E> c) 
Xem giải thích chi tiết cho từng constructor ở phần dưới.

3- Ví dụ

Xem thêm bài viết về BockingQueue để có các ví dụ cơ bản.
Như đã biết, mô hình Producer/Consumer (Nhà sản xuất và người tiêu thụ) là một ví dụ điển hình trong việc sử dụng interface BlockingQueue. Các sản phẩm được tạo ra bởi những nhà sản xuất được thêm vào một hàng đợi trước khi chúng được lấy ra bởi những người tiêu thụ. Việc sử dụng một hàng đợi ưu tiên là cần thiết trong một vài trường hợp. Chẳng hạn, các sản phẩm với thời hạn sử dụng ngắn phải có sự ưu tiên cao hơn để sớm được tiêu thụ bởi người tiêu dùng.
Hãy xem một ví dụ đơn giản:
Lớp Product đại diện cho các sản phẩm với thông tin tên và thời hạn sử dụng:
Product.java

package org.o7planning.priorityblockingqueue.ex;

public class Product {
    private String name;
    private int shelfLife;

    public Product(String name, int shelfLife) {
        this.name = name;
        this.shelfLife = shelfLife;
    }  
    public int getShelfLife() {
        return shelfLife;
    }
    @Override
    public String toString() {
        return this.name + ":" + this.shelfLife;
    }
}
Lớp ProductComparator được sử dụng để so sánh các đối tượng Product. Sản phẩm nào có thời hạn sử dụng lớn hơn được coi là lớn hơn. Sản phẩm với thời hạn sử dụng nhỏ nhất sẽ được đứng ở đầu hàng đợi.
ProductComparator.java

package org.o7planning.priorityblockingqueue.ex;

import java.util.Comparator;

public class ProductComparator implements Comparator<Product> {

    @Override
    public int compare(Product o1, Product o2) {
        return o1.getShelfLife() - o2.getShelfLife();
    }
}
  • TODO Link?
Producer.java

package org.o7planning.priorityblockingqueue.ex;

import java.util.Random;
import java.util.concurrent.BlockingQueue;

public class Producer implements Runnable {  
    private final String producerName;
    private final BlockingQueue<Product> queue;
    private final int delay; // Seconds

    public Producer(String producerName, int delay, BlockingQueue<Product> q) {
        this.producerName = producerName;
        this.delay = delay;
        this.queue = q;
    }
    @Override
    public void run() {
        try {
            while (true) {
                Thread.sleep(this.delay * 1000); // 'delay' seconds.
                Product newProduct = this.produce();
                System.out.println("\n#" + this.producerName + " >> New Product: " + newProduct);
                this.queue.put(newProduct);
                // Printed results may not be sorted (***):
                System.out.println("  Current products in queue: " + this.queue + " (***)");
            }
        } catch (InterruptedException ex) {
        }
    }
    private Product produce() {
        int shelfLife = new Random().nextInt(3) + 3;
        return new Product("Apple", shelfLife);
    }
}
Consumer.java

package org.o7planning.priorityblockingqueue.ex;

import java.util.concurrent.BlockingQueue;

public class Consumer implements Runnable {
    private String consumerName;
    private final BlockingQueue<Product> queue;

    public Consumer(String consumerName, BlockingQueue<Product> q) {
        this.consumerName = consumerName;
        this.queue = q;
    }
    @Override
    public void run() {
        try {
            while (true) {
                this.consume(queue.take());
            }
        } catch (InterruptedException ex) {
        }
    }
    // Need 1 seconds to consume a product.
    private void consume(Product x) throws InterruptedException {
        System.out.println(" --> " + this.consumerName + " >> Consuming: " + x);
        Thread.sleep(1 * 1000); // 1 seconds.
    }
}
Setup.java

package org.o7planning.priorityblockingqueue.ex;

import java.util.Comparator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;

public class Setup {
    public static void main(String[] args) {
        // Comparator
        Comparator<Product> comparator = new ProductComparator();
        // Create a PriorityBlockingQueue with a capacity of 100.
        BlockingQueue<Product> queue = new PriorityBlockingQueue<Product>(100, comparator);
        queue.add(new Product("Banana", 5));
        queue.add(new Product("Banana", 2));
        queue.add(new Product("Banana", 7));
        queue.add(new Product("Banana", 3));
        queue.add(new Product("Banana", 1));  
        
        Producer producer1 = new Producer("Producer 01", 2, queue);
        Producer producer2 = new Producer("Producer 02", 3, queue);
        
        Consumer consumer1 = new Consumer("Consumer 01", queue);
        Consumer consumer2 = new Consumer("Consumer 02", queue);
        Consumer consumer3 = new Consumer("Consumer 03", queue);

        // Starting Producer threads
        new Thread(producer1).start();
        new Thread(producer2).start();
        // Starting Consumer threads
        new Thread(consumer1).start();
    }
}
(***) Chú ý: Trong ví dụ này chúng ta sử dụng phương thức System.out.println(queue) để in ra tất cả các phần tử của hàng đợi. Tuy nhiên, phương thức này không đảm bảo thứ tự của các phần tử.
Output:

--> Consumer 01 >> Consuming: Banana:1
--> Consumer 01 >> Consuming: Banana:2

#Producer 01 >> New Product: Apple:4
Current products in queue: [Banana:3, Apple:4, Banana:7, Banana:5] (***)
--> Consumer 01 >> Consuming: Banana:3

#Producer 02 >> New Product: Apple:3
Current products in queue: [Apple:3, Apple:4, Banana:7, Banana:5] (***)
--> Consumer 01 >> Consuming: Apple:3

#Producer 01 >> New Product: Apple:5
Current products in queue: [Apple:4, Banana:5, Banana:7, Apple:5] (***)
--> Consumer 01 >> Consuming: Apple:4
--> Consumer 01 >> Consuming: Apple:5

#Producer 02 >> New Product: Apple:4
Current products in queue: [Apple:4, Banana:7, Banana:5] (***)

#Producer 01 >> New Product: Apple:5
Current products in queue: [Apple:4, Apple:5, Banana:5, Banana:7] (***)
--> Consumer 01 >> Consuming: Apple:4
--> Consumer 01 >> Consuming: Apple:5
...

4- PriorityBlockingQueue()


public PriorityBlockingQueue()
Tạo một PriorityBlockingQueue với dung lượng ban đầu mặc định (11). Các phần tử của hàng đợi này sẽ được sắp xếp theo thứ tự tự nhiên của chúng, điều này đòi hỏi tất cả các phần tử phải là đối tượng của interface Comparable.
  • TODO Link?

5- PriorityBlockingQueue(int)


public PriorityBlockingQueue(int initialCapacity)  
Tạo một PriorityBlockingQueue với dung lượng ban đầu được chỉ định. Các phần tử của hàng đợi này sẽ được sắp xếp theo thứ tự tự nhiên của chúng, điều này đòi hỏi tất cả các phần tử phải là đối tượng của interface Comparable.
  • TODO Link?

6- PriorityBlockingQueue(int, Comparator)


public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator) 
Tạo một PriorityBlockingQueue với dung lượng ban đầu được chỉ định và Comparator được cung cấp để sắp xếp các phần tử của nó. Nếu comparatornull, thứ tự tự nhiên của các phần tử sẽ được sử dụng, điều này yêu cầu tất cả các phần tử phải là đối tượng của interface Comparable.

7- PriorityBlockingQueue(Collection)


public PriorityBlockingQueue(Collection<? extends E> c)
Tạo một PriorityBlockingQueue chứa tất cả các phần tử của Collection được chỉ định.
Nếu Collection được chỉ định này là một SortedSet hoặc PriorityQueue, PriorityBlockingQueue này sẽ sử dụng cùng một Comparator. Ngược lại, thứ tự tự nhiên của các phần tử sẽ được sử dụng, điều này yêu cầu tất cả các phần tử phải là đối tượng của interface Comparable.

8- Methods

Các phương thức được thừa kế từ interface BockingQueue<E>:

void put(E e) throws InterruptedException;
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
E take() throws InterruptedException;
E poll(long timeout, TimeUnit unit) throws InterruptedException;
int remainingCapacity();
int drainTo(Collection<? super E> c);  
int drainTo(Collection<? super E> c, int maxElements);
Xem bài viết về BlockingQueue để tìm hiểu cách sử dụng các phương thức trên.
Các phương thức được thừa kế từ interface Queue<E>:

boolean add(E e);
boolean offer(E e);
E remove();
E poll();
E element();
E peek();
Các phương thức được thừa kế từ interface Collection<E>:

int size();
boolean isEmpty();
boolean contains(Object o);
Iterator<E> iterator();
Object[] toArray();
<T> T[] toArray(T[] a);
boolean add(E e);
boolean remove(Object o);
boolean containsAll(Collection<?> c);  
boolean addAll(Collection<? extends E> c);
boolean removeAll(Collection<?> c);
boolean retainAll(Collection<?> c);
void clear();  
boolean equals(Object o);
int hashCode();

default <T> T[] toArray(IntFunction<T[]> generator)  
default boolean removeIf(Predicate<? super E> filter)
default Spliterator<E> spliterator()  
default Stream<E> stream()  
default Stream<E> parallelStream()
  • TODO Link?

Xem thêm các chuyên mục: