I'm implementing a small service with asyncio, built around a loop structured as follows:
    pending = {...}
    while True:
        done, pending = yield from asyncio.wait(
            pending,
            return_when=asyncio.FIRST_COMPLETED,
        )
        for future in done:
            if future is x:
                # ...
            if future is y:
                # ...
This loop is currently controlling a sub-process, but having written a bunch of ZMQ-based services, this style feels very natural, so I'll likely be writing more of these in the near future.
I have something that runs just the way I like, but I'm kind of at a loss as to how I would write automated tests for this.
I would like my tests to start this loop, let it run until it blocks on asyncio.wait(), and then inject one particular event to test the loop's handling of that event. That way, I can test the handling of each possible event and know that all cases are covered as expected.
However, I can't find anything in asyncio that provides for this. If I simply yield from this coroutine, the test does not unblock until the coroutine completes.
Any ideas on how to test this particular kind of loop?
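To make the goal concrete, here is a rough sketch of the kind of test I have in mind. It is only one possible direction, not a confirmed solution: it schedules the loop as a task rather than awaiting it directly, yields to the event loop so the task can reach asyncio.wait(), and then completes one future by hand. The names service, settle and run_scenario are hypothetical stand-ins, and the sketch uses async/await syntax instead of the yield from form shown above.

```python
import asyncio


async def service(x, y, handled):
    # Hypothetical stand-in for the loop in the question: it waits on two
    # futures and records which one completed.
    pending = {x, y}
    while True:
        done, pending = await asyncio.wait(
            pending,
            return_when=asyncio.FIRST_COMPLETED,
        )
        for future in done:
            if future is x:
                handled.append(("x", future.result()))
            if future is y:
                handled.append(("y", future.result()))


async def settle(rounds=10):
    # Yield control to the event loop a few times so that already-scheduled
    # callbacks and tasks get a chance to run. The count is an assumption.
    for _ in range(rounds):
        await asyncio.sleep(0)


async def run_scenario():
    loop = asyncio.get_running_loop()
    x = loop.create_future()
    y = loop.create_future()
    handled = []

    # Schedule the loop as a task so the test keeps control of the flow.
    task = asyncio.ensure_future(service(x, y, handled))

    # Let the task run until it is blocked inside asyncio.wait().
    await settle()
    assert handled == []
    assert not task.done()

    # Inject exactly one event and let the loop handle it.
    x.set_result("event")
    await settle()

    # The loop never returns on its own, so cancel it to finish the test.
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return handled


if __name__ == "__main__":
    print(asyncio.run(run_scenario()))
```

The part I am least sure about is settle(): yielding a fixed number of times is a guess at "run until the loop blocks", and a real service that awaits I/O or a sub-process would probably need an explicit synchronization point instead.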