Skip to content

Commit

Permalink
fix(python-runtime): add compliance test for async context callbacks
Browse files Browse the repository at this point in the history
feat(kernel): process callbacks while EndRequest is awaiting promise
  • Loading branch information
mrgrain committed Mar 3, 2023
1 parent 863b19b commit e4d838d
Show file tree
Hide file tree
Showing 18 changed files with 777 additions and 116 deletions.
2 changes: 2 additions & 0 deletions packages/@jsii/kernel/src/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ export function isPropertyOverride(value: Override): value is PropertyOverride {

export interface Callback {
readonly cbid: string;
/** Whether this callback is synchronous. */
readonly sync: boolean;
readonly cookie: string | undefined;
readonly invoke?: InvokeRequest;
readonly get?: GetRequest;
Expand Down
30 changes: 30 additions & 0 deletions packages/@jsii/kernel/src/kernel.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2212,6 +2212,36 @@ defineTest('invokeBinScript() accepts arguments', (sandbox) => {
});
});

// defineTest('ImplementationFromAsyncContext compliance', async (sandbox) => {
// const producer = sandbox.create({
// fqn: 'Object',
// overrides: [{ method: 'produce', cookie: 'produce1234' }],
// interfaces: ['jsii-calc.IPromiseProducer'],
// });

// const obj = sandbox.create({
// fqn: 'jsii-calc.ImplementationFromAsyncContext',
// args: [producer],
// });

// const promise1 = sandbox.begin({
// objref: obj,
// method: 'doAsyncWork',
// });

// const callbacks1 = sandbox.callbacks();
// expect(callbacks1.callbacks.length).toBe(1);
// expect(callbacks1.callbacks[0].cookie).toBe('produce1234');

// sandbox.complete({
// cbid: callbacks1.callbacks[0].cbid,
// result: 'test-string',
// });

// const result = (await sandbox.end(promise1)).result;
// expect(result).toBe('test-string');
// });

// =================================================================================================

const testNames: { [name: string]: boolean } = {};
Expand Down
54 changes: 52 additions & 2 deletions packages/@jsii/kernel/src/kernel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,52 @@ export class Kernel {

let result;
try {
result = await promise;
let settled = false;
/**
* Poll for new callback requests until the promise is resolved. This is
* to allow any promises necessary for the promise to be able to settle.
* We use setImmediate so the next poll happens on the next run loop tick,
* after other microtasks might have been paused on a pending callback.
*/
// eslint-disable-next-line no-inner-declarations
function pollForCallbacks(kernel: Kernel) {
// Promise has settled already, not going any further...
if (settled) {
return;
}

for (const [cbid, cb] of kernel.cbs.entries()) {
kernel.waiting.set(cbid, cb);
kernel.cbs.delete(cbid);
try {
cb.succeed(
kernel.callbackHandler({
cbid,
sync: false,
cookie: cb.override.cookie,
invoke: {
objref: cb.objref,
method: cb.override.method,
args: cb.args,
},
}),
);
} catch (err) {
cb.fail(err);
} finally {
kernel.waiting.delete(cbid);
}
}
if (!settled) {
setImmediate(pollForCallbacks, kernel);
}
}
pollForCallbacks(this);

result = await promise.finally(() => {
settled = true;
});

this._debug('promise result:', result);
} catch (e: any) {
this._debug('promise error:', e);
Expand Down Expand Up @@ -475,14 +520,16 @@ export class Kernel {
};
}

/** @deprecated the flow should be handled directly by "end" */
public callbacks(_req?: api.CallbacksRequest): api.CallbacksResponse {
this._debug('callbacks');
const ret = Array.from(this.cbs.entries()).map(([cbid, cb]) => {
this.waiting.set(cbid, cb); // move to waiting
this.cbs.delete(cbid); // remove from created
const callback: api.Callback = {
cbid,
cookie: cb.override.cookie,
cbid,
sync: false,
invoke: {
objref: cb.objref,
method: cb.override.method,
Expand Down Expand Up @@ -758,6 +805,7 @@ export class Kernel {
const result = this.callbackHandler({
cookie: override.cookie,
cbid: this._makecbid(),
sync: true,
get: { objref, property: propertyName },
});
this._debug('callback returned', result);
Expand All @@ -774,6 +822,7 @@ export class Kernel {
this.callbackHandler({
cookie: override.cookie,
cbid: this._makecbid(),
sync: true,
set: {
objref,
property: propertyName,
Expand Down Expand Up @@ -910,6 +959,7 @@ export class Kernel {
const result = this.callbackHandler({
cookie: override.cookie,
cbid: this._makecbid(),
sync: true,
invoke: {
objref,
method: methodName,
Expand Down
27 changes: 7 additions & 20 deletions packages/@jsii/python-runtime/src/jsii/_kernel/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import datetime
import inspect
import itertools
import time
from types import FunctionType, MethodType, BuiltinFunctionType, LambdaType

from typing import Callable, cast, Any, List, Optional, Sequence, Type
Expand Down Expand Up @@ -28,6 +29,7 @@
CreateResponse,
DeleteRequest,
EndRequest,
EndResponse,
EnumRef,
GetRequest,
GetResponse,
Expand Down Expand Up @@ -466,26 +468,11 @@ def ainvoke(self, obj: Any, method: str, args: Optional[List[Any]] = None) -> An
if isinstance(promise, Callback):
promise = _callback_till_result(self, promise, BeginResponse)

callbacks = self.provider.callbacks(CallbacksRequest()).callbacks
while callbacks:
for callback in callbacks:
try:
result = _handle_callback(self, callback)
except Exception as exc:
# TODO: Maybe we want to print the whole traceback here?
complete = self.provider.complete(
CompleteRequest(cbid=callback.cbid, err=str(exc))
)
else:
complete = self.provider.complete(
CompleteRequest(cbid=callback.cbid, result=result)
)

assert complete.cbid == callback.cbid

callbacks = self.provider.callbacks(CallbacksRequest()).callbacks

return self.provider.end(EndRequest(promiseid=promise.promiseid)).result
response = self.provider.end(EndRequest(promiseid=promise.promiseid))
if isinstance(response, Callback):
return _callback_till_result(self, response, EndResponse).result
else:
return response.result

def stats(self):
resp = self.provider.stats(StatsRequest())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ def stop(self):
assert self._process.stdin is not None
if not self._process.stdin.closed:
self._process.stdin.write(b'{"exit":0}\n')
# Close the process' STDIN, singaling we are done with it
# Close the process' STDIN, signaling we are done with it
self._process.stdin.close()

try:
Expand Down
1 change: 1 addition & 0 deletions packages/@jsii/python-runtime/src/jsii/_kernel/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ class StatsResponse:
LoadResponse,
CreateResponse,
DeleteResponse,
EndResponse,
GetResponse,
InvokeResponse,
InvokeScriptResponse,
Expand Down
19 changes: 18 additions & 1 deletion packages/@jsii/python-runtime/tests/test_compliance.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@
AnonymousImplementationProvider,
UpcasingReflectable,
PromiseNothing,
IPromiseProducer,
ImplementationFromAsyncContext,
)
from jsii_calc.cdk16625 import Cdk16625
from jsii_calc.cdk22369 import AcceptsPath
Expand Down Expand Up @@ -528,6 +530,7 @@ def test_asyncOverrides_overrideAsyncMethodByParentClass():
assert obj.call_me() == 4452


# fails for no reason and poisons the runtime
def test_asyncOverrides_overrideCallsSuper():
obj = OverrideCallsSuper()
assert obj.override_me(12) == 1441
Expand Down Expand Up @@ -1361,5 +1364,19 @@ def test_void_returning_async():
"""Verifies it's okay to return a Promise<void>."""

assert PromiseNothing().instance_promise_it() is None
## TODO: This is currently broken as code-gen is incorrect for static async.
# TODO: This is currently broken as code-gen is incorrect for static async.
# assert PromiseNothing.promise_it() is None


def test_calling_implementation_from_async_context():
@jsii.implements(IPromiseProducer)
class ConcreteProducer:
def produce(self) -> str:
return "result"

producer = ConcreteProducer()

assert producer.produce() == "result"

worker = ImplementationFromAsyncContext(producer)
assert worker.do_async_work() == "result"
8 changes: 4 additions & 4 deletions packages/@jsii/runtime/lib/host.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ export class KernelHost {
return this.processRequest(
req,
completeCallback.bind(this),
/* sync */ true,
callback.sync,
);
}
}
Expand Down Expand Up @@ -126,7 +126,7 @@ export class KernelHost {
// promises. see the kernel test 'async overrides: two overrides'
// for an example for this use case.
if (apiReq.api === 'begin' || apiReq.api === 'complete') {
checkIfAsyncIsAllowed();
assertAsyncIsAllowed();

this.debug('processing pending promises before responding');

Expand All @@ -141,7 +141,7 @@ export class KernelHost {
// if this is an async method, return immediately and
// call next only when the promise is fulfilled.
if (this.isPromise(ret)) {
checkIfAsyncIsAllowed();
assertAsyncIsAllowed();

this.debug('waiting for promise to be fulfilled');

Expand Down Expand Up @@ -169,7 +169,7 @@ export class KernelHost {
// indicate this request was processed (synchronously).
return next();

function checkIfAsyncIsAllowed() {
function assertAsyncIsAllowed() {
if (sync) {
throw new JsiiFault(
'Cannot handle async operations while waiting for a sync callback to return',
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 25 additions & 0 deletions packages/jsii-calc/lib/compliance.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3141,3 +3141,28 @@ export class PromiseNothing {
return PromiseNothing.promiseIt();
}
}

/**
* Async Context operations
* Validates features work when run from within an async context
*
* @see https://github.com/aws/jsii/issues/3917
*/
export interface IPromiseProducer {
produce(): Promise<string>;
}

export class ImplementationFromAsyncContext {
public constructor(private readonly producer: IPromiseProducer) {}

public async doAsyncWork(): Promise<string> {
await this.sleep(200);
return this.producer.produce();
}

private async sleep(ms: number) {
return new Promise((resolve) => {
setTimeout(resolve, ms);
});
}
}
Loading

0 comments on commit e4d838d

Please sign in to comment.