Skip to content

Commit

Permalink
Feature/track run (#302)
Browse files Browse the repository at this point in the history
* Upgraded trackRun feature to include status on Info table & also fixed rerun of uninitialized jobs

* Added release notes

* Added trace log
  • Loading branch information
pravinbhat authored Sep 9, 2024
1 parent 453a12e commit 9187f1b
Show file tree
Hide file tree
Showing 9 changed files with 193 additions and 24 deletions.
3 changes: 3 additions & 0 deletions RELEASE.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
# Release Notes
## [4.3.8] - 2024-09-09
- Upgraded `spark.cdm.trackRun` feature to include `status` on `cdm_run_info` table. Also improved the code to handle rerun of previous run which may have exited before being correctly initialized.

## [4.3.7] - 2024-09-03
- Added property `spark.cdm.transform.custom.ttl` to allow a custom constant value to be set for TTL instead of using the values from `origin` rows.
- Repo wide code formating & imports organization
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,18 @@
import java.util.ArrayList;
import java.util.Collection;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.datastax.cdm.feature.TrackRun;
import com.datastax.cdm.feature.TrackRun.RUN_TYPE;
import com.datastax.cdm.job.RunNotStartedException;
import com.datastax.cdm.job.SplitPartitions;
import com.datastax.cdm.job.SplitPartitions.Partition;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;

public class TargetUpsertRunDetailsStatement {
private CqlSession session;
Expand All @@ -36,45 +41,71 @@ public class TargetUpsertRunDetailsStatement {
private long prevRunId;
private BoundStatement boundInitInfoStatement;
private BoundStatement boundInitStatement;
private BoundStatement boundUpdateInfoStatement;
private BoundStatement boundEndInfoStatement;
private BoundStatement boundUpdateStatement;
private BoundStatement boundUpdateStartStatement;
private BoundStatement boundSelectInfoStatement;
private BoundStatement boundSelectStatement;

public Logger logger = LoggerFactory.getLogger(this.getClass().getName());

public TargetUpsertRunDetailsStatement(CqlSession session, String keyspaceTable) {
this.session = session;
String[] ksTab = keyspaceTable.split("\\.");
if (ksTab.length != 2) {
throw new RuntimeException("Invalid keyspace.table format: " + keyspaceTable);
}
this.keyspaceName = ksTab[0];
this.tableName = ksTab[1];
String cdmKsTabInfo = this.keyspaceName + ".cdm_run_info";
String cdmKsTabDetails = this.keyspaceName + ".cdm_run_details";

this.session.execute("create table if not exists " + cdmKsTabInfo
+ " (table_name text, run_id bigint, run_type text, prev_run_id bigint, start_time timestamp, end_time timestamp, run_info text, primary key (table_name, run_id))");
this.session.execute("create table if not exists " + cdmKsTabDetails
+ " (table_name text, run_id bigint, start_time timestamp, token_min bigint, token_max bigint, status text, primary key ((table_name, run_id), token_min))");
+ " (table_name text, run_id bigint, run_type text, prev_run_id bigint, start_time timestamp, end_time timestamp, run_info text, status text, primary key (table_name, run_id))");

// TODO: Remove this code block after a few releases, its only added for backward compatibility
try {
this.session.execute("alter table " + cdmKsTabInfo + " add status text");
} catch (Exception e) {
// ignore if column already exists
logger.trace("Column 'status' already exists in table {}", cdmKsTabInfo);
}

boundInitInfoStatement = bindStatement("INSERT INTO " + cdmKsTabInfo
+ " (table_name, run_id, run_type, prev_run_id, start_time) VALUES (?, ?, ?, ?, dateof(now()))");
+ " (table_name, run_id, run_type, prev_run_id, start_time, status) VALUES (?, ?, ?, ?, dateof(now()), ?)");
boundInitStatement = bindStatement("INSERT INTO " + cdmKsTabDetails
+ " (table_name, run_id, token_min, token_max, status) VALUES (?, ?, ?, ?, ?)");
boundUpdateInfoStatement = bindStatement("UPDATE " + cdmKsTabInfo
+ " SET end_time = dateof(now()), run_info = ? WHERE table_name = ? AND run_id = ?");
boundEndInfoStatement = bindStatement("UPDATE " + cdmKsTabInfo
+ " SET end_time = dateof(now()), run_info = ?, status = ? WHERE table_name = ? AND run_id = ?");
boundUpdateStatement = bindStatement(
"UPDATE " + cdmKsTabDetails + " SET status = ? WHERE table_name = ? AND run_id = ? AND token_min = ?");
boundUpdateStartStatement = bindStatement("UPDATE " + cdmKsTabDetails
+ " SET start_time = dateof(now()), status = ? WHERE table_name = ? AND run_id = ? AND token_min = ?");
boundSelectInfoStatement = bindStatement(
"SELECT status FROM " + cdmKsTabInfo + " WHERE table_name = ? AND run_id = ?");
boundSelectStatement = bindStatement("SELECT token_min, token_max FROM " + cdmKsTabDetails
+ " WHERE table_name = ? AND run_id = ? and status in ('NOT_STARTED', 'STARTED', 'FAIL', 'DIFF') ALLOW FILTERING");
}

public Collection<SplitPartitions.Partition> getPendingPartitions(long prevRunId) {
public Collection<SplitPartitions.Partition> getPendingPartitions(long prevRunId) throws RunNotStartedException {
this.prevRunId = prevRunId;
final Collection<SplitPartitions.Partition> pendingParts = new ArrayList<SplitPartitions.Partition>();
if (prevRunId == 0) {
return new ArrayList<SplitPartitions.Partition>();
return pendingParts;
}

ResultSet rsInfo = session
.execute(boundSelectInfoStatement.setString("table_name", tableName).setLong("run_id", prevRunId));
Row cdmRunStatus = rsInfo.one();
if (cdmRunStatus == null) {
return pendingParts;
} else {
String status = cdmRunStatus.getString("status");
if (TrackRun.RUN_STATUS.NOT_STARTED.toString().equals(status)) {
throw new RunNotStartedException("Run not started for run_id: " + prevRunId);
}
}

final Collection<SplitPartitions.Partition> pendingParts = new ArrayList<SplitPartitions.Partition>();
ResultSet rs = session
.execute(boundSelectStatement.setString("table_name", tableName).setLong("run_id", prevRunId));
rs.forEach(row -> {
Expand All @@ -89,8 +120,12 @@ public Collection<SplitPartitions.Partition> getPendingPartitions(long prevRunId
public long initCdmRun(Collection<SplitPartitions.Partition> parts, RUN_TYPE runType) {
runId = System.currentTimeMillis();
session.execute(boundInitInfoStatement.setString("table_name", tableName).setLong("run_id", runId)
.setString("run_type", runType.toString()).setLong("prev_run_id", prevRunId));
.setString("run_type", runType.toString()).setLong("prev_run_id", prevRunId)
.setString("status", TrackRun.RUN_STATUS.NOT_STARTED.toString()));
parts.forEach(part -> initCdmRun(part));
session.execute(boundInitInfoStatement.setString("table_name", tableName).setLong("run_id", runId)
.setString("run_type", runType.toString()).setLong("prev_run_id", prevRunId)
.setString("status", TrackRun.RUN_STATUS.STARTED.toString()));
return runId;
}

Expand All @@ -101,9 +136,9 @@ private void initCdmRun(Partition partition) {
.setString("status", TrackRun.RUN_STATUS.NOT_STARTED.toString()));
}

public void updateCdmRunInfo(String runInfo) {
session.execute(boundUpdateInfoStatement.setString("table_name", tableName).setLong("run_id", runId)
.setString("run_info", runInfo));
public void endCdmRun(String runInfo) {
session.execute(boundEndInfoStatement.setString("table_name", tableName).setLong("run_id", runId)
.setString("run_info", runInfo).setString("status", TrackRun.RUN_STATUS.ENDED.toString()));
}

public void updateCdmRun(BigInteger min, TrackRun.RUN_STATUS status) {
Expand Down
14 changes: 10 additions & 4 deletions src/main/java/com/datastax/cdm/data/DataUtility.java
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,16 @@ public static String getMyClassMethodLine(Exception e) {
break;
}
}
String className = targetStackTraceElement.getClassName();
String methodName = targetStackTraceElement.getMethodName();
int lineNumber = targetStackTraceElement.getLineNumber();
if (null == targetStackTraceElement && null != stackTraceElements && stackTraceElements.length > 0) {
targetStackTraceElement = stackTraceElements[0];
}
if (null != targetStackTraceElement) {
String className = targetStackTraceElement.getClassName();
String methodName = targetStackTraceElement.getMethodName();
int lineNumber = targetStackTraceElement.getLineNumber();
return className + "." + methodName + ":" + lineNumber;
}

return className + "." + methodName + ":" + lineNumber;
return "Unknown";
}
}
7 changes: 4 additions & 3 deletions src/main/java/com/datastax/cdm/feature/TrackRun.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.slf4j.LoggerFactory;

import com.datastax.cdm.cql.statement.TargetUpsertRunDetailsStatement;
import com.datastax.cdm.job.RunNotStartedException;
import com.datastax.cdm.job.SplitPartitions;
import com.datastax.oss.driver.api.core.CqlSession;

Expand All @@ -31,7 +32,7 @@ public enum RUN_TYPE {
}

public enum RUN_STATUS {
NOT_STARTED, STARTED, PASS, FAIL, DIFF
NOT_STARTED, STARTED, PASS, FAIL, DIFF, ENDED
}

public Logger logger = LoggerFactory.getLogger(this.getClass().getName());
Expand All @@ -41,7 +42,7 @@ public TrackRun(CqlSession session, String keyspaceTable) {
this.runStatement = new TargetUpsertRunDetailsStatement(session, keyspaceTable);
}

public Collection<SplitPartitions.Partition> getPendingPartitions(long prevRunId) {
public Collection<SplitPartitions.Partition> getPendingPartitions(long prevRunId) throws RunNotStartedException {
Collection<SplitPartitions.Partition> pendingParts = runStatement.getPendingPartitions(prevRunId);
logger.info("###################### {} partitions pending from previous run id {} ######################",
pendingParts.size(), prevRunId);
Expand All @@ -60,6 +61,6 @@ public void updateCdmRun(BigInteger min, RUN_STATUS status) {
}

public void endCdmRun(String runInfo) {
runStatement.updateCdmRunInfo(runInfo);
runStatement.endCdmRun(runInfo);
}
}
26 changes: 26 additions & 0 deletions src/main/java/com/datastax/cdm/job/RunNotStartedException.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright DataStax, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datastax.cdm.job;

public class RunNotStartedException extends Exception {

private static final long serialVersionUID = -4108800389847708120L;

public RunNotStartedException(String message) {
super(message);
}

}
6 changes: 5 additions & 1 deletion src/main/scala/com/datastax/cdm/job/BasePartitionJob.scala
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,11 @@ abstract class BasePartitionJob extends BaseJob[SplitPartitions.Partition] {
}

if (prevRunId != 0) {
trackRunFeature.getPendingPartitions(prevRunId)
try {
trackRunFeature.getPendingPartitions(prevRunId)
} catch {
case e: RunNotStartedException => SplitPartitions.getRandomSubPartitions(pieces, minPartition, maxPartition, coveragePercent)
}
} else {
SplitPartitions.getRandomSubPartitions(pieces, minPartition, maxPartition, coveragePercent)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright DataStax, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datastax.cdm.cql.statement;

import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.Mockito.*;

import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mock;

import com.datastax.cdm.cql.CommonMocks;
import com.datastax.cdm.job.RunNotStartedException;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.type.DataTypes;

public class TargetUpsertRunDetailsStatementTest extends CommonMocks {
@Mock
PreparedStatement preparedStatement;

@Mock
CqlSession cqlSession;

@Mock
ResultSet rs;

@Mock
Row row;

@Mock
BoundStatement bStatement;

TargetUpsertRunDetailsStatement targetUpsertRunDetailsStatement;

@BeforeEach
public void setup() {
// UPDATE is needed by counters, though the class should handle non-counter updates
commonSetup(false, false, true);
when(cqlSession.prepare(anyString())).thenReturn(preparedStatement);
when(preparedStatement.bind(any())).thenReturn(bStatement);
when(cqlSession.execute(bStatement)).thenReturn(rs);
when(rs.all()).thenReturn(List.of(row));

}

@Test
public void init() throws RunNotStartedException {
targetUpsertRunDetailsStatement = new TargetUpsertRunDetailsStatement(cqlSession, "ks.table1");
assertEquals(Collections.emptyList(), targetUpsertRunDetailsStatement.getPendingPartitions(0));
}

@Test
public void incorrectKsTable() throws RunNotStartedException {
assertThrows(RuntimeException.class, () -> new TargetUpsertRunDetailsStatement(cqlSession, "table1"));
}

}
17 changes: 16 additions & 1 deletion src/test/java/com/datastax/cdm/data/DataUtilityTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -137,11 +137,26 @@ public void extractObjectsFromCollectionTest() {
}

@Test
public void getMyClassMethodLineTest() {
public void getMyClassMethodLineTestCDMClass() {
Exception ex = new Exception();
ex.setStackTrace(new StackTraceElement[] { new StackTraceElement("com.datastax.cdm.data.DataUtilityTest",
"getMyClassMethodLineTest", "DataUtilityTest.java", 0) });
assertEquals("com.datastax.cdm.data.DataUtilityTest.getMyClassMethodLineTest:0",
DataUtility.getMyClassMethodLine(ex));
}

@Test
public void getMyClassMethodLineTestOtherClass() {
Exception ex = new Exception();
ex.setStackTrace(new StackTraceElement[] { new StackTraceElement("com.datastax.other.SomeClass",
"getMyClassMethodLineTest", "SomeClass.java", 0) });
assertEquals("com.datastax.other.SomeClass.getMyClassMethodLineTest:0", DataUtility.getMyClassMethodLine(ex));
}

@Test
public void getMyClassMethodLineTestUnknown() {
Exception ex = new Exception();
ex.setStackTrace(new StackTraceElement[] {});
assertEquals("Unknown", DataUtility.getMyClassMethodLine(ex));
}
}
2 changes: 1 addition & 1 deletion src/test/java/com/datastax/cdm/feature/TrackRunTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ void test() {
assertEquals("DIFF_DATA", TrackRun.RUN_TYPE.DIFF_DATA.name());

assertEquals(2, TrackRun.RUN_TYPE.values().length);
assertEquals(5, TrackRun.RUN_STATUS.values().length);
assertEquals(6, TrackRun.RUN_STATUS.values().length);
}

}

0 comments on commit 9187f1b

Please sign in to comment.