1package goprocess
2
3import (
4	"sync"
5)
6
7// process implements Process
8type process struct {
9	children map[*processLink]struct{} // process to close with us
10	waitfors map[*processLink]struct{} // process to only wait for
11	waiters  []*processLink            // processes that wait for us. for gc.
12
13	teardown TeardownFunc  // called to run the teardown logic.
14	closing  chan struct{} // closed once close starts.
15	closed   chan struct{} // closed once close is done.
16	closeErr error         // error to return to clients of Close()
17
18	sync.Mutex
19}
20
21// newProcess constructs and returns a Process.
22// It will call tf TeardownFunc exactly once:
23//  **after** all children have fully Closed,
24//  **after** entering <-Closing(), and
25//  **before** <-Closed().
26func newProcess(tf TeardownFunc) *process {
27	return &process{
28		teardown: tf,
29		closed:   make(chan struct{}),
30		closing:  make(chan struct{}),
31		waitfors: make(map[*processLink]struct{}),
32		children: make(map[*processLink]struct{}),
33	}
34}
35
36func (p *process) WaitFor(q Process) {
37	if q == nil {
38		panic("waiting for nil process")
39	}
40
41	p.Lock()
42	defer p.Unlock()
43
44	select {
45	case <-p.Closed():
46		panic("Process cannot wait after being closed")
47	default:
48	}
49
50	pl := newProcessLink(p, q)
51	if p.waitfors == nil {
52		// This may be nil when we're closing. In close, we'll keep
53		// reading this map till it stays nil.
54		p.waitfors = make(map[*processLink]struct{}, 1)
55	}
56	p.waitfors[pl] = struct{}{}
57	go pl.AddToChild()
58}
59
60func (p *process) AddChildNoWait(child Process) {
61	if child == nil {
62		panic("adding nil child process")
63	}
64
65	p.Lock()
66	defer p.Unlock()
67
68	select {
69	case <-p.Closing():
70		// Either closed or closing, close child immediately. This is
71		// correct because we aren't asked to _wait_ on this child.
72		go child.Close()
73		// Wait for the child to start closing so the child is in the
74		// "correct" state after this function finishes (see #17).
75		<-child.Closing()
76		return
77	default:
78	}
79
80	pl := newProcessLink(p, child)
81	p.children[pl] = struct{}{}
82	go pl.AddToChild()
83}
84
85func (p *process) AddChild(child Process) {
86	if child == nil {
87		panic("adding nil child process")
88	}
89
90	p.Lock()
91	defer p.Unlock()
92
93	pl := newProcessLink(p, child)
94
95	select {
96	case <-p.Closed():
97		// AddChild must not be called on a dead process. Maybe that's
98		// too strict?
99		panic("Process cannot add children after being closed")
100	default:
101	}
102
103	select {
104	case <-p.Closing():
105		// Already closing, close child in background.
106		go child.Close()
107		// Wait for the child to start closing so the child is in the
108		// "correct" state after this function finishes (see #17).
109		<-child.Closing()
110	default:
111		// Only add the child when not closing. When closing, just add
112		// it to the "waitfors" list.
113		p.children[pl] = struct{}{}
114	}
115
116	if p.waitfors == nil {
117		// This may be be nil when we're closing. In close, we'll keep
118		// reading this map till it stays nil.
119		p.waitfors = make(map[*processLink]struct{}, 1)
120	}
121	p.waitfors[pl] = struct{}{}
122	go pl.AddToChild()
123}
124
125func (p *process) Go(f ProcessFunc) Process {
126	child := newProcess(nil)
127	waitFor := newProcess(nil)
128	child.WaitFor(waitFor) // prevent child from closing
129
130	// add child last, to prevent a closing parent from
131	// closing all of them prematurely, before running the func.
132	p.AddChild(child)
133	go func() {
134		f(child)
135		waitFor.Close()            // allow child to close.
136		child.CloseAfterChildren() // close to tear down.
137	}()
138	return child
139}
140
141// SetTeardown to assign a teardown function
142func (p *process) SetTeardown(tf TeardownFunc) {
143	if tf == nil {
144		panic("cannot set nil TeardownFunc")
145	}
146
147	p.Lock()
148	if p.teardown != nil {
149		panic("cannot SetTeardown twice")
150	}
151
152	p.teardown = tf
153	select {
154	case <-p.Closed():
155		// Call the teardown function, but don't set the error. We can't
156		// change that after we shut down.
157		tf()
158	default:
159	}
160	p.Unlock()
161}
162
163// Close is the external close function.
164// it's a wrapper around internalClose that waits on Closed()
165func (p *process) Close() error {
166	p.Lock()
167
168	// if already closing, or closed, get out. (but wait!)
169	select {
170	case <-p.Closing():
171		p.Unlock()
172		<-p.Closed()
173		return p.closeErr
174	default:
175	}
176
177	p.doClose()
178	p.Unlock()
179	return p.closeErr
180}
181
182func (p *process) Closing() <-chan struct{} {
183	return p.closing
184}
185
186func (p *process) Closed() <-chan struct{} {
187	return p.closed
188}
189
190func (p *process) Err() error {
191	<-p.Closed()
192	return p.closeErr
193}
194
195// the _actual_ close process.
196func (p *process) doClose() {
197	// this function is only be called once (protected by p.Lock()).
198	// and it will panic (on closing channels) otherwise.
199
200	close(p.closing) // signal that we're shutting down (Closing)
201
202	// We won't add any children after we start closing so we can do this
203	// once.
204	for plc, _ := range p.children {
205		child := plc.Child()
206		if child != nil { // check because child may already have been removed.
207			go child.Close() // force all children to shut down
208		}
209
210		// safe to call multiple times per link
211		plc.ParentClear()
212	}
213	p.children = nil // clear them. release memory.
214
215	// We may repeatedly continue to add waiters while we wait to close so
216	// we have to do this in a loop.
217	for len(p.waitfors) > 0 {
218		// we must be careful not to iterate over waitfors directly, as it may
219		// change under our feet.
220		wf := p.waitfors
221		p.waitfors = nil // clear them. release memory.
222		for w, _ := range wf {
223			// Here, we wait UNLOCKED, so that waitfors who are in the middle of
224			// adding a child to us can finish. we will immediately close the child.
225			p.Unlock()
226			<-w.ChildClosed() // wait till all waitfors are fully closed (before teardown)
227			p.Lock()
228
229			// safe to call multiple times per link
230			w.ParentClear()
231		}
232	}
233
234	if p.teardown != nil {
235		p.closeErr = p.teardown() // actually run the close logic (ok safe to teardown)
236	}
237	close(p.closed) // signal that we're shut down (Closed)
238
239	// go remove all the parents from the process links. optimization.
240	go func(waiters []*processLink) {
241		for _, pl := range waiters {
242			pl.ClearChild()
243			pr, ok := pl.Parent().(*process)
244			if !ok {
245				// parent has already been called to close
246				continue
247			}
248			pr.Lock()
249			delete(pr.waitfors, pl)
250			delete(pr.children, pl)
251			pr.Unlock()
252		}
253	}(p.waiters) // pass in so
254	p.waiters = nil // clear them. release memory.
255}
256
257// We will only wait on the children we have now.
258// We will not wait on children added subsequently.
259// this may change in the future.
260func (p *process) CloseAfterChildren() error {
261	p.Lock()
262	select {
263	case <-p.Closed():
264		p.Unlock()
265		return p.Close() // get error. safe, after p.Closed()
266	default:
267	}
268	p.Unlock()
269
270	// here only from one goroutine.
271
272	nextToWaitFor := func() Process {
273		p.Lock()
274		defer p.Unlock()
275		for e, _ := range p.waitfors {
276			c := e.Child()
277			if c == nil {
278				continue
279			}
280
281			select {
282			case <-c.Closed():
283			default:
284				return c
285			}
286		}
287		return nil
288	}
289
290	// wait for all processes we're waiting for are closed.
291	// the semantics here are simple: we will _only_ close
292	// if there are no processes currently waiting for.
293	for next := nextToWaitFor(); next != nil; next = nextToWaitFor() {
294		<-next.Closed()
295	}
296
297	// YAY! we're done. close
298	return p.Close()
299}
300