Skip to content
Snippets Groups Projects
deviceWatcher.go 6.27 KiB
Newer Older
  • Learn to ignore specific revisions
  • package nucleus
    
    import (
    	"context"
    
    
    	"code.fbi.h-da.de/danet/gosdn/controller/interfaces/device"
    	"code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkdomain"
    
    	"code.fbi.h-da.de/danet/gosdn/controller/interfaces/transport"
    
    	"code.fbi.h-da.de/danet/gosdn/controller/nucleus/errors"
    
    	"code.fbi.h-da.de/danet/gosdn/controller/nucleus/types"
    
    	"code.fbi.h-da.de/danet/gosdn/controller/store"
    
    	"code.fbi.h-da.de/danet/gosdn/forks/goarista/gnmi"
    
    Fabian Seidl's avatar
    Fabian Seidl committed
    	"github.com/google/uuid"
    
    	gpb "github.com/openconfig/gnmi/proto/gnmi"
    	log "github.com/sirupsen/logrus"
    )
    
    const (
    	subscribeSampleInterval uint64 = 1000000000 // 1 second in nanoseconds
    
    	// TODO: These gNMI options are adjusted to arista gNMI fork. Change when switching to native gNMI.
    
    	gNMISubscribeMode string = "stream"
    	gNMIStreamMode    string = "sample"
    )
    
    // DeviceWatcher is a component that subscribes to devices via gNMI from within the controller and handles
    // responses by triggering the internal event process.
    type DeviceWatcher struct {
    
    	pndStore           networkdomain.PndStore
    	deviceSubcriptions map[uuid.UUID]*deviceSubscriptionHelper
    }
    
    // deviceSubscriptionHelper is used to store information to stop a running subscribe go routine.
    type deviceSubscriptionHelper struct {
    	stopSubscribeCtx context.Context
    	stopFunc         context.CancelFunc
    
    }
    
    // NewDeviceWatcher takes a pndStore to subscribe to device paths.
    
    func NewDeviceWatcher(pnd networkdomain.NetworkDomain, pndStore networkdomain.PndStore) *DeviceWatcher {
    
    	return &DeviceWatcher{
    
    		pndStore:           pndStore,
    		deviceSubcriptions: make(map[uuid.UUID]*deviceSubscriptionHelper),
    
    // SubToDevices subscribes to every available device in each network domain according to provided SubscribeOptions.
    
    // Paths should be provided in the following format [][]string{{"system", "config", "hostname"}}
    
    // SubscribeOptions can be nil. Use nil for a fixed, pre-defined set of gNMI subscription options (streaming in sample mode each second).
    func (d *DeviceWatcher) SubToDevices(paths [][]string, opts *gnmi.SubscribeOptions) {
    	if opts == nil {
    		opts = &gnmi.SubscribeOptions{
    			Mode:           gNMISubscribeMode,
    			StreamMode:     gNMIStreamMode,
    			Paths:          paths,
    			SampleInterval: subscribeSampleInterval,
    		}
    
    	}
    
    	pnds, err := d.pndStore.GetAll()
    	if err != nil {
    		log.Error(err)
    	}
    
    	for _, pnd := range pnds {
    
    		d.subscribeToPndDevices(pnd.ID().String(), pnd.Devices(), opts)
    
    func (d *DeviceWatcher) subscribeToPndDevices(pndID string, devices []device.Device, opts *gnmi.SubscribeOptions) {
    	for _, device := range devices {
    
    Fabian Seidl's avatar
    Fabian Seidl committed
    		subID := uuid.New()
    
    
    		stopContext, cancel := context.WithCancel(context.Background())
    		d.addToDeviceSubscriptions(subID, &deviceSubscriptionHelper{
    			stopSubscribeCtx: stopContext,
    			stopFunc:         cancel,
    		})
    		go d.callSubscribe(stopContext, pndID, device, opts)
    
    func (d *DeviceWatcher) callSubscribe(stopContext context.Context, pndID string, device device.Device, opts *gnmi.SubscribeOptions) {
    	gNMIOptionsCtx := context.Background()
    	gNMIOptionsCtx = context.WithValue(gNMIOptionsCtx, types.CtxKeyOpts, opts)
    
    Andre Sterba's avatar
    Andre Sterba committed
    	// SubscriptionInformation contains pnd ID, device ID and name to be used in the internal subscribe to check
    
    Fabian Seidl's avatar
    Fabian Seidl committed
    	// from which device a response was sent
    
    	if err := device.Transport().ControlPlaneSubscribe(gNMIOptionsCtx, d.handleSubscribeResponse, &transport.SubscriptionInformation{
    
    		PndID:       pndID,
    		DeviceID:    device.ID().String(),
    		DeviceName:  device.Name(),
    		StopContext: stopContext,
    	}); err != nil {
    		log.Error(err)
    	}
    }
    
    func (d *DeviceWatcher) addToDeviceSubscriptions(subID uuid.UUID, devSub *deviceSubscriptionHelper) {
    
    	//TODO: improve handling of subscriptions, like be able to expose to apps so specific subscriptions instead of only all can be stopped in the future
    
    	d.deviceSubcriptions[subID] = devSub
    }
    
    // StopAndRemoveAllDeviceSubscriptions stops and removes all the available running subscriptions.
    func (d *DeviceWatcher) StopAndRemoveAllDeviceSubscriptions() {
    	for key := range d.deviceSubcriptions {
    		d.StopAndRemoveDeviceSubscription(key)
    
    // StopAndRemoveDeviceSubscription passes a subscription uuid to stop the running subscription go routing and removes the entry from the map
    // of device subscriptions.
    func (d *DeviceWatcher) StopAndRemoveDeviceSubscription(subID uuid.UUID) {
    
    Fabian Seidl's avatar
    Fabian Seidl committed
    	d.deviceSubcriptions[subID].stopFunc()
    
    // handleSubscribeResponse takes the subscribe response and additional information about the device to distinguish
    // from which device a subscribe response was sent including improved error handling
    
    func (d *DeviceWatcher) handleSubscribeResponse(resp *gpb.SubscribeResponse, subscriptionInfo *transport.SubscriptionInformation) {
    
    	switch resp := resp.Response.(type) {
    	case *gpb.SubscribeResponse_Error:
    
    		log.Error(&errors.ErrSubscribeResponse{
    			PndID:      subscriptionInfo.PndID,
    			DeviceID:   subscriptionInfo.DeviceID,
    			DeviceName: subscriptionInfo.DeviceName,
    
    			Err:        fmt.Sprintf("SubscribeResponse_Error"),
    
    		})
    	case *gpb.SubscribeResponse_SyncResponse:
    		if !resp.SyncResponse {
    
    			log.Error(&errors.ErrSubscribeSyncResponse{
    				PndID:      subscriptionInfo.PndID,
    				DeviceID:   subscriptionInfo.DeviceID,
    				DeviceName: subscriptionInfo.DeviceName,
    
    		d.handleSubscribeResponseUpdate(resp, subscriptionInfo)
    
    Fabian Seidl's avatar
    Fabian Seidl committed
    	default:
    		log.Infof("Invalid SubscribeResponse, %v", resp)
    
    func (d *DeviceWatcher) handleSubscribeResponseUpdate(resp *gpb.SubscribeResponse_Update, subscriptionInfo *transport.SubscriptionInformation) {
    
    Fabian Seidl's avatar
    Fabian Seidl committed
    	// Not fully implemented yet, just a simple log to see if this works for now
    
    	log.Infof("Pnd: %s, Device: %s, Message: %s", subscriptionInfo.PndID, subscriptionInfo.DeviceID, resp.Update.String())
    
    
    	pndID, err := uuid.Parse(subscriptionInfo.PndID)
    	if err != nil {
    		log.Error(err)
    	}
    
    	pnd, err := d.pndStore.Get(store.Query{ID: pndID})
    	if err != nil {
    		log.Error(err)
    	}
    
    	device, err := pnd.GetDevice(subscriptionInfo.DeviceID)
    	if err != nil {
    		log.Error(err)
    	}
    
    	err = device.Transport().ProcessControlPlaneSubscribeResponse(resp, device.GetModel(), device.SBI().Schema())
    	if err != nil {
    		log.Error(err)
    	} else {
    		if err := pnd.UpdateONDAfterSubscribeResponse(device); err != nil {
    			log.Error(err)
    		}
    	}