How to handle connection errors when subscribing to DamlLedgerClient services?

I think the following is kind of the standard way to subscribe to transactions (or other ledger services that return Flowable<T>):

// Some values are omitted/simplified for brevity.
var damlLedgerClient = DamlLedgerClient.newBuilder("localhost", 6865).build();
damlLedgerClient.connect();

ContractFilter<?> contractFilter = null;
var offset = LedgerOffset.LedgerEnd.getInstance();
var parties = Set.<String>of();
var accessToken = "really-secure";
damlLedgerClient
    .getTransactionsClient()
    .getTransactions(contractFilter, offset, parties, false, accessToken)
    .retry()
    .subscribe(transaction -> System.out.printf("Received transaction: %s%n", transaction.getTransactionId()));

Since network connection issues with the ledger can occur, the question is: what is a good way to handle them? In my experience, using the retry capabilities of RxJava (e.g. adding some form of retry()) is sufficient. Can this be confirmed?

This is rather a question targeting RxJava itself, and how com.daml.grpc.adapter.client.rs.ClientPublisher behaves in these cases.

Please, ignore handling offset and access tokens.


Yes, retry will do the job, but your implementation needs a couple of improvements to work as intended in practice:

  1. Delay your retries. If you had a 5s network outage, the above code would probably attempt many hundreds of connections. You may get throttled by network infrastructure or firewalls.
  2. You’ll always resubscribe from the offset (LedgerEnd) so you may miss events. Say you process up to offset A, then lose connection, and manage to re-connect at offset B, you’ll not receive any transactions between A and B.

You can solve both of these by

  1. Using retryWhen with a handler function that applies a dynamic delay.
  2. Wrapping the request in a Flowable.defer() so that you can set the offset anew on every cycle.
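
As a rough illustration of the "dynamic delay" in point 1, here is a minimal sketch of an exponential backoff schedule in plain Java. The class name Backoff, the method delayFor, and the 500 ms base / 30 s cap are all hypothetical choices for this example, not part of the Daml bindings or RxJava. In an RxJava pipeline, a delay computed like this would typically be applied inside the retryWhen handler, for instance via Flowable.timer.

```java
import java.time.Duration;

// Hypothetical helper: computes how long to wait before each retry attempt.
final class Backoff {
    private Backoff() {}

    /** Delay before retry number `attempt` (1-based): base doubled per attempt, capped at max. */
    static Duration delayFor(int attempt, Duration base, Duration max) {
        if (attempt < 1) {
            throw new IllegalArgumentException("attempt must be >= 1");
        }
        long delay = base.toMillis();
        long cap = max.toMillis();
        // Double the delay once per previous attempt, stopping as soon as the cap is reached.
        for (int i = 1; i < attempt && delay < cap; i++) {
            delay *= 2;
        }
        return Duration.ofMillis(Math.min(delay, cap));
    }

    public static void main(String[] args) {
        Duration base = Duration.ofMillis(500); // assumed starting delay
        Duration max = Duration.ofSeconds(30);  // assumed upper bound
        for (int attempt = 1; attempt <= 8; attempt++) {
            System.out.printf("retry %d after %d ms%n",
                    attempt, delayFor(attempt, base, max).toMillis());
        }
    }
}
```

With a schedule like this, a 5s outage results in a handful of reconnection attempts rather than hundreds.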

Thank you, this is fine. The actual implementation does use retryWhen with exponential backoff. The offset is tracked using an atomic variable:

ContractFilter<?> contractFilter = null;
var offset = new AtomicReference<LedgerOffset>(LedgerOffset.LedgerBegin.getInstance());
var parties = Set.<String>of();
var accessToken = "really-secure";
damlLedgerClient
    .getTransactionsClient()
    .getTransactions(contractFilter, offset.get(), parties, false, accessToken)
    .doOnNext(transaction -> offset.set(new LedgerOffset.Absolute(transaction.getOffset())))
    .retry()
    .subscribe(transaction -> System.out.printf("Received transaction: %s%n", transaction.getTransactionId()));

Good stuff. But don’t forget the defer. Otherwise your new offset doesn’t take effect, because offset.get() is evaluated only once, when the pipeline is assembled. I’d furthermore put the retry before the doOnNext; otherwise you’ll also retry on errors from the doOnNext, which are likely pure (deterministic) and unrecoverable.

// defer re-evaluates offset.get() on every (re)subscription.
Flowable.defer(() ->
    damlLedgerClient
        .getTransactionsClient()
        .getTransactions(contractFilter, offset.get(), parties, false, accessToken))
    .retryWhen(...)
    .doOnNext(transaction -> offset.set(new LedgerOffset.Absolute(transaction.getOffset())))
    .subscribe(transaction -> System.out.printf("Received transaction: %s%n", transaction.getTransactionId()));
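
To make the effect of re-reading the offset on each attempt concrete, here is a small plain-Java simulation. It does not use RxJava or the Daml bindings. The stream method is a hypothetical stand-in for a ledger subscription that fails once mid-stream, and the retry loop reads the tracked offset at the start of every attempt, which is the behaviour Flowable.defer provides in the pipeline above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;

// Hypothetical simulation: a "ledger" with offsets 1..6 whose stream breaks once before offset 4.
final class ResumeFromOffset {
    private static final long LEDGER_END = 6;

    private ResumeFromOffset() {}

    /** Emits every offset after `afterOffset`; throws once, before offset 4, while `outage` is set. */
    static void stream(long afterOffset, AtomicBoolean outage, LongConsumer onNext) {
        for (long offset = afterOffset + 1; offset <= LEDGER_END; offset++) {
            if (offset == 4 && outage.getAndSet(false)) {
                throw new IllegalStateException("simulated connection loss");
            }
            onNext.accept(offset);
        }
    }

    /** Consumes the stream, resubscribing from the last seen offset after each failure. */
    static List<Long> consume(int maxAttempts) {
        AtomicLong lastSeen = new AtomicLong(0);
        AtomicBoolean outage = new AtomicBoolean(true);
        List<Long> received = new ArrayList<>();
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                // The offset is read here, once per attempt, not once up front.
                stream(lastSeen.get(), outage, offset -> {
                    received.add(offset);
                    lastSeen.set(offset);
                });
                return received;
            } catch (IllegalStateException e) {
                // Simulated transient failure: fall through and resubscribe.
            }
        }
        throw new IllegalStateException("gave up after " + maxAttempts + " attempts");
    }

    public static void main(String[] args) {
        System.out.println(consume(3));
    }
}
```

If lastSeen.get() were instead read once before the loop, the second attempt would restart from the initial offset and deliver offsets 1 to 3 a second time; with LedgerEnd as the fixed offset, as in the original question, it would skip whatever was committed during the outage.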