Python-based automated CLI testing for Linux hosts and network devices

On big networks or in data centres where you have to run a variety of command-line tests within minutes, this solution can help you.

  • The project consists of Python code, a JSON config and test-case CSV files (you can write as many as needed).
  • The code logs into machines/devices over SSH using credentials predefined in the JSON config.
  • The number of machines/devices processed in parallel is also defined in the JSON config ("maxhosts").
  • Each test-case file is a CSV file containing the command to be executed, the expected result to match against, and the match logic.

Here is the json config.

{
   "maxhosts": 5,
   "credsets": {
                 "testbot": { "username": "testbot", "password": "xxxx", "enpass": "xxxxx" },
                 "testbot2": { "username": "user1", "password": "xxxxx", "enpass": "xxxxxx" },
                 "testbot1": { "username": "testbot", "password": "xxxxxx" }
             },
   "hosts": {
               "192.168.1.1": {"type": "linux", "credset": "testbot", "testfile": "/home/manish/test1.csv" },
               "192.168.1.2": {"type": "linux", "credset": "testbot2", "testfile": "/home/manish/test1.csv" },
               "192.168.1.3": {"type": "linux", "credset": "testbot1", "port": 22, "testfile": "/home/manish/test2.csv" }
            },
   "results": {
                 "output": "/home/manish/results.csv",
                 "enable": true
              },
   "logging": {
                "logfile": "/home/manish/logs/test.logs",
                "remote": {
                             "enable": true,
                             "LoggerName": "MKALogs",
                             "sysloghosts": {
                                             "lab1": {
                                             "sysloghost": "192.168.1.100",
                                             "port": 514
                                             },
                                             "lab2": {
                                             "sysloghost": "192.168.1.101",
                                             "port": 514
                                             }
                             }
                }
              },
   "webhook": {
                  "enable": true,
                  "auth": {
                          "auth1": {
                                     "username": "user1",
                                     "password": "pass1"
                           },
                          "auth2": {
                                     "username": "user2",
                                     "password": "pass2"
                          }
                  },

                  "hooks": {
                              "hook1": {
                                          "endpoint": "http://192.168.1.102:8080",
                                          "auth": "auth1"
                              },
                              "hook2": {
                                          "endpoint": "http://192.168.1.103:8090",
                                          "auth": "auth2"
                              }
                  }
              }
}

Python code:

#!/usr/local/bin/python3
from netmiko import ConnectHandler
from multiprocessing import Pool
import os
import json
from time import strftime, localtime
import sys
import csv
import base64
import logging
import logging.handlers

# Parse the single supported argument, --config=<path>, and load that JSON
# file into the module-level ``cfgdata`` dict that the functions in this
# script read their settings from.
try:
    params = sys.argv[1]
except IndexError:
    # No argument supplied at all; fall through to the usage message.
    params = ""

# partition() splits on the first '=' only, so a config path that itself
# contains '=' is kept intact (split('=') would reject it).
param, _, value = params.partition('=')
if param != "--config" or not value:
    print("Usage: testbot.py --config=<json config>")
    sys.exit()
playconf = value

with open(playconf) as cfg:
    cfgdata = json.load(cfg)

def RemoteSyslog(LogMessage):
    """Send *LogMessage* at INFO level to every configured remote syslog host.

    The logger name and the host/port pairs are read from
    ``cfgdata['logging']['remote']``.

    Args:
        LogMessage: Text of the syslog record to emit.
    """
    remotecfg = cfgdata.get('logging').get('remote')
    sysloghosts = remotecfg.get('sysloghosts')
    Logger = logging.getLogger(remotecfg.get('LoggerName'))
    Logger.setLevel(logging.INFO)

    # logging.getLogger() returns the same object for the same name on every
    # call, so handlers must be attached only once per process. Attaching them
    # on each call would open a new socket every time and make each later
    # message go out once per accumulated handler.
    if not Logger.handlers:
        for hostcfg in sysloghosts.values():
            handler = logging.handlers.SysLogHandler(
                address=(hostcfg.get('sysloghost'), hostcfg.get('port'))
            )
            Logger.addHandler(handler)

    # Emit once, after all handlers exist, so every host receives exactly one copy.
    Logger.info(LogMessage)

def CommitLogs(LogMessage):
    """Append *LogMessage* plus a newline to the trace log file.

    The path is read from ``cfgdata['logging']['logfile']``. Logging is
    best-effort: a failure to open or write the file is reported on stdout
    and otherwise ignored.

    Args:
        LogMessage: The line to append (without trailing newline).
    """
    logfile = cfgdata.get('logging').get('logfile')
    try:
        fopen = open(logfile, "a")
    except (OSError, TypeError):
        # TypeError covers a missing (None) path in the config.
        print("failed to open file", logfile)
        return

    try:
        # The context manager closes the handle even when write() fails;
        # errors raised while flushing on close are caught here as well.
        with fopen:
            fopen.write(LogMessage+"\n")
    except (OSError, UnicodeError, TypeError):
        print("Failed to write ",LogMessage)
    return

def CommitResults(message):
    """Append *message* plus a newline to the results file.

    The path is read from ``cfgdata['results']['output']``. Writing is
    best-effort: a failure to open or write the file is reported on stdout
    and otherwise ignored.

    Args:
        message: The result record to append (without trailing newline).
    """
    resultfile = cfgdata.get('results').get('output')
    try:
        fopen = open(resultfile, "a")
    except (OSError, TypeError):
        # TypeError covers a missing (None) path in the config.
        print("failed to open file", resultfile)
        return

    try:
        # The context manager closes the handle even when write() fails;
        # errors raised while flushing on close are caught here as well.
        with fopen:
            fopen.write(message+"\n")
    except (OSError, UnicodeError, TypeError):
        print("Failed to write ",message)
    return

def base64encode(message):
    """Return the Base64 representation of *message* as an ASCII string.

    The text is encoded as UTF-8 before Base64 encoding, so output that
    contains non-ASCII characters no longer raises ``UnicodeEncodeError``.
    For pure-ASCII input the result is identical to an ASCII encoding.

    Args:
        message: The text to encode.

    Returns:
        The Base64 text.
    """
    message_bytes = message.encode("utf-8")
    base64_bytes = base64.b64encode(message_bytes)
    # Base64 output only ever contains ASCII characters.
    return base64_bytes.decode("ascii")

def base64decode(base64_string):
    """Return the text encoded in *base64_string*.

    The decoded bytes are interpreted as UTF-8, which is a superset of ASCII,
    so payloads containing non-ASCII characters no longer raise
    ``UnicodeDecodeError`` while ASCII payloads decode exactly as before.

    Args:
        base64_string: Base64 text.

    Returns:
        The decoded text.
    """
    base64_bytes = base64_string.encode("ascii")
    message_bytes = base64.b64decode(base64_bytes)
    return message_bytes.decode("utf-8")

def compare(output, expected_output, matchLogic):
    """Evaluate a command's *output* against *expected_output*.

    Supported values of *matchLogic*:

    * ``exact``      - Pass when the output equals the expected text.
    * ``exists``     - Pass when the expected text occurs in the output.
    * ``not_exists`` - Pass when the expected text does not occur in the output.
    * ``min``        - Pass when ``float(expected_output) <= float(output)``.
    * ``max``        - Pass when ``float(expected_output) >= float(output)``.
    * ``record``     - no comparison; the verdict is always ``Check``.

    Args:
        output: Text returned by the command.
        expected_output: Text or numeric threshold to compare against.
        matchLogic: One of the names listed above.

    Returns:
        The string ``"<expected>, <output>, <verdict>"``. For ``exists``,
        ``not_exists``, ``record`` and unsupported logic names the output
        field is Base64-encoded via ``base64encode``; for ``exact``, ``min``
        and ``max`` it is the raw output. An unsupported logic name yields
        the verdict ``Error`` instead of an implicit ``None``.

    Raises:
        ValueError: If *expected_output* is not numeric for ``min``/``max``.
    """
    print(expected_output, matchLogic)

    if matchLogic in ("min", "max"):
        # A non-numeric threshold is a test-case authoring error, so it is
        # allowed to raise exactly as before.
        threshold = float(expected_output)
        try:
            measured = float(output)
        except ValueError:
            # Non-numeric command output cannot satisfy a numeric bound;
            # report it as a failed check rather than raising.
            verdict = "Fail"
        else:
            if matchLogic == "min":
                satisfied = threshold <= measured
            else:
                satisfied = threshold >= measured
            verdict = "Pass" if satisfied else "Fail"
        return "{}, {}, {}".format(expected_output, output, verdict)

    if matchLogic == "exact":
        verdict = "Pass" if expected_output == output else "Fail"
        return "{}, {}, {}".format(expected_output, output, verdict)

    if matchLogic == "exists":
        verdict = "Pass" if expected_output in output else "Fail"
    elif matchLogic == "not_exists":
        verdict = "Pass" if expected_output not in output else "Fail"
    elif matchLogic == "record":
        verdict = "Check"
    else:
        # Unsupported logic name: still return a well-formed three-field record.
        verdict = "Error"
    return "{}, {}, {}".format(expected_output, base64encode(output), verdict)

def injectcmds(device, testfile):
    """Log in to one device, run every test case in *testfile* and record the outcome.

    Args:
        device: Keyword arguments passed to netmiko's ``ConnectHandler``. The
            keys ``host`` and ``username`` are also read for log messages, and
            a non-None ``secret`` triggers ``enable()`` after login.
        testfile: Path to a comma-delimited CSV file with one test case per
            row. Columns read: 0 command, 1 expected output, 2 match logic,
            3 delay factor (integer), 4 tag.

    Login, enable and per-command failures are logged and do not raise. The
    SSH session is always disconnected before returning.
    """
    host = device.get('host')

    def stamp():
        # Timestamp prefix shared by every log line.
        return strftime("%Y-%m-%d %H:%M:%S", localtime())

    def report(text):
        # Echo to stdout and append to the trace log.
        print(text)
        CommitLogs(text)

    try:
        net_connect = ConnectHandler(**device)
    except Exception:
        report(stamp()+" "+host+";SSH failed as "+device.get('username'))
        return

    try:
        try:
            report(stamp()+" "+host+" Logged in...")
            if device.get('secret') is not None:
                net_connect.enable()
                report(stamp()+" "+host+" Enabled Elevated access....")
        except Exception:
            report(stamp()+" "+host+";Cannot gain elevated access....")
            return

        print(testfile)
        # newline='' is what the csv module documentation asks for on reader files.
        with open(testfile, 'r', newline='') as testcases:
            for tc in csv.reader(testcases, delimiter=','):
                if not tc:
                    # Blank line in the test file.
                    continue
                try:
                    cmd, delayfac = tc[0], int(tc[3])
                    tag = tc[4]
                except (IndexError, ValueError):
                    # Validate the row before sending anything, so one bad
                    # row does not abort the remaining test cases.
                    report(stamp()+" "+host+" malformed test case skipped: "+",".join(tc))
                    continue

                try:
                    status = net_connect.send_command(cmd, delay_factor=delayfac)
                    LogMessage = stamp()+" "+host+" Command:"+cmd+"\nOutput:"+status
                    compresult = compare(status, tc[1].strip(), tc[2].strip())
                    resultrec = "{},{},{},{}, {}".format(host,tag,tc[0],tc[2],compresult)
                    if cfgdata.get('results').get('enable') == True:
                        CommitResults(resultrec)
                    if cfgdata.get('logging').get('remote').get('enable') == True:
                        RemoteSyslog(resultrec)
                    CommitLogs(LogMessage)
                except Exception:
                    # Best-effort: one failing command must not stop the run.
                    report(stamp()+" "+host+" "+cmd+" command failed")
    finally:
        # Release the SSH session on every exit path, including early returns.
        net_connect.disconnect()

def hostcmd(host):
    """Build the connection settings for *host* from the config and run its tests.

    Looks up the host entry and its credential set in ``cfgdata``, assembles
    the keyword dictionary for the SSH connection (port defaults to 22; a
    ``secret`` is included only when the credential set has a non-None
    ``enpass``) and hands it to ``injectcmds`` with the host's test file.

    Args:
        host: Key of the entry under ``cfgdata['hosts']``; also used as the
            address to connect to.
    """
    entry = cfgdata.get('hosts').get(host)
    ssh_port = entry.get('port')
    creds = cfgdata.get('credsets').get(entry.get('credset'))

    device = {
        'device_type': entry.get('type'),
        'host': host,
        'port': 22 if ssh_port is None else ssh_port,
        'username': creds.get("username"),
        'password': creds.get("password"),
    }

    # Only credential sets that carry an enable password get a 'secret'.
    enable_secret = creds.get("enpass")
    if enable_secret is not None:
        device['secret'] = enable_secret

    injectcmds(device, entry.get('testfile'))

if __name__ == "__main__":
    # One task per configured host, run on a pool sized by "maxhosts".
    hosts_cfg = cfgdata.get('hosts')
    hostlist = list(hosts_cfg.keys())
    maxhosts = cfgdata.get('maxhosts')
    with Pool(processes=maxhosts) as workers:
        workers.map(hostcmd, hostlist)

Sample test cases:

"1"; "ls -l /tmp/test*"; "nso"; "exact"; 2; "tag1"
"2"; "uptime"; "average"; "exists"; 1; "tag2"
"3"; "cat /etc/redhat-release"; "8"; "not_exists"; 3; "tag2"

Test Results:

"1"; "192.168.1.3"; "tag1"; "ls -l /tmp/test*"; "exact"; "nso"; "-rw-rw-r-- 1 marya marya 1301 Aug 17 15:52 /tmp/test.txt"; "Fail"
"2"; "10.91.140.108"; "tag2"; "uptime"; "exists"; "average"; " 10:55:56 up 68 days, 23:56, 3 users, load average: 0.00, 0.01, 0.05"; "Pass"
"3"; "10.91.140.108"; "tag2"; "cat /etc/redhat-release"; "not_exists"; "8"; "Red Hat Enterprise Linux Server release 7.9 (Maipo)"; "Pass"

Trace logs:

2023-08-18 10:55:52 192.168.1.2;SSH failed as testbot
2023-08-18 10:55:54 192.168.1.3 Logged in...
2023-08-18 10:55:55 192.168.1.3 Command: ls -l /tmp/test*
Output:-rw-rw-r-- 1 marya marya 1301 Aug 17 15:52 /tmp/test.txt
2023-08-18 10:55:56 192.168.1.3 Command: uptime
Output: 10:55:56 up 68 days, 23:56,  3 users,  load average: 0.00, 0.01, 0.05
2023-08-18 10:55:58 192.168.1.3 Command: cat /etc/redhat-release
Output:Red Hat Enterprise Linux Server release 7.9 (Maipo)