Newer
Older
package csbi
import (
"context"
"fmt"
"io"
"os"
"path/filepath"
"time"
pb "code.fbi.h-da.de/danet/api/go/gosdn/csbi"
"github.com/google/uuid"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
)
type byteSize float64
// constants representing human friendly data sizes as per https://www.socketloop.com/tutorials/golang-how-to-declare-kilobyte-megabyte-gigabyte-terabyte-and-so-on
const (
_ = iota // ignore first value by assigning to blank identifier
KB byteSize = 1 << (10 * iota)
MB
GB
TB
PB
EB
ZB
YB
)
type pluginStream interface {
Send(*pb.Payload) error
grpc.ServerStream
}
type server struct {
pb.UnimplementedCsbiServer
orchestrator Orchestrator
}
func (s server) Get(ctx context.Context, req *pb.GetRequest) (*pb.GetResponse, error) {
labels := prometheus.Labels{"rpc": "get"}
start := time.Now()
defer promEndHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)
deployments := make([]*pb.Deployment, len(req.Did))
for i, id := range req.Did {
deploymentID, _ := uuid.Parse(id)
dep, err := s.orchestrator.Get(deploymentID)
if err != nil {
}
deployments[i] = &pb.Deployment{
Id: dep.ID.String(),
Name: dep.Name,
State: dep.State,
}
}
return &pb.GetResponse{
Timestamp: time.Now().UnixNano(),
Deployments: deployments,
}, nil
}
func (s server) Create(ctx context.Context, req *pb.CreateRequest) (*pb.CreateResponse, error) {
labels := prometheus.Labels{"rpc": "create"}
start := promStartHook(labels, grpcRequestsTotal)
defer promEndHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)
deployments := make([]*pb.Deployment, len(req.TransportOption))
for i, opt := range req.TransportOption {
model, err := Discover(ctx, opt)
if err != nil {
ctx = context.WithValue(ctx, "target-address", opt.Address) //nolint
d, err := s.orchestrator.Build(ctx, model)
}
deployments[i] = &pb.Deployment{
Id: d.ID.String(),
Name: d.Name,
State: d.State,
}
}
return &pb.CreateResponse{
Timestamp: time.Now().UnixNano(),
Deployments: deployments,
}, nil
}
// TODO(maba): add description and consider to allow requesting
func (s server) GetPlugin(req *pb.GetRequest, stream pb.Csbi_GetPluginServer) error {
log.Info("started GetPlugin")
labels := prometheus.Labels{"rpc": "get_go_struct"}
start := promStartHook(labels, grpcRequestsTotal)
defer promEndHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)
deploymentID, _ := uuid.Parse(req.Did[0])
dep, err := s.orchestrator.Get(deploymentID)
if err != nil {
return handleRPCError(labels, err)
}
return sendPlugin(dep.ID, labels, stream)
func (s server) CreatePlugin(req *pb.CreateRequest, stream pb.Csbi_CreatePluginServer) error {
log.Info("started CreatePlugin")
labels := prometheus.Labels{"rpc": "create_plugin"}
start := promStartHook(labels, grpcRequestsTotal)
defer promEndHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)
ctx := context.Background()
for _, opt := range req.TransportOption {
model, err := Discover(ctx, opt)
if err != nil {
d, err := Generate(ctx, model, s.orchestrator.Repository(), opt.Type)
err = sendPlugin(d.ID, labels, stream)
if err != nil {
return handleRPCError(labels, err)
}
}
return nil
}
// sendPlugin takes a
func sendPlugin(id uuid.UUID, labels prometheus.Labels, stream pluginStream) error {
file, err := os.Open(filepath.Join(id.String(), "plugin.so"))
if err != nil {
return handleRPCError(labels, err)
}
defer file.Close()
buffer := make([]byte, int(MB))
for {
n, err := file.Read(buffer)
if err != nil {
if err != io.EOF {
fmt.Println(err)
break
}
log.WithField("n", n).Trace("read bytes")
payload := &pb.Payload{Chunk: buffer[:n]}
err = stream.Send(payload)
if err != nil {
return handleRPCError(labels, err)
return nil
}
func (s server) Delete(ctx context.Context, req *pb.DeleteRequest) (*pb.DeleteResponse, error) {
labels := prometheus.Labels{"rpc": "delete"}
start := promStartHook(labels, grpcRequestsTotal)
defer promEndHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)
for _, id := range req.Did {
log.Infof("processing deletion request for %v", id)
deploymentID, _ := uuid.Parse(id)
if err := s.orchestrator.Destroy(ctx, deploymentID); err != nil {
}
}
return &pb.DeleteResponse{
Timestamp: time.Now().UnixNano(),
Status: pb.DeleteResponse_OK,
}, nil
}
func handleRPCError(labels prometheus.Labels, err error) error {
return status.Errorf(codes.Aborted, "%v", promHandleError(labels, err, grpcAPIErrorsTotal))
}