From 9ada445c46b2bcf2454991245fba073ec4315279 Mon Sep 17 00:00:00 2001 From: Erik van Oosten Date: Sat, 28 Oct 2023 10:29:50 +0200 Subject: [PATCH] Keep track of last pulled offset (#1086) For #830 we need to keep track of which records were pulled from the stream so that we can wait for them to be committed. This change prepares for that. The idea is that the rebalance listener first ends the streams for revoked partitions, then it will get the last consumed offset using the now public `completedPromise`. With that info, it can await these commits to complete. The advantage of this approach is that it is very precise; we only await offset commits for records that we know were pulled from the stream. Alternative implementation would track the given offsets inside Runloop where we do not have the exact knowledge of what was pulled or not. --- .../zio/kafka/consumer/ConsumerSpec.scala | 2 +- .../internal/PartitionStreamControl.scala | 50 +++++++++++++------ 2 files changed, 37 insertions(+), 15 deletions(-) diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala index 5ad68a1f3..63cfc269d 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala @@ -346,7 +346,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { // The runner for GitHub Actions is a bit underpowered. The machine is so busy that the logic // that detects the timeout doesn't get the chance to execute quickly enough. To compensate we // sleep a huge amount of time: - .tap(r => ZIO.sleep(10.seconds).when(r.key == "key3")) + .tap(r => ZIO.sleep(20.seconds).when(r.key == "key3")) // Use `take` to ensure the test ends quickly, even when the interrupt fails to occur. // Because of chunking, we need to pull more than 3 records before the interrupt kicks in. .take(100) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala index 554b19952..f1ab25db1 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala @@ -1,6 +1,7 @@ package zio.kafka.consumer.internal import org.apache.kafka.common.TopicPartition +import zio.kafka.consumer.Offset import zio.kafka.consumer.diagnostics.{ DiagnosticEvent, Diagnostics } import zio.kafka.consumer.internal.PartitionStreamControl.QueueInfo import zio.kafka.consumer.internal.Runloop.ByteArrayCommittableRecord @@ -15,12 +16,30 @@ abstract class PartitionStream { def queueSize: UIO[Int] } +/** + * Provides control and information over a stream that consumes from a partition. + * + * @param tp + * topic and partition + * @param stream + * the stream + * @param dataQueue + * the queue the stream reads data from + * @param interruptionPromise + * a promise that when completed stops the stream + * @param completedPromise + * the last pulled offset (if any). The promise completes when the stream completed. + * @param queueInfoRef + * used to track the stream's pull deadline, its queue size, and last pulled offset + * @param maxPollInterval + * see [[zio.kafka.consumer.ConsumerSettings.withMaxPollInterval()]] + */ final class PartitionStreamControl private ( val tp: TopicPartition, stream: ZStream[Any, Throwable, ByteArrayCommittableRecord], dataQueue: Queue[Take[Throwable, ByteArrayCommittableRecord]], interruptionPromise: Promise[Throwable, Unit], - completedPromise: Promise[Nothing, Unit], + val completedPromise: Promise[Nothing, Option[Offset]], queueInfoRef: Ref[QueueInfo], maxPollInterval: Duration ) extends PartitionStream { @@ -97,20 +116,20 @@ object PartitionStreamControl { ): UIO[PartitionStreamControl] = { val maxPollIntervalNanos = maxPollInterval.toNanos - def registerPull(queueInfo: Ref[QueueInfo], recordCount: Int): UIO[Unit] = + def registerPull(queueInfo: Ref[QueueInfo], records: Chunk[ByteArrayCommittableRecord]): UIO[Unit] = for { now <- Clock.nanoTime newPullDeadline = now + maxPollIntervalNanos - _ <- queueInfo.update(_.withPull(newPullDeadline, recordCount)) + _ <- queueInfo.update(_.withPull(newPullDeadline, records)) } yield () for { _ <- ZIO.logDebug(s"Creating partition stream ${tp.toString}") interruptionPromise <- Promise.make[Throwable, Unit] - completedPromise <- Promise.make[Nothing, Unit] + completedPromise <- Promise.make[Nothing, Option[Offset]] dataQueue <- Queue.unbounded[Take[Throwable, ByteArrayCommittableRecord]] now <- Clock.nanoTime - queueInfo <- Ref.make(QueueInfo(now, 0)) + queueInfo <- Ref.make(QueueInfo(now, 0, None)) requestAndAwaitData = for { _ <- commandQueue.offer(RunloopCommand.Request(tp)) @@ -122,16 +141,19 @@ object PartitionStreamControl { LogAnnotation("topic", tp.topic()), LogAnnotation("partition", tp.partition().toString) ) *> - ZStream.finalizer( - completedPromise.succeed(()) <* - ZIO.logDebug(s"Partition stream ${tp.toString} has ended") - ) *> + ZStream.finalizer { + for { + qi <- queueInfo.get + _ <- completedPromise.succeed(qi.lastPulledOffset) + _ <- ZIO.logDebug(s"Partition stream ${tp.toString} has ended") + } yield () + } *> ZStream.repeatZIOChunk { // First try to take all records that are available right now. // When no data is available, request more data and await its arrival. dataQueue.takeAll.flatMap(data => if (data.isEmpty) requestAndAwaitData else ZIO.succeed(data)) }.flattenTake - .chunksWith(_.tap(records => registerPull(queueInfo, records.size))) + .chunksWith(_.tap(records => registerPull(queueInfo, records))) .interruptWhen(interruptionPromise) } yield new PartitionStreamControl( tp, @@ -146,12 +168,12 @@ object PartitionStreamControl { // The `pullDeadline` is only relevant when `size > 0`. We initialize `pullDeadline` as soon as size goes above 0. // (Note that theoretically `size` can go below 0 when the update operations are reordered.) - private final case class QueueInfo(pullDeadline: NanoTime, size: Int) { + private final case class QueueInfo(pullDeadline: NanoTime, size: Int, lastPulledOffset: Option[Offset]) { def withOffer(newPullDeadline: NanoTime, recordCount: Int): QueueInfo = - QueueInfo(if (size <= 0) newPullDeadline else pullDeadline, size + recordCount) + QueueInfo(if (size <= 0) newPullDeadline else pullDeadline, size + recordCount, lastPulledOffset) - def withPull(newPullDeadline: NanoTime, recordCount: Int): QueueInfo = - QueueInfo(newPullDeadline, size - recordCount) + def withPull(newPullDeadline: NanoTime, records: Chunk[ByteArrayCommittableRecord]): QueueInfo = + QueueInfo(newPullDeadline, size - records.size, records.lastOption.map(_.offset).orElse(lastPulledOffset)) def deadlineExceeded(now: NanoTime): Boolean = size > 0 && pullDeadline <= now