-
Malte Bauch authoredMalte Bauch authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
principalNetworkDomain.go 25.45 KiB
package nucleus
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"strings"
"time"
"code.fbi.h-da.de/danet/gosdn/controller/conflict"
"code.fbi.h-da.de/danet/gosdn/controller/metrics"
"code.fbi.h-da.de/danet/gosdn/controller/nucleus/types"
"code.fbi.h-da.de/danet/gosdn/controller/nucleus/util"
"go.mongodb.org/mongo-driver/bson"
"golang.org/x/sync/errgroup"
cpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/csbi"
ppb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/pnd"
spb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/southbound"
tpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/transport"
eventservice "code.fbi.h-da.de/danet/gosdn/controller/eventService"
"google.golang.org/grpc"
"google.golang.org/protobuf/proto"
"code.fbi.h-da.de/danet/gosdn/controller/customerrs"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/change"
eventInterfaces "code.fbi.h-da.de/danet/gosdn/controller/interfaces/event"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkdomain"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkelement"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/southbound"
gGnmi "code.fbi.h-da.de/danet/gosdn/controller/nucleus/util/gnmi"
"code.fbi.h-da.de/danet/gosdn/controller/store"
"code.fbi.h-da.de/danet/gosdn/forks/goarista/gnmi"
"github.com/google/uuid"
gpb "github.com/openconfig/gnmi/proto/gnmi"
"github.com/openconfig/ygot/ygot"
"github.com/openconfig/ygot/ytypes"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
"github.com/spf13/viper"
)
// NOTE: Until we've added database support for changes, we will hold
// changeStores in memory for now.
var changeStoreMap = make(map[uuid.UUID]*store.ChangeStore)
// NewPND creates a Principle Network Domain.
func NewPND(
name string,
description string,
id uuid.UUID,
c cpb.CsbiServiceClient,
callback func(uuid.UUID, chan networkelement.Details),
) (networkdomain.NetworkDomain, error) {
eventService, err := eventservice.NewEventService()
if err != nil {
return nil, err
}
sbiStore := NewSbiStore(id)
networkElementStore := NewNetworkElementStore(id)
sbiService := NewSbiService(sbiStore, eventService)
networkElementService := NewNetworkElementService(
networkElementStore,
sbiService,
eventService,
)
changeStore, ok := changeStoreMap[id]
if !ok {
changeStore = store.NewChangeStore()
changeStoreMap[id] = changeStore
}
pnd := &pndImplementation{
Name: name,
Description: description,
southboundService: sbiService,
networkElementService: networkElementService,
changes: changeStore,
Id: id,
csbiClient: c,
callback: callback,
eventService: eventService,
}
existingSBIs, err := sbiStore.GetAll()
if err != nil {
return nil, err
}
if len(existingSBIs) == 0 {
newSBI, _ := NewSBI(spb.Type_TYPE_OPENCONFIG)
err = pnd.southboundService.Add(newSBI)
if err != nil {
return nil, err
}
}
return pnd, nil
}
type pndImplementation struct {
Name string `json:"name,omitempty"`
Description string `json:"description,omitempty"`
southboundService southbound.Service
networkElementService networkelement.Service
changes *store.ChangeStore
//nolint
Id uuid.UUID `json:"id,omitempty"`
csbiClient cpb.CsbiServiceClient
callback func(uuid.UUID, chan networkelement.Details)
eventService eventInterfaces.Service
}
func (pnd *pndImplementation) PendingChanges() []uuid.UUID {
return pnd.changes.Pending()
}
func (pnd *pndImplementation) CommittedChanges() []uuid.UUID {
return pnd.changes.Committed()
}
func (pnd *pndImplementation) ConfirmedChanges() []uuid.UUID {
return pnd.changes.Confirmed()
}
func (pnd *pndImplementation) GetChange(cuid uuid.UUID) (change.Change, error) {
return pnd.changes.GetChange(cuid)
}
func (pnd *pndImplementation) Commit(u uuid.UUID) error {
ch, err := pnd.changes.GetChange(u)
if err != nil {
return err
}
return ch.Commit()
}
func (pnd *pndImplementation) Confirm(u uuid.UUID) error {
ch, err := pnd.changes.GetChange(u)
if err != nil {
return err
}
return ch.Confirm()
}
func (pnd *pndImplementation) ID() uuid.UUID {
return pnd.Id
}
func (pnd *pndImplementation) NetworkElements() []networkelement.NetworkElement {
allNetworkElements, _ := pnd.networkElementService.GetAll()
return allNetworkElements
}
func (pnd *pndImplementation) FlattenedNetworkElements() []networkelement.LoadedNetworkElement {
allNetworkElements, _ := pnd.networkElementService.GetAllAsLoaded()
return allNetworkElements
}
// GetName returns the name of the PND.
func (pnd *pndImplementation) GetName() string {
return pnd.Name
}
// GetDescription returns the current description of the PND.
func (pnd *pndImplementation) GetDescription() string {
return pnd.Description
}
// GetSBIs returns the registered SBIs.
func (pnd *pndImplementation) GetSBIs() ([]southbound.SouthboundInterface, error) {
sbis, err := pnd.southboundService.GetAll()
if err != nil {
return nil, err
}
return sbis, nil
}
// GetSBIs returns the registered SBIs.
func (pnd *pndImplementation) GetSBI(sbiUUID uuid.UUID) (southbound.SouthboundInterface, error) {
sbis, err := pnd.southboundService.Get(store.Query{ID: sbiUUID})
if err != nil {
return nil, err
}
return sbis, nil
}
// Destroy destroys the PND.
func (pnd *pndImplementation) Destroy() error {
return destroy()
}
// AddSbi adds a SBI to the PND which will be supported.
func (pnd *pndImplementation) AddSbi(s southbound.SouthboundInterface) error {
return pnd.addSbi(s)
}
// RemoveSbi removes a SBI from the PND
// network elements and remove the network elements using this SBI.
func (pnd *pndImplementation) RemoveSbi(sid uuid.UUID) error {
var associatedNetworkElements []networkelement.LoadedNetworkElement
allExistingNetworkElements, err := pnd.networkElementService.GetAllAsLoaded()
if err != nil {
return err
}
// range over all storable items within the network element store
for _, mne := range allExistingNetworkElements {
// check if the network element uses the provided SBI and add it to the network element
// slice.
loadedSbiUUID, err := uuid.Parse(mne.SBI)
if err != nil {
return err
}
if loadedSbiUUID == sid {
associatedNetworkElements = append(associatedNetworkElements, mne)
}
}
// range over associated network elements and remove each one of them
for _, aMNE := range associatedNetworkElements {
loadedNetworkElementUUID, err := uuid.Parse(aMNE.ID)
if err != nil {
return err
}
if err := pnd.removeNetworkElement(loadedNetworkElementUUID); err != nil {
return err
}
}
return pnd.removeSbi(sid)
}
// AddNetworkElement adds a new network element to the PND.
func (pnd *pndImplementation) AddNetworkElement(name string, opt *tpb.TransportOption, sid uuid.UUID) (uuid.UUID, error) {
labels := prometheus.Labels{"type": opt.Type.String()}
start := metrics.StartHook(labels, networkElementCreationsTotal)
defer metrics.FinishHook(labels, start, networkElementCreationDurationSecondsTotal, networkElementCreationDurationSeconds)
var sbi southbound.SouthboundInterface
var err error
switch t := opt.Type; t {
case spb.Type_TYPE_CONTAINERISED:
return pnd.handleCsbiEnrolment(name, opt)
case spb.Type_TYPE_PLUGIN:
sbi, err = pnd.requestPlugin(name, opt)
if err != nil {
return uuid.Nil, err
}
default:
var err error
sbi, err = pnd.southboundService.Get(store.Query{ID: sid})
if err != nil {
return uuid.Nil, err
}
}
mne, err := NewNetworkElement(name, uuid.Nil, opt, sbi, conflict.Metadata{ResourceVersion: 0})
if err != nil {
return uuid.Nil, err
}
return pnd.addNetworkElement(mne)
}
// TODO: (maba): This should be changed to UUID.
func (pnd *pndImplementation) GetNetworkElement(identifier string) (networkelement.NetworkElement, error) {
id, err := uuid.Parse(identifier)
if err != nil {
id = uuid.Nil
}
mne, err := pnd.networkElementService.Get(store.Query{
ID: id,
Name: identifier,
})
if mne == nil {
return nil, fmt.Errorf("no network element found")
}
if err != nil {
return nil, err
}
return mne, nil
}
// RemoveNetworkElement removes a network element from the PND.
func (pnd *pndImplementation) RemoveNetworkElement(uuid uuid.UUID) error {
return pnd.removeNetworkElement(uuid)
}
// UpdateNetworkElement updates a network element from the PND.
func (pnd *pndImplementation) UpdateNetworkElement(networkElementID uuid.UUID, modelAsString string) error {
err := pnd.networkElementService.UpdateModel(networkElementID, modelAsString)
if err != nil {
return err
}
//TODO: check if it could be worth to provide the method with a network
//element instead of an ID.
err = pnd.ensureIntendedConfigurationIsAppliedOnNetworkElement(networkElementID)
if err != nil {
return err
}
return err
}
// Actual implementation, bind to struct if necessary.
func destroy() error {
return nil
}
// addSbi adds a SBI to the PND's SBI store.
func (pnd *pndImplementation) addSbi(sbi southbound.SouthboundInterface) error {
return pnd.southboundService.Add(sbi)
}
// removeSbi removes an SBI based on the given ID from the PND's SBI store.
func (pnd *pndImplementation) removeSbi(id uuid.UUID) error {
sbi, err := pnd.southboundService.Get(store.Query{ID: id})
if sbi == nil {
return fmt.Errorf("no sbi found")
}
if err != nil {
return err
}
return pnd.southboundService.Delete(sbi)
}
// addNetworkElement adds a network element to the PND's network element store.
func (pnd *pndImplementation) addNetworkElement(mne networkelement.NetworkElement) (uuid.UUID, error) {
err := pnd.networkElementService.Add(mne)
if err != nil {
return uuid.Nil, err
}
if mne.IsTransportValid() {
_, err = pnd.Request(mne.ID(), "/interfaces")
if err != nil {
return uuid.Nil, err
}
}
return mne.ID(), nil
}
func (pnd *pndImplementation) removeNetworkElement(id uuid.UUID) error {
mne, err := pnd.networkElementService.Get(store.Query{
ID: id,
Name: id.String(),
})
if err != nil {
return err
}
if mne == nil {
return fmt.Errorf("no network element found")
}
labels := prometheus.Labels{"type": mne.SBI().Type().String()}
start := metrics.StartHook(labels, networkElementDeletionsTotal)
defer metrics.FinishHook(labels, start, networkElementDeletionDurationSecondsTotal, networkElementDeletionDurationSeconds)
switch mne.(type) {
case *CsbiNetworkElement:
return pnd.handleCsbiDeletion(mne)
default:
return pnd.networkElementService.Delete(mne)
}
}
func (pnd *pndImplementation) MarshalNetworkElement(identifier string) (string, error) {
foundNetworkElement, err := pnd.networkElementService.Get(store.Query{
ID: uuid.MustParse(identifier),
Name: identifier,
})
if err != nil {
return "", err
}
jsonTree, err := json.MarshalIndent(foundNetworkElement.GetModel(), "", "\t")
if err != nil {
return "", err
}
log.WithFields(log.Fields{
"pnd": pnd.Id,
"Identifier": identifier,
"Name": foundNetworkElement.Name,
}).Info("marshalled network element")
return string(jsonTree), nil
}
// Request sends a get request to a specific network element.
// TODO: this method needs some heavy refactoring, especially in regards to the
// UpdateModel call
func (pnd *pndImplementation) Request(uuid uuid.UUID, path string) (proto.Message, error) {
mne, err := pnd.networkElementService.Get(store.Query{
ID: uuid,
Name: uuid.String(),
})
if err != nil {
return nil, err
}
if mne == nil {
return nil, fmt.Errorf("no network element found")
}
ctx := context.Background()
res, err := mne.Transport().Get(ctx, path)
if err != nil {
return nil, err
}
resp, ok := res.(proto.Message)
if !ok {
return nil, &customerrs.InvalidTypeAssertionError{
Value: res,
Type: (*proto.Message)(nil),
}
}
err = mne.ProcessResponse(resp)
if err != nil {
return nil, err
}
modelAsString, err := mne.GetModelAsString()
if err != nil {
return nil, err
}
// TODO(path): We probably have to remove this when we address path request handling.
err = pnd.networkElementService.UpdateModel(uuid, modelAsString)
if err != nil {
return nil, err
}
return resp, nil
}
// RequestAll sends a request for all registered network elements.
func (pnd *pndImplementation) RequestAll(path string) error {
allNetworkElements, err := pnd.networkElementService.GetAllAsLoaded()
if err != nil {
return err
}
for _, mne := range allNetworkElements {
mneUUID, err := uuid.Parse(mne.ID)
if err != nil {
return err
}
_, err = pnd.Request(mneUUID, path)
if err != nil {
return err
}
}
// TODO: (maba): this is not returning any useful information; this should
// return some feedback if the requests were successful
log.WithFields(log.Fields{
"pnd": pnd.Id,
"path": path,
}).Info("sent request to all network elements")
return nil
}
func (pnd *pndImplementation) ensureIntendedConfigurationIsAppliedOnNetworkElement(mneID uuid.UUID) error {
mne, err := pnd.networkElementService.Get(store.Query{
ID: mneID,
})
if err != nil {
return err
}
model, err := mne.GetModelAsString()
if err != nil {
return err
}
req := &gpb.SetRequest{}
path, err := ygot.StringToStructuredPath("/")
if err != nil {
return err
}
req.Update = []*gpb.Update{{
Path: path,
Val: &gpb.TypedValue{
Value: &gpb.TypedValue_JsonIetfVal{JsonIetfVal: []byte(model)},
},
}}
response, err := mne.Transport().CustomSet(context.Background(), req)
if err != nil {
log.Errorf("Failed to apply model of network element err=%+v, response=%+v", err, response)
return err
}
return nil
}
// ChangeMNE creates a change from the provided Operation, path and value.
// The Change is Pending and times out after the specified timeout period.
//
// nolint:gocyclo
func (pnd *pndImplementation) ChangeMNE(duid uuid.UUID, operation ppb.ApiOperation, path string, value ...string) (uuid.UUID, error) {
//TODO: check if we can get cyclomatic complexity from 16 to at least 15
mne, err := pnd.networkElementService.Get(store.Query{
ID: duid,
})
if err != nil {
return uuid.Nil, err
}
validatedCpy, err := mne.CreateModelCopy()
if err != nil {
return uuid.Nil, err
}
p, err := ygot.StringToStructuredPath(path)
if err != nil {
return uuid.Nil, err
}
if operation != ppb.ApiOperation_API_OPERATION_DELETE && len(value) != 1 {
return uuid.Nil, &customerrs.InvalidParametersError{
Func: pnd.ChangeMNE,
Param: value,
}
}
switch operation {
case ppb.ApiOperation_API_OPERATION_UPDATE, ppb.ApiOperation_API_OPERATION_REPLACE:
_, entry, err := ytypes.GetOrCreateNode(mne.SBI().Schema().RootSchema(), validatedCpy, p)
if err != nil {
return uuid.Nil, err
}
if entry.IsDir() {
opts := []ytypes.UnmarshalOpt{&ytypes.IgnoreExtraFields{}}
if err := mne.SBI().Unmarshal([]byte(value[0]), p, validatedCpy, opts...); err != nil {
return uuid.Nil, err
}
} else if entry.IsLeaf() {
typedValue, err := gGnmi.ConvertStringToGnmiTypedValue(value[0], entry.Type)
if err != nil {
return uuid.Nil, err
}
opts := []ytypes.SetNodeOpt{&ytypes.InitMissingElements{}, &ytypes.TolerateJSONInconsistencies{}}
if err := ytypes.SetNode(mne.SBI().Schema().RootSchema(), validatedCpy, p, typedValue, opts...); err != nil {
return uuid.Nil, err
}
}
case ppb.ApiOperation_API_OPERATION_DELETE:
if err := ytypes.DeleteNode(mne.SBI().Schema().RootSchema(), validatedCpy, p); err != nil {
return uuid.Nil, err
}
default:
return uuid.Nil, &customerrs.OperationNotSupportedError{Op: operation}
}
ygot.PruneEmptyBranches(validatedCpy)
callback := func(original ygot.GoStruct, modified ygot.GoStruct) error {
ctx := context.WithValue(context.Background(), types.CtxKeyOperation, operation) // nolint
payload := change.Payload{Original: original, Modified: modified}
pathToSet := path
schema := mne.SBI().Schema()
return mne.Transport().Set(ctx, payload, pathToSet, schema)
}
ch := NewChange(duid, mne.GetModel(), validatedCpy, callback)
if err := pnd.changes.Add(ch); err != nil {
return uuid.Nil, err
}
return ch.cuid, nil
}
func (pnd *pndImplementation) SubscribePath(uuid uuid.UUID, subList *ppb.SubscriptionList) error {
mne, err := pnd.networkElementService.Get(store.Query{
ID: uuid,
})
if err != nil {
return err
}
mode, err := mapModeToAristaFork(subList.GetMode())
if err != nil {
return err
}
for _, sub := range subList.Subscription {
streamMode, err := mapStreamModeToAristaFork(sub.GetStreamMode())
if err != nil {
return err
}
opts := &gnmi.SubscribeOptions{
Mode: mode,
StreamMode: streamMode,
Paths: [][]string{splitStringPath(sub.GetPath())},
SampleInterval: sub.SampleInterval,
}
ctx := context.Background()
ctx = context.WithValue(ctx, types.CtxKeyOpts, opts)
if err = mne.Transport().Subscribe(ctx); err != nil {
return err
}
}
return nil
}
func splitStringPath(s string) []string {
return strings.Split(s, "/")
}
func mapStreamModeToAristaFork(mode ppb.StreamMode) (string, error) {
switch mode {
case ppb.StreamMode_STREAM_MODE_TARGET_DEFINED:
return "target_defined", nil
case ppb.StreamMode_STREAM_MODE_ON_CHANGE:
return "on_change", nil
case ppb.StreamMode_STREAM_MODE_SAMPLE:
return "sample", nil
default:
return "", fmt.Errorf("StreamMode of type: %T is not supported", mode)
}
}
func mapModeToAristaFork(mode ppb.SubscriptionMode) (string, error) {
switch mode {
case ppb.SubscriptionMode_SUBSCRIPTION_MODE_STREAM:
return "stream", nil
case ppb.SubscriptionMode_SUBSCRIPTION_MODE_ONCE:
return "once", nil
case ppb.SubscriptionMode_SUBSCRIPTION_MODE_POLL:
return "poll", nil
default:
return "", fmt.Errorf("SubscriptionMode of type: %T is not supported", mode)
}
}
// nolint
// handleRollbackError will be implemented in the near future
func handleRollbackError(id uuid.UUID, err error) {
log.Error(err)
// TODO: Notion of invalid state needed.
}
func (pnd *pndImplementation) handleCsbiDeletion(mne networkelement.NetworkElement) error {
log.Infof("csbi deletion triggered for %v", mne.ID().String())
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
req := &cpb.DeleteRequest{
Timestamp: time.Now().UnixNano(),
Did: []string{mne.ID().String()},
}
resp, err := pnd.csbiClient.Delete(ctx, req)
if err != nil {
return err
}
err = pnd.southboundService.Delete(mne.SBI())
if err != nil {
return err
}
log.WithFields(log.Fields{
"uuid": mne.ID().String(),
"status": resp.Status,
}).Info("csbi deleted")
return nil
}
func (pnd *pndImplementation) handleCsbiEnrolment(name string, opt *tpb.TransportOption) (uuid.UUID, error) {
g := new(errgroup.Group)
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5)
defer cancel()
req := &cpb.CreateRequest{
Timestamp: time.Now().UnixNano(),
TransportOption: []*tpb.TransportOption{opt},
}
resp, err := pnd.csbiClient.Create(ctx, req)
if err != nil {
return uuid.Nil, err
}
// the slice only contains one deployment
var mneID uuid.UUID
for _, deployment := range resp.Deployments {
dCopy := deployment
g.Go(func() error {
mneID, err = pnd.createCsbiNetworkElement(ctx, name, dCopy, opt)
if err != nil {
return err
}
return nil
})
}
err = g.Wait()
if err != nil {
return uuid.Nil, err
}
return mneID, nil
}
// createCsbiNetworkElement is a helper method for cSBI network element creation. The method
// waits for a SYN (which indicates that the cSBI is running and addressable)
// of the commissioned cSBI and creates the network element within the controller.
func (pnd *pndImplementation) createCsbiNetworkElement(
ctx context.Context,
name string,
deployment *cpb.Deployment,
opt *tpb.TransportOption,
) (uuid.UUID, error) {
id, err := uuid.Parse(deployment.Id)
if err != nil {
return uuid.Nil, err
}
ch := make(chan networkelement.Details, 1)
pnd.callback(id, ch)
defer pnd.callback(id, nil)
defer close(ch)
tickatus := time.NewTicker(time.Minute * 1)
defer tickatus.Stop()
select {
case <-tickatus.C:
log.WithFields(log.Fields{
"id": deployment.Id,
"err": ctx.Err(),
}).Error("csbi handshake timed out")
case mneDetails := <-ch:
log.Infof("syn from csbi %v", mneDetails.ID)
id, err := uuid.Parse(mneDetails.ID)
if err != nil {
return uuid.Nil, err
}
csbiTransportOptions := &tpb.TransportOption{
Address: mneDetails.Address,
Username: opt.Username,
Password: opt.Password,
Tls: opt.Tls,
Type: opt.Type,
TransportOption: opt.TransportOption,
}
log.WithField("transport option", csbiTransportOptions).Debug("gosdn gnmi transport options")
files := []string{util.GoStructName, util.ManifestFileName, util.GoStructAdditionsName}
csbiID := uuid.New()
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*10)
defer cancel()
g := new(errgroup.Group)
for _, f := range files {
req := &cpb.GetPayloadRequest{
Timestamp: time.Now().UnixNano(),
Did: deployment.Id,
File: f,
}
g.Go(func() error {
gClient, err := pnd.csbiClient.GetFile(ctx, req)
if err != nil {
return err
}
err = saveStreamToFile(gClient, req.GetFile(), csbiID)
if err != nil {
return err
}
return nil
})
}
err = g.Wait()
if err != nil {
return uuid.Nil, err
}
csbi, err := NewSBI(spb.Type_TYPE_CONTAINERISED, csbiID)
if err != nil {
return uuid.Nil, err
}
err = pnd.southboundService.Add(csbi)
if err != nil {
return uuid.Nil, err
}
mne, err := NewNetworkElement(name, uuid.Nil, csbiTransportOptions, csbi, conflict.Metadata{ResourceVersion: 0})
if err != nil {
return uuid.Nil, err
}
mne.(*CsbiNetworkElement).UUID = id
ch <- networkelement.Details{TransportOption: opt}
if err := pnd.networkElementService.Add(mne); err != nil {
return uuid.Nil, err
}
return id, nil
}
return uuid.Nil, nil
}
// requestPlugin is a feature for cSBIs and sends a plugin request to the cSBI
// orchestrator and processes the received ygot generated go code, builds the
// plugin and integrates the Plugin within the goSDN as SouthboundInterface.
// The generated code is passed into a gostructs.go file, which is the
// foundation for the created plugin.
func (pnd *pndImplementation) requestPlugin(name string, opt *tpb.TransportOption) (southbound.SouthboundInterface, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*10)
defer cancel()
cReq := &cpb.CreateRequest{
Timestamp: time.Now().UnixNano(),
TransportOption: []*tpb.TransportOption{opt},
}
resp, err := pnd.csbiClient.CreateGoStruct(ctx, cReq)
if err != nil {
return nil, err
}
files := []string{util.GoStructName, util.ManifestFileName, util.GoStructAdditionsName}
g := new(errgroup.Group)
// we only request one plugin
//nolint
for _, dep := range resp.GetDeployments() {
id := uuid.New()
for _, f := range files {
req := &cpb.GetPayloadRequest{
Timestamp: time.Now().UnixNano(),
Did: dep.GetId(),
File: f,
}
g.Go(func() error {
gClient, err := pnd.csbiClient.GetFile(ctx, req)
if err != nil {
return err
}
err = saveStreamToFile(gClient, req.GetFile(), id)
if err != nil {
return err
}
return nil
})
}
err = g.Wait()
if err != nil {
return nil, err
}
sbi, err := NewSBI(spb.Type_TYPE_PLUGIN, id)
if err != nil {
return nil, err
}
err = pnd.southboundService.Add(sbi)
if err != nil {
return nil, err
}
return sbi, nil
}
return nil, fmt.Errorf("requestPlugin: received deployment slice was empty")
}
// StreamClient allows to distinguish between the different ygot
// generated GoStruct clients, which return a stream of bytes.
type StreamClient interface {
Recv() (*cpb.Payload, error)
grpc.ClientStream
}
// saveStreamToFile takes a GenericGoStructClient and processes the included
// gRPC stream. A 'gostructs.go' file is created within the goSDN's
// 'plugin-folder'. Each 'gostructs.go' file is stored in its own folder based
// on a new uuid.UUID.
func saveStreamToFile[T StreamClient](sc T, filename string, id uuid.UUID) (err error) {
folderName := viper.GetString("plugin-folder")
path := filepath.Join(folderName, id.String(), filename)
// clean path to prevent attackers to get access to to directories elsewhere on the system
path = filepath.Clean(path)
if !strings.HasPrefix(path, folderName) {
return &customerrs.InvalidParametersError{
Func: saveStreamToFile[T],
Param: path,
}
}
// create the directory hierarchy based on the path
if err := os.MkdirAll(filepath.Dir(path), 0770); err != nil {
return err
}
// create the gostructs.go file at path
f, err := os.Create(path)
if err != nil {
return err
}
defer func() {
if ferr := f.Close(); ferr != nil {
fErrString := ferr.Error()
err = fmt.Errorf("InternalError=%w error closing file:%+s", err, fErrString)
}
}()
// receive byte stream
for {
payload, err := sc.Recv()
if err != nil {
if errors.Is(err, io.EOF) {
break
}
closeErr := sc.CloseSend()
if closeErr != nil {
return closeErr
}
return err
}
n, err := f.Write(payload.Chunk)
if err != nil {
closeErr := sc.CloseSend()
if closeErr != nil {
return closeErr
}
return err
}
log.WithField("n", n).Trace("wrote bytes")
}
if err := f.Sync(); err != nil {
return err
}
return nil
}
// MarshalBSON implements the MarshalBSON interface to store a network element as BSON.
func (pnd *pndImplementation) MarshalBSON() ([]byte, error) {
return bson.Marshal(&struct {
ID string `bson:"_id"`
Name string `bson:"name"`
Description string `bson:"description"`
}{
ID: pnd.Id.String(),
Name: pnd.Name,
Description: pnd.Description,
})
}