Skip to content
Snippets Groups Projects
meta.py 5.33 KiB
Newer Older
  • Learn to ignore specific revisions
  • istmxrein's avatar
    istmxrein committed
    import json
    import logging
    import threading
    
    import time
    
    istmxrein's avatar
    istmxrein committed
    import traceback
    from enum import Enum
    from ipaddress import IPv4Address
    
    istmxrein's avatar
    istmxrein committed
    from typing import List, Optional, Callable, Dict
    
    Alfred Ashur Oshana's avatar
    Alfred Ashur Oshana committed
    from datetime import datetime
    
    istmxrein's avatar
    istmxrein committed
    import pika
    import pika.exceptions
    from pydantic import BaseModel
    
    
    from pilab.events import utils
    
    istmxrein's avatar
    istmxrein committed
    
    
    logger = logging.getLogger(__name__)
    
    istmxrein's avatar
    istmxrein committed
    
    
    STREAM_NAME = "meta.cube"
    
    
    
    def convert_to_optional(schema):
    
    istmxrein's avatar
    istmxrein committed
        return {k: Optional[v] for k, v in schema.__annotations__.items()}
    
    istmxrein's avatar
    istmxrein committed
    
    
    class Host(BaseModel):
        """
        representation of host object with id
        """
        id: int
        mac: str
        ipv4_address: IPv4Address
        hostname: str
    
    Alfred Ashur Oshana's avatar
    Alfred Ashur Oshana committed
        model: str
    
    istmxrein's avatar
    istmxrein committed
    
    
    class Switch(Host):
        """
        representation of host object with id
        """
        poe: bool
    
    class Controller(Host):
        """
        representation of host object with id
        """
    
    Alfred Ashur Oshana's avatar
    Alfred Ashur Oshana committed
        pass
    
    istmxrein's avatar
    istmxrein committed
    
    
    class HostUpdate(Host):
        """
        representation of host for update
        """
        __annotations__ = convert_to_optional(Host)
    
    
    
    class Pi_old(Host):
    
    istmxrein's avatar
    istmxrein committed
        """
        representation of pi object with host id
        """
        serial: bytes
        position: int
        ssh_host_ed25519_key: Optional[str]
    
    Alfred Ashur Oshana's avatar
    Alfred Ashur Oshana committed
    class Pi_legacy(Host):
    
        """
        representation of pi object with host id
        """
        serial: bytes
        position: int
        key_id: Optional[int]
        image_id: Optional[int]
        user_data_id: Optional[int]
        playbook_id: Optional[int]
        cube_id: int
    
    Alfred Ashur Oshana's avatar
    Alfred Ashur Oshana committed
    class Pi(Host):
        """
        representation of pi object with host id
        """
        serial: bytes
        position: int
        cube_id: int
    
    
    class PiUpdate(HostUpdate):
        """
        representation of pi object for update
        """
        __annotations__ = convert_to_optional(Pi)
    
    istmxrein's avatar
    istmxrein committed
    
    
    class Cube(BaseModel):
        """
        representation of cube object with host ids
        """
        id: int
    
        controller: Optional[Controller]
    
        pis: List[Pi]
    
    class UserData(BaseModel):
        id: int
        owner: str
        data: str
    
    class Image(BaseModel):
        id: int
        name: str
        version: str
        owner: Optional[str]
        size: Optional[int]
        public: bool
        cloud_init: bool
        script_id: Optional[int]
        change: datetime
    
    
    
    class ImageBinding(BaseModel):
        """
        representation of Image Binding object
        """
    
    Alfred Ashur Oshana's avatar
    Alfred Ashur Oshana committed
        pi_id: int
        image_id: Optional[int]
        user_data_id: Optional[int]
    
    class Script(BaseModel):
        id: int
        name: str
        owner: str
        read_only: bool
        script: str
        script_chroot: str
    
    
    class Key(BaseModel):
        """
        representation of key object
        """
        id: int
        host_key: str
        owner: str
    
    class KeyBinding(BaseModel):
        """
        representation of key_binding object
        """
        pi_id: int
        key_id: str
    
    Alfred Ashur Oshana's avatar
    Alfred Ashur Oshana committed
    
    
    class EventType(str, Enum):
        CREATE = 'create'
        UPDATE = 'update'
        DELETE = 'delete'
    
    istmxrein's avatar
    istmxrein committed
    
    
    class Event(BaseModel):
        type: EventType
        timestamp: float
        payload: Cube
    
    
    
    istmxrein's avatar
    istmxrein committed
    class MetaListener(threading.Thread):
    
    istmxrein's avatar
    istmxrein committed
        def __init__(self, callback: Callable, queue: str = STREAM_NAME, offset="first"):
    
    istmxrein's avatar
    istmxrein committed
            super(MetaListener, self).__init__()
    
    istmxrein's avatar
    istmxrein committed
            self.queue: str = queue
            self.callback: Callable = callback
    
    istmxrein's avatar
    istmxrein committed
            self.offset = offset
    
    istmxrein's avatar
    istmxrein committed
            self.channel = None
            self.connection = None
    
    istmxrein's avatar
    istmxrein committed
            self._prefetch_count = 1
    
    istmxrein's avatar
    istmxrein committed
        def default_callback(self, ch, method, properties, body):
            try:
                event = Event.parse_obj(json.loads(body.decode('utf-8')))
    
    istmxrein's avatar
    istmxrein committed
                offset = properties.headers['x-stream-offset']
                self.callback(event, offset)
    
    istmxrein's avatar
    istmxrein committed
                self.channel.basic_ack(method.delivery_tag)
    
    istmxrein's avatar
    istmxrein committed
                self.offset = offset
    
    istmxrein's avatar
    istmxrein committed
                logger.info(f"Event processed succesful")
            except Exception as e:
                traceback.print_exc()
    
    istmxrein's avatar
    istmxrein committed
                logger.error(f"Failed to process Meta event; message {e}")
                raise
    
    istmxrein's avatar
    istmxrein committed
    
        def run(self):
            while True:
                try:
    
                    self.connection = utils.get_blocking_connection()
    
    istmxrein's avatar
    istmxrein committed
                    self.channel = self.connection.channel()
    
    istmxrein's avatar
    istmxrein committed
                    self.channel.queue_declare(queue=self.queue, durable=True, exclusive=False, auto_delete=False,
                                               arguments={"x-queue-type": "stream"})
                    self.channel.basic_qos(prefetch_count=self._prefetch_count)
                    self.channel.basic_consume(queue=self.queue, on_message_callback=self.default_callback, auto_ack=False,
                                               arguments={"x-stream-offset": self.offset})
    
    
    istmxrein's avatar
    istmxrein committed
                    try:
                        self.channel.start_consuming()
                    except SystemExit:
                        logger.info(f"System exit. Close Broker Connection...")
                        self.channel.stop_consuming()
                        self.connection.close()
                        break
                except pika.exceptions.ConnectionClosedByBroker:
    
                    logger.error(f"Connection was closed, retrying...")
                    time.sleep(1)
    
    istmxrein's avatar
    istmxrein committed
                    continue
                # Do not recover on channel errors
                except pika.exceptions.AMQPChannelError as err:
                    logger.error(f"Caught a channel error: {err}, stopping...")
    
    istmxrein's avatar
    istmxrein committed
                    continue
    
    istmxrein's avatar
    istmxrein committed
                # Recover on all other connection errors
                except pika.exceptions.AMQPConnectionError:
    
                    logger.error(f"Connection closed unexpected, retrying...")
                    time.sleep(1)
    
    istmxrein's avatar
    istmxrein committed
                    continue
    
    istmxrein's avatar
    istmxrein committed
                except Exception:
                    logger.error(f"Unexpected error occured, retrying...")
                    time.sleep(1)
                    continue