Newer
Older
"fmt"
"path/filepath"
"code.fbi.h-da.de/danet/gosdn/controller/metrics"
"code.fbi.h-da.de/danet/gosdn/controller/nucleus/types"
"golang.org/x/sync/errgroup"
cpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/csbi"
ppb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/pnd"
spb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/southbound"
tpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/transport"
"google.golang.org/grpc"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/change"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/device"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkdomain"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/southbound"
si "code.fbi.h-da.de/danet/gosdn/controller/interfaces/store"
"code.fbi.h-da.de/danet/gosdn/controller/nucleus/errors"
"code.fbi.h-da.de/danet/gosdn/controller/store"
"github.com/openconfig/ygot/ygot"
"github.com/openconfig/ygot/ytypes"
"github.com/spf13/viper"
// NewPND creates a Principal Network Domain
func NewPND(name, description string, id uuid.UUID, sbi southbound.SouthboundInterface, c cpb.CsbiServiceClient, callback func(uuid.UUID, chan store.DeviceDetails)) (networkdomain.NetworkDomain, error) {
Name: name,
Description: description,
sbic: store.NewSbiStore(id),
devices: store.NewDeviceStore(id),
if err := pnd.loadStoredSbis(); err != nil {
// If the SBI is not provided, then do not add a SBI to the store
if sbi != nil {
if !pnd.sbic.Exists(sbi.ID()) {
if err := pnd.sbic.Add(sbi); err != nil {
return nil, err
}
}
}
if err := pnd.loadStoredDevices(); err != nil {
return nil, err
}
type pndImplementation struct {
Name string `json:"name,omitempty"`
Description string `json:"description,omitempty"`
sbic *store.SbiStore
devices *store.DeviceStore
changes *store.ChangeStore
//nolint
Id uuid.UUID `json:"id,omitempty"`
callback func(uuid.UUID, chan store.DeviceDetails)
func (pnd *pndImplementation) PendingChanges() []uuid.UUID {
func (pnd *pndImplementation) CommittedChanges() []uuid.UUID {
// ConfirmedChanges returns the UUIDs that the PND's change store reports via
// its Confirmed method.
func (pnd *pndImplementation) ConfirmedChanges() []uuid.UUID {
	confirmed := pnd.changes.Confirmed()
	return confirmed
}
func (pnd *pndImplementation) GetChange(cuid uuid.UUID) (change.Change, error) {
return pnd.changes.GetChange(cuid)
func (pnd *pndImplementation) Commit(u uuid.UUID) error {
if err != nil {
return err
}
func (pnd *pndImplementation) Confirm(u uuid.UUID) error {
if err != nil {
return err
}
func (pnd *pndImplementation) ID() uuid.UUID {
return pnd.Id
// Devices returns the UUIDs that the PND's device store reports via its UUIDs
// method.
func (pnd *pndImplementation) Devices() []uuid.UUID {
	ids := pnd.devices.UUIDs()
	return ids
}
// GetName returns the name of the PND
return pnd.Name
// ContainsDevice checks if the given device uuid is registered for this PND
func (pnd *pndImplementation) ContainsDevice(id uuid.UUID) bool {
// GetDescription returns the current description of the PND
func (pnd *pndImplementation) GetDescription() string {
return pnd.Description
// GetSBIs returns the registered SBIs
func (pnd *pndImplementation) GetSBIs() si.Store {
return pnd.sbic
// Destroy destroys the PND by delegating to the package-level destroy helper
// and passes that helper's error through to the caller.
func (pnd *pndImplementation) Destroy() error {
	if err := destroy(); err != nil {
		return err
	}
	return nil
}
// AddSbi adds a SBI to the PND which will be supported
func (pnd *pndImplementation) AddSbi(s southbound.SouthboundInterface) error {
// AddSbiFromStore builds a SBI from the given ID, type name and path and
// registers it in the PND's SBI store.
//
// When sbiType maps to spb.Type_TYPE_OPENCONFIG a standard OpenConfig SBI is
// created; every other type name results in a SouthboundPlugin loaded from
// path. Any construction error is returned before the store is touched.
func (pnd *pndImplementation) AddSbiFromStore(id uuid.UUID, sbiType string, path string) error {
	var (
		created southbound.SouthboundInterface
		err     error
	)

	switch spb.Type_value[sbiType] {
	case int32(spb.Type_TYPE_OPENCONFIG):
		created, err = NewSBI(spb.Type_TYPE_OPENCONFIG, id)
	default:
		created, err = NewSouthboundPlugin(id, path, false)
	}
	if err != nil {
		return err
	}

	return pnd.addSbi(created)
}
// RemoveSbi removes a SBI from the PND
// TODO: this should go recursively through
// devices and remove the devices using
// this SBI
func (pnd *pndImplementation) RemoveSbi(id uuid.UUID) error {
associatedDevices, err := pnd.devices.GetDevicesAssociatedWithSbi(id)
if err != nil {
return err
}
// range over associated devices and remove each one of them
for _, d := range associatedDevices {
if err := pnd.removeDevice(d.ID()); err != nil {
return err
}
}
//AddDevice adds a new device to the PND
func (pnd *pndImplementation) AddDevice(name string, opt *tpb.TransportOption, sid uuid.UUID) error {
labels := prometheus.Labels{"type": opt.Type.String()}
start := metrics.StartHook(labels, deviceCreationsTotal)
defer metrics.FinishHook(labels, start, deviceCreationDurationSecondsTotal, deviceCreationDurationSeconds)
var sbi southbound.SouthboundInterface
switch t := opt.Type; t {
var err error
sbi, err = pnd.requestPlugin(name, opt)
if err != nil {
return err
}
default:
var err error
sbi, err = pnd.sbic.GetSBI(sid)
if err != nil {
return err
}
Martin Stiemerling
committed
d, err := NewDevice(name, uuid.Nil, opt, sbi)
//AddDeviceFromStore adds a new device to the PND
func (pnd *pndImplementation) AddDeviceFromStore(name string, deviceUUID uuid.UUID, opt *tpb.TransportOption, sid uuid.UUID) error {
return pnd.handleCsbiEnrolment(name, opt)
}
sbi, err := pnd.sbic.GetSBI(sid)
if err != nil {
return err
}
Martin Stiemerling
committed
d, err := NewDevice(name, deviceUUID, opt, sbi)
if err != nil {
return err
}
return pnd.addDevice(d)
}
func (pnd *pndImplementation) GetDevice(identifier string) (device.Device, error) {
d, err := pnd.devices.GetDevice(store.FromString(identifier))
if err != nil {
return nil, err
}
copiedDevice := &CommonDevice{name: d.Name(), UUID: d.ID(), GoStruct: copiedGoStruct, sbi: d.SBI()}
return copiedDevice, nil
// RemoveDevice removes a device from the PND
func (pnd *pndImplementation) RemoveDevice(uuid uuid.UUID) error {
// destroy is the actual implementation behind Destroy; bind it to the struct
// if necessary. It currently performs no work and always reports success.
func destroy() error {
	var err error
	return err
}
// addSbi adds a SBI to the PND's SBI store.
func (pnd *pndImplementation) addSbi(sbi southbound.SouthboundInterface) error {
// removeSbi removes an SBI based on the given ID from the PND's SBI store.
func (pnd *pndImplementation) removeSbi(id uuid.UUID) error {
// addDevice adds a device to the PND's device store.
func (pnd *pndImplementation) addDevice(device device.Device) error {
if err != nil {
return err
}
func (pnd *pndImplementation) removeDevice(id uuid.UUID) error {
d, err := pnd.devices.GetDevice(id)
if err != nil {
return err
}
labels := prometheus.Labels{"type": d.SBI().Type().String()}
start := metrics.StartHook(labels, deviceDeletionsTotal)
defer metrics.FinishHook(labels, start, deviceDeletionDurationSecondsTotal, deviceDeletionDurationSeconds)
switch d.(type) {
case *CsbiDevice:
return pnd.handleCsbiDeletion(id)
default:
return pnd.devices.Delete(id)
}
func (pnd *pndImplementation) MarshalDevice(identifier string) (string, error) {
foundDevice, err := pnd.devices.GetDevice(store.FromString(identifier))
if err != nil {
return "", err
}
jsonTree, err := json.MarshalIndent(foundDevice.Model(), "", "\t")
if err != nil {
return "", err
}
"pnd": pnd.Id,
"Identifier": identifier,
"Name": foundDevice.Name,
return string(jsonTree), nil
// Request sends a get request to a specific device
func (pnd *pndImplementation) Request(uuid uuid.UUID, path string) (proto.Message, error) {
d, err := pnd.devices.GetDevice(store.FromString(uuid.String()))
resp, ok := res.(proto.Message)
if !ok {
return nil, &errors.ErrInvalidTypeAssertion{
Value: res,
Type: (*proto.Message)(nil),
}
}
err = d.ProcessResponse(resp)
// RequestAll sends a request for all registered devices
func (pnd *pndImplementation) RequestAll(path string) error {
for _, k := range pnd.devices.UUIDs() {
_, err := pnd.Request(k, path)
if err != nil {
"pnd": pnd.Id,
"path": path,
}).Info("sent request to all devices")
// ChangeOND creates a change from the provided Operation, path and value.
// The Change is Pending and times out after the specified timeout period
func (pnd *pndImplementation) ChangeOND(duid uuid.UUID, operation ppb.ApiOperation, path string, value ...string) (uuid.UUID, error) {
d, err := pnd.devices.GetDevice(duid)
p, err := ygot.StringToStructuredPath(path)
if err != nil {
if operation != ppb.ApiOperation_API_OPERATION_DELETE && len(value) != 1 {
return uuid.Nil, &errors.ErrInvalidParameters{
case ppb.ApiOperation_API_OPERATION_UPDATE, ppb.ApiOperation_API_OPERATION_REPLACE:
if err := ytypes.SetNode(d.SBI().Schema().RootSchema(), cpy, p, typedValue); err != nil {
if err := ytypes.DeleteNode(d.SBI().Schema().RootSchema(), cpy, p); err != nil {
return uuid.Nil, &errors.ErrOperationNotSupported{Op: operation}
callback := func(original ygot.GoStruct, modified ygot.GoStruct) error {
ctx := context.WithValue(context.Background(), types.CtxKeyOperation, operation) // nolint
payload := change.Payload{Original: original, Modified: modified}
return d.Transport().Set(ctx, payload)
ch := NewChange(duid, d.Model(), cpy, callback)
if err := pnd.changes.Add(ch); err != nil {
return uuid.Nil, err
}
return ch.cuid, nil
// handleRollbackError logs the given error at error level. The id parameter
// is accepted but not used yet.
// nolint will be implemented in the near future
func handleRollbackError(id uuid.UUID, err error) {
log.Error(err)
// TODO: Notion of invalid state needed.
}
// handleCsbiDeletion sends a delete request for the given ID to the cSBI
// client. The call is bounded by a one minute timeout. A failed call is
// returned to the caller; a successful one is logged together with the status
// carried in the response.
func (pnd *pndImplementation) handleCsbiDeletion(id uuid.UUID) error {
	log.Infof("csbi deletion triggered for %v", id)

	deleteCtx, stop := context.WithTimeout(context.Background(), time.Minute)
	defer stop()

	reply, err := pnd.csbiClient.Delete(deleteCtx, &cpb.DeleteRequest{
		Timestamp: time.Now().UnixNano(),
		Did:       []string{id.String()},
	})
	if err != nil {
		return err
	}

	fields := log.Fields{
		"uuid":   id,
		"status": reply.Status,
	}
	log.WithFields(fields).Info("csbi deleted")

	return nil
}
func (pnd *pndImplementation) handleCsbiEnrolment(name string, opt *tpb.TransportOption) error {
g := new(errgroup.Group)
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5)
defer cancel()
req := &cpb.CreateRequest{
Timestamp: time.Now().UnixNano(),
TransportOption: []*tpb.TransportOption{opt},
}
resp, err := pnd.csbiClient.Create(ctx, req)
if err != nil {
return err
}
for _, d := range resp.Deployments {
dCopy := d
g.Go(func() error {
return pnd.createCsbiDevice(ctx, name, dCopy, opt)
})
}
err = g.Wait()
if err != nil {
return err
// createCsbiDevice is a helper method for cSBI device creation. The method
// waits for a SYN (which indicates that the cSBI is running and addressable)
// of the commissioned cSBI and creates the device within the controller.
func (pnd *pndImplementation) createCsbiDevice(ctx context.Context, name string, d *cpb.Deployment, opt *tpb.TransportOption) error {
id, err := uuid.Parse(d.Id)
if err != nil {
return err
}
defer pnd.callback(id, nil)
defer close(ch)
defer tickatus.Stop()
select {
case <-tickatus.C:
log.WithFields(log.Fields{
"id": d.Id,
"err": ctx.Err(),
}).Error("csbi handshake timed out")
case deviceDetails := <-ch:
log.Infof("syn from csbi %v", deviceDetails.ID)
id, err := uuid.Parse(deviceDetails.ID)
if err != nil {
return err
csbiTransportOptions := &tpb.TransportOption{
Address: deviceDetails.Address,
Username: opt.Username,
Password: opt.Password,
Tls: opt.Tls,
Type: opt.Type,
TransportOption: opt.TransportOption,
}
log.WithField("transport option", csbiTransportOptions).Debug("gosdn gnmi transport options")
req := &cpb.GetPayloadRequest{
Timestamp: time.Now().UnixNano(),
Did: d.Id,
}
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*10)
defer cancel()
// TODO: this is currently just a workaround needs major adjustments
// here and in csbi.
gClient, err := pnd.csbiClient.GetGoStruct(ctx, req)
if err != nil {
return err
}
csbiID, err := saveGenericClientStreamToFile(gClient, "gostructs.go", uuid.New())
if err != nil {
return err
}
// TODO: this is currently just a workaround needs major adjustments
// here and in csbi.
mClient, err := pnd.csbiClient.GetManifest(ctx, req)
if err != nil {
return err
}
_, err = saveGenericClientStreamToFile(mClient, "plugin.yml", csbiID)
if err != nil {
return err
}
csbi, err := NewSBI(spb.Type_TYPE_CONTAINERISED, csbiID)
if err != nil {
return err
}
err = pnd.sbic.Add(csbi)
if err != nil {
return err
}
d, err := NewDevice(name, uuid.Nil, csbiTransportOptions, csbi)
if err != nil {
return err
}
d.(*CsbiDevice).UUID = id
ch <- store.DeviceDetails{TransportOption: opt}
if err := pnd.devices.Add(d, d.Name()); err != nil {
return err
}
}
// requestPlugin is a feature for cSBIs and sends a plugin request to the cSBI
// orchestrator and processes the received ygot generated go code, builds the
// plugin and integrates the Plugin within the goSDN as SouthboundInterface.
// The generated code is passed into a gostructs.go file, which is the
// foundation for the created plugin.
//
// Only the first deployment of the response is used, since only one plugin is
// requested. Its Go structs and manifest are streamed into "gostructs.go" and
// "plugin.yml" under a freshly generated UUID, a plugin SBI with that UUID is
// created and added to the PND's SBI store, and that SBI is returned. All
// calls share one context with a ten minute timeout. The name parameter is
// currently unused.
//
// An error is returned if any request, file write or store operation fails,
// or if the response contains no deployment.
func (pnd *pndImplementation) requestPlugin(name string, opt *tpb.TransportOption) (southbound.SouthboundInterface, error) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Minute*10)
	defer cancel()

	cReq := &cpb.CreateRequest{
		Timestamp:       time.Now().UnixNano(),
		TransportOption: []*tpb.TransportOption{opt},
	}
	resp, err := pnd.csbiClient.CreateGoStruct(ctx, cReq)
	if err != nil {
		return nil, err
	}

	// we only request one plugin
	for _, dep := range resp.GetDeployments() {
		gReq := &cpb.GetPayloadRequest{
			Timestamp: time.Now().UnixNano(),
			Did:       dep.GetId(),
		}

		// TODO: this is currently just a workaround needs major adjustments
		// here and in csbi.
		gClient, err := pnd.csbiClient.GetGoStruct(ctx, gReq)
		if err != nil {
			return nil, err
		}
		id, err := saveGenericClientStreamToFile(gClient, "gostructs.go", uuid.New())
		if err != nil {
			return nil, err
		}

		// TODO: this is currently just a workaround needs major adjustments
		// here and in csbi.
		mClient, err := pnd.csbiClient.GetManifest(ctx, gReq)
		if err != nil {
			return nil, err
		}
		// The manifest is stored next to the Go structs, under the same ID.
		if _, err = saveGenericClientStreamToFile(mClient, "plugin.yml", id); err != nil {
			return nil, err
		}

		sbi, err := NewSBI(spb.Type_TYPE_PLUGIN, id)
		if err != nil {
			return nil, err
		}
		if err = pnd.sbic.Add(sbi); err != nil {
			return nil, err
		}

		return sbi, nil
	}

	return nil, fmt.Errorf("requestPlugin: received deployment slice was empty.")
}
// GenericGrpcClient allows to distinguish between the different ygot
// generated GoStruct clients, which return a stream of bytes.
type GenericGrpcClient interface {
Recv() (*cpb.Payload, error)
grpc.ClientStream
}
// saveGenericClientStreamToFile takes a GenericGoStructClient and processes the included
// gRPC stream. A 'gostructs.go' file is created within the goSDN's
// 'plugin-folder'. Each 'gostructs.go' file is stored in its own folder based
// on a new uuid.UUID.
func saveGenericClientStreamToFile(t GenericGrpcClient, filename string, id uuid.UUID) (uuid.UUID, error) {
folderName := viper.GetString("plugin-folder")
path := filepath.Join(folderName, id.String(), filename)
// create the directory hierarchy based on the path
if err := os.MkdirAll(filepath.Dir(path), 0770); err != nil {
return uuid.Nil, err
}
// create the gostructs.go file at path
f, err := os.Create(path)
if err != nil {
return uuid.Nil, err
}
// receive byte stream
payload, err := t.Recv()
t.CloseSend()
return uuid.Nil, err
t.CloseSend()
return uuid.Nil, err
}
log.WithField("n", n).Trace("wrote bytes")
}
if err := f.Sync(); err != nil {
return uuid.Nil, err
return id, nil
// loadStoredSbis loads all stored SBIs and add each one of them to the PND's
// SBI store.
func (pnd *pndImplementation) loadStoredSbis() error {
sbis, err := pnd.sbic.Load()
return err
for _, sbi := range sbis {
err := pnd.AddSbiFromStore(sbi.ID, sbi.Type, sbi.Path)
if err != nil {
return err
return nil
// loadStoredDevices reads every persisted device from the PND's device store
// and re-registers it through AddDeviceFromStore.
//
// The transport option for each device is rebuilt from the stored address,
// username and password with an empty gNMI transport option. The first error,
// from loading or from adding a device, aborts the run and is returned.
func (pnd *pndImplementation) loadStoredDevices() error {
	loaded, err := pnd.devices.Load()
	if err != nil {
		return err
	}

	for _, stored := range loaded {
		transport := &tpb.TransportOption{
			Address:  stored.TransportAddress,
			Username: stored.TransportUsername,
			Password: stored.TransportPassword,
			TransportOption: &tpb.TransportOption_GnmiTransportOption{
				GnmiTransportOption: &tpb.GnmiTransportOption{},
			},
		}

		if err := pnd.AddDeviceFromStore(stored.Name, stored.DeviceID, transport, stored.SBI); err != nil {
			return err
		}
	}

	return nil
}