Receive and process logs using Fluentd and Python

In typical use cases, we simply receive logs from multiple hosts or devices, or forward logs from one syslog collector to another to store them in a filesystem or a database such as Elasticsearch.

Here I will discuss a scenario where the requirement is to filter logs based on the content of the log payload. You can extend this further to transform the original logs and much more.

The idea is to hand the log stream, line by line, to a Python script for processing.

Here is the Fluentd td-agent.conf configuration to receive the stream on port 5140 and then pipe the log stream to a Python script, test.py:

# Listen for syslog messages over UDP on port 5140 and tag events as mka.logs
<source>
  @type syslog
  port 5140
  bind 0.0.0.0
  protocol_type udp
  tag mka.logs
  <parse>
    message_format auto
  </parse>
</source>

# Hand matching events to an external script; the exec output buffers
# events to file and passes the buffer chunk's path as the last
# command-line argument on each flush (every 20 seconds here)
<match mka.**>
  @type exec
  command /opt/poc/test.py
  <buffer>
    @type file
    path /opt/poc/test.buff
    flush_interval 20s
  </buffer>
  <format>
    # single_value emits only the raw message field, one line per event
    @type single_value
  </format>
</match>
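
Once td-agent is running with this configuration, you can verify the pipeline end to end by sending a test message with Python's standard logging module. This is a minimal sketch, assuming the Fluentd agent is listening on the same host:

import logging
from logging.handlers import SysLogHandler

# Send a test syslog message over UDP to the Fluentd source above;
# 127.0.0.1 assumes the agent runs on the same host.
logger = logging.getLogger("mka-test")
logger.setLevel(logging.INFO)
logger.addHandler(SysLogHandler(address=("127.0.0.1", 5140)))
logger.info("hello from the test client")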

Here is the sample Python code. It reads the log stream line by line and writes each log to a file; you can easily extend it to apply pattern matching and other processing on the log stream. Note that the script must be executable (chmod +x /opt/poc/test.py), since Fluentd invokes it directly.

#!/usr/bin/env python3
import sys

def CommitLogs(LogMessage):
    # Append one log line to the output file; lines read from the
    # buffer chunk already end with a newline.
    try:
        with open("/tmp/messages.log", "a") as fout:
            fout.write(LogMessage)
    except OSError as err:
        print("Failed to write log:", err, file=sys.stderr)

# Fluentd's exec output passes the path of the flushed buffer chunk
# as the last command-line argument.
with open(sys.argv[-1]) as inputF:
    for line in inputF:
        CommitLogs(line)
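
Coming back to the filtering scenario from the beginning, the read loop can be extended with a regular expression so that only matching lines are committed. The pattern below is purely illustrative; swap in whatever matches your payload:

import re
import sys

# Illustrative filter: keep only lines whose payload matches the pattern.
PATTERN = re.compile(r"error|fail", re.IGNORECASE)

with open(sys.argv[-1]) as inputF:
    for line in inputF:
        if PATTERN.search(line):
            CommitLogs(line)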