Skip to content
Snippets Groups Projects
Commit a19bd256 authored by Fabian Seidl's avatar Fabian Seidl
Browse files

first try once subscription

parent e74a39a1
No related branches found
No related tags found
1 merge request!405Resolve "To improve the device watching mechanism a fetch all after time interval method should be implemented"
This commit is part of merge request !405. Comments created here will be created in the context of that merge request.
......@@ -21,6 +21,7 @@ type Transport interface {
Type() string
ProcessResponse(resp interface{}, root interface{}, models *ytypes.Schema) error
ProcessControlPlaneSubscribeResponse(resp *gpb.SubscribeResponse_Update, root any, schema *ytypes.Schema) error
NewControlPlaneSubscribe(ctx context.Context, subscriptionInfo *SubscriptionInformation, respChan chan *gpb.SubscribeResponse) error
}
type (
......
......@@ -88,6 +88,20 @@ func (_m *Transport) Get(ctx context.Context, params ...string) (interface{}, er
return r0, r1
}
// NewControlPlaneSubscribe provides a mock function with given fields: ctx, subscriptionInfo, respChan
func (_m *Transport) NewControlPlaneSubscribe(ctx context.Context, subscriptionInfo *transport.SubscriptionInformation, respChan chan *gnmi.SubscribeResponse) error {
ret := _m.Called(ctx, subscriptionInfo, respChan)
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, *transport.SubscriptionInformation, chan *gnmi.SubscribeResponse) error); ok {
r0 = rf(ctx, subscriptionInfo, respChan)
} else {
r0 = ret.Error(0)
}
return r0
}
// ProcessControlPlaneSubscribeResponse provides a mock function with given fields: resp, root, schema
func (_m *Transport) ProcessControlPlaneSubscribeResponse(resp *gnmi.SubscribeResponse_Update, root interface{}, schema *ytypes.Schema) error {
ret := _m.Called(resp, root, schema)
......
......@@ -448,6 +448,29 @@ func (g *Gnmi) controlPlaneSubscribe(ctx context.Context, subcribeCallbackFunc f
return gnmi.SubscribeErr(ctx, g.client, opts, g.RespChan)
}
func (g *Gnmi) NewControlPlaneSubscribe(ctx context.Context, subscriptionInfo *tpInterface.SubscriptionInformation, respChan chan *gpb.SubscribeResponse) error {
ctx = gnmi.NewContext(ctx, g.config)
opts, ok := ctx.Value(types.CtxKeyOpts).(*gnmi.SubscribeOptions)
if !ok {
return &customerrs.InvalidTypeAssertionError{
Value: ctx.Value(types.CtxKeyOpts),
Type: &gnmi.SubscribeOptions{},
}
}
go func() {
log.WithFields(log.Fields{
"address": opts.Target,
"paths": opts.Paths,
"mode": opts.Mode,
"interval": opts.SampleInterval,
}).Info("subscribed to gNMI target")
subResp := <-g.RespChan
respChan <- subResp
}()
return gnmi.SubscribeErr(ctx, g.client, opts, g.RespChan)
}
// Close calls GNMI close.
func (g *Gnmi) Close() error {
return nil
......
......@@ -19,7 +19,7 @@ import (
const (
subscribeSampleInterval uint64 = 1000000000 // 1 second in nanoseconds
// TODO: These gNMI options are adjusted to arista gNMI fork. Change when switching to native gNMI.
gNMISubscribeMode string = "stream"
gNMISubscribeMode string = "once"
gNMIStreamMode string = "on_change"
)
......@@ -63,7 +63,24 @@ func (n *NetworkElementWatcher) SubToNetworkElements(paths [][]string, opts *gnm
}
for _, pnd := range pnds {
n.subscribeToPndNetworkElements(pnd.ID().String(), pnd, opts)
n.newSubscribeToPndNetworkElements(pnd.ID().String(), pnd, opts)
}
}
func (n *NetworkElementWatcher) newSubscribeToPndNetworkElements(pndID string, pnd networkdomain.NetworkDomain, opts *gnmi.SubscribeOptions) {
for _, mne := range pnd.NetworkElements() {
gNMIOptionsCtx := context.Background()
gNMIOptionsCtx = context.WithValue(gNMIOptionsCtx, types.CtxKeyOpts, opts)
respChan := make(chan *gpb.SubscribeResponse)
go mne.Transport().NewControlPlaneSubscribe(gNMIOptionsCtx, &transport.SubscriptionInformation{
PndID: pndID,
NetworkElementID: mne.ID().String(),
NetworkElementName: mne.Name(),
}, respChan)
value := <-respChan
fmt.Println(value.Response)
}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment