Skip to content

Commit

Permalink
reapply options pattern
Browse files Browse the repository at this point in the history
Signed-off-by: Jose Silva <[email protected]>
  • Loading branch information
cx-joses committed Jul 19, 2024
1 parent f7c544c commit d173978
Show file tree
Hide file tree
Showing 6 changed files with 128 additions and 48 deletions.
68 changes: 68 additions & 0 deletions protocol/amqp/v2/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
Copyright 2021 The CloudEvents Authors
SPDX-License-Identifier: Apache-2.0
*/

package amqp

import (
"github.com/Azure/go-amqp"
)

// Option is the function signature required to be considered an amqp.Option.
type Option func(*Protocol) error

// WithConnOpt sets a connection option for amqp
func WithConnOpts(opt *amqp.ConnOptions) Option {
return func(t *Protocol) error {
t.connOpts = opt
return nil
}
}

// WithConnSASLPlain sets SASLPlain connection option for amqp
func WithConnSASLPlain(opt *amqp.ConnOptions, username, password string) Option {
opt.SASLType = amqp.SASLTypePlain(username, password)
return WithConnOpts(opt)
}

// WithSessionOpt sets a session option for amqp
func WithSessionOpts(opt *amqp.SessionOptions) Option {
return func(t *Protocol) error {
t.sessionOpts = opt
return nil
}
}

// WithSenderLinkOption sets a link option for amqp
func WithSenderOpts(opt *amqp.SenderOptions) Option {
return func(t *Protocol) error {
t.senderOpts = opt
return nil
}
}

// WithReceiverLinkOption sets a link option for amqp
func WithReceiverOpts(opt *amqp.ReceiverOptions) Option {
return func(t *Protocol) error {
t.receiverOpts = opt
return nil
}
}

func WithReceiveOpts(opt amqp.ReceiveOptions) Option {
return func(t *Protocol) error {
t.receiveOpts = opt
return nil
}
}

func WithSendOpts(opt *amqp.SendOptions) Option {
return func(t *Protocol) error {
t.sendOpts = opt
return nil
}
}

// SenderOptionFunc is the type of amqp.Sender options
type SenderOptionFunc func(sender *sender)
61 changes: 42 additions & 19 deletions protocol/amqp/v2/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,13 @@ type Protocol struct {
ownedClient bool
Node string

connOpts *amqp.ConnOptions
sessionOpts *amqp.SessionOptions
senderOpts *amqp.SenderOptions
receiverOpts *amqp.ReceiverOptions
sendOpts *amqp.SendOptions
receiveOpts amqp.ReceiveOptions

// Sender
Sender *sender
SenderContextDecorators []func(context.Context) context.Context
Expand All @@ -35,30 +42,32 @@ func NewProtocolFromClient(
client *amqp.Conn,
session *amqp.Session,
queue string,
senderOptions amqp.SenderOptions,
receiverOptions amqp.ReceiverOptions,
opts ...Option,
) (*Protocol, error) {
t := &Protocol{
Node: queue,
Client: client,
Session: session,
}
if err := t.applyOptions(opts...); err != nil {
return nil, err
}

// Create a sender
amqpSender, err := session.NewSender(ctx, queue, &senderOptions)
amqpSender, err := session.NewSender(ctx, queue, t.senderOpts)
if err != nil {
_ = client.Close()
_ = session.Close(context.Background())
return nil, err
}
t.Sender = NewSender(amqpSender, &amqp.SendOptions{}).(*sender)
t.Sender = NewSender(amqpSender, t.sendOpts).(*sender)
t.SenderContextDecorators = []func(context.Context) context.Context{}

amqpReceiver, err := t.Session.NewReceiver(ctx, t.Node, &receiverOptions)
amqpReceiver, err := t.Session.NewReceiver(ctx, t.Node, t.receiverOpts)
if err != nil {
return nil, err
}
t.Receiver = NewReceiver(amqpReceiver, amqp.ReceiveOptions{}).(*receiver)
t.Receiver = NewReceiver(amqpReceiver, t.receiveOpts).(*receiver)
return t, nil
}

Expand All @@ -68,8 +77,7 @@ func NewProtocol(
server, queue string,
connOptions amqp.ConnOptions,
sessionOptions amqp.SessionOptions,
senderOptions amqp.SenderOptions,
receiverOptions amqp.ReceiverOptions,
opts ...Option,
) (*Protocol, error) {
client, err := amqp.Dial(ctx, server, &connOptions)
if err != nil {
Expand All @@ -83,7 +91,7 @@ func NewProtocol(
return nil, err
}

p, err := NewProtocolFromClient(ctx, client, session, queue, senderOptions, receiverOptions)
p, err := NewProtocolFromClient(ctx, client, session, queue, opts...)
if err != nil {
return nil, err
}
Expand All @@ -98,22 +106,25 @@ func NewSenderProtocolFromClient(
client *amqp.Conn,
session *amqp.Session,
address string,
senderOptions amqp.SenderOptions,
opts ...Option,
) (*Protocol, error) {
t := &Protocol{
Node: address,
Client: client,
Session: session,
}
if err := t.applyOptions(opts...); err != nil {
return nil, err
}

// Create a sender
amqpSender, err := session.NewSender(ctx, address, &senderOptions)
amqpSender, err := session.NewSender(ctx, address, t.senderOpts)
if err != nil {
_ = client.Close()
_ = session.Close(context.Background())
return nil, err
}
t.Sender = NewSender(amqpSender, &amqp.SendOptions{}).(*sender)
t.Sender = NewSender(amqpSender, t.sendOpts).(*sender)
t.SenderContextDecorators = []func(context.Context) context.Context{}

return t, nil
Expand All @@ -125,25 +136,28 @@ func NewReceiverProtocolFromClient(
client *amqp.Conn,
session *amqp.Session,
address string,
receiverOptions amqp.ReceiverOptions,
opts ...Option,
) (*Protocol, error) {
t := &Protocol{
Node: address,
Client: client,
Session: session,
}
if err := t.applyOptions(opts...); err != nil {
return nil, err
}

t.Node = address
amqpReceiver, err := t.Session.NewReceiver(ctx, address, &receiverOptions)
amqpReceiver, err := t.Session.NewReceiver(ctx, address, t.receiverOpts)
if err != nil {
return nil, err
}
t.Receiver = NewReceiver(amqpReceiver, amqp.ReceiveOptions{}).(*receiver)
t.Receiver = NewReceiver(amqpReceiver, t.receiveOpts).(*receiver)
return t, nil
}

// NewSenderProtocol creates a new sender amqp transport.
func NewSenderProtocol(ctx context.Context, server, address string, connOptions amqp.ConnOptions, sessionOptions amqp.SessionOptions, senderOptions amqp.SenderOptions) (*Protocol, error) {
func NewSenderProtocol(ctx context.Context, server, address string, connOptions amqp.ConnOptions, sessionOptions amqp.SessionOptions, opts ...Option) (*Protocol, error) {
client, err := amqp.Dial(ctx, server, &connOptions)
if err != nil {
return nil, err
Expand All @@ -156,7 +170,7 @@ func NewSenderProtocol(ctx context.Context, server, address string, connOptions
return nil, err
}

p, err := NewSenderProtocolFromClient(ctx, client, session, address, senderOptions)
p, err := NewSenderProtocolFromClient(ctx, client, session, address, opts...)
if err != nil {
return nil, err
}
Expand All @@ -166,7 +180,7 @@ func NewSenderProtocol(ctx context.Context, server, address string, connOptions
}

// NewReceiverProtocol creates a new receiver amqp transport.
func NewReceiverProtocol(ctx context.Context, server, address string, connOptions amqp.ConnOptions, sessionOptions amqp.SessionOptions, receiverOptions amqp.ReceiverOptions) (*Protocol, error) {
func NewReceiverProtocol(ctx context.Context, server, address string, connOptions amqp.ConnOptions, sessionOptions amqp.SessionOptions, opts ...Option) (*Protocol, error) {
client, err := amqp.Dial(ctx, server, &connOptions)
if err != nil {
return nil, err
Expand All @@ -179,7 +193,7 @@ func NewReceiverProtocol(ctx context.Context, server, address string, connOption
return nil, err
}

p, err := NewReceiverProtocolFromClient(ctx, client, session, address, receiverOptions)
p, err := NewReceiverProtocolFromClient(ctx, client, session, address, opts...)

if err != nil {
return nil, err
Expand All @@ -189,6 +203,15 @@ func NewReceiverProtocol(ctx context.Context, server, address string, connOption
return p, nil
}

func (t *Protocol) applyOptions(opts ...Option) error {
for _, fn := range opts {
if err := fn(t); err != nil {
return err
}
}
return nil
}

func (t *Protocol) Close(ctx context.Context) (err error) {
if t.ownedClient {
// Closing the client will close at cascade sender and receiver
Expand Down
4 changes: 1 addition & 3 deletions samples/amqp/receiver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"strings"

"github.com/Azure/go-amqp"

ceamqp "github.com/cloudevents/sdk-go/protocol/amqp/v2"
cloudevents "github.com/cloudevents/sdk-go/v2"
)
Expand Down Expand Up @@ -44,8 +43,7 @@ func sampleConfig() (server, node string, opts amqp.ConnOptions) {

func main() {
host, node, opts := sampleConfig()
p, err := ceamqp.NewProtocol(context.Background(), host, node, opts, amqp.SessionOptions{},
amqp.SenderOptions{}, amqp.ReceiverOptions{})
p, err := ceamqp.NewProtocol(context.Background(), host, node, opts, amqp.SessionOptions{})
if err != nil {
log.Fatalf("Failed to create AMQP protocol: %v", err)
}
Expand Down
3 changes: 1 addition & 2 deletions samples/amqp/sender/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,7 @@ type Example struct {

func main() {
host, node, opts := sampleConfig()
p, err := ceamqp.NewProtocol(context.Background(), host, node, opts, amqp.SessionOptions{}, amqp.SenderOptions{},
amqp.ReceiverOptions{})
p, err := ceamqp.NewProtocol(context.Background(), host, node, opts, amqp.SessionOptions{})
if err != nil {
log.Fatalf("Failed to create amqp protocol: %v", err)
}
Expand Down
22 changes: 9 additions & 13 deletions test/integration/amqp/amqp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,18 +45,18 @@ func TestSenderReceiverEvent(t *testing.T) {
}

func senderProtocolFactory(t *testing.T) *protocolamqp.Protocol {
c, ss, a, so, _ := testClient(t)
c, ss, a := testClient(t)

p, err := protocolamqp.NewSenderProtocolFromClient(context.Background(), c, ss, a, so)
p, err := protocolamqp.NewSenderProtocolFromClient(context.Background(), c, ss, a)
require.NoError(t, err)

return p
}

func receiverProtocolFactory(t *testing.T) *protocolamqp.Protocol {
c, ss, a, _, ro := testClient(t)
c, ss, a := testClient(t)

p, err := protocolamqp.NewReceiverProtocolFromClient(context.Background(), c, ss, a, ro)
p, err := protocolamqp.NewReceiverProtocolFromClient(context.Background(), c, ss, a)
require.NoError(t, err)

return p
Expand All @@ -69,8 +69,7 @@ func receiverProtocolFactory(t *testing.T) *protocolamqp.Protocol {
// On option is http://qpid.apache.org/components/dispatch-router/indexthtml.
// It can be installed from source or from RPMs, see https://qpid.apache.org/packages.html
// Run `qdrouterd` and the tests will work with no further config.
func testClient(t *testing.T) (client *amqp.Conn, session *amqp.Session, addr string,
senderOpts amqp.SenderOptions, receiverOpts amqp.ReceiverOptions) {
func testClient(t *testing.T) (client *amqp.Conn, session *amqp.Session, addr string) {
t.Helper()
addr = "test"
s := os.Getenv("TEST_AMQP_URL")
Expand All @@ -83,17 +82,14 @@ func testClient(t *testing.T) (client *amqp.Conn, session *amqp.Session, addr st
}
session, err = client.NewSession(context.Background(), &amqp.SessionOptions{})
require.NoError(t, err)
senderOpts = amqp.SenderOptions{}
require.NotNil(t, senderOpts)
receiverOpts = amqp.ReceiverOptions{}
require.NotNil(t, receiverOpts)
return client, session, addr, senderOpts, receiverOpts

return client, session, addr
}

func protocolFactory(t *testing.T) *protocolamqp.Protocol {
c, ss, a, so, ro := testClient(t)
c, ss, a := testClient(t)

p, err := protocolamqp.NewProtocolFromClient(context.Background(), c, ss, a, so, ro)
p, err := protocolamqp.NewProtocolFromClient(context.Background(), c, ss, a)
require.NoError(t, err)

return p
Expand Down
18 changes: 7 additions & 11 deletions test/integration/amqp_binding/amqp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,7 @@ func TestSendEventReceiveBinary(t *testing.T) {
// On option is http://qpid.apache.org/components/dispatch-router/indexthtml.
// It can be installed from source or from RPMs, see https://qpid.apache.org/packages.html
// Run `qdrouterd` and the tests will work with no further config.
func testClient(t testing.TB) (client *amqp.Conn, session *amqp.Session, addr string,
senderOpts *amqp.SenderOptions, receiverOpts *amqp.ReceiverOptions) {
func testClient(t testing.TB) (client *amqp.Conn, session *amqp.Session, addr string) {
t.Helper()
addr = "test"
s := os.Getenv("TEST_AMQP_URL")
Expand All @@ -104,21 +103,18 @@ func testClient(t testing.TB) (client *amqp.Conn, session *amqp.Session, addr st
}
session, err = client.NewSession(context.Background(), &amqp.SessionOptions{})
require.NoError(t, err)
senderOpts = &amqp.SenderOptions{}
require.NotNil(t, senderOpts)
receiverOpts = &amqp.ReceiverOptions{}
require.NotNil(t, receiverOpts)
return client, session, addr, senderOpts, receiverOpts

return client, session, addr

}

func testSenderReceiver(t testing.TB) (io.Closer, bindings.Sender, bindings.Receiver) {
c, ss, a, so, ro := testClient(t)
r, err := ss.NewReceiver(context.Background(), a, ro)
c, ss, a := testClient(t)
r, err := ss.NewReceiver(context.Background(), a, nil)
require.NoError(t, err)
s, err := ss.NewSender(context.Background(), a, so)
s, err := ss.NewSender(context.Background(), a, nil)
require.NoError(t, err)
return c, protocolamqp.NewSender(s, &amqp.SendOptions{}), protocolamqp.NewReceiver(r, amqp.ReceiveOptions{})
return c, protocolamqp.NewSender(s, nil), protocolamqp.NewReceiver(r, amqp.ReceiveOptions{})
}

func BenchmarkSendReceive(b *testing.B) {
Expand Down

0 comments on commit d173978

Please sign in to comment.