Skip to content

Commit

Permalink
Implement support for relationship integrity
Browse files Browse the repository at this point in the history
Currently only supported in memdb and CRDB drivers
  • Loading branch information
josephschorr committed Aug 13, 2024
1 parent 8bcfb32 commit f01c4f1
Show file tree
Hide file tree
Showing 30 changed files with 3,433 additions and 963 deletions.
58 changes: 41 additions & 17 deletions internal/datastore/crdb/crdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,23 +48,31 @@ const (
Engine = "cockroachdb"
tableNamespace = "namespace_config"
tableTuple = "relation_tuple"
tableTupleWithIntegrity = "relation_tuple_with_integrity"
tableTransactions = "transactions"
tableCaveat = "caveat"
tableRelationshipCounter = "relationship_counter"

colNamespace = "namespace"
colConfig = "serialized_config"
colTimestamp = "timestamp"
colTransactionKey = "key"
colObjectID = "object_id"
colRelation = "relation"
colUsersetNamespace = "userset_namespace"
colUsersetObjectID = "userset_object_id"
colUsersetRelation = "userset_relation"
colCaveatName = "name"
colCaveatDefinition = "definition"
colCaveatContextName = "caveat_name"
colCaveatContext = "caveat_context"
colNamespace = "namespace"
colConfig = "serialized_config"
colTimestamp = "timestamp"
colTransactionKey = "key"

colObjectID = "object_id"
colRelation = "relation"

colUsersetNamespace = "userset_namespace"
colUsersetObjectID = "userset_object_id"
colUsersetRelation = "userset_relation"

colCaveatName = "name"
colCaveatDefinition = "definition"
colCaveatContextName = "caveat_name"
colCaveatContext = "caveat_context"

colIntegrityHash = "integrity_hash"
colIntegrityKeyID = "integrity_key_id"

colCounterName = "name"
colCounterSerializedFilter = "serialized_filter"
colCounterCurrentCount = "current_count"
Expand Down Expand Up @@ -189,6 +197,7 @@ func newCRDBDatastore(ctx context.Context, url string, options ...Option) (datas
transactionNowQuery: transactionNowQuery,
analyzeBeforeStatistics: config.analyzeBeforeStatistics,
filterMaximumIDCount: config.filterMaximumIDCount,
supportsIntegrity: config.withIntegrity,
}
ds.RemoteClockRevisions.SetNowFunc(ds.headRevisionInternal)

Expand Down Expand Up @@ -280,18 +289,19 @@ type crdbDatastore struct {
ctx context.Context
cancel context.CancelFunc
filterMaximumIDCount uint16
supportsIntegrity bool
}

func (cds *crdbDatastore) SnapshotReader(rev datastore.Revision) datastore.Reader {
executor := common.QueryExecutor{
Executor: pgxcommon.NewPGXExecutor(cds.readPool),
Executor: pgxcommon.NewPGXExecutorWithIntegrityOption(cds.readPool, cds.supportsIntegrity),
}

fromBuilder := func(query sq.SelectBuilder, fromStr string) sq.SelectBuilder {
return query.From(fromStr + " AS OF SYSTEM TIME " + rev.String())
}

return &crdbReader{cds.readPool, executor, noOverlapKeyer, nil, fromBuilder, cds.filterMaximumIDCount}
return &crdbReader{cds.readPool, executor, noOverlapKeyer, nil, fromBuilder, cds.filterMaximumIDCount, cds.tableTupleName(), cds.supportsIntegrity}
}

func (cds *crdbDatastore) ReadWriteTx(
Expand All @@ -309,7 +319,7 @@ func (cds *crdbDatastore) ReadWriteTx(
err := cds.writePool.BeginFunc(ctx, func(tx pgx.Tx) error {
querier := pgxcommon.QuerierFuncsFor(tx)
executor := common.QueryExecutor{
Executor: pgxcommon.NewPGXExecutor(querier),
Executor: pgxcommon.NewPGXExecutorWithIntegrityOption(querier, cds.supportsIntegrity),
}

rwt := &crdbReadWriteTXN{
Expand All @@ -322,6 +332,8 @@ func (cds *crdbDatastore) ReadWriteTx(
return query.From(fromStr)
},
cds.filterMaximumIDCount,
cds.tableTupleName(),
cds.supportsIntegrity,
},
tx,
0,
Expand Down Expand Up @@ -447,6 +459,18 @@ func (cds *crdbDatastore) Features(ctx context.Context) (*datastore.Features, er
return features, err
}

func (cds *crdbDatastore) SupportsIntegrity() bool {
return cds.supportsIntegrity
}

func (cds *crdbDatastore) tableTupleName() string {
if cds.supportsIntegrity {
return tableTupleWithIntegrity
}

return tableTuple
}

func (cds *crdbDatastore) features(ctx context.Context) (*datastore.Features, error) {
var features datastore.Features

Expand All @@ -470,7 +494,7 @@ func (cds *crdbDatastore) features(ctx context.Context) (*datastore.Features, er
features.Watch.Reason = fmt.Sprintf("Range feeds must be enabled in CockroachDB and the user must have permission to create them in order to enable the Watch API: %s", err.Error())
}
return nil
}, fmt.Sprintf(cds.beginChangefeedQuery, tableTuple, head, "1s"))
}, fmt.Sprintf(cds.beginChangefeedQuery, cds.tableTupleName(), head, "1s"))

<-streamCtx.Done()

Expand Down
Loading

0 comments on commit f01c4f1

Please sign in to comment.