Skip to content

Commit

Permalink
"Fixing" reconnections using loop_forever and adding a proper logging…
Browse files Browse the repository at this point in the history
… setup
  • Loading branch information
freol35241 committed Dec 3, 2021
1 parent 877c95d commit 8367448
Showing 1 changed file with 46 additions and 6 deletions.
52 changes: 46 additions & 6 deletions main.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
"""Main entrypoint for this application"""
import logging
import warnings
from threading import Thread

from time import sleep
from environs import Env
from paho.mqtt.client import Client, MQTTv5
Expand Down Expand Up @@ -31,6 +35,14 @@
# Bridging config
MQTT_TOPICS = env.list("MQTT_TOPICS", [])

LOG_LEVEL = env.log_level("LOG_LEVEL", logging.WARNING)

# Setup logger
logging.basicConfig(level=LOG_LEVEL)
logging.captureWarnings(True)
warnings.filterwarnings("once")
LOGGER = logging.getLogger("crowsnest-bridge-mqtt")

# Source setup
source = Client(
client_id=MQTT_SOURCE_CLIENT_ID, transport=MQTT_SOURCE_TRANSPORT, protocol=MQTTv5
Expand All @@ -56,7 +68,7 @@
retainHandling=SubscribeOptions.RETAIN_SEND_ON_SUBSCRIBE,
)


# on-message callbacks
def publish_to_source(client, userdata, message): # pylint: disable=unused-argument
"""Publish to source broker"""
source.publish(message.topic, message.payload, message.qos, message.retain)
Expand All @@ -72,26 +84,54 @@ def publish_to_remote(client, userdata, message): # pylint: disable=unused-argu

source.on_message = publish_to_remote


## on-connect callback
def on_connect(
client, userdata, flags, reason_code, properties=None
): # pylint: disable=unused-argument
"""Subscribe on connect"""
if reason_code != 0:
LOGGER.error(
"Connection failed to %s with reason code: %s", client, reason_code
)
return

for topic in MQTT_TOPICS:
client.subscribe(topic, options=SUBSCRIBE_OPTIONS, properties=None)


source.on_connect = on_connect
remote.on_connect = on_connect

## on-disconnect callback
def on_disconnect(
client, userdata, flags, reason_code, properties=None
): # pylint: disable=unused-argument
"""Subscribe on connect"""
if reason_code != 0:
LOGGER.error("Disconnected from %s with reason code: %s", client, reason_code)


source.on_disconnect = on_disconnect
remote.on_disconnect = on_disconnect


# Logging
source.enable_logger(logging.getLogger("crowsnest-bridge-mqtt-SOURCE"))
remote.enable_logger(logging.getLogger("crowsnest-bridge-mqtt-REMOTE"))

# Do connection
remote.connect(MQTT_REMOTE_HOST, MQTT_REMOTE_PORT)
source.connect(MQTT_SOURCE_HOST, MQTT_SOURCE_PORT)

# Start loops
remote.loop_start()
source.loop_start()

# Just idle, this service is data-driven
# Loop_forever handles reconnections automatically, lets use that
source_thread = Thread(target=source.loop_forever)
source_thread.daemon = True
source_thread.start()

remote_thread = Thread(target=remote.loop_forever)
remote_thread.daemon = True
remote_thread.start()

while True:
sleep(1)

0 comments on commit 8367448

Please sign in to comment.