diff --git a/protocol/amqp/v2/options.go b/protocol/amqp/v2/options.go new file mode 100644 index 000000000..a86e0781b --- /dev/null +++ b/protocol/amqp/v2/options.go @@ -0,0 +1,68 @@ +/* + Copyright 2021 The CloudEvents Authors + SPDX-License-Identifier: Apache-2.0 +*/ + +package amqp + +import ( + "github.com/Azure/go-amqp" +) + +// Option is the function signature required to be considered an amqp.Option. +type Option func(*Protocol) error + +// WithConnOpt sets a connection option for amqp +func WithConnOpts(opt *amqp.ConnOptions) Option { + return func(t *Protocol) error { + t.connOpts = opt + return nil + } +} + +// WithConnSASLPlain sets SASLPlain connection option for amqp +func WithConnSASLPlain(opt *amqp.ConnOptions, username, password string) Option { + opt.SASLType = amqp.SASLTypePlain(username, password) + return WithConnOpts(opt) +} + +// WithSessionOpt sets a session option for amqp +func WithSessionOpts(opt *amqp.SessionOptions) Option { + return func(t *Protocol) error { + t.sessionOpts = opt + return nil + } +} + +// WithSenderLinkOption sets a link option for amqp +func WithSenderOpts(opt *amqp.SenderOptions) Option { + return func(t *Protocol) error { + t.senderOpts = opt + return nil + } +} + +// WithReceiverLinkOption sets a link option for amqp +func WithReceiverOpts(opt *amqp.ReceiverOptions) Option { + return func(t *Protocol) error { + t.receiverOpts = opt + return nil + } +} + +func WithReceiveOpts(opt amqp.ReceiveOptions) Option { + return func(t *Protocol) error { + t.receiveOpts = opt + return nil + } +} + +func WithSendOpts(opt *amqp.SendOptions) Option { + return func(t *Protocol) error { + t.sendOpts = opt + return nil + } +} + +// SenderOptionFunc is the type of amqp.Sender options +type SenderOptionFunc func(sender *sender) diff --git a/protocol/amqp/v2/protocol.go b/protocol/amqp/v2/protocol.go index 8d56406f4..30f82794c 100644 --- a/protocol/amqp/v2/protocol.go +++ b/protocol/amqp/v2/protocol.go @@ -21,6 +21,13 @@ type Protocol struct { ownedClient bool Node string + connOpts *amqp.ConnOptions + sessionOpts *amqp.SessionOptions + senderOpts *amqp.SenderOptions + receiverOpts *amqp.ReceiverOptions + sendOpts *amqp.SendOptions + receiveOpts amqp.ReceiveOptions + // Sender Sender *sender SenderContextDecorators []func(context.Context) context.Context @@ -35,30 +42,32 @@ func NewProtocolFromClient( client *amqp.Conn, session *amqp.Session, queue string, - senderOptions amqp.SenderOptions, - receiverOptions amqp.ReceiverOptions, + opts ...Option, ) (*Protocol, error) { t := &Protocol{ Node: queue, Client: client, Session: session, } + if err := t.applyOptions(opts...); err != nil { + return nil, err + } // Create a sender - amqpSender, err := session.NewSender(ctx, queue, &senderOptions) + amqpSender, err := session.NewSender(ctx, queue, t.senderOpts) if err != nil { _ = client.Close() _ = session.Close(context.Background()) return nil, err } - t.Sender = NewSender(amqpSender, &amqp.SendOptions{}).(*sender) + t.Sender = NewSender(amqpSender, t.sendOpts).(*sender) t.SenderContextDecorators = []func(context.Context) context.Context{} - amqpReceiver, err := t.Session.NewReceiver(ctx, t.Node, &receiverOptions) + amqpReceiver, err := t.Session.NewReceiver(ctx, t.Node, t.receiverOpts) if err != nil { return nil, err } - t.Receiver = NewReceiver(amqpReceiver, amqp.ReceiveOptions{}).(*receiver) + t.Receiver = NewReceiver(amqpReceiver, t.receiveOpts).(*receiver) return t, nil } @@ -68,8 +77,7 @@ func NewProtocol( server, queue string, connOptions amqp.ConnOptions, sessionOptions amqp.SessionOptions, - senderOptions amqp.SenderOptions, - receiverOptions amqp.ReceiverOptions, + opts ...Option, ) (*Protocol, error) { client, err := amqp.Dial(ctx, server, &connOptions) if err != nil { @@ -83,7 +91,7 @@ func NewProtocol( return nil, err } - p, err := NewProtocolFromClient(ctx, client, session, queue, senderOptions, receiverOptions) + p, err := NewProtocolFromClient(ctx, client, session, queue, opts...) if err != nil { return nil, err } @@ -98,22 +106,25 @@ func NewSenderProtocolFromClient( client *amqp.Conn, session *amqp.Session, address string, - senderOptions amqp.SenderOptions, + opts ...Option, ) (*Protocol, error) { t := &Protocol{ Node: address, Client: client, Session: session, } + if err := t.applyOptions(opts...); err != nil { + return nil, err + } // Create a sender - amqpSender, err := session.NewSender(ctx, address, &senderOptions) + amqpSender, err := session.NewSender(ctx, address, t.senderOpts) if err != nil { _ = client.Close() _ = session.Close(context.Background()) return nil, err } - t.Sender = NewSender(amqpSender, &amqp.SendOptions{}).(*sender) + t.Sender = NewSender(amqpSender, t.sendOpts).(*sender) t.SenderContextDecorators = []func(context.Context) context.Context{} return t, nil @@ -125,25 +136,28 @@ func NewReceiverProtocolFromClient( client *amqp.Conn, session *amqp.Session, address string, - receiverOptions amqp.ReceiverOptions, + opts ...Option, ) (*Protocol, error) { t := &Protocol{ Node: address, Client: client, Session: session, } + if err := t.applyOptions(opts...); err != nil { + return nil, err + } t.Node = address - amqpReceiver, err := t.Session.NewReceiver(ctx, address, &receiverOptions) + amqpReceiver, err := t.Session.NewReceiver(ctx, address, t.receiverOpts) if err != nil { return nil, err } - t.Receiver = NewReceiver(amqpReceiver, amqp.ReceiveOptions{}).(*receiver) + t.Receiver = NewReceiver(amqpReceiver, t.receiveOpts).(*receiver) return t, nil } // NewSenderProtocol creates a new sender amqp transport. -func NewSenderProtocol(ctx context.Context, server, address string, connOptions amqp.ConnOptions, sessionOptions amqp.SessionOptions, senderOptions amqp.SenderOptions) (*Protocol, error) { +func NewSenderProtocol(ctx context.Context, server, address string, connOptions amqp.ConnOptions, sessionOptions amqp.SessionOptions, opts ...Option) (*Protocol, error) { client, err := amqp.Dial(ctx, server, &connOptions) if err != nil { return nil, err @@ -156,7 +170,7 @@ func NewSenderProtocol(ctx context.Context, server, address string, connOptions return nil, err } - p, err := NewSenderProtocolFromClient(ctx, client, session, address, senderOptions) + p, err := NewSenderProtocolFromClient(ctx, client, session, address, opts...) if err != nil { return nil, err } @@ -166,7 +180,7 @@ func NewSenderProtocol(ctx context.Context, server, address string, connOptions } // NewReceiverProtocol creates a new receiver amqp transport. -func NewReceiverProtocol(ctx context.Context, server, address string, connOptions amqp.ConnOptions, sessionOptions amqp.SessionOptions, receiverOptions amqp.ReceiverOptions) (*Protocol, error) { +func NewReceiverProtocol(ctx context.Context, server, address string, connOptions amqp.ConnOptions, sessionOptions amqp.SessionOptions, opts ...Option) (*Protocol, error) { client, err := amqp.Dial(ctx, server, &connOptions) if err != nil { return nil, err @@ -179,7 +193,7 @@ func NewReceiverProtocol(ctx context.Context, server, address string, connOption return nil, err } - p, err := NewReceiverProtocolFromClient(ctx, client, session, address, receiverOptions) + p, err := NewReceiverProtocolFromClient(ctx, client, session, address, opts...) if err != nil { return nil, err @@ -189,6 +203,15 @@ func NewReceiverProtocol(ctx context.Context, server, address string, connOption return p, nil } +func (t *Protocol) applyOptions(opts ...Option) error { + for _, fn := range opts { + if err := fn(t); err != nil { + return err + } + } + return nil +} + func (t *Protocol) Close(ctx context.Context) (err error) { if t.ownedClient { // Closing the client will close at cascade sender and receiver diff --git a/samples/amqp/receiver/main.go b/samples/amqp/receiver/main.go index 95dbe75e3..954eff27c 100644 --- a/samples/amqp/receiver/main.go +++ b/samples/amqp/receiver/main.go @@ -14,7 +14,6 @@ import ( "strings" "github.com/Azure/go-amqp" - ceamqp "github.com/cloudevents/sdk-go/protocol/amqp/v2" cloudevents "github.com/cloudevents/sdk-go/v2" ) @@ -44,8 +43,7 @@ func sampleConfig() (server, node string, opts amqp.ConnOptions) { func main() { host, node, opts := sampleConfig() - p, err := ceamqp.NewProtocol(context.Background(), host, node, opts, amqp.SessionOptions{}, - amqp.SenderOptions{}, amqp.ReceiverOptions{}) + p, err := ceamqp.NewProtocol(context.Background(), host, node, opts, amqp.SessionOptions{}) if err != nil { log.Fatalf("Failed to create AMQP protocol: %v", err) } diff --git a/samples/amqp/sender/main.go b/samples/amqp/sender/main.go index 0fe5c1a28..075615b3c 100644 --- a/samples/amqp/sender/main.go +++ b/samples/amqp/sender/main.go @@ -53,8 +53,7 @@ type Example struct { func main() { host, node, opts := sampleConfig() - p, err := ceamqp.NewProtocol(context.Background(), host, node, opts, amqp.SessionOptions{}, amqp.SenderOptions{}, - amqp.ReceiverOptions{}) + p, err := ceamqp.NewProtocol(context.Background(), host, node, opts, amqp.SessionOptions{}) if err != nil { log.Fatalf("Failed to create amqp protocol: %v", err) } diff --git a/test/integration/amqp/amqp_test.go b/test/integration/amqp/amqp_test.go index b9ca4aeb4..6aeac59d0 100644 --- a/test/integration/amqp/amqp_test.go +++ b/test/integration/amqp/amqp_test.go @@ -45,18 +45,18 @@ func TestSenderReceiverEvent(t *testing.T) { } func senderProtocolFactory(t *testing.T) *protocolamqp.Protocol { - c, ss, a, so, _ := testClient(t) + c, ss, a := testClient(t) - p, err := protocolamqp.NewSenderProtocolFromClient(context.Background(), c, ss, a, so) + p, err := protocolamqp.NewSenderProtocolFromClient(context.Background(), c, ss, a) require.NoError(t, err) return p } func receiverProtocolFactory(t *testing.T) *protocolamqp.Protocol { - c, ss, a, _, ro := testClient(t) + c, ss, a := testClient(t) - p, err := protocolamqp.NewReceiverProtocolFromClient(context.Background(), c, ss, a, ro) + p, err := protocolamqp.NewReceiverProtocolFromClient(context.Background(), c, ss, a) require.NoError(t, err) return p @@ -69,8 +69,7 @@ func receiverProtocolFactory(t *testing.T) *protocolamqp.Protocol { // On option is http://qpid.apache.org/components/dispatch-router/indexthtml. // It can be installed from source or from RPMs, see https://qpid.apache.org/packages.html // Run `qdrouterd` and the tests will work with no further config. -func testClient(t *testing.T) (client *amqp.Conn, session *amqp.Session, addr string, - senderOpts amqp.SenderOptions, receiverOpts amqp.ReceiverOptions) { +func testClient(t *testing.T) (client *amqp.Conn, session *amqp.Session, addr string) { t.Helper() addr = "test" s := os.Getenv("TEST_AMQP_URL") @@ -83,17 +82,14 @@ func testClient(t *testing.T) (client *amqp.Conn, session *amqp.Session, addr st } session, err = client.NewSession(context.Background(), &amqp.SessionOptions{}) require.NoError(t, err) - senderOpts = amqp.SenderOptions{} - require.NotNil(t, senderOpts) - receiverOpts = amqp.ReceiverOptions{} - require.NotNil(t, receiverOpts) - return client, session, addr, senderOpts, receiverOpts + + return client, session, addr } func protocolFactory(t *testing.T) *protocolamqp.Protocol { - c, ss, a, so, ro := testClient(t) + c, ss, a := testClient(t) - p, err := protocolamqp.NewProtocolFromClient(context.Background(), c, ss, a, so, ro) + p, err := protocolamqp.NewProtocolFromClient(context.Background(), c, ss, a) require.NoError(t, err) return p diff --git a/test/integration/amqp_binding/amqp_test.go b/test/integration/amqp_binding/amqp_test.go index 66246d0c4..0379331ae 100644 --- a/test/integration/amqp_binding/amqp_test.go +++ b/test/integration/amqp_binding/amqp_test.go @@ -90,8 +90,7 @@ func TestSendEventReceiveBinary(t *testing.T) { // On option is http://qpid.apache.org/components/dispatch-router/indexthtml. // It can be installed from source or from RPMs, see https://qpid.apache.org/packages.html // Run `qdrouterd` and the tests will work with no further config. -func testClient(t testing.TB) (client *amqp.Conn, session *amqp.Session, addr string, - senderOpts *amqp.SenderOptions, receiverOpts *amqp.ReceiverOptions) { +func testClient(t testing.TB) (client *amqp.Conn, session *amqp.Session, addr string) { t.Helper() addr = "test" s := os.Getenv("TEST_AMQP_URL") @@ -104,21 +103,18 @@ func testClient(t testing.TB) (client *amqp.Conn, session *amqp.Session, addr st } session, err = client.NewSession(context.Background(), &amqp.SessionOptions{}) require.NoError(t, err) - senderOpts = &amqp.SenderOptions{} - require.NotNil(t, senderOpts) - receiverOpts = &amqp.ReceiverOptions{} - require.NotNil(t, receiverOpts) - return client, session, addr, senderOpts, receiverOpts + + return client, session, addr } func testSenderReceiver(t testing.TB) (io.Closer, bindings.Sender, bindings.Receiver) { - c, ss, a, so, ro := testClient(t) - r, err := ss.NewReceiver(context.Background(), a, ro) + c, ss, a := testClient(t) + r, err := ss.NewReceiver(context.Background(), a, nil) require.NoError(t, err) - s, err := ss.NewSender(context.Background(), a, so) + s, err := ss.NewSender(context.Background(), a, nil) require.NoError(t, err) - return c, protocolamqp.NewSender(s, &amqp.SendOptions{}), protocolamqp.NewReceiver(r, amqp.ReceiveOptions{}) + return c, protocolamqp.NewSender(s, nil), protocolamqp.NewReceiver(r, amqp.ReceiveOptions{}) } func BenchmarkSendReceive(b *testing.B) {