Thursday, June 22, 2017

Testing Django Channels background task end-to-end

I am trying to write end-to-end tests for my Django app, which uses Channels and WebSockets. In my case, a request triggers a complex background calculation, and I would like to observe its effects. As far as I understand, the Django test suite has no way of running the workers that would consume the background tasks sent through Django Channels.

Is there nevertheless a way to trigger the background tasks with the right parameters? Or do I have to modify my code to make it testable?

The following is a simplified skeleton of my setup:

views.py

from channels import Channel
from django.shortcuts import render

def calculation(request):
    # do some simple calculation depending on the request
    simple_result, parameter = simple_calculation(request)
    # trigger the complex asynchronous calculation depending on parameter
    Channel('background-calculations').send({
        "parameter": parameter,
    })
    # return the results of the simple calculation
    return render(request, 'simple_reponse.html', {'simple_result': simple_result})

routing.py

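from channels.routing import route
# assumes consumers.py lives in the same package as routing.py
from .consumers import run_background_calculations, ws_connect, ws_message

channel_routing = [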
    route("background-calculations", run_background_calculations),
    route("websocket.connect", ws_connect),
    route("websocket.receive", ws_message),
]

consumers.py

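import json

from channels import Group
from channels.auth import channel_session_user
from channels.sessions import channel_session

def run_background_calculations(message):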
    # perform complex calculation depending on parameter from simple calculation
    result = complex_calculation(message)
    # update frontend via websocket
    Group("frontend-updates").send({
        "text": json.dumps({
            "result": result,
        })
    })

@channel_session
def ws_connect(message):
    message.reply_channel.send({"accept": True})
    Group("frontend-updates").add(message.reply_channel)


@channel_session_user
def ws_message(message):
    message.reply_channel.send({
        "text": message.content['text'],
    })
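
One possible direction, sketched under assumptions: instead of starting a worker, the test itself can play the worker's role by pulling the queued message off the channel and calling the consumer directly. As far as I know, Channels 1.x supports this pattern through channels.test.ChannelTestCase and its get_next_message(channel, require=True) helper. The sketch below only illustrates the idea with the standard library. InMemoryLayer, the "test-client" channel name, the body of complex_calculation, and the extra layer argument on the consumer are all hypothetical stand-ins, not part of Django Channels or of my real code.

```python
import json
from collections import defaultdict, deque


class InMemoryLayer:
    """Hypothetical stand-in for a channel layer (not a Django Channels class)."""

    def __init__(self):
        self.queues = defaultdict(deque)  # channel name -> pending messages
        self.groups = defaultdict(set)    # group name -> member channel names

    def send(self, channel, content):
        self.queues[channel].append(content)

    def group_add(self, group, channel):
        self.groups[group].add(channel)

    def group_send(self, group, content):
        for channel in self.groups[group]:
            self.send(channel, content)

    def get_next_message(self, channel):
        # Raises IndexError when nothing was queued, so a missing
        # message fails the test loudly instead of passing silently.
        return self.queues[channel].popleft()


def complex_calculation(parameter):
    # Placeholder for the real calculation (assumption for illustration).
    return parameter * 2


def run_background_calculations(layer, content):
    # Same shape as the consumer above, with the layer passed in explicitly.
    result = complex_calculation(content["parameter"])
    layer.group_send("frontend-updates", {"text": json.dumps({"result": result})})


def test_end_to_end():
    layer = InMemoryLayer()
    # Stand in for a websocket client that has already connected.
    layer.group_add("frontend-updates", "test-client")
    # What the view does: queue the background task.
    layer.send("background-calculations", {"parameter": 21})
    # Play the worker: take the queued message and run the consumer by hand.
    content = layer.get_next_message("background-calculations")
    run_background_calculations(layer, content)
    # Observe the effect: the update that would reach the frontend.
    update = layer.get_next_message("test-client")
    return json.loads(update["text"])
```

With the real test case class, the same three steps would presumably be: call the view through the Django test client, fetch the message from "background-calculations", pass it to run_background_calculations, then read the reply from a test channel added to the "frontend-updates" group.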
