openplanning

Hướng dẫn và ví dụ Java BlockingQueue

  1. BlockingQueue
  2. BlockingQueue methods
  3. Ví dụ
  4. drainTo(Collection<? super E>)
  5. drainTo(Collection<? super E>, int)
  6. offer(E, long, TimeUnit)
  7. poll(long, TimeUnit)
  8. put(E e)
  9. remainingCapacity()
  10. take()

1. BlockingQueue

BlockingQueue là một interface con của Queue, nó cung cấp các hoạt động bổ xung và có ích trong tình huống hàng đợi trống hoặc đầy các phần tử.
public interface BlockingQueue<E> extends Queue<E>
Sự khác biệt giữa QueueBlockingQueue được thể hiện bởi các phương thức mà chúng cung cấp:
Interface
Queue<E>
Interface
BlockingQueue<E>
Action
Throws exception
Special value
Blocks
Times out
Insert
boolean add(e)
boolean offer(e)
void put(e)
boolean offer(e, time, unit)
Remove
E remove()
E poll()
E take()
E poll(time, unit)
Examine
E element()
E peek()
take()/poll(time,unit)
Như đã biết, các phương thức remove(), element(), poll()peek() của interface Queue trả về phần tử đứng đầu hàng đợi, chúng sẽ ngay lập tức ném ra một ngoại lệ hoặc trả về giá trị null nếu hàng đợi không có bất kỳ một phần tử nào. Các hoạt động như vậy là không đủ tốt trong môi trường đa luồng (multithreading), vì vậy interface BlockingQueue cung cấp thêm các phương thức mới là take()poll(time,unit).
  • take(): Trả về phần tử đứng đầu và loại bỏ phần tử này ra khỏi hàng đợi. Nếu hàng đợi trống, phương thức sẽ chờ đợi cho tới khi có một phần tử khả dụng trong hàng đợi.
  • poll(timeout,unit): Trả về phần tử đứng đầu và loại bỏ phần tử này ra khỏi hàng đợi. Nếu hàng đợi trống, phương thức sẽ chờ đợi có một phần tử khả dụng trong một khoảng thời gian được chỉ định. Nếu thời gian chờ đợi kết thúc mà không có phần tử khả dụng nào phương thức sẽ trả về null.
put(e)/offer(e,time,unit)
Các phương thức add(e)offer(e) của interface Queue được sử dụng để thêm một phần tử vào hàng đợi. Chúng sẽ ngay lập tức ném ra một ngoại lệ hoặc trả về false nếu hàng đợi đầy. Interface BlockingQueue cung cấp phương thức put(e)offer(e,timeout,unit) với mục đích tương tự, tuy nhiên chúng có tính năng đặc biệt hơn.
  • put(e): Chèn một phần tử vào hàng đợi. Nếu hàng đợi đầy phương thức này sẽ chờ đợi cho tới khi có một không gian khả dụng để chèn.
  • offer(e,timeout,unit): Chèn một phần tử vào hàng đợi. Nếu hàng đợi đầy phương thức sẽ chờ đợi có một không gian khả dụng để chèn, trong một khoảng thời gian được chỉ định. Nếu thời gian chờ đợi kết thúc mà không có bất kỳ một không gian khả dụng nào, sẽ không có hành động nào được thực hiện và phương thức trả về false.
Hệ thống phân cấp các lớp và interface liên quan tới interface BlockingQueue:
Các đặc điểm của BlockingQueue:
  • BlockingQueue không chấp nhận các phần tử null, nếu cố tình thêm một phần tử null vào hàng đợi này, NullPointerException sẽ được ném ra.
  • Một BlockingQueue có thể bị giới hạn về dung lượng. Phương thức remainingCapacity() trả về dung lượng còn lại của hàng đợi này, hoặc trả về Integer.MAX_VALUE nếu hàng đợi không bị giới hạn dung lượng.
  • BlockingQueue thường được sử dụng trong các ứng dụng kiểu Producer & Consumer (Nhà sản xuất và người tiêu dùng). BlockingQueue là hậu duệ của interface Collection nên phương thức remove(e) cũng được hỗ trợ. Tuy nhiên các phương thức như vậy hoạt động không thực sự hiệu quả và chỉ nhằm mục đích sử dụng không thường xuyên. Chẳng hạn, loại bỏ một sản phẩm lỗi ra khỏi hàng đợi.
  • BlockingQueue là một hàng đợi an toàn theo luồng (thread-safe). Tất cả các phương thức hàng đợi đều là các hoạt động nguyên tử (Atomic Operations). Tuy nhiên, các phương thức được thừa kế từ interface Collection như addAll, containsAll, retainAllremoveAll không nhất thiết là các hoạt động nguyên tử, điều này phụ thuộc vào lớp triển khai interface BockingQueue. Vì vậy, có thể, chẳng hạn, gọi phương addAll(aCollection) có thể ném ra ngoại lệ nếu trong cùng một thời điểm có một thread nào đó thêm một phần tử vào aCollection.
  • BlockingQueue không hỗ trợ các phương thức kiểu như "close" (đóng) hoặc "shutdown" (tắt). Chẳng hạn khi Producer (nhà sản xuất) muốn gửi đi một tín hiệu rằng sẽ không có "sản phẩm" nào được thêm vào hàng đợi nữa. Nhu cầu và việc sử dụng các tính năng này có xu hướng phụ thuộc vào việc triển khai. Giải pháp có thể là: Một "sản phẩm" cuối cùng và đặc biệt được thêm vào hàng đợi như một tín hiệu nói với Consumer (người tiêu dùng) rằng đây là sản phẩm cuối cùng được thêm vào hàng đợi.
Xem thêm:
  • Khái niệm hoạt động nguyên tử trong khoa học máy tính

2. BlockingQueue methods

Danh sách các phương thức của interface BlockingQueue<E>:
void put(E e) throws InterruptedException;
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
E take() throws InterruptedException;
E poll(long timeout, TimeUnit unit) throws InterruptedException;
int remainingCapacity();
int drainTo(Collection<? super E> c);  
int drainTo(Collection<? super E> c, int maxElements);
Các phương thức thừa kế từ interface Queue<E>:
boolean add(E e);
boolean offer(E e);
E remove();
E poll();
E element();
E peek();
Các phương thức được thừa kế từ interface Collection<E>:
int size();
boolean isEmpty();
boolean contains(Object o);
Iterator<E> iterator();
Object[] toArray();
<T> T[] toArray(T[] a);
boolean add(E e);
boolean remove(Object o);
boolean containsAll(Collection<?> c);  
boolean addAll(Collection<? extends E> c);
boolean removeAll(Collection<?> c);
boolean retainAll(Collection<?> c);
void clear();  
boolean equals(Object o);
int hashCode();

default <T> T[] toArray(IntFunction<T[]> generator)  
default boolean removeIf(Predicate<? super E> filter)
default Spliterator<E> spliterator()  
default Stream<E> stream()  
default Stream<E> parallelStream()

3. Ví dụ

Mô hình Producer/Consumer (Nhà sản xuất và người tiêu thụ) là một ví dụ điển hình trong việc sử dụng interface BlockingQueue. Các sản phẩm được tạo ra bởi những nhà sản xuất được thêm vào một hàng đợi trước khi chúng được lấy ra bởi những người tiêu thụ.
  • Các thread Producer gọi phương thức BlockingQueue.put(e) để thêm các sản phẩm vào một BlockingQueue. Nếu hàng đợi đầy, phương thức put(e) sẽ chờ đợi cho tới khi có không gian khả dụng.
  • Các thread Consumer gọi phương thức BlockingQueue.take() để lấy ra các sản phẩm từ hàng đợi, nếu hàng đợi rỗng phương thức này sẽ chờ đợi cho tới khi có sản phẩm khả dụng.
Xem code đầy đủ của ví dụ:
Lớp Product mô phỏng một sản phẩm.
Product.java
package org.o7planning.blockingqueue.ex;

public class Product {
    private String name;
    private int serial;

    public Product(String name, int serial) {
        this.name = name;
        this.serial = serial;
    }
    public String getInfo() {
        return "Product: " + this.name + ". Serial: " + this.serial;
    }
}
Lớp Consumer mô phỏng người tiêu thụ.
Consumer.java
package org.o7planning.blockingqueue.ex;

import java.util.concurrent.BlockingQueue;

public class Consumer implements Runnable {
    private String consumerName;
    private final BlockingQueue<Product> queue;

    public Consumer(String consumerName, BlockingQueue<Product> q) {
        this.consumerName = consumerName;
        this.queue = q;
    }
    @Override
    public void run() {
        try {
            while (true) {
                this.consume(queue.take());
            }
        } catch (InterruptedException ex) {
        }
    }  
    private void consume(Product x) {
        System.out.println(" --> " + this.consumerName + " >> Consume: " + x.getInfo());
    }
}
Lớp Producer mô phỏng nhà sản xuất.
Producer.java
package org.o7planning.blockingqueue.ex;

import java.util.concurrent.BlockingQueue;

public class Producer implements Runnable {
    private static int serial = 1;

    private final String producerName;
    private final BlockingQueue<Product> queue;
    private final int delay; // Seconds

    public Producer(String producerName, int delay, BlockingQueue<Product> q) {
        this.producerName = producerName;
        this.delay = delay;
        this.queue = q;
    }
    @Override
    public void run() {
        try {
            while (true) {
                Thread.sleep(this.delay * 1000); // 'delay' second.
                this.queue.put(this.produce());
            }
        } catch (InterruptedException ex) {
        }
    }
    private Product produce() {
        System.out.println("#" + this.producerName + " >> Create a new product!");
        return new Product("IPhone", serial++);
    }
}
Lớp Setup được sử dụng để vận hành hệ thống Producer/Consumer:
Setup.java
package org.o7planning.blockingqueue.ex;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class Setup {
    public static void main(String[] args) {
        // Create a BlockingQueue with a capacity of 5.
        BlockingQueue<Product> q = new ArrayBlockingQueue<Product>(5);
        Producer producer1 = new Producer("Producer 01", 2, q);
        Producer producer2 = new Producer("Producer 02", 1, q);
        Consumer consumer1 = new Consumer("Consumer 01", q);
        Consumer consumer2 = new Consumer("Consumer 02", q);
        Consumer consumer3 = new Consumer("Consumer 03", q);

        // Starting the threads
        new Thread(producer1).start();
        new Thread(producer2).start();
        new Thread(consumer1).start();
        new Thread(consumer2).start();
        new Thread(consumer3).start();
    }
}
Chạy ví dụ trên và bạn sẽ nhận được kết quả giống dưới đây:
#Producer 02 >> Create a new product!
 --> Consumer 03 >> Consume: Product: IPhone. Serial: 1
#Producer 01 >> Create a new product!
 --> Consumer 01 >> Consume: Product: IPhone. Serial: 2
#Producer 02 >> Create a new product!
 --> Consumer 02 >> Consume: Product: IPhone. Serial: 3
#Producer 02 >> Create a new product!
 --> Consumer 03 >> Consume: Product: IPhone. Serial: 4
#Producer 01 >> Create a new product!
 --> Consumer 01 >> Consume: Product: IPhone. Serial: 5
#Producer 02 >> Create a new product!
 --> Consumer 02 >> Consume: Product: IPhone. Serial: 6
#Producer 02 >> Create a new product!
 --> Consumer 03 >> Consume: Product: IPhone. Serial: 7
...

4. drainTo(Collection<? super E>)

int drainTo(Collection<? super E> c);
Loại bỏ tất cả các phần tử khỏi BlockingQueue này và thêm chúng vào một Collection được chỉ định. Sử dụng phương thức này là hiệu quả hơn so với việc gọi nhiều lần phương thức poll() hoặc remove().
Phương thức drainTo(Collection) đảm bảo rằng hoặc tất cả các phần tử sẽ được di chuyển tới Collection thành công, hoặc không phần tử nào được chuyển sang Collection nếu có lỗi xẩy ra.

5. drainTo(Collection<? super E>, int)

int drainTo(Collection<? super E> c, int maxElements);
Loại bỏ tối đa maxElements phần tử khỏi BlockingQueue này và thêm chúng vào một Collection được chỉ định. Sử dụng phương thức này là hiểu quả hơn so với việc gọi nhiều lần phương thức poll() hoặc remove().
Nếu có lỗi xẩy ra sẽ không có phần tử nào bị loại bỏ khỏi BlockingQueue này và không phần tử nào được thêm vào Collection.

6. offer(E, long, TimeUnit)

boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
Chèn một phần tử được chỉ định vào hàng đợi. Nếu hàng đợi đầy phương thức sẽ chờ đợi có không gian khả dụng để chèn, trong một khoảng thời gian được chỉ định. Nếu thời gian chờ đợi kết thúc mà không có bất kỳ một không gian khả dụng nào, sẽ không có hành động nào được thực hiện và phương thức trả về false.
Ví dụ:
queue.offer(e, 5, TimeUnit.HOURS); // 5 hours.

7. poll(long, TimeUnit)

E poll(long timeout, TimeUnit unit) throws InterruptedException;
Trả về phần tử đứng đầu và loại bỏ phần tử này ra khỏi hàng đợi. Nếu hàng đợi trống, phương thức sẽ chờ đợi có một phần tử khả dụng trong một khoảng thời gian được chỉ định. Nếu thời gian chờ đợi kết thúc mà không có phần tử khả dụng nào phương thức sẽ trả về null.
Ví dụ:
E e = queue.offer(2, TimeUnit.HOURS); // 2 hours

8. put(E e)

void put(E e) throws InterruptedException;
Chèn một phần tử vào hàng đợi. Nếu hàng đợi đầy phương thức này sẽ chờ đợi cho tới khi có một không gian khả dụng để chèn.

9. remainingCapacity()

int remainingCapacity();
Trả về dung lượng còn lại của hàng đợi này, hoặc trả về Integer.MAX_VALUE nếu hàng đợi không bị giới hạn dung lượng.
Lớp ArrayBlockingQueue cho phép tạo ra một BlockingQueue với việc chỉ định số lượng tối đa các phần tử.

10. take()

E take() throws InterruptedException;
Trả về phần tử đứng đầu và loại bỏ phần tử này ra khỏi hàng đợi. Nếu hàng đợi trống, phương thức sẽ chờ đợi cho tới khi có một phần tử khả dụng trong hàng đợi.

Các hướng dẫn Java Collections Framework

Show More