AWS

25 set, 2014

Pesquisa e interação com o streaming de dados com o Kinesis Connector para ElasticSearch

Publicidade

A equipe Amazon Kinesis está feliz em anunciar o Kinesis Connector para ElasticSearch! Usando o conector, os desenvolvedores podem escrever um aplicativo que carrega o streaming de dados do Kinesis em um cluster ElasticSearch em escala em tempo real e de forma confiável. O Elasticsearch é um motor de busca e análise em código aberto. Ele indexa dados estruturados e não estruturados em tempo real. O Kibana é o motor de visualização de dados do ElasticSearch; ele é usado por devops e analistas de negócios para configuração de painéis interativos. Os dados em um cluster ElasticSearch também podem ser acessados por programação usando a API RESTful ou aplicativos SDKs. Use o template CloudFormation em nosso exemplo para criar rapidamente um cluster ElasticSearch no Amazon Elastic Compute Cloud (EC2), totalmente gerenciado por Auto Scaling.

Conexão entre Kinesis, Elasticsearch e Kibana

O diagrama de blocos abaixo mostra como as peças se encaixam no tabuleiro:

kinesis_elasticsearch_wiring_1

Ao usar o novo Kinesis Connector para ElasticSearch, é possível criar um aplicativo para consumir os dados do Kinesis Stream e indexá-los em um cluster ElasticSearch. Você pode transformar, filtrar e fazer o buffer de registros antes de emiti-los para o ElasticSearch. Você pode também fazer o ajuste fino de operações de indexação específicas do ElasticSearch para adicionar campos como time to live, version number, type e id em uma base por registro. O fluxo de registros segue conforme ilustrado no diagrama abaixo:

kinesis_elasticsearch_data_flow_pipeline_1

Note que também é possível executar todo o conector pipeline de dentro do cluster ElasticSearch usando River.

Como começar

Seu código possui as seguintes atribuições:

  1. Configurações específicas da aplicação
  2. Criar e configurar umKinesisConnectorPipeline com um Transformer, um Filter, um Buffer e um Emitter.
  3. Criar um KinesisConnectorExecutor que executa o pipeline continuamente.

Todos os componentes acima vêm com uma implementação padrão, que pode ser facilmente substituída pela lógica personalizada do usuário.

Configure as propriedades do conector

exemplo vem com um arquivo .properties e um configurador. Há muitas configurações e o usuário pode deixar a maioria delas definidas para os valores padrão. Por exemplo, as seguintes definições irão configurar o conector para o carregamento em massa de dados no ElasticSearch somente após coletar ao menos 1.000 registros e usar o cluster local do ElasticSearch para testes.

bufferRecordCountLimit = 1000
elasticSearchEndpoint = localhost

Implementação de componentes do pipeline

Para conectar Transformer, Filter, Buffer e Emitter, o código deve implementar a interface IKinesisConnectorPipeline.

public class ElasticSearchPipeline implements
IKinesisConnectorPipeline<String,ElasticSearchObject>
public IEmitter<ElasticSearchObject> getEmitter
(KinesisConnectorConfiguration configuration) {
return new ElasticSearchEmitter(configuration);
}
view sourceprint?
public IBuffer<String> getBuffer(
KinesisConnectorConfiguration configuration) {
return new BasicMemoryBuffer<String>(configuration);
}
public ITransformerBase <String, ElasticSearchObject> getTransformer
(KinesisConnectorConfiguration configuration) {
return new StringToElasticSearchTransformer();
}
public IFilter<String> getFilter
(KinesisConnectorConfiguration configuration) {
return new AllPassFilter<String>();
}

O trecho a seguir implementa o método de construção abstrato, indicando o pipeline que se deseja usar:

public KinesisConnectorRecordProcessorFactory<String,ElasticSearchObject> 
 getKinesisConnectorRecordProcessorFactory() {
 return new KinesisConnectorRecordProcessorFactory<String, 
 ElasticSearchObject>(new ElasticSearchPipeline(), config);

}

Definição do executor

O trecho a seguir define um pipeline onde os registros de entrada Kinesis são strings e os registros de saída são um ElasticSearchObject:

public class ElasticSearchExecutor extends
 KinesisConnectorExecutor<String,ElasticSearchObject>

O trecho a seguir implementa o método principal, cria o Executor e começa a executá-lo:

public static void main(String[] args) {
 KinesisConnectorExecutor<String, ElasticSearchObject> executor 
 = new ElasticSearchExecutor(configFile);
 executor.run();
}

A partir daqui, certifique-se de que suas credenciais AWS sejam fornecidas corretamente. Configure as dependências do projeto usando ant setup. Para executar o aplicativo, use ant run e pronto! Todo o código está no GitHub e pode ser usado imediatamente. Por favor envie suas perguntas e sugestões no Kinesis Forum.

Kinesis Client Library e Kinesis Connector Library

Quando lançamos o Kinesis em novembro de 2013, também introduzimos a Kinesis Client Library. Você pode usar a biblioteca cliente para construir aplicações que processam dados de streaming. Ela lida com questões complexas, como balanceamento de carga de streaming de dados e coordenação de serviços distribuídos, enquanto se adapta às alterações no volume de transmissão, tudo de uma forma tolerante a falhas. Sabemos que muitos desenvolvedores querem consumir e processar fluxos de entrada usando uma variedade de outros serviços AWS e não AWS. A fim de atender a essa necessidade, lançamos o Kinesis Connector Library no final do ano passado, com suporte para Amazon DynamoDBAmazon Redshift e Amazon Simple Storage Service (S3). Em seguida, demos andamento ao Kinesis Storm Spout e ao conector Amazon EMR no início deste ano. Hoje, expandimos o Kinesis Connector Library com suporte ao ElasticSearch.

 

***

Este artigo faz parte do AWSHUB, rede de profissionais AWS gerenciado pelo iMasters.