Skip to content
Snippets Groups Projects
grpc.go 5.08 KiB
Newer Older
  • Learn to ignore specific revisions
  • Manuel Kieweg's avatar
    Manuel Kieweg committed
    package csbi
    
    import (
    	"context"
    	"fmt"
    	"io"
    	"os"
    	"path/filepath"
    	"time"
    
    	pb "code.fbi.h-da.de/danet/api/go/gosdn/csbi"
    	"github.com/google/uuid"
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    	"github.com/prometheus/client_golang/prometheus"
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    	log "github.com/sirupsen/logrus"
    
    	"google.golang.org/grpc"
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    	codes "google.golang.org/grpc/codes"
    	status "google.golang.org/grpc/status"
    )
    
    // byteSize expresses an amount of data as a number of bytes.
    type byteSize float64

    // Human friendly data sizes, each unit being 1024 times the previous one.
    // Pattern as per https://www.socketloop.com/tutorials/golang-how-to-declare-kilobyte-megabyte-gigabyte-terabyte-and-so-on
    const (
    	// iota starts at zero, so shift by one extra step to begin at 2^10.
    	KB byteSize = 1 << (10 * (iota + 1))
    	MB
    	GB
    	TB
    	PB
    	EB
    	ZB
    	YB
    )
    
    
    type pluginStream interface {
    	Send(*pb.Payload) error
    	grpc.ServerStream
    }
    
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    // server is the receiver of the Csbi RPC handlers defined in this file
    // (Get, Create, GetPlugin, CreatePlugin and Delete).
    type server struct {
    	// Embedded generated type from the csbi API package.
    	pb.UnimplementedCsbiServer
    	// orchestrator is what the handlers call to look up, build and destroy
    	// deployments and to obtain the repository handed to Generate.
    	orchestrator Orchestrator
    }
    
    func (s server) Get(ctx context.Context, req *pb.GetRequest) (*pb.GetResponse, error) {
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    	labels := prometheus.Labels{"rpc": "get"}
    	start := time.Now()
    	defer promEndHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)
    
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    	deployments := make([]*pb.Deployment, len(req.Did))
    	for i, id := range req.Did {
    		deploymentID, _ := uuid.Parse(id)
    		dep, err := s.orchestrator.Get(deploymentID)
    		if err != nil {
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    			return nil, handleRPCError(labels, err)
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    		}
    		deployments[i] = &pb.Deployment{
    			Id:    dep.ID.String(),
    			Name:  dep.Name,
    			State: dep.State,
    		}
    	}
    
    	return &pb.GetResponse{
    		Timestamp:   time.Now().UnixNano(),
    		Deployments: deployments,
    	}, nil
    }
    
    func (s server) Create(ctx context.Context, req *pb.CreateRequest) (*pb.CreateResponse, error) {
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    	labels := prometheus.Labels{"rpc": "create"}
    	start := promStartHook(labels, grpcRequestsTotal)
    	defer promEndHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)
    
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    	deployments := make([]*pb.Deployment, len(req.TransportOption))
    	for i, opt := range req.TransportOption {
    		model, err := Discover(ctx, opt)
    		if err != nil {
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    			return nil, handleRPCError(labels, err)
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    		}
    
    		ctx = context.WithValue(ctx, "target-address", opt.Address) //nolint
    		d, err := s.orchestrator.Build(ctx, model)
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    		if err != nil {
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    			return nil, handleRPCError(labels, err)
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    		}
    		deployments[i] = &pb.Deployment{
    			Id:    d.ID.String(),
    			Name:  d.Name,
    			State: d.State,
    		}
    	}
    	return &pb.CreateResponse{
    		Timestamp:   time.Now().UnixNano(),
    		Deployments: deployments,
    	}, nil
    }
    
    
    // TODO(maba): add description and consider to allow requesting
    func (s server) GetPlugin(req *pb.GetRequest, stream pb.Csbi_GetPluginServer) error {
    	log.Info("started GetPlugin")
    
    	labels := prometheus.Labels{"rpc": "get_go_struct"}
    	start := promStartHook(labels, grpcRequestsTotal)
    	defer promEndHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)
    
    	deploymentID, _ := uuid.Parse(req.Did[0])
    	dep, err := s.orchestrator.Get(deploymentID)
    	if err != nil {
    		return handleRPCError(labels, err)
    	}
    
    
    	return sendPlugin(dep.ID, labels, stream)
    
    
    func (s server) CreatePlugin(req *pb.CreateRequest, stream pb.Csbi_CreatePluginServer) error {
    	log.Info("started CreatePlugin")
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    	labels := prometheus.Labels{"rpc": "create_plugin"}
    	start := promStartHook(labels, grpcRequestsTotal)
    	defer promEndHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)
    
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    	ctx := context.Background()
    	for _, opt := range req.TransportOption {
    		model, err := Discover(ctx, opt)
    		if err != nil {
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    			return handleRPCError(labels, err)
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    		}
    
    		d, err := Generate(ctx, model, s.orchestrator.Repository(), opt.Type)
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    		if err != nil {
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    			return handleRPCError(labels, err)
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    		}
    
    		buildPlugin(d.ID)
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    		if err != nil {
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    			return handleRPCError(labels, err)
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    		}
    
    		err = sendPlugin(d.ID, labels, stream)
    		if err != nil {
    			return handleRPCError(labels, err)
    		}
    	}
    	return nil
    }
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    
    
    // sendPlugin takes a
    func sendPlugin(id uuid.UUID, labels prometheus.Labels, stream pluginStream) error {
    	file, err := os.Open(filepath.Join(id.String(), "plugin.so"))
    	if err != nil {
    		return handleRPCError(labels, err)
    	}
    	defer file.Close()
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    
    
    	buffer := make([]byte, int(MB))
    
    	for {
    		n, err := file.Read(buffer)
    		if err != nil {
    			if err != io.EOF {
    				fmt.Println(err)
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    			}
    
    			break
    		}
    		log.WithField("n", n).Trace("read bytes")
    		payload := &pb.Payload{Chunk: buffer[:n]}
    		err = stream.Send(payload)
    		if err != nil {
    			return handleRPCError(labels, err)
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    		}
    	}
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    	return nil
    }
    
    func (s server) Delete(ctx context.Context, req *pb.DeleteRequest) (*pb.DeleteResponse, error) {
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    	labels := prometheus.Labels{"rpc": "delete"}
    	start := promStartHook(labels, grpcRequestsTotal)
    	defer promEndHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)
    
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    	for _, id := range req.Did {
    		log.Infof("processing deletion request for %v", id)
    		deploymentID, _ := uuid.Parse(id)
    		if err := s.orchestrator.Destroy(ctx, deploymentID); err != nil {
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    			return nil, handleRPCError(labels, err)
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    		}
    	}
    	return &pb.DeleteResponse{
    		Timestamp: time.Now().UnixNano(),
    		Status:    pb.DeleteResponse_OK,
    	}, nil
    }
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    
    // handleRPCError runs err through promHandleError together with the
    // grpcAPIErrorsTotal metric and returns the outcome as a gRPC status error
    // carrying the code Aborted.
    func handleRPCError(labels prometheus.Labels, err error) error {
    	reported := promHandleError(labels, err, grpcAPIErrorsTotal)
    	return status.Errorf(codes.Aborted, "%v", reported)
    }