Back-End

22 mar, 2017

Brincando com RabbitMQ – Parte 02: agora com Python

Publicidade

Você se lembra do último artigo sobre RabbitMQ? Nele, criamos uma pequena biblioteca wraper para usar RabbitMQ com node e PHP. Eu também trabalho com Python também quero usar o mesmo wraper RabbitMQ aqui. Com Python, existem várias bibliotecas para usar o Rabbit. Vou usar pika.

A ideia é a mesma que a do outro artigo. Eu quero usar queues, trocas e RPCs. Então vamos começar com queues:

Podemos criar um receptor de queue chamado ‘queue.backend’

from rabbit import builder
 
server = {
    'host': 'localhost',
    'port': 5672,
    'user': 'guest',
    'pass': 'guest',
}
 
def onData(data):
    print data['aaa']
 
builder.queue('queue.backend', server).receive(onData)

e emitir mensagens para a queue

from rabbit import builder
 
server = {
    'host': 'localhost',
    'port': 5672,
    'user': 'guest',
    'pass': 'guest',
}
 
queue = builder.queue('queue.backend', server)
 
queue.emit({"aaa": 1})
queue.emit({"aaa": 2})
queue.emit({"aaa": 3})

A biblioteca (como o PHP e outros) usa uma classe construtora para criar nossas instâncias

from queue import Queue
from rpc import RPC
from exchange import Exchange
 
defaults = {
    'queue': {
        'queue': {
            'passive': False,
            'durable': True,
            'exclusive': False,
            'autoDelete': False,
            'nowait': False
        },
        'consumer': {
            'noLocal': False,
            'noAck': False,
            'exclusive': False,
            'nowait': False
        }
    },
    'exchange': {
        'exchange': {
            'passive': False,
            'durable': True,
            'autoDelete': True,
            'internal': False,
            'nowait': False
        },
        'queue': {
            'passive': False,
            'durable': True,
            'exclusive': False,
            'autoDelete': True,
            'nowait': False
        },
        'consumer': {
            'noLocal': False,
            'noAck': False,
            'exclusive': False,
            'nowait': False
        }
    },
    'rpc': {
        'queue': {
            'passive': False,
            'durable': True,
            'exclusive': False,
            'autoDelete': True,
            'nowait': False
        },
        'consumer': {
            'noLocal': False,
            'noAck': False,
            'exclusive': False,
            'nowait': False
        }
    }
}
 
def queue(name, server):
    conf = defaults['queue']
    conf['server'] = server
 
    return Queue(name, conf)
 
def rpc(name, server):
    conf = defaults['rpc']
    conf['server'] = server
 
    return RPC(name, conf)
 
def exchange(name, server):
    conf = defaults['exchange']
    conf['server'] = server
 
    return Exchange(name, conf)

E nossa classe Queue

import pika
import json
import time
 
class Queue:
    def __init__(self, name, conf):
        self.name = name
        self.conf = conf
 
    def emit(self, data=None):
        credentials = pika.PlainCredentials(self.conf['server']['user'], self.conf['server']['pass'])
        connection = pika.BlockingConnection(pika.ConnectionParameters(host=self.conf['server']['host'], port=self.conf['server']['port'], credentials=credentials))
        channel = connection.channel()
 
        queueConf = self.conf['queue']
        channel.queue_declare(queue=self.name, passive=queueConf['passive'], durable=queueConf['durable'], exclusive=queueConf['exclusive'], auto_delete=queueConf['autoDelete'])
 
        channel.basic_publish(exchange='', routing_key=self.name, body=json.dumps(data), properties=pika.BasicProperties(delivery_mode=2))
        connection.close()
 
    def receive(self, callback):
        credentials = pika.PlainCredentials(self.conf['server']['user'], self.conf['server']['pass'])
        connection = pika.BlockingConnection(pika.ConnectionParameters(host=self.conf['server']['host'], port=self.conf['server']['port'], credentials=credentials))
        channel = connection.channel()
 
        queueConf = self.conf['queue']
        channel.queue_declare(queue=self.name, passive=queueConf['passive'], durable=queueConf['durable'], exclusive=queueConf['exclusive'], auto_delete=queueConf['autoDelete'])
 
        def _callback(ch, method, properties, body):
            callback(json.loads(body))
            ch.basic_ack(delivery_tag=method.delivery_tag)
            print "%s %s::%s" % (time.strftime("[%d/%m/%Y-%H:%M:%S]", time.localtime(time.time())), self.name, body)
 
        print "%s Queue '%s' initialized" % (time.strftime("[%d/%m/%Y-%H:%M:%S]", time.localtime(time.time())), self.name)
        consumerConf = self.conf['consumer']
        channel.basic_qos(prefetch_count=1)
        channel.basic_consume(_callback, self.name, no_ack=consumerConf['noAck'], exclusive=consumerConf['exclusive'])
 
        channel.start_consuming()

Também queremos usar as trocas para emitir mensagens sem esperar por respostas, assim como uma transmissão de eventos. Podemos emitir mensagens:

from rabbit import builder
 
server = {
    'host': 'localhost',
    'port': 5672,
    'user': 'guest',
    'pass': 'guest',
}
 
exchange = builder.exchange('process.log', server)
 
exchange.emit("xxx.log", "aaaa")
exchange.emit("xxx.log", ["11", "aaaa"])
exchange.emit("yyy.log", "aaaa")

E ouvir mensagens

from rabbit import builder
 
server = {
    'host': 'localhost',
    'port': 5672,
    'user': 'guest',
    'pass': 'guest',
}
 
def onData(routingKey, data):
    print routingKey, data
 
builder.exchange('process.log', server).receive("yyy.log", onData)

Esta é a classe

import pika
import json
import time
 
class Exchange:
    def __init__(self, name, conf):
        self.name = name
        self.conf = conf
 
    def emit(self, routingKey, data=None):
        credentials = pika.PlainCredentials(self.conf['server']['user'], self.conf['server']['pass'])
        connection = pika.BlockingConnection(pika.ConnectionParameters(host=self.conf['server']['host'], port=self.conf['server']['port'], credentials=credentials))
        channel = connection.channel()
 
        exchangeConf = self.conf['exchange']
        channel.exchange_declare(exchange=self.name, type='topic', passive=exchangeConf['passive'], durable=exchangeConf['durable'], auto_delete=exchangeConf['autoDelete'], internal=exchangeConf['internal'])
        channel.basic_publish(exchange=self.name, routing_key=routingKey, body=json.dumps(data))
        connection.close()
 
    def receive(self, bindingKey, callback):
        credentials = pika.PlainCredentials(self.conf['server']['user'], self.conf['server']['pass'])
        connection = pika.BlockingConnection(pika.ConnectionParameters(host=self.conf['server']['host'], port=self.conf['server']['port'], credentials=credentials))
        channel = connection.channel()
 
        exchangeConf = self.conf['exchange']
        channel.exchange_declare(exchange=self.name, type='topic', passive=exchangeConf['passive'], durable=exchangeConf['durable'], auto_delete=exchangeConf['autoDelete'], internal=exchangeConf['internal'])
 
        queueConf = self.conf['queue']
        result = channel.queue_declare(passive=queueConf['passive'], durable=queueConf['durable'], exclusive=queueConf['exclusive'], auto_delete=queueConf['autoDelete'])
        queue_name = result.method.queue
 
        channel.queue_bind(exchange=self.name, queue=queue_name, routing_key=bindingKey)
 
        print "%s Exchange '%s' initialized" % (time.strftime("[%d/%m/%Y-%H:%M:%S]", time.localtime(time.time())), self.name)
 
        def _callback(ch, method, properties, body):
            callback(method.routing_key, json.loads(body))
            ch.basic_ack(delivery_tag=method.delivery_tag)
            print "%s %s:::%s" % (time.strftime("[%d/%m/%Y-%H:%M:%S]", time.localtime(time.time())), self.name, body)
 
        consumerConf = self.conf['consumer']
        channel.basic_consume(_callback, queue=queue_name, no_ack=consumerConf['noAck'], exclusive=consumerConf['exclusive'])
        channel.start_consuming()

E, finalmente, podemos usar RPCs. Emitir

from rabbit import builder
 
server = {
    'host': 'localhost',
    'port': 5672,
    'user': 'guest',
    'pass': 'guest',
}
 
print builder.rpc('rpc.hello', server).call("Gonzalo", "Ayuso")

E o lado do servidor

from rabbit import builder
 
server = {
    'host': 'localhost',
    'port': 5672,
    'user': 'guest',
    'pass': 'guest',
}
 
def onData(name, surname):
    return "Hello %s %s" % (name, surname)
 
builder.rpc('rpc.hello', server).server(onData)

E esta é a classe

import pika
import json
import time
import uuid
 
class RPC:
    def __init__(self, name, conf):
        self.name = name
        self.conf = conf
 
    def call(self, *params):
        pika.PlainCredentials(self.conf['server']['user'], self.conf['server']['pass'])
        connection = pika.BlockingConnection(pika.ConnectionParameters(host=self.conf['server']['host'], port=self.conf['server']['port']))
        channel = connection.channel()
 
        queueConf = self.conf['queue']
        result = channel.queue_declare(queue='', passive=queueConf['passive'], durable=queueConf['durable'], exclusive=queueConf['exclusive'], auto_delete=queueConf['autoDelete'])
        callback_queue = result.method.queue
        consumerConf = self.conf['consumer']
        channel.basic_consume(self.on_call_response, no_ack=consumerConf['noAck'], exclusive=consumerConf['exclusive'], queue='')
 
        self.response = None
        self.corr_id = str(uuid.uuid4())
        channel.basic_publish(exchange='', routing_key=self.name, properties=pika.BasicProperties(reply_to=callback_queue, correlation_id=self.corr_id), body=json.dumps(params))
        while self.response is None:
            connection.process_data_events()
        return self.response
 
    def on_call_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body
 
    def server(self, callback):
        pika.PlainCredentials(self.conf['server']['user'], self.conf['server']['pass'])
        connection = pika.BlockingConnection(pika.ConnectionParameters(host=self.conf['server']['host'], port=self.conf['server']['port']))
        channel = connection.channel()
 
        queueConf = self.conf['queue']
        channel.queue_declare(self.name, passive=queueConf['passive'], durable=queueConf['durable'], exclusive=queueConf['exclusive'], auto_delete=queueConf['autoDelete'])
 
        channel.basic_qos(prefetch_count=1)
        consumerConf = self.conf['consumer']
 
        def on_server_request(ch, method, props, body):
            response = callback(*json.loads(body))
 
            ch.basic_publish(exchange='', routing_key=props.reply_to, properties=pika.BasicProperties(correlation_id=props.correlation_id), body=json.dumps(response))
            ch.basic_ack(delivery_tag=method.delivery_tag)
            print "%s %s::req => '%s' response => '%s'" % (time.strftime("[%d/%m/%Y-%H:%M:%S]", time.localtime(time.time())), self.name, body, response)
 
        channel.basic_consume(on_server_request, queue=self.name, no_ack=consumerConf['noAck'], exclusive=consumerConf['exclusive'])
 
        print "%s RPC '%s' initialized" % (time.strftime("[%d/%m/%Y-%H:%M:%S]", time.localtime(time.time())), self.name)
        channel.start_consuming()

E isso é tudo. O projeto completo está disponível no meu github

 

***

Gonzalo Ayuso faz parte do time de colunistas internacionais do iMasters. A tradução do artigo é feita pela Redação iMasters, com autorização do autor, e você pode acompanhar o artigo em inglês no link:  https://gonzalo123.com/2017/03/13/playing-with-rabbitmq-part-2-now-with-python/