RxJava Error Handling - How to add error callback

Hi there,

While coding an RxJava based program i’m facing an Exception that contain some recommendation of better handling:


io.reactivex.exceptions.OnErrorNotImplementedException: **The exception was not handled due to missing onError handler in the subscribe() method call.** Further reading: [https://github.com/ReactiveX/RxJava/wiki/Error-Handling](https://github.com/ReactiveX/RxJava/wiki/Error-Handling) | java.lang.RuntimeException: java.util.concurrent.ExecutionException: io.grpc.StatusRuntimeException: NOT_FOUND: CONTRACT_NOT_FOUND(11,d8368d0f): Contract could not be found with id 0054ff9fd1c537ece135de287c8fa14ff169ec9d6e16796ff7fe40562b6fb5d441
	at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:704)
	at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:701)
	at io.reactivex.internal.subscribers.LambdaSubscriber.onError(LambdaSubscriber.java:79)
	at io.reactivex.internal.subscribers.LambdaSubscriber.onNext(LambdaSubscriber.java:69)
	at io.reactivex.internal.operators.flowable.FlowableFlattenIterable$FlattenIterableSubscriber.drain(FlowableFlattenIterable.java:312)
	at io.reactivex.internal.operators.flowable.FlowableFlattenIterable$FlattenIterableSubscriber.onNext(FlowableFlattenIterable.java:174)
	at io.reactivex.internal.operators.flowable.FlowableMap$MapSubscriber.onNext(FlowableMap.java:68)
	at com.daml.grpc.adapter.client.rs.BufferingResponseObserver.lambda$onNext$2(BufferingResponseObserver.java:60)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.RuntimeException: java.util.concurrent.ExecutionException: io.grpc.StatusRuntimeException: NOT_FOUND: CONTRACT_NOT_FOUND(11,d8368d0f): Contract could not be found with id 0054ff9fd1c537ece135de287c8fa14ff169ec9d6e16796ff7fe40562b6fb5d441
	at io.reactivex.internal.util.ExceptionHelper.wrapOrThrow(ExceptionHelper.java:46)

The thing is that the docs explain how to enhance the “Observe” and “Subscribe” method with callbacks (such as recommended above) but in my case i’m not explicitly subscribing nor observing but rather fetching the transactions and iterating over it like this:

private void processTransaction(Transaction tx) {
        logger.info("processTransaction: {}", tx.toString());

        List<Event> exerciseEvents = tx.getEvents();

        List<Command> exerciseCommands = exerciseEvents.stream()
                            .filter(e -> e instanceof CreatedEvent)
                            .map(e -> (CreatedEvent) e)
                            .flatMap(e -> processEvent(tx.getWorkflowId(), e))
                            .collect(Collectors.toList());

        if (!exerciseCommands.isEmpty()) {
            client.getCommandClient().submitAndWait(
                    tx.getWorkflowId(), APPLICATION_ID, UUID.randomUUID().toString(), tpaPartyId, exerciseCommands)
                    .blockingGet();
        }
    }

So how do i still enhance my flow to catch and handle such Exception?

I think there are manye different concepts shown (both implicitly and explicitly) in the code you are sharing. I’ll go through them separately:

  1. the processTransaction method has a transaction as its input. This transaction comes from a service that allows you to query the Ledger API. The result of this call is returned to you as an RxJava stream on which you can use the methods described in the documentation you linked in your post
  2. the entirety of the content of the processTransaction method makes use of methods that either use a Java stream (not to be confused with an RxJava stream) or invoke a blocking method on an RxJava stream (the blockingGet in the third to last line in the snippet you shared) – in this case, exception handling can happen as it normally would in any Java application, by wrapping a portion of code with the appropriate try ... catch block.
  3. if you were to use the output of submitAndWait in a non-blocking fashion (i.e. not using the blockingGet method), its result type is Single, which you can see as an RxJava stream that ends after a single element is returned – as such, you could be using those same method for exception handling provided by RxJava

One final note: our example contains that usage of blockingGet for simplicity but it’s probably not necessarily something you want to do in a production application, as RxJava is designed for you to always work in this “stream processing” world where nothing ever stops. Do not take that example as a model for what you should be doing using reactive stream libraries like RxJava. I would recommend you keep learning more about RxJava through their documentation and use the accepted patterns they suggest.

Thanks @stefanobaghino-da ,

Just to make sure i follow you: Your #1 advise is to add one of those error handlers as shown in below?

It depends on what you need to do. If you need a simple callback, doOnError will do. But it depends entirely on what kind of exception you want to handle and what kind of response you expect the application to have in such case. The documentation you linked is a good source of information. Doing some experimentation can help you think better about what the different exception handling methods do and how they can help you implement the exception handling logic you need.

1 Like

Hi @stefanobaghino-da ,

Thanks to the help of @cocreature i have successfully coded an application that performs as an observer by subscribing to the Ledger transaction stream.

However, during the few first seconds, after all the transactions are streamed to my end by the observable (Ledger) i encounter an Exception (below) that terminates the stream and prevents from my program to do the one thing it was designed to: listen to new Ledger’s contracts (of a certain template)

I’m trying to temporarily swallow the error in order to prevent from the stream to break. For that i added below callback but it never gets called. I followed the docs but cant seem to find a lead

io.reactivex.exceptions.OnErrorNotImplementedException: The exception was not handled due to missing onError handler in the subscribe() method call. Further reading: https://github.com/ReactiveX/RxJava/wiki/Error-Handling | io.grpc.StatusRuntimeException: INTERNAL: RST_STREAM closed stream. HTTP/2 error code: INTERNAL_ERROR
	at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:704)
	at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:701)
	at io.reactivex.internal.subscribers.LambdaSubscriber.onError(LambdaSubscriber.java:79)
	at io.reactivex.internal.operators.flowable.FlowableFlattenIterable$FlattenIterableSubscriber.checkTerminated(FlowableFlattenIterable.java:395)
	at io.reactivex.internal.operators.flowable.FlowableFlattenIterable$FlattenIterableSubscriber.drain(FlowableFlattenIterable.java:255)
	at io.reactivex.internal.operators.flowable.FlowableFlattenIterable$FlattenIterableSubscriber.onError(FlowableFlattenIterable.java:181)
	at io.reactivex.internal.subscribers.BasicFuseableSubscriber.onError(BasicFuseableSubscriber.java:101)
	at com.daml.grpc.adapter.client.rs.BufferingResponseObserver.lambda$onError$3(BufferingResponseObserver.java:81)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: io.grpc.StatusRuntimeException: INTERNAL: RST_STREAM closed stream. HTTP/2 error code: INTERNAL_ERROR
	at io.grpc.Status.asRuntimeException(Status.java:535)
17:41:03.366 [client-0] ERROR c.d.g.a.SingleThreadExecutionSequencer - Unhandled exception in SingleThreadExecutionSequencer

My code:

public void runIndefinitely() {

        Flowable<Transaction> transactions = client.getTransactionsClient().getTransactions(
                LedgerOffset.LedgerBegin.getInstance(),
                new FiltersByParty(Collections.singletonMap(tpaPartyId, NoFilter.instance)),
                true, DownloadPolicyPagesFromLedger.tpaAccessToken);

        transactions.doOnError(throwable -> handleErrors(throwable) );

        transactions.onExceptionResumeNext(throwable -> System.err.println("do nothing"));

        transactions.forEach(this::processTransaction);
    }

    private void handleErrors(Throwable throwable) {
        System.err.println("Error Occurred on transaction Observable: "+ throwable.getMessage());
    }

The documentation for OnErrorNotImplementedException states that

This indicates that an Observable tried to call its observer’s onError() method, but that no such method existed. You can eliminate this by either fixing the Observable so that it no longer reaches an error condition, by implementing an onError handler in the observer, or by intercepting the onError notification before it reaches the observer by using one of the operators described elsewhere on this page.

I wonder if you’re using the right error handler.

Hi @Leonid_Rozenberg

After some learning and as i wrote over the slack, the reason for the doOnError callback not being invoked was that i haven’t chained it to the returned instance of the transaction stream. This callback however doesn’t process the Exception or swallows it but rather perform as an aspect to allow the client to “do something”

The onExceptionResumeNext will swallow it but it will still break the stream from being alive.

According to gRPC docs this is a server side issue.

INTERNAL: RST_STREAM closed stream. HTTP/2 error code: INTERNAL_ERROR

After debugging my client i saw the below that might lead to the reason why the Observable Ledger shut down the stream (no more elements):

image

How do gain access to the Ledger logs in order to investigate the reason?

gRPC forum on this exact error

gRPC status codes (checkout #13)

Anything? Anyone?

Are you using Daml Hub or do you operate your own participant node attached to some ledger?

Hi @stefanobaghino-da , I’m using Daml Hub

Apologies for the delay; there appears to be an issue with gRPC connections that causes them to be closed after one minute. We’re currently looking into this and will update this post with more information.

You should definitely also implement retry logic in your code, though, as over any network, it’s possible for any number of reasons for services to temporarily lose connectivity with each other.

Hi @dtanabe
I am also looking for a solution to gRPC Server errors which completely disconnects daml CompletionStreams/TransactionStreams at client.

I noticed the gRPC has following retry mechanism implemented.

Is daml library capable to configure the retries like mentioned here?

or is there any alternate mechanism we can use for daml java ledger apis?

What was the final conclusion of the error callback?

transactionsTrees.subscribe(tx -> processTransactionTree(tx,partyId), e -> processError(e));

Even with the above subscription to error callback, I am getting a similar error like io.reactivex.exceptions.OnErrorNotImplementedException:

Do I need to implement the transactionsTrees.doOnError(Consumer<? super Throwable> onError) ?

I think it makes sense to process errors as part of a method devoted to that. Apart from doOnError, there are more operators available for you to handle errors in specific ways. You can read more about them here.

transactionsTrees.retryWhen(retryHandler → retryHandler.flatMap(err →
observeRetries(err,partyId))).subscribe(tx →
processTransactionTree(tx,partyId),
e → processError(e));

I have not been able to invoke the observeRetries API when the publisher/flowable is returning UNAVAILABLE with status code as 503. I intended to handle the retry strategy based on error codes.

Do I need to append onErrorResumeNext in order to trap the publisher error and retry?