Newer
Older
goErrors "errors"
"fmt"
"path/filepath"
"code.fbi.h-da.de/danet/gosdn/controller/metrics"
"code.fbi.h-da.de/danet/gosdn/controller/nucleus/types"
"code.fbi.h-da.de/danet/gosdn/controller/nucleus/util"
"go.mongodb.org/mongo-driver/bson"
"golang.org/x/sync/errgroup"
cpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/csbi"
ppb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/pnd"
spb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/southbound"
tpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/transport"
Fabian Seidl
committed
eventservice "code.fbi.h-da.de/danet/gosdn/controller/eventService"
"google.golang.org/grpc"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/change"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/device"
Fabian Seidl
committed
eventInterfaces "code.fbi.h-da.de/danet/gosdn/controller/interfaces/event"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkdomain"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/southbound"
"code.fbi.h-da.de/danet/gosdn/controller/nucleus/errors"
Malte Bauch
committed
gGnmi "code.fbi.h-da.de/danet/gosdn/controller/nucleus/util/gnmi"
"code.fbi.h-da.de/danet/gosdn/controller/store"
"code.fbi.h-da.de/danet/gosdn/forks/goarista/gnmi"
gpb "github.com/openconfig/gnmi/proto/gnmi"
"github.com/openconfig/ygot/ygot"
"github.com/openconfig/ygot/ytypes"
"github.com/spf13/viper"
// changeStoreMap holds one ChangeStore per PND, keyed by the PND's UUID.
//
// NOTE: Until we've added database support for changes, we will hold
// changeStores in memory for now.
var changeStoreMap = make(map[uuid.UUID]*store.ChangeStore)
// NewPND creates a Principal Network Domain.
func NewPND(
name string,
description string,
id uuid.UUID,
c cpb.CsbiServiceClient,
callback func(uuid.UUID, chan device.Details),
) (networkdomain.NetworkDomain, error) {
Fabian Seidl
committed
eventService, err := eventservice.NewEventService()
if err != nil {
return nil, err
}
sbiStore := NewSbiStore(id)
deviceStore := NewDeviceStore(id)
Fabian Seidl
committed
sbiService := NewSbiService(sbiStore, eventService)
deviceService := NewDeviceService(
deviceStore,
sbiService,
Fabian Seidl
committed
eventService,
changeStore, ok := changeStoreMap[id]
if !ok {
changeStore = store.NewChangeStore()
changeStoreMap[id] = changeStore
}
Name: name,
Description: description,
southboundService: sbiService,
deviceService: deviceService,
changes: changeStore,
Id: id,
Fabian Seidl
committed
csbiClient: c,
callback: callback,
eventService: eventService,
existingSBIs, err := sbiStore.GetAll()
if err != nil {
if len(existingSBIs) == 0 {
newSBI, _ := NewSBI(spb.Type_TYPE_OPENCONFIG)
err = pnd.southboundService.Add(newSBI)
if err != nil {
return nil, err
}
type pndImplementation struct {
Name string `json:"name,omitempty"`
Description string `json:"description,omitempty"`
southboundService southbound.Service
deviceService device.Service
changes *store.ChangeStore
//nolint
Id uuid.UUID `json:"id,omitempty"`
Fabian Seidl
committed
csbiClient cpb.CsbiServiceClient
callback func(uuid.UUID, chan device.Details)
eventService eventInterfaces.Service
func (pnd *pndImplementation) PendingChanges() []uuid.UUID {
func (pnd *pndImplementation) CommittedChanges() []uuid.UUID {
// ConfirmedChanges returns the UUIDs that the PND's change store reports as
// confirmed.
func (pnd *pndImplementation) ConfirmedChanges() []uuid.UUID {
	confirmed := pnd.changes.Confirmed()
	return confirmed
}
func (pnd *pndImplementation) GetChange(cuid uuid.UUID) (change.Change, error) {
return pnd.changes.GetChange(cuid)
func (pnd *pndImplementation) Commit(u uuid.UUID) error {
if err != nil {
return err
}
func (pnd *pndImplementation) Confirm(u uuid.UUID) error {
if err != nil {
return err
}
func (pnd *pndImplementation) ID() uuid.UUID {
return pnd.Id
func (pnd *pndImplementation) Devices() []device.Device {
allDevices, _ := pnd.deviceService.GetAll()
return allDevices
// GetName returns the name of the PND
return pnd.Name
// GetDescription returns the current description of the PND
func (pnd *pndImplementation) GetDescription() string {
return pnd.Description
// GetSBIs returns every southbound interface held by the PND's southbound
// service. If the lookup fails, the result is nil together with the error.
func (pnd *pndImplementation) GetSBIs() ([]southbound.SouthboundInterface, error) {
	registered, err := pnd.southboundService.GetAll()
	if err == nil {
		return registered, nil
	}

	return nil, err
}
// GetSBI returns the registered SBI with the given UUID
func (pnd *pndImplementation) GetSBI(sbiUUID uuid.UUID) (southbound.SouthboundInterface, error) {
sbis, err := pnd.southboundService.Get(store.Query{ID: sbiUUID})
if err != nil {
return nil, err
}
return sbis, nil
// Destroy destroys the PND by delegating to the package-level destroy helper
// and returns that helper's result.
func (pnd *pndImplementation) Destroy() error {
	err := destroy()
	return err
}
// AddSbi adds a SBI to the PND which will be supported
func (pnd *pndImplementation) AddSbi(s southbound.SouthboundInterface) error {
// RemoveSbi removes a SBI from the PND. All devices that use this SBI are
// removed first.
func (pnd *pndImplementation) RemoveSbi(sid uuid.UUID) error {
var associatedDevices []device.Device
allExistingDevices, err := pnd.deviceService.GetAll()
if err != nil {
return err
}
// range over all storable items within the device store
for _, device := range allExistingDevices {
// check if the device uses the provided SBI and add it to the devices
// slice.
if device.SBI().ID() == sid {
associatedDevices = append(associatedDevices, device)
}
}
// range over associated devices and remove each one of them
for _, d := range associatedDevices {
if err := pnd.removeDevice(d.ID()); err != nil {
return err
}
}
return pnd.removeSbi(sid)
// AddDevice adds a new device to the PND
func (pnd *pndImplementation) AddDevice(name string, opt *tpb.TransportOption, sid uuid.UUID) (uuid.UUID, error) {
labels := prometheus.Labels{"type": opt.Type.String()}
start := metrics.StartHook(labels, deviceCreationsTotal)
defer metrics.FinishHook(labels, start, deviceCreationDurationSecondsTotal, deviceCreationDurationSeconds)
var sbi southbound.SouthboundInterface
return uuid.Nil, err
var err error
sbi, err = pnd.southboundService.Get(store.Query{ID: sid})
return uuid.Nil, err
Martin Stiemerling
committed
d, err := NewDevice(name, uuid.Nil, opt, sbi)
return uuid.Nil, err
func (pnd *pndImplementation) GetDevice(identifier string) (device.Device, error) {
id, err := uuid.Parse(identifier)
if err != nil {
id = uuid.Nil
d, err := pnd.deviceService.Get(store.Query{
ID: id,
Name: identifier,
})
if d == nil {
return nil, fmt.Errorf("no device found")
return d, nil
// RemoveDevice removes a device from the PND
func (pnd *pndImplementation) RemoveDevice(uuid uuid.UUID) error {
// UpdateDevice hands modelAsString to the device service as the new model of
// device and afterwards applies the intended configuration on the device.
// The first error from either step is returned; nil means both succeeded.
func (pnd *pndImplementation) UpdateDevice(device device.Device, modelAsString string) error {
	if updateErr := pnd.deviceService.UpdateModel(device, modelAsString); updateErr != nil {
		return updateErr
	}

	// Send the device's model to the device itself.
	return pnd.ensureIntendedConfigurationIsAppliedOnDevice(device.ID())
}
// destroy is the actual implementation behind Destroy; bind it to the struct
// if necessary. It currently does nothing and always returns nil.
func destroy() error {
	var err error
	return err
}
// addSbi adds a SBI to the PND's SBI store.
func (pnd *pndImplementation) addSbi(sbi southbound.SouthboundInterface) error {
return pnd.southboundService.Add(sbi)
// removeSbi removes an SBI based on the given ID from the PND's SBI store.
func (pnd *pndImplementation) removeSbi(id uuid.UUID) error {
sbi, err := pnd.southboundService.Get(store.Query{ID: id})
if sbi == nil {
return fmt.Errorf("no sbi found")
}
if err != nil {
return err
}
return pnd.southboundService.Delete(sbi)
// addDevice adds a device to the PND's device store.
func (pnd *pndImplementation) addDevice(device device.Device) (uuid.UUID, error) {
err := pnd.deviceService.Add(device)
return uuid.Nil, err
if device.IsTransportValid() {
_, err = pnd.Request(device.ID(), "/interfaces")
if err != nil {
return uuid.Nil, err
}
}
return device.ID(), nil
func (pnd *pndImplementation) removeDevice(id uuid.UUID) error {
d, err := pnd.deviceService.Get(store.Query{
ID: id,
Name: id.String(),
})
if d == nil {
return fmt.Errorf("no device found")
}
labels := prometheus.Labels{"type": d.SBI().Type().String()}
start := metrics.StartHook(labels, deviceDeletionsTotal)
defer metrics.FinishHook(labels, start, deviceDeletionDurationSecondsTotal, deviceDeletionDurationSeconds)
Shrey Garg
committed
return pnd.handleCsbiDeletion(d)
func (pnd *pndImplementation) MarshalDevice(identifier string) (string, error) {
foundDevice, err := pnd.deviceService.Get(store.Query{
ID: uuid.MustParse(identifier),
Name: identifier,
})
if err != nil {
return "", err
}
jsonTree, err := json.MarshalIndent(foundDevice.GetModel(), "", "\t")
if err != nil {
return "", err
}
"pnd": pnd.Id,
"Identifier": identifier,
"Name": foundDevice.Name,
return string(jsonTree), nil
// Request sends a get request to a specific device
func (pnd *pndImplementation) Request(uuid uuid.UUID, path string) (proto.Message, error) {
d, err := pnd.deviceService.Get(store.Query{
ID: uuid,
Name: uuid.String(),
})
if d == nil {
return nil, fmt.Errorf("no device found")
}
resp, ok := res.(proto.Message)
if !ok {
return nil, &errors.ErrInvalidTypeAssertion{
Value: res,
Type: (*proto.Message)(nil),
}
}
err = d.ProcessResponse(resp)
// RequestAll sends a request for all registered devices
func (pnd *pndImplementation) RequestAll(path string) error {
allDevices, err := pnd.deviceService.GetAll()
if err != nil {
return err
}
for _, k := range allDevices {
_, err := pnd.Request(k.ID(), path)
"pnd": pnd.Id,
"path": path,
}).Info("sent request to all devices")
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
// ensureIntendedConfigurationIsAppliedOnDevice looks up the device with the
// given ID, fetches its model as a string and sends it to the device through
// the transport's CustomSet as a single gNMI SetRequest update on the root
// path ("/"), carried as a JSON IETF value.
//
// Lookup, model, path-conversion and transport errors are returned unchanged;
// a transport error is additionally logged together with the response.
func (pnd *pndImplementation) ensureIntendedConfigurationIsAppliedOnDevice(deviceID uuid.UUID) error {
	target, err := pnd.deviceService.Get(store.Query{ID: deviceID})
	if err != nil {
		return err
	}

	modelJSON, err := target.GetModelAsString()
	if err != nil {
		return err
	}

	rootPath, err := ygot.StringToStructuredPath("/")
	if err != nil {
		return err
	}

	// One update that carries the whole model at the root path.
	setReq := &gpb.SetRequest{
		Update: []*gpb.Update{
			{
				Path: rootPath,
				Val: &gpb.TypedValue{
					Value: &gpb.TypedValue_JsonIetfVal{JsonIetfVal: []byte(modelJSON)},
				},
			},
		},
	}

	setResp, setErr := target.Transport().CustomSet(context.Background(), setReq)
	if setErr != nil {
		log.Errorf("Failed to apply model of device err=%+v, response=%+v", setErr, setResp)
		return setErr
	}

	return nil
}
Malte Bauch
committed
//nolint:gocyclo
// ChangeOND creates a change from the provided Operation, path and value.
// The Change is Pending and times out after the specified timeout period
func (pnd *pndImplementation) ChangeOND(duid uuid.UUID, operation ppb.ApiOperation, path string, value ...string) (uuid.UUID, error) {
Malte Bauch
committed
//TODO: check if we can get cyclomatic complexity from 16 to at least 15
d, err := pnd.deviceService.Get(store.Query{
ID: duid,
})
if err != nil {
return uuid.Nil, err
}
Malte Bauch
committed
validatedCpy, err := d.CreateModelCopy()
p, err := ygot.StringToStructuredPath(path)
if err != nil {
if operation != ppb.ApiOperation_API_OPERATION_DELETE && len(value) != 1 {
return uuid.Nil, &errors.ErrInvalidParameters{
case ppb.ApiOperation_API_OPERATION_UPDATE, ppb.ApiOperation_API_OPERATION_REPLACE:
Malte Bauch
committed
_, entry, err := ytypes.GetOrCreateNode(d.SBI().Schema().RootSchema(), validatedCpy, p)
if err != nil {
Malte Bauch
committed
if entry.IsDir() {
opts := []ytypes.UnmarshalOpt{&ytypes.IgnoreExtraFields{}}
if err := d.SBI().Unmarshal([]byte(value[0]), p, validatedCpy, opts...); err != nil {
return uuid.Nil, err
}
} else if entry.IsLeaf() {
typedValue, err := gGnmi.ConvertStringToGnmiTypedValue(value[0], entry.Type)
if err != nil {
return uuid.Nil, err
}
opts := []ytypes.SetNodeOpt{&ytypes.InitMissingElements{}, &ytypes.TolerateJSONInconsistencies{}}
if err := ytypes.SetNode(d.SBI().Schema().RootSchema(), validatedCpy, p, typedValue, opts...); err != nil {
Malte Bauch
committed
return uuid.Nil, err
}
}
Malte Bauch
committed
if err := ytypes.DeleteNode(d.SBI().Schema().RootSchema(), validatedCpy, p); err != nil {
return uuid.Nil, &errors.ErrOperationNotSupported{Op: operation}
Malte Bauch
committed
ygot.PruneEmptyBranches(validatedCpy)
callback := func(original ygot.GoStruct, modified ygot.GoStruct) error {
ctx := context.WithValue(context.Background(), types.CtxKeyOperation, operation) // nolint
payload := change.Payload{Original: original, Modified: modified}
pathToSet := path
schema := d.SBI().Schema()
return d.Transport().Set(ctx, payload, pathToSet, schema)
Malte Bauch
committed
ch := NewChange(duid, d.GetModel(), validatedCpy, callback)
if err := pnd.changes.Add(ch); err != nil {
return uuid.Nil, err
}
Malte Bauch
committed
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
// SubscribePath issues one Subscribe call on the transport of the device
// identified by uuid for every entry in subList. The list's mode and each
// entry's stream mode are translated to the names used by the goarista gnmi
// fork. The resulting options are passed to the transport inside the context
// under types.CtxKeyOpts.
//
// It stops at, and returns, the first error: an unknown device, an
// unsupported mode or stream mode, or a failing Subscribe call.
func (pnd *pndImplementation) SubscribePath(uuid uuid.UUID, subList *ppb.SubscriptionList) error {
	dev, err := pnd.deviceService.Get(store.Query{ID: uuid})
	if err != nil {
		return err
	}

	listMode, err := mapModeToAristaFork(subList.GetMode())
	if err != nil {
		return err
	}

	for _, entry := range subList.Subscription {
		entryStreamMode, err := mapStreamModeToAristaFork(entry.GetStreamMode())
		if err != nil {
			return err
		}

		// Each subscription gets its own context carrying its options.
		subCtx := context.WithValue(context.Background(), types.CtxKeyOpts, &gnmi.SubscribeOptions{
			Mode:           listMode,
			StreamMode:     entryStreamMode,
			Paths:          [][]string{splitStringPath(entry.GetPath())},
			SampleInterval: entry.SampleInterval,
		})

		if err := dev.Transport().Subscribe(subCtx); err != nil {
			return err
		}
	}

	return nil
}
// splitStringPath cuts s at every "/" and returns the pieces in order.
// Empty pieces are kept, so a leading or trailing slash yields an empty
// element at that end, and an empty s yields a single empty element.
func splitStringPath(s string) []string {
	const separator = "/"

	elems := strings.SplitN(s, separator, -1)
	return elems
}
// mapStreamModeToAristaFork translates a ppb.StreamMode into the string name
// that is placed in the StreamMode field of the goarista gnmi fork's
// SubscribeOptions. Any other mode yields an empty string and an error that
// names the rejected mode.
func mapStreamModeToAristaFork(mode ppb.StreamMode) (string, error) {
	switch mode {
	case ppb.StreamMode_STREAM_MODE_TARGET_DEFINED:
		return "target_defined", nil
	case ppb.StreamMode_STREAM_MODE_ON_CHANGE:
		return "on_change", nil
	case ppb.StreamMode_STREAM_MODE_SAMPLE:
		return "sample", nil
	default:
		// %v rather than %T: %T prints the Go type, which is the same for
		// every input and so never identifies the unsupported value.
		return "", fmt.Errorf("StreamMode of type: %v is not supported", mode)
	}
}
// mapModeToAristaFork translates a ppb.SubscriptionMode into the string name
// that is placed in the Mode field of the goarista gnmi fork's
// SubscribeOptions. Any other mode yields an empty string and an error that
// names the rejected mode.
func mapModeToAristaFork(mode ppb.SubscriptionMode) (string, error) {
	switch mode {
	case ppb.SubscriptionMode_SUBSCRIPTION_MODE_STREAM:
		return "stream", nil
	case ppb.SubscriptionMode_SUBSCRIPTION_MODE_ONCE:
		return "once", nil
	case ppb.SubscriptionMode_SUBSCRIPTION_MODE_POLL:
		return "poll", nil
	default:
		// %v rather than %T: %T prints the Go type, which is the same for
		// every input and so never identifies the unsupported value.
		return "", fmt.Errorf("SubscriptionMode of type: %v is not supported", mode)
	}
}
Malte Bauch
committed
//nolint
// handleRollbackError is a placeholder for rollback failure handling, which
// will be implemented in the near future. For now it only logs err; id is
// accepted but not used.
func handleRollbackError(id uuid.UUID, err error) {
log.Error(err)
// TODO: Notion of invalid state needed.
}
Shrey Garg
committed
func (pnd *pndImplementation) handleCsbiDeletion(d device.Device) error {
log.Infof("csbi deletion triggered for %v", d.ID().String())
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
req := &cpb.DeleteRequest{
Timestamp: time.Now().UnixNano(),
Shrey Garg
committed
Did: []string{d.ID().String()},
}
resp, err := pnd.csbiClient.Delete(ctx, req)
if err != nil {
return err
}
Shrey Garg
committed
err = pnd.southboundService.Delete(d.SBI())
if err != nil {
return err
}
Shrey Garg
committed
"uuid": d.ID().String(),
"status": resp.Status,
}).Info("csbi deleted")
return nil
}
func (pnd *pndImplementation) handleCsbiEnrolment(name string, opt *tpb.TransportOption) (uuid.UUID, error) {
g := new(errgroup.Group)
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5)
defer cancel()
req := &cpb.CreateRequest{
Timestamp: time.Now().UnixNano(),
TransportOption: []*tpb.TransportOption{opt},
}
resp, err := pnd.csbiClient.Create(ctx, req)
if err != nil {
return uuid.Nil, err
// the slice only contains one deployment
var devID uuid.UUID
dCopy := d
g.Go(func() error {
devID, err = pnd.createCsbiDevice(ctx, name, dCopy, opt)
if err != nil {
return err
}
return nil
})
}
err = g.Wait()
if err != nil {
return uuid.Nil, err
return devID, nil
// createCsbiDevice is a helper method for cSBI device creation. The method
// waits for a SYN (which indicates that the cSBI is running and addressable)
// of the commissioned cSBI and creates the device within the controller.
func (pnd *pndImplementation) createCsbiDevice(
ctx context.Context,
name string,
d *cpb.Deployment,
opt *tpb.TransportOption,
) (uuid.UUID, error) {
id, err := uuid.Parse(d.Id)
if err != nil {
return uuid.Nil, err
ch := make(chan device.Details, 1)
defer pnd.callback(id, nil)
defer close(ch)
defer tickatus.Stop()
select {
case <-tickatus.C:
log.WithFields(log.Fields{
"id": d.Id,
"err": ctx.Err(),
}).Error("csbi handshake timed out")
case deviceDetails := <-ch:
log.Infof("syn from csbi %v", deviceDetails.ID)
id, err := uuid.Parse(deviceDetails.ID)
if err != nil {
return uuid.Nil, err
csbiTransportOptions := &tpb.TransportOption{
Address: deviceDetails.Address,
Username: opt.Username,
Password: opt.Password,
Tls: opt.Tls,
Type: opt.Type,
TransportOption: opt.TransportOption,
}
log.WithField("transport option", csbiTransportOptions).Debug("gosdn gnmi transport options")
files := []string{util.GoStructName, util.ManifestFileName, util.GoStructAdditionsName}
csbiID := uuid.New()
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*10)
defer cancel()
g := new(errgroup.Group)
for _, f := range files {
req := &cpb.GetPayloadRequest{
Timestamp: time.Now().UnixNano(),
Did: d.Id,
File: f,
}
g.Go(func() error {
gClient, err := pnd.csbiClient.GetFile(ctx, req)
if err != nil {
return err
}
err = saveStreamToFile(gClient, req.GetFile(), csbiID)
if err != nil {
return err
}
return nil
})
}
err = g.Wait()
if err != nil {
return uuid.Nil, err
}
csbi, err := NewSBI(spb.Type_TYPE_CONTAINERISED, csbiID)
if err != nil {
return uuid.Nil, err
}
err = pnd.southboundService.Add(csbi)
if err != nil {
return uuid.Nil, err
}
d, err := NewDevice(name, uuid.Nil, csbiTransportOptions, csbi)
if err != nil {
return uuid.Nil, err
}
d.(*CsbiDevice).UUID = id
ch <- device.Details{TransportOption: opt}
if err := pnd.deviceService.Add(d); err != nil {
return uuid.Nil, err
return uuid.Nil, nil
// requestPlugin is a feature for cSBIs. It sends a plugin request for the
// target described by opt to the cSBI orchestrator, downloads the ygot
// generated files of the returned deployment into a folder named after a
// fresh UUID, and registers a SouthboundInterface of type plugin with that
// UUID in the PND's southbound service.
//
// Only the first deployment of the response is processed. An error is
// returned if the request, any file download, or the SBI creation or
// registration fails, or if the response contains no deployment.
func (pnd *pndImplementation) requestPlugin(name string, opt *tpb.TransportOption) (southbound.SouthboundInterface, error) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Minute*10)
	defer cancel()

	cReq := &cpb.CreateRequest{
		Timestamp:       time.Now().UnixNano(),
		TransportOption: []*tpb.TransportOption{opt},
	}
	resp, err := pnd.csbiClient.CreateGoStruct(ctx, cReq)
	if err != nil {
		// Without this check a failed request would fall through and be
		// reported as an empty deployment slice, hiding the real cause.
		return nil, err
	}

	files := []string{util.GoStructName, util.ManifestFileName, util.GoStructAdditionsName}
	g := new(errgroup.Group)
	// we only request one plugin
	for _, dep := range resp.GetDeployments() {
		id := uuid.New()
		for _, f := range files {
			req := &cpb.GetPayloadRequest{
				Timestamp: time.Now().UnixNano(),
				Did:       dep.GetId(),
				File:      f,
			}
			// Fetch the files concurrently; each goroutine uses its own req.
			g.Go(func() error {
				gClient, err := pnd.csbiClient.GetFile(ctx, req)
				if err != nil {
					return err
				}
				return saveStreamToFile(gClient, req.GetFile(), id)
			})
		}

		// Wait for every download to finish (and collect the first
		// failure) before the SBI is created for this id.
		if err := g.Wait(); err != nil {
			return nil, err
		}

		sbi, err := NewSBI(spb.Type_TYPE_PLUGIN, id)
		if err != nil {
			return nil, err
		}
		if err := pnd.southboundService.Add(sbi); err != nil {
			return nil, err
		}

		return sbi, nil
	}

	return nil, fmt.Errorf("requestPlugin: received deployment slice was empty")
}
// StreamClient allows to distinguish between the different ygot
// generated GoStruct clients, which return a stream of bytes.
type StreamClient interface {
Recv() (*cpb.Payload, error)
grpc.ClientStream
}
// saveStreamToFile takes a GenericGoStructClient and processes the included
// gRPC stream. A 'gostructs.go' file is created within the goSDN's
// 'plugin-folder'. Each 'gostructs.go' file is stored in its own folder based
// on a new uuid.UUID.
func saveStreamToFile[T StreamClient](sc T, filename string, id uuid.UUID) error {
folderName := viper.GetString("plugin-folder")
path := filepath.Join(folderName, id.String(), filename)
// clean the path to prevent attackers from gaining access to directories elsewhere on the system
path = filepath.Clean(path)
if !strings.HasPrefix(path, folderName) {
return &errors.ErrInvalidParameters{
Func: saveStreamToFile[T],
Param: path,
}
}
// create the directory hierarchy based on the path
if err := os.MkdirAll(filepath.Dir(path), 0770); err != nil {
}
// create the gostructs.go file at path
f, err := os.Create(path)
if err != nil {
}
defer func() {
if err := f.Close(); err != nil {
log.Error("error closing file: ", err)
}
}()
// receive byte stream
payload, err := sc.Recv()
if goErrors.Is(err, io.EOF) {
closeErr := sc.CloseSend()
return closeErr
closeErr := sc.CloseSend()
return closeErr
}
log.WithField("n", n).Trace("wrote bytes")
}
if err := f.Sync(); err != nil {
// MarshalBSON implements the MarshalBSON interface to store a PND as BSON
func (pnd *pndImplementation) MarshalBSON() ([]byte, error) {
return bson.Marshal(&struct {
ID string `bson:"_id"`
Name string `bson:"name"`
Description string `bson:"description"`
}{
ID: pnd.Id.String(),
Name: pnd.Name,
Description: pnd.Description,
})
// UpdateDeviceAfterSubscribeResponse takes a device and forwards it to the
// device service's Update, returning whatever error that call reports.
func (pnd *pndImplementation) UpdateDeviceAfterSubscribeResponse(device device.Device) error {
	return pnd.deviceService.Update(device)
}