Skip to content
Snippets Groups Projects
Commit efd3437c authored by istmxrein's avatar istmxrein
Browse files

make default consumer reconnect on error remove ReconnectConsumer & bump to 1.2.1

parent 45f3e7f4
No related branches found
No related tags found
No related merge requests found
import functools
import json
import logging
import threading
import time
import traceback
from datetime import datetime
from enum import Enum
......@@ -33,6 +31,12 @@ class State(str, Enum):
failed = "failed"
class Type(str, Enum):
controller = "controller"
pi = "pi"
switch = "switch"
class Event(BaseModel):
mac: str
timestamp: float = datetime.utcnow().timestamp()
......@@ -102,7 +106,8 @@ def send_switch_event(mac: str, state: State):
class PowerConsumer(object):
def __init__(self, amqp_url, queues: List[str], callback: Callable, arguments: Dict):
def __init__(self, queues: List[str], callback: Callable, type: Type, amqp_url: str = utils.BROKER_URL,
arguments: Dict = {"x-stream-offset": "last"}):
"""Create a new instance of the consumer class, passing in the AMQP
URL used to connect to RabbitMQ.
......@@ -122,6 +127,7 @@ class PowerConsumer(object):
# for higher consumer throughput
self._prefetch_count = 10
self.type = type
self.callback = callback
self.queues = queues
self.arguments = arguments
......@@ -184,7 +190,8 @@ class PowerConsumer(object):
self._channel = channel
self.add_on_channel_close_callback()
for q in self.queues:
self.setup_queue(q)
queue_name = f'host.{self.type}.{q}'
self.setup_queue(queue_name)
def add_on_channel_close_callback(self):
logger.info('Adding channel close callback')
......@@ -277,9 +284,9 @@ class PowerConsumer(object):
for tag in self._consumer_tags:
cb = functools.partial(self.on_cancelok, userdata=tag)
self._channel.basic_cancel(tag, cb)
self._consumer_tags.remove(tag)
def on_cancelok(self, _unused_frame, userdata):
self._consumer_tags.remove(userdata)
if not self._consumer_tags:
self._consuming = False
logger.info('RabbitMQ acknowledged the cancellation of all consumers')
......@@ -294,12 +301,15 @@ class PowerConsumer(object):
self._channel.close()
def run(self):
"""Run the example consumer by connecting to RabbitMQ and then
starting the IOLoop to block and allow the SelectConnection to operate.
"""
self._connection = self.connect()
self._connection.ioloop.start()
while True:
try:
self._connection = self.connect()
self._connection.ioloop.start()
except (KeyboardInterrupt, SystemExit):
self.stop()
break
if not self.should_reconnect:
break
def stop(self):
"""Cleanly shutdown the connection to RabbitMQ by stopping the consumer
......@@ -321,48 +331,3 @@ class PowerConsumer(object):
else:
self._connection.ioloop.stop()
logger.info('Stopped')
class ReconnectingPowerConsumer(threading.Thread):
"""This is an example consumer that will reconnect if the nested
ExampleConsumer indicates that a reconnect is necessary.
"""
def __init__(self, queues: List[str], callback: Callable, amqp_url: str = utils.BROKER_URL,
arguments: Dict = {"x-stream-offset": "last"}):
super(ReconnectingPowerConsumer, self).__init__()
self._reconnect_delay = 0
self._amqp_url = amqp_url
self._callback = callback
self._queues = queues
self._arguments = arguments
self._consumer = PowerConsumer(amqp_url=self._amqp_url, queues=self._queues, callback=self._callback,
arguments=self._arguments)
def run(self):
while True:
try:
self._consumer.run()
except (KeyboardInterrupt, SystemExit):
self._consumer.stop()
break
self._maybe_reconnect()
def _maybe_reconnect(self):
if self._consumer.should_reconnect:
self._consumer.stop()
reconnect_delay = self._get_reconnect_delay()
logger.info('Reconnecting after %d seconds', reconnect_delay)
time.sleep(reconnect_delay)
self._consumer = PowerConsumer(amqp_url=self._amqp_url, queues=self._queues, callback=self._callback,
arguments=self._arguments)
def _get_reconnect_delay(self):
if self._consumer.was_consuming:
self._reconnect_delay = 0
else:
self._reconnect_delay += 1
if self._reconnect_delay > 30:
self._reconnect_delay = 30
return self._reconnect_delay
......@@ -2,7 +2,7 @@ from setuptools import setup, find_packages
setup(
name='pilab',
version='1.1.1',
version='1.2.1',
description='Shared-Libs for the pi-lab microservices',
url='https://code.fbi.h-da.de/api/v4/projects/27896/packages/pypi/pilab',
author='Max Reinheimer',
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment