Skip to content

Commit

Permalink
Use wall-clock time for daed letter record
Browse files Browse the repository at this point in the history
The written dead-letter now has the wall-clock time as its record's
timestamp. Because the original timestamp would be lost otherwise, there
is a new field in the respective dead-letter models called timestamp. To
maintain backwards compatibility, it is nullable, but always set
starting from this version.
  • Loading branch information
torbsto committed Jun 20, 2024
1 parent 4fba751 commit b9b4e05
Show file tree
Hide file tree
Showing 8 changed files with 56 additions and 5 deletions.
11 changes: 11 additions & 0 deletions error-handling-avro/src/main/avro/DeadLetter.avsc
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,17 @@
"name": "description",
"type": "string"
},
{
"name": "timestamp",
"type": [
"null",
{
"type": "long",
"logicalType": "timestamp-millis"
}
],
"default": null
},
{
"name": "cause",
"type": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

package com.bakdata.kafka;

import java.time.Instant;
import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier;

Expand All @@ -45,6 +46,7 @@ public DeadLetter convert(final DeadLetterDescription deadLetterDescription) {
.setTopic(deadLetterDescription.getTopic())
.setPartition(deadLetterDescription.getPartition())
.setOffset(deadLetterDescription.getOffset())
.setTimestamp(deadLetterDescription.getTimestamp())
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import static org.mockito.Mockito.when;

import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
Expand Down Expand Up @@ -88,11 +89,12 @@ protected Properties getKafkaProperties() {

@Test
void shouldConvertAndSerializeAvroDeadLetter() {
final long startTimestamp = System.currentTimeMillis();
when(this.mapper.apply(any(), any())).thenThrow(new RuntimeException(ERROR_MESSAGE));
this.createTopology();
this.topology.input(INPUT_TOPIC).withValueSerde(STRING_SERDE)
.add(1, "foo")
.add(2, "bar");
.add(1, "foo", 100)
.add(2, "bar", 200);

final List<ProducerRecord<Integer, String>> records = Seq.seq(this.topology.streamOutput(OUTPUT_TOPIC)
.withValueSerde(STRING_SERDE))
Expand All @@ -106,6 +108,7 @@ void shouldConvertAndSerializeAvroDeadLetter() {

this.softly.assertThat(errors)
.hasSize(2)
.allSatisfy(record -> this.softly.assertThat(record.timestamp()).isGreaterThan(startTimestamp))
.extracting(ProducerRecord::value).allSatisfy(
deadLetter -> {
this.softly.assertThat(deadLetter.getDescription()).isEqualTo(DEAD_LETTER_DESCRIPTION);
Expand All @@ -123,12 +126,14 @@ void shouldConvertAndSerializeAvroDeadLetter() {
deadLetter -> {
this.softly.assertThat(deadLetter.getInputValue()).hasValue("foo");
this.softly.assertThat(deadLetter.getOffset()).hasValue(0L);
this.softly.assertThat(deadLetter.getTimestamp()).hasValue(Instant.ofEpochMilli(100));
}
);
this.softly.assertThat(errors).extracting(ProducerRecord::value).element(1).satisfies(
deadLetter -> {
this.softly.assertThat(deadLetter.getInputValue()).hasValue("bar");
this.softly.assertThat(deadLetter.getOffset()).hasValue(1L);
this.softly.assertThat(deadLetter.getTimestamp()).hasValue(Instant.ofEpochMilli(200));
}
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

package com.bakdata.kafka;

import java.time.Instant;
import lombok.Builder;
import lombok.NonNull;
import lombok.Value;
Expand Down Expand Up @@ -55,4 +56,5 @@ public static class Cause {
String topic;
Integer partition;
Long offset;
Instant timestamp;
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

package com.bakdata.kafka;

import java.time.Instant;
import java.util.Optional;
import lombok.Getter;
import lombok.NonNull;
Expand Down Expand Up @@ -99,8 +100,14 @@ public void process(final FixedKeyRecord<K, ProcessingError<V>> inputRecord) {
.topic(metadata.map(RecordMetadata::topic).orElse(null))
.partition(metadata.map(RecordMetadata::partition).orElse(null))
.offset(metadata.map(RecordMetadata::offset).orElse(null))
.timestamp(Instant.ofEpochMilli(inputRecord.timestamp()))
.build();
this.context.forward(inputRecord.withValue(this.deadLetterConverter.convert(deadLetterDescription)));

final FixedKeyRecord<K, T> record = inputRecord
.withValue(this.deadLetterConverter.convert(deadLetterDescription))
.withTimestamp(this.context.currentSystemTimeMs());

this.context.forward(record);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import com.google.protobuf.Int32Value;
import com.google.protobuf.Int64Value;
import com.google.protobuf.StringValue;
import com.google.protobuf.Timestamp;
import java.time.Instant;
import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier;

Expand Down Expand Up @@ -65,6 +67,15 @@ public ProtoDeadLetter convert(final DeadLetterDescription deadLetterDescription
if (deadLetterDescription.getOffset() != null) {
builder.setOffset(Int64Value.of(deadLetterDescription.getOffset()));
}

if (deadLetterDescription.getTimestamp() != null) {
final Timestamp timestamp = Timestamp.newBuilder()
.setSeconds(deadLetterDescription.getTimestamp().getEpochSecond())
.setNanos(deadLetterDescription.getTimestamp().getNano())
.build();
builder.setTimestamp(timestamp);
}

return builder.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ syntax = "proto3";
package bakdata.kafka.proto.v1;

import "google/protobuf/wrappers.proto";
import "google/protobuf/timestamp.proto";

option java_package = "com.bakdata.kafka.proto.v1";
option java_multiple_files = true;
Expand All @@ -19,4 +20,5 @@ message ProtoDeadLetter {
google.protobuf.StringValue topic = 4;
google.protobuf.Int32Value partition = 5;
google.protobuf.Int64Value offset = 6;
google.protobuf.Timestamp timestamp = 7;
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,12 @@
import com.google.protobuf.Int32Value;
import com.google.protobuf.Int64Value;
import com.google.protobuf.StringValue;
import com.google.protobuf.Timestamp;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider;
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializerConfig;
import io.confluent.kafka.streams.serdes.protobuf.KafkaProtobufSerde;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -118,11 +120,12 @@ protected void createTopology() {

@Test
void shouldConvertAndSerializeProtoDeadLetter() {
final long startTimestamp = System.currentTimeMillis();
when(this.mapper.apply(any(), any())).thenThrow(new RuntimeException(ERROR_MESSAGE));
this.createTopology();
this.topology.input(INPUT_TOPIC).withValueSerde(STRING_SERDE)
.add(1, "foo")
.add(2, "bar");
.add(1, "foo", 100)
.add(2, "bar", 200);

final List<ProducerRecord<Integer, String>> records = Seq.seq(this.topology.streamOutput(OUTPUT_TOPIC)
.withValueSerde(STRING_SERDE))
Expand All @@ -136,6 +139,7 @@ void shouldConvertAndSerializeProtoDeadLetter() {

this.softly.assertThat(errors)
.hasSize(2)
.allSatisfy(record -> this.softly.assertThat(record.timestamp()).isGreaterThan(startTimestamp))
.extracting(ProducerRecord::value).allSatisfy(
deadLetter -> {
this.softly.assertThat(deadLetter.getDescription()).isEqualTo(DEAD_LETTER_DESCRIPTION);
Expand All @@ -156,15 +160,22 @@ void shouldConvertAndSerializeProtoDeadLetter() {
this.softly.assertThat(deadLetter.getInputValue()).extracting(StringValue::getValue)
.isEqualTo("foo");
this.softly.assertThat(deadLetter.getOffset()).extracting(Int64Value::getValue).isEqualTo(0L);
this.softly.assertThat(timestampToInstant(deadLetter.getTimestamp()))
.isEqualTo(Instant.ofEpochMilli(100));
}
);
this.softly.assertThat(errors).map(ProducerRecord::value).element(1).satisfies(
deadLetter -> {
this.softly.assertThat(deadLetter.getInputValue()).extracting(StringValue::getValue)
.isEqualTo("bar");
this.softly.assertThat(deadLetter.getOffset()).extracting(Int64Value::getValue).isEqualTo(1L);
this.softly.assertThat(timestampToInstant(deadLetter.getTimestamp()))
.isEqualTo(Instant.ofEpochMilli(200));
}
);
}

private static Instant timestampToInstant(final Timestamp timestamp) {
return Instant.ofEpochSecond(timestamp.getSeconds(), timestamp.getNanos());
}
}

0 comments on commit b9b4e05

Please sign in to comment.