Skip to content
Snippets Groups Projects
Commit 41534047 authored by Fabian Seidl's avatar Fabian Seidl
Browse files

added merge without duplicates for subscription paths, removed paths as...

added merge without duplicates for subscription paths, removed paths as parameter from methods in watcher
parent 53d7a64f
No related branches found
No related tags found
1 merge request!625Resolve "The mechanism to watch devices during controller start currently only supports watching the same path for each device"
Pipeline #170649 passed
...@@ -146,7 +146,7 @@ func initialize() error { ...@@ -146,7 +146,7 @@ func initialize() error {
} }
c.networkElementWatcher = nucleus.NewNetworkElementWatcher(c.mneService, c.eventService) c.networkElementWatcher = nucleus.NewNetworkElementWatcher(c.mneService, c.eventService)
c.networkElementWatcher.SubscribeToNetworkElements(config.GetGnmiSubscriptionPaths(), nil) c.networkElementWatcher.SubscribeToNetworkElements(nil)
if err := ensureDefaultRoleExists(); err != nil { if err := ensureDefaultRoleExists(); err != nil {
return err return err
......
...@@ -9,7 +9,6 @@ import ( ...@@ -9,7 +9,6 @@ import (
mnepb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/networkelement" mnepb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/networkelement"
ppb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/pnd" ppb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/pnd"
tpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/transport" tpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/transport"
"code.fbi.h-da.de/danet/gosdn/controller/config"
"code.fbi.h-da.de/danet/gosdn/controller/conflict" "code.fbi.h-da.de/danet/gosdn/controller/conflict"
"code.fbi.h-da.de/danet/gosdn/controller/customerrs" "code.fbi.h-da.de/danet/gosdn/controller/customerrs"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/change" "code.fbi.h-da.de/danet/gosdn/controller/interfaces/change"
...@@ -629,7 +628,7 @@ func (n *NetworkElementServer) addMne(ctx context.Context, ...@@ -629,7 +628,7 @@ func (n *NetworkElementServer) addMne(ctx context.Context,
return uuid.Nil, err return uuid.Nil, err
} }
n.networkElementWatchter.SubscribeToNetworkElement(mne, config.GetGnmiSubscriptionPaths(), nil) n.networkElementWatchter.SubscribeToNetworkElement(mne, nil)
} else { } else {
err = fmt.Errorf("invalid transport data provided") err = fmt.Errorf("invalid transport data provided")
return uuid.Nil, err return uuid.Nil, err
......
...@@ -3,8 +3,10 @@ package nucleus ...@@ -3,8 +3,10 @@ package nucleus
import ( import (
"context" "context"
"fmt" "fmt"
"sort"
"strconv" "strconv"
"code.fbi.h-da.de/danet/gosdn/controller/config"
"code.fbi.h-da.de/danet/gosdn/controller/customerrs" "code.fbi.h-da.de/danet/gosdn/controller/customerrs"
"code.fbi.h-da.de/danet/gosdn/controller/event" "code.fbi.h-da.de/danet/gosdn/controller/event"
eventInterfaces "code.fbi.h-da.de/danet/gosdn/controller/interfaces/event" eventInterfaces "code.fbi.h-da.de/danet/gosdn/controller/interfaces/event"
...@@ -14,6 +16,7 @@ import ( ...@@ -14,6 +16,7 @@ import (
"code.fbi.h-da.de/danet/gosdn/forks/goarista/gnmi" "code.fbi.h-da.de/danet/gosdn/forks/goarista/gnmi"
"github.com/google/uuid" "github.com/google/uuid"
gpb "github.com/openconfig/gnmi/proto/gnmi" gpb "github.com/openconfig/gnmi/proto/gnmi"
ygotutil "github.com/openconfig/ygot/util"
"github.com/openconfig/ygot/ygot" "github.com/openconfig/ygot/ygot"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
...@@ -51,14 +54,12 @@ func NewNetworkElementWatcher(mneService networkelement.Service, eventService ev ...@@ -51,14 +54,12 @@ func NewNetworkElementWatcher(mneService networkelement.Service, eventService ev
} }
// SubscribeToNetworkElements subscribes to every available network element in each network domain according to provided SubscribeOptions. // SubscribeToNetworkElements subscribes to every available network element in each network domain according to provided SubscribeOptions.
// Paths should be provided in the following format [][]string{{"system", "config", "hostname"}}
// SubscribeOptions can be nil. Use nil for a fixed, pre-defined set of gNMI subscription options (streaming in sample mode each second). // SubscribeOptions can be nil. Use nil for a fixed, pre-defined set of gNMI subscription options (streaming in sample mode each second).
func (n *NetworkElementWatcher) SubscribeToNetworkElements(paths [][]string, opts *gnmi.SubscribeOptions) { func (n *NetworkElementWatcher) SubscribeToNetworkElements(opts *gnmi.SubscribeOptions) {
if opts == nil { if opts == nil {
opts = &gnmi.SubscribeOptions{ opts = &gnmi.SubscribeOptions{
Mode: gNMISubscribeMode, Mode: gNMISubscribeMode,
StreamMode: gNMIStreamMode, StreamMode: gNMIStreamMode,
Paths: paths,
SampleInterval: subscribeSampleInterval, SampleInterval: subscribeSampleInterval,
} }
} }
...@@ -75,14 +76,12 @@ func (n *NetworkElementWatcher) SubscribeToNetworkElements(paths [][]string, opt ...@@ -75,14 +76,12 @@ func (n *NetworkElementWatcher) SubscribeToNetworkElements(paths [][]string, opt
} }
// SubscribeToNetworkElement subscribes to the provided network element according to provided SubscribeOptions. // SubscribeToNetworkElement subscribes to the provided network element according to provided SubscribeOptions.
// Paths should be provided in the following format [][]string{{"system", "config", "hostname"}}
// SubscribeOptions can be nil. Use nil for a fixed, pre-defined set of gNMI subscription options (streaming in sample mode each second). // SubscribeOptions can be nil. Use nil for a fixed, pre-defined set of gNMI subscription options (streaming in sample mode each second).
func (n *NetworkElementWatcher) SubscribeToNetworkElement(mne networkelement.NetworkElement, paths [][]string, opts *gnmi.SubscribeOptions) { func (n *NetworkElementWatcher) SubscribeToNetworkElement(mne networkelement.NetworkElement, opts *gnmi.SubscribeOptions) {
if opts == nil { if opts == nil {
opts = &gnmi.SubscribeOptions{ opts = &gnmi.SubscribeOptions{
Mode: gNMISubscribeMode, Mode: gNMISubscribeMode,
StreamMode: gNMIStreamMode, StreamMode: gNMIStreamMode,
Paths: paths,
SampleInterval: subscribeSampleInterval, SampleInterval: subscribeSampleInterval,
} }
} }
...@@ -98,14 +97,14 @@ func (n *NetworkElementWatcher) subscribeToNetworkElement(mne networkelement.Net ...@@ -98,14 +97,14 @@ func (n *NetworkElementWatcher) subscribeToNetworkElement(mne networkelement.Net
stopFunc: cancel, stopFunc: cancel,
}) })
opts.Paths = n.mergeGnmiSubscriptions(mne.GetGnmiSubscriptionPaths(), config.GetGnmiSubscriptionPaths())
go n.callSubscribe(stopContext, mne, opts) go n.callSubscribe(stopContext, mne, opts)
} }
// callSubscribe spawns a worker pool to handle gNMI subscription updates for each individual subscription // callSubscribe spawns a worker pool to handle gNMI subscription updates for each individual subscription
// and then sets up the gNMI subscription listener. // and then sets up the gNMI subscription listener.
func (n *NetworkElementWatcher) callSubscribe(stopContext context.Context, mne networkelement.NetworkElement, opts *gnmi.SubscribeOptions) { func (n *NetworkElementWatcher) callSubscribe(stopContext context.Context, mne networkelement.NetworkElement, opts *gnmi.SubscribeOptions) {
opts.Paths = mne.GetGnmiSubscriptionPaths()
gNMIOptionsCtx := context.Background() gNMIOptionsCtx := context.Background()
gNMIOptionsCtx = context.WithValue(gNMIOptionsCtx, types.CtxKeyOpts, opts) gNMIOptionsCtx = context.WithValue(gNMIOptionsCtx, types.CtxKeyOpts, opts)
...@@ -212,21 +211,38 @@ func (n *NetworkElementWatcher) handleSubscribeResponseUpdate(resp *gpb.Subscrib ...@@ -212,21 +211,38 @@ func (n *NetworkElementWatcher) handleSubscribeResponseUpdate(resp *gpb.Subscrib
} }
} }
// TODO(faseid): implement path join! // mergeGnmiSubscriptions takes paths for gNMI Subscriptions from two sources (the MNE and a config file)
// func (n *NetworkElementWatcher) joinGnmiSubscriptions(gNMISusbcriptionPathsFromMne, gNMISusbcriptionPathsFromConfig [][]string) [][]string { // and merges them to one set of subscription paths without duplicates.
// var joinedPaths = gNMISusbcriptionPathsFromMne // Paths should be provided in the following format [][]string{{"system", "config", "hostname"}}.
func (n *NetworkElementWatcher) mergeGnmiSubscriptions(gNMISusbcriptionPathsFromMne, gNMISusbcriptionPathsFromConfig [][]string) [][]string {
// for i, _ := range gNMISusbcriptionPathsFromMne { // create slice with all elements
var tempJoinedPaths = gNMISusbcriptionPathsFromMne
// } for _, configPath := range gNMISusbcriptionPathsFromConfig {
tempJoinedPaths = append(tempJoinedPaths, configPath)
}
// for _, mnePath := range joinedPaths { // sort slice
// for _, configPath := range gNMISusbcriptionPathsFromConfig { sort.Slice(tempJoinedPaths[:], func(i, j int) bool {
// if mnePath != configPath { for elem := range tempJoinedPaths[i] {
if tempJoinedPaths[i][elem] == tempJoinedPaths[j][elem] {
continue
}
return tempJoinedPaths[i][elem] < tempJoinedPaths[j][elem]
}
return false
})
// } // remove duplicates
// } inResult := make(map[string]bool)
// } var joinedPaths [][]string
for _, pathSlice := range tempJoinedPaths {
path := ygotutil.SlicePathToString(pathSlice)
if _, ok := inResult[path]; !ok {
inResult[path] = true
joinedPaths = append(joinedPaths, pathSlice)
}
}
// return joinedPaths return joinedPaths
// } }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment