I have a celery task that, if a certain condition is not yet reached, schedules itself to run again after a while.
When I test it I use:
@override_settings(CELERY_EAGER_PROPAGATES_EXCEPTIONS=True,
CELERY_ALWAYS_EAGER=True,
BROKER_BACKEND='memory')
So the task repeatedly schedules itself to run again immediately instead of the date I intended, and it generates an infinite loop and I get the error:
RuntimeError: maximum recursion depth exceeded in cmp
How can I test this task without triggering this error?
Thanks
I don't know if the code from the test is relevant, but here it is:
def create_new_deadline(gig):
if gig.next_deadline is not None:
cancel_overdue_gig.apply_async((gig.id,),
eta = gig.next_deadline)
# cancel gig whose deadline was not met
@task
def cancel_overdue_gig(gig_id):
gig = Gig.objects.get(id=gig_id)
if gig.next_deadline and gig.next_deadline <= timezone.now():
gig.status = 12
gig.next_deadline = None
gig.cancelation_reason = _(
'Deadline not met by ') + gig.venue.display_name
gig.save()
else:
create_new_deadline(gig)
Basically I check if there's a deadline and if the deadline has passed. If it hasn't, that means the deadline has been postponed, and I set another check for the next deadline. If it has, I change the status to cancelled and delete the deadline.
Aucun commentaire:
Enregistrer un commentaire