def start_processing_output(system_queue, mqtt_q):
    """Configure an MQTT client and return a generator that publishes queued output.

    Called by the broadcaster module. Connection settings are read from
    ``mqtt_config.yaml`` in the current working directory (keys used here:
    ``host``, ``port``, ``username``, ``password``, ``tls``).

    Args:
        system_queue: Accepted for interface compatibility; not used by this
            function.
        mqtt_q: Queue whose pending items are published to the
            ``byu_sss/output`` topic (each converted with ``str``).

    Returns:
        A generator. Each ``next()`` services the MQTT network loop once and
        publishes everything currently waiting in ``mqtt_q``. While the broker
        is unreachable the generator yields without touching the queue.

    Raises:
        ValueError: If the config file is empty or ``host``/``port`` are unset.
    """

    def on_connect(client, userdata, flags, rc):
        """Log the result code once the client has connected to the broker."""
        logger.info("MQTT Client connected ({})", rc)

    def on_disconnect(client, userdata, rc):
        """Log the result code when the client disconnects from the broker."""
        logger.info("MQTT Client disconnected ({})", rc)

    # YAML is UTF-8 by default; do not depend on the platform locale encoding.
    with open("mqtt_config.yaml", encoding="utf-8") as f:
        config = safe_load(f)

    # An empty YAML document loads as None; report it as "not set up" too
    # instead of failing with a TypeError on the subscript below.
    if not config or not config["host"] or not config["port"]:
        raise ValueError("mqtt_config.yaml not set up")

    client = mqtt.Client()
    client.username_pw_set(config["username"], config["password"])
    if config["tls"]:
        client.tls_set()

    client.on_connect = on_connect
    client.on_disconnect = on_disconnect

    def process():
        """Generator that publishes queued items to the `byu_sss/output` topic.

        Yields once per service iteration so the caller can interleave other
        work. After a refused or lost connection it yields 100 times as a
        back-off before trying to connect again.
        """
        while True:
            try:
                client.connect(config["host"], config["port"])

                # loop() stops returning MQTT_ERR_SUCCESS once the connection
                # is gone. Leave the publish loop in that case so queued items
                # are not drained into a dead client, and reconnect below.
                while client.loop(timeout=0.01) == mqtt.MQTT_ERR_SUCCESS:
                    for item in utils.get_all_from_queue(mqtt_q):
                        client.publish(
                            "byu_sss/output",
                            payload=str(item),
                        )
                    yield

                logger.warning("Connection to broker lost... trying again later.")
            except ConnectionRefusedError:
                logger.warning("Unable to connect to broker... trying again later.")

            # Back off for 100 caller iterations before the next attempt.
            for _ in range(100):
                yield

    return process()
get_all_from_queue(queue)
Helper function that gets all items from a given queue.