Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wip #3325

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft

wip #3325

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apis/v1beta1/collector_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (c CollectorWebhook) Default(_ context.Context, obj runtime.Object) error {
if len(otelcol.Spec.ManagementState) == 0 {
otelcol.Spec.ManagementState = ManagementStateManaged
}
return nil
return otelcol.Spec.Config.ApplyDefaults(c.logger)
}

func (c CollectorWebhook) ValidateCreate(ctx context.Context, obj runtime.Object) (admission.Warnings, error) {
Expand Down
38 changes: 38 additions & 0 deletions apis/v1beta1/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,40 @@ func (c *Config) getPortsForComponentKinds(logger logr.Logger, componentKinds ..
return ports, nil
}

// getPortsForComponentKinds gets the ports for the given ComponentKind(s).
func (c *Config) applyDefaultForComponentKinds(logger logr.Logger, componentKinds ...ComponentKind) error {
enabledComponents := c.GetEnabledComponents()
for _, componentKind := range componentKinds {
var retriever components.ParserRetriever
var cfg AnyConfig
switch componentKind {
case KindReceiver:
retriever = receivers.ReceiverFor
cfg = c.Receivers
case KindExporter:
continue
case KindProcessor:
continue
case KindExtension:
continue
}
for componentName := range enabledComponents[componentKind] {
parser := retriever(componentName)
if newCfg, err := parser.GetDefaultConfig(logger, cfg.Object[componentName]); err != nil {
return err
} else {
cc, ok := newCfg.(map[string]interface{})
if !ok {
return fmt.Errorf("could not apply defaults to receiver: %s", componentName)
}
cfg.Object[componentName] = cc
}
}
}

return nil
}

func (c *Config) GetReceiverPorts(logger logr.Logger) ([]corev1.ServicePort, error) {
return c.getPortsForComponentKinds(logger, KindReceiver)
}
Expand All @@ -241,6 +275,10 @@ func (c *Config) GetAllRbacRules(logger logr.Logger) ([]rbacv1.PolicyRule, error
return c.getRbacRulesForComponentKinds(logger, KindReceiver, KindExporter, KindProcessor)
}

func (c *Config) ApplyDefaults(logger logr.Logger) error {
return c.applyDefaultForComponentKinds(logger, KindReceiver)
}

// GetLivenessProbe gets the first enabled liveness probe. There should only ever be one extension enabled
// that provides the hinting for the liveness probe.
func (c *Config) GetLivenessProbe(logger logr.Logger) (*corev1.Probe, error) {
Expand Down
46 changes: 30 additions & 16 deletions internal/components/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,18 @@ import (
type ParserOption[ComponentConfigType any] func(*Settings[ComponentConfigType])

type Settings[ComponentConfigType any] struct {
protocol corev1.Protocol
appProtocol *string
targetPort intstr.IntOrString
nodePort int32
name string
port int32
portParser PortParser[ComponentConfigType]
rbacGen RBACRuleGenerator[ComponentConfigType]
livenessGen ProbeGenerator[ComponentConfigType]
readinessGen ProbeGenerator[ComponentConfigType]
protocol corev1.Protocol
appProtocol *string
targetPort intstr.IntOrString
nodePort int32
name string
port int32
defaultRecAddr string
portParser PortParser[ComponentConfigType]
rbacGen RBACRuleGenerator[ComponentConfigType]
livenessGen ProbeGenerator[ComponentConfigType]
readinessGen ProbeGenerator[ComponentConfigType]
defaultsApplier Defaulter[ComponentConfigType]
}

func NewEmptySettings[ComponentConfigType any]() *Settings[ComponentConfigType] {
Expand Down Expand Up @@ -75,6 +77,11 @@ func (b Builder[ComponentConfigType]) WithAppProtocol(appProtocol *string) Build
o.appProtocol = appProtocol
})
}
func (b Builder[ComponentConfigType]) WithDefaultRecAddress(defaultRecAddr string) Builder[ComponentConfigType] {
return append(b, func(o *Settings[ComponentConfigType]) {
o.defaultRecAddr = defaultRecAddr
})
}
func (b Builder[ComponentConfigType]) WithTargetPort(targetPort int32) Builder[ComponentConfigType] {
return append(b, func(o *Settings[ComponentConfigType]) {
o.targetPort = intstr.FromInt32(targetPort)
Expand Down Expand Up @@ -118,19 +125,26 @@ func (b Builder[ComponentConfigType]) WithReadinessGen(readinessGen ProbeGenerat
})
}

func (b Builder[ComponentConfigType]) WithDefaultsApplier(defaultsApplier Defaulter[ComponentConfigType]) Builder[ComponentConfigType] {
return append(b, func(o *Settings[ComponentConfigType]) {
o.defaultsApplier = defaultsApplier
})
}

func (b Builder[ComponentConfigType]) Build() (*GenericParser[ComponentConfigType], error) {
o := NewEmptySettings[ComponentConfigType]()
o.Apply(b...)
if len(o.name) == 0 {
return nil, fmt.Errorf("invalid settings struct, no name specified")
}
return &GenericParser[ComponentConfigType]{
name: o.name,
portParser: o.portParser,
rbacGen: o.rbacGen,
livenessGen: o.livenessGen,
readinessGen: o.readinessGen,
settings: o,
name: o.name,
portParser: o.portParser,
rbacGen: o.rbacGen,
livenessGen: o.livenessGen,
readinessGen: o.readinessGen,
defaultsApplier: o.defaultsApplier,
settings: o,
}, nil
}

Expand Down
7 changes: 7 additions & 0 deletions internal/components/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ type RBACRuleGenerator[ComponentConfigType any] func(logger logr.Logger, config
// It's expected that type Config is the configuration used by a parser.
type ProbeGenerator[ComponentConfigType any] func(logger logr.Logger, config ComponentConfigType) (*corev1.Probe, error)

// Defaulter is a function that applies given defaults to the passed Config.
// It's expected that type Config is the configuration used by a parser.
type Defaulter[ComponentConfigType any] func(logger logr.Logger, defaultRecAddr string, config ComponentConfigType) (ComponentConfigType, error)

// ComponentType returns the type for a given component name.
// components have a name like:
// - mycomponent/custom
Expand Down Expand Up @@ -87,6 +91,9 @@ func PortFromEndpoint(endpoint string) (int32, error) {
type ParserRetriever func(string) Parser

type Parser interface {
// GetDefaultConfig .. TODO
GetDefaultConfig(logger logr.Logger, config interface{}) (interface{}, error)

// Ports returns the service ports parsed based on the component's configuration where name is the component's name
// of the form "name" or "type/name"
Ports(logger logr.Logger, name string, config interface{}) ([]corev1.ServicePort, error)
Expand Down
29 changes: 23 additions & 6 deletions internal/components/generic_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,29 @@ var (
// GenericParser serves as scaffolding for custom parsing logic by isolating
// functionality to idempotent functions.
type GenericParser[T any] struct {
name string
settings *Settings[T]
portParser PortParser[T]
rbacGen RBACRuleGenerator[T]
livenessGen ProbeGenerator[T]
readinessGen ProbeGenerator[T]
name string
settings *Settings[T]
portParser PortParser[T]
rbacGen RBACRuleGenerator[T]
livenessGen ProbeGenerator[T]
readinessGen ProbeGenerator[T]
defaultsApplier Defaulter[T]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

youll need to set a functionthat matches the signature here for the single endpoint config.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That should be here:

func NewSinglePortParserBuilder(name string, port int32) Builder[*SingleEndpointConfig] {
	return NewBuilder[*SingleEndpointConfig]().WithPort(port).WithName(name).WithPortParser(ParseSingleEndpoint).WithDefaultsApplier(AddressDefaulter)
}

func NewSilentSinglePortParserBuilder(name string, port int32) Builder[*SingleEndpointConfig] {
	return NewBuilder[*SingleEndpointConfig]().WithPort(port).WithName(name).WithPortParser(ParseSingleEndpointSilent).WithDefaultsApplier(AddressDefaulter)
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah 👍

}

func (g *GenericParser[T]) GetDefaultConfig(logger logr.Logger, config interface{}) (interface{}, error) {
if g.settings == nil || g.defaultsApplier == nil {
return config, nil
}

if g.settings.defaultRecAddr == "" {
return config, nil
}

var parsed T
if err := mapstructure.Decode(config, &parsed); err != nil {
return nil, err
}
return g.defaultsApplier(logger, g.settings.defaultRecAddr, parsed)
}

func (g *GenericParser[T]) GetLivenessProbe(logger logr.Logger, config interface{}) (*corev1.Probe, error) {
Expand Down
35 changes: 34 additions & 1 deletion internal/components/multi_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,27 @@ func (m *MultiPortReceiver) ParserName() string {
return fmt.Sprintf("__%s", m.name)
}

func (m *MultiPortReceiver) GetDefaultConfig(logger logr.Logger, config interface{}) (interface{}, error) {
multiProtoEndpointCfg := &MultiProtocolEndpointConfig{}
if err := mapstructure.Decode(config, multiProtoEndpointCfg); err != nil {
return nil, err
}
tmp := make(map[string]*SingleEndpointConfig, len(multiProtoEndpointCfg.Protocols))
for protocol, ec := range multiProtoEndpointCfg.Protocols {
// TODO: Should we take default receiver address from settngs?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes you can just take it from there

res, err := AddressDefaulter(logger, "0.0.0.0", ec)
if err != nil {
return nil, err
}
tmp[protocol] = res
}

for protocol, ec := range tmp {
multiProtoEndpointCfg.Protocols[protocol] = ec
}
return config, mapstructure.Decode(multiProtoEndpointCfg, &config)

}
func (m *MultiPortReceiver) GetLivenessProbe(logger logr.Logger, config interface{}) (*corev1.Probe, error) {
return nil, nil
}
Expand All @@ -91,7 +112,7 @@ func NewMultiPortReceiverBuilder(name string) MultiPortBuilder[*MultiProtocolEnd
}

func NewProtocolBuilder(name string, port int32) Builder[*MultiProtocolEndpointConfig] {
return NewBuilder[*MultiProtocolEndpointConfig]().WithName(name).WithPort(port)
return NewBuilder[*MultiProtocolEndpointConfig]().WithName(name).WithPort(port).WithDefaultsApplier(MultiAddressDefaulter)
}

func (mp MultiPortBuilder[ComponentConfigType]) AddPortMapping(builder Builder[ComponentConfigType]) MultiPortBuilder[ComponentConfigType] {
Expand Down Expand Up @@ -123,3 +144,15 @@ func (mp MultiPortBuilder[ComponentConfigType]) MustBuild() *MultiPortReceiver {
return p
}
}

// MultiAddressDefaulter ...
func MultiAddressDefaulter(logger logr.Logger, defaultRecAddr string, config *MultiProtocolEndpointConfig) (*MultiProtocolEndpointConfig, error) {
for protocol, ec := range config.Protocols {
res, err := AddressDefaulter(logger, defaultRecAddr, ec)
if err != nil {
return nil, err
}
config.Protocols[protocol].Endpoint = res.Endpoint
}
return config, nil
}
2 changes: 2 additions & 0 deletions internal/components/receivers/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,11 @@ var (
components.NewMultiPortReceiverBuilder("otlp").
AddPortMapping(components.NewProtocolBuilder("grpc", 4317).
WithAppProtocol(&components.GrpcProtocol).
WithDefaultRecAddress("0.0.0.0").
WithTargetPort(4317)).
AddPortMapping(components.NewProtocolBuilder("http", 4318).
WithAppProtocol(&components.HttpProtocol).
WithDefaultRecAddress("0.0.0.0").
WithTargetPort(4318)).
MustBuild(),
components.NewMultiPortReceiverBuilder("skywalking").
Expand Down
28 changes: 26 additions & 2 deletions internal/components/single_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
package components

import (
"fmt"
"strings"

"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"

Expand Down Expand Up @@ -80,9 +83,30 @@ func internalParseSingleEndpoint(logger logr.Logger, name string, failSilently b
}

func NewSinglePortParserBuilder(name string, port int32) Builder[*SingleEndpointConfig] {
return NewBuilder[*SingleEndpointConfig]().WithPort(port).WithName(name).WithPortParser(ParseSingleEndpoint)
return NewBuilder[*SingleEndpointConfig]().WithPort(port).WithName(name).WithPortParser(ParseSingleEndpoint).WithDefaultsApplier(AddressDefaulter)
}

func NewSilentSinglePortParserBuilder(name string, port int32) Builder[*SingleEndpointConfig] {
return NewBuilder[*SingleEndpointConfig]().WithPort(port).WithName(name).WithPortParser(ParseSingleEndpointSilent)
return NewBuilder[*SingleEndpointConfig]().WithPort(port).WithName(name).WithPortParser(ParseSingleEndpointSilent).WithDefaultsApplier(AddressDefaulter)
}

// AddressDefaulter ...
func AddressDefaulter(logger logr.Logger, defaultRecAddr string, config *SingleEndpointConfig) (*SingleEndpointConfig, error) {
if config == nil {
config = &SingleEndpointConfig{}
}
if config.Endpoint == "" {
config.Endpoint = defaultRecAddr
return config, nil
}

v := strings.Split(config.Endpoint, ":")
if len(v) < 2 {
return config, nil
}

if v[0] == "" {
config.Endpoint = fmt.Sprintf("%s:%s", defaultRecAddr, v[1])
}
return config, nil
}
Loading