From 06b525607d8aaa971d227de253eaeffd262cc59f Mon Sep 17 00:00:00 2001 From: Baha Aiman Date: Wed, 31 Jul 2024 00:15:16 +0000 Subject: [PATCH 1/6] feat(bigtable): Adding first_response_latencies and connectivity_error_count metrics --- bigtable/bigtable.go | 50 +++++++++++++++++------------ bigtable/integration_test.go | 15 ++++++--- bigtable/metric_util.go | 5 +++ bigtable/metrics.go | 61 +++++++++++++++++++++++++++++++++--- 4 files changed, 102 insertions(+), 29 deletions(-) diff --git a/bigtable/bigtable.go b/bigtable/bigtable.go index 576146899de8..20e8108b2393 100644 --- a/bigtable/bigtable.go +++ b/bigtable/bigtable.go @@ -47,8 +47,11 @@ import ( _ "google.golang.org/grpc/balancer/rls" ) -const prodAddr = "bigtable.googleapis.com:443" -const mtlsProdAddr = "bigtable.mtls.googleapis.com:443" +const ( + prodAddr = "bigtable.googleapis.com:443" + mtlsProdAddr = "bigtable.mtls.googleapis.com:443" + methodNameReadRows = "ReadRows" +) // Client is a client for reading and writing data to tables in an instance. 
// @@ -353,7 +356,7 @@ func (t *Table) ReadRows(ctx context.Context, arg RowSet, f func(Row) bool, opts func (t *Table) readRows(ctx context.Context, arg RowSet, f func(Row) bool, mt *builtinMetricsTracer, opts ...ReadOption) (err error) { var prevRowKey string attrMap := make(map[string]interface{}) - err = gaxInvokeWithRecorder(ctx, mt, "ReadRows", func(ctx context.Context, headerMD, trailerMD *metadata.MD, _ gax.CallSettings) error { + err = gaxInvokeWithRecorder(ctx, mt, methodNameReadRows, func(ctx context.Context, headerMD, trailerMD *metadata.MD, _ gax.CallSettings) error { req := &btpb.ReadRowsRequest{ AppProfileId: t.c.appProfile, } @@ -395,6 +398,7 @@ func (t *Table) readRows(ctx context.Context, arg RowSet, f func(Row) bool, mt * // Ignore error since header is only being used to record builtin metrics // Failure to record metrics should not fail the operation *headerMD, _ = stream.Header() + mt.currOp.setFirstRespTime(time.Now()) for { res, err := stream.Recv() if err == io.EOF { @@ -1531,30 +1535,34 @@ func (t *Table) newBuiltinMetricsTracer(ctx context.Context, isStreaming bool) * } // recordOperationCompletion records as many operation specific metrics as it can +// Ignores error seen while creating metric attributes since metric can still +// be recorded with rest of the attributes func recordOperationCompletion(mt *builtinMetricsTracer) { if !mt.builtInEnabled { return } // Calculate elapsed time - elapsedTimeMs := float64(time.Since(mt.currOp.startTime).Nanoseconds()) / 1000000 + elapsedTimeMs := convertToMs(time.Since(mt.currOp.startTime)) - // Attributes for operation_latencies - // Ignore error seen while creating metric attributes since metric can still - // be recorded with rest of the attributes + // Record operation_latencies opLatAttrs, _ := mt.toOtelMetricAttrs(metricNameOperationLatencies) mt.instrumentOperationLatencies.Record(mt.ctx, elapsedTimeMs, metric.WithAttributes(opLatAttrs...)) - // Attributes for retry_count - // Ignore error 
seen while creating metric attributes since metric can still - // be recorded with rest of the attributes + // Record retry_count retryCntAttrs, _ := mt.toOtelMetricAttrs(metricNameRetryCount) - - // Only record when retry count is greater than 0 so the retry - // graph will be less confusing if mt.currOp.attemptCount > 1 { + // Only record when retry count is greater than 0 so the retry + // graph will be less confusing mt.instrumentRetryCount.Add(mt.ctx, mt.currOp.attemptCount-1, metric.WithAttributes(retryCntAttrs...)) } + + // Record first_response_latencies (ReadRows only; skipped when no response was ever received, since a zero firstRespTime would yield a bogus negative latency) + firstRespLatAttrs, _ := mt.toOtelMetricAttrs(metricNameFirstRespLatencies) + if mt.method == methodNameReadRows && !mt.currOp.firstRespTime.IsZero() { + elapsedTimeMs = convertToMs(mt.currOp.firstRespTime.Sub(mt.currOp.startTime)) + mt.instrumentFirstRespLatencies.Record(mt.ctx, elapsedTimeMs, metric.WithAttributes(firstRespLatAttrs...)) + } } // gaxInvokeWithRecorder: @@ -1604,25 +1612,27 @@ func gaxInvokeWithRecorder(ctx context.Context, mt *builtinMetricsTracer, method } // recordAttemptCompletion records as many attempt specific metrics as it can +// Ignore errors seen while creating metric attributes since metric can still +// be recorded with rest of the attributes func recordAttemptCompletion(mt *builtinMetricsTracer) { if !mt.builtInEnabled { return } // Calculate elapsed time - elapsedTime := float64(time.Since(mt.currOp.currAttempt.startTime).Nanoseconds()) / 1000000 + elapsedTime := convertToMs(time.Since(mt.currOp.currAttempt.startTime)) - // Attributes for attempt_latencies - // Ignore error seen while creating metric attributes since metric can still - // be recorded with rest of the attributes + // Record attempt_latencies attemptLatAttrs, _ := mt.toOtelMetricAttrs(metricNameAttemptLatencies) mt.instrumentAttemptLatencies.Record(mt.ctx, elapsedTime, metric.WithAttributes(attemptLatAttrs...)) - // Attributes for server_latencies - // Ignore error seen while creating metric attributes since metric can still - // be recorded with rest of 
the attributes + // Record server_latencies and connectivity_error_count + connErrCountAttrs, _ := mt.toOtelMetricAttrs(metricNameConnErrCount) serverLatAttrs, _ := mt.toOtelMetricAttrs(metricNameServerLatencies) if mt.currOp.currAttempt.serverLatencyErr == nil { mt.instrumentServerLatencies.Record(mt.ctx, mt.currOp.currAttempt.serverLatency, metric.WithAttributes(serverLatAttrs...)) + mt.instrumentConnErrCount.Add(mt.ctx, 0, metric.WithAttributes(connErrCountAttrs...)) + } else { + mt.instrumentConnErrCount.Add(mt.ctx, 1, metric.WithAttributes(connErrCountAttrs...)) } } diff --git a/bigtable/integration_test.go b/bigtable/integration_test.go index 240e5bc54222..2a96883cc0fc 100644 --- a/bigtable/integration_test.go +++ b/bigtable/integration_test.go @@ -860,11 +860,14 @@ func TestIntegration_ExportBuiltInMetrics(t *testing.T) { t.Fatalf("Apply: %v", err) } } - err = table.ReadRows(ctx, PrefixRange("row-"), func(r Row) bool { - return true - }, RowFilter(ColumnFilter("col"))) - if err != nil { - t.Fatalf("ReadRows: %v", err) + + for i := 0; i < 10; i++ { + err = table.ReadRows(ctx, PrefixRange("row-"), func(r Row) bool { + return true + }, RowFilter(ColumnFilter("col"))) + if err != nil { + t.Fatalf("ReadRows: %v", err) + } } // Validate that metrics are exported @@ -885,6 +888,8 @@ func TestIntegration_ExportBuiltInMetrics(t *testing.T) { metricNameOperationLatencies, metricNameAttemptLatencies, metricNameServerLatencies, + metricNameFirstRespLatencies, + metricNameConnErrCount, } // Try for 5m with 10s sleep between retries diff --git a/bigtable/metric_util.go b/bigtable/metric_util.go index 50acae12ff76..a4631f8b5b74 100644 --- a/bigtable/metric_util.go +++ b/bigtable/metric_util.go @@ -20,6 +20,7 @@ import ( "fmt" "strconv" "strings" + "time" btpb "cloud.google.com/go/bigtable/apiv2/bigtablepb" "google.golang.org/grpc/metadata" @@ -92,3 +93,7 @@ func extractLocation(headerMD metadata.MD, trailerMD metadata.MD) (string, strin return 
responseParams.GetClusterId(), responseParams.GetZoneId(), nil } + +func convertToMs(d time.Duration) float64 { + return float64(d.Nanoseconds()) / 1000000 +} diff --git a/bigtable/metrics.go b/bigtable/metrics.go index 4a0413095f9e..f5778f3fb39b 100644 --- a/bigtable/metrics.go +++ b/bigtable/metrics.go @@ -59,7 +59,13 @@ const ( metricNameOperationLatencies = "operation_latencies" metricNameAttemptLatencies = "attempt_latencies" metricNameServerLatencies = "server_latencies" + metricNameFirstRespLatencies = "first_response_latencies" metricNameRetryCount = "retry_count" + metricNameConnErrCount = "connectivity_error_count" + + // Metric units + metricUnitMS = "ms" + metricUnitCount = "1" ) // These are effectively const, but for testing purposes they are mutable @@ -98,12 +104,24 @@ var ( }, recordedPerAttempt: true, }, + metricNameFirstRespLatencies: { + additionalAttrs: []string{ + metricLabelKeyStatus, + }, + recordedPerAttempt: false, + }, metricNameRetryCount: { additionalAttrs: []string{ metricLabelKeyStatus, }, recordedPerAttempt: true, }, + metricNameConnErrCount: { + additionalAttrs: []string{ + metricLabelKeyStatus, + }, + recordedPerAttempt: true, + }, } // Generates unique client ID in the format go-@ @@ -137,7 +155,10 @@ type builtinMetricsTracerFactory struct { operationLatencies metric.Float64Histogram serverLatencies metric.Float64Histogram attemptLatencies metric.Float64Histogram - retryCount metric.Int64Counter + firstRespLatencies metric.Float64Histogram + + retryCount metric.Int64Counter + connErrCount metric.Int64Counter } func newBuiltinMetricsTracerFactory(ctx context.Context, project, instance, appProfile string, metricsProvider MetricsProvider) (*builtinMetricsTracerFactory, error) { @@ -207,7 +228,7 @@ func (tf *builtinMetricsTracerFactory) createInstruments(meter metric.Meter) err tf.operationLatencies, err = meter.Float64Histogram( metricNameOperationLatencies, metric.WithDescription("Total time until final operation success or 
failure, including retries and backoff."), - metric.WithUnit("ms"), + metric.WithUnit(metricUnitMS), metric.WithExplicitBucketBoundaries(bucketBounds...), ) if err != nil { @@ -218,7 +239,7 @@ func (tf *builtinMetricsTracerFactory) createInstruments(meter metric.Meter) err tf.attemptLatencies, err = meter.Float64Histogram( metricNameAttemptLatencies, metric.WithDescription("Client observed latency per RPC attempt."), - metric.WithUnit("ms"), + metric.WithUnit(metricUnitMS), metric.WithExplicitBucketBoundaries(bucketBounds...), ) if err != nil { @@ -229,7 +250,18 @@ func (tf *builtinMetricsTracerFactory) createInstruments(meter metric.Meter) err tf.serverLatencies, err = meter.Float64Histogram( metricNameServerLatencies, metric.WithDescription("The latency measured from the moment that the RPC entered the Google data center until the RPC was completed."), - metric.WithUnit("ms"), + metric.WithUnit(metricUnitMS), + metric.WithExplicitBucketBoundaries(bucketBounds...), + ) + if err != nil { + return err + } + + // Create first_response_latencies + tf.firstRespLatencies, err = meter.Float64Histogram( + metricNameFirstRespLatencies, + metric.WithDescription("Latency from operation start until the response headers were received. The publishing of the measurement will be delayed until the attempt response has been received."), + metric.WithUnit(metricUnitMS), metric.WithExplicitBucketBoundaries(bucketBounds...), ) if err != nil { @@ -240,6 +272,14 @@ func (tf *builtinMetricsTracerFactory) createInstruments(meter metric.Meter) err tf.retryCount, err = meter.Int64Counter( metricNameRetryCount, metric.WithDescription("The number of additional RPCs sent after the initial attempt."), + metric.WithUnit(metricUnitCount), + ) + + // Create connectivity_error_count + tf.connErrCount, err = meter.Int64Counter( + metricNameConnErrCount, + metric.WithDescription("Number of requests that failed to reach the Google datacenter. 
(Requests without google response headers"), + metric.WithUnit(metricUnitCount), ) return err } @@ -258,7 +298,9 @@ type builtinMetricsTracer struct { instrumentOperationLatencies metric.Float64Histogram instrumentServerLatencies metric.Float64Histogram instrumentAttemptLatencies metric.Float64Histogram + instrumentFirstRespLatencies metric.Float64Histogram instrumentRetryCount metric.Int64Counter + instrumentConnErrCount metric.Int64Counter tableName string method string @@ -268,11 +310,16 @@ type builtinMetricsTracer struct { } // opTracer is used to record metrics for the entire operation, including retries. +// Operation is a logical unit that represents a single method invocation on client. +// The method might require multiple attempts/rpcs and backoff logic to complete type opTracer struct { attemptCount int64 startTime time.Time + // Only for ReadRows. Time when the response headers are received in a streaming RPC. + firstRespTime time.Time + // gRPC status code of last completed attempt status string @@ -281,7 +328,10 @@ type opTracer struct { func (o *opTracer) setStartTime(t time.Time) { o.startTime = t +} +func (o *opTracer) setFirstRespTime(t time.Time) { + o.firstRespTime = t } func (o *opTracer) setStatus(status string) { @@ -293,6 +343,7 @@ func (o *opTracer) incrementAttemptCount() { } // attemptTracer is used to record metrics for each individual attempt of the operation. +// Attempt corresponds to an attempt of an RPC. 
type attemptTracer struct { startTime time.Time clusterID string @@ -348,7 +399,9 @@ func (tf *builtinMetricsTracerFactory) createBuiltinMetricsTracer(ctx context.Co instrumentOperationLatencies: tf.operationLatencies, instrumentServerLatencies: tf.serverLatencies, instrumentAttemptLatencies: tf.attemptLatencies, + instrumentFirstRespLatencies: tf.firstRespLatencies, instrumentRetryCount: tf.retryCount, + instrumentConnErrCount: tf.connErrCount, tableName: tableName, isStreaming: isStreaming, From f084993032e297a57f00f9a305b109ef6767270b Mon Sep 17 00:00:00 2001 From: Baha Aiman Date: Thu, 1 Aug 2024 03:25:07 +0000 Subject: [PATCH 2/6] test(bigtable): Fixing tests --- bigtable/metrics_test.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/bigtable/metrics_test.go b/bigtable/metrics_test.go index 5547e7a5091f..6c9b7a108ef1 100644 --- a/bigtable/metrics_test.go +++ b/bigtable/metrics_test.go @@ -86,11 +86,13 @@ func TestNewBuiltinMetricsTracerFactory(t *testing.T) { attribute.String(metricLabelKeyClientUID, clientUID), attribute.String(metricLabelKeyClientName, clientName), } - wantMetricNamesStdout := []string{metricNameAttemptLatencies, metricNameAttemptLatencies, metricNameOperationLatencies, metricNameRetryCount, metricNameServerLatencies} + + wantMetricNames := []string{metricNameAttemptLatencies, metricNameAttemptLatencies, metricNameConnErrCount, metricNameConnErrCount, metricNameFirstRespLatencies, metricNameOperationLatencies, metricNameRetryCount, metricNameServerLatencies} wantMetricTypesGCM := []string{} - for _, wantMetricName := range wantMetricNamesStdout { + for _, wantMetricName := range wantMetricNames { wantMetricTypesGCM = append(wantMetricTypesGCM, builtInMetricsMeterName+wantMetricName) } + sort.Strings(wantMetricTypesGCM) // Reduce sampling period to reduce test run time origSamplePeriod := defaultSamplePeriod @@ -196,6 +198,8 @@ func TestNewBuiltinMetricsTracerFactory(t *testing.T) { gotNonNilInstruments := 
gotClient.metricsTracerFactory.operationLatencies != nil && gotClient.metricsTracerFactory.serverLatencies != nil && gotClient.metricsTracerFactory.attemptLatencies != nil && + gotClient.metricsTracerFactory.firstRespLatencies != nil && + gotClient.metricsTracerFactory.connErrCount != nil && gotClient.metricsTracerFactory.retryCount != nil if test.wantBuiltinEnabled != gotNonNilInstruments { t.Errorf("NonNilInstruments: got: %v, want: %v", gotNonNilInstruments, test.wantBuiltinEnabled) @@ -232,7 +236,7 @@ func TestNewBuiltinMetricsTracerFactory(t *testing.T) { } sort.Strings(gotMetricTypes) if !testutil.Equal(gotMetricTypes, wantMetricTypesGCM) { - t.Errorf("Metric types missing in req. got: %v, want: %v", gotMetricTypes, wantMetricTypesGCM) + t.Errorf("Metric types missing in req. \ngot: %v, \nwant: %v\ndiff: %v", gotMetricTypes, wantMetricTypesGCM, testutil.Diff(gotMetricTypes, wantMetricTypesGCM)) } } From 39abeaa91c2dd2600ef75342e6a640057b08194f Mon Sep 17 00:00:00 2001 From: Baha Aiman Date: Wed, 21 Aug 2024 05:02:26 +0000 Subject: [PATCH 3/6] refactor(bigtable): Refactoring code --- bigtable/bigtable.go | 7 +++++-- bigtable/metric_util.go | 2 ++ bigtable/metrics.go | 9 ++++++++- 3 files changed, 15 insertions(+), 3 deletions(-) diff --git a/bigtable/bigtable.go b/bigtable/bigtable.go index 20e8108b2393..d63dbaab25a6 100644 --- a/bigtable/bigtable.go +++ b/bigtable/bigtable.go @@ -398,9 +398,9 @@ func (t *Table) readRows(ctx context.Context, arg RowSet, f func(Row) bool, mt * // Ignore error since header is only being used to record builtin metrics // Failure to record metrics should not fail the operation *headerMD, _ = stream.Header() - mt.currOp.setFirstRespTime(time.Now()) for { res, err := stream.Recv() + mt.currOp.setFirstRespTime(time.Now()) if err == io.EOF { *trailerMD = stream.Trailer() break @@ -1595,7 +1595,8 @@ func gaxInvokeWithRecorder(ctx context.Context, mt *builtinMetricsTracer, method // Get location attributes from metadata and set it in 
tracer // Ignore get location error since the metric can still be recorded with rest of the attributes - clusterID, zoneID, _ := extractLocation(attemptHeaderMD, attempTrailerMD) + clusterID, zoneID, locationErr := extractLocation(attemptHeaderMD, attempTrailerMD) + mt.currOp.currAttempt.setLocationErr(locationErr) mt.currOp.currAttempt.setClusterID(clusterID) mt.currOp.currAttempt.setZoneID(zoneID) @@ -1631,6 +1632,8 @@ func recordAttemptCompletion(mt *builtinMetricsTracer) { serverLatAttrs, _ := mt.toOtelMetricAttrs(metricNameServerLatencies) if mt.currOp.currAttempt.serverLatencyErr == nil { mt.instrumentServerLatencies.Record(mt.ctx, mt.currOp.currAttempt.serverLatency, metric.WithAttributes(serverLatAttrs...)) + } + if mt.currOp.currAttempt.serverLatencyErr == nil && mt.currOp.currAttempt.locationErr == nil { mt.instrumentConnErrCount.Add(mt.ctx, 0, metric.WithAttributes(connErrCountAttrs...)) } else { mt.instrumentConnErrCount.Add(mt.ctx, 1, metric.WithAttributes(connErrCountAttrs...)) diff --git a/bigtable/metric_util.go b/bigtable/metric_util.go index a4631f8b5b74..a3519915d5dd 100644 --- a/bigtable/metric_util.go +++ b/bigtable/metric_util.go @@ -64,6 +64,8 @@ func extractServerLatency(headerMD metadata.MD, trailerMD metadata.MD) (float64, } // Obtain cluster and zone from response metadata +// Check both headers and trailers because in different environments the metadata could +// be returned in headers or trailers func extractLocation(headerMD metadata.MD, trailerMD metadata.MD) (string, string, error) { var locationMetadata []string diff --git a/bigtable/metrics.go b/bigtable/metrics.go index f5778f3fb39b..d4fdb38f7e31 100644 --- a/bigtable/metrics.go +++ b/bigtable/metrics.go @@ -355,8 +355,11 @@ type attemptTracer struct { // Server latency in ms serverLatency float64 - // Error seen while getting server latency from headers + // Error seen while getting server latency from headers / trailers serverLatencyErr error + + // Error seen while getting 
location (cluster and zone) from headers / trailers + locationErr error } func (a *attemptTracer) setStartTime(t time.Time) { @@ -371,6 +374,10 @@ func (a *attemptTracer) setZoneID(zoneID string) { a.zoneID = zoneID } +func (a *attemptTracer) setLocationErr(err error) { + a.locationErr = err +} + func (a *attemptTracer) setStatus(status string) { a.status = status } From f61c02dea9b03968dac0266de0a6cf44a9749280 Mon Sep 17 00:00:00 2001 From: Baha Aiman Date: Mon, 9 Sep 2024 21:11:42 +0000 Subject: [PATCH 4/6] feat(bigtable): Record only on first response --- bigtable/bigtable.go | 9 +++++---- bigtable/metrics.go | 8 ++++++-- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/bigtable/bigtable.go b/bigtable/bigtable.go index a2609e565b38..01ba55c201a8 100644 --- a/bigtable/bigtable.go +++ b/bigtable/bigtable.go @@ -49,9 +49,10 @@ import ( ) const ( - prodAddr = "bigtable.googleapis.com:443" - mtlsProdAddr = "bigtable.mtls.googleapis.com:443" - methodNameReadRows = "ReadRows" + prodAddr = "bigtable.googleapis.com:443" + mtlsProdAddr = "bigtable.mtls.googleapis.com:443" + featureFlagsHeaderKey = "bigtable-features" + methodNameReadRows = "ReadRows" ) // Client is a client for reading and writing data to tables in an instance. @@ -423,7 +424,7 @@ func (t *Table) readRows(ctx context.Context, arg RowSet, f func(Row) bool, mt * for { proto.Reset(res) err := stream.RecvMsg(res) - mt.currOp.setFirstRespTime(time.Now()) + mt.currOp.setFirstRespTime(time.Now()) if err == io.EOF { *trailerMD = stream.Trailer() break diff --git a/bigtable/metrics.go b/bigtable/metrics.go index d4fdb38f7e31..f5bc92ca1deb 100644 --- a/bigtable/metrics.go +++ b/bigtable/metrics.go @@ -22,6 +22,7 @@ import ( "fmt" "log" "os" + "sync" "time" "cloud.google.com/go/bigtable/internal" @@ -318,7 +319,8 @@ type opTracer struct { startTime time.Time // Only for ReadRows. Time when the response headers are received in a streaming RPC. 
- firstRespTime time.Time + firstRespTime time.Time + firstRespTimeOnce sync.Once // gRPC status code of last completed attempt status string @@ -331,7 +333,9 @@ func (o *opTracer) setStartTime(t time.Time) { } func (o *opTracer) setFirstRespTime(t time.Time) { - o.firstRespTime = t + o.firstRespTimeOnce.Do(func() { + o.firstRespTime = t + }) } func (o *opTracer) setStatus(status string) { From 62320281000d4d4361325d1f4b31e7cbebcad8f4 Mon Sep 17 00:00:00 2001 From: Baha Aiman Date: Fri, 20 Dec 2024 21:30:36 +0000 Subject: [PATCH 5/6] resolve merge conflicts --- bigtable/bigtable.go | 9 --------- 1 file changed, 9 deletions(-) diff --git a/bigtable/bigtable.go b/bigtable/bigtable.go index ed90aa20ed27..8644648ff6ea 100644 --- a/bigtable/bigtable.go +++ b/bigtable/bigtable.go @@ -1637,18 +1637,9 @@ func gaxInvokeWithRecorder(ctx context.Context, mt *builtinMetricsTracer, method // f makes calls to CBT service err := f(ctx, &attemptHeaderMD, &attempTrailerMD, callSettings) -<<<<<<< HEAD - // Get location attributes from metadata and set it in tracer - // Ignore get location error since the metric can still be recorded with rest of the attributes - clusterID, zoneID, locationErr := extractLocation(attemptHeaderMD, attempTrailerMD) - mt.currOp.currAttempt.setLocationErr(locationErr) - mt.currOp.currAttempt.setClusterID(clusterID) - mt.currOp.currAttempt.setZoneID(zoneID) -======= // Set attempt status statusCode, _ := convertToGrpcStatusErr(err) mt.currOp.currAttempt.setStatus(statusCode.String()) ->>>>>>> main // Get location attributes from metadata and set it in tracer // Ignore get location error since the metric can still be recorded with rest of the attributes From 4145c3a5955b7f00f2262177c2202a596dab7541 Mon Sep 17 00:00:00 2001 From: Baha Aiman Date: Mon, 23 Dec 2024 07:09:15 +0000 Subject: [PATCH 6/6] feat(bigtable): Fixing retry behaviour --- bigtable/bigtable.go | 6 +++--- bigtable/metrics.go | 7 ------- bigtable/metrics_test.go | 4 ++-- 3 files changed, 5 
insertions(+), 12 deletions(-) diff --git a/bigtable/bigtable.go b/bigtable/bigtable.go index 8644648ff6ea..f3f1efbda3f7 100644 --- a/bigtable/bigtable.go +++ b/bigtable/bigtable.go @@ -440,7 +440,9 @@ func (t *Table) readRows(ctx context.Context, arg RowSet, f func(Row) bool, mt * for { proto.Reset(res) err := stream.RecvMsg(res) - mt.currOp.setFirstRespTime(time.Now()) + if err == nil || err == io.EOF { + mt.currOp.setFirstRespTime(time.Now()) + } if err == io.EOF { *trailerMD = stream.Trailer() break @@ -1680,8 +1682,6 @@ func recordAttemptCompletion(mt *builtinMetricsTracer) { serverLatAttrs, _ := mt.toOtelMetricAttrs(metricNameServerLatencies) if mt.currOp.currAttempt.serverLatencyErr == nil { mt.instrumentServerLatencies.Record(mt.ctx, mt.currOp.currAttempt.serverLatency, metric.WithAttributes(serverLatAttrs...)) - } - if mt.currOp.currAttempt.serverLatencyErr == nil && mt.currOp.currAttempt.locationErr == nil { mt.instrumentConnErrCount.Add(mt.ctx, 0, metric.WithAttributes(connErrCountAttrs...)) } else { mt.instrumentConnErrCount.Add(mt.ctx, 1, metric.WithAttributes(connErrCountAttrs...)) diff --git a/bigtable/metrics.go b/bigtable/metrics.go index 6eb258e18072..5f590b3db316 100644 --- a/bigtable/metrics.go +++ b/bigtable/metrics.go @@ -382,9 +382,6 @@ type attemptTracer struct { // Error seen while getting server latency from headers / trailers serverLatencyErr error - - // Error seen while getting location (cluster and zone) from headers / trailers - locationErr error } func (a *attemptTracer) setStartTime(t time.Time) { @@ -399,10 +396,6 @@ func (a *attemptTracer) setZoneID(zoneID string) { a.zoneID = zoneID } -func (a *attemptTracer) setLocationErr(err error) { - a.locationErr = err -} - func (a *attemptTracer) setStatus(status string) { a.status = status } diff --git a/bigtable/metrics_test.go b/bigtable/metrics_test.go index 2e7a096ca612..367059d9b1d1 100644 --- a/bigtable/metrics_test.go +++ b/bigtable/metrics_test.go @@ -92,9 +92,9 @@ func 
TestNewBuiltinMetricsTracerFactory(t *testing.T) { attribute.String(metricLabelKeyClientName, clientName), } - wantMetricNames := []string{metricNameAttemptLatencies, metricNameAttemptLatencies, metricNameConnErrCount, metricNameConnErrCount, metricNameFirstRespLatencies, metricNameOperationLatencies, metricNameRetryCount, metricNameServerLatencies} + wantMetricNamesStdout := []string{metricNameAttemptLatencies, metricNameAttemptLatencies, metricNameConnErrCount, metricNameConnErrCount, metricNameFirstRespLatencies, metricNameOperationLatencies, metricNameRetryCount, metricNameServerLatencies} wantMetricTypesGCM := []string{} - for _, wantMetricName := range wantMetricNames { + for _, wantMetricName := range wantMetricNamesStdout { wantMetricTypesGCM = append(wantMetricTypesGCM, builtInMetricsMeterName+wantMetricName) } sort.Strings(wantMetricTypesGCM)