Skip to content
Snippets Groups Projects
Commit 20311d88 authored by Fabian Seidl's avatar Fabian Seidl
Browse files

first try once subscription

parent 1f8c21d4
Branches
Tags
No related merge requests found
Pipeline #123993 failed
This commit is part of merge request !405. Comments created here will be created in the context of that merge request.
...@@ -21,6 +21,7 @@ type Transport interface { ...@@ -21,6 +21,7 @@ type Transport interface {
Type() string Type() string
ProcessResponse(resp interface{}, root interface{}, models *ytypes.Schema) error ProcessResponse(resp interface{}, root interface{}, models *ytypes.Schema) error
ProcessControlPlaneSubscribeResponse(resp *gpb.SubscribeResponse_Update, root any, schema *ytypes.Schema) error ProcessControlPlaneSubscribeResponse(resp *gpb.SubscribeResponse_Update, root any, schema *ytypes.Schema) error
NewControlPlaneSubscribe(ctx context.Context, subscriptionInfo *SubscriptionInformation, respChan chan *gpb.SubscribeResponse) error
} }
type ( type (
......
...@@ -88,6 +88,20 @@ func (_m *Transport) Get(ctx context.Context, params ...string) (interface{}, er ...@@ -88,6 +88,20 @@ func (_m *Transport) Get(ctx context.Context, params ...string) (interface{}, er
return r0, r1 return r0, r1
} }
// NewControlPlaneSubscribe provides a mock function with given fields: ctx, subscriptionInfo, respChan
func (_m *Transport) NewControlPlaneSubscribe(ctx context.Context, subscriptionInfo *transport.SubscriptionInformation, respChan chan *gnmi.SubscribeResponse) error {
ret := _m.Called(ctx, subscriptionInfo, respChan)
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, *transport.SubscriptionInformation, chan *gnmi.SubscribeResponse) error); ok {
r0 = rf(ctx, subscriptionInfo, respChan)
} else {
r0 = ret.Error(0)
}
return r0
}
// ProcessControlPlaneSubscribeResponse provides a mock function with given fields: resp, root, schema // ProcessControlPlaneSubscribeResponse provides a mock function with given fields: resp, root, schema
func (_m *Transport) ProcessControlPlaneSubscribeResponse(resp *gnmi.SubscribeResponse_Update, root interface{}, schema *ytypes.Schema) error { func (_m *Transport) ProcessControlPlaneSubscribeResponse(resp *gnmi.SubscribeResponse_Update, root interface{}, schema *ytypes.Schema) error {
ret := _m.Called(resp, root, schema) ret := _m.Called(resp, root, schema)
......
...@@ -448,6 +448,29 @@ func (g *Gnmi) controlPlaneSubscribe(ctx context.Context, subcribeCallbackFunc f ...@@ -448,6 +448,29 @@ func (g *Gnmi) controlPlaneSubscribe(ctx context.Context, subcribeCallbackFunc f
return gnmi.SubscribeErr(ctx, g.client, opts, g.RespChan) return gnmi.SubscribeErr(ctx, g.client, opts, g.RespChan)
} }
func (g *Gnmi) NewControlPlaneSubscribe(ctx context.Context, subscriptionInfo *tpInterface.SubscriptionInformation, respChan chan *gpb.SubscribeResponse) error {
ctx = gnmi.NewContext(ctx, g.config)
opts, ok := ctx.Value(types.CtxKeyOpts).(*gnmi.SubscribeOptions)
if !ok {
return &customerrs.InvalidTypeAssertionError{
Value: ctx.Value(types.CtxKeyOpts),
Type: &gnmi.SubscribeOptions{},
}
}
go func() {
log.WithFields(log.Fields{
"address": opts.Target,
"paths": opts.Paths,
"mode": opts.Mode,
"interval": opts.SampleInterval,
}).Info("subscribed to gNMI target")
subResp := <-g.RespChan
respChan <- subResp
}()
return gnmi.SubscribeErr(ctx, g.client, opts, g.RespChan)
}
// Close calls GNMI close. // Close calls GNMI close.
func (g *Gnmi) Close() error { func (g *Gnmi) Close() error {
return nil return nil
......
...@@ -19,7 +19,7 @@ import ( ...@@ -19,7 +19,7 @@ import (
const ( const (
subscribeSampleInterval uint64 = 1000000000 // 1 second in nanoseconds subscribeSampleInterval uint64 = 1000000000 // 1 second in nanoseconds
// TODO: These gNMI options are adjusted to arista gNMI fork. Change when switching to native gNMI. // TODO: These gNMI options are adjusted to arista gNMI fork. Change when switching to native gNMI.
gNMISubscribeMode string = "stream" gNMISubscribeMode string = "once"
gNMIStreamMode string = "on_change" gNMIStreamMode string = "on_change"
) )
...@@ -63,7 +63,24 @@ func (n *NetworkElementWatcher) SubToNetworkElements(paths [][]string, opts *gnm ...@@ -63,7 +63,24 @@ func (n *NetworkElementWatcher) SubToNetworkElements(paths [][]string, opts *gnm
} }
for _, pnd := range pnds { for _, pnd := range pnds {
n.subscribeToPndNetworkElements(pnd.ID().String(), pnd, opts) n.newSubscribeToPndNetworkElements(pnd.ID().String(), pnd, opts)
}
}
func (n *NetworkElementWatcher) newSubscribeToPndNetworkElements(pndID string, pnd networkdomain.NetworkDomain, opts *gnmi.SubscribeOptions) {
for _, mne := range pnd.NetworkElements() {
gNMIOptionsCtx := context.Background()
gNMIOptionsCtx = context.WithValue(gNMIOptionsCtx, types.CtxKeyOpts, opts)
respChan := make(chan *gpb.SubscribeResponse)
go mne.Transport().NewControlPlaneSubscribe(gNMIOptionsCtx, &transport.SubscriptionInformation{
PndID: pndID,
NetworkElementID: mne.ID().String(),
NetworkElementName: mne.Name(),
}, respChan)
value := <-respChan
fmt.Println(value.Response)
} }
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment