Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
controller.go 13.96 KiB
package controller

import (
	"context"
	"encoding/base64"
	"fmt"
	"net"
	"net/http"
	"os"
	"os/signal"
	"sync"
	"syscall"

	"github.com/google/uuid"
	"github.com/sethvargo/go-password/password"
	log "github.com/sirupsen/logrus"
	"github.com/spf13/viper"
	"golang.org/x/crypto/argon2"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"

	apppb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/app"
	cmpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/configurationmanagement"
	cpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/csbi"
	mnepb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/networkelement"
	pipb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/plugin-internal"
	rpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/plugin-registry"
	ppb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/pnd"
	apb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/rbac"
	tpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/topology"

	"code.fbi.h-da.de/danet/gosdn/controller/app"

	"code.fbi.h-da.de/danet/gosdn/controller/config"
	"code.fbi.h-da.de/danet/gosdn/controller/conflict"
	eventservice "code.fbi.h-da.de/danet/gosdn/controller/eventService"
	"code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkdomain"
	"code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkelement"
	"code.fbi.h-da.de/danet/gosdn/controller/interfaces/plugin"
	"code.fbi.h-da.de/danet/gosdn/controller/interfaces/rbac"
	"code.fbi.h-da.de/danet/gosdn/controller/northbound/server"
	nbi "code.fbi.h-da.de/danet/gosdn/controller/northbound/server"
	rbacImpl "code.fbi.h-da.de/danet/gosdn/controller/rbac"
	"code.fbi.h-da.de/danet/gosdn/controller/store"
	"code.fbi.h-da.de/danet/gosdn/controller/topology"
	"code.fbi.h-da.de/danet/gosdn/controller/topology/nodes"
	"code.fbi.h-da.de/danet/gosdn/controller/topology/ports"
	routingtables "code.fbi.h-da.de/danet/gosdn/controller/topology/routing-tables"

	eventInterfaces "code.fbi.h-da.de/danet/gosdn/controller/interfaces/event"

	"code.fbi.h-da.de/danet/gosdn/controller/nucleus"
)

var coreLock sync.RWMutex
var coreOnce sync.Once

// Core is the representation of the controller's core.
type Core struct {
	pndStore              networkdomain.PndStore
	pndService            networkdomain.Service
	mneService            networkelement.Service
	changeStore           store.ChangeStore
	userService           rbac.UserService
	roleService           rbac.RoleService
	topologyService       topology.Service
	nodeService           nodes.Service
	portService           ports.Service
	routeService          routingtables.Service
	pluginService         plugin.Service
	httpServer            *http.Server
	grpcServer            *grpc.Server
	nbi                   *nbi.NorthboundInterface
	eventService          eventInterfaces.Service
	appService            app.ManagementService
	networkElementWatcher *nucleus.NetworkElementWatcher
	stopChan              chan os.Signal

	csbiClient           cpb.CsbiServiceClient
	pluginRegistryClient rpb.PluginRegistryServiceClient
}

var c *Core

// initialize does start-up housekeeping like reading controller config files.
func initialize() error {
	if err := config.InitializeConfig(); err != nil {
		return err
	}

	if err := config.ReadGnmiSubscriptionPaths(); err != nil {
		log.Error("Error reading in gNMI subscription paths, can not watch network elements automatically: ", err)
	}

	eventService, err := eventservice.NewEventService()
	if err != nil {
		return err
	}

	nodeService := nodes.NewNodeService(nodes.NewDatabaseNodeStore(), eventService)
	portService := ports.NewPortService(ports.NewDatabasePortStore(), eventService)
	routeService := routingtables.NewRoutingTableService(
		routingtables.NewDatabaseRoutingTableStore(),
		nodeService,
		portService,
		eventService,
	)

	pluginRegistryClient := setupPluginRegistryClient()
	pluginService := nucleus.NewPluginService(nucleus.NewPluginStore(), eventService, nucleus.NewPluginThroughReattachConfig, pluginRegistryClient)

	pndStore := nucleus.NewPndStore(pluginService)

	changeStore := store.NewChangeStore()

	c = &Core{
		pndStore:    pndStore,
		pndService:  nucleus.NewPndService(pndStore),
		mneService:  nucleus.NewNetworkElementService(nucleus.NewNetworkElementStore(), pluginService, eventService),
		changeStore: *changeStore,
		userService: rbacImpl.NewUserService(rbacImpl.NewUserStore(), eventService),
		roleService: rbacImpl.NewRoleService(rbacImpl.NewRoleStore(), eventService),
		topologyService: topology.NewTopologyService(
			topology.NewDatabaseTopologyStore(),
			nodeService,
			portService,
			eventService,
		),
		nodeService:          nodeService,
		portService:          portService,
		routeService:         routeService,
		eventService:         eventService,
		pluginService:        pluginService,
		appService:           app.NewAppService(app.NewAppStore()),
		stopChan:             make(chan os.Signal, 1),
		pluginRegistryClient: pluginRegistryClient,
	}

	// Setting up signal capturing
	signal.Notify(c.stopChan, os.Interrupt, syscall.SIGTERM)

	setupOrchestratorClient()

	if err := createPrincipalNetworkDomain(); err != nil {
		return err
	}

	c.networkElementWatcher = nucleus.NewNetworkElementWatcher(c.mneService, c.eventService)
	c.networkElementWatcher.SubscribeToNetworkElements(config.GetGnmiSubscriptionPaths(), nil)

	if err := ensureDefaultRoleExists(); err != nil {
		return err
	}

	if err := ensureDefaultUserExists(); err != nil {
		return err
	}

	if err := startGrpc(); err != nil {
		return err
	}

	coreLock.Lock()
	startHttpServer()
	coreLock.Unlock()

	return nil
}

func setupOrchestratorClient() {
	orchestrator := viper.GetString("csbi-orchestrator")
	conn, err := grpc.Dial(orchestrator, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatal(err)
	}
	c.csbiClient = cpb.NewCsbiServiceClient(conn)
}

func setupPluginRegistryClient() rpb.PluginRegistryServiceClient {
	pluginRegistry := viper.GetString("plugin-registry")
	conn, err := grpc.Dial(pluginRegistry, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatal(err)
	}
	return rpb.NewPluginRegistryServiceClient(conn)
}

func startGrpc() error {
	socket := viper.GetString("socket")
	lislisten, err := net.Listen("tcp", socket)
	if err != nil {
		return err
	}
	log.Infof("listening to %v", lislisten.Addr())

	jwtManager := rbacImpl.NewJWTManager(config.JWTSecret, config.JWTDuration)
	setupGRPCServerWithCorrectSecurityLevel(jwtManager, c.userService, c.roleService)

	c.nbi = nbi.NewNBI(
		c.pndStore,
		c.pndService,
		c.mneService,
		c.changeStore,
		c.userService,
		c.roleService,
		*jwtManager,
		c.topologyService,
		c.nodeService,
		c.portService,
		c.routeService,
		c.appService,
		c.pluginService,
		c.pluginRegistryClient,
		c.csbiClient,
		callback,
		c.networkElementWatcher,
	)

	ppb.RegisterPndServiceServer(c.grpcServer, c.nbi.Pnd)
	cpb.RegisterCsbiServiceServer(c.grpcServer, c.nbi.Csbi)
	apb.RegisterAuthServiceServer(c.grpcServer, c.nbi.Auth)
	apb.RegisterUserServiceServer(c.grpcServer, c.nbi.User)
	apb.RegisterRoleServiceServer(c.grpcServer, c.nbi.Role)
	apppb.RegisterAppServiceServer(c.grpcServer, c.nbi.App)
	tpb.RegisterTopologyServiceServer(c.grpcServer, c.nbi.Topology)
	mnepb.RegisterNetworkElementServiceServer(c.grpcServer, c.nbi.NetworkElement)
	tpb.RegisterRoutingTableServiceServer(c.grpcServer, c.nbi.Routes)
	pipb.RegisterPluginInternalServiceServer(c.grpcServer, c.nbi.Plugin)
	cmpb.RegisterConfigurationManagementServiceServer(c.grpcServer, c.nbi.ConfigurationManagement)

	go func() {
		if err := c.grpcServer.Serve(lislisten); err != nil {
			log.Fatal(err)
		}
	}()

	return nil
}

// createPrincipalNetworkDomain initializes the controller with an initial PND.
func createPrincipalNetworkDomain() error {
	basePnd, err := c.pndService.Get(store.Query{ID: config.BasePndUUID})
	if err != nil {
		log.Info(err)
	}

	if basePnd == nil {
		pnd := nucleus.NewPND(
			config.BasePndUUID,
			"base",
			"gosdn base pnd",
		)
		err = c.pndStore.Add(pnd)
		if err != nil {
			return err
		}
		return nil
	}

	return nil
}

func ensureDefaultRoleExists() error {
	defaultAdminRoleName := "admin"
	adminRole, err := c.roleService.Get(store.Query{ID: uuid.Nil, Name: defaultAdminRoleName})
	if err != nil {
		log.Info(err)
	}

	if adminRole == nil {
		err := c.roleService.Add(rbacImpl.NewRole(uuid.New(), defaultAdminRoleName, "admin role", []string{
			"/gosdn.pnd.PndService/GetPnd",
			"/gosdn.pnd.PndService/GetPndList",
			"/gosdn.pnd.PndService/CreatePndList",
			"/gosdn.pnd.PndService/DeletePnd",
			"/gosdn.rbac.UserService/CreateUsers",
			"/gosdn.rbac.UserService/GetUser",
			"/gosdn.rbac.UserService/GetUsers",
			"/gosdn.rbac.UserService/UpdateUsers",
			"/gosdn.rbac.UserService/DeleteUsers",
			"/gosdn.rbac.RoleService/CreateRoles",
			"/gosdn.rbac.RoleService/GetRole",
			"/gosdn.rbac.RoleService/GetRoles",
			"/gosdn.rbac.RoleService/UpdateRoles",
			"/gosdn.rbac.RoleService/DeletePermissionsForRole",
			"/gosdn.rbac.RoleService/DeleteRoles",
			"/gosdn.networkelement.NetworkElementService/GetMne",
			"/gosdn.networkelement.NetworkElementService/GetFlattenedMne",
			"/gosdn.networkelement.NetworkElementService/GetFlattenedMneList",
			"/gosdn.networkelement.NetworkElementService/GetPath",
			"/gosdn.networkelement.NetworkElementService/GetIntendedPath",
			"/gosdn.networkelement.NetworkElementService/GetChange",
			"/gosdn.networkelement.NetworkElementService/GetChangeList",
			"/gosdn.networkelement.NetworkElementService/SetMneList",
			"/gosdn.networkelement.NetworkElementService/SetChangeList",
			"/gosdn.networkelement.NetworkElementService/SetPathList",
			"/gosdn.networkelement.NetworkElementService/DeviceSchema",
			"/gosdn.networkelement.NetworkElementService/DeleteMne",
			"/gosdn.networkelement.NetworkElementService/SubscribePath",
			"/gosdn.plugin_internal.PluginInternalService/AvailablePlugins",
			"/gosdn.plugin_internal.PluginInternalService/GetPluginSchema",
			"/gosdn.app.AppService/Register",
			"/gosdn.app.AppService/Deregister",
			"/gosdn.configurationmanagement.ConfigurationManagementService/ExportSDNConfig",
			"/gosdn.configurationmanagement.ConfigurationManagementService/ImportSDNConfig",
			"/gosdn.topology.RoutingTableService/AddRoutingTable",
			"/gosdn.topology.RoutingTableService/GetRoutes",
			"/gosdn.topology.RoutingTableService/DeleteRoute",
			"/gosdn.topology.TopologyService/AddLink",
			"/gosdn.topology.TopologyService/GetTopology",
			"/gosdn.topology.TopologyService/UpdateLink",
			"/gosdn.topology.TopologyService/DeleteLink",
		}))
		if err != nil {
			return err
		}
	}

	return nil
}

func ensureDefaultUserExists() error {
	defaultUserName := "admin"
	adminUser, err := c.userService.Get(store.Query{ID: uuid.Nil, Name: defaultUserName})
	if err != nil {
		log.Info(err)
	}
	if adminUser == nil {
		// Getting the password from the environment variable which is set in gosdn.clab.yaml.
		var preDefinedPassword = os.Getenv("GOSDN_ADMIN_PASSWORD")
		var usedPassword string

		// If environment variable is set and password is not 0, the password from the environment variable will be used.
		if len(preDefinedPassword) == 0 {
			// Generate a password that is 16 characters long with 3 digits, 0 symbols,
			// allowing upper and lower case letters, disallowing repeat characters.
			generatedPassword, err := password.Generate(16, 3, 0, true, false)
			if err != nil {
				log.Fatal(err)
			}
			usedPassword = generatedPassword
		} else {
			usedPassword = preDefinedPassword
		}

		salt, err := password.Generate(16, 3, 0, true, false)
		if err != nil {
			log.Fatal(err)
		}
		hashedPassword := base64.RawStdEncoding.EncodeToString(argon2.IDKey([]byte(usedPassword), []byte(salt), 1, 64*1024, 4, 32))

		err = c.userService.Add(rbacImpl.NewUser(uuid.New(), defaultUserName, map[string]string{config.BasePndUUID.String(): "admin"}, string(hashedPassword), "", salt, conflict.Metadata{ResourceVersion: 0}))
		if err != nil {
			return err
		}

		fmt.Printf("########\n Generated admin password: %s\n########\n", usedPassword)
	}

	return nil
}

// Run calls initialize to start the controller.
func Run(ctx context.Context) error {
	var initError error
	coreOnce.Do(func() {
		initError = initialize()
	})
	if initError != nil {
		log.WithFields(log.Fields{}).Error(initError)
		return initError
	}

	log.WithFields(log.Fields{}).Info("initialisation finished")

	select {
	case <-c.stopChan:
		return shutdown()
	case <-ctx.Done():
		return shutdown()
	}
}

func shutdown() error {
	log.Info("shutting down controller")
	coreLock.Lock()
	defer func() {
		plugins, err := c.pluginService.GetAll()
		if err != nil {
			log.Error(err)
		}
		for _, plugin := range plugins {
			log.Info("Defer: ", plugin.Manifest().Name)
			plugin.Close()
			log.Info("Defer - exited: ", plugin.GetClient().Exited())
		}
		coreLock.Unlock()
	}()
	c.grpcServer.GracefulStop()
	c.eventService.CloseConnection()
	return stopHttpServer()
}

func callback(id uuid.UUID, ch chan networkelement.Details) {
	if ch != nil {
		c.pndStore.AddPendingChannel(id, ch)
		log.Infof("pending channel %v added", id)
	} else {
		c.pndStore.RemovePendingChannel(id)
		log.Infof("pending channel %v removed", id)
	}
}

// setupGRPCServerWithCorrectSecurityLevel sets up a gRPC server with desired security level
//
// Only two options for now: insecure or secure, add 'else if' if required.
// Secure is the recommended mode and is set as default.
// Insecure starts the controller without the gRPC interceptor which is supposed to handle authz.
// This allows users to operate on the controller without any authentication/authorization,
// but they could still login if they want to.
// Use insecure only for testing purposes and with caution.
func setupGRPCServerWithCorrectSecurityLevel(jwt *rbacImpl.JWTManager, userService rbac.UserService, roleService rbac.RoleService) {
	securityLevel := viper.GetString("security")
	if securityLevel == "insecure" {
		c.grpcServer = grpc.NewServer()
		log.Info("set up grpc server in insecure mode")
	} else {
		interceptor := server.NewAuthInterceptor(jwt, userService, roleService)
		c.grpcServer = grpc.NewServer(grpc.UnaryInterceptor(interceptor.Unary()), grpc.StreamInterceptor(interceptor.Stream()))
		log.Info("set up grpc server in secure mode")
	}
}