1// Copyright 2020 The Go Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5package par
6
7import "fmt"
8
9// Queue manages a set of work items to be executed in parallel. The number of
10// active work items is limited, and excess items are queued sequentially.
11type Queue struct {
12	maxActive int
13	st        chan queueState
14}
15
16type queueState struct {
17	active  int // number of goroutines processing work; always nonzero when len(backlog) > 0
18	backlog []func()
19	idle    chan struct{} // if non-nil, closed when active becomes 0
20}
21
22// NewQueue returns a Queue that executes up to maxActive items in parallel.
23//
24// maxActive must be positive.
25func NewQueue(maxActive int) *Queue {
26	if maxActive < 1 {
27		panic(fmt.Sprintf("par.NewQueue called with nonpositive limit (%d)", maxActive))
28	}
29
30	q := &Queue{
31		maxActive: maxActive,
32		st:        make(chan queueState, 1),
33	}
34	q.st <- queueState{}
35	return q
36}
37
38// Add adds f as a work item in the queue.
39//
40// Add returns immediately, but the queue will be marked as non-idle until after
41// f (and any subsequently-added work) has completed.
42func (q *Queue) Add(f func()) {
43	st := <-q.st
44	if st.active == q.maxActive {
45		st.backlog = append(st.backlog, f)
46		q.st <- st
47		return
48	}
49	if st.active == 0 {
50		// Mark q as non-idle.
51		st.idle = nil
52	}
53	st.active++
54	q.st <- st
55
56	go func() {
57		for {
58			f()
59
60			st := <-q.st
61			if len(st.backlog) == 0 {
62				if st.active--; st.active == 0 && st.idle != nil {
63					close(st.idle)
64				}
65				q.st <- st
66				return
67			}
68			f, st.backlog = st.backlog[0], st.backlog[1:]
69			q.st <- st
70		}
71	}()
72}
73
74// Idle returns a channel that will be closed when q has no (active or enqueued)
75// work outstanding.
76func (q *Queue) Idle() <-chan struct{} {
77	st := <-q.st
78	defer func() { q.st <- st }()
79
80	if st.idle == nil {
81		st.idle = make(chan struct{})
82		if st.active == 0 {
83			close(st.idle)
84		}
85	}
86
87	return st.idle
88}
89