How to expose transactions from the Java bindings through a blocking API?

Suppose I want to expose transactions coming from the Java bindings (with the gRPC/RxJava back pressure bridge) through a pull-based interface (that I have no control over). Something along the lines of:

@Override
public List<TransactionTree> poll() {
   // 1. get a bounded quantity of transactions
   // 2. collect them in a list
   // 3. return the list of transactions
}

My first idea would be to record the latest observed offset and, on every call to poll, ask for the current ledger end, read the transactions between the latest observed offset and that ledger end, and then store the ledger end as the new latest observed offset. Something along the lines of:

private LedgerClient ledgerClient;
private LedgerOffset latestOffset = LedgerOffset.LedgerBegin.getInstance(); // TODO Configure starting offset

@Override
public void start(String host, int port) {
  DamlLedgerClient.Builder clientBuilder = DamlLedgerClient.newBuilder(host, port);
  ledgerClient = clientBuilder.build();
}

@Override
public List<TransactionTree> poll() {
  List<TransactionTree> transactions =
      ledgerClient
          .getTransactionsClient()
          .getLedgerEnd()
          .toFlowable()
          .flatMap(
              ledgerEnd -> {
                // Named `batch` because a lambda local cannot redeclare the enclosing `transactions`
                Flowable<TransactionTree> batch =
                    ledgerClient
                        .getTransactionsClient()
                        .getTransactionsTrees(
                            latestOffset,
                            ledgerEnd,
                            new FiltersByParty(
                                Map.of("Alice", NoFilter.instance)), // TODO Configure filter
                            false);
                latestOffset = ledgerEnd;
                return batch;
              })
          .collectInto(
              new LinkedList<TransactionTree>(), List::add) // FIXME Unbounded read, might OOM
          .blockingGet(); // FIXME usage of blockingGet

  return transactions;
}

However, this relies on collectInto (not to mention blockingGet) and would likely cause an OOM when reading from ledger begin for a party that sees a lot of transactions.
Would it make more sense to lean on the back pressure/flow control mechanism instead, keeping a stream open and extracting a chunk of transactions from it on every poll?

private Flowable<TransactionTree> transactions;
private static final int POLL_SIZE = 10; // TODO configure poll size

@Override
public void start(String host, int port) {
  DamlLedgerClient.Builder clientBuilder = DamlLedgerClient.newBuilder(host, port);
  LedgerClient ledgerClient = clientBuilder.build();
  transactions =
      ledgerClient
          .getTransactionsClient()
          .getTransactionsTrees(
              LedgerOffset.LedgerBegin.getInstance(), // TODO Configure starting offset
              new FiltersByParty(Map.of("Alice", NoFilter.instance)), // TODO Configure filter
              false);
}

@Override
public List<TransactionTree> poll() {
  return transactions // TODO address error handling
      .take(POLL_SIZE)
      .collectInto(new ArrayList<TransactionTree>(POLL_SIZE), List::add)
      .blockingGet(); // FIXME usage of blockingGet
}

I’m also not entirely satisfied with the idea of using blockingGet to reconcile the non-blocking Java bindings call with the blocking, pull-based API I have to implement. Do you have suggestions for that?

With regard to the use of blockingGet, I’m wondering whether using a BlockingQueue to decouple the read and write paths would be a good approach, as in the following snippet:

private Flowable<TransactionTree> transactions;

private static final int MAX_POLL_SIZE = 1000; // TODO configure poll size

private final BlockingQueue<TransactionTree> buffer = new ArrayBlockingQueue<>(MAX_POLL_SIZE);

@Override
public void start(String host, int port) {
  DamlLedgerClient.Builder clientBuilder = DamlLedgerClient.newBuilder(host, port);
  LedgerClient ledgerClient = clientBuilder.build();
  transactions =
      ledgerClient
          .getTransactionsClient()
          .getTransactionsTrees(
              LedgerOffset.LedgerBegin.getInstance(), // TODO configure starting offset
              new FiltersByParty(Map.of("Alice", NoFilter.instance)), // TODO configure filter
              false);
}



private void asynchronouslyReadTransactionsIntoTheBuffer() {
  synchronized (buffer) { // prevent two threads from competing on the remaining capacity
    transactions // TODO handle errors
        .take(buffer.remainingCapacity())
        .forEach(buffer::add); // TODO how to handle the returned Disposable?
  }
}

@Override
public List<TransactionTree> poll() {

  asynchronouslyReadTransactionsIntoTheBuffer();

  int bufferSize;

  // Block until there's something to read -- this is the contract for the `poll` method
  do {
    bufferSize = buffer.size();
  } while (bufferSize < 1);

  List<TransactionTree> pollingResult = new ArrayList<>(bufferSize);
  buffer.drainTo(pollingResult, bufferSize);

  return pollingResult;
}
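As a side note on the snippet above, the `do`/`while` loop spins on `buffer.size()` and keeps a core busy while the buffer is empty. A minimal sketch of a drain that parks the thread instead, using only `java.util.concurrent`; the class and method names (`BlockingDrain`, `pollBatch`) are made up for illustration and are not part of the Java bindings:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;

final class BlockingDrain {
    private BlockingDrain() {}

    /**
     * Blocks until at least one element is available, then returns up to
     * maxBatch elements without waiting for more. Assumes maxBatch >= 1.
     */
    static <T> List<T> pollBatch(BlockingQueue<T> buffer, int maxBatch)
            throws InterruptedException {
        List<T> batch = new ArrayList<>(maxBatch);
        // take() parks the calling thread until the producer adds an element
        batch.add(buffer.take());
        // drainTo() never blocks: it moves whatever else is already queued
        buffer.drainTo(batch, maxBatch - 1);
        return batch;
    }
}
```

With this, the body of `poll` could shrink to something like `return BlockingDrain.pollBatch(buffer, MAX_POLL_SIZE);`, provided the interface allows handling or propagating `InterruptedException`.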

This makes my original question even more relevant: can we lean on the built-in back pressure mechanism or are there drawbacks to this approach?

Yes. However, depending on your poll frequency, it may work better to close the stream once you’ve drawn enough transactions and reopen at the last offset.
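To make the close-and-reopen idea concrete, here is a rough, library-free model of it. Everything in it is a stand-in: `Tx`, `ReopeningPoller`, and the `openStreamAfter` function are hypothetical, offsets are simplified to `long` values, and the start offset is assumed to be exclusive. With the real bindings, the function would correspond to a call such as `getTransactionsTrees` with the stored offset.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.LongFunction;

final class ReopeningPoller {
    /** Hypothetical stand-in for a transaction; only its offset matters here. */
    static final class Tx {
        final long offset;
        Tx(long offset) { this.offset = offset; }
    }

    // Hypothetical stand-in for "open a stream of everything after this offset"
    private final LongFunction<Iterator<Tx>> openStreamAfter;
    private final int maxBatch;
    private long lastOffset; // offset of the last transaction handed to the caller

    ReopeningPoller(LongFunction<Iterator<Tx>> openStreamAfter, long startAfter, int maxBatch) {
        this.openStreamAfter = openStreamAfter;
        this.lastOffset = startAfter;
        this.maxBatch = maxBatch;
    }

    /** Opens a fresh stream at the last offset, takes at most maxBatch items, then drops it. */
    List<Tx> poll() {
        Iterator<Tx> stream = openStreamAfter.apply(lastOffset);
        List<Tx> batch = new ArrayList<>(maxBatch);
        while (batch.size() < maxBatch && stream.hasNext()) {
            Tx tx = stream.next();
            batch.add(tx);
            lastOffset = tx.offset; // advance only past items actually returned
        }
        return batch; // abandoning the iterator models closing the stream
    }
}
```

Unlike the `poll` contract discussed above, this model returns an empty list instead of blocking when nothing new is available. The trade-off is one stream setup per poll in exchange for not holding a stream (and its buffered items) open between polls.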


@stefanobaghino-da

import com.daml.ledger.javaapi.data.FiltersByParty
import com.daml.ledger.javaapi.data.LedgerOffset
import com.daml.ledger.javaapi.data.NoFilter
import com.daml.ledger.javaapi.data.TransactionTree
import com.daml.ledger.rxjava.DamlLedgerClient
import io.smallrye.mutiny.Multi
import io.smallrye.mutiny.Uni
import io.smallrye.mutiny.helpers.queues.Queues
import io.smallrye.mutiny.operators.multi.processors.UnicastProcessor
import io.smallrye.mutiny.unchecked.Unchecked
import java.time.Duration
import javax.enterprise.context.ApplicationScoped

@ApplicationScoped
class PollTest(
    private val ledgerClient: DamlLedgerClient
) {

    private val queue = Queues.createStrictSizeQueue<TransactionTree>(100)

    private var lastObservedOffset: LedgerOffset = LedgerOffset.LedgerBegin.getInstance()

    private var processor: UnicastProcessor<TransactionTree> = UnicastProcessor.create(queue, null)

    private var enqueuer = startProcessorForParty("Alice", LedgerOffset.LedgerBegin.getInstance()).subscribe()


    fun poll(): List<TransactionTree> {
        return Multi.createBy().repeating().uni<TransactionTree> { i ->
            val trans = queue.poll() // Get item from queue, could be null meaning queue is empty

            // If the enqueuer has stopped and the queue has room again, restart the
            // subscription from the last observed offset (not from ledger begin)
            if (enqueuer.asCompletionStage().isDone && queue.size < 100){
                enqueuer = startProcessorForParty("Alice", lastObservedOffset).subscribe()
            }

            if (trans != null){
                // Complete Uni with the transaction item
                i.complete(trans)

            } else {
                i.fail(RuntimeException("Queue is empty"))
            }
        }.atMost(100)
            .onFailure().recoverWithCompletion()
            .collect().asList()
            .await().atMost(Duration.ofSeconds(5))
    }

    private fun startProcessorForParty(party: String, since: LedgerOffset, verbose: Boolean = false): Uni<Void> {
        val rxClient = ledgerClient.transactionsClient.getTransactionsTrees(
            since,
            FiltersByParty(
                mapOf(Pair(party, NoFilter.instance))
            ),
            verbose)

        return Multi.createFrom().publisher(rxClient).onItem().invoke(Unchecked.consumer { i ->
            if (queue.size < 100) {
                processor.onNext(i) // Add to queue
                lastObservedOffset = LedgerOffset.Absolute(i.offset)
            } else {
                processor.onComplete() // Once queue is full then stop processing stream
                throw RuntimeException("Queue is full")
            }
        }).toUni().replaceWithVoid()
            .onFailure().recoverWithNull()
    }
}

Untested, so there may be a typo or two.

My example uses Mutiny, but RxJava has the same concept: UnicastProcessor (RxJava Javadoc 3.1.5)