From a42211a15118babb970bcdc83fc49de435da116d Mon Sep 17 00:00:00 2001 From: Erik van Oosten Date: Sun, 17 Dec 2023 11:35:17 +0100 Subject: [PATCH] Improve support and documentation for manual offsets (#1135) According to the javadoc of `seek`, when a given offset is invalid, it falls back to the `auto.offset.reset` configuration. In this change we allow the user to set that configuration when `Manual` offset retrieval is used. Furthermore, testing showed that we fall back to 'Auto' behavior when no offset is given. This change also contains a behavior change. For Manual offsets, when no offset is given by the user, before we would skip the records from the first poll. After this change those are no longer skipped. In addition, add documentation on how to configure offset retrieval. Note: this change is source compatible, but not binary compatible. --- .../src/test/scala/zio/kafka/Benchmarks.scala | 14 +- .../zio/kafka/consumer/ConsumerSpec.scala | 122 ++++++++++++------ .../zio/kafka/testkit/KafkaTestUtils.scala | 5 +- .../scala/zio/kafka/consumer/Consumer.scala | 9 +- .../zio/kafka/consumer/ConsumerSettings.scala | 46 +++++-- .../zio/kafka/consumer/internal/Runloop.scala | 11 +- 6 files changed, 148 insertions(+), 59 deletions(-) diff --git a/zio-kafka-test/src/test/scala/zio/kafka/Benchmarks.scala b/zio-kafka-test/src/test/scala/zio/kafka/Benchmarks.scala index 9fea9d1dc..ab5ffa0ef 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/Benchmarks.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/Benchmarks.scala @@ -1,14 +1,18 @@ package zio.kafka +import org.apache.kafka.clients.consumer.{ ConsumerConfig, KafkaConsumer } import org.apache.kafka.clients.producer.ProducerRecord -import zio.{ System => _, _ }, zio.stream._ +import org.apache.kafka.clients.producer.ProducerConfig +import org.apache.kafka.common.serialization.StringDeserializer +import zio.{ System => _, _ } +import zio.stream._ import zio.kafka.producer._ import zio.kafka.serde._ -import 
org.apache.kafka.clients.producer.ProducerConfig -import org.apache.kafka.clients.consumer.{ ConsumerConfig, KafkaConsumer } +import zio.kafka.consumer.Consumer.{ AutoOffsetStrategy, OffsetRetrieval } + import scala.jdk.CollectionConverters._ import java.time.Duration -import org.apache.kafka.common.serialization.StringDeserializer + import java.util.concurrent.TimeUnit object PopulateTopic extends ZIOAppDefault { @@ -79,7 +83,7 @@ object ZIOKafka extends ZIOAppDefault { val expectedCount = 1000000 val settings = ConsumerSettings(List("localhost:9092")) .withGroupId(s"zio-kafka-${scala.util.Random.nextInt()}") - .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") + .withOffsetRetrieval(OffsetRetrieval.Auto(AutoOffsetStrategy.Earliest)) .withProperty("fetch.min.bytes", "128000") .withPollTimeout(50.millis) diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala index f55c8404f..6697272ca 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala @@ -607,44 +607,92 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { }.flatten .map(diagnosticEvents => assert(diagnosticEvents.size)(isGreaterThanEqualTo(2))) }, - test("support manual seeking") { - val nrRecords = 10 - val data = (1 to nrRecords).toList.map(i => s"key$i" -> s"msg$i") - val manualOffsetSeek = 3 - - for { - topic <- randomTopic - group <- randomGroup - client1 <- randomClient - client2 <- randomClient + suite("support manual seeking") { + val manualOffsetSeek = 3L + def manualSeekTest(defaultStrategy: AutoOffsetStrategy): ZIO[Kafka & Producer, Throwable, Map[Int, Long]] = { + val nrRecords = 10 + val data = (1 to nrRecords).map(i => s"key$i" -> s"msg$i") + val nrPartitions = 4 + for { + topic <- randomTopic + group <- randomGroup + client1 <- randomClient + client2 <- randomClient + _ 
<- ZIO.attempt(EmbeddedKafka.createCustomTopic(topic, partitions = nrPartitions)) + produceToAll = ZIO.foreachDiscard(0 until nrPartitions)(p => produceMany(topic, p, data)) + _ <- produceToAll + // Consume 5 records from partitions 0, 1 and 2 (not 3). This sets the current offset to 5. + _ <- ZIO + .foreachDiscard(Chunk(0, 1, 2)) { partition => + Consumer + .plainStream(Subscription.manual(topic, partition), Serde.string, Serde.string) + .take(5) + .transduce(ZSink.collectAllN[CommittableRecord[String, String]](5)) + .mapConcatZIO { committableRecords => + val records = committableRecords.map(_.record) + val offsetBatch = OffsetBatch(committableRecords.map(_.offset)) + offsetBatch.commit.as(records) + } + .runCollect + } + .provideSomeLayer[Kafka](consumer(client1, Some(group))) + // Start a new consumer with manual offsets. The given offset per partition is: + // p0: 3, before the current offset => should consume from the given offset + // p1: _Maxvalue_, offset out of range (invalid) => should consume using default strategy + // p2: _nothing_ given => should consume from the committed offset + // p3: _nothing_ given, no committed offset => should consume using default strategy + offsetRetrieval = OffsetRetrieval.Manual( + getOffsets = _ => + ZIO.attempt( + Map( + new TopicPartition(topic, 0) -> manualOffsetSeek, + new TopicPartition(topic, 1) -> Long.MaxValue + ) + ), + defaultStrategy = defaultStrategy + ) + // Start the second consumer. + c2Fib <- Consumer + .plainStream(Subscription.topics(topic), Serde.string, Serde.string) + .runFoldWhile(Map.empty[Int, Long])(_.size < nrPartitions) { case (acc, record) => + if (acc.contains(record.partition)) acc + else acc + (record.partition -> record.offset.offset) + } + .provideSomeLayer[Kafka]( + consumer(client2, Some(group), offsetRetrieval = offsetRetrieval) + ) + .fork + // For defaultStrategy 'latest' the second consumer won't see a record until we produce some more. 
+ produceFib <- produceToAll + .repeat(Schedule.spaced(1.second)) + .when(defaultStrategy == AutoOffsetStrategy.Latest) + .fork + c2Results <- c2Fib.join + _ <- produceFib.interrupt + } yield c2Results + } - _ <- produceMany(topic, 0, data) - // Consume 5 records to have the offset committed at 5 - _ <- Consumer - .plainStream(Subscription.topics(topic), Serde.string, Serde.string) - .take(5) - .transduce(ZSink.collectAllN[CommittableRecord[String, String]](5)) - .mapConcatZIO { committableRecords => - val records = committableRecords.map(_.record) - val offsetBatch = OffsetBatch(committableRecords.map(_.offset)) - - offsetBatch.commit.as(records) - } - .runCollect - .provideSomeLayer[Kafka](consumer(client1, Some(group))) - // Start a new consumer with manual offset before the committed offset - offsetRetrieval = OffsetRetrieval.Manual(tps => ZIO.attempt(tps.map(_ -> manualOffsetSeek.toLong).toMap)) - secondResults <- Consumer - .plainStream(Subscription.topics(topic), Serde.string, Serde.string) - .take(nrRecords.toLong - manualOffsetSeek) - .map(_.record) - .runCollect - .provideSomeLayer[Kafka]( - consumer(client2, Some(group), offsetRetrieval = offsetRetrieval) - ) - // Check that we only got the records starting from the manually seek'd offset - } yield assert(secondResults.map(rec => rec.key() -> rec.value()).toList)( - equalTo(data.drop(manualOffsetSeek)) + Seq( + test("manual seek with earliest default strategy") { + for { + consumeStartOffsets <- manualSeekTest(AutoOffsetStrategy.Earliest) + } yield assertTrue( + consumeStartOffsets(0) == manualOffsetSeek, + consumeStartOffsets(1) == 0L, + consumeStartOffsets(2) == 5L, + consumeStartOffsets(3) == 0L + ) + }, + test("manual seek with latest default strategy") { + for { + consumeStartOffsets <- manualSeekTest(AutoOffsetStrategy.Latest) + } yield assertTrue( + consumeStartOffsets(0) == manualOffsetSeek, + consumeStartOffsets(1) >= 10L, + consumeStartOffsets(2) == 5L, + consumeStartOffsets(3) >= 10L + ) + } ) 
}, test("commit offsets for all consumed messages") { diff --git a/zio-kafka-testkit/src/main/scala/zio/kafka/testkit/KafkaTestUtils.scala b/zio-kafka-testkit/src/main/scala/zio/kafka/testkit/KafkaTestUtils.scala index 7ba3c28dd..f19509457 100644 --- a/zio-kafka-testkit/src/main/scala/zio/kafka/testkit/KafkaTestUtils.scala +++ b/zio-kafka-testkit/src/main/scala/zio/kafka/testkit/KafkaTestUtils.scala @@ -114,7 +114,7 @@ object KafkaTestUtils { groupId: Option[String] = None, clientInstanceId: Option[String] = None, allowAutoCreateTopics: Boolean = true, - offsetRetrieval: OffsetRetrieval = OffsetRetrieval.Auto(), + offsetRetrieval: OffsetRetrieval = OffsetRetrieval.Auto(AutoOffsetStrategy.Earliest), restartStreamOnRebalancing: Boolean = false, rebalanceSafeCommits: Boolean = false, maxRebalanceDuration: Duration = 3.minutes, @@ -132,7 +132,6 @@ object KafkaTestUtils { .withMaxPollRecords(`max.poll.records`) .withCommitTimeout(commitTimeout) .withProperties( - ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest", ConsumerConfig.METADATA_MAX_AGE_CONFIG -> "100", ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG -> "3000", ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG -> "1000", @@ -156,7 +155,7 @@ object KafkaTestUtils { clientId: String, clientInstanceId: Option[String] = None, allowAutoCreateTopics: Boolean = true, - offsetRetrieval: OffsetRetrieval = OffsetRetrieval.Auto(), + offsetRetrieval: OffsetRetrieval = OffsetRetrieval.Auto(AutoOffsetStrategy.Earliest), restartStreamOnRebalancing: Boolean = false, rebalanceSafeCommits: Boolean = false, properties: Map[String, String] = Map.empty diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala index b14e759e6..fc3f17ada 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala @@ -401,10 +401,14 @@ object Consumer { def metrics: RIO[Consumer, Map[MetricName, Metric]] = 
ZIO.serviceWithZIO(_.metrics) + /** See ConsumerSettings.withOffsetRetrieval. */ sealed trait OffsetRetrieval object OffsetRetrieval { - final case class Auto(reset: AutoOffsetStrategy = AutoOffsetStrategy.Latest) extends OffsetRetrieval - final case class Manual(getOffsets: Set[TopicPartition] => Task[Map[TopicPartition, Long]]) extends OffsetRetrieval + final case class Auto(reset: AutoOffsetStrategy = AutoOffsetStrategy.Latest) extends OffsetRetrieval + final case class Manual( + getOffsets: Set[TopicPartition] => Task[Map[TopicPartition, Long]], + defaultStrategy: AutoOffsetStrategy = AutoOffsetStrategy.Latest + ) extends OffsetRetrieval } sealed trait AutoOffsetStrategy { self => @@ -415,6 +419,7 @@ object Consumer { } } + /** See ConsumerSettings.withOffsetRetrieval. */ object AutoOffsetStrategy { case object Earliest extends AutoOffsetStrategy case object Latest extends AutoOffsetStrategy diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala index b42483b83..a7c10ed06 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala @@ -32,10 +32,6 @@ final case class ConsumerSettings( maxRebalanceDuration: Option[Duration] = None, fetchStrategy: FetchStrategy = QueueSizeBasedFetchStrategy() ) { - private[this] def autoOffsetResetConfig: Map[String, String] = offsetRetrieval match { - case OffsetRetrieval.Auto(reset) => Map(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> reset.toConfig) - case OffsetRetrieval.Manual(_) => Map.empty - } /** * Tunes the consumer for high throughput. 
@@ -74,9 +70,7 @@ final case class ConsumerSettings( .withFetchStrategy(QueueSizeBasedFetchStrategy(partitionPreFetchBufferLimit = 512)) def driverSettings: Map[String, AnyRef] = - Map( - ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "false" - ) ++ autoOffsetResetConfig ++ properties + Map(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "false") ++ properties def withBootstrapServers(servers: List[String]): ConsumerSettings = withProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers.mkString(",")) @@ -99,8 +93,44 @@ final case class ConsumerSettings( def withGroupInstanceId(groupInstanceId: String): ConsumerSettings = withProperty(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupInstanceId) - def withOffsetRetrieval(retrieval: OffsetRetrieval): ConsumerSettings = + /** + * Which offset to start consuming from for new partitions. + * + * The options are: + * {{{ + * import zio.kafka.consumer.Consumer._ + * OffsetRetrieval.Auto(AutoOffsetStrategy.Latest) // the default + * OffsetRetrieval.Auto(AutoOffsetStrategy.Earliest) + * OffsetRetrieval.Auto(AutoOffsetStrategy.None) + * OffsetRetrieval.Manual(getOffsets, defaultStrategy) + * }}} + * + * The `Auto` options make consuming start from the latest committed offset. When no committed offset is available, + * the given offset strategy is used and consuming starts from the `Latest` offset (the default), the `Earliest` + * offset, or results in an error for `None`. + * + * The `Manual` option allows fine-grained control over which offset to consume from. The provided `getOffsets` + * function should return an offset for each topic-partition that is being assigned. When the returned offset is + * smaller than the log start offset or larger than the log end offset, the `defaultStrategy` is used and consuming + * starts from the `Latest` offset (the default), the `Earliest` offset, or results in an error for `None`. 
+ * + * When the returned map does ''not'' contain an entry for a topic-partition, the consumer will continue from the last + * committed offset. When no committed offset is available, the `defaultStrategy` is used and consuming starts from + * the `Latest` offset (the default), the `Earliest` offset, or results in an error for `None`. + * + * This configuration applies to both subscribed and assigned partitions. + * + * This method sets the `auto.offset.reset` Kafka configuration. See + * https://kafka.apache.org/documentation/#consumerconfigs_auto.offset.reset for more information. + */ + def withOffsetRetrieval(retrieval: OffsetRetrieval): ConsumerSettings = { + val resetStrategy = retrieval match { + case OffsetRetrieval.Auto(reset) => reset + case OffsetRetrieval.Manual(_, defaultStrategy) => defaultStrategy + } copy(offsetRetrieval = retrieval) + .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, resetStrategy.toConfig) + } /** * The maximum time to block while polling the Kafka consumer. 
The Kafka consumer will return earlier when the maximum diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index 1b4fd79c4..1a2983f79 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -386,15 +386,18 @@ private[consumer] final class Runloop private ( if (hasGroupId) consumer.runloopAccess(c => ZIO.attempt(c.groupMetadata())).fold(_ => None, Some(_)) else ZIO.none + /** @return the topic-partitions for which received records should be ignored */ private def doSeekForNewPartitions(c: ByteArrayKafkaConsumer, tps: Set[TopicPartition]): Task[Set[TopicPartition]] = offsetRetrieval match { case OffsetRetrieval.Auto(_) => ZIO.succeed(Set.empty) - case OffsetRetrieval.Manual(getOffsets) => + case OffsetRetrieval.Manual(getOffsets, _) => if (tps.isEmpty) ZIO.succeed(Set.empty) else - getOffsets(tps) - .flatMap(offsets => ZIO.attempt(offsets.foreach { case (tp, offset) => c.seek(tp, offset) })) - .as(tps) + getOffsets(tps).flatMap { offsets => + ZIO + .attempt(offsets.foreach { case (tp, offset) => c.seek(tp, offset) }) + .as(offsets.keySet) + } } /**