Skip to content
Snippets Groups Projects
principalNetworkDomain.go 22.9 KiB
Newer Older
  • Learn to ignore specific revisions
  • package nucleus
    
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    import (
    
    Andre Sterba's avatar
    Andre Sterba committed
    	"context"
    
    Malte Bauch's avatar
    Malte Bauch committed
    	"io"
    	"os"
    
    	"time"
    
    	"code.fbi.h-da.de/danet/gosdn/controller/conflict"
    
    	"code.fbi.h-da.de/danet/gosdn/controller/metrics"
    	"code.fbi.h-da.de/danet/gosdn/controller/nucleus/types"
    
    	"code.fbi.h-da.de/danet/gosdn/controller/nucleus/util"
    
    	"go.mongodb.org/mongo-driver/bson"
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    
    
    	cpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/csbi"
    
    	rpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/plugin-registry"
    
    	ppb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/pnd"
    	tpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/transport"
    
    	eventservice "code.fbi.h-da.de/danet/gosdn/controller/eventService"
    
    	"google.golang.org/protobuf/proto"
    
    	"code.fbi.h-da.de/danet/gosdn/controller/customerrs"
    
    	"code.fbi.h-da.de/danet/gosdn/controller/interfaces/change"
    
    	eventInterfaces "code.fbi.h-da.de/danet/gosdn/controller/interfaces/event"
    
    	"code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkdomain"
    
    	"code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkelement"
    
    	"code.fbi.h-da.de/danet/gosdn/controller/interfaces/plugin"
    
    	"code.fbi.h-da.de/danet/gosdn/controller/store"
    
    	"code.fbi.h-da.de/danet/gosdn/forks/goarista/gnmi"
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    	"github.com/google/uuid"
    
    	gpb "github.com/openconfig/gnmi/proto/gnmi"
    
    	"github.com/openconfig/ygot/ygot"
    
    Malte Bauch's avatar
    Malte Bauch committed
    	"github.com/prometheus/client_golang/prometheus"
    
    	log "github.com/sirupsen/logrus"
    
    // NOTE: Until we've added database support for changes, we will hold
    // changeStores in memory for now.
    var changeStoreMap = make(map[uuid.UUID]*store.ChangeStore)
    
    
    // NewPND creates a Principal Network Domain.
    
    func NewPND(
    	name string,
    	description string,
    	id uuid.UUID,
    	c cpb.CsbiServiceClient,
    
    	p rpb.PluginRegistryServiceClient,
    
    	ps plugin.Service,
    
    	callback func(uuid.UUID, chan networkelement.Details),
    
    ) (networkdomain.NetworkDomain, error) {
    
    	eventService, err := eventservice.NewEventService()
    	if err != nil {
    		return nil, err
    	}
    
    	networkElementStore := NewNetworkElementStore(id)
    	networkElementService := NewNetworkElementService(
    		networkElementStore,
    
    	changeStore, ok := changeStoreMap[id]
    	if !ok {
    		changeStore = store.NewChangeStore()
    		changeStoreMap[id] = changeStore
    	}
    
    
    	pnd := &pndImplementation{
    
    		Name:                  name,
    		Description:           description,
    
    		pluginService:         ps,
    
    		networkElementService: networkElementService,
    		changes:               changeStore,
    		Id:                    id,
    
    		csbiClient:           c,
    		pluginRegistryClient: p,
    		callback:             callback,
    		eventService:         eventService,
    
    type pndImplementation struct {
    
    	Name                  string `json:"name,omitempty"`
    	Description           string `json:"description,omitempty"`
    
    	networkElementService networkelement.Service
    	changes               *store.ChangeStore
    
    	//nolint
    	Id uuid.UUID `json:"id,omitempty"`
    
    	csbiClient           cpb.CsbiServiceClient
    	pluginRegistryClient rpb.PluginRegistryServiceClient
    
    	callback             func(uuid.UUID, chan networkelement.Details)
    
    	eventService         eventInterfaces.Service
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    func (pnd *pndImplementation) PendingChanges() []uuid.UUID {
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    	return pnd.changes.Pending()
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    }
    
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    func (pnd *pndImplementation) CommittedChanges() []uuid.UUID {
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    	return pnd.changes.Committed()
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    }
    
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    // ConfirmedChanges returns the UUIDs that the PND's change store reports as
    // confirmed.
    func (pnd *pndImplementation) ConfirmedChanges() []uuid.UUID {
    	confirmed := pnd.changes.Confirmed()
    	return confirmed
    }
    
    func (pnd *pndImplementation) GetChange(cuid uuid.UUID) (change.Change, error) {
    	return pnd.changes.GetChange(cuid)
    
    func (pnd *pndImplementation) Commit(u uuid.UUID) error {
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    	ch, err := pnd.changes.GetChange(u)
    
    	if err != nil {
    		return err
    	}
    
    	if err := ch.Commit(); err != nil {
    		return err
    	}
    
    	// Set all the changes within the network elements model
    	networkElement, err := pnd.networkElementService.Get(store.Query{ID: ch.AssociatedDeviceID()})
    	if err != nil {
    		return err
    	}
    
    	diff, err := networkElement.GetPlugin().Diff(ch.PreviousState(), ch.IntendedState())
    
    	if err != nil {
    		return err
    	}
    	for _, update := range diff.GetUpdate() {
    
    		if err := networkElement.GetPlugin().SetNode(update.GetPath(), update.GetVal()); err != nil {
    
    			return err
    		}
    	}
    	for _, deletePath := range diff.GetDelete() {
    
    		if err := networkElement.GetPlugin().DeleteNode(deletePath); err != nil {
    
    			return err
    		}
    	}
    
    	// update the network element
    	return pnd.networkElementService.Update(networkElement)
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    }
    
    
    func (pnd *pndImplementation) Confirm(u uuid.UUID) error {
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    	ch, err := pnd.changes.GetChange(u)
    
    	if err != nil {
    		return err
    	}
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    	return ch.Confirm()
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    }
    
    
    func (pnd *pndImplementation) ID() uuid.UUID {
    
    func (pnd *pndImplementation) NetworkElements() []networkelement.NetworkElement {
    	allNetworkElements, _ := pnd.networkElementService.GetAll()
    
    func (pnd *pndImplementation) FlattenedNetworkElements() []networkelement.LoadedNetworkElement {
    	allNetworkElements, _ := pnd.networkElementService.GetAllAsLoaded()
    
    // GetName returns the name of the PND.
    
    Malte Bauch's avatar
    Malte Bauch committed
    func (pnd *pndImplementation) GetName() string {
    
    // GetDescription returns the current description of the PND.
    
    func (pnd *pndImplementation) GetDescription() string {
    
    // AddNetworkElement adds a new network element to the PND. The UUID for the networkElementID is optional and should normally be empty.
    
    func (pnd *pndImplementation) AddNetworkElement(name string, opt *tpb.TransportOption, requestPluginFunc func(uuid.UUID) (plugin.Plugin, error), pluginID uuid.UUID, optionalNetworkElementID ...uuid.UUID) (uuid.UUID, error) {
    
    	networkElementID := uuid.Nil
    	if len(optionalNetworkElementID) > 0 {
    		networkElementID = optionalNetworkElementID[0]
    	}
    
    
    	if requestPluginFunc == nil {
    		requestPluginFunc = pnd.requestPlugin
    	}
    
    
    Malte Bauch's avatar
    Malte Bauch committed
    	labels := prometheus.Labels{"type": opt.Type.String()}
    
    	start := metrics.StartHook(labels, networkElementCreationsTotal)
    	defer metrics.FinishHook(labels, start, networkElementCreationDurationSecondsTotal, networkElementCreationDurationSeconds)
    
    	// TODO: SouthboundService and SouthboundStorage have to be changed to a
    	// PluginService and PluginStore.
    	// NOTE: currently this is commented out; since for testing purposes a
    	// basic example plugin is returned.
    	//plugin, err := pnd.southboundService.Get(store.Query{ID: pluginID})
    	//if err != nil {
    	//	switch err.(type) {
    	//	case *customerrs.CouldNotFindError:
    	//		plugin, err = pnd.requestPlugin()
    	//		if err != nil {
    	//			return uuid.Nil, err
    	//		}
    	//	default:
    	//		return uuid.Nil, err
    	//	}
    	//}
    
    
    	plugin, err := requestPluginFunc(pluginID)
    
    	if err != nil {
    		return uuid.Nil, err
    	}
    
    	mne, err := NewNetworkElement(name, networkElementID, opt, plugin, conflict.Metadata{ResourceVersion: 0})
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    	if err != nil {
    
    	return pnd.addNetworkElement(mne)
    
    // TODO: (maba): This should be changed to UUID.
    
    func (pnd *pndImplementation) GetNetworkElement(identifier string) (networkelement.NetworkElement, error) {
    
    	mne, err := pnd.networkElementService.Get(store.Query{
    
    	if mne == nil {
    		return nil, fmt.Errorf("no network element found")
    
    	if err != nil {
    		return nil, err
    	}
    
    // RemoveNetworkElement removes a network element from the PND.
    func (pnd *pndImplementation) RemoveNetworkElement(uuid uuid.UUID) error {
    	return pnd.removeNetworkElement(uuid)
    
    Malte Bauch's avatar
    Malte Bauch committed
    // UpdateNetworkElement updates a network element from the PND.
    func (pnd *pndImplementation) UpdateNetworkElement(networkElementID uuid.UUID, modelAsString string) error {
    	err := pnd.networkElementService.UpdateModel(networkElementID, modelAsString)
    
    Malte Bauch's avatar
    Malte Bauch committed
    	//TODO: check if it could be worth to provide the method with a network
    	//element instead of an ID.
    	err = pnd.ensureIntendedConfigurationIsAppliedOnNetworkElement(networkElementID)
    
    // addNetworkElement adds a network element to the PND's network element store.
    func (pnd *pndImplementation) addNetworkElement(mne networkelement.NetworkElement) (uuid.UUID, error) {
    	err := pnd.networkElementService.Add(mne)
    
    		_, err = pnd.Request(mne.ID(), "/")
    
    		if err != nil {
    			return uuid.Nil, err
    		}
    	}
    
    
    func (pnd *pndImplementation) removeNetworkElement(id uuid.UUID) error {
    	mne, err := pnd.networkElementService.Get(store.Query{
    
    	if err != nil {
    		return err
    	}
    
    	if mne == nil {
    		return fmt.Errorf("no network element found")
    
    	labels := prometheus.Labels{"type": mne.Transport().Type()}
    
    	start := metrics.StartHook(labels, networkElementDeletionsTotal)
    	defer metrics.FinishHook(labels, start, networkElementDeletionDurationSecondsTotal, networkElementDeletionDurationSeconds)
    	switch mne.(type) {
    	case *CsbiNetworkElement:
    		return pnd.handleCsbiDeletion(mne)
    
    	default:
    
    		return pnd.networkElementService.Delete(mne)
    
    func (pnd *pndImplementation) MarshalNetworkElement(identifier string) (string, error) {
    	foundNetworkElement, err := pnd.networkElementService.Get(store.Query{
    
    		ID:   uuid.MustParse(identifier),
    		Name: identifier,
    	})
    
    	if err != nil {
    		return "", err
    	}
    
    	model, err := foundNetworkElement.GetModel()
    	if err != nil {
    		return "", err
    	}
    
    
    	log.WithFields(log.Fields{
    
    		"Identifier": identifier,
    
    		"Name":       foundNetworkElement.Name,
    	}).Info("marshalled network element")
    
    	return string(model), nil
    
    // Request sends a get request to a specific network element.
    
    Malte Bauch's avatar
    Malte Bauch committed
    // TODO: this method needs some heavy refactoring, especially in regards to the
    // UpdateModel call.
    
    func (pnd *pndImplementation) Request(uuid uuid.UUID, path string) (proto.Message, error) {
    
    	mne, err := pnd.networkElementService.Get(store.Query{
    
    	if mne == nil {
    		return nil, fmt.Errorf("no network element found")
    
    	ctx := context.Background()
    
    	res, err := mne.Transport().Get(ctx, path)
    
    	if err != nil {
    
    	resp, ok := res.(proto.Message)
    	if !ok {
    
    			Value: res,
    			Type:  (*proto.Message)(nil),
    		}
    
    	err = mne.ProcessResponse(resp)
    
    	if err != nil {
    
    Malte Bauch's avatar
    Malte Bauch committed
    	modelAsString, err := mne.GetModelAsString()
    	if err != nil {
    		return nil, err
    	}
    
    	// TODO(path): We probably have to remove this when we address path request handling.
    	err = pnd.networkElementService.UpdateModel(uuid, modelAsString)
    
    Andre Sterba's avatar
    Andre Sterba committed
    	if err != nil {
    		return nil, err
    	}
    
    
    	return resp, nil
    
    // RequestAll sends a request for all registered network elements.
    
    func (pnd *pndImplementation) RequestAll(path string) error {
    
    	allNetworkElements, err := pnd.networkElementService.GetAllAsLoaded()
    
    	for _, mne := range allNetworkElements {
    		mneUUID, err := uuid.Parse(mne.ID)
    
    		_, err = pnd.Request(mneUUID, path)
    
    	// TODO: (maba): this is not returning any useful information; this should
    
    	// return some feedback if the requests were successful
    
    	log.WithFields(log.Fields{
    
    		"path": path,
    
    	}).Info("sent request to all network elements")
    
    func (pnd *pndImplementation) ensureIntendedConfigurationIsAppliedOnNetworkElement(mneID uuid.UUID) error {
    	mne, err := pnd.networkElementService.Get(store.Query{
    		ID: mneID,
    
    	model, err := mne.GetModelAsFilteredCopy()
    	if err != nil {
    		return err
    	}
    
    	modelAsString, err := ygot.EmitJSON(model, &ygot.EmitJSONConfig{
    		Format:         ygot.RFC7951,
    		Indent:         "",
    		SkipValidation: true,
    		RFC7951Config: &ygot.RFC7951JSONConfig{
    			AppendModuleName: true,
    		},
    	})
    
    	if err != nil {
    		return err
    	}
    
    	req := &gpb.SetRequest{}
    	path, err := ygot.StringToStructuredPath("/")
    	if err != nil {
    		return err
    	}
    
    	req.Update = []*gpb.Update{{
    		Path: path,
    		Val: &gpb.TypedValue{
    
    			Value: &gpb.TypedValue_JsonIetfVal{JsonIetfVal: []byte(modelAsString)},
    
    	response, err := mne.Transport().CustomSet(context.Background(), req)
    
    		log.Errorf("Failed to apply model of network element err=%+v, response=%+v", err, response)
    
    // ChangeMNE creates a change from the provided Operation, path and value.
    
    // The Change is Pending and times out after the specified timeout period.
    
    func (pnd *pndImplementation) ChangeMNE(duid uuid.UUID, operation ppb.ApiOperation, path string, value ...string) (uuid.UUID, error) {
    
    	mne, err := pnd.networkElementService.Get(store.Query{
    		ID: duid,
    	})
    	if err != nil {
    		return uuid.Nil, err
    	}
    
    	p, err := ygot.StringToStructuredPath(path)
    	if err != nil {
    		return uuid.Nil, err
    	}
    
    	plugin := mne.GetPlugin()
    
    	marshaledModel, err := plugin.ValidateChange(operation, p, []byte(value[0]))
    	if err != nil {
    		return uuid.Nil, err
    	}
    
    
    	if operation != ppb.ApiOperation_API_OPERATION_DELETE && len(value) != 1 {
    		return uuid.Nil, &customerrs.InvalidParametersError{
    			Func:  pnd.ChangeMNE,
    			Param: value,
    		}
    	}
    
    	callback := func(original, modified []byte) error {
    		ctx := context.WithValue(context.Background(), types.CtxKeyOperation, operation) // nolint
    		payload := change.Payload{Original: original, Modified: modified}
    		pathToSet := path
    		if err := mne.Transport().Set(ctx, payload, pathToSet, plugin); err != nil {
    			return err
    		}
    		return pnd.networkElementService.Update(mne)
    	}
    
    
    Malte Bauch's avatar
    Malte Bauch committed
    	//TODO: provide a filtered model
    
    	currentModel, err := mne.GetModel()
    	if err != nil {
    		return uuid.Nil, err
    	}
    
    	ch := NewChange(duid, currentModel, marshaledModel, callback)
    
    	if err := pnd.changes.Add(ch); err != nil {
    		return uuid.Nil, err
    	}
    
    	return ch.cuid, nil
    
    // SubscribePath subscribes to every subscription in subList on the network
    // element identified by uuid. The subscription mode and each stream mode are
    // mapped to the string representation used by the goarista gnmi fork; the
    // resulting options are passed to the transport through the context under
    // types.CtxKeyOpts. The first error encountered aborts the remaining
    // subscriptions and is returned.
    func (pnd *pndImplementation) SubscribePath(uuid uuid.UUID, subList *ppb.SubscriptionList) error {
    	mne, err := pnd.networkElementService.Get(store.Query{ID: uuid})
    	if err != nil {
    		return err
    	}

    	subscriptionMode, err := mapModeToAristaFork(subList.GetMode())
    	if err != nil {
    		return err
    	}

    	for _, subscription := range subList.Subscription {
    		streamMode, modeErr := mapStreamModeToAristaFork(subscription.GetStreamMode())
    		if modeErr != nil {
    			return modeErr
    		}

    		// One context per subscription, carrying that subscription's options.
    		subscribeCtx := context.WithValue(context.Background(), types.CtxKeyOpts, &gnmi.SubscribeOptions{
    			Mode:           subscriptionMode,
    			StreamMode:     streamMode,
    			Paths:          [][]string{splitStringPath(subscription.GetPath())},
    			SampleInterval: subscription.SampleInterval,
    		})

    		if subscribeErr := mne.Transport().Subscribe(subscribeCtx); subscribeErr != nil {
    			return subscribeErr
    		}
    	}

    	return nil
    }
    
    // splitStringPath cuts s at every "/" and returns the resulting segments.
    // A leading "/" therefore yields an empty first segment, and an empty s yields
    // a single empty segment.
    func splitStringPath(s string) []string {
    	const separator = "/"
    	segments := strings.Split(s, separator)
    	return segments
    }
    
    // mapStreamModeToAristaFork translates a ppb.StreamMode into the string
    // representation used by the goarista gnmi fork ("target_defined",
    // "on_change" or "sample"). Any other mode results in an error naming the
    // unsupported mode.
    func mapStreamModeToAristaFork(mode ppb.StreamMode) (string, error) {
    	switch mode {
    	case ppb.StreamMode_STREAM_MODE_TARGET_DEFINED:
    		return "target_defined", nil
    	case ppb.StreamMode_STREAM_MODE_ON_CHANGE:
    		return "on_change", nil
    	case ppb.StreamMode_STREAM_MODE_SAMPLE:
    		return "sample", nil
    	default:
    		// %v prints the offending mode; %T would only ever print the enum's
    		// type name, which is the same for every value.
    		return "", fmt.Errorf("StreamMode of type: %v is not supported", mode)
    	}
    }
    
    // mapModeToAristaFork translates a ppb.SubscriptionMode into the string
    // representation used by the goarista gnmi fork ("stream", "once" or "poll").
    // Any other mode results in an error naming the unsupported mode.
    func mapModeToAristaFork(mode ppb.SubscriptionMode) (string, error) {
    	switch mode {
    	case ppb.SubscriptionMode_SUBSCRIPTION_MODE_STREAM:
    		return "stream", nil
    	case ppb.SubscriptionMode_SUBSCRIPTION_MODE_ONCE:
    		return "once", nil
    	case ppb.SubscriptionMode_SUBSCRIPTION_MODE_POLL:
    		return "poll", nil
    	default:
    		// %v prints the offending mode; %T would only ever print the enum's
    		// type name, which is the same for every value.
    		return "", fmt.Errorf("SubscriptionMode of type: %v is not supported", mode)
    	}
    }
    
    
    Andre Sterba's avatar
    Andre Sterba committed
    // nolint
    // handleRollbackError is a placeholder for rollback error handling. For now it
    // only logs err; id is accepted but not used yet.
    func handleRollbackError(id uuid.UUID, err error) {
    	// TODO: Notion of invalid state needed.
    	log.Error(err)
    }
    
    // handleCsbiDeletion asks the cSBI service to delete the deployment belonging
    // to mne. The delete request is bounded by a one minute timeout. The status
    // reported by the service is logged; a failed request is returned as error.
    func (pnd *pndImplementation) handleCsbiDeletion(mne networkelement.NetworkElement) error {
    	log.Infof("csbi deletion triggered for %v", mne.ID().String())

    	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
    	defer cancel()

    	resp, err := pnd.csbiClient.Delete(ctx, &cpb.DeleteRequest{
    		Timestamp: time.Now().UnixNano(),
    		Did:       []string{mne.ID().String()},
    	})
    	if err != nil {
    		return err
    	}

    	// Removal of the matching southbound interface is currently disabled:
    	//err = pnd.southboundService.Delete(mne.SBI())
    	//if err != nil {
    	//	return err
    	//}

    	fields := log.Fields{"status": resp.Status}
    	log.WithFields(fields).Info("csbi deleted")

    	return nil
    }
    
    
    Malte Bauch's avatar
    Malte Bauch committed
    //TODO: can probably be removed
    //func (pnd *pndImplementation) handleCsbiEnrolment(name string, opt *tpb.TransportOption) (uuid.UUID, error) {
    //	g := new(errgroup.Group)
    //	ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5)
    //	defer cancel()
    //	req := &cpb.CreateRequest{
    //		Timestamp:       time.Now().UnixNano(),
    //		TransportOption: []*tpb.TransportOption{opt},
    //	}
    //	resp, err := pnd.csbiClient.Create(ctx, req)
    //	if err != nil {
    //		return uuid.Nil, err
    //	}
    //	// the slice only contains one deployment
    //	var mneID uuid.UUID
    //	for _, deployment := range resp.Deployments {
    //		dCopy := deployment
    //		g.Go(func() error {
    //			mneID, err = pnd.createCsbiNetworkElement(ctx, name, dCopy, opt)
    //			if err != nil {
    //				return err
    //			}
    //			return nil
    //		})
    //	}
    //	err = g.Wait()
    //	if err != nil {
    //		return uuid.Nil, err
    //	}
    //
    //	return mneID, nil
    //}
    
    //TODO: can probably be removed
    
    // createCsbiNetworkElement is a helper method for cSBI network element creation. The method
    
    // waits for a SYN (which indicates that the cSBI is running and addressable)
    
    // of the commissioned cSBI and creates the network element within the controller.
    
    Malte Bauch's avatar
    Malte Bauch committed
    //func (pnd *pndImplementation) createCsbiNetworkElement(
    //	ctx context.Context,
    //	name string,
    //	deployment *cpb.Deployment,
    //	opt *tpb.TransportOption,
    //) (uuid.UUID, error) {
    //	//id, err := uuid.Parse(deployment.Id)
    //	//if err != nil {
    //	//	return uuid.Nil, err
    //	//}
    //	//ch := make(chan networkelement.Details, 1)
    //	//pnd.callback(id, ch)
    //	//defer pnd.callback(id, nil)
    //	//defer close(ch)
    //	//tickatus := time.NewTicker(time.Minute * 1)
    //	//defer tickatus.Stop()
    //	//select {
    //	//case <-tickatus.C:
    //	//	log.WithFields(log.Fields{
    //	//		"id":  deployment.Id,
    //	//		"err": ctx.Err(),
    //	//	}).Error("csbi handshake timed out")
    //	//case mneDetails := <-ch:
    //	//	log.Infof("syn from csbi %v", mneDetails.ID)
    //	//	id, err := uuid.Parse(mneDetails.ID)
    //	//	if err != nil {
    //	//		return uuid.Nil, err
    //	//	}
    //	//	csbiTransportOptions := &tpb.TransportOption{
    //	//		Address:         mneDetails.Address,
    //	//		Username:        opt.Username,
    //	//		Password:        opt.Password,
    //	//		Tls:             opt.Tls,
    //	//		Type:            opt.Type,
    //	//		TransportOption: opt.TransportOption,
    //	//	}
    //	//	log.WithField("transport option", csbiTransportOptions).Debug("gosdn gnmi transport options")
    //	//
    //	//	files := []string{util.GoStructName, util.ManifestFileName, util.GoStructAdditionsName}
    //	//	csbiID := uuid.New()
    //	//	ctx, cancel := context.WithTimeout(context.Background(), time.Minute*10)
    //	//	defer cancel()
    //	//
    //	//	g := new(errgroup.Group)
    //	//	for _, f := range files {
    //	//		req := &cpb.GetPayloadRequest{
    //	//			Timestamp: time.Now().UnixNano(),
    //	//			Did:       deployment.Id,
    //	//			File:      f,
    //	//		}
    //	//		g.Go(func() error {
    //	//			gClient, err := pnd.csbiClient.GetFile(ctx, req)
    //	//			if err != nil {
    //	//				return err
    //	//			}
    //	//			err = saveStreamToFile(gClient, req.GetFile(), csbiID)
    //	//			if err != nil {
    //	//				return err
    //	//			}
    //	//			return nil
    //	//		})
    //	//	}
    //	//
    //	//	err = g.Wait()
    //	//	if err != nil {
    //	//		return uuid.Nil, err
    //	//	}
    //	//
    //	//	csbi, err := NewSBI(spb.Type_TYPE_CONTAINERISED, csbiID)
    //	//	if err != nil {
    //	//		return uuid.Nil, err
    //	//	}
    //	//	err = pnd.southboundService.Add(csbi)
    //	//	if err != nil {
    //	//		return uuid.Nil, err
    //	//	}
    //	//	mne, err := NewNetworkElement(name, uuid.Nil, csbiTransportOptions, csbi, conflict.Metadata{ResourceVersion: 0})
    //	//	if err != nil {
    //	//		return uuid.Nil, err
    //	//	}
    //	//	mne.(*CsbiNetworkElement).UUID = id
    //	//	ch <- networkelement.Details{TransportOption: opt}
    //	//	if err := pnd.networkElementService.Add(mne); err != nil {
    //	//		return uuid.Nil, err
    //	//	}
    //	//	return id, nil
    //	//}
    //	return uuid.Nil, nil
    //}
    
    // TODO: update
    // requestPlugin requests a plugin from the plugin-registry.
    
    func (pnd *pndImplementation) requestPlugin(requestID uuid.UUID) (plugin.Plugin, error) {
    
    	ctx, cancel := context.WithTimeout(context.Background(), time.Minute*1)
    	defer cancel()
    
    	pluginDownloadRequest := &rpb.GetDownloadRequest{
    		Timestamp: time.Now().UnixNano(),
    		Id:        requestID.String(),
    	}
    
    	dClient, err := pnd.pluginRegistryClient.Download(ctx, pluginDownloadRequest)
    	if err != nil {
    		return nil, err
    	}
    
    	if err := saveStreamToFile(dClient, util.BundledPluginName, requestID); err != nil {
    		return nil, err
    	}
    
    	if err := util.UnzipPlugin(requestID); err != nil {
    		return nil, err
    	}
    
    
    	plugin, err := NewPlugin(requestID)
    	if err != nil {
    		return nil, err
    	}
    
    	if err := pnd.pluginService.Add(plugin); err != nil {
    		return nil, err
    	}
    
    // StreamClient allows to distinguish between the different ygot
    
    // generated GoStruct clients, which return a stream of bytes.
    
    type StreamClient interface {
    
    	Recv() (*rpb.GetDownloadPayload, error)
    
    // saveStreamToFile takes a StreamClient and processes the included gRPC
    // stream. A file with the provided filename is created within the goSDN's
    // 'plugin-folder'. Each file is stored in its own folder based on a new
    // uuid.UUID.
    
    func saveStreamToFile(sc StreamClient, filename string, id uuid.UUID) (err error) {
    
    	folderName := viper.GetString("plugin-folder")
    	path := filepath.Join(folderName, id.String(), filename)
    
    
    	// clean the path to prevent attackers from gaining access to directories elsewhere on the system
    	path = filepath.Clean(path)
    	if !strings.HasPrefix(path, folderName) {
    
    			Func:  saveStreamToFile,
    
    	// create the directory hierarchy based on the path
    	if err := os.MkdirAll(filepath.Dir(path), 0770); err != nil {
    
    	}
    	// create the gostructs.go file at path
    	f, err := os.Create(path)
    	if err != nil {
    
    		if ferr := f.Close(); ferr != nil {
    			fErrString := ferr.Error()
    			err = fmt.Errorf("InternalError=%w error closing file:%+s", err, fErrString)
    
    Malte Bauch's avatar
    Malte Bauch committed
    	for {
    
    		payload, err := sc.Recv()
    
    Malte Bauch's avatar
    Malte Bauch committed
    		if err != nil {
    
    Malte Bauch's avatar
    Malte Bauch committed
    				break
    			}
    
    			closeErr := sc.CloseSend()
    
    			if closeErr != nil {
    
    Malte Bauch's avatar
    Malte Bauch committed
    		}
    		n, err := f.Write(payload.Chunk)
    		if err != nil {
    
    			closeErr := sc.CloseSend()
    
    			if closeErr != nil {
    
    Malte Bauch's avatar
    Malte Bauch committed
    		}
    		log.WithField("n", n).Trace("wrote bytes")
    	}
    	if err := f.Sync(); err != nil {
    
    Malte Bauch's avatar
    Malte Bauch committed
    	}
    
    
    Malte Bauch's avatar
    Malte Bauch committed
    }
    
    
    // MarshalBSON implements the MarshalBSON interface to store a PND as BSON.
    
    func (pnd *pndImplementation) MarshalBSON() ([]byte, error) {
    	return bson.Marshal(&struct {
    		ID          string `bson:"_id"`
    		Name        string `bson:"name"`
    		Description string `bson:"description"`
    	}{
    		ID:          pnd.Id.String(),
    		Name:        pnd.Name,
    		Description: pnd.Description,
    	})