Race condition in Daml Triggers? Command failed: Disputed: Missing input state for key contract_id

I have a trigger that looks for contracts of template RecordAsBuyer whose hasDelivered flag is not set. Once one is found, it calls emitCommands:

records <- query @RecordAsBuyer
let myRecords = filter (\(_,c) -> c.owner == _party 
    && not c.hasDelivered) records
unless (null myRecords) do
    let (cid, c) = head myRecords
    debug $ (show _party) <> " get delivery for #INV:" 
        <> (show c.invoiceNum)
    emitCommands 
        [exerciseCmd cid RecordAsBuyer_GetDelivery] 
        [toAnyContractId cid]
    pure ()

In the choice RecordAsBuyer_GetDelivery, the contract id of a Wallet is fetched (using fetchByKey) and a consuming choice on that contract is then exercised. The choice also sets the hasDelivered flag on the RecordAsBuyer contract so that it does not get exercised again.
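For reference, the setup is along these lines (a heavily simplified sketch rather than my real templates; the Wallet fields, its choice name, and the module name are made up):

module Sketch where

template Wallet
  with
    owner : Party
    deliveries : Int
  where
    signatory owner
    key owner : Party
    maintainer key
    -- consuming choice: archives this Wallet and creates a successor, so any
    -- other pending command that still points at the old contract id fails
    choice Wallet_RecordDelivery : ContractId Wallet
      controller owner
      do create this with deliveries = deliveries + 1

template RecordAsBuyer
  with
    owner : Party
    invoiceNum : Int
    hasDelivered : Bool
  where
    signatory owner
    choice RecordAsBuyer_GetDelivery : ContractId RecordAsBuyer
      controller owner
      do
        -- fetch the buyer's Wallet by key and exercise the consuming choice
        (walletCid, _) <- fetchByKey @Wallet owner
        _ <- exercise walletCid Wallet_RecordDelivery
        -- set the flag so the trigger stops selecting this record
        create this with hasDelivered = True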

I ran it and I am getting this error:

[unknown source]: "'alice' get delivery for #INV:0"
[unknown source]: "'alice' get delivery for #INV:1"
Command failed: Disputed: Missing input state for key contract_id: "00f476e797a51614579cffc86ab820ca1b3315c7f52be9df2201d52151526cf53f"
, code: 3 (context: {triggerDefinition=4bb8ea621e7d2576143e52636f7a9fe8899d03cee8135bd085e45455c798d0e6:Keplaax.Triggers.BuyerTrigger:trigger})
[unknown source]: "'alice' get delivery #INV:1"

The contract_id 00f476e7… is an archived Wallet.

I understand this probably comes from a race condition somewhere… how may I rewrite the code to eliminate it?

Here is the trace from the sandbox:

WARN: Exception during model conformance validation.
com.daml.ledger.participant.state.kvutils.Err$MissingInputState: Missing input state for key contract_id: "00f476e797a51614579cffc86ab820ca1b3315c7f52be9df2201d52151526cf53f"

	at com.daml.ledger.participant.state.kvutils.committer.CommitContext.$anonfun$read$1(CommitContext.scala:63)
	at scala.collection.immutable.HashMap$HashMap1.getOrElse0(HashMap.scala:355)
	at scala.collection.immutable.HashMap$HashTrieMap.getOrElse0(HashMap.scala:587)
	at scala.collection.immutable.HashMap$HashTrieMap.getOrElse0(HashMap.scala:587)
	at scala.collection.immutable.HashMap.getOrElse(HashMap.scala:72)
	at com.daml.ledger.participant.state.kvutils.committer.CommitContext.read(CommitContext.scala:63)
	at com.daml.ledger.participant.state.kvutils.committer.TransactionCommitter.lookupContract(TransactionCommitter.scala:661)
	at com.daml.ledger.participant.state.kvutils.committer.TransactionCommitter.$anonfun$validateModelConformance$3(TransactionCommitter.scala:256)
	at com.daml.lf.engine.Result.go$1(Result.scala:52)
	at com.daml.lf.engine.Result.consume(Result.scala:56)
	at com.daml.lf.engine.Result.consume$(Result.scala:42)
	at com.daml.lf.engine.ResultNeedContract.consume(Result.scala:73)
	at com.daml.ledger.participant.state.kvutils.committer.TransactionCommitter.$anonfun$validateModelConformance$2(TransactionCommitter.scala:258)
	at com.codahale.metrics.Timer.time(Timer.java:118)
	at com.daml.ledger.participant.state.kvutils.committer.TransactionCommitter.$anonfun$validateModelConformance$1(TransactionCommitter.scala:233)
	at com.daml.ledger.participant.state.kvutils.committer.Committer.$anonfun$runSteps$2(Committer.scala:156)
	at com.codahale.metrics.Timer.time(Timer.java:118)
	at com.daml.ledger.participant.state.kvutils.committer.Committer.$anonfun$runSteps$1(Committer.scala:156)
	at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
	at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
	at scala.collection.immutable.List.foldLeft(List.scala:91)
	at com.daml.ledger.participant.state.kvutils.committer.Committer.runSteps(Committer.scala:153)
	at com.daml.ledger.participant.state.kvutils.committer.Committer.runSteps$(Committer.scala:149)
	at com.daml.ledger.participant.state.kvutils.committer.TransactionCommitter.runSteps(TransactionCommitter.scala:52)
	at com.daml.ledger.participant.state.kvutils.committer.Committer.$anonfun$run$1(Committer.scala:87)
	at com.codahale.metrics.Timer.time(Timer.java:118)
	at com.daml.ledger.participant.state.kvutils.committer.Committer.run(Committer.scala:85)
	at com.daml.ledger.participant.state.kvutils.committer.Committer.run$(Committer.scala:79)
	at com.daml.ledger.participant.state.kvutils.committer.TransactionCommitter.run(TransactionCommitter.scala:52)
	at com.daml.ledger.participant.state.kvutils.KeyValueCommitting.processSubmission(KeyValueCommitting.scala:91)
	at com.daml.ledger.validator.SubmissionValidator$.processSubmission(SubmissionValidator.scala:381)
	at com.daml.ledger.validator.SubmissionValidator$.$anonfun$createForTimeMode$1(SubmissionValidator.scala:356)
	at com.daml.ledger.validator.SubmissionValidator.$anonfun$runValidation$14(SubmissionValidator.scala:214)
	at scala.util.Try$.apply(Try.scala:213)
	at com.daml.ledger.validator.SubmissionValidator.$anonfun$runValidation$13(SubmissionValidator.scala:209)
	at com.daml.metrics.Timed$.future(Timed.scala:58)
	at com.daml.ledger.validator.SubmissionValidator.$anonfun$runValidation$12(SubmissionValidator.scala:207)
	at scala.concurrent.Future.$anonfun$flatMap$1(Future.scala:307)
	at scala.concurrent.impl.Promise.$anonfun$transformWith$1(Promise.scala:41)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Hi @a14843,

There are roughly two main types of races in a trigger:

  1. A trigger racing against itself. This can happen if you don’t put a contract in the pending set, so you just keep spamming the same command before the first one has finished. Usually you can fix this by rewriting your trigger (see the sketch after this list); however, there are some cases where a race is currently hard to avoid, e.g., if your trigger gets restarted. See Allow use of command deduplication in runtime components · Issue #7855 · digital-asset/daml · GitHub for one planned approach to fixing that.
  2. A trigger racing against another ledger client. This could be another trigger or something else. While you usually want to try to minimize contention and races of this type, it can be hard to avoid them completely and applications will usually just retry (as do triggers by virtue of being state-based).
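To make point 1 concrete: the usual fix is to put the exercised contract into the pending set via the second argument to emitCommands, exactly as your snippet above already does. If I recall correctly, Daml.Trigger also provides dedupExercise, which only submits the exercise if an identical one is not already in flight. A rough sketch (reusing the selection logic from your snippet, not your actual code):

rule = \party -> do
  records <- query @RecordAsBuyer
  let myRecords = filter (\(_, c) -> c.owner == party && not c.hasDelivered) records
  case myRecords of
    [] -> pure ()
    (cid, _) :: _ ->
      -- only submitted if an identical exercise is not already in flight,
      -- so the trigger does not race against itself
      dedupExercise cid RecordAsBuyer_GetDelivery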

You only talked about one trigger, so I assume we’re in case 1. Assuming each RecordAsBuyer operates on a distinct Wallet (and nothing apart from the trigger operates on those wallets at the same time), I don’t see any race here. My guess is that you have multiple RecordAsBuyer contracts that operate on the same wallet, in which case you would see such a race. To fix that, you could query getCommandsInFlight and check whether you have already submitted a command that operates on a given wallet key before you send the next one.

Yes, any two uses of Wallet will race against each other since you are calling a consuming choice on it. Note that this racing behaviour only occurs when the two uses of Wallet happen in separate transactions.

This will usually fail (here, key stands for the contract key of the MyTemplate contract):

emitCommands [exerciseByKeyCmd @MyTemplate key ConsumingChoice] []
emitCommands [exerciseByKeyCmd @MyTemplate key ConsumingChoice] []

However, putting them in the same transaction works just fine, as it gives the two calls an order:

emitCommands
  [ exerciseByKeyCmd @MyTemplate key ConsumingChoice
  , exerciseByKeyCmd @MyTemplate key ConsumingChoice
  ]
  []

You can use that trick for your example. Instead of only calling RecordAsBuyer_GetDelivery on the first record, call it on all of them in a single transaction:

records <- query @RecordAsBuyer
let myRecords = filter (\(_, c) -> c.owner == _party && not c.hasDelivered) records
unless (null myRecords) $ do
  _ <- forA myRecords $ \(_, c) ->
    debug $ show _party <> " get delivery for #INV:" <> show c.invoiceNum
  emitCommands
    (map (\(cid, _) -> exerciseCmd cid RecordAsBuyer_GetDelivery) myRecords)
    (map (\(cid, _) -> toAnyContractId cid) myRecords)
  pure ()

Now there is no race between the commands within one such “batch”. However, two such batches will still race with each other on Wallet, so you will also have to take a look at commandsInFlight, or pass state from one trigger invocation to the next to sequence these batches.
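For the commandsInFlight route, one coarse way to sequence batches is to hold off on submitting a new batch while anything is still in flight. A sketch only (it assumes this trigger is the only client submitting commands for this party, uses when/unless from DA.Action, and imports DA.Next.Map qualified as Map):

rule = \_party -> do
  inflight <- getCommandsInFlight
  -- only start a new batch once the previous one has completed or failed,
  -- i.e. once the in-flight map is empty
  when (null (Map.toList inflight)) $ do
    records <- query @RecordAsBuyer
    let myRecords = filter (\(_, c) -> c.owner == _party && not c.hasDelivered) records
    unless (null myRecords) $ do
      emitCommands
        (map (\(cid, _) -> exerciseCmd cid RecordAsBuyer_GetDelivery) myRecords)
        (map (\(cid, _) -> toAnyContractId cid) myRecords)
      pure ()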

Would you please elaborate a bit more on how to use getCommandsInFlight? I get a map of (commandId, [command]) from this function… so should I check whether the new command I am about to emit matches any command returned from getCommandsInFlight before I call emitCommands?

I think first of all, it’s worth pointing out that if you can avoid calling getCommandsInFlight, you should. In a lot of cases, the pending set (controlled via the second argument to emitCommands) is sufficient.

That said, sometimes it is not, so you need to access the in-flight commands directly. What exactly you do with them depends heavily on your use case. In this case, you could look for exercises of GetDelivery and check the contract key they operate on. Here’s a self-contained example of what this could look like:

module Main where

import DA.Action
import DA.Foldable (forA_)
import DA.List (dedup)
import qualified DA.Next.Map as Map
import Daml.Trigger
import Daml.Trigger.LowLevel (fromExercise)

template Wallet
  with
    uuid: Text
    p : Party
    counter : Int
  where
    signatory p
    key (uuid, p) : (Text, Party)
    maintainer key._2
    choice Inc : ContractId Wallet
      controller p
      do create this with counter = counter + 1

template RecordAsBuyer
  with
    p : Party
    uuid : Text
  where
    signatory p
    nonconsuming choice GetDelivery : ContractId Wallet
      controller p
      do exerciseByKey @Wallet (uuid, p) Inc

t : Trigger ()
t = Trigger with
  initialize = pure ()
  updateState = \_ -> pure ()
  registeredTemplates = AllInDar
  heartbeat = None
  rule = \p -> do
    -- commands this trigger has submitted that are still in flight
    cmdsInFlight <- getCommandsInFlight
    records <- query @RecordAsBuyer
    -- wallet uuids for which a GetDelivery exercise is already in flight
    let walletUuidsInFlight = dedup
          [ record.uuid
          | (_, cmds) <- Map.toList cmdsInFlight
          , cmd <- cmds
          , Some (cid, GetDelivery) <- [fromExercise cmd]
          , Some (_, record) <- [find (\(cid', _) -> cid' == cid) records]
          ]
    -- only emit for records whose wallet is not already being operated on
    forA_ records $ \(cid, c) ->
      unless (c.uuid `elem` walletUuidsInFlight) $ do
        emitCommands [exerciseCmd cid GetDelivery] []
        pure ()

That said, this is a pretty artificial example and there are almost certainly better ways of solving the underlying problem (in this silly toy example, it’s hard to say what the underlying problem even is). So it really depends on what exactly you are trying to do.

I started changing all my code to use exerciseByKey and I am getting fewer errors now. There are still some, but they look less harmful:

Command failed: Inconsistent: InconsistentKeys: at least one contract key has changed since the submission, code: 3 (context: {triggerDefinition=37c1dd59f5102b4a1809e0b4d34c5f3397cbe56b7959a139b24b824c6638b655:Keplaax.Triggers.BuyerTrigger:trigger})

I have come to believe that these conflicts are unavoidable, and that the DAML library will make sure the same command does not get executed multiple times.

Unless you sequence everything very carefully, you are likely to run into some conflicts in a distributed setting. The “Inconsistent” errors you are seeing here mean that one transaction changed a contract key after it was resolved by another transaction. A good way of dealing with sporadic errors of this kind is to simply retry.

@bernhard I am facing a similar issue. What is the solution to this?