Command Service Queue Has Been Shutdown

Hi, it’s me again.

I encountered this exception in the middle of a stress test, but I can't find any error logs on the domain, mediator, sequencer, or participant nodes.

io.reactivex.exceptions.OnErrorNotImplementedException: The exception was not handled due to missing onError handler in the subscribe() method call. Further reading: https://github.com/ReactiveX/RxJava/wiki/Error-Handling | java.lang.RuntimeException: java.util.concurrent.ExecutionException: io.grpc.StatusRuntimeException: UNAVAILABLE: SERVICE_NOT_RUNNING(1,56665d39): Command service queue has been shut down.
        at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:704)
        at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:701)
        at io.reactivex.internal.subscribers.LambdaSubscriber.onError(LambdaSubscriber.java:79)
        at io.reactivex.internal.subscribers.LambdaSubscriber.onNext(LambdaSubscriber.java:69)
        at io.reactivex.internal.operators.flowable.FlowableRetryPredicate$RetrySubscriber.onNext(FlowableRetryPredicate.java:74)
        at io.reactivex.internal.operators.flowable.FlowableFlattenIterable$FlattenIterableSubscriber.drain(FlowableFlattenIterable.java:312)
        at io.reactivex.internal.operators.flowable.FlowableFlattenIterable$FlattenIterableSubscriber.onNext(FlowableFlattenIterable.java:174)
        at io.reactivex.internal.operators.flowable.FlowableMap$MapSubscriber.onNext(FlowableMap.java:68)
        at com.daml.grpc.adapter.client.rs.BufferingResponseObserver.lambda$onNext$2(BufferingResponseObserver.java:60)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.RuntimeException: java.util.concurrent.ExecutionException: io.grpc.StatusRuntimeException: UNAVAILABLE: SERVICE_NOT_RUNNING(1,56665d39): Command service queue has been shut down.
        at io.reactivex.internal.util.ExceptionHelper.wrapOrThrow(ExceptionHelper.java:46)
        at io.reactivex.internal.observers.BlockingMultiObserver.blockingGet(BlockingMultiObserver.java:93)
        at io.reactivex.Single.blockingGet(Single.java:2870)
        at com.hkjc.wpp.lpm.payout.service.listener.PoolStatusProcess.createNewPayoutSummary(PoolStatusProcess.java:268)
        at com.hkjc.wpp.lpm.payout.service.listener.PoolStatusProcess.poolPayoutStartedProcess(PoolStatusProcess.java:167)
        at com.hkjc.wpp.lpm.payout.service.listener.PoolStatusProcess.poolStatusProcess(PoolStatusProcess.java:69)
        at com.hkjc.wpp.lpm.payout.service.listener.PoolStatusListenService.lambda$grpcListenPoolStatusService$0(PoolStatusListenService.java:99)
        at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
        at com.hkjc.wpp.lpm.payout.service.listener.PoolStatusListenService.lambda$grpcListenPoolStatusService$1(PoolStatusListenService.java:86)
        at io.reactivex.internal.subscribers.LambdaSubscriber.onNext(LambdaSubscriber.java:65)
        ... 8 more
Caused by: java.util.concurrent.ExecutionException: io.grpc.StatusRuntimeException: UNAVAILABLE: SERVICE_NOT_RUNNING(1,56665d39): Command service queue has been shut down.
        at com.google.common.util.concurrent.AbstractFuture.getDoneValue(AbstractFuture.java:588)
        at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:567)
        at io.reactivex.internal.operators.flowable.FlowableFromFuture.subscribeActual(FlowableFromFuture.java:42)
        at io.reactivex.Flowable.subscribe(Flowable.java:14935)
        at io.reactivex.internal.operators.flowable.FlowableSingleSingle.subscribeActual(FlowableSingleSingle.java:39)
        at io.reactivex.Single.subscribe(Single.java:3666)
        at io.reactivex.Single.blockingGet(Single.java:2869)
        ... 15 more
Caused by: io.grpc.StatusRuntimeException: UNAVAILABLE: SERVICE_NOT_RUNNING(1,56665d39): Command service queue has been shut down.
        at io.grpc.Status.asRuntimeException(Status.java:535)
        at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:534)
        at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:562)
        at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:70)
        at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:743)
        at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:722)
        at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
        at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
        ... 3 more
2023-04-12 16:46:26.148 ERROR [client-0] com.daml.grpc.adapter.SingleThreadExecutionSequencer : Unhandled exception in SingleThreadExecutionSequencer.
io.reactivex.exceptions.OnErrorNotImplementedException: The exception was not handled due to missing onError handler in the subscribe() method call. Further reading: https://github.com/ReactiveX/RxJava/wiki/Error-Handling | java.lang.RuntimeException: java.util.concurrent.ExecutionException: io.grpc.StatusRuntimeException: UNAVAILABLE: SERVICE_NOT_RUNNING(1,56665d39): Command service queue has been shut down.
        at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:704)
        at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:701)
        at io.reactivex.internal.subscribers.LambdaSubscriber.onError(LambdaSubscriber.java:79)
        at io.reactivex.internal.subscribers.LambdaSubscriber.onNext(LambdaSubscriber.java:69)
        at io.reactivex.internal.operators.flowable.FlowableRetryPredicate$RetrySubscriber.onNext(FlowableRetryPredicate.java:74)
        at io.reactivex.internal.operators.flowable.FlowableFlattenIterable$FlattenIterableSubscriber.drain(FlowableFlattenIterable.java:312)
        at io.reactivex.internal.operators.flowable.FlowableFlattenIterable$FlattenIterableSubscriber.onNext(FlowableFlattenIterable.java:174)
        at io.reactivex.internal.operators.flowable.FlowableMap$MapSubscriber.onNext(FlowableMap.java:68)
        at com.daml.grpc.adapter.client.rs.BufferingResponseObserver.lambda$onNext$2(BufferingResponseObserver.java:60)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.RuntimeException: java.util.concurrent.ExecutionException: io.grpc.StatusRuntimeException: UNAVAILABLE: SERVICE_NOT_RUNNING(1,56665d39): Command service queue has been shut down.
        at io.reactivex.internal.util.ExceptionHelper.wrapOrThrow(ExceptionHelper.java:46)
        at io.reactivex.internal.observers.BlockingMultiObserver.blockingGet(BlockingMultiObserver.java:93)
        at io.reactivex.Single.blockingGet(Single.java:2870)
        at com.hkjc.wpp.lpm.payout.service.listener.PoolStatusProcess.createNewPayoutSummary(PoolStatusProcess.java:268)
        at com.hkjc.wpp.lpm.payout.service.listener.PoolStatusProcess.poolPayoutStartedProcess(PoolStatusProcess.java:167)
        at com.hkjc.wpp.lpm.payout.service.listener.PoolStatusProcess.poolStatusProcess(PoolStatusProcess.java:69)
        at com.hkjc.wpp.lpm.payout.service.listener.PoolStatusListenService.lambda$grpcListenPoolStatusService$0(PoolStatusListenService.java:99)
        at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
        at com.hkjc.wpp.lpm.payout.service.listener.PoolStatusListenService.lambda$grpcListenPoolStatusService$1(PoolStatusListenService.java:86)
        at io.reactivex.internal.subscribers.LambdaSubscriber.onNext(LambdaSubscriber.java:65)
        ... 8 common frames omitted
Caused by: java.util.concurrent.ExecutionException: io.grpc.StatusRuntimeException: UNAVAILABLE: SERVICE_NOT_RUNNING(1,56665d39): Command service queue has been shut down.
        at com.google.common.util.concurrent.AbstractFuture.getDoneValue(AbstractFuture.java:588)
        at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:567)
        at io.reactivex.internal.operators.flowable.FlowableFromFuture.subscribeActual(FlowableFromFuture.java:42)
        at io.reactivex.Flowable.subscribe(Flowable.java:14935)
        at io.reactivex.internal.operators.flowable.FlowableSingleSingle.subscribeActual(FlowableSingleSingle.java:39)
        at io.reactivex.Single.subscribe(Single.java:3666)
        at io.reactivex.Single.blockingGet(Single.java:2869)
        ... 15 common frames omitted
Caused by: io.grpc.StatusRuntimeException: UNAVAILABLE: SERVICE_NOT_RUNNING(1,56665d39): Command service queue has been shut down.
        at io.grpc.Status.asRuntimeException(Status.java:535)
        at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:534)
        at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:562)
        at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:70)
        at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:743)
        at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:722)
        at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
        at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
        ... 3 common frames omitted

Hi @raphson93 , I’m looking into this for you. At first glance, it looks like a concurrently running process in a thread pool can’t find the gRPC service it’s looking for, and this is causing a cascading series of errors.

Thanks @Dylan_Thinnes.
Having to keep restarting all the services in the production environment was a real pain in the ass.
FYI the services are running with 32 threads and 64 GB of RAM.

Hi @raphson93 , some investigation revealed two potential issues: one in the Java bindings and one in the ledger API server, caused by a race. Both are now being tracked, thanks for the report :slight_smile:
I’m curious about your mention of needing to keep restarting all the services though. Do you mean restarting all the Canton nodes? This bug would cause the ledger API to temporarily fail requests, but the requests should be retryable (Error Codes — Daml SDK 2.6.1 documentation). It shouldn’t be necessary to restart Canton. Have you tried handling the error in your application and retrying the request?

Hi @Thibault , thanks for getting back to me. Is there a GitHub issue I can use to track this bug? Will it be fixed in the upcoming release?

Do you mean restarting all the canton nodes?

Yes. The services (mediator, sequencer, topology manager, participant) are hosted in AWS EKS. The app that threw the exception wouldn’t work until I restarted every single one of them. I will get my team to handle this exception soon. In the meantime, is there any workaround/code snippet you can share with me as well?

Hi @raphson93, thanks for the details. I’ll try to get back to you with an estimate when I have more info; I’ve just forwarded the issues to the relevant team. It’s surprising that you had to restart the nodes. How often has this happened? Only the one time, or is it somewhat recurrent? When you say the app won’t work until you restart, does it fail continually with the same error you posted, or with something different?
If it happens again, it would be useful if, before restarting, you could run participant.health.dump() on your participant (replacing participant with the name of your participant). This will create a zip file with debugging information that could help further clarify the issue.

As for a workaround to handle the exception, I’d suggest wrapping the call in a try/catch, catching OnErrorNotImplementedException, finding the initial cause, and retrying the call if it is an io.grpc.StatusRuntimeException: UNAVAILABLE: SERVICE_NOT_RUNNING. That’s definitely not ideal, but I can’t see a better way for the time being.
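Here is a minimal Java sketch of that approach, assuming the blocking ledger call (e.g. the Single.blockingGet() inside createNewPayoutSummary) is passed in as a Callable. The class name, retry count and backoff values are illustrative only, not part of any API:

    import io.grpc.Status;
    import io.grpc.StatusRuntimeException;
    import java.util.concurrent.Callable;

    // Illustrative sketch only: retries a blocking ledger call when the
    // command service queue has been shut down (UNAVAILABLE / SERVICE_NOT_RUNNING).
    public final class RetryOnServiceNotRunning {

        private static final int MAX_ATTEMPTS = 5;    // illustrative limit
        private static final long BACKOFF_MS = 500;   // illustrative backoff

        /** Runs the blocking call, retrying while the error is the retryable SERVICE_NOT_RUNNING. */
        public static <T> T callWithRetry(Callable<T> blockingCall) throws Exception {
            for (int attempt = 1; ; attempt++) {
                try {
                    return blockingCall.call();
                } catch (Exception e) {
                    if (attempt >= MAX_ATTEMPTS || !isServiceNotRunning(e)) {
                        throw e; // not retryable, or out of attempts
                    }
                    Thread.sleep(BACKOFF_MS * attempt); // simple linear backoff before retrying
                }
            }
        }

        /** Walks the cause chain looking for UNAVAILABLE / SERVICE_NOT_RUNNING from the ledger API. */
        private static boolean isServiceNotRunning(Throwable t) {
            for (Throwable cause = t; cause != null; cause = cause.getCause()) {
                if (cause instanceof StatusRuntimeException) {
                    StatusRuntimeException sre = (StatusRuntimeException) cause;
                    return sre.getStatus().getCode() == Status.Code.UNAVAILABLE
                            && sre.getMessage() != null
                            && sre.getMessage().contains("SERVICE_NOT_RUNNING");
                }
            }
            return false;
        }
    }

Note that, judging from your stack trace, the exception escapes from an onNext lambda, so the try/catch has to wrap the blockingGet() call itself (or the subscribe() call should be given an explicit onError handler); otherwise RxJava will keep reporting OnErrorNotImplementedException.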

Hi @Thibault , FYI I’m using Canton 2.6.1.

how often has this happened? Only the one time or is it somewhat recurrent?

It has happened occasionally.

When you said the app won’t work until you restart, does it fail continually with the same error you posted or something different?

It failed with the same error.

I will let you know once we hit this error again.

Thank you, I got confirmation that this will be fixed in 2.7 :slight_smile:

Hi @Thibault , please refer to the attached logs below, as requested: