Skip to content
Snippets Groups Projects
Commit 2af4c6a1 authored by istmxrein's avatar istmxrein
Browse files

init project

- add ci definition publish to gitlab package registry
- add common event methods
- add common log config
- add common utils function
parent ce891bc9
Branches
No related tags found
No related merge requests found
image: python:latest
run:
script:
- pip install build twine
- python setup.py sdist bdist_wheel
- TWINE_PASSWORD=${CI_JOB_TOKEN} TWINE_USERNAME=gitlab-ci-token python -m twine upload --repository-url ${CI_API_V4_URL}/projects/${CI_PROJECT_ID}/packages/pypi dist/*
import threading
from pydantic import BaseModel
import logging
import ctypes
LOG_LEVEL="INFO"
class LogConfig(BaseModel):
"""Logging configuration to be set for the server"""
LOG_FORMAT: str = "%(levelprefix)s | %(asctime)s | %(message)s"
LOG_LEVEL: str = LOG_LEVEL
# Logging config
version = 1
disable_existing_loggers = False
formatters = {
"default": {
"()": "uvicorn.logging.DefaultFormatter",
"fmt": LOG_FORMAT,
"datefmt": "%Y-%m-%d %H:%M:%S",
},
}
handlers = {
"default": {
"formatter": "default",
"class": "logging.StreamHandler",
"stream": "ext://sys.stderr",
},
}
loggers = {
'': {"handlers": ["default"], "level": LOG_LEVEL, 'propagate': False},
'uvicorn': {"handlers": ["default"], "level": LOG_LEVEL, 'propagate': False}
}
class EndpointFilter(logging.Filter):
def filter(self, record: logging.LogRecord) -> bool:
return record.getMessage().find("/health") == -1
import json
import logging
import os
import threading
import traceback
from enum import Enum
from ipaddress import IPv4Address
from typing import List, Optional, Callable
import pika
import pika.exceptions
from pydantic import BaseModel
logger = logging.getLogger(__name__)
logging.getLogger("pika").setLevel(logging.WARNING)
BROKER_ADDRESS = os.getenv('BROKER_ADDRESS') or 'localhost'
BROKER_USER = os.getenv('BROKER_USER') or None
BROKER_PASSWORD = os.getenv('BROKER_PASSWORD') or None
custom_callback: Callable
class EventType(str, Enum):
CREATE = 'create'
UPDATE = 'update'
DELETE = 'delete'
class Host(BaseModel):
"""
representation of host object with id
"""
id: int
mac: str
ipv4_address: IPv4Address
hostname: str
class Pi(Host):
"""
representation of pi object with host id
"""
serial: bytes
display: bool
position: int
ssh_host_ed25519_key: Optional[str]
pass
class Cube(BaseModel):
"""
representation of cube object with host ids
"""
id: int
controller: Host
switch: Host
head: Pi
workers: List[Pi]
pass
class Event(BaseModel):
type: EventType
timestamp: float
payload: Cube
pass
def test_connection():
try:
# Create a connection to the broker
connection = get_connection()
# Declare a channel and a queue
channel = connection.channel()
# Close the connection
connection.close()
except Exception as e:
logger.error(f"Failed to connect to broker; message: {e}")
raise
def get_connection():
if BROKER_USER is not None:
credentials = pika.PlainCredentials(username=BROKER_USER, password=BROKER_PASSWORD)
parameters = pika.ConnectionParameters(host=BROKER_ADDRESS, credentials=credentials)
return pika.BlockingConnection(parameters)
else:
parameters = pika.ConnectionParameters(host=BROKER_ADDRESS)
return pika.BlockingConnection(parameters)
class EventListener(threading.Thread):
def __init__(self, exchange: str, queue: str, callback: Callable):
super(EventListener, self).__init__()
self._is_interrupted = False
self.exchange: str = exchange
self.queue: str = queue
self.callback: Callable = callback
self.channel = None
self.connection = None
def default_callback(self, ch, method, properties, body):
try:
event = Event.parse_obj(json.loads(body.decode('utf-8')))
self.callback(event)
ch.basic_ack(delivery_tag=method.delivery_tag)
logger.info(f"Event processed succesful")
except Exception as e:
traceback.print_exc()
logger.error(f"Failed to process Event; message {e}")
def run(self):
while True:
try:
self.connection = get_connection()
self.channel = self.connection.channel()
self.channel.exchange_declare(exchange=self.exchange, exchange_type='fanout')
self.channel.queue_declare(queue=self.queue, durable=True, exclusive=False, auto_delete=False)
self.channel.queue_bind(exchange=self.exchange, queue=self.queue)
self.channel.basic_consume(queue=self.queue, on_message_callback=self.default_callback, auto_ack=False)
try:
self.channel.start_consuming()
except SystemExit:
logger.info(f"System exit. Close Broker Connection...")
self.channel.stop_consuming()
self.connection.close()
break
except pika.exceptions.ConnectionClosedByBroker:
continue
# Do not recover on channel errors
except pika.exceptions.AMQPChannelError as err:
logger.error(f"Caught a channel error: {err}, stopping...")
break
# Recover on all other connection errors
except pika.exceptions.AMQPConnectionError:
logger.error(f"Connection was closed, retrying...")
continue
import ctypes
import threading
def terminate_thread(thread: threading.Thread):
"""Terminates a python thread from another thread.
:param thread: a threading.Thread instance
"""
if not thread.is_alive():
return
exc = ctypes.py_object(SystemExit)
res = ctypes.pythonapi.PyThreadState_SetAsyncExc(
ctypes.c_long(thread.ident), exc)
if res == 0:
raise ValueError("nonexistent thread id")
elif res > 1:
# """if it returns a number greater than one, you're in trouble,
# and you should call it again with exc=NULL to revert the effect"""
ctypes.pythonapi.PyThreadState_SetAsyncExc(thread.ident, None)
raise SystemError("PyThreadState_SetAsyncExc failed")
setup.sh 0 → 100644
from setuptools import setup, find_packages
setup(
name='pilab',
version='0.1.0',
description='Shared-Libs for the pi-lab microservices',
url='https://code.fbi.h-da.de/api/v4/projects/27896/packages/pypi/pilab',
author='Max Reinheimer',
author_email='maximilian.reinheimer@stud.h-da.de',
license='MIT',
packages=find_packages(),
install_requires=['pika'],
classifiers=[
'Development Status :: 3 - Alpha',
'Operating System :: POSIX :: Linux',
'Programming Language :: Python :: 3.10',
],
)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment