-
Malte Bauch authored
Resolve "Plugin is created and persisted in the database even if the creation of a network element failed" See merge request !544
Malte Bauch authoredResolve "Plugin is created and persisted in the database even if the creation of a network element failed" See merge request !544
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
controller.go 13.96 KiB
package controller
import (
"context"
"encoding/base64"
"fmt"
"net"
"net/http"
"os"
"os/signal"
"sync"
"syscall"
"github.com/google/uuid"
"github.com/sethvargo/go-password/password"
log "github.com/sirupsen/logrus"
"github.com/spf13/viper"
"golang.org/x/crypto/argon2"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
apppb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/app"
cmpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/configurationmanagement"
cpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/csbi"
mnepb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/networkelement"
pipb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/plugin-internal"
rpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/plugin-registry"
ppb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/pnd"
apb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/rbac"
tpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/topology"
"code.fbi.h-da.de/danet/gosdn/controller/app"
"code.fbi.h-da.de/danet/gosdn/controller/config"
"code.fbi.h-da.de/danet/gosdn/controller/conflict"
eventservice "code.fbi.h-da.de/danet/gosdn/controller/eventService"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkdomain"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkelement"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/plugin"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/rbac"
"code.fbi.h-da.de/danet/gosdn/controller/northbound/server"
nbi "code.fbi.h-da.de/danet/gosdn/controller/northbound/server"
rbacImpl "code.fbi.h-da.de/danet/gosdn/controller/rbac"
"code.fbi.h-da.de/danet/gosdn/controller/store"
"code.fbi.h-da.de/danet/gosdn/controller/topology"
"code.fbi.h-da.de/danet/gosdn/controller/topology/nodes"
"code.fbi.h-da.de/danet/gosdn/controller/topology/ports"
routingtables "code.fbi.h-da.de/danet/gosdn/controller/topology/routing-tables"
eventInterfaces "code.fbi.h-da.de/danet/gosdn/controller/interfaces/event"
"code.fbi.h-da.de/danet/gosdn/controller/nucleus"
)
var coreLock sync.RWMutex
var coreOnce sync.Once
// Core is the representation of the controller's core.
type Core struct {
pndStore networkdomain.PndStore
pndService networkdomain.Service
mneService networkelement.Service
changeStore store.ChangeStore
userService rbac.UserService
roleService rbac.RoleService
topologyService topology.Service
nodeService nodes.Service
portService ports.Service
routeService routingtables.Service
pluginService plugin.Service
httpServer *http.Server
grpcServer *grpc.Server
nbi *nbi.NorthboundInterface
eventService eventInterfaces.Service
appService app.ManagementService
networkElementWatcher *nucleus.NetworkElementWatcher
stopChan chan os.Signal
csbiClient cpb.CsbiServiceClient
pluginRegistryClient rpb.PluginRegistryServiceClient
}
var c *Core
// initialize does start-up housekeeping like reading controller config files.
func initialize() error {
if err := config.InitializeConfig(); err != nil {
return err
}
if err := config.ReadGnmiSubscriptionPaths(); err != nil {
log.Error("Error reading in gNMI subscription paths, can not watch network elements automatically: ", err)
}
eventService, err := eventservice.NewEventService()
if err != nil {
return err
}
nodeService := nodes.NewNodeService(nodes.NewDatabaseNodeStore(), eventService)
portService := ports.NewPortService(ports.NewDatabasePortStore(), eventService)
routeService := routingtables.NewRoutingTableService(
routingtables.NewDatabaseRoutingTableStore(),
nodeService,
portService,
eventService,
)
pluginRegistryClient := setupPluginRegistryClient()
pluginService := nucleus.NewPluginService(nucleus.NewPluginStore(), eventService, nucleus.NewPluginThroughReattachConfig, pluginRegistryClient)
pndStore := nucleus.NewPndStore(pluginService)
changeStore := store.NewChangeStore()
c = &Core{
pndStore: pndStore,
pndService: nucleus.NewPndService(pndStore),
mneService: nucleus.NewNetworkElementService(nucleus.NewNetworkElementStore(), pluginService, eventService),
changeStore: *changeStore,
userService: rbacImpl.NewUserService(rbacImpl.NewUserStore(), eventService),
roleService: rbacImpl.NewRoleService(rbacImpl.NewRoleStore(), eventService),
topologyService: topology.NewTopologyService(
topology.NewDatabaseTopologyStore(),
nodeService,
portService,
eventService,
),
nodeService: nodeService,
portService: portService,
routeService: routeService,
eventService: eventService,
pluginService: pluginService,
appService: app.NewAppService(app.NewAppStore()),
stopChan: make(chan os.Signal, 1),
pluginRegistryClient: pluginRegistryClient,
}
// Setting up signal capturing
signal.Notify(c.stopChan, os.Interrupt, syscall.SIGTERM)
setupOrchestratorClient()
if err := createPrincipalNetworkDomain(); err != nil {
return err
}
c.networkElementWatcher = nucleus.NewNetworkElementWatcher(c.mneService, c.eventService)
c.networkElementWatcher.SubscribeToNetworkElements(config.GetGnmiSubscriptionPaths(), nil)
if err := ensureDefaultRoleExists(); err != nil {
return err
}
if err := ensureDefaultUserExists(); err != nil {
return err
}
if err := startGrpc(); err != nil {
return err
}
coreLock.Lock()
startHttpServer()
coreLock.Unlock()
return nil
}
func setupOrchestratorClient() {
orchestrator := viper.GetString("csbi-orchestrator")
conn, err := grpc.Dial(orchestrator, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatal(err)
}
c.csbiClient = cpb.NewCsbiServiceClient(conn)
}
func setupPluginRegistryClient() rpb.PluginRegistryServiceClient {
pluginRegistry := viper.GetString("plugin-registry")
conn, err := grpc.Dial(pluginRegistry, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatal(err)
}
return rpb.NewPluginRegistryServiceClient(conn)
}
func startGrpc() error {
socket := viper.GetString("socket")
lislisten, err := net.Listen("tcp", socket)
if err != nil {
return err
}
log.Infof("listening to %v", lislisten.Addr())
jwtManager := rbacImpl.NewJWTManager(config.JWTSecret, config.JWTDuration)
setupGRPCServerWithCorrectSecurityLevel(jwtManager, c.userService, c.roleService)
c.nbi = nbi.NewNBI(
c.pndStore,
c.pndService,
c.mneService,
c.changeStore,
c.userService,
c.roleService,
*jwtManager,
c.topologyService,
c.nodeService,
c.portService,
c.routeService,
c.appService,
c.pluginService,
c.pluginRegistryClient,
c.csbiClient,
callback,
c.networkElementWatcher,
)
ppb.RegisterPndServiceServer(c.grpcServer, c.nbi.Pnd)
cpb.RegisterCsbiServiceServer(c.grpcServer, c.nbi.Csbi)
apb.RegisterAuthServiceServer(c.grpcServer, c.nbi.Auth)
apb.RegisterUserServiceServer(c.grpcServer, c.nbi.User)
apb.RegisterRoleServiceServer(c.grpcServer, c.nbi.Role)
apppb.RegisterAppServiceServer(c.grpcServer, c.nbi.App)
tpb.RegisterTopologyServiceServer(c.grpcServer, c.nbi.Topology)
mnepb.RegisterNetworkElementServiceServer(c.grpcServer, c.nbi.NetworkElement)
tpb.RegisterRoutingTableServiceServer(c.grpcServer, c.nbi.Routes)
pipb.RegisterPluginInternalServiceServer(c.grpcServer, c.nbi.Plugin)
cmpb.RegisterConfigurationManagementServiceServer(c.grpcServer, c.nbi.ConfigurationManagement)
go func() {
if err := c.grpcServer.Serve(lislisten); err != nil {
log.Fatal(err)
}
}()
return nil
}
// createPrincipalNetworkDomain initializes the controller with an initial PND.
func createPrincipalNetworkDomain() error {
basePnd, err := c.pndService.Get(store.Query{ID: config.BasePndUUID})
if err != nil {
log.Info(err)
}
if basePnd == nil {
pnd := nucleus.NewPND(
config.BasePndUUID,
"base",
"gosdn base pnd",
)
err = c.pndStore.Add(pnd)
if err != nil {
return err
}
return nil
}
return nil
}
func ensureDefaultRoleExists() error {
defaultAdminRoleName := "admin"
adminRole, err := c.roleService.Get(store.Query{ID: uuid.Nil, Name: defaultAdminRoleName})
if err != nil {
log.Info(err)
}
if adminRole == nil {
err := c.roleService.Add(rbacImpl.NewRole(uuid.New(), defaultAdminRoleName, "admin role", []string{
"/gosdn.pnd.PndService/GetPnd",
"/gosdn.pnd.PndService/GetPndList",
"/gosdn.pnd.PndService/CreatePndList",
"/gosdn.pnd.PndService/DeletePnd",
"/gosdn.rbac.UserService/CreateUsers",
"/gosdn.rbac.UserService/GetUser",
"/gosdn.rbac.UserService/GetUsers",
"/gosdn.rbac.UserService/UpdateUsers",
"/gosdn.rbac.UserService/DeleteUsers",
"/gosdn.rbac.RoleService/CreateRoles",
"/gosdn.rbac.RoleService/GetRole",
"/gosdn.rbac.RoleService/GetRoles",
"/gosdn.rbac.RoleService/UpdateRoles",
"/gosdn.rbac.RoleService/DeletePermissionsForRole",
"/gosdn.rbac.RoleService/DeleteRoles",
"/gosdn.networkelement.NetworkElementService/GetMne",
"/gosdn.networkelement.NetworkElementService/GetFlattenedMne",
"/gosdn.networkelement.NetworkElementService/GetFlattenedMneList",
"/gosdn.networkelement.NetworkElementService/GetPath",
"/gosdn.networkelement.NetworkElementService/GetIntendedPath",
"/gosdn.networkelement.NetworkElementService/GetChange",
"/gosdn.networkelement.NetworkElementService/GetChangeList",
"/gosdn.networkelement.NetworkElementService/SetMneList",
"/gosdn.networkelement.NetworkElementService/SetChangeList",
"/gosdn.networkelement.NetworkElementService/SetPathList",
"/gosdn.networkelement.NetworkElementService/DeviceSchema",
"/gosdn.networkelement.NetworkElementService/DeleteMne",
"/gosdn.networkelement.NetworkElementService/SubscribePath",
"/gosdn.plugin_internal.PluginInternalService/AvailablePlugins",
"/gosdn.plugin_internal.PluginInternalService/GetPluginSchema",
"/gosdn.app.AppService/Register",
"/gosdn.app.AppService/Deregister",
"/gosdn.configurationmanagement.ConfigurationManagementService/ExportSDNConfig",
"/gosdn.configurationmanagement.ConfigurationManagementService/ImportSDNConfig",
"/gosdn.topology.RoutingTableService/AddRoutingTable",
"/gosdn.topology.RoutingTableService/GetRoutes",
"/gosdn.topology.RoutingTableService/DeleteRoute",
"/gosdn.topology.TopologyService/AddLink",
"/gosdn.topology.TopologyService/GetTopology",
"/gosdn.topology.TopologyService/UpdateLink",
"/gosdn.topology.TopologyService/DeleteLink",
}))
if err != nil {
return err
}
}
return nil
}
func ensureDefaultUserExists() error {
defaultUserName := "admin"
adminUser, err := c.userService.Get(store.Query{ID: uuid.Nil, Name: defaultUserName})
if err != nil {
log.Info(err)
}
if adminUser == nil {
// Getting the password from the environment variable which is set in gosdn.clab.yaml.
var preDefinedPassword = os.Getenv("GOSDN_ADMIN_PASSWORD")
var usedPassword string
// If environment variable is set and password is not 0, the password from the environment variable will be used.
if len(preDefinedPassword) == 0 {
// Generate a password that is 16 characters long with 3 digits, 0 symbols,
// allowing upper and lower case letters, disallowing repeat characters.
generatedPassword, err := password.Generate(16, 3, 0, true, false)
if err != nil {
log.Fatal(err)
}
usedPassword = generatedPassword
} else {
usedPassword = preDefinedPassword
}
salt, err := password.Generate(16, 3, 0, true, false)
if err != nil {
log.Fatal(err)
}
hashedPassword := base64.RawStdEncoding.EncodeToString(argon2.IDKey([]byte(usedPassword), []byte(salt), 1, 64*1024, 4, 32))
err = c.userService.Add(rbacImpl.NewUser(uuid.New(), defaultUserName, map[string]string{config.BasePndUUID.String(): "admin"}, string(hashedPassword), "", salt, conflict.Metadata{ResourceVersion: 0}))
if err != nil {
return err
}
fmt.Printf("########\n Generated admin password: %s\n########\n", usedPassword)
}
return nil
}
// Run calls initialize to start the controller.
func Run(ctx context.Context) error {
var initError error
coreOnce.Do(func() {
initError = initialize()
})
if initError != nil {
log.WithFields(log.Fields{}).Error(initError)
return initError
}
log.WithFields(log.Fields{}).Info("initialisation finished")
select {
case <-c.stopChan:
return shutdown()
case <-ctx.Done():
return shutdown()
}
}
func shutdown() error {
log.Info("shutting down controller")
coreLock.Lock()
defer func() {
plugins, err := c.pluginService.GetAll()
if err != nil {
log.Error(err)
}
for _, plugin := range plugins {
log.Info("Defer: ", plugin.Manifest().Name)
plugin.Close()
log.Info("Defer - exited: ", plugin.GetClient().Exited())
}
coreLock.Unlock()
}()
c.grpcServer.GracefulStop()
c.eventService.CloseConnection()
return stopHttpServer()
}
func callback(id uuid.UUID, ch chan networkelement.Details) {
if ch != nil {
c.pndStore.AddPendingChannel(id, ch)
log.Infof("pending channel %v added", id)
} else {
c.pndStore.RemovePendingChannel(id)
log.Infof("pending channel %v removed", id)
}
}
// setupGRPCServerWithCorrectSecurityLevel sets up a gRPC server with desired security level
//
// Only two options for now: insecure or secure, add 'else if' if required.
// Secure is the recommended mode and is set as default.
// Insecure starts the controller without the gRPC interceptor which is supposed to handle authz.
// This allows users to operate on the controller without any authentication/authorization,
// but they could still login if they want to.
// Use insecure only for testing purposes and with caution.
func setupGRPCServerWithCorrectSecurityLevel(jwt *rbacImpl.JWTManager, userService rbac.UserService, roleService rbac.RoleService) {
securityLevel := viper.GetString("security")
if securityLevel == "insecure" {
c.grpcServer = grpc.NewServer()
log.Info("set up grpc server in insecure mode")
} else {
interceptor := server.NewAuthInterceptor(jwt, userService, roleService)
c.grpcServer = grpc.NewServer(grpc.UnaryInterceptor(interceptor.Unary()), grpc.StreamInterceptor(interceptor.Stream()))
log.Info("set up grpc server in secure mode")
}
}