Skip to content
Snippets Groups Projects
Commit 4aa8252e authored by Fabian Seidl's avatar Fabian Seidl
Browse files

update descriptions, add proper ctx

parent 93f2f5f8
No related branches found
No related tags found
1 merge request!1217Fix multiple rabbitmq consumers receiving all messages
Pipeline #262444 passed
This commit is part of merge request !1217. Comments created here will be created in the context of that merge request.
......@@ -87,7 +87,7 @@ func (e *Service) setupQueueConsume(topic string, stopChan chan os.Signal) error
nil, // arguments
)
if err != nil {
return &customerrs.AMQPMessageFailError{Action: "failed declaring queue", Err: err}
return &customerrs.AMQPInitFailError{Action: "failed declaring exchange", Err: err}
}
q, err := e.channel.QueueDeclare(
......@@ -99,7 +99,7 @@ func (e *Service) setupQueueConsume(topic string, stopChan chan os.Signal) error
nil, // arguments
)
if err != nil {
return &customerrs.AMQPMessageFailError{Action: "failed declaring queue", Err: err}
return &customerrs.AMQPInitFailError{Action: "failed declaring queue", Err: err}
}
err = e.channel.QueueBind(
......@@ -110,7 +110,7 @@ func (e *Service) setupQueueConsume(topic string, stopChan chan os.Signal) error
nil,
)
if err != nil {
return &customerrs.AMQPMessageFailError{Action: "failed declaring queue", Err: err}
return &customerrs.AMQPInitFailError{Action: "failed binding to queue to exchange", Err: err}
}
msgs, err := e.channel.Consume(
......
......@@ -180,7 +180,7 @@ func (e *EventService) PublishEvent(topic string, event event.Event) error {
nil, // arguments
)
if err != nil {
return &customerrs.AMQPInitFailError{Action: "failed declaring queue", Err: err}
return &customerrs.AMQPInitFailError{Action: "failed declaring exchange", Err: err}
}
eventBody, err := json.Marshal(event)
......@@ -188,8 +188,11 @@ func (e *EventService) PublishEvent(topic string, event event.Event) error {
return &customerrs.CouldNotMarshallError{Identifier: topic + " " + event.EntityID.String(), Type: event.Type, Err: err}
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
err = e.channel.PublishWithContext(
context.TODO(),
ctx,
topic, // exchange
"", // routing key
false, // mandatory
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment