Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent race condition while deleting namespace #253

Draft
wants to merge 3 commits into
base: 3.1_ds
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ protected CompletableFuture<Void> internalCreateNamespace(Policies policies) {
return CompletableFuture.completedFuture(null);
})
.thenCompose(__ -> namespaceResources().createPoliciesAsync(namespaceName, policies))
.thenAccept(__ -> log.info("[{}] Created namespace {}", clientAppId(), namespaceName));
.thenAccept(__ -> log.info("[{}] Created namespace {} with policies {}", clientAppId(), namespaceName, policies));
}

protected CompletableFuture<List<String>> internalGetListOfTopics(Policies policies,
Expand Down Expand Up @@ -215,6 +215,8 @@ protected CompletableFuture<List<String>> internalGetNonPersistentTopics(Policie
}
private void internalRetryableDeleteNamespaceAsync0(boolean force, int retryTimes,
@Nonnull CompletableFuture<Void> callback) {
log.info("internalRetryableDeleteNamespaceAsync0: namespace={}, force={}, retryTimes={}",
namespaceName, force, retryTimes);
precheckWhenDeleteNamespace(namespaceName, force)
.thenCompose(policies -> {
final CompletableFuture<List<String>> topicsFuture;
Expand Down Expand Up @@ -267,13 +269,16 @@ private void internalRetryableDeleteNamespaceAsync0(boolean force, int retryTime
}
if (!force) {
if (hasNonSystemTopic) {
throw new RestException(Status.CONFLICT, "Cannot delete non empty namespace");
log.info("Namespace {} has non-system topics, force delete is required. User topics: {}, user partitioned topics: {}",
namespaceName, allUserCreatedTopics, allUserCreatedPartitionTopics);
throw new RestException(Status.CONFLICT, "Cannot delete non empty namespace " + namespaceName);
}
}
final CompletableFuture<Void> markDeleteFuture;
if (policies != null && policies.deleted) {
markDeleteFuture = CompletableFuture.completedFuture(null);
} else {
log.info("Marking the namespace {} as deleted", namespaceName);
markDeleteFuture = namespaceResources().setPoliciesAsync(namespaceName, old -> {
old.deleted = true;
return old;
Expand All @@ -298,6 +303,8 @@ private void internalRetryableDeleteNamespaceAsync0(boolean force, int retryTime
.thenCompose(bundles -> FutureUtil.waitForAll(bundles.getBundles().stream()
.map(bundle -> pulsar().getNamespaceService().checkOwnershipPresentAsync(bundle)
.thenCompose(present -> {
log.info("Delete namespace {} check bundle {} ownership present: {}",
namespaceName, bundle, present);
// check if the bundle is owned by any broker,
// if not then we do not need to delete the bundle
if (present) {
Expand All @@ -309,10 +316,15 @@ private void internalRetryableDeleteNamespaceAsync0(boolean force, int retryTime
clientAppId(), ex);
return FutureUtil.failedFuture(ex);
}
log.info("Deleting namespace bundle {}/{} (force: {})",
namespaceName, bundle.getBundleRange(), force);
return admin.namespaces().deleteNamespaceBundleAsync(namespaceName.toString(),
bundle.getBundleRange(), force);
} else {
log.info("Bundle {}/{} is not owned by any broker, no need to delete",
namespaceName, bundle);
return CompletableFuture.completedFuture(null);
}
return CompletableFuture.completedFuture(null);
})
).collect(Collectors.toList())))
.thenCompose(ignore -> internalClearZkSources())
Expand All @@ -321,16 +333,19 @@ private void internalRetryableDeleteNamespaceAsync0(boolean force, int retryTime
final Throwable rc = FutureUtil.unwrapCompletionException(error);
if (rc instanceof MetadataStoreException) {
if (rc.getCause() != null && rc.getCause() instanceof KeeperException.NotEmptyException) {
log.info("[{}] There are in-flight topics created during the namespace deletion, "
+ "retry to delete the namespace again.", namespaceName);
KeeperException.NotEmptyException notEmptyException = (KeeperException.NotEmptyException) rc.getCause();
log.warn("[{}] There are in-flight topics created during the namespace deletion or some bundle is still owned, "
+ "retry to delete the namespace again. Path is {}", namespaceName,
notEmptyException.getPath());

final int next = retryTimes - 1;
if (next > 0) {
// async recursive
internalRetryableDeleteNamespaceAsync0(force, next, callback);
} else {
callback.completeExceptionally(
new RestException(Status.CONFLICT, "The broker still have in-flight topics"
+ " created during namespace deletion, please try again."));
+ " created during namespace deletion or some bundle is still owned, please try again."));
// drop out recursive
}
return;
Expand Down Expand Up @@ -363,6 +378,7 @@ private CompletableFuture<Void> internalDeletePartitionedTopicsAsync(Set<String>
}

private CompletableFuture<Void> internalDeleteTopicsAsync(Set<String> topicNames) {
log.info("internalDeleteTopicsAsync: {}", topicNames);
if (CollectionUtils.isEmpty(topicNames)) {
return CompletableFuture.completedFuture(null);
}
Expand Down Expand Up @@ -453,6 +469,7 @@ private CompletableFuture<Policies> precheckWhenDeleteNamespace(NamespaceName ns

// clear zk-node resources for deleting namespace
protected CompletableFuture<Void> internalClearZkSources() {
log.info("internalClearZkSources: {}", namespaceName);
// clear resource of `/namespace/{namespaceName}` for zk-node
return namespaceResources().deleteNamespaceAsync(namespaceName)
.thenCompose(ignore -> namespaceResources().getPartitionedTopicResources()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,11 +208,11 @@ public CompletableFuture<NamespaceEphemeralData> tryAcquiringOwnership(Namespace
*/
public CompletableFuture<Void> removeOwnership(NamespaceBundle bundle) {
    // Detach whatever lock is registered locally for this bundle; the removal
    // happens unconditionally, before we know whether a lock was present.
    final ResourceLock<NamespaceEphemeralData> heldLock = locallyAcquiredLocks.remove(bundle);
    log.info("Removing ownership of {} (current lock is {})", bundle, heldLock);

    if (heldLock != null) {
        // A lock was registered locally: hand back the future produced by releasing it.
        return heldLock.release();
    }

    // No lock was registered locally for the bundle, so there is nothing to
    // release; report success immediately.
    return CompletableFuture.completedFuture(null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,16 @@ public CompletableFuture<Void> addOwnedNamespaceBundleAsync(NamespaceBundle name
return CompletableFuture.completedFuture(null);
}
synchronized (this) {
if (readerCaches.get(namespace) != null) {
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture =
readerCaches.get(namespace);
boolean readerCachedAndNotFailed = readerCompletableFuture != null
&& readerCompletableFuture.isDone()
&& !readerCompletableFuture.isCompletedExceptionally();
boolean completedExceptionally = readerCompletableFuture != null
&& readerCompletableFuture.isCompletedExceptionally();
log.info("[{}] addOwnedNamespaceBundleAsync, readerCachedAndNotFailed: {}, completedExceptionally: {}", namespace,
readerCachedAndNotFailed, completedExceptionally);
if (readerCachedAndNotFailed) {
ownedBundlesCountPerNamespace.get(namespace).incrementAndGet();
return CompletableFuture.completedFuture(null);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -898,10 +898,9 @@ public static CompletableFuture<ClusterDataImpl> checkLocalOrGetPeerReplicationC
if (policiesResult.isPresent()) {
Policies policies = policiesResult.get();
if (!allowDeletedNamespace && policies.deleted) {
String msg = String.format("Namespace %s is deleted", namespace.toString());
String msg = String.format("Namespace %s is marked as deleted", namespace.toString());
log.warn(msg);
validationFuture.completeExceptionally(new RestException(Status.NOT_FOUND,
"Namespace is deleted"));
validationFuture.completeExceptionally(new RestException(Status.NOT_FOUND, msg));
} else if (policies.replication_clusters.isEmpty()) {
String msg = String.format(
"Namespace does not have any clusters configured : local_cluster=%s ns=%s",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ public CompletableFuture<Optional<CacheGetResult<T>>> asyncReload(
if (store instanceof AbstractMetadataStore && ((AbstractMetadataStore) store).isConnected()) {
return readValueFromStore(key);
} else {
log.error("Cannot refresh cache item for key {} because we're not connected to the metadata store", key);
// Do not try to refresh the cache item if we know that we're not connected to the
// metadata store
return CompletableFuture.completedFuture(oldValue);
Expand Down Expand Up @@ -216,6 +217,7 @@ public CompletableFuture<Void> create(String path, T value) {
}

CompletableFuture<Void> future = new CompletableFuture<>();
log.info("Creating path {} with value {}", path, value);
store.put(path, content, Optional.of(-1L))
.thenAccept(stat -> {
// Make sure we have the value cached before the operation is completed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ public synchronized CompletableFuture<Void> updateValue(T newValue) {
return sequencer.sequential(() -> {
synchronized (ResourceLockImpl.this) {
if (state != State.Valid) {
log.error("Cannot update value on lock at {} because it's not in valid state: {}", path, state,
new Exception("invalid lock state " + state + " at " + path).fillInStackTrace());
return CompletableFuture.failedFuture(
new IllegalStateException("Lock was not in valid state: " + state));
}
Expand Down Expand Up @@ -210,12 +212,17 @@ synchronized CompletableFuture<Void> revalidateIfNeededAfterReconnection() {
* This method is thread-safe and it will perform multiple re-validation operations in turn.
*/
synchronized CompletableFuture<Void> silentRevalidateOnce() {
if (state == State.Releasing) {
log.info("Lock on resource {} is being released. Skip revalidation", path);
return CompletableFuture.completedFuture(null);
}
return sequencer.sequential(() -> revalidate(value))
.thenRun(() -> log.info("Successfully revalidated the lock on {}", path))
.exceptionally(ex -> {
synchronized (ResourceLockImpl.this) {
Throwable realCause = FutureUtil.unwrapCompletionException(ex);
if (realCause instanceof BadVersionException || realCause instanceof LockBusyException) {
if (realCause instanceof BadVersionException
|| realCause instanceof LockBusyException) {
log.warn("Failed to revalidate the lock at {}. Marked as expired. {}",
path, realCause.getMessage());
state = State.Released;
Expand All @@ -237,7 +244,7 @@ private synchronized CompletableFuture<Void> revalidate(T newValue) {
// Since the distributed lock has been expired, we don't need to revalidate it.
if (state != State.Valid && state != State.Init) {
return CompletableFuture.failedFuture(
new IllegalStateException("Lock was not in valid state: " + state));
new IllegalStateException("Lock for " + path + " was not in valid state: " + state));
}
if (log.isDebugEnabled()) {
log.debug("doRevalidate with newValue={}, version={}", newValue, version);
Expand Down
Loading