diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/PulsarServerException.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/PulsarServerException.java index 2235b9a7128b8..d7c0d0adb3afc 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/PulsarServerException.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/PulsarServerException.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker; import java.io.IOException; +import java.util.concurrent.CompletionException; public class PulsarServerException extends IOException { private static final long serialVersionUID = 1; @@ -44,4 +45,20 @@ public NotFoundException(Throwable t) { super(t); } } + + public static PulsarServerException from(Throwable throwable) { + if (throwable instanceof CompletionException) { + return from(throwable.getCause()); + } + if (throwable instanceof PulsarServerException pulsarServerException) { + return pulsarServerException; + } else { + return new PulsarServerException(throwable); + } + } + + // Wrap this checked exception into a specific unchecked exception + public static CompletionException toUncheckedException(PulsarServerException e) { + return new CompletionException(e); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index dd66fc957d8c7..334eabeacea62 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -960,7 +960,7 @@ public void start() throws PulsarServerException { state = State.Started; } catch (Exception e) { LOG.error("Failed to start Pulsar service: {}", e.getMessage(), e); - PulsarServerException startException = new PulsarServerException(e); + PulsarServerException startException = PulsarServerException.from(e); readyForIncomingRequestsFuture.completeExceptionally(startException); throw startException; } finally { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java index e0fd738a408e2..af0ea0ea224d1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java @@ -80,7 +80,6 @@ import org.apache.pulsar.broker.loadbalance.extensions.scheduler.SplitScheduler; import org.apache.pulsar.broker.loadbalance.extensions.scheduler.UnloadScheduler; import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore; -import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStoreException; import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStoreFactory; import org.apache.pulsar.broker.loadbalance.extensions.strategy.BrokerSelectionStrategy; import org.apache.pulsar.broker.loadbalance.extensions.strategy.LeastResourceUsageWithWeight; @@ -97,10 +96,7 @@ import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.stats.Metrics; -import org.apache.pulsar.common.util.Backoff; -import org.apache.pulsar.common.util.BackoffBuilder; import org.apache.pulsar.common.util.FutureUtil; -import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.metadata.api.coordination.LeaderElectionState; import org.slf4j.Logger; @@ -123,10 +119,6 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager { public static final long COMPACTION_THRESHOLD = 5 * 1024 * 1024; - public static final int STARTUP_TIMEOUT_SECONDS = 30; - - public static final int MAX_RETRY = 5; - private static final String ELECTION_ROOT = "/loadbalance/extension/leader"; public static final Set INTERNAL_TOPICS = @@ -204,7 +196,7 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager { private final ConcurrentHashMap>> lookupRequests = new ConcurrentHashMap<>(); - private final CompletableFuture initWaiter = new CompletableFuture<>(); + private final CompletableFuture initWaiter = new CompletableFuture<>(); /** * Get all the bundles that are owned by this broker. @@ -331,7 +323,7 @@ public void start() throws PulsarServerException { return; } try { - this.brokerRegistry = new BrokerRegistryImpl(pulsar); + this.brokerRegistry = createBrokerRegistry(pulsar); this.leaderElectionService = new LeaderElectionService( pulsar.getCoordinationService(), pulsar.getBrokerId(), pulsar.getSafeWebServiceAddress(), ELECTION_ROOT, @@ -346,53 +338,14 @@ public void start() throws PulsarServerException { }); }); }); - this.serviceUnitStateChannel = ServiceUnitStateChannelImpl.newInstance(pulsar); + this.serviceUnitStateChannel = createServiceUnitStateChannel(pulsar); this.brokerRegistry.start(); this.splitManager = new SplitManager(splitCounter); this.unloadManager = new UnloadManager(unloadCounter); this.serviceUnitStateChannel.listen(unloadManager); this.serviceUnitStateChannel.listen(splitManager); this.leaderElectionService.start(); - pulsar.runWhenReadyForIncomingRequests(() -> { - Backoff backoff = new BackoffBuilder() - .setInitialTime(100, TimeUnit.MILLISECONDS) - .setMax(STARTUP_TIMEOUT_SECONDS, TimeUnit.SECONDS) - .create(); - int retry = 0; - while (!Thread.currentThread().isInterrupted()) { - try { - brokerRegistry.register(); - this.serviceUnitStateChannel.start(); - break; - } catch (Exception e) { - log.warn("The broker:{} failed to start service unit state channel. Retrying {} th ...", - pulsar.getBrokerId(), ++retry, e); - try { - Thread.sleep(backoff.next()); - } catch (InterruptedException ex) { - log.warn("Interrupted while sleeping."); - // preserve thread's interrupt status - Thread.currentThread().interrupt(); - try { - pulsar.close(); - } catch (PulsarServerException exc) { - log.error("Failed to close pulsar service.", exc); - } - return; - } - failStarting(e); - if (retry >= MAX_RETRY) { - log.error("Failed to start the service unit state channel after retry {} th. " - + "Closing pulsar service.", retry, e); - try { - pulsar.close(); - } catch (PulsarServerException ex) { - log.error("Failed to close pulsar service.", ex); - } - } - } - } - }); + this.antiAffinityGroupPolicyHelper = new AntiAffinityGroupPolicyHelper(pulsar, serviceUnitStateChannel); antiAffinityGroupPolicyHelper.listenFailureDomainUpdate(); @@ -401,15 +354,10 @@ public void start() throws PulsarServerException { SimpleResourceAllocationPolicies policies = new SimpleResourceAllocationPolicies(pulsar); this.isolationPoliciesHelper = new IsolationPoliciesHelper(policies); this.brokerFilterPipeline.add(new BrokerIsolationPoliciesFilter(isolationPoliciesHelper)); - - try { - this.brokerLoadDataStore = LoadDataStoreFactory - .create(pulsar, BROKER_LOAD_DATA_STORE_TOPIC, BrokerLoadData.class); - this.topBundlesLoadDataStore = LoadDataStoreFactory - .create(pulsar, TOP_BUNDLES_LOAD_DATA_STORE_TOPIC, TopBundlesLoadData.class); - } catch (LoadDataStoreException e) { - throw new PulsarServerException(e); - } + this.brokerLoadDataStore = LoadDataStoreFactory + .create(pulsar, BROKER_LOAD_DATA_STORE_TOPIC, BrokerLoadData.class); + this.topBundlesLoadDataStore = LoadDataStoreFactory + .create(pulsar, TOP_BUNDLES_LOAD_DATA_STORE_TOPIC, TopBundlesLoadData.class); this.context = LoadManagerContextImpl.builder() .configuration(conf) @@ -433,6 +381,7 @@ public void start() throws PulsarServerException { pulsar.runWhenReadyForIncomingRequests(() -> { try { + this.serviceUnitStateChannel.start(); var interval = conf.getLoadBalancerReportUpdateMinIntervalMillis(); this.brokerLoadDataReportTask = this.pulsar.getLoadManagerExecutor() @@ -467,38 +416,33 @@ public void start() throws PulsarServerException { MONITOR_INTERVAL_IN_MILLIS, TimeUnit.MILLISECONDS); this.splitScheduler.start(); - this.initWaiter.complete(null); + this.initWaiter.complete(true); this.started = true; log.info("Started load manager."); - } catch (Exception ex) { - failStarting(ex); + } catch (Throwable e) { + failStarting(e); } }); - } catch (Exception ex) { + } catch (Throwable ex) { failStarting(ex); } } - private void failStarting(Exception ex) { - log.error("Failed to start the extensible load balance and close broker registry {}.", - this.brokerRegistry, ex); + private void failStarting(Throwable throwable) { if (this.brokerRegistry != null) { try { - brokerRegistry.unregister(); - } catch (MetadataStoreException e) { - // ignore - } - } - if (this.serviceUnitStateChannel != null) { - try { - serviceUnitStateChannel.close(); - } catch (IOException e) { - // ignore + brokerRegistry.close(); + } catch (PulsarServerException e) { + // If close failed, this broker might still exist in the metadata store. Then it could be found by other + // brokers as an available broker. Hence, print a warning log for it. + log.warn("Failed to close the broker registry: {}", e.getMessage()); } } - initWaiter.completeExceptionally(ex); + initWaiter.complete(false); // exit the background thread gracefully + throw PulsarServerException.toUncheckedException(PulsarServerException.from(throwable)); } + @Override public void initialize(PulsarService pulsar) { this.pulsar = pulsar; @@ -843,7 +787,9 @@ synchronized void playLeader() { boolean becameFollower = false; while (!Thread.currentThread().isInterrupted()) { try { - initWaiter.get(); + if (!initWaiter.get()) { + return; + } if (!serviceUnitStateChannel.isChannelOwner()) { becameFollower = true; break; @@ -893,7 +839,9 @@ synchronized void playFollower() { boolean becameLeader = false; while (!Thread.currentThread().isInterrupted()) { try { - initWaiter.get(); + if (!initWaiter.get()) { + return; + } if (serviceUnitStateChannel.isChannelOwner()) { becameLeader = true; break; @@ -957,7 +905,9 @@ public List getMetrics() { @VisibleForTesting protected void monitor() { try { - initWaiter.get(); + if (!initWaiter.get()) { + return; + } // Monitor role // Periodically check the role in case ZK watcher fails. @@ -1012,4 +962,14 @@ private void closeInternalTopics() { log.warn("Failed to wait for closing internal topics", e); } } + + @VisibleForTesting + protected BrokerRegistry createBrokerRegistry(PulsarService pulsar) { + return new BrokerRegistryImpl(pulsar); + } + + @VisibleForTesting + protected ServiceUnitStateChannel createServiceUnitStateChannel(PulsarService pulsar) { + return new ServiceUnitStateChannelImpl(pulsar); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/LoadManagerFailFastTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/LoadManagerFailFastTest.java new file mode 100644 index 0000000000000..a400bf733e557 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/LoadManagerFailFastTest.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.loadbalance.extensions; + +import java.util.Optional; +import lombok.Cleanup; +import org.apache.pulsar.broker.PulsarServerException; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.loadbalance.LoadManager; +import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel; +import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl; +import org.apache.pulsar.common.util.PortManager; +import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; +import org.awaitility.Awaitility; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +public class LoadManagerFailFastTest { + + private static final String cluster = "test"; + private final int zkPort = PortManager.nextLockedFreePort(); + private final LocalBookkeeperEnsemble bk = new LocalBookkeeperEnsemble(2, zkPort, PortManager::nextLockedFreePort); + private final ServiceConfiguration config = new ServiceConfiguration(); + + @BeforeClass + protected void setup() throws Exception { + bk.start(); + config.setClusterName(cluster); + config.setAdvertisedAddress("localhost"); + config.setBrokerServicePort(Optional.of(0)); + config.setWebServicePort(Optional.of(0)); + config.setMetadataStoreUrl("zk:localhost:" + zkPort); + } + + @AfterClass + protected void cleanup() throws Exception { + bk.stop(); + } + + @Test(timeOut = 30000) + public void testBrokerRegistryFailure() throws Exception { + config.setLoadManagerClassName(BrokerRegistryLoadManager.class.getName()); + @Cleanup final var pulsar = new PulsarService(config); + try { + pulsar.start(); + Assert.fail(); + } catch (PulsarServerException e) { + Assert.assertNull(e.getCause()); + Assert.assertEquals(e.getMessage(), "Cannot start BrokerRegistry"); + } + Assert.assertTrue(pulsar.getLocalMetadataStore().getChildren(LoadManager.LOADBALANCE_BROKERS_ROOT).get() + .isEmpty()); + } + + @Test(timeOut = 30000) + public void testServiceUnitStateChannelFailure() throws Exception { + config.setLoadManagerClassName(ChannelLoadManager.class.getName()); + @Cleanup final var pulsar = new PulsarService(config); + try { + pulsar.start(); + Assert.fail(); + } catch (PulsarServerException e) { + Assert.assertNull(e.getCause()); + Assert.assertEquals(e.getMessage(), "Cannot start ServiceUnitStateChannel"); + } + Awaitility.await().untilAsserted(() -> Assert.assertTrue(pulsar.getLocalMetadataStore() + .getChildren(LoadManager.LOADBALANCE_BROKERS_ROOT).get().isEmpty())); + } + + private static class BrokerRegistryLoadManager extends ExtensibleLoadManagerImpl { + + @Override + protected BrokerRegistry createBrokerRegistry(PulsarService pulsar) { + final var mockBrokerRegistry = Mockito.mock(BrokerRegistryImpl.class); + try { + Mockito.doThrow(new PulsarServerException("Cannot start BrokerRegistry")).when(mockBrokerRegistry) + .start(); + } catch (PulsarServerException e) { + throw new RuntimeException(e); + } + return mockBrokerRegistry; + } + } + + private static class ChannelLoadManager extends ExtensibleLoadManagerImpl { + + @Override + protected ServiceUnitStateChannel createServiceUnitStateChannel(PulsarService pulsar) { + final var channel = Mockito.mock(ServiceUnitStateChannelImpl.class); + try { + Mockito.doThrow(new PulsarServerException("Cannot start ServiceUnitStateChannel")).when(channel) + .start(); + } catch (PulsarServerException e) { + throw new RuntimeException(e); + } + Mockito.doAnswer(__ -> null).when(channel).listen(Mockito.any()); + return channel; + } + } +}