Skip to content
Snippets Groups Projects
networkElement.go 31 KiB
Newer Older
  • Learn to ignore specific revisions
  • 	"time"
    
    	mnepb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/networkelement"
    
    	ppb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/pnd"
    	tpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/transport"
    
    	"code.fbi.h-da.de/danet/gosdn/controller/config"
    
    	"code.fbi.h-da.de/danet/gosdn/controller/conflict"
    	"code.fbi.h-da.de/danet/gosdn/controller/customerrs"
    	"code.fbi.h-da.de/danet/gosdn/controller/interfaces/change"
    
    	"code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkdomain"
    
    	"code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkelement"
    	"code.fbi.h-da.de/danet/gosdn/controller/interfaces/plugin"
    	"code.fbi.h-da.de/danet/gosdn/controller/metrics"
    	"code.fbi.h-da.de/danet/gosdn/controller/nucleus"
    	"code.fbi.h-da.de/danet/gosdn/controller/nucleus/types"
    
    	util "code.fbi.h-da.de/danet/gosdn/controller/nucleus/util/gnmi"
    
    	"code.fbi.h-da.de/danet/gosdn/controller/store"
    	aGNMI "code.fbi.h-da.de/danet/gosdn/forks/goarista/gnmi"
    
    	"github.com/bufbuild/protovalidate-go"
    
    	"github.com/openconfig/gnmi/proto/gnmi"
    
    
    	"github.com/openconfig/ygot/ygot"
    	"github.com/prometheus/client_golang/prometheus"
    
    	log "github.com/sirupsen/logrus"
    	"google.golang.org/grpc/codes"
    	"google.golang.org/grpc/status"
    
    	"google.golang.org/protobuf/proto"
    
    )
    
    // NetworkElementServer represents a NetworkElementServer.
    // It embeds the generated UnimplementedNetworkElementServiceServer and keeps
    // the services and stores that the RPC handlers in this file operate on.
    type NetworkElementServer struct {
    	mnepb.UnimplementedNetworkElementServiceServer
    
    	// mneService is used to read, update and delete managed network elements.
    	mneService             networkelement.Service
    	// pndService is used to look up principal network domains by ID.
    	pndService             networkdomain.Service
    	// pluginService supplies the default RequestPlugin function used by addMne.
    	pluginService          plugin.Service
    	// changeStore holds the pending and committed changes.
    	changeStore            store.ChangeStore
    	// protoValidator validates incoming request messages in the handlers.
    	protoValidator         *protovalidate.Validator
    	// networkElementWatchter is used to subscribe to newly added network
    	// elements (the field name carries a historical misspelling of "Watcher").
    	networkElementWatchter *nucleus.NetworkElementWatcher
    
    }
    
    // NewNetworkElementServer returns a new NetWorkElementServer.
    
    func NewNetworkElementServer(
    	mneService networkelement.Service,
    	pndService networkdomain.Service,
    	pluginService plugin.Service,
    	changeStore store.ChangeStore,
    	protoValidator *protovalidate.Validator,
    
    	networkElementWatchter *nucleus.NetworkElementWatcher,
    
    ) *NetworkElementServer {
    
    	return &NetworkElementServer{
    
    		mneService:             mneService,
    		pndService:             pndService,
    		pluginService:          pluginService,
    		changeStore:            changeStore,
    		protoValidator:         protoValidator,
    		networkElementWatchter: networkElementWatchter,
    
    // // Add adds a new network element.
    // func (n *NetworkElementServer) Add(ctx context.Context, request *mnepb.AddNetworkElementRequest) (*mnepb.AddNetworkElementResponse, error) {
    // 	sbiID, err := uuid.Parse(request.NetworkElement.Sbi.Id)
    // 	if err != nil {
    // 		return nil, status.Errorf(codes.Aborted, "%v", err)
    // 	}
    
    // 	pndID, err := uuid.Parse(request.Pid)
    // 	if err != nil {
    // 		return nil, status.Errorf(codes.Aborted, "%v", err)
    // 	}
    
    // 	id, err := n.addNetworkElement(ctx, request.NetworkElement.NetworkElementName, request.NetworkElement.TransportOption, sbiID, pndID)
    // 	if err != nil {
    // 		return nil, status.Errorf(codes.Aborted, "%v", err)
    // 	}
    
    // 	return &mnepb.AddNetworkElementResponse{
    // 		Timestamp:        time.Now().UnixNano(),
    // 		Status:           mnepb.Status_STATUS_OK,
    // 		NetworkElementId: id.String(),
    // 	}, nil
    // }
    
    // // TODO(merge): add plugin here, remove sbi
    // func (n *NetworkElementServer) addNetworkElement(ctx context.Context, name string, transportOpt *tpb.TransportOption, sbiID uuid.UUID, pndID uuid.UUID) (uuid.UUID, error) {
    // 	var sbi southbound.SouthboundInterface
    // 	var err error
    
    // 	// Note: cSBI not supported currently, so this is commented for now.
    // 	// Might be needed or removed in the future.
    // 	//
    // 	// switch t := opt.Type; t {
    // 	// case spb.Type_TYPE_CONTAINERISED:
    // 	// 	return pnd.handleCsbiEnrolment(name, opt)
    // 	// case spb.Type_TYPE_PLUGIN:
    // 	// 	sbi, err = pnd.requestPlugin(name, opt)
    // 	// 	if err != nil {
    // 	// 		return uuid.Nil, err
    // 	// 	}
    // 	// default:
    // 	// 	var err error
    // 	// 	sbi, err = n.sbiService.Get(store.Query{ID: sbiID})
    // 	// 	if err != nil {
    // 	// 		return uuid.Nil, err
    // 	// 	}
    // 	// }
    
    // 	// TODO(merge): add plugin stuff here!
    // 	// sbi, err = n.sbiService.Get(store.Query{ID: sbiID})
    // 	// if err != nil {
    // 	// 	return uuid.Nil, err
    // 	// }
    
    // 	mne, err := nucleus.NewNetworkElement(name, uuid.Nil, transportOpt, sbi, pndID, conflict.Metadata{ResourceVersion: 0})
    // 	if err != nil {
    // 		return uuid.Nil, err
    // 	}
    
    // 	err = n.mneService.Add(mne)
    // 	if err != nil {
    // 		return uuid.Nil, err
    // 	}
    
    // 	if mne.IsTransportValid() {
    // 		_, err = n.getPath(ctx, mne.ID(), "/interfaces")
    // 		if err != nil {
    // 			return uuid.Nil, err
    // 		}
    // 	}
    
    // 	return mne.ID(), nil
    // }
    
    
    // GetAll returns all stored network elements. Each entry carries the
    // element's ID, name and its model as returned by GetModelAsString.
    func (n *NetworkElementServer) GetAll(ctx context.Context, request *mnepb.GetAllNetworkElementRequest) (*mnepb.GetAllNetworkElementResponse, error) {
    	if validationErr := n.protoValidator.Validate(request); validationErr != nil {
    		return nil, status.Errorf(codes.Aborted, "%v", validationErr)
    	}

    	stored, err := n.mneService.GetAll()
    	if err != nil {
    		return nil, err
    	}

    	result := make([]*mnepb.ManagedNetworkElement, 0, len(stored))
    	for _, element := range stored {
    		model, modelErr := element.GetModelAsString()
    		if modelErr != nil {
    			log.Error(modelErr)
    			return nil, status.Errorf(codes.Aborted, "%v", modelErr)
    		}

    		entry := &mnepb.ManagedNetworkElement{
    			Id:    element.ID().String(),
    			Name:  element.Name(),
    			Model: model,
    		}
    		result = append(result, entry)
    	}

    	response := &mnepb.GetAllNetworkElementResponse{
    		Timestamp:      time.Now().UnixNano(),
    		NetworkElement: result,
    	}
    	return response, nil
    }
    
    // Get returns a network element.
    
    func (n *NetworkElementServer) Get(ctx context.Context, request *mnepb.GetNetworkElementRequest) (*mnepb.GetNetworkElementResponse, error) {
    
    	if err := n.protoValidator.Validate(request); err != nil {
    		return nil, status.Errorf(codes.Aborted, "%v", err)
    	}
    
    
    	mne, err := n.mneService.Get(store.Query{ID: uuid.MustParse(request.NetworkElementId)})
    
    	}
    
    	ygotStructAsJSON, err := mne.GetModelAsString()
    	if err != nil {
    		log.Error(err)
    		return nil, status.Errorf(codes.Aborted, "%v", err)
    	}
    
    
    	networkElement := &mnepb.ManagedNetworkElement{
    
    		Id:               mne.ID().String(),
    		Name:             mne.Name(),
    		Model:            ygotStructAsJSON,
    		TransportAddress: mne.TransportAddress(),
    	}
    
    	return &mnepb.GetNetworkElementResponse{
    		Timestamp:      time.Now().UnixNano(),
    		NetworkElement: networkElement,
    	}, nil
    }
    
    // Update updates the model of a network element and then makes sure the
    // intended configuration is applied on that element.
    //
    // Steps, each of which aborts the call with its error on failure:
    //  1. validate the request,
    //  2. parse request.NetworkElement.Id as a UUID,
    //  3. store the new model through mneService.UpdateModel,
    //  4. reload the element and call
    //     EnsureIntendedConfigurationIsAppliedOnNetworkElement on it.
    func (n *NetworkElementServer) Update(ctx context.Context, request *mnepb.UpdateNetworkElementRequest) (*mnepb.UpdateNetworkElementResponse, error) {
    	if err := n.protoValidator.Validate(request); err != nil {
    		return nil, status.Errorf(codes.Aborted, "%v", err)
    	}

    	mneID, err := uuid.Parse(request.NetworkElement.Id)
    	if err != nil {
    		return nil, err
    	}

    	err = n.mneService.UpdateModel(mneID, request.NetworkElement.Model)
    	if err != nil {
    		return nil, err
    	}

    	networkElement, err := n.mneService.Get(store.Query{ID: mneID})
    	if err != nil {
    		return nil, err
    	}

    	err = networkelement.EnsureIntendedConfigurationIsAppliedOnNetworkElement(networkElement)
    	if err != nil {
    		return nil, err
    	}

    	return &mnepb.UpdateNetworkElementResponse{
    		Timestamp: time.Now().UnixNano(),
    	}, nil
    }
    
    
    // GetMne returns a single managed network element, filled from the root
    // path "/", together with the principal network domain named in the request.
    func (n *NetworkElementServer) GetMne(ctx context.Context, request *mnepb.GetMneRequest) (*mnepb.GetMneResponse, error) {
    	if validationErr := n.protoValidator.Validate(request); validationErr != nil {
    		return nil, status.Errorf(codes.Aborted, "%v", validationErr)
    	}

    	metricLabels := prometheus.Labels{"service": "mne", "rpc": "get"}
    	hookStart := metrics.StartHook(metricLabels, grpcRequestsTotal)
    	defer metrics.FinishHook(metricLabels, hookStart, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)

    	domain, err := n.getPnd(request.Pid)
    	if err != nil {
    		log.Error(err)
    		return nil, status.Errorf(codes.Aborted, "%v", err)
    	}

    	stored, err := n.getMne(request.Mneid)
    	if err != nil {
    		log.Error(err)
    		return nil, status.Errorf(codes.Aborted, "%v", err)
    	}

    	// TODO(path): This needs some adjustments when we're switching towards a new
    	// path request handling.
    	filled, err := fillMneBySpecificPath(stored, "/", false)
    	if err != nil {
    		log.Error(err)
    		return nil, status.Errorf(codes.Aborted, "%v", err)
    	}

    	response := &mnepb.GetMneResponse{
    		Timestamp: time.Now().UnixNano(),
    		Pnd: &ppb.PrincipalNetworkDomain{
    			Id:          domain.ID().String(),
    			Name:        domain.GetName(),
    			Description: domain.GetDescription(),
    		},
    		Mne: filled,
    	}
    	return response, nil
    }
    
    // GetFlattenedMne returns the flattened representation of a single managed
    // network element: its ID, name, the ID of its principal network domain and
    // the ID of its plugin.
    //
    // The request is counted under the "mne" service label, like every other
    // handler of this server (it was previously recorded as "pnd").
    func (n *NetworkElementServer) GetFlattenedMne(ctx context.Context, request *mnepb.GetMneRequest) (*mnepb.GetFlattenedMneResponse, error) {
    	if err := n.protoValidator.Validate(request); err != nil {
    		return nil, status.Errorf(codes.Aborted, "%v", err)
    	}

    	labels := prometheus.Labels{"service": "mne", "rpc": "get"}
    	start := metrics.StartHook(labels, grpcRequestsTotal)
    	defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)

    	networkElement, err := n.getMne(request.Mneid)
    	if err != nil {
    		log.Error(err)
    		return nil, status.Errorf(codes.Aborted, "%v", err)
    	}

    	return &mnepb.GetFlattenedMneResponse{
    		Timestamp: time.Now().UnixNano(),
    		Mne: &mnepb.FlattenedManagedNetworkElement{
    			Id:       networkElement.ID().String(),
    			Name:     networkElement.Name(),
    			Pid:      networkElement.PndID().String(),
    			Pluginid: networkElement.GetPlugin().ID().String(),
    		},
    	}, nil
    }
    
    // getMne looks up a network element by its identifier.
    //
    // The identifier is parsed as a UUID; if parsing fails the lookup is still
    // attempted with uuid.Nil (best-effort behaviour kept from the original
    // implementation). An error reported by the store is returned unchanged so
    // that the real cause is not hidden; only when the store reports no error
    // but also yields no element is the generic "no network element found"
    // error returned.
    func (n *NetworkElementServer) getMne(identifier string) (networkelement.NetworkElement, error) {
    	id, err := uuid.Parse(identifier)
    	if err != nil {
    		// Not a valid UUID: fall back to the nil UUID and let the store decide.
    		id = uuid.Nil
    	}

    	mne, err := n.mneService.Get(store.Query{
    		ID: id,
    	})
    	// Check the store error first; testing mne for nil beforehand would
    	// replace the underlying error with the generic message below.
    	if err != nil {
    		return nil, err
    	}
    	if mne == nil {
    		return nil, fmt.Errorf("no network element found")
    	}

    	return mne, nil
    }
    
    
    // getPnd resolves a principal network domain from its textual UUID.
    // A parse failure is returned as-is; a failed store lookup is logged and
    // converted into an Aborted status error.
    func (n *NetworkElementServer) getPnd(identifier string) (networkdomain.NetworkDomain, error) {
    	domainID, parseErr := uuid.Parse(identifier)
    	if parseErr != nil {
    		return nil, parseErr
    	}

    	query := store.Query{ID: domainID}
    	domain, lookupErr := n.pndService.Get(query)
    	if lookupErr == nil {
    		return domain, nil
    	}

    	log.Error(lookupErr)
    	return nil, status.Errorf(codes.Aborted, "%v", lookupErr)
    }
    
    
    // GetMneList returns every stored managed network element, each filled from
    // the root path "/", along with the principal network domain named in the
    // request.
    func (n *NetworkElementServer) GetMneList(ctx context.Context, request *mnepb.GetMneListRequest) (*mnepb.GetMneListResponse, error) {
    	if validationErr := n.protoValidator.Validate(request); validationErr != nil {
    		return nil, status.Errorf(codes.Aborted, "%v", validationErr)
    	}

    	metricLabels := prometheus.Labels{"service": "mne", "rpc": "get"}
    	hookStart := metrics.StartHook(metricLabels, grpcRequestsTotal)
    	defer metrics.FinishHook(metricLabels, hookStart, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)

    	domain, err := n.getPnd(request.Pid)
    	if err != nil {
    		log.Error(err)
    		return nil, status.Errorf(codes.Aborted, "%v", err)
    	}

    	stored, err := n.mneService.GetAll()
    	if err != nil {
    		return nil, err
    	}

    	filled := make([]*mnepb.ManagedNetworkElement, 0, len(stored))
    	for _, element := range stored {
    		entry, fillErr := fillMneBySpecificPath(element, "/", false)
    		if fillErr != nil {
    			log.Error(fillErr)
    			return nil, status.Errorf(codes.Aborted, "%v", fillErr)
    		}
    		filled = append(filled, entry)
    	}

    	response := &mnepb.GetMneListResponse{
    		Timestamp: time.Now().UnixNano(),
    		Pnd: &ppb.PrincipalNetworkDomain{
    			Id:          domain.ID().String(),
    			Name:        domain.GetName(),
    			Description: domain.GetDescription(),
    		},
    		Mne: filled,
    	}
    	return response, nil
    }
    
    // GetFlattenedMneList returns the flattened form (ID, name, plugin ID and
    // PND ID) of every network element reported by mneService.GetAllAsLoaded,
    // along with the principal network domain named in the request.
    func (n *NetworkElementServer) GetFlattenedMneList(ctx context.Context, request *mnepb.GetFlattenedMneListRequest) (*mnepb.GetFlattenedMneListResponse, error) {
    	if validationErr := n.protoValidator.Validate(request); validationErr != nil {
    		return nil, status.Errorf(codes.Aborted, "%v", validationErr)
    	}

    	metricLabels := prometheus.Labels{"service": "mne", "rpc": "get"}
    	hookStart := metrics.StartHook(metricLabels, grpcRequestsTotal)
    	defer metrics.FinishHook(metricLabels, hookStart, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)

    	domain, err := n.getPnd(request.Pid)
    	if err != nil {
    		log.Error(err)
    		return nil, status.Errorf(codes.Aborted, "%v", err)
    	}

    	loaded, err := n.mneService.GetAllAsLoaded()
    	if err != nil {
    		return nil, err
    	}

    	flattened := make([]*mnepb.FlattenedManagedNetworkElement, 0, len(loaded))
    	for _, item := range loaded {
    		flattened = append(flattened, &mnepb.FlattenedManagedNetworkElement{
    			Id:       item.ID,
    			Name:     item.Name,
    			Pluginid: item.Plugin,
    			Pid:      item.PndID,
    		})
    	}

    	response := &mnepb.GetFlattenedMneListResponse{
    		Timestamp: time.Now().UnixNano(),
    		Pnd: &ppb.PrincipalNetworkDomain{
    			Id:          domain.ID().String(),
    			Name:        domain.GetName(),
    			Description: domain.GetDescription(),
    		},
    		Mne: flattened,
    	}
    	return response, nil
    }
    
    
    // fillMneBySpecificPath builds a ManagedNetworkElement message for nme whose
    // notifications are the result of the plugin's GetNode call for the given
    // path. requestForIntendedState is forwarded unchanged to GetNode. Failures
    // to parse the path or to read the node are returned as Aborted status errors.
    func fillMneBySpecificPath(nme networkelement.NetworkElement, path string, requestForIntendedState bool) (*mnepb.ManagedNetworkElement, error) {
    	structuredPath, pathErr := ygot.StringToStructuredPath(path)
    	if pathErr != nil {
    		log.Error(pathErr)
    		return nil, status.Errorf(codes.Aborted, "%v", pathErr)
    	}

    	nodes, nodeErr := nme.GetPlugin().GetNode(structuredPath, requestForIntendedState)
    	if nodeErr != nil {
    		return nil, status.Errorf(codes.Aborted, "%v", nodeErr)
    	}

    	return &mnepb.ManagedNetworkElement{
    		Id:              nme.ID().String(),
    		Name:            nme.Name(),
    		MneNotification: nodes,
    	}, nil
    }
    
    
    // GetPath gets an actual state of the path on a mne.
    
    func (n *NetworkElementServer) GetPath(ctx context.Context, request *mnepb.GetPathRequest) (*mnepb.GetPathResponse, error) {
    
    	if err := n.protoValidator.Validate(request); err != nil {
    		return nil, status.Errorf(codes.Aborted, "%v", err)
    	}
    
    
    	labels := prometheus.Labels{"service": "mne", "rpc": "get"}
    	start := metrics.StartHook(labels, grpcRequestsTotal)
    	defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)
    
    
    	pnd, err := n.getPnd(request.Pid)
    
    	if err != nil {
    		log.Error(err)
    		return nil, status.Errorf(codes.Aborted, "%v", err)
    	}
    
    
    	networkElement, err := n.getMne(request.Mneid)
    
    	if err != nil {
    		log.Error(err)
    		return nil, status.Errorf(codes.Aborted, "%v", err)
    	}
    
    	// In case we get the path from grpc-gateway we have to replace
    	path := strings.ReplaceAll(request.Path, "||", "/")
    
    
    	resp, err := n.getPath(ctx, networkElement, path)
    
    	if err != nil {
    		log.Error(err)
    		return nil, status.Errorf(codes.Aborted, "%v", err)
    	}
    
    	return &mnepb.GetPathResponse{
    		Timestamp: time.Now().UnixNano(),
    		Pnd: &ppb.PrincipalNetworkDomain{
    			Id:          pnd.ID().String(),
    			Name:        pnd.GetName(),
    			Description: pnd.GetDescription(),
    		},
    
    		MneNotification: resp.Notification,
    
    func (n *NetworkElementServer) getPath(ctx context.Context, mne networkelement.NetworkElement, path string) (*gnmi.GetResponse, error) {
    
    	res, err := mne.Transport().Get(ctx, path)
    	if err != nil {
    		return nil, err
    	}
    
    	protoMessage, ok := res.(proto.Message)
    
    	if !ok {
    		return nil, &customerrs.InvalidTypeAssertionError{
    			Value: res,
    			Type:  (*proto.Message)(nil),
    		}
    	}
    
    	getResponse, ok := protoMessage.(*gnmi.GetResponse)
    	if !ok {
    		return nil, &customerrs.InvalidTypeAssertionError{
    			Value: res,
    			Type:  (*gnmi.GetResponse)(nil),
    		}
    
    }
    
    // GetIntendedPath gets a path as the intended state stored in the storage.
    //
    // The notifications are obtained through fillMneBySpecificPath with the
    // intended-state flag set. Every lookup error is checked, logged and
    // reported as an Aborted status before its result is used.
    func (n *NetworkElementServer) GetIntendedPath(ctx context.Context, request *mnepb.GetIntendedPathRequest) (*mnepb.GetIntendedPathResponse, error) {
    	if err := n.protoValidator.Validate(request); err != nil {
    		return nil, status.Errorf(codes.Aborted, "%v", err)
    	}

    	pnd, err := n.getPnd(request.Pid)
    	if err != nil {
    		log.Error(err)
    		return nil, status.Errorf(codes.Aborted, "%v", err)
    	}

    	// In case we get the path from grpc-gateway we have to replace the "||"
    	// separators with "/".
    	intendedPath := strings.ReplaceAll(request.IntendedPath, "||", "/")

    	networkElement, err := n.getMne(request.Mneid)
    	if err != nil {
    		log.Error(err)
    		return nil, status.Errorf(codes.Aborted, "%v", err)
    	}

    	mne, err := fillMneBySpecificPath(networkElement, intendedPath, true)
    	if err != nil {
    		// Without this check a failed lookup would dereference a nil mne below.
    		log.Error(err)
    		return nil, status.Errorf(codes.Aborted, "%v", err)
    	}

    	return &mnepb.GetIntendedPathResponse{
    		Timestamp: time.Now().UnixNano(),
    		Pnd: &ppb.PrincipalNetworkDomain{
    			Id:          pnd.ID().String(),
    			Name:        pnd.GetName(),
    			Description: pnd.GetDescription(),
    		},
    		MneNotification: mne.MneNotification,
    	}, nil
    }
    
    // GetChange returns the changes identified by the change IDs in
    // request.Cuid, together with the principal network domain named in the
    // request.
    func (n *NetworkElementServer) GetChange(ctx context.Context, request *mnepb.GetChangeRequest) (*mnepb.GetChangeResponse, error) {
    	if validationErr := n.protoValidator.Validate(request); validationErr != nil {
    		return nil, status.Errorf(codes.Aborted, "%v", validationErr)
    	}

    	metricLabels := prometheus.Labels{"service": "mne", "rpc": "get"}
    	hookStart := metrics.StartHook(metricLabels, grpcRequestsTotal)
    	defer metrics.FinishHook(metricLabels, hookStart, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)

    	domain, err := n.getPnd(request.Pid)
    	if err != nil {
    		log.Error(err)
    		return nil, status.Errorf(codes.Aborted, "%v", err)
    	}

    	requested, err := n.fillChanges(false, request.Cuid...)
    	if err != nil {
    		log.Error(err)
    		return nil, status.Errorf(codes.Aborted, "%v", err)
    	}

    	response := &mnepb.GetChangeResponse{
    		Timestamp: time.Now().UnixNano(),
    		Pnd: &ppb.PrincipalNetworkDomain{
    			Id:          domain.ID().String(),
    			Name:        domain.GetName(),
    			Description: domain.GetDescription(),
    		},
    		Change: requested,
    	}
    	return response, nil
    }
    
    // GetChangeList returns all pending and committed changes known to the
    // change store, together with the principal network domain named in the
    // request.
    func (n *NetworkElementServer) GetChangeList(ctx context.Context, request *mnepb.GetChangeListRequest) (*mnepb.GetChangeListResponse, error) {
    	if validationErr := n.protoValidator.Validate(request); validationErr != nil {
    		return nil, status.Errorf(codes.Aborted, "%v", validationErr)
    	}

    	metricLabels := prometheus.Labels{"service": "mne", "rpc": "get"}
    	hookStart := metrics.StartHook(metricLabels, grpcRequestsTotal)
    	defer metrics.FinishHook(metricLabels, hookStart, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)

    	domain, err := n.getPnd(request.Pid)
    	if err != nil {
    		log.Error(err)
    		return nil, status.Errorf(codes.Aborted, "%v", err)
    	}

    	// The cuid argument is ignored by fillChanges when all is true.
    	everything, err := n.fillChanges(true, "")
    	if err != nil {
    		log.Error(err)
    		return nil, status.Errorf(codes.Aborted, "%v", err)
    	}

    	response := &mnepb.GetChangeListResponse{
    		Timestamp: time.Now().UnixNano(),
    		Pnd: &ppb.PrincipalNetworkDomain{
    			Id:          domain.ID().String(),
    			Name:        domain.GetName(),
    			Description: domain.GetDescription(),
    		},
    		Change: everything,
    	}
    	return response, nil
    }
    
    // fillChanges converts changes from the change store into their protobuf
    // form (ID, age in microseconds and state).
    //
    // With all set, the pending changes followed by the committed changes are
    // returned and cuid is ignored. Otherwise cuid must contain at least one
    // change ID; each ID is parsed as a UUID and looked up in the store.
    func (n *NetworkElementServer) fillChanges(all bool, cuid ...string) ([]*mnepb.Change, error) {
    	var changeIDs []uuid.UUID

    	if all {
    		changeIDs = n.changeStore.Pending()
    		changeIDs = append(changeIDs, n.changeStore.Committed()...)
    	} else {
    		if len(cuid) == 0 {
    			return nil, &customerrs.InvalidParametersError{
    				Func:  n.fillChanges,
    				Param: "length of 'mneID' cannot be '0' when 'all' is set to 'false'",
    			}
    		}

    		parsed, parseErr := stringArrayToUUIDs(cuid)
    		if parseErr != nil {
    			log.Error(parseErr)
    			return nil, status.Errorf(codes.Aborted, "%v", parseErr)
    		}
    		changeIDs = parsed
    	}

    	result := make([]*mnepb.Change, 0, len(changeIDs))
    	for _, changeID := range changeIDs {
    		stored, lookupErr := n.changeStore.GetChange(changeID)
    		if lookupErr != nil {
    			log.Error(lookupErr)
    			return nil, status.Errorf(codes.Aborted, "%v", lookupErr)
    		}

    		// Diff could be added here.
    		result = append(result, &mnepb.Change{
    			Id:    changeID.String(),
    			Age:   stored.Age().Microseconds(),
    			State: stored.State(),
    		})
    	}

    	return result, nil
    }
    
    // stringArrayToUUIDs parses every string in sid as a UUID and returns the
    // results in the same order. The first string that fails to parse is logged
    // and reported as an Aborted status error.
    func stringArrayToUUIDs(sid []string) ([]uuid.UUID, error) {
    	parsedIDs := make([]uuid.UUID, 0, len(sid))
    	for _, raw := range sid {
    		value, parseErr := uuid.Parse(raw)
    		if parseErr != nil {
    			log.Error(parseErr)
    			return nil, status.Errorf(codes.Aborted, "%v", parseErr)
    		}
    		parsedIDs = append(parsedIDs, value)
    	}
    	return parsedIDs, nil
    }
    
    
    // SetMneList adds every network element described in the request through
    // addMne and returns one SetResponse per element carrying the ID assigned to
    // it. The first failure is logged and aborts the whole call.
    func (n *NetworkElementServer) SetMneList(ctx context.Context, request *mnepb.SetMneListRequest) (*mnepb.SetMneListResponse, error) {
    	if validationErr := n.protoValidator.Validate(request); validationErr != nil {
    		return nil, status.Errorf(codes.Aborted, "%v", validationErr)
    	}

    	metricLabels := prometheus.Labels{"service": "mne", "rpc": "set"}
    	hookStart := metrics.StartHook(metricLabels, grpcRequestsTotal)
    	defer metrics.FinishHook(metricLabels, hookStart, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)

    	domainID, err := uuid.Parse(request.Pid)
    	if err != nil {
    		log.Error(err)
    		return nil, status.Errorf(codes.Aborted, "%v", err)
    	}

    	responses := make([]*mnepb.SetResponse, 0, len(request.Mne))
    	for _, entry := range request.Mne {
    		pluginID, parseErr := uuid.Parse(entry.GetPluginId())
    		if parseErr != nil {
    			log.Error(parseErr)
    			return nil, status.Errorf(codes.Aborted, "%v", parseErr)
    		}

    		addedID, addErr := n.addMne(ctx, entry.MneName, entry.TransportOption, nil, pluginID, domainID)
    		if addErr != nil {
    			log.Error(addErr)
    			return nil, status.Errorf(codes.Aborted, "%v", addErr)
    		}

    		responses = append(responses, &mnepb.SetResponse{Id: addedID.String()})
    	}

    	return &mnepb.SetMneListResponse{
    		Timestamp: time.Now().UnixNano(),
    		Responses: responses,
    	}, nil
    }
    
    // addMne creates a network element with the given name, transport options,
    // plugin and PND ID.
    //
    // If optionalNetworkElementID is given, its first entry is used as the ID
    // passed to nucleus.NewNetworkElement; otherwise uuid.Nil is passed. If
    // requestPluginFunc is nil, n.pluginService.RequestPlugin is used to obtain
    // the plugin for pluginID.
    //
    // NOTE(review): the body below appears to be incomplete in this view —
    // several `if err != nil {` lines, closing braces and the final return seem
    // to be missing after the IsTransportValid check. Restore them from version
    // control before relying on this function.
    func (n *NetworkElementServer) addMne(ctx context.Context, name string, opt *tpb.TransportOption, requestPluginFunc func(uuid.UUID) (plugin.Plugin, error), pluginID uuid.UUID, pndID uuid.UUID, optionalNetworkElementID ...uuid.UUID) (uuid.UUID, error) {
    	var err error
    
    	// Use the caller-provided ID when present, otherwise the nil UUID.
    	networkElementID := uuid.Nil
    	if len(optionalNetworkElementID) > 0 {
    		networkElementID = optionalNetworkElementID[0]
    	}
    
    	// Default to the plugin service when no custom request function is given.
    	if requestPluginFunc == nil {
    		requestPluginFunc = n.pluginService.RequestPlugin
    	}
    
    	plugin, err := requestPluginFunc(pluginID)
    	if err != nil {
    		return uuid.Nil, err
    	}
    
    	mne, err := nucleus.NewNetworkElement(name, networkElementID, opt, pndID, plugin, conflict.Metadata{ResourceVersion: 0})
    	if err != nil {
    
    		// NOTE(review): the condition tests err (always non-nil here) instead
    		// of pluginRmErr, so pluginRmErr is returned even when it is nil —
    		// presumably `pluginRmErr != nil` was intended; confirm and fix.
    		if pluginRmErr := plugin.Remove(); err != nil {
    			return uuid.Nil, pluginRmErr
    		}
    
    		return uuid.Nil, err
    	}
    
    	if mne.IsTransportValid() {
    
    		err := n.initialNetworkElementRootPathRequest(ctx, mne, plugin)
    
    			// NOTE(review): the statements below look like the bodies of error
    			// branches whose enclosing `if` lines are not visible here. The
    			// same `err != nil` vs. `pluginRmErr` concern applies to each.
    			if pluginRmErr := plugin.Remove(); err != nil {
    				return uuid.Nil, pluginRmErr
    			}
    
    			if pluginRmErr := plugin.Remove(); err != nil {
    				return uuid.Nil, pluginRmErr
    			}
    			if err := n.mneService.Delete(mne); err != nil {
    				return uuid.Nil, err
    			}
    
    
    		// Subscribe the watcher to the new element using the configured gNMI
    		// subscription paths.
    		n.networkElementWatchter.SubscribeToNetworkElement(mne, config.GetGnmiSubscriptionPaths(), nil)
    
    		// NOTE(review): presumably this is the branch for invalid transport
    		// data (the `else` line is not visible here) — confirm.
    		if pluginRmErr := plugin.Remove(); err != nil {
    			return uuid.Nil, pluginRmErr
    		}
    		return uuid.Nil, fmt.Errorf("invalid transport data provided")
    
    // initialNetworkElementRootPathRequest requests the root path "/" from the
    // network element and lets the element process the response.
    //
    // If either step fails, the plugin is removed and the failure is returned.
    // When removing the plugin fails as well, the original error is logged and
    // the removal error is returned. A successful plugin removal never turns a
    // failure into a nil error.
    func (n *NetworkElementServer) initialNetworkElementRootPathRequest(ctx context.Context, mne networkelement.NetworkElement, plugin plugin.Plugin) error {
    	resp, err := n.getPath(ctx, mne, "/")
    	if err != nil {
    		// Check the removal error itself: testing err here would always be
    		// true and would return nil whenever the removal succeeded.
    		if pluginRmErr := plugin.Remove(); pluginRmErr != nil {
    			log.Error(err)
    			return pluginRmErr
    		}
    		return err
    	}

    	if err := mne.ProcessResponse(resp); err != nil {
    		if pluginRmErr := plugin.Remove(); pluginRmErr != nil {
    			log.Error(err)
    			return pluginRmErr
    		}
    		return err
    	}

    	return nil
    }
    
    
    // SetChangeList sets a list of changes.
    //
    // Each entry of request.Change names a change ID and an operation: commit
    // or confirm. The operations are executed in request order; the first
    // failure is logged and aborts the call. Any other operation is rejected
    // with an InvalidParametersError. On success one SetResponse per entry is
    // returned, carrying the ID of the change it refers to.
    func (n *NetworkElementServer) SetChangeList(ctx context.Context, request *mnepb.SetChangeListRequest) (*mnepb.SetChangeListResponse, error) {
    	labels := prometheus.Labels{"service": "mne", "rpc": "set"}
    	start := metrics.StartHook(labels, grpcRequestsTotal)
    	defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)

    	if err := n.protoValidator.Validate(request); err != nil {
    		return nil, status.Errorf(codes.Aborted, "%v", err)
    	}

    	responses := make([]*mnepb.SetResponse, len(request.Change))

    	for i, r := range request.Change {
    		cuid, err := uuid.Parse(r.Cuid)
    		if err != nil {
    			log.Error(err)
    			return nil, status.Errorf(codes.Aborted, "%v", err)
    		}
    		switch r.Op {
    		case mnepb.Operation_OPERATION_COMMIT:
    			if err := n.Commit(cuid); err != nil {
    				log.Error(err)
    				return nil, status.Errorf(codes.Aborted, "%v", err)
    			}
    		case mnepb.Operation_OPERATION_CONFIRM:
    			if err := n.Confirm(cuid); err != nil {
    				log.Error(err)
    				return nil, status.Errorf(codes.Aborted, "%v", err)
    			}
    		default:
    			return nil, &customerrs.InvalidParametersError{
    				Param: r.Op,
    			}
    		}

    		// Report which change this response belongs to, as SetMneList does
    		// for network elements.
    		responses[i] = &mnepb.SetResponse{
    			Id: cuid.String(),
    		}
    	}
    	return &mnepb.SetChangeListResponse{
    		Timestamp: time.Now().UnixNano(),
    		Responses: responses,
    	}, nil
    }
    
    // Commit commits the change with the given ID and then replays the
    // difference between the change's previous and intended state onto the
    // model of the associated network element: every update of the diff is set
    // through the element's plugin, every delete path is removed. Finally the
    // element is written back through mneService.Update.
    func (n *NetworkElementServer) Commit(u uuid.UUID) error {
    	pending, err := n.changeStore.GetChange(u)
    	if err != nil {
    		return err
    	}

    	commitErr := pending.Commit()
    	if commitErr != nil {
    		return commitErr
    	}

    	// Set all the changes within the network elements model
    	target, err := n.mneService.Get(store.Query{ID: pending.AssociatedDeviceID()})
    	if err != nil {
    		return err
    	}

    	delta, err := target.GetPlugin().Diff(pending.PreviousState(), pending.IntendedState())
    	if err != nil {
    		return err
    	}

    	for _, upd := range delta.GetUpdate() {
    		setErr := target.GetPlugin().SetNode(upd.GetPath(), upd.GetVal())
    		if setErr != nil {
    			return setErr
    		}
    	}

    	for _, obsolete := range delta.GetDelete() {
    		delErr := target.GetPlugin().DeleteNode(obsolete)
    		if delErr != nil {
    			return delErr
    		}
    	}

    	// update the network element
    	return n.mneService.Update(target)
    }
    
    // Confirm calls confirm on the change with the given ID. An error from the
    // change store lookup is returned unchanged.
    func (n *NetworkElementServer) Confirm(u uuid.UUID) error {
    	pending, lookupErr := n.changeStore.GetChange(u)
    	if lookupErr == nil {
    		return pending.Confirm()
    	}
    	return lookupErr
    }
    
    // SetPathList sets a list of paths.
    //
    // For every entry of request.ChangeRequest a change is created through
    // ChangeMNE from the entry's operation, path and value. The first failure is
    // logged and aborts the call. On success one SetResponse per entry is
    // returned, carrying the ID of the change that was created for it.
    func (n *NetworkElementServer) SetPathList(ctx context.Context, request *mnepb.SetPathListRequest) (*mnepb.SetPathListResponse, error) {
    	labels := prometheus.Labels{"service": "mne", "rpc": "set"}
    	start := metrics.StartHook(labels, grpcRequestsTotal)
    	defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)

    	if err := n.protoValidator.Validate(request); err != nil {
    		return nil, status.Errorf(codes.Aborted, "%v", err)
    	}

    	responses := make([]*mnepb.SetResponse, len(request.ChangeRequest))

    	for i, r := range request.ChangeRequest {
    		mneID, err := uuid.Parse(r.Mneid)
    		if err != nil {
    			log.Error(err)
    			return nil, status.Errorf(codes.Aborted, "%v", err)
    		}

    		// NOTE: Could be useful to split method into two, one for
    		// update/replace and one for delete.
    		cid, err := n.ChangeMNE(mneID, r.ApiOp, r.Path, r.Value)
    		if err != nil {
    			log.Error(err)
    			return nil, status.Errorf(codes.Aborted, "%v", err)
    		}

    		// Hand the ID of the created change back to the caller; without it
    		// cid would be unused and the change could not be committed later.
    		responses[i] = &mnepb.SetResponse{
    			Id: cid.String(),
    		}
    	}
    	return &mnepb.SetPathListResponse{
    		Timestamp: time.Now().UnixNano(),
    		Responses: responses,
    	}, nil
    }
    
    // ChangeMNE creates a change from the provided Operation, path and value.
    // The Change is Pending and times out after the specified timeout period.
    
    func (n *NetworkElementServer) ChangeMNE(duid uuid.UUID, operation mnepb.ApiOperation, path *gnmi.Path, value *gnmi.TypedValue) (uuid.UUID, error) {
    
    	mne, err := n.mneService.Get(store.Query{
    		ID: duid,
    	})
    	if err != nil {
    		return uuid.Nil, err
    	}
    
    	plugin := mne.GetPlugin()
    
    
    	validatedChangeModel, err := plugin.ValidateChange(operation, path, value)
    
    	if err != nil {
    		return uuid.Nil, err
    	}
    
    	filteredMarshaledModel, err := plugin.PruneConfigFalse(validatedChangeModel)
    	if err != nil {
    		return uuid.Nil, err
    	}
    
    
    	currentModel, err := mne.GetModelAsFilteredCopy()
    	if err != nil {
    		return uuid.Nil, err
    
    	diff, err := plugin.Diff(currentModel, filteredMarshaledModel)
    
    	if err != nil {
    		return uuid.Nil, err
    	}
    
    
    	if util.IsGNMINotificationEmpty(diff) {
    		return uuid.Nil, customerrs.NoNewChangesError{Original: string(currentModel), Modified: string(filteredMarshaledModel)}
    	}
    
    	callback := func(original, modified []byte) error {
    		ctx := context.WithValue(context.Background(), types.CtxKeyOperation, operation) // nolint
    		payload := change.Payload{Original: original, Modified: modified, Diff: diff}
    		return mne.Transport().Set(ctx, payload)
    	}
    
    	ch := nucleus.NewChange(duid, currentModel, filteredMarshaledModel, diff, callback)
    
    
    	if err := n.changeStore.Add(ch); err != nil {
    		return uuid.Nil, err
    	}
    
    	return ch.ID(), nil
    }
    
    // DeleteMne deletes the managed network element whose ID is given in
    // request.Mneid. Validation, parsing and deletion failures are reported as
    // Aborted status errors; the latter two are also logged.
    func (n *NetworkElementServer) DeleteMne(ctx context.Context, request *mnepb.DeleteMneRequest) (*mnepb.DeleteMneResponse, error) {
    	if validationErr := n.protoValidator.Validate(request); validationErr != nil {
    		return nil, status.Errorf(codes.Aborted, "%v", validationErr)
    	}

    	targetID, parseErr := uuid.Parse(request.Mneid)
    	if parseErr != nil {
    		log.Error(parseErr)
    		return nil, status.Errorf(codes.Aborted, "%v", parseErr)
    	}

    	deleteErr := n.deleteMne(targetID)
    	if deleteErr != nil {
    		log.Error(deleteErr)
    		return nil, status.Errorf(codes.Aborted, "%v", deleteErr)
    	}

    	response := &mnepb.DeleteMneResponse{
    		Timestamp: time.Now().UnixNano(),
    	}
    	return response, nil
    }
    
    func (n *NetworkElementServer) deleteMne(id uuid.UUID) error {
    	mne, err := n.mneService.Get(store.Query{
    		ID:   id,
    		Name: id.String(),
    	})
    
    	if err != nil {
    		return err
    	}
    
    	// Note: cSBI not supported currently, so this is commented fow now.
    	// Might be needed or removed in the future.
    	//