Skip to content

Commit

Permalink
Forward all SerializationExceptions by default (#26)
Browse files Browse the repository at this point in the history
  • Loading branch information
philipp94831 authored Mar 21, 2024
1 parent 632a98e commit 762c34e
Show file tree
Hide file tree
Showing 35 changed files with 153 additions and 197 deletions.
3 changes: 1 addition & 2 deletions .github/workflows/build-and-publish.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ on:
jobs:
build-and-publish:
name: Java Gradle
uses: bakdata/ci-templates/.github/workflows/java-gradle-library.yaml@1.40.6
uses: bakdata/ci-templates/.github/workflows/java-gradle-library.yaml@1.43.0
with:
java-version: 17
secrets:
Expand All @@ -19,5 +19,4 @@ jobs:
signing-password: ${{ secrets.SONATYPE_SIGNING_PASSWORD }}
ossrh-username: ${{ secrets.SONATYPE_OSSRH_USERNAME }}
ossrh-password: ${{ secrets.SONATYPE_OSSRH_PASSWORD }}
github-username: ${{ secrets.GH_USERNAME }}
github-token: ${{ secrets.GH_TOKEN }}
2 changes: 1 addition & 1 deletion .github/workflows/release.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ on:
jobs:
java-gradle-release:
name: Java Gradle
uses: bakdata/ci-templates/.github/workflows/java-gradle-release.yaml@1.40.6
uses: bakdata/ci-templates/.github/workflows/java-gradle-release.yaml@1.43.0
with:
java-version: 17
release-type: "${{ inputs.release-type }}"
Expand Down
20 changes: 3 additions & 17 deletions build.gradle.kts
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
plugins {
id("net.researchgate.release") version "3.0.2"
id("com.bakdata.sonar") version "1.1.17"
id("com.bakdata.sonatype") version "1.1.14"
id("org.hildan.github.changelog") version "2.2.0"
id("com.bakdata.release") version "1.4.0"
id("com.bakdata.sonar") version "1.4.0"
id("com.bakdata.sonatype") version "1.4.0"
id("io.freefair.lombok") version "8.4"
}

Expand Down Expand Up @@ -41,13 +40,6 @@ configure<com.bakdata.gradle.SonatypeSettings> {
}
}

configure<org.hildan.github.changelog.plugin.GitHubChangelogExtension> {
githubUser = "bakdata"
githubRepository = "kafka-error-handling"
futureVersionTag = findProperty("changelog.releaseVersion")?.toString()
sinceTag = findProperty("changelog.sinceTag")?.toString()
}

subprojects {
apply(plugin = "java-library")
apply(plugin = "java-test-fixtures")
Expand All @@ -59,9 +51,3 @@ subprojects {
}
}
}

release {
git {
requireBranch.set("master")
}
}
14 changes: 2 additions & 12 deletions error-handling-core/src/main/java/com/bakdata/kafka/ErrorUtil.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2022 bakdata
* Copyright (c) 2024 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand All @@ -27,7 +27,6 @@
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.SocketTimeoutException;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import lombok.experimental.UtilityClass;
Expand All @@ -42,7 +41,6 @@
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.specific.SpecificRecord;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.SerializationException;

/**
* This class provides utility methods for dealing with errors in Kafka streams, such as serializing values to string
Expand Down Expand Up @@ -75,22 +73,14 @@ public static boolean isRecoverable(final Exception e) {
* <p>Non-recoverable Kafka errors are:
* <ul>
* <li>{@link RecordTooLargeException}
* <li>{@link SerializationException} which is not caused by timeout
* </ul>
*
* @param e exception
* @return whether exception is thrown by Kafka and recoverable or not
*/
public static boolean isRecoverableKafkaError(final Exception e) {
if (ORG_APACHE_KAFKA_COMMON_ERRORS.equals(e.getClass().getPackageName())) {
if (e instanceof RecordTooLargeException) {
return false;
}
if (e instanceof SerializationException) {
// socket timeouts usually indicate that the schema registry is temporarily down
return e.getCause() instanceof SocketTimeoutException;
}
return true;
return !(e instanceof RecordTooLargeException);
}
return false;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2022 bakdata
* Copyright (c) 2024 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -32,7 +32,6 @@

import java.util.List;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
Expand Down Expand Up @@ -93,13 +92,14 @@ void shouldNotAllowNullFilter(final SoftAssertions softly) {
}

@Test
void shouldForwardSchemaRegistryTimeout(final SoftAssertions softly) {
when(this.mapper.apply(1, "foo")).thenThrow(createSchemaRegistryTimeoutException());
void shouldForwardRecoverableException(final SoftAssertions softly) {
final RuntimeException throwable = createRecoverableException();
when(this.mapper.apply(1, "foo")).thenThrow(throwable);
this.createTopology();
softly.assertThatThrownBy(() -> this.topology.input()
.withValueSerde(STRING_SERDE)
.add(1, "foo"))
.hasCauseInstanceOf(SerializationException.class);
.hasCause(throwable);
final List<ProducerRecord<Double, Long>> records = Seq.seq(this.topology.streamOutput(OUTPUT_TOPIC)
.withKeySerde(DOUBLE_SERDE)
.withValueSerde(LONG_SERDE))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2022 bakdata
* Copyright (c) 2024 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -29,7 +29,6 @@

import java.util.List;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
Expand Down Expand Up @@ -99,8 +98,8 @@ void shouldNotAllowNullFilter(final SoftAssertions softly) {
}

@Test
void shouldForwardSchemaRegistryTimeout(final SoftAssertions softly) {
final RuntimeException throwable = createSchemaRegistryTimeoutException();
void shouldForwardRecoverableException(final SoftAssertions softly) {
final RuntimeException throwable = createRecoverableException();
this.mapper = new Transformer<>() {
private ProcessorContext context = null;

Expand All @@ -126,7 +125,7 @@ public void close() {
softly.assertThatThrownBy(() -> this.topology.input()
.withValueSerde(STRING_SERDE)
.add(1, "foo"))
.hasCauseInstanceOf(SerializationException.class);
.hasCause(throwable);
final List<ProducerRecord<Double, Long>> records = Seq.seq(this.topology.streamOutput(OUTPUT_TOPIC)
.withKeySerde(DOUBLE_SERDE)
.withValueSerde(LONG_SERDE))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2022 bakdata
* Copyright (c) 2024 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -33,7 +33,6 @@
import java.util.Arrays;
import java.util.List;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
Expand Down Expand Up @@ -94,13 +93,14 @@ void shouldNotAllowNullFilter(final SoftAssertions softly) {
}

@Test
void shouldForwardSchemaRegistryTimeout(final SoftAssertions softly) {
when(this.mapper.apply("foo")).thenThrow(createSchemaRegistryTimeoutException());
void shouldForwardRecoverableException(final SoftAssertions softly) {
final RuntimeException throwable = createRecoverableException();
when(this.mapper.apply("foo")).thenThrow(throwable);
this.createTopology();
softly.assertThatThrownBy(() -> this.topology.input()
.withValueSerde(STRING_SERDE)
.add(1, "foo"))
.hasCauseInstanceOf(SerializationException.class);
.hasCause(throwable);
final List<ProducerRecord<Integer, Long>> records = Seq.seq(this.topology.streamOutput(OUTPUT_TOPIC)
.withValueSerde(LONG_SERDE))
.toList();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2022 bakdata
* Copyright (c) 2024 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -33,7 +33,6 @@
import java.util.Arrays;
import java.util.List;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
Expand Down Expand Up @@ -92,13 +91,14 @@ void shouldNotAllowNullFilter(final SoftAssertions softly) {
}

@Test
void shouldForwardSchemaRegistryTimeout(final SoftAssertions softly) {
when(this.mapper.apply(1, "foo")).thenThrow(createSchemaRegistryTimeoutException());
void shouldForwardRecoverableException(final SoftAssertions softly) {
final RuntimeException throwable = createRecoverableException();
when(this.mapper.apply(1, "foo")).thenThrow(throwable);
this.createTopology();
softly.assertThatThrownBy(() -> this.topology.input()
.withValueSerde(STRING_SERDE)
.add(1, "foo"))
.hasCauseInstanceOf(SerializationException.class);
.hasCause(throwable);
final List<ProducerRecord<Integer, Long>> records = Seq.seq(this.topology.streamOutput(OUTPUT_TOPIC)
.withValueSerde(LONG_SERDE))
.toList();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2022 bakdata
* Copyright (c) 2024 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -29,7 +29,6 @@

import java.util.List;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
Expand Down Expand Up @@ -94,8 +93,8 @@ void shouldNotAllowNullFilter(final SoftAssertions softly) {
}

@Test
void shouldForwardSchemaRegistryTimeout(final SoftAssertions softly) {
final RuntimeException throwable = createSchemaRegistryTimeoutException();
void shouldForwardRecoverableException(final SoftAssertions softly) {
final RuntimeException throwable = createRecoverableException();
this.mapper = new ValueTransformer<>() {
private ProcessorContext context = null;

Expand All @@ -121,7 +120,7 @@ public void close() {
softly.assertThatThrownBy(() -> this.topology.input()
.withValueSerde(STRING_SERDE)
.add(1, "foo"))
.hasCauseInstanceOf(SerializationException.class);
.hasCause(throwable);
final List<ProducerRecord<Integer, Long>> records = Seq.seq(this.topology.streamOutput(OUTPUT_TOPIC)
.withValueSerde(LONG_SERDE))
.toList();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2022 bakdata
* Copyright (c) 2024 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -29,7 +29,6 @@

import java.util.List;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
Expand Down Expand Up @@ -97,8 +96,8 @@ void shouldNotAllowNullFilter(final SoftAssertions softly) {
}

@Test
void shouldForwardSchemaRegistryTimeout(final SoftAssertions softly) {
final RuntimeException throwable = createSchemaRegistryTimeoutException();
void shouldForwardRecoverableException(final SoftAssertions softly) {
final RuntimeException throwable = createRecoverableException();
this.mapper = new ValueTransformerWithKey<>() {
private ProcessorContext context = null;

Expand All @@ -124,7 +123,7 @@ public void close() {
softly.assertThatThrownBy(() -> this.topology.input()
.withValueSerde(STRING_SERDE)
.add(1, "foo"))
.hasCauseInstanceOf(SerializationException.class);
.hasCause(throwable);
final List<ProducerRecord<Integer, Long>> records = Seq.seq(this.topology.streamOutput(OUTPUT_TOPIC)
.withValueSerde(LONG_SERDE))
.toList();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2022 bakdata
* Copyright (c) 2024 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -32,7 +32,6 @@

import java.util.List;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
Expand Down Expand Up @@ -93,13 +92,14 @@ void shouldNotAllowNullFilter(final SoftAssertions softly) {
}

@Test
void shouldForwardSchemaRegistryTimeout(final SoftAssertions softly) {
when(this.mapper.apply(1, "foo")).thenThrow(createSchemaRegistryTimeoutException());
void shouldForwardRecoverableException(final SoftAssertions softly) {
final RuntimeException throwable = createRecoverableException();
when(this.mapper.apply(1, "foo")).thenThrow(throwable);
this.createTopology();
softly.assertThatThrownBy(() -> this.topology.input()
.withValueSerde(STRING_SERDE)
.add(1, "foo"))
.hasCauseInstanceOf(SerializationException.class);
.hasCause(throwable);
final List<ProducerRecord<Double, Long>> records = Seq.seq(this.topology.streamOutput(OUTPUT_TOPIC)
.withKeySerde(DOUBLE_SERDE)
.withValueSerde(LONG_SERDE))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2022 bakdata
* Copyright (c) 2024 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -29,7 +29,6 @@

import java.util.List;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
Expand Down Expand Up @@ -95,8 +94,8 @@ void shouldNotAllowNullFilter(final SoftAssertions softly) {
}

@Test
void shouldForwardSchemaRegistryTimeout(final SoftAssertions softly) {
final RuntimeException throwable = createSchemaRegistryTimeoutException();
void shouldForwardRecoverableException(final SoftAssertions softly) {
final RuntimeException throwable = createRecoverableException();
this.mapper = new Processor<>() {

@Override
Expand All @@ -120,7 +119,7 @@ public void close() {
softly.assertThatThrownBy(() -> this.topology.input()
.withValueSerde(STRING_SERDE)
.add(1, "foo"))
.hasCauseInstanceOf(SerializationException.class);
.hasCause(throwable);
final List<ProducerRecord<Double, Long>> records = Seq.seq(this.topology.streamOutput(OUTPUT_TOPIC)
.withKeySerde(DOUBLE_SERDE)
.withValueSerde(LONG_SERDE))
Expand Down
Loading

0 comments on commit 762c34e

Please sign in to comment.