Skip to content
Snippets Groups Projects
networkElement.go 28.8 KiB
Newer Older
	"time"

	mnepb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/networkelement"
	ppb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/pnd"
	tpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/transport"
	"code.fbi.h-da.de/danet/gosdn/controller/conflict"
	"code.fbi.h-da.de/danet/gosdn/controller/customerrs"
	"code.fbi.h-da.de/danet/gosdn/controller/interfaces/change"
	"code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkdomain"
	"code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkelement"
	"code.fbi.h-da.de/danet/gosdn/controller/interfaces/plugin"
	"code.fbi.h-da.de/danet/gosdn/controller/metrics"
	"code.fbi.h-da.de/danet/gosdn/controller/nucleus"
	"code.fbi.h-da.de/danet/gosdn/controller/nucleus/types"
	util "code.fbi.h-da.de/danet/gosdn/controller/nucleus/util/gnmi"
	"code.fbi.h-da.de/danet/gosdn/controller/store"
	aGNMI "code.fbi.h-da.de/danet/gosdn/forks/goarista/gnmi"
	gpb "github.com/openconfig/gnmi/proto/gnmi"

	"github.com/openconfig/ygot/ygot"
	"github.com/prometheus/client_golang/prometheus"
	log "github.com/sirupsen/logrus"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
	"google.golang.org/protobuf/proto"
)

// NetworkElementServer represents a NetworkElementServer.
//
// It embeds mnepb.UnimplementedNetworkElementServiceServer and holds the
// dependencies used by the handlers in this file: mneService for reading and
// writing network elements, pndService for looking up principal network
// domains, pluginService for requesting plugins when a network element is
// added, and changeStore for pending and committed changes.
type NetworkElementServer struct {
	mnepb.UnimplementedNetworkElementServiceServer
	mneService    networkelement.Service
	pndService    networkdomain.Service
	pluginService plugin.Service
	changeStore   store.ChangeStore
}

// NewNetworkElementServer returns a new NetWorkElementServer.
func NewNetworkElementServer(mneService networkelement.Service, pndService networkdomain.Service, pluginService plugin.Service, changeStore store.ChangeStore) *NetworkElementServer {
	return &NetworkElementServer{
		mneService:    mneService,
		pndService:    pndService,
		pluginService: pluginService,
		changeStore:   changeStore,
// // Add adds a new network element.
// func (n *NetworkElementServer) Add(ctx context.Context, request *mnepb.AddNetworkElementRequest) (*mnepb.AddNetworkElementResponse, error) {
// 	sbiID, err := uuid.Parse(request.NetworkElement.Sbi.Id)
// 	if err != nil {
// 		return nil, status.Errorf(codes.Aborted, "%v", err)
// 	}
// 	pndID, err := uuid.Parse(request.Pid)
// 	if err != nil {
// 		return nil, status.Errorf(codes.Aborted, "%v", err)
// 	}
// 	id, err := n.addNetworkElement(ctx, request.NetworkElement.NetworkElementName, request.NetworkElement.TransportOption, sbiID, pndID)
// 	if err != nil {
// 		return nil, status.Errorf(codes.Aborted, "%v", err)
// 	}

// 	return &mnepb.AddNetworkElementResponse{
// 		Timestamp:        time.Now().UnixNano(),
// 		Status:           mnepb.Status_STATUS_OK,
// 		NetworkElementId: id.String(),
// 	}, nil
// }

// // TODO(merge): add plugin here, remove sbi
// func (n *NetworkElementServer) addNetworkElement(ctx context.Context, name string, transportOpt *tpb.TransportOption, sbiID uuid.UUID, pndID uuid.UUID) (uuid.UUID, error) {
// 	var sbi southbound.SouthboundInterface
// 	var err error

// 	// Note: cSBI not supported currently, so this is commented for now.
// 	// Might be needed or removed in the future.
// 	//
// 	// switch t := opt.Type; t {
// 	// case spb.Type_TYPE_CONTAINERISED:
// 	// 	return pnd.handleCsbiEnrolment(name, opt)
// 	// case spb.Type_TYPE_PLUGIN:
// 	// 	sbi, err = pnd.requestPlugin(name, opt)
// 	// 	if err != nil {
// 	// 		return uuid.Nil, err
// 	// 	}
// 	// default:
// 	// 	var err error
// 	// 	sbi, err = n.sbiService.Get(store.Query{ID: sbiID})
// 	// 	if err != nil {
// 	// 		return uuid.Nil, err
// 	// 	}
// 	// }

// 	// TODO(merge): add plugin stuff here!
// 	// sbi, err = n.sbiService.Get(store.Query{ID: sbiID})
// 	// if err != nil {
// 	// 	return uuid.Nil, err
// 	// }

// 	mne, err := nucleus.NewNetworkElement(name, uuid.Nil, transportOpt, sbi, pndID, conflict.Metadata{ResourceVersion: 0})
// 	if err != nil {
// 		return uuid.Nil, err
// 	}

// 	err = n.mneService.Add(mne)
// 	if err != nil {
// 		return uuid.Nil, err
// 	}

// 	if mne.IsTransportValid() {
// 		_, err = n.getPath(ctx, mne.ID(), "/interfaces")
// 		if err != nil {
// 			return uuid.Nil, err
// 		}
// 	}

// 	return mne.ID(), nil
// }

// GetAll returns all stored network elements.
//
// Every element is reported with its ID, its name and its model rendered as a
// string. A failure to render a model is logged and reported as
// codes.Aborted; a failure of the service lookup is returned unchanged.
func (n *NetworkElementServer) GetAll(ctx context.Context, request *mnepb.GetAllNetworkElementRequest) (*mnepb.GetAllNetworkElementResponse, error) {
	stored, err := n.mneService.GetAll()
	if err != nil {
		return nil, err
	}

	result := []*mnepb.ManagedNetworkElement{}
	for _, element := range stored {
		model, modelErr := element.GetModelAsString()
		if modelErr != nil {
			log.Error(modelErr)
			return nil, status.Errorf(codes.Aborted, "%v", modelErr)
		}

		entry := &mnepb.ManagedNetworkElement{
			Id:    element.ID().String(),
			Name:  element.Name(),
			Model: model,
		}
		result = append(result, entry)
	}

	response := &mnepb.GetAllNetworkElementResponse{
		Timestamp:      time.Now().UnixNano(),
		Status:         mnepb.Status_STATUS_OK,
		NetworkElement: result,
	}
	return response, nil
}

// Get returns a network element.
func (n *NetworkElementServer) Get(ctx context.Context, request *mnepb.GetNetworkElementRequest) (*mnepb.GetNetworkElementResponse, error) {
	mne, err := n.mneService.Get(store.Query{ID: uuid.MustParse(request.NetworkElementId)})
	}

	ygotStructAsJSON, err := mne.GetModelAsString()
	if err != nil {
		log.Error(err)
		return nil, status.Errorf(codes.Aborted, "%v", err)
	}

	networkElement := &mnepb.ManagedNetworkElement{
		Id:               mne.ID().String(),
		Name:             mne.Name(),
		Model:            ygotStructAsJSON,
		TransportAddress: mne.TransportAddress(),
	}

	return &mnepb.GetNetworkElementResponse{
		Timestamp:      time.Now().UnixNano(),
		Status:         mnepb.Status_STATUS_OK,
		NetworkElement: networkElement,
	}, nil
}

// Update updates a network element.
func (n *NetworkElementServer) Update(ctx context.Context, request *mnepb.UpdateNetworkElementRequest) (*mnepb.UpdateNetworkElementResponse, error) {
	mneID, err := uuid.Parse(request.NetworkElement.Id)
	if err != nil {
		return nil, err
	}

	err = n.mneService.UpdateModel(mneID, request.NetworkElement.Model)
	if err != nil {
		return nil, err
	networkElement, err := n.mneService.Get(store.Query{ID: mneID})
	if err != nil {
		return nil, err
	}

	err = networkelement.EnsureIntendedConfigurationIsAppliedOnNetworkElement(networkElement)
	if err != nil {
		return &mnepb.UpdateNetworkElementResponse{
			Timestamp: time.Now().UnixNano(),
			Status:    mnepb.Status_STATUS_ERROR,
		}, err
	}

	return &mnepb.UpdateNetworkElementResponse{
		Timestamp: time.Now().UnixNano(),
		Status:    mnepb.Status_STATUS_OK,
	}, nil
}

// GetMne gets a specific mne.
//
// The principal network domain given by request.Pid and the network element
// given by request.Mneid are looked up, and the element is returned filled
// with the notifications for the root path "/". Every failure is logged and
// reported as codes.Aborted. The call is wrapped by the metrics start and
// finish hooks.
func (n *NetworkElementServer) GetMne(ctx context.Context, request *mnepb.GetMneRequest) (*mnepb.GetMneResponse, error) {
	labels := prometheus.Labels{"service": "mne", "rpc": "get"}
	start := metrics.StartHook(labels, grpcRequestsTotal)
	defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)

	// abort logs the error and converts it into the gRPC status used by this handler.
	abort := func(cause error) (*mnepb.GetMneResponse, error) {
		log.Error(cause)
		return nil, status.Errorf(codes.Aborted, "%v", cause)
	}

	domain, err := n.getPnd(request.Pid)
	if err != nil {
		return abort(err)
	}

	element, err := n.getMne(request.Mneid)
	if err != nil {
		return abort(err)
	}

	// TODO(path): This needs some adjustments when we're switching towards a new
	// path request handling.
	filled, err := fillMneBySpecificPath(element, "/", false)
	if err != nil {
		return abort(err)
	}

	response := &mnepb.GetMneResponse{
		Timestamp: time.Now().UnixNano(),
		Pnd: &ppb.PrincipalNetworkDomain{
			Id:          domain.ID().String(),
			Name:        domain.GetName(),
			Description: domain.GetDescription(),
		},
		Mne: filled,
	}
	return response, nil
}

// GetFlattenedMne gets a specific mne in its flattened form, i.e. only its ID,
// name, principal network domain ID and plugin ID.
//
// NOTE(review): the metrics label uses service "pnd" while the other handlers
// in this file use "mne" - confirm whether that is intended.
func (n *NetworkElementServer) GetFlattenedMne(ctx context.Context, request *mnepb.GetMneRequest) (*mnepb.GetFlattenedMneResponse, error) {
	labels := prometheus.Labels{"service": "pnd", "rpc": "get"}
	start := metrics.StartHook(labels, grpcRequestsTotal)
	defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)

	element, lookupErr := n.getMne(request.Mneid)
	if lookupErr != nil {
		log.Error(lookupErr)
		return nil, status.Errorf(codes.Aborted, "%v", lookupErr)
	}

	response := &mnepb.GetFlattenedMneResponse{
		Timestamp: time.Now().UnixNano(),
		Mne: &mnepb.FlattenedManagedNetworkElement{
			Id:       element.ID().String(),
			Name:     element.Name(),
			Pid:      element.PndID().String(),
			Pluginid: element.GetPlugin().ID().String(),
		},
	}
	return response, nil
}

// getMne looks up a network element by its identifier.
//
// identifier is parsed as a UUID; when parsing fails the lookup is performed
// with uuid.Nil instead. An error returned by the service is checked first and
// passed on unchanged, so that it is not hidden behind the generic "not found"
// message. A nil element without an error is reported as
// "no network element found".
func (n *NetworkElementServer) getMne(identifier string) (networkelement.NetworkElement, error) {
	id, err := uuid.Parse(identifier)
	if err != nil {
		// Best effort: fall back to the nil UUID and let the service decide.
		id = uuid.Nil
	}

	mne, err := n.mneService.Get(store.Query{
		ID: id,
	})
	if err != nil {
		return nil, err
	}
	if mne == nil {
		return nil, fmt.Errorf("no network element found")
	}

	return mne, nil
}

// getPnd looks up a principal network domain by its identifier.
//
// identifier must be a valid UUID; a parse error is returned unchanged. An
// error from the service lookup is logged and reported as codes.Aborted.
func (n *NetworkElementServer) getPnd(identifier string) (networkdomain.NetworkDomain, error) {
	domainID, parseErr := uuid.Parse(identifier)
	if parseErr != nil {
		return nil, parseErr
	}

	domain, lookupErr := n.pndService.Get(store.Query{ID: domainID})
	if lookupErr != nil {
		log.Error(lookupErr)
		return nil, status.Errorf(codes.Aborted, "%v", lookupErr)
	}
	return domain, nil
}

// GetMneList returns a list of existing mnes.
//
// All network elements known to the service are returned, each filled with
// the notifications for the root path "/", together with the principal
// network domain given by request.Pid.
func (n *NetworkElementServer) GetMneList(ctx context.Context, request *mnepb.GetMneListRequest) (*mnepb.GetMneListResponse, error) {
	labels := prometheus.Labels{"service": "mne", "rpc": "get"}
	start := metrics.StartHook(labels, grpcRequestsTotal)
	defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)

	domain, err := n.getPnd(request.Pid)
	if err != nil {
		log.Error(err)
		return nil, status.Errorf(codes.Aborted, "%v", err)
	}

	stored, err := n.mneService.GetAll()
	if err != nil {
		return nil, err
	}

	filled := make([]*mnepb.ManagedNetworkElement, 0, len(stored))
	for _, element := range stored {
		entry, fillErr := fillMneBySpecificPath(element, "/", false)
		if fillErr != nil {
			log.Error(fillErr)
			return nil, status.Errorf(codes.Aborted, "%v", fillErr)
		}
		filled = append(filled, entry)
	}

	response := &mnepb.GetMneListResponse{
		Timestamp: time.Now().UnixNano(),
		Pnd: &ppb.PrincipalNetworkDomain{
			Id:          domain.ID().String(),
			Name:        domain.GetName(),
			Description: domain.GetDescription(),
		},
		Mne: filled,
	}
	return response, nil
}

// GetFlattenedMneList returns a list of existing mnes in their flattened form
// (ID, name, plugin ID and principal network domain ID), as provided by the
// service's GetAllAsLoaded.
func (n *NetworkElementServer) GetFlattenedMneList(ctx context.Context, request *mnepb.GetFlattenedMneListRequest) (*mnepb.GetFlattenedMneListResponse, error) {
	labels := prometheus.Labels{"service": "mne", "rpc": "get"}
	start := metrics.StartHook(labels, grpcRequestsTotal)
	defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)

	domain, err := n.getPnd(request.Pid)
	if err != nil {
		log.Error(err)
		return nil, status.Errorf(codes.Aborted, "%v", err)
	}

	loaded, err := n.mneService.GetAllAsLoaded()
	if err != nil {
		return nil, err
	}

	flattened := make([]*mnepb.FlattenedManagedNetworkElement, 0, len(loaded))
	for _, entry := range loaded {
		flattened = append(flattened, &mnepb.FlattenedManagedNetworkElement{
			Id:       entry.ID,
			Name:     entry.Name,
			Pluginid: entry.Plugin,
			Pid:      entry.PndID,
		})
	}

	response := &mnepb.GetFlattenedMneListResponse{
		Timestamp: time.Now().UnixNano(),
		Pnd: &ppb.PrincipalNetworkDomain{
			Id:          domain.ID().String(),
			Name:        domain.GetName(),
			Description: domain.GetDescription(),
		},
		Mne: flattened,
	}
	return response, nil
}

// fillMneBySpecificPath builds a ManagedNetworkElement for nme that carries
// the notifications the element's plugin returns for path.
//
// path is converted into a structured gNMI path first.
// requestForIntendedState is passed through to the plugin's GetNode. Both
// failure cases are reported as codes.Aborted; only the path conversion
// failure is logged here.
func fillMneBySpecificPath(nme networkelement.NetworkElement, path string, requestForIntendedState bool) (*mnepb.ManagedNetworkElement, error) {
	structuredPath, pathErr := ygot.StringToStructuredPath(path)
	if pathErr != nil {
		log.Error(pathErr)
		return nil, status.Errorf(codes.Aborted, "%v", pathErr)
	}

	nodes, nodeErr := nme.GetPlugin().GetNode(structuredPath, requestForIntendedState)
	if nodeErr != nil {
		return nil, status.Errorf(codes.Aborted, "%v", nodeErr)
	}

	return &mnepb.ManagedNetworkElement{
		Id:              nme.ID().String(),
		Name:            nme.Name(),
		MneNotification: nodes,
	}, nil
}

// GetPath gets an actual state of the path on a mne.
//
// The principal network domain (request.Pid) and the network element
// (request.Mneid) are looked up, the path is requested via the element's
// transport and the resulting notifications are returned. Every failure is
// logged and reported as codes.Aborted, including the case where the
// transport answers with something other than a *gpb.GetResponse.
func (n *NetworkElementServer) GetPath(ctx context.Context, request *mnepb.GetPathRequest) (*mnepb.GetPathResponse, error) {
	labels := prometheus.Labels{"service": "mne", "rpc": "get"}
	start := metrics.StartHook(labels, grpcRequestsTotal)
	defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)

	pnd, err := n.getPnd(request.Pid)
	if err != nil {
		log.Error(err)
		return nil, status.Errorf(codes.Aborted, "%v", err)
	}

	networkElement, err := n.getMne(request.Mneid)
	if err != nil {
		log.Error(err)
		return nil, status.Errorf(codes.Aborted, "%v", err)
	}

	// In case we get the path from grpc-gateway we have to replace
	// the "||" separators with "/".
	path := strings.ReplaceAll(request.Path, "||", "/")

	resp, err := n.getPath(ctx, networkElement, path)
	if err != nil {
		log.Error(err)
		return nil, status.Errorf(codes.Aborted, "%v", err)
	}

	// Use the checked form of the type assertion: an unexpected response type
	// must produce an error, not a panic in the handler.
	getResp, ok := resp.(*gpb.GetResponse)
	if !ok {
		assertErr := &customerrs.InvalidTypeAssertionError{
			Value: resp,
			Type:  (*gpb.GetResponse)(nil),
		}
		log.Error(assertErr)
		return nil, status.Errorf(codes.Aborted, "%v", assertErr)
	}

	return &mnepb.GetPathResponse{
		Timestamp: time.Now().UnixNano(),
		Pnd: &ppb.PrincipalNetworkDomain{
			Id:          pnd.ID().String(),
			Name:        pnd.GetName(),
			Description: pnd.GetDescription(),
		},
		// GetNotification is nil-safe, unlike a direct field access.
		MneNotification: getResp.GetNotification(),
	}, nil
}

// getPath requests path from the transport of mne and hands the answer to
// mne.ProcessResponse.
//
// The transport's answer has to be a proto.Message; otherwise an
// InvalidTypeAssertionError is returned. On success the message is returned.
func (n *NetworkElementServer) getPath(ctx context.Context, mne networkelement.NetworkElement, path string) (proto.Message, error) {
	raw, err := mne.Transport().Get(ctx, path)
	if err != nil {
		return nil, err
	}

	msg, isMessage := raw.(proto.Message)
	if !isMessage {
		return nil, &customerrs.InvalidTypeAssertionError{
			Value: raw,
			Type:  (*proto.Message)(nil),
		}
	}

	if err := mne.ProcessResponse(msg); err != nil {
		return nil, err
	}
	return msg, nil
}

// GetIntendedPath gets a path as the intended state stored in the storage.
//
// The principal network domain (request.Pid) and the network element
// (request.Mneid) are looked up and the notifications for
// request.IntendedPath are read from the element's plugin with the
// intended-state flag set. Every failure is logged and reported as
// codes.Aborted.
func (n *NetworkElementServer) GetIntendedPath(ctx context.Context, request *mnepb.GetIntendedPathRequest) (*mnepb.GetIntendedPathResponse, error) {
	pnd, err := n.getPnd(request.Pid)
	if err != nil {
		log.Error(err)
		return nil, status.Errorf(codes.Aborted, "%v", err)
	}

	// In case we get the path from grpc-gateway we have to replace
	// the "||" separators with "/".
	intendedPath := strings.ReplaceAll(request.IntendedPath, "||", "/")

	networkElement, err := n.getMne(request.Mneid)
	if err != nil {
		log.Error(err)
		return nil, status.Errorf(codes.Aborted, "%v", err)
	}

	mne, err := fillMneBySpecificPath(networkElement, intendedPath, true)
	if err != nil {
		// Without this check mne would be nil below.
		log.Error(err)
		return nil, status.Errorf(codes.Aborted, "%v", err)
	}

	return &mnepb.GetIntendedPathResponse{
		Timestamp: time.Now().UnixNano(),
		Pnd: &ppb.PrincipalNetworkDomain{
			Id:          pnd.ID().String(),
			Name:        pnd.GetName(),
			Description: pnd.GetDescription(),
		},
		MneNotification: mne.MneNotification,
	}, nil
}

// GetChange gets a specific change of a mne.
//
// The changes identified by request.Cuid are read from the change store and
// returned together with the principal network domain given by request.Pid.
func (n *NetworkElementServer) GetChange(ctx context.Context, request *mnepb.GetChangeRequest) (*mnepb.GetChangeResponse, error) {
	labels := prometheus.Labels{"service": "mne", "rpc": "get"}
	start := metrics.StartHook(labels, grpcRequestsTotal)
	defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)

	domain, domainErr := n.getPnd(request.Pid)
	if domainErr != nil {
		log.Error(domainErr)
		return nil, status.Errorf(codes.Aborted, "%v", domainErr)
	}

	requested, changeErr := n.fillChanges(false, request.Cuid...)
	if changeErr != nil {
		log.Error(changeErr)
		return nil, status.Errorf(codes.Aborted, "%v", changeErr)
	}

	response := &mnepb.GetChangeResponse{
		Timestamp: time.Now().UnixNano(),
		Pnd: &ppb.PrincipalNetworkDomain{
			Id:          domain.ID().String(),
			Name:        domain.GetName(),
			Description: domain.GetDescription(),
		},
		Change: requested,
	}
	return response, nil
}

// GetChangeList gets all existing changes, i.e. the pending and the committed
// changes of the change store, together with the principal network domain
// given by request.Pid.
func (n *NetworkElementServer) GetChangeList(ctx context.Context, request *mnepb.GetChangeListRequest) (*mnepb.GetChangeListResponse, error) {
	labels := prometheus.Labels{"service": "mne", "rpc": "get"}
	start := metrics.StartHook(labels, grpcRequestsTotal)
	defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)

	domain, domainErr := n.getPnd(request.Pid)
	if domainErr != nil {
		log.Error(domainErr)
		return nil, status.Errorf(codes.Aborted, "%v", domainErr)
	}

	everything, changeErr := n.fillChanges(true, "")
	if changeErr != nil {
		log.Error(changeErr)
		return nil, status.Errorf(codes.Aborted, "%v", changeErr)
	}

	response := &mnepb.GetChangeListResponse{
		Timestamp: time.Now().UnixNano(),
		Pnd: &ppb.PrincipalNetworkDomain{
			Id:          domain.ID().String(),
			Name:        domain.GetName(),
			Description: domain.GetDescription(),
		},
		Change: everything,
	}
	return response, nil
}

// fillChanges converts changes from the change store into their protobuf
// representation (ID, age in microseconds and state).
//
// With all set, every pending and committed change is returned and cuid is
// ignored. Otherwise cuid must contain at least one change ID; each entry is
// parsed as a UUID and looked up in the change store.
func (n *NetworkElementServer) fillChanges(all bool, cuid ...string) ([]*mnepb.Change, error) {
	var ids []uuid.UUID

	if all {
		ids = n.changeStore.Pending()
		ids = append(ids, n.changeStore.Committed()...)
	} else {
		if len(cuid) == 0 {
			return nil, &customerrs.InvalidParametersError{
				Func:  n.fillChanges,
				Param: "length of 'mneID' cannot be '0' when 'all' is set to 'false'",
			}
		}

		parsed, err := stringArrayToUUIDs(cuid)
		if err != nil {
			log.Error(err)
			return nil, status.Errorf(codes.Aborted, "%v", err)
		}
		ids = parsed
	}

	result := make([]*mnepb.Change, 0, len(ids))
	for _, id := range ids {
		stored, err := n.changeStore.GetChange(id)
		if err != nil {
			log.Error(err)
			return nil, status.Errorf(codes.Aborted, "%v", err)
		}

		// Diff could be added here.
		result = append(result, &mnepb.Change{
			Id:    id.String(),
			Age:   stored.Age().Microseconds(),
			State: stored.State(),
		})
	}
	return result, nil
}

// stringArrayToUUIDs parses every entry of sid as a UUID and returns the
// results in the same order. The first entry that fails to parse is logged
// and aborts the conversion with codes.Aborted.
func stringArrayToUUIDs(sid []string) ([]uuid.UUID, error) {
	parsedIDs := make([]uuid.UUID, 0, len(sid))
	for _, raw := range sid {
		id, parseErr := uuid.Parse(raw)
		if parseErr != nil {
			log.Error(parseErr)
			return nil, status.Errorf(codes.Aborted, "%v", parseErr)
		}
		parsedIDs = append(parsedIDs, id)
	}
	return parsedIDs, nil
}

// SetMneList adds the list of mnes to the storage.
//
// Every entry of request.Mne is added via addMne using the plugin given by
// its plugin ID and the principal network domain given by request.Pid. The
// first failure is logged and aborts the request with codes.Aborted; on
// success one SetResponse per added element is returned.
func (n *NetworkElementServer) SetMneList(ctx context.Context, request *mnepb.SetMneListRequest) (*mnepb.SetMneListResponse, error) {
	labels := prometheus.Labels{"service": "mne", "rpc": "set"}
	start := metrics.StartHook(labels, grpcRequestsTotal)
	defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)

	// abort logs the error and converts it into the gRPC status used by this handler.
	abort := func(cause error) (*mnepb.SetMneListResponse, error) {
		log.Error(cause)
		return nil, status.Errorf(codes.Aborted, "%v", cause)
	}

	pndID, err := uuid.Parse(request.Pid)
	if err != nil {
		return abort(err)
	}

	responses := make([]*mnepb.SetResponse, 0, len(request.Mne))
	for _, entry := range request.Mne {
		pluginID, err := uuid.Parse(entry.GetPluginId())
		if err != nil {
			return abort(err)
		}

		addedID, err := n.addMne(ctx, entry.MneName, entry.TransportOption, nil, pluginID, pndID)
		if err != nil {
			return abort(err)
		}

		responses = append(responses, &mnepb.SetResponse{
			Id:     addedID.String(),
			Status: mnepb.Status_STATUS_OK,
		})
	}

	return &mnepb.SetMneListResponse{
		Timestamp: time.Now().UnixNano(),
		Status:    mnepb.Status_STATUS_OK,
		Responses: responses,
	}, nil
}

// addMne creates a network element and adds it to the network element service.
//
// The plugin is obtained through requestPluginFunc; when that is nil,
// n.pluginService.RequestPlugin is used. The first entry of
// optionalNetworkElementID, if present, is used as the element's ID, otherwise
// uuid.Nil is passed to nucleus.NewNetworkElement. The element is only added
// when its transport is valid and the root path "/" could be requested from
// it; an invalid transport is reported as codes.InvalidArgument.
func (n *NetworkElementServer) addMne(ctx context.Context, name string, opt *tpb.TransportOption, requestPluginFunc func(uuid.UUID) (plugin.Plugin, error), pluginID uuid.UUID, pndID uuid.UUID, optionalNetworkElementID ...uuid.UUID) (uuid.UUID, error) {
	// Note: cSBI not supported currently, so this is commented for now.
	// Might be needed or removed in the future.
	//
	// switch t := opt.Type; t {
	// case spb.Type_TYPE_CONTAINERISED:
	// 	return n.handleCsbiEnrolment(name, opt)
	// case spb.Type_TYPE_PLUGIN:
	// 	sbi, err = n.requestPlugin(name, opt)
	// 	if err != nil {
	// 		return uuid.Nil, err
	// 	}
	// default:
	// 	var err error
	// 	sbi, err = pnd.southboundService.Get(store.Query{ID: sid})
	// 	if err != nil {
	// 		return uuid.Nil, err
	// 	}
	// }

	elementID := uuid.Nil
	if len(optionalNetworkElementID) > 0 {
		elementID = optionalNetworkElementID[0]
	}

	requestPlugin := requestPluginFunc
	if requestPlugin == nil {
		requestPlugin = n.pluginService.RequestPlugin
	}

	plg, err := requestPlugin(pluginID)
	if err != nil {
		return uuid.Nil, err
	}

	mne, err := nucleus.NewNetworkElement(name, elementID, opt, pndID, plg, conflict.Metadata{ResourceVersion: 0})
	if err != nil {
		return uuid.Nil, err
	}

	if !mne.IsTransportValid() {
		return uuid.Nil, status.Errorf(codes.InvalidArgument, "invalid transport data provided")
	}

	// Request the root path before the element is stored.
	if _, err = n.getPath(ctx, mne, "/"); err != nil {
		return uuid.Nil, err
	}

	if err = n.mneService.Add(mne); err != nil {
		return uuid.Nil, err
	}

	return mne.ID(), nil
}

// SetChangeList sets a list of changes.
//
// Each entry of request.Change is either committed or confirmed, depending on
// its operation. An unparsable change ID or a failing operation is logged and
// reported as codes.Aborted; any other operation yields an
// InvalidParametersError.
func (n *NetworkElementServer) SetChangeList(ctx context.Context, request *mnepb.SetChangeListRequest) (*mnepb.SetChangeListResponse, error) {
	labels := prometheus.Labels{"service": "mne", "rpc": "set"}
	start := metrics.StartHook(labels, grpcRequestsTotal)
	defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)

	results := make([]*mnepb.SetResponse, 0, len(request.Change))

	for _, entry := range request.Change {
		changeID, parseErr := uuid.Parse(entry.Cuid)
		if parseErr != nil {
			log.Error(parseErr)
			return nil, status.Errorf(codes.Aborted, "%v", parseErr)
		}

		var opErr error
		switch entry.Op {
		case mnepb.Operation_OPERATION_COMMIT:
			opErr = n.Commit(changeID)
		case mnepb.Operation_OPERATION_CONFIRM:
			opErr = n.Confirm(changeID)
		default:
			return nil, &customerrs.InvalidParametersError{
				Param: entry.Op,
			}
		}
		if opErr != nil {
			log.Error(opErr)
			return nil, status.Errorf(codes.Aborted, "%v", opErr)
		}

		results = append(results, &mnepb.SetResponse{
			Id:     changeID.String(),
			Status: mnepb.Status_STATUS_OK,
		})
	}

	response := &mnepb.SetChangeListResponse{
		Timestamp: time.Now().UnixNano(),
		Status:    mnepb.Status_STATUS_OK,
		Responses: results,
	}
	return response, nil
}

// Commit calls commit on the pending change with ID.
//
// After the change itself has been committed, the difference between its
// previous and its intended state is applied to the model of the associated
// network element through the element's plugin (updates first, deletes
// second), and the element is updated in the network element service.
func (n *NetworkElementServer) Commit(u uuid.UUID) error {
	pending, err := n.changeStore.GetChange(u)
	if err != nil {
		return err
	}

	err = pending.Commit()
	if err != nil {
		return err
	}

	// Set all the changes within the network elements model
	element, err := n.mneService.Get(store.Query{ID: pending.AssociatedDeviceID()})
	if err != nil {
		return err
	}

	diff, err := element.GetPlugin().Diff(pending.PreviousState(), pending.IntendedState())
	if err != nil {
		return err
	}

	for _, upd := range diff.GetUpdate() {
		err = element.GetPlugin().SetNode(upd.GetPath(), upd.GetVal())
		if err != nil {
			return err
		}
	}

	for _, obsolete := range diff.GetDelete() {
		err = element.GetPlugin().DeleteNode(obsolete)
		if err != nil {
			return err
		}
	}

	// update the network element
	return n.mneService.Update(element)
}

// Confirm calls confirm on the pending change with ID. A change that cannot
// be read from the change store yields the store's error.
func (n *NetworkElementServer) Confirm(u uuid.UUID) error {
	pending, lookupErr := n.changeStore.GetChange(u)
	if lookupErr != nil {
		return lookupErr
	}

	return pending.Confirm()
}

// SetPathList sets a list of paths.
//
// For every entry of request.ChangeRequest a change is created via ChangeMNE;
// the response carries the ID of each created change. The first failure is
// logged and aborts the request with codes.Aborted.
func (n *NetworkElementServer) SetPathList(ctx context.Context, request *mnepb.SetPathListRequest) (*mnepb.SetPathListResponse, error) {
	labels := prometheus.Labels{"service": "mne", "rpc": "set"}
	start := metrics.StartHook(labels, grpcRequestsTotal)
	defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)

	results := make([]*mnepb.SetResponse, 0, len(request.ChangeRequest))

	for _, entry := range request.ChangeRequest {
		elementID, parseErr := uuid.Parse(entry.Mneid)
		if parseErr != nil {
			log.Error(parseErr)
			return nil, status.Errorf(codes.Aborted, "%v", parseErr)
		}

		changeID, changeErr := n.ChangeMNE(elementID, entry.ApiOp, entry.Path, entry.Value)
		if changeErr != nil {
			log.Error(changeErr)
			return nil, status.Errorf(codes.Aborted, "%v", changeErr)
		}

		results = append(results, &mnepb.SetResponse{
			Status: mnepb.Status_STATUS_OK,
			Id:     changeID.String(),
		})
	}

	response := &mnepb.SetPathListResponse{
		Timestamp: time.Now().UnixNano(),
		Status:    mnepb.Status_STATUS_OK,
		Responses: results,
	}
	return response, nil
}

// ChangeMNE creates a change from the provided Operation, path and value.
// The Change is Pending and times out after the specified timeout period.
func (n *NetworkElementServer) ChangeMNE(duid uuid.UUID, operation mnepb.ApiOperation, path string, value ...string) (uuid.UUID, error) {
	mne, err := n.mneService.Get(store.Query{
		ID: duid,
	})
	if err != nil {
		return uuid.Nil, err
	}

	p, err := ygot.StringToStructuredPath(path)
	if err != nil {
		return uuid.Nil, err
	}

	plugin := mne.GetPlugin()

	validatedChangeModel, err := plugin.ValidateChange(operation, p, []byte(value[0]))
	if err != nil {
		return uuid.Nil, err
	}

	filteredMarshaledModel, err := plugin.PruneConfigFalse(validatedChangeModel)
	if err != nil {
		return uuid.Nil, err
	}

	if operation != mnepb.ApiOperation_API_OPERATION_DELETE && len(value) != 1 {
		return uuid.Nil, &customerrs.InvalidParametersError{
			Func:  n.ChangeMNE,
			Param: value,
		}
	}

	currentModel, err := mne.GetModelAsFilteredCopy()
	if err != nil {
		return uuid.Nil, err
	diff, err := plugin.Diff(currentModel, filteredMarshaledModel)
	if err != nil {
		return uuid.Nil, err
	}

	if util.IsGNMINotificationEmpty(diff) {
		return uuid.Nil, customerrs.NoNewChangesError{Original: string(currentModel), Modified: string(filteredMarshaledModel)}
	}

	callback := func(original, modified []byte) error {
		ctx := context.WithValue(context.Background(), types.CtxKeyOperation, operation) // nolint
		payload := change.Payload{Original: original, Modified: modified, Diff: diff}
		return mne.Transport().Set(ctx, payload)
	}

	ch := nucleus.NewChange(duid, currentModel, filteredMarshaledModel, diff, callback)

	if err := n.changeStore.Add(ch); err != nil {
		return uuid.Nil, err
	}

	return ch.ID(), nil
}

// DeleteMne deletes a mne identified by request.Mneid. An unparsable ID or a
// failing deletion is logged and reported as codes.Aborted.
func (n *NetworkElementServer) DeleteMne(ctx context.Context, request *mnepb.DeleteMneRequest) (*mnepb.DeleteMneResponse, error) {
	elementID, parseErr := uuid.Parse(request.Mneid)
	if parseErr != nil {
		log.Error(parseErr)
		return nil, status.Errorf(codes.Aborted, "%v", parseErr)
	}

	deleteErr := n.deleteMne(elementID)
	if deleteErr != nil {
		log.Error(deleteErr)
		return nil, status.Errorf(codes.Aborted, "%v", deleteErr)
	}

	response := &mnepb.DeleteMneResponse{
		Timestamp: time.Now().UnixNano(),
		Status:    mnepb.Status_STATUS_OK,
	}
	return response, nil
}

// deleteMne looks up the network element with the given id (the query carries
// the id both as ID and, in string form, as name) and deletes it through the
// network element service.
func (n *NetworkElementServer) deleteMne(id uuid.UUID) error {
	query := store.Query{
		ID:   id,
		Name: id.String(),
	}

	target, err := n.mneService.Get(query)
	if err != nil {
		return err
	}

	// Note: cSBI not supported currently, so this is commented for now.
	// Might be needed or removed in the future.
	//
	// switch mne.(type) {
	// case *CsbiNetworkElement:
	// 	return pnd.handleCsbiDeletion(mne)
	// default:
	// 	return pnd.networkElementService.Delete(mne)
	// }

	return n.mneService.Delete(target)
}

// SubscribePath subscribes to specifc paths of an mne.
func (n *NetworkElementServer) SubscribePath(request *mnepb.SubscribePathRequest, stream mnepb.NetworkElementService_SubscribePathServer) error {
	networkElement, err := n.getMne(request.Mneid)
		log.Error(err)
		return status.Errorf(codes.Aborted, "%v", err)
	if err := n.subscribePath(networkElement, request.Sublist); err != nil {
func (n *NetworkElementServer) subscribePath(networkElement networkelement.NetworkElement, subList *mnepb.SubscriptionList) error {
	// subscribePath calls Subscribe on the element's transport once per
	// subscription of subList. The subscription options are handed over
	// through the context under types.CtxKeyOpts.
	subscriptionMode, err := n.mapModeToAristaFork(subList.GetMode())
	if err != nil {
		return err
	}

	for _, subscription := range subList.Subscription {
		streamMode, err := n.mapStreamModeToAristaFork(subscription.GetStreamMode())
		if err != nil {
			return err
		}

		pathElements := n.splitStringPath(subscription.GetPath())
		options := &aGNMI.SubscribeOptions{
			Mode:           subscriptionMode,
			StreamMode:     streamMode,
			Paths:          [][]string{pathElements},
			SampleInterval: subscription.SampleInterval,
		}

		ctx := context.WithValue(context.Background(), types.CtxKeyOpts, options)

		if err := networkElement.Transport().Subscribe(ctx); err != nil {
			return err
		}
	}

	return nil
}

// splitStringPath splits s at every "/" and returns the resulting elements.
func (n *NetworkElementServer) splitStringPath(s string) []string {
	const separator = "/"

	elements := strings.Split(s, separator)
	return elements
}

// mapStreamModeToAristaFork translates a StreamMode into the string
// representation placed into aGNMI.SubscribeOptions.StreamMode.
//
// Supported modes are target defined, on change and sample; any other mode
// yields an error that names the rejected mode.
func (n *NetworkElementServer) mapStreamModeToAristaFork(mode mnepb.StreamMode) (string, error) {
	switch mode {
	case mnepb.StreamMode_STREAM_MODE_TARGET_DEFINED:
		return "target_defined", nil
	case mnepb.StreamMode_STREAM_MODE_ON_CHANGE:
		return "on_change", nil
	case mnepb.StreamMode_STREAM_MODE_SAMPLE:
		return "sample", nil
	default:
		// %v prints the mode itself; %T would only print its (constant) type.
		return "", fmt.Errorf("StreamMode of type: %v is not supported", mode)
	}
}

// mapModeToAristaFork translates a SubscriptionMode into the string
// representation placed into aGNMI.SubscribeOptions.Mode.
//
// Supported modes are stream, once and poll; any other mode yields an error
// that names the rejected mode.
func (n *NetworkElementServer) mapModeToAristaFork(mode mnepb.SubscriptionMode) (string, error) {
	switch mode {
	case mnepb.SubscriptionMode_SUBSCRIPTION_MODE_STREAM:
		return "stream", nil
	case mnepb.SubscriptionMode_SUBSCRIPTION_MODE_ONCE:
		return "once", nil
	case mnepb.SubscriptionMode_SUBSCRIPTION_MODE_POLL:
		return "poll", nil
	default:
		// %v prints the mode itself; %T would only print its (constant) type.
		return "", fmt.Errorf("SubscriptionMode of type: %v is not supported", mode)
	}
}