Skip to content

Commit

Permalink
Make X-Ray telemetry opt-in (#15)
Browse files Browse the repository at this point in the history
  • Loading branch information
dominikschubert authored Mar 21, 2023
1 parent bede0a0 commit f65b6ad
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 25 deletions.
35 changes: 19 additions & 16 deletions cmd/localstack/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,18 @@ import (
)

type LsOpts struct {
InteropPort string
RuntimeEndpoint string
RuntimeId string
InitTracingPort string
User string
CodeArchives string
HotReloadingPaths []string
EnableDnsServer string
LocalstackIP string
InitLogLevel string
EdgePort string
InteropPort string
RuntimeEndpoint string
RuntimeId string
InitTracingPort string
User string
CodeArchives string
HotReloadingPaths []string
EnableDnsServer string
LocalstackIP string
InitLogLevel string
EdgePort string
EnableXRayTelemetry string
}

func GetEnvOrDie(env string) string {
Expand All @@ -48,10 +49,11 @@ func InitLsOpts() *LsOpts {
InitLogLevel: GetenvWithDefault("LOCALSTACK_INIT_LOG_LEVEL", "debug"),
EdgePort: GetenvWithDefault("EDGE_PORT", "4566"),
// optional or empty
CodeArchives: os.Getenv("LOCALSTACK_CODE_ARCHIVES"),
HotReloadingPaths: strings.Split(GetenvWithDefault("LOCALSTACK_HOT_RELOADING_PATHS", ""), ","),
EnableDnsServer: os.Getenv("LOCALSTACK_ENABLE_DNS_SERVER"),
LocalstackIP: os.Getenv("LOCALSTACK_HOSTNAME"),
CodeArchives: os.Getenv("LOCALSTACK_CODE_ARCHIVES"),
HotReloadingPaths: strings.Split(GetenvWithDefault("LOCALSTACK_HOT_RELOADING_PATHS", ""), ","),
EnableDnsServer: os.Getenv("LOCALSTACK_ENABLE_DNS_SERVER"),
EnableXRayTelemetry: os.Getenv("LOCALSTACK_ENABLE_XRAY_TELEMETRY"),
LocalstackIP: os.Getenv("LOCALSTACK_HOSTNAME"),
}
}

Expand All @@ -67,6 +69,7 @@ func UnsetLsEnvs() {
"LOCALSTACK_CODE_ARCHIVES",
"LOCALSTACK_HOT_RELOADING_PATHS",
"LOCALSTACK_ENABLE_DNS_SERVER",
"LOCALSTACK_ENABLE_XRAY_TELEMETRY",
"LOCALSTACK_INIT_LOG_LEVEL",
// Docker container ID
"HOSTNAME",
Expand Down Expand Up @@ -146,7 +149,7 @@ func main() {

// xray daemon
xrayConfig := initConfig("http://" + lsOpts.LocalstackIP + ":" + lsOpts.EdgePort)
d := initDaemon(xrayConfig)
d := initDaemon(xrayConfig, lsOpts.EnableXRayTelemetry == "1")
sandbox.AddShutdownFunc(func() {
log.Debugln("Shutting down xray daemon")
d.stop()
Expand Down
29 changes: 20 additions & 9 deletions cmd/localstack/xraydaemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func initConfig(endpoint string) *cfg.Config {
return xrayConfig
}

func initDaemon(config *cfg.Config) *Daemon {
func initDaemon(config *cfg.Config, enableTelemetry bool) *Daemon {
if logFile != "" {
var fileWriter io.Writer
if *config.Logging.LogRotation {
Expand Down Expand Up @@ -133,8 +133,9 @@ func initDaemon(config *cfg.Config) *Daemon {
awsConfig, session := conn.GetAWSConfigSession(&conn.Conn{}, config, config.RoleARN, config.Region, noMetadata)
log.Infof("Using region: %v", aws.StringValue(awsConfig.Region))

log.Debugf("ARN of the AWS resource running the daemon: %v", config.ResourceARN)
telemetry.Init(awsConfig, session, config.ResourceARN, noMetadata)
if enableTelemetry {
telemetry.Init(awsConfig, session, config.ResourceARN, noMetadata)
}

// If calculated number of buffer is lower than our default, use calculated one. Otherwise, use default value.
parameterConfig.Processor.BatchSize = util.GetMinIntValue(parameterConfig.Processor.BatchSize, buffers)
Expand Down Expand Up @@ -179,10 +180,14 @@ func (d *Daemon) close() {
// Signal routines to finish
// This will push telemetry and customer segments in parallel
d.std.Close()
telemetry.T.Quit <- true
if telemetry.T != nil {
telemetry.T.Quit <- true
}

<-d.processor.Done
<-telemetry.T.Done
if telemetry.T != nil {
<-telemetry.T.Done
}

log.Debugf("Trace segment: received: %d, truncated: %d, processed: %d", atomic.LoadUint64(&d.count), d.std.TruncatedCount(), d.processor.ProcessedCount())
log.Debugf("Shutdown finished. Current epoch in nanoseconds: %v", time.Now().UnixNano())
Expand Down Expand Up @@ -226,7 +231,7 @@ func (d *Daemon) poll() {
fallbackPointerUsed = true
}
rlen := d.read(bufPointer)
if rlen > 0 {
if rlen > 0 && telemetry.T != nil {
telemetry.T.SegmentReceived(1)
}
if rlen == 0 {
Expand All @@ -237,7 +242,9 @@ func (d *Daemon) poll() {
}
if fallbackPointerUsed {
log.Warn("Segment dropped. Consider increasing memory limit")
telemetry.T.SegmentSpillover(1)
if telemetry.T != nil {
telemetry.T.SegmentSpillover(1)
}
continue
} else if rlen == -1 {
return
Expand All @@ -250,7 +257,9 @@ func (d *Daemon) poll() {
if len(slices[1]) == 0 {
log.Warnf("Missing header or segment: %s", string(slices[0]))
d.pool.Return(bufPointer)
telemetry.T.SegmentRejected(1)
if telemetry.T != nil {
telemetry.T.SegmentRejected(1)
}
continue
}

Expand All @@ -264,7 +273,9 @@ func (d *Daemon) poll() {
default:
log.Warnf("Invalid header: %s", string(header))
d.pool.Return(bufPointer)
telemetry.T.SegmentRejected(1)
if telemetry.T != nil {
telemetry.T.SegmentRejected(1)
}
continue
}

Expand Down

0 comments on commit f65b6ad

Please sign in to comment.