osdir.com

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

How to await multiple replies in arbitrary order (one coroutine per reply)?


I am using asyncio and am fairly new to it. I have a stream to which I write 
commands and from which I read replies. (In this case the stream is custom 
wrapper around DDS written in C++ and pybind11). Multiple commands can run at 
the same time and I cannot predict which will finish first. I need aa 
different coroutine (or asyncio.Task or other awaitable object) for each 
command that is running.

Is there a simple way to handle this in asyncio? So far the best I have come 
up is the following (greatly simplified), which works but has some 
misfeatures:

import asyncio
from iolib import read_reply, write_command, TIMED_OUT

class RemoteCommand:
def __init__(self):
self._tasks = dict()

def start(self, cmd, timeout):
"""Start a command"""
cmd_id = write_command(cmd)
task = asyncio.ensure_future(self._wait_for_command(cmd_id=cmd_id, 
timeout=timeout))
self._tasks[cmd_id] = task
if len(self._tasks) == 1:
asyncio.ensure_future(self._handle_replies())
return task

async def _wait_for_command(self, cmd_id, timeout):
"""Wait for a command to finish"""
await asyncio.sleep(timeout)
if cmd_id in self._tasks:
del self._tasks[cmd_id]
return TIMED_OUT # our standard end code for timeouts

async def _handle_replies(self):
while True:
cmd_id, end_code = read_reply()
if cmd_id in self._tasks:
task = self._tasks.pop(cmd_id)
task.set_result(end_code)
if not self._tasks:
return
await asyncio.sleep(0.1)

Misfeatures include:
- asyncio.Task is not documented to have a "set_result" method. The 
documentation says that Task is "A Future-like object that runs a Python 
coroutine" and Future does have such a method.
- When "_handle_replies" calls "task.set_result(data)" this does not seem to 
cancel the "await asyncio.sleep(timeout)" in the task, resulting in scary 
messages to stdout. I have tried saving *that* as another task and canceling 
it, but it seems clumsy and I still see scary messages.

I think what I'm looking for is a task-like thing I can create that I can end 
when *I* say it's time to end, and if I'm not quick enough then it will time 
out gracefully. But maybe there's a simpler way to do this. It doesn't seem 
like it should be difficult, but I'm stumped. Any advice would be 
appreciated.

-- Russell