diff --git a/controller/controller.go b/controller/controller.go index b8067db64acae00f467d6863d82020a0276c0291..99a57a18071db6fdac0bcb5644243ee26b9c866c 100644 --- a/controller/controller.go +++ b/controller/controller.go @@ -81,10 +81,10 @@ func initialize() error { } c.deviceWatcher = nucleus.NewDeviceWatcher(c.pndStore) - //TODO: remove these calls after testing! + //TODO: Just an example for testing purposes, remove these calls after complete implementation of subscription handling! c.deviceWatcher.SubToDevices([][]string{{"system", "config", "hostname"}}) go func() { - time.Sleep(15 * time.Second) + time.Sleep(5 * time.Second) c.deviceWatcher.StopAndRemoveAllDeviceSubscriptions() return }() diff --git a/controller/interfaces/transport/transport.go b/controller/interfaces/transport/transport.go index 59e7b9a22c2e2530c2eac61645cffee1cd7bcda1..a20aa2a553a50400403b8802b7e400df7a9f75d5 100644 --- a/controller/interfaces/transport/transport.go +++ b/controller/interfaces/transport/transport.go @@ -15,7 +15,8 @@ type Transport interface { Get(ctx context.Context, params ...string) (interface{}, error) Set(ctx context.Context, payload change.Payload, path string, schema *ytypes.Schema) error Subscribe(ctx context.Context, params ...string) error - SubscribeInternal(ctx context.Context, subscribeCallbackFunc HandleSubscribeResponse, subscriptionInfo *SubscriptionInformation) error + SubscribeInternal(ctx context.Context, subscribeCallbackFunc HandleSubscribeResponse, + subscriptionInfo *SubscriptionInformation) error Type() string ProcessResponse(resp interface{}, root interface{}, models *ytypes.Schema) error } diff --git a/controller/nucleus/deviceWatcher.go b/controller/nucleus/deviceWatcher.go index 2ca6f1911d5b43c64282edb363e42fa92205c480..d27e98cb8e688fcef6c4ddb1a99513024d714b03 100644 --- a/controller/nucleus/deviceWatcher.go +++ b/controller/nucleus/deviceWatcher.go @@ -45,6 +45,7 @@ func NewDeviceWatcher(pndStore networkdomain.PndStore) *DeviceWatcher { // SubToDevices subscribes to every available device in each network domain with fixed gNMI subscription options (streaming in sample mode each second). // Paths should be provided in the following format [][]string{{"system", "config", "hostname"}} func (d *DeviceWatcher) SubToDevices(paths [][]string) { + // TODO: think about passing opts as parameter for more configurability opts := &gnmi.SubscribeOptions{ Mode: gNMISubscribeMode, StreamMode: gNMIStreamMode, @@ -70,7 +71,6 @@ func (d *DeviceWatcher) subscribeToPndDevices(pndID string, devices []device.Dev return } - // TODO: Correctly fill DeviceSubscription with upcoming stopping mechanism requirements. stopContext, cancel := context.WithCancel(context.Background()) d.addToDeviceSubscriptions(subID, &deviceSubscriptionHelper{ stopSubscribeCtx: stopContext, @@ -98,7 +98,6 @@ func (d *DeviceWatcher) callSubscribe(stopContext context.Context, pndID string, func (d *DeviceWatcher) addToDeviceSubscriptions(subID uuid.UUID, devSub *deviceSubscriptionHelper) { //TODO: improve handling of subscriptions, like be able to expose to apps instead of just prionitng in controller! - log.Infof("Added new gNMI Subscription, ID: %v", subID) d.deviceSubcriptions[subID] = devSub } @@ -112,9 +111,7 @@ func (d *DeviceWatcher) StopAndRemoveAllDeviceSubscriptions() { // StopAndRemoveDeviceSubscription passes a subscription uuid to stop the running subscription go routing and removes the entry from the map // of device subscriptions. func (d *DeviceWatcher) StopAndRemoveDeviceSubscription(subID uuid.UUID) { - subscribeHelper := d.deviceSubcriptions[subID] - subscribeHelper.stopFunc() - + d.deviceSubcriptions[subID].stopFunc() delete(d.deviceSubcriptions, subID) } @@ -138,13 +135,13 @@ func handleSubscribeResponse(resp *gpb.SubscribeResponse, subscriptionInfo *tran }) } case *gpb.SubscribeResponse_Update: - //ExampleMessage:update:{timestamp:1657200848272415469 update:{path:{elem:{name:"system"} elem:{name:"config"} elem:{name:"hostname"}} val:{string_val:"ceos0"}}} handleSubscribeResponseUpdate(resp, subscriptionInfo) + default: + log.Infof("Invalid SubscribeResponse, %v", resp) } } func handleSubscribeResponseUpdate(resp *gpb.SubscribeResponse_Update, subscriptionInfo *transport.SubscriptionInformation) { - //ExampleMessage:update:{timestamp:1657200848272415469 update:{path:{elem:{name:"system"} elem:{name:"config"} elem:{name:"hostname"}} val:{string_val:"ceos0"}}} - // Not implemented yet, just a simple log to see if this works for now + // Not fully implemented yet, just a simple log to see if this works for now log.Infof("Pnd: %s, Device: %s, Message: %s", subscriptionInfo.PndID, subscriptionInfo.DeviceID, resp.Update.String()) } diff --git a/controller/nucleus/gnmi_transport.go b/controller/nucleus/gnmi_transport.go index f252b56a261f91f1cbe48b54f3539bcdef729d06..6b8e4a90b13d822468385171342fa1187042871b 100644 --- a/controller/nucleus/gnmi_transport.go +++ b/controller/nucleus/gnmi_transport.go @@ -309,7 +309,7 @@ func (g *Gnmi) getWithRequest(ctx context.Context, req *gpb.GetRequest) (interfa return resp, nil } -// Subscribe calls GNMI subscribe +// subscribe calls GNMI subscribe. func (g *Gnmi) subscribe(ctx context.Context) error { ctx = gnmi.NewContext(ctx, g.config) opts, ok := ctx.Value(types.CtxKeyOpts).(*gnmi.SubscribeOptions) @@ -338,8 +338,10 @@ func (g *Gnmi) subscribe(ctx context.Context) error { return gnmi.SubscribeErr(ctx, g.client, opts, g.RespChan) } -// Subscribe calls GNMI subscribe -func (g *Gnmi) subscribeInternal(ctx context.Context, subcribeCallbackFunc func(*gpb.SubscribeResponse, *tpInterface.SubscriptionInformation), subscriptionInfo *tpInterface.SubscriptionInformation) error { // add callback function instead of chan string as parameter +// subscribeInternal calls gNMI subscribe with a callback for responses and additional device information including +// an option to stop the subscription. +func (g *Gnmi) subscribeInternal(ctx context.Context, subcribeCallbackFunc func(*gpb.SubscribeResponse, + *tpInterface.SubscriptionInformation), subscriptionInfo *tpInterface.SubscriptionInformation) error { ctx = gnmi.NewContext(ctx, g.config) opts, ok := ctx.Value(types.CtxKeyOpts).(*gnmi.SubscribeOptions) if !ok { @@ -368,7 +370,6 @@ func (g *Gnmi) subscribeInternal(ctx context.Context, subcribeCallbackFunc func( if err := subscriptionInfo.StopContext.Err(); err != nil { log.Error(err) } - log.Infof("StopContext for Device: %s was called and Subscription closed.", subscriptionInfo.DeviceName) return default: }