author | raf <none@none> | 2006-06-20 19:21:46 -0700 |
---|---|---|
committer | raf <none@none> | 2006-06-20 19:21:46 -0700 |
commit | f841f6ad96ea6675d6c6b35c749eaac601799fdf | |
tree | 698db5d44fc99bd070613c93a497f5e93e803bc5 /usr/src/lib/libc/port/aio/aio.c | |
parent | 545e5dad7996785cfebf226c5ef410c1b154740f | |
download | illumos-gate-f841f6ad96ea6675d6c6b35c749eaac601799fdf.tar.gz | |
6416832 libaio and librt can and should be folded into libc
--HG--
rename : usr/src/cmd/perl/5.8.4/distrib/ext/Time/HiRes/hints/solaris.pl => deleted_files/usr/src/cmd/perl/5.8.4/distrib/ext/Time/HiRes/hints/solaris.pl
rename : usr/src/lib/libaio/common/Makefile => deleted_files/usr/src/lib/libaio/common/Makefile
rename : usr/src/lib/libaio/common/scalls.c => deleted_files/usr/src/lib/libaio/common/scalls.c
rename : usr/src/lib/libaio/common/sig.c => deleted_files/usr/src/lib/libaio/common/sig.c
rename : usr/src/lib/libaio/common/subr.c => deleted_files/usr/src/lib/libaio/common/subr.c
rename : usr/src/lib/libaio/spec/Makefile => deleted_files/usr/src/lib/libaio/spec/Makefile
rename : usr/src/lib/libaio/spec/Makefile.targ => deleted_files/usr/src/lib/libaio/spec/Makefile.targ
rename : usr/src/lib/libaio/spec/amd64/Makefile => deleted_files/usr/src/lib/libaio/spec/amd64/Makefile
rename : usr/src/lib/libaio/spec/i386/Makefile => deleted_files/usr/src/lib/libaio/spec/i386/Makefile
rename : usr/src/lib/libaio/spec/sparc/Makefile => deleted_files/usr/src/lib/libaio/spec/sparc/Makefile
rename : usr/src/lib/libaio/spec/sparcv9/Makefile => deleted_files/usr/src/lib/libaio/spec/sparcv9/Makefile
rename : usr/src/lib/libaio/spec/versions => deleted_files/usr/src/lib/libaio/spec/versions
rename : usr/src/lib/librt/common/Makefile => deleted_files/usr/src/lib/librt/common/Makefile
rename : usr/src/lib/librt/common/aio.c => deleted_files/usr/src/lib/librt/common/aio.c
rename : usr/src/lib/librt/common/fdatasync.c => deleted_files/usr/src/lib/librt/common/fdatasync.c
rename : usr/src/lib/librt/common/mqlib.h => deleted_files/usr/src/lib/librt/common/mqlib.h
rename : usr/src/lib/librt/common/pos4.c => deleted_files/usr/src/lib/librt/common/pos4.c
rename : usr/src/lib/librt/common/pos4.h => deleted_files/usr/src/lib/librt/common/pos4.h
rename : usr/src/lib/librt/common/sigrt.c => deleted_files/usr/src/lib/librt/common/sigrt.c
rename : usr/src/lib/librt/req.flg => deleted_files/usr/src/lib/librt/req.flg
rename : usr/src/lib/librt/spec/Makefile => deleted_files/usr/src/lib/librt/spec/Makefile
rename : usr/src/lib/librt/spec/Makefile.targ => deleted_files/usr/src/lib/librt/spec/Makefile.targ
rename : usr/src/lib/librt/spec/amd64/Makefile => deleted_files/usr/src/lib/librt/spec/amd64/Makefile
rename : usr/src/lib/librt/spec/i386/Makefile => deleted_files/usr/src/lib/librt/spec/i386/Makefile
rename : usr/src/lib/librt/spec/sparc/Makefile => deleted_files/usr/src/lib/librt/spec/sparc/Makefile
rename : usr/src/lib/librt/spec/sparcv9/Makefile => deleted_files/usr/src/lib/librt/spec/sparcv9/Makefile
rename : usr/src/lib/librt/spec/versions => deleted_files/usr/src/lib/librt/spec/versions
rename : usr/src/lib/libaio/common/libaio.h => usr/src/lib/libc/inc/asyncio.h
rename : usr/src/lib/librt/common/thread_pool.h => usr/src/lib/libc/inc/thread_pool.h
rename : usr/src/lib/libaio/common/aio.c => usr/src/lib/libc/port/aio/aio.c
rename : usr/src/lib/libaio/common/ma.c => usr/src/lib/libc/port/aio/aio_alloc.c
rename : usr/src/lib/libaio/common/posix_aio.c => usr/src/lib/libc/port/aio/posix_aio.c
rename : usr/src/lib/librt/common/clock_timer.c => usr/src/lib/libc/port/rt/clock_timer.c
rename : usr/src/lib/librt/common/fallocate.c => usr/src/lib/libc/port/rt/fallocate.c
rename : usr/src/lib/librt/common/mqueue.c => usr/src/lib/libc/port/rt/mqueue.c
rename : usr/src/lib/librt/common/pos4obj.c => usr/src/lib/libc/port/rt/pos4obj.c
rename : usr/src/lib/librt/common/pos4obj.h => usr/src/lib/libc/port/rt/pos4obj.h
rename : usr/src/lib/librt/common/sched.c => usr/src/lib/libc/port/rt/sched.c
rename : usr/src/lib/librt/common/sem.c => usr/src/lib/libc/port/rt/sem.c
rename : usr/src/lib/librt/common/shm.c => usr/src/lib/libc/port/rt/shm.c
rename : usr/src/lib/librt/common/sigev_thread.c => usr/src/lib/libc/port/rt/sigev_thread.c
rename : usr/src/lib/librt/common/sigev_thread.h => usr/src/lib/libc/port/rt/sigev_thread.h
rename : usr/src/lib/librt/common/thread_pool.c => usr/src/lib/libc/port/tpool/thread_pool.c
rename : usr/src/lib/librt/common/thread_pool_impl.h => usr/src/lib/libc/port/tpool/thread_pool_impl.h
rename : usr/src/lib/libaio/spec/aio.spec => usr/src/lib/libc/spec/aio.spec
rename : usr/src/lib/librt/spec/rt.spec => usr/src/lib/libc/spec/rt.spec
rename : usr/src/lib/libaio/asynch.h => usr/src/uts/common/sys/asynch.h
Diffstat (limited to 'usr/src/lib/libc/port/aio/aio.c')
-rw-r--r-- | usr/src/lib/libc/port/aio/aio.c | 2202 |
1 file changed, 2202 insertions, 0 deletions
diff --git a/usr/src/lib/libc/port/aio/aio.c b/usr/src/lib/libc/port/aio/aio.c
new file mode 100644
index 0000000000..28d425d702
--- /dev/null
+++ b/usr/src/lib/libc/port/aio/aio.c
@@ -0,0 +1,2202 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License (the "License").
+ * You may not use this file except in compliance with the License.
+ *
+ * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
+ * or http://www.opensolaris.org/os/licensing.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information: Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ */
+
+/*
+ * Copyright 2006 Sun Microsystems, Inc. All rights reserved.
+ * Use is subject to license terms.
+ */
+
+#pragma ident "%Z%%M% %I% %E% SMI"
+
+#include "synonyms.h"
+#include "thr_uberdata.h"
+#include "asyncio.h"
+#include <atomic.h>
+#include <sys/param.h>
+#include <sys/file.h>
+#include <sys/port.h>
+
+static int _aio_hash_insert(aio_result_t *, aio_req_t *);
+static aio_req_t *_aio_req_get(aio_worker_t *);
+static void _aio_req_add(aio_req_t *, aio_worker_t **, int);
+static void _aio_req_del(aio_worker_t *, aio_req_t *, int);
+static void _aio_work_done(aio_worker_t *);
+static void _aio_enq_doneq(aio_req_t *);
+
+extern void _aio_lio_free(aio_lio_t *);
+
+extern int __fdsync(int, int);
+extern int _port_dispatch(int, int, int, int, uintptr_t, void *);
+
+static int _aio_fsync_del(aio_worker_t *, aio_req_t *);
+static void _aiodone(aio_req_t *, ssize_t, int);
+static void _aio_cancel_work(aio_worker_t *, int, int *, int *);
+static void _aio_finish_request(aio_worker_t *, ssize_t, int);
+
+/*
+ * switch for kernel async I/O
+ */
+int _kaio_ok = 0; /* 0 = disabled, 1 = on, -1 = error */
+
+/*
+ * Key for thread-specific data
+ */
+pthread_key_t _aio_key;
+
+/*
+ * Array for determining whether or not a file supports kaio.
+ * Initialized in _kaio_init().
+ */
+uint32_t *_kaio_supported = NULL;
+
+/*
+ * workers for read/write requests
+ * (__aio_mutex lock protects circular linked list of workers)
+ */
+aio_worker_t *__workers_rw; /* circular list of AIO workers */
+aio_worker_t *__nextworker_rw; /* next worker in list of workers */
+int __rw_workerscnt; /* number of read/write workers */
+
+/*
+ * worker for notification requests.
+ */ +aio_worker_t *__workers_no; /* circular list of AIO workers */ +aio_worker_t *__nextworker_no; /* next worker in list of workers */ +int __no_workerscnt; /* number of write workers */ + +aio_req_t *_aio_done_tail; /* list of done requests */ +aio_req_t *_aio_done_head; + +mutex_t __aio_initlock = DEFAULTMUTEX; /* makes aio initialization atomic */ +cond_t __aio_initcv = DEFAULTCV; +int __aio_initbusy = 0; + +mutex_t __aio_mutex = DEFAULTMUTEX; /* protects counts, and linked lists */ +cond_t _aio_iowait_cv = DEFAULTCV; /* wait for userland I/Os */ + +pid_t __pid = (pid_t)-1; /* initialize as invalid pid */ +int _sigio_enabled = 0; /* when set, send SIGIO signal */ + +aio_hash_t *_aio_hash; + +aio_req_t *_aio_doneq; /* double linked done queue list */ + +int _aio_donecnt = 0; +int _aio_waitncnt = 0; /* # of requests for aio_waitn */ +int _aio_doneq_cnt = 0; +int _aio_outstand_cnt = 0; /* # of outstanding requests */ +int _kaio_outstand_cnt = 0; /* # of outstanding kaio requests */ +int _aio_req_done_cnt = 0; /* req. done but not in "done queue" */ +int _aio_kernel_suspend = 0; /* active kernel kaio calls */ +int _aio_suscv_cnt = 0; /* aio_suspend calls waiting on cv's */ + +int _max_workers = 256; /* max number of workers permitted */ +int _min_workers = 4; /* min number of workers */ +int _minworkload = 2; /* min number of request in q */ +int _aio_worker_cnt = 0; /* number of workers to do requests */ +int __uaio_ok = 0; /* AIO has been enabled */ +sigset_t _worker_set; /* worker's signal mask */ + +int _aiowait_flag = 0; /* when set, aiowait() is inprogress */ +int _aio_flags = 0; /* see asyncio.h defines for */ + +aio_worker_t *_kaiowp = NULL; /* points to kaio cleanup thread */ + +int hz; /* clock ticks per second */ + +static int +_kaio_supported_init(void) +{ + void *ptr; + size_t size; + + if (_kaio_supported != NULL) /* already initialized */ + return (0); + + size = MAX_KAIO_FDARRAY_SIZE * sizeof (uint32_t); + ptr = mmap(NULL, size, PROT_READ | PROT_WRITE, + MAP_PRIVATE | MAP_ANON, -1, (off_t)0); + if (ptr == MAP_FAILED) + return (-1); + _kaio_supported = ptr; + return (0); +} + +/* + * The aio subsystem is initialized when an AIO request is made. + * Constants are initialized like the max number of workers that + * the subsystem can create, and the minimum number of workers + * permitted before imposing some restrictions. Also, some + * workers are created. + */ +int +__uaio_init(void) +{ + int ret = -1; + int i; + + lmutex_lock(&__aio_initlock); + while (__aio_initbusy) + (void) _cond_wait(&__aio_initcv, &__aio_initlock); + if (__uaio_ok) { /* already initialized */ + lmutex_unlock(&__aio_initlock); + return (0); + } + __aio_initbusy = 1; + lmutex_unlock(&__aio_initlock); + + hz = (int)sysconf(_SC_CLK_TCK); + __pid = getpid(); + + setup_cancelsig(SIGAIOCANCEL); + + if (_kaio_supported_init() != 0) + goto out; + + /* + * Allocate and initialize the hash table. + */ + /* LINTED pointer cast */ + _aio_hash = (aio_hash_t *)mmap(NULL, + HASHSZ * sizeof (aio_hash_t), PROT_READ | PROT_WRITE, + MAP_PRIVATE | MAP_ANON, -1, (off_t)0); + if ((void *)_aio_hash == MAP_FAILED) { + _aio_hash = NULL; + goto out; + } + for (i = 0; i < HASHSZ; i++) + (void) mutex_init(&_aio_hash[i].hash_lock, USYNC_THREAD, NULL); + + /* + * Initialize worker's signal mask to only catch SIGAIOCANCEL. + */ + (void) sigfillset(&_worker_set); + (void) sigdelset(&_worker_set, SIGAIOCANCEL); + + /* + * Create the minimum number of read/write workers. 
+ */ + for (i = 0; i < _min_workers; i++) + (void) _aio_create_worker(NULL, AIOREAD); + + /* + * Create one worker to send asynchronous notifications. + */ + (void) _aio_create_worker(NULL, AIONOTIFY); + + ret = 0; +out: + lmutex_lock(&__aio_initlock); + if (ret == 0) + __uaio_ok = 1; + __aio_initbusy = 0; + (void) cond_broadcast(&__aio_initcv); + lmutex_unlock(&__aio_initlock); + return (ret); +} + +/* + * Called from close() before actually performing the real _close(). + */ +void +_aio_close(int fd) +{ + if (fd < 0) /* avoid cancelling everything */ + return; + /* + * Cancel all outstanding aio requests for this file descriptor. + */ + if (__uaio_ok) + (void) aiocancel_all(fd); + /* + * If we have allocated the bit array, clear the bit for this file. + * The next open may re-use this file descriptor and the new file + * may have different kaio() behaviour. + */ + if (_kaio_supported != NULL) + CLEAR_KAIO_SUPPORTED(fd); +} + +/* + * special kaio cleanup thread sits in a loop in the + * kernel waiting for pending kaio requests to complete. + */ +void * +_kaio_cleanup_thread(void *arg) +{ + if (pthread_setspecific(_aio_key, arg) != 0) + aio_panic("_kaio_cleanup_thread, pthread_setspecific()"); + (void) _kaio(AIOSTART); + return (arg); +} + +/* + * initialize kaio. + */ +void +_kaio_init() +{ + int error; + sigset_t oset; + + lmutex_lock(&__aio_initlock); + while (__aio_initbusy) + (void) _cond_wait(&__aio_initcv, &__aio_initlock); + if (_kaio_ok) { /* already initialized */ + lmutex_unlock(&__aio_initlock); + return; + } + __aio_initbusy = 1; + lmutex_unlock(&__aio_initlock); + + if (_kaio_supported_init() != 0) + error = ENOMEM; + else if ((_kaiowp = _aio_worker_alloc()) == NULL) + error = ENOMEM; + else if ((error = (int)_kaio(AIOINIT)) == 0) { + (void) pthread_sigmask(SIG_SETMASK, &maskset, &oset); + error = thr_create(NULL, AIOSTKSIZE, _kaio_cleanup_thread, + _kaiowp, THR_DAEMON, &_kaiowp->work_tid); + (void) pthread_sigmask(SIG_SETMASK, &oset, NULL); + } + if (error && _kaiowp != NULL) { + _aio_worker_free(_kaiowp); + _kaiowp = NULL; + } + + lmutex_lock(&__aio_initlock); + if (error) + _kaio_ok = -1; + else + _kaio_ok = 1; + __aio_initbusy = 0; + (void) cond_broadcast(&__aio_initcv); + lmutex_unlock(&__aio_initlock); +} + +int +aioread(int fd, caddr_t buf, int bufsz, off_t offset, int whence, + aio_result_t *resultp) +{ + return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOREAD)); +} + +int +aiowrite(int fd, caddr_t buf, int bufsz, off_t offset, int whence, + aio_result_t *resultp) +{ + return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOWRITE)); +} + +#if !defined(_LP64) +int +aioread64(int fd, caddr_t buf, int bufsz, off64_t offset, int whence, + aio_result_t *resultp) +{ + return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOAREAD64)); +} + +int +aiowrite64(int fd, caddr_t buf, int bufsz, off64_t offset, int whence, + aio_result_t *resultp) +{ + return (_aiorw(fd, buf, bufsz, offset, whence, resultp, AIOAWRITE64)); +} +#endif /* !defined(_LP64) */ + +int +_aiorw(int fd, caddr_t buf, int bufsz, offset_t offset, int whence, + aio_result_t *resultp, int mode) +{ + aio_req_t *reqp; + aio_args_t *ap; + offset_t loffset; + struct stat stat; + int error = 0; + int kerr; + int umode; + + switch (whence) { + + case SEEK_SET: + loffset = offset; + break; + case SEEK_CUR: + if ((loffset = llseek(fd, 0, SEEK_CUR)) == -1) + error = -1; + else + loffset += offset; + break; + case SEEK_END: + if (fstat(fd, &stat) == -1) + error = -1; + else + loffset = offset + stat.st_size; + 
break; + default: + errno = EINVAL; + error = -1; + } + + if (error) + return (error); + + /* initialize kaio */ + if (!_kaio_ok) + _kaio_init(); + + /* + * _aio_do_request() needs the original request code (mode) to be able + * to choose the appropiate 32/64 bit function. All other functions + * only require the difference between READ and WRITE (umode). + */ + if (mode == AIOAREAD64 || mode == AIOAWRITE64) + umode = mode - AIOAREAD64; + else + umode = mode; + + /* + * Try kernel aio first. + * If errno is ENOTSUP/EBADFD, fall back to the thread implementation. + */ + if (_kaio_ok > 0 && KAIO_SUPPORTED(fd)) { + resultp->aio_errno = 0; + sig_mutex_lock(&__aio_mutex); + _kaio_outstand_cnt++; + kerr = (int)_kaio(((resultp->aio_return == AIO_INPROGRESS) ? + (umode | AIO_POLL_BIT) : umode), + fd, buf, bufsz, loffset, resultp); + if (kerr == 0) { + sig_mutex_unlock(&__aio_mutex); + return (0); + } + _kaio_outstand_cnt--; + sig_mutex_unlock(&__aio_mutex); + if (errno != ENOTSUP && errno != EBADFD) + return (-1); + if (errno == EBADFD) + SET_KAIO_NOT_SUPPORTED(fd); + } + + if (!__uaio_ok && __uaio_init() == -1) + return (-1); + + if ((reqp = _aio_req_alloc()) == NULL) { + errno = EAGAIN; + return (-1); + } + + /* + * _aio_do_request() checks reqp->req_op to differentiate + * between 32 and 64 bit access. + */ + reqp->req_op = mode; + reqp->req_resultp = resultp; + ap = &reqp->req_args; + ap->fd = fd; + ap->buf = buf; + ap->bufsz = bufsz; + ap->offset = loffset; + + if (_aio_hash_insert(resultp, reqp) != 0) { + _aio_req_free(reqp); + errno = EINVAL; + return (-1); + } + /* + * _aio_req_add() only needs the difference between READ and + * WRITE to choose the right worker queue. + */ + _aio_req_add(reqp, &__nextworker_rw, umode); + return (0); +} + +int +aiocancel(aio_result_t *resultp) +{ + aio_req_t *reqp; + aio_worker_t *aiowp; + int ret; + int done = 0; + int canceled = 0; + + if (!__uaio_ok) { + errno = EINVAL; + return (-1); + } + + sig_mutex_lock(&__aio_mutex); + reqp = _aio_hash_find(resultp); + if (reqp == NULL) { + if (_aio_outstand_cnt == _aio_req_done_cnt) + errno = EINVAL; + else + errno = EACCES; + ret = -1; + } else { + aiowp = reqp->req_worker; + sig_mutex_lock(&aiowp->work_qlock1); + (void) _aio_cancel_req(aiowp, reqp, &canceled, &done); + sig_mutex_unlock(&aiowp->work_qlock1); + + if (canceled) { + ret = 0; + } else { + if (_aio_outstand_cnt == 0 || + _aio_outstand_cnt == _aio_req_done_cnt) + errno = EINVAL; + else + errno = EACCES; + ret = -1; + } + } + sig_mutex_unlock(&__aio_mutex); + return (ret); +} + +/* + * This must be asynch safe + */ +aio_result_t * +aiowait(struct timeval *uwait) +{ + aio_result_t *uresultp; + aio_result_t *kresultp; + aio_result_t *resultp; + int dontblock; + int timedwait = 0; + int kaio_errno = 0; + struct timeval twait; + struct timeval *wait = NULL; + hrtime_t hrtend; + hrtime_t hres; + + if (uwait) { + /* + * Check for a valid specified wait time. + * If it is invalid, fail the call right away. 
+ */ + if (uwait->tv_sec < 0 || uwait->tv_usec < 0 || + uwait->tv_usec >= MICROSEC) { + errno = EINVAL; + return ((aio_result_t *)-1); + } + + if (uwait->tv_sec > 0 || uwait->tv_usec > 0) { + hrtend = gethrtime() + + (hrtime_t)uwait->tv_sec * NANOSEC + + (hrtime_t)uwait->tv_usec * (NANOSEC / MICROSEC); + twait = *uwait; + wait = &twait; + timedwait++; + } else { + /* polling */ + sig_mutex_lock(&__aio_mutex); + if (_kaio_outstand_cnt == 0) { + kresultp = (aio_result_t *)-1; + } else { + kresultp = (aio_result_t *)_kaio(AIOWAIT, + (struct timeval *)-1, 1); + if (kresultp != (aio_result_t *)-1 && + kresultp != NULL && + kresultp != (aio_result_t *)1) { + _kaio_outstand_cnt--; + sig_mutex_unlock(&__aio_mutex); + return (kresultp); + } + } + uresultp = _aio_req_done(); + sig_mutex_unlock(&__aio_mutex); + if (uresultp != NULL && + uresultp != (aio_result_t *)-1) { + return (uresultp); + } + if (uresultp == (aio_result_t *)-1 && + kresultp == (aio_result_t *)-1) { + errno = EINVAL; + return ((aio_result_t *)-1); + } else { + return (NULL); + } + } + } + + for (;;) { + sig_mutex_lock(&__aio_mutex); + uresultp = _aio_req_done(); + if (uresultp != NULL && uresultp != (aio_result_t *)-1) { + sig_mutex_unlock(&__aio_mutex); + resultp = uresultp; + break; + } + _aiowait_flag++; + dontblock = (uresultp == (aio_result_t *)-1); + if (dontblock && _kaio_outstand_cnt == 0) { + kresultp = (aio_result_t *)-1; + kaio_errno = EINVAL; + } else { + sig_mutex_unlock(&__aio_mutex); + kresultp = (aio_result_t *)_kaio(AIOWAIT, + wait, dontblock); + sig_mutex_lock(&__aio_mutex); + kaio_errno = errno; + } + _aiowait_flag--; + sig_mutex_unlock(&__aio_mutex); + if (kresultp == (aio_result_t *)1) { + /* aiowait() awakened by an aionotify() */ + continue; + } else if (kresultp != NULL && + kresultp != (aio_result_t *)-1) { + resultp = kresultp; + sig_mutex_lock(&__aio_mutex); + _kaio_outstand_cnt--; + sig_mutex_unlock(&__aio_mutex); + break; + } else if (kresultp == (aio_result_t *)-1 && + kaio_errno == EINVAL && + uresultp == (aio_result_t *)-1) { + errno = kaio_errno; + resultp = (aio_result_t *)-1; + break; + } else if (kresultp == (aio_result_t *)-1 && + kaio_errno == EINTR) { + errno = kaio_errno; + resultp = (aio_result_t *)-1; + break; + } else if (timedwait) { + hres = hrtend - gethrtime(); + if (hres <= 0) { + /* time is up; return */ + resultp = NULL; + break; + } else { + /* + * Some time left. Round up the remaining time + * in nanoseconds to microsec. Retry the call. + */ + hres += (NANOSEC / MICROSEC) - 1; + wait->tv_sec = hres / NANOSEC; + wait->tv_usec = + (hres % NANOSEC) / (NANOSEC / MICROSEC); + } + } else { + ASSERT(kresultp == NULL && uresultp == NULL); + resultp = NULL; + continue; + } + } + return (resultp); +} + +/* + * _aio_get_timedelta calculates the remaining time and stores the result + * into timespec_t *wait. 
+ */ + +int +_aio_get_timedelta(timespec_t *end, timespec_t *wait) +{ + int ret = 0; + struct timeval cur; + timespec_t curtime; + + (void) gettimeofday(&cur, NULL); + curtime.tv_sec = cur.tv_sec; + curtime.tv_nsec = cur.tv_usec * 1000; /* convert us to ns */ + + if (end->tv_sec >= curtime.tv_sec) { + wait->tv_sec = end->tv_sec - curtime.tv_sec; + if (end->tv_nsec >= curtime.tv_nsec) { + wait->tv_nsec = end->tv_nsec - curtime.tv_nsec; + if (wait->tv_sec == 0 && wait->tv_nsec == 0) + ret = -1; /* timer expired */ + } else { + if (end->tv_sec > curtime.tv_sec) { + wait->tv_sec -= 1; + wait->tv_nsec = NANOSEC - + (curtime.tv_nsec - end->tv_nsec); + } else { + ret = -1; /* timer expired */ + } + } + } else { + ret = -1; + } + return (ret); +} + +/* + * If closing by file descriptor: we will simply cancel all the outstanding + * aio`s and return. Those aio's in question will have either noticed the + * cancellation notice before, during, or after initiating io. + */ +int +aiocancel_all(int fd) +{ + aio_req_t *reqp; + aio_req_t **reqpp; + aio_worker_t *first; + aio_worker_t *next; + int canceled = 0; + int done = 0; + int cancelall = 0; + + sig_mutex_lock(&__aio_mutex); + + if (_aio_outstand_cnt == 0) { + sig_mutex_unlock(&__aio_mutex); + return (AIO_ALLDONE); + } + + /* + * Cancel requests from the read/write workers' queues. + */ + first = __nextworker_rw; + next = first; + do { + _aio_cancel_work(next, fd, &canceled, &done); + } while ((next = next->work_forw) != first); + + /* + * finally, check if there are requests on the done queue that + * should be canceled. + */ + if (fd < 0) + cancelall = 1; + reqpp = &_aio_done_tail; + while ((reqp = *reqpp) != NULL) { + if (cancelall || reqp->req_args.fd == fd) { + *reqpp = reqp->req_next; + _aio_donecnt--; + (void) _aio_hash_del(reqp->req_resultp); + _aio_req_free(reqp); + } else + reqpp = &reqp->req_next; + } + if (cancelall) { + ASSERT(_aio_donecnt == 0); + _aio_done_head = NULL; + } + sig_mutex_unlock(&__aio_mutex); + + if (canceled && done == 0) + return (AIO_CANCELED); + else if (done && canceled == 0) + return (AIO_ALLDONE); + else if ((canceled + done == 0) && KAIO_SUPPORTED(fd)) + return ((int)_kaio(AIOCANCEL, fd, NULL)); + return (AIO_NOTCANCELED); +} + +/* + * Cancel requests from a given work queue. If the file descriptor + * parameter, fd, is non-negative, then only cancel those requests + * in this queue that are to this file descriptor. If the fd + * parameter is -1, then cancel all requests. + */ +static void +_aio_cancel_work(aio_worker_t *aiowp, int fd, int *canceled, int *done) +{ + aio_req_t *reqp; + + sig_mutex_lock(&aiowp->work_qlock1); + /* + * cancel queued requests first. + */ + reqp = aiowp->work_tail1; + while (reqp != NULL) { + if (fd < 0 || reqp->req_args.fd == fd) { + if (_aio_cancel_req(aiowp, reqp, canceled, done)) { + /* + * Callers locks were dropped. + * reqp is invalid; start traversing + * the list from the beginning again. + */ + reqp = aiowp->work_tail1; + continue; + } + } + reqp = reqp->req_next; + } + /* + * Since the queued requests have been canceled, there can + * only be one inprogress request that should be canceled. + */ + if ((reqp = aiowp->work_req) != NULL && + (fd < 0 || reqp->req_args.fd == fd)) + (void) _aio_cancel_req(aiowp, reqp, canceled, done); + sig_mutex_unlock(&aiowp->work_qlock1); +} + +/* + * Cancel a request. Return 1 if the callers locks were temporarily + * dropped, otherwise return 0. 
+ */ +int +_aio_cancel_req(aio_worker_t *aiowp, aio_req_t *reqp, int *canceled, int *done) +{ + int ostate = reqp->req_state; + + ASSERT(MUTEX_HELD(&__aio_mutex)); + ASSERT(MUTEX_HELD(&aiowp->work_qlock1)); + if (ostate == AIO_REQ_CANCELED) + return (0); + if (ostate == AIO_REQ_DONE || ostate == AIO_REQ_DONEQ) { + (*done)++; + return (0); + } + if (reqp->req_op == AIOFSYNC && reqp != aiowp->work_req) { + ASSERT(POSIX_AIO(reqp)); + /* Cancel the queued aio_fsync() request */ + if (!reqp->req_head->lio_canned) { + reqp->req_head->lio_canned = 1; + _aio_outstand_cnt--; + (*canceled)++; + } + return (0); + } + reqp->req_state = AIO_REQ_CANCELED; + _aio_req_del(aiowp, reqp, ostate); + (void) _aio_hash_del(reqp->req_resultp); + (*canceled)++; + if (reqp == aiowp->work_req) { + ASSERT(ostate == AIO_REQ_INPROGRESS); + /* + * Set the result values now, before _aiodone() is called. + * We do this because the application can expect aio_return + * and aio_errno to be set to -1 and ECANCELED, respectively, + * immediately after a successful return from aiocancel() + * or aio_cancel(). + */ + _aio_set_result(reqp, -1, ECANCELED); + (void) thr_kill(aiowp->work_tid, SIGAIOCANCEL); + return (0); + } + if (!POSIX_AIO(reqp)) { + _aio_outstand_cnt--; + _aio_set_result(reqp, -1, ECANCELED); + return (0); + } + sig_mutex_unlock(&aiowp->work_qlock1); + sig_mutex_unlock(&__aio_mutex); + _aiodone(reqp, -1, ECANCELED); + sig_mutex_lock(&__aio_mutex); + sig_mutex_lock(&aiowp->work_qlock1); + return (1); +} + +int +_aio_create_worker(aio_req_t *reqp, int mode) +{ + aio_worker_t *aiowp, **workers, **nextworker; + int *aio_workerscnt; + void *(*func)(void *); + sigset_t oset; + int error; + + /* + * Put the new worker thread in the right queue. + */ + switch (mode) { + case AIOREAD: + case AIOWRITE: + case AIOAREAD: + case AIOAWRITE: +#if !defined(_LP64) + case AIOAREAD64: + case AIOAWRITE64: +#endif + workers = &__workers_rw; + nextworker = &__nextworker_rw; + aio_workerscnt = &__rw_workerscnt; + func = _aio_do_request; + break; + case AIONOTIFY: + workers = &__workers_no; + nextworker = &__nextworker_no; + func = _aio_do_notify; + aio_workerscnt = &__no_workerscnt; + break; + default: + aio_panic("_aio_create_worker: invalid mode"); + break; + } + + if ((aiowp = _aio_worker_alloc()) == NULL) + return (-1); + + if (reqp) { + reqp->req_state = AIO_REQ_QUEUED; + reqp->req_worker = aiowp; + aiowp->work_head1 = reqp; + aiowp->work_tail1 = reqp; + aiowp->work_next1 = reqp; + aiowp->work_count1 = 1; + aiowp->work_minload1 = 1; + } + + (void) pthread_sigmask(SIG_SETMASK, &maskset, &oset); + error = thr_create(NULL, AIOSTKSIZE, func, aiowp, + THR_DAEMON | THR_SUSPENDED, &aiowp->work_tid); + (void) pthread_sigmask(SIG_SETMASK, &oset, NULL); + if (error) { + if (reqp) { + reqp->req_state = 0; + reqp->req_worker = NULL; + } + _aio_worker_free(aiowp); + return (-1); + } + + lmutex_lock(&__aio_mutex); + (*aio_workerscnt)++; + if (*workers == NULL) { + aiowp->work_forw = aiowp; + aiowp->work_backw = aiowp; + *nextworker = aiowp; + *workers = aiowp; + } else { + aiowp->work_backw = (*workers)->work_backw; + aiowp->work_forw = (*workers); + (*workers)->work_backw->work_forw = aiowp; + (*workers)->work_backw = aiowp; + } + _aio_worker_cnt++; + lmutex_unlock(&__aio_mutex); + + (void) thr_continue(aiowp->work_tid); + + return (0); +} + +/* + * This is the worker's main routine. + * The task of this function is to execute all queued requests; + * once the last pending request is executed this function will block + * in _aio_idle(). 
A new incoming request must wakeup this thread to + * restart the work. + * Every worker has an own work queue. The queue lock is required + * to synchronize the addition of new requests for this worker or + * cancellation of pending/running requests. + * + * Cancellation scenarios: + * The cancellation of a request is being done asynchronously using + * _aio_cancel_req() from another thread context. + * A queued request can be cancelled in different manners : + * a) request is queued but not "in progress" or "done" (AIO_REQ_QUEUED): + * - lock the queue -> remove the request -> unlock the queue + * - this function/thread does not detect this cancellation process + * b) request is in progress (AIO_REQ_INPROGRESS) : + * - this function first allow the cancellation of the running + * request with the flag "work_cancel_flg=1" + * see _aio_req_get() -> _aio_cancel_on() + * During this phase, it is allowed to interrupt the worker + * thread running the request (this thread) using the SIGAIOCANCEL + * signal. + * Once this thread returns from the kernel (because the request + * is just done), then it must disable a possible cancellation + * and proceed to finish the request. To disable the cancellation + * this thread must use _aio_cancel_off() to set "work_cancel_flg=0". + * c) request is already done (AIO_REQ_DONE || AIO_REQ_DONEQ): + * same procedure as in a) + * + * To b) + * This thread uses sigsetjmp() to define the position in the code, where + * it wish to continue working in the case that a SIGAIOCANCEL signal + * is detected. + * Normally this thread should get the cancellation signal during the + * kernel phase (reading or writing). In that case the signal handler + * aiosigcancelhndlr() is activated using the worker thread context, + * which again will use the siglongjmp() function to break the standard + * code flow and jump to the "sigsetjmp" position, provided that + * "work_cancel_flg" is set to "1". + * Because the "work_cancel_flg" is only manipulated by this worker + * thread and it can only run on one CPU at a given time, it is not + * necessary to protect that flag with the queue lock. + * Returning from the kernel (read or write system call) we must + * first disable the use of the SIGAIOCANCEL signal and accordingly + * the use of the siglongjmp() function to prevent a possible deadlock: + * - It can happens that this worker thread returns from the kernel and + * blocks in "work_qlock1", + * - then a second thread cancels the apparently "in progress" request + * and sends the SIGAIOCANCEL signal to the worker thread, + * - the worker thread gets assigned the "work_qlock1" and will returns + * from the kernel, + * - the kernel detects the pending signal and activates the signal + * handler instead, + * - if the "work_cancel_flg" is still set then the signal handler + * should use siglongjmp() to cancel the "in progress" request and + * it would try to acquire the same work_qlock1 in _aio_req_get() + * for a second time => deadlock. + * To avoid that situation we disable the cancellation of the request + * in progress BEFORE we try to acquire the work_qlock1. + * In that case the signal handler will not call siglongjmp() and the + * worker thread will continue running the standard code flow. + * Then this thread must check the AIO_REQ_CANCELED flag to emulate + * an eventually required siglongjmp() freeing the work_qlock1 and + * avoiding a deadlock. 
+ */ +void * +_aio_do_request(void *arglist) +{ + aio_worker_t *aiowp = (aio_worker_t *)arglist; + ulwp_t *self = curthread; + struct aio_args *arg; + aio_req_t *reqp; /* current AIO request */ + ssize_t retval; + int error; + + if (pthread_setspecific(_aio_key, aiowp) != 0) + aio_panic("_aio_do_request, pthread_setspecific()"); + (void) pthread_sigmask(SIG_SETMASK, &_worker_set, NULL); + ASSERT(aiowp->work_req == NULL); + + /* + * We resume here when an operation is cancelled. + * On first entry, aiowp->work_req == NULL, so all + * we do is block SIGAIOCANCEL. + */ + (void) sigsetjmp(aiowp->work_jmp_buf, 0); + ASSERT(self->ul_sigdefer == 0); + + sigoff(self); /* block SIGAIOCANCEL */ + if (aiowp->work_req != NULL) + _aio_finish_request(aiowp, -1, ECANCELED); + + for (;;) { + /* + * Put completed requests on aio_done_list. This has + * to be done as part of the main loop to ensure that + * we don't artificially starve any aiowait'ers. + */ + if (aiowp->work_done1) + _aio_work_done(aiowp); + +top: + /* consume any deferred SIGAIOCANCEL signal here */ + sigon(self); + sigoff(self); + + while ((reqp = _aio_req_get(aiowp)) == NULL) { + if (_aio_idle(aiowp) != 0) + goto top; + } + arg = &reqp->req_args; + ASSERT(reqp->req_state == AIO_REQ_INPROGRESS || + reqp->req_state == AIO_REQ_CANCELED); + error = 0; + + switch (reqp->req_op) { + case AIOREAD: + case AIOAREAD: + sigon(self); /* unblock SIGAIOCANCEL */ + retval = pread(arg->fd, arg->buf, + arg->bufsz, arg->offset); + if (retval == -1) { + if (errno == ESPIPE) { + retval = read(arg->fd, + arg->buf, arg->bufsz); + if (retval == -1) + error = errno; + } else { + error = errno; + } + } + sigoff(self); /* block SIGAIOCANCEL */ + break; + case AIOWRITE: + case AIOAWRITE: + sigon(self); /* unblock SIGAIOCANCEL */ + retval = pwrite(arg->fd, arg->buf, + arg->bufsz, arg->offset); + if (retval == -1) { + if (errno == ESPIPE) { + retval = write(arg->fd, + arg->buf, arg->bufsz); + if (retval == -1) + error = errno; + } else { + error = errno; + } + } + sigoff(self); /* block SIGAIOCANCEL */ + break; +#if !defined(_LP64) + case AIOAREAD64: + sigon(self); /* unblock SIGAIOCANCEL */ + retval = pread64(arg->fd, arg->buf, + arg->bufsz, arg->offset); + if (retval == -1) { + if (errno == ESPIPE) { + retval = read(arg->fd, + arg->buf, arg->bufsz); + if (retval == -1) + error = errno; + } else { + error = errno; + } + } + sigoff(self); /* block SIGAIOCANCEL */ + break; + case AIOAWRITE64: + sigon(self); /* unblock SIGAIOCANCEL */ + retval = pwrite64(arg->fd, arg->buf, + arg->bufsz, arg->offset); + if (retval == -1) { + if (errno == ESPIPE) { + retval = write(arg->fd, + arg->buf, arg->bufsz); + if (retval == -1) + error = errno; + } else { + error = errno; + } + } + sigoff(self); /* block SIGAIOCANCEL */ + break; +#endif /* !defined(_LP64) */ + case AIOFSYNC: + if (_aio_fsync_del(aiowp, reqp)) + goto top; + ASSERT(reqp->req_head == NULL); + /* + * All writes for this fsync request are now + * acknowledged. Now make these writes visible + * and put the final request into the hash table. 
+ */ + if (reqp->req_state == AIO_REQ_CANCELED) { + /* EMPTY */; + } else if (arg->offset == O_SYNC) { + if ((retval = __fdsync(arg->fd, FSYNC)) == -1) + error = errno; + } else { + if ((retval = __fdsync(arg->fd, FDSYNC)) == -1) + error = errno; + } + if (_aio_hash_insert(reqp->req_resultp, reqp) != 0) + aio_panic("_aio_do_request(): AIOFSYNC: " + "request already in hash table"); + break; + default: + aio_panic("_aio_do_request, bad op"); + } + + _aio_finish_request(aiowp, retval, error); + } + /* NOTREACHED */ + return (NULL); +} + +/* + * Perform the tail processing for _aio_do_request(). + * The in-progress request may or may not have been cancelled. + */ +static void +_aio_finish_request(aio_worker_t *aiowp, ssize_t retval, int error) +{ + aio_req_t *reqp; + + sig_mutex_lock(&aiowp->work_qlock1); + if ((reqp = aiowp->work_req) == NULL) + sig_mutex_unlock(&aiowp->work_qlock1); + else { + aiowp->work_req = NULL; + if (reqp->req_state == AIO_REQ_CANCELED) { + retval = -1; + error = ECANCELED; + } + if (!POSIX_AIO(reqp)) { + sig_mutex_unlock(&aiowp->work_qlock1); + sig_mutex_lock(&__aio_mutex); + if (reqp->req_state == AIO_REQ_INPROGRESS) + reqp->req_state = AIO_REQ_DONE; + _aio_req_done_cnt++; + _aio_set_result(reqp, retval, error); + if (error == ECANCELED) + _aio_outstand_cnt--; + sig_mutex_unlock(&__aio_mutex); + } else { + if (reqp->req_state == AIO_REQ_INPROGRESS) + reqp->req_state = AIO_REQ_DONE; + sig_mutex_unlock(&aiowp->work_qlock1); + _aiodone(reqp, retval, error); + } + } +} + +void +_aio_req_mark_done(aio_req_t *reqp) +{ +#if !defined(_LP64) + if (reqp->req_largefile) + ((aiocb64_t *)reqp->req_aiocbp)->aio_state = USERAIO_DONE; + else +#endif + ((aiocb_t *)reqp->req_aiocbp)->aio_state = USERAIO_DONE; +} + +/* + * Sleep for 'ticks' clock ticks to give somebody else a chance to run, + * hopefully to consume one of our queued signals. + */ +static void +_aio_delay(int ticks) +{ + (void) usleep(ticks * (MICROSEC / hz)); +} + +/* + * Actually send the notifications. + * We could block indefinitely here if the application + * is not listening for the signal or port notifications. + */ +static void +send_notification(notif_param_t *npp) +{ + extern int __sigqueue(pid_t pid, int signo, + /* const union sigval */ void *value, int si_code, int block); + + if (npp->np_signo) + (void) __sigqueue(__pid, npp->np_signo, npp->np_user, + SI_ASYNCIO, 1); + else if (npp->np_port >= 0) + (void) _port_dispatch(npp->np_port, 0, PORT_SOURCE_AIO, + npp->np_event, npp->np_object, npp->np_user); + + if (npp->np_lio_signo) + (void) __sigqueue(__pid, npp->np_lio_signo, npp->np_lio_user, + SI_ASYNCIO, 1); + else if (npp->np_lio_port >= 0) + (void) _port_dispatch(npp->np_lio_port, 0, PORT_SOURCE_AIO, + npp->np_lio_event, npp->np_lio_object, npp->np_lio_user); +} + +/* + * Asynchronous notification worker. + */ +void * +_aio_do_notify(void *arg) +{ + aio_worker_t *aiowp = (aio_worker_t *)arg; + aio_req_t *reqp; + + /* + * This isn't really necessary. All signals are blocked. + */ + if (pthread_setspecific(_aio_key, aiowp) != 0) + aio_panic("_aio_do_notify, pthread_setspecific()"); + + /* + * Notifications are never cancelled. + * All signals remain blocked, forever. 
+ */ + for (;;) { + while ((reqp = _aio_req_get(aiowp)) == NULL) { + if (_aio_idle(aiowp) != 0) + aio_panic("_aio_do_notify: _aio_idle() failed"); + } + send_notification(&reqp->req_notify); + _aio_req_free(reqp); + } + + /* NOTREACHED */ + return (NULL); +} + +/* + * Do the completion semantics for a request that was either canceled + * by _aio_cancel_req() or was completed by _aio_do_request(). + */ +static void +_aiodone(aio_req_t *reqp, ssize_t retval, int error) +{ + aio_result_t *resultp = reqp->req_resultp; + int notify = 0; + aio_lio_t *head; + int sigev_none; + int sigev_signal; + int sigev_thread; + int sigev_port; + notif_param_t np; + + /* + * We call _aiodone() only for Posix I/O. + */ + ASSERT(POSIX_AIO(reqp)); + + sigev_none = 0; + sigev_signal = 0; + sigev_thread = 0; + sigev_port = 0; + np.np_signo = 0; + np.np_port = -1; + np.np_lio_signo = 0; + np.np_lio_port = -1; + + switch (reqp->req_sigevent.sigev_notify) { + case SIGEV_NONE: + sigev_none = 1; + break; + case SIGEV_SIGNAL: + sigev_signal = 1; + break; + case SIGEV_THREAD: + sigev_thread = 1; + break; + case SIGEV_PORT: + sigev_port = 1; + break; + default: + aio_panic("_aiodone: improper sigev_notify"); + break; + } + + /* + * Figure out the notification parameters while holding __aio_mutex. + * Actually perform the notifications after dropping __aio_mutex. + * This allows us to sleep for a long time (if the notifications + * incur delays) without impeding other async I/O operations. + */ + + sig_mutex_lock(&__aio_mutex); + + if (sigev_signal) { + if ((np.np_signo = reqp->req_sigevent.sigev_signo) != 0) + notify = 1; + np.np_user = reqp->req_sigevent.sigev_value.sival_ptr; + } else if (sigev_thread | sigev_port) { + if ((np.np_port = reqp->req_sigevent.sigev_signo) >= 0) + notify = 1; + np.np_event = reqp->req_op; + if (np.np_event == AIOFSYNC && reqp->req_largefile) + np.np_event = AIOFSYNC64; + np.np_object = (uintptr_t)reqp->req_aiocbp; + np.np_user = reqp->req_sigevent.sigev_value.sival_ptr; + } + + if (resultp->aio_errno == EINPROGRESS) + _aio_set_result(reqp, retval, error); + + _aio_outstand_cnt--; + + head = reqp->req_head; + reqp->req_head = NULL; + + if (sigev_none) { + _aio_enq_doneq(reqp); + reqp = NULL; + } else { + (void) _aio_hash_del(resultp); + _aio_req_mark_done(reqp); + } + + _aio_waitn_wakeup(); + + /* + * __aio_waitn() sets AIO_WAIT_INPROGRESS and + * __aio_suspend() increments "_aio_kernel_suspend" + * when they are waiting in the kernel for completed I/Os. + * + * _kaio(AIONOTIFY) awakes the corresponding function + * in the kernel; then the corresponding __aio_waitn() or + * __aio_suspend() function could reap the recently + * completed I/Os (_aiodone()). + */ + if ((_aio_flags & AIO_WAIT_INPROGRESS) || _aio_kernel_suspend > 0) + (void) _kaio(AIONOTIFY); + + sig_mutex_unlock(&__aio_mutex); + + if (head != NULL) { + /* + * If all the lio requests have completed, + * prepare to notify the waiting thread. 
+ */ + sig_mutex_lock(&head->lio_mutex); + ASSERT(head->lio_refcnt == head->lio_nent); + if (head->lio_refcnt == 1) { + int waiting = 0; + if (head->lio_mode == LIO_WAIT) { + if ((waiting = head->lio_waiting) != 0) + (void) cond_signal(&head->lio_cond_cv); + } else if (head->lio_port < 0) { /* none or signal */ + if ((np.np_lio_signo = head->lio_signo) != 0) + notify = 1; + np.np_lio_user = head->lio_sigval.sival_ptr; + } else { /* thread or port */ + notify = 1; + np.np_lio_port = head->lio_port; + np.np_lio_event = head->lio_event; + np.np_lio_object = + (uintptr_t)head->lio_sigevent; + np.np_lio_user = head->lio_sigval.sival_ptr; + } + head->lio_nent = head->lio_refcnt = 0; + sig_mutex_unlock(&head->lio_mutex); + if (waiting == 0) + _aio_lio_free(head); + } else { + head->lio_nent--; + head->lio_refcnt--; + sig_mutex_unlock(&head->lio_mutex); + } + } + + /* + * The request is completed; now perform the notifications. + */ + if (notify) { + if (reqp != NULL) { + /* + * We usually put the request on the notification + * queue because we don't want to block and delay + * other operations behind us in the work queue. + * Also we must never block on a cancel notification + * because we are being called from an application + * thread in this case and that could lead to deadlock + * if no other thread is receiving notificatins. + */ + reqp->req_notify = np; + reqp->req_op = AIONOTIFY; + _aio_req_add(reqp, &__workers_no, AIONOTIFY); + reqp = NULL; + } else { + /* + * We already put the request on the done queue, + * so we can't queue it to the notification queue. + * Just do the notification directly. + */ + send_notification(&np); + } + } + + if (reqp != NULL) + _aio_req_free(reqp); +} + +/* + * Delete fsync requests from list head until there is + * only one left. Return 0 when there is only one, + * otherwise return a non-zero value. + */ +static int +_aio_fsync_del(aio_worker_t *aiowp, aio_req_t *reqp) +{ + aio_lio_t *head = reqp->req_head; + int rval = 0; + + ASSERT(reqp == aiowp->work_req); + sig_mutex_lock(&aiowp->work_qlock1); + sig_mutex_lock(&head->lio_mutex); + if (head->lio_refcnt > 1) { + head->lio_refcnt--; + head->lio_nent--; + aiowp->work_req = NULL; + sig_mutex_unlock(&head->lio_mutex); + sig_mutex_unlock(&aiowp->work_qlock1); + sig_mutex_lock(&__aio_mutex); + _aio_outstand_cnt--; + _aio_waitn_wakeup(); + sig_mutex_unlock(&__aio_mutex); + _aio_req_free(reqp); + return (1); + } + ASSERT(head->lio_nent == 1 && head->lio_refcnt == 1); + reqp->req_head = NULL; + if (head->lio_canned) + reqp->req_state = AIO_REQ_CANCELED; + if (head->lio_mode == LIO_DESTROY) { + aiowp->work_req = NULL; + rval = 1; + } + sig_mutex_unlock(&head->lio_mutex); + sig_mutex_unlock(&aiowp->work_qlock1); + head->lio_refcnt--; + head->lio_nent--; + _aio_lio_free(head); + if (rval != 0) + _aio_req_free(reqp); + return (rval); +} + +/* + * A worker is set idle when its work queue is empty. + * The worker checks again that it has no more work + * and then goes to sleep waiting for more work. + */ +int +_aio_idle(aio_worker_t *aiowp) +{ + int error = 0; + + sig_mutex_lock(&aiowp->work_qlock1); + if (aiowp->work_count1 == 0) { + ASSERT(aiowp->work_minload1 == 0); + aiowp->work_idleflg = 1; + /* + * A cancellation handler is not needed here. + * aio worker threads are never cancelled via pthread_cancel(). + */ + error = sig_cond_wait(&aiowp->work_idle_cv, + &aiowp->work_qlock1); + /* + * The idle flag is normally cleared before worker is awakened + * by aio_req_add(). On error (EINTR), we clear it ourself. 
+ */ + if (error) + aiowp->work_idleflg = 0; + } + sig_mutex_unlock(&aiowp->work_qlock1); + return (error); +} + +/* + * A worker's completed AIO requests are placed onto a global + * done queue. The application is only sent a SIGIO signal if + * the process has a handler enabled and it is not waiting via + * aiowait(). + */ +static void +_aio_work_done(aio_worker_t *aiowp) +{ + aio_req_t *reqp; + + sig_mutex_lock(&aiowp->work_qlock1); + reqp = aiowp->work_prev1; + reqp->req_next = NULL; + aiowp->work_done1 = 0; + aiowp->work_tail1 = aiowp->work_next1; + if (aiowp->work_tail1 == NULL) + aiowp->work_head1 = NULL; + aiowp->work_prev1 = NULL; + sig_mutex_unlock(&aiowp->work_qlock1); + sig_mutex_lock(&__aio_mutex); + _aio_donecnt++; + _aio_outstand_cnt--; + _aio_req_done_cnt--; + ASSERT(_aio_donecnt > 0 && + _aio_outstand_cnt >= 0 && + _aio_req_done_cnt >= 0); + ASSERT(reqp != NULL); + + if (_aio_done_tail == NULL) { + _aio_done_head = _aio_done_tail = reqp; + } else { + _aio_done_head->req_next = reqp; + _aio_done_head = reqp; + } + + if (_aiowait_flag) { + sig_mutex_unlock(&__aio_mutex); + (void) _kaio(AIONOTIFY); + } else { + sig_mutex_unlock(&__aio_mutex); + if (_sigio_enabled) + (void) kill(__pid, SIGIO); + } +} + +/* + * The done queue consists of AIO requests that are in either the + * AIO_REQ_DONE or AIO_REQ_CANCELED state. Requests that were cancelled + * are discarded. If the done queue is empty then NULL is returned. + * Otherwise the address of a done aio_result_t is returned. + */ +aio_result_t * +_aio_req_done(void) +{ + aio_req_t *reqp; + aio_result_t *resultp; + + ASSERT(MUTEX_HELD(&__aio_mutex)); + + if ((reqp = _aio_done_tail) != NULL) { + if ((_aio_done_tail = reqp->req_next) == NULL) + _aio_done_head = NULL; + ASSERT(_aio_donecnt > 0); + _aio_donecnt--; + (void) _aio_hash_del(reqp->req_resultp); + resultp = reqp->req_resultp; + ASSERT(reqp->req_state == AIO_REQ_DONE); + _aio_req_free(reqp); + return (resultp); + } + /* is queue empty? */ + if (reqp == NULL && _aio_outstand_cnt == 0) { + return ((aio_result_t *)-1); + } + return (NULL); +} + +/* + * Set the return and errno values for the application's use. + * + * For the Posix interfaces, we must set the return value first followed + * by the errno value because the Posix interfaces allow for a change + * in the errno value from EINPROGRESS to something else to signal + * the completion of the asynchronous request. + * + * The opposite is true for the Solaris interfaces. These allow for + * a change in the return value from AIO_INPROGRESS to something else + * to signal the completion of the asynchronous request. + */ +void +_aio_set_result(aio_req_t *reqp, ssize_t retval, int error) +{ + aio_result_t *resultp = reqp->req_resultp; + + if (POSIX_AIO(reqp)) { + resultp->aio_return = retval; + membar_producer(); + resultp->aio_errno = error; + } else { + resultp->aio_errno = error; + membar_producer(); + resultp->aio_return = retval; + } +} + +/* + * Add an AIO request onto the next work queue. + * A circular list of workers is used to choose the next worker. + */ +void +_aio_req_add(aio_req_t *reqp, aio_worker_t **nextworker, int mode) +{ + ulwp_t *self = curthread; + aio_worker_t *aiowp; + aio_worker_t *first; + int load_bal_flg = 1; + int found; + + ASSERT(reqp->req_state != AIO_REQ_DONEQ); + reqp->req_next = NULL; + /* + * Try to acquire the next worker's work queue. 
If it is locked, + * then search the list of workers until a queue is found unlocked, + * or until the list is completely traversed at which point another + * worker will be created. + */ + sigoff(self); /* defer SIGIO */ + sig_mutex_lock(&__aio_mutex); + first = aiowp = *nextworker; + if (mode != AIONOTIFY) + _aio_outstand_cnt++; + sig_mutex_unlock(&__aio_mutex); + + switch (mode) { + case AIOREAD: + case AIOWRITE: + case AIOAREAD: + case AIOAWRITE: +#if !defined(_LP64) + case AIOAREAD64: + case AIOAWRITE64: +#endif + /* try to find an idle worker */ + found = 0; + do { + if (sig_mutex_trylock(&aiowp->work_qlock1) == 0) { + if (aiowp->work_idleflg) { + found = 1; + break; + } + sig_mutex_unlock(&aiowp->work_qlock1); + } + } while ((aiowp = aiowp->work_forw) != first); + + if (found) { + aiowp->work_minload1++; + break; + } + + /* try to acquire some worker's queue lock */ + do { + if (sig_mutex_trylock(&aiowp->work_qlock1) == 0) { + found = 1; + break; + } + } while ((aiowp = aiowp->work_forw) != first); + + /* + * Create more workers when the workers appear overloaded. + * Either all the workers are busy draining their queues + * or no worker's queue lock could be acquired. + */ + if (!found) { + if (_aio_worker_cnt < _max_workers) { + if (_aio_create_worker(reqp, mode)) + aio_panic("_aio_req_add: add worker"); + sigon(self); /* reenable SIGIO */ + return; + } + + /* + * No worker available and we have created + * _max_workers, keep going through the + * list slowly until we get a lock + */ + while (sig_mutex_trylock(&aiowp->work_qlock1) != 0) { + /* + * give someone else a chance + */ + _aio_delay(1); + aiowp = aiowp->work_forw; + } + } + + ASSERT(MUTEX_HELD(&aiowp->work_qlock1)); + if (_aio_worker_cnt < _max_workers && + aiowp->work_minload1 >= _minworkload) { + sig_mutex_unlock(&aiowp->work_qlock1); + sig_mutex_lock(&__aio_mutex); + *nextworker = aiowp->work_forw; + sig_mutex_unlock(&__aio_mutex); + if (_aio_create_worker(reqp, mode)) + aio_panic("aio_req_add: add worker"); + sigon(self); /* reenable SIGIO */ + return; + } + aiowp->work_minload1++; + break; + case AIOFSYNC: + case AIONOTIFY: + load_bal_flg = 0; + sig_mutex_lock(&aiowp->work_qlock1); + break; + default: + aio_panic("_aio_req_add: invalid mode"); + break; + } + /* + * Put request onto worker's work queue. + */ + if (aiowp->work_tail1 == NULL) { + ASSERT(aiowp->work_count1 == 0); + aiowp->work_tail1 = reqp; + aiowp->work_next1 = reqp; + } else { + aiowp->work_head1->req_next = reqp; + if (aiowp->work_next1 == NULL) + aiowp->work_next1 = reqp; + } + reqp->req_state = AIO_REQ_QUEUED; + reqp->req_worker = aiowp; + aiowp->work_head1 = reqp; + /* + * Awaken worker if it is not currently active. + */ + if (aiowp->work_count1++ == 0 && aiowp->work_idleflg) { + aiowp->work_idleflg = 0; + (void) cond_signal(&aiowp->work_idle_cv); + } + sig_mutex_unlock(&aiowp->work_qlock1); + + if (load_bal_flg) { + sig_mutex_lock(&__aio_mutex); + *nextworker = aiowp->work_forw; + sig_mutex_unlock(&__aio_mutex); + } + sigon(self); /* reenable SIGIO */ +} + +/* + * Get an AIO request for a specified worker. + * If the work queue is empty, return NULL. + */ +aio_req_t * +_aio_req_get(aio_worker_t *aiowp) +{ + aio_req_t *reqp; + + sig_mutex_lock(&aiowp->work_qlock1); + if ((reqp = aiowp->work_next1) != NULL) { + /* + * Remove a POSIX request from the queue; the + * request queue is a singularly linked list + * with a previous pointer. The request is + * removed by updating the previous pointer. 
+ * + * Non-posix requests are left on the queue + * to eventually be placed on the done queue. + */ + + if (POSIX_AIO(reqp)) { + if (aiowp->work_prev1 == NULL) { + aiowp->work_tail1 = reqp->req_next; + if (aiowp->work_tail1 == NULL) + aiowp->work_head1 = NULL; + } else { + aiowp->work_prev1->req_next = reqp->req_next; + if (aiowp->work_head1 == reqp) + aiowp->work_head1 = reqp->req_next; + } + + } else { + aiowp->work_prev1 = reqp; + ASSERT(aiowp->work_done1 >= 0); + aiowp->work_done1++; + } + ASSERT(reqp != reqp->req_next); + aiowp->work_next1 = reqp->req_next; + ASSERT(aiowp->work_count1 >= 1); + aiowp->work_count1--; + switch (reqp->req_op) { + case AIOREAD: + case AIOWRITE: + case AIOAREAD: + case AIOAWRITE: +#if !defined(_LP64) + case AIOAREAD64: + case AIOAWRITE64: +#endif + ASSERT(aiowp->work_minload1 > 0); + aiowp->work_minload1--; + break; + } + reqp->req_state = AIO_REQ_INPROGRESS; + } + aiowp->work_req = reqp; + ASSERT(reqp != NULL || aiowp->work_count1 == 0); + sig_mutex_unlock(&aiowp->work_qlock1); + return (reqp); +} + +static void +_aio_req_del(aio_worker_t *aiowp, aio_req_t *reqp, int ostate) +{ + aio_req_t **last; + aio_req_t *lastrp; + aio_req_t *next; + + ASSERT(aiowp != NULL); + ASSERT(MUTEX_HELD(&aiowp->work_qlock1)); + if (POSIX_AIO(reqp)) { + if (ostate != AIO_REQ_QUEUED) + return; + } + last = &aiowp->work_tail1; + lastrp = aiowp->work_tail1; + ASSERT(ostate == AIO_REQ_QUEUED || ostate == AIO_REQ_INPROGRESS); + while ((next = *last) != NULL) { + if (next == reqp) { + *last = next->req_next; + if (aiowp->work_next1 == next) + aiowp->work_next1 = next->req_next; + + if ((next->req_next != NULL) || + (aiowp->work_done1 == 0)) { + if (aiowp->work_head1 == next) + aiowp->work_head1 = next->req_next; + if (aiowp->work_prev1 == next) + aiowp->work_prev1 = next->req_next; + } else { + if (aiowp->work_head1 == next) + aiowp->work_head1 = lastrp; + if (aiowp->work_prev1 == next) + aiowp->work_prev1 = lastrp; + } + + if (ostate == AIO_REQ_QUEUED) { + ASSERT(aiowp->work_count1 >= 1); + aiowp->work_count1--; + ASSERT(aiowp->work_minload1 >= 1); + aiowp->work_minload1--; + } else { + ASSERT(ostate == AIO_REQ_INPROGRESS && + !POSIX_AIO(reqp)); + aiowp->work_done1--; + } + return; + } + last = &next->req_next; + lastrp = next; + } + /* NOTREACHED */ +} + +static void +_aio_enq_doneq(aio_req_t *reqp) +{ + if (_aio_doneq == NULL) { + _aio_doneq = reqp; + reqp->req_next = reqp->req_prev = reqp; + } else { + reqp->req_next = _aio_doneq; + reqp->req_prev = _aio_doneq->req_prev; + _aio_doneq->req_prev->req_next = reqp; + _aio_doneq->req_prev = reqp; + } + reqp->req_state = AIO_REQ_DONEQ; + _aio_doneq_cnt++; +} + +/* + * caller owns the _aio_mutex + */ +aio_req_t * +_aio_req_remove(aio_req_t *reqp) +{ + if (reqp && reqp->req_state != AIO_REQ_DONEQ) + return (NULL); + + if (reqp) { + /* request in done queue */ + if (_aio_doneq == reqp) + _aio_doneq = reqp->req_next; + if (_aio_doneq == reqp) { + /* only one request on queue */ + _aio_doneq = NULL; + } else { + aio_req_t *tmp = reqp->req_next; + reqp->req_prev->req_next = tmp; + tmp->req_prev = reqp->req_prev; + } + } else if ((reqp = _aio_doneq) != NULL) { + if (reqp == reqp->req_next) { + /* only one request on queue */ + _aio_doneq = NULL; + } else { + reqp->req_prev->req_next = _aio_doneq = reqp->req_next; + _aio_doneq->req_prev = reqp->req_prev; + } + } + if (reqp) { + _aio_doneq_cnt--; + reqp->req_next = reqp->req_prev = reqp; + reqp->req_state = AIO_REQ_DONE; + } + return (reqp); +} + +/* + * An AIO request is identified by an 
aio_result_t pointer. The library + * maps this aio_result_t pointer to its internal representation using a + * hash table. This function adds an aio_result_t pointer to the hash table. + */ +static int +_aio_hash_insert(aio_result_t *resultp, aio_req_t *reqp) +{ + aio_hash_t *hashp; + aio_req_t **prev; + aio_req_t *next; + + hashp = _aio_hash + AIOHASH(resultp); + lmutex_lock(&hashp->hash_lock); + prev = &hashp->hash_ptr; + while ((next = *prev) != NULL) { + if (resultp == next->req_resultp) { + lmutex_unlock(&hashp->hash_lock); + return (-1); + } + prev = &next->req_link; + } + *prev = reqp; + ASSERT(reqp->req_link == NULL); + lmutex_unlock(&hashp->hash_lock); + return (0); +} + +/* + * Remove an entry from the hash table. + */ +aio_req_t * +_aio_hash_del(aio_result_t *resultp) +{ + aio_hash_t *hashp; + aio_req_t **prev; + aio_req_t *next = NULL; + + if (_aio_hash != NULL) { + hashp = _aio_hash + AIOHASH(resultp); + lmutex_lock(&hashp->hash_lock); + prev = &hashp->hash_ptr; + while ((next = *prev) != NULL) { + if (resultp == next->req_resultp) { + *prev = next->req_link; + next->req_link = NULL; + break; + } + prev = &next->req_link; + } + lmutex_unlock(&hashp->hash_lock); + } + return (next); +} + +/* + * find an entry in the hash table + */ +aio_req_t * +_aio_hash_find(aio_result_t *resultp) +{ + aio_hash_t *hashp; + aio_req_t **prev; + aio_req_t *next = NULL; + + if (_aio_hash != NULL) { + hashp = _aio_hash + AIOHASH(resultp); + lmutex_lock(&hashp->hash_lock); + prev = &hashp->hash_ptr; + while ((next = *prev) != NULL) { + if (resultp == next->req_resultp) + break; + prev = &next->req_link; + } + lmutex_unlock(&hashp->hash_lock); + } + return (next); +} + +/* + * AIO interface for POSIX + */ +int +_aio_rw(aiocb_t *aiocbp, aio_lio_t *lio_head, aio_worker_t **nextworker, + int mode, int flg) +{ + aio_req_t *reqp; + aio_args_t *ap; + int kerr; + + if (aiocbp == NULL) { + errno = EINVAL; + return (-1); + } + + /* initialize kaio */ + if (!_kaio_ok) + _kaio_init(); + + aiocbp->aio_state = NOCHECK; + + /* + * If we have been called because a list I/O + * kaio() failed, we dont want to repeat the + * system call + */ + + if (flg & AIO_KAIO) { + /* + * Try kernel aio first. + * If errno is ENOTSUP/EBADFD, + * fall back to the thread implementation. 
+ */ + if (_kaio_ok > 0 && KAIO_SUPPORTED(aiocbp->aio_fildes)) { + aiocbp->aio_resultp.aio_errno = EINPROGRESS; + aiocbp->aio_state = CHECK; + kerr = (int)_kaio(mode, aiocbp); + if (kerr == 0) + return (0); + if (errno != ENOTSUP && errno != EBADFD) { + aiocbp->aio_resultp.aio_errno = errno; + aiocbp->aio_resultp.aio_return = -1; + aiocbp->aio_state = NOCHECK; + return (-1); + } + if (errno == EBADFD) + SET_KAIO_NOT_SUPPORTED(aiocbp->aio_fildes); + } + } + + aiocbp->aio_resultp.aio_errno = EINPROGRESS; + aiocbp->aio_state = USERAIO; + + if (!__uaio_ok && __uaio_init() == -1) + return (-1); + + if ((reqp = _aio_req_alloc()) == NULL) { + errno = EAGAIN; + return (-1); + } + + /* + * If an LIO request, add the list head to the aio request + */ + reqp->req_head = lio_head; + reqp->req_type = AIO_POSIX_REQ; + reqp->req_op = mode; + reqp->req_largefile = 0; + + if (aiocbp->aio_sigevent.sigev_notify == SIGEV_NONE) { + reqp->req_sigevent.sigev_notify = SIGEV_NONE; + } else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_SIGNAL) { + reqp->req_sigevent.sigev_notify = SIGEV_SIGNAL; + reqp->req_sigevent.sigev_signo = + aiocbp->aio_sigevent.sigev_signo; + reqp->req_sigevent.sigev_value.sival_ptr = + aiocbp->aio_sigevent.sigev_value.sival_ptr; + } else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_PORT) { + port_notify_t *pn = aiocbp->aio_sigevent.sigev_value.sival_ptr; + reqp->req_sigevent.sigev_notify = SIGEV_PORT; + /* + * Reuse the sigevent structure to contain the port number + * and the user value. Same for SIGEV_THREAD, below. + */ + reqp->req_sigevent.sigev_signo = + pn->portnfy_port; + reqp->req_sigevent.sigev_value.sival_ptr = + pn->portnfy_user; + } else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_THREAD) { + reqp->req_sigevent.sigev_notify = SIGEV_THREAD; + /* + * The sigevent structure contains the port number + * and the user value. Same for SIGEV_PORT, above. + */ + reqp->req_sigevent.sigev_signo = + aiocbp->aio_sigevent.sigev_signo; + reqp->req_sigevent.sigev_value.sival_ptr = + aiocbp->aio_sigevent.sigev_value.sival_ptr; + } + + reqp->req_resultp = &aiocbp->aio_resultp; + reqp->req_aiocbp = aiocbp; + ap = &reqp->req_args; + ap->fd = aiocbp->aio_fildes; + ap->buf = (caddr_t)aiocbp->aio_buf; + ap->bufsz = aiocbp->aio_nbytes; + ap->offset = aiocbp->aio_offset; + + if ((flg & AIO_NO_DUPS) && + _aio_hash_insert(&aiocbp->aio_resultp, reqp) != 0) { + aio_panic("_aio_rw(): request already in hash table"); + _aio_req_free(reqp); + errno = EINVAL; + return (-1); + } + _aio_req_add(reqp, nextworker, mode); + return (0); +} + +#if !defined(_LP64) +/* + * 64-bit AIO interface for POSIX + */ +int +_aio_rw64(aiocb64_t *aiocbp, aio_lio_t *lio_head, aio_worker_t **nextworker, + int mode, int flg) +{ + aio_req_t *reqp; + aio_args_t *ap; + int kerr; + + if (aiocbp == NULL) { + errno = EINVAL; + return (-1); + } + + /* initialize kaio */ + if (!_kaio_ok) + _kaio_init(); + + aiocbp->aio_state = NOCHECK; + + /* + * If we have been called because a list I/O + * kaio() failed, we dont want to repeat the + * system call + */ + + if (flg & AIO_KAIO) { + /* + * Try kernel aio first. + * If errno is ENOTSUP/EBADFD, + * fall back to the thread implementation. 
+ */ + if (_kaio_ok > 0 && KAIO_SUPPORTED(aiocbp->aio_fildes)) { + aiocbp->aio_resultp.aio_errno = EINPROGRESS; + aiocbp->aio_state = CHECK; + kerr = (int)_kaio(mode, aiocbp); + if (kerr == 0) + return (0); + if (errno != ENOTSUP && errno != EBADFD) { + aiocbp->aio_resultp.aio_errno = errno; + aiocbp->aio_resultp.aio_return = -1; + aiocbp->aio_state = NOCHECK; + return (-1); + } + if (errno == EBADFD) + SET_KAIO_NOT_SUPPORTED(aiocbp->aio_fildes); + } + } + + aiocbp->aio_resultp.aio_errno = EINPROGRESS; + aiocbp->aio_state = USERAIO; + + if (!__uaio_ok && __uaio_init() == -1) + return (-1); + + if ((reqp = _aio_req_alloc()) == NULL) { + errno = EAGAIN; + return (-1); + } + + /* + * If an LIO request, add the list head to the aio request + */ + reqp->req_head = lio_head; + reqp->req_type = AIO_POSIX_REQ; + reqp->req_op = mode; + reqp->req_largefile = 1; + + if (aiocbp->aio_sigevent.sigev_notify == SIGEV_NONE) { + reqp->req_sigevent.sigev_notify = SIGEV_NONE; + } else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_SIGNAL) { + reqp->req_sigevent.sigev_notify = SIGEV_SIGNAL; + reqp->req_sigevent.sigev_signo = + aiocbp->aio_sigevent.sigev_signo; + reqp->req_sigevent.sigev_value.sival_ptr = + aiocbp->aio_sigevent.sigev_value.sival_ptr; + } else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_PORT) { + port_notify_t *pn = aiocbp->aio_sigevent.sigev_value.sival_ptr; + reqp->req_sigevent.sigev_notify = SIGEV_PORT; + reqp->req_sigevent.sigev_signo = + pn->portnfy_port; + reqp->req_sigevent.sigev_value.sival_ptr = + pn->portnfy_user; + } else if (aiocbp->aio_sigevent.sigev_notify == SIGEV_THREAD) { + reqp->req_sigevent.sigev_notify = SIGEV_THREAD; + reqp->req_sigevent.sigev_signo = + aiocbp->aio_sigevent.sigev_signo; + reqp->req_sigevent.sigev_value.sival_ptr = + aiocbp->aio_sigevent.sigev_value.sival_ptr; + } + + reqp->req_resultp = &aiocbp->aio_resultp; + reqp->req_aiocbp = aiocbp; + ap = &reqp->req_args; + ap->fd = aiocbp->aio_fildes; + ap->buf = (caddr_t)aiocbp->aio_buf; + ap->bufsz = aiocbp->aio_nbytes; + ap->offset = aiocbp->aio_offset; + + if ((flg & AIO_NO_DUPS) && + _aio_hash_insert(&aiocbp->aio_resultp, reqp) != 0) { + aio_panic("_aio_rw64(): request already in hash table"); + _aio_req_free(reqp); + errno = EINVAL; + return (-1); + } + _aio_req_add(reqp, nextworker, mode); + return (0); +} +#endif /* !defined(_LP64) */ |
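The file added above implements, among other things, the classic Solaris aioread(3AIO)/aiowait(3AIO) interfaces that this putback moves from libaio into libc, so no separate -laio link line is required afterwards. What follows is a minimal usage sketch, not part of the commit: it assumes only the declarations from <sys/asynch.h>, and the file name and buffer size are arbitrary placeholders.

/*
 * Hypothetical example -- not part of the putback.  Queue one
 * asynchronous read with aioread() and reap it with aiowait().
 */
#include <sys/types.h>
#include <sys/asynch.h>
#include <fcntl.h>
#include <stdio.h>
#include <unistd.h>

int
main(void)
{
	static char buf[8192];		/* arbitrary buffer size */
	aio_result_t result;
	aio_result_t *resp;
	int fd;

	/* "/etc/passwd" is just a placeholder file to read from */
	if ((fd = open("/etc/passwd", O_RDONLY)) == -1) {
		perror("open");
		return (1);
	}

	/* queue the read; completion is reported through 'result' */
	if (aioread(fd, buf, sizeof (buf), (off_t)0, SEEK_SET, &result) == -1) {
		perror("aioread");
		return (1);
	}

	/* block until an outstanding asynchronous request completes */
	resp = aiowait(NULL);
	if (resp == &result && resp->aio_return != -1)
		(void) printf("read %ld bytes asynchronously\n",
		    (long)resp->aio_return);
	else
		(void) fprintf(stderr, "aio error: %d\n", result.aio_errno);

	(void) close(fd);
	return (0);
}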