Skip to content

Broadcasters

MQTT Broadcaster

start_processing_output(system_queue, mqtt_q)

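Called by the broadcaster module to configure a client for the desired MQTT broker. Returns a generator that, each time it is advanced, services the connection and publishes any queued items to the `byu_sss/output` topic.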

Source code in broadcasters/mqtt.py
def start_processing_output(system_queue, mqtt_q):
    """Configure an MQTT client and return a generator that publishes queued output.

    Broker settings are read from ``mqtt_config.yaml`` (relative to the current
    working directory). The keys used are ``host``, ``port``, ``username``,
    ``password`` and ``tls``. No connection is attempted until the returned
    generator is advanced for the first time.

    Args:
        system_queue: Unused by this function; presumably kept so every
            broadcaster shares the same signature (confirm against caller).
        mqtt_q: Queue of items to publish. Each item is converted with
            ``str()`` and sent to the ``byu_sss/output`` topic.

    Returns:
        A generator. Each ``next()`` call runs one short network-loop
        iteration and publishes everything currently waiting in ``mqtt_q``.
        While the broker is unreachable, ``next()`` returns immediately
        until the retry back-off has elapsed.

    Raises:
        ValueError: If ``host`` or ``port`` is missing or empty in the config.
        OSError: If ``mqtt_config.yaml`` cannot be opened.
    """

    def on_connect(client, userdata, flags, rc):
        """Callback run when the broker answers a connection request; logs the result code."""
        logger.info("MQTT Client connected ({})", rc)

    def on_disconnect(client, userdata, rc):
        """Callback run when the client disconnects from the broker; logs the result code."""
        logger.info("MQTT Client disconnected ({})", rc)

    with open("mqtt_config.yaml") as f:
        # An empty YAML file loads as None; treat it like an empty mapping so
        # the validation below reports it as "not set up".
        config = safe_load(f) or {}

    host = config.get("host")
    port = config.get("port")
    if not host or not port:
        raise ValueError("mqtt_config.yaml not set up")

    client = mqtt.Client()
    client.username_pw_set(config.get("username"), config.get("password"))

    if config.get("tls"):
        client.tls_set()

    client.on_connect = on_connect
    client.on_disconnect = on_disconnect

    def process():
        """Generator that publishes every queued item to the `byu_sss/output` topic on the broker.

        Reconnects after a back-off whenever the connection cannot be
        established or is lost.
        """
        # Number of generator steps to idle before the next connection attempt.
        retry_ticks = 100

        while True:
            try:
                client.connect(host, port)

                while True:
                    rc = client.loop(timeout=0.01)
                    if rc != mqtt.MQTT_ERR_SUCCESS:
                        # The connection dropped. Leave the queue untouched so
                        # pending items are published after reconnecting
                        # instead of being handed to a dead client.
                        logger.warning(
                            "Lost connection to broker ({})... trying again later.",
                            rc,
                        )
                        break

                    for item in utils.get_all_from_queue(mqtt_q):
                        client.publish(
                            "byu_sss/output",
                            payload=str(item),
                        )

                    yield

            except OSError as err:
                # ConnectionRefusedError, DNS failures, timeouts and TLS
                # errors are all OSError subclasses; every one of them is
                # worth retrying rather than killing the generator.
                logger.warning("Unable to connect to broker... trying again later.")
                logger.debug("Broker connection error: {}", err)

            # Back off without blocking the caller, then retry the connection.
            for _ in range(retry_ticks):
                yield

    return process()

get_all_from_queue(queue)

Helper generator that yields items from a given queue, one at a time, until the queue is empty.

Source code in broadcasters/utils.py
1
2
3
4
def get_all_from_queue(queue):
    """Yield items from ``queue`` until it has nothing immediately available.

    Items are fetched lazily, one per iteration step, and never block: the
    generator ends as soon as the queue reports that it is empty.

    Args:
        queue: A queue object providing ``get_nowait()`` that raises
            ``queue.Empty`` when no item is available (for example
            ``queue.Queue`` or ``multiprocessing.Queue``).

    Yields:
        Each item removed from the queue, in the order the queue returns them.
    """
    # Imported locally because the parameter name shadows the ``queue`` module.
    from queue import Empty

    while True:
        try:
            # A separate ``empty()`` check followed by a blocking ``get()``
            # can hang forever if another consumer takes the last item in
            # between; a non-blocking get avoids that race.
            item = queue.get_nowait()
        except Empty:
            return
        yield item

Last update: August 10, 2022
Created: July 13, 2022