diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml new file mode 100644 index 0000000000000000000000000000000000000000..5b0e2b693e31d17619924ba42d88411b6469b3b4 --- /dev/null +++ b/.gitlab-ci.yml @@ -0,0 +1,7 @@ +image: python:latest + +run: + script: + - pip install build twine + - python setup.py sdist bdist_wheel + - TWINE_PASSWORD=${CI_JOB_TOKEN} TWINE_USERNAME=gitlab-ci-token python -m twine upload --repository-url ${CI_API_V4_URL}/projects/${CI_PROJECT_ID}/packages/pypi dist/* diff --git a/pilab/__init__.py b/pilab/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/pilab/config.py b/pilab/config.py new file mode 100644 index 0000000000000000000000000000000000000000..a7167986876263d14c200f9c7690ec77887c17e7 --- /dev/null +++ b/pilab/config.py @@ -0,0 +1,39 @@ +import threading +from pydantic import BaseModel +import logging +import ctypes + +LOG_LEVEL="INFO" + +class LogConfig(BaseModel): + """Logging configuration to be set for the server""" + + LOG_FORMAT: str = "%(levelprefix)s | %(asctime)s | %(message)s" + LOG_LEVEL: str = LOG_LEVEL + + # Logging config + version = 1 + disable_existing_loggers = False + formatters = { + "default": { + "()": "uvicorn.logging.DefaultFormatter", + "fmt": LOG_FORMAT, + "datefmt": "%Y-%m-%d %H:%M:%S", + }, + } + handlers = { + "default": { + "formatter": "default", + "class": "logging.StreamHandler", + "stream": "ext://sys.stderr", + }, + } + loggers = { + '': {"handlers": ["default"], "level": LOG_LEVEL, 'propagate': False}, + 'uvicorn': {"handlers": ["default"], "level": LOG_LEVEL, 'propagate': False} + } + + +class EndpointFilter(logging.Filter): + def filter(self, record: logging.LogRecord) -> bool: + return record.getMessage().find("/health") == -1 diff --git a/pilab/events.py b/pilab/events.py new file mode 100644 index 0000000000000000000000000000000000000000..81bb6145929f6c11c88e85895ae0ca6ca1715c60 --- /dev/null +++ b/pilab/events.py @@ -0,0 +1,141 @@ +import json +import logging +import os +import threading +import traceback +from enum import Enum +from ipaddress import IPv4Address +from typing import List, Optional, Callable + +import pika +import pika.exceptions +from pydantic import BaseModel + +logger = logging.getLogger(__name__) +logging.getLogger("pika").setLevel(logging.WARNING) + +BROKER_ADDRESS = os.getenv('BROKER_ADDRESS') or 'localhost' +BROKER_USER = os.getenv('BROKER_USER') or None +BROKER_PASSWORD = os.getenv('BROKER_PASSWORD') or None + +custom_callback: Callable + + +class EventType(str, Enum): + CREATE = 'create' + UPDATE = 'update' + DELETE = 'delete' + + +class Host(BaseModel): + """ + representation of host object with id + """ + id: int + mac: str + ipv4_address: IPv4Address + hostname: str + + +class Pi(Host): + """ + representation of pi object with host id + """ + serial: bytes + display: bool + position: int + ssh_host_ed25519_key: Optional[str] + pass + + +class Cube(BaseModel): + """ + representation of cube object with host ids + """ + id: int + controller: Host + switch: Host + head: Pi + workers: List[Pi] + pass + + +class Event(BaseModel): + type: EventType + timestamp: float + payload: Cube + pass + + +def test_connection(): + try: + # Create a connection to the broker + connection = get_connection() + + # Declare a channel and a queue + channel = connection.channel() + + # Close the connection + connection.close() + except Exception as e: + logger.error(f"Failed to connect to broker; message: {e}") + raise + + +def get_connection(): + if BROKER_USER is not None: + credentials = pika.PlainCredentials(username=BROKER_USER, password=BROKER_PASSWORD) + parameters = pika.ConnectionParameters(host=BROKER_ADDRESS, credentials=credentials) + return pika.BlockingConnection(parameters) + else: + parameters = pika.ConnectionParameters(host=BROKER_ADDRESS) + return pika.BlockingConnection(parameters) + + +class EventListener(threading.Thread): + + def __init__(self, exchange: str, queue: str, callback: Callable): + super(EventListener, self).__init__() + self._is_interrupted = False + self.exchange: str = exchange + self.queue: str = queue + self.callback: Callable = callback + self.channel = None + self.connection = None + + def default_callback(self, ch, method, properties, body): + try: + event = Event.parse_obj(json.loads(body.decode('utf-8'))) + self.callback(event) + ch.basic_ack(delivery_tag=method.delivery_tag) + logger.info(f"Event processed succesful") + except Exception as e: + traceback.print_exc() + logger.error(f"Failed to process Event; message {e}") + + def run(self): + while True: + try: + self.connection = get_connection() + self.channel = self.connection.channel() + self.channel.exchange_declare(exchange=self.exchange, exchange_type='fanout') + self.channel.queue_declare(queue=self.queue, durable=True, exclusive=False, auto_delete=False) + self.channel.queue_bind(exchange=self.exchange, queue=self.queue) + self.channel.basic_consume(queue=self.queue, on_message_callback=self.default_callback, auto_ack=False) + try: + self.channel.start_consuming() + except SystemExit: + logger.info(f"System exit. Close Broker Connection...") + self.channel.stop_consuming() + self.connection.close() + break + except pika.exceptions.ConnectionClosedByBroker: + continue + # Do not recover on channel errors + except pika.exceptions.AMQPChannelError as err: + logger.error(f"Caught a channel error: {err}, stopping...") + break + # Recover on all other connection errors + except pika.exceptions.AMQPConnectionError: + logger.error(f"Connection was closed, retrying...") + continue diff --git a/pilab/utils.py b/pilab/utils.py new file mode 100644 index 0000000000000000000000000000000000000000..e10217d8b73c6d30bf89e866c23bdcf735e28c5f --- /dev/null +++ b/pilab/utils.py @@ -0,0 +1,22 @@ +import ctypes +import threading + + +def terminate_thread(thread: threading.Thread): + """Terminates a python thread from another thread. + + :param thread: a threading.Thread instance + """ + if not thread.is_alive(): + return + + exc = ctypes.py_object(SystemExit) + res = ctypes.pythonapi.PyThreadState_SetAsyncExc( + ctypes.c_long(thread.ident), exc) + if res == 0: + raise ValueError("nonexistent thread id") + elif res > 1: + # """if it returns a number greater than one, you're in trouble, + # and you should call it again with exc=NULL to revert the effect""" + ctypes.pythonapi.PyThreadState_SetAsyncExc(thread.ident, None) + raise SystemError("PyThreadState_SetAsyncExc failed") diff --git a/setup.sh b/setup.sh new file mode 100644 index 0000000000000000000000000000000000000000..852a72d46ef2317ef41295987dd91cf9874d93b8 --- /dev/null +++ b/setup.sh @@ -0,0 +1,18 @@ +from setuptools import setup, find_packages + +setup( + name='pilab', + version='0.1.0', + description='Shared-Libs for the pi-lab microservices', + url='https://code.fbi.h-da.de/api/v4/projects/27896/packages/pypi/pilab', + author='Max Reinheimer', + author_email='maximilian.reinheimer@stud.h-da.de', + license='MIT', + packages=find_packages(), + install_requires=['pika'], + classifiers=[ + 'Development Status :: 3 - Alpha', + 'Operating System :: POSIX :: Linux', + 'Programming Language :: Python :: 3.10', + ], +)