Hi!
I get very strange behavior.
This is a Java snippet:
transactionsClient.getTransactions(lo, transactionFilter, true, tokenVault.getSystemContext().getBearer()).subscribe(t -> {
where lo is the ledger offset LedgerOffset.LedgerBegin.getInstance()
I create commands and see events getting read by this subscriber.
Suddenly there are no more events consumed even when I create new commands.
The subscriber thread doesn’t look dead, it is running.
Is there a 100% safe way to consume all these events? Right now this is too unreliable to use.
I use version 1.15.0 and also have to add this to compile:
// https://discuss.daml.com/t/connection-exception-encountered-against-sandboxed-ledger/2366
compile 'io.netty:netty-all:4.1.58.Final'
Thanks,
Per
Hi @perbergman,
sorry that you’re having issues with this. getTransactions definitely should keep streaming transactions and not stop without cause.
A few questions/ideas:
- Do you have an example to reproduce this that you could share? Maybe a minified version of your actual code.
- It looks like you’re running against a ledger with authorization enabled. Can you reproduce the issue against a ledger without authorization? One possibility is that your token expires, although that should kill the stream rather than silently stop streaming transactions.
- You mentioned that the subscriber thread isn’t dead. How exactly are you checking that? Could it be that the thread is still alive but the rxjava stream has terminated (e.g. due to the token expiring, as mentioned above)? It may help to log something when the stream completes, e.g. via doOnTerminate.
transactionsClient.getTransactions(lo, transactionFilter, true, tokenVault.getSystemContext().getBearer())
.doAfterTerminate(() -> System.out.println("after terminate"))
.doOnTerminate(() -> System.out.println("on terminate"))
.doFinally(() -> System.out.println("finally"))
.doOnNext(System.out::println)
.subscribe(System.out::println, System.out::println);
Sometimes I don’t see any output at all.
If I shutdown the ledger I see:
on terminate
io.grpc.StatusRuntimeException: UNAVAILABLE: Server is shutting down
finally
after terminate
Something is seriously broken here!
Is there a good way to debug RxJava events?
I may have found one issue:
@Override
public void create(UserContext uc, Command cc, String template) {
    log.info("create --> ");
    DamlLedgerClient clt = this.connect(uc);
    CommandSubmissionClient submissionClient = clt.getCommandSubmissionClient();
    List<Command> toRun = Lists.newArrayList(cc);
    String org = uc.getOrg();
    submissionClient.submit(
        String.format("%s-%s-%d", template, org, 0),
        appId,
        genUUID(),
        org,
        toRun,
        uc.getBearer()
    );
    // try {
    //     clt.close();
    // } catch (Exception e) {
    //     log.error("close", e);
    // }
    log.info("created <-- ");
}
It seems to work better when I am not closing the client. Maybe closing the connection ‘too fast’ will stop events going to RxJava?
What is the correct approach to not leak anything here?
Say you have the same end-user connecting many times as well as different end-users connecting many times? Will the underlying implementation handle that?
Looks like the underlying channels are expensive to create, so should I reuse a channel per JWT token?
What if the frontend is load balanced, so the same user might have channels open on multiple servers?
That code definitely looks problematic. If you close too fast then submit will probably fail because the connection is already closed. I’d try closing in a doOnTerminate of the submit or something like that.
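The "close only once the submit has finished" idea can be sketched without the ledger bindings. The snippet below is a minimal illustration that uses the JDK's CompletableFuture as a stand-in for the reactive value returned by submit. FakeClient, its submit method and CloseAfterSubmit are hypothetical names, not part of the Daml Java bindings. With RxJava the analogous hook would be doOnTerminate or doFinally on whatever submit returns.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for a ledger client; this is NOT the real DamlLedgerClient.
class FakeClient implements AutoCloseable {
    final AtomicBoolean closed = new AtomicBoolean(false);

    // Simulates an asynchronous submit that fails if the client was closed in the meantime.
    CompletableFuture<String> submit(String commandId) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(50); // pretend the request takes a moment on the wire
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            if (closed.get()) {
                throw new IllegalStateException("client closed before submit finished");
            }
            return "accepted:" + commandId;
        });
    }

    @Override
    public void close() {
        closed.set(true);
    }
}

class CloseAfterSubmit {
    // Closes the client only after the submit has completed, successfully or not.
    static CompletableFuture<String> submitThenClose(FakeClient client, String commandId) {
        return client.submit(commandId).whenComplete((result, error) -> client.close());
    }

    public static void main(String[] args) {
        FakeClient client = new FakeClient();
        String result = submitThenClose(client, "cmd-1").join();
        System.out.println(result + ", closed=" + client.closed.get());
    }
}
```

Calling close() directly after submit(), as in the commented-out code above, would race with the in-flight request. Tying the close to completion removes that race.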
As for connection management, you don’t necessarily even need multiple channels. If you look at submit for example, it allows you to pass in a token per request, so you can use a single DamlLedgerClient for everything.
Running multiple requests over a single http2 connection works fairly well, so depending on your load that might just do the trick. If not, some type of connection pool like ChannelPool.java in googleapis/gax-java (at commit d202f96bc2d0b341cc93e1cf090be41bad7f1f44) might be helpful.
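If a single connection does become a bottleneck, the pooling idea can be sketched as a fixed-size round-robin pool. This is a minimal sketch with hypothetical names (RoundRobinPool, next). It is not the gax-java ChannelPool implementation, and the pooled type is a plain generic so the example runs without gRPC on the classpath.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical fixed-size pool that hands out its members in round-robin order.
class RoundRobinPool<T> {
    private final List<T> members = new ArrayList<>();
    private final AtomicInteger counter = new AtomicInteger();

    RoundRobinPool(int size, Supplier<T> factory) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        // Create all members up front, since channels are expensive to build.
        for (int i = 0; i < size; i++) {
            members.add(factory.get());
        }
    }

    // Safe to call from several threads; floorMod keeps the index valid after counter overflow.
    T next() {
        return members.get(Math.floorMod(counter.getAndIncrement(), members.size()));
    }
}

class PoolDemo {
    public static void main(String[] args) {
        AtomicInteger ids = new AtomicInteger();
        // Strings stand in for channels or clients here.
        RoundRobinPool<String> pool =
            new RoundRobinPool<>(2, () -> "channel-" + ids.getAndIncrement());
        for (int i = 0; i < 4; i++) {
            System.out.println(pool.next());
        }
    }
}
```

Because the token travels with each request, pool members do not need to be tied to a particular user.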
Thanks!
So I need to create the first connection with a JWT token for some party, but then the submit can use some other JWT token. That is quite confusing!
Anyway, this works fine now.
The token you pass on construction of DamlLedgerClient acts like a default token that will be used if you do not overwrite it by passing one on specific requests.
I believe you can’t quite get away with omitting it completely even if you overwrite it everywhere, because DamlLedgerClient::connect uses it to query the ledger id initially. However, that only requires public claims, so you need a valid token but no actAs or readAs for any parties.
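The default-versus-override behaviour described above can be pictured as a simple fallback. The sketch below uses hypothetical names (TokenResolver, resolve) and is not the bindings' actual code. It only illustrates that a per-request token, when present, takes precedence over the token given at construction.

```java
import java.util.Optional;

// Hypothetical illustration of "use the default token unless the request supplies its own".
class TokenResolver {
    private final Optional<String> defaultToken;

    TokenResolver(Optional<String> defaultToken) {
        this.defaultToken = defaultToken;
    }

    // Returns the per-request token if present, otherwise the default (which may itself be absent).
    Optional<String> resolve(Optional<String> perRequestToken) {
        return perRequestToken.isPresent() ? perRequestToken : defaultToken;
    }

    public static void main(String[] args) {
        TokenResolver resolver = new TokenResolver(Optional.of("system-token"));
        // No per-request token: falls back to the default.
        System.out.println(resolver.resolve(Optional.empty()).get());
        // Per-request token: overrides the default.
        System.out.println(resolver.resolve(Optional.of("user-token")).get());
    }
}
```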
I think it’s a reasonable feature request to provide an API that skips that initial ledger id query (which would mean you have to specify the ledger id) and thereby doesn’t require a default token at all.
Makes sense.
I have a ‘system’ user (using machine-to-machine tokens) because I need to listen to events immediately.