Newer
Older
package nucleus
import (
"context"
Fabian Seidl
committed
"strconv"
Fabian Seidl
committed
"code.fbi.h-da.de/danet/gosdn/controller/config"
Fabian Seidl
committed
"code.fbi.h-da.de/danet/gosdn/controller/customerrs"
"code.fbi.h-da.de/danet/gosdn/controller/event"
eventInterfaces "code.fbi.h-da.de/danet/gosdn/controller/interfaces/event"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkelement"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/transport"
"code.fbi.h-da.de/danet/gosdn/controller/nucleus/types"
"code.fbi.h-da.de/danet/gosdn/forks/goarista/gnmi"
"github.com/google/uuid"
gpb "github.com/openconfig/gnmi/proto/gnmi"
Fabian Seidl
committed
ygotutil "github.com/openconfig/ygot/util"
Malte Bauch
committed
"github.com/openconfig/ygot/ygot"
log "github.com/sirupsen/logrus"
)
const (
subscribeSampleInterval uint64 = 1000000000 // 1 second in nanoseconds
// Note: These gNMI options are adjusted to arista gNMI fork. Change when switching to native gNMI.
gNMISubscribeMode string = "stream"
gNMIStreamMode string = "on_change"
Fabian Seidl
committed
numberWorkers int = 5
workerQueueSize int = 1000
// NetworkElementWatcher is a component that subscribes to network elements via gNMI from within the controller and handles
// responses by triggering the internal event process.
type NetworkElementWatcher struct {
mneService networkelement.Service
networkelementSubscriptionsMutex sync.Mutex
networkelementSubcriptions map[uuid.UUID]*networkelementSubscriptionHelper
eventService eventInterfaces.Service
// networkelementSubscriptionHelper is used to store information to stop a running subscribe go routine.
type networkelementSubscriptionHelper struct {
SubID string
MneID string
MneName string
PndID string
stopSubscribeCtx context.Context
stopFunc context.CancelFunc
Opts *gnmi.SubscribeOptions
// NewNetworkElementWatcher allows to subscribe to network element paths.
func NewNetworkElementWatcher(mneService networkelement.Service, eventService eventInterfaces.Service) *NetworkElementWatcher {
return &NetworkElementWatcher{
mneService: mneService,
networkelementSubcriptions: make(map[uuid.UUID]*networkelementSubscriptionHelper),
eventService: eventService,
// SubscribeToNetworkElements subscribes to every available network element in each network domain according to provided SubscribeOptions.
// SubscribeOptions can be nil. Use nil for a fixed, pre-defined set of gNMI subscription options (streaming in sample mode each second).
func (n *NetworkElementWatcher) SubscribeToNetworkElements(subScriptionInfos []*networkelementSubscriptionHelper) {
var tmpOpts *gnmi.SubscribeOptions
if len(subScriptionInfos) == 0 {
tmpOpts = &gnmi.SubscribeOptions{
Mode: gNMISubscribeMode,
StreamMode: gNMIStreamMode,
SampleInterval: subscribeSampleInterval,
}
}
mnes, err := n.mneService.GetAll()
if err != nil {
log.Error(err)
for _, mne := range mnes {
if len(subScriptionInfos) > 0 {
tmpOpts, err = getOptionsForMne(mne.ID().String(), subScriptionInfos)
if err != nil {
log.Infof("Couldn't find options for mne %s, reason: %v. \n Skipping subscription.", mne.Name(), err)
continue
}
}
n.subscribeToNetworkElement(mne, tmpOpts)
// SubscribeToNetworkElement subscribes to the provided network element according to provided SubscribeOptions.
// SubscribeOptions can be nil. Use nil for a fixed, pre-defined set of gNMI subscription options (streaming in sample mode each second).
Fabian Seidl
committed
func (n *NetworkElementWatcher) SubscribeToNetworkElement(mne networkelement.NetworkElement, opts *gnmi.SubscribeOptions) {
if opts == nil {
opts = &gnmi.SubscribeOptions{
Mode: gNMISubscribeMode,
StreamMode: gNMIStreamMode,
SampleInterval: subscribeSampleInterval,
}
}
n.subscribeToNetworkElement(mne, opts)
}
func (n *NetworkElementWatcher) subscribeToNetworkElement(mne networkelement.NetworkElement, opts *gnmi.SubscribeOptions) {
subID := uuid.New()
opts.Paths = n.mergeGnmiSubscriptions(mne.GetGnmiSubscriptionPaths(), config.GetGnmiSubscriptionPaths())
stopContext, cancel := context.WithCancel(context.Background())
n.addToNetworkElementSubscriptions(subID, &networkelementSubscriptionHelper{
stopSubscribeCtx: stopContext,
stopFunc: cancel,
SubID: subID.String(),
MneID: mne.ID().String(),
MneName: mne.Name(),
PndID: mne.PndID().String(),
Opts: opts,
})
go n.callSubscribe(stopContext, mne, opts)
Fabian Seidl
committed
// callSubscribe spawns a worker pool to handle gNMI subscription updates for each individual subscription
// and then sets up the gNMI subscription listener.
func (n *NetworkElementWatcher) callSubscribe(stopContext context.Context, mne networkelement.NetworkElement, opts *gnmi.SubscribeOptions) {
gNMIOptionsCtx := context.Background()
gNMIOptionsCtx = context.WithValue(gNMIOptionsCtx, types.CtxKeyOpts, opts)
Fabian Seidl
committed
subInfoChan := make(chan transport.SubscriptionInformation, workerQueueSize)
for i := 1; i <= numberWorkers; i++ {
name := "Worker " + strconv.Itoa(i)
worker := NewSubscriptionQueueWorker(name, n.handleSubscribeResponse)
go worker.HandleGnmiSubscriptionUpdates(subInfoChan)
}
// SubscriptionInformation contains pnd ID, network element ID and name to be used in the internal subscribe to check
// from which network element a response was sent
Fabian Seidl
committed
if err := mne.Transport().ControlPlaneSubscribe(gNMIOptionsCtx, transport.SubscriptionInformation{
PndID: mne.PndID().String(),
NetworkElementID: mne.ID().String(),
NetworkElementName: mne.Name(),
StopContext: stopContext,
Fabian Seidl
committed
},
subInfoChan); err != nil {
log.Error(err)
}
}
func (n *NetworkElementWatcher) addToNetworkElementSubscriptions(subID uuid.UUID, devSub *networkelementSubscriptionHelper) {
n.networkelementSubscriptionsMutex.Lock()
defer n.networkelementSubscriptionsMutex.Unlock()
// StopAndRemoveAllNetworkElementSubscriptions stops and removes all the available running subscriptions.
func (n *NetworkElementWatcher) StopAndRemoveAllNetworkElementSubscriptions() {
for key := range n.networkelementSubcriptions {
n.StopAndRemoveNetworkElementSubscription(key)
}
}
// StopAndRemoveNetworkElementSubscription passes a subscription uuid to stop the running subscription go routing and removes the entry from the map
// of network element subscriptions.
func (n *NetworkElementWatcher) StopAndRemoveNetworkElementSubscription(subID uuid.UUID) {
n.networkelementSubscriptionsMutex.Lock()
defer n.networkelementSubscriptionsMutex.Unlock()
n.networkelementSubcriptions[subID].stopFunc()
delete(n.networkelementSubcriptions, subID)
// handleSubscribeResponse takes the subscribe response and additional information about the network element to distinguish
// from which network element a subscribe response was sent including improved error handling.
Fabian Seidl
committed
func (n *NetworkElementWatcher) handleSubscribeResponse(subscriptionInfo *transport.SubscriptionInformation, workerName string) {
if subscriptionInfo.SubResponse == nil {
// Note: This needs proper error handling, no idea how yet. Simply logging would lead to spam in the console
// if the target that was subscribed to is not reachable anymore.
// log.Error("Error: subresponse == nil")
} else {
switch resp := subscriptionInfo.SubResponse.Response.(type) {
case *gpb.SubscribeResponse_Error:
log.Error(&customerrs.SubscribeResponseError{
PndID: subscriptionInfo.PndID,
NetworkElementID: subscriptionInfo.NetworkElementID,
NetworkElementName: subscriptionInfo.NetworkElementName,
Fabian Seidl
committed
Err: fmt.Sprint("SubscribeResponse_Error"),
Fabian Seidl
committed
case *gpb.SubscribeResponse_SyncResponse:
if !resp.SyncResponse {
log.Error(&customerrs.SubscribeSyncResponseError{
PndID: subscriptionInfo.PndID,
NetworkElementID: subscriptionInfo.NetworkElementID,
NetworkElementName: subscriptionInfo.NetworkElementName,
})
}
case *gpb.SubscribeResponse_Update:
n.handleSubscribeResponseUpdate(resp, subscriptionInfo)
default:
log.Infof("Invalid SubscribeResponse, %v", resp)
}
}
}
func (n *NetworkElementWatcher) handleSubscribeResponseUpdate(resp *gpb.SubscribeResponse_Update, subscriptionInfo *transport.SubscriptionInformation) {
pathsAndValues := make(map[string]string, len(resp.Update.Update))
for _, update := range resp.Update.Update {
Malte Bauch
committed
pathString, err := ygot.PathToString(update.Path)
if err != nil {
log.Errorf("Error trying to create a string from path: %v", err)
pathsAndValues[pathString] = update.Val.GetStringVal()
}
Fabian Seidl
committed
mneID, err := uuid.Parse(subscriptionInfo.NetworkElementID)
if err != nil {
log.Errorf("Error trying to parse uuid, could not handle subscription response: %v", err)
}
Fabian Seidl
committed
pubEvent := event.NewGnmiSubscribeEvent(mneID, pathsAndValues)
if err := n.eventService.PublishEvent(NetworkElementEventTopic, pubEvent); err != nil {
go func() {
n.eventService.Reconnect()
retryErr := n.eventService.RetryPublish(NetworkElementEventTopic, pubEvent)
if retryErr != nil {
log.Error(retryErr)
}
}()
Fabian Seidl
committed
// mergeGnmiSubscriptions takes paths for gNMI Subscriptions from two sources (the MNE and a config file)
// and merges them to one set of subscription paths without duplicates.
// Paths should be provided in the following format [][]string{{"system", "config", "hostname"}}.
func (n *NetworkElementWatcher) mergeGnmiSubscriptions(gNMISusbcriptionPathsFromMne, gNMISusbcriptionPathsFromConfig [][]string) [][]string {
// create slice with all elements
var tempJoinedPaths = append(gNMISusbcriptionPathsFromMne, gNMISusbcriptionPathsFromConfig...)
// remove duplicates
inResult := make(map[string]bool)
var joinedPaths [][]string
for _, pathSlice := range tempJoinedPaths {
path := ygotutil.SlicePathToString(pathSlice)
if _, ok := inResult[path]; !ok {
inResult[path] = true
joinedPaths = append(joinedPaths, pathSlice)
}
}
return joinedPaths
}
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
// GetAllSubscriptionInformations returns the information of all running sunscriptions.
func (n *NetworkElementWatcher) GetAllSubscriptionInformations() []*networkelementSubscriptionHelper {
var information []*networkelementSubscriptionHelper
n.networkelementSubscriptionsMutex.Lock()
defer n.networkelementSubscriptionsMutex.Unlock()
for _, info := range n.networkelementSubcriptions {
information = append(information, info)
}
return information
}
// GetSubscriptionInformations returns the information for one specific subscription referenced by its ID.
func (n *NetworkElementWatcher) GetSubscriptionInformations(subID uuid.UUID) (*networkelementSubscriptionHelper, error) {
n.networkelementSubscriptionsMutex.Lock()
defer n.networkelementSubscriptionsMutex.Unlock()
information, ok := n.networkelementSubcriptions[subID]
if !ok {
return nil, errors.New(fmt.Sprintf("Couldn't retrieve information for subscription with ID: %s", subID.String()))
}
return information, nil
}
// getOptionsForMne checks if there is a match of the mneID with all the mne IDs provided in the slice of subscription
// information and returns the related subscribe options or an error.
func getOptionsForMne(mneID string, subInfos []*networkelementSubscriptionHelper) (*gnmi.SubscribeOptions, error) {
for _, subInfo := range subInfos {
if subInfo.MneID == mneID {
return subInfo.Opts, nil
}
}
return nil, fmt.Errorf("error: did not find subscription infos matching to provided mne ID: %s", mneID)
}