Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(bigtable): first_response_latencies and connectivity_error_count metrics #10616

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
23 changes: 19 additions & 4 deletions bigtable/bigtable.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,11 @@ import (
_ "google.golang.org/grpc/balancer/rls"
)

const prodAddr = "bigtable.googleapis.com:443"
const mtlsProdAddr = "bigtable.mtls.googleapis.com:443"
const (
prodAddr = "bigtable.googleapis.com:443"
mtlsProdAddr = "bigtable.mtls.googleapis.com:443"
methodNameReadRows = "ReadRows"
)

// Client is a client for reading and writing data to tables in an instance.
//
Expand Down Expand Up @@ -353,7 +356,7 @@ func (t *Table) ReadRows(ctx context.Context, arg RowSet, f func(Row) bool, opts
func (t *Table) readRows(ctx context.Context, arg RowSet, f func(Row) bool, mt *builtinMetricsTracer, opts ...ReadOption) (err error) {
var prevRowKey string
attrMap := make(map[string]interface{})
err = gaxInvokeWithRecorder(ctx, mt, "ReadRows", func(ctx context.Context, headerMD, trailerMD *metadata.MD, _ gax.CallSettings) error {
err = gaxInvokeWithRecorder(ctx, mt, methodNameReadRows, func(ctx context.Context, headerMD, trailerMD *metadata.MD, _ gax.CallSettings) error {
req := &btpb.ReadRowsRequest{
AppProfileId: t.c.appProfile,
}
Expand Down Expand Up @@ -395,6 +398,7 @@ func (t *Table) readRows(ctx context.Context, arg RowSet, f func(Row) bool, mt *
// Ignore error since header is only being used to record builtin metrics
// Failure to record metrics should not fail the operation
*headerMD, _ = stream.Header()
mt.currOp.setFirstRespTime(time.Now())
for {
res, err := stream.Recv()
if err == io.EOF {
Expand Down Expand Up @@ -1552,6 +1556,13 @@ func recordOperationCompletion(mt *builtinMetricsTracer) {
// graph will be less confusing
mt.instrumentRetryCount.Add(mt.ctx, mt.currOp.attemptCount-1, metric.WithAttributes(retryCntAttrs...))
}

// Record first_reponse_latencies
firstRespLatAttrs, _ := mt.toOtelMetricAttrs(metricNameRetryCount)
if mt.method == methodNameReadRows {
elapsedTimeMs = convertToMs(mt.currOp.firstRespTime.Sub(mt.currOp.startTime))
mt.instrumentFirstRespLatencies.Record(mt.ctx, elapsedTimeMs, metric.WithAttributes(firstRespLatAttrs...))
}
}

// gaxInvokeWithRecorder:
Expand Down Expand Up @@ -1615,9 +1626,13 @@ func recordAttemptCompletion(mt *builtinMetricsTracer) {
attemptLatAttrs, _ := mt.toOtelMetricAttrs(metricNameAttemptLatencies)
mt.instrumentAttemptLatencies.Record(mt.ctx, elapsedTime, metric.WithAttributes(attemptLatAttrs...))

// Record server_latencies
// Record server_latencies and connectivity_error_count
connErrCountAttrs, _ := mt.toOtelMetricAttrs(metricNameConnErrCount)
serverLatAttrs, _ := mt.toOtelMetricAttrs(metricNameServerLatencies)
if mt.currOp.currAttempt.serverLatencyErr == nil {
mt.instrumentServerLatencies.Record(mt.ctx, mt.currOp.currAttempt.serverLatency, metric.WithAttributes(serverLatAttrs...))
mt.instrumentConnErrCount.Add(mt.ctx, 0, metric.WithAttributes(connErrCountAttrs...))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you need to add a zero?

Copy link
Contributor Author

@bhshkh bhshkh Sep 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

} else {
mt.instrumentConnErrCount.Add(mt.ctx, 1, metric.WithAttributes(connErrCountAttrs...))
}
}
15 changes: 10 additions & 5 deletions bigtable/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -860,11 +860,14 @@ func TestIntegration_ExportBuiltInMetrics(t *testing.T) {
t.Fatalf("Apply: %v", err)
}
}
err = table.ReadRows(ctx, PrefixRange("row-"), func(r Row) bool {
return true
}, RowFilter(ColumnFilter("col")))
if err != nil {
t.Fatalf("ReadRows: %v", err)

for i := 0; i < 10; i++ {
err = table.ReadRows(ctx, PrefixRange("row-"), func(r Row) bool {
return true
}, RowFilter(ColumnFilter("col")))
if err != nil {
t.Fatalf("ReadRows: %v", err)
}
}

// Validate that metrics are exported
Expand All @@ -885,6 +888,8 @@ func TestIntegration_ExportBuiltInMetrics(t *testing.T) {
metricNameOperationLatencies,
metricNameAttemptLatencies,
metricNameServerLatencies,
metricNameFirstRespLatencies,
metricNameConnErrCount,
}

// Try for 5m with 10s sleep between retries
Expand Down
48 changes: 47 additions & 1 deletion bigtable/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@ const (
metricNameOperationLatencies = "operation_latencies"
metricNameAttemptLatencies = "attempt_latencies"
metricNameServerLatencies = "server_latencies"
metricNameFirstRespLatencies = "first_response_latencies"
metricNameRetryCount = "retry_count"
metricNameConnErrCount = "connectivity_error_count"

// Metric units
metricUnitMS = "ms"
Expand Down Expand Up @@ -102,12 +104,24 @@ var (
},
recordedPerAttempt: true,
},
metricNameFirstRespLatencies: {
additionalAttrs: []string{
metricLabelKeyStatus,
},
recordedPerAttempt: false,
},
metricNameRetryCount: {
additionalAttrs: []string{
metricLabelKeyStatus,
},
recordedPerAttempt: true,
},
metricNameConnErrCount: {
additionalAttrs: []string{
metricLabelKeyStatus,
},
recordedPerAttempt: true,
},
}

// Generates unique client ID in the format go-<random UUID>@<hostname>
Expand Down Expand Up @@ -141,7 +155,10 @@ type builtinMetricsTracerFactory struct {
operationLatencies metric.Float64Histogram
serverLatencies metric.Float64Histogram
attemptLatencies metric.Float64Histogram
retryCount metric.Int64Counter
firstRespLatencies metric.Float64Histogram

retryCount metric.Int64Counter
connErrCount metric.Int64Counter
}

func newBuiltinMetricsTracerFactory(ctx context.Context, project, instance, appProfile string, metricsProvider MetricsProvider) (*builtinMetricsTracerFactory, error) {
Expand Down Expand Up @@ -240,12 +257,30 @@ func (tf *builtinMetricsTracerFactory) createInstruments(meter metric.Meter) err
return err
}

// Create first_response_latencies
tf.firstRespLatencies, err = meter.Float64Histogram(
metricNameFirstRespLatencies,
metric.WithDescription("Latency from operation start until the response headers were received. The publishing of the measurement will be delayed until the attempt response has been received."),
metric.WithUnit(metricUnitMS),
metric.WithExplicitBucketBoundaries(bucketBounds...),
)
if err != nil {
return err
}

// Create retry_count
tf.retryCount, err = meter.Int64Counter(
metricNameRetryCount,
metric.WithDescription("The number of additional RPCs sent after the initial attempt."),
metric.WithUnit(metricUnitCount),
)

// Create connectivity_error_count
tf.connErrCount, err = meter.Int64Counter(
metricNameConnErrCount,
metric.WithDescription("Number of requests that failed to reach the Google datacenter. (Requests without google response headers"),
metric.WithUnit(metricUnitCount),
)
return err
}

Expand All @@ -263,7 +298,9 @@ type builtinMetricsTracer struct {
instrumentOperationLatencies metric.Float64Histogram
instrumentServerLatencies metric.Float64Histogram
instrumentAttemptLatencies metric.Float64Histogram
instrumentFirstRespLatencies metric.Float64Histogram
instrumentRetryCount metric.Int64Counter
instrumentConnErrCount metric.Int64Counter

tableName string
method string
Expand All @@ -280,6 +317,9 @@ type opTracer struct {

startTime time.Time

// Only for ReadRows. Time when the response headers are received in a streaming RPC.
firstRespTime time.Time

// gRPC status code of last completed attempt
status string

Expand All @@ -290,6 +330,10 @@ func (o *opTracer) setStartTime(t time.Time) {
o.startTime = t
}

func (o *opTracer) setFirstRespTime(t time.Time) {
o.firstRespTime = t
}

func (o *opTracer) setStatus(status string) {
o.status = status
}
Expand Down Expand Up @@ -355,7 +399,9 @@ func (tf *builtinMetricsTracerFactory) createBuiltinMetricsTracer(ctx context.Co
instrumentOperationLatencies: tf.operationLatencies,
instrumentServerLatencies: tf.serverLatencies,
instrumentAttemptLatencies: tf.attemptLatencies,
instrumentFirstRespLatencies: tf.firstRespLatencies,
instrumentRetryCount: tf.retryCount,
instrumentConnErrCount: tf.connErrCount,

tableName: tableName,
isStreaming: isStreaming,
Expand Down
10 changes: 7 additions & 3 deletions bigtable/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,13 @@ func TestNewBuiltinMetricsTracerFactory(t *testing.T) {
attribute.String(metricLabelKeyClientUID, clientUID),
attribute.String(metricLabelKeyClientName, clientName),
}
wantMetricNamesStdout := []string{metricNameAttemptLatencies, metricNameAttemptLatencies, metricNameOperationLatencies, metricNameRetryCount, metricNameServerLatencies}

wantMetricNames := []string{metricNameAttemptLatencies, metricNameAttemptLatencies, metricNameConnErrCount, metricNameConnErrCount, metricNameFirstRespLatencies, metricNameOperationLatencies, metricNameRetryCount, metricNameServerLatencies}
wantMetricTypesGCM := []string{}
for _, wantMetricName := range wantMetricNamesStdout {
for _, wantMetricName := range wantMetricNames {
wantMetricTypesGCM = append(wantMetricTypesGCM, builtInMetricsMeterName+wantMetricName)
}
sort.Strings(wantMetricTypesGCM)

// Reduce sampling period to reduce test run time
origSamplePeriod := defaultSamplePeriod
Expand Down Expand Up @@ -196,6 +198,8 @@ func TestNewBuiltinMetricsTracerFactory(t *testing.T) {
gotNonNilInstruments := gotClient.metricsTracerFactory.operationLatencies != nil &&
gotClient.metricsTracerFactory.serverLatencies != nil &&
gotClient.metricsTracerFactory.attemptLatencies != nil &&
gotClient.metricsTracerFactory.firstRespLatencies != nil &&
gotClient.metricsTracerFactory.connErrCount != nil &&
gotClient.metricsTracerFactory.retryCount != nil
if test.wantBuiltinEnabled != gotNonNilInstruments {
t.Errorf("NonNilInstruments: got: %v, want: %v", gotNonNilInstruments, test.wantBuiltinEnabled)
Expand Down Expand Up @@ -232,7 +236,7 @@ func TestNewBuiltinMetricsTracerFactory(t *testing.T) {
}
sort.Strings(gotMetricTypes)
if !testutil.Equal(gotMetricTypes, wantMetricTypesGCM) {
t.Errorf("Metric types missing in req. got: %v, want: %v", gotMetricTypes, wantMetricTypesGCM)
t.Errorf("Metric types missing in req. \ngot: %v, \nwant: %v\ndiff: %v", gotMetricTypes, wantMetricTypesGCM, testutil.Diff(gotMetricTypes, wantMetricTypesGCM))
}
}

Expand Down
Loading