Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
pnd.go 18.74 KiB
package server

import (
	"context"
	"fmt"
	"strings"
	"time"

	ppb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/pnd"
	spb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/southbound"
	"code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkdomain"
	"code.fbi.h-da.de/danet/gosdn/controller/metrics"
	"code.fbi.h-da.de/danet/gosdn/controller/nucleus"
	"code.fbi.h-da.de/danet/gosdn/controller/nucleus/errors"
	"code.fbi.h-da.de/danet/gosdn/controller/store"
	"github.com/google/uuid"
	"github.com/openconfig/gnmi/proto/gnmi"
	"github.com/openconfig/ygot/ygot"
	"github.com/openconfig/ygot/ytypes"
	"github.com/prometheus/client_golang/prometheus"
	log "github.com/sirupsen/logrus"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// PndServer implements the gRPC PndService on top of a store of principal
// network domains (PNDs).
type PndServer struct {
	// Embedded generated type; presumably supplies the default
	// "unimplemented" handlers for RPCs not defined on PndServer.
	ppb.UnimplementedPndServiceServer
	// pndStore is queried by every handler to resolve the PND named in the
	// request.
	pndStore networkdomain.PndStore
}

// NewPndServer creates a PndServer whose handlers resolve principal network
// domains through the given pndStore.
func NewPndServer(pndStore networkdomain.PndStore) *PndServer {
	srv := new(PndServer)
	srv.pndStore = pndStore
	return srv
}

// GetOnd returns the orchestrated networking device request.Did of the PND
// request.Pid, together with a short description of that PND.
//
// The device is reported for the root path "/". A Pid that is not a valid
// UUID is passed to handleRPCError; every later failure is logged and
// returned as a gRPC status error with codes.Aborted.
func (p PndServer) GetOnd(ctx context.Context, request *ppb.GetOndRequest) (*ppb.GetOndResponse, error) {
	labels := prometheus.Labels{"service": "pnd", "rpc": "get"}
	start := metrics.StartHook(labels, grpcRequestsTotal)
	defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)

	pndID, err := uuid.Parse(request.Pid)
	if err != nil {
		return nil, handleRPCError(labels, err)
	}

	domain, err := p.pndStore.Get(store.Query{ID: pndID})
	if err != nil {
		log.Error(err)
		return nil, status.Errorf(codes.Aborted, "%v", err)
	}

	device, err := fillOndBySpecificPath(domain, request.Did, "/")
	if err != nil {
		log.Error(err)
		return nil, status.Errorf(codes.Aborted, "%v", err)
	}

	now := time.Now().UnixNano()
	domainInfo := &ppb.PrincipalNetworkDomain{
		Id:          domain.ID().String(),
		Name:        domain.GetName(),
		Description: domain.GetDescription(),
	}

	response := &ppb.GetOndResponse{
		Timestamp: now,
		Pnd:       domainInfo,
		Ond:       device,
	}
	return response, nil
}

// GetOndList returns all orchestrated networking devices of the PND
// request.Pid, together with a short description of that PND.
//
// Every device is reported for the root path "/". A Pid that is not a valid
// UUID is passed to handleRPCError; every later failure is logged and
// returned as a gRPC status error with codes.Aborted. If a single device
// cannot be converted, the whole call fails.
func (p PndServer) GetOndList(ctx context.Context, request *ppb.GetOndListRequest) (*ppb.GetOndListResponse, error) {
	labels := prometheus.Labels{"service": "pnd", "rpc": "get"}
	start := metrics.StartHook(labels, grpcRequestsTotal)
	defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)
	pid, err := uuid.Parse(request.Pid)
	if err != nil {
		return nil, handleRPCError(labels, err)
	}

	pnd, err := p.pndStore.Get(store.Query{ID: pid})
	if err != nil {
		log.Error(err)
		return nil, status.Errorf(codes.Aborted, "%v", err)
	}

	// Take one snapshot of the device list. Asking the PND twice (once for
	// the length, once for the iteration) could yield different results if
	// devices are added or removed in between, leading to an out-of-range
	// write or nil entries in the response.
	devices := pnd.Devices()
	onds := make([]*ppb.OrchestratedNetworkingDevice, 0, len(devices))
	for _, device := range devices {
		ond, err := fillOndBySpecificPath(pnd, device.ID().String(), "/")
		if err != nil {
			log.Error(err)
			return nil, status.Errorf(codes.Aborted, "%v", err)
		}
		onds = append(onds, ond)
	}

	return &ppb.GetOndListResponse{
		Timestamp: time.Now().UnixNano(),
		Pnd: &ppb.PrincipalNetworkDomain{
			Id:          pnd.ID().String(),
			Name:        pnd.GetName(),
			Description: pnd.GetDescription(),
		},
		Ond: onds,
	}, nil
}

// fillOndBySpecificPath builds the protobuf representation of device did of
// pnd, restricted to the part of the device model selected by path.
//
// path is parsed with ygot.StringToStructuredPath and resolved against the
// device model with wildcard and partial-key matching enabled. Every matching
// node becomes one gNMI notification in the Device field of the result.
//
// All failures are logged and returned as gRPC status errors with
// codes.Aborted. This includes a device without a southbound interface,
// because the interface's schema is needed to resolve the path.
func fillOndBySpecificPath(pnd networkdomain.NetworkDomain, did string, path string) (*ppb.OrchestratedNetworkingDevice, error) {
	d, err := pnd.GetDevice(did)
	if err != nil {
		log.Error(err)
		return nil, status.Errorf(codes.Aborted, "%v", err)
	}

	gnmiPath, err := ygot.StringToStructuredPath(path)
	if err != nil {
		log.Error(err)
		return nil, status.Errorf(codes.Aborted, "%v", err)
	}

	// The SBI supplies the schema for the lookup below. Check it before the
	// first use so that a device without an SBI yields an error instead of a
	// nil dereference.
	sbi := d.SBI()
	if sbi == nil {
		err := status.Errorf(codes.Aborted, "device %v has no southbound interface", did)
		log.Error(err)
		return nil, err
	}

	opts := []ytypes.GetNodeOpt{
		&ytypes.GetHandleWildcards{},
		&ytypes.GetPartialKeyMatch{},
	}
	nodes, err := ytypes.GetNode(sbi.Schema().RootSchema(), d.GetModel(), gnmiPath, opts...)
	if err != nil {
		log.Error(err)
		return nil, status.Errorf(codes.Aborted, "%v", err)
	}

	devices := make([]*gnmi.Notification, len(nodes))
	for i, node := range nodes {
		dev, err := genGnmiNotification(gnmiPath, node.Data)
		if err != nil {
			log.Error(err)
			return nil, status.Errorf(codes.Aborted, "%v", err)
		}

		devices[i] = dev
	}

	ond := &ppb.OrchestratedNetworkingDevice{
		Id:     d.ID().String(),
		Name:   d.Name(),
		Device: devices,
		Sbi: &spb.SouthboundInterface{
			Id:   sbi.ID().String(),
			Type: sbi.Type(),
		},
	}

	return ond, nil
}

// genGnmiNotification wraps val in a gNMI notification that carries a single
// update. The value is encoded as JSON_IETF and the update's path is a new
// gnmi.Path holding the elements of path. The notification is stamped with
// the current time in nanoseconds. An encoding failure is returned unchanged.
func genGnmiNotification(path *gnmi.Path, val any) (*gnmi.Notification, error) {
	encoded, err := ygot.EncodeTypedValue(val, gnmi.Encoding_JSON_IETF)
	if err != nil {
		return nil, err
	}

	notification := &gnmi.Notification{Timestamp: time.Now().UnixNano()}
	update := &gnmi.Update{
		Path: &gnmi.Path{Elem: path.GetElem()},
		Val:  encoded,
	}
	notification.Update = []*gnmi.Update{update}

	return notification, nil
}

// GetSbi returns the southbound interface request.Sid of the PND request.Pid,
// together with a short description of that PND.
//
// A Pid or Sid that is not a valid UUID is passed to handleRPCError. A failed
// PND or SBI lookup is logged and returned as a gRPC status error with
// codes.Aborted.
func (p PndServer) GetSbi(ctx context.Context, request *ppb.GetSbiRequest) (*ppb.GetSbiResponse, error) {
	labels := prometheus.Labels{"service": "pnd", "rpc": "get"}
	start := metrics.StartHook(labels, grpcRequestsTotal)
	defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)
	pid, err := uuid.Parse(request.Pid)
	if err != nil {
		return nil, handleRPCError(labels, err)
	}

	pnd, err := p.pndStore.Get(store.Query{ID: pid})
	if err != nil {
		log.Error(err)
		return nil, status.Errorf(codes.Aborted, "%v", err)
	}

	sbiID, err := uuid.Parse(request.Sid)
	if err != nil {
		// A malformed SBI id is the same kind of failure as a malformed PND
		// id above, so it takes the same error path instead of leaking the
		// raw parse error to the client.
		return nil, handleRPCError(labels, err)
	}

	sbi, err := pnd.GetSBI(sbiID)
	if err != nil {
		log.Error(err)
		return nil, status.Errorf(codes.Aborted, "%v", err)
	}
	return &ppb.GetSbiResponse{
		Timestamp: time.Now().UnixNano(),
		Pnd: &ppb.PrincipalNetworkDomain{
			Id:          pnd.ID().String(),
			Name:        pnd.GetName(),
			Description: pnd.GetDescription(),
		},
		Sbi: &spb.SouthboundInterface{
			Id:   sbiID.String(),
			Type: sbi.Type(),
		},
	}, nil
}

// GetSbiList returns all southbound interfaces of the PND request.Pid,
// together with a short description of that PND.
//
// A Pid that is not a valid UUID is passed to handleRPCError; every later
// failure is logged and returned as a gRPC status error with codes.Aborted.
func (p PndServer) GetSbiList(ctx context.Context, request *ppb.GetSbiListRequest) (*ppb.GetSbiListResponse, error) {
	labels := prometheus.Labels{"service": "pnd", "rpc": "get"}
	start := metrics.StartHook(labels, grpcRequestsTotal)
	defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)

	pndID, err := uuid.Parse(request.Pid)
	if err != nil {
		return nil, handleRPCError(labels, err)
	}

	domain, err := p.pndStore.Get(store.Query{ID: pndID})
	if err != nil {
		log.Error(err)
		return nil, status.Errorf(codes.Aborted, "%v", err)
	}

	interfaces, err := fillSbis(domain)
	if err != nil {
		log.Error(err)
		return nil, status.Errorf(codes.Aborted, "%v", err)
	}

	now := time.Now().UnixNano()
	domainInfo := &ppb.PrincipalNetworkDomain{
		Id:          domain.ID().String(),
		Name:        domain.GetName(),
		Description: domain.GetDescription(),
	}

	response := &ppb.GetSbiListResponse{
		Timestamp: now,
		Pnd:       domainInfo,
		Sbi:       interfaces,
	}
	return response, nil
}

// fillSbis converts every southbound interface of pnd into its protobuf
// representation, carrying the interface's id and type.
//
// The result is never nil; a PND without SBIs yields an empty slice. A
// failure to list the SBIs is returned wrapped with context.
func fillSbis(pnd networkdomain.NetworkDomain) ([]*spb.SouthboundInterface, error) {
	sbis, err := pnd.GetSBIs()
	if err != nil {
		return nil, fmt.Errorf("getting SBIs: %w", err)
	}

	// Diagnostic output belongs in the debug log, not on stdout.
	log.Debugf("SBIs found: %+v", sbis)

	sbisToReturn := make([]*spb.SouthboundInterface, 0, len(sbis))
	for _, sbi := range sbis {
		sbisToReturn = append(sbisToReturn, &spb.SouthboundInterface{
			Id: sbi.ID().String(),
			// Report the type as well, as the other SBI responses in this
			// file do.
			Type: sbi.Type(),
		})
	}

	log.Debugf("SBIs returned: %+v", sbisToReturn)

	return sbisToReturn, nil
}

// stringArrayToUUIDs parses every element of sid as a UUID and returns the
// results in input order. The first element that fails to parse is logged
// and aborts the conversion with a gRPC status error (codes.Aborted).
func stringArrayToUUIDs(sid []string) ([]uuid.UUID, error) {
	parsedIDs := make([]uuid.UUID, 0, len(sid))
	for _, raw := range sid {
		id, err := uuid.Parse(raw)
		if err != nil {
			log.Error(err)
			return nil, status.Errorf(codes.Aborted, "%v", err)
		}
		parsedIDs = append(parsedIDs, id)
	}
	return parsedIDs, nil
}

// GetPath requests request.Path for device request.Did of the PND
// request.Pid and returns the notifications found for that path in the
// device model, together with a short description of the PND.
//
// A Pid that is not a valid UUID is passed to handleRPCError; every later
// failure is logged and returned as a gRPC status error with codes.Aborted.
func (p PndServer) GetPath(ctx context.Context, request *ppb.GetPathRequest) (*ppb.GetPathResponse, error) {
	labels := prometheus.Labels{"service": "pnd", "rpc": "get"}
	start := metrics.StartHook(labels, grpcRequestsTotal)
	defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)

	pndID, err := uuid.Parse(request.Pid)
	if err != nil {
		return nil, handleRPCError(labels, err)
	}

	domain, err := p.pndStore.Get(store.Query{ID: pndID})
	if err != nil {
		log.Error(err)
		return nil, status.Errorf(codes.Aborted, "%v", err)
	}

	deviceID, err := uuid.Parse(request.Did)
	if err != nil {
		log.Error(err)
		return nil, status.Errorf(codes.Aborted, "%v", err)
	}

	// Paths that arrive through grpc-gateway use "||" in place of "/";
	// translate them back before using the path.
	requestedPath := strings.ReplaceAll(request.Path, "||", "/")

	// Only the error of the request is of interest here; the response is
	// built from the device model below.
	if _, err = domain.Request(deviceID, requestedPath); err != nil {
		log.Error(err)
		return nil, status.Errorf(codes.Aborted, "%v", err)
	}

	device, err := fillOndBySpecificPath(domain, request.Did, requestedPath)
	if err != nil {
		log.Error(err)
		return nil, status.Errorf(codes.Aborted, "%v", err)
	}

	now := time.Now().UnixNano()
	domainInfo := &ppb.PrincipalNetworkDomain{
		Id:          domain.ID().String(),
		Name:        domain.GetName(),
		Description: domain.GetDescription(),
	}

	response := &ppb.GetPathResponse{
		Timestamp: now,
		Pnd:       domainInfo,
		Device:    device.Device,
	}
	return response, nil
}

// GetChange returns the changes identified by request.Cuid for the PND
// request.Pid, together with a short description of that PND.
//
// A Pid that is not a valid UUID is passed to handleRPCError; every later
// failure is logged and returned as a gRPC status error with codes.Aborted.
func (p PndServer) GetChange(ctx context.Context, request *ppb.GetChangeRequest) (*ppb.GetChangeResponse, error) {
	labels := prometheus.Labels{"service": "pnd", "rpc": "get"}
	start := metrics.StartHook(labels, grpcRequestsTotal)
	defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)

	pndID, err := uuid.Parse(request.Pid)
	if err != nil {
		return nil, handleRPCError(labels, err)
	}

	domain, err := p.pndStore.Get(store.Query{ID: pndID})
	if err != nil {
		log.Error(err)
		return nil, status.Errorf(codes.Aborted, "%v", err)
	}

	selected, err := fillChanges(domain, false, request.Cuid...)
	if err != nil {
		log.Error(err)
		return nil, status.Errorf(codes.Aborted, "%v", err)
	}

	now := time.Now().UnixNano()
	domainInfo := &ppb.PrincipalNetworkDomain{
		Id:          domain.ID().String(),
		Name:        domain.GetName(),
		Description: domain.GetDescription(),
	}

	response := &ppb.GetChangeResponse{
		Timestamp: now,
		Pnd:       domainInfo,
		Change:    selected,
	}
	return response, nil
}

// GetChangeList returns all changes known to the PND request.Pid, together
// with a short description of that PND.
//
// A Pid that is not a valid UUID is passed to handleRPCError; every later
// failure is logged and returned as a gRPC status error with codes.Aborted.
func (p PndServer) GetChangeList(ctx context.Context, request *ppb.GetChangeListRequest) (*ppb.GetChangeListResponse, error) {
	labels := prometheus.Labels{"service": "pnd", "rpc": "get"}
	start := metrics.StartHook(labels, grpcRequestsTotal)
	defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)

	pndID, err := uuid.Parse(request.Pid)
	if err != nil {
		return nil, handleRPCError(labels, err)
	}

	domain, err := p.pndStore.Get(store.Query{ID: pndID})
	if err != nil {
		log.Error(err)
		return nil, status.Errorf(codes.Aborted, "%v", err)
	}

	allChanges, err := fillChanges(domain, true, "")
	if err != nil {
		log.Error(err)
		return nil, status.Errorf(codes.Aborted, "%v", err)
	}

	now := time.Now().UnixNano()
	domainInfo := &ppb.PrincipalNetworkDomain{
		Id:          domain.ID().String(),
		Name:        domain.GetName(),
		Description: domain.GetDescription(),
	}

	response := &ppb.GetChangeListResponse{
		Timestamp: now,
		Pnd:       domainInfo,
		Change:    allChanges,
	}
	return response, nil
}

// fillChanges converts changes of pnd into their protobuf representation.
//
// If all is true, every pending change followed by every committed change is
// returned and cuid is ignored. Otherwise cuid must contain at least one
// change id, and exactly the listed changes are returned in that order.
//
// Each returned change carries its id, age in microseconds, state, and the
// ygot diff between its previous and intended state. An empty cuid with
// all == false yields *errors.ErrInvalidParameters; all other failures are
// gRPC status errors with codes.Aborted.
func fillChanges(pnd networkdomain.NetworkDomain, all bool, cuid ...string) ([]*ppb.Change, error) {
	var changeList []uuid.UUID

	if all {
		// Build a fresh slice instead of appending to the one returned by
		// PendingChanges, so that the append can never write into storage
		// owned by the PND.
		pending := pnd.PendingChanges()
		committed := pnd.CommittedChanges()
		changeList = make([]uuid.UUID, 0, len(pending)+len(committed))
		changeList = append(changeList, pending...)
		changeList = append(changeList, committed...)
	} else {
		if len(cuid) == 0 {
			return nil, &errors.ErrInvalidParameters{
				Func:  fillChanges,
				Param: "length of 'cuid' cannot be '0' when 'all' is set to 'false'",
			}
		}
		var err error
		changeList, err = stringArrayToUUIDs(cuid)
		if err != nil {
			// stringArrayToUUIDs has already logged the failure and turned
			// it into a status error; pass it on unchanged.
			return nil, err
		}
	}

	changes := make([]*ppb.Change, len(changeList))
	for i, ch := range changeList {
		c, err := pnd.GetChange(ch)
		if err != nil {
			log.Error(err)
			return nil, status.Errorf(codes.Aborted, "%v", err)
		}
		diff, err := ygot.Diff(c.PreviousState(), c.IntendedState())
		if err != nil {
			log.Error(err)
			return nil, status.Errorf(codes.Aborted, "%v", err)
		}

		changes[i] = &ppb.Change{
			Id:    ch.String(),
			Age:   c.Age().Microseconds(),
			State: c.State(),
			Diff:  diff,
		}
	}
	return changes, nil
}

// SetOndList adds every device listed in request.Ond to the PND request.Pid
// and returns one SetResponse per added device, carrying the id of the new
// device.
//
// Devices are added in request order. The first failure aborts the call with
// an error; this function does not remove devices added before the failure.
// A malformed Pid or an unknown PND is passed to handleRPCError. A missing or
// malformed SBI id and a failure to add a device are logged and returned as
// gRPC status errors with codes.Aborted.
func (p PndServer) SetOndList(ctx context.Context, request *ppb.SetOndListRequest) (*ppb.SetOndListResponse, error) {
	labels := prometheus.Labels{"service": "pnd", "rpc": "set"}
	start := metrics.StartHook(labels, grpcRequestsTotal)
	defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)
	pid, err := uuid.Parse(request.Pid)
	if err != nil {
		return nil, handleRPCError(labels, err)
	}

	pnd, err := p.pndStore.Get(store.Query{ID: pid})
	if err != nil {
		return nil, handleRPCError(labels, err)
	}

	responses := make([]*ppb.SetResponse, 0, len(request.Ond))
	for _, r := range request.Ond {
		// The sbi message is client-supplied and may be absent; reject the
		// request instead of dereferencing a nil pointer.
		if r.Sbi == nil {
			err := status.Errorf(codes.Aborted, "ond %v: missing sbi", r.DeviceName)
			log.Error(err)
			return nil, err
		}
		sid, err := uuid.Parse(r.Sbi.Id)
		if err != nil {
			log.Error(err)
			return nil, status.Errorf(codes.Aborted, "%v", err)
		}
		did, err := pnd.AddDevice(r.DeviceName, r.TransportOption, sid)
		if err != nil {
			log.Error(err)
			return nil, status.Errorf(codes.Aborted, "%v", err)
		}
		responses = append(responses, &ppb.SetResponse{Id: did.String(), Status: ppb.Status_STATUS_OK})
	}

	return &ppb.SetOndListResponse{
		Timestamp: time.Now().UnixNano(),
		Status:    ppb.Status_STATUS_OK,
		Responses: responses,
	}, nil
}

// SetChangeList commits or confirms every change listed in request.Change on
// the PND request.Pid and returns one SetResponse per processed change.
//
// Changes are processed in request order and the first failure aborts the
// call. A malformed Pid or an unknown PND is passed to handleRPCError. A
// malformed change id and a failed commit or confirm are logged and returned
// as gRPC status errors with codes.Aborted. An operation other than commit
// or confirm yields *errors.ErrInvalidParameters.
func (p PndServer) SetChangeList(ctx context.Context, request *ppb.SetChangeListRequest) (*ppb.SetChangeListResponse, error) {
	labels := prometheus.Labels{"service": "pnd", "rpc": "set"}
	start := metrics.StartHook(labels, grpcRequestsTotal)
	defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)

	pndID, err := uuid.Parse(request.Pid)
	if err != nil {
		return nil, handleRPCError(labels, err)
	}

	domain, err := p.pndStore.Get(store.Query{ID: pndID})
	if err != nil {
		return nil, handleRPCError(labels, err)
	}

	results := make([]*ppb.SetResponse, len(request.Change))
	for idx, item := range request.Change {
		changeID, err := uuid.Parse(item.Cuid)
		if err != nil {
			log.Error(err)
			return nil, status.Errorf(codes.Aborted, "%v", err)
		}

		// Run the requested operation; its error is handled once below.
		var opErr error
		switch item.Op {
		case ppb.Operation_OPERATION_COMMIT:
			opErr = domain.Commit(changeID)
		case ppb.Operation_OPERATION_CONFIRM:
			opErr = domain.Confirm(changeID)
		default:
			return nil, &errors.ErrInvalidParameters{
				Param: item.Op,
			}
		}
		if opErr != nil {
			log.Error(opErr)
			return nil, status.Errorf(codes.Aborted, "%v", opErr)
		}

		results[idx] = &ppb.SetResponse{
			Id:     changeID.String(),
			Status: ppb.Status_STATUS_OK,
		}
	}

	response := &ppb.SetChangeListResponse{
		Timestamp: time.Now().UnixNano(),
		Status:    ppb.Status_STATUS_OK,
		Responses: results,
	}
	return response, nil
}

// SetPathList applies every entry of request.ChangeRequest to its device on
// the PND request.Pid via ChangeOND and returns one SetResponse per entry,
// carrying the id returned by ChangeOND.
//
// Entries are processed in request order and the first failure aborts the
// call. A malformed Pid or an unknown PND is passed to handleRPCError. A
// malformed device id and a failed ChangeOND call are logged and returned as
// gRPC status errors with codes.Aborted.
func (p PndServer) SetPathList(ctx context.Context, request *ppb.SetPathListRequest) (*ppb.SetPathListResponse, error) {
	labels := prometheus.Labels{"service": "pnd", "rpc": "set"}
	start := metrics.StartHook(labels, grpcRequestsTotal)
	defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)

	pndID, err := uuid.Parse(request.Pid)
	if err != nil {
		return nil, handleRPCError(labels, err)
	}

	domain, err := p.pndStore.Get(store.Query{ID: pndID})
	if err != nil {
		return nil, handleRPCError(labels, err)
	}

	results := make([]*ppb.SetResponse, len(request.ChangeRequest))
	for idx, item := range request.ChangeRequest {
		deviceID, err := uuid.Parse(item.Did)
		if err != nil {
			log.Error(err)
			return nil, status.Errorf(codes.Aborted, "%v", err)
		}

		changeID, err := domain.ChangeOND(deviceID, item.ApiOp, item.Path, item.Value)
		if err != nil {
			log.Error(err)
			return nil, status.Errorf(codes.Aborted, "%v", err)
		}

		entry := &ppb.SetResponse{Status: ppb.Status_STATUS_OK}
		entry.Id = changeID.String()
		results[idx] = entry
	}

	response := &ppb.SetPathListResponse{
		Timestamp: time.Now().UnixNano(),
		Status:    ppb.Status_STATUS_OK,
		Responses: results,
	}
	return response, nil
}

// SetSbiList creates one southbound interface per entry of request.Sbi and
// adds it to the PND request.Pid.
//
// Every failure (malformed Pid, unknown PND, SBI creation, adding the SBI) is
// passed to handleRPCError and aborts the call. On success the response
// contains a single SetResponse with status OK, regardless of how many SBIs
// were added.
func (p PndServer) SetSbiList(ctx context.Context, request *ppb.SetSbiListRequest) (*ppb.SetSbiListResponse, error) {
	labels := prometheus.Labels{"service": "pnd", "rpc": "set"}
	start := metrics.StartHook(labels, grpcRequestsTotal)
	defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)

	pndID, err := uuid.Parse(request.Pid)
	if err != nil {
		return nil, handleRPCError(labels, err)
	}

	domain, err := p.pndStore.Get(store.Query{ID: pndID})
	if err != nil {
		return nil, handleRPCError(labels, err)
	}

	for _, entry := range request.Sbi {
		created, err := nucleus.NewSBI(filterSbiType(entry.SbiType))
		if err != nil {
			return nil, handleRPCError(labels, err)
		}

		if err := domain.AddSbi(created); err != nil {
			return nil, handleRPCError(labels, err)
		}
	}

	ack := &ppb.SetResponse{Status: ppb.Status_STATUS_OK}
	response := &ppb.SetSbiListResponse{
		Timestamp: time.Now().UnixNano(),
		Status:    ppb.Status_STATUS_OK,
		Responses: []*ppb.SetResponse{ack},
	}
	return response, nil
}

// filterSbiType translates an SBI type of the pnd API into the corresponding
// southbound type. Any value other than OPENCONFIG, CONTAINERISED or PLUGIN
// maps to spb.Type_TYPE_UNSPECIFIED.
func filterSbiType(sbiType ppb.SbiType) spb.Type {
	switch sbiType {
	case ppb.SbiType_SBI_TYPE_OPENCONFIG:
		return spb.Type_TYPE_OPENCONFIG
	case ppb.SbiType_SBI_TYPE_CONTAINERISED:
		return spb.Type_TYPE_CONTAINERISED
	case ppb.SbiType_SBI_TYPE_PLUGIN:
		return spb.Type_TYPE_PLUGIN
	}
	return spb.Type_TYPE_UNSPECIFIED
}

// DeleteOnd removes device request.Did from the PND request.Pid.
//
// Every failure (malformed Pid or Did, unknown PND, failed removal) is logged
// and returned as a gRPC status error with codes.Aborted. Like the other
// handlers of PndServer, the call is recorded through the metrics start and
// finish hooks; it uses the rpc label "delete".
func (p PndServer) DeleteOnd(ctx context.Context, request *ppb.DeleteOndRequest) (*ppb.DeleteOndResponse, error) {
	labels := prometheus.Labels{"service": "pnd", "rpc": "delete"}
	start := metrics.StartHook(labels, grpcRequestsTotal)
	defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)

	pid, err := uuid.Parse(request.Pid)
	if err != nil {
		log.Error(err)
		return nil, status.Errorf(codes.Aborted, "%v", err)
	}
	pnd, err := p.pndStore.Get(store.Query{ID: pid})
	if err != nil {
		log.Error(err)
		return nil, status.Errorf(codes.Aborted, "%v", err)
	}

	did, err := uuid.Parse(request.Did)
	if err != nil {
		log.Error(err)
		return nil, status.Errorf(codes.Aborted, "%v", err)
	}
	if err := pnd.RemoveDevice(did); err != nil {
		log.Error(err)
		return nil, status.Errorf(codes.Aborted, "%v", err)
	}
	return &ppb.DeleteOndResponse{
		Timestamp: time.Now().UnixNano(),
		Status:    ppb.Status_STATUS_OK,
	}, nil
}

// SubscribePath subscribes to specific paths of an ond: it asks the PND
// request.Pid to subscribe device request.Did to the paths in
// request.Sublist.
//
// Every failure is passed to handleRPCError. All failures except a malformed
// Pid are also logged first. The stream argument is not used by this method.
func (p PndServer) SubscribePath(ctx context.Context, request *ppb.SubscribePathRequest, stream ppb.PndService_SubscribePathServer) error {
	labels := prometheus.Labels{"service": "pnd", "rpc": "subscribe path"}
	start := metrics.StartHook(labels, grpcRequestsTotal)
	defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)

	pndID, err := uuid.Parse(request.Pid)
	if err != nil {
		return handleRPCError(labels, err)
	}

	domain, err := p.pndStore.Get(store.Query{ID: pndID})
	if err != nil {
		log.Error(err)
		return handleRPCError(labels, err)
	}

	deviceID, err := uuid.Parse(request.Did)
	if err != nil {
		log.Error(err)
		return handleRPCError(labels, err)
	}

	subscribeErr := domain.SubscribePath(deviceID, request.Sublist)
	if subscribeErr == nil {
		return nil
	}

	log.Error(subscribeErr)
	return handleRPCError(labels, subscribeErr)
}