Skip to content

Commit

Permalink
Use AFTER_SEQUENCE_NUMBER iterator type for expired iterator request (#…
Browse files Browse the repository at this point in the history
…1014)

* Use AFTER_SEQUENCE_NUMBER iterator type for expired iterator request

* Update Java docs and minor refactoring

* Fix Java doc

Co-authored-by: Chenyuan Lee <[email protected]>
  • Loading branch information
cylee99 and chenylee-aws authored Jan 3, 2023
1 parent 676bb86 commit 17d0940
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,42 @@ public static StartingPosition.Builder reconnectRequest(StartingPosition.Builder
ShardIteratorType.AFTER_SEQUENCE_NUMBER);
}

/**
* Creates a GetShardIteratorRequest builder that uses AT_SEQUENCE_NUMBER ShardIteratorType.
*
* @param builder An initial GetShardIteratorRequest builder to be updated.
* @param sequenceNumber The sequence number to restart the request from.
* @param initialPosition One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP.
* @return An updated GetShardIteratorRequest.Builder.
*/
public static GetShardIteratorRequest.Builder request(GetShardIteratorRequest.Builder builder,
String sequenceNumber, InitialPositionInStreamExtended initialPosition) {
String sequenceNumber,
InitialPositionInStreamExtended initialPosition) {
return getShardIteratorRequest(builder, sequenceNumber, initialPosition, ShardIteratorType.AT_SEQUENCE_NUMBER);

}

/**
* Creates a GetShardIteratorRequest builder that uses AFTER_SEQUENCE_NUMBER ShardIteratorType.
*
* @param builder An initial GetShardIteratorRequest builder to be updated.
* @param sequenceNumber The sequence number to restart the request from.
* @param initialPosition One of LATEST, TRIM_HORIZON, or AT_TIMESTAMP.
* @return An updated GetShardIteratorRequest.Builder.
*/
public static GetShardIteratorRequest.Builder reconnectRequest(GetShardIteratorRequest.Builder builder,
String sequenceNumber,
InitialPositionInStreamExtended initialPosition) {
return getShardIteratorRequest(builder, sequenceNumber, initialPosition, ShardIteratorType.AFTER_SEQUENCE_NUMBER);
}

private static GetShardIteratorRequest.Builder getShardIteratorRequest(GetShardIteratorRequest.Builder builder,
String sequenceNumber,
InitialPositionInStreamExtended initialPosition,
ShardIteratorType shardIteratorType) {
return apply(builder, GetShardIteratorRequest.Builder::shardIteratorType, GetShardIteratorRequest.Builder::timestamp,
GetShardIteratorRequest.Builder::startingSequenceNumber, initialPosition, sequenceNumber,
ShardIteratorType.AT_SEQUENCE_NUMBER);
shardIteratorType);
}

private final static Map<String, ShardIteratorType> SHARD_ITERATOR_MAPPING;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,12 @@ public void initialize(final ExtendedSequenceNumber initialCheckpoint,
@Override
public void advanceIteratorTo(final String sequenceNumber,
final InitialPositionInStreamExtended initialPositionInStream) {
advanceIteratorTo(sequenceNumber, initialPositionInStream, false);
}

private void advanceIteratorTo(final String sequenceNumber,
final InitialPositionInStreamExtended initialPositionInStream,
boolean isIteratorRestart) {
if (sequenceNumber == null) {
throw new IllegalArgumentException("SequenceNumber should not be null: shardId " + shardId);
}
Expand All @@ -231,8 +237,12 @@ public void advanceIteratorTo(final String sequenceNumber,

GetShardIteratorRequest.Builder builder = KinesisRequestsBuilder.getShardIteratorRequestBuilder()
.streamName(streamIdentifier.streamName()).shardId(shardId);
GetShardIteratorRequest request = IteratorBuilder.request(builder, sequenceNumber, initialPositionInStream)
.build();
GetShardIteratorRequest request;
if (isIteratorRestart) {
request = IteratorBuilder.reconnectRequest(builder, sequenceNumber, initialPositionInStream).build();
} else {
request = IteratorBuilder.request(builder, sequenceNumber, initialPositionInStream).build();
}

// TODO: Check if this metric is fine to be added
final MetricsScope metricsScope = MetricsUtil.createMetricsWithOperation(metricsFactory, OPERATION);
Expand Down Expand Up @@ -270,16 +280,18 @@ public void advanceIteratorTo(final String sequenceNumber,
}

/**
* Gets a new iterator from the last known sequence number i.e. the sequence number of the last record from the last
* records call.
* Gets a new next shard iterator from last known sequence number i.e. the sequence number of the last
* record from the last records call.
*/
@Override
public void restartIterator() {
if (StringUtils.isEmpty(lastKnownSequenceNumber) || initialPositionInStream == null) {
throw new IllegalStateException(
"Make sure to initialize the KinesisDataFetcher before restarting the iterator.");
}
advanceIteratorTo(lastKnownSequenceNumber, initialPositionInStream);
log.debug("Restarting iterator for sequence number {} on shard id {}",
lastKnownSequenceNumber, streamAndShardId);
advanceIteratorTo(lastKnownSequenceNumber, initialPositionInStream, true);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,8 @@ private void makeRetrievalAttempt() {
calculateHighestSequenceNumber(processRecordsInput), getRecordsResult.nextShardIterator(),
PrefetchRecordsRetrieved.generateBatchUniqueIdentifier());
publisherSession.highestSequenceNumber(recordsRetrieved.lastBatchSequenceNumber);
log.debug("Last sequence number retrieved for streamAndShardId {} is {}", streamAndShardId,
recordsRetrieved.lastBatchSequenceNumber);
addArrivedRecordsInput(recordsRetrieved);
drainQueueForRequests();
} catch (PositionResetException pse) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@ public void getShardSequenceNumberTest() {
sequenceNumber(this::gsiBase, this::verifyGsiBase, IteratorBuilder::request, WrappedRequest::wrapped);
}

@Test
public void getShardIteratorReconnectTest() {
sequenceNumber(this::gsiBase, this::verifyGsiBase, IteratorBuilder::reconnectRequest, WrappedRequest::wrapped,
ShardIteratorType.AFTER_SEQUENCE_NUMBER);
}

@Test
public void subscribeTimestampTest() {
timeStampTest(this::stsBase, this::verifyStsBase, IteratorBuilder::request, WrappedRequest::wrapped);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,33 @@ public void testRestartIterator() throws Exception {
assertEquals(restartGetRecordsResponse, kinesisDataFetcher.getRecords().accept());
}

@Test
public void testRestartIteratorUsesAfterSequenceNumberIteratorType() throws Exception {
final String iterator = "iterator";
final String sequenceNumber = "123";

final ArgumentCaptor<GetShardIteratorRequest> shardIteratorRequestCaptor =
ArgumentCaptor.forClass(GetShardIteratorRequest.class);

when(kinesisClient.getShardIterator(shardIteratorRequestCaptor.capture())).
thenReturn(makeGetShardIteratorResonse(iterator));

kinesisDataFetcher.initialize(sequenceNumber, INITIAL_POSITION_LATEST);
kinesisDataFetcher.restartIterator();
// The advanceIteratorTo call should not use AFTER_SEQUENCE_NUMBER iterator
// type unless called by restartIterator
kinesisDataFetcher.advanceIteratorTo(sequenceNumber, INITIAL_POSITION_LATEST);

final List<GetShardIteratorRequest> shardIteratorRequests = shardIteratorRequestCaptor.getAllValues();
assertEquals(3, shardIteratorRequests.size());
assertEquals(ShardIteratorType.AT_SEQUENCE_NUMBER.toString(),
shardIteratorRequests.get(0).shardIteratorTypeAsString());
assertEquals(ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(),
shardIteratorRequests.get(1).shardIteratorTypeAsString());
assertEquals(ShardIteratorType.AT_SEQUENCE_NUMBER.toString(),
shardIteratorRequests.get(2).shardIteratorTypeAsString());
}

@Test(expected = IllegalStateException.class)
public void testRestartIteratorNotInitialized() {
kinesisDataFetcher.restartIterator();
Expand Down

0 comments on commit 17d0940

Please sign in to comment.