diff --git a/controller/interfaces/transport/transport.go b/controller/interfaces/transport/transport.go index 586115cda3ff97831832ea16e5e7810f262cbd10..90fbb3ccbcbcc31b082bdfbd038c3034566128ad 100644 --- a/controller/interfaces/transport/transport.go +++ b/controller/interfaces/transport/transport.go @@ -15,8 +15,7 @@ type Transport interface { Set(ctx context.Context, payload change.Payload) error CustomSet(ctx context.Context, req *gpb.SetRequest) (*gpb.SetResponse, error) Subscribe(ctx context.Context, params ...string) error - ControlPlaneSubscribe(ctx context.Context, subscribeCallbackFunc HandleSubscribeResponse, - subscriptionInfo *SubscriptionInformation) error + ControlPlaneSubscribe(ctx context.Context, subscriptionInfo SubscriptionInformation, subInfoChannel chan SubscriptionInformation) error Type() string ProcessResponse(resp interface{}) error ProcessControlPlaneSubscribeResponse(resp *gpb.SubscribeResponse_Update) error @@ -35,4 +34,5 @@ type SubscriptionInformation struct { NetworkElementID string NetworkElementName string StopContext context.Context + SubResponse *gpb.SubscribeResponse } diff --git a/controller/mocks/Transport.go b/controller/mocks/Transport.go index b3a01d0af93be7841901f15c2da8a267554bf529..c089f6f014d6fc870567f54bb2fc35eecc0241e2 100644 --- a/controller/mocks/Transport.go +++ b/controller/mocks/Transport.go @@ -19,13 +19,13 @@ type Transport struct { mock.Mock } -// ControlPlaneSubscribe provides a mock function with given fields: ctx, subscribeCallbackFunc, subscriptionInfo -func (_m *Transport) ControlPlaneSubscribe(ctx context.Context, subscribeCallbackFunc transport.HandleSubscribeResponse, subscriptionInfo *transport.SubscriptionInformation) error { - ret := _m.Called(ctx, subscribeCallbackFunc, subscriptionInfo) +// ControlPlaneSubscribe provides a mock function with given fields: ctx, subscriptionInfo, subInfoChannel +func (_m *Transport) ControlPlaneSubscribe(ctx context.Context, subscriptionInfo transport.SubscriptionInformation, subInfoChannel chan transport.SubscriptionInformation) error { + ret := _m.Called(ctx, subscriptionInfo, subInfoChannel) var r0 error - if rf, ok := ret.Get(0).(func(context.Context, transport.HandleSubscribeResponse, *transport.SubscriptionInformation) error); ok { - r0 = rf(ctx, subscribeCallbackFunc, subscriptionInfo) + if rf, ok := ret.Get(0).(func(context.Context, transport.SubscriptionInformation, chan transport.SubscriptionInformation) error); ok { + r0 = rf(ctx, subscriptionInfo, subInfoChannel) } else { r0 = ret.Error(0) } diff --git a/controller/nucleus/gnmi_transport.go b/controller/nucleus/gnmi_transport.go index 4df799bc46b0ef5ff2092232e525f031edf9d8a4..99a71797561d5aea94cb4e5efb1d7842c1ecfce9 100644 --- a/controller/nucleus/gnmi_transport.go +++ b/controller/nucleus/gnmi_transport.go @@ -3,6 +3,7 @@ package nucleus import ( "context" "fmt" + "time" "code.fbi.h-da.de/danet/gosdn/controller/interfaces/change" tpInterface "code.fbi.h-da.de/danet/gosdn/controller/interfaces/transport" @@ -125,12 +126,12 @@ func (g *Gnmi) Subscribe(ctx context.Context, params ...string) error { // ControlPlaneSubscribe is used to subscribe to devices from within the controller. gNMI SubscribeOptions need to be provided in the context, // the callback function handles the responses received from the subscription. -func (g *Gnmi) ControlPlaneSubscribe(ctx context.Context, subscribeCallbackFunc tpInterface.HandleSubscribeResponse, subscriptionInfo *tpInterface.SubscriptionInformation) error { +func (g *Gnmi) ControlPlaneSubscribe(ctx context.Context, subscriptionInfo tpInterface.SubscriptionInformation, subInfoChannel chan tpInterface.SubscriptionInformation) error { if g.client == nil { return &customerrs.NilClientError{} } - return g.controlPlaneSubscribe(ctx, subscribeCallbackFunc, subscriptionInfo) + return g.controlPlaneSubscribe(ctx, subscriptionInfo, subInfoChannel) } // Type returns the gNMI transport type. @@ -289,8 +290,7 @@ func (g *Gnmi) subscribe(ctx context.Context) error { // controlPlaneSubscribe calls gNMI subscribe with a callback for responses and additional network element information including // an option to stop the subscription. -func (g *Gnmi) controlPlaneSubscribe(ctx context.Context, subcribeCallbackFunc func(*gpb.SubscribeResponse, - *tpInterface.SubscriptionInformation), subscriptionInfo *tpInterface.SubscriptionInformation) error { +func (g *Gnmi) controlPlaneSubscribe(ctx context.Context, subscriptionInfo tpInterface.SubscriptionInformation, subInfoChannel chan tpInterface.SubscriptionInformation) error { ctx = gnmi.NewContext(ctx, g.config) opts, ok := ctx.Value(types.CtxKeyOpts).(*gnmi.SubscribeOptions) if !ok { @@ -309,10 +309,8 @@ func (g *Gnmi) controlPlaneSubscribe(ctx context.Context, subcribeCallbackFunc f }).Info("subscribed to gNMI target") for { resp := <-g.RespChan - if resp != nil { - // callback to trigger internal event handling process - go subcribeCallbackFunc(resp, subscriptionInfo) - } + subscriptionInfo.SubResponse = resp + subInfoChannel <- subscriptionInfo select { case <-subscriptionInfo.StopContext.Done(): @@ -321,9 +319,12 @@ func (g *Gnmi) controlPlaneSubscribe(ctx context.Context, subcribeCallbackFunc f } return default: + log.Infof("Buffer Length: %v", len(subInfoChannel)) + time.Sleep(time.Millisecond * 2) } } }() + return gnmi.SubscribeErr(ctx, g.client, opts, g.RespChan) } diff --git a/controller/nucleus/networkElementWatcher.go b/controller/nucleus/networkElementWatcher.go index 2ead5fcdb1644d334978ffa1a8cebbfa155e2c56..3c88f4f1bb64ccb2fa4e07f5a1fa2fc03d24cb46 100644 --- a/controller/nucleus/networkElementWatcher.go +++ b/controller/nucleus/networkElementWatcher.go @@ -3,6 +3,7 @@ package nucleus import ( "context" "fmt" + "strconv" "strings" "code.fbi.h-da.de/danet/gosdn/controller/customerrs" @@ -11,7 +12,6 @@ import ( "code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkelement" "code.fbi.h-da.de/danet/gosdn/controller/interfaces/transport" "code.fbi.h-da.de/danet/gosdn/controller/nucleus/types" - "code.fbi.h-da.de/danet/gosdn/controller/store" "code.fbi.h-da.de/danet/gosdn/forks/goarista/gnmi" "github.com/google/uuid" gpb "github.com/openconfig/gnmi/proto/gnmi" @@ -23,6 +23,8 @@ const ( // Note: These gNMI options are adjusted to arista gNMI fork. Change when switching to native gNMI. gNMISubscribeMode string = "stream" gNMIStreamMode string = "on_change" + numberWorkers int = 5 + workerQueueSize int = 1000 ) // NetworkElementWatcher is a component that subscribes to network elements via gNMI from within the controller and handles @@ -84,18 +86,30 @@ func (n *NetworkElementWatcher) subscribeToNetworkElement(mne networkelement.Net go n.callSubscribe(stopContext, mne, opts) } +// callSubscribe spawns a worker pool to handle gNMI subscription updates for each individual subscription +// and then sets up the gNMI subscription listener. func (n *NetworkElementWatcher) callSubscribe(stopContext context.Context, mne networkelement.NetworkElement, opts *gnmi.SubscribeOptions) { gNMIOptionsCtx := context.Background() gNMIOptionsCtx = context.WithValue(gNMIOptionsCtx, types.CtxKeyOpts, opts) + subInfoChan := make(chan transport.SubscriptionInformation, workerQueueSize) + + for i := 1; i <= numberWorkers; i++ { + name := "Worker " + strconv.Itoa(i) + + worker := NewSubscriptionQueueWorker(name, n.handleSubscribeResponse) + go worker.HandleGnmiSubscriptionUpdates(subInfoChan) + } + // SubscriptionInformation contains pnd ID, network element ID and name to be used in the internal subscribe to check // from which network element a response was sent - if err := mne.Transport().ControlPlaneSubscribe(gNMIOptionsCtx, n.handleSubscribeResponse, &transport.SubscriptionInformation{ + if err := mne.Transport().ControlPlaneSubscribe(gNMIOptionsCtx, transport.SubscriptionInformation{ PndID: mne.PndID().String(), NetworkElementID: mne.ID().String(), NetworkElementName: mne.Name(), StopContext: stopContext, - }); err != nil { + }, + subInfoChan); err != nil { log.Error(err) } } @@ -121,36 +135,37 @@ func (n *NetworkElementWatcher) StopAndRemoveNetworkElementSubscription(subID uu // handleSubscribeResponse takes the subscribe response and additional information about the network element to distinguish // from which network element a subscribe response was sent including improved error handling. -func (n *NetworkElementWatcher) handleSubscribeResponse(resp *gpb.SubscribeResponse, subscriptionInfo *transport.SubscriptionInformation) { - switch resp := resp.Response.(type) { - case *gpb.SubscribeResponse_Error: - log.Error(&customerrs.SubscribeResponseError{ - PndID: subscriptionInfo.PndID, - NetworkElementID: subscriptionInfo.NetworkElementID, - NetworkElementName: subscriptionInfo.NetworkElementName, - Err: fmt.Sprint("SubscribeResponse_Error"), - }) - case *gpb.SubscribeResponse_SyncResponse: - if !resp.SyncResponse { - log.Error(&customerrs.SubscribeSyncResponseError{ +func (n *NetworkElementWatcher) handleSubscribeResponse(subscriptionInfo *transport.SubscriptionInformation, workerName string) { + if subscriptionInfo.SubResponse == nil { + // Note: This needs proper error handling, no idea how yet. Simply logging would lead to spam in the console + // if the target that was subscribed to is not reachable anymore. + // log.Error("Error: subresponse == nil") + } else { + switch resp := subscriptionInfo.SubResponse.Response.(type) { + case *gpb.SubscribeResponse_Error: + log.Error(&customerrs.SubscribeResponseError{ PndID: subscriptionInfo.PndID, NetworkElementID: subscriptionInfo.NetworkElementID, NetworkElementName: subscriptionInfo.NetworkElementName, + Err: fmt.Sprint("SubscribeResponse_Error"), }) + case *gpb.SubscribeResponse_SyncResponse: + if !resp.SyncResponse { + log.Error(&customerrs.SubscribeSyncResponseError{ + PndID: subscriptionInfo.PndID, + NetworkElementID: subscriptionInfo.NetworkElementID, + NetworkElementName: subscriptionInfo.NetworkElementName, + }) + } + case *gpb.SubscribeResponse_Update: + n.handleSubscribeResponseUpdate(resp, subscriptionInfo) + default: + log.Infof("Invalid SubscribeResponse, %v", resp) } - case *gpb.SubscribeResponse_Update: - n.handleSubscribeResponseUpdate(resp, subscriptionInfo) - default: - log.Infof("Invalid SubscribeResponse, %v", resp) } } func (n *NetworkElementWatcher) handleSubscribeResponseUpdate(resp *gpb.SubscribeResponse_Update, subscriptionInfo *transport.SubscriptionInformation) { - mne, err := n.mneService.Get(store.Query{ID: uuid.MustParse(subscriptionInfo.NetworkElementID)}) - if err != nil { - log.Error(err) - } - pathsAndValues := make(map[string]string, len(resp.Update.Update)) for _, update := range resp.Update.Update { @@ -166,7 +181,12 @@ func (n *NetworkElementWatcher) handleSubscribeResponseUpdate(resp *gpb.Subscrib pathsAndValues[pathString] = update.Val.GetStringVal() } - pubEvent := event.NewMneUpdateEvent(mne.ID(), pathsAndValues) + mneID, err := uuid.Parse(subscriptionInfo.NetworkElementID) + if err != nil { + log.Errorf("Error trying to parse uuid, could not handle subscription response: %v", err) + } + + pubEvent := event.NewMneUpdateEvent(mneID, pathsAndValues) if err := n.eventService.PublishEvent(NetworkElementEventTopic, pubEvent); err != nil { go func() { n.eventService.Reconnect() diff --git a/controller/nucleus/subscriptionQueueHandler.go b/controller/nucleus/subscriptionQueueHandler.go new file mode 100644 index 0000000000000000000000000000000000000000..28bcd73f13dfb9b5892c84edcd8793b25f7555cb --- /dev/null +++ b/controller/nucleus/subscriptionQueueHandler.go @@ -0,0 +1,41 @@ +package nucleus + +import ( + "time" + + "code.fbi.h-da.de/danet/gosdn/controller/interfaces/transport" +) + +// SubscriptionQueueWorker is an interface to define a worker to handle the updates from gNMI Subscription from the NetWorkElementWatcher. +type SubscriptionQueueWorker interface { + HandleGnmiSubscriptionUpdates(chan transport.SubscriptionInformation) +} + +// SubscriptionQueueWorkerImpl implements the SubscriptionQueueWorker interface. This is used as a worker pool to handle gNMI subscription updates. +type SubscriptionQueueWorkerImpl struct { + WorkerName string + workFunc func(*transport.SubscriptionInformation, string) +} + +// NewSubscriptionQueueWorker creates a new SubscriptionQueueWorker. +func NewSubscriptionQueueWorker(name string, workFunc func(*transport.SubscriptionInformation, string)) SubscriptionQueueWorker { + return &SubscriptionQueueWorkerImpl{ + WorkerName: name, + workFunc: workFunc, + } +} + +// HandleGnmiSubscriptionUpdates handles assignment of tasks to free workers of the worker pool from SubscriptionQueueWorkerImpl. +func (s *SubscriptionQueueWorkerImpl) HandleGnmiSubscriptionUpdates(subInfoChan chan transport.SubscriptionInformation) { + // Note: Sleep was needed to prevent some issue with deadlocks. + // Maybe this needs some additional investigation/improvements in the future + for { + select { + case subInfo := <-subInfoChan: + s.workFunc(&subInfo, s.WorkerName) + time.Sleep(time.Millisecond * 2) + default: + time.Sleep(time.Millisecond * 2) + } + } +}