1package grouper_test
2
3import (
4	"errors"
5	"os"
6	"sync/atomic"
7	"syscall"
8	"time"
9
10	"github.com/tedsuo/ifrit"
11	"github.com/tedsuo/ifrit/fake_runner"
12	"github.com/tedsuo/ifrit/grouper"
13
14	. "github.com/onsi/ginkgo"
15	. "github.com/onsi/gomega"
16)
17
18var _ = Describe("Ordered Group", func() {
19	var (
20		started chan struct{}
21
22		groupRunner  ifrit.Runner
23		groupProcess ifrit.Process
24		members      grouper.Members
25
26		childRunner1 *fake_runner.TestRunner
27		childRunner2 *fake_runner.TestRunner
28		childRunner3 *fake_runner.TestRunner
29
30		Δ time.Duration = 10 * time.Millisecond
31	)
32
33	Describe("Start", func() {
34		BeforeEach(func() {
35			childRunner1 = fake_runner.NewTestRunner()
36			childRunner2 = fake_runner.NewTestRunner()
37			childRunner3 = fake_runner.NewTestRunner()
38
39			members = grouper.Members{
40				{"child1", childRunner1},
41				{"child2", childRunner2},
42				{"child3", childRunner3},
43			}
44
45			groupRunner = grouper.NewOrdered(os.Interrupt, members)
46		})
47
48		AfterEach(func() {
49			childRunner1.EnsureExit()
50			childRunner2.EnsureExit()
51			childRunner3.EnsureExit()
52
53			Eventually(started).Should(BeClosed())
54			groupProcess.Signal(os.Kill)
55			Eventually(groupProcess.Wait()).Should(Receive())
56		})
57
58		BeforeEach(func() {
59			started = make(chan struct{})
60			groupProcess = ifrit.Background(groupRunner)
61			go func() {
62				select {
63				case <-groupProcess.Ready():
64				case <-groupProcess.Wait():
65				}
66				close(started)
67			}()
68		})
69
70		It("runs the first runner, then the second, then becomes ready", func() {
71			Eventually(childRunner1.RunCallCount).Should(Equal(1))
72			Consistently(childRunner2.RunCallCount, Δ).Should(BeZero())
73			Consistently(started, Δ).ShouldNot(BeClosed())
74
75			childRunner1.TriggerReady()
76
77			Eventually(childRunner2.RunCallCount).Should(Equal(1))
78			Consistently(childRunner3.RunCallCount, Δ).Should(BeZero())
79			Consistently(started, Δ).ShouldNot(BeClosed())
80
81			childRunner2.TriggerReady()
82
83			Eventually(childRunner3.RunCallCount).Should(Equal(1))
84			Consistently(started, Δ).ShouldNot(BeClosed())
85
86			childRunner3.TriggerReady()
87
88			Eventually(started).Should(BeClosed())
89		})
90
91		Describe("when all the runners are ready", func() {
92			var (
93				signal1 <-chan os.Signal
94				signal2 <-chan os.Signal
95				signal3 <-chan os.Signal
96			)
97
98			BeforeEach(func() {
99				signal1 = childRunner1.WaitForCall()
100				childRunner1.TriggerReady()
101				signal2 = childRunner2.WaitForCall()
102				childRunner2.TriggerReady()
103				signal3 = childRunner3.WaitForCall()
104				childRunner3.TriggerReady()
105
106				Eventually(started).Should(BeClosed())
107			})
108
109			Describe("when it receives a signal", func() {
110				BeforeEach(func() {
111					groupProcess.Signal(syscall.SIGUSR2)
112				})
113
114				It("doesn't send any more signals to remaining child processes", func() {
115					Eventually(signal3).Should(Receive(Equal(syscall.SIGUSR2)))
116					childRunner2.TriggerExit(nil)
117					Consistently(signal3).ShouldNot(Receive())
118				})
119			})
120
121			Describe("when a process exits cleanly", func() {
122				BeforeEach(func() {
123					childRunner1.TriggerExit(nil)
124				})
125
126				It("sends an interrupt signal to the other processes", func() {
127					Eventually(signal3).Should(Receive(Equal(os.Interrupt)))
128					childRunner3.TriggerExit(nil)
129					Eventually(signal2).Should(Receive(Equal(os.Interrupt)))
130				})
131
132				It("does not exit", func() {
133					Consistently(groupProcess.Wait(), Δ).ShouldNot(Receive())
134				})
135
136				Describe("when another process exits", func() {
137					BeforeEach(func() {
138						childRunner2.TriggerExit(nil)
139					})
140
141					It("doesn't send any more signals to remaining child processes", func() {
142						Eventually(signal3).Should(Receive(Equal(os.Interrupt)))
143						Consistently(signal3).ShouldNot(Receive())
144					})
145				})
146
147				Describe("when all of the processes have exited cleanly", func() {
148					BeforeEach(func() {
149						childRunner2.TriggerExit(nil)
150						childRunner3.TriggerExit(nil)
151					})
152
153					It("exits cleanly", func() {
154						Eventually(groupProcess.Wait()).Should(Receive(BeNil()))
155					})
156				})
157
158				Describe("when one of the processes exits with an error", func() {
159					BeforeEach(func() {
160						childRunner2.TriggerExit(errors.New("Fail"))
161						childRunner3.TriggerExit(nil)
162					})
163
164					It("returns an error indicating which child processes failed", func() {
165						var err error
166						Eventually(groupProcess.Wait()).Should(Receive(&err))
167						errTrace := err.(grouper.ErrorTrace)
168						Ω(errTrace).Should(HaveLen(3))
169
170						Ω(errTrace).Should(ContainElement(grouper.ExitEvent{grouper.Member{"child1", childRunner1}, nil}))
171						Ω(errTrace).Should(ContainElement(grouper.ExitEvent{grouper.Member{"child2", childRunner2}, errors.New("Fail")}))
172					})
173				})
174			})
175		})
176
177		Describe("when the first member is started", func() {
178			var signals <-chan os.Signal
179
180			BeforeEach(func() {
181				childRunner1.WaitForCall()
182				childRunner1.TriggerReady()
183			})
184
185			Describe("and the first member exits while second member is setting up", func() {
186				BeforeEach(func() {
187					signals = childRunner2.WaitForCall()
188					childRunner1.TriggerExit(nil)
189				})
190
191				It("should terminate", func() {
192					var signal os.Signal
193					Eventually(signals).Should(Receive(&signal))
194					Expect(signal).To(Equal(syscall.SIGINT))
195				})
196			})
197
198			Describe("and the second member exits before becoming ready", func() {
199				BeforeEach(func() {
200					signals = childRunner1.WaitForCall()
201					childRunner2.TriggerExit(nil)
202				})
203
204				It("should terminate the first runner", func() {
205					var signal os.Signal
206					Eventually(signals).Should(Receive(&signal))
207					Expect(signal).To(Equal(syscall.SIGINT))
208					childRunner1.TriggerExit(nil)
209					var err error
210					Eventually(groupProcess.Wait()).Should(Receive(&err))
211					Expect(err).NotTo(HaveOccurred())
212				})
213			})
214		})
215
216		Describe("Failed start", func() {
217			BeforeEach(func() {
218				signal1 := childRunner1.WaitForCall()
219				childRunner1.TriggerReady()
220				childRunner2.TriggerExit(errors.New("Fail"))
221				Eventually(signal1).Should(Receive(Equal(os.Interrupt)))
222				childRunner1.TriggerExit(nil)
223				Eventually(started).Should(BeClosed())
224			})
225
226			It("exits without starting further processes", func() {
227				var err error
228
229				Eventually(groupProcess.Wait()).Should(Receive(&err))
230				errTrace := err.(grouper.ErrorTrace)
231				Ω(errTrace).Should(ContainElement(grouper.ExitEvent{grouper.Member{"child1", childRunner1}, nil}))
232				Ω(errTrace).Should(ContainElement(grouper.ExitEvent{grouper.Member{"child2", childRunner2}, errors.New("Fail")}))
233				Ω(exitIndex("child1", errTrace)).Should(BeNumerically(">", exitIndex("child2", errTrace)))
234			})
235		})
236	})
237
238	Describe("Stop", func() {
239
240		var runnerIndex int64
241		var startOrder chan int64
242		var stopOrder chan int64
243		var receivedSignals chan os.Signal
244
245		makeRunner := func(waitTime time.Duration) (ifrit.Runner, chan struct{}) {
246			quickExit := make(chan struct{})
247			return ifrit.RunFunc(func(signals <-chan os.Signal, ready chan<- struct{}) error {
248				index := atomic.AddInt64(&runnerIndex, 1)
249				startOrder <- index
250				close(ready)
251
252				select {
253				case <-quickExit:
254				case <-signals:
255				}
256				time.Sleep(waitTime)
257				stopOrder <- index
258
259				return nil
260			}), quickExit
261		}
262
263		makeSignalEchoRunner := func(waitTime time.Duration, name string) ifrit.Runner {
264			return ifrit.RunFunc(func(signals <-chan os.Signal, ready chan<- struct{}) error {
265				close(ready)
266				done := make(chan bool)
267				go func() {
268					time.Sleep(waitTime)
269					done <- true
270				}()
271			L:
272				for {
273					select {
274					case s := <-signals:
275						receivedSignals <- s
276					case _ = <-done:
277						break L
278					}
279				}
280				return nil
281			})
282		}
283
284		Context("when runner receives a single signal", func() {
285			BeforeEach(func() {
286				startOrder = make(chan int64, 3)
287				stopOrder = make(chan int64, 3)
288
289				r1, _ := makeRunner(0)
290				r2, _ := makeRunner(30 * time.Millisecond)
291				r3, _ := makeRunner(50 * time.Millisecond)
292				members = grouper.Members{
293					{"child1", r1},
294					{"child2", r2},
295					{"child3", r3},
296				}
297			})
298
299			AfterEach(func() {
300				groupProcess.Signal(os.Kill)
301				Eventually(groupProcess.Wait()).Should(Receive())
302			})
303
304			JustBeforeEach(func() {
305				groupRunner = grouper.NewOrdered(os.Interrupt, members)
306
307				started = make(chan struct{})
308				go func() {
309					groupProcess = ifrit.Invoke(groupRunner)
310					close(started)
311				}()
312
313				Eventually(started).Should(BeClosed())
314			})
315
316			It("stops in reverse order", func() {
317				groupProcess.Signal(os.Kill)
318				Eventually(groupProcess.Wait()).Should(Receive())
319				close(startOrder)
320				close(stopOrder)
321
322				Ω(startOrder).To(HaveLen(len(stopOrder)))
323
324				order := []int64{}
325				for r := range startOrder {
326					order = append(order, r)
327				}
328
329				for i := len(stopOrder) - 1; i >= 0; i-- {
330					Ω(order[i]).To(Equal(<-stopOrder))
331				}
332			})
333
334			Context("when a runner stops", func() {
335				var quickExit chan struct{}
336
337				BeforeEach(func() {
338					var r1 ifrit.Runner
339					r1, quickExit = makeRunner(0)
340					members[0].Runner = r1
341				})
342
343				It("stops in reverse order", func() {
344					close(quickExit)
345					Eventually(groupProcess.Wait()).Should(Receive())
346					close(startOrder)
347					close(stopOrder)
348
349					Ω(startOrder).To(HaveLen(len(stopOrder)))
350
351					order := []int64{}
352					for r := range startOrder {
353						order = append(order, r)
354					}
355
356					firstDeath := <-stopOrder
357					for i := len(order) - 1; i >= 0; i-- {
358						if order[i] == firstDeath {
359							continue
360						}
361						Ω(order[i]).To(Equal(<-stopOrder))
362					}
363				})
364			})
365		})
366
367		Context("when a runner receives multiple signals", func() {
368			BeforeEach(func() {
369				startOrder = make(chan int64, 2)
370				stopOrder = make(chan int64, 2)
371
372				r1 := makeSignalEchoRunner(200*time.Millisecond, "child1")
373				r2 := makeSignalEchoRunner(100*time.Millisecond, "child2")
374				members = grouper.Members{
375					{"child1", r1},
376					{"child2", r2},
377				}
378			})
379
380			AfterEach(func() {
381				groupProcess.Signal(os.Kill)
382				Eventually(groupProcess.Wait()).Should(Receive())
383			})
384
385			JustBeforeEach(func() {
386				groupRunner = grouper.NewOrdered(os.Interrupt, members)
387
388				started = make(chan struct{})
389				go func() {
390					groupProcess = ifrit.Invoke(groupRunner)
391					close(started)
392				}()
393
394				Eventually(started).Should(BeClosed())
395			})
396
397			Context("of different types", func() {
398
399				BeforeEach(func() {
400					receivedSignals = make(chan os.Signal, 4)
401				})
402
403				It("allows the process to finish gracefully", func() {
404					groupProcess.Signal(syscall.SIGINT)
405					Consistently(groupProcess.Wait(), 20*time.Millisecond, 10*time.Millisecond).ShouldNot(Receive())
406					groupProcess.Signal(syscall.SIGUSR1)
407					Consistently(groupProcess.Wait(), 20*time.Millisecond, 10*time.Millisecond).ShouldNot(Receive())
408					groupProcess.Signal(syscall.SIGUSR2)
409					Consistently(groupProcess.Wait(), 20*time.Millisecond, 10*time.Millisecond).ShouldNot(Receive())
410
411					Eventually(groupProcess.Wait()).Should(Receive())
412
413					signals := []os.Signal{syscall.SIGINT, syscall.SIGUSR1, syscall.SIGUSR2, syscall.SIGUSR2}
414					for _, expectedSig := range signals {
415						actualSig := <-receivedSignals
416						Expect(actualSig).Should(Equal(expectedSig))
417					}
418				})
419			})
420
421			Context("of same type", func() {
422
423				BeforeEach(func() {
424					receivedSignals = make(chan os.Signal, 2)
425				})
426
427				It("allows the process to finish gracefully", func() {
428					groupProcess.Signal(syscall.SIGUSR1)
429					Consistently(groupProcess.Wait(), 20*time.Millisecond, 10*time.Millisecond).ShouldNot(Receive())
430					groupProcess.Signal(syscall.SIGUSR1)
431					Consistently(groupProcess.Wait(), 20*time.Millisecond, 10*time.Millisecond).ShouldNot(Receive())
432					groupProcess.Signal(syscall.SIGUSR1)
433					Consistently(groupProcess.Wait(), 20*time.Millisecond, 10*time.Millisecond).ShouldNot(Receive())
434
435					Eventually(groupProcess.Wait()).Should(Receive())
436
437					signals := []os.Signal{syscall.SIGUSR1, syscall.SIGUSR1}
438					for _, expectedSig := range signals {
439						actualSig := <-receivedSignals
440						Expect(actualSig).Should(Equal(expectedSig))
441					}
442				})
443			})
444		})
445	})
446})
447
448func exitIndex(name string, errTrace grouper.ErrorTrace) int {
449	for i, exitTrace := range errTrace {
450		if exitTrace.Member.Name == name {
451			return i
452		}
453	}
454
455	return -1
456}
457