Skip to content

Commit

Permalink
Added metrics in ShutdownTask for scenarios when parent leases are …
Browse files Browse the repository at this point in the history
…missing. (#1080)

+ optimizations in `ShutdownTask` (e.g., `Random` static instance,
eliminated over-used Function)
+ DRY+KISS on `ShutdownTaskTest`
+ deleted some dead code
  • Loading branch information
stair-aws authored Mar 21, 2023
1 parent 177303d commit 6be92dc
Show file tree
Hide file tree
Showing 5 changed files with 244 additions and 287 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,24 +93,11 @@ enum ShardConsumerState {
}
}


/**
* The initial state that any {@link ShardConsumer} should start in.
*/
static final ConsumerState INITIAL_STATE = ShardConsumerState.WAITING_ON_PARENT_SHARDS.consumerState();

/**
 * Resolves the {@link ConsumerState} a consumer should move to for the given shutdown reason.
 * <p>
 * {@code REQUESTED} maps to the {@code SHUTDOWN_REQUESTED} state, while {@code SHARD_END} and
 * {@code LEASE_LOST} both map to the {@code SHUTTING_DOWN} state.
 *
 * @param reason
 *            the reason the shutdown was triggered
 * @return the consumer state associated with {@code reason}
 * @throws IllegalArgumentException
 *             if {@code reason} is not one of the handled values
 */
private static ConsumerState shutdownStateFor(ShutdownReason reason) {
    final ShardConsumerState target;
    switch (reason) {
        case SHARD_END:
        case LEASE_LOST:
            target = ShardConsumerState.SHUTTING_DOWN;
            break;
        case REQUESTED:
            target = ShardConsumerState.SHUTDOWN_REQUESTED;
            break;
        default:
            throw new IllegalArgumentException("Unknown reason: " + reason);
    }
    return target.consumerState();
}

/**
* This is the initial state of a shard consumer. This causes the consumer to remain blocked until all parent
* shards have been completed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,28 +243,6 @@ private boolean shouldCallProcessRecords(List<KinesisClientRecord> records) {
return (!records.isEmpty()) || shouldCallProcessRecordsEvenForEmptyRecordList;
}

/**
 * Logs that no records were returned for the shard and, when part of the idle interval is still
 * remaining, sleeps for that remainder.
 * <p>
 * The remainder is {@code idleTimeInMilliseconds} minus the time elapsed since
 * {@code startTimeMillis}; nothing happens when it is zero or negative.
 *
 * @param startTimeMillis
 *            the time when the task started, as reported by {@link System#currentTimeMillis()}
 */
private void handleNoRecords(long startTimeMillis) {
    log.debug("Kinesis didn't return any records for shard {}", shardInfoId);

    final long elapsedMillis = System.currentTimeMillis() - startTimeMillis;
    // Cap at the idle time (not Math.max, which would always discard the elapsed-time adjustment
    // and sleep for the full idle interval). The cap also guards against a clock that moved
    // backwards, which would otherwise make the remainder larger than the idle interval.
    final long sleepTimeMillis = Math.min(idleTimeInMilliseconds - elapsedMillis, idleTimeInMilliseconds);
    if (sleepTimeMillis <= 0) {
        return;
    }

    try {
        log.debug("Sleeping for {} ms since there were no new records in shard {}", sleepTimeMillis,
                shardInfoId);
        Thread.sleep(sleepTimeMillis);
    } catch (InterruptedException e) {
        log.debug("ShardId {}: Sleep was interrupted", shardInfoId);
        // Restore the interrupt status so code further up the stack can still observe it.
        Thread.currentThread().interrupt();
    }
}

@Override
public TaskType taskType() {
return taskType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,13 @@ public class ShardConsumer {
private final ShardConsumerArgument shardConsumerArgument;
@NonNull
private final Optional<Long> logWarningForTaskAfterMillis;

/**
* @deprecated unused; to be removed in a "major" version bump
*/
@Deprecated
private final Function<ConsumerTask, ConsumerTask> taskMetricsDecorator;

private final int bufferSize;
private final TaskExecutionListener taskExecutionListener;
private final String streamIdentifier;
Expand Down Expand Up @@ -179,7 +185,6 @@ public void executeLifecycle() {
}
stateChangeFuture = initializeComplete();
}

} catch (InterruptedException e) {
//
// Ignored should be handled by scheduler
Expand All @@ -199,7 +204,6 @@ public void executeLifecycle() {
throw (Error) t;
}
}

}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseCleanupManager;
import software.amazon.kinesis.leases.LeaseCoordinator;
import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.leases.ShardDetector;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.leases.UpdateField;
Expand All @@ -54,7 +55,6 @@

import java.util.Random;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
Expand All @@ -66,6 +66,14 @@
public class ShutdownTask implements ConsumerTask {
private static final String SHUTDOWN_TASK_OPERATION = "ShutdownTask";
private static final String RECORD_PROCESSOR_SHUTDOWN_METRIC = "RecordProcessor.shutdown";

/**
* Reusable, immutable {@link LeaseLostInput}.
*/
private static final LeaseLostInput LEASE_LOST_INPUT = LeaseLostInput.builder().build();

private static final Random RANDOM = new Random();

@VisibleForTesting
static final int RETRY_RANDOM_MAX_RANGE = 30;

Expand Down Expand Up @@ -101,8 +109,6 @@ public class ShutdownTask implements ConsumerTask {
@NonNull
private final LeaseCleanupManager leaseCleanupManager;

private static final Function<ShardInfo, String> leaseKeyProvider = shardInfo -> ShardInfo.getLeaseKey(shardInfo);

/*
* Invokes ShardRecordProcessor shutdown() API.
* (non-Javadoc)
Expand All @@ -114,61 +120,61 @@ public TaskResult call() {
recordProcessorCheckpointer.checkpointer().operation(SHUTDOWN_TASK_OPERATION);
final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, SHUTDOWN_TASK_OPERATION);

Exception exception;

final String leaseKey = ShardInfo.getLeaseKey(shardInfo);
try {
try {
log.debug("Invoking shutdown() for shard {} with childShards {}, concurrencyToken {}. Shutdown reason: {}",
leaseKeyProvider.apply(shardInfo), childShards, shardInfo.concurrencyToken(), reason);
leaseKey, childShards, shardInfo.concurrencyToken(), reason);

final long startTime = System.currentTimeMillis();
final Lease currentShardLease = leaseCoordinator.getCurrentlyHeldLease(leaseKeyProvider.apply(shardInfo));
final Runnable leaseLostAction = () -> shardRecordProcessor.leaseLost(LeaseLostInput.builder().build());
final Lease currentShardLease = leaseCoordinator.getCurrentlyHeldLease(leaseKey);
final Runnable leaseLostAction = () -> shardRecordProcessor.leaseLost(LEASE_LOST_INPUT);

if (reason == ShutdownReason.SHARD_END) {
try {
takeShardEndAction(currentShardLease, scope, startTime);
takeShardEndAction(currentShardLease, leaseKey, scope, startTime);
} catch (InvalidStateException e) {
// If InvalidStateException happens, it indicates we have a non recoverable error in short term.
// In this scenario, we should shutdown the shardConsumer with LEASE_LOST reason to allow other worker to take the lease and retry shutting down.
// In this scenario, we should shutdown the shardConsumer with LEASE_LOST reason to allow
// other worker to take the lease and retry shutting down.
log.warn("Lease {}: Invalid state encountered while shutting down shardConsumer with SHARD_END reason. " +
"Dropping the lease and shutting down shardConsumer using LEASE_LOST reason. ", leaseKeyProvider.apply(shardInfo), e);
dropLease(currentShardLease);
throwOnApplicationException(leaseLostAction, scope, startTime);
"Dropping the lease and shutting down shardConsumer using LEASE_LOST reason.",
leaseKey, e);
dropLease(currentShardLease, leaseKey);
throwOnApplicationException(leaseKey, leaseLostAction, scope, startTime);
}
} else {
throwOnApplicationException(leaseLostAction, scope, startTime);
throwOnApplicationException(leaseKey, leaseLostAction, scope, startTime);
}

log.debug("Shutting down retrieval strategy for shard {}.", leaseKeyProvider.apply(shardInfo));
log.debug("Shutting down retrieval strategy for shard {}.", leaseKey);
recordsPublisher.shutdown();
log.debug("Record processor completed shutdown() for shard {}", leaseKeyProvider.apply(shardInfo));
log.debug("Record processor completed shutdown() for shard {}", leaseKey);

return new TaskResult(null);
} catch (Exception e) {
if (e instanceof CustomerApplicationException) {
log.error("Shard {}: Application exception. ", leaseKeyProvider.apply(shardInfo), e);
log.error("Shard {}: Application exception.", leaseKey, e);
} else {
log.error("Shard {}: Caught exception: ", leaseKeyProvider.apply(shardInfo), e);
log.error("Shard {}: Caught exception:", leaseKey, e);
}
exception = e;
// backoff if we encounter an exception.
try {
Thread.sleep(this.backoffTimeMillis);
} catch (InterruptedException ie) {
log.debug("Shard {}: Interrupted sleep", leaseKeyProvider.apply(shardInfo), ie);
log.debug("Shard {}: Interrupted sleep", leaseKey, ie);
}

return new TaskResult(e);
}
} finally {
MetricsUtil.endScope(scope);
}

return new TaskResult(exception);
}

// Involves persisting child shard info, attempt to checkpoint and enqueueing lease for cleanup.
private void takeShardEndAction(Lease currentShardLease,
MetricsScope scope, long startTime)
final String leaseKey, MetricsScope scope, long startTime)
throws DependencyException, ProvisionedThroughputException, InvalidStateException,
CustomerApplicationException {
// Create new lease for the child shards if they don't exist.
Expand All @@ -177,7 +183,7 @@ private void takeShardEndAction(Lease currentShardLease,
// In this case, KinesisDataFetcher and FanOutRecordsPublisher will send out SHARD_END signal to trigger a shutdown task with empty list of childShards.
// This scenario could happen when customer deletes the stream while leaving the KCL application running.
if (currentShardLease == null) {
throw new InvalidStateException(leaseKeyProvider.apply(shardInfo)
throw new InvalidStateException(leaseKey
+ " : Lease not owned by the current worker. Leaving ShardEnd handling to new owner.");
}
if (!CollectionUtils.isNullOrEmpty(childShards)) {
Expand All @@ -189,7 +195,7 @@ private void takeShardEndAction(Lease currentShardLease,
if (!leaseCleanupManager.isEnqueuedForDeletion(leasePendingDeletion)) {
boolean isSuccess = false;
try {
isSuccess = attemptShardEndCheckpointing(scope, startTime);
isSuccess = attemptShardEndCheckpointing(leaseKey, scope, startTime);
} finally {
// Check if either the shard end ddb persist is successful or
// if childshards is empty. When child shards is empty then either it is due to
Expand All @@ -202,80 +208,90 @@ private void takeShardEndAction(Lease currentShardLease,
}
}

private boolean attemptShardEndCheckpointing(MetricsScope scope, long startTime)
private boolean attemptShardEndCheckpointing(final String leaseKey, MetricsScope scope, long startTime)
throws DependencyException, ProvisionedThroughputException, InvalidStateException,
CustomerApplicationException {
final Lease leaseFromDdb = Optional.ofNullable(leaseCoordinator.leaseRefresher().getLease(leaseKeyProvider.apply(shardInfo)))
.orElseThrow(() -> new InvalidStateException("Lease for shard " + leaseKeyProvider.apply(shardInfo) + " does not exist."));
final Lease leaseFromDdb = Optional.ofNullable(leaseCoordinator.leaseRefresher().getLease(leaseKey))
.orElseThrow(() -> new InvalidStateException("Lease for shard " + leaseKey + " does not exist."));
if (!leaseFromDdb.checkpoint().equals(ExtendedSequenceNumber.SHARD_END)) {
// Call the shardRecordsProcessor to checkpoint with SHARD_END sequence number.
// The shardEnded is implemented by customer. We should validate if the SHARD_END checkpointing is successful after calling shardEnded.
throwOnApplicationException(() -> applicationCheckpointAndVerification(), scope, startTime);
// The shardEnded is implemented by customer. We should validate if the SHARD_END checkpointing is
// successful after calling shardEnded.
throwOnApplicationException(leaseKey, () -> applicationCheckpointAndVerification(leaseKey),
scope, startTime);
}
return true;
}

private void applicationCheckpointAndVerification() {
private void applicationCheckpointAndVerification(final String leaseKey) {
recordProcessorCheckpointer
.sequenceNumberAtShardEnd(recordProcessorCheckpointer.largestPermittedCheckpointValue());
recordProcessorCheckpointer.largestPermittedCheckpointValue(ExtendedSequenceNumber.SHARD_END);
shardRecordProcessor.shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build());
final ExtendedSequenceNumber lastCheckpointValue = recordProcessorCheckpointer.lastCheckpointValue();
if (lastCheckpointValue == null
|| !lastCheckpointValue.equals(ExtendedSequenceNumber.SHARD_END)) {
if (!ExtendedSequenceNumber.SHARD_END.equals(lastCheckpointValue)) {
throw new IllegalArgumentException("Application didn't checkpoint at end of shard "
+ leaseKeyProvider.apply(shardInfo) + ". Application must checkpoint upon shard end. " +
+ leaseKey + ". Application must checkpoint upon shard end. " +
"See ShardRecordProcessor.shardEnded javadocs for more information.");
}
}

private void throwOnApplicationException(Runnable action, MetricsScope metricsScope, final long startTime) throws CustomerApplicationException {
private void throwOnApplicationException(final String leaseKey, Runnable action, MetricsScope metricsScope,
final long startTime)
throws CustomerApplicationException {
try {
action.run();
} catch (Exception e) {
throw new CustomerApplicationException("Customer application throws exception for shard " + leaseKeyProvider.apply(shardInfo) +": ", e);
throw new CustomerApplicationException("Customer application throws exception for shard " + leaseKey + ": ", e);
} finally {
MetricsUtil.addLatency(metricsScope, RECORD_PROCESSOR_SHUTDOWN_METRIC, startTime, MetricsLevel.SUMMARY);
}
}

private void createLeasesForChildShardsIfNotExist(MetricsScope scope)
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
final LeaseRefresher leaseRefresher = leaseCoordinator.leaseRefresher();

// For child shard resulted from merge of two parent shards, verify if both the parents are either present or
// not present in the lease table before creating the lease entry.
if (!CollectionUtils.isNullOrEmpty(childShards) && childShards.size() == 1) {
if (childShards.size() == 1) {
final ChildShard childShard = childShards.get(0);
final List<String> parentLeaseKeys = childShard.parentShards().stream()
.map(parentShardId -> ShardInfo.getLeaseKey(shardInfo, parentShardId)).collect(Collectors.toList());
if (parentLeaseKeys.size() != 2) {
MetricsUtil.addCount(scope, "MissingMergeParent", 1, MetricsLevel.SUMMARY);
throw new InvalidStateException("Shard " + shardInfo.shardId() + "'s only child shard " + childShard
+ " does not contain other parent information.");
} else {
boolean isValidLeaseTableState =
Objects.isNull(leaseCoordinator.leaseRefresher().getLease(parentLeaseKeys.get(0))) == Objects
.isNull(leaseCoordinator.leaseRefresher().getLease(parentLeaseKeys.get(1)));
if (!isValidLeaseTableState) {
if (!isOneInNProbability(RETRY_RANDOM_MAX_RANGE)) {
throw new BlockedOnParentShardException(
"Shard " + shardInfo.shardId() + "'s only child shard " + childShard
+ " has partial parent information in lease table. Hence deferring lease creation of child shard.");
} else {
throw new InvalidStateException(
"Shard " + shardInfo.shardId() + "'s only child shard " + childShard
+ " has partial parent information in lease table. Hence deferring lease creation of child shard.");
}
}

final Lease parentLease0 = leaseRefresher.getLease(parentLeaseKeys.get(0));
final Lease parentLease1 = leaseRefresher.getLease(parentLeaseKeys.get(1));
if (Objects.isNull(parentLease0) != Objects.isNull(parentLease1)) {
MetricsUtil.addCount(scope, "MissingMergeParentLease", 1, MetricsLevel.SUMMARY);
final String message = "Shard " + shardInfo.shardId() + "'s only child shard " + childShard +
" has partial parent information in lease table: [parent0=" + parentLease0 +
", parent1=" + parentLease1 + "]. Hence deferring lease creation of child shard.";
if (isOneInNProbability(RETRY_RANDOM_MAX_RANGE)) {
// abort further attempts and drop the lease; lease will
// be reassigned
throw new InvalidStateException(message);
} else {
// initiate a Thread.sleep(...) and keep the lease;
// keeping the lease decreases churn of lease reassignments
throw new BlockedOnParentShardException(message);
}
}
}

for(ChildShard childShard : childShards) {
final String leaseKey = ShardInfo.getLeaseKey(shardInfo, childShard.shardId());
if(leaseCoordinator.leaseRefresher().getLease(leaseKey) == null) {
if (leaseRefresher.getLease(leaseKey) == null) {
log.debug("{} - Shard {} - Attempting to create lease for child shard {}", shardDetector.streamIdentifier(), shardInfo.shardId(), leaseKey);
final Lease leaseToCreate = hierarchicalShardSyncer.createLeaseForChildShard(childShard, shardDetector.streamIdentifier());
final long startTime = System.currentTimeMillis();
boolean success = false;
try {
leaseCoordinator.leaseRefresher().createLeaseIfNotExists(leaseToCreate);
leaseRefresher.createLeaseIfNotExists(leaseToCreate);
success = true;
} finally {
MetricsUtil.addSuccessAndLatency(scope, "CreateLease", success, startTime, MetricsLevel.DETAILED);
Expand All @@ -295,8 +311,7 @@ private void createLeasesForChildShardsIfNotExist(MetricsScope scope)
*/
@VisibleForTesting
boolean isOneInNProbability(int n) {
Random r = new Random();
return 1 == r.nextInt((n - 1) + 1) + 1;
return 0 == RANDOM.nextInt(n);
}

private void updateLeaseWithChildShards(Lease currentLease)
Expand Down Expand Up @@ -324,10 +339,9 @@ public ShutdownReason getReason() {
return reason;
}

private void dropLease(Lease currentLease) {
private void dropLease(Lease currentLease, final String leaseKey) {
if (currentLease == null) {
log.warn("Shard {}: Unable to find the lease for shard. Will shutdown the shardConsumer directly.", leaseKeyProvider.apply(shardInfo));
return;
log.warn("Shard {}: Unable to find the lease for shard. Will shutdown the shardConsumer directly.", leaseKey);
} else {
leaseCoordinator.dropLease(currentLease);
log.info("Dropped lease for shutting down ShardConsumer: " + currentLease.leaseKey());
Expand Down
Loading

0 comments on commit 6be92dc

Please sign in to comment.