diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index a540828000e1c..beccda60cc1b6 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -1682,8 +1682,9 @@ void createNewOpAddEntryForNewLedger() { if (existsOp.ledger != null) { existsOp = existsOp.duplicateAndClose(currentLedgerTimeoutTriggered); } else { - // This scenario should not happen. - log.warn("[{}] An OpAddEntry's ledger is empty.", name); + // It may happen when the following operations execute at the same time, so it is expected. + // - Adding entry. + // - Switching ledger. existsOp.setTimeoutTriggered(currentLedgerTimeoutTriggered); } existsOp.setLedger(currentLedger); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerMLInitializeDelayTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerMLInitializeDelayTest.java index ab4e063ae3d83..7c7665a5bd3e4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerMLInitializeDelayTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerMLInitializeDelayTest.java @@ -20,12 +20,19 @@ import com.carrotsearch.hppc.ObjectSet; import java.time.Duration; +import java.util.ArrayList; +import java.util.LinkedHashSet; import java.util.List; +import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.service.Dispatcher; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.common.naming.TopicName; import org.awaitility.Awaitility; import org.awaitility.reflect.WhiteboxImpl; @@ -105,4 +112,67 @@ public void testConsumerListMatchesConsumerSet() throws Exception { // cleanup. client.close(); } + + @Test(timeOut = 30 * 1000) + public void testConcurrentlyOfPublishAndSwitchLedger() throws Exception { + final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); + final String subscription = "s1"; + admin.topics().createNonPartitionedTopic(topicName); + admin.topics().createSubscription(topicName, subscription, MessageId.earliest); + // Make ledger switches faster. + PersistentTopic persistentTopic = + (PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false).join().get(); + ManagedLedgerConfig config = persistentTopic.getManagedLedger().getConfig(); + config.setMaxEntriesPerLedger(2); + config.setMinimumRolloverTime(0, TimeUnit.MILLISECONDS); + // Inject a delay for switching ledgers, so publishing requests will be push in to the pending queue. + AtomicInteger delayTimes = new AtomicInteger(); + mockZooKeeper.delay(10, (op, s) -> { + if (op.toString().equals("SET") && s.contains(TopicName.get(topicName).getPersistenceNamingEncoding())) { + return delayTimes.incrementAndGet() == 1; + } + return false; + }); + Producer producer = pulsarClient.newProducer(Schema.STRING).topic(topicName).enableBatching(false) + .create(); + List> sendRequests = new ArrayList<>(); + List msgsSent = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + String msg = i + ""; + sendRequests.add(producer.sendAsync(i + "")); + msgsSent.add(msg); + } + // Verify: + // - All messages were sent. + // - The order of messages are correct. + Set msgIds = new LinkedHashSet<>(); + MessageIdImpl previousMsgId = null; + for (CompletableFuture msgId : sendRequests) { + Assert.assertNotNull(msgId.join()); + MessageIdImpl messageIdImpl = (MessageIdImpl) msgId.join(); + if (previousMsgId != null) { + Assert.assertTrue(messageIdImpl.compareTo(previousMsgId) > 0); + } + msgIds.add(String.format("%s:%s", messageIdImpl.getLedgerId(), messageIdImpl.getEntryId())); + previousMsgId = messageIdImpl; + } + Assert.assertEquals(msgIds.size(), 100); + log.info("messages were sent: {}", msgIds.toString()); + List msgsReceived = new ArrayList<>(); + Consumer consumer = pulsarClient.newConsumer(Schema.STRING).topic(topicName) + .subscriptionName(subscription).subscribe(); + while (true) { + Message receivedMsg = consumer.receive(2, TimeUnit.SECONDS); + if (receivedMsg == null) { + break; + } + msgsReceived.add(receivedMsg.getValue()); + } + Assert.assertEquals(msgsReceived, msgsSent); + + // cleanup. + consumer.close(); + producer.close(); + admin.topics().delete(topicName); + } }