diff --git a/bigtable/bigtable.go b/bigtable/bigtable.go index cde8834bafd2..f3f1efbda3f7 100644 --- a/bigtable/bigtable.go +++ b/bigtable/bigtable.go @@ -49,9 +49,12 @@ import ( _ "google.golang.org/grpc/balancer/rls" ) -const prodAddr = "bigtable.googleapis.com:443" -const mtlsProdAddr = "bigtable.mtls.googleapis.com:443" -const featureFlagsHeaderKey = "bigtable-features" +const ( + prodAddr = "bigtable.googleapis.com:443" + mtlsProdAddr = "bigtable.mtls.googleapis.com:443" + featureFlagsHeaderKey = "bigtable-features" + methodNameReadRows = "ReadRows" +) // Client is a client for reading and writing data to tables in an instance. // @@ -391,7 +394,7 @@ func (t *Table) ReadRows(ctx context.Context, arg RowSet, f func(Row) bool, opts func (t *Table) readRows(ctx context.Context, arg RowSet, f func(Row) bool, mt *builtinMetricsTracer, opts ...ReadOption) (err error) { var prevRowKey string attrMap := make(map[string]interface{}) - err = gaxInvokeWithRecorder(ctx, mt, "ReadRows", func(ctx context.Context, headerMD, trailerMD *metadata.MD, _ gax.CallSettings) error { + err = gaxInvokeWithRecorder(ctx, mt, methodNameReadRows, func(ctx context.Context, headerMD, trailerMD *metadata.MD, _ gax.CallSettings) error { req := &btpb.ReadRowsRequest{ AppProfileId: t.c.appProfile, } @@ -437,6 +440,9 @@ func (t *Table) readRows(ctx context.Context, arg RowSet, f func(Row) bool, mt * for { proto.Reset(res) err := stream.RecvMsg(res) + if err == nil || err == io.EOF { + mt.currOp.setFirstRespTime(time.Now()) + } if err == io.EOF { *trailerMD = stream.Trailer() break @@ -1593,6 +1599,13 @@ func recordOperationCompletion(mt *builtinMetricsTracer) { // graph will be less confusing mt.instrumentRetryCount.Add(mt.ctx, mt.currOp.attemptCount-1, metric.WithAttributes(retryCntAttrs...)) } + + // Record first_reponse_latencies + firstRespLatAttrs, _ := mt.toOtelMetricAttrs(metricNameRetryCount) + if mt.method == methodNameReadRows { + elapsedTimeMs = convertToMs(mt.currOp.firstRespTime.Sub(mt.currOp.startTime)) + mt.instrumentFirstRespLatencies.Record(mt.ctx, elapsedTimeMs, metric.WithAttributes(firstRespLatAttrs...)) + } } // gaxInvokeWithRecorder: @@ -1664,9 +1677,13 @@ func recordAttemptCompletion(mt *builtinMetricsTracer) { attemptLatAttrs, _ := mt.toOtelMetricAttrs(metricNameAttemptLatencies) mt.instrumentAttemptLatencies.Record(mt.ctx, elapsedTime, metric.WithAttributes(attemptLatAttrs...)) - // Record server_latencies + // Record server_latencies and connectivity_error_count + connErrCountAttrs, _ := mt.toOtelMetricAttrs(metricNameConnErrCount) serverLatAttrs, _ := mt.toOtelMetricAttrs(metricNameServerLatencies) if mt.currOp.currAttempt.serverLatencyErr == nil { mt.instrumentServerLatencies.Record(mt.ctx, mt.currOp.currAttempt.serverLatency, metric.WithAttributes(serverLatAttrs...)) + mt.instrumentConnErrCount.Add(mt.ctx, 0, metric.WithAttributes(connErrCountAttrs...)) + } else { + mt.instrumentConnErrCount.Add(mt.ctx, 1, metric.WithAttributes(connErrCountAttrs...)) } } diff --git a/bigtable/integration_test.go b/bigtable/integration_test.go index 6ed532ca8f18..ff36c1bcaacb 100644 --- a/bigtable/integration_test.go +++ b/bigtable/integration_test.go @@ -912,11 +912,14 @@ func TestIntegration_ExportBuiltInMetrics(t *testing.T) { t.Fatalf("Apply: %v", err) } } - err = table.ReadRows(ctx, PrefixRange("row-"), func(r Row) bool { - return true - }, RowFilter(ColumnFilter("col"))) - if err != nil { - t.Fatalf("ReadRows: %v", err) + + for i := 0; i < 10; i++ { + err = table.ReadRows(ctx, PrefixRange("row-"), func(r Row) bool { + return true + }, RowFilter(ColumnFilter("col"))) + if err != nil { + t.Fatalf("ReadRows: %v", err) + } } // Validate that metrics are exported @@ -937,6 +940,8 @@ func TestIntegration_ExportBuiltInMetrics(t *testing.T) { metricNameOperationLatencies, metricNameAttemptLatencies, metricNameServerLatencies, + metricNameFirstRespLatencies, + metricNameConnErrCount, } // Try for 5m with 10s sleep between retries diff --git a/bigtable/metrics.go b/bigtable/metrics.go index 5a3b817abf7c..5f590b3db316 100644 --- a/bigtable/metrics.go +++ b/bigtable/metrics.go @@ -21,6 +21,7 @@ import ( "errors" "fmt" "os" + "sync" "time" "cloud.google.com/go/bigtable/internal" @@ -59,7 +60,9 @@ const ( metricNameOperationLatencies = "operation_latencies" metricNameAttemptLatencies = "attempt_latencies" metricNameServerLatencies = "server_latencies" + metricNameFirstRespLatencies = "first_response_latencies" metricNameRetryCount = "retry_count" + metricNameConnErrCount = "connectivity_error_count" metricNameDebugTags = "debug_tags" // Metric units @@ -105,12 +108,24 @@ var ( }, recordedPerAttempt: true, }, + metricNameFirstRespLatencies: { + additionalAttrs: []string{ + metricLabelKeyStatus, + }, + recordedPerAttempt: false, + }, metricNameRetryCount: { additionalAttrs: []string{ metricLabelKeyStatus, }, recordedPerAttempt: true, }, + metricNameConnErrCount: { + additionalAttrs: []string{ + metricLabelKeyStatus, + }, + recordedPerAttempt: true, + }, } // Generates unique client ID in the format go-@ @@ -149,8 +164,11 @@ type builtinMetricsTracerFactory struct { operationLatencies metric.Float64Histogram serverLatencies metric.Float64Histogram attemptLatencies metric.Float64Histogram - retryCount metric.Int64Counter - debugTags metric.Int64Counter + firstRespLatencies metric.Float64Histogram + + retryCount metric.Int64Counter + connErrCount metric.Int64Counter + debugTags metric.Int64Counter } func newBuiltinMetricsTracerFactory(ctx context.Context, project, instance, appProfile string, metricsProvider MetricsProvider, opts ...option.ClientOption) (*builtinMetricsTracerFactory, error) { @@ -250,12 +268,30 @@ func (tf *builtinMetricsTracerFactory) createInstruments(meter metric.Meter) err return err } + // Create first_response_latencies + tf.firstRespLatencies, err = meter.Float64Histogram( + metricNameFirstRespLatencies, + metric.WithDescription("Latency from operation start until the response headers were received. The publishing of the measurement will be delayed until the attempt response has been received."), + metric.WithUnit(metricUnitMS), + metric.WithExplicitBucketBoundaries(bucketBounds...), + ) + if err != nil { + return err + } + // Create retry_count tf.retryCount, err = meter.Int64Counter( metricNameRetryCount, metric.WithDescription("The number of additional RPCs sent after the initial attempt."), metric.WithUnit(metricUnitCount), ) + + // Create connectivity_error_count + tf.connErrCount, err = meter.Int64Counter( + metricNameConnErrCount, + metric.WithDescription("Number of requests that failed to reach the Google datacenter. (Requests without google response headers"), + metric.WithUnit(metricUnitCount), + ) if err != nil { return err } @@ -283,7 +319,9 @@ type builtinMetricsTracer struct { instrumentOperationLatencies metric.Float64Histogram instrumentServerLatencies metric.Float64Histogram instrumentAttemptLatencies metric.Float64Histogram + instrumentFirstRespLatencies metric.Float64Histogram instrumentRetryCount metric.Int64Counter + instrumentConnErrCount metric.Int64Counter instrumentDebugTags metric.Int64Counter tableName string @@ -301,6 +339,10 @@ type opTracer struct { startTime time.Time + // Only for ReadRows. Time when the response headers are received in a streaming RPC. + firstRespTime time.Time + firstRespTimeOnce sync.Once + // gRPC status code of last completed attempt status string @@ -311,6 +353,12 @@ func (o *opTracer) setStartTime(t time.Time) { o.startTime = t } +func (o *opTracer) setFirstRespTime(t time.Time) { + o.firstRespTimeOnce.Do(func() { + o.firstRespTime = t + }) +} + func (o *opTracer) setStatus(status string) { o.status = status } @@ -332,7 +380,7 @@ type attemptTracer struct { // Server latency in ms serverLatency float64 - // Error seen while getting server latency from headers + // Error seen while getting server latency from headers / trailers serverLatencyErr error } @@ -376,7 +424,9 @@ func (tf *builtinMetricsTracerFactory) createBuiltinMetricsTracer(ctx context.Co instrumentOperationLatencies: tf.operationLatencies, instrumentServerLatencies: tf.serverLatencies, instrumentAttemptLatencies: tf.attemptLatencies, + instrumentFirstRespLatencies: tf.firstRespLatencies, instrumentRetryCount: tf.retryCount, + instrumentConnErrCount: tf.connErrCount, instrumentDebugTags: tf.debugTags, tableName: tableName, diff --git a/bigtable/metrics_test.go b/bigtable/metrics_test.go index bb78829c027d..367059d9b1d1 100644 --- a/bigtable/metrics_test.go +++ b/bigtable/metrics_test.go @@ -91,11 +91,13 @@ func TestNewBuiltinMetricsTracerFactory(t *testing.T) { attribute.String(metricLabelKeyClientUID, clientUID), attribute.String(metricLabelKeyClientName, clientName), } - wantMetricNamesStdout := []string{metricNameAttemptLatencies, metricNameAttemptLatencies, metricNameOperationLatencies, metricNameRetryCount, metricNameServerLatencies} + + wantMetricNamesStdout := []string{metricNameAttemptLatencies, metricNameAttemptLatencies, metricNameConnErrCount, metricNameConnErrCount, metricNameFirstRespLatencies, metricNameOperationLatencies, metricNameRetryCount, metricNameServerLatencies} wantMetricTypesGCM := []string{} for _, wantMetricName := range wantMetricNamesStdout { wantMetricTypesGCM = append(wantMetricTypesGCM, builtInMetricsMeterName+wantMetricName) } + sort.Strings(wantMetricTypesGCM) // Reduce sampling period to reduce test run time origSamplePeriod := defaultSamplePeriod @@ -210,6 +212,8 @@ func TestNewBuiltinMetricsTracerFactory(t *testing.T) { gotNonNilInstruments := gotClient.metricsTracerFactory.operationLatencies != nil && gotClient.metricsTracerFactory.serverLatencies != nil && gotClient.metricsTracerFactory.attemptLatencies != nil && + gotClient.metricsTracerFactory.firstRespLatencies != nil && + gotClient.metricsTracerFactory.connErrCount != nil && gotClient.metricsTracerFactory.retryCount != nil if test.wantBuiltinEnabled != gotNonNilInstruments { t.Errorf("NonNilInstruments: got: %v, want: %v", gotNonNilInstruments, test.wantBuiltinEnabled) @@ -269,7 +273,7 @@ func TestNewBuiltinMetricsTracerFactory(t *testing.T) { } sort.Strings(gotMetricTypes) if !testutil.Equal(gotMetricTypes, wantMetricTypesGCM) { - t.Errorf("Metric types missing in req. got: %v, want: %v", gotMetricTypes, wantMetricTypesGCM) + t.Errorf("Metric types missing in req. \ngot: %v, \nwant: %v\ndiff: %v", gotMetricTypes, wantMetricTypesGCM, testutil.Diff(gotMetricTypes, wantMetricTypesGCM)) } } diff --git a/bigtable/metrics_util.go b/bigtable/metrics_util.go index 64a1fb50e4d2..8549ab609ad2 100644 --- a/bigtable/metrics_util.go +++ b/bigtable/metrics_util.go @@ -65,6 +65,8 @@ func extractServerLatency(headerMD metadata.MD, trailerMD metadata.MD) (float64, } // Obtain cluster and zone from response metadata +// Check both headers and trailers because in different environments the metadata could +// be returned in headers or trailers func extractLocation(headerMD metadata.MD, trailerMD metadata.MD) (string, string, error) { var locationMetadata []string