Arakoon, nosso armazenamento de chave-valor desenvolvido internamente, é um dos nossos projetos mais emblemáticos. Uma vez que um servidor não é de muita utilidade caso não permita que os clientes se comuniquem com ele, nós desenvolvemos também algumas bibliotecas de clientes, incluindo um cliente OCaml, C, PHP e Python.
Foi desenvolvido um cliente alternativo próximo ao cliente Python mantido dentro do repositório principal do Arakoon, (fonte). Um dos objetivos desse cliente alternativo era apoiar o Twisted, um framework de rede assíncrona.
Neste artigo, vou apresentar o método utilizado para conseguir esse resultado. Ele mantém uma clara separação entre o protocolo (os bytes passando sobre o fio) e o transporte (o próprio fio). Os transportes síncronos e assíncronos pode ser implementados podem ser implementados, e novos comandos de request/response podem ser adicionados facilmente, em um único lugar no código fonte.
Ao longo deste artigo, vamos escrever um cliente para esse servidor, que implementa um protocolo semelhante ao protocolo Arakoon:
import socket import struct import threading HOST = 'localhost' PORT = 8080 COMMAND_STRUCT = struct.Struct('<I') SUCCESS_CODE = struct.pack('<I', 0) ERROR_CODE = struct.pack('<I', 1) ERROR_MESSAGE = 'Invalid request' ERROR_MESSAGE_DATA = struct.pack('<I%ds' % len(ERROR_MESSAGE), len(ERROR_MESSAGE), ERROR_MESSAGE) LEN_STRUCT = struct.Struct('<I') def handle(conn): while True: command_data = '' while len(command_data) < COMMAND_STRUCT.size: data = conn.recv(COMMAND_STRUCT.size - len(command_data)) if not data: return command_data += data command, = COMMAND_STRUCT.unpack(command_data) if command == 1: len_data = '' while len(len_data) < LEN_STRUCT.size: data = conn.recv(LEN_STRUCT.size - len(len_data)) len_data += data len_, = LEN_STRUCT.unpack(len_data) data = '' while len(data) < len_: data += conn.recv(len_ - len(data)) conn.send(SUCCESS_CODE) conn.send(struct.pack('<L%ds' % len(data), len(data), data[::-1])) else: conn.send(ERROR_CODE) conn.send(ERROR_MESSAGE_DATA) def main(): sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.bind((HOST, PORT)) sock.listen(1) while True: conn, addr = sock.accept() print 'Connect: %r' % (addr, ) threading.Thread(target=lambda: handle(conn)).start() if __name__ == '__main__': main()
Cada mensagem enviada por um cliente começa com um número inteiro de 32 bits, o identificador de comando. Atualmente, apenas um comando, ‘reverse’ com ID 1, está implementado. Isso leva uma única string como argumento. As strings são codificadas em duas partes: primeiro, um número inteiro de 32 bits que contém o tamanho da string, seguido pelos dados reais da string como caracteres.
Quando o servidor recebe um comando, ele envia um inteiro de 32 bits denotando sucesso (0×00) ou fracasso (0×01). O comando ‘reverse’ simplesmente retorna uma string de entrada, invertida, usando a codificação da mesma string como descrito anteriormente. Uma vez que esse ciclo estiver completo, um novo comando pode ser enviado por um cliente.
Agora, no lado do cliente. Vamos precisar de algumas importações:
import socket import struct import logging import functools import collections from twisted.python import log from twisted.internet import defer, protocol, reactor from twisted.protocols import basic, stateful import utils
Os módulos ‘utils’ contêm alguns helpers, e estão contidos no repositório Pyrakoon.
Nós precisamos de uma maneira de comunicação entre a camada de protocolo e a camada de transporte. Só o lado do protocolo sabe quantos dados são esperados, e apenas o lado de transporte pode fornecer esses bytes, mesmo que não possa ter a quantidade necessária de dados disponíveis imediatamente, e deseje obter a execução (no caso de rede assíncrona). Para tornar o desenvolvimento mais fácil dentro dessas limitações, as co-rotinas são utilizadas em todo o sistema para encapsular o estado intermediário sempre que possível, mantendo a API simples.
Veja como a API funciona: uma ação única de protocolo (por exemplo, a leitura da resposta de um pedido) é feita por uma co-rotina, o que rende um ou mais objetos “Request”, que encapsulam o número de bytes que devem ser fornecidos à co-rotina para o protocolo ser capaz de construir um valor, ou um objeto “Result” que encapsula o valor final. Sempre que a camada superior receber um objeto “Request”, ela deve ler o número solicitado de bytes do transporte, depois envia os dados para a co-rotina, o que vai render outro “Request”, ou, finalmente, um ‘Response’.
As definições são muito simples:
class Request(object): def __init__(self, count): self.count = count class Result(object): def __init__(self, value): self.value = value
Em seguida, o protocolo utiliza alguns tipos diferentes de valores: inteiros de 32 bits (sem assinatura) e strings. Este último utiliza o primeiro em sua codificação interna.
Cada tipo tem três métodos: ‘check’, ‘serialize’ e ‘receive’.
‘check’ realiza a validação de entrada para valores de determinado tipo (verificação de tipo, verificação de limites,…). ‘serialize’ é um gerador que produz os dados codificados para um determinado valor. “receive” é uma co-rotina que produz objetos ‘Request’ e ‘Result’ para receber um valor do tipo.
Para os tipos básicos (por exemplo, números inteiros), os valores embalados utilizando o módulo ‘struct’ módulo podem ser usados, portanto, as implementações de base fornecem a funcionalidade necessária para isso. Aqui está o tipo básico, e as implementações para ambos inteiros não-assinados de 32 bits e strings:
class Type(object): PACKER = None def check(self, value): raise NotImplementedError def serialize(self, value): if not self.PACKER: raise NotImplementedError yield self.PACKER.pack(value) def receive(self): if not self.PACKER: raise NotImplementedError data = yield Request(self.PACKER.size) result, = self.PACKER.unpack(data) yield Result(result) class UnsignedInt32(Type): PACKER = struct.Struct('<I') MAX_INT = (2 ** 32) - 1 def check(self, value): if not isinstance(value, (int, long)): raise TypeError if value < 0: raise ValueError('Unsigned integer expected') if value > self.MAX_INT: raise ValueError('Integer overflow') UNSIGNED_INT32 = UnsignedInt32() class String(Type): def check(self, value): if not isinstance(value, str): raise TypeError def serialize(self, value): length = len(value) for bytes_ in UNSIGNED_INT32.serialize(length): yield bytes_ yield struct.pack('<%ds' % length, value) def receive(self): length_receiver = UNSIGNED_INT32.receive() request = length_receiver.next() while isinstance(request, Request): value = yield request request = length_receiver.send(value) if not isinstance(request, Result): raise TypeError length = request.value if length == 0: result = '' else: data = yield Request(length) result, = struct.unpack('<%ds' % length, data) yield Result(result) STRING = String()
Agora que os tipos básicos estão definidos, podemos descrever as mensagens de solicitação/resposta transferidas entre o cliente e o servidor.
Cada mensagem possui uma tag (seu identificador), zero ou mais argumentos, e um tipo de retorno. As mensagens podem ser serializadas e recebidas de forma semelhante aos métodos correspondentes em ‘Type’. A maioria, se não todos as ligações necessárias, pode ser escondida dentro da classe ‘Message’, então as classes específicas de comando podem ser muito curtas e simples. Isto torna mais fácil adicionar novos comandos de protocolo para o cliente também!
Aqui está a definição de ‘Message’, bem como a implementação do nosso comando ‘Reverse’. Observe como a definição deste último é simples.
class Message(object): TAG = None ARGS = None RETURN_TYPE = None def serialize(self): for bytes_ in UNSIGNED_INT32.serialize(self.TAG): yield bytes_ for arg in self.ARGS: name, type_ = arg for bytes_ in type_.serialize(getattr(self, name)): yield bytes_ def receive(self): code_receiver = UNSIGNED_INT32.receive() request = code_receiver.next() while isinstance(request, Request): value = yield request request = code_receiver.send(value) if not isinstance(request, Result): yield TypeError code = request.value if code == 0x00: result_receiver = self.RETURN_TYPE.receive() else: result_receiver = STRING.receive() request = result_receiver.next() while isinstance(request, Request): value = yield request request = result_receiver.send(value) if not isinstance(request, Result): raise TypeError result = request.value if code == 0x00: yield Result(result) else: raise Exception('Error %d: %s' % (code, result)) class Reverse(Message): TAG = 0x01 ARGS = ('text', STRING), RETURN_TYPE = STRING def __init__(self, text): super(Reverse, self).__init__() self.text = text
O próximo passo, é escrever a classe base para todas as implementações de clientes reais. Algumas construções de método dinâmico são usadas on the go, com base nas seguintes duas funções utilitárias:
def validate_types(specs, args): for spec, arg in zip(specs, args): name, type_ = spec[:2] try: type_.check(arg) except TypeError: raise TypeError('Invalid type of argument "%s"' % name) except ValueError: raise ValueError('Invalid value of argument "%s"' % name) def call(message_type): def wrapper(fun): argspec = ['self'] for arg in message_type.ARGS: argspec.append(arg[0]) @utils.update_argspec(*argspec) @functools.wraps(fun) def wrapped(**kwargs): self = kwargs['self'] if not self.connected: raise RuntimeError('Not connected') args = tuple(kwargs[arg[0]] for arg in message_type.ARGS) validate_types(message_type.ARGS, args) message = message_type(*args) return self._process(message) return wrapped return wrapper
A classe base ‘Cliente’ se torna extremamente simples. Sempre que um novo comando é adicionado ao protocolo, acaba ficando óbvio adicioná-lo a essa classe (como é feito para a chamada ‘reverse’).
class Client(object): connected = False @call(Reverse) def reverse(self): assert False def _process(self, message): raise NotImplementedError
Isso é tudo que existe. O que resta são as implementações do transporte específico do cliente.
Começar com um cliente de socket síncrono é o mais fácil. Tudo o que precisamos é de um método ‘connect’ para configurar um socket e implementar o ‘Client._process’ para lidar com a interação entre o protocolo e o transporte. A implementação é bem simples e direta:
class SyncClient(Client): def __init__(self): self._socket = None def connect(self, addr, port): self._socket = socket.create_connection((addr, port)) @property def connected(self): return self._socket is not None def _process(self, message): try: for part in message.serialize(): self._socket.sendall(part) receiver = message.receive() request = receiver.next() while isinstance(request, Request): data = '' while len(data) < request.count: d = self._socket.recv(request.count - len(data)) if not d: raise Exception data += d request = receiver.send(data) if not isinstance(request, Result): raise TypeError utils.kill_coroutine(receiver, logging.exception) return request.value except Exception: try: self._socket.close() finally: self._socket = None raise
O protocolo Twisted é um pouco mais complexo e não será abordado em detalhes neste artigo. Se você alguma vez escreveu um protocolo Twisted sozinho, deve ser fácil seguir em frente. A implementação piggy-backs em ‘twisted.protocol.stateful.StatefulProtocol’, o que simplifica muito.
class TwistedProtocol(Client, stateful.StatefulProtocol, basic._PauseableMixin): _INITIAL_REQUEST_SIZE = UNSIGNED_INT32.PACKER.size def __init__(self): Client.__init__(self) self._handlers = collections.deque() self._currentHandler = None self._connected = False self._deferredLock = defer.DeferredLock() def _process(self, message): deferred = defer.Deferred() self._handlers.append((message.receive(), deferred)) def process(_): try: for data in message.serialize(): self.transport.write(data) finally: self._deferredLock.release() self._deferredLock.acquire().addCallback(process) return deferred def getInitialState(self): self._currentHandler = None return self._responseCodeReceived, self._INITIAL_REQUEST_SIZE def _responseCodeReceived(self, data): self._currentHandler = None try: self._currentHandler = handler = self._handlers.pop() except IndexError: log.msg('Request data received but no handler registered') self.transport.loseConnection() return None request = handler[0].next() if isinstance(request, Result): return self._handleResult(request) elif isinstance(request, Request): if request.count != self._INITIAL_REQUEST_SIZE: handler[1].errback(ValueError('Unexpected request count')) self.transport.loseConnection() return None return self._handleRequest(data) else: log.err(TypeError, 'Received unknown type from message parsing coroutine') handler[1].errback(TypeError) self.transport.loseConnection() return None def _handleRequest(self, data): if not self._currentHandler: log.msg('Request data received but no handler registered') self.transport.loseConnection() return None receiver, deferred = self._currentHandler try: request = receiver.send(data) except Exception, exc: #pylint: disable-msg=W0703 log.err(exc, 'Exception raised by message receive loop') deferred.errback(exc) return self.getInitialState() if isinstance(request, Result): return self._handleResult(request) elif isinstance(request, Request): return self._handleRequest, request.count else: log.err(TypeError, 'Received unknown type from message parsing coroutine') deferred.errback(TypeError) self.transport.loseConnection() return None def _handleResult(self, result): receiver, deferred = self._currentHandler self._currentHandler = None # To be on the safe side... utils.kill_coroutine(receiver, lambda msg: log.err(None, msg)) deferred.callback(result.value) return self.getInitialState() def connectionLost(self, reason=protocol.connectionDone): self._connected = False self._cancelHandlers(reason) return stateful.StatefulProtocol.connectionLost(self, reason) def _cancelHandlers(self, reason): while self._handlers: receiver, deferred = self._handlers.popleft() utils.kill_coroutine(receiver, lambda msg: log.err(None, msg)) deferred.errback(reason)
É isso aí! Finalmente, podemos testar os nossos clientes contra o servidor:
HOST = 'localhost' PORT = 8080 def test_sync(): client = SyncClient() client.connect(HOST, PORT) r = client.reverse('sync') print 'sync =>', r print r, '=>', client.reverse(r) def test_twisted(): def create_client(host, port): client = protocol.ClientCreator(reactor, TwistedProtocol) return client.connectTCP(host, port) @defer.inlineCallbacks def run(proto): result = yield proto.reverse('twisted') print 'twisted =>', result result2 = yield proto.reverse(result) print result2, '=>', result proto.transport.loseConnection() deferred = create_client(HOST, PORT) deferred.addCallback(run) deferred.addBoth(lambda _: reactor.stop()) reactor.run() if __name__ == '__main__': test_sync() test_twisted()
Se, por exemplo, um método ‘add’ fosse adicionado ao servidor, que por sua vez retornasse a soma de dois dados de inteiros de 32 bits não-assinados, poderíamos definir um novo comando tipo este:
class Add(Message): TAG = 0x02 ARGS = ('a', UNSIGNED_INT32), ('b', UNSIGNED_INT32), RETURN_TYPE = UNSIGNED_INT32 def __init__(self, a, b): super(Add, self).__init__() self.a = a self.b = b
Em seguida, adicione-o à classe “Client” assim:
@call(Add) def add(self): assert False
Uma vez feito isso, o método ‘add(self, a, b)’ estará disponível em todos os clientes e irá funcionar como o esperado!
Este é apenas um exemplo básico. O protocolo Arakoon contém tipos mais complexos, incluindo tipos ‘option’ e listas’.
Consulte o código-fonte Pyrakoon para ver como isso é feito. Apenas uma definição de tipo deve ser adicionada, vários comandos podem usá-los, já que é fácil.
Utilizando a abordagem descrita neste artigo, torna-se fácil fornecer implementações de clientes que usam vários backends diferentes (bloqueio, sem bloqueio, sockets ou qualquer outra coisa como transporte,…), e simplificar a adição de novos comandos/chamadas para todos os clientes de uma só vez (mantendo-os em sincronia). Isso simplifica demais a manutenção do cliente.
***
Texto original disponível em http://blog.incubaid.com/2011/12/13/hybrid-sync-async-python-requestresponse-protocol-client-implementations/