Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
plugin.go 2.26 KiB
package server

import (
	"bytes"
	"context"
	"errors"
	"io"
	"time"

	pipb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/plugin-internal"
	rpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/plugin-registry"
	"code.fbi.h-da.de/danet/gosdn/controller/interfaces/plugin"
	"code.fbi.h-da.de/danet/gosdn/controller/metrics"
	"code.fbi.h-da.de/danet/gosdn/controller/store"
	"github.com/google/uuid"
	"github.com/prometheus/client_golang/prometheus"
	log "github.com/sirupsen/logrus"
)

type PluginInternalServer struct {
	pipb.UnimplementedPluginInternalServiceServer
	pluginRegistryClient rpb.PluginRegistryServiceClient
	pluginService        plugin.Service
}

func NewPluginInternalServer(pluginRegistryClient rpb.PluginRegistryServiceClient, pluginService plugin.Service) *PluginInternalServer {
	return &PluginInternalServer{
		pluginRegistryClient: pluginRegistryClient,
		pluginService:        pluginService,
	}
}

func (pis *PluginInternalServer) AvailablePlugins(ctx context.Context, request *pipb.GetAvailablePluginsRequest) (*rpb.GetResponse, error) {
	ctx, cancel := context.WithTimeout(ctx, time.Minute*1)
	defer cancel()

	getAllRequest := &rpb.GetAllRequest{
		Timestamp: time.Now().UnixNano(),
	}

	return pis.pluginRegistryClient.GetAll(ctx, getAllRequest)
}

func (pis *PluginInternalServer) GetPluginSchema(request *pipb.GetPluginSchemaRequest, stream pipb.PluginInternalService_GetPluginSchemaServer) error {
	labels := prometheus.Labels{"service": "plugin", "rpc": "get plugin schema"}
	start := metrics.StartHook(labels, grpcRequestsTotal)
	defer metrics.FinishHook(labels, start, grpcRequestDurationSecondsTotal, grpcRequestDurationSeconds)

	buffer := make([]byte, int(MB))
	plugin, err := pis.pluginService.Get(store.Query{ID: uuid.MustParse(request.GetPid())})
	if err != nil {
		return handleRPCError(labels, err)
	}
	schema, err := plugin.SchemaTreeGzip()
	if err != nil {
		return handleRPCError(labels, err)
	}
	schemaReader := bytes.NewReader(schema)

	for {
		n, err := schemaReader.Read(buffer)
		if err != nil {
			if errors.Is(err, io.EOF) {
				log.Println(err)
			}
			break
		}
		log.WithField("n", n).Trace("read bytes")
		payload := &pipb.PluginSchemaPayload{Chunk: buffer[:n]}
		err = stream.Send(payload)
		if err != nil {
			return handleRPCError(labels, err)
		}
	}

	return nil
}