Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
principalNetworkDomain.go 25.45 KiB
package nucleus

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"strings"
	"time"

	"code.fbi.h-da.de/danet/gosdn/controller/conflict"
	"code.fbi.h-da.de/danet/gosdn/controller/metrics"
	"code.fbi.h-da.de/danet/gosdn/controller/nucleus/types"
	"code.fbi.h-da.de/danet/gosdn/controller/nucleus/util"
	"go.mongodb.org/mongo-driver/bson"
	"golang.org/x/sync/errgroup"

	cpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/csbi"
	ppb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/pnd"
	spb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/southbound"
	tpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/transport"
	eventservice "code.fbi.h-da.de/danet/gosdn/controller/eventService"
	"google.golang.org/grpc"
	"google.golang.org/protobuf/proto"

	"code.fbi.h-da.de/danet/gosdn/controller/customerrs"
	"code.fbi.h-da.de/danet/gosdn/controller/interfaces/change"
	eventInterfaces "code.fbi.h-da.de/danet/gosdn/controller/interfaces/event"
	"code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkdomain"
	"code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkelement"
	"code.fbi.h-da.de/danet/gosdn/controller/interfaces/southbound"
	gGnmi "code.fbi.h-da.de/danet/gosdn/controller/nucleus/util/gnmi"

	"code.fbi.h-da.de/danet/gosdn/controller/store"

	"code.fbi.h-da.de/danet/gosdn/forks/goarista/gnmi"
	"github.com/google/uuid"
	gpb "github.com/openconfig/gnmi/proto/gnmi"
	"github.com/openconfig/ygot/ygot"
	"github.com/openconfig/ygot/ytypes"
	"github.com/prometheus/client_golang/prometheus"
	log "github.com/sirupsen/logrus"
	"github.com/spf13/viper"
)

// changeStoreMap holds one in-memory ChangeStore per PND, keyed by the PND's
// UUID.
//
// NOTE: Changes have no database support yet, which is why the change stores
// are kept in memory for now.
var changeStoreMap = map[uuid.UUID]*store.ChangeStore{}

// NewPND creates a Principal Network Domain (PND).
//
// It sets up the SBI and network element stores and services for the given
// id, reuses the in-memory change store registered for that id (creating and
// registering one on first use) and, if the SBI store is empty, adds a default
// OpenConfig SBI.
//
// c is the cSBI client stored on the PND; callback is stored on the PND and
// invoked during cSBI enrolment with the deployment ID and the channel the
// handshake details are expected on.
//
// An error is returned if the event service cannot be created, the existing
// SBIs cannot be read, or the default SBI cannot be created or added.
func NewPND(
	name string,
	description string,
	id uuid.UUID,
	c cpb.CsbiServiceClient,
	callback func(uuid.UUID, chan networkelement.Details),
) (networkdomain.NetworkDomain, error) {
	eventService, err := eventservice.NewEventService()
	if err != nil {
		return nil, err
	}

	sbiStore := NewSbiStore(id)
	networkElementStore := NewNetworkElementStore(id)

	sbiService := NewSbiService(sbiStore, eventService)
	networkElementService := NewNetworkElementService(
		networkElementStore,
		sbiService,
		eventService,
	)

	changeStore, ok := changeStoreMap[id]
	if !ok {
		changeStore = store.NewChangeStore()
		changeStoreMap[id] = changeStore
	}

	pnd := &pndImplementation{
		Name:                  name,
		Description:           description,
		southboundService:     sbiService,
		networkElementService: networkElementService,
		changes:               changeStore,
		Id:                    id,

		csbiClient:   c,
		callback:     callback,
		eventService: eventService,
	}

	existingSBIs, err := sbiStore.GetAll()
	if err != nil {
		return nil, err
	}

	if len(existingSBIs) == 0 {
		// Do not swallow the constructor's error: adding a nil/invalid SBI
		// would only fail later and further away from the cause.
		newSBI, err := NewSBI(spb.Type_TYPE_OPENCONFIG)
		if err != nil {
			return nil, err
		}
		if err := pnd.southboundService.Add(newSBI); err != nil {
			return nil, err
		}
	}

	return pnd, nil
}

// pndImplementation is the concrete type NewPND returns as a
// networkdomain.NetworkDomain. It bundles the PND's metadata with the services
// and clients its methods delegate to.
type pndImplementation struct {
	// Name is the PND's name, exposed through GetName.
	Name                  string `json:"name,omitempty"`
	// Description is the PND's description, exposed through GetDescription.
	Description           string `json:"description,omitempty"`
	// southboundService gives access to the SBIs registered with this PND.
	southboundService     southbound.Service
	// networkElementService gives access to the network elements of this PND.
	networkElementService networkelement.Service
	// changes is the change store shared via changeStoreMap for this PND's ID.
	changes               *store.ChangeStore
	// Id is the PND's UUID, exposed through ID.
	//nolint
	Id uuid.UUID `json:"id,omitempty"`

	// csbiClient is used for cSBI creation/deletion and for fetching
	// generated files.
	csbiClient   cpb.CsbiServiceClient
	// callback is called with a deployment ID and a channel before waiting for
	// a cSBI handshake, and with a nil channel afterwards.
	callback     func(uuid.UUID, chan networkelement.Details)
	// eventService is the event service created in NewPND and shared with the
	// SBI and network element services.
	eventService eventInterfaces.Service
}

// PendingChanges returns the UUIDs the PND's change store reports as pending.
func (pnd *pndImplementation) PendingChanges() []uuid.UUID {
	pending := pnd.changes.Pending()
	return pending
}

// CommittedChanges returns the UUIDs the PND's change store reports as
// committed.
func (pnd *pndImplementation) CommittedChanges() []uuid.UUID {
	committed := pnd.changes.Committed()
	return committed
}

// ConfirmedChanges returns the UUIDs the PND's change store reports as
// confirmed.
func (pnd *pndImplementation) ConfirmedChanges() []uuid.UUID {
	confirmed := pnd.changes.Confirmed()
	return confirmed
}

// GetChange looks up the change with the given UUID in the PND's change store
// and returns whatever the store returns, including its error.
func (pnd *pndImplementation) GetChange(cuid uuid.UUID) (change.Change, error) {
	return pnd.changes.GetChange(cuid)
}
// Commit fetches the change identified by u from the PND's change store and
// commits it. A failed lookup is returned without calling Commit.
func (pnd *pndImplementation) Commit(u uuid.UUID) error {
	toCommit, lookupErr := pnd.changes.GetChange(u)
	if lookupErr != nil {
		return lookupErr
	}

	return toCommit.Commit()
}

// Confirm fetches the change identified by u from the PND's change store and
// confirms it. A failed lookup is returned without calling Confirm.
func (pnd *pndImplementation) Confirm(u uuid.UUID) error {
	toConfirm, lookupErr := pnd.changes.GetChange(u)
	if lookupErr != nil {
		return lookupErr
	}

	return toConfirm.Confirm()
}

// ID returns the UUID of the PND.
func (pnd *pndImplementation) ID() uuid.UUID {
	id := pnd.Id
	return id
}

// NetworkElements returns all network elements provided by the PND's network
// element service.
//
// The method has no error return, so a failing lookup is logged instead of
// being dropped silently; the (possibly nil) result of the service is returned
// as is.
func (pnd *pndImplementation) NetworkElements() []networkelement.NetworkElement {
	allNetworkElements, err := pnd.networkElementService.GetAll()
	if err != nil {
		log.WithFields(log.Fields{
			"pnd": pnd.Id,
			"err": err,
		}).Error("could not retrieve network elements")
	}

	return allNetworkElements
}

// FlattenedNetworkElements returns all network elements of the PND in their
// loaded (networkelement.LoadedNetworkElement) representation.
//
// The method has no error return, so a failing lookup is logged instead of
// being dropped silently; the (possibly nil) result of the service is returned
// as is.
func (pnd *pndImplementation) FlattenedNetworkElements() []networkelement.LoadedNetworkElement {
	allNetworkElements, err := pnd.networkElementService.GetAllAsLoaded()
	if err != nil {
		log.WithFields(log.Fields{
			"pnd": pnd.Id,
			"err": err,
		}).Error("could not retrieve loaded network elements")
	}

	return allNetworkElements
}

// GetName reports the name this PND was created with.
func (pnd *pndImplementation) GetName() string {
	name := pnd.Name
	return name
}

// GetDescription reports the description stored on this PND.
func (pnd *pndImplementation) GetDescription() string {
	description := pnd.Description
	return description
}

// GetSBIs returns every SBI known to the PND's southbound service. On failure
// the result is nil and the service's error is passed through.
func (pnd *pndImplementation) GetSBIs() ([]southbound.SouthboundInterface, error) {
	registered, err := pnd.southboundService.GetAll()
	if err != nil {
		return nil, err
	}

	return registered, nil
}

// GetSBI returns the single SBI registered under sbiUUID. On failure the
// result is nil and the southbound service's error is passed through.
func (pnd *pndImplementation) GetSBI(sbiUUID uuid.UUID) (southbound.SouthboundInterface, error) {
	query := store.Query{ID: sbiUUID}

	sbi, err := pnd.southboundService.Get(query)
	if err != nil {
		return nil, err
	}

	return sbi, nil
}

// Destroy tears down the PND by delegating to the package-level destroy
// function and returns its result.
func (pnd *pndImplementation) Destroy() error {
	err := destroy()
	return err
}

// AddSbi registers the given SBI with the PND so it can be used by network
// elements; it is the exported entry point for addSbi.
func (pnd *pndImplementation) AddSbi(s southbound.SouthboundInterface) error {
	err := pnd.addSbi(s)
	return err
}
// RemoveSbi removes the SBI with the given ID from the PND. Before the SBI
// itself is removed, every network element that references it is removed as
// well.
//
// The work happens in two passes: first all loaded network elements are
// scanned for a matching SBI ID (an unparsable SBI ID aborts before anything
// is removed), then the matches are removed one by one.
func (pnd *pndImplementation) RemoveSbi(sid uuid.UUID) error {
	loaded, err := pnd.networkElementService.GetAllAsLoaded()
	if err != nil {
		return err
	}

	// first pass: collect the network elements that use the SBI
	var usingSbi []networkelement.LoadedNetworkElement
	for _, candidate := range loaded {
		candidateSbiID, parseErr := uuid.Parse(candidate.SBI)
		if parseErr != nil {
			return parseErr
		}
		if candidateSbiID != sid {
			continue
		}
		usingSbi = append(usingSbi, candidate)
	}

	// second pass: remove the collected network elements
	for _, element := range usingSbi {
		elementID, parseErr := uuid.Parse(element.ID)
		if parseErr != nil {
			return parseErr
		}
		if removeErr := pnd.removeNetworkElement(elementID); removeErr != nil {
			return removeErr
		}
	}

	return pnd.removeSbi(sid)
}

// AddNetworkElement adds a new network element to the PND and returns its ID.
//
// How the SBI for the element is obtained depends on opt.Type:
//   - TYPE_CONTAINERISED: the whole enrolment is delegated to
//     handleCsbiEnrolment and its result is returned directly.
//   - TYPE_PLUGIN: a plugin SBI is requested via requestPlugin.
//   - anything else: the SBI registered under sid is looked up.
//
// A nil opt is rejected with an InvalidParametersError. Creation count and
// duration are reported through the metrics hooks.
func (pnd *pndImplementation) AddNetworkElement(name string, opt *tpb.TransportOption, sid uuid.UUID) (uuid.UUID, error) {
	// opt is dereferenced immediately for the metric labels, so a nil value
	// has to be rejected up front instead of causing a panic.
	if opt == nil {
		return uuid.Nil, &customerrs.InvalidParametersError{
			Func:  pnd.AddNetworkElement,
			Param: opt,
		}
	}

	labels := prometheus.Labels{"type": opt.Type.String()}
	start := metrics.StartHook(labels, networkElementCreationsTotal)
	defer metrics.FinishHook(labels, start, networkElementCreationDurationSecondsTotal, networkElementCreationDurationSeconds)

	var sbi southbound.SouthboundInterface
	var err error

	switch t := opt.Type; t {
	case spb.Type_TYPE_CONTAINERISED:
		return pnd.handleCsbiEnrolment(name, opt)
	case spb.Type_TYPE_PLUGIN:
		sbi, err = pnd.requestPlugin(name, opt)
	default:
		sbi, err = pnd.southboundService.Get(store.Query{ID: sid})
	}
	if err != nil {
		return uuid.Nil, err
	}

	mne, err := NewNetworkElement(name, uuid.Nil, opt, sbi, conflict.Metadata{ResourceVersion: 0})
	if err != nil {
		return uuid.Nil, err
	}

	return pnd.addNetworkElement(mne)
}

// GetNetworkElement returns the network element matching identifier, which
// may be either a UUID string or a name: the lookup query carries the parsed
// UUID (uuid.Nil if identifier is not a UUID) as well as the raw identifier
// as name.
//
// Errors from the network element service are returned unchanged; if the
// service reports no error but also no element, a "no network element found"
// error is returned.
//
// TODO: (maba): This should be changed to UUID.
func (pnd *pndImplementation) GetNetworkElement(identifier string) (networkelement.NetworkElement, error) {
	id, err := uuid.Parse(identifier)
	if err != nil {
		id = uuid.Nil
	}

	mne, err := pnd.networkElementService.Get(store.Query{
		ID:   id,
		Name: identifier,
	})
	// Check the error first so the store's actual failure reason is not hidden
	// behind the generic not-found message.
	if err != nil {
		return nil, err
	}
	if mne == nil {
		return nil, fmt.Errorf("no network element found")
	}

	return mne, nil
}

// RemoveNetworkElement removes the network element with the given ID from the
// PND; it is the exported entry point for removeNetworkElement.
func (pnd *pndImplementation) RemoveNetworkElement(uuid uuid.UUID) error {
	err := pnd.removeNetworkElement(uuid)
	return err
}

// UpdateNetworkElement stores modelAsString as the new model of the network
// element identified by networkElementID and afterwards pushes the stored
// model to the network element itself. The first error encountered is
// returned.
func (pnd *pndImplementation) UpdateNetworkElement(networkElementID uuid.UUID, modelAsString string) error {
	if err := pnd.networkElementService.UpdateModel(networkElementID, modelAsString); err != nil {
		return err
	}

	//TODO: check if it could be worth to provide the method with a network
	//element instead of an ID.
	return pnd.ensureIntendedConfigurationIsAppliedOnNetworkElement(networkElementID)
}

// destroy is the actual implementation behind Destroy. It currently does
// nothing and always reports success; bind it to the struct if necessary.
func destroy() error {
	var err error
	return err
}

// addSbi hands the SBI to the PND's southbound service for storage and
// returns the service's result.
func (pnd *pndImplementation) addSbi(sbi southbound.SouthboundInterface) error {
	err := pnd.southboundService.Add(sbi)
	return err
}

// removeSbi removes the SBI with the given ID from the PND's SBI store.
//
// Errors from the lookup are returned unchanged; if the lookup reports no
// error but also no SBI, a "no sbi found" error is returned.
func (pnd *pndImplementation) removeSbi(id uuid.UUID) error {
	sbi, err := pnd.southboundService.Get(store.Query{ID: id})
	// Check the error first so the store's actual failure reason is not hidden
	// behind the generic not-found message.
	if err != nil {
		return err
	}
	if sbi == nil {
		return fmt.Errorf("no sbi found")
	}

	return pnd.southboundService.Delete(sbi)
}

// addNetworkElement stores mne through the network element service and
// returns its ID. If the element reports a valid transport, "/interfaces" is
// requested from it right away; a failing request turns into an error even
// though the element has already been stored.
func (pnd *pndImplementation) addNetworkElement(mne networkelement.NetworkElement) (uuid.UUID, error) {
	if err := pnd.networkElementService.Add(mne); err != nil {
		return uuid.Nil, err
	}

	if !mne.IsTransportValid() {
		return mne.ID(), nil
	}

	if _, err := pnd.Request(mne.ID(), "/interfaces"); err != nil {
		return uuid.Nil, err
	}

	return mne.ID(), nil
}

// removeNetworkElement looks up the network element with the given ID and
// removes it. Elements of type *CsbiNetworkElement are handed to
// handleCsbiDeletion, everything else is deleted through the network element
// service. Deletion count and duration are reported through the metrics
// hooks.
func (pnd *pndImplementation) removeNetworkElement(id uuid.UUID) error {
	query := store.Query{
		ID:   id,
		Name: id.String(),
	}

	mne, err := pnd.networkElementService.Get(query)
	if err != nil {
		return err
	}
	if mne == nil {
		return fmt.Errorf("no network element found")
	}

	labels := prometheus.Labels{"type": mne.SBI().Type().String()}
	start := metrics.StartHook(labels, networkElementDeletionsTotal)
	defer metrics.FinishHook(labels, start, networkElementDeletionDurationSecondsTotal, networkElementDeletionDurationSeconds)

	if _, isCsbi := mne.(*CsbiNetworkElement); isCsbi {
		return pnd.handleCsbiDeletion(mne)
	}

	return pnd.networkElementService.Delete(mne)
}

// MarshalNetworkElement returns the model of the network element matching
// identifier as tab-indented JSON.
//
// identifier may be a UUID string or a name: the lookup query carries the
// parsed UUID (uuid.Nil if identifier is not a UUID) as well as the raw
// identifier as name. Lookup and marshalling errors are returned unchanged; a
// lookup without error and without result yields a "no network element found"
// error.
func (pnd *pndImplementation) MarshalNetworkElement(identifier string) (string, error) {
	// uuid.MustParse would panic for identifiers that are names, so parse
	// leniently and let the query fall back to the name.
	id, err := uuid.Parse(identifier)
	if err != nil {
		id = uuid.Nil
	}

	foundNetworkElement, err := pnd.networkElementService.Get(store.Query{
		ID:   id,
		Name: identifier,
	})
	if err != nil {
		return "", err
	}
	if foundNetworkElement == nil {
		return "", fmt.Errorf("no network element found")
	}

	jsonTree, err := json.MarshalIndent(foundNetworkElement.GetModel(), "", "\t")
	if err != nil {
		return "", err
	}
	log.WithFields(log.Fields{
		"pnd":        pnd.Id,
		"Identifier": identifier,
		// Call the accessor; logging the bare method value would print a
		// function reference instead of the name.
		"Name": foundNetworkElement.Name(),
	}).Info("marshalled network element")

	return string(jsonTree), nil
}

// Request sends a get request for path to the network element identified by
// uuid. The response is processed by the network element, the element's model
// is then written back through the network element service, and the response
// itself is returned.
//
// An InvalidTypeAssertionError is returned if the transport's answer is not a
// proto.Message.
//
// TODO: this method needs some heavy refactoring, especially in regards to the
// UpdateModel call
func (pnd *pndImplementation) Request(uuid uuid.UUID, path string) (proto.Message, error) {
	query := store.Query{
		ID:   uuid,
		Name: uuid.String(),
	}

	mne, err := pnd.networkElementService.Get(query)
	if err != nil {
		return nil, err
	}
	if mne == nil {
		return nil, fmt.Errorf("no network element found")
	}

	raw, err := mne.Transport().Get(context.Background(), path)
	if err != nil {
		return nil, err
	}

	resp, isMessage := raw.(proto.Message)
	if !isMessage {
		return nil, &customerrs.InvalidTypeAssertionError{
			Value: raw,
			Type:  (*proto.Message)(nil),
		}
	}

	if err := mne.ProcessResponse(resp); err != nil {
		return nil, err
	}

	updatedModel, err := mne.GetModelAsString()
	if err != nil {
		return nil, err
	}

	// TODO(path): We probably have to remove this when we address path request handling.
	if err := pnd.networkElementService.UpdateModel(uuid, updatedModel); err != nil {
		return nil, err
	}

	return resp, nil
}

// RequestAll sends a get request for path to every network element registered
// with the PND, one after another. It stops at and returns the first error,
// so elements after a failing one are not contacted.
func (pnd *pndImplementation) RequestAll(path string) error {
	loaded, err := pnd.networkElementService.GetAllAsLoaded()
	if err != nil {
		return err
	}

	for _, element := range loaded {
		elementID, parseErr := uuid.Parse(element.ID)
		if parseErr != nil {
			return parseErr
		}
		if _, reqErr := pnd.Request(elementID, path); reqErr != nil {
			return reqErr
		}
	}

	// TODO: (maba): this is not returning any useful information; this should
	// return some feedback if the requests were successful
	fields := log.Fields{
		"pnd":  pnd.Id,
		"path": path,
	}
	log.WithFields(fields).Info("sent request to all network elements")

	return nil
}

// ensureIntendedConfigurationIsAppliedOnNetworkElement pushes the stored model
// of the network element identified by mneID to the element itself. The model
// is sent as a single gNMI update on the root path ("/") with a JSON IETF
// value, using the transport's CustomSet. A failing set is logged and
// returned.
func (pnd *pndImplementation) ensureIntendedConfigurationIsAppliedOnNetworkElement(mneID uuid.UUID) error {
	mne, err := pnd.networkElementService.Get(store.Query{ID: mneID})
	if err != nil {
		return err
	}

	intendedModel, err := mne.GetModelAsString()
	if err != nil {
		return err
	}

	rootPath, err := ygot.StringToStructuredPath("/")
	if err != nil {
		return err
	}

	setRequest := &gpb.SetRequest{
		Update: []*gpb.Update{{
			Path: rootPath,
			Val: &gpb.TypedValue{
				Value: &gpb.TypedValue_JsonIetfVal{JsonIetfVal: []byte(intendedModel)},
			},
		}},
	}

	response, err := mne.Transport().CustomSet(context.Background(), setRequest)
	if err != nil {
		log.Errorf("Failed to apply model of network element err=%+v, response=%+v", err, response)
		return err
	}

	return nil
}

// ChangeMNE creates a change for the network element identified by duid from
// the provided operation, path and value, adds it to the PND's change store
// and returns the change's ID.
//
// The operation is applied to a copy of the element's model; the element
// itself is only contacted later, through the callback handed to NewChange.
// UPDATE and REPLACE require exactly one value, DELETE ignores value. Any
// other operation results in an OperationNotSupportedError.
//
// NOTE(review): the previous comment stated that the change is pending and
// times out after a timeout period; that behaviour is not visible in this
// method and presumably lives in NewChange / the change store — confirm.
//
// nolint:gocyclo
func (pnd *pndImplementation) ChangeMNE(duid uuid.UUID, operation ppb.ApiOperation, path string, value ...string) (uuid.UUID, error) {
	//TODO: check if we can get cyclomatic complexity from 16 to at least 15
	mne, err := pnd.networkElementService.Get(store.Query{
		ID: duid,
	})
	if err != nil {
		return uuid.Nil, err
	}

	// All modifications below are applied to this copy, not to the model held
	// by the network element.
	validatedCpy, err := mne.CreateModelCopy()
	if err != nil {
		return uuid.Nil, err
	}

	p, err := ygot.StringToStructuredPath(path)
	if err != nil {
		return uuid.Nil, err
	}

	// Every operation except DELETE needs exactly one value.
	if operation != ppb.ApiOperation_API_OPERATION_DELETE && len(value) != 1 {
		return uuid.Nil, &customerrs.InvalidParametersError{
			Func:  pnd.ChangeMNE,
			Param: value,
		}
	}
	switch operation {
	case ppb.ApiOperation_API_OPERATION_UPDATE, ppb.ApiOperation_API_OPERATION_REPLACE:
		// Resolve (or create) the target node to learn from its schema entry
		// whether the path points at a directory or at a leaf.
		_, entry, err := ytypes.GetOrCreateNode(mne.SBI().Schema().RootSchema(), validatedCpy, p)
		if err != nil {
			return uuid.Nil, err
		}

		if entry.IsDir() {
			// Directory: unmarshal the value into the copy via the SBI,
			// ignoring extra fields.
			opts := []ytypes.UnmarshalOpt{&ytypes.IgnoreExtraFields{}}
			if err := mne.SBI().Unmarshal([]byte(value[0]), p, validatedCpy, opts...); err != nil {
				return uuid.Nil, err
			}
		} else if entry.IsLeaf() {
			// Leaf: convert the string according to the leaf's type and set
			// the node directly.
			typedValue, err := gGnmi.ConvertStringToGnmiTypedValue(value[0], entry.Type)
			if err != nil {
				return uuid.Nil, err
			}
			opts := []ytypes.SetNodeOpt{&ytypes.InitMissingElements{}, &ytypes.TolerateJSONInconsistencies{}}
			if err := ytypes.SetNode(mne.SBI().Schema().RootSchema(), validatedCpy, p, typedValue, opts...); err != nil {
				return uuid.Nil, err
			}
		}
		// NOTE(review): an entry that is neither dir nor leaf leaves the copy
		// unchanged without an error — confirm this is intended.
	case ppb.ApiOperation_API_OPERATION_DELETE:
		if err := ytypes.DeleteNode(mne.SBI().Schema().RootSchema(), validatedCpy, p); err != nil {
			return uuid.Nil, err
		}
	default:
		return uuid.Nil, &customerrs.OperationNotSupportedError{Op: operation}
	}

	ygot.PruneEmptyBranches(validatedCpy)
	// callback performs the actual Set on the network element's transport; the
	// operation is passed along in the context under types.CtxKeyOperation.
	callback := func(original ygot.GoStruct, modified ygot.GoStruct) error {
		ctx := context.WithValue(context.Background(), types.CtxKeyOperation, operation) // nolint
		payload := change.Payload{Original: original, Modified: modified}
		pathToSet := path
		schema := mne.SBI().Schema()
		return mne.Transport().Set(ctx, payload, pathToSet, schema)
	}

	// The change pairs the element's current model with the modified copy.
	ch := NewChange(duid, mne.GetModel(), validatedCpy, callback)

	if err := pnd.changes.Add(ch); err != nil {
		return uuid.Nil, err
	}

	return ch.cuid, nil
}

// SubscribePath subscribes to every path in subList on the network element
// identified by uuid. Each entry of the list results in its own Subscribe
// call on the element's transport; the subscribe options (mode, stream mode,
// split path and sample interval) are passed in the context under
// types.CtxKeyOpts. The first error aborts the remaining subscriptions.
func (pnd *pndImplementation) SubscribePath(uuid uuid.UUID, subList *ppb.SubscriptionList) error {
	mne, err := pnd.networkElementService.Get(store.Query{ID: uuid})
	if err != nil {
		return err
	}

	subscriptionMode, err := mapModeToAristaFork(subList.GetMode())
	if err != nil {
		return err
	}

	for _, subscription := range subList.Subscription {
		streamMode, modeErr := mapStreamModeToAristaFork(subscription.GetStreamMode())
		if modeErr != nil {
			return modeErr
		}

		subscribeOpts := &gnmi.SubscribeOptions{
			Mode:           subscriptionMode,
			StreamMode:     streamMode,
			Paths:          [][]string{splitStringPath(subscription.GetPath())},
			SampleInterval: subscription.SampleInterval,
		}
		ctx := context.WithValue(context.Background(), types.CtxKeyOpts, subscribeOpts)

		if subErr := mne.Transport().Subscribe(ctx); subErr != nil {
			return subErr
		}
	}

	return nil
}

// splitStringPath cuts s at every "/" and returns all resulting pieces,
// including empty ones (e.g. the piece before a leading "/").
func splitStringPath(s string) []string {
	const separator = "/"

	// A negative count places no limit on the number of pieces.
	return strings.SplitN(s, separator, -1)
}

// mapStreamModeToAristaFork translates a ppb.StreamMode into the string used
// for the StreamMode field of the goarista fork's subscribe options.
// Modes other than TARGET_DEFINED, ON_CHANGE and SAMPLE yield an error that
// names the rejected mode.
func mapStreamModeToAristaFork(mode ppb.StreamMode) (string, error) {
	switch mode {
	case ppb.StreamMode_STREAM_MODE_TARGET_DEFINED:
		return "target_defined", nil
	case ppb.StreamMode_STREAM_MODE_ON_CHANGE:
		return "on_change", nil
	case ppb.StreamMode_STREAM_MODE_SAMPLE:
		return "sample", nil
	default:
		// %v prints the offending mode; %T would only ever print the Go type
		// name, which is the same for every value.
		return "", fmt.Errorf("StreamMode of type: %v is not supported", mode)
	}
}

// mapModeToAristaFork translates a ppb.SubscriptionMode into the string used
// for the Mode field of the goarista fork's subscribe options.
// Modes other than STREAM, ONCE and POLL yield an error that names the
// rejected mode.
func mapModeToAristaFork(mode ppb.SubscriptionMode) (string, error) {
	switch mode {
	case ppb.SubscriptionMode_SUBSCRIPTION_MODE_STREAM:
		return "stream", nil
	case ppb.SubscriptionMode_SUBSCRIPTION_MODE_ONCE:
		return "once", nil
	case ppb.SubscriptionMode_SUBSCRIPTION_MODE_POLL:
		return "poll", nil
	default:
		// %v prints the offending mode; %T would only ever print the Go type
		// name, which is the same for every value.
		return "", fmt.Errorf("SubscriptionMode of type: %v is not supported", mode)
	}
}

// nolint
// handleRollbackError is a placeholder that will be implemented in the near
// future. For now it only logs err; id is not used yet.
func handleRollbackError(id uuid.UUID, err error) {
	// TODO: Notion of invalid state needed.
	log.Error(err)
}

// handleCsbiDeletion removes a cSBI-backed network element.
//
// It asks the cSBI client to delete the deployment identified by the
// element's ID (with a one minute timeout), then deletes the network element
// from the network element service and finally deletes the element's SBI from
// the southbound service. The first error aborts the remaining steps.
func (pnd *pndImplementation) handleCsbiDeletion(mne networkelement.NetworkElement) error {
	log.Infof("csbi deletion triggered for %v", mne.ID().String())
	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()
	req := &cpb.DeleteRequest{
		Timestamp: time.Now().UnixNano(),
		Did:       []string{mne.ID().String()},
	}
	resp, err := pnd.csbiClient.Delete(ctx, req)
	if err != nil {
		return err
	}
	// removeNetworkElement delegates the complete removal of cSBI elements to
	// this method, so the element itself has to be deleted here as well;
	// otherwise it would stay in the store with its SBI gone.
	if err := pnd.networkElementService.Delete(mne); err != nil {
		return err
	}
	if err := pnd.southboundService.Delete(mne.SBI()); err != nil {
		return err
	}
	log.WithFields(log.Fields{
		"uuid":   mne.ID().String(),
		"status": resp.Status,
	}).Info("csbi deleted")
	return nil
}

// handleCsbiEnrolment requests a cSBI deployment for opt from the cSBI client
// and creates a network element for every deployment in the response (the
// response is expected to contain exactly one). The whole enrolment is bound
// to a five minute context.
//
// It returns the ID of the network element created for the first deployment,
// or uuid.Nil without error if the response contained no deployment.
func (pnd *pndImplementation) handleCsbiEnrolment(name string, opt *tpb.TransportOption) (uuid.UUID, error) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5)
	defer cancel()
	req := &cpb.CreateRequest{
		Timestamp:       time.Now().UnixNano(),
		TransportOption: []*tpb.TransportOption{opt},
	}
	resp, err := pnd.csbiClient.Create(ctx, req)
	if err != nil {
		return uuid.Nil, err
	}

	// the slice only contains one deployment
	deployments := resp.Deployments
	// Every goroutine writes to its own slot and keeps its error local, so
	// nothing is shared between goroutines even if more than one deployment
	// is returned.
	mneIDs := make([]uuid.UUID, len(deployments))
	g := new(errgroup.Group)
	for i, deployment := range deployments {
		i, deployment := i, deployment
		g.Go(func() error {
			mneID, err := pnd.createCsbiNetworkElement(ctx, name, deployment, opt)
			if err != nil {
				return err
			}
			mneIDs[i] = mneID
			return nil
		})
	}
	if err := g.Wait(); err != nil {
		return uuid.Nil, err
	}

	if len(mneIDs) == 0 {
		return uuid.Nil, nil
	}

	return mneIDs[0], nil
}

// createCsbiNetworkElement is a helper method for cSBI network element
// creation. The method waits for a SYN (which indicates that the cSBI is
// running and addressable) of the commissioned cSBI and creates the network
// element within the controller.
//
// The steps are:
//  1. register a channel for the deployment via the PND's callback and wait up
//     to one minute for the handshake details (or until ctx is done),
//  2. download the generated files of the deployment into a folder named
//     after a fresh SBI ID,
//  3. create and register a containerised SBI with that ID,
//  4. create the network element, give it the ID announced in the handshake
//     and add it to the network element service.
//
// It returns the ID of the created network element. A handshake that does not
// arrive in time results in an error.
func (pnd *pndImplementation) createCsbiNetworkElement(
	ctx context.Context,
	name string,
	deployment *cpb.Deployment,
	opt *tpb.TransportOption,
) (uuid.UUID, error) {
	id, err := uuid.Parse(deployment.Id)
	if err != nil {
		return uuid.Nil, err
	}
	ch := make(chan networkelement.Details, 1)
	pnd.callback(id, ch)
	// Deferred calls run in reverse order: the channel is closed first, then
	// the callback is told (via nil) that nobody is waiting anymore.
	defer pnd.callback(id, nil)
	defer close(ch)

	// A single timeout is needed, so a timer is used instead of a ticker.
	handshakeTimeout := time.NewTimer(time.Minute)
	defer handshakeTimeout.Stop()

	var mneDetails networkelement.Details
	select {
	case <-ctx.Done():
		return uuid.Nil, ctx.Err()
	case <-handshakeTimeout.C:
		log.WithFields(log.Fields{
			"id":  deployment.Id,
			"err": ctx.Err(),
		}).Error("csbi handshake timed out")
		// Report the timeout; returning a nil error here would make callers
		// treat uuid.Nil as the ID of a successfully created element.
		return uuid.Nil, fmt.Errorf("csbi handshake timed out for deployment %s", deployment.Id)
	case mneDetails = <-ch:
	}

	log.Infof("syn from csbi %v", mneDetails.ID)
	mneID, err := uuid.Parse(mneDetails.ID)
	if err != nil {
		return uuid.Nil, err
	}
	// Same options as requested, but with the address announced by the cSBI.
	csbiTransportOptions := &tpb.TransportOption{
		Address:         mneDetails.Address,
		Username:        opt.Username,
		Password:        opt.Password,
		Tls:             opt.Tls,
		Type:            opt.Type,
		TransportOption: opt.TransportOption,
	}
	log.WithField("transport option", csbiTransportOptions).Debug("gosdn gnmi transport options")

	files := []string{util.GoStructName, util.ManifestFileName, util.GoStructAdditionsName}
	csbiID := uuid.New()
	// The downloads get their own ten minute budget, independent of ctx.
	downloadCtx, cancel := context.WithTimeout(context.Background(), time.Minute*10)
	defer cancel()

	g := new(errgroup.Group)
	for _, f := range files {
		req := &cpb.GetPayloadRequest{
			Timestamp: time.Now().UnixNano(),
			Did:       deployment.Id,
			File:      f,
		}
		g.Go(func() error {
			gClient, err := pnd.csbiClient.GetFile(downloadCtx, req)
			if err != nil {
				return err
			}
			return saveStreamToFile(gClient, req.GetFile(), csbiID)
		})
	}
	if err := g.Wait(); err != nil {
		return uuid.Nil, err
	}

	csbi, err := NewSBI(spb.Type_TYPE_CONTAINERISED, csbiID)
	if err != nil {
		return uuid.Nil, err
	}
	if err := pnd.southboundService.Add(csbi); err != nil {
		return uuid.Nil, err
	}
	mne, err := NewNetworkElement(name, uuid.Nil, csbiTransportOptions, csbi, conflict.Metadata{ResourceVersion: 0})
	if err != nil {
		return uuid.Nil, err
	}
	// Use a checked assertion so an unexpected element type is reported as an
	// error instead of a panic.
	csbiMne, ok := mne.(*CsbiNetworkElement)
	if !ok {
		return uuid.Nil, &customerrs.InvalidTypeAssertionError{
			Value: mne,
			Type:  (*CsbiNetworkElement)(nil),
		}
	}
	csbiMne.UUID = mneID
	// The channel is buffered (size 1) and was just drained, so this send
	// does not block.
	ch <- networkelement.Details{TransportOption: opt}
	if err := pnd.networkElementService.Add(mne); err != nil {
		return uuid.Nil, err
	}

	return mneID, nil
}

// requestPlugin is a feature for cSBIs: it sends a plugin request to the cSBI
// orchestrator, downloads the files generated for the first returned
// deployment into a folder named after a fresh UUID, and registers a plugin
// SBI with that UUID at the southbound service. The registered SBI is
// returned.
//
// All remote calls share a ten minute context. An empty deployment list in
// the response is an error.
func (pnd *pndImplementation) requestPlugin(name string, opt *tpb.TransportOption) (southbound.SouthboundInterface, error) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Minute*10)
	defer cancel()

	resp, err := pnd.csbiClient.CreateGoStruct(ctx, &cpb.CreateRequest{
		Timestamp:       time.Now().UnixNano(),
		TransportOption: []*tpb.TransportOption{opt},
	})
	if err != nil {
		return nil, err
	}

	deployments := resp.GetDeployments()
	if len(deployments) == 0 {
		return nil, fmt.Errorf("requestPlugin: received deployment slice was empty")
	}
	// we only request one plugin
	dep := deployments[0]
	pluginID := uuid.New()

	// fetch all generated files concurrently
	downloads := new(errgroup.Group)
	for _, file := range []string{util.GoStructName, util.ManifestFileName, util.GoStructAdditionsName} {
		payloadReq := &cpb.GetPayloadRequest{
			Timestamp: time.Now().UnixNano(),
			Did:       dep.GetId(),
			File:      file,
		}

		downloads.Go(func() error {
			stream, err := pnd.csbiClient.GetFile(ctx, payloadReq)
			if err != nil {
				return err
			}
			return saveStreamToFile(stream, payloadReq.GetFile(), pluginID)
		})
	}
	if err := downloads.Wait(); err != nil {
		return nil, err
	}

	pluginSbi, err := NewSBI(spb.Type_TYPE_PLUGIN, pluginID)
	if err != nil {
		return nil, err
	}
	if err := pnd.southboundService.Add(pluginSbi); err != nil {
		return nil, err
	}

	return pluginSbi, nil
}

// StreamClient allows to distinguish between the different ygot
// generated GoStruct clients, which return a stream of bytes.
// It is the type constraint of saveStreamToFile.
type StreamClient interface {
	// Recv returns the next payload of the stream; saveStreamToFile treats
	// io.EOF as the regular end of the stream.
	Recv() (*cpb.Payload, error)
	grpc.ClientStream
}

// saveStreamToFile takes a StreamClient and writes the chunks received from
// its gRPC stream to the file <plugin-folder>/<id>/<filename>, where
// plugin-folder is read from the viper configuration. Missing directories are
// created.
//
// The target path has to stay inside the plugin folder, otherwise an
// InvalidParametersError is returned. If receiving or writing fails, the send
// direction of the stream is closed and the failure is returned. An error
// while closing the file is reported as well.
func saveStreamToFile[T StreamClient](sc T, filename string, id uuid.UUID) (err error) {
	// Clean the configured root too: filepath.Join cleans its result, so a
	// root like "./plugins" or "plugins/" would otherwise never be a prefix of
	// the joined path.
	root := filepath.Clean(viper.GetString("plugin-folder"))
	path := filepath.Join(root, id.String(), filename)

	// Prevent attackers from reaching directories elsewhere on the system.
	// Comparing the relative path instead of a raw string prefix also rejects
	// sibling folders that merely share the prefix (e.g. "plugins-other").
	rel, relErr := filepath.Rel(root, path)
	if relErr != nil || rel == ".." || strings.HasPrefix(rel, ".."+string(filepath.Separator)) {
		return &customerrs.InvalidParametersError{
			Func:  saveStreamToFile[T],
			Param: path,
		}
	}

	// create the directory hierarchy based on the path
	if err := os.MkdirAll(filepath.Dir(path), 0770); err != nil {
		return err
	}
	// create the file at path
	f, err := os.Create(path)
	if err != nil {
		return err
	}
	defer func() {
		ferr := f.Close()
		if ferr == nil {
			return
		}
		if err == nil {
			// Nothing else failed; wrapping a nil error with %w would
			// produce a malformed message.
			err = fmt.Errorf("error closing file: %w", ferr)
			return
		}
		err = fmt.Errorf("InternalError=%w error closing file:%+s", err, ferr.Error())
	}()

	// abort closes the send direction of the stream and returns cause; a
	// failing CloseSend is appended instead of replacing the original failure.
	abort := func(cause error) error {
		if closeErr := sc.CloseSend(); closeErr != nil {
			return fmt.Errorf("%w (additionally failed to close stream: %v)", cause, closeErr)
		}
		return cause
	}

	// receive byte stream
	for {
		payload, recvErr := sc.Recv()
		if recvErr != nil {
			if errors.Is(recvErr, io.EOF) {
				break
			}
			return abort(recvErr)
		}
		n, writeErr := f.Write(payload.Chunk)
		if writeErr != nil {
			return abort(writeErr)
		}
		log.WithField("n", n).Trace("wrote bytes")
	}

	return f.Sync()
}

// MarshalBSON implements the MarshalBSON interface to store a PND as BSON.
// Only the ID (as string, under "_id"), the name and the description are part
// of the document.
func (pnd *pndImplementation) MarshalBSON() ([]byte, error) {
	type pndDocument struct {
		ID          string `bson:"_id"`
		Name        string `bson:"name"`
		Description string `bson:"description"`
	}

	doc := pndDocument{
		ID:          pnd.Id.String(),
		Name:        pnd.Name,
		Description: pnd.Description,
	}

	return bson.Marshal(&doc)
}