GrpcServiceBridgeImpl should set exception handling callback #101

Open
hu-chia opened this issue Jun 14, 2024 · 4 comments · May be fixed by #102
Labels
bug Something isn't working

Comments

hu-chia commented Jun 14, 2024

Questions

When using GrpcServiceBridgeImpl, vertx-grpc does not behave as expected when the network connection fails.

Expected Behavior

void init(ServerCall.Listener<Req> listener) {
    this.listener = listener;
    req.errorHandler(error -> {
        if (error == GrpcError.CANCELLED && !closed) {
            listener.onCancel();
        }
    });
    readAdapter.init(req, new BridgeMessageDecoder<>(methodDef.getMethodDescriptor().getRequestMarshaller(), decompressor));
    writeAdapter.init(req.response(), new BridgeMessageEncoder<>(methodDef.getMethodDescriptor().getResponseMarshaller(), compressor));
}

If the network between client and server is interrupted, listener.onCancel() should be invoked.

Actual Behavior

Nothing happens: listener.onCancel() is never invoked.

Version

4.5.7


errorHandler is a method of GrpcReadStream. It is invoked only when the underlying HTTP stream fails with a StreamResetException; when the client is killed, an IOException occurs instead, so the handler is never called.

public void init() {
    stream.handler(this);
    stream.endHandler(v -> queue.write(END_SENTINEL));
    stream.exceptionHandler(err -> {
        if (err instanceof StreamResetException) {
            handleReset(((StreamResetException)err).getCode());
        } else {
            handleException(err);
        }
    });
    queue.drainHandler(v -> stream.resume());
    // ... (the excerpt ends here)

java.io.IOException: Broken pipe
	at java.base/sun.nio.ch.SocketDispatcher.writev0(Native Method)
	at java.base/sun.nio.ch.SocketDispatcher.writev(SocketDispatcher.java:66)
	at java.base/sun.nio.ch.IOUtil.write(IOUtil.java:227)
	at java.base/sun.nio.ch.IOUtil.write(IOUtil.java:158)
	at java.base/sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:574)
	at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:430)
	at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:931)
	at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:359)
	at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:895)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1372)
	at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:935)
	at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:921)
	at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:907)
	at io.netty.handler.codec.http2.Http2ConnectionHandler.flush(Http2ConnectionHandler.java:197)
	at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:941)
	at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:921)
	at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:907)
	at io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:967)
	at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:254)
	at io.vertx.core.http.impl.VertxHttp2ConnectionHandler.checkFlush(VertxHttp2ConnectionHandler.java:247)
	at io.vertx.core.http.impl.VertxHttp2ConnectionHandler.writeData(VertxHttp2ConnectionHandler.java:242)
	at io.vertx.core.http.impl.VertxHttp2Stream.doWriteData(VertxHttp2Stream.java:244)
	at io.vertx.core.http.impl.VertxHttp2Stream.writeData(VertxHttp2Stream.java:216)
	at io.vertx.core.http.impl.Http2ServerResponse.write(Http2ServerResponse.java:469)
	at io.vertx.core.http.impl.Http2ServerResponse.write(Http2ServerResponse.java:347)
	at io.vertx.core.http.impl.Http2ServerResponse.write(Http2ServerResponse.java:48)
	at io.vertx.grpc.server.impl.GrpcServerResponseImpl.writeMessage(GrpcServerResponseImpl.java:242)
	at io.vertx.grpc.server.impl.GrpcServerResponseImpl.writeMessage(GrpcServerResponseImpl.java:112)
	at io.vertx.grpc.server.impl.GrpcServerResponseImpl.write(GrpcServerResponseImpl.java:102)
	at io.vertx.grpc.server.impl.GrpcServerResponseImpl.write(GrpcServerResponseImpl.java:248)
	at io.vertx.core.streams.impl.PipeImpl.lambda$to$1(PipeImpl.java:81)
	at io.vertx.grpc.server.impl.GrpcServerRequestImpl.lambda$handler$0(GrpcServerRequestImpl.java:79)
	at io.vertx.grpc.common.impl.GrpcReadStreamBase.handleMessage(GrpcReadStreamBase.java:207)
	at io.vertx.grpc.common.impl.GrpcReadStreamBase.lambda$init$3(GrpcReadStreamBase.java:89)
	at io.vertx.core.streams.impl.InboundBuffer.handleEvent(InboundBuffer.java:255)
	at io.vertx.core.streams.impl.InboundBuffer.write(InboundBuffer.java:134)
	at io.vertx.grpc.common.impl.GrpcReadStreamBase.handle(GrpcReadStreamBase.java:125)
	at io.vertx.grpc.common.impl.GrpcReadStreamBase.handle(GrpcReadStreamBase.java:39)
	at io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:279)
	at io.vertx.core.http.impl.HttpEventHandler.handleChunk(HttpEventHandler.java:51)
	at io.vertx.core.http.impl.Http2ServerRequest.handleData(Http2ServerRequest.java:148)
	at io.vertx.core.http.impl.Http2ServerStream.handleData(Http2ServerStream.java:206)
	at io.vertx.core.http.impl.VertxHttp2Stream.lambda$new$1(VertxHttp2Stream.java:75)
	at io.vertx.core.streams.impl.InboundBuffer.handleEvent(InboundBuffer.java:255)
	at io.vertx.core.streams.impl.InboundBuffer.drain(InboundBuffer.java:242)
	at io.vertx.core.streams.impl.InboundBuffer.lambda$fetch$0(InboundBuffer.java:295)
	at io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:279)
	at io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:261)
	at io.vertx.core.impl.ContextInternal.lambda$runOnContext$0(ContextInternal.java:59)
	at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:566)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:1583)
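
To make the gap concrete, here is a minimal, self-contained sketch of the dispatch described above. It does not use vertx-grpc: FakeRequest, ResetException, and run are hypothetical stand-ins invented for illustration, and the closed flag from the original init is omitted for brevity. It only models the idea that resets reach the error handler while any other failure reaches a separate exception handler, so a bridge that registers only the error handler never sees the IOException. Registering an exception handler as well is one possible fix; whether the linked pull request takes this approach is not stated here.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class CancelOnFailureSketch {

    /** Hypothetical stand-in for a stream reset carrying an HTTP/2 error code. */
    static class ResetException extends RuntimeException {
        final long code;
        ResetException(long code) {
            super("stream reset: " + code);
            this.code = code;
        }
    }

    /** Hypothetical stand-in for the request stream; mirrors the dispatch in the excerpt. */
    static class FakeRequest {
        private Consumer<Long> errorHandler = code -> {};
        private Consumer<Throwable> exceptionHandler = err -> {};

        void errorHandler(Consumer<Long> handler) { this.errorHandler = handler; }
        void exceptionHandler(Consumer<Throwable> handler) { this.exceptionHandler = handler; }

        // Resets go to the error handler; every other failure goes to the exception handler.
        void fail(Throwable err) {
            if (err instanceof ResetException) {
                errorHandler.accept(((ResetException) err).code);
            } else {
                exceptionHandler.accept(err);
            }
        }
    }

    static final long CANCEL = 0x8L; // HTTP/2 CANCEL error code

    /** Wires a listener like the bridge's init does, optionally also reacting to exceptions. */
    static List<String> run(boolean alsoHandleExceptions, Throwable failure) {
        List<String> events = new ArrayList<>();
        Runnable onCancel = () -> events.add("onCancel");
        FakeRequest req = new FakeRequest();
        req.errorHandler(code -> {
            if (code == CANCEL) {
                onCancel.run();
            }
        });
        if (alsoHandleExceptions) {
            // The assumed fix: treat a transport failure as a cancellation too.
            req.exceptionHandler(err -> onCancel.run());
        }
        req.fail(failure);
        return events;
    }

    public static void main(String[] args) {
        IOException brokenPipe = new IOException("Broken pipe");
        System.out.println(run(false, new ResetException(CANCEL))); // prints [onCancel]
        System.out.println(run(false, brokenPipe));                 // prints []
        System.out.println(run(true, brokenPipe));                  // prints [onCancel]
    }
}
```

The second call corresponds to the behavior reported in this issue: the client dies, the failure is not a reset, and the listener is never told.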

@hu-chia hu-chia added the bug Something isn't working label Jun 14, 2024
Author

hu-chia commented Jun 14, 2024

I believe this is the same problem as #28, which also provides a workaround.

Author

hu-chia commented Jun 14, 2024

If you use the Vert.x gRPC server, the keep-alive workaround is unavailable; it depends on #74.

Member

vietj commented Jun 16, 2024

Can someone provide a reproducer for this?

Author

hu-chia commented Jun 17, 2024

@vietj here you are.

Step 1: define a bidirectional-streaming gRPC API

// filename: echo.proto
syntax = "proto3";

service EchoGrpc {
  rpc Bidirectional(stream Str) returns (stream Str) {}
}

message Str {
  string val = 1;
}

Step 2: generate the gRPC code

follow this

Step 3: implement the service and build a server

import Echo.Str;
import EchoGrpcGrpc.EchoGrpcImplBase;
import io.grpc.stub.StreamObserver;
import io.vertx.core.Vertx;
import io.vertx.grpc.server.GrpcServer;
import io.vertx.grpc.server.GrpcServiceBridge;

public class Main {
    
    public static void main(String... args) throws InterruptedException {
        var vertx = Vertx.vertx();
        
        var grpcServer = GrpcServer.server(vertx);
        var bridge = GrpcServiceBridge.bridge(new EchoGrpcImpl());
        bridge.bind(grpcServer);

        var httpServer = vertx.createHttpServer();
        httpServer.requestHandler(grpcServer);
        httpServer.listen(8080);
        
        Thread.currentThread().join();
    }
    
    static class EchoGrpcImpl extends EchoGrpcImplBase {

        @Override
        public StreamObserver<Str> bidirectional(StreamObserver<Str> responseObserver) {
            return new StreamObserver<Str>() {
                @Override public void onNext(Str value) {
                    System.out.println("on next " + value);
                    responseObserver.onNext(value);
                }

                @Override public void onError(Throwable t) {
                    System.out.println("on error");
                    t.printStackTrace(System.out);
                    responseObserver.onError(t);
                }

                @Override public void onCompleted() {
                    System.out.println("on completed");
                    responseObserver.onCompleted();
                }
            };
        }
    }
}

Step 4: disconnect while calling the API

Install grpcurl:
brew install grpcurl
Call the API, then press Ctrl + C.

You might see:

$ while true; do echo '{"val":"hello,world"}'; sleep 1; done | grpcurl -proto echo.proto -plaintext -d @ localhost:8080 EchoGrpc/Bidirectional
{
  "val": "hello,world"
}
{
  "val": "hello,world"
}
{
  "val": "hello,world"
}
^C

And on the server side:

on next val: "hello,world"

on next val: "hello,world"

on next val: "hello,world"

No completion or error message is printed after the client disconnects.

hu-chia pushed a commit to hu-chia/vertx-grpc that referenced this issue Jun 19, 2024
@hu-chia hu-chia linked a pull request Jun 19, 2024 that will close this issue
hu-chia added a commit to hu-chia/vertx-grpc that referenced this issue Jun 19, 2024
hu-chia added a commit to hu-chia/vertx-grpc that referenced this issue Jun 24, 2024
hu-chia added a commit to hu-chia/vertx-grpc that referenced this issue Jul 19, 2024
2 participants