Newer
Older
Fabian Seidl
committed
package eventservice
import (
Fabian Seidl
committed
"encoding/json"
Fabian Seidl
committed
"time"
Fabian Seidl
committed
"code.fbi.h-da.de/danet/gosdn/controller/config"
Fabian Seidl
committed
"code.fbi.h-da.de/danet/gosdn/controller/customerrs"
Fabian Seidl
committed
"code.fbi.h-da.de/danet/gosdn/controller/event"
interfaces "code.fbi.h-da.de/danet/gosdn/controller/interfaces/event"
amqp "github.com/rabbitmq/amqp091-go"
Fabian Seidl
committed
"github.com/sethvargo/go-retry"
"github.com/sirupsen/logrus"
log "github.com/sirupsen/logrus"
Fabian Seidl
committed
)
Fabian Seidl
committed
const (
// Duration to try reconnecting until cancelation.
reconnectDelay = 5 * time.Second
// Duration to try re-initializing a channel.
reInitDelay = 2 * time.Second
// Max number of tries to reconnect to Broker.
reconnectAttempts = 5
// Max number of tries to re-initialize a channel. As re-init is part of the reconnect process this multiplies
// with number of reconnectAttempts.
reInitAttempts = 5
// Max number of tries to republish a failed message.
rePublishAttempts = 5
)
Fabian Seidl
committed
// EventService is used to setup a connection to a broker and publish events to topics.
type EventService struct {
Fabian Seidl
committed
connectionAddr string
connection *amqp.Connection
channel *amqp.Channel
done chan bool
notifyConnClose chan *amqp.Error
notifyChanClose chan *amqp.Error
isReady bool
Fabian Seidl
committed
}
// NewEventService creates a new connection to the broker and opens a channel for later usage.
func NewEventService() (interfaces.Service, error) {
// TODO: This is an fugly hack to mitigate that some tests are trying to connect to an actual broker. (staester)
if config.AMQPPrefix == "" {
return NewMockEventService(), nil
}
Fabian Seidl
committed
addr := amqpURIBuilder(config.AMQPPrefix, config.AMQPUser, config.AMQPPassword, config.AMQPHost, config.AMQPPort)
conn, err := connect(addr)
if err != nil {
return nil, err
}
notifyConnClose := make(chan *amqp.Error)
conn.NotifyClose(notifyConnClose)
ch, err := initChannel(conn)
if err != nil {
return nil, err
}
notifyChanClose := make(chan *amqp.Error)
ch.NotifyClose(notifyChanClose)
return &EventService{
connectionAddr: addr,
connection: conn,
channel: ch,
done: make(chan bool),
notifyConnClose: notifyConnClose,
notifyChanClose: notifyChanClose,
}, nil
}
func connect(addr string) (*amqp.Connection, error) {
var err error
// Will equal two Minuten of retries
retries := 60
logrus.Infof("will try to connect to rabbitmq: %s", addr)
for i := 0; i < retries; i++ {
conn, err := amqp.Dial(addr)
if err == nil {
logrus.Info("Connected to RabbitMQ")
return conn, nil
}
logrus.Errorf("could not connect to RabbitMQ with error: %s. Retrying in 2 seconds.", err.Error())
time.Sleep(2 * time.Second)
Fabian Seidl
committed
}
return nil, &customerrs.AMQPInitFailError{Action: "finally failed to connect to RabbitMQ", Err: err}
Fabian Seidl
committed
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
}
// Reconnect attempts to setup a new connection to the RabbitMQ server after an disconnect.
func (e *EventService) Reconnect() {
i := 1
for {
e.isReady = false
conn, err := connect(e.connectionAddr)
if err != nil {
select {
case <-e.done:
return
case <-time.After(reconnectDelay):
}
continue
}
done := e.handleReInit(conn)
if done || i == reconnectAttempts {
break
}
i++
}
}
func (e *EventService) handleReInit(conn *amqp.Connection) bool {
i := 1
for {
e.isReady = false
ch, err := initChannel(conn)
if err != nil {
select {
case <-e.done:
return true
case <-time.After(reInitDelay):
i++
}
if i == reInitAttempts {
return false
}
continue
}
notifyConnClose := make(chan *amqp.Error)
conn.NotifyClose(notifyConnClose)
e.notifyConnClose = notifyConnClose
e.connection = conn
notifyChanClose := make(chan *amqp.Error)
ch.NotifyClose(notifyChanClose)
e.notifyChanClose = notifyChanClose
e.channel = ch
e.isReady = true
return true
}
}
func initChannel(conn *amqp.Connection) (*amqp.Channel, error) {
Fabian Seidl
committed
ch, err := conn.Channel()
if err != nil {
Fabian Seidl
committed
return nil, &customerrs.AMQPInitFailError{Action: "failed to open a channel", Err: err}
Fabian Seidl
committed
}
Fabian Seidl
committed
return ch, nil
Fabian Seidl
committed
}
// PublishEvent declares a queue and publishes events.
func (e *EventService) PublishEvent(topic string, event event.Event) error {
err := e.channel.ExchangeDeclare(
topic, // name
"fanout", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
Fabian Seidl
committed
)
if err != nil {
return &customerrs.AMQPInitFailError{Action: "failed declaring exchange", Err: err}
Fabian Seidl
committed
}
eventBody, err := json.Marshal(event)
if err != nil {
Fabian Seidl
committed
return &customerrs.CouldNotMarshallError{Identifier: topic + " " + event.EntityID.String(), Type: event.Type, Err: err}
Fabian Seidl
committed
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
ctx,
topic, // exchange
"", // routing key
false, // mandatory
false, // immediate
Fabian Seidl
committed
amqp.Publishing{
ContentType: "application/json",
Body: eventBody,
})
if err != nil {
Fabian Seidl
committed
return &customerrs.AMQPMessageFailError{Action: "failed to publish message", Err: err}
Fabian Seidl
committed
}
return nil
}
Fabian Seidl
committed
// RetryPublish is used to retry publishing an event after a failed attempt.
func (e *EventService) RetryPublish(topic string, event event.Event) error {
ctx := context.Background()
backOff := retry.NewFibonacci(time.Second)
if err := retry.Do(ctx, retry.WithMaxRetries(rePublishAttempts, backOff), func(ctx context.Context) error {
if pubErr := e.PublishEvent(topic, event); pubErr != nil {
return retry.RetryableError(&customerrs.AMQPMessageFailError{Action: "retrying to publish message failed", Err: pubErr})
}
return nil
}); err != nil {
return err
}
return nil
}
Fabian Seidl
committed
// CloseConnection closes an exisiting connection.
func (e *EventService) CloseConnection() {
if err := e.connection.Close(); err != nil {
log.Error(err)
}