Skip to content
Snippets Groups Projects
Commit 480463b1 authored by istmxrein's avatar istmxrein
Browse files

refactor logging & bump version to 1.2.3

parent 045d6881
No related branches found
No related tags found
No related merge requests found
......@@ -159,7 +159,7 @@ class PowerConsumer(threading.Thread):
self._connection.close()
def on_connection_open(self, _unused_connection):
logger.info('Connection opened')
logger.debug('Connection opened')
self.open_channel()
def on_connection_open_error(self, _unused_connection, err):
......@@ -184,21 +184,18 @@ class PowerConsumer(threading.Thread):
self.stop()
def open_channel(self):
logger.info('Creating a new channel')
logger.debug('Creating a new channel')
self._connection.channel(on_open_callback=self.on_channel_open)
def on_channel_open(self, channel):
logger.info('Channel opened')
logger.debug('Channel opened')
self._channel = channel
self.add_on_channel_close_callback()
self._channel.add_on_close_callback(self.on_channel_closed)
self._channel.add_on_cancel_callback(self.on_consumer_cancelled)
for q in self.queues:
queue_name = f'host.{self.type}.{q}'
self.setup_queue(queue_name)
def add_on_channel_close_callback(self):
logger.info('Adding channel close callback')
self._channel.add_on_close_callback(self.on_channel_closed)
def on_channel_closed(self, channel, reason):
logger.warning('Channel %i was closed: %s', channel, reason)
self.close_connection()
......@@ -211,7 +208,7 @@ class PowerConsumer(threading.Thread):
:param str|unicode queue_name: The name of the queue to declare.
"""
logger.info('Declaring queue %s', queue_name)
logger.debug('Declaring queue %s', queue_name)
cb = functools.partial(self.on_queue_declareok, queue_name=queue_name)
self._channel.queue_declare(queue=queue_name,
durable=True,
......@@ -230,12 +227,11 @@ class PowerConsumer(threading.Thread):
prefetch_count=self._prefetch_count, callback=cb)
def on_basic_qos_ok(self, _unused_frame, queue_name):
logger.info('QOS set to: %d', self._prefetch_count)
logger.debug('QOS set to: %d', self._prefetch_count)
self.start_consuming(queue_name)
def start_consuming(self, queue_name):
logger.info('Issuing consumer related RPC commands')
self.add_on_cancel_callback()
logger.debug('Issuing consumer related RPC commands')
_consumer_tag = self._channel.basic_consume(
queue_name,
self.on_message,
......@@ -245,15 +241,6 @@ class PowerConsumer(threading.Thread):
self.was_consuming = True
self._consuming = True
def add_on_cancel_callback(self):
"""Add a callback that will be invoked if RabbitMQ cancels the consumer
for some reason. If RabbitMQ does cancel the consumer,
on_consumer_cancelled will be invoked by pika.
"""
logger.info('Adding consumer cancellation callback')
self._channel.add_on_cancel_callback(self.on_consumer_cancelled)
def on_consumer_cancelled(self, method_frame):
"""Invoked by pika when RabbitMQ sends a Basic.Cancel for a consumer
receiving messages.
......
......@@ -2,7 +2,7 @@ from setuptools import setup, find_packages
setup(
name='pilab',
version='1.2.2',
version='1.2.3',
description='Shared-Libs for the pi-lab microservices',
url='https://code.fbi.h-da.de/api/v4/projects/27896/packages/pypi/pilab',
author='Max Reinheimer',
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment