Skip to content
Snippets Groups Projects
networkElementWatcher.go 7.42 KiB
Newer Older
  • Learn to ignore specific revisions
  • package nucleus
    
    import (
    	"context"
    	"fmt"
    
    	"code.fbi.h-da.de/danet/gosdn/controller/customerrs"
    
    	"code.fbi.h-da.de/danet/gosdn/controller/event"
    	eventInterfaces "code.fbi.h-da.de/danet/gosdn/controller/interfaces/event"
    
    	"code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkdomain"
    
    	"code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkelement"
    
    	"code.fbi.h-da.de/danet/gosdn/controller/interfaces/transport"
    	"code.fbi.h-da.de/danet/gosdn/controller/nucleus/types"
    	"code.fbi.h-da.de/danet/gosdn/controller/store"
    	"code.fbi.h-da.de/danet/gosdn/forks/goarista/gnmi"
    	"github.com/google/uuid"
    	gpb "github.com/openconfig/gnmi/proto/gnmi"
    	log "github.com/sirupsen/logrus"
    )
    
    // subscribeSampleInterval is the sample interval placed into the default gNMI
    // subscribe options, expressed in nanoseconds (one second).
    const subscribeSampleInterval uint64 = 1_000_000_000

    // Default gNMI subscription modes, applied when the caller supplies no options.
    // TODO: These gNMI options are adjusted to arista gNMI fork. Change when switching to native gNMI.
    const (
    	gNMISubscribeMode string = "stream"
    	gNMIStreamMode    string = "on_change"
    )
    
    
    // NetworkElementWatcher is a component that subscribes to network elements via gNMI from within the controller and handles
    
    // responses by triggering the internal event process.
    
    type NetworkElementWatcher struct {
    
    	pndStore                   networkdomain.PndStore
    	networkelementSubcriptions map[uuid.UUID]*networkelementSubscriptionHelper
    
    	eventService               eventInterfaces.Service
    
    // networkelementSubscriptionHelper bundles what is required to stop a running
    // subscribe go routine: the cancellable context handed to the subscription and
    // the function that cancels it.
    type networkelementSubscriptionHelper struct {
    	// stopSubscribeCtx is the context passed along with the subscription.
    	stopSubscribeCtx context.Context
    	// stopFunc cancels stopSubscribeCtx.
    	stopFunc context.CancelFunc
    }
    
    
    // NewNetworkElementWatcher takes a pndStore to subscribe to network element paths.
    
    func NewNetworkElementWatcher(pndStore networkdomain.PndStore, eventService eventInterfaces.Service) *NetworkElementWatcher {
    
    	return &NetworkElementWatcher{
    
    		pndStore:                   pndStore,
    		networkelementSubcriptions: make(map[uuid.UUID]*networkelementSubscriptionHelper),
    
    // SubToNetworkElements subscribes to every available network element in each network domain according to provided SubscribeOptions.
    
    // Paths should be provided in the following format [][]string{{"system", "config", "hostname"}}
    // SubscribeOptions can be nil. Use nil for a fixed, pre-defined set of gNMI subscription options (streaming in sample mode each second).
    
    func (n *NetworkElementWatcher) SubToNetworkElements(paths [][]string, opts *gnmi.SubscribeOptions) {
    
    	if opts == nil {
    		opts = &gnmi.SubscribeOptions{
    			Mode:           gNMISubscribeMode,
    			StreamMode:     gNMIStreamMode,
    			Paths:          paths,
    			SampleInterval: subscribeSampleInterval,
    		}
    	}
    
    
    	pnds, err := n.pndStore.GetAll()
    
    	if err != nil {
    		log.Error(err)
    	}
    
    	for _, pnd := range pnds {
    
    		n.subscribeToPndNetworkElements(pnd, opts)
    
    func (n *NetworkElementWatcher) subscribeToPndNetworkElements(pnd networkdomain.NetworkDomain, opts *gnmi.SubscribeOptions) {
    
    	for _, mne := range pnd.NetworkElements() {
    
    		subID := uuid.New()
    
    		stopContext, cancel := context.WithCancel(context.Background())
    
    		n.addToNetworkElementSubscriptions(subID, &networkelementSubscriptionHelper{
    
    			stopSubscribeCtx: stopContext,
    			stopFunc:         cancel,
    		})
    
    		go n.callSubscribe(stopContext, pnd.ID().String(), mne, opts)
    
    func (n *NetworkElementWatcher) callSubscribe(stopContext context.Context, pndID string, mne networkelement.NetworkElement, opts *gnmi.SubscribeOptions) {
    
    	gNMIOptionsCtx := context.Background()
    	gNMIOptionsCtx = context.WithValue(gNMIOptionsCtx, types.CtxKeyOpts, opts)
    
    
    	// SubscriptionInformation contains pnd ID, network element ID and name to be used in the internal subscribe to check
    	// from which network element a response was sent
    
    	if err := mne.Transport().ControlPlaneSubscribe(gNMIOptionsCtx, n.handleSubscribeResponse, &transport.SubscriptionInformation{
    		PndID:              pndID,
    		NetworkElementID:   mne.ID().String(),
    		NetworkElementName: mne.Name(),
    		StopContext:        stopContext,
    
    func (n *NetworkElementWatcher) addToNetworkElementSubscriptions(subID uuid.UUID, devSub *networkelementSubscriptionHelper) {
    
    	//TODO: improve handling of subscriptions, like be able to expose to apps so specific subscriptions instead of only all can be stopped in the future
    
    	n.networkelementSubcriptions[subID] = devSub
    
    // StopAndRemoveAllNetworkElementSubscriptions stops and removes all the available running subscriptions.
    func (n *NetworkElementWatcher) StopAndRemoveAllNetworkElementSubscriptions() {
    	for key := range n.networkelementSubcriptions {
    		n.StopAndRemoveNetworkElementSubscription(key)
    
    // StopAndRemoveNetworkElementSubscription passes a subscription uuid to stop the running subscription go routing and removes the entry from the map
    
    // of network element subscriptions.
    
    func (n *NetworkElementWatcher) StopAndRemoveNetworkElementSubscription(subID uuid.UUID) {
    	n.networkelementSubcriptions[subID].stopFunc()
    	delete(n.networkelementSubcriptions, subID)
    
    // handleSubscribeResponse takes the subscribe response and additional information about the network element to distinguish
    // from which network element a subscribe response was sent including improved error handling.
    //
    // It switches on the concrete type of resp.Response: error and sync responses are
    // reported together with the PND ID, network element ID and network element name taken
    // from subscriptionInfo, update responses are forwarded to handleSubscribeResponseUpdate,
    // and any other type is logged at info level.
    func (n *NetworkElementWatcher) handleSubscribeResponse(resp *gpb.SubscribeResponse, subscriptionInfo *transport.SubscriptionInformation) {

    	// resp is shadowed by the concrete response type inside each case.
    	switch resp := resp.Response.(type) {
    	case *gpb.SubscribeResponse_Error:
    		// NOTE(review): the line that opens this composite literal (and the call that
    		// receives it) is not visible in this view — restore it from the repository.
    			PndID:              subscriptionInfo.PndID,
    			NetworkElementID:   subscriptionInfo.NetworkElementID,
    			NetworkElementName: subscriptionInfo.NetworkElementName,
    			Err:                fmt.Sprint("SubscribeResponse_Error"),

    		})
    	case *gpb.SubscribeResponse_SyncResponse:
    		// Only a sync response carrying false is reported.
    		if !resp.SyncResponse {
    			// NOTE(review): the line that opens this composite literal is not visible
    			// in this view either — restore it from the repository.
    				PndID:              subscriptionInfo.PndID,
    				NetworkElementID:   subscriptionInfo.NetworkElementID,
    				NetworkElementName: subscriptionInfo.NetworkElementName,

    			})
    		}
    	case *gpb.SubscribeResponse_Update:
    		// Updates are processed in a dedicated handler.
    		n.handleSubscribeResponseUpdate(resp, subscriptionInfo)

    	default:
    		log.Infof("Invalid SubscribeResponse, %v", resp)
    	}
    }
    
    
    func (n *NetworkElementWatcher) handleSubscribeResponseUpdate(resp *gpb.SubscribeResponse_Update, subscriptionInfo *transport.SubscriptionInformation) {
    
    	pndID, err := uuid.Parse(subscriptionInfo.PndID)
    	if err != nil {
    		log.Error(err)
    	}
    
    
    	pnd, err := n.pndStore.Get(store.Query{ID: pndID})
    
    	mne, err := pnd.GetNetworkElement(subscriptionInfo.NetworkElementID)
    
    	err = mne.Transport().ProcessControlPlaneSubscribeResponse(resp)
    
    	// Collect each updated path together with its string value for the event payload.
    	pathsAndValues := make(map[string]string, len(resp.Update.Update))
    
    
    	for _, update := range resp.Update.Update {
    		pathString := ""
    
    		// go through elem to build full path
    		for _, elem := range update.Path.Elem {
    			// remove unwanted parts of path string example: "name:\"system\"" -> "system"
    			filteredElem := elem.String()[strings.Index(elem.String(), ":\"")+2 : len(elem.String())-1]
    			pathString += filteredElem + "/"
    
    
    		pathsAndValues[pathString] = update.Val.GetStringVal()
    	}
    
    	pubEvent := event.NewMneUpdateEvent(mne.ID(), pathsAndValues)
    	if err := n.eventService.PublishEvent(NetworkElementEventTopic, pubEvent); err != nil {
    		go func() {
    			n.eventService.Reconnect()
    
    			retryErr := n.eventService.RetryPublish(NetworkElementEventTopic, pubEvent)
    			if retryErr != nil {
    				log.Error(retryErr)
    			}
    		}()