I have a question. I have used the def job() method from the scheduler API. (https://pypi.org/project/schedule/)
Inside it, I make a connection with the MongoDB database to get a list of JSON-messages. I then go through the list and check if the message is not old enough to resend it. If it's older than 1 day the message is getting deleted, and if it's still fresh enough it's resent to the receiver.
My question is how can I test this? I don't want to test if def job() does his job because that has already been tested inside the API code. (https://github.com/dbader/schedule/blob/master/test_schedule.py)
How can I test that my code works?
Greetings,
Mariioo
My Code:
import pymongo
import schedule
import time
from datetime import datetime, timedelta, timezone
from submarine.ConnectionModule.src.ClientNode import send_message
from submarine.PersistenceModule.src.DatabaseConnection import get_client,
get_db, get_collection
EXPIRED_MESSAGE_DATE = datetime.now(timezone.utc) - timedelta(days=1)
DATETIME_NOW = datetime.now(timezone.utc)
def job():
client = get_client("localhost", 27017)
db = get_db(client, "submarine")
col = get_collection(db, "messages")
my_results = list(db.get_collection("messages").find({}).sort("_id",
pymongo.DESCENDING))
for result in my_results:
time_from_message = result['_id'].generation_time
del result['_id']
if time_from_message > EXPIRED_MESSAGE_DATE:
col.delete_one(result)
elif DATETIME_NOW >= time_from_message:
send_message(result)
schedule.every(5).minutes.do(job)
while True:
schedule.run_pending()
time.sleep(1)
Aucun commentaire:
Enregistrer un commentaire