mosquitto_ex: mosq_feed_ex.py

File mosq_feed_ex.py, 6.8 KB (added by krit, 3 years ago)
Line 
1#!/usr/bin/env python3
2
3# -*- coding: utf-8 -*-
4
5import paho.mqtt.client as paho
6from subprocess import Popen, PIPE
7import os
8import json
9import logging
10import queue
11import threading
12
13############################################
14#    Initial global variable               #
15############################################
16
17q = queue.Queue()
18num_threads = 3
19ping_warn = 800
20ping_crit = 900
21port_warn = 1
22port_crit = 2
23
24############################################
25#    Initial logging to file               #
26############################################
27# Gets or creates a logger
28logger = logging.getLogger("mosq_feed_actDas_7_queue.py")
29# set log level
30logger.setLevel(logging.DEBUG)
31# define file handler and set formatter
32file_handler = logging.FileHandler('/tmp/mosq_feed_actDas_7_queue.log')
33formatter = logging.Formatter(
34    '%(asctime)s : %(levelname)s : %(name)s : %(message)s')
35file_handler.setFormatter(formatter)
36# add file handler to logger
37logger.addHandler(file_handler)
38logger.info("log start")
39
40############################################
41#    function utility                      #
42############################################
43
44def is_json(myjson):
45    print("is_json()")
46    try:
47        json_object = json.loads(myjson)
48    #except ValueError as e:
49    except:
50        print("is_json() false")
51        return False
52    #print("is_json() True")
53    return True
54
55def do_stuff(q, thread_no):
56    while True:
57        task = q.get()
58        #print( "*********** g.get()=", q.get() )
59        process_id(task)
60        q.task_done()
61        #print(f'Thread #{thread_no} is doing task #{task} in the queue.')
62        print(f'Thread #{thread_no} is doing task in the queue.')
63
64def on_subscribe(client, userdata, mid, granted_qos):
65    print("Subscribed: "+str(mid)+" "+str(granted_qos))
66
67def check_cmd2( id, service, value ):
68    print("check_cmd2()")
69    print("id = {}".format(id))
70    #logger.info("check_cmd2()")
71    myCmd = ""
72
73    #print("delay time=%d" % (delayT))
74    if ( service == "pass_ping" ):
75         #print ("pass_ping=%s" % (value) )
76         code_status = check_ping( value )
77         myCmd = "./send_gearman_temperature.sh %s pass_ping '%s msec. | pass_ping=%s;%s;%s;0;100' %d" % (id, value, value, ping_warn, ping_crit, code_status)
78         #output = os.popen(myCmd).read()
79
80    elif( service == "rssi" ):
81         #print ("Temp=%s" % (value) )
82         code_status = check_dist( value )
83         #myCmd = "./send_gearman_temperature.sh %s Temp '%s C. | Temp=%s;%s;%s;0;100' %d" % (id, value, value, delay_warn, delay_crit, code_status)
84         myCmd = "curl -X POST -H \"Content-Type: application/json\" \"http://172.19.0.4:8080\" -d '{\"deviceID\":\"%s\", \"%s\":%d}' " %  (device, service, value)
85         print(myCmd)
86         #output = os.popen(myCmd).read()
87    elif( id.find("BU") == 0 or id.find("RU") == 0):
88         myCmd = ""
89         myCmd = "./send_gearman_temperature.sh \"%s\" \"%s\"  \"%s | status=%s;%s;%s;0;2\" %d" % (id, service, value, value, port_warn, port_crit, 0)
90         print(myCmd)
91         #output = os.popen(myCmd).read()
92    else:
93         print("service not match")
94    logger.info(myCmd)
95
96
97def verify_msg(raw_msg):
98    try:
99        id = raw_msg.topic.split("/")
100        print(id[-1])
101    except:
102        print("cannot split str")
103    print("------------")
104    text = raw_msg.payload.decode("utf-8")
105    #print(text)
106    logger.info(text)
107    #logger.info(text)
108    if (is_json(text) == False):
109        print("not json")
110        logger.info("Not JASON")
111        return -1
112    else:
113        print("msg is json")
114        logger.info("MSG is JSON")
115        # dumps the json object into an element
116        #json_str = json.dumps(text)
117        # print(json_str)
118        # load the json to a string
119        good_msg = json.loads(text)
120        return good_msg
121
122
123
124
125def process_id(resp):
126    print("process_id() ")
127    resp = verify_msg(resp)
128    if (resp == -1):
129        return -1
130    try:
131        key = list(resp)
132    except:
133        print("list(resp) error resp=", resp)
134        return -1
135    print( "key = {}".format(key) )
136    if ( list(resp)[0] == "module" ):
137         serviceName = list(resp)[3]
138         print( "serviceName = {}".format(serviceName) )
139    else:
140         print("no module service Name in msg resp")
141         return -1
142    print ("serial_number:%s siteID:%s serviceName:%s value:%s " % ( resp['serial_number'], resp["site_id"], serviceName, resp[serviceName]) )
143    servName = resp[serviceName]
144    #print ("\n ***  serviceName = {}".format(servName) )
145    #print ("\n ***** type servName ", type(servName ) )
146    key = list(servName)
147    #print( "key = {}".format(key) )
148    #print( "key[0] = {}\n key[1] = {}\n".format( key[0], key[1] ) )
149    #for msg_json in resp:
150    #    print( "user = {}".format(msg_json ))
151    #print("module = {}".format( resp["module"] ))
152    #print("serial_number = {}".format( resp["serial_number"] ))
153    serial_number = resp["serial_number"]
154    #print("type = {} serial_number = {}".format( type(serial_number),  serial_number) )
155    #print("site_id = {}".format( resp["site_id"] ))
156    #print("digital_service = {}".format( resp["digital_service"] ))
157    #print("analog_service = {}".format( resp["analog_service"] ))
158
159    digital_service = resp["digital_service"]
160    analog_service = resp["analog_service"]
161    #print("digital_service[0] = {} \n digital_service[1] = {} ".format(digital_service[0], digital_service[1]) )
162    for msg in digital_service:
163        #print(msg)
164        #print(type(msg))
165        print( msg["service"], msg["value"], msg["filter"] )
166        check_cmd2( resp["serial_number"], msg["service"], msg["value"]  )
167        #is_json(msg2)
168
169    for msg in analog_service:
170        #print(msg)
171        print( msg["service"], msg["value"], msg["filter"] )
172        check_cmd2( resp["serial_number"], msg["service"], msg["value"]  )
173
174
175
176def on_message(client, userdata, msg):
177    global q
178    q.put(msg) #put messages on queue
179    print("put msg in Queue")
180    #process_id(resp)
181    print("------------")
182    q.join()
183
184############################################
185#    Create thread worker waiting to       #
186#    read the msg in queue                 #
187############################################
188for i in range(num_threads):
189  print("worker start ....")
190  worker = threading.Thread(target=do_stuff, args=(q, i,), daemon=True)
191  worker.start()
192
193
194################################################
195#             MQTT Broker config               #
196################################################
197user = 'xx:xxxxx'
198password = 'yyyyyyy'
199keepAlive = 3
200client = paho.Client(client_id="id20210903", clean_session=False)
201client.username_pw_set(user, password=password)  # set username and password
202client.on_subscribe = on_subscribe
203client.on_message = on_message
204client.connect("mq1.mycompany.com", 1883, keepAlive, bind_address = "" )
205client.subscribe("/mycompany/ActDAS/#", qos=1)
206client.loop_forever()