1// Copyright 2013 The Go Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5package poll
6
7import "sync/atomic"
8
// fdMutex is a specialized synchronization primitive that manages the
// lifetime of an fd and serializes access to the Read, Write and Close
// methods on FD.
type fdMutex struct {
	// state packs the closed flag, the read and write lock bits, the
	// reference count and both waiter counts into a single word that is
	// only ever read and updated with 64-bit atomic operations.
	// It is kept as the first field: on 32-bit platforms sync/atomic only
	// guarantees 64-bit alignment for the first word of an allocated struct.
	state uint64
	// rsema and wsema are the semaphores that blocked readers and blocked
	// writers, respectively, sleep on.
	rsema, wsema uint32
}
17
// Layout of fdMutex.state, from the least significant bit upward:
//
//	 1 bit  - whether FD is closed; if set, all subsequent lock operations fail.
//	 1 bit  - lock for read operations.
//	 1 bit  - lock for write operations.
//	20 bits - total number of references (read+write+misc).
//	20 bits - number of outstanding read waiters.
//	20 bits - number of outstanding write waiters.
//
// For each 20-bit counter there is a "unit" constant (the counter's lowest
// bit, added or subtracted to change the count by one) and a mask constant
// selecting every bit of that counter.
const (
	mutexClosed = 1 << iota // FD has been closed
	mutexRLock              // read lock is held
	mutexWLock              // write lock is held

	mutexRef     = 1 << 3        // one reference
	mutexRefMask = 0xFFFFF << 3  // reference counter, bits 3-22
	mutexRWait   = 1 << 23       // one read waiter
	mutexRMask   = 0xFFFFF << 23 // read waiter counter, bits 23-42
	mutexWWait   = 1 << 43       // one write waiter
	mutexWMask   = 0xFFFFF << 43 // write waiter counter, bits 43-62
)

// overflowMsg is the panic message used when one of the 20-bit counters in
// fdMutex.state would wrap around; 1048575 is 1<<20 - 1, the largest value
// such a counter can hold.
const overflowMsg = "too many concurrent operations on a single file or socket (max 1048575)"
38
// Read operations must do rwlock(true)/rwunlock(true).
//
// Write operations must do rwlock(false)/rwunlock(false).
//
// Misc operations must do incref/decref.
// Misc operations include functions like setsockopt and setDeadline.
// They need to use incref/decref to ensure that they operate on the
// correct fd in the presence of a concurrent close call (otherwise the fd
// can be closed under their feet).
//
// Close operations must do increfAndClose/decref.
50
51// incref adds a reference to mu.
52// It reports whether mu is available for reading or writing.
53func (mu *fdMutex) incref() bool {
54	for {
55		old := atomic.LoadUint64(&mu.state)
56		if old&mutexClosed != 0 {
57			return false
58		}
59		new := old + mutexRef
60		if new&mutexRefMask == 0 {
61			panic(overflowMsg)
62		}
63		if atomic.CompareAndSwapUint64(&mu.state, old, new) {
64			return true
65		}
66	}
67}
68
69// increfAndClose sets the state of mu to closed.
70// It returns false if the file was already closed.
71func (mu *fdMutex) increfAndClose() bool {
72	for {
73		old := atomic.LoadUint64(&mu.state)
74		if old&mutexClosed != 0 {
75			return false
76		}
77		// Mark as closed and acquire a reference.
78		new := (old | mutexClosed) + mutexRef
79		if new&mutexRefMask == 0 {
80			panic(overflowMsg)
81		}
82		// Remove all read and write waiters.
83		new &^= mutexRMask | mutexWMask
84		if atomic.CompareAndSwapUint64(&mu.state, old, new) {
85			// Wake all read and write waiters,
86			// they will observe closed flag after wakeup.
87			for old&mutexRMask != 0 {
88				old -= mutexRWait
89				runtime_Semrelease(&mu.rsema)
90			}
91			for old&mutexWMask != 0 {
92				old -= mutexWWait
93				runtime_Semrelease(&mu.wsema)
94			}
95			return true
96		}
97	}
98}
99
100// decref removes a reference from mu.
101// It reports whether there is no remaining reference.
102func (mu *fdMutex) decref() bool {
103	for {
104		old := atomic.LoadUint64(&mu.state)
105		if old&mutexRefMask == 0 {
106			panic("inconsistent poll.fdMutex")
107		}
108		new := old - mutexRef
109		if atomic.CompareAndSwapUint64(&mu.state, old, new) {
110			return new&(mutexClosed|mutexRefMask) == mutexClosed
111		}
112	}
113}
114
115// lock adds a reference to mu and locks mu.
116// It reports whether mu is available for reading or writing.
117func (mu *fdMutex) rwlock(read bool) bool {
118	var mutexBit, mutexWait, mutexMask uint64
119	var mutexSema *uint32
120	if read {
121		mutexBit = mutexRLock
122		mutexWait = mutexRWait
123		mutexMask = mutexRMask
124		mutexSema = &mu.rsema
125	} else {
126		mutexBit = mutexWLock
127		mutexWait = mutexWWait
128		mutexMask = mutexWMask
129		mutexSema = &mu.wsema
130	}
131	for {
132		old := atomic.LoadUint64(&mu.state)
133		if old&mutexClosed != 0 {
134			return false
135		}
136		var new uint64
137		if old&mutexBit == 0 {
138			// Lock is free, acquire it.
139			new = (old | mutexBit) + mutexRef
140			if new&mutexRefMask == 0 {
141				panic(overflowMsg)
142			}
143		} else {
144			// Wait for lock.
145			new = old + mutexWait
146			if new&mutexMask == 0 {
147				panic(overflowMsg)
148			}
149		}
150		if atomic.CompareAndSwapUint64(&mu.state, old, new) {
151			if old&mutexBit == 0 {
152				return true
153			}
154			runtime_Semacquire(mutexSema)
155			// The signaller has subtracted mutexWait.
156		}
157	}
158}
159
160// unlock removes a reference from mu and unlocks mu.
161// It reports whether there is no remaining reference.
162func (mu *fdMutex) rwunlock(read bool) bool {
163	var mutexBit, mutexWait, mutexMask uint64
164	var mutexSema *uint32
165	if read {
166		mutexBit = mutexRLock
167		mutexWait = mutexRWait
168		mutexMask = mutexRMask
169		mutexSema = &mu.rsema
170	} else {
171		mutexBit = mutexWLock
172		mutexWait = mutexWWait
173		mutexMask = mutexWMask
174		mutexSema = &mu.wsema
175	}
176	for {
177		old := atomic.LoadUint64(&mu.state)
178		if old&mutexBit == 0 || old&mutexRefMask == 0 {
179			panic("inconsistent poll.fdMutex")
180		}
181		// Drop lock, drop reference and wake read waiter if present.
182		new := (old &^ mutexBit) - mutexRef
183		if old&mutexMask != 0 {
184			new -= mutexWait
185		}
186		if atomic.CompareAndSwapUint64(&mu.state, old, new) {
187			if old&mutexMask != 0 {
188				runtime_Semrelease(mutexSema)
189			}
190			return new&(mutexClosed|mutexRefMask) == mutexClosed
191		}
192	}
193}
194
// Semaphore primitives. These declarations have no body here; they are
// implemented in the runtime package.
//
// In this file, rwlock calls runtime_Semacquire to sleep after registering
// itself as a waiter, and rwunlock and increfAndClose call
// runtime_Semrelease once for each waiter they want to wake.
// NOTE(review): the exact blocking and wake-up semantics are defined by the
// runtime implementation and are not visible from this file.
func runtime_Semacquire(sema *uint32)
func runtime_Semrelease(sema *uint32)
198
199// incref adds a reference to fd.
200// It returns an error when fd cannot be used.
201func (fd *FD) incref() error {
202	if !fd.fdmu.incref() {
203		return errClosing(fd.isFile)
204	}
205	return nil
206}
207
208// decref removes a reference from fd.
209// It also closes fd when the state of fd is set to closed and there
210// is no remaining reference.
211func (fd *FD) decref() error {
212	if fd.fdmu.decref() {
213		return fd.destroy()
214	}
215	return nil
216}
217
218// readLock adds a reference to fd and locks fd for reading.
219// It returns an error when fd cannot be used for reading.
220func (fd *FD) readLock() error {
221	if !fd.fdmu.rwlock(true) {
222		return errClosing(fd.isFile)
223	}
224	return nil
225}
226
227// readUnlock removes a reference from fd and unlocks fd for reading.
228// It also closes fd when the state of fd is set to closed and there
229// is no remaining reference.
230func (fd *FD) readUnlock() {
231	if fd.fdmu.rwunlock(true) {
232		fd.destroy()
233	}
234}
235
236// writeLock adds a reference to fd and locks fd for writing.
237// It returns an error when fd cannot be used for writing.
238func (fd *FD) writeLock() error {
239	if !fd.fdmu.rwlock(false) {
240		return errClosing(fd.isFile)
241	}
242	return nil
243}
244
245// writeUnlock removes a reference from fd and unlocks fd for writing.
246// It also closes fd when the state of fd is set to closed and there
247// is no remaining reference.
248func (fd *FD) writeUnlock() {
249	if fd.fdmu.rwunlock(false) {
250		fd.destroy()
251	}
252}
253