lundi 8 juin 2020

Test methods for a scheduler job function inside Python

I have a question. I have used the def job() method from the scheduler API. (https://pypi.org/project/schedule/)

Inside it, I make a connection with the MongoDB database to get a list of JSON-messages. I then go through the list and check if the message is not old enough to resend it. If it's older than 1 day the message is getting deleted, and if it's still fresh enough it's resent to the receiver.

My question is how can I test this? I don't want to test if def job() does his job because that has already been tested inside the API code. (https://github.com/dbader/schedule/blob/master/test_schedule.py)

How can I test that my code works?

Greetings,

Mariioo

My Code:

import pymongo
import schedule
import time
from datetime import datetime,  timedelta, timezone
from submarine.ConnectionModule.src.ClientNode import send_message
from submarine.PersistenceModule.src.DatabaseConnection import get_client, 
get_db, get_collection


EXPIRED_MESSAGE_DATE = datetime.now(timezone.utc) - timedelta(days=1)
DATETIME_NOW = datetime.now(timezone.utc)


def job():
    client = get_client("localhost", 27017)
    db = get_db(client, "submarine")
    col = get_collection(db, "messages")

     my_results = list(db.get_collection("messages").find({}).sort("_id", 
     pymongo.DESCENDING))
for result in my_results:
    time_from_message = result['_id'].generation_time
    del result['_id']

    if time_from_message > EXPIRED_MESSAGE_DATE:
        col.delete_one(result)
    elif DATETIME_NOW >= time_from_message:
        send_message(result)


   schedule.every(5).minutes.do(job)

   while True:
     schedule.run_pending()
     time.sleep(1)

Aucun commentaire:

Enregistrer un commentaire