package nucleus
import (
	"context"
	"fmt"

	"code.fbi.h-da.de/danet/gosdn/controller/interfaces/device"
	"code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkdomain"
	"code.fbi.h-da.de/danet/gosdn/controller/interfaces/transport"
	"code.fbi.h-da.de/danet/gosdn/controller/nucleus/errors"
	"code.fbi.h-da.de/danet/gosdn/controller/nucleus/types"
	"code.fbi.h-da.de/danet/gosdn/controller/store"
	"code.fbi.h-da.de/danet/gosdn/forks/goarista/gnmi"
	"github.com/google/uuid"
	gpb "github.com/openconfig/gnmi/proto/gnmi"
	log "github.com/sirupsen/logrus"
)
const (
	// subscribeSampleInterval is the sample interval of the default
	// subscription options: 1 second, expressed in nanoseconds.
	subscribeSampleInterval uint64 = 1000000000

	// TODO: These gNMI options are adjusted to arista gNMI fork. Change when switching to native gNMI.

	// gNMISubscribeMode is the Mode value of the default subscription options.
	gNMISubscribeMode string = "stream"
	// gNMIStreamMode is the StreamMode value of the default subscription options.
	gNMIStreamMode string = "sample"
)
// DeviceWatcher is a component that subscribes to devices via gNMI from within the controller and handles
// responses by triggering the internal event process.
type DeviceWatcher struct {
	// pnd is the network domain handed to NewDeviceWatcher.
	pnd networkdomain.NetworkDomain
	// pndStore is queried for the network domains whose devices are subscribed to.
	pndStore networkdomain.PndStore
	// deviceSubcriptions maps a subscription ID to the helper that can stop
	// the corresponding subscribe goroutine.
	deviceSubcriptions map[uuid.UUID]*deviceSubscriptionHelper
}
// deviceSubscriptionHelper is used to store information to stop a running subscribe go routine.
type deviceSubscriptionHelper struct {
	// stopSubscribeCtx is the cancellable context handed to the subscribe goroutine.
	stopSubscribeCtx context.Context
	// stopFunc is the cancel function that belongs to stopSubscribeCtx.
	stopFunc context.CancelFunc
}
// NewDeviceWatcher takes a pndStore to subscribe to device paths.
func NewDeviceWatcher(pnd networkdomain.NetworkDomain, pndStore networkdomain.PndStore) *DeviceWatcher {
pnd: pnd,
Fabian Seidl
committed
pndStore: pndStore,
deviceSubcriptions: make(map[uuid.UUID]*deviceSubscriptionHelper),
// SubToDevices subscribes to every available device in each network domain according to provided SubscribeOptions.
// Paths should be provided in the following format [][]string{{"system", "config", "hostname"}}
// SubscribeOptions can be nil. Use nil for a fixed, pre-defined set of gNMI subscription options (streaming in sample mode each second).
func (d *DeviceWatcher) SubToDevices(paths [][]string, opts *gnmi.SubscribeOptions) {
if opts == nil {
opts = &gnmi.SubscribeOptions{
Mode: gNMISubscribeMode,
StreamMode: gNMIStreamMode,
Paths: paths,
SampleInterval: subscribeSampleInterval,
}
}
pnds, err := d.pndStore.GetAll()
if err != nil {
log.Error(err)
}
for _, pnd := range pnds {
Fabian Seidl
committed
d.subscribeToPndDevices(pnd.ID().String(), pnd.Devices(), opts)
Fabian Seidl
committed
func (d *DeviceWatcher) subscribeToPndDevices(pndID string, devices []device.Device, opts *gnmi.SubscribeOptions) {
for _, device := range devices {
Fabian Seidl
committed
stopContext, cancel := context.WithCancel(context.Background())
d.addToDeviceSubscriptions(subID, &deviceSubscriptionHelper{
stopSubscribeCtx: stopContext,
stopFunc: cancel,
})
go d.callSubscribe(stopContext, pndID, device, opts)
Fabian Seidl
committed
func (d *DeviceWatcher) callSubscribe(stopContext context.Context, pndID string, device device.Device, opts *gnmi.SubscribeOptions) {
gNMIOptionsCtx := context.Background()
gNMIOptionsCtx = context.WithValue(gNMIOptionsCtx, types.CtxKeyOpts, opts)
// SubscriptionInformation contains pnd ID, device ID and name to be used in the internal subscribe to check
if err := device.Transport().ControlPlaneSubscribe(gNMIOptionsCtx, d.handleSubscribeResponse, &transport.SubscriptionInformation{
Fabian Seidl
committed
PndID: pndID,
DeviceID: device.ID().String(),
DeviceName: device.Name(),
StopContext: stopContext,
}); err != nil {
log.Error(err)
}
}
func (d *DeviceWatcher) addToDeviceSubscriptions(subID uuid.UUID, devSub *deviceSubscriptionHelper) {
//TODO: improve handling of subscriptions, like be able to expose to apps so specific subscriptions instead of only all can be stopped in the future
Fabian Seidl
committed
d.deviceSubcriptions[subID] = devSub
}
// StopAndRemoveAllDeviceSubscriptions stops and removes all the available running subscriptions.
func (d *DeviceWatcher) StopAndRemoveAllDeviceSubscriptions() {
for key := range d.deviceSubcriptions {
d.StopAndRemoveDeviceSubscription(key)
Fabian Seidl
committed
// StopAndRemoveDeviceSubscription passes a subscription uuid to stop the running subscription go routing and removes the entry from the map
// of device subscriptions.
func (d *DeviceWatcher) StopAndRemoveDeviceSubscription(subID uuid.UUID) {
Fabian Seidl
committed
delete(d.deviceSubcriptions, subID)
}
// handleSubscribeResponse takes the subscribe response and additional information about the device to distinguish
// from which device a subscribe response was sent including improved error handling
func (d *DeviceWatcher) handleSubscribeResponse(resp *gpb.SubscribeResponse, subscriptionInfo *transport.SubscriptionInformation) {
Fabian Seidl
committed
switch resp := resp.Response.(type) {
case *gpb.SubscribeResponse_Error:
Fabian Seidl
committed
log.Error(&errors.ErrSubscribeResponse{
PndID: subscriptionInfo.PndID,
DeviceID: subscriptionInfo.DeviceID,
DeviceName: subscriptionInfo.DeviceName,
Err: fmt.Sprintf("SubscribeResponse_Error"),
Fabian Seidl
committed
})
case *gpb.SubscribeResponse_SyncResponse:
if !resp.SyncResponse {
Fabian Seidl
committed
log.Error(&errors.ErrSubscribeSyncResponse{
PndID: subscriptionInfo.PndID,
DeviceID: subscriptionInfo.DeviceID,
DeviceName: subscriptionInfo.DeviceName,
Fabian Seidl
committed
})
}
case *gpb.SubscribeResponse_Update:
d.handleSubscribeResponseUpdate(resp, subscriptionInfo)
default:
log.Infof("Invalid SubscribeResponse, %v", resp)
Fabian Seidl
committed
}
Fabian Seidl
committed
func (d *DeviceWatcher) handleSubscribeResponseUpdate(resp *gpb.SubscribeResponse_Update, subscriptionInfo *transport.SubscriptionInformation) {
// Not fully implemented yet, just a simple log to see if this works for now
Fabian Seidl
committed
log.Infof("Pnd: %s, Device: %s, Message: %s", subscriptionInfo.PndID, subscriptionInfo.DeviceID, resp.Update.String())
pndID, err := uuid.Parse(subscriptionInfo.PndID)
if err != nil {
log.Error(err)
}
pnd, err := d.pndStore.Get(store.Query{ID: pndID})
if err != nil {
log.Error(err)
}
device, err := pnd.GetDevice(subscriptionInfo.DeviceID)
if err != nil {
log.Error(err)
}
err = device.Transport().ProcessControlPlaneSubscribeResponse(resp, device.GetModel(), device.SBI().Schema())
if err != nil {
log.Error(err)
} else {
if err := pnd.UpdateONDAfterSubscribeResponse(device); err != nil {
log.Error(err)
}
}