Ingest Kafka messages into the EFK stack

In this article we will see how to take a deep dive into the messages flowing across a Kafka bus. For example, in a large enterprise-grade solution you may want to capture a copy of the messages going to and fro between systems, without any additional development.

To achieve this, we will use Fluentd to hook onto Kafka, read the messages, and insert them into an Elasticsearch database. You can then visualize these messages in Kibana, and build dashboards of bar charts, donuts, and pie charts out of the collected data.

First, install the Fluentd Kafka plugin:

sudo fluent-gem install fluent-plugin-kafka
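
If the installation succeeded, the gem shows up in the plugin list (this assumes fluent-gem is on your PATH; if you run td-agent, use td-agent-gem instead):

fluent-gem list fluent-plugin-kafka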

Fluentd Kafka source configuration:

<source>
  @type kafka
  brokers kafkahost:9092
  topics MyTopic
  format json
  message_key message
  add_prefix test.kafka
  #add_suffix <tag suffix (Optional)>

  #Optionally, you can manage topic offset by using zookeeper
  #offset_zookeeper    <zookeeper node list (<zookeeper1_host>:<zookeeper1_port>,<zookeeper2_host>:<zookeeper2_port>,..)>
  #offset_zk_root_node <offset path in zookeeper> default => '/fluent-plugin-kafka'

  #ruby-kafka consumer options
  #max_bytes     (integer) :default => nil (Use default of ruby-kafka)
  #max_wait_time (integer) :default => nil (Use default of ruby-kafka)
  #min_bytes     (integer) :default => nil (Use default of ruby-kafka)
</source>
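
To sanity-check the source, you can publish a test message on the topic with the console producer that ships with Kafka. Since the source uses format json, the payload must be valid JSON; the flag is --broker-list on older Kafka versions and --bootstrap-server on recent ones, and kafkahost:9092 matches the brokers setting above:

echo '{"message":"hello from kafka"}' | kafka-console-producer.sh --broker-list kafkahost:9092 --topic MyTopic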

Fluentd match configuration to ingest the messages into Elasticsearch:

<match test.**>
  @type copy
  <store>
    @type elasticsearch
    include_tag_key true
    host localhost
    port 9200
    logstash_format true
    logstash_prefix kafkamessages
  </store>
</match>
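
Because logstash_format is enabled, the Elasticsearch plugin writes to daily indices named after the logstash_prefix (kafkamessages-YYYY.MM.DD). A quick way to confirm documents are arriving is to query Elasticsearch directly:

curl -s 'http://localhost:9200/kafkamessages-*/_search?pretty&size=1'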

With this Fluentd configuration, Fluentd will read the messages exchanged on “MyTopic” and ingest them into Elasticsearch under the “test.kafka.MyTopic” tag (the add_prefix value prepended to the topic name), which the test.** match pattern above picks up.
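
With include_tag_key true, the event's tag is stored alongside the parsed payload, and logstash_format adds an @timestamp field, so a stored document looks roughly like this (an illustrative sketch; the index date and payload fields depend on your data):

{
  "_index": "kafkamessages-2018.01.15",
  "_source": {
    "message": "hello from kafka",
    "tag": "test.kafka.MyTopic",
    "@timestamp": "2018-01-15T10:00:00+00:00"
  }
}

In Kibana, create an index pattern for kafkamessages-* to start exploring and charting these documents.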