Skip to content
Snippets Groups Projects
meta.py 3.96 KiB
Newer Older
  • Learn to ignore specific revisions
  • istmxrein's avatar
    istmxrein committed
    import json
    import logging
    import threading
    
    import time
    
    istmxrein's avatar
    istmxrein committed
    import traceback
    from enum import Enum
    from ipaddress import IPv4Address
    
    istmxrein's avatar
    istmxrein committed
    from typing import List, Optional, Callable, Dict
    
    istmxrein's avatar
    istmxrein committed
    
    import pika
    import pika.exceptions
    from pydantic import BaseModel
    
    
    from pilab.events import utils
    
    istmxrein's avatar
    istmxrein committed
    
    
    logger = logging.getLogger(__name__)
    
    istmxrein's avatar
    istmxrein committed
    
    
    STREAM_NAME = "meta.cube"
    
    
    
    def convert_to_optional(schema):
    
    istmxrein's avatar
    istmxrein committed
        return {k: Optional[v] for k, v in schema.__annotations__.items()}
    
    istmxrein's avatar
    istmxrein committed
    
    
    class Host(BaseModel):
        """
        representation of host object with id
        """
        id: int
        mac: str
        ipv4_address: IPv4Address
        hostname: str
    
    
    
    class HostUpdate(Host):
        """
        representation of host for update
        """
        __annotations__ = convert_to_optional(Host)
    
    
    
    istmxrein's avatar
    istmxrein committed
    class Pi(Host):
        """
        representation of pi object with host id
        """
        serial: bytes
        display: bool
        position: int
        ssh_host_ed25519_key: Optional[str]
    
    
    
    class PiUpdate(HostUpdate):
        """
        representation of pi object for update
        """
        __annotations__ = convert_to_optional(Pi)
    
    istmxrein's avatar
    istmxrein committed
    
    
    class Cube(BaseModel):
        """
        representation of cube object with host ids
        """
        id: int
        controller: Host
        switch: Host
        head: Pi
        workers: List[Pi]
    
    
    
    class EventType(str, Enum):
        CREATE = 'create'
        UPDATE = 'update'
        DELETE = 'delete'
    
    istmxrein's avatar
    istmxrein committed
    
    
    class Event(BaseModel):
        type: EventType
        timestamp: float
        payload: Cube
    
    
    
    istmxrein's avatar
    istmxrein committed
    class MetaListener(threading.Thread):
    
    istmxrein's avatar
    istmxrein committed
        def __init__(self, callback: Callable, queue: str = STREAM_NAME, offset="first"):
    
    istmxrein's avatar
    istmxrein committed
            super(MetaListener, self).__init__()
    
    istmxrein's avatar
    istmxrein committed
            self.queue: str = queue
            self.callback: Callable = callback
    
    istmxrein's avatar
    istmxrein committed
            self.offset = offset
    
    istmxrein's avatar
    istmxrein committed
            self.channel = None
            self.connection = None
    
    istmxrein's avatar
    istmxrein committed
            self._prefetch_count = 1
    
    istmxrein's avatar
    istmxrein committed
        def default_callback(self, ch, method, properties, body):
            try:
                event = Event.parse_obj(json.loads(body.decode('utf-8')))
    
    istmxrein's avatar
    istmxrein committed
                offset = properties.headers['x-stream-offset']
                self.callback(event, offset)
    
    istmxrein's avatar
    istmxrein committed
                self.channel.basic_ack(method.delivery_tag)
    
    istmxrein's avatar
    istmxrein committed
                self.offset = offset
    
    istmxrein's avatar
    istmxrein committed
                logger.info(f"Event processed succesful")
            except Exception as e:
                traceback.print_exc()
    
    istmxrein's avatar
    istmxrein committed
                logger.error(f"Failed to process Meta event; message {e}")
                raise
    
    istmxrein's avatar
    istmxrein committed
    
        def run(self):
            while True:
                try:
    
                    self.connection = utils.get_blocking_connection()
    
    istmxrein's avatar
    istmxrein committed
                    self.channel = self.connection.channel()
    
    istmxrein's avatar
    istmxrein committed
                    self.channel.queue_declare(queue=self.queue, durable=True, exclusive=False, auto_delete=False,
                                               arguments={"x-queue-type": "stream"})
                    self.channel.basic_qos(prefetch_count=self._prefetch_count)
                    self.channel.basic_consume(queue=self.queue, on_message_callback=self.default_callback, auto_ack=False,
                                               arguments={"x-stream-offset": self.offset})
    
    
    istmxrein's avatar
    istmxrein committed
                    try:
                        self.channel.start_consuming()
                    except SystemExit:
                        logger.info(f"System exit. Close Broker Connection...")
                        self.channel.stop_consuming()
                        self.connection.close()
                        break
                except pika.exceptions.ConnectionClosedByBroker:
    
                    logger.error(f"Connection was closed, retrying...")
                    time.sleep(1)
    
    istmxrein's avatar
    istmxrein committed
                    continue
                # Do not recover on channel errors
                except pika.exceptions.AMQPChannelError as err:
                    logger.error(f"Caught a channel error: {err}, stopping...")
    
    istmxrein's avatar
    istmxrein committed
                    continue
    
    istmxrein's avatar
    istmxrein committed
                # Recover on all other connection errors
                except pika.exceptions.AMQPConnectionError:
    
                    logger.error(f"Connection closed unexpected, retrying...")
                    time.sleep(1)
    
    istmxrein's avatar
    istmxrein committed
                    continue
    
    istmxrein's avatar
    istmxrein committed
                except Exception:
                    logger.error(f"Unexpected error occured, retrying...")
                    time.sleep(1)
                    continue