1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 
9 
/*
 * Operation codes stored in the "op" field of a queued poll change;
 * nxt_poll_commit_changes() dispatches on them.
 */
#define NXT_POLL_ADD     0   /* handled by nxt_poll_set_add() */
#define NXT_POLL_CHANGE  1   /* handled by nxt_poll_set_change() */
#define NXT_POLL_DELETE  2   /* handled by nxt_poll_set_delete() */
13 
14 
/*
 * An entry of the per-engine fd hash.  One entry is allocated for every
 * descriptor added to the poll set and is keyed by the murmur hash of
 * the descriptor.
 */
typedef struct {
    /*
     * A file descriptor is stored in hash entry to allow
     * nxt_poll_fd_hash_test() to not dereference a pointer to
     * nxt_fd_event_t which may be invalid if the file descriptor has
     * been already closed and the nxt_fd_event_t's memory has been freed.
     */
    nxt_socket_t         fd;

    /* Position of the descriptor in the engine->u.poll.set array. */
    uint32_t             index;

    /* The nxt_fd_event_t the descriptor was added with. */
    void                 *event;
} nxt_poll_hash_entry_t;
27 
28 
/*
 * Forward declarations of the file-local functions.  They are needed
 * because the nxt_poll_engine interface table and the fd hash prototype
 * below refer to these functions before their definitions.
 */
static nxt_int_t nxt_poll_create(nxt_event_engine_t *engine,
    nxt_uint_t mchanges, nxt_uint_t mevents);
static void nxt_poll_free(nxt_event_engine_t *engine);
static void nxt_poll_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
static void nxt_poll_disable(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static nxt_bool_t nxt_poll_close(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_poll_enable_read(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_poll_enable_write(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_poll_disable_read(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_poll_disable_write(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_poll_block_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
static void nxt_poll_block_write(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_poll_oneshot_read(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_poll_oneshot_write(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev);
static void nxt_poll_change(nxt_event_engine_t *engine, nxt_fd_event_t *ev,
    nxt_uint_t op, nxt_uint_t events);
static nxt_int_t nxt_poll_commit_changes(nxt_event_engine_t *engine);
static nxt_int_t nxt_poll_set_add(nxt_event_engine_t *engine,
    nxt_fd_event_t *ev, int events);
static nxt_int_t nxt_poll_set_change(nxt_event_engine_t *engine,
    nxt_fd_t fd, int events);
static nxt_int_t nxt_poll_set_delete(nxt_event_engine_t *engine, nxt_fd_t fd);
static void nxt_poll(nxt_event_engine_t *engine, nxt_msec_t timeout);
static nxt_poll_hash_entry_t *nxt_poll_fd_hash_get(nxt_event_engine_t *engine,
    nxt_fd_t fd);
static nxt_int_t nxt_poll_fd_hash_test(nxt_lvlhsh_query_t *lhq, void *data);
static void nxt_poll_fd_hash_destroy(nxt_event_engine_t *engine,
    nxt_lvlhsh_t *lh);
66 
67 
/*
 * The event interface of the poll engine.
 *
 * The initializer is positional: the meaning of each slot is defined by
 * the order of the members of nxt_event_interface_t, which is declared
 * outside this file.  Two handlers are deliberately reused:
 * nxt_poll_disable() fills two consecutive slots and
 * nxt_poll_enable_read() appears a second time after the oneshot
 * handlers.  The four NULL slots are operations this engine does not
 * provide.
 */
const nxt_event_interface_t  nxt_poll_engine = {
    "poll",
    nxt_poll_create,
    nxt_poll_free,
    nxt_poll_enable,
    nxt_poll_disable,
    nxt_poll_disable,
    nxt_poll_close,
    nxt_poll_enable_read,
    nxt_poll_enable_write,
    nxt_poll_disable_read,
    nxt_poll_disable_write,
    nxt_poll_block_read,
    nxt_poll_block_write,
    nxt_poll_oneshot_read,
    nxt_poll_oneshot_write,
    nxt_poll_enable_read,
    NULL,
    NULL,
    NULL,
    NULL,
    nxt_poll,

    &nxt_unix_conn_io,

    NXT_NO_FILE_EVENTS,
    NXT_NO_SIGNAL_EVENTS,
};
96 
97 
/*
 * The level hash prototype shared by all operations on the fd hash.
 * Candidate entries are validated by nxt_poll_fd_hash_test(); the
 * remaining members are the stock nxt_lvlhsh_alloc()/nxt_lvlhsh_free()
 * callbacks and the NXT_LVLHSH_LARGE_MEMALIGN layout constant.
 */
static const nxt_lvlhsh_proto_t  nxt_poll_fd_hash_proto  nxt_aligned(64) =
{
    NXT_LVLHSH_LARGE_MEMALIGN,
    nxt_poll_fd_hash_test,
    nxt_lvlhsh_alloc,
    nxt_lvlhsh_free,
};
105 
106 
107 static nxt_int_t
nxt_poll_create(nxt_event_engine_t * engine,nxt_uint_t mchanges,nxt_uint_t mevents)108 nxt_poll_create(nxt_event_engine_t *engine, nxt_uint_t mchanges,
109     nxt_uint_t mevents)
110 {
111     engine->u.poll.mchanges = mchanges;
112 
113     engine->u.poll.changes = nxt_malloc(sizeof(nxt_poll_change_t) * mchanges);
114 
115     if (engine->u.poll.changes != NULL) {
116         return NXT_OK;
117     }
118 
119     return NXT_ERROR;
120 }
121 
122 
/*
 * Releases everything owned by the poll engine: the pollfd array, the
 * change queue and all fd hash entries.  The engine state is zeroed
 * afterwards so that no stale pointers or counters remain in it.
 */
static void
nxt_poll_free(nxt_event_engine_t *engine)
{
    nxt_debug(&engine->task, "poll free");

    nxt_free(engine->u.poll.set);
    nxt_free(engine->u.poll.changes);
    nxt_poll_fd_hash_destroy(engine, &engine->u.poll.fd_hash);

    nxt_memzero(&engine->u.poll, sizeof(nxt_poll_engine_t));
}
134 
135 
136 static void
nxt_poll_enable(nxt_event_engine_t * engine,nxt_fd_event_t * ev)137 nxt_poll_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
138 {
139     ev->read = NXT_EVENT_ACTIVE;
140     ev->write = NXT_EVENT_ACTIVE;
141 
142     nxt_poll_change(engine, ev, NXT_POLL_ADD, POLLIN | POLLOUT);
143 }
144 
145 
146 static void
nxt_poll_disable(nxt_event_engine_t * engine,nxt_fd_event_t * ev)147 nxt_poll_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
148 {
149     if (ev->read != NXT_EVENT_INACTIVE && ev->write != NXT_EVENT_INACTIVE) {
150         ev->read = NXT_EVENT_INACTIVE;
151         ev->write = NXT_EVENT_INACTIVE;
152 
153         nxt_poll_change(engine, ev, NXT_POLL_DELETE, 0);
154     }
155 }
156 
157 
158 static nxt_bool_t
nxt_poll_close(nxt_event_engine_t * engine,nxt_fd_event_t * ev)159 nxt_poll_close(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
160 {
161     nxt_poll_disable(engine, ev);
162 
163     return ev->changing;
164 }
165 
166 
167 static void
nxt_poll_enable_read(nxt_event_engine_t * engine,nxt_fd_event_t * ev)168 nxt_poll_enable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
169 {
170     nxt_uint_t  op, events;
171 
172     ev->read = NXT_EVENT_ACTIVE;
173 
174     if (ev->write == NXT_EVENT_INACTIVE) {
175         op = NXT_POLL_ADD;
176         events = POLLIN;
177 
178     } else {
179         op = NXT_POLL_CHANGE;
180         events = POLLIN | POLLOUT;
181     }
182 
183     nxt_poll_change(engine, ev, op, events);
184 }
185 
186 
187 static void
nxt_poll_enable_write(nxt_event_engine_t * engine,nxt_fd_event_t * ev)188 nxt_poll_enable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
189 {
190     nxt_uint_t  op, events;
191 
192     ev->write = NXT_EVENT_ACTIVE;
193 
194     if (ev->read == NXT_EVENT_INACTIVE) {
195         op = NXT_POLL_ADD;
196         events = POLLOUT;
197 
198     } else {
199         op = NXT_POLL_CHANGE;
200         events = POLLIN | POLLOUT;
201     }
202 
203     nxt_poll_change(engine, ev, op, events);
204 }
205 
206 
207 static void
nxt_poll_disable_read(nxt_event_engine_t * engine,nxt_fd_event_t * ev)208 nxt_poll_disable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
209 {
210     nxt_uint_t  op, events;
211 
212     ev->read = NXT_EVENT_INACTIVE;
213 
214     if (ev->write == NXT_EVENT_INACTIVE) {
215         op = NXT_POLL_DELETE;
216         events = 0;
217 
218     } else {
219         op = NXT_POLL_CHANGE;
220         events = POLLOUT;
221     }
222 
223     nxt_poll_change(engine, ev, op, events);
224 }
225 
226 
227 static void
nxt_poll_disable_write(nxt_event_engine_t * engine,nxt_fd_event_t * ev)228 nxt_poll_disable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
229 {
230     nxt_uint_t  op, events;
231 
232     ev->write = NXT_EVENT_INACTIVE;
233 
234     if (ev->read == NXT_EVENT_INACTIVE) {
235         op = NXT_POLL_DELETE;
236         events = 0;
237 
238     } else {
239         op = NXT_POLL_CHANGE;
240         events = POLLIN;
241     }
242 
243     nxt_poll_change(engine, ev, op, events);
244 }
245 
246 
247 static void
nxt_poll_block_read(nxt_event_engine_t * engine,nxt_fd_event_t * ev)248 nxt_poll_block_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
249 {
250     if (ev->read != NXT_EVENT_INACTIVE) {
251         nxt_poll_disable_read(engine, ev);
252     }
253 }
254 
255 
256 static void
nxt_poll_block_write(nxt_event_engine_t * engine,nxt_fd_event_t * ev)257 nxt_poll_block_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
258 {
259     if (ev->write != NXT_EVENT_INACTIVE) {
260         nxt_poll_disable_write(engine, ev);
261     }
262 }
263 
264 
265 static void
nxt_poll_oneshot_read(nxt_event_engine_t * engine,nxt_fd_event_t * ev)266 nxt_poll_oneshot_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
267 {
268     nxt_uint_t  op;
269 
270     op = (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) ?
271              NXT_POLL_ADD : NXT_POLL_CHANGE;
272 
273     ev->read = NXT_EVENT_ONESHOT;
274     ev->write = NXT_EVENT_INACTIVE;
275 
276     nxt_poll_change(engine, ev, op, POLLIN);
277 }
278 
279 
280 static void
nxt_poll_oneshot_write(nxt_event_engine_t * engine,nxt_fd_event_t * ev)281 nxt_poll_oneshot_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
282 {
283     nxt_uint_t  op;
284 
285     op = (ev->read == NXT_EVENT_INACTIVE && ev->write == NXT_EVENT_INACTIVE) ?
286              NXT_POLL_ADD : NXT_POLL_CHANGE;
287 
288     ev->read = NXT_EVENT_INACTIVE;
289     ev->write = NXT_EVENT_ONESHOT;
290 
291     nxt_poll_change(engine, ev, op, POLLOUT);
292 }
293 
294 
295 /*
296  * poll changes are batched to improve instruction and data cache
297  * locality of several lvlhsh operations followed by poll() call.
298  */
299 
300 static void
nxt_poll_change(nxt_event_engine_t * engine,nxt_fd_event_t * ev,nxt_uint_t op,nxt_uint_t events)301 nxt_poll_change(nxt_event_engine_t *engine, nxt_fd_event_t *ev, nxt_uint_t op,
302     nxt_uint_t events)
303 {
304     nxt_poll_change_t  *change;
305 
306     nxt_debug(ev->task, "poll change: fd:%d op:%d ev:%XD", ev->fd, op, events);
307 
308     if (engine->u.poll.nchanges >= engine->u.poll.mchanges) {
309         (void) nxt_poll_commit_changes(engine);
310     }
311 
312     ev->changing = 1;
313 
314     change = &engine->u.poll.changes[engine->u.poll.nchanges++];
315     change->op = op;
316     change->events = events;
317     change->event = ev;
318 }
319 
320 
321 static nxt_int_t
nxt_poll_commit_changes(nxt_event_engine_t * engine)322 nxt_poll_commit_changes(nxt_event_engine_t *engine)
323 {
324     nxt_int_t          ret, retval;
325     nxt_fd_event_t     *ev;
326     nxt_poll_change_t  *change, *end;
327 
328     nxt_debug(&engine->task, "poll changes:%ui", engine->u.poll.nchanges);
329 
330     retval = NXT_OK;
331     change = engine->u.poll.changes;
332     end = change + engine->u.poll.nchanges;
333 
334     do {
335         ev = change->event;
336         ev->changing = 0;
337 
338         switch (change->op) {
339 
340         case NXT_POLL_ADD:
341             ret = nxt_poll_set_add(engine, ev, change->events);
342 
343             if (nxt_fast_path(ret == NXT_OK)) {
344                 goto next;
345             }
346 
347             break;
348 
349         case NXT_POLL_CHANGE:
350             ret = nxt_poll_set_change(engine, ev->fd, change->events);
351 
352             if (nxt_fast_path(ret == NXT_OK)) {
353                 goto next;
354             }
355 
356             break;
357 
358         case NXT_POLL_DELETE:
359             ret = nxt_poll_set_delete(engine, ev->fd);
360 
361             if (nxt_fast_path(ret == NXT_OK)) {
362                 goto next;
363             }
364 
365             break;
366         }
367 
368         nxt_work_queue_add(&engine->fast_work_queue, ev->error_handler,
369                            ev->task, ev, ev->data);
370 
371         retval = NXT_ERROR;
372 
373     next:
374 
375         change++;
376 
377     } while (change < end);
378 
379     engine->u.poll.nchanges = 0;
380 
381     return retval;
382 }
383 
384 
385 static nxt_int_t
nxt_poll_set_add(nxt_event_engine_t * engine,nxt_fd_event_t * ev,int events)386 nxt_poll_set_add(nxt_event_engine_t *engine, nxt_fd_event_t *ev, int events)
387 {
388     nxt_int_t              ret;
389     nxt_uint_t             max_nfds;
390     struct pollfd          *pfd;
391     nxt_lvlhsh_query_t     lhq;
392     nxt_poll_hash_entry_t  *phe;
393 
394     nxt_debug(&engine->task, "poll add event: fd:%d ev:%04Xi", ev->fd, events);
395 
396     if (engine->u.poll.nfds >= engine->u.poll.max_nfds) {
397         max_nfds = engine->u.poll.max_nfds + 512; /* 4K */
398 
399         pfd = nxt_realloc(engine->u.poll.set, sizeof(struct pollfd) * max_nfds);
400         if (nxt_slow_path(pfd == NULL)) {
401             return NXT_ERROR;
402         }
403 
404         engine->u.poll.set = pfd;
405         engine->u.poll.max_nfds = max_nfds;
406     }
407 
408     phe = nxt_malloc(sizeof(nxt_poll_hash_entry_t));
409     if (nxt_slow_path(phe == NULL)) {
410         return NXT_ERROR;
411     }
412 
413     phe->fd = ev->fd;
414     phe->index = engine->u.poll.nfds;
415     phe->event = ev;
416 
417     pfd = &engine->u.poll.set[engine->u.poll.nfds++];
418     pfd->fd = ev->fd;
419     pfd->events = events;
420     pfd->revents = 0;
421 
422     lhq.key_hash = nxt_murmur_hash2(&ev->fd, sizeof(nxt_fd_t));
423     lhq.replace = 0;
424     lhq.value = phe;
425     lhq.proto = &nxt_poll_fd_hash_proto;
426     lhq.data = engine;
427 
428     ret = nxt_lvlhsh_insert(&engine->u.poll.fd_hash, &lhq);
429 
430     if (nxt_fast_path(ret == NXT_OK)) {
431         return NXT_OK;
432     }
433 
434     nxt_free(phe);
435 
436     return NXT_ERROR;
437 }
438 
439 
440 static nxt_int_t
nxt_poll_set_change(nxt_event_engine_t * engine,nxt_fd_t fd,int events)441 nxt_poll_set_change(nxt_event_engine_t *engine, nxt_fd_t fd, int events)
442 {
443     nxt_poll_hash_entry_t  *phe;
444 
445     nxt_debug(&engine->task, "poll change event: fd:%d ev:%04Xi",
446               fd, events);
447 
448     phe = nxt_poll_fd_hash_get(engine, fd);
449 
450     if (nxt_fast_path(phe != NULL)) {
451         engine->u.poll.set[phe->index].events = events;
452         return NXT_OK;
453     }
454 
455     return NXT_ERROR;
456 }
457 
458 
459 static nxt_int_t
nxt_poll_set_delete(nxt_event_engine_t * engine,nxt_fd_t fd)460 nxt_poll_set_delete(nxt_event_engine_t *engine, nxt_fd_t fd)
461 {
462     nxt_int_t              ret;
463     nxt_uint_t             index, nfds;
464     nxt_lvlhsh_query_t     lhq;
465     nxt_poll_hash_entry_t  *phe;
466 
467     nxt_debug(&engine->task, "poll delete event: fd:%d", fd);
468 
469     lhq.key_hash = nxt_murmur_hash2(&fd, sizeof(nxt_fd_t));
470     lhq.proto = &nxt_poll_fd_hash_proto;
471     lhq.data = engine;
472 
473     ret = nxt_lvlhsh_delete(&engine->u.poll.fd_hash, &lhq);
474 
475     if (nxt_slow_path(ret != NXT_OK)) {
476         return NXT_ERROR;
477     }
478 
479     phe = lhq.value;
480 
481     index = phe->index;
482     engine->u.poll.nfds--;
483     nfds = engine->u.poll.nfds;
484 
485     if (index != nfds) {
486         engine->u.poll.set[index] = engine->u.poll.set[nfds];
487 
488         phe = nxt_poll_fd_hash_get(engine, engine->u.poll.set[nfds].fd);
489 
490         phe->index = index;
491     }
492 
493     nxt_free(lhq.value);
494 
495     return NXT_OK;
496 }
497 
498 
499 static void
nxt_poll(nxt_event_engine_t * engine,nxt_msec_t timeout)500 nxt_poll(nxt_event_engine_t *engine, nxt_msec_t timeout)
501 {
502     int                    nevents;
503     nxt_fd_t               fd;
504     nxt_err_t              err;
505     nxt_bool_t             error;
506     nxt_uint_t             i, events, level;
507     struct pollfd          *pfd;
508     nxt_fd_event_t         *ev;
509     nxt_poll_hash_entry_t  *phe;
510 
511     if (engine->u.poll.nchanges != 0) {
512         if (nxt_poll_commit_changes(engine) != NXT_OK) {
513             /* Error handlers have been enqueued on failure. */
514             timeout = 0;
515         }
516     }
517 
518     nxt_debug(&engine->task, "poll() events:%ui timeout:%M",
519               engine->u.poll.nfds, timeout);
520 
521     nevents = poll(engine->u.poll.set, engine->u.poll.nfds, timeout);
522 
523     err = (nevents == -1) ? nxt_errno : 0;
524 
525     nxt_thread_time_update(engine->task.thread);
526 
527     nxt_debug(&engine->task, "poll(): %d", nevents);
528 
529     if (nevents == -1) {
530         level = (err == NXT_EINTR) ? NXT_LOG_INFO : NXT_LOG_ALERT;
531         nxt_log(&engine->task, level, "poll() failed %E", err);
532         return;
533     }
534 
535     for (i = 0; i < engine->u.poll.nfds && nevents != 0; i++) {
536 
537         pfd = &engine->u.poll.set[i];
538         events = pfd->revents;
539 
540         if (events == 0) {
541             continue;
542         }
543 
544         fd = pfd->fd;
545 
546         phe = nxt_poll_fd_hash_get(engine, fd);
547 
548         if (nxt_slow_path(phe == NULL)) {
549             nxt_alert(&engine->task,
550                       "poll() returned invalid fd:%d ev:%04Xd rev:%04uXi",
551                       fd, pfd->events, events);
552 
553             /* Mark the poll entry to ignore it by the kernel. */
554             pfd->fd = -1;
555             goto next;
556         }
557 
558         ev = phe->event;
559 
560         nxt_debug(ev->task, "poll: fd:%d ev:%04uXi rd:%d wr:%d",
561                   fd, events, ev->read, ev->write);
562 
563         if (nxt_slow_path((events & POLLNVAL) != 0)) {
564             nxt_alert(ev->task, "poll() error fd:%d ev:%04Xd rev:%04uXi",
565                       fd, pfd->events, events);
566 
567             /* Mark the poll entry to ignore it by the kernel. */
568             pfd->fd = -1;
569 
570             nxt_work_queue_add(&engine->fast_work_queue,
571                                ev->error_handler, ev->task, ev, ev->data);
572             goto next;
573         }
574 
575         /*
576          * On a socket's remote end close:
577          *
578          *   Linux, FreeBSD, and Solaris set POLLIN;
579          *   MacOSX sets POLLIN and POLLHUP;
580          *   NetBSD sets POLLIN, and poll(2) claims this explicitly:
581          *
582          *     If the remote end of a socket is closed, poll()
583          *     returns a POLLIN event, rather than a POLLHUP.
584          *
585          * On error:
586          *
587          *   Linux sets POLLHUP and POLLERR only;
588          *   FreeBSD adds POLLHUP to POLLIN or POLLOUT, although poll(2)
589          *   claims the opposite:
590          *
591          *     Note that POLLHUP and POLLOUT should never be
592          *     present in the revents bitmask at the same time.
593          *
594          *   Solaris and NetBSD do not add POLLHUP or POLLERR;
595          *   MacOSX sets POLLHUP only.
596          *
597          * If an implementation sets POLLERR or POLLHUP only without POLLIN
598          * or POLLOUT, the "error" variable enqueues only one active handler.
599          */
600 
601         error = (((events & (POLLERR | POLLHUP)) != 0)
602                  && ((events & (POLLIN | POLLOUT)) == 0));
603 
604         if ((events & POLLIN) || (error && ev->read_handler != NULL)) {
605             error = 0;
606             ev->read_ready = 1;
607 
608             if (ev->read == NXT_EVENT_ONESHOT) {
609                 ev->read = NXT_EVENT_INACTIVE;
610                 nxt_poll_change(engine, ev, NXT_POLL_DELETE, 0);
611             }
612 
613             nxt_work_queue_add(ev->read_work_queue, ev->read_handler,
614                                ev->task, ev, ev->data);
615         }
616 
617         if ((events & POLLOUT) || (error && ev->write_handler != NULL)) {
618             ev->write_ready = 1;
619 
620             if (ev->write == NXT_EVENT_ONESHOT) {
621                 ev->write = NXT_EVENT_INACTIVE;
622                 nxt_poll_change(engine, ev, NXT_POLL_DELETE, 0);
623             }
624 
625             nxt_work_queue_add(ev->write_work_queue, ev->write_handler,
626                                ev->task, ev, ev->data);
627         }
628 
629     next:
630 
631         nevents--;
632     }
633 }
634 
635 
636 static nxt_poll_hash_entry_t *
nxt_poll_fd_hash_get(nxt_event_engine_t * engine,nxt_fd_t fd)637 nxt_poll_fd_hash_get(nxt_event_engine_t *engine, nxt_fd_t fd)
638 {
639     nxt_lvlhsh_query_t     lhq;
640     nxt_poll_hash_entry_t  *phe;
641 
642     lhq.key_hash = nxt_murmur_hash2(&fd, sizeof(nxt_fd_t));
643     lhq.proto = &nxt_poll_fd_hash_proto;
644     lhq.data = engine;
645 
646     if (nxt_lvlhsh_find(&engine->u.poll.fd_hash, &lhq) == NXT_OK) {
647         phe = lhq.value;
648         return phe;
649     }
650 
651     nxt_alert(&engine->task, "fd %d not found in hash", fd);
652 
653     return NULL;
654 }
655 
656 
657 static nxt_int_t
nxt_poll_fd_hash_test(nxt_lvlhsh_query_t * lhq,void * data)658 nxt_poll_fd_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
659 {
660     nxt_event_engine_t     *engine;
661     nxt_poll_hash_entry_t  *phe;
662 
663     phe = data;
664 
665     /* nxt_murmur_hash2() is unique for 4 bytes. */
666 
667     engine = lhq->data;
668 
669     if (nxt_fast_path(phe->fd == engine->u.poll.set[phe->index].fd)) {
670         return NXT_OK;
671     }
672 
673     nxt_alert(&engine->task, "fd %d in hash mismatches fd %d in poll set",
674               phe->fd, engine->u.poll.set[phe->index].fd);
675 
676     return NXT_DECLINED;
677 }
678 
679 
680 static void
nxt_poll_fd_hash_destroy(nxt_event_engine_t * engine,nxt_lvlhsh_t * lh)681 nxt_poll_fd_hash_destroy(nxt_event_engine_t *engine, nxt_lvlhsh_t *lh)
682 {
683     nxt_poll_hash_entry_t  *phe;
684 
685     for ( ;; ) {
686         phe = nxt_lvlhsh_retrieve(lh, &nxt_poll_fd_hash_proto, NULL);
687 
688         if (phe == NULL) {
689             return;
690         }
691 
692         nxt_free(phe);
693     }
694 }
695