Memory not released after all streams have finished #2771

Open
fearfate opened this issue Aug 12, 2024 · 4 comments
Labels
bug, needs more info (An issue that may be a bug or useful feature, but requires more information)

Comments

@fearfate

I used Connect to sync a BigQuery table with about 50 GB of storage usage, trying to do a batch synchronization. However, I found that after the task completed, the memory usage remained unchanged. I am confused about this; could someone tell me how to deal with it? Thanks!

I ran this in streams mode.

My stream config:

input:
  label: "bigquery_package_versions"
  broker:
    inputs:
    - gcp_bigquery_select:
        project: example
        table: bigquery-public-data.deps_dev_v1.PackageVersions
        columns:
        - SnapshotAt
        - System
        - Name
        - Version
        - Licenses
        - Links
        - Advisories
        - VersionInfo
        - Hashes
        - DependenciesProcessed
        - DependencyError
        - UpstreamPublishedAt
        - Registries
        - SLSAProvenance
        - UpstreamIdentifiers
        where:  SnapshotAt >= ? AND SnapshotAt < ? # No default (optional)
        auto_replay_nacks: true
        # job_labels: {}
        # priority: ""
        args_mapping: |
          root = ["2024-07-29", "2024-08-08"]
        # prefix: "" # No default (optional)
        # 87212989
        suffix: |
          LIMIT 15000000 OFFSET 0
    batching:
      byte_size: 268435456
      period: 10m
      processors:
      - mapping: |          #!blobl
          root = this
          root.SnapshotAt = root.SnapshotAt.ts_unix_nano()
          if root.exists("UpstreamPublishedAt") && root.UpstreamPublishedAt != null {
            root.UpstreamPublishedAt = root.UpstreamPublishedAt.ts_unix_nano()
          }

          meta SnapshotAt = root.SnapshotAt


pipeline:
  processors:
  - parquet_encode:
      schema:
      - name: SnapshotAt
        type: INT64
      - name: System
        type: UTF8
      - name: Name
        type: UTF8
      - name: Version
        type: UTF8
      - name: Licenses
        type: UTF8
        repeated: true
      - name: Links
        repeated: true
        fields:
          - name: Label
            type: UTF8
          - name: URL
            type: UTF8
      - name: Advisories
        repeated: true
        fields:
          - name: Source
            type: UTF8
          - name: SourceID
            type: UTF8
      - name: VersionInfo
        optional: true
        fields:
          - name: IsRelease
            type: BOOLEAN
          - name: Ordinal
            type: INT64
      - name: Hashes
        repeated: true
        fields:
          - name: Type
            type: UTF8
          - name: Hash
            type: UTF8
      - name: DependenciesProcessed
        optional: true
        type: BOOLEAN
      - name: DependencyError
        optional: true
        type: BOOLEAN
      - name: UpstreamPublishedAt
        optional: true
        type: INT64
      - name: Registries
        type: UTF8
        repeated: true
      - name: SLSAProvenance
        optional: true
        fields:
          - name: SourceRepository
            type: UTF8
          - name: Commit
            type: UTF8
          - name: URL
            type: UTF8
          - name: Verified
            type: BOOLEAN
      - name: UpstreamIdentifiers
        repeated: true
        fields:
          - name: PackageName
            optional: true
            type: UTF8
          - name: VersionString
            optional: true
            type: UTF8
          - name: Source
            type: UTF8
      default_compression: zstd

output:
  file:
    path: ${YSDB_WORKER_DATA_HOME:/opt/ysdb-worker/data}/package_versions/${! meta("SnapshotAt") }/${! timestamp_unix_nano() }.parquet # No default (required)
    codec: all-bytes

And the metrics:

# HELP batch_created Benthos Counter metric
# TYPE batch_created counter
batch_created{label="bigquery_package_versions",mechanism="check",path="root.input.batching",stream="version"} 0
batch_created{label="bigquery_package_versions",mechanism="count",path="root.input.batching",stream="version"} 0
batch_created{label="bigquery_package_versions",mechanism="period",path="root.input.batching",stream="version"} 0
batch_created{label="bigquery_package_versions",mechanism="size",path="root.input.batching",stream="version"} 71
# HELP go_gc_duration_seconds A summary of the pause duration of garbage collection cycles.
# TYPE go_gc_duration_seconds summary
go_gc_duration_seconds{quantile="0"} 0.000139994
go_gc_duration_seconds{quantile="0.25"} 0.000161236
go_gc_duration_seconds{quantile="0.5"} 0.000174105
go_gc_duration_seconds{quantile="0.75"} 0.00019372
go_gc_duration_seconds{quantile="1"} 0.00047506
go_gc_duration_seconds_sum 2.914484991
go_gc_duration_seconds_count 1168
# HELP go_goroutines Number of goroutines that currently exist.
# TYPE go_goroutines gauge
go_goroutines 378676
# HELP go_info Information about the Go environment.
# TYPE go_info gauge
go_info{version="go1.22.5"} 1
# HELP go_memstats_alloc_bytes Number of bytes allocated and still in use.
# TYPE go_memstats_alloc_bytes gauge
go_memstats_alloc_bytes 4.017885552e+09
# HELP go_memstats_alloc_bytes_total Total number of bytes allocated, even if freed.
# TYPE go_memstats_alloc_bytes_total counter
go_memstats_alloc_bytes_total 9.99175765096e+11
# HELP go_memstats_buck_hash_sys_bytes Number of bytes used by the profiling bucket hash table.
# TYPE go_memstats_buck_hash_sys_bytes gauge
go_memstats_buck_hash_sys_bytes 1.922458e+06
# HELP go_memstats_frees_total Total number of frees.
# TYPE go_memstats_frees_total counter
go_memstats_frees_total 1.1805682842e+10
# HELP go_memstats_gc_sys_bytes Number of bytes used for garbage collection system metadata.
# TYPE go_memstats_gc_sys_bytes gauge
go_memstats_gc_sys_bytes 1.1821236e+08
# HELP go_memstats_heap_alloc_bytes Number of heap bytes allocated and still in use.
# TYPE go_memstats_heap_alloc_bytes gauge
go_memstats_heap_alloc_bytes 4.017885552e+09
# HELP go_memstats_heap_idle_bytes Number of heap bytes waiting to be used.
# TYPE go_memstats_heap_idle_bytes gauge
go_memstats_heap_idle_bytes 8.290402304e+09
# HELP go_memstats_heap_inuse_bytes Number of heap bytes that are in use.
# TYPE go_memstats_heap_inuse_bytes gauge
go_memstats_heap_inuse_bytes 4.57490432e+09
# HELP go_memstats_heap_objects Number of allocated objects.
# TYPE go_memstats_heap_objects gauge
go_memstats_heap_objects 4.3511309e+07
# HELP go_memstats_heap_released_bytes Number of heap bytes released to OS.
# TYPE go_memstats_heap_released_bytes gauge
go_memstats_heap_released_bytes 7.828144128e+09
# HELP go_memstats_heap_sys_bytes Number of heap bytes obtained from system.
# TYPE go_memstats_heap_sys_bytes gauge
go_memstats_heap_sys_bytes 1.2865306624e+10
# HELP go_memstats_last_gc_time_seconds Number of seconds since 1970 of last garbage collection.
# TYPE go_memstats_last_gc_time_seconds gauge
go_memstats_last_gc_time_seconds 1.7234258623280537e+09
# HELP go_memstats_lookups_total Total number of pointer lookups.
# TYPE go_memstats_lookups_total counter
go_memstats_lookups_total 0
# HELP go_memstats_mallocs_total Total number of mallocs.
# TYPE go_memstats_mallocs_total counter
go_memstats_mallocs_total 1.1849194151e+10
# HELP go_memstats_mcache_inuse_bytes Number of bytes in use by mcache structures.
# TYPE go_memstats_mcache_inuse_bytes gauge
go_memstats_mcache_inuse_bytes 19200
# HELP go_memstats_mcache_sys_bytes Number of bytes used for mcache structures obtained from system.
# TYPE go_memstats_mcache_sys_bytes gauge
go_memstats_mcache_sys_bytes 31200
# HELP go_memstats_mspan_inuse_bytes Number of bytes in use by mspan structures.
# TYPE go_memstats_mspan_inuse_bytes gauge
go_memstats_mspan_inuse_bytes 9.207696e+07
# HELP go_memstats_mspan_sys_bytes Number of bytes used for mspan structures obtained from system.
# TYPE go_memstats_mspan_sys_bytes gauge
go_memstats_mspan_sys_bytes 2.2848e+08
# HELP go_memstats_next_gc_bytes Number of heap bytes when next garbage collection will take place.
# TYPE go_memstats_next_gc_bytes gauge
go_memstats_next_gc_bytes 8.278799096e+09
# HELP go_memstats_other_sys_bytes Number of bytes used for other system allocations.
# TYPE go_memstats_other_sys_bytes gauge
go_memstats_other_sys_bytes 2.129251e+07
# HELP go_memstats_stack_inuse_bytes Number of bytes in use by the stack allocator.
# TYPE go_memstats_stack_inuse_bytes gauge
go_memstats_stack_inuse_bytes 7.99637504e+08
# HELP go_memstats_stack_sys_bytes Number of bytes obtained from system for stack allocator.
# TYPE go_memstats_stack_sys_bytes gauge
go_memstats_stack_sys_bytes 7.99637504e+08
# HELP go_memstats_sys_bytes Number of bytes obtained from system.
# TYPE go_memstats_sys_bytes gauge
go_memstats_sys_bytes 1.4034882656e+10
# HELP go_threads Number of OS threads created.
# TYPE go_threads gauge
go_threads 22
# HELP input_connection_failed Benthos Counter metric
# TYPE input_connection_failed counter
input_connection_failed{label="",path="root.input.broker.inputs.0",stream="version"} 0
# HELP input_connection_lost Benthos Counter metric
# TYPE input_connection_lost counter
input_connection_lost{label="",path="root.input.broker.inputs.0",stream="version"} 0
# HELP input_connection_up Benthos Counter metric
# TYPE input_connection_up counter
input_connection_up{label="",path="root.input.broker.inputs.0",stream="version"} 2
# HELP input_latency_ns Benthos Timing metric
# TYPE input_latency_ns summary
input_latency_ns{label="",path="root.input.broker.inputs.0",stream="version",quantile="0.5"} NaN
input_latency_ns{label="",path="root.input.broker.inputs.0",stream="version",quantile="0.9"} NaN
input_latency_ns{label="",path="root.input.broker.inputs.0",stream="version",quantile="0.99"} NaN
input_latency_ns_sum{label="",path="root.input.broker.inputs.0",stream="version"} 3.854610930651667e+18
input_latency_ns_count{label="",path="root.input.broker.inputs.0",stream="version"} 1.484165e+07
# HELP input_received Benthos Counter metric
# TYPE input_received counter
input_received{label="",path="root.input.broker.inputs.0",stream="version"} 1.5220244e+07
# HELP output_batch_sent Benthos Counter metric
# TYPE output_batch_sent counter
output_batch_sent{label="",path="root.output",stream="version"} 70
# HELP output_connection_failed Benthos Counter metric
# TYPE output_connection_failed counter
output_connection_failed{label="",path="root.output",stream="version"} 0
# HELP output_connection_lost Benthos Counter metric
# TYPE output_connection_lost counter
output_connection_lost{label="",path="root.output",stream="version"} 0
# HELP output_connection_up Benthos Counter metric
# TYPE output_connection_up counter
output_connection_up{label="",path="root.output",stream="version"} 1
# HELP output_error Benthos Counter metric
# TYPE output_error counter
output_error{label="",path="root.output",stream="version"} 0
# HELP output_latency_ns Benthos Timing metric
# TYPE output_latency_ns summary
output_latency_ns{label="",path="root.output",stream="version",quantile="0.5"} NaN
output_latency_ns{label="",path="root.output",stream="version",quantile="0.9"} NaN
output_latency_ns{label="",path="root.output",stream="version",quantile="0.99"} NaN
output_latency_ns_sum{label="",path="root.output",stream="version"} 5.135906716e+09
output_latency_ns_count{label="",path="root.output",stream="version"} 70
# HELP output_sent Benthos Counter metric
# TYPE output_sent counter
output_sent{label="",path="root.output",stream="version"} 70
# HELP process_cpu_seconds_total Total user and system CPU time spent in seconds.
# TYPE process_cpu_seconds_total counter
process_cpu_seconds_total 14875.87
# HELP process_max_fds Maximum number of open file descriptors.
# TYPE process_max_fds gauge
process_max_fds 4096
# HELP process_open_fds Number of open file descriptors.
# TYPE process_open_fds gauge
process_open_fds 13
# HELP process_resident_memory_bytes Resident memory size in bytes.
# TYPE process_resident_memory_bytes gauge
process_resident_memory_bytes 1.0702270464e+10
# HELP process_start_time_seconds Start time of the process since unix epoch in seconds.
# TYPE process_start_time_seconds gauge
process_start_time_seconds 1.72329419936e+09
# HELP process_virtual_memory_bytes Virtual memory size in bytes.
# TYPE process_virtual_memory_bytes gauge
process_virtual_memory_bytes 1.5477092352e+10
# HELP process_virtual_memory_max_bytes Maximum amount of virtual memory available in bytes.
# TYPE process_virtual_memory_max_bytes gauge
process_virtual_memory_max_bytes 1.8446744073709552e+19
# HELP processor_batch_received Benthos Counter metric
# TYPE processor_batch_received counter
processor_batch_received{label="",path="root.input.batching.processors.0",stream="version"} 71
processor_batch_received{label="",path="root.pipeline.processors.0",stream="version"} 70
# HELP processor_batch_sent Benthos Counter metric
# TYPE processor_batch_sent counter
processor_batch_sent{label="",path="root.input.batching.processors.0",stream="version"} 71
processor_batch_sent{label="",path="root.pipeline.processors.0",stream="version"} 70
# HELP processor_error Benthos Counter metric
# TYPE processor_error counter
processor_error{label="",path="root.input.batching.processors.0",stream="version"} 0
processor_error{label="",path="root.pipeline.processors.0",stream="version"} 0
# HELP processor_latency_ns Benthos Timing metric
# TYPE processor_latency_ns summary
processor_latency_ns{label="",path="root.input.batching.processors.0",stream="version",quantile="0.5"} NaN
processor_latency_ns{label="",path="root.input.batching.processors.0",stream="version",quantile="0.9"} NaN
processor_latency_ns{label="",path="root.input.batching.processors.0",stream="version",quantile="0.99"} NaN
processor_latency_ns_sum{label="",path="root.input.batching.processors.0",stream="version"} 8.41241099571e+11
processor_latency_ns_count{label="",path="root.input.batching.processors.0",stream="version"} 71
processor_latency_ns{label="",path="root.pipeline.processors.0",stream="version",quantile="0.5"} NaN
processor_latency_ns{label="",path="root.pipeline.processors.0",stream="version",quantile="0.9"} NaN
processor_latency_ns{label="",path="root.pipeline.processors.0",stream="version",quantile="0.99"} NaN
processor_latency_ns_sum{label="",path="root.pipeline.processors.0",stream="version"} 5.17977569825e+11
processor_latency_ns_count{label="",path="root.pipeline.processors.0",stream="version"} 70
# HELP processor_received Benthos Counter metric
# TYPE processor_received counter
processor_received{label="",path="root.input.batching.processors.0",stream="version"} 1.5061893e+07
processor_received{label="",path="root.pipeline.processors.0",stream="version"} 1.484165e+07
# HELP processor_sent Benthos Counter metric
# TYPE processor_sent counter
processor_sent{label="",path="root.input.batching.processors.0",stream="version"} 1.5061893e+07
processor_sent{label="",path="root.pipeline.processors.0",stream="version"} 70

By the way, how can I make a pagination loop for the gcp_bigquery_select input, using LIMIT and OFFSET, so that it scans all rows of the table? One approach I am considering is sketched below.
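
A rough sketch of that idea, assuming the page size and offset are injected through environment variables (PAGE_SIZE and PAGE_OFFSET are placeholder names of my own) and the stream is rerun with an increasing offset until no rows come back:

input:
  gcp_bigquery_select:
    project: example
    table: bigquery-public-data.deps_dev_v1.PackageVersions
    columns:
    - SnapshotAt
    - Name
    where: SnapshotAt >= ? AND SnapshotAt < ?
    args_mapping: |
      root = ["2024-07-29", "2024-08-08"]
    # Connect resolves ${VAR:default} environment interpolations when the
    # config is parsed, so each run picks up the next page window.
    suffix: |
      LIMIT ${PAGE_SIZE:1000000} OFFSET ${PAGE_OFFSET:0}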

@fearfate
Author

The top output is below; ysdb-worker is Connect built with some custom components and custom Bloblang functions.

[root@prod-pve-worklink-yops-ysdb ~]# top
top - 01:10:18 up 5 days, 12:54,  2 users,  load average: 0.24, 0.09, 0.06
Tasks: 191 total,   1 running, 190 sleeping,   0 stopped,   0 zombie
%Cpu(s):  0.0 us,  0.0 sy,  0.0 ni,100.0 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
KiB Mem : 32778176 total, 12321100 free, 11014104 used,  9442972 buff/cache
KiB Swap:        0 total,        0 free,        0 used. 21336116 avail Mem

  PID USER      PR  NI    VIRT    RES    SHR S  %CPU %MEM     TIME+ COMMAND
17910 root      20   0   14.4g  10.0g  38748 S   0.3 31.9 247:00.85 ysdb-worker

@mihaitodor
Collaborator

Hey @fearfate, I had a quick look over the config, but it would be difficult to reproduce the issue you're seeing on my end. Unless the Google library used by gcp_bigquery_select does something silly, the issue should still pop up if you use a simpler input. Any chance you could try reproducing it using a file or generate input (or even a custom input which doesn't rely on 3rd party services)?
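
For instance, swapping the input for something along these lines (just a rough sketch using the generate input to fabricate records of a similar shape; the rest of the config could stay as it is, possibly trimming the parquet schema to the generated fields) would take BigQuery out of the picture:

input:
  generate:
    # Fabricate a comparable volume of synthetic records as fast as possible.
    count: 15000000
    interval: ""
    mapping: |
      root.SnapshotAt = now()
      root.System = "NPM"
      root.Name = uuid_v4()
      root.Version = "1.0.0"
      root.Licenses = ["MIT"]
  batching:
    byte_size: 268435456
    period: 10m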

@mihaitodor added the bug and needs more info labels Aug 21, 2024
@fearfate
Author

fearfate commented Sep 9, 2024

I changed the usage and this cannot be reproduced any more. Thanks for your reply!


@mihaitodor
Collaborator

OK, does that mean the issue has been resolved?
