You need to sign in or sign up before continuing.
Newer
Older
import traceback
from enum import Enum
from ipaddress import IPv4Address
import pika
import pika.exceptions
from pydantic import BaseModel
from pilab.events import utils
logger = logging.getLogger(__name__)
def convert_to_optional(schema):
    """
    Build an annotation mapping in which every annotation declared directly on
    ``schema`` is wrapped in ``Optional``.

    :param schema: class whose own ``__annotations__`` are read
    :return: dict mapping each annotated field name to ``Optional[<type>]``
    """
    # Only annotations declared on ``schema`` itself are visible through
    # ``__annotations__``; inherited fields are not included.
    return {k: Optional[v] for k, v in schema.__annotations__.items()}
class Host(BaseModel):
    """
    Base model for a host entry.

    Carries the identifying fields shared by the more specific host models
    that subclass it.
    """

    # numeric identifier of the host
    id: int
    mac: str
    ipv4_address: IPv4Address
    hostname: str
class Switch(Host):
    """
    Host model for a switch: the Host fields plus ``model`` and ``poe``.
    """

    model: str
    # presumably Power-over-Ethernet capability -- confirm with the data source
    poe: bool
class Controller(Host):
    """
    Host model for a controller: the Host fields plus ``model``.
    """

    model: str
class HostUpdate(Host):
    """
    Update payload for a host.

    The class annotations are replaced by the mapping returned from
    ``convert_to_optional(Host)``, i.e. the annotations declared on ``Host``
    re-declared through that helper.
    """

    __annotations__ = convert_to_optional(Host)
class Pi(Host):
    """
    Host model for a pi: the Host fields plus serial, display, position and
    an optional SSH host key.
    """

    serial: bytes
    display: bool
    position: int
    # presumably the ed25519 SSH host public key, may be absent -- confirm
    ssh_host_ed25519_key: Optional[str]
class PiUpdate(HostUpdate):
    """
    Update payload for a pi.

    Inherits from ``HostUpdate`` and sets its own annotations to the mapping
    returned from ``convert_to_optional(Pi)``, which covers the annotations
    declared directly on ``Pi``.
    """

    __annotations__ = convert_to_optional(Pi)
class Cube(BaseModel):
    """
    Model of a cube: its id together with an optional ``Controller`` and a
    ``Switch`` object.
    """

    id: int
    # a cube may come without a controller
    controller: Optional[Controller]
    switch: Switch
class EventType(str, Enum):
    """
    Kind of change an event describes; members are also plain strings.
    """

    CREATE = 'create'
    UPDATE = 'update'
    DELETE = 'delete'
class Event(BaseModel):
    """
    Envelope for one event: its ``EventType``, a float timestamp and the
    ``Cube`` it carries as payload.
    """

    type: EventType
    # NOTE(review): unit/epoch of the timestamp is not visible here -- confirm
    timestamp: float
    payload: Cube
def __init__(self, callback: Callable, queue: str = STREAM_NAME, offset="first"):
def default_callback(self, ch, method, properties, body):
    """
    Decode one delivery into an ``Event`` and hand it to the user callback.

    The body is decoded as UTF-8 JSON and validated with ``Event.parse_obj``;
    the offset is read from the ``x-stream-offset`` message header. Both are
    passed to ``self.callback(event, offset)``.

    :param ch: channel argument supplied with the delivery (unused here)
    :param method: delivery method argument (unused here)
    :param properties: message properties; ``headers`` must contain
        ``x-stream-offset``
    :param body: raw message payload as bytes
    :raises Exception: any failure while decoding, validating or running the
        callback is logged with its traceback and re-raised
    """
    # NOTE(review): no basic_ack is issued in this method; if the consumer is
    # registered with auto_ack=False, confirm acknowledgement happens elsewhere.
    try:
        event = Event.parse_obj(json.loads(body.decode('utf-8')))
        offset = properties.headers['x-stream-offset']
        self.callback(event, offset)
    except Exception as e:
        # logger.exception attaches the traceback to the log record, so it
        # reaches the configured handlers instead of only stderr.
        logger.exception("Failed to process Meta event; message %s", e)
        raise
    else:
        logger.info("Event processed succesful")
# NOTE(review): the enclosing method header and loop header are not visible in
# this view; the break/continue statements below imply a surrounding retry loop.
# Open a broker connection through the project helper.
self.connection = utils.get_blocking_connection()
# NOTE(review): self.channel is used below but its assignment is not visible
# here -- confirm it is created from self.connection.
# Declare the queue as a durable, non-exclusive, non-auto-delete stream queue.
self.channel.queue_declare(queue=self.queue, durable=True, exclusive=False, auto_delete=False,
arguments={"x-queue-type": "stream"})
# Apply the configured prefetch count to the channel.
self.channel.basic_qos(prefetch_count=self._prefetch_count)
# Register default_callback without auto-ack, starting at the offset in self.offset.
self.channel.basic_consume(queue=self.queue, on_message_callback=self.default_callback, auto_ack=False,
arguments={"x-stream-offset": self.offset})
try:
self.channel.start_consuming()
except SystemExit:
# On interpreter exit: stop consuming, close the connection and leave the loop.
logger.info(f"System exit. Close Broker Connection...")
self.channel.stop_consuming()
self.connection.close()
break
except pika.exceptions.ConnectionClosedByBroker:
# Broker closed the connection: wait one second, then go to the next iteration.
logger.error(f"Connection was closed, retrying...")
time.sleep(1)
continue
# Do not recover on channel errors
except pika.exceptions.AMQPChannelError as err:
# NOTE(review): the message says "stopping" but no break/raise is visible
# after it in this view -- confirm the loop is actually left here.
logger.error(f"Caught a channel error: {err}, stopping...")
# Recover on all other connection errors
except pika.exceptions.AMQPConnectionError:
# Wait one second; the retry itself presumably comes from the enclosing loop.
logger.error(f"Connection closed unexpected, retrying...")
time.sleep(1)