QueueFull Error - daml-dit-if

Hello all,
I am using the daml-dit-if package to create a python integration with the purpose of using it on daml hub.
This integration gathers events from a third-party API at a regular interval, processes them, and puts the results onto a queue to create the corresponding contracts on the ledger. After leaving the integration running for some time, I'm noticing this QueueFull error. It comes and goes as the application works through the queue, and some events are lost along the way.
My question is: what can be done so that this error does not occur? (Trace log for reference.)

Traceback (most recent call last):
  File "/home/daml/foobar-integration/.ddit-venv/lib/python3.8/site-packages/aioschedule/__init__.py", line 462, in run
    ret = await self.job_func()
  File "/home/daml/foobar-integration/src/foobar_int/foobar_integration.py", line 22, in wrapper
    result = await func(*args, **kwargs)
  File "/home/daml/foobar-integration/src/foobar_int/foobar_integration.py", line 256, in get_events_for_update
    await env.queue.put(message, self.EventUpdateQueue)
  File "/home/daml/foobar-integration/.ddit-venv/lib/python3.8/site-packages/daml_dit_if/main/integration_queue_context.py", line 31, in put
    await handler(message)
  File "/home/daml/foobar-integration/.ddit-venv/lib/python3.8/site-packages/daml_dit_if/main/integration_queue_context.py", line 65, in enqueue_wrapped
    await self.queue.put(doit, status)
  File "/home/daml/foobar-integration/.ddit-venv/lib/python3.8/site-packages/daml_dit_if/main/integration_deferral_queue.py", line 32, in put
    self.queue.put_nowait(IntegrationQueueAction(
  File "/usr/lib/python3.8/asyncio/queues.py", line 148, in put_nowait
    raise QueueFull
asyncio.queues.QueueFull

@David_Martins

Every integration has a single queue, 512 elements long, into which incoming integration events are placed. These include ledger events, webhook calls, and calls to the put method you’re using in your code. The integration then processes entries off of this queue and takes whatever action is necessary for each entry in order to achieve the desired integration behavior.

The error you’re seeing generally indicates that this queue is full and can no longer accept new events. It should be fairly difficult to reach this state, but under higher volumes it is possible to overrun the queue and trigger this error.

Can you give a sense of the volume of events that your integration is processing? If the load is high, it’s possible you’re hitting the scenario I describe above. In this case, there are batching and throttling techniques that can be useful to manage the load.
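To make the batching idea concrete, here is a minimal sketch of one way to do it. The `EventBatcher` class and its method names are illustrative, not part of daml-dit-if: the point is to accumulate events from the third-party API and put one batch message onto the internal queue per interval, rather than one queue entry per event, which reduces queue pressure.

```python
# Hypothetical batching sketch -- `EventBatcher` is not a daml-dit-if API.
# Events are accumulated as they arrive and drained as a small number of
# batches, each of which would become a single queue entry.

class EventBatcher:
    def __init__(self, max_batch_size=50):
        self.max_batch_size = max_batch_size
        self.pending = []

    def add(self, event):
        """Record an incoming event without touching the queue."""
        self.pending.append(event)

    def drain(self):
        """Return and clear pending events, chunked into batches."""
        batches = [
            self.pending[i:i + self.max_batch_size]
            for i in range(0, len(self.pending), self.max_batch_size)
        ]
        self.pending = []
        return batches
```

In the timer handler you would then call `drain()` and do one `env.queue.put(...)` per batch, so a burst of events costs a handful of queue slots instead of hundreds.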


Hello @Michael_Schaeffer, thanks for the quick reply.
We receive information on an average of four objects roughly every minute; these need to be processed and used to update our contracts on the ledger. As for incoming ledger events or webhook calls, this particular integration uses neither, apart from the standard ledger_ready handler and the pre-existing webhook calls.
Taking into consideration that we have this single queue that every call flows through, how would you suggest implementing either of the techniques you’ve described?

I’ve also noticed, in the same integration, the occurrence of timeout errors; I’m unsure whether it is related or not.

2022-07-14T09:24:44+0000 [ERROR] (daml-dit-if) Error while processing: TimeoutError()
Traceback (most recent call last):
  File "/home/daml/foobar-integration/.ddit-venv/lib/python3.8/site-packages/daml_dit_if/main/common.py", line 88, in wrapped
    await wait_for(
  File "/usr/lib/python3.8/asyncio/tasks.py", line 501, in wait_for
    raise exceptions.TimeoutError()
asyncio.exceptions.TimeoutError

Thanks… the two issues may well be related. The TimeoutError is thrown by this line of code:

What this indicates is that the integration code is waiting for a response from the ledger that never arrives. This can happen in the event of ledger commands that are processed, but do not result in any contracts being archived or created.

What do you see when you turn the log level up? You should be able to see a list of the commands your integration is issuing against the ledger, which we can use to diagnose.

As far as batching strategies go, we have an example of one here:

This is the timer integration in the core integration pack. (You can see this through the Hub console.)

Rather than sending ledger commands directly in the timer event handler as you might expect, this integration prepares batches of ledger commands in the timer event handler and then sends those batches via an internal queue for submission to the ledger as a group. Internally, this queue is actually the same queue we’re discussing above, but this technique lowers the number of ledger transactions and reduces queue pressure.

Note that the batches either succeed or fail as a block. If you’re submitting a batch of ten commands and one fails, then none of the ten will be processed. This can be managed, but can complicate your error handling.
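One way to manage the all-or-nothing behavior is a fallback pass: if a batch fails, resubmit its items individually so a single bad command doesn’t discard the whole batch. This is only a sketch of the pattern; `submit` here stands in for whatever function actually sends commands to the ledger in your integration.

```python
import asyncio

# Sketch of batch submission with per-item fallback. `submit` is a
# placeholder for the real ledger-submission call; it is expected to
# raise on failure and succeed silently otherwise.

async def submit_with_fallback(submit, batch):
    try:
        await submit(batch)
        return []                      # whole batch succeeded
    except Exception:
        failed = []
        for item in batch:
            try:
                await submit([item])   # retry each command on its own
            except Exception:
                failed.append(item)    # only genuinely bad items remain
        return failed
```

The trade-off is the extra round trips on failure, but it isolates the error handling to the commands that actually caused it.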


Thanks @Michael_Schaeffer, I will attempt to incorporate something like this for the integration.

Hi @Michael_Schaeffer
Sometimes, I receive the same error - TimeoutError()
Is there a way to handle situations where the ledger can’t process integration commands within the allocated timeout (5 sec by default) due to high load?

Currently, as I see in the code, the as_handler_invocation method just catches all possible exceptions from wait_for without re-raising anything (it only logs an error).
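Something along these lines is what I have in mind: a retry wrapper with exponential backoff around the ledger call, instead of a single wait_for that only logs the timeout. The names are illustrative; this is not daml-dit-if code.

```python
import asyncio

# Illustrative retry-with-backoff sketch. `coro_factory` must be a
# callable returning a fresh coroutine each attempt, since a coroutine
# cannot be awaited twice.

async def with_retries(coro_factory, attempts=3, timeout=5.0, base_delay=1.0):
    for attempt in range(attempts):
        try:
            return await asyncio.wait_for(coro_factory(), timeout)
        except asyncio.TimeoutError:
            if attempt == attempts - 1:
                raise                  # give up after the last attempt
            # back off before retrying: 1s, 2s, 4s, ...
            await asyncio.sleep(base_delay * 2 ** attempt)
```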