1package exec 2 3import ( 4 "context" 5 "errors" 6 "fmt" 7 "os" 8 "runtime/debug" 9 10 "github.com/hashicorp/go-multierror" 11) 12 13// InParallelStep is a step of steps to run in parallel. 14type InParallelStep struct { 15 steps []Step 16 limit int 17 failFast bool 18} 19 20// InParallel constructs an InParallelStep. 21func InParallel(steps []Step, limit int, failFast bool) InParallelStep { 22 if limit < 1 { 23 limit = len(steps) 24 } 25 return InParallelStep{ 26 steps: steps, 27 limit: limit, 28 failFast: failFast, 29 } 30} 31 32// Run executes all steps in order and ensures that the number of running steps 33// does not exceed the optional limit to parallelism. By default the limit is equal 34// to the number of steps, which means all steps will all be executed in parallel. 35// 36// Fail fast can be used to abort running steps if any steps exit with an error. When set 37// to false, parallel wil wait for all the steps to exit even if a step fails or errors. 38// 39// Cancelling a parallel step means that any outstanding steps will not be scheduled to run. 40// After all steps finish, their errors (if any) will be collected and returned as a 41// single error. 42func (step InParallelStep) Run(ctx context.Context, state RunState) error { 43 var ( 44 errs = make(chan error, len(step.steps)) 45 sem = make(chan bool, step.limit) 46 executedSteps int 47 ) 48 49 runCtx, cancel := context.WithCancel(ctx) 50 defer cancel() 51 52 for _, s := range step.steps { 53 s := s 54 sem <- true 55 56 if runCtx.Err() != nil { 57 break 58 } 59 60 go func() { 61 defer func() { 62 if r := recover(); r != nil { 63 err := fmt.Errorf("panic in parallel step: %v", r) 64 65 fmt.Fprintf(os.Stderr, "%s\n %s\n", err.Error(), string(debug.Stack())) 66 errs <- err 67 } 68 }() 69 defer func() { 70 <-sem 71 }() 72 73 errs <- s.Run(runCtx, state) 74 if !s.Succeeded() && step.failFast { 75 cancel() 76 } 77 }() 78 executedSteps++ 79 } 80 81 var result error 82 for i := 0; i < executedSteps; i++ { 83 err := <-errs 84 if err != nil && !errors.Is(err, context.Canceled) { 85 // The Run context being cancelled only means that one or more steps failed, not 86 // in_parallel itself. If we return context.Canceled error messages the step will 87 // be marked as errored instead of failed, and therefore they should be ignored. 88 result = multierror.Append(result, err) 89 } 90 } 91 92 if ctx.Err() != nil { 93 return ctx.Err() 94 } 95 96 if result != nil { 97 return result 98 } 99 100 return nil 101} 102 103// Succeeded is true if all of the steps' Succeeded is true 104func (step InParallelStep) Succeeded() bool { 105 succeeded := true 106 107 for _, step := range step.steps { 108 if !step.Succeeded() { 109 succeeded = false 110 } 111 } 112 113 return succeeded 114} 115