diff --git a/src/internal/poll/fd_poll_nacl.go b/src/internal/poll/fd_poll_nacl.go index 69c728d084365e72048bb8d6babc1d5c6938a895..8cf54ef6d50b174447a6958e1ec07f6fc0a116e1 100644 --- a/src/internal/poll/fd_poll_nacl.go +++ b/src/internal/poll/fd_poll_nacl.go @@ -85,3 +85,7 @@ func setDeadlineImpl(fd *FD, t time.Time, mode int) error { fd.decref() return nil } + +func PollDescriptor() uintptr { + return ^uintptr(0) +} diff --git a/src/internal/poll/fd_poll_runtime.go b/src/internal/poll/fd_poll_runtime.go index 5040d6a1f79466f35c7a3814ba042df654067d67..032a0f71bbed1096c496cc5cac0bdc387199106a 100644 --- a/src/internal/poll/fd_poll_runtime.go +++ b/src/internal/poll/fd_poll_runtime.go @@ -17,6 +17,7 @@ import ( func runtimeNano() int64 func runtime_pollServerInit() +func runtime_pollServerDescriptor() uintptr func runtime_pollOpen(fd uintptr) (uintptr, int) func runtime_pollClose(ctx uintptr) func runtime_pollWait(ctx uintptr, mode int) int @@ -146,3 +147,9 @@ func setDeadlineImpl(fd *FD, t time.Time, mode int) error { fd.decref() return nil } + +// PollDescriptor returns the descriptor being used by the poller, +// or ^uintptr(0) if there isn't one. This is only used for testing. +func PollDescriptor() uintptr { + return runtime_pollServerDescriptor() +} diff --git a/src/internal/poll/fd_unix.go b/src/internal/poll/fd_unix.go index 0a7b72fe91c96952de16aa75435d453c73a1c016..0cf3d933aa9d16a73756997a22b4da9126b84979 100644 --- a/src/internal/poll/fd_unix.go +++ b/src/internal/poll/fd_unix.go @@ -365,7 +365,19 @@ func (fd *FD) ReadDirent(buf []byte) (int, error) { return 0, err } defer fd.decref() - return syscall.ReadDirent(fd.Sysfd, buf) + for { + n, err := syscall.ReadDirent(fd.Sysfd, buf) + if err != nil { + n = 0 + if err == syscall.EAGAIN { + if err = fd.pd.waitRead(); err == nil { + continue + } + } + } + // Do not call eofError; caller does not expect to see io.EOF. + return n, err + } } // Fchdir wraps syscall.Fchdir. diff --git a/src/internal/poll/fd_windows.go b/src/internal/poll/fd_windows.go index db3f1f423b84fc26c46764d6c88d9eccb99b5e34..16e70e6093a68452c6ef17e8776eb35c683e9daf 100644 --- a/src/internal/poll/fd_windows.go +++ b/src/internal/poll/fd_windows.go @@ -523,13 +523,15 @@ func (fd *FD) Pread(b []byte, off int64) (int, error) { var done uint32 e = syscall.ReadFile(fd.Sysfd, b, &done, &o) if e != nil { + done = 0 if e == syscall.ERROR_HANDLE_EOF { - // end of file - return 0, nil + e = io.EOF } - return 0, e } - return int(done), nil + if len(b) != 0 { + e = fd.eofError(int(done), e) + } + return int(done), e } func (fd *FD) RecvFrom(buf []byte) (int, syscall.Sockaddr, error) { diff --git a/src/os/dir_unix.go b/src/os/dir_unix.go index 03d949af1a2c93e78542d158911fe999d71f5a84..3424688e8cfeab91ee38e25a5df8d72fdffa307b 100644 --- a/src/os/dir_unix.go +++ b/src/os/dir_unix.go @@ -8,6 +8,7 @@ package os import ( "io" + "runtime" "syscall" ) @@ -63,9 +64,10 @@ func (f *File) readdirnames(n int) (names []string, err error) { if d.bufp >= d.nbuf { d.bufp = 0 var errno error - d.nbuf, errno = fixCount(syscall.ReadDirent(f.fd, d.buf)) + d.nbuf, errno = f.pfd.ReadDirent(d.buf) + runtime.KeepAlive(f) if errno != nil { - return names, NewSyscallError("readdirent", errno) + return names, wrapSyscallError("readdirent", errno) } if d.nbuf <= 0 { break // EOF diff --git a/src/os/dir_windows.go b/src/os/dir_windows.go index 76024fc1e3fd92f9fb91cce0ca7b42e133aa2dab..2a012a8a1236d8f5101e77d5efa6f530e08edfbd 100644 --- a/src/os/dir_windows.go +++ b/src/os/dir_windows.go @@ -6,6 +6,7 @@ package os import ( "io" + "runtime" "syscall" ) @@ -16,7 +17,7 @@ func (file *File) readdir(n int) (fi []FileInfo, err error) { if !file.isdir() { return nil, &PathError{"Readdir", file.name, syscall.ENOTDIR} } - if !file.dirinfo.isempty && file.fd == syscall.InvalidHandle { + if !file.dirinfo.isempty && file.pfd.Sysfd == syscall.InvalidHandle { return nil, syscall.EINVAL } wantAll := n <= 0 @@ -29,7 +30,8 @@ func (file *File) readdir(n int) (fi []FileInfo, err error) { d := &file.dirinfo.data for n != 0 && !file.dirinfo.isempty { if file.dirinfo.needdata { - e := syscall.FindNextFile(file.fd, d) + e := file.pfd.FindNextFile(d) + runtime.KeepAlive(file) if e != nil { if e == syscall.ERROR_NO_MORE_FILES { break diff --git a/src/os/error_posix.go b/src/os/error_posix.go new file mode 100644 index 0000000000000000000000000000000000000000..2049e448e8fe86631c1b1b03bbdc3090ba41338a --- /dev/null +++ b/src/os/error_posix.go @@ -0,0 +1,18 @@ +// Copyright 2017 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// +build darwin dragonfly freebsd linux nacl netbsd openbsd solaris windows + +package os + +import "syscall" + +// wrapSyscallError takes an error and a syscall name. If the error is +// a syscall.Errno, it wraps it in a os.SyscallError using the syscall name. +func wrapSyscallError(name string, err error) error { + if _, ok := err.(syscall.Errno); ok { + err = NewSyscallError(name, err) + } + return err +} diff --git a/src/os/exec/exec_test.go b/src/os/exec/exec_test.go index 34337450a08c7d78b9efe9faae81ef1087bf73ee..1a159e261f2bc745c5adb1b065bdeeb7fae6e9b3 100644 --- a/src/os/exec/exec_test.go +++ b/src/os/exec/exec_test.go @@ -12,6 +12,7 @@ import ( "bytes" "context" "fmt" + "internal/poll" "internal/testenv" "io" "io/ioutil" @@ -369,12 +370,16 @@ var testedAlreadyLeaked = false // basefds returns the number of expected file descriptors // to be present in a process at start. +// stdin, stdout, stderr, epoll/kqueue func basefds() uintptr { return os.Stderr.Fd() + 1 } func closeUnexpectedFds(t *testing.T, m string) { for fd := basefds(); fd <= 101; fd++ { + if fd == poll.PollDescriptor() { + continue + } err := os.NewFile(fd, "").Close() if err == nil { t.Logf("%s: Something already leaked - closed fd %d", m, fd) @@ -732,6 +737,9 @@ func TestHelperProcess(*testing.T) { // Now verify that there are no other open fds. var files []*os.File for wantfd := basefds() + 1; wantfd <= 100; wantfd++ { + if wantfd == poll.PollDescriptor() { + continue + } f, err := os.Open(os.Args[0]) if err != nil { fmt.Printf("error opening file with expected fd %d: %v", wantfd, err) diff --git a/src/os/export_windows_test.go b/src/os/export_windows_test.go index 3bb2d2015f77b68546061427e3de02811c7f4646..d08bd747cef3bfd677f80b372c5c560b4a056dc8 100644 --- a/src/os/export_windows_test.go +++ b/src/os/export_windows_test.go @@ -7,7 +7,6 @@ package os // Export for testing. var ( - FixLongPath = fixLongPath - NewConsoleFile = newConsoleFile - ReadConsoleFunc = &readConsole + FixLongPath = fixLongPath + NewConsoleFile = newConsoleFile ) diff --git a/src/os/file.go b/src/os/file.go index d45a00b12396a64cdf50fca59540392538cc5abf..047217881f8943faad059cc79613f1c9a6455457 100644 --- a/src/os/file.go +++ b/src/os/file.go @@ -99,11 +99,12 @@ func (f *File) Read(b []byte) (n int, err error) { return 0, err } n, e := f.read(b) - if n == 0 && len(b) > 0 && e == nil { - return 0, io.EOF - } if e != nil { - err = &PathError{"read", f.name, e} + if e == io.EOF { + err = e + } else { + err = &PathError{"read", f.name, e} + } } return n, err } @@ -118,11 +119,12 @@ func (f *File) ReadAt(b []byte, off int64) (n int, err error) { } for len(b) > 0 { m, e := f.pread(b, off) - if m == 0 && e == nil { - return n, io.EOF - } if e != nil { - err = &PathError{"read", f.name, e} + if e == io.EOF { + err = e + } else { + err = &PathError{"read", f.name, e} + } break } n += m @@ -226,19 +228,6 @@ func Chdir(dir string) error { return nil } -// Chdir changes the current working directory to the file, -// which must be a directory. -// If there is an error, it will be of type *PathError. -func (f *File) Chdir() error { - if err := f.checkValid("chdir"); err != nil { - return err - } - if e := syscall.Fchdir(f.fd); e != nil { - return &PathError{"chdir", f.name, e} - } - return nil -} - // Open opens the named file for reading. If successful, methods on // the returned file can be used for reading; the associated file // descriptor has mode O_RDONLY. @@ -275,15 +264,3 @@ func fixCount(n int, err error) (int, error) { } return n, err } - -// checkValid checks whether f is valid for use. -// If not, it returns an appropriate error, perhaps incorporating the operation name op. -func (f *File) checkValid(op string) error { - if f == nil { - return ErrInvalid - } - if f.fd == badFd { - return &PathError{op, f.name, ErrClosed} - } - return nil -} diff --git a/src/os/file_plan9.go b/src/os/file_plan9.go index 5276a7ec541299c1b2abe677417e7639c0a33e55..f1f244a8d4e11e210ca329dacfe8598ea336fddc 100644 --- a/src/os/file_plan9.go +++ b/src/os/file_plan9.go @@ -244,14 +244,22 @@ func (f *File) Sync() error { // read reads up to len(b) bytes from the File. // It returns the number of bytes read and an error, if any. func (f *File) read(b []byte) (n int, err error) { - return fixCount(syscall.Read(f.fd, b)) + n, e := fixCount(syscall.Read(f.fd, b)) + if n == 0 && len(b) > 0 && e == nil { + return 0, io.EOF + } + return n, e } // pread reads len(b) bytes from the File starting at byte offset off. // It returns the number of bytes read and the error, if any. // EOF is signaled by a zero count with err set to nil. func (f *File) pread(b []byte, off int64) (n int, err error) { - return fixCount(syscall.Pread(f.fd, b, off)) + n, e := fixCount(syscall.Pread(f.fd, b, off)) + if n == 0 && len(b) > 0 && e == nil { + return 0, io.EOF + } + return n, e } // write writes len(b) bytes to the File. @@ -472,3 +480,28 @@ func (f *File) Chown(uid, gid int) error { func TempDir() string { return "/tmp" } + +// Chdir changes the current working directory to the file, +// which must be a directory. +// If there is an error, it will be of type *PathError. +func (f *File) Chdir() error { + if err := f.checkValid("chdir"); err != nil { + return err + } + if e := syscall.Fchdir(f.fd); e != nil { + return &PathError{"chdir", f.name, e} + } + return nil +} + +// checkValid checks whether f is valid for use. +// If not, it returns an appropriate error, perhaps incorporating the operation name op. +func (f *File) checkValid(op string) error { + if f == nil { + return ErrInvalid + } + if f.fd == badFd { + return &PathError{op, f.name, ErrClosed} + } + return nil +} diff --git a/src/os/file_posix.go b/src/os/file_posix.go index d817f34b1d010879d333d876a070e4805d7ad7c5..e38668684c80d4df12ba06129aecd18a6b745043 100644 --- a/src/os/file_posix.go +++ b/src/os/file_posix.go @@ -7,6 +7,7 @@ package os import ( + "runtime" "syscall" "time" ) @@ -60,9 +61,10 @@ func (f *File) Chmod(mode FileMode) error { if err := f.checkValid("chmod"); err != nil { return err } - if e := syscall.Fchmod(f.fd, syscallMode(mode)); e != nil { + if e := f.pfd.Fchmod(syscallMode(mode)); e != nil { return &PathError{"chmod", f.name, e} } + runtime.KeepAlive(f) return nil } @@ -92,9 +94,10 @@ func (f *File) Chown(uid, gid int) error { if err := f.checkValid("chown"); err != nil { return err } - if e := syscall.Fchown(f.fd, uid, gid); e != nil { + if e := f.pfd.Fchown(uid, gid); e != nil { return &PathError{"chown", f.name, e} } + runtime.KeepAlive(f) return nil } @@ -105,9 +108,10 @@ func (f *File) Truncate(size int64) error { if err := f.checkValid("truncate"); err != nil { return err } - if e := syscall.Ftruncate(f.fd, size); e != nil { + if e := f.pfd.Ftruncate(size); e != nil { return &PathError{"truncate", f.name, e} } + runtime.KeepAlive(f) return nil } @@ -118,9 +122,10 @@ func (f *File) Sync() error { if err := f.checkValid("sync"); err != nil { return err } - if e := syscall.Fsync(f.fd); e != nil { + if e := f.pfd.Fsync(); e != nil { return &PathError{"sync", f.name, e} } + runtime.KeepAlive(f) return nil } @@ -139,3 +144,29 @@ func Chtimes(name string, atime time.Time, mtime time.Time) error { } return nil } + +// Chdir changes the current working directory to the file, +// which must be a directory. +// If there is an error, it will be of type *PathError. +func (f *File) Chdir() error { + if err := f.checkValid("chdir"); err != nil { + return err + } + if e := f.pfd.Fchdir(); e != nil { + return &PathError{"chdir", f.name, e} + } + runtime.KeepAlive(f) + return nil +} + +// checkValid checks whether f is valid for use. +// If not, it returns an appropriate error, perhaps incorporating the operation name op. +func (f *File) checkValid(op string) error { + if f == nil { + return ErrInvalid + } + if f.pfd.Sysfd == badFd { + return &PathError{op, f.name, ErrClosed} + } + return nil +} diff --git a/src/os/file_unix.go b/src/os/file_unix.go index 1cff93a4d29c93ddce6ea2ccd1eb1d0cb73fc966..6e00f483930922b1e6c69986f39936f2e35104e8 100644 --- a/src/os/file_unix.go +++ b/src/os/file_unix.go @@ -7,6 +7,7 @@ package os import ( + "internal/poll" "runtime" "syscall" ) @@ -33,9 +34,10 @@ func rename(oldname, newname string) error { // can overwrite this data, which could cause the finalizer // to close the wrong file descriptor. type file struct { - fd int - name string - dirinfo *dirInfo // nil unless directory being read + pfd poll.FD + name string + dirinfo *dirInfo // nil unless directory being read + nonblock bool // whether we set nonblocking mode } // Fd returns the integer Unix file descriptor referencing the open file. @@ -44,16 +46,64 @@ func (f *File) Fd() uintptr { if f == nil { return ^(uintptr(0)) } - return uintptr(f.fd) + + // If we put the file descriptor into nonblocking mode, + // then set it to blocking mode before we return it, + // because historically we have always returned a descriptor + // opened in blocking mode. The File will continue to work, + // but any blocking operation will tie up a thread. + if f.nonblock { + syscall.SetNonblock(f.pfd.Sysfd, false) + } + + return uintptr(f.pfd.Sysfd) } // NewFile returns a new File with the given file descriptor and name. func NewFile(fd uintptr, name string) *File { + return newFile(fd, name, false) +} + +// newFile is like NewFile, but if pollable is true it tries to add the +// file to the runtime poller. +func newFile(fd uintptr, name string, pollable bool) *File { fdi := int(fd) if fdi < 0 { return nil } - f := &File{&file{fd: fdi, name: name}} + f := &File{&file{ + pfd: poll.FD{ + Sysfd: fdi, + IsStream: true, + ZeroReadIsEOF: true, + }, + name: name, + }} + + // Don't try to use kqueue with regular files on FreeBSD. + // It crashes the system unpredictably while running all.bash. + // Issue 19093. + if runtime.GOOS == "freebsd" { + pollable = false + } + + if pollable { + if err := f.pfd.Init(); err != nil { + // An error here indicates a failure to register + // with the netpoll system. That can happen for + // a file descriptor that is not supported by + // epoll/kqueue; for example, disk files on + // GNU/Linux systems. We assume that any real error + // will show up in later I/O. + } else { + // We successfully registered with netpoll, so put + // the file into nonblocking mode. + if err := syscall.SetNonblock(fdi, true); err == nil { + f.nonblock = true + } + } + } + runtime.SetFinalizer(f.file, (*file).close) return f } @@ -69,7 +119,7 @@ type dirInfo struct { // output or standard error. See the SIGPIPE docs in os/signal, and // issue 11845. func epipecheck(file *File, e error) { - if e == syscall.EPIPE && (file.fd == 1 || file.fd == 2) { + if e == syscall.EPIPE && (file.pfd.Sysfd == 1 || file.pfd.Sysfd == 2) { sigpipe() } } @@ -120,7 +170,7 @@ func OpenFile(name string, flag int, perm FileMode) (*File, error) { syscall.CloseOnExec(r) } - return NewFile(uintptr(r), name), nil + return newFile(uintptr(r), name, true), nil } // Close closes the File, rendering it unusable for I/O. @@ -133,83 +183,51 @@ func (f *File) Close() error { } func (file *file) close() error { - if file == nil || file.fd == badFd { + if file == nil || file.pfd.Sysfd == badFd { return syscall.EINVAL } var err error - if e := syscall.Close(file.fd); e != nil { + if e := file.pfd.Close(); e != nil { err = &PathError{"close", file.name, e} } - file.fd = -1 // so it can't be closed again + file.pfd.Sysfd = badFd // so it can't be closed again // no need for a finalizer anymore runtime.SetFinalizer(file, nil) return err } -// Darwin and FreeBSD can't read or write 2GB+ at a time, -// even on 64-bit systems. See golang.org/issue/7812. -// Use 1GB instead of, say, 2GB-1, to keep subsequent -// reads aligned. -const ( - needsMaxRW = runtime.GOOS == "darwin" || runtime.GOOS == "freebsd" - maxRW = 1 << 30 -) - // read reads up to len(b) bytes from the File. // It returns the number of bytes read and an error, if any. func (f *File) read(b []byte) (n int, err error) { - if needsMaxRW && len(b) > maxRW { - b = b[:maxRW] - } - return fixCount(syscall.Read(f.fd, b)) + n, err = f.pfd.Read(b) + runtime.KeepAlive(f) + return n, err } // pread reads len(b) bytes from the File starting at byte offset off. // It returns the number of bytes read and the error, if any. // EOF is signaled by a zero count with err set to nil. func (f *File) pread(b []byte, off int64) (n int, err error) { - if needsMaxRW && len(b) > maxRW { - b = b[:maxRW] - } - return fixCount(syscall.Pread(f.fd, b, off)) + n, err = f.pfd.Pread(b, off) + runtime.KeepAlive(f) + return n, err } // write writes len(b) bytes to the File. // It returns the number of bytes written and an error, if any. func (f *File) write(b []byte) (n int, err error) { - for { - bcap := b - if needsMaxRW && len(bcap) > maxRW { - bcap = bcap[:maxRW] - } - m, err := fixCount(syscall.Write(f.fd, bcap)) - n += m - - // If the syscall wrote some data but not all (short write) - // or it returned EINTR, then assume it stopped early for - // reasons that are uninteresting to the caller, and try again. - if 0 < m && m < len(bcap) || err == syscall.EINTR { - b = b[m:] - continue - } - - if needsMaxRW && len(bcap) != len(b) && err == nil { - b = b[m:] - continue - } - - return n, err - } + n, err = f.pfd.Write(b) + runtime.KeepAlive(f) + return n, err } // pwrite writes len(b) bytes to the File starting at byte offset off. // It returns the number of bytes written and an error, if any. func (f *File) pwrite(b []byte, off int64) (n int, err error) { - if needsMaxRW && len(b) > maxRW { - b = b[:maxRW] - } - return fixCount(syscall.Pwrite(f.fd, b, off)) + n, err = f.pfd.Pwrite(b, off) + runtime.KeepAlive(f) + return n, err } // seek sets the offset for the next Read or Write on file to offset, interpreted @@ -217,7 +235,9 @@ func (f *File) pwrite(b []byte, off int64) (n int, err error) { // relative to the current offset, and 2 means relative to the end. // It returns the new offset and an error, if any. func (f *File) seek(offset int64, whence int) (ret int64, err error) { - return syscall.Seek(f.fd, offset, whence) + ret, err = f.pfd.Seek(offset, whence) + runtime.KeepAlive(f) + return ret, err } // Truncate changes the size of the named file. diff --git a/src/os/file_windows.go b/src/os/file_windows.go index 97be324faedd2a30ac34d3c3fe055537a5bd44fb..b7d4275d17e259f9b90d1e3a4997f9e8f3c17669 100644 --- a/src/os/file_windows.go +++ b/src/os/file_windows.go @@ -5,13 +5,11 @@ package os import ( + "internal/poll" "internal/syscall/windows" - "io" "runtime" - "sync" "syscall" "unicode/utf16" - "unicode/utf8" "unsafe" ) @@ -20,17 +18,9 @@ import ( // can overwrite this data, which could cause the finalizer // to close the wrong file descriptor. type file struct { - fd syscall.Handle + pfd poll.FD name string - dirinfo *dirInfo // nil unless directory being read - l sync.Mutex // used to implement windows pread/pwrite - - // only for console io - isConsole bool - lastbits []byte // first few bytes of the last incomplete rune in last write - readuint16 []uint16 // buffer to hold uint16s obtained with ReadConsole - readbyte []byte // buffer to hold decoding of readuint16 from utf16 to utf8 - readbyteOffset int // readbyte[readOffset:] is yet to be consumed with file.Read + dirinfo *dirInfo // nil unless directory being read } // Fd returns the Windows handle referencing the open file. @@ -39,22 +29,39 @@ func (file *File) Fd() uintptr { if file == nil { return uintptr(syscall.InvalidHandle) } - return uintptr(file.fd) + return uintptr(file.pfd.Sysfd) } // newFile returns a new File with the given file handle and name. // Unlike NewFile, it does not check that h is syscall.InvalidHandle. -func newFile(h syscall.Handle, name string) *File { - f := &File{&file{fd: h, name: name}} +func newFile(h syscall.Handle, name string, kind string) *File { + if kind == "file" { + var m uint32 + if syscall.GetConsoleMode(h, &m) == nil { + kind = "console" + } + } + + f := &File{&file{ + pfd: poll.FD{ + Sysfd: h, + IsStream: true, + ZeroReadIsEOF: true, + }, + name: name, + }} runtime.SetFinalizer(f.file, (*file).close) + + // Ignore initialization errors. + // Assume any problems will show up in later I/O. + f.pfd.Init(kind) + return f } // newConsoleFile creates new File that will be used as console. func newConsoleFile(h syscall.Handle, name string) *File { - f := newFile(h, name) - f.isConsole = true - return f + return newFile(h, name, "console") } // NewFile returns a new File with the given file descriptor and name. @@ -63,11 +70,7 @@ func NewFile(fd uintptr, name string) *File { if h == syscall.InvalidHandle { return nil } - var m uint32 - if syscall.GetConsoleMode(h, &m) == nil { - return newConsoleFile(h, name) - } - return newFile(h, name) + return newFile(h, name, "file") } // Auxiliary information if the File describes a directory @@ -90,7 +93,7 @@ func openFile(name string, flag int, perm FileMode) (file *File, err error) { if e != nil { return nil, e } - return NewFile(uintptr(r), name), nil + return newFile(r, name, "file"), nil } func openDir(name string) (file *File, err error) { @@ -137,7 +140,7 @@ func openDir(name string) (file *File, err error) { return nil, e } } - f := newFile(r, name) + f := newFile(r, name, "dir") f.dirinfo = d return f, nil } @@ -176,220 +179,55 @@ func (file *File) Close() error { } func (file *file) close() error { - if file == nil { + if file == nil || file.pfd.Sysfd == badFd { return syscall.EINVAL } if file.isdir() && file.dirinfo.isempty { // "special" empty directories return nil } - if file.fd == syscall.InvalidHandle { - return syscall.EINVAL - } - var e error - if file.isdir() { - e = syscall.FindClose(file.fd) - } else { - e = syscall.CloseHandle(file.fd) - } var err error - if e != nil { + if e := file.pfd.Close(); e != nil { err = &PathError{"close", file.name, e} } - file.fd = badFd // so it can't be closed again + file.pfd.Sysfd = badFd // so it can't be closed again // no need for a finalizer anymore runtime.SetFinalizer(file, nil) return err } -var readConsole = syscall.ReadConsole // changed for testing - -// readConsole reads utf16 characters from console File, -// encodes them into utf8 and stores them in buffer b. -// It returns the number of utf8 bytes read and an error, if any. -func (f *File) readConsole(b []byte) (n int, err error) { - if len(b) == 0 { - return 0, nil - } - - if f.readuint16 == nil { - // Note: syscall.ReadConsole fails for very large buffers. - // The limit is somewhere around (but not exactly) 16384. - // Stay well below. - f.readuint16 = make([]uint16, 0, 10000) - f.readbyte = make([]byte, 0, 4*cap(f.readuint16)) - } - - for f.readbyteOffset >= len(f.readbyte) { - n := cap(f.readuint16) - len(f.readuint16) - if n > len(b) { - n = len(b) - } - var nw uint32 - err := readConsole(f.fd, &f.readuint16[:len(f.readuint16)+1][len(f.readuint16)], uint32(n), &nw, nil) - if err != nil { - return 0, err - } - uint16s := f.readuint16[:len(f.readuint16)+int(nw)] - f.readuint16 = f.readuint16[:0] - buf := f.readbyte[:0] - for i := 0; i < len(uint16s); i++ { - r := rune(uint16s[i]) - if utf16.IsSurrogate(r) { - if i+1 == len(uint16s) { - if nw > 0 { - // Save half surrogate pair for next time. - f.readuint16 = f.readuint16[:1] - f.readuint16[0] = uint16(r) - break - } - r = utf8.RuneError - } else { - r = utf16.DecodeRune(r, rune(uint16s[i+1])) - if r != utf8.RuneError { - i++ - } - } - } - n := utf8.EncodeRune(buf[len(buf):cap(buf)], r) - buf = buf[:len(buf)+n] - } - f.readbyte = buf - f.readbyteOffset = 0 - if nw == 0 { - break - } - } - - src := f.readbyte[f.readbyteOffset:] - var i int - for i = 0; i < len(src) && i < len(b); i++ { - x := src[i] - if x == 0x1A { // Ctrl-Z - if i == 0 { - f.readbyteOffset++ - } - break - } - b[i] = x - } - f.readbyteOffset += i - return i, nil -} - // read reads up to len(b) bytes from the File. // It returns the number of bytes read and an error, if any. func (f *File) read(b []byte) (n int, err error) { - f.l.Lock() - defer f.l.Unlock() - if f.isConsole { - return f.readConsole(b) - } - return fixCount(syscall.Read(f.fd, b)) + n, err = f.pfd.Read(b) + runtime.KeepAlive(f) + return n, err } // pread reads len(b) bytes from the File starting at byte offset off. // It returns the number of bytes read and the error, if any. // EOF is signaled by a zero count with err set to 0. func (f *File) pread(b []byte, off int64) (n int, err error) { - f.l.Lock() - defer f.l.Unlock() - curoffset, e := syscall.Seek(f.fd, 0, io.SeekCurrent) - if e != nil { - return 0, e - } - defer syscall.Seek(f.fd, curoffset, io.SeekStart) - o := syscall.Overlapped{ - OffsetHigh: uint32(off >> 32), - Offset: uint32(off), - } - var done uint32 - e = syscall.ReadFile(f.fd, b, &done, &o) - if e != nil { - if e == syscall.ERROR_HANDLE_EOF { - // end of file - return 0, nil - } - return 0, e - } - return int(done), nil -} - -// writeConsole writes len(b) bytes to the console File. -// It returns the number of bytes written and an error, if any. -func (f *File) writeConsole(b []byte) (n int, err error) { - n = len(b) - runes := make([]rune, 0, 256) - if len(f.lastbits) > 0 { - b = append(f.lastbits, b...) - f.lastbits = nil - - } - for len(b) >= utf8.UTFMax || utf8.FullRune(b) { - r, l := utf8.DecodeRune(b) - runes = append(runes, r) - b = b[l:] - } - if len(b) > 0 { - f.lastbits = make([]byte, len(b)) - copy(f.lastbits, b) - } - // syscall.WriteConsole seems to fail, if given large buffer. - // So limit the buffer to 16000 characters. This number was - // discovered by experimenting with syscall.WriteConsole. - const maxWrite = 16000 - for len(runes) > 0 { - m := len(runes) - if m > maxWrite { - m = maxWrite - } - chunk := runes[:m] - runes = runes[m:] - uint16s := utf16.Encode(chunk) - for len(uint16s) > 0 { - var written uint32 - err = syscall.WriteConsole(f.fd, &uint16s[0], uint32(len(uint16s)), &written, nil) - if err != nil { - return 0, nil - } - uint16s = uint16s[written:] - } - } - return n, nil + n, err = f.pfd.Pread(b, off) + runtime.KeepAlive(f) + return n, err } // write writes len(b) bytes to the File. // It returns the number of bytes written and an error, if any. func (f *File) write(b []byte) (n int, err error) { - f.l.Lock() - defer f.l.Unlock() - if f.isConsole { - return f.writeConsole(b) - } - return fixCount(syscall.Write(f.fd, b)) + n, err = f.pfd.Write(b) + runtime.KeepAlive(f) + return n, err } // pwrite writes len(b) bytes to the File starting at byte offset off. // It returns the number of bytes written and an error, if any. func (f *File) pwrite(b []byte, off int64) (n int, err error) { - f.l.Lock() - defer f.l.Unlock() - curoffset, e := syscall.Seek(f.fd, 0, io.SeekCurrent) - if e != nil { - return 0, e - } - defer syscall.Seek(f.fd, curoffset, io.SeekStart) - o := syscall.Overlapped{ - OffsetHigh: uint32(off >> 32), - Offset: uint32(off), - } - var done uint32 - e = syscall.WriteFile(f.fd, b, &done, &o) - if e != nil { - return 0, e - } - return int(done), nil + n, err = f.pfd.Pwrite(b, off) + runtime.KeepAlive(f) + return n, err } // seek sets the offset for the next Read or Write on file to offset, interpreted @@ -397,9 +235,9 @@ func (f *File) pwrite(b []byte, off int64) (n int, err error) { // relative to the current offset, and 2 means relative to the end. // It returns the new offset and an error, if any. func (f *File) seek(offset int64, whence int) (ret int64, err error) { - f.l.Lock() - defer f.l.Unlock() - return syscall.Seek(f.fd, offset, whence) + ret, err = f.pfd.Seek(offset, whence) + runtime.KeepAlive(f) + return ret, err } // Truncate changes the size of the named file. @@ -480,7 +318,7 @@ func Pipe() (r *File, w *File, err error) { syscall.CloseOnExec(p[1]) syscall.ForkLock.RUnlock() - return NewFile(uintptr(p[0]), "|0"), NewFile(uintptr(p[1]), "|1"), nil + return newFile(p[0], "|0", "file"), newFile(p[1], "|1", "file"), nil } // TempDir returns the default directory to use for temporary files. diff --git a/src/os/os_test.go b/src/os/os_test.go index 9d74070e7f30d2685a412b1211632326e50cb76d..5657693ffddb9201df61319a3cb68ca6be7361dc 100644 --- a/src/os/os_test.go +++ b/src/os/os_test.go @@ -17,6 +17,7 @@ import ( "path/filepath" "reflect" "runtime" + "runtime/debug" "sort" "strings" "sync" @@ -112,7 +113,7 @@ func size(name string, t *testing.T) int64 { break } if e != nil { - t.Fatal("read failed:", err) + t.Fatal("read failed:", e) } } return int64(len) @@ -1940,3 +1941,75 @@ func TestRemoveAllRace(t *testing.T) { close(hold) // let workers race to remove root wg.Wait() } + +// Test that reading from a pipe doesn't use up a thread. +func TestPipeThreads(t *testing.T) { + switch runtime.GOOS { + case "freebsd": + t.Skip("skipping on FreeBSD; issue 19093") + case "windows": + t.Skip("skipping on Windows; issue 19098") + } + + threads := 100 + + // OpenBSD has a low default for max number of files. + if runtime.GOOS == "openbsd" { + threads = 50 + } + + r := make([]*File, threads) + w := make([]*File, threads) + for i := 0; i < threads; i++ { + rp, wp, err := Pipe() + if err != nil { + for j := 0; j < i; j++ { + r[j].Close() + w[j].Close() + } + t.Fatal(err) + } + r[i] = rp + w[i] = wp + } + + defer debug.SetMaxThreads(debug.SetMaxThreads(threads / 2)) + + var wg sync.WaitGroup + wg.Add(threads) + c := make(chan bool, threads) + for i := 0; i < threads; i++ { + go func(i int) { + defer wg.Done() + var b [1]byte + c <- true + if _, err := r[i].Read(b[:]); err != nil { + t.Error(err) + } + }(i) + } + + for i := 0; i < threads; i++ { + <-c + } + + // If we are still alive, it means that the 100 goroutines did + // not require 100 threads. + + for i := 0; i < threads; i++ { + if _, err := w[i].Write([]byte{0}); err != nil { + t.Error(err) + } + if err := w[i].Close(); err != nil { + t.Error(err) + } + } + + wg.Wait() + + for i := 0; i < threads; i++ { + if err := r[i].Close(); err != nil { + t.Error(err) + } + } +} diff --git a/src/os/os_windows_test.go b/src/os/os_windows_test.go index 54ba99bf8849ead0a01e7efcc633ba397f7259a9..761931e9e98b906e6481a894914f45ba855ac648 100644 --- a/src/os/os_windows_test.go +++ b/src/os/os_windows_test.go @@ -6,6 +6,7 @@ package os_test import ( "fmt" + "internal/poll" "internal/syscall/windows" "internal/testenv" "io" @@ -643,9 +644,9 @@ func TestStatSymlinkLoop(t *testing.T) { } func TestReadStdin(t *testing.T) { - old := *os.ReadConsoleFunc + old := poll.ReadConsole defer func() { - *os.ReadConsoleFunc = old + poll.ReadConsole = old }() testConsole := os.NewConsoleFile(syscall.Stdin, "test") @@ -664,7 +665,7 @@ func TestReadStdin(t *testing.T) { for _, s := range tests { t.Run(fmt.Sprintf("c%d/r%d/%s", consoleSize, readSize, s), func(t *testing.T) { s16 := utf16.Encode([]rune(s)) - *os.ReadConsoleFunc = func(h syscall.Handle, buf *uint16, toread uint32, read *uint32, inputControl *byte) error { + poll.ReadConsole = func(h syscall.Handle, buf *uint16, toread uint32, read *uint32, inputControl *byte) error { if inputControl != nil { t.Fatalf("inputControl not nil") } diff --git a/src/os/pipe_bsd.go b/src/os/pipe_bsd.go index 3b81ed20f1b72df2d0de968721a92909c187c3e0..58cafcc999dd0b0e9e4048375b583396bf8df6d5 100644 --- a/src/os/pipe_bsd.go +++ b/src/os/pipe_bsd.go @@ -24,5 +24,5 @@ func Pipe() (r *File, w *File, err error) { syscall.CloseOnExec(p[1]) syscall.ForkLock.RUnlock() - return NewFile(uintptr(p[0]), "|0"), NewFile(uintptr(p[1]), "|1"), nil + return newFile(uintptr(p[0]), "|0", true), newFile(uintptr(p[1]), "|1", true), nil } diff --git a/src/os/pipe_linux.go b/src/os/pipe_linux.go index 9bafad84f9fd2d1f0781fd0c110969e546e1a6b7..96f2ce900cbd91108f26a4038669af198d818b8f 100644 --- a/src/os/pipe_linux.go +++ b/src/os/pipe_linux.go @@ -29,5 +29,5 @@ func Pipe() (r *File, w *File, err error) { return nil, nil, NewSyscallError("pipe2", e) } - return NewFile(uintptr(p[0]), "|0"), NewFile(uintptr(p[1]), "|1"), nil + return newFile(uintptr(p[0]), "|0", true), newFile(uintptr(p[1]), "|1", true), nil } diff --git a/src/os/stat_unix.go b/src/os/stat_unix.go index 1733d3f1327d0235e3bd974ab7b5b4d906dc46a9..1dd5714f7915400f56c78418af62a9a9252e0200 100644 --- a/src/os/stat_unix.go +++ b/src/os/stat_unix.go @@ -17,7 +17,7 @@ func (f *File) Stat() (FileInfo, error) { return nil, ErrInvalid } var fs fileStat - err := syscall.Fstat(f.fd, &fs.sys) + err := f.pfd.Fstat(&fs.sys) if err != nil { return nil, &PathError{"stat", f.name, err} } diff --git a/src/os/stat_windows.go b/src/os/stat_windows.go index c8379381b1870fb5bcc891d743516c6622114ae7..0b8132f5c86c93d1ea5b8d32ee503794bcbe1a8b 100644 --- a/src/os/stat_windows.go +++ b/src/os/stat_windows.go @@ -16,7 +16,7 @@ func (file *File) Stat() (FileInfo, error) { if file == nil { return nil, ErrInvalid } - if file == nil || file.fd < 0 { + if file == nil || file.pfd.Sysfd < 0 { return nil, syscall.EINVAL } if file.isdir() { @@ -27,7 +27,7 @@ func (file *File) Stat() (FileInfo, error) { return &devNullStat, nil } - ft, err := syscall.GetFileType(file.fd) + ft, err := file.pfd.GetFileType() if err != nil { return nil, &PathError{"GetFileType", file.name, err} } @@ -37,7 +37,7 @@ func (file *File) Stat() (FileInfo, error) { } var d syscall.ByHandleFileInformation - err = syscall.GetFileInformationByHandle(file.fd, &d) + err = file.pfd.GetFileInformationByHandle(&d) if err != nil { return nil, &PathError{"GetFileInformationByHandle", file.name, err} } diff --git a/src/runtime/netpoll.go b/src/runtime/netpoll.go index ac8d071045828674ed8e603562136b1fef8eb54f..56fb286c3c18dbfa88a6b4e3f5e5c0b1f6138510 100644 --- a/src/runtime/netpoll.go +++ b/src/runtime/netpoll.go @@ -77,8 +77,9 @@ type pollCache struct { } var ( - netpollInited uint32 - pollcache pollCache + netpollInited uint32 + pollcache pollCache + netpollWaiters uint32 ) //go:linkname poll_runtime_pollServerInit internal/poll.runtime_pollServerInit @@ -91,6 +92,14 @@ func netpollinited() bool { return atomic.Load(&netpollInited) != 0 } +//go:linkname poll_runtime_pollServerDescriptor internal/poll.runtime_pollServerDescriptor + +// poll_runtime_pollServerDescriptor returns the descriptor being used, +// or ^uintptr(0) if the system does not use a poll descriptor. +func poll_runtime_pollServerDescriptor() uintptr { + return netpolldescriptor() +} + //go:linkname poll_runtime_pollOpen internal/poll.runtime_pollOpen func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) { pd := pollcache.alloc() @@ -244,10 +253,10 @@ func poll_runtime_pollSetDeadline(pd *pollDesc, d int64, mode int) { } unlock(&pd.lock) if rg != nil { - goready(rg, 3) + netpollgoready(rg, 3) } if wg != nil { - goready(wg, 3) + netpollgoready(wg, 3) } } @@ -273,10 +282,10 @@ func poll_runtime_pollUnblock(pd *pollDesc) { } unlock(&pd.lock) if rg != nil { - goready(rg, 3) + netpollgoready(rg, 3) } if wg != nil { - goready(wg, 3) + netpollgoready(wg, 3) } } @@ -312,7 +321,19 @@ func netpollcheckerr(pd *pollDesc, mode int32) int { } func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool { - return atomic.Casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp))) + r := atomic.Casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp))) + if r { + // Bump the count of goroutines waiting for the poller. + // The scheduler uses this to decide whether to block + // waiting for the poller if there is nothing else to do. + atomic.Xadd(&netpollWaiters, 1) + } + return r +} + +func netpollgoready(gp *g, traceskip int) { + atomic.Xadd(&netpollWaiters, -1) + goready(gp, traceskip+1) } // returns true if IO is ready, or false if timedout or closed @@ -410,10 +431,10 @@ func netpolldeadlineimpl(pd *pollDesc, seq uintptr, read, write bool) { } unlock(&pd.lock) if rg != nil { - goready(rg, 0) + netpollgoready(rg, 0) } if wg != nil { - goready(wg, 0) + netpollgoready(wg, 0) } } diff --git a/src/runtime/netpoll_epoll.go b/src/runtime/netpoll_epoll.go index e06eff83be1e2744cb9252e882ddbc0342e7de7e..63f943bc6a13d57957bb382be86c3add6a4614ac 100644 --- a/src/runtime/netpoll_epoll.go +++ b/src/runtime/netpoll_epoll.go @@ -36,6 +36,10 @@ func netpollinit() { throw("netpollinit: failed to create descriptor") } +func netpolldescriptor() uintptr { + return uintptr(epfd) +} + func netpollopen(fd uintptr, pd *pollDesc) int32 { var ev epollevent ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET diff --git a/src/runtime/netpoll_kqueue.go b/src/runtime/netpoll_kqueue.go index 337377a95b09d8c31e268b6bf0ae9fb3d345d115..5adf19ca0994034ed9a3978313b8329e07926a63 100644 --- a/src/runtime/netpoll_kqueue.go +++ b/src/runtime/netpoll_kqueue.go @@ -29,6 +29,10 @@ func netpollinit() { closeonexec(kq) } +func netpolldescriptor() uintptr { + return uintptr(kq) +} + func netpollopen(fd uintptr, pd *pollDesc) int32 { // Arm both EVFILT_READ and EVFILT_WRITE in edge-triggered mode (EV_CLEAR) // for the whole fd lifetime. The notifications are automatically unregistered diff --git a/src/runtime/netpoll_nacl.go b/src/runtime/netpoll_nacl.go index 5cbc30032141482703da4d653c5a353eed69ec30..dc5a55ec8469822a9c51d8a3d466f616a041cd48 100644 --- a/src/runtime/netpoll_nacl.go +++ b/src/runtime/netpoll_nacl.go @@ -10,6 +10,10 @@ package runtime func netpollinit() { } +func netpolldescriptor() uintptr { + return ^uintptr(0) +} + func netpollopen(fd uintptr, pd *pollDesc) int32 { return 0 } diff --git a/src/runtime/netpoll_solaris.go b/src/runtime/netpoll_solaris.go index 53b2aacdb57746d75ad5db866c58bd94d84f4389..a19bd16fd2099fff4de282af3c62420d89628d7a 100644 --- a/src/runtime/netpoll_solaris.go +++ b/src/runtime/netpoll_solaris.go @@ -121,6 +121,10 @@ func netpollinit() { throw("netpollinit: failed to create port") } +func netpolldescriptor() uintptr { + return uintptr(portfd) +} + func netpollopen(fd uintptr, pd *pollDesc) int32 { lock(&pd.lock) // We don't register for any specific type of events yet, that's diff --git a/src/runtime/netpoll_stub.go b/src/runtime/netpoll_stub.go index 09f64ad9b5b4d920f623b2728e8157824da3f109..a4d6b4608ac63916fd7286dc86cf47c1d20dd466 100644 --- a/src/runtime/netpoll_stub.go +++ b/src/runtime/netpoll_stub.go @@ -6,6 +6,8 @@ package runtime +var netpollWaiters uint32 + // Polls for ready network connections. // Returns list of goroutines that become runnable. func netpoll(block bool) (gp *g) { diff --git a/src/runtime/netpoll_windows.go b/src/runtime/netpoll_windows.go index 32c120c4c30e1c8ed8acc2539acafd75848606e9..d714d0ac6e5fd9c7bbd062d425f153b8cfbea676 100644 --- a/src/runtime/netpoll_windows.go +++ b/src/runtime/netpoll_windows.go @@ -41,6 +41,10 @@ func netpollinit() { } } +func netpolldescriptor() uintptr { + return iocphandle +} + func netpollopen(fd uintptr, pd *pollDesc) int32 { if stdcall4(_CreateIoCompletionPort, fd, iocphandle, 0, 0) == 0 { return -int32(getlasterror()) diff --git a/src/runtime/proc.go b/src/runtime/proc.go index 5fc7d2539097daae725c82d13699bf952d9a32a7..6562eaa8a0898ec1b0d49b6837f2d5ecabea9757 100644 --- a/src/runtime/proc.go +++ b/src/runtime/proc.go @@ -2060,7 +2060,7 @@ stop: } // poll network - if netpollinited() && atomic.Xchg64(&sched.lastpoll, 0) != 0 { + if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Xchg64(&sched.lastpoll, 0) != 0 { if _g_.m.p != 0 { throw("findrunnable: netpoll with p") } @@ -2101,7 +2101,7 @@ func pollWork() bool { if !runqempty(p) { return true } - if netpollinited() && sched.lastpoll != 0 { + if netpollinited() && atomic.Load(&netpollWaiters) > 0 && sched.lastpoll != 0 { if gp := netpoll(false); gp != nil { injectglist(gp) return true diff --git a/src/runtime/trace/trace_stack_test.go b/src/runtime/trace/trace_stack_test.go index d6a3858b913376d42b91349c87e4bcd0debc27cc..fed6bad3a09d00b9c46f48f0a2e1305c40fafcb9 100644 --- a/src/runtime/trace/trace_stack_test.go +++ b/src/runtime/trace/trace_stack_test.go @@ -240,6 +240,7 @@ func TestTraceSymbolize(t *testing.T) { {trace.EvGoSysCall, []frame{ {"syscall.read", 0}, {"syscall.Read", 0}, + {"internal/poll.(*FD).Read", 0}, {"os.(*File).read", 0}, {"os.(*File).Read", 0}, {"runtime/trace_test.TestTraceSymbolize.func11", 102},