Events stop coming - frustrating

Hi!

I get very strange behavior.

This is a Java snippet:

transactionsClient.getTransactions(lo, transactionFilter, true, tokenVault.getSystemContext().getBearer()).subscribe(t -> {

where lo is the ledger offset LedgerOffset.LedgerBegin.getInstance()

I create commands and see events getting read by this subscriber.
Suddenly there are no more events consumed even when I create new commands.
The subscriber thread doesn’t look dead; it is running.

Is there a 100% reliable way to consume all these events? Right now this is too unreliable to use.

I use version 1.15.0 and also have to add this to compile:

    // https://discuss.daml.com/t/connection-exception-encountered-against-sandboxed-ledger/2366
    compile 'io.netty:netty-all:4.1.58.Final'

Thanks,
Per

1 Like

Hi @perbergman,

sorry that you’re having issues with this. getTransactions definitely should keep streaming transactions and not stop without cause.

A few questions/ideas:

  1. Do you have an example to reproduce this that you could share? Maybe a minified version of your actual code.
  2. It looks like you’re running against a ledger with authorization enabled. Can you reproduce the issue against a ledger without authorization? One possibility could be that your token expires, although that should kill the stream rather than silently stop streaming transactions.
  3. You mentioned that the subscriber thread isn’t dead. How exactly are you checking that? Could it be that the thread is still alive but the RxJava stream has terminated (e.g. due to the token expiring as mentioned above)? It might be helpful to add a hook on stream termination, e.g. via doOnTerminate, to log it.
1 Like
  1. Not yet. It would take a while to simplify.
  2. My (machine2machine) token lives for 24 hours at least and I grab a new one at startup. This situation occurs within seconds of startup. I could try without the token. I don’t see any exception in the observer at all, I do log that.
  3. I print all the threads in the current thread group when creating new commands as well as in the observer thread. The thread is there after stopping consumption, but I can try to instrument the rxjava part.
1 Like
transactionsClient.getTransactions(lo, transactionFilter, true, tokenVault.getSystemContext().getBearer())
                .doAfterTerminate(() -> System.out.println("after terminate"))
                .doOnTerminate(() -> System.out.println("on terminate"))
                .doFinally(() -> System.out.println("finally"))
                .doOnNext(System.out::println)
                .subscribe(System.out::println, System.out::println);

Sometimes I don’t see any output at all.

If I shut down the ledger I see:

on terminate
io.grpc.StatusRuntimeException: UNAVAILABLE: Server is shutting down
finally
after terminate

Something is seriously broken here!
Is there a good way to debug RxJava events?

1 Like

I may have found one issue:

    @Override
    public void create(UserContext uc, Command cc, String template) {
        log.info("create --> ");
        DamlLedgerClient clt = this.connect(uc);
        CommandSubmissionClient submissionClient = clt.getCommandSubmissionClient();
        List<Command> toRun = Lists.newArrayList(cc);
        String org = uc.getOrg();

        submissionClient.submit(
                String.format("%s-%s-%d", template, org, 0),
                appId,
                genUUID(),
                org,
                toRun,
                uc.getBearer()
        );

//        try {
//            clt.close();
//        } catch (Exception e) {
//            log.error("close", e);
//        }

        log.info("created <-- ");
    }

It seems to work better when I am not closing the client. Maybe closing the connection too quickly stops events from reaching RxJava?

What is the correct approach so that nothing leaks here?
Say you have the same end-user connecting many times, as well as different end-users connecting many times; will the underlying implementation handle that?

Looks like the underlying channels are expensive to create, so should I reuse a channel per JWT token?

What if the frontend is load balanced? The same user might have channels open on multiple servers.

1 Like

That code definitely looks problematic. If you close too fast, submit will probably fail because the connection is already closed. I’d try closing in a doOnTerminate of the submit or something like that.
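
A minimal sketch of what I mean, assuming submit returns an RxJava Single as in the Java bindings (clt, toRun, uc, org, appId, template and genUUID are the names from your snippet above; I’m using doFinally here, which behaves much like doOnTerminate for this purpose):

    // Hedged sketch: only close the client once the submission itself has terminated.
    submissionClient.submit(
            String.format("%s-%s-%d", template, org, 0),
            appId,
            genUUID(),
            org,
            toRun,
            uc.getBearer())
        .doFinally(() -> {
            try {
                clt.close();   // runs only after the submit has completed or failed
            } catch (Exception e) {
                log.error("close", e);
            }
        })
        .subscribe(
            ok -> log.info("submitted"),
            err -> log.error("submit failed", err));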

As for connection management, you don’t necessarily even need multiple channels. If you look at submit, for example, it allows you to pass in a token per request, so you can use a single DamlLedgerClient for everything.
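
For illustration, a rough sketch of that, assuming the builder API of the Java bindings (host, port, the token variables, and the command arguments are placeholders, not real values):

    // Hedged sketch: one long-lived DamlLedgerClient shared by all users, per-request JWTs.
    DamlLedgerClient client = DamlLedgerClient.newBuilder(host, port)
            .withAccessToken(systemToken)   // used only when a call doesn't supply its own token
            .build();
    client.connect();

    // Each submission can carry the token of the user on whose behalf it is made:
    client.getCommandSubmissionClient()
            .submit(workflowId, appId, commandId, party, commands, userToken)
            .subscribe(ok -> log.info("submitted"), err -> log.error("submit failed", err));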

Running multiple requests over a single HTTP/2 connection works fairly well, so depending on your load that might just do the trick. If not, some type of connection pool like ChannelPool.java in googleapis/gax-java on GitHub (at commit d202f96bc2d0b341cc93e1cf090be41bad7f1f44) might be helpful.

1 Like

Thanks!

So I need to create the first connection with a JWT token for some party, but then the submit can use some other JWT token. That is quite confusing!

Anyway, this works fine now.

1 Like

The token you pass on construction of DamlLedgerClient acts as a default token that will be used if you do not override it by passing one on specific requests.

I believe you can’t quite get away with omitting it completely, even if you override it everywhere, because DamlLedgerClient::connect uses it to query the ledger id initially. However, that only requires public claims, so you need a valid token but no actAs or readAs claims for any parties.
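
A sketch of what that looks like in practice; the builder calls and the token names here are my assumptions, not verbatim from your code:

    // Hedged sketch: the construction-time token only needs public claims; it is used
    // for the initial ledger id lookup, while per-request tokens carry actAs/readAs.
    DamlLedgerClient client = DamlLedgerClient.newBuilder(host, port)
            .withAccessToken(publicClaimsToken)   // no actAs/readAs required here
            .build();
    client.connect();                             // queries the ledger id with the default token

    client.getTransactionsClient()
            .getTransactions(LedgerOffset.LedgerBegin.getInstance(), transactionFilter, true, partyToken)
            .subscribe(t -> log.info("tx: {}", t), err -> log.error("stream failed", err));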

I think it’s a reasonable feature request to provide an API that skips that initial ledger id query (which would mean you have to specify the ledger id) and thereby doesn’t require a default token at all.

2 Likes

Makes sense.
I have a ‘system’ user (using machine-to-machine tokens) because I need to listen to events immediately.

1 Like