1package exec
2
3import (
4	"context"
5	"errors"
6	"fmt"
7	"os"
8	"runtime/debug"
9
10	"github.com/hashicorp/go-multierror"
11)
12
// InParallelStep is a step that runs a collection of steps in parallel.
type InParallelStep struct {
	steps    []Step // child steps, started by Run in slice order
	limit    int    // capacity of the semaphore Run uses to bound concurrently running steps
	failFast bool   // when true, Run cancels the context shared by the steps once one does not succeed
}
19
20// InParallel constructs an InParallelStep.
21func InParallel(steps []Step, limit int, failFast bool) InParallelStep {
22	if limit < 1 {
23		limit = len(steps)
24	}
25	return InParallelStep{
26		steps:    steps,
27		limit:    limit,
28		failFast: failFast,
29	}
30}
31
32// Run executes all steps in order and ensures that the number of running steps
33// does not exceed the optional limit to parallelism. By default the limit is equal
34// to the number of steps, which means all steps will all be executed in parallel.
35//
36// Fail fast can be used to abort running steps if any steps exit with an error. When set
37// to false, parallel wil wait for all the steps to exit even if a step fails or errors.
38//
39// Cancelling a parallel step means that any outstanding steps will not be scheduled to run.
40// After all steps finish, their errors (if any) will be collected and returned as a
41// single error.
42func (step InParallelStep) Run(ctx context.Context, state RunState) error {
43	var (
44		errs          = make(chan error, len(step.steps))
45		sem           = make(chan bool, step.limit)
46		executedSteps int
47	)
48
49	runCtx, cancel := context.WithCancel(ctx)
50	defer cancel()
51
52	for _, s := range step.steps {
53		s := s
54		sem <- true
55
56		if runCtx.Err() != nil {
57			break
58		}
59
60		go func() {
61			defer func() {
62				if r := recover(); r != nil {
63					err := fmt.Errorf("panic in parallel step: %v", r)
64
65					fmt.Fprintf(os.Stderr, "%s\n %s\n", err.Error(), string(debug.Stack()))
66					errs <- err
67				}
68			}()
69			defer func() {
70				<-sem
71			}()
72
73			errs <- s.Run(runCtx, state)
74			if !s.Succeeded() && step.failFast {
75				cancel()
76			}
77		}()
78		executedSteps++
79	}
80
81	var result error
82	for i := 0; i < executedSteps; i++ {
83		err := <-errs
84		if err != nil && !errors.Is(err, context.Canceled) {
85			// The Run context being cancelled only means that one or more steps failed, not
86			// in_parallel itself. If we return context.Canceled error messages the step will
87			// be marked as errored instead of failed, and therefore they should be ignored.
88			result = multierror.Append(result, err)
89		}
90	}
91
92	if ctx.Err() != nil {
93		return ctx.Err()
94	}
95
96	if result != nil {
97		return result
98	}
99
100	return nil
101}
102
103// Succeeded is true if all of the steps' Succeeded is true
104func (step InParallelStep) Succeeded() bool {
105	succeeded := true
106
107	for _, step := range step.steps {
108		if !step.Succeeded() {
109			succeeded = false
110		}
111	}
112
113	return succeeded
114}
115