1package fairshare 2 3import ( 4 "fmt" 5 "reflect" 6 "sync" 7 "testing" 8 "time" 9) 10 11func TestFairshare_newDispatcher(t *testing.T) { 12 testCases := []struct { 13 name string 14 numWorkers int 15 expectedNumWorkers int 16 }{ 17 { 18 name: "", 19 numWorkers: 0, 20 expectedNumWorkers: 1, 21 }, 22 { 23 name: "", 24 numWorkers: 10, 25 expectedNumWorkers: 10, 26 }, 27 { 28 name: "test-dispatcher", 29 numWorkers: 10, 30 expectedNumWorkers: 10, 31 }, 32 } 33 34 l := newTestLogger("workerpool-test") 35 for tcNum, tc := range testCases { 36 d := newDispatcher(tc.name, tc.numWorkers, l) 37 38 if tc.name != "" && d.name != tc.name { 39 t.Errorf("tc %d: expected name %s, got %s", tcNum, tc.name, d.name) 40 } 41 if len(d.workers) != tc.expectedNumWorkers { 42 t.Errorf("tc %d: expected %d workers, got %d", tcNum, tc.expectedNumWorkers, len(d.workers)) 43 } 44 if d.jobCh == nil { 45 t.Errorf("tc %d: work channel not set up properly", tcNum) 46 } 47 } 48} 49 50func TestFairshare_createDispatcher(t *testing.T) { 51 testCases := []struct { 52 name string 53 numWorkers int 54 expectedNumWorkers int 55 }{ 56 { 57 name: "", 58 numWorkers: -1, 59 expectedNumWorkers: 1, 60 }, 61 { 62 name: "", 63 numWorkers: 0, 64 expectedNumWorkers: 1, 65 }, 66 { 67 name: "", 68 numWorkers: 10, 69 expectedNumWorkers: 10, 70 }, 71 { 72 name: "", 73 numWorkers: 10, 74 expectedNumWorkers: 10, 75 }, 76 { 77 name: "test-dispatcher", 78 numWorkers: 10, 79 expectedNumWorkers: 10, 80 }, 81 } 82 83 l := newTestLogger("workerpool-test") 84 for tcNum, tc := range testCases { 85 d := createDispatcher(tc.name, tc.numWorkers, l) 86 if d == nil { 87 t.Fatalf("tc %d: expected non-nil object", tcNum) 88 } 89 90 if tc.name != "" && d.name != tc.name { 91 t.Errorf("tc %d: expected name %s, got %s", tcNum, tc.name, d.name) 92 } 93 if len(d.name) == 0 { 94 t.Errorf("tc %d: expected name to be set", tcNum) 95 } 96 if d.numWorkers != tc.expectedNumWorkers { 97 t.Errorf("tc %d: expected %d workers, got %d", tcNum, tc.expectedNumWorkers, d.numWorkers) 98 } 99 if d.workers == nil { 100 t.Errorf("tc %d: expected non-nil workers", tcNum) 101 } 102 if d.jobCh == nil { 103 t.Errorf("tc %d: work channel not set up properly", tcNum) 104 } 105 if d.quit == nil { 106 t.Errorf("tc %d: expected non-nil quit channel", tcNum) 107 } 108 if d.logger == nil { 109 t.Errorf("tc %d: expected non-nil logger", tcNum) 110 } 111 } 112} 113 114func TestFairshare_initDispatcher(t *testing.T) { 115 testCases := []struct { 116 numWorkers int 117 }{ 118 { 119 numWorkers: 1, 120 }, 121 { 122 numWorkers: 10, 123 }, 124 { 125 numWorkers: 100, 126 }, 127 { 128 numWorkers: 1000, 129 }, 130 } 131 132 l := newTestLogger("workerpool-test") 133 for tcNum, tc := range testCases { 134 d := createDispatcher("", tc.numWorkers, l) 135 136 d.init() 137 if len(d.workers) != tc.numWorkers { 138 t.Fatalf("tc %d: expected %d workers, got %d", tcNum, tc.numWorkers, len(d.workers)) 139 } 140 } 141} 142 143func TestFairshare_initializeWorker(t *testing.T) { 144 numWorkers := 3 145 146 d := createDispatcher("", numWorkers, newTestLogger("workerpool-test")) 147 148 for workerNum := 0; workerNum < numWorkers; workerNum++ { 149 d.initializeWorker() 150 151 w := d.workers[workerNum] 152 expectedName := fmt.Sprint("worker-", workerNum) 153 if w.name != expectedName { 154 t.Errorf("tc %d: expected name %s, got %s", workerNum, expectedName, w.name) 155 } 156 if w.jobCh != d.jobCh { 157 t.Errorf("tc %d: work channel not set up properly", workerNum) 158 } 159 if w.quit == nil || w.quit != d.quit { 160 t.Errorf("tc %d: quit channel not set up properly", workerNum) 161 } 162 if w.logger == nil || w.logger != d.logger { 163 t.Errorf("tc %d: logger not set up properly", workerNum) 164 } 165 } 166} 167 168func TestFairshare_startWorker(t *testing.T) { 169 d := newDispatcher("", 1, newTestLogger("workerpool-test")) 170 171 d.workers[0].start() 172 defer d.stop() 173 174 var wg sync.WaitGroup 175 ex := func(_ string) error { 176 wg.Done() 177 return nil 178 } 179 onFail := func(_ error) {} 180 181 job := newTestJob(t, "test job", ex, onFail) 182 183 doneCh := make(chan struct{}) 184 timeout := time.After(5 * time.Second) 185 186 wg.Add(1) 187 d.dispatch(&job, nil, nil) 188 go func() { 189 wg.Wait() 190 doneCh <- struct{}{} 191 }() 192 193 select { 194 case <-doneCh: 195 break 196 case <-timeout: 197 t.Fatal("timed out") 198 } 199} 200 201func TestFairshare_start(t *testing.T) { 202 numJobs := 10 203 var wg sync.WaitGroup 204 ex := func(_ string) error { 205 wg.Done() 206 return nil 207 } 208 onFail := func(_ error) {} 209 210 wg.Add(numJobs) 211 d := newDispatcher("", 3, newTestLogger("workerpool-test")) 212 213 d.start() 214 defer d.stop() 215 216 doneCh := make(chan struct{}) 217 timeout := time.After(5 * time.Second) 218 go func() { 219 wg.Wait() 220 doneCh <- struct{}{} 221 }() 222 223 for i := 0; i < numJobs; i++ { 224 job := newTestJob(t, fmt.Sprintf("job-%d", i), ex, onFail) 225 d.dispatch(&job, nil, nil) 226 } 227 228 select { 229 case <-doneCh: 230 break 231 case <-timeout: 232 t.Fatal("timed out") 233 } 234} 235 236func TestFairshare_stop(t *testing.T) { 237 d := newDispatcher("", 5, newTestLogger("workerpool-test")) 238 239 d.start() 240 241 doneCh := make(chan struct{}) 242 timeout := time.After(5 * time.Second) 243 244 go func() { 245 d.stop() 246 d.wg.Wait() 247 doneCh <- struct{}{} 248 }() 249 250 select { 251 case <-doneCh: 252 break 253 case <-timeout: 254 t.Fatal("timed out") 255 } 256} 257 258func TestFairshare_stopMultiple(t *testing.T) { 259 d := newDispatcher("", 5, newTestLogger("workerpool-test")) 260 261 d.start() 262 263 doneCh := make(chan struct{}) 264 timeout := time.After(5 * time.Second) 265 266 go func() { 267 d.stop() 268 d.wg.Wait() 269 doneCh <- struct{}{} 270 }() 271 272 select { 273 case <-doneCh: 274 break 275 case <-timeout: 276 t.Fatal("timed out") 277 } 278 279 // essentially, we don't want to panic here 280 var r interface{} 281 go func() { 282 t.Helper() 283 284 defer func() { 285 r = recover() 286 doneCh <- struct{}{} 287 }() 288 289 d.stop() 290 d.wg.Wait() 291 }() 292 293 select { 294 case <-doneCh: 295 break 296 case <-timeout: 297 t.Fatal("timed out") 298 } 299 300 if r != nil { 301 t.Fatalf("panic during second stop: %v", r) 302 } 303} 304 305func TestFairshare_dispatch(t *testing.T) { 306 d := newDispatcher("", 1, newTestLogger("workerpool-test")) 307 308 var wg sync.WaitGroup 309 accumulatedIDs := make([]string, 0) 310 ex := func(id string) error { 311 accumulatedIDs = append(accumulatedIDs, id) 312 wg.Done() 313 return nil 314 } 315 onFail := func(_ error) {} 316 317 expectedIDs := []string{"job-1", "job-2", "job-3", "job-4"} 318 go func() { 319 for _, id := range expectedIDs { 320 job := newTestJob(t, id, ex, onFail) 321 d.dispatch(&job, nil, nil) 322 } 323 }() 324 325 wg.Add(len(expectedIDs)) 326 d.start() 327 defer d.stop() 328 329 doneCh := make(chan struct{}) 330 go func() { 331 wg.Wait() 332 doneCh <- struct{}{} 333 }() 334 335 timeout := time.After(5 * time.Second) 336 select { 337 case <-doneCh: 338 break 339 case <-timeout: 340 t.Fatal("timed out") 341 } 342 343 if !reflect.DeepEqual(accumulatedIDs, expectedIDs) { 344 t.Fatalf("bad job ids. expected %v, got %v", expectedIDs, accumulatedIDs) 345 } 346} 347 348func TestFairshare_jobFailure(t *testing.T) { 349 numJobs := 10 350 testErr := fmt.Errorf("test error") 351 var wg sync.WaitGroup 352 353 ex := func(_ string) error { 354 return testErr 355 } 356 onFail := func(err error) { 357 if err != testErr { 358 t.Errorf("got unexpected error. expected %v, got %v", testErr, err) 359 } 360 361 wg.Done() 362 } 363 364 wg.Add(numJobs) 365 d := newDispatcher("", 3, newTestLogger("workerpool-test")) 366 367 d.start() 368 defer d.stop() 369 370 doneCh := make(chan struct{}) 371 timeout := time.After(5 * time.Second) 372 go func() { 373 wg.Wait() 374 doneCh <- struct{}{} 375 }() 376 377 for i := 0; i < numJobs; i++ { 378 job := newTestJob(t, fmt.Sprintf("job-%d", i), ex, onFail) 379 d.dispatch(&job, nil, nil) 380 } 381 382 select { 383 case <-doneCh: 384 break 385 case <-timeout: 386 t.Fatal("timed out") 387 } 388} 389 390func TestFairshare_nilLoggerDispatcher(t *testing.T) { 391 d := newDispatcher("test-job-mgr", 1, nil) 392 if d.logger == nil { 393 t.Error("logger not set up properly") 394 } 395} 396