-
Notifications
You must be signed in to change notification settings - Fork 187
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(store-sync): add support for live sync from indexer #3226
Conversation
🦋 Changeset detectedLatest commit: a12cb15 The changes in this PR will be included in the next version bump. This PR includes changesets to release 26 packages
Not sure what this means? Click here to learn what changesets are. Click here if you're a maintainer who wants to add another changeset to this PR |
The latest updates on your projects. Learn more about Vercel for Git ↗︎
|
debug("error streaming logs from indexer:", e); | ||
debug("falling back to streaming logs from RPC"); | ||
return storedRpcLogs$; | ||
}), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
have we tested the fallback behavior here? any way to make sure e2e tests cover this case like they do for the previous indexers?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The fallback behavior already kicks in if no indexer is provided or the existing indexer doesn't support this API, so in that sense it's covered by the existing e2e tests
return new Observable<MessageEvent>((subscriber) => { | ||
const eventSource = new EventSource(url); | ||
eventSource.onmessage = (ev): void => subscriber.next(ev); | ||
eventSource.onerror = (): void => subscriber.error(new Error("Event source closed: " + url)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am pretty sure EventSource
attempts to reconnect during an error (I think indefinitely) but unclear if it emits onerror
during this reconnect flow. We might want to give it a bad URL and see if this emits a bunch of onerror
and maybe allow a few reconnection attempts before giving up and closing the stream with subscriber.error
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just tried that, it does indeed try to reconnect, but it seems like once it reconnects it skips the blocks that happened while the connection was down and just continues with the current block. I think we can solve this on the indexer side by using the block number as event id and also add code to handle reconnections with last_event_id
, but would leave that as an optimization for later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah exactly, we need to update the indexer SSE stream to include an id
and use the Last-Event-ID
header for reconnections
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ID would likely need to be block number + log index
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should this be in store-sync/src/sql
? because its specific to sql api?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
and could we have a corresponding toStorageAdapterBlock
that contains the blockNumber
conversion so we don't have it in a few places?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's not specific to the sql api, it's the /logs
api
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could rename to isIndexerStorageAdapterBlock
or isIndexerLogsResponse
or something 🤷♂️
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
either way 🤷 just something to differentiate from the StorageAdapterBlock
type (which already uses bigint
not string
for block number)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
renamed to isLogsApiResponse
return new Observable<MessageEvent>((subscriber) => { | ||
const eventSource = new EventSource(url); | ||
eventSource.onmessage = (ev): void => subscriber.next(ev); | ||
eventSource.onerror = (): void => subscriber.error(new Error("Event source failed" + new URL(url).origin)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
eventSource.onerror = (): void => subscriber.error(new Error("Event source failed" + new URL(url).origin)); | |
// we immediately close instead of allowing `EventSource` to retry because the logs API doesn't support `Last-Event-ID` yet | |
eventSource.onerror = (): void => subscriber.error(new Error("Event source failed" + new URL(url).origin)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since fromEventSource
is technically not specific to the logs API, we should probably add an option to fromEventSource
to enable/disable retries, so consumers can decide based on the API capabilities. Would leave for a followup
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@@ -61,7 +64,7 @@ export async function createStoreSync({ | |||
maxBlockRange, | |||
initialState, | |||
initialBlockLogs, | |||
indexerUrl, | |||
indexerUrl: indexerUrlInput, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
small nit: I tend to name these "initial"
indexerUrl: indexerUrlInput, | |
indexerUrl: initialIndexerUrl, |
if (!isLogsApiResponse(data)) { | ||
throw new Error("Received unexpected from indexer:" + messageEvent.data); | ||
} | ||
return { ...data, blockNumber: BigInt(data.blockNumber) }; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not blocking but might be nice to move these lines into a toStorageAdapterBlock
since we have this logic in a couple spots
arktype can help a lot with this pattern of parsing+validating+strong types but can save that for a later improvement
const parseManifest = type("string").pipe.try((s) => JSON.parse(s), SystemsManifest); |
mud/packages/world/ts/node/buildSystemsManifest.ts
Lines 13 to 30 in 111bb1b
export const SystemsManifest = type({ | |
systems: [ | |
{ | |
// labels | |
namespaceLabel: "string", | |
label: "string", | |
// resource ID | |
namespace: "string", | |
name: "string", | |
systemId: ["string", ":", (s): s is Hex => isHex(s, { strict: false })], | |
// abi | |
abi: "string[]", | |
worldAbi: "string[]", | |
}, | |
"[]", | |
], | |
createdAt: "number", | |
}); |
No description provided.