From 17d0940f5d076bdf78546c75edd3feb3705e3800 Mon Sep 17 00:00:00 2001 From: Chenyuan Lee <35975040+cylee99@users.noreply.github.com> Date: Tue, 3 Jan 2023 15:10:31 -0800 Subject: [PATCH] Use AFTER_SEQUENCE_NUMBER iterator type for expired iterator request (#1014) * Use AFTER_SEQUENCE_NUMBER iterator type for expired iterator request * Update Java docs and minor refactoring * Fix Java doc Co-authored-by: Chenyuan Lee --- .../kinesis/retrieval/IteratorBuilder.java | 35 +++++++++++++++++-- .../retrieval/polling/KinesisDataFetcher.java | 22 +++++++++--- .../polling/PrefetchRecordsPublisher.java | 2 ++ .../retrieval/IteratorBuilderTest.java | 6 ++++ .../polling/KinesisDataFetcherTest.java | 27 ++++++++++++++ 5 files changed, 85 insertions(+), 7 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/IteratorBuilder.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/IteratorBuilder.java index 2b49e0319..9e9adf91c 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/IteratorBuilder.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/IteratorBuilder.java @@ -41,11 +41,42 @@ public static StartingPosition.Builder reconnectRequest(StartingPosition.Builder ShardIteratorType.AFTER_SEQUENCE_NUMBER); } + /** + * Creates a GetShardIteratorRequest builder that uses AT_SEQUENCE_NUMBER ShardIteratorType. + * + * @param builder An initial GetShardIteratorRequest builder to be updated. + * @param sequenceNumber The sequence number to restart the request from. + * @param initialPosition One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP. + * @return An updated GetShardIteratorRequest.Builder. + */ public static GetShardIteratorRequest.Builder request(GetShardIteratorRequest.Builder builder, - String sequenceNumber, InitialPositionInStreamExtended initialPosition) { + String sequenceNumber, + InitialPositionInStreamExtended initialPosition) { + return getShardIteratorRequest(builder, sequenceNumber, initialPosition, ShardIteratorType.AT_SEQUENCE_NUMBER); + + } + + /** + * Creates a GetShardIteratorRequest builder that uses AFTER_SEQUENCE_NUMBER ShardIteratorType. + * + * @param builder An initial GetShardIteratorRequest builder to be updated. + * @param sequenceNumber The sequence number to restart the request from. + * @param initialPosition One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP. + * @return An updated GetShardIteratorRequest.Builder. + */ + public static GetShardIteratorRequest.Builder reconnectRequest(GetShardIteratorRequest.Builder builder, + String sequenceNumber, + InitialPositionInStreamExtended initialPosition) { + return getShardIteratorRequest(builder, sequenceNumber, initialPosition, ShardIteratorType.AFTER_SEQUENCE_NUMBER); + } + + private static GetShardIteratorRequest.Builder getShardIteratorRequest(GetShardIteratorRequest.Builder builder, + String sequenceNumber, + InitialPositionInStreamExtended initialPosition, + ShardIteratorType shardIteratorType) { return apply(builder, GetShardIteratorRequest.Builder::shardIteratorType, GetShardIteratorRequest.Builder::timestamp, GetShardIteratorRequest.Builder::startingSequenceNumber, initialPosition, sequenceNumber, - ShardIteratorType.AT_SEQUENCE_NUMBER); + shardIteratorType); } private final static Map SHARD_ITERATOR_MAPPING; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java index 223ab367f..8d36ea8aa 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcher.java @@ -223,6 +223,12 @@ public void initialize(final ExtendedSequenceNumber initialCheckpoint, @Override public void advanceIteratorTo(final String sequenceNumber, final InitialPositionInStreamExtended initialPositionInStream) { + advanceIteratorTo(sequenceNumber, initialPositionInStream, false); + } + + private void advanceIteratorTo(final String sequenceNumber, + final InitialPositionInStreamExtended initialPositionInStream, + boolean isIteratorRestart) { if (sequenceNumber == null) { throw new IllegalArgumentException("SequenceNumber should not be null: shardId " + shardId); } @@ -231,8 +237,12 @@ public void advanceIteratorTo(final String sequenceNumber, GetShardIteratorRequest.Builder builder = KinesisRequestsBuilder.getShardIteratorRequestBuilder() .streamName(streamIdentifier.streamName()).shardId(shardId); - GetShardIteratorRequest request = IteratorBuilder.request(builder, sequenceNumber, initialPositionInStream) - .build(); + GetShardIteratorRequest request; + if (isIteratorRestart) { + request = IteratorBuilder.reconnectRequest(builder, sequenceNumber, initialPositionInStream).build(); + } else { + request = IteratorBuilder.request(builder, sequenceNumber, initialPositionInStream).build(); + } // TODO: Check if this metric is fine to be added final MetricsScope metricsScope = MetricsUtil.createMetricsWithOperation(metricsFactory, OPERATION); @@ -270,8 +280,8 @@ public void advanceIteratorTo(final String sequenceNumber, } /** - * Gets a new iterator from the last known sequence number i.e. the sequence number of the last record from the last - * records call. + * Gets a new next shard iterator from last known sequence number i.e. the sequence number of the last + * record from the last records call. */ @Override public void restartIterator() { @@ -279,7 +289,9 @@ public void restartIterator() { throw new IllegalStateException( "Make sure to initialize the KinesisDataFetcher before restarting the iterator."); } - advanceIteratorTo(lastKnownSequenceNumber, initialPositionInStream); + log.debug("Restarting iterator for sequence number {} on shard id {}", + lastKnownSequenceNumber, streamAndShardId); + advanceIteratorTo(lastKnownSequenceNumber, initialPositionInStream, true); } @Override diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java index ce7e7bb2c..1a49cdfbe 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java @@ -502,6 +502,8 @@ private void makeRetrievalAttempt() { calculateHighestSequenceNumber(processRecordsInput), getRecordsResult.nextShardIterator(), PrefetchRecordsRetrieved.generateBatchUniqueIdentifier()); publisherSession.highestSequenceNumber(recordsRetrieved.lastBatchSequenceNumber); + log.debug("Last sequence number retrieved for streamAndShardId {} is {}", streamAndShardId, + recordsRetrieved.lastBatchSequenceNumber); addArrivedRecordsInput(recordsRetrieved); drainQueueForRequests(); } catch (PositionResetException pse) { diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/IteratorBuilderTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/IteratorBuilderTest.java index 5b04bf8da..db28261e0 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/IteratorBuilderTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/IteratorBuilderTest.java @@ -62,6 +62,12 @@ public void getShardSequenceNumberTest() { sequenceNumber(this::gsiBase, this::verifyGsiBase, IteratorBuilder::request, WrappedRequest::wrapped); } + @Test + public void getShardIteratorReconnectTest() { + sequenceNumber(this::gsiBase, this::verifyGsiBase, IteratorBuilder::reconnectRequest, WrappedRequest::wrapped, + ShardIteratorType.AFTER_SEQUENCE_NUMBER); + } + @Test public void subscribeTimestampTest() { timeStampTest(this::stsBase, this::verifyStsBase, IteratorBuilder::request, WrappedRequest::wrapped); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcherTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcherTest.java index 74b0c1250..d75f701d8 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcherTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/KinesisDataFetcherTest.java @@ -435,6 +435,33 @@ public void testRestartIterator() throws Exception { assertEquals(restartGetRecordsResponse, kinesisDataFetcher.getRecords().accept()); } + @Test + public void testRestartIteratorUsesAfterSequenceNumberIteratorType() throws Exception { + final String iterator = "iterator"; + final String sequenceNumber = "123"; + + final ArgumentCaptor shardIteratorRequestCaptor = + ArgumentCaptor.forClass(GetShardIteratorRequest.class); + + when(kinesisClient.getShardIterator(shardIteratorRequestCaptor.capture())). + thenReturn(makeGetShardIteratorResonse(iterator)); + + kinesisDataFetcher.initialize(sequenceNumber, INITIAL_POSITION_LATEST); + kinesisDataFetcher.restartIterator(); + // The advanceIteratorTo call should not use AFTER_SEQUENCE_NUMBER iterator + // type unless called by restartIterator + kinesisDataFetcher.advanceIteratorTo(sequenceNumber, INITIAL_POSITION_LATEST); + + final List shardIteratorRequests = shardIteratorRequestCaptor.getAllValues(); + assertEquals(3, shardIteratorRequests.size()); + assertEquals(ShardIteratorType.AT_SEQUENCE_NUMBER.toString(), + shardIteratorRequests.get(0).shardIteratorTypeAsString()); + assertEquals(ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), + shardIteratorRequests.get(1).shardIteratorTypeAsString()); + assertEquals(ShardIteratorType.AT_SEQUENCE_NUMBER.toString(), + shardIteratorRequests.get(2).shardIteratorTypeAsString()); + } + @Test(expected = IllegalStateException.class) public void testRestartIteratorNotInitialized() { kinesisDataFetcher.restartIterator();