I have two pieces of code that both use pika: one performs tasks as a worker (I call it the worker code), and one creates a message in the queue describing a task (I call it the run_async code). I want to make both of them testable.
This is my run_async code:
def run_async(function, args=None, kwargs=None):
    """Publish a task message describing a function call to a RabbitMQ queue.

    The message body is a JSON object with a ``"function"`` key and a
    ``"parameters"`` key; the latter holds ``"args"`` and ``"kwargs"``.
    The queue name and the connection parameters are looked up with
    ``_getattr_multiple`` over the Django settings and the package's
    ``default_settings``.

    Args:
        function: Value stored under the ``"function"`` key. Presumably a
            dotted import path string -- confirm against the consumer.
        args: Positional arguments for the task; a falsy value becomes an
            empty tuple (serialized as an empty JSON array).
        kwargs: Keyword arguments for the task; a falsy value becomes an
            empty dict.

    Returns:
        None.
    """
    import json

    import pika
    from django.conf import settings

    from poseidon_async import default_settings
    from poseidon_async._utils import _getattr_multiple  # NOQA

    # Serialize first so an unserializable argument fails before any
    # settings lookup or network activity takes place.
    task = {
        "function": function,
        "parameters": {
            "args": args or tuple(),
            "kwargs": kwargs or {},
        },
    }
    payload = json.dumps(task)

    sources = (settings, default_settings)
    queue_name = _getattr_multiple("POSEIDON_ASYNC_QUEUE_NAME", sources)
    rmq_parameters = _getattr_multiple(
        "POSEIDON_RABBITMQ_CONNECTION_PARAMETERS", sources
    )

    connection_parameters = pika.ConnectionParameters(**rmq_parameters)
    with pika.BlockingConnection(connection_parameters) as connection:
        channel = connection.channel()
        channel.queue_declare(queue=queue_name, durable=True)
        # delivery_mode=2 asks the broker to treat the message as persistent.
        message_properties = pika.BasicProperties(delivery_mode=2)
        channel.basic_publish(
            exchange='',
            routing_key=queue_name,
            body=payload,
            properties=message_properties,
        )
    return None
And this is my worker code:
def worker(queue_to_read, logger_name=None, result_log_level=None, connection_parameters=None):
    """Consume task messages from a RabbitMQ queue and run them one at a time.

    Each message body is decoded as UTF-8 JSON and must contain a
    ``"function"`` key (a dotted path resolved with Django's
    ``import_string``) and a ``"parameters"`` key holding ``"args"`` and
    ``"kwargs"``.  The resolved callable is invoked with those arguments.
    Every message is acknowledged after processing, whether or not the task
    raised; failures are only logged.

    The call blocks inside ``channel.start_consuming()``.  When consuming
    ends because an exception (including ``KeyboardInterrupt``) propagates,
    the connection is closed before the exception continues upward.

    Args:
        queue_to_read: Name of the queue to declare (durable) and consume.
        logger_name: Name of the logger that receives error and result
            messages.  When ``None`` nothing is logged.
        result_log_level: Logging level used for task results; a falsy
            value falls back to ``logging.DEBUG``.
        connection_parameters: Dictionary expanded as keyword arguments into
            ``pika.ConnectionParameters``.

    Raises:
        ValueError: If ``connection_parameters`` is ``None``.
    """
    import json
    import logging
    import traceback

    # Validate before importing the broker client or opening any socket so
    # that a bad call fails fast and without side effects.
    if connection_parameters is None:
        raise ValueError("connection_parameters must contains a dictionary of rabbitmq connection parameters")

    import pika
    from django.utils.module_loading import import_string

    # Resolve the logger once; None means logging is disabled entirely.
    logger = logging.getLogger(logger_name) if logger_name is not None else None

    def log_error(exception, body):
        """Log a failed task together with its body and the active traceback."""
        if logger is None:
            return
        log_message = "{}: {} happened during running task\nBody is :\n{}\ntracback :\n{}".format(
            type(exception),
            str(exception),
            body,
            # Must be called while the exception is being handled.
            traceback.format_exc()
        )
        logger.error(log_message)

    def log_result(function, result):
        """Log the return value of a successfully executed task."""
        if logger is None:
            return
        log_message = "{} -> {} ".format(function, result)
        logger.log(result_log_level or logging.DEBUG, log_message)

    def callback(ch, method, properties, body):
        """Decode one message, execute the task it describes, then ack it."""
        try:
            body = json.loads(body.decode('utf8'))
            function_dot_path, parameters = body['function'], body['parameters']
            args, kwargs = parameters['args'], parameters['kwargs']
            function = import_string(function_dot_path)
            if not callable(function):
                raise ValueError("{} is not callable".format(function_dot_path))
            result = function(*args, **kwargs)
            log_result(function_dot_path, result)
        except Exception as exp:
            # Boundary handler: one bad task must not kill the consumer loop.
            # If decoding failed, ``body`` is still the raw payload here.
            log_error(exp, body)
        # Acknowledge unconditionally so the message is not left pending.
        ch.basic_ack(delivery_tag=method.delivery_tag)

    connection = pika.BlockingConnection(pika.ConnectionParameters(**connection_parameters))
    try:
        channel = connection.channel()
        channel.queue_declare(queue=queue_to_read, durable=True)
        # Hand this consumer at most one unacknowledged message at a time.
        channel.basic_qos(prefetch_count=1)
        # NOTE(review): callback-first is the pre-1.0 pika signature; pika
        # >= 1.0 expects basic_consume(queue, on_message_callback) -- confirm
        # the pinned pika version before upgrading.
        channel.basic_consume(callback, queue=queue_to_read)
        channel.start_consuming()
    finally:
        # Previously the connection leaked whenever consuming was interrupted.
        # Guard on is_open so closing never masks a connection-lost error.
        if connection.is_open:
            connection.close()
I know I should mock my pika connection, but how do I do that?
Aucun commentaire:
Enregistrer un commentaire