Skip to content

Commit

Permalink
Keep track of last pulled offset (#1086)
Browse files Browse the repository at this point in the history
For #830 we need to keep track of which records were pulled from the stream so that we can wait for them to be committed. This change prepares for that.

The idea is that the rebalance listener first ends the streams for revoked partitions, then it will get the last consumed offset using the now public `completedPromise`. With that info, it can await these commits to complete.

The advantage of this approach is that it is very precise; we only await offset commits for records that we know were pulled from the stream. Alternative implementation would track the given offsets inside Runloop where we do not have the exact knowledge of what was pulled or not.
  • Loading branch information
erikvanoosten authored Oct 28, 2023
1 parent 35fdcc9 commit 9ada445
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
// The runner for GitHub Actions is a bit underpowered. The machine is so busy that the logic
// that detects the timeout doesn't get the chance to execute quickly enough. To compensate we
// sleep a huge amount of time:
.tap(r => ZIO.sleep(10.seconds).when(r.key == "key3"))
.tap(r => ZIO.sleep(20.seconds).when(r.key == "key3"))
// Use `take` to ensure the test ends quickly, even when the interrupt fails to occur.
// Because of chunking, we need to pull more than 3 records before the interrupt kicks in.
.take(100)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package zio.kafka.consumer.internal

import org.apache.kafka.common.TopicPartition
import zio.kafka.consumer.Offset
import zio.kafka.consumer.diagnostics.{ DiagnosticEvent, Diagnostics }
import zio.kafka.consumer.internal.PartitionStreamControl.QueueInfo
import zio.kafka.consumer.internal.Runloop.ByteArrayCommittableRecord
Expand All @@ -15,12 +16,30 @@ abstract class PartitionStream {
def queueSize: UIO[Int]
}

/**
* Provides control and information over a stream that consumes from a partition.
*
* @param tp
* topic and partition
* @param stream
* the stream
* @param dataQueue
* the queue the stream reads data from
* @param interruptionPromise
* a promise that when completed stops the stream
* @param completedPromise
* the last pulled offset (if any). The promise completes when the stream completed.
* @param queueInfoRef
* used to track the stream's pull deadline, its queue size, and last pulled offset
* @param maxPollInterval
* see [[zio.kafka.consumer.ConsumerSettings.withMaxPollInterval()]]
*/
final class PartitionStreamControl private (
val tp: TopicPartition,
stream: ZStream[Any, Throwable, ByteArrayCommittableRecord],
dataQueue: Queue[Take[Throwable, ByteArrayCommittableRecord]],
interruptionPromise: Promise[Throwable, Unit],
completedPromise: Promise[Nothing, Unit],
val completedPromise: Promise[Nothing, Option[Offset]],
queueInfoRef: Ref[QueueInfo],
maxPollInterval: Duration
) extends PartitionStream {
Expand Down Expand Up @@ -97,20 +116,20 @@ object PartitionStreamControl {
): UIO[PartitionStreamControl] = {
val maxPollIntervalNanos = maxPollInterval.toNanos

def registerPull(queueInfo: Ref[QueueInfo], recordCount: Int): UIO[Unit] =
def registerPull(queueInfo: Ref[QueueInfo], records: Chunk[ByteArrayCommittableRecord]): UIO[Unit] =
for {
now <- Clock.nanoTime
newPullDeadline = now + maxPollIntervalNanos
_ <- queueInfo.update(_.withPull(newPullDeadline, recordCount))
_ <- queueInfo.update(_.withPull(newPullDeadline, records))
} yield ()

for {
_ <- ZIO.logDebug(s"Creating partition stream ${tp.toString}")
interruptionPromise <- Promise.make[Throwable, Unit]
completedPromise <- Promise.make[Nothing, Unit]
completedPromise <- Promise.make[Nothing, Option[Offset]]
dataQueue <- Queue.unbounded[Take[Throwable, ByteArrayCommittableRecord]]
now <- Clock.nanoTime
queueInfo <- Ref.make(QueueInfo(now, 0))
queueInfo <- Ref.make(QueueInfo(now, 0, None))
requestAndAwaitData =
for {
_ <- commandQueue.offer(RunloopCommand.Request(tp))
Expand All @@ -122,16 +141,19 @@ object PartitionStreamControl {
LogAnnotation("topic", tp.topic()),
LogAnnotation("partition", tp.partition().toString)
) *>
ZStream.finalizer(
completedPromise.succeed(()) <*
ZIO.logDebug(s"Partition stream ${tp.toString} has ended")
) *>
ZStream.finalizer {
for {
qi <- queueInfo.get
_ <- completedPromise.succeed(qi.lastPulledOffset)
_ <- ZIO.logDebug(s"Partition stream ${tp.toString} has ended")
} yield ()
} *>
ZStream.repeatZIOChunk {
// First try to take all records that are available right now.
// When no data is available, request more data and await its arrival.
dataQueue.takeAll.flatMap(data => if (data.isEmpty) requestAndAwaitData else ZIO.succeed(data))
}.flattenTake
.chunksWith(_.tap(records => registerPull(queueInfo, records.size)))
.chunksWith(_.tap(records => registerPull(queueInfo, records)))
.interruptWhen(interruptionPromise)
} yield new PartitionStreamControl(
tp,
Expand All @@ -146,12 +168,12 @@ object PartitionStreamControl {

// The `pullDeadline` is only relevant when `size > 0`. We initialize `pullDeadline` as soon as size goes above 0.
// (Note that theoretically `size` can go below 0 when the update operations are reordered.)
private final case class QueueInfo(pullDeadline: NanoTime, size: Int) {
private final case class QueueInfo(pullDeadline: NanoTime, size: Int, lastPulledOffset: Option[Offset]) {
def withOffer(newPullDeadline: NanoTime, recordCount: Int): QueueInfo =
QueueInfo(if (size <= 0) newPullDeadline else pullDeadline, size + recordCount)
QueueInfo(if (size <= 0) newPullDeadline else pullDeadline, size + recordCount, lastPulledOffset)

def withPull(newPullDeadline: NanoTime, recordCount: Int): QueueInfo =
QueueInfo(newPullDeadline, size - recordCount)
def withPull(newPullDeadline: NanoTime, records: Chunk[ByteArrayCommittableRecord]): QueueInfo =
QueueInfo(newPullDeadline, size - records.size, records.lastOption.map(_.offset).orElse(lastPulledOffset))

def deadlineExceeded(now: NanoTime): Boolean =
size > 0 && pullDeadline <= now
Expand Down

0 comments on commit 9ada445

Please sign in to comment.