While client closing, the election listener throws an exception #1409

Open
sefamertkaya opened this issue Sep 25, 2024 · 1 comment

sefamertkaya commented Sep 25, 2024

Versions

  • etcd: 3.5.16
  • jetcd: 0.8.3
  • java: 21

Describe the bug

While the etcd client is closing, the onError method of the listener that I registered through the observe call of the Election client is triggered. This only happens while the client is closing. With a breakpoint set, I saw that it happens while the ConnectionManager is closing.

To Reproduce

Client:

this.client = Client.builder()
    .connectTimeout(Duration.ofSeconds(this.sessionTimeout))
    .endpoints(this.endpoints.toArray(new String[]{}))
    .build();

Observer:

this.election.observe(this.electionByteSequence, new ElectionListener());

ElectionListener:

private static class ElectionListener implements Listener {

  @Override
  public void onNext(LeaderResponse response) {
    System.out.println("Leader lease id:" + response.getKv().getLease());
  }

  @Override
  public void onError(Throwable throwable) {
    throwable.printStackTrace();
  }

  @Override
  public void onCompleted() {
    System.out.println("Completed");
  }
}

Close:

public void disconnect() {
  this.leaseKeepAliveClient.close();
  this.lease.revoke(this.leaseId);
  this.client.close();
}

Exception:

io.etcd.jetcd.common.exception.EtcdException: Channel shutdownNow invoked
	at io.etcd.jetcd.common.exception.EtcdExceptionFactory.newEtcdException(EtcdExceptionFactory.java:35)
	at io.etcd.jetcd.common.exception.EtcdExceptionFactory.fromStatus(EtcdExceptionFactory.java:83)
	at io.etcd.jetcd.common.exception.EtcdExceptionFactory.toEtcdException(EtcdExceptionFactory.java:79)
	at io.etcd.jetcd.common.exception.EtcdExceptionFactory.toEtcdException(EtcdExceptionFactory.java:74)
	at io.etcd.jetcd.impl.ElectionImpl.lambda$observe$3(ElectionImpl.java:119)
	at io.vertx.grpc.stub.StreamObserverReadStream.onError(StreamObserverReadStream.java:44)
	at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:481)
	at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:574)
	at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:72)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:742)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:723)
	at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
	at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
	at io.vertx.grpc.VertxChannelBuilder.lambda$null$0(VertxChannelBuilder.java:305)
	at io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:276)
	at io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:258)
	at io.vertx.grpc.VertxChannelBuilder.lambda$build$1(VertxChannelBuilder.java:305)
	at io.grpc.internal.SerializingExecutor.schedule(SerializingExecutor.java:102)
	at io.grpc.internal.SerializingExecutor.execute(SerializingExecutor.java:95)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.closedInternal(ClientCallImpl.java:750)
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.closed(ClientCallImpl.java:690)
	at io.grpc.internal.RetriableStream$4.run(RetriableStream.java:840)
	at io.grpc.SynchronizationContext.drain(SynchronizationContext.java:94)
	at io.grpc.SynchronizationContext.execute(SynchronizationContext.java:126)
	at io.grpc.internal.RetriableStream.safeCloseMasterListener(RetriableStream.java:835)
	at io.grpc.internal.RetriableStream.access$2200(RetriableStream.java:55)
	at io.grpc.internal.RetriableStream$Sublistener.closed(RetriableStream.java:1028)
	at io.grpc.internal.ForwardingClientStreamListener.closed(ForwardingClientStreamListener.java:34)
	at io.grpc.internal.InternalSubchannel$CallTracingTransport$1$1.closed(InternalSubchannel.java:691)
	at io.grpc.internal.AbstractClientStream$TransportState.closeListener(AbstractClientStream.java:458)
	at io.grpc.internal.AbstractClientStream$TransportState.access$400(AbstractClientStream.java:221)
	at io.grpc.internal.AbstractClientStream$TransportState$1.run(AbstractClientStream.java:441)
	at io.grpc.internal.AbstractClientStream$TransportState.deframerClosed(AbstractClientStream.java:278)
	at io.grpc.internal.Http2ClientStreamTransportState.deframerClosed(Http2ClientStreamTransportState.java:31)
	at io.grpc.internal.MessageDeframer.close(MessageDeframer.java:234)
	at io.grpc.internal.AbstractStream$TransportState.closeDeframer(AbstractStream.java:199)
	at io.grpc.internal.AbstractClientStream$TransportState.transportReportStatus(AbstractClientStream.java:444)
	at io.grpc.internal.AbstractClientStream$TransportState.transportReportStatus(AbstractClientStream.java:400)
	at io.grpc.netty.NettyClientHandler$6.visit(NettyClientHandler.java:791)
	at io.netty.handler.codec.http2.DefaultHttp2Connection$ActiveStreams.forEachActiveStream(DefaultHttp2Connection.java:1007)
	at io.netty.handler.codec.http2.DefaultHttp2Connection.forEachActiveStream(DefaultHttp2Connection.java:209)
	at io.grpc.netty.NettyClientHandler.forcefulClose(NettyClientHandler.java:782)
	at io.grpc.netty.NettyClientHandler.write(NettyClientHandler.java:347)
	at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:895)
	at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:875)
	at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:984)
	at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:868)
	at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:863)
	at io.netty.channel.DefaultChannelPipeline.write(DefaultChannelPipeline.java:1010)
	at io.netty.channel.AbstractChannel.write(AbstractChannel.java:296)
	at io.grpc.netty.NettyClientTransport$6.run(NettyClientTransport.java:341)
	at io.grpc.netty.WriteQueue$RunnableCommand.run(WriteQueue.java:176)
	at io.grpc.netty.WriteQueue.flush(WriteQueue.java:128)
	at io.grpc.netty.WriteQueue.access$000(WriteQueue.java:35)
	at io.grpc.netty.WriteQueue$1.run(WriteQueue.java:47)
	at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:569)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:1583)

Expected behavior

I think the onCompleted method, rather than onError, should be triggered when the client closes. Maybe I did something wrong. Is there a different order for closing?
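Until the closing behavior is clarified, one possible workaround is to wrap the listener so that an error arriving after shutdown has started is reported as a completion. The following is only a minimal sketch under stated assumptions, not jetcd's own behavior or API: the `Listener` interface is a hypothetical stand-in for `Election.Listener` so the example compiles without jetcd, and `GuardedListener`, `markClosing`, and `RecordingListener` are illustrative names that do not exist in the library.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

public class ShutdownAwareListenerDemo {

    // Hypothetical stand-in for jetcd's Election.Listener (assumption), generic
    // so the sketch compiles without jetcd on the classpath.
    interface Listener<T> {
        void onNext(T response);
        void onError(Throwable throwable);
        void onCompleted();
    }

    // Wraps a delegate; an error that arrives after markClosing() is forwarded
    // as onCompleted() instead of onError().
    static final class GuardedListener<T> implements Listener<T> {
        private final Listener<T> delegate;
        private final AtomicBoolean closing = new AtomicBoolean(false);
        private final AtomicBoolean terminated = new AtomicBoolean(false);

        GuardedListener(Listener<T> delegate) {
            this.delegate = delegate;
        }

        // Call this right before closing the client.
        void markClosing() {
            closing.set(true);
        }

        @Override
        public void onNext(T response) {
            if (!terminated.get()) {
                delegate.onNext(response);
            }
        }

        @Override
        public void onError(Throwable throwable) {
            // Deliver at most one terminal event to the delegate.
            if (!terminated.compareAndSet(false, true)) {
                return;
            }
            if (closing.get()) {
                delegate.onCompleted();
            } else {
                delegate.onError(throwable);
            }
        }

        @Override
        public void onCompleted() {
            if (terminated.compareAndSet(false, true)) {
                delegate.onCompleted();
            }
        }
    }

    // Records events so the effect of the wrapper can be inspected.
    static final class RecordingListener implements Listener<String> {
        final List<String> events = new ArrayList<>();

        @Override
        public void onNext(String response) {
            events.add("next:" + response);
        }

        @Override
        public void onError(Throwable throwable) {
            events.add("error:" + throwable.getMessage());
        }

        @Override
        public void onCompleted() {
            events.add("completed");
        }
    }

    public static void main(String[] args) {
        RecordingListener recorder = new RecordingListener();
        GuardedListener<String> guarded = new GuardedListener<>(recorder);

        guarded.onNext("leader-1");
        guarded.markClosing();
        // Simulates the error delivered while the channel is shut down.
        guarded.onError(new RuntimeException("Channel shutdownNow invoked"));

        System.out.println(recorder.events); // prints [next:leader-1, completed]
    }
}
```

In the disconnect() method above, this would mean calling the hypothetical markClosing() before this.client.close(). Errors raised before that call are still forwarded to onError, so genuine failures are not hidden.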

@lburgazzoli
Collaborator

@sefamertkaya do you have any time to dig into this?
