An approach to handling Participant Node failure

Hey everyone! I’m looking for feedback on an approach to handling Participant Node failure, and would love to hear your thoughts or any flaws you spot in it.

Data Streaming and Failover as a Ledger API Client

As a Ledger API Client, henceforth known as ‘User’, you may choose to use the Command Submission Service and Command Completion Service when making requests to a Participant Node, which serves the Ledger API.

There may be cases where a Command Submission has been sent, but the Participant Node serving the Ledger API fails before the User can guarantee that the request has been accepted. This can happen in two possible ways:

  • Scenario 1: The User makes a request and does not receive a response from the Command Submission Service

  • Scenario 2: The User makes a request which is accepted by the Command Submission Service, but does not receive a response from the Command Completion Service.

Certain ledger topologies will support hosting a party on multiple Participant Nodes. This means that parties have higher fault tolerance as they can rely on a secondary Participant Node if a primary fails.

This article will focus on how to handle Scenario 1 in these ledger topologies, along with complementary code samples in Java.

Scenario 1:

Let us assume that the User has already connected to the Participant Node and holds a DamlLedgerClient instance, obtained by connecting to the Participant Node’s IP and port using the appropriate access token. They then submit a command using the Command Submission Service, blocking for the response, as shown in this representative code sample:


CommandSubmissionClient commandSubmissionClient = damlLedgerClient.getCommandSubmissionClient();

/* The submission parameters are defined in the
   Ledger API documentation available on the DAML website */
commandSubmissionClient.submit(
        workflowId,
        appId,
        commandId,
        submitter,
        Optional.empty(),
        Optional.empty(),
        Optional.of(deduplicationTime),
        commandList
    ).blockingGet();

return "OK";

The alternative to blocking would be to use an Observer-style (non-blocking) subscription, which looks slightly different, but the general idea remains the same. Upon returning OK, the user can guarantee that the Participant Node has accepted the request. However, if the Participant Node crashes or fails before the request can be accepted, the following error will be thrown (in an Observer implementation, the onError function would be invoked instead of an exception being thrown):


io.grpc.StatusRuntimeException: UNAVAILABLE: io exception
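For completeness, here is a minimal sketch of the non-blocking alternative mentioned above. It assumes the RxJava bindings, where submit returns a Single we can subscribe to, and reuses the same parameters as the blocking sample; the error callback plays the role of onError and receives the same gRPC status information:


/* Non-blocking sketch: instead of blockingGet(), subscribe and handle the outcome via callbacks.
   Uses the same parameters as the blocking sample above. */
commandSubmissionClient
    .submit(
        workflowId,
        appId,
        commandId,
        submitter,
        Optional.empty(),
        Optional.empty(),
        Optional.of(deduplicationTime),
        commandList)
    .subscribe(
        empty -> System.out.println("Submission accepted by the Participant Node"),
        throwable -> {
          /* Equivalent of onError: inspect the gRPC status instead of catching an exception */
          Status status = Status.fromThrowable(throwable);
          System.out.println("Submission failed with status: " + status.getCode());
        });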

As a fundamental rule of any distributed system, failures will occur, and as such, we must be ready to catch this error and deal with it appropriately. Let’s demonstrate how to catch the error and fail over to the next appropriate Participant Node. The following code sample shows a minimal example of handling failure, with assumptions outlined in the comments. Use it as a guideline when implementing your own system with a similar topology.


import com.daml.ledger.javaapi.data.Command;
import com.daml.ledger.rxjava.CommandSubmissionClient;
import com.daml.ledger.rxjava.DamlLedgerClient;
import io.grpc.Status;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

public class FailOverRetryExample {

  public static void main(String[] args) {
    /* We connect to one of our designated Participant Nodes (Client_1). connect() throws an
    exception on failure, but for our minimal example we assume the initial connection succeeds */
    DamlLedgerClient damlLedgerClient = DamlLedgerClient.newBuilder("localhost", 6865).build();
    damlLedgerClient.connect();

    /* We fill in arbitrary request parameters, which would differ depending on your
    use-case/implementation. We leave the commandList empty for simplicity; note, however, that
    an empty command list will result in an INVALID_ARGUMENT response if the submission does not
    fail for other reasons */
    String workflowId = "workflow-1";
    String commandId = "commandId-1";
    List<Command> commandList = new ArrayList<>();
    String submitter = "Alice";

    /* Let us assume that Client_1 goes OFFLINE or FAILS now */

    /* We attempt to submit our request, unaware that Client_1 has crashed */
    Status.Code responseCode =
        submitCommand(damlLedgerClient, workflowId, commandId, commandList, submitter);

    /* If the responseCode is UNAVAILABLE, the Participant Node is having issues and we should
    first attempt a back-off and retry mechanism */
    if (responseCode.equals(Status.UNAVAILABLE.getCode())) {
      /* Retry five times using a simple back-off strategy */
      for (int i = 0; i < 5; i++) {
        try {
          Thread.sleep(getWaitTime(i));
        } catch (InterruptedException e) {
          /* Restore the interrupt flag and carry on */
          Thread.currentThread().interrupt();
          log("Unexpected sleep interruption");
        }
        /* We retry using the SAME parameters as before */
        responseCode =
            submitCommand(damlLedgerClient, workflowId, commandId, commandList, submitter);
        if (!responseCode.equals(Status.UNAVAILABLE.getCode())) {
          /* The node is back online and our retry mechanism has worked;
          we can now handle the non-UNAVAILABLE status */
          break;
        }
      }
    }

    /* If our last responseCode was still UNAVAILABLE, our back-off and retry mechanism has
    failed, so we fail over to our next designated Participant Node */
    if (responseCode.equals(Status.UNAVAILABLE.getCode())) {
      try {
        /* We attempt to close the old connection. Failure to close is not critical */
        damlLedgerClient.close();
      } catch (Exception e) {
        log("Unable to close old client connection, ignoring");
      }

      /* We create a new connection to our backup Participant Node (Client_2) */
      damlLedgerClient = DamlLedgerClient.newBuilder("localhost", 7865).build();
      damlLedgerClient.connect();

      /* We submit the same request */
      responseCode = submitCommand(damlLedgerClient, workflowId, commandId, commandList, submitter);

      /* We could continue to check this responseCode and handle errors, but for simplicity we
      assume that Client_2 does not require retry/fail-over handling */
    }
  }

  public static Status.Code submitCommand(
      DamlLedgerClient damlLedgerClient,
      String workflowId,
      String commandId,
      List<Command> commandList,
      String submitter) {
    try {
      CommandSubmissionClient commandSubmissionClient =
          damlLedgerClient.getCommandSubmissionClient();
      commandSubmissionClient
          .submit(
              workflowId,
              "test-app",
              commandId,
              submitter,
              Optional.empty(),
              Optional.empty(),
              Optional.empty(),
              commandList)
          .blockingGet();
    } catch (Throwable throwable) {
      /* Map any failure to its gRPC status code */
      Status grpcStatus = Status.fromThrowable(throwable);
      return grpcStatus.getCode();
    }
    return Status.Code.OK;
  }

  public static long getWaitTime(int attemptNumber) {
    /* We use a simple linear back-off strategy for the wait time: 5s, 10s, 15s, ... */
    return 5000L * (attemptNumber + 1);
  }

  private static void log(String message) {
    System.out.println(message);
  }
}

Following the code sample, we see a rough outline of how one might implement a retry/fail-over system. We assume that connecting to the Participant Nodes, which serve the Ledger API, does not require an authentication token; a real system would require one.
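For reference, a minimal sketch of what an authenticated connection might look like. This assumes the Java bindings’ builder exposes a withAccessToken option (check the builder in your SDK version) and that accessToken is a placeholder for a token obtained from your authentication infrastructure:


/* Sketch only: connecting with an access token.
   'accessToken' is a placeholder for a token obtained from your auth infrastructure */
DamlLedgerClient authenticatedClient =
    DamlLedgerClient.newBuilder("localhost", 6865)
        .withAccessToken(accessToken)
        .build();
authenticatedClient.connect();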

We can also safely resubmit the same command to the backup node (Client_2), without fear of duplicate transactions, in the scenario where Client_1 accepted the request but failed to return a response. This is because a ledger topology that supports hosting the same party on multiple nodes will internally use a deduplication mechanism that silently drops any duplicate requests, where a duplicate request is defined as one having the same CommandId, which we specified as an argument in the request.
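One practical consequence: generate the commandId once, outside the retry/fail-over logic, and reuse it verbatim for every resubmission; a fresh commandId per attempt would defeat deduplication. A minimal sketch, reusing the submitCommand helper from the example above (client1 and client2 are hypothetical names for the connections to Client_1 and Client_2):


/* Sketch: one stable commandId for all attempts, so duplicates can be detected and dropped */
String commandId = java.util.UUID.randomUUID().toString();

/* First attempt against Client_1 */
Status.Code code = submitCommand(client1, workflowId, commandId, commandList, submitter);

/* Retries and the fail-over submission to Client_2 reuse the SAME commandId */
if (code.equals(Status.Code.UNAVAILABLE)) {
  code = submitCommand(client2, workflowId, commandId, commandList, submitter);
}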

Open Questions:

  • Are there any other gRPC statuses we should consider for failing over, e.g. UNKNOWN?

Thanks for sharing this @zoraizmahmood. Despite the open question, I’d suggest moving this to the Tutorials and Guides section. If nothing else, everyone should take this statement to heart: “failures will occur, and as such, we must be ready to catch this error and deal with it appropriately.” :clap:

I’m not sure what other status codes on the CommandSubmissionService would best be treated by a retry, but I’m sure there are some, for example RESOURCE_EXHAUSTED. Maybe someone closer to this topic could give a complete list.

There is another scenario in which the same retry mechanism works though: Not getting a response at all. Maybe you send something to the CommandSubmissionService and just get a timeout. Or maybe you get an OK from the CommandSubmissionService, but no response from the CompletionService. Either way, you don’t know where your command got stuck. It has got lost in the void. Thanks to the command deduplication mechanism you describe, you can just retry until you get a response.
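For example, a minimal sketch of treating a client-side timeout like an UNAVAILABLE, assuming the RxJava bindings (the 30-second deadline is an illustrative value):


/* Sketch: apply a client-side timeout so "no response at all" becomes an error we can retry */
try {
  commandSubmissionClient
      .submit(workflowId, appId, commandId, submitter,
          Optional.empty(), Optional.empty(), Optional.empty(), commandList)
      .timeout(30, java.util.concurrent.TimeUnit.SECONDS)
      .blockingGet();
} catch (Throwable throwable) {
  boolean timedOut = throwable instanceof java.util.concurrent.TimeoutException
      || throwable.getCause() instanceof java.util.concurrent.TimeoutException;
  Status.Code code = Status.fromThrowable(throwable).getCode();
  if (timedOut || code.equals(Status.Code.UNAVAILABLE)) {
    /* No conclusive response: retry with the same commandId, relying on deduplication */
  }
}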

[EDIT: I now see that what I’m describing here is your Scenario 2. So really what I’m saying is: Scenarios 1 and 2 can be treated the same way: retry with backoff followed by failover]

[EDIT2: In case it wasn’t clear from the above, I think you’ve got this right so my feedback is “publish it as a guide”]


I would recommend referring to the Ledger API reference (links to the CommandSubmissionService and CommandService for version 1.9). Each service’s return codes are documented for each endpoint. I would say that RESOURCE_EXHAUSTED and UNAVAILABLE are the ones that definitely call for retries, possibly with some form of exponential back-off strategy. UNAUTHENTICATED, PERMISSION_DENIED, NOT_FOUND and INVALID_ARGUMENT all (most likely) indicate some form of client-side error, so those errors should simply be reported in the appropriate way. Also consider the implicit possibility of getting back an INTERNAL, which could indicate either a bug on the participant or a transient error. I would probably use a retry strategy there as well to handle the latter case, but repeated INTERNAL errors might mean that the participant is experiencing issues.
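For illustration, one possible sketch of such an exponential back-off that could stand in for the linear getWaitTime in the example above (the base and cap values are illustrative, tune them for your deployment):


/* Sketch: exponential back-off with jitter */
public static long getExponentialWaitTime(int attemptNumber) {
  long baseMillis = 1000L;
  long capMillis = 30_000L;
  long exponential = Math.min(capMillis, baseMillis * (1L << attemptNumber));
  /* Up to 20% random jitter avoids many clients retrying in lock-step */
  long jitter = (long) (exponential * 0.2 * Math.random());
  return exponential + jitter;
}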


One slightly annoying caveat is that you also get RESOURCE_EXHAUSTED for messages that are too large, where retrying is not going to work. See BackPressure for more details.


+1 moving this excellent guide to the Tutorials and Guides section!


Curious what your thoughts are on error-message string matching, as seen in the RESOURCE_EXHAUSTED case below. This is my updated view on when to back off, retry, or fail over; I’d also love your thoughts on it:

  private static boolean isRetryError(Status status) {
    if (status.isOk()) return false;
    Status.Code errCode = status.getCode();
    if (errCode.equals(Status.Code.ABORTED)) return true;
    else if (errCode.equals(Status.Code.INTERNAL)) return true;
    else if (errCode.equals(Status.Code.UNKNOWN)) return true;
    else if (errCode.equals(Status.Code.UNAVAILABLE)) return true;
    else if (errCode.equals(Status.Code.DEADLINE_EXCEEDED)) return true;
    else if (errCode.equals(Status.Code.RESOURCE_EXHAUSTED)
            /* Guard against a null description; only skip the retry for the message-too-large case */
            && (status.getDescription() == null
                || !status.getDescription().contains("gRPC message exceeds maximum size 4194304")))
      return true;
    else return false;
  }

What about the failover implementation in the case of a Flowable subscriber to transaction trees?
Also, was Scenario 2 discussed here?