diff --git a/controller/interfaces/transport/transport.go b/controller/interfaces/transport/transport.go index 8ff37b019726ebfc603aff8ab86f692fdaae9669..e5d1f807a95a9665cb33f6bbffed325f0611863c 100644 --- a/controller/interfaces/transport/transport.go +++ b/controller/interfaces/transport/transport.go @@ -21,7 +21,6 @@ type Transport interface { Type() string ProcessResponse(resp interface{}, root interface{}, models *ytypes.Schema) error ProcessControlPlaneSubscribeResponse(resp *gpb.SubscribeResponse_Update, root any, schema *ytypes.Schema) error - NewControlPlaneSubscribe(ctx context.Context, subscriptionInfo *SubscriptionInformation, respChan chan *gpb.SubscribeResponse) error } type ( diff --git a/controller/mocks/Transport.go b/controller/mocks/Transport.go index f04144954118d5b41bce17c61b741b4fc379b0b6..d6961a3fb977757487dd33ffcdbcfe71c9c420d5 100644 --- a/controller/mocks/Transport.go +++ b/controller/mocks/Transport.go @@ -88,20 +88,6 @@ func (_m *Transport) Get(ctx context.Context, params ...string) (interface{}, er return r0, r1 } -// NewControlPlaneSubscribe provides a mock function with given fields: ctx, subscriptionInfo, respChan -func (_m *Transport) NewControlPlaneSubscribe(ctx context.Context, subscriptionInfo *transport.SubscriptionInformation, respChan chan *gnmi.SubscribeResponse) error { - ret := _m.Called(ctx, subscriptionInfo, respChan) - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, *transport.SubscriptionInformation, chan *gnmi.SubscribeResponse) error); ok { - r0 = rf(ctx, subscriptionInfo, respChan) - } else { - r0 = ret.Error(0) - } - - return r0 -} - // ProcessControlPlaneSubscribeResponse provides a mock function with given fields: resp, root, schema func (_m *Transport) ProcessControlPlaneSubscribeResponse(resp *gnmi.SubscribeResponse_Update, root interface{}, schema *ytypes.Schema) error { ret := _m.Called(resp, root, schema) diff --git a/controller/nucleus/gnmi_transport.go b/controller/nucleus/gnmi_transport.go index 0e9453cbe426b1be7745ff990290d7871fffc6c1..eb1ea0e2409808f3a169a62b58ddd292d5b1c3ee 100644 --- a/controller/nucleus/gnmi_transport.go +++ b/controller/nucleus/gnmi_transport.go @@ -448,29 +448,6 @@ func (g *Gnmi) controlPlaneSubscribe(ctx context.Context, subcribeCallbackFunc f return gnmi.SubscribeErr(ctx, g.client, opts, g.RespChan) } -func (g *Gnmi) NewControlPlaneSubscribe(ctx context.Context, subscriptionInfo *tpInterface.SubscriptionInformation, respChan chan *gpb.SubscribeResponse) error { - ctx = gnmi.NewContext(ctx, g.config) - opts, ok := ctx.Value(types.CtxKeyOpts).(*gnmi.SubscribeOptions) - if !ok { - return &customerrs.InvalidTypeAssertionError{ - Value: ctx.Value(types.CtxKeyOpts), - Type: &gnmi.SubscribeOptions{}, - } - } - - go func() { - log.WithFields(log.Fields{ - "address": opts.Target, - "paths": opts.Paths, - "mode": opts.Mode, - "interval": opts.SampleInterval, - }).Info("subscribed to gNMI target") - subResp := <-g.RespChan - respChan <- subResp - }() - return gnmi.SubscribeErr(ctx, g.client, opts, g.RespChan) -} - // Close calls GNMI close. func (g *Gnmi) Close() error { return nil diff --git a/controller/nucleus/networkElementWatcher.go b/controller/nucleus/networkElementWatcher.go index bfa39509ec1318c3bbb06d537a73b579d9a7e8f6..d3d0cc20c66e5da30923ffaf2e9f8f476bac9e3a 100644 --- a/controller/nucleus/networkElementWatcher.go +++ b/controller/nucleus/networkElementWatcher.go @@ -19,7 +19,7 @@ import ( const ( subscribeSampleInterval uint64 = 1000000000 // 1 second in nanoseconds // TODO: These gNMI options are adjusted to arista gNMI fork. Change when switching to native gNMI. - gNMISubscribeMode string = "once" + gNMISubscribeMode string = "stream" gNMIStreamMode string = "on_change" ) @@ -63,24 +63,7 @@ func (n *NetworkElementWatcher) SubToNetworkElements(paths [][]string, opts *gnm } for _, pnd := range pnds { - n.newSubscribeToPndNetworkElements(pnd.ID().String(), pnd, opts) - } -} - -func (n *NetworkElementWatcher) newSubscribeToPndNetworkElements(pndID string, pnd networkdomain.NetworkDomain, opts *gnmi.SubscribeOptions) { - for _, mne := range pnd.NetworkElements() { - gNMIOptionsCtx := context.Background() - gNMIOptionsCtx = context.WithValue(gNMIOptionsCtx, types.CtxKeyOpts, opts) - - respChan := make(chan *gpb.SubscribeResponse) - go mne.Transport().NewControlPlaneSubscribe(gNMIOptionsCtx, &transport.SubscriptionInformation{ - PndID: pndID, - NetworkElementID: mne.ID().String(), - NetworkElementName: mne.Name(), - }, respChan) - - value := <-respChan - fmt.Println(value.Response) + n.subscribeToPndNetworkElements(pnd.ID().String(), pnd, opts) } }