Skip to content

Commit

Permalink
feat(Event streams): add support for cancellation and IAsyncEnumerator
Browse files Browse the repository at this point in the history
  • Loading branch information
Timothée Lecomte committed Nov 4, 2024
1 parent ad13fd4 commit d27cdac
Show file tree
Hide file tree
Showing 5 changed files with 217 additions and 44 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/*******************************************************************************
/*******************************************************************************
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Licensed under the Apache License, Version 2.0 (the "License"). You may not use
* this file except in compliance with the License. A copy of the License is located at
Expand All @@ -25,6 +25,7 @@
using System.Diagnostics.CodeAnalysis;
using System.IO;
#if AWS_ASYNC_API
using System.Threading;
using System.Threading.Tasks;
#endif

Expand All @@ -36,7 +37,12 @@ namespace Amazon.Runtime.EventStreams.Internal
/// <typeparam name="T">An implementation of IEventStreamEvent (e.g. IS3Event).</typeparam>
/// <typeparam name="TE">An implementation of EventStreamException (e.g. S3EventStreamException).</typeparam>
[SuppressMessage("Microsoft.Naming", "CA1710", Justification = "IEventStreamCollection is not descriptive.")]
#if AWS_ASYNC_ENUMERABLES_API

public interface IEnumerableEventStream<T, TE> : IEventStream<T, TE>, IEnumerable<T>, IAsyncEnumerable<T> where T : IEventStreamEvent where TE : EventStreamException, new()
#else
public interface IEnumerableEventStream<T, TE> : IEventStream<T, TE>, IEnumerable<T> where T : IEventStreamEvent where TE : EventStreamException, new()
#endif
{
}

Expand Down Expand Up @@ -171,13 +177,72 @@ public override void StartProcessing()
///
/// The Task will be completed when all of the events from the stream have been processed.
/// </summary>
public override async Task StartProcessingAsync()
public override async Task StartProcessingAsync(CancellationToken cancellationToken = default)
{
// If they are/have enumerated, the event-driven mode should be disabled
if (IsEnumerated) throw new InvalidOperationException(MutuallyExclusiveExceptionMessage);

await base.StartProcessingAsync().ConfigureAwait(false);
await base.StartProcessingAsync(cancellationToken).ConfigureAwait(false);
}
#endif

#if AWS_ASYNC_ENUMERABLES_API
/// <summary>
/// Returns an async enumerator that iterates through the collection.
/// </summary>
/// <returns>An async enumerator that can be used to iterate through the collection.</returns>
public async IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken)
{
if (IsProcessing)
{
// If the queue has already begun processing, refuse to enumerate.
throw new InvalidOperationException(MutuallyExclusiveExceptionMessage);
}

// There could be more than 1 message created per decoder cycle.
var events = new Queue<T>();

// Opting out of events - letting the enumeration handle everything.
IsEnumerated = true;
IsProcessing = true;

// Enumeration is just magic over the event driven mechanism.
EventReceived += (sender, args) => events.Enqueue(args.EventStreamEvent);

var buffer = new byte[BufferSize];

while (IsProcessing)
{
// If there are already events ready to be served, do not ask for more.
if (events.Count > 0)
{
var ev = events.Dequeue();
// Enumeration handles terminal events on behalf of the user.
if (ev is IEventStreamTerminalEvent)
{
IsProcessing = false;
Dispose();
}

yield return ev;
}
else
{
try
{
await ReadFromStreamAsync(buffer, cancellationToken).ConfigureAwait(false);
}
catch (Exception ex)
{
IsProcessing = false;
Dispose();

// Wrap exceptions as needed to match event-driven behavior.
throw WrapException(ex);
}
}
}
}
#endif
}
}
}
22 changes: 11 additions & 11 deletions sdk/src/Core/Amazon.Runtime/EventStreams/Internal/EventStream.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/*
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
Expand All @@ -17,10 +17,9 @@
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.IO;
using System.Threading;
#if AWS_ASYNC_API
using System.Threading.Tasks;
#else
using System.Threading;
#endif

namespace Amazon.Runtime.EventStreams.Internal
Expand Down Expand Up @@ -55,7 +54,7 @@ namespace Amazon.Runtime.EventStreams.Internal
///
/// The Task will be completed when all of the events from the stream have been processed.
/// </summary>
Task StartProcessingAsync();
Task StartProcessingAsync(CancellationToken cancellationToken = default);
#endif
}

Expand Down Expand Up @@ -262,23 +261,23 @@ protected void Process()
{
#if AWS_ASYNC_API
// Task only exists in framework 4.5 and up, and Standard.
Task.Run(() => ProcessLoopAsync());
Task.Run(() => ProcessLoopAsync(CancellationToken.None));
#else
// ThreadPool only exists in 3.5 and below. These implementations do not have the Task library.
ThreadPool.QueueUserWorkItem(ProcessLoop);
#endif
}

#if AWS_ASYNC_API
private async Task ProcessLoopAsync()
private async Task ProcessLoopAsync(CancellationToken cancellationToken)
{
var buffer = new byte[BufferSize];

try
{
while (IsProcessing)
{
await ReadFromStreamAsync(buffer).ConfigureAwait(false);
await ReadFromStreamAsync(buffer, cancellationToken).ConfigureAwait(false);
}
}
// These exceptions are raised on the background thread. They are fired as events for visibility.
Expand Down Expand Up @@ -351,9 +350,10 @@ protected void ReadFromStream(byte[] buffer)
/// each message it decodes.
/// </summary>
/// <param name="buffer">The buffer to store the read bytes from the stream.</param>
protected async Task ReadFromStreamAsync(byte[] buffer)
/// <param name="cancellationToken">A cancellation token.</param>
protected async Task ReadFromStreamAsync(byte[] buffer, CancellationToken cancellationToken)
{
var bytesRead = await NetworkStream.ReadAsync(buffer, 0, buffer.Length).ConfigureAwait(false);
var bytesRead = await NetworkStream.ReadAsync(buffer, 0, buffer.Length, cancellationToken).ConfigureAwait(false);
if (bytesRead > 0)
{
// Decoder raises MessageReceived for every message it encounters.
Expand Down Expand Up @@ -408,13 +408,13 @@ public virtual void StartProcessing()
///
/// The Task will be completed when all of the events from the stream have been processed.
/// </summary>
public virtual async Task StartProcessingAsync()
public virtual async Task StartProcessingAsync(CancellationToken cancellationToken = default)
{
if (IsProcessing)
return;

IsProcessing = true;
await ProcessLoopAsync().ConfigureAwait(false);
await ProcessLoopAsync(cancellationToken).ConfigureAwait(false);
}
#endif

Expand Down
Loading

0 comments on commit d27cdac

Please sign in to comment.