Skip to content

Commit

Permalink
first stab at snapshotting
Browse files Browse the repository at this point in the history
  • Loading branch information
msillence committed Jun 18, 2024
1 parent 014c11c commit e4be885
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,24 @@
*/
package io.debezium.connector.db2as400;

import java.util.Optional;

import io.debezium.jdbc.MainConnectionProvidingConnectionFactory;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.notification.NotificationService;
import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotChangeEventSource;
import io.debezium.pipeline.source.snapshot.incremental.SignalBasedIncrementalSnapshotChangeEventSource;
import io.debezium.pipeline.source.spi.ChangeEventSourceFactory;
import io.debezium.pipeline.source.spi.DataChangeEventListener;
import io.debezium.pipeline.source.spi.SnapshotChangeEventSource;
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
import io.debezium.relational.TableId;
import io.debezium.snapshot.SnapshotterService;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.util.Clock;
import io.debezium.util.Strings;

public class As400ChangeEventSourceFactory implements ChangeEventSourceFactory<As400Partition, As400OffsetContext> {

Expand Down Expand Up @@ -59,4 +66,27 @@ public StreamingChangeEventSource<As400Partition, As400OffsetContext> getStreami
return new As400StreamingChangeEventSource(configuration, rpcConnection, jdbcConnectionFactory.mainConnection(),
dispatcher, errorHandler, clock, schema);
}

@Override
public Optional<IncrementalSnapshotChangeEventSource<As400Partition, ? extends DataCollectionId>> getIncrementalSnapshotChangeEventSource(As400OffsetContext offsetContext,
SnapshotProgressListener<As400Partition> snapshotProgressListener,
DataChangeEventListener<As400Partition> dataChangeEventListener,
NotificationService<As400Partition, As400OffsetContext> notificationService) {
// If no data collection id is provided, don't return an instance as the implementation requires
// that a signal data collection id be provided to work.
if (Strings.isNullOrEmpty(configuration.getSignalingDataCollectionId())) {
return Optional.empty();
}
final SignalBasedIncrementalSnapshotChangeEventSource<As400Partition, TableId> incrementalSnapshotChangeEventSource = new SignalBasedIncrementalSnapshotChangeEventSource<>(
configuration,
jdbcConnectionFactory.mainConnection(),
dispatcher,
schema,
clock,
snapshotProgressListener,
dataChangeEventListener,
notificationService);
return Optional.of(incrementalSnapshotChangeEventSource);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@
import io.debezium.connector.SnapshotRecord;
import io.debezium.ibmi.db2.journal.retrieve.JournalProcessedPosition;
import io.debezium.ibmi.db2.journal.retrieve.JournalReceiver;
import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotContext;
import io.debezium.pipeline.source.snapshot.incremental.SignalBasedIncrementalSnapshotContext;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.txmetadata.TransactionContext;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.TableId;
import io.debezium.spi.schema.DataCollectionId;

public class As400OffsetContext implements OffsetContext {
Expand Down Expand Up @@ -50,6 +53,7 @@ public class As400OffsetContext implements OffsetContext {
private final String inclueTables;
private boolean hasNewTables = false;
private volatile boolean snapshotComplete = false;
private final IncrementalSnapshotContext<TableId> incrementalSnapshotContext = new SignalBasedIncrementalSnapshotContext<>();

public As400OffsetContext(As400ConnectorConfig connectorConfig) {
super();
Expand All @@ -70,7 +74,7 @@ public As400OffsetContext(As400ConnectorConfig connectorConfig, JournalProcessed
}

public As400OffsetContext(As400ConnectorConfig connectorConfig, JournalProcessedPosition position, String includeTables,
boolean snapshotComplete) {
boolean snapshotComplete, IncrementalSnapshotContext<TableId> incrementalSnapshotContext) {
super();
partition = Collections.singletonMap(SERVER_PARTITION_KEY, connectorConfig.getLogicalName());
this.position = position;
Expand Down Expand Up @@ -209,7 +213,7 @@ public As400OffsetContext load(Map<String, ?> map) {
Instant time = (TimeStr == null) ? Instant.ofEpochSecond(0) : Instant.ofEpochSecond(Long.parseLong(TimeStr));
position = new JournalProcessedPosition(offset, new JournalReceiver(receiver, receiverLibrary), time, processed);
}
return new As400OffsetContext(connectorConfig, position, inclueTables, snapshotComplete);
return new As400OffsetContext(connectorConfig, position, inclueTables, snapshotComplete, SignalBasedIncrementalSnapshotContext.load(map));
}
}

Expand All @@ -230,4 +234,9 @@ public String toString() {
public void markSnapshotRecord(SnapshotRecord record) {
sourceInfo.setSnapshot(record);
}

@Override
public IncrementalSnapshotContext<?> getIncrementalSnapshotContext() {
return incrementalSnapshotContext;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import io.debezium.DebeziumException;
import io.debezium.connector.db2as400.As400RpcConnection.BlockingReceiverConsumer;
import io.debezium.connector.db2as400.As400RpcConnection.RpcException;
import io.debezium.data.Envelope.Operation;
import io.debezium.ibmi.db2.journal.retrieve.JournalEntryType;
import io.debezium.ibmi.db2.journal.retrieve.JournalProcessedPosition;
Expand Down Expand Up @@ -336,4 +337,16 @@ private BlockingReceiverConsumer processJournalEntries(As400Partition partition,
private boolean ignore(JournalEntryType journalCode) {
return journalCode == JournalEntryType.OPEN || journalCode == JournalEntryType.CLOSE;
}

@Override
public As400OffsetContext getOffsetContext() {
// TODO should this be the last process position?
try {
return new As400OffsetContext(connectorConfig, new JournalProcessedPosition(dataConnection.getCurrentPosition(), null, true));
}
catch (RpcException e) {
log.error("failed to retrieve journal position", e);
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public boolean retrieveJournal(JournalProcessedPosition previousPosition, final
this.header = new FirstHeader(0, 0, 0, OffsetStatus.NOT_CALLED,
new JournalProcessedPosition(range.end(), Instant.EPOCH, true));

log.debug("start equals end - range {}", range);
log.trace("start equals end - range {}", range);
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ public AS400JDBCConnectionForcedCcsid() {
@Override
public ConvTable getConverter(int ccsid) throws SQLException {
if (this.fromCcsid != null && fromCcsid.intValue() == ccsid && toCcsid != null) {
log.fine(() -> String.format("requested ccsid %d using replacement ccsid %d\n\t%s", ccsid, toCcsid,
getStack()));
return super.getConverter(this.toCcsid);
}
log.fine(() -> String.format("requested ccsid %d using parent converter\n\t%s", ccsid, getStack()));
Expand Down

0 comments on commit e4be885

Please sign in to comment.