Newer
Older
Fabian Seidl
committed
"errors"
"fmt"
"path/filepath"
"code.fbi.h-da.de/danet/gosdn/controller/conflict"
"code.fbi.h-da.de/danet/gosdn/controller/metrics"
"code.fbi.h-da.de/danet/gosdn/controller/nucleus/types"
"code.fbi.h-da.de/danet/gosdn/controller/nucleus/util"
"go.mongodb.org/mongo-driver/bson"
cpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/csbi"
rpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/plugin-registry"
ppb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/pnd"
tpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/transport"
Fabian Seidl
committed
eventservice "code.fbi.h-da.de/danet/gosdn/controller/eventService"
"google.golang.org/grpc"
Fabian Seidl
committed
"code.fbi.h-da.de/danet/gosdn/controller/customerrs"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/change"
Fabian Seidl
committed
eventInterfaces "code.fbi.h-da.de/danet/gosdn/controller/interfaces/event"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkdomain"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkelement"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/plugin"
"code.fbi.h-da.de/danet/gosdn/controller/store"
"code.fbi.h-da.de/danet/gosdn/forks/goarista/gnmi"
gpb "github.com/openconfig/gnmi/proto/gnmi"
"github.com/spf13/viper"
// NOTE: Until we've added database support for changes, we will hold
// changeStores in memory for now.
var changeStoreMap = make(map[uuid.UUID]*store.ChangeStore)
// NewPND creates a Principle Network Domain.
func NewPND(
name string,
description string,
id uuid.UUID,
c cpb.CsbiServiceClient,
p rpb.PluginRegistryServiceClient,
callback func(uuid.UUID, chan networkelement.Details),
) (networkdomain.NetworkDomain, error) {
Fabian Seidl
committed
eventService, err := eventservice.NewEventService()
if err != nil {
return nil, err
}
networkElementStore := NewNetworkElementStore(id)
networkElementService := NewNetworkElementService(
networkElementStore,
Fabian Seidl
committed
eventService,
changeStore, ok := changeStoreMap[id]
if !ok {
changeStore = store.NewChangeStore()
changeStoreMap[id] = changeStore
}
Name: name,
Description: description,
networkElementService: networkElementService,
changes: changeStore,
Id: id,
csbiClient: c,
pluginRegistryClient: p,
callback: callback,
eventService: eventService,
type pndImplementation struct {
Name string `json:"name,omitempty"`
Description string `json:"description,omitempty"`
Malte Bauch
committed
pluginService plugin.Service
networkElementService networkelement.Service
changes *store.ChangeStore
//nolint
Id uuid.UUID `json:"id,omitempty"`
csbiClient cpb.CsbiServiceClient
pluginRegistryClient rpb.PluginRegistryServiceClient
Malte Bauch
committed
callback func(uuid.UUID, chan networkelement.Details)
func (pnd *pndImplementation) PendingChanges() []uuid.UUID {
func (pnd *pndImplementation) CommittedChanges() []uuid.UUID {
func (pnd *pndImplementation) ConfirmedChanges() []uuid.UUID {
return pnd.changes.Confirmed()
}
func (pnd *pndImplementation) GetChange(cuid uuid.UUID) (change.Change, error) {
return pnd.changes.GetChange(cuid)
func (pnd *pndImplementation) Commit(u uuid.UUID) error {
if err != nil {
return err
}
if err := ch.Commit(); err != nil {
return err
}
// Set all the changes within the network elements model
networkElement, err := pnd.networkElementService.Get(store.Query{ID: ch.AssociatedDeviceID()})
if err != nil {
return err
}
Malte Bauch
committed
diff, err := networkElement.GetPlugin().Diff(ch.PreviousState(), ch.IntendedState())
if err != nil {
return err
}
for _, update := range diff.GetUpdate() {
Malte Bauch
committed
if err := networkElement.GetPlugin().SetNode(update.GetPath(), update.GetVal()); err != nil {
return err
}
}
for _, deletePath := range diff.GetDelete() {
Malte Bauch
committed
if err := networkElement.GetPlugin().DeleteNode(deletePath); err != nil {
return err
}
}
// update the network element
return pnd.networkElementService.Update(networkElement)
func (pnd *pndImplementation) Confirm(u uuid.UUID) error {
if err != nil {
return err
}
func (pnd *pndImplementation) ID() uuid.UUID {
return pnd.Id
func (pnd *pndImplementation) NetworkElements() []networkelement.NetworkElement {
allNetworkElements, _ := pnd.networkElementService.GetAll()
return allNetworkElements
func (pnd *pndImplementation) FlattenedNetworkElements() []networkelement.LoadedNetworkElement {
allNetworkElements, _ := pnd.networkElementService.GetAllAsLoaded()
Malte Bauch
committed
return allNetworkElements
Malte Bauch
committed
}
// GetName returns the name of the PND.
return pnd.Name
// GetDescription returns the current description of the PND.
func (pnd *pndImplementation) GetDescription() string {
return pnd.Description
// AddNetworkElement adds a new network element to the PND. The UUID for the networkElementID is optional and should normally be empty.
func (pnd *pndImplementation) AddNetworkElement(name string, opt *tpb.TransportOption, requestPluginFunc func(uuid.UUID) (plugin.Plugin, error), pluginID uuid.UUID, optionalNetworkElementID ...uuid.UUID) (uuid.UUID, error) {
networkElementID := uuid.Nil
if len(optionalNetworkElementID) > 0 {
networkElementID = optionalNetworkElementID[0]
}
if requestPluginFunc == nil {
requestPluginFunc = pnd.requestPlugin
}
start := metrics.StartHook(labels, networkElementCreationsTotal)
defer metrics.FinishHook(labels, start, networkElementCreationDurationSecondsTotal, networkElementCreationDurationSeconds)
// TODO: SouthboundService and SouthboundStorage have to be changed to a
// PluginService and PluginStore.
// NOTE: currently this is commented out; since for testing purposes a
// basic example plugin is returned.
//plugin, err := pnd.southboundService.Get(store.Query{ID: pluginID})
//if err != nil {
// switch err.(type) {
// case *customerrs.CouldNotFindError:
// plugin, err = pnd.requestPlugin()
// if err != nil {
// return uuid.Nil, err
// }
// default:
// return uuid.Nil, err
// }
//}
plugin, err := requestPluginFunc(pluginID)
if err != nil {
return uuid.Nil, err
}
Malte Bauch
committed
mne, err := NewNetworkElement(name, networkElementID, opt, plugin, conflict.Metadata{ResourceVersion: 0})
return uuid.Nil, err
return pnd.addNetworkElement(mne)
// TODO: (maba): This should be changed to UUID.
func (pnd *pndImplementation) GetNetworkElement(identifier string) (networkelement.NetworkElement, error) {
id, err := uuid.Parse(identifier)
if err != nil {
id = uuid.Nil
mne, err := pnd.networkElementService.Get(store.Query{
ID: id,
Name: identifier,
})
if mne == nil {
return nil, fmt.Errorf("no network element found")
return mne, nil
// RemoveNetworkElement removes a network element from the PND.
func (pnd *pndImplementation) RemoveNetworkElement(uuid uuid.UUID) error {
return pnd.removeNetworkElement(uuid)
// UpdateNetworkElement updates a network element from the PND.
func (pnd *pndImplementation) UpdateNetworkElement(networkElementID uuid.UUID, modelAsString string) error {
err := pnd.networkElementService.UpdateModel(networkElementID, modelAsString)
if err != nil {
return err
}
//TODO: check if it could be worth to provide the method with a network
//element instead of an ID.
err = pnd.ensureIntendedConfigurationIsAppliedOnNetworkElement(networkElementID)
if err != nil {
return err
}
return err
}
// addNetworkElement adds a network element to the PND's network element store.
func (pnd *pndImplementation) addNetworkElement(mne networkelement.NetworkElement) (uuid.UUID, error) {
err := pnd.networkElementService.Add(mne)
return uuid.Nil, err
if mne.IsTransportValid() {
_, err = pnd.Request(mne.ID(), "/")
if err != nil {
return uuid.Nil, err
}
}
return mne.ID(), nil
func (pnd *pndImplementation) removeNetworkElement(id uuid.UUID) error {
mne, err := pnd.networkElementService.Get(store.Query{
ID: id,
Name: id.String(),
})
if mne == nil {
return fmt.Errorf("no network element found")
}
Malte Bauch
committed
labels := prometheus.Labels{"type": mne.Transport().Type()}
start := metrics.StartHook(labels, networkElementDeletionsTotal)
defer metrics.FinishHook(labels, start, networkElementDeletionDurationSecondsTotal, networkElementDeletionDurationSeconds)
switch mne.(type) {
case *CsbiNetworkElement:
return pnd.handleCsbiDeletion(mne)
return pnd.networkElementService.Delete(mne)
func (pnd *pndImplementation) MarshalNetworkElement(identifier string) (string, error) {
foundNetworkElement, err := pnd.networkElementService.Get(store.Query{
ID: uuid.MustParse(identifier),
Name: identifier,
})
if err != nil {
return "", err
}
Malte Bauch
committed
model, err := foundNetworkElement.GetModel()
if err != nil {
return "", err
}
"pnd": pnd.Id,
"Identifier": identifier,
"Name": foundNetworkElement.Name,
}).Info("marshalled network element")
// Request sends a get request to a specific network element.
// TODO: this method needs some heavy refactoring, especially in regards to the
// UpdateModel call.
func (pnd *pndImplementation) Request(uuid uuid.UUID, path string) (proto.Message, error) {
mne, err := pnd.networkElementService.Get(store.Query{
ID: uuid,
Name: uuid.String(),
})
if mne == nil {
return nil, fmt.Errorf("no network element found")
}
res, err := mne.Transport().Get(ctx, path)
resp, ok := res.(proto.Message)
if !ok {
Fabian Seidl
committed
return nil, &customerrs.InvalidTypeAssertionError{
Value: res,
Type: (*proto.Message)(nil),
}
err = mne.ProcessResponse(resp)
modelAsString, err := mne.GetModelAsString()
if err != nil {
return nil, err
}
// TODO(path): We probably have to remove this when we address path request handling.
err = pnd.networkElementService.UpdateModel(uuid, modelAsString)
// RequestAll sends a request for all registered network elements.
func (pnd *pndImplementation) RequestAll(path string) error {
allNetworkElements, err := pnd.networkElementService.GetAllAsLoaded()
if err != nil {
return err
}
for _, mne := range allNetworkElements {
mneUUID, err := uuid.Parse(mne.ID)
if err != nil {
return err
}
_, err = pnd.Request(mneUUID, path)
// TODO: (maba): this is not returning any useful information; this should
// return some feedback if the requests were successful
"pnd": pnd.Id,
}).Info("sent request to all network elements")
func (pnd *pndImplementation) ensureIntendedConfigurationIsAppliedOnNetworkElement(mneID uuid.UUID) error {
mne, err := pnd.networkElementService.Get(store.Query{
ID: mneID,
})
if err != nil {
return err
}
model, err := mne.GetModelAsFilteredCopy()
if err != nil {
return err
}
modelAsString, err := ygot.EmitJSON(model, &ygot.EmitJSONConfig{
Format: ygot.RFC7951,
Indent: "",
SkipValidation: true,
RFC7951Config: &ygot.RFC7951JSONConfig{
AppendModuleName: true,
},
})
if err != nil {
return err
}
req := &gpb.SetRequest{}
path, err := ygot.StringToStructuredPath("/")
if err != nil {
return err
}
req.Update = []*gpb.Update{{
Path: path,
Val: &gpb.TypedValue{
Value: &gpb.TypedValue_JsonIetfVal{JsonIetfVal: []byte(modelAsString)},
},
}}
response, err := mne.Transport().CustomSet(context.Background(), req)
if err != nil {
log.Errorf("Failed to apply model of network element err=%+v, response=%+v", err, response)
return err
}
return nil
}
// ChangeMNE creates a change from the provided Operation, path and value.
// The Change is Pending and times out after the specified timeout period.
func (pnd *pndImplementation) ChangeMNE(duid uuid.UUID, operation ppb.ApiOperation, path string, value ...string) (uuid.UUID, error) {
mne, err := pnd.networkElementService.Get(store.Query{
ID: duid,
})
if err != nil {
return uuid.Nil, err
}
p, err := ygot.StringToStructuredPath(path)
if err != nil {
return uuid.Nil, err
}
plugin := mne.GetPlugin()
marshaledModel, err := plugin.ValidateChange(operation, p, []byte(value[0]))
if err != nil {
return uuid.Nil, err
}
Malte Bauch
committed
//TODO: provide a filtered mashaledModel
if operation != ppb.ApiOperation_API_OPERATION_DELETE && len(value) != 1 {
return uuid.Nil, &customerrs.InvalidParametersError{
Func: pnd.ChangeMNE,
Param: value,
}
}
callback := func(original, modified []byte) error {
ctx := context.WithValue(context.Background(), types.CtxKeyOperation, operation) // nolint
payload := change.Payload{Original: original, Modified: modified}
pathToSet := path
if err := mne.Transport().Set(ctx, payload, pathToSet, plugin); err != nil {
return err
}
return pnd.networkElementService.Update(mne)
}
currentModel, err := mne.GetModel()
if err != nil {
return uuid.Nil, err
}
ch := NewChange(duid, currentModel, marshaledModel, callback)
if err := pnd.changes.Add(ch); err != nil {
return uuid.Nil, err
}
return ch.cuid, nil
func (pnd *pndImplementation) SubscribePath(uuid uuid.UUID, subList *ppb.SubscriptionList) error {
mne, err := pnd.networkElementService.Get(store.Query{
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
ID: uuid,
})
if err != nil {
return err
}
mode, err := mapModeToAristaFork(subList.GetMode())
if err != nil {
return err
}
for _, sub := range subList.Subscription {
streamMode, err := mapStreamModeToAristaFork(sub.GetStreamMode())
if err != nil {
return err
}
opts := &gnmi.SubscribeOptions{
Mode: mode,
StreamMode: streamMode,
Paths: [][]string{splitStringPath(sub.GetPath())},
SampleInterval: sub.SampleInterval,
}
ctx := context.Background()
ctx = context.WithValue(ctx, types.CtxKeyOpts, opts)
if err = mne.Transport().Subscribe(ctx); err != nil {
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
return err
}
}
return nil
}
func splitStringPath(s string) []string {
return strings.Split(s, "/")
}
func mapStreamModeToAristaFork(mode ppb.StreamMode) (string, error) {
switch mode {
case ppb.StreamMode_STREAM_MODE_TARGET_DEFINED:
return "target_defined", nil
case ppb.StreamMode_STREAM_MODE_ON_CHANGE:
return "on_change", nil
case ppb.StreamMode_STREAM_MODE_SAMPLE:
return "sample", nil
default:
return "", fmt.Errorf("StreamMode of type: %T is not supported", mode)
}
}
func mapModeToAristaFork(mode ppb.SubscriptionMode) (string, error) {
switch mode {
case ppb.SubscriptionMode_SUBSCRIPTION_MODE_STREAM:
return "stream", nil
case ppb.SubscriptionMode_SUBSCRIPTION_MODE_ONCE:
return "once", nil
case ppb.SubscriptionMode_SUBSCRIPTION_MODE_POLL:
return "poll", nil
default:
return "", fmt.Errorf("SubscriptionMode of type: %T is not supported", mode)
}
}
Malte Bauch
committed
// handleRollbackError will be implemented in the near future
func handleRollbackError(id uuid.UUID, err error) {
log.Error(err)
// TODO: Notion of invalid state needed.
}
func (pnd *pndImplementation) handleCsbiDeletion(mne networkelement.NetworkElement) error {
log.Infof("csbi deletion triggered for %v", mne.ID().String())
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
req := &cpb.DeleteRequest{
Timestamp: time.Now().UnixNano(),
Did: []string{mne.ID().String()},
}
resp, err := pnd.csbiClient.Delete(ctx, req)
if err != nil {
return err
}
Malte Bauch
committed
//err = pnd.southboundService.Delete(mne.SBI())
//if err != nil {
// return err
//}
"uuid": mne.ID().String(),
"status": resp.Status,
}).Info("csbi deleted")
return nil
}
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
//TODO: can probably be removed
//func (pnd *pndImplementation) handleCsbiEnrolment(name string, opt *tpb.TransportOption) (uuid.UUID, error) {
// g := new(errgroup.Group)
// ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5)
// defer cancel()
// req := &cpb.CreateRequest{
// Timestamp: time.Now().UnixNano(),
// TransportOption: []*tpb.TransportOption{opt},
// }
// resp, err := pnd.csbiClient.Create(ctx, req)
// if err != nil {
// return uuid.Nil, err
// }
// // the slice only contains one deployment
// var mneID uuid.UUID
// for _, deployment := range resp.Deployments {
// dCopy := deployment
// g.Go(func() error {
// mneID, err = pnd.createCsbiNetworkElement(ctx, name, dCopy, opt)
// if err != nil {
// return err
// }
// return nil
// })
// }
// err = g.Wait()
// if err != nil {
// return uuid.Nil, err
// }
//
// return mneID, nil
//}
//TODO: can probably be removed
// createCsbiNetworkElement is a helper method for cSBI network element creation. The method
// waits for a SYN (which indicates that the cSBI is running and addressable)
// of the commissioned cSBI and creates the network element within the controller.
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
//func (pnd *pndImplementation) createCsbiNetworkElement(
// ctx context.Context,
// name string,
// deployment *cpb.Deployment,
// opt *tpb.TransportOption,
//) (uuid.UUID, error) {
// //id, err := uuid.Parse(deployment.Id)
// //if err != nil {
// // return uuid.Nil, err
// //}
// //ch := make(chan networkelement.Details, 1)
// //pnd.callback(id, ch)
// //defer pnd.callback(id, nil)
// //defer close(ch)
// //tickatus := time.NewTicker(time.Minute * 1)
// //defer tickatus.Stop()
// //select {
// //case <-tickatus.C:
// // log.WithFields(log.Fields{
// // "id": deployment.Id,
// // "err": ctx.Err(),
// // }).Error("csbi handshake timed out")
// //case mneDetails := <-ch:
// // log.Infof("syn from csbi %v", mneDetails.ID)
// // id, err := uuid.Parse(mneDetails.ID)
// // if err != nil {
// // return uuid.Nil, err
// // }
// // csbiTransportOptions := &tpb.TransportOption{
// // Address: mneDetails.Address,
// // Username: opt.Username,
// // Password: opt.Password,
// // Tls: opt.Tls,
// // Type: opt.Type,
// // TransportOption: opt.TransportOption,
// // }
// // log.WithField("transport option", csbiTransportOptions).Debug("gosdn gnmi transport options")
// //
// // files := []string{util.GoStructName, util.ManifestFileName, util.GoStructAdditionsName}
// // csbiID := uuid.New()
// // ctx, cancel := context.WithTimeout(context.Background(), time.Minute*10)
// // defer cancel()
// //
// // g := new(errgroup.Group)
// // for _, f := range files {
// // req := &cpb.GetPayloadRequest{
// // Timestamp: time.Now().UnixNano(),
// // Did: deployment.Id,
// // File: f,
// // }
// // g.Go(func() error {
// // gClient, err := pnd.csbiClient.GetFile(ctx, req)
// // if err != nil {
// // return err
// // }
// // err = saveStreamToFile(gClient, req.GetFile(), csbiID)
// // if err != nil {
// // return err
// // }
// // return nil
// // })
// // }
// //
// // err = g.Wait()
// // if err != nil {
// // return uuid.Nil, err
// // }
// //
// // csbi, err := NewSBI(spb.Type_TYPE_CONTAINERISED, csbiID)
// // if err != nil {
// // return uuid.Nil, err
// // }
// // err = pnd.southboundService.Add(csbi)
// // if err != nil {
// // return uuid.Nil, err
// // }
// // mne, err := NewNetworkElement(name, uuid.Nil, csbiTransportOptions, csbi, conflict.Metadata{ResourceVersion: 0})
// // if err != nil {
// // return uuid.Nil, err
// // }
// // mne.(*CsbiNetworkElement).UUID = id
// // ch <- networkelement.Details{TransportOption: opt}
// // if err := pnd.networkElementService.Add(mne); err != nil {
// // return uuid.Nil, err
// // }
// // return id, nil
// //}
// return uuid.Nil, nil
//}
// TODO: update
// requestPlugin request a plugin from the plugin-registry.
func (pnd *pndImplementation) requestPlugin(requestID uuid.UUID) (plugin.Plugin, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*1)
defer cancel()
pluginDownloadRequest := &rpb.GetDownloadRequest{
Timestamp: time.Now().UnixNano(),
Id: requestID.String(),
}
dClient, err := pnd.pluginRegistryClient.Download(ctx, pluginDownloadRequest)
if err != nil {
return nil, err
}
if err := saveStreamToFile(dClient, util.BundledPluginName, requestID); err != nil {
return nil, err
}
if err := util.UnzipPlugin(requestID); err != nil {
return nil, err
}
plugin, err := NewPlugin(requestID)
if err != nil {
return nil, err
}
if err := pnd.pluginService.Add(plugin); err != nil {
return nil, err
}
// StreamClient allows to distinguish between the different ygot
// generated GoStruct clients, which return a stream of bytes.
type StreamClient interface {
grpc.ClientStream
}
// saveStreamToFile takes a StreamClient and processes the included gRPC
// stream. A file with the provided filename is created within the goSDN's
// 'plugin-folder'. Each file is stored in its own folder based on a new
// uuid.UUID.
func saveStreamToFile(sc StreamClient, filename string, id uuid.UUID) (err error) {
folderName := viper.GetString("plugin-folder")
path := filepath.Join(folderName, id.String(), filename)
// clean path to prevent attackers to get access to to directories elsewhere on the system
path = filepath.Clean(path)
if !strings.HasPrefix(path, folderName) {
Fabian Seidl
committed
return &customerrs.InvalidParametersError{
Param: path,
}
}
// create the directory hierarchy based on the path
if err := os.MkdirAll(filepath.Dir(path), 0770); err != nil {
}
// create the gostructs.go file at path
f, err := os.Create(path)
if err != nil {
}
if ferr := f.Close(); ferr != nil {
fErrString := ferr.Error()
err = fmt.Errorf("InternalError=%w error closing file:%+s", err, fErrString)
// receive byte stream
payload, err := sc.Recv()
Fabian Seidl
committed
if errors.Is(err, io.EOF) {
closeErr := sc.CloseSend()
return closeErr
closeErr := sc.CloseSend()
return closeErr
}
log.WithField("n", n).Trace("wrote bytes")
}
if err := f.Sync(); err != nil {
// MarshalBSON implements the MarshalBSON interface to store a network element as BSON.
func (pnd *pndImplementation) MarshalBSON() ([]byte, error) {
return bson.Marshal(&struct {
ID string `bson:"_id"`
Name string `bson:"name"`
Description string `bson:"description"`
}{
ID: pnd.Id.String(),
Name: pnd.Name,
Description: pnd.Description,
})