Kafka

KafkaOutput

image0

Узел Kafka отправляет сообщения брокерам Kafka. Принимает на вход сообщения любого типа. Отправит запись через производителя Kafka на сервер Kafka.

На вход принимает сообщение, забирает весь блок DATA и преобразует его в строковой операнд, который отправляется в заданный топик. ## Конфигурация:

image1

Входные параметры

  • Адрес - Адрес брокера Kafka (string)addr

  • Порт - Порт брокера Kafka (int)port

  • Топик - Топик в кластере Kafka (string)topic

Выходные параметры

DATA * KafkaDeliveryReport: * offset - смещение сообщения или ошибки, если применимо int * partition - раздел, если применимо int * size - полезная нагрузка сообщения long unsigned int * Входное сообщение - предыдущее значение блока DATA

Ошибки

  • SSL errors

  • Compression Codec

  • Queue is Full

Пример использования

Для любого сообщения требуется адрес отправителя и получателя. Чтобы сообщение доходило до адресата, требуется его указать. Для данного примера мы возьмём уже готовый кластер из примера в KafkaInput

Создадим новый топик outKafka со следующими настройками:

image2

В примере будет использоваться данная схема:

image3

Для Генератора будем использовать стандартную настройку:

{
    "connection_status":"Ok"
}
  • Повтор: 10000 мс.

Настройки для узла X:

image4

После сохранения цепочки в KafkaUI посмотрим на приходящие сообщения:

image5


Далее было написано кем-то, пока стирать не буду

  • Topic pattern - может быть статической строкой или шаблоном, который разрешается с помощью свойств метаданных сообщения. Например, ${deviceType}

  • bootstrap servers - список брокеров kafka, разделенных запятой.

  • Automatically retry times - количество попыток повторной отправки сообщения в случае сбоя соединения.

  • Produces batch size - размер пакета в байтах для группировки сообщений с одним и тем же разделом.

  • Time to buffer locally - максимальная продолжительность локального окна буферизации в мс.

  • Client buffer max size - максимальный размер буфера в байтах для отправки сообщений.

  • Number of acknowledgments - количество подтверждений, которые узел должен получить, прежде чем рассматривать запрос завершенным.

  • Key serializer - по умолчанию org.apache.kafka.common.serialization.StringSerializer

  • Value serializer - по умолчанию org.apache.kafka.common.serialization.StringSerializer

  • Other properties - любые другие дополнительные свойства могут быть предоставлены для подключения брокера kafka.

Published body - узел отправит полную полезную нагрузку сообщения в тему Kafka. При необходимости цепочка правил может быть настроена на использование цепочки узлов преобразования для отправки правильной полезной нагрузки в Kafka.

Исходящее сообщение - с этого узла будет содержать свойства смещения ответа, раздела и темы в метаданных сообщения. Полезная нагрузка, тип и отправитель исходного сообщения не будут изменены.

Примечание

если вы хотите использовать Confluent cloud в качестве

брокера kafka, вам следует добавить следующие свойства:

*   - **Ключ**
    - **Значение**
*   - ssl.endpoint.identification.algorithm
    - https
*   - sasl.mechanism
    - PLAIN
*   - sasl.jaas.config
    - PLAIN
*   - org.apache.kafka.common.security.plain.PlainLogin - обязателен, модель: username='CLUSTER_API_KEY', password='CLUSTER_API_SECRET';
    - SASL_SSL

CLUSTER_API_KEY - ваш ключ доступа из настроек кластера.

CLUSTER_API_SECRET - секрет доступа из настроек кластера.