Flux.collectList() return empty list instead of emit error #3917

Open
ben-cpng opened this issue Nov 5, 2024 · 6 comments
Labels
type/bug A general bug

Comments


ben-cpng commented Nov 5, 2024

For a Flux created by zipping two Monos, MonoA and MonoB, followed by collectList():

Flux.zip(MonoA, MonoB).collectList()

If MonoB emits an error, the collectList() operator should emit that error instead of returning an empty list.

Expected Behavior

Flux.collectList() should emit the error emitted by MonoB.
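
To make the expectation concrete, here is a minimal single-threaded sketch, assuming reactor-core is on the classpath. The class and method names (ZipErrorExpectation, terminalSignal) are made up for illustration and are not part of the report. With no scheduler involved, the terminal signal of the zipped and collected sequence is expected to be the error from the second Mono.

```java
import java.util.List;
import java.util.Optional;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Signal;
import reactor.util.function.Tuple2;

public class ZipErrorExpectation {

    // Captures the terminal signal of Flux.zip(monoA, monoB).collectList()
    // as a Signal object instead of letting block() rethrow the error.
    static Signal<List<Tuple2<Optional<Object>, Optional<Object>>>> terminalSignal() {
        // monoA emits a value, monoB fails (both on the calling thread).
        Mono<Optional<Object>> monoA = Mono.just(Optional.empty());
        Mono<Optional<Object>> monoB = Mono.error(new NullPointerException("boom"));

        return Flux.zip(monoA, monoB)
                .collectList()
                .materialize()
                .block();
    }

    public static void main(String[] args) {
        Signal<?> signal = terminalSignal();
        System.out.println("isOnError = " + signal.isOnError());
        System.out.println("throwable = " + signal.getThrowable());
    }
}
```

This only covers the synchronous case; the report is about what happens when the two Monos signal on different threads.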

Actual Behavior

Sometimes Flux.collectList() returns an empty list instead of emitting the error.

Steps to Reproduce

Here is a unit test that reproduces the issue:

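import java.util.Optional;

import org.junit.jupiter.api.Test; // assumption: JUnit 5; the report does not say which JUnit version is used
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;

// Assumption: the report does not show how PARALLEL_SCHEDULER is defined.
private static final Scheduler PARALLEL_SCHEDULER = Schedulers.parallel();

@Test
public void bug_Test() {
    final var numItem = 1000000;
    var fluxToTest = Flux.range(1, numItem)
            .flatMap(ignored -> {
              // mono1 emits Optional.empty() on the parallel scheduler
              var mono1 = Mono.empty()
                      .publishOn(PARALLEL_SCHEDULER) // would pass the test if not run in parallel scheduler
                      .map(Optional::of)
                      .defaultIfEmpty(Optional.empty());

              // mono2 always terminates with an error
              var mono2 = Mono.error(new NullPointerException())
                      .map(Optional::of)
                      .defaultIfEmpty(Optional.empty());

              return Flux.zip(mono1, mono2).collectList().onErrorResume(e -> Mono.empty());
            })
            // expect upstream will emit error signal only
            .flatMap(evt ->
                Mono.error(new RuntimeException("Unexpected empty list return by collectList of size %s".formatted(evt.size())))
            );
    StepVerifier.create(fluxToTest).expectNextCount(0).verifyComplete();
}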

The above test always fails because an empty list reaches the last flatMap() block. With the expected behavior, the onErrorResume() operator would replace every error signal with Mono.empty(), so no data would reach the last flatMap() block.
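
The distinction the test relies on can be sketched in isolation, assuming reactor-core is available. The class and method names below are hypothetical. When the upstream fails, collectList() forwards the error and onErrorResume() turns it into an empty Mono, so nothing reaches downstream. When the upstream merely completes, collectList() emits one element (an empty list), which is the element the last flatMap() in the test trips over.

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class SwallowVersusEmptyList {

    // Counts how many elements reach downstream after
    // collectList() followed by the error-swallowing onErrorResume().
    static long elementsDownstream(Flux<Object> source) {
        return source
                .collectList()
                .onErrorResume(e -> Mono.empty())
                .flux()
                .count()
                .block();
    }

    // Upstream terminates with an error: the error is swallowed, no element is emitted.
    static long errorCase() {
        return elementsDownstream(Flux.error(new NullPointerException("boom")));
    }

    // Upstream completes without items: collectList() emits a single empty list.
    static long completionCase() {
        return elementsDownstream(Flux.empty());
    }

    public static void main(String[] args) {
        System.out.println("error case:      " + errorCase());
        System.out.println("completion case: " + completionCase());
    }
}
```

So an element arriving at the final flatMap() means the zip operator signalled completion rather than the error.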

Possible Solution

Your Environment

  • Reactor version(s) used: 3.6.9
  • JVM version (java -version): java 21.0.5 2024-10-15 LTS
    Java(TM) SE Runtime Environment (build 21.0.5+9-LTS-239)
    Java HotSpot(TM) 64-Bit Server VM (build 21.0.5+9-LTS-239, mixed mode, sharing)
  • OS and version (eg uname -a): MSYS_NT-10.0-14393 wh-1eab6196ab 3.4.10-2e2ef940.x86_64 2024-07-09 21:35 UTC x86_64 Msys
Member

chemicL commented Nov 5, 2024

Hey, thanks for the report. I am wondering though - aren't you explicitly swallowing the error with .onErrorResume(e -> Mono.empty()) that follows after the .collectList() operator?

chemicL added the status/need-user-input label Nov 5, 2024
Author

ben-cpng commented Nov 5, 2024 via email

Member

chemicL commented Nov 6, 2024

[EDIT] I misinterpreted the sequence of events in my initial evaluation. The below is not really what's going on since the delay is applied to a successful termination of a Mono that emits an item, not to the one that terminates with an error.

[ORIGINAL] Hmmm, I think your test is rather convoluted and makes it difficult to infer what's actually going on. Let me try to explain:

  1. The fact you needed to publish the error on the parallel scheduler because otherwise the test would pass indicates there is a timing issue in your code.
  2. Because you delay the error, the Mono.zip operator terminates once mono1 completes with just the onComplete() signal and cancels mono2.
  3. Due to the above, the collectList has no chance of observing an error, it just gets the completion signal from the zip operator.
  4. As the specification of collectList states, a completion signal from upstream gets transformed into an empty list downstream signal.

With the above explanation, I'm closing the report as invalid. Hope this explanation helps you resolve the issue you are facing.

chemicL closed this as not planned Nov 6, 2024
chemicL added the status/invalid label and removed the status/need-user-input label Nov 6, 2024
Author

ben-cpng commented Nov 7, 2024

Thanks for the update.
Regarding point 2: could you please tell me why the Flux.zip operator terminates once mono1 completes? In the code, mono1 should emit a data item, Optional.empty(). Shouldn't the Flux.zip operator wait for a signal from mono2 (either data or an error)? Why should the Flux.zip operator cancel mono2 in this case? Thanks!

Member

chemicL commented Nov 7, 2024

My apologies, I think I have confused something there. Your questions are very helpful and I do think you might have in fact discovered a bug that's a race condition uncovered by these events:

  1. First Mono completes with a genuine signal on Thread1
  2. Second Mono errors on Thread2.

The situation happens to be that both Threads are draining the results at the same time and break the contract.
I will investigate further.

I will also update my previous comment to reflect reality (the edit history can be viewed), as I mistakenly assumed that the error gets delayed. It is actually the genuine signal that is delayed, and that is what triggers this condition.

chemicL reopened this Nov 7, 2024
chemicL added the type/bug label and removed the status/invalid label Nov 7, 2024
Member

chemicL commented Nov 8, 2024

Just for the record - the exclusivity of drain looks correct, so the two threads are not draining at the same time. My observation came from the natural concurrency in Flux.flatMap: two different Flux.zip instances were draining at the same time, but they were completely unrelated.

The actual issue is that both threads attempt to complete the FluxZip operator and the problematic sequence seems to be the following:

  1. T1: mono1 delivers an actual signal, starts the drain procedure. It doesn't notice any error so continues looking at other results.
  2. T2: mono2 delivers the error, the error is set in the zip operator, but it fails to gain exclusive access to the drain procedure as T1 is executing it.
  3. T1: mono1 code path delivers the successful signal, while not noticing the error that was stored.

There should be some handling that would drop one of the signals in case the other Thread is already handling termination.
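
The sequence above can be replayed deterministically with a small model that uses only the JDK. This is a hypothetical sketch, not Reactor's FluxZip code: the class ZipDrainSketch, its fields, the afterErrorCheck hook (standing in for T2), and the recheckError toggle are all invented for illustration. The toggle shows one possible way to handle the late error, not necessarily the fix the maintainers would choose.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

public class ZipDrainSketch {

    /** One zipped source: at most one pending value plus a terminated flag. */
    static final class Source {
        Object value;
        boolean done;
    }

    final Source first = new Source();
    final Source second = new Source();
    final AtomicInteger wip = new AtomicInteger();                  // guards exclusive access to drain()
    final AtomicReference<Throwable> error = new AtomicReference<>();
    final List<String> downstream = new ArrayList<>();              // signals delivered downstream
    final boolean recheckError;                                     // hypothetical fix toggle
    Runnable afterErrorCheck = () -> { };                           // replay hook standing in for T2

    ZipDrainSketch(boolean recheckError) {
        this.recheckError = recheckError;
    }

    void onNext(Source s, Object v) {
        s.value = v;
        drain();
    }

    void onError(Source s, Throwable t) {
        error.compareAndSet(null, t);
        s.done = true;
        drain();
    }

    void drain() {
        if (wip.getAndIncrement() != 0) {
            return; // another caller is already draining (step 2: T2 fails to gain access)
        }
        int missed = 1;
        for (;;) {
            // Step 1: the draining caller checks for a stored error first.
            if (error.get() != null) {
                downstream.add("onError");
                return;
            }
            // Replay hook: lets "T2" store its error right after the check above.
            Runnable hook = afterErrorCheck;
            afterErrorCheck = () -> { };
            hook.run();

            // Step 3: a source that is done with nothing pending ends the zip.
            for (Source s : List.of(first, second)) {
                if (s.done && s.value == null) {
                    boolean failed = recheckError && error.get() != null;
                    downstream.add(failed ? "onError" : "onComplete");
                    return;
                }
            }
            // Pairing of available values is omitted; it is not needed for this replay.
            missed = wip.addAndGet(-missed);
            if (missed == 0) {
                return;
            }
        }
    }

    /** Replays the T1/T2 interleaving on a single thread and returns the downstream signals. */
    static List<String> replay(boolean recheckError) {
        ZipDrainSketch zip = new ZipDrainSketch(recheckError);
        zip.afterErrorCheck = () -> zip.onError(zip.second, new NullPointerException());
        zip.onNext(zip.first, "value");
        return zip.downstream;
    }

    public static void main(String[] args) {
        System.out.println("without re-check: " + replay(false)); // [onComplete]
        System.out.println("with re-check:    " + replay(true));  // [onError]
    }
}
```

In this model, the completion delivered without the re-check is what a downstream collectList() would turn into an empty list.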
