Merge #135292
135292: changefeedccl: make changefeed_emitted_bytes work for core changefeeds r=asg0451 a=andyyang890

This patch makes `changefeed_emitted_bytes` structured log events work
for core changefeeds. It also adds the changefeed description to the
aggregator and frontier specs, so that a processor no longer needs to
load the job just to get the description, and so that changefeeds that
don't use jobs (i.e. core changefeeds) can also pass along a description.

Informs: #135309

Release note: None

Co-authored-by: Andy Yang <[email protected]>
craig[bot] and andyyang890 committed Dec 2, 2024
2 parents 7a8707d + 30eca1b commit 9eb1027
Showing 6 changed files with 86 additions and 43 deletions.
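At a glance, the new flow is: the planner copies the changefeed description into each processor spec, and the aggregator prefers that field, falling back to a job lookup only for specs planned by older (24.3 or earlier) nodes. Below is a minimal sketch of that lookup order, not part of the patch itself; `descriptionForTelemetry` is a hypothetical helper name, and the real logic lives in `wrapMetricsRecorderWithTelemetry` in changefeed_processors.go.

package changefeedccl

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/jobs"
	"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
)

// descriptionForTelemetry illustrates the lookup order introduced by this
// patch: prefer the description carried on the spec, and only load the job
// as a mixed-version fallback.
func descriptionForTelemetry(
	ctx context.Context, spec *execinfrapb.ChangeAggregatorSpec, registry *jobs.Registry,
) (string, error) {
	// New-style specs carry the description directly. For core (sinkless)
	// changefeeds this is the only source: they have no job record at all
	// (their flows are planned with jobID 0).
	if spec.Description != "" {
		return spec.Description, nil
	}
	// Mixed-version fallback: a spec written by an older node has no
	// Description field, so load it from the job record.
	job, err := registry.LoadJob(ctx, spec.JobID)
	if err != nil {
		return "", err
	}
	return job.Payload().Description, nil
}

In the patch itself, a sinkless feed whose spec lacks a description skips the telemetry wrapping entirely rather than erroring, since it has no job to fall back to.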
21 changes: 13 additions & 8 deletions pkg/ccl/changefeedccl/changefeed_dist.go
@@ -77,6 +77,7 @@ func distChangefeedFlow(
 	execCtx sql.JobExecContext,
 	jobID jobspb.JobID,
 	details jobspb.ChangefeedDetails,
+	description string,
 	localState *cachedState,
 	resultsCh chan<- tree.Datums,
 ) error {
@@ -132,7 +133,7 @@
 		}
 	}
 	return startDistChangefeed(
-		ctx, execCtx, jobID, schemaTS, details, initialHighWater, localState, resultsCh)
+		ctx, execCtx, jobID, schemaTS, details, description, initialHighWater, localState, resultsCh)
 }
 
 func fetchTableDescriptors(
@@ -227,6 +228,7 @@ func startDistChangefeed(
 	jobID jobspb.JobID,
 	schemaTS hlc.Timestamp,
 	details jobspb.ChangefeedDetails,
+	description string,
 	initialHighWater hlc.Timestamp,
 	localState *cachedState,
 	resultsCh chan<- tree.Datums,
@@ -259,7 +261,7 @@
 	if progress := localState.progress.GetChangefeed(); progress != nil && progress.Checkpoint != nil {
 		checkpoint = progress.Checkpoint
 	}
-	p, planCtx, err := makePlan(execCtx, jobID, details, initialHighWater,
+	p, planCtx, err := makePlan(execCtx, jobID, details, description, initialHighWater,
 		trackedSpans, checkpoint, localState.drainingNodes)(ctx, dsp)
 	if err != nil {
 		return err
@@ -373,6 +375,7 @@ func makePlan(
 	execCtx sql.JobExecContext,
 	jobID jobspb.JobID,
 	details jobspb.ChangefeedDetails,
+	description string,
 	initialHighWater hlc.Timestamp,
 	trackedSpans []roachpb.Span,
 	checkpoint *jobspb.ChangefeedProgress_Checkpoint,
@@ -476,12 +479,13 @@
 		}
 
 		aggregatorSpecs[i] = &execinfrapb.ChangeAggregatorSpec{
-			Watches:    watches,
-			Checkpoint: aggregatorCheckpoint,
-			Feed:       details,
-			UserProto:  execCtx.User().EncodeProto(),
-			JobID:      jobID,
-			Select:     execinfrapb.Expression{Expr: details.Select},
+			Watches:     watches,
+			Checkpoint:  aggregatorCheckpoint,
+			Feed:        details,
+			UserProto:   execCtx.User().EncodeProto(),
+			JobID:       jobID,
+			Select:      execinfrapb.Expression{Expr: details.Select},
+			Description: description,
 		}
 	}
 
@@ -494,6 +498,7 @@
 		Feed:         details,
 		JobID:        jobID,
 		UserProto:    execCtx.User().EncodeProto(),
+		Description:  description,
 	}
 
 	if haveKnobs && maybeCfKnobs.OnDistflowSpec != nil {
45 changes: 28 additions & 17 deletions pkg/ccl/changefeedccl/changefeed_processors.go
@@ -262,17 +262,31 @@ func (ca *changeAggregator) MustBeStreaming() bool {
 	return true
 }
 
-// wrapMetricsController wraps the supplied metricsRecorder to emit metrics to telemetry.
-// This method modifies ca.cancel().
-func (ca *changeAggregator) wrapMetricsController(
+// wrapMetricsRecorderWithTelemetry wraps the supplied metricsRecorder
+// so it periodically emits metrics to telemetry.
+func (ca *changeAggregator) wrapMetricsRecorderWithTelemetry(
 	ctx context.Context, recorder metricsRecorder,
 ) (metricsRecorder, error) {
-	job, err := ca.FlowCtx.Cfg.JobRegistry.LoadJob(ctx, ca.spec.JobID)
-	if err != nil {
-		return ca.sliMetrics, err
+	details := ca.spec.Feed
+	jobID := ca.spec.JobID
+	description := ca.spec.Description
+	// This code exists so that the old behavior is preserved if the spec is created
+	// in a mixed-version cluster on a node without the new Description field.
+	// It can be deleted once we no longer need to interoperate with binaries that
+	// are version 24.3 or earlier.
+	if description == "" {
+		// Don't emit telemetry messages for core changefeeds without a description.
+		if ca.isSinkless() {
+			return recorder, nil
+		}
+		job, err := ca.FlowCtx.Cfg.JobRegistry.LoadJob(ctx, jobID)
+		if err != nil {
+			return nil, err
+		}
+		description = job.Payload().Description
 	}
 
-	recorderWithTelemetry, err := wrapMetricsRecorderWithTelemetry(ctx, job, ca.FlowCtx.Cfg.Settings, recorder, ca.knobs)
+	recorderWithTelemetry, err := wrapMetricsRecorderWithTelemetry(ctx, details, description, jobID, ca.FlowCtx.Cfg.Settings, recorder, ca.knobs)
 	if err != nil {
 		return ca.sliMetrics, err
 	}
@@ -337,18 +351,15 @@ func (ca *changeAggregator) Start(ctx context.Context) {
 	}
 	ca.sliMetricsID = ca.sliMetrics.claimId()
 
-	// TODO(jayant): add support for sinkless changefeeds using UUID
 	recorder := metricsRecorder(ca.sliMetrics)
-	if !ca.isSinkless() {
-		recorder, err = ca.wrapMetricsController(ctx, recorder)
-		if err != nil {
-			if log.V(2) {
-				log.Infof(ca.Ctx(), "change aggregator moving to draining due to error wrapping metrics controller: %v", err)
-			}
-			ca.MoveToDraining(err)
-			ca.cancel()
-			return
-		}
+	recorder, err = ca.wrapMetricsRecorderWithTelemetry(ctx, recorder)
+	if err != nil {
+		if log.V(2) {
+			log.Infof(ca.Ctx(), "change aggregator moving to draining due to error wrapping metrics controller: %v", err)
+		}
+		ca.MoveToDraining(err)
+		ca.cancel()
+		return
 	}
 
 	ca.sink, err = getEventSink(ctx, ca.FlowCtx.Cfg, ca.spec.Feed, timestampOracle,
11 changes: 7 additions & 4 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
@@ -226,7 +226,7 @@ func changefeedPlanHook(
 		telemetry.Count(`changefeed.create.core`)
 		logChangefeedCreateTelemetry(ctx, jr, changefeedStmt.Select != nil)
 
-		err := coreChangefeed(ctx, p, details, progress, resultsCh)
+		err := coreChangefeed(ctx, p, details, description, progress, resultsCh)
 		// TODO(yevgeniy): This seems wrong -- core changefeeds always terminate
 		// with an error. Perhaps rename this telemetry to indicate number of
 		// completed feeds.
@@ -336,6 +336,7 @@ func coreChangefeed(
 	ctx context.Context,
 	p sql.PlanHookState,
 	details jobspb.ChangefeedDetails,
+	description string,
 	progress jobspb.Progress,
 	resultsCh chan<- tree.Datums,
 ) error {
@@ -354,7 +355,7 @@
 			knobs.BeforeDistChangefeed()
 		}
 
-		err := distChangefeedFlow(ctx, p, 0 /* jobID */, details, localState, resultsCh)
+		err := distChangefeedFlow(ctx, p, 0 /* jobID */, details, description, localState, resultsCh)
 		if err == nil {
 			log.Infof(ctx, "core changefeed completed with no error")
 			return nil
@@ -1118,12 +1119,13 @@ func (b *changefeedResumer) Resume(ctx context.Context, execCtx interface{}) err
 	jobID := b.job.ID()
 	details := b.job.Details().(jobspb.ChangefeedDetails)
 	progress := b.job.Progress()
+	description := b.job.Payload().Description
 
 	if err := b.ensureClusterIDMatches(ctx, jobExec.ExtendedEvalContext().ClusterID); err != nil {
 		return err
 	}
 
-	err := b.resumeWithRetries(ctx, jobExec, jobID, details, progress, execCfg)
+	err := b.resumeWithRetries(ctx, jobExec, jobID, details, description, progress, execCfg)
 	if err != nil {
 		return b.handleChangefeedError(ctx, err, details, jobExec)
 	}
@@ -1243,6 +1245,7 @@ func (b *changefeedResumer) resumeWithRetries(
 	jobExec sql.JobExecContext,
 	jobID jobspb.JobID,
 	details jobspb.ChangefeedDetails,
+	description string,
 	initialProgress jobspb.Progress,
 	execCfg *sql.ExecutorConfig,
 ) error {
@@ -1292,7 +1295,7 @@
 				knobs.BeforeDistChangefeed()
 			}
 
			flowErr = distChangefeedFlow(ctx, jobExec, jobID, details, localState, startedCh)
-			flowErr = distChangefeedFlow(ctx, jobExec, jobID, details, localState, startedCh)
+			flowErr = distChangefeedFlow(ctx, jobExec, jobID, details, description, localState, startedCh)
 			if flowErr == nil {
 				return nil // Changefeed completed -- e.g. due to initial_scan=only mode.
 			}
21 changes: 16 additions & 5 deletions pkg/ccl/changefeedccl/changefeed_test.go
@@ -6076,10 +6076,16 @@ func TestChangefeedContinuousTelemetry(t *testing.T) {
 
 		sqlDB := sqlutils.MakeSQLRunner(s.DB)
 		sqlDB.Exec(t, `CREATE TABLE foo (id INT PRIMARY KEY)`)
+		// NB: In order for this test to work for sinkless feeds, we must
+		// have at least one row before creating the feed.
+		sqlDB.Exec(t, `INSERT INTO foo VALUES (-1)`)
 
 		foo := feed(t, f, `CREATE CHANGEFEED FOR foo`)
 		defer closeFeed(t, foo)
-		jobID := foo.(cdctest.EnterpriseTestFeed).JobID()
+		var jobID jobspb.JobID
+		if foo, ok := foo.(cdctest.EnterpriseTestFeed); ok {
+			jobID = foo.JobID()
+		}
 
 		for i := 0; i < 5; i++ {
 			beforeCreate := timeutil.Now()
@@ -6088,7 +6094,7 @@
 		}
 	}
 
-	cdcTest(t, testFn, feedTestOmitSinks("sinkless"))
+	cdcTest(t, testFn)
 }
 
 type testTelemetryLogger struct {
@@ -6112,6 +6118,9 @@ func TestChangefeedContinuousTelemetryOnTermination(t *testing.T) {
 		continuousTelemetryInterval.Override(context.Background(), &s.Server.ClusterSettings().SV, interval)
 		sqlDB := sqlutils.MakeSQLRunner(s.DB)
 		sqlDB.Exec(t, `CREATE TABLE foo (id INT PRIMARY KEY)`)
+		// NB: In order for this test to work for sinkless feeds, we must
+		// have at least one row before creating the feed.
+		sqlDB.Exec(t, `INSERT INTO foo VALUES (1)`)
 
 		var seen atomic.Bool
 		waitForIncEmittedCounters := func() error {
@@ -6138,8 +6147,10 @@
 		// Insert a row and wait for logs to be created.
 		beforeFirstLog := timeutil.Now()
 		foo := feed(t, f, `CREATE CHANGEFEED FOR foo`)
-		jobID := foo.(cdctest.EnterpriseTestFeed).JobID()
-		sqlDB.Exec(t, `INSERT INTO foo VALUES (1)`)
+		var jobID jobspb.JobID
+		if foo, ok := foo.(cdctest.EnterpriseTestFeed); ok {
+			jobID = foo.JobID()
+		}
 		testutils.SucceedsSoon(t, waitForIncEmittedCounters)
 		verifyLogsWithEmittedBytesAndMessages(t, jobID, beforeFirstLog.UnixNano(), interval.Nanoseconds(), false /* closing */)
 
@@ -6161,7 +6172,7 @@
 		verifyLogsWithEmittedBytesAndMessages(t, jobID, afterFirstLog.UnixNano(), interval.Nanoseconds(), true /* closing */)
 	}
 
-	cdcTest(t, testFn, feedTestOmitSinks("sinkless"))
+	cdcTest(t, testFn)
 }
 
 func TestChangefeedContinuousTelemetryDifferentJobs(t *testing.T) {
25 changes: 16 additions & 9 deletions pkg/ccl/changefeedccl/telemetry.go
@@ -10,7 +10,6 @@ import (
 	"sync/atomic"
 	"time"
 
-	"github.com/cockroachdb/cockroach/pkg/jobs"
 	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
 	"github.com/cockroachdb/cockroach/pkg/settings"
 	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
@@ -29,7 +28,6 @@ type sinkTelemetryData struct {
 type periodicTelemetryLogger struct {
 	ctx               context.Context
 	sinkTelemetryData sinkTelemetryData
-	job               *jobs.Job
 	changefeedDetails eventpb.CommonChangefeedEventDetails
 	settings          *cluster.Settings
 
@@ -52,12 +50,15 @@ type telemetryLogger interface {
 var _ telemetryLogger = (*periodicTelemetryLogger)(nil)
 
 func makePeriodicTelemetryLogger(
-	ctx context.Context, job *jobs.Job, s *cluster.Settings,
+	ctx context.Context,
+	details jobspb.ChangefeedDetails,
+	description string,
+	jobID jobspb.JobID,
+	s *cluster.Settings,
 ) (*periodicTelemetryLogger, error) {
 	return &periodicTelemetryLogger{
 		ctx:               ctx,
-		job:               job,
-		changefeedDetails: makeCommonChangefeedEventDetails(ctx, job.Details().(jobspb.ChangefeedDetails), job.Payload().Description, job.ID()),
+		changefeedDetails: makeCommonChangefeedEventDetails(ctx, details, description, jobID),
 		sinkTelemetryData: sinkTelemetryData{},
 		settings:          s,
 	}, nil
@@ -123,10 +124,16 @@ func (ptl *periodicTelemetryLogger) close() {
 }
 
 func wrapMetricsRecorderWithTelemetry(
-	ctx context.Context, job *jobs.Job, s *cluster.Settings, mb metricsRecorder, knobs TestingKnobs,
+	ctx context.Context,
+	details jobspb.ChangefeedDetails,
+	description string,
+	jobID jobspb.JobID,
+	s *cluster.Settings,
+	mb metricsRecorder,
+	knobs TestingKnobs,
 ) (*telemetryMetricsRecorder, error) {
 	var logger telemetryLogger
-	logger, err := makePeriodicTelemetryLogger(ctx, job, s)
+	logger, err := makePeriodicTelemetryLogger(ctx, details, description, jobID, s)
 	if err != nil {
 		return &telemetryMetricsRecorder{}, err
 	}
@@ -167,12 +174,12 @@ func (r *telemetryMetricsRecorder) recordEmittedBatch(
 }
 
 // continuousTelemetryInterval determines the interval at which each node emits
-// periodic telemetry events during the lifespan of each enterprise changefeed.
+// periodic telemetry events during the lifespan of each changefeed.
 var continuousTelemetryInterval = settings.RegisterDurationSetting(
 	settings.ApplicationLevel,
 	"changefeed.telemetry.continuous_logging.interval",
 	"determines the interval at which each node emits continuous telemetry events"+
-		" during the lifespan of every enterprise changefeed; setting a zero value disables logging",
+		" during the lifespan of every changefeed; setting a zero value disables logging",
 	24*time.Hour,
 	settings.NonNegativeDuration,
 )
6 changes: 6 additions & 0 deletions pkg/sql/execinfrapb/processors_changefeeds.proto
@@ -54,6 +54,9 @@ message ChangeAggregatorSpec {
 
   // select is the "select clause" for predicate changefeed.
   optional Expression select = 6 [(gogoproto.nullable) = false];
+
+  // Description is the description of the changefeed. Used for structured logging.
+  optional string description = 7 [(gogoproto.nullable) = false];
 }
 
 // ChangeFrontierSpec is the specification for a processor that receives
@@ -78,4 +81,7 @@ message ChangeFrontierSpec {
   // User who initiated the changefeed. This is used to check access privileges
  // when using FileTable ExternalStorage.
   optional string user_proto = 4 [(gogoproto.nullable) = false, (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/security/username.SQLUsernameProto"];
+
+  // Description is the description of the changefeed. Used for structured logging.
+  optional string description = 5 [(gogoproto.nullable) = false];
 }
