Skip to content
Snippets Groups Projects
Commit d8096bdd authored by Fabian Seidl's avatar Fabian Seidl
Browse files

subscribe impl, WIP

parent b6419824
No related branches found
No related tags found
3 merge requests!376Add additional example application hostname-checker,!343Add basic application framework and example application to show interaction between events an NBI,!342Resolve "Add an option to send gNMI Subscribe requests via SBI"
Pipeline #107336 failed
This commit is part of merge request !342. Comments created here will be created in the context of that merge request.
...@@ -34,4 +34,5 @@ type NetworkDomain interface { ...@@ -34,4 +34,5 @@ type NetworkDomain interface {
GetChange(uuid.UUID) (change.Change, error) GetChange(uuid.UUID) (change.Change, error)
Commit(uuid.UUID) error Commit(uuid.UUID) error
Confirm(uuid.UUID) error Confirm(uuid.UUID) error
SubscribePath(uuid.UUID, *ppb.SubscriptionList) error
} }
...@@ -652,3 +652,34 @@ func (p PndServer) DeleteOnd(ctx context.Context, request *ppb.DeleteOndRequest) ...@@ -652,3 +652,34 @@ func (p PndServer) DeleteOnd(ctx context.Context, request *ppb.DeleteOndRequest)
Status: ppb.Status_STATUS_OK, Status: ppb.Status_STATUS_OK,
}, nil }, nil
} }
// SubscribePath subscribes to specifc paths of an ond
func (p PndServer) SubscribePath(ctx context.Context, request *ppb.SubscribePathRequest, stream ppb.PndService_SubscribePathServer) error {
labels := prometheus.Labels{"service": "pnd", "rpc": "subscribe path"}
start := metrics.StartHook(labels, grpcRequestsTotal)
defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)
pid, err := uuid.Parse(request.Pid)
if err != nil {
return handleRPCError(labels, err)
}
pnd, err := p.pndStore.Get(store.Query{ID: pid})
if err != nil {
log.Error(err)
return handleRPCError(labels, err)
}
did, err := uuid.Parse(request.Did)
if err != nil {
log.Error(err)
return handleRPCError(labels, err)
}
if err := pnd.SubscribePath(did, request.Sublist); err != nil {
log.Error(err)
return handleRPCError(labels, err)
}
return nil
}
...@@ -32,6 +32,7 @@ import ( ...@@ -32,6 +32,7 @@ import (
"code.fbi.h-da.de/danet/gosdn/controller/store" "code.fbi.h-da.de/danet/gosdn/controller/store"
"code.fbi.h-da.de/danet/gosdn/forks/goarista/gnmi"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/openconfig/ygot/ygot" "github.com/openconfig/ygot/ygot"
"github.com/openconfig/ygot/ytypes" "github.com/openconfig/ygot/ytypes"
...@@ -503,6 +504,37 @@ func (pnd *pndImplementation) ChangeOND(duid uuid.UUID, operation ppb.ApiOperati ...@@ -503,6 +504,37 @@ func (pnd *pndImplementation) ChangeOND(duid uuid.UUID, operation ppb.ApiOperati
return ch.cuid, nil return ch.cuid, nil
} }
func (pnd *pndImplementation) SubscribePath(uuid uuid.UUID, subList *ppb.SubscriptionList) error {
d, err := pnd.deviceService.Get(store.Query{
ID: uuid,
})
if err != nil {
return err
}
//TODO(faseid): add more from params of subList or change proto to only have path
// TODO(faseid): missing target address!
// TODO: cehck gnmi client.go NewSubscribeRequest forks/go/arista/...
for _, sub := range subList.Subscription {
opts := &gnmi.SubscribeOptions{
Paths: [][]string{{sub.Path}},
StreamMode: sub.GetStreamMode().String(),
SampleInterval: sub.SampleInterval,
}
ctx := context.Background()
ctx = context.WithValue(ctx, types.CtxKeyOpts, opts)
err := d.Transport().Subscribe(ctx, sub.Path)
if err != nil {
return err
}
}
return nil
}
//nolint //nolint
// handleRollbackError will be implemented in the near future // handleRollbackError will be implemented in the near future
func handleRollbackError(id uuid.UUID, err error) { func handleRollbackError(id uuid.UUID, err error) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment