diff --git a/protocol/amqp/v2/options.go b/protocol/amqp/v2/options.go index a86e0781b..ca5fd986b 100644 --- a/protocol/amqp/v2/options.go +++ b/protocol/amqp/v2/options.go @@ -12,7 +12,10 @@ import ( // Option is the function signature required to be considered an amqp.Option. type Option func(*Protocol) error -// WithConnOpt sets a connection option for amqp +type SendOption func(sender *sender) +type ReceiveOption func(receiver *receiver) + +// WithConnOpts sets a connection option for amqp func WithConnOpts(opt *amqp.ConnOptions) Option { return func(t *Protocol) error { t.connOpts = opt @@ -26,7 +29,7 @@ func WithConnSASLPlain(opt *amqp.ConnOptions, username, password string) Option return WithConnOpts(opt) } -// WithSessionOpt sets a session option for amqp +// WithSessionOpts sets a session option for amqp func WithSessionOpts(opt *amqp.SessionOptions) Option { return func(t *Protocol) error { t.sessionOpts = opt @@ -34,7 +37,7 @@ func WithSessionOpts(opt *amqp.SessionOptions) Option { } } -// WithSenderLinkOption sets a link option for amqp +// WithSenderOpts sets a link option for amqp func WithSenderOpts(opt *amqp.SenderOptions) Option { return func(t *Protocol) error { t.senderOpts = opt @@ -42,7 +45,7 @@ func WithSenderOpts(opt *amqp.SenderOptions) Option { } } -// WithReceiverLinkOption sets a link option for amqp +// WithReceiverOpts sets a link option for amqp func WithReceiverOpts(opt *amqp.ReceiverOptions) Option { return func(t *Protocol) error { t.receiverOpts = opt @@ -50,13 +53,15 @@ func WithReceiverOpts(opt *amqp.ReceiverOptions) Option { } } -func WithReceiveOpts(opt amqp.ReceiveOptions) Option { +// WithReceiveOpts sets a receive option for amqp +func WithReceiveOpts(opt *amqp.ReceiveOptions) Option { return func(t *Protocol) error { t.receiveOpts = opt return nil } } +// WithSendOpts sets a send option for amqp func WithSendOpts(opt *amqp.SendOptions) Option { return func(t *Protocol) error { t.sendOpts = opt @@ -64,5 +69,19 @@ func WithSendOpts(opt *amqp.SendOptions) Option { } } +// WithSendOptions sets send options for amqp +func WithSendOptions(opts *amqp.SendOptions) SendOption { + return func(t *sender) { + t.options = opts + } +} + +// WithReceiveOptions sets receive options for amqp +func WithReceiveOptions(opts *amqp.ReceiveOptions) ReceiveOption { + return func(t *receiver) { + t.options = opts + } +} + // SenderOptionFunc is the type of amqp.Sender options type SenderOptionFunc func(sender *sender) diff --git a/protocol/amqp/v2/protocol.go b/protocol/amqp/v2/protocol.go index 30f82794c..cc4ad15ef 100644 --- a/protocol/amqp/v2/protocol.go +++ b/protocol/amqp/v2/protocol.go @@ -26,7 +26,7 @@ type Protocol struct { senderOpts *amqp.SenderOptions receiverOpts *amqp.ReceiverOptions sendOpts *amqp.SendOptions - receiveOpts amqp.ReceiveOptions + receiveOpts *amqp.ReceiveOptions // Sender Sender *sender @@ -60,14 +60,14 @@ func NewProtocolFromClient( _ = session.Close(context.Background()) return nil, err } - t.Sender = NewSender(amqpSender, t.sendOpts).(*sender) + t.Sender = NewSender(amqpSender, WithSendOptions(t.sendOpts)).(*sender) t.SenderContextDecorators = []func(context.Context) context.Context{} amqpReceiver, err := t.Session.NewReceiver(ctx, t.Node, t.receiverOpts) if err != nil { return nil, err } - t.Receiver = NewReceiver(amqpReceiver, t.receiveOpts).(*receiver) + t.Receiver = NewReceiver(amqpReceiver, WithReceiveOptions(t.receiveOpts)).(*receiver) return t, nil } @@ -124,7 +124,8 @@ func NewSenderProtocolFromClient( _ = session.Close(context.Background()) return nil, err } - t.Sender = NewSender(amqpSender, t.sendOpts).(*sender) + t.Sender = NewSender(amqpSender).(*sender) + t.SenderContextDecorators = []func(context.Context) context.Context{} return t, nil @@ -152,7 +153,7 @@ func NewReceiverProtocolFromClient( if err != nil { return nil, err } - t.Receiver = NewReceiver(amqpReceiver, t.receiveOpts).(*receiver) + t.Receiver = NewReceiver(amqpReceiver, WithReceiveOptions(t.receiveOpts)).(*receiver) return t, nil } diff --git a/protocol/amqp/v2/receiver.go b/protocol/amqp/v2/receiver.go index dbc130aa5..c5fc43ff5 100644 --- a/protocol/amqp/v2/receiver.go +++ b/protocol/amqp/v2/receiver.go @@ -21,11 +21,11 @@ const serverDown = "session ended by server" // receiver wraps an amqp.Receiver as a binding.Receiver type receiver struct { amqp *amqp.Receiver - options amqp.ReceiveOptions + options *amqp.ReceiveOptions } func (r *receiver) Receive(ctx context.Context) (binding.Message, error) { - m, err := r.amqp.Receive(ctx, &r.options) + m, err := r.amqp.Receive(ctx, r.options) if err != nil { if err == ctx.Err() { return nil, io.EOF @@ -41,6 +41,15 @@ func (r *receiver) Receive(ctx context.Context) (binding.Message, error) { } // NewReceiver create a new Receiver which wraps an amqp.Receiver in a binding.Receiver -func NewReceiver(amqp *amqp.Receiver, options amqp.ReceiveOptions) protocol.Receiver { - return &receiver{amqp: amqp, options: options} +func NewReceiver(amqp *amqp.Receiver, opts ...ReceiveOption) protocol.Receiver { + r := &receiver{amqp: amqp, options: nil} + applyReceiveOptions(r, opts...) + return r +} + +func applyReceiveOptions(s *receiver, opts ...ReceiveOption) *receiver { + for _, o := range opts { + o(s) + } + return s } diff --git a/protocol/amqp/v2/sender.go b/protocol/amqp/v2/sender.go index 320f61051..a5ded68df 100644 --- a/protocol/amqp/v2/sender.go +++ b/protocol/amqp/v2/sender.go @@ -39,8 +39,15 @@ func (s *sender) Send(ctx context.Context, in binding.Message, transformers ...b } // NewSender creates a new Sender which wraps an amqp.Sender in a binding.Sender -func NewSender(amqpSender *amqp.Sender, options *amqp.SendOptions) protocol.Sender { - s := &sender{amqp: amqpSender, options: options} +func NewSender(amqpSender *amqp.Sender, opts ...SendOption) protocol.Sender { + s := &sender{amqp: amqpSender, options: nil} + applySenderOptions(s, opts...) + return s +} +func applySenderOptions(s *sender, opts ...SendOption) *sender { + for _, o := range opts { + o(s) + } return s } diff --git a/test/integration/amqp_binding/amqp_test.go b/test/integration/amqp_binding/amqp_test.go index 0379331ae..9c6bc2d9c 100644 --- a/test/integration/amqp_binding/amqp_test.go +++ b/test/integration/amqp_binding/amqp_test.go @@ -10,6 +10,7 @@ import ( "io" "net/url" "os" + "strings" "testing" "github.com/Azure/go-amqp" @@ -95,7 +96,7 @@ func testClient(t testing.TB) (client *amqp.Conn, session *amqp.Session, addr st addr = "test" s := os.Getenv("TEST_AMQP_URL") if u, err := url.Parse(s); err == nil && u.Path != "" { - addr = u.Path + addr = strings.TrimPrefix(u.Path, "/") } client, err := amqp.Dial(context.Background(), s, &amqp.ConnOptions{}) if err != nil { @@ -105,7 +106,6 @@ func testClient(t testing.TB) (client *amqp.Conn, session *amqp.Session, addr st require.NoError(t, err) return client, session, addr - } func testSenderReceiver(t testing.TB) (io.Closer, bindings.Sender, bindings.Receiver) { @@ -114,7 +114,7 @@ func testSenderReceiver(t testing.TB) (io.Closer, bindings.Sender, bindings.Rece require.NoError(t, err) s, err := ss.NewSender(context.Background(), a, nil) require.NoError(t, err) - return c, protocolamqp.NewSender(s, nil), protocolamqp.NewReceiver(r, amqp.ReceiveOptions{}) + return c, protocolamqp.NewSender(s), protocolamqp.NewReceiver(r) } func BenchmarkSendReceive(b *testing.B) {