
Commit

Clean code
MichaelKora committed Sep 18, 2023
2 parents bea9b07 + 13da160 commit 5338076
Showing 76 changed files with 4,255 additions and 126 deletions.
41 changes: 31 additions & 10 deletions CHANGELOG.md
@@ -1,14 +1,34 @@
# Change Log

## [1.4.1](https://github.com/bakdata/kafka-error-handling/tree/1.4.1) (2023-01-04)
[View commits](https://github.com/bakdata/kafka-error-handling/compare/1.4.0...1.4.1)

**Merged pull requests:**

- Update dependencies [\#20](https://github.com/bakdata/kafka-error-handling/pull/20) ([@philipp94831](https://github.com/philipp94831))

## [1.4.0](https://github.com/bakdata/kafka-error-handling/tree/1.4.0) (2022-12-28)
[View commits](https://github.com/bakdata/kafka-error-handling/compare/1.3.0...1.4.0)

**Closed issues:**

- Protobuf support for dead letters [\#12](https://github.com/bakdata/kafka-error-handling/issues/12)

**Merged pull requests:**

- Upgrade to Avro 1.11 [\#19](https://github.com/bakdata/kafka-error-handling/pull/19) ([@philipp94831](https://github.com/philipp94831))
- Implement new Kafka 3.3 APIs [\#18](https://github.com/bakdata/kafka-error-handling/pull/18) ([@philipp94831](https://github.com/philipp94831))
- Fix badge for Maven Central [\#17](https://github.com/bakdata/kafka-error-handling/pull/17) ([@philipp94831](https://github.com/philipp94831))

## [1.3.0](https://github.com/bakdata/kafka-error-handling/tree/1.3.0) (2022-07-13)
- [Full Changelog](https://github.com/bakdata/kafka-error-handling/compare/1.2.5...1.3.0)
+ [View commits](https://github.com/bakdata/kafka-error-handling/compare/1.2.5...1.3.0)

**Merged pull requests:**

- Proto conversion for deadletter [\#13](https://github.com/bakdata/kafka-error-handling/pull/13) ([@mkcode92](https://github.com/mkcode92))

## [1.2.5](https://github.com/bakdata/kafka-error-handling/tree/1.2.5) (2022-06-23)
- [Full Changelog](https://github.com/bakdata/kafka-error-handling/compare/1.2.4...1.2.5)
+ [View commits](https://github.com/bakdata/kafka-error-handling/compare/1.2.4...1.2.5)

**Closed issues:**

@@ -19,7 +39,7 @@
- Support exceptions with null message [\#15](https://github.com/bakdata/kafka-error-handling/pull/15) ([@philipp94831](https://github.com/philipp94831))

## [1.2.4](https://github.com/bakdata/kafka-error-handling/tree/1.2.4) (2022-05-11)
- [Full Changelog](https://github.com/bakdata/kafka-error-handling/compare/1.2.3...1.2.4)
+ [View commits](https://github.com/bakdata/kafka-error-handling/compare/1.2.3...1.2.4)

**Closed issues:**

@@ -30,56 +50,57 @@
- Fix OFFSET header name in ErrorHeaderTransformer [\#11](https://github.com/bakdata/kafka-error-handling/pull/11) ([@patrickjkennedy](https://github.com/patrickjkennedy))

## [1.2.3](https://github.com/bakdata/kafka-error-handling/tree/1.2.3) (2022-05-02)
- [Full Changelog](https://github.com/bakdata/kafka-error-handling/compare/1.2.2...1.2.3)
+ [View commits](https://github.com/bakdata/kafka-error-handling/compare/1.2.2...1.2.3)

**Merged pull requests:**

- Add flat transformers [\#9](https://github.com/bakdata/kafka-error-handling/pull/9) ([@philipp94831](https://github.com/philipp94831))

## [1.2.2](https://github.com/bakdata/kafka-error-handling/tree/1.2.2) (2022-02-18)
- [Full Changelog](https://github.com/bakdata/kafka-error-handling/compare/1.2.1...1.2.2)
+ [View commits](https://github.com/bakdata/kafka-error-handling/compare/1.2.1...1.2.2)

**Merged pull requests:**

- Add factory methods for TransformerSuppliers [\#8](https://github.com/bakdata/kafka-error-handling/pull/8) ([@philipp94831](https://github.com/philipp94831))

## [1.2.1](https://github.com/bakdata/kafka-error-handling/tree/1.2.1) (2022-02-17)
- [Full Changelog](https://github.com/bakdata/kafka-error-handling/compare/1.2.0...1.2.1)
+ [View commits](https://github.com/bakdata/kafka-error-handling/compare/1.2.0...1.2.1)

**Merged pull requests:**

- Add error class to dead letter [\#7](https://github.com/bakdata/kafka-error-handling/pull/7) ([@philipp94831](https://github.com/philipp94831))

## [1.2.0](https://github.com/bakdata/kafka-error-handling/tree/1.2.0) (2022-02-15)
- [Full Changelog](https://github.com/bakdata/kafka-error-handling/compare/1.1.2...1.2.0)
+ [View commits](https://github.com/bakdata/kafka-error-handling/compare/1.1.2...1.2.0)

**Merged pull requests:**

- Add processing context to dead letters [\#6](https://github.com/bakdata/kafka-error-handling/pull/6) ([@philipp94831](https://github.com/philipp94831))

## [1.1.2](https://github.com/bakdata/kafka-error-handling/tree/1.1.2) (2021-12-10)
- [Full Changelog](https://github.com/bakdata/kafka-error-handling/compare/1.1.1...1.1.2)
+ [View commits](https://github.com/bakdata/kafka-error-handling/compare/1.1.1...1.1.2)

**Merged pull requests:**

- Update avro to 1.10 [\#4](https://github.com/bakdata/kafka-error-handling/pull/4) ([@philipp94831](https://github.com/philipp94831))

## [1.1.1](https://github.com/bakdata/kafka-error-handling/tree/1.1.1) (2021-12-10)
- [Full Changelog](https://github.com/bakdata/kafka-error-handling/compare/1.1.0...1.1.1)
+ [View commits](https://github.com/bakdata/kafka-error-handling/compare/1.1.0...1.1.1)

**Merged pull requests:**

- Update log4j to 2.15.0 [\#5](https://github.com/bakdata/kafka-error-handling/pull/5) ([@philipp94831](https://github.com/philipp94831))

## [1.1.0](https://github.com/bakdata/kafka-error-handling/tree/1.1.0) (2021-09-24)
- [Full Changelog](https://github.com/bakdata/kafka-error-handling/compare/1.0.0...1.1.0)
+ [View commits](https://github.com/bakdata/kafka-error-handling/compare/1.0.0...1.1.0)

**Merged pull requests:**

- Update dependencies [\#3](https://github.com/bakdata/kafka-error-handling/pull/3) ([@philipp94831](https://github.com/philipp94831))
- Provide error details as header fields [\#2](https://github.com/bakdata/kafka-error-handling/pull/2) ([@philipp94831](https://github.com/philipp94831))

## [1.0.0](https://github.com/bakdata/kafka-error-handling/tree/1.0.0) (2020-03-09)
[View commits](https://github.com/bakdata/kafka-error-handling/compare/69873530dde0856c890f502f825ce0da046aef1d...1.0.0)

**Merged pull requests:**

2 changes: 1 addition & 1 deletion README.md
@@ -1,7 +1,7 @@
[![Build Status](https://dev.azure.com/bakdata/public/_apis/build/status/bakdata.kafka-error-handling?branchName=master)](https://dev.azure.com/bakdata/public/_build/latest?definitionId=23&branchName=master)
[![Sonarcloud status](https://sonarcloud.io/api/project_badges/measure?project=com.bakdata.kafka%3Aerror-handling&metric=alert_status)](https://sonarcloud.io/dashboard?id=com.bakdata.kafka%3Aerror-handling)
[![Code coverage](https://sonarcloud.io/api/project_badges/measure?project=com.bakdata.kafka%3Aerror-handling&metric=coverage)](https://sonarcloud.io/dashboard?id=com.bakdata.kafka%3Aerror-handling)
- [![Maven](https://img.shields.io/maven-central/v/com.bakdata.kafka/error-handling.svg)](https://search.maven.org/search?q=g:com.bakdata.kafka%20AND%20a:error-handling&core=gav)
+ [![Maven](https://img.shields.io/maven-central/v/com.bakdata.kafka/error-handling-core.svg)](https://search.maven.org/search?q=g:com.bakdata.kafka%20AND%20a:error-handling-core&core=gav)

# Kafka error handling
Libraries for error handling in [Kafka Streams](https://kafka.apache.org/documentation/streams/).
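To pull the library into a build, declare the published artifact as a dependency; this Gradle Kotlin DSL fragment is a minimal sketch — the coordinates follow the Maven Central badge above (`com.bakdata.kafka:error-handling-core`), and the version `1.4.1` is assumed from the latest changelog entry:

```kotlin
dependencies {
    // Core error-handling utilities for Kafka Streams.
    // The Avro dead letter converter is published as the
    // separate error-handling-avro module in this repository.
    implementation(group = "com.bakdata.kafka", name = "error-handling-core", version = "1.4.1")
}
```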
14 changes: 10 additions & 4 deletions build.gradle.kts
@@ -1,9 +1,9 @@
plugins {
- id("net.researchgate.release") version "2.8.1"
+ id("net.researchgate.release") version "3.0.2"
id("com.bakdata.sonar") version "1.1.7"
id("com.bakdata.sonatype") version "1.1.7"
- id("org.hildan.github.changelog") version "0.8.0"
- id("io.freefair.lombok") version "5.3.3.3"
+ id("org.hildan.github.changelog") version "1.12.1"
+ id("io.freefair.lombok") version "6.6.1"
}

allprojects {
@@ -53,8 +53,14 @@ subprojects {
apply(plugin = "java-test-fixtures")
apply(plugin = "io.freefair.lombok")

- configure<JavaPluginConvention> {
+ configure<JavaPluginExtension> {
sourceCompatibility = JavaVersion.VERSION_11
targetCompatibility = JavaVersion.VERSION_11
}
}

release {
git {
requireBranch.set("master")
}
}
5 changes: 3 additions & 2 deletions error-handling-avro/build.gradle.kts
@@ -1,7 +1,7 @@
description = "Transform dead letters in Kafka Streams applications to Avro format."

plugins {
- id("com.github.davidmc24.gradle.plugin.avro") version "1.2.1"
+ id("com.github.davidmc24.gradle.plugin.avro") version "1.5.0"
}

// add .avsc files to jar allowing us to use them in other projects as a schema dependency
@@ -24,7 +24,8 @@ dependencies {
testImplementation(group = "org.jooq", name = "jool", version = "0.9.14")
val mockitoVersion: String by project
testImplementation(group = "org.mockito", name = "mockito-junit-jupiter", version = mockitoVersion)
- testImplementation(group = "org.assertj", name = "assertj-core", version = "3.20.2")
+ val assertJVersion: String by project
+ testImplementation(group = "org.assertj", name = "assertj-core", version = assertJVersion)
val log4jVersion: String by project
testImplementation(group = "org.apache.logging.log4j", name = "log4j-slf4j-impl", version = log4jVersion)
val kafkaStreamsTestsVersion: String by project
1 change: 1 addition & 0 deletions error-handling-avro/lombok.config
@@ -1,2 +1,3 @@
# This file is generated by the 'io.freefair.lombok' Gradle plugin
config.stopBubbling = true
lombok.addLombokGeneratedAnnotation = true
@@ -25,6 +25,7 @@
package com.bakdata.kafka;

import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier;

/**
* Convert a {@code DeadLetterDescription} to an Avro {@code DeadLetter}
@@ -66,10 +67,38 @@ public DeadLetter convert(final DeadLetterDescription deadLetterDescription) {
* @param description shared description for all errors
* @param <V> type of the input value
* @return a transformer supplier
* @deprecated Use {@link #asProcessor(String)}
*/
@Deprecated(since = "1.4.0")
public static <V> ValueTransformerSupplier<ProcessingError<V>, DeadLetter> asTransformer(
final String description) {
return DeadLetterTransformer.create(description, new AvroDeadLetterConverter());
}

/**
* Creates a processor that uses the AvroDeadLetterConverter
*
* <pre>{@code
* // Example, this works for all error capturing topologies
* final KeyValueMapper<K, V, KeyValue<KR, VR>> mapper = ...;
* final KStream<K, V> input = ...;
* final KStream<KR, ProcessedKeyValue<K, V, VR>> processed = input.map(captureErrors(mapper));
* final KStream<KR, VR> output = processed.flatMapValues(ProcessedKeyValue::getValues);
* final KStream<K, ProcessingError<V>> errors = processed.flatMap(ProcessedKeyValue::getErrors);
* final KStream<K, DeadLetter> deadLetters = errors.processValues(
* AvroDeadLetterConverter.asProcessor("Description"));
* deadLetters.to(ERROR_TOPIC);
* }
* </pre>
*
* @param description shared description for all errors
* @param <K> type of the input key
* @param <V> type of the input value
* @return a processor supplier
*/
public static <K, V> FixedKeyProcessorSupplier<K, ProcessingError<V>, DeadLetter> asProcessor(
final String description) {
return DeadLetterProcessor.create(description, new AvroDeadLetterConverter());
}

}
@@ -37,7 +37,7 @@ class AvroDeadLetterConverterTest {
private SoftAssertions softly;

@Test
- void shouldConvertDeadletterDescriptionWithOptionalFields() {
+ void shouldConvertDeadLetterDescriptionWithOptionalFields() {
final AvroDeadLetterConverter converter = new AvroDeadLetterConverter();
final DeadLetterDescription deadLetterDescription = DeadLetterDescription.builder()
.inputValue("inputValue")
@@ -63,7 +63,7 @@ void shouldConvertDeadletterDescriptionWithOptionalFields() {
}

@Test
- void shouldConvertDeadletterDescriptionWithoutOptionalFields() {
+ void shouldConvertDeadLetterDescriptionWithoutOptionalFields() {
final AvroDeadLetterConverter converter = new AvroDeadLetterConverter();
final DeadLetterDescription onlyRequiredFieldsDeadLetterDescription = DeadLetterDescription.builder()
.description("description")
@@ -0,0 +1,136 @@
/*
* MIT License
*
* Copyright (c) 2022 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package com.bakdata.kafka;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;

import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Produced;
import org.assertj.core.api.SoftAssertions;
import org.assertj.core.api.junit.jupiter.InjectSoftAssertions;
import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension;
import org.jooq.lambda.Seq;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;

@ExtendWith(MockitoExtension.class)
@ExtendWith(SoftAssertionsExtension.class)
@MockitoSettings(strictness = Strictness.STRICT_STUBS)
class AvroDeadLetterProcessorTest extends ErrorCaptureTopologyTest {
private static final String ERROR_TOPIC = "errors";
private static final String OUTPUT_TOPIC = "output";
private static final String INPUT_TOPIC = "input";
private static final Serde<String> STRING_SERDE = Serdes.String();
private static final String DEAD_LETTER_DESCRIPTION = "Description";
private static final String ERROR_MESSAGE = "ERROR!";
@InjectSoftAssertions
private SoftAssertions softly;
@Mock
private KeyValueMapper<Integer, String, KeyValue<Integer, String>> mapper;

@Override
protected void buildTopology(final StreamsBuilder builder) {
final KStream<Integer, String> input = builder.stream(INPUT_TOPIC, Consumed.with(null, STRING_SERDE));
final KStream<Integer, ProcessedKeyValue<Integer, String, String>> mapped =
input.map(ErrorCapturingKeyValueMapper.captureErrors(this.mapper));
mapped.flatMapValues(ProcessedKeyValue::getValues)
.to(OUTPUT_TOPIC, Produced.valueSerde(STRING_SERDE));
mapped.flatMap(ProcessedKeyValue::getErrors)
.processValues(AvroDeadLetterConverter.asProcessor(DEAD_LETTER_DESCRIPTION))
.to(ERROR_TOPIC);
}

@Override
protected Properties getKafkaProperties() {
final Properties kafkaProperties = super.getKafkaProperties();
kafkaProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
return kafkaProperties;
}

@Test
void shouldConvertAndSerializeAvroDeadLetter() {
when(this.mapper.apply(any(), any())).thenThrow(new RuntimeException(ERROR_MESSAGE));
this.createTopology();
this.topology.input(INPUT_TOPIC).withValueSerde(STRING_SERDE)
.add(1, "foo")
.add(2, "bar");

final List<ProducerRecord<Integer, String>> records = Seq.seq(this.topology.streamOutput(OUTPUT_TOPIC)
.withValueSerde(STRING_SERDE))
.toList();
this.softly.assertThat(records)
.isEmpty();

final List<ProducerRecord<Integer, DeadLetter>> errors = Seq.seq(this.topology.streamOutput(ERROR_TOPIC)
.withValueType(DeadLetter.class))
.toList();

this.softly.assertThat(errors)
.hasSize(2)
.extracting(ProducerRecord::value).allSatisfy(
deadLetter -> {
this.softly.assertThat(deadLetter.getDescription()).isEqualTo(DEAD_LETTER_DESCRIPTION);
this.softly.assertThat(deadLetter.getCause().getMessage()).hasValue(ERROR_MESSAGE);
this.softly.assertThat(deadLetter.getCause().getErrorClass())
.hasValue(RuntimeException.class.getCanonicalName());
// We don't check the exact stack trace, but only that it consists of multiple lines
this.softly.assertThat(deadLetter.getCause().getStackTrace()).map(s -> Arrays.asList(s.split("\n")))
.get().asList().hasSizeGreaterThan(1);
this.softly.assertThat(deadLetter.getTopic()).hasValue(INPUT_TOPIC);
this.softly.assertThat(deadLetter.getPartition()).hasValue(0);
}
);
this.softly.assertThat(errors).extracting(ProducerRecord::value).element(0).satisfies(
deadLetter -> {
this.softly.assertThat(deadLetter.getInputValue()).hasValue("foo");
this.softly.assertThat(deadLetter.getOffset()).hasValue(0L);
}
);
this.softly.assertThat(errors).extracting(ProducerRecord::value).element(1).satisfies(
deadLetter -> {
this.softly.assertThat(deadLetter.getInputValue()).hasValue("bar");
this.softly.assertThat(deadLetter.getOffset()).hasValue(1L);
}
);

}
}
5 changes: 3 additions & 2 deletions error-handling-core/build.gradle.kts
@@ -1,7 +1,7 @@
description = "A library for error handling in Kafka Streams."

plugins {
- id("com.github.davidmc24.gradle.plugin.avro") version "1.2.1"
+ id("com.github.davidmc24.gradle.plugin.avro") version "1.5.0"
}

dependencies {
@@ -16,7 +16,8 @@ dependencies {
testImplementation(group = "org.junit.jupiter", name = "junit-jupiter-api", version = junitVersion)
testImplementation(group = "org.junit.jupiter", name = "junit-jupiter-params", version = junitVersion)
testRuntimeOnly(group = "org.junit.jupiter", name = "junit-jupiter-engine", version = junitVersion)
- testImplementation(group = "org.assertj", name = "assertj-core", version = "3.20.2")
+ val assertJVersion: String by project
+ testImplementation(group = "org.assertj", name = "assertj-core", version = assertJVersion)
val mockitoVersion: String by project
testImplementation(group = "org.mockito", name = "mockito-core", version = mockitoVersion)
testImplementation(group = "org.mockito", name = "mockito-junit-jupiter", version = mockitoVersion)
1 change: 1 addition & 0 deletions error-handling-core/lombok.config
@@ -1,2 +1,3 @@
# This file is generated by the 'io.freefair.lombok' Gradle plugin
config.stopBubbling = true
lombok.addLombokGeneratedAnnotation = true
