Back-End

26 set, 2012

Implementação de um client hibrido sync & async de request/response em Python

Publicidade

Arakoon, nosso armazenamento de chave-valor desenvolvido internamente, é um dos nossos projetos mais emblemáticos. Uma vez que um servidor não é de muita utilidade caso não permita que os clientes se comuniquem com ele, nós desenvolvemos também algumas bibliotecas de clientes, incluindo um cliente OCaml, C, PHP e Python.

Foi desenvolvido um cliente alternativo próximo ao cliente Python mantido dentro do repositório principal do Arakoon, (fonte). Um dos objetivos desse cliente alternativo era apoiar o Twisted, um framework de rede assíncrona.

Neste artigo, vou apresentar o método utilizado para conseguir esse resultado. Ele mantém uma clara separação entre o protocolo (os bytes passando sobre o fio) e o transporte (o próprio fio). Os transportes síncronos e assíncronos pode ser implementados podem ser implementados, e novos comandos de request/response podem ser adicionados facilmente, em um único lugar no código fonte.

Ao longo deste artigo, vamos escrever um cliente para esse servidor, que implementa um protocolo semelhante ao protocolo Arakoon:

import socket
import struct
import threading

HOST = 'localhost'
PORT = 8080

COMMAND_STRUCT = struct.Struct('<I')

SUCCESS_CODE = struct.pack('<I', 0)
ERROR_CODE = struct.pack('<I', 1)
ERROR_MESSAGE = 'Invalid request'
ERROR_MESSAGE_DATA = struct.pack('<I%ds' % len(ERROR_MESSAGE),
    len(ERROR_MESSAGE), ERROR_MESSAGE)

LEN_STRUCT = struct.Struct('<I')

def handle(conn):
    while True:
        command_data = ''
        while len(command_data) < COMMAND_STRUCT.size:
            data = conn.recv(COMMAND_STRUCT.size - len(command_data))

            if not data:
                return

            command_data += data

        command, = COMMAND_STRUCT.unpack(command_data)

        if command == 1:
            len_data = ''

            while len(len_data) < LEN_STRUCT.size:
                data = conn.recv(LEN_STRUCT.size - len(len_data))
                len_data += data

            len_, = LEN_STRUCT.unpack(len_data)

            data = ''
            while len(data) < len_:
                data += conn.recv(len_ - len(data))

            conn.send(SUCCESS_CODE)
            conn.send(struct.pack('<L%ds' % len(data), len(data), data[::-1]))
        else:
            conn.send(ERROR_CODE)
            conn.send(ERROR_MESSAGE_DATA)

def main():
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind((HOST, PORT))
    sock.listen(1)

    while True:
        conn, addr = sock.accept()
        print 'Connect: %r' % (addr, )

        threading.Thread(target=lambda: handle(conn)).start()

if __name__ == '__main__':
    main()

Cada mensagem enviada por um cliente começa com um número inteiro de 32 bits, o identificador de comando. Atualmente, apenas um comando, ‘reverse’ com ID 1, está implementado. Isso leva uma única string como argumento. As strings são codificadas em duas partes: primeiro, um número inteiro de 32 bits que contém o tamanho da string, seguido pelos dados reais da string como caracteres.

Quando o servidor recebe um comando, ele envia um inteiro de 32 bits denotando sucesso (0×00) ou fracasso (0×01). O comando ‘reverse’ simplesmente retorna uma string de entrada, invertida, usando a codificação da mesma string como descrito anteriormente. Uma vez que esse ciclo estiver completo, um novo comando pode ser enviado por um cliente.

Agora, no lado do cliente. Vamos precisar de algumas importações:

import socket
import struct
import logging
import functools
import collections

from twisted.python import log
from twisted.internet import defer, protocol, reactor
from twisted.protocols import basic, stateful

import utils

Os módulos ‘utils’ contêm alguns helpers, e estão contidos no repositório Pyrakoon.

Nós precisamos de uma maneira de comunicação entre a camada de protocolo e a camada de transporte. Só o lado do protocolo sabe quantos dados são esperados, e apenas o lado de transporte pode fornecer esses bytes, mesmo que não possa ter a quantidade necessária de dados disponíveis imediatamente, e deseje obter a execução (no caso de rede assíncrona). Para tornar o desenvolvimento mais fácil dentro dessas limitações, as co-rotinas são utilizadas em todo o sistema para encapsular o estado intermediário sempre que possível, mantendo a API simples.

Veja como a API funciona: uma ação única de protocolo (por exemplo, a leitura da resposta de um pedido) é feita por uma co-rotina, o que rende um ou mais objetos “Request”, que encapsulam o número de bytes que devem ser fornecidos à co-rotina para o protocolo ser capaz de construir um valor, ou um objeto “Result” que encapsula o valor final. Sempre que a camada superior receber um objeto “Request”, ela deve ler o número solicitado de bytes do transporte, depois envia os dados para a co-rotina, o que vai render outro “Request”, ou, finalmente, um ‘Response’.

As definições são muito simples:

class Request(object):
    def __init__(self, count):
        self.count = count

class Result(object):
    def __init__(self, value):
        self.value = value

Em seguida, o protocolo utiliza alguns tipos diferentes de valores: inteiros de 32 bits (sem assinatura) e strings. Este último utiliza o primeiro em sua codificação interna.

Cada tipo tem três métodos: ‘check’, ‘serialize’ e ‘receive’.

‘check’ realiza a validação de entrada para valores de determinado tipo (verificação de tipo, verificação de limites,…). ‘serialize’ é um gerador que produz os dados codificados para um determinado valor. “receive” é uma co-rotina que produz objetos ‘Request’ e ‘Result’ para receber um valor do tipo.

Para os tipos básicos (por exemplo, números inteiros), os valores embalados utilizando o módulo ‘struct’ módulo podem ser usados, portanto, as implementações de base fornecem a funcionalidade necessária para isso. Aqui está o tipo básico, e as implementações para ambos inteiros não-assinados de 32 bits e strings:

class Type(object):
    PACKER = None

    def check(self, value):
        raise NotImplementedError

    def serialize(self, value):
        if not self.PACKER:
            raise NotImplementedError

        yield self.PACKER.pack(value)

    def receive(self):
        if not self.PACKER:
            raise NotImplementedError

        data = yield Request(self.PACKER.size)
        result, = self.PACKER.unpack(data)

        yield Result(result)

class UnsignedInt32(Type):
    PACKER = struct.Struct('<I')
    MAX_INT = (2 ** 32) - 1

    def check(self, value):
        if not isinstance(value, (int, long)):
            raise TypeError

        if value < 0:
            raise ValueError('Unsigned integer expected')

        if value > self.MAX_INT:
            raise ValueError('Integer overflow')

UNSIGNED_INT32 = UnsignedInt32()

class String(Type):
    def check(self, value):
        if not isinstance(value, str):
            raise TypeError

    def serialize(self, value):
        length = len(value)

        for bytes_ in UNSIGNED_INT32.serialize(length):
            yield bytes_

        yield struct.pack('<%ds' % length, value)

    def receive(self):
        length_receiver = UNSIGNED_INT32.receive()
        request = length_receiver.next()

        while isinstance(request, Request):
            value = yield request
            request = length_receiver.send(value)

        if not isinstance(request, Result):
            raise TypeError

        length = request.value

        if length == 0:
            result = ''
        else:
            data = yield Request(length)
            result, = struct.unpack('<%ds' % length, data)

        yield Result(result)

STRING = String()

Agora que os tipos básicos estão definidos, podemos descrever as mensagens de solicitação/resposta transferidas entre o cliente e o servidor.

Cada mensagem possui uma tag (seu identificador), zero ou mais argumentos, e um tipo de retorno. As mensagens podem ser serializadas e recebidas de forma semelhante aos métodos correspondentes em ‘Type’. A maioria, se não todos as ligações necessárias, pode ser escondida dentro da classe ‘Message’, então as classes específicas de comando podem ser muito curtas e simples. Isto torna mais fácil adicionar novos comandos de protocolo para o cliente também!

Aqui está a definição de ‘Message’, bem como a implementação do nosso comando ‘Reverse’. Observe como a definição deste último é simples.

class Message(object):
    TAG = None
    ARGS = None
    RETURN_TYPE = None

    def serialize(self):
        for bytes_ in UNSIGNED_INT32.serialize(self.TAG):
            yield bytes_

        for arg in self.ARGS:
            name, type_ = arg

            for bytes_ in type_.serialize(getattr(self, name)):
                yield bytes_

    def receive(self):
        code_receiver = UNSIGNED_INT32.receive()
        request = code_receiver.next()

        while isinstance(request, Request):
            value = yield request
            request = code_receiver.send(value)

        if not isinstance(request, Result):
            yield TypeError

        code = request.value

        if code == 0x00:
            result_receiver = self.RETURN_TYPE.receive()
        else:
            result_receiver = STRING.receive()

        request = result_receiver.next()

        while isinstance(request, Request):
            value = yield request
            request = result_receiver.send(value)

        if not isinstance(request, Result):
            raise TypeError

        result = request.value

        if code == 0x00:
            yield Result(result)
        else:
            raise Exception('Error %d: %s' % (code, result))

class Reverse(Message):
    TAG = 0x01
    ARGS = ('text', STRING),
    RETURN_TYPE = STRING

    def __init__(self, text):
        super(Reverse, self).__init__()

        self.text = text

O próximo passo, é escrever a classe base para todas as implementações de clientes reais. Algumas construções de método dinâmico são usadas on the go, com base nas seguintes duas funções utilitárias:

def validate_types(specs, args):
    for spec, arg in zip(specs, args):
        name, type_ = spec[:2]

        try:
            type_.check(arg)
        except TypeError:
            raise TypeError('Invalid type of argument "%s"' % name)
        except ValueError:
            raise ValueError('Invalid value of argument "%s"' % name)

def call(message_type):
    def wrapper(fun):
        argspec = ['self']
        for arg in message_type.ARGS:
            argspec.append(arg[0])

        @utils.update_argspec(*argspec)
        @functools.wraps(fun)
        def wrapped(**kwargs):
            self = kwargs['self']

            if not self.connected:
                raise RuntimeError('Not connected')

            args = tuple(kwargs[arg[0]] for arg in message_type.ARGS)
            validate_types(message_type.ARGS, args)

            message = message_type(*args)

            return self._process(message)

        return wrapped

    return wrapper

A classe base ‘Cliente’ se torna extremamente simples. Sempre que um novo comando é adicionado ao protocolo, acaba ficando óbvio adicioná-lo a essa classe (como é feito para a chamada ‘reverse’).

class Client(object):
    connected = False

    @call(Reverse)
    def reverse(self):
        assert False

    def _process(self, message):
        raise NotImplementedError

Isso é tudo que existe. O que resta são as implementações do transporte específico do cliente.

Começar com um cliente de socket síncrono é o mais fácil. Tudo o que precisamos é de um método ‘connect’ para configurar um socket e implementar o ‘Client._process’ para lidar com a interação entre o protocolo e o transporte. A implementação é bem simples e direta:

class SyncClient(Client):
    def __init__(self):
        self._socket = None

    def connect(self, addr, port):
        self._socket = socket.create_connection((addr, port))

    @property
    def connected(self):
        return self._socket is not None

    def _process(self, message):
        try:
            for part in message.serialize():
                self._socket.sendall(part)

            receiver = message.receive()
            request = receiver.next()

            while isinstance(request, Request):
                data = ''

                while len(data) < request.count:
                    d = self._socket.recv(request.count - len(data))
                    if not d:
                        raise Exception

                    data += d

                request = receiver.send(data)

            if not isinstance(request, Result):
                raise TypeError

            utils.kill_coroutine(receiver, logging.exception)

            return request.value
        except Exception:
            try:
                self._socket.close()
            finally:
                self._socket = None

            raise

O protocolo Twisted é um pouco mais complexo e não será abordado em detalhes neste artigo. Se você alguma vez escreveu um protocolo Twisted sozinho, deve ser fácil seguir em frente. A implementação piggy-backs em ‘twisted.protocol.stateful.StatefulProtocol’, o que simplifica muito.

class TwistedProtocol(Client, stateful.StatefulProtocol, basic._PauseableMixin):
    _INITIAL_REQUEST_SIZE = UNSIGNED_INT32.PACKER.size

    def __init__(self):
        Client.__init__(self)

        self._handlers = collections.deque()
        self._currentHandler = None

        self._connected = False
        self._deferredLock = defer.DeferredLock()

    def _process(self, message):
        deferred = defer.Deferred()
        self._handlers.append((message.receive(), deferred))

        def process(_):
            try:
                for data in message.serialize():
                    self.transport.write(data)
            finally:
                self._deferredLock.release()

        self._deferredLock.acquire().addCallback(process)

        return deferred

    def getInitialState(self):
        self._currentHandler = None

        return self._responseCodeReceived, self._INITIAL_REQUEST_SIZE

    def _responseCodeReceived(self, data):
        self._currentHandler = None

        try:
            self._currentHandler = handler = self._handlers.pop()
        except IndexError:
            log.msg('Request data received but no handler registered')
            self.transport.loseConnection()

            return None

        request = handler[0].next()

        if isinstance(request, Result):
            return self._handleResult(request)
        elif isinstance(request, Request):
            if request.count != self._INITIAL_REQUEST_SIZE:
                handler[1].errback(ValueError('Unexpected request count'))
                self.transport.loseConnection()

                return None

            return self._handleRequest(data)
        else:
            log.err(TypeError,
                'Received unknown type from message parsing coroutine')
            handler[1].errback(TypeError)

            self.transport.loseConnection()

            return None

    def _handleRequest(self, data):
        if not self._currentHandler:
            log.msg('Request data received but no handler registered')
            self.transport.loseConnection()

            return None

        receiver, deferred = self._currentHandler

        try:
            request = receiver.send(data)
        except Exception, exc: #pylint: disable-msg=W0703
            log.err(exc, 'Exception raised by message receive loop')
            deferred.errback(exc)

            return self.getInitialState()

        if isinstance(request, Result):
            return self._handleResult(request)
        elif isinstance(request, Request):
            return self._handleRequest, request.count
        else:
            log.err(TypeError,
                'Received unknown type from message parsing coroutine')
            deferred.errback(TypeError)

            self.transport.loseConnection()

            return None

    def _handleResult(self, result):
        receiver, deferred = self._currentHandler
        self._currentHandler = None

        # To be on the safe side...
        utils.kill_coroutine(receiver, lambda msg: log.err(None, msg))

        deferred.callback(result.value)

        return self.getInitialState()

    def connectionLost(self, reason=protocol.connectionDone):
        self._connected = False

        self._cancelHandlers(reason)

        return stateful.StatefulProtocol.connectionLost(self, reason)

    def _cancelHandlers(self, reason):
        while self._handlers:
            receiver, deferred = self._handlers.popleft()

            utils.kill_coroutine(receiver, lambda msg: log.err(None, msg))

            deferred.errback(reason)

É isso aí! Finalmente, podemos testar os nossos clientes contra o servidor:

HOST = 'localhost'
PORT = 8080

def test_sync():
    client = SyncClient()
    client.connect(HOST, PORT)
    r = client.reverse('sync')
    print 'sync =>', r
    print r, '=>', client.reverse(r)

def test_twisted():
    def create_client(host, port):
        client = protocol.ClientCreator(reactor, TwistedProtocol)
        return client.connectTCP(host, port)

    @defer.inlineCallbacks
    def run(proto):
        result = yield proto.reverse('twisted')
        print 'twisted =>', result
        result2 = yield proto.reverse(result)
        print result2, '=>', result
        proto.transport.loseConnection()

    deferred = create_client(HOST, PORT)
    deferred.addCallback(run)
    deferred.addBoth(lambda _: reactor.stop())

    reactor.run()

if __name__ == '__main__':
    test_sync()
    test_twisted()

Se, por exemplo, um método ‘add’ fosse adicionado ao servidor, que por sua vez retornasse a soma de dois dados de inteiros de 32 bits não-assinados, poderíamos definir um novo comando tipo este:

class Add(Message):
    TAG = 0x02
    ARGS = ('a', UNSIGNED_INT32), ('b', UNSIGNED_INT32),
    RETURN_TYPE = UNSIGNED_INT32

    def __init__(self, a, b):
        super(Add, self).__init__()

        self.a = a
        self.b = b

Em seguida, adicione-o à classe “Client” assim:

@call(Add)
def add(self):
    assert False

Uma vez feito isso, o método ‘add(self, a, b)’ estará disponível em todos os clientes e irá funcionar como o esperado!

Este é apenas um exemplo básico. O protocolo Arakoon contém tipos mais complexos,  incluindo tipos ‘option’ e listas’.

Consulte o código-fonte Pyrakoon para ver como isso é feito. Apenas uma definição de tipo deve ser adicionada, vários comandos podem usá-los, já que é fácil.

Utilizando a abordagem descrita neste artigo, torna-se fácil fornecer implementações de clientes que usam vários backends diferentes (bloqueio, sem bloqueio, sockets ou qualquer outra coisa como transporte,…), e simplificar a adição de novos comandos/chamadas para todos os clientes de uma só vez (mantendo-os em sincronia). Isso simplifica demais a manutenção do cliente.

***

Texto original disponível em http://blog.incubaid.com/2011/12/13/hybrid-sync-async-python-requestresponse-protocol-client-implementations/