xref: /freebsd/sys/netpfil/ipfw/test/main.c (revision 0957b409)
1 /*
2  * $FreeBSD$
3  *
4  * Testing program for schedulers
5  *
6  * The framework include a simple controller which, at each
7  * iteration, decides whether we can enqueue and/or dequeue.
8  * Then the mainloop runs the required number of tests,
9  * keeping track of statistics.
10  */
11 
12 // #define USE_BURST	// what is this for ?
13 
14 #include "dn_test.h"
15 
16 struct cfg_s {
17 	int ac;
18 	char * const *av;
19 
20 	const char *name;
21 	int loops;
22 	struct timeval time;
23 
24 	/* running counters */
25 	uint32_t	_enqueue;
26 	uint32_t	drop;
27 	uint32_t	pending;
28 	uint32_t	dequeue;
29 
30 	/* generator parameters */
31 	int32_t th_min, th_max;	/* thresholds for hysteresis; negative means per flow */
32 #ifdef USE_BURST
33 	int maxburst;
34 #endif /* USE_BURST */
35 	int lmin, lmax;	/* packet len */
36 	int flows;	/* number of flows */
37 	int flowsets;	/* number of flowsets */
38 	int wsum;	/* sum of weights of all flows */
39 #ifdef USE_CUR
40 	int max_y;	/* max random number in the generation */
41 	int cur_y
42 	int cur_fs;	/* used in generation, between 0 and max_y - 1 */
43 #endif /* USE_CUR */
44 	const char *fs_config; /* flowset config */
45 	int can_dequeue;
46 	int burst;	/* count of packets sent in a burst */
47 	struct mbuf *tosend;	/* packet to send -- also flag to enqueue */
48 
49 	struct mbuf *freelist;
50 
51 	struct mbuf *head, *tail;	/* a simple tailq */
52 
53 	/* scheduler hooks */
54 	int (*enq)(struct dn_sch_inst *, struct dn_queue *,
55 		struct mbuf *);
56 	struct mbuf * (*deq)(struct dn_sch_inst *);
57 	/* size of the three fields including sched-specific areas */
58 	uint32_t schk_len;
59 	uint32_t q_len; /* size of a queue including sched-fields */
60 	uint32_t si_len; /* size of a sch_inst including sched-fields */
61 	char *q;	/* array of flow queues */
62 		/* use a char* because size is variable */
63 	/*
64 	 * The scheduler template (one) followd by schk_datalen bytes
65 	 * for scheduler-specific parameters, total size is schk_len
66 	 */
67 	struct dn_schk *sched;
68 	/*
69 	 * one scheduler instance, followed by si_datalen bytes
70 	 * for scheduler specific parameters of this instance,
71 	 * total size is si_len. si->sched points to sched
72 	 */
73 	struct dn_sch_inst *si;
74 	struct dn_fsk *fs;	/* array of flowsets */
75 
76 	/* generator state */
77 	int state;	/* 0 = going up (enqueue), 1: going down (dequeue) */
78 
79 	/*
80 	 * We keep lists for each backlog level, and always serve
81 	 * the one with shortest backlog. llmask contains a bitmap
82 	 * of lists, and ll are the heads of the lists. The last
83 	 * entry (BACKLOG) contains all entries considered 'full'
84 	 * XXX to optimize things, entry i could contain queues with
85 	 * 2^{i-1}+1 .. 2^i entries.
86 	 */
87 #define BACKLOG	30 /* this many backlogged classes, we only need BACKLOG+1 */
88 	uint64_t	llmask;
89 	struct list_head ll[BACKLOG + 10];
90 
91 	double *q_wfi;	/* (byte) Worst-case Fair Index of the flows  */
92 	double wfi;	/* (byte) Worst-case Fair Index of the system */
93 };
94 
95 /* FI2Q and Q2FI converts from flow_id (i.e. queue index)
96  * to dn_queue and back. We cannot simply use pointer arithmetic
97  * because the queu has variable size, q_len
98  */
99 #define FI2Q(c, i)	((struct dn_queue *)((c)->q + (c)->q_len * (i)))
100 #define Q2FI(c, q)	(((char *)(q) - (c)->q)/(c)->q_len)
101 
102 int debug = 0;
103 
104 struct dn_parms dn_cfg;
105 
106 static void controller(struct cfg_s *c);
107 
108 /* release a packet for a given flow_id.
109  * Put the mbuf in the freelist, and in case move the
110  * flow to the end of the bucket.
111  */
112 static int
113 drop(struct cfg_s *c, struct mbuf *m)
114 {
115 	struct dn_queue *q;
116 	int i;
117 
118 	c->drop++;
119 	q = FI2Q(c, m->flow_id);
120 	i = q->ni.length; // XXX or ffs...
121 
122 	ND("q %p id %d current length %d", q, m->flow_id, i);
123 	if (i < BACKLOG) {
124 		struct list_head *h = &q->ni.h;
125 		c->llmask &= ~(1<<(i+1));
126 		c->llmask |= (1<<(i));
127 		list_del(h);
128 		list_add_tail(h, &c->ll[i]);
129 	}
130 	m->m_nextpkt = c->freelist;
131 	c->freelist = m;
132 	return 0;
133 }
134 
135 
136 /*
137  * dn_sch_inst does not have a queue, for the RR we
138  * allocate a mq right after si
139  */
140 static int
141 default_enqueue(struct dn_sch_inst *si, struct dn_queue *q, struct mbuf *m)
142 {
143 	struct mq *mq = (struct mq *)si;
144 
145 	(void)q;
146 	/* this is the default function if no scheduler is provided */
147 	if (mq->head == NULL)
148 		mq->head = m;
149 	else
150 		mq->tail->m_nextpkt = m;
151 	mq->tail = m;
152 	return 0; /* default - success */
153 }
154 
155 
156 static struct mbuf *
157 default_dequeue(struct dn_sch_inst *si)
158 {
159 	struct mq *mq = (struct mq *)si;
160 	struct mbuf *m;
161 	/* this is the default function if no scheduler is provided */
162 	if ((m = mq->head)) {
163 		m = mq->head;
164 		mq->head = m->m_nextpkt;
165 		m->m_nextpkt = NULL;
166 	}
167 	return m;
168 }
169 
170 static void
171 gnet_stats_enq(struct cfg_s *c, struct mbuf *mb)
172 {
173 	struct dn_sch_inst *si = c->si;
174 	struct dn_queue *_q = FI2Q(c, mb->flow_id);
175 
176 	if (_q->ni.length == 1) {
177 		_q->ni.bytes = 0;
178 		_q->ni.sch_bytes = si->ni.bytes;
179 	}
180 }
181 
182 static void
183 gnet_stats_deq(struct cfg_s *c, struct mbuf *mb)
184 {
185 	struct dn_sch_inst *si = c->si;
186 	struct dn_queue *_q = FI2Q(c, mb->flow_id);
187 	int len = mb->m_pkthdr.len;
188 
189 	_q->ni.bytes += len;
190 	si->ni.bytes += len;
191 
192 	if (_q->ni.length == 0) {
193 		double bytes = (double)_q->ni.bytes;
194 		double sch_bytes = (double)si->ni.bytes - _q->ni.sch_bytes;
195 		double weight = (double)_q->fs->fs.par[0] / c->wsum;
196 		double wfi = sch_bytes * weight - bytes;
197 
198 		if (c->q_wfi[mb->flow_id] < wfi)
199 			c->q_wfi[mb->flow_id] = wfi;
200 	}
201 }
202 
203 static int
204 mainloop(struct cfg_s *c)
205 {
206 	int i;
207 	struct mbuf *m;
208 
209 	for (i=0; i < c->loops; i++) {
210 		/* implement histeresis */
211 		controller(c);
212 		DX(3, "loop %d enq %d send %p rx %d",
213 			i, c->_enqueue, c->tosend, c->can_dequeue);
214 		if ( (m = c->tosend) ) {
215 			int ret;
216 			struct dn_queue *q = FI2Q(c, m->flow_id);
217 			c->_enqueue++;
218 			ret = c->enq(c->si, q, m);
219 			if (ret) {
220 				drop(c, m);
221 				D("loop %d enqueue fail", i );
222 				/*
223 				 * XXX do not insist; rather, try dequeue
224 				 */
225 				goto do_dequeue;
226 			} else {
227 				ND("enqueue ok");
228 				c->pending++;
229 				gnet_stats_enq(c, m);
230 			}
231 		} else if (c->can_dequeue) {
232 do_dequeue:
233 			c->dequeue++;
234 			m = c->deq(c->si);
235 			if (m) {
236 				c->pending--;
237 				drop(c, m);
238 				c->drop--;	/* compensate */
239 				gnet_stats_deq(c, m);
240 			} else {
241 				D("--- ouch, cannot operate on iteration %d, pending %d", i, c->pending);
242 				break;
243 			}
244 		}
245 	}
246 	DX(1, "mainloop ends %d", i);
247 	return 0;
248 }
249 
250 int
251 dump(struct cfg_s *c)
252 {
253 	int i;
254 
255 	for (i=0; i < c->flows; i++) {
256 		//struct dn_queue *q = FI2Q(c, i);
257 		ND(1, "queue %4d tot %10llu", i,
258 		    (unsigned long long)q->ni.tot_bytes);
259 	}
260 	DX(1, "done %d loops\n", c->loops);
261 	return 0;
262 }
263 
264 /* interpret a number in human form */
265 static long
266 getnum(const char *s, char **next, const char *key)
267 {
268 	char *end = NULL;
269 	long l;
270 
271 	if (next)	/* default */
272 		*next = NULL;
273 	if (s && *s) {
274 		DX(3, "token is <%s> %s", s, key ? key : "-");
275 		l = strtol(s, &end, 0);
276 	} else {
277 		DX(3, "empty string");
278 		l = -1;
279 	}
280 	if (l < 0) {
281 		DX(2, "invalid %s for %s", s ? s : "NULL", (key ? key : "") );
282 		return 0;	// invalid
283 	}
284 	if (!end || !*end)
285 		return l;
286 	if (*end == 'n')
287 		l = -l;	/* multiply by n */
288 	else if (*end == 'K')
289 		l = l*1000;
290 	else if (*end == 'M')
291 		l = l*1000000;
292 	else if (*end == 'k')
293 		l = l*1024;
294 	else if (*end == 'm')
295 		l = l*1024*1024;
296 	else if (*end == 'w')
297 		;
298 	else {/* not recognized */
299 		D("suffix %s for %s, next %p", end, key, next);
300 		end--;
301 	}
302 	end++;
303 	DX(3, "suffix now %s for %s, next %p", end, key, next);
304 	if (next && *end) {
305 		DX(3, "setting next to %s for %s", end, key);
306 		*next = end;
307 	}
308 	return l;
309 }
310 
311 /*
312  * flowsets are a comma-separated list of
313  *     weight:maxlen:flows
314  * indicating how many flows are hooked to that fs.
315  * Both weight and range can be min-max-steps.
316  * The first pass (fs != NULL) justs count the number of flowsets and flows,
317  * the second pass (fs == NULL) we complete the setup.
318  */
319 static void
320 parse_flowsets(struct cfg_s *c, const char *fs)
321 {
322 	char *s, *cur, *next;
323 	int n_flows = 0, n_fs = 0, wsum = 0;
324 	int i, j;
325 	struct dn_fs *prev = NULL;
326 	int pass = (fs == NULL);
327 
328 	DX(3, "--- pass %d flows %d flowsets %d", pass, c->flows, c->flowsets);
329 	if (fs != NULL) { /* first pass */
330 		if (c->fs_config)
331 			D("warning, overwriting fs %s with %s",
332 				c->fs_config, fs);
333 		c->fs_config = fs;
334 	}
335 	s = c->fs_config ? strdup(c->fs_config) : NULL;
336 	if (s == NULL) {
337 		if (pass == 0)
338 			D("no fsconfig");
339 		return;
340 	}
341 	for (next = s; (cur = strsep(&next, ","));) {
342 		char *p = NULL;
343 		int w, w_h, w_steps, wi;
344 		int len, len_h, l_steps, li;
345 		int flows;
346 
347 		w = getnum(strsep(&cur, ":"), &p, "weight");
348 		if (w <= 0)
349 			w = 1;
350 		w_h = p ? getnum(p+1, &p, "weight_max") : w;
351 		w_steps = p ? getnum(p+1, &p, "w_steps") : (w_h == w ?1:2);
352 		len = getnum(strsep(&cur, ":"), &p, "len");
353 		if (len <= 0)
354 			len = 1000;
355 		len_h = p ? getnum(p+1, &p, "len_max") : len;
356 		l_steps = p ? getnum(p+1, &p, "l_steps") : (len_h == len ? 1 : 2);
357 		flows = getnum(strsep(&cur, ":"), NULL, "flows");
358 		if (flows == 0)
359 			flows = 1;
360 		DX(4, "weight %d..%d (%d) len %d..%d (%d) flows %d",
361 			w, w_h, w_steps, len, len_h, l_steps, flows);
362 		if (w == 0 || w_h < w || len == 0 || len_h < len ||
363 				flows == 0) {
364 			DX(4,"wrong parameters %s", s);
365 			return;
366 		}
367 		n_flows += flows * w_steps * l_steps;
368 		for (i = 0; i < w_steps; i++) {
369 			wi = w + ((w_h - w)* i)/(w_steps == 1 ? 1 : (w_steps-1));
370 			for (j = 0; j < l_steps; j++, n_fs++) {
371 				struct dn_fs *fs = &c->fs[n_fs].fs; // tentative
372 				int x;
373 
374 				li = len + ((len_h - len)* j)/(l_steps == 1 ? 1 : (l_steps-1));
375 				x = (wi*2048)/li;
376 				DX(3, "----- fs %4d weight %4d lmax %4d X %4d flows %d",
377 					n_fs, wi, li, x, flows);
378 				if (pass == 0)
379 					continue;
380 				if (c->fs == NULL || c->flowsets <= n_fs) {
381 					D("error in number of flowsets");
382 					return;
383 				}
384 				wsum += wi * flows;
385 				fs->par[0] = wi;
386 				fs->par[1] = li;
387 				fs->index = n_fs;
388 				fs->n_flows = flows;
389 				fs->cur = fs->first_flow = prev==NULL ? 0 : prev->next_flow;
390 				fs->next_flow = fs->first_flow + fs->n_flows;
391 				fs->y = x * flows;
392 				fs->base_y = (prev == NULL) ? 0 : prev->next_y;
393 				fs->next_y = fs->base_y + fs->y;
394 				prev = fs;
395 			}
396 		}
397 	}
398 	c->flows = n_flows;
399 	c->flowsets = n_fs;
400 	c->wsum = wsum;
401 	if (pass == 0)
402 		return;
403 
404 	/* now link all flows to their parent flowsets */
405 	DX(1,"%d flows on %d flowsets", c->flows, c->flowsets);
406 #ifdef USE_CUR
407 	c->max_y = prev ? prev->base_y + prev->y : 0;
408 	DX(1,"%d flows on %d flowsets max_y %d", c->flows, c->flowsets, c->max_y);
409 #endif /* USE_CUR */
410 	for (i=0; i < c->flowsets; i++) {
411 		struct dn_fs *fs = &c->fs[i].fs;
412 		DX(1, "fs %3d w %5d l %4d flow %5d .. %5d y %6d .. %6d",
413 			i, fs->par[0], fs->par[1],
414 			fs->first_flow, fs->next_flow,
415 			fs->base_y, fs->next_y);
416 		for (j = fs->first_flow; j < fs->next_flow; j++) {
417 			struct dn_queue *q = FI2Q(c, j);
418 			q->fs = &c->fs[i];
419 		}
420 	}
421 }
422 
423 /* available schedulers */
424 extern moduledata_t *_g_dn_fifo;
425 extern moduledata_t *_g_dn_wf2qp;
426 extern moduledata_t *_g_dn_rr;
427 extern moduledata_t *_g_dn_qfq;
428 #ifdef WITH_QFQP
429 extern moduledata_t *_g_dn_qfqp;
430 #endif
431 #ifdef WITH_KPS
432 extern moduledata_t *_g_dn_kps;
433 #endif
434 
435 static int
436 init(struct cfg_s *c)
437 {
438 	int i;
439 	int ac = c->ac;
440 	char * const *av = c->av;
441 
442 	c->si_len = sizeof(struct dn_sch_inst);
443 	c->q_len = sizeof(struct dn_queue);
444 	moduledata_t *mod = NULL;
445 	struct dn_alg *p = NULL;
446 
447 	c->th_min = -1; /* 1 packet per flow */
448 	c->th_max = -20;/* 20 packets per flow */
449 	c->lmin = c->lmax = 1280;	/* packet len */
450 	c->flows = 1;
451 	c->flowsets = 1;
452 	c->name = "null";
453 	ac--; av++;
454 	while (ac > 1) {
455 		if (!strcmp(*av, "-n")) {
456 			c->loops = getnum(av[1], NULL, av[0]);
457 		} else if (!strcmp(*av, "-d")) {
458 			debug = atoi(av[1]);
459 		} else if (!strcmp(*av, "-alg")) {
460 			if (!strcmp(av[1], "rr"))
461 				mod = _g_dn_rr;
462 			else if (!strcmp(av[1], "wf2qp"))
463 				mod = _g_dn_wf2qp;
464 			else if (!strcmp(av[1], "fifo"))
465 				mod = _g_dn_fifo;
466 			else if (!strcmp(av[1], "qfq"))
467 				mod = _g_dn_qfq;
468 #ifdef WITH_QFQP
469 			else if (!strcmp(av[1], "qfq+") ||
470 			    !strcmp(av[1], "qfqp") )
471 				mod = _g_dn_qfqp;
472 #endif
473 #ifdef WITH_KPS
474 			else if (!strcmp(av[1], "kps"))
475 				mod = _g_dn_kps;
476 #endif
477 			else
478 				mod = NULL;
479 			c->name = mod ? mod->name : "NULL";
480 			DX(3, "using scheduler %s", c->name);
481 		} else if (!strcmp(*av, "-len")) {
482 			c->lmin = getnum(av[1], NULL, av[0]);
483 			c->lmax = c->lmin;
484 			DX(3, "setting max to %d", c->th_max);
485 #ifdef USE_BURST
486 		} else if (!strcmp(*av, "-burst")) {
487 			c->maxburst = getnum(av[1], NULL, av[0]);
488 			DX(3, "setting max to %d", c->th_max);
489 #endif /* USE_BURST */
490 		} else if (!strcmp(*av, "-qmax")) {
491 			c->th_max = getnum(av[1], NULL, av[0]);
492 			DX(3, "setting max to %d", c->th_max);
493 		} else if (!strcmp(*av, "-qmin")) {
494 			c->th_min = getnum(av[1], NULL, av[0]);
495 			DX(3, "setting min to %d", c->th_min);
496 		} else if (!strcmp(*av, "-flows")) {
497 			c->flows = getnum(av[1], NULL, av[0]);
498 			DX(3, "setting flows to %d", c->flows);
499 		} else if (!strcmp(*av, "-flowsets")) {
500 			parse_flowsets(c, av[1]); /* first pass */
501 			DX(3, "setting flowsets to %d", c->flowsets);
502 		} else {
503 			D("option %s not recognised, ignore", *av);
504 		}
505 		ac -= 2; av += 2;
506 	}
507 #ifdef USE_BURST
508 	if (c->maxburst <= 0)
509 		c->maxburst = 1;
510 #endif /* USE_BURST */
511 	if (c->loops <= 0)
512 		c->loops = 1;
513 	if (c->flows <= 0)
514 		c->flows = 1;
515 	if (c->flowsets <= 0)
516 		c->flowsets = 1;
517 	if (c->lmin <= 0)
518 		c->lmin = 1;
519 	if (c->lmax <= 0)
520 		c->lmax = 1;
521 	/* multiply by N */
522 	if (c->th_min < 0)
523 		c->th_min = c->flows * -c->th_min;
524 	if (c->th_max < 0)
525 		c->th_max = c->flows * -c->th_max;
526 	if (c->th_max <= c->th_min)
527 		c->th_max = c->th_min + 1;
528 
529 	/* now load parameters from the module */
530 	if (mod) {
531 		p = mod->p;
532 		DX(3, "using module %s f %p p %p", mod->name, mod->f, mod->p);
533 		DX(3, "modname %s ty %d", p->name, p->type);
534 		// XXX check enq and deq not null
535 		c->enq = p->enqueue;
536 		c->deq = p->dequeue;
537 		c->si_len += p->si_datalen;
538 		c->q_len += p->q_datalen;
539 		c->schk_len += p->schk_datalen;
540 	} else {
541 		/* make sure c->si has room for a queue */
542 		c->enq = default_enqueue;
543 		c->deq = default_dequeue;
544 	}
545 
546 	/* allocate queues, flowsets and one scheduler */
547 	D("using %d flows, %d flowsets", c->flows, c->flowsets);
548 	D("q_len %d dn_fsk %d si %d sched %d",
549 		c->q_len, (int)sizeof(struct dn_fsk),
550 		c->si_len, c->schk_len);
551 	c->sched = calloc(1, c->schk_len); /* one parent scheduler */
552 	c->si = calloc(1, c->si_len); /* one scheduler instance */
553 	c->fs = calloc(c->flowsets, sizeof(struct dn_fsk));
554 	c->q = calloc(c->flows, c->q_len);	/* one queue per flow */
555 	c->q_wfi = calloc(c->flows, sizeof(double)); /* stats, one per flow */
556 	if (!c->sched || !c->si || !c->fs || !c->q || !c->q_wfi) {
557 		D("error allocating memory");
558 		exit(1);
559 	}
560 	c->si->sched = c->sched; /* link scheduler instance to template */
561 	if (p) {
562 		/* run initialization code if needed */
563 		if (p->config)
564 			p->config(c->si->sched);
565 		if (p->new_sched)
566 			p->new_sched(c->si);
567 	}
568 	/* parse_flowsets links queues to their flowsets */
569 	parse_flowsets(c, NULL); /* second pass */
570 	/* complete the work calling new_fsk */
571 	for (i = 0; i < c->flowsets; i++) {
572 		struct dn_fsk *fsk = &c->fs[i];
573 		if (fsk->fs.par[1] == 0)
574 			fsk->fs.par[1] = 1000;	/* default pkt len */
575 		fsk->sched = c->si->sched;
576 		if (p && p->new_fsk)
577 			p->new_fsk(fsk);
578 	}
579 	/* --- now the scheduler is initialized --- */
580 
581 	/*
582 	 * initialize the lists for the generator, and put
583 	 * all flows in the list for backlog = 0
584 	 */
585 	for (i=0; i <= BACKLOG+5; i++)
586 		INIT_LIST_HEAD(&c->ll[i]);
587 
588 	for (i = 0; i < c->flows; i++) {
589 		struct dn_queue *q = FI2Q(c, i);
590 		if (q->fs == NULL)
591 			q->fs = &c->fs[0]; /* XXX */
592 		q->_si = c->si;
593 		if (p && p->new_queue)
594 			p->new_queue(q);
595 		INIT_LIST_HEAD(&q->ni.h);
596 		list_add_tail(&q->ni.h, &c->ll[0]);
597 	}
598 	c->llmask = 1; /* all flows are in the first list */
599 	return 0;
600 }
601 
602 
603 int
604 main(int ac, char *av[])
605 {
606 	struct cfg_s c;
607 	double ll;
608 	int i;
609 	char msg[40];
610 
611 	bzero(&c, sizeof(c));
612 	c.ac = ac;
613 	c.av = av;
614 	init(&c);
615 	gettimeofday(&c.time, NULL);
616 	D("th_min %d th_max %d", c.th_min, c.th_max);
617 
618 	mainloop(&c);
619 	{
620 		struct timeval end;
621 		gettimeofday(&end, NULL);
622 		timersub(&end, &c.time, &c.time);
623 	}
624 	ll = c.time.tv_sec*1000000 + c.time.tv_usec;
625 	ll *= 1000;	/* convert to nanoseconds */
626 	ll /= c._enqueue;
627 	sprintf(msg, "1::%d", c.flows);
628 	for (i = 0; i < c.flows; i++) {
629 		if (c.wfi < c.q_wfi[i])
630 			c.wfi = c.q_wfi[i];
631 	}
632 	D("sched=%-12s\ttime=%d.%03d sec (%.0f nsec) enq %lu %lu deq\n"
633 	   "\twfi=%.02f\tflow=%-16s",
634 	   c.name, (int)c.time.tv_sec, (int)c.time.tv_usec / 1000, ll,
635 	   (unsigned long)c._enqueue, (unsigned long)c.dequeue, c.wfi,
636 	   c.fs_config ? c.fs_config : msg);
637 	dump(&c);
638 	DX(1, "done ac %d av %p", ac, av);
639 	for (i=0; i < ac; i++)
640 		DX(1, "arg %d %s", i, av[i]);
641 	return 0;
642 }
643 
644 /*
645  * The controller decides whether in this iteration we should send
646  * (the packet is in c->tosend) and/or receive (flag c->can_dequeue)
647  */
648 static void
649 controller(struct cfg_s *c)
650 {
651 	struct mbuf *m;
652 	struct dn_fs *fs;
653 	int flow_id;
654 
655 	/* hysteresis between max and min */
656 	if (c->state == 0 && c->pending >= (uint32_t)c->th_max)
657 		c->state = 1;
658 	else if (c->state == 1 && c->pending <= (uint32_t)c->th_min)
659 		c->state = 0;
660 	ND(1, "state %d pending %2d", c->state, c->pending);
661 	c->can_dequeue = c->state;
662 	c->tosend = NULL;
663 	if (c->can_dequeue)
664 		return;
665 
666 	/*
667 	 * locate the flow to use for enqueueing
668 	 * We take the queue with the lowest number of queued packets,
669 	 * generate a packet for it, and put the queue in the next highest.
670 	 */
671     if (1) {
672 	int i;
673 	struct dn_queue *q;
674 	struct list_head *h;
675 
676 	i = ffs(c->llmask) - 1;
677 	if (i < 0) {
678 		D("no candidate");
679 		c->can_dequeue = 1;
680 		return;
681 	}
682 	h = &c->ll[i];
683 	ND(1, "backlog %d p %p prev %p next %p", i, h, h->prev, h->next);
684 	q = list_first_entry(h, struct dn_queue, ni.h);
685 	list_del(&q->ni.h);
686 	flow_id = Q2FI(c, q);
687 	DX(2, "extracted flow %p %d backlog %d", q, flow_id, i);
688 	if (list_empty(h)) {
689 		ND(2, "backlog %d empty", i);
690 		c->llmask &= ~(1<<i);
691 	}
692 	ND(1, "before %d p %p prev %p next %p", i+1, h+1, h[1].prev, h[1].next);
693 	list_add_tail(&q->ni.h, h+1);
694 	ND(1, " after %d p %p prev %p next %p", i+1, h+1, h[1].prev, h[1].next);
695 	if (i < BACKLOG) {
696 		ND(2, "backlog %d full", i+1);
697 		c->llmask |= 1<<(1+i);
698 	}
699 	fs = &q->fs->fs;
700 	fs->cur = flow_id;
701 #ifdef USE_CUR
702 	c->cur_fs = q->fs - c->fs;
703     } else {
704 	/* XXX this does not work ? */
705 	/* now decide whom to send the packet, and the length */
706 	/* lookup in the flow table */
707 	if (c->cur_y >= c->max_y) {	/* handle wraparound */
708 		c->cur_y = 0;
709 		c->cur_fs = 0;
710 	}
711 	fs = &c->fs[c->cur_fs].fs;
712 	flow_id = fs->cur++;
713 	if (fs->cur >= fs->next_flow)
714 		fs->cur = fs->first_flow;
715 	c->cur_y++;
716 	if (c->cur_y >= fs->next_y)
717 		c->cur_fs++;
718 #endif /* USE_CUR */
719     }
720 
721 	/* construct a packet */
722 	if (c->freelist) {
723 		m = c->tosend = c->freelist;
724 		c->freelist = c->freelist->m_nextpkt;
725 	} else {
726 		m = c->tosend = calloc(1, sizeof(struct mbuf));
727 	}
728 	if (m == NULL)
729 		return;
730 
731 	//m->cfg = c;
732 	m->m_nextpkt = NULL;
733 	m->m_pkthdr.len = fs->par[1]; // XXX maxlen
734 	m->flow_id = flow_id;
735 
736 	ND(2,"y %6d flow %5d fs %3d weight %4d len %4d",
737 		c->cur_y, m->flow_id, c->cur_fs,
738 		fs->par[0], m->m_pkthdr.len);
739 
740 }
741