Python

image0

Представляет собой функцию Python, на вход которой подаётся объект map (сообщение) - на выходе изменённый объект map

Инициализирует Python и загружает скрипт, далее считывает этот скрипт построчно до встречи с exit

Конфигурация узла

image1

В коде должно присутствовать return obj, иначе Error

Пример использования

Данные на вход:

image2

Этот json приходит на вход нашего скрипт-узла:

{"id":3,"in":{"DATA":{"connection_status":"Hello Redis"},"METADATA":{"chain":"test","from":"Generator","id":4,"time":1722413175620}},"name":"test","type":"debug"}

Этот блок кода мы вставляем в узел Python

import datetime
obj['DATA']['Notebook']= str(datetime.datetime.now())
return obj

Возвращаемые значения

{"id":3,"in":{"DATA":{"Notebook":"2024-07-31 12:15:39.730471","connection_status":"Hello Redis"},"METADATA":{"chain":"test","from":"Generator","id":4,"time":1722413739729}},"name":"test","type":"debug"}

image3

Видим, что в объект DATA была добавлена строка Notebook

Python TESTS

Тестирование Python заключается в проверке работы подключенных библиотек

Тестируем библиотеки

Math ✓

import math
obj['METADATA']['StorageWrite'] = {}
obj['METADATA']['StorageWrite']['key']= "test"+str((int)(math.pow(2,2)))
obj['METADATA']['StorageWrite']['path'] = "DATA.connection_status"
return obj

image4 В Redis записалось как test4

array ✓

from array import array
obj['DATA']['Notebook'] = {}
obj['DATA']['Notebook']= str(array('d',[1.1,6,7.77777777]))+str(array('b', b'is array'))
return obj

image5

binascii ✓

import binascii
obj['DATA']['Notebook'] = {}
obj['DATA']['Notebook']= str(binascii.hexlify(("somestring").encode("utf-8")))
return obj

image6

asyncio ✓

На код не ругается - значит работает

import asyncio
obj['DATA']['Notebook'] = {}
asyncio.sleep(1)
obj['DATA']['Notebook']= str("it works?")
return obj

bultins ✓

Даже без импорта работает

obj['DATA']['Notebook'] = {}
obj['DATA']['Notebook']= str("it works?")+str(MemoryError.text)
return obj

image7

collections ✓

import collections
d = collections.deque([1, 2, 3])
d.append(4)
d.appendleft(0)
obj['DATA']['Notebook'] = {}
obj['DATA']['Notebook']= str("it works?")+str(d)
return obj

image8

gc ✓

import gc
obj['DATA']['Notebook'] = {}
obj['DATA']['Notebook']= str(gc.collect())
return obj

image9

hashlib ✓

import hashlib
text = "Promuc"
obj['DATA']['Notebook'] = {}
obj['DATA']['Notebook']= str(hashlib.sha256(text.encode()).hexdigest())
return obj

image10

heapq ✓

import heapq
data = [4, 3, 1, 16, 6]
heapq.heapify(data)
heapq.heappush(data, 2)
smallest = heapq.heappop(data)
obj['DATA']['Notebook'] = {}
obj['DATA']['Notebook2'] = {}
obj['DATA']['Notebook']= str(smallest)
obj['DATA']['Notebook2']= str(data)
return obj

image11

datetime ✓

import datetime
obj['DATA']['Notebook'] = {}
obj['DATA']['Notebook']= str(datetime.datetime.now())
return obj

image12

json ✓

import json,datetime
data = {
    "test":"Ok",
    "time": datetime.datetime.now().hour,
    "check": "checked"
}
json_str = json.dumps(data)
obj['DATA']['Notebook'] = {}
obj['DATA']['Notebook']= str(json_str)
return obj

image13

random ✓

import random
obj['DATA']['Notebook'] = {}
obj['DATA']['Notebook']= str(random.randint(1,10))
return obj

image14

re ✓

import re
text = "some html tags use this structure <html> something </html><hr> "
pattern = r'<[^>]+>'
obj['DATA']['Notebook'] = {}
obj['DATA']['Notebook']= str(re.findall(pattern,text))
return obj

image15

struct ✓

import struct
obj['DATA']['Notebook'] = {}
obj['DATA']['Notebook']= str(struct.pack(">xBxxBx", 255, 128))
return obj

image16

time ✓

import time
local_time = time.localtime(time.time())
readtime = time.strftime("%Y-%m-%d %H:%M:%S", local_time)
obj['DATA']['Notebook'] = {}
obj['DATA']['Notebook']= str(readtime)
return obj

image17

ctypes ✓

import ctypes
libc = ctypes.CDLL('libc.so.6')
libc.strlen.argtypes = [ctypes.c_char_p]
libc.strlen.restype = ctypes.c_size_t
someStr = b"string for the test"
obj['DATA']['Notebook'] = {}
obj['DATA']['Notebook']= str(libc.strlen(someStr))
return obj

image18