From 37ecfb8a707c22c68a349a2112b204a5e8e36fb1 Mon Sep 17 00:00:00 2001 From: Fabian Seidl <fabian.seidl@h-da.de> Date: Tue, 20 Dec 2022 10:10:05 +0100 Subject: [PATCH] Revert "first try once subscription" This reverts commit 20311d88a6bc252351b4c81eecba94b38991b5a8. --- controller/interfaces/transport/transport.go | 1 - controller/mocks/Transport.go | 14 ------------ controller/nucleus/gnmi_transport.go | 23 -------------------- controller/nucleus/networkElementWatcher.go | 21 ++---------------- 4 files changed, 2 insertions(+), 57 deletions(-) diff --git a/controller/interfaces/transport/transport.go b/controller/interfaces/transport/transport.go index 8ff37b019..e5d1f807a 100644 --- a/controller/interfaces/transport/transport.go +++ b/controller/interfaces/transport/transport.go @@ -21,7 +21,6 @@ type Transport interface { Type() string ProcessResponse(resp interface{}, root interface{}, models *ytypes.Schema) error ProcessControlPlaneSubscribeResponse(resp *gpb.SubscribeResponse_Update, root any, schema *ytypes.Schema) error - NewControlPlaneSubscribe(ctx context.Context, subscriptionInfo *SubscriptionInformation, respChan chan *gpb.SubscribeResponse) error } type ( diff --git a/controller/mocks/Transport.go b/controller/mocks/Transport.go index f04144954..d6961a3fb 100644 --- a/controller/mocks/Transport.go +++ b/controller/mocks/Transport.go @@ -88,20 +88,6 @@ func (_m *Transport) Get(ctx context.Context, params ...string) (interface{}, er return r0, r1 } -// NewControlPlaneSubscribe provides a mock function with given fields: ctx, subscriptionInfo, respChan -func (_m *Transport) NewControlPlaneSubscribe(ctx context.Context, subscriptionInfo *transport.SubscriptionInformation, respChan chan *gnmi.SubscribeResponse) error { - ret := _m.Called(ctx, subscriptionInfo, respChan) - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, *transport.SubscriptionInformation, chan *gnmi.SubscribeResponse) error); ok { - r0 = rf(ctx, subscriptionInfo, respChan) - } else { - r0 = ret.Error(0) - } - - return r0 -} - // ProcessControlPlaneSubscribeResponse provides a mock function with given fields: resp, root, schema func (_m *Transport) ProcessControlPlaneSubscribeResponse(resp *gnmi.SubscribeResponse_Update, root interface{}, schema *ytypes.Schema) error { ret := _m.Called(resp, root, schema) diff --git a/controller/nucleus/gnmi_transport.go b/controller/nucleus/gnmi_transport.go index 0e9453cbe..eb1ea0e24 100644 --- a/controller/nucleus/gnmi_transport.go +++ b/controller/nucleus/gnmi_transport.go @@ -448,29 +448,6 @@ func (g *Gnmi) controlPlaneSubscribe(ctx context.Context, subcribeCallbackFunc f return gnmi.SubscribeErr(ctx, g.client, opts, g.RespChan) } -func (g *Gnmi) NewControlPlaneSubscribe(ctx context.Context, subscriptionInfo *tpInterface.SubscriptionInformation, respChan chan *gpb.SubscribeResponse) error { - ctx = gnmi.NewContext(ctx, g.config) - opts, ok := ctx.Value(types.CtxKeyOpts).(*gnmi.SubscribeOptions) - if !ok { - return &customerrs.InvalidTypeAssertionError{ - Value: ctx.Value(types.CtxKeyOpts), - Type: &gnmi.SubscribeOptions{}, - } - } - - go func() { - log.WithFields(log.Fields{ - "address": opts.Target, - "paths": opts.Paths, - "mode": opts.Mode, - "interval": opts.SampleInterval, - }).Info("subscribed to gNMI target") - subResp := <-g.RespChan - respChan <- subResp - }() - return gnmi.SubscribeErr(ctx, g.client, opts, g.RespChan) -} - // Close calls GNMI close. func (g *Gnmi) Close() error { return nil diff --git a/controller/nucleus/networkElementWatcher.go b/controller/nucleus/networkElementWatcher.go index bfa39509e..d3d0cc20c 100644 --- a/controller/nucleus/networkElementWatcher.go +++ b/controller/nucleus/networkElementWatcher.go @@ -19,7 +19,7 @@ import ( const ( subscribeSampleInterval uint64 = 1000000000 // 1 second in nanoseconds // TODO: These gNMI options are adjusted to arista gNMI fork. Change when switching to native gNMI. - gNMISubscribeMode string = "once" + gNMISubscribeMode string = "stream" gNMIStreamMode string = "on_change" ) @@ -63,24 +63,7 @@ func (n *NetworkElementWatcher) SubToNetworkElements(paths [][]string, opts *gnm } for _, pnd := range pnds { - n.newSubscribeToPndNetworkElements(pnd.ID().String(), pnd, opts) - } -} - -func (n *NetworkElementWatcher) newSubscribeToPndNetworkElements(pndID string, pnd networkdomain.NetworkDomain, opts *gnmi.SubscribeOptions) { - for _, mne := range pnd.NetworkElements() { - gNMIOptionsCtx := context.Background() - gNMIOptionsCtx = context.WithValue(gNMIOptionsCtx, types.CtxKeyOpts, opts) - - respChan := make(chan *gpb.SubscribeResponse) - go mne.Transport().NewControlPlaneSubscribe(gNMIOptionsCtx, &transport.SubscriptionInformation{ - PndID: pndID, - NetworkElementID: mne.ID().String(), - NetworkElementName: mne.Name(), - }, respChan) - - value := <-respChan - fmt.Println(value.Response) + n.subscribeToPndNetworkElements(pnd.ID().String(), pnd, opts) } } -- GitLab