diff --git a/error-handling-avro/src/main/avro/DeadLetter.avsc b/error-handling-avro/src/main/avro/DeadLetter.avsc index 790d313..7304756 100644 --- a/error-handling-avro/src/main/avro/DeadLetter.avsc +++ b/error-handling-avro/src/main/avro/DeadLetter.avsc @@ -39,7 +39,7 @@ "type": "string" }, { - "name": "timestamp", + "name": "input_timestamp", "type": [ "null", { diff --git a/error-handling-avro/src/main/java/com/bakdata/kafka/AvroDeadLetterConverter.java b/error-handling-avro/src/main/java/com/bakdata/kafka/AvroDeadLetterConverter.java index 24b4012..f916b65 100644 --- a/error-handling-avro/src/main/java/com/bakdata/kafka/AvroDeadLetterConverter.java +++ b/error-handling-avro/src/main/java/com/bakdata/kafka/AvroDeadLetterConverter.java @@ -24,7 +24,6 @@ package com.bakdata.kafka; -import java.time.Instant; import org.apache.kafka.streams.kstream.ValueTransformerSupplier; import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier; @@ -46,7 +45,7 @@ public DeadLetter convert(final DeadLetterDescription deadLetterDescription) { .setTopic(deadLetterDescription.getTopic()) .setPartition(deadLetterDescription.getPartition()) .setOffset(deadLetterDescription.getOffset()) - .setTimestamp(deadLetterDescription.getTimestamp()) + .setInputTimestamp(deadLetterDescription.getInputTimestamp()) .build(); } diff --git a/error-handling-avro/src/test/java/com/bakdata/kafka/AvroDeadLetterProcessorTest.java b/error-handling-avro/src/test/java/com/bakdata/kafka/AvroDeadLetterProcessorTest.java index e554de9..56b8d45 100644 --- a/error-handling-avro/src/test/java/com/bakdata/kafka/AvroDeadLetterProcessorTest.java +++ b/error-handling-avro/src/test/java/com/bakdata/kafka/AvroDeadLetterProcessorTest.java @@ -126,14 +126,14 @@ void shouldConvertAndSerializeAvroDeadLetter() { deadLetter -> { this.softly.assertThat(deadLetter.getInputValue()).hasValue("foo"); this.softly.assertThat(deadLetter.getOffset()).hasValue(0L); - this.softly.assertThat(deadLetter.getTimestamp()).hasValue(Instant.ofEpochMilli(100)); + this.softly.assertThat(deadLetter.getInputTimestamp()).hasValue(Instant.ofEpochMilli(100)); } ); this.softly.assertThat(errors).extracting(ProducerRecord::value).element(1).satisfies( deadLetter -> { this.softly.assertThat(deadLetter.getInputValue()).hasValue("bar"); this.softly.assertThat(deadLetter.getOffset()).hasValue(1L); - this.softly.assertThat(deadLetter.getTimestamp()).hasValue(Instant.ofEpochMilli(200)); + this.softly.assertThat(deadLetter.getInputTimestamp()).hasValue(Instant.ofEpochMilli(200)); } ); diff --git a/error-handling-core/src/main/java/com/bakdata/kafka/DeadLetterDescription.java b/error-handling-core/src/main/java/com/bakdata/kafka/DeadLetterDescription.java index 68711e3..a32b293 100644 --- a/error-handling-core/src/main/java/com/bakdata/kafka/DeadLetterDescription.java +++ b/error-handling-core/src/main/java/com/bakdata/kafka/DeadLetterDescription.java @@ -56,5 +56,5 @@ public static class Cause { String topic; Integer partition; Long offset; - Instant timestamp; + Instant inputTimestamp; } diff --git a/error-handling-core/src/main/java/com/bakdata/kafka/DeadLetterProcessor.java b/error-handling-core/src/main/java/com/bakdata/kafka/DeadLetterProcessor.java index 88a151f..dfaf72a 100644 --- a/error-handling-core/src/main/java/com/bakdata/kafka/DeadLetterProcessor.java +++ b/error-handling-core/src/main/java/com/bakdata/kafka/DeadLetterProcessor.java @@ -100,7 +100,7 @@ public void process(final FixedKeyRecord> inputRecord) { .topic(metadata.map(RecordMetadata::topic).orElse(null)) .partition(metadata.map(RecordMetadata::partition).orElse(null)) .offset(metadata.map(RecordMetadata::offset).orElse(null)) - .timestamp(Instant.ofEpochMilli(inputRecord.timestamp())) + .inputTimestamp(Instant.ofEpochMilli(inputRecord.timestamp())) .build(); final FixedKeyRecord record = inputRecord diff --git a/error-handling-proto/src/main/java/com/bakdata/kafka/ProtoDeadLetterConverter.java b/error-handling-proto/src/main/java/com/bakdata/kafka/ProtoDeadLetterConverter.java index 0c89822..225a6b2 100644 --- a/error-handling-proto/src/main/java/com/bakdata/kafka/ProtoDeadLetterConverter.java +++ b/error-handling-proto/src/main/java/com/bakdata/kafka/ProtoDeadLetterConverter.java @@ -29,7 +29,6 @@ import com.google.protobuf.Int64Value; import com.google.protobuf.StringValue; import com.google.protobuf.Timestamp; -import java.time.Instant; import org.apache.kafka.streams.kstream.ValueTransformerSupplier; import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier; @@ -68,12 +67,12 @@ public ProtoDeadLetter convert(final DeadLetterDescription deadLetterDescription builder.setOffset(Int64Value.of(deadLetterDescription.getOffset())); } - if (deadLetterDescription.getTimestamp() != null) { + if (deadLetterDescription.getInputTimestamp() != null) { final Timestamp timestamp = Timestamp.newBuilder() - .setSeconds(deadLetterDescription.getTimestamp().getEpochSecond()) - .setNanos(deadLetterDescription.getTimestamp().getNano()) + .setSeconds(deadLetterDescription.getInputTimestamp().getEpochSecond()) + .setNanos(deadLetterDescription.getInputTimestamp().getNano()) .build(); - builder.setTimestamp(timestamp); + builder.setInputTimestamp(timestamp); } return builder.build(); diff --git a/error-handling-proto/src/main/proto/bakdata/kafka/proto/v1/deadletter.proto b/error-handling-proto/src/main/proto/bakdata/kafka/proto/v1/deadletter.proto index 29698b1..857a0e2 100644 --- a/error-handling-proto/src/main/proto/bakdata/kafka/proto/v1/deadletter.proto +++ b/error-handling-proto/src/main/proto/bakdata/kafka/proto/v1/deadletter.proto @@ -20,5 +20,5 @@ message ProtoDeadLetter { google.protobuf.StringValue topic = 4; google.protobuf.Int32Value partition = 5; google.protobuf.Int64Value offset = 6; - google.protobuf.Timestamp timestamp = 7; + google.protobuf.Timestamp input_timestamp = 7; } diff --git a/error-handling-proto/src/test/java/com/bakdata/kafka/ProtoDeadLetterProcessorTest.java b/error-handling-proto/src/test/java/com/bakdata/kafka/ProtoDeadLetterProcessorTest.java index ae16259..d6a0293 100644 --- a/error-handling-proto/src/test/java/com/bakdata/kafka/ProtoDeadLetterProcessorTest.java +++ b/error-handling-proto/src/test/java/com/bakdata/kafka/ProtoDeadLetterProcessorTest.java @@ -160,7 +160,7 @@ void shouldConvertAndSerializeProtoDeadLetter() { this.softly.assertThat(deadLetter.getInputValue()).extracting(StringValue::getValue) .isEqualTo("foo"); this.softly.assertThat(deadLetter.getOffset()).extracting(Int64Value::getValue).isEqualTo(0L); - this.softly.assertThat(timestampToInstant(deadLetter.getTimestamp())) + this.softly.assertThat(timestampToInstant(deadLetter.getInputTimestamp())) .isEqualTo(Instant.ofEpochMilli(100)); } ); @@ -169,7 +169,7 @@ void shouldConvertAndSerializeProtoDeadLetter() { this.softly.assertThat(deadLetter.getInputValue()).extracting(StringValue::getValue) .isEqualTo("bar"); this.softly.assertThat(deadLetter.getOffset()).extracting(Int64Value::getValue).isEqualTo(1L); - this.softly.assertThat(timestampToInstant(deadLetter.getTimestamp())) + this.softly.assertThat(timestampToInstant(deadLetter.getInputTimestamp())) .isEqualTo(Instant.ofEpochMilli(200)); } );