-
Notifications
You must be signed in to change notification settings - Fork 0
/
consumer_opts.go
118 lines (103 loc) · 3.3 KB
/
consumer_opts.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
package easykafka
import (
"github.com/segmentio/kafka-go"
"time"
)
// ConsumerWithLogger sets logger
func ConsumerWithLogger[T any](logger kafka.Logger) ConsumerOption[T] {
return func(c *Consumer[T]) (err error) {
c.LoggerContainer.logger = logger
return nil
}
}
// ConsumerWithErrorLogger sets error logger
func ConsumerWithErrorLogger[T any](logger kafka.Logger) ConsumerOption[T] {
return func(c *Consumer[T]) (err error) {
c.LoggerContainer.errorLogger = logger
return nil
}
}
// ConsumerWithWrongMessageHandler sets handler for wrong message
func ConsumerWithWrongMessageHandler[T any](handler ConsumerErrorHandler[T]) ConsumerOption[T] {
return func(k *Consumer[T]) (err error) {
k.onWrongMessage = &handler
return nil
}
}
// ConsumerWithReadMessageHandler sets handler for read message
func ConsumerWithReadMessageHandler[T any](handler ConsumerErrorHandler[T]) ConsumerOption[T] {
return func(k *Consumer[T]) (err error) {
k.onReadError = &handler
return nil
}
}
// ConsumerWithTopicsListUpdatedHandler sets handler for topics list update
func ConsumerWithTopicsListUpdatedHandler[T any](handler ConsumerTopicsListUpdatedHandler[T]) ConsumerOption[T] {
return func(k *Consumer[T]) (err error) {
k.onTopicsListUpdated = &handler
return nil
}
}
// ConsumerWithOnFailCommitHandler sets handler for commit error
func ConsumerWithOnFailCommitHandler[T any](handler ConsumerErrorHandler[T]) ConsumerOption[T] {
return func(k *Consumer[T]) (err error) {
k.onFailCommit = &handler
return nil
}
}
// ConsumerWithReaderConfig sets reader config
func ConsumerWithReaderConfig[T any](config kafka.ReaderConfig) ConsumerOption[T] {
return func(k *Consumer[T]) (err error) {
k.readerConfig = config
return nil
}
}
// ConsumerWithMaxBlockingTasks sets max blocking tasks
func ConsumerWithMaxBlockingTasks[T any](count uint) ConsumerOption[T] {
return func(k *Consumer[T]) (err error) {
k.maxBlockingTasks = count
return nil
}
}
// ConsumerInitialPartitionsCount sets initial partitions count
func ConsumerInitialPartitionsCount[T any](count uint) ConsumerOption[T] {
return func(c *Consumer[T]) (err error) {
c.partitions = count
return nil
}
}
// ConsumerConcurrency sets parallel tasks count
func ConsumerConcurrency[T any](concurrency uint) ConsumerOption[T] {
return func(c *Consumer[T]) (err error) {
c.concurrency = concurrency
return nil
}
}
// ConsumerDynamicTopicsDiscovery enable dynamic topics discovery
func ConsumerDynamicTopicsDiscovery[T any]() ConsumerOption[T] {
return func(c *Consumer[T]) (err error) {
c.dynamicTopicsDiscovery = true
return nil
}
}
// ConsumerDynamicTopicsDiscoveryInterval sets dynamic topics discovery interval
func ConsumerDynamicTopicsDiscoveryInterval[T any](interval time.Duration) ConsumerOption[T] {
return func(c *Consumer[T]) (err error) {
c.dynamicTopicsDiscoveryInterval = interval
return nil
}
}
// ConsumerTopicNamesRegexMatch enable regex match for topic names
func ConsumerTopicNamesRegexMatch[T any]() ConsumerOption[T] {
return func(c *Consumer[T]) (err error) {
c.topicNamesRegexMatch = true
return nil
}
}
// ConsumerTopicNamesExactMatch enable exact match for topic names
func ConsumerTopicNamesExactMatch[T any]() ConsumerOption[T] {
return func(c *Consumer[T]) (err error) {
c.topicNamesRegexMatch = false
return nil
}
}