diff --git a/pom.xml b/pom.xml
index 8aaa95fe7..e2edeeeea 100644
--- a/pom.xml
+++ b/pom.xml
@@ -135,6 +135,13 @@
test
+
+ software.amazon.awssdk
+ s3-transfer-manager
+ true
+ 2.28.28
+
+
com.amazonaws
aws-java-sdk-kms
diff --git a/src/test/java/software/amazon/encryption/s3/S3AsyncEncryptionClientTest.java b/src/test/java/software/amazon/encryption/s3/S3AsyncEncryptionClientTest.java
index ec0e90707..19740515a 100644
--- a/src/test/java/software/amazon/encryption/s3/S3AsyncEncryptionClientTest.java
+++ b/src/test/java/software/amazon/encryption/s3/S3AsyncEncryptionClientTest.java
@@ -37,11 +37,19 @@
import software.amazon.awssdk.services.s3.model.CopyObjectResponse;
import software.amazon.awssdk.services.s3.model.DeleteObjectResponse;
import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
import software.amazon.awssdk.services.s3.model.S3Exception;
import software.amazon.awssdk.services.s3.multipart.MultipartConfiguration;
+import software.amazon.awssdk.transfer.s3.S3TransferManager;
+import software.amazon.awssdk.transfer.s3.model.CompletedDownload;
+import software.amazon.awssdk.transfer.s3.model.Download;
+import software.amazon.awssdk.transfer.s3.model.DownloadRequest;
+import software.amazon.awssdk.transfer.s3.model.Upload;
+import software.amazon.awssdk.transfer.s3.model.UploadRequest;
+import software.amazon.awssdk.transfer.s3.progress.LoggingTransferListener;
import software.amazon.encryption.s3.internal.InstructionFileConfig;
import software.amazon.encryption.s3.materials.KmsKeyring;
import software.amazon.encryption.s3.utils.BoundedInputStream;
@@ -61,6 +69,7 @@
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -141,6 +150,142 @@ public void asyncCustomConfiguration() {
s3Client.close();
}
+ @Test
+ public void transferManagerUploadString() {
+ final String objectKey = appendTestSuffix("tm-string");
+ final String input = "short test of s3 encryption client with transfer manager";
+ S3AsyncClient v3AsyncClient = S3AsyncEncryptionClient.builder()
+ .aesKey(AES_KEY)
+ .build();
+ S3TransferManager transferManager =
+ S3TransferManager.builder()
+ .s3Client(v3AsyncClient)
+ .build();
+
+ Upload upload = transferManager.upload(UploadRequest.builder()
+ .putObjectRequest((builder -> builder
+ .bucket(BUCKET)
+ .key(objectKey)
+ .build()))
+ .requestBody(AsyncRequestBody.fromString(input))
+ .build());
+ upload.completionFuture().join();
+
+ // tm download
+ Download> download = transferManager.download(DownloadRequest.builder()
+ .getObjectRequest(GetObjectRequest.builder()
+ .bucket(BUCKET)
+ .key(objectKey)
+ .build())
+ .responseTransformer(AsyncResponseTransformer.toBytes())
+ .build());
+ CompletedDownload> resp = download.completionFuture().join();
+ assertEquals(input, resp.result().asUtf8String());
+
+ // Cleanup
+ deleteObject(BUCKET, objectKey, v3AsyncClient);
+ transferManager.close();
+ }
+
+ @Test
+ public void transferManagerUploadStream() throws IOException {
+ final String objectKey = appendTestSuffix("tm-stream");
+
+ final long fileSizeLimit = 1024 * 1024 * 100;
+ final InputStream inputStream = new BoundedInputStream(fileSizeLimit);
+ final InputStream objectStreamForResult = new BoundedInputStream(fileSizeLimit);
+ final InputStream objectStreamForResultTm = new BoundedInputStream(fileSizeLimit);
+
+ S3AsyncClient v3AsyncClient = S3AsyncEncryptionClient.builder()
+ .aesKey(AES_KEY)
+ .enableDelayedAuthenticationMode(true)
+ .enableMultipartPutObject(true)
+ .build();
+ S3TransferManager transferManager =
+ S3TransferManager.builder()
+ .s3Client(v3AsyncClient)
+ .build();
+
+ ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
+ Upload upload = transferManager.upload(UploadRequest.builder()
+ .putObjectRequest((builder -> builder
+ .bucket(BUCKET)
+ .key(objectKey)
+ .build()))
+ .requestBody(AsyncRequestBody.fromInputStream(inputStream, fileSizeLimit, singleThreadExecutor))
+ .addTransferListener(LoggingTransferListener.create())
+ .build());
+ upload.completionFuture().join();
+ singleThreadExecutor.shutdown();
+
+ // tm download
+ Download> download = transferManager.download(DownloadRequest.builder()
+ .getObjectRequest(GetObjectRequest.builder()
+ .bucket(BUCKET)
+ .key(objectKey)
+ .build())
+ .responseTransformer(AsyncResponseTransformer.toBlockingInputStream())
+ .build());
+
+ CompletedDownload> resp = download.completionFuture().join();
+ assertTrue(IOUtils.contentEquals(objectStreamForResultTm, resp.result()));
+
+ // Cleanup
+ deleteObject(BUCKET, objectKey, v3AsyncClient);
+ transferManager.close();
+ }
+
+ @Test
+ public void transferManagerUploadStreamCrt() throws ExecutionException, InterruptedException, IOException {
+ final String objectKey = appendTestSuffix("tm-stream-crt");
+
+ final long fileSizeLimit = 1024 * 1024 * 100;
+ final InputStream inputStream = new BoundedInputStream(fileSizeLimit);
+ final InputStream objectStreamForResult = new BoundedInputStream(fileSizeLimit);
+ final InputStream objectStreamForResultTm = new BoundedInputStream(fileSizeLimit);
+
+ S3AsyncClient wrappedCrt = S3AsyncClient.crtBuilder()
+ .minimumPartSizeInBytes(8000000L)
+ .thresholdInBytes(500L)
+ .build();
+ S3AsyncClient v3AsyncClient = S3AsyncEncryptionClient.builder()
+ .wrappedClient(wrappedCrt)
+ .aesKey(AES_KEY)
+ .enableDelayedAuthenticationMode(true)
+ .enableMultipartPutObject(true)
+ .build();
+ S3TransferManager transferManager =
+ S3TransferManager.builder()
+ .s3Client(v3AsyncClient)
+ .build();
+
+ ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
+ Upload upload = transferManager.upload(UploadRequest.builder()
+ .putObjectRequest((builder -> builder
+ .bucket(BUCKET)
+ .key(objectKey)
+ .build()))
+ .requestBody(AsyncRequestBody.fromInputStream(inputStream, fileSizeLimit, singleThreadExecutor))
+ .addTransferListener(LoggingTransferListener.create())
+ .build());
+ upload.completionFuture().join();
+ singleThreadExecutor.shutdown();
+
+ Download> download = transferManager.download(DownloadRequest.builder()
+ .getObjectRequest(GetObjectRequest.builder()
+ .bucket(BUCKET)
+ .key(objectKey)
+ .build())
+ .responseTransformer(AsyncResponseTransformer.toBlockingInputStream())
+ .build());
+ download.completionFuture().join();
+ CompletedDownload> resp = download.completionFuture().get();
+
+ assertTrue(IOUtils.contentEquals(objectStreamForResultTm, resp.result()));
+ deleteObject(BUCKET, objectKey, v3AsyncClient);
+ transferManager.close();
+ }
+
@Test
public void asyncTopLevelConfigurationAllOptions() {
final String objectKey = appendTestSuffix("async-top-level-all-options");