Skip to content

Commit

Permalink
Fail UploadDirectory if xform throws
Browse files Browse the repository at this point in the history
This fixes an issue where if the request transformation function given
to UploadDirectoryRequest throws an error when it is invoked, the error
would be silently swallowed. Now, the completion future will be
completed exceptionally if the function throws.
  • Loading branch information
dagnir committed Dec 18, 2024
1 parent 84b5a6c commit 87c828e
Show file tree
Hide file tree
Showing 6 changed files with 122 additions and 4 deletions.
6 changes: 6 additions & 0 deletions .changes/next-release/bugfix-S3TransferManager-07e5b6b.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "bugfix",
"category": "S3 Transfer Manager",
"contributor": "",
"description": "Fix an issue where if the request transformation function given to UploadDirectoryRequest throws an error when it is invoked, the error would be silently swallowed. Now, the completion future will be completed exceptionally if the function throws."
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,17 @@ public void onSubscribe(Subscription subscription) {
@Override
public void onNext(T item) {
numRequestsInFlight.incrementAndGet();
CompletableFuture<?> currentRequest = consumer.apply(item);
CompletableFuture<?> currentRequest;

try {
currentRequest = consumer.apply(item);
} catch (Throwable t) {
log.error(() -> "Consumer function threw error. Cancelling subscription and completing future exceptionally.", t);
subscription.cancel();
onError(t);
return;
}

requestsInFlight.add(currentRequest);
currentRequest.whenComplete((r, t) -> {
checkForCompletion(numRequestsInFlight.decrementAndGet());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,15 @@ private void doUploadDirectory(CompletableFuture<CompletedDirectoryUpload> retur
iterablePublisher.subscribe(bufferingSubscriber);
CompletableFutureUtils.forwardExceptionTo(returnFuture, allOfFutures);

allOfFutures.whenComplete((r, t) -> returnFuture.complete(CompletedDirectoryUpload.builder()
.failedTransfers(failedFileUploads)
.build()));
allOfFutures.whenComplete((r, t) -> {
if (t != null) {
returnFuture.completeExceptionally(SdkClientException.create("Failed to send request", t));
return;
}
returnFuture.complete(CompletedDirectoryUpload.builder()
.failedTransfers(failedFileUploads)
.build());
});
}

private void validateDirectory(UploadDirectoryRequest uploadDirectoryRequest) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
package software.amazon.awssdk.transfer.s3.internal;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import io.reactivex.Flowable;
import io.reactivex.Observable;
Expand All @@ -36,6 +40,7 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.utils.async.SimplePublisher;

class AsyncBufferingSubscriberTest {
Expand Down Expand Up @@ -108,4 +113,21 @@ void onErrorInvoked_shouldCompleteFutureExceptionallyAndCancelRequestsFuture() {
assertThat(futures.get(0)).isCancelled();
assertThat(futures.get(1)).isCancelled();
}

@Test
public void consumerFunctionThrows_shouldCancelSubscriptionAndCompleteFutureExceptionally() {
RuntimeException exception = new RuntimeException("test");
CompletableFuture<Void> future = new CompletableFuture<>();
AsyncBufferingSubscriber<String> subscriber = new AsyncBufferingSubscriber<>(s -> {
throw exception;
}, future, 1);

Subscription mockSubscription = mock(Subscription.class);

subscriber.onSubscribe(mockSubscription);
subscriber.onNext("item");

verify(mockSubscription, times(1)).cancel();
assertThatThrownBy(future::join).hasCause(exception);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.assertj.core.util.Sets;
Expand Down Expand Up @@ -471,6 +472,56 @@ void downloadDirectory_notDirectory_shouldCompleteFutureExceptionally(FileSystem
.hasMessageContaining("is not a directory").hasCauseInstanceOf(IllegalArgumentException.class);
}

@Test
void downloadDirectory_withDownloadRequestTransformer_transformerThrows_failsDownload() {
stubSuccessfulListObjects(listObjectsHelper, "key1", "key2");

FileDownload fileDownload = newSuccessfulDownload();
FileDownload fileDownload2 = newSuccessfulDownload();

when(singleDownloadFunction.apply(any(DownloadFileRequest.class))).thenReturn(fileDownload, fileDownload2);


RuntimeException exception = new RuntimeException("boom");
Consumer<DownloadFileRequest.Builder> downloadFileRequestTransformer = b -> {
throw exception;
};

DirectoryDownload downloadDirectory =
downloadDirectoryHelper.downloadDirectory(DownloadDirectoryRequest.builder()
.destination(directory)
.bucket("bucket")
.downloadFileRequestTransformer(downloadFileRequestTransformer)
.build());

assertThatThrownBy(downloadDirectory.completionFuture()::join).getCause().hasCause(exception);
}

@Test
void downloadDirectory_withListObjectsRequestTransformer_transformerThrows_failsDownload() {
stubSuccessfulListObjects(listObjectsHelper, "key1", "key2");

FileDownload fileDownload = newSuccessfulDownload();
FileDownload fileDownload2 = newSuccessfulDownload();

when(singleDownloadFunction.apply(any(DownloadFileRequest.class))).thenReturn(fileDownload, fileDownload2);


RuntimeException exception = new RuntimeException("boom");
Consumer<ListObjectsV2Request.Builder> downloadFileRequestTransformer = b -> {
throw exception;
};

DirectoryDownload downloadDirectory =
downloadDirectoryHelper.downloadDirectory(DownloadDirectoryRequest.builder()
.destination(directory)
.bucket("bucket")
.listObjectsV2RequestTransformer(downloadFileRequestTransformer)
.build());

assertThatThrownBy(downloadDirectory.completionFuture()::join).hasCause(exception);
}

private static DefaultFileDownload completedDownload() {
return new DefaultFileDownload(CompletableFuture.completedFuture(CompletedFileDownload.builder()
.response(GetObjectResponse.builder().build())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,29 @@ void uploadDirectory_notDirectoryFollowSymlinkTrue_shouldCompleteSuccessfully()
assertThat(keys).containsOnly("2.txt");
}

@Test
public void uploadDirectory_requestTransformFunctionThrows_failsUpload() {
when(singleUploadFunction.apply(any())).thenReturn(null);

RuntimeException exception = new RuntimeException("boom");

Consumer<UploadFileRequest.Builder> uploadFileRequestTransformer = r -> {
throw exception;
};

CompletableFuture<CompletedDirectoryUpload> uploadFuture =
uploadDirectoryHelper.uploadDirectory(
UploadDirectoryRequest.builder()
.source(directory)
.bucket("bucket")
.uploadFileRequestTransformer(uploadFileRequestTransformer)
.build())
.completionFuture();

assertThatThrownBy(uploadFuture::join).getCause().hasCause(exception);
}


private DefaultFileUpload completedUpload() {
return new DefaultFileUpload(CompletableFuture.completedFuture(CompletedFileUpload.builder()
.response(PutObjectResponse.builder().build())
Expand Down

0 comments on commit 87c828e

Please sign in to comment.