Skip to content

Commit

Permalink
Support prefetch limit per consumer. Deprecate Global OptionPrefetchL…
Browse files Browse the repository at this point in the history
…imit. (#232)
  • Loading branch information
DeimonDB authored Aug 28, 2023
1 parent 39e2813 commit de9ae91
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 5 deletions.
17 changes: 14 additions & 3 deletions mq/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (c *consumer) Start(ctx context.Context) error {
c.stopChan = make(chan struct{})

var err error
c.messages, err = c.messageChannel()
c.messages, err = c.messageChannel(c.options.Workers)
if err != nil {
return fmt.Errorf("get message channel: %v", err)
}
Expand Down Expand Up @@ -141,8 +141,19 @@ func (c *consumer) process(queueName string, body []byte) error {
return err
}

func (c *consumer) messageChannel() (<-chan amqp.Delivery, error) {
messageChannel, err := c.client.amqpChan.Consume(
// messageChannel will create a new dedicated channel for this consumer to use
func (c *consumer) messageChannel(prefetchCount int) (<-chan amqp.Delivery, error) {
mqChan, err := c.client.conn.Channel()
if err != nil {
return nil, fmt.Errorf("MQ issue. queue: %s, err: %w", string(c.queue.Name()), err)
}

err = mqChan.Qos(prefetchCount, 0, true)
if err != nil {
return nil, fmt.Errorf("MQ issue. queue: %s, err: %w", string(c.queue.Name()), err)
}

messageChannel, err := mqChan.Consume(
string(c.queue.Name()),
"",
false,
Expand Down
6 changes: 4 additions & 2 deletions mq/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ const (
)

type Client struct {
url string
conn *amqp.Connection
url string
conn *amqp.Connection

// This channel should only be used for management related operations, like declaring queues & exchanges
amqpChan *amqp.Channel

connClients []ConnectionClient
Expand Down
7 changes: 7 additions & 0 deletions mq/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,13 @@ func DefaultConsumerOptions(workers int) *ConsumerOptions {
}
}

// Deprecated: We should not put prefetch limit at channel level. We need to set limit at consumer level
// This option no longer works to limit QoS globally.
//
// From rabbitMQ doc https://www.rabbitmq.com/consumer-prefetch.html
// Unfortunately the channel is not the ideal scope for this - since a single channel may consume from multiple queues,
// the channel and the queue(s) need to coordinate with each other for every message sent to ensure they don't go over
// the limit. This is slow on a single machine, and very slow when consuming across a cluster.
func OptionPrefetchLimit(limit int) Option {
return func(m *Client) error {
err := m.amqpChan.Qos(
Expand Down

0 comments on commit de9ae91

Please sign in to comment.