Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
networkElementWatcher.go 9.17 KiB
package nucleus

import (
	"context"
	"fmt"
	"strconv"

	"code.fbi.h-da.de/danet/gosdn/controller/config"
	"code.fbi.h-da.de/danet/gosdn/controller/customerrs"
	"code.fbi.h-da.de/danet/gosdn/controller/event"
	eventInterfaces "code.fbi.h-da.de/danet/gosdn/controller/interfaces/event"
	"code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkelement"
	"code.fbi.h-da.de/danet/gosdn/controller/interfaces/transport"
	"code.fbi.h-da.de/danet/gosdn/controller/nucleus/types"
	"code.fbi.h-da.de/danet/gosdn/forks/goarista/gnmi"
	"github.com/google/uuid"
	gpb "github.com/openconfig/gnmi/proto/gnmi"
	ygotutil "github.com/openconfig/ygot/util"
	"github.com/openconfig/ygot/ygot"
	log "github.com/sirupsen/logrus"
)

const (
	subscribeSampleInterval uint64 = 1000000000 // 1 second in nanoseconds
	// Note: These gNMI options are adjusted to arista gNMI fork. Change when switching to native gNMI.
	gNMISubscribeMode string = "stream"
	gNMIStreamMode    string = "on_change"
	numberWorkers     int    = 5
	workerQueueSize   int    = 1000
)

// NetworkElementWatcher is a component that subscribes to network elements via gNMI from within the controller and handles
// responses by triggering the internal event process.
type NetworkElementWatcher struct {
	mneService                 networkelement.Service
	networkelementSubcriptions map[uuid.UUID]*networkelementSubscriptionHelper
	eventService               eventInterfaces.Service
}

// networkelementSubscriptionHelper is used to store information to stop a running subscribe go routine.
type networkelementSubscriptionHelper struct {
	stopSubscribeCtx context.Context
	stopFunc         context.CancelFunc
}

// NewNetworkElementWatcher allows to subscribe to network element paths.
func NewNetworkElementWatcher(mneService networkelement.Service, eventService eventInterfaces.Service) *NetworkElementWatcher {
	return &NetworkElementWatcher{
		mneService:                 mneService,
		networkelementSubcriptions: make(map[uuid.UUID]*networkelementSubscriptionHelper),
		eventService:               eventService,
	}
}

// SubscribeToNetworkElements subscribes to every available network element in each network domain according to provided SubscribeOptions.
// SubscribeOptions can be nil. Use nil for a fixed, pre-defined set of gNMI subscription options (streaming in sample mode each second).
func (n *NetworkElementWatcher) SubscribeToNetworkElements(opts *gnmi.SubscribeOptions) {
	if opts == nil {
		opts = &gnmi.SubscribeOptions{
			Mode:           gNMISubscribeMode,
			StreamMode:     gNMIStreamMode,
			SampleInterval: subscribeSampleInterval,
		}
	}

	mnes, err := n.mneService.GetAll()
	if err != nil {
		log.Error(err)
		return
	}

	for _, mne := range mnes {
		n.subscribeToNetworkElement(mne, opts)
	}
}

// SubscribeToNetworkElement subscribes to the provided network element according to provided SubscribeOptions.
// SubscribeOptions can be nil. Use nil for a fixed, pre-defined set of gNMI subscription options (streaming in sample mode each second).
func (n *NetworkElementWatcher) SubscribeToNetworkElement(mne networkelement.NetworkElement, opts *gnmi.SubscribeOptions) {
	if opts == nil {
		opts = &gnmi.SubscribeOptions{
			Mode:           gNMISubscribeMode,
			StreamMode:     gNMIStreamMode,
			SampleInterval: subscribeSampleInterval,
		}
	}
	n.subscribeToNetworkElement(mne, opts)
}

func (n *NetworkElementWatcher) subscribeToNetworkElement(mne networkelement.NetworkElement, opts *gnmi.SubscribeOptions) {
	subID := uuid.New()

	stopContext, cancel := context.WithCancel(context.Background())
	n.addToNetworkElementSubscriptions(subID, &networkelementSubscriptionHelper{
		stopSubscribeCtx: stopContext,
		stopFunc:         cancel,
	})

	opts.Paths = n.mergeGnmiSubscriptions(mne.GetGnmiSubscriptionPaths(), config.GetGnmiSubscriptionPaths())

	go n.callSubscribe(stopContext, mne, opts)
}

// callSubscribe spawns a worker pool to handle gNMI subscription updates for each individual subscription
// and then sets up the gNMI subscription listener.
func (n *NetworkElementWatcher) callSubscribe(stopContext context.Context, mne networkelement.NetworkElement, opts *gnmi.SubscribeOptions) {
	gNMIOptionsCtx := context.Background()
	gNMIOptionsCtx = context.WithValue(gNMIOptionsCtx, types.CtxKeyOpts, opts)

	subInfoChan := make(chan transport.SubscriptionInformation, workerQueueSize)

	for i := 1; i <= numberWorkers; i++ {
		name := "Worker " + strconv.Itoa(i)

		worker := NewSubscriptionQueueWorker(name, n.handleSubscribeResponse)
		go worker.HandleGnmiSubscriptionUpdates(subInfoChan)
	}

	// SubscriptionInformation contains pnd ID, network element ID and name to be used in the internal subscribe to check
	// from which network element a response was sent
	if err := mne.Transport().ControlPlaneSubscribe(gNMIOptionsCtx, transport.SubscriptionInformation{
		PndID:              mne.PndID().String(),
		NetworkElementID:   mne.ID().String(),
		NetworkElementName: mne.Name(),
		StopContext:        stopContext,
	},
		subInfoChan); err != nil {
		log.Error(err)
	}
}

func (n *NetworkElementWatcher) addToNetworkElementSubscriptions(subID uuid.UUID, devSub *networkelementSubscriptionHelper) {
	//TODO: improve handling of subscriptions, like be able to expose to apps so specific subscriptions instead of only all can be stopped in the future
	n.networkelementSubcriptions[subID] = devSub
}

// StopAndRemoveAllNetworkElementSubscriptions stops and removes all the available running subscriptions.
func (n *NetworkElementWatcher) StopAndRemoveAllNetworkElementSubscriptions() {
	for key := range n.networkelementSubcriptions {
		n.StopAndRemoveNetworkElementSubscription(key)
	}
}

// StopAndRemoveNetworkElementSubscription passes a subscription uuid to stop the running subscription go routing and removes the entry from the map
// of network element subscriptions.
func (n *NetworkElementWatcher) StopAndRemoveNetworkElementSubscription(subID uuid.UUID) {
	n.networkelementSubcriptions[subID].stopFunc()
	delete(n.networkelementSubcriptions, subID)
}

// handleSubscribeResponse takes the subscribe response and additional information about the network element to distinguish
// from which network element a subscribe response was sent including improved error handling.
func (n *NetworkElementWatcher) handleSubscribeResponse(subscriptionInfo *transport.SubscriptionInformation, workerName string) {
	if subscriptionInfo.SubResponse == nil {
		// Note: This needs proper error handling, no idea how yet. Simply logging would lead to spam in the console
		// if the target that was subscribed to is not reachable anymore.
		// log.Error("Error: subresponse == nil")
	} else {
		switch resp := subscriptionInfo.SubResponse.Response.(type) {
		case *gpb.SubscribeResponse_Error:
			log.Error(&customerrs.SubscribeResponseError{
				PndID:              subscriptionInfo.PndID,
				NetworkElementID:   subscriptionInfo.NetworkElementID,
				NetworkElementName: subscriptionInfo.NetworkElementName,
				Err:                fmt.Sprint("SubscribeResponse_Error"),
			})
		case *gpb.SubscribeResponse_SyncResponse:
			if !resp.SyncResponse {
				log.Error(&customerrs.SubscribeSyncResponseError{
					PndID:              subscriptionInfo.PndID,
					NetworkElementID:   subscriptionInfo.NetworkElementID,
					NetworkElementName: subscriptionInfo.NetworkElementName,
				})
			}
		case *gpb.SubscribeResponse_Update:
			n.handleSubscribeResponseUpdate(resp, subscriptionInfo)
		default:
			log.Infof("Invalid SubscribeResponse, %v", resp)
		}
	}
}

func (n *NetworkElementWatcher) handleSubscribeResponseUpdate(resp *gpb.SubscribeResponse_Update, subscriptionInfo *transport.SubscriptionInformation) {
	pathsAndValues := make(map[string]string, len(resp.Update.Update))

	for _, update := range resp.Update.Update {
		pathString, err := ygot.PathToString(update.Path)
		if err != nil {
			log.Errorf("Error trying to create a string from path: %v", err)
		}

		pathsAndValues[pathString] = update.Val.GetStringVal()
	}

	mneID, err := uuid.Parse(subscriptionInfo.NetworkElementID)
	if err != nil {
		log.Errorf("Error trying to parse uuid, could not handle subscription response: %v", err)
	}

	pubEvent := event.NewGnmiSubscribeEvent(mneID, pathsAndValues)
	if err := n.eventService.PublishEvent(NetworkElementEventTopic, pubEvent); err != nil {
		go func() {
			n.eventService.Reconnect()

			retryErr := n.eventService.RetryPublish(NetworkElementEventTopic, pubEvent)
			if retryErr != nil {
				log.Error(retryErr)
			}
		}()
	}
}

// mergeGnmiSubscriptions takes paths for gNMI Subscriptions from two sources (the MNE and a config file)
// and merges them to one set of subscription paths without duplicates.
// Paths should be provided in the following format [][]string{{"system", "config", "hostname"}}.
func (n *NetworkElementWatcher) mergeGnmiSubscriptions(gNMISusbcriptionPathsFromMne, gNMISusbcriptionPathsFromConfig [][]string) [][]string {
	// create slice with all elements
	var tempJoinedPaths = append(gNMISusbcriptionPathsFromMne, gNMISusbcriptionPathsFromConfig...)

	// remove duplicates
	inResult := make(map[string]bool)
	var joinedPaths [][]string
	for _, pathSlice := range tempJoinedPaths {
		path := ygotutil.SlicePathToString(pathSlice)
		if _, ok := inResult[path]; !ok {
			inResult[path] = true
			joinedPaths = append(joinedPaths, pathSlice)
		}
	}

	return joinedPaths
}