1
2 /*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7 #include <nxt_main.h>
8
9
10 /*
11 * "/dev/poll" has been introduced in Solaris 7 (11/99), HP-UX 11.22 (named
12 * "eventport pseudo driver" internally, not to be confused with Solaris 10
13 * event ports), IRIX 6.5.15, and Tru64 UNIX 5.1A.
14 *
15 * Although "/dev/poll" descriptor is a file descriptor, nevertheless
16 * it cannot be added to another poll set, Solaris poll(7d):
17 *
18 * The /dev/poll driver does not yet support polling. Polling on a
19 * /dev/poll file descriptor will result in POLLERR being returned
20 * in the revents field of pollfd structure.
21 */
22
23
24 #define NXT_DEVPOLL_ADD 0
25 #define NXT_DEVPOLL_UPDATE 1
26 #define NXT_DEVPOLL_CHANGE 2
27 #define NXT_DEVPOLL_DELETE 3
28
29
30 static nxt_int_t nxt_devpoll_create(nxt_event_engine_t *engine,
31 nxt_uint_t mchanges, nxt_uint_t mevents);
32 static void nxt_devpoll_free(nxt_event_engine_t *engine);
33 static void nxt_devpoll_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
34 static void nxt_devpoll_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev);
35 static nxt_bool_t nxt_devpoll_close(nxt_event_engine_t *engine,
36 nxt_fd_event_t *ev);
37 static void nxt_devpoll_enable_read(nxt_event_engine_t *engine,
38 nxt_fd_event_t *ev);
39 static void nxt_devpoll_enable_write(nxt_event_engine_t *engine,
40 nxt_fd_event_t *ev);
41 static void nxt_devpoll_disable_read(nxt_event_engine_t *engine,
42 nxt_fd_event_t *ev);
43 static void nxt_devpoll_disable_write(nxt_event_engine_t *engine,
44 nxt_fd_event_t *ev);
45 static void nxt_devpoll_block_read(nxt_event_engine_t *engine,
46 nxt_fd_event_t *ev);
47 static void nxt_devpoll_block_write(nxt_event_engine_t *engine,
48 nxt_fd_event_t *ev);
49 static void nxt_devpoll_oneshot_read(nxt_event_engine_t *engine,
50 nxt_fd_event_t *ev);
51 static void nxt_devpoll_oneshot_write(nxt_event_engine_t *engine,
52 nxt_fd_event_t *ev);
53 static void nxt_devpoll_change(nxt_event_engine_t *engine, nxt_fd_event_t *ev,
54 nxt_uint_t op, nxt_uint_t events);
55 static nxt_int_t nxt_devpoll_commit_changes(nxt_event_engine_t *engine);
56 static void nxt_devpoll_change_error(nxt_event_engine_t *engine,
57 nxt_fd_event_t *ev);
58 static void nxt_devpoll_remove(nxt_event_engine_t *engine, nxt_fd_t fd);
59 static nxt_int_t nxt_devpoll_write(nxt_event_engine_t *engine,
60 struct pollfd *pfd, size_t n);
61 static void nxt_devpoll_poll(nxt_event_engine_t *engine,
62 nxt_msec_t timeout);
63
64
65 const nxt_event_interface_t nxt_devpoll_engine = {
66 "devpoll",
67 nxt_devpoll_create,
68 nxt_devpoll_free,
69 nxt_devpoll_enable,
70 nxt_devpoll_disable,
71 nxt_devpoll_disable,
72 nxt_devpoll_close,
73 nxt_devpoll_enable_read,
74 nxt_devpoll_enable_write,
75 nxt_devpoll_disable_read,
76 nxt_devpoll_disable_write,
77 nxt_devpoll_block_read,
78 nxt_devpoll_block_write,
79 nxt_devpoll_oneshot_read,
80 nxt_devpoll_oneshot_write,
81 nxt_devpoll_enable_read,
82 NULL,
83 NULL,
84 NULL,
85 NULL,
86 nxt_devpoll_poll,
87
88 &nxt_unix_conn_io,
89
90 NXT_NO_FILE_EVENTS,
91 NXT_NO_SIGNAL_EVENTS,
92 };
93
94
95 static nxt_int_t
nxt_devpoll_create(nxt_event_engine_t * engine,nxt_uint_t mchanges,nxt_uint_t mevents)96 nxt_devpoll_create(nxt_event_engine_t *engine, nxt_uint_t mchanges,
97 nxt_uint_t mevents)
98 {
99 void *changes;
100
101 engine->u.devpoll.fd = -1;
102 engine->u.devpoll.mchanges = mchanges;
103 engine->u.devpoll.mevents = mevents;
104
105 changes = nxt_malloc(sizeof(nxt_devpoll_change_t) * mchanges);
106 if (changes == NULL) {
107 goto fail;
108 }
109
110 engine->u.devpoll.changes = changes;
111
112 /*
113 * NXT_DEVPOLL_CHANGE requires two struct pollfd's:
114 * for POLLREMOVE and subsequent POLLIN or POLLOUT.
115 */
116 changes = nxt_malloc(2 * sizeof(struct pollfd) * mchanges);
117 if (changes == NULL) {
118 goto fail;
119 }
120
121 engine->u.devpoll.write_changes = changes;
122
123 engine->u.devpoll.events = nxt_malloc(sizeof(struct pollfd) * mevents);
124 if (engine->u.devpoll.events == NULL) {
125 goto fail;
126 }
127
128 engine->u.devpoll.fd = open("/dev/poll", O_RDWR);
129
130 if (engine->u.devpoll.fd == -1) {
131 nxt_alert(&engine->task, "open(\"/dev/poll\") failed %E", nxt_errno);
132 goto fail;
133 }
134
135 nxt_debug(&engine->task, "open(\"/dev/poll\"): %d", engine->u.devpoll.fd);
136
137 return NXT_OK;
138
139 fail:
140
141 nxt_devpoll_free(engine);
142
143 return NXT_ERROR;
144 }
145
146
147 static void
nxt_devpoll_free(nxt_event_engine_t * engine)148 nxt_devpoll_free(nxt_event_engine_t *engine)
149 {
150 nxt_fd_t fd;
151
152 fd = engine->u.devpoll.fd;
153
154 nxt_debug(&engine->task, "devpoll %d free", fd);
155
156 if (fd != -1 && close(fd) != 0) {
157 nxt_alert(&engine->task, "devpoll close(%d) failed %E", fd, nxt_errno);
158 }
159
160 nxt_free(engine->u.devpoll.events);
161 nxt_free(engine->u.devpoll.write_changes);
162 nxt_free(engine->u.devpoll.changes);
163 nxt_fd_event_hash_destroy(&engine->u.devpoll.fd_hash);
164
165 nxt_memzero(&engine->u.devpoll, sizeof(nxt_devpoll_engine_t));
166 }
167
168
169 static void
nxt_devpoll_enable(nxt_event_engine_t * engine,nxt_fd_event_t * ev)170 nxt_devpoll_enable(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
171 {
172 ev->read = NXT_EVENT_ACTIVE;
173 ev->write = NXT_EVENT_ACTIVE;
174
175 nxt_devpoll_change(engine, ev, NXT_DEVPOLL_ADD, POLLIN | POLLOUT);
176 }
177
178
179 static void
nxt_devpoll_disable(nxt_event_engine_t * engine,nxt_fd_event_t * ev)180 nxt_devpoll_disable(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
181 {
182 if (ev->read != NXT_EVENT_INACTIVE || ev->write != NXT_EVENT_INACTIVE) {
183
184 ev->read = NXT_EVENT_INACTIVE;
185 ev->write = NXT_EVENT_INACTIVE;
186
187 nxt_devpoll_change(engine, ev, NXT_DEVPOLL_DELETE, POLLREMOVE);
188 }
189 }
190
191
192 /*
193 * Solaris does not automatically remove a closed file descriptor from
194 * a "/dev/poll" set: ioctl(DP_ISPOLLED) for the descriptor returns 1,
195 * significative of active descriptor. POLLREMOVE can remove already
196 * closed file descriptor, so the removal can be batched, Solaris poll(7d):
197 *
198 * When using the "/dev/poll" driver, you should remove a closed file
199 * descriptor from a monitored poll set. Failure to do so may result
200 * in a POLLNVAL revents being returned for the closed file descriptor.
201 * When a file descriptor is closed but not removed from the monitored
202 * set, and is reused in subsequent open of a different device, you
203 * will be polling the device associated with the reused file descriptor.
204 * In a multithreaded application, careful coordination among threads
205 * doing close and DP_POLL ioctl is recommended for consistent results.
206 *
207 * Besides Solaris and HP-UX allow to add invalid descriptors to an
208 * "/dev/poll" set, although the descriptors are not marked as polled,
209 * that is, ioctl(DP_ISPOLLED) returns 0.
210 *
211 * HP-UX poll(7):
212 *
213 * When a polled file descriptor is closed, it is automatically
214 * deregistered.
215 */
216
217 static nxt_bool_t
nxt_devpoll_close(nxt_event_engine_t * engine,nxt_fd_event_t * ev)218 nxt_devpoll_close(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
219 {
220 nxt_devpoll_disable(engine, ev);
221
222 return ev->changing;
223 }
224
225
226 /*
227 * Solaris poll(7d):
228 *
229 * The fd field specifies the file descriptor being polled. The events
230 * field indicates the interested poll events on the file descriptor.
231 * If a pollfd array contains multiple pollfd entries with the same fd field,
232 * the "events" field in each pollfd entry is OR'ed. A special POLLREMOVE
233 * event in the events field of the pollfd structure removes the fd from
234 * the monitored set. The revents field is not used.
235 */
236
237 static void
nxt_devpoll_enable_read(nxt_event_engine_t * engine,nxt_fd_event_t * ev)238 nxt_devpoll_enable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
239 {
240 nxt_uint_t op, events;
241
242 if (ev->read != NXT_EVENT_BLOCKED) {
243
244 events = POLLIN;
245
246 if (ev->write == NXT_EVENT_INACTIVE) {
247 op = NXT_DEVPOLL_ADD;
248
249 } else if (ev->write == NXT_EVENT_BLOCKED) {
250 ev->write = NXT_EVENT_INACTIVE;
251 op = NXT_DEVPOLL_CHANGE;
252
253 } else {
254 op = NXT_DEVPOLL_UPDATE;
255 events = POLLIN | POLLOUT;
256 }
257
258 nxt_devpoll_change(engine, ev, op, events);
259 }
260
261 ev->read = NXT_EVENT_ACTIVE;
262 }
263
264
265 static void
nxt_devpoll_enable_write(nxt_event_engine_t * engine,nxt_fd_event_t * ev)266 nxt_devpoll_enable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
267 {
268 nxt_uint_t op, events;
269
270 if (ev->write != NXT_EVENT_BLOCKED) {
271
272 events = POLLOUT;
273
274 if (ev->read == NXT_EVENT_INACTIVE) {
275 op = NXT_DEVPOLL_ADD;
276
277 } else if (ev->read == NXT_EVENT_BLOCKED) {
278 ev->read = NXT_EVENT_INACTIVE;
279 op = NXT_DEVPOLL_CHANGE;
280
281 } else {
282 op = NXT_DEVPOLL_UPDATE;
283 events = POLLIN | POLLOUT;
284 }
285
286 nxt_devpoll_change(engine, ev, op, events);
287 }
288
289 ev->write = NXT_EVENT_ACTIVE;
290 }
291
292
293 static void
nxt_devpoll_disable_read(nxt_event_engine_t * engine,nxt_fd_event_t * ev)294 nxt_devpoll_disable_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
295 {
296 nxt_uint_t op, events;
297
298 ev->read = NXT_EVENT_INACTIVE;
299
300 if (ev->write <= NXT_EVENT_BLOCKED) {
301 ev->write = NXT_EVENT_INACTIVE;
302 op = NXT_DEVPOLL_DELETE;
303 events = POLLREMOVE;
304
305 } else {
306 op = NXT_DEVPOLL_CHANGE;
307 events = POLLOUT;
308 }
309
310 nxt_devpoll_change(engine, ev, op, events);
311 }
312
313
314 static void
nxt_devpoll_disable_write(nxt_event_engine_t * engine,nxt_fd_event_t * ev)315 nxt_devpoll_disable_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
316 {
317 nxt_uint_t op, events;
318
319 ev->write = NXT_EVENT_INACTIVE;
320
321 if (ev->read <= NXT_EVENT_BLOCKED) {
322 ev->read = NXT_EVENT_INACTIVE;
323 op = NXT_DEVPOLL_DELETE;
324 events = POLLREMOVE;
325
326 } else {
327 op = NXT_DEVPOLL_CHANGE;
328 events = POLLIN;
329 }
330
331 nxt_devpoll_change(engine, ev, op, events);
332 }
333
334
335 static void
nxt_devpoll_block_read(nxt_event_engine_t * engine,nxt_fd_event_t * ev)336 nxt_devpoll_block_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
337 {
338 if (ev->read != NXT_EVENT_INACTIVE) {
339 ev->read = NXT_EVENT_BLOCKED;
340 }
341 }
342
343
344 static void
nxt_devpoll_block_write(nxt_event_engine_t * engine,nxt_fd_event_t * ev)345 nxt_devpoll_block_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
346 {
347 if (ev->write != NXT_EVENT_INACTIVE) {
348 ev->write = NXT_EVENT_BLOCKED;
349 }
350 }
351
352
353 static void
nxt_devpoll_oneshot_read(nxt_event_engine_t * engine,nxt_fd_event_t * ev)354 nxt_devpoll_oneshot_read(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
355 {
356 nxt_devpoll_enable_read(engine, ev);
357
358 ev->read = NXT_EVENT_ONESHOT;
359 }
360
361
362 static void
nxt_devpoll_oneshot_write(nxt_event_engine_t * engine,nxt_fd_event_t * ev)363 nxt_devpoll_oneshot_write(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
364 {
365 nxt_devpoll_enable_write(engine, ev);
366
367 ev->write = NXT_EVENT_ONESHOT;
368 }
369
370
371 static void
nxt_devpoll_change(nxt_event_engine_t * engine,nxt_fd_event_t * ev,nxt_uint_t op,nxt_uint_t events)372 nxt_devpoll_change(nxt_event_engine_t *engine, nxt_fd_event_t *ev,
373 nxt_uint_t op, nxt_uint_t events)
374 {
375 nxt_devpoll_change_t *change;
376
377 nxt_debug(ev->task, "devpoll %d change fd:%d op:%ui ev:%04Xi",
378 engine->u.devpoll.fd, ev->fd, op, events);
379
380 if (engine->u.devpoll.nchanges >= engine->u.devpoll.mchanges) {
381 (void) nxt_devpoll_commit_changes(engine);
382 }
383
384 ev->changing = 1;
385
386 change = &engine->u.devpoll.changes[engine->u.devpoll.nchanges++];
387 change->op = op;
388 change->events = events;
389 change->event = ev;
390 }
391
392
393 static nxt_int_t
nxt_devpoll_commit_changes(nxt_event_engine_t * engine)394 nxt_devpoll_commit_changes(nxt_event_engine_t *engine)
395 {
396 size_t n;
397 nxt_int_t ret, retval;
398 struct pollfd *pfd, *write_changes;
399 nxt_fd_event_t *ev;
400 nxt_devpoll_change_t *change, *end;
401
402 nxt_debug(&engine->task, "devpoll %d changes:%ui",
403 engine->u.devpoll.fd, engine->u.devpoll.nchanges);
404
405 retval = NXT_OK;
406 n = 0;
407 write_changes = engine->u.devpoll.write_changes;
408 change = engine->u.devpoll.changes;
409 end = change + engine->u.devpoll.nchanges;
410
411 do {
412 ev = change->event;
413
414 nxt_debug(&engine->task, "devpoll fd:%d op:%d ev:%04Xd",
415 ev->fd, change->op, change->events);
416
417 if (change->op == NXT_DEVPOLL_CHANGE) {
418 pfd = &write_changes[n++];
419 pfd->fd = ev->fd;
420 pfd->events = POLLREMOVE;
421 pfd->revents = 0;
422 }
423
424 pfd = &write_changes[n++];
425 pfd->fd = ev->fd;
426 pfd->events = change->events;
427 pfd->revents = 0;
428
429 ev->changing = 0;
430
431 change++;
432
433 } while (change < end);
434
435 change = engine->u.devpoll.changes;
436 end = change + engine->u.devpoll.nchanges;
437
438 ret = nxt_devpoll_write(engine, write_changes, n);
439
440 if (nxt_slow_path(ret != NXT_OK)) {
441
442 do {
443 nxt_devpoll_change_error(engine, change->event);
444 change++;
445 } while (change < end);
446
447 engine->u.devpoll.nchanges = 0;
448
449 return NXT_ERROR;
450 }
451
452 do {
453 ev = change->event;
454
455 if (change->op == NXT_DEVPOLL_ADD) {
456 ret = nxt_fd_event_hash_add(&engine->u.devpoll.fd_hash, ev->fd, ev);
457
458 if (nxt_slow_path(ret != NXT_OK)) {
459 nxt_devpoll_change_error(engine, ev);
460 retval = NXT_ERROR;
461 }
462
463 } else if (change->op == NXT_DEVPOLL_DELETE) {
464 nxt_fd_event_hash_delete(&engine->task, &engine->u.devpoll.fd_hash,
465 ev->fd, 0);
466 }
467
468 /* Nothing tp do for NXT_DEVPOLL_UPDATE and NXT_DEVPOLL_CHANGE. */
469
470 change++;
471
472 } while (change < end);
473
474 engine->u.devpoll.nchanges = 0;
475
476 return retval;
477 }
478
479
480 static void
nxt_devpoll_change_error(nxt_event_engine_t * engine,nxt_fd_event_t * ev)481 nxt_devpoll_change_error(nxt_event_engine_t *engine, nxt_fd_event_t *ev)
482 {
483 ev->read = NXT_EVENT_INACTIVE;
484 ev->write = NXT_EVENT_INACTIVE;
485
486 nxt_work_queue_add(&engine->fast_work_queue, ev->error_handler,
487 ev->task, ev, ev->data);
488
489 nxt_fd_event_hash_delete(ev->task, &engine->u.devpoll.fd_hash, ev->fd, 1);
490
491 nxt_devpoll_remove(engine, ev->fd);
492 }
493
494
495 static void
nxt_devpoll_remove(nxt_event_engine_t * engine,nxt_fd_t fd)496 nxt_devpoll_remove(nxt_event_engine_t *engine, nxt_fd_t fd)
497 {
498 int n;
499 struct pollfd pfd;
500
501 pfd.fd = fd;
502 pfd.events = 0;
503 pfd.revents = 0;
504
505 n = ioctl(engine->u.devpoll.fd, DP_ISPOLLED, &pfd);
506
507 nxt_debug(&engine->task, "ioctl(%d, DP_ISPOLLED, %d): %d",
508 engine->u.devpoll.fd, fd, n);
509
510 if (n == 0) {
511 /* The file descriptor is not in the set. */
512 return;
513 }
514
515 if (n == -1) {
516 nxt_alert(&engine->task, "ioctl(%d, DP_ISPOLLED, %d) failed %E",
517 engine->u.devpoll.fd, fd, nxt_errno);
518 /* Fall through. */
519 }
520
521 /* n == 1: the file descriptor is in the set. */
522
523 nxt_debug(&engine->task, "devpoll %d remove fd:%d",
524 engine->u.devpoll.fd, fd);
525
526 pfd.fd = fd;
527 pfd.events = POLLREMOVE;
528 pfd.revents = 0;
529
530 nxt_devpoll_write(engine, &pfd, 1);
531 }
532
533
534 static nxt_int_t
nxt_devpoll_write(nxt_event_engine_t * engine,struct pollfd * pfd,size_t n)535 nxt_devpoll_write(nxt_event_engine_t *engine, struct pollfd *pfd, size_t n)
536 {
537 int fd;
538
539 fd = engine->u.devpoll.fd;
540
541 nxt_debug(&engine->task, "devpoll write(%d) changes:%uz", fd, n);
542
543 n *= sizeof(struct pollfd);
544
545 if (nxt_slow_path(write(fd, pfd, n) == (ssize_t) n)) {
546 return NXT_OK;
547 }
548
549 nxt_alert(&engine->task, "devpoll write(%d) failed %E", fd, nxt_errno);
550
551 return NXT_ERROR;
552 }
553
554
555 static void
nxt_devpoll_poll(nxt_event_engine_t * engine,nxt_msec_t timeout)556 nxt_devpoll_poll(nxt_event_engine_t *engine, nxt_msec_t timeout)
557 {
558 int nevents;
559 nxt_fd_t fd;
560 nxt_int_t i;
561 nxt_err_t err;
562 nxt_uint_t events, level;
563 struct dvpoll dvp;
564 struct pollfd *pfd;
565 nxt_fd_event_t *ev;
566
567 if (engine->u.devpoll.nchanges != 0) {
568 if (nxt_devpoll_commit_changes(engine) != NXT_OK) {
569 /* Error handlers have been enqueued on failure. */
570 timeout = 0;
571 }
572 }
573
574 nxt_debug(&engine->task, "ioctl(%d, DP_POLL) timeout:%M",
575 engine->u.devpoll.fd, timeout);
576
577 dvp.dp_fds = engine->u.devpoll.events;
578 dvp.dp_nfds = engine->u.devpoll.mevents;
579 dvp.dp_timeout = timeout;
580
581 nevents = ioctl(engine->u.devpoll.fd, DP_POLL, &dvp);
582
583 err = (nevents == -1) ? nxt_errno : 0;
584
585 nxt_thread_time_update(engine->task.thread);
586
587 nxt_debug(&engine->task, "ioctl(%d, DP_POLL): %d",
588 engine->u.devpoll.fd, nevents);
589
590 if (nevents == -1) {
591 level = (err == NXT_EINTR) ? NXT_LOG_INFO : NXT_LOG_ALERT;
592
593 nxt_log(&engine->task, level, "ioctl(%d, DP_POLL) failed %E",
594 engine->u.devpoll.fd, err);
595
596 return;
597 }
598
599 for (i = 0; i < nevents; i++) {
600
601 pfd = &engine->u.devpoll.events[i];
602 fd = pfd->fd;
603 events = pfd->revents;
604
605 ev = nxt_fd_event_hash_get(&engine->task, &engine->u.devpoll.fd_hash,
606 fd);
607
608 if (nxt_slow_path(ev == NULL)) {
609 nxt_alert(&engine->task,
610 "ioctl(%d, DP_POLL) returned invalid "
611 "fd:%d ev:%04Xd rev:%04uXi",
612 engine->u.devpoll.fd, fd, pfd->events, events);
613
614 nxt_devpoll_remove(engine, fd);
615 continue;
616 }
617
618 nxt_debug(ev->task, "devpoll: fd:%d ev:%04uXi rd:%d wr:%d",
619 fd, events, ev->read, ev->write);
620
621 if (nxt_slow_path(events & (POLLERR | POLLHUP | POLLNVAL)) != 0) {
622 nxt_alert(ev->task,
623 "ioctl(%d, DP_POLL) error fd:%d ev:%04Xd rev:%04uXi",
624 engine->u.devpoll.fd, fd, pfd->events, events);
625
626 nxt_work_queue_add(&engine->fast_work_queue, ev->error_handler,
627 ev->task, ev, ev->data);
628 continue;
629 }
630
631 if (events & POLLIN) {
632 ev->read_ready = 1;
633
634 if (ev->read != NXT_EVENT_BLOCKED) {
635 nxt_work_queue_add(ev->read_work_queue, ev->read_handler,
636 ev->task, ev, ev->data);
637 }
638
639 if (ev->read == NXT_EVENT_BLOCKED
640 || ev->read == NXT_EVENT_ONESHOT)
641 {
642 nxt_devpoll_disable_read(engine, ev);
643 }
644 }
645
646 if (events & POLLOUT) {
647 ev->write_ready = 1;
648
649 if (ev->write != NXT_EVENT_BLOCKED) {
650 nxt_work_queue_add(ev->write_work_queue, ev->write_handler,
651 ev->task, ev, ev->data);
652 }
653
654 if (ev->write == NXT_EVENT_BLOCKED
655 || ev->write == NXT_EVENT_ONESHOT)
656 {
657 nxt_devpoll_disable_write(engine, ev);
658 }
659 }
660 }
661 }
662