Skip to content
Snippets Groups Projects

Resolve "Error handling in event publishing via entity services could be improved"

Files
10
@@ -16,17 +16,35 @@ import (
log "github.com/sirupsen/logrus"
)
// EventService is used to setup a connection to a broker and publish events to topics.
type EventService struct {
connection *amqp.Connection
channel *amqp.Channel
}
const (
// Duration to try reconnecting until cancelation.
reconnectDelay = 5 * time.Second
// Duration to try re-initializing a channel.
reInitDelay = 2 * time.Second
// Max number of tries to reconnect to Broker.
reconnectAttempts = 5
// Max number of tries to re-initialize a channel. As re-init is part of the reconnect process this multiplies
// with number of reconnectAttempts.
reInitAttempts = 5
// Max number of tries to republish a failed message.
rePublishAttempts = 10
rePublishAttempts = 5
)
// EventService is used to setup a connection to a broker and publish events to topics.
type EventService struct {
connectionAddr string
connection *amqp.Connection
channel *amqp.Channel
done chan bool
notifyConnClose chan *amqp.Error
notifyChanClose chan *amqp.Error
isReady bool
}
// NewEventService creates a new connection to the broker and opens a channel for later usage.
func NewEventService() (interfaces.Service, error) {
// TODO: This is an fugly hack to mitigate that some tests are trying to connect to an actual broker. (staester)
@@ -34,20 +52,108 @@ func NewEventService() (interfaces.Service, error) {
return NewMockEventService(), nil
}
conn, err := amqp.Dial(amqpURIBuilder(config.AMQPPrefix, config.AMQPUser, config.AMQPPassword, config.AMQPHost, config.AMQPPort))
addr := amqpURIBuilder(config.AMQPPrefix, config.AMQPUser, config.AMQPPassword, config.AMQPHost, config.AMQPPort)
conn, err := connect(addr)
if err != nil {
return nil, err
}
notifyConnClose := make(chan *amqp.Error)
conn.NotifyClose(notifyConnClose)
ch, err := initChannel(conn)
if err != nil {
return nil, err
}
notifyChanClose := make(chan *amqp.Error)
ch.NotifyClose(notifyChanClose)
return &EventService{
connectionAddr: addr,
connection: conn,
channel: ch,
done: make(chan bool),
notifyConnClose: notifyConnClose,
notifyChanClose: notifyChanClose,
}, nil
}
func connect(addr string) (*amqp.Connection, error) {
conn, err := amqp.Dial(addr)
if err != nil {
return nil, &customerrs.AMQPInitFailError{Action: "failed to connect to RabbitMQ", Err: err}
}
return conn, nil
}
// Reconnect attempts to setup a new connection to the RabbitMQ server after an disconnect.
func (e *EventService) Reconnect() {
i := 1
for {
e.isReady = false
conn, err := connect(e.connectionAddr)
if err != nil {
select {
case <-e.done:
return
case <-time.After(reconnectDelay):
}
continue
}
done := e.handleReInit(conn)
if done || i == reconnectAttempts {
break
}
i++
}
}
func (e *EventService) handleReInit(conn *amqp.Connection) bool {
i := 1
for {
e.isReady = false
ch, err := initChannel(conn)
if err != nil {
select {
case <-e.done:
return true
case <-time.After(reInitDelay):
i++
}
if i == reInitAttempts {
return false
}
continue
}
notifyConnClose := make(chan *amqp.Error)
conn.NotifyClose(notifyConnClose)
e.notifyConnClose = notifyConnClose
e.connection = conn
notifyChanClose := make(chan *amqp.Error)
ch.NotifyClose(notifyChanClose)
e.notifyChanClose = notifyChanClose
e.channel = ch
e.isReady = true
return true
}
}
func initChannel(conn *amqp.Connection) (*amqp.Channel, error) {
ch, err := conn.Channel()
if err != nil {
return nil, &customerrs.AMQPInitFailError{Action: "failed to open a channel", Err: err}
}
return &EventService{
connection: conn,
channel: ch,
}, nil
return ch, nil
}
// PublishEvent declares a queue and publishes events.
Loading