// Package server provides the NetworkElementServer gRPC service implementation.
package server
import (
	"context"
	"fmt"
	"strings"
	"time"

	mnepb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/networkelement"
	ppb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/pnd"
	tpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/transport"
	"code.fbi.h-da.de/danet/gosdn/controller/config"
	"code.fbi.h-da.de/danet/gosdn/controller/conflict"
	"code.fbi.h-da.de/danet/gosdn/controller/customerrs"
	"code.fbi.h-da.de/danet/gosdn/controller/interfaces/change"
	"code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkdomain"
	"code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkelement"
	"code.fbi.h-da.de/danet/gosdn/controller/interfaces/plugin"
	"code.fbi.h-da.de/danet/gosdn/controller/metrics"
	"code.fbi.h-da.de/danet/gosdn/controller/nucleus"
	"code.fbi.h-da.de/danet/gosdn/controller/nucleus/types"
	util "code.fbi.h-da.de/danet/gosdn/controller/nucleus/util/gnmi"
	"code.fbi.h-da.de/danet/gosdn/controller/store"
	aGNMI "code.fbi.h-da.de/danet/gosdn/forks/goarista/gnmi"
	"github.com/bufbuild/protovalidate-go"
	"github.com/google/uuid"
	"github.com/hashicorp/go-multierror"
	"github.com/openconfig/gnmi/proto/gnmi"
	ygotutil "github.com/openconfig/ygot/util"
	"github.com/openconfig/ygot/ygot"
	"github.com/prometheus/client_golang/prometheus"
	log "github.com/sirupsen/logrus"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
	"google.golang.org/protobuf/proto"
)
// NetworkElementServer represents a NetworkElementServer. It embeds
// mnepb.UnimplementedNetworkElementServiceServer and holds the services,
// stores and helpers its request handlers delegate to.
type NetworkElementServer struct {
	mnepb.UnimplementedNetworkElementServiceServer
	// mneService is used to look up, add, update and delete network elements.
	mneService networkelement.Service
	// pndService is used to look up principal network domains by ID.
	pndService networkdomain.Service
	// pluginService is used to request and register plugins for new network elements.
	pluginService plugin.Service
	// changeStore holds the pending and committed changes.
	changeStore store.ChangeStore
	// protoValidator validates incoming request messages.
	protoValidator *protovalidate.Validator
	// networkElementWatchter is used to subscribe to newly added network elements.
	networkElementWatchter *nucleus.NetworkElementWatcher
}
// NewNetworkElementServer returns a new NetWorkElementServer.
func NewNetworkElementServer(
mneService networkelement.Service,
pndService networkdomain.Service,
pluginService plugin.Service,
changeStore store.ChangeStore,
protoValidator *protovalidate.Validator,
networkElementWatchter *nucleus.NetworkElementWatcher,
return &NetworkElementServer{
mneService: mneService,
pndService: pndService,
pluginService: pluginService,
changeStore: changeStore,
protoValidator: protoValidator,
networkElementWatchter: networkElementWatchter,
}
}
// Update updates a network element.
func (n *NetworkElementServer) Update(ctx context.Context, request *mnepb.UpdateNetworkElementRequest) (*mnepb.UpdateNetworkElementResponse, error) {
if err := n.protoValidator.Validate(request); err != nil {
return nil, status.Errorf(codes.Aborted, "%v", err)
}
mneID, err := uuid.Parse(request.NetworkElement.Id)
if err != nil {
return nil, err
}
err = n.mneService.UpdateModel(mneID, request.NetworkElement.Model)
if err != nil {
return nil, err
networkElement, err := n.mneService.Get(store.Query{ID: mneID})
if err != nil {
return nil, err
}
err = networkelement.EnsureIntendedConfigurationIsAppliedOnNetworkElement(networkElement)
if err != nil {
}
return &mnepb.UpdateNetworkElementResponse{
Timestamp: time.Now().UnixNano(),
}, nil
}
// Get gets a specific mne.
Fabian Seidl
committed
func (n *NetworkElementServer) Get(ctx context.Context, request *mnepb.GetRequest) (*mnepb.GetResponse, error) {
if err := n.protoValidator.Validate(request); err != nil {
return nil, status.Errorf(codes.Aborted, "%v", err)
}
labels := prometheus.Labels{"service": "mne", "rpc": "get"}
start := metrics.StartHook(labels, grpcRequestsTotal)
defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)
pnd, err := n.getPnd(request.Pid)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
networkElement, err := n.getMne(request.Mneid)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
// TODO(path): This needs some adjustments when we're switching towards a new
// path request handling.
mne, err := fillMneBySpecificPath(networkElement, "/", false)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
mne.Model, err = networkElement.GetModelAsString()
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
mne.TransportAddress = networkElement.TransportAddress()
Fabian Seidl
committed
return &mnepb.GetResponse{
Timestamp: time.Now().UnixNano(),
Pnd: &ppb.PrincipalNetworkDomain{
Id: pnd.ID().String(),
Name: pnd.GetName(),
Description: pnd.GetDescription(),
},
Mne: mne,
}, nil
}
// GetFlattened gets a specific mne.
Fabian Seidl
committed
func (n *NetworkElementServer) GetFlattened(ctx context.Context, request *mnepb.GetFlattenedRequest) (*mnepb.GetFlattenedResponse, error) {
if err := n.protoValidator.Validate(request); err != nil {
return nil, status.Errorf(codes.Aborted, "%v", err)
}
labels := prometheus.Labels{"service": "pnd", "rpc": "get"}
start := metrics.StartHook(labels, grpcRequestsTotal)
defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)
networkElement, err := n.getMne(request.Mneid)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
Fabian Seidl
committed
return &mnepb.GetFlattenedResponse{
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
Timestamp: time.Now().UnixNano(),
Mne: &mnepb.FlattenedManagedNetworkElement{
Id: networkElement.ID().String(),
Name: networkElement.Name(),
Pid: networkElement.PndID().String(),
Pluginid: networkElement.GetPlugin().ID().String(),
},
}, nil
}
// getMne looks up a network element by the given identifier.
//
// If identifier is not a valid UUID the lookup is performed with uuid.Nil
// instead of failing early. An error from the service is returned as is; if
// the service reports no error but also no element, a "no network element
// found" error is returned.
func (n *NetworkElementServer) getMne(identifier string) (networkelement.NetworkElement, error) {
	id, err := uuid.Parse(identifier)
	if err != nil {
		id = uuid.Nil
	}

	mne, err := n.mneService.Get(store.Query{
		ID: id,
	})
	// Check the error first so the actual cause of a failed lookup is not
	// replaced by the generic "not found" message below.
	if err != nil {
		return nil, err
	}
	if mne == nil {
		return nil, fmt.Errorf("no network element found")
	}

	return mne, nil
}
// getPnd looks up the principal network domain with the given UUID string.
// A parse failure is returned unchanged; a failed lookup is logged and
// returned as a gRPC Aborted status error.
func (n *NetworkElementServer) getPnd(identifier string) (networkdomain.NetworkDomain, error) {
	pndID, parseErr := uuid.Parse(identifier)
	if parseErr != nil {
		return nil, parseErr
	}

	domain, getErr := n.pndService.Get(store.Query{ID: pndID})
	if getErr == nil {
		return domain, nil
	}

	log.Error(getErr)
	return nil, status.Errorf(codes.Aborted, "%v", getErr)
}
// GetAll returns a list of existing mnes.
Fabian Seidl
committed
func (n *NetworkElementServer) GetAll(ctx context.Context, request *mnepb.GetAllRequest) (*mnepb.GetAllResponse, error) {
if err := n.protoValidator.Validate(request); err != nil {
return nil, status.Errorf(codes.Aborted, "%v", err)
}
labels := prometheus.Labels{"service": "mne", "rpc": "get"}
start := metrics.StartHook(labels, grpcRequestsTotal)
defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)
pnd, err := n.getPnd(request.Pid)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
storedMNE, err := n.mneService.GetAll()
if err != nil {
return nil, err
}
mnes := make([]*mnepb.ManagedNetworkElement, len(storedMNE))
for i, networkElement := range storedMNE {
mne, err := fillMneBySpecificPath(networkElement, "/", false)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
mnes[i] = mne
}
Fabian Seidl
committed
return &mnepb.GetAllResponse{
Timestamp: time.Now().UnixNano(),
Pnd: &ppb.PrincipalNetworkDomain{
Id: pnd.ID().String(),
Name: pnd.GetName(),
Description: pnd.GetDescription(),
},
Mne: mnes,
}, nil
}
// GetAllFlattened returns a list of existing mnes.
Fabian Seidl
committed
func (n *NetworkElementServer) GetAllFlattened(ctx context.Context, request *mnepb.GetAllFlattenedRequest) (*mnepb.GetAllFlattenedResponse, error) {
if err := n.protoValidator.Validate(request); err != nil {
return nil, status.Errorf(codes.Aborted, "%v", err)
}
labels := prometheus.Labels{"service": "mne", "rpc": "get"}
start := metrics.StartHook(labels, grpcRequestsTotal)
defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)
pnd, err := n.getPnd(request.Pid)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
mnes, err := n.mneService.GetAllAsLoaded()
if err != nil {
return nil, err
}
flattenedMnes := make([]*mnepb.FlattenedManagedNetworkElement, len(mnes))
for i, mne := range mnes {
mne := &mnepb.FlattenedManagedNetworkElement{
Id: mne.ID,
Name: mne.Name,
Pluginid: mne.Plugin,
Pid: mne.PndID,
}
flattenedMnes[i] = mne
}
Fabian Seidl
committed
return &mnepb.GetAllFlattenedResponse{
Timestamp: time.Now().UnixNano(),
Pnd: &ppb.PrincipalNetworkDomain{
Id: pnd.ID().String(),
Name: pnd.GetName(),
Description: pnd.GetDescription(),
},
Mne: flattenedMnes,
}, nil
}
// fillMneBySpecificPath builds a ManagedNetworkElement for nme that carries
// its ID, its name and the notifications the element's plugin returns for
// path. requestForIntendedState is passed through to the plugin's GetNode.
// Failures are returned as gRPC Aborted status errors; a path that cannot be
// parsed is additionally logged.
func fillMneBySpecificPath(nme networkelement.NetworkElement, path string, requestForIntendedState bool) (*mnepb.ManagedNetworkElement, error) {
	structuredPath, pathErr := ygot.StringToStructuredPath(path)
	if pathErr != nil {
		log.Error(pathErr)
		return nil, status.Errorf(codes.Aborted, "%v", pathErr)
	}

	nodes, nodeErr := nme.GetPlugin().GetNode(structuredPath, requestForIntendedState)
	if nodeErr != nil {
		return nil, status.Errorf(codes.Aborted, "%v", nodeErr)
	}

	return &mnepb.ManagedNetworkElement{
		Id:              nme.ID().String(),
		Name:            nme.Name(),
		MneNotification: nodes,
	}, nil
}
// GetPath gets an actual state of the path on a mne.
func (n *NetworkElementServer) GetPath(ctx context.Context, request *mnepb.GetPathRequest) (*mnepb.GetPathResponse, error) {
if err := n.protoValidator.Validate(request); err != nil {
return nil, status.Errorf(codes.Aborted, "%v", err)
}
labels := prometheus.Labels{"service": "mne", "rpc": "get"}
start := metrics.StartHook(labels, grpcRequestsTotal)
defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)
pnd, err := n.getPnd(request.Pid)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
networkElement, err := n.getMne(request.Mneid)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
// In case we get the path from grpc-gateway we have to replace
path := strings.ReplaceAll(request.Path, "||", "/")
resp, err := n.getPath(ctx, networkElement, path)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
return &mnepb.GetPathResponse{
Timestamp: time.Now().UnixNano(),
Pnd: &ppb.PrincipalNetworkDomain{
Id: pnd.ID().String(),
Name: pnd.GetName(),
Description: pnd.GetDescription(),
},
MneNotification: resp.Notification,
func (n *NetworkElementServer) getPath(ctx context.Context, mne networkelement.NetworkElement, path string) (*gnmi.GetResponse, error) {
res, err := mne.Transport().Get(ctx, path)
if err != nil {
return nil, err
}
protoMessage, ok := res.(proto.Message)
if !ok {
return nil, &customerrs.InvalidTypeAssertionError{
Value: res,
Type: (*proto.Message)(nil),
}
}
getResponse, ok := protoMessage.(*gnmi.GetResponse)
if !ok {
return nil, &customerrs.InvalidTypeAssertionError{
Value: res,
Type: (*gnmi.GetResponse)(nil),
}
return getResponse, nil
}
// GetIntendedPath gets a path as the intended state stored in the storage.
//
// The response carries the principal network domain named in the request and
// the notifications the element's plugin returns for the intended path.
// Validation and lookup failures are reported as gRPC Aborted status errors.
func (n *NetworkElementServer) GetIntendedPath(ctx context.Context, request *mnepb.GetIntendedPathRequest) (*mnepb.GetIntendedPathResponse, error) {
	if err := n.protoValidator.Validate(request); err != nil {
		return nil, status.Errorf(codes.Aborted, "%v", err)
	}

	pnd, err := n.getPnd(request.Pid)
	// Only abort when the PND lookup actually failed; without this guard the
	// handler would return before doing any work.
	if err != nil {
		log.Error(err)
		return nil, status.Errorf(codes.Aborted, "%v", err)
	}

	// In case we get the path from grpc-gateway we have to replace "||" with
	// "/" again.
	intendedPath := strings.ReplaceAll(request.IntendedPath, "||", "/")

	networkElement, err := n.getMne(request.Mneid)
	if err != nil {
		log.Error(err)
		return nil, status.Errorf(codes.Aborted, "%v", err)
	}

	mne, err := fillMneBySpecificPath(networkElement, intendedPath, true)
	if err != nil {
		return nil, err
	}

	return &mnepb.GetIntendedPathResponse{
		Timestamp: time.Now().UnixNano(),
		Pnd: &ppb.PrincipalNetworkDomain{
			Id:          pnd.ID().String(),
			Name:        pnd.GetName(),
			Description: pnd.GetDescription(),
		},
		MneNotification: mne.MneNotification,
	}, nil
}
// GetChange returns the changes whose IDs are listed in the request, together
// with the principal network domain named in the request. Validation, lookup
// and conversion failures are reported as gRPC Aborted status errors.
func (n *NetworkElementServer) GetChange(ctx context.Context, request *mnepb.GetChangeRequest) (*mnepb.GetChangeResponse, error) {
	if validationErr := n.protoValidator.Validate(request); validationErr != nil {
		return nil, status.Errorf(codes.Aborted, "%v", validationErr)
	}

	metricLabels := prometheus.Labels{"service": "mne", "rpc": "get"}
	startedAt := metrics.StartHook(metricLabels, grpcRequestsTotal)
	defer metrics.FinishHook(metricLabels, startedAt, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)

	domain, err := n.getPnd(request.Pid)
	if err != nil {
		log.Error(err)
		return nil, status.Errorf(codes.Aborted, "%v", err)
	}

	requested, err := n.fillChanges(false, request.Cuid...)
	if err != nil {
		log.Error(err)
		return nil, status.Errorf(codes.Aborted, "%v", err)
	}

	response := &mnepb.GetChangeResponse{
		Timestamp: time.Now().UnixNano(),
		Pnd: &ppb.PrincipalNetworkDomain{
			Id:          domain.ID().String(),
			Name:        domain.GetName(),
			Description: domain.GetDescription(),
		},
		Change: requested,
	}
	return response, nil
}
// GetChangeList gets all existing changes (pending and committed) together
// with the principal network domain named in the request.
//
// Validation, lookup and conversion failures are reported as gRPC Aborted
// status errors.
func (n *NetworkElementServer) GetChangeList(ctx context.Context, request *mnepb.GetChangeListRequest) (*mnepb.GetChangeListResponse, error) {
	if err := n.protoValidator.Validate(request); err != nil {
		return nil, status.Errorf(codes.Aborted, "%v", err)
	}

	labels := prometheus.Labels{"service": "mne", "rpc": "get"}
	start := metrics.StartHook(labels, grpcRequestsTotal)
	defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)

	pnd, err := n.getPnd(request.Pid)
	if err != nil {
		log.Error(err)
		return nil, status.Errorf(codes.Aborted, "%v", err)
	}

	// With all set to true the ID argument is ignored by fillChanges.
	changes, err := n.fillChanges(true, "")
	if err != nil {
		log.Error(err)
		return nil, status.Errorf(codes.Aborted, "%v", err)
	}

	return &mnepb.GetChangeListResponse{
		Timestamp: time.Now().UnixNano(),
		Pnd: &ppb.PrincipalNetworkDomain{
			Id:          pnd.ID().String(),
			Name:        pnd.GetName(),
			Description: pnd.GetDescription(),
		},
		Change: changes,
	}, nil
}
// fillChanges converts changes from the change store into their protobuf
// form (ID, age in microseconds and state).
//
// With all set, every pending change followed by every committed change is
// returned and cuid is ignored. Otherwise cuid must contain at least one
// change ID; those IDs are parsed and looked up. Parse and lookup failures are
// logged and returned as gRPC Aborted status errors.
func (n *NetworkElementServer) fillChanges(all bool, cuid ...string) ([]*mnepb.Change, error) {
	var ids []uuid.UUID

	if all {
		ids = n.changeStore.Pending()
		ids = append(ids, n.changeStore.Committed()...)
	} else {
		if len(cuid) == 0 {
			return nil, &customerrs.InvalidParametersError{
				Func:  n.fillChanges,
				Param: "length of 'mneID' cannot be '0' when 'all' is set to 'false'",
			}
		}

		parsed, parseErr := stringArrayToUUIDs(cuid)
		if parseErr != nil {
			log.Error(parseErr)
			return nil, status.Errorf(codes.Aborted, "%v", parseErr)
		}
		ids = parsed
	}

	result := make([]*mnepb.Change, 0, len(ids))
	for _, id := range ids {
		stored, getErr := n.changeStore.GetChange(id)
		if getErr != nil {
			log.Error(getErr)
			return nil, status.Errorf(codes.Aborted, "%v", getErr)
		}

		// Diff could be added here.
		result = append(result, &mnepb.Change{
			Id:    id.String(),
			Age:   stored.Age().Microseconds(),
			State: stored.State(),
		})
	}

	return result, nil
}
// stringArrayToUUIDs parses every string in sid as a UUID and returns the
// results in the same order. The first string that fails to parse is logged
// and reported as a gRPC Aborted status error.
func stringArrayToUUIDs(sid []string) ([]uuid.UUID, error) {
	parsedIDs := make([]uuid.UUID, 0, len(sid))
	for _, raw := range sid {
		parsedID, parseErr := uuid.Parse(raw)
		if parseErr != nil {
			log.Error(parseErr)
			return nil, status.Errorf(codes.Aborted, "%v", parseErr)
		}
		parsedIDs = append(parsedIDs, parsedID)
	}
	return parsedIDs, nil
}
// AddList adds the list of mnes to the storage.
Fabian Seidl
committed
func (n *NetworkElementServer) AddList(ctx context.Context, request *mnepb.AddListRequest) (*mnepb.AddListResponse, error) {
if err := n.protoValidator.Validate(request); err != nil {
return nil, status.Errorf(codes.Aborted, "%v", err)
}
labels := prometheus.Labels{"service": "mne", "rpc": "set"}
start := metrics.StartHook(labels, grpcRequestsTotal)
defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)
pndID, err := uuid.Parse(request.Pid)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
networkElementIDs := make([]uuid.UUID, 0, len(request.Mne))
for _, r := range request.Mne {
pluginId, err := uuid.Parse(r.GetPluginId())
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
Fabian Seidl
committed
gNMISubscriptionPaths := n.get2dSlicePathsFrom1dPaths(r.GnmiSubscribePaths)
mneID, err := uuid.Parse(r.MneId)
if err != nil {
log.Errorf("Error parsing UUID: %v. For: %v, ", r.MneId, r.MneName)
}
mneID, err = n.addMne(ctx, r.MneName, r.TransportOption, nil, pluginId, pndID, gNMISubscriptionPaths, mneID)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
networkElementIDs = append(networkElementIDs, mneID)
}
r := make([]*mnepb.SetResponse, len(networkElementIDs))
for i, mneID := range networkElementIDs {
r[i] = &mnepb.SetResponse{Id: mneID.String()}
Fabian Seidl
committed
return &mnepb.AddListResponse{
Timestamp: time.Now().UnixNano(),
Responses: r,
}, nil
}
Fabian Seidl
committed
func (n *NetworkElementServer) addMne(ctx context.Context,
name string,
opt *tpb.TransportOption,
requestPluginFunc func(uuid.UUID) (plugin.Plugin, error),
pluginID uuid.UUID, pndID uuid.UUID,
gnmiSubscriptionPaths [][]string,
optionalNetworkElementID ...uuid.UUID) (mneId uuid.UUID, err error) {
Malte Bauch
committed
var plugin plugin.Plugin
var mne networkelement.NetworkElement
defer func() {
if err != nil && plugin != nil {
// clean up plugin
if pErr := plugin.Remove(); pErr != nil {
err = multierror.Append(err, pErr)
}
}
}()
networkElementID := uuid.Nil
if len(optionalNetworkElementID) > 0 {
networkElementID = optionalNetworkElementID[0]
}
if requestPluginFunc == nil {
requestPluginFunc = n.pluginService.RequestPlugin
}
Malte Bauch
committed
plugin, err = requestPluginFunc(pluginID)
if err != nil {
return uuid.Nil, err
}
Fabian Seidl
committed
mne, err = nucleus.NewNetworkElement(name, networkElementID, opt, pndID, plugin, gnmiSubscriptionPaths, conflict.Metadata{ResourceVersion: 0})
if err != nil {
return uuid.Nil, err
}
if mne.IsTransportValid() {
Malte Bauch
committed
err = n.initialNetworkElementRootPathRequest(ctx, mne, plugin)
if err != nil {
return uuid.Nil, err
}
Malte Bauch
committed
err = n.mneService.Add(mne)
if err != nil {
return uuid.Nil, err
}
Malte Bauch
committed
err = n.pluginService.Add(plugin)
Malte Bauch
committed
if err := n.mneService.Delete(mne); err != nil {
return uuid.Nil, err
}
return uuid.Nil, err
}
Fabian Seidl
committed
if mne.GetGnmiSubscriptionPaths() != nil || config.GetGnmiSubscriptionPaths() != nil {
n.networkElementWatchter.SubscribeToNetworkElement(mne, nil)
}
} else {
Malte Bauch
committed
err = fmt.Errorf("invalid transport data provided")
return uuid.Nil, err
}
return mne.ID(), nil
}
Malte Bauch
committed
// initialNetworkElementRootPathRequest requests the root path "/" from mne
// and lets the element process the response. The plugin argument is not used
// by this function.
func (n *NetworkElementServer) initialNetworkElementRootPathRequest(ctx context.Context, mne networkelement.NetworkElement, plugin plugin.Plugin) error {
	rootResponse, requestErr := n.getPath(ctx, mne, "/")
	if requestErr != nil {
		return requestErr
	}

	if processErr := mne.ProcessResponse(rootResponse); processErr != nil {
		return processErr
	}
	return nil
}
// SetChangeList sets a list of changes.
func (n *NetworkElementServer) SetChangeList(ctx context.Context, request *mnepb.SetChangeListRequest) (*mnepb.SetChangeListResponse, error) {
labels := prometheus.Labels{"service": "mne", "rpc": "set"}
start := metrics.StartHook(labels, grpcRequestsTotal)
defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)
if err := n.protoValidator.Validate(request); err != nil {
return nil, status.Errorf(codes.Aborted, "%v", err)
}
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
responses := make([]*mnepb.SetResponse, len(request.Change))
for i, r := range request.Change {
cuid, err := uuid.Parse(r.Cuid)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
switch r.Op {
case mnepb.Operation_OPERATION_COMMIT:
if err := n.Commit(cuid); err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
case mnepb.Operation_OPERATION_CONFIRM:
if err := n.Confirm(cuid); err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
default:
return nil, &customerrs.InvalidParametersError{
Param: r.Op,
}
}
responses[i] = &mnepb.SetResponse{
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
}
}
return &mnepb.SetChangeListResponse{
Timestamp: time.Now().UnixNano(),
Responses: responses,
}, nil
}
// Commit commits the change with the given ID and then writes the difference
// between the change's previous and intended state into the plugin of the
// associated network element: updates are set, deletions are removed.
// Finally the network element is updated through the mne service. The first
// error encountered is returned unchanged.
func (n *NetworkElementServer) Commit(u uuid.UUID) error {
	pending, err := n.changeStore.GetChange(u)
	if err != nil {
		return err
	}

	if err = pending.Commit(); err != nil {
		return err
	}

	// Set all the changes within the network elements model
	element, err := n.mneService.Get(store.Query{ID: pending.AssociatedDeviceID()})
	if err != nil {
		return err
	}

	delta, err := element.GetPlugin().Diff(pending.PreviousState(), pending.IntendedState())
	if err != nil {
		return err
	}

	for _, upd := range delta.GetUpdate() {
		if err = element.GetPlugin().SetNode(upd.GetPath(), upd.GetVal()); err != nil {
			return err
		}
	}

	for _, removed := range delta.GetDelete() {
		if err = element.GetPlugin().DeleteNode(removed); err != nil {
			return err
		}
	}

	// update the network element
	return n.mneService.Update(element)
}
// Confirm looks up the change with the given ID in the change store and
// calls Confirm on it. A failed lookup is returned unchanged.
func (n *NetworkElementServer) Confirm(u uuid.UUID) error {
	pending, lookupErr := n.changeStore.GetChange(u)
	if lookupErr == nil {
		return pending.Confirm()
	}
	return lookupErr
}
// SetPathList sets a list of paths: for every change request a change is
// created via ChangeMNE and its ID is reported in the response.
//
// Processing stops at the first failing entry. Validation, parse and change
// creation failures are reported as gRPC Aborted status errors.
func (n *NetworkElementServer) SetPathList(ctx context.Context, request *mnepb.SetPathListRequest) (*mnepb.SetPathListResponse, error) {
	labels := prometheus.Labels{"service": "mne", "rpc": "set"}
	start := metrics.StartHook(labels, grpcRequestsTotal)
	defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)

	if err := n.protoValidator.Validate(request); err != nil {
		return nil, status.Errorf(codes.Aborted, "%v", err)
	}

	responses := make([]*mnepb.SetResponse, len(request.ChangeRequest))

	for i, r := range request.ChangeRequest {
		mneID, err := uuid.Parse(r.Mneid)
		if err != nil {
			log.Error(err)
			return nil, status.Errorf(codes.Aborted, "%v", err)
		}

		// NOTE: Could be useful to split method into two, one for
		// update/replace and on for delete.
		cid, err := n.ChangeMNE(mneID, r.ApiOp, r.Path, r.Value)
		if err != nil {
			log.Error(err)
			return nil, status.Errorf(codes.Aborted, "%v", err)
		}

		// NOTE(review): the field list of this literal was not recoverable
		// from the damaged source. Returning the change ID makes use of cid
		// and mirrors how AddList fills SetResponse; confirm whether further
		// fields (e.g. a status) belong here.
		responses[i] = &mnepb.SetResponse{
			Id: cid.String(),
		}
	}

	return &mnepb.SetPathListResponse{
		Timestamp: time.Now().UnixNano(),
		Responses: responses,
	}, nil
}
// ChangeMNE creates a change from the provided Operation, path and value.
// The Change is Pending and times out after the specified timeout period.
func (n *NetworkElementServer) ChangeMNE(duid uuid.UUID, operation mnepb.ApiOperation, path *gnmi.Path, value *gnmi.TypedValue) (uuid.UUID, error) {
mne, err := n.mneService.Get(store.Query{
ID: duid,
})
if err != nil {
return uuid.Nil, err
}
plugin := mne.GetPlugin()
validatedChangeModel, err := plugin.ValidateChange(operation, path, value)
if err != nil {
return uuid.Nil, err
}
filteredMarshaledModel, err := plugin.PruneConfigFalse(validatedChangeModel)
if err != nil {
return uuid.Nil, err
}
currentModel, err := mne.GetModelAsFilteredCopy()
if err != nil {
return uuid.Nil, err
diff, err := plugin.Diff(currentModel, filteredMarshaledModel)
if err != nil {
return uuid.Nil, err
}
if util.IsGNMINotificationEmpty(diff) {
return uuid.Nil, customerrs.NoNewChangesError{Original: string(currentModel), Modified: string(filteredMarshaledModel)}
}
callback := func(original, modified []byte) error {
ctx := context.WithValue(context.Background(), types.CtxKeyOperation, operation) // nolint
payload := change.Payload{Original: original, Modified: modified, Diff: diff}
return mne.Transport().Set(ctx, payload)
}
ch := nucleus.NewChange(duid, currentModel, filteredMarshaledModel, diff, callback)
if err := n.changeStore.Add(ch); err != nil {
return uuid.Nil, err
}
return ch.ID(), nil
}
// Delete deletes a mne.
Fabian Seidl
committed
func (n *NetworkElementServer) Delete(ctx context.Context, request *mnepb.DeleteRequest) (*mnepb.DeleteResponse, error) {
if err := n.protoValidator.Validate(request); err != nil {
return nil, status.Errorf(codes.Aborted, "%v", err)
}
mneID, err := uuid.Parse(request.Mneid)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
if err := n.deleteMne(mneID); err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
Fabian Seidl
committed
return &mnepb.DeleteResponse{
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
Timestamp: time.Now().UnixNano(),
}, nil
}
// deleteMne loads the network element matching id (queried by ID and by the
// ID's string form as name) and deletes it through the mne service. Errors
// from either step are returned unchanged.
func (n *NetworkElementServer) deleteMne(id uuid.UUID) error {
	query := store.Query{
		ID:   id,
		Name: id.String(),
	}

	target, lookupErr := n.mneService.Get(query)
	if lookupErr != nil {
		return lookupErr
	}

	// Note: cSBI is currently not supported, so the type-specific handling
	// below stays commented out. It might be needed or removed in the future.
	//
	// switch mne.(type) {
	// case *CsbiNetworkElement:
	//	return pnd.handleCsbiDeletion(mne)
	// default:
	//	return pnd.networkElementService.Delete(mne)
	// }
	return n.mneService.Delete(target)
}
// SubscribePath subscribes to specific paths of an mne, as listed in the
// request's subscription list.
//
// A failed element lookup is logged and reported as a gRPC Aborted status
// error; an error from setting up the subscriptions is returned unchanged.
// The stream argument is not used by this function.
func (n *NetworkElementServer) SubscribePath(request *mnepb.SubscribePathRequest, stream mnepb.NetworkElementService_SubscribePathServer) error {
	networkElement, err := n.getMne(request.Mneid)
	// Only abort when the lookup actually failed; without this guard the
	// handler would return before subscribing to anything.
	if err != nil {
		log.Error(err)
		return status.Errorf(codes.Aborted, "%v", err)
	}

	if err := n.subscribePath(networkElement, request.Sublist); err != nil {
		return err
	}

	return nil
}
// subscribePath calls Subscribe on the transport of networkElement once per
// subscription in subList.
//
// The list's mode and each subscription's stream mode are translated to the
// string form used by aGNMI.SubscribeOptions; the options are handed to the
// transport through the context under types.CtxKeyOpts. The first error
// (unsupported mode or failed subscribe) is returned unchanged. A nil subList
// contains no subscriptions, so nothing is subscribed in that case unless
// the mode mapping rejects the default mode first.
func (n *NetworkElementServer) subscribePath(networkElement networkelement.NetworkElement, subList *mnepb.SubscriptionList) error {
	mode, err := n.mapModeToAristaFork(subList.GetMode())
	if err != nil {
		return err
	}

	// Use the generated getters: they tolerate nil messages, whereas direct
	// field access would panic when the request carries no subscription list.
	for _, sub := range subList.GetSubscription() {
		streamMode, err := n.mapStreamModeToAristaFork(sub.GetStreamMode())
		if err != nil {
			return err
		}

		opts := &aGNMI.SubscribeOptions{
			Mode:           mode,
			StreamMode:     streamMode,
			Paths:          [][]string{n.splitStringPath(sub.GetPath())},
			SampleInterval: sub.GetSampleInterval(),
		}

		ctx := context.Background()
		ctx = context.WithValue(ctx, types.CtxKeyOpts, opts)

		if err = networkElement.Transport().Subscribe(ctx); err != nil {
			return err
		}
	}

	return nil
}
// splitStringPath splits s at every "/" and returns the resulting elements.
func (n *NetworkElementServer) splitStringPath(s string) []string {
	const separator = "/"
	return strings.Split(s, separator)
}
// mapStreamModeToAristaFork translates a StreamMode into the string used in
// aGNMI.SubscribeOptions: "target_defined", "on_change" or "sample". Any
// other mode yields an error that names the rejected mode.
func (n *NetworkElementServer) mapStreamModeToAristaFork(mode mnepb.StreamMode) (string, error) {
	switch mode {
	case mnepb.StreamMode_STREAM_MODE_TARGET_DEFINED:
		return "target_defined", nil
	case mnepb.StreamMode_STREAM_MODE_ON_CHANGE:
		return "on_change", nil
	case mnepb.StreamMode_STREAM_MODE_SAMPLE:
		return "sample", nil
	default:
		// %v prints the offending mode; %T would only print its Go type,
		// which is the same for every value.
		return "", fmt.Errorf("StreamMode of type: %v is not supported", mode)
	}
}
// mapModeToAristaFork translates a SubscriptionMode into the string used in
// aGNMI.SubscribeOptions: "stream", "once" or "poll". Any other mode yields
// an error that names the rejected mode.
func (n *NetworkElementServer) mapModeToAristaFork(mode mnepb.SubscriptionMode) (string, error) {
	switch mode {
	case mnepb.SubscriptionMode_SUBSCRIPTION_MODE_STREAM:
		return "stream", nil
	case mnepb.SubscriptionMode_SUBSCRIPTION_MODE_ONCE:
		return "once", nil
	case mnepb.SubscriptionMode_SUBSCRIPTION_MODE_POLL:
		return "poll", nil
	default:
		// %v prints the offending mode; %T would only print its Go type,
		// which is the same for every value.
		return "", fmt.Errorf("SubscriptionMode of type: %v is not supported", mode)
	}
}
Fabian Seidl
committed
func (n *NetworkElementServer) get2dSlicePathsFrom1dPaths(gNMISubscribePaths []string) [][]string {
var paths [][]string
for _, path := range gNMISubscribePaths {
paths = append(paths, ygotutil.PathStringToElements(path))
}
return paths
}