Skip to content

Commit

Permalink
Enable the consumer to commit an offset with metadata (#1067)
Browse files Browse the repository at this point in the history
Fixes #1066
  • Loading branch information
flavienbert authored Oct 27, 2023
1 parent 8981706 commit 35fdcc9
Show file tree
Hide file tree
Showing 8 changed files with 79 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,29 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
.provideSomeLayer[Kafka](consumer(client, Some(group)))
} yield assert(offsets.values.map(_.map(_.offset)))(forall(isSome(equalTo(nrMessages.toLong / nrPartitions))))
},
// Regression test (fixes #1066): committing an offset with attached metadata.
// The metadata string supplied via `Offset.withMetadata` should survive the
// commit and be returned when the committed offset is read back via
// `Consumer.committed`.
test("commits an offset with metadata") {
for {
// Fresh, randomized fixtures so the test is isolated from other spec cases.
topic <- randomTopic
group <- randomGroup
// Random metadata string to attach to the committed offset.
metadata <- randomThing("metadata")
client <- randomClient
// Single-partition topic: the whole test deals with partition 0 only.
_ <- ZIO.attempt(EmbeddedKafka.createCustomTopic(topic, partitions = 1))
_ <- produceOne(topic, "key", "msg")

// Consume messages
subscription = Subscription.topics(topic)
// Consume one record, attach `metadata` to its offset, batch and commit it,
// then fetch what the broker recorded for partition 0 of the topic.
offsets <- (Consumer
.partitionedStream(subscription, Serde.string, Serde.string)
.flatMap(_._2.map(_.offset.withMetadata(metadata)))
.take(1)
.transduce(Consumer.offsetBatches)
.take(1)
.mapZIO(_.commit)
.runDrain *>
Consumer.committed(Set(new TopicPartition(topic, 0))))
.provideSomeLayer[Kafka](consumer(client, Some(group)))
// The committed offset's metadata must round-trip unchanged.
} yield assert(offsets.values.headOption.flatten.map(_.metadata))(isSome(equalTo(metadata)))
},
test("handle rebalancing by completing topic-partition streams") {
val nrMessages = 50
val nrPartitions = 6 // Must be even and strictly positive
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package zio.kafka.consumer

import org.apache.kafka.clients.consumer.{ ConsumerGroupMetadata, ConsumerRecord }
import org.apache.kafka.clients.consumer.{ ConsumerGroupMetadata, ConsumerRecord, OffsetAndMetadata }
import org.apache.kafka.common.TopicPartition
import zio.kafka.serde.Deserializer
import zio.{ RIO, Task }

final case class CommittableRecord[K, V](
record: ConsumerRecord[K, V],
private val commitHandle: Map[TopicPartition, Long] => Task[Unit],
private val commitHandle: Map[TopicPartition, OffsetAndMetadata] => Task[Unit],
private val consumerGroupMetadata: Option[ConsumerGroupMetadata]
) {
def deserializeWith[R, K1, V1](
Expand Down Expand Up @@ -44,14 +44,15 @@ final case class CommittableRecord[K, V](
partition = record.partition(),
offset = record.offset(),
commitHandle = commitHandle,
consumerGroupMetadata = consumerGroupMetadata
consumerGroupMetadata = consumerGroupMetadata,
metadata = None
)
}

object CommittableRecord {
def apply[K, V](
record: ConsumerRecord[K, V],
commitHandle: Map[TopicPartition, Long] => Task[Unit],
commitHandle: Map[TopicPartition, OffsetAndMetadata] => Task[Unit],
consumerGroupMetadata: Option[ConsumerGroupMetadata]
): CommittableRecord[K, V] =
new CommittableRecord(
Expand Down
21 changes: 16 additions & 5 deletions zio-kafka/src/main/scala/zio/kafka/consumer/Offset.scala
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
package zio.kafka.consumer

import org.apache.kafka.clients.consumer.{ ConsumerGroupMetadata, RetriableCommitFailedException }
import org.apache.kafka.clients.consumer.{ ConsumerGroupMetadata, OffsetAndMetadata, RetriableCommitFailedException }
import org.apache.kafka.common.TopicPartition
import zio.{ RIO, Schedule, Task }

sealed trait Offset {

def topic: String
def partition: Int
def offset: Long
def commit: Task[Unit]
def batch: OffsetBatch
def consumerGroupMetadata: Option[ConsumerGroupMetadata]
def withMetadata(metadata: String): Offset

private[consumer] def metadata: Option[String]
private[consumer] def asJavaOffsetAndMetadata: OffsetAndMetadata = new OffsetAndMetadata(offset, metadata.orNull)

/**
* Attempts to commit and retries according to the given policy when the commit fails with a
Expand Down Expand Up @@ -39,9 +44,15 @@ private final case class OffsetImpl(
topic: String,
partition: Int,
offset: Long,
commitHandle: Map[TopicPartition, Long] => Task[Unit],
consumerGroupMetadata: Option[ConsumerGroupMetadata]
commitHandle: Map[TopicPartition, OffsetAndMetadata] => Task[Unit],
consumerGroupMetadata: Option[ConsumerGroupMetadata],
metadata: Option[String] = None
) extends Offset {
def commit: Task[Unit] = commitHandle(Map(topicPartition -> offset))
def batch: OffsetBatch = OffsetBatchImpl(Map(topicPartition -> offset), commitHandle, consumerGroupMetadata)
def commit: Task[Unit] = commitHandle(Map(topicPartition -> asJavaOffsetAndMetadata))
def batch: OffsetBatch = OffsetBatchImpl(
Map(topicPartition -> asJavaOffsetAndMetadata),
commitHandle,
consumerGroupMetadata
)
def withMetadata(metadata: String): OffsetImpl = copy(metadata = Some(metadata))
}
31 changes: 19 additions & 12 deletions zio-kafka/src/main/scala/zio/kafka/consumer/OffsetBatch.scala
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package zio.kafka.consumer

import org.apache.kafka.clients.consumer.ConsumerGroupMetadata
import org.apache.kafka.clients.consumer.{ ConsumerGroupMetadata, OffsetAndMetadata }
import org.apache.kafka.common.TopicPartition
import zio.{ RIO, Schedule, Task, ZIO }

sealed trait OffsetBatch {
def offsets: Map[TopicPartition, Long]
def offsets: Map[TopicPartition, OffsetAndMetadata]
def commit: Task[Unit]
def add(offset: Offset): OffsetBatch
@deprecated("Use add(Offset) instead", "2.1.4")
Expand All @@ -28,35 +28,42 @@ object OffsetBatch {
}

private final case class OffsetBatchImpl(
offsets: Map[TopicPartition, Long],
commitHandle: Map[TopicPartition, Long] => Task[Unit],
offsets: Map[TopicPartition, OffsetAndMetadata],
commitHandle: Map[TopicPartition, OffsetAndMetadata] => Task[Unit],
consumerGroupMetadata: Option[ConsumerGroupMetadata]
) extends OffsetBatch {
override def commit: Task[Unit] = commitHandle(offsets)

override def add(offset: Offset): OffsetBatch =
override def add(offset: Offset): OffsetBatch = {
val maxOffsetAndMetadata = offsets.get(offset.topicPartition) match {
case Some(existing) if existing.offset > offset.offset => existing
case _ => offset.asJavaOffsetAndMetadata
}

copy(
offsets = offsets + (offset.topicPartition -> (offsets
.getOrElse(offset.topicPartition, -1L) max offset.offset))
offsets = offsets + (offset.topicPartition -> maxOffsetAndMetadata)
)
}

override def merge(offset: Offset): OffsetBatch = add(offset)

override def merge(otherOffsets: OffsetBatch): OffsetBatch = {
val newOffsets = Map.newBuilder[TopicPartition, Long]
val newOffsets = Map.newBuilder[TopicPartition, OffsetAndMetadata]
newOffsets ++= offsets
otherOffsets.offsets.foreach { case (tp, offset) =>
val existing = offsets.getOrElse(tp, -1L)
if (existing < offset)
newOffsets += tp -> offset
val laterOffset = offsets.get(tp) match {
case Some(existing) => if (existing.offset < offset.offset) offset else existing
case None => offset
}
newOffsets += tp -> laterOffset
}

copy(offsets = newOffsets.result())
}
}

case object EmptyOffsetBatch extends OffsetBatch {
override val offsets: Map[TopicPartition, Long] = Map.empty
override val offsets: Map[TopicPartition, OffsetAndMetadata] = Map.empty
override val commit: Task[Unit] = ZIO.unit
override def add(offset: Offset): OffsetBatch = offset.batch
override def merge(offset: Offset): OffsetBatch = add(offset)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ object DiagnosticEvent {

sealed trait Commit extends DiagnosticEvent
object Commit {
final case class Started(offsets: Map[TopicPartition, Long]) extends Commit
final case class Started(offsets: Map[TopicPartition, OffsetAndMetadata]) extends Commit
final case class Success(offsets: Map[TopicPartition, OffsetAndMetadata]) extends Commit
final case class Failure(offsets: Map[TopicPartition, OffsetAndMetadata], cause: Throwable) extends Commit
}
Expand Down
17 changes: 11 additions & 6 deletions zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ private[consumer] final class Runloop private (
}

/** This is the implementation behind the user facing api `Offset.commit`. */
private val commit: Map[TopicPartition, Long] => Task[Unit] =
private val commit: Map[TopicPartition, OffsetAndMetadata] => Task[Unit] =
offsets =>
for {
p <- Promise.make[Throwable, Unit]
Expand All @@ -127,16 +127,21 @@ private[consumer] final class Runloop private (
commits: Chunk[RunloopCommand.Commit]
): (JavaMap[TopicPartition, OffsetAndMetadata], OffsetCommitCallback, Throwable => UIO[Unit]) = {
val offsets = commits
.foldLeft(mutable.Map.empty[TopicPartition, Long]) { case (acc, commit) =>
.foldLeft(mutable.Map.empty[TopicPartition, OffsetAndMetadata]) { case (acc, commit) =>
commit.offsets.foreach { case (tp, offset) =>
acc += (tp -> acc.get(tp).map(_ max offset).getOrElse(offset))
acc += (tp -> acc
.get(tp)
.map(current => if (current.offset() > offset.offset()) current else offset)
.getOrElse(offset))
}
acc
}
.toMap
val offsetsWithMetaData = offsets.map { case (tp, offset) => tp -> new OffsetAndMetadata(offset + 1) }
val cont = (e: Exit[Throwable, Unit]) => ZIO.foreachDiscard(commits)(_.cont.done(e))
val onSuccess = cont(Exit.unit) <* diagnostics.emit(DiagnosticEvent.Commit.Success(offsetsWithMetaData))
val offsetsWithMetaData = offsets.map { case (tp, offset) =>
tp -> new OffsetAndMetadata(offset.offset + 1, offset.leaderEpoch, offset.metadata)
}
val cont = (e: Exit[Throwable, Unit]) => ZIO.foreachDiscard(commits)(_.cont.done(e))
val onSuccess = cont(Exit.unit) <* diagnostics.emit(DiagnosticEvent.Commit.Success(offsetsWithMetaData))
val onFailure: Throwable => UIO[Unit] = {
case _: RebalanceInProgressException =>
for {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package zio.kafka.consumer.internal

import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition
import zio._
import zio.kafka.consumer.{ InvalidSubscriptionUnion, Subscription }
Expand All @@ -19,7 +20,8 @@ object RunloopCommand {
case object StopRunloop extends Control
case object StopAllStreams extends StreamCommand

final case class Commit(offsets: Map[TopicPartition, Long], cont: Promise[Throwable, Unit]) extends RunloopCommand {
final case class Commit(offsets: Map[TopicPartition, OffsetAndMetadata], cont: Promise[Throwable, Unit])
extends RunloopCommand {
@inline def isDone: UIO[Boolean] = cont.isDone
@inline def isPending: UIO[Boolean] = isDone.negate
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ object TransactionalProducer {
case Some(consumerGroupMetadata) =>
val offsets: util.Map[TopicPartition, OffsetAndMetadata] =
offsetBatch.offsets.map { case (topicPartition, offset) =>
topicPartition -> new OffsetAndMetadata(offset + 1)
topicPartition -> new OffsetAndMetadata(offset.offset + 1, offset.metadata)
}.asJava

ZIO.attemptBlocking(live.p.sendOffsetsToTransaction(offsets, consumerGroupMetadata))
Expand Down

0 comments on commit 35fdcc9

Please sign in to comment.