- Описание функциональных характеристик системы
- Поддержание жизненного цикла
- Установка PromUC
- Настройка сервисов
- Обновление PromUC
- Резервное копирование и восстановление базы данных Promuc FrameWork
- Рекомендации по безопасности
- Руководство по эксплуатации и администрированию
- Module GIS
- Конфигурирование правил и сценариев
- Module Framework
- Module Interface
- Технологии проекта
- Эксплуатация КТС
- Единый сервис аутентификации
- Информация о релизах
Kafka¶
KafkaOutput
![]()
Узел Kafka отправляет сообщения брокерам Kafka. Принимает на вход сообщения любого типа. Отправит запись через производителя Kafka на сервер Kafka.
На вход принимает сообщение, забирает весь блок DATA и преобразует его в строковой операнд, который отправляется в заданный топик. ## Конфигурация:

Входные параметры¶
Адрес - Адрес брокера Kafka
(string)addrПорт - Порт брокера Kafka
(int)portТопик - Топик в кластере Kafka
(string)topic
Выходные параметры¶
DATA * KafkaDeliveryReport: * offset - смещение сообщения
или ошибки, если применимо int * partition - раздел, если
применимо int * size - полезная нагрузка сообщения
long unsigned int * Входное сообщение - предыдущее значение
блока DATA
Ошибки¶
SSL errors
Compression Codec
Queue is Full
Пример использования¶
Для любого сообщения требуется адрес отправителя и получателя. Чтобы сообщение доходило до адресата, требуется его указать. Для данного примера мы возьмём уже готовый кластер из примера в KafkaInput
Создадим новый топик outKafka со следующими настройками:

В примере будет использоваться данная схема:

Для Генератора будем использовать стандартную настройку:
{
"connection_status":"Ok"
}
Повтор: 10000 мс.
Настройки для узла X:

После сохранения цепочки в KafkaUI посмотрим на приходящие сообщения:

Далее было написано кем-то, пока стирать не буду
Topic pattern - может быть статической строкой или шаблоном, который разрешается с помощью свойств метаданных сообщения. Например, ${deviceType}
bootstrap servers - список брокеров kafka, разделенных запятой.
Automatically retry times - количество попыток повторной отправки сообщения в случае сбоя соединения.
Produces batch size - размер пакета в байтах для группировки сообщений с одним и тем же разделом.
Time to buffer locally - максимальная продолжительность локального окна буферизации в мс.
Client buffer max size - максимальный размер буфера в байтах для отправки сообщений.
Number of acknowledgments - количество подтверждений, которые узел должен получить, прежде чем рассматривать запрос завершенным.
Key serializer - по умолчанию org.apache.kafka.common.serialization.StringSerializer
Value serializer - по умолчанию org.apache.kafka.common.serialization.StringSerializer
Other properties - любые другие дополнительные свойства могут быть предоставлены для подключения брокера kafka.
Published body - узел отправит полную полезную нагрузку сообщения в тему Kafka. При необходимости цепочка правил может быть настроена на использование цепочки узлов преобразования для отправки правильной полезной нагрузки в Kafka.
Исходящее сообщение - с этого узла будет содержать свойства смещения ответа, раздела и темы в метаданных сообщения. Полезная нагрузка, тип и отправитель исходного сообщения не будут изменены.
Примечание
если вы хотите использовать Confluent cloud в качестве
брокера kafka, вам следует добавить следующие свойства:
* - **Ключ**
- **Значение**
* - ssl.endpoint.identification.algorithm
- https
* - sasl.mechanism
- PLAIN
* - sasl.jaas.config
- PLAIN
* - org.apache.kafka.common.security.plain.PlainLogin - обязателен, модель: username='CLUSTER_API_KEY', password='CLUSTER_API_SECRET';
- SASL_SSL
CLUSTER_API_KEY - ваш ключ доступа из настроек кластера.
CLUSTER_API_SECRET - секрет доступа из настроек кластера.