Improve support and documentation for manual offsets (#1135)
According to the javadoc of `seek`, when a given offset is invalid, the consumer falls back to the `auto.offset.reset` configuration. With this change we allow the user to set that configuration when `Manual` offset retrieval is used. Furthermore, testing showed that the consumer also falls back to 'Auto' behavior when no offset is given for a partition.

This change also alters behavior: with `Manual` offsets, when the user provides no offset for a partition, the records from the first poll used to be skipped. After this change they are no longer skipped.

In addition, add documentation on how to configure offset retrieval.

Note: this change is source compatible, but not binary compatible.
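
For illustration, a minimal sketch of the configuration surface this commit touches (the broker address, group id, and `fetchStoredOffsets` lookup are hypothetical; `OffsetRetrieval.Manual` and its new `defaultStrategy` parameter come from this change):

import org.apache.kafka.common.TopicPartition
import zio._
import zio.kafka.consumer.Consumer.{ AutoOffsetStrategy, OffsetRetrieval }
import zio.kafka.consumer.ConsumerSettings

// Hypothetical lookup of externally stored offsets. Partitions missing from
// the returned map continue from the committed offset; when there is no
// committed offset, or when the returned offset is invalid, `defaultStrategy`
// decides where consumption starts.
def fetchStoredOffsets(tps: Set[TopicPartition]): Task[Map[TopicPartition, Long]] =
  ZIO.succeed(Map.empty)

val settings: ConsumerSettings =
  ConsumerSettings(List("localhost:9092"))
    .withGroupId("example-group")
    .withOffsetRetrieval(
      OffsetRetrieval.Manual(
        getOffsets = fetchStoredOffsets,
        defaultStrategy = AutoOffsetStrategy.Earliest
      )
    )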
erikvanoosten authored Dec 17, 2023
1 parent dfcef72 commit a42211a
Showing 6 changed files with 148 additions and 59 deletions.
14 changes: 9 additions & 5 deletions zio-kafka-test/src/test/scala/zio/kafka/Benchmarks.scala
@@ -1,14 +1,18 @@
 package zio.kafka
 
+import org.apache.kafka.clients.consumer.{ ConsumerConfig, KafkaConsumer }
 import org.apache.kafka.clients.producer.ProducerRecord
-import zio.{ System => _, _ }, zio.stream._
+import org.apache.kafka.clients.producer.ProducerConfig
+import org.apache.kafka.common.serialization.StringDeserializer
+import zio.{ System => _, _ }
+import zio.stream._
 import zio.kafka.producer._
 import zio.kafka.serde._
-import org.apache.kafka.clients.producer.ProducerConfig
-import org.apache.kafka.clients.consumer.{ ConsumerConfig, KafkaConsumer }
+import zio.kafka.consumer.Consumer.{ AutoOffsetStrategy, OffsetRetrieval }
+
 import scala.jdk.CollectionConverters._
 import java.time.Duration
-import org.apache.kafka.common.serialization.StringDeserializer
+
 import java.util.concurrent.TimeUnit
 
 object PopulateTopic extends ZIOAppDefault {
@@ -79,7 +83,7 @@ object ZIOKafka extends ZIOAppDefault {
 val expectedCount = 1000000
 val settings = ConsumerSettings(List("localhost:9092"))
 .withGroupId(s"zio-kafka-${scala.util.Random.nextInt()}")
-.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+.withOffsetRetrieval(OffsetRetrieval.Auto(AutoOffsetStrategy.Earliest))
 .withProperty("fetch.min.bytes", "128000")
 .withPollTimeout(50.millis)
 
Expand Down
122 changes: 85 additions & 37 deletions zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala
@@ -607,44 +607,92 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
 }.flatten
 .map(diagnosticEvents => assert(diagnosticEvents.size)(isGreaterThanEqualTo(2)))
 },
-test("support manual seeking") {
-val nrRecords = 10
-val data = (1 to nrRecords).toList.map(i => s"key$i" -> s"msg$i")
-val manualOffsetSeek = 3
-
-for {
-topic <- randomTopic
-group <- randomGroup
-client1 <- randomClient
-client2 <- randomClient
+suite("support manual seeking") {
+val manualOffsetSeek = 3L
+def manualSeekTest(defaultStrategy: AutoOffsetStrategy): ZIO[Kafka & Producer, Throwable, Map[Int, Long]] = {
+val nrRecords = 10
+val data = (1 to nrRecords).map(i => s"key$i" -> s"msg$i")
+val nrPartitions = 4
+for {
+topic <- randomTopic
+group <- randomGroup
+client1 <- randomClient
+client2 <- randomClient
+_ <- ZIO.attempt(EmbeddedKafka.createCustomTopic(topic, partitions = nrPartitions))
+produceToAll = ZIO.foreachDiscard(0 until nrPartitions)(p => produceMany(topic, p, data))
+_ <- produceToAll
+// Consume 5 records from partitions 0, 1 and 2 (not 3). This sets the current offset to 5.
+_ <- ZIO
+.foreachDiscard(Chunk(0, 1, 2)) { partition =>
+Consumer
+.plainStream(Subscription.manual(topic, partition), Serde.string, Serde.string)
+.take(5)
+.transduce(ZSink.collectAllN[CommittableRecord[String, String]](5))
+.mapConcatZIO { committableRecords =>
+val records = committableRecords.map(_.record)
+val offsetBatch = OffsetBatch(committableRecords.map(_.offset))
+offsetBatch.commit.as(records)
+}
+.runCollect
+}
+.provideSomeLayer[Kafka](consumer(client1, Some(group)))
+// Start a new consumer with manual offsets. The given offset per partition is:
+// p0: 3, before the current offset => should consume from the given offset
+// p1: Long.MaxValue, offset out of range (invalid) => should consume using the default strategy
+// p2: _nothing_ given => should consume from the committed offset
+// p3: _nothing_ given, no committed offset => should consume using the default strategy
+offsetRetrieval = OffsetRetrieval.Manual(
+getOffsets = _ =>
+ZIO.attempt(
+Map(
+new TopicPartition(topic, 0) -> manualOffsetSeek,
+new TopicPartition(topic, 1) -> Long.MaxValue
+)
+),
+defaultStrategy = defaultStrategy
+)
+// Start the second consumer.
+c2Fib <- Consumer
+.plainStream(Subscription.topics(topic), Serde.string, Serde.string)
+.runFoldWhile(Map.empty[Int, Long])(_.size < nrPartitions) { case (acc, record) =>
+if (acc.contains(record.partition)) acc
+else acc + (record.partition -> record.offset.offset)
+}
+.provideSomeLayer[Kafka](
+consumer(client2, Some(group), offsetRetrieval = offsetRetrieval)
+)
+.fork
+// For defaultStrategy 'latest' the second consumer won't see a record until we produce some more.
+produceFib <- produceToAll
+.repeat(Schedule.spaced(1.second))
+.when(defaultStrategy == AutoOffsetStrategy.Latest)
+.fork
+c2Results <- c2Fib.join
+_ <- produceFib.interrupt
+} yield c2Results
+}
 
-_ <- produceMany(topic, 0, data)
-// Consume 5 records to have the offset committed at 5
-_ <- Consumer
-.plainStream(Subscription.topics(topic), Serde.string, Serde.string)
-.take(5)
-.transduce(ZSink.collectAllN[CommittableRecord[String, String]](5))
-.mapConcatZIO { committableRecords =>
-val records = committableRecords.map(_.record)
-val offsetBatch = OffsetBatch(committableRecords.map(_.offset))
-
-offsetBatch.commit.as(records)
-}
-.runCollect
-.provideSomeLayer[Kafka](consumer(client1, Some(group)))
-// Start a new consumer with manual offset before the committed offset
-offsetRetrieval = OffsetRetrieval.Manual(tps => ZIO.attempt(tps.map(_ -> manualOffsetSeek.toLong).toMap))
-secondResults <- Consumer
-.plainStream(Subscription.topics(topic), Serde.string, Serde.string)
-.take(nrRecords.toLong - manualOffsetSeek)
-.map(_.record)
-.runCollect
-.provideSomeLayer[Kafka](
-consumer(client2, Some(group), offsetRetrieval = offsetRetrieval)
-)
-// Check that we only got the records starting from the manually seek'd offset
-} yield assert(secondResults.map(rec => rec.key() -> rec.value()).toList)(
-equalTo(data.drop(manualOffsetSeek))
+Seq(
+test("manual seek with earliest default strategy") {
+for {
+consumeStartOffsets <- manualSeekTest(AutoOffsetStrategy.Earliest)
+} yield assertTrue(
+consumeStartOffsets(0) == manualOffsetSeek,
+consumeStartOffsets(1) == 0L,
+consumeStartOffsets(2) == 5L,
+consumeStartOffsets(3) == 0L
+)
+},
+test("manual seek with latest default strategy") {
+for {
+consumeStartOffsets <- manualSeekTest(AutoOffsetStrategy.Latest)
+} yield assertTrue(
+consumeStartOffsets(0) == manualOffsetSeek,
+consumeStartOffsets(1) >= 10L,
+consumeStartOffsets(2) == 5L,
+consumeStartOffsets(3) >= 10L
+)
+}
+)
 )
 },
 test("commit offsets for all consumed messages") {
zio-kafka-test/src/test/scala/zio/kafka/KafkaTestUtils.scala
@@ -114,7 +114,7 @@ object KafkaTestUtils {
 groupId: Option[String] = None,
 clientInstanceId: Option[String] = None,
 allowAutoCreateTopics: Boolean = true,
-offsetRetrieval: OffsetRetrieval = OffsetRetrieval.Auto(),
+offsetRetrieval: OffsetRetrieval = OffsetRetrieval.Auto(AutoOffsetStrategy.Earliest),
 restartStreamOnRebalancing: Boolean = false,
 rebalanceSafeCommits: Boolean = false,
 maxRebalanceDuration: Duration = 3.minutes,
@@ -132,7 +132,6 @@
 .withMaxPollRecords(`max.poll.records`)
 .withCommitTimeout(commitTimeout)
 .withProperties(
-ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest",
 ConsumerConfig.METADATA_MAX_AGE_CONFIG -> "100",
 ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG -> "3000",
 ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG -> "1000",
@@ -156,7 +155,7 @@
 clientId: String,
 clientInstanceId: Option[String] = None,
 allowAutoCreateTopics: Boolean = true,
-offsetRetrieval: OffsetRetrieval = OffsetRetrieval.Auto(),
+offsetRetrieval: OffsetRetrieval = OffsetRetrieval.Auto(AutoOffsetStrategy.Earliest),
 restartStreamOnRebalancing: Boolean = false,
 rebalanceSafeCommits: Boolean = false,
 properties: Map[String, String] = Map.empty
9 changes: 7 additions & 2 deletions zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala
@@ -401,10 +401,14 @@ object Consumer {
 def metrics: RIO[Consumer, Map[MetricName, Metric]] =
 ZIO.serviceWithZIO(_.metrics)
 
+/** See ConsumerSettings.withOffsetRetrieval. */
 sealed trait OffsetRetrieval
 object OffsetRetrieval {
-final case class Auto(reset: AutoOffsetStrategy = AutoOffsetStrategy.Latest) extends OffsetRetrieval
-final case class Manual(getOffsets: Set[TopicPartition] => Task[Map[TopicPartition, Long]]) extends OffsetRetrieval
+final case class Auto(reset: AutoOffsetStrategy = AutoOffsetStrategy.Latest) extends OffsetRetrieval
+final case class Manual(
+getOffsets: Set[TopicPartition] => Task[Map[TopicPartition, Long]],
+defaultStrategy: AutoOffsetStrategy = AutoOffsetStrategy.Latest
+) extends OffsetRetrieval
 }
 
 sealed trait AutoOffsetStrategy { self =>
@@ -415,6 +419,7 @@
 }
 }
 
+/** See ConsumerSettings.withOffsetRetrieval. */
 object AutoOffsetStrategy {
 case object Earliest extends AutoOffsetStrategy
 case object Latest extends AutoOffsetStrategy
46 changes: 38 additions & 8 deletions zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala
@@ -32,10 +32,6 @@ final case class ConsumerSettings(
 maxRebalanceDuration: Option[Duration] = None,
 fetchStrategy: FetchStrategy = QueueSizeBasedFetchStrategy()
 ) {
-private[this] def autoOffsetResetConfig: Map[String, String] = offsetRetrieval match {
-case OffsetRetrieval.Auto(reset) => Map(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> reset.toConfig)
-case OffsetRetrieval.Manual(_) => Map.empty
-}
 
 /**
 * Tunes the consumer for high throughput.
@@ -74,9 +70,7 @@ final case class ConsumerSettings(
 .withFetchStrategy(QueueSizeBasedFetchStrategy(partitionPreFetchBufferLimit = 512))
 
 def driverSettings: Map[String, AnyRef] =
-Map(
-ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "false"
-) ++ autoOffsetResetConfig ++ properties
+Map(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "false") ++ properties
 
 def withBootstrapServers(servers: List[String]): ConsumerSettings =
 withProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers.mkString(","))
@@ -99,8 +93,44 @@
 def withGroupInstanceId(groupInstanceId: String): ConsumerSettings =
 withProperty(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupInstanceId)
 
-def withOffsetRetrieval(retrieval: OffsetRetrieval): ConsumerSettings =
+/**
+ * Which offset to start consuming from for new partitions.
+ *
+ * The options are:
+ * {{{
+ *   import zio.kafka.consumer.Consumer._
+ *   OffsetRetrieval.Auto(AutoOffsetStrategy.Latest) // the default
+ *   OffsetRetrieval.Auto(AutoOffsetStrategy.Earliest)
+ *   OffsetRetrieval.Auto(AutoOffsetStrategy.None)
+ *   OffsetRetrieval.Manual(getOffsets, defaultStrategy)
+ * }}}
+ *
+ * The `Auto` options make consuming start from the last committed offset. When no committed offset is available,
+ * the given offset strategy is used and consuming starts from the `Latest` offset (the default), the `Earliest`
+ * offset, or results in an error for `None`.
+ *
+ * The `Manual` option allows fine-grained control over which offset to consume from. The provided `getOffsets`
+ * function should return an offset for each topic-partition that is being assigned. When the returned offset is
+ * smaller than the log start offset or larger than the log end offset, the `defaultStrategy` is used and consuming
+ * starts from the `Latest` offset (the default), the `Earliest` offset, or results in an error for `None`.
+ *
+ * When the returned map does ''not'' contain an entry for a topic-partition, the consumer will continue from the
+ * last committed offset. When no committed offset is available, the `defaultStrategy` is used and consuming starts
+ * from the `Latest` offset (the default), the `Earliest` offset, or results in an error for `None`.
+ *
+ * This configuration applies to both subscribed and assigned partitions.
+ *
+ * This method sets the `auto.offset.reset` Kafka configuration. See
+ * https://kafka.apache.org/documentation/#consumerconfigs_auto.offset.reset for more information.
+ */
+def withOffsetRetrieval(retrieval: OffsetRetrieval): ConsumerSettings = {
+val resetStrategy = retrieval match {
+case OffsetRetrieval.Auto(reset) => reset
+case OffsetRetrieval.Manual(_, defaultStrategy) => defaultStrategy
+}
 copy(offsetRetrieval = retrieval)
+.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, resetStrategy.toConfig)
+}
 
 /**
 * The maximum time to block while polling the Kafka consumer. The Kafka consumer will return earlier when the maximum
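As a usage note for the scaladoc above, a short sketch (broker address and offsets are hypothetical) of how each retrieval mode ends up in the `auto.offset.reset` driver property:

import zio._
import zio.kafka.consumer.Consumer.{ AutoOffsetStrategy, OffsetRetrieval }
import zio.kafka.consumer.ConsumerSettings

// `Auto` puts its strategy into `auto.offset.reset` directly:
val autoSettings: ConsumerSettings =
  ConsumerSettings(List("localhost:9092"))
    .withOffsetRetrieval(OffsetRetrieval.Auto(AutoOffsetStrategy.Earliest))
// autoSettings.driverSettings now contains "auto.offset.reset" -> "earliest"

// `Manual` contributes its `defaultStrategy`, the fallback the broker client
// uses when a seeked offset turns out to be invalid:
val manualSettings: ConsumerSettings =
  ConsumerSettings(List("localhost:9092"))
    .withOffsetRetrieval(
      OffsetRetrieval.Manual(
        getOffsets = tps => ZIO.succeed(tps.map(_ -> 0L).toMap), // hypothetical offsets
        defaultStrategy = AutoOffsetStrategy.Latest
      )
    )
// manualSettings.driverSettings now contains "auto.offset.reset" -> "latest"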
zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala
@@ -386,15 +386,18 @@ private[consumer] final class Runloop private (
 if (hasGroupId) consumer.runloopAccess(c => ZIO.attempt(c.groupMetadata())).fold(_ => None, Some(_))
 else ZIO.none
 
+/** @return the topic-partitions for which received records should be ignored */
 private def doSeekForNewPartitions(c: ByteArrayKafkaConsumer, tps: Set[TopicPartition]): Task[Set[TopicPartition]] =
 offsetRetrieval match {
 case OffsetRetrieval.Auto(_) => ZIO.succeed(Set.empty)
-case OffsetRetrieval.Manual(getOffsets) =>
+case OffsetRetrieval.Manual(getOffsets, _) =>
 if (tps.isEmpty) ZIO.succeed(Set.empty)
 else
-getOffsets(tps)
-.flatMap(offsets => ZIO.attempt(offsets.foreach { case (tp, offset) => c.seek(tp, offset) }))
-.as(tps)
+getOffsets(tps).flatMap { offsets =>
+ZIO
+.attempt(offsets.foreach { case (tp, offset) => c.seek(tp, offset) })
+.as(offsets.keySet)
+}
 }
 
 /**
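Note how this hunk realizes the behavior change from the commit message: `doSeekForNewPartitions` now returns `offsets.keySet` (only the partitions that were actually seeked) instead of `tps` (all newly assigned partitions), so records from the first poll are ignored only for partitions with an explicitly given offset; partitions without a given offset keep their first-poll records.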
