Skip to content
Snippets Groups Projects
Commit b4c0ff7c authored by Alfred Ashur Oshana's avatar Alfred Ashur Oshana
Browse files

feat: single pi reservation

parent a15f0e7b
No related branches found
No related tags found
No related merge requests found
......@@ -22,7 +22,7 @@ logging.getLogger("pika").setLevel(logging.WARNING)
class Reservation(BaseModel):
owner: str
cube_id: int
pi_id : int
starttime: datetime
endtime: datetime
extraUsers: Optional[List[str]] = None
......@@ -38,7 +38,7 @@ class DateTimeEncoder(JSONEncoder):
def send_event(event: Reservation):
routing_key = f"res.cube.{event.cube_id}"
routing_key = f"res.pi.{event.pi_id}"
try:
connection = utils.get_blocking_connection()
channel = connection.channel()
......@@ -69,7 +69,7 @@ def send_event(event: Reservation):
class ReservationConsumer(threading.Thread):
def __init__(self, cubes: List[int], callback: Callable, amqp_url: str = utils.BROKER_URL,
def __init__(self, pis: List[int], callback: Callable, amqp_url: str = utils.BROKER_URL,
arguments: Dict = {"x-stream-offset": "last"}):
"""Create a new instance of the consumer class, passing in the AMQP
URL used to connect to RabbitMQ.
......@@ -92,7 +92,7 @@ class ReservationConsumer(threading.Thread):
self._prefetch_count = 10
self.callback = callback
self.cubes = cubes
self.pis = pis
self.arguments = arguments
def connect(self):
......@@ -153,8 +153,9 @@ class ReservationConsumer(threading.Thread):
self._channel = channel
self._channel.add_on_close_callback(self.on_channel_closed)
self._channel.add_on_cancel_callback(self.on_consumer_cancelled)
for cube in self.cubes:
queue_name = f'res.cube.{str(cube)}'
for pi in self.pis:
#TODO queue_name = f'res.cube.{str(cube)}'
queue_name = f'res.pi.{str(pi)}'
self.setup_queue(queue_name)
def on_channel_closed(self, channel, reason):
......@@ -290,13 +291,13 @@ class ReservationConsumer(threading.Thread):
logger.info('ReservationConsumer stopped')
cube_reservations: Dict[int, Reservation] = {}
pi_reservations: Dict[int, Reservation] = {}
res_thread: ReservationConsumer = None
custom_callback: Callable = None
def get_active_users(cube_id: int):
res: Reservation = cube_reservations.get(cube_id)
def get_active_users(pi_id: int):
res: Reservation = pi_reservations.get(pi_id)
if not res_thread:
raise RuntimeError("Reservation consumer is not initialized")
......@@ -309,8 +310,8 @@ def get_active_users(cube_id: int):
return []
def get_active_reservation(cube_id: int):
res: Reservation = cube_reservations.get(cube_id)
def get_active_reservation(pi_id: int):
res: Reservation = pi_reservations.get(pi_id)
if not res_thread:
raise RuntimeError("Reservation consumer is not initialized")
......@@ -327,19 +328,19 @@ def restart_reservation_consumer(ids: List[int], callback: Callable = None):
custom_callback = callback
def res_event_callback(res: Reservation):
logger.info(f'Received Reservation for cube {res.cube_id}; {res}')
cube_reservations[res.cube_id] = res
logger.info(f'Received Reservation for Pi {res.pi_id}; {res}')
pi_reservations[res.pi_id] = res
if custom_callback:
custom_callback(res)
if not res_thread:
res_thread = ReservationConsumer(cubes=ids, callback=res_event_callback)
res_thread = ReservationConsumer(pis=ids, callback=res_event_callback)
res_thread.start()
elif Counter(ids) != Counter(res_thread.cubes):
res_thread.stop()
res_thread.join()
res_thread = ReservationConsumer(cubes=ids, callback=res_event_callback)
res_thread = ReservationConsumer(pis=ids, callback=res_event_callback)
res_thread.start()
return res_thread
......@@ -2,7 +2,7 @@ from setuptools import setup, find_packages
setup(
name='pilab',
version='4.1.0',
version='4.2.0',
description='Shared-Libs for the pi-lab microservices',
url='https://code.fbi.h-da.de/api/v4/projects/27896/packages/pypi/pilab',
author='Max Reinheimer',
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment