Skip to content

Commit

Permalink
Add support for relationship integrity to CRDB datastore driver
Browse files Browse the repository at this point in the history
  • Loading branch information
josephschorr committed Jul 16, 2024
1 parent 980c49b commit bf68d3e
Show file tree
Hide file tree
Showing 10 changed files with 219 additions and 58 deletions.
33 changes: 20 additions & 13 deletions internal/datastore/crdb/crdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,19 +52,26 @@ const (
tableCaveat = "caveat"
tableRelationshipCounter = "relationship_counter"

colNamespace = "namespace"
colConfig = "serialized_config"
colTimestamp = "timestamp"
colTransactionKey = "key"
colObjectID = "object_id"
colRelation = "relation"
colUsersetNamespace = "userset_namespace"
colUsersetObjectID = "userset_object_id"
colUsersetRelation = "userset_relation"
colCaveatName = "name"
colCaveatDefinition = "definition"
colCaveatContextName = "caveat_name"
colCaveatContext = "caveat_context"
colNamespace = "namespace"
colConfig = "serialized_config"
colTimestamp = "timestamp"
colTransactionKey = "key"

colObjectID = "object_id"
colRelation = "relation"

colUsersetNamespace = "userset_namespace"
colUsersetObjectID = "userset_object_id"
colUsersetRelation = "userset_relation"

colCaveatName = "name"
colCaveatDefinition = "definition"
colCaveatContextName = "caveat_name"
colCaveatContext = "caveat_context"

colIntegrityHash = "integrity_hash"
colIntegrityKeyID = "integrity_key_id"

colCounterName = "name"
colCounterSerializedFilter = "serialized_filter"
colCounterCurrentCount = "current_count"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package migrations

import (
"context"

"github.com/jackc/pgx/v5"
)

const (
addIntegrityColumnsQuery = `
ALTER TABLE relation_tuple
ADD COLUMN IF NOT EXISTS integrity_hash BYTEA DEFAULT NULL,
ADD COLUMN IF NOT EXISTS integrity_key_id VARCHAR(255) DEFAULT NULL;
`
)

func init() {
err := CRDBMigrations.Register("add-integrity-columns", "add-relationship-counters-table", addIntegrityColumns, noAtomicMigration)
if err != nil {
panic("failed to register migration: " + err.Error())
}
}

func addIntegrityColumns(ctx context.Context, conn *pgx.Conn) error {
if _, err := conn.Exec(ctx, addIntegrityColumnsQuery); err != nil {
return err
}
return nil
}
3 changes: 3 additions & 0 deletions internal/datastore/crdb/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ var (
colUsersetRelation,
colCaveatContextName,
colCaveatContext,
colIntegrityKeyID,
colIntegrityHash,
colTimestamp,
)

countTuples = psql.Select("count(*)")
Expand Down
20 changes: 19 additions & 1 deletion internal/datastore/crdb/readwrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type crdbReadWriteTXN struct {

var (
upsertTupleSuffix = fmt.Sprintf(
"ON CONFLICT (%s,%s,%s,%s,%s,%s) DO UPDATE SET %s = now(), %s = excluded.%s, %s = excluded.%s WHERE (relation_tuple.%s <> excluded.%s OR relation_tuple.%s <> excluded.%s)",
"ON CONFLICT (%s,%s,%s,%s,%s,%s) DO UPDATE SET %s = now(), %s = excluded.%s, %s = excluded.%s, %s = excluded.%s, %s = excluded.%s WHERE (relation_tuple.%s <> excluded.%s OR relation_tuple.%s <> excluded.%s)",
colNamespace,
colObjectID,
colRelation,
Expand All @@ -63,6 +63,10 @@ var (
colCaveatContextName,
colCaveatContext,
colCaveatContext,
colIntegrityKeyID,
colIntegrityKeyID,
colIntegrityHash,
colIntegrityHash,
colCaveatContextName,
colCaveatContextName,
colCaveatContext,
Expand All @@ -78,6 +82,8 @@ var (
colUsersetRelation,
colCaveatContextName,
colCaveatContext,
colIntegrityKeyID,
colIntegrityHash,
)

queryTouchTuple = queryWriteTuple.Suffix(upsertTupleSuffix)
Expand Down Expand Up @@ -222,6 +228,14 @@ func (rwt *crdbReadWriteTXN) WriteRelationships(ctx context.Context, mutations [
caveatContext = rel.Caveat.Context.AsMap()
}

var integrityKeyID *string
var integrityHash []byte

if rel.Integrity != nil {
integrityKeyID = &rel.Integrity.KeyId
integrityHash = rel.Integrity.Hash
}

rwt.addOverlapKey(rel.ResourceAndRelation.Namespace)
rwt.addOverlapKey(rel.Subject.Namespace)

Expand All @@ -237,6 +251,8 @@ func (rwt *crdbReadWriteTXN) WriteRelationships(ctx context.Context, mutations [
rel.Subject.Relation,
caveatName,
caveatContext,
integrityKeyID,
integrityHash,
)
bulkTouchCount++
case core.RelationTupleUpdate_CREATE:
Expand All @@ -250,6 +266,8 @@ func (rwt *crdbReadWriteTXN) WriteRelationships(ctx context.Context, mutations [
rel.Subject.Relation,
caveatName,
caveatContext,
integrityKeyID,
integrityHash,
)
bulkWriteCount++
case core.RelationTupleUpdate_DELETE:
Expand Down
18 changes: 18 additions & 0 deletions internal/datastore/postgres/common/pgx.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/rs/zerolog"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/authzed/spicedb/internal/datastore/common"
log "github.com/authzed/spicedb/internal/logging"
Expand Down Expand Up @@ -46,6 +47,11 @@ func queryTuples(ctx context.Context, sqlStatement string, args []any, span trac
}
var caveatName sql.NullString
var caveatCtx map[string]any

var integrityKeyID sql.NullString
var integrityHash []byte
var integrityTimestamp *time.Time

err := rows.Scan(
&nextTuple.ResourceAndRelation.Namespace,
&nextTuple.ResourceAndRelation.ObjectId,
Expand All @@ -55,6 +61,9 @@ func queryTuples(ctx context.Context, sqlStatement string, args []any, span trac
&nextTuple.Subject.Relation,
&caveatName,
&caveatCtx,
&integrityKeyID,
&integrityHash,
&integrityTimestamp,
)
if err != nil {
return fmt.Errorf(errUnableToQueryTuples, fmt.Errorf("scan err: %w", err))
Expand All @@ -64,6 +73,15 @@ func queryTuples(ctx context.Context, sqlStatement string, args []any, span trac
if err != nil {
return fmt.Errorf(errUnableToQueryTuples, fmt.Errorf("unable to fetch caveat context: %w", err))
}

if integrityKeyID.Valid && integrityTimestamp != nil {
nextTuple.Integrity = &corev1.RelationshipIntegrity{
KeyId: integrityKeyID.String,
Hash: integrityHash,
HashedAt: timestamppb.New(*integrityTimestamp),
}
}

tuples = append(tuples, nextTuple)
}
if err := rows.Err(); err != nil {
Expand Down
24 changes: 2 additions & 22 deletions internal/datastore/proxy/relationshipintegrity.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"crypto/sha256"
"fmt"
"hash"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -102,27 +101,8 @@ func computeRelationshipHash(tpl *corev1.RelationTuple, key hmacConfig) ([]byte,
defer key.hmacPool.Put(hmac)
defer hmac.Reset()

var sb strings.Builder
sb.WriteString(tpl.ResourceAndRelation.Namespace)
sb.WriteString(":")
sb.WriteString(tpl.ResourceAndRelation.ObjectId)
sb.WriteString("#")
sb.WriteString(tpl.ResourceAndRelation.Relation)
sb.WriteString("@")
sb.WriteString(tpl.Subject.Namespace)
sb.WriteString(":")
sb.WriteString(tpl.Subject.ObjectId)
sb.WriteString("#")
sb.WriteString(tpl.Subject.Relation)

if tpl.Caveat != nil && tpl.Caveat.CaveatName != "" {
sb.WriteString(" with ")
sb.WriteString(tpl.Caveat.CaveatName)
sb.WriteString(":")
sb.WriteString(tpl.Caveat.Context.String())
}

if _, err := hmac.Write([]byte(sb.String())); err != nil {
bytes := tuple.MustCanonicalBytes(tpl)
if _, err := hmac.Write(bytes); err != nil {
return nil, err
}
return hmac.Sum(nil)[:16], nil
Expand Down
2 changes: 1 addition & 1 deletion internal/datastore/proxy/relationshipintegrity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func TestWriteWithPredefinedIntegrity(t *testing.T) {
require.NoError(t, err)

require.Panics(t, func() {
pds.ReadWriteTx(context.Background(), func(ctx context.Context, tx datastore.ReadWriteTransaction) error {
_, _ = pds.ReadWriteTx(context.Background(), func(ctx context.Context, tx datastore.ReadWriteTransaction) error {
tpl := tuple.MustParse("resource:foo#viewer@user:tom")
tpl.Integrity = &core.RelationshipIntegrity{}
return tx.WriteRelationships(context.Background(), []*core.RelationTupleUpdate{
Expand Down
3 changes: 2 additions & 1 deletion pkg/datastore/test/tuples.go
Original file line number Diff line number Diff line change
Expand Up @@ -1709,14 +1709,15 @@ func RelationshipIntegrityInfoTest(t *testing.T, tester DatastoreTester) {
OptionalResourceRelation: "viewer",
})
require.NoError(err)
t.Cleanup(iter.Close)

tpl := iter.Next()
require.NotNil(tpl)

require.NotNil(tpl.Integrity)
require.Equal("key1", tpl.Integrity.KeyId)
require.Equal([]byte("hash1"), tpl.Integrity.Hash)
require.Equal(timestamp, tpl.Integrity.HashedAt.AsTime())
require.LessOrEqual(timestamp, tpl.Integrity.HashedAt.AsTime())

iter.Close()
}
Expand Down
37 changes: 37 additions & 0 deletions pkg/tuple/tuple.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"reflect"
"regexp"
"slices"
"strings"

v1 "github.com/authzed/authzed-go/proto/authzed/api/v1"
"github.com/jzelinskie/stringz"
Expand Down Expand Up @@ -572,3 +573,39 @@ func WithCaveat(tpl *core.RelationTuple, caveatName string, contexts ...map[stri
}
return tpl, nil
}

// MustCanonicalBytes converts a tuple to a canonical set of bytes. If the tuple is nil or empty, returns nil.
// Can be used for hashing purposes.
func MustCanonicalBytes(tpl *core.RelationTuple) []byte {
if tpl == nil {
return nil
}

var sb strings.Builder
sb.WriteString(tpl.ResourceAndRelation.Namespace)
sb.WriteString(":")
sb.WriteString(tpl.ResourceAndRelation.ObjectId)
sb.WriteString("#")
sb.WriteString(tpl.ResourceAndRelation.Relation)
sb.WriteString("@")
sb.WriteString(tpl.Subject.Namespace)
sb.WriteString(":")
sb.WriteString(tpl.Subject.ObjectId)
sb.WriteString("#")
sb.WriteString(tpl.Subject.Relation)

if tpl.Caveat != nil && tpl.Caveat.CaveatName != "" {
sb.WriteString(" with ")
sb.WriteString(tpl.Caveat.CaveatName)
sb.WriteString(":")

caveatStr, err := StringCaveatContext(tpl.Caveat.Context)
if err != nil {
panic(err)
}

sb.WriteString(caveatStr)
}

return []byte(sb.String())
}
Loading

0 comments on commit bf68d3e

Please sign in to comment.