Newer
Older
package server
import (
	"context"
	"fmt"
	"strings"
	"time"

	mnepb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/networkelement"
	plugin_registry "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/plugin-registry"
	ppb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/pnd"
	tpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/transport"
	"code.fbi.h-da.de/danet/gosdn/controller/config"
	"code.fbi.h-da.de/danet/gosdn/controller/conflict"
	"code.fbi.h-da.de/danet/gosdn/controller/customerrs"
	"code.fbi.h-da.de/danet/gosdn/controller/interfaces/change"
	"code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkdomain"
	"code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkelement"
	"code.fbi.h-da.de/danet/gosdn/controller/interfaces/plugin"
	"code.fbi.h-da.de/danet/gosdn/controller/metrics"
	"code.fbi.h-da.de/danet/gosdn/controller/nucleus"
	"code.fbi.h-da.de/danet/gosdn/controller/nucleus/types"
	util "code.fbi.h-da.de/danet/gosdn/controller/nucleus/util/gnmi"
	"code.fbi.h-da.de/danet/gosdn/controller/store"
	aGNMI "code.fbi.h-da.de/danet/gosdn/forks/goarista/gnmi"
	"github.com/bufbuild/protovalidate-go"
	"github.com/google/uuid"
	"github.com/hashicorp/go-multierror"
	"github.com/openconfig/gnmi/proto/gnmi"
	ygotutil "github.com/openconfig/ygot/util"
	"github.com/openconfig/ygot/ygot"
	"github.com/prometheus/client_golang/prometheus"
	log "github.com/sirupsen/logrus"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
	"google.golang.org/protobuf/proto"
)
// NetworkElementServer represents a NetworkElementServer.
// It embeds the generated UnimplementedNetworkElementServiceServer and
// carries the collaborators the RPC handlers in this file delegate to.
type NetworkElementServer struct {
mnepb.UnimplementedNetworkElementServiceServer
// mneService is used to look up, add, update and delete network elements.
mneService networkelement.Service
// pndService is used to resolve the principal network domain of a request.
pndService networkdomain.Service
// pluginService is used to request plugins and to register them.
pluginService plugin.Service
// changeStore holds the pending and committed changes.
changeStore store.ChangeStore
// protoValidator validates incoming request messages.
protoValidator *protovalidate.Validator
// networkElementWatchter subscribes to newly added network elements.
// NOTE(review): the identifier is misspelled ("Watchter"); it is kept
// because other code in this file refers to it by this name.
networkElementWatchter *nucleus.NetworkElementWatcher
}
// NewNetworkElementServer returns a new NetWorkElementServer.
func NewNetworkElementServer(
mneService networkelement.Service,
pndService networkdomain.Service,
pluginService plugin.Service,
changeStore store.ChangeStore,
protoValidator *protovalidate.Validator,
networkElementWatchter *nucleus.NetworkElementWatcher,
return &NetworkElementServer{
mneService: mneService,
pndService: pndService,
pluginService: pluginService,
changeStore: changeStore,
protoValidator: protoValidator,
networkElementWatchter: networkElementWatchter,
}
}
// Update updates a network element.
func (n *NetworkElementServer) Update(ctx context.Context, request *mnepb.UpdateNetworkElementRequest) (*mnepb.UpdateNetworkElementResponse, error) {
if err := n.protoValidator.Validate(request); err != nil {
return nil, status.Errorf(codes.Aborted, "%v", err)
}
mneID, err := uuid.Parse(request.NetworkElement.Id)
if err != nil {
return nil, err
}
err = n.mneService.UpdateModel(mneID, request.NetworkElement.Model)
if err != nil {
return nil, err
networkElement, err := n.mneService.Get(store.Query{ID: mneID})
if err != nil {
return nil, err
}
err = networkelement.EnsureIntendedConfigurationIsAppliedOnNetworkElement(networkElement)
if err != nil {
}
return &mnepb.UpdateNetworkElementResponse{
Timestamp: time.Now().UnixNano(),
}, nil
}
// Get gets a specific mne.
Fabian Seidl
committed
func (n *NetworkElementServer) Get(ctx context.Context, request *mnepb.GetRequest) (*mnepb.GetResponse, error) {
if err := n.protoValidator.Validate(request); err != nil {
return nil, status.Errorf(codes.Aborted, "%v", err)
}
labels := prometheus.Labels{"service": "mne", "rpc": "get"}
start := metrics.StartHook(labels, grpcRequestsTotal)
defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)
pnd, err := n.getPnd(request.Pid)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
networkElement, err := n.getMne(request.Mneid)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
mne := &mnepb.ManagedNetworkElement{
Id: networkElement.ID().String(),
Name: networkElement.Name(),
TransportAddress: networkElement.TransportAddress(),
}
err = fillMneBySpecificPath(mne, networkElement, "/", false)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
err = fillMneWithPluginInformation(mne, networkElement)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
mne.Model, err = networkElement.GetModelAsString()
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
Fabian Seidl
committed
return &mnepb.GetResponse{
Timestamp: time.Now().UnixNano(),
Pnd: &ppb.PrincipalNetworkDomain{
Id: pnd.ID().String(),
Name: pnd.GetName(),
Description: pnd.GetDescription(),
},
Mne: mne,
}, nil
}
// GetFlattened gets a specific mne.
Fabian Seidl
committed
func (n *NetworkElementServer) GetFlattened(ctx context.Context, request *mnepb.GetFlattenedRequest) (*mnepb.GetFlattenedResponse, error) {
if err := n.protoValidator.Validate(request); err != nil {
return nil, status.Errorf(codes.Aborted, "%v", err)
}
labels := prometheus.Labels{"service": "pnd", "rpc": "get"}
start := metrics.StartHook(labels, grpcRequestsTotal)
defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)
networkElement, err := n.getMne(request.Mneid)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
Fabian Seidl
committed
return &mnepb.GetFlattenedResponse{
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
Timestamp: time.Now().UnixNano(),
Mne: &mnepb.FlattenedManagedNetworkElement{
Id: networkElement.ID().String(),
Name: networkElement.Name(),
Pid: networkElement.PndID().String(),
Pluginid: networkElement.GetPlugin().ID().String(),
},
}, nil
}
// getMne looks up the network element with the given identifier through the
// network element service.
//
// It returns the error of the lookup if there is one, and a generic
// "no network element found" error if the service returned neither an
// element nor an error.
func (n *NetworkElementServer) getMne(identifier string) (networkelement.NetworkElement, error) {
	// NOTE(review): an unparsable identifier is not rejected; the lookup is
	// performed with uuid.Nil instead. Presumably intentional - confirm.
	id, err := uuid.Parse(identifier)
	if err != nil {
		id = uuid.Nil
	}

	mne, err := n.mneService.Get(store.Query{
		ID: id,
	})
	// Check the error first so the actual cause of a failed lookup is not
	// hidden behind the generic not-found message.
	if err != nil {
		return nil, err
	}
	if mne == nil {
		return nil, fmt.Errorf("no network element found")
	}

	return mne, nil
}
// getPnd resolves the principal network domain with the given identifier.
// A malformed identifier yields the raw parse error; a failed lookup is
// logged and returned as a codes.Aborted status error.
func (n *NetworkElementServer) getPnd(identifier string) (networkdomain.NetworkDomain, error) {
	domainID, parseErr := uuid.Parse(identifier)
	if parseErr != nil {
		return nil, parseErr
	}

	query := store.Query{ID: domainID}
	domain, lookupErr := n.pndService.Get(query)
	if lookupErr == nil {
		return domain, nil
	}

	log.Error(lookupErr)
	return nil, status.Errorf(codes.Aborted, "%v", lookupErr)
}
// GetAll returns a list of existing mnes.
Fabian Seidl
committed
func (n *NetworkElementServer) GetAll(ctx context.Context, request *mnepb.GetAllRequest) (*mnepb.GetAllResponse, error) {
if err := n.protoValidator.Validate(request); err != nil {
return nil, status.Errorf(codes.Aborted, "%v", err)
}
labels := prometheus.Labels{"service": "mne", "rpc": "get"}
start := metrics.StartHook(labels, grpcRequestsTotal)
defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)
pnd, err := n.getPnd(request.Pid)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
storedMNE, err := n.mneService.GetAll()
if err != nil {
return nil, err
}
mnes := make([]*mnepb.ManagedNetworkElement, len(storedMNE))
for i, networkElement := range storedMNE {
mne := &mnepb.ManagedNetworkElement{
Id: networkElement.ID().String(),
Name: networkElement.Name(),
TransportAddress: networkElement.TransportAddress(),
}
err := fillMneBySpecificPath(mne, networkElement, "/", false)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
err = fillMneWithPluginInformation(mne, networkElement)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
mnes[i] = mne
}
Fabian Seidl
committed
return &mnepb.GetAllResponse{
Timestamp: time.Now().UnixNano(),
Pnd: &ppb.PrincipalNetworkDomain{
Id: pnd.ID().String(),
Name: pnd.GetName(),
Description: pnd.GetDescription(),
},
Mne: mnes,
}, nil
}
// GetAllFlattened returns a list of existing mnes.
Fabian Seidl
committed
func (n *NetworkElementServer) GetAllFlattened(ctx context.Context, request *mnepb.GetAllFlattenedRequest) (*mnepb.GetAllFlattenedResponse, error) {
if err := n.protoValidator.Validate(request); err != nil {
return nil, status.Errorf(codes.Aborted, "%v", err)
}
labels := prometheus.Labels{"service": "mne", "rpc": "get"}
start := metrics.StartHook(labels, grpcRequestsTotal)
defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)
pnd, err := n.getPnd(request.Pid)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
mnes, err := n.mneService.GetAllAsLoaded()
if err != nil {
return nil, err
}
flattenedMnes := make([]*mnepb.FlattenedManagedNetworkElement, len(mnes))
for i, mne := range mnes {
mne := &mnepb.FlattenedManagedNetworkElement{
Id: mne.ID,
Name: mne.Name,
Pluginid: mne.Plugin,
Pid: mne.PndID,
}
flattenedMnes[i] = mne
}
Fabian Seidl
committed
return &mnepb.GetAllFlattenedResponse{
Timestamp: time.Now().UnixNano(),
Pnd: &ppb.PrincipalNetworkDomain{
Id: pnd.ID().String(),
Name: pnd.GetName(),
Description: pnd.GetDescription(),
},
Mne: flattenedMnes,
}, nil
}
// fillMneWithPluginInformation sets mne.Plugin to the ID and manifest data
// (name, firmware, author, version) of the plugin attached to nme.
// It always returns nil.
func fillMneWithPluginInformation(mne *mnepb.ManagedNetworkElement, nme networkelement.NetworkElement) error {
	attached := nme.GetPlugin()
	source := attached.Manifest()

	info := &plugin_registry.Plugin{}
	info.Id = attached.ID().String()
	info.Manifest = &plugin_registry.Manifest{
		Name:     source.Name,
		Firmware: source.Firmware,
		Author:   source.Author,
		Version:  source.Version,
	}
	mne.Plugin = info

	return nil
}
func fillMneBySpecificPath(mne *mnepb.ManagedNetworkElement, nme networkelement.NetworkElement, path string, requestForIntendedState bool) error {
gnmiPath, err := ygot.StringToStructuredPath(path)
if err != nil {
log.Error(err)
return status.Errorf(codes.Aborted, "%v", err)
plugin := nme.GetPlugin()
notifications, err := plugin.GetNode(gnmiPath, requestForIntendedState)
return status.Errorf(codes.Aborted, "%v", err)
mne.MneNotification = notifications
return nil
// GetPath gets an actual state of the path on a mne.
func (n *NetworkElementServer) GetPath(ctx context.Context, request *mnepb.GetPathRequest) (*mnepb.GetPathResponse, error) {
if err := n.protoValidator.Validate(request); err != nil {
return nil, status.Errorf(codes.Aborted, "%v", err)
}
labels := prometheus.Labels{"service": "mne", "rpc": "get"}
start := metrics.StartHook(labels, grpcRequestsTotal)
defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)
pnd, err := n.getPnd(request.Pid)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
networkElement, err := n.getMne(request.Mneid)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
// In case we get the path from grpc-gateway we have to replace
path := strings.ReplaceAll(request.Path, "||", "/")
resp, err := n.getPath(ctx, networkElement, path)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
return &mnepb.GetPathResponse{
Timestamp: time.Now().UnixNano(),
Pnd: &ppb.PrincipalNetworkDomain{
Id: pnd.ID().String(),
Name: pnd.GetName(),
Description: pnd.GetDescription(),
},
MneNotification: resp.Notification,
func (n *NetworkElementServer) getPath(ctx context.Context, mne networkelement.NetworkElement, path string) (*gnmi.GetResponse, error) {
res, err := mne.Transport().Get(ctx, path)
if err != nil {
return nil, err
}
protoMessage, ok := res.(proto.Message)
if !ok {
return nil, &customerrs.InvalidTypeAssertionError{
Value: res,
Type: (*proto.Message)(nil),
}
}
getResponse, ok := protoMessage.(*gnmi.GetResponse)
if !ok {
return nil, &customerrs.InvalidTypeAssertionError{
Value: res,
Type: (*gnmi.GetResponse)(nil),
}
return getResponse, nil
}
// GetIntendedPath gets a path as the intended state stored in the storage.
//
// The notifications are obtained from the plugin of the network element
// identified by request.Mneid with the intended-state flag set. The response
// also carries the principal network domain identified by request.Pid.
func (n *NetworkElementServer) GetIntendedPath(ctx context.Context, request *mnepb.GetIntendedPathRequest) (*mnepb.GetIntendedPathResponse, error) {
	if err := n.protoValidator.Validate(request); err != nil {
		return nil, status.Errorf(codes.Aborted, "%v", err)
	}

	pnd, err := n.getPnd(request.Pid)
	if err != nil {
		log.Error(err)
		return nil, status.Errorf(codes.Aborted, "%v", err)
	}

	// In case we get the path from grpc-gateway we have to replace
	// the "||" placeholders with "/".
	intendedPath := strings.ReplaceAll(request.IntendedPath, "||", "/")

	networkElement, err := n.getMne(request.Mneid)
	if err != nil {
		log.Error(err)
		return nil, status.Errorf(codes.Aborted, "%v", err)
	}

	mne := &mnepb.ManagedNetworkElement{}
	if err = fillMneBySpecificPath(mne, networkElement, intendedPath, true); err != nil {
		return nil, err
	}

	return &mnepb.GetIntendedPathResponse{
		Timestamp: time.Now().UnixNano(),
		Pnd: &ppb.PrincipalNetworkDomain{
			Id:          pnd.ID().String(),
			Name:        pnd.GetName(),
			Description: pnd.GetDescription(),
		},
		MneNotification: mne.MneNotification,
	}, nil
}
// GetChange gets a specific change of a mne.
// The changes named by request.Cuid are returned together with the principal
// network domain identified by request.Pid; failures are reported with
// codes.Aborted.
func (n *NetworkElementServer) GetChange(ctx context.Context, request *mnepb.GetChangeRequest) (*mnepb.GetChangeResponse, error) {
	if validationErr := n.protoValidator.Validate(request); validationErr != nil {
		return nil, status.Errorf(codes.Aborted, "%v", validationErr)
	}

	metricLabels := prometheus.Labels{"service": "mne", "rpc": "get"}
	startedAt := metrics.StartHook(metricLabels, grpcRequestsTotal)
	defer metrics.FinishHook(metricLabels, startedAt, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)

	domain, err := n.getPnd(request.Pid)
	if err != nil {
		log.Error(err)
		return nil, status.Errorf(codes.Aborted, "%v", err)
	}

	requested, err := n.fillChanges(false, request.Cuid...)
	if err != nil {
		log.Error(err)
		return nil, status.Errorf(codes.Aborted, "%v", err)
	}

	response := &mnepb.GetChangeResponse{Timestamp: time.Now().UnixNano()}
	response.Pnd = &ppb.PrincipalNetworkDomain{
		Id:          domain.ID().String(),
		Name:        domain.GetName(),
		Description: domain.GetDescription(),
	}
	response.Change = requested

	return response, nil
}
// GetChangeList gets all existing changes.
//
// Pending and committed changes of the change store are returned together
// with the principal network domain identified by request.Pid. All failures
// are reported with codes.Aborted.
func (n *NetworkElementServer) GetChangeList(ctx context.Context, request *mnepb.GetChangeListRequest) (*mnepb.GetChangeListResponse, error) {
	if err := n.protoValidator.Validate(request); err != nil {
		return nil, status.Errorf(codes.Aborted, "%v", err)
	}

	labels := prometheus.Labels{"service": "mne", "rpc": "get"}
	start := metrics.StartHook(labels, grpcRequestsTotal)
	defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)

	pnd, err := n.getPnd(request.Pid)
	if err != nil {
		log.Error(err)
		return nil, status.Errorf(codes.Aborted, "%v", err)
	}

	// The cuid argument is ignored by fillChanges when all is true.
	changes, err := n.fillChanges(true, "")
	if err != nil {
		log.Error(err)
		return nil, status.Errorf(codes.Aborted, "%v", err)
	}

	return &mnepb.GetChangeListResponse{
		Timestamp: time.Now().UnixNano(),
		Pnd: &ppb.PrincipalNetworkDomain{
			Id:          pnd.ID().String(),
			Name:        pnd.GetName(),
			Description: pnd.GetDescription(),
		},
		Change: changes,
	}, nil
}
// fillChanges builds the protobuf representation (ID, age in microseconds
// and state) of changes held by the change store.
//
// If all is true, every pending and committed change is returned and cuid is
// ignored. Otherwise cuid must contain at least one change ID; each entry is
// parsed as a UUID and looked up in the store. An InvalidParametersError is
// returned when all is false and cuid is empty; parse and lookup failures
// are logged and returned as codes.Aborted status errors.
func (n *NetworkElementServer) fillChanges(all bool, cuid ...string) ([]*mnepb.Change, error) {
	var changeList []uuid.UUID

	if all {
		pending := n.changeStore.Pending()
		committed := n.changeStore.Committed()

		// Copy into a fresh slice so appending can never write into the
		// backing array of the slice handed out by the store.
		changeList = make([]uuid.UUID, 0, len(pending)+len(committed))
		changeList = append(changeList, pending...)
		changeList = append(changeList, committed...)
	} else {
		if len(cuid) == 0 {
			return nil, &customerrs.InvalidParametersError{
				Func:  n.fillChanges,
				Param: "length of 'cuid' cannot be '0' when 'all' is set to 'false'",
			}
		}

		var err error
		changeList, err = stringArrayToUUIDs(cuid)
		if err != nil {
			log.Error(err)
			return nil, status.Errorf(codes.Aborted, "%v", err)
		}
	}

	changes := make([]*mnepb.Change, len(changeList))
	for i, ch := range changeList {
		c, err := n.changeStore.GetChange(ch)
		if err != nil {
			log.Error(err)
			return nil, status.Errorf(codes.Aborted, "%v", err)
		}

		// Diff could be added here.
		changes[i] = &mnepb.Change{
			Id:    ch.String(),
			Age:   c.Age().Microseconds(),
			State: c.State(),
		}
	}

	return changes, nil
}
// stringArrayToUUIDs parses every entry of sid as a UUID and returns the
// results in the same order. The first entry that fails to parse is logged
// and reported as a codes.Aborted status error.
func stringArrayToUUIDs(sid []string) ([]uuid.UUID, error) {
	result := make([]uuid.UUID, 0, len(sid))

	for _, raw := range sid {
		value, parseErr := uuid.Parse(raw)
		if parseErr != nil {
			log.Error(parseErr)
			return nil, status.Errorf(codes.Aborted, "%v", parseErr)
		}

		result = append(result, value)
	}

	return result, nil
}
// AddList adds the list of mnes to the storage.
Fabian Seidl
committed
func (n *NetworkElementServer) AddList(ctx context.Context, request *mnepb.AddListRequest) (*mnepb.AddListResponse, error) {
if err := n.protoValidator.Validate(request); err != nil {
return nil, status.Errorf(codes.Aborted, "%v", err)
}
labels := prometheus.Labels{"service": "mne", "rpc": "set"}
start := metrics.StartHook(labels, grpcRequestsTotal)
defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)
pndID, err := uuid.Parse(request.Pid)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
networkElementIDs := make([]uuid.UUID, 0, len(request.Mne))
for _, r := range request.Mne {
pluginId, err := uuid.Parse(r.GetPluginId())
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
Fabian Seidl
committed
gNMISubscriptionPaths := n.get2dSlicePathsFrom1dPaths(r.GnmiSubscribePaths)
mneID, err := uuid.Parse(r.MneId)
if err != nil {
log.Errorf("Error parsing UUID: %v. For: %v, ", r.MneId, r.MneName)
}
mneID, err = n.addMne(ctx, r.MneName, r.TransportOption, nil, pluginId, pndID, gNMISubscriptionPaths, mneID)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
networkElementIDs = append(networkElementIDs, mneID)
}
r := make([]*mnepb.SetResponse, len(networkElementIDs))
for i, mneID := range networkElementIDs {
r[i] = &mnepb.SetResponse{Id: mneID.String()}
Fabian Seidl
committed
return &mnepb.AddListResponse{
Timestamp: time.Now().UnixNano(),
Responses: r,
}, nil
}
Fabian Seidl
committed
func (n *NetworkElementServer) addMne(ctx context.Context,
name string,
opt *tpb.TransportOption,
requestPluginFunc func(uuid.UUID) (plugin.Plugin, error),
pluginID uuid.UUID, pndID uuid.UUID,
gnmiSubscriptionPaths [][]string,
optionalNetworkElementID ...uuid.UUID) (mneId uuid.UUID, err error) {
Malte Bauch
committed
var plugin plugin.Plugin
var mne networkelement.NetworkElement
defer func() {
if err != nil && plugin != nil {
// clean up plugin
if pErr := plugin.Remove(); pErr != nil {
err = multierror.Append(err, pErr)
}
}
}()
networkElementID := uuid.Nil
if len(optionalNetworkElementID) > 0 {
networkElementID = optionalNetworkElementID[0]
}
if requestPluginFunc == nil {
requestPluginFunc = n.pluginService.RequestPlugin
}
Malte Bauch
committed
plugin, err = requestPluginFunc(pluginID)
if err != nil {
return uuid.Nil, err
}
Fabian Seidl
committed
mne, err = nucleus.NewNetworkElement(name, networkElementID, opt, pndID, plugin, gnmiSubscriptionPaths, conflict.Metadata{ResourceVersion: 0})
if err != nil {
return uuid.Nil, err
}
if mne.IsTransportValid() {
Malte Bauch
committed
err = n.initialNetworkElementRootPathRequest(ctx, mne, plugin)
if err != nil {
return uuid.Nil, err
}
Malte Bauch
committed
err = n.mneService.Add(mne)
if err != nil {
return uuid.Nil, err
}
Malte Bauch
committed
err = n.pluginService.Add(plugin)
Malte Bauch
committed
if err := n.mneService.Delete(mne); err != nil {
return uuid.Nil, err
}
return uuid.Nil, err
}
Fabian Seidl
committed
if mne.GetGnmiSubscriptionPaths() != nil || config.GetGnmiSubscriptionPaths() != nil {
n.networkElementWatchter.SubscribeToNetworkElement(mne, nil)
}
} else {
Malte Bauch
committed
err = fmt.Errorf("invalid transport data provided")
return uuid.Nil, err
}
return mne.ID(), nil
}
Malte Bauch
committed
func (n *NetworkElementServer) initialNetworkElementRootPathRequest(ctx context.Context, mne networkelement.NetworkElement, plugin plugin.Plugin) error {
resp, err := n.getPath(ctx, mne, "/")
if err != nil {
return err
}
err = mne.ProcessResponse(resp)
if err != nil {
return err
}
return nil
}
// SetChangeList sets a list of changes.
func (n *NetworkElementServer) SetChangeList(ctx context.Context, request *mnepb.SetChangeListRequest) (*mnepb.SetChangeListResponse, error) {
labels := prometheus.Labels{"service": "mne", "rpc": "set"}
start := metrics.StartHook(labels, grpcRequestsTotal)
defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)
if err := n.protoValidator.Validate(request); err != nil {
return nil, status.Errorf(codes.Aborted, "%v", err)
}
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
responses := make([]*mnepb.SetResponse, len(request.Change))
for i, r := range request.Change {
cuid, err := uuid.Parse(r.Cuid)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
switch r.Op {
case mnepb.Operation_OPERATION_COMMIT:
if err := n.Commit(cuid); err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
case mnepb.Operation_OPERATION_CONFIRM:
if err := n.Confirm(cuid); err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
default:
return nil, &customerrs.InvalidParametersError{
Param: r.Op,
}
}
responses[i] = &mnepb.SetResponse{
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
}
}
return &mnepb.SetChangeListResponse{
Timestamp: time.Now().UnixNano(),
Responses: responses,
}, nil
}
// Commit calls commit on the pending change with ID.
// After a successful commit, the diff between the change's previous and
// intended state is written into the plugin of the associated network
// element (updates via SetNode, deletions via DeleteNode) and the element is
// updated through the network element service.
func (n *NetworkElementServer) Commit(u uuid.UUID) error {
	pending, err := n.changeStore.GetChange(u)
	if err != nil {
		return err
	}

	if err = pending.Commit(); err != nil {
		return err
	}

	// Set all the changes within the network elements model
	element, err := n.mneService.Get(store.Query{ID: pending.AssociatedDeviceID()})
	if err != nil {
		return err
	}

	delta, err := element.GetPlugin().Diff(pending.PreviousState(), pending.IntendedState())
	if err != nil {
		return err
	}

	for _, upd := range delta.GetUpdate() {
		setErr := element.GetPlugin().SetNode(upd.GetPath(), upd.GetVal())
		if setErr != nil {
			return setErr
		}
	}

	for _, removed := range delta.GetDelete() {
		delErr := element.GetPlugin().DeleteNode(removed)
		if delErr != nil {
			return delErr
		}
	}

	// update the network element
	return n.mneService.Update(element)
}
// Confirm calls confirm on the pending change with ID.
// A failed lookup in the change store is returned unchanged.
func (n *NetworkElementServer) Confirm(u uuid.UUID) error {
	pending, lookupErr := n.changeStore.GetChange(u)
	if lookupErr != nil {
		return lookupErr
	}

	return pending.Confirm()
}
// SetPathList sets a list of paths.
//
// For every entry of request.ChangeRequest a change is created through
// ChangeMNE. The response contains one SetResponse with the ID of the
// created change per entry, in request order. The first failure aborts the
// call with codes.Aborted.
func (n *NetworkElementServer) SetPathList(ctx context.Context, request *mnepb.SetPathListRequest) (*mnepb.SetPathListResponse, error) {
	labels := prometheus.Labels{"service": "mne", "rpc": "set"}
	start := metrics.StartHook(labels, grpcRequestsTotal)
	defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)

	if err := n.protoValidator.Validate(request); err != nil {
		return nil, status.Errorf(codes.Aborted, "%v", err)
	}

	responses := make([]*mnepb.SetResponse, len(request.ChangeRequest))

	for i, r := range request.ChangeRequest {
		mneID, err := uuid.Parse(r.Mneid)
		if err != nil {
			log.Error(err)
			return nil, status.Errorf(codes.Aborted, "%v", err)
		}

		// NOTE: Could be useful to split method into two, one for
		// update/replace and on for delete.
		cid, err := n.ChangeMNE(mneID, r.ApiOp, r.Path, r.Value)
		if err != nil {
			log.Error(err)
			return nil, status.Errorf(codes.Aborted, "%v", err)
		}

		// Report the ID of the created change so the caller can commit or
		// confirm it afterwards.
		// NOTE(review): a status field may also have been set upstream -
		// confirm against the upstream file.
		responses[i] = &mnepb.SetResponse{
			Id: cid.String(),
		}
	}

	return &mnepb.SetPathListResponse{
		Timestamp: time.Now().UnixNano(),
		Responses: responses,
	}, nil
}
// ChangeMNE creates a change from the provided Operation, path and value.
// The Change is Pending and times out after the specified timeout period.
func (n *NetworkElementServer) ChangeMNE(duid uuid.UUID, operation mnepb.ApiOperation, path *gnmi.Path, value *gnmi.TypedValue) (uuid.UUID, error) {
mne, err := n.mneService.Get(store.Query{
ID: duid,
})
if err != nil {
return uuid.Nil, err
}
plugin := mne.GetPlugin()
validatedChangeModel, err := plugin.ValidateChange(operation, path, value)
if err != nil {
return uuid.Nil, err
}
filteredMarshaledModel, err := plugin.PruneConfigFalse(validatedChangeModel)
if err != nil {
return uuid.Nil, err
}
currentModel, err := mne.GetModelAsFilteredCopy()
if err != nil {
return uuid.Nil, err
diff, err := plugin.Diff(currentModel, filteredMarshaledModel)
if err != nil {
return uuid.Nil, err
}
if util.IsGNMINotificationEmpty(diff) {
return uuid.Nil, customerrs.NoNewChangesError{Original: string(currentModel), Modified: string(filteredMarshaledModel)}
}
callback := func(original, modified []byte) error {
ctx := context.WithValue(context.Background(), types.CtxKeyOperation, operation) // nolint
payload := change.Payload{Original: original, Modified: modified, Diff: diff}
return mne.Transport().Set(ctx, payload)
}
ch := nucleus.NewChange(duid, currentModel, filteredMarshaledModel, diff, callback)
if err := n.changeStore.Add(ch); err != nil {
return uuid.Nil, err
}
return ch.ID(), nil
}
// Delete deletes a mne.
Fabian Seidl
committed
func (n *NetworkElementServer) Delete(ctx context.Context, request *mnepb.DeleteRequest) (*mnepb.DeleteResponse, error) {
if err := n.protoValidator.Validate(request); err != nil {
return nil, status.Errorf(codes.Aborted, "%v", err)
}
mneID, err := uuid.Parse(request.Mneid)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
if err := n.deleteMne(mneID); err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
Fabian Seidl
committed
return &mnepb.DeleteResponse{
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
Timestamp: time.Now().UnixNano(),
}, nil
}
// deleteMne looks up the network element with the given ID (the query also
// carries the ID's string form as name) and deletes it through the network
// element service.
func (n *NetworkElementServer) deleteMne(id uuid.UUID) error {
	query := store.Query{
		ID:   id,
		Name: id.String(),
	}

	target, lookupErr := n.mneService.Get(query)
	if lookupErr != nil {
		return lookupErr
	}

	// Note: cSBI is not supported currently, so this is commented out for now.
	// Might be needed or removed in the future.
	//
	// switch mne.(type) {
	// case *CsbiNetworkElement:
	//	return pnd.handleCsbiDeletion(mne)
	// default:
	//	return pnd.networkElementService.Delete(mne)
	// }
	return n.mneService.Delete(target)
}
// SubscribePath subscribes to specific paths of an mne.
//
// The network element identified by request.Mneid is looked up and
// subscribed to the paths of request.Sublist. A failed lookup is logged and
// reported with codes.Aborted; subscription errors are returned unchanged.
// The stream parameter is not used by this implementation.
func (n *NetworkElementServer) SubscribePath(request *mnepb.SubscribePathRequest, stream mnepb.NetworkElementService_SubscribePathServer) error {
	networkElement, err := n.getMne(request.Mneid)
	if err != nil {
		log.Error(err)
		return status.Errorf(codes.Aborted, "%v", err)
	}

	return n.subscribePath(networkElement, request.Sublist)
}
// subscribePath subscribes the transport of networkElement to every
// subscription of subList.
//
// The list mode and each subscription's stream mode are mapped to the string
// names used by the Arista gNMI fork. The resulting subscribe options are
// handed to the transport through the context under types.CtxKeyOpts. The
// first mapping or subscription error is returned.
func (n *NetworkElementServer) subscribePath(networkElement networkelement.NetworkElement, subList *mnepb.SubscriptionList) error {
	mode, err := n.mapModeToAristaFork(subList.GetMode())
	if err != nil {
		return err
	}

	// The generated getter is nil-safe, so a missing subscription list
	// results in no subscriptions instead of a panic.
	for _, sub := range subList.GetSubscription() {
		streamMode, err := n.mapStreamModeToAristaFork(sub.GetStreamMode())
		if err != nil {
			return err
		}

		opts := &aGNMI.SubscribeOptions{
			Mode:           mode,
			StreamMode:     streamMode,
			Paths:          [][]string{n.splitStringPath(sub.GetPath())},
			SampleInterval: sub.SampleInterval,
		}

		ctx := context.Background()
		ctx = context.WithValue(ctx, types.CtxKeyOpts, opts)

		if err = networkElement.Transport().Subscribe(ctx); err != nil {
			return err
		}
	}

	return nil
}
// splitStringPath splits s at every "/" and returns the resulting segments.
func (n *NetworkElementServer) splitStringPath(s string) []string {
	const separator = "/"

	segments := strings.Split(s, separator)

	return segments
}
// mapStreamModeToAristaFork translates a stream mode into the string name
// used for it in the Arista gNMI fork's subscribe options. Any mode other
// than target-defined, on-change or sample results in an error.
func (n *NetworkElementServer) mapStreamModeToAristaFork(mode mnepb.StreamMode) (string, error) {
	supported := map[mnepb.StreamMode]string{
		mnepb.StreamMode_STREAM_MODE_TARGET_DEFINED: "target_defined",
		mnepb.StreamMode_STREAM_MODE_ON_CHANGE:      "on_change",
		mnepb.StreamMode_STREAM_MODE_SAMPLE:         "sample",
	}

	if name, known := supported[mode]; known {
		return name, nil
	}

	return "", fmt.Errorf("StreamMode of type: %T is not supported", mode)
}
func (n *NetworkElementServer) mapModeToAristaFork(mode mnepb.SubscriptionMode) (string, error) {
switch mode {
case mnepb.SubscriptionMode_SUBSCRIPTION_MODE_STREAM:
return "stream", nil
case mnepb.SubscriptionMode_SUBSCRIPTION_MODE_ONCE:
return "once", nil
case mnepb.SubscriptionMode_SUBSCRIPTION_MODE_POLL: