diff --git a/pilab/events/meta.py b/pilab/events/meta.py index 2ea6b3f489ba1165fbaee11e64d76c2a8252e9b7..3c67b2a4e0e74cbb6c11402753d7f7d531141c4f 100644 --- a/pilab/events/meta.py +++ b/pilab/events/meta.py @@ -88,20 +88,20 @@ class MetaListener(threading.Thread): self.offset = offset self.channel = None self.connection = None - - self._prefetch_count = 2 + self._prefetch_count = 1 def default_callback(self, ch, method, properties, body): try: event = Event.parse_obj(json.loads(body.decode('utf-8'))) offset = properties.headers['x-stream-offset'] self.callback(event, offset) - ch.basic_ack(delivery_tag=method.delivery_tag) + self.channel.basic_ack(method.delivery_tag) self.offset = offset logger.info(f"Event processed succesful") except Exception as e: traceback.print_exc() - logger.error(f"Failed to process Event; message {e}") + logger.error(f"Failed to process Meta event; message {e}") + raise def run(self): while True: @@ -128,9 +128,13 @@ class MetaListener(threading.Thread): # Do not recover on channel errors except pika.exceptions.AMQPChannelError as err: logger.error(f"Caught a channel error: {err}, stopping...") - break + continue # Recover on all other connection errors except pika.exceptions.AMQPConnectionError: logger.error(f"Connection closed unexpected, retrying...") time.sleep(1) continue + except Exception: + logger.error(f"Unexpected error occured, retrying...") + time.sleep(1) + continue diff --git a/setup.py b/setup.py index 89f4aae968b5137ecbf8de79becbce3bfd44fe1c..57f612eb99e41ad2612be6e3fd2497950fcfa6e0 100644 --- a/setup.py +++ b/setup.py @@ -2,7 +2,7 @@ from setuptools import setup, find_packages setup( name='pilab', - version='3.4.2', + version='3.5.2', description='Shared-Libs for the pi-lab microservices', url='https://code.fbi.h-da.de/api/v4/projects/27896/packages/pypi/pilab', author='Max Reinheimer',