-
Fabian Seidl authoredFabian Seidl authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
pnd.go 18.74 KiB
package server
import (
"context"
"fmt"
"strings"
"time"
ppb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/pnd"
spb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/southbound"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkdomain"
"code.fbi.h-da.de/danet/gosdn/controller/metrics"
"code.fbi.h-da.de/danet/gosdn/controller/nucleus"
"code.fbi.h-da.de/danet/gosdn/controller/nucleus/errors"
"code.fbi.h-da.de/danet/gosdn/controller/store"
"github.com/google/uuid"
"github.com/openconfig/gnmi/proto/gnmi"
"github.com/openconfig/ygot/ygot"
"github.com/openconfig/ygot/ytypes"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// PndServer implements a pnd server
type PndServer struct {
ppb.UnimplementedPndServiceServer
pndStore networkdomain.PndStore
}
// NewPndServer receives a pndStore and returns a new pndServer.
func NewPndServer(pndStore networkdomain.PndStore) *PndServer {
return &PndServer{
pndStore: pndStore,
}
}
// GetOnd gets a specific ond
func (p PndServer) GetOnd(ctx context.Context, request *ppb.GetOndRequest) (*ppb.GetOndResponse, error) {
labels := prometheus.Labels{"service": "pnd", "rpc": "get"}
start := metrics.StartHook(labels, grpcRequestsTotal)
defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)
pid, err := uuid.Parse(request.Pid)
if err != nil {
return nil, handleRPCError(labels, err)
}
pnd, err := p.pndStore.Get(store.Query{ID: pid})
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
ond, err := fillOndBySpecificPath(pnd, request.Did, "/")
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
return &ppb.GetOndResponse{
Timestamp: time.Now().UnixNano(),
Pnd: &ppb.PrincipalNetworkDomain{
Id: pnd.ID().String(),
Name: pnd.GetName(),
Description: pnd.GetDescription(),
},
Ond: ond,
}, nil
}
// GetOndList returns a list of existing onds
func (p PndServer) GetOndList(ctx context.Context, request *ppb.GetOndListRequest) (*ppb.GetOndListResponse, error) {
labels := prometheus.Labels{"service": "pnd", "rpc": "get"}
start := metrics.StartHook(labels, grpcRequestsTotal)
defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)
pid, err := uuid.Parse(request.Pid)
if err != nil {
return nil, handleRPCError(labels, err)
}
pnd, err := p.pndStore.Get(store.Query{ID: pid})
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
onds := make([]*ppb.OrchestratedNetworkingDevice, len(pnd.Devices()))
for i, ond := range pnd.Devices() {
ond, err := fillOndBySpecificPath(pnd, ond.ID().String(), "/")
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
onds[i] = ond
}
return &ppb.GetOndListResponse{
Timestamp: time.Now().UnixNano(),
Pnd: &ppb.PrincipalNetworkDomain{
Id: pnd.ID().String(),
Name: pnd.GetName(),
Description: pnd.GetDescription(),
},
Ond: onds,
}, nil
}
func fillOndBySpecificPath(pnd networkdomain.NetworkDomain, did string, path string) (*ppb.OrchestratedNetworkingDevice, error) {
d, err := pnd.GetDevice(did)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
gnmiPath, err := ygot.StringToStructuredPath(path)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
opts := []ytypes.GetNodeOpt{
&ytypes.GetHandleWildcards{},
&ytypes.GetPartialKeyMatch{},
}
nodes, err := ytypes.GetNode(d.SBI().Schema().RootSchema(), d.GetModel(), gnmiPath, opts...)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
devices := make([]*gnmi.Notification, len(nodes))
for i, node := range nodes {
dev, err := genGnmiNotification(gnmiPath, node.Data)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
devices[i] = dev
}
sbi := spb.SouthboundInterface{}
if d.SBI() != nil {
sbi.Id = d.SBI().ID().String()
sbi.Type = d.SBI().Type()
}
ond := &ppb.OrchestratedNetworkingDevice{
Id: d.ID().String(),
Name: d.Name(),
Device: devices,
Sbi: &sbi,
}
return ond, nil
}
func genGnmiNotification(path *gnmi.Path, val any) (*gnmi.Notification, error) {
typedVal, err := ygot.EncodeTypedValue(val, gnmi.Encoding_JSON_IETF)
if err != nil {
return nil, err
}
return &gnmi.Notification{
Timestamp: time.Now().UnixNano(),
Update: []*gnmi.Update{
{
Path: &gnmi.Path{
Elem: path.GetElem(),
},
Val: typedVal,
},
},
}, nil
}
// GetSbi gets a specific sbi
func (p PndServer) GetSbi(ctx context.Context, request *ppb.GetSbiRequest) (*ppb.GetSbiResponse, error) {
labels := prometheus.Labels{"service": "pnd", "rpc": "get"}
start := metrics.StartHook(labels, grpcRequestsTotal)
defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)
pid, err := uuid.Parse(request.Pid)
if err != nil {
return nil, handleRPCError(labels, err)
}
pnd, err := p.pndStore.Get(store.Query{ID: pid})
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
sbiID, err := uuid.Parse(request.Sid)
if err != nil {
return nil, err
}
sbi, err := pnd.GetSBI(sbiID)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
return &ppb.GetSbiResponse{
Timestamp: time.Now().UnixNano(),
Pnd: &ppb.PrincipalNetworkDomain{
Id: pnd.ID().String(),
Name: pnd.GetName(),
Description: pnd.GetDescription(),
},
Sbi: &spb.SouthboundInterface{
Id: sbiID.String(),
Type: sbi.Type(),
},
}, nil
}
// GetSbiList gets all existing sbis
func (p PndServer) GetSbiList(ctx context.Context, request *ppb.GetSbiListRequest) (*ppb.GetSbiListResponse, error) {
labels := prometheus.Labels{"service": "pnd", "rpc": "get"}
start := metrics.StartHook(labels, grpcRequestsTotal)
defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)
pid, err := uuid.Parse(request.Pid)
if err != nil {
return nil, handleRPCError(labels, err)
}
pnd, err := p.pndStore.Get(store.Query{ID: pid})
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
sbis, err := fillSbis(pnd)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
return &ppb.GetSbiListResponse{
Timestamp: time.Now().UnixNano(),
Pnd: &ppb.PrincipalNetworkDomain{
Id: pnd.ID().String(),
Name: pnd.GetName(),
Description: pnd.GetDescription(),
},
Sbi: sbis,
}, nil
}
func fillSbis(pnd networkdomain.NetworkDomain) ([]*spb.SouthboundInterface, error) {
sbis, err := pnd.GetSBIs()
if err != nil {
return nil, err
}
fmt.Printf("SBIS: %+v\n", sbis)
sbisToReturn := []*spb.SouthboundInterface{}
for _, sbi := range sbis {
sbisToReturn = append(sbisToReturn, &spb.SouthboundInterface{
Id: sbi.ID().String(),
})
}
fmt.Printf("SBIS: %+v\n", sbisToReturn)
return sbisToReturn, nil
}
func stringArrayToUUIDs(sid []string) ([]uuid.UUID, error) {
UUIDs := make([]uuid.UUID, len(sid))
for i, id := range sid {
parsed, err := uuid.Parse(id)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
UUIDs[i] = parsed
}
return UUIDs, nil
}
// GetPath gets a path on a ond
func (p PndServer) GetPath(ctx context.Context, request *ppb.GetPathRequest) (*ppb.GetPathResponse, error) {
labels := prometheus.Labels{"service": "pnd", "rpc": "get"}
start := metrics.StartHook(labels, grpcRequestsTotal)
defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)
pid, err := uuid.Parse(request.Pid)
if err != nil {
return nil, handleRPCError(labels, err)
}
pnd, err := p.pndStore.Get(store.Query{ID: pid})
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
duid, err := uuid.Parse(request.Did)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
// In case we get the path from grpc-gateway we have to replace
path := strings.ReplaceAll(request.Path, "||", "/")
_, err = pnd.Request(duid, path)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
ond, err := fillOndBySpecificPath(pnd, request.Did, path)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
return &ppb.GetPathResponse{
Timestamp: time.Now().UnixNano(),
Pnd: &ppb.PrincipalNetworkDomain{
Id: pnd.ID().String(),
Name: pnd.GetName(),
Description: pnd.GetDescription(),
},
Device: ond.Device,
}, nil
}
// GetChange gets a specific change of a ond
func (p PndServer) GetChange(ctx context.Context, request *ppb.GetChangeRequest) (*ppb.GetChangeResponse, error) {
labels := prometheus.Labels{"service": "pnd", "rpc": "get"}
start := metrics.StartHook(labels, grpcRequestsTotal)
defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)
pid, err := uuid.Parse(request.Pid)
if err != nil {
return nil, handleRPCError(labels, err)
}
pnd, err := p.pndStore.Get(store.Query{ID: pid})
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
changes, err := fillChanges(pnd, false, request.Cuid...)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
return &ppb.GetChangeResponse{
Timestamp: time.Now().UnixNano(),
Pnd: &ppb.PrincipalNetworkDomain{
Id: pnd.ID().String(),
Name: pnd.GetName(),
Description: pnd.GetDescription(),
},
Change: changes,
}, nil
}
// GetChangeList gets all existing changes
func (p PndServer) GetChangeList(ctx context.Context, request *ppb.GetChangeListRequest) (*ppb.GetChangeListResponse, error) {
labels := prometheus.Labels{"service": "pnd", "rpc": "get"}
start := metrics.StartHook(labels, grpcRequestsTotal)
defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)
pid, err := uuid.Parse(request.Pid)
if err != nil {
return nil, handleRPCError(labels, err)
}
pnd, err := p.pndStore.Get(store.Query{ID: pid})
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
changes, err := fillChanges(pnd, true, "")
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
return &ppb.GetChangeListResponse{
Timestamp: time.Now().UnixNano(),
Pnd: &ppb.PrincipalNetworkDomain{
Id: pnd.ID().String(),
Name: pnd.GetName(),
Description: pnd.GetDescription(),
},
Change: changes,
}, nil
}
func fillChanges(pnd networkdomain.NetworkDomain, all bool, cuid ...string) ([]*ppb.Change, error) {
var changeList []uuid.UUID
switch all {
case true:
changeList = pnd.PendingChanges()
changeList = append(changeList, pnd.CommittedChanges()...)
default:
var err error
if len(cuid) == 0 {
return nil, &errors.ErrInvalidParameters{
Func: fillChanges,
Param: "length of 'did' cannot be '0' when 'all' is set to 'false'",
}
}
changeList, err = stringArrayToUUIDs(cuid)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
}
changes := make([]*ppb.Change, len(changeList))
for i, ch := range changeList {
c, err := pnd.GetChange(ch)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
diff, err := ygot.Diff(c.PreviousState(), c.IntendedState())
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
changes[i] = &ppb.Change{
Id: ch.String(),
Age: c.Age().Microseconds(),
State: c.State(),
Diff: diff,
}
}
return changes, nil
}
// SetOndList updates the list of onds
func (p PndServer) SetOndList(ctx context.Context, request *ppb.SetOndListRequest) (*ppb.SetOndListResponse, error) {
labels := prometheus.Labels{"service": "pnd", "rpc": "set"}
start := metrics.StartHook(labels, grpcRequestsTotal)
defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)
pid, err := uuid.Parse(request.Pid)
if err != nil {
return nil, handleRPCError(labels, err)
}
pnd, err := p.pndStore.Get(store.Query{ID: pid})
if err != nil {
return nil, handleRPCError(labels, err)
}
deviceIDs := make([]uuid.UUID, 0, len(request.Ond))
for _, r := range request.Ond {
sid, err := uuid.Parse(r.Sbi.Id)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
did, err := pnd.AddDevice(r.DeviceName, r.TransportOption, sid)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
deviceIDs = append(deviceIDs, did)
}
r := make([]*ppb.SetResponse, len(deviceIDs))
for i, did := range deviceIDs {
r[i] = &ppb.SetResponse{Id: did.String(), Status: ppb.Status_STATUS_OK}
}
return &ppb.SetOndListResponse{
Timestamp: time.Now().UnixNano(),
Status: ppb.Status_STATUS_OK,
Responses: r,
}, nil
}
// SetChangeList sets a list of changes
func (p PndServer) SetChangeList(ctx context.Context, request *ppb.SetChangeListRequest) (*ppb.SetChangeListResponse, error) {
labels := prometheus.Labels{"service": "pnd", "rpc": "set"}
start := metrics.StartHook(labels, grpcRequestsTotal)
defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)
pid, err := uuid.Parse(request.Pid)
if err != nil {
return nil, handleRPCError(labels, err)
}
pnd, err := p.pndStore.Get(store.Query{ID: pid})
if err != nil {
return nil, handleRPCError(labels, err)
}
responses := make([]*ppb.SetResponse, len(request.Change))
for i, r := range request.Change {
cuid, err := uuid.Parse(r.Cuid)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
switch r.Op {
case ppb.Operation_OPERATION_COMMIT:
if err := pnd.Commit(cuid); err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
case ppb.Operation_OPERATION_CONFIRM:
if err := pnd.Confirm(cuid); err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
default:
return nil, &errors.ErrInvalidParameters{
Param: r.Op,
}
}
responses[i] = &ppb.SetResponse{
Id: cuid.String(),
Status: ppb.Status_STATUS_OK,
}
}
return &ppb.SetChangeListResponse{
Timestamp: time.Now().UnixNano(),
Status: ppb.Status_STATUS_OK,
Responses: responses,
}, nil
}
// SetPathList sets a list of paths
func (p PndServer) SetPathList(ctx context.Context, request *ppb.SetPathListRequest) (*ppb.SetPathListResponse, error) {
labels := prometheus.Labels{"service": "pnd", "rpc": "set"}
start := metrics.StartHook(labels, grpcRequestsTotal)
defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)
pid, err := uuid.Parse(request.Pid)
if err != nil {
return nil, handleRPCError(labels, err)
}
pnd, err := p.pndStore.Get(store.Query{ID: pid})
if err != nil {
return nil, handleRPCError(labels, err)
}
responses := make([]*ppb.SetResponse, len(request.ChangeRequest))
for i, r := range request.ChangeRequest {
did, err := uuid.Parse(r.Did)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
cid, err := pnd.ChangeOND(did, r.ApiOp, r.Path, r.Value)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
responses[i] = &ppb.SetResponse{
Status: ppb.Status_STATUS_OK,
Id: cid.String(),
}
}
return &ppb.SetPathListResponse{
Timestamp: time.Now().UnixNano(),
Status: ppb.Status_STATUS_OK,
Responses: responses,
}, nil
}
// SetSbiList sets a list of sbis
func (p PndServer) SetSbiList(ctx context.Context, request *ppb.SetSbiListRequest) (*ppb.SetSbiListResponse, error) {
labels := prometheus.Labels{"service": "pnd", "rpc": "set"}
start := metrics.StartHook(labels, grpcRequestsTotal)
defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)
pid, err := uuid.Parse(request.Pid)
if err != nil {
return nil, handleRPCError(labels, err)
}
pnd, err := p.pndStore.Get(store.Query{ID: pid})
if err != nil {
return nil, handleRPCError(labels, err)
}
for _, r := range request.Sbi {
sbiType := filterSbiType(r.SbiType)
sbi, err := nucleus.NewSBI(sbiType)
if err != nil {
return nil, handleRPCError(labels, err)
}
err = pnd.AddSbi(sbi)
if err != nil {
return nil, handleRPCError(labels, err)
}
}
return &ppb.SetSbiListResponse{
Timestamp: time.Now().UnixNano(),
Status: ppb.Status_STATUS_OK,
Responses: []*ppb.SetResponse{
{
Status: ppb.Status_STATUS_OK,
},
},
}, nil
}
func filterSbiType(sbiType ppb.SbiType) spb.Type {
var spbType spb.Type
switch sbiType {
case ppb.SbiType_SBI_TYPE_OPENCONFIG:
spbType = spb.Type_TYPE_OPENCONFIG
case ppb.SbiType_SBI_TYPE_CONTAINERISED:
spbType = spb.Type_TYPE_CONTAINERISED
case ppb.SbiType_SBI_TYPE_PLUGIN:
spbType = spb.Type_TYPE_PLUGIN
default:
spbType = spb.Type_TYPE_UNSPECIFIED
}
return spbType
}
// DeleteOnd deletes a ond
func (p PndServer) DeleteOnd(ctx context.Context, request *ppb.DeleteOndRequest) (*ppb.DeleteOndResponse, error) {
pid, err := uuid.Parse(request.Pid)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
pnd, err := p.pndStore.Get(store.Query{ID: pid})
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
did, err := uuid.Parse(request.Did)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
if err := pnd.RemoveDevice(did); err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
return &ppb.DeleteOndResponse{
Timestamp: time.Now().UnixNano(),
Status: ppb.Status_STATUS_OK,
}, nil
}
// SubscribePath subscribes to specifc paths of an ond
func (p PndServer) SubscribePath(ctx context.Context, request *ppb.SubscribePathRequest, stream ppb.PndService_SubscribePathServer) error {
labels := prometheus.Labels{"service": "pnd", "rpc": "subscribe path"}
start := metrics.StartHook(labels, grpcRequestsTotal)
defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)
pid, err := uuid.Parse(request.Pid)
if err != nil {
return handleRPCError(labels, err)
}
pnd, err := p.pndStore.Get(store.Query{ID: pid})
if err != nil {
log.Error(err)
return handleRPCError(labels, err)
}
did, err := uuid.Parse(request.Did)
if err != nil {
log.Error(err)
return handleRPCError(labels, err)
}
if err := pnd.SubscribePath(did, request.Sublist); err != nil {
log.Error(err)
return handleRPCError(labels, err)
}
return nil
}