
Commit

Merge pull request #289 from datastax/feature/transform-fields
Feature/transform fields
pravinbhat authored Aug 26, 2024
2 parents 49d7bb4 + d7a543b commit 27af3fe
Showing 17 changed files with 792 additions and 312 deletions.
8 changes: 4 additions & 4 deletions Dockerfile
@@ -9,9 +9,9 @@ RUN mkdir -p /assets/ && cd /assets && \
curl -OL https://downloads.datastax.com/enterprise/cqlsh-astra.tar.gz && \
tar -xzf ./cqlsh-astra.tar.gz && \
rm ./cqlsh-astra.tar.gz && \
-curl -OL https://archive.apache.org/dist/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3-scala2.13.tgz && \
-tar -xzf ./spark-3.5.1-bin-hadoop3-scala2.13.tgz && \
-rm ./spark-3.5.1-bin-hadoop3-scala2.13.tgz
+curl -OL https://archive.apache.org/dist/spark/spark-3.5.2/spark-3.5.2-bin-hadoop3-scala2.13.tgz && \
+tar -xzf ./spark-3.5.2-bin-hadoop3-scala2.13.tgz && \
+rm ./spark-3.5.2-bin-hadoop3-scala2.13.tgz

RUN apt-get update && apt-get install -y openssh-server vim python3 --no-install-recommends && \
rm -rf /var/lib/apt/lists/* && \
@@ -44,7 +44,7 @@ RUN chmod +x ./get-latest-maven-version.sh && \
rm -rf "$USER_HOME_DIR/.m2"

# Add all migration tools to path
-ENV PATH="${PATH}:/assets/dsbulk/bin/:/assets/cqlsh-astra/bin/:/assets/spark-3.5.1-bin-hadoop3-scala2.13/bin/"
+ENV PATH="${PATH}:/assets/dsbulk/bin/:/assets/cqlsh-astra/bin/:/assets/spark-3.5.2-bin-hadoop3-scala2.13/bin/"

EXPOSE 22

9 changes: 5 additions & 4 deletions README.md
@@ -7,7 +7,7 @@

Migrate and Validate Tables between Origin and Target Cassandra Clusters.

-> :warning: Please note this job has been tested with spark version [3.5.1](https://archive.apache.org/dist/spark/spark-3.5.1/)
+> :warning: Please note this job has been tested with spark version [3.5.2](https://archive.apache.org/dist/spark/spark-3.5.2/)
## Install as a Container
- Get the latest image that includes all dependencies from [DockerHub](https://hub.docker.com/r/datastax/cassandra-data-migrator)
@@ -18,10 +18,10 @@ Migrate and Validate Tables between Origin and Target Cassandra Clusters.

### Prerequisite
- Install **Java11** (minimum) as Spark binaries are compiled with it.
-- Install Spark version [`3.5.1`](https://archive.apache.org/dist/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3-scala2.13.tgz) on a single VM (no cluster necessary) where you want to run this job. Spark can be installed by running the following: -
+- Install Spark version [`3.5.2`](https://archive.apache.org/dist/spark/spark-3.5.2/spark-3.5.2-bin-hadoop3-scala2.13.tgz) on a single VM (no cluster necessary) where you want to run this job. Spark can be installed by running the following: -
```
-wget https://archive.apache.org/dist/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3-scala2.13.tgz
-tar -xvzf spark-3.5.1-bin-hadoop3-scala2.13.tgz
+wget https://archive.apache.org/dist/spark/spark-3.5.2/spark-3.5.2-bin-hadoop3-scala2.13.tgz
+tar -xvzf spark-3.5.2-bin-hadoop3-scala2.13.tgz
```

> :warning: If the above Spark and Scala version is not properly installed, you'll then see a similar exception like below when running the CDM jobs,
@@ -123,6 +123,7 @@ Note:
- Perform guardrail checks (identify large fields)
- Supports adding `constants` as new columns on `Target`
- Supports expanding `Map` columns on `Origin` into multiple records on `Target`
+- Supports extracting a value from a JSON column in `Origin` and mapping it to a specific field on `Target`
- Fully containerized (Docker and K8s friendly)
- SSL Support (including custom cipher algorithms)
- Migrate from any Cassandra `Origin` ([Apache Cassandra®](https://cassandra.apache.org) / [DataStax Enterprise™](https://www.datastax.com/products/datastax-enterprise) / [DataStax Astra DB™](https://www.datastax.com/products/datastax-astra)) to any Cassandra `Target` ([Apache Cassandra®](https://cassandra.apache.org) / [DataStax Enterprise™](https://www.datastax.com/products/datastax-enterprise) / [DataStax Astra DB™](https://www.datastax.com/products/datastax-astra))
4 changes: 4 additions & 0 deletions RELEASE.md
@@ -1,4 +1,8 @@
# Release Notes
+## [4.3.5] - 2024-08-23
+- Added feature `spark.cdm.feature.extractJson`, which lets you extract a value from a JSON-content column in an `Origin` table and map it to a column in the `Target` table.
+- Upgraded to use Spark `3.5.2`.

## [4.3.4] - 2024-07-31
- Use `spark.cdm.schema.origin.keyspaceTable` when `spark.cdm.schema.target.keyspaceTable` is missing. Fixes [bug introduced in prior version](https://github.com/datastax/cassandra-data-migrator/issues/284).

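The release note above names only the feature's property prefix. As a hedged illustration of how it might be configured, here is a sketch; the exact property names below are assumptions inferred from `KnownProperties.EXTRACT_JSON_ORIGIN_COLUMN_NAME`, `EXTRACT_JSON_TARGET_COLUMN_MAPPING`, and the `field:column` split in `ExtractJson.loadProperties` elsewhere in this commit, so verify them against the released documentation:

```properties
# Hypothetical property names -- confirm against KnownProperties in the 4.3.5 release.
# Origin column that holds the JSON text:
spark.cdm.feature.extractJson.originColumn       origin_json_payload
# Mapping: either "jsonField" (target column takes the same name)
# or "jsonField:targetColumn", per the split(":") logic in ExtractJson:
spark.cdm.feature.extractJson.propertyMapping    age:customer_age
```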
19 changes: 16 additions & 3 deletions pom.xml
@@ -1,4 +1,6 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>datastax.cdm</groupId>
@@ -10,9 +12,9 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<scala.version>2.13.14</scala.version>
<scala.main.version>2.13</scala.main.version>
-<spark.version>3.5.1</spark.version>
+<spark.version>3.5.2</spark.version>
<connector.version>3.5.1</connector.version>
-<cassandra.version>5.0-beta1</cassandra.version>
+<cassandra.version>5.0-rc1</cassandra.version>
<junit.version>5.9.1</junit.version>
<mockito.version>4.11.0</mockito.version>
<java-driver.version>4.18.1</java-driver.version>
@@ -102,6 +104,11 @@
<artifactId>java-driver-query-builder</artifactId>
<version>${java-driver.version}</version>
</dependency>
+<dependency>
+<groupId>com.fasterxml.jackson.core</groupId>
+<artifactId>jackson-databind</artifactId>
+<version>2.15.2</version>
+</dependency>

<dependency>
<groupId>org.apache.logging.log4j</groupId>
@@ -123,6 +130,12 @@
<groupId>com.esri.geometry</groupId>
<artifactId>esri-geometry-api</artifactId>
<version>2.2.4</version>
+<exclusions>
+<exclusion>
+<groupId>com.fasterxml.jackson.core</groupId>
+<artifactId>jackson-core</artifactId>
+</exclusion>
+</exclusions>
</dependency>

<!-- Test Dependencies -->
src/main/java/com/datastax/cdm/cql/statement/TargetInsertStatement.java
@@ -15,18 +15,19 @@
*/
package com.datastax.cdm.cql.statement;

+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;

+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

import com.datastax.cdm.cql.EnhancedSession;
import com.datastax.cdm.properties.IPropertyHelper;
import com.datastax.cdm.properties.KnownProperties;
import com.datastax.cdm.properties.PropertyHelper;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.Row;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;

-import java.util.ArrayList;
-import java.util.List;
-import java.time.Duration;

public class TargetInsertStatement extends TargetUpsertStatement {
public final Logger logger = LoggerFactory.getLogger(this.getClass().getName());
@@ -61,22 +62,27 @@ protected BoundStatement bind(Row originRow, Row targetRow, Integer ttl, Long wr
try {
if (targetIndex== explodeMapKeyIndex) {
bindValue = explodeMapKey;
-}
-else if (targetIndex== explodeMapValueIndex) {
+} else if (targetIndex== explodeMapValueIndex) {
bindValue = explodeMapValue;
-}
-else {
+} else if (targetIndex == extractJsonFeature.getTargetColumnIndex()) {
+int originIndex = extractJsonFeature.getOriginColumnIndex();
+bindValue = extractJsonFeature.extract(originRow.getString(originIndex));
+} else {
int originIndex = cqlTable.getCorrespondingIndex(targetIndex);
if (originIndex < 0) // we don't have data to bind for this column; continue to the next targetIndex
continue;
bindValue = cqlTable.getOtherCqlTable().getAndConvertData(originIndex, originRow);
}

boundStatement = boundStatement.set(currentBindIndex++, bindValue, cqlTable.getBindClass(targetIndex));
}
-catch (Exception e) {
-logger.error("Error trying to bind value:" + bindValue + " of class:" +(null==bindValue?"unknown":bindValue.getClass().getName())+ " to column:" + targetColumnNames.get(targetIndex) + " of targetDataType:" + targetColumnTypes.get(targetIndex)+ "/" + cqlTable.getBindClass(targetIndex).getName() + " at column index:" + targetIndex + " and bind index: "+ (currentBindIndex-1) + " of statement:" + this.getCQL());
-throw e;
+} catch (Exception e) {
+logger.error(
+"Error trying to bind value: {} of class: {} to column: {} of targetDataType: {}/{} at column index: {} and bind index: {} of statement: {}",
+bindValue, (null == bindValue ? "unknown" : bindValue.getClass().getName()),
+targetColumnNames.get(targetIndex), targetColumnTypes.get(targetIndex),
+cqlTable.getBindClass(targetIndex).getName(), targetIndex, (currentBindIndex - 1),
+this.getCQL());
+throw new RuntimeException("Error trying to bind value: ", e);
}
}

src/main/java/com/datastax/cdm/cql/statement/TargetUpdateStatement.java
@@ -74,9 +74,11 @@ protected BoundStatement bind(Row originRow, Row targetRow, Integer ttl, Long wr
}
else if (targetIndex== explodeMapKeyIndex) {
bindValueTarget = explodeMapKey;
-}
-else if (targetIndex== explodeMapValueIndex) {
+} else if (targetIndex== explodeMapValueIndex) {
bindValueTarget = explodeMapValue;
+} else if (targetIndex == extractJsonFeature.getTargetColumnIndex()) {
+originIndex = extractJsonFeature.getOriginColumnIndex();
+bindValueTarget = extractJsonFeature.extract(originRow.getString(originIndex));
} else {
if (originIndex < 0)
// we don't have data to bind for this column; continue to the next targetIndex
Expand All @@ -89,7 +91,7 @@ else if (targetIndex== explodeMapValueIndex) {
logger.error("Error trying to bind value:" + bindValueTarget + " to column:" +
targetColumnNames.get(targetIndex) + " of targetDataType:" + targetColumnTypes.get(targetIndex) + "/"
+ cqlTable.getBindClass(targetIndex).getName() + " at column index:" + targetIndex);
-throw e;
+throw new RuntimeException("Error trying to bind value: ", e);
}
}

src/main/java/com/datastax/cdm/cql/statement/TargetUpsertStatement.java
@@ -20,6 +20,7 @@
import com.datastax.cdm.data.Record;
import com.datastax.cdm.feature.ConstantColumns;
import com.datastax.cdm.feature.ExplodeMap;
+import com.datastax.cdm.feature.ExtractJson;
import com.datastax.cdm.feature.Featureset;
import com.datastax.cdm.feature.WritetimeTTL;
import com.datastax.cdm.properties.IPropertyHelper;
@@ -53,6 +54,8 @@ public abstract class TargetUpsertStatement extends BaseCdmStatement {
protected int explodeMapValueIndex = -1;
private Boolean haveCheckedBindInputsOnce = false;

+protected ExtractJson extractJsonFeature;

protected abstract String buildStatement();
protected abstract BoundStatement bind(Row originRow, Row targetRow, Integer ttl, Long writeTime, Object explodeMapKey, Object explodeMapValue);

@@ -61,6 +64,7 @@ public TargetUpsertStatement(IPropertyHelper propertyHelper, EnhancedSession ses

constantColumnFeature = (ConstantColumns) cqlTable.getFeature(Featureset.CONSTANT_COLUMNS);
explodeMapFeature = (ExplodeMap) cqlTable.getFeature(Featureset.EXPLODE_MAP);
+extractJsonFeature = (ExtractJson) cqlTable.getFeature(Featureset.EXTRACT_JSON);

setTTLAndWriteTimeBooleans();
targetColumnNames.addAll(cqlTable.getColumnNames(true));
153 changes: 153 additions & 0 deletions src/main/java/com/datastax/cdm/feature/ExtractJson.java
@@ -0,0 +1,153 @@
/*
* Copyright DataStax, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datastax.cdm.feature;

import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.datastax.cdm.properties.IPropertyHelper;
import com.datastax.cdm.properties.KnownProperties;
import com.datastax.cdm.schema.CqlTable;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;

public class ExtractJson extends AbstractFeature {
public Logger logger = LoggerFactory.getLogger(this.getClass().getName());
private ObjectMapper mapper = new ObjectMapper();

private String originColumnName = "";
private String originJsonFieldName = "";
private Integer originColumnIndex = -1;

private String targetColumnName = "";
private Integer targetColumnIndex = -1;

@Override
public boolean loadProperties(IPropertyHelper helper) {
if (null == helper) {
throw new IllegalArgumentException("helper is null");
}

originColumnName = getColumnName(helper, KnownProperties.EXTRACT_JSON_ORIGIN_COLUMN_NAME);
targetColumnName = getColumnName(helper, KnownProperties.EXTRACT_JSON_TARGET_COLUMN_MAPPING);
// Convert columnToFieldMapping to targetColumnName and originJsonFieldName
if (!targetColumnName.isBlank()) {
String[] parts = targetColumnName.split("\\:");
if (parts.length == 2) {
originJsonFieldName = parts[0];
targetColumnName = parts[1];
} else {
originJsonFieldName = targetColumnName;
}
}

isValid = validateProperties();
isEnabled = isValid && !originColumnName.isEmpty() && !targetColumnName.isEmpty();
isLoaded = true;

return isLoaded && isValid;
}

@Override
protected boolean validateProperties() {
if (StringUtils.isBlank(originColumnName) && StringUtils.isBlank(targetColumnName))
return true;

if (StringUtils.isBlank(originColumnName)) {
logger.error("Origin column name is not set when Target ({}) is set", targetColumnName);
return false;
}

if (StringUtils.isBlank(targetColumnName)) {
logger.error("Target column name is not set when Origin ({}) is set", originColumnName);
return false;
}

return true;
}

@Override
public boolean initializeAndValidate(CqlTable originTable, CqlTable targetTable) {
if (null == originTable || null == targetTable) {
throw new IllegalArgumentException("Origin table and/or Target table is null");
}
if (!originTable.isOrigin()) {
throw new IllegalArgumentException(originTable.getKeyspaceTable() + " is not an origin table");
}
if (targetTable.isOrigin()) {
throw new IllegalArgumentException(targetTable.getKeyspaceTable() + " is not a target table");
}

if (!validateProperties()) {
isEnabled = false;
return false;
}
if (!isEnabled)
return true;

// Initialize Origin variables
List<Class> originBindClasses = originTable.extendColumns(Collections.singletonList(originColumnName));
if (null == originBindClasses || originBindClasses.size() != 1 || null == originBindClasses.get(0)) {
throw new IllegalArgumentException("Origin column " + originColumnName
+ " is not found on the origin table " + originTable.getKeyspaceTable());
} else {
this.originColumnIndex = originTable.indexOf(originColumnName);
}

// Initialize Target variables
List<Class> targetBindClasses = targetTable.extendColumns(Collections.singletonList(targetColumnName));
if (null == targetBindClasses || targetBindClasses.size() != 1 || null == targetBindClasses.get(0)) {
throw new IllegalArgumentException("Target column " + targetColumnName
+ " is not found on the target table " + targetTable.getKeyspaceTable());
} else {
this.targetColumnIndex = targetTable.indexOf(targetColumnName);
}

logger.info("Feature {} is {}", this.getClass().getSimpleName(), isEnabled ? "enabled" : "disabled");
return true;
}

public Object extract(String jsonString) throws JsonMappingException, JsonProcessingException {
if (StringUtils.isNotBlank(jsonString)) {
return mapper.readValue(jsonString, Map.class).get(originJsonFieldName);
}

return null;
}

public Integer getOriginColumnIndex() {
return isEnabled ? originColumnIndex : -1;
}

public Integer getTargetColumnIndex() {
return isEnabled ? targetColumnIndex : -1;
}

public String getTargetColumnName() {
return isEnabled ? targetColumnName : "";
}

private String getColumnName(IPropertyHelper helper, String colName) {
String columnName = CqlTable.unFormatName(helper.getString(colName));
return (null == columnName) ? "" : columnName;
}
}
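The heart of the new feature is `extract` above: Jackson parses the JSON text into a `Map` and one top-level field is returned. The sketch below mirrors that lookup with a regex stand-in so it runs without the `jackson-databind` dependency; it handles only flat JSON objects and is illustrative, not the feature's actual parser.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ExtractJsonDemo {

    // Simplified stand-in for ExtractJson.extract(): return the raw value of a
    // top-level field in a flat JSON object, or null when there is no input.
    // (The real feature uses ObjectMapper.readValue(json, Map.class) instead.)
    static String extract(String json, String field) {
        if (json == null || json.isBlank()) {
            return null;
        }
        Pattern p = Pattern.compile(
                "\"" + Pattern.quote(field) + "\"\\s*:\\s*(\"[^\"]*\"|[^,}\\s]+)");
        Matcher m = p.matcher(json);
        // Strip surrounding quotes from string values; numbers pass through as-is.
        return m.find() ? m.group(1).replaceAll("^\"|\"$", "") : null;
    }

    public static void main(String[] args) {
        System.out.println(extract("{\"name\": \"cdm\", \"age\": 41}", "age")); // prints: 41
    }
}
```

In the feature itself, the extracted value is then bound to the mapped `Target` column inside `TargetInsertStatement.bind` and the update path, as the diff above shows.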
1 change: 1 addition & 0 deletions src/main/java/com/datastax/cdm/feature/FeatureFactory.java
@@ -21,6 +21,7 @@ public static Feature getFeature(Featureset feature) {
case ORIGIN_FILTER: return new OriginFilterCondition();
case CONSTANT_COLUMNS: return new ConstantColumns();
case EXPLODE_MAP: return new ExplodeMap();
+case EXTRACT_JSON: return new ExtractJson();
case WRITETIME_TTL: return new WritetimeTTL();
case GUARDRAIL_CHECK: return new Guardrail();
default:
1 change: 1 addition & 0 deletions src/main/java/com/datastax/cdm/feature/Featureset.java
@@ -19,6 +19,7 @@ public enum Featureset {
ORIGIN_FILTER,
CONSTANT_COLUMNS,
EXPLODE_MAP,
+EXTRACT_JSON,
WRITETIME_TTL,
GUARDRAIL_CHECK,
TEST_UNIMPLEMENTED_FEATURE
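The two one-line additions above are all the wiring the new feature needs: one enum constant and one factory case. The pattern in miniature (all names below are illustrative stand-ins, not taken from the repo):

```java
// Illustrative miniature of the Featureset enum + FeatureFactory switch above.
interface DemoFeature {
    String featureName();
}

enum DemoFeatureset { EXPLODE_MAP, EXTRACT_JSON }

class DemoFeatureFactory {
    static DemoFeature getFeature(DemoFeatureset f) {
        switch (f) {
            case EXPLODE_MAP:  return () -> "ExplodeMap";
            case EXTRACT_JSON: return () -> "ExtractJson"; // analog of the case this PR adds
            default: throw new IllegalArgumentException("Unknown feature: " + f);
        }
    }
}
```

In the real code, `CqlTable.getFeature(Featureset.EXTRACT_JSON)` hands back the instance the factory built, which is how `TargetUpsertStatement` obtains its `extractJsonFeature`.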
