Skip to content
Snippets Groups Projects
Commit d8cb3ccc authored by Konrad Zemek's avatar Konrad Zemek
Browse files

Add UDP proxying.

parent cdac58b2
No related branches found
No related tags found
No related merge requests found
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
This is a Go reimplementation of [mmproxy](https://github.com/cloudflare/mmproxy), created to improve on mmproxy's runtime stability while providing potentially greater performance in terms of connection and packet throughput. This is a Go reimplementation of [mmproxy](https://github.com/cloudflare/mmproxy), created to improve on mmproxy's runtime stability while providing potentially greater performance in terms of connection and packet throughput.
`go-mmproxy` is a standalone application that unwraps HAProxy's [PROXY protocol](http://www.haproxy.org/download/1.8/doc/proxy-protocol.txt) (also adopted by other projects such as NGINX) so that the TCP connection to the end server comes from client's - instead of proxy server's - IP address and port number. `go-mmproxy` is a standalone application that unwraps HAProxy's [PROXY protocol](http://www.haproxy.org/download/1.8/doc/proxy-protocol.txt) (also adopted by other projects such as NGINX) so that the network connection to the end server comes from client's - instead of proxy server's - IP address and port number.
Because they share basic mechanisms, [Cloudflare's blogpost on mmproxy](https://blog.cloudflare.com/mmproxy-creative-way-of-preserving-client-ips-in-spectrum/) serves as a great write-up on how `go-mmproxy` works under the hood. Because they share basic mechanisms, [Cloudflare's blogpost on mmproxy](https://blog.cloudflare.com/mmproxy-creative-way-of-preserving-client-ips-in-spectrum/) serves as a great write-up on how `go-mmproxy` works under the hood.
## Building ## Building
...@@ -38,22 +38,33 @@ ip -6 route add local ::/0 dev lo table 123 ...@@ -38,22 +38,33 @@ ip -6 route add local ::/0 dev lo table 123
If `--mark` option is given to `go-mmproxy`, all packets routed to the loopback interface will have the mark set. If `--mark` option is given to `go-mmproxy`, all packets routed to the loopback interface will have the mark set.
This can be used for setting up more advanced routing rules with iptables, for example when you need traffic from loopback to be routed outside of the machine. This can be used for setting up more advanced routing rules with iptables, for example when you need traffic from loopback to be routed outside of the machine.
#### Routing UDP packets
Because UDP is connectionless, if a socket is bound to `0.0.0.0` the kernel stack will search for an interface in order to send a reply to the spoofed source address - instead of just using the interface it received the original packet from.
The found interface will most likely _not_ be the loopback interface, which will avoid the rules specified above.
The simplest way to fix this is to bind the end server's listeners to `127.0.0.1` (or `::1`).
This is also generally recommended in order to avoid receiving non-proxied connections.
### Starting go-mmproxy ### Starting go-mmproxy
``` ```
Usage of ./go-mmproxy: Usage of ./go-mmproxy:
-4 string -4 string
Address to which IPv4 TCP traffic will be forwarded to (default "127.0.0.1:443") Address to which IPv4 traffic will be forwarded to (default "127.0.0.1:443")
-6 string -6 string
Address to which IPv6 TCP traffic will be forwarded to (default "[::1]:443") Address to which IPv6 traffic will be forwarded to (default "[::1]:443")
-allowed-subnets string -allowed-subnets string
Path to a file that contains allowed subnets of the proxy servers Path to a file that contains allowed subnets of the proxy servers
-close-after int
Number of seconds after which UDP socket will be cleaned up (default 60)
-l string -l string
Adress the proxy listens on (default "0.0.0.0:8443") Adress the proxy listens on (default "0.0.0.0:8443")
-listeners int -listeners int
Number of listener sockets that will be opened for the listen address (Linux 3.9+) (default 1) Number of listener sockets that will be opened for the listen address (Linux 3.9+) (default 1)
-mark int -mark int
The mark that will be set on outbound packets The mark that will be set on outbound packets
-p string
Protocol that will be proxied: tcp, udp (default "tcp")
-v int -v int
0 - no logging of individual connections 0 - no logging of individual connections
1 - log errors occuring in individual connections 1 - log errors occuring in individual connections
......
// Copyright 2019 Path Network, Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package main
import (
"math"
"sync"
)
var buffers sync.Pool
func init() {
buffers.New = func() interface{} { return make([]byte, math.MaxUint16) }
}
func GetBuffer() []byte {
return buffers.Get().([]byte)
}
func PutBuffer(buf []byte) {
buffers.Put(buf)
}
...@@ -6,356 +6,74 @@ package main ...@@ -6,356 +6,74 @@ package main
import ( import (
"bufio" "bufio"
"bytes"
"context"
"encoding/binary"
"flag" "flag"
"fmt"
"io"
"log" "log"
"net" "net"
"os" "os"
"strings"
"syscall" "syscall"
"time"
"go.uber.org/zap" "go.uber.org/zap"
) )
var listenAddr string type options struct {
var targetAddr4 string Protocol string
var targetAddr6 string ListenAddr string
var allowedSubnetsPath string TargetAddr4 string
var mark int TargetAddr6 string
var verbose int Mark int
var listeners int Verbose int
allowedSubnetsPath string
AllowedSubnets []*net.IPNet
Listeners int
Logger *zap.Logger
udpCloseAfter int
UDPCloseAfter time.Duration
}
var allowedSubnets []*net.IPNet var Opts options
var logger *zap.Logger
func init() { func init() {
flag.StringVar(&listenAddr, "l", "0.0.0.0:8443", "Adress the proxy listens on") flag.StringVar(&Opts.Protocol, "p", "tcp", "Protocol that will be proxied: tcp, udp")
flag.StringVar(&targetAddr4, "4", "127.0.0.1:443", "Address to which IPv4 TCP traffic will be forwarded to") flag.StringVar(&Opts.ListenAddr, "l", "0.0.0.0:8443", "Adress the proxy listens on")
flag.StringVar(&targetAddr6, "6", "[::1]:443", "Address to which IPv6 TCP traffic will be forwarded to") flag.StringVar(&Opts.TargetAddr4, "4", "127.0.0.1:443", "Address to which IPv4 traffic will be forwarded to")
flag.IntVar(&mark, "mark", 0, "The mark that will be set on outbound packets") flag.StringVar(&Opts.TargetAddr6, "6", "[::1]:443", "Address to which IPv6 traffic will be forwarded to")
flag.StringVar(&allowedSubnetsPath, "allowed-subnets", "", flag.IntVar(&Opts.Mark, "mark", 0, "The mark that will be set on outbound packets")
"Path to a file that contains allowed subnets of the proxy servers") flag.IntVar(&Opts.Verbose, "v", 0, `0 - no logging of individual connections
flag.IntVar(&verbose, "v", 0, `0 - no logging of individual connections
1 - log errors occuring in individual connections 1 - log errors occuring in individual connections
2 - log all state changes of individual connections`) 2 - log all state changes of individual connections`)
flag.IntVar(&listeners, "listeners", 1, flag.StringVar(&Opts.allowedSubnetsPath, "allowed-subnets", "",
"Path to a file that contains allowed subnets of the proxy servers")
flag.IntVar(&Opts.Listeners, "listeners", 1,
"Number of listener sockets that will be opened for the listen address (Linux 3.9+)") "Number of listener sockets that will be opened for the listen address (Linux 3.9+)")
} flag.IntVar(&Opts.udpCloseAfter, "close-after", 60, "Number of seconds after which UDP socket will be cleaned up")
func readRemoteAddrPROXYv2(conn net.Conn, ctrlBuf []byte) (net.Addr, net.Addr, []byte, error) {
if (ctrlBuf[12] >> 4) != 2 {
return nil, nil, nil, fmt.Errorf("unknown protocol version %d", ctrlBuf[12]>>4)
}
if ctrlBuf[12]&0xF > 1 {
return nil, nil, nil, fmt.Errorf("unknown command %d", ctrlBuf[12]&0xF)
}
if ctrlBuf[12]&0xF == 1 && ctrlBuf[13] != 0x11 && ctrlBuf[13] != 0x21 {
return nil, nil, nil, fmt.Errorf("invalid family/protocol %d/%d", ctrlBuf[13]>>4, ctrlBuf[13]&0xF)
}
var dataLen uint16
reader := bytes.NewReader(ctrlBuf[14:16])
if err := binary.Read(reader, binary.BigEndian, &dataLen); err != nil {
return nil, nil, nil, fmt.Errorf("failed to decode address data length: %s", err.Error())
}
if len(ctrlBuf) < 16+int(dataLen) {
return nil, nil, nil, fmt.Errorf("incomplete PROXY header")
}
if ctrlBuf[12]&0xF == 0 { // LOCAL
return conn.RemoteAddr(), conn.LocalAddr(), ctrlBuf[16+dataLen:], nil
}
var sport, dport uint16
if ctrlBuf[13] == 0x11 { // IPv4
reader = bytes.NewReader(ctrlBuf[24:])
} else {
reader = bytes.NewReader(ctrlBuf[48:])
}
if err := binary.Read(reader, binary.BigEndian, &sport); err != nil {
return nil, nil, nil, fmt.Errorf("failed to decode source TCP port: %s", err.Error())
}
if err := binary.Read(reader, binary.BigEndian, &dport); err != nil {
return nil, nil, nil, fmt.Errorf("failed to decode destination TCP port: %s", err.Error())
}
if ctrlBuf[13] == 0x11 { // TCP over IPv4
srcIP := net.IPv4(ctrlBuf[16], ctrlBuf[17], ctrlBuf[18], ctrlBuf[19])
dstIP := net.IPv4(ctrlBuf[20], ctrlBuf[21], ctrlBuf[22], ctrlBuf[23])
return &net.TCPAddr{IP: srcIP, Port: int(sport)},
&net.TCPAddr{IP: dstIP, Port: int(dport)},
ctrlBuf[16+dataLen:], nil
}
return &net.TCPAddr{IP: ctrlBuf[16:32], Port: int(sport)},
&net.TCPAddr{IP: ctrlBuf[32:48], Port: int(dport)},
ctrlBuf[16+dataLen:], nil
}
func readRemoteAddrPROXYv1(conn net.Conn, ctrlBuf []byte) (net.Addr, net.Addr, []byte, error) {
str := string(ctrlBuf)
if idx := strings.Index(str, "\r\n"); idx >= 0 {
var protocol, src, dst string
var sport, dport int
n, err := fmt.Sscanf(str, "PROXY %s", &protocol)
if err != nil {
return nil, nil, nil, err
}
if n != 1 {
return nil, nil, nil, fmt.Errorf("failed to decode elements")
}
if protocol == "UNKNOWN" {
return conn.RemoteAddr(), conn.LocalAddr(), ctrlBuf[idx+2:], nil
}
if protocol != "TCP4" && protocol != "TCP6" {
return nil, nil, nil, fmt.Errorf("unknown protocol %s", protocol)
}
n, err = fmt.Sscanf(str, "PROXY %s %s %s %d %d", &protocol, &src, &dst, &sport, &dport)
if err != nil {
return nil, nil, nil, err
}
if n != 5 {
return nil, nil, nil, fmt.Errorf("failed to decode elements")
}
srcIP := net.ParseIP(src)
if srcIP == nil {
return nil, nil, nil, fmt.Errorf("failed to parse source IP address %s", src)
}
dstIP := net.ParseIP(dst)
if dstIP == nil {
return nil, nil, nil, fmt.Errorf("failed to parse destination IP address %s", dst)
}
return &net.TCPAddr{IP: srcIP, Port: sport},
&net.TCPAddr{IP: dstIP, Port: dport},
ctrlBuf[idx+2:], nil
}
return nil, nil, nil, fmt.Errorf("did not find \\r\\n in first data segment")
}
func readRemoteAddr(conn net.Conn) (net.Addr, net.Addr, []byte, error) {
buf := make([]byte, 108)
n, err := conn.Read(buf)
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to read header: %s", err.Error())
}
if n >= 16 && bytes.Equal(buf[:12],
[]byte{0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A}) {
saddr, daddr, rest, err := readRemoteAddrPROXYv2(conn, buf[:n])
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to parse PROXY v2 header: %s", err.Error())
}
return saddr, daddr, rest, err
}
if n >= 8 && bytes.Equal(buf[:5], []byte("PROXY")) {
saddr, daddr, rest, err := readRemoteAddrPROXYv1(conn, buf[:n])
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to parse PROXY v1 header: %s", err.Error())
}
return saddr, daddr, rest, err
}
return nil, nil, nil, fmt.Errorf("PROXY header missing")
}
func dialUpstreamControl(sport int, connLog *zap.Logger) func(string, string, syscall.RawConn) error {
return func(network, address string, c syscall.RawConn) error {
var syscallErr error
err := c.Control(func(fd uintptr) {
syscallErr = syscall.SetsockoptInt(int(fd), syscall.IPPROTO_TCP, syscall.TCP_SYNCNT, 2)
if syscallErr != nil {
syscallErr = fmt.Errorf("setsockopt(IPPROTO_TCP, TCP_SYNCTNT, 2): %s", syscallErr.Error())
return
}
syscallErr = syscall.SetsockoptInt(int(fd), syscall.IPPROTO_IP, syscall.IP_TRANSPARENT, 1)
if syscallErr != nil {
syscallErr = fmt.Errorf("setsockopt(IPPROTO_IP, IP_TRANSPARENT, 1): %s", syscallErr.Error())
return
}
syscallErr = syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, syscall.SO_REUSEADDR, 1)
if syscallErr != nil {
syscallErr = fmt.Errorf("setsockopt(SOL_SOCKET, SO_REUSEADDR, 1): %s", syscallErr.Error())
return
}
if sport == 0 {
ipBindAddressNoPort := 24
err := syscall.SetsockoptInt(int(fd), syscall.IPPROTO_IP, ipBindAddressNoPort, 1)
if err != nil && verbose > 1 {
connLog.Debug("Failed to set IP_BIND_ADDRESS_NO_PORT", zap.Error(err))
}
}
if mark != 0 {
syscallErr = syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, syscall.SO_MARK, mark)
if syscallErr != nil {
syscallErr = fmt.Errorf("setsockopt(SOL_SOCK, SO_MARK, %d): %s", mark, syscallErr.Error())
return
}
}
if network == "tcp6" {
syscallErr = syscall.SetsockoptInt(int(fd), syscall.IPPROTO_IP, syscall.IPV6_V6ONLY, 0)
if syscallErr != nil {
syscallErr = fmt.Errorf("setsockopt(IPPROTO_IP, IPV6_ONLY, 0): %s", syscallErr.Error())
return
}
}
})
if err != nil {
return err
}
return syscallErr
}
}
func copyData(dst net.Conn, src net.Conn, ch chan<- error) {
_, err := io.Copy(dst, src)
ch <- err
}
func checkOriginAllowed(conn net.Conn) bool {
if len(allowedSubnets) == 0 {
return true
}
addr := conn.RemoteAddr().(*net.TCPAddr)
for _, ipNet := range allowedSubnets {
if ipNet.Contains(addr.IP) {
return true
}
}
return false
}
func handleConnection(conn net.Conn, listenLog *zap.Logger) {
defer conn.Close()
connLog := listenLog.With(zap.String("remoteAddr", conn.RemoteAddr().String()),
zap.String("localAddr", conn.LocalAddr().String()))
if !checkOriginAllowed(conn) {
connLog.Debug("connection origin not in allowed subnets", zap.Bool("dropConnection", true))
return
}
if verbose > 1 {
connLog.Debug("new connection")
}
saddr, _, restBytes, err := readRemoteAddr(conn)
if err != nil {
connLog.Debug("failed to parse PROXY header", zap.Error(err), zap.Bool("dropConnection", true))
return
}
targetAddr := targetAddr6
if strings.ContainsRune(saddr.String(), '.') { // poor man's ipv6 check - golang makes it unnecessarily hard
targetAddr = targetAddr4
}
connLog = connLog.With(zap.String("clientAddr", saddr.String()), zap.String("targetAddr", targetAddr))
if verbose > 1 {
connLog.Debug("successfuly parsed PROXY header")
}
dialer := net.Dialer{LocalAddr: saddr, Control: dialUpstreamControl(saddr.(*net.TCPAddr).Port, connLog)}
upstreamConn, err := dialer.Dial("tcp", targetAddr)
if err != nil {
connLog.Debug("failed to establish upstream connection", zap.Error(err), zap.Bool("dropConnection", true))
return
}
defer upstreamConn.Close()
if verbose > 1 {
connLog.Debug("successfuly established upstream connection")
}
if err := conn.(*net.TCPConn).SetNoDelay(true); err != nil {
connLog.Debug("failed to set nodelay on downstream connection", zap.Error(err), zap.Bool("dropConnection", true))
} else if verbose > 1 {
connLog.Debug("successfuly set NoDelay on downstream connection")
}
if err := upstreamConn.(*net.TCPConn).SetNoDelay(true); err != nil {
connLog.Debug("failed to set nodelay on upstream connection", zap.Error(err), zap.Bool("dropConnection", true))
} else if verbose > 1 {
connLog.Debug("successfuly set NoDelay on upstream connection")
}
for len(restBytes) > 0 {
n, err := upstreamConn.Write(restBytes)
if err != nil {
connLog.Debug("failed to write data to upstream connection",
zap.Error(err), zap.Bool("dropConnection", true))
return
}
restBytes = restBytes[n:]
}
outErr := make(chan error, 2)
go copyData(upstreamConn, conn, outErr)
go copyData(conn, upstreamConn, outErr)
err = <-outErr
if err != nil {
connLog.Debug("connection broken", zap.Error(err), zap.Bool("dropConnection", true))
} else if verbose > 1 {
connLog.Debug("connection closing")
}
} }
func listen(listenerNum int, errors chan<- error) { func listen(listenerNum int, errors chan<- error) {
listenLog := logger.With(zap.Int("listenerNum", listenerNum)) logger := Opts.Logger.With(zap.Int("listenerNum", listenerNum),
zap.String("protocol", Opts.Protocol), zap.String("listenAdr", Opts.ListenAddr))
listenConfig := net.ListenConfig{} listenConfig := net.ListenConfig{}
if listeners > 1 { if Opts.Listeners > 1 {
listenConfig.Control = func(network, address string, c syscall.RawConn) error { listenConfig.Control = func(network, address string, c syscall.RawConn) error {
return c.Control(func(fd uintptr) { return c.Control(func(fd uintptr) {
soReusePort := 15 soReusePort := 15
if err := syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, soReusePort, 1); err != nil { if err := syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, soReusePort, 1); err != nil {
listenLog.Warn("failed to set SO_REUSEPORT - only one listener setup will succeed") logger.Warn("failed to set SO_REUSEPORT - only one listener setup will succeed")
} }
}) })
} }
} }
ctx := context.Background() if Opts.Protocol == "tcp" {
ln, err := listenConfig.Listen(ctx, "tcp", listenAddr) TCPListen(&listenConfig, logger, errors)
if err != nil { } else {
listenLog.Error("failed to bind listener", zap.String("listenAddr", listenAddr), zap.Error(err)) UDPListen(&listenConfig, logger, errors)
errors <- err
return
}
listenLog.Info("listening", zap.String("listenAddr", listenAddr))
for {
conn, err := ln.Accept()
if err != nil {
listenLog.Error("failed to accept new connection", zap.Error(err))
errors <- err
return
}
go handleConnection(conn, listenLog)
} }
} }
func loadAllowedSubnets() error { func loadAllowedSubnets() error {
file, err := os.Open(allowedSubnetsPath) file, err := os.Open(Opts.allowedSubnetsPath)
if err != nil { if err != nil {
return err return err
} }
...@@ -368,8 +86,8 @@ func loadAllowedSubnets() error { ...@@ -368,8 +86,8 @@ func loadAllowedSubnets() error {
if err != nil { if err != nil {
return err return err
} }
allowedSubnets = append(allowedSubnets, ipNet) Opts.AllowedSubnets = append(Opts.AllowedSubnets, ipNet)
logger.Info("allowed subnet", zap.String("subnet", ipNet.String())) Opts.Logger.Info("allowed subnet", zap.String("subnet", ipNet.String()))
} }
return nil return nil
...@@ -377,13 +95,13 @@ func loadAllowedSubnets() error { ...@@ -377,13 +95,13 @@ func loadAllowedSubnets() error {
func initLogger() error { func initLogger() error {
logConfig := zap.NewProductionConfig() logConfig := zap.NewProductionConfig()
if verbose > 0 { if Opts.Verbose > 0 {
logConfig.Level.SetLevel(zap.DebugLevel) logConfig.Level.SetLevel(zap.DebugLevel)
} }
l, err := logConfig.Build() l, err := logConfig.Build()
if err == nil { if err == nil {
logger = l Opts.Logger = l
} }
return err return err
} }
...@@ -393,23 +111,41 @@ func main() { ...@@ -393,23 +111,41 @@ func main() {
if err := initLogger(); err != nil { if err := initLogger(); err != nil {
log.Fatalf("Failed to initialize logging: %s", err.Error()) log.Fatalf("Failed to initialize logging: %s", err.Error())
} }
defer logger.Sync() defer Opts.Logger.Sync()
if listeners <= 0 {
logger.Fatal("--listeners has to be >= 1")
}
if allowedSubnetsPath != "" { if Opts.allowedSubnetsPath != "" {
if err := loadAllowedSubnets(); err != nil { if err := loadAllowedSubnets(); err != nil {
logger.Fatal("failed to load allowed subnets file", zap.String("path", allowedSubnetsPath), zap.Error(err)) Opts.Logger.Fatal("failed to load allowed subnets file",
zap.String("path", Opts.allowedSubnetsPath), zap.Error(err))
} }
} }
listenErrors := make(chan error, listeners) if Opts.Protocol != "tcp" && Opts.Protocol != "udp" {
for i := 0; i < listeners; i++ { Opts.Logger.Fatal("--protocol has to be one of udp, tcp", zap.String("protocol", Opts.Protocol))
}
if Opts.Mark < 0 {
Opts.Logger.Fatal("--mark has to be >= 0", zap.Int("mark", Opts.Mark))
}
if Opts.Verbose < 0 {
Opts.Logger.Fatal("-v has to be >= 0", zap.Int("verbose", Opts.Verbose))
}
if Opts.Listeners < 1 {
Opts.Logger.Fatal("--listeners has to be >= 1")
}
if Opts.udpCloseAfter < 0 {
Opts.Logger.Fatal("--close-after has to be >= 0", zap.Int("close-after", Opts.udpCloseAfter))
}
Opts.UDPCloseAfter = time.Duration(Opts.udpCloseAfter) * time.Second
listenErrors := make(chan error, Opts.Listeners)
for i := 0; i < Opts.Listeners; i++ {
go listen(i, listenErrors) go listen(i, listenErrors)
} }
for i := 0; i < listeners; i++ { for i := 0; i < Opts.Listeners; i++ {
<-listenErrors <-listenErrors
} }
} }
// Copyright 2019 Path Network, Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package main
import (
"bytes"
"encoding/binary"
"fmt"
"net"
"strings"
)
func readRemoteAddrPROXYv2(ctrlBuf []byte, protocol Protocol) (net.Addr, net.Addr, []byte, error) {
if (ctrlBuf[12] >> 4) != 2 {
return nil, nil, nil, fmt.Errorf("unknown protocol version %d", ctrlBuf[12]>>4)
}
if ctrlBuf[12]&0xF > 1 {
return nil, nil, nil, fmt.Errorf("unknown command %d", ctrlBuf[12]&0xF)
}
if ctrlBuf[12]&0xF == 1 && ((protocol == TCP && ctrlBuf[13] != 0x11 && ctrlBuf[13] != 0x21) ||
(protocol == UDP && ctrlBuf[13] != 0x12 && ctrlBuf[13] != 0x22)) {
return nil, nil, nil, fmt.Errorf("invalid family/protocol %d/%d", ctrlBuf[13]>>4, ctrlBuf[13]&0xF)
}
var dataLen uint16
reader := bytes.NewReader(ctrlBuf[14:16])
if err := binary.Read(reader, binary.BigEndian, &dataLen); err != nil {
return nil, nil, nil, fmt.Errorf("failed to decode address data length: %s", err.Error())
}
if len(ctrlBuf) < 16+int(dataLen) {
return nil, nil, nil, fmt.Errorf("incomplete PROXY header")
}
if ctrlBuf[12]&0xF == 0 { // LOCAL
return nil, nil, ctrlBuf[16+dataLen:], nil
}
var sport, dport uint16
if ctrlBuf[13]>>4 == 0x1 { // IPv4
reader = bytes.NewReader(ctrlBuf[24:])
} else {
reader = bytes.NewReader(ctrlBuf[48:])
}
if err := binary.Read(reader, binary.BigEndian, &sport); err != nil {
return nil, nil, nil, fmt.Errorf("failed to decode source port: %s", err.Error())
}
if err := binary.Read(reader, binary.BigEndian, &dport); err != nil {
return nil, nil, nil, fmt.Errorf("failed to decode destination port: %s", err.Error())
}
var srcIP, dstIP net.IP
if ctrlBuf[13]>>4 == 0x1 { // IPv4
srcIP = net.IPv4(ctrlBuf[16], ctrlBuf[17], ctrlBuf[18], ctrlBuf[19])
dstIP = net.IPv4(ctrlBuf[20], ctrlBuf[21], ctrlBuf[22], ctrlBuf[23])
} else {
srcIP = ctrlBuf[16:32]
dstIP = ctrlBuf[32:48]
}
if ctrlBuf[13]&0xF == 0x1 { // TCP
return &net.TCPAddr{IP: srcIP, Port: int(sport)},
&net.TCPAddr{IP: dstIP, Port: int(dport)},
ctrlBuf[16+dataLen:], nil
}
return &net.UDPAddr{IP: srcIP, Port: int(sport)},
&net.UDPAddr{IP: dstIP, Port: int(dport)},
ctrlBuf[16+dataLen:], nil
}
func readRemoteAddrPROXYv1(ctrlBuf []byte) (net.Addr, net.Addr, []byte, error) {
str := string(ctrlBuf)
if idx := strings.Index(str, "\r\n"); idx >= 0 {
var headerProtocol, src, dst string
var sport, dport int
n, err := fmt.Sscanf(str, "PROXY %s", &headerProtocol)
if err != nil {
return nil, nil, nil, err
}
if n != 1 {
return nil, nil, nil, fmt.Errorf("failed to decode elements")
}
if headerProtocol == "UNKNOWN" {
return nil, nil, ctrlBuf[idx+2:], nil
}
if headerProtocol != "TCP4" && headerProtocol != "TCP6" {
return nil, nil, nil, fmt.Errorf("unknown protocol %s", headerProtocol)
}
n, err = fmt.Sscanf(str, "PROXY %s %s %s %d %d", &headerProtocol, &src, &dst, &sport, &dport)
if err != nil {
return nil, nil, nil, err
}
if n != 5 {
return nil, nil, nil, fmt.Errorf("failed to decode elements")
}
srcIP := net.ParseIP(src)
if srcIP == nil {
return nil, nil, nil, fmt.Errorf("failed to parse source IP address %s", src)
}
dstIP := net.ParseIP(dst)
if dstIP == nil {
return nil, nil, nil, fmt.Errorf("failed to parse destination IP address %s", dst)
}
return &net.TCPAddr{IP: srcIP, Port: sport},
&net.TCPAddr{IP: dstIP, Port: dport},
ctrlBuf[idx+2:], nil
}
return nil, nil, nil, fmt.Errorf("did not find \\r\\n in first data segment")
}
func PROXYReadRemoteAddr(buf []byte, protocol Protocol) (net.Addr, net.Addr, []byte, error) {
if len(buf) >= 16 && bytes.Equal(buf[:12],
[]byte{0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A}) {
saddr, daddr, rest, err := readRemoteAddrPROXYv2(buf, protocol)
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to parse PROXY v2 header: %s", err.Error())
}
return saddr, daddr, rest, err
}
// PROXYv1 only works with TCP
if protocol == TCP && len(buf) >= 8 && bytes.Equal(buf[:5], []byte("PROXY")) {
saddr, daddr, rest, err := readRemoteAddrPROXYv1(buf)
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to parse PROXY v1 header: %s", err.Error())
}
return saddr, daddr, rest, err
}
return nil, nil, nil, fmt.Errorf("PROXY header missing")
}
tcp.go 0 → 100644
// Copyright 2019 Path Network, Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package main
import (
"context"
"io"
"net"
"go.uber.org/zap"
)
func tcpCopyData(dst net.Conn, src net.Conn, ch chan<- error) {
_, err := io.Copy(dst, src)
ch <- err
}
func tcpHandleConnection(conn net.Conn, logger *zap.Logger) {
defer conn.Close()
logger = logger.With(zap.String("remoteAddr", conn.RemoteAddr().String()),
zap.String("localAddr", conn.LocalAddr().String()))
if !CheckOriginAllowed(conn.RemoteAddr().(*net.TCPAddr).IP) {
logger.Debug("connection origin not in allowed subnets", zap.Bool("dropConnection", true))
return
}
if Opts.Verbose > 1 {
logger.Debug("new connection")
}
buffer := GetBuffer()
defer func() {
if buffer != nil {
PutBuffer(buffer)
}
}()
n, err := conn.Read(buffer)
if err != nil {
logger.Debug("failed to read PROXY header", zap.Error(err), zap.Bool("dropConnection", true))
return
}
saddr, _, restBytes, err := PROXYReadRemoteAddr(buffer[:n], TCP)
if err != nil {
logger.Debug("failed to parse PROXY header", zap.Error(err), zap.Bool("dropConnection", true))
return
}
targetAddr := Opts.TargetAddr6
if AddrVersion(conn.RemoteAddr()) == 4 {
targetAddr = Opts.TargetAddr4
}
logger = logger.With(zap.String("clientAddr", saddr.String()), zap.String("targetAddr", targetAddr))
if Opts.Verbose > 1 {
logger.Debug("successfuly parsed PROXY header")
}
dialer := net.Dialer{LocalAddr: saddr}
if saddr != nil {
dialer.Control = DialUpstreamControl(saddr.(*net.TCPAddr).Port)
}
upstreamConn, err := dialer.Dial("tcp", targetAddr)
if err != nil {
logger.Debug("failed to establish upstream connection", zap.Error(err), zap.Bool("dropConnection", true))
return
}
defer upstreamConn.Close()
if Opts.Verbose > 1 {
logger.Debug("successfuly established upstream connection")
}
if err := conn.(*net.TCPConn).SetNoDelay(true); err != nil {
logger.Debug("failed to set nodelay on downstream connection", zap.Error(err), zap.Bool("dropConnection", true))
} else if Opts.Verbose > 1 {
logger.Debug("successfuly set NoDelay on downstream connection")
}
if err := upstreamConn.(*net.TCPConn).SetNoDelay(true); err != nil {
logger.Debug("failed to set nodelay on upstream connection", zap.Error(err), zap.Bool("dropConnection", true))
} else if Opts.Verbose > 1 {
logger.Debug("successfuly set NoDelay on upstream connection")
}
for len(restBytes) > 0 {
n, err := upstreamConn.Write(restBytes)
if err != nil {
logger.Debug("failed to write data to upstream connection",
zap.Error(err), zap.Bool("dropConnection", true))
return
}
restBytes = restBytes[n:]
}
PutBuffer(buffer)
buffer = nil
outErr := make(chan error, 2)
go tcpCopyData(upstreamConn, conn, outErr)
go tcpCopyData(conn, upstreamConn, outErr)
err = <-outErr
if err != nil {
logger.Debug("connection broken", zap.Error(err), zap.Bool("dropConnection", true))
} else if Opts.Verbose > 1 {
logger.Debug("connection closing")
}
}
func TCPListen(listenConfig *net.ListenConfig, logger *zap.Logger, errors chan<- error) {
ctx := context.Background()
ln, err := listenConfig.Listen(ctx, "tcp", Opts.ListenAddr)
if err != nil {
logger.Error("failed to bind listener", zap.Error(err))
errors <- err
return
}
logger.Info("listening")
for {
conn, err := ln.Accept()
if err != nil {
logger.Error("failed to accept new connection", zap.Error(err))
errors <- err
return
}
go tcpHandleConnection(conn, logger)
}
}
udp.go 0 → 100644
// Copyright 2019 Path Network, Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package main
import (
"context"
"net"
"sync/atomic"
"syscall"
"time"
"go.uber.org/zap"
)
type udpConnection struct {
lastActivity *int64
clientAddr *net.UDPAddr
downstreamAddr *net.UDPAddr
upstream *net.UDPConn
logger *zap.Logger
}
func udpCloseAfterInactivity(conn *udpConnection, socketClosures chan<- string) {
for {
lastActivity := atomic.LoadInt64(conn.lastActivity)
<-time.After(Opts.UDPCloseAfter)
if atomic.LoadInt64(conn.lastActivity) == lastActivity {
break
}
}
conn.upstream.Close()
socketClosures <- conn.clientAddr.String()
}
func udpCopyFromUpstream(downstream net.PacketConn, conn *udpConnection) {
rawConn, err := conn.upstream.SyscallConn()
if err != nil {
conn.logger.Error("failed to retrieve raw connection from upstream socket", zap.Error(err))
return
}
var syscallErr error
err = rawConn.Read(func(fd uintptr) bool {
buf := GetBuffer()
defer PutBuffer(buf)
for {
n, _, serr := syscall.Recvfrom(int(fd), buf, syscall.MSG_DONTWAIT)
if serr == syscall.EWOULDBLOCK {
return false
}
if serr != nil {
syscallErr = serr
return true
}
if n == 0 {
return true
}
atomic.AddInt64(conn.lastActivity, 1)
if _, serr := downstream.WriteTo(buf[:n], conn.downstreamAddr); serr != nil {
syscallErr = serr
return true
}
}
})
if err == nil {
err = syscallErr
}
if err != nil {
conn.logger.Debug("failed to read from upstream", zap.Error(err))
}
}
func udpGetSocketFromMap(downstream net.PacketConn, downstreamAddr, saddr net.Addr, logger *zap.Logger,
connMap map[string]*udpConnection, socketClosures chan<- string) (*udpConnection, error) {
connKey := ""
if saddr != nil {
connKey = saddr.String()
}
if conn := connMap[saddr.String()]; conn != nil {
atomic.AddInt64(conn.lastActivity, 1)
return conn, nil
}
targetAddr := Opts.TargetAddr6
if AddrVersion(downstreamAddr) == 4 {
targetAddr = Opts.TargetAddr4
}
logger = logger.With(zap.String("downstreamAddr", downstreamAddr.String()), zap.String("targetAddr", targetAddr))
dialer := net.Dialer{LocalAddr: saddr}
if saddr != nil {
logger = logger.With(zap.String("clientAddr", saddr.String()))
dialer.Control = DialUpstreamControl(saddr.(*net.UDPAddr).Port)
}
if Opts.Verbose > 1 {
logger.Debug("new connection")
}
conn, err := dialer.Dial("udp", targetAddr)
if err != nil {
logger.Debug("failed to connect to upstream", zap.Error(err))
return nil, err
}
udpConn := &udpConnection{upstream: conn.(*net.UDPConn),
logger: logger,
lastActivity: new(int64),
clientAddr: saddr.(*net.UDPAddr),
downstreamAddr: downstreamAddr.(*net.UDPAddr)}
go udpCopyFromUpstream(downstream, udpConn)
go udpCloseAfterInactivity(udpConn, socketClosures)
connMap[connKey] = udpConn
return udpConn, nil
}
func UDPListen(listenConfig *net.ListenConfig, logger *zap.Logger, errors chan<- error) {
ctx := context.Background()
ln, err := listenConfig.ListenPacket(ctx, "udp", Opts.ListenAddr)
if err != nil {
logger.Error("failed to bind listener", zap.Error(err))
errors <- err
return
}
logger.Info("listening")
socketClosures := make(chan string, 1024)
connectionMap := make(map[string]*udpConnection)
buffer := GetBuffer()
defer PutBuffer(buffer)
for {
n, remoteAddr, err := ln.ReadFrom(buffer)
if err != nil {
logger.Error("failed to read from socket", zap.Error(err))
continue
}
if !CheckOriginAllowed(remoteAddr.(*net.UDPAddr).IP) {
logger.Debug("packet origin not in allowed subnets", zap.String("remoteAddr", remoteAddr.String()))
continue
}
saddr, _, restBytes, err := PROXYReadRemoteAddr(buffer[:n], UDP)
if err != nil {
logger.Debug("failed to parse PROXY header", zap.Error(err), zap.String("remoteAddr", remoteAddr.String()))
continue
}
for {
doneClosing := false
select {
case mapKey := <-socketClosures:
delete(connectionMap, mapKey)
default:
doneClosing = true
}
if doneClosing {
break
}
}
conn, err := udpGetSocketFromMap(ln, remoteAddr, saddr, logger, connectionMap, socketClosures)
if err != nil {
continue
}
_, err = conn.upstream.Write(restBytes)
if err != nil {
conn.logger.Error("failed to write to upstream socket", zap.Error(err))
}
}
}
utils.go 0 → 100644
// Copyright 2019 Path Network, Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package main
import (
"fmt"
"net"
"strings"
"syscall"
)
type Protocol int
const (
TCP Protocol = iota
UDP
)
func CheckOriginAllowed(remoteIP net.IP) bool {
if len(Opts.AllowedSubnets) == 0 {
return true
}
for _, ipNet := range Opts.AllowedSubnets {
if ipNet.Contains(remoteIP) {
return true
}
}
return false
}
func DialUpstreamControl(sport int) func(string, string, syscall.RawConn) error {
return func(network, address string, c syscall.RawConn) error {
var syscallErr error
err := c.Control(func(fd uintptr) {
if Opts.Protocol == "tcp" {
syscallErr = syscall.SetsockoptInt(int(fd), syscall.IPPROTO_TCP, syscall.TCP_SYNCNT, 2)
if syscallErr != nil {
syscallErr = fmt.Errorf("setsockopt(IPPROTO_TCP, TCP_SYNCTNT, 2): %s", syscallErr.Error())
return
}
}
syscallErr = syscall.SetsockoptInt(int(fd), syscall.IPPROTO_IP, syscall.IP_TRANSPARENT, 1)
if syscallErr != nil {
syscallErr = fmt.Errorf("setsockopt(IPPROTO_IP, IP_TRANSPARENT, 1): %s", syscallErr.Error())
return
}
syscallErr = syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, syscall.SO_REUSEADDR, 1)
if syscallErr != nil {
syscallErr = fmt.Errorf("setsockopt(SOL_SOCKET, SO_REUSEADDR, 1): %s", syscallErr.Error())
return
}
if sport == 0 {
ipBindAddressNoPort := 24
syscall.SetsockoptInt(int(fd), syscall.IPPROTO_IP, ipBindAddressNoPort, 1)
}
if Opts.Mark != 0 {
syscallErr = syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, syscall.SO_MARK, Opts.Mark)
if syscallErr != nil {
syscallErr = fmt.Errorf("setsockopt(SOL_SOCK, SO_MARK, %d): %s", Opts.Mark, syscallErr.Error())
return
}
}
if network == "tcp6" || network == "udp6" {
syscallErr = syscall.SetsockoptInt(int(fd), syscall.IPPROTO_IP, syscall.IPV6_V6ONLY, 0)
if syscallErr != nil {
syscallErr = fmt.Errorf("setsockopt(IPPROTO_IP, IPV6_ONLY, 0): %s", syscallErr.Error())
return
}
}
})
if err != nil {
return err
}
return syscallErr
}
}
func AddrVersion(addr net.Addr) int {
// poor man's ipv6 check - golang makes it unnecessarily hard
if strings.ContainsRune(addr.String(), '.') {
return 4
}
return 6
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment