1package goprocess 2 3import ( 4 "sync" 5) 6 7// process implements Process 8type process struct { 9 children map[*processLink]struct{} // process to close with us 10 waitfors map[*processLink]struct{} // process to only wait for 11 waiters []*processLink // processes that wait for us. for gc. 12 13 teardown TeardownFunc // called to run the teardown logic. 14 closing chan struct{} // closed once close starts. 15 closed chan struct{} // closed once close is done. 16 closeErr error // error to return to clients of Close() 17 18 sync.Mutex 19} 20 21// newProcess constructs and returns a Process. 22// It will call tf TeardownFunc exactly once: 23// **after** all children have fully Closed, 24// **after** entering <-Closing(), and 25// **before** <-Closed(). 26func newProcess(tf TeardownFunc) *process { 27 return &process{ 28 teardown: tf, 29 closed: make(chan struct{}), 30 closing: make(chan struct{}), 31 waitfors: make(map[*processLink]struct{}), 32 children: make(map[*processLink]struct{}), 33 } 34} 35 36func (p *process) WaitFor(q Process) { 37 if q == nil { 38 panic("waiting for nil process") 39 } 40 41 p.Lock() 42 defer p.Unlock() 43 44 select { 45 case <-p.Closed(): 46 panic("Process cannot wait after being closed") 47 default: 48 } 49 50 pl := newProcessLink(p, q) 51 if p.waitfors == nil { 52 // This may be nil when we're closing. In close, we'll keep 53 // reading this map till it stays nil. 54 p.waitfors = make(map[*processLink]struct{}, 1) 55 } 56 p.waitfors[pl] = struct{}{} 57 go pl.AddToChild() 58} 59 60func (p *process) AddChildNoWait(child Process) { 61 if child == nil { 62 panic("adding nil child process") 63 } 64 65 p.Lock() 66 defer p.Unlock() 67 68 select { 69 case <-p.Closing(): 70 // Either closed or closing, close child immediately. This is 71 // correct because we aren't asked to _wait_ on this child. 72 go child.Close() 73 // Wait for the child to start closing so the child is in the 74 // "correct" state after this function finishes (see #17). 75 <-child.Closing() 76 return 77 default: 78 } 79 80 pl := newProcessLink(p, child) 81 p.children[pl] = struct{}{} 82 go pl.AddToChild() 83} 84 85func (p *process) AddChild(child Process) { 86 if child == nil { 87 panic("adding nil child process") 88 } 89 90 p.Lock() 91 defer p.Unlock() 92 93 pl := newProcessLink(p, child) 94 95 select { 96 case <-p.Closed(): 97 // AddChild must not be called on a dead process. Maybe that's 98 // too strict? 99 panic("Process cannot add children after being closed") 100 default: 101 } 102 103 select { 104 case <-p.Closing(): 105 // Already closing, close child in background. 106 go child.Close() 107 // Wait for the child to start closing so the child is in the 108 // "correct" state after this function finishes (see #17). 109 <-child.Closing() 110 default: 111 // Only add the child when not closing. When closing, just add 112 // it to the "waitfors" list. 113 p.children[pl] = struct{}{} 114 } 115 116 if p.waitfors == nil { 117 // This may be be nil when we're closing. In close, we'll keep 118 // reading this map till it stays nil. 119 p.waitfors = make(map[*processLink]struct{}, 1) 120 } 121 p.waitfors[pl] = struct{}{} 122 go pl.AddToChild() 123} 124 125func (p *process) Go(f ProcessFunc) Process { 126 child := newProcess(nil) 127 waitFor := newProcess(nil) 128 child.WaitFor(waitFor) // prevent child from closing 129 130 // add child last, to prevent a closing parent from 131 // closing all of them prematurely, before running the func. 132 p.AddChild(child) 133 go func() { 134 f(child) 135 waitFor.Close() // allow child to close. 136 child.CloseAfterChildren() // close to tear down. 137 }() 138 return child 139} 140 141// SetTeardown to assign a teardown function 142func (p *process) SetTeardown(tf TeardownFunc) { 143 if tf == nil { 144 panic("cannot set nil TeardownFunc") 145 } 146 147 p.Lock() 148 if p.teardown != nil { 149 panic("cannot SetTeardown twice") 150 } 151 152 p.teardown = tf 153 select { 154 case <-p.Closed(): 155 // Call the teardown function, but don't set the error. We can't 156 // change that after we shut down. 157 tf() 158 default: 159 } 160 p.Unlock() 161} 162 163// Close is the external close function. 164// it's a wrapper around internalClose that waits on Closed() 165func (p *process) Close() error { 166 p.Lock() 167 168 // if already closing, or closed, get out. (but wait!) 169 select { 170 case <-p.Closing(): 171 p.Unlock() 172 <-p.Closed() 173 return p.closeErr 174 default: 175 } 176 177 p.doClose() 178 p.Unlock() 179 return p.closeErr 180} 181 182func (p *process) Closing() <-chan struct{} { 183 return p.closing 184} 185 186func (p *process) Closed() <-chan struct{} { 187 return p.closed 188} 189 190func (p *process) Err() error { 191 <-p.Closed() 192 return p.closeErr 193} 194 195// the _actual_ close process. 196func (p *process) doClose() { 197 // this function is only be called once (protected by p.Lock()). 198 // and it will panic (on closing channels) otherwise. 199 200 close(p.closing) // signal that we're shutting down (Closing) 201 202 // We won't add any children after we start closing so we can do this 203 // once. 204 for plc, _ := range p.children { 205 child := plc.Child() 206 if child != nil { // check because child may already have been removed. 207 go child.Close() // force all children to shut down 208 } 209 210 // safe to call multiple times per link 211 plc.ParentClear() 212 } 213 p.children = nil // clear them. release memory. 214 215 // We may repeatedly continue to add waiters while we wait to close so 216 // we have to do this in a loop. 217 for len(p.waitfors) > 0 { 218 // we must be careful not to iterate over waitfors directly, as it may 219 // change under our feet. 220 wf := p.waitfors 221 p.waitfors = nil // clear them. release memory. 222 for w, _ := range wf { 223 // Here, we wait UNLOCKED, so that waitfors who are in the middle of 224 // adding a child to us can finish. we will immediately close the child. 225 p.Unlock() 226 <-w.ChildClosed() // wait till all waitfors are fully closed (before teardown) 227 p.Lock() 228 229 // safe to call multiple times per link 230 w.ParentClear() 231 } 232 } 233 234 if p.teardown != nil { 235 p.closeErr = p.teardown() // actually run the close logic (ok safe to teardown) 236 } 237 close(p.closed) // signal that we're shut down (Closed) 238 239 // go remove all the parents from the process links. optimization. 240 go func(waiters []*processLink) { 241 for _, pl := range waiters { 242 pl.ClearChild() 243 pr, ok := pl.Parent().(*process) 244 if !ok { 245 // parent has already been called to close 246 continue 247 } 248 pr.Lock() 249 delete(pr.waitfors, pl) 250 delete(pr.children, pl) 251 pr.Unlock() 252 } 253 }(p.waiters) // pass in so 254 p.waiters = nil // clear them. release memory. 255} 256 257// We will only wait on the children we have now. 258// We will not wait on children added subsequently. 259// this may change in the future. 260func (p *process) CloseAfterChildren() error { 261 p.Lock() 262 select { 263 case <-p.Closed(): 264 p.Unlock() 265 return p.Close() // get error. safe, after p.Closed() 266 default: 267 } 268 p.Unlock() 269 270 // here only from one goroutine. 271 272 nextToWaitFor := func() Process { 273 p.Lock() 274 defer p.Unlock() 275 for e, _ := range p.waitfors { 276 c := e.Child() 277 if c == nil { 278 continue 279 } 280 281 select { 282 case <-c.Closed(): 283 default: 284 return c 285 } 286 } 287 return nil 288 } 289 290 // wait for all processes we're waiting for are closed. 291 // the semantics here are simple: we will _only_ close 292 // if there are no processes currently waiting for. 293 for next := nextToWaitFor(); next != nil; next = nextToWaitFor() { 294 <-next.Closed() 295 } 296 297 // YAY! we're done. close 298 return p.Close() 299} 300