Newer
Older
"fmt"
"path/filepath"
"code.fbi.h-da.de/danet/gosdn/controller/metrics"
"code.fbi.h-da.de/danet/gosdn/controller/nucleus/types"
"go.mongodb.org/mongo-driver/bson"
"golang.org/x/sync/errgroup"
cpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/csbi"
ppb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/pnd"
spb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/southbound"
tpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/transport"
"google.golang.org/grpc"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/change"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/device"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkdomain"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/southbound"
"code.fbi.h-da.de/danet/gosdn/controller/nucleus/errors"
"code.fbi.h-da.de/danet/gosdn/controller/store"
"github.com/openconfig/ygot/ygot"
"github.com/openconfig/ygot/ytypes"
"github.com/spf13/viper"
// NOTE: Until we've added database support for changes, we will hold
// changeStores in memory for now.
var changeStoreMap = make(map[uuid.UUID]*store.ChangeStore)
// NewPND creates a Principle Network Domain
func NewPND(
name string,
description string,
id uuid.UUID,
c cpb.CsbiServiceClient,
callback func(uuid.UUID, chan device.Details),
) (networkdomain.NetworkDomain, error) {
sbiStore := NewSbiStore(id)
deviceStore := NewDeviceStore(id)
changeStore := store.NewChangeStore()
sbiService := NewSbiService(sbiStore)
deviceService := NewDeviceService(
deviceStore,
sbiService,
)
changeStore, ok := changeStoreMap[id]
if !ok {
changeStore = store.NewChangeStore()
changeStoreMap[id] = changeStore
}
Name: name,
Description: description,
sbic: sbiService,
devices: deviceService,
changes: changeStore,
existingSBIs, err := sbiStore.GetAll()
if err != nil {
if len(existingSBIs) == 0 {
newSBI, _ := NewSBI(spb.Type_TYPE_OPENCONFIG)
err = pnd.sbic.Add(newSBI)
if err != nil {
return nil, err
}
type pndImplementation struct {
Name string `json:"name,omitempty"`
Description string `json:"description,omitempty"`
sbic southbound.Service
devices device.Service
//nolint
Id uuid.UUID `json:"id,omitempty"`
callback func(uuid.UUID, chan device.Details)
func (pnd *pndImplementation) PendingChanges() []uuid.UUID {
func (pnd *pndImplementation) CommittedChanges() []uuid.UUID {
func (pnd *pndImplementation) ConfirmedChanges() []uuid.UUID {
return pnd.changes.Confirmed()
}
func (pnd *pndImplementation) GetChange(cuid uuid.UUID) (change.Change, error) {
return pnd.changes.GetChange(cuid)
func (pnd *pndImplementation) Commit(u uuid.UUID) error {
if err != nil {
return err
}
func (pnd *pndImplementation) Confirm(u uuid.UUID) error {
if err != nil {
return err
}
func (pnd *pndImplementation) ID() uuid.UUID {
return pnd.Id
func (pnd *pndImplementation) Devices() []device.Device {
allDevices, _ := pnd.devices.GetAll()
return allDevices
// GetName returns the name of the PND
return pnd.Name
// GetDescription returns the current description of the PND
func (pnd *pndImplementation) GetDescription() string {
return pnd.Description
// GetSBIs returns the registered SBIs
func (pnd *pndImplementation) GetSBIs() ([]southbound.SouthboundInterface, error) {
sbis, err := pnd.sbic.GetAll()
if err != nil {
return nil, err
}
return sbis, nil
}
// GetSBIs returns the registered SBIs
func (pnd *pndImplementation) GetSBI(sbiUUID uuid.UUID) (southbound.SouthboundInterface, error) {
sbis, err := pnd.sbic.Get(store.Query{ID: sbiUUID})
if err != nil {
return nil, err
}
return sbis, nil
// Destroy destroys the PND
func (pnd *pndImplementation) Destroy() error {
return destroy()
}
// AddSbi adds a SBI to the PND which will be supported
func (pnd *pndImplementation) AddSbi(s southbound.SouthboundInterface) error {
// RemoveSbi removes a SBI from the PND
// devices and remove the devices using this SBI
func (pnd *pndImplementation) RemoveSbi(sid uuid.UUID) error {
var associatedDevices []device.Device
allExistingDevices, err := pnd.devices.GetAll()
if err != nil {
return err
}
// range over all storable items within the device store
for _, device := range allExistingDevices {
// check if the device uses the provided SBI and add it to the devices
// slice.
if device.SBI().ID() == sid {
associatedDevices = append(associatedDevices, device)
}
}
// range over associated devices and remove each one of them
for _, d := range associatedDevices {
if err := pnd.removeDevice(d.ID()); err != nil {
return err
}
}
return pnd.removeSbi(sid)
// AddDevice adds a new device to the PND
func (pnd *pndImplementation) AddDevice(name string, opt *tpb.TransportOption, sid uuid.UUID) (uuid.UUID, error) {
labels := prometheus.Labels{"type": opt.Type.String()}
start := metrics.StartHook(labels, deviceCreationsTotal)
defer metrics.FinishHook(labels, start, deviceCreationDurationSecondsTotal, deviceCreationDurationSeconds)
var sbi southbound.SouthboundInterface
return uuid.Nil, err
sbi, err = pnd.sbic.Get(store.Query{ID: sid})
return uuid.Nil, err
Martin Stiemerling
committed
d, err := NewDevice(name, uuid.Nil, opt, sbi)
return uuid.Nil, err
func (pnd *pndImplementation) GetDevice(identifier string) (device.Device, error) {
id, err := uuid.Parse(identifier)
if err != nil {
id = uuid.Nil
d, err := pnd.devices.Get(store.Query{
ID: id,
Name: identifier,
})
if d == nil {
return nil, fmt.Errorf("no device found")
// TODO: We should investigate why we copy the device here.
copiedDevice := &CommonDevice{
name: d.Name(),
UUID: d.ID(),
Model: d.GetModel(),
sbi: d.SBI(),
}
return copiedDevice, nil
// RemoveDevice removes a device from the PND
func (pnd *pndImplementation) RemoveDevice(uuid uuid.UUID) error {
// Actual implementation, bind to struct if neccessary
func destroy() error {
return nil
}
// addSbi adds a SBI to the PND's SBI store.
func (pnd *pndImplementation) addSbi(sbi southbound.SouthboundInterface) error {
// removeSbi removes an SBI based on the given ID from the PND's SBI store.
func (pnd *pndImplementation) removeSbi(id uuid.UUID) error {
sbi, err := pnd.sbic.Get(store.Query{ID: id})
if sbi == nil {
return fmt.Errorf("no sbi found")
}
if err != nil {
return err
}
return pnd.sbic.Delete(sbi)
// addDevice adds a device to the PND's device store.
func (pnd *pndImplementation) addDevice(device device.Device) (uuid.UUID, error) {
err := pnd.devices.Add(device)
return uuid.Nil, err
return device.ID(), nil
func (pnd *pndImplementation) removeDevice(id uuid.UUID) error {
d, err := pnd.devices.Get(store.Query{
ID: id,
Name: id.String(),
})
if d == nil {
return fmt.Errorf("no device found")
}
labels := prometheus.Labels{"type": d.SBI().Type().String()}
start := metrics.StartHook(labels, deviceDeletionsTotal)
defer metrics.FinishHook(labels, start, deviceDeletionDurationSecondsTotal, deviceDeletionDurationSeconds)
switch d.(type) {
case *CsbiDevice:
return pnd.handleCsbiDeletion(id)
default:
return pnd.devices.Delete(d)
func (pnd *pndImplementation) MarshalDevice(identifier string) (string, error) {
foundDevice, err := pnd.devices.Get(store.Query{
ID: uuid.MustParse(identifier),
Name: identifier,
})
if err != nil {
return "", err
}
jsonTree, err := json.MarshalIndent(foundDevice.GetModel(), "", "\t")
if err != nil {
return "", err
}
"pnd": pnd.Id,
"Identifier": identifier,
"Name": foundDevice.Name,
return string(jsonTree), nil
// Request sends a get request to a specific device
func (pnd *pndImplementation) Request(uuid uuid.UUID, path string) (proto.Message, error) {
d, err := pnd.devices.Get(store.Query{
ID: uuid,
Name: uuid.String(),
})
if d == nil {
return nil, fmt.Errorf("no device found")
}
resp, ok := res.(proto.Message)
if !ok {
return nil, &errors.ErrInvalidTypeAssertion{
Value: res,
Type: (*proto.Message)(nil),
}
}
err = d.ProcessResponse(resp)
err = pnd.devices.Update(d)
if err != nil {
return nil, err
}
// RequestAll sends a request for all registered devices
func (pnd *pndImplementation) RequestAll(path string) error {
allDevices, err := pnd.devices.GetAll()
if err != nil {
return err
}
for _, k := range allDevices {
_, err := pnd.Request(k.ID(), path)
"pnd": pnd.Id,
"path": path,
}).Info("sent request to all devices")
// ChangeOND creates a change from the provided Operation, path and value.
// The Change is Pending and times out after the specified timeout period
func (pnd *pndImplementation) ChangeOND(duid uuid.UUID, operation ppb.ApiOperation, path string, value ...string) (uuid.UUID, error) {
d, err := pnd.devices.Get(store.Query{
ID: duid,
})
if err != nil {
return uuid.Nil, err
}
p, err := ygot.StringToStructuredPath(path)
if err != nil {
if operation != ppb.ApiOperation_API_OPERATION_DELETE && len(value) != 1 {
return uuid.Nil, &errors.ErrInvalidParameters{
case ppb.ApiOperation_API_OPERATION_UPDATE, ppb.ApiOperation_API_OPERATION_REPLACE:
if err := ytypes.SetNode(d.SBI().Schema().RootSchema(), cpy, p, typedValue); err != nil {
if err := ytypes.DeleteNode(d.SBI().Schema().RootSchema(), cpy, p); err != nil {
return uuid.Nil, &errors.ErrOperationNotSupported{Op: operation}
callback := func(original ygot.GoStruct, modified ygot.GoStruct) error {
ctx := context.WithValue(context.Background(), types.CtxKeyOperation, operation) // nolint
payload := change.Payload{Original: original, Modified: modified}
return d.Transport().Set(ctx, payload)
if err := pnd.changes.Add(ch); err != nil {
return uuid.Nil, err
}
return ch.cuid, nil
// nolint will be implemented in the near future
func handleRollbackError(id uuid.UUID, err error) {
log.Error(err)
// TODO: Notion of invalid state needed.
}
func (pnd *pndImplementation) handleCsbiDeletion(id uuid.UUID) error {
log.Infof("csbi deletion triggered for %v", id)
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
req := &cpb.DeleteRequest{
Timestamp: time.Now().UnixNano(),
Did: []string{id.String()},
}
resp, err := pnd.csbiClient.Delete(ctx, req)
if err != nil {
return err
}
log.WithFields(log.Fields{
"uuid": id,
"status": resp.Status,
}).Info("csbi deleted")
return nil
}
func (pnd *pndImplementation) handleCsbiEnrolment(name string, opt *tpb.TransportOption) (uuid.UUID, error) {
g := new(errgroup.Group)
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5)
defer cancel()
req := &cpb.CreateRequest{
Timestamp: time.Now().UnixNano(),
TransportOption: []*tpb.TransportOption{opt},
}
resp, err := pnd.csbiClient.Create(ctx, req)
if err != nil {
return uuid.Nil, err
// the slice only contains one deployment
var devID uuid.UUID
dCopy := d
g.Go(func() error {
devID, err = pnd.createCsbiDevice(ctx, name, dCopy, opt)
if err != nil {
return err
}
return nil
})
}
err = g.Wait()
if err != nil {
return uuid.Nil, err
return devID, nil
// createCsbiDevice is a helper method for cSBI device creation. The method
// waits for a SYN (which indicates that the cSBI is running and addressable)
// of the commissioned cSBI and creates the device within the controller.
func (pnd *pndImplementation) createCsbiDevice(
ctx context.Context,
name string,
d *cpb.Deployment,
opt *tpb.TransportOption,
) (uuid.UUID, error) {
id, err := uuid.Parse(d.Id)
if err != nil {
return uuid.Nil, err
ch := make(chan device.Details, 1)
defer pnd.callback(id, nil)
defer close(ch)
defer tickatus.Stop()
select {
case <-tickatus.C:
log.WithFields(log.Fields{
"id": d.Id,
"err": ctx.Err(),
}).Error("csbi handshake timed out")
case deviceDetails := <-ch:
log.Infof("syn from csbi %v", deviceDetails.ID)
id, err := uuid.Parse(deviceDetails.ID)
if err != nil {
return uuid.Nil, err
csbiTransportOptions := &tpb.TransportOption{
Address: deviceDetails.Address,
Username: opt.Username,
Password: opt.Password,
Tls: opt.Tls,
Type: opt.Type,
TransportOption: opt.TransportOption,
}
log.WithField("transport option", csbiTransportOptions).Debug("gosdn gnmi transport options")
req := &cpb.GetPayloadRequest{
Timestamp: time.Now().UnixNano(),
Did: d.Id,
}
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*10)
defer cancel()
// TODO: this is currently just a workaround needs major adjustments
// here and in csbi.
gClient, err := pnd.csbiClient.GetGoStruct(ctx, req)
if err != nil {
return uuid.Nil, err
}
csbiID, err := saveGenericClientStreamToFile(gClient, "gostructs.go", uuid.New())
if err != nil {
return uuid.Nil, err
}
// TODO: this is currently just a workaround needs major adjustments
// here and in csbi.
mClient, err := pnd.csbiClient.GetManifest(ctx, req)
if err != nil {
return uuid.Nil, err
}
_, err = saveGenericClientStreamToFile(mClient, "plugin.yml", csbiID)
if err != nil {
return uuid.Nil, err
}
csbi, err := NewSBI(spb.Type_TYPE_CONTAINERISED, csbiID)
if err != nil {
return uuid.Nil, err
}
err = pnd.sbic.Add(csbi)
if err != nil {
return uuid.Nil, err
}
d, err := NewDevice(name, uuid.Nil, csbiTransportOptions, csbi)
if err != nil {
return uuid.Nil, err
}
d.(*CsbiDevice).UUID = id
ch <- device.Details{TransportOption: opt}
if err := pnd.devices.Add(d); err != nil {
return uuid.Nil, err
return uuid.Nil, nil
// requestPlugin is a feature for cSBIs and sends a plugin request to the cSBI
// orchestrator and processes the received ygot generated go code, builds the
// plugin and integrates the Plugin within the goSDN as SouthboundInterface.
// The generated code is passed into a gostructs.go file, which is the
// foundation for the created plugin.
func (pnd *pndImplementation) requestPlugin(name string, opt *tpb.TransportOption) (southbound.SouthboundInterface, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*10)
defer cancel()
cReq := &cpb.CreateRequest{
Timestamp: time.Now().UnixNano(),
TransportOption: []*tpb.TransportOption{opt},
}
resp, err := pnd.csbiClient.CreateGoStruct(ctx, cReq)
// we only request one plugin
for _, dep := range resp.GetDeployments() {
gReq := &cpb.GetPayloadRequest{
Timestamp: time.Now().UnixNano(),
Did: dep.GetId(),
}
// TODO: this is currently just a workaround needs major adjustments
// here and in csbi.
gClient, err := pnd.csbiClient.GetGoStruct(ctx, gReq)
if err != nil {
return nil, err
}
id, err := saveGenericClientStreamToFile(gClient, "gostructs.go", uuid.New())
if err != nil {
return nil, err
}
// TODO: this is currently just a workaround needs major adjustments
// here and in csbi.
mClient, err := pnd.csbiClient.GetManifest(ctx, gReq)
if err != nil {
return nil, err
}
_, err = saveGenericClientStreamToFile(mClient, "plugin.yml", id)
if err != nil {
return nil, err
}
sbi, err := NewSBI(spb.Type_TYPE_PLUGIN, id)
if err != nil {
return nil, err
}
err = pnd.sbic.Add(sbi)
if err != nil {
return nil, err
}
return sbi, nil
}
return nil, fmt.Errorf("requestPlugin: received deployment slice was empty")
}
// GenericGrpcClient allows to distinguish between the different ygot
// generated GoStruct clients, which return a stream of bytes.
type GenericGrpcClient interface {
Recv() (*cpb.Payload, error)
grpc.ClientStream
}
// saveGenericClientStreamToFile takes a GenericGoStructClient and processes the included
// gRPC stream. A 'gostructs.go' file is created within the goSDN's
// 'plugin-folder'. Each 'gostructs.go' file is stored in its own folder based
// on a new uuid.UUID.
func saveGenericClientStreamToFile(t GenericGrpcClient, filename string, id uuid.UUID) (uuid.UUID, error) {
folderName := viper.GetString("plugin-folder")
path := filepath.Join(folderName, id.String(), filename)
// clean path to prevent attackers to get access to to directories elsewhere on the system
path = filepath.Clean(path)
if !strings.HasPrefix(path, folderName) {
return uuid.Nil, &errors.ErrInvalidParameters{
Func: saveGenericClientStreamToFile,
Param: path,
}
}
// create the directory hierarchy based on the path
if err := os.MkdirAll(filepath.Dir(path), 0770); err != nil {
return uuid.Nil, err
}
// create the gostructs.go file at path
f, err := os.Create(path)
if err != nil {
return uuid.Nil, err
}
defer func() {
if err := f.Close(); err != nil {
log.Error("error closing file: ", err)
}
}()
// receive byte stream
payload, err := t.Recv()
closeErr := t.CloseSend()
if closeErr != nil {
return uuid.Nil, closeErr
}
return uuid.Nil, err
closeErr := t.CloseSend()
if closeErr != nil {
return uuid.Nil, closeErr
}
return uuid.Nil, err
}
log.WithField("n", n).Trace("wrote bytes")
}
if err := f.Sync(); err != nil {
return uuid.Nil, err
return id, nil
// MarshalBSON implements the MarshalBSON interface to store a device as BSON
func (pnd *pndImplementation) MarshalBSON() ([]byte, error) {
return bson.Marshal(&struct {
ID string `bson:"_id"`
Name string `bson:"name"`
Description string `bson:"description"`
}{
ID: pnd.Id.String(),
Name: pnd.Name,
Description: pnd.Description,
})