1// Copyright 2020 The Go Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5package par
6
7import (
8	"sync"
9	"testing"
10)
11
12func TestQueueIdle(t *testing.T) {
13	q := NewQueue(1)
14	select {
15	case <-q.Idle():
16	default:
17		t.Errorf("NewQueue(1) is not initially idle.")
18	}
19
20	started := make(chan struct{})
21	unblock := make(chan struct{})
22	q.Add(func() {
23		close(started)
24		<-unblock
25	})
26
27	<-started
28	idle := q.Idle()
29	select {
30	case <-idle:
31		t.Errorf("NewQueue(1) is marked idle while processing work.")
32	default:
33	}
34
35	close(unblock)
36	<-idle // Should be closed as soon as the Add callback returns.
37}
38
39func TestQueueBacklog(t *testing.T) {
40	const (
41		maxActive = 2
42		totalWork = 3 * maxActive
43	)
44
45	q := NewQueue(maxActive)
46	t.Logf("q = NewQueue(%d)", maxActive)
47
48	var wg sync.WaitGroup
49	wg.Add(totalWork)
50	started := make([]chan struct{}, totalWork)
51	unblock := make(chan struct{})
52	for i := range started {
53		started[i] = make(chan struct{})
54		i := i
55		q.Add(func() {
56			close(started[i])
57			<-unblock
58			wg.Done()
59		})
60	}
61
62	for i, c := range started {
63		if i < maxActive {
64			<-c // Work item i should be started immediately.
65		} else {
66			select {
67			case <-c:
68				t.Errorf("Work item %d started before previous items finished.", i)
69			default:
70			}
71		}
72	}
73
74	close(unblock)
75	for _, c := range started[maxActive:] {
76		<-c
77	}
78	wg.Wait()
79}
80