Newer
Older
ppb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/pnd"
spb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/southbound"
Fabian Seidl
committed
"code.fbi.h-da.de/danet/gosdn/controller/customerrs"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/device"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkdomain"
"code.fbi.h-da.de/danet/gosdn/controller/metrics"
"code.fbi.h-da.de/danet/gosdn/controller/nucleus"
"code.fbi.h-da.de/danet/gosdn/controller/store"
Malte Bauch
committed
"github.com/openconfig/gnmi/proto/gnmi"
Malte Bauch
committed
"github.com/openconfig/ygot/ytypes"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
// PndServer implements a pnd server.
type PndServer struct {
pndStore networkdomain.PndStore
// NewPndServer builds a PndServer that uses the given pndStore for all PND
// lookups and returns a pointer to it.
func NewPndServer(pndStore networkdomain.PndStore) *PndServer {
	server := new(PndServer)
	server.pndStore = pndStore

	return server
}
// GetOnd gets a specific ond.
// It resolves the PND from request.Pid and the device from request.Did, reads
// the device at path "/" through fillOndBySpecificPath and reports it together
// with the PND's ID, name and description. Failures are logged and returned as
// gRPC Aborted status errors.
//
// NOTE(review): this copy of the body is incomplete. Closing braces, at least
// one `if err != nil` guard and the opening of the response literal are
// missing, and "Malte Bauch"/"committed" lines that are not Go code are
// interleaved. Restore the function from version control before building.
func (p PndServer) GetOnd(ctx context.Context, request *ppb.GetOndRequest) (*ppb.GetOndResponse, error) {
labels := prometheus.Labels{"service": "pnd", "rpc": "get"}
start := metrics.StartHook(labels, grpcRequestsTotal)
defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)
pid, err := uuid.Parse(request.Pid)
if err != nil {
pnd, err := p.pndStore.Get(store.Query{ID: pid})
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
Malte Bauch
committed
device, err := pnd.GetDevice(request.Did)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
Malte Bauch
committed
ond, err := fillOndBySpecificPath(device, "/")
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
Malte Bauch
committed
Timestamp: time.Now().UnixNano(),
Pnd: &ppb.PrincipalNetworkDomain{
Id: pnd.ID().String(),
Name: pnd.GetName(),
Description: pnd.GetDescription(),
},
Malte Bauch
committed
Ond: ond,
// GetOndList returns a list of existing onds.
// It resolves the PND from request.Pid and converts every device returned by
// pnd.Devices() with fillOndBySpecificPath at path "/". The visible response
// fields are the PND's ID, name and description plus the converted devices.
//
// NOTE(review): this copy of the body is incomplete. The guard after the store
// lookup, several closing braces and the opening of the response literal are
// missing, and "Malte Bauch"/"committed" lines that are not Go code are
// interleaved. Restore the function from version control before building.
func (p PndServer) GetOndList(ctx context.Context, request *ppb.GetOndListRequest) (*ppb.GetOndListResponse, error) {
labels := prometheus.Labels{"service": "pnd", "rpc": "get"}
start := metrics.StartHook(labels, grpcRequestsTotal)
defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)
pid, err := uuid.Parse(request.Pid)
if err != nil {
return nil, handleRPCError(labels, err)
}
pnd, err := p.pndStore.Get(store.Query{ID: pid})
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
Malte Bauch
committed
onds := make([]*ppb.OrchestratedNetworkingDevice, len(pnd.Devices()))
Malte Bauch
committed
for i, device := range pnd.Devices() {
ond, err := fillOndBySpecificPath(device, "/")
Malte Bauch
committed
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
onds[i] = ond
Malte Bauch
committed
Pnd: &ppb.PrincipalNetworkDomain{
Id: pnd.ID().String(),
Name: pnd.GetName(),
Description: pnd.GetDescription(),
},
Ond: onds,
// GetFlattenedOndList returns a list of existing onds.
func (p PndServer) GetFlattenedOndList(ctx context.Context, request *ppb.GetOndListRequest) (*ppb.GetFlattenedOndListResponse, error) {
labels := prometheus.Labels{"service": "pnd", "rpc": "get"}
start := metrics.StartHook(labels, grpcRequestsTotal)
defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)
pid, err := uuid.Parse(request.Pid)
if err != nil {
return nil, handleRPCError(labels, err)
}
pnd, err := p.pndStore.Get(store.Query{ID: pid})
Malte Bauch
committed
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
Malte Bauch
committed
onds := pnd.FlattenedDevices()
flattenedOnds := make([]*ppb.FlattenedOrchestratedNetworkingDevice, len(onds))
for i, ond := range onds {
ond := &ppb.FlattenedOrchestratedNetworkingDevice{
Id: ond.ID,
Name: ond.Name,
Sbi: ond.SBI,
}
flattenedOnds[i] = ond
}
return &ppb.GetFlattenedOndListResponse{
Timestamp: time.Now().UnixNano(),
Pnd: &ppb.PrincipalNetworkDomain{
Id: pnd.ID().String(),
Name: pnd.GetName(),
Description: pnd.GetDescription(),
},
Ond: flattenedOnds,
}, nil
}
Malte Bauch
committed
func fillOndBySpecificPath(d device.Device, path string) (*ppb.OrchestratedNetworkingDevice, error) {
Malte Bauch
committed
gnmiPath, err := ygot.StringToStructuredPath(path)
Malte Bauch
committed
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
Malte Bauch
committed
opts := []ytypes.GetNodeOpt{
&ytypes.GetHandleWildcards{},
&ytypes.GetPartialKeyMatch{},
}
nodes, err := ytypes.GetNode(d.SBI().Schema().RootSchema(), d.GetModel(), gnmiPath, opts...)
Malte Bauch
committed
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
Malte Bauch
committed
devices := make([]*gnmi.Notification, len(nodes))
for i, node := range nodes {
dev, err := genGnmiNotification(gnmiPath, node.Data)
Malte Bauch
committed
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
Malte Bauch
committed
devices[i] = dev
Malte Bauch
committed
}
sbi := spb.SouthboundInterface{}
if d.SBI() != nil {
sbi.Id = d.SBI().ID().String()
sbi.Type = d.SBI().Type()
}
ond := &ppb.OrchestratedNetworkingDevice{
Id: d.ID().String(),
Name: d.Name(),
Malte Bauch
committed
Device: devices,
Malte Bauch
committed
Sbi: &sbi,
}
return ond, nil
}
Malte Bauch
committed
func genGnmiNotification(path *gnmi.Path, val any) (*gnmi.Notification, error) {
typedVal, err := ygot.EncodeTypedValue(val, gnmi.Encoding_JSON_IETF)
Malte Bauch
committed
if err != nil {
return nil, err
}
Malte Bauch
committed
return &gnmi.Notification{
Timestamp: time.Now().UnixNano(),
Update: []*gnmi.Update{
{
Path: &gnmi.Path{
Elem: path.GetElem(),
Malte Bauch
committed
},
Malte Bauch
committed
Val: typedVal,
Malte Bauch
committed
},
},
}, nil
}
// GetSbi gets a specific sbi.
// It resolves the PND from request.Pid, parses request.Sid as a UUID and looks
// the SBI up with pnd.GetSBI. The visible response fields are the PND's ID,
// name and description and the SBI's ID and type.
//
// NOTE(review): this copy of the body is incomplete. The `if err != nil`
// guards around the store and SBI lookups, several closing braces and the
// opening of the response literal are missing, and "Fabian Seidl"/"committed"
// lines that are not Go code are interleaved. Restore the function from
// version control before building.
func (p PndServer) GetSbi(ctx context.Context, request *ppb.GetSbiRequest) (*ppb.GetSbiResponse, error) {
labels := prometheus.Labels{"service": "pnd", "rpc": "get"}
start := metrics.StartHook(labels, grpcRequestsTotal)
defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)
pid, err := uuid.Parse(request.Pid)
pnd, err := p.pndStore.Get(store.Query{ID: pid})
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
Fabian Seidl
committed
sbiID, err := uuid.Parse(request.Sid)
if err != nil {
return nil, err
}
sbi, err := pnd.GetSBI(sbiID)
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
Pnd: &ppb.PrincipalNetworkDomain{
Id: pnd.ID().String(),
Name: pnd.GetName(),
Description: pnd.GetDescription(),
},
Fabian Seidl
committed
Sbi: &spb.SouthboundInterface{
Id: sbiID.String(),
Type: sbi.Type(),
},
// GetSbiList gets all existing sbis.
// It resolves the PND from request.Pid and converts its SBIs with fillSbis.
// The visible response fields are the PND's ID, name and description and the
// converted SBI list.
//
// NOTE(review): this copy of the body is incomplete. The guards after parsing
// request.Pid and after the store lookup, the opening of the response literal
// and the function's closing lines are missing, and "Fabian Seidl"/"committed"
// lines that are not Go code are interleaved. Restore the function from
// version control before building.
func (p PndServer) GetSbiList(ctx context.Context, request *ppb.GetSbiListRequest) (*ppb.GetSbiListResponse, error) {
labels := prometheus.Labels{"service": "pnd", "rpc": "get"}
start := metrics.StartHook(labels, grpcRequestsTotal)
defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)
pid, err := uuid.Parse(request.Pid)
pnd, err := p.pndStore.Get(store.Query{ID: pid})
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
Fabian Seidl
committed
sbis, err := fillSbis(pnd)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
Pnd: &ppb.PrincipalNetworkDomain{
Id: pnd.ID().String(),
Name: pnd.GetName(),
Description: pnd.GetDescription(),
},
Sbi: sbis,
Fabian Seidl
committed
func fillSbis(pnd networkdomain.NetworkDomain) ([]*spb.SouthboundInterface, error) {
sbis, err := pnd.GetSBIs()
if err != nil {
return nil, err
sbisToReturn := []*spb.SouthboundInterface{}
for _, sbi := range sbis {
sbisToReturn = append(sbisToReturn, &spb.SouthboundInterface{
Id: sbi.ID().String(),
})
return sbisToReturn, nil
Fabian Seidl
committed
func stringArrayToUUIDs(sid []string) ([]uuid.UUID, error) {
UUIDs := make([]uuid.UUID, len(sid))
for i, id := range sid {
parsed, err := uuid.Parse(id)
if err != nil {
return nil, status.Errorf(codes.Aborted, "%v", err)
UUIDs[i] = parsed
}
return UUIDs, nil
}
// GetPath gets a path on a ond.
// It resolves the PND from request.Pid and the device from request.Did,
// rewrites "||" in request.Path to "/", issues pnd.Request for that path
// (discarding its first result) and then reads the same path from the device
// through fillOndBySpecificPath. The visible response fields are the PND's ID,
// name and description and the resulting notifications.
//
// NOTE(review): this copy of the body is incomplete. Closing braces after the
// first and fourth guards and the end of the response literal and function
// are missing, and "Malte Bauch"/"committed" lines that are not Go code are
// interleaved. Restore the function from version control before building.
func (p PndServer) GetPath(ctx context.Context, request *ppb.GetPathRequest) (*ppb.GetPathResponse, error) {
labels := prometheus.Labels{"service": "pnd", "rpc": "get"}
start := metrics.StartHook(labels, grpcRequestsTotal)
defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)
pid, err := uuid.Parse(request.Pid)
if err != nil {
return nil, handleRPCError(labels, err)
pnd, err := p.pndStore.Get(store.Query{ID: pid})
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
device, err := pnd.GetDevice(request.Did)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
duid, err := uuid.Parse(request.Did)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
// In case we get the path from grpc-gateway we have to replace every "||"
// in it with "/".
path := strings.ReplaceAll(request.Path, "||", "/")
_, err = pnd.Request(duid, path)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
Malte Bauch
committed
Malte Bauch
committed
ond, err := fillOndBySpecificPath(device, path)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
Malte Bauch
committed
return &ppb.GetPathResponse{
Timestamp: time.Now().UnixNano(),
Pnd: &ppb.PrincipalNetworkDomain{
Id: pnd.ID().String(),
Name: pnd.GetName(),
Description: pnd.GetDescription(),
},
Malte Bauch
committed
Device: ond.Device,
// GetChange returns the changes named in request.Cuid for the PND identified
// by request.Pid, together with that PND's ID, name and description.
//
// An unparsable request.Pid is reported through handleRPCError; a failed store
// lookup or change conversion is logged and returned as a gRPC Aborted status
// error.
func (p PndServer) GetChange(ctx context.Context, request *ppb.GetChangeRequest) (*ppb.GetChangeResponse, error) {
	labels := prometheus.Labels{"service": "pnd", "rpc": "get"}
	start := metrics.StartHook(labels, grpcRequestsTotal)
	defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)

	pndID, parseErr := uuid.Parse(request.Pid)
	if parseErr != nil {
		return nil, handleRPCError(labels, parseErr)
	}

	domain, lookupErr := p.pndStore.Get(store.Query{ID: pndID})
	if lookupErr != nil {
		log.Error(lookupErr)
		return nil, status.Errorf(codes.Aborted, "%v", lookupErr)
	}

	// all=false: only the changes explicitly named by the request are fetched.
	requested, fillErr := fillChanges(domain, false, request.Cuid...)
	if fillErr != nil {
		log.Error(fillErr)
		return nil, status.Errorf(codes.Aborted, "%v", fillErr)
	}

	response := &ppb.GetChangeResponse{
		Timestamp: time.Now().UnixNano(),
		Pnd: &ppb.PrincipalNetworkDomain{
			Id:          domain.ID().String(),
			Name:        domain.GetName(),
			Description: domain.GetDescription(),
		},
		Change: requested,
	}

	return response, nil
}
// GetChangeList gets all existing changes.
func (p PndServer) GetChangeList(ctx context.Context, request *ppb.GetChangeListRequest) (*ppb.GetChangeListResponse, error) {
labels := prometheus.Labels{"service": "pnd", "rpc": "get"}
start := metrics.StartHook(labels, grpcRequestsTotal)
defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)
pid, err := uuid.Parse(request.Pid)
if err != nil {
return nil, handleRPCError(labels, err)
}
pnd, err := p.pndStore.Get(store.Query{ID: pid})
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
changes, err := fillChanges(pnd, true, "")
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
return &ppb.GetChangeListResponse{
Timestamp: time.Now().UnixNano(),
Pnd: &ppb.PrincipalNetworkDomain{
Id: pnd.ID().String(),
Name: pnd.GetName(),
Description: pnd.GetDescription(),
},
Change: changes,
}, nil
// fillChanges collects changes of pnd and converts them to their protobuf
// representation.
//
// With all set to true the change IDs are the PND's pending changes followed
// by its committed changes. Otherwise they are parsed from cuid via
// stringArrayToUUIDs, and an empty cuid yields an InvalidParametersError.
// Each change is fetched with pnd.GetChange and the difference between its
// previous and intended state is computed with ygot.Diff. The visible result
// fields are the change's age in microseconds and its state.
//
// NOTE(review): the InvalidParametersError text mentions 'did' although the
// parameter being checked is cuid.
// NOTE(review): this copy of the body is incomplete. The guard after
// stringArrayToUUIDs, a closing brace after the GetChange guard, the opening
// of the result literal and the function's closing lines are missing, and
// author/"committed" lines that are not Go code are interleaved. Restore the
// function from version control before building.
func fillChanges(pnd networkdomain.NetworkDomain, all bool, cuid ...string) ([]*ppb.Change, error) {
var changeList []uuid.UUID
switch all {
case true:
changeList = pnd.PendingChanges()
changeList = append(changeList, pnd.CommittedChanges()...)
default:
var err error
if len(cuid) == 0 {
Fabian Seidl
committed
return nil, &customerrs.InvalidParametersError{
Malte Bauch
committed
Func: fillChanges,
Param: "length of 'did' cannot be '0' when 'all' is set to 'false'",
}
}
Fabian Seidl
committed
changeList, err = stringArrayToUUIDs(cuid)
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
}
changes := make([]*ppb.Change, len(changeList))
for i, ch := range changeList {
c, err := pnd.GetChange(ch)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
diff, err := ygot.Diff(c.PreviousState(), c.IntendedState())
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
Age: c.Age().Microseconds(),
State: c.State(),
// SetOndList updates the list of onds.
// In the visible code it resolves the PND from request.Pid, parses an SBI ID
// (r.Sbi.Id), adds a device to the PND with pnd.AddDevice using the entry's
// device name, transport option and that SBI ID, and builds one SetResponse
// with STATUS_OK per added device ID.
//
// NOTE(review): this copy of the body is incomplete. The guards after parsing
// request.Pid and after the store lookup, the loop header that introduces `r`,
// several closing braces and the final return are missing. Restore the
// function from version control before building.
func (p PndServer) SetOndList(ctx context.Context, request *ppb.SetOndListRequest) (*ppb.SetOndListResponse, error) {
labels := prometheus.Labels{"service": "pnd", "rpc": "set"}
start := metrics.StartHook(labels, grpcRequestsTotal)
defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)
pid, err := uuid.Parse(request.Pid)
if err != nil {
pnd, err := p.pndStore.Get(store.Query{ID: pid})
deviceIDs := make([]uuid.UUID, 0, len(request.Ond))
sid, err := uuid.Parse(r.Sbi.Id)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
did, err := pnd.AddDevice(r.DeviceName, r.TransportOption, sid)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
deviceIDs = append(deviceIDs, did)
r := make([]*ppb.SetResponse, len(deviceIDs))
for i, did := range deviceIDs {
r[i] = &ppb.SetResponse{Id: did.String(), Status: ppb.Status_STATUS_OK}
}
// SetChangeList sets a list of changes.
// In the visible code it resolves the PND from request.Pid, parses the Cuid of
// every entry in request.Change and records one SetResponse with STATUS_OK
// per entry.
//
// NOTE(review): this copy of the body is incomplete. The code that acts on
// each change (only its error returns survive), the body of the
// InvalidParametersError literal, several closing braces and the final return
// are missing, and "Fabian Seidl"/"committed" lines that are not Go code are
// interleaved. Restore the function from version control before building.
func (p PndServer) SetChangeList(ctx context.Context, request *ppb.SetChangeListRequest) (*ppb.SetChangeListResponse, error) {
labels := prometheus.Labels{"service": "pnd", "rpc": "set"}
start := metrics.StartHook(labels, grpcRequestsTotal)
defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)
pid, err := uuid.Parse(request.Pid)
if err != nil {
return nil, handleRPCError(labels, err)
}
pnd, err := p.pndStore.Get(store.Query{ID: pid})
if err != nil {
return nil, handleRPCError(labels, err)
}
responses := make([]*ppb.SetResponse, len(request.Change))
for i, r := range request.Change {
cuid, err := uuid.Parse(r.Cuid)
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
Fabian Seidl
committed
return nil, &customerrs.InvalidParametersError{
responses[i] = &ppb.SetResponse{
Id: cuid.String(),
Status: ppb.Status_STATUS_OK,
}
Responses: responses,
// SetPathList sets a list of paths.
// In the visible code it resolves the PND from request.Pid and, for every
// entry in request.ChangeRequest, calls pnd.ChangeOND with the entry's API
// operation, path and value. Each entry yields one SetResponse with STATUS_OK
// carrying the ID returned by ChangeOND.
//
// NOTE(review): this copy of the body is incomplete. The statement that
// defines `did`, the `if err != nil` guards inside the loop, several closing
// braces and the final return are missing. Restore the function from version
// control before building.
func (p PndServer) SetPathList(ctx context.Context, request *ppb.SetPathListRequest) (*ppb.SetPathListResponse, error) {
labels := prometheus.Labels{"service": "pnd", "rpc": "set"}
start := metrics.StartHook(labels, grpcRequestsTotal)
defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)
pid, err := uuid.Parse(request.Pid)
if err != nil {
return nil, handleRPCError(labels, err)
}
pnd, err := p.pndStore.Get(store.Query{ID: pid})
if err != nil {
return nil, handleRPCError(labels, err)
}
responses := make([]*ppb.SetResponse, len(request.ChangeRequest))
for i, r := range request.ChangeRequest {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
cid, err := pnd.ChangeOND(did, r.ApiOp, r.Path, r.Value)
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
responses[i] = &ppb.SetResponse{
Status: ppb.Status_STATUS_OK,
Id: cid.String(),
}
Responses: responses,
// SetSbiList sets a list of sbis.
// It resolves the PND from request.Pid and, for every entry in request.Sbi,
// creates a new SBI of the mapped type (see filterSbiType) and adds it to the
// PND. Processing stops at the first failure, which is reported through
// handleRPCError.
//
// On success the response carries the current time, STATUS_OK, and a single
// SetResponse with STATUS_OK regardless of how many SBIs were added.
func (p PndServer) SetSbiList(ctx context.Context, request *ppb.SetSbiListRequest) (*ppb.SetSbiListResponse, error) {
	labels := prometheus.Labels{"service": "pnd", "rpc": "set"}
	start := metrics.StartHook(labels, grpcRequestsTotal)
	defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)

	pid, err := uuid.Parse(request.Pid)
	if err != nil {
		return nil, handleRPCError(labels, err)
	}

	pnd, err := p.pndStore.Get(store.Query{ID: pid})
	if err != nil {
		return nil, handleRPCError(labels, err)
	}

	for _, r := range request.Sbi {
		sbi, err := nucleus.NewSBI(filterSbiType(r.SbiType))
		if err != nil {
			return nil, handleRPCError(labels, err)
		}

		if err := pnd.AddSbi(sbi); err != nil {
			return nil, handleRPCError(labels, err)
		}
	}

	return &ppb.SetSbiListResponse{
		Timestamp: time.Now().UnixNano(),
		Status:    ppb.Status_STATUS_OK,
		Responses: []*ppb.SetResponse{
			{
				Status: ppb.Status_STATUS_OK,
			},
		},
	}, nil
}
// filterSbiType translates a ppb.SbiType into the corresponding spb.Type.
// OPENCONFIG, CONTAINERISED and PLUGIN map to their spb counterparts; every
// other value yields spb.Type_TYPE_UNSPECIFIED.
func filterSbiType(sbiType ppb.SbiType) spb.Type {
	switch sbiType {
	case ppb.SbiType_SBI_TYPE_OPENCONFIG:
		return spb.Type_TYPE_OPENCONFIG
	case ppb.SbiType_SBI_TYPE_CONTAINERISED:
		return spb.Type_TYPE_CONTAINERISED
	case ppb.SbiType_SBI_TYPE_PLUGIN:
		return spb.Type_TYPE_PLUGIN
	}

	return spb.Type_TYPE_UNSPECIFIED
}
// DeleteOnd deletes a ond.
// In the visible code it looks the PND up in the store by pid and removes the
// device did from it with pnd.RemoveDevice. Failures are logged and returned
// as gRPC Aborted status errors.
//
// NOTE(review): this copy of the body is incomplete. The statements that
// define `pid` and `did` (only their error guards survive), the success
// return and the function's closing brace are missing. Restore the function
// from version control before building.
func (p PndServer) DeleteOnd(ctx context.Context, request *ppb.DeleteOndRequest) (*ppb.DeleteOndResponse, error) {
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
pnd, err := p.pndStore.Get(store.Query{ID: pid})
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
if err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
if err := pnd.RemoveDevice(did); err != nil {
log.Error(err)
return nil, status.Errorf(codes.Aborted, "%v", err)
}
// SubscribePath subscribes to specific paths of an ond.
// It resolves the PND from request.Pid, parses request.Did as the device ID
// and hands request.Sublist to the PND's SubscribePath. Every error is
// returned to the caller unchanged. The stream argument is not used here.
func (p PndServer) SubscribePath(request *ppb.SubscribePathRequest, stream ppb.PndService_SubscribePathServer) error {
	pndID, err := uuid.Parse(request.Pid)
	if err != nil {
		return err
	}

	domain, err := p.pndStore.Get(store.Query{ID: pndID})
	if err != nil {
		return err
	}

	deviceID, err := uuid.Parse(request.Did)
	if err != nil {
		return err
	}

	return domain.SubscribePath(deviceID, request.Sublist)
}