Skip to content
Snippets Groups Projects
events.py 4.07 KiB
Newer Older
  • Learn to ignore specific revisions
  • istmxrein's avatar
    istmxrein committed
    import json
    import logging
    import os
    import threading
    import time
    import traceback
    from enum import Enum
    from ipaddress import IPv4Address
    from typing import List, Optional, Callable
    
    import pika
    import pika.exceptions
    from pydantic import BaseModel
    
    # Module-level logger; pika's own logger is capped at WARNING.
    logger = logging.getLogger(__name__)
    logging.getLogger("pika").setLevel(logging.WARNING)
    
    # Broker settings are read from the environment once, at import time.
    # An unset *or empty* variable falls back to 'localhost' for the address
    # and to None for the credentials.
    BROKER_ADDRESS = os.environ.get('BROKER_ADDRESS') or 'localhost'
    BROKER_USER = os.environ.get('BROKER_USER') or None
    BROKER_PASSWORD = os.environ.get('BROKER_PASSWORD') or None
    
    # Annotation only: no value is assigned to this name here.
    custom_callback: Callable
    
    
    class EventType(str, Enum):
        """
        Kind of change an event reports.

        The ``str`` mix-in makes every member compare equal to its plain
        string value (e.g. ``EventType.CREATE == 'create'``).
        """
        CREATE = 'create'
        UPDATE = 'update'
        DELETE = 'delete'
    
    
    class Host(BaseModel):
        """
        Representation of a host object with id.

        Used directly for the ``controller`` and ``switch`` of a ``Cube`` and
        as the base class of ``Pi``.
        """
        id: int  # numeric identifier of the host
        mac: str  # kept as a plain string; no format check is declared here
        ipv4_address: IPv4Address  # typed as an IPv4 address, not a raw string
        hostname: str
    
    
    class Pi(Host):
        """
        Representation of a pi object with host id.

        Extends ``Host`` with the pi-specific fields below.
        """
        serial: bytes
        display: bool
        position: int
        # Explicit ``None`` default: a bare ``Optional[str]`` is optional under
        # pydantic v1 but *required* under pydantic v2. Spelling the default out
        # keeps the field optional regardless of the installed version.
        ssh_host_ed25519_key: Optional[str] = None
    
    
    class Cube(BaseModel):
        """
        Representation of a cube object together with its hosts.

        A cube is made up of one controller, one switch, one head pi and a
        list of worker pis.
        """
        id: int
        controller: Host
        switch: Host
        head: Pi
        workers: List[Pi]
    
    
    class Event(BaseModel):
        """
        Message envelope: what kind of change happened, when, and to which cube.
        """
        type: EventType
        timestamp: float
        payload: Cube
    
    
    def test_connection():
        """
        Check that the broker is reachable.

        Opens a connection via ``get_connection()``, opens a channel on it and
        closes the connection again.

        Raises:
            Exception: whatever the connection attempt raised; the error is
                logged first and then re-raised unchanged.
        """
        try:
            connection = get_connection()
            try:
                # Opening a channel proves the connection is actually usable.
                connection.channel()
            finally:
                # Always release the connection, even if channel() failed.
                # Skip close() on an already-dead connection so the original
                # error is not masked by a "wrong state" error.
                if connection.is_open:
                    connection.close()
        except Exception as e:
            logger.error(f"Failed to connect to broker; message: {e}")
            raise
    
    
    def get_connection():
        """
        Open a blocking connection to the broker at ``BROKER_ADDRESS``.

        Plain credentials built from ``BROKER_USER`` / ``BROKER_PASSWORD`` are
        attached only when ``BROKER_USER`` is set; otherwise the connection
        parameters carry just the host.

        Returns:
            A new ``pika.BlockingConnection``.
        """
        if BROKER_USER is None:
            params = pika.ConnectionParameters(host=BROKER_ADDRESS)
        else:
            creds = pika.PlainCredentials(username=BROKER_USER, password=BROKER_PASSWORD)
            params = pika.ConnectionParameters(host=BROKER_ADDRESS, credentials=creds)
        return pika.BlockingConnection(params)
    
    
    class EventListener(threading.Thread):
        """
        Thread that consumes events from a fanout exchange and hands them to a callback.

        ``run()`` connects to the broker, declares the exchange and a durable
        queue, binds them and consumes until a channel error or ``SystemExit``
        stops it. Connection losses are recovered by reconnecting after
        ``retry_delay`` seconds.
        """

        def __init__(self, exchange: str, queue: str, callback: Callable, retry_delay: float = 5.0):
            """
            Args:
                exchange: name of the fanout exchange to bind to.
                queue: name of the durable queue to consume from.
                callback: called with the parsed ``Event`` for every message.
                retry_delay: seconds to wait before reconnecting after the
                    connection was lost; ``0`` reconnects immediately.
            """
            super().__init__()
            # Checked at the top of every (re)connect cycle in run().
            self._is_interrupted = False
            self.exchange: str = exchange
            self.queue: str = queue
            self.callback: Callable = callback
            self.retry_delay: float = retry_delay
            self.channel = None
            self.connection = None

        def default_callback(self, ch, method, properties, body):
            """
            Message handler registered with the channel.

            Decodes ``body`` as UTF-8 JSON, parses it into an ``Event``, passes it
            to ``self.callback`` and acknowledges the message. Any exception is
            logged and swallowed so the consumer keeps running.
            """
            try:
                event = Event.parse_obj(json.loads(body.decode('utf-8')))
                self.callback(event)
                # Ack only after the callback succeeded.
                ch.basic_ack(delivery_tag=method.delivery_tag)
                logger.info("Event processed succesful")
            except Exception as e:
                # NOTE(review): a failed message is neither acked nor nacked here,
                # so it stays unacknowledged on this channel — confirm that this
                # is the intended failure policy.
                traceback.print_exc()
                logger.error(f"Failed to process Event; message {e}")

        def _close_connection(self):
            """Close the current broker connection if it is still open."""
            connection = self.connection
            if connection is None or not connection.is_open:
                return
            try:
                connection.close()
            except pika.exceptions.AMQPError as err:
                # Best effort: the connection is being discarded anyway.
                logger.warning("Failed to close broker connection: %s", err)

        def run(self):
            """
            Consume events until stopped.

            Stops on a channel error or ``SystemExit``; reconnects after
            ``retry_delay`` seconds on every other connection error. The
            connection of each cycle is closed before the next one starts.
            """
            while not self._is_interrupted:
                try:
                    self.connection = get_connection()
                    self.channel = self.connection.channel()
                    self.channel.exchange_declare(exchange=self.exchange, exchange_type='fanout')
                    self.channel.queue_declare(queue=self.queue, durable=True, exclusive=False, auto_delete=False)
                    self.channel.queue_bind(exchange=self.exchange, queue=self.queue)
                    self.channel.basic_consume(queue=self.queue, on_message_callback=self.default_callback, auto_ack=False)
                    try:
                        self.channel.start_consuming()
                    except SystemExit:
                        logger.info("System exit. Close Broker Connection...")
                        self.channel.stop_consuming()
                        # The connection itself is closed in the finally below.
                        break
                except pika.exceptions.ConnectionClosedByBroker:
                    # Recoverable: fall through to the delayed reconnect.
                    pass
                # Do not recover on channel errors
                except pika.exceptions.AMQPChannelError as err:
                    logger.error(f"Caught a channel error: {err}, stopping...")
                    break
                # Recover on all other connection errors
                except pika.exceptions.AMQPConnectionError:
                    logger.error("Connection was closed, retrying...")
                finally:
                    # Runs on every exit path (retry, break, unexpected error) so
                    # no connection is left open when the cycle ends.
                    self._close_connection()
                # Back off before reconnecting so an unreachable broker does not
                # turn this loop into a busy spin.
                if self.retry_delay > 0:
                    time.sleep(self.retry_delay)