Skip to content
Snippets Groups Projects
Commit 4f9ef114 authored by Fabian Seidl's avatar Fabian Seidl
Browse files

Resolve "Subscribe to network element on creation in controller"

See merge request !564
parent 4e09da6c
No related branches found
No related tags found
1 merge request!564Resolve "Subscribe to network element on creation in controller"
Pipeline #163794 passed
...@@ -206,6 +206,8 @@ func bootstrapUnitTest() { ...@@ -206,6 +206,8 @@ func bootstrapUnitTest() {
conflict.Metadata{ResourceVersion: 0}) conflict.Metadata{ResourceVersion: 0})
_ = networkElementService.Add(mne) _ = networkElementService.Add(mne)
networkElementWatcher := nucleus.NewNetworkElementWatcher(networkElementService, eventService)
pndService := &mocks.PndService{} pndService := &mocks.PndService{}
pndService.On("GetAll").Return([]networkdomain.NetworkDomain{}, nil) pndService.On("GetAll").Return([]networkdomain.NetworkDomain{}, nil)
pndService.On("Add", mock.Anything).Return(nil) pndService.On("Add", mock.Anything).Return(nil)
...@@ -234,6 +236,7 @@ func bootstrapUnitTest() { ...@@ -234,6 +236,7 @@ func bootstrapUnitTest() {
rpb.NewPluginRegistryServiceClient(&grpc.ClientConn{}), rpb.NewPluginRegistryServiceClient(&grpc.ClientConn{}),
csbi.NewCsbiServiceClient(&grpc.ClientConn{}), csbi.NewCsbiServiceClient(&grpc.ClientConn{}),
func(u uuid.UUID, c chan networkelement.Details) {}, func(u uuid.UUID, c chan networkelement.Details) {},
networkElementWatcher,
) )
ppb.RegisterPndServiceServer(s, northbound.Pnd) ppb.RegisterPndServiceServer(s, northbound.Pnd)
......
...@@ -213,6 +213,7 @@ func startGrpc() error { ...@@ -213,6 +213,7 @@ func startGrpc() error {
c.pluginRegistryClient, c.pluginRegistryClient,
c.csbiClient, c.csbiClient,
callback, callback,
c.networkElementWatcher,
) )
ppb.RegisterPndServiceServer(c.grpcServer, c.nbi.Pnd) ppb.RegisterPndServiceServer(c.grpcServer, c.nbi.Pnd)
......
...@@ -8,6 +8,7 @@ import ( ...@@ -8,6 +8,7 @@ import (
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkelement" "code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkelement"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/plugin" "code.fbi.h-da.de/danet/gosdn/controller/interfaces/plugin"
rbacInterfaces "code.fbi.h-da.de/danet/gosdn/controller/interfaces/rbac" rbacInterfaces "code.fbi.h-da.de/danet/gosdn/controller/interfaces/rbac"
"code.fbi.h-da.de/danet/gosdn/controller/nucleus"
"code.fbi.h-da.de/danet/gosdn/controller/rbac" "code.fbi.h-da.de/danet/gosdn/controller/rbac"
"code.fbi.h-da.de/danet/gosdn/controller/store" "code.fbi.h-da.de/danet/gosdn/controller/store"
"code.fbi.h-da.de/danet/gosdn/controller/topology" "code.fbi.h-da.de/danet/gosdn/controller/topology"
...@@ -58,6 +59,7 @@ func NewNBI( ...@@ -58,6 +59,7 @@ func NewNBI(
pluginRegistryClient rpb.PluginRegistryServiceClient, pluginRegistryClient rpb.PluginRegistryServiceClient,
csbiClient cpb.CsbiServiceClient, csbiClient cpb.CsbiServiceClient,
pndCallbackFn func(uuid.UUID, chan networkelement.Details), pndCallbackFn func(uuid.UUID, chan networkelement.Details),
networkElementWatchter *nucleus.NetworkElementWatcher,
) *NorthboundInterface { ) *NorthboundInterface {
protoValidator, err := protovalidate.New() protoValidator, err := protovalidate.New()
if err != nil { if err != nil {
...@@ -73,7 +75,7 @@ func NewNBI( ...@@ -73,7 +75,7 @@ func NewNBI(
Role: NewRoleServer(&jwt, roles, protoValidator), Role: NewRoleServer(&jwt, roles, protoValidator),
Topology: NewTopologyServer(topologyService, nodeService, portService, protoValidator), Topology: NewTopologyServer(topologyService, nodeService, portService, protoValidator),
App: NewAppServer(apps, protoValidator), App: NewAppServer(apps, protoValidator),
NetworkElement: NewNetworkElementServer(mneService, pndService, pluginService, changeStore, protoValidator), NetworkElement: NewNetworkElementServer(mneService, pndService, pluginService, changeStore, protoValidator, networkElementWatchter),
Routes: NewRoutingTableServiceServer(routeService, nodeService, portService, protoValidator), Routes: NewRoutingTableServiceServer(routeService, nodeService, portService, protoValidator),
ConfigurationManagement: NewConfigurationManagementServer(pndService, mneService, topologyService, nodeService, portService, pluginService, protoValidator), ConfigurationManagement: NewConfigurationManagementServer(pndService, mneService, topologyService, nodeService, portService, pluginService, protoValidator),
} }
......
...@@ -9,6 +9,7 @@ import ( ...@@ -9,6 +9,7 @@ import (
mnepb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/networkelement" mnepb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/networkelement"
ppb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/pnd" ppb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/pnd"
tpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/transport" tpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/transport"
"code.fbi.h-da.de/danet/gosdn/controller/config"
"code.fbi.h-da.de/danet/gosdn/controller/conflict" "code.fbi.h-da.de/danet/gosdn/controller/conflict"
"code.fbi.h-da.de/danet/gosdn/controller/customerrs" "code.fbi.h-da.de/danet/gosdn/controller/customerrs"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/change" "code.fbi.h-da.de/danet/gosdn/controller/interfaces/change"
...@@ -36,11 +37,12 @@ import ( ...@@ -36,11 +37,12 @@ import (
// NetworkElementServer represents a NetworkElementServer. // NetworkElementServer represents a NetworkElementServer.
type NetworkElementServer struct { type NetworkElementServer struct {
mnepb.UnimplementedNetworkElementServiceServer mnepb.UnimplementedNetworkElementServiceServer
mneService networkelement.Service mneService networkelement.Service
pndService networkdomain.Service pndService networkdomain.Service
pluginService plugin.Service pluginService plugin.Service
changeStore store.ChangeStore changeStore store.ChangeStore
protoValidator *protovalidate.Validator protoValidator *protovalidate.Validator
networkElementWatchter *nucleus.NetworkElementWatcher
} }
// NewNetworkElementServer returns a new NetWorkElementServer. // NewNetworkElementServer returns a new NetWorkElementServer.
...@@ -50,13 +52,15 @@ func NewNetworkElementServer( ...@@ -50,13 +52,15 @@ func NewNetworkElementServer(
pluginService plugin.Service, pluginService plugin.Service,
changeStore store.ChangeStore, changeStore store.ChangeStore,
protoValidator *protovalidate.Validator, protoValidator *protovalidate.Validator,
networkElementWatchter *nucleus.NetworkElementWatcher,
) *NetworkElementServer { ) *NetworkElementServer {
return &NetworkElementServer{ return &NetworkElementServer{
mneService: mneService, mneService: mneService,
pndService: pndService, pndService: pndService,
pluginService: pluginService, pluginService: pluginService,
changeStore: changeStore, changeStore: changeStore,
protoValidator: protoValidator, protoValidator: protoValidator,
networkElementWatchter: networkElementWatchter,
} }
} }
...@@ -756,6 +760,8 @@ func (n *NetworkElementServer) addMne(ctx context.Context, name string, opt *tpb ...@@ -756,6 +760,8 @@ func (n *NetworkElementServer) addMne(ctx context.Context, name string, opt *tpb
if err != nil { if err != nil {
return uuid.Nil, err return uuid.Nil, err
} }
n.networkElementWatchter.SubscribeToNetworkElement(mne, config.GetGnmiSubscriptionPaths(), nil)
} else { } else {
return uuid.Nil, status.Errorf(codes.InvalidArgument, "invalid transport data provided") return uuid.Nil, status.Errorf(codes.InvalidArgument, "invalid transport data provided")
} }
......
...@@ -74,6 +74,21 @@ func (n *NetworkElementWatcher) SubscribeToNetworkElements(paths [][]string, opt ...@@ -74,6 +74,21 @@ func (n *NetworkElementWatcher) SubscribeToNetworkElements(paths [][]string, opt
} }
} }
// SubscribeToNetworkElement subscribes to the provided network element according to provided SubscribeOptions.
// Paths should be provided in the following format [][]string{{"system", "config", "hostname"}}
// SubscribeOptions can be nil. Use nil for a fixed, pre-defined set of gNMI subscription options (streaming in sample mode each second).
func (n *NetworkElementWatcher) SubscribeToNetworkElement(mne networkelement.NetworkElement, paths [][]string, opts *gnmi.SubscribeOptions) {
if opts == nil {
opts = &gnmi.SubscribeOptions{
Mode: gNMISubscribeMode,
StreamMode: gNMIStreamMode,
Paths: paths,
SampleInterval: subscribeSampleInterval,
}
}
n.subscribeToNetworkElement(mne, opts)
}
func (n *NetworkElementWatcher) subscribeToNetworkElement(mne networkelement.NetworkElement, opts *gnmi.SubscribeOptions) { func (n *NetworkElementWatcher) subscribeToNetworkElement(mne networkelement.NetworkElement, opts *gnmi.SubscribeOptions) {
subID := uuid.New() subID := uuid.New()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment