While coding an RxJava-based program, I’m facing an exception that contains a recommendation for better handling:
io.reactivex.exceptions.OnErrorNotImplementedException: **The exception was not handled due to missing onError handler in the subscribe() method call.** Further reading: [https://github.com/ReactiveX/RxJava/wiki/Error-Handling](https://github.com/ReactiveX/RxJava/wiki/Error-Handling) | java.lang.RuntimeException: java.util.concurrent.ExecutionException: io.grpc.StatusRuntimeException: NOT_FOUND: CONTRACT_NOT_FOUND(11,d8368d0f): Contract could not be found with id 0054ff9fd1c537ece135de287c8fa14ff169ec9d6e16796ff7fe40562b6fb5d441
at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:704)
at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:701)
at io.reactivex.internal.subscribers.LambdaSubscriber.onError(LambdaSubscriber.java:79)
at io.reactivex.internal.subscribers.LambdaSubscriber.onNext(LambdaSubscriber.java:69)
at io.reactivex.internal.operators.flowable.FlowableFlattenIterable$FlattenIterableSubscriber.drain(FlowableFlattenIterable.java:312)
at io.reactivex.internal.operators.flowable.FlowableFlattenIterable$FlattenIterableSubscriber.onNext(FlowableFlattenIterable.java:174)
at io.reactivex.internal.operators.flowable.FlowableMap$MapSubscriber.onNext(FlowableMap.java:68)
at com.daml.grpc.adapter.client.rs.BufferingResponseObserver.lambda$onNext$2(BufferingResponseObserver.java:60)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.RuntimeException: java.util.concurrent.ExecutionException: io.grpc.StatusRuntimeException: NOT_FOUND: CONTRACT_NOT_FOUND(11,d8368d0f): Contract could not be found with id 0054ff9fd1c537ece135de287c8fa14ff169ec9d6e16796ff7fe40562b6fb5d441
at io.reactivex.internal.util.ExceptionHelper.wrapOrThrow(ExceptionHelper.java:46)
The thing is that the docs explain how to enhance the “Observe” and “Subscribe” methods with callbacks (as recommended above), but in my case I’m not explicitly subscribing or observing; I’m fetching the transactions and iterating over them like this:
I think there are many different concepts shown (both implicitly and explicitly) in the code you are sharing. I’ll go through them separately:
- The processTransaction method takes a transaction as its input. This transaction comes from a service that allows you to query the Ledger API. The result of this call is returned to you as an RxJava stream, on which you can use the methods described in the documentation you linked in your post.
- The entire body of the processTransaction method either uses a Java stream (not to be confused with an RxJava stream) or invokes a blocking method on an RxJava stream (the blockingGet in the third-to-last line of the snippet you shared). In this case, exception handling can happen as it normally would in any Java application, by wrapping a portion of code in the appropriate try ... catch block.
- If you were to use the output of submitAndWait in a non-blocking fashion (i.e. without the blockingGet method), its result type is Single, which you can see as an RxJava stream that ends after a single element is returned. As such, you could use the same exception handling methods that RxJava provides.
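As a rough illustration of the second point, here is a minimal sketch of wrapping a blocking call in try ... catch and digging the original error out of the wrapper exceptions, much like the RuntimeException / ExecutionException / StatusRuntimeException chain in the trace above. It uses only the JDK: submitBlocking is a hypothetical stand-in for the blocking ledger call (not the real bindings API), and an IllegalStateException stands in for the gRPC exception.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

class BlockingCallErrorHandling {

    // Hypothetical stand-in for a blocking ledger call. It always fails, with the
    // real error wrapped in an ExecutionException and then a RuntimeException.
    static String submitBlocking() {
        CompletableFuture<String> future = new CompletableFuture<>();
        future.completeExceptionally(
                new IllegalStateException("NOT_FOUND: contract could not be found"));
        try {
            return future.get();
        } catch (ExecutionException | InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    // Walk the cause chain down to the innermost exception.
    static Throwable rootCause(Throwable t) {
        Throwable current = t;
        while (current.getCause() != null && current.getCause() != current) {
            current = current.getCause();
        }
        return current;
    }

    // Ordinary Java exception handling around the blocking call.
    static String describeOutcome() {
        try {
            return "ok: " + submitBlocking();
        } catch (RuntimeException e) {
            return "failed: " + rootCause(e).getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(describeOutcome());
    }
}
```

In a real application the catch block would decide what to do with the failed command (log it, skip it, or rethrow) instead of returning a string.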
One final note: our example uses blockingGet for simplicity, but it’s probably not something you want to do in a production application, as RxJava is designed for you to always work in this “stream processing” world where nothing ever stops. Do not take that example as a model for what you should be doing with reactive stream libraries like RxJava. I would recommend you keep learning more about RxJava through its documentation and use the accepted patterns it suggests.
It depends on what you need to do. If you need a simple callback, doOnError will do. But it depends entirely on what kind of exception you want to handle and how you expect the application to respond in that case. The documentation you linked is a good source of information. Some experimentation can help you understand what the different exception handling methods do and how they can help you implement the exception handling logic you need.
Thanks to the help of @cocreature I have successfully coded an application that acts as an observer by subscribing to the Ledger transaction stream.
However, within the first few seconds, after the observable (Ledger) has streamed all the transactions to my end, I encounter an exception (below) that terminates the stream and prevents my program from doing the one thing it was designed to do: listen for new Ledger contracts (of a certain template).
I’m trying to temporarily swallow the error in order to prevent the stream from breaking. For that I added the callback below, but it never gets called. I followed the docs but can’t seem to find a lead.
io.reactivex.exceptions.OnErrorNotImplementedException: The exception was not handled due to missing onError handler in the subscribe() method call. Further reading: https://github.com/ReactiveX/RxJava/wiki/Error-Handling | io.grpc.StatusRuntimeException: INTERNAL: RST_STREAM closed stream. HTTP/2 error code: INTERNAL_ERROR
at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:704)
at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:701)
at io.reactivex.internal.subscribers.LambdaSubscriber.onError(LambdaSubscriber.java:79)
at io.reactivex.internal.operators.flowable.FlowableFlattenIterable$FlattenIterableSubscriber.checkTerminated(FlowableFlattenIterable.java:395)
at io.reactivex.internal.operators.flowable.FlowableFlattenIterable$FlattenIterableSubscriber.drain(FlowableFlattenIterable.java:255)
at io.reactivex.internal.operators.flowable.FlowableFlattenIterable$FlattenIterableSubscriber.onError(FlowableFlattenIterable.java:181)
at io.reactivex.internal.subscribers.BasicFuseableSubscriber.onError(BasicFuseableSubscriber.java:101)
at com.daml.grpc.adapter.client.rs.BufferingResponseObserver.lambda$onError$3(BufferingResponseObserver.java:81)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: io.grpc.StatusRuntimeException: INTERNAL: RST_STREAM closed stream. HTTP/2 error code: INTERNAL_ERROR
at io.grpc.Status.asRuntimeException(Status.java:535)
17:41:03.366 [client-0] ERROR c.d.g.a.SingleThreadExecutionSequencer - Unhandled exception in SingleThreadExecutionSequencer
The documentation for OnErrorNotImplementedException states that
This indicates that an Observable tried to call its observer’s onError() method, but that no such method existed. You can eliminate this by either fixing the Observable so that it no longer reaches an error condition, by implementing an onError handler in the observer, or by intercepting the onError notification before it reaches the observer by using one of the operators described elsewhere on this page.
After some learning, and as I wrote on Slack, the reason the doOnError callback was not being invoked was that I hadn’t chained it to the returned instance of the transaction stream. This callback, however, doesn’t process or swallow the exception; it acts as an aspect that allows the client to “do something” when the error occurs.
onExceptionResumeNext will swallow it, but the original stream still terminates rather than staying alive.
According to the gRPC docs, this is a server-side issue.
Apologies for the delay; there appears to be an issue with gRPC connections that causes them to be closed after one minute. We’re currently looking into this and will update this post with more information.
You should definitely also implement retry logic in your code, though: over any network, services can temporarily lose connectivity with each other for any number of reasons.
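To make the idea of retry logic concrete, here is a minimal sketch of retrying with exponential backoff in plain Java. It is not RxJava or Ledger API code: the retry helper, its parameters, and the string-based check for UNAVAILABLE are all assumptions made for illustration. In an RxJava pipeline the same policy would more likely be expressed with its retry operators.

```java
import java.util.concurrent.Callable;
import java.util.function.Predicate;

class RetryWithBackoff {

    // Runs the call, retrying while isRetryable accepts the failure and fewer than
    // maxAttempts attempts have been made. The pause doubles after each failure.
    static <T> T retry(Callable<T> call, int maxAttempts, long initialDelayMillis,
                       Predicate<Exception> isRetryable) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        long delay = initialDelayMillis;
        for (int attempt = 1; ; attempt++) {
            try {
                return call.call();
            } catch (Exception e) {
                // Give up on the last attempt or on errors that should not be retried.
                if (attempt >= maxAttempts || !isRetryable.test(e)) {
                    throw e;
                }
                Thread.sleep(delay);
                delay *= 2;
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Hypothetical flaky call: fails twice with UNAVAILABLE, then succeeds.
        String result = retry(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("UNAVAILABLE");
            }
            return "connected";
        }, 5, 10L, e -> e.getMessage() != null && e.getMessage().startsWith("UNAVAILABLE"));
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

The predicate is where a decision based on error codes would live: transient failures such as UNAVAILABLE are retried, while errors like NOT_FOUND are rethrown immediately.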
I think it makes sense to process errors as part of a method devoted to that. Apart from doOnError, there are more operators available for you to handle errors in specific ways. You can read more about them here.
transactionsTrees
    .retryWhen(retryHandler -> retryHandler.flatMap(err -> observeRetries(err, partyId)))
    .subscribe(
        tx -> processTransactionTree(tx, partyId),
        e -> processError(e));
I have not been able to get observeRetries invoked when the publisher/Flowable returns UNAVAILABLE with status code 503. I intend to choose the retry strategy based on error codes.
Do I need to append onErrorResumeNext in order to trap the publisher error and retry?