-
Manuel Kieweg authoredManuel Kieweg authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
controller.go 3.34 KiB
package gosdn
import (
"context"
"net"
"net/http"
"os"
"os/signal"
"sync"
"time"
"github.com/google/uuid"
log "github.com/sirupsen/logrus"
"github.com/spf13/viper"
"google.golang.org/grpc"
pb "code.fbi.h-da.de/cocsn/api/go/gosdn/core"
cpb "code.fbi.h-da.de/cocsn/api/go/gosdn/csbi"
ppb "code.fbi.h-da.de/cocsn/api/go/gosdn/pnd"
spb "code.fbi.h-da.de/cocsn/api/go/gosdn/southbound"
nbi "code.fbi.h-da.de/cocsn/gosdn/northbound/server"
"code.fbi.h-da.de/cocsn/gosdn/database"
"code.fbi.h-da.de/cocsn/gosdn/nucleus"
)
var (
	// coreLock guards start-up and shutdown of the controller's servers.
	coreLock sync.RWMutex
	// coreOnce makes sure initialize runs at most once per process.
	coreOnce sync.Once
)
// Core is the representation of the controller's core.
// It bundles the servers, stores and clients the controller works with; a
// single package-level instance is created in init.
type Core struct {
	// deprecated
	// NOTE(review): the visible code only initializes this field to an empty
	// database.Database and never reads it.
	database database.Database
	// pndc is the store of principal network domains (PNDs); it is also used
	// to add and remove pending channels.
	pndc *nucleus.PndStore
	// httpServer is not assigned in the visible code; presumably managed by
	// startHttpServer/stopHttpServer — TODO confirm.
	httpServer *http.Server
	// grpcServer is the gRPC server created and started by startGrpc.
	grpcServer *grpc.Server
	// nbi is the northbound interface whose services are registered on
	// grpcServer.
	nbi *nbi.NorthboundInterface
	// stopChan receives os.Interrupt; Run shuts the controller down when a
	// signal arrives on it.
	stopChan chan os.Signal
	// csbiClient is the client for the address configured under the
	// "csbi-orchestrator" key; it is handed to newly created PNDs.
	csbiClient cpb.CsbiClient
}
// c is the package-level controller instance. It is populated in init and
// shared by every function in this file.
var c *Core
func init() {
c = &Core{
database: database.Database{},
pndc: nucleus.NewPndStore(),
stopChan: make(chan os.Signal, 1),
}
// Setting up signal capturing
signal.Notify(c.stopChan, os.Interrupt)
}
// initialize performs the controller's start-up sequence: it starts the gRPC
// server, starts the HTTP server while holding the core lock, and finally
// creates the southbound interfaces. The first error encountered is returned.
func initialize() error {
	grpcErr := startGrpc()
	if grpcErr != nil {
		return grpcErr
	}

	// The HTTP server is started under the write lock.
	coreLock.Lock()
	startHttpServer()
	coreLock.Unlock()

	sbiErr := createSouthboundInterfaces()
	return sbiErr
}
// startGrpc opens a TCP listener on the address configured under "socket",
// creates the client for the address configured under "csbi-orchestrator",
// registers the northbound services on a new gRPC server and starts serving
// in a background goroutine.
//
// It returns an error if the listener cannot be opened or the orchestrator
// connection cannot be created. A failure of the background Serve call is
// fatal to the process (log.Fatal).
func startGrpc() error {
	sock := viper.GetString("socket")
	lis, err := net.Listen("tcp", sock)
	if err != nil {
		return err
	}

	// Dial before the server starts serving, so that a dial failure can be
	// reported without leaving a listener or a serving goroutine behind.
	orchestrator := viper.GetString("csbi-orchestrator")
	conn, err := grpc.Dial(orchestrator, grpc.WithInsecure())
	if err != nil {
		// The dial error is the one worth reporting; a close error on the
		// unused listener adds nothing for the caller.
		_ = lis.Close()
		return err
	}
	c.csbiClient = cpb.NewCsbiClient(conn)

	c.grpcServer = grpc.NewServer()
	c.nbi = nbi.NewNBI(c.pndc)
	pb.RegisterCoreServer(c.grpcServer, c.nbi.Core)
	ppb.RegisterPndServer(c.grpcServer, c.nbi.Pnd)
	cpb.RegisterCsbiServer(c.grpcServer, c.nbi.Csbi)

	go func() {
		if err := c.grpcServer.Serve(lis); err != nil {
			log.Fatal(err)
		}
	}()

	return nil
}
// createSouthboundInterfaces initializes the controller with its supported SBIs
func createSouthboundInterfaces() error {
sbi := nucleus.NewSBI(spb.Type_OPENCONFIG)
return createPrincipalNetworkDomain(sbi)
}
// createPrincipalNetworkDomain creates the initial PND named "base" on top of
// the given southbound interface and adds it to the controller's PND store.
// It returns the error from creating the PND or from adding it to the store.
func createPrincipalNetworkDomain(s nucleus.SouthboundInterface) error {
	basePnd, err := nucleus.NewPND("base", "gosdn base pnd", uuid.New(), s, c.csbiClient, callback)
	if err != nil {
		return err
	}
	return c.pndc.Add(basePnd)
}
// Run calls initialize to start the controller and then blocks until the
// controller is asked to stop.
//
// Initialization runs at most once per process (guarded by coreOnce). If it
// fails, the error is logged and returned. Otherwise Run waits until either
// an interrupt signal arrives on the stop channel or ctx is cancelled, then
// shuts the controller down and returns the result of shutdown. While
// waiting it emits a debug log line once per minute.
func Run(ctx context.Context) error {
	var initError error
	coreOnce.Do(func() {
		initError = initialize()
	})
	if initError != nil {
		log.WithFields(log.Fields{}).Error(initError)
		return initError
	}
	log.WithFields(log.Fields{}).Info("initialisation finished")

	// A single ticker for the whole loop: calling time.Tick inside the select
	// would allocate a fresh, never-stopped ticker on every iteration.
	heartbeat := time.NewTicker(time.Minute)
	defer heartbeat.Stop()

	for {
		select {
		case <-c.stopChan:
			return shutdown()
		case <-ctx.Done():
			return shutdown()
		case <-heartbeat.C:
			log.Debug("up and running")
		}
	}
}
// shutdown stops the controller's servers while holding the core lock: the
// gRPC server is stopped gracefully first, then the HTTP server is stopped.
// It returns whatever stopHttpServer returns.
func shutdown() error {
	log.Info("shutting down controller")

	coreLock.Lock()
	defer coreLock.Unlock()

	c.grpcServer.GracefulStop()
	httpErr := stopHttpServer()
	return httpErr
}
// callback registers or unregisters a pending channel in the PND store.
// A non-nil ch is added under id; a nil ch removes the pending channel
// stored under id. Either action is logged at info level.
func callback(id uuid.UUID, ch chan nucleus.DeviceDetails) {
	if ch == nil {
		c.pndc.RemovePendingChannel(id)
		log.Infof("pending channel %v removed", id)
		return
	}

	c.pndc.AddPendingChannel(id, ch)
	log.Infof("pending channel %v added", id)
}