From 717ce88a318407f36d5db8f95c63a6e782696a47 Mon Sep 17 00:00:00 2001 From: Oliver Herms <oliver.herms@exaring.de> Date: Wed, 23 Sep 2020 22:55:39 +0200 Subject: [PATCH] Implement usage tracking for grpc connection manager --- cmd/ris-mirror/main.go | 2 +- util/grpc/clientmanager/clientmanager.go | 50 ++++++++++++++++++------ 2 files changed, 39 insertions(+), 13 deletions(-) diff --git a/cmd/ris-mirror/main.go b/cmd/ris-mirror/main.go index 25540961..65772646 100644 --- a/cmd/ris-mirror/main.go +++ b/cmd/ris-mirror/main.go @@ -37,7 +37,7 @@ func main() { grpcClientManager := clientmanager.New() for _, instance := range cfg.GetRISInstances() { - err := grpcClientManager.Add(instance, grpc.WithInsecure(), grpc.WithKeepaliveParams(keepalive.ClientParameters{ + err := grpcClientManager.AddIfNotExists(instance, grpc.WithInsecure(), grpc.WithKeepaliveParams(keepalive.ClientParameters{ Time: time.Second * 10, Timeout: time.Second * time.Duration(*risTimeout), PermitWithoutStream: true, diff --git a/util/grpc/clientmanager/clientmanager.go b/util/grpc/clientmanager/clientmanager.go index ef01eb70..6dcb0f8f 100644 --- a/util/grpc/clientmanager/clientmanager.go +++ b/util/grpc/clientmanager/clientmanager.go @@ -1,7 +1,6 @@ package clientmanager import ( - "fmt" "sync" "github.com/bio-routing/bio-rd/util/grpc/clientmanager/metrics" @@ -11,36 +10,42 @@ import ( // ClientManager manages GRPC client connections type ClientManager struct { - connections map[string]*grpc.ClientConn + connections map[string]*conn connectionsMu sync.RWMutex } +type conn struct { + gc *grpc.ClientConn + refCount uint +} + // New creates a new ClientManager func New() *ClientManager { return &ClientManager{ - connections: make(map[string]*grpc.ClientConn), + connections: make(map[string]*conn), } } -// Get gets a target connection +// Get gets a target connection and tracks it's usage func (cm *ClientManager) Get(target string) *grpc.ClientConn { - cm.connectionsMu.RLock() - defer cm.connectionsMu.RUnlock() + cm.connectionsMu.Lock() + defer cm.connectionsMu.Unlock() if _, exists := cm.connections[target]; !exists { return nil } - return cm.connections[target] + cm.connections[target].refCount++ + return cm.connections[target].gc } -// Add adds a target -func (cm *ClientManager) Add(target string, opts ...grpc.DialOption) error { +// AddIfNotExists adds a client if it doesn't exist already +func (cm *ClientManager) AddIfNotExists(target string, opts ...grpc.DialOption) error { cm.connectionsMu.Lock() defer cm.connectionsMu.Unlock() if _, exists := cm.connections[target]; exists { - return fmt.Errorf("Target exists already") + return nil } cc, err := grpc.Dial(target, opts...) @@ -48,10 +53,31 @@ func (cm *ClientManager) Add(target string, opts ...grpc.DialOption) error { return errors.Wrap(err, "grpc.Dial failed") } - cm.connections[target] = cc + cm.connections[target] = &conn{ + gc: cc, + } + return nil } +// Release releases a connection if refCount reaches 0 +func (cm *ClientManager) Release(target string) { + cm.connectionsMu.Lock() + defer cm.connectionsMu.Unlock() + + if _, exists := cm.connections[target]; exists { + return + } + + cm.connections[target].refCount-- + if cm.connections[target].refCount > 0 { + return + } + + cm.connections[target].gc.Close() + delete(cm.connections, target) +} + // Metrics gets ClientManager metrics func (cm *ClientManager) Metrics() *metrics.ClientManagerMetrics { ret := metrics.New() @@ -61,7 +87,7 @@ func (cm *ClientManager) Metrics() *metrics.ClientManagerMetrics { for t, c := range cm.connections { ret.Connections = append(ret.Connections, &metrics.GRPCConnectionMetrics{ Target: t, - State: int(c.GetState()), + State: int(c.gc.GetState()), }) } -- GitLab