Skip to content
Snippets Groups Projects
Commit 45f3e7f4 authored by istmxrein's avatar istmxrein
Browse files

fix bugs bump to 1.1.1

parent 77469ec2
Branches
Tags
No related merge requests found
import functools
import json
import logging
import threading
import time
import traceback
from datetime import datetime
from enum import Enum
from typing import List, Callable, Optional
from typing import List, Callable, Optional, Dict
import pika
import pika.exceptions
......@@ -101,7 +102,7 @@ def send_switch_event(mac: str, state: State):
class PowerConsumer(object):
def __init__(self, amqp_url, queues: List[str], callback: Callable):
def __init__(self, amqp_url, queues: List[str], callback: Callable, arguments: Dict):
"""Create a new instance of the consumer class, passing in the AMQP
URL used to connect to RabbitMQ.
......@@ -123,6 +124,7 @@ class PowerConsumer(object):
self.callback = callback
self.queues = queues
self.arguments = arguments
def connect(self):
"""This method connects to RabbitMQ, returning the connection handle.
......@@ -226,7 +228,10 @@ class PowerConsumer(object):
logger.info('Issuing consumer related RPC commands')
self.add_on_cancel_callback()
_consumer_tag = self._channel.basic_consume(
queue_name, self.on_message, auto_ack=False)
queue_name,
self.on_message,
auto_ack=False,
arguments=self.arguments)
self._consumer_tags.append(_consumer_tag)
self.was_consuming = True
self._consuming = True
......@@ -272,6 +277,7 @@ class PowerConsumer(object):
for tag in self._consumer_tags:
cb = functools.partial(self.on_cancelok, userdata=tag)
self._channel.basic_cancel(tag, cb)
self._consumer_tags.remove(tag)
def on_cancelok(self, _unused_frame, userdata):
if not self._consumer_tags:
......@@ -317,16 +323,22 @@ class PowerConsumer(object):
logger.info('Stopped')
class ReconnectingPowerConsumer(object):
class ReconnectingPowerConsumer(threading.Thread):
"""This is an example consumer that will reconnect if the nested
ExampleConsumer indicates that a reconnect is necessary.
"""
def __init__(self, queues: List[str], callback: Callable, amqp_url=utils.BROKER_URL):
def __init__(self, queues: List[str], callback: Callable, amqp_url: str = utils.BROKER_URL,
arguments: Dict = {"x-stream-offset": "last"}):
super(ReconnectingPowerConsumer, self).__init__()
self._reconnect_delay = 0
self._amqp_url = amqp_url
self._consumer = PowerConsumer(self._amqp_url, queues, callback)
self._callback = callback
self._queues = queues
self._arguments = arguments
self._consumer = PowerConsumer(amqp_url=self._amqp_url, queues=self._queues, callback=self._callback,
arguments=self._arguments)
def run(self):
while True:
......@@ -343,7 +355,8 @@ class ReconnectingPowerConsumer(object):
reconnect_delay = self._get_reconnect_delay()
logger.info('Reconnecting after %d seconds', reconnect_delay)
time.sleep(reconnect_delay)
self._consumer = PowerConsumer(self._amqp_url)
self._consumer = PowerConsumer(amqp_url=self._amqp_url, queues=self._queues, callback=self._callback,
arguments=self._arguments)
def _get_reconnect_delay(self):
if self._consumer.was_consuming:
......
......@@ -2,7 +2,7 @@ from setuptools import setup, find_packages
setup(
name='pilab',
version='1.1.0',
version='1.1.1',
description='Shared-Libs for the pi-lab microservices',
url='https://code.fbi.h-da.de/api/v4/projects/27896/packages/pypi/pilab',
author='Max Reinheimer',
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment