From 4073b87f706456fd07b4d61bdbdfb9c1d3fd12c4 Mon Sep 17 00:00:00 2001 From: Pravin Bhat Date: Tue, 3 Sep 2024 16:53:39 -0400 Subject: [PATCH] Implemented customTTL feature (#295) --- RELEASE.md | 3 +++ .../datastax/cdm/feature/WritetimeTTL.java | 24 +++++++++++++++---- .../cdm/properties/KnownProperties.java | 3 +++ src/resources/cdm-detailed.properties | 10 ++++++++ .../cdm/feature/TTLAndWritetimeTest.java | 9 +++++++ 5 files changed, 45 insertions(+), 4 deletions(-) diff --git a/RELEASE.md b/RELEASE.md index 500664cd..7dcf8aaf 100644 --- a/RELEASE.md +++ b/RELEASE.md @@ -1,4 +1,7 @@ # Release Notes +## [4.3.7] - 2024-09-03 +- Added property `spark.cdm.transform.custom.ttl` to allow a custom constant value to be set for TTL instead of using the values from `origin` rows. + ## [4.3.6] - 2024-08-29 - Added `overwrite` option to conditionally check or skip `Validation` when it has a non-null value in `target` for the `spark.cdm.feature.extractJson` feature. diff --git a/src/main/java/com/datastax/cdm/feature/WritetimeTTL.java b/src/main/java/com/datastax/cdm/feature/WritetimeTTL.java index f8b0efa4..78a94a8f 100644 --- a/src/main/java/com/datastax/cdm/feature/WritetimeTTL.java +++ b/src/main/java/com/datastax/cdm/feature/WritetimeTTL.java @@ -37,6 +37,7 @@ public class WritetimeTTL extends AbstractFeature { private List writetimeNames; private boolean autoWritetimeNames; private Long customWritetime = 0L; + private Long customTTL = 0L; private List ttlSelectColumnIndexes = null; private List writetimeSelectColumnIndexes = null; private Long filterMin; @@ -67,6 +68,11 @@ public boolean loadProperties(IPropertyHelper propertyHelper) { this.writetimeIncrement = propertyHelper.getLong(KnownProperties.TRANSFORM_CUSTOM_WRITETIME_INCREMENT); + this.customTTL = getCustomTTL(propertyHelper); + if (this.customTTL > 0) { + logger.info("PARAM -- {} is set to TTL {} ", KnownProperties.TRANSFORM_CUSTOM_TTL, customTTL); + } + this.filterMin = getMinFilter(propertyHelper); 
this.filterMax = getMaxFilter(propertyHelper); this.hasWriteTimestampFilter = (null != filterMin && null != filterMax && filterMin > 0 && filterMax > 0 && filterMax > filterMin); @@ -81,7 +87,7 @@ public boolean loadProperties(IPropertyHelper propertyHelper) { ((null != ttlNames && !ttlNames.isEmpty()) || (null != writetimeNames && !writetimeNames.isEmpty()) || autoTTLNames || autoWritetimeNames - || customWritetime > 0); + || customWritetime > 0 || customTTL > 0); isLoaded = true; return isValid; @@ -167,11 +173,16 @@ public static List getWritetimeNames(IPropertyHelper propertyHelper) { return CqlTable.unFormatNames(propertyHelper.getStringList(KnownProperties.ORIGIN_WRITETIME_NAMES)); } - public static Long getCustomWritetime(IPropertyHelper propertyHelper) { + protected static Long getCustomWritetime(IPropertyHelper propertyHelper) { Long cwt = propertyHelper.getLong(KnownProperties.TRANSFORM_CUSTOM_WRITETIME); return null==cwt ? 0L : cwt; } + protected static Long getCustomTTL(IPropertyHelper propertyHelper) { + Long cttl = propertyHelper.getLong(KnownProperties.TRANSFORM_CUSTOM_TTL); + return null == cttl ? 0L : cttl; + } + public static Long getMinFilter(IPropertyHelper propertyHelper) { return propertyHelper.getLong(KnownProperties.FILTER_WRITETS_MIN); } @@ -181,11 +192,15 @@ public static Long getMaxFilter(IPropertyHelper propertyHelper) { } public Long getCustomWritetime() { return customWritetime; } + public Long getCustomTTL() { return customTTL; } public boolean hasWriteTimestampFilter() { return isEnabled && hasWriteTimestampFilter; } public Long getMinWriteTimeStampFilter() { return (this.hasWriteTimestampFilter && null!=this.filterMin) ? this.filterMin : Long.MIN_VALUE; } public Long getMaxWriteTimeStampFilter() { return (this.hasWriteTimestampFilter && null!=this.filterMax) ? 
this.filterMax : Long.MAX_VALUE; } - public boolean hasTTLColumns() { return null!=this.ttlSelectColumnIndexes && !this.ttlSelectColumnIndexes.isEmpty(); } + public boolean hasTTLColumns() { + return customTTL > 0 || null != this.ttlSelectColumnIndexes && !this.ttlSelectColumnIndexes.isEmpty(); + } + public boolean hasWritetimeColumns() { return customWritetime>0 || null!=this.writetimeSelectColumnIndexes && !this.writetimeSelectColumnIndexes.isEmpty(); } public Long getLargestWriteTimeStamp(Row row) { @@ -200,7 +215,8 @@ public Long getLargestWriteTimeStamp(Row row) { } public Integer getLargestTTL(Row row) { - if (logDebug) logger.debug("getLargestTTL: ttlSelectColumnIndexes={}", ttlSelectColumnIndexes); + if (logDebug) logger.debug("getLargestTTL: customTTL={}, ttlSelectColumnIndexes={}", customTTL, ttlSelectColumnIndexes); + if (this.customTTL > 0) return this.customTTL.intValue(); if (null==this.ttlSelectColumnIndexes || this.ttlSelectColumnIndexes.isEmpty()) return null; OptionalInt max = this.ttlSelectColumnIndexes.stream() .mapToInt(row::getInt) diff --git a/src/main/java/com/datastax/cdm/properties/KnownProperties.java b/src/main/java/com/datastax/cdm/properties/KnownProperties.java index 5741cf80..a1a743d7 100644 --- a/src/main/java/com/datastax/cdm/properties/KnownProperties.java +++ b/src/main/java/com/datastax/cdm/properties/KnownProperties.java @@ -161,6 +161,7 @@ public enum PropertyType { public static final String TRANSFORM_REPLACE_MISSING_TS = "spark.cdm.transform.missing.key.ts.replace.value"; public static final String TRANSFORM_CUSTOM_WRITETIME = "spark.cdm.transform.custom.writetime"; public static final String TRANSFORM_CUSTOM_WRITETIME_INCREMENT = "spark.cdm.transform.custom.writetime.incrementBy"; + public static final String TRANSFORM_CUSTOM_TTL = "spark.cdm.transform.custom.ttl"; public static final String TRANSFORM_CODECS = "spark.cdm.transform.codecs"; public static final String TRANSFORM_CODECS_TIMESTAMP_STRING_FORMAT = 
"spark.cdm.transform.codecs.timestamp.string.format"; public static final String TRANSFORM_CODECS_TIMESTAMP_STRING_FORMAT_ZONE = "spark.cdm.transform.codecs.timestamp.string.zone"; @@ -170,6 +171,8 @@ public enum PropertyType { types.put(TRANSFORM_REPLACE_MISSING_TS, PropertyType.NUMBER); types.put(TRANSFORM_CUSTOM_WRITETIME, PropertyType.NUMBER); defaults.put(TRANSFORM_CUSTOM_WRITETIME, "0"); + types.put(TRANSFORM_CUSTOM_TTL, PropertyType.NUMBER); + defaults.put(TRANSFORM_CUSTOM_TTL, "0"); types.put(TRANSFORM_CUSTOM_WRITETIME_INCREMENT, PropertyType.NUMBER); defaults.put(TRANSFORM_CUSTOM_WRITETIME_INCREMENT, "0"); types.put(TRANSFORM_CODECS, PropertyType.STRING_LIST); diff --git a/src/resources/cdm-detailed.properties b/src/resources/cdm-detailed.properties index 77b98f9b..74208bb0 100644 --- a/src/resources/cdm-detailed.properties +++ b/src/resources/cdm-detailed.properties @@ -74,6 +74,9 @@ spark.cdm.connect.target.password cassandra # all origin columns that can have TTL set (which excludes partition key, # clustering key, collections and UDTs). When false, and .names is not set, the target # record will have the TTL determined by the target table configuration. +# +# *** Note spark.cdm.transform.custom.ttl overrides this setting *** +# # .names : Default is empty, meaning they will be determined automatically if that is set # (see above). Specify a subset of eligible columns that are used to calculate # the TTL of the target record. @@ -247,6 +250,12 @@ spark.cdm.perfops.ratelimit.target 20000 # and subsequent UPSERTs would add duplicates to the list. Future versions # of CDM may tombstone the previous list, but for now this solution is # viable and, crucially, more performant. +# .ttl Default is 0 (no expiry). Time-to-live value in seconds to use as the +# TTL for the target record. This is useful when the TTL of the record in +# Origin cannot be determined (such as the only non-key columns are +# collections) OR if a new TTL needs to be set during migration. 
This +# parameter allows a crude constant value to be used in its place, and +# overrides .schema.origin.column.ttl.names # .codecs Default is empty. A comma-separated list of additional codecs to # enable. Current codecs are: # INT_STRING : int stored in a String @@ -273,6 +282,7 @@ spark.cdm.perfops.ratelimit.target 20000 #spark.cdm.transform.missing.key.ts.replace.value 1685577600000 #spark.cdm.transform.custom.writetime 0 #spark.cdm.transform.custom.writetime.incrementBy 0 +#spark.cdm.transform.custom.ttl 0 #spark.cdm.transform.codecs #spark.cdm.transform.codecs.timestamp.string.format yyyyMMddHHmmss #spark.cdm.transform.codecs.timestamp.string.zone UTC diff --git a/src/test/java/com/datastax/cdm/feature/TTLAndWritetimeTest.java b/src/test/java/com/datastax/cdm/feature/TTLAndWritetimeTest.java index c9e34c52..c94ac38a 100644 --- a/src/test/java/com/datastax/cdm/feature/TTLAndWritetimeTest.java +++ b/src/test/java/com/datastax/cdm/feature/TTLAndWritetimeTest.java @@ -34,6 +34,7 @@ public class TTLAndWritetimeTest extends CommonMocks { WritetimeTTL feature; Long customWritetime = 123456789L; + Long customTTL = 1000L; Long filterMin = 100000000L; Long filterMax = 200000000L; String writetimeColumnName = "writetime_col"; @@ -69,6 +70,7 @@ private void setTestWhens(){ String argument = invocation.getArgument(0); return originValueColumns.contains(argument); }); + when(propertyHelper.getLong(KnownProperties.TRANSFORM_CUSTOM_TTL)).thenReturn(customTTL); } @@ -83,6 +85,7 @@ public void smoke_loadProperties() { assertAll( () -> assertTrue(feature.isEnabled(), "enabled"), () -> assertEquals(customWritetime, feature.getCustomWritetime(), "customWritetime"), + () -> assertEquals(customTTL, feature.getCustomTTL(), "customTTL"), () -> assertTrue(feature.hasWriteTimestampFilter(), "hasWriteTimestampFilter"), () -> assertTrue(feature.hasWritetimeColumns(), "hasWritetimeColumns with custom writetime"), () -> assertEquals(customWritetime, feature.getCustomWritetime(), 
"customWritetime is set"), @@ -113,6 +116,7 @@ public void smokeTest_disabledFeature() { when(propertyHelper.getLong(KnownProperties.FILTER_WRITETS_MAX)).thenReturn(null); when(propertyHelper.getStringList(KnownProperties.ORIGIN_TTL_NAMES)).thenReturn(null); when(propertyHelper.getLong(KnownProperties.TRANSFORM_CUSTOM_WRITETIME)).thenReturn(null); + when(propertyHelper.getLong(KnownProperties.TRANSFORM_CUSTOM_TTL)).thenReturn(null); when(propertyHelper.getStringList(KnownProperties.ORIGIN_WRITETIME_NAMES)).thenReturn(null); when(propertyHelper.getBoolean(KnownProperties.ORIGIN_WRITETIME_AUTO)).thenReturn(Boolean.FALSE); when(propertyHelper.getBoolean(KnownProperties.ORIGIN_TTL_AUTO)).thenReturn(Boolean.FALSE); @@ -135,6 +139,7 @@ public void smokeTest_enabledFeature_withOnlyWritetimeAuto() { when(propertyHelper.getLong(KnownProperties.FILTER_WRITETS_MAX)).thenReturn(null); when(propertyHelper.getStringList(KnownProperties.ORIGIN_TTL_NAMES)).thenReturn(null); when(propertyHelper.getLong(KnownProperties.TRANSFORM_CUSTOM_WRITETIME)).thenReturn(null); + when(propertyHelper.getLong(KnownProperties.TRANSFORM_CUSTOM_TTL)).thenReturn(null); when(propertyHelper.getStringList(KnownProperties.ORIGIN_WRITETIME_NAMES)).thenReturn(null); when(propertyHelper.getBoolean(KnownProperties.ORIGIN_WRITETIME_AUTO)).thenReturn(Boolean.TRUE); when(propertyHelper.getBoolean(KnownProperties.ORIGIN_TTL_AUTO)).thenReturn(Boolean.FALSE); @@ -174,6 +179,7 @@ public void smokeTest_enabledFeature_withOnlyTTLAuto() { @Test public void smoke_writetimeWithoutTTL() { when(propertyHelper.getLong(KnownProperties.TRANSFORM_CUSTOM_WRITETIME)).thenReturn(0L); + when(propertyHelper.getLong(KnownProperties.TRANSFORM_CUSTOM_TTL)).thenReturn(null); when(propertyHelper.getStringList(KnownProperties.ORIGIN_TTL_NAMES)).thenReturn(null); when(propertyHelper.getLong(KnownProperties.FILTER_WRITETS_MIN)).thenReturn(filterMin); when(propertyHelper.getLong(KnownProperties.FILTER_WRITETS_MAX)).thenReturn(filterMax); @@ 
-225,6 +231,7 @@ public void smoke_ttlWithoutWritetime_withCustomWritetime() { @Test public void smoke_autoWritetime_noCustomWritetime() { when(propertyHelper.getLong(KnownProperties.TRANSFORM_CUSTOM_WRITETIME)).thenReturn(0L); + when(propertyHelper.getLong(KnownProperties.TRANSFORM_CUSTOM_TTL)).thenReturn(null); when(propertyHelper.getStringList(KnownProperties.ORIGIN_WRITETIME_NAMES)).thenReturn(null); when(propertyHelper.getBoolean(KnownProperties.ORIGIN_WRITETIME_AUTO)).thenReturn(true); when(propertyHelper.getLong(KnownProperties.FILTER_WRITETS_MIN)).thenReturn(null); @@ -244,6 +251,7 @@ public void smoke_autoWritetime_noCustomWritetime() { @Test public void smoke_autoWritetime_CustomWritetime() { when(propertyHelper.getLong(KnownProperties.TRANSFORM_CUSTOM_WRITETIME)).thenReturn(100L); + when(propertyHelper.getLong(KnownProperties.TRANSFORM_CUSTOM_TTL)).thenReturn(null); when(propertyHelper.getStringList(KnownProperties.ORIGIN_WRITETIME_NAMES)).thenReturn(null); when(propertyHelper.getBoolean(KnownProperties.ORIGIN_WRITETIME_AUTO)).thenReturn(true); when(propertyHelper.getLong(KnownProperties.FILTER_WRITETS_MIN)).thenReturn(null); @@ -373,6 +381,7 @@ public void getLargestWriteTimeStampWithCustomTimeTest() { @Test public void getLargestTTLTest() { + when(propertyHelper.getLong(KnownProperties.TRANSFORM_CUSTOM_TTL)).thenReturn(null); when(originTable.indexOf("TTL("+ttlColumnName+")")).thenReturn(100); when(originRow.getInt(eq(100))).thenReturn(30); when(originTable.indexOf("TTL("+writetimeTTLColumnName+")")).thenReturn(101);