-
Notifications
You must be signed in to change notification settings - Fork 217
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
add v3 version of nats jetstream protocol with integration tests and samples #1095
base: main
Are you sure you want to change the base?
add v3 version of nats jetstream protocol with integration tests and samples #1095
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Regarding the samples, please take a look at how we use a protocol in https://github.com/cloudevents/sdk-go/blob/main/samples/kafka_confluent/sender/main.go
16961e8
to
6f1ff3d
Compare
6f1ff3d
to
df22aee
Compare
0e2da9c
to
2cbe682
Compare
@stephen-totty-hpe Thanks! It looks good to me. Leave it to @embano1 ~ |
type ProtocolOption func(*Protocol) error | ||
|
||
// WithURL configures the Sender and/or Receiver | ||
func WithURL(url string, natsOpts ...nats.Option) ProtocolOption { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's quite a lot of logic in this option. I thought URL was always required? If so, better to make it a constructor parameter and configure these fields after all options are applied.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can use one of two options:
- URL +nats.Options // if you do not have a connection yet
- *nats.Conn // if you already have a connection.
This requires either:
- two constructors
a) context.Context, URL , nats.Options, ...ProtocolOption
b) context.Context, *nats.Conn, ...ProtocolOption - Options that makes the two cases "mutually exclusive"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Simplified more. One constructor.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, thx for clarifying. So we need those two options then. I don't think having an option with additional options here is what we want though. Please remove the additional options slice in this option then.
func WithURL(url string, natsOpts ...nats.Option) ProtocolOption { | |
func WithURL(url string) ProtocolOption { |
} | ||
} | ||
|
||
// WithConnection configures the Sender and/or Receiver |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please update all option comments with a clear description of what they do e.g.,
// WithConnection configures the Sender and/or Receiver | |
// WithConnection uses the provided connection in the protocol sender and receiver |
protocol/nats_jetstream/v3/sender.go
Outdated
|
||
// NewSender creates a new protocol.Sender responsible for opening and closing the NATS connection | ||
// Use WithURL() OR WithConnection() | ||
func NewSender(ctx context.Context, subject string, opts ...ProtocolOption) (*sender, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we make these sender/receiver constructors private?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The protocol constructor makes both a sender and receiver. in some cases you might only want a sender OR receiver.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can move sendSubject to an Option.
Then I can check:
A) ConsumerConfig/OrderedConsumerConfig to see if the use wants a receiver
B) SendSubject to see if the user wants a sender
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thx for all the hard work you put into this! Some minor comments on the Options implementation (really like it) and I think we can further reduce the complexity of this code base, especially in the sender/receiver implementations. I suggested to take a look at the HTTP/Kafka (Confluent) protocol implementation for how to simplify the internal logic of this protocol.
2cbe682
to
9d9a30e
Compare
9d9a30e
to
5a16005
Compare
…samples Signed-off-by: stephen-totty-hpe <[email protected]>
5a16005
to
7f198df
Compare
if got == nil { | ||
t.Errorf("Error in NewMessage!") | ||
} | ||
err := got.ReadBinary(context.TODO(), &outBinaryMessage) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: shouldn't you also assert that the contents of the messages are as expected?
} | ||
|
||
// WithConsumerConfig creates a unordered consumer used in the protocol receiver. | ||
// If WithOrderedConsumerConfig is also given, the receiver will error. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// If WithOrderedConsumerConfig is also given, the receiver will error. | |
// This option is mutually exclusive with WithOrderedConsumerConfig. |
} | ||
|
||
// WithOrderedConsumerConfig creates a ordered consumer used in the protocol receiver. | ||
// If WithConsumerConfig is also given, the receiver will error. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// If WithConsumerConfig is also given, the receiver will error. | |
// This option is mutually exclusive with WithConsumerConfig. |
type ProtocolOption func(*Protocol) error | ||
|
||
// WithURL configures the Sender and/or Receiver | ||
func WithURL(url string, natsOpts ...nats.Option) ProtocolOption { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, thx for clarifying. So we need those two options then. I don't think having an option with additional options here is what we want though. Please remove the additional options slice in this option then.
func WithURL(url string, natsOpts ...nats.Option) ProtocolOption { | |
func WithURL(url string) ProtocolOption { |
SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package nats_jetstream |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: many of these tests don't assert error cases. If you run your tests with coverage, you'll see that several code paths are not checked
// Before closing, let's be sure OpenInbound completes | ||
// We send a signal to close and then we lock on subMtx in order | ||
// to wait OpenInbound to finish draining the queue | ||
p.internalClose <- struct{}{} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If OpenInbound is already done (ctx cancelled) this will deadlock
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A quick fix IMHO is a buffered channel of 1
|
||
// validateOptions runs after all options have been applied and makes sure needed options were set correctly. | ||
func (p *Protocol) validateOptions() error { | ||
if p.url == "" && p.conn == nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you also need to check for conflicting options here
p.incoming <- msgErr{msg: NewMessage(msg)} | ||
} | ||
|
||
// Close implements Closer.Close |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// Close implements Closer.Close | |
// Close must be called after use to releases internal resources and close the underlying connection |
return conn | ||
} | ||
|
||
func testProtocol(ctx context.Context, t testing.TB, natsConn *nats.Conn, consumerConfig any, opts ...ce_nats.ProtocolOption) (func(), bindings.Sender, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't this be TestProtocol?
} | ||
} | ||
|
||
func testConn(t testing.TB) *nats.Conn { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Helper, therefore let's not confuse names here
func testConn(t testing.TB) *nats.Conn { | |
func createTestConnection(t testing.TB) *nats.Conn { |
As been discussed previously, there has been a desire to implement the CloudEventSDK nats jetstream protocol using the newer jetstream package. The newer jetstream package exposes more jetstream specific functionality that should make it more flexible to use.
Much of this code is stolen from the v2 implementation and changed where necessary. It builds upon comments made in:
#1083
Under the covers, this uses a jetstream.Consumer, which can be "normal" or "ordered". This is done using consumer options WithConsumerConfig and WithOrderedConsumerConfig.
Also, the need to know the stream names upfront is not needed. Many of the internal options are available if needed, although the only required option is the ConsumerOption with the correct config.