// Copyright 2013 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package poll

import "sync/atomic"

// fdMutex is a specialized synchronization primitive that manages
// lifetime of an fd and serializes access to Read, Write and Close
// methods on FD.
type fdMutex struct {
	state uint64 // packed flags and counters (layout below); only touched via sync/atomic
	rsema uint32 // semaphore that rwlock(true) callers block on while the read lock is held
	wsema uint32 // semaphore that rwlock(false) callers block on while the write lock is held
}

// fdMutex.state is organized as follows:
// 1 bit - whether FD is closed, if set all subsequent lock operations will fail (bit 0).
// 1 bit - lock for read operations (bit 1).
// 1 bit - lock for write operations (bit 2).
// 20 bits - total number of references (read+write+misc) (bits 3-22).
// 20 bits - number of outstanding read waiters (bits 23-42).
// 20 bits - number of outstanding write waiters (bits 43-62).
//
// For each counter, the *Ref/*Wait constant is the value added or subtracted
// to change the counter by one, and the *Mask constant selects the whole
// 20-bit field. A field that reads as zero right after an increment means the
// counter overflowed; the methods below panic in that case.
const (
	mutexClosed  = 1 << 0              // closed flag
	mutexRLock   = 1 << 1              // read lock held
	mutexWLock   = 1 << 2              // write lock held
	mutexRef     = 1 << 3              // one reference
	mutexRefMask = (1<<20 - 1) << 3    // reference counter field
	mutexRWait   = 1 << 23             // one read waiter
	mutexRMask   = (1<<20 - 1) << 23   // read waiter counter field
	mutexWWait   = 1 << 43             // one write waiter
	mutexWMask   = (1<<20 - 1) << 43   // write waiter counter field
)

// Read operations must do rwlock(true)/rwunlock(true).
//
// Write operations must do rwlock(false)/rwunlock(false).
//
// Misc operations must do incref/decref.
// Misc operations include functions like setsockopt and setDeadline.
// They need to use incref/decref to ensure that they operate on the
// correct fd in presence of a concurrent close call (otherwise fd can
// be closed under their feet).
//
// Close operations must do increfAndClose/decref.

// incref adds a reference to mu.
// It reports whether the reference was taken: it returns false, leaving
// the state untouched, if mu is already marked closed.
51func (mu *fdMutex) incref() bool { 52 for { 53 old := atomic.LoadUint64(&mu.state) 54 if old&mutexClosed != 0 { 55 return false 56 } 57 new := old + mutexRef 58 if new&mutexRefMask == 0 { 59 panic("inconsistent poll.fdMutex") 60 } 61 if atomic.CompareAndSwapUint64(&mu.state, old, new) { 62 return true 63 } 64 } 65} 66 67// increfAndClose sets the state of mu to closed. 68// It returns false if the file was already closed. 69func (mu *fdMutex) increfAndClose() bool { 70 for { 71 old := atomic.LoadUint64(&mu.state) 72 if old&mutexClosed != 0 { 73 return false 74 } 75 // Mark as closed and acquire a reference. 76 new := (old | mutexClosed) + mutexRef 77 if new&mutexRefMask == 0 { 78 panic("inconsistent poll.fdMutex") 79 } 80 // Remove all read and write waiters. 81 new &^= mutexRMask | mutexWMask 82 if atomic.CompareAndSwapUint64(&mu.state, old, new) { 83 // Wake all read and write waiters, 84 // they will observe closed flag after wakeup. 85 for old&mutexRMask != 0 { 86 old -= mutexRWait 87 runtime_Semrelease(&mu.rsema) 88 } 89 for old&mutexWMask != 0 { 90 old -= mutexWWait 91 runtime_Semrelease(&mu.wsema) 92 } 93 return true 94 } 95 } 96} 97 98// decref removes a reference from mu. 99// It reports whether there is no remaining reference. 100func (mu *fdMutex) decref() bool { 101 for { 102 old := atomic.LoadUint64(&mu.state) 103 if old&mutexRefMask == 0 { 104 panic("inconsistent poll.fdMutex") 105 } 106 new := old - mutexRef 107 if atomic.CompareAndSwapUint64(&mu.state, old, new) { 108 return new&(mutexClosed|mutexRefMask) == mutexClosed 109 } 110 } 111} 112 113// lock adds a reference to mu and locks mu. 114// It reports whether mu is available for reading or writing. 
115func (mu *fdMutex) rwlock(read bool) bool { 116 var mutexBit, mutexWait, mutexMask uint64 117 var mutexSema *uint32 118 if read { 119 mutexBit = mutexRLock 120 mutexWait = mutexRWait 121 mutexMask = mutexRMask 122 mutexSema = &mu.rsema 123 } else { 124 mutexBit = mutexWLock 125 mutexWait = mutexWWait 126 mutexMask = mutexWMask 127 mutexSema = &mu.wsema 128 } 129 for { 130 old := atomic.LoadUint64(&mu.state) 131 if old&mutexClosed != 0 { 132 return false 133 } 134 var new uint64 135 if old&mutexBit == 0 { 136 // Lock is free, acquire it. 137 new = (old | mutexBit) + mutexRef 138 if new&mutexRefMask == 0 { 139 panic("inconsistent poll.fdMutex") 140 } 141 } else { 142 // Wait for lock. 143 new = old + mutexWait 144 if new&mutexMask == 0 { 145 panic("inconsistent poll.fdMutex") 146 } 147 } 148 if atomic.CompareAndSwapUint64(&mu.state, old, new) { 149 if old&mutexBit == 0 { 150 return true 151 } 152 runtime_Semacquire(mutexSema) 153 // The signaller has subtracted mutexWait. 154 } 155 } 156} 157 158// unlock removes a reference from mu and unlocks mu. 159// It reports whether there is no remaining reference. 160func (mu *fdMutex) rwunlock(read bool) bool { 161 var mutexBit, mutexWait, mutexMask uint64 162 var mutexSema *uint32 163 if read { 164 mutexBit = mutexRLock 165 mutexWait = mutexRWait 166 mutexMask = mutexRMask 167 mutexSema = &mu.rsema 168 } else { 169 mutexBit = mutexWLock 170 mutexWait = mutexWWait 171 mutexMask = mutexWMask 172 mutexSema = &mu.wsema 173 } 174 for { 175 old := atomic.LoadUint64(&mu.state) 176 if old&mutexBit == 0 || old&mutexRefMask == 0 { 177 panic("inconsistent poll.fdMutex") 178 } 179 // Drop lock, drop reference and wake read waiter if present. 
180 new := (old &^ mutexBit) - mutexRef 181 if old&mutexMask != 0 { 182 new -= mutexWait 183 } 184 if atomic.CompareAndSwapUint64(&mu.state, old, new) { 185 if old&mutexMask != 0 { 186 runtime_Semrelease(mutexSema) 187 } 188 return new&(mutexClosed|mutexRefMask) == mutexClosed 189 } 190 } 191} 192 193// Implemented in runtime package. 194func runtime_Semacquire(sema *uint32) 195func runtime_Semrelease(sema *uint32) 196 197// incref adds a reference to fd. 198// It returns an error when fd cannot be used. 199func (fd *FD) incref() error { 200 if !fd.fdmu.incref() { 201 return errClosing(fd.isFile) 202 } 203 return nil 204} 205 206// decref removes a reference from fd. 207// It also closes fd when the state of fd is set to closed and there 208// is no remaining reference. 209func (fd *FD) decref() error { 210 if fd.fdmu.decref() { 211 return fd.destroy() 212 } 213 return nil 214} 215 216// readLock adds a reference to fd and locks fd for reading. 217// It returns an error when fd cannot be used for reading. 218func (fd *FD) readLock() error { 219 if !fd.fdmu.rwlock(true) { 220 return errClosing(fd.isFile) 221 } 222 return nil 223} 224 225// readUnlock removes a reference from fd and unlocks fd for reading. 226// It also closes fd when the state of fd is set to closed and there 227// is no remaining reference. 228func (fd *FD) readUnlock() { 229 if fd.fdmu.rwunlock(true) { 230 fd.destroy() 231 } 232} 233 234// writeLock adds a reference to fd and locks fd for writing. 235// It returns an error when fd cannot be used for writing. 236func (fd *FD) writeLock() error { 237 if !fd.fdmu.rwlock(false) { 238 return errClosing(fd.isFile) 239 } 240 return nil 241} 242 243// writeUnlock removes a reference from fd and unlocks fd for writing. 244// It also closes fd when the state of fd is set to closed and there 245// is no remaining reference. 246func (fd *FD) writeUnlock() { 247 if fd.fdmu.rwunlock(false) { 248 fd.destroy() 249 } 250} 251