1 #include "EXTERN.h"
2 #include "perl.h"
3 #include "XSUB.h"
4
5 #include "queue.h"
6 #include "alloc.h"
7
8 /*#define DEBUG(x) x*/
9 #define DEBUG(x)
10 /*#define DEBUG_ERR(x) x*/
11 #define DEBUG_ERR(x)
12
13 /*#define KEEP_STATS 1*/
14
15 #if KEEP_STATS
16 #define STATS(x) x
17 #else
18 #define STATS(x)
19 #endif
20
21 #define PQ_START_SIZE 10
22 #define AT_START 0
23 #define AT_END 1
24
25 #define STUPID_IDS 0
26
27 #define LARGE_QUEUE_SIZE 50
28
29 /*
30 We store the queue in a similar way to the way perl deals with arrays,
31 we keep a block of memory, but the first element may or may not be in use,
32 depending on the pattern of usage.
33
34 There's 3 value controlling usage of the array:
35
36 - alloc - the number of elements allocated in total
37 - start - the first element in use in the array
38 - end - one past the end of the last element in the array
39
40 This has the properties that:
41
42 start == 0 - no space at the front
43 end == alloc - no space at the end
44 end - start - number of elements in the queue
45
46 We use a perl hash (HV *) to store the mapping from ids to priorities.
47
48 */
49 struct poe_queue_tag {
50 /* the first entry in use */
51 int start;
52
53 /* 1 past the last entry in use, hence end - start is the number of
54 entries in the queue */
55 int end;
56
57 /* the total number of entries allocated */
58 int alloc;
59
60 /* used to generate item ids */
61 pq_id_t queue_seq;
62
63 /* used to track in use item ids */
64 HV *ids;
65
66 /* the actual entries */
67 pq_entry *entries;
68
69 #if KEEP_STATS
70 int total_finds;
71 int binary_finds;
72 #endif
73 };
74
75 /*
76 poe_create - create a new queue object.
77
78 No parameters. returns the new queue object.
79
80 */
81 poe_queue *
pq_create(void)82 pq_create(void) {
83 poe_queue *pq = mymalloc(sizeof(poe_queue));
84
85 if (pq == NULL)
86 croak("Out of memory");
87 pq->start = 0;
88 pq->end = 0;
89 pq->alloc = PQ_START_SIZE;
90 pq->queue_seq = 0;
91 pq->ids = newHV();
92 pq->entries = mymalloc(sizeof(pq_entry) * PQ_START_SIZE);
93 memset(pq->entries, 0, sizeof(pq_entry) * PQ_START_SIZE);
94 if (pq->entries == NULL)
95 croak("Out of memory");
96
97 #if KEEP_STATS
98 pq->total_finds = pq->binary_finds = 0;
99 #endif
100
101 DEBUG( fprintf(stderr, "pq_create() => %p\n", pq) );
102
103 return pq;
104 }
105
106 /*
107 pq_delete - release the queue object.
108
109 This also releases one reference from each SV in the queue.
110
111 */
112 void
pq_delete(poe_queue * pq)113 pq_delete(poe_queue *pq) {
114 int i;
115
116 DEBUG( fprintf(stderr, "pq_delete(%p)\n", pq) );
117 if (pq->end > pq->start) {
118 for (i = pq->start; i < pq->end; ++i) {
119 SvREFCNT_dec(pq->entries[i].payload);
120 }
121 }
122 SvREFCNT_dec((SV *)pq->ids);
123 pq->ids = NULL;
124 if (pq->entries)
125 myfree(pq->entries);
126 pq->entries = NULL;
127 myfree(pq);
128 }
129
130 /*
131 pq_new_id - generate a new item id.
132
133 Internal use only.
134
135 This, the following 3 functions and pq_create, pq_delete, should be
136 all that needs to be modified if we change hash implementations.
137
138 */
139 static
140 pq_id_t
pq_new_id(poe_queue * pq,pq_priority_t priority)141 pq_new_id(poe_queue *pq, pq_priority_t priority) {
142 #if STUPID_IDS
143 int seq;
144 int i;
145 int found;
146
147 do {
148 seq = ++pq->queue_seq;
149 found = 0;
150 for (i = pq->start; i < pq->end; ++i) {
151 if (pq->entries[i].id == seq) {
152 found = 1;
153 break;
154 }
155 }
156 } while (found);
157
158 return seq;
159 #else
160 pq_id_t seq = ++pq->queue_seq;;
161
162 while (hv_exists(pq->ids, (char *)&seq, sizeof(seq))) {
163 seq = ++pq->queue_seq;
164 }
165 hv_store(pq->ids, (char *)&seq, sizeof(seq), newSVnv(priority), 0);
166 #endif
167
168 return seq;
169 }
170
171 /*
172 pq_release_id - releases an id for future use.
173 */
174 static
175 void
pq_release_id(poe_queue * pq,pq_id_t id)176 pq_release_id(poe_queue *pq, pq_id_t id) {
177 #if STUPID_IDS
178 #else
179 hv_delete(pq->ids, (char *)&id, sizeof(id), 0);
180 #endif
181 }
182
183 /*
184 pq_item_priority - get the priority of an item given it's id
185 */
186 static
187 int
pq_item_priority(poe_queue * pq,pq_id_t id,pq_priority_t * priority)188 pq_item_priority(poe_queue *pq, pq_id_t id, pq_priority_t *priority) {
189 #if STUPID_IDS
190 int i;
191
192 for (i = pq->start; i < pq->end; ++i) {
193 if (pq->entries[i].id == id) {
194 *priority = pq->entries[i].priority;
195 return 1;
196 }
197 }
198
199 return 0;
200 #else
201 SV **entry = hv_fetch(pq->ids, (char *)&id, sizeof(id), 0);
202
203 if (!entry || !*entry)
204 return 0;
205
206 *priority = SvNV(*entry);
207
208 return 1;
209 #endif
210 }
211
212 /*
213 pq_set_id_priority - set the priority of an item in the id hash
214 */
215 static
216 void
pq_set_id_priority(poe_queue * pq,pq_id_t id,pq_priority_t new_priority)217 pq_set_id_priority(poe_queue *pq, pq_id_t id, pq_priority_t new_priority) {
218 #if STUPID_IDS
219 /* nothing to do, caller set it in the array */
220 #else
221 SV **entry = hv_fetch(pq->ids, (char *)&id, sizeof(id), 0);
222
223 if (!entry && !*entry)
224 croak("pq_set_priority: id not found");
225
226 sv_setnv(*entry, new_priority);
227 #endif
228 }
229
230 /*
231 pq_move_items - moves items around.
232
233 This encapsulates the old calls to memmove(), providing a single place
234 to add error checking.
235 */
236 static void
pq_move_items(poe_queue * pq,int target,int src,int count)237 pq_move_items(poe_queue *pq, int target, int src, int count) {
238
239 DEBUG_ERR(
240 {
241 int die = 0;
242 if (src < pq->start) {
243 fprintf(stderr, "src %d less than start %d\n", src, pq->start);
244 ++die;
245 }
246 if (src + count > pq->end) {
247 fprintf(stderr, "src %d + count %d beyond end %d\n", src, count, pq->end);
248 ++die;
249 }
250 if (target < 0) {
251 fprintf(stderr, "target %d < 0\n", target);
252 ++die;
253 }
254 if (target + count > pq->alloc) {
255 fprintf(stderr, "target %d + count %d > alloc %d\n", target, count, pq->alloc);
256 ++die;
257 }
258 if (die) *(char *)0 = '\0';
259 }
260 )
261 memmove(pq->entries + target, pq->entries + src, count * sizeof(pq_entry));
262 }
263
264 /*
265 pq_realloc - make space at the front of back of the queue.
266
267 This adjusts the queue to allow insertion of a single item at the
268 front or the back of the queue.
269
270 If the queue has 33% or more space available we simple adjust the
271 position of the in-use items within the array. We try not to push the
272 items right up against the opposite end of the array, since we might
273 need to insert items there too.
274
275 If the queue has less than 33% space available we allocate another 50%
276 space. We then only move the queue elements if we need space at the
277 front, since the reallocation has just opened up a huge space at the
278 back. Since we're reallocating exponentially larger sizes we should
279 have a constant time cost on reallocation per queue item stored (but
280 other costs are going to be higher.)
281
282 */
283 static
284 void
pq_realloc(poe_queue * pq,int at_end)285 pq_realloc(poe_queue *pq, int at_end) {
286 int count = pq->end - pq->start;
287
288 DEBUG( fprintf(stderr, "pq_realloc((%d, %d, %d), %d)\n", pq->start, pq->end, pq->alloc, at_end) );
289 if (count * 3 / 2 < pq->alloc) {
290 /* 33 % or more space available, use some of it */
291 int new_start;
292
293 if (at_end) {
294 new_start = (pq->alloc - count) / 3;
295 }
296 else {
297 new_start = (pq->alloc - count) * 2 / 3;
298 }
299 DEBUG( fprintf(stderr, " moving start to %d\n", new_start) );
300 pq_move_items(pq, new_start, pq->start, count);
301 pq->start = new_start;
302 pq->end = new_start + count;
303 }
304 else {
305 int new_alloc = pq->alloc * 3 / 2;
306 pq->entries = myrealloc(pq->entries, sizeof(pq_entry) * new_alloc);
307 pq->alloc = new_alloc;
308
309 if (!pq->entries)
310 croak("Out of memory");
311
312 DEBUG( fprintf(stderr, " - expanding to %d entries\n", new_alloc) );
313
314 if (!at_end) {
315 int new_start = (new_alloc - count) * 2 / 3;
316 DEBUG( fprintf(stderr, " moving start to %d\n", new_start) );
317 pq_move_items(pq, new_start, pq->start, count);
318 pq->start = new_start;
319 pq->end = new_start + count;
320 }
321 }
322 DEBUG( fprintf(stderr, " final: %d %d %d\n", pq->start, pq->end, pq->alloc) );
323 }
324
325 /*
326 pq_insertion_point - figure out where to insert an item with the given
327 priority
328
329 Internal.
330 */
331 static
332 int
pq_insertion_point(poe_queue * pq,pq_priority_t priority)333 pq_insertion_point(poe_queue *pq, pq_priority_t priority) {
334 if (pq->end - pq->start < LARGE_QUEUE_SIZE) {
335 int i = pq->end;
336 while (i > pq->start &&
337 priority < pq->entries[i-1].priority) {
338 --i;
339 }
340 return i;
341 }
342 else {
343 int lower = pq->start;
344 int upper = pq->end - 1;
345 while (1) {
346 int midpoint = (lower + upper) >> 1;
347
348 if (upper < lower)
349 return lower;
350
351 if (priority < pq->entries[midpoint].priority) {
352 upper = midpoint - 1;
353 continue;
354 }
355 if (priority > pq->entries[midpoint].priority) {
356 lower = midpoint + 1;
357 continue;
358 }
359 while (midpoint < pq->end &&
360 priority == pq->entries[midpoint].priority) {
361 ++midpoint;
362 }
363 return midpoint;
364 }
365 }
366 }
367
368 int
pq_enqueue(poe_queue * pq,pq_priority_t priority,SV * payload)369 pq_enqueue(poe_queue *pq, pq_priority_t priority, SV *payload) {
370 int fill_at;
371 pq_id_t id = pq_new_id(pq, priority);
372
373 DEBUG( fprintf(stderr, "pq_enqueue(%f, %p)\n", priority, payload) );
374 if (pq->start == pq->end) {
375 DEBUG( fprintf(stderr, " - on empty queue\n") );
376 /* allow room at front and back for new entries */
377 pq->start = pq->alloc / 3;
378 pq->end = pq->start + 1;
379 fill_at = pq->start;
380 }
381 else if (priority >= pq->entries[pq->end-1].priority) {
382 DEBUG( fprintf(stderr, " - at the end\n") );
383 if (pq->end == pq->alloc)
384 /* past the end - need to realloc or make some space */
385 pq_realloc(pq, AT_END);
386
387 fill_at = pq->end;
388 ++pq->end;
389 }
390 else if (priority < pq->entries[pq->start].priority) {
391 DEBUG( fprintf(stderr, " - at the front\n") );
392 if (pq->start == 0)
393 /* no space at the front, make some */
394 pq_realloc(pq, AT_START);
395
396 --pq->start;
397 fill_at = pq->start;
398 }
399 else {
400 int i;
401 DEBUG( fprintf(stderr, " - in the middle\n") );
402 i = pq_insertion_point(pq, priority);
403
404 /* if we're near the end we want to push entries up, otherwise down */
405 if (i - pq->start > (pq->end - pq->start) / 2) {
406 DEBUG( fprintf(stderr, " - closer to the back (%d -> [ %d %d ])\n",
407 i, pq->start, pq->end) );
408 /* make sure we have space, this might end up copying twice,
409 but too bad for now */
410 if (pq->end == pq->alloc) {
411 int old_start = pq->start;
412 pq_realloc(pq, AT_END);
413 i += pq->start - old_start;
414 }
415
416 pq_move_items(pq, i+1, i, pq->end - i);
417 ++pq->end;
418 fill_at = i;
419 }
420 else {
421 DEBUG( fprintf(stderr, " - closer to the front (%d -> [ %d %d ])\n",
422 i, pq->start, pq->end) );
423 if (pq->start == 0) {
424 pq_realloc(pq, AT_START);
425 i += pq->start;
426 }
427 pq_move_items(pq, pq->start-1, pq->start, i - pq->start);
428 --pq->start;
429 fill_at = i-1;
430 }
431 }
432 pq->entries[fill_at].priority = priority;
433 pq->entries[fill_at].id = id;
434 pq->entries[fill_at].payload = newSVsv(payload);
435
436 return id;
437 }
438
439 /*
440 Note: it's up to the caller to release the SV. The XS code does this
441 by making it mortal.
442 */
443 int
pq_dequeue_next(poe_queue * pq,pq_priority_t * priority,pq_id_t * id,SV ** payload)444 pq_dequeue_next(poe_queue *pq, pq_priority_t *priority, pq_id_t *id, SV **payload) {
445 pq_entry *entry;
446 /* the caller needs to release the payload (somehow) */
447 if (pq->start == pq->end)
448 return 0;
449
450 entry = pq->entries + pq->start++;
451 *priority = entry->priority;
452 *id = entry->id;
453 *payload = entry->payload;
454 pq_release_id(pq, entry->id);
455
456 return 1;
457 }
458
459 int
pq_get_next_priority(poe_queue * pq,pq_priority_t * priority)460 pq_get_next_priority(poe_queue *pq, pq_priority_t *priority) {
461 if (pq->start == pq->end)
462 return 0;
463
464 *priority = pq->entries[pq->start].priority;
465 return 1;
466 }
467
468 int
pq_get_item_count(poe_queue * pq)469 pq_get_item_count(poe_queue *pq) {
470 return pq->end - pq->start;
471 }
472
473 /*
474 pq_test_filter - the XS magic involved in passing the payload to a
475 filter function.
476 */
477 static
478 int
pq_test_filter(pq_entry * entry,SV * filter)479 pq_test_filter(pq_entry *entry, SV *filter) {
480 /* man perlcall for the magic here */
481 dSP;
482 int count;
483 SV *result_sv;
484 int result;
485
486 ENTER;
487 SAVETMPS;
488 PUSHMARK(SP);
489 XPUSHs(sv_2mortal(newSVsv(entry->payload)));
490 PUTBACK;
491
492 count = call_sv(filter, G_SCALAR);
493
494 SPAGAIN;
495
496 if (count != 1)
497 croak("got other than 1 value in scalar context");
498
499 result_sv = POPs;
500 result = SvTRUE(result_sv);
501
502 PUTBACK;
503 FREETMPS;
504 LEAVE;
505
506 return result;
507 }
508
509 /*
510 pq_find_item - search for an item we know is there.
511
512 Internal.
513 */
514 static
515 int
pq_find_item(poe_queue * pq,pq_id_t id,pq_priority_t priority)516 pq_find_item(poe_queue *pq, pq_id_t id, pq_priority_t priority) {
517 int i;
518
519 STATS(++pq->total_finds);
520 if (pq->end - pq->start < LARGE_QUEUE_SIZE) {
521 for (i = pq->start; i < pq->end; ++i) {
522 if (pq->entries[i].id == id)
523 return i;
524 }
525 DEBUG(fprintf(stderr, "pq_find_item %d => %f\n", id, priority) );
526 croak("Internal inconsistency: event should have been found");
527 }
528
529 /* try a binary search */
530 /* simply translated from the perl */
531 STATS(++pq->binary_finds);
532 {
533 int lower = pq->start;
534 int upper = pq->end - 1;
535 int linear_point;
536 while (1) {
537 int midpoint = (upper + lower) >> 1;
538 if (upper < lower) {
539 croak("Internal inconsistency, priorities out of order");
540 }
541 if (priority < pq->entries[midpoint].priority) {
542 upper = midpoint - 1;
543 continue;
544 }
545 if (priority > pq->entries[midpoint].priority) {
546 lower = midpoint + 1;
547 continue;
548 }
549 linear_point = midpoint;
550 while (linear_point >= pq->start &&
551 priority == pq->entries[linear_point].priority) {
552 if (pq->entries[linear_point].id == id)
553 return linear_point;
554 --linear_point;
555 }
556 linear_point = midpoint;
557 while ( (++linear_point < pq->end) &&
558 priority == pq->entries[linear_point].priority) {
559 if (pq->entries[linear_point].id == id)
560 return linear_point;
561 }
562
563 croak("internal inconsistency: event should have been found");
564 }
565 }
566 }
567
568 int
pq_remove_item(poe_queue * pq,pq_id_t id,SV * filter,pq_entry * removed)569 pq_remove_item(poe_queue *pq, pq_id_t id, SV *filter, pq_entry *removed) {
570 pq_priority_t priority;
571 int index;
572
573 if (!pq_item_priority(pq, id, &priority)) {
574 errno = ESRCH;
575 return 0;
576 }
577
578 index = pq_find_item(pq, id, priority);
579
580 if (!pq_test_filter(pq->entries + index, filter)) {
581 errno = EPERM;
582 return 0;
583 }
584
585 *removed = pq->entries[index];
586 pq_release_id(pq, id);
587 if (index == pq->start) {
588 ++pq->start;
589 }
590 else if (index == pq->end - 1) {
591 --pq->end;
592 }
593 else {
594 pq_move_items(pq, index, index+1, pq->end - index - 1);
595 --pq->end;
596 }
597 DEBUG( fprintf(stderr, "removed (%d, %p (%d))\n", id, removed->payload,
598 SvREFCNT(removed->payload)) );
599
600 return 1;
601 }
602
603 int
pq_remove_items(poe_queue * pq,SV * filter,int max_count,pq_entry ** entries)604 pq_remove_items(poe_queue *pq, SV *filter, int max_count, pq_entry **entries) {
605 int in_index, out_index;
606 int remove_count = 0;
607
608 *entries = NULL;
609 if (pq->start == pq->end)
610 return 0;
611
612 *entries = mymalloc(sizeof(pq_entry) * (pq->end - pq->start));
613 if (!*entries)
614 croak("Out of memory");
615
616 in_index = out_index = pq->start;
617 while (in_index < pq->end && remove_count < max_count) {
618 if (pq_test_filter(pq->entries + in_index, filter)) {
619 pq_release_id(pq, pq->entries[in_index].id);
620 (*entries)[remove_count++] = pq->entries[in_index++];
621 }
622 else {
623 pq->entries[out_index++] = pq->entries[in_index++];
624 }
625 }
626 while (in_index < pq->end) {
627 pq->entries[out_index++] = pq->entries[in_index++];
628 }
629 pq->end = out_index;
630
631 return remove_count;
632 }
633
634 /*
635 We need to keep the following 2 functions in sync (or combine the
636 common code.)
637 */
638 int
pq_set_priority(poe_queue * pq,pq_id_t id,SV * filter,pq_priority_t new_priority)639 pq_set_priority(poe_queue *pq, pq_id_t id, SV *filter, pq_priority_t new_priority) {
640 pq_priority_t old_priority;
641 int index, insert_at;
642
643 if (!pq_item_priority(pq, id, &old_priority)) {
644 errno = ESRCH;
645 return 0;
646 }
647
648 index = pq_find_item(pq, id, old_priority);
649
650 if (!pq_test_filter(pq->entries + index, filter)) {
651 errno = EPERM;
652 return 0;
653 }
654
655 DEBUG( fprintf(stderr, " - index %d oldp %f newp %f\n", index, old_priority, new_priority) );
656
657 if (pq->end - pq->start == 1) {
658 DEBUG( fprintf(stderr, " -- one item\n") );
659 /* only the one item anyway */
660 pq->entries[pq->start].priority = new_priority;
661 }
662 else {
663 insert_at = pq_insertion_point(pq, new_priority);
664 DEBUG( fprintf(stderr, " - new index %d\n", insert_at) );
665 /* the item is still in the queue, so either side of it means it
666 won't move */
667 if (insert_at == index || insert_at == index+1) {
668 DEBUG( fprintf(stderr, " -- change in place\n") );
669 pq->entries[index].priority = new_priority;
670 }
671 else {
672 pq_entry saved = pq->entries[index];
673 saved.priority = new_priority;
674
675 if (insert_at < index) {
676 DEBUG( fprintf(stderr, " - insert_at < index\n") );
677 pq_move_items(pq, insert_at + 1, insert_at, index - insert_at);
678 pq->entries[insert_at] = saved;
679 }
680 else {
681 DEBUG( fprintf(stderr, " - insert_at > index\n") );
682 --insert_at;
683 pq_move_items(pq, index, index + 1, insert_at - index);
684 pq->entries[insert_at] = saved;
685 }
686 }
687 }
688
689 pq_set_id_priority(pq, id, new_priority);
690
691 return 1;
692 }
693
694 int
pq_adjust_priority(poe_queue * pq,pq_id_t id,SV * filter,double delta,pq_priority_t * priority)695 pq_adjust_priority(poe_queue *pq, pq_id_t id, SV *filter, double delta, pq_priority_t *priority) {
696 pq_priority_t old_priority, new_priority;
697 int index, insert_at;
698
699 DEBUG( fprintf(stderr, "pq_adjust_priority(..., %d, %p, %f, ...)\n", id, filter, delta) );
700
701 if (!pq_item_priority(pq, id, &old_priority)) {
702 errno = ESRCH;
703 return 0;
704 }
705
706 index = pq_find_item(pq, id, old_priority);
707
708 if (!pq_test_filter(pq->entries + index, filter)) {
709 errno = EPERM;
710 return 0;
711 }
712
713 new_priority = old_priority + delta;
714
715 DEBUG( fprintf(stderr, " - index %d oldp %f newp %f\n", index, old_priority, new_priority) );
716
717 if (pq->end - pq->start == 1) {
718 DEBUG( fprintf(stderr, " -- one item\n") );
719 /* only the one item anyway */
720 pq->entries[pq->start].priority = new_priority;
721 }
722 else {
723 insert_at = pq_insertion_point(pq, new_priority);
724 DEBUG( fprintf(stderr, " - new index %d\n", insert_at) );
725 /* the item is still in the queue, so either side of it means it
726 won't move */
727 if (insert_at == index || insert_at == index+1) {
728 DEBUG( fprintf(stderr, " -- change in place\n") );
729 pq->entries[index].priority = new_priority;
730 }
731 else {
732 pq_entry saved = pq->entries[index];
733 saved.priority = new_priority;
734
735 if (insert_at < index) {
736 DEBUG( fprintf(stderr, " - insert_at < index\n") );
737 pq_move_items(pq, insert_at + 1, insert_at, index - insert_at);
738 pq->entries[insert_at] = saved;
739 }
740 else {
741 DEBUG( fprintf(stderr, " - insert_at > index\n") );
742 --insert_at;
743 pq_move_items(pq, index, index + 1, insert_at - index);
744 pq->entries[insert_at] = saved;
745 }
746 }
747 }
748
749 pq_set_id_priority(pq, id, new_priority);
750 *priority = new_priority;
751
752 return 1;
753 }
754
755 int
pq_peek_items(poe_queue * pq,SV * filter,int max_count,pq_entry ** items)756 pq_peek_items(poe_queue *pq, SV *filter, int max_count, pq_entry **items) {
757 int count = 0;
758 int i;
759
760 *items = NULL;
761 if (pq->end == pq->start)
762 return 0;
763
764 *items = mymalloc(sizeof(pq_entry) * (pq->end - pq->start));
765 for (i = pq->start; i < pq->end; ++i) {
766 if (pq_test_filter(pq->entries + i, filter)) {
767 (*items)[count++] = pq->entries[i];
768 }
769 }
770 if (!count) {
771 myfree(*items);
772 *items = NULL;
773 }
774
775 return count;
776 }
777
778 /*
779 pq_dump - dump the internals of the queue structure.
780 */
781 void
pq_dump(poe_queue * pq)782 pq_dump(poe_queue *pq) {
783 int i;
784 HE *he;
785
786 fprintf(stderr, "poe_queue\n");
787 fprintf(stderr, " start: %d\n", pq->start);
788 fprintf(stderr, " end: %d\n", pq->end);
789 fprintf(stderr, " alloc: %d\n", pq->alloc);
790 fprintf(stderr, " seq: %d\n", pq->queue_seq);
791 fprintf(stderr, " **Queue Entries:\n"
792 " index: id priority SV\n");
793 for (i = pq->start; i < pq->end; ++i) {
794 pq_entry *entry = pq->entries + i;
795 fprintf(stderr, " %5d: %5d %8f %p (%u)\n", i, entry->id, entry->priority,
796 entry->payload, (unsigned)SvREFCNT(entry->payload));
797 }
798 fprintf(stderr, " **Hash entries:\n");
799 hv_iterinit(pq->ids);
800 while ((he = hv_iternext(pq->ids)) != NULL) {
801 STRLEN len;
802 fprintf(stderr, " %d => %f\n", *(pq_id_t *)HePV(he, len), SvNV(hv_iterval(pq->ids, he)));
803 }
804 }
805
806 /*
807 pq_verify - basic verification of the structure of the queue
808
809 For now check for duplicate ids in sequence.
810 */
811 void
pq_verify(poe_queue * pq)812 pq_verify(poe_queue *pq) {
813 int i;
814 int lastid;
815 int found_err = 0;
816
817 if (pq->start != pq->end) {
818 lastid = pq->entries[pq->start].id;
819 i = pq->start + 1;
820 while (i < pq->end) {
821 if (pq->entries[i].id == lastid) {
822 fprintf(stderr, "Duplicate id %d at %d\n", lastid, i);
823 ++found_err;
824 }
825 ++i;
826 }
827 }
828 if (found_err) {
829 pq_dump(pq);
830 exit(1);
831 }
832 }
833
834 /*
835 pq__set_errno_queue - set errno
836
837 This just sets errno for testing purposes.
838 */
839 void
pq__set_errno_queue(int value)840 pq__set_errno_queue(int value) {
841 errno = value;
842 }
843
844