Kafka
=====
``KafkaOutput``
|image0|
Узел Kafka отправляет сообщения брокерам Kafka. Принимает на вход
сообщения любого типа. Отправит запись через производителя Kafka на
сервер Kafka.
На вход принимает сообщение, забирает весь блок **DATA** и преобразует
его в строковой операнд, который отправляется в заданный топик. ##
Конфигурация:
|image1|
Входные параметры
^^^^^^^^^^^^^^^^^
- **Адрес** - Адрес брокера Kafka ``(string)addr``
- **Порт** - Порт брокера Kafka ``(int)port``
- **Топик** - Топик в кластере Kafka ``(string)topic``
Выходные параметры
^^^^^^^^^^^^^^^^^^
**DATA** \* **KafkaDeliveryReport**: \* **offset** - смещение сообщения
или ошибки, если применимо ``int`` \* **partition** - раздел, если
применимо ``int`` \* **size** - полезная нагрузка сообщения
``long unsigned int`` \* **Входное сообщение** - предыдущее значение
блока DATA
Ошибки
^^^^^^
- SSL errors
- Compression Codec
- Queue is Full
Пример использования
--------------------
Для любого сообщения требуется адрес отправителя и получателя. Чтобы
сообщение доходило до адресата, требуется его указать. Для данного
примера мы возьмём уже готовый кластер из примера в
`KafkaInput <../InputNodes/Kafka.md>`__
Создадим новый топик **outKafka** со следующими настройками:
|image2|
В примере будет использоваться данная схема:
|image3|
Для Генератора будем использовать стандартную настройку:
.. code:: json
{
"connection_status":"Ok"
}
- Повтор: 10000 мс.
Настройки для **узла X**:
|image4|
После сохранения цепочки в KafkaUI посмотрим на приходящие сообщения:
|image5|
.. raw:: html
..
Далее было написано кем-то, пока стирать не буду
- **Topic pattern** - может быть статической строкой или шаблоном,
который разрешается с помощью свойств метаданных сообщения. Например,
**${deviceType}**
- **bootstrap servers** - список брокеров kafka, разделенных запятой.
- **Automatically retry times** - количество попыток повторной отправки
сообщения в случае сбоя соединения.
- **Produces batch size** - размер пакета в байтах для группировки
сообщений с одним и тем же разделом.
- **Time to buffer locally** - максимальная продолжительность
локального окна буферизации в мс.
- **Client buffer max size** - максимальный размер буфера в байтах для
отправки сообщений.
- **Number of acknowledgments** - количество подтверждений, которые
узел должен получить, прежде чем рассматривать запрос завершенным.
- **Key serializer** - по умолчанию
**org.apache.kafka.common.serialization.StringSerializer**
- **Value serializer** - по умолчанию
**org.apache.kafka.common.serialization.StringSerializer**
- **Other properties** - любые другие дополнительные свойства могут
быть предоставлены для подключения брокера kafka.
**Published body** - узел отправит полную полезную нагрузку сообщения в
тему Kafka. При необходимости цепочка правил может быть настроена на
использование цепочки узлов преобразования для отправки правильной
полезной нагрузки в Kafka.
**Исходящее сообщение** - с этого узла будет содержать свойства смещения
ответа, раздела и темы в метаданных сообщения. Полезная нагрузка, тип и
отправитель исходного сообщения не будут изменены.
.. note:: если вы хотите использовать **Confluent cloud** в качестве
брокера kafka, вам следует добавить следующие свойства:
.. list-table:: Таблица параметров :widths: 80 20 :header-rows: 1
::
* - **Ключ**
- **Значение**
* - ssl.endpoint.identification.algorithm
- https
* - sasl.mechanism
- PLAIN
* - sasl.jaas.config
- PLAIN
* - org.apache.kafka.common.security.plain.PlainLogin - обязателен, модель: username='CLUSTER_API_KEY', password='CLUSTER_API_SECRET';
- SASL_SSL
**CLUSTER_API_KEY** - ваш ключ доступа из настроек кластера.
**CLUSTER_API_SECRET** - секрет доступа из настроек кластера.
.. |image0| image:: ../../_img/kafka_out_icon.png
.. |image1| image:: ../../_img/kafka_out_settings.png
.. |image2| image:: ../../_img/kafka_create_topic.png
.. |image3| image:: ../../_img/kafka_out_chain.png
.. |image4| image:: ../../_img/kafka_out_settings_example.png
.. |image5| image:: ../../_img/kafka_out_msg_get.png