1// +build windows 2 3package winio 4 5import ( 6 "errors" 7 "io" 8 "runtime" 9 "sync" 10 "sync/atomic" 11 "syscall" 12 "time" 13) 14 15//sys cancelIoEx(file syscall.Handle, o *syscall.Overlapped) (err error) = CancelIoEx 16//sys createIoCompletionPort(file syscall.Handle, port syscall.Handle, key uintptr, threadCount uint32) (newport syscall.Handle, err error) = CreateIoCompletionPort 17//sys getQueuedCompletionStatus(port syscall.Handle, bytes *uint32, key *uintptr, o **ioOperation, timeout uint32) (err error) = GetQueuedCompletionStatus 18//sys setFileCompletionNotificationModes(h syscall.Handle, flags uint8) (err error) = SetFileCompletionNotificationModes 19 20type atomicBool int32 21 22func (b *atomicBool) isSet() bool { return atomic.LoadInt32((*int32)(b)) != 0 } 23func (b *atomicBool) setFalse() { atomic.StoreInt32((*int32)(b), 0) } 24func (b *atomicBool) setTrue() { atomic.StoreInt32((*int32)(b), 1) } 25func (b *atomicBool) swap(new bool) bool { 26 var newInt int32 27 if new { 28 newInt = 1 29 } 30 return atomic.SwapInt32((*int32)(b), newInt) == 1 31} 32 33const ( 34 cFILE_SKIP_COMPLETION_PORT_ON_SUCCESS = 1 35 cFILE_SKIP_SET_EVENT_ON_HANDLE = 2 36) 37 38var ( 39 ErrFileClosed = errors.New("file has already been closed") 40 ErrTimeout = &timeoutError{} 41) 42 43type timeoutError struct{} 44 45func (e *timeoutError) Error() string { return "i/o timeout" } 46func (e *timeoutError) Timeout() bool { return true } 47func (e *timeoutError) Temporary() bool { return true } 48 49type timeoutChan chan struct{} 50 51var ioInitOnce sync.Once 52var ioCompletionPort syscall.Handle 53 54// ioResult contains the result of an asynchronous IO operation 55type ioResult struct { 56 bytes uint32 57 err error 58} 59 60// ioOperation represents an outstanding asynchronous Win32 IO 61type ioOperation struct { 62 o syscall.Overlapped 63 ch chan ioResult 64} 65 66func initIo() { 67 h, err := createIoCompletionPort(syscall.InvalidHandle, 0, 0, 0xffffffff) 68 if err != nil { 69 panic(err) 70 } 71 ioCompletionPort = h 72 go ioCompletionProcessor(h) 73} 74 75// win32File implements Reader, Writer, and Closer on a Win32 handle without blocking in a syscall. 76// It takes ownership of this handle and will close it if it is garbage collected. 77type win32File struct { 78 handle syscall.Handle 79 wg sync.WaitGroup 80 wgLock sync.RWMutex 81 closing atomicBool 82 readDeadline deadlineHandler 83 writeDeadline deadlineHandler 84} 85 86type deadlineHandler struct { 87 setLock sync.Mutex 88 channel timeoutChan 89 channelLock sync.RWMutex 90 timer *time.Timer 91 timedout atomicBool 92} 93 94// makeWin32File makes a new win32File from an existing file handle 95func makeWin32File(h syscall.Handle) (*win32File, error) { 96 f := &win32File{handle: h} 97 ioInitOnce.Do(initIo) 98 _, err := createIoCompletionPort(h, ioCompletionPort, 0, 0xffffffff) 99 if err != nil { 100 return nil, err 101 } 102 err = setFileCompletionNotificationModes(h, cFILE_SKIP_COMPLETION_PORT_ON_SUCCESS|cFILE_SKIP_SET_EVENT_ON_HANDLE) 103 if err != nil { 104 return nil, err 105 } 106 f.readDeadline.channel = make(timeoutChan) 107 f.writeDeadline.channel = make(timeoutChan) 108 return f, nil 109} 110 111func MakeOpenFile(h syscall.Handle) (io.ReadWriteCloser, error) { 112 return makeWin32File(h) 113} 114 115// closeHandle closes the resources associated with a Win32 handle 116func (f *win32File) closeHandle() { 117 f.wgLock.Lock() 118 // Atomically set that we are closing, releasing the resources only once. 119 if !f.closing.swap(true) { 120 f.wgLock.Unlock() 121 // cancel all IO and wait for it to complete 122 cancelIoEx(f.handle, nil) 123 f.wg.Wait() 124 // at this point, no new IO can start 125 syscall.Close(f.handle) 126 f.handle = 0 127 } else { 128 f.wgLock.Unlock() 129 } 130} 131 132// Close closes a win32File. 133func (f *win32File) Close() error { 134 f.closeHandle() 135 return nil 136} 137 138// prepareIo prepares for a new IO operation. 139// The caller must call f.wg.Done() when the IO is finished, prior to Close() returning. 140func (f *win32File) prepareIo() (*ioOperation, error) { 141 f.wgLock.RLock() 142 if f.closing.isSet() { 143 f.wgLock.RUnlock() 144 return nil, ErrFileClosed 145 } 146 f.wg.Add(1) 147 f.wgLock.RUnlock() 148 c := &ioOperation{} 149 c.ch = make(chan ioResult) 150 return c, nil 151} 152 153// ioCompletionProcessor processes completed async IOs forever 154func ioCompletionProcessor(h syscall.Handle) { 155 for { 156 var bytes uint32 157 var key uintptr 158 var op *ioOperation 159 err := getQueuedCompletionStatus(h, &bytes, &key, &op, syscall.INFINITE) 160 if op == nil { 161 panic(err) 162 } 163 op.ch <- ioResult{bytes, err} 164 } 165} 166 167// asyncIo processes the return value from ReadFile or WriteFile, blocking until 168// the operation has actually completed. 169func (f *win32File) asyncIo(c *ioOperation, d *deadlineHandler, bytes uint32, err error) (int, error) { 170 if err != syscall.ERROR_IO_PENDING { 171 return int(bytes), err 172 } 173 174 if f.closing.isSet() { 175 cancelIoEx(f.handle, &c.o) 176 } 177 178 var timeout timeoutChan 179 if d != nil { 180 d.channelLock.Lock() 181 timeout = d.channel 182 d.channelLock.Unlock() 183 } 184 185 var r ioResult 186 select { 187 case r = <-c.ch: 188 err = r.err 189 if err == syscall.ERROR_OPERATION_ABORTED { 190 if f.closing.isSet() { 191 err = ErrFileClosed 192 } 193 } 194 case <-timeout: 195 cancelIoEx(f.handle, &c.o) 196 r = <-c.ch 197 err = r.err 198 if err == syscall.ERROR_OPERATION_ABORTED { 199 err = ErrTimeout 200 } 201 } 202 203 // runtime.KeepAlive is needed, as c is passed via native 204 // code to ioCompletionProcessor, c must remain alive 205 // until the channel read is complete. 206 runtime.KeepAlive(c) 207 return int(r.bytes), err 208} 209 210// Read reads from a file handle. 211func (f *win32File) Read(b []byte) (int, error) { 212 c, err := f.prepareIo() 213 if err != nil { 214 return 0, err 215 } 216 defer f.wg.Done() 217 218 if f.readDeadline.timedout.isSet() { 219 return 0, ErrTimeout 220 } 221 222 var bytes uint32 223 err = syscall.ReadFile(f.handle, b, &bytes, &c.o) 224 n, err := f.asyncIo(c, &f.readDeadline, bytes, err) 225 runtime.KeepAlive(b) 226 227 // Handle EOF conditions. 228 if err == nil && n == 0 && len(b) != 0 { 229 return 0, io.EOF 230 } else if err == syscall.ERROR_BROKEN_PIPE { 231 return 0, io.EOF 232 } else { 233 return n, err 234 } 235} 236 237// Write writes to a file handle. 238func (f *win32File) Write(b []byte) (int, error) { 239 c, err := f.prepareIo() 240 if err != nil { 241 return 0, err 242 } 243 defer f.wg.Done() 244 245 if f.writeDeadline.timedout.isSet() { 246 return 0, ErrTimeout 247 } 248 249 var bytes uint32 250 err = syscall.WriteFile(f.handle, b, &bytes, &c.o) 251 n, err := f.asyncIo(c, &f.writeDeadline, bytes, err) 252 runtime.KeepAlive(b) 253 return n, err 254} 255 256func (f *win32File) SetReadDeadline(deadline time.Time) error { 257 return f.readDeadline.set(deadline) 258} 259 260func (f *win32File) SetWriteDeadline(deadline time.Time) error { 261 return f.writeDeadline.set(deadline) 262} 263 264func (f *win32File) Flush() error { 265 return syscall.FlushFileBuffers(f.handle) 266} 267 268func (d *deadlineHandler) set(deadline time.Time) error { 269 d.setLock.Lock() 270 defer d.setLock.Unlock() 271 272 if d.timer != nil { 273 if !d.timer.Stop() { 274 <-d.channel 275 } 276 d.timer = nil 277 } 278 d.timedout.setFalse() 279 280 select { 281 case <-d.channel: 282 d.channelLock.Lock() 283 d.channel = make(chan struct{}) 284 d.channelLock.Unlock() 285 default: 286 } 287 288 if deadline.IsZero() { 289 return nil 290 } 291 292 timeoutIO := func() { 293 d.timedout.setTrue() 294 close(d.channel) 295 } 296 297 now := time.Now() 298 duration := deadline.Sub(now) 299 if deadline.After(now) { 300 // Deadline is in the future, set a timer to wait 301 d.timer = time.AfterFunc(duration, timeoutIO) 302 } else { 303 // Deadline is in the past. Cancel all pending IO now. 304 timeoutIO() 305 } 306 return nil 307} 308