Skip to content
Snippets Groups Projects
Commit ffe16f8c authored by Fabian Seidl's avatar Fabian Seidl
Browse files

multiple consumers possible

parent 4be61b62
No related branches found
No related tags found
1 merge request!1217Fix multiple rabbitmq consumers receiving all messages
Pipeline #262405 failed
This commit is part of merge request !1217. Comments created here will be created in the context of that merge request.
...@@ -77,15 +77,38 @@ func (e *Service) SetupEventReciever(stopChan chan os.Signal) { ...@@ -77,15 +77,38 @@ func (e *Service) SetupEventReciever(stopChan chan os.Signal) {
// setupQueueConsume sets up a connection to the broker, listens to messages on a specific topic // setupQueueConsume sets up a connection to the broker, listens to messages on a specific topic
// and writes them into a channel. // and writes them into a channel.
func (e *Service) setupQueueConsume(topic string, stopChan chan os.Signal) error { func (e *Service) setupQueueConsume(topic string, stopChan chan os.Signal) error {
err := e.channel.ExchangeDeclare(
topic, // name
"fanout", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
if err != nil {
return &customerrs.AMQPMessageFailError{Action: "failed declaring queue", Err: err}
}
q, err := e.channel.QueueDeclare( q, err := e.channel.QueueDeclare(
topic, // name "", // name
false, // durable false, // durable
false, // delete when unused false, // delete when unused
false, // exclusive true, // exclusive
false, // no-wait false, // no-wait
nil, // arguments nil, // arguments
) )
if err != nil {
return &customerrs.AMQPMessageFailError{Action: "failed declaring queue", Err: err}
}
err = e.channel.QueueBind(
q.Name, // queue name
"", // routing key
topic, // exchange
false,
nil,
)
if err != nil { if err != nil {
return &customerrs.AMQPMessageFailError{Action: "failed declaring queue", Err: err} return &customerrs.AMQPMessageFailError{Action: "failed declaring queue", Err: err}
} }
......
...@@ -170,13 +170,14 @@ func initChannel(conn *amqp.Connection) (*amqp.Channel, error) { ...@@ -170,13 +170,14 @@ func initChannel(conn *amqp.Connection) (*amqp.Channel, error) {
// PublishEvent declares a queue and publishes events. // PublishEvent declares a queue and publishes events.
func (e *EventService) PublishEvent(topic string, event event.Event) error { func (e *EventService) PublishEvent(topic string, event event.Event) error {
q, err := e.channel.QueueDeclare( err := e.channel.ExchangeDeclare(
topic, // name topic, // name
false, // durable "fanout", // type
false, // delete when unused true, // durable
false, // exclusive false, // auto-deleted
false, // no-wait false, // internal
nil, // arguments false, // no-wait
nil, // arguments
) )
if err != nil { if err != nil {
return &customerrs.AMQPInitFailError{Action: "failed declaring queue", Err: err} return &customerrs.AMQPInitFailError{Action: "failed declaring queue", Err: err}
...@@ -189,10 +190,10 @@ func (e *EventService) PublishEvent(topic string, event event.Event) error { ...@@ -189,10 +190,10 @@ func (e *EventService) PublishEvent(topic string, event event.Event) error {
err = e.channel.PublishWithContext( err = e.channel.PublishWithContext(
context.TODO(), context.TODO(),
"", // exchange topic, // exchange
q.Name, // routing key "", // routing key
false, // mandatory false, // mandatory
false, // immediate false, // immediate
amqp.Publishing{ amqp.Publishing{
ContentType: "application/json", ContentType: "application/json",
Body: eventBody, Body: eventBody,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment