Newer
Older
package nucleus
import (
"context"
Fabian Seidl
committed
"strconv"
Fabian Seidl
committed
"code.fbi.h-da.de/danet/gosdn/controller/config"
Fabian Seidl
committed
"code.fbi.h-da.de/danet/gosdn/controller/customerrs"
"code.fbi.h-da.de/danet/gosdn/controller/event"
eventInterfaces "code.fbi.h-da.de/danet/gosdn/controller/interfaces/event"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkelement"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/transport"
"code.fbi.h-da.de/danet/gosdn/controller/nucleus/types"
"code.fbi.h-da.de/danet/gosdn/controller/store"
"code.fbi.h-da.de/danet/gosdn/forks/goarista/gnmi"
"github.com/google/uuid"
gpb "github.com/openconfig/gnmi/proto/gnmi"
Fabian Seidl
committed
ygotutil "github.com/openconfig/ygot/util"
Malte Bauch
committed
"github.com/openconfig/ygot/ygot"
log "github.com/sirupsen/logrus"
)
// Defaults used for controller-initiated gNMI subscriptions and for the worker
// pool that processes their responses.
const (
	// subscribeSampleInterval is the default sample interval: 1 second in nanoseconds.
	subscribeSampleInterval uint64 = 1000000000
	// Note: These gNMI options are adjusted to arista gNMI fork. Change when switching to native gNMI.
	// gNMISubscribeMode is the default subscription mode.
	gNMISubscribeMode string = "stream"
	// gNMIStreamMode is the default stream mode.
	gNMIStreamMode string = "on_change"

	// numberWorkers is the number of queue workers started for each subscription.
	numberWorkers int = 5
	// workerQueueSize is the buffer size of the channel that feeds those workers.
	workerQueueSize int = 1000
)
// NetworkElementWatcher is a component that subscribes to network elements via gNMI from within the controller and handles
// responses by triggering the internal event process.
type NetworkElementWatcher struct {
mneService networkelement.Service
networkelementSubscriptionsMutex sync.Mutex
networkelementSubcriptions map[uuid.UUID]*networkelementSubscriptionHelper
eventService eventInterfaces.Service
// networkelementSubscriptionHelper is used to store information to stop a running subscribe go routine.
type networkelementSubscriptionHelper struct {
SubID string
MneID string
MneName string
PndID string
stopSubscribeCtx context.Context
stopFunc context.CancelFunc
Opts *gnmi.SubscribeOptions
// NewNetworkElementWatcher allows to subscribe to network element paths.
func NewNetworkElementWatcher(mneService networkelement.Service, eventService eventInterfaces.Service) *NetworkElementWatcher {
return &NetworkElementWatcher{
mneService: mneService,
networkelementSubcriptions: make(map[uuid.UUID]*networkelementSubscriptionHelper),
eventService: eventService,
// SubscribeToNetworkElements subscribes to every available network element in each network domain according to provided SubscribeOptions.
// SubscribeOptions can be nil. Use nil for a fixed, pre-defined set of gNMI subscription options (streaming in sample mode each second).
func (n *NetworkElementWatcher) SubscribeToNetworkElements(subScriptionInfos []*networkelementSubscriptionHelper) {
var tmpOpts *gnmi.SubscribeOptions
if len(subScriptionInfos) == 0 {
tmpOpts = &gnmi.SubscribeOptions{
Mode: gNMISubscribeMode,
StreamMode: gNMIStreamMode,
SampleInterval: subscribeSampleInterval,
}
}
mnes, err := n.mneService.GetAll()
if err != nil {
log.Error(err)
for _, mne := range mnes {
if len(subScriptionInfos) > 0 {
tmpOpts, err = getOptionsForMne(mne.ID().String(), subScriptionInfos)
if err != nil {
log.Infof("Couldn't find options for mne %s, reason: %v. \n Skipping subscription.", mne.Name(), err)
continue
}
}
n.subscribeToNetworkElement(mne, tmpOpts)
// SubscribeToNetworkElement subscribes to the provided network element according to provided SubscribeOptions.
// SubscribeOptions can be nil. Use nil for a fixed, pre-defined set of gNMI subscription options (streaming in sample mode each second).
Fabian Seidl
committed
func (n *NetworkElementWatcher) SubscribeToNetworkElement(mne networkelement.NetworkElement, opts *gnmi.SubscribeOptions) {
if opts == nil {
opts = &gnmi.SubscribeOptions{
Mode: gNMISubscribeMode,
StreamMode: gNMIStreamMode,
SampleInterval: subscribeSampleInterval,
}
}
n.subscribeToNetworkElement(mne, opts)
}
// SubscribeToNetworkElementWithID looks up the network element with the given ID and subscribes to it.
// opts may be nil, in which case the pre-defined gNMI subscription options are used
// (mode "stream", stream mode "on_change", sample interval of one second).
// If the lookup fails, the error is logged and returned and no subscription is started.
func (n *NetworkElementWatcher) SubscribeToNetworkElementWithID(mneID uuid.UUID, opts *gnmi.SubscribeOptions) error {
	target, lookupErr := n.mneService.Get(store.Query{ID: mneID})
	if lookupErr != nil {
		log.Error(lookupErr)
		return lookupErr
	}

	effectiveOpts := opts
	if effectiveOpts == nil {
		effectiveOpts = &gnmi.SubscribeOptions{
			Mode:           gNMISubscribeMode,
			StreamMode:     gNMIStreamMode,
			SampleInterval: subscribeSampleInterval,
		}
	}

	n.subscribeToNetworkElement(target, effectiveOpts)

	return nil
}
func (n *NetworkElementWatcher) subscribeToNetworkElement(mne networkelement.NetworkElement, opts *gnmi.SubscribeOptions) {
subID := uuid.New()
opts.Paths = n.mergeGnmiSubscriptions(mne.GetGnmiSubscriptionPaths(), config.GetGnmiSubscriptionPaths())
stopContext, cancel := context.WithCancel(context.Background())
n.addToNetworkElementSubscriptions(subID, &networkelementSubscriptionHelper{
stopSubscribeCtx: stopContext,
stopFunc: cancel,
SubID: subID.String(),
MneID: mne.ID().String(),
MneName: mne.Name(),
PndID: mne.PndID().String(),
Opts: opts,
})
go n.callSubscribe(stopContext, mne, opts)
Fabian Seidl
committed
// callSubscribe spawns a worker pool to handle gNMI subscription updates for each individual subscription
// and then sets up the gNMI subscription listener.
func (n *NetworkElementWatcher) callSubscribe(stopContext context.Context, mne networkelement.NetworkElement, opts *gnmi.SubscribeOptions) {
gNMIOptionsCtx := context.Background()
gNMIOptionsCtx = context.WithValue(gNMIOptionsCtx, types.CtxKeyOpts, opts)
Fabian Seidl
committed
subInfoChan := make(chan transport.SubscriptionInformation, workerQueueSize)
for i := 1; i <= numberWorkers; i++ {
name := "Worker " + strconv.Itoa(i)
worker := NewSubscriptionQueueWorker(name, n.handleSubscribeResponse)
go worker.HandleGnmiSubscriptionUpdates(subInfoChan)
}
// SubscriptionInformation contains pnd ID, network element ID and name to be used in the internal subscribe to check
// from which network element a response was sent
Fabian Seidl
committed
if err := mne.Transport().ControlPlaneSubscribe(gNMIOptionsCtx, transport.SubscriptionInformation{
PndID: mne.PndID().String(),
NetworkElementID: mne.ID().String(),
NetworkElementName: mne.Name(),
StopContext: stopContext,
Fabian Seidl
committed
},
subInfoChan); err != nil {
log.Error(err)
}
}
func (n *NetworkElementWatcher) addToNetworkElementSubscriptions(subID uuid.UUID, devSub *networkelementSubscriptionHelper) {
n.networkelementSubscriptionsMutex.Lock()
defer n.networkelementSubscriptionsMutex.Unlock()
// StopAndRemoveAllNetworkElementSubscriptions stops and removes all the available running subscriptions.
//
// The subscriptions mutex is held for the whole operation, so the map is never iterated while another
// goroutine adds or removes an entry.
func (n *NetworkElementWatcher) StopAndRemoveAllNetworkElementSubscriptions() {
	n.networkelementSubscriptionsMutex.Lock()
	defer n.networkelementSubscriptionsMutex.Unlock()

	for subID, sub := range n.networkelementSubcriptions {
		if sub != nil && sub.stopFunc != nil {
			sub.stopFunc()
		}
		// Deleting the current key while ranging over a map is well-defined in Go.
		delete(n.networkelementSubcriptions, subID)
	}
}
// StopAndRemoveNetworkElementSubscription passes a subscription uuid to stop the running subscription go routing and removes the entry from the map
// of network element subscriptions.
func (n *NetworkElementWatcher) StopAndRemoveNetworkElementSubscription(subID uuid.UUID) {
n.networkelementSubscriptionsMutex.Lock()
defer n.networkelementSubscriptionsMutex.Unlock()
n.networkelementSubcriptions[subID].stopFunc()
delete(n.networkelementSubcriptions, subID)
// handleSubscribeResponse takes the subscribe response and additional information about the network element to distinguish
// from which network element a subscribe response was sent including improved error handling.
Fabian Seidl
committed
func (n *NetworkElementWatcher) handleSubscribeResponse(subscriptionInfo *transport.SubscriptionInformation, workerName string) {
log.Debugf("Received Subscribe response: MNE ID: %s, MNE Name: %s, SubResponse: %v", subscriptionInfo.NetworkElementID, subscriptionInfo.NetworkElementName, subscriptionInfo.SubResponse)
Fabian Seidl
committed
if subscriptionInfo.SubResponse == nil {
// Note: This needs proper error handling, no idea how yet. Simply logging would lead to spam in the console
// if the target that was subscribed to is not reachable anymore.
// log.Error("Error: subresponse == nil")
} else {
switch resp := subscriptionInfo.SubResponse.Response.(type) {
case *gpb.SubscribeResponse_Error:
log.Error(&customerrs.SubscribeResponseError{
PndID: subscriptionInfo.PndID,
NetworkElementID: subscriptionInfo.NetworkElementID,
NetworkElementName: subscriptionInfo.NetworkElementName,
Fabian Seidl
committed
Err: fmt.Sprint("SubscribeResponse_Error"),
Fabian Seidl
committed
case *gpb.SubscribeResponse_SyncResponse:
if !resp.SyncResponse {
log.Error(&customerrs.SubscribeSyncResponseError{
PndID: subscriptionInfo.PndID,
NetworkElementID: subscriptionInfo.NetworkElementID,
NetworkElementName: subscriptionInfo.NetworkElementName,
})
}
case *gpb.SubscribeResponse_Update:
n.handleSubscribeResponseUpdate(resp, subscriptionInfo)
default:
log.Infof("Invalid SubscribeResponse, %v", resp)
}
}
}
func (n *NetworkElementWatcher) handleSubscribeResponseUpdate(resp *gpb.SubscribeResponse_Update, subscriptionInfo *transport.SubscriptionInformation) {
pathsAndValues := make(map[string]string, len(resp.Update.Update))
if resp.Update == nil || len(resp.Update.Update) == 0 {
log.Debugf("handleSubscribeResponseUpdate empty update or updates; Update: %v, InnerUpdates: %v", resp.Update, resp.Update.Update)
return
}
for _, update := range resp.Update.Update {
Malte Bauch
committed
pathString, err := ygot.PathToString(update.Path)
if err != nil {
log.Errorf("Error trying to create a string from path: %v", err)
switch v := update.GetVal().GetValue().(type) {
case *gpb.TypedValue_StringVal:
pathsAndValues[pathString] = update.Val.GetStringVal()
case *gpb.TypedValue_JsonIetfVal:
pathsAndValues[pathString] = string(update.GetVal().GetJsonIetfVal())
case *gpb.TypedValue_UintVal:
pathsAndValues[pathString] = fmt.Sprintf("%d", update.Val.GetUintVal())
default:
log.Errorf("The given value of type: %T, provided by a SubResponse from network element with ID: %s is not supported", v, subscriptionInfo.NetworkElementID)
return
}
}
Fabian Seidl
committed
mneID, err := uuid.Parse(subscriptionInfo.NetworkElementID)
if err != nil {
log.Errorf("Error trying to parse uuid, could not handle subscription response: %v", err)
}
Fabian Seidl
committed
pubEvent := event.NewGnmiSubscribeEvent(mneID, pathsAndValues)
if err := n.eventService.PublishEvent(NetworkElementEventTopic, pubEvent); err != nil {
go func() {
n.eventService.Reconnect()
retryErr := n.eventService.RetryPublish(NetworkElementEventTopic, pubEvent)
if retryErr != nil {
log.Error(retryErr)
}
}()
Fabian Seidl
committed
// mergeGnmiSubscriptions takes paths for gNMI Subscriptions from two sources (the MNE and a config file)
// and merges them to one set of subscription paths without duplicates.
// Paths should be provided in the following format [][]string{{"system", "config", "hostname"}}.
//
// Paths from the MNE come first, followed by the paths from the config that are not already present;
// the first occurrence of a path wins. Neither input slice is modified. The result is nil if both
// inputs are empty.
func (n *NetworkElementWatcher) mergeGnmiSubscriptions(gNMISusbcriptionPathsFromMne, gNMISusbcriptionPathsFromConfig [][]string) [][]string {
	seen := make(map[string]struct{}, len(gNMISusbcriptionPathsFromMne)+len(gNMISusbcriptionPathsFromConfig))

	var joinedPaths [][]string

	// Walk both sources in order instead of appending one onto the other: append could
	// write into spare capacity of the caller's slice.
	for _, source := range [][][]string{gNMISusbcriptionPathsFromMne, gNMISusbcriptionPathsFromConfig} {
		for _, pathSlice := range source {
			path := ygotutil.SlicePathToString(pathSlice)
			if _, ok := seen[path]; ok {
				continue
			}

			seen[path] = struct{}{}
			joinedPaths = append(joinedPaths, pathSlice)
		}
	}

	return joinedPaths
}
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
// GetAllSubscriptionInformations returns the helpers of all subscriptions currently stored in the watcher.
// The order of the result is not defined; the result is nil if no subscription is stored.
func (n *NetworkElementWatcher) GetAllSubscriptionInformations() []*networkelementSubscriptionHelper {
	n.networkelementSubscriptionsMutex.Lock()
	defer n.networkelementSubscriptionsMutex.Unlock()

	var collected []*networkelementSubscriptionHelper
	for subID := range n.networkelementSubcriptions {
		collected = append(collected, n.networkelementSubcriptions[subID])
	}

	return collected
}
// GetSubscriptionInformations returns the helper stored for the subscription with the given ID,
// or an error if no subscription is stored under that ID.
func (n *NetworkElementWatcher) GetSubscriptionInformations(subID uuid.UUID) (*networkelementSubscriptionHelper, error) {
	n.networkelementSubscriptionsMutex.Lock()
	defer n.networkelementSubscriptionsMutex.Unlock()

	if found, exists := n.networkelementSubcriptions[subID]; exists {
		return found, nil
	}

	message := fmt.Sprintf("Couldn't retrieve information for subscription with ID: %s", subID.String())

	return nil, errors.New(message)
}
// getOptionsForMne searches subInfos for the first entry whose MneID equals mneID and returns the
// subscribe options stored in that entry. An error is returned if no entry matches.
func getOptionsForMne(mneID string, subInfos []*networkelementSubscriptionHelper) (*gnmi.SubscribeOptions, error) {
	for i := 0; i < len(subInfos); i++ {
		candidate := subInfos[i]
		if candidate.MneID != mneID {
			continue
		}

		return candidate.Opts, nil
	}

	return nil, fmt.Errorf("error: did not find subscription infos matching to provided mne ID: %s", mneID)
}