o7planning

Java Awssdk S3 Upload object with S3TransferManager

  1. Library
  2. Upload one file with S3TransferManager
  3. Upload one Directory with S3TransferManager
  4. Upload Stream to S3 Bucket
  5. Upload byte[], String to S3 Bucket
  6. Upload with Pause/Resume feature
S3TransferManager is a high-level transfer utility built on top of S3Client. It provides a simple API to allow you to transfer files and directories between your application and Amazon S3.
S3TransferManager also allows you to monitor transfer progress in real time, as well as pause transfers and resume them at a later time.
S3TransferManager uses the Multipart Upload feature, which means the original content will be split into many small parts and uploaded in parallel. All parts are reassembled upon receipt. Multipart uploads provide the following benefits:
  • Higher throughput - Because multiple parts are uploaded in parallel.
  • Easier error recovery – Simply re-upload the failed parts.
  • Pause and resume uploads - Uploads can be paused and resumed later.
Note that when using multipart uploads with S3TransferManager, each part except the last must be at least 5MB in size. The part size is configured with the minimumPartSizeInBytes method (see the example code below).
MyUtils.java (*)
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.transfer.s3.S3TransferManager;
import software.amazon.awssdk.transfer.s3.SizeConstant;

/**
 * Factory helper shared by the upload examples.
 */
public final class MyUtils {

	/** Target throughput passed to the CRT-based client builder, in gigabits per second. */
	private static final double TARGET_THROUGHPUT_IN_GBPS = 20.0;

	/** Minimum multipart part size passed to the CRT-based client builder (10 MB). */
	private static final long MINIMUM_PART_SIZE_IN_BYTES = 10 * SizeConstant.MB;

	private MyUtils() {
		// Static helpers only; not meant to be instantiated.
	}

	/**
	 * Creates an {@link S3TransferManager} backed by a CRT-based {@link S3AsyncClient}.
	 *
	 * @param region the AWS region the client is configured for
	 * @return a new transfer manager built on top of the newly created async client
	 */
	public static S3TransferManager createS3TransferManager(Region region) {
		// NOTE: "accessKeyId" / "secretAccessKey" are placeholders for the example. Real keys
		// should not be hard-coded in source; supply a suitable AwsCredentialsProvider instead.
		AwsCredentials credentials = AwsBasicCredentials.create("accessKeyId", "secretAccessKey");
		AwsCredentialsProvider credentialsProvider = StaticCredentialsProvider.create(credentials);

		S3AsyncClient s3AsyncClient = S3AsyncClient.crtBuilder() //
				.credentialsProvider(credentialsProvider) //
				.region(region) //
				.targetThroughputInGbps(TARGET_THROUGHPUT_IN_GBPS) //
				.minimumPartSizeInBytes(MINIMUM_PART_SIZE_IN_BYTES) //
				.build();

		return S3TransferManager.builder().s3Client(s3AsyncClient).build();
	}
}
To create an S3TransferManager object, you need to create an AwsCredentialsProvider object, which provides credentials that allow you to interact with AWS. See the article below to create an AwsCredentialsProvider suitable for your purposes.
In this article, I will show you how to use S3TransferManager to upload files or directories to S3 Bucket and use its powerful features.

1. Library

<!-- https://mvnrepository.com/artifact/software.amazon.awssdk/s3 -->
<dependency>
	<groupId>software.amazon.awssdk</groupId>
	<artifactId>s3</artifactId>
	<version>2.21.10</version>
</dependency>

<!-- https://mvnrepository.com/artifact/software.amazon.awssdk/s3-transfer-manager -->
<dependency>
	<groupId>software.amazon.awssdk</groupId>
	<artifactId>s3-transfer-manager</artifactId>
	<version>2.21.10</version>
</dependency>

<!-- https://mvnrepository.com/artifact/software.amazon.awssdk.crt/aws-crt -->
<dependency>
	<groupId>software.amazon.awssdk.crt</groupId>
	<artifactId>aws-crt</artifactId>
	<version>0.28.0</version>
</dependency>

<!-- Log Library -->
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
<dependency>
	<groupId>org.slf4j</groupId>
	<artifactId>slf4j-api</artifactId>
	<version>2.0.9</version>
</dependency>

<!-- Log Library -->
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-simple -->
<dependency>
	<groupId>org.slf4j</groupId>
	<artifactId>slf4j-simple</artifactId>
	<version>2.0.9</version>
</dependency>

2. Upload one file with S3TransferManager

A simple example using S3TransferManager to upload a file to S3 Bucket.
UploadOneFileExample1.java
package org.o7planning.java_14245_awssdk_s3;

import java.nio.file.Paths;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletionException;

import org.o7planning.java_14245_awssdk_s3.utils.MyUtils;

import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.transfer.s3.S3TransferManager;
import software.amazon.awssdk.transfer.s3.model.CompletedFileUpload;
import software.amazon.awssdk.transfer.s3.model.FileUpload;
import software.amazon.awssdk.transfer.s3.model.UploadFileRequest;
import software.amazon.awssdk.transfer.s3.progress.LoggingTransferListener;

/**
 * Uploads a single local file to an S3 bucket with {@link S3TransferManager} and prints the
 * outcome to standard output.
 */
public class UploadOneFileExample1 {
	private static final Region MY_BUCKET_REGION = Region.EU_CENTRAL_1;
	private static final String BUCKET_NAME = "my-bucket-name";

	/**
	 * Uploads the file at {@code filePath} to {@code bucketName} under {@code key} and blocks
	 * until the transfer completes, fails or is cancelled. The outcome is printed rather than
	 * thrown.
	 *
	 * @param transferManager the transfer manager used to perform the upload
	 * @param bucketName      the destination bucket
	 * @param key             the object key to create in the bucket
	 * @param filePath        path of the local file to upload
	 */
	public static void uploadFile(S3TransferManager transferManager, String bucketName, //
			String key, String filePath) {
		// Create UploadFileRequest.
		UploadFileRequest uploadFileRequest = UploadFileRequest.builder()
				.putObjectRequest(b -> b.bucket(bucketName).key(key)) // bucketName & key
				.addTransferListener(LoggingTransferListener.create()) // Logs progress
				.source(Paths.get(filePath)) // filePath
				.build();

		FileUpload fileUpload = transferManager.uploadFile(uploadFileRequest);
		try {
			CompletedFileUpload completedFileUpload = fileUpload //
					.completionFuture() // CompletableFuture<CompletedFileUpload>
					.join(); // Wait until completed, and return value.
			//
			System.out.println("SuccessfullyCompleted");
			System.out.println(" --> " + completedFileUpload.toString());
		} catch (CancellationException e) {
			System.out.println("CancellationException");
			System.out.println(" --> (It could be by the user calling the pause method.)");
		} catch (CompletionException e) {
			// join() wraps the underlying failure; printing e keeps the cause visible.
			System.out.println("Completed with error: " + e);
		}
	}

	public static void main(String[] args) {
		// There are several ways to create an S3TransferManager
		// @See my article 14231.
		//
		// S3TransferManager is closeable, so release it once the upload has finished.
		try (S3TransferManager transferManager = MyUtils.createS3TransferManager(MY_BUCKET_REGION)) {
			//
			// Test upload a video file 11.9MB
			//
			uploadFile(transferManager, //
					BUCKET_NAME, // bucketName
					"static/videos/sample-15s.mp4", // key,
					"/Volumes/New/Test/sample-15s.mp4" // filePath
			);
		}
	}
}
Output:
[main] INFO software.amazon.awssdk.transfer.s3.progress.LoggingTransferListener - Transfer initiated...
[main] INFO software.amazon.awssdk.transfer.s3.progress.LoggingTransferListener - |                    | 0.0%
[AwsEventLoop 8] INFO software.amazon.awssdk.transfer.s3.progress.LoggingTransferListener - |==                  | 12.0%
[AwsEventLoop 8] INFO software.amazon.awssdk.transfer.s3.progress.LoggingTransferListener - |====================| 100.0%
SuccessfullyCompleted
 --> CompletedFileUpload(response=PutObjectResponse(ETag="3a1a600c79846a4af23e4eaeb62d4f77-2", ServerSideEncryption=AES256))
[sdk-async-response-0-0] INFO software.amazon.awssdk.transfer.s3.progress.LoggingTransferListener - Transfer complete!

3. Upload one Directory with S3TransferManager

Simple example using S3TransferManager to upload a directory to S3 Bucket.
UploadDirectoryExample1.java
package org.o7planning.java_14245_awssdk_s3;

import java.nio.file.Paths;

import org.o7planning.java_14245_awssdk_s3.utils.MyUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.transfer.s3.S3TransferManager;
import software.amazon.awssdk.transfer.s3.model.CompletedDirectoryUpload;
import software.amazon.awssdk.transfer.s3.model.DirectoryUpload;
import software.amazon.awssdk.transfer.s3.model.UploadDirectoryRequest;

/**
 * Uploads a local directory to an S3 bucket with {@link S3TransferManager} and reports how many
 * files failed to transfer.
 */
public class UploadDirectoryExample1 {
	private static final Logger logger = LoggerFactory.getLogger(UploadDirectoryExample1.class);

	private static final Region MY_BUCKET_REGION = Region.EU_CENTRAL_1;
	private static final String BUCKET_NAME = "bucket-name";

	/**
	 * Uploads {@code sourceDirectory} to {@code bucketName} and blocks until the directory
	 * upload has finished. Each failed file transfer is logged as a warning.
	 *
	 * <p>A failure of the directory upload as a whole is not caught here: the exception thrown
	 * by {@code join()} propagates to the caller.
	 *
	 * @param transferManager the transfer manager used to perform the upload
	 * @param bucketName      the destination bucket
	 * @param sourceDirectory path of the local directory to upload
	 * @return the number of files that failed to transfer
	 */
	public static Integer uploadDirectory(S3TransferManager transferManager, //
			String bucketName, String sourceDirectory) {
		// Create UploadDirectoryRequest
		UploadDirectoryRequest directoryRequest = UploadDirectoryRequest.builder() //
				.bucket(bucketName) //
				.source(Paths.get(sourceDirectory)) //
				.build();
		DirectoryUpload directoryUpload = transferManager.uploadDirectory(directoryRequest);

		CompletedDirectoryUpload completedDirectoryUpload //
				= directoryUpload //
						.completionFuture() // CompletableFuture<CompletedDirectoryUpload>
						.join(); // Wait until completed, and return value.

		// Fetch the failure list once and use it for both logging and the count.
		var failedTransfers = completedDirectoryUpload.failedTransfers();
		failedTransfers.forEach(fail -> logger.warn("Object [{}] failed to transfer", fail));
		//
		// Return fail count.
		//
		return failedTransfers.size();
	}

	public static void main(String[] args) {
		// There are several ways to create an S3TransferManager
		// @See my article 14231.
		//
		// S3TransferManager is closeable, so release it once the upload has finished.
		try (S3TransferManager transferManager = MyUtils.createS3TransferManager(MY_BUCKET_REGION)) {
			//
			// Test upload a Directory:
			//
			int failCount = uploadDirectory(transferManager, //
					BUCKET_NAME, // bucketName
					"/Volumes/New/Test/images" // sourceDirectory
			);
			System.out.println("failCount: " + failCount);
		}
	}
}

4. Upload Stream to S3 Bucket

With an S3TransferManager built on top of an S3AsyncClient, you can also upload data from sources other than files, such as a byte[] or a stream.
// Build the CRT-based async client that the transfer manager is layered on.
// "credentialsProvider" is the AwsCredentialsProvider created as in MyUtils.
S3AsyncClient s3AsyncClient = S3AsyncClient.crtBuilder() //
		.credentialsProvider(credentialsProvider) //
		.region(region) //
		.targetThroughputInGbps(20.0) //
		.minimumPartSizeInBytes(10 * SizeConstant.MB) //
		.build();

S3TransferManager transferManager = S3TransferManager.builder().s3Client(s3AsyncClient).build();
Use S3TransferManager to upload data from a Stream to S3 Bucket.
UploadStreamExample1.java
package org.o7planning.java_14245_awssdk_s3;

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletionException;

import org.o7planning.java_14245_awssdk_s3.utils.MyUtils;

import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.async.BlockingInputStreamAsyncRequestBody;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.transfer.s3.S3TransferManager;
import software.amazon.awssdk.transfer.s3.model.CompletedUpload;
import software.amazon.awssdk.transfer.s3.model.Upload;
import software.amazon.awssdk.transfer.s3.model.UploadRequest;

/**
 * Uploads the content of an {@link InputStream} to an S3 bucket with {@link S3TransferManager}
 * and prints the outcome to standard output.
 */
public class UploadStreamExample1 {

	private static final Region MY_BUCKET_REGION = Region.EU_CENTRAL_1;
	private static final String BUCKET_NAME = "my-bucket-name";

	/**
	 * Uploads a small in-memory stream to {@code bucketName} under {@code key} and blocks until
	 * the transfer completes, fails or is cancelled. The outcome is printed rather than thrown.
	 *
	 * @param transferManager the transfer manager used to perform the upload
	 * @param bucketName      the destination bucket
	 * @param key             the object key to create in the bucket
	 */
	public static void uploadStream(S3TransferManager transferManager, String bucketName, String key) {
		// Request body that is fed from a blocking InputStream.
		BlockingInputStreamAsyncRequestBody body =
				// The argument is the content length; 'null' means the length is not known
				// in advance. The stream itself is supplied below via writeInputStream.
				AsyncRequestBody.forBlockingInputStream(null);

		UploadRequest uploadRequest = UploadRequest.builder() //
				.requestBody(body) //
				.putObjectRequest(req -> req.bucket(bucketName).key(key)) // bucketName & key
				.build();

		// Start the upload first, then hand the stream to the body.
		Upload upload = transferManager.upload(uploadRequest);

		// A Stream to upload. Encode with an explicit charset so the uploaded bytes do not
		// depend on the platform default.
		InputStream inputStream = new ByteArrayInputStream(
				"Some Data".getBytes(java.nio.charset.StandardCharsets.UTF_8));

		// Provide the stream of data to be uploaded.
		body.writeInputStream(inputStream);

		try {
			CompletedUpload completedUpload = upload //
					.completionFuture() // CompletableFuture<CompletedUpload>
					.join(); // Wait until completed, and return value.
			//
			System.out.println("SuccessfullyCompleted");
			System.out.println(" --> " + completedUpload.toString());
		} catch (CancellationException e) {
			System.out.println("CancellationException");
			System.out.println(" --> (It could be by the user calling the pause method.)");
		} catch (CompletionException e) {
			// join() wraps the underlying failure; printing e keeps the cause visible.
			System.out.println("Completed with error: " + e);
		}
	}

	public static void main(String[] args) {
		// There are several ways to create an S3TransferManager
		// @See my article 14231.
		//
		// S3TransferManager is closeable, so release it once the upload has finished.
		try (S3TransferManager transferManager = MyUtils.createS3TransferManager(MY_BUCKET_REGION)) {
			uploadStream(transferManager, //
					BUCKET_NAME, // bucketName
					"static/text/stream-text.txt" // key
			);
		}
	}
}

5. Upload byte[], String to S3 Bucket

For example, upload a String or a byte[] to an S3 bucket.
// In-memory content can be wrapped as an AsyncRequestBody:
// one body built from a byte[] and one built from a String.
AsyncRequestBody requestBody1 = AsyncRequestBody.fromBytes(myByteArray);
AsyncRequestBody requestBody2 = AsyncRequestBody.fromString(myString);

// Describe the upload: destination (bucketName & key) plus the body to send.
// Here the String-backed body is the one being uploaded.
UploadRequest uploadRequest = UploadRequest.builder()
		.putObjectRequest(req -> req.bucket(bucketName).key(key))
		.requestBody(requestBody2)
		.build();

// Hand the request to the transfer manager to start the upload.
Upload upload = transferManager.upload(uploadRequest);

6. Upload with Pause/Resume feature

For example, upload files using S3TransferManager with pause/resume feature.
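In essence, the original file is divided into many small parts, and the parts are uploaded. When the user pauses the upload, any parts that have not finished uploading are canceled; when the upload is resumed, those parts are uploaded again from the beginning, while parts that were already fully transferred are not sent again.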
ResumableFileUploadThread.java
package org.o7planning.java_14245_awssdk_s3;

import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletionException;

import software.amazon.awssdk.transfer.s3.S3TransferManager;
import software.amazon.awssdk.transfer.s3.model.CompletedFileUpload;
import software.amazon.awssdk.transfer.s3.model.FileUpload;
import software.amazon.awssdk.transfer.s3.model.ResumableFileUpload;
import software.amazon.awssdk.transfer.s3.model.UploadFileRequest;

/**
 * Thread that uploads one file with {@link S3TransferManager} and lets another thread pause
 * and later resume the upload.
 *
 * <p>{@link #run()} executes on this thread, while {@link #pauseUpload()} and
 * {@link #resumeUpload()} are called from a different thread. The state they share is
 * therefore declared {@code volatile} so that writes made by one thread are visible to the
 * other.
 */
public class ResumableFileUploadThread extends Thread {
	private final S3TransferManager transferManager;
	private final String bucketName;
	private final String key;
	private final String filePath;

	// Shared between the upload thread and the thread calling pauseUpload/resumeUpload.
	private volatile FileUpload fileUpload;
	private volatile boolean isPause;
	private volatile boolean successfullyCompleted;

	/**
	 * @param transferManager the transfer manager used to perform the upload
	 * @param bucketName      the destination bucket
	 * @param key             the object key to create in the bucket
	 * @param filePath        path of the local file to upload
	 */
	public ResumableFileUploadThread(S3TransferManager transferManager, //
			String bucketName, String key, String filePath) {
		super();
		this.transferManager = transferManager;
		this.bucketName = bucketName;
		this.key = key;
		this.filePath = filePath;
	}

	/**
	 * Derives the name of the file holding the persisted upload state from the object key:
	 * every '/' is replaced by '_' and a fixed suffix is appended.
	 */
	private String resumableStateFile(String key) {
		return key.replace('/', '_') + "_resumableFileUpload.json";
	}

	/**
	 * Pauses the running upload and persists its resumable state to a file. Does nothing when
	 * the upload has not started yet, is already paused, or has completed successfully.
	 */
	public void pauseUpload() {
		if (fileUpload == null || isPause || successfullyCompleted) {
			return;
		}
		System.out.println("--> User calls pauseUpload!");

		// Pause the upload
		ResumableFileUpload resumableFileUpload = fileUpload.pause();
		isPause = true;
		// Optionally, persist the resumableFileUpload
		Path path = Paths.get(this.resumableStateFile(key));
		resumableFileUpload.serializeToFile(path);
	}

	/**
	 * Resumes a paused upload from the persisted state file and blocks the calling thread
	 * until the resumed upload completes, fails or is cancelled. Does nothing when the upload
	 * has not started, is not paused, or has already completed successfully.
	 */
	public void resumeUpload() {
		if (fileUpload == null || !isPause || successfullyCompleted) {
			return;
		}
		isPause = false;
		System.out.println("--> User calls resumeUpload!");

		Path path = Paths.get(this.resumableStateFile(key));
		// Retrieve the resumableFileUpload from the file
		//
		// WARNING: Test with: Awssdk 2.21.10
		// The "TransferListener" is not restored. (Need to wait for newer version of
		// Awssdk)
		//
		ResumableFileUpload persistedResumableFileUpload = ResumableFileUpload.fromFile(path);

		// Resume the upload
		this.fileUpload = transferManager.resumeUploadFile(persistedResumableFileUpload);

		this.handleCompletionState(fileUpload);
	}

	/**
	 * Starts the upload and waits on this thread until it completes, fails or is cancelled
	 * (for example by {@link #pauseUpload()}).
	 */
	@Override
	public void run() {
		UploadFileRequest uploadFileRequest = UploadFileRequest.builder()
				.putObjectRequest(req -> req.bucket(bucketName).key(key)) //
				.source(Paths.get(filePath)) //
				.addTransferListener(new MyTransferListener()) // Listener
				.build();

		// Initiate the transfer
		this.fileUpload = transferManager.uploadFile(uploadFileRequest);

		this.handleCompletionState(fileUpload);
	}

	/**
	 * Blocks until the given upload finishes and prints the outcome. Marks the upload as
	 * successfully completed only when the future completes normally.
	 */
	private void handleCompletionState(FileUpload fileUpload) {
		try {
			CompletedFileUpload completedFileUpload = fileUpload //
					.completionFuture() // CompletableFuture<CompletedFileUpload>
					.join(); // Wait until completed, and return value.
			//
			successfullyCompleted = true;
			System.out.println("handleCompletionState: SuccessfullyCompleted");
			System.out.println(" --> " + completedFileUpload.toString());
		} catch (CancellationException e) {
			System.out.println("handleCompletionState: CancellationException");
			System.out.println(" --> (It could be by the user calling the pause method.)");
		} catch (CompletionException e) {
			System.out.println("handleCompletionState: Completed with error: " + e);
		}
	}
}
Write a class that implements the TransferListener interface to listen for data transfer status.
MyTransferListener.java
package org.o7planning.java_14245_awssdk_s3;

import software.amazon.awssdk.transfer.s3.progress.TransferListener;
import software.amazon.awssdk.transfer.s3.progress.TransferListener.Context.BytesTransferred;
import software.amazon.awssdk.transfer.s3.progress.TransferListener.Context.TransferComplete;
import software.amazon.awssdk.transfer.s3.progress.TransferListener.Context.TransferFailed;
import software.amazon.awssdk.transfer.s3.progress.TransferListener.Context.TransferInitiated;
import software.amazon.awssdk.transfer.s3.progress.TransferProgressSnapshot;

/**
 * {@link TransferListener} that prints each transfer event, with the current progress ratio,
 * to standard output.
 */
public class MyTransferListener implements TransferListener {

	/** Text printed in place of the ratio when the snapshot does not provide one. */
	private static final String UNKNOWN_RATIO = "unknown";

	/** Prints the progress as a percentage each time bytes are transferred. */
	@Override
	public void bytesTransferred(BytesTransferred context) {
		TransferProgressSnapshot progress = context.progressSnapshot();
		// ratioTransferred() is optional; guard it instead of calling getAsDouble() blindly,
		// which would throw when no ratio is available.
		String percent = progress.ratioTransferred().isPresent() //
				? (progress.ratioTransferred().getAsDouble() * 100) + " %" //
				: UNKNOWN_RATIO;
		System.out.println("MyTransferListener.bytesTransferred: " + percent);
	}

	/** Prints the progress ratio when the transfer is initiated. */
	@Override
	public void transferInitiated(TransferInitiated context) {
		System.out.println("MyTransferListener.transferInitiated: " //
				+ ratioText(context.progressSnapshot()));
	}

	/** Prints the progress ratio when the transfer has completed. */
	@Override
	public void transferComplete(TransferComplete context) {
		// Label corrected: this event was previously reported as "bytesTransferred".
		System.out.println("MyTransferListener.transferComplete: " //
				+ ratioText(context.progressSnapshot()));
	}

	/** Prints the exception reported for a failed transfer. */
	@Override
	public void transferFailed(TransferFailed context) {
		System.out.println("MyTransferListener.transferFailed (Or pause called): " //
				+ context.exception().toString());
	}

	/** Formats the snapshot's ratio as text, or a placeholder when no ratio is present. */
	private static String ratioText(TransferProgressSnapshot progress) {
		return progress.ratioTransferred().isPresent() //
				? String.valueOf(progress.ratioTransferred().getAsDouble()) //
				: UNKNOWN_RATIO;
	}
}
Write a class to simulate the user pausing the upload and resuming the upload later.
ResumableFileUploadThreadTest.java
package org.o7planning.java_14245_awssdk_s3;

import java.time.Duration;

import org.o7planning.java_14245_awssdk_s3.utils.MyUtils;

import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.transfer.s3.S3TransferManager;

/**
 * Simulates a user who starts an upload, pauses it after a while and resumes it shortly
 * afterwards.
 */
public class ResumableFileUploadThreadTest {
	private static final Region MY_BUCKET_REGION = Region.EU_CENTRAL_1;
	private static final String BUCKET_NAME = "my-bucket-name";

	public static void main(String[] args) {
		// S3TransferManager is closeable, so release it once the demo has finished.
		try (S3TransferManager transferManager = MyUtils.createS3TransferManager(MY_BUCKET_REGION)) {

			// Test with a large File...
			ResumableFileUploadThread uploadThread = new ResumableFileUploadThread(transferManager, //
					BUCKET_NAME, // bucketName
					"static/videos/sample-15s.mp4", // key
					"/Volumes/New/Test/sample-15s.mp4" // filePath
			);
			//
			uploadThread.start();

			try {
				Thread.sleep(Duration.ofSeconds(20));
				// The User calls pauseUpload method.
				uploadThread.pauseUpload();

				Thread.sleep(Duration.ofSeconds(1));
				// The User calls resumeUpload method.
				// (Blocks until the resumed upload has finished.)
				uploadThread.resumeUpload();

				// Make sure the upload thread has ended before the manager is closed.
				uploadThread.join();
			} catch (InterruptedException e) {
				// Restore the interrupt flag instead of swallowing the interruption.
				Thread.currentThread().interrupt();
				System.err.println("Interrupted while waiting: " + e);
			}
		}
	}
}
Output:
MyTransferListener.transferInitiated: 0.0
MyTransferListener.bytesTransferred: 12.006569699927647 %
--> User calls pauseUpload!
handleCompletionState: CancellationException
 --> (It could be by the user calling the pause method.)
MyTransferListener.transferFailed (Or pause called): java.util.concurrent.CancellationException
--> User calls resumeUpload!
handleCompletionState: SuccessfullyCompleted
 --> CompletedFileUpload(response=PutObjectResponse(ETag="3a1a600c79846a4af23e4eaeb62d4f77-2", ServerSideEncryption=AES256))
When the user pauses the upload, a JSON file can be saved on the user's computer that contains the current status of the upload. This file is used to restore the upload status when the user wants to continue uploading later. Its content looks like this:
Resumable Status File.
{
  "fileLength": 11916526,
  "fileLastModified": 1698260287,
  "multipartUploadId": "HM38UOktCecLrhtqIF_1jqLx",
  "partSizeInBytes": 10485760,
  "totalParts": 2,
  "transferredParts": 1,
  "uploadFileRequest": {
    "source": "/Volumes/New/Test/sample-15s.mp4",
    "putObjectRequest": {
      "Bucket": "my-bucket-name",
      "Key": "static/videos/sample-15s.mp4"
    }
  }
}