Skip to content
Snippets Groups Projects

Fix multiple rabbitmq consumers receiving all messages

Merged Fabian Seidl requested to merge fix-for-multiple-rabbitmq-consumers into master
Files
3
@@ -77,17 +77,40 @@ func (e *Service) SetupEventReciever(stopChan chan os.Signal) {
// setupQueueConsume sets up a connection to the broker, listens to messages on a specific topic
// and writes them into a channel.
func (e *Service) setupQueueConsume(topic string, stopChan chan os.Signal) error {
err := e.channel.ExchangeDeclare(
topic, // name
"fanout", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
if err != nil {
return &customerrs.AMQPInitFailError{Action: "failed declaring exchange", Err: err}
}
q, err := e.channel.QueueDeclare(
topic, // name
"", // name
false, // durable
false, // delete when unused
false, // exclusive
true, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
return &customerrs.AMQPInitFailError{Action: "failed declaring queue", Err: err}
}
err = e.channel.QueueBind(
q.Name, // queue name
"", // routing key
topic, // exchange
false,
nil,
)
if err != nil {
return &customerrs.AMQPMessageFailError{Action: "failed declaring queue", Err: err}
return &customerrs.AMQPInitFailError{Action: "failed binding queue to exchange", Err: err}
}
msgs, err := e.channel.Consume(
Loading