Newer
Older
package server
import (
	"context"
	"fmt"
	"strings"
	"time"

	mnepb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/networkelement"
	plugin_registry "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/plugin-registry"
	ppb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/pnd"
	tpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/transport"
	"code.fbi.h-da.de/danet/gosdn/controller/config"
	"code.fbi.h-da.de/danet/gosdn/controller/conflict"
	"code.fbi.h-da.de/danet/gosdn/controller/customerrs"
	"code.fbi.h-da.de/danet/gosdn/controller/interfaces/change"
	"code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkdomain"
	"code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkelement"
	"code.fbi.h-da.de/danet/gosdn/controller/interfaces/plugin"
	"code.fbi.h-da.de/danet/gosdn/controller/metrics"
	"code.fbi.h-da.de/danet/gosdn/controller/nucleus"
	"code.fbi.h-da.de/danet/gosdn/controller/nucleus/types"
	util "code.fbi.h-da.de/danet/gosdn/controller/nucleus/util/gnmi"
	"code.fbi.h-da.de/danet/gosdn/controller/store"
	aGNMI "code.fbi.h-da.de/danet/gosdn/forks/goarista/gnmi"
	"code.fbi.h-da.de/danet/gosdn/models/generated/openconfig"
	"github.com/bufbuild/protovalidate-go"
	"github.com/google/uuid"
	"github.com/hashicorp/go-multierror"
	"github.com/openconfig/gnmi/proto/gnmi"
	ygotutil "github.com/openconfig/ygot/util"
	"github.com/openconfig/ygot/ygot"
	"github.com/prometheus/client_golang/prometheus"
	log "github.com/sirupsen/logrus"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
	"google.golang.org/protobuf/proto"
)
// NetworkElementServer represents a NetworkElementServer.
// It embeds mnepb.UnimplementedNetworkElementServiceServer and holds the
// services, change store, request validator and watcher used by its methods.
type NetworkElementServer struct {
mnepb.UnimplementedNetworkElementServiceServer
// mneService is used to get, add, update and delete network elements.
mneService networkelement.Service
// pndService is used to look up principal network domains.
pndService networkdomain.Service
// pluginService is used to request and add plugins.
pluginService plugin.Service
// changeStore holds the pending and committed changes.
changeStore store.ChangeStore
// protoValidator validates incoming request messages.
protoValidator *protovalidate.Validator
// networkElementWatchter subscribes to newly added network elements.
// NOTE(review): the name is misspelled ("Watchter"); renaming it means
// touching every place that reads the field.
networkElementWatchter *nucleus.NetworkElementWatcher
}
// NewNetworkElementServer returns a new NetWorkElementServer.
func NewNetworkElementServer(
mneService networkelement.Service,
pndService networkdomain.Service,
pluginService plugin.Service,
changeStore store.ChangeStore,
protoValidator *protovalidate.Validator,
networkElementWatchter *nucleus.NetworkElementWatcher,
return &NetworkElementServer{
mneService: mneService,
pndService: pndService,
pluginService: pluginService,
changeStore: changeStore,
protoValidator: protoValidator,
networkElementWatchter: networkElementWatchter,
}
}
// Update updates a network element.
func (n *NetworkElementServer) Update(ctx context.Context, request *mnepb.UpdateNetworkElementRequest) (*mnepb.UpdateNetworkElementResponse, error) {
if err := n.protoValidator.Validate(request); err != nil {
return nil, status.Errorf(codes.Aborted, "%v", err)
}
mneID, err := uuid.Parse(request.NetworkElement.Id)
if err != nil {
return nil, err
}
err = n.mneService.UpdateModel(mneID, request.NetworkElement.Model)
if err != nil {
return nil, err
networkElement, err := n.mneService.Get(store.Query{ID: mneID})
if err != nil {
return nil, err
}
err = networkelement.EnsureIntendedConfigurationIsAppliedOnNetworkElement(networkElement)
if err != nil {
}
return &mnepb.UpdateNetworkElementResponse{
Timestamp: time.Now().UnixNano(),
}, nil
}
// Get gets a specific mne.
Fabian Seidl
committed
func (n *NetworkElementServer) Get(ctx context.Context, request *mnepb.GetRequest) (*mnepb.GetResponse, error) {
if err := n.protoValidator.Validate(request); err != nil {
return nil, status.Errorf(codes.Aborted, "%v", err)
}
labels := prometheus.Labels{"service": "mne", "rpc": "get"}
start := metrics.StartHook(labels, grpcRequestsTotal)
defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)
pnd, err := n.getPnd(request.Pid)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
networkElement, err := n.getMne(request.Mneid)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
mne := &mnepb.ManagedNetworkElement{
Id: networkElement.ID().String(),
Name: networkElement.Name(),
TransportAddress: networkElement.TransportAddress(),
}
err = fillMneBySpecificPath(mne, networkElement, "/", false)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
err = fillMneWithPluginInformation(mne, networkElement)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
mne.Model, err = networkElement.GetModelAsString()
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
Fabian Seidl
committed
return &mnepb.GetResponse{
Timestamp: time.Now().UnixNano(),
Pnd: &ppb.PrincipalNetworkDomain{
Id: pnd.ID().String(),
Name: pnd.GetName(),
Description: pnd.GetDescription(),
},
Mne: mne,
}, nil
}
// GetFlattened gets a specific mne.
Fabian Seidl
committed
func (n *NetworkElementServer) GetFlattened(ctx context.Context, request *mnepb.GetFlattenedRequest) (*mnepb.GetFlattenedResponse, error) {
if err := n.protoValidator.Validate(request); err != nil {
return nil, status.Errorf(codes.Aborted, "%v", err)
}
labels := prometheus.Labels{"service": "pnd", "rpc": "get"}
start := metrics.StartHook(labels, grpcRequestsTotal)
defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)
networkElement, err := n.getMne(request.Mneid)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
Fabian Seidl
committed
return &mnepb.GetFlattenedResponse{
Timestamp: time.Now().UnixNano(),
Mne: &mnepb.FlattenedManagedNetworkElement{
Id: networkElement.ID().String(),
Name: networkElement.Name(),
Pid: networkElement.PndID().String(),
Pluginid: networkElement.GetPlugin().ID().String(),
},
}, nil
}
// ParseYang unmarshals request.Yang into an openconfig.Device and emits that
// device as RFC 7951 JSON with module names appended.
//
// A request that fails validation is reported as codes.Aborted. Errors from
// unmarshalling or from emitting the JSON are logged and returned unchanged.
func (n *NetworkElementServer) ParseYang(ctx context.Context, request *mnepb.ParseYangRequest) (*mnepb.ParseYangResponse, error) {
if err := n.protoValidator.Validate(request); err != nil {
return nil, status.Errorf(codes.Aborted, "%v", err)
}
// device is the target the payload is unmarshalled into.
device := &openconfig.Device{}
if err := openconfig.Unmarshal([]byte(request.Yang), device); err != nil {
log.Error(err)
return nil, err
}
// Emit the unmarshalled device as RFC 7951 JSON.
json, err := ygot.EmitJSON(device, &ygot.EmitJSONConfig{
Format: ygot.RFC7951,
Indent: " ",
RFC7951Config: &ygot.RFC7951JSONConfig{
AppendModuleName: true,
},
})
if err != nil {
log.Error(err)
return nil, err
}
return &mnepb.ParseYangResponse{
Timestamp: time.Now().UnixNano(),
// NOTE(review): the response literal and the function body are not closed
// here; the remaining lines (presumably the field carrying the emitted
// JSON and the closing "}, nil") appear to be missing - confirm against
// the upstream file.
// getMne looks up the network element for the given identifier.
//
// identifier is parsed as a UUID. If parsing fails, the lookup is still
// performed, using uuid.Nil as the ID.
// NOTE(review): deleteMne also sets Query.Name; a by-name fallback may be
// intended here as well - confirm against the store's query semantics.
//
// An error from the store is returned unchanged. If the store reports no
// error but also no element, "no network element found" is returned.
func (n *NetworkElementServer) getMne(identifier string) (networkelement.NetworkElement, error) {
	id, err := uuid.Parse(identifier)
	if err != nil {
		id = uuid.Nil
	}

	mne, err := n.mneService.Get(store.Query{
		ID: id,
	})
	// Check the error first so a real store failure is not hidden behind the
	// generic not-found message.
	if err != nil {
		return nil, err
	}
	if mne == nil {
		return nil, fmt.Errorf("no network element found")
	}

	return mne, nil
}
// getPnd resolves identifier to a principal network domain.
//
// An identifier that is not a valid UUID yields the raw parse error. A failed
// lookup is logged and reported as codes.Aborted.
func (n *NetworkElementServer) getPnd(identifier string) (networkdomain.NetworkDomain, error) {
	pndID, parseErr := uuid.Parse(identifier)
	if parseErr != nil {
		return nil, parseErr
	}

	domain, lookupErr := n.pndService.Get(store.Query{ID: pndID})
	if lookupErr == nil {
		return domain, nil
	}

	log.Error(lookupErr)
	return nil, status.Errorf(codes.Aborted, "%v", lookupErr)
}
// GetAll returns a list of existing mnes.
Fabian Seidl
committed
func (n *NetworkElementServer) GetAll(ctx context.Context, request *mnepb.GetAllRequest) (*mnepb.GetAllResponse, error) {
if err := n.protoValidator.Validate(request); err != nil {
return nil, status.Errorf(codes.Aborted, "%v", err)
}
labels := prometheus.Labels{"service": "mne", "rpc": "get"}
start := metrics.StartHook(labels, grpcRequestsTotal)
defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)
pnd, err := n.getPnd(request.Pid)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
storedMNE, err := n.mneService.GetAll()
if err != nil {
return nil, err
}
mnes := make([]*mnepb.ManagedNetworkElement, len(storedMNE))
for i, networkElement := range storedMNE {
mne := &mnepb.ManagedNetworkElement{
Id: networkElement.ID().String(),
Name: networkElement.Name(),
TransportAddress: networkElement.TransportAddress(),
}
err := fillMneBySpecificPath(mne, networkElement, "/", false)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
err = fillMneWithPluginInformation(mne, networkElement)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
mnes[i] = mne
}
Fabian Seidl
committed
return &mnepb.GetAllResponse{
Timestamp: time.Now().UnixNano(),
Pnd: &ppb.PrincipalNetworkDomain{
Id: pnd.ID().String(),
Name: pnd.GetName(),
Description: pnd.GetDescription(),
},
Mne: mnes,
}, nil
}
// GetAllFlattened returns a list of existing mnes.
Fabian Seidl
committed
func (n *NetworkElementServer) GetAllFlattened(ctx context.Context, request *mnepb.GetAllFlattenedRequest) (*mnepb.GetAllFlattenedResponse, error) {
if err := n.protoValidator.Validate(request); err != nil {
return nil, status.Errorf(codes.Aborted, "%v", err)
}
labels := prometheus.Labels{"service": "mne", "rpc": "get"}
start := metrics.StartHook(labels, grpcRequestsTotal)
defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)
pnd, err := n.getPnd(request.Pid)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
mnes, err := n.mneService.GetAllAsLoaded()
if err != nil {
return nil, err
}
flattenedMnes := make([]*mnepb.FlattenedManagedNetworkElement, len(mnes))
for i, mne := range mnes {
mne := &mnepb.FlattenedManagedNetworkElement{
Id: mne.ID,
Name: mne.Name,
Pluginid: mne.Plugin,
Pid: mne.PndID,
}
flattenedMnes[i] = mne
}
Fabian Seidl
committed
return &mnepb.GetAllFlattenedResponse{
Timestamp: time.Now().UnixNano(),
Pnd: &ppb.PrincipalNetworkDomain{
Id: pnd.ID().String(),
Name: pnd.GetName(),
Description: pnd.GetDescription(),
},
Mne: flattenedMnes,
}, nil
}
// fillMneWithPluginInformation sets mne.Plugin to the ID and manifest (name,
// firmware, author, version) of the plugin attached to nme.
// It always returns nil.
func fillMneWithPluginInformation(mne *mnepb.ManagedNetworkElement, nme networkelement.NetworkElement) error {
	attached := nme.GetPlugin()
	manifest := attached.Manifest()

	info := &plugin_registry.Plugin{Id: attached.ID().String()}
	info.Manifest = &plugin_registry.Manifest{
		Name:     manifest.Name,
		Firmware: manifest.Firmware,
		Author:   manifest.Author,
		Version:  manifest.Version,
	}

	mne.Plugin = info
	return nil
}
func fillMneBySpecificPath(mne *mnepb.ManagedNetworkElement, nme networkelement.NetworkElement, path string, requestForIntendedState bool) error {
gnmiPath, err := ygot.StringToStructuredPath(path)
if err != nil {
log.Error(err)
return status.Errorf(codes.Aborted, "%v", err)
plugin := nme.GetPlugin()
notifications, err := plugin.GetNode(gnmiPath, requestForIntendedState)
return status.Errorf(codes.Aborted, "%v", err)
mne.MneNotification = notifications
return nil
// GetPath gets an actual state of the path on a mne.
func (n *NetworkElementServer) GetPath(ctx context.Context, request *mnepb.GetPathRequest) (*mnepb.GetPathResponse, error) {
if err := n.protoValidator.Validate(request); err != nil {
return nil, status.Errorf(codes.Aborted, "%v", err)
}
labels := prometheus.Labels{"service": "mne", "rpc": "get"}
start := metrics.StartHook(labels, grpcRequestsTotal)
defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)
pnd, err := n.getPnd(request.Pid)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
networkElement, err := n.getMne(request.Mneid)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
// In case we get the path from grpc-gateway we have to replace
path := strings.ReplaceAll(request.Path, "||", "/")
resp, err := n.getPath(ctx, networkElement, path)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
return &mnepb.GetPathResponse{
Timestamp: time.Now().UnixNano(),
Pnd: &ppb.PrincipalNetworkDomain{
Id: pnd.ID().String(),
Name: pnd.GetName(),
Description: pnd.GetDescription(),
},
MneNotification: resp.Notification,
func (n *NetworkElementServer) getPath(ctx context.Context, mne networkelement.NetworkElement, path string) (*gnmi.GetResponse, error) {
res, err := mne.Transport().Get(ctx, path)
if err != nil {
return nil, err
}
protoMessage, ok := res.(proto.Message)
if !ok {
return nil, &customerrs.InvalidTypeAssertionError{
Value: res,
Type: (*proto.Message)(nil),
}
}
getResponse, ok := protoMessage.(*gnmi.GetResponse)
if !ok {
return nil, &customerrs.InvalidTypeAssertionError{
Value: res,
Type: (*gnmi.GetResponse)(nil),
}
return getResponse, nil
}
// GetIntendedPath gets a path as the intended state stored in the storage.
//
// The notifications are obtained through fillMneBySpecificPath with the
// intended-state flag set, for the network element identified by
// request.Mneid. The response also carries the principal network domain
// identified by request.Pid.
//
// Validation and lookup failures are reported as codes.Aborted; an error from
// fillMneBySpecificPath is returned unchanged.
func (n *NetworkElementServer) GetIntendedPath(ctx context.Context, request *mnepb.GetIntendedPathRequest) (*mnepb.GetIntendedPathResponse, error) {
	if err := n.protoValidator.Validate(request); err != nil {
		return nil, status.Errorf(codes.Aborted, "%v", err)
	}

	pnd, err := n.getPnd(request.Pid)
	// Only fail when the lookup actually failed.
	if err != nil {
		log.Error(err)
		return nil, status.Errorf(codes.Aborted, "%v", err)
	}

	// In case we get the path from grpc-gateway we have to replace every "||"
	// with "/".
	intendedPath := strings.ReplaceAll(request.IntendedPath, "||", "/")

	networkElement, err := n.getMne(request.Mneid)
	if err != nil {
		log.Error(err)
		return nil, status.Errorf(codes.Aborted, "%v", err)
	}

	// mne is only a carrier for the notifications filled in below.
	mne := &mnepb.ManagedNetworkElement{}
	if err := fillMneBySpecificPath(mne, networkElement, intendedPath, true); err != nil {
		return nil, err
	}

	return &mnepb.GetIntendedPathResponse{
		Timestamp: time.Now().UnixNano(),
		Pnd: &ppb.PrincipalNetworkDomain{
			Id:          pnd.ID().String(),
			Name:        pnd.GetName(),
			Description: pnd.GetDescription(),
		},
		MneNotification: mne.MneNotification,
	}, nil
}
// GetChange returns the changes whose IDs are listed in request.Cuid,
// together with the principal network domain identified by request.Pid.
//
// A request that fails validation is reported as codes.Aborted; lookup
// failures are logged and reported as codes.Aborted.
func (n *NetworkElementServer) GetChange(ctx context.Context, request *mnepb.GetChangeRequest) (*mnepb.GetChangeResponse, error) {
	if validationErr := n.protoValidator.Validate(request); validationErr != nil {
		return nil, status.Errorf(codes.Aborted, "%v", validationErr)
	}

	metricLabels := prometheus.Labels{"service": "mne", "rpc": "get"}
	startedAt := metrics.StartHook(metricLabels, grpcRequestsTotal)
	defer metrics.FinishHook(metricLabels, startedAt, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)

	domain, pndErr := n.getPnd(request.Pid)
	if pndErr != nil {
		log.Error(pndErr)
		return nil, status.Errorf(codes.Aborted, "%v", pndErr)
	}

	requested, changeErr := n.fillChanges(false, request.Cuid...)
	if changeErr != nil {
		log.Error(changeErr)
		return nil, status.Errorf(codes.Aborted, "%v", changeErr)
	}

	response := &mnepb.GetChangeResponse{Timestamp: time.Now().UnixNano()}
	response.Pnd = &ppb.PrincipalNetworkDomain{
		Id:          domain.ID().String(),
		Name:        domain.GetName(),
		Description: domain.GetDescription(),
	}
	response.Change = requested

	return response, nil
}
// GetChangeList gets all existing changes.
//
// The response lists every pending and committed change of the change store,
// together with the principal network domain identified by request.Pid.
//
// A request that fails validation is reported as codes.Aborted; lookup
// failures are logged and reported as codes.Aborted.
func (n *NetworkElementServer) GetChangeList(ctx context.Context, request *mnepb.GetChangeListRequest) (*mnepb.GetChangeListResponse, error) {
	if err := n.protoValidator.Validate(request); err != nil {
		return nil, status.Errorf(codes.Aborted, "%v", err)
	}

	labels := prometheus.Labels{"service": "mne", "rpc": "get"}
	start := metrics.StartHook(labels, grpcRequestsTotal)
	defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)

	pnd, err := n.getPnd(request.Pid)
	if err != nil {
		log.Error(err)
		return nil, status.Errorf(codes.Aborted, "%v", err)
	}

	// With all set to true the change IDs are ignored, so none are passed.
	changes, err := n.fillChanges(true)
	if err != nil {
		log.Error(err)
		return nil, status.Errorf(codes.Aborted, "%v", err)
	}

	return &mnepb.GetChangeListResponse{
		Timestamp: time.Now().UnixNano(),
		Pnd: &ppb.PrincipalNetworkDomain{
			Id:          pnd.ID().String(),
			Name:        pnd.GetName(),
			Description: pnd.GetDescription(),
		},
		Change: changes,
	}, nil
}
// fillChanges converts changes of the change store into their protobuf form
// (ID, age in microseconds and state).
//
// If all is true, every pending and committed change is returned and cuid is
// ignored. Otherwise cuid must contain at least one change ID, and exactly
// those changes are returned in the given order.
//
// An empty cuid with all == false yields *customerrs.InvalidParametersError.
// IDs that cannot be parsed and changes that cannot be loaded are logged and
// reported as codes.Aborted.
func (n *NetworkElementServer) fillChanges(all bool, cuid ...string) ([]*mnepb.Change, error) {
	var changeList []uuid.UUID

	if all {
		pending := n.changeStore.Pending()
		committed := n.changeStore.Committed()

		// Build a fresh slice: appending to the slice returned by Pending
		// could write into a backing array owned by the store.
		changeList = make([]uuid.UUID, 0, len(pending)+len(committed))
		changeList = append(changeList, pending...)
		changeList = append(changeList, committed...)
	} else {
		if len(cuid) == 0 {
			return nil, &customerrs.InvalidParametersError{
				Func:  n.fillChanges,
				Param: "length of 'cuid' cannot be '0' when 'all' is set to 'false'",
			}
		}

		var err error
		changeList, err = stringArrayToUUIDs(cuid)
		if err != nil {
			log.Error(err)
			return nil, status.Errorf(codes.Aborted, "%v", err)
		}
	}

	changes := make([]*mnepb.Change, len(changeList))
	for i, ch := range changeList {
		c, err := n.changeStore.GetChange(ch)
		if err != nil {
			log.Error(err)
			return nil, status.Errorf(codes.Aborted, "%v", err)
		}

		// Diff could be added here.
		changes[i] = &mnepb.Change{
			Id:    ch.String(),
			Age:   c.Age().Microseconds(),
			State: c.State(),
		}
	}

	return changes, nil
}
// stringArrayToUUIDs parses every entry of sid as a UUID and returns the
// results in the same order. The first entry that fails to parse is logged
// and reported as codes.Aborted; no partial result is returned.
func stringArrayToUUIDs(sid []string) ([]uuid.UUID, error) {
	parsedIDs := make([]uuid.UUID, 0, len(sid))

	for _, raw := range sid {
		parsedID, parseErr := uuid.Parse(raw)
		if parseErr != nil {
			log.Error(parseErr)
			return nil, status.Errorf(codes.Aborted, "%v", parseErr)
		}

		parsedIDs = append(parsedIDs, parsedID)
	}

	return parsedIDs, nil
}
// AddList adds the list of mnes to the storage.
Fabian Seidl
committed
func (n *NetworkElementServer) AddList(ctx context.Context, request *mnepb.AddListRequest) (*mnepb.AddListResponse, error) {
if err := n.protoValidator.Validate(request); err != nil {
return nil, status.Errorf(codes.Aborted, "%v", err)
}
labels := prometheus.Labels{"service": "mne", "rpc": "set"}
start := metrics.StartHook(labels, grpcRequestsTotal)
defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)
pndID, err := uuid.Parse(request.Pid)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
networkElementIDs := make([]uuid.UUID, 0, len(request.Mne))
for _, r := range request.Mne {
pluginId, err := uuid.Parse(r.GetPluginId())
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
Fabian Seidl
committed
gNMISubscriptionPaths := n.get2dSlicePathsFrom1dPaths(r.GnmiSubscribePaths)
mneID, err := uuid.Parse(r.MneId)
if err != nil {
log.Errorf("Error parsing UUID: %v. For: %v, ", r.MneId, r.MneName)
}
mneID, err = n.addMne(ctx, r.MneName, r.TransportOption, nil, pluginId, pndID, gNMISubscriptionPaths, mneID)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
networkElementIDs = append(networkElementIDs, mneID)
}
r := make([]*mnepb.SetResponse, len(networkElementIDs))
for i, mneID := range networkElementIDs {
r[i] = &mnepb.SetResponse{Id: mneID.String()}
Fabian Seidl
committed
return &mnepb.AddListResponse{
Timestamp: time.Now().UnixNano(),
Responses: r,
}, nil
}
Fabian Seidl
committed
func (n *NetworkElementServer) addMne(ctx context.Context,
name string,
opt *tpb.TransportOption,
requestPluginFunc func(uuid.UUID) (plugin.Plugin, error),
pluginID uuid.UUID, pndID uuid.UUID,
gnmiSubscriptionPaths [][]string,
optionalNetworkElementID ...uuid.UUID) (mneId uuid.UUID, err error) {
Malte Bauch
committed
var plugin plugin.Plugin
var mne networkelement.NetworkElement
defer func() {
if err != nil && plugin != nil {
// clean up plugin
if pErr := plugin.Remove(); pErr != nil {
err = multierror.Append(err, pErr)
}
}
}()
networkElementID := uuid.Nil
if len(optionalNetworkElementID) > 0 {
networkElementID = optionalNetworkElementID[0]
}
if requestPluginFunc == nil {
requestPluginFunc = n.pluginService.RequestPlugin
}
Malte Bauch
committed
plugin, err = requestPluginFunc(pluginID)
if err != nil {
return uuid.Nil, err
}
Fabian Seidl
committed
mne, err = nucleus.NewNetworkElement(name, networkElementID, opt, pndID, plugin, gnmiSubscriptionPaths, conflict.Metadata{ResourceVersion: 0})
if err != nil {
return uuid.Nil, err
}
if mne.IsTransportValid() {
err = n.initialNetworkElementRootPathRequest(ctx, mne)
if err != nil {
return uuid.Nil, err
}
Malte Bauch
committed
err = n.mneService.Add(mne)
if err != nil {
return uuid.Nil, err
}
Malte Bauch
committed
err = n.pluginService.Add(plugin)
Malte Bauch
committed
if err := n.mneService.Delete(mne); err != nil {
return uuid.Nil, err
}
return uuid.Nil, err
}
Fabian Seidl
committed
if mne.GetGnmiSubscriptionPaths() != nil || config.GetGnmiSubscriptionPaths() != nil {
n.networkElementWatchter.SubscribeToNetworkElement(mne, nil)
}
} else {
Malte Bauch
committed
err = fmt.Errorf("invalid transport data provided")
return uuid.Nil, err
}
return mne.ID(), nil
}
func (n *NetworkElementServer) initialNetworkElementRootPathRequest(ctx context.Context, mne networkelement.NetworkElement) error {
Malte Bauch
committed
resp, err := n.getPath(ctx, mne, "/")
if err != nil {
return err
}
err = mne.ProcessResponse(resp)
if err != nil {
return err
}
return nil
}
// SetChangeList sets a list of changes.
func (n *NetworkElementServer) SetChangeList(ctx context.Context, request *mnepb.SetChangeListRequest) (*mnepb.SetChangeListResponse, error) {
labels := prometheus.Labels{"service": "mne", "rpc": "set"}
start := metrics.StartHook(labels, grpcRequestsTotal)
defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)
if err := n.protoValidator.Validate(request); err != nil {
return nil, status.Errorf(codes.Aborted, "%v", err)
}
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
responses := make([]*mnepb.SetResponse, len(request.Change))
for i, r := range request.Change {
cuid, err := uuid.Parse(r.Cuid)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
switch r.Op {
case mnepb.Operation_OPERATION_COMMIT:
if err := n.Commit(cuid); err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
case mnepb.Operation_OPERATION_CONFIRM:
if err := n.Confirm(cuid); err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
default:
return nil, &customerrs.InvalidParametersError{
Param: r.Op,
}
}
responses[i] = &mnepb.SetResponse{
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
}
}
return &mnepb.SetChangeListResponse{
Timestamp: time.Now().UnixNano(),
Responses: responses,
}, nil
}
// Commit commits the change with the given ID and then mirrors it into the
// model of the associated network element: the plugin's diff between the
// change's previous and intended state is applied update by update and delete
// by delete, after which the element is passed to mneService.Update.
// The first error encountered is returned unchanged.
func (n *NetworkElementServer) Commit(u uuid.UUID) error {
	pendingChange, err := n.changeStore.GetChange(u)
	if err != nil {
		return err
	}

	err = pendingChange.Commit()
	if err != nil {
		return err
	}

	// Set all the changes within the network elements model
	element, err := n.mneService.Get(store.Query{ID: pendingChange.AssociatedDeviceID()})
	if err != nil {
		return err
	}

	stateDiff, err := element.GetPlugin().Diff(pendingChange.PreviousState(), pendingChange.IntendedState())
	if err != nil {
		return err
	}

	for _, upd := range stateDiff.GetUpdate() {
		setErr := element.GetPlugin().SetNode(upd.GetPath(), upd.GetVal())
		if setErr != nil {
			return setErr
		}
	}

	for _, removed := range stateDiff.GetDelete() {
		delErr := element.GetPlugin().DeleteNode(removed)
		if delErr != nil {
			return delErr
		}
	}

	// update the network element
	return n.mneService.Update(element)
}
// Confirm calls Confirm on the change with the given ID. An error from
// loading the change or from confirming it is returned unchanged.
func (n *NetworkElementServer) Confirm(u uuid.UUID) error {
	pendingChange, lookupErr := n.changeStore.GetChange(u)
	if lookupErr == nil {
		return pendingChange.Confirm()
	}

	return lookupErr
}
// SetPathList sets a list of paths.
//
// For every entry of request.ChangeRequest a change is created through
// ChangeMNE from the entry's operation, path and value. The response contains
// one SetResponse per entry, in request order, carrying the ID of the created
// change.
//
// Processing stops at the first failure, which is logged and reported as
// codes.Aborted; changes created before the failure stay in the change store.
func (n *NetworkElementServer) SetPathList(ctx context.Context, request *mnepb.SetPathListRequest) (*mnepb.SetPathListResponse, error) {
	labels := prometheus.Labels{"service": "mne", "rpc": "set"}
	start := metrics.StartHook(labels, grpcRequestsTotal)
	defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)

	if err := n.protoValidator.Validate(request); err != nil {
		return nil, status.Errorf(codes.Aborted, "%v", err)
	}

	responses := make([]*mnepb.SetResponse, len(request.ChangeRequest))
	for i, r := range request.ChangeRequest {
		mneID, err := uuid.Parse(r.Mneid)
		if err != nil {
			log.Error(err)
			return nil, status.Errorf(codes.Aborted, "%v", err)
		}

		// NOTE: Could be useful to split method into two, one for
		// update/replace and on for delete.
		cid, err := n.ChangeMNE(mneID, r.ApiOp, r.Path, r.Value)
		if err != nil {
			log.Error(err)
			return nil, status.Errorf(codes.Aborted, "%v", err)
		}

		// Return the change ID; the caller needs it to commit or confirm.
		responses[i] = &mnepb.SetResponse{
			Id: cid.String(),
		}
	}

	return &mnepb.SetPathListResponse{
		Timestamp: time.Now().UnixNano(),
		Responses: responses,
	}, nil
}
// ChangeMNE creates a change from the provided Operation, path and value.
// The Change is Pending and times out after the specified timeout period.
func (n *NetworkElementServer) ChangeMNE(duid uuid.UUID, operation mnepb.ApiOperation, path *gnmi.Path, value *gnmi.TypedValue) (uuid.UUID, error) {
mne, err := n.mneService.Get(store.Query{
ID: duid,
})
if err != nil {
return uuid.Nil, err
}
plugin := mne.GetPlugin()
validatedChangeModel, err := plugin.ValidateChange(operation, path, value)
if err != nil {
return uuid.Nil, err
}
filteredMarshaledModel, err := plugin.PruneConfigFalse(validatedChangeModel)
if err != nil {
return uuid.Nil, err
}
currentModel, err := mne.GetModelAsFilteredCopy()
if err != nil {
return uuid.Nil, err
diff, err := plugin.Diff(currentModel, filteredMarshaledModel)
if err != nil {
return uuid.Nil, err
}
if util.IsGNMINotificationEmpty(diff) {
return uuid.Nil, customerrs.NoNewChangesError{Original: string(currentModel), Modified: string(filteredMarshaledModel)}
}
callback := func(original, modified []byte) error {
ctx := context.WithValue(context.Background(), types.CtxKeyOperation, operation) // nolint
payload := change.Payload{Original: original, Modified: modified, Diff: diff}
return mne.Transport().Set(ctx, payload)
}
ch := nucleus.NewChange(duid, currentModel, filteredMarshaledModel, diff, callback)
if err := n.changeStore.Add(ch); err != nil {
return uuid.Nil, err
}
return ch.ID(), nil
}
// Delete deletes a mne.
Fabian Seidl
committed
func (n *NetworkElementServer) Delete(ctx context.Context, request *mnepb.DeleteRequest) (*mnepb.DeleteResponse, error) {
if err := n.protoValidator.Validate(request); err != nil {
return nil, status.Errorf(codes.Aborted, "%v", err)
}
mneID, err := uuid.Parse(request.Mneid)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
if err := n.deleteMne(mneID); err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
Fabian Seidl
committed
return &mnepb.DeleteResponse{
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
Timestamp: time.Now().UnixNano(),
}, nil
}
// deleteMne loads the network element for id, querying by both the ID and
// its string form as name, and deletes it through mneService. Errors from the
// lookup or the deletion are returned unchanged.
func (n *NetworkElementServer) deleteMne(id uuid.UUID) error {
	query := store.Query{
		ID:   id,
		Name: id.String(),
	}

	target, lookupErr := n.mneService.Get(query)
	if lookupErr != nil {
		return lookupErr
	}

	// Note: cSBI is not supported currently, so the type-specific deletion is
	// commented out for now. Might be needed or removed in the future.
	//
	// switch mne.(type) {
	// case *CsbiNetworkElement:
	//	return pnd.handleCsbiDeletion(mne)
	// default:
	//	return pnd.networkElementService.Delete(mne)
	// }
	return n.mneService.Delete(target)
}
// SubscribePath subscribes to specific paths of an mne.
//
// The network element is identified by request.Mneid and the subscriptions
// are taken from request.Sublist. A failed lookup is logged and reported as
// codes.Aborted; an error from subscribePath is returned unchanged.
// stream is not used by this method itself.
func (n *NetworkElementServer) SubscribePath(request *mnepb.SubscribePathRequest, stream mnepb.NetworkElementService_SubscribePathServer) error {
	networkElement, err := n.getMne(request.Mneid)
	// Only abort when the lookup actually failed.
	if err != nil {
		log.Error(err)
		return status.Errorf(codes.Aborted, "%v", err)
	}

	return n.subscribePath(networkElement, request.Sublist)
}
func (n *NetworkElementServer) subscribePath(networkElement networkelement.NetworkElement, subList *mnepb.SubscriptionList) error {
mode, err := n.mapModeToAristaFork(subList.GetMode())
if err != nil {
return err
}
for _, sub := range subList.Subscription {
streamMode, err := n.mapStreamModeToAristaFork(sub.GetStreamMode())
if err != nil {
return err
}
opts := &aGNMI.SubscribeOptions{
Mode: mode,
StreamMode: streamMode,
Paths: [][]string{n.splitStringPath(sub.GetPath())},
SampleInterval: sub.SampleInterval,
}
ctx := context.Background()
ctx = context.WithValue(ctx, types.CtxKeyOpts, opts)