Skip to content
Snippets Groups Projects
Commit c06f90a9 authored by istmxrein's avatar istmxrein
Browse files

update meta consumer

parent 570244fe
No related branches found
No related tags found
No related merge requests found
...@@ -88,20 +88,20 @@ class MetaListener(threading.Thread): ...@@ -88,20 +88,20 @@ class MetaListener(threading.Thread):
self.offset = offset self.offset = offset
self.channel = None self.channel = None
self.connection = None self.connection = None
self._prefetch_count = 1
self._prefetch_count = 2
def default_callback(self, ch, method, properties, body): def default_callback(self, ch, method, properties, body):
try: try:
event = Event.parse_obj(json.loads(body.decode('utf-8'))) event = Event.parse_obj(json.loads(body.decode('utf-8')))
offset = properties.headers['x-stream-offset'] offset = properties.headers['x-stream-offset']
self.callback(event, offset) self.callback(event, offset)
ch.basic_ack(delivery_tag=method.delivery_tag) self.channel.basic_ack(method.delivery_tag)
self.offset = offset self.offset = offset
logger.info(f"Event processed succesful") logger.info(f"Event processed succesful")
except Exception as e: except Exception as e:
traceback.print_exc() traceback.print_exc()
logger.error(f"Failed to process Event; message {e}") logger.error(f"Failed to process Meta event; message {e}")
raise
def run(self): def run(self):
while True: while True:
...@@ -128,9 +128,13 @@ class MetaListener(threading.Thread): ...@@ -128,9 +128,13 @@ class MetaListener(threading.Thread):
# Do not recover on channel errors # Do not recover on channel errors
except pika.exceptions.AMQPChannelError as err: except pika.exceptions.AMQPChannelError as err:
logger.error(f"Caught a channel error: {err}, stopping...") logger.error(f"Caught a channel error: {err}, stopping...")
break continue
# Recover on all other connection errors # Recover on all other connection errors
except pika.exceptions.AMQPConnectionError: except pika.exceptions.AMQPConnectionError:
logger.error(f"Connection closed unexpected, retrying...") logger.error(f"Connection closed unexpected, retrying...")
time.sleep(1) time.sleep(1)
continue continue
except Exception:
logger.error(f"Unexpected error occured, retrying...")
time.sleep(1)
continue
...@@ -2,7 +2,7 @@ from setuptools import setup, find_packages ...@@ -2,7 +2,7 @@ from setuptools import setup, find_packages
setup( setup(
name='pilab', name='pilab',
version='3.4.2', version='3.5.2',
description='Shared-Libs for the pi-lab microservices', description='Shared-Libs for the pi-lab microservices',
url='https://code.fbi.h-da.de/api/v4/projects/27896/packages/pypi/pilab', url='https://code.fbi.h-da.de/api/v4/projects/27896/packages/pypi/pilab',
author='Max Reinheimer', author='Max Reinheimer',
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment