Hey everyone! I’m just looking for feedback on an approach to handling Participant Node failure, and would love to hear your thoughts or any flaws you spot in the approach.
Data Streaming and Failover as a Ledger API Client
As a Ledger API Client, henceforth known as ‘User’, you may choose to use the Command Submission Service and Command Completion Service when making requests to a Participant Node, which serves the Ledger API.
There may be cases where a Command Submission has been sent, but the Participant Node serving the Ledger API fails before the User can guarantee that the request has been accepted. This can happen in two possible ways:
- Scenario 1: The User makes a request and does not receive a response from the Command Submission Service.
- Scenario 2: The User makes a request which is accepted by the Command Submission Service, but does not receive a response from the Command Completion Service.
Certain ledger topologies will support hosting a party on multiple Participant Nodes. This means that parties have higher fault tolerance as they can rely on a secondary Participant Node if a primary fails.
This article will focus on how to handle Scenario 1 in these ledger topologies, along with complementary code samples in Java.
Scenario 1:
Let us assume that the User has already connected to a Participant Node and holds a DamlLedgerClient instance, obtained by connecting to the node’s host and port with the appropriate access token. They then submit a command using the Command Submission Service, blocking for the response, as shown in this representative code sample:
CommandSubmissionClient commandSubmissionClient = damlLedgerClient.getCommandSubmissionClient();
/* The submission parameters are defined in the
Ledger API documentation available on the DAML website */
commandSubmissionClient.submit(
workflowId,
appId,
commandId,
submitter,
Optional.empty(),
Optional.empty(),
Optional.of(deduplicationTime),
commandList
).blockingGet();
return "OK";
The alternative to blocking would be to use an Observer, which would be slightly different, but the general idea remains. Upon returning OK, the user can guarantee that the Participant Node has accepted the request. However, in the scenario that the Participant Node crashes or fails before the request can be accepted, the following error will be thrown (if using an Observer implementation, the onError function would be invoked instead of an exception being thrown):
io.grpc.StatusRuntimeException: UNAVAILABLE: io exception
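For completeness, here is a minimal sketch of the Observer-style alternative, assuming the RxJava bindings where submit returns a Single<Empty> (parameters as in the blocking example above):

import com.google.protobuf.Empty;
import io.grpc.Status;
import io.reactivex.observers.DisposableSingleObserver;

/* Sketch only: instead of blocking, we register callbacks; onError receives
   the StatusRuntimeException if the node fails */
commandSubmissionClient
    .submit(
        workflowId,
        appId,
        commandId,
        submitter,
        Optional.empty(),
        Optional.empty(),
        Optional.of(deduplicationTime),
        commandList)
    .subscribeWith(
        new DisposableSingleObserver<Empty>() {
          @Override
          public void onSuccess(Empty response) {
            System.out.println("Submission accepted");
          }

          @Override
          public void onError(Throwable throwable) {
            System.out.println("Submission failed: " + Status.fromThrowable(throwable));
          }
        });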
As a fundamental rule of any distributed system, failures will occur, and as such, we must be ready to catch this error and deal with it. Let’s demonstrate how we can catch this error and fail over to the next designated Participant Node. The following code sample showcases a minimal example of handling failure, with assumptions as outlined in the comments. Use it as a guideline when implementing your own system with a similar topology.
import com.daml.ledger.javaapi.data.Command;
import com.daml.ledger.rxjava.CommandSubmissionClient;
import com.daml.ledger.rxjava.DamlLedgerClient;
import io.grpc.Status;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
public class FailOverRetryExample {
public static void main(String[] args) {
DamlLedgerClient damlLedgerClient = DamlLedgerClient.newBuilder("localhost", 6865).build();
/* We connect to one of our designated Participant Nodes (Client_1). This throws an exception on failure,
but for our minimal example we shall assume that the initial connection is successful */
damlLedgerClient.connect();
/* We fill in arbitrary request parameters, which would differ depending on your use-case/implementation.
We leave the commandList empty for simplicity; note that an empty command list will result in an
INVALID_ARGUMENT response on submission if the request does not fail for other reasons */
String workflowId = "workflow-1";
String commandId = "commandId-1";
List<Command> commandList = new ArrayList<>();
String submitter = "Alice";
/* Let us assume that Client_1 goes OFFLINE or FAILS now*/
/* We attempt to submit our request, unaware that Client_1 has crashed */
Status.Code responseCode =
submitCommand(damlLedgerClient, workflowId, commandId, commandList, submitter);
/*If the responseCode is 'UNAVAILABLE', it means the Participant Node is
having issues and we should first attempt a backoff and retry mechanism*/
if (responseCode.equals(Status.UNAVAILABLE.getCode())) {
/* Retry five times using a simple backoff strategy*/
for (int i = 0; i < 5; i++) {
try {
Thread.sleep(getWaitTime(i));
} catch (InterruptedException e) {
log("Unexpected Sleep Interruption");
}
/* We retry using the SAME parameters as before */
responseCode =
submitCommand(damlLedgerClient, workflowId, commandId, commandList, submitter);
if (!responseCode.equals(Status.UNAVAILABLE.getCode())) {
/* The Node is online and our retry mechanism has worked,
we can now handle the non-UNAVAILABLE status*/
break;
}
}
}
/* If our last responseCode was still UNAVAILABLE, our back-off and retry mechanism has failed; we shall instead fail over to our next designated Participant Node */
if (responseCode.equals(Status.UNAVAILABLE.getCode())) {
try {
/* We attempt to close the old connection. Failure to close is not critical */
damlLedgerClient.close();
} catch (Exception e) {
log("Unable to close old client connection, ignoring");
}
/* We create a new connection with our backup Participant Node (Client_2) */
damlLedgerClient = DamlLedgerClient.newBuilder("localhost", 7865).build();
damlLedgerClient.connect();
/* We submit the same request */
responseCode = submitCommand(damlLedgerClient, workflowId, commandId, commandList, submitter);
/* We could continue to error-check this responseCode and handle it, but we assume for simplicity that Client_2 does not require retry/fail-over handling */
}
}
public static Status.Code submitCommand(
DamlLedgerClient damlLedgerClient,
String workflowId,
String commandId,
List<Command> commandList,
String submitter) {
try {
CommandSubmissionClient commandSubmissionClient =
damlLedgerClient.getCommandSubmissionClient();
commandSubmissionClient
.submit(
workflowId,
"test-app",
commandId,
submitter,
Optional.empty(),
Optional.empty(),
Optional.empty(),
commandList)
.blockingGet();
} catch (Throwable throwable) {
Status grpcStatus = Status.fromThrowable(throwable);
return grpcStatus.getCode();
}
return Status.Code.OK;
}
public static long getWaitTime(int attemptNumber) {
/* We use a simple linear back-off strategy for the wait time: 5s, 10s, 15s, ... */
return 5000L * (attemptNumber + 1);
}
private static void log(String log) {
System.out.println(log);
}
}
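As an aside, the linear back-off in getWaitTime could be swapped for exponential back-off with jitter, a common pattern for spreading retries out; a minimal sketch (the 5-second base and 60-second cap are arbitrary assumptions, not values from the sample above):

import java.util.concurrent.ThreadLocalRandom;

public static long getExponentialWaitTime(int attemptNumber) {
  /* Exponential back-off with full jitter: double the cap each attempt
     (bounded at 60 seconds), then draw a uniformly random wait below it */
  long cap = Math.min(60_000L, 5_000L * (1L << attemptNumber));
  return ThreadLocalRandom.current().nextLong(cap);
}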
By following the code sample, we see a rough outline of how one might implement a retry/fail-over system. We assume that connecting to the Participant Nodes, which serve the Ledger API, does not require an authentication token; a real system would require one.
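In a token-secured setup, the token can be attached when building the client; a minimal sketch, assuming the withAccessToken builder method from the Java bindings and a hypothetical readAccessToken helper of your own:

/* Sketch only: readAccessToken is a hypothetical helper that fetches the
   access token for the given node; it is not part of the bindings */
String accessToken = readAccessToken("participant-node-1");
DamlLedgerClient damlLedgerClient =
    DamlLedgerClient.newBuilder("localhost", 6865)
        .withAccessToken(accessToken)
        .build();
damlLedgerClient.connect();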
We can also safely resubmit the same command to the backup node (Client_2), without fear of duplicate transactions in the scenario that Client_1 accepted the request but failed to respond. This is because a ledger topology supporting the same party on multiple nodes would internally use a deduplication mechanism that silently drops any duplicate requests, where a duplicate request is defined as one having the same CommandId, which we specified as an argument in the request.
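If you want that deduplication window to be explicit rather than relying on the ledger’s default, the resubmission to Client_2 can pass a deduplication duration alongside the unchanged CommandId; a minimal sketch (the five-minute window is an arbitrary assumption):

/* Sketch: resubmitting with the SAME commandId within the deduplication
   window lets the ledger silently drop the request if Client_1 already
   accepted it */
java.time.Duration deduplicationTime = java.time.Duration.ofMinutes(5);
damlLedgerClient
    .getCommandSubmissionClient()
    .submit(
        workflowId,
        "test-app",
        commandId, // unchanged from the original submission
        submitter,
        Optional.empty(),
        Optional.empty(),
        Optional.of(deduplicationTime),
        commandList)
    .blockingGet();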
Open Questions:
- Are there any other gRPC statuses we should consider for failing over? e.g. UNKNOWN