-
Malte Bauch authored
See merge request !384 Co-authored-by:
Malte Bauch <malte.bauch@extern.h-da.de>
Malte Bauch authoredSee merge request !384 Co-authored-by:
Malte Bauch <malte.bauch@extern.h-da.de>
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
eventService.go 3.23 KiB
package event
import (
"fmt"
"os"
"code.fbi.h-da.de/danet/gosdn/controller/customerrs"
amqp "github.com/rabbitmq/amqp091-go"
"github.com/sirupsen/logrus"
)
type (
eventCallbackFunc func(*Event)
)
// TypeToCallbackTuple provides a mapping between an event type and a provided callback.
type TypeToCallbackTuple struct {
Type Type
Callback eventCallbackFunc
}
// ServiceInterface defines an event service.
type ServiceInterface interface {
SubscribeToEventType([]TypeToCallbackTuple)
SetupEventReciever(stopChan chan os.Signal)
CloseConnection()
}
// Service is used to setup a connection to a broker and publish events to topics.
type Service struct {
connection *amqp.Connection
channel *amqp.Channel
subscriber map[Type]eventCallbackFunc
topics []Topic
}
// NewEventService creates a new connection to the broker and opens a channel for later usage.
func NewEventService(brokerConnectionString string, topics []Topic) (ServiceInterface, error) {
conn, err := amqp.Dial(brokerConnectionString)
if err != nil {
return nil, &customerrs.AMQPInitFailError{Action: "failed to connect to RabbitMQ", Err: err}
}
ch, err := conn.Channel()
if err != nil {
return nil, &customerrs.AMQPInitFailError{Action: "failed to open a channel", Err: err}
}
return &Service{
connection: conn,
channel: ch,
subscriber: map[Type]eventCallbackFunc{},
topics: topics,
}, nil
}
// SubscribeToEventType subscribes a provided callback to an event.
func (e *Service) SubscribeToEventType(eventTypeToCallbackMapping []TypeToCallbackTuple) {
for _, eventTypeToCallback := range eventTypeToCallbackMapping {
e.subscriber[eventTypeToCallback.Type] = eventTypeToCallback.Callback
}
}
// SetupEventReciever will be removed in the future.
func (e *Service) SetupEventReciever(stopChan chan os.Signal) {
for _, topic := range e.topics {
go func(topic string) {
setupErr := e.setupQueueConsume(topic, stopChan)
if setupErr != nil {
stopChan <- os.Interrupt
}
}(topic.String())
}
}
// setupQueueConsume sets up a connection to the broker, listens to messages on a specific topic
// and writes them into a channel.
func (e *Service) setupQueueConsume(topic string, stopChan chan os.Signal) error {
q, err := e.channel.QueueDeclare(
topic, // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
return &customerrs.AMQPMessageFailError{Action: "failed declaring queue", Err: err}
}
msgs, err := e.channel.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
return &customerrs.AMQPInitFailError{Action: "failed to register consumer", Err: err}
}
var forever chan struct{}
go func() {
for d := range msgs {
event, err := getEventFromPayload(d.Body)
if err != nil {
fmt.Printf("Error: +%v\n", err)
}
val, ok := e.subscriber[parseTypeString(event.Type)]
if ok {
val(event)
}
}
}()
<-forever
return nil
}
// CloseConnection closes an exisiting connection.
func (e *Service) CloseConnection() {
err := e.connection.Close()
if err != nil {
logrus.Errorf("could not close connection %s", err)
}
}