-
Notifications
You must be signed in to change notification settings - Fork 13
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
65 consume message when header has expected value #66
65 consume message when header has expected value #66
Conversation
internal/cronsumer.go
Outdated
@@ -62,6 +62,11 @@ func (k *kafkaCronsumer) Listen(ctx context.Context, strategyName string, cancel | |||
|
|||
msg := NewMessageWrapper(*m, strategyName) | |||
|
|||
if k.cfg.Consumer.HeaderFilterFn != nil && k.cfg.Consumer.HeaderFilterFn(msg.Headers) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can also add headerFn in kafkaCronsumer
struct like we did for consumeFn.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We thought about it and it will filter only when consuming, isn't it better in consumer part of config?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reasonable :D
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I mean, instead of access headerFilterFn via k.cfg.Consumer (k.cfg.Consumer.HeaderFilterFn(msg.Headers)
), we can extract a field named headerFilterFn within kafkaCronsumer
struct so we can access like k.headerFilterFn(msg.Headers)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I got it now :D, we can do that, so I think it still will be defined in config, but we are gonna pass it to newKafkaCronsumer
function in newCronsumer
function. Am i right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Integration test didn't pass. We can close the resp channel maybe. Could you check. 🙏🏻 |
Implementation of header filter function, solves issue #65