Skip to content
Snippets Groups Projects
Commit ee52f5cb authored by André Sterba's avatar André Sterba
Browse files

Shutdown grpc server gracefully

parent 437fdb0b
No related branches found
No related tags found
1 merge request!20Shutdown grpc server gracefully
Pipeline #270399 passed
......@@ -3,19 +3,15 @@ package main
import (
"context"
"flag"
"net"
"runtime/debug"
config "code.fbi.h-da.de/danet/costaquanta/ctrl/internal"
"code.fbi.h-da.de/danet/costaquanta/ctrl/internal/application"
server "code.fbi.h-da.de/danet/costaquanta/ctrl/internal/infrastructure/grpc"
"code.fbi.h-da.de/danet/costaquanta/ctrl/internal/infrastructure/shutdown"
pb "code.fbi.h-da.de/danet/costaquanta/gen/go/ctrl/v1"
"code.fbi.h-da.de/danet/costaquanta/libs/logging"
"code.fbi.h-da.de/danet/costaquanta/libs/tracing"
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/recovery"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
func main() {
......@@ -37,15 +33,11 @@ func main() {
flag.Parse()
cfg := config.GetConfig()
cfg.AddrBind = bindAddr
logs.Infof("config: %+v", cfg)
logs.Infof("binding to address %s", bindAddr)
listen, err := net.Listen("tcp", bindAddr)
if err != nil {
logs.Fatalf("failed to listen: %v", err)
}
tracing.SetRuntimeSettings("kms")
traceProvider, err := tracing.GetTracer(context.Background(), tracing.Backend)
if err != nil {
......@@ -57,53 +49,23 @@ func main() {
}
}()
grpcPanicRecoveryHandler := func(p any) (err error) {
logs.Errorf("recovered from gRPC panic %+v; %v", p, debug.Stack())
return status.Errorf(codes.Internal, "%s", p)
}
// var opts []grpc.ServerOption
// if *tls {
// if *certFile == "" {
// *certFile = data.Path("x509/server_cert.pem")
// }
// if *keyFile == "" {
// *keyFile = data.Path("x509/server_key.pem")
// }
// creds, err := credentials.NewServerTLSFromFile(*certFile, *keyFile)
// if err != nil {
// log.Fatalf("Failed to generate credentials: %v", err)
// }
// opts = []grpc.ServerOption{grpc.Creds(creds)}
// }
// grpcServer := grpc.NewServer(opts...)
grpcServer := grpc.NewServer(
grpc.StatsHandler(otelgrpc.NewServerHandler()),
grpc.ChainUnaryInterceptor(
// srvMetrics.UnaryServerInterceptor(grpcprom.WithExemplarFromContext(exemplarFromContext)),
// logging.UnaryServerInterceptor(interceptorLogger(rpcLogger), logging.WithFieldsFromContext(logTraceID)),
// selector.UnaryServerInterceptor(auth.UnaryServerInterceptor(authFn), selector.MatchFunc(allButHealthZ)),
recovery.UnaryServerInterceptor(recovery.WithRecoveryHandler(grpcPanicRecoveryHandler)),
),
grpc.ChainStreamInterceptor(
// srvMetrics.StreamServerInterceptor(grpcprom.WithExemplarFromContext(exemplarFromContext)),
// logging.StreamServerInterceptor(interceptorLogger(rpcLogger), logging.WithFieldsFromContext(logTraceID)),
// selector.StreamServerInterceptor(auth.StreamServerInterceptor(authFn), selector.MatchFunc(allButHealthZ)),
recovery.StreamServerInterceptor(
recovery.WithRecoveryHandler(grpcPanicRecoveryHandler),
),
),
)
app := application.NewApplication(log, traceProvider)
pb.RegisterHealthCtrlServer(grpcServer, app.HealthServer)
pb.RegisterCtrlRoutingServiceServer(grpcServer, app.RoutingServer)
err = grpcServer.Serve(listen)
grpcServer, err := server.NewGrpcServer(
cfg,
app,
logs,
)
if err != nil {
panic(err)
}
go grpcServer.Start(
func(server *grpc.Server, app *application.Application) {
pb.RegisterHealthCtrlServer(server, app.HealthServer)
pb.RegisterCtrlRoutingServiceServer(server, app.RoutingServer)
},
)
shutdown.AddShutdownHook(logs, grpcServer)
}
......@@ -10,6 +10,10 @@ import (
"go.uber.org/zap"
)
// Application is the main application struct that holds all the components
// of the application.
// It initializes the components and creates child loggers and tracers for
// each component.
type Application struct {
tracer *sdktrace.TracerProvider
......
package config
import "github.com/caarlos0/env/v11"
import (
"github.com/caarlos0/env/v11"
)
type Config struct {
AddrBind string
......
package server
import (
"io"
"net"
"runtime/debug"
config "code.fbi.h-da.de/danet/costaquanta/ctrl/internal"
"code.fbi.h-da.de/danet/costaquanta/ctrl/internal/application"
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/recovery"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// GRPCServer is an interface that defines the methods for a gRPC server.
// It is used to start the server and register services.
// It also implements the io.Closer interface to allow for graceful shutdown.
type GRPCServer interface {
Start(serviceRegister func(server *grpc.Server, app *application.Application))
io.Closer
}
type gRPCServer struct {
logger *zap.SugaredLogger
grpcServer *grpc.Server
config config.Config
app *application.Application
}
func NewGrpcServer(
config config.Config,
app *application.Application,
logger *zap.SugaredLogger,
) (GRPCServer, error) {
options, err := buildOptions(config, logger)
if err != nil {
return nil, err
}
server := grpc.NewServer(options...)
return &gRPCServer{
config: config,
grpcServer: server,
app: app,
logger: logger,
}, err
}
func (g gRPCServer) Start(serviceRegister func(server *grpc.Server, app *application.Application)) {
grpcListener, err := net.Listen("tcp", ""+g.config.AddrBind)
if err != nil {
g.logger.Fatal("failed to start grpc server", zap.Any("err", err))
}
serviceRegister(g.grpcServer, g.app)
g.logger.Info("start grpc server success ", zap.Any("endpoint", grpcListener.Addr()))
if err := g.grpcServer.Serve(grpcListener); err != nil {
g.logger.Fatal("failed to grpc server serve", zap.Any("err", err))
}
}
func (g gRPCServer) Close() error {
g.grpcServer.GracefulStop()
return nil
}
func buildOptions(
config config.Config,
logs *zap.SugaredLogger,
) ([]grpc.ServerOption, error) {
grpcPanicRecoveryHandler := func(p any) (err error) {
logs.Errorf("recovered from gRPC panic %+v; %v", p, debug.Stack())
return status.Errorf(codes.Internal, "%s", p)
}
return []grpc.ServerOption{
grpc.StatsHandler(otelgrpc.NewServerHandler()),
grpc.ChainUnaryInterceptor(
recovery.UnaryServerInterceptor(recovery.WithRecoveryHandler(grpcPanicRecoveryHandler)),
),
grpc.ChainStreamInterceptor(
recovery.StreamServerInterceptor(
recovery.WithRecoveryHandler(grpcPanicRecoveryHandler),
),
),
}, nil
}
package shutdown
import (
"errors"
"io"
"os"
"os/signal"
"syscall"
"go.uber.org/zap"
)
// AddShutdownHook listens for shutdown signals and notifies all registered
// services to call their Close() function.
func AddShutdownHook(logger *zap.SugaredLogger, closers ...io.Closer) {
logger.Info("listening signals...")
c := make(chan os.Signal, 1)
signal.Notify(
c, os.Interrupt, syscall.SIGHUP, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGTERM,
)
<-c
logger.Info("graceful shutdown...")
for _, closer := range closers {
if err := closer.Close(); err != nil {
logger.Error("failed to stop closer", zap.Any("err", err))
}
}
logger.Info("completed graceful shutdown")
if err := logger.Sync(); err != nil {
if !errors.Is(err, syscall.ENOTTY) {
logger.Infof("failed to flush logger err=%v\n", err)
}
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment