-
Andre Sterba authored
See merge request !267
Andre Sterba authoredSee merge request !267
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
controller.go 5.47 KiB
package controller
import (
"context"
"net"
"net/http"
"os"
"os/signal"
"sync"
"syscall"
"time"
"github.com/google/uuid"
log "github.com/sirupsen/logrus"
"github.com/spf13/viper"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
pb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/core"
cpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/csbi"
ppb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/pnd"
apb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/rbac"
spb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/southbound"
"code.fbi.h-da.de/danet/gosdn/controller/config"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/device"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkdomain"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/southbound"
"code.fbi.h-da.de/danet/gosdn/controller/northbound/server"
nbi "code.fbi.h-da.de/danet/gosdn/controller/northbound/server"
"code.fbi.h-da.de/danet/gosdn/controller/rbac"
"code.fbi.h-da.de/danet/gosdn/controller/store"
"code.fbi.h-da.de/danet/gosdn/controller/nucleus"
)
var coreLock sync.RWMutex
var coreOnce sync.Once
// Core is the representation of the controller's core
type Core struct {
pndc networkdomain.PndStore
httpServer *http.Server
grpcServer *grpc.Server
nbi *nbi.NorthboundInterface
stopChan chan os.Signal
csbiClient cpb.CsbiServiceClient
}
var c *Core
// initialize does start-up housekeeping like reading controller config files
func initialize() error {
err := config.InitializeConfig()
if err != nil {
return err
}
c = &Core{
pndc: nucleus.NewPndStore(),
stopChan: make(chan os.Signal, 1),
}
// Setting up signal capturing
signal.Notify(c.stopChan, os.Interrupt, syscall.SIGTERM)
if err := startGrpc(); err != nil {
return err
}
coreLock.Lock()
startHttpServer()
coreLock.Unlock()
err = config.InitializeConfig()
if err != nil {
return err
}
err = createPrincipalNetworkDomain()
if err != nil {
return err
}
return nil
}
func startGrpc() error {
sock := viper.GetString("socket")
lis, err := net.Listen("tcp", sock)
if err != nil {
return err
}
log.Infof("listening to %v", lis.Addr())
jwtManager := rbac.NewJWTManager("", (60 * time.Minute)) //TODO add real secret and proper duration data here!
setupGRPCServerWithCorrectSecurityLevel(jwtManager)
c.nbi = nbi.NewNBI(c.pndc)
c.nbi.Auth = nbi.NewAuthServer(jwtManager)
pb.RegisterCoreServiceServer(c.grpcServer, c.nbi.Core)
ppb.RegisterPndServiceServer(c.grpcServer, c.nbi.Pnd)
cpb.RegisterCsbiServiceServer(c.grpcServer, c.nbi.Csbi)
spb.RegisterSbiServiceServer(c.grpcServer, c.nbi.Sbi)
apb.RegisterAuthServiceServer(c.grpcServer, c.nbi.Auth)
go func() {
if err := c.grpcServer.Serve(lis); err != nil {
log.Fatal(err)
}
}()
orchestrator := viper.GetString("csbi-orchestrator")
conn, err := grpc.Dial(orchestrator, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatal(err)
}
c.csbiClient = cpb.NewCsbiServiceClient(conn)
return nil
}
// createSouthboundInterfaces initializes the controller with its supported SBIs
func createSouthboundInterfaces() (southbound.SouthboundInterface, error) {
sbi, err := nucleus.NewSBI(spb.Type(config.BaseSouthBoundType), config.BaseSouthBoundUUID)
if err != nil {
return nil, err
}
return sbi, nil
}
// createPrincipalNetworkDomain initializes the controller with an initial PND
func createPrincipalNetworkDomain() error {
basePnd, err := c.pndc.Get(store.Query{ID: config.BasePndUUID})
if err != nil {
log.Info(err)
}
if basePnd == nil {
pnd, err := nucleus.NewPND(
"base",
"gosdn base pnd",
config.BasePndUUID,
c.csbiClient,
callback,
)
if err != nil {
return err
}
err = c.pndc.Add(pnd)
if err != nil {
return err
}
return nil
}
return nil
}
// Run calls initialize to start the controller
func Run(ctx context.Context) error {
var initError error
coreOnce.Do(func() {
initError = initialize()
})
if initError != nil {
log.WithFields(log.Fields{}).Error(initError)
return initError
}
log.WithFields(log.Fields{}).Info("initialisation finished")
select {
case <-c.stopChan:
return shutdown()
case <-ctx.Done():
return shutdown()
}
}
func shutdown() error {
log.Info("shutting down controller")
coreLock.Lock()
defer coreLock.Unlock()
c.grpcServer.GracefulStop()
return stopHttpServer()
}
func callback(id uuid.UUID, ch chan device.Details) {
if ch != nil {
c.pndc.AddPendingChannel(id, ch)
log.Infof("pending channel %v added", id)
} else {
c.pndc.RemovePendingChannel(id)
log.Infof("pending channel %v removed", id)
}
}
// setupGRPCServerWithCorrectSecurityLevel sets up a gRPC server with desired security level
//
// Only two options for now: insecure or secure, add 'else if' if required.
// Secure is the recommended mode and is set as default.
// Insecure starts the controller without the gRPC interceptor which is supposed to handle authz.
// This allows users to operate on the controller without any authentication/authorization,
// but they could still login if they want to.
// Use insecure only for testing purposes and with caution.
func setupGRPCServerWithCorrectSecurityLevel(jwt *rbac.JWTManager) {
securityLevel := viper.GetString("security")
if securityLevel == "insecure" {
c.grpcServer = grpc.NewServer()
log.Info("set up grpc server in insecure mode")
} else {
interceptor := server.NewAuthInterceptor(jwt)
c.grpcServer = grpc.NewServer(grpc.UnaryInterceptor(interceptor.Unary()))
log.Info("set up grpc server in secure mode")
}
}