Skip to content
Snippets Groups Projects
Commit fc92f381 authored by Fabian Seidl's avatar Fabian Seidl
Browse files

added callback function to subscribe and simple way of testing the mechanism, WIP

parent 3c338ffe
No related branches found
No related tags found
3 merge requests!376Add additional example application hostname-checker,!343Add basic application framework and example application to show interaction between events an NBI,!342Resolve "Add an option to send gNMI Subscribe requests via SBI"
Pipeline #108081 failed
...@@ -44,14 +44,15 @@ var coreOnce sync.Once ...@@ -44,14 +44,15 @@ var coreOnce sync.Once
// Core is the representation of the controller's core // Core is the representation of the controller's core
type Core struct { type Core struct {
pndStore networkdomain.PndStore pndStore networkdomain.PndStore
userService rbac.UserService userService rbac.UserService
roleService rbac.RoleService roleService rbac.RoleService
httpServer *http.Server httpServer *http.Server
grpcServer *grpc.Server grpcServer *grpc.Server
nbi *nbi.NorthboundInterface nbi *nbi.NorthboundInterface
eventService eventInterfaces.Service eventService eventInterfaces.Service
stopChan chan os.Signal stopChan chan os.Signal
subToDataPlaneComp *nucleus.SubToDataPlane
csbiClient cpb.CsbiServiceClient csbiClient cpb.CsbiServiceClient
} }
...@@ -78,6 +79,10 @@ func initialize() error { ...@@ -78,6 +79,10 @@ func initialize() error {
stopChan: make(chan os.Signal, 1), stopChan: make(chan os.Signal, 1),
} }
c.subToDataPlaneComp = nucleus.NewSubToDataPlane(c.pndStore)
//TODO: remove this call after testing!
c.subToDataPlaneComp.SubToDevices()
// Setting up signal capturing // Setting up signal capturing
signal.Notify(c.stopChan, os.Interrupt, syscall.SIGTERM) signal.Notify(c.stopChan, os.Interrupt, syscall.SIGTERM)
......
...@@ -14,6 +14,7 @@ type Transport interface { ...@@ -14,6 +14,7 @@ type Transport interface {
Get(ctx context.Context, params ...string) (interface{}, error) Get(ctx context.Context, params ...string) (interface{}, error)
Set(ctx context.Context, payload change.Payload, path string, schema *ytypes.Schema) error Set(ctx context.Context, payload change.Payload, path string, schema *ytypes.Schema) error
Subscribe(ctx context.Context, params ...string) error Subscribe(ctx context.Context, params ...string) error
SubscribeInternal(ctx context.Context, handleSubscribeRepsonse any) error
Type() string Type() string
ProcessResponse(resp interface{}, root interface{}, models *ytypes.Schema) error ProcessResponse(resp interface{}, root interface{}, models *ytypes.Schema) error
} }
...@@ -98,6 +98,20 @@ func (_m *Transport) Subscribe(ctx context.Context, params ...string) error { ...@@ -98,6 +98,20 @@ func (_m *Transport) Subscribe(ctx context.Context, params ...string) error {
return r0 return r0
} }
// SubscribeInternal provides a mock function with given fields: ctx, handleSubscribeRepsonse
func (_m *Transport) SubscribeInternal(ctx context.Context, handleSubscribeRepsonse interface{}) error {
ret := _m.Called(ctx, handleSubscribeRepsonse)
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, interface{}) error); ok {
r0 = rf(ctx, handleSubscribeRepsonse)
} else {
r0 = ret.Error(0)
}
return r0
}
// Type provides a mock function with given fields: // Type provides a mock function with given fields:
func (_m *Transport) Type() string { func (_m *Transport) Type() string {
ret := _m.Called() ret := _m.Called()
......
...@@ -198,7 +198,21 @@ func (g *Gnmi) Subscribe(ctx context.Context, params ...string) error { ...@@ -198,7 +198,21 @@ func (g *Gnmi) Subscribe(ctx context.Context, params ...string) error {
if g.client == nil { if g.client == nil {
return &errors.ErrNilClient{} return &errors.ErrNilClient{}
} }
return g.subscribe2(ctx) return g.subscribe(ctx)
}
func (g *Gnmi) SubscribeInternal(ctx context.Context, handleSubscribeRepsonse any) error {
if g.client == nil {
return &errors.ErrNilClient{}
}
callback, ok := handleSubscribeRepsonse.(func(*gpb.SubscribeResponse))
if !ok {
// TODO: fix error, change to wrong method or sth
return &errors.ErrNotYetImplemented{}
}
return g.subscribeInternal(ctx, callback)
} }
// Type returns the gNMI transport type // Type returns the gNMI transport type
...@@ -330,7 +344,7 @@ func (g *Gnmi) subscribe(ctx context.Context) error { ...@@ -330,7 +344,7 @@ func (g *Gnmi) subscribe(ctx context.Context) error {
} }
// Subscribe calls GNMI subscribe // Subscribe calls GNMI subscribe
func (g *Gnmi) subscribe2(ctx context.Context) error { func (g *Gnmi) subscribeInternal(ctx context.Context, subcribeCallbackFunc func(*gpb.SubscribeResponse)) error { // add callback function instead of chan string as parameter
ctx = gnmi.NewContext(ctx, g.config) ctx = gnmi.NewContext(ctx, g.config)
opts, ok := ctx.Value(types.CtxKeyOpts).(*gnmi.SubscribeOptions) opts, ok := ctx.Value(types.CtxKeyOpts).(*gnmi.SubscribeOptions)
if !ok { if !ok {
...@@ -339,18 +353,6 @@ func (g *Gnmi) subscribe2(ctx context.Context) error { ...@@ -339,18 +353,6 @@ func (g *Gnmi) subscribe2(ctx context.Context) error {
Type: &gnmi.SubscribeOptions{}, Type: &gnmi.SubscribeOptions{},
} }
} }
stringRespChan := make(chan string)
go func() {
for {
resp := <-stringRespChan
if resp != "" {
log.Info(resp)
} else {
log.Info("recieved empty string")
}
}
}()
go func() { go func() {
log.WithFields(log.Fields{ log.WithFields(log.Fields{
...@@ -362,9 +364,8 @@ func (g *Gnmi) subscribe2(ctx context.Context) error { ...@@ -362,9 +364,8 @@ func (g *Gnmi) subscribe2(ctx context.Context) error {
for { for {
resp := <-g.RespChan resp := <-g.RespChan
if resp != nil { if resp != nil {
if err := LogSubscribeResponse(resp, stringRespChan); err != nil { // do callback(respMessage)
log.Fatal(err) go subcribeCallbackFunc(resp)
}
} }
} }
}() }()
......
package nucleus
import (
"context"
"fmt"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/device"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkdomain"
"code.fbi.h-da.de/danet/gosdn/controller/nucleus/types"
"code.fbi.h-da.de/danet/gosdn/controller/store"
"code.fbi.h-da.de/danet/gosdn/forks/goarista/gnmi"
gpb "github.com/openconfig/gnmi/proto/gnmi"
log "github.com/sirupsen/logrus"
)
//TODO: rename file and go struct!
type SubToDataPlane struct {
pndStore networkdomain.PndStore
}
func NewSubToDataPlane(pndStore networkdomain.PndStore) *SubToDataPlane {
return &SubToDataPlane{
pndStore: pndStore,
}
}
func (s *SubToDataPlane) SubToDevices() {
//TODO: make this configurable and move away from arista stuff?!
opts := &gnmi.SubscribeOptions{
Mode: "stream",
StreamMode: "sample",
Paths: [][]string{{"system", "config", "hostname"}},
SampleInterval: 1000000000,
}
pnd, err := s.pndStore.Get(store.Query{Name: "base"})
if err != nil {
log.Error(err)
}
for _, d := range pnd.Devices() {
go s.callSubscribe(d, opts)
}
}
func (s *SubToDataPlane) callSubscribe(device device.Device, opts *gnmi.SubscribeOptions) {
ctx := context.Background()
ctx = context.WithValue(ctx, types.CtxKeyOpts, opts)
device.Transport().SubscribeInternal(ctx, handleSubscribeRepsonse)
}
func handleSubscribeRepsonse(resp *gpb.SubscribeResponse) {
fmt.Printf("YEP HANDLER CALLED Type:%T ExampleMessage:%s\n", resp, resp.String())
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment