Newer
Older
cpb "code.fbi.h-da.de/danet/api/go/gosdn/csbi"
ppb "code.fbi.h-da.de/danet/api/go/gosdn/pnd"
tpb "code.fbi.h-da.de/danet/api/go/gosdn/transport"
"code.fbi.h-da.de/danet/gosdn/interfaces/change"
"code.fbi.h-da.de/danet/gosdn/interfaces/device"
"code.fbi.h-da.de/danet/gosdn/interfaces/networkdomain"
"code.fbi.h-da.de/danet/gosdn/interfaces/southbound"
si "code.fbi.h-da.de/danet/gosdn/interfaces/store"
"github.com/openconfig/ygot/ygot"
"github.com/openconfig/ygot/ytypes"
// NewPND creates a Principal Network Domain
func NewPND(name, description string, id uuid.UUID, sbi southbound.SouthboundInterface, c cpb.CsbiClient, callback func(uuid.UUID, chan store.DeviceDetails)) (networkdomain.NetworkDomain, error) {
Name: name,
Description: description,
devices: store.NewDeviceStore(id),
if err := pnd.sbic.Add(sbi); err != nil {
if err := pnd.loadStoredDevices(); err != nil {
return nil, err
}
type pndImplementation struct {
Name string `json:"name,omitempty"`
Description string `json:"description,omitempty"`
sbic *store.SbiStore
devices *store.DeviceStore
changes *store.ChangeStore
//nolint
Id uuid.UUID `json:"id,omitempty"`
callback func(uuid.UUID, chan store.DeviceDetails)
func (pnd *pndImplementation) PendingChanges() []uuid.UUID {
func (pnd *pndImplementation) CommittedChanges() []uuid.UUID {
// ConfirmedChanges returns the UUIDs reported by the Confirmed method of the
// PND's change store.
func (pnd *pndImplementation) ConfirmedChanges() []uuid.UUID {
	confirmed := pnd.changes.Confirmed()
	return confirmed
}
func (pnd *pndImplementation) GetChange(cuid uuid.UUID) (change.Change, error) {
return pnd.changes.GetChange(cuid)
func (pnd *pndImplementation) Commit(u uuid.UUID) error {
if err != nil {
return err
}
func (pnd *pndImplementation) Confirm(u uuid.UUID) error {
if err != nil {
return err
}
func (pnd *pndImplementation) ID() uuid.UUID {
return pnd.Id
// Devices returns the UUIDs held by the PND's device store.
func (pnd *pndImplementation) Devices() []uuid.UUID {
	ids := pnd.devices.UUIDs()
	return ids
}
// GetName returns the name of the PND
return pnd.Name
// ContainsDevice checks if the given device uuid is registered for this PND
func (pnd *pndImplementation) ContainsDevice(id uuid.UUID) bool {
// GetDescription returns the current description of the PND
func (pnd *pndImplementation) GetDescription() string {
return pnd.Description
// GetSBIs returns the registered SBIs
func (pnd *pndImplementation) GetSBIs() si.Store {
return pnd.sbic
// Destroy destroys the PND. It delegates to the package-level destroy
// function and hands back whatever error that function reports.
func (pnd *pndImplementation) Destroy() error {
	err := destroy()
	return err
}
// AddSbi adds a SBI to the PND which will be supported
func (pnd *pndImplementation) AddSbi(s southbound.SouthboundInterface) error {
// devices and remove the devices using
// this SBI
func (pnd *pndImplementation) RemoveSbi(id uuid.UUID) error {
return pnd.removeSbi(id)
// AddDevice adds a new device to the PND
func (pnd *pndImplementation) AddDevice(name string, opt *tpb.TransportOption, sid uuid.UUID) error {
labels := prometheus.Labels{"type": opt.Type.String()}
start := metrics.StartHook(labels, deviceCreationsTotal)
defer metrics.FinishHook(labels, start, deviceCreationDurationSecondsTotal, deviceCreationDurationSeconds)
var sbi southbound.SouthboundInterface
switch t := opt.Type; t {
case spb.Type_CONTAINERISED:
case spb.Type_PLUGIN:
var err error
sbi, err = pnd.requestPlugin(name, opt)
if err != nil {
return err
}
default:
var err error
sbi, err = pnd.sbic.GetSBI(sid)
if err != nil {
return err
}
Martin Stiemerling
committed
d, err := NewDevice(name, uuid.Nil, opt, sbi)
// AddDeviceFromStore adds a device that was loaded from the store to the PND,
// creating it with the stored device UUID
func (pnd *pndImplementation) AddDeviceFromStore(name string, deviceUUID uuid.UUID, opt *tpb.TransportOption, sid uuid.UUID) error {
return pnd.handleCsbiEnrolment(name, opt)
}
sbi, err := pnd.sbic.GetSBI(sid)
if err != nil {
return err
}
Martin Stiemerling
committed
d, err := NewDevice(name, deviceUUID, opt, sbi)
if err != nil {
return err
}
return pnd.addDevice(d)
}
func (pnd *pndImplementation) GetDevice(identifier string) (device.Device, error) {
d, err := pnd.devices.GetDevice(store.FromString(identifier))
if err != nil {
return nil, err
}
copiedDevice := &CommonDevice{name: d.Name(), UUID: d.ID(), GoStruct: copiedGoStruct}
return copiedDevice, nil
// RemoveDevice removes a device from the PND
func (pnd *pndImplementation) RemoveDevice(uuid uuid.UUID) error {
// destroy is the actual implementation behind Destroy; bind it to the struct
// if necessary. It currently does no work and always returns nil.
func destroy() error {
	var err error
	return err
}
func (pnd *pndImplementation) addSbi(sbi southbound.SouthboundInterface) error {
func (pnd *pndImplementation) removeSbi(id uuid.UUID) error {
func (pnd *pndImplementation) addDevice(device device.Device) error {
if err != nil {
return err
}
func (pnd *pndImplementation) removeDevice(id uuid.UUID) error {
d, err := pnd.devices.GetDevice(id)
if err != nil {
return err
}
labels := prometheus.Labels{"type": d.SBI().SbiIdentifier()}
start := metrics.StartHook(labels, deviceDeletionsTotal)
defer metrics.FinishHook(labels, start, deviceDeletionDurationSecondsTotal, deviceDeletionDurationSeconds)
switch d.(type) {
case *CsbiDevice:
return pnd.handleCsbiDeletion(id)
default:
return pnd.devices.Delete(id)
}
func (pnd *pndImplementation) MarshalDevice(identifier string) (string, error) {
foundDevice, err := pnd.devices.GetDevice(store.FromString(identifier))
if err != nil {
return "", err
}
jsonTree, err := json.MarshalIndent(foundDevice.Model(), "", "\t")
if err != nil {
return "", err
}
"pnd": pnd.Id,
"Identifier": identifier,
"Name": foundDevice.Name,
return string(jsonTree), nil
// Request sends a get request to a specific device
func (pnd *pndImplementation) Request(uuid uuid.UUID, path string) (proto.Message, error) {
d, err := pnd.devices.GetDevice(store.FromString(uuid.String()))
resp, ok := res.(proto.Message)
if !ok {
return nil, &errors.ErrInvalidTypeAssertion{
Value: res,
Type: (*proto.Message)(nil),
}
}
err = d.ProcessResponse(resp)
// RequestAll sends a request for all registered devices
func (pnd *pndImplementation) RequestAll(path string) error {
for _, k := range pnd.devices.UUIDs() {
_, err := pnd.Request(k, path)
if err != nil {
"pnd": pnd.Id,
"path": path,
}).Info("sent request to all devices")
// ChangeOND creates a change from the provided Operation, path and value.
// The Change is Pending and times out after the specified timeout period
func (pnd *pndImplementation) ChangeOND(duid uuid.UUID, operation ppb.ApiOperation, path string, value ...string) (uuid.UUID, error) {
d, err := pnd.devices.GetDevice(duid)
p, err := ygot.StringToStructuredPath(path)
if err != nil {
if operation != ppb.ApiOperation_DELETE && len(value) != 1 {
return uuid.Nil, &errors.ErrInvalidParameters{
case ppb.ApiOperation_UPDATE, ppb.ApiOperation_REPLACE:
if err := ytypes.SetNode(d.SBI().Schema().RootSchema(), cpy, p, typedValue); err != nil {
if err := ytypes.DeleteNode(d.SBI().Schema().RootSchema(), cpy, p); err != nil {
return uuid.Nil, &errors.ErrOperationNotSupported{Op: operation}
callback := func(original ygot.GoStruct, modified ygot.GoStruct) error {
ctx := context.WithValue(context.Background(), types.CtxKeyOperation, operation) // nolint
payload := change.Payload{Original: original, Modified: modified}
return d.Transport().Set(ctx, payload)
ch := NewChange(duid, d.Model(), cpy, callback)
if err := pnd.changes.Add(ch); err != nil {
return uuid.Nil, err
}
return ch.cuid, nil
// handleRollbackError logs err at error level and attaches the identifier
// passed by the caller as the "id" field, so the log entry can be related to
// the object the rollback belonged to. It does nothing beyond logging yet.
// nolint will be implemented in the near future
func handleRollbackError(id uuid.UUID, err error) {
	log.WithField("id", id).Error(err)
	// TODO: Notion of invalid state needed.
}
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
// handleCsbiDeletion sends a delete request for the given id to the cSBI
// client and logs the status contained in the response. The call is bounded
// by a one minute timeout. An error returned by the client is passed on to
// the caller unchanged.
func (pnd *pndImplementation) handleCsbiDeletion(id uuid.UUID) error {
	log.Infof("csbi deletion triggered for %v", id)

	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()

	deletion, err := pnd.csbiClient.Delete(ctx, &cpb.DeleteRequest{
		Timestamp: time.Now().UnixNano(),
		Did:       []string{id.String()},
	})
	if err != nil {
		return err
	}

	fields := log.Fields{
		"uuid":   id,
		"status": deletion.Status,
	}
	log.WithFields(fields).Info("csbi deleted")
	return nil
}
func (pnd *pndImplementation) handleCsbiEnrolment(name string, opt *tpb.TransportOption) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5)
defer cancel()
req := &cpb.CreateRequest{
Timestamp: time.Now().UnixNano(),
TransportOption: []*tpb.TransportOption{opt},
}
resp, err := pnd.csbiClient.Create(ctx, req)
if err != nil {
return err
}
for _, d := range resp.Deployments {
log.Error(err)
}
}
return nil
}
func (pnd *pndImplementation) createCsbiDevice(ctx context.Context, name string, d *cpb.Deployment, opt *tpb.TransportOption) error {
defer func() {
if r := recover(); r != nil {
log.Errorf("recovered in sbi enrolment: %v", r)
}
}()
id, err := uuid.Parse(d.Id)
if err != nil {
return err
}
select {
case <-tickatus.C:
log.WithFields(log.Fields{
"id": d.Id,
"err": ctx.Err(),
}).Error("csbi handshake timed out")
case deviceDetails := <-ch:
log.Infof("syn from csbi %v", deviceDetails.ID)
id, err := uuid.Parse(deviceDetails.ID)
if err != nil {
panic(err)
}
csbiTransportOptions := &tpb.TransportOption{
Address: deviceDetails.Address,
Username: opt.Username,
Password: opt.Password,
Tls: opt.Tls,
Type: opt.Type,
TransportOption: opt.TransportOption,
}
log.WithField("transport option", csbiTransportOptions).Debug("gosdn gnmi transport options")
Martin Stiemerling
committed
d, err := NewDevice(name, uuid.Nil, csbiTransportOptions, csbi)
if err != nil {
panic(err)
}
d.(*CsbiDevice).UUID = id
ch <- store.DeviceDetails{TransportOption: opt}
if err := pnd.devices.Add(d, d.Name()); err != nil {
panic(err)
}
}
pnd.callback(id, nil)
close(ch)
}()
return nil
}
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
// requestPlugin asks the cSBI client to create a plugin for the given
// transport option, writes the received chunks to the file
// "plugin-<uuid>.so" in the current working directory and loads that file
// through loadPlugin. The name parameter is currently not used.
//
// The whole exchange is bounded by a ten minute timeout. If receiving,
// writing or syncing fails, the incomplete file is removed again so that no
// truncated shared object is left behind. The file is closed before it is
// handed to loadPlugin, and a failing Close is reported to the caller.
func (pnd *pndImplementation) requestPlugin(name string, opt *tpb.TransportOption) (southbound.SouthboundInterface, error) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Minute*10)
	defer cancel()
	req := &cpb.CreateRequest{
		Timestamp:       time.Now().UnixNano(),
		TransportOption: []*tpb.TransportOption{opt},
	}
	client, err := pnd.csbiClient.CreatePlugin(ctx, req)
	if err != nil {
		return nil, err
	}

	id := uuid.New()
	// Must match the file name loadPlugin derives from the same id.
	fileName := "plugin-" + id.String() + ".so"
	f, err := os.Create(fileName)
	if err != nil {
		return nil, err
	}
	// discard throws away a partially written plugin file. Errors are ignored
	// on purpose: the caller already returns the error that caused the abort.
	discard := func() {
		_ = f.Close()
		_ = os.Remove(fileName)
	}

	for {
		payload, err := client.Recv()
		if err != nil {
			if err == io.EOF {
				// The stream ended regularly, the plugin is complete.
				break
			}
			// Best effort: the receive error is the one worth reporting.
			_ = client.CloseSend()
			discard()
			return nil, err
		}
		n, err := f.Write(payload.Chunk)
		if err != nil {
			// Best effort: the write error is the one worth reporting.
			_ = client.CloseSend()
			discard()
			return nil, err
		}
		log.WithField("n", n).Trace("wrote bytes")
	}

	if err := f.Sync(); err != nil {
		discard()
		return nil, err
	}
	// Close before loading so the plugin is opened from a finished file.
	if err := f.Close(); err != nil {
		_ = os.Remove(fileName)
		return nil, err
	}
	return loadPlugin(id)
}
// loadPlugin opens the shared object "plugin-<id>.so" from the current
// working directory, looks up the exported symbol "PluginSymbol" and returns
// it as a southbound interface.
//
// Errors from opening the plugin or looking up the symbol are returned
// unchanged. If the symbol does not implement
// southbound.SouthboundInterface, an ErrInvalidTypeAssertion is returned.
func loadPlugin(id uuid.UUID) (southbound.SouthboundInterface, error) {
	pluginFile := "plugin-" + id.String() + ".so"
	opened, err := plugin.Open(pluginFile)
	if err != nil {
		return nil, err
	}

	exported, err := opened.Lookup("PluginSymbol")
	if err != nil {
		return nil, err
	}

	sbi, isSbi := exported.(southbound.SouthboundInterface)
	if !isSbi {
		return nil, &errors.ErrInvalidTypeAssertion{
			Value: exported,
			Type:  (*southbound.SouthboundInterface)(nil),
		}
	}

	details := log.Fields{
		"identifier": sbi.SbiIdentifier(),
		"id":         sbi.ID(),
		"type":       sbi.Type(),
	}
	log.WithFields(details).Trace("plugin information")
	return sbi, nil
}
// loadStoredDevices loads the entries of the PND's device store and adds each
// of them to the PND through AddDeviceFromStore. Address, username and
// password are taken from the stored entry and combined with an empty gNMI
// transport option. Processing stops at the first error, which is returned.
func (pnd *pndImplementation) loadStoredDevices() error {
	storedDevices, err := pnd.devices.Load()
	if err != nil {
		return err
	}

	for _, stored := range storedDevices {
		opt := &tpb.TransportOption{
			Address:  stored.TransportAddress,
			Username: stored.TransportUsername,
			Password: stored.TransportPassword,
			TransportOption: &tpb.TransportOption_GnmiTransportOption{
				GnmiTransportOption: &tpb.GnmiTransportOption{},
			},
		}
		if err := pnd.AddDeviceFromStore(stored.Name, stored.DeviceID, opt, stored.SBI); err != nil {
			return err
		}
	}
	return nil
}