package eventservice

import (
	"context"
	"encoding/json"
	"time"

	"code.fbi.h-da.de/danet/gosdn/controller/config"
	"code.fbi.h-da.de/danet/gosdn/controller/customerrs"
	"code.fbi.h-da.de/danet/gosdn/controller/event"
	interfaces "code.fbi.h-da.de/danet/gosdn/controller/interfaces/event"
	amqp "github.com/rabbitmq/amqp091-go"
	"github.com/sethvargo/go-retry"
	"github.com/sirupsen/logrus"
	log "github.com/sirupsen/logrus"
)

const (
	// Delay before another reconnect is tried after dialing the broker failed.
	reconnectDelay = 5 * time.Second
	// Delay before another attempt to re-initialize a channel.
	reInitDelay = 2 * time.Second
	// Max number of tries to reconnect to Broker.
	reconnectAttempts = 5
	// Max number of tries to re-initialize a channel. As re-init is part of the reconnect process this multiplies
	// with number of reconnectAttempts.
	reInitAttempts = 5
	// Max number of tries to republish a failed message.
	rePublishAttempts = 5
)

// EventService is used to setup a connection to a broker and publish events to topics.
type EventService struct {
	// connectionAddr is the broker URI; it is kept so the broker can be dialed again.
	connectionAddr string
	// connection is the currently used broker connection.
	connection *amqp.Connection
	// channel is the channel opened on connection that events are published on.
	channel *amqp.Channel
	// done aborts the waits between reconnect/re-init attempts when it becomes readable.
	done chan bool
	// notifyConnClose is registered with connection.NotifyClose.
	notifyConnClose chan *amqp.Error
	// notifyChanClose is registered with channel.NotifyClose.
	notifyChanClose chan *amqp.Error
	// isReady is cleared while reconnecting and set once a channel was re-initialized.
	isReady bool
}

// NewEventService creates a new connection to the broker and opens a channel for later usage.
//
// If config.AMQPPrefix is empty, a mock event service is returned instead and
// no connection is attempted. An error is returned when the broker cannot be
// reached or no channel can be opened; in the latter case the already
// established connection is closed again so it does not leak.
func NewEventService() (interfaces.Service, error) {
	// TODO: This is an ugly hack to mitigate that some tests are trying to connect to an actual broker. (staester)
	if config.AMQPPrefix == "" {
		return NewMockEventService(), nil
	}

	addr := amqpURIBuilder(config.AMQPPrefix, config.AMQPUser, config.AMQPPassword, config.AMQPHost, config.AMQPPort)

	conn, err := connect(addr)
	if err != nil {
		return nil, err
	}

	notifyConnClose := make(chan *amqp.Error)
	conn.NotifyClose(notifyConnClose)

	ch, err := initChannel(conn)
	if err != nil {
		// Nobody else holds a reference to conn at this point, so it has to
		// be released here; otherwise the connection would stay open forever.
		if closeErr := conn.Close(); closeErr != nil {
			log.Error(closeErr)
		}

		return nil, err
	}

	notifyChanClose := make(chan *amqp.Error)
	ch.NotifyClose(notifyChanClose)

	// NOTE(review): isReady keeps its zero value (false) here, whereas
	// handleReInit sets it to true after performing the same setup steps —
	// confirm whether a freshly created service should be marked ready.
	return &EventService{
		connectionAddr:  addr,
		connection:      conn,
		channel:         ch,
		done:            make(chan bool),
		notifyConnClose: notifyConnClose,
		notifyChanClose: notifyChanClose,
	}, nil
}

// connect dials the broker at addr, retrying every 2 seconds. It returns the
// established connection, or an AMQPInitFailError wrapping the error of the
// last dial attempt once all retries are used up.
func connect(addr string) (*amqp.Connection, error) {
	// With a 2 second pause per attempt this amounts to roughly two minutes of retries.
	const retries = 60

	// err is declared outside the loop and assigned with '=' inside it, so
	// that the final error really carries the last dial failure.
	var err error

	// NOTE(review): addr is built from config.AMQPUser/config.AMQPPassword by
	// the caller, so this line presumably writes credentials to the log —
	// consider redacting.
	logrus.Infof("will try to connect to rabbitmq: %s", addr)

	for i := 0; i < retries; i++ {
		var conn *amqp.Connection

		conn, err = amqp.Dial(addr)
		if err == nil {
			logrus.Info("Connected to RabbitMQ")
			return conn, nil
		}

		logrus.Errorf("could not connect to RabbitMQ with error: %s. Retrying in 2 seconds.", err.Error())
		time.Sleep(2 * time.Second)
	}

	return nil, &customerrs.AMQPInitFailError{Action: "finally failed to connect to RabbitMQ", Err: err}
}

// Reconnect attempts to setup a new connection to the RabbitMQ server after a disconnect.
func (e *EventService) Reconnect() { i := 1 for { e.isReady = false conn, err := connect(e.connectionAddr) if err != nil { select { case <-e.done: return case <-time.After(reconnectDelay): } continue } done := e.handleReInit(conn) if done || i == reconnectAttempts { break } i++ } } func (e *EventService) handleReInit(conn *amqp.Connection) bool { i := 1 for { e.isReady = false ch, err := initChannel(conn) if err != nil { select { case <-e.done: return true case <-time.After(reInitDelay): i++ } if i == reInitAttempts { return false } continue } notifyConnClose := make(chan *amqp.Error) conn.NotifyClose(notifyConnClose) e.notifyConnClose = notifyConnClose e.connection = conn notifyChanClose := make(chan *amqp.Error) ch.NotifyClose(notifyChanClose) e.notifyChanClose = notifyChanClose e.channel = ch e.isReady = true return true } } func initChannel(conn *amqp.Connection) (*amqp.Channel, error) { ch, err := conn.Channel() if err != nil { return nil, &customerrs.AMQPInitFailError{Action: "failed to open a channel", Err: err} } return ch, nil } // PublishEvent declares a queue and publishes events. 
func (e *EventService) PublishEvent(topic string, event event.Event) error { err := e.channel.ExchangeDeclare( topic, // name "fanout", // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ) if err != nil { return &customerrs.AMQPInitFailError{Action: "failed declaring exchange", Err: err} } eventBody, err := json.Marshal(event) if err != nil { return &customerrs.CouldNotMarshallError{Identifier: topic + " " + event.EntityID.String(), Type: event.Type, Err: err} } ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() err = e.channel.PublishWithContext( ctx, topic, // exchange "", // routing key false, // mandatory false, // immediate amqp.Publishing{ ContentType: "application/json", Body: eventBody, }) if err != nil { return &customerrs.AMQPMessageFailError{Action: "failed to publish message", Err: err} } return nil } // RetryPublish is used to retry publishing an event after a failed attempt. func (e *EventService) RetryPublish(topic string, event event.Event) error { ctx := context.Background() backOff := retry.NewFibonacci(time.Second) if err := retry.Do(ctx, retry.WithMaxRetries(rePublishAttempts, backOff), func(ctx context.Context) error { if pubErr := e.PublishEvent(topic, event); pubErr != nil { return retry.RetryableError(&customerrs.AMQPMessageFailError{Action: "retrying to publish message failed", Err: pubErr}) } return nil }); err != nil { return err } return nil } // CloseConnection closes an exisiting connection. func (e *EventService) CloseConnection() { if err := e.connection.Close(); err != nil { log.Error(err) } }