openplanning

Hướng dẫn và ví dụ Java Pipe.SinkChannel

  1. Pipe.SinkChannel
  2. Examples

1. Pipe.SinkChannel

Giả sử bạn đang phát triển một ứng dụng Multithreading (Đa luồng), và bạn có 2 Thread độc lập là Thread-AThread-B. Câu hỏi đặt ra là:
  • Làm thế nào để mỗi khi các dữ liệu xuất hiện trên Thread-A chúng sẽ được chuyển sang Thread-B một cách tự động?
Pipe.SinkChannelPipe.SourceChannel là hai lớp được tạo ra để xử lý tình huống đề cập ở trên. Mỗi khi dữ liệu được ghi vào Pipe.SinkChannel chúng sẽ tự động xuất hiện trên Pipe.SourceChannel, điều này được gọi là hiệu ứng đường ống (pipe).
Lớp Pipe.SinkChannel là một lớp trừu tượng được định nghĩa bên trong lớp Pipe, và thi hành interface WritableByteChannelGatheringByteChannel. Nó hoạt động như một kênh ghi dữ liệu.
public abstract static class SinkChannel extends AbstractSelectableChannel
                        implements WritableByteChannel, GatheringByteChannel
Lớp Pipe.SourceChannel là một lớp trừu tượng được định nghĩa bên trong lớp Pipe, và thi hành interface ReadableByteChannelScatteringByteChannel. Nó hoạt động như một kênh đọc dữ liệu.
public abstract static class SourceChannel extends AbstractSelectableChannel
                        implements ReadableByteChannel, ScatteringByteChannel

2. Examples

Trong ví dụ này, chúng ta sẽ ghi các thông điệp (messages) vào một Pipe.SinkChannel (được điều khiển bởi ThreadA). Chúng sẽ tự động xuất hiện trên một Pipe.SourceChannel (được điều khiển bởi ThreadB).
Pipe_ex1.java
package org.o7planning.pipe.sinkchannel.ex;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;

public class Pipe_ex1 {

    public static void main(String[] args) throws IOException, InterruptedException {
        Pipe pipe = Pipe.open();

        ThreadA threadA = new ThreadA(pipe);
        ThreadB threadB = new ThreadB(pipe);

        threadA.start();
        threadB.start();
        threadA.join(); // Waits for this thread to die.
        threadB.join(); // Waits for this thread to die.
        System.out.println();
        System.out.println("Done!");
    }
}

//
class ThreadA extends Thread {
    private Pipe pipe;

    public ThreadA(Pipe pipe) {
        this.pipe = pipe;
    }

    @Override
    public void run() {
        try (Pipe.SinkChannel skChannel = this.pipe.sink();) { // try
            String[] messages = new String[] { "Hello\n", "Hi\n", "Bye\n" };

            ByteBuffer buffer = ByteBuffer.allocate(512);

            for (String msg : messages) {
                // Set position = 0; limit = capacity;
                buffer.clear();
                buffer.put(msg.getBytes("UTF-8"));
                buffer.flip();
                while (buffer.hasRemaining()) {
                    skChannel.write(buffer);
                }
                Thread.sleep(1000);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

//
class ThreadB extends Thread {
    private Pipe pipe;

    public ThreadB(Pipe pipe) {
        this.pipe = pipe;
    }

    @Override
    public void run() {
        try (Pipe.SourceChannel srcChannel = this.pipe.source();) { // try
            ByteBuffer buffer = ByteBuffer.allocate(512);

            while (srcChannel.read(buffer) != -1) {
                buffer.flip(); // Set limit = current position; position = 0;
                ByteArrayOutputStream baos = new ByteArrayOutputStream();

                while (buffer.hasRemaining()) {
                    byte b = buffer.get();
                    if (b != '\n') {
                        baos.write(b);
                    } else {
                        String s = baos.toString("UTF-8");
                        System.out.println(s);
                    }
                }
                buffer.clear(); // Set position =0; limit = capacity;
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Output: