diff --git a/internal/datastore/crdb/watch.go b/internal/datastore/crdb/watch.go index 0cd8749daa..58dbb90e63 100644 --- a/internal/datastore/crdb/watch.go +++ b/internal/datastore/crdb/watch.go @@ -24,6 +24,7 @@ import ( const ( queryChangefeed = "EXPERIMENTAL CHANGEFEED FOR %s WITH updated, cursor = '%s', resolved = '%s', min_checkpoint_frequency = '0';" queryChangefeedPreV22 = "EXPERIMENTAL CHANGEFEED FOR %s WITH updated, cursor = '%s', resolved = '%s';" + watchConnectTimeout = 1 * time.Second ) var retryHistogram = prometheus.NewHistogram(prometheus.HistogramOpts{ @@ -93,7 +94,7 @@ func (cds *crdbDatastore) watch( // changefeed data, instead of using a connection pool as most client // drivers do by default." // see: https://www.cockroachlabs.com/docs/v22.2/changefeed-for#considerations - conn, err := pgxcommon.ConnectWithInstrumentation(ctx, cds.dburl) + conn, err := pgxcommon.ConnectWithInstrumentationAndTimeout(ctx, cds.dburl, watchConnectTimeout) if err != nil { errs <- err return diff --git a/internal/datastore/postgres/common/pgx.go b/internal/datastore/postgres/common/pgx.go index fbc4b05b0a..47f3a4c474 100644 --- a/internal/datastore/postgres/common/pgx.go +++ b/internal/datastore/postgres/common/pgx.go @@ -103,6 +103,17 @@ func ConnectWithInstrumentation(ctx context.Context, url string) (*pgx.Conn, err return pgx.ConnectConfig(ctx, connConfig) } +// ConnectWithInstrumentationAndTimeout returns a pgx.Conn that has been instrumented for observability +func ConnectWithInstrumentationAndTimeout(ctx context.Context, url string, connectTimeout time.Duration) (*pgx.Conn, error) { + connConfig, err := ParseConfigWithInstrumentation(url) + if err != nil { + return nil, err + } + + connConfig.ConnectTimeout = connectTimeout + return pgx.ConnectConfig(ctx, connConfig) +} + // ConfigurePGXLogger sets zerolog global logger into the connection pool configuration, and maps // info level events to debug, as they are rather verbose for SpiceDB's info level func ConfigurePGXLogger(connConfig *pgx.ConnConfig) {