pubsub: concurrent map write panic when publishing with telemetry enabled and shared attributes #11314

Open
steved opened this issue Dec 18, 2024 · 1 comment
Labels
api: pubsub Issues related to the Pub/Sub API. priority: p2 Moderately-important priority. Fix may not be included in next release. type: bug Error or flaw in code with unintended results or allowing sub-optimal usage patterns.

Comments

steved commented Dec 18, 2024

Client

PubSub

Environment

$ go version
go version go1.23.4 darwin/arm64

Code and Dependencies

package main

import (
	"context"
	"fmt"
	"strings"
	"sync"

	"cloud.google.com/go/pubsub"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
	"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
	"go.opentelemetry.io/otel/propagation"
	"go.opentelemetry.io/otel/sdk/resource"
	sdktrace "go.opentelemetry.io/otel/sdk/trace"
	semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
)

func initTracing(ctx context.Context) {
	r, err := resource.New(ctx,
		resource.WithSchemaURL(semconv.SchemaURL),
		resource.WithAttributes(semconv.ServiceName("test-service")),
		resource.WithTelemetrySDK(),
		resource.WithContainerID(),
	)

	if err != nil {
		panic(err)
	}

	client := otlptracegrpc.NewClient()
	exporter, err := otlptrace.New(ctx, client)
	if err != nil {
		panic(err)
	}

	provider := sdktrace.NewTracerProvider(
		sdktrace.WithBatcher(exporter),
		sdktrace.WithResource(r),
	)

	otel.SetTextMapPropagator(
		propagation.NewCompositeTextMapPropagator(
			propagation.TraceContext{},
			propagation.Baggage{},
		),
	)

	otel.SetTracerProvider(provider)
}

func main() {
	ctx := context.Background()

	initTracing(ctx)

	client, err := pubsub.NewClientWithConfig(ctx, "test-project", &pubsub.ClientConfig{EnableOpenTelemetryTracing: true})
	if err != nil {
		panic(err)
	}

	_, err = client.CreateTopic(ctx, "test-topic")
	if err != nil {
		if !strings.Contains(err.Error(), "AlreadyExists") {
			panic(err)
		}
	}

	tracer := otel.GetTracerProvider().Tracer("")
	topic := client.Topic("test-topic")
	topic.PublishSettings = pubsub.PublishSettings{CountThreshold: 1}

	attributes := map[string]string{"my-key": "my-value"}

	var wg sync.WaitGroup

	for num := range 100 {
		wg.Add(1)

		go func() {
			defer wg.Done()

			// Workaround: uncomment so each goroutine uses its own map instead of the shared one.
			// attributes := map[string]string{"my-key": "my-value"}
			ctx, span := tracer.Start(context.Background(), "publish")
			defer span.End()

			msg := []byte(fmt.Sprintf("%d", num))
			topic.Publish(ctx, &pubsub.Message{
				Data:       msg,
				Attributes: attributes,
			})
			fmt.Printf("Published %d\n", num)
		}()
	}

	wg.Wait()
	topic.Stop()

	otel.GetTracerProvider().(*sdktrace.TracerProvider).Shutdown(ctx)
}

go.mod

module github.com/steved/pubsub-repro

go 1.23.4

require (
	cloud.google.com/go/pubsub v1.45.3
	go.opentelemetry.io/otel v1.33.0
	go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.33.0
	go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.33.0
	go.opentelemetry.io/otel/sdk v1.33.0
)

require (
	cloud.google.com/go v0.116.0 // indirect
	cloud.google.com/go/auth v0.11.0 // indirect
	cloud.google.com/go/auth/oauth2adapt v0.2.6 // indirect
	cloud.google.com/go/compute/metadata v0.5.2 // indirect
	cloud.google.com/go/iam v1.2.2 // indirect
	github.com/cenkalti/backoff/v4 v4.3.0 // indirect
	github.com/felixge/httpsnoop v1.0.4 // indirect
	github.com/go-logr/logr v1.4.2 // indirect
	github.com/go-logr/stdr v1.2.2 // indirect
	github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
	github.com/google/s2a-go v0.1.8 // indirect
	github.com/google/uuid v1.6.0 // indirect
	github.com/googleapis/enterprise-certificate-proxy v0.3.4 // indirect
	github.com/googleapis/gax-go/v2 v2.14.0 // indirect
	github.com/grpc-ecosystem/grpc-gateway/v2 v2.24.0 // indirect
	go.opencensus.io v0.24.0 // indirect
	go.opentelemetry.io/auto/sdk v1.1.0 // indirect
	go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.54.0 // indirect
	go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.54.0 // indirect
	go.opentelemetry.io/otel/metric v1.33.0 // indirect
	go.opentelemetry.io/otel/trace v1.33.0 // indirect
	go.opentelemetry.io/proto/otlp v1.4.0 // indirect
	golang.org/x/crypto v0.30.0 // indirect
	golang.org/x/net v0.32.0 // indirect
	golang.org/x/oauth2 v0.24.0 // indirect
	golang.org/x/sync v0.10.0 // indirect
	golang.org/x/sys v0.28.0 // indirect
	golang.org/x/text v0.21.0 // indirect
	golang.org/x/time v0.8.0 // indirect
	google.golang.org/api v0.210.0 // indirect
	google.golang.org/genproto v0.0.0-20241118233622-e639e219e697 // indirect
	google.golang.org/genproto/googleapis/api v0.0.0-20241209162323-e6fa225c2576 // indirect
	google.golang.org/genproto/googleapis/rpc v0.0.0-20241209162323-e6fa225c2576 // indirect
	google.golang.org/grpc v1.68.1 // indirect
	google.golang.org/protobuf v1.35.2 // indirect
)

Expected behavior

The answer could be that users should not be passing in a shared map of attributes, but I expected that any modification would only occur on a pubsub-local copy of the attributes.

Actual behavior

fatal error: concurrent map iteration and map write

goroutine 181 [running]:
reflect.mapiterinit(0x1400017f638?, 0x100875220?, 0x1400017f608?)
	/opt/homebrew/Cellar/go/1.23.4/libexec/src/runtime/map.go:1520 +0x1c
reflect.(*MapIter).Next(0x100f8d880?)
	/opt/homebrew/Cellar/go/1.23.4/libexec/src/reflect/value.go:1984 +0x60
google.golang.org/protobuf/internal/impl.sizeMap({0x100f8d880?, 0x1400013c7b0?, 0x1400013c7b0?}, 0x140001601e0, 0x14000172090, {0xec?})
	/Users/sdavidovitz/src/go/pkg/mod/google.golang.org/protobuf@v1.35.2/internal/impl/codec_map.go:98 +0x130
google.golang.org/protobuf/internal/impl.encoderFuncsForMap.func1({0x0?}, 0x14000172090, {0xc0?})
	/Users/sdavidovitz/src/go/pkg/mod/google.golang.org/protobuf@v1.35.2/internal/impl/codec_map.go:54 +0x58
google.golang.org/protobuf/internal/impl.(*MessageInfo).sizePointerSlow(0x140003a47e8, {0x68?}, {0x28?})
	/Users/sdavidovitz/src/go/pkg/mod/google.golang.org/protobuf@v1.35.2/internal/impl/encode.go:82 +0xc8
google.golang.org/protobuf/internal/impl.(*MessageInfo).sizePointer(0x1400017f828?, {0x1008046e8?}, {0x28?})
	/Users/sdavidovitz/src/go/pkg/mod/google.golang.org/protobuf@v1.35.2/internal/impl/encode.go:59 +0x8c
google.golang.org/protobuf/internal/impl.(*MessageInfo).size(0x1400017f8c8?, {{}, {0x1010c1790?, 0x1400013c770?}, 0xc0?})
	/Users/sdavidovitz/src/go/pkg/mod/google.golang.org/protobuf@v1.35.2/internal/impl/encode.go:40 +0x64
google.golang.org/protobuf/proto.MarshalOptions.size({{}, 0x70?, 0xc7?, 0x13?}, {0x1010c1790, 0x1400013c770})
	/Users/sdavidovitz/src/go/pkg/mod/google.golang.org/protobuf@v1.35.2/proto/size.go:35 +0xdc
google.golang.org/protobuf/proto.MarshalOptions.Size({{}, 0xbf?, 0x27?, 0xf5?}, {0x1010aec60?, 0x1400013c770?})
	/Users/sdavidovitz/src/go/pkg/mod/google.golang.org/protobuf@v1.35.2/proto/size.go:26 +0x54
google.golang.org/protobuf/proto.Size(...)
	/Users/sdavidovitz/src/go/pkg/mod/google.golang.org/protobuf@v1.35.2/proto/size.go:16
cloud.google.com/go/pubsub.(*Topic).publishMessageBundle(0x14000204e00, {0x1010b8d40, 0x101621a80}, {0x140001460a8, 0x1, 0x0?})
	/Users/sdavidovitz/src/go/pkg/mod/cloud.google.com/go/pubsub@v1.45.3/topic.go:1198 +0xe68
cloud.google.com/go/pubsub.(*Topic).initBundler.func1({0x100f37060?, 0x14000148348?})
	/Users/sdavidovitz/src/go/pkg/mod/cloud.google.com/go/pubsub@v1.45.3/topic.go:1093 +0xfc
cloud.google.com/go/pubsub/internal/scheduler.(*PublishScheduler).Add.func1({0x100f37060, 0x14000148348})
	/Users/sdavidovitz/src/go/pkg/mod/cloud.google.com/go/pubsub@v1.45.3/internal/scheduler/publish_scheduler.go:114 +0x64
google.golang.org/api/support/bundler.(*Bundler).handle(0x140006a80b0, 0x0?)
	/Users/sdavidovitz/src/go/pkg/mod/google.golang.org/api@v0.210.0/support/bundler/bundler.go:324 +0x4c
created by google.golang.org/api/support/bundler.(*Bundler).enqueueCurBundle in goroutine 77
	/Users/sdavidovitz/src/go/pkg/mod/google.golang.org/api@v0.210.0/support/bundler/bundler.go:179 +0x12c

[snip]

Full stacktrace: https://gist.github.com/steved/55a4c8a2aa451cd2246f1b752e9e44e6

The writing side doesn't seem to be captured in the trace every time, but I believe the write is going through:
https://github.com/googleapis/google-cloud-go/blob/pubsub/v1.45.3/pubsub/trace.go#L304
https://github.com/open-telemetry/opentelemetry-go/blob/v1.33.0/propagation/trace_context.go#L64
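
Assuming the two linked lines are where the trace header is written into the message's attributes, the underlying mechanism is plain map aliasing: a Go map value is a reference, so a carrier that wraps `Message.Attributes` writes into the caller's map. The sketch below reproduces only that aliasing. The `message` and `mapCarrier` types and the `inject` function are hypothetical simplifications, not the pubsub or OpenTelemetry types, and the header value is a placeholder.

```go
package main

import "fmt"

// message and mapCarrier are hypothetical, simplified stand-ins; they are
// not the pubsub or OpenTelemetry types.
type message struct {
	Attributes map[string]string
}

type mapCarrier struct {
	attrs map[string]string
}

// Set writes directly into the wrapped map, the way a text-map carrier does.
func (c mapCarrier) Set(key, value string) { c.attrs[key] = value }

// inject mimics a propagator writing a trace header into the carrier.
func inject(c mapCarrier) { c.Set("traceparent", "00-<trace-id>-<span-id>-01") }

func main() {
	shared := map[string]string{"my-key": "my-value"}
	m1 := &message{Attributes: shared}
	m2 := &message{Attributes: shared}

	inject(mapCarrier{attrs: m1.Attributes})

	// Both messages and the caller's variable see the new key, because all
	// three refer to the same underlying map.
	fmt.Println(len(shared), len(m2.Attributes)) // prints: 2 2
}
```

With 100 goroutines publishing the same map, one goroutine's write of this kind can overlap with another message being sized by `proto.Size`, which iterates the same map. That matches the "concurrent map iteration and map write" error in the trace above.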

@steved steved added the triage me I really want to be triaged. label Dec 18, 2024
@product-auto-label product-auto-label bot added the api: pubsub Issues related to the Pub/Sub API. label Dec 18, 2024
@steved steved changed the title pubsub: concurrent map write panic when publishing with telemetry enabled and a shared set of attributes pubsub: concurrent map write panic when publishing with telemetry enabled and shared attributes Dec 18, 2024
@hongalex hongalex added type: bug Error or flaw in code with unintended results or allowing sub-optimal usage patterns. priority: p2 Moderately-important priority. Fix may not be included in next release. and removed triage me I really want to be triaged. labels Dec 18, 2024
hongalex (Member) commented

Thanks for filing an issue with a repro. It seems like you currently have a workaround (don't use a shared map), but the better long-term fix is for us to create a local copy of the attributes before injecting the context propagation attributes.
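
A rough sketch of what "create a local copy before injecting" could look like on the library side is below. It is an illustration only, not the actual fix: `injectInto` is a hypothetical helper, and the `traceparent` key and `trace-%d` values are placeholders for whatever the propagator writes.

```go
package main

import (
	"fmt"
	"sync"
)

// injectInto is a hypothetical helper, not the library's actual code. It
// copies attrs into a fresh map and adds the propagation key to the copy,
// leaving the caller's map untouched.
func injectInto(attrs map[string]string, traceparent string) map[string]string {
	local := make(map[string]string, len(attrs)+1)
	for k, v := range attrs {
		local[k] = v
	}
	local["traceparent"] = traceparent
	return local
}

func main() {
	shared := map[string]string{"my-key": "my-value"}
	results := make([]map[string]string, 100)

	var wg sync.WaitGroup
	for i := range results {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Each goroutine only reads shared and writes its own slice slot.
			results[i] = injectInto(shared, fmt.Sprintf("trace-%d", i))
		}()
	}
	wg.Wait()

	fmt.Println(len(shared), len(results[0])) // prints: 1 2
}
```

The copy costs one extra allocation per traced message, but it means callers can keep reusing a single attributes map across goroutines as long as they do not modify it themselves.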
