Kafka

KafkaInput.cpp

image0

Узел Kafka подключается к заданному топику и мониторит сообщения поступающие в него.

Конфигурация узла

Входные параметры

image1

  • Host - адрес сервера с Apache Kafka.

  • Port - порт сервера по которому доступен Apache Kafka. По умолчанию 9092.

  • Topic - топик из которого будут приниматься сообщения.

  • Group_id - идентефикатор необходимый для Apache Kafka для разделения слушателей.

  • format - формат сообщения, которое приходит в топик.

  • metric_period - устанавливается как значение в settings, иначе 1 час.

Если выбран формат сообщения Avro, то станут доступны дополнительные настройки:

image2

  • Адрес хранилища схем - адрес сервера, где хранятся Avro схемы. Формат адреса указан в плейсхолдере.

  • Имя схемы - имя, с которым схема будет зарегистрирована.

  • Схема - файл формата avsc с описанием Avro схемы. (Кодировка UTF-8. Избегайте использования Кириллицы)

Примеры использования

Kafka Json

  1. Построй схему и напиши в ней следующее:

image3

  1. Для корректной работы сервиса создадим контейнер kafka, а также добавим zookeper и kafka_ui для отображения наших кластеров.

Из-за возникающей ошибки в fluentd стоит добавить elasticsearch. Вместе с этим сервисом для визуализации данных используется kibana. Конечный compose.yaml будет выглядеть так:

  1. Забилдим наш проект.


Зайдём на localhost:8082, надо настроить, как на рисунке:

image4

Сохраняем, получаем работающий кластер (зеленая точка будет в kafka-ui)

Создай свой топик в Topics, далее он пригодится. (Я создал testTopic)

Проверим в Consumers, если указанной в настройках узла группы не будет, то перезапустим kafka контейнер

sudo docker stop kafka
sudo docker start kafka

После этого

Создадим следующий питон файл

import json
import time

from kafka import KafkaProducer


broker = 'localhost:9092' #10.186.0.165 192.168.1.207
topic = 'testTopic'
producer = KafkaProducer(bootstrap_servers=broker,value_serializer=lambda v: json.dumps(v).encode('ascii'))
print("Producer connected successfully")

data = {'name':'abc', 'email':'abc@example.com'}
for i in range(1000):
    producer.send(topic,data)

producer.flush()

Запустим PromUC или обновим страницу, после исполнения кода python выше в консоли браузера должно прийти сообщение как на рисунке, а в узле KafkaInput должна загореться зеленая лампочка.

image5

Пример Avro

  1. В compose создадим контейнер, он поможет с созданием схем Avro:

schema-registry:
    image: confluentinc/cp-schema-registry:7.2.1
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - zookeeper
      - kafka
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'PLAINTEXT://kafka:29092'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
      SCHEMA_REGISTRY_ADVERTISED_LISTENERS: 'PLAINTEXT://schema-registry:8081'
  1. Соберём следующую схему: test_Kafka_Avro.yaml (promuc_flow/doc_md/chains) В ней уже записаны нужные параметры.

  2. Добавим Schema_Url (localhost:8082): image6

  3. Обновим страницу PromUC и запустим python-скрипт, чтобы отослать сообщение в нашу систему. найти скрипт можно здесь:/promuc_flow/doc_md/codes/avro_kafka_sender.py*

from confluent_kafka.avro import AvroProducer
from confluent_kafka import avro
kafka_config = {
    'bootstrap.servers': 'localhost:9092',
    'schema.registry.url': 'http://0.0.0.0:8081'
}

avro_producer = AvroProducer(kafka_config)

data = {
    "firstName": "John",
    "lastName": "Doe",
    "age": 30
}
try:
    avro_producer.produce(topic='my_topic1', value=data, value_schema=avro.loads('''
     {
      "namespace": "avro",
      "type": "record",
      "name": "Person",
      "fields": [
        {"name": "firstName", "type": "string"},
        {"name": "lastName", "type": "string"},
        {"name": "age", "type": "int"}
      ]
    }
    '''))
    avro_producer.flush()
    print("Данные успешно отправлены в Kafka")
except Exception as e:
    print(f"Ошибка: {e}")

На выходе нас будет встречать наше сообщение. image7