Document ConsumerSettings better (#961)
Documented in scaladoc the way we recommend building `ConsumerSettings`.
erikvanoosten authored Jul 2, 2023
1 parent 0596254 commit 2e08798
Showing 1 changed file with 33 additions and 21 deletions.
zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala (33 additions, 21 deletions)
@@ -6,28 +6,18 @@ import zio.kafka.consumer.Consumer.OffsetRetrieval
import zio.kafka.security.KafkaCredentialStore

/**
* Settings for the consumer.
*
* To stay source compatible with future releases, we recommend constructing the settings as follows:
* {{{
* ConsumerSettings(bootstrapServers)
* .withGroupId(groupId)
* .withProperties(properties)
* .... etc.
* }}}
*
* @param bootstrapServers
* @param properties
* @param closeTimeout
* @param pollTimeout
* @param offsetRetrieval
* @param rebalanceListener
* @param restartStreamOnRebalancing
* When `true` _all_ streams are restarted during a rebalance, including those streams that are not revoked. The
* default is `false`.
* @param runloopTimeout
Internal timeout for each iteration of the command processing and polling loop, used to detect stalling. This should
be much larger than the pollTimeout and the time it takes to process chunks of records. If your consumer is not
subscribed for long periods during its lifetime, this timeout should take that into account as well. When the
timeout expires, plainStream/partitionedStream/etc. will fail with a [[Consumer.RunloopTimeout]].
* @param maxPartitionQueueSize
Maximum number of records to be buffered per partition. This buffer improves throughput and supports varying
downstream message processing time while maintaining some backpressure. Large values effectively disable
backpressure at the cost of high memory usage; low values effectively disable prefetching in favour of low
memory consumption. The number of records fetched on every poll is controlled by the `max.poll.records`
setting; the number of records fetched for every partition is somewhere between 0 and `max.poll.records`. A value
that is a power of 2 offers somewhat better queueing performance. The default value for this parameter is 2 * the
default `max.poll.records` of 500, rounded to the nearest power of 2.
* the Kafka bootstrap servers
*/
final case class ConsumerSettings(
bootstrapServers: List[String],
@@ -87,15 +77,37 @@ final case class ConsumerSettings(
def withRebalanceListener(listener: RebalanceListener): ConsumerSettings =
copy(rebalanceListener = listener)

/**
* @param value
* When `true` _all_ streams are restarted during a rebalance, including those streams that are not revoked. The
* default is `false`.
*/
def withRestartStreamOnRebalancing(value: Boolean): ConsumerSettings =
copy(restartStreamOnRebalancing = value)

def withCredentials(credentialsStore: KafkaCredentialStore): ConsumerSettings =
withProperties(credentialsStore.properties)

/**
* @param timeout
* Internal timeout for each iteration of the command processing and polling loop, used to detect stalling. This
* should be much larger than the pollTimeout and the time it takes to process chunks of records. If your consumer
* is not subscribed for long periods during its lifetime, this timeout should take that into account as well. When
* the timeout expires, plainStream/partitionedStream/etc. will fail with a [[Consumer.RunloopTimeout]].
*/
def withRunloopTimeout(timeout: Duration): ConsumerSettings =
copy(runloopTimeout = timeout)

/**
* @param maxPartitionQueueSize
* Maximum number of records to be buffered per partition. This buffer improves throughput and supports varying
* downstream message processing time while maintaining some backpressure. Large values effectively disable
* backpressure at the cost of high memory usage; low values effectively disable prefetching in favour of low
* memory consumption. The number of records fetched on every poll is controlled by the `max.poll.records`
* setting; the number of records fetched for every partition is somewhere between 0 and `max.poll.records`. A value
* that is a power of 2 offers somewhat better queueing performance. The default value for this parameter is 1024,
* calculated by taking 2 * the default `max.poll.records` of 500, rounded to the nearest power of 2.
*/
def withMaxPartitionQueueSize(maxPartitionQueueSize: Int): ConsumerSettings =
copy(maxPartitionQueueSize = maxPartitionQueueSize)
}
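The scaladoc documented here recommends building `ConsumerSettings` through its `with...` methods rather than through positional constructor arguments, so that adding a new field in a later release does not break caller source code. A minimal, self-contained sketch of that copy-based builder pattern (`Settings`, `withGroupId`, and `withProperties` below are illustrative stand-ins, not the actual zio-kafka types):

```scala
// Illustrative stand-in for the copy-based "withX" pattern the scaladoc
// recommends. Callers never pass settings positionally, so a new field
// with a default value can be appended to the case class later without
// breaking existing caller code.
final case class Settings(
  bootstrapServers: List[String],
  groupId: Option[String] = None,
  properties: Map[String, String] = Map.empty
) {
  def withGroupId(groupId: String): Settings =
    copy(groupId = Some(groupId))
  def withProperties(props: (String, String)*): Settings =
    copy(properties = properties ++ props)
}

val settings =
  Settings(List("localhost:9092"))
    .withGroupId("group-1")
    .withProperties("auto.offset.reset" -> "earliest")
```

Each `with...` call returns a new immutable copy, so settings values can be shared and specialized without mutation.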
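The updated `withMaxPartitionQueueSize` scaladoc spells out where the default of 1024 comes from: doubling the default `max.poll.records` of 500 and rounding to the nearest power of 2. A quick sketch of that arithmetic (the `nextPowerOfTwo` helper is my own for illustration, not part of zio-kafka):

```scala
// Rounds n (>= 1) up to the nearest power of two.
// Not zio-kafka code; this just checks the documented default.
def nextPowerOfTwo(n: Int): Int =
  if (Integer.bitCount(n) == 1) n // already a power of two
  else Integer.highestOneBit(n) << 1

val defaultMaxPollRecords = 500
// 2 * 500 = 1000, which rounds up to 1024
val defaultMaxPartitionQueueSize = nextPowerOfTwo(2 * defaultMaxPollRecords)
```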
