Skip to content
Snippets Groups Projects
Service.go 2.24 KiB
Newer Older
  • Learn to ignore specific revisions
  • package eventservice
    
    import (
    	"encoding/json"
    
    	"code.fbi.h-da.de/danet/gosdn/controller/config"
    	"code.fbi.h-da.de/danet/gosdn/controller/event"
    	"code.fbi.h-da.de/danet/gosdn/controller/nucleus/errors"
    
    	interfaces "code.fbi.h-da.de/danet/gosdn/controller/interfaces/event"
    
    	amqp "github.com/rabbitmq/amqp091-go"
    )
    
    // EventService is used to setup a connection to a broker and publish events to topics.
    type EventService struct {
    	connection *amqp.Connection
    	channel    *amqp.Channel
    }
    
    // NewEventService creates a new connection to the broker and opens a channel for later usage.
    func NewEventService() (interfaces.Service, error) {
    	// TODO: This is an fugly hack to mitigate that some tests are trying to connect to an actual broker. (staester)
    	if config.AMQPPrefix == "" {
    		return NewMockEventService(), nil
    	}
    
    	conn, err := amqp.Dial(amqpURIBuilder(config.AMQPPrefix, config.AMQPUser, config.AMQPPassword, config.AMQPHost, config.AMQPPort))
    	if err != nil {
    		return nil, &errors.ErrAMQPInitFail{Action: "failed to connect to RabbitMQ", Err: err}
    	}
    
    	ch, err := conn.Channel()
    	if err != nil {
    		return nil, &errors.ErrAMQPInitFail{Action: "failed to open a channel", Err: err}
    	}
    
    	return &EventService{
    		connection: conn,
    		channel:    ch,
    	}, nil
    }
    
    // PublishEvent declares a queue and publishes events.
    func (e *EventService) PublishEvent(topic string, event event.Event) error {
    	q, err := e.channel.QueueDeclare(
    		topic, // name
    		false, // durable
    		false, // delete when unused
    		false, // exclusive
    		false, // no-wait
    		nil,   // arguments
    	)
    	if err != nil {
    		return &errors.ErrAMQPInitFail{Action: "failed declaring queue", Err: err}
    	}
    
    	eventBody, err := json.Marshal(event)
    	if err != nil {
    		return &errors.ErrCouldNotMarshall{Identifier: topic + " " + event.EntityID.String(), Type: event.Type, Err: err}
    	}
    
    	err = e.channel.Publish(
    		"",     // exchange
    		q.Name, // routing key
    		false,  // mandatory
    		false,  // immediate
    		amqp.Publishing{
    			ContentType: "application/json",
    			Body:        eventBody,
    		})
    	if err != nil {
    		return &errors.ErrAMQPMessageFail{Action: "failed to publish message", Err: err}
    	}
    
    	return nil
    }
    
    // CloseConnection closes an exisiting connection.
    func (e *EventService) CloseConnection() {
    	e.connection.Close()
    }