Skip to content
Snippets Groups Projects
Unverified Commit 1140b936 authored by Christoph Petrausch's avatar Christoph Petrausch Committed by GitHub
Browse files

Merge pull request #285 from bio-routing/clientmanager/cleanup

Implement usage tracking for grpc connection manager
parents 3814d865 717ce88a
No related branches found
No related tags found
No related merge requests found
......@@ -37,7 +37,7 @@ func main() {
grpcClientManager := clientmanager.New()
for _, instance := range cfg.GetRISInstances() {
err := grpcClientManager.Add(instance, grpc.WithInsecure(), grpc.WithKeepaliveParams(keepalive.ClientParameters{
err := grpcClientManager.AddIfNotExists(instance, grpc.WithInsecure(), grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: time.Second * 10,
Timeout: time.Second * time.Duration(*risTimeout),
PermitWithoutStream: true,
......
package clientmanager
import (
"fmt"
"sync"
"github.com/bio-routing/bio-rd/util/grpc/clientmanager/metrics"
......@@ -11,36 +10,42 @@ import (
// ClientManager manages GRPC client connections
type ClientManager struct {
connections map[string]*grpc.ClientConn
connections map[string]*conn
connectionsMu sync.RWMutex
}
type conn struct {
gc *grpc.ClientConn
refCount uint
}
// New creates a new ClientManager
func New() *ClientManager {
return &ClientManager{
connections: make(map[string]*grpc.ClientConn),
connections: make(map[string]*conn),
}
}
// Get gets a target connection
// Get gets a target connection and tracks it's usage
func (cm *ClientManager) Get(target string) *grpc.ClientConn {
cm.connectionsMu.RLock()
defer cm.connectionsMu.RUnlock()
cm.connectionsMu.Lock()
defer cm.connectionsMu.Unlock()
if _, exists := cm.connections[target]; !exists {
return nil
}
return cm.connections[target]
cm.connections[target].refCount++
return cm.connections[target].gc
}
// Add adds a target
func (cm *ClientManager) Add(target string, opts ...grpc.DialOption) error {
// AddIfNotExists adds a client if it doesn't exist already
func (cm *ClientManager) AddIfNotExists(target string, opts ...grpc.DialOption) error {
cm.connectionsMu.Lock()
defer cm.connectionsMu.Unlock()
if _, exists := cm.connections[target]; exists {
return fmt.Errorf("Target exists already")
return nil
}
cc, err := grpc.Dial(target, opts...)
......@@ -48,10 +53,31 @@ func (cm *ClientManager) Add(target string, opts ...grpc.DialOption) error {
return errors.Wrap(err, "grpc.Dial failed")
}
cm.connections[target] = cc
cm.connections[target] = &conn{
gc: cc,
}
return nil
}
// Release releases a connection if refCount reaches 0
func (cm *ClientManager) Release(target string) {
cm.connectionsMu.Lock()
defer cm.connectionsMu.Unlock()
if _, exists := cm.connections[target]; exists {
return
}
cm.connections[target].refCount--
if cm.connections[target].refCount > 0 {
return
}
cm.connections[target].gc.Close()
delete(cm.connections, target)
}
// Metrics gets ClientManager metrics
func (cm *ClientManager) Metrics() *metrics.ClientManagerMetrics {
ret := metrics.New()
......@@ -61,7 +87,7 @@ func (cm *ClientManager) Metrics() *metrics.ClientManagerMetrics {
for t, c := range cm.connections {
ret.Connections = append(ret.Connections, &metrics.GRPCConnectionMetrics{
Target: t,
State: int(c.GetState()),
State: int(c.gc.GetState()),
})
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment