Make tests faster (#1139)
Make tests faster by:
 - running them in parallel
 - reducing the number of runs with `nonFlaky`

Results for `sbt test`:
 - on GHA: from 17m 34s to 7m
 - on my laptop: from 231s to 130s

Also: upgrade logback from 1.3.14 to 1.4.14.
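
For context, the pattern applied throughout the diff below: dropping the `sequential` test aspect lets ZIO Test run a suite's tests concurrently (its default behavior), and `nonFlaky(n)` caps how often a test is repeated. A minimal sketch of the pattern, with a hypothetical spec that is not part of this repository:

```scala
import zio._
import zio.test._
import zio.test.TestAspect.{ nonFlaky, timeout }

// Hypothetical spec: without the `sequential` aspect, ZIO Test runs these
// tests concurrently; `nonFlaky(2)` repeats a test twice instead of the
// default 100 runs.
object FastSpec extends ZIOSpecDefault {
  override def spec: Spec[TestEnvironment with Scope, Any] =
    suite("fast suite")(
      test("pure check")(assertTrue(1 + 1 == 2)),
      test("repeated check")(assertTrue("zio".nonEmpty)) @@ nonFlaky(2)
    ) @@ timeout(2.minutes)
}
```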
erikvanoosten authored Dec 22, 2023
1 parent d2aa7f7 commit 8a31ed7
Showing 11 changed files with 89 additions and 83 deletions.
10 changes: 5 additions & 5 deletions build.sbt
@@ -5,7 +5,7 @@ lazy val embeddedKafkaVersion = "3.6.1" // Should be the same as kafkaVersion, e

lazy val kafkaClients = "org.apache.kafka" % "kafka-clients" % kafkaVersion
lazy val scalaCollectionCompat = "org.scala-lang.modules" %% "scala-collection-compat" % "2.11.0"
lazy val logback = "ch.qos.logback" % "logback-classic" % "1.3.14"
lazy val logback = "ch.qos.logback" % "logback-classic" % "1.4.14"

enablePlugins(ZioSbtEcosystemPlugin, ZioSbtCiPlugin)

@@ -166,11 +166,11 @@ lazy val zioKafkaExample =
libraryDependencies ++= Seq(
"dev.zio" %% "zio" % "2.0.20",
"dev.zio" %% "zio-kafka" % "2.7.1",
"dev.zio" %% "zio-kafka-testkit" % "2.7.1" % Test,
"dev.zio" %% "zio-test" % "2.0.20" % Test,
"ch.qos.logback" % "logback-classic" % "1.4.14",
"dev.zio" %% "zio-logging-slf4j2" % "2.1.16",
"io.github.embeddedkafka" %% "embedded-kafka" % embeddedKafkaVersion
"io.github.embeddedkafka" %% "embedded-kafka" % embeddedKafkaVersion,
logback,
"dev.zio" %% "zio-kafka-testkit" % "2.7.1" % Test,
"dev.zio" %% "zio-test" % "2.0.20" % Test
),
// Scala 3 compiling fails with:
// [error] Modules were resolved with conflicting cross-version suffixes in ProjectRef(uri("file:/home/runner/work/zio-kafka/zio-kafka/"), "zioKafkaExample"):
@@ -6,7 +6,7 @@ import zio.kafka.serde.Serde
import zio.kafka.testkit.KafkaTestUtils.{ consumer, produceMany, producer }
import zio.kafka.testkit._
import zio.test.Assertion.hasSameElements
-import zio.test.TestAspect.{ sequential, timeout }
+import zio.test.TestAspect.timeout
import zio.test._

/**
@@ -41,5 +41,5 @@ object ConsumerSpec extends ZIOSpecDefault with KafkaRandom {
)
.provideSome[Kafka](producer) // Here, we provide a new instance of Producer per test
.provideSomeShared[Scope](Kafka.embedded) // Here, we provide an instance of Kafka for the entire suite
-) @@ timeout(2.minutes) @@ sequential
+) @@ timeout(2.minutes)
}
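
The two provide calls above implement the sharing scheme the comments describe: `provideSome` builds a fresh layer for every test, while `provideSomeShared` builds one instance for the whole suite. A self-contained sketch of that difference, using a hypothetical counter layer instead of Kafka:

```scala
import zio._
import zio.test._

// Hypothetical demo: `provide` rebuilds the Ref for each test, so both tests
// observe a counter value of 1. With `provideShared` the tests would share
// one Ref, and one of them would observe 2.
object LayerSharingSpec extends ZIOSpecDefault {
  val counter: ZLayer[Any, Nothing, Ref[Int]] = ZLayer(Ref.make(0))

  override def spec: Spec[TestEnvironment with Scope, Any] =
    suite("layer sharing")(
      test("first")(ZIO.serviceWithZIO[Ref[Int]](_.updateAndGet(_ + 1)).map(n => assertTrue(n == 1))),
      test("second")(ZIO.serviceWithZIO[Ref[Int]](_.updateAndGet(_ + 1)).map(n => assertTrue(n == 1)))
    ).provide(counter)
}
```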
@@ -6,7 +6,7 @@ import zio.kafka.producer.Producer
import zio.kafka.serde.Serde
import zio.kafka.testkit.Kafka
import zio.kafka.testkit.KafkaTestUtils._
-import zio.test.TestAspect.{ sequential, timeout }
+import zio.test.TestAspect.timeout
import zio.test._

/**
@@ -25,5 +25,5 @@ object ProducerSpec extends ZIOSpecDefault {
)
.provideSome[Kafka](producer) // Here, we provide a new instance of Producer per test
.provideSomeShared[Scope](Kafka.embedded) // Here, we provide an instance of Kafka for the entire suite
-) @@ timeout(2.minutes) @@ sequential
+) @@ timeout(2.minutes)
}
@@ -52,6 +52,6 @@ object AdminSaslSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
assert(remainingAcls)(equalTo(Set.empty[AclBinding]))
}
}
-).provideSomeShared[Scope](Kafka.saslEmbedded) @@ withLiveClock @@ sequential
+).provideSomeShared[Scope](Kafka.saslEmbedded) @@ withLiveClock

}
84 changes: 39 additions & 45 deletions zio-kafka-test/src/test/scala/zio/kafka/AdminSpec.scala
@@ -39,91 +39,85 @@ object AdminSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {

override val kafkaPrefix: String = "adminspec"

-private def listTopicsFiltered(client: AdminClient): ZIO[Any, Throwable, Map[String, AdminClient.TopicListing]] =
-client.listTopics().map(_.filter { case (key, _) => key.startsWith("adminspec-") })
+private def listTopicsFiltered(
+client: AdminClient,
+prefix: String
+): ZIO[Any, Throwable, Map[String, AdminClient.TopicListing]] =
+client.listTopics().map(_.filter { case (key, _) => key.startsWith(prefix) })

override def spec: Spec[TestEnvironment with Scope, Throwable] =
suite("client admin test")(
test("create, list, delete single topic") {
val prefix = "adminspec1"
KafkaTestUtils.withAdmin { client =>
for {
-list1 <- listTopicsFiltered(client)
-_ <- client.createTopic(AdminClient.NewTopic("adminspec-topic1", 1, 1))
-list2 <- listTopicsFiltered(client)
-_ <- client.deleteTopic("adminspec-topic1")
-list3 <- listTopicsFiltered(client)
-} yield assert(list1.size)(equalTo(0)) &&
-assert(list2.size)(equalTo(1)) &&
-assert(list3.size)(equalTo(0))
-
+list1 <- listTopicsFiltered(client, prefix)
+_ <- client.createTopic(AdminClient.NewTopic(s"$prefix-topic1", 1, 1))
+list2 <- listTopicsFiltered(client, prefix)
+_ <- client.deleteTopic(s"$prefix-topic1")
+list3 <- listTopicsFiltered(client, prefix)
+} yield assertTrue(list1.isEmpty, list2.size == 1, list3.isEmpty)
}
},
test("create, list, delete multiple topic") {
val prefix = "adminspec2"
KafkaTestUtils.withAdmin { client =>
for {
-list1 <- listTopicsFiltered(client)
+list1 <- listTopicsFiltered(client, prefix)
_ <- client.createTopics(
List(AdminClient.NewTopic("adminspec-topic2", 1, 1), AdminClient.NewTopic("adminspec-topic3", 4, 1))
List(AdminClient.NewTopic(s"$prefix-topic2", 1, 1), AdminClient.NewTopic(s"$prefix-topic3", 4, 1))
)
-list2 <- listTopicsFiltered(client)
-_ <- client.deleteTopic("adminspec-topic2")
-list3 <- listTopicsFiltered(client)
-_ <- client.deleteTopic("adminspec-topic3")
-list4 <- listTopicsFiltered(client)
-} yield assert(list1.size)(equalTo(0)) &&
-assert(list2.size)(equalTo(2)) &&
-assert(list3.size)(equalTo(1)) &&
-assert(list4.size)(equalTo(0))
-
+list2 <- listTopicsFiltered(client, prefix)
+_ <- client.deleteTopic(s"$prefix-topic2")
+list3 <- listTopicsFiltered(client, prefix)
+_ <- client.deleteTopic(s"$prefix-topic3")
+list4 <- listTopicsFiltered(client, prefix)
+} yield assertTrue(list1.isEmpty, list2.size == 2, list3.size == 1, list4.isEmpty)
}
},
test("just list") {
KafkaTestUtils.withAdmin { client =>
for {
-list1 <- listTopicsFiltered(client)
-} yield assert(list1.size)(equalTo(0))
+list1 <- listTopicsFiltered(client, "adminspec3")
+} yield assertTrue(list1.isEmpty)

}
},
test("create, describe, delete multiple topic") {
val prefix = "adminspec4"
KafkaTestUtils.withAdmin { client =>
for {
-list1 <- listTopicsFiltered(client)
+list1 <- listTopicsFiltered(client, prefix)
_ <- client.createTopics(
List(AdminClient.NewTopic("adminspec-topic4", 1, 1), AdminClient.NewTopic("adminspec-topic5", 4, 1))
List(AdminClient.NewTopic(s"$prefix-topic4", 1, 1), AdminClient.NewTopic(s"$prefix-topic5", 4, 1))
)
descriptions <- client.describeTopics(List("adminspec-topic4", "adminspec-topic5"))
_ <- client.deleteTopics(List("adminspec-topic4", "adminspec-topic5"))
list3 <- listTopicsFiltered(client)
} yield assert(list1.size)(equalTo(0)) &&
assert(descriptions.size)(equalTo(2)) &&
assert(list3.size)(equalTo(0))

descriptions <- client.describeTopics(List(s"$prefix-topic4", s"$prefix-topic5"))
_ <- client.deleteTopics(List(s"$prefix-topic4", s"$prefix-topic5"))
list3 <- listTopicsFiltered(client, prefix)
} yield assertTrue(list1.isEmpty, descriptions.size == 2, list3.isEmpty)
}
},
test("create, describe topic config, delete multiple topic") {
val prefix = "adminspec5"
KafkaTestUtils.withAdmin { client =>
for {
-list1 <- listTopicsFiltered(client)
+list1 <- listTopicsFiltered(client, prefix)
_ <- client.createTopics(
List(AdminClient.NewTopic("adminspec-topic6", 1, 1), AdminClient.NewTopic("adminspec-topic7", 4, 1))
List(AdminClient.NewTopic(s"$prefix-topic6", 1, 1), AdminClient.NewTopic(s"$prefix-topic7", 4, 1))
)
configResources = List(
ConfigResource(ConfigResourceType.Topic, "adminspec-topic6"),
ConfigResource(ConfigResourceType.Topic, "adminspec-topic7")
ConfigResource(ConfigResourceType.Topic, s"$prefix-topic6"),
ConfigResource(ConfigResourceType.Topic, s"$prefix-topic7")
)
configs <- client.describeConfigs(configResources) <&>
client.describeConfigsAsync(configResources).flatMap { configs =>
ZIO.foreachPar(configs) { case (resource, configTask) =>
configTask.map(config => (resource, config))
}
}
_ <- client.deleteTopics(List("adminspec-topic6", "adminspec-topic7"))
list3 <- listTopicsFiltered(client)
} yield assert(list1.size)(equalTo(0)) &&
assert(configs._1.size)(equalTo(2)) &&
assert(configs._2.size)(equalTo(2)) &&
assert(list3.size)(equalTo(0))
_ <- client.deleteTopics(List(s"$prefix-topic6", s"$prefix-topic7"))
list3 <- listTopicsFiltered(client, prefix)
} yield assertTrue(list1.isEmpty, configs._1.size == 2, configs._2.size == 2, list3.isEmpty)
}
},
test("list cluster nodes") {
@@ -636,7 +630,7 @@ object AdminSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
assert(remainingAcls)(equalTo(Set.empty[AclBinding]))
}
}
-).provideSomeShared[Scope](Kafka.embedded) @@ withLiveClock @@ sequential @@ timeout(2.minutes)
+).provideSomeShared[Scope](Kafka.embedded) @@ withLiveClock @@ timeout(2.minutes)

private def consumeNoop(
topicName: String,
21 changes: 11 additions & 10 deletions zio-kafka-test/src/test/scala/zio/kafka/ProducerSpec.scala
@@ -17,13 +17,14 @@ import zio.test.TestAspect._
import zio.test._

import java.nio.charset.StandardCharsets
+import java.util.UUID

object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
override val kafkaPrefix: String = "producerspec"

private def asString(v: Array[Byte]) = new String(v, StandardCharsets.UTF_8)

-def withConsumerInt(
+private def withConsumerInt(
subscription: Subscription,
settings: ConsumerSettings
): ZIO[Any with Scope, Throwable, Dequeue[Take[Throwable, CommittableRecord[String, Int]]]] =
@@ -246,13 +247,13 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
consumer.take.flatMap(_.done).mapError(_.getOrElse(new NoSuchElementException))
}
}
-} yield assertTrue(outcome.length == 3) &&
-assertTrue(outcome(0).isRight) &&
-assertTrue(
-outcome(1).swap.exists(_.getMessage.contains("Compacted topic cannot accept message without key"))
-) &&
-assertTrue(outcome(2).isRight) &&
-assertTrue(recordsConsumed.length == 2)
+} yield assertTrue(
+outcome.length == 3,
+outcome(0).isRight,
+outcome(1).swap.exists(_.getMessage.contains("Compacted topic cannot accept message without key")),
+outcome(2).isRight,
+recordsConsumed.length == 2
+)
},
test("an empty chunk of records") {
val chunks = Chunk.fromIterable(List.empty)
@@ -646,11 +647,11 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
produceChunkSpec
)
.provideSome[Kafka](
-(KafkaTestUtils.producer ++ transactionalProducer)
+(KafkaTestUtils.producer ++ transactionalProducer(UUID.randomUUID().toString))
.mapError(TestFailure.fail),
KafkaTestUtils.consumer(clientId = "producer-spec-consumer", groupId = Some("group-0"))
)
.provideSomeShared[Scope](
Kafka.embedded
-) @@ withLiveClock @@ timeout(3.minutes) @@ sequential
+) @@ withLiveClock @@ timeout(3.minutes)
}
@@ -323,9 +323,9 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
} yield assert(offset.map(_.offset))(isSome(equalTo(9L)))
},
test("process outstanding commits after a graceful shutdown with aggregateAsync using `maxRebalanceDuration`") {
-val kvs = (1 to 100).toList.map(i => (s"key$i", s"msg$i"))
-val topic = "test-outstanding-commits"
+val kvs = (1 to 100).toList.map(i => (s"key$i", s"msg$i"))
for {
+topic <- randomTopic
group <- randomGroup
client <- randomClient
_ <- produceMany(topic, kvs)
@@ -354,7 +354,7 @@
)
)
} yield assertTrue(offset.map(_.offset).contains(9L))
-} @@ TestAspect.nonFlaky(5),
+} @@ TestAspect.nonFlaky(2),
test("a consumer timeout interrupts the stream and shuts down the consumer") {
// Setup of this test:
// - Set the max poll interval very low: a couple of seconds.
@@ -1282,7 +1282,6 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
transactionalConsumer(
clientId,
consumerGroupId,
-offsetRetrieval = OffsetRetrieval.Auto(AutoOffsetStrategy.Earliest),
restartStreamOnRebalancing = true,
properties = Map(
ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG ->
@@ -1297,7 +1296,8 @@
}

for {
-tProducerSettings <- transactionalProducerSettings
+transactionalId <- randomThing("transactional")
+tProducerSettings <- transactionalProducerSettings(transactionalId)
tProducer <- TransactionalProducer.make(tProducerSettings)

topicA <- randomTopic
@@ -1355,7 +1355,6 @@
transactionalConsumer(
validatorClientId,
groupB,
-offsetRetrieval = OffsetRetrieval.Auto(AutoOffsetStrategy.Earliest),
properties = Map(ConsumerConfig.MAX_POLL_RECORDS_CONFIG -> "200")
)
)
@@ -1558,6 +1557,6 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
.provideSome[Scope & Kafka](producer)
.provideSomeShared[Scope](
Kafka.embedded
-) @@ withLiveClock @@ sequential @@ timeout(2.minutes)
+) @@ withLiveClock @@ timeout(2.minutes)

}
@@ -267,8 +267,8 @@ object SubscriptionsSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
.plainStream(Subscription.topics(topic1), Serde.string, Serde.string)
.take(40)
.transduce(
-Consumer.offsetBatches.contramap[CommittableRecord[String, String]](_.offset) <&> ZSink
-.collectAll[CommittableRecord[String, String]]
+Consumer.offsetBatches.contramap[CommittableRecord[String, String]](_.offset) <&>
+ZSink.collectAll[CommittableRecord[String, String]]
)
.mapZIO { case (offsetBatch, records) => offsetBatch.commit.as(records) }
.flattenChunks
@@ -278,7 +278,7 @@
.provideSomeLayer[Kafka with Scope](consumer(client, Some(group)))
consumed <- recordsConsumed.get
} yield assert(consumed.map(r => r.value))(hasSameElements(Chunk.fromIterable(kvs.map(_._2))))
-} @@ TestAspect.nonFlaky(3)
+} @@ TestAspect.nonFlaky(2)
)
.provideSome[Scope & Kafka](producer)
.provideSomeShared[Scope](
@@ -303,7 +303,7 @@ object SslHelperSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
suite(".validateEndpoint")(
integrationTests,
unitTests
-) @@ withLiveClock @@ sequential
+) @@ withLiveClock

implicit class SettingsHelper(adminClientSettings: AdminClientSettings) {
def bootstrapServers: List[String] = adminClientSettings.driverSettings
@@ -34,19 +34,31 @@ object KafkaTestUtils {
)

/**
-* Default transactional Producer settings you can use in your tests.
+* Default transactional producer settings you can use in your tests.
+*
+* Note: to run multiple tests in parallel, you need to use different transactional ids via
+* `transactionalProducerSettings(transactionalId)`.
*/
val transactionalProducerSettings: ZIO[Kafka, Nothing, TransactionalProducerSettings] =
transactionalProducerSettings("test-transaction")

def transactionalProducerSettings(transactionalId: String): ZIO[Kafka, Nothing, TransactionalProducerSettings] =
ZIO
.serviceWith[Kafka](_.bootstrapServers)
.map(TransactionalProducerSettings(_, "test-transaction"))
.map(TransactionalProducerSettings(_, transactionalId))

/**
-* Transactional Producer instance you can use in your tests. It uses the default transactional Producer settings.
+* Transactional producer instance you can use in your tests. It uses the default transactional producer settings.
+*
+* Note: to run multiple tests in parallel, you need to use different transactional ids via
+* `transactionalProducer(transactionalId)`.
*/
val transactionalProducer: ZLayer[Kafka, Throwable, TransactionalProducer] =
transactionalProducer("test-transaction")

def transactionalProducer(transactionalId: String): ZLayer[Kafka, Throwable, TransactionalProducer] =
ZLayer.makeSome[Kafka, TransactionalProducer](
-ZLayer(transactionalProducerSettings),
+ZLayer(transactionalProducerSettings(transactionalId)),
TransactionalProducer.live
)

@@ -234,7 +246,7 @@
clientId: String,
groupId: String,
clientInstanceId: Option[String] = None,
-offsetRetrieval: OffsetRetrieval = OffsetRetrieval.Auto(),
+offsetRetrieval: OffsetRetrieval = OffsetRetrieval.Auto(AutoOffsetStrategy.Earliest),
allowAutoCreateTopics: Boolean = true,
diagnostics: Diagnostics = Diagnostics.NoOp,
restartStreamOnRebalancing: Boolean = false,
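
The `transactionalId` parameter introduced above is what makes parallel suites safe: Kafka fences a transactional producer when another producer starts with the same transactional id, so each suite needs a unique one. A hedged usage sketch mirroring how `ProducerSpec` wires it up (the spec name and test body are placeholders, not from this commit):

```scala
import java.util.UUID
import zio._
import zio.kafka.testkit.{ Kafka, KafkaTestUtils }
import zio.test._

// Placeholder spec: a random transactional id per suite avoids producer
// fencing when several suites share the same embedded Kafka instance.
object MyTransactionalSpec extends ZIOSpecDefault {
  override def spec: Spec[TestEnvironment with Scope, Any] =
    suite("transactional")(
      test("placeholder")(assertTrue(true))
    )
      .provideSome[Kafka](
        KafkaTestUtils.transactionalProducer(UUID.randomUUID().toString).mapError(TestFailure.fail)
      )
      .provideSomeShared[Scope](Kafka.embedded)
}
```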