Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

A package that makes it easier to work with subscribable objects #3301

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/neat-cows-shop.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@solana/subscribable': patch
---

Creates a package for working with subscribable data sources like event targets. From an `EventTarget` or object which conforms to the `EventEmitter` interface you can now create a more ergonomic `DataPublisher` (object with an `on` method that vends an unsubscribe function) or an abortable `AsyncIterable`.
10 changes: 6 additions & 4 deletions packages/errors/src/codes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -294,11 +294,12 @@ export const SOLANA_ERROR__RPC_SUBSCRIPTIONS__TRANSPORT_FAILED_TO_CONNECT = 8190
// Reserve error codes in the range [9900000-9900999].
// These errors should only be thrown when there is a bug with the
// library itself and should, in theory, never reach the end user.
export const SOLANA_ERROR__INVARIANT_VIOLATION__WEBSOCKET_MESSAGE_ITERATOR_STATE_MISSING = 9900000 as const;
export const SOLANA_ERROR__INVARIANT_VIOLATION__WEBSOCKET_MESSAGE_ITERATOR_MUST_NOT_POLL_BEFORE_RESOLVING_EXISTING_MESSAGE_PROMISE =
export const SOLANA_ERROR__INVARIANT_VIOLATION__SUBSCRIPTION_ITERATOR_STATE_MISSING = 9900000 as const;
export const SOLANA_ERROR__INVARIANT_VIOLATION__SUBSCRIPTION_ITERATOR_MUST_NOT_POLL_BEFORE_RESOLVING_EXISTING_MESSAGE_PROMISE =
9900001 as const;
export const SOLANA_ERROR__INVARIANT_VIOLATION__CACHED_ABORTABLE_ITERABLE_CACHE_ENTRY_MISSING = 9900002 as const;
export const SOLANA_ERROR__INVARIANT_VIOLATION__SWITCH_MUST_BE_EXHAUSTIVE = 9900003 as const;
export const SOLANA_ERROR__INVARIANT_VIOLATION__DATA_PUBLISHER_CHANNEL_UNIMPLEMENTED = 9900004 as const;

/**
* A union of every Solana error code
Expand Down Expand Up @@ -419,9 +420,10 @@ export type SolanaErrorCode =
| typeof SOLANA_ERROR__INVALID_BLOCKHASH_BYTE_LENGTH
| typeof SOLANA_ERROR__INVALID_NONCE
| typeof SOLANA_ERROR__INVARIANT_VIOLATION__CACHED_ABORTABLE_ITERABLE_CACHE_ENTRY_MISSING
| typeof SOLANA_ERROR__INVARIANT_VIOLATION__DATA_PUBLISHER_CHANNEL_UNIMPLEMENTED
| typeof SOLANA_ERROR__INVARIANT_VIOLATION__SUBSCRIPTION_ITERATOR_MUST_NOT_POLL_BEFORE_RESOLVING_EXISTING_MESSAGE_PROMISE
| typeof SOLANA_ERROR__INVARIANT_VIOLATION__SUBSCRIPTION_ITERATOR_STATE_MISSING
| typeof SOLANA_ERROR__INVARIANT_VIOLATION__SWITCH_MUST_BE_EXHAUSTIVE
| typeof SOLANA_ERROR__INVARIANT_VIOLATION__WEBSOCKET_MESSAGE_ITERATOR_MUST_NOT_POLL_BEFORE_RESOLVING_EXISTING_MESSAGE_PROMISE
| typeof SOLANA_ERROR__INVARIANT_VIOLATION__WEBSOCKET_MESSAGE_ITERATOR_STATE_MISSING
| typeof SOLANA_ERROR__JSON_RPC__INTERNAL_ERROR
| typeof SOLANA_ERROR__JSON_RPC__INVALID_PARAMS
| typeof SOLANA_ERROR__JSON_RPC__INVALID_REQUEST
Expand Down
5 changes: 5 additions & 0 deletions packages/errors/src/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ import {
SOLANA_ERROR__INVALID_BLOCKHASH_BYTE_LENGTH,
SOLANA_ERROR__INVALID_NONCE,
SOLANA_ERROR__INVARIANT_VIOLATION__CACHED_ABORTABLE_ITERABLE_CACHE_ENTRY_MISSING,
SOLANA_ERROR__INVARIANT_VIOLATION__DATA_PUBLISHER_CHANNEL_UNIMPLEMENTED,
SOLANA_ERROR__INVARIANT_VIOLATION__SWITCH_MUST_BE_EXHAUSTIVE,
SOLANA_ERROR__JSON_RPC__INTERNAL_ERROR,
SOLANA_ERROR__JSON_RPC__INVALID_PARAMS,
Expand Down Expand Up @@ -406,6 +407,10 @@ export type SolanaErrorContext = DefaultUnspecifiedErrorContextToUndefined<
[SOLANA_ERROR__INVARIANT_VIOLATION__CACHED_ABORTABLE_ITERABLE_CACHE_ENTRY_MISSING]: {
cacheKey: string;
};
[SOLANA_ERROR__INVARIANT_VIOLATION__DATA_PUBLISHER_CHANNEL_UNIMPLEMENTED]: {
channelName: string;
supportedChannelNames: string[];
};
[SOLANA_ERROR__INVARIANT_VIOLATION__SWITCH_MUST_BE_EXHAUSTIVE]: {
unexpectedValue: unknown;
};
Expand Down
20 changes: 12 additions & 8 deletions packages/errors/src/messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,10 @@ import {
SOLANA_ERROR__INVALID_BLOCKHASH_BYTE_LENGTH,
SOLANA_ERROR__INVALID_NONCE,
SOLANA_ERROR__INVARIANT_VIOLATION__CACHED_ABORTABLE_ITERABLE_CACHE_ENTRY_MISSING,
SOLANA_ERROR__INVARIANT_VIOLATION__DATA_PUBLISHER_CHANNEL_UNIMPLEMENTED,
SOLANA_ERROR__INVARIANT_VIOLATION__SUBSCRIPTION_ITERATOR_MUST_NOT_POLL_BEFORE_RESOLVING_EXISTING_MESSAGE_PROMISE,
SOLANA_ERROR__INVARIANT_VIOLATION__SUBSCRIPTION_ITERATOR_STATE_MISSING,
SOLANA_ERROR__INVARIANT_VIOLATION__SWITCH_MUST_BE_EXHAUSTIVE,
SOLANA_ERROR__INVARIANT_VIOLATION__WEBSOCKET_MESSAGE_ITERATOR_MUST_NOT_POLL_BEFORE_RESOLVING_EXISTING_MESSAGE_PROMISE,
SOLANA_ERROR__INVARIANT_VIOLATION__WEBSOCKET_MESSAGE_ITERATOR_STATE_MISSING,
SOLANA_ERROR__JSON_RPC__INTERNAL_ERROR,
SOLANA_ERROR__JSON_RPC__INVALID_PARAMS,
SOLANA_ERROR__JSON_RPC__INVALID_REQUEST,
Expand Down Expand Up @@ -393,17 +394,20 @@ export const SolanaErrorMessages: Readonly<{
'Invariant violation: Found no abortable iterable cache entry for key `$cacheKey`. It ' +
'should be impossible to hit this error; please file an issue at ' +
'https://sola.na/web3invariant',
[SOLANA_ERROR__INVARIANT_VIOLATION__SWITCH_MUST_BE_EXHAUSTIVE]:
'Invariant violation: Switch statement non-exhaustive. Received unexpected value ' +
'`$unexpectedValue`. It should be impossible to hit this error; please file an issue at ' +
'https://sola.na/web3invariant',
[SOLANA_ERROR__INVARIANT_VIOLATION__WEBSOCKET_MESSAGE_ITERATOR_MUST_NOT_POLL_BEFORE_RESOLVING_EXISTING_MESSAGE_PROMISE]:
[SOLANA_ERROR__INVARIANT_VIOLATION__DATA_PUBLISHER_CHANNEL_UNIMPLEMENTED]:
'Invariant violation: This data publisher does not publish to the channel named ' +
'`$channelName`. Supported channels include $supportedChannelNames.',
[SOLANA_ERROR__INVARIANT_VIOLATION__SUBSCRIPTION_ITERATOR_MUST_NOT_POLL_BEFORE_RESOLVING_EXISTING_MESSAGE_PROMISE]:
'Invariant violation: WebSocket message iterator state is corrupt; iterated without first ' +
'resolving existing message promise. It should be impossible to hit this error; please ' +
'file an issue at https://sola.na/web3invariant',
[SOLANA_ERROR__INVARIANT_VIOLATION__WEBSOCKET_MESSAGE_ITERATOR_STATE_MISSING]:
[SOLANA_ERROR__INVARIANT_VIOLATION__SUBSCRIPTION_ITERATOR_STATE_MISSING]:
'Invariant violation: WebSocket message iterator is missing state storage. It should be ' +
'impossible to hit this error; please file an issue at https://sola.na/web3invariant',
[SOLANA_ERROR__INVARIANT_VIOLATION__SWITCH_MUST_BE_EXHAUSTIVE]:
'Invariant violation: Switch statement non-exhaustive. Received unexpected value ' +
'`$unexpectedValue`. It should be impossible to hit this error; please file an issue at ' +
'https://sola.na/web3invariant',
[SOLANA_ERROR__JSON_RPC__INTERNAL_ERROR]: 'JSON-RPC error: Internal JSON-RPC error ($__serverMessage)',
[SOLANA_ERROR__JSON_RPC__INVALID_PARAMS]: 'JSON-RPC error: Invalid method parameter(s) ($__serverMessage)',
[SOLANA_ERROR__JSON_RPC__INVALID_REQUEST]:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import {
SOLANA_ERROR__INVARIANT_VIOLATION__WEBSOCKET_MESSAGE_ITERATOR_MUST_NOT_POLL_BEFORE_RESOLVING_EXISTING_MESSAGE_PROMISE,
SOLANA_ERROR__INVARIANT_VIOLATION__WEBSOCKET_MESSAGE_ITERATOR_STATE_MISSING,
SOLANA_ERROR__INVARIANT_VIOLATION__SUBSCRIPTION_ITERATOR_MUST_NOT_POLL_BEFORE_RESOLVING_EXISTING_MESSAGE_PROMISE,
SOLANA_ERROR__INVARIANT_VIOLATION__SUBSCRIPTION_ITERATOR_STATE_MISSING,
SOLANA_ERROR__RPC_SUBSCRIPTIONS__TRANSPORT_CLOSED_BEFORE_MESSAGE_BUFFERED,
SOLANA_ERROR__RPC_SUBSCRIPTIONS__TRANSPORT_CONNECTION_CLOSED,
SOLANA_ERROR__RPC_SUBSCRIPTIONS__TRANSPORT_FAILED_TO_CONNECT,
Expand Down Expand Up @@ -139,13 +139,13 @@ export async function createWebSocketConnection({
if (!state) {
// There should always be state by now.
throw new SolanaError(
SOLANA_ERROR__INVARIANT_VIOLATION__WEBSOCKET_MESSAGE_ITERATOR_STATE_MISSING,
SOLANA_ERROR__INVARIANT_VIOLATION__SUBSCRIPTION_ITERATOR_STATE_MISSING,
);
}
if (state.__hasPolled) {
// You should never be able to poll twice in a row.
throw new SolanaError(
SOLANA_ERROR__INVARIANT_VIOLATION__WEBSOCKET_MESSAGE_ITERATOR_MUST_NOT_POLL_BEFORE_RESOLVING_EXISTING_MESSAGE_PROMISE,
SOLANA_ERROR__INVARIANT_VIOLATION__SUBSCRIPTION_ITERATOR_MUST_NOT_POLL_BEFORE_RESOLVING_EXISTING_MESSAGE_PROMISE,
);
}
const queuedMessages = state.queuedMessages;
Expand Down
1 change: 1 addition & 0 deletions packages/subscribable/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
dist/
1 change: 1 addition & 0 deletions packages/subscribable/.npmrc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
engine-strict=true
1 change: 1 addition & 0 deletions packages/subscribable/.prettierignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
dist/
1 change: 1 addition & 0 deletions packages/subscribable/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# @solana/subscribable
20 changes: 20 additions & 0 deletions packages/subscribable/LICENSE
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
Copyright (c) 2023 Solana Labs, Inc

Permission is hereby granted, free of charge, to any person obtaining
a copy of this software and associated documentation files (the
"Software"), to deal in the Software without restriction, including
without limitation the rights to use, copy, modify, merge, publish,
distribute, sublicense, and/or sell copies of the Software, and to
permit persons to whom the Software is furnished to do so, subject to
the following conditions:

The above copyright notice and this permission notice shall be
included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
99 changes: 99 additions & 0 deletions packages/subscribable/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
[![npm][npm-image]][npm-url]
[![npm-downloads][npm-downloads-image]][npm-url]
[![semantic-release][semantic-release-image]][semantic-release-url]
<br />
[![code-style-prettier][code-style-prettier-image]][code-style-prettier-url]

[code-style-prettier-image]: https://img.shields.io/badge/code_style-prettier-ff69b4.svg?style=flat-square
[code-style-prettier-url]: https://github.com/prettier/prettier
[npm-downloads-image]: https://img.shields.io/npm/dm/@solana/subscribable/rc.svg?style=flat
[npm-image]: https://img.shields.io/npm/v/@solana/subscribable/rc.svg?style=flat
[npm-url]: https://www.npmjs.com/package/@solana/subscribable/v/rc
[semantic-release-image]: https://img.shields.io/badge/%20%20%F0%9F%93%A6%F0%9F%9A%80-semantic--release-e10079.svg
[semantic-release-url]: https://github.com/semantic-release/semantic-release

# @solana/subscribable

This package contains utilities for creating subscription-based event targets. These differ from the `EventTarget` interface in that the method you use to add a listener returns an unsubscribe function. It is primarily intended for internal use &ndash; particularly for those building `RpcSubscriptionChannels` and associated infrastructure.

## Types

### `DataPublisher<TDataByChannelName>`

This type represents an object with an `on` function that you can call to subscribe to certain data over a named channel.

```ts
let dataPublisher: DataPublisher<{ error: SolanaError }>;
dataPublisher.on('data', handleData); // ERROR. `data` is not a known channel name.
dataPublisher.on('error', e => {
console.error(e);
}); // OK.
```

### `TypedEventEmitter<TEventMap>`

This type allows you to type `addEventListener` and `removeEventListener` so that the call signature of the listener matches the event type given.

```ts
const emitter: TypedEventEmitter<{ message: MessageEvent }> = new WebSocket('wss://api.devnet.solana.com');
emitter.addEventListener('data', handleData); // ERROR. `data` is not a known event type.
emitter.addEventListener('message', message => {
console.log(message.origin); // OK. `message` is a `MessageEvent` so it has an `origin` property.
});
```

### `TypedEventTarget<TEventMap>`

This type is a superset of `TypedEventEmitter` that allows you to constrain calls to `dispatchEvent`.

```ts
const target: TypedEventTarget<{ candyVended: CustomEvent<{ flavour: string }> }> = new EventTarget();
target.dispatchEvent(new CustomEvent('candyVended', { detail: { flavour: 'raspberry' } })); // OK.
target.dispatchEvent(new CustomEvent('candyVended', { detail: { flavor: 'raspberry' } })); // ERROR. Misspelling in detail.
```

## Functions

### `createAsyncIterableFromDataPublisher({ abortSignal, dataChannelName, dataPublisher, errorChannelName })`

Returns an `AsyncIterable` given a data publisher. The iterable will produce iterators that vend messages published to `dataChannelName` and will throw the first time a message is published to `errorChannelName`. Triggering the abort signal will cause all iterators spawned from this iterator to return once they have published all queued messages.

```ts
const iterable = createAsyncIterableFromDataPublisher({
abortSignal: AbortSignal.timeout(10_000),
dataChannelName: 'message',
dataPublisher,
errorChannelName: 'error',
});
try {
for await (const message of iterable) {
console.log('Got message', message);
}
} catch (e) {
console.error('An error was published to the error channel', e);
} finally {
console.log("It's been 10 seconds; that's enough for now.");
}
```

Things to note:

- If a message is published over a channel before the `AsyncIterator` attached to it has polled for the next result, the message will be queued in memory.
- Messages only begin to be queued after the first time an iterator begins to poll. Channel messages published before that time will be dropped.
- If there are messages in the queue and an error occurs, all queued messages will be vended to the iterator before the error is thrown.
- If there are messages in the queue and the abort signal fires, all queued messages will be vended to the iterator after which it will return.
- Any new iterators created after the first error is encountered will reject with that error when polled.

### `getDataPublisherFromEventEmitter(emitter)`

Returns an object with an `on` function that you can call to subscribe to certain data over a named channel. The `on` function returns an unsubscribe function.

```ts
const socketDataPublisher = getDataPublisherFromEventEmitter(new WebSocket('wss://api.devnet.solana.com'));
const unsubscribe = socketDataPublisher.on('message', message => {
if (JSON.parse(message.data).id === 42) {
console.log('Got response 42');
unsubscribe();
}
});
```
90 changes: 90 additions & 0 deletions packages/subscribable/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
{
"name": "@solana/subscribable",
"version": "2.0.0-rc.1",
"description": "Helpers for creating subscription-based event emitters",
"exports": {
"edge-light": {
"import": "./dist/index.node.mjs",
"require": "./dist/index.node.cjs"
},
"workerd": {
"import": "./dist/index.node.mjs",
"require": "./dist/index.node.cjs"
},
"browser": {
"import": "./dist/index.browser.mjs",
"require": "./dist/index.browser.cjs"
},
"node": {
"import": "./dist/index.node.mjs",
"require": "./dist/index.node.cjs"
},
"react-native": "./dist/index.native.mjs",
"types": "./dist/types/index.d.ts"
},
"browser": {
"./dist/index.node.cjs": "./dist/index.browser.cjs",
"./dist/index.node.mjs": "./dist/index.browser.mjs"
},
"main": "./dist/index.node.cjs",
"module": "./dist/index.node.mjs",
"react-native": "./dist/index.native.mjs",
"types": "./dist/types/index.d.ts",
"type": "commonjs",
"files": [
"./dist/"
],
"sideEffects": false,
"keywords": [
"blockchain",
"solana",
"web3"
],
"scripts": {
"compile:js": "tsup --config build-scripts/tsup.config.package.ts",
"compile:typedefs": "tsc -p ./tsconfig.declarations.json",
"dev": "jest -c ../../node_modules/@solana/test-config/jest-dev.config.ts --rootDir . --watch",
"prepublishOnly": "pnpm pkg delete devDependencies",
"publish-impl": "npm view $npm_package_name@$npm_package_version > /dev/null 2>&1 || pnpm publish --tag ${PUBLISH_TAG:-canary} --access public --no-git-checks",
"publish-packages": "pnpm prepublishOnly && pnpm publish-impl",
"style:fix": "pnpm eslint --fix src && pnpm prettier --log-level warn --ignore-unknown --write ./*",
"test:lint": "TERM_OVERRIDE=\"${TURBO_HASH:+dumb}\" TERM=${TERM_OVERRIDE:-$TERM} jest -c ../../node_modules/@solana/test-config/jest-lint.config.ts --rootDir . --silent",
"test:prettier": "TERM_OVERRIDE=\"${TURBO_HASH:+dumb}\" TERM=${TERM_OVERRIDE:-$TERM} jest -c ../../node_modules/@solana/test-config/jest-prettier.config.ts --rootDir . --silent",
"test:treeshakability:browser": "agadoo dist/index.browser.mjs",
"test:treeshakability:native": "agadoo dist/index.native.mjs",
"test:treeshakability:node": "agadoo dist/index.node.mjs",
"test:typecheck": "tsc --noEmit",
"test:unit:browser": "TERM_OVERRIDE=\"${TURBO_HASH:+dumb}\" TERM=${TERM_OVERRIDE:-$TERM} jest -c ../../node_modules/@solana/test-config/jest-unit.config.browser.ts --rootDir . --silent",
"test:unit:node": "TERM_OVERRIDE=\"${TURBO_HASH:+dumb}\" TERM=${TERM_OVERRIDE:-$TERM} jest -c ../../node_modules/@solana/test-config/jest-unit.config.node.ts --rootDir . --silent"
},
"author": "Solana Labs Maintainers <[email protected]>",
"license": "MIT",
"repository": {
"type": "git",
"url": "https://github.com/solana-labs/solana-web3.js"
},
"bugs": {
"url": "http://github.com/solana-labs/solana-web3.js/issues"
},
"browserslist": [
"supports bigint and not dead",
"maintained node versions"
],
"engine": {
"node": ">=17.4"
},
"dependencies": {
"@solana/errors": "workspace:*"
},
"peerDependencies": {
"typescript": ">=5"
},
"bundlewatch": {
"defaultCompression": "gzip",
"files": [
{
"path": "./dist/index*.js"
}
]
}
}
Loading