1 /*
2 * CDDL HEADER START
3 *
4 * The contents of this file are subject to the terms of the
5 * Common Development and Distribution License (the "License").
6 * You may not use this file except in compliance with the License.
7 *
8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9 * or http://www.opensolaris.org/os/licensing.
10 * See the License for the specific language governing permissions
11 * and limitations under the License.
12 *
13 * When distributing Covered Code, include this CDDL HEADER in each
14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15 * If applicable, add the following below this CDDL HEADER, with the
16 * fields enclosed by brackets "[]" replaced with your own identifying
17 * information: Portions Copyright [yyyy] [name of copyright owner]
18 *
19 * CDDL HEADER END
20 */
21
22 /*
23 * Copyright 2008 Sun Microsystems, Inc. All rights reserved.
24 * Use is subject to license terms.
25 */
26
27 #include "lint.h"
28 #include "thr_uberdata.h"
29 #include "libc.h"
30 #include "asyncio.h"
31 #include <atomic.h>
32 #include <sys/param.h>
33 #include <sys/file.h>
34 #include <sys/port.h>
35
/*
 * Forward declarations of file-local (static) helpers that are used
 * before their definitions appear.
 */
static int _aio_hash_insert(aio_result_t *, aio_req_t *);
static aio_req_t *_aio_req_get(aio_worker_t *);
static void _aio_req_add(aio_req_t *, aio_worker_t **, int);
static void _aio_req_del(aio_worker_t *, aio_req_t *, int);
static void _aio_work_done(aio_worker_t *);
static void _aio_enq_doneq(aio_req_t *);

/* Provided outside this file. */
extern void _aio_lio_free(aio_lio_t *);

/* Provided outside this file. */
extern int __fcntl(int, int, ...);
extern int _port_dispatch(int, int, int, int, uintptr_t, void *);

/* More file-local helpers. */
static int _aio_fsync_del(aio_worker_t *, aio_req_t *);
static void _aiodone(aio_req_t *, ssize_t, int);
static void _aio_cancel_work(aio_worker_t *, int, int *, int *);
static void _aio_finish_request(aio_worker_t *, ssize_t, int);
52
/*
 * switch for kernel async I/O
 * (set to 1 or -1 by _kaio_init() once initialization has been attempted)
 */
int _kaio_ok = 0;		/* 0 = disabled, 1 = on, -1 = error */

/*
 * Key for thread-specific data
 * (worker threads store their aio_worker_t pointer under this key)
 */
pthread_key_t _aio_key;

/*
 * Array for determining whether or not a file supports kaio.
 * Mapped by _kaio_supported_init(), which is called from both
 * _kaio_init() and __uaio_init().
 */
uint32_t *_kaio_supported = NULL;

/*
 * workers for read/write requests
 * (__aio_mutex lock protects circular linked list of workers)
 */
aio_worker_t *__workers_rw;	/* circular list of AIO workers */
aio_worker_t *__nextworker_rw;	/* next worker in list of workers */
int __rw_workerscnt;		/* number of read/write workers */

/*
 * worker for notification requests.
 */
aio_worker_t *__workers_no;	/* circular list of AIO workers */
aio_worker_t *__nextworker_no;	/* next worker in list of workers */
int __no_workerscnt;		/* number of notification workers */

aio_req_t *_aio_done_tail;	/* list of done requests */
aio_req_t *_aio_done_head;

mutex_t __aio_initlock = DEFAULTMUTEX;	/* makes aio initialization atomic */
cond_t __aio_initcv = DEFAULTCV;	/* signalled when an init finishes */
int __aio_initbusy = 0;			/* nonzero while an init is running */

mutex_t __aio_mutex = DEFAULTMUTEX;	/* protects counts, and linked lists */
cond_t _aio_iowait_cv = DEFAULTCV;	/* wait for userland I/Os */

pid_t __pid = (pid_t)-1;	/* initialize as invalid pid */
int _sigio_enabled = 0;		/* when set, send SIGIO signal */

aio_hash_t *_aio_hash;		/* hash table, mapped in __uaio_init() */

aio_req_t *_aio_doneq;		/* double linked done queue list */

int _aio_donecnt = 0;		/* # of requests on the done list */
int _aio_waitncnt = 0;		/* # of requests for aio_waitn */
int _aio_doneq_cnt = 0;
int _aio_outstand_cnt = 0;	/* # of outstanding requests */
int _kaio_outstand_cnt = 0;	/* # of outstanding kaio requests */
int _aio_req_done_cnt = 0;	/* req. done but not in "done queue" */
int _aio_kernel_suspend = 0;	/* active kernel kaio calls */
int _aio_suscv_cnt = 0;		/* aio_suspend calls waiting on cv's */

int _max_workers = 256;		/* max number of workers permitted */
int _min_workers = 4;		/* min number of workers */
int _minworkload = 2;		/* min number of request in q */
int _aio_worker_cnt = 0;	/* number of workers to do requests */
int __uaio_ok = 0;		/* AIO has been enabled */
sigset_t _worker_set;		/* worker's signal mask */

int _aiowait_flag = 0;		/* when set, aiowait() is inprogress */
int _aio_flags = 0;		/* flag bits; see the defines in asyncio.h */

aio_worker_t *_kaiowp = NULL;	/* points to kaio cleanup thread */

int hz;				/* clock ticks per second */
123
124 static int
_kaio_supported_init(void)125 _kaio_supported_init(void)
126 {
127 void *ptr;
128 size_t size;
129
130 if (_kaio_supported != NULL) /* already initialized */
131 return (0);
132
133 size = MAX_KAIO_FDARRAY_SIZE * sizeof (uint32_t);
134 ptr = mmap(NULL, size, PROT_READ | PROT_WRITE,
135 MAP_PRIVATE | MAP_ANON, -1, (off_t)0);
136 if (ptr == MAP_FAILED)
137 return (-1);
138 _kaio_supported = ptr;
139 return (0);
140 }
141
142 /*
143 * The aio subsystem is initialized when an AIO request is made.
144 * Constants are initialized like the max number of workers that
145 * the subsystem can create, and the minimum number of workers
146 * permitted before imposing some restrictions. Also, some
147 * workers are created.
148 */
149 int
__uaio_init(void)150 __uaio_init(void)
151 {
152 int ret = -1;
153 int i;
154 int cancel_state;
155
156 lmutex_lock(&__aio_initlock);
157 (void) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &cancel_state);
158 while (__aio_initbusy)
159 (void) cond_wait(&__aio_initcv, &__aio_initlock);
160 (void) pthread_setcancelstate(cancel_state, NULL);
161 if (__uaio_ok) { /* already initialized */
162 lmutex_unlock(&__aio_initlock);
163 return (0);
164 }
165 __aio_initbusy = 1;
166 lmutex_unlock(&__aio_initlock);
167
168 hz = (int)sysconf(_SC_CLK_TCK);
169 __pid = getpid();
170
171 setup_cancelsig(SIGAIOCANCEL);
172
173 if (_kaio_supported_init() != 0)
174 goto out;
175
176 /*
177 * Allocate and initialize the hash table.
178 * Do this only once, even if __uaio_init() is called twice.
179 */
180 if (_aio_hash == NULL) {
181 /* LINTED pointer cast */
182 _aio_hash = (aio_hash_t *)mmap(NULL,
183 HASHSZ * sizeof (aio_hash_t), PROT_READ | PROT_WRITE,
184 MAP_PRIVATE | MAP_ANON, -1, (off_t)0);
185 if ((void *)_aio_hash == MAP_FAILED) {
186 _aio_hash = NULL;
187 goto out;
188 }
189 for (i = 0; i < HASHSZ; i++)
190 (void) mutex_init(&_aio_hash[i].hash_lock,
191 USYNC_THREAD, NULL);
192 }
193
194 /*
195 * Initialize worker's signal mask to only catch SIGAIOCANCEL.
196 */
197 (void) sigfillset(&_worker_set);
198 (void) sigdelset(&_worker_set, SIGAIOCANCEL);
199
200 /*
201 * Create one worker to send asynchronous notifications.
202 * Do this only once, even if __uaio_init() is called twice.
203 */
204 if (__no_workerscnt == 0 &&
205 (_aio_create_worker(NULL, AIONOTIFY) != 0)) {
206 errno = EAGAIN;
207 goto out;
208 }
209
210 /*
211 * Create the minimum number of read/write workers.
212 * And later check whether atleast one worker is created;
213 * lwp_create() calls could fail because of segkp exhaustion.
214 */
215 for (i = 0; i < _min_workers; i++)
216 (void) _aio_create_worker(NULL, AIOREAD);
217 if (__rw_workerscnt == 0) {
218 errno = EAGAIN;
219 goto out;
220 }
221
222 ret = 0;
223 out:
224 lmutex_lock(&__aio_initlock);
225 if (ret == 0)
226 __uaio_ok = 1;
227 __aio_initbusy = 0;
228 (void) cond_broadcast(&__aio_initcv);
229 lmutex_unlock(&__aio_initlock);
230 return (ret);
231 }
232
233 /*
234 * Called from close() before actually performing the real _close().
235 */
236 void
_aio_close(int fd)237 _aio_close(int fd)
238 {
239 if (fd < 0) /* avoid cancelling everything */
240 return;
241 /*
242 * Cancel all outstanding aio requests for this file descriptor.
243 */
244 if (__uaio_ok)
245 (void) aiocancel_all(fd);
246 /*
247 * If we have allocated the bit array, clear the bit for this file.
248 * The next open may re-use this file descriptor and the new file
249 * may have different kaio() behaviour.
250 */
251 if (_kaio_supported != NULL)
252 CLEAR_KAIO_SUPPORTED(fd);
253 }
254
255 /*
256 * special kaio cleanup thread sits in a loop in the
257 * kernel waiting for pending kaio requests to complete.
258 */
259 void *
_kaio_cleanup_thread(void * arg)260 _kaio_cleanup_thread(void *arg)
261 {
262 if (pthread_setspecific(_aio_key, arg) != 0)
263 aio_panic("_kaio_cleanup_thread, pthread_setspecific()");
264 (void) _kaio(AIOSTART);
265 return (arg);
266 }
267
268 /*
269 * initialize kaio.
270 */
271 void
_kaio_init()272 _kaio_init()
273 {
274 int error;
275 sigset_t oset;
276 int cancel_state;
277
278 lmutex_lock(&__aio_initlock);
279 (void) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &cancel_state);
280 while (__aio_initbusy)
281 (void) cond_wait(&__aio_initcv, &__aio_initlock);
282 (void) pthread_setcancelstate(cancel_state, NULL);
283 if (_kaio_ok) { /* already initialized */
284 lmutex_unlock(&__aio_initlock);
285 return;
286 }
287 __aio_initbusy = 1;
288 lmutex_unlock(&__aio_initlock);
289
290 if (_kaio_supported_init() != 0)
291 error = ENOMEM;
292 else if ((_kaiowp = _aio_worker_alloc()) == NULL)
293 error = ENOMEM;
294 else if ((error = (int)_kaio(AIOINIT)) == 0) {
295 (void) pthread_sigmask(SIG_SETMASK, &maskset, &oset);
296 error = thr_create(NULL, AIOSTKSIZE, _kaio_cleanup_thread,
297 _kaiowp, THR_DAEMON, &_kaiowp->work_tid);
298 (void) pthread_sigmask(SIG_SETMASK, &oset, NULL);
299 }
300 if (error && _kaiowp != NULL) {
301 _aio_worker_free(_kaiowp);
302 _kaiowp = NULL;
303 }
304
305 lmutex_lock(&__aio_initlock);
306 if (error)
307 _kaio_ok = -1;
308 else
309 _kaio_ok = 1;
310 __aio_initbusy = 0;
311 (void) cond_broadcast(&__aio_initcv);
312 lmutex_unlock(&__aio_initlock);
313 }
314
315 int
aioread(int fd,caddr_t buf,int bufsz,off_t offset,int whence,aio_result_t * resultp)316 aioread(int fd, caddr_t buf, int bufsz, off_t offset, int whence,
317 aio_result_t *resultp)
318 {
319 return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOREAD));
320 }
321
322 int
aiowrite(int fd,caddr_t buf,int bufsz,off_t offset,int whence,aio_result_t * resultp)323 aiowrite(int fd, caddr_t buf, int bufsz, off_t offset, int whence,
324 aio_result_t *resultp)
325 {
326 return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOWRITE));
327 }
328
#if !defined(_LP64)
/*
 * Large-file variant of aioread(), built only when _LP64 is not defined.
 * Forwards to _aiorw() with mode AIOAREAD64.
 */
int
aioread64(int fd, caddr_t buf, int bufsz, off64_t offset, int whence,
    aio_result_t *resultp)
{
	int rv;

	rv = _aiorw(fd, buf, bufsz, offset, whence, resultp, AIOAREAD64);
	return (rv);
}

/*
 * Large-file variant of aiowrite(), built only when _LP64 is not defined.
 * Forwards to _aiorw() with mode AIOAWRITE64.
 */
int
aiowrite64(int fd, caddr_t buf, int bufsz, off64_t offset, int whence,
    aio_result_t *resultp)
{
	int rv;

	rv = _aiorw(fd, buf, bufsz, offset, whence, resultp, AIOAWRITE64);
	return (rv);
}
#endif	/* !defined(_LP64) */
344
345 int
_aiorw(int fd,caddr_t buf,int bufsz,offset_t offset,int whence,aio_result_t * resultp,int mode)346 _aiorw(int fd, caddr_t buf, int bufsz, offset_t offset, int whence,
347 aio_result_t *resultp, int mode)
348 {
349 aio_req_t *reqp;
350 aio_args_t *ap;
351 offset_t loffset;
352 struct stat64 stat64;
353 int error = 0;
354 int kerr;
355 int umode;
356
357 switch (whence) {
358
359 case SEEK_SET:
360 loffset = offset;
361 break;
362 case SEEK_CUR:
363 if ((loffset = llseek(fd, 0, SEEK_CUR)) == -1)
364 error = -1;
365 else
366 loffset += offset;
367 break;
368 case SEEK_END:
369 if (fstat64(fd, &stat64) == -1)
370 error = -1;
371 else
372 loffset = offset + stat64.st_size;
373 break;
374 default:
375 errno = EINVAL;
376 error = -1;
377 }
378
379 if (error)
380 return (error);
381
382 /* initialize kaio */
383 if (!_kaio_ok)
384 _kaio_init();
385
386 /*
387 * _aio_do_request() needs the original request code (mode) to be able
388 * to choose the appropiate 32/64 bit function. All other functions
389 * only require the difference between READ and WRITE (umode).
390 */
391 if (mode == AIOAREAD64 || mode == AIOAWRITE64)
392 umode = mode - AIOAREAD64;
393 else
394 umode = mode;
395
396 /*
397 * Try kernel aio first.
398 * If errno is ENOTSUP/EBADFD, fall back to the thread implementation.
399 */
400 if (_kaio_ok > 0 && KAIO_SUPPORTED(fd)) {
401 resultp->aio_errno = 0;
402 sig_mutex_lock(&__aio_mutex);
403 _kaio_outstand_cnt++;
404 sig_mutex_unlock(&__aio_mutex);
405 kerr = (int)_kaio(((resultp->aio_return == AIO_INPROGRESS) ?
406 (umode | AIO_POLL_BIT) : umode),
407 fd, buf, bufsz, loffset, resultp);
408 if (kerr == 0) {
409 return (0);
410 }
411 sig_mutex_lock(&__aio_mutex);
412 _kaio_outstand_cnt--;
413 sig_mutex_unlock(&__aio_mutex);
414 if (errno != ENOTSUP && errno != EBADFD)
415 return (-1);
416 if (errno == EBADFD)
417 SET_KAIO_NOT_SUPPORTED(fd);
418 }
419
420 if (!__uaio_ok && __uaio_init() == -1)
421 return (-1);
422
423 if ((reqp = _aio_req_alloc()) == NULL) {
424 errno = EAGAIN;
425 return (-1);
426 }
427
428 /*
429 * _aio_do_request() checks reqp->req_op to differentiate
430 * between 32 and 64 bit access.
431 */
432 reqp->req_op = mode;
433 reqp->req_resultp = resultp;
434 ap = &reqp->req_args;
435 ap->fd = fd;
436 ap->buf = buf;
437 ap->bufsz = bufsz;
438 ap->offset = loffset;
439
440 if (_aio_hash_insert(resultp, reqp) != 0) {
441 _aio_req_free(reqp);
442 errno = EINVAL;
443 return (-1);
444 }
445 /*
446 * _aio_req_add() only needs the difference between READ and
447 * WRITE to choose the right worker queue.
448 */
449 _aio_req_add(reqp, &__nextworker_rw, umode);
450 return (0);
451 }
452
453 int
aiocancel(aio_result_t * resultp)454 aiocancel(aio_result_t *resultp)
455 {
456 aio_req_t *reqp;
457 aio_worker_t *aiowp;
458 int ret;
459 int done = 0;
460 int canceled = 0;
461
462 if (!__uaio_ok) {
463 errno = EINVAL;
464 return (-1);
465 }
466
467 sig_mutex_lock(&__aio_mutex);
468 reqp = _aio_hash_find(resultp);
469 if (reqp == NULL) {
470 if (_aio_outstand_cnt == _aio_req_done_cnt)
471 errno = EINVAL;
472 else
473 errno = EACCES;
474 ret = -1;
475 } else {
476 aiowp = reqp->req_worker;
477 sig_mutex_lock(&aiowp->work_qlock1);
478 (void) _aio_cancel_req(aiowp, reqp, &canceled, &done);
479 sig_mutex_unlock(&aiowp->work_qlock1);
480
481 if (canceled) {
482 ret = 0;
483 } else {
484 if (_aio_outstand_cnt == 0 ||
485 _aio_outstand_cnt == _aio_req_done_cnt)
486 errno = EINVAL;
487 else
488 errno = EACCES;
489 ret = -1;
490 }
491 }
492 sig_mutex_unlock(&__aio_mutex);
493 return (ret);
494 }
495
496 static void
_aiowait_cleanup(void * arg __unused)497 _aiowait_cleanup(void *arg __unused)
498 {
499 sig_mutex_lock(&__aio_mutex);
500 _aiowait_flag--;
501 sig_mutex_unlock(&__aio_mutex);
502 }
503
/*
 * Wait for an asynchronous I/O request (kernel or user-level) to
 * complete and return its result pointer.
 *
 * uwait == NULL:	no timeout is handed to the kernel wait
 * uwait == {0, 0}:	poll once and return without blocking
 * otherwise:		wait at most the given time
 *
 * Returns the aio_result_t pointer of a completed request, NULL if
 * nothing completed (poll found nothing, or the timeout expired), or
 * (aio_result_t *)-1 with errno set to EINVAL (invalid timeval, or both
 * the kernel and the user-level side reported -1) or EINTR.
 *
 * This must be asynch safe and cancel safe
 */
aio_result_t *
aiowait(struct timeval *uwait)
{
	aio_result_t *uresultp;		/* answer from the user-level side */
	aio_result_t *kresultp;		/* answer from the kernel side */
	aio_result_t *resultp;		/* what we finally return */
	int dontblock;
	int timedwait = 0;
	int kaio_errno = 0;
	struct timeval twait;
	struct timeval *wait = NULL;
	hrtime_t hrtend;		/* absolute deadline for a timed wait */
	hrtime_t hres;

	if (uwait) {
		/*
		 * Check for a valid specified wait time.
		 * If it is invalid, fail the call right away.
		 */
		if (uwait->tv_sec < 0 || uwait->tv_usec < 0 ||
		    uwait->tv_usec >= MICROSEC) {
			errno = EINVAL;
			return ((aio_result_t *)-1);
		}

		if (uwait->tv_sec > 0 || uwait->tv_usec > 0) {
			/*
			 * Timed wait: remember the deadline and work on a
			 * private copy of the caller's timeval, since the
			 * remaining time is rewritten on every retry.
			 */
			hrtend = gethrtime() +
			    (hrtime_t)uwait->tv_sec * NANOSEC +
			    (hrtime_t)uwait->tv_usec * (NANOSEC / MICROSEC);
			twait = *uwait;
			wait = &twait;
			timedwait++;
		} else {
			/* polling */
			sig_mutex_lock(&__aio_mutex);
			if (_kaio_outstand_cnt == 0) {
				kresultp = (aio_result_t *)-1;
			} else {
				kresultp = (aio_result_t *)_kaio(AIOWAIT,
				    (struct timeval *)-1, 1);
				/*
				 * Anything other than -1, NULL or 1 is the
				 * result pointer of a completed kaio request.
				 */
				if (kresultp != (aio_result_t *)-1 &&
				    kresultp != NULL &&
				    kresultp != (aio_result_t *)1) {
					_kaio_outstand_cnt--;
					sig_mutex_unlock(&__aio_mutex);
					return (kresultp);
				}
			}
			uresultp = _aio_req_done();
			sig_mutex_unlock(&__aio_mutex);
			if (uresultp != NULL &&
			    uresultp != (aio_result_t *)-1) {
				return (uresultp);
			}
			/* -1 from both sides is reported as an error */
			if (uresultp == (aio_result_t *)-1 &&
			    kresultp == (aio_result_t *)-1) {
				errno = EINVAL;
				return ((aio_result_t *)-1);
			} else {
				return (NULL);
			}
		}
	}

	for (;;) {
		sig_mutex_lock(&__aio_mutex);
		uresultp = _aio_req_done();
		if (uresultp != NULL && uresultp != (aio_result_t *)-1) {
			/* a user-level request has completed */
			sig_mutex_unlock(&__aio_mutex);
			resultp = uresultp;
			break;
		}
		_aiowait_flag++;
		/*
		 * NOTE(review): -1 from _aio_req_done() presumably means
		 * there are no user-level requests to wait for; in that
		 * case the kernel is asked not to block.
		 */
		dontblock = (uresultp == (aio_result_t *)-1);
		if (dontblock && _kaio_outstand_cnt == 0) {
			/* no kaio requests outstanding: skip the kernel call */
			kresultp = (aio_result_t *)-1;
			kaio_errno = EINVAL;
		} else {
			/*
			 * Call into the kernel with __aio_mutex dropped.
			 * The cleanup handler undoes the _aiowait_flag
			 * increment if we are cancelled while in there.
			 */
			sig_mutex_unlock(&__aio_mutex);
			pthread_cleanup_push(_aiowait_cleanup, NULL);
			_cancel_prologue();
			kresultp = (aio_result_t *)_kaio(AIOWAIT,
			    wait, dontblock);
			_cancel_epilogue();
			pthread_cleanup_pop(0);
			sig_mutex_lock(&__aio_mutex);
			kaio_errno = errno;
		}
		_aiowait_flag--;
		sig_mutex_unlock(&__aio_mutex);
		if (kresultp == (aio_result_t *)1) {
			/* aiowait() awakened by an aionotify() */
			continue;
		} else if (kresultp != NULL &&
		    kresultp != (aio_result_t *)-1) {
			/* a kaio request has completed */
			resultp = kresultp;
			sig_mutex_lock(&__aio_mutex);
			_kaio_outstand_cnt--;
			sig_mutex_unlock(&__aio_mutex);
			break;
		} else if (kresultp == (aio_result_t *)-1 &&
		    kaio_errno == EINVAL &&
		    uresultp == (aio_result_t *)-1) {
			/* -1 from both sides: fail with EINVAL */
			errno = kaio_errno;
			resultp = (aio_result_t *)-1;
			break;
		} else if (kresultp == (aio_result_t *)-1 &&
		    kaio_errno == EINTR) {
			/* the kernel wait was interrupted */
			errno = kaio_errno;
			resultp = (aio_result_t *)-1;
			break;
		} else if (timedwait) {
			hres = hrtend - gethrtime();
			if (hres <= 0) {
				/* time is up; return */
				resultp = NULL;
				break;
			} else {
				/*
				 * Some time left.  Round up the remaining time
				 * in nanoseconds to microsec.  Retry the call.
				 */
				hres += (NANOSEC / MICROSEC) - 1;
				wait->tv_sec = hres / NANOSEC;
				wait->tv_usec =
				    (hres % NANOSEC) / (NANOSEC / MICROSEC);
			}
		} else {
			/* untimed wait and nothing completed: try again */
			ASSERT(kresultp == NULL && uresultp == NULL);
			resultp = NULL;
			continue;
		}
	}
	return (resultp);
}
642
643 /*
644 * _aio_get_timedelta calculates the remaining time and stores the result
645 * into timespec_t *wait.
646 */
647
648 int
_aio_get_timedelta(timespec_t * end,timespec_t * wait)649 _aio_get_timedelta(timespec_t *end, timespec_t *wait)
650 {
651 int ret = 0;
652 struct timeval cur;
653 timespec_t curtime;
654
655 (void) gettimeofday(&cur, NULL);
656 curtime.tv_sec = cur.tv_sec;
657 curtime.tv_nsec = cur.tv_usec * 1000; /* convert us to ns */
658
659 if (end->tv_sec >= curtime.tv_sec) {
660 wait->tv_sec = end->tv_sec - curtime.tv_sec;
661 if (end->tv_nsec >= curtime.tv_nsec) {
662 wait->tv_nsec = end->tv_nsec - curtime.tv_nsec;
663 if (wait->tv_sec == 0 && wait->tv_nsec == 0)
664 ret = -1; /* timer expired */
665 } else {
666 if (end->tv_sec > curtime.tv_sec) {
667 wait->tv_sec -= 1;
668 wait->tv_nsec = NANOSEC -
669 (curtime.tv_nsec - end->tv_nsec);
670 } else {
671 ret = -1; /* timer expired */
672 }
673 }
674 } else {
675 ret = -1;
676 }
677 return (ret);
678 }
679
680 /*
681 * If closing by file descriptor: we will simply cancel all the outstanding
682 * aio`s and return. Those aio's in question will have either noticed the
683 * cancellation notice before, during, or after initiating io.
684 */
685 int
aiocancel_all(int fd)686 aiocancel_all(int fd)
687 {
688 aio_req_t *reqp;
689 aio_req_t **reqpp, *last;
690 aio_worker_t *first;
691 aio_worker_t *next;
692 int canceled = 0;
693 int done = 0;
694 int cancelall = 0;
695
696 sig_mutex_lock(&__aio_mutex);
697
698 if (_aio_outstand_cnt == 0) {
699 sig_mutex_unlock(&__aio_mutex);
700 return (AIO_ALLDONE);
701 }
702
703 /*
704 * Cancel requests from the read/write workers' queues.
705 */
706 first = __nextworker_rw;
707 next = first;
708 do {
709 _aio_cancel_work(next, fd, &canceled, &done);
710 } while ((next = next->work_forw) != first);
711
712 /*
713 * finally, check if there are requests on the done queue that
714 * should be canceled.
715 */
716 if (fd < 0)
717 cancelall = 1;
718 reqpp = &_aio_done_tail;
719 last = _aio_done_tail;
720 while ((reqp = *reqpp) != NULL) {
721 if (cancelall || reqp->req_args.fd == fd) {
722 *reqpp = reqp->req_next;
723 if (last == reqp) {
724 last = reqp->req_next;
725 }
726 if (_aio_done_head == reqp) {
727 /* this should be the last req in list */
728 _aio_done_head = last;
729 }
730 _aio_donecnt--;
731 _aio_set_result(reqp, -1, ECANCELED);
732 (void) _aio_hash_del(reqp->req_resultp);
733 _aio_req_free(reqp);
734 } else {
735 reqpp = &reqp->req_next;
736 last = reqp;
737 }
738 }
739
740 if (cancelall) {
741 ASSERT(_aio_donecnt == 0);
742 _aio_done_head = NULL;
743 }
744 sig_mutex_unlock(&__aio_mutex);
745
746 if (canceled && done == 0)
747 return (AIO_CANCELED);
748 else if (done && canceled == 0)
749 return (AIO_ALLDONE);
750 else if ((canceled + done == 0) && KAIO_SUPPORTED(fd))
751 return ((int)_kaio(AIOCANCEL, fd, NULL));
752 return (AIO_NOTCANCELED);
753 }
754
755 /*
756 * Cancel requests from a given work queue. If the file descriptor
757 * parameter, fd, is non-negative, then only cancel those requests
758 * in this queue that are to this file descriptor. If the fd
759 * parameter is -1, then cancel all requests.
760 */
761 static void
_aio_cancel_work(aio_worker_t * aiowp,int fd,int * canceled,int * done)762 _aio_cancel_work(aio_worker_t *aiowp, int fd, int *canceled, int *done)
763 {
764 aio_req_t *reqp;
765
766 sig_mutex_lock(&aiowp->work_qlock1);
767 /*
768 * cancel queued requests first.
769 */
770 reqp = aiowp->work_tail1;
771 while (reqp != NULL) {
772 if (fd < 0 || reqp->req_args.fd == fd) {
773 if (_aio_cancel_req(aiowp, reqp, canceled, done)) {
774 /*
775 * Callers locks were dropped.
776 * reqp is invalid; start traversing
777 * the list from the beginning again.
778 */
779 reqp = aiowp->work_tail1;
780 continue;
781 }
782 }
783 reqp = reqp->req_next;
784 }
785 /*
786 * Since the queued requests have been canceled, there can
787 * only be one inprogress request that should be canceled.
788 */
789 if ((reqp = aiowp->work_req) != NULL &&
790 (fd < 0 || reqp->req_args.fd == fd))
791 (void) _aio_cancel_req(aiowp, reqp, canceled, done);
792 sig_mutex_unlock(&aiowp->work_qlock1);
793 }
794
/*
 * Cancel a request.  Return 1 if the callers locks were temporarily
 * dropped, otherwise return 0.
 *
 * aiowp	worker that owns the request
 * reqp		request to cancel
 * canceled	incremented when the request is canceled here
 * done		incremented when the request had already completed
 *
 * The caller must hold both __aio_mutex and aiowp->work_qlock1.
 * When 1 is returned, both locks were released and re-acquired, so the
 * caller must not trust anything it read from the queue beforehand.
 */
int
_aio_cancel_req(aio_worker_t *aiowp, aio_req_t *reqp, int *canceled, int *done)
{
	int ostate = reqp->req_state;	/* state on entry */

	ASSERT(MUTEX_HELD(&__aio_mutex));
	ASSERT(MUTEX_HELD(&aiowp->work_qlock1));
	/* already canceled: nothing to do and nothing to count */
	if (ostate == AIO_REQ_CANCELED)
		return (0);
	if (ostate == AIO_REQ_DONE && !POSIX_AIO(reqp) &&
	    aiowp->work_prev1 == reqp) {
		ASSERT(aiowp->work_done1 != 0);
		/*
		 * If not on the done queue yet, just mark it CANCELED,
		 * _aio_work_done() will do the necessary clean up.
		 * This is required to ensure that aiocancel_all() cancels
		 * all the outstanding requests, including this one which
		 * is not yet on done queue but has been marked done.
		 */
		_aio_set_result(reqp, -1, ECANCELED);
		(void) _aio_hash_del(reqp->req_resultp);
		reqp->req_state = AIO_REQ_CANCELED;
		(*canceled)++;
		return (0);
	}

	/* completed requests cannot be canceled; just count them */
	if (ostate == AIO_REQ_DONE || ostate == AIO_REQ_DONEQ) {
		(*done)++;
		return (0);
	}
	if (reqp->req_op == AIOFSYNC && reqp != aiowp->work_req) {
		ASSERT(POSIX_AIO(reqp));
		/* Cancel the queued aio_fsync() request */
		if (!reqp->req_head->lio_canned) {
			/* lio_canned ensures this is counted only once */
			reqp->req_head->lio_canned = 1;
			_aio_outstand_cnt--;
			(*canceled)++;
		}
		return (0);
	}
	/* take the request off the worker's queue and out of the hash */
	reqp->req_state = AIO_REQ_CANCELED;
	_aio_req_del(aiowp, reqp, ostate);
	(void) _aio_hash_del(reqp->req_resultp);
	(*canceled)++;
	if (reqp == aiowp->work_req) {
		ASSERT(ostate == AIO_REQ_INPROGRESS);
		/*
		 * Set the result values now, before _aiodone() is called.
		 * We do this because the application can expect aio_return
		 * and aio_errno to be set to -1 and ECANCELED, respectively,
		 * immediately after a successful return from aiocancel()
		 * or aio_cancel().
		 */
		_aio_set_result(reqp, -1, ECANCELED);
		/* interrupt the worker that is executing the request */
		(void) thr_kill(aiowp->work_tid, SIGAIOCANCEL);
		return (0);
	}
	if (!POSIX_AIO(reqp)) {
		/* queued non-POSIX request: finish it off right here */
		_aio_outstand_cnt--;
		_aio_set_result(reqp, -1, ECANCELED);
		_aio_req_free(reqp);
		return (0);
	}
	/*
	 * Queued POSIX request: _aiodone() is called with both locks
	 * dropped, and the locks are re-acquired in the original order.
	 */
	sig_mutex_unlock(&aiowp->work_qlock1);
	sig_mutex_unlock(&__aio_mutex);
	_aiodone(reqp, -1, ECANCELED);
	sig_mutex_lock(&__aio_mutex);
	sig_mutex_lock(&aiowp->work_qlock1);
	return (1);
}
869
870 int
_aio_create_worker(aio_req_t * reqp,int mode)871 _aio_create_worker(aio_req_t *reqp, int mode)
872 {
873 aio_worker_t *aiowp, **workers, **nextworker;
874 int *aio_workerscnt;
875 void *(*func)(void *);
876 sigset_t oset;
877 int error;
878
879 /*
880 * Put the new worker thread in the right queue.
881 */
882 switch (mode) {
883 case AIOREAD:
884 case AIOWRITE:
885 case AIOAREAD:
886 case AIOAWRITE:
887 #if !defined(_LP64)
888 case AIOAREAD64:
889 case AIOAWRITE64:
890 #endif
891 workers = &__workers_rw;
892 nextworker = &__nextworker_rw;
893 aio_workerscnt = &__rw_workerscnt;
894 func = _aio_do_request;
895 break;
896 case AIONOTIFY:
897 workers = &__workers_no;
898 nextworker = &__nextworker_no;
899 func = _aio_do_notify;
900 aio_workerscnt = &__no_workerscnt;
901 break;
902 default:
903 aio_panic("_aio_create_worker: invalid mode");
904 break;
905 }
906
907 if ((aiowp = _aio_worker_alloc()) == NULL)
908 return (-1);
909
910 if (reqp) {
911 reqp->req_state = AIO_REQ_QUEUED;
912 reqp->req_worker = aiowp;
913 aiowp->work_head1 = reqp;
914 aiowp->work_tail1 = reqp;
915 aiowp->work_next1 = reqp;
916 aiowp->work_count1 = 1;
917 aiowp->work_minload1 = 1;
918 }
919
920 (void) pthread_sigmask(SIG_SETMASK, &maskset, &oset);
921 error = thr_create(NULL, AIOSTKSIZE, func, aiowp,
922 THR_DAEMON | THR_SUSPENDED, &aiowp->work_tid);
923 (void) pthread_sigmask(SIG_SETMASK, &oset, NULL);
924 if (error) {
925 if (reqp) {
926 reqp->req_state = 0;
927 reqp->req_worker = NULL;
928 }
929 _aio_worker_free(aiowp);
930 return (-1);
931 }
932
933 lmutex_lock(&__aio_mutex);
934 (*aio_workerscnt)++;
935 if (*workers == NULL) {
936 aiowp->work_forw = aiowp;
937 aiowp->work_backw = aiowp;
938 *nextworker = aiowp;
939 *workers = aiowp;
940 } else {
941 aiowp->work_backw = (*workers)->work_backw;
942 aiowp->work_forw = (*workers);
943 (*workers)->work_backw->work_forw = aiowp;
944 (*workers)->work_backw = aiowp;
945 }
946 _aio_worker_cnt++;
947 lmutex_unlock(&__aio_mutex);
948
949 (void) thr_continue(aiowp->work_tid);
950
951 return (0);
952 }
953
954 /*
955 * This is the worker's main routine.
956 * The task of this function is to execute all queued requests;
957 * once the last pending request is executed this function will block
958 * in _aio_idle(). A new incoming request must wakeup this thread to
959 * restart the work.
960 * Every worker has an own work queue. The queue lock is required
961 * to synchronize the addition of new requests for this worker or
962 * cancellation of pending/running requests.
963 *
964 * Cancellation scenarios:
965 * The cancellation of a request is being done asynchronously using
966 * _aio_cancel_req() from another thread context.
967 * A queued request can be cancelled in different manners :
968 * a) request is queued but not "in progress" or "done" (AIO_REQ_QUEUED):
969 * - lock the queue -> remove the request -> unlock the queue
970 * - this function/thread does not detect this cancellation process
971 * b) request is in progress (AIO_REQ_INPROGRESS) :
 *      - this function first allows the cancellation of the running
 *        request with the flag "work_cancel_flg=1"
974 * see _aio_req_get() -> _aio_cancel_on()
975 * During this phase, it is allowed to interrupt the worker
976 * thread running the request (this thread) using the SIGAIOCANCEL
977 * signal.
978 * Once this thread returns from the kernel (because the request
979 * is just done), then it must disable a possible cancellation
980 * and proceed to finish the request. To disable the cancellation
981 * this thread must use _aio_cancel_off() to set "work_cancel_flg=0".
982 * c) request is already done (AIO_REQ_DONE || AIO_REQ_DONEQ):
983 * same procedure as in a)
984 *
985 * To b)
 * This thread uses sigsetjmp() to define the position in the code where
 * it wishes to continue working in the case that a SIGAIOCANCEL signal
 * is detected.
989 * Normally this thread should get the cancellation signal during the
990 * kernel phase (reading or writing). In that case the signal handler
991 * aiosigcancelhndlr() is activated using the worker thread context,
992 * which again will use the siglongjmp() function to break the standard
993 * code flow and jump to the "sigsetjmp" position, provided that
994 * "work_cancel_flg" is set to "1".
995 * Because the "work_cancel_flg" is only manipulated by this worker
996 * thread and it can only run on one CPU at a given time, it is not
997 * necessary to protect that flag with the queue lock.
998 * Returning from the kernel (read or write system call) we must
999 * first disable the use of the SIGAIOCANCEL signal and accordingly
1000 * the use of the siglongjmp() function to prevent a possible deadlock:
 * - It can happen that this worker thread returns from the kernel and
 *   blocks in "work_qlock1",
 * - then a second thread cancels the apparently "in progress" request
 *   and sends the SIGAIOCANCEL signal to the worker thread,
 * - the worker thread gets assigned the "work_qlock1" and will return
 *   from the kernel,
1007 * - the kernel detects the pending signal and activates the signal
1008 * handler instead,
1009 * - if the "work_cancel_flg" is still set then the signal handler
1010 * should use siglongjmp() to cancel the "in progress" request and
1011 * it would try to acquire the same work_qlock1 in _aio_req_get()
1012 * for a second time => deadlock.
1013 * To avoid that situation we disable the cancellation of the request
1014 * in progress BEFORE we try to acquire the work_qlock1.
1015 * In that case the signal handler will not call siglongjmp() and the
1016 * worker thread will continue running the standard code flow.
1017 * Then this thread must check the AIO_REQ_CANCELED flag to emulate
1018 * an eventually required siglongjmp() freeing the work_qlock1 and
1019 * avoiding a deadlock.
1020 */
1021 void *
_aio_do_request(void * arglist)1022 _aio_do_request(void *arglist)
1023 {
1024 aio_worker_t *aiowp = (aio_worker_t *)arglist;
1025 ulwp_t *self = curthread;
1026 struct aio_args *arg;
1027 aio_req_t *reqp; /* current AIO request */
1028 ssize_t retval;
1029 int append;
1030 int error;
1031
1032 if (pthread_setspecific(_aio_key, aiowp) != 0)
1033 aio_panic("_aio_do_request, pthread_setspecific()");
1034 (void) pthread_sigmask(SIG_SETMASK, &_worker_set, NULL);
1035 ASSERT(aiowp->work_req == NULL);
1036
1037 /*
1038 * We resume here when an operation is cancelled.
1039 * On first entry, aiowp->work_req == NULL, so all
1040 * we do is block SIGAIOCANCEL.
1041 */
1042 (void) sigsetjmp(aiowp->work_jmp_buf, 0);
1043 ASSERT(self->ul_sigdefer == 0);
1044
1045 sigoff(self); /* block SIGAIOCANCEL */
1046 if (aiowp->work_req != NULL)
1047 _aio_finish_request(aiowp, -1, ECANCELED);
1048
1049 for (;;) {
1050 /*
1051 * Put completed requests on aio_done_list. This has
1052 * to be done as part of the main loop to ensure that
1053 * we don't artificially starve any aiowait'ers.
1054 */
1055 if (aiowp->work_done1)
1056 _aio_work_done(aiowp);
1057
1058 top:
1059 /* consume any deferred SIGAIOCANCEL signal here */
1060 sigon(self);
1061 sigoff(self);
1062
1063 while ((reqp = _aio_req_get(aiowp)) == NULL) {
1064 if (_aio_idle(aiowp) != 0)
1065 goto top;
1066 }
1067 arg = &reqp->req_args;
1068 ASSERT(reqp->req_state == AIO_REQ_INPROGRESS ||
1069 reqp->req_state == AIO_REQ_CANCELED);
1070 error = 0;
1071
1072 switch (reqp->req_op) {
1073 case AIOREAD:
1074 case AIOAREAD:
1075 sigon(self); /* unblock SIGAIOCANCEL */
1076 retval = pread(arg->fd, arg->buf,
1077 arg->bufsz, arg->offset);
1078 if (retval == -1) {
1079 if (errno == ESPIPE) {
1080 retval = read(arg->fd,
1081 arg->buf, arg->bufsz);
1082 if (retval == -1)
1083 error = errno;
1084 } else {
1085 error = errno;
1086 }
1087 }
1088 sigoff(self); /* block SIGAIOCANCEL */
1089 break;
1090 case AIOWRITE:
1091 case AIOAWRITE:
1092 /*
1093 * The SUSv3 POSIX spec for aio_write() states:
1094 * If O_APPEND is set for the file descriptor,
1095 * write operations append to the file in the
1096 * same order as the calls were made.
1097 * but, somewhat inconsistently, it requires pwrite()
1098 * to ignore the O_APPEND setting. So we have to use
1099 * fcntl() to get the open modes and call write() for
1100 * the O_APPEND case.
1101 */
1102 append = (__fcntl(arg->fd, F_GETFL) & O_APPEND);
1103 sigon(self); /* unblock SIGAIOCANCEL */
1104 retval = append?
1105 write(arg->fd, arg->buf, arg->bufsz) :
1106 pwrite(arg->fd, arg->buf, arg->bufsz,
1107 arg->offset);
1108 if (retval == -1) {
1109 if (errno == ESPIPE) {
1110 retval = write(arg->fd,
1111 arg->buf, arg->bufsz);
1112 if (retval == -1)
1113 error = errno;
1114 } else {
1115 error = errno;
1116 }
1117 }
1118 sigoff(self); /* block SIGAIOCANCEL */
1119 break;
1120 #if !defined(_LP64)
1121 case AIOAREAD64:
1122 sigon(self); /* unblock SIGAIOCANCEL */
1123 retval = pread64(arg->fd, arg->buf,
1124 arg->bufsz, arg->offset);
1125 if (retval == -1) {
1126 if (errno == ESPIPE) {
1127 retval = read(arg->fd,
1128 arg->buf, arg->bufsz);
1129 if (retval == -1)
1130 error = errno;
1131 } else {
1132 error = errno;
1133 }
1134 }
1135 sigoff(self); /* block SIGAIOCANCEL */
1136 break;
1137 case AIOAWRITE64:
1138 /*
1139 * The SUSv3 POSIX spec for aio_write() states:
1140 * If O_APPEND is set for the file descriptor,
1141 * write operations append to the file in the
1142 * same order as the calls were made.
1143 * but, somewhat inconsistently, it requires pwrite()
1144 * to ignore the O_APPEND setting. So we have to use
1145 * fcntl() to get the open modes and call write() for
1146 * the O_APPEND case.
1147 */
1148 append = (__fcntl(arg->fd, F_GETFL) & O_APPEND);
1149 sigon(self); /* unblock SIGAIOCANCEL */
1150 retval = append?
1151 write(arg->fd, arg->buf, arg->bufsz) :
1152 pwrite64(arg->fd, arg->buf, arg->bufsz,
1153 arg->offset);
1154 if (retval == -1) {
1155 if (errno == ESPIPE) {
1156 retval = write(arg->fd,
1157 arg->buf, arg->bufsz);
1158 if (retval == -1)
1159 error = errno;
1160 } else {
1161 error = errno;
1162 }
1163 }
1164 sigoff(self); /* block SIGAIOCANCEL */
1165 break;
1166 #endif /* !defined(_LP64) */
1167 case AIOFSYNC:
1168 if (_aio_fsync_del(aiowp, reqp))
1169 goto top;
1170 ASSERT(reqp->req_head == NULL);
1171 /*
1172 * All writes for this fsync request are now
1173 * acknowledged. Now make these writes visible
1174 * and put the final request into the hash table.
1175 */
1176 if (reqp->req_state == AIO_REQ_CANCELED) {
1177 /* EMPTY */;
1178 } else if (arg->offset == O_SYNC) {
1179 if ((retval = __fdsync(arg->fd, FDSYNC_FILE)) ==
1180 -1) {
1181 error = errno;
1182 }
1183 } else {
1184 if ((retval = __fdsync(arg->fd, FDSYNC_DATA)) ==
1185 -1) {
1186 error = errno;
1187 }
1188 }
1189 if (_aio_hash_insert(reqp->req_resultp, reqp) != 0)
1190 aio_panic("_aio_do_request(): AIOFSYNC: "
1191 "request already in hash table");
1192 break;
1193 default:
1194 aio_panic("_aio_do_request, bad op");
1195 }
1196
1197 _aio_finish_request(aiowp, retval, error);
1198 }
1199 /* NOTREACHED */
1200 return (NULL);
1201 }
1202
1203 /*
1204 * Perform the tail processing for _aio_do_request().
1205 * The in-progress request may or may not have been cancelled.
1206 */
1207 static void
_aio_finish_request(aio_worker_t * aiowp,ssize_t retval,int error)1208 _aio_finish_request(aio_worker_t *aiowp, ssize_t retval, int error)
1209 {
1210 aio_req_t *reqp;
1211
1212 sig_mutex_lock(&aiowp->work_qlock1);
1213 if ((reqp = aiowp->work_req) == NULL)
1214 sig_mutex_unlock(&aiowp->work_qlock1);
1215 else {
1216 aiowp->work_req = NULL;
1217 if (reqp->req_state == AIO_REQ_CANCELED) {
1218 retval = -1;
1219 error = ECANCELED;
1220 }
1221 if (!POSIX_AIO(reqp)) {
1222 int notify;
1223 if (reqp->req_state == AIO_REQ_INPROGRESS) {
1224 reqp->req_state = AIO_REQ_DONE;
1225 _aio_set_result(reqp, retval, error);
1226 }
1227 sig_mutex_unlock(&aiowp->work_qlock1);
1228 sig_mutex_lock(&__aio_mutex);
1229 /*
1230 * If it was canceled, this request will not be
1231 * added to done list. Just free it.
1232 */
1233 if (error == ECANCELED) {
1234 _aio_outstand_cnt--;
1235 _aio_req_free(reqp);
1236 } else {
1237 _aio_req_done_cnt++;
1238 }
1239 /*
1240 * Notify any thread that may have blocked
1241 * because it saw an outstanding request.
1242 */
1243 notify = 0;
1244 if (_aio_outstand_cnt == 0 && _aiowait_flag) {
1245 notify = 1;
1246 }
1247 sig_mutex_unlock(&__aio_mutex);
1248 if (notify) {
1249 (void) _kaio(AIONOTIFY);
1250 }
1251 } else {
1252 if (reqp->req_state == AIO_REQ_INPROGRESS)
1253 reqp->req_state = AIO_REQ_DONE;
1254 sig_mutex_unlock(&aiowp->work_qlock1);
1255 _aiodone(reqp, retval, error);
1256 }
1257 }
1258 }
1259
1260 void
_aio_req_mark_done(aio_req_t * reqp)1261 _aio_req_mark_done(aio_req_t *reqp)
1262 {
1263 #if !defined(_LP64)
1264 if (reqp->req_largefile)
1265 ((aiocb64_t *)reqp->req_aiocbp)->aio_state = USERAIO_DONE;
1266 else
1267 #endif
1268 ((aiocb_t *)reqp->req_aiocbp)->aio_state = USERAIO_DONE;
1269 }
1270
1271 /*
1272 * Sleep for 'ticks' clock ticks to give somebody else a chance to run,
1273 * hopefully to consume one of our queued signals.
1274 */
1275 static void
_aio_delay(int ticks)1276 _aio_delay(int ticks)
1277 {
1278 (void) usleep(ticks * (MICROSEC / hz));
1279 }
1280
1281 /*
1282 * Actually send the notifications.
1283 * We could block indefinitely here if the application
1284 * is not listening for the signal or port notifications.
1285 */
1286 static void
send_notification(notif_param_t * npp)1287 send_notification(notif_param_t *npp)
1288 {
1289 extern int __sigqueue(pid_t pid, int signo,
1290 /* const union sigval */ void *value, int si_code, int block);
1291
1292 if (npp->np_signo)
1293 (void) __sigqueue(__pid, npp->np_signo, npp->np_user,
1294 SI_ASYNCIO, 1);
1295 else if (npp->np_port >= 0)
1296 (void) _port_dispatch(npp->np_port, 0, PORT_SOURCE_AIO,
1297 npp->np_event, npp->np_object, npp->np_user);
1298
1299 if (npp->np_lio_signo)
1300 (void) __sigqueue(__pid, npp->np_lio_signo, npp->np_lio_user,
1301 SI_ASYNCIO, 1);
1302 else if (npp->np_lio_port >= 0)
1303 (void) _port_dispatch(npp->np_lio_port, 0, PORT_SOURCE_AIO,
1304 npp->np_lio_event, npp->np_lio_object, npp->np_lio_user);
1305 }
1306
1307 /*
1308 * Asynchronous notification worker.
1309 */
1310 void *
_aio_do_notify(void * arg)1311 _aio_do_notify(void *arg)
1312 {
1313 aio_worker_t *aiowp = (aio_worker_t *)arg;
1314 aio_req_t *reqp;
1315
1316 /*
1317 * This isn't really necessary. All signals are blocked.
1318 */
1319 if (pthread_setspecific(_aio_key, aiowp) != 0)
1320 aio_panic("_aio_do_notify, pthread_setspecific()");
1321
1322 /*
1323 * Notifications are never cancelled.
1324 * All signals remain blocked, forever.
1325 */
1326 for (;;) {
1327 while ((reqp = _aio_req_get(aiowp)) == NULL) {
1328 if (_aio_idle(aiowp) != 0)
1329 aio_panic("_aio_do_notify: _aio_idle() failed");
1330 }
1331 send_notification(&reqp->req_notify);
1332 _aio_req_free(reqp);
1333 }
1334
1335 /* NOTREACHED */
1336 return (NULL);
1337 }
1338
1339 /*
1340 * Do the completion semantics for a request that was either canceled
1341 * by _aio_cancel_req() or was completed by _aio_do_request().
1342 */
1343 static void
_aiodone(aio_req_t * reqp,ssize_t retval,int error)1344 _aiodone(aio_req_t *reqp, ssize_t retval, int error)
1345 {
1346 aio_result_t *resultp = reqp->req_resultp;
1347 int notify = 0;
1348 aio_lio_t *head;
1349 int sigev_none;
1350 int sigev_signal;
1351 int sigev_thread;
1352 int sigev_port;
1353 notif_param_t np;
1354
1355 /*
1356 * We call _aiodone() only for Posix I/O.
1357 */
1358 ASSERT(POSIX_AIO(reqp));
1359
1360 sigev_none = 0;
1361 sigev_signal = 0;
1362 sigev_thread = 0;
1363 sigev_port = 0;
1364 np.np_signo = 0;
1365 np.np_port = -1;
1366 np.np_lio_signo = 0;
1367 np.np_lio_port = -1;
1368
1369 switch (reqp->req_sigevent.sigev_notify) {
1370 case SIGEV_NONE:
1371 sigev_none = 1;
1372 break;
1373 case SIGEV_SIGNAL:
1374 sigev_signal = 1;
1375 break;
1376 case SIGEV_THREAD:
1377 sigev_thread = 1;
1378 break;
1379 case SIGEV_PORT:
1380 sigev_port = 1;
1381 break;
1382 default:
1383 aio_panic("_aiodone: improper sigev_notify");
1384 break;
1385 }
1386
1387 /*
1388 * Figure out the notification parameters while holding __aio_mutex.
1389 * Actually perform the notifications after dropping __aio_mutex.
1390 * This allows us to sleep for a long time (if the notifications
1391 * incur delays) without impeding other async I/O operations.
1392 */
1393
1394 sig_mutex_lock(&__aio_mutex);
1395
1396 if (sigev_signal) {
1397 if ((np.np_signo = reqp->req_sigevent.sigev_signo) != 0)
1398 notify = 1;
1399 np.np_user = reqp->req_sigevent.sigev_value.sival_ptr;
1400 } else if (sigev_thread | sigev_port) {
1401 if ((np.np_port = reqp->req_sigevent.sigev_signo) >= 0)
1402 notify = 1;
1403 np.np_event = reqp->req_op;
1404 if (np.np_event == AIOFSYNC && reqp->req_largefile)
1405 np.np_event = AIOFSYNC64;
1406 np.np_object = (uintptr_t)reqp->req_aiocbp;
1407 np.np_user = reqp->req_sigevent.sigev_value.sival_ptr;
1408 }
1409
1410 if (resultp->aio_errno == EINPROGRESS)
1411 _aio_set_result(reqp, retval, error);
1412
1413 _aio_outstand_cnt--;
1414
1415 head = reqp->req_head;
1416 reqp->req_head = NULL;
1417
1418 if (sigev_none) {
1419 _aio_enq_doneq(reqp);
1420 reqp = NULL;
1421 } else {
1422 (void) _aio_hash_del(resultp);
1423 _aio_req_mark_done(reqp);
1424 }
1425
1426 _aio_waitn_wakeup();
1427
1428 /*
1429 * __aio_waitn() sets AIO_WAIT_INPROGRESS and
1430 * __aio_suspend() increments "_aio_kernel_suspend"
1431 * when they are waiting in the kernel for completed I/Os.
1432 *
1433 * _kaio(AIONOTIFY) awakes the corresponding function
1434 * in the kernel; then the corresponding __aio_waitn() or
1435 * __aio_suspend() function could reap the recently
1436 * completed I/Os (_aiodone()).
1437 */
1438 if ((_aio_flags & AIO_WAIT_INPROGRESS) || _aio_kernel_suspend > 0)
1439 (void) _kaio(AIONOTIFY);
1440
1441 sig_mutex_unlock(&__aio_mutex);
1442
1443 if (head != NULL) {
1444 /*
1445 * If all the lio requests have completed,
1446 * prepare to notify the waiting thread.
1447 */
1448 sig_mutex_lock(&head->lio_mutex);
1449 ASSERT(head->lio_refcnt == head->lio_nent);
1450 if (head->lio_refcnt == 1) {
1451 int waiting = 0;
1452 if (head->lio_mode == LIO_WAIT) {
1453 if ((waiting = head->lio_waiting) != 0)
1454 (void) cond_signal(&head->lio_cond_cv);
1455 } else if (head->lio_port < 0) { /* none or signal */
1456 if ((np.np_lio_signo = head->lio_signo) != 0)
1457 notify = 1;
1458 np.np_lio_user = head->lio_sigval.sival_ptr;
1459 } else { /* thread or port */
1460 notify = 1;
1461 np.np_lio_port = head->lio_port;
1462 np.np_lio_event = head->lio_event;
1463 np.np_lio_object =
1464 (uintptr_t)head->lio_sigevent;
1465 np.np_lio_user = head->lio_sigval.sival_ptr;
1466 }
1467 head->lio_nent = head->lio_refcnt = 0;
1468 sig_mutex_unlock(&head->lio_mutex);
1469 if (waiting == 0)
1470 _aio_lio_free(head);
1471 } else {
1472 head->lio_nent--;
1473 head->lio_refcnt--;
1474 sig_mutex_unlock(&head->lio_mutex);
1475 }
1476 }
1477
1478 /*
1479 * The request is completed; now perform the notifications.
1480 */
1481 if (notify) {
1482 if (reqp != NULL) {
1483 /*
1484 * We usually put the request on the notification
1485 * queue because we don't want to block and delay
1486 * other operations behind us in the work queue.
1487 * Also we must never block on a cancel notification
1488 * because we are being called from an application
1489 * thread in this case and that could lead to deadlock
1490 * if no other thread is receiving notificatins.
1491 */
1492 reqp->req_notify = np;
1493 reqp->req_op = AIONOTIFY;
1494 _aio_req_add(reqp, &__workers_no, AIONOTIFY);
1495 reqp = NULL;
1496 } else {
1497 /*
1498 * We already put the request on the done queue,
1499 * so we can't queue it to the notification queue.
1500 * Just do the notification directly.
1501 */
1502 send_notification(&np);
1503 }
1504 }
1505
1506 if (reqp != NULL)
1507 _aio_req_free(reqp);
1508 }
1509
1510 /*
1511 * Delete fsync requests from list head until there is
1512 * only one left. Return 0 when there is only one,
1513 * otherwise return a non-zero value.
1514 */
1515 static int
_aio_fsync_del(aio_worker_t * aiowp,aio_req_t * reqp)1516 _aio_fsync_del(aio_worker_t *aiowp, aio_req_t *reqp)
1517 {
1518 aio_lio_t *head = reqp->req_head;
1519 int rval = 0;
1520
1521 ASSERT(reqp == aiowp->work_req);
1522 sig_mutex_lock(&aiowp->work_qlock1);
1523 sig_mutex_lock(&head->lio_mutex);
1524 if (head->lio_refcnt > 1) {
1525 head->lio_refcnt--;
1526 head->lio_nent--;
1527 aiowp->work_req = NULL;
1528 sig_mutex_unlock(&head->lio_mutex);
1529 sig_mutex_unlock(&aiowp->work_qlock1);
1530 sig_mutex_lock(&__aio_mutex);
1531 _aio_outstand_cnt--;
1532 _aio_waitn_wakeup();
1533 sig_mutex_unlock(&__aio_mutex);
1534 _aio_req_free(reqp);
1535 return (1);
1536 }
1537 ASSERT(head->lio_nent == 1 && head->lio_refcnt == 1);
1538 reqp->req_head = NULL;
1539 if (head->lio_canned)
1540 reqp->req_state = AIO_REQ_CANCELED;
1541 if (head->lio_mode == LIO_DESTROY) {
1542 aiowp->work_req = NULL;
1543 rval = 1;
1544 }
1545 sig_mutex_unlock(&head->lio_mutex);
1546 sig_mutex_unlock(&aiowp->work_qlock1);
1547 head->lio_refcnt--;
1548 head->lio_nent--;
1549 _aio_lio_free(head);
1550 if (rval != 0)
1551 _aio_req_free(reqp);
1552 return (rval);
1553 }
1554
1555 /*
1556 * A worker is set idle when its work queue is empty.
1557 * The worker checks again that it has no more work
1558 * and then goes to sleep waiting for more work.
1559 */
1560 int
_aio_idle(aio_worker_t * aiowp)1561 _aio_idle(aio_worker_t *aiowp)
1562 {
1563 int error = 0;
1564
1565 sig_mutex_lock(&aiowp->work_qlock1);
1566 if (aiowp->work_count1 == 0) {
1567 ASSERT(aiowp->work_minload1 == 0);
1568 aiowp->work_idleflg = 1;
1569 /*
1570 * A cancellation handler is not needed here.
1571 * aio worker threads are never cancelled via pthread_cancel().
1572 */
1573 error = sig_cond_wait(&aiowp->work_idle_cv,
1574 &aiowp->work_qlock1);
1575 /*
1576 * The idle flag is normally cleared before worker is awakened
1577 * by aio_req_add(). On error (EINTR), we clear it ourself.
1578 */
1579 if (error)
1580 aiowp->work_idleflg = 0;
1581 }
1582 sig_mutex_unlock(&aiowp->work_qlock1);
1583 return (error);
1584 }
1585
1586 /*
1587 * A worker's completed AIO requests are placed onto a global
1588 * done queue. The application is only sent a SIGIO signal if
1589 * the process has a handler enabled and it is not waiting via
1590 * aiowait().
1591 */
1592 static void
_aio_work_done(aio_worker_t * aiowp)1593 _aio_work_done(aio_worker_t *aiowp)
1594 {
1595 aio_req_t *reqp;
1596
1597 sig_mutex_lock(&__aio_mutex);
1598 sig_mutex_lock(&aiowp->work_qlock1);
1599 reqp = aiowp->work_prev1;
1600 reqp->req_next = NULL;
1601 aiowp->work_done1 = 0;
1602 aiowp->work_tail1 = aiowp->work_next1;
1603 if (aiowp->work_tail1 == NULL)
1604 aiowp->work_head1 = NULL;
1605 aiowp->work_prev1 = NULL;
1606 _aio_outstand_cnt--;
1607 _aio_req_done_cnt--;
1608 if (reqp->req_state == AIO_REQ_CANCELED) {
1609 /*
1610 * Request got cancelled after it was marked done. This can
1611 * happen because _aio_finish_request() marks it AIO_REQ_DONE
1612 * and drops all locks. Don't add the request to the done
1613 * queue and just discard it.
1614 */
1615 sig_mutex_unlock(&aiowp->work_qlock1);
1616 _aio_req_free(reqp);
1617 if (_aio_outstand_cnt == 0 && _aiowait_flag) {
1618 sig_mutex_unlock(&__aio_mutex);
1619 (void) _kaio(AIONOTIFY);
1620 } else {
1621 sig_mutex_unlock(&__aio_mutex);
1622 }
1623 return;
1624 }
1625 sig_mutex_unlock(&aiowp->work_qlock1);
1626 _aio_donecnt++;
1627 ASSERT(_aio_donecnt > 0 &&
1628 _aio_outstand_cnt >= 0 &&
1629 _aio_req_done_cnt >= 0);
1630 ASSERT(reqp != NULL);
1631
1632 if (_aio_done_tail == NULL) {
1633 _aio_done_head = _aio_done_tail = reqp;
1634 } else {
1635 _aio_done_head->req_next = reqp;
1636 _aio_done_head = reqp;
1637 }
1638
1639 if (_aiowait_flag) {
1640 sig_mutex_unlock(&__aio_mutex);
1641 (void) _kaio(AIONOTIFY);
1642 } else {
1643 sig_mutex_unlock(&__aio_mutex);
1644 if (_sigio_enabled)
1645 (void) kill(__pid, SIGIO);
1646 }
1647 }
1648
1649 /*
1650 * The done queue consists of AIO requests that are in either the
1651 * AIO_REQ_DONE or AIO_REQ_CANCELED state. Requests that were cancelled
1652 * are discarded. If the done queue is empty then NULL is returned.
1653 * Otherwise the address of a done aio_result_t is returned.
1654 */
1655 aio_result_t *
_aio_req_done(void)1656 _aio_req_done(void)
1657 {
1658 aio_req_t *reqp;
1659 aio_result_t *resultp;
1660
1661 ASSERT(MUTEX_HELD(&__aio_mutex));
1662
1663 if ((reqp = _aio_done_tail) != NULL) {
1664 if ((_aio_done_tail = reqp->req_next) == NULL)
1665 _aio_done_head = NULL;
1666 ASSERT(_aio_donecnt > 0);
1667 _aio_donecnt--;
1668 (void) _aio_hash_del(reqp->req_resultp);
1669 resultp = reqp->req_resultp;
1670 ASSERT(reqp->req_state == AIO_REQ_DONE);
1671 _aio_req_free(reqp);
1672 return (resultp);
1673 }
1674 /* is queue empty? */
1675 if (reqp == NULL && _aio_outstand_cnt == 0) {
1676 return ((aio_result_t *)-1);
1677 }
1678 return (NULL);
1679 }
1680
1681 /*
1682 * Set the return and errno values for the application's use.
1683 *
1684 * For the Posix interfaces, we must set the return value first followed
1685 * by the errno value because the Posix interfaces allow for a change
1686 * in the errno value from EINPROGRESS to something else to signal
1687 * the completion of the asynchronous request.
1688 *
1689 * The opposite is true for the Solaris interfaces. These allow for
1690 * a change in the return value from AIO_INPROGRESS to something else
1691 * to signal the completion of the asynchronous request.
1692 */
1693 void
_aio_set_result(aio_req_t * reqp,ssize_t retval,int error)1694 _aio_set_result(aio_req_t *reqp, ssize_t retval, int error)
1695 {
1696 aio_result_t *resultp = reqp->req_resultp;
1697
1698 if (POSIX_AIO(reqp)) {
1699 resultp->aio_return = retval;
1700 membar_producer();
1701 resultp->aio_errno = error;
1702 } else {
1703 resultp->aio_errno = error;
1704 membar_producer();
1705 resultp->aio_return = retval;
1706 }
1707 }
1708
1709 /*
1710 * Add an AIO request onto the next work queue.
1711 * A circular list of workers is used to choose the next worker.
1712 */
1713 void
_aio_req_add(aio_req_t * reqp,aio_worker_t ** nextworker,int mode)1714 _aio_req_add(aio_req_t *reqp, aio_worker_t **nextworker, int mode)
1715 {
1716 ulwp_t *self = curthread;
1717 aio_worker_t *aiowp;
1718 aio_worker_t *first;
1719 int load_bal_flg = 1;
1720 int found;
1721
1722 ASSERT(reqp->req_state != AIO_REQ_DONEQ);
1723 reqp->req_next = NULL;
1724 /*
1725 * Try to acquire the next worker's work queue. If it is locked,
1726 * then search the list of workers until a queue is found unlocked,
1727 * or until the list is completely traversed at which point another
1728 * worker will be created.
1729 */
1730 sigoff(self); /* defer SIGIO */
1731 sig_mutex_lock(&__aio_mutex);
1732 first = aiowp = *nextworker;
1733 if (mode != AIONOTIFY)
1734 _aio_outstand_cnt++;
1735 sig_mutex_unlock(&__aio_mutex);
1736
1737 switch (mode) {
1738 case AIOREAD:
1739 case AIOWRITE:
1740 case AIOAREAD:
1741 case AIOAWRITE:
1742 #if !defined(_LP64)
1743 case AIOAREAD64:
1744 case AIOAWRITE64:
1745 #endif
1746 /* try to find an idle worker */
1747 found = 0;
1748 do {
1749 if (sig_mutex_trylock(&aiowp->work_qlock1) == 0) {
1750 if (aiowp->work_idleflg) {
1751 found = 1;
1752 break;
1753 }
1754 sig_mutex_unlock(&aiowp->work_qlock1);
1755 }
1756 } while ((aiowp = aiowp->work_forw) != first);
1757
1758 if (found) {
1759 aiowp->work_minload1++;
1760 break;
1761 }
1762
1763 /* try to acquire some worker's queue lock */
1764 do {
1765 if (sig_mutex_trylock(&aiowp->work_qlock1) == 0) {
1766 found = 1;
1767 break;
1768 }
1769 } while ((aiowp = aiowp->work_forw) != first);
1770
1771 /*
1772 * Create more workers when the workers appear overloaded.
1773 * Either all the workers are busy draining their queues
1774 * or no worker's queue lock could be acquired.
1775 */
1776 if (!found) {
1777 if (_aio_worker_cnt < _max_workers) {
1778 if (_aio_create_worker(reqp, mode))
1779 aio_panic("_aio_req_add: add worker");
1780 sigon(self); /* reenable SIGIO */
1781 return;
1782 }
1783
1784 /*
1785 * No worker available and we have created
1786 * _max_workers, keep going through the
1787 * list slowly until we get a lock
1788 */
1789 while (sig_mutex_trylock(&aiowp->work_qlock1) != 0) {
1790 /*
1791 * give someone else a chance
1792 */
1793 _aio_delay(1);
1794 aiowp = aiowp->work_forw;
1795 }
1796 }
1797
1798 ASSERT(MUTEX_HELD(&aiowp->work_qlock1));
1799 if (_aio_worker_cnt < _max_workers &&
1800 aiowp->work_minload1 >= _minworkload) {
1801 sig_mutex_unlock(&aiowp->work_qlock1);
1802 sig_mutex_lock(&__aio_mutex);
1803 *nextworker = aiowp->work_forw;
1804 sig_mutex_unlock(&__aio_mutex);
1805 if (_aio_create_worker(reqp, mode))
1806 aio_panic("aio_req_add: add worker");
1807 sigon(self); /* reenable SIGIO */
1808 return;
1809 }
1810 aiowp->work_minload1++;
1811 break;
1812 case AIOFSYNC:
1813 case AIONOTIFY:
1814 load_bal_flg = 0;
1815 sig_mutex_lock(&aiowp->work_qlock1);
1816 break;
1817 default:
1818 aio_panic("_aio_req_add: invalid mode");
1819 break;
1820 }
1821 /*
1822 * Put request onto worker's work queue.
1823 */
1824 if (aiowp->work_tail1 == NULL) {
1825 ASSERT(aiowp->work_count1 == 0);
1826 aiowp->work_tail1 = reqp;
1827 aiowp->work_next1 = reqp;
1828 } else {
1829 aiowp->work_head1->req_next = reqp;
1830 if (aiowp->work_next1 == NULL)
1831 aiowp->work_next1 = reqp;
1832 }
1833 reqp->req_state = AIO_REQ_QUEUED;
1834 reqp->req_worker = aiowp;
1835 aiowp->work_head1 = reqp;
1836 /*
1837 * Awaken worker if it is not currently active.
1838 */
1839 if (aiowp->work_count1++ == 0 && aiowp->work_idleflg) {
1840 aiowp->work_idleflg = 0;
1841 (void) cond_signal(&aiowp->work_idle_cv);
1842 }
1843 sig_mutex_unlock(&aiowp->work_qlock1);
1844
1845 if (load_bal_flg) {
1846 sig_mutex_lock(&__aio_mutex);
1847 *nextworker = aiowp->work_forw;
1848 sig_mutex_unlock(&__aio_mutex);
1849 }
1850 sigon(self); /* reenable SIGIO */
1851 }
1852
1853 /*
1854 * Get an AIO request for a specified worker.
1855 * If the work queue is empty, return NULL.
1856 */
1857 aio_req_t *
_aio_req_get(aio_worker_t * aiowp)1858 _aio_req_get(aio_worker_t *aiowp)
1859 {
1860 aio_req_t *reqp;
1861
1862 sig_mutex_lock(&aiowp->work_qlock1);
1863 if ((reqp = aiowp->work_next1) != NULL) {
1864 /*
1865 * Remove a POSIX request from the queue; the
1866 * request queue is a singularly linked list
1867 * with a previous pointer. The request is
1868 * removed by updating the previous pointer.
1869 *
1870 * Non-posix requests are left on the queue
1871 * to eventually be placed on the done queue.
1872 */
1873
1874 if (POSIX_AIO(reqp)) {
1875 if (aiowp->work_prev1 == NULL) {
1876 aiowp->work_tail1 = reqp->req_next;
1877 if (aiowp->work_tail1 == NULL)
1878 aiowp->work_head1 = NULL;
1879 } else {
1880 aiowp->work_prev1->req_next = reqp->req_next;
1881 if (aiowp->work_head1 == reqp)
1882 aiowp->work_head1 = reqp->req_next;
1883 }
1884
1885 } else {
1886 aiowp->work_prev1 = reqp;
1887 ASSERT(aiowp->work_done1 >= 0);
1888 aiowp->work_done1++;
1889 }
1890 ASSERT(reqp != reqp->req_next);
1891 aiowp->work_next1 = reqp->req_next;
1892 ASSERT(aiowp->work_count1 >= 1);
1893 aiowp->work_count1--;
1894 switch (reqp->req_op) {
1895 case AIOREAD:
1896 case AIOWRITE:
1897 case AIOAREAD:
1898 case AIOAWRITE:
1899 #if !defined(_LP64)
1900 case AIOAREAD64:
1901 case AIOAWRITE64:
1902 #endif
1903 ASSERT(aiowp->work_minload1 > 0);
1904 aiowp->work_minload1--;
1905 break;
1906 }
1907 reqp->req_state = AIO_REQ_INPROGRESS;
1908 }
1909 aiowp->work_req = reqp;
1910 ASSERT(reqp != NULL || aiowp->work_count1 == 0);
1911 sig_mutex_unlock(&aiowp->work_qlock1);
1912 return (reqp);
1913 }
1914
1915 static void
_aio_req_del(aio_worker_t * aiowp,aio_req_t * reqp,int ostate)1916 _aio_req_del(aio_worker_t *aiowp, aio_req_t *reqp, int ostate)
1917 {
1918 aio_req_t **last;
1919 aio_req_t *lastrp;
1920 aio_req_t *next;
1921
1922 ASSERT(aiowp != NULL);
1923 ASSERT(MUTEX_HELD(&aiowp->work_qlock1));
1924 if (POSIX_AIO(reqp)) {
1925 if (ostate != AIO_REQ_QUEUED)
1926 return;
1927 }
1928 last = &aiowp->work_tail1;
1929 lastrp = aiowp->work_tail1;
1930 ASSERT(ostate == AIO_REQ_QUEUED || ostate == AIO_REQ_INPROGRESS);
1931 while ((next = *last) != NULL) {
1932 if (next == reqp) {
1933 *last = next->req_next;
1934 if (aiowp->work_next1 == next)
1935 aiowp->work_next1 = next->req_next;
1936
1937 /*
1938 * if this is the first request on the queue, move
1939 * the lastrp pointer forward.
1940 */
1941 if (lastrp == next)
1942 lastrp = next->req_next;
1943
1944 /*
1945 * if this request is pointed by work_head1, then
1946 * make work_head1 point to the last request that is
1947 * present on the queue.
1948 */
1949 if (aiowp->work_head1 == next)
1950 aiowp->work_head1 = lastrp;
1951
1952 /*
1953 * work_prev1 is used only in non posix case and it
1954 * points to the current AIO_REQ_INPROGRESS request.
1955 * If work_prev1 points to this request which is being
1956 * deleted, make work_prev1 NULL and set work_done1
1957 * to 0.
1958 *
1959 * A worker thread can be processing only one request
1960 * at a time.
1961 */
1962 if (aiowp->work_prev1 == next) {
1963 ASSERT(ostate == AIO_REQ_INPROGRESS &&
1964 !POSIX_AIO(reqp) && aiowp->work_done1 > 0);
1965 aiowp->work_prev1 = NULL;
1966 aiowp->work_done1--;
1967 }
1968
1969 if (ostate == AIO_REQ_QUEUED) {
1970 ASSERT(aiowp->work_count1 >= 1);
1971 aiowp->work_count1--;
1972 ASSERT(aiowp->work_minload1 >= 1);
1973 aiowp->work_minload1--;
1974 }
1975 return;
1976 }
1977 last = &next->req_next;
1978 lastrp = next;
1979 }
1980 /* NOTREACHED */
1981 }
1982
1983 static void
_aio_enq_doneq(aio_req_t * reqp)1984 _aio_enq_doneq(aio_req_t *reqp)
1985 {
1986 if (_aio_doneq == NULL) {
1987 _aio_doneq = reqp;
1988 reqp->req_next = reqp->req_prev = reqp;
1989 } else {
1990 reqp->req_next = _aio_doneq;
1991 reqp->req_prev = _aio_doneq->req_prev;
1992 _aio_doneq->req_prev->req_next = reqp;
1993 _aio_doneq->req_prev = reqp;
1994 }
1995 reqp->req_state = AIO_REQ_DONEQ;
1996 _aio_doneq_cnt++;
1997 }
1998
1999 /*
2000 * caller owns the _aio_mutex
2001 */
2002 aio_req_t *
_aio_req_remove(aio_req_t * reqp)2003 _aio_req_remove(aio_req_t *reqp)
2004 {
2005 if (reqp && reqp->req_state != AIO_REQ_DONEQ)
2006 return (NULL);
2007
2008 if (reqp) {
2009 /* request in done queue */
2010 if (_aio_doneq == reqp)
2011 _aio_doneq = reqp->req_next;
2012 if (_aio_doneq == reqp) {
2013 /* only one request on queue */
2014 _aio_doneq = NULL;
2015 } else {
2016 aio_req_t *tmp = reqp->req_next;
2017 reqp->req_prev->req_next = tmp;
2018 tmp->req_prev = reqp->req_prev;
2019 }
2020 } else if ((reqp = _aio_doneq) != NULL) {
2021 if (reqp == reqp->req_next) {
2022 /* only one request on queue */
2023 _aio_doneq = NULL;
2024 } else {
2025 reqp->req_prev->req_next = _aio_doneq = reqp->req_next;
2026 _aio_doneq->req_prev = reqp->req_prev;
2027 }
2028 }
2029 if (reqp) {
2030 _aio_doneq_cnt--;
2031 reqp->req_next = reqp->req_prev = reqp;
2032 reqp->req_state = AIO_REQ_DONE;
2033 }
2034 return (reqp);
2035 }
2036
2037 /*
2038 * An AIO request is identified by an aio_result_t pointer. The library
2039 * maps this aio_result_t pointer to its internal representation using a
2040 * hash table. This function adds an aio_result_t pointer to the hash table.
2041 */
2042 static int
_aio_hash_insert(aio_result_t * resultp,aio_req_t * reqp)2043 _aio_hash_insert(aio_result_t *resultp, aio_req_t *reqp)
2044 {
2045 aio_hash_t *hashp;
2046 aio_req_t **prev;
2047 aio_req_t *next;
2048
2049 hashp = _aio_hash + AIOHASH(resultp);
2050 lmutex_lock(&hashp->hash_lock);
2051 prev = &hashp->hash_ptr;
2052 while ((next = *prev) != NULL) {
2053 if (resultp == next->req_resultp) {
2054 lmutex_unlock(&hashp->hash_lock);
2055 return (-1);
2056 }
2057 prev = &next->req_link;
2058 }
2059 *prev = reqp;
2060 ASSERT(reqp->req_link == NULL);
2061 lmutex_unlock(&hashp->hash_lock);
2062 return (0);
2063 }
2064
2065 /*
2066 * Remove an entry from the hash table.
2067 */
2068 aio_req_t *
_aio_hash_del(aio_result_t * resultp)2069 _aio_hash_del(aio_result_t *resultp)
2070 {
2071 aio_hash_t *hashp;
2072 aio_req_t **prev;
2073 aio_req_t *next = NULL;
2074
2075 if (_aio_hash != NULL) {
2076 hashp = _aio_hash + AIOHASH(resultp);
2077 lmutex_lock(&hashp->hash_lock);
2078 prev = &hashp->hash_ptr;
2079 while ((next = *prev) != NULL) {
2080 if (resultp == next->req_resultp) {
2081 *prev = next->req_link;
2082 next->req_link = NULL;
2083 break;
2084 }
2085 prev = &next->req_link;
2086 }
2087 lmutex_unlock(&hashp->hash_lock);
2088 }
2089 return (next);
2090 }
2091
2092 /*
2093 * find an entry in the hash table
2094 */
2095 aio_req_t *
_aio_hash_find(aio_result_t * resultp)2096 _aio_hash_find(aio_result_t *resultp)
2097 {
2098 aio_hash_t *hashp;
2099 aio_req_t **prev;
2100 aio_req_t *next = NULL;
2101
2102 if (_aio_hash != NULL) {
2103 hashp = _aio_hash + AIOHASH(resultp);
2104 lmutex_lock(&hashp->hash_lock);
2105 prev = &hashp->hash_ptr;
2106 while ((next = *prev) != NULL) {
2107 if (resultp == next->req_resultp)
2108 break;
2109 prev = &next->req_link;
2110 }
2111 lmutex_unlock(&hashp->hash_lock);
2112 }
2113 return (next);
2114 }
2115
2116 /*
2117 * AIO interface for POSIX
2118 */
2119 int
_aio_rw(aiocb_t * aiocbp,aio_lio_t * lio_head,aio_worker_t ** nextworker,int mode,int flg)2120 _aio_rw(aiocb_t *aiocbp, aio_lio_t *lio_head, aio_worker_t **nextworker,
2121 int mode, int flg)
2122 {
2123 aio_req_t *reqp;
2124 aio_args_t *ap;
2125 int kerr;
2126
2127 if (aiocbp == NULL) {
2128 errno = EINVAL;
2129 return (-1);
2130 }
2131
2132 /* initialize kaio */
2133 if (!_kaio_ok)
2134 _kaio_init();
2135
2136 aiocbp->aio_state = NOCHECK;
2137
2138 /*
2139 * If we have been called because a list I/O
2140 * kaio() failed, we dont want to repeat the
2141 * system call
2142 */
2143
2144 if (flg & AIO_KAIO) {
2145 /*
2146 * Try kernel aio first.
2147 * If errno is ENOTSUP/EBADFD,
2148 * fall back to the thread implementation.
2149 */
2150 if (_kaio_ok > 0 && KAIO_SUPPORTED(aiocbp->aio_fildes)) {
2151 aiocbp->aio_resultp.aio_errno = EINPROGRESS;
2152 aiocbp->aio_state = CHECK;
2153 kerr = (int)_kaio(mode, aiocbp);
2154 if (kerr == 0)
2155 return (0);
2156 if (errno != ENOTSUP && errno != EBADFD) {
2157 aiocbp->aio_resultp.aio_errno = errno;
2158 aiocbp->aio_resultp.aio_return = -1;
2159 aiocbp->aio_state = NOCHECK;
2160 return (-1);
2161 }
2162 if (errno == EBADFD)
2163 SET_KAIO_NOT_SUPPORTED(aiocbp->aio_fildes);
2164 }
2165 }
2166
2167 aiocbp->aio_resultp.aio_errno = EINPROGRESS;
2168 aiocbp->aio_state = USERAIO;
2169
2170 if (!__uaio_ok && __uaio_init() == -1)
2171 return (-1);
2172
2173 if ((reqp = _aio_req_alloc()) == NULL) {
2174 errno = EAGAIN;
2175 return (-1);
2176 }
2177
2178 /*
2179 * If an LIO request, add the list head to the aio request
2180 */
2181 reqp->req_head = lio_head;
2182 reqp->req_type = AIO_POSIX_REQ;
2183 reqp->req_op = mode;
2184 reqp->req_largefile = 0;
2185
2186 if (aiocbp->aio_sigevent.sigev_notify == SIGEV_NONE) {
2187 reqp->req_sigevent.sigev_notify = SIGEV_NONE;
2188 } else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_SIGNAL) {
2189 reqp->req_sigevent.sigev_notify = SIGEV_SIGNAL;
2190 reqp->req_sigevent.sigev_signo =
2191 aiocbp->aio_sigevent.sigev_signo;
2192 reqp->req_sigevent.sigev_value.sival_ptr =
2193 aiocbp->aio_sigevent.sigev_value.sival_ptr;
2194 } else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_PORT) {
2195 port_notify_t *pn = aiocbp->aio_sigevent.sigev_value.sival_ptr;
2196 reqp->req_sigevent.sigev_notify = SIGEV_PORT;
2197 /*
2198 * Reuse the sigevent structure to contain the port number
2199 * and the user value. Same for SIGEV_THREAD, below.
2200 */
2201 reqp->req_sigevent.sigev_signo =
2202 pn->portnfy_port;
2203 reqp->req_sigevent.sigev_value.sival_ptr =
2204 pn->portnfy_user;
2205 } else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_THREAD) {
2206 reqp->req_sigevent.sigev_notify = SIGEV_THREAD;
2207 /*
2208 * The sigevent structure contains the port number
2209 * and the user value. Same for SIGEV_PORT, above.
2210 */
2211 reqp->req_sigevent.sigev_signo =
2212 aiocbp->aio_sigevent.sigev_signo;
2213 reqp->req_sigevent.sigev_value.sival_ptr =
2214 aiocbp->aio_sigevent.sigev_value.sival_ptr;
2215 }
2216
2217 reqp->req_resultp = &aiocbp->aio_resultp;
2218 reqp->req_aiocbp = aiocbp;
2219 ap = &reqp->req_args;
2220 ap->fd = aiocbp->aio_fildes;
2221 ap->buf = (caddr_t)aiocbp->aio_buf;
2222 ap->bufsz = aiocbp->aio_nbytes;
2223 ap->offset = aiocbp->aio_offset;
2224
2225 if ((flg & AIO_NO_DUPS) &&
2226 _aio_hash_insert(&aiocbp->aio_resultp, reqp) != 0) {
2227 aio_panic("_aio_rw(): request already in hash table");
2228 _aio_req_free(reqp);
2229 errno = EINVAL;
2230 return (-1);
2231 }
2232 _aio_req_add(reqp, nextworker, mode);
2233 return (0);
2234 }
2235
#if !defined(_LP64)
/*
 * 64-bit AIO interface for POSIX
 *
 * Large-file counterpart of _aio_rw(): it takes an aiocb64_t and marks
 * the resulting request with req_largefile = 1.  When AIO_KAIO is set
 * in flg and kernel aio is usable for the descriptor, _kaio() is tried
 * first; an ENOTSUP or EBADFD failure from it falls through to the
 * thread implementation, where the request is packaged in an aio_req_t
 * and handed to _aio_req_add().
 *
 * Returns 0 once the request has been accepted by either path.
 * Returns -1 with errno set to EINVAL for a NULL aiocbp or a duplicate
 * request under AIO_NO_DUPS, to EAGAIN when no aio_req_t could be
 * allocated, or left as _kaio() set it for any other kaio failure.
 * Also returns -1 when __uaio_init() fails (errno is then presumably
 * set by __uaio_init(); it is not touched here).
 */
int
_aio_rw64(aiocb64_t *aiocbp, aio_lio_t *lio_head, aio_worker_t **nextworker,
    int mode, int flg)
{
	aio_req_t *reqp;
	aio_args_t *args;
	port_notify_t *pn;
	int err;

	if (aiocbp == NULL) {
		errno = EINVAL;
		return (-1);
	}

	/* initialize kaio */
	if (_kaio_ok == 0)
		_kaio_init();

	aiocbp->aio_state = NOCHECK;

	/*
	 * Try kernel aio first, unless the caller cleared AIO_KAIO
	 * because a list I/O kaio() already failed, in which case we
	 * don't want to repeat the system call.
	 */
	if ((flg & AIO_KAIO) != 0 && _kaio_ok > 0 &&
	    KAIO_SUPPORTED(aiocbp->aio_fildes)) {
		aiocbp->aio_resultp.aio_errno = EINPROGRESS;
		aiocbp->aio_state = CHECK;
		if ((int)_kaio(mode, aiocbp) == 0)
			return (0);

		/*
		 * Only ENOTSUP and EBADFD fall back to the thread
		 * implementation; anything else is reported as is.
		 */
		err = errno;
		if (err == EBADFD) {
			SET_KAIO_NOT_SUPPORTED(aiocbp->aio_fildes);
		} else if (err != ENOTSUP) {
			aiocbp->aio_resultp.aio_errno = err;
			aiocbp->aio_resultp.aio_return = -1;
			aiocbp->aio_state = NOCHECK;
			return (-1);
		}
	}

	/* From here on the request is serviced by the thread implementation. */
	aiocbp->aio_resultp.aio_errno = EINPROGRESS;
	aiocbp->aio_state = USERAIO;

	if (!__uaio_ok && __uaio_init() == -1)
		return (-1);

	reqp = _aio_req_alloc();
	if (reqp == NULL) {
		errno = EAGAIN;
		return (-1);
	}

	/*
	 * If an LIO request, add the list head to the aio request
	 */
	reqp->req_head = lio_head;
	reqp->req_type = AIO_POSIX_REQ;
	reqp->req_op = mode;
	reqp->req_largefile = 1;

	/*
	 * Record how completion is to be notified.  Any sigev_notify
	 * value not listed below leaves req_sigevent untouched.
	 */
	switch (aiocbp->aio_sigevent.sigev_notify) {
	case SIGEV_NONE:
		reqp->req_sigevent.sigev_notify = SIGEV_NONE;
		break;
	case SIGEV_SIGNAL:
		reqp->req_sigevent.sigev_notify = SIGEV_SIGNAL;
		reqp->req_sigevent.sigev_signo =
		    aiocbp->aio_sigevent.sigev_signo;
		reqp->req_sigevent.sigev_value.sival_ptr =
		    aiocbp->aio_sigevent.sigev_value.sival_ptr;
		break;
	case SIGEV_PORT:
		pn = aiocbp->aio_sigevent.sigev_value.sival_ptr;
		reqp->req_sigevent.sigev_notify = SIGEV_PORT;
		/*
		 * The sigevent structure is reused to carry the port
		 * number and the user value taken from the port_notify_t.
		 */
		reqp->req_sigevent.sigev_signo = pn->portnfy_port;
		reqp->req_sigevent.sigev_value.sival_ptr = pn->portnfy_user;
		break;
	case SIGEV_THREAD:
		reqp->req_sigevent.sigev_notify = SIGEV_THREAD;
		reqp->req_sigevent.sigev_signo =
		    aiocbp->aio_sigevent.sigev_signo;
		reqp->req_sigevent.sigev_value.sival_ptr =
		    aiocbp->aio_sigevent.sigev_value.sival_ptr;
		break;
	default:
		break;
	}

	/* Tie the request to the caller's control block and copy the I/O args. */
	reqp->req_resultp = &aiocbp->aio_resultp;
	reqp->req_aiocbp = aiocbp;
	args = &reqp->req_args;
	args->fd = aiocbp->aio_fildes;
	args->buf = (caddr_t)aiocbp->aio_buf;
	args->bufsz = aiocbp->aio_nbytes;
	args->offset = aiocbp->aio_offset;

	/*
	 * Queue the request unless AIO_NO_DUPS is in effect and the
	 * result address is already present in the hash table.
	 */
	if ((flg & AIO_NO_DUPS) == 0 ||
	    _aio_hash_insert(&aiocbp->aio_resultp, reqp) == 0) {
		_aio_req_add(reqp, nextworker, mode);
		return (0);
	}

	aio_panic("_aio_rw64(): request already in hash table");
	_aio_req_free(reqp);
	errno = EINVAL;
	return (-1);
}
#endif	/* !defined(_LP64) */
2349