Skip to content
Snippets Groups Projects
Commit 57693fd1 authored by Malte Bauch's avatar Malte Bauch
Browse files

Resolve "Too many open files"

See merge request !1074
parent 2376280c
No related branches found
No related tags found
1 merge request!1074Resolve "Too many open files"
Pipeline #227957 passed
Showing
with 172 additions and 92 deletions
...@@ -2,6 +2,7 @@ package registration ...@@ -2,6 +2,7 @@ package registration
import ( import (
"context" "context"
"fmt"
"time" "time"
"code.fbi.h-da.de/danet/gosdn/api/go/gosdn/app" "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/app"
...@@ -15,6 +16,12 @@ func Register(ctx context.Context, gosdnAddress, name, token string) (string, er ...@@ -15,6 +16,12 @@ func Register(ctx context.Context, gosdnAddress, name, token string) (string, er
if err != nil { if err != nil {
return "", err return "", err
} }
defer func() {
if ferr := conn.Close(); ferr != nil {
fErrString := ferr.Error()
err = fmt.Errorf("InternalError=%w DeferError=%+s", err, fErrString)
}
}()
appService := app.NewAppServiceClient(conn) appService := app.NewAppServiceClient(conn)
......
...@@ -5,13 +5,19 @@ import ( ...@@ -5,13 +5,19 @@ import (
"google.golang.org/grpc" "google.golang.org/grpc"
) )
var appClientConnection *grpc.ClientConn
// AppClient returns a client for the gRPC App service. It takes // AppClient returns a client for the gRPC App service. It takes
// the address of the gRPC endpoint and optional grpc.DialOption // the address of the gRPC endpoint and optional grpc.DialOption
// as argument. // as argument.
func AppClient(addr string, opts ...grpc.DialOption) (apb.AppServiceClient, error) { func AppClient(addr string, opts ...grpc.DialOption) (apb.AppServiceClient, error) {
conn, err := grpc.NewClient(addr, opts...) var err error
if err != nil {
return nil, err if appClientConnection == nil {
appClientConnection, err = grpc.NewClient(addr, opts...)
if err != nil {
return nil, err
}
} }
return apb.NewAppServiceClient(conn), nil return apb.NewAppServiceClient(appClientConnection), nil
} }
...@@ -5,13 +5,20 @@ import ( ...@@ -5,13 +5,20 @@ import (
"google.golang.org/grpc" "google.golang.org/grpc"
) )
var configurationManagementClientConnection *grpc.ClientConn
// ConfigurationManagementClient returns a client for the gRPC ConfigurationManagement service. It takes // ConfigurationManagementClient returns a client for the gRPC ConfigurationManagement service. It takes
// the address of the gRPC endpoint and optional grpc.DialOption // the address of the gRPC endpoint and optional grpc.DialOption
// as argument. // as argument.
func ConfigurationManagementClient(addr string, opts ...grpc.DialOption) (cpb.ConfigurationManagementServiceClient, error) { func ConfigurationManagementClient(addr string, opts ...grpc.DialOption) (cpb.ConfigurationManagementServiceClient, error) {
conn, err := grpc.NewClient(addr, opts...) var err error
if err != nil {
return nil, err if configurationManagementClientConnection == nil {
configurationManagementClientConnection, err = grpc.NewClient(addr, opts...)
if err != nil {
return nil, err
}
} }
return cpb.NewConfigurationManagementServiceClient(conn), nil
return cpb.NewConfigurationManagementServiceClient(configurationManagementClientConnection), nil
} }
...@@ -5,13 +5,20 @@ import ( ...@@ -5,13 +5,20 @@ import (
"google.golang.org/grpc" "google.golang.org/grpc"
) )
var networkElementServiceClientConnection *grpc.ClientConn
// NetworkElementClient returns a client for the gRPC NetworkElement service. It takes // NetworkElementClient returns a client for the gRPC NetworkElement service. It takes
// the address of the gRPC endpoint and optional grpc.DialOption // the address of the gRPC endpoint and optional grpc.DialOption
// as argument. // as argument.
func NetworkElementClient(addr string, opts ...grpc.DialOption) (mnepb.NetworkElementServiceClient, error) { func NetworkElementClient(addr string, opts ...grpc.DialOption) (mnepb.NetworkElementServiceClient, error) {
conn, err := grpc.NewClient(addr, opts...) var err error
if err != nil {
return nil, err if networkElementServiceClientConnection == nil {
networkElementServiceClientConnection, err = grpc.NewClient(addr, opts...)
if err != nil {
return nil, err
}
} }
return mnepb.NewNetworkElementServiceClient(conn), nil
return mnepb.NewNetworkElementServiceClient(networkElementServiceClientConnection), nil
} }
...@@ -5,10 +5,16 @@ import ( ...@@ -5,10 +5,16 @@ import (
"google.golang.org/grpc" "google.golang.org/grpc"
) )
var pluginClientConnection *grpc.ClientConn
func PluginClient(addr string, opts ...grpc.DialOption) (pipb.PluginInternalServiceClient, error) { func PluginClient(addr string, opts ...grpc.DialOption) (pipb.PluginInternalServiceClient, error) {
conn, err := grpc.NewClient(addr, opts...) var err error
if err != nil {
return nil, err if pluginClientConnection == nil {
pluginClientConnection, err = grpc.NewClient(addr, opts...)
if err != nil {
return nil, err
}
} }
return pipb.NewPluginInternalServiceClient(conn), nil return pipb.NewPluginInternalServiceClient(pluginClientConnection), nil
} }
...@@ -5,13 +5,20 @@ import ( ...@@ -5,13 +5,20 @@ import (
"google.golang.org/grpc" "google.golang.org/grpc"
) )
var pndClientConnection *grpc.ClientConn
// PndClient returns a client for the gRPC PND service. It takes // PndClient returns a client for the gRPC PND service. It takes
// the address of the gRPC endpoint and optional grpc.DialOption // the address of the gRPC endpoint and optional grpc.DialOption
// as argument. // as argument.
func PndClient(addr string, opts ...grpc.DialOption) (ppb.PndServiceClient, error) { func PndClient(addr string, opts ...grpc.DialOption) (ppb.PndServiceClient, error) {
conn, err := grpc.NewClient(addr, opts...) var err error
if err != nil {
return nil, err if pndClientConnection == nil {
pndClientConnection, err = grpc.NewClient(addr, opts...)
if err != nil {
return nil, err
}
} }
return ppb.NewPndServiceClient(conn), nil
return ppb.NewPndServiceClient(pndClientConnection), nil
} }
...@@ -5,35 +5,51 @@ import ( ...@@ -5,35 +5,51 @@ import (
"google.golang.org/grpc" "google.golang.org/grpc"
) )
var authClientConnection *grpc.ClientConn
var userClientConnection *grpc.ClientConn
var roleClientConnection *grpc.ClientConn
// AuthClient returns a client for the gRPC Auth service. It takes // AuthClient returns a client for the gRPC Auth service. It takes
// the address of the gRPC endpoint and optional grpc.DialOption // the address of the gRPC endpoint and optional grpc.DialOption
// as argument. // as argument.
func AuthClient(addr string, opts ...grpc.DialOption) (apb.AuthServiceClient, error) { func AuthClient(addr string, opts ...grpc.DialOption) (apb.AuthServiceClient, error) {
conn, err := grpc.NewClient(addr, opts...) var err error
if err != nil {
return nil, err if authClientConnection == nil {
authClientConnection, err = grpc.NewClient(addr, opts...)
if err != nil {
return nil, err
}
} }
return apb.NewAuthServiceClient(conn), nil return apb.NewAuthServiceClient(authClientConnection), nil
} }
// UserClient returns a client for the gRPC User service. It takes // UserClient returns a client for the gRPC User service. It takes
// the address of the gRPC endpoint and optional grpc.DialOption // the address of the gRPC endpoint and optional grpc.DialOption
// as argument. // as argument.
func UserClient(addr string, opts ...grpc.DialOption) (apb.UserServiceClient, error) { func UserClient(addr string, opts ...grpc.DialOption) (apb.UserServiceClient, error) {
conn, err := grpc.NewClient(addr, opts...) var err error
if err != nil {
return nil, err if userClientConnection == nil {
userClientConnection, err = grpc.NewClient(addr, opts...)
if err != nil {
return nil, err
}
} }
return apb.NewUserServiceClient(conn), nil return apb.NewUserServiceClient(userClientConnection), nil
} }
// RoleClient returns a client for the gRPC Role service. It takes // RoleClient returns a client for the gRPC Role service. It takes
// the address of the gRPC endpoint and optional grpc.DialOption // the address of the gRPC endpoint and optional grpc.DialOption
// as argument. // as argument.
func RoleClient(addr string, opts ...grpc.DialOption) (apb.RoleServiceClient, error) { func RoleClient(addr string, opts ...grpc.DialOption) (apb.RoleServiceClient, error) {
conn, err := grpc.NewClient(addr, opts...) var err error
if err != nil {
return nil, err if roleClientConnection == nil {
roleClientConnection, err = grpc.NewClient(addr, opts...)
if err != nil {
return nil, err
}
} }
return apb.NewRoleServiceClient(conn), nil return apb.NewRoleServiceClient(roleClientConnection), nil
} }
...@@ -5,13 +5,20 @@ import ( ...@@ -5,13 +5,20 @@ import (
"google.golang.org/grpc" "google.golang.org/grpc"
) )
var sbiClientConnection *grpc.ClientConn
// SbiClient returns a client for the gRPC SBI service. It takes // SbiClient returns a client for the gRPC SBI service. It takes
// the address of the gRPC endpoint and optional grpc.DialOption // the address of the gRPC endpoint and optional grpc.DialOption
// as argument. // as argument.
func SbiClient(addr string, opts ...grpc.DialOption) (spb.SbiServiceClient, error) { func SbiClient(addr string, opts ...grpc.DialOption) (spb.SbiServiceClient, error) {
conn, err := grpc.NewClient(addr, opts...) var err error
if err != nil {
return nil, err if sbiClientConnection == nil {
sbiClientConnection, err = grpc.NewClient(addr, opts...)
if err != nil {
return nil, err
}
} }
return spb.NewSbiServiceClient(conn), nil
return spb.NewSbiServiceClient(sbiClientConnection), nil
} }
...@@ -5,13 +5,20 @@ import ( ...@@ -5,13 +5,20 @@ import (
"google.golang.org/grpc" "google.golang.org/grpc"
) )
var subManagementClientConnection *grpc.ClientConn
// SubManagementClient returns a client for the gRPC SubscriptionManagement service. It takes // SubManagementClient returns a client for the gRPC SubscriptionManagement service. It takes
// the address of the gRPC endpoint and optional grpc.DialOption // the address of the gRPC endpoint and optional grpc.DialOption
// as argument. // as argument.
func SubManagementClient(addr string, opts ...grpc.DialOption) (subpb.SubscriptionManagementServiceClient, error) { func SubManagementClient(addr string, opts ...grpc.DialOption) (subpb.SubscriptionManagementServiceClient, error) {
conn, err := grpc.NewClient(addr, opts...) var err error
if err != nil {
return nil, err if subManagementClientConnection == nil {
subManagementClientConnection, err = grpc.NewClient(addr, opts...)
if err != nil {
return nil, err
}
} }
return subpb.NewSubscriptionManagementServiceClient(conn), nil
return subpb.NewSubscriptionManagementServiceClient(subManagementClientConnection), nil
} }
...@@ -6,7 +6,6 @@ import ( ...@@ -6,7 +6,6 @@ import (
"code.fbi.h-da.de/danet/gosdn/controller/customerrs" "code.fbi.h-da.de/danet/gosdn/controller/customerrs"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkelement" "code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkelement"
"code.fbi.h-da.de/danet/gosdn/controller/nucleus/database"
"code.fbi.h-da.de/danet/gosdn/controller/store" "code.fbi.h-da.de/danet/gosdn/controller/store"
"go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/bson/primitive"
...@@ -122,15 +121,10 @@ func (s *DatabaseNetworkElementStore) Add(ctx context.Context, networkElementToA ...@@ -122,15 +121,10 @@ func (s *DatabaseNetworkElementStore) Add(ctx context.Context, networkElementToA
func (s *DatabaseNetworkElementStore) Update(ctx context.Context, networkElementToUpdate networkelement.NetworkElement) (err error) { func (s *DatabaseNetworkElementStore) Update(ctx context.Context, networkElementToUpdate networkelement.NetworkElement) (err error) {
var updatedLoadedNetworkElement networkelement.LoadedNetworkElement var updatedLoadedNetworkElement networkelement.LoadedNetworkElement
db, err := database.GetDatabaseConnection()
if err != nil {
return err
}
wc := writeconcern.Majority() wc := writeconcern.Majority()
txnOptions := options.Transaction().SetWriteConcern(wc) txnOptions := options.Transaction().SetWriteConcern(wc)
// Starts a session on the client // Starts a session on the client
session, err := db.Client().StartSession() session, err := s.collection.Database().Client().StartSession()
if err != nil { if err != nil {
return err return err
} }
......
...@@ -20,6 +20,8 @@ import ( ...@@ -20,6 +20,8 @@ import (
tpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/transport" tpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/transport"
) )
var gnmiClients = make(map[string]gpb.GNMIClient, 0)
// Gnmi implements the Transport interface and provides an SBI with the // Gnmi implements the Transport interface and provides an SBI with the
// possibility to access a gNMI endpoint. // possibility to access a gNMI endpoint.
type Gnmi struct { type Gnmi struct {
...@@ -64,9 +66,14 @@ func newGnmiTransport(opts *tpb.TransportOption, model shared.DeviceModel) (*Gnm ...@@ -64,9 +66,14 @@ func newGnmiTransport(opts *tpb.TransportOption, model shared.DeviceModel) (*Gnm
} }
} }
c, err := gnmi.Dial(gnmiConfig) var err error
if err != nil { c, ok := gnmiClients[opts.GetAddress()]
return nil, err if !ok {
c, err = gnmi.Dial(gnmiConfig)
if err != nil {
return nil, err
}
gnmiClients[opts.GetAddress()] = c
} }
log.WithFields(log.Fields{ log.WithFields(log.Fields{
......
...@@ -16,6 +16,13 @@ import ( ...@@ -16,6 +16,13 @@ import (
"go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson"
) )
type pluginConnection struct {
client *hcplugin.Client
model shared.DeviceModel
}
var pluginClients = make(map[uuid.UUID]pluginConnection, 0)
// Plugin is the controllers internal representation of a plugin. // Plugin is the controllers internal representation of a plugin.
type Plugin struct { type Plugin struct {
UUID uuid.UUID UUID uuid.UUID
...@@ -66,6 +73,11 @@ func NewPlugin(id uuid.UUID, execPath string) (*Plugin, error) { ...@@ -66,6 +73,11 @@ func NewPlugin(id uuid.UUID, execPath string) (*Plugin, error) {
} }
} }
pluginClients[id] = pluginConnection{
client: client,
model: model,
}
return &Plugin{ return &Plugin{
UUID: id, UUID: id,
client: client, client: client,
...@@ -78,43 +90,52 @@ func NewPlugin(id uuid.UUID, execPath string) (*Plugin, error) { ...@@ -78,43 +90,52 @@ func NewPlugin(id uuid.UUID, execPath string) (*Plugin, error) {
// NewPluginThroughReattachConfig creates a new Plugin through a reattach config. // NewPluginThroughReattachConfig creates a new Plugin through a reattach config.
func NewPluginThroughReattachConfig(loadedPlugin plugin.LoadedPlugin) (plugin.Plugin, error) { func NewPluginThroughReattachConfig(loadedPlugin plugin.LoadedPlugin) (plugin.Plugin, error) {
client := hcplugin.NewClient(&hcplugin.ClientConfig{ //client := hcplugin.NewClient(&hcplugin.ClientConfig{
HandshakeConfig: shared.Handshake, // HandshakeConfig: shared.Handshake,
Plugins: shared.PluginMap, // Plugins: shared.PluginMap,
Reattach: &loadedPlugin.ReattachConfig, // Reattach: &loadedPlugin.ReattachConfig,
AllowedProtocols: []hcplugin.Protocol{hcplugin.ProtocolGRPC}, // AllowedProtocols: []hcplugin.Protocol{hcplugin.ProtocolGRPC},
}) //})
// create a client that is within the AllowedProtocols. In this case this //// create a client that is within the AllowedProtocols. In this case this
// returns a gRPCClient. Allows to connect through gRPC. //// returns a gRPCClient. Allows to connect through gRPC.
gRPCClient, err := client.Client() //gRPCClient, err := client.Client()
//if err != nil {
// return nil, err
//}
//// Request the plugin. This returns the gRPC client from the
//// DeviceModelPlugin. This can then be casted to the interface that we are
//// exposing through the plugin (in this case "DeviceModel").
//raw, err := gRPCClient.Dispense("deviceModel")
//if err != nil {
// return nil, err
//}
//// cast the raw plugin to the DeviceModel interface. This allows to call
//// methods on the plugin as if it were a normal DeviceModel instance but
//// actually they are executed on the plugin sent through gRPC.
//model, ok := raw.(shared.DeviceModel)
//if !ok {
// return nil, customerrs.InvalidTypeAssertionError{
// Value: model,
// Type: (*shared.DeviceModel)(nil),
// }
//}
pluginId, err := uuid.Parse(loadedPlugin.ID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// Request the plugin. This returns the gRPC client from the pc, ok := pluginClients[pluginId]
// DeviceModelPlugin. This can then be casted to the interface that we are
// exposing through the plugin (in this case "DeviceModel").
raw, err := gRPCClient.Dispense("deviceModel")
if err != nil {
return nil, err
}
// cast the raw plugin to the DeviceModel interface. This allows to call
// methods on the plugin as if it were a normal DeviceModel instance but
// actually they are executed on the plugin sent through gRPC.
model, ok := raw.(shared.DeviceModel)
if !ok { if !ok {
return nil, customerrs.InvalidTypeAssertionError{ return nil, fmt.Errorf("plugin not found")
Value: model,
Type: (*shared.DeviceModel)(nil),
}
} }
return &Plugin{ return &Plugin{
UUID: uuid.MustParse(loadedPlugin.ID), UUID: uuid.MustParse(loadedPlugin.ID),
client: client, client: pc.client,
DeviceModel: model, DeviceModel: pc.model,
manifest: &loadedPlugin.Manifest, manifest: &loadedPlugin.Manifest,
state: plugin.INITIALIZED, state: plugin.INITIALIZED,
}, nil }, nil
......
...@@ -6,7 +6,6 @@ import ( ...@@ -6,7 +6,6 @@ import (
"code.fbi.h-da.de/danet/gosdn/controller/customerrs" "code.fbi.h-da.de/danet/gosdn/controller/customerrs"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/rbac" "code.fbi.h-da.de/danet/gosdn/controller/interfaces/rbac"
"code.fbi.h-da.de/danet/gosdn/controller/nucleus/database"
"code.fbi.h-da.de/danet/gosdn/controller/store" "code.fbi.h-da.de/danet/gosdn/controller/store"
"github.com/google/uuid" "github.com/google/uuid"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
...@@ -134,15 +133,10 @@ func (s *DatabaseUserStore) GetAll(ctx context.Context) (loadedUsers []rbac.Load ...@@ -134,15 +133,10 @@ func (s *DatabaseUserStore) GetAll(ctx context.Context) (loadedUsers []rbac.Load
func (s *DatabaseUserStore) Update(ctx context.Context, userToUpdate rbac.User) (err error) { func (s *DatabaseUserStore) Update(ctx context.Context, userToUpdate rbac.User) (err error) {
var updatedLoadedUser rbac.LoadedUser var updatedLoadedUser rbac.LoadedUser
db, err := database.GetDatabaseConnection()
if err != nil {
return err
}
wc := writeconcern.Majority() wc := writeconcern.Majority()
txnOptions := options.Transaction().SetWriteConcern(wc) txnOptions := options.Transaction().SetWriteConcern(wc)
// Starts a session on the client // Starts a session on the client
session, err := db.Client().StartSession() session, err := s.collection.Database().Client().StartSession()
if err != nil { if err != nil {
return err return err
} }
......
...@@ -6,7 +6,6 @@ import ( ...@@ -6,7 +6,6 @@ import (
"time" "time"
"code.fbi.h-da.de/danet/gosdn/controller/customerrs" "code.fbi.h-da.de/danet/gosdn/controller/customerrs"
"code.fbi.h-da.de/danet/gosdn/controller/nucleus/database"
query "code.fbi.h-da.de/danet/gosdn/controller/store" query "code.fbi.h-da.de/danet/gosdn/controller/store"
"github.com/google/uuid" "github.com/google/uuid"
...@@ -133,15 +132,10 @@ func (s *DatabaseNodeStore) Add(ctx context.Context, node Node) (err error) { ...@@ -133,15 +132,10 @@ func (s *DatabaseNodeStore) Add(ctx context.Context, node Node) (err error) {
func (s *DatabaseNodeStore) Update(ctx context.Context, node Node) (err error) { func (s *DatabaseNodeStore) Update(ctx context.Context, node Node) (err error) {
var updatedLoadedNodes Node var updatedLoadedNodes Node
db, err := database.GetDatabaseConnection()
if err != nil {
return err
}
wc := writeconcern.Majority() wc := writeconcern.Majority()
txnOptions := options.Transaction().SetWriteConcern(wc) txnOptions := options.Transaction().SetWriteConcern(wc)
// Starts a session on the client // Starts a session on the client
session, err := db.Client().StartSession() session, err := s.collection.Database().Client().StartSession()
if err != nil { if err != nil {
return err return err
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment