Skip to content
Snippets Groups Projects
Commit 469e43e5 authored by istmxrein's avatar istmxrein
Browse files

Add functions to dispatch and consume Events regarding power state

parent 5ab1bc1c
No related branches found
No related tags found
No related merge requests found
import json
import logging
import os
import threading
import traceback
from enum import Enum
......@@ -11,14 +10,9 @@ import pika
import pika.exceptions
from pydantic import BaseModel
logger = logging.getLogger(__name__)
logging.getLogger("pika").setLevel(logging.WARNING)
BROKER_ADDRESS = os.getenv('BROKER_ADDRESS') or 'localhost'
BROKER_USER = os.getenv('BROKER_USER') or None
BROKER_PASSWORD = os.getenv('BROKER_PASSWORD') or None
from pilab.events import utils
custom_callback: Callable
logger = logging.getLogger(__name__)
class EventType(str, Enum):
......@@ -67,36 +61,10 @@ class Event(BaseModel):
pass
def test_connection():
try:
# Create a connection to the broker
connection = get_connection()
# Declare a channel and a queue
channel = connection.channel()
# Close the connection
connection.close()
except Exception as e:
logger.error(f"Failed to connect to broker; message: {e}")
raise
def get_connection():
if BROKER_USER is not None:
credentials = pika.PlainCredentials(username=BROKER_USER, password=BROKER_PASSWORD)
parameters = pika.ConnectionParameters(host=BROKER_ADDRESS, credentials=credentials)
return pika.BlockingConnection(parameters)
else:
parameters = pika.ConnectionParameters(host=BROKER_ADDRESS)
return pika.BlockingConnection(parameters)
class EventListener(threading.Thread):
def __init__(self, exchange: str, queue: str, callback: Callable):
super(EventListener, self).__init__()
self._is_interrupted = False
self.exchange: str = exchange
self.queue: str = queue
self.callback: Callable = callback
......@@ -116,7 +84,7 @@ class EventListener(threading.Thread):
def run(self):
while True:
try:
self.connection = get_connection()
self.connection = utils.get_blocking_connection()
self.channel = self.connection.channel()
self.channel.exchange_declare(exchange=self.exchange, exchange_type='fanout')
self.channel.queue_declare(queue=self.queue, durable=True, exclusive=False, auto_delete=False)
......
import json
import logging
import threading
import traceback
from datetime import datetime
from enum import Enum
from typing import List, Callable, Optional
import pika
import pika.exceptions
from fastapi.encoders import jsonable_encoder
from pydantic import BaseModel
from pilab.events import utils
logger = logging.getLogger(__name__)
logging.getLogger("pika").setLevel(logging.WARNING)
class State(str, Enum):
starting = "start kernel"
workflow = "fetch workflow"
download = "downloading image"
pre_boot_config = "pre boot configuration"
boot_image = "boot image"
cloud_init = "cloud_init configuration"
started = "started"
configuration = "configuring image"
inactive = "inactive"
failed = "failed"
class Event(BaseModel):
mac: str
timestamp: float = datetime.utcnow().timestamp()
state: State
percentage: Optional[int]
payload: Optional[str]
pass
class HostListener(threading.Thread):
def __init__(self, hosts: List[str], callback: Callable):
super(HostListener, self).__init__()
self.hosts = hosts
self.callback = callback
def callback(self, ch, method, properties, body):
try:
event = Event.parse_obj(json.loads(body.decode('utf-8')))
self.callback(event)
ch.basic_ack(delivery_tag=method.delivery_tag)
logger.info(f"Event processed successful")
except Exception as e:
traceback.print_exc()
logger.error(f"Failed to process Event; message {e}")
def on_open(self, connection):
connection.channel(on_open_callback=self.on_channel_open)
def on_channel_open(self, channel):
for host in self.hosts:
channel.queue_declare(
queue=host,
durable=True,
arguments={"x-queue-type": "stream"}
)
channel.basic_consume(queue=host, on_message_callback=self.callback, auto_ack=False,
arguments={"x-stream-offset": "first"})
def run(self):
while True:
connection = utils.get_select_connection(self.on_open)
try:
connection = utils.get_select_connection(self.on_open)
connection.ioloop.start()
except SystemExit:
logger.info(f"System exit. Close Broker Connection...")
connection.close()
break
except pika.exceptions.ConnectionClosedByBroker:
continue
# Do not recover on channel errors
except pika.exceptions.AMQPChannelError as err:
logger.error(f"Caught a channel error: {err}, stopping...")
break
# Recover on all other connection errors
except pika.exceptions.AMQPConnectionError:
logger.error(f"Connection was closed, retrying...")
continue
def send_event(routing_key: str, event: Event):
try:
connection = utils.get_blocking_connection()
channel = connection.channel()
channel.queue_declare(
queue=routing_key,
durable=True,
arguments={"x-queue-type": "stream"}
)
event_serialized = json.dumps(jsonable_encoder(event)).encode('utf-8')
channel.basic_publish(
exchange='',
routing_key=routing_key,
body=event_serialized)
connection.close()
except pika.exceptions.UnroutableError as e:
print(f'Could not send event to Broker; message {e}')
raise
except Exception as e:
logger.error(f'Error sending host event; message: {e}')
raise
def send_pi_event(mac: str, type: State, payload: str = None, percentage: int = None):
routing_key = f"host.pi.{mac}"
event = Event(
mac=mac,
state=type,
percentage=percentage,
payload=payload
)
send_event(routing_key=routing_key, event=event)
def send_controller_event(mac: str, state: State):
routing_key = f"host.controller.{mac}"
event = Event(
mac=mac,
state=state,
)
send_event(routing_key=routing_key, event=event)
def send_switch_event(mac: str, state: State):
routing_key = f"host.switch.{mac}"
event = Event(
mac=mac,
state=state,
)
send_event(routing_key=routing_key, event=event)
import logging
import os
import pika
logger = logging.getLogger(__name__)
logging.getLogger("pika").setLevel(logging.WARNING)
BROKER_ADDRESS = os.getenv('BROKER_ADDRESS') or 'localhost'
BROKER_USER = os.getenv('BROKER_USER') or None
BROKER_PASSWORD = os.getenv('BROKER_PASSWORD') or None
def test_connection():
try:
# Create a connection to the broker
connection = get_blocking_connection()
# Declare a channel and a queue
channel = connection.channel()
# Close the connection
connection.close()
except Exception as e:
logger.error(f"Failed to connect to broker; message: {e}")
raise
def get_blocking_connection():
if BROKER_USER is not None:
credentials = pika.PlainCredentials(username=BROKER_USER, password=BROKER_PASSWORD)
parameters = pika.ConnectionParameters(host=BROKER_ADDRESS, credentials=credentials)
return pika.BlockingConnection(parameters)
else:
parameters = pika.ConnectionParameters(host=BROKER_ADDRESS)
return pika.BlockingConnection(parameters)
def get_select_connection(on_open_callback):
if BROKER_USER is not None:
credentials = pika.PlainCredentials(username=BROKER_USER, password=BROKER_PASSWORD)
parameters = pika.ConnectionParameters(host=BROKER_ADDRESS, credentials=credentials)
return pika.SelectConnection(parameters=parameters, on_open_callback=on_open_callback)
else:
parameters = pika.ConnectionParameters(host=BROKER_ADDRESS)
return pika.SelectConnection(parameters=parameters, on_open_callback=on_open_callback)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment