Multi-topic IoT subscriber model

In a typical IoT model, you have many IoT sensors publishing data to an IoT broker. You may want to collect the data from all these IoT nodes in a database for visualization and further analytics.

I have a couple of IoT nodes installed in two houses in two different cities, and I collect the data from all of them in a central database.

To achieve this, I am using a Raspberry Pi 3 running 24×7 at home. On this Raspberry Pi I have configured InfluxDB, Grafana and an MQTT broker.

I wrote an MQTT subscriber about a year ago to receive data for one topic. I have now improved it to accept messages from multiple “topics”.

The “topics” and other configuration parameters are set in a JSON file, config.json, as follows.


cat /opt/tools/mqttsub/config.json
{
   "mqtt": { "broker": "127.0.0.1", "mqttuser": "username", "mqttpass": "password", "mqttport": 1883, "mqttclient": "Collector Node" },
   "topics": ["weather", "dustdensity", "whatever",....],
   "dataconn": {"dbhost": "127.0.0.1", "dbname": "dbName", "dbuser": "database user", "dbpass": "database password"},
   "BaseLocation": {"city": "Gurgaon"}
}
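Since every setting comes from this one file, a missing key only shows up later as a confusing connection error. The sketch below is one possible way to fail early instead. The function name load_config and the REQUIRED table are my own illustration, not part of the collector script; the key names are taken from the sample config above.

```python
import json

# Keys the collector reads from config.json, as shown in the sample file.
REQUIRED = {
    "mqtt": ["broker", "mqttport", "mqttuser", "mqttpass"],
    "dataconn": ["dbhost", "dbname", "dbuser", "dbpass"],
}


def load_config(path):
    """Load config.json and raise ValueError if an expected key is missing."""
    with open(path) as cfg:
        cfgdata = json.load(cfg)

    missing = []
    for section, keys in REQUIRED.items():
        for key in keys:
            if key not in cfgdata.get(section, {}):
                missing.append(section + "." + key)

    # "topics" must be a non-empty list, otherwise nothing would be collected.
    topics = cfgdata.get("topics")
    if not isinstance(topics, list) or not topics:
        missing.append("topics")

    if missing:
        raise ValueError("config is missing: " + ", ".join(missing))
    return cfgdata
```

Calling load_config('/opt/tools/mqttsub/config.json') would then return the same dictionary that json.load gives, or stop with a message naming the missing entries.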

Here is the code that reads the above config.json and then keeps listening on the listed “topics”.

This code accepts payloads from IoT publishers in the format {"tags": {"tag1": tag1value, "tag2": tag2value, ...}, "fields": {"field1": field1value, "field2": field2value, ...}}
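To make the expected format concrete, here is a minimal sketch of what a publisher node could send. The helper names make_payload and publish_reading are hypothetical and not taken from my sensor code; publish.single is paho-mqtt's one-shot publish helper, and the host, port and credentials are placeholders matching the sample config.

```python
import json


def make_payload(tags, fields):
    """Return the JSON string a sensor node would publish."""
    return json.dumps({"tags": tags, "fields": fields})


def publish_reading(topic, tags, fields, host="127.0.0.1", port=1883, auth=None):
    """Publish one reading. Needs paho-mqtt installed and a reachable broker."""
    # Imported here so that make_payload can be used without paho-mqtt.
    import paho.mqtt.publish as publish

    publish.single(topic, payload=make_payload(tags, fields),
                   hostname=host, port=port, auth=auth)
```

A weather node would call something like publish_reading("weather", {"location": "Gurgaon"}, {"humidity": 61.3, "temperature": 23.5}, auth={"username": "username", "password": "password"}).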

The MQTT subscriber adds the publisher's “topic” to the JSON payload as the InfluxDB measurement. The final database payload looks like:

{'fields': {'humidity': 61.3, 'temperature': 23.5}, 'tags': {'location': 'Gurgaon'}, 'measurement': 'weather'}

The MQTT subscriber subscribes only to the topics listed in config.json, and it checks the topic again before writing to the database. No other “topic” will be processed.
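The transformation described above can be tried without a broker or a database. The following is a small stand-alone sketch of that step. The function name build_point and the topic "doorbell" are made up for the illustration; the logic mirrors what the collector does in its message handler.

```python
import json


def build_point(topic, raw_payload, allowed_topics):
    """Turn one MQTT message into an InfluxDB point, or None if it is skipped."""
    if topic not in allowed_topics:
        return None  # topic is not listed in config.json
    point = json.loads(raw_payload.decode("utf-8"))
    point["measurement"] = topic  # the topic name becomes the measurement
    return point


topics = ["weather", "dustdensity"]
raw = b'{"tags": {"location": "Gurgaon"}, "fields": {"humidity": 61.3, "temperature": 23.5}}'

print(build_point("weather", raw, topics))   # point with 'measurement': 'weather'
print(build_point("doorbell", raw, topics))  # unlisted topic, so None
```

The returned dictionary is what gets wrapped in a list and handed to write_points.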


cat /opt/tools/mqttsub/mqttcol.py

#!/usr/bin/python3
import paho.mqtt.client as paho
import datetime
import time
import logging
import json
import subprocess
from influxdb import InfluxDBClient

with open('/opt/tools/mqttsub/config.json') as cfg:
  cfgdata = json.load(cfg)

broker = cfgdata.get('mqtt').get('broker')
port = cfgdata.get('mqtt').get('mqttport')
username = cfgdata.get('mqtt').get('mqttuser')
password = cfgdata.get('mqtt').get('mqttpass')
topics = cfgdata.get('topics')
dbhost = cfgdata.get('dataconn').get('dbhost')
dbname = cfgdata.get('dataconn').get('dbname')
dbuser = cfgdata.get('dataconn').get('dbuser')
dbpass = cfgdata.get('dataconn').get('dbpass')

# passwords are left out so they do not end up in the system journal
print(broker, port, username, topics, dbhost, dbname, dbuser)
dbclient = InfluxDBClient(dbhost, 8086, dbuser, dbpass, dbname)

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
handler = logging.FileHandler('/var/log/mqttcol.log')  # create a file handler
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') # create a logging format
handler.setFormatter(formatter)
logger.addHandler(handler)# add the handlers to the logger

def on_subscribe(client, userdata, mid, granted_qos):
    #print("Subscribed: "+str(mid)+" "+str(granted_qos))
    print("Waiting for message...")

def on_message(client, userdata, mqttmsg):
    print(mqttmsg.topic+" "+str(mqttmsg.qos)+" "+mqttmsg.payload.decode("utf-8"))

    if mqttmsg.topic in topics:
        try:
            # the topic name doubles as the InfluxDB measurement
            jsondata = json.loads(mqttmsg.payload.decode("utf-8"))
            jsondata['measurement'] = mqttmsg.topic
            #jsondata['time']=str(datetime.datetime.now())
            payload = [jsondata]
            print(payload)
            dbclient.write_points(payload)
            logger.info(payload)
        except Exception:
            # a malformed payload or a database error must not stop the collector
            logger.exception("could not store message from topic %s", mqttmsg.topic)

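def on_connect(client, userdata, flags, rc):
    # paho-mqtt 1.x callback signature.
    # Subscribing here restores the subscriptions after a reconnect.
    for topic in topics:
        client.subscribe(topic)

client = paho.Client()
client.username_pw_set(username, password)
client.on_connect = on_connect
client.on_subscribe = on_subscribe
client.on_message = on_message
client.connect(broker, port)

client.loop_forever()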
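
As a side note, the per-topic subscribe loop in the script could also be written as a single call, because paho's subscribe() accepts a list of (topic, qos) tuples. The helper name subscription_list is my own, and QoS 0 is assumed because that is what the log output of this collector shows.

```python
def subscription_list(topics, qos=0):
    """Build the [(topic, qos), ...] list that paho's subscribe() accepts."""
    return [(topic, qos) for topic in topics]


# With the client and topics from the script above:
# client.subscribe(subscription_list(topics))
```

This sends one SUBSCRIBE request for all topics instead of one per topic, which only starts to matter when the topic list gets long.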

This program also writes every stored message to the log file /var/log/mqttcol.log.

Next, let us create a systemd service so that systemctl can handle start | stop | status operations on the MQTT collector.

Create a file “/etc/systemd/system/mkaiot.service” like this:


[Unit]
Description=MKA IoT

[Service]
Type=simple
ExecStart=/opt/tools/mqttsub/mqttcol.py

[Install]
WantedBy=multi-user.target

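Set permissions. The script itself must be executable, because ExecStart runs it directly.

chmod 644 /etc/systemd/system/mkaiot.service
chmod 755 /opt/tools/mqttsub/mqttcol.py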

Enable the service to start at boot

systemctl enable mkaiot

Start the service

systemctl start mkaiot

Let us check the status


systemctl status mkaiot
mkaiot.service - MKA IoT
   Loaded: loaded (/etc/systemd/system/mkaiot.service; enabled; vendor preset: enabled)
   Active: active (running) since Sat 2020-03-14 14:40:55 IST; 4h 38min ago
 Main PID: 373 (mqttcol.py)
    Tasks: 1 (limit: 4915)
   CGroup: /system.slice/mkaiot.service
           └─373 /usr/bin/python3 /opt/tools/mqttsub/mqttcol.py

Mar 14 18:14:07 homebox mqttcol.py[373]: weather 0 {"tags": {"location": "Gurgaon"}, "fields": {"humidity": 61.4, "temperature": 23.5}}
Mar 14 18:14:07 homebox mqttcol.py[373]: [{'fields': {'humidity': 61.4, 'temperature': 23.5}, 'tags': {'location': 'Gurgaon'}, 'measurement': 'weather'}]
Mar 14 18:14:07 homebox mqttcol.py[373]: dustdensity 0 {"tags": {"location": "Gurgaon"}, "fields": {"dust": 0.408558}}
Mar 14 18:14:07 homebox mqttcol.py[373]: [{'fields': {'dust': 0.408558}, 'tags': {'location': 'Gurgaon'}, 'measurement': 'dustdensity'}]

I hope you find this article useful.