diff --git a/changelog/unreleased/pr-20313.toml b/changelog/unreleased/pr-20313.toml new file mode 100644 index 000000000000..de839dbf562a --- /dev/null +++ b/changelog/unreleased/pr-20313.toml @@ -0,0 +1,5 @@ +type="f" +message="Automatically remove index write blocks during remote reindex migration" + +issues=["graylog-plugin-enterprise#8374"] +pulls=["20313"] diff --git a/full-backend-tests/src/test/java/org/graylog/datanode/RemoteReindexingMigrationIT.java b/full-backend-tests/src/test/java/org/graylog/datanode/RemoteReindexingMigrationIT.java index 45ec0aef8b26..1e5835a9cbaf 100644 --- a/full-backend-tests/src/test/java/org/graylog/datanode/RemoteReindexingMigrationIT.java +++ b/full-backend-tests/src/test/java/org/graylog/datanode/RemoteReindexingMigrationIT.java @@ -83,6 +83,9 @@ void testRemoteAsyncReindexing() throws ExecutionException, RetryException { closeSourceIndex(indexName); createTargetIndex(indexName, true); + createTargetIndex(indexName2, false); + blockTargetIndex(indexName2); + Assertions.assertThat(getTargetIndexState(indexName)) .isEqualTo(IndexState.CLOSE); @@ -119,6 +122,10 @@ void testRemoteAsyncReindexing() throws ExecutionException, RetryException { } + private void blockTargetIndex(String indexName) { + apis.backend().searchServerInstance().client().setIndexBlock(indexName); + } + private void openTargetIndex(String indexName) { apis.backend().searchServerInstance().client().openIndex(indexName); } diff --git a/graylog-storage-opensearch2/src/main/java/org/graylog/storage/opensearch2/BlockResponse.java b/graylog-storage-opensearch2/src/main/java/org/graylog/storage/opensearch2/BlockResponse.java new file mode 100644 index 000000000000..0f5697eeda07 --- /dev/null +++ b/graylog-storage-opensearch2/src/main/java/org/graylog/storage/opensearch2/BlockResponse.java @@ -0,0 +1,52 @@ +/* + * Copyright (C) 2020 Graylog, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + */ +package org.graylog.storage.opensearch2; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.List; +import java.util.Map; +import java.util.Optional; + +@JsonIgnoreProperties(ignoreUnknown = true) +public record BlockResponse(@JsonProperty("blocks") Blocks blocks) { + + /** + * This is some internal format structure. IDK what the integer number in the map means. For now + * we just ignore it and skip directly into the content. + * @param indices + */ + @JsonIgnoreProperties(ignoreUnknown = true) + record Blocks(@JsonProperty("indices") Map> indices) { + public Optional forIndex(String indexName) { + return Optional.ofNullable(indices()) + .stream() + .flatMap(indices -> indices.entrySet().stream()) + .filter(entry -> entry.getKey().equals(indexName)) + .map(e -> e.getValue().values().iterator().next()) + .findFirst(); + } + } + + @JsonIgnoreProperties(ignoreUnknown = true) + record IndexBlock(String description, boolean retryable, List levels) {} + + enum BlockLevel { + read, write, metadata_write + } +} diff --git a/graylog-storage-opensearch2/src/main/java/org/graylog/storage/opensearch2/RemoteReindexingMigrationAdapterOS2.java b/graylog-storage-opensearch2/src/main/java/org/graylog/storage/opensearch2/RemoteReindexingMigrationAdapterOS2.java index ed493e85902a..5331071b7930 100644 --- a/graylog-storage-opensearch2/src/main/java/org/graylog/storage/opensearch2/RemoteReindexingMigrationAdapterOS2.java +++ b/graylog-storage-opensearch2/src/main/java/org/graylog/storage/opensearch2/RemoteReindexingMigrationAdapterOS2.java @@ -36,6 +36,8 @@ import org.graylog.shaded.opensearch2.org.opensearch.action.admin.cluster.settings.ClusterGetSettingsRequest; import org.graylog.shaded.opensearch2.org.opensearch.action.admin.cluster.settings.ClusterGetSettingsResponse; import org.graylog.shaded.opensearch2.org.opensearch.action.admin.indices.open.OpenIndexRequest; +import org.graylog.shaded.opensearch2.org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest; +import org.graylog.shaded.opensearch2.org.opensearch.action.support.master.AcknowledgedResponse; import org.graylog.shaded.opensearch2.org.opensearch.client.Request; import org.graylog.shaded.opensearch2.org.opensearch.client.RequestOptions; import org.graylog.shaded.opensearch2.org.opensearch.client.Response; @@ -460,6 +462,10 @@ private void executeReindexAsync(MigrationConfiguration migration, URI uri, Stri postMigrationActions.add(() -> closeLocalIndex(migration, indexName)); } + retrieveIndexBlock(indexName) + .ifPresent(block -> removeLocalBlock(migration, indexName, block)); + + final BytesReference query = BytesReference.bytes(matchAllQuery().toXContent(builder, ToXContent.EMPTY_PARAMS)); logInfo(migration, "Executing async reindex for " + indexName); final TaskSubmissionResponse task = client.execute((c, requestOptions) -> { @@ -474,11 +480,47 @@ private void executeReindexAsync(MigrationConfiguration migration, URI uri, Stri postMigrationActions.forEach(Runnable::run); } catch (Exception e) { - final String message = "Could not reindex index: " + indexName + " - " + e.getMessage(); + final String message = "Could not reindex index: " + indexName + " - " + formatErrorMessage(e); logError(migration, message, e); } } + private static String formatErrorMessage(Exception e) { + StringBuilder message = new StringBuilder(); + if (e.getMessage() != null) { + message.append(e.getMessage()); + } + + if (e.getCause() != null && e.getCause().getMessage() != null) { + message.append(" ").append(e.getCause().getMessage()); + } + return message.toString(); + } + + private void removeLocalBlock(MigrationConfiguration migration, String indexName, BlockResponse.IndexBlock block) { + logInfo(migration, "Index " + indexName + " is blocked: " + block.description() + ". Removing the block now."); + final AcknowledgedResponse acknowledgedResponse = client.execute((restHighLevelClient, requestOptions) -> { + final UpdateSettingsRequest settingsRequest = new UpdateSettingsRequest(); + settingsRequest.indices(indexName); + settingsRequest.settings(Map.of( + "index.blocks.write", false, + "index.blocks.read_only_allow_delete", false + )); + return restHighLevelClient.indices().putSettings(settingsRequest, requestOptions); + }); + } + + private Optional retrieveIndexBlock(String indexName) { + return client.execute((restHighLevelClient, requestOptions) -> { + final Response blocksResponse = restHighLevelClient.getLowLevelClient().performRequest(new Request("GET", "_cluster/state/blocks/")); + try (final InputStream is = blocksResponse.getEntity().getContent()) { + final BlockResponse indexBlocks = objectMapper.readValue(is, BlockResponse.class); + return indexBlocks.blocks().forIndex(indexName) + .filter(b -> b.levels().contains(BlockResponse.BlockLevel.write)); + } + }); + } + private void closeLocalIndex(MigrationConfiguration migration, String indexName) { logInfo(migration, "Target index " + indexName + " is being closed after index migration"); client.execute((restHighLevelClient, requestOptions) -> restHighLevelClient.indices().close(new CloseIndexRequest(indexName), requestOptions));