5 minuto(s) de leitura

Asynchronous-communication.png

Esse artigo surgiu como uma ideia quando eu estava revisando conteúdos que eu já havia estudado, e com os quais já trabalhei. Como precisava estudar alguns outros tópicos relacionados ao uso de Kafka em soluções de aplicação tive a ideia de compartilhar um pouco de informação. É um artigo de nível bem iniciante. Não sou um especialista em Kafka, mas já trabalhei com soluções que utilizavam o modelo de comunicação assíncrona e sistemas de mensageria. Baseado nisso, tentarei explorar o assunto cobrindo os seguintes items:

  • Trazer algumas considerações sobre cenários de solução de comunicação síncrona e assíncrona;
  • Exemplo de uso do Kafka no contexto de um cenário de comunicação assíncrona (implementação básica de producer e consumer).

A ideia não é explorar os trade-offs sobre Kafka como tecnologia ou plataforma. Então, não haverá aprofundamento nessa direção. Sobre o tema o
AWS link relacionado com o serviço Amazon Managed Streaming para Apache Kafka tem algumas comparações quando compara Kafka e RabbitMQ. É possível ter uma ideia sobre as diferenças quando Kafka é comparado com ferramentas de solução de mensageria.

Comunicação síncrona e assíncrona

Nesses anos em que tenho trabalho com TI é possível ver como uma situação comum cenários onde desenvolvedores tem dificuldade de distinguir sobre quando usar comunicação síncrona e assíncrona em cenários de solução. Muitas vezes é possível perceber a falta da avaliação de alguns detalhes dos requisitos para projetar a solução a ser implementada, ou uma avaliação errada sobre quais poderiam ser a abordagem apropriada a ser aplicada a um problema. É de grande ajuda o entendimento das principais características de ambas as abordagens quando se está fazendo essa avaliação.

Integração Síncrona vs. Assíncrona

Em resumo, o principal ponto de destingue a necessidade de comunicação síncrona e assíncrona será a necessidade de precisar ou não receber uma resposta de uma requisição para poder continuar o processamento de alguma outra coisa. Quando a resposta é necessária para processar algo, o cenário de comunicação síncrona será geralmente a melhor abordagem. Quando num cenário de comunicação for possível ou necessário desacoplar o comportamento do produtor da informação do comportamento do consumidor de forma que a resposta não seja necessária para continuar o processamento de algo, geralmente a abordagem assíncrona será melhor.

synch_vs_asynch.png

Importante reforçar que não existe bala de prata quando se discute projeto de solução. As escolhas vêm com trade-offs a serem analisados onde a escolha tem pontos positivos e problema a serem tratados, assim como podem existir alguns requisitos que podem invalidar algum candidato à solução. Por exemplo, É possível trabalhar com APIs para um projeto de comunicação assíncrona se a corporação trabalha com uma abordagem API driven. O serviço devolve uma resposta para a requisição com um HTTP status code 202 para informar que algo vai ser processado no futuro. E algum endpoint de callback do cliente pode ser chamado posteriormente quando esse processamento terminar. Mas essa abordagem tem esses trade-offs.

Ao considerar Kafka como um componente em sua solução é possível dizer que a maioria dos casos ele vai se encaixar em um
Enterprise Integration Pattern relacionado à Messaging Patterns. Com Kafka é possível
combinar dois modelos de mensageria (filas e publish-subscribe) para prover os benefícios chave para cada consumidor. Mais detalhes podem ser encontrados nesse AWS link sobre Amazon Managed Streaming para Apache Kafka.

Exemplo de Producer e Consumer usando Kafka

Apache Kafka é uma tecnologia de streaming de dados real-time capaz de lidar com trilhões de eventos por dia. É comumente utilizada para construir real-time streaming data pipelines e aplicações event-driven. Provê uma plataforma distribuída, com high-throughput, low-latency, e tolerância a falhas, para lidar com feeds de dados real-time - conhecidos como eventos. Mais detalhes podem ser encontrados em Apache Kafka official page.

A principal ideia é mostrar um exemplo com Kafka para envio e consumo de mensagens em uma comunicação assíncrona. Para ilustrar um producer e um consumer foram criadas 2 aplicações Kotlin utilizando Kafka como exemplo em que Tweets são consumidos baseado em algum parâmetro e sendo o evento salvo em um repositório ElasticSearh que permita pesquisar o conteúdo dos Tweets. Em uma aplicação de real-time isso poderia ser usado para acompanhar em tempo real assuntos ou temas que são
trend topics por qualquer motivo em específico. Mais detalhes sobre a implementação podem ser verificados aqui.

Nesse exemplo não foi profundamente explorado o conjunto de configurações do consumer ou configurações do producer. A ideia principal era ressaltar um exemplo de implementação para produzir mensagens/eventos para um Kafka e consumi-los. A fonte de informação são os Tweets filtrados. Todos os componentes estão relacionados em uma estrutura docker-compose para facilitar o uso e manusear o exemplo.

cached image

Nas aplicações foram criadas configurações de componentes para ter todas as configurações de Kafka para consumers e producers.

   @Bean
    fun consumerFactory(): ConsumerFactory<String?, Any?> {
        val props: MutableMap<String, Any> = HashMap()
        props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = servers
        props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
        props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
        props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest"
        return DefaultKafkaConsumerFactory(props)
    }

    @Bean
    fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, Any>? {
        val factory = ConcurrentKafkaListenerContainerFactory<String, Any>()
        factory.consumerFactory = consumerFactory()
        factory.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL_IMMEDIATE
        factory.containerProperties.isSyncCommits = true;
        return factory
    }
    @Bean
    fun producerFactory(): ProducerFactory<String, Any> {
        val configProps: MutableMap<String, Any> = HashMap()
        configProps[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = servers
        configProps[ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG] = true
        configProps[ProducerConfig.ACKS_CONFIG] = "all"
        configProps[ProducerConfig.COMPRESSION_TYPE_CONFIG] = "snappy"
        configProps[ProducerConfig.LINGER_MS_CONFIG] = 5
        configProps[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
        configProps[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
        return DefaultKafkaProducerFactory(configProps)
    }

    @Bean
    fun kafkaTemplate(): KafkaTemplate<String, Any> {
        return KafkaTemplate(producerFactory())
    }

Uma vez que as mensagens de tweet são objetos JSon grandes não tentei mapeá-los para objetos específicos. Foram serializados como uma String JSon em ambos os lados da comunicação. Os detalhes sobre o consumo das mensagens de tweet estão no seguinte GitHub repository. Após lê-las, é possível publicá-las em um tópico Kafka:

@Component
class KafkaTweetsProducer(
    private val kafkaTemplate: KafkaTemplate<String, Any>
) {

    companion object {
        private val LOGGER = LoggerFactory.getLogger(KafkaTweetsProducer::class.java)
    }

    fun send(key: String, tweet: String) {
        LOGGER.info("Tweet message: {}", tweet)
        kafkaTemplate.send("tweets-message-topic", key, tweet)
    }

}

O consumer lê o tweet do mesmo modo que ele chega do tópico Kafka para salvar no repositório ElasticSearch:

@Component
class KafkaTweetsConsumer(
    private val elasticSearchClient: RestHighLevelClient,
    @Value("\${elasticsearch.index}")
    private val tweeterIndex: String
) {

    companion object {
        private val LOGGER = LoggerFactory.getLogger(KafkaTweetsConsumer::class.java)
    }

    @KafkaListener(topics = ["tweets-message-topic"], groupId = "simple-kotlin-tweets-message-consumer")
    fun consume(tweet: String) {
        LOGGER.info("got tweet: {}", tweet)
        val indexRequest = IndexRequest(tweeterIndex).source(tweet, XContentType.JSON)
        val indexResponse = elasticSearchClient.index(indexRequest, RequestOptions.DEFAULT)
        LOGGER.info("ElasticSearch id: {}", indexResponse.id)
    }
}

O arquivo README do repositório tem instruções para configurar e rodar o exemplo. O resultado da comunicação entre o producer e consumer pode ser visto em uma interface do Kibana:

cached image

Considerações finais

Kafka como ferramenta e tecnologia atende vários use-cases reais poderosos. É uma tecnologia válida de ser explorada para abordagem de microserviços, arquitetura event driven,
de comunicação assíncrona, processamento real time, streaming data, e vários outros cenários. Espero que esse artigo possa ser útil para uma leitura de nível iniciante.

Referencias

Asynchronous Communication; The What, The Why, And The How
Apache Kafka - Fundamentals, Use cases and Trade-Offs
Apache Kafka Documentation
What is Kafka?
What is Kafka?
Managing Asynchronous Workflows with a REST API