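# (Extraction residue: the page-navigation labels "Newer" / "Older" appeared here; they are not part of the module.)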
import traceback
from enum import Enum
from ipaddress import IPv4Address
import pika
import pika.exceptions
from pydantic import BaseModel
from pilab.events import utils
# Module-level logger, named after this module (__name__).
logger = logging.getLogger(__name__)
def convert_to_optional(schema):
    """
    Build an annotation mapping in which every annotation of *schema* is optional.

    :param schema: object (normally a class) whose ``__annotations__`` mapping is read
    :return: new dict mapping each annotated name to ``Optional[<original annotation>]``
    """
    # Optional[Optional[X]] collapses to Optional[X], so annotations that are
    # already optional come back unchanged.
    return {name: Optional[hint] for name, hint in schema.__annotations__.items()}
class Host(BaseModel):
    """
    A host record: numeric id plus MAC, IPv4 address and hostname.
    """
    # Numeric identifier of the host.
    id: int
    # MAC address, kept as a plain string (no format check is declared here).
    mac: str
    # IPv4 address, typed with ipaddress.IPv4Address.
    ipv4_address: IPv4Address
    # Host name as a plain string.
    hostname: str
class HostUpdate(Host):
    """
    Host variant used for updates.
    """
    # The class annotations are taken from convert_to_optional(Host) rather
    # than being written out field by field.
    __annotations__ = convert_to_optional(Host)
class Pi(Host):
    """
    A pi record: a Host extended with pi-specific fields.
    """
    # Serial, stored as bytes.
    serial: bytes
    # Boolean display flag.
    display: bool
    # Integer position.
    position: int
    # ed25519 SSH host key as a string; annotated Optional, so it may be None.
    ssh_host_ed25519_key: Optional[str]
class PiUpdate(HostUpdate):
    """
    Pi variant used for updates; derives from HostUpdate.
    """
    # The class annotations are taken from convert_to_optional(Pi) rather
    # than being written out field by field.
    __annotations__ = convert_to_optional(Pi)
class Cube(BaseModel):
    """
    A cube record: one id grouping a controller, a switch, a head pi and worker pis.
    """
    # Numeric identifier of the cube.
    id: int
    # Controller, modelled as a Host.
    controller: Host
    # Switch, modelled as a Host.
    switch: Host
    # Head node, modelled as a Pi.
    head: Pi
    # Worker nodes, a list of Pi objects.
    workers: List[Pi]
class EventType(str, Enum):
    """
    Kinds of event; every member is also a str equal to its value.
    """
    CREATE = 'create'
    UPDATE = 'update'
    DELETE = 'delete'
class Event(BaseModel):
    """
    Event envelope: an event type, a timestamp and the Cube it concerns.
    """
    # One of the EventType members.
    type: EventType
    # Timestamp as a float (unit/epoch not specified in this model).
    timestamp: float
    # The Cube carried by the event.
    payload: Cube
def __init__(self, callback: Callable, queue: str = STREAM_NAME, offset="first"):
def default_callback(self, ch, method, properties, body):
    """
    Decode one delivered message into an Event and hand it to the user callback.

    This method is what gets registered as ``on_message_callback`` with
    ``basic_consume``; only ``properties`` and ``body`` are read here.

    :param ch: channel argument supplied by the consumer machinery (unused here)
    :param method: delivery-method argument supplied by the consumer machinery (unused here)
    :param properties: message properties; ``properties.headers`` must contain
        the key ``'x-stream-offset'``
    :param body: message body, bytes holding a UTF-8 encoded JSON document
    :raises Exception: any failure while decoding, validating or running
        ``self.callback`` is logged and then re-raised unchanged
    """
    # NOTE(review): the consumer is registered with auto_ack=False, but nothing
    # in this method acknowledges the delivery, and self.callback only receives
    # (event, offset) - confirm that acknowledgement happens somewhere.
    try:
        decoded = json.loads(body.decode('utf-8'))
        event = Event.parse_obj(decoded)
        offset = properties.headers['x-stream-offset']
        self.callback(event, offset)
        logger.info("Event processed succesful")
    except Exception as e:
        # logger.exception logs at ERROR level and attaches the traceback, so
        # the stack trace goes through the logging configuration instead of
        # being printed directly to stderr.
        logger.exception("Failed to process Meta event; message %s", e)
        raise
self.connection = utils.get_blocking_connection()
self.channel.queue_declare(queue=self.queue, durable=True, exclusive=False, auto_delete=False,
arguments={"x-queue-type": "stream"})
self.channel.basic_qos(prefetch_count=self._prefetch_count)
self.channel.basic_consume(queue=self.queue, on_message_callback=self.default_callback, auto_ack=False,
arguments={"x-stream-offset": self.offset})
try:
self.channel.start_consuming()
except SystemExit:
logger.info(f"System exit. Close Broker Connection...")
self.channel.stop_consuming()
self.connection.close()
break
except pika.exceptions.ConnectionClosedByBroker:
logger.error(f"Connection was closed, retrying...")
time.sleep(1)
continue
# Do not recover on channel errors
except pika.exceptions.AMQPChannelError as err:
logger.error(f"Caught a channel error: {err}, stopping...")
# Recover on all other connection errors
except pika.exceptions.AMQPConnectionError:
logger.error(f"Connection closed unexpected, retrying...")
time.sleep(1)