MQTT

MQTT_Task.cpp

image0

Система связи, построенная на MQTT, состоит из сервера-издателя, сервера-брокера и одного или нескольких клиентов. Издатель не требует каких-либо настроек по количеству или расположению подписчиков, получающих сообщения. Кроме того, подписчикам не требуется настройка на конкретного издателя. В системе может быть несколько брокеров, распространяющих сообщения.

MQTT предоставляет способ создания иерархии каналов связи — своего рода ветвь с листьями. Всякий раз, когда у издателя есть новые данные для распространения среди клиентов, сообщение сопровождается примечанием контроля доставки. Клиенты более высокого уровня могут получать каждое сообщение, в то время как клиенты более низкого уровня могут получать сообщения, относящиеся только к одному или двум базовым каналам, «ответвляющимся» в нижней части иерархии. Это облегчает обмен информацией размером от двух байт до 256 мегабайт.

Любые данные, опубликованные или полученные брокером MQTT, будут закодированы в двоичном формате, поскольку MQTT является бинарным протоколом. Это означает, что для получения исходного содержимого нужно интерпретировать сообщение.


Описание работы узла

Код инициализирует MQTT клиент с уникальным идентификатором, который генерируется случайным образом в диапазоне от 1 до 1000.

image1

Клиент подключается к MQTT брокеру, используя предоставленные: * URL хоста - адрес вашего MQTT брокера, который вы собираетесь прослушивать. host * Порт - порт, по которому ведется прослушка топика port * Пользователь - user * Пароль - password * Топик - “Подобие группы или канала в мессенджерах, на который вы подписываетесь, чтобы получать новости” - вещает тестировщик. topic

Для прослушивания всех субтопиков пишем: #

  • Qos - качество обслуживания. Протокол MQTT поддерживает 3 уровня QoS. qos

Если подключение успешно, клиент подписывается на указанный топик с заданным уровнем QoS.

В бесконечном цикле клиент ожидает сообщения от брокера. При отсутсвии подключения к топику: status = false. Если топик инициализирован, то уведомления об отрицательном статусе сообщения перестают приходить. Если соединение с брокером теряется, клиент пытается переподключиться. В случае успешного переподключения, клиент снова подписывается на топик.

{"id":1,"msg":{"status":true},"name":"<имя узла>","type":"status"}

Если не указать наименование топика, то будут прослушиваться все доступные топики.

Если возникают ошибки при парсинге или обработке сообщения, они логируются, и цикл продолжается.

При вызове деструктора, задача устанавливает флаг остановки и ждет завершения потока. Если клиент все еще подключен, он отключается, и клиент удаляется.

Используется библиотека spdlog для логирования информации, предупреждений и ошибок. Логи включают детали о подключении, получении сообщений и ошибках.

Входные данные

Принимает на вход JSON, и пытается распарсить все его key:value:

{
    "mods": [
        {
            "name": "DI 24V X1",
            "enable": 1,
            "val": [
                {
                    "name": "11",
                    "type": 0,
                    "val": 0
                },
                {
                    "name": "12",
                    "type": 0,
                    "val": 1
                },
                {
                    "name": "13",
                    "type": 0,
                    "val": 1
                },
                {
                    "name": "14",
                    "type": 0,
                    "val": 0
                }
            ]
        },
        {
            "name": "DI 24V X2",
            "enable": 1,
            "val": [
                {
                    "name": "21",
                    "type": 0,
                    "val": 0
                },
                {
                    "name": "22",
                    "type": 0,
                    "val": 1
                },
                {
                    "name": "23",
                    "type": 0,
                    "val": 1
                },
                {
                    "name": "24",
                    "type": 0,
                    "val": 0
                }
            ]
        }

    ]
}

Пример вывода блока:

Json

{
"connection_status": "Ok",
"time": 1697718856699,
"mods": [
    {
        "name": "DI 24V X1",
        "enable": 1,
        "val": [
            {
                "name": "11",
                "type": 0,
                "val": 0
            },
            {
                "name": "12",
                "type": 0,
                "val": 1
            },
            {
                "name": "13",
                "type": 0,
                "val": 1
            },
            {
                "name": "14",
                "type": 0,
                "val": 0
            }
        ]
    },
    {
        "name": "DI 24V X2",
        "enable": 1,
        "val": [
            {
                "name": "21",
                "type": 0,
                "val": 0
            },
            {
                "name": "22",
                "type": 0,
                "val": 1
            },
            {
                "name": "23",
                "type": 0,
                "val": 1
            },
            {
                "name": "24",
                "type": 0,
                "val": 0
            }
        ]
    }

]
}

Пример использования

Для тестов используется контейнер, инициализрованный в compose файле с статическим ip: 172.20.0.5

Соберём цепочку или скачаем готовую test_mqtt.yaml

image2

Настроим наш узел чтения топика MQTT:

image3

При сохранении цепочки уведомления об отрицательном статусе подключения должны перестать приходить.

Для тестов имеется файл codes/producer_mqtt.py в папке с документацией, все переменные уже настроены, следует просто запустить файл и в топик будут поступать тестовые сообщения с передеодичностью в 1 секунду.