diff --git a/README.md b/README.md index 7ead65bf..663dcfc0 100644 --- a/README.md +++ b/README.md @@ -6,4 +6,17 @@ Kine is an etcdshim that translates etcd API to sqlite, Postgres, Mysql, and dql - Can be ran standalone so any k8s (not just k3s) can use Kine - Implements a subset of etcdAPI (not usable at all for general purpose etcd) - Translates etcdTX calls into the desired API (Create, Update, Delete) -- Backend drivers for dqlite, sqlite, Postgres, MySQL +- Backend drivers for dqlite, sqlite, Postgres, MySQL, Oracle (experimental) + +### Using Oracle Backend (experimental) + +The Oracle backend is backed by [godror/godror](https://github.com/godror/godror), that uses a C-based OCI wrapper, +so native OCI drivers are still required to start Kine. +However, as those drivers are proprietary and dynamically linked, you will have to manually download it yourself and +point your LD_LIBRARY_PATH to the right direction. + +Oracle has outlined the procedure to use the drivers correctly. You can refer to their documentation +[here](https://docs.oracle.com/en/database/oracle/oracle-database/19/lnoci/instant-client.html#GUID-7D65474A-8790-4E81-B535-409010791C2F). + +To download the drivers, please go [here](https://www.oracle.com/hk/database/technologies/instant-client/downloads.html) +and carefully read their licensing terms. (unless you want Oracle to send you the papers) diff --git a/go.mod b/go.mod index d9a5ae9c..083f844b 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/Rican7/retry v0.1.0 github.com/canonical/go-dqlite v1.5.1 github.com/go-sql-driver/mysql v1.4.1 + github.com/godror/godror v0.17.0 github.com/lib/pq v1.1.1 github.com/mattn/go-sqlite3 v1.10.0 github.com/pkg/errors v0.8.1 @@ -13,5 +14,6 @@ require ( github.com/sirupsen/logrus v1.4.2 github.com/urfave/cli v1.21.0 go.etcd.io/etcd v0.0.0-20191023171146-3cf2f69b5738 + golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 google.golang.org/grpc v1.23.1 ) diff --git a/go.sum b/go.sum index 70effee0..8b3693f3 100644 --- a/go.sum +++ b/go.sum @@ -76,6 +76,8 @@ github.com/globalsign/mgo v0.0.0-20180905125535-1ca0a4f7cbcb/go.mod h1:xkRDCp4j0 github.com/globalsign/mgo v0.0.0-20181015135952-eeefdecb41b8/go.mod h1:xkRDCp4j0OGD1HRkm4kmhM+pmpv3AKq5SU7GMg4oO/Q= github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= +github.com/go-logfmt/logfmt v0.5.0 h1:TrB8swr/68K7m9CcGut2g3UOihhbcbiMAYiuTXdEih4= +github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= github.com/go-logr/logr v0.1.0/go.mod h1:ixOQHD9gLJUVQQ2ZOR7zLEifBX6tGkNJF4QyIY7sIas= github.com/go-openapi/analysis v0.0.0-20180825180245-b006789cd277/go.mod h1:k70tL6pCuVxPJOHXQ+wIac1FUrvNkHolPie/cLEU6hI= github.com/go-openapi/analysis v0.17.0/go.mod h1:IowGgpVeD0vNm45So8nr+IcQ3pxVtpRoBWb8PVZO0ik= @@ -123,6 +125,8 @@ github.com/go-openapi/validate v0.19.5/go.mod h1:8DJv2CVJQ6kGNpFW6eV9N3JviE1C85n github.com/go-sql-driver/mysql v1.4.1 h1:g24URVg0OFbNUTx9qqY1IRZ9D9z3iPyi5zKhQZpNwpA= github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= +github.com/godror/godror v0.17.0 h1:ClVuKG0qCj6f182BruhwgHS8xCgfpQp35pTrefSq4NE= +github.com/godror/godror v0.17.0/go.mod h1:DE94Br7LXn4dQGCexePriVrKotR9GkVzPPT5nnm8dj0= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE= github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= @@ -146,6 +150,8 @@ github.com/google/btree v1.0.0 h1:0udJVsspx3VBr5FwtLhQQtuAsVc79tTq0ocGIPAU6qo= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4= +github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/gofuzz v0.0.0-20161122191042-44d81051d367/go.mod h1:HP5RmnzzSNb993RKQDq4+1A4ia9nllfqcQFTQJedwGI= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= @@ -357,6 +363,8 @@ golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e h1:vcxGaoTs7kV8m5Np9uUNQin4BrLOthgV7252N8V+FwY= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20170830134202-bb24a47a89ea/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -401,6 +409,8 @@ golang.org/x/tools v0.0.0-20190614205625-5aca471b1d59/go.mod h1:/rFqwRUd4F7ZHNgw golang.org/x/tools v0.0.0-20190617190820-da514acc4774/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= golang.org/x/tools v0.0.0-20190920225731-5eefd052ad72/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gonum.org/v1/gonum v0.0.0-20190331200053-3d26580ed485/go.mod h1:2ltnJ7xHfj0zHS40VVPYEAAMTa3ZGguvHGBSJeRWqE0= gonum.org/v1/netlib v0.0.0-20190313105609-8cb42192e0e0/go.mod h1:wa6Ws7BG/ESfp6dHfk7C6KdzKA7wR7u/rKwOGE66zvw= gonum.org/v1/netlib v0.0.0-20190331212654-76723241ea4e/go.mod h1:kS+toOQn6AQKjmKJ7gzohV1XkqsFehRA2FbsbkopSuQ= diff --git a/pkg/drivers/generic/generic.go b/pkg/drivers/generic/generic.go index aadd41a1..3787ba8e 100644 --- a/pkg/drivers/generic/generic.go +++ b/pkg/drivers/generic/generic.go @@ -53,7 +53,7 @@ var ( GROUP BY mkv.name) maxkv ON maxkv.id = kv.id WHERE - (kv.deleted = 0 OR ?) + kv.deleted = 0 OR kv.deleted = ? ORDER BY kv.id ASC `, revSQL, compactRevSQL, columns) ) @@ -67,6 +67,7 @@ func (s Stripped) String() string { type ErrRetry func(error) bool type TranslateErr func(error) error +type TranslateLimit func(num int64) string type ConnectionPoolConfig struct { MaxIdle int // zero means defaultMaxIdleConns; negative means 0 @@ -77,23 +78,26 @@ type ConnectionPoolConfig struct { type Generic struct { sync.Mutex - LockWrites bool - LastInsertID bool - DB *sql.DB - GetCurrentSQL string - GetRevisionSQL string - RevisionSQL string - ListRevisionStartSQL string - GetRevisionAfterSQL string - CountSQL string - AfterSQL string - DeleteSQL string - UpdateCompactSQL string - InsertSQL string - FillSQL string - InsertLastInsertIDSQL string - Retry ErrRetry - TranslateErr TranslateErr + LockWrites bool + LastInsertID bool + InsertReturningInto bool + DB *sql.DB + GetCurrentSQL string + GetRevisionSQL string + RevisionSQL string + ListRevisionStartSQL string + GetRevisionAfterSQL string + CountSQL string + AfterSQL string + DeleteSQL string + UpdateCompactSQL string + InsertSQL string + FillSQL string + InsertLastInsertIDSQL string + InsertReturningIntoSQL string + Retry ErrRetry + TranslateErr TranslateErr + TranslateLimit TranslateLimit } func q(sql, param string, numbered bool) string { @@ -113,17 +117,14 @@ func q(sql, param string, numbered bool) string { } func (d *Generic) Migrate(ctx context.Context) { - var ( - count = 0 - countKV = d.queryRow(ctx, "SELECT COUNT(*) FROM key_value") - countKine = d.queryRow(ctx, "SELECT COUNT(*) FROM kine") - ) + var count int64 - if err := countKV.Scan(&count); err != nil || count == 0 { + if err := d.queryRow(ctx, "SELECT COUNT(*) FROM key_value").Scan(&count); err != nil || count == 0 { + logrus.WithError(err).Debug("no migration request demanded") return } - if err := countKine.Scan(&count); err != nil || count != 0 { + if err := d.queryRow(ctx, "SELECT COUNT(*) FROM kine").Scan(&count); err != nil || count != 0 { return } @@ -234,20 +235,52 @@ func Open(ctx context.Context, driverName, dataSourceName string, connPoolConfig FillSQL: q(`INSERT INTO kine(id, name, created, deleted, create_revision, prev_revision, lease, value, old_value) values(?, ?, ?, ?, ?, ?, ?, ?, ?)`, paramCharacter, numbered), + + InsertReturningIntoSQL: q(`INSERT INTO kine(name, created, deleted, create_revision, prev_revision, lease, value, old_value) + values(?, ?, ?, ?, ?, ?, ?, ?) RETURNING id INTO ?`, paramCharacter, numbered), }, err } -func (d *Generic) query(ctx context.Context, sql string, args ...interface{}) (*sql.Rows, error) { - logrus.Tracef("QUERY %v : %s", args, Stripped(sql)) +var limitNum = regexp.MustCompile(`LIMIT[[:space:]]+(\d+)`) + +func (d *Generic) doTranslateLimit(sql string) string { + if d.TranslateLimit == nil { + return sql + } + + return limitNum.ReplaceAllStringFunc(sql, func(s string) string { + // This should be a necessary condition + if n, err := strconv.ParseInt(limitNum.FindStringSubmatch(s)[1], 10, 64); err == nil { + return d.TranslateLimit(n) + } else { + // ??? + panic(err) + } + }) +} + +func (d *Generic) query(ctx context.Context, sql string, args ...interface{}) (rows *sql.Rows, err error) { + sql = d.doTranslateLimit(sql) + logrus. + WithField("args", args). + WithField("sql", Stripped(sql)). + WithField("mode", nil). + Trace("query") return d.DB.QueryContext(ctx, sql, args...) } func (d *Generic) queryRow(ctx context.Context, sql string, args ...interface{}) *sql.Row { - logrus.Tracef("QUERY ROW %v : %s", args, Stripped(sql)) + sql = d.doTranslateLimit(sql) + logrus. + WithField("args", args). + WithField("sql", Stripped(sql)). + WithField("mode", "row"). + Trace("query") return d.DB.QueryRowContext(ctx, sql, args...) } func (d *Generic) execute(ctx context.Context, sql string, args ...interface{}) (result sql.Result, err error) { + sql = d.doTranslateLimit(sql) if d.LockWrites { d.Lock() defer d.Unlock() @@ -256,9 +289,17 @@ func (d *Generic) execute(ctx context.Context, sql string, args ...interface{}) wait := strategy.Backoff(backoff.Linear(100 + time.Millisecond)) for i := uint(0); i < 20; i++ { if i > 2 { - logrus.Debugf("EXEC (try: %d) %v : %s", i, args, Stripped(sql)) + logrus. + WithField("try", i). + WithField("args", args). + WithField("sql", Stripped(sql)). + Debug("exec") } else { - logrus.Tracef("EXEC (try: %d) %v : %s", i, args, Stripped(sql)) + logrus. + WithField("try", i). + WithField("args", args). + WithField("sql", Stripped(sql)). + Trace("exec") } result, err = d.DB.ExecContext(ctx, sql, args...) if err != nil && d.Retry != nil && d.Retry(err) { @@ -299,7 +340,13 @@ func (d *Generic) ListCurrent(ctx context.Context, prefix string, limit int64, i if limit > 0 { sql = fmt.Sprintf("%s LIMIT %d", sql, limit) } - return d.query(ctx, sql, prefix, includeDeleted) + // don't ask me why, ask golang + return d.query(ctx, sql, prefix, func() int { + if includeDeleted { + return 1 + } + return 0 + }()) } func (d *Generic) List(ctx context.Context, prefix, startKey string, limit, revision int64, includeDeleted bool) (*sql.Rows, error) { @@ -308,14 +355,24 @@ func (d *Generic) List(ctx context.Context, prefix, startKey string, limit, revi if limit > 0 { sql = fmt.Sprintf("%s LIMIT %d", sql, limit) } - return d.query(ctx, sql, prefix, revision, includeDeleted) + return d.query(ctx, sql, prefix, revision, func() int { + if includeDeleted { + return 1 + } + return 0 + }()) } sql := d.GetRevisionAfterSQL if limit > 0 { sql = fmt.Sprintf("%s LIMIT %d", sql, limit) } - return d.query(ctx, sql, prefix, revision, startKey, revision, includeDeleted) + return d.query(ctx, sql, prefix, revision, startKey, revision, func() int { + if includeDeleted { + return 1 + } + return 0 + }()) } func (d *Generic) Count(ctx context.Context, prefix string) (int64, int64, error) { @@ -382,6 +439,15 @@ func (d *Generic) Insert(ctx context.Context, key string, create, delete bool, c return row.LastInsertId() } + if d.InsertReturningInto { + _, err := d.execute(ctx, d.InsertReturningIntoSQL, key, cVal, dVal, createRevision, previousRevision, ttl, value, prevValue, sql.Out{Dest: &id}) + if err != nil { + return 0, err + } + logrus.WithField("id", id).Debug("insert returning into") + return id, nil + } + row := d.queryRow(ctx, d.InsertSQL, key, cVal, dVal, createRevision, previousRevision, ttl, value, prevValue) err = row.Scan(&id) return id, err diff --git a/pkg/drivers/oracle/oracle.go b/pkg/drivers/oracle/oracle.go new file mode 100644 index 00000000..d09b9aac --- /dev/null +++ b/pkg/drivers/oracle/oracle.go @@ -0,0 +1,106 @@ +package oracle + +import ( + "context" + "database/sql" + "fmt" + "strings" + + "github.com/godror/godror" + "golang.org/x/xerrors" + + "github.com/rancher/kine/pkg/drivers/generic" + "github.com/rancher/kine/pkg/logstructured" + "github.com/rancher/kine/pkg/logstructured/sqllog" + "github.com/rancher/kine/pkg/server" +) + +const ( + defaultHostDSN = "system@localhost" +) + +var ( + schema = []string{` + CREATE TABLE kine ( + id INTEGER GENERATED BY DEFAULT ON NULL as IDENTITY, + name VARCHAR(630), + created INTEGER, + deleted INTEGER, + create_revision INTEGER, + prev_revision INTEGER, + lease INTEGER, + value BLOB, + old_value BLOB, + CONSTRAINT kine_pk PRIMARY KEY (id) + )`, + `CREATE INDEX kine_name_index ON kine (name)`, + `CREATE INDEX kine_name_id_index ON kine (name,id)`, + `CREATE UNIQUE INDEX kine_name_prev_revision_uindex ON kine (name, prev_revision)`, + } + procedureTemplate = []string{ + "declare", + "begin", + "%s", + "exception when others then", + "if SQLCODE = -955 then null; else raise; end if;", + "end;", + } +) + +func New(ctx context.Context, dataSourceName string) (server.Backend, error) { + parsedDSN, err := prepareDSN(dataSourceName) + if err != nil { + return nil, err + } + + dialect, err := generic.Open(ctx, "godror", parsedDSN, ":", true) + if err != nil { + return nil, err + } + dialect.InsertReturningInto = true + dialect.TranslateLimit = func(num int64) string { + return fmt.Sprintf("FETCH FIRST %d ROWS ONLY", num) + } + dialect.TranslateErr = func(err error) error { + // ORA-00001: unique constraint violated + if err, ok := xerrors.Unwrap(err).(*godror.OraErr); ok && err.Code() == 1 { + return server.ErrKeyExists + } + return err + } + if err := setup(dialect.DB); err != nil { + return nil, err + } + + dialect.Migrate(context.Background()) + return logstructured.New(sqllog.New(dialect)), nil +} + +func setup(db *sql.DB) error { + var str strings.Builder + + for _, cmd := range procedureTemplate { + if cmd != "%s" { + str.WriteString(cmd + "\n") + } else { + for _, stmt := range schema { + str.WriteString(fmt.Sprintf("execute immediate '%s';\n", stmt)) + } + } + } + _, err := db.Exec(str.String()) + return err +} + +func prepareDSN(dataSourceName string) (string, error) { + if len(dataSourceName) == 0 { + dataSourceName = defaultHostDSN + } + config, err := godror.ParseConnString(dataSourceName) + if err != nil { + return "", err + } + + parsedDSN := config.StringWithPassword() + return parsedDSN, nil +} diff --git a/pkg/endpoint/endpoint.go b/pkg/endpoint/endpoint.go index b40dd18d..cbfc2e1f 100644 --- a/pkg/endpoint/endpoint.go +++ b/pkg/endpoint/endpoint.go @@ -8,15 +8,17 @@ import ( "strings" "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "google.golang.org/grpc" + "github.com/rancher/kine/pkg/drivers/dqlite" "github.com/rancher/kine/pkg/drivers/generic" "github.com/rancher/kine/pkg/drivers/mysql" + "github.com/rancher/kine/pkg/drivers/oracle" "github.com/rancher/kine/pkg/drivers/pgsql" "github.com/rancher/kine/pkg/drivers/sqlite" "github.com/rancher/kine/pkg/server" "github.com/rancher/kine/pkg/tls" - "github.com/sirupsen/logrus" - "google.golang.org/grpc" ) const ( @@ -26,6 +28,7 @@ const ( ETCDBackend = "etcd3" MySQLBackend = "mysql" PostgresBackend = "postgres" + OracleBackend = "oracle" ) type Config struct { @@ -133,6 +136,8 @@ func getKineStorageBackend(ctx context.Context, driver, dsn string, cfg Config) backend, err = pgsql.New(ctx, dsn, cfg.Config, cfg.ConnectionPoolConfig) case MySQLBackend: backend, err = mysql.New(ctx, dsn, cfg.Config, cfg.ConnectionPoolConfig) + case OracleBackend: + backend, err = oracle.New(ctx, dsn) default: return false, nil, fmt.Errorf("storage backend is not defined") } diff --git a/pkg/logstructured/sqllog/sql.go b/pkg/logstructured/sqllog/sql.go index 18235a75..3772ed1a 100644 --- a/pkg/logstructured/sqllog/sql.go +++ b/pkg/logstructured/sqllog/sql.go @@ -430,11 +430,12 @@ func (s *SQLLog) poll(result chan interface{}, pollStart int64) { // the same time we write to the channel. saveLast = true rev = event.KV.ModRevision + logCtx := logrus.WithField("key", event.KV.Key).WithField("revision", event.KV.ModRevision).WithField("delete", event.Delete) if s.d.IsFill(event.KV.Key) { - logrus.Debugf("NOT TRIGGER FILL %s, revision=%d, delete=%v", event.KV.Key, event.KV.ModRevision, event.Delete) + logCtx.Debug("not triggered fill") } else { sequential = append(sequential, event) - logrus.Debugf("TRIGGERED %s, revision=%d, delete=%v", event.KV.Key, event.KV.ModRevision, event.Delete) + logCtx.Debug("triggered") } }