openplanning

Hướng dẫn và ví dụ Java TransferQueue

Xem thêm các chuyên mục:

Hãy theo dõi chúng tôi trên Fanpage để nhận được thông báo mỗi khi có bài viết mới. Facebook

1- TransferQueue

Là một inteface con của BlockingQueue, TransferQueue có đầy đủ đặc điểm của interface cha, ngoài ra nó cung cấp khả năng cho phép Producer (nhà sản xuất) chờ đợi cho tới khi Consumer (người tiêu dùng) nhận được "sản phẩm" (phần tử). TransferQueue có ích trong một vài kiểu ứng dụng, chẳng hạn trong các ứng dụng truyền thông điệp.

public interface TransferQueue<E> extends BlockingQueue<E>
So với BlockingQueue<E>, TransferQueue<E> cung cấp thêm một vài phương thức, chúng bao gồm:

void transfer(E e) throws InterruptedException;
boolean tryTransfer(E e);
boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException;
boolean hasWaitingConsumer();
int getWaitingConsumerCount();
transfer(e):
Thêm một phần tử vào TransferQueue này và chờ đợi cho tới khi nó được nhận bởi một người tiêu dùng đang chờ đợi thông qua phương thức BlockingQueue.take() hoặc BlockingQueue.poll(timeout,unit).
tryTransfer(e):
Phương thức tryTransfer(e) chỉ thêm một phần tử vào TransferQueue này nếu có người tiêu dùng đang chờ đợi để nhận được phần tử thông qua phương thức BlockingQueue.take() hoặc BlockingQueue.poll(timeout,unit), và chắc chắn rằng người tiêu dùng sẽ nhận được phần tử này ngay lập tức. Ngược lại, phương thức trả về false và không có bất kỳ hành động nào khác được thực hiện.
tryTransfer(e, timeout, unit):
Phương thức tryTransfer(e,timeout,unit) chỉ thêm một phần tử vào TransferQueue này nếu trong một khoảng thời gian chờ đợi được chỉ định có người tiêu dùng đang chờ đợi để nhận được phần tử này thông qua phương thức BlockingQueue.take() hoặc BlockingQueue.poll(timeout,unit), và chắc chắn rằng người tiêu dùng sẽ nhận được phần tử này. Ngược lại, phương thức trả về false và không có bất kỳ hành động nào khác được thực hiện.

2- TransferQueue methods

Các phương thức được định nghĩa trong interface TransferQueue<E>:

void transfer(E e) throws InterruptedException;
boolean tryTransfer(E e);
boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException;
boolean hasWaitingConsumer();
int getWaitingConsumerCount();
Các phương thức được thừa kế từ interface BlockingQueue<E>:

void put(E e) throws InterruptedException;
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
E take() throws InterruptedException;
E poll(long timeout, TimeUnit unit) throws InterruptedException;
int remainingCapacity();
int drainTo(Collection<? super E> c);  
int drainTo(Collection<? super E> c, int maxElements);
Các phương thức được thừa kế từ interface Queue<E>:

boolean add(E e);
boolean offer(E e);
E remove();
E poll();
E element();
E peek();
Các phương thức được thừa kế từ interface Collection<E>:

int size();
boolean isEmpty();
boolean contains(Object o);
Iterator<E> iterator();
Object[] toArray();
<T> T[] toArray(T[] a);
boolean add(E e);
boolean remove(Object o);
boolean containsAll(Collection<?> c);  
boolean addAll(Collection<? extends E> c);
boolean removeAll(Collection<?> c);
boolean retainAll(Collection<?> c);
void clear();  
boolean equals(Object o);
int hashCode();

default <T> T[] toArray(IntFunction<T[]> generator)  
default boolean removeIf(Predicate<? super E> filter)
default Spliterator<E> spliterator()  
default Stream<E> stream()  
default Stream<E> parallelStream()

3- Ví dụ

Trong ví dụ dưới đây, Producer gửi các thông điệp tới các Consumer thông qua phương thức TransferQueue.transfer(e).
Nhìn vào đầu ra của ví dụ này, bạn sẽ thấy rằng: Nếu tất cả các Consumer đang bận tiêu thụ các thông điệp (Có nghĩa là không có Consumer nào ở trạng thái chờ đợi) thì phương thức TransferQueue.transfer(e) sẽ bị chặn (block) (rơi vào trạng thái chờ).
TransferQueue_transfer_ex1.java

package org.o7planning.transferqueue.aa;

import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TransferQueue;

public class TransferQueue_transfer_ex1 {

    public static void main(String[] args) {
        TransferQueue<String> queue = new LinkedTransferQueue<String>();

        queue.add("NORMAL-MESSAGE 1");
        queue.add("NORMAL-MESSAGE 2");
        queue.add("NORMAL-MESSAGE 3");

        Producer producer1 = new Producer(queue);
        Consumer consumer1 = new Consumer(queue);
        Consumer consumer2 = new Consumer(queue);

        // Start..
        producer1.start();
        
        consumer1.start();
        consumer2.start();
    }
}

class Producer extends Thread {
    final TransferQueue<String> queue;
    private static int count = 1;

    Producer(TransferQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                String message = "IMPORTANT-MESSAGE " + count++;
                System.out.println("[PRODUCER] Transfering: " + message);
                this.queue.transfer(message);
                System.out.println("[PRODUCER] Transfered: " + message + " (**)");
            }
        } catch (InterruptedException ex) {
        }
    }
}
class Consumer extends Thread {
    final TransferQueue<String> queue;

    Consumer(TransferQueue<String> queue) {
        this.queue = queue;
    }
    @Override
    public void run() {
        try {
            while (true) {
                String message = queue.take();
                this.longConsume(message);
            }
        } catch (InterruptedException ex) {
        }
    }
    // Need 2 seconds to consume the message.
    private void longConsume(String message) throws InterruptedException  {
        System.out.println(" [CONSUMER] Consuming: " + message);
        Thread.sleep(2 * 1000); // 2 seconds.
        System.out.println(" [CONSUMER] Consumed: " + message);
    }
}
Output:

[PRODUCER] Transfering: IMPORTANT-MESSAGE 1
 [CONSUMER] Consuming: NORMAL-MESSAGE 1
 [CONSUMER] Consuming: NORMAL-MESSAGE 2
 [CONSUMER] Consumed: NORMAL-MESSAGE 1
 [CONSUMER] Consumed: NORMAL-MESSAGE 2
 [CONSUMER] Consuming: NORMAL-MESSAGE 3
 [CONSUMER] Consuming: IMPORTANT-MESSAGE 1
[PRODUCER] Transfered: IMPORTANT-MESSAGE 1 (**)
[PRODUCER] Transfering: IMPORTANT-MESSAGE 2
 [CONSUMER] Consumed: NORMAL-MESSAGE 3
 [CONSUMER] Consuming: IMPORTANT-MESSAGE 2
 [CONSUMER] Consumed: IMPORTANT-MESSAGE 1
[PRODUCER] Transfered: IMPORTANT-MESSAGE 2 (**)
[PRODUCER] Transfering: IMPORTANT-MESSAGE 3
[PRODUCER] Transfered: IMPORTANT-MESSAGE 3 (**)
[PRODUCER] Transfering: IMPORTANT-MESSAGE 4
 [CONSUMER] Consuming: IMPORTANT-MESSAGE 3
 [CONSUMER] Consumed: IMPORTANT-MESSAGE 2
 [CONSUMER] Consuming: IMPORTANT-MESSAGE 4
[PRODUCER] Transfered: IMPORTANT-MESSAGE 4 (**)
[PRODUCER] Transfering: IMPORTANT-MESSAGE 5
 [CONSUMER] Consumed: IMPORTANT-MESSAGE 3
 [CONSUMER] Consuming: IMPORTANT-MESSAGE 5
[PRODUCER] Transfered: IMPORTANT-MESSAGE 5 (**)
...
Ví dụ tiếp theo dưới đây chỉ ra cách sử dụng phương thức TransferQueue.tryTransfer(e). Trong ví dụ này Producer tạo ra các thông điệp và cố gắng chuyển chúng tới các Consumer đang chờ đợi.
Nhìn vào đầu ra của ví dụ này, bạn sẽ thấy rằng có rất nhiều thông điệp được tạo ra bởi Producer sẽ bị bỏ qua vì tại thời điểm gọi phương thức TransferQueue.tryTransfer(e) không có Consumer nào đang chờ đợi.
TransferQueue_tryTransfer_ex1.java

package org.o7planning.transferqueue.bb;

import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TransferQueue;

public class TransferQueue_tryTransfer_ex1 {

    public static void main(String[] args) {
        TransferQueue<String> queue = new LinkedTransferQueue<String>();
        queue.add("NORMAL-MESSAGE 1");
        queue.add("NORMAL-MESSAGE 2");
        queue.add("NORMAL-MESSAGE 3");

        Producer producer1 = new Producer(queue);
        Consumer consumer1 = new Consumer(queue);
        Consumer consumer2 = new Consumer(queue);

        // Start..
        producer1.start();
        consumer1.start();
        consumer2.start();
    }
}
class Producer extends Thread {
    final TransferQueue<String> queue;
    private static int count = 1;

    public Producer(TransferQueue<String> queue) {
        this.queue = queue;
    }
    @Override
    public void run() {
        try {
            while (true) {
                String message = "IMPORTANT-MESSAGE " + count++;
                this.queue.tryTransfer(message); // Calling tryTransfer method.
                Thread.sleep(1 * 1000); // 1 seconds.
            }
        } catch (InterruptedException ex) {
        }
    }
}
class Consumer extends Thread {
    final TransferQueue<String> queue;

    Consumer(TransferQueue<String> queue) {
        this.queue = queue;
    }
    @Override
    public void run() {
        try {
            while (true) {
                String message = queue.take();
                System.out.println(">> " + message);
                Thread.sleep(3 * 1000); // 3 seconds
            }
        } catch (InterruptedException ex) {
        }
    }
}
Output:

>> NORMAL-MESSAGE 1
>> NORMAL-MESSAGE 2
>> NORMAL-MESSAGE 3
>> IMPORTANT-MESSAGE 4
>> IMPORTANT-MESSAGE 7
>> IMPORTANT-MESSAGE 8
>> IMPORTANT-MESSAGE 10
>> IMPORTANT-MESSAGE 11
>> IMPORTANT-MESSAGE 13
>> IMPORTANT-MESSAGE 14
>> IMPORTANT-MESSAGE 16
>> IMPORTANT-MESSAGE 17
Các thông điệp được tạo ra bởi Producer đã bị bỏ qua:
  • IMPORTANT-MESSAGE 1
  • IMPORTANT-MESSAGE 2
  • IMPORTANT-MESSAGE 3
  • IMPORTANT-MESSAGE 5
  • IMPORTANT-MESSAGE 6
  • ...

4- getWaitingConsumerCount()


int getWaitingConsumerCount();
Trả về số ước tính các người tiêu thụ đang chờ đợi để nhận được phần tử từ TransferQueue này thông qua phương thức BlockingQueue.take() hoặc BlockingQueue.poll(timeout,unit).
Giá trị trả lại là giá trị ước tính của tình trạng tạm thời, có thể không chính xác nếu người tiêu dùng đã hoàn thành hoặc từ bỏ việc chờ đợi. Giá trị có thể hữu ích cho việc theo dõi và phỏng đoán, nhưng không hữu ích cho việc điều khiển đồng bộ hóa. Việc triển khai phương thức này có thể chậm hơn đáng kể so với phương thức hasWaitingConsumer().

5- hasWaitingConsumer()


boolean hasWaitingConsumer();
Trả về true nếu có ít nhất một người tiêu dùng đang đợi để nhận một phần tử thông qua phương thức BlockingQueue.take() hoặc BlockingQueue.poll(timeout,unit). Giá trị trả về đại diện cho một trạng thái nhất thời của sự việc.

6- transfer(E)


void transfer(E e) throws InterruptedException;
Thêm một phần tử vào TransferQueue này và chờ đợi cho tới khi nó được nhận bởi một người tiêu dùng đang chờ đợi thông qua phương thức BlockingQueue.take() hoặc BlockingQueue.poll(timeout,unit).

7- tryTransfer(E)


boolean tryTransfer(E e);
Phương thức tryTransfer(e) chỉ thêm một phần tử vào TransferQueue này nếu có người tiêu dùng đang chờ đợi để nhận được phần tử thông qua phương thức BlockingQueue.take() hoặc BlockingQueue.poll(timeout,unit), và chắc chắn rằng người tiêu dùng sẽ nhận được phần tử này ngay lập tức. Ngược lại, phương thức trả về false và không có bất kỳ hành động nào khác được thực hiện.

8- tryTransfer(E, long, TimeUnit)


boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException;
Phương thức tryTransfer(e,timeout,unit) chỉ thêm một phần tử vào TransferQueue này nếu trong một khoảng thời gian chờ đợi được chỉ định có người tiêu dùng đang chờ đợi để nhận được phần tử này thông qua phương thức BlockingQueue.take() hoặc BlockingQueue.poll(timeout,unit), và chắc chắn rằng người tiêu dùng sẽ nhận được phần tử này. Ngược lại, phương thức trả về false và không có bất kỳ hành động nào khác được thực hiện.

Xem thêm các chuyên mục: