Skip to content
Snippets Groups Projects
orchestrator.go 6.96 KiB
Newer Older
  • Learn to ignore specific revisions
  • package csbi
    
    import (
    	"context"
    	"errors"
    	"fmt"
    	"path/filepath"
    	"time"
    
    	pb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/csbi"
    	spb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/southbound"
    	"code.fbi.h-da.de/danet/gosdn/csbi/config"
    	"github.com/docker/docker/api/types"
    	"github.com/docker/docker/api/types/container"
    	"github.com/docker/docker/api/types/network"
    	"github.com/docker/docker/client"
    	"github.com/docker/docker/pkg/stdcopy"
    	"github.com/google/uuid"
    	gpb "github.com/openconfig/gnmi/proto/gnmi"
    	"github.com/prometheus/client_golang/prometheus"
    	log "github.com/sirupsen/logrus"
    )
    
    // OrchestratorType is used as a selector for the NewOrchestrator function
    // and determines which implementation of the Orchestrator interface will
    // be returned.
    type OrchestratorType int
    
    // nolint
    const (
    	// Docker selects the Docker-backed Orchestrator implementation.
    	Docker OrchestratorType = iota
    )
    
    
    // Orchestrator manages the lifecycle of cSBI deployments.
    type Orchestrator interface {
    	// Build creates a Deployment from the given gNMI model data. The
    	// Docker implementation in this file also builds the image and
    	// deploys it before returning.
    	Build(ctx context.Context, model []*gpb.ModelData) (Deployment, error)
    	// Deploy starts the given Deployment.
    	Deploy(deployment Deployment) error
    	// Destroy tears down the Deployment identified by id.
    	Destroy(ctx context.Context, id uuid.UUID) error
    	// Get returns the Deployment identified by id.
    	Get(id uuid.UUID) (Deployment, error)
    	// Shutdown tears down everything managed by the orchestrator.
    	Shutdown(ctx context.Context)
    	// Repository returns the Repository used by the orchestrator.
    	Repository() Repository
    }
    
    // NewOrchestrator returns an implementation of the Orchestrator interface
    // depending on the passed OrchestratorType. It returns an error if an
    // invalid type is passed or if the Docker client cannot be created.
    func NewOrchestrator(flavour OrchestratorType) (Orchestrator, error) {
    	// Docker is the only supported flavour; reject everything else up front.
    	if flavour != Docker {
    		return nil, errors.New("invalid orchestrator type")
    	}

    	dockerClient, clientErr := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
    	if clientErr != nil {
    		return nil, clientErr
    	}

    	repository := NewRepository(config.RepositoryBasePath())
    	orchestrator := &dockerOrchestrator{
    		client:           dockerClient,
    		store:            NewDeploymentStore(),
    		repo:             repository,
    		activeContainers: make(map[uuid.UUID]string),
    		stopTimeout:      config.OrchestratorShutdownTimeout(),
    	}
    	return orchestrator, nil
    }
    
    // dockerOrchestrator is the Orchestrator implementation returned by
    // NewOrchestrator for the Docker type. It runs each deployment as a Docker
    // container.
    type dockerOrchestrator struct {
    	// repo is passed to Generate in Build and exposed via Repository.
    	repo             Repository
    	// client is the Docker API client used for all container and image calls.
    	client           *client.Client
    	// store holds the Deployments known to this orchestrator.
    	store            DeploymentStore
    	// activeContainers maps a deployment ID to the ID of its Docker container.
    	activeContainers map[uuid.UUID]string
    	// registry is joined with the deployment ID to form the image name in
    	// Build. NOTE(review): NewOrchestrator does not set it, so it appears to
    	// be empty unless assigned elsewhere; confirm.
    	registry         string
    	// stopTimeout is taken from config.OrchestratorShutdownTimeout at
    	// construction.
    	stopTimeout      time.Duration
    }
    
    // Build generates a containerised deployment for the given model data,
    // records it in the store, builds its image and deploys it. If generation
    // fails, the value returned by Generate is passed through with the error.
    // If the image build fails, the deployment is stored and returned in the
    // decommissioned state together with the build error.
    func (o *dockerOrchestrator) Build(ctx context.Context, model []*gpb.ModelData) (Deployment, error) {
    	deployment, genErr := Generate(ctx, model, o.repo, spb.Type_TYPE_CONTAINERISED)
    	if genErr != nil {
    		return deployment, genErr
    	}

    	// The image name is the registry prefix joined with the deployment ID.
    	imageName := filepath.Join(o.registry, deployment.ID.String())
    	deployment.Name = imageName
    	o.store.Set(deployment)

    	buildErr := buildImage(deployment, o.client)
    	if buildErr == nil {
    		return o.deploy(deployment)
    	}

    	// Keep a record of the failed build before reporting it.
    	deployment.State = pb.State_STATE_DECOMMISSIONED
    	o.store.Set(deployment)
    	return deployment, buildErr
    }
    
    // Deploy starts a container for the given deployment. The updated
    // Deployment value produced internally is discarded; only the error is
    // reported to the caller.
    func (o *dockerOrchestrator) Deploy(deployment Deployment) error {
    	_, deployErr := o.deploy(deployment)
    	return deployErr
    }
    
    // Destroy prunes the container and image of the deployment with the given
    // ID and deletes the deployment from the store. A pruning failure is only
    // logged so that the store entry is removed regardless; the returned error
    // is the one reported by the store's Delete.
    func (o *dockerOrchestrator) Destroy(ctx context.Context, id uuid.UUID) error {
    	if err := o.prune(ctx, id); err != nil {
    		log.Error(err)
    	} else {
    		// Forget the container once it has been pruned so that Shutdown
    		// does not try to prune a deployment that no longer exists. This is
    		// done only on success: prune then has finished all of its work,
    		// whereas after a timeout its worker may still be reading the map.
    		delete(o.activeContainers, id)
    	}
    	return o.store.Delete(id)
    }
    
    // Get returns the Deployment stored under the given ID, or the error
    // reported by the underlying DeploymentStore.
    func (o *dockerOrchestrator) Get(id uuid.UUID) (Deployment, error) {
    	return o.store.Get(id)
    }
    
    // Shutdown prunes every deployment that has an active container and then
    // shuts down the deployment store. Pruning errors are logged and do not
    // stop the remaining deployments from being pruned.
    func (o *dockerOrchestrator) Shutdown(ctx context.Context) {
    	for deploymentID := range o.activeContainers {
    		pruneErr := o.prune(ctx, deploymentID)
    		if pruneErr == nil {
    			continue
    		}
    		log.Error(pruneErr)
    	}
    	o.store.Shutdown(ctx)
    }
    
    // Repository returns the Repository held by this orchestrator.
    func (o *dockerOrchestrator) Repository() Repository {
    	return o.repo
    }
    
    // deploy creates a container from the deployment's image, attaches it to the
    // configured Docker network and starts it. On success the deployment is
    // marked as deployed, its container ID is recorded in activeContainers and
    // the updated deployment is stored and returned.
    //
    // On any failure an empty Deployment is returned together with the error,
    // which is also counted in the orchestrator error metric. If the failure
    // happens after the container was created, the container is removed again
    // so that it is not left behind untracked.
    func (o *dockerOrchestrator) deploy(d Deployment) (Deployment, error) {
    	labels := prometheus.Labels{"type": "docker"}
    	start := promStartHook(labels, orchestratorCreationsTotal)
    	defer promEndHook(labels, start, orchestratorCreateDurationSecondsTotal, orchestratorCreateDurationSeconds)
    	// deploy takes no context from its callers, so Docker calls run on a
    	// background context.
    	ctx := context.Background()

    	containerConfig := &container.Config{
    		Hostname: d.ID.String(),
    		Image:    d.Name,
    	}
    	resp, err := o.client.ContainerCreate(ctx, containerConfig, nil, nil, nil, "")
    	if err != nil {
    		return Deployment{}, promHandleError(labels, err, orchestratorErrorsTotal)
    	}
    	log.Infof("container %v created", resp.ID)

    	// discard removes the freshly created container when a later step
    	// fails. It is best effort: a removal failure is logged and the
    	// original error is still the one reported to the caller.
    	discard := func() {
    		rmOpts := types.ContainerRemoveOptions{RemoveVolumes: true, Force: true}
    		if rmErr := o.client.ContainerRemove(ctx, resp.ID, rmOpts); rmErr != nil {
    			log.Errorf("removing container %v after failed deployment: %v", resp.ID, rmErr)
    		}
    	}

    	if err := o.client.NetworkConnect(ctx, config.DockerOrchestratorNetwork(), resp.ID, &network.EndpointSettings{}); err != nil {
    		discard()
    		return Deployment{}, promHandleError(labels, err, orchestratorErrorsTotal)
    	}
    	log.Infof("container %v attached to network", resp.ID)

    	if err := o.client.ContainerStart(ctx, resp.ID, types.ContainerStartOptions{}); err != nil {
    		discard()
    		return Deployment{}, promHandleError(labels, err, orchestratorErrorsTotal)
    	}

    	log.Infof("container %v started", resp.ID)
    	o.attachLogger(ctx, resp.ID)
    	d.State = pb.State_STATE_DEPLOYED
    	o.activeContainers[d.ID] = resp.ID
    	o.store.Set(d)
    	return d, nil
    }
    
    // attachLogger forwards the stdout and stderr of the given container to the
    // standard logrus logger. It does nothing unless the log level is debug or
    // more verbose. The forwarding runs in its own goroutine, which ends when
    // the container's log stream ends, when copying fails, or when ctx is
    // cancelled.
    func (o *dockerOrchestrator) attachLogger(ctx context.Context, containerID string) {
    	if log.GetLevel() < log.DebugLevel {
    		return
    	}
    	log.Info("attaching container logger")
    	go func() {
    		opts := types.ContainerLogsOptions{
    			ShowStdout: true,
    			ShowStderr: true,
    			Follow:     true,
    			Details:    true,
    		}
    		logStream, err := o.client.ContainerLogs(ctx, containerID, opts)
    		if err != nil {
    			// Without a stream there is nothing to copy or close.
    			log.Error(err)
    			return
    		}
    		defer func() {
    			if err := logStream.Close(); err != nil {
    				log.Error(err)
    			}
    		}()

    		stdlogger := log.StandardLogger()
    		stdlogger.Trace("detached logger")
    		stdout := stdlogger.Writer()
    		stderr := stdlogger.WriterLevel(log.ErrorLevel)
    		// The logrus writers are pipes backed by a goroutine each; closing
    		// them releases those goroutines once forwarding is over.
    		defer func() {
    			if err := stdout.Close(); err != nil {
    				log.Error(err)
    			}
    			if err := stderr.Close(); err != nil {
    				log.Error(err)
    			}
    		}()

    		// StdCopy blocks until the stream hits EOF (returning a nil error)
    		// or fails, so a single call is enough. Calling it again on a
    		// finished stream would return immediately and spin.
    		if _, err := stdcopy.StdCopy(stdout, stderr, logStream); err != nil {
    			log.Error(err)
    		}
    	}()
    }
    
    // prune stops and removes the container belonging to the deployment with
    // the given ID and removes the deployment's image. The Docker calls run in
    // a worker goroutine; each failing step is logged and counted in the
    // orchestrator error metric, and the remaining steps are still attempted.
    //
    // prune returns nil once the worker has finished, regardless of whether
    // individual steps failed. It returns an error only if ctx is done before
    // the worker finishes.
    func (o *dockerOrchestrator) prune(ctx context.Context, id uuid.UUID) error {
    	labels := prometheus.Labels{"type": "docker"}
    	start := promStartHook(labels, orchestratorDestructionsTotal)
    	defer promEndHook(labels, start, orchestratorDestroyDurationSecondsTotal, orchestratorDestroyDurationSeconds)

    	// Resolve the container ID before starting the worker so the goroutine,
    	// which may outlive this call, never reads the activeContainers map.
    	containerID := o.activeContainers[id]

    	// Apply the configured stop timeout. Docker expects whole seconds, so
    	// round up; a non-positive timeout leaves Docker's default in place.
    	var stopOpts container.StopOptions
    	if o.stopTimeout > 0 {
    		seconds := int((o.stopTimeout + time.Second - 1) / time.Second)
    		stopOpts.Timeout = &seconds
    	}

    	// Buffered so the worker can always deliver its result and exit, even
    	// when this function has already returned because ctx was done.
    	done := make(chan time.Duration, 1)
    	go func() {
    		begin := time.Now()
    		d, getErr := o.store.Get(id)
    		if getErr != nil {
    			log.Error(promHandleError(labels, getErr, orchestratorErrorsTotal))
    		}

    		if err := o.client.ContainerStop(ctx, containerID, stopOpts); err != nil {
    			log.Error(promHandleError(labels, err, orchestratorErrorsTotal))
    		}
    		log.Debugf("stopped container for deployment %v", id)
    		if err := o.client.ContainerRemove(ctx, containerID, types.ContainerRemoveOptions{RemoveVolumes: true}); err != nil {
    			log.Error(promHandleError(labels, err, orchestratorErrorsTotal))
    		}
    		log.Debugf("removed container for deployment %v", id)

    		// The image is addressed through the stored deployment. If the
    		// lookup failed, d is a zero value that cannot name a real image,
    		// so the removal is skipped.
    		if getErr == nil {
    			resp, err := o.client.ImageRemove(ctx, d.ID.String(), types.ImageRemoveOptions{PruneChildren: true})
    			if err != nil {
    				log.Error(promHandleError(labels, err, orchestratorErrorsTotal))
    			}
    			for _, r := range resp {
    				log.WithFields(log.Fields{
    					"ID": r.Deleted,
    				}).Debugf("removed image for deployment %v", id)
    			}
    		}
    		done <- time.Since(begin)
    	}()

    	select {
    	case duration := <-done:
    		log.WithFields(log.Fields{
    			"id":            id,
    			"duration (ms)": duration.Milliseconds(),
    		}).Infof("deployment pruned")
    		return nil
    	case <-ctx.Done():
    		// Wrap the context error so callers can tell a deadline from a
    		// cancellation.
    		err := fmt.Errorf("pruning timed out for deployment %v: %w", id, ctx.Err())
    		return promHandleError(labels, err, orchestratorErrorsTotal)
    	}
    }