1package grouper_test 2 3import ( 4 "errors" 5 "os" 6 "sync/atomic" 7 "syscall" 8 "time" 9 10 "github.com/tedsuo/ifrit" 11 "github.com/tedsuo/ifrit/fake_runner" 12 "github.com/tedsuo/ifrit/grouper" 13 14 . "github.com/onsi/ginkgo" 15 . "github.com/onsi/gomega" 16) 17 18var _ = Describe("Ordered Group", func() { 19 var ( 20 started chan struct{} 21 22 groupRunner ifrit.Runner 23 groupProcess ifrit.Process 24 members grouper.Members 25 26 childRunner1 *fake_runner.TestRunner 27 childRunner2 *fake_runner.TestRunner 28 childRunner3 *fake_runner.TestRunner 29 30 Δ time.Duration = 10 * time.Millisecond 31 ) 32 33 Describe("Start", func() { 34 BeforeEach(func() { 35 childRunner1 = fake_runner.NewTestRunner() 36 childRunner2 = fake_runner.NewTestRunner() 37 childRunner3 = fake_runner.NewTestRunner() 38 39 members = grouper.Members{ 40 {"child1", childRunner1}, 41 {"child2", childRunner2}, 42 {"child3", childRunner3}, 43 } 44 45 groupRunner = grouper.NewOrdered(os.Interrupt, members) 46 }) 47 48 AfterEach(func() { 49 childRunner1.EnsureExit() 50 childRunner2.EnsureExit() 51 childRunner3.EnsureExit() 52 53 Eventually(started).Should(BeClosed()) 54 groupProcess.Signal(os.Kill) 55 Eventually(groupProcess.Wait()).Should(Receive()) 56 }) 57 58 BeforeEach(func() { 59 started = make(chan struct{}) 60 groupProcess = ifrit.Background(groupRunner) 61 go func() { 62 select { 63 case <-groupProcess.Ready(): 64 case <-groupProcess.Wait(): 65 } 66 close(started) 67 }() 68 }) 69 70 It("runs the first runner, then the second, then becomes ready", func() { 71 Eventually(childRunner1.RunCallCount).Should(Equal(1)) 72 Consistently(childRunner2.RunCallCount, Δ).Should(BeZero()) 73 Consistently(started, Δ).ShouldNot(BeClosed()) 74 75 childRunner1.TriggerReady() 76 77 Eventually(childRunner2.RunCallCount).Should(Equal(1)) 78 Consistently(childRunner3.RunCallCount, Δ).Should(BeZero()) 79 Consistently(started, Δ).ShouldNot(BeClosed()) 80 81 childRunner2.TriggerReady() 82 83 Eventually(childRunner3.RunCallCount).Should(Equal(1)) 84 Consistently(started, Δ).ShouldNot(BeClosed()) 85 86 childRunner3.TriggerReady() 87 88 Eventually(started).Should(BeClosed()) 89 }) 90 91 Describe("when all the runners are ready", func() { 92 var ( 93 signal1 <-chan os.Signal 94 signal2 <-chan os.Signal 95 signal3 <-chan os.Signal 96 ) 97 98 BeforeEach(func() { 99 signal1 = childRunner1.WaitForCall() 100 childRunner1.TriggerReady() 101 signal2 = childRunner2.WaitForCall() 102 childRunner2.TriggerReady() 103 signal3 = childRunner3.WaitForCall() 104 childRunner3.TriggerReady() 105 106 Eventually(started).Should(BeClosed()) 107 }) 108 109 Describe("when it receives a signal", func() { 110 BeforeEach(func() { 111 groupProcess.Signal(syscall.SIGUSR2) 112 }) 113 114 It("doesn't send any more signals to remaining child processes", func() { 115 Eventually(signal3).Should(Receive(Equal(syscall.SIGUSR2))) 116 childRunner2.TriggerExit(nil) 117 Consistently(signal3).ShouldNot(Receive()) 118 }) 119 }) 120 121 Describe("when a process exits cleanly", func() { 122 BeforeEach(func() { 123 childRunner1.TriggerExit(nil) 124 }) 125 126 It("sends an interrupt signal to the other processes", func() { 127 Eventually(signal3).Should(Receive(Equal(os.Interrupt))) 128 childRunner3.TriggerExit(nil) 129 Eventually(signal2).Should(Receive(Equal(os.Interrupt))) 130 }) 131 132 It("does not exit", func() { 133 Consistently(groupProcess.Wait(), Δ).ShouldNot(Receive()) 134 }) 135 136 Describe("when another process exits", func() { 137 BeforeEach(func() { 138 childRunner2.TriggerExit(nil) 139 }) 140 141 It("doesn't send any more signals to remaining child processes", func() { 142 Eventually(signal3).Should(Receive(Equal(os.Interrupt))) 143 Consistently(signal3).ShouldNot(Receive()) 144 }) 145 }) 146 147 Describe("when all of the processes have exited cleanly", func() { 148 BeforeEach(func() { 149 childRunner2.TriggerExit(nil) 150 childRunner3.TriggerExit(nil) 151 }) 152 153 It("exits cleanly", func() { 154 Eventually(groupProcess.Wait()).Should(Receive(BeNil())) 155 }) 156 }) 157 158 Describe("when one of the processes exits with an error", func() { 159 BeforeEach(func() { 160 childRunner2.TriggerExit(errors.New("Fail")) 161 childRunner3.TriggerExit(nil) 162 }) 163 164 It("returns an error indicating which child processes failed", func() { 165 var err error 166 Eventually(groupProcess.Wait()).Should(Receive(&err)) 167 errTrace := err.(grouper.ErrorTrace) 168 Ω(errTrace).Should(HaveLen(3)) 169 170 Ω(errTrace).Should(ContainElement(grouper.ExitEvent{grouper.Member{"child1", childRunner1}, nil})) 171 Ω(errTrace).Should(ContainElement(grouper.ExitEvent{grouper.Member{"child2", childRunner2}, errors.New("Fail")})) 172 }) 173 }) 174 }) 175 }) 176 177 Describe("when the first member is started", func() { 178 var signals <-chan os.Signal 179 180 BeforeEach(func() { 181 childRunner1.WaitForCall() 182 childRunner1.TriggerReady() 183 }) 184 185 Describe("and the first member exits while second member is setting up", func() { 186 BeforeEach(func() { 187 signals = childRunner2.WaitForCall() 188 childRunner1.TriggerExit(nil) 189 }) 190 191 It("should terminate", func() { 192 var signal os.Signal 193 Eventually(signals).Should(Receive(&signal)) 194 Expect(signal).To(Equal(syscall.SIGINT)) 195 }) 196 }) 197 198 Describe("and the second member exits before becoming ready", func() { 199 BeforeEach(func() { 200 signals = childRunner1.WaitForCall() 201 childRunner2.TriggerExit(nil) 202 }) 203 204 It("should terminate the first runner", func() { 205 var signal os.Signal 206 Eventually(signals).Should(Receive(&signal)) 207 Expect(signal).To(Equal(syscall.SIGINT)) 208 childRunner1.TriggerExit(nil) 209 var err error 210 Eventually(groupProcess.Wait()).Should(Receive(&err)) 211 Expect(err).NotTo(HaveOccurred()) 212 }) 213 }) 214 }) 215 216 Describe("Failed start", func() { 217 BeforeEach(func() { 218 signal1 := childRunner1.WaitForCall() 219 childRunner1.TriggerReady() 220 childRunner2.TriggerExit(errors.New("Fail")) 221 Eventually(signal1).Should(Receive(Equal(os.Interrupt))) 222 childRunner1.TriggerExit(nil) 223 Eventually(started).Should(BeClosed()) 224 }) 225 226 It("exits without starting further processes", func() { 227 var err error 228 229 Eventually(groupProcess.Wait()).Should(Receive(&err)) 230 errTrace := err.(grouper.ErrorTrace) 231 Ω(errTrace).Should(ContainElement(grouper.ExitEvent{grouper.Member{"child1", childRunner1}, nil})) 232 Ω(errTrace).Should(ContainElement(grouper.ExitEvent{grouper.Member{"child2", childRunner2}, errors.New("Fail")})) 233 Ω(exitIndex("child1", errTrace)).Should(BeNumerically(">", exitIndex("child2", errTrace))) 234 }) 235 }) 236 }) 237 238 Describe("Stop", func() { 239 240 var runnerIndex int64 241 var startOrder chan int64 242 var stopOrder chan int64 243 var receivedSignals chan os.Signal 244 245 makeRunner := func(waitTime time.Duration) (ifrit.Runner, chan struct{}) { 246 quickExit := make(chan struct{}) 247 return ifrit.RunFunc(func(signals <-chan os.Signal, ready chan<- struct{}) error { 248 index := atomic.AddInt64(&runnerIndex, 1) 249 startOrder <- index 250 close(ready) 251 252 select { 253 case <-quickExit: 254 case <-signals: 255 } 256 time.Sleep(waitTime) 257 stopOrder <- index 258 259 return nil 260 }), quickExit 261 } 262 263 makeSignalEchoRunner := func(waitTime time.Duration, name string) ifrit.Runner { 264 return ifrit.RunFunc(func(signals <-chan os.Signal, ready chan<- struct{}) error { 265 close(ready) 266 done := make(chan bool) 267 go func() { 268 time.Sleep(waitTime) 269 done <- true 270 }() 271 L: 272 for { 273 select { 274 case s := <-signals: 275 receivedSignals <- s 276 case _ = <-done: 277 break L 278 } 279 } 280 return nil 281 }) 282 } 283 284 Context("when runner receives a single signal", func() { 285 BeforeEach(func() { 286 startOrder = make(chan int64, 3) 287 stopOrder = make(chan int64, 3) 288 289 r1, _ := makeRunner(0) 290 r2, _ := makeRunner(30 * time.Millisecond) 291 r3, _ := makeRunner(50 * time.Millisecond) 292 members = grouper.Members{ 293 {"child1", r1}, 294 {"child2", r2}, 295 {"child3", r3}, 296 } 297 }) 298 299 AfterEach(func() { 300 groupProcess.Signal(os.Kill) 301 Eventually(groupProcess.Wait()).Should(Receive()) 302 }) 303 304 JustBeforeEach(func() { 305 groupRunner = grouper.NewOrdered(os.Interrupt, members) 306 307 started = make(chan struct{}) 308 go func() { 309 groupProcess = ifrit.Invoke(groupRunner) 310 close(started) 311 }() 312 313 Eventually(started).Should(BeClosed()) 314 }) 315 316 It("stops in reverse order", func() { 317 groupProcess.Signal(os.Kill) 318 Eventually(groupProcess.Wait()).Should(Receive()) 319 close(startOrder) 320 close(stopOrder) 321 322 Ω(startOrder).To(HaveLen(len(stopOrder))) 323 324 order := []int64{} 325 for r := range startOrder { 326 order = append(order, r) 327 } 328 329 for i := len(stopOrder) - 1; i >= 0; i-- { 330 Ω(order[i]).To(Equal(<-stopOrder)) 331 } 332 }) 333 334 Context("when a runner stops", func() { 335 var quickExit chan struct{} 336 337 BeforeEach(func() { 338 var r1 ifrit.Runner 339 r1, quickExit = makeRunner(0) 340 members[0].Runner = r1 341 }) 342 343 It("stops in reverse order", func() { 344 close(quickExit) 345 Eventually(groupProcess.Wait()).Should(Receive()) 346 close(startOrder) 347 close(stopOrder) 348 349 Ω(startOrder).To(HaveLen(len(stopOrder))) 350 351 order := []int64{} 352 for r := range startOrder { 353 order = append(order, r) 354 } 355 356 firstDeath := <-stopOrder 357 for i := len(order) - 1; i >= 0; i-- { 358 if order[i] == firstDeath { 359 continue 360 } 361 Ω(order[i]).To(Equal(<-stopOrder)) 362 } 363 }) 364 }) 365 }) 366 367 Context("when a runner receives multiple signals", func() { 368 BeforeEach(func() { 369 startOrder = make(chan int64, 2) 370 stopOrder = make(chan int64, 2) 371 372 r1 := makeSignalEchoRunner(200*time.Millisecond, "child1") 373 r2 := makeSignalEchoRunner(100*time.Millisecond, "child2") 374 members = grouper.Members{ 375 {"child1", r1}, 376 {"child2", r2}, 377 } 378 }) 379 380 AfterEach(func() { 381 groupProcess.Signal(os.Kill) 382 Eventually(groupProcess.Wait()).Should(Receive()) 383 }) 384 385 JustBeforeEach(func() { 386 groupRunner = grouper.NewOrdered(os.Interrupt, members) 387 388 started = make(chan struct{}) 389 go func() { 390 groupProcess = ifrit.Invoke(groupRunner) 391 close(started) 392 }() 393 394 Eventually(started).Should(BeClosed()) 395 }) 396 397 Context("of different types", func() { 398 399 BeforeEach(func() { 400 receivedSignals = make(chan os.Signal, 4) 401 }) 402 403 It("allows the process to finish gracefully", func() { 404 groupProcess.Signal(syscall.SIGINT) 405 Consistently(groupProcess.Wait(), 20*time.Millisecond, 10*time.Millisecond).ShouldNot(Receive()) 406 groupProcess.Signal(syscall.SIGUSR1) 407 Consistently(groupProcess.Wait(), 20*time.Millisecond, 10*time.Millisecond).ShouldNot(Receive()) 408 groupProcess.Signal(syscall.SIGUSR2) 409 Consistently(groupProcess.Wait(), 20*time.Millisecond, 10*time.Millisecond).ShouldNot(Receive()) 410 411 Eventually(groupProcess.Wait()).Should(Receive()) 412 413 signals := []os.Signal{syscall.SIGINT, syscall.SIGUSR1, syscall.SIGUSR2, syscall.SIGUSR2} 414 for _, expectedSig := range signals { 415 actualSig := <-receivedSignals 416 Expect(actualSig).Should(Equal(expectedSig)) 417 } 418 }) 419 }) 420 421 Context("of same type", func() { 422 423 BeforeEach(func() { 424 receivedSignals = make(chan os.Signal, 2) 425 }) 426 427 It("allows the process to finish gracefully", func() { 428 groupProcess.Signal(syscall.SIGUSR1) 429 Consistently(groupProcess.Wait(), 20*time.Millisecond, 10*time.Millisecond).ShouldNot(Receive()) 430 groupProcess.Signal(syscall.SIGUSR1) 431 Consistently(groupProcess.Wait(), 20*time.Millisecond, 10*time.Millisecond).ShouldNot(Receive()) 432 groupProcess.Signal(syscall.SIGUSR1) 433 Consistently(groupProcess.Wait(), 20*time.Millisecond, 10*time.Millisecond).ShouldNot(Receive()) 434 435 Eventually(groupProcess.Wait()).Should(Receive()) 436 437 signals := []os.Signal{syscall.SIGUSR1, syscall.SIGUSR1} 438 for _, expectedSig := range signals { 439 actualSig := <-receivedSignals 440 Expect(actualSig).Should(Equal(expectedSig)) 441 } 442 }) 443 }) 444 }) 445 }) 446}) 447 448func exitIndex(name string, errTrace grouper.ErrorTrace) int { 449 for i, exitTrace := range errTrace { 450 if exitTrace.Member.Name == name { 451 return i 452 } 453 } 454 455 return -1 456} 457