Python ====== |image0| Представляет собой функцию Python, на вход которой подаётся объект **map** (сообщение) - на выходе изменённый объект **map** Инициализирует Python и загружает скрипт, далее считывает этот скрипт построчно до встречи с **exit** Конфигурация узла ----------------- |image1| В коде должно присутствовать ``return obj``, иначе Error Пример использования ~~~~~~~~~~~~~~~~~~~~ **Данные на вход:** |image2| Этот json приходит на вход нашего скрипт-узла: .. code:: json {"id":3,"in":{"DATA":{"connection_status":"Hello Redis"},"METADATA":{"chain":"test","from":"Generator","id":4,"time":1722413175620}},"name":"test","type":"debug"} Этот блок кода мы вставляем в узел Python .. code:: python import datetime obj['DATA']['Notebook']= str(datetime.datetime.now()) return obj **Возвращаемые значения** .. code:: json {"id":3,"in":{"DATA":{"Notebook":"2024-07-31 12:15:39.730471","connection_status":"Hello Redis"},"METADATA":{"chain":"test","from":"Generator","id":4,"time":1722413739729}},"name":"test","type":"debug"} |image3| Видим, что в объект **DATA** была добавлена строка **Notebook** Python TESTS ============ Тестирование Python заключается в проверке работы подключенных библиотек Тестируем библиотеки -------------------- Math ✓ ~~~~~~ .. code:: python import math obj['METADATA']['StorageWrite'] = {} obj['METADATA']['StorageWrite']['key']= "test"+str((int)(math.pow(2,2))) obj['METADATA']['StorageWrite']['path'] = "DATA.connection_status" return obj |image4| В Redis записалось как test4 array ✓ ~~~~~~~ .. code:: python from array import array obj['DATA']['Notebook'] = {} obj['DATA']['Notebook']= str(array('d',[1.1,6,7.77777777]))+str(array('b', b'is array')) return obj |image5| binascii ✓ ~~~~~~~~~~ .. code:: python import binascii obj['DATA']['Notebook'] = {} obj['DATA']['Notebook']= str(binascii.hexlify(("somestring").encode("utf-8"))) return obj |image6| asyncio ✓ ~~~~~~~~~ На код не ругается - значит работает .. code:: python import asyncio obj['DATA']['Notebook'] = {} asyncio.sleep(1) obj['DATA']['Notebook']= str("it works?") return obj bultins ✓ ~~~~~~~~~ Даже без импорта работает .. code:: python obj['DATA']['Notebook'] = {} obj['DATA']['Notebook']= str("it works?")+str(MemoryError.text) return obj |image7| collections ✓ ~~~~~~~~~~~~~ .. code:: python import collections d = collections.deque([1, 2, 3]) d.append(4) d.appendleft(0) obj['DATA']['Notebook'] = {} obj['DATA']['Notebook']= str("it works?")+str(d) return obj |image8| gc ✓ ~~~~ .. code:: python import gc obj['DATA']['Notebook'] = {} obj['DATA']['Notebook']= str(gc.collect()) return obj |image9| hashlib ✓ ~~~~~~~~~ .. code:: python import hashlib text = "Promuc" obj['DATA']['Notebook'] = {} obj['DATA']['Notebook']= str(hashlib.sha256(text.encode()).hexdigest()) return obj |image10| heapq ✓ ~~~~~~~ .. code:: python import heapq data = [4, 3, 1, 16, 6] heapq.heapify(data) heapq.heappush(data, 2) smallest = heapq.heappop(data) obj['DATA']['Notebook'] = {} obj['DATA']['Notebook2'] = {} obj['DATA']['Notebook']= str(smallest) obj['DATA']['Notebook2']= str(data) return obj |image11| datetime ✓ ~~~~~~~~~~ .. code:: python import datetime obj['DATA']['Notebook'] = {} obj['DATA']['Notebook']= str(datetime.datetime.now()) return obj |image12| json ✓ ~~~~~~ .. code:: python import json,datetime data = { "test":"Ok", "time": datetime.datetime.now().hour, "check": "checked" } json_str = json.dumps(data) obj['DATA']['Notebook'] = {} obj['DATA']['Notebook']= str(json_str) return obj |image13| random ✓ ~~~~~~~~ .. code:: python import random obj['DATA']['Notebook'] = {} obj['DATA']['Notebook']= str(random.randint(1,10)) return obj |image14| re ✓ ~~~~ .. code:: python import re text = "some html tags use this structure something
" pattern = r'<[^>]+>' obj['DATA']['Notebook'] = {} obj['DATA']['Notebook']= str(re.findall(pattern,text)) return obj |image15| struct ✓ ~~~~~~~~ .. code:: python import struct obj['DATA']['Notebook'] = {} obj['DATA']['Notebook']= str(struct.pack(">xBxxBx", 255, 128)) return obj |image16| time ✓ ~~~~~~ .. code:: python import time local_time = time.localtime(time.time()) readtime = time.strftime("%Y-%m-%d %H:%M:%S", local_time) obj['DATA']['Notebook'] = {} obj['DATA']['Notebook']= str(readtime) return obj |image17| ctypes ✓ ~~~~~~~~ .. code:: python import ctypes libc = ctypes.CDLL('libc.so.6') libc.strlen.argtypes = [ctypes.c_char_p] libc.strlen.restype = ctypes.c_size_t someStr = b"string for the test" obj['DATA']['Notebook'] = {} obj['DATA']['Notebook']= str(libc.strlen(someStr)) return obj |image18| .. |image0| image:: ../../_img/python_icon.png .. |image1| image:: ../../_img/python_settings.png .. |image2| image:: ../../_img/python_example_chain.png .. |image3| image:: ../../_img/python_socket.png .. |image4| image:: ../../_img/test_py_math.png .. |image5| image:: ../../_img/test_py_array_socket.png .. |image6| image:: ../../_img/test_py_binascii.png .. |image7| image:: ../../_img/test_py_builtins.png .. |image8| image:: ../../_img/test_py_collections.png .. |image9| image:: ../../_img/test_py_gc.png .. |image10| image:: ../../_img/test_py_hashlib.png .. |image11| image:: ../../_img/test_py_heapq.png .. |image12| image:: ../../_img/test_py_datetime.png .. |image13| image:: ../../_img/test_py_json.png .. |image14| image:: ../../_img/test_py_random.png .. |image15| image:: ../../_img/test_py_re.png .. |image16| image:: ../../_img/test_py_struct.png .. |image17| image:: ../../_img/test_py_time.png .. |image18| image:: ../../_img/test_py_ctypes.png