Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(Event streams): add support for cancellation and IAsyncEnumerable #3543

Closed

Conversation

tlecomte
Copy link

@tlecomte tlecomte commented Nov 4, 2024

Description

Adjust EnumerableEventStream to add support for cancellation and for IAsyncEnumerable:

  • IEnumerableEventStream inherits from IAsyncEnumerable. That means that we can use await foreach on it, for example.
  • EnumerableEventStream implements it with a new GetAsyncEnumerator method that mimics the existing GetEnumerator, but uses ReadFromStreamAsync so that it does not block the calling thread while waiting on the incoming network stream.

For example, to process the response stream from Bedrock ConverseStreamRequest, before this change we had to do:

var response = await bedrockRuntimeClient.ConverseStreamAsync(converseStreamRequest);

foreach (var item in response.Stream.AsEnumerable())
{
      // do something with the item
}

The problem above is that foreach (var item in response.Stream.AsEnumerable()) is blocking the current .NET thread while waiting for the events to arrive, so it could cause thread pool exhaustion.

With the changes in this pull request, we can write instead:

var response = await bedrockRuntimeClient.ConverseStreamAsync(converseStreamRequest);

await foreach (var item in response.Stream)
{
      // do something with the item
}

Notice the await foreach that is possible now that IEnumerableEventStream implements IAsyncEnumerable. The benefit is that the .NET thread is not blocked.

It is also easy to add support for cancellation. For example:

await foreach (var item in response.Stream.WithCancellation(cancellationToken))
{
      // do something with the item
}

Motivation and Context

The semantics of AWS event streams map well to .NET IAsyncEnumerable, but it's hard to build an IAsyncEnumerable from such a stream. EnumerableEventStream does provide an async StartProcessingAsync method, but only a synchronous enumerator. So the happy path with the current API is to use a synchronous enumerator, which has the downside of blocking the current .NET thread while enumerating the events. This can lead to the thread pool starvation.

This addresses #3542

Testing

Added an integration test. To run it, the test code needs to be adjusted to comment out the Ignore attribute, give credentials and a region.

Screenshots (if appropriate)

Types of changes

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to change)

Checklist

  • My code follows the code style of this project
  • My change requires a change to the documentation
  • I have updated the documentation accordingly
  • I have read the README document
  • I have added tests to cover my changes
  • All new and existing tests passed

License

  • I confirm that this pull request can be released under the Apache 2 license

@dscpinheiro dscpinheiro changed the base branch from main to main-staging November 4, 2024 18:42
@tlecomte
Copy link
Author

tlecomte commented Nov 4, 2024

The issue checklist mentions updating the documentation. I think that it may apply here, and I'm happy to help, but I would need a pointer to know where the source for the doc is maintained.

@tlecomte tlecomte force-pushed the EventStreamAsyncEnumerator branch from 0887038 to b60de19 Compare November 6, 2024 09:08
@tlecomte
Copy link
Author

tlecomte commented Nov 6, 2024

Removed the unrelated changes in BedrockRuntime.sln.

@peterrsongg
Copy link
Contributor

@tlecomte Thanks for the contribution, we just happened to receive another PR shortly after yours that also addresses the same problem you're running into. We'll review that PR instead as it targets the next major version of the SDK and it allows us to use newer features in the .NET Framework (your PR would need to account for .NET 3.5 as well, for example)." This means that this feature will be available in V4 of the SDK.

#3545

@peterrsongg peterrsongg closed this Nov 6, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants