diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala
index 4e3b4ab45..5ad68a1f3 100644
--- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala
+++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala
@@ -416,6 +416,29 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
             .provideSomeLayer[Kafka](consumer(client, Some(group)))
         } yield assert(offsets.values.map(_.map(_.offset)))(forall(isSome(equalTo(nrMessages.toLong / nrPartitions))))
       },
+      test("commits an offset with metadata") {
+        for {
+          topic    <- randomTopic
+          group    <- randomGroup
+          metadata <- randomThing("metadata")
+          client   <- randomClient
+          _        <- ZIO.attempt(EmbeddedKafka.createCustomTopic(topic, partitions = 1))
+          _        <- produceOne(topic, "key", "msg")
+
+          // Consume messages
+          subscription = Subscription.topics(topic)
+          offsets <- (Consumer
+                       .partitionedStream(subscription, Serde.string, Serde.string)
+                       .flatMap(_._2.map(_.offset.withMetadata(metadata)))
+                       .take(1)
+                       .transduce(Consumer.offsetBatches)
+                       .take(1)
+                       .mapZIO(_.commit)
+                       .runDrain *>
+                       Consumer.committed(Set(new TopicPartition(topic, 0))))
+                      .provideSomeLayer[Kafka](consumer(client, Some(group)))
+        } yield assert(offsets.values.headOption.flatten.map(_.metadata))(isSome(equalTo(metadata)))
+      },
       test("handle rebalancing by completing topic-partition streams") {
         val nrMessages   = 50
         val nrPartitions = 6 // Must be even and strictly positive
diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/CommittableRecord.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/CommittableRecord.scala
index f9583f5a7..d138ea6d8 100644
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/CommittableRecord.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/CommittableRecord.scala
@@ -1,13 +1,13 @@
 package zio.kafka.consumer
 
-import org.apache.kafka.clients.consumer.{ ConsumerGroupMetadata, ConsumerRecord }
+import org.apache.kafka.clients.consumer.{ ConsumerGroupMetadata, ConsumerRecord, OffsetAndMetadata }
 import org.apache.kafka.common.TopicPartition
 import zio.kafka.serde.Deserializer
 import zio.{ RIO, Task }
 
 final case class CommittableRecord[K, V](
   record: ConsumerRecord[K, V],
-  private val commitHandle: Map[TopicPartition, Long] => Task[Unit],
+  private val commitHandle: Map[TopicPartition, OffsetAndMetadata] => Task[Unit],
   private val consumerGroupMetadata: Option[ConsumerGroupMetadata]
 ) {
   def deserializeWith[R, K1, V1](
@@ -44,14 +44,15 @@ final case class CommittableRecord[K, V](
       partition = record.partition(),
       offset = record.offset(),
       commitHandle = commitHandle,
-      consumerGroupMetadata = consumerGroupMetadata
+      consumerGroupMetadata = consumerGroupMetadata,
+      metadata = None
     )
 }
 
 object CommittableRecord {
   def apply[K, V](
     record: ConsumerRecord[K, V],
-    commitHandle: Map[TopicPartition, Long] => Task[Unit],
+    commitHandle: Map[TopicPartition, OffsetAndMetadata] => Task[Unit],
     consumerGroupMetadata: Option[ConsumerGroupMetadata]
   ): CommittableRecord[K, V] =
     new CommittableRecord(
diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/Offset.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/Offset.scala
index 69f8b9842..c0cd0b65c 100644
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/Offset.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/Offset.scala
@@ -1,16 +1,21 @@
 package zio.kafka.consumer
 
-import org.apache.kafka.clients.consumer.{ ConsumerGroupMetadata, RetriableCommitFailedException }
+import org.apache.kafka.clients.consumer.{ ConsumerGroupMetadata, OffsetAndMetadata, RetriableCommitFailedException }
 import org.apache.kafka.common.TopicPartition
 import zio.{ RIO, Schedule, Task }
 
 sealed trait Offset {
+
   def topic: String
   def partition: Int
   def offset: Long
   def commit: Task[Unit]
   def batch: OffsetBatch
   def consumerGroupMetadata: Option[ConsumerGroupMetadata]
+  def withMetadata(metadata: String): Offset
+
+  private[consumer] def metadata: Option[String]
+  private[consumer] def asJavaOffsetAndMetadata: OffsetAndMetadata = new OffsetAndMetadata(offset, metadata.orNull)
 
   /**
    * Attempts to commit and retries according to the given policy when the commit fails with a
@@ -39,9 +44,15 @@ private final case class OffsetImpl(
   topic: String,
   partition: Int,
   offset: Long,
-  commitHandle: Map[TopicPartition, Long] => Task[Unit],
-  consumerGroupMetadata: Option[ConsumerGroupMetadata]
+  commitHandle: Map[TopicPartition, OffsetAndMetadata] => Task[Unit],
+  consumerGroupMetadata: Option[ConsumerGroupMetadata],
+  metadata: Option[String] = None
 ) extends Offset {
-  def commit: Task[Unit] = commitHandle(Map(topicPartition -> offset))
-  def batch: OffsetBatch = OffsetBatchImpl(Map(topicPartition -> offset), commitHandle, consumerGroupMetadata)
+  def commit: Task[Unit] = commitHandle(Map(topicPartition -> asJavaOffsetAndMetadata))
+  def batch: OffsetBatch = OffsetBatchImpl(
+    Map(topicPartition -> asJavaOffsetAndMetadata),
+    commitHandle,
+    consumerGroupMetadata
+  )
+  def withMetadata(metadata: String): OffsetImpl = copy(metadata = Some(metadata))
 }
diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/OffsetBatch.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/OffsetBatch.scala
index 3c0c0f6cc..6c4b5a977 100644
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/OffsetBatch.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/OffsetBatch.scala
@@ -1,11 +1,11 @@
 package zio.kafka.consumer
 
-import org.apache.kafka.clients.consumer.ConsumerGroupMetadata
+import org.apache.kafka.clients.consumer.{ ConsumerGroupMetadata, OffsetAndMetadata }
 import org.apache.kafka.common.TopicPartition
 import zio.{ RIO, Schedule, Task, ZIO }
 
 sealed trait OffsetBatch {
-  def offsets: Map[TopicPartition, Long]
+  def offsets: Map[TopicPartition, OffsetAndMetadata]
   def commit: Task[Unit]
   def add(offset: Offset): OffsetBatch
   @deprecated("Use add(Offset) instead", "2.1.4")
@@ -28,27 +28,34 @@ object OffsetBatch {
 }
 
 private final case class OffsetBatchImpl(
-  offsets: Map[TopicPartition, Long],
-  commitHandle: Map[TopicPartition, Long] => Task[Unit],
+  offsets: Map[TopicPartition, OffsetAndMetadata],
+  commitHandle: Map[TopicPartition, OffsetAndMetadata] => Task[Unit],
   consumerGroupMetadata: Option[ConsumerGroupMetadata]
 ) extends OffsetBatch {
   override def commit: Task[Unit] = commitHandle(offsets)
 
-  override def add(offset: Offset): OffsetBatch =
+  override def add(offset: Offset): OffsetBatch = {
+    val maxOffsetAndMetadata = offsets.get(offset.topicPartition) match {
+      case Some(existing) if existing.offset > offset.offset => existing
+      case _                                                 => offset.asJavaOffsetAndMetadata
+    }
+
     copy(
-      offsets = offsets + (offset.topicPartition -> (offsets
-        .getOrElse(offset.topicPartition, -1L) max offset.offset))
+      offsets = offsets + (offset.topicPartition -> maxOffsetAndMetadata)
     )
+  }
 
   override def merge(offset: Offset): OffsetBatch = add(offset)
 
   override def merge(otherOffsets: OffsetBatch): OffsetBatch = {
-    val newOffsets = Map.newBuilder[TopicPartition, Long]
+    val newOffsets = Map.newBuilder[TopicPartition, OffsetAndMetadata]
     newOffsets ++= offsets
     otherOffsets.offsets.foreach { case (tp, offset) =>
-      val existing = offsets.getOrElse(tp, -1L)
-      if (existing < offset)
-        newOffsets += tp -> offset
+      val laterOffset = offsets.get(tp) match {
+        case Some(existing) => if (existing.offset < offset.offset) offset else existing
+        case None           => offset
+      }
+      newOffsets += tp -> laterOffset
     }
 
     copy(offsets = newOffsets.result())
@@ -56,7 +63,7 @@ private final case class OffsetBatchImpl(
 }
 
 case object EmptyOffsetBatch extends OffsetBatch {
-  override val offsets: Map[TopicPartition, Long] = Map.empty
+  override val offsets: Map[TopicPartition, OffsetAndMetadata] = Map.empty
   override val commit: Task[Unit] = ZIO.unit
   override def add(offset: Offset): OffsetBatch = offset.batch
   override def merge(offset: Offset): OffsetBatch = add(offset)
diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/diagnostics/DiagnosticEvent.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/diagnostics/DiagnosticEvent.scala
index f22f2d7d5..a268c5c6c 100644
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/diagnostics/DiagnosticEvent.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/diagnostics/DiagnosticEvent.scala
@@ -15,7 +15,7 @@ object DiagnosticEvent {
 
   sealed trait Commit extends DiagnosticEvent
   object Commit {
-    final case class Started(offsets: Map[TopicPartition, Long]) extends Commit
+    final case class Started(offsets: Map[TopicPartition, OffsetAndMetadata]) extends Commit
     final case class Success(offsets: Map[TopicPartition, OffsetAndMetadata]) extends Commit
     final case class Failure(offsets: Map[TopicPartition, OffsetAndMetadata], cause: Throwable) extends Commit
   }
diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala
index cd3c5a005..c7aeb6659 100644
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala
@@ -113,7 +113,7 @@ private[consumer] final class Runloop private (
   }
 
   /** This is the implementation behind the user facing api `Offset.commit`. */
-  private val commit: Map[TopicPartition, Long] => Task[Unit] =
+  private val commit: Map[TopicPartition, OffsetAndMetadata] => Task[Unit] =
     offsets =>
       for {
         p <- Promise.make[Throwable, Unit]
@@ -127,16 +127,21 @@ private[consumer] final class Runloop private (
     commits: Chunk[RunloopCommand.Commit]
   ): (JavaMap[TopicPartition, OffsetAndMetadata], OffsetCommitCallback, Throwable => UIO[Unit]) = {
     val offsets = commits
-      .foldLeft(mutable.Map.empty[TopicPartition, Long]) { case (acc, commit) =>
+      .foldLeft(mutable.Map.empty[TopicPartition, OffsetAndMetadata]) { case (acc, commit) =>
         commit.offsets.foreach { case (tp, offset) =>
-          acc += (tp -> acc.get(tp).map(_ max offset).getOrElse(offset))
+          acc += (tp -> acc
+            .get(tp)
+            .map(current => if (current.offset() > offset.offset()) current else offset)
+            .getOrElse(offset))
         }
         acc
       }
       .toMap
-    val offsetsWithMetaData = offsets.map { case (tp, offset) => tp -> new OffsetAndMetadata(offset + 1) }
-    val cont                = (e: Exit[Throwable, Unit]) => ZIO.foreachDiscard(commits)(_.cont.done(e))
-    val onSuccess           = cont(Exit.unit) <* diagnostics.emit(DiagnosticEvent.Commit.Success(offsetsWithMetaData))
+    val offsetsWithMetaData = offsets.map { case (tp, offset) =>
+      tp -> new OffsetAndMetadata(offset.offset + 1, offset.leaderEpoch, offset.metadata)
+    }
+    val cont      = (e: Exit[Throwable, Unit]) => ZIO.foreachDiscard(commits)(_.cont.done(e))
+    val onSuccess = cont(Exit.unit) <* diagnostics.emit(DiagnosticEvent.Commit.Success(offsetsWithMetaData))
     val onFailure: Throwable => UIO[Unit] = {
       case _: RebalanceInProgressException =>
         for {
diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopCommand.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopCommand.scala
index a750b077a..a5259b2c2 100644
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopCommand.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopCommand.scala
@@ -1,5 +1,6 @@
 package zio.kafka.consumer.internal
 
+import org.apache.kafka.clients.consumer.OffsetAndMetadata
 import org.apache.kafka.common.TopicPartition
 import zio._
 import zio.kafka.consumer.{ InvalidSubscriptionUnion, Subscription }
@@ -19,7 +20,8 @@ object RunloopCommand {
   case object StopRunloop    extends Control
   case object StopAllStreams extends StreamCommand
 
-  final case class Commit(offsets: Map[TopicPartition, Long], cont: Promise[Throwable, Unit]) extends RunloopCommand {
+  final case class Commit(offsets: Map[TopicPartition, OffsetAndMetadata], cont: Promise[Throwable, Unit])
+      extends RunloopCommand {
     @inline def isDone: UIO[Boolean]    = cont.isDone
     @inline def isPending: UIO[Boolean] = isDone.negate
   }
diff --git a/zio-kafka/src/main/scala/zio/kafka/producer/TransactionalProducer.scala b/zio-kafka/src/main/scala/zio/kafka/producer/TransactionalProducer.scala
index b5b1d5dce..98380254b 100644
--- a/zio-kafka/src/main/scala/zio/kafka/producer/TransactionalProducer.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/producer/TransactionalProducer.scala
@@ -41,7 +41,7 @@ object TransactionalProducer {
       case Some(consumerGroupMetadata) =>
         val offsets: util.Map[TopicPartition, OffsetAndMetadata] =
           offsetBatch.offsets.map { case (topicPartition, offset) =>
-            topicPartition -> new OffsetAndMetadata(offset + 1)
+            topicPartition -> new OffsetAndMetadata(offset.offset + 1, offset.metadata)
           }.asJava
 
         ZIO.attemptBlocking(live.p.sendOffsetsToTransaction(offsets, consumerGroupMetadata))
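Usage sketch (not part of the patch): from application code the feature reduces to calling the new `Offset.withMetadata` before batching and committing. The broker address, topic, group id, and metadata payload below are hypothetical placeholders; only `withMetadata`, `Consumer.offsetBatches`, and `OffsetBatch.commit` come from this diff.

import zio._
import zio.kafka.consumer._
import zio.kafka.serde.Serde

object CommitWithMetadataExample extends ZIOAppDefault {

  // Hypothetical payload stored broker-side with each committed offset, e.g. to
  // record which instance performed the commit. It can be read back with
  // Consumer.committed, as the new test above asserts.
  private val commitMetadata = "node=consumer-1"

  private val consumerLayer: ZLayer[Any, Throwable, Consumer] =
    ZLayer.scoped(
      Consumer.make(ConsumerSettings(List("localhost:9092")).withGroupId("example-group"))
    )

  override val run: ZIO[Any, Throwable, Unit] =
    Consumer
      .plainStream(Subscription.topics("example-topic"), Serde.string, Serde.string)
      // Attach metadata to each offset before batching; when a batch is built,
      // the highest offset per partition wins and carries its metadata along.
      .map(_.offset.withMetadata(commitMetadata))
      .aggregateAsync(Consumer.offsetBatches)
      .mapZIO(_.commit)
      .runDrain
      .provide(consumerLayer)
}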