package event import ( "fmt" "os" "code.fbi.h-da.de/danet/gosdn/controller/customerrs" amqp "github.com/rabbitmq/amqp091-go" "github.com/sirupsen/logrus" ) type ( eventCallbackFunc func(*Event) ) // TypeToCallbackTuple provides a mapping between an event type and a provided callback. type TypeToCallbackTuple struct { Type Type Callback eventCallbackFunc } // ServiceInterface defines an event service. type ServiceInterface interface { SubscribeToEventType([]TypeToCallbackTuple) SetupEventReciever(stopChan chan os.Signal) CloseConnection() } // Service is used to setup a connection to a broker and publish events to topics. type Service struct { connection *amqp.Connection channel *amqp.Channel subscriber map[Type]eventCallbackFunc topics []Topic } // NewEventService creates a new connection to the broker and opens a channel for later usage. func NewEventService(brokerConnectionString string, topics []Topic) (ServiceInterface, error) { conn, err := amqp.Dial(brokerConnectionString) if err != nil { return nil, &customerrs.AMQPInitFailError{Action: "failed to connect to RabbitMQ", Err: err} } ch, err := conn.Channel() if err != nil { return nil, &customerrs.AMQPInitFailError{Action: "failed to open a channel", Err: err} } return &Service{ connection: conn, channel: ch, subscriber: map[Type]eventCallbackFunc{}, topics: topics, }, nil } // SubscribeToEventType subscribes a provided callback to an event. func (e *Service) SubscribeToEventType(eventTypeToCallbackMapping []TypeToCallbackTuple) { for _, eventTypeToCallback := range eventTypeToCallbackMapping { e.subscriber[eventTypeToCallback.Type] = eventTypeToCallback.Callback } } // SetupEventReciever will be removed in the future. func (e *Service) SetupEventReciever(stopChan chan os.Signal) { for _, topic := range e.topics { go func(topic string) { setupErr := e.setupQueueConsume(topic, stopChan) if setupErr != nil { stopChan <- os.Interrupt } }(topic.String()) } } // setupQueueConsume sets up a connection to the broker, listens to messages on a specific topic // and writes them into a channel. func (e *Service) setupQueueConsume(topic string, stopChan chan os.Signal) error { err := e.channel.ExchangeDeclare( topic, // name "fanout", // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ) if err != nil { return &customerrs.AMQPInitFailError{Action: "failed declaring exchange", Err: err} } q, err := e.channel.QueueDeclare( "", // name false, // durable false, // delete when unused true, // exclusive false, // no-wait nil, // arguments ) if err != nil { return &customerrs.AMQPInitFailError{Action: "failed declaring queue", Err: err} } err = e.channel.QueueBind( q.Name, // queue name "", // routing key topic, // exchange false, nil, ) if err != nil { return &customerrs.AMQPInitFailError{Action: "failed binding queue to exchange", Err: err} } msgs, err := e.channel.Consume( q.Name, // queue "", // consumer true, // auto-ack false, // exclusive false, // no-local false, // no-wait nil, // args ) if err != nil { return &customerrs.AMQPInitFailError{Action: "failed to register consumer", Err: err} } var forever chan struct{} go func() { for d := range msgs { event, err := getEventFromPayload(d.Body) if err != nil { fmt.Printf("Error: +%v\n", err) } val, ok := e.subscriber[parseTypeString(event.Type)] if ok { val(event) } } }() <-forever return nil } // CloseConnection closes an exisiting connection. func (e *Service) CloseConnection() { err := e.connection.Close() if err != nil { logrus.Errorf("could not close connection %s", err) } }