Skip to content

Commit

Permalink
DBZ-5720 [Cloudevents] Switch Quarkus app to Apicurio
Browse files Browse the repository at this point in the history
  • Loading branch information
vjuranek committed Oct 18, 2022
1 parent 8fcdce3 commit 7b05aee
Show file tree
Hide file tree
Showing 7 changed files with 34 additions and 17 deletions.
6 changes: 3 additions & 3 deletions cloudevents/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ curl -i -X PUT -H "Accept:application/json" -H "Content-Type:application/json"
docker run --rm --tty \
--network cloudevents-network \
quay.io/debezium/tooling:1.2 \
kafkacat -b kafka:9092 -C -o beginning -q -s value=avro -r http://schema-registry:8080 \
kafkacat -b kafka:9092 -C -o beginning -q \
-t dbserver3.inventory.customers | jq .
```

Expand All @@ -76,8 +76,8 @@ The same stream processing application writes out that data to the `customers3`
docker run --rm --tty \
--network cloudevents-network \
quay.io/debezium/tooling:1.2 \
kafkacat -b kafka:9092 -C -o beginning -q -s value=avro -r http://schema-registry:8081 \
-t customers2 | jq .
kafkacat -b kafka:9092 -C -o beginning -q \
-t customers3
```

## CloudEvents Binary Mode
Expand Down
10 changes: 7 additions & 3 deletions cloudevents/avro-data-extractor/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<version.surefire>3.0.0-M6</version.surefire>

<apache.kafka.version>3.2.0</apache.kafka.version>
<version.quarkus>2.11.0.Final</version.quarkus>
<version.quarkus>2.13.1.Final</version.quarkus>
<version.debezium>2.0.0.CR1</version.debezium>
<version.kafka.avro>7.2.0</version.kafka.avro>
</properties>
Expand Down Expand Up @@ -98,7 +98,12 @@
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-streams-avro-serde</artifactId>
<version>5.3.2</version>
<version>7.2.1</version>
</dependency>
<dependency>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-utils-serde</artifactId>
<version>1.3.2.Final</version>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
Expand All @@ -125,7 +130,6 @@
<artifactId>kafka-connect-avro-converter</artifactId>
<version>${version.kafka.avro}</version>
</dependency>

</dependencies>
<profiles>
<profile>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ COPY --chown=185 target/quarkus-app/*.jar /deployments/
COPY --chown=185 target/quarkus-app/app/ /deployments/app/
COPY --chown=185 target/quarkus-app/quarkus/ /deployments/quarkus/

EXPOSE 8080
EXPOSE 8079
USER 185
ENV AB_JOLOKIA_OFF=""
ENV JAVA_OPTS="-Dquarkus.http.host=0.0.0.0 -Djava.util.logging.manager=org.jboss.logmanager.LogManager"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@

import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Map;

import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.inject.Produces;
import javax.inject.Inject;

import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
Expand All @@ -24,10 +24,15 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde;
import io.apicurio.registry.client.CompatibleClient;
import io.apicurio.registry.client.RegistryService;
import io.apicurio.registry.utils.serde.AvroKafkaDeserializer;
import io.apicurio.registry.utils.serde.AvroKafkaSerializer;

import io.debezium.examples.cloudevents.dataextractor.model.CloudEvent;
import io.debezium.serde.DebeziumSerdes;


/**
* Starts up the KStreams pipeline once the source topics have been created.
*
Expand Down Expand Up @@ -73,9 +78,13 @@ Topology createStreamTopology() {
.mapValues(ce -> ce.data)
.to(jsonAvroExtractedTopic, Produced.with(longKeySerde, Serdes.ByteArray()));

Serde<GenericRecord> genericAvroSerde = new GenericAvroSerde();
Map<String, String> config = Collections.singletonMap(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
genericAvroSerde.configure(config, false);

RegistryService service = CompatibleClient.createCompatible(schemaRegistryUrl);
Deserializer<GenericRecord> deserializer = new AvroKafkaDeserializer<>(service);
Serde<GenericRecord> genericAvroSerde = Serdes.serdeFrom(
new AvroKafkaSerializer<>(service),
deserializer
);

builder.stream(avroAvroCustomersTopic, Consumed.with(longKeySerde, genericAvroSerde))
.mapValues(ce -> ((ByteBuffer) ce.get("data")).array())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,16 @@ public class CloudEvent {
public String iodebeziumconnector;
public String iodebeziumname;
public String iodebeziumtsms;
public boolean iodebeziumsnapshot;
public String iodebeziumsnapshot;
public String iodebeziumdb;
public String iodebeziumsequence;
public String iodebeziumschema;
public String iodebeziumtable;
public String iodebeziumtxId;
public String iodebeziumtxid;
public String iodebeziumlsn;
public String iodebeziumxmin;
public String iodebeziumtxtotalorder;
public String iodebeziumtxdatacollectionorder;
public byte[] data;
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ json.avro.extracted.topic=customers2
avro.avro.customers.topic=dbserver3.inventory.customers
avro.avro.extracted.topic=customers3

schema.registry.url=http://schema-registry:8081
schema.registry.url=http://schema-registry:8080/api/

quarkus.kafka-streams.bootstrap-servers=localhost:9092
quarkus.kafka-streams.bootstrap-servers=kafka:9092
quarkus.kafka-streams.application-id=cloudevents-data-extractor
quarkus.kafka-streams.topics=${json.avro.customers.topic},${avro.avro.customers.topic}

Expand Down
2 changes: 1 addition & 1 deletion cloudevents/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ services:
networks:
- my-network
schema-registry:
image: apicurio/apicurio-registry-mem:2.2.5.Final
image: apicurio/apicurio-registry-mem:2.3.1.Final
ports:
- 8080:8080
networks:
Expand Down

0 comments on commit 7b05aee

Please sign in to comment.