Skip to content

Commit

Permalink
Implemented customTTL feature (#295)
Browse files Browse the repository at this point in the history
  • Loading branch information
pravinbhat authored Sep 3, 2024
1 parent 2f04303 commit 4073b87
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 4 deletions.
3 changes: 3 additions & 0 deletions RELEASE.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
# Release Notes
## [4.3.7] - 2024-09-03
- Added property `spark.cdm.transform.custom.ttl` to allow a custom constant value to be set for TTL instead of using the values from `origin` rows.

## [4.3.6] - 2024-08-29
- Added `overwrite` option to conditionally check or skip `Validation` when it has a non-null value in `target` for the `spark.cdm.feature.extractJson` feature.

Expand Down
24 changes: 20 additions & 4 deletions src/main/java/com/datastax/cdm/feature/WritetimeTTL.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public class WritetimeTTL extends AbstractFeature {
private List<String> writetimeNames;
private boolean autoWritetimeNames;
private Long customWritetime = 0L;
private Long customTTL = 0L;
private List<Integer> ttlSelectColumnIndexes = null;
private List<Integer> writetimeSelectColumnIndexes = null;
private Long filterMin;
Expand Down Expand Up @@ -67,6 +68,11 @@ public boolean loadProperties(IPropertyHelper propertyHelper) {

this.writetimeIncrement = propertyHelper.getLong(KnownProperties.TRANSFORM_CUSTOM_WRITETIME_INCREMENT);

this.customTTL = getCustomTTL(propertyHelper);
if (this.customTTL > 0) {
logger.info("PARAM -- {} is set to TTL {} ", KnownProperties.TRANSFORM_CUSTOM_TTL, customTTL);
}

this.filterMin = getMinFilter(propertyHelper);
this.filterMax = getMaxFilter(propertyHelper);
this.hasWriteTimestampFilter = (null != filterMin && null != filterMax && filterMin > 0 && filterMax > 0 && filterMax > filterMin);
Expand All @@ -81,7 +87,7 @@ public boolean loadProperties(IPropertyHelper propertyHelper) {
((null != ttlNames && !ttlNames.isEmpty())
|| (null != writetimeNames && !writetimeNames.isEmpty())
|| autoTTLNames || autoWritetimeNames
|| customWritetime > 0);
|| customWritetime > 0 || customTTL > 0);

isLoaded = true;
return isValid;
Expand Down Expand Up @@ -167,11 +173,16 @@ public static List<String> getWritetimeNames(IPropertyHelper propertyHelper) {
return CqlTable.unFormatNames(propertyHelper.getStringList(KnownProperties.ORIGIN_WRITETIME_NAMES));
}

public static Long getCustomWritetime(IPropertyHelper propertyHelper) {
protected static Long getCustomWritetime(IPropertyHelper propertyHelper) {
Long cwt = propertyHelper.getLong(KnownProperties.TRANSFORM_CUSTOM_WRITETIME);
return null==cwt ? 0L : cwt;
}

protected static Long getCustomTTL(IPropertyHelper propertyHelper) {
Long cttl = propertyHelper.getLong(KnownProperties.TRANSFORM_CUSTOM_TTL);
return null == cttl ? 0L : cttl;
}

public static Long getMinFilter(IPropertyHelper propertyHelper) {
return propertyHelper.getLong(KnownProperties.FILTER_WRITETS_MIN);
}
Expand All @@ -181,11 +192,15 @@ public static Long getMaxFilter(IPropertyHelper propertyHelper) {
}

public Long getCustomWritetime() { return customWritetime; }
public Long getCustomTTL() { return customTTL; }
public boolean hasWriteTimestampFilter() { return isEnabled && hasWriteTimestampFilter; }
public Long getMinWriteTimeStampFilter() { return (this.hasWriteTimestampFilter && null!=this.filterMin) ? this.filterMin : Long.MIN_VALUE; }
public Long getMaxWriteTimeStampFilter() { return (this.hasWriteTimestampFilter && null!=this.filterMax) ? this.filterMax : Long.MAX_VALUE; }

public boolean hasTTLColumns() { return null!=this.ttlSelectColumnIndexes && !this.ttlSelectColumnIndexes.isEmpty(); }
public boolean hasTTLColumns() {
return customTTL > 0 || null != this.ttlSelectColumnIndexes && !this.ttlSelectColumnIndexes.isEmpty();
}

public boolean hasWritetimeColumns() { return customWritetime>0 || null!=this.writetimeSelectColumnIndexes && !this.writetimeSelectColumnIndexes.isEmpty(); }

public Long getLargestWriteTimeStamp(Row row) {
Expand All @@ -200,7 +215,8 @@ public Long getLargestWriteTimeStamp(Row row) {
}

public Integer getLargestTTL(Row row) {
if (logDebug) logger.debug("getLargestTTL: ttlSelectColumnIndexes={}", ttlSelectColumnIndexes);
if (logDebug) logger.debug("getLargestTTL: customTTL={}, ttlSelectColumnIndexes={}", customTTL, ttlSelectColumnIndexes);
if (this.customTTL > 0) return this.customTTL.intValue();
if (null==this.ttlSelectColumnIndexes || this.ttlSelectColumnIndexes.isEmpty()) return null;
OptionalInt max = this.ttlSelectColumnIndexes.stream()
.mapToInt(row::getInt)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ public enum PropertyType {
public static final String TRANSFORM_REPLACE_MISSING_TS = "spark.cdm.transform.missing.key.ts.replace.value";
public static final String TRANSFORM_CUSTOM_WRITETIME = "spark.cdm.transform.custom.writetime";
public static final String TRANSFORM_CUSTOM_WRITETIME_INCREMENT = "spark.cdm.transform.custom.writetime.incrementBy";
public static final String TRANSFORM_CUSTOM_TTL = "spark.cdm.transform.custom.ttl";
public static final String TRANSFORM_CODECS = "spark.cdm.transform.codecs";
public static final String TRANSFORM_CODECS_TIMESTAMP_STRING_FORMAT = "spark.cdm.transform.codecs.timestamp.string.format";
public static final String TRANSFORM_CODECS_TIMESTAMP_STRING_FORMAT_ZONE = "spark.cdm.transform.codecs.timestamp.string.zone";
Expand All @@ -170,6 +171,8 @@ public enum PropertyType {
types.put(TRANSFORM_REPLACE_MISSING_TS, PropertyType.NUMBER);
types.put(TRANSFORM_CUSTOM_WRITETIME, PropertyType.NUMBER);
defaults.put(TRANSFORM_CUSTOM_WRITETIME, "0");
types.put(TRANSFORM_CUSTOM_TTL, PropertyType.NUMBER);
defaults.put(TRANSFORM_CUSTOM_TTL, "0");
types.put(TRANSFORM_CUSTOM_WRITETIME_INCREMENT, PropertyType.NUMBER);
defaults.put(TRANSFORM_CUSTOM_WRITETIME_INCREMENT, "0");
types.put(TRANSFORM_CODECS, PropertyType.STRING_LIST);
Expand Down
10 changes: 10 additions & 0 deletions src/resources/cdm-detailed.properties
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ spark.cdm.connect.target.password cassandra
# all origin columns that can have TTL set (which excludes partition key,
# clustering key, collections and UDTs). When false, and .names is not set, the target
# record will have the TTL determined by the target table configuration.
#
# *** Note spark.cdm.transform.custom.ttl overrides this setting ***
#
# .names : Default is empty, meaning they will be determined automatically if that is set
# (see above). Specify a subset of eligible columns that are used to calculate
# the TTL of the target record.
Expand Down Expand Up @@ -247,6 +250,12 @@ spark.cdm.perfops.ratelimit.target 20000
# and subsequent UPSERTs would add duplicates to the list. Future versions
# of CDM may tombstone the previous list, but for now this solution is
# viable and, crucially, more performant.
# .ttl Default is 0 (no expiry). Time-to-live value in seconds to use as the
# TTL for the target record. This is useful when the TTL of the record in
# Origin cannot be determined (such as the only non-key columns are
# collections) OR is a new TTL needs to be set during migration. This
# parameter allows a crude constant value to be used in its place, and
# overrides .schema.origin.column.ttl.names
# .codecs Default is empty. A comma-separated list of additional codecs to
# enable. Current codecs are:
# INT_STRING : int stored in a String
Expand All @@ -273,6 +282,7 @@ spark.cdm.perfops.ratelimit.target 20000
#spark.cdm.transform.missing.key.ts.replace.value 1685577600000
#spark.cdm.transform.custom.writetime 0
#spark.cdm.transform.custom.writetime.incrementBy 0
#spark.cdm.transform.custom.ttl 0
#spark.cdm.transform.codecs
#spark.cdm.transform.codecs.timestamp.string.format yyyyMMddHHmmss
#spark.cdm.transform.codecs.timestamp.string.zone UTC
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public class TTLAndWritetimeTest extends CommonMocks {

WritetimeTTL feature;
Long customWritetime = 123456789L;
Long customTTL = 1000L;
Long filterMin = 100000000L;
Long filterMax = 200000000L;
String writetimeColumnName = "writetime_col";
Expand Down Expand Up @@ -69,6 +70,7 @@ private void setTestWhens(){
String argument = invocation.getArgument(0);
return originValueColumns.contains(argument);
});
when(propertyHelper.getLong(KnownProperties.TRANSFORM_CUSTOM_TTL)).thenReturn(customTTL);
}


Expand All @@ -83,6 +85,7 @@ public void smoke_loadProperties() {
assertAll(
() -> assertTrue(feature.isEnabled(), "enabled"),
() -> assertEquals(customWritetime, feature.getCustomWritetime(), "customWritetime"),
() -> assertEquals(customTTL, feature.getCustomTTL(), "customTTL"),
() -> assertTrue(feature.hasWriteTimestampFilter(), "hasWriteTimestampFilter"),
() -> assertTrue(feature.hasWritetimeColumns(), "hasWritetimeColumns with custom writetime"),
() -> assertEquals(customWritetime, feature.getCustomWritetime(), "customWritetime is set"),
Expand Down Expand Up @@ -113,6 +116,7 @@ public void smokeTest_disabledFeature() {
when(propertyHelper.getLong(KnownProperties.FILTER_WRITETS_MAX)).thenReturn(null);
when(propertyHelper.getStringList(KnownProperties.ORIGIN_TTL_NAMES)).thenReturn(null);
when(propertyHelper.getLong(KnownProperties.TRANSFORM_CUSTOM_WRITETIME)).thenReturn(null);
when(propertyHelper.getLong(KnownProperties.TRANSFORM_CUSTOM_TTL)).thenReturn(null);
when(propertyHelper.getStringList(KnownProperties.ORIGIN_WRITETIME_NAMES)).thenReturn(null);
when(propertyHelper.getBoolean(KnownProperties.ORIGIN_WRITETIME_AUTO)).thenReturn(Boolean.FALSE);
when(propertyHelper.getBoolean(KnownProperties.ORIGIN_TTL_AUTO)).thenReturn(Boolean.FALSE);
Expand All @@ -135,6 +139,7 @@ public void smokeTest_enabledFeature_withOnlyWritetimeAuto() {
when(propertyHelper.getLong(KnownProperties.FILTER_WRITETS_MAX)).thenReturn(null);
when(propertyHelper.getStringList(KnownProperties.ORIGIN_TTL_NAMES)).thenReturn(null);
when(propertyHelper.getLong(KnownProperties.TRANSFORM_CUSTOM_WRITETIME)).thenReturn(null);
when(propertyHelper.getLong(KnownProperties.TRANSFORM_CUSTOM_TTL)).thenReturn(null);
when(propertyHelper.getStringList(KnownProperties.ORIGIN_WRITETIME_NAMES)).thenReturn(null);
when(propertyHelper.getBoolean(KnownProperties.ORIGIN_WRITETIME_AUTO)).thenReturn(Boolean.TRUE);
when(propertyHelper.getBoolean(KnownProperties.ORIGIN_TTL_AUTO)).thenReturn(Boolean.FALSE);
Expand Down Expand Up @@ -174,6 +179,7 @@ public void smokeTest_enabledFeature_withOnlyTTLAuto() {
@Test
public void smoke_writetimeWithoutTTL() {
when(propertyHelper.getLong(KnownProperties.TRANSFORM_CUSTOM_WRITETIME)).thenReturn(0L);
when(propertyHelper.getLong(KnownProperties.TRANSFORM_CUSTOM_TTL)).thenReturn(null);
when(propertyHelper.getStringList(KnownProperties.ORIGIN_TTL_NAMES)).thenReturn(null);
when(propertyHelper.getLong(KnownProperties.FILTER_WRITETS_MIN)).thenReturn(filterMin);
when(propertyHelper.getLong(KnownProperties.FILTER_WRITETS_MAX)).thenReturn(filterMax);
Expand Down Expand Up @@ -225,6 +231,7 @@ public void smoke_ttlWithoutWritetime_withCustomWritetime() {
@Test
public void smoke_autoWritetime_noCustomWritetime() {
when(propertyHelper.getLong(KnownProperties.TRANSFORM_CUSTOM_WRITETIME)).thenReturn(0L);
when(propertyHelper.getLong(KnownProperties.TRANSFORM_CUSTOM_TTL)).thenReturn(null);
when(propertyHelper.getStringList(KnownProperties.ORIGIN_WRITETIME_NAMES)).thenReturn(null);
when(propertyHelper.getBoolean(KnownProperties.ORIGIN_WRITETIME_AUTO)).thenReturn(true);
when(propertyHelper.getLong(KnownProperties.FILTER_WRITETS_MIN)).thenReturn(null);
Expand All @@ -244,6 +251,7 @@ public void smoke_autoWritetime_noCustomWritetime() {
@Test
public void smoke_autoWritetime_CustomWritetime() {
when(propertyHelper.getLong(KnownProperties.TRANSFORM_CUSTOM_WRITETIME)).thenReturn(100L);
when(propertyHelper.getLong(KnownProperties.TRANSFORM_CUSTOM_TTL)).thenReturn(null);
when(propertyHelper.getStringList(KnownProperties.ORIGIN_WRITETIME_NAMES)).thenReturn(null);
when(propertyHelper.getBoolean(KnownProperties.ORIGIN_WRITETIME_AUTO)).thenReturn(true);
when(propertyHelper.getLong(KnownProperties.FILTER_WRITETS_MIN)).thenReturn(null);
Expand Down Expand Up @@ -373,6 +381,7 @@ public void getLargestWriteTimeStampWithCustomTimeTest() {

@Test
public void getLargestTTLTest() {
when(propertyHelper.getLong(KnownProperties.TRANSFORM_CUSTOM_TTL)).thenReturn(null);
when(originTable.indexOf("TTL("+ttlColumnName+")")).thenReturn(100);
when(originRow.getInt(eq(100))).thenReturn(30);
when(originTable.indexOf("TTL("+writetimeTTLColumnName+")")).thenReturn(101);
Expand Down

0 comments on commit 4073b87

Please sign in to comment.