Skip to content
Snippets Groups Projects

Resolve "Error handling in event publishing via entity services could be improved"

Files
13
@@ -3,6 +3,7 @@ package eventservice
@@ -3,6 +3,7 @@ package eventservice
import (
import (
"context"
"context"
"encoding/json"
"encoding/json"
 
"time"
"code.fbi.h-da.de/danet/gosdn/controller/config"
"code.fbi.h-da.de/danet/gosdn/controller/config"
"code.fbi.h-da.de/danet/gosdn/controller/customerrs"
"code.fbi.h-da.de/danet/gosdn/controller/customerrs"
@@ -11,13 +12,37 @@ import (
@@ -11,13 +12,37 @@ import (
interfaces "code.fbi.h-da.de/danet/gosdn/controller/interfaces/event"
interfaces "code.fbi.h-da.de/danet/gosdn/controller/interfaces/event"
amqp "github.com/rabbitmq/amqp091-go"
amqp "github.com/rabbitmq/amqp091-go"
 
"github.com/sethvargo/go-retry"
log "github.com/sirupsen/logrus"
log "github.com/sirupsen/logrus"
)
)
 
const (
 
// Duration to try reconnecting until cancelation.
 
reconnectDelay = 5 * time.Second
 
 
// Duration to try re-initializing a channel.
 
reInitDelay = 2 * time.Second
 
 
// Max number of tries to reconnect to Broker.
 
reconnectAttempts = 5
 
 
// Max number of tries to re-initialize a channel. As re-init is part of the reconnect process this multiplies
 
// with number of reconnectAttempts.
 
reInitAttempts = 5
 
 
// Max number of tries to republish a failed message.
 
rePublishAttempts = 5
 
)
 
// EventService is used to setup a connection to a broker and publish events to topics.
// EventService is used to setup a connection to a broker and publish events to topics.
type EventService struct {
type EventService struct {
connection *amqp.Connection
connectionAddr string
channel *amqp.Channel
connection *amqp.Connection
 
channel *amqp.Channel
 
done chan bool
 
notifyConnClose chan *amqp.Error
 
notifyChanClose chan *amqp.Error
 
isReady bool
}
}
// NewEventService creates a new connection to the broker and opens a channel for later usage.
// NewEventService creates a new connection to the broker and opens a channel for later usage.
@@ -27,20 +52,108 @@ func NewEventService() (interfaces.Service, error) {
@@ -27,20 +52,108 @@ func NewEventService() (interfaces.Service, error) {
return NewMockEventService(), nil
return NewMockEventService(), nil
}
}
conn, err := amqp.Dial(amqpURIBuilder(config.AMQPPrefix, config.AMQPUser, config.AMQPPassword, config.AMQPHost, config.AMQPPort))
addr := amqpURIBuilder(config.AMQPPrefix, config.AMQPUser, config.AMQPPassword, config.AMQPHost, config.AMQPPort)
 
 
conn, err := connect(addr)
 
if err != nil {
 
return nil, err
 
}
 
 
notifyConnClose := make(chan *amqp.Error)
 
conn.NotifyClose(notifyConnClose)
 
 
ch, err := initChannel(conn)
 
if err != nil {
 
return nil, err
 
}
 
notifyChanClose := make(chan *amqp.Error)
 
ch.NotifyClose(notifyChanClose)
 
 
return &EventService{
 
connectionAddr: addr,
 
connection: conn,
 
channel: ch,
 
done: make(chan bool),
 
notifyConnClose: notifyConnClose,
 
notifyChanClose: notifyChanClose,
 
}, nil
 
}
 
 
func connect(addr string) (*amqp.Connection, error) {
 
conn, err := amqp.Dial(addr)
if err != nil {
if err != nil {
return nil, &customerrs.AMQPInitFailError{Action: "failed to connect to RabbitMQ", Err: err}
return nil, &customerrs.AMQPInitFailError{Action: "failed to connect to RabbitMQ", Err: err}
}
}
 
return conn, nil
 
}
 
 
// Reconnect attempts to setup a new connection to the RabbitMQ server after an disconnect.
 
func (e *EventService) Reconnect() {
 
i := 1
 
for {
 
e.isReady = false
 
 
conn, err := connect(e.connectionAddr)
 
 
if err != nil {
 
select {
 
case <-e.done:
 
return
 
case <-time.After(reconnectDelay):
 
}
 
continue
 
}
 
 
done := e.handleReInit(conn)
 
if done || i == reconnectAttempts {
 
break
 
}
 
i++
 
}
 
}
 
 
func (e *EventService) handleReInit(conn *amqp.Connection) bool {
 
i := 1
 
for {
 
e.isReady = false
 
 
ch, err := initChannel(conn)
 
if err != nil {
 
select {
 
case <-e.done:
 
return true
 
case <-time.After(reInitDelay):
 
i++
 
}
 
if i == reInitAttempts {
 
return false
 
}
 
continue
 
}
 
 
notifyConnClose := make(chan *amqp.Error)
 
conn.NotifyClose(notifyConnClose)
 
e.notifyConnClose = notifyConnClose
 
e.connection = conn
 
 
notifyChanClose := make(chan *amqp.Error)
 
ch.NotifyClose(notifyChanClose)
 
e.notifyChanClose = notifyChanClose
 
e.channel = ch
 
e.isReady = true
 
 
return true
 
}
 
}
 
 
func initChannel(conn *amqp.Connection) (*amqp.Channel, error) {
ch, err := conn.Channel()
ch, err := conn.Channel()
if err != nil {
if err != nil {
return nil, &customerrs.AMQPInitFailError{Action: "failed to open a channel", Err: err}
return nil, &customerrs.AMQPInitFailError{Action: "failed to open a channel", Err: err}
}
}
return &EventService{
return ch, nil
connection: conn,
channel: ch,
}, nil
}
}
// PublishEvent declares a queue and publishes events.
// PublishEvent declares a queue and publishes events.
@@ -79,6 +192,23 @@ func (e *EventService) PublishEvent(topic string, event event.Event) error {
@@ -79,6 +192,23 @@ func (e *EventService) PublishEvent(topic string, event event.Event) error {
return nil
return nil
}
}
 
// RetryPublish is used to retry publishing an event after a failed attempt.
 
func (e *EventService) RetryPublish(topic string, event event.Event) error {
 
ctx := context.Background()
 
backOff := retry.NewFibonacci(time.Second)
 
 
if err := retry.Do(ctx, retry.WithMaxRetries(rePublishAttempts, backOff), func(ctx context.Context) error {
 
if pubErr := e.PublishEvent(topic, event); pubErr != nil {
 
return retry.RetryableError(&customerrs.AMQPMessageFailError{Action: "retrying to publish message failed", Err: pubErr})
 
}
 
return nil
 
}); err != nil {
 
return err
 
}
 
 
return nil
 
}
 
// CloseConnection closes an exisiting connection.
// CloseConnection closes an exisiting connection.
func (e *EventService) CloseConnection() {
func (e *EventService) CloseConnection() {
if err := e.connection.Close(); err != nil {
if err := e.connection.Close(); err != nil {
Loading