Skip to content

Commit

Permalink
[fix][transaction] Fix some tests and clean up logs (streamnative#1966)
Browse files Browse the repository at this point in the history
### Modifications

Fix some tests.


Co-authored-by: Enrico Olivelli <[email protected]>
  • Loading branch information
gaoran10 and eolivelli authored Aug 3, 2023
1 parent 1fd3bdb commit 3b22e79
Show file tree
Hide file tree
Showing 13 changed files with 336 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -911,8 +911,13 @@ public void removeGroupsForPartition(int offsetsPartition,
TopicPartition topicPartition = new TopicPartition(
GROUP_METADATA_TOPIC_NAME, offsetsPartition
);
log.info("Scheduling unloading of offsets and group metadata from {}", topicPartition);
scheduler.submit(() -> removeGroupsAndOffsets(offsetsPartition, onGroupUnloaded));

if (scheduler.isShutdown()) {
log.info("Broker is shutting down, skip unloading of offsets and group metadata from {}", topicPartition);
} else {
log.info("Scheduling unloading of offsets and group metadata from {}", topicPartition);
scheduler.submit(() -> removeGroupsAndOffsets(offsetsPartition, onGroupUnloaded));
}
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,8 +350,10 @@ private void completeInitProducer(String transactionalId,
new TransactionStateManager.ResponseCallback() {
@Override
public void complete() {
log.info("Initialized transactionalId {} with producerId {} and producer "
+ "epoch {} on partition {}-{}", transactionalId,
log.info("{} Initialized transactionalId {} with producerId {} and producer "
+ "epoch {} on partition {}-{}",
namespacePrefixForMetadata,
transactionalId,
newMetadata.getProducerId(), newMetadata.getProducerEpoch(),
Topic.TRANSACTION_STATE_TOPIC_NAME,
txnManager.partitionFor(transactionalId));
Expand All @@ -363,8 +365,8 @@ public void complete() {

@Override
public void fail(Errors errors) {
log.info("Returning {} error code to client for {}'s InitProducerId "
+ "request", errors, transactionalId);
log.info("{} Returning {} error code to client for {}'s InitProducerId "
+ "request", namespacePrefixForMetadata, errors, transactionalId);
responseCallback.accept(initTransactionError(errors));
}
}, errors -> true);
Expand Down Expand Up @@ -406,7 +408,13 @@ private CompletableFuture<Either<Errors, EpochAndTxnTransitMetadata>> prepareIni
Optional<ProducerIdAndEpoch> expectedProducerIdAndEpoch) {
CompletableFuture<Either<Errors, EpochAndTxnTransitMetadata>> resultFuture = new CompletableFuture<>();
if (txnMetadata.pendingTransitionInProgress()) {
// return a retriable exception to let the client backoff and retry
// return a retryable exception to let the client backoff and retry
// it is okay to log this here, this is not on the write path
// the client calls initProducer only at bootstrap
log.info("{} Failed initProducer for {}, pending transition to {}. {}",
namespacePrefixForMetadata,
transactionalId, txnMetadata.getPendingState(),
txnMetadata);
resultFuture.complete(Either.left(Errors.CONCURRENT_TRANSACTIONS));
return resultFuture;
}
Expand Down Expand Up @@ -602,8 +610,8 @@ private void endTransaction(String transactionalId,
}

if (!isFromClient) {
log.info("endTransaction - before endTxnPreAppend {} metadata {}",
transactionalId, epochAndMetadata.get().getTransactionMetadata());
log.info("{} endTransaction - before endTxnPreAppend {} metadata {}",
namespacePrefixForMetadata, transactionalId, epochAndMetadata.get().getTransactionMetadata());
}

Either<Errors, TxnTransitMetadata> preAppendResult = endTxnPreAppend(
Expand All @@ -630,14 +638,16 @@ public void complete() {
public void fail(Errors errors) {

if (!isFromClient) {
log.info("endTransaction - AFTER failed appendTransactionToLog {} metadata {}"
log.info("{} endTransaction - AFTER failed appendTransactionToLog {} metadata {}"
+ "isEpochFence {}",
namespacePrefixForMetadata,
transactionalId, epochAndMetadata.get().getTransactionMetadata(), isEpochFence);
}

log.info("Aborting sending of transaction markers and returning {} error to client for {}'s "
log.info("{} Aborting sending of transaction markers and returning {} error to client for {}'s "
+ "EndTransaction request of {}, since appending {} to transaction log with "
+ "coordinator epoch {} failed", errors, transactionalId, txnMarkerResult,
+ "coordinator epoch {} failed",
namespacePrefixForMetadata, errors, transactionalId, txnMarkerResult,
preAppendResult.getRight(), coordinatorEpoch);

if (isEpochFence.get()) {
Expand Down Expand Up @@ -844,8 +854,9 @@ private void completeEndTxn(String transactionalId,
}

if (errorsOrPreSendResult.isLeft()) {
log.info("Aborting sending of transaction markers after appended {} to transaction log "
log.info("{} Aborting sending of transaction markers after appended {} to transaction log "
+ "and returning {} error to client for {}'s EndTransaction request",
namespacePrefixForMetadata,
transactionalId, txnMarkerResult, errorsOrPreSendResult.getLeft());
callback.accept(errorsOrPreSendResult.getLeft());
return;
Expand Down Expand Up @@ -915,7 +926,7 @@ protected void abortTimedOutTransactions() {
* Startup logic executed at the same time when the server starts up.
*/
public CompletableFuture<Void> startup(boolean enableTransactionalIdExpiration) {
log.info("Starting up transaction coordinator ...");
log.info("{} Starting up transaction coordinator ...", namespacePrefixForMetadata);

// Abort timeout transactions
scheduler.scheduleAtFixedRate(
Expand All @@ -928,7 +939,7 @@ public CompletableFuture<Void> startup(boolean enableTransactionalIdExpiration)
txnManager.startup(enableTransactionalIdExpiration);

return this.producerIdManager.initialize().thenCompose(ignored -> {
log.info("Startup transaction coordinator complete.");
log.info("{} Startup transaction coordinator complete.", namespacePrefixForMetadata);
return CompletableFuture.completedFuture(null);
});
}
Expand All @@ -938,14 +949,13 @@ public CompletableFuture<Void> startup(boolean enableTransactionalIdExpiration)
* Ordering of actions should be reversed from the startup process.
*/
public void shutdown() {
log.info("Shutting down transaction coordinator ...");
log.info("{} Shutting down transaction coordinator ...", namespacePrefixForMetadata);
producerIdManager.shutdown();
txnManager.shutdown();
transactionMarkerChannelManager.close();
producerStateManagerSnapshotBuffer.shutdown();
scheduler.shutdown();
// TODO shutdown txn
log.info("Shutdown transaction coordinator complete.");
log.info("{} Shutdown transaction coordinator complete.", namespacePrefixForMetadata);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import lombok.AllArgsConstructor;
Expand Down Expand Up @@ -138,6 +139,8 @@ public class PartitionLog {

private volatile String kafkaTopicUUID;

private volatile AtomicBoolean unloaded = new AtomicBoolean();

public PartitionLog(KafkaServiceConfiguration kafkaConfig,
RequestStats requestStats,
Time time,
Expand Down Expand Up @@ -192,6 +195,10 @@ public boolean isInitialisationFailed() {
return initFuture.isDone() && initFuture.isCompletedExceptionally();
}

public void markAsUnloaded() {
unloaded.set(true);
}

private CompletableFuture<Void> loadTopicProperties() {
CompletableFuture<Optional<PersistentTopic>> persistentTopicFuture =
kafkaTopicLookupService.getTopic(fullPartitionName, this);
Expand Down Expand Up @@ -1080,13 +1087,21 @@ public CompletableFuture<?> updatePurgeAbortedTxnsOffset() {
// nothing to do
return CompletableFuture.completedFuture(null);
}
if (unloaded.get()) {
// nothing to do
return CompletableFuture.completedFuture(null);
}
return fetchOldestAvailableIndexFromTopic()
.thenAccept(offset ->
producerStateManager.updateAbortedTxnsPurgeOffset(offset));

}

public CompletableFuture<Long> fetchOldestAvailableIndexFromTopic() {
if (unloaded.get()) {
return FutureUtil.failedFuture(new NotLeaderOrFollowerException());
}

final CompletableFuture<Long> future = new CompletableFuture<>();

// The future that is returned by getTopicConsumerManager is always completed normally
Expand All @@ -1102,15 +1117,17 @@ public CompletableFuture<Long> fetchOldestAvailableIndexFromTopic() {
}
});

ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) tcm.getManagedLedger();
if (managedLedger.getNumberOfEntries() == 0) {
ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
long numberOfEntries = managedLedger.getNumberOfEntries();
if (numberOfEntries == 0) {
long currentOffset = MessageMetadataUtils.getCurrentOffset(managedLedger);
log.info("First offset for topic {} is {} as the topic is empty (numberOfEntries=0)",
fullPartitionName, currentOffset);
future.complete(currentOffset);

return future;
}

// this is a DUMMY entry with -1
PositionImpl firstPosition = managedLedger.getFirstPosition();
// look for the first entry with data
Expand Down Expand Up @@ -1141,6 +1158,7 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) {

}

@VisibleForTesting
public CompletableFuture<?> takeProducerSnapshot() {
return initFuture.thenCompose((___) -> {
// snapshot can be taken only on the same thread that is used for writes
Expand All @@ -1152,6 +1170,7 @@ public CompletableFuture<?> takeProducerSnapshot() {
});
}

@VisibleForTesting
public CompletableFuture<Long> forcePurgeAbortTx() {
return initFuture.thenCompose((___) -> {
// purge can be taken only on the same thread that is used for writes
Expand Down Expand Up @@ -1365,4 +1384,7 @@ private void decodeEntriesForRecovery(final CompletableFuture<DecodeResult> futu
}
}

public boolean isUnloaded() {
return unloaded.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ public PartitionLog getLog(TopicPartition topicPartition, String namespacePrefix
if (error != null) {
// in case of failure we have to remove the CompletableFuture from the map
log.error("Failed to recovery of {}", key, error);
partitionLog.markAsUnloaded();
logMap.remove(key, partitionLog);
}
});
Expand All @@ -93,14 +94,19 @@ public PartitionLog getLog(TopicPartition topicPartition, String namespacePrefix
});
if (res.isInitialisationFailed()) {
log.error("Failed to initialize of {}", kopTopic);
res.markAsUnloaded();
logMap.remove(kopTopic, res);
}
return res;
}

public PartitionLog removeLog(String topicName) {
log.info("removePartitionLog {}", topicName);
return logMap.remove(topicName);
PartitionLog exists = logMap.remove(topicName);
if (exists != null) {
exists.markAsUnloaded();
}
return exists;
}

public int size() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ private CompletableFuture<Void> applySnapshotAndRecover(ProducerStateManagerSnap
return result;
}

@VisibleForTesting
public CompletableFuture<ProducerStateManagerSnapshot> takeSnapshot(Executor executor) {
CompletableFuture<ProducerStateManagerSnapshot> result = new CompletableFuture<>();
executor.execute(new SafeRunnable() {
Expand All @@ -139,8 +140,10 @@ public void safeRun() {
if (error != null) {
result.completeExceptionally(error);
} else {
log.info("Snapshot for {} ({}) taken at offset {}",
topicPartition, kafkaTopicUUID, snapshot.getOffset());
if (log.isDebugEnabled()) {
log.debug("Snapshot for {} ({}) taken at offset {}",
topicPartition, kafkaTopicUUID, snapshot.getOffset());
}
result.complete(snapshot);
}
});
Expand Down Expand Up @@ -169,7 +172,7 @@ void maybeTakeSnapshot(Executor executor) {

void updateAbortedTxnsPurgeOffset(long abortedTxnsPurgeOffset) {
if (log.isDebugEnabled()) {
log.debug("{} updateAbortedTxnsPurgeOffset {}", topicPartition, abortedTxnsPurgeOffset);
log.debug("{} updateAbortedTxnsPurgeOffset offset={}", topicPartition, abortedTxnsPurgeOffset);
}
if (abortedTxnsPurgeOffset < 0) {
return;
Expand All @@ -183,7 +186,11 @@ long maybePurgeAbortedTx() {
}
long now = System.currentTimeMillis();
long deltaFromLast = (now - lastPurgeAbortedTxnTime) / 1000;
if (deltaFromLast / 1000 <= kafkaTxnPurgeAbortedTxnIntervalSeconds) {
if (log.isDebugEnabled()) {
log.debug("maybePurgeAbortedTx deltaFromLast {} vs kafkaTxnPurgeAbortedTxnIntervalSeconds {} ",
deltaFromLast, kafkaTxnPurgeAbortedTxnIntervalSeconds);
}
if (deltaFromLast < kafkaTxnPurgeAbortedTxnIntervalSeconds) {
return 0;
}
lastPurgeAbortedTxnTime = now;
Expand Down Expand Up @@ -318,6 +325,10 @@ public long purgeAbortedTxns(long offset) {
if (toRemove) {
log.info("Transaction {} can be removed (lastOffset {} < {})", tx, tx.lastOffset(), offset);
count.incrementAndGet();
} else {
if (log.isDebugEnabled()) {
log.info("Transaction {} cannot be removed (lastOffset >= {})", tx, tx.lastOffset(), offset);
}
}
return toRemove;
});
Expand Down

This file was deleted.

4 changes: 4 additions & 0 deletions test-listener/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@
<groupId>${pulsar.group.id}</groupId>
<artifactId>pulsar-common</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>

</project>
Loading

0 comments on commit 3b22e79

Please sign in to comment.