Issues with GraphQL subscriptions #1094

Closed
yassenb opened this issue Dec 4, 2024 · 12 comments
Labels
status: invalid An issue that we don't feel is valid

Comments

@yassenb

yassenb commented Dec 4, 2024

We've been seeing a number of errors regularly each day for months now. I had been postponing logging this until we upgraded to Spring Boot 3.4, but the errors persist. I haven't been able to pinpoint any determining factor, and subscriptions over websockets for the most part work (we use them heavily), but the errors are still in the logs. Here are the two common ones with stack traces:

reactor.core.Exceptions$OverflowException: Queue is full: Reactive Streams source doesn't respect backpressure
at reactor.core.Exceptions.failWithOverflow ( reactor/core/Exceptions.java:251 )
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.onNext ( reactor/core.publisher/FluxPublishOn.java:233 )
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onNext ( io/opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1/TracingSubscriber.java:68 )
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext ( reactor/core.publisher/MonoFlatMapMany.java:251 )
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onNext ( io/opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1/TracingSubscriber.java:68 )
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext ( reactor/core.publisher/FluxOnErrorResume.java:79 )
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onNext ( io/opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1/TracingSubscriber.java:68 )
at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onNext ( reactor/core.publisher/FluxConcatArray.java:180 )
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onNext ( io/opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1/TracingSubscriber.java:68 )
at reactor.core.publisher.FluxMap$MapSubscriber.onNext ( reactor/core.publisher/FluxMap.java:122 )
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onNext ( io/opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1/TracingSubscriber.java:68 )
at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext ( reactor/core.publisher/FluxPeek.java:200 )
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onNext ( io/opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1/TracingSubscriber.java:68 )
at reactor.core.publisher.FluxMap$MapSubscriber.onNext ( reactor/core.publisher/FluxMap.java:122 )
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onNext ( io/opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1/TracingSubscriber.java:68 )
at graphql.execution.reactive.CompletionStageSubscriber.whenNextFinished ( graphql/execution.reactive/CompletionStageSubscriber.java:95 )
at graphql.execution.reactive.CompletionStageSubscriber.lambda$whenComplete$0 ( graphql/execution.reactive/CompletionStageSubscriber.java:78 )
at java.util.concurrent.CompletableFuture.uniWhenComplete
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire
at java.util.concurrent.CompletableFuture.postComplete
at java.util.concurrent.CompletableFuture.complete
at graphql.execution.ExecutionStrategy.lambda$buildFieldValueMap$2 ( graphql/execution/ExecutionStrategy.java:283 )
at java.util.concurrent.CompletableFuture.uniWhenComplete
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire
at java.util.concurrent.CompletableFuture.postComplete
at java.util.concurrent.CompletableFuture.complete
at graphql.execution.Async$Many.lambda$await$0 ( graphql/execution/Async.java:226 )
at java.util.concurrent.CompletableFuture.uniWhenComplete
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire
at java.util.concurrent.CompletableFuture.postComplete
at java.util.concurrent.CompletableFuture.complete
at graphql.execution.ExecutionStrategy.lambda$buildFieldValueMap$2 ( graphql/execution/ExecutionStrategy.java:283 )
at java.util.concurrent.CompletableFuture.uniWhenComplete
at java.util.concurrent.CompletableFuture.uniWhenCompleteStage
at java.util.concurrent.CompletableFuture.whenComplete
at graphql.execution.ExecutionStrategy.lambda$executeObject$0 ( graphql/execution/ExecutionStrategy.java:234 )
at java.util.concurrent.CompletableFuture.uniWhenComplete
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire
at java.util.concurrent.CompletableFuture.postComplete
at java.util.concurrent.CompletableFuture.complete
at graphql.execution.Async$Many.lambda$await$0 ( graphql/execution/Async.java:226 )
at java.util.concurrent.CompletableFuture.uniWhenComplete
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire
at java.util.concurrent.CompletableFuture.postComplete
at java.util.concurrent.CompletableFuture.complete
at reactor.core.publisher.MonoToCompletableFuture.onNext ( reactor/core.publisher/MonoToCompletableFuture.java:64 )
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onNext ( io/opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1/TracingSubscriber.java:68 )
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext ( reactor/core.publisher/FluxContextWrite.java:107 )
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onNext ( io/opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1/TracingSubscriber.java:68 )
at reactor.core.publisher.MonoCompletionStage$MonoCompletionStageSubscription.apply ( reactor/core.publisher/MonoCompletionStage.java:121 )
at reactor.core.publisher.MonoCompletionStage$MonoCompletionStageSubscription.apply ( reactor/core.publisher/MonoCompletionStage.java:67 )
at java.util.concurrent.CompletableFuture.uniHandle
at java.util.concurrent.CompletableFuture$UniHandle.tryFire
at java.util.concurrent.CompletableFuture.postComplete
at java.util.concurrent.CompletableFuture.complete
at org.dataloader.DataLoaderHelper.lambda$dispatchQueueBatch$2 ( org/dataloader/DataLoaderHelper.java:267 )
at java.util.concurrent.CompletableFuture$UniApply.tryFire
at java.util.concurrent.CompletableFuture.postComplete
at java.util.concurrent.CompletableFuture.complete
at reactor.core.publisher.MonoToCompletableFuture.onNext ( reactor/core.publisher/MonoToCompletableFuture.java:64 )
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onNext ( io/opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1/TracingSubscriber.java:68 )
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext ( reactor/core.publisher/FluxContextWrite.java:107 )
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onNext ( io/opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1/TracingSubscriber.java:68 )
at reactor.core.publisher.MonoCompletionStage$MonoCompletionStageSubscription.apply ( reactor/core.publisher/MonoCompletionStage.java:121 )
at reactor.core.publisher.MonoCompletionStage$MonoCompletionStageSubscription.apply ( reactor/core.publisher/MonoCompletionStage.java:67 )
at java.util.concurrent.CompletableFuture.uniHandle
at java.util.concurrent.CompletableFuture$UniHandle.tryFire
at java.util.concurrent.CompletableFuture.postComplete
at java.util.concurrent.CompletableFuture.complete
at org.springframework.graphql.data.method.InvocableHandlerMethodSupport.lambda$adaptCallable$1 ( org/springframework.graphql.data.method/InvocableHandlerMethodSupport.java:158 )
at java.util.concurrent.ThreadPerTaskExecutor$TaskRunner.run
at java.lang.VirtualThread.run

and

reactor.core.Exceptions$ReactorRejectedExecutionException: Scheduler unavailable
at reactor.core.Exceptions.failWithRejected ( reactor/core/Exceptions.java:285 )
at reactor.core.publisher.Operators.onRejectedExecution ( reactor/core.publisher/Operators.java:1075 )
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.trySchedule ( reactor/core.publisher/FluxPublishOn.java:333 )
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.onNext ( reactor/core.publisher/FluxPublishOn.java:237 )
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onNext ( io/opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1/TracingSubscriber.java:68 )
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext ( reactor/core.publisher/MonoFlatMapMany.java:251 )
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onNext ( io/opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1/TracingSubscriber.java:68 )
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext ( reactor/core.publisher/FluxOnErrorResume.java:79 )
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onNext ( io/opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1/TracingSubscriber.java:68 )
at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onNext ( reactor/core.publisher/FluxConcatArray.java:180 )
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onNext ( io/opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1/TracingSubscriber.java:68 )
at reactor.core.publisher.FluxMap$MapSubscriber.onNext ( reactor/core.publisher/FluxMap.java:122 )
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onNext ( io/opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1/TracingSubscriber.java:68 )
at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext ( reactor/core.publisher/FluxPeek.java:200 )
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onNext ( io/opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1/TracingSubscriber.java:68 )
at reactor.core.publisher.FluxMap$MapSubscriber.onNext ( reactor/core.publisher/FluxMap.java:122 )
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onNext ( io/opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1/TracingSubscriber.java:68 )
at graphql.execution.reactive.CompletionStageSubscriber.whenNextFinished ( graphql/execution.reactive/CompletionStageSubscriber.java:95 )
at graphql.execution.reactive.CompletionStageSubscriber.lambda$whenComplete$0 ( graphql/execution.reactive/CompletionStageSubscriber.java:78 )
at java.util.concurrent.CompletableFuture.uniWhenComplete
at java.util.concurrent.CompletableFuture.uniWhenCompleteStage
at java.util.concurrent.CompletableFuture.whenComplete
at java.util.concurrent.CompletableFuture.whenComplete
at graphql.execution.reactive.CompletionStageSubscriber.onNext ( graphql/execution.reactive/CompletionStageSubscriber.java:66 )
at reactor.core.publisher.StrictSubscriber.onNext ( reactor/core.publisher/StrictSubscriber.java:89 )
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onNext ( io/opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1/TracingSubscriber.java:68 )
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext ( reactor/core.publisher/FluxContextWrite.java:107 )
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onNext ( io/opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1/TracingSubscriber.java:68 )
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext ( reactor/core.publisher/FluxOnErrorResume.java:79 )
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onNext ( io/opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1/TracingSubscriber.java:68 )
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext ( reactor/core.publisher/FluxOnErrorResume.java:79 )
at kotlinx.coroutines.reactive.FlowSubscription$consumeFlow$2.emit ( kotlinx/coroutines.reactive/ReactiveFlow.kt:234 )
at kotlinx.coroutines.flow.internal.UndispatchedContextCollector$emitRef$1.invokeSuspend ( kotlinx/coroutines.flow.internal/ChannelFlow.kt:208 )
at kotlinx.coroutines.flow.internal.UndispatchedContextCollector$emitRef$1.invoke
at kotlinx.coroutines.flow.internal.UndispatchedContextCollector$emitRef$1.invoke
at kotlinx.coroutines.flow.internal.ChannelFlowKt.withContextUndispatched ( kotlinx/coroutines.flow.internal/ChannelFlow.kt:223 )
at kotlinx.coroutines.flow.internal.UndispatchedContextCollector.emit ( kotlinx/coroutines.flow.internal/ChannelFlow.kt:211 )
at havelock.gateway.subscription.SubscriptionController$counter$$inlined$map$1$2.emit ( havelock/gateway.subscription/Emitters.kt:219 )
at kotlinx.coroutines.flow.FlowKt__ChannelsKt.emitAllImpl$FlowKt__ChannelsKt ( kotlinx/coroutines.flow/Channels.kt:33 )
at kotlinx.coroutines.flow.FlowKt__ChannelsKt.access$emitAllImpl$FlowKt__ChannelsKt ( kotlinx/coroutines.flow/Channels.kt:1 )
at kotlinx.coroutines.flow.FlowKt__ChannelsKt$emitAllImpl$1.invokeSuspend
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith ( kotlin/coroutines.jvm.internal/ContinuationImpl.kt:33 )
at kotlinx.coroutines.DispatchedTaskKt.resume ( kotlinx/coroutines/DispatchedTask.kt:231 )
at kotlinx.coroutines.DispatchedTaskKt.resumeUnconfined ( kotlinx/coroutines/DispatchedTask.kt:187 )
at kotlinx.coroutines.DispatchedTaskKt.dispatch ( kotlinx/coroutines/DispatchedTask.kt:159 )
at kotlinx.coroutines.CancellableContinuationImpl.dispatchResume ( kotlinx/coroutines/CancellableContinuationImpl.kt:466 )
at kotlinx.coroutines.CancellableContinuationImpl.completeResume ( kotlinx/coroutines/CancellableContinuationImpl.kt:582 )
at kotlinx.coroutines.channels.BufferedChannelKt.tryResume0 ( kotlinx/coroutines.channels/BufferedChannel.kt:2927 )
at kotlinx.coroutines.channels.BufferedChannelKt.access$tryResume0 ( kotlinx/coroutines.channels/BufferedChannel.kt:1 )
at kotlinx.coroutines.channels.BufferedChannel$BufferedChannelIterator.tryResumeHasNext ( kotlinx/coroutines.channels/BufferedChannel.kt:1717 )
at kotlinx.coroutines.channels.BufferedChannel.tryResumeReceiver ( kotlinx/coroutines.channels/BufferedChannel.kt:665 )
at kotlinx.coroutines.channels.BufferedChannel.updateCellSend ( kotlinx/coroutines.channels/BufferedChannel.kt:481 )
at kotlinx.coroutines.channels.BufferedChannel.access$updateCellSend ( kotlinx/coroutines.channels/BufferedChannel.kt:36 )
at kotlinx.coroutines.channels.BufferedChannel.send$suspendImpl ( kotlinx/coroutines.channels/BufferedChannel.kt:3120 )
at kotlinx.coroutines.channels.BufferedChannel.send
at havelock.gateway.subscription.events.Event$emit$1$1$1.invokeSuspend ( havelock/gateway.subscription.events/Event.kt:41 )
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith ( kotlin/coroutines.jvm.internal/ContinuationImpl.kt:33 )
at kotlinx.coroutines.DispatchedTask.run ( kotlinx/coroutines/DispatchedTask.kt:104 )
at io.opentelemetry.javaagent.instrumentation.kotlinxcoroutines.RunnableWrapper.lambda$stopPropagation$0 ( io/opentelemetry.javaagent.instrumentation.kotlinxcoroutines/RunnableWrapper.java:16 )
at java.util.concurrent.ThreadPerTaskExecutor$TaskRunner.run
at java.lang.VirtualThread.run

Caused by: java.util.concurrent.RejectedExecutionException
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution
at java.util.concurrent.ThreadPoolExecutor.reject
at java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute
at java.util.concurrent.ScheduledThreadPoolExecutor.schedule
at java.util.concurrent.ScheduledThreadPoolExecutor.submit
at reactor.core.scheduler.Schedulers.workerSchedule ( Schedulers.java:1410 )
at reactor.core.scheduler.ExecutorServiceWorker.schedule ( ExecutorServiceWorker.java:50 )
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.trySchedule ( FluxPublishOn.java:312 )

There is nothing in these stack traces that I can see that helps me diagnose this further, and I can't reliably reproduce the issue.

The less frequently encountered one is:

java.io.IOException: The current thread was interrupted while waiting for a blocking send to complete
at org.apache.tomcat.websocket.WsRemoteEndpointImplBase.sendMessageBlockInternal ( org/apache.tomcat.websocket/WsRemoteEndpointImplBase.java:308 )
at org.apache.tomcat.websocket.WsRemoteEndpointImplBase.sendMessageBlock ( org/apache.tomcat.websocket/WsRemoteEndpointImplBase.java:266 )
at org.apache.tomcat.websocket.WsRemoteEndpointImplBase.sendMessageBlock ( org/apache.tomcat.websocket/WsRemoteEndpointImplBase.java:250 )
at org.apache.tomcat.websocket.WsRemoteEndpointImplBase.sendPartialString ( org/apache.tomcat.websocket/WsRemoteEndpointImplBase.java:223 )
at org.apache.tomcat.websocket.WsRemoteEndpointBasic.sendText ( org/apache.tomcat.websocket/WsRemoteEndpointBasic.java:48 )
at org.springframework.web.socket.adapter.standard.StandardWebSocketSession.sendTextMessage ( org/springframework.web.socket.adapter.standard/StandardWebSocketSession.java:217 )
at org.springframework.web.socket.adapter.AbstractWebSocketSession.sendMessage ( org/springframework.web.socket.adapter/AbstractWebSocketSession.java:108 )
at org.springframework.graphql.server.webmvc.GraphQlWebSocketHandler.lambda$handleInternal$2 ( org/springframework.graphql.server.webmvc/GraphQlWebSocketHandler.java:245 )
at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.ContextPropagationOperator$RunnableWrapper.run ( io/opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1/ContextPropagationOperator.java:373 )
at reactor.core.scheduler.SchedulerTask.call ( reactor/core.scheduler/SchedulerTask.java:68 )
at reactor.core.scheduler.SchedulerTask.call ( reactor/core.scheduler/SchedulerTask.java:28 )
at java.util.concurrent.FutureTask.run
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run
at java.util.concurrent.ThreadPoolExecutor.runWorker
at java.util.concurrent.ThreadPoolExecutor$Worker.run
at java.lang.Thread.run

Caused by: java.lang.InterruptedException
at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos
at java.util.concurrent.Semaphore.tryAcquire
at org.apache.tomcat.websocket.WsRemoteEndpointImplBase.acquireMessagePartInProgressSemaphore ( WsRemoteEndpointImplBase.java:355 )
at org.apache.tomcat.websocket.server.WsRemoteEndpointImplServer.acquireMessagePartInProgressSemaphore ( WsRemoteEndpointImplServer.java:146 )
at org.apache.tomcat.websocket.WsRemoteEndpointImplBase.sendMessageBlockInternal ( WsRemoteEndpointImplBase.java:298 ) 

which I think happens upon application shutdown.

If it's any help, we're returning Kotlin Flow-s from our controller subscription methods, which are backed by Kotlin Channel-s. We create a Channel, send to it asynchronously and convert the Channel to a Flow via consumeAsFlow.
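Roughly, the pattern looks like this (a simplified sketch for illustration, not our actual controller - the names, the producer loop and the delay are made up here):

import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.launch
import org.springframework.graphql.data.method.annotation.SubscriptionMapping
import org.springframework.stereotype.Controller

data class Foo(val x: Int)

@Controller
class FooSubscriptionController {

    @SubscriptionMapping
    fun fooSubscription(): Flow<Foo> {
        // Rendezvous channel: send() suspends until the subscriber consumes the element.
        val channel = Channel<Foo>()

        // Asynchronous producer; in the real application this is fed by domain events.
        CoroutineScope(Dispatchers.Default).launch {
            var i = 0
            while (true) {
                try {
                    channel.send(Foo(i++))
                } catch (e: CancellationException) {
                    // The subscription went away; stop producing.
                    break
                }
                delay(1000)
            }
        }

        // Spring adapts the returned Flow to a Publisher for the GraphQL subscription.
        return channel.consumeAsFlow()
    }
}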

Can you look into those or let me know how I can further diagnose what's going on?

Also are there any plans on having another implementation of GraphQL subscriptions that doesn't use the Reactive stack? We would much prefer Kotlin coroutines or virtual threads and that's the only bit in Spring that still forces us to deal with the Reactive stack and it's exactly issues like these that are very hard to diagnose that we'd like to avoid.

@spring-projects-issues spring-projects-issues added the status: waiting-for-triage An issue we've not yet triaged label Dec 4, 2024
@bclozel
Member

bclozel commented Dec 5, 2024

This type of error usually happens when a publisher doesn't behave as it should and keeps publishing even if backpressure is ongoing and there is no demand from the subscriber.

Looking at the issue description, this looks like a likely candidate:

If it's any help, we're returning Kotlin Flow-s from our controller subscription methods, which are backed by Kotlin Channel-s. We create a Channel, send to it asynchronously and convert the Channel to a Flow via consumeAsFlow.

Would you mind sharing a minimal sample application that does just that?

@bclozel bclozel added the status: waiting-for-feedback We need additional information before we can continue label Dec 5, 2024
@yassenb
Author

yassenb commented Dec 6, 2024

Sure, here's the simplified demonstration of our usage:
https://github.com/Havelock-JSC/playground/tree/backpressure - the backpressure branch.
It shouldn't be a problem with the application code since it's a "rendezvous" channel and the send to it should suspend until the client reads the data (the browser receives the next message in the GraphQL subscription) or the channel is closed (because the client terminated the GraphQL subscription). So there is no buffering and no backpressure in the application code itself. My guess would be that when the client doesn't disconnect gracefully but rather loses network connectivity, Spring is doing what you describe - "keeps publishing even if backpressure is ongoing" - and not discarding and canceling the subscription.
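To make the rendezvous semantics concrete, here's a tiny standalone sketch (my own illustration, not part of the sample):

import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    val channel = Channel<Int>() // default capacity: rendezvous
    launch {
        println("sending")
        channel.send(1)          // suspends here until receive() below runs
        println("send completed")
    }
    delay(1000)                  // the sender stays suspended during this delay
    println("received ${channel.receive()}")
}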

@spring-projects-issues spring-projects-issues added status: feedback-provided Feedback has been provided and removed status: waiting-for-feedback We need additional information before we can continue labels Dec 6, 2024
@bclozel
Member

bclozel commented Dec 6, 2024

Thanks for the sample @yassenb

I know that the sample is probably synthetic and simplifying quite a lot, but the behavior displayed looks problematic to me. When the client goes away, the channel keeps publishing indefinitely:

Writing = Foo(x=11)
Writing = Foo(x=12)
Writing = Foo(x=13)
Writing = Foo(x=14)
Writing = Foo(x=15)
Writing = Foo(x=16)
Writing = Foo(x=17)
kotlinx.coroutines.JobCancellationException: FlowSubscription was cancelled; job=FlowSubscription{Cancelled}@5e905b7
Writing = Foo(x=18)
kotlinx.coroutines.JobCancellationException: FlowSubscription was cancelled; job=FlowSubscription{Cancelled}@5e905b7
Writing = Foo(x=19)
kotlinx.coroutines.JobCancellationException: FlowSubscription was cancelled; job=FlowSubscription{Cancelled}@5e905b7

As far as I understand, the Channel to Flow integration is quite low level and I'm not sure the backpressure is handled automatically for you, nor all the close/cancel semantics. I would suggest reaching out to the Kotlin community and asking about that particular point for consumeAsFlow. Is this Channel to Flow conversion respecting the backpressure? Are there any concerns or housekeeping tasks that you should be aware of?

It shouldn't be a problem with the application code since it's a "rendezvous" channel and the send to it should suspend until the client reads the data (the browser receives the next message in the GraphQL subscription) or the channel is closed (because the client terminated the GraphQL subscription).

The consumeAsFlow method creates a FlowCollector, which might be queuing elements behind the scenes. Again, I'm far from being an expert on this, but I think the Kotlin community should be able to help.

So there is no buffering and no backpressure in the application code itself. My guess would be that when the client doesn't disconnect gracefully but rather loses network connectivity, Spring is doing what you describe - "keeps publishing even if backpressure is ongoing" - and not discarding and canceling the subscription.

I have tried to reproduce this without Channels being involved and couldn't. For example the following:

    @SubscriptionMapping
    fun getFooSubscription(): Flow<Foo> {
        return Flux.interval(Duration.ofSeconds(1))
            .map { Foo(it.toInt()) }
            .doOnNext { println("Writing = $it") }
            .doOnCancel { println("SubscriptionController cancelled") }
            .asFlow()
    }

Prints this:

Writing = Foo(x=0)
Writing = Foo(x=1)
Writing = Foo(x=2)
Writing = Foo(x=3)
Writing = Foo(x=4)
Writing = Foo(x=5)
SubscriptionController cancelled

The upstream publication is cancelled, so I don't know how things would be piling up in this case.

Also are there any plans on having another implementation of GraphQL subscriptions that doesn't use the Reactive stack? We would much prefer Kotlin coroutines or virtual threads and that's the only bit in Spring that still forces us to deal with the Reactive stack and it's exactly issues like these that are very hard to diagnose that we'd like to avoid.

This is a complex topic, but here are a couple of pointers. The graphql-java project is using Publisher for subscriptions support, so reactive-streams is kind of a requirement. I think that Kotlin Flow and Flux are semantically close, even if there are some differences with the execution model that we're working on improving.

Thanks!

@bclozel bclozel added status: waiting-for-feedback We need additional information before we can continue and removed status: feedback-provided Feedback has been provided labels Dec 6, 2024
@yassenb
Author

yassenb commented Dec 6, 2024

Thanks for digging deeper!

You are right that I have a leak of channel objects being left in memory after the subscription has ended. I will fix this in our production code and let you know if it fixes the issues, but I doubt it will. Notice this part in the example code:

catch (exception: CancellationException) {
    // ...
}

From what I understand, when a subscription is canceled Spring unsubscribes from the Flux. That Flux has an underlying coroutine that runs the Flow, so when the Flux is unsubscribed that coroutine job is canceled, and any subsequent attempts to send to the channel produce the cancellation exception you've observed; that exception is caught and swallowed. And in any case, given that the Flux is unsubscribed, what happens to the underlying Flow and its underlying Channel should not be of concern to Spring. As you can observe, we don't get the exceptions and stack traces I've reported even after the subscription ends and the send-s to the Channel start throwing the cancellation exception.

The graphql-java project is using Publisher for subscriptions support (graphql-java/graphql-java#754), so reactive-streams is kind of a requirement

Yes, the library is a requirement, but org.reactivestreams.Publisher is just an interface, so in theory it could be implemented with a different backing mechanism like virtual threads or Flow-s.

@spring-projects-issues spring-projects-issues added status: feedback-provided Feedback has been provided and removed status: waiting-for-feedback We need additional information before we can continue labels Dec 6, 2024
@bclozel
Member

bclozel commented Dec 6, 2024

Let's take a step back.

The first and second stacktraces you've shared show a symptom that points to messages overflowing the queue and to lifecycle management issues, because a producer does not honor the spec. This can happen at runtime; the client doesn't necessarily need to go away. Or can you confirm this only happens when clients go away?

The second case could very well be linked to the server/client going away and is less concerning in my opinion.

My previous comment was merely stating that working with reactive bridges can be tricky and that the issue is most likely there.

From what I understand, when a subscription is canceled Spring unsubscribes from the Flux. That Flux has an underlying coroutine that runs the Flow, so when the Flux is unsubscribed that coroutine job is canceled, and any subsequent attempts to send to the channel produce the cancellation exception you've observed; that exception is caught and swallowed.

I understand that, but this does not explain what is overwhelming the consumer in this case, and I would like to track this down first. I have seen similar issues in the past, but they usually happen when a Publisher doesn't honor the spec. We have lots of production usage with Flux, Flow and many other types supported by our ReactiveAdapterRegistry, which is why I would like to ensure that this Channel->Flow bridge is safe. Can you let us know what the Kotlin experts say about this?

And in any case, given that the Flux is unsubscribed, what happens to the underlying Flow and its underlying Channel should not be of concern to Spring.

In any case, Spring is not producing any value, only that data fetcher is. If you can reproduce the problem by directly using a Flux or a Flow without a Channel being involved, this would be helpful. Being able to reproduce this more consistently would be ideal. I understand we don't have that for now, and that's why I'm trying to rule out candidates. In a similar fashion, I'm seeing quite a few io.opentelemetry calls in the stacks; can you reproduce the problem without that Java agent as well (again, for the sake of simplifying things and ruling things out)?

Yes, the library is a requirement, but org.reactivestreams.Publisher is just an interface, so in theory it could be implemented with a different backing mechanism like virtual threads or Flow-s.

Publisher is indeed just an interface, but the spec and TCK are not easy to implement. I don't understand what you would gain by replacing Reactor with Flow. You'd be more or less in the same situation.

I'll wait for your feedback.

@bclozel bclozel added status: waiting-for-feedback We need additional information before we can continue and removed status: feedback-provided Feedback has been provided labels Dec 6, 2024
@yassenb
Author

yassenb commented Dec 12, 2024

I have pushed a new commit to the branch https://github.com/Havelock-JSC/playground/tree/backpressure. There are now two implementations of a simple subscription - one that is Kotlin-based and one purely with Reactive streams using Flux.

The Flux implementation pretty much mimics the way Kotlin converts a Flow backed by a Channel to a Flux (apart from the Semaphore bit; Kotlin uses an atomic counter and coroutines for synchronization). If you look at the implementations you'll see that it's not only the cryptic stacktraces that are my gripe with Reactive but also the way simple things have to be implemented. Having to use a Semaphore for synchronization like that is far from elegant in my view.

The Reactive world forces you to think about backpressure, but in my case there is no good way to deal with it. I have to either drop data or use an unbounded buffer, which could lead to an unwanted memory increase, and in the case of a badly behaving client this memory may never be freed. I don't want to deal with backpressure; I want to suspend (with a timeout) when there's a new message to be sent in a GraphQL subscription and resume only when the client actually reads the data. In other words, I want a pull-based implementation where the client pulls data when it's ready to receive it, instead of a push-based one where you push to a buffer completely decoupled from the receiving end. While Flow is conceptually similar to Flux, this demonstrates the greater flexibility and simplicity Flow gives you. You can have a buffer of arbitrary size, or an unlimited one, or size 1, which basically means not emitting until the client is ready. And you can choose what to do when the buffer is full - suspend, error or drop.
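To illustrate that flexibility, here's a quick sketch of Flow's buffer operator (my own illustration, not code from the sample repository):

import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.flow

// A producer that emits increasing numbers; emit() itself suspends until
// downstream is ready when no extra buffering is configured.
fun numbers(): Flow<Int> = flow {
    var i = 0
    while (true) {
        emit(i++)
    }
}

// No buffer at all: the producer suspends until the consumer takes each element.
val pullBased: Flow<Int> = numbers().buffer(Channel.RENDEZVOUS)

// Bounded buffer that drops the oldest element instead of suspending the producer.
val lossy: Flow<Int> = numbers().buffer(capacity = 16, onBufferOverflow = BufferOverflow.DROP_OLDEST)

// Unbounded buffer: the producer is never suspended, at the cost of unbounded memory.
val unbounded: Flow<Int> = numbers().buffer(Channel.UNLIMITED)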

To rule out any concerns you might have with the Kotlin implementation, let's consider only the Flux one. Again, what it should achieve is never being in a situation where there's backpressure: the application code only emits results when requested. You can also notice that, apart from the implementation being cumbersome, it suffers from another disadvantage - when the subscription is canceled, one more value is still always emitted. If you switch the FluxSink.OverflowStrategy to ERROR you'll see an exception that the socket has been closed when this last value is emitted after the subscription has been canceled. But that's not a big concern for me; at least it's an understandable error. However, even with the default, which is BUFFER, and this code deployed in my production environment, I continue getting the stacktraces outlined above, and I can't see how the application code could be misbehaving and disrespecting backpressure requests when it emits only what is requested and only up until the point the subscription is canceled.
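For reference, the shape of that Flux implementation is roughly the following (a simplified sketch of the approach, not the exact code from the branch - the counter, the thread handling and the names are made up here):

import reactor.core.publisher.Flux
import reactor.core.publisher.FluxSink
import java.util.concurrent.Semaphore
import kotlin.concurrent.thread

// Emits only when downstream has signalled demand; the producer blocks otherwise,
// so no more items should ever be in flight than were requested.
fun counterFlux(): Flux<Int> =
    Flux.create<Int>({ sink ->
        val permits = Semaphore(0)
        // Each request(n) from downstream releases n permits to the producer thread.
        sink.onRequest { n -> permits.release(minOf(n, Int.MAX_VALUE.toLong()).toInt()) }
        val producer = thread(name = "counter-producer", isDaemon = true) {
            var i = 0
            try {
                while (!sink.isCancelled) {
                    permits.acquire()          // wait for demand before emitting
                    if (!sink.isCancelled) {
                        sink.next(i++)
                    }
                }
            } catch (ignored: InterruptedException) {
                // interrupted on dispose; just stop producing
            }
        }
        sink.onDispose { producer.interrupt() }
    }, FluxSink.OverflowStrategy.BUFFER)       // switch to ERROR to surface overflow instead of buffering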

Ruling out other factors like the OpenTelemetry agent would be a bit harder because it would mean losing observability for a while in my production environment. I think the evidence above is sufficient to demonstrate that there's something off here - receiving backpressure errors when there is an unbounded buffer and, more importantly, the application ensures it's delivering only what's requested by the framework (and apologies if this turns out to be an issue with graphql-java, but I haven't studied the code enough to know where the boundary is and where the issue might be). Probably the next step would be to reliably reproduce this with either a stress test or a way to control a client (if the issue arises when a client is slow or never responds), but I have no experience with tools that would help with this, so it could take me a while to get back to you on that.

Thanks again for your valuable feedback and for taking the time to look into this.

@spring-projects-issues spring-projects-issues added status: feedback-provided Feedback has been provided and removed status: waiting-for-feedback We need additional information before we can continue labels Dec 12, 2024
@bclozel
Member

bclozel commented Dec 16, 2024

I've had a look and unfortunately I couldn't reproduce the behavior you're describing. Using Flux.create seems like the right way to go, although Sink mentions concurrent subscriptions as a limitation - maybe this is the case here?

I'm sorry, but I can't spend more time on this issue if there is nothing actionable on my side. If you can consistently reproduce those stacktraces locally, let me know how to do this. Thanks!

@bclozel bclozel closed this as not planned Dec 16, 2024
@bclozel bclozel added status: invalid An issue that we don't feel is valid and removed status: feedback-provided Feedback has been provided status: waiting-for-triage An issue we've not yet triaged labels Dec 16, 2024
@yassenb
Author

yassenb commented Dec 16, 2024

Spring or graphql-java is doing the subscribing, so I don't know if there could be any concurrency issues there. All the application code is doing is creating the Flux and returning it; the framework then subscribes.

Thanks anyway, I will post here if I have any further findings.

@bclozel
Member

bclozel commented Dec 16, 2024

Teams are running in production with this at scale, so the problem is probably not easy to pinpoint.

@yassenb
Author

yassenb commented Dec 16, 2024

Agreed. I see 3 options:

  1. Try to set up a stress test and hope it reproduces the issue
  2. Try to find a tool or build a client that intentionally delays or never receives messages and, again, hope the problem surfaces there
  3. Study the framework and library code and try to deduce how it might occur

Out of the three, I think option 3 is the most viable for me. Again, I don't see how the demonstrated application code (which is identical to my production code in all key aspects) can lead to backpressure problems by itself, so it must be something in the way the subscription to the returned Flux is created or manipulated. It could be OpenTelemetry after all; I'll look into disabling it as well.

@bclozel
Member

bclozel commented Dec 16, 2024

I would seriously avoid 3), especially if you dislike reactive streams in the first place. Without a hint about why a problem occurs, this is basically trying to find a complex concurrency issue by just looking at a codebase and all its libraries.

I have tried both 1) and 2) with your samples, unsuccessfully. There are many stress test tools and proxies that can help with that.

@yassenb
Author

yassenb commented Dec 19, 2024

Here's a stack trace after disabling OpenTelemetry Reactor tracing, which rules it out as a culprit:

 reactor.core.Exceptions$OverflowException: Queue is full: Reactive Streams source doesn't respect backpressure
at reactor.core.Exceptions.failWithOverflow ( reactor/core/Exceptions.java:251 )
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.onNext ( reactor/core.publisher/FluxPublishOn.java:233 )
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext ( reactor/core.publisher/MonoFlatMapMany.java:251 )
at org.springframework.security.config.annotation.web.configuration.SecurityReactorContextConfiguration$SecurityReactorContextSubscriber.onNext ( org/springframework.security.config.annotation.web.configuration/SecurityReactorContextConfiguration.java:186 )
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext ( reactor/core.publisher/FluxOnErrorResume.java:79 )
at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onNext ( reactor/core.publisher/FluxConcatArray.java:180 )
at reactor.core.publisher.FluxMap$MapSubscriber.onNext ( reactor/core.publisher/FluxMap.java:122 )
at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext ( reactor/core.publisher/FluxPeek.java:200 )
at reactor.core.publisher.FluxMap$MapSubscriber.onNext ( reactor/core.publisher/FluxMap.java:122 )
at graphql.execution.reactive.CompletionStageSubscriber.whenNextFinished ( graphql/execution.reactive/CompletionStageSubscriber.java:95 )
at graphql.execution.reactive.CompletionStageSubscriber.lambda$whenComplete$0 ( graphql/execution.reactive/CompletionStageSubscriber.java:78 )
at java.util.concurrent.CompletableFuture.uniWhenComplete
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire
at java.util.concurrent.CompletableFuture.postComplete
at java.util.concurrent.CompletableFuture.complete
at graphql.execution.ExecutionStrategy.lambda$buildFieldValueMap$2 ( graphql/execution/ExecutionStrategy.java:283 )
at java.util.concurrent.CompletableFuture.uniWhenComplete
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire
at java.util.concurrent.CompletableFuture.postComplete
at java.util.concurrent.CompletableFuture.complete
at graphql.execution.Async$Many.lambda$await$0 ( graphql/execution/Async.java:226 )
at java.util.concurrent.CompletableFuture.uniWhenComplete
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire
at java.util.concurrent.CompletableFuture.postComplete
at java.util.concurrent.CompletableFuture.complete
at graphql.execution.ExecutionStrategy.lambda$buildFieldValueMap$2 ( graphql/execution/ExecutionStrategy.java:283 )
at java.util.concurrent.CompletableFuture.uniWhenComplete
at java.util.concurrent.CompletableFuture.uniWhenCompleteStage
at java.util.concurrent.CompletableFuture.whenComplete
at graphql.execution.ExecutionStrategy.lambda$executeObject$0 ( graphql/execution/ExecutionStrategy.java:234 )
at java.util.concurrent.CompletableFuture.uniWhenComplete
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire
at java.util.concurrent.CompletableFuture.postComplete
at java.util.concurrent.CompletableFuture.complete
at graphql.execution.Async$Many.lambda$await$0 ( graphql/execution/Async.java:226 )
at java.util.concurrent.CompletableFuture.uniWhenComplete
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire
at java.util.concurrent.CompletableFuture.postComplete
at java.util.concurrent.CompletableFuture.complete
at reactor.core.publisher.MonoToCompletableFuture.onNext ( reactor/core.publisher/MonoToCompletableFuture.java:64 )
at org.springframework.security.config.annotation.web.configuration.SecurityReactorContextConfiguration$SecurityReactorContextSubscriber.onNext ( org/springframework.security.config.annotation.web.configuration/SecurityReactorContextConfiguration.java:186 )
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext ( reactor/core.publisher/FluxContextWrite.java:107 )
at reactor.core.publisher.MonoCompletionStage$MonoCompletionStageSubscription.apply ( reactor/core.publisher/MonoCompletionStage.java:121 )
at reactor.core.publisher.MonoCompletionStage$MonoCompletionStageSubscription.apply ( reactor/core.publisher/MonoCompletionStage.java:67 )
at java.util.concurrent.CompletableFuture.uniHandle
at java.util.concurrent.CompletableFuture$UniHandle.tryFire
at java.util.concurrent.CompletableFuture.postComplete
at java.util.concurrent.CompletableFuture.complete
at org.dataloader.DataLoaderHelper.lambda$dispatchQueueBatch$2 ( org/dataloader/DataLoaderHelper.java:267 )
at java.util.concurrent.CompletableFuture$UniApply.tryFire
at java.util.concurrent.CompletableFuture.postComplete
at java.util.concurrent.CompletableFuture.complete
at reactor.core.publisher.MonoToCompletableFuture.onNext ( reactor/core.publisher/MonoToCompletableFuture.java:64 )
at org.springframework.security.config.annotation.web.configuration.SecurityReactorContextConfiguration$SecurityReactorContextSubscriber.onNext ( org/springframework.security.config.annotation.web.configuration/SecurityReactorContextConfiguration.java:186 )
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext ( reactor/core.publisher/FluxContextWrite.java:107 )
at reactor.core.publisher.MonoCompletionStage$MonoCompletionStageSubscription.apply ( reactor/core.publisher/MonoCompletionStage.java:121 )
at reactor.core.publisher.MonoCompletionStage$MonoCompletionStageSubscription.apply ( reactor/core.publisher/MonoCompletionStage.java:67 )
at java.util.concurrent.CompletableFuture.uniHandle
at java.util.concurrent.CompletableFuture$UniHandle.tryFire
at java.util.concurrent.CompletableFuture.postComplete
at java.util.concurrent.CompletableFuture.complete
at org.springframework.graphql.data.method.InvocableHandlerMethodSupport.lambda$adaptCallable$1 ( org/springframework.graphql.data.method/InvocableHandlerMethodSupport.java:158 )
at java.util.concurrent.ThreadPerTaskExecutor$TaskRunner.run
at java.lang.VirtualThread.run
