1// Copyright 2013 The Go Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style 3// license that can be found in the LICENSE file. 4 5package poll 6 7import "sync/atomic" 8 9// fdMutex is a specialized synchronization primitive that manages 10// lifetime of an fd and serializes access to Read, Write and Close 11// methods on FD. 12type fdMutex struct { 13 state uint64 14 rsema uint32 15 wsema uint32 16} 17 18// fdMutex.state is organized as follows: 19// 1 bit - whether FD is closed, if set all subsequent lock operations will fail. 20// 1 bit - lock for read operations. 21// 1 bit - lock for write operations. 22// 20 bits - total number of references (read+write+misc). 23// 20 bits - number of outstanding read waiters. 24// 20 bits - number of outstanding write waiters. 25const ( 26 mutexClosed = 1 << 0 27 mutexRLock = 1 << 1 28 mutexWLock = 1 << 2 29 mutexRef = 1 << 3 30 mutexRefMask = (1<<20 - 1) << 3 31 mutexRWait = 1 << 23 32 mutexRMask = (1<<20 - 1) << 23 33 mutexWWait = 1 << 43 34 mutexWMask = (1<<20 - 1) << 43 35) 36 37const overflowMsg = "too many concurrent operations on a single file or socket (max 1048575)" 38 39// Read operations must do rwlock(true)/rwunlock(true). 40// 41// Write operations must do rwlock(false)/rwunlock(false). 42// 43// Misc operations must do incref/decref. 44// Misc operations include functions like setsockopt and setDeadline. 45// They need to use incref/decref to ensure that they operate on the 46// correct fd in presence of a concurrent close call (otherwise fd can 47// be closed under their feet). 48// 49// Close operations must do increfAndClose/decref. 50 51// incref adds a reference to mu. 52// It reports whether mu is available for reading or writing. 53func (mu *fdMutex) incref() bool { 54 for { 55 old := atomic.LoadUint64(&mu.state) 56 if old&mutexClosed != 0 { 57 return false 58 } 59 new := old + mutexRef 60 if new&mutexRefMask == 0 { 61 panic(overflowMsg) 62 } 63 if atomic.CompareAndSwapUint64(&mu.state, old, new) { 64 return true 65 } 66 } 67} 68 69// increfAndClose sets the state of mu to closed. 70// It returns false if the file was already closed. 71func (mu *fdMutex) increfAndClose() bool { 72 for { 73 old := atomic.LoadUint64(&mu.state) 74 if old&mutexClosed != 0 { 75 return false 76 } 77 // Mark as closed and acquire a reference. 78 new := (old | mutexClosed) + mutexRef 79 if new&mutexRefMask == 0 { 80 panic(overflowMsg) 81 } 82 // Remove all read and write waiters. 83 new &^= mutexRMask | mutexWMask 84 if atomic.CompareAndSwapUint64(&mu.state, old, new) { 85 // Wake all read and write waiters, 86 // they will observe closed flag after wakeup. 87 for old&mutexRMask != 0 { 88 old -= mutexRWait 89 runtime_Semrelease(&mu.rsema) 90 } 91 for old&mutexWMask != 0 { 92 old -= mutexWWait 93 runtime_Semrelease(&mu.wsema) 94 } 95 return true 96 } 97 } 98} 99 100// decref removes a reference from mu. 101// It reports whether there is no remaining reference. 102func (mu *fdMutex) decref() bool { 103 for { 104 old := atomic.LoadUint64(&mu.state) 105 if old&mutexRefMask == 0 { 106 panic("inconsistent poll.fdMutex") 107 } 108 new := old - mutexRef 109 if atomic.CompareAndSwapUint64(&mu.state, old, new) { 110 return new&(mutexClosed|mutexRefMask) == mutexClosed 111 } 112 } 113} 114 115// lock adds a reference to mu and locks mu. 116// It reports whether mu is available for reading or writing. 117func (mu *fdMutex) rwlock(read bool) bool { 118 var mutexBit, mutexWait, mutexMask uint64 119 var mutexSema *uint32 120 if read { 121 mutexBit = mutexRLock 122 mutexWait = mutexRWait 123 mutexMask = mutexRMask 124 mutexSema = &mu.rsema 125 } else { 126 mutexBit = mutexWLock 127 mutexWait = mutexWWait 128 mutexMask = mutexWMask 129 mutexSema = &mu.wsema 130 } 131 for { 132 old := atomic.LoadUint64(&mu.state) 133 if old&mutexClosed != 0 { 134 return false 135 } 136 var new uint64 137 if old&mutexBit == 0 { 138 // Lock is free, acquire it. 139 new = (old | mutexBit) + mutexRef 140 if new&mutexRefMask == 0 { 141 panic(overflowMsg) 142 } 143 } else { 144 // Wait for lock. 145 new = old + mutexWait 146 if new&mutexMask == 0 { 147 panic(overflowMsg) 148 } 149 } 150 if atomic.CompareAndSwapUint64(&mu.state, old, new) { 151 if old&mutexBit == 0 { 152 return true 153 } 154 runtime_Semacquire(mutexSema) 155 // The signaller has subtracted mutexWait. 156 } 157 } 158} 159 160// unlock removes a reference from mu and unlocks mu. 161// It reports whether there is no remaining reference. 162func (mu *fdMutex) rwunlock(read bool) bool { 163 var mutexBit, mutexWait, mutexMask uint64 164 var mutexSema *uint32 165 if read { 166 mutexBit = mutexRLock 167 mutexWait = mutexRWait 168 mutexMask = mutexRMask 169 mutexSema = &mu.rsema 170 } else { 171 mutexBit = mutexWLock 172 mutexWait = mutexWWait 173 mutexMask = mutexWMask 174 mutexSema = &mu.wsema 175 } 176 for { 177 old := atomic.LoadUint64(&mu.state) 178 if old&mutexBit == 0 || old&mutexRefMask == 0 { 179 panic("inconsistent poll.fdMutex") 180 } 181 // Drop lock, drop reference and wake read waiter if present. 182 new := (old &^ mutexBit) - mutexRef 183 if old&mutexMask != 0 { 184 new -= mutexWait 185 } 186 if atomic.CompareAndSwapUint64(&mu.state, old, new) { 187 if old&mutexMask != 0 { 188 runtime_Semrelease(mutexSema) 189 } 190 return new&(mutexClosed|mutexRefMask) == mutexClosed 191 } 192 } 193} 194 195// Implemented in runtime package. 196func runtime_Semacquire(sema *uint32) 197func runtime_Semrelease(sema *uint32) 198 199// incref adds a reference to fd. 200// It returns an error when fd cannot be used. 201func (fd *FD) incref() error { 202 if !fd.fdmu.incref() { 203 return errClosing(fd.isFile) 204 } 205 return nil 206} 207 208// decref removes a reference from fd. 209// It also closes fd when the state of fd is set to closed and there 210// is no remaining reference. 211func (fd *FD) decref() error { 212 if fd.fdmu.decref() { 213 return fd.destroy() 214 } 215 return nil 216} 217 218// readLock adds a reference to fd and locks fd for reading. 219// It returns an error when fd cannot be used for reading. 220func (fd *FD) readLock() error { 221 if !fd.fdmu.rwlock(true) { 222 return errClosing(fd.isFile) 223 } 224 return nil 225} 226 227// readUnlock removes a reference from fd and unlocks fd for reading. 228// It also closes fd when the state of fd is set to closed and there 229// is no remaining reference. 230func (fd *FD) readUnlock() { 231 if fd.fdmu.rwunlock(true) { 232 fd.destroy() 233 } 234} 235 236// writeLock adds a reference to fd and locks fd for writing. 237// It returns an error when fd cannot be used for writing. 238func (fd *FD) writeLock() error { 239 if !fd.fdmu.rwlock(false) { 240 return errClosing(fd.isFile) 241 } 242 return nil 243} 244 245// writeUnlock removes a reference from fd and unlocks fd for writing. 246// It also closes fd when the state of fd is set to closed and there 247// is no remaining reference. 248func (fd *FD) writeUnlock() { 249 if fd.fdmu.rwunlock(false) { 250 fd.destroy() 251 } 252} 253