Skip to content
Snippets Groups Projects
Commit 37ecfb8a authored by Fabian Seidl's avatar Fabian Seidl
Browse files

Revert "first try once subscription"

This reverts commit 20311d88.
parent a19bd256
No related branches found
No related tags found
1 merge request!405Resolve "To improve the device watching mechanism a fetch all after time interval method should be implemented"
...@@ -21,7 +21,6 @@ type Transport interface { ...@@ -21,7 +21,6 @@ type Transport interface {
Type() string Type() string
ProcessResponse(resp interface{}, root interface{}, models *ytypes.Schema) error ProcessResponse(resp interface{}, root interface{}, models *ytypes.Schema) error
ProcessControlPlaneSubscribeResponse(resp *gpb.SubscribeResponse_Update, root any, schema *ytypes.Schema) error ProcessControlPlaneSubscribeResponse(resp *gpb.SubscribeResponse_Update, root any, schema *ytypes.Schema) error
NewControlPlaneSubscribe(ctx context.Context, subscriptionInfo *SubscriptionInformation, respChan chan *gpb.SubscribeResponse) error
} }
type ( type (
......
...@@ -88,20 +88,6 @@ func (_m *Transport) Get(ctx context.Context, params ...string) (interface{}, er ...@@ -88,20 +88,6 @@ func (_m *Transport) Get(ctx context.Context, params ...string) (interface{}, er
return r0, r1 return r0, r1
} }
// NewControlPlaneSubscribe provides a mock function with given fields: ctx, subscriptionInfo, respChan
func (_m *Transport) NewControlPlaneSubscribe(ctx context.Context, subscriptionInfo *transport.SubscriptionInformation, respChan chan *gnmi.SubscribeResponse) error {
ret := _m.Called(ctx, subscriptionInfo, respChan)
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, *transport.SubscriptionInformation, chan *gnmi.SubscribeResponse) error); ok {
r0 = rf(ctx, subscriptionInfo, respChan)
} else {
r0 = ret.Error(0)
}
return r0
}
// ProcessControlPlaneSubscribeResponse provides a mock function with given fields: resp, root, schema // ProcessControlPlaneSubscribeResponse provides a mock function with given fields: resp, root, schema
func (_m *Transport) ProcessControlPlaneSubscribeResponse(resp *gnmi.SubscribeResponse_Update, root interface{}, schema *ytypes.Schema) error { func (_m *Transport) ProcessControlPlaneSubscribeResponse(resp *gnmi.SubscribeResponse_Update, root interface{}, schema *ytypes.Schema) error {
ret := _m.Called(resp, root, schema) ret := _m.Called(resp, root, schema)
......
...@@ -448,29 +448,6 @@ func (g *Gnmi) controlPlaneSubscribe(ctx context.Context, subcribeCallbackFunc f ...@@ -448,29 +448,6 @@ func (g *Gnmi) controlPlaneSubscribe(ctx context.Context, subcribeCallbackFunc f
return gnmi.SubscribeErr(ctx, g.client, opts, g.RespChan) return gnmi.SubscribeErr(ctx, g.client, opts, g.RespChan)
} }
func (g *Gnmi) NewControlPlaneSubscribe(ctx context.Context, subscriptionInfo *tpInterface.SubscriptionInformation, respChan chan *gpb.SubscribeResponse) error {
ctx = gnmi.NewContext(ctx, g.config)
opts, ok := ctx.Value(types.CtxKeyOpts).(*gnmi.SubscribeOptions)
if !ok {
return &customerrs.InvalidTypeAssertionError{
Value: ctx.Value(types.CtxKeyOpts),
Type: &gnmi.SubscribeOptions{},
}
}
go func() {
log.WithFields(log.Fields{
"address": opts.Target,
"paths": opts.Paths,
"mode": opts.Mode,
"interval": opts.SampleInterval,
}).Info("subscribed to gNMI target")
subResp := <-g.RespChan
respChan <- subResp
}()
return gnmi.SubscribeErr(ctx, g.client, opts, g.RespChan)
}
// Close calls GNMI close. // Close calls GNMI close.
func (g *Gnmi) Close() error { func (g *Gnmi) Close() error {
return nil return nil
......
...@@ -19,7 +19,7 @@ import ( ...@@ -19,7 +19,7 @@ import (
const ( const (
subscribeSampleInterval uint64 = 1000000000 // 1 second in nanoseconds subscribeSampleInterval uint64 = 1000000000 // 1 second in nanoseconds
// TODO: These gNMI options are adjusted to arista gNMI fork. Change when switching to native gNMI. // TODO: These gNMI options are adjusted to arista gNMI fork. Change when switching to native gNMI.
gNMISubscribeMode string = "once" gNMISubscribeMode string = "stream"
gNMIStreamMode string = "on_change" gNMIStreamMode string = "on_change"
) )
...@@ -63,24 +63,7 @@ func (n *NetworkElementWatcher) SubToNetworkElements(paths [][]string, opts *gnm ...@@ -63,24 +63,7 @@ func (n *NetworkElementWatcher) SubToNetworkElements(paths [][]string, opts *gnm
} }
for _, pnd := range pnds { for _, pnd := range pnds {
n.newSubscribeToPndNetworkElements(pnd.ID().String(), pnd, opts) n.subscribeToPndNetworkElements(pnd.ID().String(), pnd, opts)
}
}
func (n *NetworkElementWatcher) newSubscribeToPndNetworkElements(pndID string, pnd networkdomain.NetworkDomain, opts *gnmi.SubscribeOptions) {
for _, mne := range pnd.NetworkElements() {
gNMIOptionsCtx := context.Background()
gNMIOptionsCtx = context.WithValue(gNMIOptionsCtx, types.CtxKeyOpts, opts)
respChan := make(chan *gpb.SubscribeResponse)
go mne.Transport().NewControlPlaneSubscribe(gNMIOptionsCtx, &transport.SubscriptionInformation{
PndID: pndID,
NetworkElementID: mne.ID().String(),
NetworkElementName: mne.Name(),
}, respChan)
value := <-respChan
fmt.Println(value.Response)
} }
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment