diff --git a/README.md b/README.md index 0348bfaf2b240308fae10060fbeb19c8f6cb79f9..e14eee97728d4133d3b4bc7cacddad631e21adce 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ This is a Go reimplementation of [mmproxy](https://github.com/cloudflare/mmproxy), created to improve on mmproxy's runtime stability while providing potentially greater performance in terms of connection and packet throughput. -`go-mmproxy` is a standalone application that unwraps HAProxy's [PROXY protocol](http://www.haproxy.org/download/1.8/doc/proxy-protocol.txt) (also adopted by other projects such as NGINX) so that the TCP connection to the end server comes from client's - instead of proxy server's - IP address and port number. +`go-mmproxy` is a standalone application that unwraps HAProxy's [PROXY protocol](http://www.haproxy.org/download/1.8/doc/proxy-protocol.txt) (also adopted by other projects such as NGINX) so that the network connection to the end server comes from client's - instead of proxy server's - IP address and port number. Because they share basic mechanisms, [Cloudflare's blogpost on mmproxy](https://blog.cloudflare.com/mmproxy-creative-way-of-preserving-client-ips-in-spectrum/) serves as a great write-up on how `go-mmproxy` works under the hood. ## Building @@ -38,22 +38,33 @@ ip -6 route add local ::/0 dev lo table 123 If `--mark` option is given to `go-mmproxy`, all packets routed to the loopback interface will have the mark set. This can be used for setting up more advanced routing rules with iptables, for example when you need traffic from loopback to be routed outside of the machine. +#### Routing UDP packets + +Because UDP is connectionless, if a socket is bound to `0.0.0.0` the kernel stack will search for an interface in order to send a reply to the spoofed source address - instead of just using the interface it received the original packet from. +The found interface will most likely _not_ be the loopback interface, which will avoid the rules specified above. +The simplest way to fix this is to bind the end server's listeners to `127.0.0.1` (or `::1`). +This is also generally recommended in order to avoid receiving non-proxied connections. + ### Starting go-mmproxy ``` Usage of ./go-mmproxy: -4 string - Address to which IPv4 TCP traffic will be forwarded to (default "127.0.0.1:443") + Address to which IPv4 traffic will be forwarded to (default "127.0.0.1:443") -6 string - Address to which IPv6 TCP traffic will be forwarded to (default "[::1]:443") + Address to which IPv6 traffic will be forwarded to (default "[::1]:443") -allowed-subnets string Path to a file that contains allowed subnets of the proxy servers + -close-after int + Number of seconds after which UDP socket will be cleaned up (default 60) -l string Adress the proxy listens on (default "0.0.0.0:8443") -listeners int Number of listener sockets that will be opened for the listen address (Linux 3.9+) (default 1) -mark int The mark that will be set on outbound packets + -p string + Protocol that will be proxied: tcp, udp (default "tcp") -v int 0 - no logging of individual connections 1 - log errors occuring in individual connections diff --git a/buffers.go b/buffers.go new file mode 100644 index 0000000000000000000000000000000000000000..1d2ec457b33e075c26d86814a8b930f3af743a2a --- /dev/null +++ b/buffers.go @@ -0,0 +1,24 @@ +// Copyright 2019 Path Network, Inc. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package main + +import ( + "math" + "sync" +) + +var buffers sync.Pool + +func init() { + buffers.New = func() interface{} { return make([]byte, math.MaxUint16) } +} + +func GetBuffer() []byte { + return buffers.Get().([]byte) +} + +func PutBuffer(buf []byte) { + buffers.Put(buf) +} diff --git a/main.go b/main.go index 9258bbae13a148c6d82370aec18596333ab09036..36a5e46487bfbed4927284bc2de2c8b95f080c6d 100644 --- a/main.go +++ b/main.go @@ -6,356 +6,74 @@ package main import ( "bufio" - "bytes" - "context" - "encoding/binary" "flag" - "fmt" - "io" "log" "net" "os" - "strings" "syscall" + "time" "go.uber.org/zap" ) -var listenAddr string -var targetAddr4 string -var targetAddr6 string -var allowedSubnetsPath string -var mark int -var verbose int -var listeners int +type options struct { + Protocol string + ListenAddr string + TargetAddr4 string + TargetAddr6 string + Mark int + Verbose int + allowedSubnetsPath string + AllowedSubnets []*net.IPNet + Listeners int + Logger *zap.Logger + udpCloseAfter int + UDPCloseAfter time.Duration +} -var allowedSubnets []*net.IPNet -var logger *zap.Logger +var Opts options func init() { - flag.StringVar(&listenAddr, "l", "0.0.0.0:8443", "Adress the proxy listens on") - flag.StringVar(&targetAddr4, "4", "127.0.0.1:443", "Address to which IPv4 TCP traffic will be forwarded to") - flag.StringVar(&targetAddr6, "6", "[::1]:443", "Address to which IPv6 TCP traffic will be forwarded to") - flag.IntVar(&mark, "mark", 0, "The mark that will be set on outbound packets") - flag.StringVar(&allowedSubnetsPath, "allowed-subnets", "", - "Path to a file that contains allowed subnets of the proxy servers") - flag.IntVar(&verbose, "v", 0, `0 - no logging of individual connections + flag.StringVar(&Opts.Protocol, "p", "tcp", "Protocol that will be proxied: tcp, udp") + flag.StringVar(&Opts.ListenAddr, "l", "0.0.0.0:8443", "Adress the proxy listens on") + flag.StringVar(&Opts.TargetAddr4, "4", "127.0.0.1:443", "Address to which IPv4 traffic will be forwarded to") + flag.StringVar(&Opts.TargetAddr6, "6", "[::1]:443", "Address to which IPv6 traffic will be forwarded to") + flag.IntVar(&Opts.Mark, "mark", 0, "The mark that will be set on outbound packets") + flag.IntVar(&Opts.Verbose, "v", 0, `0 - no logging of individual connections 1 - log errors occuring in individual connections 2 - log all state changes of individual connections`) - flag.IntVar(&listeners, "listeners", 1, + flag.StringVar(&Opts.allowedSubnetsPath, "allowed-subnets", "", + "Path to a file that contains allowed subnets of the proxy servers") + flag.IntVar(&Opts.Listeners, "listeners", 1, "Number of listener sockets that will be opened for the listen address (Linux 3.9+)") -} - -func readRemoteAddrPROXYv2(conn net.Conn, ctrlBuf []byte) (net.Addr, net.Addr, []byte, error) { - if (ctrlBuf[12] >> 4) != 2 { - return nil, nil, nil, fmt.Errorf("unknown protocol version %d", ctrlBuf[12]>>4) - } - - if ctrlBuf[12]&0xF > 1 { - return nil, nil, nil, fmt.Errorf("unknown command %d", ctrlBuf[12]&0xF) - } - - if ctrlBuf[12]&0xF == 1 && ctrlBuf[13] != 0x11 && ctrlBuf[13] != 0x21 { - return nil, nil, nil, fmt.Errorf("invalid family/protocol %d/%d", ctrlBuf[13]>>4, ctrlBuf[13]&0xF) - } - - var dataLen uint16 - reader := bytes.NewReader(ctrlBuf[14:16]) - if err := binary.Read(reader, binary.BigEndian, &dataLen); err != nil { - return nil, nil, nil, fmt.Errorf("failed to decode address data length: %s", err.Error()) - } - - if len(ctrlBuf) < 16+int(dataLen) { - return nil, nil, nil, fmt.Errorf("incomplete PROXY header") - } - - if ctrlBuf[12]&0xF == 0 { // LOCAL - return conn.RemoteAddr(), conn.LocalAddr(), ctrlBuf[16+dataLen:], nil - } - - var sport, dport uint16 - if ctrlBuf[13] == 0x11 { // IPv4 - reader = bytes.NewReader(ctrlBuf[24:]) - } else { - reader = bytes.NewReader(ctrlBuf[48:]) - } - if err := binary.Read(reader, binary.BigEndian, &sport); err != nil { - return nil, nil, nil, fmt.Errorf("failed to decode source TCP port: %s", err.Error()) - } - if err := binary.Read(reader, binary.BigEndian, &dport); err != nil { - return nil, nil, nil, fmt.Errorf("failed to decode destination TCP port: %s", err.Error()) - } - - if ctrlBuf[13] == 0x11 { // TCP over IPv4 - srcIP := net.IPv4(ctrlBuf[16], ctrlBuf[17], ctrlBuf[18], ctrlBuf[19]) - dstIP := net.IPv4(ctrlBuf[20], ctrlBuf[21], ctrlBuf[22], ctrlBuf[23]) - return &net.TCPAddr{IP: srcIP, Port: int(sport)}, - &net.TCPAddr{IP: dstIP, Port: int(dport)}, - ctrlBuf[16+dataLen:], nil - } - - return &net.TCPAddr{IP: ctrlBuf[16:32], Port: int(sport)}, - &net.TCPAddr{IP: ctrlBuf[32:48], Port: int(dport)}, - ctrlBuf[16+dataLen:], nil -} - -func readRemoteAddrPROXYv1(conn net.Conn, ctrlBuf []byte) (net.Addr, net.Addr, []byte, error) { - str := string(ctrlBuf) - if idx := strings.Index(str, "\r\n"); idx >= 0 { - var protocol, src, dst string - var sport, dport int - n, err := fmt.Sscanf(str, "PROXY %s", &protocol) - if err != nil { - return nil, nil, nil, err - } - if n != 1 { - return nil, nil, nil, fmt.Errorf("failed to decode elements") - } - if protocol == "UNKNOWN" { - return conn.RemoteAddr(), conn.LocalAddr(), ctrlBuf[idx+2:], nil - } - if protocol != "TCP4" && protocol != "TCP6" { - return nil, nil, nil, fmt.Errorf("unknown protocol %s", protocol) - } - - n, err = fmt.Sscanf(str, "PROXY %s %s %s %d %d", &protocol, &src, &dst, &sport, &dport) - if err != nil { - return nil, nil, nil, err - } - if n != 5 { - return nil, nil, nil, fmt.Errorf("failed to decode elements") - } - srcIP := net.ParseIP(src) - if srcIP == nil { - return nil, nil, nil, fmt.Errorf("failed to parse source IP address %s", src) - } - dstIP := net.ParseIP(dst) - if dstIP == nil { - return nil, nil, nil, fmt.Errorf("failed to parse destination IP address %s", dst) - } - return &net.TCPAddr{IP: srcIP, Port: sport}, - &net.TCPAddr{IP: dstIP, Port: dport}, - ctrlBuf[idx+2:], nil - } - - return nil, nil, nil, fmt.Errorf("did not find \\r\\n in first data segment") -} - -func readRemoteAddr(conn net.Conn) (net.Addr, net.Addr, []byte, error) { - buf := make([]byte, 108) - n, err := conn.Read(buf) - if err != nil { - return nil, nil, nil, fmt.Errorf("failed to read header: %s", err.Error()) - } - - if n >= 16 && bytes.Equal(buf[:12], - []byte{0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A}) { - saddr, daddr, rest, err := readRemoteAddrPROXYv2(conn, buf[:n]) - if err != nil { - return nil, nil, nil, fmt.Errorf("failed to parse PROXY v2 header: %s", err.Error()) - } - return saddr, daddr, rest, err - } - - if n >= 8 && bytes.Equal(buf[:5], []byte("PROXY")) { - saddr, daddr, rest, err := readRemoteAddrPROXYv1(conn, buf[:n]) - if err != nil { - return nil, nil, nil, fmt.Errorf("failed to parse PROXY v1 header: %s", err.Error()) - } - return saddr, daddr, rest, err - } - - return nil, nil, nil, fmt.Errorf("PROXY header missing") -} - -func dialUpstreamControl(sport int, connLog *zap.Logger) func(string, string, syscall.RawConn) error { - return func(network, address string, c syscall.RawConn) error { - var syscallErr error - err := c.Control(func(fd uintptr) { - syscallErr = syscall.SetsockoptInt(int(fd), syscall.IPPROTO_TCP, syscall.TCP_SYNCNT, 2) - if syscallErr != nil { - syscallErr = fmt.Errorf("setsockopt(IPPROTO_TCP, TCP_SYNCTNT, 2): %s", syscallErr.Error()) - return - } - - syscallErr = syscall.SetsockoptInt(int(fd), syscall.IPPROTO_IP, syscall.IP_TRANSPARENT, 1) - if syscallErr != nil { - syscallErr = fmt.Errorf("setsockopt(IPPROTO_IP, IP_TRANSPARENT, 1): %s", syscallErr.Error()) - return - } - - syscallErr = syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, syscall.SO_REUSEADDR, 1) - if syscallErr != nil { - syscallErr = fmt.Errorf("setsockopt(SOL_SOCKET, SO_REUSEADDR, 1): %s", syscallErr.Error()) - return - } - - if sport == 0 { - ipBindAddressNoPort := 24 - err := syscall.SetsockoptInt(int(fd), syscall.IPPROTO_IP, ipBindAddressNoPort, 1) - if err != nil && verbose > 1 { - connLog.Debug("Failed to set IP_BIND_ADDRESS_NO_PORT", zap.Error(err)) - } - } - - if mark != 0 { - syscallErr = syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, syscall.SO_MARK, mark) - if syscallErr != nil { - syscallErr = fmt.Errorf("setsockopt(SOL_SOCK, SO_MARK, %d): %s", mark, syscallErr.Error()) - return - } - } - - if network == "tcp6" { - syscallErr = syscall.SetsockoptInt(int(fd), syscall.IPPROTO_IP, syscall.IPV6_V6ONLY, 0) - if syscallErr != nil { - syscallErr = fmt.Errorf("setsockopt(IPPROTO_IP, IPV6_ONLY, 0): %s", syscallErr.Error()) - return - } - } - }) - - if err != nil { - return err - } - return syscallErr - } -} - -func copyData(dst net.Conn, src net.Conn, ch chan<- error) { - _, err := io.Copy(dst, src) - ch <- err -} - -func checkOriginAllowed(conn net.Conn) bool { - if len(allowedSubnets) == 0 { - return true - } - - addr := conn.RemoteAddr().(*net.TCPAddr) - for _, ipNet := range allowedSubnets { - if ipNet.Contains(addr.IP) { - return true - } - } - return false -} - -func handleConnection(conn net.Conn, listenLog *zap.Logger) { - defer conn.Close() - connLog := listenLog.With(zap.String("remoteAddr", conn.RemoteAddr().String()), - zap.String("localAddr", conn.LocalAddr().String())) - - if !checkOriginAllowed(conn) { - connLog.Debug("connection origin not in allowed subnets", zap.Bool("dropConnection", true)) - return - } - - if verbose > 1 { - connLog.Debug("new connection") - } - - saddr, _, restBytes, err := readRemoteAddr(conn) - if err != nil { - connLog.Debug("failed to parse PROXY header", zap.Error(err), zap.Bool("dropConnection", true)) - return - } - - targetAddr := targetAddr6 - if strings.ContainsRune(saddr.String(), '.') { // poor man's ipv6 check - golang makes it unnecessarily hard - targetAddr = targetAddr4 - } - - connLog = connLog.With(zap.String("clientAddr", saddr.String()), zap.String("targetAddr", targetAddr)) - if verbose > 1 { - connLog.Debug("successfuly parsed PROXY header") - } - - dialer := net.Dialer{LocalAddr: saddr, Control: dialUpstreamControl(saddr.(*net.TCPAddr).Port, connLog)} - upstreamConn, err := dialer.Dial("tcp", targetAddr) - if err != nil { - connLog.Debug("failed to establish upstream connection", zap.Error(err), zap.Bool("dropConnection", true)) - return - } - - defer upstreamConn.Close() - if verbose > 1 { - connLog.Debug("successfuly established upstream connection") - } - - if err := conn.(*net.TCPConn).SetNoDelay(true); err != nil { - connLog.Debug("failed to set nodelay on downstream connection", zap.Error(err), zap.Bool("dropConnection", true)) - } else if verbose > 1 { - connLog.Debug("successfuly set NoDelay on downstream connection") - } - - if err := upstreamConn.(*net.TCPConn).SetNoDelay(true); err != nil { - connLog.Debug("failed to set nodelay on upstream connection", zap.Error(err), zap.Bool("dropConnection", true)) - } else if verbose > 1 { - connLog.Debug("successfuly set NoDelay on upstream connection") - } - - for len(restBytes) > 0 { - n, err := upstreamConn.Write(restBytes) - if err != nil { - connLog.Debug("failed to write data to upstream connection", - zap.Error(err), zap.Bool("dropConnection", true)) - return - } - restBytes = restBytes[n:] - } - - outErr := make(chan error, 2) - go copyData(upstreamConn, conn, outErr) - go copyData(conn, upstreamConn, outErr) - - err = <-outErr - if err != nil { - connLog.Debug("connection broken", zap.Error(err), zap.Bool("dropConnection", true)) - } else if verbose > 1 { - connLog.Debug("connection closing") - } + flag.IntVar(&Opts.udpCloseAfter, "close-after", 60, "Number of seconds after which UDP socket will be cleaned up") } func listen(listenerNum int, errors chan<- error) { - listenLog := logger.With(zap.Int("listenerNum", listenerNum)) + logger := Opts.Logger.With(zap.Int("listenerNum", listenerNum), + zap.String("protocol", Opts.Protocol), zap.String("listenAdr", Opts.ListenAddr)) listenConfig := net.ListenConfig{} - if listeners > 1 { + if Opts.Listeners > 1 { listenConfig.Control = func(network, address string, c syscall.RawConn) error { return c.Control(func(fd uintptr) { soReusePort := 15 if err := syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, soReusePort, 1); err != nil { - listenLog.Warn("failed to set SO_REUSEPORT - only one listener setup will succeed") + logger.Warn("failed to set SO_REUSEPORT - only one listener setup will succeed") } }) } } - ctx := context.Background() - ln, err := listenConfig.Listen(ctx, "tcp", listenAddr) - if err != nil { - listenLog.Error("failed to bind listener", zap.String("listenAddr", listenAddr), zap.Error(err)) - errors <- err - return - } - - listenLog.Info("listening", zap.String("listenAddr", listenAddr)) - - for { - conn, err := ln.Accept() - if err != nil { - listenLog.Error("failed to accept new connection", zap.Error(err)) - errors <- err - return - } - - go handleConnection(conn, listenLog) + if Opts.Protocol == "tcp" { + TCPListen(&listenConfig, logger, errors) + } else { + UDPListen(&listenConfig, logger, errors) } } func loadAllowedSubnets() error { - file, err := os.Open(allowedSubnetsPath) + file, err := os.Open(Opts.allowedSubnetsPath) if err != nil { return err } @@ -368,8 +86,8 @@ func loadAllowedSubnets() error { if err != nil { return err } - allowedSubnets = append(allowedSubnets, ipNet) - logger.Info("allowed subnet", zap.String("subnet", ipNet.String())) + Opts.AllowedSubnets = append(Opts.AllowedSubnets, ipNet) + Opts.Logger.Info("allowed subnet", zap.String("subnet", ipNet.String())) } return nil @@ -377,13 +95,13 @@ func loadAllowedSubnets() error { func initLogger() error { logConfig := zap.NewProductionConfig() - if verbose > 0 { + if Opts.Verbose > 0 { logConfig.Level.SetLevel(zap.DebugLevel) } l, err := logConfig.Build() if err == nil { - logger = l + Opts.Logger = l } return err } @@ -393,23 +111,41 @@ func main() { if err := initLogger(); err != nil { log.Fatalf("Failed to initialize logging: %s", err.Error()) } - defer logger.Sync() - - if listeners <= 0 { - logger.Fatal("--listeners has to be >= 1") - } + defer Opts.Logger.Sync() - if allowedSubnetsPath != "" { + if Opts.allowedSubnetsPath != "" { if err := loadAllowedSubnets(); err != nil { - logger.Fatal("failed to load allowed subnets file", zap.String("path", allowedSubnetsPath), zap.Error(err)) + Opts.Logger.Fatal("failed to load allowed subnets file", + zap.String("path", Opts.allowedSubnetsPath), zap.Error(err)) } } - listenErrors := make(chan error, listeners) - for i := 0; i < listeners; i++ { + if Opts.Protocol != "tcp" && Opts.Protocol != "udp" { + Opts.Logger.Fatal("--protocol has to be one of udp, tcp", zap.String("protocol", Opts.Protocol)) + } + + if Opts.Mark < 0 { + Opts.Logger.Fatal("--mark has to be >= 0", zap.Int("mark", Opts.Mark)) + } + + if Opts.Verbose < 0 { + Opts.Logger.Fatal("-v has to be >= 0", zap.Int("verbose", Opts.Verbose)) + } + + if Opts.Listeners < 1 { + Opts.Logger.Fatal("--listeners has to be >= 1") + } + + if Opts.udpCloseAfter < 0 { + Opts.Logger.Fatal("--close-after has to be >= 0", zap.Int("close-after", Opts.udpCloseAfter)) + } + Opts.UDPCloseAfter = time.Duration(Opts.udpCloseAfter) * time.Second + + listenErrors := make(chan error, Opts.Listeners) + for i := 0; i < Opts.Listeners; i++ { go listen(i, listenErrors) } - for i := 0; i < listeners; i++ { + for i := 0; i < Opts.Listeners; i++ { <-listenErrors } } diff --git a/proxyprotocol.go b/proxyprotocol.go new file mode 100644 index 0000000000000000000000000000000000000000..7e88642f88f7909700f5c2688516eb2c7fd571d8 --- /dev/null +++ b/proxyprotocol.go @@ -0,0 +1,138 @@ +// Copyright 2019 Path Network, Inc. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package main + +import ( + "bytes" + "encoding/binary" + "fmt" + "net" + "strings" +) + +func readRemoteAddrPROXYv2(ctrlBuf []byte, protocol Protocol) (net.Addr, net.Addr, []byte, error) { + if (ctrlBuf[12] >> 4) != 2 { + return nil, nil, nil, fmt.Errorf("unknown protocol version %d", ctrlBuf[12]>>4) + } + + if ctrlBuf[12]&0xF > 1 { + return nil, nil, nil, fmt.Errorf("unknown command %d", ctrlBuf[12]&0xF) + } + + if ctrlBuf[12]&0xF == 1 && ((protocol == TCP && ctrlBuf[13] != 0x11 && ctrlBuf[13] != 0x21) || + (protocol == UDP && ctrlBuf[13] != 0x12 && ctrlBuf[13] != 0x22)) { + return nil, nil, nil, fmt.Errorf("invalid family/protocol %d/%d", ctrlBuf[13]>>4, ctrlBuf[13]&0xF) + } + + var dataLen uint16 + reader := bytes.NewReader(ctrlBuf[14:16]) + if err := binary.Read(reader, binary.BigEndian, &dataLen); err != nil { + return nil, nil, nil, fmt.Errorf("failed to decode address data length: %s", err.Error()) + } + + if len(ctrlBuf) < 16+int(dataLen) { + return nil, nil, nil, fmt.Errorf("incomplete PROXY header") + } + + if ctrlBuf[12]&0xF == 0 { // LOCAL + return nil, nil, ctrlBuf[16+dataLen:], nil + } + + var sport, dport uint16 + if ctrlBuf[13]>>4 == 0x1 { // IPv4 + reader = bytes.NewReader(ctrlBuf[24:]) + } else { + reader = bytes.NewReader(ctrlBuf[48:]) + } + if err := binary.Read(reader, binary.BigEndian, &sport); err != nil { + return nil, nil, nil, fmt.Errorf("failed to decode source port: %s", err.Error()) + } + if err := binary.Read(reader, binary.BigEndian, &dport); err != nil { + return nil, nil, nil, fmt.Errorf("failed to decode destination port: %s", err.Error()) + } + + var srcIP, dstIP net.IP + if ctrlBuf[13]>>4 == 0x1 { // IPv4 + srcIP = net.IPv4(ctrlBuf[16], ctrlBuf[17], ctrlBuf[18], ctrlBuf[19]) + dstIP = net.IPv4(ctrlBuf[20], ctrlBuf[21], ctrlBuf[22], ctrlBuf[23]) + } else { + srcIP = ctrlBuf[16:32] + dstIP = ctrlBuf[32:48] + } + + if ctrlBuf[13]&0xF == 0x1 { // TCP + return &net.TCPAddr{IP: srcIP, Port: int(sport)}, + &net.TCPAddr{IP: dstIP, Port: int(dport)}, + ctrlBuf[16+dataLen:], nil + } + + return &net.UDPAddr{IP: srcIP, Port: int(sport)}, + &net.UDPAddr{IP: dstIP, Port: int(dport)}, + ctrlBuf[16+dataLen:], nil +} + +func readRemoteAddrPROXYv1(ctrlBuf []byte) (net.Addr, net.Addr, []byte, error) { + str := string(ctrlBuf) + if idx := strings.Index(str, "\r\n"); idx >= 0 { + var headerProtocol, src, dst string + var sport, dport int + n, err := fmt.Sscanf(str, "PROXY %s", &headerProtocol) + if err != nil { + return nil, nil, nil, err + } + if n != 1 { + return nil, nil, nil, fmt.Errorf("failed to decode elements") + } + if headerProtocol == "UNKNOWN" { + return nil, nil, ctrlBuf[idx+2:], nil + } + if headerProtocol != "TCP4" && headerProtocol != "TCP6" { + return nil, nil, nil, fmt.Errorf("unknown protocol %s", headerProtocol) + } + + n, err = fmt.Sscanf(str, "PROXY %s %s %s %d %d", &headerProtocol, &src, &dst, &sport, &dport) + if err != nil { + return nil, nil, nil, err + } + if n != 5 { + return nil, nil, nil, fmt.Errorf("failed to decode elements") + } + srcIP := net.ParseIP(src) + if srcIP == nil { + return nil, nil, nil, fmt.Errorf("failed to parse source IP address %s", src) + } + dstIP := net.ParseIP(dst) + if dstIP == nil { + return nil, nil, nil, fmt.Errorf("failed to parse destination IP address %s", dst) + } + return &net.TCPAddr{IP: srcIP, Port: sport}, + &net.TCPAddr{IP: dstIP, Port: dport}, + ctrlBuf[idx+2:], nil + } + + return nil, nil, nil, fmt.Errorf("did not find \\r\\n in first data segment") +} + +func PROXYReadRemoteAddr(buf []byte, protocol Protocol) (net.Addr, net.Addr, []byte, error) { + if len(buf) >= 16 && bytes.Equal(buf[:12], + []byte{0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A}) { + saddr, daddr, rest, err := readRemoteAddrPROXYv2(buf, protocol) + if err != nil { + return nil, nil, nil, fmt.Errorf("failed to parse PROXY v2 header: %s", err.Error()) + } + return saddr, daddr, rest, err + } + + // PROXYv1 only works with TCP + if protocol == TCP && len(buf) >= 8 && bytes.Equal(buf[:5], []byte("PROXY")) { + saddr, daddr, rest, err := readRemoteAddrPROXYv1(buf) + if err != nil { + return nil, nil, nil, fmt.Errorf("failed to parse PROXY v1 header: %s", err.Error()) + } + return saddr, daddr, rest, err + } + + return nil, nil, nil, fmt.Errorf("PROXY header missing") +} diff --git a/tcp.go b/tcp.go new file mode 100644 index 0000000000000000000000000000000000000000..3c79fabba768b163fc589551e69249fee5f0394c --- /dev/null +++ b/tcp.go @@ -0,0 +1,136 @@ +// Copyright 2019 Path Network, Inc. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package main + +import ( + "context" + "io" + "net" + + "go.uber.org/zap" +) + +func tcpCopyData(dst net.Conn, src net.Conn, ch chan<- error) { + _, err := io.Copy(dst, src) + ch <- err +} + +func tcpHandleConnection(conn net.Conn, logger *zap.Logger) { + defer conn.Close() + logger = logger.With(zap.String("remoteAddr", conn.RemoteAddr().String()), + zap.String("localAddr", conn.LocalAddr().String())) + + if !CheckOriginAllowed(conn.RemoteAddr().(*net.TCPAddr).IP) { + logger.Debug("connection origin not in allowed subnets", zap.Bool("dropConnection", true)) + return + } + + if Opts.Verbose > 1 { + logger.Debug("new connection") + } + + buffer := GetBuffer() + defer func() { + if buffer != nil { + PutBuffer(buffer) + } + }() + + n, err := conn.Read(buffer) + if err != nil { + logger.Debug("failed to read PROXY header", zap.Error(err), zap.Bool("dropConnection", true)) + return + } + + saddr, _, restBytes, err := PROXYReadRemoteAddr(buffer[:n], TCP) + if err != nil { + logger.Debug("failed to parse PROXY header", zap.Error(err), zap.Bool("dropConnection", true)) + return + } + + targetAddr := Opts.TargetAddr6 + if AddrVersion(conn.RemoteAddr()) == 4 { + targetAddr = Opts.TargetAddr4 + } + + logger = logger.With(zap.String("clientAddr", saddr.String()), zap.String("targetAddr", targetAddr)) + if Opts.Verbose > 1 { + logger.Debug("successfuly parsed PROXY header") + } + + dialer := net.Dialer{LocalAddr: saddr} + if saddr != nil { + dialer.Control = DialUpstreamControl(saddr.(*net.TCPAddr).Port) + } + upstreamConn, err := dialer.Dial("tcp", targetAddr) + if err != nil { + logger.Debug("failed to establish upstream connection", zap.Error(err), zap.Bool("dropConnection", true)) + return + } + + defer upstreamConn.Close() + if Opts.Verbose > 1 { + logger.Debug("successfuly established upstream connection") + } + + if err := conn.(*net.TCPConn).SetNoDelay(true); err != nil { + logger.Debug("failed to set nodelay on downstream connection", zap.Error(err), zap.Bool("dropConnection", true)) + } else if Opts.Verbose > 1 { + logger.Debug("successfuly set NoDelay on downstream connection") + } + + if err := upstreamConn.(*net.TCPConn).SetNoDelay(true); err != nil { + logger.Debug("failed to set nodelay on upstream connection", zap.Error(err), zap.Bool("dropConnection", true)) + } else if Opts.Verbose > 1 { + logger.Debug("successfuly set NoDelay on upstream connection") + } + + for len(restBytes) > 0 { + n, err := upstreamConn.Write(restBytes) + if err != nil { + logger.Debug("failed to write data to upstream connection", + zap.Error(err), zap.Bool("dropConnection", true)) + return + } + restBytes = restBytes[n:] + } + + PutBuffer(buffer) + buffer = nil + + outErr := make(chan error, 2) + go tcpCopyData(upstreamConn, conn, outErr) + go tcpCopyData(conn, upstreamConn, outErr) + + err = <-outErr + if err != nil { + logger.Debug("connection broken", zap.Error(err), zap.Bool("dropConnection", true)) + } else if Opts.Verbose > 1 { + logger.Debug("connection closing") + } +} + +func TCPListen(listenConfig *net.ListenConfig, logger *zap.Logger, errors chan<- error) { + ctx := context.Background() + ln, err := listenConfig.Listen(ctx, "tcp", Opts.ListenAddr) + if err != nil { + logger.Error("failed to bind listener", zap.Error(err)) + errors <- err + return + } + + logger.Info("listening") + + for { + conn, err := ln.Accept() + if err != nil { + logger.Error("failed to accept new connection", zap.Error(err)) + errors <- err + return + } + + go tcpHandleConnection(conn, logger) + } +} diff --git a/udp.go b/udp.go new file mode 100644 index 0000000000000000000000000000000000000000..7d543c283cb76ad1c503c384cb359a8f0f74c8b6 --- /dev/null +++ b/udp.go @@ -0,0 +1,184 @@ +// Copyright 2019 Path Network, Inc. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package main + +import ( + "context" + "net" + "sync/atomic" + "syscall" + "time" + + "go.uber.org/zap" +) + +type udpConnection struct { + lastActivity *int64 + clientAddr *net.UDPAddr + downstreamAddr *net.UDPAddr + upstream *net.UDPConn + logger *zap.Logger +} + +func udpCloseAfterInactivity(conn *udpConnection, socketClosures chan<- string) { + for { + lastActivity := atomic.LoadInt64(conn.lastActivity) + <-time.After(Opts.UDPCloseAfter) + if atomic.LoadInt64(conn.lastActivity) == lastActivity { + break + } + } + conn.upstream.Close() + socketClosures <- conn.clientAddr.String() +} + +func udpCopyFromUpstream(downstream net.PacketConn, conn *udpConnection) { + rawConn, err := conn.upstream.SyscallConn() + if err != nil { + conn.logger.Error("failed to retrieve raw connection from upstream socket", zap.Error(err)) + return + } + + var syscallErr error + + err = rawConn.Read(func(fd uintptr) bool { + buf := GetBuffer() + defer PutBuffer(buf) + + for { + n, _, serr := syscall.Recvfrom(int(fd), buf, syscall.MSG_DONTWAIT) + if serr == syscall.EWOULDBLOCK { + return false + } + if serr != nil { + syscallErr = serr + return true + } + if n == 0 { + return true + } + + atomic.AddInt64(conn.lastActivity, 1) + + if _, serr := downstream.WriteTo(buf[:n], conn.downstreamAddr); serr != nil { + syscallErr = serr + return true + } + } + }) + + if err == nil { + err = syscallErr + } + if err != nil { + conn.logger.Debug("failed to read from upstream", zap.Error(err)) + } +} + +func udpGetSocketFromMap(downstream net.PacketConn, downstreamAddr, saddr net.Addr, logger *zap.Logger, + connMap map[string]*udpConnection, socketClosures chan<- string) (*udpConnection, error) { + connKey := "" + if saddr != nil { + connKey = saddr.String() + } + if conn := connMap[saddr.String()]; conn != nil { + atomic.AddInt64(conn.lastActivity, 1) + return conn, nil + } + + targetAddr := Opts.TargetAddr6 + if AddrVersion(downstreamAddr) == 4 { + targetAddr = Opts.TargetAddr4 + } + + logger = logger.With(zap.String("downstreamAddr", downstreamAddr.String()), zap.String("targetAddr", targetAddr)) + dialer := net.Dialer{LocalAddr: saddr} + if saddr != nil { + logger = logger.With(zap.String("clientAddr", saddr.String())) + dialer.Control = DialUpstreamControl(saddr.(*net.UDPAddr).Port) + } + + if Opts.Verbose > 1 { + logger.Debug("new connection") + } + + conn, err := dialer.Dial("udp", targetAddr) + if err != nil { + logger.Debug("failed to connect to upstream", zap.Error(err)) + return nil, err + } + + udpConn := &udpConnection{upstream: conn.(*net.UDPConn), + logger: logger, + lastActivity: new(int64), + clientAddr: saddr.(*net.UDPAddr), + downstreamAddr: downstreamAddr.(*net.UDPAddr)} + + go udpCopyFromUpstream(downstream, udpConn) + go udpCloseAfterInactivity(udpConn, socketClosures) + + connMap[connKey] = udpConn + return udpConn, nil +} + +func UDPListen(listenConfig *net.ListenConfig, logger *zap.Logger, errors chan<- error) { + ctx := context.Background() + ln, err := listenConfig.ListenPacket(ctx, "udp", Opts.ListenAddr) + if err != nil { + logger.Error("failed to bind listener", zap.Error(err)) + errors <- err + return + } + + logger.Info("listening") + + socketClosures := make(chan string, 1024) + connectionMap := make(map[string]*udpConnection) + + buffer := GetBuffer() + defer PutBuffer(buffer) + + for { + n, remoteAddr, err := ln.ReadFrom(buffer) + if err != nil { + logger.Error("failed to read from socket", zap.Error(err)) + continue + } + + if !CheckOriginAllowed(remoteAddr.(*net.UDPAddr).IP) { + logger.Debug("packet origin not in allowed subnets", zap.String("remoteAddr", remoteAddr.String())) + continue + } + + saddr, _, restBytes, err := PROXYReadRemoteAddr(buffer[:n], UDP) + if err != nil { + logger.Debug("failed to parse PROXY header", zap.Error(err), zap.String("remoteAddr", remoteAddr.String())) + continue + } + + for { + doneClosing := false + select { + case mapKey := <-socketClosures: + delete(connectionMap, mapKey) + default: + doneClosing = true + } + if doneClosing { + break + } + } + + conn, err := udpGetSocketFromMap(ln, remoteAddr, saddr, logger, connectionMap, socketClosures) + if err != nil { + continue + } + + _, err = conn.upstream.Write(restBytes) + if err != nil { + conn.logger.Error("failed to write to upstream socket", zap.Error(err)) + } + } +} diff --git a/utils.go b/utils.go new file mode 100644 index 0000000000000000000000000000000000000000..57065603051d102f76185e81068e4ea462516122 --- /dev/null +++ b/utils.go @@ -0,0 +1,93 @@ +// Copyright 2019 Path Network, Inc. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package main + +import ( + "fmt" + "net" + "strings" + "syscall" +) + +type Protocol int + +const ( + TCP Protocol = iota + UDP +) + +func CheckOriginAllowed(remoteIP net.IP) bool { + if len(Opts.AllowedSubnets) == 0 { + return true + } + + for _, ipNet := range Opts.AllowedSubnets { + if ipNet.Contains(remoteIP) { + return true + } + } + return false +} + +func DialUpstreamControl(sport int) func(string, string, syscall.RawConn) error { + return func(network, address string, c syscall.RawConn) error { + var syscallErr error + err := c.Control(func(fd uintptr) { + if Opts.Protocol == "tcp" { + syscallErr = syscall.SetsockoptInt(int(fd), syscall.IPPROTO_TCP, syscall.TCP_SYNCNT, 2) + if syscallErr != nil { + syscallErr = fmt.Errorf("setsockopt(IPPROTO_TCP, TCP_SYNCTNT, 2): %s", syscallErr.Error()) + return + } + } + + syscallErr = syscall.SetsockoptInt(int(fd), syscall.IPPROTO_IP, syscall.IP_TRANSPARENT, 1) + if syscallErr != nil { + syscallErr = fmt.Errorf("setsockopt(IPPROTO_IP, IP_TRANSPARENT, 1): %s", syscallErr.Error()) + return + } + + syscallErr = syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, syscall.SO_REUSEADDR, 1) + if syscallErr != nil { + syscallErr = fmt.Errorf("setsockopt(SOL_SOCKET, SO_REUSEADDR, 1): %s", syscallErr.Error()) + return + } + + if sport == 0 { + ipBindAddressNoPort := 24 + syscall.SetsockoptInt(int(fd), syscall.IPPROTO_IP, ipBindAddressNoPort, 1) + } + + if Opts.Mark != 0 { + syscallErr = syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, syscall.SO_MARK, Opts.Mark) + if syscallErr != nil { + syscallErr = fmt.Errorf("setsockopt(SOL_SOCK, SO_MARK, %d): %s", Opts.Mark, syscallErr.Error()) + return + } + } + + if network == "tcp6" || network == "udp6" { + syscallErr = syscall.SetsockoptInt(int(fd), syscall.IPPROTO_IP, syscall.IPV6_V6ONLY, 0) + if syscallErr != nil { + syscallErr = fmt.Errorf("setsockopt(IPPROTO_IP, IPV6_ONLY, 0): %s", syscallErr.Error()) + return + } + } + }) + + if err != nil { + return err + } + return syscallErr + } +} + +func AddrVersion(addr net.Addr) int { + // poor man's ipv6 check - golang makes it unnecessarily hard + if strings.ContainsRune(addr.String(), '.') { + return 4 + } + return 6 +}