Kafka
=====
``KafkaInput.cpp``
|image0|
Узел Kafka подключается к заданному топику и мониторит сообщения
поступающие в него.
Конфигурация узла
-----------------
Входные параметры
~~~~~~~~~~~~~~~~~
|image1|
- **Host** - адрес сервера с Apache Kafka.
- **Port** - порт сервера по которому доступен Apache Kafka. *По
умолчанию 9092*.
- **Topic** - топик из которого будут приниматься сообщения.
- **Group_id** - идентефикатор необходимый для Apache Kafka для
разделения слушателей.
- **format** - формат сообщения, которое приходит в топик.
- **metric_period** - устанавливается как значение в **settings**,
иначе 1 час.
Если выбран формат сообщения **Avro**, то станут доступны дополнительные настройки:
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|image2|
- **Адрес хранилища схем** - адрес сервера, где хранятся Avro схемы.
Формат адреса указан в плейсхолдере.
- **Имя схемы** - имя, с которым схема будет зарегистрирована.
- **Схема** - файл формата avsc с описанием Avro схемы. (Кодировка
UTF-8. Избегайте использования Кириллицы)
Примеры использования
---------------------
Kafka Json
~~~~~~~~~~
1. Построй схему и напиши в ней следующее:
|image3|
2. Для корректной работы сервиса создадим контейнер **kafka**, а также
добавим **zookeper** и **kafka_ui** для отображения наших кластеров.
Из-за возникающей ошибки в fluentd стоит добавить **elasticsearch**.
Вместе с этим сервисом для визуализации данных используется **kibana**.
Конечный **compose.yaml** будет выглядеть так:
3. Забилдим наш проект.
.. raw:: html
Зайдём на **localhost:8082**, надо настроить, как на рисунке:
|image4|
Сохраняем, получаем работающий кластер (зеленая точка будет в kafka-ui)
Создай свой топик в Topics, далее он пригодится. (Я создал
**testTopic**)
Проверим в Consumers, если указанной в настройках узла группы не
будет, то перезапустим kafka контейнер
.. code:: sh
sudo docker stop kafka
.. code:: sh
sudo docker start kafka
После этого
Создадим следующий питон файл
.. code:: python
import json
import time
from kafka import KafkaProducer
broker = 'localhost:9092' #10.186.0.165 192.168.1.207
topic = 'testTopic'
producer = KafkaProducer(bootstrap_servers=broker,value_serializer=lambda v: json.dumps(v).encode('ascii'))
print("Producer connected successfully")
data = {'name':'abc', 'email':'abc@example.com'}
for i in range(1000):
producer.send(topic,data)
producer.flush()
Запустим PromUC или обновим страницу, после исполнения кода python выше
в консоли браузера должно прийти сообщение как на рисунке, а в узле
KafkaInput должна загореться зеленая лампочка.
|image5|
Пример Avro
~~~~~~~~~~~
1. В *compose* создадим контейнер, он поможет с созданием схем *Avro*:
.. code:: yaml
schema-registry:
image: confluentinc/cp-schema-registry:7.2.1
hostname: schema-registry
container_name: schema-registry
depends_on:
- zookeeper
- kafka
ports:
- "8081:8081"
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'PLAINTEXT://kafka:29092'
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
SCHEMA_REGISTRY_ADVERTISED_LISTENERS: 'PLAINTEXT://schema-registry:8081'
2. Соберём следующую схему: *test_Kafka_Avro.yaml
(promuc_flow/doc_md/chains)* В ней уже записаны нужные параметры.
3. Добавим Schema_Url (localhost:8082): |image6|
4. Обновим страницу PromUC и запустим python-скрипт, чтобы отослать
сообщение в нашу систему. *найти скрипт можно
здесь:*/promuc_flow/doc_md/codes/avro_kafka_sender.py\*
.. code:: python
from confluent_kafka.avro import AvroProducer
from confluent_kafka import avro
kafka_config = {
'bootstrap.servers': 'localhost:9092',
'schema.registry.url': 'http://0.0.0.0:8081'
}
avro_producer = AvroProducer(kafka_config)
data = {
"firstName": "John",
"lastName": "Doe",
"age": 30
}
try:
avro_producer.produce(topic='my_topic1', value=data, value_schema=avro.loads('''
{
"namespace": "avro",
"type": "record",
"name": "Person",
"fields": [
{"name": "firstName", "type": "string"},
{"name": "lastName", "type": "string"},
{"name": "age", "type": "int"}
]
}
'''))
avro_producer.flush()
print("Данные успешно отправлены в Kafka")
except Exception as e:
print(f"Ошибка: {e}")
На выходе нас будет встречать наше сообщение. |image7|
.. |image0| image:: ../../_img/input-kafka.png
.. |image1| image:: ../../_img/input-kafka-settings1.png
.. |image2| image:: ../../_img/input-kafka-settings2.png
.. |image3| image:: ../../_img/input-kafka-settings_example.png
.. |image4| image:: ../../_img/input-kafka-ui-example.png
.. |image5| image:: ../../_img/kafka-input-example-chainout.png
.. |image6| image:: ../../_img/kafka_avro_schema-registry.png
.. |image7| image:: ../../_img/kafka_avro_socket.png