diff --git a/zio-kafka-test/src/test/scala/zio/kafka/Benchmarks.scala b/zio-kafka-test/src/test/scala/zio/kafka/Benchmarks.scala
index 9fea9d1dc..ab5ffa0ef 100644
--- a/zio-kafka-test/src/test/scala/zio/kafka/Benchmarks.scala
+++ b/zio-kafka-test/src/test/scala/zio/kafka/Benchmarks.scala
@@ -1,14 +1,18 @@
 package zio.kafka

+import org.apache.kafka.clients.consumer.{ ConsumerConfig, KafkaConsumer }
 import org.apache.kafka.clients.producer.ProducerRecord
-import zio.{ System => _, _ }, zio.stream._
+import org.apache.kafka.clients.producer.ProducerConfig
+import org.apache.kafka.common.serialization.StringDeserializer
+import zio.{ System => _, _ }
+import zio.stream._
 import zio.kafka.producer._
 import zio.kafka.serde._
-import org.apache.kafka.clients.producer.ProducerConfig
-import org.apache.kafka.clients.consumer.{ ConsumerConfig, KafkaConsumer }
+import zio.kafka.consumer.Consumer.{ AutoOffsetStrategy, OffsetRetrieval }
+
 import scala.jdk.CollectionConverters._
 import java.time.Duration
-import org.apache.kafka.common.serialization.StringDeserializer
+
 import java.util.concurrent.TimeUnit

 object PopulateTopic extends ZIOAppDefault {
@@ -79,7 +83,7 @@ object ZIOKafka extends ZIOAppDefault {
   val expectedCount = 1000000
   val settings = ConsumerSettings(List("localhost:9092"))
     .withGroupId(s"zio-kafka-${scala.util.Random.nextInt()}")
-    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+    .withOffsetRetrieval(OffsetRetrieval.Auto(AutoOffsetStrategy.Earliest))
     .withProperty("fetch.min.bytes", "128000")
     .withPollTimeout(50.millis)

diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala
index f55c8404f..6697272ca 100644
--- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala
+++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala
@@ -607,44 +607,92 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
         }.flatten
           .map(diagnosticEvents => assert(diagnosticEvents.size)(isGreaterThanEqualTo(2)))
       },
-      test("support manual seeking") {
-        val nrRecords        = 10
-        val data             = (1 to nrRecords).toList.map(i => s"key$i" -> s"msg$i")
-        val manualOffsetSeek = 3
-
-        for {
-          topic   <- randomTopic
-          group   <- randomGroup
-          client1 <- randomClient
-          client2 <- randomClient
+      suite("support manual seeking") {
+        val manualOffsetSeek = 3L
+        def manualSeekTest(defaultStrategy: AutoOffsetStrategy): ZIO[Kafka & Producer, Throwable, Map[Int, Long]] = {
+          val nrRecords    = 10
+          val data         = (1 to nrRecords).map(i => s"key$i" -> s"msg$i")
+          val nrPartitions = 4
+          for {
+            topic   <- randomTopic
+            group   <- randomGroup
+            client1 <- randomClient
+            client2 <- randomClient
+            _       <- ZIO.attempt(EmbeddedKafka.createCustomTopic(topic, partitions = nrPartitions))
+            produceToAll = ZIO.foreachDiscard(0 until nrPartitions)(p => produceMany(topic, p, data))
+            _ <- produceToAll
+            // Consume 5 records from partitions 0, 1 and 2 (not 3). This sets the committed offset to 5.
+            _ <- ZIO
+                   .foreachDiscard(Chunk(0, 1, 2)) { partition =>
+                     Consumer
+                       .plainStream(Subscription.manual(topic, partition), Serde.string, Serde.string)
+                       .take(5)
+                       .transduce(ZSink.collectAllN[CommittableRecord[String, String]](5))
+                       .mapConcatZIO { committableRecords =>
+                         val records     = committableRecords.map(_.record)
+                         val offsetBatch = OffsetBatch(committableRecords.map(_.offset))

+                         offsetBatch.commit.as(records)
+                       }
+                       .runCollect
+                   }
+                   .provideSomeLayer[Kafka](consumer(client1, Some(group)))
+            // Start a new consumer with manual offsets. The given offset per partition is:
+            //   p0: 3, before the committed offset       => should consume from the given offset
+            //   p1: Long.MaxValue, offset out of range   => should consume using default strategy
+            //   p2: _nothing_ given                      => should consume from the committed offset
+            //   p3: _nothing_ given, nothing committed   => should consume using default strategy
+            offsetRetrieval = OffsetRetrieval.Manual(
+              getOffsets = _ =>
+                ZIO.attempt(
+                  Map(
+                    new TopicPartition(topic, 0) -> manualOffsetSeek,
+                    new TopicPartition(topic, 1) -> Long.MaxValue
+                  )
+                ),
+              defaultStrategy = defaultStrategy
+            )
+            // Start the second consumer.
+            c2Fib <- Consumer
+                       .plainStream(Subscription.topics(topic), Serde.string, Serde.string)
+                       .runFoldWhile(Map.empty[Int, Long])(_.size < nrPartitions) { case (acc, record) =>
+                         if (acc.contains(record.partition)) acc
+                         else acc + (record.partition -> record.offset.offset)
+                       }
+                       .provideSomeLayer[Kafka](
+                         consumer(client2, Some(group), offsetRetrieval = offsetRetrieval)
+                       )
+                       .fork
+            // For defaultStrategy 'Latest' the second consumer won't see a record until we produce some more.
+            produceFib <- produceToAll
+                            .repeat(Schedule.spaced(1.second))
+                            .when(defaultStrategy == AutoOffsetStrategy.Latest)
+                            .fork
+            c2Results <- c2Fib.join
+            _         <- produceFib.interrupt
+          } yield c2Results
+        }
-
-          _ <- produceMany(topic, 0, data)
-          // Consume 5 records to have the offset committed at 5
-          _ <- Consumer
-                 .plainStream(Subscription.topics(topic), Serde.string, Serde.string)
-                 .take(5)
-                 .transduce(ZSink.collectAllN[CommittableRecord[String, String]](5))
-                 .mapConcatZIO { committableRecords =>
-                   val records     = committableRecords.map(_.record)
-                   val offsetBatch = OffsetBatch(committableRecords.map(_.offset))
-
-                   offsetBatch.commit.as(records)
-                 }
-                 .runCollect
-                 .provideSomeLayer[Kafka](consumer(client1, Some(group)))
-          // Start a new consumer with manual offset before the committed offset
-          offsetRetrieval = OffsetRetrieval.Manual(tps => ZIO.attempt(tps.map(_ -> manualOffsetSeek.toLong).toMap))
-          secondResults <- Consumer
-                             .plainStream(Subscription.topics(topic), Serde.string, Serde.string)
-                             .take(nrRecords.toLong - manualOffsetSeek)
-                             .map(_.record)
-                             .runCollect
-                             .provideSomeLayer[Kafka](
-                               consumer(client2, Some(group), offsetRetrieval = offsetRetrieval)
-                             )
-          // Check that we only got the records starting from the manually seek'd offset
-        } yield assert(secondResults.map(rec => rec.key() -> rec.value()).toList)(
-          equalTo(data.drop(manualOffsetSeek))
+        Seq(
+          test("manual seek with earliest default strategy") {
+            for {
+              consumeStartOffsets <- manualSeekTest(AutoOffsetStrategy.Earliest)
+            } yield assertTrue(
+              consumeStartOffsets(0) == manualOffsetSeek,
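The rewritten test pins down, per partition, which offset the second consumer starts from (the p0-p3 cases above). The same resolution rules as a compact standalone sketch; `StartOffset` and `resolveStartOffset` are names invented for this illustration, they are not zio-kafka API:

    sealed trait StartOffset
    final case class At(offset: Long) extends StartOffset // start exactly here
    case object UseDefaultStrategy    extends StartOffset // Earliest / Latest / None

    // A manual offset wins when it falls inside the log's offset range; otherwise
    // fall back to the committed offset, then to the default strategy
    // (auto.offset.reset).
    def resolveStartOffset(
      manual: Option[Long],    // entry returned by OffsetRetrieval.Manual.getOffsets
      committed: Option[Long], // last committed offset of the group, if any
      logStart: Long,
      logEnd: Long
    ): StartOffset = manual match {
      case Some(o) if o >= logStart && o <= logEnd => At(o)              // p0
      case Some(_)                                 => UseDefaultStrategy // p1
      case None => committed.fold[StartOffset](UseDefaultStrategy)(At.apply) // p2 (committed), p3 (not)
    }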
+              consumeStartOffsets(1) == 0L,
+              consumeStartOffsets(2) == 5L,
+              consumeStartOffsets(3) == 0L
+            )
+          },
+          test("manual seek with latest default strategy") {
+            for {
+              consumeStartOffsets <- manualSeekTest(AutoOffsetStrategy.Latest)
+            } yield assertTrue(
+              consumeStartOffsets(0) == manualOffsetSeek,
+              consumeStartOffsets(1) >= 10L,
+              consumeStartOffsets(2) == 5L,
+              consumeStartOffsets(3) >= 10L
+            )
+          }
         )
       },
       test("commit offsets for all consumed messages") {
diff --git a/zio-kafka-testkit/src/main/scala/zio/kafka/testkit/KafkaTestUtils.scala b/zio-kafka-testkit/src/main/scala/zio/kafka/testkit/KafkaTestUtils.scala
index 7ba3c28dd..f19509457 100644
--- a/zio-kafka-testkit/src/main/scala/zio/kafka/testkit/KafkaTestUtils.scala
+++ b/zio-kafka-testkit/src/main/scala/zio/kafka/testkit/KafkaTestUtils.scala
@@ -114,7 +114,7 @@ object KafkaTestUtils {
     groupId: Option[String] = None,
     clientInstanceId: Option[String] = None,
     allowAutoCreateTopics: Boolean = true,
-    offsetRetrieval: OffsetRetrieval = OffsetRetrieval.Auto(),
+    offsetRetrieval: OffsetRetrieval = OffsetRetrieval.Auto(AutoOffsetStrategy.Earliest),
     restartStreamOnRebalancing: Boolean = false,
     rebalanceSafeCommits: Boolean = false,
     maxRebalanceDuration: Duration = 3.minutes,
@@ -132,7 +132,6 @@ object KafkaTestUtils {
       .withMaxPollRecords(`max.poll.records`)
      .withCommitTimeout(commitTimeout)
       .withProperties(
-        ConsumerConfig.AUTO_OFFSET_RESET_CONFIG     -> "earliest",
         ConsumerConfig.METADATA_MAX_AGE_CONFIG      -> "100",
         ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG    -> "3000",
         ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG -> "1000",
@@ -156,7 +155,7 @@ object KafkaTestUtils {
     clientId: String,
     clientInstanceId: Option[String] = None,
     allowAutoCreateTopics: Boolean = true,
-    offsetRetrieval: OffsetRetrieval = OffsetRetrieval.Auto(),
+    offsetRetrieval: OffsetRetrieval = OffsetRetrieval.Auto(AutoOffsetStrategy.Earliest),
     restartStreamOnRebalancing: Boolean = false,
     rebalanceSafeCommits: Boolean = false,
     properties: Map[String, String] = Map.empty
diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala
index b14e759e6..fc3f17ada 100644
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala
@@ -401,10 +401,14 @@ object Consumer {
   def metrics: RIO[Consumer, Map[MetricName, Metric]] =
     ZIO.serviceWithZIO(_.metrics)

+  /** See ConsumerSettings.withOffsetRetrieval. */
   sealed trait OffsetRetrieval
   object OffsetRetrieval {
-    final case class Auto(reset: AutoOffsetStrategy = AutoOffsetStrategy.Latest)                extends OffsetRetrieval
-    final case class Manual(getOffsets: Set[TopicPartition] => Task[Map[TopicPartition, Long]]) extends OffsetRetrieval
+    final case class Auto(reset: AutoOffsetStrategy = AutoOffsetStrategy.Latest) extends OffsetRetrieval
+    final case class Manual(
+      getOffsets: Set[TopicPartition] => Task[Map[TopicPartition, Long]],
+      defaultStrategy: AutoOffsetStrategy = AutoOffsetStrategy.Latest
+    ) extends OffsetRetrieval
   }

   sealed trait AutoOffsetStrategy { self =>
@@ -415,6 +419,7 @@ object Consumer {
     }
   }

+  /** See ConsumerSettings.withOffsetRetrieval. */
   object AutoOffsetStrategy {
     case object Earliest extends AutoOffsetStrategy
     case object Latest   extends AutoOffsetStrategy
diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala
index b42483b83..a7c10ed06 100644
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala
@@ -32,10 +32,6 @@ final case class ConsumerSettings(
   maxRebalanceDuration: Option[Duration] = None,
   fetchStrategy: FetchStrategy = QueueSizeBasedFetchStrategy()
 ) {
-  private[this] def autoOffsetResetConfig: Map[String, String] = offsetRetrieval match {
-    case OffsetRetrieval.Auto(reset) => Map(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> reset.toConfig)
-    case OffsetRetrieval.Manual(_)   => Map.empty
-  }

   /**
    * Tunes the consumer for high throughput.
@@ -74,9 +70,7 @@ final case class ConsumerSettings(
       .withFetchStrategy(QueueSizeBasedFetchStrategy(partitionPreFetchBufferLimit = 512))

   def driverSettings: Map[String, AnyRef] =
-    Map(
-      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "false"
-    ) ++ autoOffsetResetConfig ++ properties
+    Map(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "false") ++ properties

   def withBootstrapServers(servers: List[String]): ConsumerSettings =
     withProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers.mkString(","))
@@ -99,8 +93,44 @@ final case class ConsumerSettings(
   def withGroupInstanceId(groupInstanceId: String): ConsumerSettings =
     withProperty(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupInstanceId)

-  def withOffsetRetrieval(retrieval: OffsetRetrieval): ConsumerSettings =
+  /**
+   * Which offset to start consuming from for newly assigned partitions.
+   *
+   * The options are:
+   * {{{
+   *   import zio.kafka.consumer.Consumer._
+   *   OffsetRetrieval.Auto(AutoOffsetStrategy.Latest) // the default
+   *   OffsetRetrieval.Auto(AutoOffsetStrategy.Earliest)
+   *   OffsetRetrieval.Auto(AutoOffsetStrategy.None)
+   *   OffsetRetrieval.Manual(getOffsets, defaultStrategy)
+   * }}}
+   *
+   * With `Auto`, consuming starts from the last committed offset. When no committed offset is available, the given
+   * offset strategy applies: consuming starts from the `Latest` offset (the default), from the `Earliest` offset,
+   * or fails for `None`.
+   *
+   * The `Manual` option allows fine-grained control over the offset to consume from. The provided `getOffsets`
+   * function should return an offset for each topic-partition that is being assigned. When a returned offset is
+   * smaller than the log start offset or larger than the log end offset, the `defaultStrategy` applies, in the
+   * same way as the `Auto` strategies above.
+   *
+   * When the returned map does ''not'' contain an entry for a topic-partition, the consumer continues from the
+   * last committed offset for that partition. When no committed offset is available either, the `defaultStrategy`
+   * applies as well.
+   *
+   * This configuration applies to both subscribed and assigned partitions.
+   *
+   * This method sets the `auto.offset.reset` Kafka configuration. See
+   * https://kafka.apache.org/documentation/#consumerconfigs_auto.offset.reset for more information.
+   */
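A minimal usage sketch of the extended `withOffsetRetrieval`, using only constructs introduced in this patch; the broker address and group id are placeholders, and `lookupStoredOffsets` stands in for whatever external offset store an application might query:

    import org.apache.kafka.common.TopicPartition
    import zio._
    import zio.kafka.consumer._
    import zio.kafka.consumer.Consumer.{ AutoOffsetStrategy, OffsetRetrieval }

    // Hypothetical external offset store lookup. Partitions missing from the
    // returned map continue from their committed offset; out-of-range offsets
    // and partitions without a committed offset use `defaultStrategy`.
    def lookupStoredOffsets(tps: Set[TopicPartition]): Task[Map[TopicPartition, Long]] =
      ZIO.succeed(Map.empty)

    val settings: ConsumerSettings =
      ConsumerSettings(List("localhost:9092"))
        .withGroupId("my-group")
        .withOffsetRetrieval(
          OffsetRetrieval.Manual(
            getOffsets = lookupStoredOffsets,
            defaultStrategy = AutoOffsetStrategy.Earliest // also sets auto.offset.reset=earliest
          )
        )

Since `withOffsetRetrieval` now always writes `auto.offset.reset`, the `defaultStrategy` of `Manual` and the `reset` of `Auto` end up in the same Kafka consumer property.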
+  def withOffsetRetrieval(retrieval: OffsetRetrieval): ConsumerSettings = {
+    val resetStrategy = retrieval match {
+      case OffsetRetrieval.Auto(reset)                => reset
+      case OffsetRetrieval.Manual(_, defaultStrategy) => defaultStrategy
+    }
     copy(offsetRetrieval = retrieval)
+      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, resetStrategy.toConfig)
+  }

   /**
    * The maximum time to block while polling the Kafka consumer. The Kafka consumer will return earlier when the maximum
diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala
index 1b4fd79c4..1a2983f79 100644
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala
@@ -386,15 +386,18 @@ private[consumer] final class Runloop private (
     if (hasGroupId) consumer.runloopAccess(c => ZIO.attempt(c.groupMetadata())).fold(_ => None, Some(_))
     else ZIO.none

+  /** @return the topic-partitions for which received records should be ignored */
   private def doSeekForNewPartitions(c: ByteArrayKafkaConsumer, tps: Set[TopicPartition]): Task[Set[TopicPartition]] =
     offsetRetrieval match {
       case OffsetRetrieval.Auto(_) => ZIO.succeed(Set.empty)
-      case OffsetRetrieval.Manual(getOffsets) =>
+      case OffsetRetrieval.Manual(getOffsets, _) =>
         if (tps.isEmpty) ZIO.succeed(Set.empty)
         else
-          getOffsets(tps)
-            .flatMap(offsets => ZIO.attempt(offsets.foreach { case (tp, offset) => c.seek(tp, offset) }))
-            .as(tps)
+          getOffsets(tps).flatMap { offsets =>
+            ZIO
+              .attempt(offsets.foreach { case (tp, offset) => c.seek(tp, offset) })
+              .as(offsets.keySet)
+          }
     }

   /**
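A note on the `Runloop` change: because `getOffsets` may now return a partial map, only the partitions that were actually seeked can still hold records that were fetched before the seek, so the ignore-set shrinks from `tps` (all newly assigned partitions) to `offsets.keySet`. A toy illustration with invented topic and offset values:

    import org.apache.kafka.common.TopicPartition

    // getOffsets returned an offset for partition 0 only; partition 1 falls back
    // to the committed offset or to auto.offset.reset.
    val seeked: Map[TopicPartition, Long] = Map(new TopicPartition("t", 0) -> 3L)

    // Records already received for partition 0 predate the seek and must be
    // dropped; partition 1 was never seeked, so its records are still valid.
    val ignore: Set[TopicPartition] = seeked.keySet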