allow us to unsubscribe from a topic more easily #1193

Open
ORESoftware opened this issue May 19, 2024 · 4 comments

Comments


ORESoftware commented May 19, 2024

Description

Unsubscribing a consumer from 1 topic, but not all topics, is currently wonky.

I want this:

err := consumer.UnsubscribeFromTopic(topicId)

but all we have is:

err := consumer.Unsubscribe()

which unsubscribes us from all topics.
this is insane?

How to reproduce

here is how to unsubscribe from one topic, but not all topics -

  1. we retrieve list of current topics
  2. remove the topic from the list
  3. then call the subscribe call again with the new list
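package main

import (
	"fmt"
	"log"
	"strings"
	"time"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	// Placeholder config (assumed values); adjust the broker address and group id.
	config := &kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092",
		"group.id":          "example-group",
	}

	consumer, err := kafka.NewConsumer(config)
	if err != nil {
		log.Fatalf("Failed to create consumer: %s", err)
	}

	topics := []string{"topic1", "topic2", "topic3"}
	if err = consumer.SubscribeTopics(topics, nil); err != nil {
		log.Fatalf("Failed to subscribe: %s", err)
	}
	fmt.Printf("Subscribed to topics: %s\n", strings.Join(topics, ", "))

	// unsubscribeFromTopic resubscribes to every tracked topic except unsubTopic.
	unsubscribeFromTopic := func(unsubTopic string) error {
		metadata, err := consumer.GetMetadata(nil, true, 5000)
		if err != nil {
			return fmt.Errorf("failed to get metadata: %w", err)
		}

		var remainingTopics []string
		for _, topic := range topics {
			if topic == unsubTopic {
				continue
			}
			// Keep only topics that still exist on the cluster.
			if _, exists := metadata.Topics[topic]; exists {
				remainingTopics = append(remainingTopics, topic)
			}
		}

		if len(remainingTopics) == 0 {
			return fmt.Errorf("no remaining topics to subscribe to")
		}

		if err = consumer.SubscribeTopics(remainingTopics, nil); err != nil {
			return fmt.Errorf("failed to resubscribe: %w", err)
		}
		topics = remainingTopics
		fmt.Printf("Resubscribed to topics: %s\n", strings.Join(remainingTopics, ", "))
		return nil
	}

	done := make(chan struct{})    // tells the reader goroutine to stop
	stopped := make(chan struct{}) // closed once the reader has exited
	go func() {
		defer close(stopped)
		for {
			select {
			case <-done:
				return
			default:
			}
			msg, err := consumer.ReadMessage(100 * time.Millisecond)
			if err != nil {
				// A timeout only means no message arrived within the window.
				if kerr, ok := err.(kafka.Error); !ok || kerr.Code() != kafka.ErrTimedOut {
					log.Printf("Consumer error: %v", err)
				}
				continue
			}
			fmt.Printf("Consumed message from topic %s\n", *msg.TopicPartition.Topic)
		}
	}()

	time.Sleep(5 * time.Second)

	if err = unsubscribeFromTopic("topic2"); err != nil {
		log.Printf("Failed to unsubscribe: %s", err)
	}

	time.Sleep(10 * time.Second)

	// Stop the reader before closing so ReadMessage is not called on a closed consumer.
	close(done)
	<-stopped
	if err = consumer.Close(); err != nil {
		log.Printf("Failed to close consumer: %s", err)
	}
	fmt.Println("Consumer closed.")
}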
ORESoftware changed the title from "allow Users to unsubscribe from a topic more easily" to "allow us to unsubscribe from a topic more easily" on May 21, 2024
Contributor

milindl commented May 23, 2024

Hey - while I don't think there is a plan to immediately add this enhancement right now, I can suggest a better workaround. You can use the Subscription() method to get a list of topics you're subscribed to, rather than the metadata call (this avoids a network call since Subscription is all local). The stuff with the slices still needs to be done, unfortunately.
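A minimal sketch of that workaround, assuming the consumer is reached through a small interface. The `subscriber` interface, its `Subscribe` method (standing in for `SubscribeTopics(topics, nil)`), and `fakeConsumer` are hypothetical and only keep the example runnable without a broker; `Subscription()` and `Unsubscribe()` mirror the real consumer methods.

```go
package main

import "fmt"

// subscriber is a hypothetical interface covering the consumer calls used here.
// Subscription and Unsubscribe mirror *kafka.Consumer; Subscribe stands in for
// SubscribeTopics(topics, nil) so the sketch needs no library types.
type subscriber interface {
	Subscription() ([]string, error)
	Subscribe(topics []string) error
	Unsubscribe() error
}

// unsubscribeFromTopic drops one topic from the current subscription and
// resubscribes to whatever is left.
func unsubscribeFromTopic(c subscriber, unsubTopic string) error {
	current, err := c.Subscription() // local call, no network round trip
	if err != nil {
		return fmt.Errorf("failed to read subscription: %w", err)
	}

	remaining := make([]string, 0, len(current))
	for _, t := range current {
		if t != unsubTopic {
			remaining = append(remaining, t)
		}
	}

	if len(remaining) == len(current) {
		return nil // not subscribed to unsubTopic; nothing to change
	}
	if len(remaining) == 0 {
		return c.Unsubscribe() // last topic removed, so drop the whole subscription
	}
	return c.Subscribe(remaining)
}

// fakeConsumer is an in-memory stand-in used only to exercise the helper.
type fakeConsumer struct{ topics []string }

func (f *fakeConsumer) Subscription() ([]string, error) {
	return append([]string(nil), f.topics...), nil
}
func (f *fakeConsumer) Subscribe(topics []string) error { f.topics = topics; return nil }
func (f *fakeConsumer) Unsubscribe() error              { f.topics = nil; return nil }

func main() {
	c := &fakeConsumer{topics: []string{"topic1", "topic2", "topic3"}}
	if err := unsubscribeFromTopic(c, "topic2"); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(c.topics) // prints [topic1 topic3]
}
```

With the real client, a thin adapter whose `Subscribe` calls `consumer.SubscribeTopics(topics, nil)` would satisfy the same interface.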

@ORESoftware
Author

ok thanks @milindl - I think this functionality is sorely needed - it would save users re-implementing their own unsubscribe logic. That means the library would store the references, which is fine I guess? Maybe only 10K items, tops.

@ORESoftware
Author

we have a lot of topics in Kafka and passing 500 topic ids 3x a second to unsubscribe is a bit scary. Is there no way to store a reference to the sub list on the kafka-side? it seems like this is a big oversight?

Contributor

milindl commented Jun 6, 2024

I think the API is worth considering, but there are several reasons it will probably not be prioritized soon.

This is a public API: once we support it, we'll need to support it forever. We'd also need to define the semantics of the method for regex-based subscriptions; there is more than one way to implement that, so it requires more discussion. A similar method doesn't exist in the official Kafka Java client either.
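To illustrate the regex ambiguity, here is a rough sketch, assuming subscription entries that start with "^" are treated as patterns (as the client does) and using Go's `regexp` package as an approximation of the client's matching. The helper names and topic names are hypothetical. Removing a topic by exact string match leaves it covered by the pattern, so a per-topic unsubscribe would have to either reject the call or track exclusions.

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

// matches reports whether a subscription entry covers a topic. Entries that
// start with "^" are treated as regular expressions; Go's regexp package is
// only an approximation of the matching done by the client.
func matches(entry, topic string) bool {
	if strings.HasPrefix(entry, "^") {
		re, err := regexp.Compile(entry)
		return err == nil && re.MatchString(topic)
	}
	return entry == topic
}

// removeLiteral filters out entries that are exactly equal to topic, which is
// what the slice-based workaround does.
func removeLiteral(entries []string, topic string) []string {
	out := make([]string, 0, len(entries))
	for _, e := range entries {
		if e != topic {
			out = append(out, e)
		}
	}
	return out
}

// stillCovered reports whether any remaining entry matches topic.
func stillCovered(entries []string, topic string) bool {
	for _, e := range entries {
		if matches(e, topic) {
			return true
		}
	}
	return false
}

func main() {
	entries := []string{"^orders\\..*", "payments"}

	// "orders.eu" is not a literal entry, so nothing is removed.
	after := removeLiteral(entries, "orders.eu")
	fmt.Println(len(after) == len(entries))       // prints true
	fmt.Println(stillCovered(after, "orders.eu")) // prints true
}
```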

Added to that is the fact that a workaround exists. In my experience, changing subscriptions multiple times per second is not a very common mode of operation, since each change incurs a rebalance (even with incremental rebalancing, there is some penalty to it).
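If the changes really do arrive several times a second, one way to limit rebalances is to coalesce them and resubscribe once per interval. The sketch below is only an assumption about how that could look on the application side: `pendingChanges` and its methods are hypothetical names, and the caller is expected to pass the flushed list to a single `SubscribeTopics` call, for example from a timer.

```go
package main

import "fmt"

// pendingChanges collects unsubscribe requests between flushes so that many
// requests result in a single resubscribe (and a single rebalance).
type pendingChanges struct {
	remove map[string]struct{}
}

func newPendingChanges() *pendingChanges {
	return &pendingChanges{remove: make(map[string]struct{})}
}

// Unsubscribe queues a topic for removal; duplicates are collapsed.
func (p *pendingChanges) Unsubscribe(topic string) {
	p.remove[topic] = struct{}{}
}

// Flush applies the queued removals to current and clears the queue. It
// returns the new topic list and whether it differs from current, so the
// caller can skip the resubscribe when nothing changed.
func (p *pendingChanges) Flush(current []string) (next []string, changed bool) {
	next = make([]string, 0, len(current))
	for _, t := range current {
		if _, drop := p.remove[t]; !drop {
			next = append(next, t)
		}
	}
	p.remove = make(map[string]struct{})
	return next, len(next) != len(current)
}

func main() {
	p := newPendingChanges()
	p.Unsubscribe("topic2")
	p.Unsubscribe("topic4")
	p.Unsubscribe("topic2") // duplicate request, collapsed

	next, changed := p.Flush([]string{"topic1", "topic2", "topic3", "topic4"})
	fmt.Println(next, changed) // prints [topic1 topic3] true
}
```

The type is not safe for concurrent use as written; a mutex would be needed if requests come from several goroutines.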

Could you help me understand the use case for subscribing and unsubscribing from a topic multiple times every second? That'd be helpful for me to discuss this with my team.

2 participants