package server import ( "context" "fmt" "strings" "time" mnepb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/networkelement" ppb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/pnd" tpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/transport" "code.fbi.h-da.de/danet/gosdn/controller/conflict" "code.fbi.h-da.de/danet/gosdn/controller/customerrs" "code.fbi.h-da.de/danet/gosdn/controller/interfaces/change" "code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkdomain" "code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkelement" "code.fbi.h-da.de/danet/gosdn/controller/interfaces/plugin" "code.fbi.h-da.de/danet/gosdn/controller/metrics" "code.fbi.h-da.de/danet/gosdn/controller/nucleus" "code.fbi.h-da.de/danet/gosdn/controller/nucleus/types" util "code.fbi.h-da.de/danet/gosdn/controller/nucleus/util/gnmi" "code.fbi.h-da.de/danet/gosdn/controller/store" aGNMI "code.fbi.h-da.de/danet/gosdn/forks/goarista/gnmi" "github.com/google/uuid" gpb "github.com/openconfig/gnmi/proto/gnmi" "github.com/openconfig/ygot/ygot" "github.com/prometheus/client_golang/prometheus" log "github.com/sirupsen/logrus" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "google.golang.org/protobuf/proto" ) // NetworkElementServer represents a NetworkElementServer. type NetworkElementServer struct { mnepb.UnimplementedNetworkElementServiceServer mneService networkelement.Service pndService networkdomain.Service pluginService plugin.Service changeStore store.ChangeStore } // NewNetworkElementServer returns a new NetWorkElementServer. func NewNetworkElementServer(mneService networkelement.Service, pndService networkdomain.Service, pluginService plugin.Service, changeStore store.ChangeStore) *NetworkElementServer { return &NetworkElementServer{ mneService: mneService, pndService: pndService, pluginService: pluginService, changeStore: changeStore, } } // // Add adds a new network element. // func (n *NetworkElementServer) Add(ctx context.Context, request *mnepb.AddNetworkElementRequest) (*mnepb.AddNetworkElementResponse, error) { // sbiID, err := uuid.Parse(request.NetworkElement.Sbi.Id) // if err != nil { // return nil, status.Errorf(codes.Aborted, "%v", err) // } // pndID, err := uuid.Parse(request.Pid) // if err != nil { // return nil, status.Errorf(codes.Aborted, "%v", err) // } // id, err := n.addNetworkElement(ctx, request.NetworkElement.NetworkElementName, request.NetworkElement.TransportOption, sbiID, pndID) // if err != nil { // return nil, status.Errorf(codes.Aborted, "%v", err) // } // return &mnepb.AddNetworkElementResponse{ // Timestamp: time.Now().UnixNano(), // Status: mnepb.Status_STATUS_OK, // NetworkElementId: id.String(), // }, nil // } // // TODO(merge): add plugin here, remove sbi // func (n *NetworkElementServer) addNetworkElement(ctx context.Context, name string, transportOpt *tpb.TransportOption, sbiID uuid.UUID, pndID uuid.UUID) (uuid.UUID, error) { // var sbi southbound.SouthboundInterface // var err error // // Note: cSBI not supported currently, so this is commented fow now. // // Might be needed or removed in the future. // // // // switch t := opt.Type; t { // // case spb.Type_TYPE_CONTAINERISED: // // return pnd.handleCsbiEnrolment(name, opt) // // case spb.Type_TYPE_PLUGIN: // // sbi, err = pnd.requestPlugin(name, opt) // // if err != nil { // // return uuid.Nil, err // // } // // default: // // var err error // // sbi, err = n.sbiService.Get(store.Query{ID: sbiID}) // // if err != nil { // // return uuid.Nil, err // // } // // } // // TODO(merge): add plugin stuff here! // // sbi, err = n.sbiService.Get(store.Query{ID: sbiID}) // // if err != nil { // // return uuid.Nil, err // // } // mne, err := nucleus.NewNetworkElement(name, uuid.Nil, transportOpt, sbi, pndID, conflict.Metadata{ResourceVersion: 0}) // if err != nil { // return uuid.Nil, err // } // err = n.mneService.Add(mne) // if err != nil { // return uuid.Nil, err // } // if mne.IsTransportValid() { // _, err = n.getPath(ctx, mne.ID(), "/interfaces") // if err != nil { // return uuid.Nil, err // } // } // return mne.ID(), nil // } // GetAll returns all stored network elements. func (n *NetworkElementServer) GetAll(ctx context.Context, request *mnepb.GetAllNetworkElementRequest) (*mnepb.GetAllNetworkElementResponse, error) { networkElements, err := n.mneService.GetAll() if err != nil { return nil, err } mnes := []*mnepb.ManagedNetworkElement{} for _, mne := range networkElements { ygotStructAsJSON, err := mne.GetModelAsString() if err != nil { log.Error(err) return nil, status.Errorf(codes.Aborted, "%v", err) } mnes = append(mnes, &mnepb.ManagedNetworkElement{ Id: mne.ID().String(), Name: mne.Name(), Model: ygotStructAsJSON, }) } return &mnepb.GetAllNetworkElementResponse{ Timestamp: time.Now().UnixNano(), Status: mnepb.Status_STATUS_OK, NetworkElement: mnes, }, nil } // Get returns a network element. func (n *NetworkElementServer) Get(ctx context.Context, request *mnepb.GetNetworkElementRequest) (*mnepb.GetNetworkElementResponse, error) { mne, err := n.mneService.Get(store.Query{ID: uuid.MustParse(request.NetworkElementId)}) if err != nil { return nil, err } ygotStructAsJSON, err := mne.GetModelAsString() if err != nil { log.Error(err) return nil, status.Errorf(codes.Aborted, "%v", err) } networkElement := &mnepb.ManagedNetworkElement{ Id: mne.ID().String(), Name: mne.Name(), Model: ygotStructAsJSON, TransportAddress: mne.TransportAddress(), } return &mnepb.GetNetworkElementResponse{ Timestamp: time.Now().UnixNano(), Status: mnepb.Status_STATUS_OK, NetworkElement: networkElement, }, nil } // Update updates a network element. func (n *NetworkElementServer) Update(ctx context.Context, request *mnepb.UpdateNetworkElementRequest) (*mnepb.UpdateNetworkElementResponse, error) { mneID, err := uuid.Parse(request.NetworkElement.Id) if err != nil { return nil, err } err = n.mneService.UpdateModel(mneID, request.NetworkElement.Model) if err != nil { return nil, err } networkElement, err := n.mneService.Get(store.Query{ID: mneID}) if err != nil { return nil, err } err = networkelement.EnsureIntendedConfigurationIsAppliedOnNetworkElement(networkElement) if err != nil { return &mnepb.UpdateNetworkElementResponse{ Timestamp: time.Now().UnixNano(), Status: mnepb.Status_STATUS_ERROR, }, err } return &mnepb.UpdateNetworkElementResponse{ Timestamp: time.Now().UnixNano(), Status: mnepb.Status_STATUS_OK, }, nil } // GetMne gets a specific mne. func (n *NetworkElementServer) GetMne(ctx context.Context, request *mnepb.GetMneRequest) (*mnepb.GetMneResponse, error) { labels := prometheus.Labels{"service": "mne", "rpc": "get"} start := metrics.StartHook(labels, grpcRequestsTotal) defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds) pnd, err := n.getPnd(request.Pid) if err != nil { log.Error(err) return nil, status.Errorf(codes.Aborted, "%v", err) } networkElement, err := n.getMne(request.Mneid) if err != nil { log.Error(err) return nil, status.Errorf(codes.Aborted, "%v", err) } // TODO(path): This needs some adjustments when we're switching towards a new // path request handling. mne, err := fillMneBySpecificPath(networkElement, "/", false) if err != nil { log.Error(err) return nil, status.Errorf(codes.Aborted, "%v", err) } return &mnepb.GetMneResponse{ Timestamp: time.Now().UnixNano(), Pnd: &ppb.PrincipalNetworkDomain{ Id: pnd.ID().String(), Name: pnd.GetName(), Description: pnd.GetDescription(), }, Mne: mne, }, nil } // GetFlattenedMne gets a specific mne. func (n *NetworkElementServer) GetFlattenedMne(ctx context.Context, request *mnepb.GetMneRequest) (*mnepb.GetFlattenedMneResponse, error) { labels := prometheus.Labels{"service": "pnd", "rpc": "get"} start := metrics.StartHook(labels, grpcRequestsTotal) defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds) networkElement, err := n.getMne(request.Mneid) if err != nil { log.Error(err) return nil, status.Errorf(codes.Aborted, "%v", err) } return &mnepb.GetFlattenedMneResponse{ Timestamp: time.Now().UnixNano(), Mne: &mnepb.FlattenedManagedNetworkElement{ Id: networkElement.ID().String(), Name: networkElement.Name(), Pid: networkElement.PndID().String(), Pluginid: networkElement.GetPlugin().ID().String(), }, }, nil } func (n *NetworkElementServer) getMne(identifier string) (networkelement.NetworkElement, error) { id, err := uuid.Parse(identifier) if err != nil { id = uuid.Nil } mne, err := n.mneService.Get(store.Query{ ID: id, }) if mne == nil { return nil, fmt.Errorf("no network element found") } if err != nil { return nil, err } return mne, nil } func (n *NetworkElementServer) getPnd(identifier string) (networkdomain.NetworkDomain, error) { pid, err := uuid.Parse(identifier) if err != nil { return nil, err } pnd, err := n.pndService.Get(store.Query{ID: pid}) if err != nil { log.Error(err) return nil, status.Errorf(codes.Aborted, "%v", err) } return pnd, nil } // GetMneList returns a list of existing mnes. func (n *NetworkElementServer) GetMneList(ctx context.Context, request *mnepb.GetMneListRequest) (*mnepb.GetMneListResponse, error) { labels := prometheus.Labels{"service": "mne", "rpc": "get"} start := metrics.StartHook(labels, grpcRequestsTotal) defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds) pnd, err := n.getPnd(request.Pid) if err != nil { log.Error(err) return nil, status.Errorf(codes.Aborted, "%v", err) } storedMNE, err := n.mneService.GetAll() if err != nil { return nil, err } mnes := make([]*mnepb.ManagedNetworkElement, len(storedMNE)) for i, networkElement := range storedMNE { mne, err := fillMneBySpecificPath(networkElement, "/", false) if err != nil { log.Error(err) return nil, status.Errorf(codes.Aborted, "%v", err) } mnes[i] = mne } return &mnepb.GetMneListResponse{ Timestamp: time.Now().UnixNano(), Pnd: &ppb.PrincipalNetworkDomain{ Id: pnd.ID().String(), Name: pnd.GetName(), Description: pnd.GetDescription(), }, Mne: mnes, }, nil } // GetFlattenedMneList returns a list of existing mnes. func (n *NetworkElementServer) GetFlattenedMneList(ctx context.Context, request *mnepb.GetFlattenedMneListRequest) (*mnepb.GetFlattenedMneListResponse, error) { labels := prometheus.Labels{"service": "mne", "rpc": "get"} start := metrics.StartHook(labels, grpcRequestsTotal) defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds) pnd, err := n.getPnd(request.Pid) if err != nil { log.Error(err) return nil, status.Errorf(codes.Aborted, "%v", err) } mnes, err := n.mneService.GetAllAsLoaded() if err != nil { return nil, err } flattenedMnes := make([]*mnepb.FlattenedManagedNetworkElement, len(mnes)) for i, mne := range mnes { mne := &mnepb.FlattenedManagedNetworkElement{ Id: mne.ID, Name: mne.Name, Pluginid: mne.Plugin, Pid: mne.PndID, } flattenedMnes[i] = mne } return &mnepb.GetFlattenedMneListResponse{ Timestamp: time.Now().UnixNano(), Pnd: &ppb.PrincipalNetworkDomain{ Id: pnd.ID().String(), Name: pnd.GetName(), Description: pnd.GetDescription(), }, Mne: flattenedMnes, }, nil } func fillMneBySpecificPath(nme networkelement.NetworkElement, path string, requestForIntendedState bool) (*mnepb.ManagedNetworkElement, error) { gnmiPath, err := ygot.StringToStructuredPath(path) if err != nil { log.Error(err) return nil, status.Errorf(codes.Aborted, "%v", err) } notifications, err := nme.GetPlugin().GetNode(gnmiPath, requestForIntendedState) if err != nil { return nil, status.Errorf(codes.Aborted, "%v", err) } mne := &mnepb.ManagedNetworkElement{ Id: nme.ID().String(), Name: nme.Name(), MneNotification: notifications, } return mne, nil } // GetPath gets an actual state of the path on a mne. func (n *NetworkElementServer) GetPath(ctx context.Context, request *mnepb.GetPathRequest) (*mnepb.GetPathResponse, error) { labels := prometheus.Labels{"service": "mne", "rpc": "get"} start := metrics.StartHook(labels, grpcRequestsTotal) defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds) pnd, err := n.getPnd(request.Pid) if err != nil { log.Error(err) return nil, status.Errorf(codes.Aborted, "%v", err) } networkElement, err := n.getMne(request.Mneid) if err != nil { log.Error(err) return nil, status.Errorf(codes.Aborted, "%v", err) } // In case we get the path from grpc-gateway we have to replace path := strings.ReplaceAll(request.Path, "||", "/") resp, err := n.getPath(ctx, networkElement, path) if err != nil { log.Error(err) return nil, status.Errorf(codes.Aborted, "%v", err) } return &mnepb.GetPathResponse{ Timestamp: time.Now().UnixNano(), Pnd: &ppb.PrincipalNetworkDomain{ Id: pnd.ID().String(), Name: pnd.GetName(), Description: pnd.GetDescription(), }, MneNotification: resp.(*gpb.GetResponse).Notification, }, nil } func (n *NetworkElementServer) getPath(ctx context.Context, mne networkelement.NetworkElement, path string) (proto.Message, error) { res, err := mne.Transport().Get(ctx, path) if err != nil { return nil, err } resp, ok := res.(proto.Message) if !ok { return nil, &customerrs.InvalidTypeAssertionError{ Value: res, Type: (*proto.Message)(nil), } } err = mne.ProcessResponse(resp) if err != nil { return nil, err } return resp, nil } // GetIntendedPath gets a path as the intended state stored in the storage. func (n *NetworkElementServer) GetIntendedPath(ctx context.Context, request *mnepb.GetIntendedPathRequest) (*mnepb.GetIntendedPathResponse, error) { pnd, err := n.getPnd(request.Pid) if err != nil { log.Error(err) return nil, status.Errorf(codes.Aborted, "%v", err) } // In case we get the path from grpc-gateway we have to replace intendedPath := strings.ReplaceAll(request.IntendedPath, "||", "/") networkElement, err := n.getMne(request.Mneid) if err != nil { log.Error(err) return nil, status.Errorf(codes.Aborted, "%v", err) } mne, err := fillMneBySpecificPath(networkElement, intendedPath, true) if err != nil { return nil, err } return &mnepb.GetIntendedPathResponse{ Timestamp: time.Now().UnixNano(), Pnd: &ppb.PrincipalNetworkDomain{ Id: pnd.ID().String(), Name: pnd.GetName(), Description: pnd.GetDescription(), }, MneNotification: mne.MneNotification, }, nil } // GetChange gets a specific change of a mne. func (n *NetworkElementServer) GetChange(ctx context.Context, request *mnepb.GetChangeRequest) (*mnepb.GetChangeResponse, error) { labels := prometheus.Labels{"service": "mne", "rpc": "get"} start := metrics.StartHook(labels, grpcRequestsTotal) defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds) pnd, err := n.getPnd(request.Pid) if err != nil { log.Error(err) return nil, status.Errorf(codes.Aborted, "%v", err) } changes, err := n.fillChanges(false, request.Cuid...) if err != nil { log.Error(err) return nil, status.Errorf(codes.Aborted, "%v", err) } return &mnepb.GetChangeResponse{ Timestamp: time.Now().UnixNano(), Pnd: &ppb.PrincipalNetworkDomain{ Id: pnd.ID().String(), Name: pnd.GetName(), Description: pnd.GetDescription(), }, Change: changes, }, nil } // GetChangeList gets all existing changes. func (n *NetworkElementServer) GetChangeList(ctx context.Context, request *mnepb.GetChangeListRequest) (*mnepb.GetChangeListResponse, error) { labels := prometheus.Labels{"service": "mne", "rpc": "get"} start := metrics.StartHook(labels, grpcRequestsTotal) defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds) pnd, err := n.getPnd(request.Pid) if err != nil { log.Error(err) return nil, status.Errorf(codes.Aborted, "%v", err) } changes, err := n.fillChanges(true, "") if err != nil { log.Error(err) return nil, status.Errorf(codes.Aborted, "%v", err) } return &mnepb.GetChangeListResponse{ Timestamp: time.Now().UnixNano(), Pnd: &ppb.PrincipalNetworkDomain{ Id: pnd.ID().String(), Name: pnd.GetName(), Description: pnd.GetDescription(), }, Change: changes, }, nil } func (n *NetworkElementServer) fillChanges(all bool, cuid ...string) ([]*mnepb.Change, error) { var changeList []uuid.UUID switch all { case true: changeList = n.changeStore.Pending() changeList = append(changeList, n.changeStore.Committed()...) default: var err error if len(cuid) == 0 { return nil, &customerrs.InvalidParametersError{ Func: n.fillChanges, Param: "length of 'mneID' cannot be '0' when 'all' is set to 'false'", } } changeList, err = stringArrayToUUIDs(cuid) if err != nil { log.Error(err) return nil, status.Errorf(codes.Aborted, "%v", err) } } changes := make([]*mnepb.Change, len(changeList)) for i, ch := range changeList { c, err := n.changeStore.GetChange(ch) if err != nil { log.Error(err) return nil, status.Errorf(codes.Aborted, "%v", err) } // Diff could be added here. changes[i] = &mnepb.Change{ Id: ch.String(), Age: c.Age().Microseconds(), State: c.State(), } } return changes, nil } func stringArrayToUUIDs(sid []string) ([]uuid.UUID, error) { UUIDs := make([]uuid.UUID, len(sid)) for i, id := range sid { parsed, err := uuid.Parse(id) if err != nil { log.Error(err) return nil, status.Errorf(codes.Aborted, "%v", err) } UUIDs[i] = parsed } return UUIDs, nil } // SetMneList adds the list of mnes to the storage. func (n *NetworkElementServer) SetMneList(ctx context.Context, request *mnepb.SetMneListRequest) (*mnepb.SetMneListResponse, error) { labels := prometheus.Labels{"service": "mne", "rpc": "set"} start := metrics.StartHook(labels, grpcRequestsTotal) defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds) pndID, err := uuid.Parse(request.Pid) if err != nil { log.Error(err) return nil, status.Errorf(codes.Aborted, "%v", err) } networkElementIDs := make([]uuid.UUID, 0, len(request.Mne)) for _, r := range request.Mne { pluginId, err := uuid.Parse(r.GetPluginId()) if err != nil { log.Error(err) return nil, status.Errorf(codes.Aborted, "%v", err) } mneID, err := n.addMne(ctx, r.MneName, r.TransportOption, nil, pluginId, pndID) if err != nil { log.Error(err) return nil, status.Errorf(codes.Aborted, "%v", err) } networkElementIDs = append(networkElementIDs, mneID) } r := make([]*mnepb.SetResponse, len(networkElementIDs)) for i, mneID := range networkElementIDs { r[i] = &mnepb.SetResponse{Id: mneID.String(), Status: mnepb.Status_STATUS_OK} } return &mnepb.SetMneListResponse{ Timestamp: time.Now().UnixNano(), Status: mnepb.Status_STATUS_OK, Responses: r, }, nil } func (n *NetworkElementServer) addMne(ctx context.Context, name string, opt *tpb.TransportOption, requestPluginFunc func(uuid.UUID) (plugin.Plugin, error), pluginID uuid.UUID, pndID uuid.UUID, optionalNetworkElementID ...uuid.UUID) (uuid.UUID, error) { var err error // Note: cSBI not supported currently, so this is commented fow now. // Might be needed or removed in the future. // // switch t := opt.Type; t { // case spb.Type_TYPE_CONTAINERISED: // return n.handleCsbiEnrolment(name, opt) // case spb.Type_TYPE_PLUGIN: // sbi, err = n.requestPlugin(name, opt) // if err != nil { // return uuid.Nil, err // } // default: // var err error // sbi, err = pnd.southboundService.Get(store.Query{ID: sid}) // if err != nil { // return uuid.Nil, err // } // } networkElementID := uuid.Nil if len(optionalNetworkElementID) > 0 { networkElementID = optionalNetworkElementID[0] } if requestPluginFunc == nil { requestPluginFunc = n.pluginService.RequestPlugin } plugin, err := requestPluginFunc(pluginID) if err != nil { return uuid.Nil, err } mne, err := nucleus.NewNetworkElement(name, networkElementID, opt, pndID, plugin, conflict.Metadata{ResourceVersion: 0}) if err != nil { return uuid.Nil, err } if mne.IsTransportValid() { _, err = n.getPath(ctx, mne, "/") if err != nil { return uuid.Nil, err } err = n.mneService.Add(mne) if err != nil { return uuid.Nil, err } } else { return uuid.Nil, status.Errorf(codes.InvalidArgument, "invalid transport data provided") } return mne.ID(), nil } // SetChangeList sets a list of changes. func (n *NetworkElementServer) SetChangeList(ctx context.Context, request *mnepb.SetChangeListRequest) (*mnepb.SetChangeListResponse, error) { labels := prometheus.Labels{"service": "mne", "rpc": "set"} start := metrics.StartHook(labels, grpcRequestsTotal) defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds) responses := make([]*mnepb.SetResponse, len(request.Change)) for i, r := range request.Change { cuid, err := uuid.Parse(r.Cuid) if err != nil { log.Error(err) return nil, status.Errorf(codes.Aborted, "%v", err) } switch r.Op { case mnepb.Operation_OPERATION_COMMIT: if err := n.Commit(cuid); err != nil { log.Error(err) return nil, status.Errorf(codes.Aborted, "%v", err) } case mnepb.Operation_OPERATION_CONFIRM: if err := n.Confirm(cuid); err != nil { log.Error(err) return nil, status.Errorf(codes.Aborted, "%v", err) } default: return nil, &customerrs.InvalidParametersError{ Param: r.Op, } } responses[i] = &mnepb.SetResponse{ Id: cuid.String(), Status: mnepb.Status_STATUS_OK, } } return &mnepb.SetChangeListResponse{ Timestamp: time.Now().UnixNano(), Status: mnepb.Status_STATUS_OK, Responses: responses, }, nil } // Commit calls commit on the pending change with ID. func (n *NetworkElementServer) Commit(u uuid.UUID) error { ch, err := n.changeStore.GetChange(u) if err != nil { return err } if err := ch.Commit(); err != nil { return err } // Set all the changes within the network elements model networkElement, err := n.mneService.Get(store.Query{ID: ch.AssociatedDeviceID()}) if err != nil { return err } diff, err := networkElement.GetPlugin().Diff(ch.PreviousState(), ch.IntendedState()) if err != nil { return err } for _, update := range diff.GetUpdate() { if err := networkElement.GetPlugin().SetNode(update.GetPath(), update.GetVal()); err != nil { return err } } for _, deletePath := range diff.GetDelete() { if err := networkElement.GetPlugin().DeleteNode(deletePath); err != nil { return err } } // update the network element return n.mneService.Update(networkElement) } // Confirm calls confirm on pending the pending change with ID. func (n *NetworkElementServer) Confirm(u uuid.UUID) error { ch, err := n.changeStore.GetChange(u) if err != nil { return err } return ch.Confirm() } // SetPathList sets a list of paths. func (n *NetworkElementServer) SetPathList(ctx context.Context, request *mnepb.SetPathListRequest) (*mnepb.SetPathListResponse, error) { labels := prometheus.Labels{"service": "mne", "rpc": "set"} start := metrics.StartHook(labels, grpcRequestsTotal) defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds) responses := make([]*mnepb.SetResponse, len(request.ChangeRequest)) for i, r := range request.ChangeRequest { mneID, err := uuid.Parse(r.Mneid) if err != nil { log.Error(err) return nil, status.Errorf(codes.Aborted, "%v", err) } cid, err := n.ChangeMNE(mneID, r.ApiOp, r.Path, r.Value) if err != nil { log.Error(err) return nil, status.Errorf(codes.Aborted, "%v", err) } responses[i] = &mnepb.SetResponse{ Status: mnepb.Status_STATUS_OK, Id: cid.String(), } } return &mnepb.SetPathListResponse{ Timestamp: time.Now().UnixNano(), Status: mnepb.Status_STATUS_OK, Responses: responses, }, nil } // ChangeMNE creates a change from the provided Operation, path and value. // The Change is Pending and times out after the specified timeout period. func (n *NetworkElementServer) ChangeMNE(duid uuid.UUID, operation mnepb.ApiOperation, path string, value ...string) (uuid.UUID, error) { mne, err := n.mneService.Get(store.Query{ ID: duid, }) if err != nil { return uuid.Nil, err } p, err := ygot.StringToStructuredPath(path) if err != nil { return uuid.Nil, err } plugin := mne.GetPlugin() validatedChangeModel, err := plugin.ValidateChange(operation, p, []byte(value[0])) if err != nil { return uuid.Nil, err } filteredMarshaledModel, err := plugin.PruneConfigFalse(validatedChangeModel) if err != nil { return uuid.Nil, err } if operation != mnepb.ApiOperation_API_OPERATION_DELETE && len(value) != 1 { return uuid.Nil, &customerrs.InvalidParametersError{ Func: n.ChangeMNE, Param: value, } } currentModel, err := mne.GetModelAsFilteredCopy() if err != nil { return uuid.Nil, err } diff, err := plugin.Diff(currentModel, filteredMarshaledModel) if err != nil { return uuid.Nil, err } if util.IsGNMINotificationEmpty(diff) { return uuid.Nil, customerrs.NoNewChangesError{Original: string(currentModel), Modified: string(filteredMarshaledModel)} } callback := func(original, modified []byte) error { ctx := context.WithValue(context.Background(), types.CtxKeyOperation, operation) // nolint payload := change.Payload{Original: original, Modified: modified, Diff: diff} return mne.Transport().Set(ctx, payload) } ch := nucleus.NewChange(duid, currentModel, filteredMarshaledModel, diff, callback) if err := n.changeStore.Add(ch); err != nil { return uuid.Nil, err } return ch.ID(), nil } // DeleteMne deletes a mne. func (n *NetworkElementServer) DeleteMne(ctx context.Context, request *mnepb.DeleteMneRequest) (*mnepb.DeleteMneResponse, error) { mneID, err := uuid.Parse(request.Mneid) if err != nil { log.Error(err) return nil, status.Errorf(codes.Aborted, "%v", err) } if err := n.deleteMne(mneID); err != nil { log.Error(err) return nil, status.Errorf(codes.Aborted, "%v", err) } return &mnepb.DeleteMneResponse{ Timestamp: time.Now().UnixNano(), Status: mnepb.Status_STATUS_OK, }, nil } func (n *NetworkElementServer) deleteMne(id uuid.UUID) error { mne, err := n.mneService.Get(store.Query{ ID: id, Name: id.String(), }) if err != nil { return err } // Note: cSBI not supported currently, so this is commented fow now. // Might be needed or removed in the future. // // switch mne.(type) { // case *CsbiNetworkElement: // return pnd.handleCsbiDeletion(mne) // default: // return pnd.networkElementService.Delete(mne) // } return n.mneService.Delete(mne) } // SubscribePath subscribes to specifc paths of an mne. func (n *NetworkElementServer) SubscribePath(request *mnepb.SubscribePathRequest, stream mnepb.NetworkElementService_SubscribePathServer) error { networkElement, err := n.getMne(request.Mneid) if err != nil { log.Error(err) return status.Errorf(codes.Aborted, "%v", err) } if err := n.subscribePath(networkElement, request.Sublist); err != nil { return err } return nil } func (n *NetworkElementServer) subscribePath(networkElement networkelement.NetworkElement, subList *mnepb.SubscriptionList) error { mode, err := n.mapModeToAristaFork(subList.GetMode()) if err != nil { return err } for _, sub := range subList.Subscription { streamMode, err := n.mapStreamModeToAristaFork(sub.GetStreamMode()) if err != nil { return err } opts := &aGNMI.SubscribeOptions{ Mode: mode, StreamMode: streamMode, Paths: [][]string{n.splitStringPath(sub.GetPath())}, SampleInterval: sub.SampleInterval, } ctx := context.Background() ctx = context.WithValue(ctx, types.CtxKeyOpts, opts) if err = networkElement.Transport().Subscribe(ctx); err != nil { return err } } return nil } func (n *NetworkElementServer) splitStringPath(s string) []string { return strings.Split(s, "/") } func (n *NetworkElementServer) mapStreamModeToAristaFork(mode mnepb.StreamMode) (string, error) { switch mode { case mnepb.StreamMode_STREAM_MODE_TARGET_DEFINED: return "target_defined", nil case mnepb.StreamMode_STREAM_MODE_ON_CHANGE: return "on_change", nil case mnepb.StreamMode_STREAM_MODE_SAMPLE: return "sample", nil default: return "", fmt.Errorf("StreamMode of type: %T is not supported", mode) } } func (n *NetworkElementServer) mapModeToAristaFork(mode mnepb.SubscriptionMode) (string, error) { switch mode { case mnepb.SubscriptionMode_SUBSCRIPTION_MODE_STREAM: return "stream", nil case mnepb.SubscriptionMode_SUBSCRIPTION_MODE_ONCE: return "once", nil case mnepb.SubscriptionMode_SUBSCRIPTION_MODE_POLL: return "poll", nil default: return "", fmt.Errorf("SubscriptionMode of type: %T is not supported", mode) } }