/* This file contains the grpc cli server-side calls. Functions here should call other functions in charge of the particular task. */ package nucleus import ( "context" "io" "net" "os" "sync" pb "code.fbi.h-da.de/cocsn/gosdn/api/proto" "code.fbi.h-da.de/cocsn/gosdn/log" "code.fbi.h-da.de/cocsn/gosdn/sbi/restconf/client/ciena" "google.golang.org/grpc" "google.golang.org/grpc/health" healthpb "google.golang.org/grpc/health/grpc_health_v1" "google.golang.org/protobuf/types/known/emptypb" ) type logConnection struct { stream pb.GrpcCli_CreateLogStreamServer id string active bool error chan error } // server is used to implement the grcp cli server type server struct { pb.UnimplementedGrpcCliServer core *Core logConnections []*logConnection } var srv *server type buf []byte func (b *buf) Write(p []byte) (n int, err error) { reply := pb.LogReply{Log: string(p)} srv.BroadcastLog(&reply) return len(p), nil } func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) { log.Info("Received: ", in.GetName()) return &pb.HelloReply{Message: "Hello " + in.GetName(), GoSDNInfo: "goSDN in version: DEVELOP"}, nil } //GetLog creates a continuous stream between client and server to send goSDN logs func (s *server) CreateLogStream(req *emptypb.Empty, stream pb.GrpcCli_CreateLogStreamServer) error { conn := &logConnection{ stream: stream, active: true, error: make(chan error), } s.logConnections = append(s.logConnections, conn) return <-conn.error } func (s *server) BroadcastLog(log *pb.LogReply) { wait := sync.WaitGroup{} done := make(chan int) for _, conn := range s.logConnections { wait.Add(1) go func(conn *logConnection) { defer wait.Done() if conn.active { err := conn.stream.Send(log) if err != nil { conn.active = false conn.error <- err } } }(conn) } go func() { //blocks until all send routines are finished wait.Wait() close(done) }() <-done } func (s *server) Shutdown(ctx context.Context, in *pb.ShutdownRequest) (*pb.ShutdownReply, error) { log.Info("Shutdown Received: ", in.GetName()) s.core.IsRunning <- false return &pb.ShutdownReply{Message: "Shutdown " + in.GetName()}, nil } func getCLIGoing(core *Core) { var ( logConnections []*logConnection logBuffer buf system = "" ) log.Info("Starting: GetCLIGoing") // Boot-up the control interface for the cli cliControlListener, err := net.Listen("tcp", core.config.CliSocket) if err != nil { log.Fatal(err) } cliControlServer := grpc.NewServer() healthCheck := health.NewServer() srv = &server{core: core, logConnections: logConnections} //TODO: move? wrt := io.MultiWriter(os.Stdout, &logBuffer) log.Output(wrt) healthpb.RegisterHealthServer(cliControlServer, healthCheck) pb.RegisterGrpcCliServer(cliControlServer, srv) healthCheck.SetServingStatus(system, healthpb.HealthCheckResponse_SERVING) if err := cliControlServer.Serve(cliControlListener); err != nil { log.Fatal(err) } } // SBI specific calls, by now TAPI only func (s *server) TAPIGetEdge(ctx context.Context, in *pb.TAPIRequest) (*pb.TAPIReply, error) { log.Info("Received: ", in.GetName()) if err := s.core.clients["ciena-mcp"].(*ciena.MCPClient).GetNodes(); err != nil { log.Error(err) return &pb.TAPIReply{Message: "TAPI error"}, nil } return &pb.TAPIReply{Message: "Done"}, nil } func (s *server) TAPIGetEdgeNode(ctx context.Context, in *pb.TAPIRequest) (*pb.TAPIReply, error) { log.Info("Received: ", in.GetName()) if err := s.core.clients["ciena-mcp"].(*ciena.MCPClient).GetNodeEdgePoints(); err != nil { log.Error(err) return &pb.TAPIReply{Message: "TAPI error"}, nil } return &pb.TAPIReply{Message: "Done"}, nil } func (s *server) TAPIGetLink(ctx context.Context, in *pb.TAPIRequest) (*pb.TAPIReply, error) { log.Info("Received: ", in.GetName()) if err := s.core.clients["ciena-mcp"].(*ciena.MCPClient).GetLinks(); err != nil { log.Error(err) return &pb.TAPIReply{Message: "TAPI error"}, nil } return &pb.TAPIReply{Message: "Done"}, nil }