diff --git a/controller/interfaces/transport/transport.go b/controller/interfaces/transport/transport.go index e5d1f807a95a9665cb33f6bbffed325f0611863c..8ff37b019726ebfc603aff8ab86f692fdaae9669 100644 --- a/controller/interfaces/transport/transport.go +++ b/controller/interfaces/transport/transport.go @@ -21,6 +21,7 @@ type Transport interface { Type() string ProcessResponse(resp interface{}, root interface{}, models *ytypes.Schema) error ProcessControlPlaneSubscribeResponse(resp *gpb.SubscribeResponse_Update, root any, schema *ytypes.Schema) error + NewControlPlaneSubscribe(ctx context.Context, subscriptionInfo *SubscriptionInformation, respChan chan *gpb.SubscribeResponse) error } type ( diff --git a/controller/mocks/Transport.go b/controller/mocks/Transport.go index d6961a3fb977757487dd33ffcdbcfe71c9c420d5..f04144954118d5b41bce17c61b741b4fc379b0b6 100644 --- a/controller/mocks/Transport.go +++ b/controller/mocks/Transport.go @@ -88,6 +88,20 @@ func (_m *Transport) Get(ctx context.Context, params ...string) (interface{}, er return r0, r1 } +// NewControlPlaneSubscribe provides a mock function with given fields: ctx, subscriptionInfo, respChan +func (_m *Transport) NewControlPlaneSubscribe(ctx context.Context, subscriptionInfo *transport.SubscriptionInformation, respChan chan *gnmi.SubscribeResponse) error { + ret := _m.Called(ctx, subscriptionInfo, respChan) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, *transport.SubscriptionInformation, chan *gnmi.SubscribeResponse) error); ok { + r0 = rf(ctx, subscriptionInfo, respChan) + } else { + r0 = ret.Error(0) + } + + return r0 +} + // ProcessControlPlaneSubscribeResponse provides a mock function with given fields: resp, root, schema func (_m *Transport) ProcessControlPlaneSubscribeResponse(resp *gnmi.SubscribeResponse_Update, root interface{}, schema *ytypes.Schema) error { ret := _m.Called(resp, root, schema) diff --git a/controller/nucleus/gnmi_transport.go b/controller/nucleus/gnmi_transport.go index eb1ea0e2409808f3a169a62b58ddd292d5b1c3ee..0e9453cbe426b1be7745ff990290d7871fffc6c1 100644 --- a/controller/nucleus/gnmi_transport.go +++ b/controller/nucleus/gnmi_transport.go @@ -448,6 +448,29 @@ func (g *Gnmi) controlPlaneSubscribe(ctx context.Context, subcribeCallbackFunc f return gnmi.SubscribeErr(ctx, g.client, opts, g.RespChan) } +func (g *Gnmi) NewControlPlaneSubscribe(ctx context.Context, subscriptionInfo *tpInterface.SubscriptionInformation, respChan chan *gpb.SubscribeResponse) error { + ctx = gnmi.NewContext(ctx, g.config) + opts, ok := ctx.Value(types.CtxKeyOpts).(*gnmi.SubscribeOptions) + if !ok { + return &customerrs.InvalidTypeAssertionError{ + Value: ctx.Value(types.CtxKeyOpts), + Type: &gnmi.SubscribeOptions{}, + } + } + + go func() { + log.WithFields(log.Fields{ + "address": opts.Target, + "paths": opts.Paths, + "mode": opts.Mode, + "interval": opts.SampleInterval, + }).Info("subscribed to gNMI target") + subResp := <-g.RespChan + respChan <- subResp + }() + return gnmi.SubscribeErr(ctx, g.client, opts, g.RespChan) +} + // Close calls GNMI close. func (g *Gnmi) Close() error { return nil diff --git a/controller/nucleus/networkElementWatcher.go b/controller/nucleus/networkElementWatcher.go index d3d0cc20c66e5da30923ffaf2e9f8f476bac9e3a..bfa39509ec1318c3bbb06d537a73b579d9a7e8f6 100644 --- a/controller/nucleus/networkElementWatcher.go +++ b/controller/nucleus/networkElementWatcher.go @@ -19,7 +19,7 @@ import ( const ( subscribeSampleInterval uint64 = 1000000000 // 1 second in nanoseconds // TODO: These gNMI options are adjusted to arista gNMI fork. Change when switching to native gNMI. - gNMISubscribeMode string = "stream" + gNMISubscribeMode string = "once" gNMIStreamMode string = "on_change" ) @@ -63,7 +63,24 @@ func (n *NetworkElementWatcher) SubToNetworkElements(paths [][]string, opts *gnm } for _, pnd := range pnds { - n.subscribeToPndNetworkElements(pnd.ID().String(), pnd, opts) + n.newSubscribeToPndNetworkElements(pnd.ID().String(), pnd, opts) + } +} + +func (n *NetworkElementWatcher) newSubscribeToPndNetworkElements(pndID string, pnd networkdomain.NetworkDomain, opts *gnmi.SubscribeOptions) { + for _, mne := range pnd.NetworkElements() { + gNMIOptionsCtx := context.Background() + gNMIOptionsCtx = context.WithValue(gNMIOptionsCtx, types.CtxKeyOpts, opts) + + respChan := make(chan *gpb.SubscribeResponse) + go mne.Transport().NewControlPlaneSubscribe(gNMIOptionsCtx, &transport.SubscriptionInformation{ + PndID: pndID, + NetworkElementID: mne.ID().String(), + NetworkElementName: mne.Name(), + }, respChan) + + value := <-respChan + fmt.Println(value.Response) } }