Skip to content
Snippets Groups Projects
cli-handling.go 4.02 KiB
Newer Older
  • Learn to ignore specific revisions
  • /*
    	This file contains the grpc cli server-side calls.
        Functions here should call other functions in charge of the
    	particular task.
    
    package nucleus
    
    import (
    
    Malte Bauch's avatar
    Malte Bauch committed
    	"io"
    
    Malte Bauch's avatar
    Malte Bauch committed
    	"os"
    
    Malte Bauch's avatar
    Malte Bauch committed
    	pb "code.fbi.h-da.de/cocsn/gosdn/api/proto"
    
    	"code.fbi.h-da.de/cocsn/gosdn/log"
    	"code.fbi.h-da.de/cocsn/gosdn/sbi/restconf/client/ciena"
    
    	"google.golang.org/grpc"
    
    Malte Bauch's avatar
    Malte Bauch committed
    	"google.golang.org/grpc/health"
    	healthpb "google.golang.org/grpc/health/grpc_health_v1"
    
    	"google.golang.org/protobuf/types/known/emptypb"
    
    Malte Bauch's avatar
    Malte Bauch committed
    type logConnection struct {
    
    	stream pb.GrpcCli_CreateLogStreamServer
    	id     string
    	active bool
    	error  chan error
    }
    
    
    // server is used to implement the grcp cli server
    
    type server struct {
    
    	pb.UnimplementedGrpcCliServer
    
    Malte Bauch's avatar
    Malte Bauch committed
    	logConnections []*logConnection
    
    Malte Bauch's avatar
    Malte Bauch committed
    var srv *server
    
    type buf []byte
    
    func (b *buf) Write(p []byte) (n int, err error) {
    
    	reply := pb.LogReply{Log: string(p)}
    	srv.BroadcastLog(&reply)
    	return len(p), nil
    }
    
    
    func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
    
    	log.Info("Received: ", in.GetName())
    
    	return &pb.HelloReply{Message: "Hello " + in.GetName(), GoSDNInfo: "goSDN in version: DEVELOP"}, nil
    
    //GetLog creates a continuous stream between client and server to send goSDN logs
    func (s *server) CreateLogStream(req *emptypb.Empty, stream pb.GrpcCli_CreateLogStreamServer) error {
    
    Malte Bauch's avatar
    Malte Bauch committed
    	conn := &logConnection{
    
    		stream: stream,
    		active: true,
    		error:  make(chan error),
    	}
    	s.logConnections = append(s.logConnections, conn)
    
    	return <-conn.error
    }
    
    func (s *server) BroadcastLog(log *pb.LogReply) {
    	wait := sync.WaitGroup{}
    	done := make(chan int)
    
    	for _, conn := range s.logConnections {
    		wait.Add(1)
    
    
    Malte Bauch's avatar
    Malte Bauch committed
    		go func(conn *logConnection) {
    
    			defer wait.Done()
    			if conn.active {
    				err := conn.stream.Send(log)
    
    				if err != nil {
    					conn.active = false
    					conn.error <- err
    				}
    			}
    		}(conn)
    	}
    
    	go func() {
    		//blocks until all send routines are finished
    		wait.Wait()
    		close(done)
    	}()
    
    	<-done
    }
    
    
    func (s *server) Shutdown(ctx context.Context, in *pb.ShutdownRequest) (*pb.ShutdownReply, error) {
    
    	log.Info("Shutdown Received: ", in.GetName())
    
    	return &pb.ShutdownReply{Message: "Shutdown " + in.GetName()}, nil
    }
    
    func getCLIGoing(core *Core) {
    
    Malte Bauch's avatar
    Malte Bauch committed
    	var (
    		logConnections []*logConnection
    		logBuffer      buf
    		system         = ""
    	)
    
    	log.Info("Starting: GetCLIGoing")
    
    	// Boot-up the control interface for the cli
    	cliControlListener, err := net.Listen("tcp", core.config.CliSocket)
    	if err != nil {
    
    		log.Fatal(err)
    
    	}
    
    	cliControlServer := grpc.NewServer()
    
    Malte Bauch's avatar
    Malte Bauch committed
    	healthCheck := health.NewServer()
    
    Malte Bauch's avatar
    Malte Bauch committed
    	srv = &server{core: core, logConnections: logConnections}
    
    Malte Bauch's avatar
    Malte Bauch committed
    	//TODO: move?
    	wrt := io.MultiWriter(os.Stdout, &logBuffer)
    	log.Output(wrt)
    
    Malte Bauch's avatar
    Malte Bauch committed
    	healthpb.RegisterHealthServer(cliControlServer, healthCheck)
    
    Malte Bauch's avatar
    Malte Bauch committed
    	pb.RegisterGrpcCliServer(cliControlServer, srv)
    
    Malte Bauch's avatar
    Malte Bauch committed
    	healthCheck.SetServingStatus(system, healthpb.HealthCheckResponse_SERVING)
    
    
    	if err := cliControlServer.Serve(cliControlListener); err != nil {
    
    		log.Fatal(err)
    
    }
    
    // SBI specific calls, by now TAPI only
    func (s *server) TAPIGetEdge(ctx context.Context, in *pb.TAPIRequest) (*pb.TAPIReply, error) {
    
    	log.Info("Received: ", in.GetName())
    
    	if err := s.core.clients["ciena-mcp"].(*ciena.MCPClient).GetNodes(); err != nil {
    		log.Error(err)
    
    		return &pb.TAPIReply{Message: "TAPI error"}, nil
    
    	return &pb.TAPIReply{Message: "Done"}, nil
    }
    
    func (s *server) TAPIGetEdgeNode(ctx context.Context, in *pb.TAPIRequest) (*pb.TAPIReply, error) {
    
    	log.Info("Received: ", in.GetName())
    
    	if err := s.core.clients["ciena-mcp"].(*ciena.MCPClient).GetNodeEdgePoints(); err != nil {
    		log.Error(err)
    
    		return &pb.TAPIReply{Message: "TAPI error"}, nil
    
    	return &pb.TAPIReply{Message: "Done"}, nil
    }
    
    func (s *server) TAPIGetLink(ctx context.Context, in *pb.TAPIRequest) (*pb.TAPIReply, error) {
    
    	log.Info("Received: ", in.GetName())
    
    	if err := s.core.clients["ciena-mcp"].(*ciena.MCPClient).GetLinks(); err != nil {
    		log.Error(err)
    
    		return &pb.TAPIReply{Message: "TAPI error"}, nil
    
    	return &pb.TAPIReply{Message: "Done"}, nil
    }