Skip to content

Commit

Permalink
test changes
Browse files Browse the repository at this point in the history
  • Loading branch information
nhulston committed Nov 8, 2024
1 parent b6f241a commit e133343
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 52 deletions.
46 changes: 10 additions & 36 deletions internal/extension/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,18 +86,13 @@ func BuildExtensionManager(isUniversalInstrumentation bool) *ExtensionManager {
}

func (em *ExtensionManager) checkAgentRunning() {
fmt.Printf("[DEBUG] checkAgentRunning()\n")
if _, err := os.Stat(em.extensionPath); err != nil {
logger.Debug("Will use the API")
fmt.Printf("[DEBUG] Extension is not running\n")
em.isExtensionRunning = false
} else {
fmt.Printf("[DEBUG] Extension is running\n")
logger.Debug("Will use the Serverless Agent")
em.isExtensionRunning = true

fmt.Printf("[DEBUG] universal instrumentation: %t\n", em.isUniversalInstrumentation)

// Tell the extension not to create an execution span if universal instrumentation is disabled
if !em.isUniversalInstrumentation {
req, _ := http.NewRequest(http.MethodGet, em.helloRoute, nil)
Expand All @@ -112,12 +107,10 @@ func (em *ExtensionManager) checkAgentRunning() {
}

func (em *ExtensionManager) SendStartInvocationRequest(ctx context.Context, eventPayload json.RawMessage) context.Context {
fmt.Println("[DEBUG] SendStartInvocationRequest()")
body := bytes.NewBuffer(eventPayload)
req, _ := http.NewRequest(http.MethodPost, em.startInvocationUrl, body)

if response, err := em.httpClient.Do(req); err == nil && response.StatusCode == 200 {
fmt.Printf("[DEBUG] SendStartInvocationRequest succeeded\n")
// Propagate dd-trace context from the extension response if found in the response headers
traceId := response.Header.Get(string(DdTraceId))
if traceId != "" {
Expand All @@ -131,7 +124,6 @@ func (em *ExtensionManager) SendStartInvocationRequest(ctx context.Context, even
ctx = context.WithValue(ctx, DdParentId, parentId)
}
samplingPriority := response.Header.Get(string(DdSamplingPriority))
fmt.Printf("[DEBUG] [extension.go] [SendStartInvocationRequest()] Sampling priority: %s\n", samplingPriority)
if samplingPriority != "" {
ctx = context.WithValue(ctx, DdSamplingPriority, samplingPriority)
}
Expand All @@ -147,7 +139,6 @@ func (em *ExtensionManager) SendEndInvocationRequest(ctx context.Context, functi
content = []byte("{}")
}
body := bytes.NewBuffer(content)
fmt.Printf("[DEBUG lambda] extension.go Sending end request with body: %s\n", string(content))
req, _ := http.NewRequest(http.MethodPost, em.endInvocationUrl, body)

// Mark the invocation as an error if any
Expand All @@ -159,10 +150,8 @@ func (em *ExtensionManager) SendEndInvocationRequest(ctx context.Context, functi
}

// Extract the DD trace context and pass them to the extension via request headers
fmt.Printf("[DEBUG lambda] extension.go extracting trace context and passing to extension via headers.\n")
traceId, ok := ctx.Value(DdTraceId).(string)
if ok {
fmt.Printf("[DEBUG lambda] extension.go ok\n")
req.Header.Set(string(DdTraceId), traceId)
if parentId, ok := ctx.Value(DdParentId).(string); ok {
req.Header.Set(string(DdParentId), parentId)
Expand All @@ -171,50 +160,35 @@ func (em *ExtensionManager) SendEndInvocationRequest(ctx context.Context, functi
req.Header.Set(string(DdSpanId), spanId)
}
if samplingPriority, ok := ctx.Value(DdSamplingPriority).(string); ok {
fmt.Printf("[DEBUG] [extension.go] [SendEndInvocationRequest()] Sampling priority: %s\n", samplingPriority)
req.Header.Set(string(DdSamplingPriority), samplingPriority)
}
} else {
fmt.Printf("[DEBUG lambda] extension.go only sending trace id and span id.\n")
req.Header.Set(string(DdTraceId), fmt.Sprint(functionExecutionSpan.Context().TraceID()))
req.Header.Set(string(DdSpanId), fmt.Sprint(functionExecutionSpan.Context().SpanID()))
if contextKey, ok := ctx.Value("internal.contextKey").(string); ok {
fmt.Printf("[DEBUG lambda]: contextKey%s\n", contextKey)
}

// Send sampling priority
priority := getSamplingPriority(functionExecutionSpan.Context())
fmt.Printf("[DEBUG lambda copied from agent]: priority: %s\n", priority)
fmt.Printf("[DEBUG] [extension.go] [SendEndInvocationRequest()] Sampling priority: %d\n", priority)
if priority != nil {
req.Header.Set(string(DdSamplingPriority), fmt.Sprint(*priority))
}
}
fmt.Printf("[DEBUG lambda] full extension.go context: %+v\n", ctx)

//fmt.Printf("[DEBUG lambda] sampling priority: %s\n", ctx.Value(DdSamplingPriority).(string))
//if val := ctx.Value(trace.traceContextKey); val != nil {
// if traceCtx, ok := val.(trace.TraceContext); ok {
// fmt.Printf("[DEBUG lambda] trace context: %+v\n", traceCtx)
// if samplingPriority, exists := traceCtx["_sampling_priority_v1"]; exists {
// fmt.Printf("Sampling priority: %s\n", samplingPriority)
// }
// }
//}

resp, err := em.httpClient.Do(req)
if err != nil || resp.StatusCode != 200 {
logger.Error(fmt.Errorf("could not send end invocation payload to the extension: %v", err))
}
}

func getSamplingPriority(sc ddtrace.SpanContext) int {
// Default priority (matching -128 from the agent code)
priority := -128

func getSamplingPriority(sc ddtrace.SpanContext) *int {
// Check if the context implements SamplingPriority method
if pc, ok := sc.(interface{ SamplingPriority() (int, bool) }); ok && pc != nil {
if p, ok := pc.SamplingPriority(); ok {
priority = p
fmt.Printf("[DEBUG] [extension.go] Found sampling priority: %d\n", priority)
return priority
priority := p
return &priority
}
}
return priority
return nil
}

// defaultStackLength specifies the default maximum size of a stack trace.
Expand Down
16 changes: 0 additions & 16 deletions internal/trace/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,6 @@ var DefaultTraceExtractor = getHeadersFromEventHeaders
// contextWithRootTraceContext uses the incoming event and context object payloads to determine
// the root TraceContext and then adds that TraceContext to the context object.
func contextWithRootTraceContext(ctx context.Context, ev json.RawMessage, mergeXrayTraces bool, extractor ContextExtractor) (context.Context, error) {
fmt.Printf("[DEBUG] [context.go] [contextWithRootTraceContext()] full context: %+v\n", ctx)
fmt.Printf("[DEBUG] [context.go] [contextWithRootTraceContext()] ev: %s\n", string(ev))
datadogTraceContext, gotDatadogTraceContext := getTraceContext(ctx, extractor(ctx, ev))

xrayTraceContext, errGettingXrayContext := convertXrayTraceContextFromLambdaContext(ctx)
Expand Down Expand Up @@ -78,7 +76,6 @@ func contextWithRootTraceContext(ctx context.Context, ev json.RawMessage, mergeX
mergedTraceContext := TraceContext{}
mergedTraceContext[traceIDHeader] = datadogTraceContext[traceIDHeader]
mergedTraceContext[samplingPriorityHeader] = datadogTraceContext[samplingPriorityHeader]
fmt.Printf("[DEBUG] [context.go] [contextWithRootTraceContext()] Sampling priority: %s\n", datadogTraceContext[samplingPriorityHeader])
mergedTraceContext[parentIDHeader] = xrayTraceContext[parentIDHeader]
return context.WithValue(ctx, traceContextKey, mergedTraceContext), nil
}
Expand All @@ -100,7 +97,6 @@ func ConvertCurrentXrayTraceContext(ctx context.Context) TraceContext {
newTraceContext := map[string]string{}
newTraceContext[traceIDHeader] = xrayTraceContext[traceIDHeader]
newTraceContext[samplingPriorityHeader] = xrayTraceContext[samplingPriorityHeader]
fmt.Printf("[DEBUG] [context.go] [ConvertCurrentXrayTraceContext()] Sampling priority: %s\n", xrayTraceContext[samplingPriorityHeader])
newTraceContext[parentIDHeader] = parentID

return newTraceContext
Expand All @@ -117,7 +113,6 @@ func createDummySubsegmentForXrayConverter(ctx context.Context, traceCtx TraceCo
traceID := traceCtx[traceIDHeader]
parentID := traceCtx[parentIDHeader]
sampled := traceCtx[samplingPriorityHeader]
fmt.Printf("[DEBUG] [context.go] [createDummySubsegmentForXrayConverter()] Sampling priority: %s\n", sampled)
metadata := map[string]string{
"trace-id": traceID,
"parent-id": parentID,
Expand All @@ -133,15 +128,12 @@ func createDummySubsegmentForXrayConverter(ctx context.Context, traceCtx TraceCo
}

func getTraceContext(ctx context.Context, headers map[string]string) (TraceContext, bool) {
fmt.Printf("[DEBUG] [context.go] [getTraceContext()] headers: %v\n", headers)
fmt.Printf("[DEBUG] [context.go] [getTraceContext()] context: %+v\n", ctx)
tc := TraceContext{}

traceID := headers[traceIDHeader]
if traceID == "" {
if val, ok := ctx.Value(extension.DdTraceId).(string); ok {
traceID = val
fmt.Printf("[DEBUG] [context.go] [getTraceContext()] setting traceID: %s\n", traceID)
}
}
if traceID == "" {
Expand All @@ -159,16 +151,13 @@ func getTraceContext(ctx context.Context, headers map[string]string) (TraceConte
}

samplingPriority := headers[samplingPriorityHeader]
fmt.Printf("[DEBUG] [context.go] [getTraceContext() 1] Sampling priority: %s\n", samplingPriority)
if samplingPriority == "" {
if val, ok := ctx.Value(extension.DdSamplingPriority).(string); ok {
samplingPriority = val
fmt.Printf("[DEBUG] [context.go] [getTraceContext() 2] Sampling priority: %s\n", samplingPriority)
}
}
if samplingPriority == "" {
samplingPriority = "1" //sampler-keep
fmt.Printf("[DEBUG] [context.go] [getTraceContext() 3] Sampling priority empty, keeping.\n")
}

tc[samplingPriorityHeader] = samplingPriority
Expand All @@ -182,7 +171,6 @@ func getTraceContext(ctx context.Context, headers map[string]string) (TraceConte
// and creates a dummy X-Ray subsegment containing this information.
// This is used as the DefaultTraceExtractor.
func getHeadersFromEventHeaders(ctx context.Context, ev json.RawMessage) map[string]string {
fmt.Printf("[DEBUG] [context.go] [getHeadersFromEventHeaders()]\n")
eh := eventWithHeaders{}

headers := map[string]string{}
Expand All @@ -195,7 +183,6 @@ func getHeadersFromEventHeaders(ctx context.Context, ev json.RawMessage) map[str
lowercaseHeaders := map[string]string{}
for k, v := range eh.Headers {
lowercaseHeaders[strings.ToLower(k)] = v
fmt.Printf("[DEBUG lambda] context.go: [%s, %s]\n", strings.ToLower(k), v)
}

return lowercaseHeaders
Expand All @@ -218,7 +205,6 @@ func convertXrayTraceContextFromLambdaContext(ctx context.Context) (TraceContext
return traceCtx, fmt.Errorf("Couldn't read parent id from X-Ray: %v", err)
}
samplingPriority := convertXRaySamplingDecision(header.SamplingDecision)
fmt.Printf("[DEBUG] [context.go] [convertXrayTraceContextFromLambdaContext()] Sampling priority: %s\n", samplingPriority)

traceCtx[traceIDHeader] = traceID
traceCtx[parentIDHeader] = parentID
Expand All @@ -230,7 +216,6 @@ func convertXrayTraceContextFromLambdaContext(ctx context.Context) (TraceContext
// By default, the context object won't have any Segment, (xray.GetSegment(ctx) will return nil). However it
// will have the "LambdaTraceHeader" object, which contains the traceID/parentID/sampling info.
func getXrayTraceHeaderFromContext(ctx context.Context) *header.Header {
fmt.Printf("[DEBUG] [context.go] [getXrayTraceHeaderFromContext()]\n")
var traceHeader string

if traceHeaderValue := ctx.Value(xray.LambdaTraceHeaderKey); traceHeaderValue != nil {
Expand Down Expand Up @@ -291,7 +276,6 @@ func convertXRaySamplingDecision(decision header.SamplingDecision) string {

// ConvertTraceContextToSpanContext converts a TraceContext object to a SpanContext that can be used by dd-trace.
func ConvertTraceContextToSpanContext(traceCtx TraceContext) (ddtrace.SpanContext, error) {
fmt.Printf("[DEBUG] [context.go] [ConvertTraceContextToSpanContext()]\n")
spanCtx, err := propagator.Extract(tracer.TextMapCarrier(traceCtx))

if err != nil {
Expand Down

0 comments on commit e133343

Please sign in to comment.