diff --git a/boms/common-expansion/pom.xml b/boms/common-expansion/pom.xml index 7f18c7fea859..7b69103b3680 100644 --- a/boms/common-expansion/pom.xml +++ b/boms/common-expansion/pom.xml @@ -274,6 +274,19 @@ + + + com.github.luben + zstd-jni + ${version.com.github.luben.zstd-jni} + + + * + * + + + + com.fasterxml.jackson.dataformat jackson-dataformat-yaml @@ -1371,12 +1384,35 @@ + + org.lz4 + lz4-java + ${version.org.lz4.lz4-java} + + + * + * + + + + org.wildfly.security.mp wildfly-elytron-jwt ${version.org.wildfly.security.elytron-mp} + + org.xerial.snappy + snappy-java + ${version.org.xerial.snappy.snappy-java} + + + * + * + + + diff --git a/galleon-pack/galleon-shared/pom.xml b/galleon-pack/galleon-shared/pom.xml index e042d94f7cd3..977172834849 100644 --- a/galleon-pack/galleon-shared/pom.xml +++ b/galleon-pack/galleon-shared/pom.xml @@ -40,7 +40,19 @@ com.fasterxml.jackson.jrjackson-jr-objects - + + + + com.github.luben + zstd-jni + + + * + * + + + + com.google.api.grpcproto-google-common-protos com.google.protobufprotobuf-java com.google.protobufprotobuf-java-util @@ -636,6 +648,17 @@ org.jetbrains.kotlinkotlin-stdlib-jdk8 org.jetbrains.kotlinkotlin-stdlib-jdk7 + + org.lz4 + lz4-java + + + * + * + + + + org.wildfly.security.mp wildfly-elytron-jwt @@ -647,6 +670,17 @@ + + org.xerial.snappy + snappy-java + + + * + * + + + + ${full.maven.groupId} wildfly-microprofile-config-smallrye @@ -780,7 +814,6 @@ ${full.maven.groupId} wildfly-microprofile-telemetry-cdi-provider - diff --git a/galleon-pack/galleon-shared/src/main/resources/license/licenses.xml b/galleon-pack/galleon-shared/src/main/resources/license/licenses.xml index ae5f7b739130..97f8491f8e29 100644 --- a/galleon-pack/galleon-shared/src/main/resources/license/licenses.xml +++ b/galleon-pack/galleon-shared/src/main/resources/license/licenses.xml @@ -28,6 +28,17 @@ + + com.github.luben + zstd-jni + + + The BSD License + http://repository.jboss.org/licenses/bsd.txt + repo + + + com.google.api.grpc proto-google-common-protos @@ -961,6 +972,17 @@ + + org.lz4 + lz4-java + + + Apache License 2.0 + http://www.apache.org/licenses/LICENSE-2.0 + repo + + + ${full.maven.groupId} wildfly-microprofile-config-smallrye @@ -1236,5 +1258,16 @@ + + org.xerial.snappy + snappy-java + + + Apache License 2.0 + http://www.apache.org/licenses/LICENSE-2.0 + repo + + + diff --git a/galleon-pack/galleon-shared/src/main/resources/modules/system/layers/base/com/github/luben/zstd-jni/main/module.xml b/galleon-pack/galleon-shared/src/main/resources/modules/system/layers/base/com/github/luben/zstd-jni/main/module.xml new file mode 100644 index 000000000000..924ecbb3615b --- /dev/null +++ b/galleon-pack/galleon-shared/src/main/resources/modules/system/layers/base/com/github/luben/zstd-jni/main/module.xml @@ -0,0 +1,32 @@ + + + + + + + + + + + + \ No newline at end of file diff --git a/galleon-pack/galleon-shared/src/main/resources/modules/system/layers/base/org/apache/kafka/client/main/module.xml b/galleon-pack/galleon-shared/src/main/resources/modules/system/layers/base/org/apache/kafka/client/main/module.xml index 5baeb57c361e..6fb864393ea4 100644 --- a/galleon-pack/galleon-shared/src/main/resources/modules/system/layers/base/org/apache/kafka/client/main/module.xml +++ b/galleon-pack/galleon-shared/src/main/resources/modules/system/layers/base/org/apache/kafka/client/main/module.xml @@ -16,9 +16,12 @@ + + + \ No newline at end of file diff --git a/galleon-pack/galleon-shared/src/main/resources/modules/system/layers/base/org/lz4/lz4-java/main/module.xml b/galleon-pack/galleon-shared/src/main/resources/modules/system/layers/base/org/lz4/lz4-java/main/module.xml new file mode 100644 index 000000000000..90636e326db9 --- /dev/null +++ b/galleon-pack/galleon-shared/src/main/resources/modules/system/layers/base/org/lz4/lz4-java/main/module.xml @@ -0,0 +1,33 @@ + + + + + + + + + + + + + \ No newline at end of file diff --git a/galleon-pack/galleon-shared/src/main/resources/modules/system/layers/base/org/xerial/snappy/snappy-java/main/module.xml b/galleon-pack/galleon-shared/src/main/resources/modules/system/layers/base/org/xerial/snappy/snappy-java/main/module.xml new file mode 100644 index 000000000000..dd0f89a6ae01 --- /dev/null +++ b/galleon-pack/galleon-shared/src/main/resources/modules/system/layers/base/org/xerial/snappy/snappy-java/main/module.xml @@ -0,0 +1,32 @@ + + + + + + + + + + + + \ No newline at end of file diff --git a/pom.xml b/pom.xml index d6b94e18efa3..3e43af18cd1d 100644 --- a/pom.xml +++ b/pom.xml @@ -372,6 +372,7 @@ 1.8 1.9 1.1 + 1.5.2-1 2.0.1 2.8.9 32.1.2-jre @@ -535,6 +536,7 @@ 2.1.0 18.0.2 1.11 + 1.8.0 4.2.0 9.5 1.0.4 @@ -546,6 +548,7 @@ 1.0.0.Final 2.20.126 1.0.1 + 1.1.8.4 ${version.org.glassfish.jaxb} 1.6.3 1.2 diff --git a/testsuite/integration/microprofile/pom.xml b/testsuite/integration/microprofile/pom.xml index 71948e448e25..5db51340f2d7 100644 --- a/testsuite/integration/microprofile/pom.xml +++ b/testsuite/integration/microprofile/pom.xml @@ -226,6 +226,24 @@ ${version.org.apache.kafka} test + + + com.github.luben + zstd-jni + test + + + + org.lz4 + lz4-java + test + + + + org.xerial.snappy + snappy-java + test + org.wildfly.core wildfly-cli diff --git a/testsuite/integration/microprofile/src/test/java/org/wildfly/test/integration/microprofile/reactive/messaging/kafka/compression/CompressionMessagingBean.java b/testsuite/integration/microprofile/src/test/java/org/wildfly/test/integration/microprofile/reactive/messaging/kafka/compression/CompressionMessagingBean.java new file mode 100644 index 000000000000..b9a6d8507698 --- /dev/null +++ b/testsuite/integration/microprofile/src/test/java/org/wildfly/test/integration/microprofile/reactive/messaging/kafka/compression/CompressionMessagingBean.java @@ -0,0 +1,102 @@ +/* + * JBoss, Home of Professional Open Source. + * Copyright 2023, Red Hat, Inc., and individual contributors + * as indicated by the @author tags. See the copyright.txt file in the + * distribution for a full listing of individual contributors. + * + * This is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as + * published by the Free Software Foundation; either version 2.1 of + * the License, or (at your option) any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this software; if not, write to the Free + * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA + * 02110-1301 USA, or see the FSF site: http://www.fsf.org. + */ + +package org.wildfly.test.integration.microprofile.reactive.messaging.kafka.compression; + +import io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata; +import io.smallrye.reactive.messaging.kafka.api.KafkaMetadataUtil; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import org.eclipse.microprofile.reactive.messaging.Channel; +import org.eclipse.microprofile.reactive.messaging.Emitter; +import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.eclipse.microprofile.reactive.messaging.Message; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.CountDownLatch; + +/** + * @author Kabir Khan + */ +@ApplicationScoped +public class CompressionMessagingBean { + private final CountDownLatch latch = new CountDownLatch(4); + private List words = new ArrayList<>(); + + public CountDownLatch getLatch() { + return latch; + } + + @Channel("to-kafka-gzip") + @Inject + Emitter gzipEmitter; + + @Channel("to-kafka-snappy") + @Inject + Emitter snappyEmitter; + + @Channel("to-kafka-lz4") + @Inject + Emitter lz4Emitter; + + @Channel("to-kafka-zstd") + @Inject + Emitter zstdEmitter; + + @Incoming("from-kafka") + public CompletionStage sink(Message message) { + IncomingKafkaRecordMetadata metadata = KafkaMetadataUtil.readIncomingKafkaMetadata(message).get(); + words.add(message.getPayload()); + latch.countDown(); + return message.ack(); + } + + public List getWords() { + return words; + } + + public void sendGzip(String...words) { + for (String word : words) { + gzipEmitter.send(word); + } + } + + public void sendSnappy(String...words) { + for (String word : words) { + snappyEmitter.send(word); + } + } + + public void sendLz4(String...words) { + for (String word : words) { + lz4Emitter.send(word); + } + } + + public void sendZstd(String...words) { + for (String word : words) { + zstdEmitter.send(word); + } + } +} diff --git a/testsuite/integration/microprofile/src/test/java/org/wildfly/test/integration/microprofile/reactive/messaging/kafka/compression/ReactiveMessagingKafkaCompressionTestCase.java b/testsuite/integration/microprofile/src/test/java/org/wildfly/test/integration/microprofile/reactive/messaging/kafka/compression/ReactiveMessagingKafkaCompressionTestCase.java new file mode 100644 index 000000000000..b793badac7ca --- /dev/null +++ b/testsuite/integration/microprofile/src/test/java/org/wildfly/test/integration/microprofile/reactive/messaging/kafka/compression/ReactiveMessagingKafkaCompressionTestCase.java @@ -0,0 +1,93 @@ +/* + * JBoss, Home of Professional Open Source. + * Copyright 2023, Red Hat, Inc., and individual contributors + * as indicated by the @author tags. See the copyright.txt file in the + * distribution for a full listing of individual contributors. + * + * This is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as + * published by the Free Software Foundation; either version 2.1 of + * the License, or (at your option) any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this software; if not, write to the Free + * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA + * 02110-1301 USA, or see the FSF site: http://www.fsf.org. + */ + +package org.wildfly.test.integration.microprofile.reactive.messaging.kafka.compression; + +import jakarta.inject.Inject; +import org.jboss.arquillian.container.test.api.Deployment; +import org.jboss.arquillian.junit.Arquillian; +import org.jboss.as.arquillian.api.ServerSetup; +import org.jboss.as.test.shared.CLIServerSetupTask; +import org.jboss.as.test.shared.PermissionUtils; +import org.jboss.as.test.shared.TimeoutUtil; +import org.jboss.shrinkwrap.api.ShrinkWrap; +import org.jboss.shrinkwrap.api.asset.EmptyAsset; +import org.jboss.shrinkwrap.api.exporter.ZipExporter; +import org.jboss.shrinkwrap.api.spec.WebArchive; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.wildfly.test.integration.microprofile.reactive.EnableReactiveExtensionsSetupTask; +import org.wildfly.test.integration.microprofile.reactive.RunKafkaSetupTask; + +import java.io.File; +import java.util.Arrays; +import java.util.HashSet; +import java.util.PropertyPermission; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +/** + * @author Kabir Khan + */ +@RunWith(Arquillian.class) +@ServerSetup({RunKafkaSetupTask.class, EnableReactiveExtensionsSetupTask.class}) +public class ReactiveMessagingKafkaCompressionTestCase { + + private static final long TIMEOUT = TimeoutUtil.adjust(15000); + + @Inject + CompressionMessagingBean bean; + + + @Deployment + public static WebArchive getDeployment() { + final WebArchive webArchive = ShrinkWrap.create(WebArchive.class, "reactive-messaging-kafka-tx.war") + .addAsWebInfResource(EmptyAsset.INSTANCE, "beans.xml") + .addPackage(ReactiveMessagingKafkaCompressionTestCase.class.getPackage()) + .addClasses(RunKafkaSetupTask.class, EnableReactiveExtensionsSetupTask.class, CLIServerSetupTask.class) + .addAsWebInfResource(ReactiveMessagingKafkaCompressionTestCase.class.getPackage(), "microprofile-config.properties", "classes/META-INF/microprofile-config.properties") + .addClass(TimeoutUtil.class) + .addAsManifestResource(PermissionUtils.createPermissionsXmlAsset( + new PropertyPermission(TimeoutUtil.FACTOR_SYS_PROP, "read") + ), "permissions.xml"); + + webArchive.as(ZipExporter.class).exportTo(new File("target/test-original.war"), true); + + return webArchive; + } + + @Test + public void test() throws InterruptedException { + bean.sendGzip("Hello"); + bean.sendSnappy("World"); + bean.sendLz4("of"); + bean.sendZstd("Reactive"); + + boolean wait = bean.getLatch().await(TIMEOUT, TimeUnit.MILLISECONDS); + Assert.assertTrue("Timed out", wait); + Set expected = new HashSet<>(Arrays.asList("Hello", "World", "of", "Reactive")); + Assert.assertEquals(expected.size(), bean.getWords().size()); + Assert.assertTrue("Expected " + bean.getWords() + " to contain all of " + expected, bean.getWords().containsAll(expected)); + + } +} diff --git a/testsuite/integration/microprofile/src/test/java/org/wildfly/test/integration/microprofile/reactive/messaging/kafka/compression/microprofile-config.properties b/testsuite/integration/microprofile/src/test/java/org/wildfly/test/integration/microprofile/reactive/messaging/kafka/compression/microprofile-config.properties new file mode 100644 index 000000000000..f6a653c13517 --- /dev/null +++ b/testsuite/integration/microprofile/src/test/java/org/wildfly/test/integration/microprofile/reactive/messaging/kafka/compression/microprofile-config.properties @@ -0,0 +1,54 @@ +# +# JBoss, Home of Professional Open Source. +# Copyright 2023, Red Hat, Inc., and individual contributors +# as indicated by the @author tags. See the copyright.txt file in the +# distribution for a full listing of individual contributors. +# +# This is free software; you can redistribute it and/or modify it +# under the terms of the GNU Lesser General Public License as +# published by the Free Software Foundation; either version 2.1 of +# the License, or (at your option) any later version. +# +# This software is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this software; if not, write to the Free +# Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA +# 02110-1301 USA, or see the FSF site: http://www.fsf.org. +# + +# Configure the gzip producer +mp.messaging.outgoing.to-kafka-gzip.connector=smallrye-kafka +mp.messaging.outgoing.to-kafka-gzip.topic=testing +mp.messaging.outgoing.to-kafka-gzip.value.serializer=org.apache.kafka.common.serialization.StringSerializer +mp.messaging.outgoing.to-kafka-gzip.compression.type=gzip + +# Configure the snappy producer +mp.messaging.outgoing.to-kafka-snappy.connector=smallrye-kafka +mp.messaging.outgoing.to-kafka-snappy.topic=testing +mp.messaging.outgoing.to-kafka-snappy.value.serializer=org.apache.kafka.common.serialization.StringSerializer +mp.messaging.outgoing.to-kafka-snappy.compression.type=snappy + +# Configure the snappy producer +mp.messaging.outgoing.to-kafka-lz4.connector=smallrye-kafka +mp.messaging.outgoing.to-kafka-lz4.topic=testing +mp.messaging.outgoing.to-kafka-lz4.value.serializer=org.apache.kafka.common.serialization.StringSerializer +mp.messaging.outgoing.to-kafka-lz4.compression.type=lz4 + +# Configure the zstd producer +mp.messaging.outgoing.to-kafka-zstd.connector=smallrye-kafka +mp.messaging.outgoing.to-kafka-zstd.topic=testing +mp.messaging.outgoing.to-kafka-zstd.value.serializer=org.apache.kafka.common.serialization.StringSerializer +mp.messaging.outgoing.to-kafka-zstd.compression.type=zstd + +# Configure the consumer +mp.messaging.incoming.from-kafka.connector=smallrye-kafka +mp.messaging.incoming.from-kafka.topic=testing +mp.messaging.incoming.from-kafka.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer + +# Needed as per https://github.com/smallrye/smallrye-reactive-messaging/issues/845 since the consumer +# joins after the messages are sent +mp.messaging.incoming.from-kafka.auto.offset.reset=earliest