package nucleus import ( "context" "fmt" "code.fbi.h-da.de/danet/gosdn/controller/interfaces/device" "code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkdomain" "code.fbi.h-da.de/danet/gosdn/controller/interfaces/transport" "code.fbi.h-da.de/danet/gosdn/controller/nucleus/errors" "code.fbi.h-da.de/danet/gosdn/controller/nucleus/types" "code.fbi.h-da.de/danet/gosdn/controller/store" "code.fbi.h-da.de/danet/gosdn/forks/goarista/gnmi" "github.com/google/uuid" gpb "github.com/openconfig/gnmi/proto/gnmi" log "github.com/sirupsen/logrus" ) const ( subscribeSampleInterval uint64 = 1000000000 // 1 second in nanoseconds // TODO: These gNMI options are adjusted to arista gNMI fork. Change when switching to native gNMI. gNMISubscribeMode string = "stream" gNMIStreamMode string = "on_change" ) // DeviceWatcher is a component that subscribes to devices via gNMI from within the controller and handles // responses by triggering the internal event process. type DeviceWatcher struct { pndStore networkdomain.PndStore deviceSubcriptions map[uuid.UUID]*deviceSubscriptionHelper } // deviceSubscriptionHelper is used to store information to stop a running subscribe go routine. type deviceSubscriptionHelper struct { stopSubscribeCtx context.Context stopFunc context.CancelFunc } // NewDeviceWatcher takes a pndStore to subscribe to device paths. func NewDeviceWatcher(pndStore networkdomain.PndStore) *DeviceWatcher { return &DeviceWatcher{ pndStore: pndStore, deviceSubcriptions: make(map[uuid.UUID]*deviceSubscriptionHelper), } } // SubToDevices subscribes to every available device in each network domain according to provided SubscribeOptions. // Paths should be provided in the following format [][]string{{"system", "config", "hostname"}} // SubscribeOptions can be nil. Use nil for a fixed, pre-defined set of gNMI subscription options (streaming in sample mode each second). func (d *DeviceWatcher) SubToDevices(paths [][]string, opts *gnmi.SubscribeOptions) { if opts == nil { opts = &gnmi.SubscribeOptions{ Mode: gNMISubscribeMode, StreamMode: gNMIStreamMode, Paths: paths, SampleInterval: subscribeSampleInterval, } } pnds, err := d.pndStore.GetAll() if err != nil { log.Error(err) } for _, pnd := range pnds { d.subscribeToPndDevices(pnd.ID().String(), pnd, opts) } } func (d *DeviceWatcher) subscribeToPndDevices(pndID string, pnd networkdomain.NetworkDomain, opts *gnmi.SubscribeOptions) { for _, loadedDevice := range pnd.Devices() { device, _ := loadedDevice.ConvertToDevice() subID := uuid.New() stopContext, cancel := context.WithCancel(context.Background()) d.addToDeviceSubscriptions(subID, &deviceSubscriptionHelper{ stopSubscribeCtx: stopContext, stopFunc: cancel, }) go d.callSubscribe(stopContext, pndID, device, opts) } } func (d *DeviceWatcher) callSubscribe(stopContext context.Context, pndID string, device device.Device, opts *gnmi.SubscribeOptions) { gNMIOptionsCtx := context.Background() gNMIOptionsCtx = context.WithValue(gNMIOptionsCtx, types.CtxKeyOpts, opts) // SubscriptionInformation contains pnd ID, device ID and name to be used in the internal subscribe to check // from which device a response was sent if err := device.Transport().ControlPlaneSubscribe(gNMIOptionsCtx, d.handleSubscribeResponse, &transport.SubscriptionInformation{ PndID: pndID, DeviceID: device.ID().String(), DeviceName: device.Name(), StopContext: stopContext, }); err != nil { log.Error(err) } } func (d *DeviceWatcher) addToDeviceSubscriptions(subID uuid.UUID, devSub *deviceSubscriptionHelper) { //TODO: improve handling of subscriptions, like be able to expose to apps so specific subscriptions instead of only all can be stopped in the future d.deviceSubcriptions[subID] = devSub } // StopAndRemoveAllDeviceSubscriptions stops and removes all the available running subscriptions. func (d *DeviceWatcher) StopAndRemoveAllDeviceSubscriptions() { for key := range d.deviceSubcriptions { d.StopAndRemoveDeviceSubscription(key) } } // StopAndRemoveDeviceSubscription passes a subscription uuid to stop the running subscription go routing and removes the entry from the map // of device subscriptions. func (d *DeviceWatcher) StopAndRemoveDeviceSubscription(subID uuid.UUID) { d.deviceSubcriptions[subID].stopFunc() delete(d.deviceSubcriptions, subID) } // handleSubscribeResponse takes the subscribe response and additional information about the device to distinguish // from which device a subscribe response was sent including improved error handling. func (d *DeviceWatcher) handleSubscribeResponse(resp *gpb.SubscribeResponse, subscriptionInfo *transport.SubscriptionInformation) { switch resp := resp.Response.(type) { case *gpb.SubscribeResponse_Error: log.Error(&errors.ErrSubscribeResponse{ PndID: subscriptionInfo.PndID, DeviceID: subscriptionInfo.DeviceID, DeviceName: subscriptionInfo.DeviceName, Err: fmt.Sprint("SubscribeResponse_Error"), }) case *gpb.SubscribeResponse_SyncResponse: if !resp.SyncResponse { log.Error(&errors.ErrSubscribeSyncResponse{ PndID: subscriptionInfo.PndID, DeviceID: subscriptionInfo.DeviceID, DeviceName: subscriptionInfo.DeviceName, }) } case *gpb.SubscribeResponse_Update: d.handleSubscribeResponseUpdate(resp, subscriptionInfo) default: log.Infof("Invalid SubscribeResponse, %v", resp) } } func (d *DeviceWatcher) handleSubscribeResponseUpdate(resp *gpb.SubscribeResponse_Update, subscriptionInfo *transport.SubscriptionInformation) { pndID, err := uuid.Parse(subscriptionInfo.PndID) if err != nil { log.Error(err) } pnd, err := d.pndStore.Get(store.Query{ID: pndID}) if err != nil { log.Error(err) } device, err := pnd.GetDevice(subscriptionInfo.DeviceID) if err != nil { log.Error(err) } err = device.Transport().ProcessControlPlaneSubscribeResponse(resp, device.GetModel(), device.SBI().Schema()) if err != nil { log.Error(err) } else { if err := pnd.UpdateDeviceAfterSubscribeResponse(device); err != nil { log.Error(err) } } }