From d8096bdd90517b45ec2c644dbb6edcfb03f2b1d7 Mon Sep 17 00:00:00 2001
From: Fabian Seidl <fabian.b.seidl@stud.h-da.de>
Date: Fri, 1 Jul 2022 16:22:46 +0200
Subject: [PATCH] subscribe impl, WIP

---
 controller/interfaces/networkdomain/pnd.go   |  1 +
 controller/northbound/server/pnd.go          | 31 +++++++++++++++++++
 controller/nucleus/principalNetworkDomain.go | 32 ++++++++++++++++++++
 3 files changed, 64 insertions(+)

diff --git a/controller/interfaces/networkdomain/pnd.go b/controller/interfaces/networkdomain/pnd.go
index fc9dec411..9cc5a0285 100644
--- a/controller/interfaces/networkdomain/pnd.go
+++ b/controller/interfaces/networkdomain/pnd.go
@@ -34,4 +34,5 @@ type NetworkDomain interface {
 	GetChange(uuid.UUID) (change.Change, error)
 	Commit(uuid.UUID) error
 	Confirm(uuid.UUID) error
+	SubscribePath(uuid.UUID, *ppb.SubscriptionList) error
 }
diff --git a/controller/northbound/server/pnd.go b/controller/northbound/server/pnd.go
index 5124730e4..268d2fa6f 100644
--- a/controller/northbound/server/pnd.go
+++ b/controller/northbound/server/pnd.go
@@ -652,3 +652,34 @@ func (p PndServer) DeleteOnd(ctx context.Context, request *ppb.DeleteOndRequest)
 		Status:    ppb.Status_STATUS_OK,
 	}, nil
 }
+
+// SubscribePath subscribes to specifc paths of an ond
+func (p PndServer) SubscribePath(ctx context.Context, request *ppb.SubscribePathRequest, stream ppb.PndService_SubscribePathServer) error {
+	labels := prometheus.Labels{"service": "pnd", "rpc": "subscribe path"}
+	start := metrics.StartHook(labels, grpcRequestsTotal)
+	defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)
+
+	pid, err := uuid.Parse(request.Pid)
+	if err != nil {
+		return handleRPCError(labels, err)
+	}
+
+	pnd, err := p.pndStore.Get(store.Query{ID: pid})
+	if err != nil {
+		log.Error(err)
+		return handleRPCError(labels, err)
+	}
+
+	did, err := uuid.Parse(request.Did)
+	if err != nil {
+		log.Error(err)
+		return handleRPCError(labels, err)
+	}
+
+	if err := pnd.SubscribePath(did, request.Sublist); err != nil {
+		log.Error(err)
+		return handleRPCError(labels, err)
+	}
+
+	return nil
+}
diff --git a/controller/nucleus/principalNetworkDomain.go b/controller/nucleus/principalNetworkDomain.go
index ad53c9694..b77652285 100644
--- a/controller/nucleus/principalNetworkDomain.go
+++ b/controller/nucleus/principalNetworkDomain.go
@@ -32,6 +32,7 @@ import (
 
 	"code.fbi.h-da.de/danet/gosdn/controller/store"
 
+	"code.fbi.h-da.de/danet/gosdn/forks/goarista/gnmi"
 	"github.com/google/uuid"
 	"github.com/openconfig/ygot/ygot"
 	"github.com/openconfig/ygot/ytypes"
@@ -503,6 +504,37 @@ func (pnd *pndImplementation) ChangeOND(duid uuid.UUID, operation ppb.ApiOperati
 	return ch.cuid, nil
 }
 
+func (pnd *pndImplementation) SubscribePath(uuid uuid.UUID, subList *ppb.SubscriptionList) error {
+	d, err := pnd.deviceService.Get(store.Query{
+		ID: uuid,
+	})
+	if err != nil {
+		return err
+	}
+
+	//TODO(faseid): add more from params of subList or change proto to only have path
+	// TODO(faseid): missing target address!
+	// TODO: cehck gnmi client.go NewSubscribeRequest forks/go/arista/...
+	for _, sub := range subList.Subscription {
+
+		opts := &gnmi.SubscribeOptions{
+			Paths:          [][]string{{sub.Path}},
+			StreamMode:     sub.GetStreamMode().String(),
+			SampleInterval: sub.SampleInterval,
+		}
+
+		ctx := context.Background()
+		ctx = context.WithValue(ctx, types.CtxKeyOpts, opts)
+
+		err := d.Transport().Subscribe(ctx, sub.Path)
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
 //nolint
 // handleRollbackError will be implemented in the near future
 func handleRollbackError(id uuid.UUID, err error) {
-- 
GitLab