How to safely cancel a gRPC ledger stream?

I’m using the Java bindings, specifically the following stub to request completions from the ledger:

https://docs.daml.com/app-dev/bindings-java/javadocs/com/daml/ledger/api/v1/CommandCompletionServiceGrpc.CommandCompletionServiceStub.html

What I would like to know is how I can safely cancel this stream (without shutting down the entire gRPC channel).

I think this is a general question that can be applied to any stub returning an io.grpc.stub.StreamObserver.

Is there a specific reason why you are using the raw gRPC-generated code instead of the Java bindings?

As for raw gRPC-generated code, I would recommend you have a look at the gRPC docs here; there’s also a pointer to an example.

Hi, sorry for the late reply.

Is there a specific reason why you are using the raw gRPC-generated code instead of the Java bindings?

I’m using a different streaming library, and I’ve had a mediocre experience with RX before. Also, I think RX1 has reached end-of-life already.

Thanks for the link; I will try and implement that.

What streaming library are you using and what advantages does it have over RxJava?

Indeed, RxJava 2 (which is used by the Java bindings) is not actively developed anymore. The current plan is to move to RxJava 3 in a way that minimizes disruption and keeps the transition smooth for existing users.

Currently using fs2. It uses some Scala-specific language features, so it won’t work in Java, I think. But the design is quite different:

  • It has no side effects, so it’s easier to reason about.
  • It’s pull-based (rather than push-based), which makes working with back-pressure much easier.
  • It has good concurrency primitives.

The previous experience I had with rx is that it was good for asynchronous programming, but it was lacking as soon as you started working with concurrency. But tbh that was RX1, quite a while ago.

Got it, thank you!

As pointed out by stefano, there is an example of how to do this in Java. The key is that in the Daml API we provide a ResponseObserver, which doesn’t have a cancel method, but it can be cast to a ClientCallStreamObserver, as explained in this comment:

This provides access to a cancel() method, which does what we want.
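
For anyone landing here later, this is a minimal sketch of one way to get hold of that cancel() method for a server-streaming call. It is a variant of the cast described above, not necessarily what the linked comment does: instead of casting, the observer implements io.grpc.stub.ClientResponseObserver, whose beforeStart callback hands over the ClientCallStreamObserver before the call begins. The class name CancellableObserver and the onItem callback are made up for illustration.

```java
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Hypothetical helper (not part of gRPC or the Daml API): a response observer
// that keeps a handle to the underlying call so it can be cancelled later.
final class CancellableObserver<ReqT, RespT> implements ClientResponseObserver<ReqT, RespT> {

    private final Consumer<RespT> onItem;
    private final AtomicReference<ClientCallStreamObserver<ReqT>> call = new AtomicReference<>();

    CancellableObserver(Consumer<RespT> onItem) {
        this.onItem = onItem;
    }

    // gRPC invokes this once, before the call is started.
    @Override
    public void beforeStart(ClientCallStreamObserver<ReqT> requestStream) {
        call.set(requestStream);
    }

    @Override
    public void onNext(RespT value) {
        onItem.accept(value);
    }

    @Override
    public void onError(Throwable t) {
        // A client-side cancel is reported here as well (status CANCELLED).
    }

    @Override
    public void onCompleted() {
    }

    // Cancels this call only; the channel is left untouched.
    // Returns false if the call has not been started yet.
    boolean cancel() {
        ClientCallStreamObserver<ReqT> c = call.get();
        if (c == null) {
            return false;
        }
        c.cancel("cancelled by client", null);
        return true;
    }
}
```

Usage would be along the lines of passing an instance as the responseObserver argument of the stub's streaming method and calling cancel() on it when the stream is no longer needed. Expect onError to fire with a CANCELLED status afterwards, so the observer should treat that as a normal shutdown rather than a failure.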