Mục lục
Hướng dẫn và ví dụ Java Pipe.SinkChannel
Xem thêm các chuyên mục:

Là một website được viết trên công nghệ web Flutter vì vậy hỗ trợ rất tốt cho người học, kể cả những người học khó tính nhất.
Hiện tại website đang tiếp tục được cập nhập nội dung cho phong phú và đầy đủ hơn. Mong các bạn nghé thăm và ủng hộ website mới của chúng tôi.


Giả sử bạn đang phát triển một ứng dụng Multithreading (Đa luồng), và bạn có 2 Thread độc lập là Thread-A và Thread-B. Câu hỏi đặt ra là:
- Làm thế nào để mỗi khi các dữ liệu xuất hiện trên Thread-A chúng sẽ được chuyển sang Thread-B một cách tự động?

Pipe.SinkChannel và Pipe.SourceChannel là hai lớp được tạo ra để xử lý tình huống đề cập ở trên. Mỗi khi dữ liệu được ghi vào Pipe.SinkChannel chúng sẽ tự động xuất hiện trên Pipe.SourceChannel, điều này được gọi là hiệu ứng đường ống (pipe).

Lớp Pipe.SinkChannel là một lớp trừu tượng được định nghĩa bên trong lớp Pipe, và thi hành interface WritableByteChannel và GatheringByteChannel. Nó hoạt động như một kênh ghi dữ liệu.
public abstract static class SinkChannel extends AbstractSelectableChannel
implements WritableByteChannel, GatheringByteChannel
Lớp Pipe.SourceChannel là một lớp trừu tượng được định nghĩa bên trong lớp Pipe, và thi hành interface ReadableByteChannel và ScatteringByteChannel. Nó hoạt động như một kênh đọc dữ liệu.
public abstract static class SourceChannel extends AbstractSelectableChannel
implements ReadableByteChannel, ScatteringByteChannel

Trong ví dụ này, chúng ta sẽ ghi các thông điệp (messages) vào một Pipe.SinkChannel (được điều khiển bởi ThreadA). Chúng sẽ tự động xuất hiện trên một Pipe.SourceChannel (được điều khiển bởi ThreadB).

Pipe_ex1.java
package org.o7planning.pipe.sinkchannel.ex;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
public class Pipe_ex1 {
public static void main(String[] args) throws IOException, InterruptedException {
Pipe pipe = Pipe.open();
ThreadA threadA = new ThreadA(pipe);
ThreadB threadB = new ThreadB(pipe);
threadA.start();
threadB.start();
threadA.join(); // Waits for this thread to die.
threadB.join(); // Waits for this thread to die.
System.out.println();
System.out.println("Done!");
}
}
//
class ThreadA extends Thread {
private Pipe pipe;
public ThreadA(Pipe pipe) {
this.pipe = pipe;
}
@Override
public void run() {
try (Pipe.SinkChannel skChannel = this.pipe.sink();) { // try
String[] messages = new String[] { "Hello\n", "Hi\n", "Bye\n" };
ByteBuffer buffer = ByteBuffer.allocate(512);
for (String msg : messages) {
// Set position = 0; limit = capacity;
buffer.clear();
buffer.put(msg.getBytes("UTF-8"));
buffer.flip();
while (buffer.hasRemaining()) {
skChannel.write(buffer);
}
Thread.sleep(1000);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
//
class ThreadB extends Thread {
private Pipe pipe;
public ThreadB(Pipe pipe) {
this.pipe = pipe;
}
@Override
public void run() {
try (Pipe.SourceChannel srcChannel = this.pipe.source();) { // try
ByteBuffer buffer = ByteBuffer.allocate(512);
while (srcChannel.read(buffer) != -1) {
buffer.flip(); // Set limit = current position; position = 0;
ByteArrayOutputStream baos = new ByteArrayOutputStream();
while (buffer.hasRemaining()) {
byte b = buffer.get();
if (b != '\n') {
baos.write(b);
} else {
String s = baos.toString("UTF-8");
System.out.println(s);
}
}
buffer.clear(); // Set position =0; limit = capacity;
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
Output:
