Skip to content
Snippets Groups Projects
Commit 60890166 authored by Kamil Trzciński's avatar Kamil Trzciński
Browse files

Merge branch 'acquire-before-asking-for-build' into 'master'

Acquire before asking for build

This introduces three fixes:
- Acquire runner context before build, instead of locking main queue,
- Give back the runner to main queue, making it faster for the worker threads to pick new builds,
- Execute `docker-machine ls` only once at a time if more than one goroutine asks for it.


See merge request !200
parents a9361c3e 5a1ea39a
Branches
Tags
No related merge requests found
...@@ -11,7 +11,7 @@ type buildsHelper struct { ...@@ -11,7 +11,7 @@ type buildsHelper struct {
lock sync.Mutex lock sync.Mutex
} }
func (b *buildsHelper) acquire(runner *runnerAcquire) bool { func (b *buildsHelper) acquire(runner *common.RunnerConfig) bool {
b.lock.Lock() b.lock.Lock()
defer b.lock.Unlock() defer b.lock.Unlock()
...@@ -30,7 +30,7 @@ func (b *buildsHelper) acquire(runner *runnerAcquire) bool { ...@@ -30,7 +30,7 @@ func (b *buildsHelper) acquire(runner *runnerAcquire) bool {
return true return true
} }
func (b *buildsHelper) release(runner *runnerAcquire) bool { func (b *buildsHelper) release(runner *common.RunnerConfig) bool {
b.lock.Lock() b.lock.Lock()
defer b.lock.Unlock() defer b.lock.Unlock()
......
...@@ -20,16 +20,6 @@ import ( ...@@ -20,16 +20,6 @@ import (
"gitlab.com/gitlab-org/gitlab-ci-multi-runner/network" "gitlab.com/gitlab-org/gitlab-ci-multi-runner/network"
) )
// runnerAcquire bundles a runner configuration with the executor
// provider it was acquired from and the provider-specific executor
// data, so the whole acquisition can be released as one unit.
type runnerAcquire struct {
	common.RunnerConfig
	provider common.ExecutorProvider
	data common.ExecutorData
}

// Release returns the acquired executor data back to its provider.
func (r *runnerAcquire) Release() {
	r.provider.Release(&r.RunnerConfig, r.data)
}
type RunCommand struct { type RunCommand struct {
configOptions configOptions
network common.Network network common.Network
...@@ -66,26 +56,15 @@ func (mr *RunCommand) log() *log.Entry { ...@@ -66,26 +56,15 @@ func (mr *RunCommand) log() *log.Entry {
return log.WithField("builds", len(mr.buildsHelper.builds)) return log.WithField("builds", len(mr.buildsHelper.builds))
} }
func (mr *RunCommand) feedRunner(runner *common.RunnerConfig, runners chan *runnerAcquire) { func (mr *RunCommand) feedRunner(runner *common.RunnerConfig, runners chan *common.RunnerConfig) {
if !mr.isHealthy(runner.UniqueID()) { if !mr.isHealthy(runner.UniqueID()) {
return return
} }
provider := common.GetExecutor(runner.Executor) runners <- runner
if provider == nil {
return
}
data, err := provider.Acquire(runner)
if err != nil {
log.Warningln("Failed to update executor", runner.Executor, "for", runner.ShortDescription(), err)
return
}
runners <- &runnerAcquire{*runner, provider, data}
} }
func (mr *RunCommand) feedRunners(runners chan *runnerAcquire) { func (mr *RunCommand) feedRunners(runners chan *common.RunnerConfig) {
for mr.stopSignal == nil { for mr.stopSignal == nil {
mr.log().Debugln("Feeding runners to channel") mr.log().Debugln("Feeding runners to channel")
config := mr.config config := mr.config
...@@ -106,8 +85,18 @@ func (mr *RunCommand) feedRunners(runners chan *runnerAcquire) { ...@@ -106,8 +85,18 @@ func (mr *RunCommand) feedRunners(runners chan *runnerAcquire) {
} }
} }
func (mr *RunCommand) processRunner(id int, runner *runnerAcquire) (err error) { func (mr *RunCommand) processRunner(id int, runner *common.RunnerConfig, runners chan *common.RunnerConfig) (err error) {
defer runner.Release() provider := common.GetExecutor(runner.Executor)
if provider == nil {
return
}
context, err := provider.Acquire(runner)
if err != nil {
log.Warningln("Failed to update executor", runner.Executor, "for", runner.ShortDescription(), err)
return
}
defer provider.Release(runner, context)
// Acquire build slot // Acquire build slot
if !mr.buildsHelper.acquire(runner) { if !mr.buildsHelper.acquire(runner) {
...@@ -116,21 +105,21 @@ func (mr *RunCommand) processRunner(id int, runner *runnerAcquire) (err error) { ...@@ -116,21 +105,21 @@ func (mr *RunCommand) processRunner(id int, runner *runnerAcquire) (err error) {
defer mr.buildsHelper.release(runner) defer mr.buildsHelper.release(runner)
// Receive a new build // Receive a new build
buildData, healthy := mr.network.GetBuild(runner.RunnerConfig) buildData, healthy := mr.network.GetBuild(*runner)
mr.makeHealthy(runner.UniqueID(), healthy) mr.makeHealthy(runner.UniqueID(), healthy)
if buildData == nil { if buildData == nil {
return return
} }
// Make sure to always close output // Make sure to always close output
trace := mr.network.ProcessBuild(runner.RunnerConfig, buildData.ID) trace := mr.network.ProcessBuild(*runner, buildData.ID)
defer trace.Fail(err) defer trace.Fail(err)
// Create a new build // Create a new build
build := &common.Build{ build := &common.Build{
GetBuildResponse: *buildData, GetBuildResponse: *buildData,
Runner: &runner.RunnerConfig, Runner: runner,
ExecutorData: runner.data, ExecutorData: context,
BuildAbort: mr.abortBuilds, BuildAbort: mr.abortBuilds,
} }
...@@ -138,16 +127,26 @@ func (mr *RunCommand) processRunner(id int, runner *runnerAcquire) (err error) { ...@@ -138,16 +127,26 @@ func (mr *RunCommand) processRunner(id int, runner *runnerAcquire) (err error) {
mr.buildsHelper.addBuild(build) mr.buildsHelper.addBuild(build)
defer mr.buildsHelper.removeBuild(build) defer mr.buildsHelper.removeBuild(build)
// Process the same runner by different worker again
// to speed up taking the builds
select {
case runners <- runner:
mr.log().Debugln("Requeued the runner: ", runner.ShortDescription())
default:
mr.log().Debugln("Failed to requeue the runner: ", runner.ShortDescription())
}
// Process a build // Process a build
return build.Run(mr.config, trace) return build.Run(mr.config, trace)
} }
func (mr *RunCommand) processRunners(id int, stopWorker chan bool, runners chan *runnerAcquire) { func (mr *RunCommand) processRunners(id int, stopWorker chan bool, runners chan *common.RunnerConfig) {
mr.log().Debugln("Starting worker", id) mr.log().Debugln("Starting worker", id)
for mr.stopSignal == nil { for mr.stopSignal == nil {
select { select {
case runner := <-runners: case runner := <-runners:
mr.processRunner(id, runner) mr.processRunner(id, runner, runners)
// force GC cycle after processing build // force GC cycle after processing build
runtime.GC() runtime.GC()
...@@ -160,7 +159,7 @@ func (mr *RunCommand) processRunners(id int, stopWorker chan bool, runners chan ...@@ -160,7 +159,7 @@ func (mr *RunCommand) processRunners(id int, stopWorker chan bool, runners chan
<-stopWorker <-stopWorker
} }
func (mr *RunCommand) startWorkers(startWorker chan int, stopWorker chan bool, runners chan *runnerAcquire) { func (mr *RunCommand) startWorkers(startWorker chan int, stopWorker chan bool, runners chan *common.RunnerConfig) {
for mr.stopSignal == nil { for mr.stopSignal == nil {
id := <-startWorker id := <-startWorker
go mr.processRunners(id, stopWorker, runners) go mr.processRunners(id, stopWorker, runners)
...@@ -284,7 +283,7 @@ func (mr *RunCommand) runWait() { ...@@ -284,7 +283,7 @@ func (mr *RunCommand) runWait() {
} }
func (mr *RunCommand) Run() { func (mr *RunCommand) Run() {
runners := make(chan *runnerAcquire) runners := make(chan *common.RunnerConfig)
go mr.feedRunners(runners) go mr.feedRunners(runners)
signal.Notify(mr.stopSignals, syscall.SIGQUIT, syscall.SIGTERM, os.Interrupt, os.Kill) signal.Notify(mr.stopSignals, syscall.SIGQUIT, syscall.SIGTERM, os.Interrupt, os.Kill)
......
...@@ -118,7 +118,8 @@ func (b *Build) executeShellScript(scriptType ShellScriptType, executor Executor ...@@ -118,7 +118,8 @@ func (b *Build) executeShellScript(scriptType ShellScriptType, executor Executor
} }
cmd := ExecutorCommand{ cmd := ExecutorCommand{
Abort: abort, Script: script,
Abort: abort,
} }
switch scriptType { switch scriptType {
......
...@@ -13,9 +13,10 @@ import ( ...@@ -13,9 +13,10 @@ import (
) )
type machineProvider struct { type machineProvider struct {
machine docker_helpers.Machine machine docker_helpers.Machine
details machinesDetails details machinesDetails
lock sync.RWMutex lock sync.RWMutex
acquireLock sync.Mutex
// provider stores a real executor that is used to start run the builds // provider stores a real executor that is used to start run the builds
provider common.ExecutorProvider provider common.ExecutorProvider
} }
...@@ -243,12 +244,17 @@ func (m *machineProvider) Acquire(config *common.RunnerConfig) (data common.Exec ...@@ -243,12 +244,17 @@ func (m *machineProvider) Acquire(config *common.RunnerConfig) (data common.Exec
return return
} }
// Lock updating machines, because two Acquires can be run at the same time
m.acquireLock.Lock()
// Update a list of currently configured machines // Update a list of currently configured machines
machinesData := m.updateMachines(machines, config) machinesData := m.updateMachines(machines, config)
// Pre-create machines // Pre-create machines
m.createMachines(config, &machinesData) m.createMachines(config, &machinesData)
m.acquireLock.Unlock()
logrus.WithFields(machinesData.Fields()). logrus.WithFields(machinesData.Fields()).
WithField("runner", config.ShortDescription()). WithField("runner", config.ShortDescription()).
WithField("minIdleCount", config.Machine.IdleCount). WithField("minIdleCount", config.Machine.IdleCount).
......
...@@ -9,11 +9,40 @@ import ( ...@@ -9,11 +9,40 @@ import (
"os" "os"
"os/exec" "os/exec"
"strings" "strings"
"sync"
"github.com/Sirupsen/logrus" "github.com/Sirupsen/logrus"
) )
// machineCommand shells out to the docker-machine binary. The ls*
// fields coalesce concurrent List calls into a single
// `docker-machine ls` execution: the first caller spawns the command,
// every other concurrent caller waits on lsCond and shares its output.
type machineCommand struct {
	lsCmd   *exec.Cmd  // in-flight `docker-machine ls`; nil when idle
	lsLock  sync.Mutex // guards all ls* fields
	lsCond  *sync.Cond // signaled when the in-flight command finishes
	lsData  []byte     // output of the last completed command
	lsError error      // error of the last completed command
}

// ls runs `docker-machine ls -q`, making sure that at most one copy of
// the command is executing at any time: concurrent callers block until
// the in-flight invocation completes and then share its result.
func (m *machineCommand) ls() (data []byte, err error) {
	m.lsLock.Lock()
	defer m.lsLock.Unlock()

	if m.lsCond == nil {
		m.lsCond = sync.NewCond(&m.lsLock)
	}

	if m.lsCmd == nil {
		// Capture the command in a local so the goroutine never touches
		// m.lsCmd without holding lsLock.
		cmd := exec.Command("docker-machine", "ls", "-q")
		cmd.Env = os.Environ()
		m.lsCmd = cmd

		go func() {
			// Run the command outside the lock, then publish the result
			// under it. Broadcasting while holding lsLock also prevents
			// a lost wakeup: the broadcast cannot happen until every
			// caller has reached Wait and released lsLock, whereas an
			// unlocked Broadcast could fire before the spawning caller
			// starts waiting, deadlocking it forever.
			out, outErr := cmd.Output()

			m.lsLock.Lock()
			m.lsData, m.lsError = out, outErr
			m.lsCmd = nil
			m.lsCond.Broadcast()
			m.lsLock.Unlock()
		}()
	}

	m.lsCond.Wait()
	return m.lsData, m.lsError
}
func (m *machineCommand) Create(driver, name string, opts ...string) error { func (m *machineCommand) Create(driver, name string, opts ...string) error {
...@@ -51,9 +80,7 @@ func (m *machineCommand) Remove(name string) error { ...@@ -51,9 +80,7 @@ func (m *machineCommand) Remove(name string) error {
} }
func (m *machineCommand) List(nodeFilter string) (machines []string, err error) { func (m *machineCommand) List(nodeFilter string) (machines []string, err error) {
cmd := exec.Command("docker-machine", "ls", "-q") data, err := m.ls()
cmd.Env = os.Environ()
data, err := cmd.Output()
if err != nil { if err != nil {
return return
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment