diff --git a/cmd/ris-mirror/main.go b/cmd/ris-mirror/main.go index 25540961a26a892954494e67619c1c3a0e773b04..65772646259911c9f83ada976b4e9dc56c0466a6 100644 --- a/cmd/ris-mirror/main.go +++ b/cmd/ris-mirror/main.go @@ -37,7 +37,7 @@ func main() { grpcClientManager := clientmanager.New() for _, instance := range cfg.GetRISInstances() { - err := grpcClientManager.Add(instance, grpc.WithInsecure(), grpc.WithKeepaliveParams(keepalive.ClientParameters{ + err := grpcClientManager.AddIfNotExists(instance, grpc.WithInsecure(), grpc.WithKeepaliveParams(keepalive.ClientParameters{ Time: time.Second * 10, Timeout: time.Second * time.Duration(*risTimeout), PermitWithoutStream: true, diff --git a/util/grpc/clientmanager/clientmanager.go b/util/grpc/clientmanager/clientmanager.go index ef01eb70575f4913e5fe2ee3174630f0b2381f25..6dcb0f8ff0ce230201d3fd6354c9521510199743 100644 --- a/util/grpc/clientmanager/clientmanager.go +++ b/util/grpc/clientmanager/clientmanager.go @@ -1,7 +1,6 @@ package clientmanager import ( - "fmt" "sync" "github.com/bio-routing/bio-rd/util/grpc/clientmanager/metrics" @@ -11,36 +10,42 @@ import ( // ClientManager manages GRPC client connections type ClientManager struct { - connections map[string]*grpc.ClientConn + connections map[string]*conn connectionsMu sync.RWMutex } +type conn struct { + gc *grpc.ClientConn + refCount uint +} + // New creates a new ClientManager func New() *ClientManager { return &ClientManager{ - connections: make(map[string]*grpc.ClientConn), + connections: make(map[string]*conn), } } -// Get gets a target connection +// Get gets a target connection and tracks it's usage func (cm *ClientManager) Get(target string) *grpc.ClientConn { - cm.connectionsMu.RLock() - defer cm.connectionsMu.RUnlock() + cm.connectionsMu.Lock() + defer cm.connectionsMu.Unlock() if _, exists := cm.connections[target]; !exists { return nil } - return cm.connections[target] + cm.connections[target].refCount++ + return cm.connections[target].gc } -// Add adds a target -func (cm *ClientManager) Add(target string, opts ...grpc.DialOption) error { +// AddIfNotExists adds a client if it doesn't exist already +func (cm *ClientManager) AddIfNotExists(target string, opts ...grpc.DialOption) error { cm.connectionsMu.Lock() defer cm.connectionsMu.Unlock() if _, exists := cm.connections[target]; exists { - return fmt.Errorf("Target exists already") + return nil } cc, err := grpc.Dial(target, opts...) @@ -48,10 +53,31 @@ func (cm *ClientManager) Add(target string, opts ...grpc.DialOption) error { return errors.Wrap(err, "grpc.Dial failed") } - cm.connections[target] = cc + cm.connections[target] = &conn{ + gc: cc, + } + return nil } +// Release releases a connection if refCount reaches 0 +func (cm *ClientManager) Release(target string) { + cm.connectionsMu.Lock() + defer cm.connectionsMu.Unlock() + + if _, exists := cm.connections[target]; exists { + return + } + + cm.connections[target].refCount-- + if cm.connections[target].refCount > 0 { + return + } + + cm.connections[target].gc.Close() + delete(cm.connections, target) +} + // Metrics gets ClientManager metrics func (cm *ClientManager) Metrics() *metrics.ClientManagerMetrics { ret := metrics.New() @@ -61,7 +87,7 @@ func (cm *ClientManager) Metrics() *metrics.ClientManagerMetrics { for t, c := range cm.connections { ret.Connections = append(ret.Connections, &metrics.GRPCConnectionMetrics{ Target: t, - State: int(c.GetState()), + State: int(c.gc.GetState()), }) }