Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
controller.go 5.47 KiB
package controller

import (
	"context"
	"net"
	"net/http"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"

	"github.com/google/uuid"
	log "github.com/sirupsen/logrus"
	"github.com/spf13/viper"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"

	pb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/core"
	cpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/csbi"
	ppb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/pnd"
	apb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/rbac"
	spb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/southbound"
	"code.fbi.h-da.de/danet/gosdn/controller/config"
	"code.fbi.h-da.de/danet/gosdn/controller/interfaces/device"
	"code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkdomain"
	"code.fbi.h-da.de/danet/gosdn/controller/interfaces/southbound"
	"code.fbi.h-da.de/danet/gosdn/controller/northbound/server"
	nbi "code.fbi.h-da.de/danet/gosdn/controller/northbound/server"
	"code.fbi.h-da.de/danet/gosdn/controller/rbac"
	"code.fbi.h-da.de/danet/gosdn/controller/store"

	"code.fbi.h-da.de/danet/gosdn/controller/nucleus"
)

var coreLock sync.RWMutex
var coreOnce sync.Once

// Core is the representation of the controller's core
type Core struct {
	pndc       networkdomain.PndStore
	httpServer *http.Server
	grpcServer *grpc.Server
	nbi        *nbi.NorthboundInterface
	stopChan   chan os.Signal

	csbiClient cpb.CsbiServiceClient
}

var c *Core

// initialize does start-up housekeeping like reading controller config files
func initialize() error {
	err := config.InitializeConfig()
	if err != nil {
		return err
	}

	c = &Core{
		pndc:     nucleus.NewPndStore(),
		stopChan: make(chan os.Signal, 1),
	}

	// Setting up signal capturing
	signal.Notify(c.stopChan, os.Interrupt, syscall.SIGTERM)

	if err := startGrpc(); err != nil {
		return err
	}
	coreLock.Lock()
	startHttpServer()
	coreLock.Unlock()

	err = config.InitializeConfig()
	if err != nil {
		return err
	}

	err = createPrincipalNetworkDomain()
	if err != nil {
		return err
	}

	return nil
}

func startGrpc() error {
	sock := viper.GetString("socket")
	lis, err := net.Listen("tcp", sock)
	if err != nil {
		return err
	}
	log.Infof("listening to %v", lis.Addr())

	jwtManager := rbac.NewJWTManager("", (60 * time.Minute)) //TODO add real secret and proper duration data here!
	setupGRPCServerWithCorrectSecurityLevel(jwtManager)

	c.nbi = nbi.NewNBI(c.pndc)
	c.nbi.Auth = nbi.NewAuthServer(jwtManager)

	pb.RegisterCoreServiceServer(c.grpcServer, c.nbi.Core)
	ppb.RegisterPndServiceServer(c.grpcServer, c.nbi.Pnd)
	cpb.RegisterCsbiServiceServer(c.grpcServer, c.nbi.Csbi)
	spb.RegisterSbiServiceServer(c.grpcServer, c.nbi.Sbi)
	apb.RegisterAuthServiceServer(c.grpcServer, c.nbi.Auth)
	go func() {
		if err := c.grpcServer.Serve(lis); err != nil {
			log.Fatal(err)
		}
	}()

	orchestrator := viper.GetString("csbi-orchestrator")
	conn, err := grpc.Dial(orchestrator, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatal(err)
	}
	c.csbiClient = cpb.NewCsbiServiceClient(conn)
	return nil
}

// createSouthboundInterfaces initializes the controller with its supported SBIs
func createSouthboundInterfaces() (southbound.SouthboundInterface, error) {
	sbi, err := nucleus.NewSBI(spb.Type(config.BaseSouthBoundType), config.BaseSouthBoundUUID)
	if err != nil {
		return nil, err
	}

	return sbi, nil
}

// createPrincipalNetworkDomain initializes the controller with an initial PND
func createPrincipalNetworkDomain() error {
	basePnd, err := c.pndc.Get(store.Query{ID: config.BasePndUUID})
	if err != nil {
		log.Info(err)
	}

	if basePnd == nil {
		pnd, err := nucleus.NewPND(
			"base",
			"gosdn base pnd",
			config.BasePndUUID,
			c.csbiClient,
			callback,
		)
		if err != nil {
			return err
		}
		err = c.pndc.Add(pnd)
		if err != nil {
			return err
		}
		return nil
	}

	return nil
}

// Run calls initialize to start the controller
func Run(ctx context.Context) error {
	var initError error
	coreOnce.Do(func() {
		initError = initialize()
	})
	if initError != nil {
		log.WithFields(log.Fields{}).Error(initError)
		return initError
	}

	log.WithFields(log.Fields{}).Info("initialisation finished")

	select {
	case <-c.stopChan:
		return shutdown()
	case <-ctx.Done():
		return shutdown()
	}
}

func shutdown() error {
	log.Info("shutting down controller")
	coreLock.Lock()
	defer coreLock.Unlock()
	c.grpcServer.GracefulStop()
	return stopHttpServer()
}

func callback(id uuid.UUID, ch chan device.Details) {
	if ch != nil {
		c.pndc.AddPendingChannel(id, ch)
		log.Infof("pending channel %v added", id)
	} else {
		c.pndc.RemovePendingChannel(id)
		log.Infof("pending channel %v removed", id)
	}
}

// setupGRPCServerWithCorrectSecurityLevel sets up a gRPC server with desired security level
//
// Only two options for now: insecure or secure, add 'else if' if required.
// Secure is the recommended mode and is set as default.
// Insecure starts the controller without the gRPC interceptor which is supposed to handle authz.
// This allows users to operate on the controller without any authentication/authorization,
// but they could still login if they want to.
// Use insecure only for testing purposes and with caution.
func setupGRPCServerWithCorrectSecurityLevel(jwt *rbac.JWTManager) {
	securityLevel := viper.GetString("security")
	if securityLevel == "insecure" {
		c.grpcServer = grpc.NewServer()
		log.Info("set up grpc server in insecure mode")
	} else {
		interceptor := server.NewAuthInterceptor(jwt)
		c.grpcServer = grpc.NewServer(grpc.UnaryInterceptor(interceptor.Unary()))
		log.Info("set up grpc server in secure mode")
	}
}