APIs e Microsserviços

29 jan, 2014

Twitter API, ElasticSearch e Kibana – Analisando a rede social

Publicidade

Neste artigo iremos mostrar como você pode fazer para coletar tweets diretamente da STREAM API do Twitter, utilizando qualquer um dos filtros oferecidos pela API.

Para armazenar os tweets utilizaremos o ElasticSearch, que funciona como uma base de dados e é focada principalmente em consulta, e o Kibana para montar dashboards de consulta ao ElasticSearch. Tudo isso rodará sob um sistema operacional Ubuntu.

1. Instalando o ElasticSearch

Comece baixando o ElasticSearch. No momento em que escrevo este artigo, a versão corrente é a 0.90.10. Como estou instalando num sistema operacional Ubuntu, optei por baixar o pacote “.deb”, que facilita muito a instalação no sistema para ser rodado como um serviço.

Se você está usando um servidor sem interface gráfica, para baixar utilize o comando: https://download.elasticsearch.org/elasticsearch/elasticsearch/elasticsearch-0.90.10.deb

Agora, para realizar a instalação do pacote baixado basta executar o comando:

sudo dpkg -i elasticsearch-0.90.10.deb

Agora iremos gerar o arquivo de configurações do elasticsearch. Para tanto, editaremos o arquivo /etc/elasticsearch/elasticsearch.yml, alterando/adicionando os seguintes parâmetros:

cluster.name: twitter_stream
node.name: "twitter_node"
node.master: true
node.data: true
index.number_of_shards: 5
index.number_of_replicas: 1
network.host: 0.0.0.0 #ip da máquina aonde o serviço estará rodando
transport.tcp.port: 9300
http.port: 9200

Pronto, seu elasticsearch já está pronto para começar a “rodar”. Para facilitar o acompanhamento do estado do servidor, vamos instalar mais dois plugins. O primeiro deles é o elasticsearch-head e o segundo é o paramedic. Para instalar os plugins, a maneira mais fácil é usando o próprio elasticsearch, num modelo de “repositório”. Para instalar o elasticsearch-head basta executar o seguinte comando:

/usr/share/elasticsearch/bin/plugin -install mobz/elasticsearch-head

Ele irá baixar e instalar o plugin. Já o paramedic precisa do comando abaixo:

/usr/share/elasticsearch/bin/plugin -install karmi/elasticsearch-paramedic

Pronto, caso o elasticsearch já esteja rodando basta reiniciá-lo. Caso contrário, basta iniciá-lo. Como instalamos o elasticsearch via pacote “.deb”, ele está disponível como um serviço do sistema. Assim, para iniciá-lo, pará-lo ou reiniciá-lo, temos os seguintes três comandos, respectivamente:

sudo service elasticsearch start
sudo service elasticsearch stop
sudo service elasticsearch restart

Agora você já consegue acessá-lo pelo seguinte endereço: http://<ip_da_maquina_aonde_ele_esta_rodando>:9200/_plugin/headhttp://<ip_da_maquina_aonde_ele_esta_rodando>:9200/_plugin/paramedic

2. Lendo tweets e armazenando

O próximo passo agora é ler o stream do Twitter, coletar o que desejamos e salvar no elasticsearch. Antes de mais nada é importante dizer que a API do Twitter só permite acesso a 1% dos tweets mundiais, se a sua consulta, porventura, chegar a esse limite, você receberá uma amostra dos tweets que gostaria de receber. Além disso, pelo que pude entender, esse limite de 1% é “por requisição”.

Há diversas maneiras de se fazer isso. Pode-se montar um shell script, já que o ElasticSearch é pensado para ser “gerido” via requisições GET/POST, enviando e recebendo JSONs. Mas eu prefiro usar as libs python desenvolvidas pelo pessoal do elasticsearch e do twitter que faz muito bem o serviço.

Vale destacar que uma outra opção seria utilizar um plugin do elasticsearch chamado “twitter-river”. Porém sua implementação ainda é muito parcial e ele não suporta filtro por idiomas, o que atrapalha muito a nossa vida, já que com o limite de 1% seria bom filtrar os twittes só pelos em “pt”. Aliás, essa é uma outra questão. Dos diversos filtros existentes na API, o de geolocalização não é muito preciso (não recomendo utilizarem) e o de idioma é muito bom!

Voltando ao nosso script com a lib Python… Primeiramente vamos instalar as dependências necessárias. Para tanto, vou usar o “pip” do python; então você precisa dele instalado (“sudo apt-get install python-pip“). As duas bibliotecas que precisamos instalar são a Elasticsearch e a TwitterAPI:

sudo pip install elasticsearch
sudo pip install twitterapi

Antes de ir ao nosso script, você precisa ter uma conta no Twitter e solicitar uma chave de conexão à API (é automático e instantâneo o processo) – https://dev.twitter.com/discussions/631

Agora o código do script que faz a mágica toda acontecer:

#!/usr/bin/python
# -*- coding: utf-8 -*-

# Importando as bibliotecas

from TwitterAPI import TwitterAPI
import elasticsearch
import logging
import rfc822
import datetime
import numbers
import time
from subprocess import Popen, PIPE

def patch_tweet(d):
    """A API do twitter retorna as datas num formato que o elasticsearch não consegue reconhecer,
        então precisamos parsear a data para um formato que o ES entende, essa função faz isso.
    """

    if 'created_at' in d:
        # twitter uses rfc822 style dates. elasticsearch uses iso dates.
        # we translate twitter dates into datetime instances (pyes will
        # convert datetime into the right iso format understood by ES).
        new_date = datetime.datetime(*rfc822.parsedate(d['created_at'])[:6])
        d['created_at'] = new_date

    count_is_number = isinstance(d['retweet_count'], numbers.Number)
    if 'retweet_count' in d and not count_is_number:
        # sometimes retweet_count is a string instead of a number (eg. "100+"),
        # here we transform it to a number (an attribute in ES cannot have
        # more than one type).
        d['retweet_count'] = int(d['retweet_count'].rstrip('+')) + 1

    return d

def check_es_status():
   """Essa é uma função que verifica se o serviço do ElasticSearch está operando
       e inicia-o ou reinicia-o caso seja necessário.
   """
    cmd = Popen(["service", "elasticsearch", "status"], stdout=PIPE)
    cmd_out, cmd_err = cmd.communicate()
    print(cmd_out) #print para acompanhamento no shell
    if "not running" in cmd_out:
        print "Elastic Search Not Running, trying to start it"
        cmd = Popen(["service", "elasticsearch", "start"], stdout=PIPE)
        cmd_out, cmd_err = cmd.communicate()
        print(cmd_out)  #print para acompanhamento no shell
    time.sleep(15)

# configurando traces e logs
log_dir = "/var/log/elasticsearch/"
tracer = logging.getLogger('elasticsearch.trace')
tracer.setLevel(logging.WARN)
tracer.addHandler(logging.FileHandler(log_dir + 'trace.log'))
default_logger = logging.getLogger('Elasticsearch')
default_logger.setLevel(logging.WARN)
default_logger.addHandler(logging.FileHandler(log_dir + 'default.log'))

# Criando conexão ao elasticsearch
es = elasticsearch.Elasticsearch(["<dominio_do_servidor>:9200"]) # domínio sem o http://

# Configurando as chaves de acesso à API do Twitter.
# Não esqueça de alterar os valores abaixo para os da sua conta.
twitter_api = TwitterAPI(consumer_key='consumer_key',
                      consumer_secret='consumer_secret',
                      access_token_key='access_token',
                      access_token_secret='access_token_secret')

#Aqui inicialmos os filtros que desejamos, para ver a documentação completa acesse:
# https://dev.twitter.com/docs/streaming-apis/parameters
# No caso iremos usar apenas o filtro de idioma e por "palavras" (os termos em "track")
# veja mais sobre como escolher as palavras aqui: https://dev.twitter.com/docs/streaming-apis/parameters#track
filters = {
    "language": ["pt"],
    "track": [ "software", "livre", "open", "source", "floss", "gnu", "gpl", "polignu" ]
}

#Agora é que a mágina acontece, esse é um código feito para rodar 24x7
while True:
    #creates the stream object
    stream = twitter_api.request('statuses/filter', filters)

    #For each item in the stream (tweet data), save it on the elastisearch
    for item in stream.get_iterator():
        try:
            # Saving the tweet on the ES
            es.index(
                index="tweets",
                doc_type="tweet",
                body=patch_tweet(item)
            )
        except:
           #caso haja qualquer problema com o elasticsearch ele verifica o estado e reinicia se necessário
            check_es_status()
            es = elasticsearch.Elasticsearch(["<dominio_do_servidor>:9200"])
            print ("Getting back to tweet recording")

Pronto, agora enquanto você deixar esse script rodando ele coletará os tweets e salvará na base de dados do elasticsearch.

Vale destacar que a API do Twitter retorna um JSON, que é armazenado na íntegra no ElasticSearch.

3. Analisando os dados com o Kibana

Mas de nada adianta armazenarmos quilos e quilos de dados sem analisá-los, não é verdade?

Então para fazer a análise utilizaremos o Kibana, que nada mais é que um conjunto de páginas estáticas (html+css) com um pouco de javascript para se conectar ao ElasticSearch e montar um dashboard lindão, podendo salvar diversos dashboards (ficam salvos como jsosn no próprio elasticsearch). [ex. do kibana: demo.kibana.org/#/dashboard ]

Como o Kibana é baseado em páginas estáticas, precisamos de um servidor web rodando para servir as páginas. Sinta-se à vontade para escolher a sua opção (Apache, Nginx, etc). Descompacte o kibana na pasta pública do servido web e vamos configurá-lo para acessar o ES.

Na pasta principal do Kibana tem um arquivo “config.js“. O que precisamos fazer é alterá-lo para configurar o endereço do elasticsearch. Assim, procure a linha que contém “elasticsearch: ” e coloque ali o endereço de seu servidor. Salve o arquivo e acesse o endereço do servidor pelo navegador e boa diversão!