From 2d7d2df26d1faa0eeea78f07cdd273028b22de45 Mon Sep 17 00:00:00 2001 From: Pravin Bhat Date: Tue, 10 Sep 2024 22:06:12 -0400 Subject: [PATCH] Upgraded DiffData to skip corrected-diff on rerun with previous-run-id (#303) * Upgraded DiffData to skip corrected-diff on rerun with previous-run-id * Updated SIT tests to include error count in logs * Fixed regression tests. * Fixed many more integration tests --- RELEASE.md | 4 ++ .../01_constant_column/cdm.fixData.assert | 1 + .../cdm.validateData.assert | 1 + .../02_explode_map/cdm.fixData.assert | 1 + .../02_explode_map/cdm.validateData.assert | 1 + SIT/features/03_codec/cdm.fixData.assert | 1 + SIT/features/03_codec/cdm.validateData.assert | 1 + SIT/features/04_udt_mapper/cdm.fixData.assert | 1 + .../04_udt_mapper/cdm.validateData.assert | 1 + SIT/features/05_guardrail/cdm.fixData.assert | 1 + .../05_guardrail/cdm.validateData.assert | 1 + .../cdm.fixData.assert | 1 + .../cdm.validateData.assert | 1 + .../cdm.fixData.assert | 1 + .../cdm.validateData.assert | 1 + .../cdm.fixData.assert | 1 + .../cdm.validateData.assert | 1 + .../cdm.fixData.assert | 1 + .../cdm.validateData.assert | 1 + .../03_performance/cdm.fixData.assert | 1 + .../03_performance/cdm.validateData.assert | 1 + SIT/smoke/03_ttl_writetime/cdm.fixData.assert | 1 + .../03_ttl_writetime/cdm.validateData.assert | 1 + SIT/smoke/04_counters/cdm.fixForce.assert | 1 + SIT/smoke/04_counters/cdm.validateData.assert | 1 + .../TargetUpsertRunDetailsStatement.java | 2 +- .../com/datastax/cdm/feature/TrackRun.java | 2 +- .../com/datastax/cdm/job/DiffJobSession.java | 16 +++++++- .../TargetUpsertRunDetailsStatementTest.java | 1 - .../datastax/cdm/feature/TrackRunTest.java | 41 +++++++++++++++++-- 30 files changed, 82 insertions(+), 8 deletions(-) diff --git a/RELEASE.md b/RELEASE.md index ec7f7229..c5e5a287 100644 --- a/RELEASE.md +++ b/RELEASE.md @@ -1,4 +1,8 @@ # Release Notes +## [4.3.9] - 2024-09-11 +- Added new `status` value of `DIFF_CORRECTED` on 
`cdm_run_details` table to specifically mark partitions that were corrected during the CDM validation run. +- Upgraded Validation job to skip partitions with `DIFF_CORRECTED` status on rerun with a previous `runId`. + ## [4.3.8] - 2024-09-09 - Upgraded `spark.cdm.trackRun` feature to include `status` on `cdm_run_info` table. Also improved the code to handle rerun of previous run which may have exited before being correctly initialized. diff --git a/SIT/features/01_constant_column/cdm.fixData.assert b/SIT/features/01_constant_column/cdm.fixData.assert index 54f7bb62..83b41b9f 100644 --- a/SIT/features/01_constant_column/cdm.fixData.assert +++ b/SIT/features/01_constant_column/cdm.fixData.assert @@ -5,3 +5,4 @@ Missing Record Count: 1 Corrected Missing Record Count: 1 Valid Record Count: 1 Skipped Record Count: 0 +Error Record Count: 0 diff --git a/SIT/features/01_constant_column/cdm.validateData.assert b/SIT/features/01_constant_column/cdm.validateData.assert index fd0755fb..3d4942f2 100644 --- a/SIT/features/01_constant_column/cdm.validateData.assert +++ b/SIT/features/01_constant_column/cdm.validateData.assert @@ -5,3 +5,4 @@ Missing Record Count: 0 Corrected Missing Record Count: 0 Valid Record Count: 3 Skipped Record Count: 0 +Error Record Count: 0 diff --git a/SIT/features/02_explode_map/cdm.fixData.assert b/SIT/features/02_explode_map/cdm.fixData.assert index 28918f77..c4a3fe56 100644 --- a/SIT/features/02_explode_map/cdm.fixData.assert +++ b/SIT/features/02_explode_map/cdm.fixData.assert @@ -5,3 +5,4 @@ Missing Record Count: 5 Corrected Missing Record Count: 5 Valid Record Count: 5 Skipped Record Count: 0 +Error Record Count: 0 diff --git a/SIT/features/02_explode_map/cdm.validateData.assert b/SIT/features/02_explode_map/cdm.validateData.assert index 89bd3b88..4b3baede 100644 --- a/SIT/features/02_explode_map/cdm.validateData.assert +++ b/SIT/features/02_explode_map/cdm.validateData.assert @@ -5,3 +5,4 @@ Missing Record Count: 0 Corrected Missing Record Count: 0 
Valid Record Count: 12 Skipped Record Count: 0 +Error Record Count: 0 diff --git a/SIT/features/03_codec/cdm.fixData.assert b/SIT/features/03_codec/cdm.fixData.assert index 54f7bb62..83b41b9f 100644 --- a/SIT/features/03_codec/cdm.fixData.assert +++ b/SIT/features/03_codec/cdm.fixData.assert @@ -5,3 +5,4 @@ Missing Record Count: 1 Corrected Missing Record Count: 1 Valid Record Count: 1 Skipped Record Count: 0 +Error Record Count: 0 diff --git a/SIT/features/03_codec/cdm.validateData.assert b/SIT/features/03_codec/cdm.validateData.assert index fd0755fb..3d4942f2 100644 --- a/SIT/features/03_codec/cdm.validateData.assert +++ b/SIT/features/03_codec/cdm.validateData.assert @@ -5,3 +5,4 @@ Missing Record Count: 0 Corrected Missing Record Count: 0 Valid Record Count: 3 Skipped Record Count: 0 +Error Record Count: 0 diff --git a/SIT/features/04_udt_mapper/cdm.fixData.assert b/SIT/features/04_udt_mapper/cdm.fixData.assert index 54f7bb62..83b41b9f 100644 --- a/SIT/features/04_udt_mapper/cdm.fixData.assert +++ b/SIT/features/04_udt_mapper/cdm.fixData.assert @@ -5,3 +5,4 @@ Missing Record Count: 1 Corrected Missing Record Count: 1 Valid Record Count: 1 Skipped Record Count: 0 +Error Record Count: 0 diff --git a/SIT/features/04_udt_mapper/cdm.validateData.assert b/SIT/features/04_udt_mapper/cdm.validateData.assert index fd0755fb..3d4942f2 100644 --- a/SIT/features/04_udt_mapper/cdm.validateData.assert +++ b/SIT/features/04_udt_mapper/cdm.validateData.assert @@ -5,3 +5,4 @@ Missing Record Count: 0 Corrected Missing Record Count: 0 Valid Record Count: 3 Skipped Record Count: 0 +Error Record Count: 0 diff --git a/SIT/features/05_guardrail/cdm.fixData.assert b/SIT/features/05_guardrail/cdm.fixData.assert index 1fffd38a..c5f87534 100644 --- a/SIT/features/05_guardrail/cdm.fixData.assert +++ b/SIT/features/05_guardrail/cdm.fixData.assert @@ -5,3 +5,4 @@ Missing Record Count: 0 Corrected Missing Record Count: 0 Valid Record Count: 10 Skipped Record Count: 6 +Error Record Count: 0 
diff --git a/SIT/features/05_guardrail/cdm.validateData.assert b/SIT/features/05_guardrail/cdm.validateData.assert index 1fffd38a..c5f87534 100644 --- a/SIT/features/05_guardrail/cdm.validateData.assert +++ b/SIT/features/05_guardrail/cdm.validateData.assert @@ -5,3 +5,4 @@ Missing Record Count: 0 Corrected Missing Record Count: 0 Valid Record Count: 10 Skipped Record Count: 6 +Error Record Count: 0 diff --git a/SIT/features/06_constant_column_remove/cdm.fixData.assert b/SIT/features/06_constant_column_remove/cdm.fixData.assert index 54f7bb62..83b41b9f 100644 --- a/SIT/features/06_constant_column_remove/cdm.fixData.assert +++ b/SIT/features/06_constant_column_remove/cdm.fixData.assert @@ -5,3 +5,4 @@ Missing Record Count: 1 Corrected Missing Record Count: 1 Valid Record Count: 1 Skipped Record Count: 0 +Error Record Count: 0 diff --git a/SIT/features/06_constant_column_remove/cdm.validateData.assert b/SIT/features/06_constant_column_remove/cdm.validateData.assert index fd0755fb..3d4942f2 100644 --- a/SIT/features/06_constant_column_remove/cdm.validateData.assert +++ b/SIT/features/06_constant_column_remove/cdm.validateData.assert @@ -5,3 +5,4 @@ Missing Record Count: 0 Corrected Missing Record Count: 0 Valid Record Count: 3 Skipped Record Count: 0 +Error Record Count: 0 diff --git a/SIT/features/07_constant_column_replace/cdm.fixData.assert b/SIT/features/07_constant_column_replace/cdm.fixData.assert index 54f7bb62..83b41b9f 100644 --- a/SIT/features/07_constant_column_replace/cdm.fixData.assert +++ b/SIT/features/07_constant_column_replace/cdm.fixData.assert @@ -5,3 +5,4 @@ Missing Record Count: 1 Corrected Missing Record Count: 1 Valid Record Count: 1 Skipped Record Count: 0 +Error Record Count: 0 diff --git a/SIT/features/07_constant_column_replace/cdm.validateData.assert b/SIT/features/07_constant_column_replace/cdm.validateData.assert index fd0755fb..3d4942f2 100644 --- a/SIT/features/07_constant_column_replace/cdm.validateData.assert +++ 
b/SIT/features/07_constant_column_replace/cdm.validateData.assert @@ -5,3 +5,4 @@ Missing Record Count: 0 Corrected Missing Record Count: 0 Valid Record Count: 3 Skipped Record Count: 0 +Error Record Count: 0 diff --git a/SIT/regression/01_explode_map_with_constants/cdm.fixData.assert b/SIT/regression/01_explode_map_with_constants/cdm.fixData.assert index 28918f77..c4a3fe56 100644 --- a/SIT/regression/01_explode_map_with_constants/cdm.fixData.assert +++ b/SIT/regression/01_explode_map_with_constants/cdm.fixData.assert @@ -5,3 +5,4 @@ Missing Record Count: 5 Corrected Missing Record Count: 5 Valid Record Count: 5 Skipped Record Count: 0 +Error Record Count: 0 diff --git a/SIT/regression/01_explode_map_with_constants/cdm.validateData.assert b/SIT/regression/01_explode_map_with_constants/cdm.validateData.assert index 89bd3b88..4b3baede 100644 --- a/SIT/regression/01_explode_map_with_constants/cdm.validateData.assert +++ b/SIT/regression/01_explode_map_with_constants/cdm.validateData.assert @@ -5,3 +5,4 @@ Missing Record Count: 0 Corrected Missing Record Count: 0 Valid Record Count: 12 Skipped Record Count: 0 +Error Record Count: 0 diff --git a/SIT/regression/02_ColumnRenameWithConstantsAndExplode/cdm.fixData.assert b/SIT/regression/02_ColumnRenameWithConstantsAndExplode/cdm.fixData.assert index cdbc2fb2..4b5d5a1d 100644 --- a/SIT/regression/02_ColumnRenameWithConstantsAndExplode/cdm.fixData.assert +++ b/SIT/regression/02_ColumnRenameWithConstantsAndExplode/cdm.fixData.assert @@ -5,3 +5,4 @@ Missing Record Count: 5 Corrected Missing Record Count: 5 Valid Record Count: 6 Skipped Record Count: 0 +Error Record Count: 0 diff --git a/SIT/regression/02_ColumnRenameWithConstantsAndExplode/cdm.validateData.assert b/SIT/regression/02_ColumnRenameWithConstantsAndExplode/cdm.validateData.assert index 89bd3b88..4b3baede 100644 --- a/SIT/regression/02_ColumnRenameWithConstantsAndExplode/cdm.validateData.assert +++ 
b/SIT/regression/02_ColumnRenameWithConstantsAndExplode/cdm.validateData.assert @@ -5,3 +5,4 @@ Missing Record Count: 0 Corrected Missing Record Count: 0 Valid Record Count: 12 Skipped Record Count: 0 +Error Record Count: 0 diff --git a/SIT/regression/03_performance/cdm.fixData.assert b/SIT/regression/03_performance/cdm.fixData.assert index 5e69630e..ac7d4c95 100644 --- a/SIT/regression/03_performance/cdm.fixData.assert +++ b/SIT/regression/03_performance/cdm.fixData.assert @@ -5,3 +5,4 @@ Missing Record Count: 100 Corrected Missing Record Count: 100 Valid Record Count: 3800 Skipped Record Count: 0 +Error Record Count: 0 diff --git a/SIT/regression/03_performance/cdm.validateData.assert b/SIT/regression/03_performance/cdm.validateData.assert index 54920d67..c274e28e 100644 --- a/SIT/regression/03_performance/cdm.validateData.assert +++ b/SIT/regression/03_performance/cdm.validateData.assert @@ -5,3 +5,4 @@ Missing Record Count: 0 Corrected Missing Record Count: 0 Valid Record Count: 4000 Skipped Record Count: 0 +Error Record Count: 0 diff --git a/SIT/smoke/03_ttl_writetime/cdm.fixData.assert b/SIT/smoke/03_ttl_writetime/cdm.fixData.assert index 1a497fca..2ff4df13 100644 --- a/SIT/smoke/03_ttl_writetime/cdm.fixData.assert +++ b/SIT/smoke/03_ttl_writetime/cdm.fixData.assert @@ -5,3 +5,4 @@ Missing Record Count: 1 Corrected Missing Record Count: 1 Valid Record Count: 3 Skipped Record Count: 0 +Error Record Count: 0 diff --git a/SIT/smoke/03_ttl_writetime/cdm.validateData.assert b/SIT/smoke/03_ttl_writetime/cdm.validateData.assert index ffe2ea15..d19538ba 100644 --- a/SIT/smoke/03_ttl_writetime/cdm.validateData.assert +++ b/SIT/smoke/03_ttl_writetime/cdm.validateData.assert @@ -5,3 +5,4 @@ Missing Record Count: 0 Corrected Missing Record Count: 0 Valid Record Count: 8 Skipped Record Count: 0 +Error Record Count: 0 diff --git a/SIT/smoke/04_counters/cdm.fixForce.assert b/SIT/smoke/04_counters/cdm.fixForce.assert index 48ace724..032c1e2f 100644 --- 
a/SIT/smoke/04_counters/cdm.fixForce.assert +++ b/SIT/smoke/04_counters/cdm.fixForce.assert @@ -5,3 +5,4 @@ Missing Record Count: 1 Corrected Missing Record Count: 1 Valid Record Count: 6 Skipped Record Count: 0 +Error Record Count: 0 diff --git a/SIT/smoke/04_counters/cdm.validateData.assert b/SIT/smoke/04_counters/cdm.validateData.assert index ed458566..0abf94e9 100644 --- a/SIT/smoke/04_counters/cdm.validateData.assert +++ b/SIT/smoke/04_counters/cdm.validateData.assert @@ -5,3 +5,4 @@ Missing Record Count: 0 Corrected Missing Record Count: 0 Valid Record Count: 7 Skipped Record Count: 0 +Error Record Count: 0 diff --git a/src/main/java/com/datastax/cdm/cql/statement/TargetUpsertRunDetailsStatement.java b/src/main/java/com/datastax/cdm/cql/statement/TargetUpsertRunDetailsStatement.java index 5bb57ff0..79d29fe4 100644 --- a/src/main/java/com/datastax/cdm/cql/statement/TargetUpsertRunDetailsStatement.java +++ b/src/main/java/com/datastax/cdm/cql/statement/TargetUpsertRunDetailsStatement.java @@ -62,7 +62,7 @@ public TargetUpsertRunDetailsStatement(CqlSession session, String keyspaceTable) this.session.execute("create table if not exists " + cdmKsTabInfo + " (table_name text, run_id bigint, run_type text, prev_run_id bigint, start_time timestamp, end_time timestamp, run_info text, status text, primary key (table_name, run_id))"); - + // TODO: Remove this code block after a few releases, its only added for backward compatibility try { this.session.execute("alter table " + cdmKsTabInfo + " add status text"); diff --git a/src/main/java/com/datastax/cdm/feature/TrackRun.java b/src/main/java/com/datastax/cdm/feature/TrackRun.java index a94dfc03..a75cd0a2 100644 --- a/src/main/java/com/datastax/cdm/feature/TrackRun.java +++ b/src/main/java/com/datastax/cdm/feature/TrackRun.java @@ -32,7 +32,7 @@ public enum RUN_TYPE { } public enum RUN_STATUS { - NOT_STARTED, STARTED, PASS, FAIL, DIFF, ENDED + NOT_STARTED, STARTED, PASS, FAIL, DIFF, DIFF_CORRECTED, ENDED } public Logger 
logger = LoggerFactory.getLogger(this.getClass().getName()); diff --git a/src/main/java/com/datastax/cdm/job/DiffJobSession.java b/src/main/java/com/datastax/cdm/job/DiffJobSession.java index 9669fedb..c01b97af 100644 --- a/src/main/java/com/datastax/cdm/job/DiffJobSession.java +++ b/src/main/java/com/datastax/cdm/job/DiffJobSession.java @@ -73,7 +73,7 @@ public DiffJobSession(CqlSession originSession, CqlSession targetSession, SparkC this.jobCounter.setRegisteredTypes(JobCounter.CounterType.READ, JobCounter.CounterType.VALID, JobCounter.CounterType.MISMATCH, JobCounter.CounterType.CORRECTED_MISMATCH, JobCounter.CounterType.MISSING, JobCounter.CounterType.CORRECTED_MISSING, - JobCounter.CounterType.SKIPPED); + JobCounter.CounterType.SKIPPED, JobCounter.CounterType.ERROR); autoCorrectMissing = propertyHelper.getBoolean(KnownProperties.AUTOCORRECT_MISSING); logger.info("PARAM -- Autocorrect Missing: {}", autoCorrectMissing); @@ -192,11 +192,23 @@ private void getDataAndDiff(BigInteger min, BigInteger max) { } if (hasDiff.get() && null != trackRunFeature) { - trackRunFeature.updateCdmRun(min, TrackRun.RUN_STATUS.DIFF); + if (jobCounter.getCount(JobCounter.CounterType.MISSING) == jobCounter + .getCount(JobCounter.CounterType.CORRECTED_MISSING) + && jobCounter.getCount(JobCounter.CounterType.MISMATCH) == jobCounter + .getCount(JobCounter.CounterType.CORRECTED_MISMATCH)) { + trackRunFeature.updateCdmRun(min, TrackRun.RUN_STATUS.DIFF_CORRECTED); + } else { + trackRunFeature.updateCdmRun(min, TrackRun.RUN_STATUS.DIFF); + } } else if (null != trackRunFeature) { trackRunFeature.updateCdmRun(min, TrackRun.RUN_STATUS.PASS); } } catch (Exception e) { + jobCounter.threadIncrement(JobCounter.CounterType.ERROR, + jobCounter.getCount(JobCounter.CounterType.READ) - jobCounter.getCount(JobCounter.CounterType.VALID) + - jobCounter.getCount(JobCounter.CounterType.MISSING) + - jobCounter.getCount(JobCounter.CounterType.MISMATCH) + - jobCounter.getCount(JobCounter.CounterType.SKIPPED)); 
logger.error("Error with PartitionRange -- ThreadID: {} Processing min: {} max: {}", Thread.currentThread().getId(), min, max, e); if (null != trackRunFeature) diff --git a/src/test/java/com/datastax/cdm/cql/statement/TargetUpsertRunDetailsStatementTest.java b/src/test/java/com/datastax/cdm/cql/statement/TargetUpsertRunDetailsStatementTest.java index a3418ecf..abaf4eec 100644 --- a/src/test/java/com/datastax/cdm/cql/statement/TargetUpsertRunDetailsStatementTest.java +++ b/src/test/java/com/datastax/cdm/cql/statement/TargetUpsertRunDetailsStatementTest.java @@ -62,7 +62,6 @@ public void setup() { when(preparedStatement.bind(any())).thenReturn(bStatement); when(cqlSession.execute(bStatement)).thenReturn(rs); when(rs.all()).thenReturn(List.of(row)); - } @Test diff --git a/src/test/java/com/datastax/cdm/feature/TrackRunTest.java b/src/test/java/com/datastax/cdm/feature/TrackRunTest.java index ac597d7a..bc154a14 100644 --- a/src/test/java/com/datastax/cdm/feature/TrackRunTest.java +++ b/src/test/java/com/datastax/cdm/feature/TrackRunTest.java @@ -16,18 +16,53 @@ package com.datastax.cdm.feature; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.when; +import java.util.Collection; + +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.mockito.Mock; + +import com.datastax.cdm.cql.CommonMocks; +import com.datastax.cdm.job.RunNotStartedException; +import com.datastax.cdm.job.SplitPartitions; +import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.cql.BoundStatement; + +class TrackRunTest extends CommonMocks { + + @Mock + CqlSession cqlSession; + + @Mock + BoundStatement bStatement; -class TrackRunTest { + @BeforeEach + public void setup() { + // UPDATE is needed by counters, though the class should handle non-counter updates + commonSetup(false, false, 
true); + when(cqlSession.prepare(anyString())).thenReturn(preparedStatement); + when(preparedStatement.bind(any())).thenReturn(bStatement); + } @Test - void test() { + void countTypesAndStatus() { assertEquals("MIGRATE", TrackRun.RUN_TYPE.MIGRATE.name()); assertEquals("DIFF_DATA", TrackRun.RUN_TYPE.DIFF_DATA.name()); assertEquals(2, TrackRun.RUN_TYPE.values().length); - assertEquals(6, TrackRun.RUN_STATUS.values().length); + assertEquals(7, TrackRun.RUN_STATUS.values().length); + } + + @Test + void init() throws RunNotStartedException { + TrackRun trackRun = new TrackRun(cqlSession, "keyspace.table"); + Collection parts = trackRun.getPendingPartitions(0); + + assertEquals(0, parts.size()); } }