1 /** @file
2
3 A brief file description
4
5 @section license License
6
7 Licensed to the Apache Software Foundation (ASF) under one
8 or more contributor license agreements. See the NOTICE file
9 distributed with this work for additional information
10 regarding copyright ownership. The ASF licenses this file
11 to you under the Apache License, Version 2.0 (the
12 "License"); you may not use this file except in compliance
13 with the License. You may obtain a copy of the License at
14
15 http://www.apache.org/licenses/LICENSE-2.0
16
17 Unless required by applicable law or agreed to in writing, software
18 distributed under the License is distributed on an "AS IS" BASIS,
19 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20 See the License for the specific language governing permissions and
21 limitations under the License.
22 */
23
24 /*
25 * Async Disk IO operations.
26 */
27
28 #include <tscore/TSSystemState.h>
29
30 #include "P_AIO.h"
31
32 #if AIO_MODE == AIO_MODE_NATIVE
33 #define AIO_PERIOD -HRTIME_MSECONDS(10)
34 #else
35
36 #define MAX_DISKS_POSSIBLE 100
37
38 // globals
39
40 int ts_config_with_inkdiskio = 0;
41 /* structure to hold information about each file descriptor */
42 AIO_Reqs *aio_reqs[MAX_DISKS_POSSIBLE];
43 /* number of unique file descriptors in the aio_reqs array */
44 int num_filedes = 1;
45
46 // acquire this mutex before inserting a new entry in the aio_reqs array.
47 // Don't need to acquire this for searching the array
48 static ink_mutex insert_mutex;
49
50 int thread_is_created = 0;
51 #endif // AIO_MODE == AIO_MODE_NATIVE
52 RecInt cache_config_threads_per_disk = 12;
53 RecInt api_config_threads_per_disk = 12;
54
55 RecRawStatBlock *aio_rsb = nullptr;
56 Continuation *aio_err_callbck = nullptr;
57 // AIO Stats
58 uint64_t aio_num_read = 0;
59 uint64_t aio_bytes_read = 0;
60 uint64_t aio_num_write = 0;
61 uint64_t aio_bytes_written = 0;
62
63 /*
64 * Stats
65 */
66
67 static int
aio_stats_cb(const char *,RecDataT data_type,RecData * data,RecRawStatBlock * rsb,int id)68 aio_stats_cb(const char * /* name ATS_UNUSED */, RecDataT data_type, RecData *data, RecRawStatBlock *rsb, int id)
69 {
70 (void)data_type;
71 (void)rsb;
72 int64_t new_val = 0;
73 int64_t diff = 0;
74 int64_t count, sum;
75 ink_hrtime now = Thread::get_hrtime();
76 // The RecGetGlobalXXX stat functions are cheaper than the
77 // RecGetXXX functions. The Global ones are expensive
78 // for increments and decrements. But for AIO stats we
79 // only do Sets and Gets, so they are cheaper in our case.
80 RecGetGlobalRawStatSum(aio_rsb, id, &sum);
81 RecGetGlobalRawStatCount(aio_rsb, id, &count);
82
83 int64_t time_diff = ink_hrtime_to_msec(now - count);
84 if (time_diff == 0) {
85 data->rec_float = 0.0;
86 return 0;
87 }
88 switch (id) {
89 case AIO_STAT_READ_PER_SEC:
90 new_val = aio_num_read;
91 break;
92 case AIO_STAT_WRITE_PER_SEC:
93 new_val = aio_num_write;
94 break;
95 case AIO_STAT_KB_READ_PER_SEC:
96 new_val = aio_bytes_read >> 10;
97 break;
98 case AIO_STAT_KB_WRITE_PER_SEC:
99 new_val = aio_bytes_written >> 10;
100 break;
101 default:
102 ink_assert(0);
103 }
104 diff = new_val - sum;
105 RecSetGlobalRawStatSum(aio_rsb, id, new_val);
106 RecSetGlobalRawStatCount(aio_rsb, id, now);
107 data->rec_float = static_cast<float>(diff) * 1000.00 / static_cast<float>(time_diff);
108 return 0;
109 }
110
#ifdef AIO_STATS
/* total number of requests received - for debugging */
static int num_requests = 0;
/* performance results */
static AIOTestData *data;

/**
 * Debug handler: prints one line per populated aio_reqs entry (elapsed
 * milliseconds, descriptor, pending, queued), then the request counters,
 * and reschedules itself 50 ms later on ET_CALL.
 *
 * @return EVENT_DONE.
 */
int
AIOTestData::ink_aio_stats(int /* event */, void * /* d */)
{
  const ink_hrtime current = Thread::get_hrtime();
  const double elapsed_ms  = static_cast<double>(current - start) / static_cast<double>(HRTIME_MSECOND);

  // Skip slot 0 when it has not been created.
  int slot = 0;
  if (aio_reqs[0] == nullptr) {
    slot = 1;
  }
  while (slot < num_filedes) {
    AIO_Reqs *entry = aio_reqs[slot];
    printf("%0.2f\t%i\t%i\t%i\n", elapsed_ms, entry->filedes, entry->pending, entry->queued);
    ++slot;
  }
  printf("Num Requests: %i Num Queued: %i num Moved: %i\n\n", data->num_req, data->num_queue, data->num_temp);
  eventProcessor.schedule_in(this, HRTIME_MSECONDS(50), ET_CALL);
  return EVENT_DONE;
}

#endif // AIO_STATS
131
132 /*
133 * Common
134 */
135 AIOCallback *
new_AIOCallback()136 new_AIOCallback()
137 {
138 return new AIOCallbackInternal;
139 }
140
141 void
ink_aio_set_callback(Continuation * callback)142 ink_aio_set_callback(Continuation *callback)
143 {
144 aio_err_callbck = callback;
145 }
146
147 void
ink_aio_init(ts::ModuleVersion v)148 ink_aio_init(ts::ModuleVersion v)
149 {
150 ink_release_assert(v.check(AIO_MODULE_INTERNAL_VERSION));
151
152 aio_rsb = RecAllocateRawStatBlock(static_cast<int>(AIO_STAT_COUNT));
153 RecRegisterRawStat(aio_rsb, RECT_PROCESS, "proxy.process.cache.read_per_sec", RECD_FLOAT, RECP_PERSISTENT,
154 (int)AIO_STAT_READ_PER_SEC, aio_stats_cb);
155 RecRegisterRawStat(aio_rsb, RECT_PROCESS, "proxy.process.cache.write_per_sec", RECD_FLOAT, RECP_PERSISTENT,
156 (int)AIO_STAT_WRITE_PER_SEC, aio_stats_cb);
157 RecRegisterRawStat(aio_rsb, RECT_PROCESS, "proxy.process.cache.KB_read_per_sec", RECD_FLOAT, RECP_PERSISTENT,
158 (int)AIO_STAT_KB_READ_PER_SEC, aio_stats_cb);
159 RecRegisterRawStat(aio_rsb, RECT_PROCESS, "proxy.process.cache.KB_write_per_sec", RECD_FLOAT, RECP_PERSISTENT,
160 (int)AIO_STAT_KB_WRITE_PER_SEC, aio_stats_cb);
161 #if AIO_MODE != AIO_MODE_NATIVE
162 memset(&aio_reqs, 0, MAX_DISKS_POSSIBLE * sizeof(AIO_Reqs *));
163 ink_mutex_init(&insert_mutex);
164 #endif
165 REC_ReadConfigInteger(cache_config_threads_per_disk, "proxy.config.cache.threads_per_disk");
166 #if TS_USE_LINUX_NATIVE_AIO
167 Warning("Running with Linux AIO, there are known issues with this feature");
168 #endif
169 }
170
/**
 * Start the AIO module.
 *
 * Only does work when built with AIO_STATS: it then creates the
 * AIOTestData reporter and schedules it 100 ms out on ET_CALL.
 *
 * @return always 0.
 */
int
ink_aio_start()
{
#ifdef AIO_STATS
  data = new AIOTestData();
  eventProcessor.schedule_in(data, HRTIME_MSECONDS(100), ET_CALL);
#endif
  return 0;
}
180
181 #if AIO_MODE != AIO_MODE_NATIVE
182
183 static void *aio_thread_main(void *arg);
184
185 struct AIOThreadInfo : public Continuation {
186 AIO_Reqs *req;
187 int sleep_wait;
188
189 int
startAIOThreadInfo190 start(int event, Event *e)
191 {
192 (void)event;
193 (void)e;
194 #if TS_USE_HWLOC
195 #if HWLOC_API_VERSION >= 0x20000
196 hwloc_set_membind(ink_get_topology(), hwloc_topology_get_topology_nodeset(ink_get_topology()), HWLOC_MEMBIND_INTERLEAVE,
197 HWLOC_MEMBIND_THREAD | HWLOC_MEMBIND_BYNODESET);
198 #else
199 hwloc_set_membind_nodeset(ink_get_topology(), hwloc_topology_get_topology_nodeset(ink_get_topology()), HWLOC_MEMBIND_INTERLEAVE,
200 HWLOC_MEMBIND_THREAD);
201 #endif
202 #endif
203 aio_thread_main(this);
204 delete this;
205 return EVENT_DONE;
206 }
207
AIOThreadInfoAIOThreadInfo208 AIOThreadInfo(AIO_Reqs *thr_req, int sleep) : Continuation(new_ProxyMutex()), req(thr_req), sleep_wait(sleep)
209 {
210 SET_HANDLER(&AIOThreadInfo::start);
211 }
212 };
213
214 /*
215 A dedicated number of threads (THREADS_PER_DISK) wait on the condition
216 variable associated with the file descriptor. The cache threads try to put
217 the request in the appropriate queue. If they fail to acquire the lock, they
218 put the request in the atomic list.
219 */
220
221 /* insert an entry for file descriptor fildes into aio_reqs */
222 static AIO_Reqs *
aio_init_fildes(int fildes,int fromAPI=0)223 aio_init_fildes(int fildes, int fromAPI = 0)
224 {
225 char thr_name[MAX_THREAD_NAME_LENGTH];
226 int i;
227 AIO_Reqs *request = new AIO_Reqs;
228
229 INK_WRITE_MEMORY_BARRIER;
230
231 ink_cond_init(&request->aio_cond);
232 ink_mutex_init(&request->aio_mutex);
233
234 RecInt thread_num;
235
236 if (fromAPI) {
237 request->index = 0;
238 request->filedes = -1;
239 aio_reqs[0] = request;
240 thread_is_created = 1;
241 thread_num = api_config_threads_per_disk;
242 } else {
243 request->index = num_filedes;
244 request->filedes = fildes;
245 aio_reqs[num_filedes] = request;
246 thread_num = cache_config_threads_per_disk;
247 }
248
249 /* create the main thread */
250 AIOThreadInfo *thr_info;
251 size_t stacksize;
252
253 REC_ReadConfigInteger(stacksize, "proxy.config.thread.default.stacksize");
254 for (i = 0; i < thread_num; i++) {
255 if (i == (thread_num - 1)) {
256 thr_info = new AIOThreadInfo(request, 1);
257 } else {
258 thr_info = new AIOThreadInfo(request, 0);
259 }
260 snprintf(thr_name, MAX_THREAD_NAME_LENGTH, "[ET_AIO %d:%d]", i, fildes);
261 ink_assert(eventProcessor.spawn_thread(thr_info, thr_name, stacksize));
262 }
263
264 /* the num_filedes should be incremented after initializing everything.
265 This prevents a thread from looking at uninitialized fields */
266 if (!fromAPI) {
267 num_filedes++;
268 }
269 return request;
270 }
271
272 /* insert a request into aio_todo queue. */
273 static void
aio_insert(AIOCallback * op,AIO_Reqs * req)274 aio_insert(AIOCallback *op, AIO_Reqs *req)
275 {
276 #ifdef AIO_STATS
277 num_requests++;
278 req->queued++;
279 #endif
280 req->aio_todo.enqueue(op);
281 }
282
283 /* move the request from the atomic list to the queue */
284 static void
aio_move(AIO_Reqs * req)285 aio_move(AIO_Reqs *req)
286 {
287 if (req->aio_temp_list.empty()) {
288 return;
289 }
290
291 AIOCallbackInternal *cbi;
292 SList(AIOCallbackInternal, alink) aq(req->aio_temp_list.popall());
293
294 // flip the list
295 Queue<AIOCallback> cbq;
296 while ((cbi = aq.pop())) {
297 cbq.push(cbi);
298 }
299
300 AIOCallback *cb;
301 while ((cb = cbq.pop())) {
302 aio_insert(cb, req);
303 }
304 }
305
306 /* queue the new request */
307 static void
aio_queue_req(AIOCallbackInternal * op,int fromAPI=0)308 aio_queue_req(AIOCallbackInternal *op, int fromAPI = 0)
309 {
310 int thread_ndx = 1;
311 AIO_Reqs *req = op->aio_req;
312 op->link.next = nullptr;
313 op->link.prev = nullptr;
314 #ifdef AIO_STATS
315 ink_atomic_increment((int *)&data->num_req, 1);
316 #endif
317 if (!fromAPI && (!req || req->filedes != op->aiocb.aio_fildes)) {
318 /* search for the matching file descriptor */
319 for (; thread_ndx < num_filedes; thread_ndx++) {
320 if (aio_reqs[thread_ndx]->filedes == op->aiocb.aio_fildes) {
321 /* found the matching file descriptor */
322 req = aio_reqs[thread_ndx];
323 break;
324 }
325 }
326 if (!req) {
327 ink_mutex_acquire(&insert_mutex);
328 if (thread_ndx == num_filedes) {
329 /* insert a new entry */
330 req = aio_init_fildes(op->aiocb.aio_fildes);
331 } else {
332 /* a new entry was inserted between the time we checked the
333 aio_reqs and acquired the mutex. check the aio_reqs array to
334 make sure the entry inserted does not correspond to the current
335 file descriptor */
336 for (thread_ndx = 1; thread_ndx < num_filedes; thread_ndx++) {
337 if (aio_reqs[thread_ndx]->filedes == op->aiocb.aio_fildes) {
338 req = aio_reqs[thread_ndx];
339 break;
340 }
341 }
342 if (!req) {
343 req = aio_init_fildes(op->aiocb.aio_fildes);
344 }
345 }
346 ink_mutex_release(&insert_mutex);
347 }
348 op->aio_req = req;
349 }
350 if (fromAPI && (!req || req->filedes != -1)) {
351 ink_mutex_acquire(&insert_mutex);
352 if (aio_reqs[0] == nullptr) {
353 req = aio_init_fildes(-1, 1);
354 } else {
355 req = aio_reqs[0];
356 }
357 ink_mutex_release(&insert_mutex);
358 op->aio_req = req;
359 }
360 ink_atomic_increment(&req->requests_queued, 1);
361 if (!ink_mutex_try_acquire(&req->aio_mutex)) {
362 #ifdef AIO_STATS
363 ink_atomic_increment(&data->num_temp, 1);
364 #endif
365 req->aio_temp_list.push(op);
366 } else {
367 /* check if any pending requests on the atomic list */
368 #ifdef AIO_STATS
369 ink_atomic_increment(&data->num_queue, 1);
370 #endif
371 aio_move(req);
372 /* now put the new request */
373 aio_insert(op, req);
374 ink_cond_signal(&req->aio_cond);
375 ink_mutex_release(&req->aio_mutex);
376 }
377 }
378
379 static inline int
cache_op(AIOCallbackInternal * op)380 cache_op(AIOCallbackInternal *op)
381 {
382 bool read = (op->aiocb.aio_lio_opcode == LIO_READ);
383 for (; op; op = (AIOCallbackInternal *)op->then) {
384 ink_aiocb *a = &op->aiocb;
385 ssize_t err, res = 0;
386
387 while (a->aio_nbytes - res > 0) {
388 do {
389 if (read) {
390 err = pread(a->aio_fildes, (static_cast<char *>(a->aio_buf)) + res, a->aio_nbytes - res, a->aio_offset + res);
391 } else {
392 err = pwrite(a->aio_fildes, (static_cast<char *>(a->aio_buf)) + res, a->aio_nbytes - res, a->aio_offset + res);
393 }
394 } while ((err < 0) && (errno == EINTR || errno == ENOBUFS || errno == ENOMEM));
395 if (err <= 0) {
396 Warning("cache disk operation failed %s %zd %d\n", (a->aio_lio_opcode == LIO_READ) ? "READ" : "WRITE", err, errno);
397 op->aio_result = -errno;
398 return (err);
399 }
400 res += err;
401 }
402 op->aio_result = res;
403 ink_assert(op->aio_result == (int64_t)a->aio_nbytes);
404 }
405 return 1;
406 }
407
408 int
ink_aio_read(AIOCallback * op,int fromAPI)409 ink_aio_read(AIOCallback *op, int fromAPI)
410 {
411 op->aiocb.aio_lio_opcode = LIO_READ;
412 aio_queue_req((AIOCallbackInternal *)op, fromAPI);
413
414 return 1;
415 }
416
417 int
ink_aio_write(AIOCallback * op,int fromAPI)418 ink_aio_write(AIOCallback *op, int fromAPI)
419 {
420 op->aiocb.aio_lio_opcode = LIO_WRITE;
421 aio_queue_req((AIOCallbackInternal *)op, fromAPI);
422
423 return 1;
424 }
425
426 bool
ink_aio_thread_num_set(int thread_num)427 ink_aio_thread_num_set(int thread_num)
428 {
429 if (thread_num > 0 && !thread_is_created) {
430 api_config_threads_per_disk = thread_num;
431 return true;
432 }
433
434 return false;
435 }
436
437 void *
aio_thread_main(void * arg)438 aio_thread_main(void *arg)
439 {
440 AIOThreadInfo *thr_info = static_cast<AIOThreadInfo *>(arg);
441 AIO_Reqs *my_aio_req = thr_info->req;
442 AIO_Reqs *current_req = nullptr;
443 AIOCallback *op = nullptr;
444 ink_mutex_acquire(&my_aio_req->aio_mutex);
445 for (;;) {
446 do {
447 if (TSSystemState::is_event_system_shut_down()) {
448 ink_mutex_release(&my_aio_req->aio_mutex);
449 return nullptr;
450 }
451 current_req = my_aio_req;
452 /* check if any pending requests on the atomic list */
453 aio_move(my_aio_req);
454 if (!(op = my_aio_req->aio_todo.pop())) {
455 break;
456 }
457 #ifdef AIO_STATS
458 num_requests--;
459 current_req->queued--;
460 ink_atomic_increment((int *)¤t_req->pending, 1);
461 #endif
462 // update the stats;
463 if (op->aiocb.aio_lio_opcode == LIO_WRITE) {
464 aio_num_write++;
465 aio_bytes_written += op->aiocb.aio_nbytes;
466 } else {
467 aio_num_read++;
468 aio_bytes_read += op->aiocb.aio_nbytes;
469 }
470 ink_mutex_release(¤t_req->aio_mutex);
471 cache_op((AIOCallbackInternal *)op);
472 ink_atomic_increment(¤t_req->requests_queued, -1);
473 #ifdef AIO_STATS
474 ink_atomic_increment((int *)¤t_req->pending, -1);
475 #endif
476 op->link.prev = nullptr;
477 op->link.next = nullptr;
478 op->mutex = op->action.mutex;
479 if (op->thread == AIO_CALLBACK_THREAD_AIO) {
480 SCOPED_MUTEX_LOCK(lock, op->mutex, thr_info->mutex->thread_holding);
481 op->handleEvent(EVENT_NONE, nullptr);
482 } else if (op->thread == AIO_CALLBACK_THREAD_ANY) {
483 eventProcessor.schedule_imm(op);
484 } else {
485 op->thread->schedule_imm(op);
486 }
487 ink_mutex_acquire(&my_aio_req->aio_mutex);
488 } while (true);
489 timespec timedwait_msec = ink_hrtime_to_timespec(Thread::get_hrtime_updated() + HRTIME_MSECONDS(net_config_poll_timeout));
490 ink_cond_timedwait(&my_aio_req->aio_cond, &my_aio_req->aio_mutex, &timedwait_msec);
491 }
492 return nullptr;
493 }
494 #else
495 int
startAIOEvent(int,Event * e)496 DiskHandler::startAIOEvent(int /* event ATS_UNUSED */, Event *e)
497 {
498 SET_HANDLER(&DiskHandler::mainAIOEvent);
499 e->schedule_every(AIO_PERIOD);
500 trigger_event = e;
501 return EVENT_CONT;
502 }
503
504 int
mainAIOEvent(int event,Event * e)505 DiskHandler::mainAIOEvent(int event, Event *e)
506 {
507 AIOCallback *op = nullptr;
508 Lagain:
509 int ret = io_getevents(ctx, 0, MAX_AIO_EVENTS, events, nullptr);
510 for (int i = 0; i < ret; i++) {
511 op = (AIOCallback *)events[i].data;
512 op->aio_result = events[i].res;
513 ink_assert(op->action.continuation);
514 complete_list.enqueue(op);
515 }
516
517 if (ret == MAX_AIO_EVENTS) {
518 goto Lagain;
519 }
520
521 if (ret < 0) {
522 if (errno == EINTR)
523 goto Lagain;
524 if (errno == EFAULT || errno == ENOSYS)
525 Debug("aio", "io_getevents failed: %s (%d)", strerror(-ret), -ret);
526 }
527
528 ink_aiocb *cbs[MAX_AIO_EVENTS];
529 int num = 0;
530
531 for (; num < MAX_AIO_EVENTS && ((op = ready_list.dequeue()) != nullptr); ++num) {
532 cbs[num] = &op->aiocb;
533 ink_assert(op->action.continuation);
534 }
535
536 if (num > 0) {
537 int ret;
538 do {
539 ret = io_submit(ctx, num, cbs);
540 } while (ret < 0 && ret == -EAGAIN);
541
542 if (ret != num) {
543 if (ret < 0) {
544 Debug("aio", "io_submit failed: %s (%d)", strerror(-ret), -ret);
545 } else {
546 Fatal("could not submit IOs, io_submit(%p, %d, %p) returned %d", ctx, num, cbs, ret);
547 }
548 }
549 }
550
551 while ((op = complete_list.dequeue()) != nullptr) {
552 op->mutex = op->action.mutex;
553 MUTEX_TRY_LOCK(lock, op->mutex, trigger_event->ethread);
554 if (!lock.is_locked()) {
555 trigger_event->ethread->schedule_imm(op);
556 } else {
557 op->handleEvent(EVENT_NONE, nullptr);
558 }
559 }
560 return EVENT_CONT;
561 }
562
563 int
ink_aio_read(AIOCallback * op,int)564 ink_aio_read(AIOCallback *op, int /* fromAPI ATS_UNUSED */)
565 {
566 op->aiocb.aio_lio_opcode = IO_CMD_PREAD;
567 op->aiocb.data = op;
568 EThread *t = this_ethread();
569 #ifdef HAVE_EVENTFD
570 io_set_eventfd(&op->aiocb, t->evfd);
571 #endif
572 t->diskHandler->ready_list.enqueue(op);
573
574 return 1;
575 }
576
577 int
ink_aio_write(AIOCallback * op,int)578 ink_aio_write(AIOCallback *op, int /* fromAPI ATS_UNUSED */)
579 {
580 op->aiocb.aio_lio_opcode = IO_CMD_PWRITE;
581 op->aiocb.data = op;
582 EThread *t = this_ethread();
583 #ifdef HAVE_EVENTFD
584 io_set_eventfd(&op->aiocb, t->evfd);
585 #endif
586 t->diskHandler->ready_list.enqueue(op);
587
588 return 1;
589 }
590
591 int
ink_aio_readv(AIOCallback * op,int)592 ink_aio_readv(AIOCallback *op, int /* fromAPI ATS_UNUSED */)
593 {
594 EThread *t = this_ethread();
595 DiskHandler *dh = t->diskHandler;
596 AIOCallback *io = op;
597 int sz = 0;
598
599 while (io) {
600 io->aiocb.aio_lio_opcode = IO_CMD_PREAD;
601 io->aiocb.data = io;
602 #ifdef HAVE_EVENTFD
603 io_set_eventfd(&op->aiocb, t->evfd);
604 #endif
605 dh->ready_list.enqueue(io);
606 ++sz;
607 io = io->then;
608 }
609
610 if (sz > 1) {
611 ink_assert(op->action.continuation);
612 AIOVec *vec = new AIOVec(sz, op);
613 while (--sz >= 0) {
614 op->action = vec;
615 op = op->then;
616 }
617 }
618 return 1;
619 }
620
621 int
ink_aio_writev(AIOCallback * op,int)622 ink_aio_writev(AIOCallback *op, int /* fromAPI ATS_UNUSED */)
623 {
624 EThread *t = this_ethread();
625 DiskHandler *dh = t->diskHandler;
626 AIOCallback *io = op;
627 int sz = 0;
628
629 while (io) {
630 io->aiocb.aio_lio_opcode = IO_CMD_PWRITE;
631 io->aiocb.data = io;
632 #ifdef HAVE_EVENTFD
633 io_set_eventfd(&op->aiocb, t->evfd);
634 #endif
635 dh->ready_list.enqueue(io);
636 ++sz;
637 io = io->then;
638 }
639
640 if (sz > 1) {
641 ink_assert(op->action.continuation);
642 AIOVec *vec = new AIOVec(sz, op);
643 while (--sz >= 0) {
644 op->action = vec;
645 op = op->then;
646 }
647 }
648 return 1;
649 }
650 #endif // AIO_MODE != AIO_MODE_NATIVE
651