Desenvolvimento

10 dez, 2012

Vedando a sua implementação de algoritmo distribuído

Publicidade

Eu: Ok, todos os testes unitários estão ok.
Todos os testes do sistema também.
Todos os testes de aceitação também.
Libere!
(algum tempo depois)
NN: Nós temos um problema com o seu banco de dados distribuído.
Eu: Ok, qual é o problema?
NN: De vez em quando, o cluster parece ficar preso.
Me: Preso como?
NN: Parece ser incapaz de eleger um master.
Eu: O que você fez para chegar lá?
NN: Nós não sabemos, mas temos os logs. Aqui estão eles. Feliz caçada.
Me: $ #% # & *!
(algum tempo depois)
Eu: Ah, isso é o que deu errado!

Introdução

A equipe Arakoon gastou grande parte dos últimos dois anos em variações desse tema. Alguns dos problemas eram problemas de configuração (por vezes até de outros clusters), mas definitivamente houve situações em que a questão foi causada por um erro de execução. Como se pode ver, todos eles têm a mesma causa: nem todos os possíveis cenários foram cobertos. Então, é possível escapar desse círculo vicioso?

Nós já aprendemos algumas coisas (ver este artigo anterior), por isso usamos a passagem de mensagens assíncronas e removemos todos os IO da nossa implementação. Então o que sobrou?

Bem, temos um conjunto de objetos distribuídos com estados bem definidos e um conjunto de mensagens que podem ser trocadas entre elas. Não podemos apenas gerar todos os estados do sistema distribuído e todas as mensagens possíveis e verificar como são as relações entre eles? Isso não é trivial, já que o número de estados possíveis é infinito e também é o número de mensagens possíveis. Por outro lado, a maioria das combinações de estado é ruim e inconsistente, e a maioria das mensagens é irrelevante… Deve existir algo que pode podemos fazer.

Aumentando um diagrama de estado space-ish

E se a gente começar a partir de um estado consistente de partida para todos os objetos (vamos chamar isso de estado do sistema), gerar todas as mensagens relevantes que podem ser trocadas a partir desse estado e aplicá-los em todas as ordens possíveis? Os estados do sistema que podem ser alcançados dessa forma a partir do estado de partida devem todos ser consistentes. Se encontrarmos um estado que não seja, nós temos que parar. Para os estados de sistema consistentes, podemos iterar. E os estados inconsistentes? Bem, isso significa claramente que o nosso algoritmo é capaz de fazer coisas ruins. Devemos verificar o cenário que produziu isso e corrigi-lo, e repetir novamente. Ele é viável? Bem, talvez… E que tal simular mensagens descartadas?

Começos humildes

Vamos começar pequeno. Que tal desenvolver uma implementação Basic Paxos para um sistema de três nós? Modelar as mensagens não deve ser muito difícil:

type 'v kind =
| Prepare
| Promise of 'v option
| Accept  of 'v
| Accepted

type 'v message = {n:n; s:id; t:id; k:'v kind}

Cada mensagem está associada a um círculo paxos n, possui uma fonte de s e um alvo t e tem a semântica descrita pelo seu tipo k. Finalmente existe algum tipo de valor (‘v) para as coisas que o sistema deve tentar encontrar um consenso. (Você pode encontrar o código no Github)

A modelação dos agentes dá um pouco mais de trabalho:

type 'v agent_state =
| SHalted
| SIdle
| SRequest  of ('v proposal)
| SPromised of 'v option
| SLead of ('v * int) (* value, outstanding acks *)

type 'v agent = {
id : id;
pop : id list;
_n : n;
state : 'v agent_state;
store : (n * 'v) list;
}

Um agente possui uma id, conhece os outros agentes (pop), tem um armazenamento e um atual circulo paxos n. A parte interessante é o estado interno que representa o papel que ele está desenvolvendo. Ele pode ser interrompido ou inativo, pedindo para se tornar um líder de uma proposta ou liderar uma atualização.

Agora que temos as mensagens e os agentes, podemos modelar o estado do sistema.

type 'v network = 'v message list
type 'v agents  = 'v agent list
type 'v state   = { net: 'v network; ags: 'v agents}

O estado do sistema é apenas uma coleção de agentes (ags) e uma rede (net) representando as mensagens que podem ser entregues aos agentes.

Como o estado do sistema pode mudar? Em primeiro lugar, a entrega de uma mensagem muito provavelmente terá um impacto. Poderíamos acrescentar outras ações mais tarde.

type 'v move =
| DeliverMsg of 'v message

Gerar todas as ações agora é fácil: para cada mensagem na rede existe uma ação que a entrega, e a não ser que nós desejemos parar em estados ruins, não geramos mensagens lá:

let generate_moves state =
if is_bad state
then []
else
let deliver = List.map (fun x -> DeliverMsg x) state.net in
...

E a execução de uma ação? Há alguma administração a ser feita lá.
Temos uma ação para executar, um estado atual, um caminho que foi seguido para chegar a esse estado e um conjunto de estados observados. Se a ação for a entrega de uma mensagem, nós temos que achar o agente alvo e deixá-los lidar com a mensagem. Isso vai mudar o estado do agente e produzir algumas mensagens (extra).

let execute_move move state path observed =

let deliver m ag =
let agent = find_agent ag m.t in
let agent', extra = A.handle_message agent m in
let ag' = replace_agent agent' ag in
ag', extra
in

let state' =
match move with
| DeliverMsg m ->
let net' = remove m state.net in
let ags',extra = deliver m state.ags in
{ net = net' @ extra; ags = ags'}

A execução de uma ação vai causar um novo estado do sistema. Vamos gravar observando a transição do estado antigo para o novo através dessa ação e criar o novo caminho.

let transition = (state_label state, move, state_label state') in
TSet.add transition observed in
state' , (move,state) :: path , observed'

Todo o simulador está a algumas funções de distância:

let rec check_all level state path observed =
if not (is_consistent state) then raise (Bad (path,state));
if level = limit
then observed
else
let rec loop observed = function
| [] -> observed
| move :: rest ->
let state', path', observed' = execute_move move state path observed in
let observed'' = check_all (level + 1) state' path' observed' in
loop observed'' rest
in
let moves = generate_moves state in
loop observed moves

let run state0 = check_all 0 state0 [] TSet.empty

Basicamente, começamos de um estado inicial, descemos pela árvore de ações possíveis, executamos tudo isso e  acumulamos transições observadas.

Com Graphviz, gerar um diagrama é trivial. Apenas itere sobre as transições observadas. (o que não for mostrado aqui, veja no Github para mais detalhes)

A simulação

Criamos três agentes, deixamos que o primeiro começasse com um valor, executamos nosso simulador a partir do estado inicial e fizemos um dottify nas transições observadas.

let main () =
let ids = List.map id [1;2;3] in
let a1,a1_out = start (make_agent (Id 1) ids) "x" in
let a2,a2_out = (make_agent (Id 2) ids),[] in
let a3,a3_out = (make_agent (Id 3) ids),[] in
let world = [a1;a2;a3] in
let state0 = {net = a1_out @ a2_out @ a3_out; ags = world} in
try
let observed = M.run state0 in
M.dottify state0 observed
with (M.Bad (path, state)) ->
Printf.eprintf "bad path:\n";
M.dump_path path;
Printf.eprintf "%s\n" (state_label state)

Mark0

Vamos tentar isso numa implementação de morte cerebral simples de um agente.

Aquela que vai para o estado parado logo que ele recebe uma mensagem, ao não enviar nenhuma mensagem.

module Mark0 = (struct
let handle_message agent message = halt agent, []

end : ALGO)

 

O que vemos aqui? Primeiro, existe um sistema de rótulos para os estados: R1I0I0 significa que o primeiro agente tem n=1 e está em um estado requerente, enquanto o segundo e o terceiro agentes estão no estado inativo com n=0.
Após a entrega da mensagem {Prepare;1->2;n=1}, uma preparação do agente 1 para o agente 2, para segundo agente. O mesmo vale para a outra mensagem de preparação. Isso parece ok, então vamos seguir em frente.

Mark1

Vamos construir um agente de execução que abrange o melhor caminho.

module Mark1 = (struct

let prepare_when_idle source n agent=
let an = agent._n in
if n > an
then
let pv = None in
let agent' = {agent with state = SPromised pv; _n = n;} in
let msg = {n = n;s = agent.id;t = source; k = Promise pv } in
let out = [msg] in
agent', out
else
halt agent,[]

let promise_for_request (source:id) (mn:n) vo (proposal:'v proposal) agent =
let pv,pballot = proposal in
if mn = agent._n
then
let pballot' = Ballot.register_vote pballot source vo in
if Ballot.have_majority pballot'
then
let value = Ballot.pick_value pballot' in
let outstanding_acks = Ballot.quorum pballot' -1 in
let me = agent.id in
let targets = others agent in
let make_msg t = {n = mn; s = me; t ; k =  Accept value} in
let broadcast = List.map make_msg targets in
let agent' = { agent with
store = (mn,value) :: agent.store;
state = SLead (value, outstanding_acks);
}
in
agent', broadcast
else
agent, []
else
halt agent, []

let handle_accept m v agent =
match agent.state with
| SPromised vo when agent._n = m.n ->
let agent' = {agent with state = SIdle; store = (m.n, v) :: agent.store;} in
let out = [{n = m.n;s = agent.id;t = m.s;k = Accepted}] in
agent', out
| _ -> halt agent, []

let handle_accepted m agent =
match agent.state with
| SLead (v,out) when agent._n = m.n ->
let out' = out -1 in
let state' = if out' = 0 then SIdle else SLead(v,out') in
{agent with state = state'},[]
| _ -> halt agent, []

let handle_message agent m =
match m.k with
| Prepare when agent.state = SIdle -> prepare_when_idle m.s m.n agent
| Promise vo ->
begin
match agent.state with
| SRequest p -> promise_for_request m.s m.n vo p agent
| _ -> halt agent, []
end
| Accept v -> handle_accept m v agent
| Accepted -> handle_accepted m agent
| _ -> halt agent,[]
end : ALGO)

O que isso faz? (clique na imagem para vê-la em tamanho completo)


A boa notícia é que há um grande número de caminhos a partir do estado inicial I0I0I0 que atinge nosso estado feliz I1I1I1, mas há também uma série de cenários que acabam em maus estados.

Vejamos um em detalhe.

R1I0I0:{Prepare;1->3;n=1} --->
R1I0P1:{Promise;3->1;n=1} --->
L1I0P1:{Accept; 1->2;n=1} --->
L1H0P1

O que aconteceu aqui? Uma mensagem Preparar vai do agente 1 até o agente 3. Aquele agente envia uma Promessa de volta.

Isso faz com que o agente 1 se torne líder e transmita mensagens Aceitar. Uma delas alcança o agente 1, que está sem saber de nada, já que ele não recebeu uma mensagem de preparar primeiro. O agente 1, portanto, pára.

O diagrama nos permite entender os cenários que levam a maus estados e a modificar o algoritmo de forma apropriada. Esse processo de encontrar buracos em seu algoritmo, remendá-los e iterar é algo que eu chamo de vedação, na ausência de um termo melhor. Nesse caso em particular, um agente que está Inativo pode receber um Accept para o próximo n e deve ser capaz de mover-se para o estado Inativo, no n seguinte.

E as mensagens descartadas?

No início, eu não respondi à pergunta sobre a simulação de mensagens descartadas. O cenário acima deve deixar claro que estamos, na verdade, com sorte. Não existe nenhuma diferença entre esse cenário e um onde um Prepare do agente 1 e do agente 2 foi descartado. No geral, não existe nenhuma diferença entre descartar uma mensagem e retardá-la até que já não seja mais relevante. Isso significa que não há qualquer necessidade de simulá-las!

Mark2

Vamos vedar Mark1. Olhando para o diagrama, não tem muita coisa que precisa ser corrigida. Aqui está uma lista de mensagens que dão errado.

  • Accept;n quando o agente está ocioso em pred n
  • Accepted;n quando o agente já está ocioso em n
  • Promise;n quando o agente já está levando em n
  • Promise;n quando o agente já está ocioso em n

Ok, adaptar o código é fácil:

module Mark2 = (struct

let prepare_when_idle source n agent=
let an = agent._n in
if n > an
then
let pv = None in
let agent' = {agent with state = SPromised pv; _n = n;} in
let msg = {n = n;s = agent.id;t = source; k = Promise pv } in
let out = [msg] in
agent', out
else
halt agent,[]

let promise_for_request (source:id) (mn:n) vo (proposal:'v proposal) agent =
let pv,pballot = proposal in
if mn = agent._n
then
let pballot' = Ballot.register_vote pballot source vo in
if Ballot.have_majority pballot'
then
let value = Ballot.pick_value pballot' in
let outstanding_acks = Ballot.quorum pballot' -1 in
let me = agent.id in
let targets = others agent in
let make_msg t = {n = mn; s = me; t ; k =  Accept value} in
let broadcast = List.map make_msg targets in
let agent' = { agent with
store = (mn,value) :: agent.store;
state = SLead (value, outstanding_acks);
}
in
agent', broadcast
else
agent, []
else
halt agent, []

let handle_accept m v agent =
let _accept m =
let agent' = {agent with state = SIdle; store = (m.n, v) :: agent.store;} in
let out = [{n = m.n;s = agent.id;t = m.s;k = Accepted}] in
agent', out
in
match agent.state with
| SPromised vo when agent._n = m.n -> _accept m
| SIdle when (next agent._n) = m.n -> _accept m
| _ -> halt agent, []

let handle_accepted m agent =
match agent.state with
| SLead (v,out) when agent._n = m.n ->
let out' = out -1 in
let state' = if out' = 0 then SIdle else SLead(v,out') in
{agent with state = state'},[]
| SIdle when agent._n = m.n -> agent,[]
| _ -> halt agent, []

let handle_message agent m =
match m.k with
| Prepare when agent.state = SIdle -> prepare_when_idle m.s m.n agent
| Promise vo ->
begin
match agent.state with
| SRequest p -> promise_for_request m.s m.n vo p agent
| SLead(v,out) when agent._n = m.n -> agent, []
| SIdle when agent._n = m.n -> agent, []
| _ -> halt agent, []
end
| Accept v -> handle_accept m v agent
| Accepted -> handle_accepted m agent
| _ -> halt agent,[]
end : ALGO)

Observe o diagrama de saída:

Não é ótimo que consertar os buracos no nosso algoritmo realmente o torna o diagrama menor? Uma vez que não terminamos mais em maus estados, existem bem menos transições. Ele também está esteticamente satisfazendo graphviz, ao mostrar todas as setas da esquerda para a direita, ou seja, não há transições que realmente aumentam a distância entre o estado atual e o estado que estamos buscando.

E os agentes que estão limpos?

Este tipo de calamidade não é muito difícil de se simular. Basicamente é uma ação que coloca o agente de volta em seu estado inicial. Vamos adicionar a possibilidade de que um dos agentes estar limpo.

let generate_moves state =
if is_bad state
then []
else
let deliver = List.map (fun x -> DeliverMsg x) state.net in
let id3 = Id 3 in
let agent = find_agent state.ags id3 in
let wipe =
if is_halted agent
then []
else [Wipe id3]
in
deliver @ wipe

Vamos tentar isto:

./paxos.byte > mark3.dot
bad path:
1 ---(1: Prepare) ---> 3
3 ---(1: Promise) ---> 1
1 ---(1:  Accept) ---> 2
1 ---(1:  Accept) ---> 3
Wipe 3
L1I0I0

Opa!! Existe realmente algo errado por aqui. Como se vê, tem um erro no módulo Mark2.

É este fragmento que está errado:

let handle_accept m v agent =
let _accept m =
let agent' = {agent with state = SIdle; store = (m.n, v) :: agent.store;} in
let out = [{n = m.n;s = agent.id;t = m.s;k = Accepted}] in
agent', out
in
match agent.state with
| SPromised vo when agent._n = m.n -> _accept m
| SIdle when (next agent._n) = m.n -> _accept m
| _ -> halt agent, []

Manusear um Accept quando o agente está no estado inativo também deve configurar o n corretamente (o da mensagem). Vamos corrigir isso e tentar novamente.

Aqui está:

Mais uma vez, somos confrontados com muitos estados ruins, mas sabemos como corrigir isso. Quais são os cenários que nos trazem esses estados? Como se vê, eles são causados por mensagens Prepare antigas. Acrescentando isso e gerando o diagrama novamente, temos:

Na verdade, limpar um agente leva para um estado em algum lugar à esquerda do atual estado, o que corresponde à ideia de estar mais longe do nosso objetivo.

Já acabou?

Até agora, nós só abordamos os casos em que existe apenas um agente de um estado requerente. Então, o que aconteceria se houvesse dois agentes solicitando algo ao mesmo tempo?

Feliz vedação!

Observações finais

Chegar a uma aplicação correta de um algoritmo é difícil, ainda mais no caso de sistemas distribuídos. Este artigo mostra uma estratégia que você pode aplicar na vedação das suas próprias implementações. Como dito antes, fazer a sua implementação pura ajuda muito.

Divirta-se!

***

Texto original disponível em http://blog.incubaid.com/2012/10/25/caulking-your-distributed-algorithm-implementation/