osdir.com

asyncio await different coroutines on the same socket?


On Wed, Oct 3, 2018 at 7:47 AM Russell Owen <rowen at uw.edu> wrote:
> Using asyncio I am looking for a simple way to await multiple events where
> notification comes over the same socket (or other serial stream) in arbitrary
> order. For example, suppose I am communicating with a remote device that can
> run different commands simultaneously and I don't know which command will
> finish first. I want to do this:
>
> coro1 = start(command1)
> coro2 = start(command2)
> asyncio.gather(coro1, coro2)
>
> where either command may finish first. I'm hoping for a simple and
> idiomatic way to read the socket and tell each coroutine it is done. So far
> everything I have come up with is ugly, using multiple layers of "async
> def", keeping a record of Tasks that are waiting and calling "set_result"
> on those Tasks when finished. Also Task isn't even documented to have the
> set_result method (though "future" is).

That's because Tasks are used to wrap coroutines, and the result of a
Task should be determined by its coroutine, not set externally.
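
To make the distinction concrete, here is a minimal sketch (the names
compute() and main() are made up for illustration): the Task gets its
result from the coroutine it wraps, while the bare Future gets its
result from whatever other code calls set_result() on it.

```python
import asyncio


async def compute():
    return "from the coroutine"


async def main():
    loop = asyncio.get_running_loop()

    # A Task's result is whatever its coroutine returns.
    task = asyncio.create_task(compute())

    # A bare Future has no coroutine behind it; some other code must
    # supply the result by calling set_result().
    future = loop.create_future()
    loop.call_soon(future.set_result, "set externally")

    # Returns a 2-tuple: (task result, future result).
    return await task, await future
```

Running asyncio.run(main()) returns
("from the coroutine", "set externally").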

Rather than tracking tasks (that's what the event loop is for), I would
suggest tracking futures. Have start(command1) return a future (or
create a future that it then awaits itself) that is not a task.
Whenever a response from the socket is parsed, that code looks up the
corresponding future and calls set_result on it. It might look
something like this:

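import asyncio

class Client:
    def __init__(self):
        self._futures = {}  # command id -> future awaiting that response

    async def open(self, host, port):
        self.reader, self.writer = await asyncio.open_connection(host, port)
        # Keep a reference so the reader task is not garbage-collected.
        self._read_task = asyncio.create_task(self.read_loop())

    async def read_loop(self):
        while not self.reader.at_eof():
            # Assumes one newline-terminated response per line; adapt the
            # read call to whatever framing your protocol actually uses.
            response = await self.reader.readline()
            if not response:
                break
            # get_response_id() is a placeholder for your own parsing code.
            response_id = get_response_id(response)
            self._futures.pop(response_id).set_result(response)

    def start(self, command):
        # Must be called while the event loop is running.
        future = asyncio.get_running_loop().create_future()
        # get_command_id() is likewise a placeholder.
        self._futures[get_command_id(command)] = future
        self.writer.write(command)
        return future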

In this case start() is not a coroutine but its result is a future and
can be awaited.
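
For something you can run without a real device, here is a rough,
self-contained sketch of the same idea. Everything in it is made up for
illustration: an asyncio.Queue stands in for the socket, responses are
(command_id, payload) tuples, None stands in for EOF, and the names
Dispatcher and demo() are hypothetical.

```python
import asyncio


class Dispatcher:
    """Routes responses arriving on one stream to the futures awaiting them."""

    def __init__(self, incoming):
        self._incoming = incoming  # asyncio.Queue standing in for the socket
        self._futures = {}         # command id -> pending future

    async def read_loop(self):
        while True:
            item = await self._incoming.get()
            if item is None:       # sentinel standing in for EOF
                break
            command_id, payload = item
            future = self._futures.pop(command_id, None)
            # Ignore responses nobody is waiting for (or already cancelled).
            if future is not None and not future.done():
                future.set_result(payload)

    def start(self, command_id):
        # A plain function, not a coroutine: it registers a future and
        # returns it for the caller to await.
        future = asyncio.get_running_loop().create_future()
        self._futures[command_id] = future
        return future


async def demo():
    stream = asyncio.Queue()
    dispatcher = Dispatcher(stream)
    reader_task = asyncio.create_task(dispatcher.read_loop())

    fut1 = dispatcher.start(1)
    fut2 = dispatcher.start(2)

    # The simulated device finishes command 2 before command 1.
    stream.put_nowait((2, "second done"))
    stream.put_nowait((1, "first done"))

    # gather() returns results in argument order, not completion order.
    results = await asyncio.gather(fut1, fut2)

    stream.put_nowait(None)  # shut the read loop down
    await reader_task
    return results
```

asyncio.run(demo()) returns ["first done", "second done"] even though
the responses arrived in the opposite order, because each response is
matched to its future by id rather than by arrival order.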