Skip to content
Snippets Groups Projects
networkElement.go 26.2 KiB
Newer Older
  • Learn to ignore specific revisions
  • 	mnepb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/networkelement"
    
    	spb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/southbound"
    	tpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/transport"
    	"code.fbi.h-da.de/danet/gosdn/controller/conflict"
    	"code.fbi.h-da.de/danet/gosdn/controller/customerrs"
    
    	"code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkdomain"
    
    	"code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkelement"
    	"code.fbi.h-da.de/danet/gosdn/controller/interfaces/southbound"
    	"code.fbi.h-da.de/danet/gosdn/controller/metrics"
    	"code.fbi.h-da.de/danet/gosdn/controller/nucleus"
    	"code.fbi.h-da.de/danet/gosdn/controller/nucleus/types"
    	"code.fbi.h-da.de/danet/gosdn/controller/store"
    	aGNMI "code.fbi.h-da.de/danet/gosdn/forks/goarista/gnmi"
    	"github.com/google/uuid"
    
    	gpb "github.com/openconfig/gnmi/proto/gnmi"
    
    
    	"github.com/openconfig/ygot/ygot"
    	"github.com/openconfig/ygot/ytypes"
    	"github.com/prometheus/client_golang/prometheus"
    	log "github.com/sirupsen/logrus"
    	"google.golang.org/grpc/codes"
    	"google.golang.org/grpc/status"
    	"google.golang.org/protobuf/proto"
    
    )
    
    // NetworkElementServer represents a NetworkElementServer.
    //
    // It embeds mnepb.UnimplementedNetworkElementServiceServer and holds the
    // services its handlers use to look up network elements, principal network
    // domains and southbound interfaces.
    type NetworkElementServer struct {
    	mnepb.UnimplementedNetworkElementServiceServer
    
    	// mneService is used to add, query, update and delete network elements.
    	mneService networkelement.Service
    	// pndService resolves the principal network domain named in a request.
    	pndService networkdomain.Service
    	// sbiService resolves the southbound interface of a new network element.
    	sbiService southbound.Service
    
    }
    
    // NewNetworkElementServer returns a new NetWorkElementServer.
    
    func NewNetworkElementServer(mneService networkelement.Service, pndService networkdomain.Service, sbiService southbound.Service) *NetworkElementServer {
    
    	return &NetworkElementServer{
    
    		mneService: mneService,
    		pndService: pndService,
    		sbiService: sbiService,
    
    // Add adds a new network element.
    //
    // The request is read through the generated getters, so a request that lacks
    // the NetworkElement or Sbi message yields an error instead of a nil pointer
    // dereference. An SBI ID that is not a valid UUID, or any failure while
    // creating the network element, is returned as codes.Aborted. On success the
    // response carries the ID of the new network element.
    func (n *NetworkElementServer) Add(ctx context.Context, request *mnepb.AddNetworkElementRequest) (*mnepb.AddNetworkElementResponse, error) {
    	networkElement := request.GetNetworkElement()

    	sbiID, err := uuid.Parse(networkElement.GetSbi().GetId())
    	if err != nil {
    		return nil, status.Errorf(codes.Aborted, "%v", err)
    	}

    	id, err := n.addNetworkElement(ctx, networkElement.GetNetworkElementName(), networkElement.GetTransportOption(), sbiID)
    	if err != nil {
    		return nil, status.Errorf(codes.Aborted, "%v", err)
    	}

    	return &mnepb.AddNetworkElementResponse{
    		Timestamp:        time.Now().UnixNano(),
    		Status:           mnepb.Status_STATUS_OK,
    		NetworkElementId: id.String(),
    	}, nil
    }
    
    // addNetworkElement looks up the southbound interface identified by sbiID,
    // creates a network element called name on top of it and stores the element.
    // When the element reports a valid transport, "/interfaces" is requested from
    // it right away through getPath. It returns the ID of the new element, or
    // uuid.Nil together with the first error encountered.
    func (n *NetworkElementServer) addNetworkElement(ctx context.Context, name string, transportOpt *tpb.TransportOption, sbiID uuid.UUID) (uuid.UUID, error) {
    	// TODO(PND): figure out how to handle cSBI stuff?!
    	// switch t := opt.Type; t {
    	// case spb.Type_TYPE_CONTAINERISED:
    	// 	return pnd.handleCsbiEnrolment(name, opt)
    	// case spb.Type_TYPE_PLUGIN:
    	// 	sbi, err = pnd.requestPlugin(name, opt)
    	// 	if err != nil {
    	// 		return uuid.Nil, err
    	// 	}
    	// default:
    	// 	var err error
    	// 	sbi, err = n.sbiService.Get(store.Query{ID: sbiID})
    	// 	if err != nil {
    	// 		return uuid.Nil, err
    	// 	}
    	// }

    	var (
    		southboundInterface southbound.SouthboundInterface
    		err                 error
    	)
    	if southboundInterface, err = n.sbiService.Get(store.Query{ID: sbiID}); err != nil {
    		return uuid.Nil, err
    	}

    	element, err := nucleus.NewNetworkElement(name, uuid.Nil, transportOpt, southboundInterface, conflict.Metadata{ResourceVersion: 0})
    	if err != nil {
    		return uuid.Nil, err
    	}

    	if err = n.mneService.Add(element); err != nil {
    		return uuid.Nil, err
    	}

    	// Without a valid transport there is nothing to query on the element.
    	if !element.IsTransportValid() {
    		return element.ID(), nil
    	}

    	if _, err = n.getPath(ctx, element.ID(), "/interfaces"); err != nil {
    		return uuid.Nil, err
    	}

    	return element.ID(), nil
    }
    
    // GetAll returns all stored network elements.
    //
    // Every element is reported with its ID, its name and its model as returned
    // by GetModelAsString. An error from the network element service is returned
    // unchanged; a failure to obtain a model is logged and returned as
    // codes.Aborted.
    func (n *NetworkElementServer) GetAll(ctx context.Context, request *mnepb.GetAllNetworkElementRequest) (*mnepb.GetAllNetworkElementResponse, error) {
    	stored, err := n.mneService.GetAll()
    	if err != nil {
    		return nil, err
    	}

    	result := make([]*mnepb.NetworkElement, 0, len(stored))
    	for _, element := range stored {
    		model, modelErr := element.GetModelAsString()
    		if modelErr != nil {
    			log.Error(modelErr)
    			return nil, status.Errorf(codes.Aborted, "%v", modelErr)
    		}

    		entry := &mnepb.NetworkElement{
    			Id:    element.ID().String(),
    			Name:  element.Name(),
    			Model: model,
    		}
    		result = append(result, entry)
    	}

    	response := &mnepb.GetAllNetworkElementResponse{
    		Timestamp:      time.Now().UnixNano(),
    		Status:         mnepb.Status_STATUS_OK,
    		NetworkElement: result,
    	}

    	return response, nil
    }
    
    // Get returns a network element.
    //
    // The element is identified by request.NetworkElementId, which has to be a
    // valid UUID. A malformed ID is logged and returned as codes.Aborted; it must
    // not panic, because the value comes straight from the client. An error from
    // the network element service is returned unchanged. The response contains
    // the element's ID, name, model (as returned by GetModelAsString) and
    // transport address.
    func (n *NetworkElementServer) Get(ctx context.Context, request *mnepb.GetNetworkElementRequest) (*mnepb.GetNetworkElementResponse, error) {
    	mneID, err := uuid.Parse(request.GetNetworkElementId())
    	if err != nil {
    		log.Error(err)
    		return nil, status.Errorf(codes.Aborted, "%v", err)
    	}

    	mne, err := n.mneService.Get(store.Query{ID: mneID})
    	if err != nil {
    		return nil, err
    	}

    	ygotStructAsJSON, err := mne.GetModelAsString()
    	if err != nil {
    		log.Error(err)
    		return nil, status.Errorf(codes.Aborted, "%v", err)
    	}

    	networkElement := &mnepb.NetworkElement{
    		Id:               mne.ID().String(),
    		Name:             mne.Name(),
    		Model:            ygotStructAsJSON,
    		TransportAddress: mne.TransportAddress(),
    	}

    	return &mnepb.GetNetworkElementResponse{
    		Timestamp:      time.Now().UnixNano(),
    		Status:         mnepb.Status_STATUS_OK,
    		NetworkElement: networkElement,
    	}, nil
    }
    
    // Update updates a network element.
    func (n *NetworkElementServer) Update(ctx context.Context, request *mnepb.UpdateNetworkElementRequest) (*mnepb.UpdateNetworkElementResponse, error) {
    	mneID, err := uuid.Parse(request.NetworkElement.Id)
    	if err != nil {
    		return &mnepb.UpdateNetworkElementResponse{
    			Timestamp: time.Now().UnixNano(),
    			Status:    mnepb.Status_STATUS_ERROR,
    		}, err
    	}
    
    	err = n.mneService.UpdateModel(mneID, request.NetworkElement.Model)
    	if err != nil {
    		return &mnepb.UpdateNetworkElementResponse{
    			Timestamp: time.Now().UnixNano(),
    
    Fabian Seidl's avatar
    Fabian Seidl committed
    			Status:    mnepb.Status_STATUS_ERROR,
    
    		}, err
    	}
    
    	err = n.ensureIntendedConfigurationIsAppliedOnNetworkElement(mneID)
    
    Fabian Seidl's avatar
    Fabian Seidl committed
    	if err != nil {
    		return &mnepb.UpdateNetworkElementResponse{
    			Timestamp: time.Now().UnixNano(),
    			Status:    mnepb.Status_STATUS_ERROR,
    		}, err
    	}
    
    
    	return &mnepb.UpdateNetworkElementResponse{
    		Timestamp: time.Now().UnixNano(),
    		Status:    mnepb.Status_STATUS_OK,
    	}, nil
    }
    
    // ensureIntendedConfigurationIsAppliedOnNetworkElement sends the stored model
    // of the network element identified by mneID to the element itself. The model
    // string is wrapped as a JSON-IETF value in a single gNMI update on the path
    // "/" and handed to the element's transport via CustomSet, using a background
    // context. A failing CustomSet is logged together with its response and the
    // error is returned.
    func (n *NetworkElementServer) ensureIntendedConfigurationIsAppliedOnNetworkElement(mneID uuid.UUID) error {
    	mne, err := n.mneService.Get(store.Query{ID: mneID})
    	if err != nil {
    		return err
    	}

    	model, err := mne.GetModelAsString()
    	if err != nil {
    		return err
    	}

    	rootPath, err := ygot.StringToStructuredPath("/")
    	if err != nil {
    		return err
    	}

    	modelValue := &gpb.TypedValue{
    		Value: &gpb.TypedValue_JsonIetfVal{JsonIetfVal: []byte(model)},
    	}
    	setRequest := &gpb.SetRequest{
    		Update: []*gpb.Update{{Path: rootPath, Val: modelValue}},
    	}

    	response, err := mne.Transport().CustomSet(context.Background(), setRequest)
    	if err == nil {
    		return nil
    	}

    	log.Errorf("Failed to apply model of network element err=%+v, response=%+v", err, response)

    	return err
    }
    
    
    // GetMne gets a specific mne.
    //
    // request.Pid selects the principal network domain that is echoed in the
    // response, request.Mneid selects the network element. The element is
    // rendered for the path "/" by fillMneBySpecificPath. A malformed Pid is
    // passed to handleRPCError; every other failure is logged and returned as
    // codes.Aborted.
    func (n *NetworkElementServer) GetMne(ctx context.Context, request *mnepb.GetMneRequest) (*mnepb.GetMneResponse, error) {
    	labels := prometheus.Labels{"service": "mne", "rpc": "get"}
    	start := metrics.StartHook(labels, grpcRequestsTotal)
    	defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)

    	// aborted logs err and converts it into the status error sent to the client.
    	aborted := func(err error) error {
    		log.Error(err)
    		return status.Errorf(codes.Aborted, "%v", err)
    	}

    	pndID, err := uuid.Parse(request.Pid)
    	if err != nil {
    		return nil, handleRPCError(labels, err)
    	}

    	pnd, err := n.pndService.Get(store.Query{ID: pndID})
    	if err != nil {
    		return nil, aborted(err)
    	}

    	stored, err := n.getMne(request.Mneid)
    	if err != nil {
    		return nil, aborted(err)
    	}

    	// TODO(path): This needs some adjustments when we're switching towards a new
    	// path request handling.
    	filled, err := fillMneBySpecificPath(stored, "/")
    	if err != nil {
    		return nil, aborted(err)
    	}

    	response := &mnepb.GetMneResponse{
    		Timestamp: time.Now().UnixNano(),
    		Pnd: &mnepb.PrincipalNetworkDomain{
    			Id:          pnd.ID().String(),
    			Name:        pnd.GetName(),
    			Description: pnd.GetDescription(),
    		},
    		Mne: filled,
    	}

    	return response, nil
    }
    
    // getMne returns the network element with the given identifier.
    //
    // identifier is parsed as a UUID. If parsing fails the lookup is still
    // attempted, with uuid.Nil as ID. An error from the network element service
    // is returned as is, so its cause is not hidden; "no network element found"
    // is only reported when the service returns neither an element nor an error.
    func (n *NetworkElementServer) getMne(identifier string) (networkelement.NetworkElement, error) {
    	id, err := uuid.Parse(identifier)
    	if err != nil {
    		// Best effort: fall back to the nil UUID instead of failing early.
    		id = uuid.Nil
    	}

    	mne, err := n.mneService.Get(store.Query{
    		ID: id,
    	})
    	// The service error has to be checked first; otherwise it would be masked
    	// by the generic "not found" error below whenever mne is nil.
    	if err != nil {
    		return nil, err
    	}
    	if mne == nil {
    		return nil, fmt.Errorf("no network element found")
    	}

    	return mne, nil
    }
    
    // GetMneList returns a list of existing mnes.
    //
    // request.Pid selects the principal network domain that is echoed in the
    // response. Every stored network element is rendered for the path "/" by
    // fillMneBySpecificPath. A malformed Pid is passed to handleRPCError, an
    // error from the network element service is returned unchanged, and all
    // other failures are logged and returned as codes.Aborted.
    func (n *NetworkElementServer) GetMneList(ctx context.Context, request *mnepb.GetMneListRequest) (*mnepb.GetMneListResponse, error) {
    	labels := prometheus.Labels{"service": "mne", "rpc": "get"}
    	start := metrics.StartHook(labels, grpcRequestsTotal)
    	defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)

    	pndID, err := uuid.Parse(request.Pid)
    	if err != nil {
    		return nil, handleRPCError(labels, err)
    	}

    	pnd, err := n.pndService.Get(store.Query{ID: pndID})
    	if err != nil {
    		log.Error(err)
    		return nil, status.Errorf(codes.Aborted, "%v", err)
    	}

    	stored, err := n.mneService.GetAll()
    	if err != nil {
    		return nil, err
    	}

    	elements := make([]*mnepb.ManagedNetworkElement, 0, len(stored))
    	for _, networkElement := range stored {
    		filled, fillErr := fillMneBySpecificPath(networkElement, "/")
    		if fillErr != nil {
    			log.Error(fillErr)
    			return nil, status.Errorf(codes.Aborted, "%v", fillErr)
    		}
    		elements = append(elements, filled)
    	}

    	response := &mnepb.GetMneListResponse{
    		Timestamp: time.Now().UnixNano(),
    		Pnd: &mnepb.PrincipalNetworkDomain{
    			Id:          pnd.ID().String(),
    			Name:        pnd.GetName(),
    			Description: pnd.GetDescription(),
    		},
    		Mne: elements,
    	}

    	return response, nil
    }
    
    // GetFlattenedMneList returns a list of existing mnes.
    //
    // In contrast to GetMneList only the ID, name and SBI of every element, as
    // provided by GetAllAsLoaded, are returned. request.Pid selects the
    // principal network domain that is echoed in the response.
    func (n *NetworkElementServer) GetFlattenedMneList(ctx context.Context, request *mnepb.GetMneListRequest) (*mnepb.GetFlattenedMneListResponse, error) {
    	labels := prometheus.Labels{"service": "mne", "rpc": "get"}
    	start := metrics.StartHook(labels, grpcRequestsTotal)
    	defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)

    	pndID, err := uuid.Parse(request.Pid)
    	if err != nil {
    		return nil, handleRPCError(labels, err)
    	}

    	pnd, err := n.pndService.Get(store.Query{ID: pndID})
    	if err != nil {
    		log.Error(err)
    		return nil, status.Errorf(codes.Aborted, "%v", err)
    	}

    	loaded, err := n.mneService.GetAllAsLoaded()
    	if err != nil {
    		return nil, err
    	}

    	flattened := make([]*mnepb.FlattenedManagedNetworkElement, 0, len(loaded))
    	for _, entry := range loaded {
    		flattened = append(flattened, &mnepb.FlattenedManagedNetworkElement{
    			Id:   entry.ID,
    			Name: entry.Name,
    			Sbi:  entry.SBI,
    		})
    	}

    	domain := &mnepb.PrincipalNetworkDomain{
    		Id:          pnd.ID().String(),
    		Name:        pnd.GetName(),
    		Description: pnd.GetDescription(),
    	}

    	return &mnepb.GetFlattenedMneListResponse{
    		Timestamp: time.Now().UnixNano(),
    		Pnd:       domain,
    		Mne:       flattened,
    	}, nil
    }
    
    // fillMneBySpecificPath renders the network element nme as a
    // mnepb.ManagedNetworkElement. The nodes of the element's model that match
    // path (wildcards and partial key matches are allowed) are looked up against
    // the root schema of the element's southbound interface, and each node is
    // turned into a gNMI notification by genGnmiNotification.
    //
    // An element without a southbound interface cannot be rendered because no
    // schema is available; this is reported as an error before the interface is
    // used. All errors are logged and returned as codes.Aborted.
    func fillMneBySpecificPath(nme networkelement.NetworkElement, path string) (*mnepb.ManagedNetworkElement, error) {
    	gnmiPath, err := ygot.StringToStructuredPath(path)
    	if err != nil {
    		log.Error(err)
    		return nil, status.Errorf(codes.Aborted, "%v", err)
    	}

    	// The schema lookup below needs the southbound interface, so a missing one
    	// has to be detected here rather than after it has been dereferenced.
    	southboundInterface := nme.SBI()
    	if southboundInterface == nil {
    		err := fmt.Errorf("network element %s has no southbound interface", nme.ID())
    		log.Error(err)
    		return nil, status.Errorf(codes.Aborted, "%v", err)
    	}

    	opts := []ytypes.GetNodeOpt{
    		&ytypes.GetHandleWildcards{},
    		&ytypes.GetPartialKeyMatch{},
    	}
    	nodes, err := ytypes.GetNode(southboundInterface.Schema().RootSchema(), nme.GetModel(), gnmiPath, opts...)
    	if err != nil {
    		log.Error(err)
    		return nil, status.Errorf(codes.Aborted, "%v", err)
    	}

    	notifications := make([]*gpb.Notification, len(nodes))
    	for i, node := range nodes {
    		mneNotification, err := genGnmiNotification(gnmiPath, node.Data)
    		if err != nil {
    			log.Error(err)
    			return nil, status.Errorf(codes.Aborted, "%v", err)
    		}

    		notifications[i] = mneNotification
    	}

    	sbi := spb.SouthboundInterface{
    		Id:   southboundInterface.ID().String(),
    		Type: southboundInterface.Type(),
    	}

    	mne := &mnepb.ManagedNetworkElement{
    		Id:              nme.ID().String(),
    		Name:            nme.Name(),
    		MneNotification: notifications,
    		Sbi:             &sbi,
    	}

    	return mne, nil
    }
    
    
    func genGnmiNotification(path *gpb.Path, val any) (*gpb.Notification, error) {
    	typedVal, err := ygot.EncodeTypedValue(val, gpb.Encoding_JSON_IETF)
    
    	return &gpb.Notification{
    
    					Elem: path.GetElem(),
    				},
    				Val: typedVal,
    			},
    		},
    	}, nil
    }
    
    // GetPath gets a path on a mne.
    //
    // request.Pid selects the principal network domain that is echoed in the
    // response, request.Mneid the network element and request.Path the path to
    // query. The path is requested from the element through getPath and the
    // notifications of the resulting gNMI GetResponse are returned.
    //
    // A malformed Pid is passed to handleRPCError. Every other failure,
    // including a response from getPath that is not a *gpb.GetResponse, is
    // logged and returned as codes.Aborted.
    func (n *NetworkElementServer) GetPath(ctx context.Context, request *mnepb.GetPathRequest) (*mnepb.GetPathResponse, error) {
    	labels := prometheus.Labels{"service": "mne", "rpc": "get"}
    	start := metrics.StartHook(labels, grpcRequestsTotal)
    	defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)

    	pid, err := uuid.Parse(request.Pid)
    	if err != nil {
    		return nil, handleRPCError(labels, err)
    	}

    	pnd, err := n.pndService.Get(store.Query{ID: pid})
    	if err != nil {
    		log.Error(err)
    		return nil, status.Errorf(codes.Aborted, "%v", err)
    	}

    	mneuid, err := uuid.Parse(request.Mneid)
    	if err != nil {
    		log.Error(err)
    		return nil, status.Errorf(codes.Aborted, "%v", err)
    	}

    	// In case we get the path from grpc-gateway we have to replace every "||"
    	// with "/" to obtain the actual path.
    	path := strings.ReplaceAll(request.Path, "||", "/")

    	resp, err := n.getPath(ctx, mneuid, path)
    	if err != nil {
    		log.Error(err)
    		return nil, status.Errorf(codes.Aborted, "%v", err)
    	}

    	// getPath only promises a proto.Message; check the concrete type instead
    	// of panicking on an unexpected response.
    	getResponse, ok := resp.(*gpb.GetResponse)
    	if !ok {
    		err := &customerrs.InvalidTypeAssertionError{
    			Value: resp,
    			Type:  (*gpb.GetResponse)(nil),
    		}
    		log.Error(err)
    		return nil, status.Errorf(codes.Aborted, "%v", err)
    	}

    	return &mnepb.GetPathResponse{
    		Timestamp: time.Now().UnixNano(),
    		Pnd: &mnepb.PrincipalNetworkDomain{
    			Id:          pnd.ID().String(),
    			Name:        pnd.GetName(),
    			Description: pnd.GetDescription(),
    		},
    		MneNotification: getResponse.Notification,
    	}, nil
    }
    
    // getPath requests path from the network element identified by uuid and
    // returns the transport's response as a proto.Message. The response is also
    // processed by the element (ProcessResponse), and the element's resulting
    // model is written back through the network element service.
    func (n *NetworkElementServer) getPath(ctx context.Context, uuid uuid.UUID, path string) (proto.Message, error) {
    	query := store.Query{
    		ID:   uuid,
    		Name: uuid.String(),
    	}

    	mne, err := n.mneService.Get(query)
    	if err != nil {
    		return nil, err
    	}
    	//TODO(PND): check if necessary?
    	if mne == nil {
    		return nil, fmt.Errorf("no network element found")
    	}

    	raw, err := mne.Transport().Get(ctx, path)
    	if err != nil {
    		return nil, err
    	}

    	resp, isMessage := raw.(proto.Message)
    	if !isMessage {
    		return nil, &customerrs.InvalidTypeAssertionError{
    			Value: raw,
    			Type:  (*proto.Message)(nil),
    		}
    	}

    	if err = mne.ProcessResponse(resp); err != nil {
    		return nil, err
    	}

    	model, err := mne.GetModelAsString()
    	if err != nil {
    		return nil, err
    	}

    	// TODO(path): We probably have to remove this when we address path request handling.
    	if err = n.mneService.UpdateModel(uuid, model); err != nil {
    		return nil, err
    	}

    	return resp, nil
    }
    
    // GetChange gets a specific change of a mne.
    //
    // request.Pid selects the principal network domain, request.Cuid lists the
    // IDs of the changes to return. A malformed Pid is passed to handleRPCError;
    // every other failure is logged and returned as codes.Aborted.
    func (n *NetworkElementServer) GetChange(ctx context.Context, request *mnepb.GetChangeRequest) (*mnepb.GetChangeResponse, error) {
    	labels := prometheus.Labels{"service": "mne", "rpc": "get"}
    	start := metrics.StartHook(labels, grpcRequestsTotal)
    	defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)

    	pndID, err := uuid.Parse(request.Pid)
    	if err != nil {
    		return nil, handleRPCError(labels, err)
    	}

    	pnd, err := n.pndService.Get(store.Query{ID: pndID})
    	if err == nil {
    		var changes []*mnepb.Change
    		if changes, err = fillChanges(pnd, false, request.Cuid...); err == nil {
    			return &mnepb.GetChangeResponse{
    				Timestamp: time.Now().UnixNano(),
    				Pnd: &mnepb.PrincipalNetworkDomain{
    					Id:          pnd.ID().String(),
    					Name:        pnd.GetName(),
    					Description: pnd.GetDescription(),
    				},
    				Change: changes,
    			}, nil
    		}
    	}

    	// Either the domain lookup or the change lookup failed.
    	log.Error(err)

    	return nil, status.Errorf(codes.Aborted, "%v", err)
    }
    
    // GetChangeList gets all existing changes.
    //
    // request.Pid selects the principal network domain whose changes are
    // returned. A malformed Pid is passed to handleRPCError; every other failure
    // is logged and returned as codes.Aborted.
    func (n *NetworkElementServer) GetChangeList(ctx context.Context, request *mnepb.GetChangeListRequest) (*mnepb.GetChangeListResponse, error) {
    	labels := prometheus.Labels{"service": "mne", "rpc": "get"}
    	start := metrics.StartHook(labels, grpcRequestsTotal)
    	defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)

    	// aborted logs err and converts it into the status error sent to the client.
    	aborted := func(err error) error {
    		log.Error(err)
    		return status.Errorf(codes.Aborted, "%v", err)
    	}

    	pndID, err := uuid.Parse(request.Pid)
    	if err != nil {
    		return nil, handleRPCError(labels, err)
    	}

    	pnd, err := n.pndService.Get(store.Query{ID: pndID})
    	if err != nil {
    		return nil, aborted(err)
    	}

    	changes, err := fillChanges(pnd, true, "")
    	if err != nil {
    		return nil, aborted(err)
    	}

    	response := &mnepb.GetChangeListResponse{
    		Timestamp: time.Now().UnixNano(),
    		Pnd: &mnepb.PrincipalNetworkDomain{
    			Id:          pnd.ID().String(),
    			Name:        pnd.GetName(),
    			Description: pnd.GetDescription(),
    		},
    		Change: changes,
    	}

    	return response, nil
    }
    
    // fillChanges collects changes of pnd as mnepb.Change messages. With all set,
    // the pending changes followed by the committed changes are returned and
    // cuid is ignored. Otherwise cuid has to contain at least one change ID and
    // exactly those changes are returned. Every message carries the change ID,
    // its age in microseconds, its state and the ygot diff between the previous
    // and the intended state.
    func fillChanges(pnd networkdomain.NetworkDomain, all bool, cuid ...string) ([]*mnepb.Change, error) {
    	var changeIDs []uuid.UUID

    	if all {
    		changeIDs = append(pnd.PendingChanges(), pnd.CommittedChanges()...)
    	} else {
    		if len(cuid) == 0 {
    			return nil, &customerrs.InvalidParametersError{
    				Func:  fillChanges,
    				Param: "length of 'mneID' cannot be '0' when 'all' is set to 'false'",
    			}
    		}

    		parsed, err := stringArrayToUUIDs(cuid)
    		if err != nil {
    			log.Error(err)
    			return nil, status.Errorf(codes.Aborted, "%v", err)
    		}
    		changeIDs = parsed
    	}

    	result := make([]*mnepb.Change, 0, len(changeIDs))
    	for _, changeID := range changeIDs {
    		change, err := pnd.GetChange(changeID)
    		if err != nil {
    			log.Error(err)
    			return nil, status.Errorf(codes.Aborted, "%v", err)
    		}

    		diff, err := ygot.Diff(change.PreviousState(), change.IntendedState())
    		if err != nil {
    			log.Error(err)
    			return nil, status.Errorf(codes.Aborted, "%v", err)
    		}

    		result = append(result, &mnepb.Change{
    			Id:    changeID.String(),
    			Age:   change.Age().Microseconds(),
    			State: change.State(),
    			Diff:  diff,
    		})
    	}

    	return result, nil
    }
    
    // stringArrayToUUIDs parses every string of sid as a UUID and returns the
    // results in the same order. The first string that cannot be parsed aborts
    // the conversion; the error is logged and returned as codes.Aborted.
    func stringArrayToUUIDs(sid []string) ([]uuid.UUID, error) {
    	parsedIDs := make([]uuid.UUID, 0, len(sid))

    	for _, raw := range sid {
    		id, err := uuid.Parse(raw)
    		if err != nil {
    			log.Error(err)
    			return nil, status.Errorf(codes.Aborted, "%v", err)
    		}

    		parsedIDs = append(parsedIDs, id)
    	}

    	return parsedIDs, nil
    }
    
    // SetMneList updates the list of mnes.
    //
    // Every entry of the request is added as a new network element through
    // addMne, and one SetResponse carrying the new element's ID is returned per
    // entry, in request order. The entries are read through the generated
    // getters, so an entry without an Sbi message fails UUID parsing instead of
    // dereferencing a nil pointer.
    //
    // The first failure is logged and returned as codes.Aborted. Elements added
    // by earlier entries are not removed again in that case.
    func (n *NetworkElementServer) SetMneList(ctx context.Context, request *mnepb.SetMneListRequest) (*mnepb.SetMneListResponse, error) {
    	labels := prometheus.Labels{"service": "mne", "rpc": "set"}
    	start := metrics.StartHook(labels, grpcRequestsTotal)
    	defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)

    	entries := request.GetMne()
    	responses := make([]*mnepb.SetResponse, 0, len(entries))

    	for _, entry := range entries {
    		sid, err := uuid.Parse(entry.GetSbi().GetId())
    		if err != nil {
    			log.Error(err)
    			return nil, status.Errorf(codes.Aborted, "%v", err)
    		}

    		mneID, err := n.addMne(ctx, entry.GetMneName(), entry.GetTransportOption(), sid)
    		if err != nil {
    			log.Error(err)
    			return nil, status.Errorf(codes.Aborted, "%v", err)
    		}

    		responses = append(responses, &mnepb.SetResponse{
    			Id:     mneID.String(),
    			Status: mnepb.Status_STATUS_OK,
    		})
    	}

    	return &mnepb.SetMneListResponse{
    		Timestamp: time.Now().UnixNano(),
    		Status:    mnepb.Status_STATUS_OK,
    		Responses: responses,
    	}, nil
    }
    
    // addMne creates and stores a network element called name that uses the
    // southbound interface identified by sid and the transport options opt. It
    // returns the ID of the new element, or uuid.Nil together with an error.
    //
    // The work is delegated to addNetworkElement, which performs exactly the
    // steps this method used to duplicate (SBI lookup, element creation, storing
    // and the initial "/interfaces" request), so both entry points stay in sync.
    func (n *NetworkElementServer) addMne(ctx context.Context, name string, opt *tpb.TransportOption, sid uuid.UUID) (uuid.UUID, error) {
    	// TODO(PND): check how to handle this! Containerised SBIs
    	// (spb.Type_TYPE_CONTAINERISED) and plugin SBIs (spb.Type_TYPE_PLUGIN)
    	// are not treated specially yet.
    	return n.addNetworkElement(ctx, name, opt, sid)
    }
    
    // SetChangeList sets a list of changes.
    //
    // request.Pid selects the principal network domain. Every entry of
    // request.Change names a change (Cuid) and the operation to run on it:
    // commit or confirm. Any other operation is rejected with an
    // InvalidParametersError. Entries are processed in order and processing
    // stops at the first failure.
    func (n *NetworkElementServer) SetChangeList(ctx context.Context, request *mnepb.SetChangeListRequest) (*mnepb.SetChangeListResponse, error) {
    	labels := prometheus.Labels{"service": "mne", "rpc": "set"}
    	start := metrics.StartHook(labels, grpcRequestsTotal)
    	defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)

    	pndID, err := uuid.Parse(request.Pid)
    	if err != nil {
    		return nil, handleRPCError(labels, err)
    	}

    	pnd, err := n.pndService.Get(store.Query{ID: pndID})
    	if err != nil {
    		return nil, handleRPCError(labels, err)
    	}

    	responses := make([]*mnepb.SetResponse, 0, len(request.Change))
    	for _, change := range request.Change {
    		cuid, parseErr := uuid.Parse(change.Cuid)
    		if parseErr != nil {
    			log.Error(parseErr)
    			return nil, status.Errorf(codes.Aborted, "%v", parseErr)
    		}

    		var opErr error
    		switch change.Op {
    		case mnepb.Operation_OPERATION_COMMIT:
    			opErr = pnd.Commit(cuid)
    		case mnepb.Operation_OPERATION_CONFIRM:
    			opErr = pnd.Confirm(cuid)
    		default:
    			return nil, &customerrs.InvalidParametersError{
    				Param: change.Op,
    			}
    		}
    		if opErr != nil {
    			log.Error(opErr)
    			return nil, status.Errorf(codes.Aborted, "%v", opErr)
    		}

    		responses = append(responses, &mnepb.SetResponse{
    			Id:     cuid.String(),
    			Status: mnepb.Status_STATUS_OK,
    		})
    	}

    	return &mnepb.SetChangeListResponse{
    		Timestamp: time.Now().UnixNano(),
    		Status:    mnepb.Status_STATUS_OK,
    		Responses: responses,
    	}, nil
    }
    
    // SetPathList sets a list of paths.
    //
    // request.Pid selects the principal network domain. Every entry of
    // request.ChangeRequest is handed to the domain's ChangeMNE together with
    // its network element ID, operation, path and value; the ID returned by
    // ChangeMNE is reported in the matching SetResponse. Entries are processed
    // in order and processing stops at the first failure.
    func (n *NetworkElementServer) SetPathList(ctx context.Context, request *mnepb.SetPathListRequest) (*mnepb.SetPathListResponse, error) {
    	labels := prometheus.Labels{"service": "mne", "rpc": "set"}
    	start := metrics.StartHook(labels, grpcRequestsTotal)
    	defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)

    	pndID, err := uuid.Parse(request.Pid)
    	if err != nil {
    		return nil, handleRPCError(labels, err)
    	}

    	pnd, err := n.pndService.Get(store.Query{ID: pndID})
    	if err != nil {
    		return nil, handleRPCError(labels, err)
    	}

    	// aborted logs err and converts it into the status error sent to the client.
    	aborted := func(err error) error {
    		log.Error(err)
    		return status.Errorf(codes.Aborted, "%v", err)
    	}

    	responses := make([]*mnepb.SetResponse, 0, len(request.ChangeRequest))
    	for _, changeRequest := range request.ChangeRequest {
    		mneID, parseErr := uuid.Parse(changeRequest.Mneid)
    		if parseErr != nil {
    			return nil, aborted(parseErr)
    		}

    		changeID, changeErr := pnd.ChangeMNE(mneID, changeRequest.ApiOp, changeRequest.Path, changeRequest.Value)
    		if changeErr != nil {
    			return nil, aborted(changeErr)
    		}

    		responses = append(responses, &mnepb.SetResponse{
    			Status: mnepb.Status_STATUS_OK,
    			Id:     changeID.String(),
    		})
    	}

    	return &mnepb.SetPathListResponse{
    		Timestamp: time.Now().UnixNano(),
    		Status:    mnepb.Status_STATUS_OK,
    		Responses: responses,
    	}, nil
    }
    
    // DeleteMne deletes a mne.
    //
    // request.Mneid has to be the UUID of the network element to delete. A
    // malformed ID or a failing deletion is logged and returned as
    // codes.Aborted.
    func (n *NetworkElementServer) DeleteMne(ctx context.Context, request *mnepb.DeleteMneRequest) (*mnepb.DeleteMneResponse, error) {
    	//TODO(PND): decide if this should be added or not, requires some changes in controller/nucleus/metrics.go too
    	// labels := prometheus.Labels{"service": "mne", "rpc": "delete"}
    	// start := metrics.StartHook(labels, networkElementDeletionsTotal)
    	// defer metrics.FinishHook(labels, start, networkElementDeletionDurationSecondsTotal, networkElementDeletionDurationSeconds)

    	mneID, err := uuid.Parse(request.Mneid)
    	if err == nil {
    		err = n.deleteMne(mneID)
    	}
    	if err != nil {
    		log.Error(err)
    		return nil, status.Errorf(codes.Aborted, "%v", err)
    	}

    	response := &mnepb.DeleteMneResponse{
    		Timestamp: time.Now().UnixNano(),
    		Status:    mnepb.Status_STATUS_OK,
    	}

    	return response, nil
    }
    
    // deleteMne looks up the network element identified by id and deletes it
    // through the network element service. The lookup query carries id both as
    // ID and, in its string form, as name.
    func (n *NetworkElementServer) deleteMne(id uuid.UUID) error {
    	query := store.Query{
    		ID:   id,
    		Name: id.String(),
    	}

    	mne, err := n.mneService.Get(query)
    	switch {
    	case err != nil:
    		return err
    	case mne == nil:
    		//TODO(PND): check if necessary!
    		return fmt.Errorf("no network element found")
    	}

    	// TODO(PND): check how to handle this!
    	// switch mne.(type) {
    	// case *CsbiNetworkElement:
    	// 	return pnd.handleCsbiDeletion(mne)
    	// default:
    	// 	return pnd.networkElementService.Delete(mne)
    	// }

    	return n.mneService.Delete(mne)
    }
    
    // SubscribePath subscribes to specific paths of an mne.
    //
    // request.Mneid has to be the UUID of the network element and
    // request.Sublist describes the subscriptions to set up. Errors are returned
    // unchanged.
    func (n *NetworkElementServer) SubscribePath(request *mnepb.SubscribePathRequest, stream mnepb.NetworkElementService_SubscribePathServer) error {
    	mneID, err := uuid.Parse(request.Mneid)
    	if err != nil {
    		return err
    	}

    	return n.subscribePath(mneID, request.Sublist)
    }
    
    // subscribePath sets up the subscriptions of subList on the network element
    // identified by uuid. The subscription mode and each stream mode are mapped
    // to the string names used by the Arista gNMI fork. For every subscription
    // the options are attached to a fresh background context under
    // types.CtxKeyOpts and passed to the element's transport. The first error
    // aborts the remaining subscriptions.
    func (n *NetworkElementServer) subscribePath(uuid uuid.UUID, subList *mnepb.SubscriptionList) error {
    	mne, err := n.mneService.Get(store.Query{ID: uuid})
    	if err != nil {
    		return err
    	}

    	mode, err := n.mapModeToAristaFork(subList.GetMode())
    	if err != nil {
    		return err
    	}

    	for _, subscription := range subList.Subscription {
    		streamMode, modeErr := n.mapStreamModeToAristaFork(subscription.GetStreamMode())
    		if modeErr != nil {
    			return modeErr
    		}

    		paths := [][]string{n.splitStringPath(subscription.GetPath())}
    		options := &aGNMI.SubscribeOptions{
    			Mode:           mode,
    			StreamMode:     streamMode,
    			Paths:          paths,
    			SampleInterval: subscription.SampleInterval,
    		}

    		ctx := context.WithValue(context.Background(), types.CtxKeyOpts, options)
    		if subscribeErr := mne.Transport().Subscribe(ctx); subscribeErr != nil {
    			return subscribeErr
    		}
    	}

    	return nil
    }
    
    // splitStringPath splits s at every "/" and returns the resulting elements.
    // As with strings.Split, a leading "/" yields an empty first element.
    func (n *NetworkElementServer) splitStringPath(s string) []string {
    	const separator = "/"

    	elems := strings.Split(s, separator)

    	return elems
    }
    
    // mapStreamModeToAristaFork translates a stream mode into the string name
    // expected by the Arista gNMI fork: "target_defined", "on_change" or
    // "sample". Any other mode yields an error that names the rejected mode.
    func (n *NetworkElementServer) mapStreamModeToAristaFork(mode mnepb.StreamMode) (string, error) {
    	switch mode {
    	case mnepb.StreamMode_STREAM_MODE_TARGET_DEFINED:
    		return "target_defined", nil
    	case mnepb.StreamMode_STREAM_MODE_ON_CHANGE:
    		return "on_change", nil
    	case mnepb.StreamMode_STREAM_MODE_SAMPLE:
    		return "sample", nil
    	default:
    		// %v prints the offending value; %T would only print the Go type
    		// name, which is the same for every mode.
    		return "", fmt.Errorf("StreamMode of type: %v is not supported", mode)
    	}
    }
    
    // mapModeToAristaFork translates a subscription mode into the string name
    // expected by the Arista gNMI fork: "stream", "once" or "poll". Any other
    // mode yields an error that names the rejected mode.
    func (n *NetworkElementServer) mapModeToAristaFork(mode mnepb.SubscriptionMode) (string, error) {
    	switch mode {
    	case mnepb.SubscriptionMode_SUBSCRIPTION_MODE_STREAM:
    		return "stream", nil
    	case mnepb.SubscriptionMode_SUBSCRIPTION_MODE_ONCE:
    		return "once", nil
    	case mnepb.SubscriptionMode_SUBSCRIPTION_MODE_POLL:
    		return "poll", nil
    	default:
    		// %v prints the offending value; %T would only print the Go type
    		// name, which is the same for every mode.
    		return "", fmt.Errorf("SubscriptionMode of type: %v is not supported", mode)
    	}
    }