Newer
Older
package nucleus
import (
"context"
"fmt"
Fabian Seidl
committed
"strconv"
Fabian Seidl
committed
"code.fbi.h-da.de/danet/gosdn/controller/customerrs"
"code.fbi.h-da.de/danet/gosdn/controller/event"
eventInterfaces "code.fbi.h-da.de/danet/gosdn/controller/interfaces/event"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkelement"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/transport"
"code.fbi.h-da.de/danet/gosdn/controller/nucleus/types"
"code.fbi.h-da.de/danet/gosdn/forks/goarista/gnmi"
"github.com/google/uuid"
gpb "github.com/openconfig/gnmi/proto/gnmi"
Malte Bauch
committed
"github.com/openconfig/ygot/ygot"
log "github.com/sirupsen/logrus"
)
const (
subscribeSampleInterval uint64 = 1000000000 // 1 second in nanoseconds
// Note: These gNMI options are adjusted to arista gNMI fork. Change when switching to native gNMI.
gNMISubscribeMode string = "stream"
gNMIStreamMode string = "on_change"
Fabian Seidl
committed
numberWorkers int = 5
workerQueueSize int = 1000
// NetworkElementWatcher is a component that subscribes to network elements via gNMI from within the controller and handles
// responses by triggering the internal event process.
type NetworkElementWatcher struct {
mneService networkelement.Service
networkelementSubcriptions map[uuid.UUID]*networkelementSubscriptionHelper
eventService eventInterfaces.Service
// networkelementSubscriptionHelper is used to store information to stop a running subscribe go routine.
type networkelementSubscriptionHelper struct {
stopSubscribeCtx context.Context
stopFunc context.CancelFunc
}
// NewNetworkElementWatcher allows to subscribe to network element paths.
func NewNetworkElementWatcher(mneService networkelement.Service, eventService eventInterfaces.Service) *NetworkElementWatcher {
return &NetworkElementWatcher{
mneService: mneService,
networkelementSubcriptions: make(map[uuid.UUID]*networkelementSubscriptionHelper),
eventService: eventService,
// SubscribeToNetworkElements subscribes to every available network element in each network domain according to provided SubscribeOptions.
// Paths should be provided in the following format [][]string{{"system", "config", "hostname"}}
// SubscribeOptions can be nil. Use nil for a fixed, pre-defined set of gNMI subscription options (streaming in sample mode each second).
func (n *NetworkElementWatcher) SubscribeToNetworkElements(paths [][]string, opts *gnmi.SubscribeOptions) {
if opts == nil {
opts = &gnmi.SubscribeOptions{
Mode: gNMISubscribeMode,
StreamMode: gNMIStreamMode,
Paths: paths,
SampleInterval: subscribeSampleInterval,
}
}
mnes, err := n.mneService.GetAll()
if err != nil {
log.Error(err)
for _, mne := range mnes {
n.subscribeToNetworkElement(mne, opts)
// SubscribeToNetworkElement subscribes to the provided network element according to provided SubscribeOptions.
// Paths should be provided in the following format [][]string{{"system", "config", "hostname"}}
// SubscribeOptions can be nil. Use nil for a fixed, pre-defined set of gNMI subscription options (streaming in sample mode each second).
func (n *NetworkElementWatcher) SubscribeToNetworkElement(mne networkelement.NetworkElement, paths [][]string, opts *gnmi.SubscribeOptions) {
if opts == nil {
opts = &gnmi.SubscribeOptions{
Mode: gNMISubscribeMode,
StreamMode: gNMIStreamMode,
Paths: paths,
SampleInterval: subscribeSampleInterval,
}
}
n.subscribeToNetworkElement(mne, opts)
}
func (n *NetworkElementWatcher) subscribeToNetworkElement(mne networkelement.NetworkElement, opts *gnmi.SubscribeOptions) {
subID := uuid.New()
stopContext, cancel := context.WithCancel(context.Background())
n.addToNetworkElementSubscriptions(subID, &networkelementSubscriptionHelper{
stopSubscribeCtx: stopContext,
stopFunc: cancel,
})
go n.callSubscribe(stopContext, mne, opts)
Fabian Seidl
committed
// callSubscribe spawns a worker pool to handle gNMI subscription updates for each individual subscription
// and then sets up the gNMI subscription listener.
func (n *NetworkElementWatcher) callSubscribe(stopContext context.Context, mne networkelement.NetworkElement, opts *gnmi.SubscribeOptions) {
gNMIOptionsCtx := context.Background()
gNMIOptionsCtx = context.WithValue(gNMIOptionsCtx, types.CtxKeyOpts, opts)
Fabian Seidl
committed
subInfoChan := make(chan transport.SubscriptionInformation, workerQueueSize)
for i := 1; i <= numberWorkers; i++ {
name := "Worker " + strconv.Itoa(i)
worker := NewSubscriptionQueueWorker(name, n.handleSubscribeResponse)
go worker.HandleGnmiSubscriptionUpdates(subInfoChan)
}
// SubscriptionInformation contains pnd ID, network element ID and name to be used in the internal subscribe to check
// from which network element a response was sent
Fabian Seidl
committed
if err := mne.Transport().ControlPlaneSubscribe(gNMIOptionsCtx, transport.SubscriptionInformation{
PndID: mne.PndID().String(),
NetworkElementID: mne.ID().String(),
NetworkElementName: mne.Name(),
StopContext: stopContext,
Fabian Seidl
committed
},
subInfoChan); err != nil {
log.Error(err)
}
}
func (n *NetworkElementWatcher) addToNetworkElementSubscriptions(subID uuid.UUID, devSub *networkelementSubscriptionHelper) {
//TODO: improve handling of subscriptions, like be able to expose to apps so specific subscriptions instead of only all can be stopped in the future
// StopAndRemoveAllNetworkElementSubscriptions stops and removes all the available running subscriptions.
func (n *NetworkElementWatcher) StopAndRemoveAllNetworkElementSubscriptions() {
for key := range n.networkelementSubcriptions {
n.StopAndRemoveNetworkElementSubscription(key)
}
}
// StopAndRemoveNetworkElementSubscription passes a subscription uuid to stop the running subscription go routing and removes the entry from the map
// of network element subscriptions.
func (n *NetworkElementWatcher) StopAndRemoveNetworkElementSubscription(subID uuid.UUID) {
n.networkelementSubcriptions[subID].stopFunc()
delete(n.networkelementSubcriptions, subID)
// handleSubscribeResponse takes the subscribe response and additional information about the network element to distinguish
// from which network element a subscribe response was sent including improved error handling.
Fabian Seidl
committed
func (n *NetworkElementWatcher) handleSubscribeResponse(subscriptionInfo *transport.SubscriptionInformation, workerName string) {
if subscriptionInfo.SubResponse == nil {
// Note: This needs proper error handling, no idea how yet. Simply logging would lead to spam in the console
// if the target that was subscribed to is not reachable anymore.
// log.Error("Error: subresponse == nil")
} else {
switch resp := subscriptionInfo.SubResponse.Response.(type) {
case *gpb.SubscribeResponse_Error:
log.Error(&customerrs.SubscribeResponseError{
PndID: subscriptionInfo.PndID,
NetworkElementID: subscriptionInfo.NetworkElementID,
NetworkElementName: subscriptionInfo.NetworkElementName,
Fabian Seidl
committed
Err: fmt.Sprint("SubscribeResponse_Error"),
Fabian Seidl
committed
case *gpb.SubscribeResponse_SyncResponse:
if !resp.SyncResponse {
log.Error(&customerrs.SubscribeSyncResponseError{
PndID: subscriptionInfo.PndID,
NetworkElementID: subscriptionInfo.NetworkElementID,
NetworkElementName: subscriptionInfo.NetworkElementName,
})
}
case *gpb.SubscribeResponse_Update:
n.handleSubscribeResponseUpdate(resp, subscriptionInfo)
default:
log.Infof("Invalid SubscribeResponse, %v", resp)
}
}
}
func (n *NetworkElementWatcher) handleSubscribeResponseUpdate(resp *gpb.SubscribeResponse_Update, subscriptionInfo *transport.SubscriptionInformation) {
pathsAndValues := make(map[string]string, len(resp.Update.Update))
for _, update := range resp.Update.Update {
Malte Bauch
committed
pathString, err := ygot.PathToString(update.Path)
if err != nil {
log.Errorf("Error trying to create a string from path: %v", err)
pathsAndValues[pathString] = update.Val.GetStringVal()
}
Fabian Seidl
committed
mneID, err := uuid.Parse(subscriptionInfo.NetworkElementID)
if err != nil {
log.Errorf("Error trying to parse uuid, could not handle subscription response: %v", err)
}
Fabian Seidl
committed
pubEvent := event.NewGnmiSubscribeEvent(mneID, pathsAndValues)
if err := n.eventService.PublishEvent(NetworkElementEventTopic, pubEvent); err != nil {
go func() {
n.eventService.Reconnect()
retryErr := n.eventService.RetryPublish(NetworkElementEventTopic, pubEvent)
if retryErr != nil {
log.Error(retryErr)
}
}()