package eventservice

import (
	"context"
	"encoding/json"
	"time"

	"code.fbi.h-da.de/danet/gosdn/controller/config"
	"code.fbi.h-da.de/danet/gosdn/controller/customerrs"
	"code.fbi.h-da.de/danet/gosdn/controller/event"

	interfaces "code.fbi.h-da.de/danet/gosdn/controller/interfaces/event"

	amqp "github.com/rabbitmq/amqp091-go"
	"github.com/sethvargo/go-retry"
	"github.com/sirupsen/logrus"
	log "github.com/sirupsen/logrus"
)

const (
	// Duration to try reconnecting until cancelation.
	reconnectDelay = 5 * time.Second

	// Duration to try re-initializing a channel.
	reInitDelay = 2 * time.Second

	// Max number of tries to reconnect to Broker.
	reconnectAttempts = 5

	// Max number of tries to re-initialize a channel. As re-init is part of the reconnect process this multiplies
	// with number of reconnectAttempts.
	reInitAttempts = 5

	// Max number of tries to republish a failed message.
	rePublishAttempts = 5
)

// EventService is used to setup a connection to a broker and publish events to topics.
type EventService struct {
	connectionAddr  string
	connection      *amqp.Connection
	channel         *amqp.Channel
	done            chan bool
	notifyConnClose chan *amqp.Error
	notifyChanClose chan *amqp.Error
	isReady         bool
}

// NewEventService creates a new connection to the broker and opens a channel for later usage.
func NewEventService() (interfaces.Service, error) {
	// TODO: This is an fugly hack to mitigate that some tests are trying to connect to an actual broker. (staester)
	if config.AMQPPrefix == "" {
		return NewMockEventService(), nil
	}

	addr := amqpURIBuilder(config.AMQPPrefix, config.AMQPUser, config.AMQPPassword, config.AMQPHost, config.AMQPPort)

	conn, err := connect(addr)
	if err != nil {
		return nil, err
	}

	notifyConnClose := make(chan *amqp.Error)
	conn.NotifyClose(notifyConnClose)

	ch, err := initChannel(conn)
	if err != nil {
		return nil, err
	}
	notifyChanClose := make(chan *amqp.Error)
	ch.NotifyClose(notifyChanClose)

	return &EventService{
		connectionAddr:  addr,
		connection:      conn,
		channel:         ch,
		done:            make(chan bool),
		notifyConnClose: notifyConnClose,
		notifyChanClose: notifyChanClose,
	}, nil
}

func connect(addr string) (*amqp.Connection, error) {
	var err error
	// Will equal two Minuten of retries
	retries := 60

	logrus.Infof("will try to connect to rabbitmq: %s", addr)

	for i := 0; i < retries; i++ {
		conn, err := amqp.Dial(addr)
		if err == nil {
			logrus.Info("Connected to RabbitMQ")
			return conn, nil
		}
		logrus.Errorf("could not connect to RabbitMQ with error: %s. Retrying in 2 seconds.", err.Error())
		time.Sleep(2 * time.Second)
	}

	return nil, &customerrs.AMQPInitFailError{Action: "finally failed to connect to RabbitMQ", Err: err}
}

// Reconnect attempts to setup a new connection to the RabbitMQ server after an disconnect.
func (e *EventService) Reconnect() {
	i := 1
	for {
		e.isReady = false

		conn, err := connect(e.connectionAddr)

		if err != nil {
			select {
			case <-e.done:
				return
			case <-time.After(reconnectDelay):
			}
			continue
		}

		done := e.handleReInit(conn)
		if done || i == reconnectAttempts {
			break
		}
		i++
	}
}

func (e *EventService) handleReInit(conn *amqp.Connection) bool {
	i := 1
	for {
		e.isReady = false

		ch, err := initChannel(conn)
		if err != nil {
			select {
			case <-e.done:
				return true
			case <-time.After(reInitDelay):
				i++
			}
			if i == reInitAttempts {
				return false
			}
			continue
		}

		notifyConnClose := make(chan *amqp.Error)
		conn.NotifyClose(notifyConnClose)
		e.notifyConnClose = notifyConnClose
		e.connection = conn

		notifyChanClose := make(chan *amqp.Error)
		ch.NotifyClose(notifyChanClose)
		e.notifyChanClose = notifyChanClose
		e.channel = ch
		e.isReady = true

		return true
	}
}

func initChannel(conn *amqp.Connection) (*amqp.Channel, error) {
	ch, err := conn.Channel()
	if err != nil {
		return nil, &customerrs.AMQPInitFailError{Action: "failed to open a channel", Err: err}
	}

	return ch, nil
}

// PublishEvent declares a queue and publishes events.
func (e *EventService) PublishEvent(topic string, event event.Event) error {
	err := e.channel.ExchangeDeclare(
		topic,    // name
		"fanout", // type
		true,     // durable
		false,    // auto-deleted
		false,    // internal
		false,    // no-wait
		nil,      // arguments
	)
	if err != nil {
		return &customerrs.AMQPInitFailError{Action: "failed declaring exchange", Err: err}
	}

	eventBody, err := json.Marshal(event)
	if err != nil {
		return &customerrs.CouldNotMarshallError{Identifier: topic + " " + event.EntityID.String(), Type: event.Type, Err: err}
	}

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	err = e.channel.PublishWithContext(
		ctx,
		topic, // exchange
		"",    // routing key
		false, // mandatory
		false, // immediate
		amqp.Publishing{
			ContentType: "application/json",
			Body:        eventBody,
		})
	if err != nil {
		return &customerrs.AMQPMessageFailError{Action: "failed to publish message", Err: err}
	}

	return nil
}

// RetryPublish is used to retry publishing an event after a failed attempt.
func (e *EventService) RetryPublish(topic string, event event.Event) error {
	ctx := context.Background()
	backOff := retry.NewFibonacci(time.Second)

	if err := retry.Do(ctx, retry.WithMaxRetries(rePublishAttempts, backOff), func(ctx context.Context) error {
		if pubErr := e.PublishEvent(topic, event); pubErr != nil {
			return retry.RetryableError(&customerrs.AMQPMessageFailError{Action: "retrying to publish message failed", Err: pubErr})
		}
		return nil
	}); err != nil {
		return err
	}

	return nil
}

// CloseConnection closes an exisiting connection.
func (e *EventService) CloseConnection() {
	if err := e.connection.Close(); err != nil {
		log.Error(err)
	}
}