// networkElementWatcher.go
// Last change authored by Fabian Seidl; see merge request !1218.
package nucleus
import (
"context"
"errors"
"fmt"
"strconv"
"sync"
"code.fbi.h-da.de/danet/gosdn/controller/config"
"code.fbi.h-da.de/danet/gosdn/controller/customerrs"
"code.fbi.h-da.de/danet/gosdn/controller/event"
eventInterfaces "code.fbi.h-da.de/danet/gosdn/controller/interfaces/event"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkelement"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/transport"
"code.fbi.h-da.de/danet/gosdn/controller/nucleus/types"
"code.fbi.h-da.de/danet/gosdn/controller/store"
"code.fbi.h-da.de/danet/gosdn/forks/goarista/gnmi"
"github.com/google/uuid"
gpb "github.com/openconfig/gnmi/proto/gnmi"
ygotutil "github.com/openconfig/ygot/util"
"github.com/openconfig/ygot/ygot"
log "github.com/sirupsen/logrus"
)
// Default gNMI subscription options used when a caller does not provide its own.
// Note: These gNMI options are adjusted to arista gNMI fork. Change when switching to native gNMI.
const (
	// subscribeSampleInterval is the default sample interval: 1 second in nanoseconds.
	subscribeSampleInterval uint64 = 1000000000
	// gNMISubscribeMode is the default subscription mode.
	gNMISubscribeMode string = "stream"
	// gNMIStreamMode is the default stream mode.
	gNMIStreamMode string = "on_change"
)

// Sizing of the worker pool that is started for every subscription.
const (
	// numberWorkers is the number of queue workers spawned per subscription.
	numberWorkers int = 5
	// workerQueueSize is the buffer size of the channel feeding those workers.
	workerQueueSize int = 1000
)
// NetworkElementWatcher subscribes to network elements via gNMI from within the controller and turns the
// received responses into events of the internal event process.
type NetworkElementWatcher struct {
	// mneService is used to look up the network elements that should be subscribed to.
	mneService networkelement.Service
	// eventService publishes the events created from subscription updates.
	eventService eventInterfaces.Service

	// networkelementSubscriptionsMutex is held by the add/stop/get methods while they access
	// networkelementSubcriptions.
	networkelementSubscriptionsMutex sync.Mutex
	// networkelementSubcriptions keeps the bookkeeping entry of each registered subscription,
	// keyed by subscription ID.
	networkelementSubcriptions map[uuid.UUID]*networkelementSubscriptionHelper
}
// networkelementSubscriptionHelper bundles everything that is remembered about one subscription,
// including the means to stop its running subscribe go routine.
type networkelementSubscriptionHelper struct {
	// SubID is the string form of the subscription's UUID.
	SubID string
	// MneID, MneName and PndID identify the subscribed network element and its network domain.
	MneID   string
	MneName string
	PndID   string

	// Opts are the gNMI subscribe options the subscription was started with.
	Opts *gnmi.SubscribeOptions

	// stopSubscribeCtx is the context handed to the subscribe go routine; stopFunc cancels it.
	stopSubscribeCtx context.Context
	stopFunc         context.CancelFunc
}
// NewNetworkElementWatcher creates a NetworkElementWatcher that uses the given network element service
// and event service. The returned watcher starts with an empty set of subscriptions.
func NewNetworkElementWatcher(mneService networkelement.Service, eventService eventInterfaces.Service) *NetworkElementWatcher {
	watcher := new(NetworkElementWatcher)
	watcher.mneService = mneService
	watcher.eventService = eventService
	watcher.networkelementSubcriptions = map[uuid.UUID]*networkelementSubscriptionHelper{}

	return watcher
}
// SubscribeToNetworkElements subscribes to every network element returned by the network element service.
//
// If subScriptionInfos is empty, each network element is subscribed with the pre-defined default gNMI
// options (mode "stream", stream mode "on_change", sample interval of one second). Otherwise the options
// are taken from the entry whose MneID matches the network element: elements without a matching entry
// are skipped, and a matching entry that carries no options falls back to the defaults.
//
// A failure to list the network elements is logged and ends the call without subscribing to anything.
func (n *NetworkElementWatcher) SubscribeToNetworkElements(subScriptionInfos []*networkelementSubscriptionHelper) {
	mnes, err := n.mneService.GetAll()
	if err != nil {
		log.Error(err)
		return
	}

	for _, mne := range mnes {
		// Each subscription needs its own options value: subscribeToNetworkElement writes the merged
		// paths into it and passes it on to a go routine. Sharing one pointer between elements would
		// let them overwrite each other's paths.
		opts := &gnmi.SubscribeOptions{
			Mode:           gNMISubscribeMode,
			StreamMode:     gNMIStreamMode,
			SampleInterval: subscribeSampleInterval,
		}

		if len(subScriptionInfos) > 0 {
			providedOpts, err := getOptionsForMne(mne.ID().String(), subScriptionInfos)
			if err != nil {
				log.Infof("Couldn't find options for mne %s, reason: %v. \n Skipping subscription.", mne.Name(), err)
				continue
			}

			// A matching entry without options would otherwise lead to a nil dereference further down.
			if providedOpts != nil {
				opts = providedOpts
			}
		}

		n.subscribeToNetworkElement(mne, opts)
	}
}
// SubscribeToNetworkElement subscribes to the given network element using opts.
// opts may be nil, in which case the pre-defined default gNMI options are used
// (mode "stream", stream mode "on_change", sample interval of one second).
func (n *NetworkElementWatcher) SubscribeToNetworkElement(mne networkelement.NetworkElement, opts *gnmi.SubscribeOptions) {
	if opts != nil {
		n.subscribeToNetworkElement(mne, opts)
		return
	}

	n.subscribeToNetworkElement(mne, &gnmi.SubscribeOptions{
		Mode:           gNMISubscribeMode,
		StreamMode:     gNMIStreamMode,
		SampleInterval: subscribeSampleInterval,
	})
}
// SubscribeToNetworkElementWithID looks up the network element with the given ID and subscribes to it using opts.
// opts may be nil, in which case the pre-defined default gNMI options are used
// (mode "stream", stream mode "on_change", sample interval of one second).
// If the lookup fails, the error is logged and returned and no subscription is started.
func (n *NetworkElementWatcher) SubscribeToNetworkElementWithID(mneID uuid.UUID, opts *gnmi.SubscribeOptions) error {
	query := store.Query{ID: mneID}

	mne, err := n.mneService.Get(query)
	if err != nil {
		log.Error(err)
		return err
	}

	if opts == nil {
		opts = &gnmi.SubscribeOptions{
			Mode:           gNMISubscribeMode,
			StreamMode:     gNMIStreamMode,
			SampleInterval: subscribeSampleInterval,
		}
	}

	n.subscribeToNetworkElement(mne, opts)

	return nil
}
// subscribeToNetworkElement registers a new subscription for mne and starts it in its own go routine.
// The subscription paths of the network element and those from the configuration are merged and written
// into opts.Paths, so opts must not be nil and is modified by this call.
func (n *NetworkElementWatcher) subscribeToNetworkElement(mne networkelement.NetworkElement, opts *gnmi.SubscribeOptions) {
	subscriptionID := uuid.New()

	mnePaths := mne.GetGnmiSubscriptionPaths()
	configPaths := config.GetGnmiSubscriptionPaths()
	opts.Paths = n.mergeGnmiSubscriptions(mnePaths, configPaths)

	// Cancelling this context is how a stored subscription gets stopped later on.
	stopContext, cancel := context.WithCancel(context.Background())

	helper := &networkelementSubscriptionHelper{
		SubID:            subscriptionID.String(),
		MneID:            mne.ID().String(),
		MneName:          mne.Name(),
		PndID:            mne.PndID().String(),
		Opts:             opts,
		stopSubscribeCtx: stopContext,
		stopFunc:         cancel,
	}
	n.addToNetworkElementSubscriptions(subscriptionID, helper)

	go n.callSubscribe(stopContext, mne, opts)
}
// callSubscribe starts numberWorkers queue workers that process the gNMI subscription updates of this
// subscription and then calls ControlPlaneSubscribe on the network element's transport.
// An error returned by ControlPlaneSubscribe is logged.
func (n *NetworkElementWatcher) callSubscribe(stopContext context.Context, mne networkelement.NetworkElement, opts *gnmi.SubscribeOptions) {
	// The subscribe options travel to the transport inside the context.
	gNMIOptionsCtx := context.WithValue(context.Background(), types.CtxKeyOpts, opts)

	subInfoChan := make(chan transport.SubscriptionInformation, workerQueueSize)

	for workerIndex := 0; workerIndex < numberWorkers; workerIndex++ {
		// Workers are named "Worker 1" .. "Worker <numberWorkers>".
		worker := NewSubscriptionQueueWorker("Worker "+strconv.Itoa(workerIndex+1), n.handleSubscribeResponse)
		go worker.HandleGnmiSubscriptionUpdates(subInfoChan)
	}

	// SubscriptionInformation contains pnd ID, network element ID and name to be used in the internal subscribe to check
	// from which network element a response was sent
	err := mne.Transport().ControlPlaneSubscribe(
		gNMIOptionsCtx,
		transport.SubscriptionInformation{
			PndID:              mne.PndID().String(),
			NetworkElementID:   mne.ID().String(),
			NetworkElementName: mne.Name(),
			StopContext:        stopContext,
		},
		subInfoChan,
	)
	if err != nil {
		log.Error(err)
	}
}
// addToNetworkElementSubscriptions stores devSub under subID in the map of network element subscriptions
// while holding the subscriptions mutex. An existing entry with the same ID is replaced.
func (n *NetworkElementWatcher) addToNetworkElementSubscriptions(subID uuid.UUID, devSub *networkelementSubscriptionHelper) {
	mutex := &n.networkelementSubscriptionsMutex

	mutex.Lock()
	defer mutex.Unlock()

	n.networkelementSubcriptions[subID] = devSub
}
// StopAndRemoveAllNetworkElementSubscriptions stops every stored subscription by calling its cancel
// function and removes all entries from the map of network element subscriptions.
//
// The subscriptions mutex is held for the whole operation, so the map is never iterated while another
// go routine adds or removes entries.
func (n *NetworkElementWatcher) StopAndRemoveAllNetworkElementSubscriptions() {
	n.networkelementSubscriptionsMutex.Lock()
	defer n.networkelementSubscriptionsMutex.Unlock()

	// Deleting the current key while ranging over a map is permitted in Go.
	for subID, sub := range n.networkelementSubcriptions {
		if sub != nil && sub.stopFunc != nil {
			sub.stopFunc()
		}

		delete(n.networkelementSubcriptions, subID)
	}
}
// StopAndRemoveNetworkElementSubscription stops the running subscription go routine that belongs to the
// given subscription ID by calling its cancel function and removes the entry from the map of network
// element subscriptions.
//
// An ID without a stored subscription is logged and otherwise ignored instead of causing a nil dereference.
func (n *NetworkElementWatcher) StopAndRemoveNetworkElementSubscription(subID uuid.UUID) {
	n.networkelementSubscriptionsMutex.Lock()
	defer n.networkelementSubscriptionsMutex.Unlock()

	sub, ok := n.networkelementSubcriptions[subID]
	if !ok {
		log.Warnf("No subscription with ID %s found, nothing to stop", subID.String())
		return
	}

	if sub != nil && sub.stopFunc != nil {
		sub.stopFunc()
	}

	delete(n.networkelementSubcriptions, subID)
}
// handleSubscribeResponse inspects a subscribe response together with the information about the network
// element it came from and dispatches on the kind of response: updates are passed on to
// handleSubscribeResponseUpdate, error responses and negative sync responses are logged as errors,
// and any other kind is logged as invalid. A missing response is silently ignored.
func (n *NetworkElementWatcher) handleSubscribeResponse(subscriptionInfo *transport.SubscriptionInformation, workerName string) {
	log.Debugf("Received Subscribe response: MNE ID: %s, MNE Name: %s, SubResponse: %v", subscriptionInfo.NetworkElementID, subscriptionInfo.NetworkElementName, subscriptionInfo.SubResponse)

	subResponse := subscriptionInfo.SubResponse
	if subResponse == nil {
		// Note: This needs proper error handling, no idea how yet. Simply logging would lead to spam in the console
		// if the target that was subscribed to is not reachable anymore.
		return
	}

	switch resp := subResponse.Response.(type) {
	case *gpb.SubscribeResponse_Update:
		n.handleSubscribeResponseUpdate(resp, subscriptionInfo)

	case *gpb.SubscribeResponse_SyncResponse:
		if resp.SyncResponse {
			return
		}

		log.Error(&customerrs.SubscribeSyncResponseError{
			PndID:              subscriptionInfo.PndID,
			NetworkElementID:   subscriptionInfo.NetworkElementID,
			NetworkElementName: subscriptionInfo.NetworkElementName,
		})

	case *gpb.SubscribeResponse_Error:
		log.Error(&customerrs.SubscribeResponseError{
			PndID:              subscriptionInfo.PndID,
			NetworkElementID:   subscriptionInfo.NetworkElementID,
			NetworkElementName: subscriptionInfo.NetworkElementName,
			Err:                "SubscribeResponse_Error",
		})

	default:
		log.Infof("Invalid SubscribeResponse, %v", resp)
	}
}
// handleSubscribeResponseUpdate converts the updates of a subscribe response into a map from path string
// to value string and publishes it as a gNMI subscribe event for the network element named in
// subscriptionInfo.
//
// Supported value types are string, JSON IETF and uint. An update with any other value type is logged
// and causes the whole response to be dropped. Updates whose path cannot be converted to a string are
// logged and skipped. Nothing is published if the response carries no updates, if no update could be
// converted, or if the network element ID is not a valid UUID.
//
// If publishing fails, a reconnect and a retry are attempted in a separate go routine; only a failing
// retry is logged.
func (n *NetworkElementWatcher) handleSubscribeResponseUpdate(resp *gpb.SubscribeResponse_Update, subscriptionInfo *transport.SubscriptionInformation) {
	var notification *gpb.Notification
	if resp != nil {
		notification = resp.Update
	}

	// GetUpdate is safe to call on a nil notification, so no field is dereferenced before the check.
	updates := notification.GetUpdate()
	if len(updates) == 0 {
		log.Debugf("handleSubscribeResponseUpdate empty update or updates; Update: %v, InnerUpdates: %v", notification, updates)
		return
	}

	pathsAndValues := make(map[string]string, len(updates))

	for _, update := range updates {
		pathString, err := ygot.PathToString(update.GetPath())
		if err != nil {
			log.Errorf("Error trying to create a string from path: %v", err)
			// Without a path there is no meaningful key to store the value under.
			continue
		}

		switch v := update.GetVal().GetValue().(type) {
		case *gpb.TypedValue_StringVal:
			pathsAndValues[pathString] = update.GetVal().GetStringVal()
		case *gpb.TypedValue_JsonIetfVal:
			pathsAndValues[pathString] = string(update.GetVal().GetJsonIetfVal())
		case *gpb.TypedValue_UintVal:
			pathsAndValues[pathString] = strconv.FormatUint(update.GetVal().GetUintVal(), 10)
		default:
			log.Errorf("The given value of type: %T, provided by a SubResponse from network element with ID: %s is not supported", v, subscriptionInfo.NetworkElementID)
			return
		}
	}

	if len(pathsAndValues) == 0 {
		return
	}

	mneID, err := uuid.Parse(subscriptionInfo.NetworkElementID)
	if err != nil {
		log.Errorf("Error trying to parse uuid, could not handle subscription response: %v", err)
		// Publishing with a zero UUID would attribute the update to a non-existent network element.
		return
	}

	pubEvent := event.NewGnmiSubscribeEvent(mneID, pathsAndValues)
	if err := n.eventService.PublishEvent(NetworkElementEventTopic, pubEvent); err != nil {
		go func() {
			n.eventService.Reconnect()

			retryErr := n.eventService.RetryPublish(NetworkElementEventTopic, pubEvent)
			if retryErr != nil {
				log.Error(retryErr)
			}
		}()
	}
}
// mergeGnmiSubscriptions takes paths for gNMI Subscriptions from two sources (the MNE and a config file)
// and merges them to one set of subscription paths without duplicates. The paths from the MNE come first,
// and of several equal paths only the first occurrence is kept.
// Paths should be provided in the following format [][]string{{"system", "config", "hostname"}}.
//
// Neither input slice is modified. The result is nil if both inputs are empty.
func (n *NetworkElementWatcher) mergeGnmiSubscriptions(gNMISusbcriptionPathsFromMne, gNMISusbcriptionPathsFromConfig [][]string) [][]string {
	seen := make(map[string]struct{}, len(gNMISusbcriptionPathsFromMne)+len(gNMISusbcriptionPathsFromConfig))

	var joinedPaths [][]string

	// Walk both sources in turn instead of appending one to the other: appending to the MNE slice could
	// write into spare capacity of its backing array and thereby change data owned by the caller.
	for _, source := range [][][]string{gNMISusbcriptionPathsFromMne, gNMISusbcriptionPathsFromConfig} {
		for _, pathSlice := range source {
			path := ygotutil.SlicePathToString(pathSlice)
			if _, ok := seen[path]; ok {
				continue
			}

			seen[path] = struct{}{}
			joinedPaths = append(joinedPaths, pathSlice)
		}
	}

	return joinedPaths
}
// GetAllSubscriptionInformations returns the information of all stored subscriptions.
// The order of the returned entries is not defined, and the result is nil if there are none.
func (n *NetworkElementWatcher) GetAllSubscriptionInformations() []*networkelementSubscriptionHelper {
	n.networkelementSubscriptionsMutex.Lock()
	defer n.networkelementSubscriptionsMutex.Unlock()

	var result []*networkelementSubscriptionHelper
	for subID := range n.networkelementSubcriptions {
		result = append(result, n.networkelementSubcriptions[subID])
	}

	return result
}
// GetSubscriptionInformations returns the information stored for the subscription with the given ID,
// or an error if no subscription is stored under that ID.
func (n *NetworkElementWatcher) GetSubscriptionInformations(subID uuid.UUID) (*networkelementSubscriptionHelper, error) {
	n.networkelementSubscriptionsMutex.Lock()
	defer n.networkelementSubscriptionsMutex.Unlock()

	if information, found := n.networkelementSubcriptions[subID]; found {
		return information, nil
	}

	return nil, errors.New("Couldn't retrieve information for subscription with ID: " + subID.String())
}
// getOptionsForMne searches subInfos for the first entry whose MneID equals mneID and returns the
// subscribe options of that entry. An error is returned if no entry matches.
func getOptionsForMne(mneID string, subInfos []*networkelementSubscriptionHelper) (*gnmi.SubscribeOptions, error) {
	for i := range subInfos {
		if candidate := subInfos[i]; candidate.MneID == mneID {
			return candidate.Opts, nil
		}
	}

	return nil, fmt.Errorf("error: did not find subscription infos matching to provided mne ID: %s", mneID)
}