Skip to content

Commit

Permalink
refactoring codec service
Browse files Browse the repository at this point in the history
Signed-off-by: Sarthak Aggarwal <[email protected]>
  • Loading branch information
sarthakaggarwal97 committed Jul 2, 2023
1 parent 3f76e3c commit 5732823
Show file tree
Hide file tree
Showing 15 changed files with 57 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public class CorrelationCodecService extends CodecService {
* @param codecServiceConfig Generic codec service config
*/
public CorrelationCodecService(CodecServiceConfig codecServiceConfig) {
super(codecServiceConfig.getMapperService(), codecServiceConfig.getLogger());
super(codecServiceConfig.getMapperService(), codecServiceConfig.getIndexSettings(), codecServiceConfig.getLogger());
mapperService = codecServiceConfig.getMapperService();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ EngineConfig engineConfigWithLargerIndexingMemory(EngineConfig config) {
.mergePolicy(config.getMergePolicy())
.analyzer(config.getAnalyzer())
.similarity(config.getSimilarity())
.codecService(new CodecService(null, LogManager.getLogger(IndexingMemoryControllerIT.class)))
.codecService(new CodecService(null, indexSettings, LogManager.getLogger(IndexingMemoryControllerIT.class)))
.eventListener(config.getEventListener())
.queryCache(config.getQueryCache())
.queryCachingPolicy(config.getQueryCachingPolicy())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.lucene.codecs.lucene95.Lucene95Codec.Mode;
import org.opensearch.common.Nullable;
import org.opensearch.common.collect.MapBuilder;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.codec.customcodecs.ZstdCodec;
import org.opensearch.index.codec.customcodecs.ZstdNoDictCodec;
import org.opensearch.index.mapper.MapperService;
Expand Down Expand Up @@ -67,15 +68,15 @@ public class CodecService {
public static final String ZSTD_CODEC = "zstd";
public static final String ZSTD_NO_DICT_CODEC = "zstd_no_dict";

public CodecService(@Nullable MapperService mapperService, Logger logger) {
public CodecService(@Nullable MapperService mapperService, IndexSettings indexSettings, Logger logger) {
final MapBuilder<String, Codec> codecs = MapBuilder.<String, Codec>newMapBuilder();
int compressionLevel = indexSettings.getValue(INDEX_CODEC_COMPRESSION_LEVEL_SETTING);
if (mapperService == null) {
codecs.put(DEFAULT_CODEC, new Lucene95Codec());
codecs.put(BEST_COMPRESSION_CODEC, new Lucene95Codec(Mode.BEST_COMPRESSION));
codecs.put(ZSTD_CODEC, new ZstdCodec());
codecs.put(ZSTD_NO_DICT_CODEC, new ZstdNoDictCodec());
codecs.put(ZSTD_CODEC, new ZstdCodec(compressionLevel));
codecs.put(ZSTD_NO_DICT_CODEC, new ZstdNoDictCodec(compressionLevel));
} else {
int compressionLevel = mapperService.getIndexSettings().getValue(INDEX_CODEC_COMPRESSION_LEVEL_SETTING);
codecs.put(DEFAULT_CODEC, new PerFieldMappingPostingFormatCodec(Mode.BEST_SPEED, mapperService, logger));
codecs.put(BEST_COMPRESSION_CODEC, new PerFieldMappingPostingFormatCodec(Mode.BEST_COMPRESSION, mapperService, logger));
codecs.put(ZSTD_CODEC, new ZstdCodec(mapperService, logger, compressionLevel));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ public IndexShard(
assert shardRouting.initializing();
this.shardRouting = shardRouting;
final Settings settings = indexSettings.getSettings();
this.codecService = new CodecService(mapperService, logger);
this.codecService = new CodecService(mapperService, indexSettings, logger);
this.warmer = warmer;
this.similarityService = similarityService;
Objects.requireNonNull(store, "Store must be provided to the index shard");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@
import org.opensearch.common.UUIDs;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.set.Sets;
import org.opensearch.env.Environment;
import org.opensearch.env.ShardLockObtainFailedException;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.codec.CodecService;
import org.opensearch.index.shard.ShardId;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
Expand All @@ -69,6 +71,7 @@
import org.opensearch.snapshots.SnapshotId;
import org.opensearch.snapshots.SnapshotShardSizeInfo;
import org.junit.Before;
import org.opensearch.test.IndexSettingsModule;

import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -803,21 +806,25 @@ public TestAllocator addData(
}

public TestAllocator addData(DiscoveryNode node, String allocationId, boolean primary) {
Settings nodeSettings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir()).build();
IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("test", nodeSettings);
return addData(
node,
allocationId,
primary,
ReplicationCheckpoint.empty(shardId, new CodecService(null, null).codec("default").getName()),
ReplicationCheckpoint.empty(shardId, new CodecService(null, indexSettings, null).codec("default").getName()),
null
);
}

public TestAllocator addData(DiscoveryNode node, String allocationId, boolean primary, @Nullable Exception storeException) {
Settings nodeSettings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir()).build();
IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("test", nodeSettings);
return addData(
node,
allocationId,
primary,
ReplicationCheckpoint.empty(shardId, new CodecService(null, null).codec("default").getName()),
ReplicationCheckpoint.empty(shardId, new CodecService(null, indexSettings, null).codec("default").getName()),
storeException
);
}
Expand Down
24 changes: 9 additions & 15 deletions server/src/test/java/org/opensearch/index/codec/CodecTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import org.apache.lucene.index.SegmentReader;
import org.apache.lucene.store.Directory;
import org.apache.lucene.tests.util.LuceneTestCase.SuppressCodecs;
import org.opensearch.common.Randomness;
import org.opensearch.common.settings.Settings;
import org.opensearch.env.Environment;
import org.opensearch.index.IndexSettings;
Expand Down Expand Up @@ -96,15 +95,15 @@ public void testZstdNoDict() throws Exception {
}

public void testZstdWithCompressionLevel() throws Exception {
int randomCompressionLevel = generateRandomNumber(6, 1);
int randomCompressionLevel = randomIntBetween(6, 1);
Codec codec = createCodecService(randomCompressionLevel).codec("zstd");
assertStoredFieldsCompressionEquals(Lucene95CustomCodec.Mode.ZSTD, codec);
Lucene95CustomStoredFieldsFormat storedFieldsFormat = (Lucene95CustomStoredFieldsFormat) codec.storedFieldsFormat();
assertEquals(randomCompressionLevel, storedFieldsFormat.getCompressionLevel());
}

public void testZstdNoDictWithCompressionLevel() throws Exception {
int randomCompressionLevel = generateRandomNumber(6, 1);
int randomCompressionLevel = randomIntBetween(6, 1);
Codec codec = createCodecService(randomCompressionLevel).codec("zstd_no_dict");
assertStoredFieldsCompressionEquals(Lucene95CustomCodec.Mode.ZSTD_NO_DICT, codec);
Lucene95CustomStoredFieldsFormat storedFieldsFormat = (Lucene95CustomStoredFieldsFormat) codec.storedFieldsFormat();
Expand Down Expand Up @@ -155,11 +154,10 @@ private void assertStoredFieldsCompressionEquals(Lucene95CustomCodec.Mode expect
}

private CodecService createCodecService(boolean isMapperServiceNull) throws IOException {

Settings nodeSettings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir()).build();
if (isMapperServiceNull) {
return new CodecService(null, LogManager.getLogger("test"));
return new CodecService(null, IndexSettingsModule.newIndexSettings("_na", nodeSettings), LogManager.getLogger("test"));
}
Settings nodeSettings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir()).build();
return buildCodecService(nodeSettings);
}

Expand All @@ -173,12 +171,12 @@ private CodecService createCodecService(int randomCompressionLevel) throws IOExc

private CodecService buildCodecService(Settings nodeSettings) throws IOException {

IndexSettings settings = IndexSettingsModule.newIndexSettings("_na", nodeSettings);
SimilarityService similarityService = new SimilarityService(settings, null, Collections.emptyMap());
IndexAnalyzers indexAnalyzers = createTestAnalysis(settings, nodeSettings).indexAnalyzers;
IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("_na", nodeSettings);
SimilarityService similarityService = new SimilarityService(indexSettings, null, Collections.emptyMap());
IndexAnalyzers indexAnalyzers = createTestAnalysis(indexSettings, nodeSettings).indexAnalyzers;
MapperRegistry mapperRegistry = new MapperRegistry(Collections.emptyMap(), Collections.emptyMap(), MapperPlugin.NOOP_FIELD_FILTER);
MapperService service = new MapperService(
settings,
indexSettings,
indexAnalyzers,
xContentRegistry(),
similarityService,
Expand All @@ -187,7 +185,7 @@ private CodecService buildCodecService(Settings nodeSettings) throws IOException
() -> false,
null
);
return new CodecService(service, LogManager.getLogger("test"));
return new CodecService(service, indexSettings, LogManager.getLogger("test"));
}

private SegmentReader getSegmentReader(Codec codec) throws IOException {
Expand All @@ -205,8 +203,4 @@ private SegmentReader getSegmentReader(Codec codec) throws IOException {
return sr;
}

private int generateRandomNumber(int max, int min) {
return Randomness.get().nextInt(max - min + 1) + min;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

package org.opensearch.index.codec.customcodecs;

import org.opensearch.common.Randomness;
import org.opensearch.test.OpenSearchTestCase;

public class Lucene95CustomStoredFieldsFormatTests extends OpenSearchTestCase {
Expand All @@ -26,7 +25,7 @@ public void testZstdNoDictLucene95CustomCodecMode() {
}

public void testZstdModeWithCompressionLevel() {
int randomCompressionLevel = generateRandomNumber(6, 1);
int randomCompressionLevel = randomIntBetween(6, 1);
Lucene95CustomStoredFieldsFormat lucene95CustomStoredFieldsFormat = new Lucene95CustomStoredFieldsFormat(
Lucene95CustomCodec.Mode.ZSTD,
randomCompressionLevel
Expand All @@ -36,7 +35,7 @@ public void testZstdModeWithCompressionLevel() {
}

public void testZstdNoDictLucene95CustomCodecModeWithCompressionLevel() {
int randomCompressionLevel = generateRandomNumber(6, 1);
int randomCompressionLevel = randomIntBetween(6, 1);
Lucene95CustomStoredFieldsFormat lucene95CustomStoredFieldsFormat = new Lucene95CustomStoredFieldsFormat(
Lucene95CustomCodec.Mode.ZSTD_NO_DICT,
randomCompressionLevel
Expand All @@ -45,8 +44,4 @@ public void testZstdNoDictLucene95CustomCodecModeWithCompressionLevel() {
assertEquals(randomCompressionLevel, lucene95CustomStoredFieldsFormat.getCompressionLevel());
}

private int generateRandomNumber(int max, int min) {
return Randomness.get().nextInt(max - min + 1) + min;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ public Optional<EngineFactory> getEngineFactory(final IndexSettings indexSetting

@Override
public Optional<CodecService> getCustomCodecService(IndexSettings indexSettings) {
return Optional.of(new CodecService(null, LogManager.getLogger(getClass())));
return Optional.of(new CodecService(null, indexSettings, LogManager.getLogger(getClass())));
}

@Override
Expand All @@ -195,7 +195,7 @@ public Optional<EngineFactory> getEngineFactory(final IndexSettings indexSetting

@Override
public Optional<CodecService> getCustomCodecService(IndexSettings indexSettings) {
return Optional.of(new CodecService(null, LogManager.getLogger(getClass())));
return Optional.of(new CodecService(null, indexSettings, LogManager.getLogger(getClass())));
}
}

Expand All @@ -207,7 +207,9 @@ public Optional<EngineFactory> getEngineFactory(final IndexSettings indexSetting

@Override
public Optional<CodecServiceFactory> getCustomCodecServiceFactory(IndexSettings indexSettings) {
return Optional.of(config -> new CodecService(config.getMapperService(), LogManager.getLogger(getClass())));
return Optional.of(
config -> new CodecService(config.getMapperService(), config.getIndexSettings(), LogManager.getLogger(getClass()))
);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3226,7 +3226,7 @@ public void testFailStart() throws IOException {
}

public void testSettings() {
CodecService codecService = new CodecService(null, logger);
CodecService codecService = new CodecService(null, engine.config().getIndexSettings(), logger);
LiveIndexWriterConfig currentIndexWriterConfig = engine.getCurrentIndexWriterConfig();

assertEquals(engine.config().getCodec().getName(), codecService.codec(codecName).getName());
Expand Down Expand Up @@ -3696,7 +3696,7 @@ public void testRecoverFromForeignTranslog() throws IOException {
.mergePolicy(newMergePolicy())
.analyzer(config.getAnalyzer())
.similarity(config.getSimilarity())
.codecService(new CodecService(null, logger))
.codecService(new CodecService(null, config.getIndexSettings(), logger))
.eventListener(config.getEventListener())
.queryCache(IndexSearcher.getDefaultQueryCache())
.queryCachingPolicy(IndexSearcher.getDefaultQueryCachingPolicy())
Expand Down Expand Up @@ -3738,7 +3738,7 @@ public CustomTranslogDeletionPolicy(IndexSettings indexSettings, Supplier<Retent
.mergePolicy(config.getMergePolicy())
.analyzer(config.getAnalyzer())
.similarity(config.getSimilarity())
.codecService(new CodecService(null, logger))
.codecService(new CodecService(null, config.getIndexSettings(), logger))
.eventListener(config.getEventListener())
.queryCache(config.getQueryCache())
.queryCachingPolicy(config.getQueryCachingPolicy())
Expand Down Expand Up @@ -7384,7 +7384,7 @@ public void testNotWarmUpSearcherInEngineCtor() throws Exception {
.mergePolicy(config.getMergePolicy())
.analyzer(config.getAnalyzer())
.similarity(config.getSimilarity())
.codecService(new CodecService(null, logger))
.codecService(new CodecService(null, config.getIndexSettings(), logger))
.eventListener(config.getEventListener())
.queryCache(config.getQueryCache())
.queryCachingPolicy(config.getQueryCachingPolicy())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4865,7 +4865,7 @@ public void testCloseShardWhileEngineIsWarming() throws Exception {
.mergePolicy(config.getMergePolicy())
.analyzer(config.getAnalyzer())
.similarity(config.getSimilarity())
.codecService(new CodecService(null, logger))
.codecService(new CodecService(null, config.getIndexSettings(), logger))
.eventListener(config.getEventListener())
.queryCache(config.getQueryCache())
.queryCachingPolicy(config.getQueryCachingPolicy())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ public void onFailedEngine(String reason, @Nullable Exception e) {
.mergePolicy(newMergePolicy())
.analyzer(iwc.getAnalyzer())
.similarity(iwc.getSimilarity())
.codecService(new CodecService(null, logger))
.codecService(new CodecService(null, indexSettings, logger))
.eventListener(eventListener)
.queryCache(IndexSearcher.getDefaultQueryCache())
.queryCachingPolicy(IndexSearcher.getDefaultQueryCachingPolicy())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ EngineConfig configWithRefreshListener(EngineConfig config, ReferenceManager.Ref
.mergePolicy(config.getMergePolicy())
.analyzer(config.getAnalyzer())
.similarity(config.getSimilarity())
.codecService(new CodecService(null, logger))
.codecService(new CodecService(null, config.getIndexSettings(), logger))
.eventListener(config.getEventListener())
.queryCache(config.getQueryCache())
.queryCachingPolicy(config.getQueryCachingPolicy())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public void setUp() throws Exception {

ShardId testShardId = primary.shardId();

CodecService codecService = new CodecService(null, null);
CodecService codecService = new CodecService(null, getEngine(primary).config().getIndexSettings(), null);
String defaultCodecName = codecService.codec(CodecService.DEFAULT_CODEC).getName();

// This mirrors the creation of the ReplicationCheckpoint inside CopyState
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,16 @@
import org.apache.lucene.util.Version;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.concurrent.GatedCloseable;
import org.opensearch.common.settings.Settings;
import org.opensearch.env.Environment;
import org.opensearch.index.codec.CodecService;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexShardTestCase;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.test.IndexSettingsModule;

import java.io.IOException;
import java.util.Map;
Expand Down Expand Up @@ -52,7 +55,10 @@ public class CopyStateTests extends IndexShardTestCase {
public void testCopyStateCreation() throws IOException {
final IndexShard mockIndexShard = createMockIndexShard();
CopyState copyState = new CopyState(
ReplicationCheckpoint.empty(mockIndexShard.shardId(), new CodecService(null, null).codec("default").getName()),
ReplicationCheckpoint.empty(
mockIndexShard.shardId(),
new CodecService(null, mockIndexShard.indexSettings(), null).codec("default").getName()
),
mockIndexShard
);
ReplicationCheckpoint checkpoint = copyState.getCheckpoint();
Expand All @@ -71,6 +77,9 @@ public static IndexShard createMockIndexShard() throws IOException {
Store mockStore = mock(Store.class);
when(mockShard.store()).thenReturn(mockStore);

Settings nodeSettings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir()).build();
when(mockShard.indexSettings()).thenReturn(IndexSettingsModule.newIndexSettings("_na", nodeSettings));

SegmentInfos testSegmentInfos = new SegmentInfos(Version.LATEST.major);
ReplicationCheckpoint testCheckpoint = new ReplicationCheckpoint(
mockShard.shardId(),
Expand Down
Loading

0 comments on commit 5732823

Please sign in to comment.