1
2 /*
3 * Copyright (C) NGINX, Inc.
4 */
5
6 #include <nxt_main.h>
7 #include <math.h>
8 #include <inttypes.h>
9
/*
 * Selects the queue implementation under test: non-zero includes
 * nxt_nncq.h, zero includes nxt_nvbcq.h.  May be predefined by the build.
 */
#ifndef NXT_NCQ_TEST
#define NXT_NCQ_TEST 1
#endif

/*
 * Non-zero runs every worker as a thread (nxt_thread_create());
 * zero runs every worker in its own fork()ed process.
 */
#define NXT_QTEST_USE_THREAD 0

#if NXT_NCQ_TEST
#include <nxt_nncq.h>
#else
#include <nxt_nvbcq.h>
#endif


/* Upper bound on the number of benchmark iterations run by main(). */
#define MAX_ITER 20
/* Number of samples in the window main() uses for the mean and cov. */
#define STAT_ITER 5
/* main() stops once the window's coefficient of variation is below this. */
#define MIN_COV 0.02
26
/* Process environment, passed to nxt_lib_start() by main(). */
extern char **environ;
/*
 * Operation budget ("-n").  Every worker of a given kind loops
 * nops / <number of workers of that kind> times.
 */
static uintptr_t nops = 10000000;

/* Number of workers of each kind, set from the command line in main(). */
static uintptr_t nprocs_enq = 0;        /* "--enq":  enq_worker     */
static uintptr_t nprocs_deq = 0;        /* "--deq":  deq_worker     */
static uintptr_t nprocs_wenq = 0;       /* "--wenq": wenq_worker    */
static uintptr_t nprocs_wdeq = 0;       /* "--wdeq": wdeq_worker    */
static uintptr_t nprocs_enq_deq = 0;    /* "--ed":   enq_deq_worker */
static uintptr_t nprocs_cas = 0;        /* "--cas":  cas_worker     */
static uintptr_t nprocs_faa = 0;        /* "--faa":  faa_worker     */

/* Total number of workers; main() recomputes it as the sum of the above. */
static uintptr_t nprocs = 1;
39
40
/*
 * Returns the current wall-clock time in microseconds minus "us".
 *
 * Call it with 0 to take a timestamp, then pass that timestamp back
 * to obtain the number of microseconds elapsed since it was taken.
 */
static size_t
elapsed_time(size_t us)
{
    struct timeval  t;

    gettimeofday(&t, NULL);

    /*
     * Convert to size_t before scaling.  Multiplying a signed 32-bit
     * tv_sec by 1000000 overflows, which is undefined behavior; unsigned
     * arithmetic wraps instead and still yields the correct difference
     * between two timestamps.
     */
    return (size_t) t.tv_sec * 1000000 + (size_t) t.tv_usec - us;
}
50
51
/*
 * Arithmetic mean of the first "n" entries of "times".
 * The samples are accumulated in index order.
 */
static double
mean(const double *times, int n)
{
    int     idx = 0;
    double  total = 0;

    while (idx < n) {
        total += times[idx];
        idx++;
    }

    return total / n;
}
66
67
68 static double
cov(const double * times,double mean,int n)69 cov(const double *times, double mean, int n)
70 {
71 int i;
72 double variance;
73
74 variance = 0;
75
76 for (i = 0; i < n; i++) {
77 variance += (times[i] - mean) * (times[i] - mean);
78 }
79
80 variance /= n;
81
82 return sqrt(variance) / mean;
83 }
84
/*
 * State operated on by all workers: a pair of circular queues of the
 * type selected by NXT_NCQ_TEST, plus a counter for the CAS/FAA workers.
 */
typedef struct {
#if NXT_NCQ_TEST
    nxt_nncq_t free_queue;      /* filled with 0..NXT_CQ_SIZE-1 by main() */
    nxt_nncq_t active_queue;    /* values moved here by the enqueue workers */
#else
    nxt_nvbcq_t free_queue;     /* filled with 0..NXT_CQ_SIZE-1 by main() */
    nxt_nvbcq_t active_queue;   /* values moved here by the enqueue workers */
#endif
    uint32_t counter;           /* target of cas_worker and faa_worker */
} nxt_cq_t;
95
96
/*
 * Queue state used by every worker.  main() allocates it with an
 * anonymous MAP_SHARED mapping before any worker is started.
 */
static nxt_cq_t *pgq;
98
99
/*
 * Implementation-neutral aliases: the rest of the file uses nxt_cq_*
 * and NXT_CQ_SIZE, which map to the nxt_nncq or nxt_nvbcq API depending
 * on NXT_NCQ_TEST.
 */
#if NXT_NCQ_TEST
#define nxt_cq_enqueue nxt_nncq_enqueue
#define nxt_cq_dequeue nxt_nncq_dequeue
#define nxt_cq_empty nxt_nncq_empty
#define nxt_cq_init nxt_nncq_init
#define NXT_CQ_SIZE NXT_NNCQ_SIZE
#else
#define nxt_cq_enqueue nxt_nvbcq_enqueue
#define nxt_cq_dequeue nxt_nvbcq_dequeue
#define nxt_cq_empty nxt_nvbcq_empty
#define nxt_cq_init nxt_nvbcq_init
#define NXT_CQ_SIZE NXT_NVBCQ_SIZE
#endif
113
/*
 * Per-worker record: identity, operation counters updated by the worker
 * functions, and the handle main() needs to wait for the worker.
 */
typedef struct {
    int id;                 /* worker index assigned by worker_create() */
    uint64_t enq;           /* values put into the active queue */
    uint64_t deq;           /* values returned to the free queue */
    uint64_t wait_enq;      /* wenq_worker: dequeue attempts that found the free queue empty */
    uint64_t wait_deq;      /* wdeq_worker: dequeue attempts that found the active queue empty */
    uint64_t own_res;       /* enq_deq_worker: dequeued values equal to the worker's own id */
    uint64_t cas;           /* cas_worker: successful compare-and-set operations */
    uint64_t faa;           /* faa_worker: fetch-and-add operations */

#if NXT_QTEST_USE_THREAD
    nxt_thread_handle_t handle;     /* thread handle, joined in worker_wait() */
#else
    nxt_pid_t pid;                  /* child pid returned by fork() */
    int status;                     /* wait status stored by waitpid() */
#endif
} nxt_worker_info_t;
131
132
133 static void
cas_worker(void * p)134 cas_worker(void *p)
135 {
136 nxt_cq_t *q;
137 uint32_t c;
138 uintptr_t i;
139 nxt_worker_info_t *wi;
140
141 q = pgq;
142 wi = p;
143
144 for (i = 0; i < nops / nprocs_cas; i++) {
145 c = q->counter;
146
147 if (nxt_atomic_cmp_set(&q->counter, c, c + 1)) {
148 ++wi->cas;
149 }
150 }
151 }
152
153
154 static void
faa_worker(void * p)155 faa_worker(void *p)
156 {
157 nxt_cq_t *q;
158 uintptr_t i;
159 nxt_worker_info_t *wi;
160
161 q = pgq;
162 wi = p;
163
164 for (i = 0; i < nops / nprocs_faa; i++) {
165 nxt_atomic_fetch_add(&q->counter, 1);
166 wi->faa++;
167 }
168 }
169
170
171 static void
enq_deq_worker(void * p)172 enq_deq_worker(void *p)
173 {
174 nxt_cq_t *q;
175 uintptr_t i, v;
176 nxt_worker_info_t *wi;
177
178 q = pgq;
179 wi = p;
180
181 for (i = 0; i < nops / nprocs_enq_deq; i++) {
182 v = nxt_cq_dequeue(&q->free_queue);
183
184 if (v != nxt_cq_empty(&q->free_queue)) {
185 nxt_cq_enqueue(&q->active_queue, wi->id);
186 wi->enq++;
187 }
188
189 v = nxt_cq_dequeue(&q->active_queue);
190
191 if (v != nxt_cq_empty(&q->active_queue)) {
192 nxt_cq_enqueue(&q->free_queue, v);
193 wi->deq++;
194
195 if ((int) v == wi->id) {
196 wi->own_res++;
197 }
198 }
199 }
200 }
201
202
203 static void
enq_worker(void * p)204 enq_worker(void *p)
205 {
206 nxt_cq_t *q;
207 uintptr_t i, v;
208 nxt_worker_info_t *wi;
209
210 q = pgq;
211 wi = p;
212
213 for (i = 0; i < nops / nprocs_enq; i++) {
214 v = nxt_cq_dequeue(&q->free_queue);
215
216 if (v != nxt_cq_empty(&q->free_queue)) {
217 nxt_cq_enqueue(&q->active_queue, v);
218 wi->enq++;
219 }
220 }
221 }
222
223
224 static void
deq_worker(void * p)225 deq_worker(void *p)
226 {
227 nxt_cq_t *q;
228 uintptr_t i, v;
229 nxt_worker_info_t *wi;
230
231 q = pgq;
232 wi = p;
233
234 for (i = 0; i < nops / nprocs_deq; i++) {
235 v = nxt_cq_dequeue(&q->active_queue);
236
237 if (v != nxt_cq_empty(&q->active_queue)) {
238 nxt_cq_enqueue(&q->free_queue, v);
239 ++wi->deq;
240 }
241 }
242 }
243
244
245 static void
wenq_worker(void * p)246 wenq_worker(void *p)
247 {
248 nxt_cq_t *q;
249 uintptr_t i, v;
250 nxt_worker_info_t *wi;
251
252 q = pgq;
253 wi = p;
254
255 for (i = 0; i < nops / nprocs_wenq; i++) {
256
257 do {
258 wi->wait_enq++;
259 v = nxt_cq_dequeue(&q->free_queue);
260 } while (v == nxt_cq_empty(&q->free_queue));
261
262 nxt_cq_enqueue(&q->active_queue, v);
263
264 wi->enq++;
265 wi->wait_enq--;
266 }
267 }
268
269
270 static void
wdeq_worker(void * p)271 wdeq_worker(void *p)
272 {
273 nxt_cq_t *q;
274 uintptr_t i, v;
275 nxt_worker_info_t *wi;
276
277 q = pgq;
278 wi = p;
279
280 for (i = 0; i < nops / nprocs_wdeq; i++) {
281
282 do {
283 wi->wait_deq++;
284 v = nxt_cq_dequeue(&q->active_queue);
285 } while (v == nxt_cq_empty(&q->active_queue));
286
287 nxt_cq_enqueue(&q->free_queue, v);
288
289 wi->deq++;
290 wi->wait_deq--;
291 }
292 }
293
294
295 static nxt_int_t
worker_create(nxt_worker_info_t * wi,int id,nxt_thread_start_t start)296 worker_create(nxt_worker_info_t *wi, int id, nxt_thread_start_t start)
297 {
298 wi->id = id;
299
300 #if NXT_QTEST_USE_THREAD
301 nxt_thread_link_t *link;
302
303 link = nxt_zalloc(sizeof(nxt_thread_link_t));
304
305 link->start = start;
306 link->work.data = wi;
307
308 return nxt_thread_create(&wi->handle, link);
309
310 #else
311 pid_t pid = fork();
312
313 if (pid == 0) {
314 start(wi);
315 exit(0);
316
317 } else {
318 wi->pid = pid;
319 }
320
321 return NXT_OK;
322 #endif
323 }
324
325
326 static void
worker_wait(nxt_worker_info_t * wi)327 worker_wait(nxt_worker_info_t *wi)
328 {
329 #if NXT_QTEST_USE_THREAD
330 pthread_join(wi->handle, NULL);
331
332 #else
333 waitpid(wi->pid, &wi->status, 0);
334 #endif
335 }
336
337
338 int nxt_cdecl
main(int argc,char ** argv)339 main(int argc, char **argv)
340 {
341 int i, k, id, verbose, objective, rk;
342 char *a;
343 size_t start, elapsed;
344 double *stats, m, c;
345 uint64_t total_ops;
346 uintptr_t j;
347 nxt_task_t task;
348 nxt_thread_t *thr;
349 nxt_worker_info_t *wi;
350 double times[MAX_ITER], mopsec[MAX_ITER];
351
352 verbose = 0;
353 objective = 0;
354
355 for (i = 1; i < argc; i++) {
356 a = argv[i];
357
358 if (strcmp(a, "-v") == 0) {
359 verbose++;
360 continue;
361 }
362
363 if (strcmp(a, "-n") == 0 && (i + 1) < argc) {
364 nops = atoi(argv[++i]);
365 continue;
366 }
367
368 if (strcmp(a, "--enq") == 0 && (i + 1) < argc) {
369 nprocs_enq = atoi(argv[++i]);
370 continue;
371 }
372
373 if (strcmp(a, "--deq") == 0 && (i + 1) < argc) {
374 nprocs_deq = atoi(argv[++i]);
375 continue;
376 }
377
378 if (strcmp(a, "--wenq") == 0 && (i + 1) < argc) {
379 nprocs_wenq = atoi(argv[++i]);
380 continue;
381 }
382
383 if (strcmp(a, "--wdeq") == 0 && (i + 1) < argc) {
384 nprocs_wdeq = atoi(argv[++i]);
385 continue;
386 }
387
388 if (strcmp(a, "--ed") == 0 && (i + 1) < argc) {
389 nprocs_enq_deq = atoi(argv[++i]);
390 continue;
391 }
392
393 if (strcmp(a, "--cas") == 0 && (i + 1) < argc) {
394 nprocs_cas = atoi(argv[++i]);
395 continue;
396 }
397
398 if (strcmp(a, "--faa") == 0 && (i + 1) < argc) {
399 nprocs_faa = atoi(argv[++i]);
400 continue;
401 }
402
403 if (strcmp(a, "--obj") == 0 && (i + 1) < argc) {
404 objective = atoi(argv[++i]);
405 continue;
406 }
407
408 printf("unknown option %s", a);
409
410 return 1;
411 }
412
413 if (nxt_lib_start("ncq_test", argv, &environ) != NXT_OK) {
414 return 1;
415 }
416
417 nprocs = nprocs_enq + nprocs_deq + nprocs_wenq + nprocs_wdeq
418 + nprocs_enq_deq + nprocs_cas + nprocs_faa;
419
420 if (nprocs == 0) {
421 return 0;
422 }
423
424 nxt_main_log.level = NXT_LOG_INFO;
425 task.log = &nxt_main_log;
426
427 thr = nxt_thread();
428 thr->task = &task;
429
430 pgq = mmap(NULL, sizeof(nxt_cq_t), PROT_READ | PROT_WRITE,
431 MAP_ANON | MAP_SHARED, -1, 0);
432 if (pgq == MAP_FAILED) {
433 return 2;
434 }
435
436 nxt_cq_init(&pgq->free_queue);
437 nxt_cq_init(&pgq->active_queue);
438
439 for(i = 0; i < NXT_CQ_SIZE; i++) {
440 nxt_cq_enqueue(&pgq->free_queue, i);
441 }
442
443 if (verbose >= 1) {
444 printf("number of workers: %d\n", (int) nprocs);
445 printf("number of ops: %d\n", (int) nops);
446 }
447
448 wi = mmap(NULL, nprocs * sizeof(nxt_worker_info_t), PROT_READ | PROT_WRITE,
449 MAP_ANON | MAP_SHARED, -1, 0);
450 if (wi == MAP_FAILED) {
451 return 3;
452 }
453
454 for (k = 0; k < MAX_ITER; k++) {
455 nxt_memzero(wi, nprocs * sizeof(nxt_worker_info_t));
456
457 nxt_cq_init(&pgq->free_queue);
458 nxt_cq_init(&pgq->active_queue);
459
460 for(i = 0; i < NXT_CQ_SIZE; i++) {
461 nxt_cq_enqueue(&pgq->free_queue, i);
462 }
463
464 start = elapsed_time(0);
465
466 id = 0;
467
468 for (j = 0; j < nprocs_enq; j++, id++) {
469 worker_create(wi + id, id, enq_worker);
470 }
471
472 for (j = 0; j < nprocs_deq; j++, id++) {
473 worker_create(wi + id, id, deq_worker);
474 }
475
476 for (j = 0; j < nprocs_wenq; j++, id++) {
477 worker_create(wi + id, id, wenq_worker);
478 }
479
480 for (j = 0; j < nprocs_wdeq; j++, id++) {
481 worker_create(wi + id, id, wdeq_worker);
482 }
483
484 for (j = 0; j < nprocs_enq_deq; j++, id++) {
485 worker_create(wi + id, id, enq_deq_worker);
486 }
487
488 for (j = 0; j < nprocs_cas; j++, id++) {
489 worker_create(wi + id, id, cas_worker);
490 }
491
492 for (j = 0; j < nprocs_faa; j++, id++) {
493 worker_create(wi + id, id, faa_worker);
494 }
495
496 for (j = 0; j < nprocs; j++) {
497 worker_wait(wi + j);
498 }
499
500 elapsed = elapsed_time(start);
501
502 for (j = 1; j < nprocs; j++) {
503 wi[0].enq += wi[j].enq;
504 wi[0].deq += wi[j].deq;
505 wi[0].wait_enq += wi[j].wait_enq;
506 wi[0].wait_deq += wi[j].wait_deq;
507 wi[0].own_res += wi[j].own_res;
508 wi[0].cas += wi[j].cas;
509 wi[0].faa += wi[j].faa;
510 }
511
512 total_ops = wi[0].enq + wi[0].deq + wi[0].cas + wi[0].faa;
513
514 if (total_ops == 0) {
515 total_ops = nops;
516 }
517
518 times[k] = elapsed / 1000.0;
519 mopsec[k] = (double) total_ops / elapsed;
520
521 if (verbose >= 2) {
522 printf("enq %10"PRIu64"\n", wi[0].enq);
523 printf("deq %10"PRIu64"\n", wi[0].deq);
524 printf("wait_enq %10"PRIu64"\n", wi[0].wait_enq);
525 printf("wait_deq %10"PRIu64"\n", wi[0].wait_deq);
526 printf("own_res %10"PRIu64"\n", wi[0].own_res);
527 printf("cas %10"PRIu64"\n", wi[0].cas);
528 printf("faa %10"PRIu64"\n", wi[0].faa);
529 printf("total ops %10"PRIu64"\n", total_ops);
530 printf("Mops/sec %13.2f\n", mopsec[k]);
531
532 printf("elapsed %10d us\n", (int) elapsed);
533 printf("per op %10d ns\n", (int) ((1000 * elapsed) / total_ops));
534 }
535
536 if (k >= STAT_ITER) {
537 stats = (objective == 0) ? times : mopsec;
538
539 m = mean(stats + k - STAT_ITER, STAT_ITER);
540 c = cov(stats + k - STAT_ITER, m, STAT_ITER);
541
542 if (verbose >= 1) {
543 if (objective == 0) {
544 printf(" #%02d elapsed time: %.2f ms; Mops/sec %.2f; "
545 "mean time %.2f ms; cov %.4f\n",
546 (int) k + 1, times[k], mopsec[k], m, c);
547
548 } else {
549 printf(" #%02d elapsed time: %.2f ms; Mops/sec %.2f; "
550 "mean Mop/sec %.2f; cov %.4f\n",
551 (int) k + 1, times[k], mopsec[k], m, c);
552 }
553 }
554
555 if (c < MIN_COV) {
556 rk = k - STAT_ITER;
557
558 for (i = rk + 1; i <= k; i++) {
559 if (fabs(stats[i] - m) < fabs(stats[rk] - m)) {
560 rk = i;
561 }
562 }
563
564 printf("#%d %.2f ms; %.2f\n", rk, times[rk], mopsec[rk]);
565
566 return 0;
567 }
568
569 } else {
570 if (verbose >= 1) {
571 printf(" #%02d elapsed time: %.2f ms; Mops/sec %.2f\n",
572 (int) k + 1, times[k], mopsec[k]);
573 }
574 }
575 }
576
577 return 0;
578 }
579