Kafka ===== ``KafkaOutput`` |image0| Узел Kafka отправляет сообщения брокерам Kafka. Принимает на вход сообщения любого типа. Отправит запись через производителя Kafka на сервер Kafka. На вход принимает сообщение, забирает весь блок **DATA** и преобразует его в строковой операнд, который отправляется в заданный топик. ## Конфигурация: |image1| Входные параметры ^^^^^^^^^^^^^^^^^ - **Адрес** - Адрес брокера Kafka ``(string)addr`` - **Порт** - Порт брокера Kafka ``(int)port`` - **Топик** - Топик в кластере Kafka ``(string)topic`` Выходные параметры ^^^^^^^^^^^^^^^^^^ **DATA** \* **KafkaDeliveryReport**: \* **offset** - смещение сообщения или ошибки, если применимо ``int`` \* **partition** - раздел, если применимо ``int`` \* **size** - полезная нагрузка сообщения ``long unsigned int`` \* **Входное сообщение** - предыдущее значение блока DATA Ошибки ^^^^^^ - SSL errors - Compression Codec - Queue is Full Пример использования -------------------- Для любого сообщения требуется адрес отправителя и получателя. Чтобы сообщение доходило до адресата, требуется его указать. Для данного примера мы возьмём уже готовый кластер из примера в `KafkaInput <../InputNodes/Kafka.md>`__ Создадим новый топик **outKafka** со следующими настройками: |image2| В примере будет использоваться данная схема: |image3| Для Генератора будем использовать стандартную настройку: .. code:: json { "connection_status":"Ok" } - Повтор: 10000 мс. Настройки для **узла X**: |image4| После сохранения цепочки в KafkaUI посмотрим на приходящие сообщения: |image5| .. raw:: html
.. Далее было написано кем-то, пока стирать не буду - **Topic pattern** - может быть статической строкой или шаблоном, который разрешается с помощью свойств метаданных сообщения. Например, **${deviceType}** - **bootstrap servers** - список брокеров kafka, разделенных запятой. - **Automatically retry times** - количество попыток повторной отправки сообщения в случае сбоя соединения. - **Produces batch size** - размер пакета в байтах для группировки сообщений с одним и тем же разделом. - **Time to buffer locally** - максимальная продолжительность локального окна буферизации в мс. - **Client buffer max size** - максимальный размер буфера в байтах для отправки сообщений. - **Number of acknowledgments** - количество подтверждений, которые узел должен получить, прежде чем рассматривать запрос завершенным. - **Key serializer** - по умолчанию **org.apache.kafka.common.serialization.StringSerializer** - **Value serializer** - по умолчанию **org.apache.kafka.common.serialization.StringSerializer** - **Other properties** - любые другие дополнительные свойства могут быть предоставлены для подключения брокера kafka. **Published body** - узел отправит полную полезную нагрузку сообщения в тему Kafka. При необходимости цепочка правил может быть настроена на использование цепочки узлов преобразования для отправки правильной полезной нагрузки в Kafka. **Исходящее сообщение** - с этого узла будет содержать свойства смещения ответа, раздела и темы в метаданных сообщения. Полезная нагрузка, тип и отправитель исходного сообщения не будут изменены. .. note:: если вы хотите использовать **Confluent cloud** в качестве брокера kafka, вам следует добавить следующие свойства: .. list-table:: Таблица параметров :widths: 80 20 :header-rows: 1 :: * - **Ключ** - **Значение** * - ssl.endpoint.identification.algorithm - https * - sasl.mechanism - PLAIN * - sasl.jaas.config - PLAIN * - org.apache.kafka.common.security.plain.PlainLogin - обязателен, модель: username='CLUSTER_API_KEY', password='CLUSTER_API_SECRET'; - SASL_SSL **CLUSTER_API_KEY** - ваш ключ доступа из настроек кластера. **CLUSTER_API_SECRET** - секрет доступа из настроек кластера. .. |image0| image:: ../../_img/kafka_out_icon.png .. |image1| image:: ../../_img/kafka_out_settings.png .. |image2| image:: ../../_img/kafka_create_topic.png .. |image3| image:: ../../_img/kafka_out_chain.png .. |image4| image:: ../../_img/kafka_out_settings_example.png .. |image5| image:: ../../_img/kafka_out_msg_get.png