Upgraded DiffData to skip corrected-diff on rerun with previous-run-id (#303)

* Upgraded DiffData to skip corrected-diff on rerun with previous-run-id
* Updated SIT tests to include error count in logs
* Fixed regression tests.
* Fixed many more integration tests
pravinbhat authored Sep 11, 2024
1 parent 6962a83 commit 2d7d2df
Showing 30 changed files with 82 additions and 8 deletions.
4 changes: 4 additions & 0 deletions RELEASE.md
@@ -1,4 +1,8 @@
# Release Notes
## [4.3.9] - 2024-09-11
- Added new `status` value of `DIFF_CORRECTED` on `cdm_run_details` table to specifically mark partitions that were corrected during the CDM validation run.
- Upgraded Validation job to skip partitions with `DIFF_CORRECTED` status on rerun with a previous `runId`.

## [4.3.8] - 2024-09-09
- Upgraded `spark.cdm.trackRun` feature to include `status` on `cdm_run_info` table. Also improved the code to handle rerun of previous run which may have exited before being correctly initialized.
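
The 4.3.9 notes above describe a new `DIFF_CORRECTED` status and say that a rerun skips partitions carrying it. A minimal sketch of that idea follows, modeled loosely on the `DiffJobSession` change in this commit. The class `PartitionStatusSketch`, the method names, and the plain `long` counters are hypothetical stand-ins, not CDM's API. Treating `PASS` as skipped on rerun is also an assumption; the notes confirm only `DIFF_CORRECTED`.

```java
import java.util.EnumSet;
import java.util.Set;

class PartitionStatusSketch {
    // Mirrors the RUN_STATUS values listed in this commit's TrackRun.java change.
    enum RunStatus { NOT_STARTED, STARTED, PASS, FAIL, DIFF, DIFF_CORRECTED, ENDED }

    // ASSUMPTION: statuses a rerun does not revisit. Only DIFF_CORRECTED is
    // confirmed by the release notes; PASS is included here as a guess.
    static final Set<RunStatus> SKIPPED_ON_RERUN =
            EnumSet.of(RunStatus.PASS, RunStatus.DIFF_CORRECTED);

    // Picks the status for a processed partition range: a diff counts as
    // corrected only when every missing and every mismatched record was fixed.
    static RunStatus resolve(boolean hasDiff, long missing, long correctedMissing,
                             long mismatch, long correctedMismatch) {
        if (!hasDiff) {
            return RunStatus.PASS;
        }
        boolean allCorrected = missing == correctedMissing && mismatch == correctedMismatch;
        return allCorrected ? RunStatus.DIFF_CORRECTED : RunStatus.DIFF;
    }

    // True when a rerun with a previous run id should process the partition again.
    static boolean isPendingOnRerun(RunStatus status) {
        return !SKIPPED_ON_RERUN.contains(status);
    }

    public static void main(String[] args) {
        RunStatus corrected = resolve(true, 5, 5, 0, 0);
        RunStatus uncorrected = resolve(true, 5, 3, 0, 0);
        System.out.println(corrected + " pending on rerun: " + isPendingOnRerun(corrected));
        System.out.println(uncorrected + " pending on rerun: " + isPendingOnRerun(uncorrected));
    }
}
```

Keeping `DIFF` and `DIFF_CORRECTED` as separate values lets a rerun tell partitions that still differ apart from partitions that were already repaired.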

1 change: 1 addition & 0 deletions SIT/features/01_constant_column/cdm.fixData.assert
@@ -5,3 +5,4 @@ Missing Record Count: 1
Corrected Missing Record Count: 1
Valid Record Count: 1
Skipped Record Count: 0
Error Record Count: 0
1 change: 1 addition & 0 deletions SIT/features/01_constant_column/cdm.validateData.assert
@@ -5,3 +5,4 @@ Missing Record Count: 0
Corrected Missing Record Count: 0
Valid Record Count: 3
Skipped Record Count: 0
Error Record Count: 0
1 change: 1 addition & 0 deletions SIT/features/02_explode_map/cdm.fixData.assert
@@ -5,3 +5,4 @@ Missing Record Count: 5
Corrected Missing Record Count: 5
Valid Record Count: 5
Skipped Record Count: 0
Error Record Count: 0
1 change: 1 addition & 0 deletions SIT/features/02_explode_map/cdm.validateData.assert
@@ -5,3 +5,4 @@ Missing Record Count: 0
Corrected Missing Record Count: 0
Valid Record Count: 12
Skipped Record Count: 0
Error Record Count: 0
1 change: 1 addition & 0 deletions SIT/features/03_codec/cdm.fixData.assert
@@ -5,3 +5,4 @@ Missing Record Count: 1
Corrected Missing Record Count: 1
Valid Record Count: 1
Skipped Record Count: 0
Error Record Count: 0
1 change: 1 addition & 0 deletions SIT/features/03_codec/cdm.validateData.assert
@@ -5,3 +5,4 @@ Missing Record Count: 0
Corrected Missing Record Count: 0
Valid Record Count: 3
Skipped Record Count: 0
Error Record Count: 0
1 change: 1 addition & 0 deletions SIT/features/04_udt_mapper/cdm.fixData.assert
@@ -5,3 +5,4 @@ Missing Record Count: 1
Corrected Missing Record Count: 1
Valid Record Count: 1
Skipped Record Count: 0
Error Record Count: 0
1 change: 1 addition & 0 deletions SIT/features/04_udt_mapper/cdm.validateData.assert
@@ -5,3 +5,4 @@ Missing Record Count: 0
Corrected Missing Record Count: 0
Valid Record Count: 3
Skipped Record Count: 0
Error Record Count: 0
1 change: 1 addition & 0 deletions SIT/features/05_guardrail/cdm.fixData.assert
@@ -5,3 +5,4 @@ Missing Record Count: 0
Corrected Missing Record Count: 0
Valid Record Count: 10
Skipped Record Count: 6
Error Record Count: 0
1 change: 1 addition & 0 deletions SIT/features/05_guardrail/cdm.validateData.assert
@@ -5,3 +5,4 @@ Missing Record Count: 0
Corrected Missing Record Count: 0
Valid Record Count: 10
Skipped Record Count: 6
Error Record Count: 0
1 change: 1 addition & 0 deletions SIT/features/06_constant_column_remove/cdm.fixData.assert
@@ -5,3 +5,4 @@ Missing Record Count: 1
Corrected Missing Record Count: 1
Valid Record Count: 1
Skipped Record Count: 0
Error Record Count: 0
@@ -5,3 +5,4 @@ Missing Record Count: 0
Corrected Missing Record Count: 0
Valid Record Count: 3
Skipped Record Count: 0
Error Record Count: 0
1 change: 1 addition & 0 deletions SIT/features/07_constant_column_replace/cdm.fixData.assert
@@ -5,3 +5,4 @@ Missing Record Count: 1
Corrected Missing Record Count: 1
Valid Record Count: 1
Skipped Record Count: 0
Error Record Count: 0
@@ -5,3 +5,4 @@ Missing Record Count: 0
Corrected Missing Record Count: 0
Valid Record Count: 3
Skipped Record Count: 0
Error Record Count: 0
@@ -5,3 +5,4 @@ Missing Record Count: 5
Corrected Missing Record Count: 5
Valid Record Count: 5
Skipped Record Count: 0
Error Record Count: 0
@@ -5,3 +5,4 @@ Missing Record Count: 0
Corrected Missing Record Count: 0
Valid Record Count: 12
Skipped Record Count: 0
Error Record Count: 0
@@ -5,3 +5,4 @@ Missing Record Count: 5
Corrected Missing Record Count: 5
Valid Record Count: 6
Skipped Record Count: 0
Error Record Count: 0
@@ -5,3 +5,4 @@ Missing Record Count: 0
Corrected Missing Record Count: 0
Valid Record Count: 12
Skipped Record Count: 0
Error Record Count: 0
1 change: 1 addition & 0 deletions SIT/regression/03_performance/cdm.fixData.assert
@@ -5,3 +5,4 @@ Missing Record Count: 100
Corrected Missing Record Count: 100
Valid Record Count: 3800
Skipped Record Count: 0
Error Record Count: 0
1 change: 1 addition & 0 deletions SIT/regression/03_performance/cdm.validateData.assert
@@ -5,3 +5,4 @@ Missing Record Count: 0
Corrected Missing Record Count: 0
Valid Record Count: 4000
Skipped Record Count: 0
Error Record Count: 0
1 change: 1 addition & 0 deletions SIT/smoke/03_ttl_writetime/cdm.fixData.assert
@@ -5,3 +5,4 @@ Missing Record Count: 1
Corrected Missing Record Count: 1
Valid Record Count: 3
Skipped Record Count: 0
Error Record Count: 0
1 change: 1 addition & 0 deletions SIT/smoke/03_ttl_writetime/cdm.validateData.assert
@@ -5,3 +5,4 @@ Missing Record Count: 0
Corrected Missing Record Count: 0
Valid Record Count: 8
Skipped Record Count: 0
Error Record Count: 0
1 change: 1 addition & 0 deletions SIT/smoke/04_counters/cdm.fixForce.assert
@@ -5,3 +5,4 @@ Missing Record Count: 1
Corrected Missing Record Count: 1
Valid Record Count: 6
Skipped Record Count: 0
Error Record Count: 0
1 change: 1 addition & 0 deletions SIT/smoke/04_counters/cdm.validateData.assert
@@ -5,3 +5,4 @@ Missing Record Count: 0
Corrected Missing Record Count: 0
Valid Record Count: 7
Skipped Record Count: 0
Error Record Count: 0
@@ -62,7 +62,7 @@ public TargetUpsertRunDetailsStatement(CqlSession session, String keyspaceTable)

this.session.execute("create table if not exists " + cdmKsTabInfo
+ " (table_name text, run_id bigint, run_type text, prev_run_id bigint, start_time timestamp, end_time timestamp, run_info text, status text, primary key (table_name, run_id))");

// TODO: Remove this code block after a few releases, it's only added for backward compatibility
try {
this.session.execute("alter table " + cdmKsTabInfo + " add status text");
2 changes: 1 addition & 1 deletion src/main/java/com/datastax/cdm/feature/TrackRun.java
@@ -32,7 +32,7 @@ public enum RUN_TYPE {
}

public enum RUN_STATUS {
NOT_STARTED, STARTED, PASS, FAIL, DIFF, ENDED
NOT_STARTED, STARTED, PASS, FAIL, DIFF, DIFF_CORRECTED, ENDED
}

public Logger logger = LoggerFactory.getLogger(this.getClass().getName());
16 changes: 14 additions & 2 deletions src/main/java/com/datastax/cdm/job/DiffJobSession.java
@@ -73,7 +73,7 @@ public DiffJobSession(CqlSession originSession, CqlSession targetSession, SparkC
this.jobCounter.setRegisteredTypes(JobCounter.CounterType.READ, JobCounter.CounterType.VALID,
JobCounter.CounterType.MISMATCH, JobCounter.CounterType.CORRECTED_MISMATCH,
JobCounter.CounterType.MISSING, JobCounter.CounterType.CORRECTED_MISSING,
JobCounter.CounterType.SKIPPED);
JobCounter.CounterType.SKIPPED, JobCounter.CounterType.ERROR);

autoCorrectMissing = propertyHelper.getBoolean(KnownProperties.AUTOCORRECT_MISSING);
logger.info("PARAM -- Autocorrect Missing: {}", autoCorrectMissing);
@@ -192,11 +192,23 @@ private void getDataAndDiff(BigInteger min, BigInteger max) {
}

if (hasDiff.get() && null != trackRunFeature) {
trackRunFeature.updateCdmRun(min, TrackRun.RUN_STATUS.DIFF);
if (jobCounter.getCount(JobCounter.CounterType.MISSING) == jobCounter
.getCount(JobCounter.CounterType.CORRECTED_MISSING)
&& jobCounter.getCount(JobCounter.CounterType.MISMATCH) == jobCounter
.getCount(JobCounter.CounterType.CORRECTED_MISMATCH)) {
trackRunFeature.updateCdmRun(min, TrackRun.RUN_STATUS.DIFF_CORRECTED);
} else {
trackRunFeature.updateCdmRun(min, TrackRun.RUN_STATUS.DIFF);
}
} else if (null != trackRunFeature) {
trackRunFeature.updateCdmRun(min, TrackRun.RUN_STATUS.PASS);
}
} catch (Exception e) {
jobCounter.threadIncrement(JobCounter.CounterType.ERROR,
jobCounter.getCount(JobCounter.CounterType.READ) - jobCounter.getCount(JobCounter.CounterType.VALID)
- jobCounter.getCount(JobCounter.CounterType.MISSING)
- jobCounter.getCount(JobCounter.CounterType.MISMATCH)
- jobCounter.getCount(JobCounter.CounterType.SKIPPED));
logger.error("Error with PartitionRange -- ThreadID: {} Processing min: {} max: {}",
Thread.currentThread().getId(), min, max, e);
if (null != trackRunFeature)
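
The `catch` block in the hunk above attributes to `ERROR` every record that was read but never classified as valid, missing, mismatched, or skipped. The sketch below shows only that arithmetic. `ErrorCountSketch` and `unaccounted` are hypothetical names, and the plain `long` arguments stand in for the `jobCounter.getCount(...)` calls.

```java
class ErrorCountSketch {
    // Records read but not classified into any other bucket are treated as
    // errors: ERROR = READ - VALID - MISSING - MISMATCH - SKIPPED.
    static long unaccounted(long read, long valid, long missing, long mismatch, long skipped) {
        return read - valid - missing - mismatch - skipped;
    }

    public static void main(String[] args) {
        // 10 read, 6 valid, 1 missing, 1 mismatched, 0 skipped -> 2 unaccounted
        System.out.println(unaccounted(10, 6, 1, 1, 0));
    }
}
```

Deriving the error count from the other counters, rather than incrementing it per record, means a failure partway through a range still leaves the totals adding up to the number of records read.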
@@ -62,7 +62,6 @@ public void setup() {
when(preparedStatement.bind(any())).thenReturn(bStatement);
when(cqlSession.execute(bStatement)).thenReturn(rs);
when(rs.all()).thenReturn(List.of(row));

}

@Test
41 changes: 38 additions & 3 deletions src/test/java/com/datastax/cdm/feature/TrackRunTest.java
@@ -16,18 +16,53 @@
package com.datastax.cdm.feature;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.when;

import java.util.Collection;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mock;

import com.datastax.cdm.cql.CommonMocks;
import com.datastax.cdm.job.RunNotStartedException;
import com.datastax.cdm.job.SplitPartitions;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.BoundStatement;

class TrackRunTest extends CommonMocks {

@Mock
CqlSession cqlSession;

@Mock
BoundStatement bStatement;

class TrackRunTest {
@BeforeEach
public void setup() {
// UPDATE is needed by counters, though the class should handle non-counter updates
commonSetup(false, false, true);
when(cqlSession.prepare(anyString())).thenReturn(preparedStatement);
when(preparedStatement.bind(any())).thenReturn(bStatement);
}

@Test
void test() {
void countTypesAndStatus() {
assertEquals("MIGRATE", TrackRun.RUN_TYPE.MIGRATE.name());
assertEquals("DIFF_DATA", TrackRun.RUN_TYPE.DIFF_DATA.name());

assertEquals(2, TrackRun.RUN_TYPE.values().length);
assertEquals(6, TrackRun.RUN_STATUS.values().length);
assertEquals(7, TrackRun.RUN_STATUS.values().length);
}

@Test
void init() throws RunNotStartedException {
TrackRun trackRun = new TrackRun(cqlSession, "keyspace.table");
Collection<SplitPartitions.Partition> parts = trackRun.getPendingPartitions(0);

assertEquals(0, parts.size());
}

}
