Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#1039 Migrate azure/go-amqp to version 1.0.5 #1040

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions protocol/amqp/v2/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ go 1.18
replace github.com/cloudevents/sdk-go/v2 => ../../../v2

require (
github.com/Azure/go-amqp v0.17.0
github.com/Azure/go-amqp v1.0.5
github.com/cloudevents/sdk-go/v2 v2.5.0
github.com/stretchr/testify v1.8.0
github.com/stretchr/testify v1.9.0
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/google/go-cmp v0.5.1 // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/json-iterator/go v1.1.10 // indirect
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 // indirect
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742 // indirect
Expand Down
18 changes: 6 additions & 12 deletions protocol/amqp/v2/go.sum
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
github.com/Azure/go-amqp v0.17.0 h1:HHXa3149nKrI0IZwyM7DRcRy5810t9ZICDutn4BYzj4=
github.com/Azure/go-amqp v0.17.0/go.mod h1:9YJ3RhxRT1gquYnzpZO1vcYMMpAdJT+QEg6fwmw9Zlg=
github.com/Azure/go-amqp v1.0.5 h1:po5+ljlcNSU8xtapHTe8gIc8yHxCzC03E8afH2g1ftU=
github.com/Azure/go-amqp v1.0.5/go.mod h1:vZAogwdrkbyK3Mla8m/CxSc/aKdnTZ4IbPxl51Y5WZE=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
github.com/google/go-cmp v0.5.1 h1:JFrFEBb2xKufg6XkJsJr+WbKb4FQlURi5RUcBveYu9k=
github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/json-iterator/go v1.1.10 h1:Kz6Cvnvv2wGdaG/V8yMvfkmNiXq9Ya2KUv4rouJJr68=
github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
Expand All @@ -19,18 +18,13 @@ github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWb
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU=
go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI=
go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
2 changes: 1 addition & 1 deletion protocol/amqp/v2/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ const prefix = "cloudEvents:" // Name prefix for AMQP properties that hold CE at

var (
// Use the package path as AMQP error condition name
condition = amqp.ErrorCondition(reflect.TypeOf(Message{}).PkgPath())
condition = amqp.ErrCond(reflect.TypeOf(Message{}).PkgPath())
specs = spec.WithPrefix(prefix)
)

Expand Down
62 changes: 48 additions & 14 deletions protocol/amqp/v2/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,42 +12,76 @@ import (
// Option is the function signature required to be considered an amqp.Option.
type Option func(*Protocol) error

// WithConnOpt sets a connection option for amqp
func WithConnOpt(opt amqp.ConnOption) Option {
type SendOption func(sender *sender)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: both need doc strings

type ReceiveOption func(receiver *receiver)

// WithConnOpts sets a connection option for amqp
func WithConnOpts(opt *amqp.ConnOptions) Option {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since you're using pointer semantics in those options, be careful with races - users might not be aware that you're passing the pointer around (instead of copy) and this could lead to surprises for users. I suggest using value semantic to avoid races and these surprises.

return func(t *Protocol) error {
t.connOpts = append(t.connOpts, opt)
t.connOpts = opt
return nil
}
}

// WithConnSASLPlain sets SASLPlain connection option for amqp
func WithConnSASLPlain(username, password string) Option {
return WithConnOpt(amqp.ConnSASLPlain(username, password))
func WithConnSASLPlain(opt *amqp.ConnOptions, username, password string) Option {
opt.SASLType = amqp.SASLTypePlain(username, password)
return WithConnOpts(opt)
}

// WithSessionOpt sets a session option for amqp
func WithSessionOpt(opt amqp.SessionOption) Option {
// WithSessionOpts sets a session option for amqp
func WithSessionOpts(opt *amqp.SessionOptions) Option {
return func(t *Protocol) error {
t.sessionOpts = append(t.sessionOpts, opt)
t.sessionOpts = opt
return nil
}
}

// WithSenderLinkOption sets a link option for amqp
func WithSenderLinkOption(opt amqp.LinkOption) Option {
// WithSenderOpts sets a link option for amqp
func WithSenderOpts(opt *amqp.SenderOptions) Option {
return func(t *Protocol) error {
t.senderLinkOpts = append(t.senderLinkOpts, opt)
t.senderOpts = opt
return nil
}
}

// WithReceiverLinkOption sets a link option for amqp
func WithReceiverLinkOption(opt amqp.LinkOption) Option {
// WithReceiverOpts sets a link option for amqp
func WithReceiverOpts(opt *amqp.ReceiverOptions) Option {
return func(t *Protocol) error {
t.receiverLinkOpts = append(t.receiverLinkOpts, opt)
t.receiverOpts = opt
return nil
}
}

// WithReceiveOpts sets a receive option for amqp
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please explain why we need WithReceiverOpts and WithReceiveOpts (also for send)? Is there a way to only have one? These changes complicate the API and I'm having a hard time understanding why we need those very similar options (and I guess new CE users would also be slightly confused?)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC, this is for the sender/receiver construction, which currently does not support options. Do we need this new functionality at the cost of a more complex user API?

func WithReceiveOpts(opt *amqp.ReceiveOptions) Option {
return func(t *Protocol) error {
t.receiveOpts = opt
return nil
}
}

// WithSendOpts sets a send option for amqp
func WithSendOpts(opt *amqp.SendOptions) Option {
return func(t *Protocol) error {
t.sendOpts = opt
return nil
}
}

// WithSendOptions sets send options for amqp
func WithSendOptions(opts *amqp.SendOptions) SendOption {
return func(t *sender) {
t.options = opts
}
}

// WithReceiveOptions sets receive options for amqp
func WithReceiveOptions(opts *amqp.ReceiveOptions) ReceiveOption {
return func(t *receiver) {
t.options = opts
}
}

// SenderOptionFunc is the type of amqp.Sender options
type SenderOptionFunc func(sender *sender)
113 changes: 65 additions & 48 deletions protocol/amqp/v2/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,19 @@ import (
)

type Protocol struct {
connOpts []amqp.ConnOption
sessionOpts []amqp.SessionOption
senderLinkOpts []amqp.LinkOption
receiverLinkOpts []amqp.LinkOption

// AMQP
Client *amqp.Client
Client *amqp.Conn
Session *amqp.Session
ownedClient bool
Node string

connOpts *amqp.ConnOptions
sessionOpts *amqp.SessionOptions
senderOpts *amqp.SenderOptions
receiverOpts *amqp.ReceiverOptions
sendOpts *amqp.SendOptions
receiveOpts *amqp.ReceiveOptions

// Sender
Sender *sender
SenderContextDecorators []func(context.Context) context.Context
Expand All @@ -35,54 +37,61 @@ type Protocol struct {
}

// NewProtocolFromClient creates a new amqp transport.
func NewProtocolFromClient(client *amqp.Client, session *amqp.Session, queue string, opts ...Option) (*Protocol, error) {
func NewProtocolFromClient(
embano1 marked this conversation as resolved.
Show resolved Hide resolved
ctx context.Context,
client *amqp.Conn,
session *amqp.Session,
queue string,
opts ...Option,
) (*Protocol, error) {
t := &Protocol{
Node: queue,
senderLinkOpts: []amqp.LinkOption(nil),
receiverLinkOpts: []amqp.LinkOption(nil),
Client: client,
Session: session,
Node: queue,
Client: client,
Session: session,
}
if err := t.applyOptions(opts...); err != nil {
return nil, err
}

t.senderLinkOpts = append(t.senderLinkOpts, amqp.LinkTargetAddress(queue))

// Create a sender
amqpSender, err := session.NewSender(t.senderLinkOpts...)
amqpSender, err := session.NewSender(ctx, queue, t.senderOpts)
if err != nil {
_ = client.Close()
_ = session.Close(context.Background())
return nil, err
}
t.Sender = NewSender(amqpSender).(*sender)
t.Sender = NewSender(amqpSender, WithSendOptions(t.sendOpts)).(*sender)
t.SenderContextDecorators = []func(context.Context) context.Context{}

t.receiverLinkOpts = append(t.receiverLinkOpts, amqp.LinkSourceAddress(t.Node))
amqpReceiver, err := t.Session.NewReceiver(t.receiverLinkOpts...)
amqpReceiver, err := t.Session.NewReceiver(ctx, t.Node, t.receiverOpts)
if err != nil {
return nil, err
}
t.Receiver = NewReceiver(amqpReceiver).(*receiver)
t.Receiver = NewReceiver(amqpReceiver, WithReceiveOptions(t.receiveOpts)).(*receiver)
return t, nil
}

// NewProtocol creates a new amqp transport.
func NewProtocol(server, queue string, connOption []amqp.ConnOption, sessionOption []amqp.SessionOption, opts ...Option) (*Protocol, error) {
client, err := amqp.Dial(server, connOption...)
func NewProtocol(
embano1 marked this conversation as resolved.
Show resolved Hide resolved
ctx context.Context,
server, queue string,
connOptions amqp.ConnOptions,
sessionOptions amqp.SessionOptions,
opts ...Option,
) (*Protocol, error) {
client, err := amqp.Dial(ctx, server, &connOptions)
if err != nil {
return nil, err
}

// Open a session
session, err := client.NewSession(sessionOption...)
session, err := client.NewSession(ctx, &sessionOptions)
if err != nil {
_ = client.Close()
return nil, err
}

p, err := NewProtocolFromClient(client, session, queue, opts...)
p, err := NewProtocolFromClient(ctx, client, session, queue, opts...)
if err != nil {
return nil, err
}
Expand All @@ -92,69 +101,77 @@ func NewProtocol(server, queue string, connOption []amqp.ConnOption, sessionOpti
}

// NewSenderProtocolFromClient creates a new amqp sender transport.
func NewSenderProtocolFromClient(client *amqp.Client, session *amqp.Session, address string, opts ...Option) (*Protocol, error) {
func NewSenderProtocolFromClient(
ctx context.Context,
client *amqp.Conn,
session *amqp.Session,
address string,
opts ...Option,
embano1 marked this conversation as resolved.
Show resolved Hide resolved
) (*Protocol, error) {
t := &Protocol{
Node: address,
senderLinkOpts: []amqp.LinkOption(nil),
receiverLinkOpts: []amqp.LinkOption(nil),
Client: client,
Session: session,
Node: address,
Client: client,
Session: session,
}
if err := t.applyOptions(opts...); err != nil {
return nil, err
}
t.senderLinkOpts = append(t.senderLinkOpts, amqp.LinkTargetAddress(address))

// Create a sender
amqpSender, err := session.NewSender(t.senderLinkOpts...)
amqpSender, err := session.NewSender(ctx, address, t.senderOpts)
if err != nil {
_ = client.Close()
_ = session.Close(context.Background())
return nil, err
}
t.Sender = NewSender(amqpSender).(*sender)

t.SenderContextDecorators = []func(context.Context) context.Context{}

return t, nil
}

// NewReceiverProtocolFromClient creates a new receiver amqp transport.
func NewReceiverProtocolFromClient(client *amqp.Client, session *amqp.Session, address string, opts ...Option) (*Protocol, error) {
func NewReceiverProtocolFromClient(
ctx context.Context,
client *amqp.Conn,
session *amqp.Session,
address string,
opts ...Option,
) (*Protocol, error) {
t := &Protocol{
Node: address,
senderLinkOpts: []amqp.LinkOption(nil),
receiverLinkOpts: []amqp.LinkOption(nil),
Client: client,
Session: session,
Node: address,
Client: client,
Session: session,
}
if err := t.applyOptions(opts...); err != nil {
return nil, err
}

t.Node = address
t.receiverLinkOpts = append(t.receiverLinkOpts, amqp.LinkSourceAddress(address))
amqpReceiver, err := t.Session.NewReceiver(t.receiverLinkOpts...)
amqpReceiver, err := t.Session.NewReceiver(ctx, address, t.receiverOpts)
if err != nil {
return nil, err
}
t.Receiver = NewReceiver(amqpReceiver).(*receiver)
t.Receiver = NewReceiver(amqpReceiver, WithReceiveOptions(t.receiveOpts)).(*receiver)
return t, nil
}

// NewSenderProtocol creates a new sender amqp transport.
func NewSenderProtocol(server, address string, connOption []amqp.ConnOption, sessionOption []amqp.SessionOption, opts ...Option) (*Protocol, error) {
client, err := amqp.Dial(server, connOption...)
func NewSenderProtocol(ctx context.Context, server, address string, connOptions amqp.ConnOptions, sessionOptions amqp.SessionOptions, opts ...Option) (*Protocol, error) {
embano1 marked this conversation as resolved.
Show resolved Hide resolved
client, err := amqp.Dial(ctx, server, &connOptions)
if err != nil {
return nil, err
}

// Open a session
session, err := client.NewSession(sessionOption...)
session, err := client.NewSession(ctx, &sessionOptions)
if err != nil {
_ = client.Close()
return nil, err
}

p, err := NewSenderProtocolFromClient(client, session, address, opts...)
p, err := NewSenderProtocolFromClient(ctx, client, session, address, opts...)
if err != nil {
return nil, err
}
Expand All @@ -164,20 +181,20 @@ func NewSenderProtocol(server, address string, connOption []amqp.ConnOption, ses
}

// NewReceiverProtocol creates a new receiver amqp transport.
func NewReceiverProtocol(server, address string, connOption []amqp.ConnOption, sessionOption []amqp.SessionOption, opts ...Option) (*Protocol, error) {
client, err := amqp.Dial(server, connOption...)
func NewReceiverProtocol(ctx context.Context, server, address string, connOptions amqp.ConnOptions, sessionOptions amqp.SessionOptions, opts ...Option) (*Protocol, error) {
client, err := amqp.Dial(ctx, server, &connOptions)
if err != nil {
return nil, err
}

// Open a session
session, err := client.NewSession(sessionOption...)
session, err := client.NewSession(ctx, &sessionOptions)
if err != nil {
_ = client.Close()
return nil, err
}

p, err := NewReceiverProtocolFromClient(client, session, address, opts...)
p, err := NewReceiverProtocolFromClient(ctx, client, session, address, opts...)

if err != nil {
return nil, err
Expand Down
Loading