jeudi 29 décembre 2016

How can I make my RabbitMQ code testable?

I have two pieces of code that both use pika: one runs tasks as a worker (I call it the worker code), and one publishes a message containing a task to a queue (I call it the run_async code). I want to make both of them testable.

this is my run_async code :

def run_async(function, args=None, kwargs=None):
    """Publish a task description to the configured RabbitMQ queue.

    The task is serialized as JSON with a ``"function"`` entry and a
    ``"parameters"`` entry holding ``"args"`` and ``"kwargs"``, then
    published to the default exchange with the queue name as routing key.

    :param function: value stored under ``"function"``; must be
        JSON-serializable (presumably a dotted import path -- confirm
        against the consumer).
    :param args: positional arguments for the task; any falsy value is
        sent as an empty sequence.
    :param kwargs: keyword arguments for the task; any falsy value is
        sent as an empty mapping.
    :returns: ``None``.
    """
    import json
    import pika
    from django.conf import settings
    from poseidon_async import default_settings
    from poseidon_async._utils import _getattr_multiple  # NOQA

    task = {
        "function": function,
        "parameters": {
            "args": args if args else tuple(),
            "kwargs": kwargs if kwargs else {},
        },
    }
    body = json.dumps(task)

    # Django settings are consulted together with the package defaults.
    # NOTE(review): presumably the first module defining the name wins --
    # confirm against _getattr_multiple.
    lookup_modules = (settings, default_settings)
    queue_name = _getattr_multiple("POSEIDON_ASYNC_QUEUE_NAME", lookup_modules)
    rmq_parameters = _getattr_multiple("POSEIDON_RABBITMQ_CONNECTION_PARAMETERS", lookup_modules)

    connection_parameters = pika.ConnectionParameters(**rmq_parameters)
    with pika.BlockingConnection(connection_parameters) as connection:
        channel = connection.channel()
        # Declared durable, matching the durable declaration the consumer uses.
        channel.queue_declare(queue=queue_name, durable=True)
        # delivery_mode=2 asks the broker to persist the message.
        message_properties = pika.BasicProperties(delivery_mode=2)
        channel.basic_publish(
            exchange='',
            routing_key=queue_name,
            body=body,
            properties=message_properties,
        )

and this is my worker code :

def worker(queue_to_read, logger_name=None, result_log_level=None, connection_parameters=None):
    """Consume task messages from a RabbitMQ queue and execute them.

    Every message body is decoded as UTF-8 JSON and must provide a
    ``"function"`` entry (a dotted path resolved with Django's
    ``import_string``) and a ``"parameters"`` entry with ``"args"`` and
    ``"kwargs"``. The resolved callable is invoked with those arguments.

    A message is acknowledged whether or not the task succeeded, so a
    failing task is logged (when ``logger_name`` is set) rather than left
    unacknowledged.

    This call blocks in ``start_consuming()``. The connection is closed when
    consuming stops or an exception propagates.

    :param queue_to_read: name of the durable queue to consume from.
    :param logger_name: name of the logger used for results and errors;
        when ``None`` nothing is logged.
    :param result_log_level: level used to log task results; falls back to
        ``logging.DEBUG`` when falsy.
    :param connection_parameters: dict of keyword arguments passed to
        ``pika.ConnectionParameters``.
    :raises ValueError: if ``connection_parameters`` is ``None``.
    """
    # Validate first so a bad call fails before importing third-party
    # modules or opening a network connection.
    if connection_parameters is None:
        raise ValueError("connection_parameters must contains a dictionary of rabbitmq connection parameters")

    import pika
    import json
    from django.utils.module_loading import import_string
    from logging import getLogger
    import logging
    import traceback

    def log_error(exception, body):
        # Must be called from inside an ``except`` block: format_exc()
        # reports the exception currently being handled.
        if logger_name is not None:
            log_message = "{}: {} happened during running task\nBody is :\n{}\ntracback :\n{}".format(
                type(exception),
                str(exception),
                body,
                traceback.format_exc()
            )
            getLogger(logger_name).error(log_message)

    def log_result(function, result):
        if logger_name is not None:
            log_message = "{} -> {} ".format(
                function,
                result
            )
            _result_log_level = result_log_level or logging.DEBUG
            getLogger(logger_name).log(_result_log_level, log_message)

    def callback(ch, method, properties, body):
        try:
            # ``body`` is rebound to the parsed dict; if decoding fails the
            # raw bytes are what ends up in the error log.
            body = json.loads(body.decode('utf8'))
            function_dot_path, parameters = body['function'], body['parameters']
            args, kwargs = parameters['args'], parameters['kwargs']
            function = import_string(function_dot_path)
            if callable(function):
                result = function(*args, **kwargs)
                log_result(function_dot_path, result)
            else:
                raise ValueError("{} is not callable".format(function_dot_path))
        except Exception as exp:
            log_error(exp, body)
        # Acknowledge even on failure so one bad message cannot block the queue.
        ch.basic_ack(delivery_tag=method.delivery_tag)

    connection = pika.BlockingConnection(pika.ConnectionParameters(**connection_parameters))
    try:
        channel = connection.channel()
        channel.queue_declare(queue=queue_to_read, durable=True)
        # Take at most one unacknowledged message at a time.
        channel.basic_qos(prefetch_count=1)
        # NOTE(review): this is the pre-1.0 pika call form (callback first);
        # pika >= 1.0 expects basic_consume(queue, on_message_callback) --
        # confirm the installed version before upgrading.
        channel.basic_consume(callback, queue=queue_to_read)
        channel.start_consuming()
    finally:
        # Release the socket on any exit path (including KeyboardInterrupt).
        if connection.is_open:
            connection.close()

I know I should mock my pika connection, but how?

Aucun commentaire:

Enregistrer un commentaire