Commit a36753db authored by Tomasz Maczukin
Merge branch '3467-add-docker-support-for-interactive-web-terminal' into 'master'

Resolve "Add docker support for interactive web terminal"

Closes #3467

See merge request gitlab-org/gitlab-runner!1008
parents 31498a94 396287b6
Showing with 958 additions and 18 deletions
......@@ -624,11 +624,11 @@
revision = "a7cf72d604cdf0af6031dd5d54a4e513abeff0d4"
[[projects]]
digest = "1:a0d8ffbd3d159d9d2f43aaca906c5c16d65cec791c5ab3259194f0113d10ce5e"
digest = "1:3b5bf7524bfc4a050d46210fa51e7e92f2371dcf64d86f15a9d70e2ceea26db1"
name = "gitlab.com/gitlab-org/gitlab-terminal"
packages = ["."]
pruneopts = "N"
revision = "d523b4fd2bb3c8728724dce365809e09113430a9"
revision = "5af59b871b1bcc3f4b733f6db0ff3b6e8b247b92"
[[projects]]
digest = "1:90a0e11d13444dbf388ada729d5a9b8c57355de59924cb402c2a7711c3c306e6"
......
......@@ -148,7 +148,7 @@ ignored = ["test", "appengine"]
[[constraint]]
name = "gitlab.com/gitlab-org/gitlab-terminal"
revision = "d523b4fd2bb3c8728724dce365809e09113430a9"
revision = "5af59b871b1bcc3f4b733f6db0ff3b6e8b247b92"
##
## Refrain innovations ;)
......
......@@ -95,7 +95,7 @@ Supported features by different executors:
| Absolute paths: caching, artifacts | ✗ | ✗ | ✗ | ✗ | ✗ | ✓ |
| Passing artifacts between stages | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |
| Use GitLab Container Registry private images | n/a | n/a | n/a | n/a | ✓ | ✓ |
| Interactive Web terminal | ✗ | ✓ (bash)| ✗ | ✗ | | ✓ |
Supported systems by different shells:
......
......@@ -9,3 +9,5 @@ const prebuiltImageName = "gitlab/gitlab-runner-helper"
const prebuiltImageExtension = ".tar.xz"
const dockerCleanupTimeout = 5 * time.Minute
const waitForContainerTimeout = 15 * time.Second
......@@ -419,7 +419,7 @@ func (e *executor) createCacheVolume(containerName, containerPath string) (strin
}
e.Debugln("Waiting for cache container", resp.ID, "...")
err = e.waitForContainer(resp.ID)
err = e.waitForContainer(e.Context, resp.ID)
if err != nil {
e.temporary = append(e.temporary, resp.ID)
return "", err
......@@ -954,14 +954,14 @@ func (e *executor) killContainer(id string, waitCh chan error) (err error) {
}
}
func (e *executor) waitForContainer(id string) error {
func (e *executor) waitForContainer(ctx context.Context, id string) error {
e.Debugln("Waiting for container", id, "...")
retries := 0
// Use active wait
for {
container, err := e.client.ContainerInspect(e.Context, id)
for ctx.Err() == nil {
container, err := e.client.ContainerInspect(ctx, id)
if err != nil {
if docker_helpers.IsErrNotFound(err) {
return err
......@@ -992,6 +992,8 @@ func (e *executor) waitForContainer(id string) error {
return nil
}
return ctx.Err()
}
func (e *executor) watchContainer(ctx context.Context, id string, input io.Reader) (err error) {
......@@ -1037,7 +1039,7 @@ func (e *executor) watchContainer(ctx context.Context, id string, input io.Reade
waitCh := make(chan error, 1)
go func() {
waitCh <- e.waitForContainer(id)
waitCh <- e.waitForContainer(e.Context, id)
}()
select {
......@@ -1320,7 +1322,7 @@ func (e *executor) runServiceHealthCheckContainer(service *types.Container, time
waitResult := make(chan error, 1)
go func() {
waitResult <- e.waitForContainer(resp.ID)
waitResult <- e.waitForContainer(e.Context, resp.ID)
}()
// these are warnings and they don't make the build fail
......
......@@ -3,6 +3,7 @@ package docker
import (
"bytes"
"errors"
"sync"
"github.com/docker/docker/api/types"
"gitlab.com/gitlab-org/gitlab-runner/common"
......@@ -12,6 +13,14 @@ import (
type commandExecutor struct {
executor
buildContainer *types.ContainerJSON
sync.Mutex
}
func (s *commandExecutor) getBuildContainer() *types.ContainerJSON {
s.Lock()
defer s.Unlock()
return s.buildContainer
}
func (s *commandExecutor) Prepare(options common.ExecutorPrepareOptions) error {
......@@ -57,6 +66,9 @@ func (s *commandExecutor) requestNewPredefinedContainer() (*types.ContainerJSON,
}
func (s *commandExecutor) requestBuildContainer() (*types.ContainerJSON, error) {
s.Lock()
defer s.Unlock()
if s.buildContainer == nil {
var err error
......@@ -118,6 +130,8 @@ func init() {
features.Variables = true
features.Image = true
features.Services = true
features.Session = true
features.Terminal = true
}
common.RegisterExecutor("docker", executors.DefaultExecutorProvider{
......
package docker
import (
"context"
"errors"
"fmt"
"net/http"
"time"
"github.com/docker/docker/api/types"
"gitlab.com/gitlab-org/gitlab-runner/common"
"gitlab.com/gitlab-org/gitlab-runner/helpers/docker"
terminalsession "gitlab.com/gitlab-org/gitlab-runner/session/terminal"
"gitlab.com/gitlab-org/gitlab-terminal"
)
// buildContainerTerminalTimeout is the error used when the build container is
// not running yet, a terminal request is waiting for the container to start,
// and the allotted waiting time has been exceeded.
type buildContainerTerminalTimeout struct {
}
func (buildContainerTerminalTimeout) Error() string {
return "timeout for waiting for build container"
}
func (s *commandExecutor) watchForRunningBuildContainer(deadline time.Time) (string, error) {
for time.Since(deadline) < 0 {
buildContainer := s.getBuildContainer()
if buildContainer == nil {
time.Sleep(time.Second)
continue
}
containerID := buildContainer.ID
container, err := s.client.ContainerInspect(s.Context, containerID)
if err != nil {
return "", err
}
if container.State.Running {
return containerID, nil
}
}
return "", buildContainerTerminalTimeout{}
}
func (s *commandExecutor) Connect() (terminalsession.Conn, error) {
// Waiting for the container to start is not ideal, as it might be hiding a
// real issue the user is not aware of. Ideally, the runner should inform
// the user interactively that the container has not started yet and that it
// should wait/try again. This isn't easy to do here, since we can't access
// the WebSocket: that is the responsibility of the `gitlab-terminal`
// package. There are plans to improve this; please take a look at
// https://gitlab.com/gitlab-org/gitlab-ce/issues/50384#proposal and
// https://gitlab.com/gitlab-org/gitlab-terminal/issues/4
containerID, err := s.watchForRunningBuildContainer(time.Now().Add(waitForContainerTimeout))
if err != nil {
return nil, err
}
ctx, cancelFn := context.WithCancel(s.Context)
return terminalConn{
logger: &s.BuildLogger,
ctx: ctx,
cancelFn: cancelFn,
executor: s,
client: s.client,
containerID: containerID,
shell: s.BuildShell.DockerCommand,
}, nil
}
type terminalConn struct {
logger *common.BuildLogger
ctx context.Context
cancelFn func()
executor *commandExecutor
client docker_helpers.Client
containerID string
shell []string
}
func (t terminalConn) Start(w http.ResponseWriter, r *http.Request, timeoutCh, disconnectCh chan error) {
execConfig := types.ExecConfig{
Tty: true,
AttachStdin: true,
AttachStderr: true,
AttachStdout: true,
Cmd: t.shell,
}
exec, err := t.client.ContainerExecCreate(t.ctx, t.containerID, execConfig)
if err != nil {
t.logger.Errorln("Failed to create exec container for terminal:", err)
http.Error(w, "failed to create exec for build container", http.StatusInternalServerError)
return
}
resp, err := t.client.ContainerExecAttach(t.ctx, exec.ID, execConfig)
if err != nil {
t.logger.Errorln("Failed to exec attach to container for terminal:", err)
http.Error(w, "failed to attach tty to build container", http.StatusInternalServerError)
return
}
dockerTTY := newDockerTTY(&resp)
proxy := terminal.NewStreamProxy(1) // one stopper: terminal exit handler
// wait for container to exit
go func() {
t.logger.Debugln("Waiting for the terminal container:", t.containerID)
err := t.executor.waitForContainer(t.ctx, t.containerID)
t.logger.Debugln("The terminal container:", t.containerID, "finished with:", err)
stopCh := proxy.GetStopCh()
if err != nil {
stopCh <- fmt.Errorf("build container exited with %q", err)
} else {
stopCh <- errors.New("build container exited")
}
}()
terminalsession.ProxyTerminal(
timeoutCh,
disconnectCh,
proxy.StopCh,
func() {
terminal.ProxyStream(w, r, dockerTTY, proxy)
},
)
}
func (t terminalConn) Close() error {
if t.cancelFn != nil {
t.cancelFn()
}
return nil
}
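
For context, a minimal sketch (not part of this diff) of how a session handler might drive the Connect/Start pair above; the function name, channel sizes, and error responses are assumptions for illustration only:

func serveTerminal(s *commandExecutor, w http.ResponseWriter, r *http.Request) {
    // Connect waits up to waitForContainerTimeout for the build container to be running.
    conn, err := s.Connect()
    if err != nil {
        http.Error(w, "terminal unavailable", http.StatusServiceUnavailable)
        return
    }
    defer conn.Close()

    timeoutCh := make(chan error, 1)
    disconnectCh := make(chan error, 1)
    // Start upgrades the request to a WebSocket and proxies the docker exec TTY
    // until the session times out, disconnects, or the container exits.
    conn.Start(w, r, timeoutCh, disconnectCh)
}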
package docker
import (
"bufio"
"context"
"errors"
"net"
"net/http"
"net/http/httptest"
"net/url"
"os"
"strings"
"testing"
"time"
"github.com/docker/docker/api/types"
"github.com/gorilla/websocket"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitlab-runner/common"
"gitlab.com/gitlab-org/gitlab-runner/executors"
"gitlab.com/gitlab-org/gitlab-runner/helpers"
"gitlab.com/gitlab-org/gitlab-runner/helpers/docker"
"gitlab.com/gitlab-org/gitlab-runner/session"
)
func TestInteractiveTerminal(t *testing.T) {
if helpers.SkipIntegrationTests(t, "docker", "info") {
return
}
successfulBuild, err := common.GetRemoteLongRunningBuild()
assert.NoError(t, err)
sess, err := session.NewSession(nil)
require.NoError(t, err)
build := &common.Build{
JobResponse: successfulBuild,
Runner: &common.RunnerConfig{
RunnerSettings: common.RunnerSettings{
Executor: "docker",
Docker: &common.DockerConfig{
Image: common.TestAlpineImage,
PullPolicy: common.PullPolicyIfNotPresent,
},
},
},
Session: sess,
}
// Start build
go func() {
_ = build.Run(&common.Config{}, &common.Trace{Writer: os.Stdout})
}()
srv := httptest.NewServer(build.Session.Mux())
defer srv.Close()
u := url.URL{
Scheme: "ws",
Host: srv.Listener.Addr().String(),
Path: build.Session.Endpoint + "/exec",
}
headers := http.Header{
"Authorization": []string{build.Session.Token},
}
var webSocket *websocket.Conn
var resp *http.Response
started := time.Now()
for time.Since(started) < 25*time.Second {
webSocket, resp, err = websocket.DefaultDialer.Dial(u.String(), headers)
if err == nil {
break
}
time.Sleep(50 * time.Millisecond)
}
require.NotNil(t, webSocket)
require.Equal(t, http.StatusSwitchingProtocols, resp.StatusCode)
defer webSocket.Close()
err = webSocket.WriteMessage(websocket.BinaryMessage, []byte("uname\n"))
require.NoError(t, err)
readStarted := time.Now()
var tty []byte
for time.Since(readStarted) < 5*time.Second {
typ, b, err := webSocket.ReadMessage()
require.NoError(t, err)
require.Equal(t, websocket.BinaryMessage, typ)
tty = append(tty, b...)
if strings.Contains(string(b), "Linux") {
break
}
time.Sleep(50 * time.Microsecond)
}
t.Log(string(tty))
assert.Contains(t, string(tty), "Linux")
}
func TestCommandExecutor_Connect_Timeout(t *testing.T) {
c := &docker_helpers.MockClient{}
s := commandExecutor{
executor: executor{
AbstractExecutor: executors.AbstractExecutor{
Context: context.Background(),
BuildShell: &common.ShellConfiguration{
DockerCommand: []string{"/bin/sh"},
},
},
client: c,
},
buildContainer: &types.ContainerJSON{
ContainerJSONBase: &types.ContainerJSONBase{
ID: "1234",
},
},
}
c.On("ContainerInspect", s.Context, "1234").Return(types.ContainerJSON{
ContainerJSONBase: &types.ContainerJSONBase{
State: &types.ContainerState{
Running: false,
},
},
}, nil)
}
func TestCommandExecutor_Connect(t *testing.T) {
tests := []struct {
name string
buildContainerRunning bool
hasBuildContainer bool
containerInspectErr error
expectedErr error
}{
{
name: "Connect Timeout",
buildContainerRunning: false,
hasBuildContainer: true,
expectedErr: buildContainerTerminalTimeout{},
},
{
name: "Successful connect",
buildContainerRunning: true,
hasBuildContainer: true,
containerInspectErr: nil,
},
{
name: "Container inspect failed",
buildContainerRunning: false,
hasBuildContainer: true,
containerInspectErr: errors.New("container not found"),
expectedErr: errors.New("container not found"),
},
{
name: "Not build container",
buildContainerRunning: false,
hasBuildContainer: false,
expectedErr: buildContainerTerminalTimeout{},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
c := &docker_helpers.MockClient{}
s := commandExecutor{
executor: executor{
AbstractExecutor: executors.AbstractExecutor{
Context: context.Background(),
BuildShell: &common.ShellConfiguration{
DockerCommand: []string{"/bin/sh"},
},
},
client: c,
},
}
if test.hasBuildContainer {
s.buildContainer = &types.ContainerJSON{
ContainerJSONBase: &types.ContainerJSONBase{
ID: "1234",
},
}
}
c.On("ContainerInspect", s.Context, "1234").Return(types.ContainerJSON{
ContainerJSONBase: &types.ContainerJSONBase{
State: &types.ContainerState{
Running: test.buildContainerRunning,
},
},
}, test.containerInspectErr)
conn, err := s.Connect()
if test.buildContainerRunning {
assert.NoError(t, err)
assert.NotNil(t, conn)
assert.IsType(t, terminalConn{}, conn)
return
}
assert.EqualError(t, err, test.expectedErr.Error())
assert.Nil(t, conn)
})
}
}
func TestTerminalConn_FailToStart(t *testing.T) {
tests := []struct {
name string
containerExecCreateErr error
containerExecAttachErr error
}{
{
name: "Failed to create exec container",
containerExecCreateErr: errors.New("failed to create exec container"),
containerExecAttachErr: nil,
},
{
name: "Failed to attach exec container",
containerExecCreateErr: nil,
containerExecAttachErr: errors.New("failed to attach exec container"),
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
c := &docker_helpers.MockClient{}
s := commandExecutor{
executor: executor{
AbstractExecutor: executors.AbstractExecutor{
Context: context.Background(),
BuildShell: &common.ShellConfiguration{
DockerCommand: []string{"/bin/sh"},
},
},
client: c,
},
buildContainer: &types.ContainerJSON{
ContainerJSONBase: &types.ContainerJSONBase{
ID: "1234",
},
},
}
c.On("ContainerInspect", mock.Anything, mock.Anything).Return(types.ContainerJSON{
ContainerJSONBase: &types.ContainerJSONBase{
State: &types.ContainerState{
Running: true,
},
},
}, nil)
c.On("ContainerExecCreate", mock.Anything, mock.Anything, mock.Anything).Return(
types.IDResponse{},
test.containerExecCreateErr,
)
c.On("ContainerExecAttach", mock.Anything, mock.Anything, mock.Anything).Return(
types.HijackedResponse{},
test.containerExecAttachErr,
)
conn, err := s.Connect()
require.NoError(t, err)
timeoutCh := make(chan error)
disconnectCh := make(chan error)
w := httptest.NewRecorder()
req := httptest.NewRequest(http.MethodGet, "wss://example.com/foo", nil)
conn.Start(w, req, timeoutCh, disconnectCh)
resp := w.Result()
assert.Equal(t, http.StatusInternalServerError, resp.StatusCode)
})
}
}
type nopReader struct {
}
func (w *nopReader) Read(b []byte) (int, error) {
return len(b), nil
}
type nopConn struct {
}
func (nopConn) Read(b []byte) (n int, err error) {
return len(b), nil
}
func (nopConn) Write(b []byte) (n int, err error) {
return len(b), nil
}
func (nopConn) Close() error {
return nil
}
func (nopConn) LocalAddr() net.Addr {
return &net.TCPAddr{}
}
func (nopConn) RemoteAddr() net.Addr {
return &net.TCPAddr{}
}
func (nopConn) SetDeadline(t time.Time) error {
return nil
}
func (nopConn) SetReadDeadline(t time.Time) error {
return nil
}
func (nopConn) SetWriteDeadline(t time.Time) error {
return nil
}
func TestTerminalConn_Start(t *testing.T) {
c := &docker_helpers.MockClient{}
s := commandExecutor{
executor: executor{
AbstractExecutor: executors.AbstractExecutor{
Context: context.Background(),
BuildShell: &common.ShellConfiguration{
DockerCommand: []string{"/bin/sh"},
},
},
client: c,
},
buildContainer: &types.ContainerJSON{
ContainerJSONBase: &types.ContainerJSONBase{
ID: "1234",
},
},
}
c.On("ContainerInspect", mock.Anything, "1234").Return(types.ContainerJSON{
ContainerJSONBase: &types.ContainerJSONBase{
State: &types.ContainerState{
Running: true,
},
},
}, nil).Once()
c.On("ContainerExecCreate", mock.Anything, mock.Anything, mock.Anything).Return(types.IDResponse{
ID: "4321",
}, nil).Once()
c.On("ContainerExecAttach", mock.Anything, mock.Anything, mock.Anything).Return(types.HijackedResponse{
Conn: nopConn{},
Reader: bufio.NewReader(&nopReader{}),
}, nil).Once()
c.On("ContainerInspect", mock.Anything, "1234").Return(types.ContainerJSON{
ContainerJSONBase: &types.ContainerJSONBase{
State: &types.ContainerState{
Running: false,
},
},
}, nil)
session, err := session.NewSession(nil)
require.NoError(t, err)
session.Token = "validToken"
session.SetInteractiveTerminal(&s)
srv := httptest.NewServer(session.Mux())
u := url.URL{
Scheme: "ws",
Host: srv.Listener.Addr().String(),
Path: session.Endpoint + "/exec",
}
headers := http.Header{
"Authorization": []string{"validToken"},
}
conn, resp, err := websocket.DefaultDialer.Dial(u.String(), headers)
require.NoError(t, err)
require.NotNil(t, conn)
require.Equal(t, http.StatusSwitchingProtocols, resp.StatusCode)
defer conn.Close()
go func() {
for {
err := conn.WriteMessage(websocket.BinaryMessage, []byte("data"))
if err != nil {
return
}
time.Sleep(time.Second)
}
}()
started := time.Now()
for time.Since(started) < 5*time.Second {
if !session.Connected() {
break
}
time.Sleep(50 * time.Microsecond)
}
assert.False(t, session.Connected())
}
package docker
import "github.com/docker/docker/api/types"
func newDockerTTY(hijackedResp *types.HijackedResponse) *dockerTTY {
return &dockerTTY{
hijackedResp: hijackedResp,
}
}
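// dockerTTY adapts the hijacked docker exec response to an io.ReadWriteCloser:
// reads come from the response's buffered reader (the container's output) and
// writes go to the underlying connection (the container's stdin), so the exec
// stream can be handed directly to the terminal stream proxy.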
type dockerTTY struct {
hijackedResp *types.HijackedResponse
}
func (d *dockerTTY) Read(p []byte) (int, error) {
return d.hijackedResp.Reader.Read(p)
}
func (d *dockerTTY) Write(p []byte) (int, error) {
return d.hijackedResp.Conn.Write(p)
}
func (d *dockerTTY) Close() error {
d.hijackedResp.Close()
d.hijackedResp.CloseWrite()
return nil
}
......@@ -728,7 +728,7 @@ func TestInteractiveTerminal(t *testing.T) {
u := url.URL{Scheme: "ws", Host: srv.Listener.Addr().String(), Path: build.Session.Endpoint + "/exec"}
conn, resp, err := websocket.DefaultDialer.Dial(u.String(), http.Header{"Authorization": []string{build.Session.Token}})
assert.NoError(t, err)
assert.Equal(t, resp.StatusCode, c.expectedStatusCode)
assert.Equal(t, c.expectedStatusCode, resp.StatusCode)
defer func() {
if conn != nil {
......
......@@ -23,6 +23,8 @@ type Client interface {
ContainerAttach(ctx context.Context, container string, options types.ContainerAttachOptions) (types.HijackedResponse, error)
ContainerRemove(ctx context.Context, containerID string, options types.ContainerRemoveOptions) error
ContainerLogs(ctx context.Context, container string, options types.ContainerLogsOptions) (io.ReadCloser, error)
ContainerExecCreate(ctx context.Context, container string, config types.ExecConfig) (types.IDResponse, error)
ContainerExecAttach(ctx context.Context, execID string, config types.ExecConfig) (types.HijackedResponse, error)
NetworkDisconnect(ctx context.Context, networkID, containerID string, force bool) error
NetworkList(ctx context.Context, options types.NetworkListOptions) ([]types.NetworkResource, error)
......
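
For orientation, a minimal sketch of the create-then-attach flow these two new interface methods enable (the container ID, shell command, and surrounding error handling are illustrative only; the docker executor's terminal support added in this merge request uses the same flow):

execConfig := types.ExecConfig{
    Tty:          true,
    AttachStdin:  true,
    AttachStdout: true,
    AttachStderr: true,
    Cmd:          []string{"/bin/sh"}, // illustrative shell command
}
exec, err := client.ContainerExecCreate(ctx, containerID, execConfig)
if err != nil {
    return err
}
resp, err := client.ContainerExecAttach(ctx, exec.ID, execConfig)
if err != nil {
    return err
}
defer resp.Close()
// resp.Conn and resp.Reader now carry the interactive TTY stream.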
......@@ -72,6 +72,48 @@ func (_m *MockClient) ContainerCreate(ctx context.Context, config *container.Con
return r0, r1
}
// ContainerExecAttach provides a mock function with given fields: ctx, execID, config
func (_m *MockClient) ContainerExecAttach(ctx context.Context, execID string, config types.ExecConfig) (types.HijackedResponse, error) {
ret := _m.Called(ctx, execID, config)
var r0 types.HijackedResponse
if rf, ok := ret.Get(0).(func(context.Context, string, types.ExecConfig) types.HijackedResponse); ok {
r0 = rf(ctx, execID, config)
} else {
r0 = ret.Get(0).(types.HijackedResponse)
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, string, types.ExecConfig) error); ok {
r1 = rf(ctx, execID, config)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// ContainerExecCreate provides a mock function with given fields: ctx, _a1, config
func (_m *MockClient) ContainerExecCreate(ctx context.Context, _a1 string, config types.ExecConfig) (types.IDResponse, error) {
ret := _m.Called(ctx, _a1, config)
var r0 types.IDResponse
if rf, ok := ret.Get(0).(func(context.Context, string, types.ExecConfig) types.IDResponse); ok {
r0 = rf(ctx, _a1, config)
} else {
r0 = ret.Get(0).(types.IDResponse)
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, string, types.ExecConfig) error); ok {
r1 = rf(ctx, _a1, config)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// ContainerInspect provides a mock function with given fields: ctx, containerID
func (_m *MockClient) ContainerInspect(ctx context.Context, containerID string) (types.ContainerJSON, error) {
ret := _m.Called(ctx, containerID)
......
......@@ -122,6 +122,18 @@ func (c *officialDockerClient) ContainerLogs(ctx context.Context, container stri
return rc, wrapError("ContainerLogs", err, started)
}
func (c *officialDockerClient) ContainerExecCreate(ctx context.Context, container string, config types.ExecConfig) (types.IDResponse, error) {
started := time.Now()
resp, err := c.client.ContainerExecCreate(ctx, container, config)
return resp, wrapError("ContainerExecCreate", err, started)
}
func (c *officialDockerClient) ContainerExecAttach(ctx context.Context, execID string, config types.ExecConfig) (types.HijackedResponse, error) {
started := time.Now()
resp, err := c.client.ContainerExecAttach(ctx, execID, config)
return resp, wrapError("ContainerExecAttach", err, started)
}
func (c *officialDockerClient) NetworkDisconnect(ctx context.Context, networkID string, containerID string, force bool) error {
started := time.Now()
err := c.client.NetworkDisconnect(ctx, networkID, containerID, force)
......
......@@ -2,6 +2,7 @@ package session
import (
"net/http"
"reflect"
"sync"
"github.com/gorilla/websocket"
......@@ -166,7 +167,8 @@ func (s *Session) closeTerminalConn(conn terminal.Conn) {
s.log.WithError(err).Warn("Failed to close terminal connection")
}
if s.terminalConn == conn {
if reflect.ValueOf(s.terminalConn) == reflect.ValueOf(conn) {
s.log.Warningln("Closed active terminal connection")
s.terminalConn = nil
}
}
......
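
A short note on this comparison (the rationale is inferred from the code, not stated in the commit): the stored connection is held behind the terminal.Conn interface, and the docker executor's terminalConn contains a slice field (shell []string), so its dynamic type is not comparable; comparing two such interface values with == panics at runtime, while comparing the reflect.Value wrappers does not panic and still detects whether the same stored connection is being closed. A minimal, self-contained illustration with hypothetical names:

package main

import "reflect"

// conn mimics terminalConn: its dynamic type contains a slice and is not comparable.
type conn struct{ shell []string }

func main() {
    var a, b interface{} = conn{}, conn{}
    // _ = (a == b) // compiles, but panics at runtime: comparing uncomparable type
    _ = reflect.ValueOf(a) == reflect.ValueOf(b) // never panics; compares the reflect.Value wrappers
}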
......@@ -166,3 +166,33 @@ func TestKillFailedToClose(t *testing.T) {
// Even though an error occurred while closing, it is still removed.
assert.Nil(t, sess.terminalConn)
}
type fakeTerminalConn struct {
commands []string
}
func (fakeTerminalConn) Close() error {
return nil
}
func (fakeTerminalConn) Start(w http.ResponseWriter, r *http.Request, timeoutCh, disconnectCh chan error) {
}
func TestCloseTerminalConn(t *testing.T) {
conn := &fakeTerminalConn{
commands: []string{"command", "-c", "random"},
}
mockConn := new(terminal.MockConn)
mockConn.On("Close").Return(nil).Once()
sess, err := NewSession(nil)
sess.terminalConn = conn
require.NoError(t, err)
sess.closeTerminalConn(mockConn)
assert.NotNil(t, sess.terminalConn)
sess.closeTerminalConn(conn)
assert.Nil(t, sess.terminalConn)
}
The MIT License (MIT)
Copyright (c) 2018 GitLab B.V.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
package terminal
import (
"fmt"
"io"
)
type StreamProxy struct {
StopCh chan error
}
func NewStreamProxy(stoppers int) *StreamProxy {
return &StreamProxy{
StopCh: make(chan error, stoppers+2), // each proxy() call is a stopper
}
}
func (p *StreamProxy) GetStopCh() chan error {
return p.StopCh
}
func (p *StreamProxy) Serve(client io.ReadWriter, server io.ReadWriter) error {
go p.proxy(client, server)
go p.proxy(server, client)
err := <-p.StopCh
return err
}
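// proxy copies one direction of the stream; a copy failure is pushed onto
// StopCh (unblocking Serve), while a clean EOF leaves stopping to the
// remaining stoppers.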
func (p *StreamProxy) proxy(to, from io.ReadWriter) {
_, err := io.Copy(to, from)
if err != nil {
p.StopCh <- fmt.Errorf("failed to pipe stream: %v", err)
}
}
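
For reference, a minimal usage sketch of the proxy (clientStream and tty are assumed io.ReadWriter values provided by the caller; this mirrors how the docker executor's terminal connection drives the proxy in this merge request):

proxy := NewStreamProxy(1) // one external stopper besides the two internal proxy() goroutines

// An external watcher (for example "the build container exited") can stop the
// proxy at any time by pushing an error onto the stop channel:
//     proxy.GetStopCh() <- errors.New("build container exited")

// Serve pipes clientStream <-> tty in both directions and returns the first
// error pushed onto StopCh, whether from a watcher or from a failed io.Copy.
if err := proxy.Serve(clientStream, tty); err != nil {
    // handle or log the reason the terminal session stopped
}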
package terminal
import (
"bytes"
"errors"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
type chStringReadWriter struct {
writeDone chan struct{}
bytes.Buffer
}
func (c *chStringReadWriter) Write(p []byte) (int, error) {
defer func() {
c.writeDone <- struct{}{}
}()
return c.Buffer.Write(p)
}
func TestServe(t *testing.T) {
encProxy := NewStreamProxy(1)
downstream := bytes.Buffer{}
upstream := chStringReadWriter{
writeDone: make(chan struct{}),
}
writeString := []byte("data from downstream")
downstream.Write([]byte(writeString))
go func() {
err := encProxy.Serve(&upstream, &downstream)
if err != nil {
t.Fatalf("unexpected error from serve: %v", err)
}
}()
// Wait until the write is done
<-upstream.writeDone
b := make([]byte, 20)
_, err := upstream.Read(b)
require.NoError(t, err)
assert.Equal(t, writeString, b)
}
func TestServeError(t *testing.T) {
encProxy := NewStreamProxy(1)
downstream := errorReadWriter{}
upstream := bytes.Buffer{}
err := encProxy.Serve(&upstream, &downstream)
assert.Error(t, err)
}
type errorReadWriter struct {
}
func (rw *errorReadWriter) Read(p []byte) (int, error) {
return 0, errors.New("failed to read")
}
func (rw *errorReadWriter) Write(p []byte) (int, error) {
return 0, errors.New("failed to write")
}
package terminal
import (
"errors"
"fmt"
"io"
"net/http"
"os"
"time"
"github.com/gorilla/websocket"
log "github.com/sirupsen/logrus"
)
var (
......@@ -18,6 +19,52 @@ var (
BrowserPingInterval = 30 * time.Second
)
// ProxyStream takes the given request, upgrades the connection to a WebSocket
// connection, and sets up a bi-directional stream with the given stream
// ReadWriteCloser: the STDIN received over the WebSocket is sent to the
// stream, and the STDOUT/STDERR of the stream is written back to the WebSocket
// connection. Messages sent over the WebSocket are encoded as binary messages.
func ProxyStream(w http.ResponseWriter, r *http.Request, stream io.ReadWriteCloser, proxy *StreamProxy) {
clientAddr := getClientAddr(r) // We can't know the port with confidence
logger := log.WithFields(log.Fields{
"clientAddr": clientAddr,
"pkg": "terminal",
})
clientConn, err := upgradeClient(w, r)
if err != nil {
logger.WithError(err).Error("failed to upgrade client connection to websocket")
return
}
defer func() {
err := clientConn.UnderlyingConn().Close()
if err != nil {
logger.WithError(err).Error("failed to close client connection")
}
err = stream.Close()
if err != nil {
logger.WithError(err).Error("failed to close stream")
}
}()
client := NewIOWrapper(clientConn)
// Regularly send ping messages to the browser to keep the websocket from
// being timed out by intervening proxies.
go pingLoop(client)
if err := proxy.Serve(client, stream); err != nil {
logger.WithError(err).Error("failed to proxy stream")
}
}
// ProxyWebSocket takes the given request and upgrades the connection to a
// WebSocket connection. The terminal settings are used to connect to the
// destination WebSocket connection, and a bi-directional stream is
// established between both WebSockets.
func ProxyWebSocket(w http.ResponseWriter, r *http.Request, terminal *TerminalSettings, proxy *WebSocketProxy) {
server, err := connectToServer(terminal, r)
if err != nil {
......@@ -55,6 +102,10 @@ func ProxyWebSocket(w http.ResponseWriter, r *http.Request, terminal *TerminalSe
}
}
// ProxyFileDescriptor takes the given request and upgrades the connection to a
// WebSocket connection. A bi-directional stream is opened between the WebSocket
// and the FileDescriptor that pipes the STDIN from the WebSocket to the
// FileDescriptor, and STDERR/STDOUT back to the WebSocket.
func ProxyFileDescriptor(w http.ResponseWriter, r *http.Request, fd *os.File, proxy *FileDescriptorProxy) {
clientConn, err := upgradeClient(w, r)
if err != nil {
......
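
For reference, a minimal HTTP handler sketch around ProxyStream (the handler name and the source of the tty stream are illustrative; the docker executor's terminalConn.Start in this merge request follows the same pattern):

func handleTerminal(tty io.ReadWriteCloser) http.Handler {
    return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        proxy := NewStreamProxy(1) // one external stopper, e.g. a "process exited" watcher
        ProxyStream(w, r, tty, proxy)
    })
}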
package terminal
import (
"bytes"
"net/http"
"net/http/httptest"
"testing"
"github.com/gorilla/websocket"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
const StreamMessage = "this is a test"
func TestProxyStream(t *testing.T) {
downstream := bufferCloser{}
downstream.Write([]byte(StreamMessage))
srv := streamServer{
downstream: downstream,
}
s := httptest.NewServer(&srv)
defer s.Close()
c, _, err := websocket.DefaultDialer.Dial("ws://"+s.Listener.Addr().String()+"/ws", nil)
require.NoError(t, err)
// Check if writing to websocket works
c.WriteMessage(websocket.BinaryMessage, []byte(StreamMessage))
b := make([]byte, len(StreamMessage))
_, err = downstream.Read(b)
require.NoError(t, err)
assert.Equal(t, []byte(StreamMessage), b)
// Check if reading from websocket works
typ, b, err := c.ReadMessage()
require.NoError(t, err)
assert.Equal(t, websocket.BinaryMessage, typ)
assert.Equal(t, []byte(StreamMessage), b)
}
type streamServer struct {
downstream bufferCloser
}
func (d *streamServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
proxy := NewStreamProxy(1)
ProxyStream(w, r, &d.downstream, proxy)
}
type bufferCloser struct {
bytes.Buffer
}
func (b *bufferCloser) Close() error {
return nil
}