Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][misc] Sync commits from apache into 3.1_ds #311

Merged
merged 3 commits into from
Sep 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -148,4 +148,13 @@ public boolean containsStickyKeyHashes(Set<Integer> stickyKeyHashes) {
/**
 * Asks {@code messagesToRedeliver} for at most {@code maxMessagesToRead} items, each materialized
 * through the {@code PositionImpl} constructor reference.
 *
 * @param maxMessagesToRead upper bound forwarded to {@code messagesToRedeliver.items(...)}
 * @return the set produced by {@code messagesToRedeliver.items(...)}
 */
public NavigableSet<PositionImpl> getMessagesToReplayNow(int maxMessagesToRead) {
    final NavigableSet<PositionImpl> positionsToReplay =
            messagesToRedeliver.items(maxMessagesToRead, PositionImpl::new);
    return positionsToReplay;
}

/**
 * Reports how many messages are currently registered for replay in this redelivery controller.
 *
 * @return the value of {@code messagesToRedeliver.size()}
 */
public int size() {
    final int registeredForReplay = messagesToRedeliver.size();
    return registeredForReplay;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1270,5 +1270,9 @@ public Subscription getSubscription() {
return subscription;
}

/**
 * Reports how many messages are currently tracked by {@code redeliveryMessages}.
 *
 * @return {@code redeliveryMessages.size()}, widened to {@code long}
 */
public long getNumberOfMessagesInReplay() {
    final long messagesInReplay = redeliveryMessages.size();
    return messagesInReplay;
}

private static final Logger log = LoggerFactory.getLogger(PersistentDispatcherMultipleConsumers.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -1223,6 +1223,7 @@ public CompletableFuture<SubscriptionStatsImpl> getStatsAsync(Boolean getPrecise
subStats.unackedMessages = d.getTotalUnackedMessages();
subStats.blockedSubscriptionOnUnackedMsgs = d.isBlockedDispatcherOnUnackedMsgs();
subStats.msgDelayed = d.getNumberOfDelayedMessages();
subStats.msgInReplay = d.getNumberOfMessagesInReplay();
}
}
subStats.msgBacklog = getNumberOfEntriesInBacklog(getPreciseBacklog);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1689,6 +1689,23 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) {
return closeFuture;
}

/**
 * Tells whether one of the close futures held in {@code closeFutures} has finished without
 * completing exceptionally.
 *
 * @return {@code false} when {@code closeFutures} is {@code null} or neither future is done
 *         without completing exceptionally; {@code true} as soon as either
 *         {@code notWaitDisconnectClients} or {@code waitDisconnectClients} is non-null, done and
 *         not completed exceptionally
 */
private boolean isClosed() {
    if (closeFutures == null) {
        return false;
    }
    // Same evaluation order as two consecutive guard clauses: the second future is only
    // inspected when the first one does not already prove a finished close.
    return (closeFutures.notWaitDisconnectClients != null
            && closeFutures.notWaitDisconnectClients.isDone()
            && !closeFutures.notWaitDisconnectClients.isCompletedExceptionally())
            || (closeFutures.waitDisconnectClients != null
            && closeFutures.waitDisconnectClients.isDone()
            && !closeFutures.waitDisconnectClients.isCompletedExceptionally());
}

private void disposeTopic(CompletableFuture<?> closeFuture) {
brokerService.removeTopicFromCache(PersistentTopic.this)
.thenRun(() -> {
Expand All @@ -1711,6 +1728,9 @@ private void disposeTopic(CompletableFuture<?> closeFuture) {

@VisibleForTesting
CompletableFuture<Void> checkReplicationAndRetryOnFailure() {
if (isClosed()) {
return CompletableFuture.completedFuture(null);
}
CompletableFuture<Void> result = new CompletableFuture<Void>();
checkReplication().thenAccept(res -> {
result.complete(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public class AggregatedNamespaceStats {
public ManagedLedgerStats managedLedgerStats = new ManagedLedgerStats();
public long msgBacklog;
public long msgDelayed;
public long msgInReplay;

public long ongoingTxnCount;
public long abortedTxnCount;
Expand Down Expand Up @@ -141,10 +142,12 @@ void updateStats(TopicStats stats) {
AggregatedSubscriptionStats subsStats =
subscriptionStats.computeIfAbsent(n, k -> new AggregatedSubscriptionStats());
msgDelayed += as.msgDelayed;
msgInReplay += as.msgInReplay;
subsStats.blockedSubscriptionOnUnackedMsgs = as.blockedSubscriptionOnUnackedMsgs;
subsStats.msgBacklog += as.msgBacklog;
subsStats.msgBacklogNoDelayed += as.msgBacklogNoDelayed;
subsStats.msgDelayed += as.msgDelayed;
subsStats.msgInReplay += as.msgInReplay;
subsStats.msgRateRedeliver += as.msgRateRedeliver;
subsStats.unackedMessages += as.unackedMessages;
subsStats.filterProcessedMsgCount += as.filterProcessedMsgCount;
Expand Down Expand Up @@ -200,6 +203,7 @@ public void reset() {

msgBacklog = 0;
msgDelayed = 0;
msgInReplay = 0;
ongoingTxnCount = 0;
abortedTxnCount = 0;
committedTxnCount = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ public class AggregatedSubscriptionStats {

public long msgDelayed;

public long msgInReplay;

long msgOutCounter;

long bytesOutCounter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ private static void aggregateTopicStats(TopicStats stats, SubscriptionStatsImpl
subsStats.msgOutCounter = subscriptionStats.msgOutCounter;
subsStats.msgBacklog = subscriptionStats.msgBacklog;
subsStats.msgDelayed = subscriptionStats.msgDelayed;
subsStats.msgInReplay = subscriptionStats.msgInReplay;
subsStats.msgRateExpired = subscriptionStats.msgRateExpired;
subsStats.totalMsgExpired = subscriptionStats.totalMsgExpired;
subsStats.msgBacklogNoDelayed = subsStats.msgBacklog - subsStats.msgDelayed;
Expand Down Expand Up @@ -412,6 +413,8 @@ private static void printNamespaceStats(PrometheusMetricStreams stream, Aggregat

writeMetric(stream, "pulsar_subscription_delayed", stats.msgDelayed, cluster, namespace);

writeMetric(stream, "pulsar_subscription_in_replay", stats.msgInReplay, cluster, namespace);

writeMetric(stream, "pulsar_delayed_message_index_size_bytes", stats.delayedMessageIndexSizeInBytes, cluster,
namespace);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,8 @@ public static void printTopicStats(PrometheusMetricStreams stream, TopicStats st
subsStats.msgBacklogNoDelayed, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
writeSubscriptionMetric(stream, "pulsar_subscription_delayed",
subsStats.msgDelayed, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
writeSubscriptionMetric(stream, "pulsar_subscription_in_replay",
subsStats.msgInReplay, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
writeSubscriptionMetric(stream, "pulsar_subscription_msg_rate_redeliver",
subsStats.msgRateRedeliver, cluster, namespace, topic, sub, splitTopicAndPartitionIndexLabel);
writeSubscriptionMetric(stream, "pulsar_subscription_unacked_messages",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import io.netty.util.concurrent.FastThreadLocalThread;
Expand All @@ -43,6 +44,7 @@
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -1249,4 +1251,46 @@ public void testReplicationCountMetrics() throws Exception {
admin1.topics().delete(topicName, false);
admin2.topics().delete(topicName, false);
}

/**
* This test is used to confirm that the "start replicator retry task" is skipped after the topic is closed.
*
* Outline: create a topic without replication, inject a stale "exists" cache entry so that starting the
* replicator fails, enable replication, unload the topic, and finally check that the topic name is not
* put back into the {@code TopicName} cache during the retry window.
*/
@Test
public void testCloseTopicAfterStartReplicationFailed() throws Exception {
// Reflectively grab the static "cache" field of TopicName; it is used below as the probe that shows
// whether anything looked the topic name up again after the topic was unloaded.
Field fieldTopicNameCache = TopicName.class.getDeclaredField("cache");
fieldTopicNameCache.setAccessible(true);
// NOTE(review): unchecked cast; assumes the field is a ConcurrentHashMap<String, TopicName> - confirm
// against the TopicName implementation in this branch.
ConcurrentHashMap<String, TopicName> topicNameCache =
(ConcurrentHashMap<String, TopicName>) fieldTopicNameCache.get(null);
final String topicName = BrokerTestUtil.newUniqueName("persistent://" + nonReplicatedNamespace + "/tp_");
// 1. Create the topic; replication is not enabled yet.
admin1.topics().createNonPartitionedTopic(topicName);
Producer<byte[]> producer1 = client1.newProducer().topic(topicName).create();
PersistentTopic persistentTopic =
(PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get();

// Inject an error that makes "start replicator" fail: the exists-cache of the configuration metadata
// store is told that a partitioned-topic node exists for this topic.
AsyncLoadingCache<String, Boolean> existsCache =
WhiteboxImpl.getInternalState(pulsar1.getConfigurationMetadataStore(), "existsCache");
String path = "/admin/partitioned-topics/" + TopicName.get(topicName).getPersistenceNamingEncoding();
existsCache.put(path, CompletableFuture.completedFuture(true));

// 2. Enable replication, then unload the topic after the replicator failed to start.
admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1, cluster2));
// Fixed pause before unloading; presumably gives the broker time to attempt (and fail) the replicator start.
Thread.sleep(3000);
producer1.close();
// Remove the injected cache entry again before unloading the topic.
existsCache.synchronous().invalidate(path);
admin1.topics().unload(topicName);
// Verify: the "start replicator retry task" is skipped after the topic is closed.
// - The retry delay is "PersistentTopic.POLICY_UPDATE_FAILURE_RETRY_TIME_SECONDS" (60s), so the test waits
// 70s in total: 10s before clearing the cache entry plus 60s afterwards.
// - Since the topic should not be touched anymore, the "TopicName" cache is used to detect whether the
// topic name is used by replication again.
Thread.sleep(10 * 1000);
topicNameCache.remove(topicName);
Thread.sleep(60 * 1000);
assertTrue(!topicNameCache.containsKey(topicName));

// Cleanup: restrict replication to the local cluster again and delete the topic.
admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1));
admin1.topics().delete(topicName, false);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api;

import static org.testng.Assert.assertTrue;
import com.google.common.util.concurrent.Uninterruptibles;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import lombok.Cleanup;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.common.naming.TopicName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Test(groups = "broker-api")
public class MessageListenerExecutorTest extends ProducerConsumerBase {
private static final Logger log = LoggerFactory.getLogger(MessageListenerExecutorTest.class);

@BeforeClass(alwaysRun = true)
@Override
protected void setup() throws Exception {
super.internalSetup();
super.producerBaseSetup();
}

@AfterClass(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
}

@Override
protected void customizeNewPulsarClientBuilder(ClientBuilder clientBuilder) {
// Set listenerThreads to 1 to reproduce the pr more easily in #22861
clientBuilder.listenerThreads(1);
}

@Test
public void testConsumerMessageListenerExecutorIsolation() throws Exception {
log.info("-- Starting {} test --", methodName);

@Cleanup("shutdownNow")
ExecutorService executor = Executors.newCachedThreadPool();
List<CompletableFuture<Long>> maxConsumeDelayWithDisableIsolationFutures = new ArrayList<>();
int loops = 5;
long consumeSleepTimeMs = 10000;
for (int i = 0; i < loops; i++) {
// The first consumer will consume messages with sleep block 1s,
// and the others will consume messages without sleep block.
// The maxConsumeDelayWithDisableIsolation of all consumers
// should be greater than sleepTimeMs cause by disable MessageListenerExecutor.
CompletableFuture<Long> maxConsumeDelayFuture = startConsumeAndComputeMaxConsumeDelay(
"persistent://my-property/my-ns/testConsumerMessageListenerDisableIsolation-" + i,
"my-sub-testConsumerMessageListenerDisableIsolation-" + i,
i == 0 ? Duration.ofMillis(consumeSleepTimeMs) : Duration.ofMillis(0),
false,
executor);
maxConsumeDelayWithDisableIsolationFutures.add(maxConsumeDelayFuture);
}

// ensure all consumers consume messages delay more than consumeSleepTimeMs
boolean allDelayMoreThanConsumeSleepTimeMs = maxConsumeDelayWithDisableIsolationFutures.stream()
.map(CompletableFuture::join)
.allMatch(delay -> delay > consumeSleepTimeMs);
assertTrue(allDelayMoreThanConsumeSleepTimeMs);

List<CompletableFuture<Long>> maxConsumeDelayWhitEnableIsolationFutures = new ArrayList<>();
for (int i = 0; i < loops; i++) {
// The first consumer will consume messages with sleep block 1s,
// and the others will consume messages without sleep block.
// The maxConsumeDelayWhitEnableIsolation of the first consumer
// should be greater than sleepTimeMs, and the others should be
// less than sleepTimeMs, cause by enable MessageListenerExecutor.
CompletableFuture<Long> maxConsumeDelayFuture = startConsumeAndComputeMaxConsumeDelay(
"persistent://my-property/my-ns/testConsumerMessageListenerEnableIsolation-" + i,
"my-sub-testConsumerMessageListenerEnableIsolation-" + i,
i == 0 ? Duration.ofMillis(consumeSleepTimeMs) : Duration.ofMillis(0),
true,
executor);
maxConsumeDelayWhitEnableIsolationFutures.add(maxConsumeDelayFuture);
}

assertTrue(maxConsumeDelayWhitEnableIsolationFutures.get(0).join() > consumeSleepTimeMs);
boolean remainingAlmostNoDelay = maxConsumeDelayWhitEnableIsolationFutures.stream()
.skip(1)
.map(CompletableFuture::join)
.allMatch(delay -> delay < 1000);
assertTrue(remainingAlmostNoDelay);

log.info("-- Exiting {} test --", methodName);
}

private CompletableFuture<Long> startConsumeAndComputeMaxConsumeDelay(String topic, String subscriptionName,
Duration consumeSleepTime,
boolean enableMessageListenerExecutorIsolation,
ExecutorService executorService)
throws Exception {
int numMessages = 2;
final CountDownLatch latch = new CountDownLatch(numMessages);
int numPartitions = 50;
TopicName nonIsolationTopicName = TopicName.get(topic);
admin.topics().createPartitionedTopic(nonIsolationTopicName.toString(), numPartitions);

AtomicLong maxConsumeDelay = new AtomicLong(-1);
ConsumerBuilder<Long> consumerBuilder =
pulsarClient.newConsumer(Schema.INT64)
.topic(nonIsolationTopicName.toString())
.subscriptionName(subscriptionName)
.messageListener((c1, msg) -> {
Assert.assertNotNull(msg, "Message cannot be null");
log.debug("Received message [{}] in the listener", msg.getValue());
c1.acknowledgeAsync(msg);
maxConsumeDelay.set(Math.max(maxConsumeDelay.get(),
System.currentTimeMillis() - msg.getValue()));
if (consumeSleepTime.toMillis() > 0) {
Uninterruptibles.sleepUninterruptibly(consumeSleepTime);
}
latch.countDown();
});

ExecutorService executor = Executors.newSingleThreadExecutor(
new ExecutorProvider.ExtendedThreadFactory(subscriptionName + "listener-executor-", true));
if (enableMessageListenerExecutorIsolation) {
consumerBuilder.messageListenerExecutor((message, runnable) -> executor.execute(runnable));
}

Consumer<Long> consumer = consumerBuilder.subscribe();
ProducerBuilder<Long> producerBuilder = pulsarClient.newProducer(Schema.INT64)
.topic(nonIsolationTopicName.toString());

Producer<Long> producer = producerBuilder.create();
List<Future<MessageId>> futures = new ArrayList<>();

// Asynchronously produce messages
for (int i = 0; i < numMessages; i++) {
Future<MessageId> future = producer.sendAsync(System.currentTimeMillis());
futures.add(future);
}

log.info("Waiting for async publish to complete");
for (Future<MessageId> future : futures) {
future.get();
}

CompletableFuture<Long> maxDelayFuture = new CompletableFuture<>();

CompletableFuture.runAsync(() -> {
try {
latch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}, executorService).whenCompleteAsync((v, ex) -> {
maxDelayFuture.complete(maxConsumeDelay.get());
try {
producer.close();
consumer.close();
executor.shutdownNow();
} catch (PulsarClientException e) {
throw new RuntimeException(e);
}
});

return maxDelayFuture;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ public interface SubscriptionStats {
/** Number of delayed messages currently being tracked. */
long getMsgDelayed();

/** Number of messages registered for replay. */
long getMsgInReplay();

/**
* Number of unacknowledged messages for the subscription, where an unacknowledged message is one that has been
* sent to a consumer but not yet acknowledged. Calculated by summing all {@link ConsumerStats#getUnackedMessages()}
Expand Down
Loading
Loading