.NET

7 jan, 2019

.NET Core 2.1 + ASP.NET Core 2.1 + RabbitMQ: exemplos utilizando mensageria

100 visualizações
Publicidade

Neste novo artigo trago exemplos de implementações envolvendo mensageria e fazendo uso, para isto, do .NET Core 2.1, do ASP.NET Core 2.1 e do RabbitMQ. Já abordei, inclusive, este tipo de prática em outro artigo:

Tivemos ainda no primeiro semestre de 2018 um evento focado em RabbitMQ no Canal .NET, no qual realizei uma demonstração juntamente com o MVP Luiz Carlos Faria:

Configurações necessárias para uso do RabbitMQ com .NET Core e ASP.NET Core

Para os testes detalhados neste artigo fiz uso do RabbitMQ a partir de um container Docker, o qual foi criado a partir da seguinte instrução:

docker run -d --hostname rabbit-local --name testes-rabbitmq -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=testes -e RABBITMQ_DEFAULT_PASS=Testes2018! rabbitmq:3-management

O package RabbitMQ.Client também deverá ser adicionado ao projeto (seja ele o publisher/gerador de uma mensagem ou, mesmo, o subscriber/consumidor de uma fila):

Os fontes da Solution contendo os projetos mencionados nesta seção foram disponibilizados no GitHub:

Gerando uma fila e enviando mensagens

Na próxima listagem temos um exemplo de uso do RabbitMQ em uma aplicação que funcionará como publisher (uma API REST criada com o ASP.NET Core 2.1 neste caso específico):

  • A conexão a um message broker baseado no RabbitMQ acontecerá por meio de instâncias dos tipos ConnectionFactory (variável factory), IConnection (variável connection) e IModel (variável channel);.Todas essas estruturas fazem parte do namespace RabbitMQ.Client
  • A chamada ao método QueueDeclare indicará a fila/queue empregada na troca de mensagens
  • Já a invocação da operação BasicPublish permitirá o envio de uma mensagem a uma fila
using System;
using System.Text;
using Microsoft.AspNetCore.Mvc;
using RabbitMQ.Client;
using APIMensagens.Models;

namespace APIMensagens.Controllers
{
    [Route("api/[controller]")]
    [ApiController]
    public class MensagensController : ControllerBase
    {
        private static Contador _CONTADOR = new Contador();

        [HttpGet]
        public object Get()
        {
            return new
            {
                QtdMensagensEnviadas = _CONTADOR.ValorAtual
            };
        }

        [HttpPost]
        public object Post(
            [FromServices]RabbitMQConfigurations configurations,
            Conteudo conteudo)
        {
            lock (_CONTADOR)
            {
                _CONTADOR.Incrementar();

                var factory = new ConnectionFactory()
                {
                    HostName = configurations.HostName,
                    Port = configurations.Port,
                    UserName = configurations.UserName,
                    Password = configurations.Password
                };

                using (var connection = factory.CreateConnection())
                using (var channel = connection.CreateModel())
                {
                    channel.QueueDeclare(queue: "TestesASPNETCore",
                                         durable: false,
                                         exclusive: false,
                                         autoDelete: false,
                                         arguments: null);

                    string message =
                        $"{DateTime.Now.ToString("dd/MM/yyyy HH:mm:ss")} - " +
                        $"Conteúdo da Mensagem: {conteudo.Mensagem}";
                    var body = Encoding.UTF8.GetBytes(message);

                    channel.BasicPublish(exchange: "",
                                         routingKey: "TestesASPNETCore",
                                         basicProperties: null,
                                         body: body);
                }

                return new
                {
                    Resultado = "Mensagem encaminhada com sucesso"
                };
            }
        }
    }
}

Processando mensagens

Já na listagem a seguir, temos um exemplo de consumo de mensagens vinculadas a uma fila em Console Application, baseada no .NET Core 2.1:

  • O subscriber/consumidor de uma fila também fará uso de instâncias dos tipos ConnectionFactory (variável factory), IConnection (variável connection) e IModel (variável channel), além de chamadas aos métodos QueueDeclare e BasicPublish;
  • Uma instância do tipo EventingBasicConsumer (namespace RabbitMQ.Client.Events) será criada, com a implementação do evento Received acontecendo por meio do método Consumer_Received;
  • Quanto a Consumer_Received, nesta operação ficarão as instruções para processamento de mensagens ainda não processadas na fila TestesASPNETCore.
using System;
using System.IO;
using System.Text;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace ProcessadorMensagens
{
    class Program
    {
        private static IConfiguration _configuration;

        static void Main(string[] args)
        {
            var builder = new ConfigurationBuilder()
                .SetBasePath(Directory.GetCurrentDirectory())
                .AddJsonFile($"appsettings.json");
            _configuration = builder.Build();

            // Para este exemplo foi criado um container Docker baseado
            // em uma imagem do RabbitMQ. Segue o comando para geração
            // desta estrutura:
            // docker run -d --hostname rabbit-local --name testes-rabbitmq -p 6672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=testes -e RABBITMQ_DEFAULT_PASS=Testes2018! rabbitmq:3-management-alpine
            var rabbitMQConfigurations = new RabbitMQConfigurations();
            new ConfigureFromConfigurationOptions<RabbitMQConfigurations>(
                _configuration.GetSection("RabbitMQConfigurations"))
                    .Configure(rabbitMQConfigurations);

            var factory = new ConnectionFactory()
            {
                HostName = rabbitMQConfigurations.HostName,
                Port = rabbitMQConfigurations.Port,
                UserName = rabbitMQConfigurations.UserName,
                Password = rabbitMQConfigurations.Password
            };

            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare(queue: "TestesASPNETCore",
                                     durable: false,
                                     exclusive: false,
                                     autoDelete: false,
                                     arguments: null);

                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += Consumer_Received;
                channel.BasicConsume(queue: "TestesASPNETCore",
                     autoAck: true,
                     consumer: consumer);

                Console.WriteLine("Aguardando mensagens para processamento");
                Console.WriteLine("Pressione uma tecla para encerrar...");
                Console.ReadKey();
            }
        }


        private static void Consumer_Received(
            object sender, BasicDeliverEventArgs e)
        {
            var message = Encoding.UTF8.GetString(e.Body);
            Console.WriteLine(Environment.NewLine +
                "[Nova mensagem recebida] " + message);
        }
    }
}

Testes

Requisições contendo em seu corpo dados com a estrutura do documento JSON reproduzido a seguir serão enviadas à API REST de testes, a fim de possibilitar a geração de mensagens associadas à fila TestesASPNETCore:

{
	"Mensagem": "Primeiro teste"
}

Os testes acontecerão então através do utilitário Postman:

Ao acessar a URL http://localhost:15672/ aparecerá o site de gerenciamento do RabbitMQ, com funcionalidades que permitem constatar que a fila TesteASPNETCore foi corretamente criada e que possui mensagens ainda não processadas:

Executando a Console Application que processará as mensagens da fila TesteASPNETCore teremos um resultado similar ao da próxima imagem (o evento Received será disparado de forma automática sempre que houverem mensagens pendentes na fila):

Referências