Sunday, December 6, 2015

Testing asyncio-based services

I'm implementing a small service with asyncio, built around a loop that is structured as follows:

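pending = {...}  # the futures being waited on, one per event source
while True:
  # Block until at least one of the pending futures completes.
  done, pending = yield from asyncio.wait(
    pending,
    return_when=asyncio.FIRST_COMPLETED,
  )
  for future in done:
    if future is x:
      # ...
    if future is y:
      # ...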

This loop currently controls a subprocess. Having written a bunch of ZMQ-based services, I find this style very natural, so I'll likely be writing more loops like it in the near future.

I have something that runs just the way I like, but I'm kind of at a loss as to how I would write automated tests for this.

I would like my tests to start this loop, let it run until it blocks on asyncio.wait(), and then inject one particular event to check how the loop handles it. That way, I can test the handling of each possible event separately and know that all cases are covered as expected.

However, I can't find anything in asyncio that provides for this. If I simply yield from this coroutine, the test does not unblock until the coroutine completes.

Any ideas on how to test this particular kind of loop?
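
One direction that might work, as a minimal sketch rather than a proven recipe: schedule the loop as a task instead of yielding from it, give the event loop a few turns so the task can reach asyncio.wait(), then inject a single event and check what was handled. Everything named here (service_loop, settle, the events queues, the handled list) is a hypothetical stand-in and not my actual service. The sketch uses async/await, the newer spelling of yield from, and the number of turns in settle() is a guess, not a guarantee.

```python
import asyncio


async def service_loop(events, handled):
    # Hypothetical stand-in for the real loop: one pending task per event
    # source, mapped back to the name of the source it reads from.
    pending = {asyncio.ensure_future(q.get()): name for name, q in events.items()}
    try:
        while True:
            done, _ = await asyncio.wait(
                set(pending),
                return_when=asyncio.FIRST_COMPLETED,
            )
            for future in done:
                name = pending.pop(future)
                handled.append((name, future.result()))
                # Re-arm the source so the next event from it is seen too.
                pending[asyncio.ensure_future(events[name].get())] = name
    finally:
        # Do not leave queue readers behind when the loop is cancelled.
        for future in pending:
            future.cancel()


async def settle(turns=10):
    # Yield to the event loop a few times so other tasks can run until
    # they block. The turn count is a heuristic assumption.
    for _ in range(turns):
        await asyncio.sleep(0)


async def check_handles_x():
    events = {"x": asyncio.Queue(), "y": asyncio.Queue()}
    handled = []
    task = asyncio.ensure_future(service_loop(events, handled))

    await settle()  # the loop should now be blocked in asyncio.wait()
    assert not task.done()
    assert handled == []

    events["x"].put_nowait("ping")  # inject exactly one event
    await settle()
    assert handled == [("x", "ping")]

    # Stop the loop and wait for the cancellation to be processed.
    task.cancel()
    await asyncio.gather(task, return_exceptions=True)
    return handled


if __name__ == "__main__":
    print(asyncio.run(check_handles_x()))
```

The test never awaits the loop itself, only short sleeps, so it keeps control while the loop sits blocked. Passing the event sources in as arguments is an assumption about how the service could be restructured. A loop that creates its own futures internally would need some other seam for injecting events.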
