Skip to content
Snippets Groups Projects
Commit 717ce88a authored by Oliver Herms's avatar Oliver Herms
Browse files

Implement usage tracking for grpc connection manager

parent 3814d865
No related branches found
No related tags found
No related merge requests found
......@@ -37,7 +37,7 @@ func main() {
grpcClientManager := clientmanager.New()
for _, instance := range cfg.GetRISInstances() {
err := grpcClientManager.Add(instance, grpc.WithInsecure(), grpc.WithKeepaliveParams(keepalive.ClientParameters{
err := grpcClientManager.AddIfNotExists(instance, grpc.WithInsecure(), grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: time.Second * 10,
Timeout: time.Second * time.Duration(*risTimeout),
PermitWithoutStream: true,
......
package clientmanager
import (
"fmt"
"sync"
"github.com/bio-routing/bio-rd/util/grpc/clientmanager/metrics"
......@@ -11,36 +10,42 @@ import (
// ClientManager manages GRPC client connections
type ClientManager struct {
connections map[string]*grpc.ClientConn
connections map[string]*conn
connectionsMu sync.RWMutex
}
type conn struct {
gc *grpc.ClientConn
refCount uint
}
// New creates a new ClientManager
func New() *ClientManager {
return &ClientManager{
connections: make(map[string]*grpc.ClientConn),
connections: make(map[string]*conn),
}
}
// Get gets a target connection
// Get gets a target connection and tracks it's usage
func (cm *ClientManager) Get(target string) *grpc.ClientConn {
cm.connectionsMu.RLock()
defer cm.connectionsMu.RUnlock()
cm.connectionsMu.Lock()
defer cm.connectionsMu.Unlock()
if _, exists := cm.connections[target]; !exists {
return nil
}
return cm.connections[target]
cm.connections[target].refCount++
return cm.connections[target].gc
}
// Add adds a target
func (cm *ClientManager) Add(target string, opts ...grpc.DialOption) error {
// AddIfNotExists adds a client if it doesn't exist already
func (cm *ClientManager) AddIfNotExists(target string, opts ...grpc.DialOption) error {
cm.connectionsMu.Lock()
defer cm.connectionsMu.Unlock()
if _, exists := cm.connections[target]; exists {
return fmt.Errorf("Target exists already")
return nil
}
cc, err := grpc.Dial(target, opts...)
......@@ -48,10 +53,31 @@ func (cm *ClientManager) Add(target string, opts ...grpc.DialOption) error {
return errors.Wrap(err, "grpc.Dial failed")
}
cm.connections[target] = cc
cm.connections[target] = &conn{
gc: cc,
}
return nil
}
// Release releases a connection if refCount reaches 0
func (cm *ClientManager) Release(target string) {
cm.connectionsMu.Lock()
defer cm.connectionsMu.Unlock()
if _, exists := cm.connections[target]; exists {
return
}
cm.connections[target].refCount--
if cm.connections[target].refCount > 0 {
return
}
cm.connections[target].gc.Close()
delete(cm.connections, target)
}
// Metrics gets ClientManager metrics
func (cm *ClientManager) Metrics() *metrics.ClientManagerMetrics {
ret := metrics.New()
......@@ -61,7 +87,7 @@ func (cm *ClientManager) Metrics() *metrics.ClientManagerMetrics {
for t, c := range cm.connections {
ret.Connections = append(ret.Connections, &metrics.GRPCConnectionMetrics{
Target: t,
State: int(c.GetState()),
State: int(c.gc.GetState()),
})
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment