Newer
Older
package server
import (
	"context"
	"fmt"
	"strings"
	"time"

	mnepb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/networkelement"
	ppb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/pnd"
	tpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/transport"
	"code.fbi.h-da.de/danet/gosdn/controller/config"
	"code.fbi.h-da.de/danet/gosdn/controller/conflict"
	"code.fbi.h-da.de/danet/gosdn/controller/customerrs"
	"code.fbi.h-da.de/danet/gosdn/controller/interfaces/change"
	"code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkdomain"
	"code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkelement"
	"code.fbi.h-da.de/danet/gosdn/controller/interfaces/plugin"
	"code.fbi.h-da.de/danet/gosdn/controller/metrics"
	"code.fbi.h-da.de/danet/gosdn/controller/nucleus"
	"code.fbi.h-da.de/danet/gosdn/controller/nucleus/types"
	util "code.fbi.h-da.de/danet/gosdn/controller/nucleus/util/gnmi"
	"code.fbi.h-da.de/danet/gosdn/controller/store"
	aGNMI "code.fbi.h-da.de/danet/gosdn/forks/goarista/gnmi"
	"github.com/bufbuild/protovalidate-go"
	"github.com/google/uuid"
	"github.com/hashicorp/go-multierror"
	"github.com/openconfig/gnmi/proto/gnmi"
	"github.com/openconfig/ygot/ygot"
	"github.com/prometheus/client_golang/prometheus"
	log "github.com/sirupsen/logrus"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
	"google.golang.org/protobuf/proto"
)
// NetworkElementServer implements the gRPC NetworkElementService. It embeds
// mnepb.UnimplementedNetworkElementServiceServer and holds the services and
// stores the RPC handlers in this file operate on.
type NetworkElementServer struct {
mnepb.UnimplementedNetworkElementServiceServer
// mneService is used to look up, add, update and delete network elements.
mneService networkelement.Service
// pndService is used to look up principal network domains by ID.
pndService networkdomain.Service
// pluginService hands out plugins for new network elements and stores them.
pluginService plugin.Service
// changeStore holds the pending and committed changes.
changeStore store.ChangeStore
// protoValidator validates incoming request messages.
protoValidator *protovalidate.Validator
// networkElementWatchter is used to subscribe to newly added network elements.
networkElementWatchter *nucleus.NetworkElementWatcher
}
// NewNetworkElementServer returns a new NetWorkElementServer.
func NewNetworkElementServer(
mneService networkelement.Service,
pndService networkdomain.Service,
pluginService plugin.Service,
changeStore store.ChangeStore,
protoValidator *protovalidate.Validator,
networkElementWatchter *nucleus.NetworkElementWatcher,
return &NetworkElementServer{
mneService: mneService,
pndService: pndService,
pluginService: pluginService,
changeStore: changeStore,
protoValidator: protoValidator,
networkElementWatchter: networkElementWatchter,
}
}
// Update updates a network element.
func (n *NetworkElementServer) Update(ctx context.Context, request *mnepb.UpdateNetworkElementRequest) (*mnepb.UpdateNetworkElementResponse, error) {
if err := n.protoValidator.Validate(request); err != nil {
return nil, status.Errorf(codes.Aborted, "%v", err)
}
mneID, err := uuid.Parse(request.NetworkElement.Id)
if err != nil {
return nil, err
}
err = n.mneService.UpdateModel(mneID, request.NetworkElement.Model)
if err != nil {
return nil, err
networkElement, err := n.mneService.Get(store.Query{ID: mneID})
if err != nil {
return nil, err
}
err = networkelement.EnsureIntendedConfigurationIsAppliedOnNetworkElement(networkElement)
if err != nil {
}
return &mnepb.UpdateNetworkElementResponse{
Timestamp: time.Now().UnixNano(),
}, nil
}
// GetMne gets a specific mne.
Fabian Seidl
committed
func (n *NetworkElementServer) Get(ctx context.Context, request *mnepb.GetRequest) (*mnepb.GetResponse, error) {
if err := n.protoValidator.Validate(request); err != nil {
return nil, status.Errorf(codes.Aborted, "%v", err)
}
labels := prometheus.Labels{"service": "mne", "rpc": "get"}
start := metrics.StartHook(labels, grpcRequestsTotal)
defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)
pnd, err := n.getPnd(request.Pid)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
networkElement, err := n.getMne(request.Mneid)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
// TODO(path): This needs some adjustments when we're switching towards a new
// path request handling.
mne, err := fillMneBySpecificPath(networkElement, "/", false)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
Fabian Seidl
committed
return &mnepb.GetResponse{
Timestamp: time.Now().UnixNano(),
Pnd: &ppb.PrincipalNetworkDomain{
Id: pnd.ID().String(),
Name: pnd.GetName(),
Description: pnd.GetDescription(),
},
Mne: mne,
}, nil
}
// GetFlattenedMne gets a specific mne.
Fabian Seidl
committed
func (n *NetworkElementServer) GetFlattened(ctx context.Context, request *mnepb.GetFlattenedRequest) (*mnepb.GetFlattenedResponse, error) {
if err := n.protoValidator.Validate(request); err != nil {
return nil, status.Errorf(codes.Aborted, "%v", err)
}
labels := prometheus.Labels{"service": "pnd", "rpc": "get"}
start := metrics.StartHook(labels, grpcRequestsTotal)
defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)
networkElement, err := n.getMne(request.Mneid)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
Fabian Seidl
committed
return &mnepb.GetFlattenedResponse{
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
Timestamp: time.Now().UnixNano(),
Mne: &mnepb.FlattenedManagedNetworkElement{
Id: networkElement.ID().String(),
Name: networkElement.Name(),
Pid: networkElement.PndID().String(),
Pluginid: networkElement.GetPlugin().ID().String(),
},
}, nil
}
// getMne looks up a network element by its identifier.
//
// It returns the error of the underlying service if the lookup fails, and a
// "no network element found" error if the service reports success but yields
// no element.
func (n *NetworkElementServer) getMne(identifier string) (networkelement.NetworkElement, error) {
	id, err := uuid.Parse(identifier)
	if err != nil {
		// NOTE(review): an unparsable identifier is not rejected; the lookup
		// proceeds with uuid.Nil. Confirm this fallback is intended.
		id = uuid.Nil
	}

	mne, err := n.mneService.Get(store.Query{
		ID: id,
	})
	// Check the service error first so the real cause is not hidden behind the
	// generic "not found" message below.
	if err != nil {
		return nil, err
	}
	if mne == nil {
		return nil, fmt.Errorf("no network element found")
	}

	return mne, nil
}
// getPnd resolves the principal network domain with the given identifier.
//
// A malformed identifier yields the raw parse error. A failed lookup is logged
// and reported with gRPC code Aborted.
func (n *NetworkElementServer) getPnd(identifier string) (networkdomain.NetworkDomain, error) {
	pndID, parseErr := uuid.Parse(identifier)
	if parseErr != nil {
		return nil, parseErr
	}

	domain, lookupErr := n.pndService.Get(store.Query{ID: pndID})
	if lookupErr == nil {
		return domain, nil
	}

	log.Error(lookupErr)
	return nil, status.Errorf(codes.Aborted, "%v", lookupErr)
}
// GetMneList returns a list of existing mnes.
Fabian Seidl
committed
func (n *NetworkElementServer) GetAll(ctx context.Context, request *mnepb.GetAllRequest) (*mnepb.GetAllResponse, error) {
if err := n.protoValidator.Validate(request); err != nil {
return nil, status.Errorf(codes.Aborted, "%v", err)
}
labels := prometheus.Labels{"service": "mne", "rpc": "get"}
start := metrics.StartHook(labels, grpcRequestsTotal)
defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)
pnd, err := n.getPnd(request.Pid)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
storedMNE, err := n.mneService.GetAll()
if err != nil {
return nil, err
}
mnes := make([]*mnepb.ManagedNetworkElement, len(storedMNE))
for i, networkElement := range storedMNE {
mne, err := fillMneBySpecificPath(networkElement, "/", false)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
mnes[i] = mne
}
Fabian Seidl
committed
return &mnepb.GetAllResponse{
Timestamp: time.Now().UnixNano(),
Pnd: &ppb.PrincipalNetworkDomain{
Id: pnd.ID().String(),
Name: pnd.GetName(),
Description: pnd.GetDescription(),
},
Mne: mnes,
}, nil
}
// GetFlattenedMneList returns a list of existing mnes.
Fabian Seidl
committed
func (n *NetworkElementServer) GetAllFlattened(ctx context.Context, request *mnepb.GetAllFlattenedRequest) (*mnepb.GetAllFlattenedResponse, error) {
if err := n.protoValidator.Validate(request); err != nil {
return nil, status.Errorf(codes.Aborted, "%v", err)
}
labels := prometheus.Labels{"service": "mne", "rpc": "get"}
start := metrics.StartHook(labels, grpcRequestsTotal)
defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)
pnd, err := n.getPnd(request.Pid)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
mnes, err := n.mneService.GetAllAsLoaded()
if err != nil {
return nil, err
}
flattenedMnes := make([]*mnepb.FlattenedManagedNetworkElement, len(mnes))
for i, mne := range mnes {
mne := &mnepb.FlattenedManagedNetworkElement{
Id: mne.ID,
Name: mne.Name,
Pluginid: mne.Plugin,
Pid: mne.PndID,
}
flattenedMnes[i] = mne
}
Fabian Seidl
committed
return &mnepb.GetAllFlattenedResponse{
Timestamp: time.Now().UnixNano(),
Pnd: &ppb.PrincipalNetworkDomain{
Id: pnd.ID().String(),
Name: pnd.GetName(),
Description: pnd.GetDescription(),
},
Mne: flattenedMnes,
}, nil
}
// fillMneBySpecificPath builds the protobuf representation of nme, carrying the
// notifications its plugin returns for the given path.
//
// requestForIntendedState is passed through unchanged to the plugin's GetNode.
// A path that cannot be converted into a structured path is logged; both
// failure cases are reported with gRPC code Aborted.
func fillMneBySpecificPath(nme networkelement.NetworkElement, path string, requestForIntendedState bool) (*mnepb.ManagedNetworkElement, error) {
	structuredPath, pathErr := ygot.StringToStructuredPath(path)
	if pathErr != nil {
		log.Error(pathErr)
		return nil, status.Errorf(codes.Aborted, "%v", pathErr)
	}

	nodes, nodeErr := nme.GetPlugin().GetNode(structuredPath, requestForIntendedState)
	if nodeErr != nil {
		return nil, status.Errorf(codes.Aborted, "%v", nodeErr)
	}

	return &mnepb.ManagedNetworkElement{
		Id:              nme.ID().String(),
		Name:            nme.Name(),
		MneNotification: nodes,
	}, nil
}
// GetPath gets an actual state of the path on a mne.
func (n *NetworkElementServer) GetPath(ctx context.Context, request *mnepb.GetPathRequest) (*mnepb.GetPathResponse, error) {
if err := n.protoValidator.Validate(request); err != nil {
return nil, status.Errorf(codes.Aborted, "%v", err)
}
labels := prometheus.Labels{"service": "mne", "rpc": "get"}
start := metrics.StartHook(labels, grpcRequestsTotal)
defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)
pnd, err := n.getPnd(request.Pid)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
networkElement, err := n.getMne(request.Mneid)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
// In case we get the path from grpc-gateway we have to replace
path := strings.ReplaceAll(request.Path, "||", "/")
resp, err := n.getPath(ctx, networkElement, path)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
return &mnepb.GetPathResponse{
Timestamp: time.Now().UnixNano(),
Pnd: &ppb.PrincipalNetworkDomain{
Id: pnd.ID().String(),
Name: pnd.GetName(),
Description: pnd.GetDescription(),
},
MneNotification: resp.Notification,
func (n *NetworkElementServer) getPath(ctx context.Context, mne networkelement.NetworkElement, path string) (*gnmi.GetResponse, error) {
res, err := mne.Transport().Get(ctx, path)
if err != nil {
return nil, err
}
protoMessage, ok := res.(proto.Message)
if !ok {
return nil, &customerrs.InvalidTypeAssertionError{
Value: res,
Type: (*proto.Message)(nil),
}
}
getResponse, ok := protoMessage.(*gnmi.GetResponse)
if !ok {
return nil, &customerrs.InvalidTypeAssertionError{
Value: res,
Type: (*gnmi.GetResponse)(nil),
}
return getResponse, nil
}
// GetIntendedPath returns the notifications for the requested path taken from
// the intended state: the plugin is queried with the intended-state flag set
// to true.
//
// The response also carries the principal network domain identified by
// request.Pid.
func (n *NetworkElementServer) GetIntendedPath(ctx context.Context, request *mnepb.GetIntendedPathRequest) (*mnepb.GetIntendedPathResponse, error) {
	if err := n.protoValidator.Validate(request); err != nil {
		return nil, status.Errorf(codes.Aborted, "%v", err)
	}

	pnd, err := n.getPnd(request.Pid)
	// Only bail out when the PND lookup actually failed.
	if err != nil {
		log.Error(err)
		return nil, status.Errorf(codes.Aborted, "%v", err)
	}

	// In case we get the path from grpc-gateway, "||" stands in for "/" and has
	// to be replaced before the path is used.
	intendedPath := strings.ReplaceAll(request.IntendedPath, "||", "/")

	networkElement, err := n.getMne(request.Mneid)
	if err != nil {
		log.Error(err)
		return nil, status.Errorf(codes.Aborted, "%v", err)
	}

	mne, err := fillMneBySpecificPath(networkElement, intendedPath, true)
	if err != nil {
		return nil, err
	}

	return &mnepb.GetIntendedPathResponse{
		Timestamp: time.Now().UnixNano(),
		Pnd: &ppb.PrincipalNetworkDomain{
			Id:          pnd.ID().String(),
			Name:        pnd.GetName(),
			Description: pnd.GetDescription(),
		},
		MneNotification: mne.MneNotification,
	}, nil
}
// GetChange returns the changes whose IDs are listed in request.Cuid, together
// with the principal network domain identified by request.Pid.
//
// Validation, PND lookup and change lookup failures are all reported with gRPC
// code Aborted; the latter two are logged as well.
func (n *NetworkElementServer) GetChange(ctx context.Context, request *mnepb.GetChangeRequest) (*mnepb.GetChangeResponse, error) {
	if validationErr := n.protoValidator.Validate(request); validationErr != nil {
		return nil, status.Errorf(codes.Aborted, "%v", validationErr)
	}

	metricLabels := prometheus.Labels{"service": "mne", "rpc": "get"}
	startedAt := metrics.StartHook(metricLabels, grpcRequestsTotal)
	defer metrics.FinishHook(metricLabels, startedAt, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)

	domain, pndErr := n.getPnd(request.Pid)
	if pndErr != nil {
		log.Error(pndErr)
		return nil, status.Errorf(codes.Aborted, "%v", pndErr)
	}

	requested, changeErr := n.fillChanges(false, request.Cuid...)
	if changeErr != nil {
		log.Error(changeErr)
		return nil, status.Errorf(codes.Aborted, "%v", changeErr)
	}

	response := &mnepb.GetChangeResponse{
		Timestamp: time.Now().UnixNano(),
		Pnd: &ppb.PrincipalNetworkDomain{
			Id:          domain.ID().String(),
			Name:        domain.GetName(),
			Description: domain.GetDescription(),
		},
		Change: requested,
	}

	return response, nil
}
// GetChangeList returns all pending and committed changes from the change
// store, together with the principal network domain identified by request.Pid.
//
// Every failure is reported with gRPC code Aborted.
func (n *NetworkElementServer) GetChangeList(ctx context.Context, request *mnepb.GetChangeListRequest) (*mnepb.GetChangeListResponse, error) {
	if err := n.protoValidator.Validate(request); err != nil {
		return nil, status.Errorf(codes.Aborted, "%v", err)
	}

	labels := prometheus.Labels{"service": "mne", "rpc": "get"}
	start := metrics.StartHook(labels, grpcRequestsTotal)
	defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)

	pnd, err := n.getPnd(request.Pid)
	if err != nil {
		log.Error(err)
		return nil, status.Errorf(codes.Aborted, "%v", err)
	}

	// With all == true the cuid argument is ignored by fillChanges.
	changes, err := n.fillChanges(true, "")
	if err != nil {
		log.Error(err)
		return nil, status.Errorf(codes.Aborted, "%v", err)
	}

	return &mnepb.GetChangeListResponse{
		Timestamp: time.Now().UnixNano(),
		Pnd: &ppb.PrincipalNetworkDomain{
			Id:          pnd.ID().String(),
			Name:        pnd.GetName(),
			Description: pnd.GetDescription(),
		},
		Change: changes,
	}, nil
}
// fillChanges converts changes from the change store into their protobuf form.
//
// With all set, the pending changes followed by the committed changes are
// used and cuid is ignored. Otherwise cuid must contain at least one change ID,
// and exactly those changes are looked up.
func (n *NetworkElementServer) fillChanges(all bool, cuid ...string) ([]*mnepb.Change, error) {
	var ids []uuid.UUID

	if all {
		ids = n.changeStore.Pending()
		ids = append(ids, n.changeStore.Committed()...)
	} else {
		if len(cuid) == 0 {
			return nil, &customerrs.InvalidParametersError{
				Func:  n.fillChanges,
				Param: "length of 'mneID' cannot be '0' when 'all' is set to 'false'",
			}
		}

		parsed, parseErr := stringArrayToUUIDs(cuid)
		if parseErr != nil {
			log.Error(parseErr)
			return nil, status.Errorf(codes.Aborted, "%v", parseErr)
		}
		ids = parsed
	}

	result := make([]*mnepb.Change, 0, len(ids))
	for _, id := range ids {
		stored, lookupErr := n.changeStore.GetChange(id)
		if lookupErr != nil {
			log.Error(lookupErr)
			return nil, status.Errorf(codes.Aborted, "%v", lookupErr)
		}

		// Diff could be added here.
		result = append(result, &mnepb.Change{
			Id:    id.String(),
			Age:   stored.Age().Microseconds(),
			State: stored.State(),
		})
	}

	return result, nil
}
// stringArrayToUUIDs parses every string in sid as a UUID and returns the
// results in the same order. The first string that fails to parse is logged and
// aborts the conversion with gRPC code Aborted.
func stringArrayToUUIDs(sid []string) ([]uuid.UUID, error) {
	parsedIDs := make([]uuid.UUID, 0, len(sid))

	for _, raw := range sid {
		id, parseErr := uuid.Parse(raw)
		if parseErr != nil {
			log.Error(parseErr)
			return nil, status.Errorf(codes.Aborted, "%v", parseErr)
		}
		parsedIDs = append(parsedIDs, id)
	}

	return parsedIDs, nil
}
// SetMneList adds the list of mnes to the storage.
Fabian Seidl
committed
func (n *NetworkElementServer) AddList(ctx context.Context, request *mnepb.AddListRequest) (*mnepb.AddListResponse, error) {
if err := n.protoValidator.Validate(request); err != nil {
return nil, status.Errorf(codes.Aborted, "%v", err)
}
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
labels := prometheus.Labels{"service": "mne", "rpc": "set"}
start := metrics.StartHook(labels, grpcRequestsTotal)
defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)
pndID, err := uuid.Parse(request.Pid)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
networkElementIDs := make([]uuid.UUID, 0, len(request.Mne))
for _, r := range request.Mne {
pluginId, err := uuid.Parse(r.GetPluginId())
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
mneID, err := n.addMne(ctx, r.MneName, r.TransportOption, nil, pluginId, pndID)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
networkElementIDs = append(networkElementIDs, mneID)
}
r := make([]*mnepb.SetResponse, len(networkElementIDs))
for i, mneID := range networkElementIDs {
r[i] = &mnepb.SetResponse{Id: mneID.String()}
Fabian Seidl
committed
return &mnepb.AddListResponse{
Timestamp: time.Now().UnixNano(),
Responses: r,
}, nil
}
Malte Bauch
committed
func (n *NetworkElementServer) addMne(ctx context.Context, name string, opt *tpb.TransportOption, requestPluginFunc func(uuid.UUID) (plugin.Plugin, error), pluginID uuid.UUID, pndID uuid.UUID, optionalNetworkElementID ...uuid.UUID) (mneId uuid.UUID, err error) {
var plugin plugin.Plugin
var mne networkelement.NetworkElement
defer func() {
if err != nil && plugin != nil {
// clean up plugin
if pErr := plugin.Remove(); pErr != nil {
err = multierror.Append(err, pErr)
}
}
}()
networkElementID := uuid.Nil
if len(optionalNetworkElementID) > 0 {
networkElementID = optionalNetworkElementID[0]
}
if requestPluginFunc == nil {
requestPluginFunc = n.pluginService.RequestPlugin
}
Malte Bauch
committed
plugin, err = requestPluginFunc(pluginID)
if err != nil {
return uuid.Nil, err
}
Malte Bauch
committed
mne, err = nucleus.NewNetworkElement(name, networkElementID, opt, pndID, plugin, conflict.Metadata{ResourceVersion: 0})
if err != nil {
return uuid.Nil, err
}
if mne.IsTransportValid() {
Malte Bauch
committed
err = n.initialNetworkElementRootPathRequest(ctx, mne, plugin)
if err != nil {
return uuid.Nil, err
}
Malte Bauch
committed
err = n.mneService.Add(mne)
if err != nil {
return uuid.Nil, err
}
Malte Bauch
committed
err = n.pluginService.Add(plugin)
Malte Bauch
committed
if err := n.mneService.Delete(mne); err != nil {
return uuid.Nil, err
}
return uuid.Nil, err
}
n.networkElementWatchter.SubscribeToNetworkElement(mne, config.GetGnmiSubscriptionPaths(), nil)
} else {
Malte Bauch
committed
err = fmt.Errorf("invalid transport data provided")
return uuid.Nil, err
}
return mne.ID(), nil
}
Malte Bauch
committed
func (n *NetworkElementServer) initialNetworkElementRootPathRequest(ctx context.Context, mne networkelement.NetworkElement, plugin plugin.Plugin) error {
resp, err := n.getPath(ctx, mne, "/")
if err != nil {
return err
}
err = mne.ProcessResponse(resp)
if err != nil {
return err
}
return nil
}
// SetChangeList sets a list of changes.
func (n *NetworkElementServer) SetChangeList(ctx context.Context, request *mnepb.SetChangeListRequest) (*mnepb.SetChangeListResponse, error) {
labels := prometheus.Labels{"service": "mne", "rpc": "set"}
start := metrics.StartHook(labels, grpcRequestsTotal)
defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)
if err := n.protoValidator.Validate(request); err != nil {
return nil, status.Errorf(codes.Aborted, "%v", err)
}
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
responses := make([]*mnepb.SetResponse, len(request.Change))
for i, r := range request.Change {
cuid, err := uuid.Parse(r.Cuid)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
switch r.Op {
case mnepb.Operation_OPERATION_COMMIT:
if err := n.Commit(cuid); err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
case mnepb.Operation_OPERATION_CONFIRM:
if err := n.Confirm(cuid); err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
default:
return nil, &customerrs.InvalidParametersError{
Param: r.Op,
}
}
responses[i] = &mnepb.SetResponse{
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
}
}
return &mnepb.SetChangeListResponse{
Timestamp: time.Now().UnixNano(),
Responses: responses,
}, nil
}
// Commit commits the change with the given ID and then replays the difference
// between its previous and intended state onto the plugin model of the
// associated network element, which is finally written back through the
// network element service.
func (n *NetworkElementServer) Commit(u uuid.UUID) error {
	pending, err := n.changeStore.GetChange(u)
	if err != nil {
		return err
	}

	err = pending.Commit()
	if err != nil {
		return err
	}

	// Apply everything the change touched to the element's model.
	target, err := n.mneService.Get(store.Query{ID: pending.AssociatedDeviceID()})
	if err != nil {
		return err
	}

	delta, err := target.GetPlugin().Diff(pending.PreviousState(), pending.IntendedState())
	if err != nil {
		return err
	}

	for _, upd := range delta.GetUpdate() {
		err = target.GetPlugin().SetNode(upd.GetPath(), upd.GetVal())
		if err != nil {
			return err
		}
	}

	for _, removed := range delta.GetDelete() {
		err = target.GetPlugin().DeleteNode(removed)
		if err != nil {
			return err
		}
	}

	// Persist the updated network element.
	return n.mneService.Update(target)
}
// Confirm looks up the change with the given ID in the change store and
// confirms it. A failed lookup is returned unchanged.
func (n *NetworkElementServer) Confirm(u uuid.UUID) error {
	pending, lookupErr := n.changeStore.GetChange(u)
	if lookupErr == nil {
		return pending.Confirm()
	}

	return lookupErr
}
// SetPathList creates one pending change per entry of request.ChangeRequest
// and returns the IDs of the created changes in request order.
//
// Processing stops at the first failure, which is logged and reported with gRPC
// code Aborted.
func (n *NetworkElementServer) SetPathList(ctx context.Context, request *mnepb.SetPathListRequest) (*mnepb.SetPathListResponse, error) {
	labels := prometheus.Labels{"service": "mne", "rpc": "set"}
	start := metrics.StartHook(labels, grpcRequestsTotal)
	defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)

	if err := n.protoValidator.Validate(request); err != nil {
		return nil, status.Errorf(codes.Aborted, "%v", err)
	}

	responses := make([]*mnepb.SetResponse, len(request.ChangeRequest))

	for i, r := range request.ChangeRequest {
		mneID, err := uuid.Parse(r.Mneid)
		if err != nil {
			log.Error(err)
			return nil, status.Errorf(codes.Aborted, "%v", err)
		}

		// NOTE: Could be useful to split method into two, one for
		// update/replace and one for delete.
		cid, err := n.ChangeMNE(mneID, r.ApiOp, r.Path, r.Value)
		if err != nil {
			log.Error(err)
			return nil, status.Errorf(codes.Aborted, "%v", err)
		}

		// Hand the ID of the created change back to the caller; without it the
		// change cannot be committed or confirmed later.
		// NOTE(review): the original field list of this literal is not visible
		// here; add any further fields (e.g. a status) the API expects.
		responses[i] = &mnepb.SetResponse{
			Id: cid.String(),
		}
	}

	return &mnepb.SetPathListResponse{
		Timestamp: time.Now().UnixNano(),
		Responses: responses,
	}, nil
}
// ChangeMNE creates a change from the provided Operation, path and value.
// The Change is Pending and times out after the specified timeout period.
func (n *NetworkElementServer) ChangeMNE(duid uuid.UUID, operation mnepb.ApiOperation, path *gnmi.Path, value *gnmi.TypedValue) (uuid.UUID, error) {
mne, err := n.mneService.Get(store.Query{
ID: duid,
})
if err != nil {
return uuid.Nil, err
}
plugin := mne.GetPlugin()
validatedChangeModel, err := plugin.ValidateChange(operation, path, value)
if err != nil {
return uuid.Nil, err
}
filteredMarshaledModel, err := plugin.PruneConfigFalse(validatedChangeModel)
if err != nil {
return uuid.Nil, err
}
currentModel, err := mne.GetModelAsFilteredCopy()
if err != nil {
return uuid.Nil, err
diff, err := plugin.Diff(currentModel, filteredMarshaledModel)
if err != nil {
return uuid.Nil, err
}
if util.IsGNMINotificationEmpty(diff) {
return uuid.Nil, customerrs.NoNewChangesError{Original: string(currentModel), Modified: string(filteredMarshaledModel)}
}
callback := func(original, modified []byte) error {
ctx := context.WithValue(context.Background(), types.CtxKeyOperation, operation) // nolint
payload := change.Payload{Original: original, Modified: modified, Diff: diff}
return mne.Transport().Set(ctx, payload)
}
ch := nucleus.NewChange(duid, currentModel, filteredMarshaledModel, diff, callback)
if err := n.changeStore.Add(ch); err != nil {
return uuid.Nil, err
}
return ch.ID(), nil
}
// DeleteMne deletes a mne.
Fabian Seidl
committed
func (n *NetworkElementServer) Delete(ctx context.Context, request *mnepb.DeleteRequest) (*mnepb.DeleteResponse, error) {
if err := n.protoValidator.Validate(request); err != nil {
return nil, status.Errorf(codes.Aborted, "%v", err)
}
mneID, err := uuid.Parse(request.Mneid)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
if err := n.deleteMne(mneID); err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
Fabian Seidl
committed
return &mnepb.DeleteResponse{
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
Timestamp: time.Now().UnixNano(),
}, nil
}
// deleteMne looks up the network element with the given ID and deletes it
// through the network element service. The lookup query carries the ID both as
// ID and, in string form, as Name.
func (n *NetworkElementServer) deleteMne(id uuid.UUID) error {
	query := store.Query{
		ID:   id,
		Name: id.String(),
	}

	target, lookupErr := n.mneService.Get(query)
	if lookupErr != nil {
		return lookupErr
	}

	// Note: cSBI is not supported currently, so the type-specific handling
	// below is commented out for now. Might be needed or removed in the future.
	//
	// switch mne.(type) {
	// case *CsbiNetworkElement:
	//	return pnd.handleCsbiDeletion(mne)
	// default:
	//	return pnd.networkElementService.Delete(mne)
	// }

	return n.mneService.Delete(target)
}
// SubscribePath subscribes to the paths listed in request.Sublist on the
// managed network element identified by request.Mneid.
//
// A failed element lookup is logged and reported with gRPC code Aborted. The
// stream parameter is not used by this function.
func (n *NetworkElementServer) SubscribePath(request *mnepb.SubscribePathRequest, stream mnepb.NetworkElementService_SubscribePathServer) error {
	networkElement, err := n.getMne(request.Mneid)
	// Only abort when the lookup actually failed.
	if err != nil {
		log.Error(err)
		return status.Errorf(codes.Aborted, "%v", err)
	}

	return n.subscribePath(networkElement, request.Sublist)
}
// subscribePath issues one Subscribe call on the network element's transport
// per entry of subList.Subscription.
//
// The subscription mode and each stream mode are mapped to the string form used
// by the Arista gNMI fork. The subscribe options are handed to the transport
// through the context under types.CtxKeyOpts. The first failing mapping or
// Subscribe call aborts the loop and is returned.
func (n *NetworkElementServer) subscribePath(networkElement networkelement.NetworkElement, subList *mnepb.SubscriptionList) error {
	mode, err := n.mapModeToAristaFork(subList.GetMode())
	if err != nil {
		return err
	}

	for _, sub := range subList.Subscription {
		streamMode, err := n.mapStreamModeToAristaFork(sub.GetStreamMode())
		if err != nil {
			return err
		}

		opts := &aGNMI.SubscribeOptions{
			Mode:           mode,
			StreamMode:     streamMode,
			Paths:          [][]string{n.splitStringPath(sub.GetPath())},
			SampleInterval: sub.SampleInterval,
		}

		ctx := context.Background()
		ctx = context.WithValue(ctx, types.CtxKeyOpts, opts)

		if err = networkElement.Transport().Subscribe(ctx); err != nil {
			return err
		}
	}

	return nil
}
// splitStringPath breaks a path string into its elements at every "/".
// A path with a leading "/" therefore yields an empty first element.
func (n *NetworkElementServer) splitStringPath(s string) []string {
	const separator = "/"

	elements := strings.Split(s, separator)

	return elements
}
// mapStreamModeToAristaFork translates a StreamMode into the string form used
// by the Arista gNMI fork ("target_defined", "on_change" or "sample"). Any
// other mode yields an error that names the rejected value.
func (n *NetworkElementServer) mapStreamModeToAristaFork(mode mnepb.StreamMode) (string, error) {
	switch mode {
	case mnepb.StreamMode_STREAM_MODE_TARGET_DEFINED:
		return "target_defined", nil
	case mnepb.StreamMode_STREAM_MODE_ON_CHANGE:
		return "on_change", nil
	case mnepb.StreamMode_STREAM_MODE_SAMPLE:
		return "sample", nil
	default:
		// %v prints the offending value; %T would always print the same static
		// type name and make the message useless for diagnosis.
		return "", fmt.Errorf("StreamMode of type: %v is not supported", mode)
	}
}
// mapModeToAristaFork translates a SubscriptionMode into the string form used
// by the Arista gNMI fork ("stream", "once" or "poll"). Any other mode yields
// an error that names the rejected value.
func (n *NetworkElementServer) mapModeToAristaFork(mode mnepb.SubscriptionMode) (string, error) {
	switch mode {
	case mnepb.SubscriptionMode_SUBSCRIPTION_MODE_STREAM:
		return "stream", nil
	case mnepb.SubscriptionMode_SUBSCRIPTION_MODE_ONCE:
		return "once", nil
	case mnepb.SubscriptionMode_SUBSCRIPTION_MODE_POLL:
		return "poll", nil
	default:
		// %v prints the offending value; %T would always print the same static
		// type name and make the message useless for diagnosis.
		return "", fmt.Errorf("SubscriptionMode of type: %v is not supported", mode)
	}
}