Skip to content
Snippets Groups Projects

Resolve "Add an option to send gNMI Subscribe requests via SBI"

1 file
+ 11
4
Compare changes
  • Side-by-side
  • Inline
@@ -8,6 +8,7 @@ import (
@@ -8,6 +8,7 @@ import (
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/transport"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/transport"
"code.fbi.h-da.de/danet/gosdn/controller/nucleus/errors"
"code.fbi.h-da.de/danet/gosdn/controller/nucleus/errors"
"code.fbi.h-da.de/danet/gosdn/controller/nucleus/types"
"code.fbi.h-da.de/danet/gosdn/controller/nucleus/types"
 
"code.fbi.h-da.de/danet/gosdn/controller/store"
"code.fbi.h-da.de/danet/gosdn/forks/goarista/gnmi"
"code.fbi.h-da.de/danet/gosdn/forks/goarista/gnmi"
"github.com/google/uuid"
"github.com/google/uuid"
gpb "github.com/openconfig/gnmi/proto/gnmi"
gpb "github.com/openconfig/gnmi/proto/gnmi"
@@ -42,6 +43,10 @@ func NewDeviceWatcher(pndStore networkdomain.PndStore) *DeviceWatcher {
@@ -42,6 +43,10 @@ func NewDeviceWatcher(pndStore networkdomain.PndStore) *DeviceWatcher {
}
}
}
}
 
func (d *DeviceWatcher) GetDeviceWatcher() *DeviceWatcher {
 
return d
 
}
 
// SubToDevices subscribes to every available device in each network domain according to provided SubscribeOptions.
// SubToDevices subscribes to every available device in each network domain according to provided SubscribeOptions.
// Paths should be provided in the following format [][]string{{"system", "config", "hostname"}}
// Paths should be provided in the following format [][]string{{"system", "config", "hostname"}}
// SubscribeOptions can be nil. Use nil for a fixed, pre-defined set of gNMI subscription options (streaming in sample mode each second).
// SubscribeOptions can be nil. Use nil for a fixed, pre-defined set of gNMI subscription options (streaming in sample mode each second).
@@ -84,7 +89,7 @@ func (d *DeviceWatcher) callSubscribe(stopContext context.Context, pndID string,
@@ -84,7 +89,7 @@ func (d *DeviceWatcher) callSubscribe(stopContext context.Context, pndID string,
// SubscriptionInformation conatins pnd ID, device ID and name to be used in the internal subscribe to check
// SubscriptionInformation conatins pnd ID, device ID and name to be used in the internal subscribe to check
// from which device a response was sent
// from which device a response was sent
if err := device.Transport().SubscribeInternal(gNMIOptionsCtx, handleSubscribeResponse, &transport.SubscriptionInformation{
if err := device.Transport().SubscribeInternal(gNMIOptionsCtx, d.handleSubscribeResponse, &transport.SubscriptionInformation{
PndID: pndID,
PndID: pndID,
DeviceID: device.ID().String(),
DeviceID: device.ID().String(),
DeviceName: device.Name(),
DeviceName: device.Name(),
@@ -115,7 +120,7 @@ func (d *DeviceWatcher) StopAndRemoveDeviceSubscription(subID uuid.UUID) {
@@ -115,7 +120,7 @@ func (d *DeviceWatcher) StopAndRemoveDeviceSubscription(subID uuid.UUID) {
// handleSubscribeResponse takes the subscribe response and additional information about the device to distinguish
// handleSubscribeResponse takes the subscribe response and additional information about the device to distinguish
// from which device a subscribe response was sent including improved error handling
// from which device a subscribe response was sent including improved error handling
func handleSubscribeResponse(resp *gpb.SubscribeResponse, subscriptionInfo *transport.SubscriptionInformation) {
func (d *DeviceWatcher) handleSubscribeResponse(resp *gpb.SubscribeResponse, subscriptionInfo *transport.SubscriptionInformation) {
switch resp := resp.Response.(type) {
switch resp := resp.Response.(type) {
case *gpb.SubscribeResponse_Error:
case *gpb.SubscribeResponse_Error:
log.Error(&errors.ErrSubscribeResponse{
log.Error(&errors.ErrSubscribeResponse{
@@ -133,13 +138,15 @@ func handleSubscribeResponse(resp *gpb.SubscribeResponse, subscriptionInfo *tran
@@ -133,13 +138,15 @@ func handleSubscribeResponse(resp *gpb.SubscribeResponse, subscriptionInfo *tran
})
})
}
}
case *gpb.SubscribeResponse_Update:
case *gpb.SubscribeResponse_Update:
handleSubscribeResponseUpdate(resp, subscriptionInfo)
d.handleSubscribeResponseUpdate(resp, subscriptionInfo)
default:
default:
log.Infof("Invalid SubscribeResponse, %v", resp)
log.Infof("Invalid SubscribeResponse, %v", resp)
}
}
}
}
func handleSubscribeResponseUpdate(resp *gpb.SubscribeResponse_Update, subscriptionInfo *transport.SubscriptionInformation) {
func (d *DeviceWatcher) handleSubscribeResponseUpdate(resp *gpb.SubscribeResponse_Update, subscriptionInfo *transport.SubscriptionInformation) {
// Not fully implemented yet, just a simple log to see if this works for now
// Not fully implemented yet, just a simple log to see if this works for now
log.Infof("Pnd: %s, Device: %s, Message: %s", subscriptionInfo.PndID, subscriptionInfo.DeviceID, resp.Update.String())
log.Infof("Pnd: %s, Device: %s, Message: %s", subscriptionInfo.PndID, subscriptionInfo.DeviceID, resp.Update.String())
 
pnd, _ := d.pndStore.Get(store.Query{ID: uuid.MustParse(subscriptionInfo.PndID)})
 
log.Infof("Pnd from storage %v", pnd)
}
}
Loading