Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fail UploadDirectory if xform throws #5756

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/next-release/bugfix-S3TransferManager-07e5b6b.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "bugfix",
"category": "S3 Transfer Manager",
"contributor": "",
"description": "Fix an issue where if the request transformation function given to UploadDirectoryRequest throws an error when it is invoked, the error would be silently swallowed. Now, the completion future will be completed exceptionally if the function throws."
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class AsyncBufferingSubscriber<T> implements Subscriber<T> {
private final int maxConcurrentExecutions;
private final AtomicInteger numRequestsInFlight;
private volatile boolean upstreamDone;
private Subscription subscription;
private volatile Subscription subscription;
dagnir marked this conversation as resolved.
Show resolved Hide resolved

private final Set<CompletableFuture<?>> requestsInFlight;

Expand Down Expand Up @@ -75,7 +75,18 @@ public void onSubscribe(Subscription subscription) {
@Override
public void onNext(T item) {
numRequestsInFlight.incrementAndGet();
CompletableFuture<?> currentRequest = consumer.apply(item);
CompletableFuture<?> currentRequest;

try {
currentRequest = consumer.apply(item);
} catch (Throwable t) {
synchronized (this) {
subscription.cancel();
}
onError(t);
return;
}

requestsInFlight.add(currentRequest);
currentRequest.whenComplete((r, t) -> {
checkForCompletion(numRequestsInFlight.decrementAndGet());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,15 @@ private void doUploadDirectory(CompletableFuture<CompletedDirectoryUpload> retur
iterablePublisher.subscribe(bufferingSubscriber);
CompletableFutureUtils.forwardExceptionTo(returnFuture, allOfFutures);

allOfFutures.whenComplete((r, t) -> returnFuture.complete(CompletedDirectoryUpload.builder()
.failedTransfers(failedFileUploads)
.build()));
allOfFutures.whenComplete((r, t) -> {
if (t != null) {
returnFuture.completeExceptionally(SdkClientException.create("Failed to send request", t));
dagnir marked this conversation as resolved.
Show resolved Hide resolved
return;
}
returnFuture.complete(CompletedDirectoryUpload.builder()
.failedTransfers(failedFileUploads)
.build());
});
}

private void validateDirectory(UploadDirectoryRequest uploadDirectoryRequest) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
package software.amazon.awssdk.transfer.s3.internal;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import io.reactivex.Flowable;
import io.reactivex.Observable;
Expand All @@ -36,6 +40,7 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.utils.async.SimplePublisher;

class AsyncBufferingSubscriberTest {
Expand Down Expand Up @@ -108,4 +113,21 @@ void onErrorInvoked_shouldCompleteFutureExceptionallyAndCancelRequestsFuture() {
assertThat(futures.get(0)).isCancelled();
assertThat(futures.get(1)).isCancelled();
}

@Test
public void consumerFunctionThrows_shouldCancelSubscriptionAndCompleteFutureExceptionally() {
RuntimeException exception = new RuntimeException("test");
CompletableFuture<Void> future = new CompletableFuture<>();
AsyncBufferingSubscriber<String> subscriber = new AsyncBufferingSubscriber<>(s -> {
throw exception;
}, future, 1);

Subscription mockSubscription = mock(Subscription.class);

subscriber.onSubscribe(mockSubscription);
subscriber.onNext("item");

verify(mockSubscription, times(1)).cancel();
assertThatThrownBy(future::join).hasCause(exception);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.assertj.core.util.Sets;
Expand Down Expand Up @@ -471,6 +472,56 @@ void downloadDirectory_notDirectory_shouldCompleteFutureExceptionally(FileSystem
.hasMessageContaining("is not a directory").hasCauseInstanceOf(IllegalArgumentException.class);
}

@Test
void downloadDirectory_withDownloadRequestTransformer_transformerThrows_failsDownload() {
stubSuccessfulListObjects(listObjectsHelper, "key1", "key2");

FileDownload fileDownload = newSuccessfulDownload();
FileDownload fileDownload2 = newSuccessfulDownload();

when(singleDownloadFunction.apply(any(DownloadFileRequest.class))).thenReturn(fileDownload, fileDownload2);


RuntimeException exception = new RuntimeException("boom");
Consumer<DownloadFileRequest.Builder> downloadFileRequestTransformer = b -> {
throw exception;
};

DirectoryDownload downloadDirectory =
downloadDirectoryHelper.downloadDirectory(DownloadDirectoryRequest.builder()
.destination(directory)
.bucket("bucket")
.downloadFileRequestTransformer(downloadFileRequestTransformer)
.build());

assertThatThrownBy(downloadDirectory.completionFuture()::join).getCause().hasCause(exception);
}

@Test
void downloadDirectory_withListObjectsRequestTransformer_transformerThrows_failsDownload() {
stubSuccessfulListObjects(listObjectsHelper, "key1", "key2");

FileDownload fileDownload = newSuccessfulDownload();
FileDownload fileDownload2 = newSuccessfulDownload();

when(singleDownloadFunction.apply(any(DownloadFileRequest.class))).thenReturn(fileDownload, fileDownload2);


RuntimeException exception = new RuntimeException("boom");
Consumer<ListObjectsV2Request.Builder> downloadFileRequestTransformer = b -> {
throw exception;
};

DirectoryDownload downloadDirectory =
downloadDirectoryHelper.downloadDirectory(DownloadDirectoryRequest.builder()
.destination(directory)
.bucket("bucket")
.listObjectsV2RequestTransformer(downloadFileRequestTransformer)
.build());

assertThatThrownBy(downloadDirectory.completionFuture()::join).hasCause(exception);
}

private static DefaultFileDownload completedDownload() {
return new DefaultFileDownload(CompletableFuture.completedFuture(CompletedFileDownload.builder()
.response(GetObjectResponse.builder().build())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,29 @@ void uploadDirectory_notDirectoryFollowSymlinkTrue_shouldCompleteSuccessfully()
assertThat(keys).containsOnly("2.txt");
}

@Test
public void uploadDirectory_requestTransformFunctionThrows_failsUpload() {
when(singleUploadFunction.apply(any())).thenReturn(null);

RuntimeException exception = new RuntimeException("boom");

Consumer<UploadFileRequest.Builder> uploadFileRequestTransformer = r -> {
throw exception;
};

CompletableFuture<CompletedDirectoryUpload> uploadFuture =
uploadDirectoryHelper.uploadDirectory(
UploadDirectoryRequest.builder()
.source(directory)
.bucket("bucket")
.uploadFileRequestTransformer(uploadFileRequestTransformer)
.build())
.completionFuture();

assertThatThrownBy(uploadFuture::join).getCause().hasCause(exception);
}


private DefaultFileUpload completedUpload() {
return new DefaultFileUpload(CompletableFuture.completedFuture(CompletedFileUpload.builder()
.response(PutObjectResponse.builder().build())
Expand Down
Loading