-
Malte Bauch authoredMalte Bauch authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
plugin.go 2.26 KiB
package server
import (
"bytes"
"context"
"errors"
"io"
"time"
pipb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/plugin-internal"
rpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/plugin-registry"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/plugin"
"code.fbi.h-da.de/danet/gosdn/controller/metrics"
"code.fbi.h-da.de/danet/gosdn/controller/store"
"github.com/google/uuid"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
)
type PluginInternalServer struct {
pipb.UnimplementedPluginInternalServiceServer
pluginRegistryClient rpb.PluginRegistryServiceClient
pluginService plugin.Service
}
func NewPluginInternalServer(pluginRegistryClient rpb.PluginRegistryServiceClient, pluginService plugin.Service) *PluginInternalServer {
return &PluginInternalServer{
pluginRegistryClient: pluginRegistryClient,
pluginService: pluginService,
}
}
func (pis *PluginInternalServer) AvailablePlugins(ctx context.Context, request *pipb.GetAvailablePluginsRequest) (*rpb.GetResponse, error) {
ctx, cancel := context.WithTimeout(ctx, time.Minute*1)
defer cancel()
getAllRequest := &rpb.GetAllRequest{
Timestamp: time.Now().UnixNano(),
}
return pis.pluginRegistryClient.GetAll(ctx, getAllRequest)
}
func (pis *PluginInternalServer) GetPluginSchema(request *pipb.GetPluginSchemaRequest, stream pipb.PluginInternalService_GetPluginSchemaServer) error {
labels := prometheus.Labels{"service": "plugin", "rpc": "get plugin schema"}
start := metrics.StartHook(labels, grpcRequestsTotal)
defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)
buffer := make([]byte, int(MB))
plugin, err := pis.pluginService.Get(store.Query{ID: uuid.MustParse(request.GetPid())})
if err != nil {
return handleRPCError(labels, err)
}
schema, err := plugin.SchemaTreeGzip()
if err != nil {
return handleRPCError(labels, err)
}
schemaReader := bytes.NewReader(schema)
for {
n, err := schemaReader.Read(buffer)
if err != nil {
if errors.Is(err, io.EOF) {
log.Println(err)
}
break
}
log.WithField("n", n).Trace("read bytes")
payload := &pipb.PluginSchemaPayload{Chunk: buffer[:n]}
err = stream.Send(payload)
if err != nil {
return handleRPCError(labels, err)
}
}
return nil
}