Skip to content

Commit

Permalink
Automatically remove index blocks during remote reindex migration (#2…
Browse files Browse the repository at this point in the history
…0313)

* Automatically remove index blocks during remote reindex migration

* Added changelog

* extend migration integration test with blocks

* Better block removal

---------

Co-authored-by: Matthias Oesterheld <[email protected]>
  • Loading branch information
todvora and moesterheld authored Sep 13, 2024
1 parent f41bc23 commit 88a0747
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 1 deletion.
5 changes: 5 additions & 0 deletions changelog/unreleased/pr-20313.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
type="f"
message="Automatically remove index write blocks during remote reindex migration"

issues=["graylog-plugin-enterprise#8374"]
pulls=["20313"]
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ void testRemoteAsyncReindexing() throws ExecutionException, RetryException {
closeSourceIndex(indexName);

createTargetIndex(indexName, true);
createTargetIndex(indexName2, false);
blockTargetIndex(indexName2);

Assertions.assertThat(getTargetIndexState(indexName))
.isEqualTo(IndexState.CLOSE);

Expand Down Expand Up @@ -119,6 +122,10 @@ void testRemoteAsyncReindexing() throws ExecutionException, RetryException {

}

private void blockTargetIndex(String indexName) {
apis.backend().searchServerInstance().client().setIndexBlock(indexName);
}

private void openTargetIndex(String indexName) {
apis.backend().searchServerInstance().client().openIndex(indexName);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright (C) 2020 Graylog, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/
package org.graylog.storage.opensearch2;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.List;
import java.util.Map;
import java.util.Optional;

@JsonIgnoreProperties(ignoreUnknown = true)
public record BlockResponse(@JsonProperty("blocks") Blocks blocks) {

/**
* This is some internal format structure. IDK what the integer number in the map means. For now
* we just ignore it and skip directly into the content.
* @param indices
*/
@JsonIgnoreProperties(ignoreUnknown = true)
record Blocks(@JsonProperty("indices") Map<String, Map<Integer, IndexBlock>> indices) {
public Optional<IndexBlock> forIndex(String indexName) {
return Optional.ofNullable(indices())
.stream()
.flatMap(indices -> indices.entrySet().stream())
.filter(entry -> entry.getKey().equals(indexName))
.map(e -> e.getValue().values().iterator().next())
.findFirst();
}
}

@JsonIgnoreProperties(ignoreUnknown = true)
record IndexBlock(String description, boolean retryable, List<BlockLevel> levels) {}

enum BlockLevel {
read, write, metadata_write
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import org.graylog.shaded.opensearch2.org.opensearch.action.admin.cluster.settings.ClusterGetSettingsRequest;
import org.graylog.shaded.opensearch2.org.opensearch.action.admin.cluster.settings.ClusterGetSettingsResponse;
import org.graylog.shaded.opensearch2.org.opensearch.action.admin.indices.open.OpenIndexRequest;
import org.graylog.shaded.opensearch2.org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.graylog.shaded.opensearch2.org.opensearch.action.support.master.AcknowledgedResponse;
import org.graylog.shaded.opensearch2.org.opensearch.client.Request;
import org.graylog.shaded.opensearch2.org.opensearch.client.RequestOptions;
import org.graylog.shaded.opensearch2.org.opensearch.client.Response;
Expand Down Expand Up @@ -460,6 +462,10 @@ private void executeReindexAsync(MigrationConfiguration migration, URI uri, Stri
postMigrationActions.add(() -> closeLocalIndex(migration, indexName));
}

retrieveIndexBlock(indexName)
.ifPresent(block -> removeLocalBlock(migration, indexName, block));


final BytesReference query = BytesReference.bytes(matchAllQuery().toXContent(builder, ToXContent.EMPTY_PARAMS));
logInfo(migration, "Executing async reindex for " + indexName);
final TaskSubmissionResponse task = client.execute((c, requestOptions) -> {
Expand All @@ -474,11 +480,47 @@ private void executeReindexAsync(MigrationConfiguration migration, URI uri, Stri
postMigrationActions.forEach(Runnable::run);

} catch (Exception e) {
final String message = "Could not reindex index: " + indexName + " - " + e.getMessage();
final String message = "Could not reindex index: " + indexName + " - " + formatErrorMessage(e);
logError(migration, message, e);
}
}

private static String formatErrorMessage(Exception e) {
StringBuilder message = new StringBuilder();
if (e.getMessage() != null) {
message.append(e.getMessage());
}

if (e.getCause() != null && e.getCause().getMessage() != null) {
message.append(" ").append(e.getCause().getMessage());
}
return message.toString();
}

private void removeLocalBlock(MigrationConfiguration migration, String indexName, BlockResponse.IndexBlock block) {
logInfo(migration, "Index " + indexName + " is blocked: " + block.description() + ". Removing the block now.");
final AcknowledgedResponse acknowledgedResponse = client.execute((restHighLevelClient, requestOptions) -> {
final UpdateSettingsRequest settingsRequest = new UpdateSettingsRequest();
settingsRequest.indices(indexName);
settingsRequest.settings(Map.of(
"index.blocks.write", false,
"index.blocks.read_only_allow_delete", false
));
return restHighLevelClient.indices().putSettings(settingsRequest, requestOptions);
});
}

private Optional<BlockResponse.IndexBlock> retrieveIndexBlock(String indexName) {
return client.execute((restHighLevelClient, requestOptions) -> {
final Response blocksResponse = restHighLevelClient.getLowLevelClient().performRequest(new Request("GET", "_cluster/state/blocks/"));
try (final InputStream is = blocksResponse.getEntity().getContent()) {
final BlockResponse indexBlocks = objectMapper.readValue(is, BlockResponse.class);
return indexBlocks.blocks().forIndex(indexName)
.filter(b -> b.levels().contains(BlockResponse.BlockLevel.write));
}
});
}

private void closeLocalIndex(MigrationConfiguration migration, String indexName) {
logInfo(migration, "Target index " + indexName + " is being closed after index migration");
client.execute((restHighLevelClient, requestOptions) -> restHighLevelClient.indices().close(new CloseIndexRequest(indexName), requestOptions));
Expand Down

0 comments on commit 88a0747

Please sign in to comment.