Skip to content

Commit

Permalink
Merge pull request #1429 from authzed/crdb-follower-stats
Browse files Browse the repository at this point in the history
crdb: use follower reads for statistics
  • Loading branch information
ecordell authored Jul 7, 2023
2 parents 9903c5e + fb1d6b1 commit ed2a1a5
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 13 deletions.
35 changes: 22 additions & 13 deletions internal/datastore/crdb/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,42 +26,51 @@ const (

var (
queryReadUniqueID = psql.Select(colUniqueID).From(tableMetadata)
queryRelationshipEstimate = fmt.Sprintf("SELECT COALESCE(SUM(%s), 0) FROM %s", colCount, tableCounters)
queryRelationshipEstimate = fmt.Sprintf("SELECT COALESCE(SUM(%s), 0) FROM %s AS OF SYSTEM TIME follower_read_timestamp()", colCount, tableCounters)

upsertCounterQuery = psql.Insert(tableCounters).Columns(
colID,
colCount,
).Suffix(fmt.Sprintf("ON CONFLICT (%[1]s) DO UPDATE SET %[2]s = %[3]s.%[2]s + EXCLUDED.%[2]s RETURNING cluster_logical_timestamp()", colID, colCount, tableCounters))

rng = rand.NewSource(time.Now().UnixNano())

uniqueID string
)

func (cds *crdbDatastore) Statistics(ctx context.Context) (datastore.Stats, error) {
sql, args, err := queryReadUniqueID.ToSql()
if err != nil {
return datastore.Stats{}, fmt.Errorf("unable to prepare unique ID sql: %w", err)
if len(uniqueID) == 0 {
sql, args, err := queryReadUniqueID.ToSql()
if err != nil {
return datastore.Stats{}, fmt.Errorf("unable to prepare unique ID sql: %w", err)
}
if err := cds.readPool.QueryRowFunc(ctx, func(ctx context.Context, row pgx.Row) error {
return row.Scan(&uniqueID)
}, sql, args...); err != nil {
return datastore.Stats{}, fmt.Errorf("unable to query unique ID: %w", err)
}
}

var uniqueID string
var nsDefs []datastore.RevisionedNamespace
var relCount uint64

if err := cds.readPool.BeginTxFunc(ctx, pgx.TxOptions{AccessMode: pgx.ReadOnly}, func(tx pgx.Tx) error {
if err := tx.QueryRow(ctx, sql, args...).Scan(&uniqueID); err != nil {
return fmt.Errorf("unable to query unique ID: %w", err)
}
if err := cds.readPool.QueryRowFunc(ctx, func(ctx context.Context, row pgx.Row) error {
return row.Scan(&relCount)
}, queryRelationshipEstimate); err != nil {
return datastore.Stats{}, fmt.Errorf("unable to read relationship count: %w", err)
}

if err := tx.QueryRow(ctx, queryRelationshipEstimate).Scan(&relCount); err != nil {
return fmt.Errorf("unable to read relationship count: %w", err)
if err := cds.readPool.BeginTxFunc(ctx, pgx.TxOptions{AccessMode: pgx.ReadOnly}, func(tx pgx.Tx) error {
_, err := tx.Exec(ctx, "SET TRANSACTION AS OF SYSTEM TIME follower_read_timestamp()")
if err != nil {
return fmt.Errorf("unable to read namespaces: %w", err)
}

nsDefs, err = loadAllNamespaces(ctx, pgxcommon.QuerierFuncsFor(tx), func(sb squirrel.SelectBuilder, fromStr string) squirrel.SelectBuilder {
return sb.From(fromStr)
})
if err != nil {
return fmt.Errorf("unable to read namespaces: %w", err)
}

return nil
}); err != nil {
return datastore.Stats{}, err
Expand Down
4 changes: 4 additions & 0 deletions pkg/datastore/test/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ func StatsTest(t *testing.T, tester DatastoreTester) {

ds, _ = testfixtures.StandardDatastoreWithData(ds, require)

// stats use follower reads, need to wait a bit so that the base tables
// have a chance to be follower-read
time.Sleep(5 * time.Second)

for retryCount := statsRetryCount; retryCount >= 0; retryCount-- {
stats, err := ds.Statistics(ctx)
require.NoError(err)
Expand Down

0 comments on commit ed2a1a5

Please sign in to comment.