package eventservice import ( "encoding/json" "code.fbi.h-da.de/danet/gosdn/controller/config" "code.fbi.h-da.de/danet/gosdn/controller/event" "code.fbi.h-da.de/danet/gosdn/controller/nucleus/errors" interfaces "code.fbi.h-da.de/danet/gosdn/controller/interfaces/event" amqp "github.com/rabbitmq/amqp091-go" ) // EventService is used to setup a connection to a broker and publish events to topics. type EventService struct { connection *amqp.Connection channel *amqp.Channel } // NewEventService creates a new connection to the broker and opens a channel for later usage. func NewEventService() (interfaces.Service, error) { // TODO: This is an fugly hack to mitigate that some tests are trying to connect to an actual broker. (staester) if config.AMQPPrefix == "" { return NewMockEventService(), nil } conn, err := amqp.Dial(amqpURIBuilder(config.AMQPPrefix, config.AMQPUser, config.AMQPPassword, config.AMQPHost, config.AMQPPort)) if err != nil { return nil, &errors.ErrAMQPInitFail{Action: "failed to connect to RabbitMQ", Err: err} } ch, err := conn.Channel() if err != nil { return nil, &errors.ErrAMQPInitFail{Action: "failed to open a channel", Err: err} } return &EventService{ connection: conn, channel: ch, }, nil } // PublishEvent declares a queue and publishes events. func (e *EventService) PublishEvent(topic string, event event.Event) error { q, err := e.channel.QueueDeclare( topic, // name false, // durable false, // delete when unused false, // exclusive false, // no-wait nil, // arguments ) if err != nil { return &errors.ErrAMQPInitFail{Action: "failed declaring queue", Err: err} } eventBody, err := json.Marshal(event) if err != nil { return &errors.ErrCouldNotMarshall{Identifier: topic + " " + event.EntityID.String(), Type: event.Type, Err: err} } err = e.channel.Publish( "", // exchange q.Name, // routing key false, // mandatory false, // immediate amqp.Publishing{ ContentType: "application/json", Body: eventBody, }) if err != nil { return &errors.ErrAMQPMessageFail{Action: "failed to publish message", Err: err} } return nil } // CloseConnection closes an exisiting connection. func (e *EventService) CloseConnection() { e.connection.Close() }