Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package main
import (
"context"
"errors"
"fmt"
"io"
"os"
"time"
pb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/plugin-registry"
"github.com/google/uuid"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
)
// Server exposes a PluginRegistry through the generated plugin-registry gRPC
// service interface.
type Server struct {
// Generated base type embedded by value. NOTE(review): by gRPC-Go convention
// this supplies default stubs for RPCs the Server does not implement itself —
// confirm against the generated code.
pb.UnimplementedPluginRegistryServiceServer
// registry is the plugin store that the RPC handlers query.
registry *PluginRegistry
}
// NewServer returns a Server whose RPC handlers are backed by the given
// registry. The registry pointer is stored as-is and is not copied.
func NewServer(registry *PluginRegistry) *Server {
	srv := new(Server)
	srv.registry = registry
	return srv
}
// Get resolves every query in req to a plugin and returns the converted
// plugins in query order, together with the current time in Unix nanoseconds.
//
// A query identifies a plugin either by ID (which must parse as a UUID) or by
// name. The first query that is unset, of an unsupported kind, malformed, or
// that the registry cannot resolve ends the whole request with a gRPC status
// of code Aborted; no partial result is returned.
func (s *Server) Get(ctx context.Context, req *pb.GetRequest) (*pb.GetResponse, error) {
	found := make([]*pb.Plugin, 0)

	for _, q := range req.Query {
		var (
			match     *Plugin
			lookupErr error
		)

		switch ident := q.Identifier.(type) {
		case nil:
			return nil, status.Errorf(codes.Aborted, "Query.Identifier is not set")
		case *pb.Query_Name:
			match, lookupErr = s.registry.GetPluginByName(ident.Name)
		case *pb.Query_Id:
			parsed, parseErr := uuid.Parse(ident.Id)
			if parseErr != nil {
				return nil, status.Errorf(codes.Aborted, "%v", parseErr)
			}
			match, lookupErr = s.registry.GetPluginByID(parsed)
		default:
			return nil, status.Errorf(codes.Aborted, "Unsupported type for identifier: %T", ident)
		}

		// Both lookup paths report failure the same way, so check once here.
		if lookupErr != nil {
			return nil, status.Errorf(codes.Aborted, "%v", lookupErr)
		}

		found = append(found, pluginConverter(match))
	}

	resp := &pb.GetResponse{Plugins: found}
	resp.Timestamp = time.Now().UnixNano()
	return resp, nil
}
// GetAll returns every plugin currently held by the registry, converted to its
// protobuf form, together with the current time in Unix nanoseconds. The
// request carries no filter and is not inspected. The result slice is always
// non-nil, even when the registry holds no plugins.
func (s *Server) GetAll(ctx context.Context, req *pb.GetAllRequest) (*pb.GetResponse, error) {
	converted := make([]*pb.Plugin, 0)
	for _, entry := range s.registry.Plugins {
		converted = append(converted, pluginConverter(entry))
	}

	resp := &pb.GetResponse{Plugins: converted}
	resp.Timestamp = time.Now().UnixNano()
	return resp, nil
}
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
// Download streams the file of the plugin identified by req's ID to the client
// in chunks of at most 1 KiB.
//
// The ID must parse as a UUID and must resolve to a plugin in the registry
// whose Path can be opened. Any failure (invalid ID, unknown plugin, open,
// read, or send error) is returned as a gRPC status with code Aborted. A
// download that ends on a read error is therefore reported as failed rather
// than silently truncated.
//
// err is a named result so that the deferred Close can surface a close failure
// to the caller.
func (s *Server) Download(req *pb.GetDownloadRequest, stream pb.PluginRegistryService_DownloadServer) (err error) {
	// chunkSize is the maximum number of payload bytes per streamed message.
	const chunkSize = 1 << 10

	// Validate that the requested ID is a well-formed UUID.
	idAsUUID, err := uuid.Parse(req.GetId())
	if err != nil {
		return status.Errorf(codes.Aborted, "%v", err)
	}

	plugin, err := s.registry.GetPluginByID(idAsUUID)
	if err != nil {
		return status.Errorf(codes.Aborted, "%v", err)
	}

	file, err := os.Open(plugin.Path)
	if err != nil {
		return status.Errorf(codes.Aborted, "%v", err)
	}
	// Register the Close only once the file is actually open. Closing a nil
	// *os.File returns an error, which would otherwise replace the real cause
	// of every early return above.
	defer func() {
		closeErr := file.Close()
		if closeErr == nil {
			return
		}
		if err == nil {
			// The transfer itself succeeded; report the close failure alone.
			err = status.Errorf(codes.Aborted, "%v", closeErr)
			return
		}
		// Keep the primary error in the chain and append the close failure.
		err = fmt.Errorf("InternalError=%w DeferError=%v", err, closeErr)
	}()

	buffer := make([]byte, chunkSize)
	for {
		n, readErr := file.Read(buffer)

		// A Read may return data together with an error, so forward the bytes
		// before looking at readErr.
		if n > 0 {
			log.WithField("n", n).Trace("read bytes")

			// Send a private copy: buffer is overwritten by the next Read, and
			// a message should not be modified after it has been handed to the
			// stream.
			chunk := make([]byte, n)
			copy(chunk, buffer[:n])

			if sendErr := stream.Send(&pb.GetDownloadPayload{Chunk: chunk}); sendErr != nil {
				return status.Errorf(codes.Aborted, "%v", sendErr)
			}
		}

		if readErr != nil {
			if errors.Is(readErr, io.EOF) {
				// End of file: the whole plugin has been streamed.
				return nil
			}
			return status.Errorf(codes.Aborted, "%v", readErr)
		}
	}
}
func (s *Server) Delete(ctx context.Context, req *pb.DeleteRequest) (*pb.DeleteResponse, error) {
return nil, status.Error(codes.Unimplemented, "The delete method is currently not implemented.")