// Copyright 2017 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// +build aix darwin dragonfly freebsd linux nacl netbsd openbsd solaris

package poll

import (
	"io"
	"runtime"
	"syscall"
)

// FD is a file descriptor. The net and os packages use this type as a
// field of a larger type representing a network connection or OS file.
type FD struct {
	// Lock sysfd and serialize access to Read and Write methods.
	fdmu fdMutex

	// System file descriptor. Immutable until Close.
	// destroy resets it to -1 once the descriptor has been closed.
	Sysfd int

	// I/O poller.
	pd pollDesc

	// Writev cache.
	iovecs *[]syscall.Iovec

	// Semaphore signaled when file is closed.
	// Released by destroy and acquired by Close.
	csema uint32

	// Whether this is a streaming descriptor, as opposed to a
	// packet-based descriptor like a UDP socket. Immutable.
	IsStream bool

	// Whether a zero byte read indicates EOF. This is false for a
	// message based socket connection.
	ZeroReadIsEOF bool

	// Whether this is a file rather than a network socket.
	// Set by Init when the network name is "file".
	isFile bool

	// Whether this file has been set to blocking mode.
	// Set by Init for non-pollable descriptors and by SetBlocking.
	isBlocking bool
}

// Init initializes the FD. The Sysfd field should already be set.
// This can be called multiple times on a single FD.
// The net argument is a network name from the net package (e.g., "tcp"),
// or "file".
// Set pollable to true if fd should be managed by runtime netpoll.
// When pollable is false the FD is marked as blocking and the poller
// is not initialized; otherwise the result of initializing the poller
// is returned.
func (fd *FD) Init(net string, pollable bool) error {
	// We don't actually care about the various network types.
	if net == "file" {
		fd.isFile = true
	}
	if !pollable {
		fd.isBlocking = true
		return nil
	}
	return fd.pd.init(fd)
}

// destroy closes the file descriptor. This is called when there are
// no remaining references.
// It returns the error reported by CloseFunc, resets Sysfd to -1 and
// releases csema.
func (fd *FD) destroy() error {
	// Poller may want to unregister fd in readiness notification mechanism,
	// so this must be executed before CloseFunc.
	fd.pd.close()
	err := CloseFunc(fd.Sysfd)
	fd.Sysfd = -1
	runtime_Semrelease(&fd.csema)
	return err
}

// Close closes the FD.
// The underlying file descriptor is closed by the
// destroy method when there are no remaining references.
// It returns errClosing(fd.isFile) when increfAndClose reports false,
// and otherwise the error returned by decref.
func (fd *FD) Close() error {
	if !fd.fdmu.increfAndClose() {
		return errClosing(fd.isFile)
	}

	// Unblock any I/O. Once it all unblocks and returns,
	// so that it cannot be referring to fd.Sysfd anymore,
	// the final decref will close fd.Sysfd. This should happen
	// fairly quickly, since all the I/O is non-blocking, and any
	// attempts to block in the pollDesc will return errClosing(fd.isFile).
	fd.pd.evict()

	// The call to decref will call destroy if there are no other
	// references.
	err := fd.decref()

	// Wait until the descriptor is closed. If this was the only
	// reference, it is already closed. Only wait if the file has
	// not been set to blocking mode, as otherwise any current I/O
	// may be blocking, and that would block the Close.
	if !fd.isBlocking {
		runtime_Semacquire(&fd.csema)
	}

	return err
}

// Shutdown wraps the shutdown network call.
// A reference is held on the FD for the duration of the call.
func (fd *FD) Shutdown(how int) error {
	if err := fd.incref(); err != nil {
		return err
	}
	defer fd.decref()
	return syscall.Shutdown(fd.Sysfd, how)
}

// SetBlocking puts the file into blocking mode.
// It records the mode in isBlocking and then clears the non-blocking
// flag on the descriptor, returning the error from that call.
func (fd *FD) SetBlocking() error {
	if err := fd.incref(); err != nil {
		return err
	}
	defer fd.decref()
	// NOTE(review): this store is only protected by a reference, not by
	// the read or write lock; confirm that concurrent SetBlocking and
	// Close callers cannot race on isBlocking.
	fd.isBlocking = true
	return syscall.SetNonblock(fd.Sysfd, false)
}

// Darwin and FreeBSD can't read or write 2GB+ files at a time,
// even on 64-bit systems.
// The same is true of socket implementations on many systems.
// See golang.org/issue/7812 and golang.org/issue/16266.
// Use 1GB instead of, say, 2GB-1, to keep subsequent reads aligned.
const maxRW = 1 << 30

// Read implements io.Reader.
// For stream descriptors at most maxRW bytes are requested per call.
// On error the returned byte count is zero.
func (fd *FD) Read(p []byte) (int, error) {
	if err := fd.readLock(); err != nil {
		return 0, err
	}
	defer fd.readUnlock()
	if len(p) == 0 {
		// If the caller wanted a zero byte read, return immediately
		// without trying (but after acquiring the readLock).
		// Otherwise syscall.Read returns 0, nil which looks like
		// io.EOF.
		// TODO(bradfitz): make it wait for readability? (Issue 15735)
		return 0, nil
	}
	if err := fd.pd.prepareRead(fd.isFile); err != nil {
		return 0, err
	}
	if fd.IsStream && len(p) > maxRW {
		p = p[:maxRW]
	}
	for {
		n, err := syscall.Read(fd.Sysfd, p)
		if err != nil {
			n = 0
			// The descriptor would block: wait for readiness and
			// retry, unless the wait itself fails.
			if err == syscall.EAGAIN && fd.pd.pollable() {
				if err = fd.pd.waitRead(fd.isFile); err == nil {
					continue
				}
			}

			// On MacOS we can see EINTR here if the user
			// pressed ^Z. See issue #22838.
			if runtime.GOOS == "darwin" && err == syscall.EINTR {
				continue
			}
		}
		err = fd.eofError(n, err)
		return n, err
	}
}

// Pread wraps the pread system call.
// For stream descriptors at most maxRW bytes are requested.
// On error the returned byte count is zero.
func (fd *FD) Pread(p []byte, off int64) (int, error) {
	// Call incref, not readLock, because since pread specifies the
	// offset it is independent from other reads.
	// Similarly, using the poller doesn't make sense for pread.
	if err := fd.incref(); err != nil {
		return 0, err
	}
	if fd.IsStream && len(p) > maxRW {
		p = p[:maxRW]
	}
	n, err := syscall.Pread(fd.Sysfd, p, off)
	if err != nil {
		n = 0
	}
	// The reference is dropped explicitly (not deferred) before the
	// result is passed through eofError.
	fd.decref()
	err = fd.eofError(n, err)
	return n, err
}

// ReadFrom wraps the recvfrom network call.
// It returns the byte count, the source address reported by recvfrom,
// and any error; on error the byte count is zero.
func (fd *FD) ReadFrom(p []byte) (int, syscall.Sockaddr, error) {
	if err := fd.readLock(); err != nil {
		return 0, nil, err
	}
	defer fd.readUnlock()
	if err := fd.pd.prepareRead(fd.isFile); err != nil {
		return 0, nil, err
	}
	for {
		n, sa, err := syscall.Recvfrom(fd.Sysfd, p, 0)
		if err != nil {
			n = 0
			// The descriptor would block: wait for readiness and
			// retry, unless the wait itself fails.
			if err == syscall.EAGAIN && fd.pd.pollable() {
				if err = fd.pd.waitRead(fd.isFile); err == nil {
					continue
				}
			}
		}
		err = fd.eofError(n, err)
		return n, sa, err
	}
}

// ReadMsg wraps the recvmsg network call.
func (fd *FD) ReadMsg(p []byte, oob []byte) (int, int, int, syscall.Sockaddr, error) { if err := fd.readLock(); err != nil { return 0, 0, 0, nil, err } defer fd.readUnlock() if err := fd.pd.prepareRead(fd.isFile); err != nil { return 0, 0, 0, nil, err } for { n, oobn, flags, sa, err := syscall.Recvmsg(fd.Sysfd, p, oob, 0) if err != nil { // TODO(dfc) should n and oobn be set to 0 if err == syscall.EAGAIN && fd.pd.pollable() { if err = fd.pd.waitRead(fd.isFile); err == nil { continue } } } err = fd.eofError(n, err) return n, oobn, flags, sa, err } } // Write implements io.Writer. func (fd *FD) Write(p []byte) (int, error) { if err := fd.writeLock(); err != nil { return 0, err } defer fd.writeUnlock() if err := fd.pd.prepareWrite(fd.isFile); err != nil { return 0, err } var nn int for { max := len(p) if fd.IsStream && max-nn > maxRW { max = nn + maxRW } n, err := syscall.Write(fd.Sysfd, p[nn:max]) if n > 0 { nn += n } if nn == len(p) { return nn, err } if err == syscall.EAGAIN && fd.pd.pollable() { if err = fd.pd.waitWrite(fd.isFile); err == nil { continue } } if err != nil { return nn, err } if n == 0 { return nn, io.ErrUnexpectedEOF } } } // Pwrite wraps the pwrite system call. func (fd *FD) Pwrite(p []byte, off int64) (int, error) { // Call incref, not writeLock, because since pwrite specifies the // offset it is independent from other writes. // Similarly, using the poller doesn't make sense for pwrite. if err := fd.incref(); err != nil { return 0, err } defer fd.decref() var nn int for { max := len(p) if fd.IsStream && max-nn > maxRW { max = nn + maxRW } n, err := syscall.Pwrite(fd.Sysfd, p[nn:max], off+int64(nn)) if n > 0 { nn += n } if nn == len(p) { return nn, err } if err != nil { return nn, err } if n == 0 { return nn, io.ErrUnexpectedEOF } } } // WriteTo wraps the sendto network call. 
func (fd *FD) WriteTo(p []byte, sa syscall.Sockaddr) (int, error) { if err := fd.writeLock(); err != nil { return 0, err } defer fd.writeUnlock() if err := fd.pd.prepareWrite(fd.isFile); err != nil { return 0, err } for { err := syscall.Sendto(fd.Sysfd, p, 0, sa) if err == syscall.EAGAIN && fd.pd.pollable() { if err = fd.pd.waitWrite(fd.isFile); err == nil { continue } } if err != nil { return 0, err } return len(p), nil } } // WriteMsg wraps the sendmsg network call. func (fd *FD) WriteMsg(p []byte, oob []byte, sa syscall.Sockaddr) (int, int, error) { if err := fd.writeLock(); err != nil { return 0, 0, err } defer fd.writeUnlock() if err := fd.pd.prepareWrite(fd.isFile); err != nil { return 0, 0, err } for { n, err := syscall.SendmsgN(fd.Sysfd, p, oob, sa, 0) if err == syscall.EAGAIN && fd.pd.pollable() { if err = fd.pd.waitWrite(fd.isFile); err == nil { continue } } if err != nil { return n, 0, err } return n, len(oob), err } } // Accept wraps the accept network call. func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) { if err := fd.readLock(); err != nil { return -1, nil, "", err } defer fd.readUnlock() if err := fd.pd.prepareRead(fd.isFile); err != nil { return -1, nil, "", err } for { s, rsa, errcall, err := accept(fd.Sysfd) if err == nil { return s, rsa, "", err } switch err { case syscall.EAGAIN: if fd.pd.pollable() { if err = fd.pd.waitRead(fd.isFile); err == nil { continue } } case syscall.ECONNABORTED: // This means that a socket on the listen // queue was closed before we Accept()ed it; // it's a silly error, so try again. continue } return -1, nil, errcall, err } } // Seek wraps syscall.Seek. func (fd *FD) Seek(offset int64, whence int) (int64, error) { if err := fd.incref(); err != nil { return 0, err } defer fd.decref() return syscall.Seek(fd.Sysfd, offset, whence) } // ReadDirent wraps syscall.ReadDirent. // We treat this like an ordinary system call rather than a call // that tries to fill the buffer. 
func (fd *FD) ReadDirent(buf []byte) (int, error) { if err := fd.incref(); err != nil { return 0, err } defer fd.decref() for { n, err := syscall.ReadDirent(fd.Sysfd, buf) if err != nil { n = 0 if err == syscall.EAGAIN && fd.pd.pollable() { if err = fd.pd.waitRead(fd.isFile); err == nil { continue } } } // Do not call eofError; caller does not expect to see io.EOF. return n, err } } // Fchdir wraps syscall.Fchdir. func (fd *FD) Fchdir() error { if err := fd.incref(); err != nil { return err } defer fd.decref() return syscall.Fchdir(fd.Sysfd) } // Fstat wraps syscall.Fstat func (fd *FD) Fstat(s *syscall.Stat_t) error { if err := fd.incref(); err != nil { return err } defer fd.decref() return syscall.Fstat(fd.Sysfd, s) } // On Unix variants only, expose the IO event for the net code. // WaitWrite waits until data can be read from fd. func (fd *FD) WaitWrite() error { return fd.pd.waitWrite(fd.isFile) } // WriteOnce is for testing only. It makes a single write call. func (fd *FD) WriteOnce(p []byte) (int, error) { if err := fd.writeLock(); err != nil { return 0, err } defer fd.writeUnlock() return syscall.Write(fd.Sysfd, p) } // RawControl invokes the user-defined function f for a non-IO // operation. func (fd *FD) RawControl(f func(uintptr)) error { if err := fd.incref(); err != nil { return err } defer fd.decref() f(uintptr(fd.Sysfd)) return nil } // RawRead invokes the user-defined function f for a read operation. func (fd *FD) RawRead(f func(uintptr) bool) error { if err := fd.readLock(); err != nil { return err } defer fd.readUnlock() if err := fd.pd.prepareRead(fd.isFile); err != nil { return err } for { if f(uintptr(fd.Sysfd)) { return nil } if err := fd.pd.waitRead(fd.isFile); err != nil { return err } } } // RawWrite invokes the user-defined function f for a write operation. 
func (fd *FD) RawWrite(f func(uintptr) bool) error { if err := fd.writeLock(); err != nil { return err } defer fd.writeUnlock() if err := fd.pd.prepareWrite(fd.isFile); err != nil { return err } for { if f(uintptr(fd.Sysfd)) { return nil } if err := fd.pd.waitWrite(fd.isFile); err != nil { return err } } }