1 #include "EXTERN.h"
2 #include "perl.h"
3 #include "XSUB.h"
4 
5 #include "queue.h"
6 #include "alloc.h"
7 
8 /*#define DEBUG(x) x*/
9 #define DEBUG(x)
10 /*#define DEBUG_ERR(x) x*/
11 #define DEBUG_ERR(x)
12 
13 /*#define KEEP_STATS 1*/
14 
15 #if KEEP_STATS
16 #define STATS(x) x
17 #else
18 #define STATS(x)
19 #endif
20 
21 #define PQ_START_SIZE 10
22 #define AT_START 0
23 #define AT_END 1
24 
25 #define STUPID_IDS 0
26 
27 #define LARGE_QUEUE_SIZE 50
28 
29 /*
30 We store the queue in a similar way to the way perl deals with arrays,
31 we keep a block of memory, but the first element may or may not be in use,
32 depending on the pattern of usage.
33 
34 There's 3 value controlling usage of the array:
35 
36   - alloc - the number of elements allocated in total
37   - start - the first element in use in the array
38   - end - one past the end of the last element in the array
39 
40 This has the properties that:
41 
42   start == 0 - no space at the front
43   end == alloc - no space at the end
44   end - start - number of elements in the queue
45 
46 We use a perl hash (HV *) to store the mapping from ids to priorities.
47 
48 */
49 struct poe_queue_tag {
50   /* the first entry in use */
51   int start;
52 
53   /* 1 past the last entry in use, hence end - start is the number of
54      entries in the queue */
55   int end;
56 
57   /* the total number of entries allocated */
58   int alloc;
59 
60   /* used to generate item ids */
61   pq_id_t queue_seq;
62 
63   /* used to track in use item ids */
64   HV *ids;
65 
66   /* the actual entries */
67   pq_entry *entries;
68 
69 #if KEEP_STATS
70   int total_finds;
71   int binary_finds;
72 #endif
73 };
74 
75 /*
76 poe_create - create a new queue object.
77 
78 No parameters.  returns the new queue object.
79 
80 */
81 poe_queue *
pq_create(void)82 pq_create(void) {
83   poe_queue *pq = mymalloc(sizeof(poe_queue));
84 
85   if (pq == NULL)
86     croak("Out of memory");
87   pq->start = 0;
88   pq->end = 0;
89   pq->alloc = PQ_START_SIZE;
90   pq->queue_seq = 0;
91   pq->ids = newHV();
92   pq->entries = mymalloc(sizeof(pq_entry) * PQ_START_SIZE);
93   memset(pq->entries, 0, sizeof(pq_entry) * PQ_START_SIZE);
94   if (pq->entries == NULL)
95     croak("Out of memory");
96 
97 #if KEEP_STATS
98   pq->total_finds = pq->binary_finds = 0;
99 #endif
100 
101   DEBUG( fprintf(stderr, "pq_create() => %p\n", pq) );
102 
103   return pq;
104 }
105 
106 /*
107 pq_delete - release the queue object.
108 
109 This also releases one reference from each SV in the queue.
110 
111 */
112 void
pq_delete(poe_queue * pq)113 pq_delete(poe_queue *pq) {
114   int i;
115 
116   DEBUG( fprintf(stderr, "pq_delete(%p)\n", pq) );
117   if (pq->end > pq->start) {
118     for (i = pq->start; i < pq->end; ++i) {
119       SvREFCNT_dec(pq->entries[i].payload);
120     }
121   }
122   SvREFCNT_dec((SV *)pq->ids);
123   pq->ids = NULL;
124   if (pq->entries)
125     myfree(pq->entries);
126   pq->entries = NULL;
127   myfree(pq);
128 }
129 
130 /*
131 pq_new_id - generate a new item id.
132 
133 Internal use only.
134 
135 This, the following 3 functions and pq_create, pq_delete, should be
136 all that needs to be modified if we change hash implementations.
137 
138 */
139 static
140 pq_id_t
pq_new_id(poe_queue * pq,pq_priority_t priority)141 pq_new_id(poe_queue *pq, pq_priority_t priority) {
142 #if STUPID_IDS
143   int seq;
144   int i;
145   int found;
146 
147   do {
148     seq = ++pq->queue_seq;
149     found = 0;
150     for (i = pq->start; i < pq->end; ++i) {
151       if (pq->entries[i].id == seq) {
152 	found = 1;
153 	break;
154       }
155     }
156   } while (found);
157 
158   return seq;
159 #else
160   pq_id_t seq = ++pq->queue_seq;;
161 
162   while (hv_exists(pq->ids, (char *)&seq, sizeof(seq))) {
163     seq = ++pq->queue_seq;
164   }
165   hv_store(pq->ids, (char *)&seq, sizeof(seq), newSVnv(priority), 0);
166 #endif
167 
168   return seq;
169 }
170 
171 /*
172 pq_release_id - releases an id for future use.
173 */
174 static
175 void
pq_release_id(poe_queue * pq,pq_id_t id)176 pq_release_id(poe_queue *pq, pq_id_t id) {
177 #if STUPID_IDS
178 #else
179   hv_delete(pq->ids, (char *)&id, sizeof(id), 0);
180 #endif
181 }
182 
183 /*
184 pq_item_priority - get the priority of an item given it's id
185 */
186 static
187 int
pq_item_priority(poe_queue * pq,pq_id_t id,pq_priority_t * priority)188 pq_item_priority(poe_queue *pq, pq_id_t id, pq_priority_t *priority) {
189 #if STUPID_IDS
190   int i;
191 
192   for (i = pq->start; i < pq->end; ++i) {
193     if (pq->entries[i].id == id) {
194       *priority = pq->entries[i].priority;
195       return 1;
196     }
197   }
198 
199   return 0;
200 #else
201   SV **entry = hv_fetch(pq->ids, (char *)&id, sizeof(id), 0);
202 
203   if (!entry || !*entry)
204     return 0;
205 
206   *priority = SvNV(*entry);
207 
208   return 1;
209 #endif
210 }
211 
212 /*
213 pq_set_id_priority - set the priority of an item in the id hash
214 */
215 static
216 void
pq_set_id_priority(poe_queue * pq,pq_id_t id,pq_priority_t new_priority)217 pq_set_id_priority(poe_queue *pq, pq_id_t id, pq_priority_t new_priority) {
218 #if STUPID_IDS
219   /* nothing to do, caller set it in the array */
220 #else
221   SV **entry = hv_fetch(pq->ids, (char *)&id, sizeof(id), 0);
222 
223   if (!entry && !*entry)
224     croak("pq_set_priority: id not found");
225 
226   sv_setnv(*entry, new_priority);
227 #endif
228 }
229 
230 /*
231 pq_move_items - moves items around.
232 
233 This encapsulates the old calls to memmove(), providing a single place
234 to add error checking.
235 */
236 static void
pq_move_items(poe_queue * pq,int target,int src,int count)237 pq_move_items(poe_queue *pq, int target, int src, int count) {
238 
239   DEBUG_ERR(
240   {
241     int die = 0;
242     if (src < pq->start) {
243       fprintf(stderr, "src %d less than start %d\n", src, pq->start);
244       ++die;
245     }
246     if (src + count > pq->end) {
247       fprintf(stderr, "src %d + count %d beyond end %d\n", src, count, pq->end);
248       ++die;
249     }
250     if (target < 0) {
251       fprintf(stderr, "target %d < 0\n", target);
252       ++die;
253     }
254     if (target + count > pq->alloc) {
255       fprintf(stderr, "target %d + count %d > alloc %d\n", target, count, pq->alloc);
256       ++die;
257     }
258     if (die) *(char *)0 = '\0';
259   }
260   )
261   memmove(pq->entries + target, pq->entries + src, count * sizeof(pq_entry));
262 }
263 
264 /*
265 pq_realloc - make space at the front of back of the queue.
266 
267 This adjusts the queue to allow insertion of a single item at the
268 front or the back of the queue.
269 
270 If the queue has 33% or more space available we simple adjust the
271 position of the in-use items within the array.  We try not to push the
272 items right up against the opposite end of the array, since we might
273 need to insert items there too.
274 
275 If the queue has less than 33% space available we allocate another 50%
276 space.  We then only move the queue elements if we need space at the
277 front, since the reallocation has just opened up a huge space at the
278 back.  Since we're reallocating exponentially larger sizes we should
279 have a constant time cost on reallocation per queue item stored (but
280 other costs are going to be higher.)
281 
282 */
283 static
284 void
pq_realloc(poe_queue * pq,int at_end)285 pq_realloc(poe_queue *pq, int at_end) {
286   int count = pq->end - pq->start;
287 
288   DEBUG( fprintf(stderr, "pq_realloc((%d, %d, %d), %d)\n", pq->start, pq->end, pq->alloc, at_end) );
289   if (count * 3 / 2 < pq->alloc) {
290     /* 33 % or more space available, use some of it */
291     int new_start;
292 
293     if (at_end) {
294       new_start = (pq->alloc - count) / 3;
295     }
296     else {
297       new_start = (pq->alloc - count) * 2 / 3;
298     }
299     DEBUG( fprintf(stderr, "  moving start to %d\n", new_start) );
300     pq_move_items(pq, new_start, pq->start, count);
301     pq->start = new_start;
302     pq->end = new_start + count;
303   }
304   else {
305     int new_alloc = pq->alloc * 3 / 2;
306     pq->entries = myrealloc(pq->entries, sizeof(pq_entry) * new_alloc);
307     pq->alloc = new_alloc;
308 
309     if (!pq->entries)
310       croak("Out of memory");
311 
312     DEBUG( fprintf(stderr, "  - expanding to %d entries\n", new_alloc) );
313 
314     if (!at_end) {
315       int new_start = (new_alloc - count) * 2 / 3;
316       DEBUG( fprintf(stderr, "  moving start to %d\n", new_start) );
317       pq_move_items(pq, new_start, pq->start, count);
318       pq->start = new_start;
319       pq->end = new_start + count;
320     }
321   }
322   DEBUG( fprintf(stderr, "  final: %d %d %d\n", pq->start, pq->end, pq->alloc) );
323 }
324 
325 /*
326 pq_insertion_point - figure out where to insert an item with the given
327 priority
328 
329 Internal.
330 */
331 static
332 int
pq_insertion_point(poe_queue * pq,pq_priority_t priority)333 pq_insertion_point(poe_queue *pq, pq_priority_t priority) {
334   if (pq->end - pq->start < LARGE_QUEUE_SIZE) {
335     int i = pq->end;
336     while (i > pq->start &&
337            priority < pq->entries[i-1].priority) {
338       --i;
339     }
340     return i;
341   }
342   else {
343     int lower = pq->start;
344     int upper = pq->end - 1;
345     while (1) {
346       int midpoint = (lower + upper) >> 1;
347 
348       if (upper < lower)
349         return lower;
350 
351       if (priority < pq->entries[midpoint].priority) {
352         upper = midpoint - 1;
353         continue;
354       }
355       if (priority > pq->entries[midpoint].priority) {
356         lower = midpoint + 1;
357         continue;
358       }
359       while (midpoint < pq->end &&
360              priority == pq->entries[midpoint].priority) {
361         ++midpoint;
362       }
363       return midpoint;
364     }
365   }
366 }
367 
368 int
pq_enqueue(poe_queue * pq,pq_priority_t priority,SV * payload)369 pq_enqueue(poe_queue *pq, pq_priority_t priority, SV *payload) {
370   int fill_at;
371   pq_id_t id = pq_new_id(pq, priority);
372 
373   DEBUG( fprintf(stderr, "pq_enqueue(%f, %p)\n", priority, payload) );
374   if (pq->start == pq->end) {
375     DEBUG( fprintf(stderr, "  - on empty queue\n") );
376     /* allow room at front and back for new entries */
377     pq->start = pq->alloc / 3;
378     pq->end = pq->start + 1;
379     fill_at = pq->start;
380   }
381   else if (priority >= pq->entries[pq->end-1].priority) {
382     DEBUG( fprintf(stderr, "  - at the end\n") );
383     if (pq->end == pq->alloc)
384       /* past the end - need to realloc or make some space */
385       pq_realloc(pq, AT_END);
386 
387     fill_at = pq->end;
388     ++pq->end;
389   }
390   else if (priority < pq->entries[pq->start].priority) {
391     DEBUG( fprintf(stderr, "  - at the front\n") );
392     if (pq->start == 0)
393       /* no space at the front, make some */
394       pq_realloc(pq, AT_START);
395 
396     --pq->start;
397     fill_at = pq->start;
398   }
399   else {
400     int i;
401     DEBUG( fprintf(stderr, "  - in the middle\n") );
402     i = pq_insertion_point(pq, priority);
403 
404     /* if we're near the end we want to push entries up, otherwise down */
405     if (i - pq->start > (pq->end - pq->start) / 2) {
406       DEBUG( fprintf(stderr, "    - closer to the back (%d -> [ %d %d ])\n",
407                      i, pq->start, pq->end) );
408       /* make sure we have space, this might end up copying twice,
409 	 but too bad for now */
410       if (pq->end == pq->alloc) {
411         int old_start = pq->start;
412 	pq_realloc(pq, AT_END);
413         i += pq->start - old_start;
414       }
415 
416       pq_move_items(pq, i+1, i, pq->end - i);
417       ++pq->end;
418       fill_at = i;
419     }
420     else {
421       DEBUG( fprintf(stderr, "    - closer to the front (%d -> [ %d %d ])\n",
422                      i, pq->start, pq->end) );
423       if (pq->start == 0) {
424 	pq_realloc(pq, AT_START);
425 	i += pq->start;
426       }
427       pq_move_items(pq, pq->start-1, pq->start, i - pq->start);
428       --pq->start;
429       fill_at = i-1;
430     }
431   }
432   pq->entries[fill_at].priority = priority;
433   pq->entries[fill_at].id = id;
434   pq->entries[fill_at].payload = newSVsv(payload);
435 
436   return id;
437 }
438 
439 /*
440   Note: it's up to the caller to release the SV.  The XS code does this
441   by making it mortal.
442 */
443 int
pq_dequeue_next(poe_queue * pq,pq_priority_t * priority,pq_id_t * id,SV ** payload)444 pq_dequeue_next(poe_queue *pq, pq_priority_t *priority, pq_id_t *id, SV **payload) {
445   pq_entry *entry;
446   /* the caller needs to release the payload (somehow) */
447   if (pq->start == pq->end)
448     return 0;
449 
450   entry = pq->entries + pq->start++;
451   *priority = entry->priority;
452   *id = entry->id;
453   *payload = entry->payload;
454   pq_release_id(pq, entry->id);
455 
456   return 1;
457 }
458 
459 int
pq_get_next_priority(poe_queue * pq,pq_priority_t * priority)460 pq_get_next_priority(poe_queue *pq, pq_priority_t *priority) {
461   if (pq->start == pq->end)
462     return 0;
463 
464   *priority = pq->entries[pq->start].priority;
465   return 1;
466 }
467 
468 int
pq_get_item_count(poe_queue * pq)469 pq_get_item_count(poe_queue *pq) {
470   return pq->end - pq->start;
471 }
472 
473 /*
474 pq_test_filter - the XS magic involved in passing the payload to a
475 filter function.
476 */
477 static
478 int
pq_test_filter(pq_entry * entry,SV * filter)479 pq_test_filter(pq_entry *entry, SV *filter) {
480   /* man perlcall for the magic here */
481   dSP;
482   int count;
483   SV *result_sv;
484   int result;
485 
486   ENTER;
487   SAVETMPS;
488   PUSHMARK(SP);
489   XPUSHs(sv_2mortal(newSVsv(entry->payload)));
490   PUTBACK;
491 
492   count = call_sv(filter, G_SCALAR);
493 
494   SPAGAIN;
495 
496   if (count != 1)
497     croak("got other than 1 value in scalar context");
498 
499   result_sv = POPs;
500   result = SvTRUE(result_sv);
501 
502   PUTBACK;
503   FREETMPS;
504   LEAVE;
505 
506   return result;
507 }
508 
509 /*
510 pq_find_item - search for an item we know is there.
511 
512 Internal.
513 */
514 static
515 int
pq_find_item(poe_queue * pq,pq_id_t id,pq_priority_t priority)516 pq_find_item(poe_queue *pq, pq_id_t id, pq_priority_t priority) {
517   int i;
518 
519   STATS(++pq->total_finds);
520   if (pq->end - pq->start < LARGE_QUEUE_SIZE) {
521     for (i = pq->start; i < pq->end; ++i) {
522       if (pq->entries[i].id == id)
523         return i;
524     }
525     DEBUG(fprintf(stderr, "pq_find_item %d => %f\n", id, priority) );
526     croak("Internal inconsistency: event should have been found");
527   }
528 
529   /* try a binary search */
530   /* simply translated from the perl */
531   STATS(++pq->binary_finds);
532   {
533     int lower = pq->start;
534     int upper = pq->end - 1;
535     int linear_point;
536     while (1) {
537       int midpoint = (upper + lower) >> 1;
538       if (upper < lower) {
539         croak("Internal inconsistency, priorities out of order");
540       }
541       if (priority < pq->entries[midpoint].priority) {
542         upper = midpoint - 1;
543         continue;
544       }
545       if (priority > pq->entries[midpoint].priority) {
546         lower = midpoint + 1;
547         continue;
548       }
549       linear_point = midpoint;
550       while (linear_point >= pq->start &&
551              priority == pq->entries[linear_point].priority) {
552         if (pq->entries[linear_point].id == id)
553           return linear_point;
554         --linear_point;
555       }
556       linear_point = midpoint;
557       while ( (++linear_point < pq->end) &&
558               priority == pq->entries[linear_point].priority) {
559         if (pq->entries[linear_point].id == id)
560           return linear_point;
561       }
562 
563       croak("internal inconsistency: event should have been found");
564     }
565   }
566 }
567 
568 int
pq_remove_item(poe_queue * pq,pq_id_t id,SV * filter,pq_entry * removed)569 pq_remove_item(poe_queue *pq, pq_id_t id, SV *filter, pq_entry *removed) {
570   pq_priority_t priority;
571   int index;
572 
573   if (!pq_item_priority(pq, id, &priority)) {
574     errno = ESRCH;
575     return 0;
576   }
577 
578   index = pq_find_item(pq, id, priority);
579 
580   if (!pq_test_filter(pq->entries + index, filter)) {
581     errno = EPERM;
582     return 0;
583   }
584 
585   *removed = pq->entries[index];
586   pq_release_id(pq, id);
587   if (index == pq->start) {
588     ++pq->start;
589   }
590   else if (index == pq->end - 1) {
591     --pq->end;
592   }
593   else {
594     pq_move_items(pq, index, index+1, pq->end - index - 1);
595     --pq->end;
596   }
597   DEBUG( fprintf(stderr, "removed (%d, %p (%d))\n", id, removed->payload,
598 		 SvREFCNT(removed->payload)) );
599 
600   return 1;
601 }
602 
603 int
pq_remove_items(poe_queue * pq,SV * filter,int max_count,pq_entry ** entries)604 pq_remove_items(poe_queue *pq, SV *filter, int max_count, pq_entry **entries) {
605   int in_index, out_index;
606   int remove_count = 0;
607 
608   *entries = NULL;
609   if (pq->start == pq->end)
610     return 0;
611 
612   *entries = mymalloc(sizeof(pq_entry) * (pq->end - pq->start));
613   if (!*entries)
614     croak("Out of memory");
615 
616   in_index = out_index = pq->start;
617   while (in_index < pq->end && remove_count < max_count) {
618     if (pq_test_filter(pq->entries + in_index, filter)) {
619       pq_release_id(pq, pq->entries[in_index].id);
620       (*entries)[remove_count++] = pq->entries[in_index++];
621     }
622     else {
623       pq->entries[out_index++] = pq->entries[in_index++];
624     }
625   }
626   while (in_index < pq->end) {
627     pq->entries[out_index++] = pq->entries[in_index++];
628   }
629   pq->end = out_index;
630 
631   return remove_count;
632 }
633 
634 /*
635 We need to keep the following 2 functions in sync (or combine the
636 common code.)
637 */
638 int
pq_set_priority(poe_queue * pq,pq_id_t id,SV * filter,pq_priority_t new_priority)639 pq_set_priority(poe_queue *pq, pq_id_t id, SV *filter, pq_priority_t new_priority) {
640   pq_priority_t old_priority;
641   int index, insert_at;
642 
643   if (!pq_item_priority(pq, id, &old_priority)) {
644     errno = ESRCH;
645     return 0;
646   }
647 
648   index = pq_find_item(pq, id, old_priority);
649 
650   if (!pq_test_filter(pq->entries + index, filter)) {
651     errno = EPERM;
652     return 0;
653   }
654 
655   DEBUG( fprintf(stderr, " - index %d  oldp %f newp %f\n", index, old_priority, new_priority) );
656 
657   if (pq->end - pq->start == 1) {
658     DEBUG( fprintf(stderr, "   -- one item\n") );
659     /* only the one item anyway */
660     pq->entries[pq->start].priority = new_priority;
661   }
662   else {
663     insert_at = pq_insertion_point(pq, new_priority);
664     DEBUG( fprintf(stderr, "   - new index %d\n", insert_at) );
665     /* the item is still in the queue, so either side of it means it
666        won't move */
667     if (insert_at == index || insert_at == index+1) {
668       DEBUG( fprintf(stderr, "   -- change in place\n") );
669       pq->entries[index].priority = new_priority;
670     }
671     else {
672       pq_entry saved = pq->entries[index];
673       saved.priority = new_priority;
674 
675       if (insert_at < index) {
676         DEBUG( fprintf(stderr, "  - insert_at < index\n") );
677 	pq_move_items(pq, insert_at + 1, insert_at, index - insert_at);
678         pq->entries[insert_at] = saved;
679       }
680       else {
681         DEBUG( fprintf(stderr, "  - insert_at > index\n") );
682 	--insert_at;
683 	pq_move_items(pq, index, index + 1, insert_at - index);
684         pq->entries[insert_at] = saved;
685       }
686     }
687   }
688 
689   pq_set_id_priority(pq, id, new_priority);
690 
691   return 1;
692 }
693 
694 int
pq_adjust_priority(poe_queue * pq,pq_id_t id,SV * filter,double delta,pq_priority_t * priority)695 pq_adjust_priority(poe_queue *pq, pq_id_t id, SV *filter, double delta, pq_priority_t *priority) {
696   pq_priority_t old_priority, new_priority;
697   int index, insert_at;
698 
699   DEBUG( fprintf(stderr, "pq_adjust_priority(..., %d, %p, %f, ...)\n", id, filter, delta) );
700 
701   if (!pq_item_priority(pq, id, &old_priority)) {
702     errno = ESRCH;
703     return 0;
704   }
705 
706   index = pq_find_item(pq, id, old_priority);
707 
708   if (!pq_test_filter(pq->entries + index, filter)) {
709     errno = EPERM;
710     return 0;
711   }
712 
713   new_priority = old_priority + delta;
714 
715   DEBUG( fprintf(stderr, " - index %d  oldp %f newp %f\n", index, old_priority, new_priority) );
716 
717   if (pq->end - pq->start == 1) {
718     DEBUG( fprintf(stderr, "   -- one item\n") );
719     /* only the one item anyway */
720     pq->entries[pq->start].priority = new_priority;
721   }
722   else {
723     insert_at = pq_insertion_point(pq, new_priority);
724     DEBUG( fprintf(stderr, "   - new index %d\n", insert_at) );
725     /* the item is still in the queue, so either side of it means it
726        won't move */
727     if (insert_at == index || insert_at == index+1) {
728       DEBUG( fprintf(stderr, "   -- change in place\n") );
729       pq->entries[index].priority = new_priority;
730     }
731     else {
732       pq_entry saved = pq->entries[index];
733       saved.priority = new_priority;
734 
735       if (insert_at < index) {
736         DEBUG( fprintf(stderr, "  - insert_at < index\n") );
737 	pq_move_items(pq, insert_at + 1, insert_at, index - insert_at);
738         pq->entries[insert_at] = saved;
739       }
740       else {
741         DEBUG( fprintf(stderr, "  - insert_at > index\n") );
742 	--insert_at;
743 	pq_move_items(pq, index, index + 1, insert_at - index);
744         pq->entries[insert_at] = saved;
745       }
746     }
747   }
748 
749   pq_set_id_priority(pq, id, new_priority);
750   *priority = new_priority;
751 
752   return 1;
753 }
754 
755 int
pq_peek_items(poe_queue * pq,SV * filter,int max_count,pq_entry ** items)756 pq_peek_items(poe_queue *pq, SV *filter, int max_count, pq_entry **items) {
757   int count = 0;
758   int i;
759 
760   *items = NULL;
761   if (pq->end == pq->start)
762     return 0;
763 
764   *items = mymalloc(sizeof(pq_entry) * (pq->end - pq->start));
765   for (i = pq->start; i < pq->end; ++i) {
766     if (pq_test_filter(pq->entries + i, filter)) {
767       (*items)[count++] = pq->entries[i];
768     }
769   }
770   if (!count) {
771     myfree(*items);
772     *items = NULL;
773   }
774 
775   return count;
776 }
777 
778 /*
779 pq_dump - dump the internals of the queue structure.
780 */
781 void
pq_dump(poe_queue * pq)782 pq_dump(poe_queue *pq) {
783   int i;
784   HE *he;
785 
786   fprintf(stderr, "poe_queue\n");
787   fprintf(stderr, "  start: %d\n", pq->start);
788   fprintf(stderr, "    end: %d\n", pq->end);
789   fprintf(stderr, "  alloc: %d\n", pq->alloc);
790   fprintf(stderr, "    seq: %d\n", pq->queue_seq);
791   fprintf(stderr, "  **Queue Entries:\n"
792          "      index:   id  priority    SV\n");
793   for (i = pq->start; i < pq->end; ++i) {
794     pq_entry *entry = pq->entries + i;
795     fprintf(stderr, "      %5d: %5d %8f  %p (%u)\n", i, entry->id, entry->priority,
796 	   entry->payload, (unsigned)SvREFCNT(entry->payload));
797   }
798   fprintf(stderr, "  **Hash entries:\n");
799   hv_iterinit(pq->ids);
800   while ((he = hv_iternext(pq->ids)) != NULL) {
801     STRLEN len;
802     fprintf(stderr, "   %d => %f\n", *(pq_id_t *)HePV(he, len), SvNV(hv_iterval(pq->ids, he)));
803   }
804 }
805 
806 /*
807 pq_verify - basic verification of the structure of the queue
808 
809 For now check for duplicate ids in sequence.
810 */
811 void
pq_verify(poe_queue * pq)812 pq_verify(poe_queue *pq) {
813   int i;
814   int lastid;
815   int found_err = 0;
816 
817   if (pq->start != pq->end) {
818     lastid = pq->entries[pq->start].id;
819     i = pq->start + 1;
820     while (i < pq->end) {
821       if (pq->entries[i].id == lastid) {
822         fprintf(stderr, "Duplicate id %d at %d\n", lastid, i);
823         ++found_err;
824       }
825       ++i;
826     }
827   }
828   if (found_err) {
829     pq_dump(pq);
830     exit(1);
831   }
832 }
833 
834 /*
835 pq__set_errno_queue - set errno
836 
837 This just sets errno for testing purposes.
838 */
839 void
pq__set_errno_queue(int value)840 pq__set_errno_queue(int value) {
841   errno = value;
842 }
843 
844