When a Django test case runs, it creates an isolated test database so that database writes get rolled back when each test completes. I am trying to create an integration test with Celery, but I can't figure out how to connect Celery to this ephemeral test database. In the naive setup, objects saved in Django are invisible to Celery, and objects saved in Celery persist indefinitely.
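One likely contributor to the first symptom is that writes made inside a transaction that has not been committed cannot be seen from any other database connection, and a worker process uses its own connection. Below is a minimal sketch of that effect using plain sqlite3 instead of Django and Celery. The function name, table name, and file name are made up for illustration, and the two connections only stand in for the test case and the worker.

```python
import os
import sqlite3
import tempfile


def visible_rows_during_open_transaction():
    """Return (rows seen by writer, rows seen by other connection, rows after rollback)."""
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "demo.sqlite3")  # hypothetical throwaway database file
        test_conn = sqlite3.connect(path)    # stands in for the Django test case
        worker_conn = sqlite3.connect(path)  # stands in for the Celery worker
        try:
            # Create the table and commit so both connections can see the schema.
            test_conn.execute("CREATE TABLE mymodel (id TEXT PRIMARY KEY)")
            test_conn.commit()

            # The INSERT implicitly opens a transaction that is never committed.
            test_conn.execute("INSERT INTO mymodel (id) VALUES ('test_object')")

            count = "SELECT COUNT(*) FROM mymodel"
            seen_by_test = test_conn.execute(count).fetchone()[0]
            seen_by_worker = worker_conn.execute(count).fetchone()[0]

            # Discard the uncommitted write, as happens at the end of a test.
            test_conn.rollback()
            after_rollback = test_conn.execute(count).fetchone()[0]
        finally:
            test_conn.close()
            worker_conn.close()
    return seen_by_test, seen_by_worker, after_rollback


if __name__ == "__main__":
    print(visible_rows_during_open_transaction())
```

This only illustrates the visibility half of the problem. It does not explain why objects saved by the worker persist, which would also be consistent with the worker being connected to a different database than the test one.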
Here is an example test case:
    import json

    from rest_framework.test import APITestCase

    from myapp.models import MyModel
    from myapp.util import get_result_from_response


    class MyTestCase(APITestCase):

        @classmethod
        def setUpTestData(cls):
            # This object is not visible to Celery
            MyModel(id='test_object').save()

        def test_celery_integration(self):
            # This view spawns a Celery task
            # Task should see MyModel.objects.get(id='test_object'), but can't
            http_response = self.client.post('/', 'test_data', format='json')
            result = get_result_from_response(http_response)
            result.get()  # Wait for task to finish before ending test case
            # Objects saved by Celery task should be deleted, but persist
I have two questions:
- How do I make it so that Celery can see the objects that the Django test case creates?
- How do I ensure that all objects saved by Celery are automatically rolled back once the test completes?
I am willing to clean up the objects manually if doing this automatically is not possible, but a deletion of objects in tearDown, even in APISimpleTestCase, seems to be rolled back.