1// Copyright 2013 The Go Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5package poll
6
7import "sync/atomic"
8
// fdMutex guards the lifetime of a file descriptor and serializes the
// Read, Write and Close methods of FD. All bookkeeping lives in one packed
// 64-bit word that is only ever touched through sync/atomic, plus one
// semaphore per direction for goroutines that have to wait their turn.
type fdMutex struct {
	// state is the packed word holding the closed flag, the read and
	// write lock bits, the reference count and both waiter counts.
	// It is kept as the first field; sync/atomic documents that the first
	// word of an allocated struct is 64-bit aligned, which the 64-bit
	// atomic operations on this field rely on for 32-bit platforms.
	state uint64
	// rsema and wsema are the semaphores that goroutines waiting for the
	// read lock and the write lock, respectively, sleep on.
	rsema, wsema uint32
}
17
// Layout of fdMutex.state, from the least significant bit upwards:
//
//	bit  0      - FD is closed; once set, every later lock operation fails.
//	bit  1      - the read lock is held.
//	bit  2      - the write lock is held.
//	bits 3..22  - total number of references (read + write + misc).
//	bits 23..42 - number of goroutines waiting for the read lock.
//	bits 43..62 - number of goroutines waiting for the write lock.
//
// Each 20-bit counter is described by two constants: the value that adds one
// to the counter (mutexRef, mutexRWait, mutexWWait) and a mask selecting the
// whole counter, written here as the largest count times that unit.
const (
	// Single-bit flags.
	mutexClosed = 1 << 0
	mutexRLock  = 1 << 1
	mutexWLock  = 1 << 2

	// Increment units: adding one of these bumps the matching counter by one.
	mutexRef   = 1 << 3
	mutexRWait = 1 << 23
	mutexWWait = 1 << 43

	// Counter masks: 20 consecutive bits starting at the unit's position.
	mutexRefMask = (1<<20 - 1) * mutexRef
	mutexRMask   = (1<<20 - 1) * mutexRWait
	mutexWMask   = (1<<20 - 1) * mutexWWait
)
36
37// Read operations must do rwlock(true)/rwunlock(true).
38//
39// Write operations must do rwlock(false)/rwunlock(false).
40//
41// Misc operations must do incref/decref.
42// Misc operations include functions like setsockopt and setDeadline.
43// They need to use incref/decref to ensure that they operate on the
44// correct fd in presence of a concurrent close call (otherwise fd can
45// be closed under their feet).
46//
47// Close operations must do increfAndClose/decref.
48
49// incref adds a reference to mu.
50// It reports whether mu is available for reading or writing.
51func (mu *fdMutex) incref() bool {
52	for {
53		old := atomic.LoadUint64(&mu.state)
54		if old&mutexClosed != 0 {
55			return false
56		}
57		new := old + mutexRef
58		if new&mutexRefMask == 0 {
59			panic("inconsistent poll.fdMutex")
60		}
61		if atomic.CompareAndSwapUint64(&mu.state, old, new) {
62			return true
63		}
64	}
65}
66
67// increfAndClose sets the state of mu to closed.
68// It returns false if the file was already closed.
69func (mu *fdMutex) increfAndClose() bool {
70	for {
71		old := atomic.LoadUint64(&mu.state)
72		if old&mutexClosed != 0 {
73			return false
74		}
75		// Mark as closed and acquire a reference.
76		new := (old | mutexClosed) + mutexRef
77		if new&mutexRefMask == 0 {
78			panic("inconsistent poll.fdMutex")
79		}
80		// Remove all read and write waiters.
81		new &^= mutexRMask | mutexWMask
82		if atomic.CompareAndSwapUint64(&mu.state, old, new) {
83			// Wake all read and write waiters,
84			// they will observe closed flag after wakeup.
85			for old&mutexRMask != 0 {
86				old -= mutexRWait
87				runtime_Semrelease(&mu.rsema)
88			}
89			for old&mutexWMask != 0 {
90				old -= mutexWWait
91				runtime_Semrelease(&mu.wsema)
92			}
93			return true
94		}
95	}
96}
97
98// decref removes a reference from mu.
99// It reports whether there is no remaining reference.
100func (mu *fdMutex) decref() bool {
101	for {
102		old := atomic.LoadUint64(&mu.state)
103		if old&mutexRefMask == 0 {
104			panic("inconsistent poll.fdMutex")
105		}
106		new := old - mutexRef
107		if atomic.CompareAndSwapUint64(&mu.state, old, new) {
108			return new&(mutexClosed|mutexRefMask) == mutexClosed
109		}
110	}
111}
112
113// lock adds a reference to mu and locks mu.
114// It reports whether mu is available for reading or writing.
115func (mu *fdMutex) rwlock(read bool) bool {
116	var mutexBit, mutexWait, mutexMask uint64
117	var mutexSema *uint32
118	if read {
119		mutexBit = mutexRLock
120		mutexWait = mutexRWait
121		mutexMask = mutexRMask
122		mutexSema = &mu.rsema
123	} else {
124		mutexBit = mutexWLock
125		mutexWait = mutexWWait
126		mutexMask = mutexWMask
127		mutexSema = &mu.wsema
128	}
129	for {
130		old := atomic.LoadUint64(&mu.state)
131		if old&mutexClosed != 0 {
132			return false
133		}
134		var new uint64
135		if old&mutexBit == 0 {
136			// Lock is free, acquire it.
137			new = (old | mutexBit) + mutexRef
138			if new&mutexRefMask == 0 {
139				panic("inconsistent poll.fdMutex")
140			}
141		} else {
142			// Wait for lock.
143			new = old + mutexWait
144			if new&mutexMask == 0 {
145				panic("inconsistent poll.fdMutex")
146			}
147		}
148		if atomic.CompareAndSwapUint64(&mu.state, old, new) {
149			if old&mutexBit == 0 {
150				return true
151			}
152			runtime_Semacquire(mutexSema)
153			// The signaller has subtracted mutexWait.
154		}
155	}
156}
157
158// unlock removes a reference from mu and unlocks mu.
159// It reports whether there is no remaining reference.
160func (mu *fdMutex) rwunlock(read bool) bool {
161	var mutexBit, mutexWait, mutexMask uint64
162	var mutexSema *uint32
163	if read {
164		mutexBit = mutexRLock
165		mutexWait = mutexRWait
166		mutexMask = mutexRMask
167		mutexSema = &mu.rsema
168	} else {
169		mutexBit = mutexWLock
170		mutexWait = mutexWWait
171		mutexMask = mutexWMask
172		mutexSema = &mu.wsema
173	}
174	for {
175		old := atomic.LoadUint64(&mu.state)
176		if old&mutexBit == 0 || old&mutexRefMask == 0 {
177			panic("inconsistent poll.fdMutex")
178		}
179		// Drop lock, drop reference and wake read waiter if present.
180		new := (old &^ mutexBit) - mutexRef
181		if old&mutexMask != 0 {
182			new -= mutexWait
183		}
184		if atomic.CompareAndSwapUint64(&mu.state, old, new) {
185			if old&mutexMask != 0 {
186				runtime_Semrelease(mutexSema)
187			}
188			return new&(mutexClosed|mutexRefMask) == mutexClosed
189		}
190	}
191}
192
// Semaphore primitives used by fdMutex. They are declared here without
// bodies; the implementations are provided by the runtime package.
//
// runtime_Semacquire is what rwlock calls, after registering itself in the
// waiter count, to wait on the given semaphore word.
func runtime_Semacquire(sema *uint32)
// runtime_Semrelease is what rwunlock and increfAndClose call, once per
// waiter they removed from the waiter count, to wake that waiter.
func runtime_Semrelease(sema *uint32)
196
197// incref adds a reference to fd.
198// It returns an error when fd cannot be used.
199func (fd *FD) incref() error {
200	if !fd.fdmu.incref() {
201		return errClosing(fd.isFile)
202	}
203	return nil
204}
205
206// decref removes a reference from fd.
207// It also closes fd when the state of fd is set to closed and there
208// is no remaining reference.
209func (fd *FD) decref() error {
210	if fd.fdmu.decref() {
211		return fd.destroy()
212	}
213	return nil
214}
215
216// readLock adds a reference to fd and locks fd for reading.
217// It returns an error when fd cannot be used for reading.
218func (fd *FD) readLock() error {
219	if !fd.fdmu.rwlock(true) {
220		return errClosing(fd.isFile)
221	}
222	return nil
223}
224
225// readUnlock removes a reference from fd and unlocks fd for reading.
226// It also closes fd when the state of fd is set to closed and there
227// is no remaining reference.
228func (fd *FD) readUnlock() {
229	if fd.fdmu.rwunlock(true) {
230		fd.destroy()
231	}
232}
233
234// writeLock adds a reference to fd and locks fd for writing.
235// It returns an error when fd cannot be used for writing.
236func (fd *FD) writeLock() error {
237	if !fd.fdmu.rwlock(false) {
238		return errClosing(fd.isFile)
239	}
240	return nil
241}
242
243// writeUnlock removes a reference from fd and unlocks fd for writing.
244// It also closes fd when the state of fd is set to closed and there
245// is no remaining reference.
246func (fd *FD) writeUnlock() {
247	if fd.fdmu.rwunlock(false) {
248		fd.destroy()
249	}
250}
251