Newer
Older
Fabian Seidl
committed
"errors"
"fmt"
"path/filepath"
"code.fbi.h-da.de/danet/gosdn/controller/conflict"
"code.fbi.h-da.de/danet/gosdn/controller/metrics"
"code.fbi.h-da.de/danet/gosdn/controller/nucleus/types"
"code.fbi.h-da.de/danet/gosdn/controller/nucleus/util"
"go.mongodb.org/mongo-driver/bson"
"golang.org/x/sync/errgroup"
cpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/csbi"
ppb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/pnd"
spb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/southbound"
tpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/transport"
Fabian Seidl
committed
eventservice "code.fbi.h-da.de/danet/gosdn/controller/eventService"
"google.golang.org/grpc"
Fabian Seidl
committed
"code.fbi.h-da.de/danet/gosdn/controller/customerrs"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/change"
Fabian Seidl
committed
eventInterfaces "code.fbi.h-da.de/danet/gosdn/controller/interfaces/event"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkdomain"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkelement"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/southbound"
Malte Bauch
committed
gGnmi "code.fbi.h-da.de/danet/gosdn/controller/nucleus/util/gnmi"
"code.fbi.h-da.de/danet/gosdn/controller/store"
"code.fbi.h-da.de/danet/gosdn/forks/goarista/gnmi"
gpb "github.com/openconfig/gnmi/proto/gnmi"
"github.com/openconfig/ygot/ygot"
"github.com/openconfig/ygot/ytypes"
"github.com/spf13/viper"
// changeStoreMap holds one ChangeStore per PND, keyed by the PND's ID.
// NOTE: there is no database support for changes yet, so for now the
// change stores only live in memory.
var changeStoreMap = map[uuid.UUID]*store.ChangeStore{}
// NewPND creates a Principal Network Domain.
func NewPND(
name string,
description string,
id uuid.UUID,
c cpb.CsbiServiceClient,
callback func(uuid.UUID, chan networkelement.Details),
) (networkdomain.NetworkDomain, error) {
Fabian Seidl
committed
eventService, err := eventservice.NewEventService()
if err != nil {
return nil, err
}
sbiStore := NewSbiStore(id)
networkElementStore := NewNetworkElementStore(id)
Fabian Seidl
committed
sbiService := NewSbiService(sbiStore, eventService)
networkElementService := NewNetworkElementService(
networkElementStore,
Fabian Seidl
committed
eventService,
changeStore, ok := changeStoreMap[id]
if !ok {
changeStore = store.NewChangeStore()
changeStoreMap[id] = changeStore
}
Name: name,
Description: description,
southboundService: sbiService,
networkElementService: networkElementService,
changes: changeStore,
Id: id,
Fabian Seidl
committed
csbiClient: c,
callback: callback,
eventService: eventService,
existingSBIs, err := sbiStore.GetAll()
if err != nil {
if len(existingSBIs) == 0 {
newSBI, _ := NewSBI(spb.Type_TYPE_OPENCONFIG)
err = pnd.southboundService.Add(newSBI)
if err != nil {
return nil, err
}
type pndImplementation struct {
Name string `json:"name,omitempty"`
Description string `json:"description,omitempty"`
southboundService southbound.Service
networkElementService networkelement.Service
changes *store.ChangeStore
//nolint
Id uuid.UUID `json:"id,omitempty"`
Fabian Seidl
committed
csbiClient cpb.CsbiServiceClient
callback func(uuid.UUID, chan networkelement.Details)
Fabian Seidl
committed
eventService eventInterfaces.Service
func (pnd *pndImplementation) PendingChanges() []uuid.UUID {
func (pnd *pndImplementation) CommittedChanges() []uuid.UUID {
// ConfirmedChanges returns the UUIDs that the PND's change store reports as
// confirmed.
func (pnd *pndImplementation) ConfirmedChanges() []uuid.UUID {
	confirmed := pnd.changes.Confirmed()
	return confirmed
}
func (pnd *pndImplementation) GetChange(cuid uuid.UUID) (change.Change, error) {
return pnd.changes.GetChange(cuid)
func (pnd *pndImplementation) Commit(u uuid.UUID) error {
if err != nil {
return err
}
func (pnd *pndImplementation) Confirm(u uuid.UUID) error {
if err != nil {
return err
}
func (pnd *pndImplementation) ID() uuid.UUID {
return pnd.Id
func (pnd *pndImplementation) NetworkElements() []networkelement.NetworkElement {
allNetworkElements, _ := pnd.networkElementService.GetAll()
return allNetworkElements
func (pnd *pndImplementation) FlattenedNetworkElements() []networkelement.LoadedNetworkElement {
allNetworkElements, _ := pnd.networkElementService.GetAllAsLoaded()
Malte Bauch
committed
return allNetworkElements
Malte Bauch
committed
}
// GetName returns the name of the PND.
return pnd.Name
// GetDescription returns the current description of the PND.
func (pnd *pndImplementation) GetDescription() string {
return pnd.Description
// GetSBIs returns every SBI the PND's southbound service knows about. If the
// lookup fails, the error is returned together with a nil slice.
func (pnd *pndImplementation) GetSBIs() ([]southbound.SouthboundInterface, error) {
	registered, lookupErr := pnd.southboundService.GetAll()
	if lookupErr != nil {
		return nil, lookupErr
	}

	return registered, nil
}
// GetSBI returns the registered SBI with the given UUID.
func (pnd *pndImplementation) GetSBI(sbiUUID uuid.UUID) (southbound.SouthboundInterface, error) {
sbis, err := pnd.southboundService.Get(store.Query{ID: sbiUUID})
if err != nil {
return nil, err
}
return sbis, nil
// Destroy destroys the PND by delegating to the package-level destroy helper
// and passing its result on to the caller.
func (pnd *pndImplementation) Destroy() error {
	err := destroy()
	return err
}
// AddSbi adds a SBI to the PND which will be supported.
func (pnd *pndImplementation) AddSbi(s southbound.SouthboundInterface) error {
// RemoveSbi removes an SBI from the PND. It first collects all network
// elements that use this SBI and removes them, then removes the SBI itself.
func (pnd *pndImplementation) RemoveSbi(sid uuid.UUID) error {
var associatedNetworkElements []networkelement.LoadedNetworkElement
allExistingNetworkElements, err := pnd.networkElementService.GetAllAsLoaded()
if err != nil {
return err
}
// range over all storable items within the network element store
for _, mne := range allExistingNetworkElements {
// check if the network element uses the provided SBI and add it to the network element
// slice.
loadedSbiUUID, err := uuid.Parse(mne.SBI)
if err != nil {
return err
}
if loadedSbiUUID == sid {
associatedNetworkElements = append(associatedNetworkElements, mne)
}
}
// range over associated network elements and remove each one of them
for _, aMNE := range associatedNetworkElements {
loadedNetworkElementUUID, err := uuid.Parse(aMNE.ID)
if err != nil {
return err
}
if err := pnd.removeNetworkElement(loadedNetworkElementUUID); err != nil {
return err
}
}
return pnd.removeSbi(sid)
// AddNetworkElement adds a new network element to the PND.
func (pnd *pndImplementation) AddNetworkElement(name string, opt *tpb.TransportOption, sid uuid.UUID) (uuid.UUID, error) {
start := metrics.StartHook(labels, networkElementCreationsTotal)
defer metrics.FinishHook(labels, start, networkElementCreationDurationSecondsTotal, networkElementCreationDurationSeconds)
return uuid.Nil, err
var err error
sbi, err = pnd.southboundService.Get(store.Query{ID: sid})
return uuid.Nil, err
mne, err := NewNetworkElement(name, uuid.Nil, opt, sbi, conflict.Metadata{ResourceVersion: 0})
return uuid.Nil, err
return pnd.addNetworkElement(mne)
// TODO: (maba): This should be changed to UUID.
func (pnd *pndImplementation) GetNetworkElement(identifier string) (networkelement.NetworkElement, error) {
id, err := uuid.Parse(identifier)
if err != nil {
id = uuid.Nil
mne, err := pnd.networkElementService.Get(store.Query{
ID: id,
Name: identifier,
})
if mne == nil {
return nil, fmt.Errorf("no network element found")
return mne, nil
// RemoveNetworkElement removes a network element from the PND.
func (pnd *pndImplementation) RemoveNetworkElement(uuid uuid.UUID) error {
return pnd.removeNetworkElement(uuid)
// UpdateNetworkElement updates the model of the given network element and
// then applies the resulting configuration on the element itself.
//
// modelAsString is passed unchanged to the network element service's
// UpdateModel. If that succeeds, the element identified by mne.ID() is brought
// in line with its stored model via
// ensureIntendedConfigurationIsAppliedOnNetworkElement. The first error
// encountered is returned; nil means both steps succeeded.
func (pnd *pndImplementation) UpdateNetworkElement(mne networkelement.NetworkElement, modelAsString string) error {
	if err := pnd.networkElementService.UpdateModel(mne, modelAsString); err != nil {
		return err
	}

	return pnd.ensureIntendedConfigurationIsAppliedOnNetworkElement(mne.ID())
}
// destroy is the package-level implementation behind Destroy; bind it to the
// struct if that becomes necessary. It currently does no work and always
// reports success.
func destroy() error {
	var err error
	return err
}
// addSbi adds a SBI to the PND's SBI store.
func (pnd *pndImplementation) addSbi(sbi southbound.SouthboundInterface) error {
return pnd.southboundService.Add(sbi)
// removeSbi removes an SBI based on the given ID from the PND's SBI store.
func (pnd *pndImplementation) removeSbi(id uuid.UUID) error {
sbi, err := pnd.southboundService.Get(store.Query{ID: id})
if sbi == nil {
return fmt.Errorf("no sbi found")
}
if err != nil {
return err
}
return pnd.southboundService.Delete(sbi)
// addNetworkElement adds a network element to the PND's network element store.
func (pnd *pndImplementation) addNetworkElement(mne networkelement.NetworkElement) (uuid.UUID, error) {
err := pnd.networkElementService.Add(mne)
return uuid.Nil, err
if mne.IsTransportValid() {
if err != nil {
return uuid.Nil, err
}
}
return mne.ID(), nil
func (pnd *pndImplementation) removeNetworkElement(id uuid.UUID) error {
mne, err := pnd.networkElementService.Get(store.Query{
ID: id,
Name: id.String(),
})
if mne == nil {
return fmt.Errorf("no network element found")
}
labels := prometheus.Labels{"type": mne.SBI().Type().String()}
start := metrics.StartHook(labels, networkElementDeletionsTotal)
defer metrics.FinishHook(labels, start, networkElementDeletionDurationSecondsTotal, networkElementDeletionDurationSeconds)
switch mne.(type) {
case *CsbiNetworkElement:
return pnd.handleCsbiDeletion(mne)
return pnd.networkElementService.Delete(mne)
func (pnd *pndImplementation) MarshalNetworkElement(identifier string) (string, error) {
foundNetworkElement, err := pnd.networkElementService.Get(store.Query{
ID: uuid.MustParse(identifier),
Name: identifier,
})
if err != nil {
return "", err
}
jsonTree, err := json.MarshalIndent(foundNetworkElement.GetModel(), "", "\t")
if err != nil {
return "", err
}
"pnd": pnd.Id,
"Identifier": identifier,
"Name": foundNetworkElement.Name,
}).Info("marshalled network element")
return string(jsonTree), nil
// Request sends a get request to a specific network element.
func (pnd *pndImplementation) Request(uuid uuid.UUID, path string) (proto.Message, error) {
mne, err := pnd.networkElementService.Get(store.Query{
ID: uuid,
Name: uuid.String(),
})
if mne == nil {
return nil, fmt.Errorf("no network element found")
}
res, err := mne.Transport().Get(ctx, path)
resp, ok := res.(proto.Message)
if !ok {
Fabian Seidl
committed
return nil, &customerrs.InvalidTypeAssertionError{
Value: res,
Type: (*proto.Message)(nil),
}
err = mne.ProcessResponse(resp)
err = pnd.networkElementService.Update(mne)
// RequestAll sends a request for all registered network elements.
func (pnd *pndImplementation) RequestAll(path string) error {
allNetworkElements, err := pnd.networkElementService.GetAllAsLoaded()
if err != nil {
return err
}
for _, mne := range allNetworkElements {
mneUUID, err := uuid.Parse(mne.ID)
if err != nil {
return err
}
_, err = pnd.Request(mneUUID, path)
// TODO: (maba): this is not returning any useful information; this should
// return some feedback if the requests were successful
"pnd": pnd.Id,
}).Info("sent request to all network elements")
// ensureIntendedConfigurationIsAppliedOnNetworkElement pushes the stored model
// of the network element identified by mneID to the element itself.
//
// The model is fetched as a string and sent as a single gNMI Set update on
// the root path ("/"), carried as a JSON_IETF value, through the element's
// transport. Lookup, model and path errors are returned as they are; a
// failing Set is additionally logged together with the response.
func (pnd *pndImplementation) ensureIntendedConfigurationIsAppliedOnNetworkElement(mneID uuid.UUID) error {
	element, err := pnd.networkElementService.Get(store.Query{ID: mneID})
	if err != nil {
		return err
	}

	modelJSON, err := element.GetModelAsString()
	if err != nil {
		return err
	}

	rootPath, err := ygot.StringToStructuredPath("/")
	if err != nil {
		return err
	}

	// One update that replaces everything below the root with the stored model.
	setRequest := &gpb.SetRequest{
		Update: []*gpb.Update{{
			Path: rootPath,
			Val: &gpb.TypedValue{
				Value: &gpb.TypedValue_JsonIetfVal{JsonIetfVal: []byte(modelJSON)},
			},
		}},
	}

	if setResponse, setErr := element.Transport().CustomSet(context.Background(), setRequest); setErr != nil {
		log.Errorf("Failed to apply model of network element err=%+v, response=%+v", setErr, setResponse)
		return setErr
	}

	return nil
}
// ChangeMNE creates a change from the provided Operation, path and value.
// The Change is Pending and times out after the specified timeout period.
func (pnd *pndImplementation) ChangeMNE(duid uuid.UUID, operation ppb.ApiOperation, path string, value ...string) (uuid.UUID, error) {
Malte Bauch
committed
//TODO: check if we can get cyclomatic complexity from 16 to at least 15
mne, err := pnd.networkElementService.Get(store.Query{
ID: duid,
})
if err != nil {
return uuid.Nil, err
}
Malte Bauch
committed
validatedCpy, err := mne.CreateModelCopy()
p, err := ygot.StringToStructuredPath(path)
if err != nil {
if operation != ppb.ApiOperation_API_OPERATION_DELETE && len(value) != 1 {
Fabian Seidl
committed
return uuid.Nil, &customerrs.InvalidParametersError{
Func: pnd.ChangeMNE,
case ppb.ApiOperation_API_OPERATION_UPDATE, ppb.ApiOperation_API_OPERATION_REPLACE:
_, entry, err := ytypes.GetOrCreateNode(mne.SBI().Schema().RootSchema(), validatedCpy, p)
Malte Bauch
committed
if err != nil {
Malte Bauch
committed
if entry.IsDir() {
opts := []ytypes.UnmarshalOpt{&ytypes.IgnoreExtraFields{}}
if err := mne.SBI().Unmarshal([]byte(value[0]), p, validatedCpy, opts...); err != nil {
Malte Bauch
committed
return uuid.Nil, err
}
} else if entry.IsLeaf() {
typedValue, err := gGnmi.ConvertStringToGnmiTypedValue(value[0], entry.Type)
if err != nil {
return uuid.Nil, err
}
opts := []ytypes.SetNodeOpt{&ytypes.InitMissingElements{}, &ytypes.TolerateJSONInconsistencies{}}
if err := ytypes.SetNode(mne.SBI().Schema().RootSchema(), validatedCpy, p, typedValue, opts...); err != nil {
Malte Bauch
committed
return uuid.Nil, err
}
}
if err := ytypes.DeleteNode(mne.SBI().Schema().RootSchema(), validatedCpy, p); err != nil {
Fabian Seidl
committed
return uuid.Nil, &customerrs.OperationNotSupportedError{Op: operation}
Malte Bauch
committed
ygot.PruneEmptyBranches(validatedCpy)
callback := func(original ygot.GoStruct, modified ygot.GoStruct) error {
ctx := context.WithValue(context.Background(), types.CtxKeyOperation, operation) // nolint
payload := change.Payload{Original: original, Modified: modified}
pathToSet := path
schema := mne.SBI().Schema()
return mne.Transport().Set(ctx, payload, pathToSet, schema)
Malte Bauch
committed
ch := NewChange(duid, mne.GetModel(), validatedCpy, callback)
Malte Bauch
committed
if err := pnd.changes.Add(ch); err != nil {
return uuid.Nil, err
}
Malte Bauch
committed
func (pnd *pndImplementation) SubscribePath(uuid uuid.UUID, subList *ppb.SubscriptionList) error {
mne, err := pnd.networkElementService.Get(store.Query{
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
ID: uuid,
})
if err != nil {
return err
}
mode, err := mapModeToAristaFork(subList.GetMode())
if err != nil {
return err
}
for _, sub := range subList.Subscription {
streamMode, err := mapStreamModeToAristaFork(sub.GetStreamMode())
if err != nil {
return err
}
opts := &gnmi.SubscribeOptions{
Mode: mode,
StreamMode: streamMode,
Paths: [][]string{splitStringPath(sub.GetPath())},
SampleInterval: sub.SampleInterval,
}
ctx := context.Background()
ctx = context.WithValue(ctx, types.CtxKeyOpts, opts)
if err = mne.Transport().Subscribe(ctx); err != nil {
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
return err
}
}
return nil
}
// splitStringPath breaks s into its "/"-separated elements. Empty elements
// are kept, so a leading "/" yields an empty first element and an empty
// input yields a single empty element.
func splitStringPath(s string) []string {
	const separator = "/"
	return strings.SplitN(s, separator, -1)
}
// mapStreamModeToAristaFork translates a ppb.StreamMode into the string that
// is placed in gnmi.SubscribeOptions.StreamMode.
//
// TARGET_DEFINED, ON_CHANGE and SAMPLE map to "target_defined", "on_change"
// and "sample". Any other value yields an empty string and an error that
// names the rejected mode.
func mapStreamModeToAristaFork(mode ppb.StreamMode) (string, error) {
	switch mode {
	case ppb.StreamMode_STREAM_MODE_TARGET_DEFINED:
		return "target_defined", nil
	case ppb.StreamMode_STREAM_MODE_ON_CHANGE:
		return "on_change", nil
	case ppb.StreamMode_STREAM_MODE_SAMPLE:
		return "sample", nil
	default:
		// %v reports the offending value; %T would only ever print the
		// static type name and hide which mode was rejected.
		return "", fmt.Errorf("StreamMode of type: %v is not supported", mode)
	}
}
// mapModeToAristaFork translates a ppb.SubscriptionMode into the string that
// is placed in gnmi.SubscribeOptions.Mode.
//
// STREAM, ONCE and POLL map to "stream", "once" and "poll". Any other value
// yields an empty string and an error that names the rejected mode.
func mapModeToAristaFork(mode ppb.SubscriptionMode) (string, error) {
	switch mode {
	case ppb.SubscriptionMode_SUBSCRIPTION_MODE_STREAM:
		return "stream", nil
	case ppb.SubscriptionMode_SUBSCRIPTION_MODE_ONCE:
		return "once", nil
	case ppb.SubscriptionMode_SUBSCRIPTION_MODE_POLL:
		return "poll", nil
	default:
		// %v reports the offending value; %T would only ever print the
		// static type name and hide which mode was rejected.
		return "", fmt.Errorf("SubscriptionMode of type: %v is not supported", mode)
	}
}
Malte Bauch
committed
// handleRollbackError is a placeholder that will be implemented in the near
// future. For now it only logs err; id is accepted but not used.
func handleRollbackError(id uuid.UUID, err error) {
	// TODO: Notion of invalid state needed.
	_ = id // currently unused
	log.Error(err)
}
func (pnd *pndImplementation) handleCsbiDeletion(mne networkelement.NetworkElement) error {
log.Infof("csbi deletion triggered for %v", mne.ID().String())
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
req := &cpb.DeleteRequest{
Timestamp: time.Now().UnixNano(),
Did: []string{mne.ID().String()},
}
resp, err := pnd.csbiClient.Delete(ctx, req)
if err != nil {
return err
}
err = pnd.southboundService.Delete(mne.SBI())
Shrey Garg
committed
if err != nil {
return err
}
"uuid": mne.ID().String(),
"status": resp.Status,
}).Info("csbi deleted")
return nil
}
func (pnd *pndImplementation) handleCsbiEnrolment(name string, opt *tpb.TransportOption) (uuid.UUID, error) {
g := new(errgroup.Group)
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5)
defer cancel()
req := &cpb.CreateRequest{
Timestamp: time.Now().UnixNano(),
TransportOption: []*tpb.TransportOption{opt},
}
resp, err := pnd.csbiClient.Create(ctx, req)
if err != nil {
return uuid.Nil, err
// the slice only contains one deployment
var mneID uuid.UUID
for _, deployment := range resp.Deployments {
dCopy := deployment
g.Go(func() error {
mneID, err = pnd.createCsbiNetworkElement(ctx, name, dCopy, opt)
if err != nil {
return err
}
return nil
})
}
err = g.Wait()
if err != nil {
return uuid.Nil, err
return mneID, nil
// createCsbiNetworkElement is a helper method for cSBI network element creation. The method
// waits for a SYN (which indicates that the cSBI is running and addressable)
// of the commissioned cSBI and creates the network element within the controller.
func (pnd *pndImplementation) createCsbiNetworkElement(
ctx context.Context,
name string,
deployment *cpb.Deployment,
opt *tpb.TransportOption,
) (uuid.UUID, error) {
id, err := uuid.Parse(deployment.Id)
return uuid.Nil, err
ch := make(chan networkelement.Details, 1)
defer pnd.callback(id, nil)
defer close(ch)
defer tickatus.Stop()
select {
case <-tickatus.C:
log.WithFields(log.Fields{
"id": deployment.Id,
"err": ctx.Err(),
}).Error("csbi handshake timed out")
case mneDetails := <-ch:
log.Infof("syn from csbi %v", mneDetails.ID)
id, err := uuid.Parse(mneDetails.ID)
if err != nil {
return uuid.Nil, err
csbiTransportOptions := &tpb.TransportOption{
Address: mneDetails.Address,
Username: opt.Username,
Password: opt.Password,
Tls: opt.Tls,
Type: opt.Type,
TransportOption: opt.TransportOption,
}
log.WithField("transport option", csbiTransportOptions).Debug("gosdn gnmi transport options")
files := []string{util.GoStructName, util.ManifestFileName, util.GoStructAdditionsName}
csbiID := uuid.New()
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*10)
defer cancel()
g := new(errgroup.Group)
for _, f := range files {
req := &cpb.GetPayloadRequest{
Timestamp: time.Now().UnixNano(),
Did: deployment.Id,
File: f,
}
g.Go(func() error {
gClient, err := pnd.csbiClient.GetFile(ctx, req)
if err != nil {
return err
}
err = saveStreamToFile(gClient, req.GetFile(), csbiID)
if err != nil {
return err
}
return nil
})
}
err = g.Wait()
if err != nil {
return uuid.Nil, err
}
csbi, err := NewSBI(spb.Type_TYPE_CONTAINERISED, csbiID)
if err != nil {
return uuid.Nil, err
}
err = pnd.southboundService.Add(csbi)
if err != nil {
return uuid.Nil, err
}
mne, err := NewNetworkElement(name, uuid.Nil, csbiTransportOptions, csbi, conflict.Metadata{ResourceVersion: 0})
if err != nil {
return uuid.Nil, err
mne.(*CsbiNetworkElement).UUID = id
ch <- networkelement.Details{TransportOption: opt}
if err := pnd.networkElementService.Add(mne); err != nil {
return uuid.Nil, err
return uuid.Nil, nil
// requestPlugin is a feature for cSBIs and sends a plugin request to the cSBI
// orchestrator and processes the received ygot generated go code, builds the
// plugin and integrates the Plugin within the goSDN as SouthboundInterface.
// The generated code is passed into a gostructs.go file, which is the
// foundation for the created plugin.
//
// The whole exchange shares one context with a ten minute timeout. For the
// first deployment in the response, the files named by util.GoStructName,
// util.ManifestFileName and util.GoStructAdditionsName are fetched
// concurrently and stored through saveStreamToFile under a freshly generated
// UUID. A plugin SBI is then created for that UUID, added to the southbound
// service and returned. An error is returned if the create request fails, any
// file transfer fails, the SBI cannot be created or stored, or the response
// contains no deployment.
//
// The name parameter is currently not used.
func (pnd *pndImplementation) requestPlugin(name string, opt *tpb.TransportOption) (southbound.SouthboundInterface, error) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Minute*10)
	defer cancel()

	cReq := &cpb.CreateRequest{
		Timestamp:       time.Now().UnixNano(),
		TransportOption: []*tpb.TransportOption{opt},
	}
	resp, err := pnd.csbiClient.CreateGoStruct(ctx, cReq)
	if err != nil {
		// Report the request failure itself instead of falling through to
		// the "empty deployment slice" error below.
		return nil, err
	}

	files := []string{util.GoStructName, util.ManifestFileName, util.GoStructAdditionsName}

	g := new(errgroup.Group)
	// we only request one plugin
	for _, dep := range resp.GetDeployments() {
		id := uuid.New()
		for _, f := range files {
			req := &cpb.GetPayloadRequest{
				Timestamp: time.Now().UnixNano(),
				Did:       dep.GetId(),
				File:      f,
			}

			g.Go(func() error {
				gClient, err := pnd.csbiClient.GetFile(ctx, req)
				if err != nil {
					return err
				}

				return saveStreamToFile(gClient, req.GetFile(), id)
			})
		}

		// All files must be on disk before the SBI is built from them, and a
		// failed transfer must not go unnoticed.
		if err := g.Wait(); err != nil {
			return nil, err
		}

		sbi, err := NewSBI(spb.Type_TYPE_PLUGIN, id)
		if err != nil {
			return nil, err
		}

		if err := pnd.southboundService.Add(sbi); err != nil {
			return nil, err
		}

		return sbi, nil
	}

	return nil, fmt.Errorf("requestPlugin: received deployment slice was empty")
}
// StreamClient allows to distinguish between the different ygot
// generated GoStruct clients, which return a stream of bytes.
type StreamClient interface {
Recv() (*cpb.Payload, error)
grpc.ClientStream
}
// saveStreamToFile takes a GenericGoStructClient and processes the included
// gRPC stream. A 'gostructs.go' file is created within the goSDN's
// 'plugin-folder'. Each 'gostructs.go' file is stored in its own folder based
// on a new uuid.UUID.
func saveStreamToFile[T StreamClient](sc T, filename string, id uuid.UUID) (err error) {
folderName := viper.GetString("plugin-folder")
path := filepath.Join(folderName, id.String(), filename)
// clean the path to prevent attackers from gaining access to directories elsewhere on the system
path = filepath.Clean(path)
if !strings.HasPrefix(path, folderName) {
Fabian Seidl
committed
return &customerrs.InvalidParametersError{
Func: saveStreamToFile[T],
Param: path,
}
}
// create the directory hierarchy based on the path
if err := os.MkdirAll(filepath.Dir(path), 0770); err != nil {
}
// create the gostructs.go file at path
f, err := os.Create(path)
if err != nil {
}
if ferr := f.Close(); ferr != nil {
fErrString := ferr.Error()
err = fmt.Errorf("InternalError=%w error closing file:%+s", err, fErrString)
// receive byte stream
payload, err := sc.Recv()
Fabian Seidl
committed
if errors.Is(err, io.EOF) {
closeErr := sc.CloseSend()
return closeErr
closeErr := sc.CloseSend()
return closeErr
}
log.WithField("n", n).Trace("wrote bytes")
}
if err := f.Sync(); err != nil {
// MarshalBSON implements the MarshalBSON interface to store a PND as BSON.
func (pnd *pndImplementation) MarshalBSON() ([]byte, error) {
return bson.Marshal(&struct {
ID string `bson:"_id"`
Name string `bson:"name"`
Description string `bson:"description"`
}{
ID: pnd.Id.String(),
Name: pnd.Name,
Description: pnd.Description,
})
// UpdateNetworkElementAfterSubscribeResponse takes a network element and
// forwards it to the network element service to handle the update. The
// service's result is returned unchanged.
func (pnd *pndImplementation) UpdateNetworkElementAfterSubscribeResponse(mne networkelement.NetworkElement) error {
	return pnd.networkElementService.Update(mne)
}