
[Knowledge gathering question] Best practices to set up Producer clients for multiple topics #1310

Open
Sam-sad-Sajid opened this issue Oct 2, 2024 · 1 comment


Sam-sad-Sajid commented Oct 2, 2024

Description

Hello 👋

I am seeking guidance on the best practices to set up producer clients for multiple topics.

My current setup is that when my server boots up, I create a producer client for each topic.
So, if my server wants to publish to 1...N topics, I create 1...N clients.
While theoretically N can be any positive number, in practice (in my case) it is between 1 and 10.

I create the producers when the app boots up (during dependency injection). The producers are alive
throughout the app lifecycle. During the application shutdown, I close the producers after flushing them.
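
For context, a minimal sketch of that lifecycle (my own illustration, assuming confluent-kafka-go v2; the topic list and broker address are placeholders):

package main

import (
	"log"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

// buildProducers creates one long-lived producer per topic at startup.
func buildProducers(topics []string) (map[string]*kafka.Producer, error) {
	producers := make(map[string]*kafka.Producer, len(topics))
	for _, topic := range topics {
		p, err := kafka.NewProducer(&kafka.ConfigMap{
			"bootstrap.servers": "localhost:9092", // placeholder
		})
		if err != nil {
			return nil, err
		}
		producers[topic] = p
	}
	return producers, nil
}

// shutdownProducers flushes outstanding messages and closes every producer.
func shutdownProducers(producers map[string]*kafka.Producer) {
	for _, p := range producers {
		p.Flush(15000) // wait up to 15s for in-flight deliveries
		p.Close()
	}
}

func main() {
	producers, err := buildProducers([]string{"topic_a", "topic_b", "topic_c"})
	if err != nil {
		log.Fatal(err)
	}
	defer shutdownProducers(producers)
	// ... serve requests, publishing via producers[topic].Produce(...) ...
}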

The reason I don't create a producer per message is the high latency of creating a producer client
and then sending the message.

The motivation for creating one producer client per topic is isolation between the clients:
if one client is impacted for any reason (for example, messages larger than 1 MB are published
to one topic and Kafka rejects them), the other clients can continue to publish to the other topics.

Some downsides I can think of to having one producer client per topic:

redundant TCP connections to Kafka: since every producer client connects to all broker nodes, there are redundant TCP connections. I assume this redundancy also takes up some space in the application heap memory.

Could you please advise whether the above approach is OK? If not, could you point me
to the recommended approach?

How to reproduce

Checklist

Please provide the following information:

  • confluent-kafka-go and librdkafka version (LibraryVersion()): 2.4.0
  • Apache Kafka broker version: 3.5.1
  • Client configuration: ConfigMap{...}
  • Operating system:
  • Provide client logs (with "debug": ".." as necessary)
  • Provide broker log excerpts
  • Critical issue

Sam-sad-Sajid commented Oct 2, 2024

I found this issue in librdkafka
confluentinc/librdkafka#1032

It seems that if I have different producer configurations for different topics, using one producer client for multiple topics does not apply the different values.

Example:
Sample producer configurations for three topics:

"topic_a": {
      "acks": "all",
      "linger.ms": "10",
      "request.timeout.ms": "5000",
      "delivery.timeout.ms": "10000"
},
"topic_b": {
      "acks": "1",
      "linger.ms": "10",
      "request.timeout.ms": "6000",
      "delivery.timeout.ms": "11000"
},
"topic_c": {
      "acks": "all",
      "linger.ms": "10",
      "request.timeout.ms": "7000",
      "delivery.timeout.ms": "12000"
}
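
As a sketch of how I understand the single-producer case (a fragment assuming confluent-kafka-go v2; broker address and payload are placeholders): topic-level properties such as acks, request.timeout.ms and delivery.timeout.ms can only be supplied once, on the producer's ConfigMap, where they become the default topic configuration for every topic that producer writes to.

// Single producer shared by all topics: only one set of topic-level
// properties can be supplied, so topic_a's values apply everywhere.
p, err := kafka.NewProducer(&kafka.ConfigMap{
	"bootstrap.servers":   "localhost:9092", // placeholder
	"debug":               "all",
	"acks":                "all",
	"linger.ms":           "10",
	"request.timeout.ms":  "5000",
	"delivery.timeout.ms": "10000",
})
if err != nil {
	log.Fatal(err)
}
for _, topic := range []string{"topic_a", "topic_b", "topic_c"} {
	t := topic
	_ = p.Produce(&kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &t, Partition: kafka.PartitionAny},
		Value:          []byte("example"),
	}, nil)
}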

I set debug: all in the producer configurations.

Output:

%7|1727911922.230|TOPIC|test-app#producer-1| [thrd:app]: New local topic: topic_a
%7|1727911922.230|TOPPARNEW|test-app#producer-1| [thrd:app]: NEW topic_a [-1] 0x12d00aa00 refcnt 0x12d00aa90 (at rd_kafka_topic_new0:488)
%7|1727911922.230|CONF|test-app#producer-1| [thrd:app]: Topic "topic_a" configuration (default_topic_conf):
%7|1727911922.230|CONF|test-app#producer-1| [thrd:app]:   request.required.acks = -1
%7|1727911922.230|CONF|test-app#producer-1| [thrd:app]:   request.timeout.ms = 5000
%7|1727911922.230|CONF|test-app#producer-1| [thrd:app]:   message.timeout.ms = 10000
%7|1727911922.230|CONF|test-app#producer-1| [thrd:app]:   partitioner = murmur2_random
%7|1727911922.230|CONNECT|test-app#producer-1| [thrd:app]: Not selecting any broker for cluster connection: still suppressed for 46ms: leader query
%7|1727911922.230|METADATA|test-app#producer-1| [thrd:app]: Hinted cache of 1/1 topic(s) being queried
%7|1727911922.230|METADATA|test-app#producer-1| [thrd:app]: Skipping metadata refresh of 1 topic(s): leader query: no usable brokers
%7|1727911922.230|TOPIC|test-app#producer-1| [thrd:app]: New local topic: topic_b
%7|1727911922.230|TOPPARNEW|test-app#producer-1| [thrd:app]: NEW topic_b [-1] 0x12c010800 refcnt 0x12c010890 (at rd_kafka_topic_new0:488)
%7|1727911922.230|CONF|test-app#producer-1| [thrd:app]: Topic "topic_b" configuration (default_topic_conf):
%7|1727911922.230|CONF|test-app#producer-1| [thrd:app]:   request.required.acks = -1
%7|1727911922.230|CONF|test-app#producer-1| [thrd:app]:   request.timeout.ms = 5000
%7|1727911922.230|CONF|test-app#producer-1| [thrd:app]:   message.timeout.ms = 10000
%7|1727911922.230|CONF|test-app#producer-1| [thrd:app]:   partitioner = murmur2_random
%7|1727911922.230|CONNECT|test-app#producer-1| [thrd:app]: Not selecting any broker for cluster connection: still suppressed for 46ms: leader query
%7|1727911922.230|METADATA|test-app#producer-1| [thrd:app]: Hinted cache of 1/1 topic(s) being queried
%7|1727911922.230|METADATA|test-app#producer-1| [thrd:app]: Skipping metadata refresh of 1 topic(s): leader query: no usable brokers
%7|1727911922.230|TOPIC|test-app#producer-1| [thrd:app]: New local topic: topic_c
%7|1727911922.230|TOPPARNEW|test-app#producer-1| [thrd:app]: NEW topic_c [-1] 0x12c012200 refcnt 0x12c012290 (at rd_kafka_topic_new0:488)
%7|1727911922.230|CONF|test-app#producer-1| [thrd:app]: Topic "topic_c" configuration (default_topic_conf):
%7|1727911922.230|CONF|test-app#producer-1| [thrd:app]:   request.required.acks = -1
%7|1727911922.230|CONF|test-app#producer-1| [thrd:app]:   request.timeout.ms = 5000
%7|1727911922.230|CONF|test-app#producer-1| [thrd:app]:   message.timeout.ms = 10000
%7|1727911922.230|CONF|test-app#producer-1| [thrd:app]:   partitioner = murmur2_random

However, if I use one producer per topic, the per-topic configurations are respected.
Output for the same configurations as above:

%7|1727911512.054|TOPIC|test-app#producer-1| [thrd:app]: New local topic: topic_b
%7|1727911512.054|TOPIC|test-app#producer-3| [thrd:app]: New local topic: topic_a
%7|1727911512.054|TOPIC|test-app#producer-2| [thrd:app]: New local topic: topic_c
%7|1727911512.054|TOPPARNEW|test-app#producer-2| [thrd:app]: NEW topic_c [-1] 0x14e81f800 refcnt 0x14e81f890 (at rd_kafka_topic_new0:488)
%7|1727911512.054|CONF|test-app#producer-2| [thrd:app]: Topic "topic_c" configuration (default_topic_conf):
%7|1727911512.054|CONF|test-app#producer-2| [thrd:app]:   request.required.acks = -1
%7|1727911512.054|CONF|test-app#producer-2| [thrd:app]:   request.timeout.ms = 7000
%7|1727911512.054|CONF|test-app#producer-2| [thrd:app]:   message.timeout.ms = 12000
%7|1727911512.054|CONF|test-app#producer-2| [thrd:app]:   partitioner = murmur2_random
%7|1727911512.054|CONNECT|test-app#producer-2| [thrd:app]: Not selecting any broker for cluster connection: still suppressed for 32ms: leader query
%7|1727911512.054|METADATA|test-app#producer-2| [thrd:app]: Hinted cache of 1/1 topic(s) being queried
%7|1727911512.054|METADATA|test-app#producer-2| [thrd:app]: Skipping metadata refresh of 1 topic(s): leader query: no usable brokers
%7|1727911512.054|TOPPARNEW|test-app#producer-1| [thrd:app]: NEW topic_b [-1] 0x152808200 refcnt 0x152808290 (at rd_kafka_topic_new0:488)
%7|1727911512.054|CONF|test-app#producer-1| [thrd:app]: Topic "topic_b" configuration (default_topic_conf):
%7|1727911512.054|CONF|test-app#producer-1| [thrd:app]:   request.required.acks = 1
%7|1727911512.054|CONF|test-app#producer-1| [thrd:app]:   request.timeout.ms = 6000
%7|1727911512.054|CONF|test-app#producer-1| [thrd:app]:   message.timeout.ms = 11000
%7|1727911512.054|CONF|test-app#producer-1| [thrd:app]:   partitioner = murmur2_random
%7|1727911512.054|TOPPARNEW|test-app#producer-3| [thrd:app]: NEW topic_a [-1] 0x15200ce00 refcnt 0x15200ce90 (at rd_kafka_topic_new0:488)
%7|1727911512.054|CONF|test-app#producer-3| [thrd:app]: Topic "topic_a" configuration (default_topic_conf):
%7|1727911512.054|CONF|test-app#producer-3| [thrd:app]:   request.required.acks = -1
%7|1727911512.054|CONF|test-app#producer-3| [thrd:app]:   request.timeout.ms = 5000
%7|1727911512.054|CONF|test-app#producer-3| [thrd:app]:   message.timeout.ms = 10000
%7|1727911512.054|CONF|test-app#producer-3| [thrd:app]:   partitioner = murmur2_random
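
For comparison, a sketch of how the producer-per-topic variant above could be wired (again a fragment assuming confluent-kafka-go v2; the broker address is a placeholder), so that each producer carries its own topic-level settings:

// One producer per topic: each producer gets its own topic-level settings,
// so each default_topic_conf differs, matching the output above.
topicConfigs := map[string]kafka.ConfigMap{
	"topic_a": {"acks": "all", "linger.ms": "10", "request.timeout.ms": "5000", "delivery.timeout.ms": "10000"},
	"topic_b": {"acks": "1", "linger.ms": "10", "request.timeout.ms": "6000", "delivery.timeout.ms": "11000"},
	"topic_c": {"acks": "all", "linger.ms": "10", "request.timeout.ms": "7000", "delivery.timeout.ms": "12000"},
}

producers := make(map[string]*kafka.Producer, len(topicConfigs))
for topic, cfg := range topicConfigs {
	cfg["bootstrap.servers"] = "localhost:9092" // placeholder, shared by all producers
	cfg["debug"] = "all"
	p, err := kafka.NewProducer(&cfg)
	if err != nil {
		log.Fatal(err)
	}
	producers[topic] = p
}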
