Kafka ===== ``KafkaInput.cpp`` |image0| Узел Kafka подключается к заданному топику и мониторит сообщения поступающие в него. Конфигурация узла ----------------- Входные параметры ~~~~~~~~~~~~~~~~~ |image1| - **Host** - адрес сервера с Apache Kafka. - **Port** - порт сервера по которому доступен Apache Kafka. *По умолчанию 9092*. - **Topic** - топик из которого будут приниматься сообщения. - **Group_id** - идентефикатор необходимый для Apache Kafka для разделения слушателей. - **format** - формат сообщения, которое приходит в топик. - **metric_period** - устанавливается как значение в **settings**, иначе 1 час. Если выбран формат сообщения **Avro**, то станут доступны дополнительные настройки: ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ |image2| - **Адрес хранилища схем** - адрес сервера, где хранятся Avro схемы. Формат адреса указан в плейсхолдере. - **Имя схемы** - имя, с которым схема будет зарегистрирована. - **Схема** - файл формата avsc с описанием Avro схемы. (Кодировка UTF-8. Избегайте использования Кириллицы) Примеры использования --------------------- Kafka Json ~~~~~~~~~~ 1. Построй схему и напиши в ней следующее: |image3| 2. Для корректной работы сервиса создадим контейнер **kafka**, а также добавим **zookeper** и **kafka_ui** для отображения наших кластеров. Из-за возникающей ошибки в fluentd стоит добавить **elasticsearch**. Вместе с этим сервисом для визуализации данных используется **kibana**. Конечный **compose.yaml** будет выглядеть так: 3. Забилдим наш проект. .. raw:: html
Зайдём на **localhost:8082**, надо настроить, как на рисунке: |image4| Сохраняем, получаем работающий кластер (зеленая точка будет в kafka-ui) Создай свой топик в Topics, далее он пригодится. (Я создал **testTopic**) Проверим в Consumers, если указанной в настройках узла группы не будет, то перезапустим kafka контейнер .. code:: sh sudo docker stop kafka .. code:: sh sudo docker start kafka После этого Создадим следующий питон файл .. code:: python import json import time from kafka import KafkaProducer broker = 'localhost:9092' #10.186.0.165 192.168.1.207 topic = 'testTopic' producer = KafkaProducer(bootstrap_servers=broker,value_serializer=lambda v: json.dumps(v).encode('ascii')) print("Producer connected successfully") data = {'name':'abc', 'email':'abc@example.com'} for i in range(1000): producer.send(topic,data) producer.flush() Запустим PromUC или обновим страницу, после исполнения кода python выше в консоли браузера должно прийти сообщение как на рисунке, а в узле KafkaInput должна загореться зеленая лампочка. |image5| Пример Avro ~~~~~~~~~~~ 1. В *compose* создадим контейнер, он поможет с созданием схем *Avro*: .. code:: yaml schema-registry: image: confluentinc/cp-schema-registry:7.2.1 hostname: schema-registry container_name: schema-registry depends_on: - zookeeper - kafka ports: - "8081:8081" environment: SCHEMA_REGISTRY_HOST_NAME: schema-registry SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'PLAINTEXT://kafka:29092' SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081 SCHEMA_REGISTRY_ADVERTISED_LISTENERS: 'PLAINTEXT://schema-registry:8081' 2. Соберём следующую схему: *test_Kafka_Avro.yaml (promuc_flow/doc_md/chains)* В ней уже записаны нужные параметры. 3. Добавим Schema_Url (localhost:8082): |image6| 4. Обновим страницу PromUC и запустим python-скрипт, чтобы отослать сообщение в нашу систему. *найти скрипт можно здесь:*/promuc_flow/doc_md/codes/avro_kafka_sender.py\* .. code:: python from confluent_kafka.avro import AvroProducer from confluent_kafka import avro kafka_config = { 'bootstrap.servers': 'localhost:9092', 'schema.registry.url': 'http://0.0.0.0:8081' } avro_producer = AvroProducer(kafka_config) data = { "firstName": "John", "lastName": "Doe", "age": 30 } try: avro_producer.produce(topic='my_topic1', value=data, value_schema=avro.loads(''' { "namespace": "avro", "type": "record", "name": "Person", "fields": [ {"name": "firstName", "type": "string"}, {"name": "lastName", "type": "string"}, {"name": "age", "type": "int"} ] } ''')) avro_producer.flush() print("Данные успешно отправлены в Kafka") except Exception as e: print(f"Ошибка: {e}") На выходе нас будет встречать наше сообщение. |image7| .. |image0| image:: ../../_img/input-kafka.png .. |image1| image:: ../../_img/input-kafka-settings1.png .. |image2| image:: ../../_img/input-kafka-settings2.png .. |image3| image:: ../../_img/input-kafka-settings_example.png .. |image4| image:: ../../_img/input-kafka-ui-example.png .. |image5| image:: ../../_img/kafka-input-example-chainout.png .. |image6| image:: ../../_img/kafka_avro_schema-registry.png .. |image7| image:: ../../_img/kafka_avro_socket.png