- Описание функциональных характеристик системы
- Поддержание жизненного цикла
- Установка PromUC
- Настройка сервисов
- Обновление PromUC
- Резервное копирование и восстановление базы данных Promuc FrameWork
- Рекомендации по безопасности
- Руководство по эксплуатации и администрированию
- Module GIS
- Конфигурирование правил и сценариев
- Module Framework
- Module Interface
- Технологии проекта
- Эксплуатация КТС
- Единый сервис аутентификации
- Информация о релизах
Kafka¶
KafkaInput.cpp
Узел Kafka подключается к заданному топику и мониторит сообщения поступающие в него.
Конфигурация узла¶
Входные параметры¶
Host - адрес сервера с Apache Kafka.
Port - порт сервера по которому доступен Apache Kafka. По умолчанию 9092.
Topic - топик из которого будут приниматься сообщения.
Group_id - идентефикатор необходимый для Apache Kafka для разделения слушателей.
format - формат сообщения, которое приходит в топик.
metric_period - устанавливается как значение в settings, иначе 1 час.
Если выбран формат сообщения Avro, то станут доступны дополнительные настройки:¶
Адрес хранилища схем - адрес сервера, где хранятся Avro схемы. Формат адреса указан в плейсхолдере.
Имя схемы - имя, с которым схема будет зарегистрирована.
Схема - файл формата avsc с описанием Avro схемы. (Кодировка UTF-8. Избегайте использования Кириллицы)
Примеры использования¶
Kafka Json¶
Построй схему и напиши в ней следующее:
Для корректной работы сервиса создадим контейнер kafka, а также добавим zookeper и kafka_ui для отображения наших кластеров.
Из-за возникающей ошибки в fluentd стоит добавить elasticsearch. Вместе с этим сервисом для визуализации данных используется kibana. Конечный compose.yaml будет выглядеть так:
Забилдим наш проект.
Зайдём на localhost:8082, надо настроить, как на рисунке:
Сохраняем, получаем работающий кластер (зеленая точка будет в kafka-ui)
Создай свой топик в Topics, далее он пригодится. (Я создал testTopic)
Проверим в Consumers, если указанной в настройках узла группы не будет, то перезапустим kafka контейнер
sudo docker stop kafka
sudo docker start kafka
После этого
Создадим следующий питон файл
import json
import time
from kafka import KafkaProducer
broker = 'localhost:9092' #10.186.0.165 192.168.1.207
topic = 'testTopic'
producer = KafkaProducer(bootstrap_servers=broker,value_serializer=lambda v: json.dumps(v).encode('ascii'))
print("Producer connected successfully")
data = {'name':'abc', 'email':'abc@example.com'}
for i in range(1000):
producer.send(topic,data)
producer.flush()
Запустим PromUC или обновим страницу, после исполнения кода python выше в консоли браузера должно прийти сообщение как на рисунке, а в узле KafkaInput должна загореться зеленая лампочка.
Пример Avro¶
В compose создадим контейнер, он поможет с созданием схем Avro:
schema-registry:
image: confluentinc/cp-schema-registry:7.2.1
hostname: schema-registry
container_name: schema-registry
depends_on:
- zookeeper
- kafka
ports:
- "8081:8081"
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'PLAINTEXT://kafka:29092'
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
SCHEMA_REGISTRY_ADVERTISED_LISTENERS: 'PLAINTEXT://schema-registry:8081'
Соберём следующую схему: test_Kafka_Avro.yaml (promuc_flow/doc_md/chains) В ней уже записаны нужные параметры.
Добавим Schema_Url (localhost:8082):
Обновим страницу PromUC и запустим python-скрипт, чтобы отослать сообщение в нашу систему. найти скрипт можно здесь:/promuc_flow/doc_md/codes/avro_kafka_sender.py*
from confluent_kafka.avro import AvroProducer
from confluent_kafka import avro
kafka_config = {
'bootstrap.servers': 'localhost:9092',
'schema.registry.url': 'http://0.0.0.0:8081'
}
avro_producer = AvroProducer(kafka_config)
data = {
"firstName": "John",
"lastName": "Doe",
"age": 30
}
try:
avro_producer.produce(topic='my_topic1', value=data, value_schema=avro.loads('''
{
"namespace": "avro",
"type": "record",
"name": "Person",
"fields": [
{"name": "firstName", "type": "string"},
{"name": "lastName", "type": "string"},
{"name": "age", "type": "int"}
]
}
'''))
avro_producer.flush()
print("Данные успешно отправлены в Kafka")
except Exception as e:
print(f"Ошибка: {e}")
На выходе нас будет встречать наше сообщение.