Skip to content
Snippets Groups Projects
Service.go 5.72 KiB
Newer Older
  • Learn to ignore specific revisions
  • Andre Sterba's avatar
    Andre Sterba committed
    	"context"
    
    
    	"code.fbi.h-da.de/danet/gosdn/controller/config"
    
    	"code.fbi.h-da.de/danet/gosdn/controller/customerrs"
    
    	"code.fbi.h-da.de/danet/gosdn/controller/event"
    
    	interfaces "code.fbi.h-da.de/danet/gosdn/controller/interfaces/event"
    
    	amqp "github.com/rabbitmq/amqp091-go"
    
    	log "github.com/sirupsen/logrus"
    
    const (
    	// Duration to try reconnecting until cancelation.
    	reconnectDelay = 5 * time.Second
    
    	// Duration to try re-initializing a channel.
    	reInitDelay = 2 * time.Second
    
    	// Max number of tries to reconnect to Broker.
    	reconnectAttempts = 5
    
    	// Max number of tries to re-initialize a channel. As re-init is part of the reconnect process this multiplies
    	// with number of reconnectAttempts.
    	reInitAttempts = 5
    
    	// Max number of tries to republish a failed message.
    	rePublishAttempts = 5
    )
    
    
    // EventService is used to setup a connection to a broker and publish events to topics.
    type EventService struct {
    
    	connectionAddr  string
    	connection      *amqp.Connection
    	channel         *amqp.Channel
    	done            chan bool
    	notifyConnClose chan *amqp.Error
    	notifyChanClose chan *amqp.Error
    	isReady         bool
    
    }
    
    // NewEventService creates a new connection to the broker and opens a channel for later usage.
    func NewEventService() (interfaces.Service, error) {
    	// TODO: This is an fugly hack to mitigate that some tests are trying to connect to an actual broker. (staester)
    	if config.AMQPPrefix == "" {
    		return NewMockEventService(), nil
    	}
    
    
    	addr := amqpURIBuilder(config.AMQPPrefix, config.AMQPUser, config.AMQPPassword, config.AMQPHost, config.AMQPPort)
    
    	conn, err := connect(addr)
    	if err != nil {
    		return nil, err
    	}
    
    	notifyConnClose := make(chan *amqp.Error)
    	conn.NotifyClose(notifyConnClose)
    
    	ch, err := initChannel(conn)
    	if err != nil {
    		return nil, err
    	}
    	notifyChanClose := make(chan *amqp.Error)
    	ch.NotifyClose(notifyChanClose)
    
    	return &EventService{
    		connectionAddr:  addr,
    		connection:      conn,
    		channel:         ch,
    		done:            make(chan bool),
    		notifyConnClose: notifyConnClose,
    		notifyChanClose: notifyChanClose,
    	}, nil
    }
    
    func connect(addr string) (*amqp.Connection, error) {
    
    	var err error
    	// Will equal two Minuten of retries
    	retries := 60
    
    	logrus.Infof("will try to connect to rabbitmq: %s", addr)
    
    	for i := 0; i < retries; i++ {
    		conn, err := amqp.Dial(addr)
    		if err == nil {
    			logrus.Info("Connected to RabbitMQ")
    			return conn, nil
    		}
    		logrus.Errorf("could not connect to RabbitMQ with error: %s. Retrying in 2 seconds.", err.Error())
    		time.Sleep(2 * time.Second)
    
    	return nil, &customerrs.AMQPInitFailError{Action: "finally failed to connect to RabbitMQ", Err: err}
    
    }
    
    // Reconnect attempts to setup a new connection to the RabbitMQ server after an disconnect.
    func (e *EventService) Reconnect() {
    	i := 1
    	for {
    		e.isReady = false
    
    		conn, err := connect(e.connectionAddr)
    
    		if err != nil {
    			select {
    			case <-e.done:
    				return
    			case <-time.After(reconnectDelay):
    			}
    			continue
    		}
    
    		done := e.handleReInit(conn)
    		if done || i == reconnectAttempts {
    			break
    		}
    		i++
    	}
    }
    
    func (e *EventService) handleReInit(conn *amqp.Connection) bool {
    	i := 1
    	for {
    		e.isReady = false
    
    		ch, err := initChannel(conn)
    		if err != nil {
    			select {
    			case <-e.done:
    				return true
    			case <-time.After(reInitDelay):
    				i++
    			}
    			if i == reInitAttempts {
    				return false
    			}
    			continue
    		}
    
    		notifyConnClose := make(chan *amqp.Error)
    		conn.NotifyClose(notifyConnClose)
    		e.notifyConnClose = notifyConnClose
    		e.connection = conn
    
    		notifyChanClose := make(chan *amqp.Error)
    		ch.NotifyClose(notifyChanClose)
    		e.notifyChanClose = notifyChanClose
    		e.channel = ch
    		e.isReady = true
    
    		return true
    	}
    }
    
    func initChannel(conn *amqp.Connection) (*amqp.Channel, error) {
    
    		return nil, &customerrs.AMQPInitFailError{Action: "failed to open a channel", Err: err}
    
    }
    
    // PublishEvent declares a queue and publishes events.
    func (e *EventService) PublishEvent(topic string, event event.Event) error {
    
    	err := e.channel.ExchangeDeclare(
    		topic,    // name
    		"fanout", // type
    		true,     // durable
    		false,    // auto-deleted
    		false,    // internal
    		false,    // no-wait
    		nil,      // arguments
    
    		return &customerrs.AMQPInitFailError{Action: "failed declaring exchange", Err: err}
    
    		return &customerrs.CouldNotMarshallError{Identifier: topic + " " + event.EntityID.String(), Type: event.Type, Err: err}
    
    	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    	defer cancel()
    
    
    Andre Sterba's avatar
    Andre Sterba committed
    	err = e.channel.PublishWithContext(
    
    		ctx,
    		topic, // exchange
    		"",    // routing key
    		false, // mandatory
    		false, // immediate
    
    		amqp.Publishing{
    			ContentType: "application/json",
    			Body:        eventBody,
    		})
    	if err != nil {
    
    		return &customerrs.AMQPMessageFailError{Action: "failed to publish message", Err: err}
    
    // RetryPublish is used to retry publishing an event after a failed attempt.
    func (e *EventService) RetryPublish(topic string, event event.Event) error {
    	ctx := context.Background()
    	backOff := retry.NewFibonacci(time.Second)
    
    	if err := retry.Do(ctx, retry.WithMaxRetries(rePublishAttempts, backOff), func(ctx context.Context) error {
    		if pubErr := e.PublishEvent(topic, event); pubErr != nil {
    			return retry.RetryableError(&customerrs.AMQPMessageFailError{Action: "retrying to publish message failed", Err: pubErr})
    		}
    		return nil
    	}); err != nil {
    		return err
    	}
    
    	return nil
    }
    
    
    // CloseConnection closes an exisiting connection.
    func (e *EventService) CloseConnection() {
    
    	if err := e.connection.Close(); err != nil {
    		log.Error(err)
    	}