diff --git a/goKMS/kms/peers/kmsPeer.go b/goKMS/kms/peers/kmsPeer.go index 5c616f650fc405c3874edf795dae9fae40bfef5a..88a3f71241a0cbf44557351b2e8ad997cd157aee 100644 --- a/goKMS/kms/peers/kmsPeer.go +++ b/goKMS/kms/peers/kmsPeer.go @@ -110,49 +110,65 @@ func NewKmsPeer(peerKmsId string, quantummodule QuantumModule, cryptoAlgorithm c return nil, err } - go func() { - timeout := time.Second * 5 - defer remoteConn.Close() - - healthClient := healthpb.NewHealthClient(remoteConn) - // TODO: add option to cancel - for range time.Tick(time.Second) { - ctx, cancel := context.WithTimeout(context.Background(), timeout) - resp, err := healthClient.Check(ctx, &healthpb.HealthCheckRequest{Service: ""}) - cancel() - if err != nil { - if stat, ok := status.FromError(err); ok && stat.Code() == codes.Unimplemented { - log.Printf("error: grpc health not implemented on peer with id: %s; %s", peerKmsId, stat.Message()) - if kmsPeer.peerStatus != KmsPeerUp { - kmsPeer.peerStatus = KmsPeerUp - kmsPeer.eventBus.Publish(event.NewPeerEvent(kmsPeer.TcpSocketStr)) + go kmsPeer.initializeHealthCheck(remoteConn) + + return kmsPeer, nil +} + +func (kp *KmsPeer) initializeHealthCheck(remoteConn *grpc.ClientConn) { + timeout := time.Second * 5 + defer func() { + if err := remoteConn.Close(); err != nil { + log.Errorf("Failed to close health check connection, for peer: %s; err: %v", kp.peerKmsId, err) + } + }() + + healthClient := healthpb.NewHealthClient(remoteConn) + // TODO: add option to cancel + for range time.Tick(time.Second) { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + resp, err := healthClient.Check(ctx, &healthpb.HealthCheckRequest{Service: ""}) + cancel() + if err != nil { + if stat, ok := status.FromError(err); ok && stat.Code() == codes.Unimplemented { + log.Printf("error: grpc health not implemented on peer with id: %s; %s", kp.peerKmsId, stat.Message()) + if kp.peerStatus != KmsPeerUp { + kp.peerStatus = KmsPeerUp + if err := kp.eventBus.Publish(event.NewPeerEvent(kp.TcpSocketStr)); err != nil { + log.Errorf("Failed to publish peer status update, for peer: %s; err: %v", kp.peerKmsId, err) } - break - } else if stat, ok := status.FromError(err); ok && stat.Code() == codes.DeadlineExceeded { - log.Printf("health request for peer: %s timed out after %d seconds", peerKmsId, timeout) - if kmsPeer.peerStatus != KmsPeerDown { - kmsPeer.peerStatus = KmsPeerDown - kmsPeer.eventBus.Publish(event.NewPeerEvent(kmsPeer.TcpSocketStr)) + } + break + } else if stat, ok := status.FromError(err); ok && stat.Code() == codes.DeadlineExceeded { + log.Printf("health request for peer: %s timed out after %d seconds", kp.peerKmsId, timeout) + if kp.peerStatus != KmsPeerDown { + kp.peerStatus = KmsPeerDown + if err := kp.eventBus.Publish(event.NewPeerEvent(kp.TcpSocketStr)); err != nil { + log.Errorf("Failed to publish peer status update, for peer: %s; err: %v", kp.peerKmsId, err) } - } else { - log.Printf("health request for peer: %s failed; error: %+v", peerKmsId, err) - if kmsPeer.peerStatus != KmsPeerDown { - kmsPeer.peerStatus = KmsPeerDown - kmsPeer.eventBus.Publish(event.NewPeerEvent(kmsPeer.TcpSocketStr)) + } + } else { + log.Printf("health request for peer: %s failed; error: %+v", kp.peerKmsId, err) + if kp.peerStatus != KmsPeerDown { + kp.peerStatus = KmsPeerDown + if err := kp.eventBus.Publish(event.NewPeerEvent(kp.TcpSocketStr)); err != nil { + log.Errorf("Failed to publish peer status update, for peer: %s; err: %v", kp.peerKmsId, err) } } } - if resp != nil && resp.Status != healthpb.HealthCheckResponse_SERVING && kmsPeer.peerStatus != KmsPeerDown { - kmsPeer.peerStatus = KmsPeerDown - kmsPeer.eventBus.Publish(event.NewPeerEvent(kmsPeer.TcpSocketStr)) - } else if resp != nil && resp.Status == healthpb.HealthCheckResponse_SERVING && kmsPeer.peerStatus != KmsPeerUp { - kmsPeer.peerStatus = KmsPeerUp - kmsPeer.eventBus.Publish(event.NewPeerEvent(kmsPeer.TcpSocketStr)) + } + if resp != nil && resp.Status != healthpb.HealthCheckResponse_SERVING && kp.peerStatus != KmsPeerDown { + kp.peerStatus = KmsPeerDown + if err := kp.eventBus.Publish(event.NewPeerEvent(kp.TcpSocketStr)); err != nil { + log.Errorf("Failed to publish peer status update, for peer: %s; err: %v", kp.peerKmsId, err) + } + } else if resp != nil && resp.Status == healthpb.HealthCheckResponse_SERVING && kp.peerStatus != KmsPeerUp { + kp.peerStatus = KmsPeerUp + if err := kp.eventBus.Publish(event.NewPeerEvent(kp.TcpSocketStr)); err != nil { + log.Errorf("Failed to publish peer status update, for peer: %s; err: %v", kp.peerKmsId, err) } } - }() - - return kmsPeer, nil + } } func (kp *KmsPeer) Client() *GRPCClient {