package event

import (
	"fmt"
	"os"

	"code.fbi.h-da.de/danet/gosdn/controller/customerrs"
	amqp "github.com/rabbitmq/amqp091-go"

	"github.com/sirupsen/logrus"
)

// eventCallbackFunc is the signature of a handler that is invoked with a
// received event.
type eventCallbackFunc func(*Event)

// TypeToCallbackTuple provides a mapping between an event type and a provided callback.
// A slice of these tuples is what SubscribeToEventType accepts.
type TypeToCallbackTuple struct {
	// Type is the event type the callback is registered for.
	Type     Type
	// Callback is the function called with an event of the given type.
	Callback eventCallbackFunc
}

// ServiceInterface defines an event service: callbacks are registered per
// event type, receiving of events is set up, and the underlying connection can
// be closed once the service is no longer needed.
type ServiceInterface interface {
	// SubscribeToEventType registers the callback of every given tuple for
	// that tuple's event type.
	SubscribeToEventType([]TypeToCallbackTuple)
	// SetupEventReciever starts receiving events. In the Service
	// implementation, stopChan is sent os.Interrupt when setting up a
	// consumer fails.
	SetupEventReciever(stopChan chan os.Signal)
	// CloseConnection closes the connection held by the service.
	CloseConnection()
}

// Service holds the connection and channel to a broker together with the
// registered event subscribers and the topics events are consumed from.
type Service struct {
	// connection is the AMQP connection to the broker.
	connection *amqp.Connection
	// channel is the AMQP channel opened on connection; exchanges, queues and
	// consumers are declared through it.
	channel    *amqp.Channel
	// subscriber maps an event type to the callback invoked for events of that type.
	subscriber map[Type]eventCallbackFunc
	// topics lists the topics for which SetupEventReciever starts a consumer.
	topics     []Topic
}

// NewEventService creates a new connection to the broker addressed by
// brokerConnectionString and opens a channel on it for later usage. The given
// topics are stored and used when the event receivers are set up.
//
// It returns an *customerrs.AMQPInitFailError if either the connection cannot
// be established or the channel cannot be opened. In the latter case the
// already established connection is closed again before returning, so that a
// failed construction does not leak it.
func NewEventService(brokerConnectionString string, topics []Topic) (ServiceInterface, error) {
	conn, err := amqp.Dial(brokerConnectionString)
	if err != nil {
		return nil, &customerrs.AMQPInitFailError{Action: "failed to connect to RabbitMQ", Err: err}
	}

	ch, err := conn.Channel()
	if err != nil {
		// Best-effort cleanup: the channel error is the one reported to the
		// caller, a failure to close the half-initialised connection adds
		// nothing actionable on top of it.
		_ = conn.Close()

		return nil, &customerrs.AMQPInitFailError{Action: "failed to open a channel", Err: err}
	}

	return &Service{
		connection: conn,
		channel:    ch,
		subscriber: map[Type]eventCallbackFunc{},
		topics:     topics,
	}, nil
}

// SubscribeToEventType registers, for every tuple in the given mapping, the
// tuple's callback under the tuple's event type. A type that is registered
// more than once keeps only the callback that was registered last.
func (e *Service) SubscribeToEventType(eventTypeToCallbackMapping []TypeToCallbackTuple) {
	for i := range eventTypeToCallbackMapping {
		tuple := eventTypeToCallbackMapping[i]
		e.subscriber[tuple.Type] = tuple.Callback
	}
}

// SetupEventReciever starts one goroutine per topic of the service; each
// goroutine sets up a consumer for its topic and dispatches received events.
// The method itself returns immediately.
//
// If setting up a consumer fails, the error is reported on stderr and
// os.Interrupt is sent on stopChan. The send blocks until it is received or
// buffered, so the channel must be drained (or buffered) by the caller.
//
// SetupEventReciever will be removed in the future.
func (e *Service) SetupEventReciever(stopChan chan os.Signal) {
	for _, topic := range e.topics {
		go func(topic string) {
			setupErr := e.setupQueueConsume(topic, stopChan)
			if setupErr != nil {
				// Record why the stop is requested; without this the
				// interrupt arrives with no trace of its cause.
				fmt.Fprintf(os.Stderr, "Error: setting up event receiver for topic %q: %v\n", topic, setupErr)
				stopChan <- os.Interrupt
			}
		}(topic.String())
	}
}

// setupQueueConsume declares a durable fanout exchange named after topic,
// declares an exclusive, non-durable queue, binds that queue to the exchange
// and registers an auto-acknowledging consumer on it. Every delivery is then
// decoded into an event and handed to the callback subscribed for the event's
// type; events without a subscriber are dropped.
//
// The call blocks while deliveries are being consumed and returns nil once the
// delivery channel is closed. A delivery whose payload cannot be decoded is
// reported and skipped. If any of the setup steps fails, an
// *customerrs.AMQPInitFailError describing the failed step is returned.
//
// stopChan is currently not used by this function.
func (e *Service) setupQueueConsume(topic string, stopChan chan os.Signal) error {
	err := e.channel.ExchangeDeclare(
		topic,    // name
		"fanout", // type
		true,     // durable
		false,    // auto-deleted
		false,    // internal
		false,    // no-wait
		nil,      // arguments
	)
	if err != nil {
		return &customerrs.AMQPInitFailError{Action: "failed declaring exchange", Err: err}
	}

	// The queue is declared with an empty name; the name to bind and consume
	// from is taken from the declared queue.
	q, err := e.channel.QueueDeclare(
		"",    // name
		false, // durable
		false, // delete when unused
		true,  // exclusive
		false, // no-wait
		nil,   // arguments
	)
	if err != nil {
		return &customerrs.AMQPInitFailError{Action: "failed declaring queue", Err: err}
	}

	err = e.channel.QueueBind(
		q.Name, // queue name
		"",     // routing key
		topic,  // exchange
		false,  // no-wait
		nil,    // arguments
	)
	if err != nil {
		return &customerrs.AMQPInitFailError{Action: "failed binding queue to exchange", Err: err}
	}

	msgs, err := e.channel.Consume(
		q.Name, // queue
		"",     // consumer
		true,   // auto-ack
		false,  // exclusive
		false,  // no-local
		false,  // no-wait
		nil,    // args
	)
	if err != nil {
		return &customerrs.AMQPInitFailError{Action: "failed to register consumer", Err: err}
	}

	// Consume in the calling goroutine: the loop ends when the delivery
	// channel is closed, which lets the caller's goroutine finish instead of
	// being parked forever.
	for d := range msgs {
		event, err := getEventFromPayload(d.Body)
		if err != nil {
			fmt.Printf("Error: %+v\n", err)
			// There is no usable event for this delivery; skip it rather
			// than dereferencing the result of a failed decode.
			continue
		}

		if callback, ok := e.subscriber[parseTypeString(event.Type)]; ok {
			callback(event)
		}
	}

	return nil
}

// CloseConnection closes the existing connection to the broker. A failure to
// close is logged as an error and not returned to the caller.
func (e *Service) CloseConnection() {
	if closeErr := e.connection.Close(); closeErr != nil {
		logrus.Errorf("could not close connection %s", closeErr)
	}
}