Skip to content
Snippets Groups Projects
Commit 32c7e404 authored by istmxrein's avatar istmxrein
Browse files

fix bug in MetaListener

parent c099b76a
Tags
No related merge requests found
...@@ -81,19 +81,23 @@ class Event(BaseModel): ...@@ -81,19 +81,23 @@ class Event(BaseModel):
class MetaListener(threading.Thread): class MetaListener(threading.Thread):
def __init__(self, callback: Callable, queue: str = STREAM_NAME, offset = "last"): def __init__(self, callback: Callable, queue: str = STREAM_NAME, offset="first"):
super(MetaListener, self).__init__() super(MetaListener, self).__init__()
self.queue: str = queue self.queue: str = queue
self.callback: Callable = callback self.callback: Callable = callback
self.arguments = {"x-stream-offset": offset} self.offset = offset
self.channel = None self.channel = None
self.connection = None self.connection = None
self._prefetch_count = 2
def default_callback(self, ch, method, properties, body): def default_callback(self, ch, method, properties, body):
try: try:
event = Event.parse_obj(json.loads(body.decode('utf-8'))) event = Event.parse_obj(json.loads(body.decode('utf-8')))
self.callback(event) offset = properties.headers['x-stream-offset']
self.callback(event, offset)
ch.basic_ack(delivery_tag=method.delivery_tag) ch.basic_ack(delivery_tag=method.delivery_tag)
self.offset = offset
logger.info(f"Event processed succesful") logger.info(f"Event processed succesful")
except Exception as e: except Exception as e:
traceback.print_exc() traceback.print_exc()
...@@ -104,8 +108,12 @@ class MetaListener(threading.Thread): ...@@ -104,8 +108,12 @@ class MetaListener(threading.Thread):
try: try:
self.connection = utils.get_blocking_connection() self.connection = utils.get_blocking_connection()
self.channel = self.connection.channel() self.channel = self.connection.channel()
self.channel.queue_declare(queue=self.queue, durable=True, exclusive=False, auto_delete=False, arguments={"x-queue-type": "stream"}) self.channel.queue_declare(queue=self.queue, durable=True, exclusive=False, auto_delete=False,
self.channel.basic_consume(queue=self.queue, on_message_callback=self.default_callback, auto_ack=False, arguments=self.arguments) arguments={"x-queue-type": "stream"})
self.channel.basic_qos(prefetch_count=self._prefetch_count)
self.channel.basic_consume(queue=self.queue, on_message_callback=self.default_callback, auto_ack=False,
arguments={"x-stream-offset": self.offset})
try: try:
self.channel.start_consuming() self.channel.start_consuming()
except SystemExit: except SystemExit:
...@@ -126,3 +134,4 @@ class MetaListener(threading.Thread): ...@@ -126,3 +134,4 @@ class MetaListener(threading.Thread):
logger.error(f"Connection closed unexpected, retrying...") logger.error(f"Connection closed unexpected, retrying...")
time.sleep(1) time.sleep(1)
continue continue
...@@ -2,7 +2,7 @@ from setuptools import setup, find_packages ...@@ -2,7 +2,7 @@ from setuptools import setup, find_packages
setup( setup(
name='pilab', name='pilab',
version='3.0.1', version='3.1.1',
description='Shared-Libs for the pi-lab microservices', description='Shared-Libs for the pi-lab microservices',
url='https://code.fbi.h-da.de/api/v4/projects/27896/packages/pypi/pilab', url='https://code.fbi.h-da.de/api/v4/projects/27896/packages/pypi/pilab',
author='Max Reinheimer', author='Max Reinheimer',
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment