Skip to content

Commit

Permalink
Merge pull request #1980 from josephschorr/relationship-integrity
Browse files Browse the repository at this point in the history
Relationship integrity
  • Loading branch information
josephschorr authored Sep 3, 2024
2 parents 7c1078d + 707f7bf commit 4f9d807
Show file tree
Hide file tree
Showing 40 changed files with 3,566 additions and 980 deletions.
4 changes: 4 additions & 0 deletions internal/datastore/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ func (p *ctxProxy) Features(ctx context.Context) (*datastore.Features, error) {
return p.delegate.Features(SeparateContextWithTracing(ctx))
}

func (p *ctxProxy) OfflineFeatures() (*datastore.Features, error) {
return p.delegate.OfflineFeatures()
}

func (p *ctxProxy) Statistics(ctx context.Context) (datastore.Stats, error) {
return p.delegate.Statistics(SeparateContextWithTracing(ctx))
}
Expand Down
79 changes: 59 additions & 20 deletions internal/datastore/crdb/crdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,23 +48,31 @@ const (
Engine = "cockroachdb"
tableNamespace = "namespace_config"
tableTuple = "relation_tuple"
tableTupleWithIntegrity = "relation_tuple_with_integrity"
tableTransactions = "transactions"
tableCaveat = "caveat"
tableRelationshipCounter = "relationship_counter"

colNamespace = "namespace"
colConfig = "serialized_config"
colTimestamp = "timestamp"
colTransactionKey = "key"
colObjectID = "object_id"
colRelation = "relation"
colUsersetNamespace = "userset_namespace"
colUsersetObjectID = "userset_object_id"
colUsersetRelation = "userset_relation"
colCaveatName = "name"
colCaveatDefinition = "definition"
colCaveatContextName = "caveat_name"
colCaveatContext = "caveat_context"
colNamespace = "namespace"
colConfig = "serialized_config"
colTimestamp = "timestamp"
colTransactionKey = "key"

colObjectID = "object_id"
colRelation = "relation"

colUsersetNamespace = "userset_namespace"
colUsersetObjectID = "userset_object_id"
colUsersetRelation = "userset_relation"

colCaveatName = "name"
colCaveatDefinition = "definition"
colCaveatContextName = "caveat_name"
colCaveatContext = "caveat_context"

colIntegrityHash = "integrity_hash"
colIntegrityKeyID = "integrity_key_id"

colCounterName = "name"
colCounterSerializedFilter = "serialized_filter"
colCounterCurrentCount = "current_count"
Expand Down Expand Up @@ -190,6 +198,7 @@ func newCRDBDatastore(ctx context.Context, url string, options ...Option) (datas
transactionNowQuery: transactionNowQuery,
analyzeBeforeStatistics: config.analyzeBeforeStatistics,
filterMaximumIDCount: config.filterMaximumIDCount,
supportsIntegrity: config.withIntegrity,
}
ds.RemoteClockRevisions.SetNowFunc(ds.headRevisionInternal)

Expand Down Expand Up @@ -282,18 +291,19 @@ type crdbDatastore struct {
ctx context.Context
cancel context.CancelFunc
filterMaximumIDCount uint16
supportsIntegrity bool
}

func (cds *crdbDatastore) SnapshotReader(rev datastore.Revision) datastore.Reader {
executor := common.QueryExecutor{
Executor: pgxcommon.NewPGXExecutor(cds.readPool),
Executor: pgxcommon.NewPGXExecutorWithIntegrityOption(cds.readPool, cds.supportsIntegrity),
}

fromBuilder := func(query sq.SelectBuilder, fromStr string) sq.SelectBuilder {
return query.From(fromStr + " AS OF SYSTEM TIME " + rev.String())
}

return &crdbReader{cds.readPool, executor, noOverlapKeyer, nil, fromBuilder, cds.filterMaximumIDCount}
return &crdbReader{cds.readPool, executor, noOverlapKeyer, nil, fromBuilder, cds.filterMaximumIDCount, cds.tableTupleName(), cds.supportsIntegrity}
}

func (cds *crdbDatastore) ReadWriteTx(
Expand All @@ -311,7 +321,7 @@ func (cds *crdbDatastore) ReadWriteTx(
err := cds.writePool.BeginFunc(ctx, func(tx pgx.Tx) error {
querier := pgxcommon.QuerierFuncsFor(tx)
executor := common.QueryExecutor{
Executor: pgxcommon.NewPGXExecutor(querier),
Executor: pgxcommon.NewPGXExecutorWithIntegrityOption(querier, cds.supportsIntegrity),
}

rwt := &crdbReadWriteTXN{
Expand All @@ -324,6 +334,8 @@ func (cds *crdbDatastore) ReadWriteTx(
return query.From(fromStr)
},
cds.filterMaximumIDCount,
cds.tableTupleName(),
cds.supportsIntegrity,
},
tx,
0,
Expand Down Expand Up @@ -442,15 +454,42 @@ func (cds *crdbDatastore) headRevisionInternal(ctx context.Context) (datastore.R
return hlcNow, fnErr
}

func (cds *crdbDatastore) OfflineFeatures() (*datastore.Features, error) {
if cds.supportsIntegrity {
return &datastore.Features{
IntegrityData: datastore.Feature{
Status: datastore.FeatureSupported,
},
}, nil
}

return &datastore.Features{
IntegrityData: datastore.Feature{
Status: datastore.FeatureUnsupported,
},
}, nil
}

func (cds *crdbDatastore) Features(ctx context.Context) (*datastore.Features, error) {
features, _, err := cds.featureGroup.Do(ctx, "", func(ictx context.Context) (*datastore.Features, error) {
return cds.features(ictx)
})
return features, err
}

func (cds *crdbDatastore) tableTupleName() string {
if cds.supportsIntegrity {
return tableTupleWithIntegrity
}

return tableTuple
}

func (cds *crdbDatastore) features(ctx context.Context) (*datastore.Features, error) {
var features datastore.Features
features := datastore.Features{}
if cds.supportsIntegrity {
features.IntegrityData.Status = datastore.FeatureSupported
}

head, err := cds.HeadRevision(ctx)
if err != nil {
Expand All @@ -465,14 +504,14 @@ func (cds *crdbDatastore) features(ctx context.Context) (*datastore.Features, er

_ = cds.writePool.ExecFunc(streamCtx, func(ctx context.Context, tag pgconn.CommandTag, err error) error {
if err != nil && errors.Is(err, context.Canceled) {
features.Watch.Enabled = true
features.Watch.Status = datastore.FeatureSupported
features.Watch.Reason = ""
} else if err != nil {
features.Watch.Enabled = false
features.Watch.Status = datastore.FeatureUnsupported
features.Watch.Reason = fmt.Sprintf("Range feeds must be enabled in CockroachDB and the user must have permission to create them in order to enable the Watch API: %s", err.Error())
}
return nil
}, fmt.Sprintf(cds.beginChangefeedQuery, tableTuple, head, "1s"))
}, fmt.Sprintf(cds.beginChangefeedQuery, cds.tableTupleName(), head, "1s"))

<-streamCtx.Done()

Expand Down
Loading

0 comments on commit 4f9d807

Please sign in to comment.