openplanning

Java Awssdk S3 Tải object lên với S3TransferManager

  1. Thư viện
  2. Upload một file với S3TransferManager
  3. Upload một thư mục với S3TransferManager
  4. Upload Stream lên S3 Bucket
  5. Upload byte[], String lên S3 Bucket
  6. Upload với tính năng Pause/Resume
S3TransferManager là một tiện ích truyền tải cấp cao được xây dựng dựa trên S3Client. Nó cung cấp một API đơn giản để cho phép bạn truyền tệp và thư mục giữa ứng dụng của bạn và Amazon S3.
S3TransferManager cũng cho phép bạn theo dõi tiến trình truyền trong thời gian thực cũng như tạm dừng quá trình truyền để thực hiện sau đó.
S3TransferManager sử dụng tính năng tải lên nhiều phần (Multipart Upload), điều này có nghĩa là nội dung ban đầu sẽ được chia thành nhiều phần nhỏ và được tải lên song song. Tất cả các bộ phận được lắp ráp lại khi nhận được. Tải lên nhiều phần mang lại những lợi ích sau:
  • Thông lượng cao hơn - Vì nhiều phần được tải lên một cách song song.
  • Việc khôi phục lỗi dễ dàng hơn – Chỉ cần tải lên lại những phần bị lỗi.
  • Tạm dừng và tiếp tục tải lên - Việc tải lên có thể được tạm dừng và tiếp tục lại sau đó.
Lưu ý rằng khi sử dụng tính năng tải lên nhiều phần với S3TransferManager, mỗi phần ngoại trừ phần cuối cùng phải có kích thước tối thiểu 5MB. Được sét đặt thông qua phương thức minimumPartSizeInBytes (Xem mã ví dụ bên dưới).
MyUtils.java (*)
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.transfer.s3.S3TransferManager;
import software.amazon.awssdk.transfer.s3.SizeConstant;

public class MyUtils {

	public static S3TransferManager createS3TransferManager(Region region) {
		AwsCredentials credentials = AwsBasicCredentials.create("accessKeyId", "secretAccessKey");
		AwsCredentialsProvider credentialsProvider = StaticCredentialsProvider.create(credentials);

		S3AsyncClient s3AsyncClient = S3AsyncClient.crtBuilder() //
				.credentialsProvider(credentialsProvider) //
				.region(region) //
				.targetThroughputInGbps(20.0) //
				.minimumPartSizeInBytes(10 * SizeConstant.MB) //
				.build();

		return S3TransferManager.builder().s3Client(s3AsyncClient).build();
	}
}
Để tạo đối tượng S3TransferManager bạn cần phải tạo một đối tượng AwsCredentialsProvider, đối tượng này cung cấp thông tin xác thực (Credentials) cho phép bạn tương tác với AWS. Xem thêm bài viết dưới đây để tạo một AwsCredentialsProvider phù hợp với mục đích sử dụng của bạn.
Trong bài viết này tôi sẽ hướng dẫn bạn sử dụng S3TransferManager để tải các file hoặc thư mục lên S3 Bucket đồng thời sử dụng các tính năng mạnh mẽ của nó.

1. Thư viện

<!-- https://mvnrepository.com/artifact/software.amazon.awssdk/s3 -->
<dependency>
	<groupId>software.amazon.awssdk</groupId>
	<artifactId>s3</artifactId>
	<version>2.21.10</version>
</dependency>

<!-- https://mvnrepository.com/artifact/software.amazon.awssdk/s3-transfer-manager -->
<dependency>
	<groupId>software.amazon.awssdk</groupId>
	<artifactId>s3-transfer-manager</artifactId>
	<version>2.21.10</version>
</dependency>

<!-- https://mvnrepository.com/artifact/software.amazon.awssdk.crt/aws-crt -->
<dependency>
	<groupId>software.amazon.awssdk.crt</groupId>
	<artifactId>aws-crt</artifactId>
	<version>0.28.0</version>
</dependency>

<!-- Log Library -->
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
<dependency>
	<groupId>org.slf4j</groupId>
	<artifactId>slf4j-api</artifactId>
	<version>2.0.9</version>
</dependency>

<!-- Log Library -->
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-simple -->
<dependency>
	<groupId>org.slf4j</groupId>
	<artifactId>slf4j-simple</artifactId>
	<version>2.0.9</version>
</dependency>

2. Upload một file với S3TransferManager

Một ví dụ đơn giản sử dụng S3TransferManager tải một file lên S3 Bucket.
UploadOneFileExample1.java
package org.o7planning.java_14245_awssdk_s3;

import java.nio.file.Paths;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletionException;

import org.o7planning.java_14245_awssdk_s3.utils.MyUtils;

import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.transfer.s3.S3TransferManager;
import software.amazon.awssdk.transfer.s3.model.CompletedFileUpload;
import software.amazon.awssdk.transfer.s3.model.FileUpload;
import software.amazon.awssdk.transfer.s3.model.UploadFileRequest;
import software.amazon.awssdk.transfer.s3.progress.LoggingTransferListener;

public class UploadOneFileExample1 {
	private static final Region MY_BUCKET_REGION = Region.EU_CENTRAL_1;
	private static final String BUCKET_NAME = "my-bucket-name";

	public static void uploadFile(S3TransferManager transferManager, String bucketName, //
			String key, String filePath) {
		// Create UploadFileRequest.
		UploadFileRequest uploadFileRequest = UploadFileRequest.builder()
				.putObjectRequest(b -> b.bucket(bucketName).key(key)) // bucketName & key
				.addTransferListener(LoggingTransferListener.create()) // Listener
				.source(Paths.get(filePath)) // filePath
				.build();

		FileUpload fileUpload = transferManager.uploadFile(uploadFileRequest);
		try {
			CompletedFileUpload completedFileUpload = fileUpload //
					.completionFuture() // CompletableFuture<CompletedDirectoryUpload>
					.join(); // Wait until completed, and return value.
			//
			System.out.println("SuccessfullyCompleted");
			System.out.println(" --> " + completedFileUpload.toString());
		} catch (CancellationException e) {
			System.out.println("CancellationException");
			System.out.println(" --> (It could be by the user calling the pause method.)");
		} catch (CompletionException e) {
			System.out.println("Completed with error: " + e);
		}
	}

	public static void main(String[] args) {
		// There are several ways to create an S3TransferManager
		// @See my article 14231.
		S3TransferManager transferManager = MyUtils.createS3TransferManager(MY_BUCKET_REGION);
		//
		// Test upload a video file 11.9MB
		//
		uploadFile(transferManager, //
				BUCKET_NAME, // bucketName
				"static/videos/sample-15s.mp4", // key,
				"/Volumes/New/Test/sample-15s.mp4" // filePath
		);
	}
}
Output:
[main] INFO software.amazon.awssdk.transfer.s3.progress.LoggingTransferListener - Transfer initiated...
[main] INFO software.amazon.awssdk.transfer.s3.progress.LoggingTransferListener - |                    | 0.0%
[AwsEventLoop 8] INFO software.amazon.awssdk.transfer.s3.progress.LoggingTransferListener - |==                  | 12.0%
[AwsEventLoop 8] INFO software.amazon.awssdk.transfer.s3.progress.LoggingTransferListener - |====================| 100.0%
SuccessfullyCompleted
 --> CompletedFileUpload(response=PutObjectResponse(ETag="3a1a600c79846a4af23e4eaeb62d4f77-2", ServerSideEncryption=AES256))
[sdk-async-response-0-0] INFO software.amazon.awssdk.transfer.s3.progress.LoggingTransferListener - Transfer complete!

3. Upload một thư mục với S3TransferManager

Ví dụ đơn giản sử dụng S3TransferManager tải một thư mục lên S3 Bucket.
UploadDirectoryExample1.java
package org.o7planning.java_14245_awssdk_s3;

import java.nio.file.Paths;

import org.o7planning.java_14245_awssdk_s3.utils.MyUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.transfer.s3.S3TransferManager;
import software.amazon.awssdk.transfer.s3.model.CompletedDirectoryUpload;
import software.amazon.awssdk.transfer.s3.model.DirectoryUpload;
import software.amazon.awssdk.transfer.s3.model.UploadDirectoryRequest;

public class UploadDirectoryExample1 {
	private static Logger logger = LoggerFactory.getLogger(UploadDirectoryExample1.class);

	private static final Region MY_BUCKET_REGION = Region.EU_CENTRAL_1;
	private static final String BUCKET_NAME = "bucket-name";

	public static Integer uploadDirectory(S3TransferManager transferManager, //
			String bucketName, String sourceDirectory) {
		// Create UploadDirectoryRequest
		UploadDirectoryRequest directoryRequest = UploadDirectoryRequest.builder() //
				.bucket(bucketName) //
				.source(Paths.get(sourceDirectory)) //
				.build();
		DirectoryUpload directoryUpload = transferManager.uploadDirectory(directoryRequest);

		CompletedDirectoryUpload completedDirectoryUpload //
				= directoryUpload //
						.completionFuture() // CompletableFuture<CompletedDirectoryUpload>
						.join(); // Wait until completed, and return value.

		completedDirectoryUpload.failedTransfers()
				.forEach(fail -> logger.warn("Object [{}] failed to transfer", fail.toString()));
		//
		// Return fail count.
		//
		return completedDirectoryUpload.failedTransfers().size();
	}

	public static void main(String[] args) {
		// There are several ways to create an S3TransferManager
		// @See my article 14231.
		S3TransferManager transferManager = MyUtils.createS3TransferManager(MY_BUCKET_REGION);
		//
		// Test upload a Directory:
		//
		int failCount = uploadDirectory(transferManager, //
				BUCKET_NAME, // bucketName
				"/Volumes/New/Test/images" // sourceDirectory
		);
		System.out.println("failCount: " + failCount);
	}
}

4. Upload Stream lên S3 Bucket

Sử dụng S3TransferManager được tạo ra dựa trên S3AsyncClient bạn có thể upload dữ liệu từ các nguồn khác File, chẳng hạn byte[] hoặc Stream.
S3AsyncClient s3AsyncClient = S3AsyncClient.crtBuilder() //
		.credentialsProvider(credentialProvider) //
		.region(region) //
		.targetThroughputInGbps(20.0) //
		.minimumPartSizeInBytes(10 * SizeConstant.MB) //
		.build();

S3TransferManager transferManager= S3TransferManager.builder().s3Client(s3AsyncClient).build();
Sử dụng S3TransferManager để upload dữ liệu từ một Stream lên S3 Bucket.
UploadStreamExample1.java
package org.o7planning.java_14245_awssdk_s3;

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletionException;

import org.o7planning.java_14245_awssdk_s3.utils.MyUtils;

import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.async.BlockingInputStreamAsyncRequestBody;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.transfer.s3.S3TransferManager;
import software.amazon.awssdk.transfer.s3.model.CompletedUpload;
import software.amazon.awssdk.transfer.s3.model.Upload;
import software.amazon.awssdk.transfer.s3.model.UploadRequest;

public class UploadStreamExample1 {

	private static final Region MY_BUCKET_REGION = Region.EU_CENTRAL_1;
	private static final String BUCKET_NAME = "my-bucket-name";

	public static void uploadStream(S3TransferManager transferManager, String bucketName, String key) {
		// Stream Data to upload.
		BlockingInputStreamAsyncRequestBody body =
				// 'null' indicates a stream will be provided later.
				AsyncRequestBody.forBlockingInputStream(null);

		UploadRequest uploadRequest = UploadRequest.builder(). //
				requestBody(body) //
				.putObjectRequest(req -> req.bucket(bucketName).key(key)) // bucketName & key
				.build();

		Upload upload = transferManager.upload(uploadRequest);

		// A Stream to upload.
		InputStream inputStream = new ByteArrayInputStream("Some Data".getBytes());

		// Provide the stream of data to be uploaded.
		body.writeInputStream(inputStream);

		try {
			CompletedUpload completedFileUpload = upload //
					.completionFuture() // CompletableFuture<CompletedDirectoryUpload>
					.join(); // Wait until completed, and return value.
			//
			System.out.println("SuccessfullyCompleted");
			System.out.println(" --> " + completedFileUpload.toString());
		} catch (CancellationException e) {
			System.out.println("CancellationException");
			System.out.println(" --> (It could be by the user calling the pause method.)");
		} catch (CompletionException e) {
			System.out.println("Completed with error: " + e);
		}
	}

	public static void main(String[] args) {
		// There are several ways to create an S3TransferManager
		// @See my article 14231.
		S3TransferManager transferManager = MyUtils.createS3TransferManager(MY_BUCKET_REGION);
		//
		uploadStream(transferManager, //
				BUCKET_NAME, // bucketName
				"static/text/stream-text.txt" // key
		);
	}
}

5. Upload byte[], String lên S3 Bucket

Ví dụ, upload String hoặc byte[] lên S3 Bucket.
AsyncRequestBody requestBody1 = AsyncRequestBody.fromBytes(myByteArray);

AsyncRequestBody requestBody2 = AsyncRequestBody.fromString(myString);  

UploadRequest uploadRequest = UploadRequest.builder(). //
		requestBody(requestBody2) // AsyncRequestBody
		.putObjectRequest(req -> req.bucket(bucketName).key(key)) // bucketName & key
		.build();

Upload upload = transferManager.upload(uploadRequest);

6. Upload với tính năng Pause/Resume

Ví dụ, upload file sử dụng S3TransferManager với tính năng pause/resume (tạm dừng và tiếp tục lại).
Về bản chất, file ban đầu sẽ được chia thành nhiều phần nhỏ, các phần sẽ được tải lên. Khi người dùng tạm dừng việc upload các phần chưa upload xong sẽ bị huỷ và sẽ được upload lại từ con số 0.
ResumableFileUploadThread.java
package org.o7planning.java_14245_awssdk_s3;

import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletionException;

import software.amazon.awssdk.transfer.s3.S3TransferManager;
import software.amazon.awssdk.transfer.s3.model.CompletedFileUpload;
import software.amazon.awssdk.transfer.s3.model.FileUpload;
import software.amazon.awssdk.transfer.s3.model.ResumableFileUpload;
import software.amazon.awssdk.transfer.s3.model.UploadFileRequest;

public class ResumableFileUploadThread extends Thread {
	private S3TransferManager transferManager;
	private String bucketName;
	private String key;
	private String filePath;

	private FileUpload fileUpload;
	private boolean isPause;
	private boolean successfullyCompleted;

	public ResumableFileUploadThread(S3TransferManager transferManager, //
			String bucketName, String key, String filePath) {
		super();
		this.transferManager = transferManager;
		this.bucketName = bucketName;
		this.key = key;
		this.filePath = filePath;
	}

	private String resumableStateFile(String key) {
		return key.replace('/', '_') + "_resumableFileUpload.json";
	}

	public void pauseUpload() {
		if (fileUpload == null || isPause || successfullyCompleted) {
			return;
		}
		System.out.println("--> User calls pauseUpload!");

		// Pause the upload
		ResumableFileUpload resumableFileUpload = fileUpload.pause();
		isPause = true;
		// Optionally, persist the resumableFileUpload
		Path path = Paths.get(this.resumableStateFile(key));
		resumableFileUpload.serializeToFile(path);
	}

	public void resumeUpload() {
		if (fileUpload == null || !isPause || successfullyCompleted) {
			return;
		}
		isPause = false;
		System.out.println("--> User calls resumeUpload!");

		Path path = Paths.get(this.resumableStateFile(key));
		// Retrieve the resumableFileUpload from the file
		//
		// WARNING: Test with: Awssdk 2.21.10
		// The "TransferListener" is not restored. (Need to wait for newer version of
		// Awssdk)
		//
		ResumableFileUpload persistedResumableFileUpload = ResumableFileUpload.fromFile(path);

		// Resume the upload
		this.fileUpload = transferManager.resumeUploadFile(persistedResumableFileUpload);

		this.handleCompletionState(fileUpload);
	}

	@Override
	public void run() {
		UploadFileRequest uploadFileRequest = UploadFileRequest.builder()
				.putObjectRequest(req -> req.bucket(bucketName).key(key)) //
				.source(Paths.get(filePath)) //
				.addTransferListener(new MyTransferListener()) // Listener
				.build();

		// Initiate the transfer
		this.fileUpload = transferManager.uploadFile(uploadFileRequest);

		this.handleCompletionState(fileUpload);
	}

	private void handleCompletionState(FileUpload fileUpload) {
		try {
			CompletedFileUpload completedFileUpload = fileUpload //
					.completionFuture() // CompletableFuture<CompletedDirectoryUpload>
					.join(); // Wait until completed, and return value.
			//
			successfullyCompleted = true;
			System.out.println("handleCompletionState: SuccessfullyCompleted");
			System.out.println(" --> " + completedFileUpload.toString());
		} catch (CancellationException e) {
			System.out.println("handleCompletionState: CancellationException");
			System.out.println(" --> (It could be by the user calling the pause method.)");
		} catch (CompletionException e) {
			System.out.println("handleCompletionState: Completed with error: " + e);
		}
	}
}
Viết lớp thi hành interface TransferListener để lắng nghe trạng thái truyền dữ liệu.
MyTransferListener.java
package org.o7planning.java_14245_awssdk_s3;

import software.amazon.awssdk.transfer.s3.progress.TransferListener;
import software.amazon.awssdk.transfer.s3.progress.TransferListener.Context.BytesTransferred;
import software.amazon.awssdk.transfer.s3.progress.TransferListener.Context.TransferComplete;
import software.amazon.awssdk.transfer.s3.progress.TransferListener.Context.TransferFailed;
import software.amazon.awssdk.transfer.s3.progress.TransferListener.Context.TransferInitiated;
import software.amazon.awssdk.transfer.s3.progress.TransferProgressSnapshot;

public class MyTransferListener implements TransferListener {

	@Override
	public void bytesTransferred(BytesTransferred context) {
		TransferProgressSnapshot progress = context.progressSnapshot();
		System.out.println("MyTransferListener.bytesTransferred: " //
				+ (progress.ratioTransferred().getAsDouble() * 100) + " %");
	}

	@Override
	public void transferInitiated(TransferInitiated context) {
		TransferProgressSnapshot progress = context.progressSnapshot();
		System.out.println("MyTransferListener.transferInitiated: " //
				+ progress.ratioTransferred().getAsDouble());
	}

	@Override
	public void transferComplete(TransferComplete context) {
		TransferProgressSnapshot progress = context.progressSnapshot();
		System.out.println("MyTransferListener.bytesTransferred: " //
				+ progress.ratioTransferred().getAsDouble());
	}

	@Override
	public void transferFailed(TransferFailed context) {
		System.out.println("MyTransferListener.transferFailed (Or pause called): " //
				+ context.exception().toString());
	}
}
Viết một lớp để mô phỏng việc người dùng tạm dừng việc upload và tiếp tục lại việc upload sau đó.
ResumableFileUploadThreadTest.java
package org.o7planning.java_14245_awssdk_s3;

import java.time.Duration;

import org.o7planning.java_14245_awssdk_s3.utils.MyUtils;

import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.transfer.s3.S3TransferManager;

public class ResumableFileUploadThreadTest {
	private static final Region MY_BUCKET_REGION = Region.EU_CENTRAL_1;
	private static final String BUCKET_NAME = "my-bucket-name";

	public static void main(String[] args) {
		S3TransferManager transferManager = MyUtils.createS3TransferManager(MY_BUCKET_REGION);

		// Test with a large File...
		ResumableFileUploadThread uploadThread = new ResumableFileUploadThread(transferManager, //
				BUCKET_NAME, // bucketName
				"static/videos/sample-15s.mp4", // key
				"/Volumes/New/Test/sample-15s.mp4" // filePath
		);
		//
		uploadThread.start();

		try {
			Thread.sleep(Duration.ofSeconds(20));
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		// The User calls pauseUpload method.
		uploadThread.pauseUpload();
		try {
			Thread.sleep(Duration.ofSeconds(1));
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		// The User calls resumeUpload method.
		uploadThread.resumeUpload();
	} 
}
Output:
MyTransferListener.transferInitiated: 0.0
MyTransferListener.bytesTransferred: 12.006569699927647 %
--> User calls pauseUpload!
handleCompletionState: CancellationException
 --> (It could be by the user calling the pause method.)
MyTransferListener.transferFailed (Or pause called): java.util.concurrent.CancellationException
--> User calls resumeUpload!
handleCompletionState: SuccessfullyCompleted
 --> CompletedFileUpload(response=PutObjectResponse(ETag="3a1a600c79846a4af23e4eaeb62d4f77-2", ServerSideEncryption=AES256))
Khi người dùng tạm dừng việc upload, một file định dạng JSON có thể được lưu lại trên máy tính của người dùng, nó chứa các trạng thái hiện thời của việc upload. File này giúp khôi phục lại trạng thái upload khi người dùng muốn tiếp tục việc upload sau đó. Nội dung của nó giống như dưới đây:
Resumable Status File.
{
  "fileLength": 11916526,
  "fileLastModified": 1698260287,
  "multipartUploadId": "HM38UOktCecLrhtqIF_1jqLx",
  "partSizeInBytes": 10485760,
  "totalParts": 2,
  "transferredParts": 1,
  "uploadFileRequest": {
    "source": "/Volumes/New/Test/sample-15s.mp4",
    "putObjectRequest": {
      "Bucket": "my-bucket-name",
      "Key": "static/videos/sample-15s.mp4"
    }
  }
}

Các hướng dẫn Amazon Web Services

Show More