Skip to content
Snippets Groups Projects
Commit 8badc591 authored by Fabian Seidl's avatar Fabian Seidl
Browse files

code cleanup

parent 61ad782c
No related branches found
No related tags found
3 merge requests!376Add additional example application hostname-checker,!343Add basic application framework and example application to show interaction between events an NBI,!342Resolve "Add an option to send gNMI Subscribe requests via SBI"
Pipeline #108332 passed
This commit is part of merge request !342. Comments created here will be created in the context of that merge request.
......@@ -81,10 +81,10 @@ func initialize() error {
}
c.deviceWatcher = nucleus.NewDeviceWatcher(c.pndStore)
//TODO: remove these calls after testing!
//TODO: Just an example for testing purposes, remove these calls after complete implementation of subscription handling!
c.deviceWatcher.SubToDevices([][]string{{"system", "config", "hostname"}})
go func() {
time.Sleep(15 * time.Second)
time.Sleep(5 * time.Second)
c.deviceWatcher.StopAndRemoveAllDeviceSubscriptions()
return
}()
......
......@@ -15,7 +15,8 @@ type Transport interface {
Get(ctx context.Context, params ...string) (interface{}, error)
Set(ctx context.Context, payload change.Payload, path string, schema *ytypes.Schema) error
Subscribe(ctx context.Context, params ...string) error
SubscribeInternal(ctx context.Context, subscribeCallbackFunc HandleSubscribeResponse, subscriptionInfo *SubscriptionInformation) error
SubscribeInternal(ctx context.Context, subscribeCallbackFunc HandleSubscribeResponse,
subscriptionInfo *SubscriptionInformation) error
Type() string
ProcessResponse(resp interface{}, root interface{}, models *ytypes.Schema) error
}
......
......@@ -45,6 +45,7 @@ func NewDeviceWatcher(pndStore networkdomain.PndStore) *DeviceWatcher {
// SubToDevices subscribes to every available device in each network domain with fixed gNMI subscription options (streaming in sample mode each second).
// Paths should be provided in the following format [][]string{{"system", "config", "hostname"}}
func (d *DeviceWatcher) SubToDevices(paths [][]string) {
// TODO: think about passing opts as parameter for more configurability
opts := &gnmi.SubscribeOptions{
Mode: gNMISubscribeMode,
StreamMode: gNMIStreamMode,
......@@ -70,7 +71,6 @@ func (d *DeviceWatcher) subscribeToPndDevices(pndID string, devices []device.Dev
return
}
// TODO: Correctly fill DeviceSubscription with upcoming stopping mechanism requirements.
stopContext, cancel := context.WithCancel(context.Background())
d.addToDeviceSubscriptions(subID, &deviceSubscriptionHelper{
stopSubscribeCtx: stopContext,
......@@ -98,7 +98,6 @@ func (d *DeviceWatcher) callSubscribe(stopContext context.Context, pndID string,
func (d *DeviceWatcher) addToDeviceSubscriptions(subID uuid.UUID, devSub *deviceSubscriptionHelper) {
//TODO: improve handling of subscriptions, like be able to expose to apps instead of just prionitng in controller!
log.Infof("Added new gNMI Subscription, ID: %v", subID)
d.deviceSubcriptions[subID] = devSub
}
......@@ -112,9 +111,7 @@ func (d *DeviceWatcher) StopAndRemoveAllDeviceSubscriptions() {
// StopAndRemoveDeviceSubscription passes a subscription uuid to stop the running subscription go routing and removes the entry from the map
// of device subscriptions.
func (d *DeviceWatcher) StopAndRemoveDeviceSubscription(subID uuid.UUID) {
subscribeHelper := d.deviceSubcriptions[subID]
subscribeHelper.stopFunc()
d.deviceSubcriptions[subID].stopFunc()
delete(d.deviceSubcriptions, subID)
}
......@@ -138,13 +135,13 @@ func handleSubscribeResponse(resp *gpb.SubscribeResponse, subscriptionInfo *tran
})
}
case *gpb.SubscribeResponse_Update:
//ExampleMessage:update:{timestamp:1657200848272415469 update:{path:{elem:{name:"system"} elem:{name:"config"} elem:{name:"hostname"}} val:{string_val:"ceos0"}}}
handleSubscribeResponseUpdate(resp, subscriptionInfo)
default:
log.Infof("Invalid SubscribeResponse, %v", resp)
}
}
func handleSubscribeResponseUpdate(resp *gpb.SubscribeResponse_Update, subscriptionInfo *transport.SubscriptionInformation) {
//ExampleMessage:update:{timestamp:1657200848272415469 update:{path:{elem:{name:"system"} elem:{name:"config"} elem:{name:"hostname"}} val:{string_val:"ceos0"}}}
// Not implemented yet, just a simple log to see if this works for now
// Not fully implemented yet, just a simple log to see if this works for now
log.Infof("Pnd: %s, Device: %s, Message: %s", subscriptionInfo.PndID, subscriptionInfo.DeviceID, resp.Update.String())
}
......@@ -309,7 +309,7 @@ func (g *Gnmi) getWithRequest(ctx context.Context, req *gpb.GetRequest) (interfa
return resp, nil
}
// Subscribe calls GNMI subscribe
// subscribe calls GNMI subscribe.
func (g *Gnmi) subscribe(ctx context.Context) error {
ctx = gnmi.NewContext(ctx, g.config)
opts, ok := ctx.Value(types.CtxKeyOpts).(*gnmi.SubscribeOptions)
......@@ -338,8 +338,10 @@ func (g *Gnmi) subscribe(ctx context.Context) error {
return gnmi.SubscribeErr(ctx, g.client, opts, g.RespChan)
}
// Subscribe calls GNMI subscribe
func (g *Gnmi) subscribeInternal(ctx context.Context, subcribeCallbackFunc func(*gpb.SubscribeResponse, *tpInterface.SubscriptionInformation), subscriptionInfo *tpInterface.SubscriptionInformation) error { // add callback function instead of chan string as parameter
// subscribeInternal calls gNMI subscribe with a callback for responses and additional device information including
// an option to stop the subscription.
func (g *Gnmi) subscribeInternal(ctx context.Context, subcribeCallbackFunc func(*gpb.SubscribeResponse,
*tpInterface.SubscriptionInformation), subscriptionInfo *tpInterface.SubscriptionInformation) error {
ctx = gnmi.NewContext(ctx, g.config)
opts, ok := ctx.Value(types.CtxKeyOpts).(*gnmi.SubscribeOptions)
if !ok {
......@@ -368,7 +370,6 @@ func (g *Gnmi) subscribeInternal(ctx context.Context, subcribeCallbackFunc func(
if err := subscriptionInfo.StopContext.Err(); err != nil {
log.Error(err)
}
log.Infof("StopContext for Device: %s was called and Subscription closed.", subscriptionInfo.DeviceName)
return
default:
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment