Skip to content
Snippets Groups Projects
networkElementWatcher.go 13 KiB
Newer Older
  • Learn to ignore specific revisions
  • 	"code.fbi.h-da.de/danet/gosdn/controller/config"
    
    	"code.fbi.h-da.de/danet/gosdn/controller/customerrs"
    
    	"code.fbi.h-da.de/danet/gosdn/controller/event"
    	eventInterfaces "code.fbi.h-da.de/danet/gosdn/controller/interfaces/event"
    
    	"code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkelement"
    
    	"code.fbi.h-da.de/danet/gosdn/controller/interfaces/transport"
    	"code.fbi.h-da.de/danet/gosdn/controller/nucleus/types"
    
    	"code.fbi.h-da.de/danet/gosdn/controller/store"
    
    	"code.fbi.h-da.de/danet/gosdn/forks/goarista/gnmi"
    	"github.com/google/uuid"
    	gpb "github.com/openconfig/gnmi/proto/gnmi"
    
    	ygotutil "github.com/openconfig/ygot/util"
    
    	log "github.com/sirupsen/logrus"
    )
    
    const (
    	subscribeSampleInterval uint64 = 1000000000 // 1 second in nanoseconds
    
    	// Note: These gNMI options are adjusted to arista gNMI fork. Change when switching to native gNMI.
    
    	gNMISubscribeMode string = "stream"
    	gNMIStreamMode    string = "on_change"
    
    // NetworkElementWatcher is a component that subscribes to network elements via gNMI from within the controller and handles
    
    // responses by triggering the internal event process.
    
    type NetworkElementWatcher struct {
    
    	mneService                       networkelement.Service
    	networkelementSubscriptionsMutex sync.Mutex
    	networkelementSubcriptions       map[uuid.UUID]*networkelementSubscriptionHelper
    	eventService                     eventInterfaces.Service
    
    // networkelementSubscriptionHelper is used to store information to stop a running subscribe go routine.
    type networkelementSubscriptionHelper struct {
    
    	SubID            string
    	MneID            string
    	MneName          string
    	PndID            string
    
    	stopSubscribeCtx context.Context
    	stopFunc         context.CancelFunc
    
    	Opts             *gnmi.SubscribeOptions
    
    // NewNetworkElementWatcher allows to subscribe to network element paths.
    func NewNetworkElementWatcher(mneService networkelement.Service, eventService eventInterfaces.Service) *NetworkElementWatcher {
    
    	return &NetworkElementWatcher{
    
    		mneService:                 mneService,
    
    		networkelementSubcriptions: make(map[uuid.UUID]*networkelementSubscriptionHelper),
    
    // SubscribeToNetworkElements subscribes to every available network element in each network domain according to provided SubscribeOptions.
    
    // SubscribeOptions can be nil. Use nil for a fixed, pre-defined set of gNMI subscription options (streaming in sample mode each second).
    
    func (n *NetworkElementWatcher) SubscribeToNetworkElements(subScriptionInfos []*networkelementSubscriptionHelper) {
    	var tmpOpts *gnmi.SubscribeOptions
    	if len(subScriptionInfos) == 0 {
    		tmpOpts = &gnmi.SubscribeOptions{
    
    			Mode:           gNMISubscribeMode,
    			StreamMode:     gNMIStreamMode,
    			SampleInterval: subscribeSampleInterval,
    		}
    	}
    
    
    	mnes, err := n.mneService.GetAll()
    
    	for _, mne := range mnes {
    
    		if len(subScriptionInfos) > 0 {
    			tmpOpts, err = getOptionsForMne(mne.ID().String(), subScriptionInfos)
    			if err != nil {
    				log.Infof("Couldn't find options for mne %s, reason: %v. \n Skipping subscription.", mne.Name(), err)
    				continue
    			}
    		}
    
    		n.subscribeToNetworkElement(mne, tmpOpts)
    
    // SubscribeToNetworkElement subscribes to the provided network element according to provided SubscribeOptions.
    // SubscribeOptions can be nil. Use nil for a fixed, pre-defined set of gNMI subscription options (streaming in sample mode each second).
    
    func (n *NetworkElementWatcher) SubscribeToNetworkElement(mne networkelement.NetworkElement, opts *gnmi.SubscribeOptions) {
    
    	if opts == nil {
    		opts = &gnmi.SubscribeOptions{
    			Mode:           gNMISubscribeMode,
    			StreamMode:     gNMIStreamMode,
    			SampleInterval: subscribeSampleInterval,
    		}
    	}
    	n.subscribeToNetworkElement(mne, opts)
    }
    
    
    // SubscribeToNetworkElementWithID subscribes to the network element matching the provided ID according to provided SubscribeOptions.
    // SubscribeOptions can be nil. Use nil for a fixed, pre-defined set of gNMI subscription options (streaming in sample mode each second).
    func (n *NetworkElementWatcher) SubscribeToNetworkElementWithID(mneID uuid.UUID, opts *gnmi.SubscribeOptions) error {
    	if opts == nil {
    		opts = &gnmi.SubscribeOptions{
    			Mode:           gNMISubscribeMode,
    			StreamMode:     gNMIStreamMode,
    			SampleInterval: subscribeSampleInterval,
    		}
    	}
    
    	mne, err := n.mneService.Get(store.Query{ID: mneID})
    	if err != nil {
    		log.Error(err)
    		return err
    	}
    
    	n.subscribeToNetworkElement(mne, opts)
    	return nil
    }
    
    
    func (n *NetworkElementWatcher) subscribeToNetworkElement(mne networkelement.NetworkElement, opts *gnmi.SubscribeOptions) {
    	subID := uuid.New()
    
    	opts.Paths = n.mergeGnmiSubscriptions(mne.GetGnmiSubscriptionPaths(), config.GetGnmiSubscriptionPaths())
    
    
    	stopContext, cancel := context.WithCancel(context.Background())
    	n.addToNetworkElementSubscriptions(subID, &networkelementSubscriptionHelper{
    		stopSubscribeCtx: stopContext,
    		stopFunc:         cancel,
    
    		SubID:            subID.String(),
    		MneID:            mne.ID().String(),
    		MneName:          mne.Name(),
    		PndID:            mne.PndID().String(),
    		Opts:             opts,
    
    	})
    
    	go n.callSubscribe(stopContext, mne, opts)
    
    // callSubscribe spawns a worker pool to handle gNMI subscription updates for each individual subscription
    // and then sets up the gNMI subscription listener.
    
    func (n *NetworkElementWatcher) callSubscribe(stopContext context.Context, mne networkelement.NetworkElement, opts *gnmi.SubscribeOptions) {
    
    	gNMIOptionsCtx := context.Background()
    	gNMIOptionsCtx = context.WithValue(gNMIOptionsCtx, types.CtxKeyOpts, opts)
    
    
    	subInfoChan := make(chan transport.SubscriptionInformation, workerQueueSize)
    
    	for i := 1; i <= numberWorkers; i++ {
    		name := "Worker " + strconv.Itoa(i)
    
    		worker := NewSubscriptionQueueWorker(name, n.handleSubscribeResponse)
    		go worker.HandleGnmiSubscriptionUpdates(subInfoChan)
    	}
    
    
    	// SubscriptionInformation contains pnd ID, network element ID and name to be used in the internal subscribe to check
    	// from which network element a response was sent
    
    	if err := mne.Transport().ControlPlaneSubscribe(gNMIOptionsCtx, transport.SubscriptionInformation{
    
    		PndID:              mne.PndID().String(),
    
    		NetworkElementID:   mne.ID().String(),
    		NetworkElementName: mne.Name(),
    		StopContext:        stopContext,
    
    func (n *NetworkElementWatcher) addToNetworkElementSubscriptions(subID uuid.UUID, devSub *networkelementSubscriptionHelper) {
    
    	n.networkelementSubscriptionsMutex.Lock()
    	defer n.networkelementSubscriptionsMutex.Unlock()
    
    
    	n.networkelementSubcriptions[subID] = devSub
    
    // StopAndRemoveAllNetworkElementSubscriptions stops and removes all the available running subscriptions.
    func (n *NetworkElementWatcher) StopAndRemoveAllNetworkElementSubscriptions() {
    	for key := range n.networkelementSubcriptions {
    		n.StopAndRemoveNetworkElementSubscription(key)
    
    // StopAndRemoveNetworkElementSubscription passes a subscription uuid to stop the running subscription go routing and removes the entry from the map
    
    // of network element subscriptions.
    
    func (n *NetworkElementWatcher) StopAndRemoveNetworkElementSubscription(subID uuid.UUID) {
    
    	n.networkelementSubscriptionsMutex.Lock()
    	defer n.networkelementSubscriptionsMutex.Unlock()
    
    
    	n.networkelementSubcriptions[subID].stopFunc()
    	delete(n.networkelementSubcriptions, subID)
    
    // handleSubscribeResponse takes the subscribe response and additional information about the network element to distinguish
    // from which network element a subscribe response was sent including improved error handling.
    
    func (n *NetworkElementWatcher) handleSubscribeResponse(subscriptionInfo *transport.SubscriptionInformation, workerName string) {
    
    	log.Debugf("Received Subscribe response: MNE ID: %s, MNE Name: %s, SubResponse: %v", subscriptionInfo.NetworkElementID, subscriptionInfo.NetworkElementName, subscriptionInfo.SubResponse)
    
    
    	if subscriptionInfo.SubResponse == nil {
    		// Note: This needs proper error handling, no idea how yet. Simply logging would lead to spam in the console
    		// if the target that was subscribed to is not reachable anymore.
    		// log.Error("Error: subresponse == nil")
    	} else {
    		switch resp := subscriptionInfo.SubResponse.Response.(type) {
    		case *gpb.SubscribeResponse_Error:
    			log.Error(&customerrs.SubscribeResponseError{
    
    				PndID:              subscriptionInfo.PndID,
    				NetworkElementID:   subscriptionInfo.NetworkElementID,
    				NetworkElementName: subscriptionInfo.NetworkElementName,
    
    		case *gpb.SubscribeResponse_SyncResponse:
    			if !resp.SyncResponse {
    				log.Error(&customerrs.SubscribeSyncResponseError{
    					PndID:              subscriptionInfo.PndID,
    					NetworkElementID:   subscriptionInfo.NetworkElementID,
    					NetworkElementName: subscriptionInfo.NetworkElementName,
    				})
    			}
    		case *gpb.SubscribeResponse_Update:
    			n.handleSubscribeResponseUpdate(resp, subscriptionInfo)
    		default:
    			log.Infof("Invalid SubscribeResponse, %v", resp)
    
    func (n *NetworkElementWatcher) handleSubscribeResponseUpdate(resp *gpb.SubscribeResponse_Update, subscriptionInfo *transport.SubscriptionInformation) {
    
    Malte Bauch's avatar
    Malte Bauch committed
    	pathsAndValues := make(map[string]string, len(resp.Update.Update))
    
    	if resp.Update == nil || len(resp.Update.Update) == 0 {
    		log.Debugf("handleSubscribeResponseUpdate empty update or updates; Update: %v, InnerUpdates: %v", resp.Update, resp.Update.Update)
    		return
    	}
    
    
    	for _, update := range resp.Update.Update {
    
    		pathString, err := ygot.PathToString(update.Path)
    		if err != nil {
    			log.Errorf("Error trying to create a string from path: %v", err)
    
    		switch v := update.GetVal().GetValue().(type) {
    		case *gpb.TypedValue_StringVal:
    			pathsAndValues[pathString] = update.Val.GetStringVal()
    		case *gpb.TypedValue_JsonIetfVal:
    			pathsAndValues[pathString] = string(update.GetVal().GetJsonIetfVal())
    		case *gpb.TypedValue_UintVal:
    			pathsAndValues[pathString] = fmt.Sprintf("%d", update.Val.GetUintVal())
    		default:
    			log.Errorf("The given value of type: %T, provided by a SubResponse from network element with ID: %s is not supported", v, subscriptionInfo.NetworkElementID)
    			return
    		}
    
    	mneID, err := uuid.Parse(subscriptionInfo.NetworkElementID)
    	if err != nil {
    		log.Errorf("Error trying to parse uuid, could not handle subscription response: %v", err)
    	}
    
    
    	pubEvent := event.NewGnmiSubscribeEvent(mneID, pathsAndValues)
    
    	if err := n.eventService.PublishEvent(NetworkElementEventTopic, pubEvent); err != nil {
    		go func() {
    			n.eventService.Reconnect()
    
    			retryErr := n.eventService.RetryPublish(NetworkElementEventTopic, pubEvent)
    			if retryErr != nil {
    				log.Error(retryErr)
    			}
    		}()
    
    
    // mergeGnmiSubscriptions takes paths for gNMI Subscriptions from two sources (the MNE and a config file)
    // and merges them to one set of subscription paths without duplicates.
    // Paths should be provided in the following format [][]string{{"system", "config", "hostname"}}.
    func (n *NetworkElementWatcher) mergeGnmiSubscriptions(gNMISusbcriptionPathsFromMne, gNMISusbcriptionPathsFromConfig [][]string) [][]string {
    	// create slice with all elements
    	var tempJoinedPaths = append(gNMISusbcriptionPathsFromMne, gNMISusbcriptionPathsFromConfig...)
    
    	// remove duplicates
    	inResult := make(map[string]bool)
    	var joinedPaths [][]string
    	for _, pathSlice := range tempJoinedPaths {
    		path := ygotutil.SlicePathToString(pathSlice)
    		if _, ok := inResult[path]; !ok {
    			inResult[path] = true
    			joinedPaths = append(joinedPaths, pathSlice)
    		}
    	}
    
    	return joinedPaths
    }
    
    
    // GetAllSubscriptionInformations returns the information of all running sunscriptions.
    func (n *NetworkElementWatcher) GetAllSubscriptionInformations() []*networkelementSubscriptionHelper {
    	var information []*networkelementSubscriptionHelper
    	n.networkelementSubscriptionsMutex.Lock()
    	defer n.networkelementSubscriptionsMutex.Unlock()
    
    	for _, info := range n.networkelementSubcriptions {
    		information = append(information, info)
    	}
    
    	return information
    }
    
    // GetSubscriptionInformations returns the information for one specific subscription referenced by its ID.
    func (n *NetworkElementWatcher) GetSubscriptionInformations(subID uuid.UUID) (*networkelementSubscriptionHelper, error) {
    	n.networkelementSubscriptionsMutex.Lock()
    	defer n.networkelementSubscriptionsMutex.Unlock()
    
    	information, ok := n.networkelementSubcriptions[subID]
    	if !ok {
    		return nil, errors.New(fmt.Sprintf("Couldn't retrieve information for subscription with ID: %s", subID.String()))
    	}
    
    	return information, nil
    }
    
    // getOptionsForMne checks if there is a match of the mneID with all the mne IDs provided in the slice of subscription
    // information and returns the related subscribe options or an error.
    func getOptionsForMne(mneID string, subInfos []*networkelementSubscriptionHelper) (*gnmi.SubscribeOptions, error) {
    	for _, subInfo := range subInfos {
    		if subInfo.MneID == mneID {
    			return subInfo.Opts, nil
    		}
    	}
    
    	return nil, fmt.Errorf("error: did not find subscription infos matching to provided mne ID: %s", mneID)
    }