Skip to content

Commit

Permalink
Rename timestamp field
Browse files Browse the repository at this point in the history
  • Loading branch information
torbsto committed Jun 21, 2024
1 parent d534c63 commit 8b0d7fc
Show file tree
Hide file tree
Showing 8 changed files with 13 additions and 15 deletions.
2 changes: 1 addition & 1 deletion error-handling-avro/src/main/avro/DeadLetter.avsc
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
"type": "string"
},
{
"name": "timestamp",
"name": "input_timestamp",
"type": [
"null",
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@

package com.bakdata.kafka;

import java.time.Instant;
import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier;

Expand All @@ -46,7 +45,7 @@ public DeadLetter convert(final DeadLetterDescription deadLetterDescription) {
.setTopic(deadLetterDescription.getTopic())
.setPartition(deadLetterDescription.getPartition())
.setOffset(deadLetterDescription.getOffset())
.setTimestamp(deadLetterDescription.getTimestamp())
.setInputTimestamp(deadLetterDescription.getInputTimestamp())
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,14 +126,14 @@ void shouldConvertAndSerializeAvroDeadLetter() {
deadLetter -> {
this.softly.assertThat(deadLetter.getInputValue()).hasValue("foo");
this.softly.assertThat(deadLetter.getOffset()).hasValue(0L);
this.softly.assertThat(deadLetter.getTimestamp()).hasValue(Instant.ofEpochMilli(100));
this.softly.assertThat(deadLetter.getInputTimestamp()).hasValue(Instant.ofEpochMilli(100));
}
);
this.softly.assertThat(errors).extracting(ProducerRecord::value).element(1).satisfies(
deadLetter -> {
this.softly.assertThat(deadLetter.getInputValue()).hasValue("bar");
this.softly.assertThat(deadLetter.getOffset()).hasValue(1L);
this.softly.assertThat(deadLetter.getTimestamp()).hasValue(Instant.ofEpochMilli(200));
this.softly.assertThat(deadLetter.getInputTimestamp()).hasValue(Instant.ofEpochMilli(200));
}
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,5 +56,5 @@ public static class Cause {
String topic;
Integer partition;
Long offset;
Instant timestamp;
Instant inputTimestamp;
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public void process(final FixedKeyRecord<K, ProcessingError<V>> inputRecord) {
.topic(metadata.map(RecordMetadata::topic).orElse(null))
.partition(metadata.map(RecordMetadata::partition).orElse(null))
.offset(metadata.map(RecordMetadata::offset).orElse(null))
.timestamp(Instant.ofEpochMilli(inputRecord.timestamp()))
.inputTimestamp(Instant.ofEpochMilli(inputRecord.timestamp()))
.build();

final FixedKeyRecord<K, T> record = inputRecord
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import com.google.protobuf.Int64Value;
import com.google.protobuf.StringValue;
import com.google.protobuf.Timestamp;
import java.time.Instant;
import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier;

Expand Down Expand Up @@ -68,12 +67,12 @@ public ProtoDeadLetter convert(final DeadLetterDescription deadLetterDescription
builder.setOffset(Int64Value.of(deadLetterDescription.getOffset()));
}

if (deadLetterDescription.getTimestamp() != null) {
if (deadLetterDescription.getInputTimestamp() != null) {
final Timestamp timestamp = Timestamp.newBuilder()
.setSeconds(deadLetterDescription.getTimestamp().getEpochSecond())
.setNanos(deadLetterDescription.getTimestamp().getNano())
.setSeconds(deadLetterDescription.getInputTimestamp().getEpochSecond())
.setNanos(deadLetterDescription.getInputTimestamp().getNano())
.build();
builder.setTimestamp(timestamp);
builder.setInputTimestamp(timestamp);
}

return builder.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,5 @@ message ProtoDeadLetter {
google.protobuf.StringValue topic = 4;
google.protobuf.Int32Value partition = 5;
google.protobuf.Int64Value offset = 6;
google.protobuf.Timestamp timestamp = 7;
google.protobuf.Timestamp input_timestamp = 7;
}
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ void shouldConvertAndSerializeProtoDeadLetter() {
this.softly.assertThat(deadLetter.getInputValue()).extracting(StringValue::getValue)
.isEqualTo("foo");
this.softly.assertThat(deadLetter.getOffset()).extracting(Int64Value::getValue).isEqualTo(0L);
this.softly.assertThat(timestampToInstant(deadLetter.getTimestamp()))
this.softly.assertThat(timestampToInstant(deadLetter.getInputTimestamp()))
.isEqualTo(Instant.ofEpochMilli(100));
}
);
Expand All @@ -169,7 +169,7 @@ void shouldConvertAndSerializeProtoDeadLetter() {
this.softly.assertThat(deadLetter.getInputValue()).extracting(StringValue::getValue)
.isEqualTo("bar");
this.softly.assertThat(deadLetter.getOffset()).extracting(Int64Value::getValue).isEqualTo(1L);
this.softly.assertThat(timestampToInstant(deadLetter.getTimestamp()))
this.softly.assertThat(timestampToInstant(deadLetter.getInputTimestamp()))
.isEqualTo(Instant.ofEpochMilli(200));
}
);
Expand Down

0 comments on commit 8b0d7fc

Please sign in to comment.