-
Notifications
You must be signed in to change notification settings - Fork 0
/
mqtt-switch2.py
249 lines (202 loc) · 8.36 KB
/
mqtt-switch2.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
#!/usr/bin/python3
# -*- coding: utf-8 -*-
import logging
import os
import signal
from datetime import datetime, timedelta
import time
from gpiozero import LED
import paho.mqtt.client as mqtt
from threading import Thread, Timer, Event
from flask import Flask, jsonify
from flask import render_template
from threading import Thread
def gpio_factory(factory):
    """Build the gpiozero pin factory selected by name.

    The backend module is imported only inside the matching branch, so only
    the chosen backend needs to be importable on the host.

    Args:
        factory: One of ``'rpigpio'``, ``'pigpio'``, ``'lgpio'`` or
            ``'native'``.

    Returns:
        A new instance of the matching gpiozero pin factory class.

    Raises:
        ValueError: If ``factory`` is not one of the supported names.
    """
    if factory == 'rpigpio':
        from gpiozero.pins.rpigpio import RPiGPIOFactory
        return RPiGPIOFactory()

    if factory == 'pigpio':
        from gpiozero.pins.pigpio import PiGPIOFactory
        return PiGPIOFactory()

    if factory == 'lgpio':
        from gpiozero.pins.lgpio import LGPIOFactory
        return LGPIOFactory()

    if factory == 'native':
        from gpiozero.pins.native import NativeFactory
        return NativeFactory()

    # None of the known backend names matched.
    raise ValueError(f"Unknown GPIO factory: {factory}")
# Flask application that serves the health, control, switch and mode routes.
app = Flask(__name__)
# Configuration constants; each one can be overridden by the environment
# variable of the same name, otherwise the literal default is used.
MQTT_HOST = os.getenv('MQTT_HOST', "172.16.100.80")
MQTT_PORT = int(os.getenv('MQTT_PORT', 1883))
MQTT_KEEPALIVE_INTERVAL = int(os.getenv('MQTT_KEEPALIVE_INTERVAL', 45))
# Topic the switch state ('ON'/'OFF') is published to (retained).
MQTT_TOPIC = os.getenv('MQTT_TOPIC', "home/kotel")
# Topic subscribed to for incoming 'ON'/'OFF' commands.
MQTT_TOPIC_SUB = os.getenv('MQTT_TOPIC_SUB', "home/kotel/set")
# Topic the 'online'/'offline' availability messages are published to.
MQTT_TOPIC_AVAILABILITY = os.getenv('MQTT_TOPIC_AVAILABILITY', "home/kotel/availability")
# Idle time, in minutes, after which the periodic shutdown check switches the
# output off (it is compared against minutes since the last MQTT message).
AUTOMATIC_SHUTDOWN_DELAY = int(os.getenv('AUTOMATIC_SHUTDOWN_DELAY', 15))
# Pin identifier handed to gpiozero's LED for the switched output.
GPIO_ID = int(os.getenv('GPIO_ID', 21))
GPIOZERO_PIN_FACTORY = os.getenv('GPIOZERO_PIN_FACTORY', 'rpigpio') # rpigpio, pigpio, lgpio, native
# Setup logging
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
                    datefmt='%Y-%m-%d %H:%M:%S')
logger = logging.getLogger(__name__)
# GPIO setup: the output is driven through gpiozero's LED class using the
# pin factory selected above.
switch = LED(GPIO_ID, pin_factory=gpio_factory(GPIOZERO_PIN_FACTORY))
# datetime of the most recently received MQTT message; None until one arrives.
last_call = None
# While True, incoming MQTT commands and the automatic shutdown are skipped.
manual_control = False
class ScheduledTask:
    """Run a callable repeatedly using one-shot ``threading.Timer`` objects.

    After every run a fresh timer is armed, so the delay between two runs is
    ``interval`` plus however long the callable itself took.

    Args:
        interval: ``datetime.timedelta`` to wait before each run.
        function: Zero-argument callable invoked on every tick.
    """

    def __init__(self, interval, function):
        self.interval = interval
        self.function = function
        # The most recently armed Timer; None until start() is called.
        self.thread = None
        # Set by stop() so that no further timer is armed.
        self.stop_event = Event()

    def start(self):
        """Clear any earlier stop request and arm the first timer."""
        self.stop_event.clear()
        self.schedule()

    def schedule(self):
        """Arm a timer for the next run unless a stop has been requested."""
        if not self.stop_event.is_set():
            self.thread = Timer(self.interval.total_seconds(), self.run)
            self.thread.start()

    def run(self):
        """Invoke the callable once, then arm the timer for the next run.

        An exception raised by the callable is logged and swallowed. Without
        this guard it would escape before ``schedule()`` is reached and the
        periodic task would silently stop for the rest of the process
        lifetime.
        """
        try:
            self.function()
        except Exception:
            task_name = getattr(self.function, '__name__', repr(self.function))
            logging.getLogger(__name__).exception(
                "Scheduled task %s failed; it will run again at the next interval.",
                task_name,
            )
        self.schedule()  # Reschedule the next run

    def stop(self):
        """Request a stop, cancel the pending timer and wait for it to end."""
        self.stop_event.set()
        if self.thread is not None:
            # cancel() only affects a timer that has not fired yet; join()
            # then waits for a run that is already in progress to finish.
            self.thread.cancel()
            self.thread.join()
class MQTTController:
    """Connect the GPIO switch to an MQTT broker.

    Subscribes to ``MQTT_TOPIC_SUB`` for ``'ON'``/``'OFF'`` commands, drives
    the module-level ``switch`` accordingly and publishes the resulting state
    to ``MQTT_TOPIC`` and availability to ``MQTT_TOPIC_AVAILABILITY``.
    """

    def __init__(self):
        self.mqttc = mqtt.Client(callback_api_version=mqtt.CallbackAPIVersion.VERSION2)
        self.mqttc.enable_logger(logger)
        # Register a Last Will so the broker itself announces 'offline' when
        # the connection is lost; a publish made after the link is already
        # gone cannot deliver that message. Must be set before connecting.
        self.mqttc.will_set(MQTT_TOPIC_AVAILABILITY, 'offline', qos=0, retain=False)
        self.setup_callbacks()
        # Seconds to wait before retrying a failed initial connection.
        self.reconnect_delay = 30

    def setup_callbacks(self):
        """Attach this object's handlers to the paho client."""
        self.mqttc.on_message = self.on_message
        self.mqttc.on_connect = self.on_connect
        self.mqttc.on_subscribe = self.on_subscribe
        self.mqttc.on_disconnect = self.disconnect_callback

    def connect(self):
        """Connect and start the network loop; schedule a retry on failure."""
        try:
            self.mqttc.connect(MQTT_HOST, MQTT_PORT, MQTT_KEEPALIVE_INTERVAL)
            self.mqttc.loop_start()
        except Exception as e:
            logger.error(f"Could not connect to MQTT server: {e}. Retrying in {self.reconnect_delay} seconds...")
            self.schedule_reconnect()

    def schedule_reconnect(self):
        """Retry ``connect()`` after ``reconnect_delay`` seconds."""
        # Use threading.Timer to schedule a reconnect attempt
        Timer(self.reconnect_delay, self.connect).start()

    def on_connect(self, client, userdata, flags, reason_code, properties):
        """Announce availability and subscribe once the broker accepts us."""
        if reason_code.is_failure:
            # The broker answered but refused the session; do not claim to be
            # online or subscribe on a connection that was not established.
            logger.error(f"MQTT broker at {MQTT_HOST} refused the connection: {reason_code}")
            return
        self.publish_availability('online')
        self.mqttc.subscribe(MQTT_TOPIC_SUB, 0)
        logger.info(f"Connected to MQTT broker at {MQTT_HOST}")

    @staticmethod
    def _decode_payload(raw):
        """Decode a raw MQTT payload as UTF-8 without ever raising.

        Undecodable bytes become U+FFFD, so a malformed message is treated
        as an unknown command instead of raising ``UnicodeDecodeError``
        inside the client callback.
        """
        return raw.decode("utf-8", errors="replace")

    def on_message(self, client, userdata, msg):
        """Record the time of the message and act on its payload."""
        global last_call
        payload = self._decode_payload(msg.payload)
        logger.info(f"Received message on topic {msg.topic}: {payload}")
        last_call = datetime.now()
        self.handle_message(payload)

    def handle_message(self, payload):
        """Apply an ``'ON'``/``'OFF'`` command unless manual control is on.

        The switch is only touched when the command differs from its current
        state; the current state is published afterwards in every case where
        manual control is disabled.
        """
        global manual_control
        if manual_control:
            logger.info("Manual control is enabled, skipping message handling.")
        else:
            state = switch.value
            if payload == 'ON' and state != 1:
                switch.on()
                logger.info("Turning on the device.")
            elif payload == 'OFF' and state == 1:
                switch.off()
                logger.info("Turning off the device.")
            self.publish_state()

    def on_subscribe(self, client, userdata, mid, reason_codes, properties):
        """Log that the subscription request was acknowledged."""
        logger.info(f"Subscribed to {MQTT_TOPIC_SUB}")

    def disconnect_callback(self, client, userdata, flags, reason_code, properties):
        """Best-effort 'offline' announcement when the client disconnects."""
        # NOTE(review): the link is normally already down when this fires, so
        # this publish is unlikely to reach the broker; the Last Will
        # registered in __init__ is what covers unexpected disconnects.
        self.publish_availability('offline')

    def publish_state(self):
        """Publish the switch state ('ON'/'OFF') as a retained message."""
        state = 'ON' if switch.is_lit else 'OFF'
        logger.info(f"Publishing state {state} to {MQTT_TOPIC}")
        self.mqttc.publish(MQTT_TOPIC, state, qos=0, retain=True)

    def publish_availability(self, availability):
        """Publish ``availability`` (e.g. 'online'/'offline'), not retained."""
        logger.info(f"Publishing availability {availability} to {MQTT_TOPIC_AVAILABILITY}")
        self.mqttc.publish(MQTT_TOPIC_AVAILABILITY, availability, qos=0, retain=False)

    def start(self):
        """Connect to the broker.

        ``connect()`` already starts the network loop after a successful
        connection, so it is not started a second time here; on failure the
        loop stays stopped until a scheduled retry succeeds.
        """
        self.connect()

    def stop(self):
        """Stop the network loop, announce 'offline' and disconnect."""
        self.mqttc.loop_stop()
        self.publish_availability('offline')
        self.mqttc.disconnect()
# Periodic jobs; each one is run on a timer by a ScheduledTask instance
def scheduled_turn_off_function():
    """Switch the output off when no MQTT message arrived recently.

    Does nothing while manual control is enabled or before the first MQTT
    message has been received. Otherwise, when more than
    ``AUTOMATIC_SHUTDOWN_DELAY`` minutes have passed since ``last_call``,
    the switch is turned off and the new state is published.
    """
    if manual_control:
        logger.info("Manual control is enabled, skipping automatic shutdown.")
        return
    if last_call is None:
        return
    # total_seconds() covers the whole elapsed time; the ``seconds``
    # attribute alone drops whole days and would wrap every 24 hours.
    idle_minutes = (datetime.now() - last_call).total_seconds() / 60
    if idle_minutes > AUTOMATIC_SHUTDOWN_DELAY:
        logger.info("No recent activity, turning off the device.")
        switch.off()
        mqtt_controller.publish_state()
def send_availability_and_state_function():
    """Publish an 'online' availability message followed by the switch state."""
    controller = mqtt_controller
    controller.publish_availability('online')
    controller.publish_state()
@app.route('/health', methods=['GET'])
def health_check():
    """Report whether the MQTT client is currently connected.

    Returns:
        A JSON body with ``status`` set to ``healthy`` and HTTP 200 when the
        client is connected, otherwise ``unhealthy`` and HTTP 503.
    """
    connected = mqtt_controller.mqttc.is_connected()
    if not connected:
        return jsonify({'status': 'unhealthy'}), 503
    return jsonify({'status': 'healthy'}), 200
@app.route('/control', methods=['GET'])
def control_page():
    """Render the control page with the current switch and mode status.

    The template receives the switch state and the manual-control flag as
    ``'ON'``/``'OFF'`` strings, plus the time of the last MQTT message.
    """
    if switch.is_lit:
        switch_label = 'ON'
    else:
        switch_label = 'OFF'

    if manual_control:
        mode_label = 'ON'
    else:
        mode_label = 'OFF'

    template_context = {
        'state': switch_label,
        'manual_control': mode_label,
        'last_call': last_call,
    }
    return render_template('switch_control.html', **template_context)
@app.route('/switch/<state>', methods=['POST'])
def change_switch(state):
    """Turn the switch on or off from the web interface.

    Args:
        state: ``'ON'`` or ``'OFF'``, taken from the URL path.

    Returns:
        JSON with the resulting ``state``, or an error body with HTTP 400
        when ``state`` is neither ``'ON'`` nor ``'OFF'``.
    """
    if state not in ('ON', 'OFF'):
        return jsonify({'error': 'Invalid state'}), 400

    if state == 'ON':
        switch.on()
    else:
        switch.off()

    # Report what the pin says now, not what was requested.
    if switch.is_lit:
        new_state = 'ON'
    else:
        new_state = 'OFF'
    logger.info(f"Switch is now {new_state} via web interface.")
    return jsonify({'state': new_state})
@app.route('/mode/<state>', methods=['POST'])
def change_mode(state):
    """Enable or disable manual control from the web interface.

    Args:
        state: ``'ON'`` to enable manual control, ``'OFF'`` to disable it.

    Returns:
        JSON with the resulting ``manual_control`` value, or an error body
        with HTTP 400 when ``state`` is neither ``'ON'`` nor ``'OFF'``.
    """
    global manual_control

    if state not in ('ON', 'OFF'):
        return jsonify({'error': 'Invalid state'}), 400

    manual_control = (state == 'ON')

    if manual_control:
        manual_control_state = 'ON'
    else:
        manual_control_state = 'OFF'
    logger.info(f"Manual control is now {manual_control_state} via web interface.")
    return jsonify({'manual_control': manual_control_state})
def run_flask_app():
    """Serve the Flask app on port 5000, bound to all network interfaces."""
    listen_options = {'host': '0.0.0.0', 'port': 5000}
    app.run(**listen_options)
# Signal handler: stops the ScheduledTask instances and the MQTT controller, then exits
def signal_handler(sig, frame):
    """Stop the background components and terminate the process.

    Each component is stopped on a best-effort basis. A failure in one step,
    or a component that has not been created yet because the signal arrived
    during start-up, must not prevent the remaining steps or the final exit.
    Otherwise the signal would be consumed while the process keeps running.

    Args:
        sig: Signal number delivered to the process.
        frame: Interrupted stack frame (unused).
    """
    logger.info("Gracefully shutting down...")
    # Looked up by name so that a missing global is skipped instead of
    # raising NameError halfway through the shutdown sequence.
    for component_name in ('scheduled_turn_off', 'send_availability_and_state', 'mqtt_controller'):
        component = globals().get(component_name)
        if component is None:
            continue
        try:
            component.stop()
        except Exception:
            logger.exception(f"Failed to stop {component_name} during shutdown.")
    # Allow time for all threads to finish
    logger.info("Waiting for background processes to terminate...")
    time.sleep(2)
    # Hard exit: remaining threads (Flask, timers) are not joined.
    os._exit(0)
if __name__ == "__main__":
    # Create every long-lived object first. The Flask routes, the scheduled
    # jobs and the signal handler all look these names up as module globals,
    # so they must exist before anything that can reference them is started.
    mqtt_controller = MQTTController()
    scheduled_turn_off = ScheduledTask(timedelta(seconds=60), scheduled_turn_off_function)
    send_availability_and_state = ScheduledTask(timedelta(seconds=20), send_availability_and_state_function)

    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    # The web interface runs in its own thread next to the MQTT client.
    flask_thread = Thread(target=run_flask_app)
    flask_thread.start()

    mqtt_controller.start()
    scheduled_turn_off.start()
    send_availability_and_state.start()