From 20311d88a6bc252351b4c81eecba94b38991b5a8 Mon Sep 17 00:00:00 2001 From: Fabian Seidl <fabian.seidl@h-da.de> Date: Tue, 13 Dec 2022 15:11:07 +0100 Subject: [PATCH] first try once subscription --- controller/interfaces/transport/transport.go | 1 + controller/mocks/Transport.go | 14 ++++++++++++ controller/nucleus/gnmi_transport.go | 23 ++++++++++++++++++++ controller/nucleus/networkElementWatcher.go | 21 ++++++++++++++++-- 4 files changed, 57 insertions(+), 2 deletions(-) diff --git a/controller/interfaces/transport/transport.go b/controller/interfaces/transport/transport.go index e5d1f807a..8ff37b019 100644 --- a/controller/interfaces/transport/transport.go +++ b/controller/interfaces/transport/transport.go @@ -21,6 +21,7 @@ type Transport interface { Type() string ProcessResponse(resp interface{}, root interface{}, models *ytypes.Schema) error ProcessControlPlaneSubscribeResponse(resp *gpb.SubscribeResponse_Update, root any, schema *ytypes.Schema) error + NewControlPlaneSubscribe(ctx context.Context, subscriptionInfo *SubscriptionInformation, respChan chan *gpb.SubscribeResponse) error } type ( diff --git a/controller/mocks/Transport.go b/controller/mocks/Transport.go index d6961a3fb..f04144954 100644 --- a/controller/mocks/Transport.go +++ b/controller/mocks/Transport.go @@ -88,6 +88,20 @@ func (_m *Transport) Get(ctx context.Context, params ...string) (interface{}, er return r0, r1 } +// NewControlPlaneSubscribe provides a mock function with given fields: ctx, subscriptionInfo, respChan +func (_m *Transport) NewControlPlaneSubscribe(ctx context.Context, subscriptionInfo *transport.SubscriptionInformation, respChan chan *gnmi.SubscribeResponse) error { + ret := _m.Called(ctx, subscriptionInfo, respChan) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, *transport.SubscriptionInformation, chan *gnmi.SubscribeResponse) error); ok { + r0 = rf(ctx, subscriptionInfo, respChan) + } else { + r0 = ret.Error(0) + } + + return r0 +} + // ProcessControlPlaneSubscribeResponse provides a mock function with given fields: resp, root, schema func (_m *Transport) ProcessControlPlaneSubscribeResponse(resp *gnmi.SubscribeResponse_Update, root interface{}, schema *ytypes.Schema) error { ret := _m.Called(resp, root, schema) diff --git a/controller/nucleus/gnmi_transport.go b/controller/nucleus/gnmi_transport.go index eb1ea0e24..0e9453cbe 100644 --- a/controller/nucleus/gnmi_transport.go +++ b/controller/nucleus/gnmi_transport.go @@ -448,6 +448,29 @@ func (g *Gnmi) controlPlaneSubscribe(ctx context.Context, subcribeCallbackFunc f return gnmi.SubscribeErr(ctx, g.client, opts, g.RespChan) } +func (g *Gnmi) NewControlPlaneSubscribe(ctx context.Context, subscriptionInfo *tpInterface.SubscriptionInformation, respChan chan *gpb.SubscribeResponse) error { + ctx = gnmi.NewContext(ctx, g.config) + opts, ok := ctx.Value(types.CtxKeyOpts).(*gnmi.SubscribeOptions) + if !ok { + return &customerrs.InvalidTypeAssertionError{ + Value: ctx.Value(types.CtxKeyOpts), + Type: &gnmi.SubscribeOptions{}, + } + } + + go func() { + log.WithFields(log.Fields{ + "address": opts.Target, + "paths": opts.Paths, + "mode": opts.Mode, + "interval": opts.SampleInterval, + }).Info("subscribed to gNMI target") + subResp := <-g.RespChan + respChan <- subResp + }() + return gnmi.SubscribeErr(ctx, g.client, opts, g.RespChan) +} + // Close calls GNMI close. func (g *Gnmi) Close() error { return nil diff --git a/controller/nucleus/networkElementWatcher.go b/controller/nucleus/networkElementWatcher.go index d3d0cc20c..bfa39509e 100644 --- a/controller/nucleus/networkElementWatcher.go +++ b/controller/nucleus/networkElementWatcher.go @@ -19,7 +19,7 @@ import ( const ( subscribeSampleInterval uint64 = 1000000000 // 1 second in nanoseconds // TODO: These gNMI options are adjusted to arista gNMI fork. Change when switching to native gNMI. - gNMISubscribeMode string = "stream" + gNMISubscribeMode string = "once" gNMIStreamMode string = "on_change" ) @@ -63,7 +63,24 @@ func (n *NetworkElementWatcher) SubToNetworkElements(paths [][]string, opts *gnm } for _, pnd := range pnds { - n.subscribeToPndNetworkElements(pnd.ID().String(), pnd, opts) + n.newSubscribeToPndNetworkElements(pnd.ID().String(), pnd, opts) + } +} + +func (n *NetworkElementWatcher) newSubscribeToPndNetworkElements(pndID string, pnd networkdomain.NetworkDomain, opts *gnmi.SubscribeOptions) { + for _, mne := range pnd.NetworkElements() { + gNMIOptionsCtx := context.Background() + gNMIOptionsCtx = context.WithValue(gNMIOptionsCtx, types.CtxKeyOpts, opts) + + respChan := make(chan *gpb.SubscribeResponse) + go mne.Transport().NewControlPlaneSubscribe(gNMIOptionsCtx, &transport.SubscriptionInformation{ + PndID: pndID, + NetworkElementID: mne.ID().String(), + NetworkElementName: mne.Name(), + }, respChan) + + value := <-respChan + fmt.Println(value.Response) } } -- GitLab