Using offsets

Hi!

I want to be crash-safe so I save the last consumed offset processed in an event listener.
How do I calculate the next offset to start reading from, as I don’t want to reprocess the event already handled?


In general, you can’t:

The format of absolute offsets is opaque to the client: no client-side transformation of an offset is guaranteed to return a meaningful offset.

(source: https://docs.daml.com/1.7.0/app-dev/grpc/proto-docs.html#ledgeroffset)

But the good news is that the starting offset you pass in requests is exclusive, so you can safely store the last offset you received and pass it back unchanged as the starting offset to get only the events that follow it. Note that the ending offset (if provided) is inclusive instead.
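To make the exclusive-begin / inclusive-end behaviour concrete, here is a small toy model. It is only a sketch: OffsetSemanticsSketch, read and sample are made-up names, the offsets are arbitrary strings, and nothing here calls the real Ledger API. It just mimics the semantics described above with an in-memory log.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy in-memory model of an offset-addressed stream. NOT the Ledger API. */
public class OffsetSemanticsSketch {

    // Offsets are treated as opaque strings: they are only stored and compared
    // for equality, never parsed, incremented, or ordered by the client.
    private final Map<String, String> log = new LinkedHashMap<>();

    public void append(String offset, String event) {
        log.put(offset, event);
    }

    /**
     * Returns the events strictly after beginExclusive (null means "from the start")
     * up to and including endInclusive (null means "no end bound").
     */
    public List<String> read(String beginExclusive, String endInclusive) {
        List<String> out = new ArrayList<>();
        boolean started = (beginExclusive == null);
        for (Map.Entry<String, String> entry : log.entrySet()) {
            String offset = entry.getKey();
            if (started) {
                out.add(entry.getValue());          // past the begin offset: include
            } else if (offset.equals(beginExclusive)) {
                started = true;                     // the begin offset itself is skipped
            }
            if (started && offset.equals(endInclusive)) {
                break;                              // the end offset itself was included
            }
        }
        return out;
    }

    /** Three events with arbitrary, hypothetical offsets. */
    public static OffsetSemanticsSketch sample() {
        OffsetSemanticsSketch ledger = new OffsetSemanticsSketch();
        ledger.append("00a1", "e1");
        ledger.append("00b7", "e2");
        ledger.append("00c3", "e3");
        return ledger;
    }

    public static void main(String[] args) {
        OffsetSemanticsSketch ledger = sample();
        // Last processed event was e1 at offset "00a1": resuming from it skips e1.
        System.out.println(ledger.read("00a1", null));
        // From the start up to and including offset "00b7".
        System.out.println(ledger.read(null, "00b7"));
    }
}
```

The point is that the saved offset goes back in exactly as it was received; there is no "next offset" to compute.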

I’ll have a better look at the documentation, but it appears that this is in fact poorly documented.

Thanks for the good question!


I created this PR to document it. If you believe there are other parts of the documentation that can be improved in this regard, please add your review. :bowing_man:


Thanks!

Works perfectly fine, as long as one understands the threading model in rxjava :slight_smile:

“If you have no problems, buy a goat.”

Per


I suppose this means that the offset on a Transaction is not a ledger offset and cannot be used like this: new LedgerOffset.Absolute(x.getOffset()). Am I right?

If so, how can I get the ledger offsets in a reliable way? I’m using the TransactionClient to get the stream of transactions.

I do it like this (and others do the same):


var currentOffset = state.getOffset();
// Resume after the saved offset, or start from the ledger beginning on the first run.
var lo = (currentOffset == null) ? LedgerOffset.LedgerBegin.getInstance() : new LedgerOffset.Absolute(currentOffset);

transactionsClient.getTransactions(lo, transactionFilter, isLedgerVerbose, tokenVault.getSystemContext().getBearer())
        .subscribe(t -> {
            log.info("PROCESSOR got transaction in thread " + Thread.currentThread() + " with " + t.getEvents().size() + " events, offset " + t.getOffset());
            // Forward every event of the transaction to Kafka as JSON.
            t.getEvents().forEach(e -> {
                JsonElement je = gson.toJsonTree(e);
                kafkaServiceImpl.send(je);
            });
            // Save the offset only after all events of this transaction were handed off.
            state.setOffset(t.getOffset());
        }, e -> {
            log.error("flow failure, restart", e);
            this.runIndefinitely(); // restart
        });
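In case it helps with the crash-safety part of the original question, here is one possible way to back the state object with a file. This is only a sketch under assumptions: FileOffsetStore is a hypothetical class (the thread does not show how state is actually implemented), it treats the offset as an opaque string, and it only protects against a process crash, since it does not force the data to disk.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/** Hypothetical file-backed store for the last processed offset. */
public class FileOffsetStore {

    private final Path file;

    public FileOffsetStore(Path file) {
        this.file = file;
    }

    /** Returns the last saved offset, or null if nothing has been saved yet. */
    public synchronized String getOffset() throws IOException {
        if (!Files.exists(file)) {
            return null;
        }
        String saved = new String(Files.readAllBytes(file), StandardCharsets.UTF_8).trim();
        return saved.isEmpty() ? null : saved;
    }

    /**
     * Writes the offset to a temporary sibling file, then renames it over the
     * real file, so a reader never observes a half-written offset.
     * Note: no fsync is done here, and whether ATOMIC_MOVE replaces an existing
     * target is implementation specific according to the JDK documentation.
     */
    public synchronized void setOffset(String offset) throws IOException {
        Path tmp = file.resolveSibling(file.getFileName() + ".tmp");
        Files.write(tmp, offset.getBytes(StandardCharsets.UTF_8));
        Files.move(tmp, file, StandardCopyOption.ATOMIC_MOVE);
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("offset-demo");
        Path file = dir.resolve("last-offset");

        FileOffsetStore store = new FileOffsetStore(file);
        store.setOffset("00b7"); // arbitrary example value, stored as-is

        // A fresh instance simulates a restart: it sees the saved offset.
        System.out.println(new FileOffsetStore(file).getOffset());
    }
}
```

Because the offset is saved only after the events of a transaction have been handed off, a crash between the hand-off and the save replays that one transaction on restart, so whatever consumes the events should tolerate duplicates.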

I get that; in fact, I have something like this as well. I guess the real question is: does this mean Transaction returns an offset? If so, why is the return type not LedgerOffset?

Yes, the transaction returns an offset. I haven’t checked the exact type.

state.setOffset(t.getOffset());