Skip to content

Commit

Permalink
Add brain dump on watch flow
Browse files Browse the repository at this point in the history
Signed-off-by: Brad Davidson <[email protected]>
  • Loading branch information
brandond committed Dec 7, 2023
1 parent 5827b22 commit 1c3492c
Show file tree
Hide file tree
Showing 2 changed files with 189 additions and 5 deletions.
9 changes: 4 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,6 @@
# Kine (Kine is not etcd)
==========================

_NOTE: On 2020-11-19, this repository was moved out of the github.com/rancher org to github.com/k3s-io
supporting the [acceptance of K3s as a CNCF sandbox project](https://github.com/cncf/toc/pull/447)_.

---

Kine is an etcdshim that translates etcd API to:
- SQLite
- Postgres
Expand All @@ -18,3 +13,7 @@ Kine is an etcdshim that translates etcd API to:
- Translates etcdTX calls into the desired API (Create, Update, Delete)

See an [example](/examples/minimal.md).

## Developer Documentation

A high level flow diagram and overview of code structure is available at [docs/flow.md](/docs/flow.md).
185 changes: 185 additions & 0 deletions docs/flow.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
### Overview

Kine implements naieve multi-version concurrency control on top of a traditional SQL database.
It stores all information in a single table; each row stores the revision, key, current value,
previous value, previous revision for this key, and flags indicating if the key was created or
deleted.

The revision or id is an auto-incrementing primary key, and kine attempts to ensure that no
events are missed by enforcing a strict monotonic sequencing of rows. If any gaps are observed,
dummy "fill" records are created to ensure that no other database clients can create a
conflicting event that would lag behind the current revision and cause potential inconsistency.

### Internals

The heart of Kine is a long-running polling goroutine that watches for new rows to be inserted
into the database. New rows are converted to events, sent through an unbuffered broadcaster for
fan-out, and from there go through a sequnce of buffered channels for filtering by prefix and
revision, and grouping into GRPC responses.

Create, Update, and Delete operations wake the polling goroutine to immediately alert it that
there are new rows. Rows inserted by other peers will be discovered by periodic polling.

Range (Get/List) operations directly query the database without interacting with the polling
goroutine.

Database compaction (pruning of deleted or replaced keys) is handled internally by Kine;
compaction requests via GRPC are acknowleged but not acted upon.

Lease/TTL is handled by a simple goroutine that watches all events, and places into a work
queue future removal of any keys that have a TTL. The TTL is checked again when the item is
dequeued, and the current TTL checked to see if removal is still due; if not it is rescheduled.


### Flow Diagram

```mermaid
flowchart LR
subgraph kine ["Kine"]
subgraph grpc-server ["GRPC Server"]
subgraph grpc-txn ["Transaction"]
grpc-create["Create"]
grpc-update["Update"]
grpc-delete["Delete"]
end
subgraph grpc-range["Range"]
grpc-get["Get"]
grpc-list["List"]
end
subgraph grpc-watch ["Watch Stream"]
grpc-watch-create[\"CreateRequest"/]
grpc-watch-response[/"WatchResponse"\]
end
end
subgraph logstructured ["LogStructured"]
logstructured-get("Get")
logstructured-list("List")
logstructured-create("Create")
logstructured-update("Update")
logstructured-delete("Delete")
logstructured-watch("Watch")
end
subgraph sqllog ["SQLLog"]
sqllog-append("Append")
sqllog-list("List")
sqllog-watch("Watch")
sqllog-after("After")
subgraph sqllog-poll ["Poll Loop"]
sqllog-poll-timer("Timer")
sqllog-poll-select("Select")
sqllog-poll-events("Rows to Events")
end
subgraph broadcaster ["Broadcaster"]
broadcaster-subscribe("Subscribe")
end
end
end
subgraph clients ["Clients"]
kube-apiserver((("apiserver")))
etcdctl((("etcdctl")))
end
subgraph database ["Database"]
direction LR
database-table[["table"]]
end
grpc-server --- clients
sqllog ~~~~~~~~~~~~ database
grpc-watch-create == "start" ===> logstructured-watch == "revision" ===> sqllog-watch == "prefix" ===> broadcaster-subscribe
grpc-create --> logstructured-create --> sqllog-append
grpc-update --> logstructured-update --> sqllog-append
grpc-delete --> logstructured-delete --> sqllog-append
grpc-get --> logstructured-get --> sqllog-list
grpc-list --> logstructured-list --> sqllog-list
sqllog-list -- "list query" ---------- database-table
sqllog-append -- "insert returning revision" --- database-table
sqllog-append -- "1a: notify revision" --> sqllog-poll-select
sqllog-poll-timer -. "1b: tick" ..-> sqllog-poll-select
sqllog-poll-select -."2: latest revision" ..-> sqllog-after -. "after query" ..- database-table
sqllog-poll-select -. "3: []row" .-> sqllog-poll-events
sqllog-poll-events -. "4 chan []event" .-> broadcaster-subscribe
broadcaster-subscribe == "chan []event 100<br>prefix filter" ==> sqllog-watch
logstructured-watch == "start revision" ==> sqllog-after == "after query" === database-table
sqllog-watch == "chan []event 100<br>revision filter" ===> logstructured-watch
logstructured-watch == "chan []event 100<br>batching" ===> grpc-watch-response
click grpc-create href "https://github.com/k3s-io/kine/blob/master/pkg/server/create.go" _blank
click grpc-update href "https://github.com/k3s-io/kine/blob/master/pkg/server/update.go" _blank
click grpc-delete href "https://github.com/k3s-io/kine/blob/master/pkg/server/delete.go" _blank
click grpc-get href "https://github.com/k3s-io/kine/blob/master/pkg/server/get.go" _blank
click grpc-list href "https://github.com/k3s-io/kine/blob/master/pkg/server/list.go" _blank
click grpc-watch-create href "https://github.com/k3s-io/kine/blob/master/pkg/server/watch.go#L70" _blank
click grpc-watch-response href "https://github.com/k3s-io/kine/blob/master/pkg/server/watch.go#L119" _blank
click logstructured-create href "https://github.com/k3s-io/kine/blob/master/pkg/logstructured/logstructured.go#L97" _blank
click logstructured-update href "https://github.com/k3s-io/kine/blob/master/pkg/logstructured/logstructured.go#L222" _blank
click logstructured-delete href "https://github.com/k3s-io/kine/blob/master/pkg/logstructured/logstructured.go#L129" _blank
click logstructured-get href "https://github.com/k3s-io/kine/blob/master/pkg/logstructured/logstructured.go#L60" _blank
click logstructured-list href "https://github.com/k3s-io/kine/blob/master/pkg/logstructured/logstructured.go#L171" _blank
click logstructured-watch href "https://github.com/k3s-io/kine/blob/master/pkg/logstructured/logstructured.go#L432" _blank
click sqllog-list href "https://github.com/k3s-io/kine/blob/master/pkg/logstructured/sqllog/sql.go#L274" _blank
click sqllog-append href "https://github.com/k3s-io/kine/blob/master/pkg/logstructured/sqllog/sql.go#L529" _blank
click sqllog-poll-select href "https://github.com/k3s-io/kine/blob/master/pkg/logstructured/sqllog/sql.go#L353" _blank
click sqllog-poll-events href "https://github.com/k3s-io/kine/blob/master/pkg/logstructured/sqllog/sql.go#L402" _blank
click sqllog-watch href "https://github.com/k3s-io/kine/blob/master/pkg/logstructured/sqllog/sql.go#L353" _blank
click broadcaster-subscribe href "https://github.com/k3s-io/kine/blob/master/pkg/broadcaster/broadcaster.go#L16" _blank
```

### Flow Details

This section provides an overview of the sections listed in the block flow diagram above.

* Poll loop (logstructured/sqllog/sql.go)
- Single goroutine started on demand by Broadcaster's ConnectFunc callback if broadcaster is not running when a new subscription is created
- Queries for new rows inserted by other writers sharing the same backend database
- Select is woken early by notify channel which suggests a new revision to check
- Creates fills if jumps in revision ID are observed - ensures that events observe a strict linear sequence without missing any rows
- Converts `[]row` to `[]event` batch and sends to broadcaster channel
Broadcaster channel has no queue

* Broadcaster (broadcaster/broadcaster.go)
- Single goroutine
- Handles fan-out of events from database to watching clients
- Starts poll loop if not running when a subscription is added
- Read `[]event` batch from poll loop via broadcaster channel and sequentially sends to subscriber channels
- If write to a subscriber would block due to its channel being full, the subscription is dropped (channel closed)
Subscriber channel buffer size is 100

* Watch (logstructured/sqllog/sql.go)
- Goroutine per watch
- Create new subscription from broadcaster - "drinking from the firehose" at this stage
- Range reading batch of `[]event` from broadcaster subscription channel
- Filter `[]event` in batch by watch prefix
- Sends filtered `[]event` to logstructured Watch via result channel
Result channel buffer size is 100

* Watch (logstructured/logstructured.go)
- Goroutine per watch
- Create new sqllog Watch with prefix
- Get `[]event` with prefix after selected revision (via sqllog.After) to find any rows that already exist, send to result channel
- Range reading `[]event` batch from sqllog.Watch channel, filter by events since end of After (to avoid sending dupes), send to result channel
Result channel buffer size is 100

* Watch (server/watch.go)
- Goroutine per watch
- Create new logstructured Watch with prefix and revision
- Read `[]event` from Watch channel until channel is drained, building larger []event list
- Uses hybrid blocking initial / nonblocking continued select loop
- Works around fact that local inserts always wake the poll loop immediately, with no opportunity for batching
- Overhead of GRPC messaging is reduced if all queued events are batched before sending
- Group collected `[]event` into WatchResponse and send to client
- Watch continues sending response batches until watch is aborted by server, Cancel called by client, or client disconnects

* Append (logstructured/sqllog/sql.go)
- Called from logstructured.Create/Delete/Update to add a new row
- If insert is successful, sends the new revision into poll loop notify channel to wake it up and read the new row
Revision notify channel buffer size is 1024

0 comments on commit 1c3492c

Please sign in to comment.