/** @file

  Async Disk I/O operations.

  @section license License

  Licensed to the Apache Software Foundation (ASF) under one
  or more contributor license agreements.  See the NOTICE file
  distributed with this work for additional information
  regarding copyright ownership.  The ASF licenses this file
  to you under the Apache License, Version 2.0 (the
  "License"); you may not use this file except in compliance
  with the License.  You may obtain a copy of the License at

      http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
 */

/*
 * Async Disk IO operations.
 */

#include <tscore/TSSystemState.h>

#include "P_AIO.h"

#if AIO_MODE == AIO_MODE_NATIVE
#define AIO_PERIOD -HRTIME_MSECONDS(10)
#else

#define MAX_DISKS_POSSIBLE 100

// globals

int ts_config_with_inkdiskio = 0;
/* structure to hold information about each file descriptor */
AIO_Reqs *aio_reqs[MAX_DISKS_POSSIBLE];
/* number of entries in use in the aio_reqs array; slot 0 is reserved for API requests */
int num_filedes = 1;

// This mutex must be held while inserting a new entry into the aio_reqs array;
// it is not needed for searching the array.
static ink_mutex insert_mutex;

int thread_is_created = 0;
#endif // AIO_MODE == AIO_MODE_NATIVE
RecInt cache_config_threads_per_disk = 12;
RecInt api_config_threads_per_disk   = 12;

RecRawStatBlock *aio_rsb      = nullptr;
Continuation *aio_err_callbck = nullptr;
// AIO Stats
uint64_t aio_num_read      = 0;
uint64_t aio_bytes_read    = 0;
uint64_t aio_num_write     = 0;
uint64_t aio_bytes_written = 0;

/*
 * Stats
 */

static int
aio_stats_cb(const char * /* name ATS_UNUSED */, RecDataT data_type, RecData *data, RecRawStatBlock *rsb, int id)
{
  (void)data_type;
  (void)rsb;
  int64_t new_val = 0;
  int64_t diff    = 0;
  int64_t count, sum;
  ink_hrtime now = Thread::get_hrtime();
  // The RecGetGlobalXXX stat functions are cheaper than the RecGetXXX
  // functions for plain Sets and Gets; the Global variants are only
  // expensive for increments and decrements. Since the AIO stats are
  // only ever Set and Get here, the Global variants are the cheaper choice.
  RecGetGlobalRawStatSum(aio_rsb, id, &sum);
  RecGetGlobalRawStatCount(aio_rsb, id, &count);

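  // The raw-stat fields are repurposed here: "count" holds the hrtime of the
  // previous callback and "sum" holds the counter value sampled at that time,
  // so (new_val - sum) / time_diff below yields a per-second rate.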
  int64_t time_diff = ink_hrtime_to_msec(now - count);
  if (time_diff == 0) {
    data->rec_float = 0.0;
    return 0;
  }
  switch (id) {
  case AIO_STAT_READ_PER_SEC:
    new_val = aio_num_read;
    break;
  case AIO_STAT_WRITE_PER_SEC:
    new_val = aio_num_write;
    break;
  case AIO_STAT_KB_READ_PER_SEC:
    new_val = aio_bytes_read >> 10;
    break;
  case AIO_STAT_KB_WRITE_PER_SEC:
    new_val = aio_bytes_written >> 10;
    break;
  default:
    ink_assert(0);
  }
  diff = new_val - sum;
  RecSetGlobalRawStatSum(aio_rsb, id, new_val);
  RecSetGlobalRawStatCount(aio_rsb, id, now);
  data->rec_float = static_cast<float>(diff) * 1000.00 / static_cast<float>(time_diff);
  return 0;
}

#ifdef AIO_STATS
/* total number of requests received - for debugging */
static int num_requests = 0;
/* performance results */
static AIOTestData *data;

int
AIOTestData::ink_aio_stats(int event, void *d)
{
  ink_hrtime now   = Thread::get_hrtime();
  double time_msec = (double)(now - start) / (double)HRTIME_MSECOND;
  int i            = (aio_reqs[0] == nullptr) ? 1 : 0;
  for (; i < num_filedes; ++i) {
    printf("%0.2f\t%i\t%i\t%i\n", time_msec, aio_reqs[i]->filedes, aio_reqs[i]->pending, aio_reqs[i]->queued);
  }
  printf("Num Requests: %i Num Queued: %i num Moved: %i\n\n", data->num_req, data->num_queue, data->num_temp);
  eventProcessor.schedule_in(this, HRTIME_MSECONDS(50), ET_CALL);
  return EVENT_DONE;
}

#endif // AIO_STATS

/*
 * Common
 */
AIOCallback *
new_AIOCallback()
{
  return new AIOCallbackInternal;
}

void
ink_aio_set_callback(Continuation *callback)
{
  aio_err_callbck = callback;
}

void
ink_aio_init(ts::ModuleVersion v)
{
  ink_release_assert(v.check(AIO_MODULE_INTERNAL_VERSION));

  aio_rsb = RecAllocateRawStatBlock(static_cast<int>(AIO_STAT_COUNT));
  RecRegisterRawStat(aio_rsb, RECT_PROCESS, "proxy.process.cache.read_per_sec", RECD_FLOAT, RECP_PERSISTENT,
                     (int)AIO_STAT_READ_PER_SEC, aio_stats_cb);
  RecRegisterRawStat(aio_rsb, RECT_PROCESS, "proxy.process.cache.write_per_sec", RECD_FLOAT, RECP_PERSISTENT,
                     (int)AIO_STAT_WRITE_PER_SEC, aio_stats_cb);
  RecRegisterRawStat(aio_rsb, RECT_PROCESS, "proxy.process.cache.KB_read_per_sec", RECD_FLOAT, RECP_PERSISTENT,
                     (int)AIO_STAT_KB_READ_PER_SEC, aio_stats_cb);
  RecRegisterRawStat(aio_rsb, RECT_PROCESS, "proxy.process.cache.KB_write_per_sec", RECD_FLOAT, RECP_PERSISTENT,
                     (int)AIO_STAT_KB_WRITE_PER_SEC, aio_stats_cb);
#if AIO_MODE != AIO_MODE_NATIVE
  memset(&aio_reqs, 0, MAX_DISKS_POSSIBLE * sizeof(AIO_Reqs *));
  ink_mutex_init(&insert_mutex);
#endif
  REC_ReadConfigInteger(cache_config_threads_per_disk, "proxy.config.cache.threads_per_disk");
#if TS_USE_LINUX_NATIVE_AIO
  Warning("Running with Linux AIO, there are known issues with this feature");
#endif
}

int
ink_aio_start()
{
#ifdef AIO_STATS
  data = new AIOTestData();
  eventProcessor.schedule_in(data, HRTIME_MSECONDS(100), ET_CALL);
#endif
  return 0;
}

#if AIO_MODE != AIO_MODE_NATIVE

static void *aio_thread_main(void *arg);

struct AIOThreadInfo : public Continuation {
  AIO_Reqs *req;
  int sleep_wait;

  int
  start(int event, Event *e)
  {
    (void)event;
    (void)e;
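    // Interleave this AIO thread's memory allocations across all NUMA nodes so
    // the buffers it touches are not concentrated on a single node.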
#if TS_USE_HWLOC
#if HWLOC_API_VERSION >= 0x20000
    hwloc_set_membind(ink_get_topology(), hwloc_topology_get_topology_nodeset(ink_get_topology()), HWLOC_MEMBIND_INTERLEAVE,
                      HWLOC_MEMBIND_THREAD | HWLOC_MEMBIND_BYNODESET);
#else
    hwloc_set_membind_nodeset(ink_get_topology(), hwloc_topology_get_topology_nodeset(ink_get_topology()), HWLOC_MEMBIND_INTERLEAVE,
                              HWLOC_MEMBIND_THREAD);
#endif
#endif
    aio_thread_main(this);
    delete this;
    return EVENT_DONE;
  }

  AIOThreadInfo(AIO_Reqs *thr_req, int sleep) : Continuation(new_ProxyMutex()), req(thr_req), sleep_wait(sleep)
  {
    SET_HANDLER(&AIOThreadInfo::start);
  }
};

/*
  A dedicated number of threads (cache_config_threads_per_disk, or
  api_config_threads_per_disk for API requests) wait on the condition variable
  associated with each file descriptor. Cache threads try to put a request
  directly into the appropriate queue; if they fail to acquire the queue lock,
  they put the request on the atomic list instead.
 */

/* insert an entry for file descriptor fildes into aio_reqs */
static AIO_Reqs *
aio_init_fildes(int fildes, int fromAPI = 0)
{
  char thr_name[MAX_THREAD_NAME_LENGTH];
  int i;
  AIO_Reqs *request = new AIO_Reqs;

  INK_WRITE_MEMORY_BARRIER;

  ink_cond_init(&request->aio_cond);
  ink_mutex_init(&request->aio_mutex);

  RecInt thread_num;

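  /* Slot 0 of aio_reqs is reserved for API-injected requests (filedes == -1);
     queues for cache disks are appended starting at index 1. */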
  if (fromAPI) {
    request->index    = 0;
    request->filedes  = -1;
    aio_reqs[0]       = request;
    thread_is_created = 1;
    thread_num        = api_config_threads_per_disk;
  } else {
    request->index        = num_filedes;
    request->filedes      = fildes;
    aio_reqs[num_filedes] = request;
    thread_num            = cache_config_threads_per_disk;
  }

  /* spawn the AIO threads for this queue */
  AIOThreadInfo *thr_info;
  size_t stacksize;

  REC_ReadConfigInteger(stacksize, "proxy.config.thread.default.stacksize");
  for (i = 0; i < thread_num; i++) {
    if (i == (thread_num - 1)) {
      thr_info = new AIOThreadInfo(request, 1);
    } else {
      thr_info = new AIOThreadInfo(request, 0);
    }
    snprintf(thr_name, MAX_THREAD_NAME_LENGTH, "[ET_AIO %d:%d]", i, fildes);
    ink_assert(eventProcessor.spawn_thread(thr_info, thr_name, stacksize));
  }

  /* num_filedes must only be incremented after everything above is initialized;
     this prevents another thread from seeing uninitialized fields */
  if (!fromAPI) {
    num_filedes++;
  }
  return request;
}


/* insert a request into the aio_todo queue */
static void
aio_insert(AIOCallback *op, AIO_Reqs *req)
{
#ifdef AIO_STATS
  num_requests++;
  req->queued++;
#endif
  req->aio_todo.enqueue(op);
}

/* move requests from the atomic list to the aio_todo queue */
static void
aio_move(AIO_Reqs *req)
{
  if (req->aio_temp_list.empty()) {
    return;
  }

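  // popall() atomically takes the whole atomic list, but it comes back in
  // LIFO order (newest first); the Queue below is used to reverse it so the
  // requests reach aio_todo in their original arrival order.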
  AIOCallbackInternal *cbi;
  SList(AIOCallbackInternal, alink) aq(req->aio_temp_list.popall());

  // flip the list
  Queue<AIOCallback> cbq;
  while ((cbi = aq.pop())) {
    cbq.push(cbi);
  }

  AIOCallback *cb;
  while ((cb = cbq.pop())) {
    aio_insert(cb, req);
  }
}

/* queue the new request */
static void
aio_queue_req(AIOCallbackInternal *op, int fromAPI = 0)
{
  int thread_ndx = 1;
  AIO_Reqs *req  = op->aio_req;
  op->link.next  = nullptr;
  op->link.prev  = nullptr;
#ifdef AIO_STATS
  ink_atomic_increment((int *)&data->num_req, 1);
#endif
  if (!fromAPI && (!req || req->filedes != op->aiocb.aio_fildes)) {
    /* search for the matching file descriptor */
    for (; thread_ndx < num_filedes; thread_ndx++) {
      if (aio_reqs[thread_ndx]->filedes == op->aiocb.aio_fildes) {
        /* found the matching file descriptor */
        req = aio_reqs[thread_ndx];
        break;
      }
    }
    if (!req) {
      ink_mutex_acquire(&insert_mutex);
      if (thread_ndx == num_filedes) {
        /* insert a new entry */
        req = aio_init_fildes(op->aiocb.aio_fildes);
      } else {
        /* a new entry was inserted between the time we scanned aio_reqs and
           acquired the mutex; re-scan the array in case that entry is for the
           current file descriptor */
        for (thread_ndx = 1; thread_ndx < num_filedes; thread_ndx++) {
          if (aio_reqs[thread_ndx]->filedes == op->aiocb.aio_fildes) {
            req = aio_reqs[thread_ndx];
            break;
          }
        }
        if (!req) {
          req = aio_init_fildes(op->aiocb.aio_fildes);
        }
      }
      ink_mutex_release(&insert_mutex);
    }
    op->aio_req = req;
  }
  if (fromAPI && (!req || req->filedes != -1)) {
    ink_mutex_acquire(&insert_mutex);
    if (aio_reqs[0] == nullptr) {
      req = aio_init_fildes(-1, 1);
    } else {
      req = aio_reqs[0];
    }
    ink_mutex_release(&insert_mutex);
    op->aio_req = req;
  }
  ink_atomic_increment(&req->requests_queued, 1);
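  // Avoid blocking the calling (cache) thread: if the per-queue mutex is
  // already held, park the request on the lock-free atomic list; the next
  // holder of the lock will pick it up via aio_move().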
  if (!ink_mutex_try_acquire(&req->aio_mutex)) {
#ifdef AIO_STATS
    ink_atomic_increment(&data->num_temp, 1);
#endif
    req->aio_temp_list.push(op);
  } else {
    /* check if there are any pending requests on the atomic list */
#ifdef AIO_STATS
    ink_atomic_increment(&data->num_queue, 1);
#endif
    aio_move(req);
    /* now put the new request */
    aio_insert(op, req);
    ink_cond_signal(&req->aio_cond);
    ink_mutex_release(&req->aio_mutex);
  }
}

static inline int
cache_op(AIOCallbackInternal *op)
{
  bool read = (op->aiocb.aio_lio_opcode == LIO_READ);
  for (; op; op = (AIOCallbackInternal *)op->then) {
    ink_aiocb *a = &op->aiocb;
    ssize_t err, res = 0;

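    // pread()/pwrite() may transfer fewer bytes than requested, so loop until
    // the full aio_nbytes has been moved, retrying transient failures
    // (EINTR/ENOBUFS/ENOMEM) and bailing out on any other error.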
    while (a->aio_nbytes - res > 0) {
      do {
        if (read) {
          err = pread(a->aio_fildes, (static_cast<char *>(a->aio_buf)) + res, a->aio_nbytes - res, a->aio_offset + res);
        } else {
          err = pwrite(a->aio_fildes, (static_cast<char *>(a->aio_buf)) + res, a->aio_nbytes - res, a->aio_offset + res);
        }
      } while ((err < 0) && (errno == EINTR || errno == ENOBUFS || errno == ENOMEM));
      if (err <= 0) {
        Warning("cache disk operation failed %s %zd %d", (a->aio_lio_opcode == LIO_READ) ? "READ" : "WRITE", err, errno);
        op->aio_result = -errno;
        return (err);
      }
      res += err;
    }
    op->aio_result = res;
    ink_assert(op->aio_result == (int64_t)a->aio_nbytes);
  }
  return 1;
}

int
ink_aio_read(AIOCallback *op, int fromAPI)
{
  op->aiocb.aio_lio_opcode = LIO_READ;
  aio_queue_req((AIOCallbackInternal *)op, fromAPI);

  return 1;
}

int
ink_aio_write(AIOCallback *op, int fromAPI)
{
  op->aiocb.aio_lio_opcode = LIO_WRITE;
  aio_queue_req((AIOCallbackInternal *)op, fromAPI);

  return 1;
}

bool
ink_aio_thread_num_set(int thread_num)
{
  if (thread_num > 0 && !thread_is_created) {
    api_config_threads_per_disk = thread_num;
    return true;
  }

  return false;
}

void *
aio_thread_main(void *arg)
{
  AIOThreadInfo *thr_info = static_cast<AIOThreadInfo *>(arg);
  AIO_Reqs *my_aio_req    = thr_info->req;
  AIO_Reqs *current_req   = nullptr;
  AIOCallback *op         = nullptr;
  ink_mutex_acquire(&my_aio_req->aio_mutex);
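  // Worker loop: drain the atomic list into aio_todo, pop requests one at a
  // time, drop the queue lock while the blocking pread/pwrite runs in
  // cache_op(), then deliver the completion on the thread the op asked for.
  // When the queue is empty, wait on the condition variable with a timeout.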
  for (;;) {
    do {
      if (TSSystemState::is_event_system_shut_down()) {
        ink_mutex_release(&my_aio_req->aio_mutex);
        return nullptr;
      }
      current_req = my_aio_req;
      /* check if there are any pending requests on the atomic list */
      aio_move(my_aio_req);
      if (!(op = my_aio_req->aio_todo.pop())) {
        break;
      }
#ifdef AIO_STATS
      num_requests--;
      current_req->queued--;
      ink_atomic_increment((int *)&current_req->pending, 1);
#endif
      // update the stats
      if (op->aiocb.aio_lio_opcode == LIO_WRITE) {
        aio_num_write++;
        aio_bytes_written += op->aiocb.aio_nbytes;
      } else {
        aio_num_read++;
        aio_bytes_read += op->aiocb.aio_nbytes;
      }
      ink_mutex_release(&current_req->aio_mutex);
      cache_op((AIOCallbackInternal *)op);
      ink_atomic_increment(&current_req->requests_queued, -1);
#ifdef AIO_STATS
      ink_atomic_increment((int *)&current_req->pending, -1);
#endif
      op->link.prev = nullptr;
      op->link.next = nullptr;
      op->mutex     = op->action.mutex;
      if (op->thread == AIO_CALLBACK_THREAD_AIO) {
        SCOPED_MUTEX_LOCK(lock, op->mutex, thr_info->mutex->thread_holding);
        op->handleEvent(EVENT_NONE, nullptr);
      } else if (op->thread == AIO_CALLBACK_THREAD_ANY) {
        eventProcessor.schedule_imm(op);
      } else {
        op->thread->schedule_imm(op);
      }
      ink_mutex_acquire(&my_aio_req->aio_mutex);
    } while (true);
    timespec timedwait_msec = ink_hrtime_to_timespec(Thread::get_hrtime_updated() + HRTIME_MSECONDS(net_config_poll_timeout));
    ink_cond_timedwait(&my_aio_req->aio_cond, &my_aio_req->aio_mutex, &timedwait_msec);
  }
  return nullptr;
}
#else
int
DiskHandler::startAIOEvent(int /* event ATS_UNUSED */, Event *e)
{
  SET_HANDLER(&DiskHandler::mainAIOEvent);
  e->schedule_every(AIO_PERIOD);
  trigger_event = e;
  return EVENT_CONT;
}

int
DiskHandler::mainAIOEvent(int event, Event *e)
{
  AIOCallback *op = nullptr;
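  // Each tick: reap everything the kernel has completed with io_getevents()
  // onto complete_list, submit whatever has been queued on ready_list with
  // io_submit(), then fire the completion continuations.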
Lagain:
  int ret = io_getevents(ctx, 0, MAX_AIO_EVENTS, events, nullptr);
  for (int i = 0; i < ret; i++) {
    op             = (AIOCallback *)events[i].data;
    op->aio_result = events[i].res;
    ink_assert(op->action.continuation);
    complete_list.enqueue(op);
  }

  if (ret == MAX_AIO_EVENTS) {
    goto Lagain;
  }

  if (ret < 0) {
    // io_getevents() returns a negative errno value rather than setting errno
    if (ret == -EINTR) {
      goto Lagain;
    }
    if (ret == -EFAULT || ret == -ENOSYS) {
      Debug("aio", "io_getevents failed: %s (%d)", strerror(-ret), -ret);
    }
  }

  ink_aiocb *cbs[MAX_AIO_EVENTS];
  int num = 0;

  for (; num < MAX_AIO_EVENTS && ((op = ready_list.dequeue()) != nullptr); ++num) {
    cbs[num] = &op->aiocb;
    ink_assert(op->action.continuation);
  }

  if (num > 0) {
    int ret;
    do {
      ret = io_submit(ctx, num, cbs);
    } while (ret == -EAGAIN);

    if (ret != num) {
      if (ret < 0) {
        Debug("aio", "io_submit failed: %s (%d)", strerror(-ret), -ret);
      } else {
        Fatal("could not submit IOs, io_submit(%p, %d, %p) returned %d", ctx, num, cbs, ret);
      }
    }
  }


  while ((op = complete_list.dequeue()) != nullptr) {
    op->mutex = op->action.mutex;
    MUTEX_TRY_LOCK(lock, op->mutex, trigger_event->ethread);
    if (!lock.is_locked()) {
      trigger_event->ethread->schedule_imm(op);
    } else {
      op->handleEvent(EVENT_NONE, nullptr);
    }
  }
  return EVENT_CONT;
}

int
ink_aio_read(AIOCallback *op, int /* fromAPI ATS_UNUSED */)
{
  op->aiocb.aio_lio_opcode = IO_CMD_PREAD;
  op->aiocb.data           = op;
  EThread *t               = this_ethread();
#ifdef HAVE_EVENTFD
  io_set_eventfd(&op->aiocb, t->evfd);
#endif
  t->diskHandler->ready_list.enqueue(op);

  return 1;
}

int
ink_aio_write(AIOCallback *op, int /* fromAPI ATS_UNUSED */)
{
  op->aiocb.aio_lio_opcode = IO_CMD_PWRITE;
  op->aiocb.data           = op;
  EThread *t               = this_ethread();
#ifdef HAVE_EVENTFD
  io_set_eventfd(&op->aiocb, t->evfd);
#endif
  t->diskHandler->ready_list.enqueue(op);

  return 1;
}

int
ink_aio_readv(AIOCallback *op, int /* fromAPI ATS_UNUSED */)
{
  EThread *t      = this_ethread();
  DiskHandler *dh = t->diskHandler;
  AIOCallback *io = op;
  int sz          = 0;

  while (io) {
    io->aiocb.aio_lio_opcode = IO_CMD_PREAD;
    io->aiocb.data           = io;
#ifdef HAVE_EVENTFD
    io_set_eventfd(&io->aiocb, t->evfd);
#endif
    dh->ready_list.enqueue(io);
    ++sz;
    io = io->then;
  }

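  // If the request is a chain of more than one op, wrap it in an AIOVec so the
  // caller's continuation fires once, after every op in the chain completes.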
  if (sz > 1) {
    ink_assert(op->action.continuation);
    AIOVec *vec = new AIOVec(sz, op);
    while (--sz >= 0) {
      op->action = vec;
      op         = op->then;
    }
  }
  return 1;
}

int
ink_aio_writev(AIOCallback *op, int /* fromAPI ATS_UNUSED */)
{
  EThread *t      = this_ethread();
  DiskHandler *dh = t->diskHandler;
  AIOCallback *io = op;
  int sz          = 0;

  while (io) {
    io->aiocb.aio_lio_opcode = IO_CMD_PWRITE;
    io->aiocb.data           = io;
#ifdef HAVE_EVENTFD
    io_set_eventfd(&io->aiocb, t->evfd);
#endif
    dh->ready_list.enqueue(io);
    ++sz;
    io = io->then;
  }

  if (sz > 1) {
    ink_assert(op->action.continuation);
    AIOVec *vec = new AIOVec(sz, op);
    while (--sz >= 0) {
      op->action = vec;
      op         = op->then;
    }
  }
  return 1;
}
#endif // AIO_MODE != AIO_MODE_NATIVE