Skip to content
Snippets Groups Projects
Commit eb785f99 authored by Fabian Seidl's avatar Fabian Seidl
Browse files

Fix multiple rabbitmq consumers receiving all messages

See merge request !1217
parent fa7274f6
No related branches found
No related tags found
1 merge request!1217Fix multiple rabbitmq consumers receiving all messages
Pipeline #262450 passed
...@@ -77,17 +77,40 @@ func (e *Service) SetupEventReciever(stopChan chan os.Signal) { ...@@ -77,17 +77,40 @@ func (e *Service) SetupEventReciever(stopChan chan os.Signal) {
// setupQueueConsume sets up a connection to the broker, listens to messages on a specific topic // setupQueueConsume sets up a connection to the broker, listens to messages on a specific topic
// and writes them into a channel. // and writes them into a channel.
func (e *Service) setupQueueConsume(topic string, stopChan chan os.Signal) error { func (e *Service) setupQueueConsume(topic string, stopChan chan os.Signal) error {
err := e.channel.ExchangeDeclare(
topic, // name
"fanout", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
if err != nil {
return &customerrs.AMQPInitFailError{Action: "failed declaring exchange", Err: err}
}
q, err := e.channel.QueueDeclare( q, err := e.channel.QueueDeclare(
topic, // name "", // name
false, // durable false, // durable
false, // delete when unused false, // delete when unused
false, // exclusive true, // exclusive
false, // no-wait false, // no-wait
nil, // arguments nil, // arguments
) )
if err != nil {
return &customerrs.AMQPInitFailError{Action: "failed declaring queue", Err: err}
}
err = e.channel.QueueBind(
q.Name, // queue name
"", // routing key
topic, // exchange
false,
nil,
)
if err != nil { if err != nil {
return &customerrs.AMQPMessageFailError{Action: "failed declaring queue", Err: err} return &customerrs.AMQPInitFailError{Action: "failed binding queue to exchange", Err: err}
} }
msgs, err := e.channel.Consume( msgs, err := e.channel.Consume(
......
...@@ -170,16 +170,17 @@ func initChannel(conn *amqp.Connection) (*amqp.Channel, error) { ...@@ -170,16 +170,17 @@ func initChannel(conn *amqp.Connection) (*amqp.Channel, error) {
// PublishEvent declares a queue and publishes events. // PublishEvent declares a queue and publishes events.
func (e *EventService) PublishEvent(topic string, event event.Event) error { func (e *EventService) PublishEvent(topic string, event event.Event) error {
q, err := e.channel.QueueDeclare( err := e.channel.ExchangeDeclare(
topic, // name topic, // name
false, // durable "fanout", // type
false, // delete when unused true, // durable
false, // exclusive false, // auto-deleted
false, // no-wait false, // internal
nil, // arguments false, // no-wait
nil, // arguments
) )
if err != nil { if err != nil {
return &customerrs.AMQPInitFailError{Action: "failed declaring queue", Err: err} return &customerrs.AMQPInitFailError{Action: "failed declaring exchange", Err: err}
} }
eventBody, err := json.Marshal(event) eventBody, err := json.Marshal(event)
...@@ -187,12 +188,15 @@ func (e *EventService) PublishEvent(topic string, event event.Event) error { ...@@ -187,12 +188,15 @@ func (e *EventService) PublishEvent(topic string, event event.Event) error {
return &customerrs.CouldNotMarshallError{Identifier: topic + " " + event.EntityID.String(), Type: event.Type, Err: err} return &customerrs.CouldNotMarshallError{Identifier: topic + " " + event.EntityID.String(), Type: event.Type, Err: err}
} }
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
err = e.channel.PublishWithContext( err = e.channel.PublishWithContext(
context.TODO(), ctx,
"", // exchange topic, // exchange
q.Name, // routing key "", // routing key
false, // mandatory false, // mandatory
false, // immediate false, // immediate
amqp.Publishing{ amqp.Publishing{
ContentType: "application/json", ContentType: "application/json",
Body: eventBody, Body: eventBody,
......
...@@ -94,17 +94,6 @@ func TestMain(m *testing.M) { ...@@ -94,17 +94,6 @@ func TestMain(m *testing.M) {
} }
go application.Run(eventTypeCallbackTuples) go application.Run(eventTypeCallbackTuples)
// This is needed to clear the go channel of the messages sent by RabbitMQ when creating
// and logging in with the admin user.
// Important note: only works once after starting the setup, because first time use creates
// a user and role and update the user because of the login. After then only logins are done, no user and role creations.
// This means that this will block after trying once, because of the three attempts to read from eventChannels.
<-application.addEventChannel
<-application.addEventChannel
<-application.addEventChannel
<-application.updateEventChannel
m.Run() m.Run()
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment