How to optimize the performance of Kafka exporters? #36853

Open
xiaoyao2246 opened this issue Dec 16, 2024 · 6 comments

xiaoyao2246 commented Dec 16, 2024

Component(s)

exporter/kafka

Describe the issue you're reporting

I deployed a simple Collector using the OpenTelemetry Operator, and its configuration is as follows:

kind: OpenTelemetryCollector
metadata:
  name: collector-otlp
spec:
  image: xxxxx/opentelemetry-collector-contrib:0.40.0
  replicas: 1
  mode: deployment
  resources:
    limits:
      cpu: 1000m
      memory: 2048Mi
  config: |
    receivers:
      jaeger: 
        protocols: 
          thrift_http: 
            endpoint: 0.0.0.0:4316
      otlp:
        protocols:
          grpc:
            endpoint: 0.0.0.0:4317
          http:
            endpoint: 0.0.0.0:4318
    processors: 
      batch: 
        send_batch_size: 500
        send_batch_max_size: 500
      resource: 
        attributes: 
        - key: from-collector
          value: temp
          action: insert
    exporters: 
      logging: 
        loglevel: info
      kafka: 
        brokers:
          - xx.xx.xx.xx:9092
          - xx.xx.xx.xx:9092
          - xx.xx.xx.xx:9092
        topic: otlp_trace_temp
        protocol_version: 2.0.0
    service:
      pipelines:
        traces:
          receivers: [otlp, jaeger]
          processors: [batch, resource]
          exporters: [logging, kafka]

Since my Kubernetes cluster is on version 1.20.11, I used v0.40.0 of the Collector.

The Collector is allocated 1 CPU core and 2 GB of memory.

Part of the Collector's logs is shown below:

2024-12-16T14:36:04.690Z        INFO    loggingexporter/logging_exporter.go:40  TracesExporter  {"#spans": 500}
2024-12-16T14:36:04.700Z        INFO    loggingexporter/logging_exporter.go:40  TracesExporter  {"#spans": 500}
2024-12-16T14:36:05.691Z        INFO    loggingexporter/logging_exporter.go:40  TracesExporter  {"#spans": 500}
2024-12-16T14:36:05.691Z        INFO    loggingexporter/logging_exporter.go:40  TracesExporter  {"#spans": 500}
2024-12-16T14:36:06.692Z        INFO    loggingexporter/logging_exporter.go:40  TracesExporter  {"#spans": 500}
2024-12-16T14:36:06.693Z        INFO    loggingexporter/logging_exporter.go:40  TracesExporter  {"#spans": 500}
2024-12-16T14:36:07.693Z        INFO    loggingexporter/logging_exporter.go:40  TracesExporter  {"#spans": 500}
2024-12-16T14:36:07.696Z        INFO    loggingexporter/logging_exporter.go:40  TracesExporter  {"#spans": 500}
2024-12-16T14:36:08.696Z        INFO    loggingexporter/logging_exporter.go:40  TracesExporter  {"#spans": 500}
2024-12-16T14:36:08.699Z        INFO    loggingexporter/logging_exporter.go:40  TracesExporter  {"#spans": 500}
2024-12-16T14:36:09.699Z        INFO    loggingexporter/logging_exporter.go:40  TracesExporter  {"#spans": 500}
2024-12-16T14:36:09.702Z        INFO    loggingexporter/logging_exporter.go:40  TracesExporter  {"#spans": 500}
2024-12-16T14:36:10.700Z        INFO    loggingexporter/logging_exporter.go:40  TracesExporter  {"#spans": 500}
2024-12-16T14:36:10.701Z        INFO    loggingexporter/logging_exporter.go:40  TracesExporter  {"#spans": 500}
2024-12-16T14:36:11.700Z        INFO    loggingexporter/logging_exporter.go:40  TracesExporter  {"#spans": 500}
2024-12-16T14:36:11.701Z        INFO    loggingexporter/logging_exporter.go:40  TracesExporter  {"#spans": 500}
2024-12-16T14:36:12.702Z        INFO    loggingexporter/logging_exporter.go:40  TracesExporter  {"#spans": 500}
2024-12-16T14:36:12.703Z        INFO    loggingexporter/logging_exporter.go:40  TracesExporter  {"#spans": 500}
2024-12-16T14:36:13.710Z        INFO    loggingexporter/logging_exporter.go:40  TracesExporter  {"#spans": 500}
2024-12-16T14:36:13.712Z        INFO    loggingexporter/logging_exporter.go:40  TracesExporter  {"#spans": 500}
2024-12-16T14:36:14.719Z        INFO    loggingexporter/logging_exporter.go:40  TracesExporter  {"#spans": 500}
2024-12-16T14:36:14.722Z        INFO    loggingexporter/logging_exporter.go:40  TracesExporter  {"#spans": 500}
2024-12-16T14:36:15.726Z        INFO    loggingexporter/logging_exporter.go:40  TracesExporter  {"#spans": 500}
2024-12-16T14:36:15.729Z        INFO    loggingexporter/logging_exporter.go:40  TracesExporter  {"#spans": 500}
2024-12-16T14:36:16.730Z        INFO    loggingexporter/logging_exporter.go:40  TracesExporter  {"#spans": 500}
2024-12-16T14:36:16.733Z        INFO    loggingexporter/logging_exporter.go:40  TracesExporter  {"#spans": 500}
2024-12-16T14:36:17.735Z        INFO    loggingexporter/logging_exporter.go:40  TracesExporter  {"#spans": 500}
2024-12-16T14:36:17.738Z        INFO    loggingexporter/logging_exporter.go:40  TracesExporter  {"#spans": 500}
2024-12-16T14:36:18.739Z        INFO    loggingexporter/logging_exporter.go:40  TracesExporter  {"#spans": 500}
2024-12-16T14:36:18.742Z        INFO    loggingexporter/logging_exporter.go:40  TracesExporter  {"#spans": 500}
2024-12-16T14:36:19.743Z        INFO    loggingexporter/logging_exporter.go:40  TracesExporter  {"#spans": 500}
2024-12-16T14:36:19.746Z        INFO    loggingexporter/logging_exporter.go:40  TracesExporter  {"#spans": 500}
2024-12-16T14:36:20.747Z        INFO    loggingexporter/logging_exporter.go:40  TracesExporter  {"#spans": 500}
2024-12-16T14:36:20.751Z        INFO    loggingexporter/logging_exporter.go:40  TracesExporter  {"#spans": 500}

To test the Collector's performance, I sent trace data to it. The current performance of the Collector is as follows:
[screenshot: Collector CPU and memory usage during the test]

I found that under the current configuration, the CPU usage is relatively high, while the memory usage is very low.

My question is: is there any other way, or strategy, to improve the Collector's performance? I'm new to OpenTelemetry and hope to get some good advice!
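
For reference, the only knobs I can see in my own spec so far are the replica count, the batch processor sizes, and the logging exporter verbosity. A rough sketch of what I might try (the values are purely illustrative assumptions, not measured recommendations):

spec:
  replicas: 3                       # scale out; each replica runs its own pipeline
  config: |
    processors:
      batch:
        send_batch_size: 2000       # larger batches mean fewer exporter sends
        send_batch_max_size: 2000
        timeout: 5s
    exporters:
      logging:
        loglevel: warn              # logging every batch at info adds overhead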

Thank you all again for your help.

xiaoyao2246 added the needs triage (New item requiring triage) label Dec 16, 2024

Pinging code owners:

See Adding Labels via Comments if you do not have permissions to add labels yourself.

VihasMakwana (Contributor)

v0.40.0 is very old.
Anyway, would it be possible to share a CPU profile? You can use https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/v0.40.0/extension/pprofextension/README.md

VihasMakwana removed the needs triage (New item requiring triage) label Dec 17, 2024

xiaoyao2246 (Author)

@VihasMakwana
I have successfully deployed a newer version of the Collector, v0.90.0. Now I'm trying to optimize the performance of the Kafka exporter.

I saw this in the Kafka exporter's README.md:

This exporter uses a synchronous producer that blocks and does not batch messages, therefore it should be used with batch and queued retry processors for higher throughput and resiliency.

Which component does the "queued retry processors" here refer to? I didn't find it in the repository.
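
From what I can tell, the standalone queued retry processor was removed long ago and its behavior now lives in the exporters themselves, via the standard exporterhelper settings. A minimal sketch, assuming the Kafka exporter in v0.90.0 accepts the usual sending_queue and retry_on_failure options (the values below are illustrative guesses):

exporters:
  kafka:
    brokers:
      - xx.xx.xx.xx:9092
    topic: otlp_trace_temp
    protocol_version: 2.0.0
    sending_queue:
      enabled: true
      num_consumers: 10        # goroutines draining the queue in parallel
      queue_size: 5000         # batches buffered before backpressure kicks in
    retry_on_failure:
      enabled: true
      initial_interval: 5s
      max_interval: 30s
      max_elapsed_time: 300s   # give up on a batch after 5 minutes of retries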

Thank you again for your help.

xiaoyao2246 (Author)

xiaoyao2246 commented Dec 25, 2024

After I turned on pprof, my Collector metrics were as follows:

[screenshot: Collector CPU and memory metrics while profiling]

I obtained the following profile data:

Saved profile in /root/pprof/pprof.otelcol-contrib.samples.cpu.003.pb.gz
File: otelcol-contrib
Type: cpu
Time: Dec 25, 2024 at 10:31pm (CST)
Duration: 300.13s, Total samples = 207.89s (69.27%)
Entering interactive mode (type "help" for commands, "o" for options)
(pprof) top
Showing nodes accounting for 75.50s, 36.32% of 207.89s total
Dropped 1083 nodes (cum <= 1.04s)
Showing top 10 nodes out of 259
      flat  flat%   sum%        cum   cum%
    15.27s  7.35%  7.35%     15.27s  7.35%  runtime/internal/syscall.Syscall6
    10.32s  4.96% 12.31%     19.91s  9.58%  runtime.scanobject
     9.15s  4.40% 16.71%     40.12s 19.30%  runtime.mallocgc
     8.91s  4.29% 21.00%     11.07s  5.32%  compress/flate.(*decompressor).huffSym
     6.97s  3.35% 24.35%      6.97s  3.35%  runtime.memclrNoHeapPointers
     6.05s  2.91% 27.26%      6.90s  3.32%  runtime.lock2
     5.80s  2.79% 30.05%      5.80s  2.79%  runtime.memmove
     4.40s  2.12% 32.17%      4.45s  2.14%  runtime.unlock2
     4.32s  2.08% 34.24%     20.02s  9.63%  compress/flate.(*decompressor).huffmanBlock
     4.31s  2.07% 36.32%      4.32s  2.08%  compress/flate.(*compressor).reset

[image: pprof CPU profile graph]

The configuration I'm using is:

data:
  config.yaml: |-
    receivers:
      otlp:
        protocols:
          grpc:
            endpoint: 0.0.0.0:4317
          http:
            endpoint: 0.0.0.0:4318
    processors:
      batch:
        send_batch_size: 500
        send_batch_max_size: 500
      resource:
        attributes:
          - key: from-collector
            value: gd-fat-k8s
            action: insert
    exporters:
      logging:
        verbosity: normal
      kafka:
        brokers:
          - xx.xx.xx.xx:9092
          - xx.xx.xx.xx:9092
          - xx.xx.xx.xx:9092
        topic: otlp_trace_fat
        partition_traces_by_id: true
        protocol_version: 1.0.0
        
    extensions:
      pprof:
        endpoint: ":1777"
        #save_to_file: "cpu_profile_collector.out"
    service:
      extensions: [pprof]
      pipelines:
        traces:
          receivers: [otlp]
          processors: [batch, resource]
          exporters: [logging, kafka]

I just want to send trace data to Kafka, but the Collector's resource usage seems a bit high.
Are there any other ways to reduce the Collector's CPU usage?
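
For what it's worth, the two Kafka exporter knobs I'm looking at are the per-trace partitioning (which splits and copies every batch into one message per trace ID) and producer compression. A hedged sketch, assuming the producer.compression option is available in v0.90.0 and that partitioning by trace ID isn't strictly required downstream:

exporters:
  kafka:
    brokers:
      - xx.xx.xx.xx:9092
    topic: otlp_trace_fat
    protocol_version: 1.0.0
    partition_traces_by_id: false   # default; avoids splitting/copying every batch per trace ID
    producer:
      compression: none             # gzip/snappy/lz4/zstd trade CPU for bandwidth; none is the default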

@VihasMakwana @indrekj @tmc @dazuma

VihasMakwana (Contributor)

@xiaoyao2246 Is there any backpressure from Kafka? Do you see any errors in the OTel logs?

xiaoyao2246 (Author)

> @xiaoyao2246 Is there any backpressure from Kafka? Do you see any errors in the OTel logs?

No, Kafka is working normally and there are no error logs from the Collector. Throughout the whole process, the Collector's memory usage stays at around 100 MB, even though I've allocated 2 GB to it. Through pprof, I found that much of the CPU time is spent in runtime.mallocgc, which appears to be related to the following code:

// Imports for the pdata packages used below:
import (
	"go.opentelemetry.io/collector/pdata/pcommon"
	"go.opentelemetry.io/collector/pdata/ptrace"
)

// SplitTraces returns one ptrace.Traces for each trace in the given ptrace.Traces input.
// Each of the resulting ptrace.Traces contains exactly one trace.
func SplitTraces(batch ptrace.Traces) []ptrace.Traces {
	// for each span in the resource spans, we group them into batches of rs/ils/traceID.
	// if the same traceID exists in different ils, they land in different batches.
	var result []ptrace.Traces

	for i := 0; i < batch.ResourceSpans().Len(); i++ {
		rs := batch.ResourceSpans().At(i)

		for j := 0; j < rs.ScopeSpans().Len(); j++ {
			// the batches for this ILS
			batches := map[pcommon.TraceID]ptrace.ResourceSpans{}

			ils := rs.ScopeSpans().At(j)
			for k := 0; k < ils.Spans().Len(); k++ {
				span := ils.Spans().At(k)
				key := span.TraceID()

				// for the first traceID in the ILS, initialize the map entry
				// and add the singleTraceBatch to the result list
				if _, ok := batches[key]; !ok {
					trace := ptrace.NewTraces()
					newRS := trace.ResourceSpans().AppendEmpty()
					// currently, the ResourceSpans implementation has only a Resource and an ILS. We'll copy the Resource
					// and set our own ILS
					rs.Resource().CopyTo(newRS.Resource())
					newRS.SetSchemaUrl(rs.SchemaUrl())
					newILS := newRS.ScopeSpans().AppendEmpty()
					// currently, the ILS implementation has only an InstrumentationLibrary and spans. We'll copy the library
					// and set our own spans
					ils.Scope().CopyTo(newILS.Scope())
					newILS.SetSchemaUrl(ils.SchemaUrl())
					batches[key] = newRS

					result = append(result, trace)
				}

				// there is only one instrumentation library per batch
				tgt := batches[key].ScopeSpans().At(0).Spans().AppendEmpty()
				span.CopyTo(tgt)
			}
		}
	}

	return result
}

What I don't understand is: with so much memory still available, why is garbage collection (GC) still taking place?
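
From what I understand, Go's garbage collector is triggered by heap growth relative to the live set (the GOGC target, 100% by default), not by how much memory the container still has free, so heavy allocation churn forces frequent GC cycles even with most of the 2 GB unused. A sketch of relaxing that via the standard Go runtime environment variables, assuming the operator passes spec.env through to the container (values are illustrative):

spec:
  env:
    - name: GOGC
      value: "200"        # let the heap grow further between collections (default 100)
    - name: GOMEMLIMIT
      value: "1750MiB"    # soft ceiling so the larger GOGC can't hit the 2Gi container limit (Go >= 1.19)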
