Skip to content
Snippets Groups Projects
eventService.go 3.75 KiB
Newer Older
  • Learn to ignore specific revisions
  • package event
    
    import (
    	"fmt"
    	"os"
    
    	"code.fbi.h-da.de/danet/gosdn/controller/customerrs"
    	amqp "github.com/rabbitmq/amqp091-go"
    
    	"github.com/sirupsen/logrus"
    )
    
    // eventCallbackFunc is the signature of a callback that is invoked with a received event.
    type eventCallbackFunc func(*Event)
    
    // TypeToCallbackTuple provides a mapping between an event type and a provided callback.
    // A slice of these tuples is what SubscribeToEventType accepts.
    type TypeToCallbackTuple struct {
    	// Type is the event type the callback is registered for.
    	Type     Type
    	// Callback is the function stored for Type when the tuple is subscribed.
    	Callback eventCallbackFunc
    }
    
    // ServiceInterface defines an event service.
    type ServiceInterface interface {
    	// SubscribeToEventType registers the callback of every tuple for the tuple's event type.
    	SubscribeToEventType([]TypeToCallbackTuple)
    	// SetupEventReciever starts receiving events. In the Service implementation an
    	// os.Interrupt is sent on stopChan when setting up a consumer fails.
    	SetupEventReciever(stopChan chan os.Signal)
    	// CloseConnection closes the connection held by the service.
    	CloseConnection()
    }
    
    // Service is used to setup a connection to a broker and publish events to topics.
    // NOTE(review): the methods visible in this file only consume events and dispatch
    // them to subscribed callbacks; confirm whether publishing lives elsewhere.
    type Service struct {
    	// connection is the broker connection created in NewEventService.
    	connection *amqp.Connection
    	// channel is the AMQP channel opened on connection; it is used to declare
    	// exchanges, bind queues and consume messages.
    	channel    *amqp.Channel
    	// subscriber maps an event type to the callback invoked for events of that type.
    	subscriber map[Type]eventCallbackFunc
    	// topics lists the topics for which SetupEventReciever starts a consumer.
    	topics     []Topic
    }
    
    // NewEventService dials the broker at brokerConnectionString and opens a channel on
    // the new connection for later usage.
    //
    // topics lists the topics that SetupEventReciever later starts consumers for.
    //
    // On failure a *customerrs.AMQPInitFailError carrying the underlying error is
    // returned and the returned service is nil. If the channel cannot be opened, the
    // already established connection is closed before returning so it does not leak.
    func NewEventService(brokerConnectionString string, topics []Topic) (ServiceInterface, error) {
    	conn, err := amqp.Dial(brokerConnectionString)
    	if err != nil {
    		return nil, &customerrs.AMQPInitFailError{Action: "failed to connect to RabbitMQ", Err: err}
    	}
    
    	ch, err := conn.Channel()
    	if err != nil {
    		// Best-effort cleanup: the channel error is what the caller needs to see,
    		// so a secondary error from closing the connection is intentionally dropped.
    		_ = conn.Close()
    
    		return nil, &customerrs.AMQPInitFailError{Action: "failed to open a channel", Err: err}
    	}
    
    	return &Service{
    		connection: conn,
    		channel:    ch,
    		subscriber: map[Type]eventCallbackFunc{},
    		topics:     topics,
    	}, nil
    }
    
    // SubscribeToEventType stores the callback of every given tuple under the tuple's
    // event type. A later tuple with the same type replaces the earlier callback.
    func (e *Service) SubscribeToEventType(eventTypeToCallbackMapping []TypeToCallbackTuple) {
    	for i := range eventTypeToCallbackMapping {
    		tuple := eventTypeToCallbackMapping[i]
    		e.subscriber[tuple.Type] = tuple.Callback
    	}
    }
    
    // SetupEventReciever starts one goroutine per configured topic that sets up and runs
    // the consumer for that topic. If the setup of a consumer fails, os.Interrupt is sent
    // on stopChan. This method will be removed in the future.
    func (e *Service) SetupEventReciever(stopChan chan os.Signal) {
    	for i := range e.topics {
    		// name is declared per iteration, so every goroutine captures its own topic.
    		name := e.topics[i].String()
    
    		go func() {
    			if err := e.setupQueueConsume(name, stopChan); err != nil {
    				stopChan <- os.Interrupt
    			}
    		}()
    	}
    }
    
    // setupQueueConsume sets up a connection to the broker, listens to messages on a specific topic
    // and writes them into a channel.
    func (e *Service) setupQueueConsume(topic string, stopChan chan os.Signal) error {
    
    	err := e.channel.ExchangeDeclare(
    		topic,    // name
    		"fanout", // type
    		true,     // durable
    		false,    // auto-deleted
    		false,    // internal
    		false,    // no-wait
    		nil,      // arguments
    	)
    	if err != nil {
    		return &customerrs.AMQPInitFailError{Action: "failed declaring exchange", Err: err}
    	}
    
    
    	if err != nil {
    		return &customerrs.AMQPInitFailError{Action: "failed declaring queue", Err: err}
    	}
    
    	err = e.channel.QueueBind(
    		q.Name, // queue name
    		"",     // routing key
    		topic,  // exchange
    		false,
    		nil,
    	)
    
    		return &customerrs.AMQPInitFailError{Action: "failed binding queue to exchange", Err: err}
    
    	}
    
    	msgs, err := e.channel.Consume(
    		q.Name, // queue
    		"",     // consumer
    		true,   // auto-ack
    		false,  // exclusive
    		false,  // no-local
    		false,  // no-wait
    		nil,    // args
    	)
    
    	if err != nil {
    		return &customerrs.AMQPInitFailError{Action: "failed to register consumer", Err: err}
    	}
    
    	var forever chan struct{}
    
    	go func() {
    		for d := range msgs {
    			event, err := getEventFromPayload(d.Body)
    			if err != nil {
    				fmt.Printf("Error: +%v\n", err)
    			}
    
    			val, ok := e.subscriber[parseTypeString(event.Type)]
    			if ok {
    				val(event)
    			}
    		}
    	}()
    
    	<-forever
    
    	return nil
    }
    
    // CloseConnection closes the existing broker connection. A failure to close is
    // logged as an error and not returned.
    func (e *Service) CloseConnection() {
    	if closeErr := e.connection.Close(); closeErr != nil {
    		logrus.Errorf("could not close connection %s", closeErr)
    	}
    }