Skip to content
Snippets Groups Projects
Commit 6a730881 authored by Fabian Seidl's avatar Fabian Seidl
Browse files

Resolve "The device watching mechanism does not scale well and might cause performance issues"

See merge request !493
parent 4b3afba5
No related branches found
No related tags found
1 merge request!493Resolve "The device watching mechanism does not scale well and might cause performance issues"
Pipeline #159510 failed
......@@ -15,8 +15,7 @@ type Transport interface {
Set(ctx context.Context, payload change.Payload) error
CustomSet(ctx context.Context, req *gpb.SetRequest) (*gpb.SetResponse, error)
Subscribe(ctx context.Context, params ...string) error
ControlPlaneSubscribe(ctx context.Context, subscribeCallbackFunc HandleSubscribeResponse,
subscriptionInfo *SubscriptionInformation) error
ControlPlaneSubscribe(ctx context.Context, subscriptionInfo SubscriptionInformation, subInfoChannel chan SubscriptionInformation) error
Type() string
ProcessResponse(resp interface{}) error
ProcessControlPlaneSubscribeResponse(resp *gpb.SubscribeResponse_Update) error
......@@ -35,4 +34,5 @@ type SubscriptionInformation struct {
NetworkElementID string
NetworkElementName string
StopContext context.Context
SubResponse *gpb.SubscribeResponse
}
......@@ -19,13 +19,13 @@ type Transport struct {
mock.Mock
}
// ControlPlaneSubscribe provides a mock function with given fields: ctx, subscribeCallbackFunc, subscriptionInfo
func (_m *Transport) ControlPlaneSubscribe(ctx context.Context, subscribeCallbackFunc transport.HandleSubscribeResponse, subscriptionInfo *transport.SubscriptionInformation) error {
ret := _m.Called(ctx, subscribeCallbackFunc, subscriptionInfo)
// ControlPlaneSubscribe provides a mock function with given fields: ctx, subscriptionInfo, subInfoChannel
func (_m *Transport) ControlPlaneSubscribe(ctx context.Context, subscriptionInfo transport.SubscriptionInformation, subInfoChannel chan transport.SubscriptionInformation) error {
ret := _m.Called(ctx, subscriptionInfo, subInfoChannel)
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, transport.HandleSubscribeResponse, *transport.SubscriptionInformation) error); ok {
r0 = rf(ctx, subscribeCallbackFunc, subscriptionInfo)
if rf, ok := ret.Get(0).(func(context.Context, transport.SubscriptionInformation, chan transport.SubscriptionInformation) error); ok {
r0 = rf(ctx, subscriptionInfo, subInfoChannel)
} else {
r0 = ret.Error(0)
}
......
......@@ -3,6 +3,7 @@ package nucleus
import (
"context"
"fmt"
"time"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/change"
tpInterface "code.fbi.h-da.de/danet/gosdn/controller/interfaces/transport"
......@@ -125,12 +126,12 @@ func (g *Gnmi) Subscribe(ctx context.Context, params ...string) error {
// ControlPlaneSubscribe is used to subscribe to devices from within the controller. gNMI SubscribeOptions need to be provided in the context,
// each response received from the subscription is attached to the subscription information and sent to subInfoChannel.
func (g *Gnmi) ControlPlaneSubscribe(ctx context.Context, subscribeCallbackFunc tpInterface.HandleSubscribeResponse, subscriptionInfo *tpInterface.SubscriptionInformation) error {
func (g *Gnmi) ControlPlaneSubscribe(ctx context.Context, subscriptionInfo tpInterface.SubscriptionInformation, subInfoChannel chan tpInterface.SubscriptionInformation) error {
if g.client == nil {
return &customerrs.NilClientError{}
}
return g.controlPlaneSubscribe(ctx, subscribeCallbackFunc, subscriptionInfo)
return g.controlPlaneSubscribe(ctx, subscriptionInfo, subInfoChannel)
}
// Type returns the gNMI transport type.
......@@ -289,8 +290,7 @@ func (g *Gnmi) subscribe(ctx context.Context) error {
// controlPlaneSubscribe calls gNMI subscribe and sends every response, together with the additional network element information,
// to subInfoChannel. The StopContext of the subscription information provides an option to stop the subscription.
func (g *Gnmi) controlPlaneSubscribe(ctx context.Context, subcribeCallbackFunc func(*gpb.SubscribeResponse,
*tpInterface.SubscriptionInformation), subscriptionInfo *tpInterface.SubscriptionInformation) error {
func (g *Gnmi) controlPlaneSubscribe(ctx context.Context, subscriptionInfo tpInterface.SubscriptionInformation, subInfoChannel chan tpInterface.SubscriptionInformation) error {
ctx = gnmi.NewContext(ctx, g.config)
opts, ok := ctx.Value(types.CtxKeyOpts).(*gnmi.SubscribeOptions)
if !ok {
......@@ -309,10 +309,8 @@ func (g *Gnmi) controlPlaneSubscribe(ctx context.Context, subcribeCallbackFunc f
}).Info("subscribed to gNMI target")
for {
resp := <-g.RespChan
if resp != nil {
// callback to trigger internal event handling process
go subcribeCallbackFunc(resp, subscriptionInfo)
}
subscriptionInfo.SubResponse = resp
subInfoChannel <- subscriptionInfo
select {
case <-subscriptionInfo.StopContext.Done():
......@@ -321,9 +319,12 @@ func (g *Gnmi) controlPlaneSubscribe(ctx context.Context, subcribeCallbackFunc f
}
return
default:
log.Infof("Buffer Length: %v", len(subInfoChannel))
time.Sleep(time.Millisecond * 2)
}
}
}()
return gnmi.SubscribeErr(ctx, g.client, opts, g.RespChan)
}
......
......@@ -3,6 +3,7 @@ package nucleus
import (
"context"
"fmt"
"strconv"
"strings"
"code.fbi.h-da.de/danet/gosdn/controller/customerrs"
......@@ -11,7 +12,6 @@ import (
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkelement"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/transport"
"code.fbi.h-da.de/danet/gosdn/controller/nucleus/types"
"code.fbi.h-da.de/danet/gosdn/controller/store"
"code.fbi.h-da.de/danet/gosdn/forks/goarista/gnmi"
"github.com/google/uuid"
gpb "github.com/openconfig/gnmi/proto/gnmi"
......@@ -23,6 +23,8 @@ const (
// Note: These gNMI options are adjusted to arista gNMI fork. Change when switching to native gNMI.
gNMISubscribeMode string = "stream"
gNMIStreamMode string = "on_change"
numberWorkers int = 5
workerQueueSize int = 1000
)
// NetworkElementWatcher is a component that subscribes to network elements via gNMI from within the controller and handles
......@@ -84,18 +86,30 @@ func (n *NetworkElementWatcher) subscribeToNetworkElement(mne networkelement.Net
go n.callSubscribe(stopContext, mne, opts)
}
// callSubscribe spawns a worker pool to handle gNMI subscription updates for each individual subscription
// and then sets up the gNMI subscription listener.
func (n *NetworkElementWatcher) callSubscribe(stopContext context.Context, mne networkelement.NetworkElement, opts *gnmi.SubscribeOptions) {
gNMIOptionsCtx := context.Background()
gNMIOptionsCtx = context.WithValue(gNMIOptionsCtx, types.CtxKeyOpts, opts)
subInfoChan := make(chan transport.SubscriptionInformation, workerQueueSize)
for i := 1; i <= numberWorkers; i++ {
name := "Worker " + strconv.Itoa(i)
worker := NewSubscriptionQueueWorker(name, n.handleSubscribeResponse)
go worker.HandleGnmiSubscriptionUpdates(subInfoChan)
}
// SubscriptionInformation contains pnd ID, network element ID and name to be used in the internal subscribe to check
// from which network element a response was sent
if err := mne.Transport().ControlPlaneSubscribe(gNMIOptionsCtx, n.handleSubscribeResponse, &transport.SubscriptionInformation{
if err := mne.Transport().ControlPlaneSubscribe(gNMIOptionsCtx, transport.SubscriptionInformation{
PndID: mne.PndID().String(),
NetworkElementID: mne.ID().String(),
NetworkElementName: mne.Name(),
StopContext: stopContext,
}); err != nil {
},
subInfoChan); err != nil {
log.Error(err)
}
}
......@@ -121,36 +135,37 @@ func (n *NetworkElementWatcher) StopAndRemoveNetworkElementSubscription(subID uu
// handleSubscribeResponse takes the subscribe response and additional information about the network element to distinguish
// from which network element a subscribe response was sent including improved error handling.
func (n *NetworkElementWatcher) handleSubscribeResponse(resp *gpb.SubscribeResponse, subscriptionInfo *transport.SubscriptionInformation) {
switch resp := resp.Response.(type) {
case *gpb.SubscribeResponse_Error:
log.Error(&customerrs.SubscribeResponseError{
PndID: subscriptionInfo.PndID,
NetworkElementID: subscriptionInfo.NetworkElementID,
NetworkElementName: subscriptionInfo.NetworkElementName,
Err: fmt.Sprint("SubscribeResponse_Error"),
})
case *gpb.SubscribeResponse_SyncResponse:
if !resp.SyncResponse {
log.Error(&customerrs.SubscribeSyncResponseError{
func (n *NetworkElementWatcher) handleSubscribeResponse(subscriptionInfo *transport.SubscriptionInformation, workerName string) {
if subscriptionInfo.SubResponse == nil {
// Note: This needs proper error handling, no idea how yet. Simply logging would lead to spam in the console
// if the target that was subscribed to is not reachable anymore.
// log.Error("Error: subresponse == nil")
} else {
switch resp := subscriptionInfo.SubResponse.Response.(type) {
case *gpb.SubscribeResponse_Error:
log.Error(&customerrs.SubscribeResponseError{
PndID: subscriptionInfo.PndID,
NetworkElementID: subscriptionInfo.NetworkElementID,
NetworkElementName: subscriptionInfo.NetworkElementName,
Err: fmt.Sprint("SubscribeResponse_Error"),
})
case *gpb.SubscribeResponse_SyncResponse:
if !resp.SyncResponse {
log.Error(&customerrs.SubscribeSyncResponseError{
PndID: subscriptionInfo.PndID,
NetworkElementID: subscriptionInfo.NetworkElementID,
NetworkElementName: subscriptionInfo.NetworkElementName,
})
}
case *gpb.SubscribeResponse_Update:
n.handleSubscribeResponseUpdate(resp, subscriptionInfo)
default:
log.Infof("Invalid SubscribeResponse, %v", resp)
}
case *gpb.SubscribeResponse_Update:
n.handleSubscribeResponseUpdate(resp, subscriptionInfo)
default:
log.Infof("Invalid SubscribeResponse, %v", resp)
}
}
func (n *NetworkElementWatcher) handleSubscribeResponseUpdate(resp *gpb.SubscribeResponse_Update, subscriptionInfo *transport.SubscriptionInformation) {
mne, err := n.mneService.Get(store.Query{ID: uuid.MustParse(subscriptionInfo.NetworkElementID)})
if err != nil {
log.Error(err)
}
pathsAndValues := make(map[string]string, len(resp.Update.Update))
for _, update := range resp.Update.Update {
......@@ -166,7 +181,12 @@ func (n *NetworkElementWatcher) handleSubscribeResponseUpdate(resp *gpb.Subscrib
pathsAndValues[pathString] = update.Val.GetStringVal()
}
pubEvent := event.NewMneUpdateEvent(mne.ID(), pathsAndValues)
mneID, err := uuid.Parse(subscriptionInfo.NetworkElementID)
if err != nil {
log.Errorf("Error trying to parse uuid, could not handle subscription response: %v", err)
}
pubEvent := event.NewMneUpdateEvent(mneID, pathsAndValues)
if err := n.eventService.PublishEvent(NetworkElementEventTopic, pubEvent); err != nil {
go func() {
n.eventService.Reconnect()
......
package nucleus
import (
"time"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/transport"
)
// SubscriptionQueueWorker describes a worker that processes the updates of a gNMI subscription
// on behalf of the NetworkElementWatcher.
type SubscriptionQueueWorker interface {
	// HandleGnmiSubscriptionUpdates consumes subscription updates from subInfoChan.
	HandleGnmiSubscriptionUpdates(subInfoChan chan transport.SubscriptionInformation)
}
// SubscriptionQueueWorkerImpl implements the SubscriptionQueueWorker interface. Instances of it are used as
// the workers of a worker pool that handles gNMI subscription updates.
type SubscriptionQueueWorkerImpl struct {
// WorkerName is handed to workFunc as its second argument with every update.
WorkerName string
// workFunc is called once for every subscription update the worker receives.
workFunc func(*transport.SubscriptionInformation, string)
}
// NewSubscriptionQueueWorker returns a SubscriptionQueueWorker that is identified by name and
// processes every received subscription update with workFunc.
func NewSubscriptionQueueWorker(name string, workFunc func(*transport.SubscriptionInformation, string)) SubscriptionQueueWorker {
	worker := new(SubscriptionQueueWorkerImpl)
	worker.WorkerName = name
	worker.workFunc = workFunc

	return worker
}
// HandleGnmiSubscriptionUpdates receives subscription updates from subInfoChan and passes each one,
// together with the worker's name, to the worker's work function.
//
// The worker blocks while the channel is empty instead of polling it every few milliseconds, so an
// idle worker does not wake up at all. The method returns once subInfoChan has been closed and
// drained, which gives the owner of the channel a way to shut the worker down. Because the call
// blocks until then, it is expected to be started in its own goroutine.
func (s *SubscriptionQueueWorkerImpl) HandleGnmiSubscriptionUpdates(subInfoChan chan transport.SubscriptionInformation) {
	for received := range subInfoChan {
		// Copy into a variable scoped to this iteration so the pointer handed to workFunc never
		// aliases a later update, independent of the loop variable semantics of the Go version in use.
		subInfo := received
		s.workFunc(&subInfo, s.WorkerName)

		// Note: The original author observed that sleeping was needed to prevent some issue with
		// deadlocks, so the short pause after each handled update is kept.
		// Maybe this needs some additional investigation/improvements in the future.
		time.Sleep(time.Millisecond * 2)
	}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment