Newer
Older
Fabian Seidl
committed
"errors"
"fmt"
"path/filepath"
"code.fbi.h-da.de/danet/gosdn/controller/metrics"
"code.fbi.h-da.de/danet/gosdn/controller/nucleus/types"
"code.fbi.h-da.de/danet/gosdn/controller/nucleus/util"
"go.mongodb.org/mongo-driver/bson"
"golang.org/x/sync/errgroup"
cpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/csbi"
ppb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/pnd"
spb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/southbound"
tpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/transport"
Fabian Seidl
committed
eventservice "code.fbi.h-da.de/danet/gosdn/controller/eventService"
"google.golang.org/grpc"
Fabian Seidl
committed
"code.fbi.h-da.de/danet/gosdn/controller/customerrs"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/change"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/device"
Fabian Seidl
committed
eventInterfaces "code.fbi.h-da.de/danet/gosdn/controller/interfaces/event"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkdomain"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/southbound"
Malte Bauch
committed
gGnmi "code.fbi.h-da.de/danet/gosdn/controller/nucleus/util/gnmi"
"code.fbi.h-da.de/danet/gosdn/controller/store"
"code.fbi.h-da.de/danet/gosdn/forks/goarista/gnmi"
gpb "github.com/openconfig/gnmi/proto/gnmi"
"github.com/openconfig/ygot/ygot"
"github.com/openconfig/ygot/ytypes"
"github.com/spf13/viper"
// NOTE: Until we've added database support for changes, we will hold
// changeStores in memory for now.
var changeStoreMap = make(map[uuid.UUID]*store.ChangeStore)
// NewPND creates a Principle Network Domain.
func NewPND(
name string,
description string,
id uuid.UUID,
c cpb.CsbiServiceClient,
callback func(uuid.UUID, chan device.Details),
) (networkdomain.NetworkDomain, error) {
Fabian Seidl
committed
eventService, err := eventservice.NewEventService()
if err != nil {
return nil, err
}
sbiStore := NewSbiStore(id)
deviceStore := NewDeviceStore(id)
Fabian Seidl
committed
sbiService := NewSbiService(sbiStore, eventService)
deviceService := NewDeviceService(
deviceStore,
sbiService,
Fabian Seidl
committed
eventService,
changeStore, ok := changeStoreMap[id]
if !ok {
changeStore = store.NewChangeStore()
changeStoreMap[id] = changeStore
}
Name: name,
Description: description,
southboundService: sbiService,
deviceService: deviceService,
changes: changeStore,
Id: id,
Fabian Seidl
committed
csbiClient: c,
callback: callback,
eventService: eventService,
existingSBIs, err := sbiStore.GetAll()
if err != nil {
if len(existingSBIs) == 0 {
newSBI, _ := NewSBI(spb.Type_TYPE_OPENCONFIG)
err = pnd.southboundService.Add(newSBI)
if err != nil {
return nil, err
}
type pndImplementation struct {
Name string `json:"name,omitempty"`
Description string `json:"description,omitempty"`
southboundService southbound.Service
deviceService device.Service
changes *store.ChangeStore
//nolint
Id uuid.UUID `json:"id,omitempty"`
Fabian Seidl
committed
csbiClient cpb.CsbiServiceClient
callback func(uuid.UUID, chan device.Details)
eventService eventInterfaces.Service
func (pnd *pndImplementation) PendingChanges() []uuid.UUID {
func (pnd *pndImplementation) CommittedChanges() []uuid.UUID {
func (pnd *pndImplementation) ConfirmedChanges() []uuid.UUID {
return pnd.changes.Confirmed()
}
func (pnd *pndImplementation) GetChange(cuid uuid.UUID) (change.Change, error) {
return pnd.changes.GetChange(cuid)
func (pnd *pndImplementation) Commit(u uuid.UUID) error {
if err != nil {
return err
}
func (pnd *pndImplementation) Confirm(u uuid.UUID) error {
if err != nil {
return err
}
func (pnd *pndImplementation) ID() uuid.UUID {
return pnd.Id
Malte Bauch
committed
func (pnd *pndImplementation) Devices() []device.Device {
allDevices, _ := pnd.deviceService.GetAll()
return allDevices
Malte Bauch
committed
func (pnd *pndImplementation) FlattenedDevices() []device.LoadedDevice {
allDevices, _ := pnd.deviceService.GetAllAsLoaded()
return allDevices
}
// GetName returns the name of the PND.
return pnd.Name
// GetDescription returns the current description of the PND.
func (pnd *pndImplementation) GetDescription() string {
return pnd.Description
// GetSBIs returns the registered SBIs.
func (pnd *pndImplementation) GetSBIs() ([]southbound.SouthboundInterface, error) {
sbis, err := pnd.southboundService.GetAll()
if err != nil {
return nil, err
}
return sbis, nil
}
// GetSBIs returns the registered SBIs.
func (pnd *pndImplementation) GetSBI(sbiUUID uuid.UUID) (southbound.SouthboundInterface, error) {
sbis, err := pnd.southboundService.Get(store.Query{ID: sbiUUID})
if err != nil {
return nil, err
}
return sbis, nil
// Destroy destroys the PND.
func (pnd *pndImplementation) Destroy() error {
return destroy()
}
// AddSbi adds a SBI to the PND which will be supported.
func (pnd *pndImplementation) AddSbi(s southbound.SouthboundInterface) error {
// RemoveSbi removes a SBI from the PND
// devices and remove the devices using this SBI.
func (pnd *pndImplementation) RemoveSbi(sid uuid.UUID) error {
var associatedDevices []device.LoadedDevice
Malte Bauch
committed
allExistingDevices, err := pnd.deviceService.GetAllAsLoaded()
if err != nil {
return err
}
// range over all storable items within the device store
for _, device := range allExistingDevices {
// check if the device uses the provided SBI and add it to the devices
// slice.
loadedSbiUUID, err := uuid.Parse(device.SBI)
if err != nil {
return err
}
if loadedSbiUUID == sid {
associatedDevices = append(associatedDevices, device)
}
}
// range over associated devices and remove each one of them
for _, d := range associatedDevices {
loadedDeviceUUID, err := uuid.Parse(d.ID)
if err != nil {
return err
}
if err := pnd.removeDevice(loadedDeviceUUID); err != nil {
return err
}
}
return pnd.removeSbi(sid)
// AddDevice adds a new device to the PND.
func (pnd *pndImplementation) AddDevice(name string, opt *tpb.TransportOption, sid uuid.UUID) (uuid.UUID, error) {
labels := prometheus.Labels{"type": opt.Type.String()}
start := metrics.StartHook(labels, deviceCreationsTotal)
defer metrics.FinishHook(labels, start, deviceCreationDurationSecondsTotal, deviceCreationDurationSeconds)
var sbi southbound.SouthboundInterface
return uuid.Nil, err
var err error
sbi, err = pnd.southboundService.Get(store.Query{ID: sid})
return uuid.Nil, err
Martin Stiemerling
committed
d, err := NewDevice(name, uuid.Nil, opt, sbi)
return uuid.Nil, err
// TODO: (maba): This should be changed to UUID.
func (pnd *pndImplementation) GetDevice(identifier string) (device.Device, error) {
id, err := uuid.Parse(identifier)
if err != nil {
id = uuid.Nil
d, err := pnd.deviceService.Get(store.Query{
ID: id,
Name: identifier,
})
if d == nil {
return nil, fmt.Errorf("no device found")
return d, nil
// RemoveDevice removes a device from the PND.
func (pnd *pndImplementation) RemoveDevice(uuid uuid.UUID) error {
// UpdateDeviceModel updates a device from the PND.
func (pnd *pndImplementation) UpdateDevice(device device.Device, modelAsString string) error {
err := pnd.deviceService.UpdateModel(device, modelAsString)
if err != nil {
return err
}
err = pnd.ensureIntendedConfigurationIsAppliedOnDevice(device.ID())
if err != nil {
return err
}
return err
}
// Actual implementation, bind to struct if necessary.
func destroy() error {
return nil
}
// addSbi adds a SBI to the PND's SBI store.
func (pnd *pndImplementation) addSbi(sbi southbound.SouthboundInterface) error {
return pnd.southboundService.Add(sbi)
// removeSbi removes an SBI based on the given ID from the PND's SBI store.
func (pnd *pndImplementation) removeSbi(id uuid.UUID) error {
sbi, err := pnd.southboundService.Get(store.Query{ID: id})
if sbi == nil {
return fmt.Errorf("no sbi found")
}
if err != nil {
return err
}
return pnd.southboundService.Delete(sbi)
// addDevice adds a device to the PND's device store.
func (pnd *pndImplementation) addDevice(device device.Device) (uuid.UUID, error) {
err := pnd.deviceService.Add(device)
return uuid.Nil, err
if device.IsTransportValid() {
_, err = pnd.Request(device.ID(), "/interfaces")
if err != nil {
return uuid.Nil, err
}
}
return device.ID(), nil
func (pnd *pndImplementation) removeDevice(id uuid.UUID) error {
d, err := pnd.deviceService.Get(store.Query{
ID: id,
Name: id.String(),
})
if d == nil {
return fmt.Errorf("no device found")
}
labels := prometheus.Labels{"type": d.SBI().Type().String()}
start := metrics.StartHook(labels, deviceDeletionsTotal)
defer metrics.FinishHook(labels, start, deviceDeletionDurationSecondsTotal, deviceDeletionDurationSeconds)
Shrey Garg
committed
return pnd.handleCsbiDeletion(d)
func (pnd *pndImplementation) MarshalDevice(identifier string) (string, error) {
foundDevice, err := pnd.deviceService.Get(store.Query{
ID: uuid.MustParse(identifier),
Name: identifier,
})
if err != nil {
return "", err
}
jsonTree, err := json.MarshalIndent(foundDevice.GetModel(), "", "\t")
if err != nil {
return "", err
}
"pnd": pnd.Id,
"Identifier": identifier,
"Name": foundDevice.Name,
return string(jsonTree), nil
// Request sends a get request to a specific device.
func (pnd *pndImplementation) Request(uuid uuid.UUID, path string) (proto.Message, error) {
d, err := pnd.deviceService.Get(store.Query{
ID: uuid,
Name: uuid.String(),
})
if d == nil {
return nil, fmt.Errorf("no device found")
}
resp, ok := res.(proto.Message)
if !ok {
Fabian Seidl
committed
return nil, &customerrs.InvalidTypeAssertionError{
Value: res,
Type: (*proto.Message)(nil),
}
}
err = d.ProcessResponse(resp)
// RequestAll sends a request for all registered devices.
func (pnd *pndImplementation) RequestAll(path string) error {
Malte Bauch
committed
allDevices, err := pnd.deviceService.GetAllAsLoaded()
if err != nil {
return err
}
for _, d := range allDevices {
deviceUUID, err := uuid.Parse(d.ID)
if err != nil {
return err
}
_, err = pnd.Request(deviceUUID, path)
// TODO: (maba): this is not returning any useful information; this should
// return some feedback if the requests were successful
"pnd": pnd.Id,
"path": path,
}).Info("sent request to all devices")
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
func (pnd *pndImplementation) ensureIntendedConfigurationIsAppliedOnDevice(deviceID uuid.UUID) error {
device, err := pnd.deviceService.Get(store.Query{
ID: deviceID,
})
if err != nil {
return err
}
model, err := device.GetModelAsString()
if err != nil {
return err
}
req := &gpb.SetRequest{}
path, err := ygot.StringToStructuredPath("/")
if err != nil {
return err
}
req.Update = []*gpb.Update{{
Path: path,
Val: &gpb.TypedValue{
Value: &gpb.TypedValue_JsonIetfVal{JsonIetfVal: []byte(model)},
},
}}
response, err := device.Transport().CustomSet(context.Background(), req)
if err != nil {
log.Errorf("Failed to apply model of device err=%+v, response=%+v", err, response)
return err
}
return nil
}
// ChangeOND creates a change from the provided Operation, path and value.
// The Change is Pending and times out after the specified timeout period.
func (pnd *pndImplementation) ChangeOND(duid uuid.UUID, operation ppb.ApiOperation, path string, value ...string) (uuid.UUID, error) {
Malte Bauch
committed
//TODO: check if we can get cyclomatic complexity from 16 to at least 15
d, err := pnd.deviceService.Get(store.Query{
ID: duid,
})
if err != nil {
return uuid.Nil, err
}
Malte Bauch
committed
validatedCpy, err := d.CreateModelCopy()
p, err := ygot.StringToStructuredPath(path)
if err != nil {
if operation != ppb.ApiOperation_API_OPERATION_DELETE && len(value) != 1 {
Fabian Seidl
committed
return uuid.Nil, &customerrs.InvalidParametersError{
case ppb.ApiOperation_API_OPERATION_UPDATE, ppb.ApiOperation_API_OPERATION_REPLACE:
Malte Bauch
committed
_, entry, err := ytypes.GetOrCreateNode(d.SBI().Schema().RootSchema(), validatedCpy, p)
if err != nil {
Malte Bauch
committed
if entry.IsDir() {
opts := []ytypes.UnmarshalOpt{&ytypes.IgnoreExtraFields{}}
if err := d.SBI().Unmarshal([]byte(value[0]), p, validatedCpy, opts...); err != nil {
return uuid.Nil, err
}
} else if entry.IsLeaf() {
typedValue, err := gGnmi.ConvertStringToGnmiTypedValue(value[0], entry.Type)
if err != nil {
return uuid.Nil, err
}
opts := []ytypes.SetNodeOpt{&ytypes.InitMissingElements{}, &ytypes.TolerateJSONInconsistencies{}}
if err := ytypes.SetNode(d.SBI().Schema().RootSchema(), validatedCpy, p, typedValue, opts...); err != nil {
Malte Bauch
committed
return uuid.Nil, err
}
}
Malte Bauch
committed
if err := ytypes.DeleteNode(d.SBI().Schema().RootSchema(), validatedCpy, p); err != nil {
Fabian Seidl
committed
return uuid.Nil, &customerrs.OperationNotSupportedError{Op: operation}
Malte Bauch
committed
ygot.PruneEmptyBranches(validatedCpy)
callback := func(original ygot.GoStruct, modified ygot.GoStruct) error {
ctx := context.WithValue(context.Background(), types.CtxKeyOperation, operation) // nolint
payload := change.Payload{Original: original, Modified: modified}
pathToSet := path
schema := d.SBI().Schema()
return d.Transport().Set(ctx, payload, pathToSet, schema)
Malte Bauch
committed
ch := NewChange(duid, d.GetModel(), validatedCpy, callback)
if err := pnd.changes.Add(ch); err != nil {
return uuid.Nil, err
}
Malte Bauch
committed
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
func (pnd *pndImplementation) SubscribePath(uuid uuid.UUID, subList *ppb.SubscriptionList) error {
d, err := pnd.deviceService.Get(store.Query{
ID: uuid,
})
if err != nil {
return err
}
mode, err := mapModeToAristaFork(subList.GetMode())
if err != nil {
return err
}
for _, sub := range subList.Subscription {
streamMode, err := mapStreamModeToAristaFork(sub.GetStreamMode())
if err != nil {
return err
}
opts := &gnmi.SubscribeOptions{
Mode: mode,
StreamMode: streamMode,
Paths: [][]string{splitStringPath(sub.GetPath())},
SampleInterval: sub.SampleInterval,
}
ctx := context.Background()
ctx = context.WithValue(ctx, types.CtxKeyOpts, opts)
if err = d.Transport().Subscribe(ctx); err != nil {
return err
}
}
return nil
}
func splitStringPath(s string) []string {
return strings.Split(s, "/")
}
func mapStreamModeToAristaFork(mode ppb.StreamMode) (string, error) {
switch mode {
case ppb.StreamMode_STREAM_MODE_TARGET_DEFINED:
return "target_defined", nil
case ppb.StreamMode_STREAM_MODE_ON_CHANGE:
return "on_change", nil
case ppb.StreamMode_STREAM_MODE_SAMPLE:
return "sample", nil
default:
return "", fmt.Errorf("StreamMode of type: %T is not supported", mode)
}
}
func mapModeToAristaFork(mode ppb.SubscriptionMode) (string, error) {
switch mode {
case ppb.SubscriptionMode_SUBSCRIPTION_MODE_STREAM:
return "stream", nil
case ppb.SubscriptionMode_SUBSCRIPTION_MODE_ONCE:
return "once", nil
case ppb.SubscriptionMode_SUBSCRIPTION_MODE_POLL:
return "poll", nil
default:
return "", fmt.Errorf("SubscriptionMode of type: %T is not supported", mode)
}
}
Malte Bauch
committed
// handleRollbackError will be implemented in the near future
func handleRollbackError(id uuid.UUID, err error) {
log.Error(err)
// TODO: Notion of invalid state needed.
}
Shrey Garg
committed
func (pnd *pndImplementation) handleCsbiDeletion(d device.Device) error {
log.Infof("csbi deletion triggered for %v", d.ID().String())
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
req := &cpb.DeleteRequest{
Timestamp: time.Now().UnixNano(),
Shrey Garg
committed
Did: []string{d.ID().String()},
}
resp, err := pnd.csbiClient.Delete(ctx, req)
if err != nil {
return err
}
Shrey Garg
committed
err = pnd.southboundService.Delete(d.SBI())
if err != nil {
return err
}
Shrey Garg
committed
"uuid": d.ID().String(),
"status": resp.Status,
}).Info("csbi deleted")
return nil
}
func (pnd *pndImplementation) handleCsbiEnrolment(name string, opt *tpb.TransportOption) (uuid.UUID, error) {
g := new(errgroup.Group)
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5)
defer cancel()
req := &cpb.CreateRequest{
Timestamp: time.Now().UnixNano(),
TransportOption: []*tpb.TransportOption{opt},
}
resp, err := pnd.csbiClient.Create(ctx, req)
if err != nil {
return uuid.Nil, err
// the slice only contains one deployment
var devID uuid.UUID
dCopy := d
g.Go(func() error {
devID, err = pnd.createCsbiDevice(ctx, name, dCopy, opt)
if err != nil {
return err
}
return nil
})
}
err = g.Wait()
if err != nil {
return uuid.Nil, err
return devID, nil
// createCsbiDevice is a helper method for cSBI device creation. The method
// waits for a SYN (which indicates that the cSBI is running and addressable)
// of the commissioned cSBI and creates the device within the controller.
func (pnd *pndImplementation) createCsbiDevice(
ctx context.Context,
name string,
d *cpb.Deployment,
opt *tpb.TransportOption,
) (uuid.UUID, error) {
id, err := uuid.Parse(d.Id)
if err != nil {
return uuid.Nil, err
ch := make(chan device.Details, 1)
defer pnd.callback(id, nil)
defer close(ch)
defer tickatus.Stop()
select {
case <-tickatus.C:
log.WithFields(log.Fields{
"id": d.Id,
"err": ctx.Err(),
}).Error("csbi handshake timed out")
case deviceDetails := <-ch:
log.Infof("syn from csbi %v", deviceDetails.ID)
id, err := uuid.Parse(deviceDetails.ID)
if err != nil {
return uuid.Nil, err
csbiTransportOptions := &tpb.TransportOption{
Address: deviceDetails.Address,
Username: opt.Username,
Password: opt.Password,
Tls: opt.Tls,
Type: opt.Type,
TransportOption: opt.TransportOption,
}
log.WithField("transport option", csbiTransportOptions).Debug("gosdn gnmi transport options")
files := []string{util.GoStructName, util.ManifestFileName, util.GoStructAdditionsName}
csbiID := uuid.New()
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*10)
defer cancel()
g := new(errgroup.Group)
for _, f := range files {
req := &cpb.GetPayloadRequest{
Timestamp: time.Now().UnixNano(),
Did: d.Id,
File: f,
}
g.Go(func() error {
gClient, err := pnd.csbiClient.GetFile(ctx, req)
if err != nil {
return err
}
err = saveStreamToFile(gClient, req.GetFile(), csbiID)
if err != nil {
return err
}
return nil
})
}
err = g.Wait()
if err != nil {
return uuid.Nil, err
}
csbi, err := NewSBI(spb.Type_TYPE_CONTAINERISED, csbiID)
if err != nil {
return uuid.Nil, err
}
err = pnd.southboundService.Add(csbi)
if err != nil {
return uuid.Nil, err
}
d, err := NewDevice(name, uuid.Nil, csbiTransportOptions, csbi)
if err != nil {
return uuid.Nil, err
}
d.(*CsbiDevice).UUID = id
ch <- device.Details{TransportOption: opt}
if err := pnd.deviceService.Add(d); err != nil {
return uuid.Nil, err
return uuid.Nil, nil
// requestPlugin is a feature for cSBIs and sends a plugin request to the cSBI
// orchestrator and processes the received ygot generated go code, builds the
// plugin and integrates the Plugin within the goSDN as SouthboundInterface.
// The generated code is passed into a gostructs.go file, which is the
// foundation for the created plugin.
func (pnd *pndImplementation) requestPlugin(name string, opt *tpb.TransportOption) (southbound.SouthboundInterface, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*10)
defer cancel()
cReq := &cpb.CreateRequest{
Timestamp: time.Now().UnixNano(),
TransportOption: []*tpb.TransportOption{opt},
}
resp, err := pnd.csbiClient.CreateGoStruct(ctx, cReq)
files := []string{util.GoStructName, util.ManifestFileName, util.GoStructAdditionsName}
g := new(errgroup.Group)
// we only request one plugin
for _, dep := range resp.GetDeployments() {
id := uuid.New()
for _, f := range files {
req := &cpb.GetPayloadRequest{
Timestamp: time.Now().UnixNano(),
Did: dep.GetId(),
File: f,
}
g.Go(func() error {
gClient, err := pnd.csbiClient.GetFile(ctx, req)
if err != nil {
return err
}
err = saveStreamToFile(gClient, req.GetFile(), id)
if err != nil {
return err
}
return nil
})
}
if err != nil {
return nil, err
}
sbi, err := NewSBI(spb.Type_TYPE_PLUGIN, id)
if err != nil {
return nil, err
}
err = pnd.southboundService.Add(sbi)
if err != nil {
return nil, err
}
return sbi, nil
}
return nil, fmt.Errorf("requestPlugin: received deployment slice was empty")
}
// StreamClient allows to distinguish between the different ygot
// generated GoStruct clients, which return a stream of bytes.
type StreamClient interface {
Recv() (*cpb.Payload, error)
grpc.ClientStream
}
// saveStreamToFile takes a GenericGoStructClient and processes the included
// gRPC stream. A 'gostructs.go' file is created within the goSDN's
// 'plugin-folder'. Each 'gostructs.go' file is stored in its own folder based
// on a new uuid.UUID.
func saveStreamToFile[T StreamClient](sc T, filename string, id uuid.UUID) (err error) {
folderName := viper.GetString("plugin-folder")
path := filepath.Join(folderName, id.String(), filename)
// clean path to prevent attackers to get access to to directories elsewhere on the system
path = filepath.Clean(path)
if !strings.HasPrefix(path, folderName) {
Fabian Seidl
committed
return &customerrs.InvalidParametersError{
Func: saveStreamToFile[T],
Param: path,
}
}
// create the directory hierarchy based on the path
if err := os.MkdirAll(filepath.Dir(path), 0770); err != nil {
}
// create the gostructs.go file at path
f, err := os.Create(path)
if err != nil {
}
if ferr := f.Close(); ferr != nil {
fErrString := ferr.Error()
err = fmt.Errorf("InternalError=%w error closing file:%+s", err, fErrString)
// receive byte stream
payload, err := sc.Recv()
Fabian Seidl
committed
if errors.Is(err, io.EOF) {
closeErr := sc.CloseSend()
return closeErr
closeErr := sc.CloseSend()
return closeErr
}
log.WithField("n", n).Trace("wrote bytes")
}
if err := f.Sync(); err != nil {
// MarshalBSON implements the MarshalBSON interface to store a device as BSON.
func (pnd *pndImplementation) MarshalBSON() ([]byte, error) {
return bson.Marshal(&struct {
ID string `bson:"_id"`
Name string `bson:"name"`
Description string `bson:"description"`
}{
ID: pnd.Id.String(),
Name: pnd.Name,
Description: pnd.Description,
})
// UpdateONDAfterSubscribeResponse takes a device and forwards it to the device service to handle the update.
func (pnd *pndImplementation) UpdateDeviceAfterSubscribeResponse(device device.Device) error {
if err := pnd.deviceService.Update(device); err != nil {
return err
}
return nil
}