diff --git a/controller/interfaces/networkdomain/pnd.go b/controller/interfaces/networkdomain/pnd.go index fc9dec411c54ec8a1877445401bb801f37aec274..9cc5a0285b127527987c3afea30f12d11c18295c 100644 --- a/controller/interfaces/networkdomain/pnd.go +++ b/controller/interfaces/networkdomain/pnd.go @@ -34,4 +34,5 @@ type NetworkDomain interface { GetChange(uuid.UUID) (change.Change, error) Commit(uuid.UUID) error Confirm(uuid.UUID) error + SubscribePath(uuid.UUID, *ppb.SubscriptionList) error } diff --git a/controller/northbound/server/pnd.go b/controller/northbound/server/pnd.go index 5124730e44bda354d81c0433a395783a5221b462..268d2fa6f14a1a0841337abdf14b8417d1d27e1d 100644 --- a/controller/northbound/server/pnd.go +++ b/controller/northbound/server/pnd.go @@ -652,3 +652,34 @@ func (p PndServer) DeleteOnd(ctx context.Context, request *ppb.DeleteOndRequest) Status: ppb.Status_STATUS_OK, }, nil } + +// SubscribePath subscribes to specifc paths of an ond +func (p PndServer) SubscribePath(ctx context.Context, request *ppb.SubscribePathRequest, stream ppb.PndService_SubscribePathServer) error { + labels := prometheus.Labels{"service": "pnd", "rpc": "subscribe path"} + start := metrics.StartHook(labels, grpcRequestsTotal) + defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds) + + pid, err := uuid.Parse(request.Pid) + if err != nil { + return handleRPCError(labels, err) + } + + pnd, err := p.pndStore.Get(store.Query{ID: pid}) + if err != nil { + log.Error(err) + return handleRPCError(labels, err) + } + + did, err := uuid.Parse(request.Did) + if err != nil { + log.Error(err) + return handleRPCError(labels, err) + } + + if err := pnd.SubscribePath(did, request.Sublist); err != nil { + log.Error(err) + return handleRPCError(labels, err) + } + + return nil +} diff --git a/controller/nucleus/principalNetworkDomain.go b/controller/nucleus/principalNetworkDomain.go index ad53c9694b1c85e87f9702cbaed130e2edd278c4..b77652285933942dd8eaa2f9f47d843aa4e30b6d 100644 --- a/controller/nucleus/principalNetworkDomain.go +++ b/controller/nucleus/principalNetworkDomain.go @@ -32,6 +32,7 @@ import ( "code.fbi.h-da.de/danet/gosdn/controller/store" + "code.fbi.h-da.de/danet/gosdn/forks/goarista/gnmi" "github.com/google/uuid" "github.com/openconfig/ygot/ygot" "github.com/openconfig/ygot/ytypes" @@ -503,6 +504,37 @@ func (pnd *pndImplementation) ChangeOND(duid uuid.UUID, operation ppb.ApiOperati return ch.cuid, nil } +func (pnd *pndImplementation) SubscribePath(uuid uuid.UUID, subList *ppb.SubscriptionList) error { + d, err := pnd.deviceService.Get(store.Query{ + ID: uuid, + }) + if err != nil { + return err + } + + //TODO(faseid): add more from params of subList or change proto to only have path + // TODO(faseid): missing target address! + // TODO: cehck gnmi client.go NewSubscribeRequest forks/go/arista/... + for _, sub := range subList.Subscription { + + opts := &gnmi.SubscribeOptions{ + Paths: [][]string{{sub.Path}}, + StreamMode: sub.GetStreamMode().String(), + SampleInterval: sub.SampleInterval, + } + + ctx := context.Background() + ctx = context.WithValue(ctx, types.CtxKeyOpts, opts) + + err := d.Transport().Subscribe(ctx, sub.Path) + if err != nil { + return err + } + } + + return nil +} + //nolint // handleRollbackError will be implemented in the near future func handleRollbackError(id uuid.UUID, err error) {