1package fairshare
2
3import (
4	"fmt"
5	"reflect"
6	"sync"
7	"testing"
8	"time"
9)
10
11func TestFairshare_newDispatcher(t *testing.T) {
12	testCases := []struct {
13		name               string
14		numWorkers         int
15		expectedNumWorkers int
16	}{
17		{
18			name:               "",
19			numWorkers:         0,
20			expectedNumWorkers: 1,
21		},
22		{
23			name:               "",
24			numWorkers:         10,
25			expectedNumWorkers: 10,
26		},
27		{
28			name:               "test-dispatcher",
29			numWorkers:         10,
30			expectedNumWorkers: 10,
31		},
32	}
33
34	l := newTestLogger("workerpool-test")
35	for tcNum, tc := range testCases {
36		d := newDispatcher(tc.name, tc.numWorkers, l)
37
38		if tc.name != "" && d.name != tc.name {
39			t.Errorf("tc %d: expected name %s, got %s", tcNum, tc.name, d.name)
40		}
41		if len(d.workers) != tc.expectedNumWorkers {
42			t.Errorf("tc %d: expected %d workers, got %d", tcNum, tc.expectedNumWorkers, len(d.workers))
43		}
44		if d.jobCh == nil {
45			t.Errorf("tc %d: work channel not set up properly", tcNum)
46		}
47	}
48}
49
50func TestFairshare_createDispatcher(t *testing.T) {
51	testCases := []struct {
52		name               string
53		numWorkers         int
54		expectedNumWorkers int
55	}{
56		{
57			name:               "",
58			numWorkers:         -1,
59			expectedNumWorkers: 1,
60		},
61		{
62			name:               "",
63			numWorkers:         0,
64			expectedNumWorkers: 1,
65		},
66		{
67			name:               "",
68			numWorkers:         10,
69			expectedNumWorkers: 10,
70		},
71		{
72			name:               "",
73			numWorkers:         10,
74			expectedNumWorkers: 10,
75		},
76		{
77			name:               "test-dispatcher",
78			numWorkers:         10,
79			expectedNumWorkers: 10,
80		},
81	}
82
83	l := newTestLogger("workerpool-test")
84	for tcNum, tc := range testCases {
85		d := createDispatcher(tc.name, tc.numWorkers, l)
86		if d == nil {
87			t.Fatalf("tc %d: expected non-nil object", tcNum)
88		}
89
90		if tc.name != "" && d.name != tc.name {
91			t.Errorf("tc %d: expected name %s, got %s", tcNum, tc.name, d.name)
92		}
93		if len(d.name) == 0 {
94			t.Errorf("tc %d: expected name to be set", tcNum)
95		}
96		if d.numWorkers != tc.expectedNumWorkers {
97			t.Errorf("tc %d: expected %d workers, got %d", tcNum, tc.expectedNumWorkers, d.numWorkers)
98		}
99		if d.workers == nil {
100			t.Errorf("tc %d: expected non-nil workers", tcNum)
101		}
102		if d.jobCh == nil {
103			t.Errorf("tc %d: work channel not set up properly", tcNum)
104		}
105		if d.quit == nil {
106			t.Errorf("tc %d: expected non-nil quit channel", tcNum)
107		}
108		if d.logger == nil {
109			t.Errorf("tc %d: expected non-nil logger", tcNum)
110		}
111	}
112}
113
114func TestFairshare_initDispatcher(t *testing.T) {
115	testCases := []struct {
116		numWorkers int
117	}{
118		{
119			numWorkers: 1,
120		},
121		{
122			numWorkers: 10,
123		},
124		{
125			numWorkers: 100,
126		},
127		{
128			numWorkers: 1000,
129		},
130	}
131
132	l := newTestLogger("workerpool-test")
133	for tcNum, tc := range testCases {
134		d := createDispatcher("", tc.numWorkers, l)
135
136		d.init()
137		if len(d.workers) != tc.numWorkers {
138			t.Fatalf("tc %d: expected %d workers, got %d", tcNum, tc.numWorkers, len(d.workers))
139		}
140	}
141}
142
143func TestFairshare_initializeWorker(t *testing.T) {
144	numWorkers := 3
145
146	d := createDispatcher("", numWorkers, newTestLogger("workerpool-test"))
147
148	for workerNum := 0; workerNum < numWorkers; workerNum++ {
149		d.initializeWorker()
150
151		w := d.workers[workerNum]
152		expectedName := fmt.Sprint("worker-", workerNum)
153		if w.name != expectedName {
154			t.Errorf("tc %d: expected name %s, got %s", workerNum, expectedName, w.name)
155		}
156		if w.jobCh != d.jobCh {
157			t.Errorf("tc %d: work channel not set up properly", workerNum)
158		}
159		if w.quit == nil || w.quit != d.quit {
160			t.Errorf("tc %d: quit channel not set up properly", workerNum)
161		}
162		if w.logger == nil || w.logger != d.logger {
163			t.Errorf("tc %d: logger not set up properly", workerNum)
164		}
165	}
166}
167
168func TestFairshare_startWorker(t *testing.T) {
169	d := newDispatcher("", 1, newTestLogger("workerpool-test"))
170
171	d.workers[0].start()
172	defer d.stop()
173
174	var wg sync.WaitGroup
175	ex := func(_ string) error {
176		wg.Done()
177		return nil
178	}
179	onFail := func(_ error) {}
180
181	job := newTestJob(t, "test job", ex, onFail)
182
183	doneCh := make(chan struct{})
184	timeout := time.After(5 * time.Second)
185
186	wg.Add(1)
187	d.dispatch(&job, nil, nil)
188	go func() {
189		wg.Wait()
190		doneCh <- struct{}{}
191	}()
192
193	select {
194	case <-doneCh:
195		break
196	case <-timeout:
197		t.Fatal("timed out")
198	}
199}
200
201func TestFairshare_start(t *testing.T) {
202	numJobs := 10
203	var wg sync.WaitGroup
204	ex := func(_ string) error {
205		wg.Done()
206		return nil
207	}
208	onFail := func(_ error) {}
209
210	wg.Add(numJobs)
211	d := newDispatcher("", 3, newTestLogger("workerpool-test"))
212
213	d.start()
214	defer d.stop()
215
216	doneCh := make(chan struct{})
217	timeout := time.After(5 * time.Second)
218	go func() {
219		wg.Wait()
220		doneCh <- struct{}{}
221	}()
222
223	for i := 0; i < numJobs; i++ {
224		job := newTestJob(t, fmt.Sprintf("job-%d", i), ex, onFail)
225		d.dispatch(&job, nil, nil)
226	}
227
228	select {
229	case <-doneCh:
230		break
231	case <-timeout:
232		t.Fatal("timed out")
233	}
234}
235
236func TestFairshare_stop(t *testing.T) {
237	d := newDispatcher("", 5, newTestLogger("workerpool-test"))
238
239	d.start()
240
241	doneCh := make(chan struct{})
242	timeout := time.After(5 * time.Second)
243
244	go func() {
245		d.stop()
246		d.wg.Wait()
247		doneCh <- struct{}{}
248	}()
249
250	select {
251	case <-doneCh:
252		break
253	case <-timeout:
254		t.Fatal("timed out")
255	}
256}
257
258func TestFairshare_stopMultiple(t *testing.T) {
259	d := newDispatcher("", 5, newTestLogger("workerpool-test"))
260
261	d.start()
262
263	doneCh := make(chan struct{})
264	timeout := time.After(5 * time.Second)
265
266	go func() {
267		d.stop()
268		d.wg.Wait()
269		doneCh <- struct{}{}
270	}()
271
272	select {
273	case <-doneCh:
274		break
275	case <-timeout:
276		t.Fatal("timed out")
277	}
278
279	// essentially, we don't want to panic here
280	var r interface{}
281	go func() {
282		t.Helper()
283
284		defer func() {
285			r = recover()
286			doneCh <- struct{}{}
287		}()
288
289		d.stop()
290		d.wg.Wait()
291	}()
292
293	select {
294	case <-doneCh:
295		break
296	case <-timeout:
297		t.Fatal("timed out")
298	}
299
300	if r != nil {
301		t.Fatalf("panic during second stop: %v", r)
302	}
303}
304
305func TestFairshare_dispatch(t *testing.T) {
306	d := newDispatcher("", 1, newTestLogger("workerpool-test"))
307
308	var wg sync.WaitGroup
309	accumulatedIDs := make([]string, 0)
310	ex := func(id string) error {
311		accumulatedIDs = append(accumulatedIDs, id)
312		wg.Done()
313		return nil
314	}
315	onFail := func(_ error) {}
316
317	expectedIDs := []string{"job-1", "job-2", "job-3", "job-4"}
318	go func() {
319		for _, id := range expectedIDs {
320			job := newTestJob(t, id, ex, onFail)
321			d.dispatch(&job, nil, nil)
322		}
323	}()
324
325	wg.Add(len(expectedIDs))
326	d.start()
327	defer d.stop()
328
329	doneCh := make(chan struct{})
330	go func() {
331		wg.Wait()
332		doneCh <- struct{}{}
333	}()
334
335	timeout := time.After(5 * time.Second)
336	select {
337	case <-doneCh:
338		break
339	case <-timeout:
340		t.Fatal("timed out")
341	}
342
343	if !reflect.DeepEqual(accumulatedIDs, expectedIDs) {
344		t.Fatalf("bad job ids. expected %v, got %v", expectedIDs, accumulatedIDs)
345	}
346}
347
348func TestFairshare_jobFailure(t *testing.T) {
349	numJobs := 10
350	testErr := fmt.Errorf("test error")
351	var wg sync.WaitGroup
352
353	ex := func(_ string) error {
354		return testErr
355	}
356	onFail := func(err error) {
357		if err != testErr {
358			t.Errorf("got unexpected error. expected %v, got %v", testErr, err)
359		}
360
361		wg.Done()
362	}
363
364	wg.Add(numJobs)
365	d := newDispatcher("", 3, newTestLogger("workerpool-test"))
366
367	d.start()
368	defer d.stop()
369
370	doneCh := make(chan struct{})
371	timeout := time.After(5 * time.Second)
372	go func() {
373		wg.Wait()
374		doneCh <- struct{}{}
375	}()
376
377	for i := 0; i < numJobs; i++ {
378		job := newTestJob(t, fmt.Sprintf("job-%d", i), ex, onFail)
379		d.dispatch(&job, nil, nil)
380	}
381
382	select {
383	case <-doneCh:
384		break
385	case <-timeout:
386		t.Fatal("timed out")
387	}
388}
389
390func TestFairshare_nilLoggerDispatcher(t *testing.T) {
391	d := newDispatcher("test-job-mgr", 1, nil)
392	if d.logger == nil {
393		t.Error("logger not set up properly")
394	}
395}
396