Skip to content
Snippets Groups Projects
Commit 673d42fd authored by Malte Bauch's avatar Malte Bauch Committed by Neil-Jocelyn Schark
Browse files

Resolve "KMS - Boilerplate"

parent 61702a19
No related branches found
No related tags found
1 merge request!24Resolve "KMS - Boilerplate"
Pipeline #271347 passed
Showing with 262 additions and 87 deletions
syntax = "proto3";
package ctrl.v1;
service HealthCtrl {
service HealthCtrlService {
rpc Health(HealthRequest) returns (HealthResponse);
}
message HealthRequest {
string Message = 1;
string message = 1;
}
message HealthResponse {
string Message = 1;
string message = 1;
}
syntax = "proto3";
package kms.v1;
service HealthKms {
rpc Health(HealthKmsRequest) returns (HealthKmsResponse);
service HealthKmsService {
rpc Health(HealthRequest) returns (HealthResponse);
}
message HealthKmsRequest {
string Message = 1;
message HealthRequest {
string message = 1;
}
message HealthKmsResponse {
string Message = 1;
message HealthResponse {
string message = 1;
}
......@@ -6,16 +6,16 @@ import (
config "code.fbi.h-da.de/danet/costaquanta/ctrl/internal"
"code.fbi.h-da.de/danet/costaquanta/ctrl/internal/application"
server "code.fbi.h-da.de/danet/costaquanta/ctrl/internal/infrastructure/grpc"
"code.fbi.h-da.de/danet/costaquanta/ctrl/internal/infrastructure/shutdown"
server "code.fbi.h-da.de/danet/costaquanta/ctrl/internal/infrastructure/interaction/grpc"
pb "code.fbi.h-da.de/danet/costaquanta/gen/go/ctrl/v1"
"code.fbi.h-da.de/danet/costaquanta/libs/logging"
"code.fbi.h-da.de/danet/costaquanta/libs/shutdown"
"code.fbi.h-da.de/danet/costaquanta/libs/tracing"
"google.golang.org/grpc"
)
func main() {
log := logging.CreateProductionLogger("kms")
log := logging.CreateProductionLogger("controller")
defer func() {
_ = log.Sync()
}()
......@@ -38,7 +38,7 @@ func main() {
logs.Infof("config: %+v", cfg)
logs.Infof("binding to address %s", bindAddr)
tracing.SetRuntimeSettings("kms")
tracing.SetRuntimeSettings("controller")
traceProvider, err := tracing.GetTracer(context.Background(), tracing.Backend)
if err != nil {
logs.Fatal(err)
......@@ -62,7 +62,7 @@ func main() {
go grpcServer.Start(
func(server *grpc.Server, app *application.Application) {
pb.RegisterHealthCtrlServer(server, app.HealthServer)
pb.RegisterHealthCtrlServiceServer(server, app.HealthServer)
pb.RegisterCtrlRoutingServiceServer(server, app.RoutingServer)
},
)
......
......@@ -9,7 +9,7 @@ import (
)
type HealthServer struct {
pb.UnimplementedHealthCtrlServer
pb.UnimplementedHealthCtrlServiceServer
logger *zap.SugaredLogger
tracer trace.Tracer
......
......@@ -14,7 +14,7 @@ import (
"google.golang.org/grpc/test/bufconn"
)
func newHealthServer() pb.HealthCtrlServer {
func newHealthServer() pb.HealthCtrlServiceServer {
logs := zap.NewExample()
traceProvider := tracing.GetTestTracer()
......@@ -32,20 +32,19 @@ func newHealthServer() pb.HealthCtrlServer {
return srv
}
//nolint:staticcheck
func getTestServer(ctx context.Context) (pb.HealthCtrlClient, func()) {
buffer := 101024 * 1024
func getTestServer() (pb.HealthCtrlServiceClient, func()) {
buffer := 1024 * 1024
lis := bufconn.Listen(buffer)
baseServer := grpc.NewServer()
pb.RegisterHealthCtrlServer(baseServer, newHealthServer())
pb.RegisterHealthCtrlServiceServer(baseServer, newHealthServer())
go func() {
if err := baseServer.Serve(lis); err != nil {
log.Printf("error serving server: %v", err)
}
}()
conn, err := grpc.DialContext(ctx, "",
conn, err := grpc.NewClient("passthrough:///bufnet",
grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) {
return lis.Dial()
}), grpc.WithTransportCredentials(insecure.NewCredentials()))
......@@ -61,17 +60,19 @@ func getTestServer(ctx context.Context) (pb.HealthCtrlClient, func()) {
baseServer.Stop()
}
client := pb.NewHealthCtrlClient(conn)
client := pb.NewHealthCtrlServiceClient(conn)
return client, closer
}
//nolint:paralleltest
func TestHealthServer_Health(t *testing.T) {
t.Parallel()
ctx := context.Background()
client, closer := getTestServer(ctx)
defer closer()
client, closer := getTestServer()
t.Cleanup(func() {
defer closer()
})
type expectation struct {
out *pb.HealthResponse
......@@ -97,6 +98,7 @@ func TestHealthServer_Health(t *testing.T) {
for scenario, tt := range tests {
t.Run(scenario, func(t *testing.T) {
t.Parallel()
out, err := client.Health(ctx, tt.in)
if err != nil {
if tt.expected.err.Error() != err.Error() {
......
......@@ -36,9 +36,8 @@ func newRoutingServer() pb.CtrlRoutingServiceServer {
return srv
}
//nolint:staticcheck
func getTestRoutingServer(ctx context.Context) (pb.CtrlRoutingServiceClient, func()) {
buffer := 101024 * 1024
func getTestRoutingServer() (pb.CtrlRoutingServiceClient, func()) {
buffer := 1024 * 1024
lis := bufconn.Listen(buffer)
baseServer := grpc.NewServer()
......@@ -49,7 +48,7 @@ func getTestRoutingServer(ctx context.Context) (pb.CtrlRoutingServiceClient, fun
}
}()
conn, err := grpc.DialContext(ctx, "",
conn, err := grpc.NewClient("passthrough:///bufnet",
grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) {
return lis.Dial()
}), grpc.WithTransportCredentials(insecure.NewCredentials()))
......@@ -70,12 +69,14 @@ func getTestRoutingServer(ctx context.Context) (pb.CtrlRoutingServiceClient, fun
return client, closer
}
//nolint:paralleltest
func TestRoutingServer_RequestRoute(t *testing.T) {
t.Parallel()
ctx := context.Background()
client, closer := getTestRoutingServer(ctx)
defer closer()
client, closer := getTestRoutingServer()
t.Cleanup(func() {
defer closer()
})
type expectation struct {
out *pb.RequestRouteResponse
......@@ -103,6 +104,7 @@ func TestRoutingServer_RequestRoute(t *testing.T) {
for scenario, tt := range tests {
t.Run(scenario, func(t *testing.T) {
t.Parallel()
out, err := client.RequestRoute(ctx, tt.in)
if err != nil {
if tt.expected.err.Error() != err.Error() {
......
......@@ -3,19 +3,15 @@ package main
import (
"context"
"flag"
"net"
"runtime/debug"
pb "code.fbi.h-da.de/danet/costaquanta/gen/go/kms/v1"
config "code.fbi.h-da.de/danet/costaquanta/kms/internal"
"code.fbi.h-da.de/danet/costaquanta/kms/internal/core/application"
"code.fbi.h-da.de/danet/costaquanta/kms/internal/application"
server "code.fbi.h-da.de/danet/costaquanta/kms/internal/infrastructure/interaction/grpc"
"code.fbi.h-da.de/danet/costaquanta/libs/logging"
"code.fbi.h-da.de/danet/costaquanta/libs/shutdown"
"code.fbi.h-da.de/danet/costaquanta/libs/tracing"
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/recovery"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
func main() {
......@@ -44,14 +40,12 @@ func main() {
flag.Parse()
cfg := config.GetConfig()
cfg.AddrBind = bindAddr
cfg.CtrlAddr = ctrlAddr
logs.Infof("config: %+v", cfg)
logs.Infof("binding to address %s", bindAddr)
listen, err := net.Listen("tcp", bindAddr)
if err != nil {
logs.Fatalf("failed to listen: %v", err)
}
logs.Infof("address of managing controller %s", ctrlAddr)
tracing.SetRuntimeSettings("kms")
traceProvider, err := tracing.GetTracer(context.Background(), tracing.Backend)
......@@ -64,48 +58,18 @@ func main() {
}
}()
grpcPanicRecoveryHandler := func(p any) (err error) {
logs.Errorf("recovered from gRPC panic %+v; %v", p, debug.Stack())
app := application.NewApplication(log, traceProvider)
return status.Errorf(codes.Internal, "%s", p)
grpcServer, err := server.NewGrpcServer(cfg, app, logs)
if err != nil {
panic(err)
}
// var opts []grpc.ServerOption
// if *tls {
// if *certFile == "" {
// *certFile = data.Path("x509/server_cert.pem")
// }
// if *keyFile == "" {
// *keyFile = data.Path("x509/server_key.pem")
// }
// creds, err := credentials.NewServerTLSFromFile(*certFile, *keyFile)
// if err != nil {
// log.Fatalf("Failed to generate credentials: %v", err)
// }
// opts = []grpc.ServerOption{grpc.Creds(creds)}
// }
// grpcServer := grpc.NewServer(opts...)
grpcServer := grpc.NewServer(
grpc.StatsHandler(otelgrpc.NewServerHandler()),
grpc.ChainUnaryInterceptor(
// srvMetrics.UnaryServerInterceptor(grpcprom.WithExemplarFromContext(exemplarFromContext)),
// logging.UnaryServerInterceptor(interceptorLogger(rpcLogger), logging.WithFieldsFromContext(logTraceID)),
// selector.UnaryServerInterceptor(auth.UnaryServerInterceptor(authFn), selector.MatchFunc(allButHealthZ)),
recovery.UnaryServerInterceptor(recovery.WithRecoveryHandler(grpcPanicRecoveryHandler)),
),
grpc.ChainStreamInterceptor(
// srvMetrics.StreamServerInterceptor(grpcprom.WithExemplarFromContext(exemplarFromContext)),
// logging.StreamServerInterceptor(interceptorLogger(rpcLogger), logging.WithFieldsFromContext(logTraceID)),
// selector.StreamServerInterceptor(auth.StreamServerInterceptor(authFn), selector.MatchFunc(allButHealthZ)),
recovery.StreamServerInterceptor(
recovery.WithRecoveryHandler(grpcPanicRecoveryHandler),
),
),
go grpcServer.Start(
func(server *grpc.Server, app *application.Application) {
pb.RegisterHealthKmsServiceServer(server, app.HealthServer)
},
)
app := application.NewApplication(log, traceProvider)
pb.RegisterHealthKmsServer(grpcServer, app.HealthServer)
_ = grpcServer.Serve(listen)
shutdown.AddShutdownHook(logs, grpcServer)
}
package server
import (
"io"
"net"
"runtime/debug"
config "code.fbi.h-da.de/danet/costaquanta/kms/internal"
"code.fbi.h-da.de/danet/costaquanta/kms/internal/application"
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/recovery"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// GRPCServer is an interface that defines the methods for a gRPC server.
// It is used to start the server and register services.
// It also implements the io.Closer interface to allow for graceful shutdown.
type GRPCServer interface {
Start(func(server *grpc.Server, app *application.Application))
io.Closer
}
type gRPCServer struct {
logger *zap.SugaredLogger
grpcServer *grpc.Server
config config.Config
app *application.Application
}
func NewGrpcServer(
config config.Config,
app *application.Application,
logger *zap.SugaredLogger,
) (GRPCServer, error) {
options, err := buildOptions(logger)
if err != nil {
return nil, err
}
server := grpc.NewServer(options...)
return &gRPCServer{
config: config,
grpcServer: server,
app: app,
logger: logger,
}, err
}
func (g gRPCServer) Start(serviceRegister func(server *grpc.Server, app *application.Application)) {
grpcListener, err := net.Listen("tcp", ""+g.config.AddrBind)
if err != nil {
g.logger.Fatal("failed to start grpc server", zap.Any("err", err))
}
serviceRegister(g.grpcServer, g.app)
g.logger.Info("start grpc server success ", zap.Any("endpoint", grpcListener.Addr()))
if err := g.grpcServer.Serve(grpcListener); err != nil {
g.logger.Fatal("failed to grpc server serve", zap.Any("err", err))
}
}
func (g gRPCServer) Close() error {
g.grpcServer.GracefulStop()
return nil
}
func buildOptions(
logs *zap.SugaredLogger,
) ([]grpc.ServerOption, error) {
grpcPanicRecoveryHandler := func(p any) (err error) {
logs.Errorf("recovered from gRPC panic %+v; %v", p, debug.Stack())
return status.Errorf(codes.Internal, "%s", p)
}
return []grpc.ServerOption{
grpc.StatsHandler(otelgrpc.NewServerHandler()),
grpc.ChainUnaryInterceptor(
recovery.UnaryServerInterceptor(recovery.WithRecoveryHandler(grpcPanicRecoveryHandler)),
),
grpc.ChainStreamInterceptor(
recovery.StreamServerInterceptor(
recovery.WithRecoveryHandler(grpcPanicRecoveryHandler),
),
),
}, nil
}
......@@ -9,7 +9,7 @@ import (
)
type HealthServer struct {
pb.UnimplementedHealthKmsServer
pb.UnimplementedHealthKmsServiceServer
logger *zap.SugaredLogger
tracer trace.Tracer
......@@ -17,14 +17,14 @@ type HealthServer struct {
func (h *HealthServer) Health(
ctx context.Context,
request *pb.HealthKmsRequest,
) (*pb.HealthKmsResponse, error) {
request *pb.HealthRequest,
) (*pb.HealthResponse, error) {
h.logger.Debugf("got health request %+v", request)
_, span := h.tracer.Start(ctx, "Health")
defer span.End()
return &pb.HealthKmsResponse{
return &pb.HealthResponse{
Message: request.GetMessage(),
}, nil
}
......
package interaction
import (
"context"
"log"
"net"
"testing"
pb "code.fbi.h-da.de/danet/costaquanta/gen/go/kms/v1"
"code.fbi.h-da.de/danet/costaquanta/libs/tracing"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/test/bufconn"
)
func newHealthServer() pb.HealthKmsServiceServer {
logs := zap.NewExample()
traceProvider := tracing.GetTestTracer()
defer func() {
if err := traceProvider.Shutdown(context.Background()); err != nil {
logs.Info(err.Error())
}
}()
srv := NewHealthServer(
zap.NewExample().Sugar(),
traceProvider.Tracer("test"),
)
return srv
}
func getTestServer() (pb.HealthKmsServiceClient, func()) {
buffer := 1024 * 1024
lis := bufconn.Listen(buffer)
baseServer := grpc.NewServer()
pb.RegisterHealthKmsServiceServer(baseServer, newHealthServer())
go func() {
if err := baseServer.Serve(lis); err != nil {
log.Printf("error serving server: %v", err)
}
}()
conn, err := grpc.NewClient("passthrough:///bufnet",
grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) {
return lis.Dial()
}), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Printf("error connecting to server: %v", err)
}
closer := func() {
err := lis.Close()
if err != nil {
log.Printf("error closing listener: %v", err)
}
baseServer.Stop()
}
client := pb.NewHealthKmsServiceClient(conn)
return client, closer
}
func TestHealthServer_Health(t *testing.T) {
t.Parallel()
ctx := context.Background()
client, closer := getTestServer()
t.Cleanup(func() {
defer closer()
})
type expectation struct {
out *pb.HealthResponse
err error
}
tests := map[string]struct {
in *pb.HealthRequest
expected expectation
}{
"echo-should-succeed": {
in: &pb.HealthRequest{
Message: "hello-world",
},
expected: expectation{
out: &pb.HealthResponse{
Message: "hello-world",
},
err: nil,
},
},
}
for scenario, tt := range tests {
t.Run(scenario, func(t *testing.T) {
t.Parallel()
out, err := client.Health(ctx, tt.in)
if err != nil {
if tt.expected.err.Error() != err.Error() {
t.Errorf("Err -> \nWant: %q\nGot: %q\n", tt.expected.err, err)
}
} else {
if tt.expected.out.GetMessage() != out.GetMessage() {
t.Errorf("Out -> \nWant: %q\nGot : %q", tt.expected.out, out)
}
}
})
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment