diff --git a/controller/controller.go b/controller/controller.go index 8f86f6ac6408f75f9ca41cfd8acaaaafe260a2e7..029e684ece84729e9dcde9a672e72f5f7be71866 100644 --- a/controller/controller.go +++ b/controller/controller.go @@ -44,14 +44,15 @@ var coreOnce sync.Once // Core is the representation of the controller's core type Core struct { - pndStore networkdomain.PndStore - userService rbac.UserService - roleService rbac.RoleService - httpServer *http.Server - grpcServer *grpc.Server - nbi *nbi.NorthboundInterface - eventService eventInterfaces.Service - stopChan chan os.Signal + pndStore networkdomain.PndStore + userService rbac.UserService + roleService rbac.RoleService + httpServer *http.Server + grpcServer *grpc.Server + nbi *nbi.NorthboundInterface + eventService eventInterfaces.Service + stopChan chan os.Signal + subToDataPlaneComp *nucleus.SubToDataPlane csbiClient cpb.CsbiServiceClient } @@ -78,6 +79,10 @@ func initialize() error { stopChan: make(chan os.Signal, 1), } + c.subToDataPlaneComp = nucleus.NewSubToDataPlane(c.pndStore) + //TODO: remove this call after testing! + c.subToDataPlaneComp.SubToDevices() + // Setting up signal capturing signal.Notify(c.stopChan, os.Interrupt, syscall.SIGTERM) diff --git a/controller/interfaces/transport/transport.go b/controller/interfaces/transport/transport.go index 9d732c370599062776d9151c52fa4395df7c3d6f..66db9d8059fd561cf16508d931944baf18c1f8d1 100644 --- a/controller/interfaces/transport/transport.go +++ b/controller/interfaces/transport/transport.go @@ -14,6 +14,7 @@ type Transport interface { Get(ctx context.Context, params ...string) (interface{}, error) Set(ctx context.Context, payload change.Payload, path string, schema *ytypes.Schema) error Subscribe(ctx context.Context, params ...string) error + SubscribeInternal(ctx context.Context, handleSubscribeRepsonse any) error Type() string ProcessResponse(resp interface{}, root interface{}, models *ytypes.Schema) error } diff --git a/controller/mocks/Transport.go b/controller/mocks/Transport.go index 10fa882e37cf54a9c81a7652ec0da45694e472ef..40623ad7114e6199db7561e4634184ec879dbecb 100644 --- a/controller/mocks/Transport.go +++ b/controller/mocks/Transport.go @@ -98,6 +98,20 @@ func (_m *Transport) Subscribe(ctx context.Context, params ...string) error { return r0 } +// SubscribeInternal provides a mock function with given fields: ctx, handleSubscribeRepsonse +func (_m *Transport) SubscribeInternal(ctx context.Context, handleSubscribeRepsonse interface{}) error { + ret := _m.Called(ctx, handleSubscribeRepsonse) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, interface{}) error); ok { + r0 = rf(ctx, handleSubscribeRepsonse) + } else { + r0 = ret.Error(0) + } + + return r0 +} + // Type provides a mock function with given fields: func (_m *Transport) Type() string { ret := _m.Called() diff --git a/controller/nucleus/gnmi_transport.go b/controller/nucleus/gnmi_transport.go index 5e7e2e145e79d937774aef1a3fd2694261a9c66c..ad6d51fcfe51089311581d7c63e850126d102024 100644 --- a/controller/nucleus/gnmi_transport.go +++ b/controller/nucleus/gnmi_transport.go @@ -198,7 +198,21 @@ func (g *Gnmi) Subscribe(ctx context.Context, params ...string) error { if g.client == nil { return &errors.ErrNilClient{} } - return g.subscribe2(ctx) + return g.subscribe(ctx) +} + +func (g *Gnmi) SubscribeInternal(ctx context.Context, handleSubscribeRepsonse any) error { + if g.client == nil { + return &errors.ErrNilClient{} + } + + callback, ok := handleSubscribeRepsonse.(func(*gpb.SubscribeResponse)) + if !ok { + // TODO: fix error, change to wrong method or sth + return &errors.ErrNotYetImplemented{} + } + + return g.subscribeInternal(ctx, callback) } // Type returns the gNMI transport type @@ -330,7 +344,7 @@ func (g *Gnmi) subscribe(ctx context.Context) error { } // Subscribe calls GNMI subscribe -func (g *Gnmi) subscribe2(ctx context.Context) error { +func (g *Gnmi) subscribeInternal(ctx context.Context, subcribeCallbackFunc func(*gpb.SubscribeResponse)) error { // add callback function instead of chan string as parameter ctx = gnmi.NewContext(ctx, g.config) opts, ok := ctx.Value(types.CtxKeyOpts).(*gnmi.SubscribeOptions) if !ok { @@ -339,18 +353,6 @@ func (g *Gnmi) subscribe2(ctx context.Context) error { Type: &gnmi.SubscribeOptions{}, } } - stringRespChan := make(chan string) - - go func() { - for { - resp := <-stringRespChan - if resp != "" { - log.Info(resp) - } else { - log.Info("recieved empty string") - } - } - }() go func() { log.WithFields(log.Fields{ @@ -362,9 +364,8 @@ func (g *Gnmi) subscribe2(ctx context.Context) error { for { resp := <-g.RespChan if resp != nil { - if err := LogSubscribeResponse(resp, stringRespChan); err != nil { - log.Fatal(err) - } + // do callback(respMessage) + go subcribeCallbackFunc(resp) } } }() diff --git a/controller/nucleus/subToDataPlane.go b/controller/nucleus/subToDataPlane.go new file mode 100644 index 0000000000000000000000000000000000000000..52516294595ecfdc494be1b2ced62b4d09f9288a --- /dev/null +++ b/controller/nucleus/subToDataPlane.go @@ -0,0 +1,57 @@ +package nucleus + +import ( + "context" + "fmt" + + "code.fbi.h-da.de/danet/gosdn/controller/interfaces/device" + "code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkdomain" + "code.fbi.h-da.de/danet/gosdn/controller/nucleus/types" + "code.fbi.h-da.de/danet/gosdn/controller/store" + "code.fbi.h-da.de/danet/gosdn/forks/goarista/gnmi" + gpb "github.com/openconfig/gnmi/proto/gnmi" + log "github.com/sirupsen/logrus" +) + +//TODO: rename file and go struct! + +type SubToDataPlane struct { + pndStore networkdomain.PndStore +} + +func NewSubToDataPlane(pndStore networkdomain.PndStore) *SubToDataPlane { + return &SubToDataPlane{ + pndStore: pndStore, + } +} + +func (s *SubToDataPlane) SubToDevices() { + + //TODO: make this configurable and move away from arista stuff?! + opts := &gnmi.SubscribeOptions{ + Mode: "stream", + StreamMode: "sample", + Paths: [][]string{{"system", "config", "hostname"}}, + SampleInterval: 1000000000, + } + + pnd, err := s.pndStore.Get(store.Query{Name: "base"}) + if err != nil { + log.Error(err) + } + + for _, d := range pnd.Devices() { + go s.callSubscribe(d, opts) + } +} + +func (s *SubToDataPlane) callSubscribe(device device.Device, opts *gnmi.SubscribeOptions) { + ctx := context.Background() + ctx = context.WithValue(ctx, types.CtxKeyOpts, opts) + + device.Transport().SubscribeInternal(ctx, handleSubscribeRepsonse) +} + +func handleSubscribeRepsonse(resp *gpb.SubscribeResponse) { + fmt.Printf("YEP HANDLER CALLED Type:%T ExampleMessage:%s\n", resp, resp.String()) +}