diff options
author | raf <none@none> | 2006-06-20 19:21:46 -0700 |
---|---|---|
committer | raf <none@none> | 2006-06-20 19:21:46 -0700 |
commit | f841f6ad96ea6675d6c6b35c749eaac601799fdf (patch) | |
tree | 698db5d44fc99bd070613c93a497f5e93e803bc5 /usr/src/lib/libc/port/rt/mqueue.c | |
parent | 545e5dad7996785cfebf226c5ef410c1b154740f (diff) | |
download | illumos-gate-f841f6ad96ea6675d6c6b35c749eaac601799fdf.tar.gz |
6416832 libaio and librt can and should be folded into libc
--HG--
rename : usr/src/cmd/perl/5.8.4/distrib/ext/Time/HiRes/hints/solaris.pl => deleted_files/usr/src/cmd/perl/5.8.4/distrib/ext/Time/HiRes/hints/solaris.pl
rename : usr/src/lib/libaio/common/Makefile => deleted_files/usr/src/lib/libaio/common/Makefile
rename : usr/src/lib/libaio/common/scalls.c => deleted_files/usr/src/lib/libaio/common/scalls.c
rename : usr/src/lib/libaio/common/sig.c => deleted_files/usr/src/lib/libaio/common/sig.c
rename : usr/src/lib/libaio/common/subr.c => deleted_files/usr/src/lib/libaio/common/subr.c
rename : usr/src/lib/libaio/spec/Makefile => deleted_files/usr/src/lib/libaio/spec/Makefile
rename : usr/src/lib/libaio/spec/Makefile.targ => deleted_files/usr/src/lib/libaio/spec/Makefile.targ
rename : usr/src/lib/libaio/spec/amd64/Makefile => deleted_files/usr/src/lib/libaio/spec/amd64/Makefile
rename : usr/src/lib/libaio/spec/i386/Makefile => deleted_files/usr/src/lib/libaio/spec/i386/Makefile
rename : usr/src/lib/libaio/spec/sparc/Makefile => deleted_files/usr/src/lib/libaio/spec/sparc/Makefile
rename : usr/src/lib/libaio/spec/sparcv9/Makefile => deleted_files/usr/src/lib/libaio/spec/sparcv9/Makefile
rename : usr/src/lib/libaio/spec/versions => deleted_files/usr/src/lib/libaio/spec/versions
rename : usr/src/lib/librt/common/Makefile => deleted_files/usr/src/lib/librt/common/Makefile
rename : usr/src/lib/librt/common/aio.c => deleted_files/usr/src/lib/librt/common/aio.c
rename : usr/src/lib/librt/common/fdatasync.c => deleted_files/usr/src/lib/librt/common/fdatasync.c
rename : usr/src/lib/librt/common/mqlib.h => deleted_files/usr/src/lib/librt/common/mqlib.h
rename : usr/src/lib/librt/common/pos4.c => deleted_files/usr/src/lib/librt/common/pos4.c
rename : usr/src/lib/librt/common/pos4.h => deleted_files/usr/src/lib/librt/common/pos4.h
rename : usr/src/lib/librt/common/sigrt.c => deleted_files/usr/src/lib/librt/common/sigrt.c
rename : usr/src/lib/librt/req.flg => deleted_files/usr/src/lib/librt/req.flg
rename : usr/src/lib/librt/spec/Makefile => deleted_files/usr/src/lib/librt/spec/Makefile
rename : usr/src/lib/librt/spec/Makefile.targ => deleted_files/usr/src/lib/librt/spec/Makefile.targ
rename : usr/src/lib/librt/spec/amd64/Makefile => deleted_files/usr/src/lib/librt/spec/amd64/Makefile
rename : usr/src/lib/librt/spec/i386/Makefile => deleted_files/usr/src/lib/librt/spec/i386/Makefile
rename : usr/src/lib/librt/spec/sparc/Makefile => deleted_files/usr/src/lib/librt/spec/sparc/Makefile
rename : usr/src/lib/librt/spec/sparcv9/Makefile => deleted_files/usr/src/lib/librt/spec/sparcv9/Makefile
rename : usr/src/lib/librt/spec/versions => deleted_files/usr/src/lib/librt/spec/versions
rename : usr/src/lib/libaio/common/libaio.h => usr/src/lib/libc/inc/asyncio.h
rename : usr/src/lib/librt/common/thread_pool.h => usr/src/lib/libc/inc/thread_pool.h
rename : usr/src/lib/libaio/common/aio.c => usr/src/lib/libc/port/aio/aio.c
rename : usr/src/lib/libaio/common/ma.c => usr/src/lib/libc/port/aio/aio_alloc.c
rename : usr/src/lib/libaio/common/posix_aio.c => usr/src/lib/libc/port/aio/posix_aio.c
rename : usr/src/lib/librt/common/clock_timer.c => usr/src/lib/libc/port/rt/clock_timer.c
rename : usr/src/lib/librt/common/fallocate.c => usr/src/lib/libc/port/rt/fallocate.c
rename : usr/src/lib/librt/common/mqueue.c => usr/src/lib/libc/port/rt/mqueue.c
rename : usr/src/lib/librt/common/pos4obj.c => usr/src/lib/libc/port/rt/pos4obj.c
rename : usr/src/lib/librt/common/pos4obj.h => usr/src/lib/libc/port/rt/pos4obj.h
rename : usr/src/lib/librt/common/sched.c => usr/src/lib/libc/port/rt/sched.c
rename : usr/src/lib/librt/common/sem.c => usr/src/lib/libc/port/rt/sem.c
rename : usr/src/lib/librt/common/shm.c => usr/src/lib/libc/port/rt/shm.c
rename : usr/src/lib/librt/common/sigev_thread.c => usr/src/lib/libc/port/rt/sigev_thread.c
rename : usr/src/lib/librt/common/sigev_thread.h => usr/src/lib/libc/port/rt/sigev_thread.h
rename : usr/src/lib/librt/common/thread_pool.c => usr/src/lib/libc/port/tpool/thread_pool.c
rename : usr/src/lib/librt/common/thread_pool_impl.h => usr/src/lib/libc/port/tpool/thread_pool_impl.h
rename : usr/src/lib/libaio/spec/aio.spec => usr/src/lib/libc/spec/aio.spec
rename : usr/src/lib/librt/spec/rt.spec => usr/src/lib/libc/spec/rt.spec
rename : usr/src/lib/libaio/asynch.h => usr/src/uts/common/sys/asynch.h
Diffstat (limited to 'usr/src/lib/libc/port/rt/mqueue.c')
-rw-r--r-- | usr/src/lib/libc/port/rt/mqueue.c | 1101 |
1 files changed, 1101 insertions, 0 deletions
diff --git a/usr/src/lib/libc/port/rt/mqueue.c b/usr/src/lib/libc/port/rt/mqueue.c new file mode 100644 index 0000000000..ebab58a259 --- /dev/null +++ b/usr/src/lib/libc/port/rt/mqueue.c @@ -0,0 +1,1101 @@ +/* + * CDDL HEADER START + * + * The contents of this file are subject to the terms of the + * Common Development and Distribution License (the "License"). + * You may not use this file except in compliance with the License. + * + * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE + * or http://www.opensolaris.org/os/licensing. + * See the License for the specific language governing permissions + * and limitations under the License. + * + * When distributing Covered Code, include this CDDL HEADER in each + * file and include the License file at usr/src/OPENSOLARIS.LICENSE. + * If applicable, add the following below this CDDL HEADER, with the + * fields enclosed by brackets "[]" replaced with your own identifying + * information: Portions Copyright [yyyy] [name of copyright owner] + * + * CDDL HEADER END + */ + +/* + * Copyright 2006 Sun Microsystems, Inc. All rights reserved. + * Use is subject to license terms. + */ + +#pragma ident "%Z%%M% %I% %E% SMI" + +#pragma weak mq_open = _mq_open +#pragma weak mq_close = _mq_close +#pragma weak mq_unlink = _mq_unlink +#pragma weak mq_send = _mq_send +#pragma weak mq_timedsend = _mq_timedsend +#pragma weak mq_reltimedsend_np = _mq_reltimedsend_np +#pragma weak mq_receive = _mq_receive +#pragma weak mq_timedreceive = _mq_timedreceive +#pragma weak mq_reltimedreceive_np = _mq_reltimedreceive_np +#pragma weak mq_notify = _mq_notify +#pragma weak mq_setattr = _mq_setattr +#pragma weak mq_getattr = _mq_getattr + +#include "synonyms.h" +#include "mtlib.h" +#define _KMEMUSER +#include <sys/param.h> /* _MQ_OPEN_MAX, _MQ_PRIO_MAX, _SEM_VALUE_MAX */ +#undef _KMEMUSER +#include <mqueue.h> +#include <sys/types.h> +#include <sys/file.h> +#include <sys/mman.h> +#include <errno.h> +#include <stdarg.h> +#include <limits.h> +#include <pthread.h> +#include <assert.h> +#include <string.h> +#include <unistd.h> +#include <stdlib.h> +#include <sys/stat.h> +#include <inttypes.h> +#include "sigev_thread.h" +#include "pos4obj.h" + +/* + * Default values per message queue + */ +#define MQ_MAXMSG 128 +#define MQ_MAXSIZE 1024 + +#define MQ_MAGIC 0x4d534751 /* "MSGQ" */ + +/* + * Message header which is part of messages in link list + */ +typedef struct { + uint64_t msg_next; /* offset of next message in the link */ + uint64_t msg_len; /* length of the message */ +} msghdr_t; + +/* + * message queue description + */ +struct mq_dn { + size_t mqdn_flags; /* open description flags */ +}; + +/* + * message queue descriptor structure + */ +typedef struct mq_des { + struct mq_des *mqd_next; /* list of all open mq descriptors, */ + struct mq_des *mqd_prev; /* needed for fork-safety */ + int mqd_magic; /* magic # to identify mq_des */ + int mqd_flags; /* operation flag per open */ + struct mq_header *mqd_mq; /* address pointer of message Q */ + struct mq_dn *mqd_mqdn; /* open description */ + thread_communication_data_t *mqd_tcd; /* SIGEV_THREAD notification */ +} mqdes_t; + +/* + * message queue common header, part of the mmap()ed file. + * Since message queues may be shared between 32- and 64-bit processes, + * care must be taken to make sure that the elements of this structure + * are identical for both _LP64 and _ILP32 cases. + */ +typedef struct mq_header { + /* first field must be mq_totsize, DO NOT insert before this */ + int64_t mq_totsize; /* total size of the Queue */ + int64_t mq_maxsz; /* max size of each message */ + uint32_t mq_maxmsg; /* max messages in the queue */ + uint32_t mq_maxprio; /* maximum mqueue priority */ + uint32_t mq_curmaxprio; /* current maximum MQ priority */ + uint32_t mq_mask; /* priority bitmask */ + uint64_t mq_freep; /* free message's head pointer */ + uint64_t mq_headpp; /* pointer to head pointers */ + uint64_t mq_tailpp; /* pointer to tail pointers */ + signotify_id_t mq_sigid; /* notification id (3 int's) */ + uint32_t mq_ntype; /* notification type (SIGEV_*) */ + uint64_t mq_des; /* pointer to msg Q descriptor */ + mutex_t mq_exclusive; /* acquire for exclusive access */ + sem_t mq_rblocked; /* number of processes rblocked */ + sem_t mq_notfull; /* mq_send()'s block on this */ + sem_t mq_notempty; /* mq_receive()'s block on this */ + sem_t mq_spawner; /* spawner thread blocks on this */ +} mqhdr_t; + +/* + * The code assumes that _MQ_OPEN_MAX == -1 or "no fixed implementation limit". + * If this assumption is somehow invalidated, mq_open() needs to be changed + * back to the old version which kept a count and enforced a limit. + * We make sure that this is pointed out to those changing <sys/param.h> + * by checking _MQ_OPEN_MAX at compile time. + */ +#if _MQ_OPEN_MAX != -1 +#error "mq_open() no longer enforces _MQ_OPEN_MAX and needs fixing." +#endif + +#define MQ_ALIGNSIZE 8 /* 64-bit alignment */ + +#ifdef DEBUG +#define MQ_ASSERT(x) assert(x); + +#define MQ_ASSERT_PTR(_m, _p) \ + assert((_p) != NULL && !((uintptr_t)(_p) & (MQ_ALIGNSIZE -1)) && \ + !((uintptr_t)_m + (uintptr_t)(_p) >= (uintptr_t)_m + \ + _m->mq_totsize)); + +#define MQ_ASSERT_SEMVAL_LEQ(sem, val) { \ + int _val; \ + (void) sem_getvalue((sem), &_val); \ + assert((_val) <= val); } +#else +#define MQ_ASSERT(x) +#define MQ_ASSERT_PTR(_m, _p) +#define MQ_ASSERT_SEMVAL_LEQ(sem, val) +#endif + +#define MQ_PTR(m, n) ((msghdr_t *)((uintptr_t)m + (uintptr_t)n)) +#define HEAD_PTR(m, n) ((uint64_t *)((uintptr_t)m + \ + (uintptr_t)m->mq_headpp + n * sizeof (uint64_t))) +#define TAIL_PTR(m, n) ((uint64_t *)((uintptr_t)m + \ + (uintptr_t)m->mq_tailpp + n * sizeof (uint64_t))) + +#define MQ_RESERVED ((mqdes_t *)-1) + +#define ABS_TIME 0 +#define REL_TIME 1 + +static mutex_t mq_list_lock = DEFAULTMUTEX; +static mqdes_t *mq_list = NULL; + +extern int __signotify(int cmd, siginfo_t *sigonfo, signotify_id_t *sn_id); + +static int +mq_is_valid(mqdes_t *mqdp) +{ + /* + * Any use of a message queue after it was closed is + * undefined. But the standard strongly favours EBADF + * returns. Before we dereference which could be fatal, + * we first do some pointer sanity checks. + */ + if (mqdp != NULL && mqdp != MQ_RESERVED && + ((uintptr_t)mqdp & 0x7) == 0) { + return (mqdp->mqd_magic == MQ_MAGIC); + } + + return (0); +} + +static void +mq_init(mqhdr_t *mqhp, size_t msgsize, ssize_t maxmsg) +{ + int i; + uint64_t temp; + uint64_t currentp; + uint64_t nextp; + + /* + * We only need to initialize the non-zero fields. The use of + * ftruncate() on the message queue file assures that the + * pages will be zfod. + */ + (void) mutex_init(&mqhp->mq_exclusive, USYNC_PROCESS, NULL); + (void) sem_init(&mqhp->mq_rblocked, 1, 0); + (void) sem_init(&mqhp->mq_notempty, 1, 0); + (void) sem_init(&mqhp->mq_spawner, 1, 0); + (void) sem_init(&mqhp->mq_notfull, 1, (uint_t)maxmsg); + + mqhp->mq_maxsz = msgsize; + mqhp->mq_maxmsg = maxmsg; + + /* + * As of this writing (1997), there are 32 message queue priorities. + * If this is to change, then the size of the mq_mask will + * also have to change. If DEBUG is defined, assert that + * _MQ_PRIO_MAX hasn't changed. + */ + mqhp->mq_maxprio = _MQ_PRIO_MAX; +#if defined(DEBUG) + /* LINTED always true */ + MQ_ASSERT(sizeof (mqhp->mq_mask) * 8 >= _MQ_PRIO_MAX); +#endif + + /* + * Since the message queue can be mapped into different + * virtual address ranges by different processes, we don't + * keep track of pointers, only offsets into the shared region. + */ + mqhp->mq_headpp = sizeof (mqhdr_t); + mqhp->mq_tailpp = mqhp->mq_headpp + + mqhp->mq_maxprio * sizeof (uint64_t); + mqhp->mq_freep = mqhp->mq_tailpp + + mqhp->mq_maxprio * sizeof (uint64_t); + + currentp = mqhp->mq_freep; + MQ_PTR(mqhp, currentp)->msg_next = 0; + + temp = (mqhp->mq_maxsz + MQ_ALIGNSIZE - 1) & ~(MQ_ALIGNSIZE - 1); + for (i = 1; i < mqhp->mq_maxmsg; i++) { + nextp = currentp + sizeof (msghdr_t) + temp; + MQ_PTR(mqhp, currentp)->msg_next = nextp; + MQ_PTR(mqhp, nextp)->msg_next = 0; + currentp = nextp; + } +} + +static size_t +mq_getmsg(mqhdr_t *mqhp, char *msgp, uint_t *msg_prio) +{ + uint64_t currentp; + msghdr_t *curbuf; + uint64_t *headpp; + uint64_t *tailpp; + + MQ_ASSERT(MUTEX_HELD(&mqhp->mq_exclusive)); + + /* + * Get the head and tail pointers for the queue of maximum + * priority. We shouldn't be here unless there is a message for + * us, so it's fair to assert that both the head and tail + * pointers are non-NULL. + */ + headpp = HEAD_PTR(mqhp, mqhp->mq_curmaxprio); + tailpp = TAIL_PTR(mqhp, mqhp->mq_curmaxprio); + + if (msg_prio != NULL) + *msg_prio = mqhp->mq_curmaxprio; + + currentp = *headpp; + MQ_ASSERT_PTR(mqhp, currentp); + curbuf = MQ_PTR(mqhp, currentp); + + if ((*headpp = curbuf->msg_next) == NULL) { + /* + * We just nuked the last message in this priority's queue. + * Twiddle this priority's bit, and then find the next bit + * tipped. + */ + uint_t prio = mqhp->mq_curmaxprio; + + mqhp->mq_mask &= ~(1u << prio); + + for (; prio != 0; prio--) + if (mqhp->mq_mask & (1u << prio)) + break; + mqhp->mq_curmaxprio = prio; + + *tailpp = NULL; + } + + /* + * Copy the message, and put the buffer back on the free list. + */ + (void) memcpy(msgp, (char *)&curbuf[1], curbuf->msg_len); + curbuf->msg_next = mqhp->mq_freep; + mqhp->mq_freep = currentp; + + return (curbuf->msg_len); +} + + +static void +mq_putmsg(mqhdr_t *mqhp, const char *msgp, ssize_t len, uint_t prio) +{ + uint64_t currentp; + msghdr_t *curbuf; + uint64_t *headpp; + uint64_t *tailpp; + + MQ_ASSERT(MUTEX_HELD(&mqhp->mq_exclusive)); + + /* + * Grab a free message block, and link it in. We shouldn't + * be here unless there is room in the queue for us; it's + * fair to assert that the free pointer is non-NULL. + */ + currentp = mqhp->mq_freep; + MQ_ASSERT_PTR(mqhp, currentp); + curbuf = MQ_PTR(mqhp, currentp); + + /* + * Remove a message from the free list, and copy in the new contents. + */ + mqhp->mq_freep = curbuf->msg_next; + curbuf->msg_next = NULL; + (void) memcpy((char *)&curbuf[1], msgp, len); + curbuf->msg_len = len; + + headpp = HEAD_PTR(mqhp, prio); + tailpp = TAIL_PTR(mqhp, prio); + + if (*tailpp == 0) { + /* + * This is the first message on this queue. Set the + * head and tail pointers, and tip the appropriate bit + * in the priority mask. + */ + *headpp = currentp; + *tailpp = currentp; + mqhp->mq_mask |= (1u << prio); + if (prio > mqhp->mq_curmaxprio) + mqhp->mq_curmaxprio = prio; + } else { + MQ_ASSERT_PTR(mqhp, *tailpp); + MQ_PTR(mqhp, *tailpp)->msg_next = currentp; + *tailpp = currentp; + } +} + +mqd_t +_mq_open(const char *path, int oflag, /* mode_t mode, mq_attr *attr */ ...) +{ + va_list ap; + mode_t mode; + struct mq_attr *attr; + int fd; + int err; + int cr_flag = 0; + int locked = 0; + uint64_t total_size; + size_t msgsize; + ssize_t maxmsg; + uint64_t temp; + void *ptr; + mqdes_t *mqdp; + mqhdr_t *mqhp; + struct mq_dn *mqdnp; + + if (__pos4obj_check(path) == -1) + return ((mqd_t)-1); + + /* acquire MSGQ lock to have atomic operation */ + if (__pos4obj_lock(path, MQ_LOCK_TYPE) < 0) + goto out; + locked = 1; + + va_start(ap, oflag); + /* filter oflag to have READ/WRITE/CREATE modes only */ + oflag = oflag & (O_RDONLY|O_WRONLY|O_RDWR|O_CREAT|O_EXCL|O_NONBLOCK); + if ((oflag & O_CREAT) != 0) { + mode = va_arg(ap, mode_t); + attr = va_arg(ap, struct mq_attr *); + } + va_end(ap); + + if ((fd = __pos4obj_open(path, MQ_PERM_TYPE, oflag, + mode, &cr_flag)) < 0) + goto out; + + /* closing permission file */ + (void) __close_nc(fd); + + /* Try to open/create data file */ + if (cr_flag) { + cr_flag = PFILE_CREATE; + if (attr == NULL) { + maxmsg = MQ_MAXMSG; + msgsize = MQ_MAXSIZE; + } else if (attr->mq_maxmsg <= 0 || attr->mq_msgsize <= 0) { + errno = EINVAL; + goto out; + } else if (attr->mq_maxmsg > _SEM_VALUE_MAX) { + errno = ENOSPC; + goto out; + } else { + maxmsg = attr->mq_maxmsg; + msgsize = attr->mq_msgsize; + } + + /* adjust for message size at word boundary */ + temp = (msgsize + MQ_ALIGNSIZE - 1) & ~(MQ_ALIGNSIZE - 1); + + total_size = sizeof (mqhdr_t) + + maxmsg * (temp + sizeof (msghdr_t)) + + 2 * _MQ_PRIO_MAX * sizeof (uint64_t); + + if (total_size > SSIZE_MAX) { + errno = ENOSPC; + goto out; + } + + /* + * data file is opened with read/write to those + * who have read or write permission + */ + mode = mode | (mode & 0444) >> 1 | (mode & 0222) << 1; + if ((fd = __pos4obj_open(path, MQ_DATA_TYPE, + (O_RDWR|O_CREAT|O_EXCL), mode, &err)) < 0) + goto out; + + cr_flag |= DFILE_CREATE | DFILE_OPEN; + + /* force permissions to avoid umask effect */ + if (fchmod(fd, mode) < 0) + goto out; + + if (ftruncate64(fd, (off64_t)total_size) < 0) + goto out; + } else { + if ((fd = __pos4obj_open(path, MQ_DATA_TYPE, + O_RDWR, 0666, &err)) < 0) + goto out; + cr_flag = DFILE_OPEN; + + /* Message queue has not been initialized yet */ + if (read(fd, &total_size, sizeof (total_size)) != + sizeof (total_size) || total_size == 0) { + errno = ENOENT; + goto out; + } + + /* Message queue too big for this process to handle */ + if (total_size > SSIZE_MAX) { + errno = EFBIG; + goto out; + } + } + + if ((mqdp = (mqdes_t *)malloc(sizeof (mqdes_t))) == NULL) { + errno = ENOMEM; + goto out; + } + cr_flag |= ALLOC_MEM; + + if ((ptr = mmap64(NULL, total_size, PROT_READ|PROT_WRITE, + MAP_SHARED, fd, (off64_t)0)) == MAP_FAILED) + goto out; + mqhp = ptr; + cr_flag |= DFILE_MMAP; + + /* closing data file */ + (void) __close_nc(fd); + cr_flag &= ~DFILE_OPEN; + + /* + * create, unlink, size, mmap, and close description file + * all for a flag word in anonymous shared memory + */ + if ((fd = __pos4obj_open(path, MQ_DSCN_TYPE, O_RDWR | O_CREAT, + 0666, &err)) < 0) + goto out; + cr_flag |= DFILE_OPEN; + (void) __pos4obj_unlink(path, MQ_DSCN_TYPE); + if (ftruncate64(fd, (off64_t)sizeof (struct mq_dn)) < 0) + goto out; + + if ((ptr = mmap64(NULL, sizeof (struct mq_dn), + PROT_READ | PROT_WRITE, MAP_SHARED, fd, (off64_t)0)) == MAP_FAILED) + goto out; + mqdnp = ptr; + cr_flag |= MQDNP_MMAP; + + (void) __close_nc(fd); + cr_flag &= ~DFILE_OPEN; + + /* + * we follow the same strategy as filesystem open() routine, + * where fcntl.h flags are changed to flags defined in file.h. + */ + mqdp->mqd_flags = (oflag - FOPEN) & (FREAD|FWRITE); + mqdnp->mqdn_flags = (oflag - FOPEN) & (FNONBLOCK); + + /* new message queue requires initialization */ + if ((cr_flag & DFILE_CREATE) != 0) { + /* message queue header has to be initialized */ + mq_init(mqhp, msgsize, maxmsg); + mqhp->mq_totsize = total_size; + } + mqdp->mqd_mq = mqhp; + mqdp->mqd_mqdn = mqdnp; + mqdp->mqd_magic = MQ_MAGIC; + mqdp->mqd_tcd = NULL; + if (__pos4obj_unlock(path, MQ_LOCK_TYPE) == 0) { + lmutex_lock(&mq_list_lock); + mqdp->mqd_next = mq_list; + mqdp->mqd_prev = NULL; + if (mq_list) + mq_list->mqd_prev = mqdp; + mq_list = mqdp; + lmutex_unlock(&mq_list_lock); + return ((mqd_t)mqdp); + } + + locked = 0; /* fall into the error case */ +out: + err = errno; + if ((cr_flag & DFILE_OPEN) != 0) + (void) __close_nc(fd); + if ((cr_flag & DFILE_CREATE) != 0) + (void) __pos4obj_unlink(path, MQ_DATA_TYPE); + if ((cr_flag & PFILE_CREATE) != 0) + (void) __pos4obj_unlink(path, MQ_PERM_TYPE); + if ((cr_flag & ALLOC_MEM) != 0) + free((void *)mqdp); + if ((cr_flag & DFILE_MMAP) != 0) + (void) munmap((caddr_t)mqhp, (size_t)total_size); + if ((cr_flag & MQDNP_MMAP) != 0) + (void) munmap((caddr_t)mqdnp, sizeof (struct mq_dn)); + if (locked) + (void) __pos4obj_unlock(path, MQ_LOCK_TYPE); + errno = err; + return ((mqd_t)-1); +} + +static void +mq_close_cleanup(mqdes_t *mqdp) +{ + mqhdr_t *mqhp = mqdp->mqd_mq; + struct mq_dn *mqdnp = mqdp->mqd_mqdn; + + /* invalidate the descriptor before freeing it */ + mqdp->mqd_magic = 0; + (void) mutex_unlock(&mqhp->mq_exclusive); + + lmutex_lock(&mq_list_lock); + if (mqdp->mqd_next) + mqdp->mqd_next->mqd_prev = mqdp->mqd_prev; + if (mqdp->mqd_prev) + mqdp->mqd_prev->mqd_next = mqdp->mqd_next; + if (mq_list == mqdp) + mq_list = mqdp->mqd_next; + lmutex_unlock(&mq_list_lock); + + free(mqdp); + (void) munmap((caddr_t)mqdnp, sizeof (struct mq_dn)); + (void) munmap((caddr_t)mqhp, (size_t)mqhp->mq_totsize); +} + +int +_mq_close(mqd_t mqdes) +{ + mqdes_t *mqdp = (mqdes_t *)mqdes; + mqhdr_t *mqhp; + thread_communication_data_t *tcdp; + + if (!mq_is_valid(mqdp)) { + errno = EBADF; + return (-1); + } + + mqhp = mqdp->mqd_mq; + (void) mutex_lock(&mqhp->mq_exclusive); + + if (mqhp->mq_des == (uintptr_t)mqdp && + mqhp->mq_sigid.sn_pid == getpid()) { + /* notification is set for this descriptor, remove it */ + (void) __signotify(SN_CANCEL, NULL, &mqhp->mq_sigid); + mqhp->mq_ntype = 0; + mqhp->mq_des = 0; + } + + pthread_cleanup_push(mq_close_cleanup, mqdp); + if ((tcdp = mqdp->mqd_tcd) != NULL) { + mqdp->mqd_tcd = NULL; + del_sigev_mq(tcdp); /* possible cancellation point */ + } + pthread_cleanup_pop(1); /* finish in the cleanup handler */ + + return (0); +} + +int +_mq_unlink(const char *path) +{ + int err; + + if (__pos4obj_check(path) < 0) + return (-1); + + if (__pos4obj_lock(path, MQ_LOCK_TYPE) < 0) { + return (-1); + } + + err = __pos4obj_unlink(path, MQ_PERM_TYPE); + + if (err == 0 || (err == -1 && errno == EEXIST)) { + errno = 0; + err = __pos4obj_unlink(path, MQ_DATA_TYPE); + } + + if (__pos4obj_unlock(path, MQ_LOCK_TYPE) < 0) + return (-1); + + return (err); + +} + +static int +__mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len, + uint_t msg_prio, const timespec_t *timeout, int abs_rel) +{ + mqdes_t *mqdp = (mqdes_t *)mqdes; + mqhdr_t *mqhp; + int err; + int notify = 0; + + /* + * sem_*wait() does cancellation, if called. + * pthread_testcancel() ensures that cancellation takes place if + * there is a cancellation pending when mq_*send() is called. + */ + pthread_testcancel(); + + if (!mq_is_valid(mqdp) || (mqdp->mqd_flags & FWRITE) == 0) { + errno = EBADF; + return (-1); + } + + mqhp = mqdp->mqd_mq; + + if (msg_prio >= mqhp->mq_maxprio) { + errno = EINVAL; + return (-1); + } + if (msg_len > mqhp->mq_maxsz) { + errno = EMSGSIZE; + return (-1); + } + + if (mqdp->mqd_mqdn->mqdn_flags & O_NONBLOCK) + err = sem_trywait(&mqhp->mq_notfull); + else { + /* + * We might get cancelled here... + */ + if (timeout == NULL) + err = sem_wait(&mqhp->mq_notfull); + else if (abs_rel == ABS_TIME) + err = sem_timedwait(&mqhp->mq_notfull, timeout); + else + err = sem_reltimedwait_np(&mqhp->mq_notfull, timeout); + } + if (err == -1) { + /* + * errno has been set to EAGAIN / EINTR / ETIMEDOUT + * by sem_*wait(), so we can just return. + */ + return (-1); + } + + /* + * By the time we're here, we know that we've got the capacity + * to add to the queue...now acquire the exclusive lock. + */ + (void) mutex_lock(&mqhp->mq_exclusive); + + /* + * Now determine if we want to kick the notification. POSIX + * requires that if a process has registered for notification, + * we must kick it when the queue makes an empty to non-empty + * transition, and there are no blocked receivers. Note that + * this mechanism does _not_ guarantee that the kicked process + * will be able to receive a message without blocking; + * another receiver could intervene in the meantime. Thus, + * the notification mechanism is inherently racy; all we can + * do is hope to minimize the window as much as possible. + * In general, we want to avoid kicking the notification when + * there are clearly receivers blocked. We'll determine if + * we want to kick the notification before the mq_putmsg(), + * but the actual signotify() won't be done until the message + * is on the queue. + */ + if (mqhp->mq_sigid.sn_pid != 0) { + int nmessages, nblocked; + + (void) sem_getvalue(&mqhp->mq_notempty, &nmessages); + (void) sem_getvalue(&mqhp->mq_rblocked, &nblocked); + + if (nmessages == 0 && nblocked == 0) + notify = 1; + } + + mq_putmsg(mqhp, msg_ptr, (ssize_t)msg_len, msg_prio); + (void) sem_post(&mqhp->mq_notempty); + + if (notify) { + /* notify and also delete the registration */ + (void) __signotify(SN_SEND, NULL, &mqhp->mq_sigid); + if (mqhp->mq_ntype == SIGEV_THREAD || + mqhp->mq_ntype == SIGEV_PORT) + (void) sem_post(&mqhp->mq_spawner); + mqhp->mq_ntype = 0; + mqhp->mq_des = 0; + } + + MQ_ASSERT_SEMVAL_LEQ(&mqhp->mq_notempty, ((int)mqhp->mq_maxmsg)); + (void) mutex_unlock(&mqhp->mq_exclusive); + + return (0); +} + +int +_mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, uint_t msg_prio) +{ + return (__mq_timedsend(mqdes, msg_ptr, msg_len, msg_prio, + NULL, ABS_TIME)); +} + +int +_mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len, + uint_t msg_prio, const timespec_t *abs_timeout) +{ + return (__mq_timedsend(mqdes, msg_ptr, msg_len, msg_prio, + abs_timeout, ABS_TIME)); +} + +int +_mq_reltimedsend_np(mqd_t mqdes, const char *msg_ptr, size_t msg_len, + uint_t msg_prio, const timespec_t *rel_timeout) +{ + return (__mq_timedsend(mqdes, msg_ptr, msg_len, msg_prio, + rel_timeout, REL_TIME)); +} + +static void +decrement_rblocked(mqhdr_t *mqhp) +{ + int canstate; + + (void) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &canstate); + while (sem_wait(&mqhp->mq_rblocked) == -1) + continue; + (void) pthread_setcancelstate(canstate, NULL); +} + +static ssize_t +__mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len, + uint_t *msg_prio, const timespec_t *timeout, int abs_rel) +{ + mqdes_t *mqdp = (mqdes_t *)mqdes; + mqhdr_t *mqhp; + ssize_t msg_size; + int err; + + /* + * sem_*wait() does cancellation, if called. + * pthread_testcancel() ensures that cancellation takes place if + * there is a cancellation pending when mq_*receive() is called. + */ + pthread_testcancel(); + + if (!mq_is_valid(mqdp) || (mqdp->mqd_flags & FREAD) == 0) { + errno = EBADF; + return (ssize_t)(-1); + } + + mqhp = mqdp->mqd_mq; + + if (msg_len < mqhp->mq_maxsz) { + errno = EMSGSIZE; + return (ssize_t)(-1); + } + + /* + * The semaphoring scheme for mq_[timed]receive is a little hairy + * thanks to POSIX.1b's arcane notification mechanism. First, + * we try to take the common case and do a sem_trywait(). + * If that doesn't work, and O_NONBLOCK hasn't been set, + * then note that we're going to sleep by incrementing the rblocked + * semaphore. We decrement that semaphore after waking up. + */ + if (sem_trywait(&mqhp->mq_notempty) == -1) { + if ((mqdp->mqd_mqdn->mqdn_flags & O_NONBLOCK) != 0) { + /* + * errno has been set to EAGAIN or EINTR by + * sem_trywait(), so we can just return. + */ + return (-1); + } + /* + * If we're here, then we're probably going to block... + * increment the rblocked semaphore. If we get + * cancelled, decrement_rblocked() will decrement it. + */ + (void) sem_post(&mqhp->mq_rblocked); + + pthread_cleanup_push(decrement_rblocked, mqhp); + if (timeout == NULL) + err = sem_wait(&mqhp->mq_notempty); + else if (abs_rel == ABS_TIME) + err = sem_timedwait(&mqhp->mq_notempty, timeout); + else + err = sem_reltimedwait_np(&mqhp->mq_notempty, timeout); + pthread_cleanup_pop(1); + + if (err == -1) { + /* + * We took a signal or timeout while waiting + * on mq_notempty... + */ + return (-1); + } + } + + (void) mutex_lock(&mqhp->mq_exclusive); + msg_size = mq_getmsg(mqhp, msg_ptr, msg_prio); + (void) sem_post(&mqhp->mq_notfull); + MQ_ASSERT_SEMVAL_LEQ(&mqhp->mq_notfull, ((int)mqhp->mq_maxmsg)); + (void) mutex_unlock(&mqhp->mq_exclusive); + + return (msg_size); +} + +ssize_t +_mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, uint_t *msg_prio) +{ + return (__mq_timedreceive(mqdes, msg_ptr, msg_len, msg_prio, + NULL, ABS_TIME)); +} + +ssize_t +_mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len, + uint_t *msg_prio, const timespec_t *abs_timeout) +{ + return (__mq_timedreceive(mqdes, msg_ptr, msg_len, msg_prio, + abs_timeout, ABS_TIME)); +} + +ssize_t +_mq_reltimedreceive_np(mqd_t mqdes, char *msg_ptr, size_t msg_len, + uint_t *msg_prio, const timespec_t *rel_timeout) +{ + return (__mq_timedreceive(mqdes, msg_ptr, msg_len, msg_prio, + rel_timeout, REL_TIME)); +} + +/* + * Only used below, in _mq_notify(). + * We already have a spawner thread. + * Verify that the attributes match; cancel it if necessary. + */ +static int +cancel_if_necessary(thread_communication_data_t *tcdp, + const struct sigevent *sigevp) +{ + int do_cancel = !_pthread_attr_equal(tcdp->tcd_attrp, + sigevp->sigev_notify_attributes); + + if (do_cancel) { + /* + * Attributes don't match, cancel the spawner thread. + */ + (void) pthread_cancel(tcdp->tcd_server_id); + } else { + /* + * Reuse the existing spawner thread with possibly + * changed notification function and value. + */ + tcdp->tcd_notif.sigev_notify = SIGEV_THREAD; + tcdp->tcd_notif.sigev_signo = 0; + tcdp->tcd_notif.sigev_value = sigevp->sigev_value; + tcdp->tcd_notif.sigev_notify_function = + sigevp->sigev_notify_function; + } + + return (do_cancel); +} + +int +_mq_notify(mqd_t mqdes, const struct sigevent *sigevp) +{ + mqdes_t *mqdp = (mqdes_t *)mqdes; + mqhdr_t *mqhp; + thread_communication_data_t *tcdp; + siginfo_t mq_siginfo; + struct sigevent sigevent; + struct stat64 statb; + port_notify_t *pn; + void *userval; + int rval = -1; + int ntype; + int port; + + if (!mq_is_valid(mqdp)) { + errno = EBADF; + return (-1); + } + + mqhp = mqdp->mqd_mq; + + (void) mutex_lock(&mqhp->mq_exclusive); + + if (sigevp == NULL) { /* remove notification */ + if (mqhp->mq_des == (uintptr_t)mqdp && + mqhp->mq_sigid.sn_pid == getpid()) { + /* notification is set for this descriptor, remove it */ + (void) __signotify(SN_CANCEL, NULL, &mqhp->mq_sigid); + if ((tcdp = mqdp->mqd_tcd) != NULL) { + sig_mutex_lock(&tcdp->tcd_lock); + if (tcdp->tcd_msg_enabled) { + /* cancel the spawner thread */ + tcdp = mqdp->mqd_tcd; + mqdp->mqd_tcd = NULL; + (void) pthread_cancel( + tcdp->tcd_server_id); + } + sig_mutex_unlock(&tcdp->tcd_lock); + } + mqhp->mq_ntype = 0; + mqhp->mq_des = 0; + } else { + /* notification is not set for this descriptor */ + errno = EBUSY; + goto bad; + } + } else { /* register notification with this process */ + switch (ntype = sigevp->sigev_notify) { + case SIGEV_THREAD: + userval = sigevp->sigev_value.sival_ptr; + port = -1; + break; + case SIGEV_PORT: + pn = sigevp->sigev_value.sival_ptr; + userval = pn->portnfy_user; + port = pn->portnfy_port; + if (fstat64(port, &statb) != 0 || + !S_ISPORT(statb.st_mode)) { + errno = EBADF; + goto bad; + } + (void) memset(&sigevent, 0, sizeof (sigevent)); + sigevent.sigev_notify = SIGEV_PORT; + sigevp = &sigevent; + break; + } + switch (ntype) { + case SIGEV_NONE: + mq_siginfo.si_signo = 0; + mq_siginfo.si_code = SI_MESGQ; + break; + case SIGEV_SIGNAL: + mq_siginfo.si_signo = sigevp->sigev_signo; + mq_siginfo.si_value = sigevp->sigev_value; + mq_siginfo.si_code = SI_MESGQ; + break; + case SIGEV_THREAD: + if ((tcdp = mqdp->mqd_tcd) != NULL && + cancel_if_necessary(tcdp, sigevp)) + mqdp->mqd_tcd = NULL; + /* FALLTHROUGH */ + case SIGEV_PORT: + if ((tcdp = mqdp->mqd_tcd) == NULL) { + /* we must create a spawner thread */ + tcdp = setup_sigev_handler(sigevp, MQ); + if (tcdp == NULL) { + errno = EBADF; + goto bad; + } + tcdp->tcd_msg_enabled = 0; + tcdp->tcd_msg_closing = 0; + tcdp->tcd_msg_avail = &mqhp->mq_spawner; + if (launch_spawner(tcdp) != 0) { + free_sigev_handler(tcdp); + goto bad; + } + mqdp->mqd_tcd = tcdp; + } + mq_siginfo.si_signo = 0; + mq_siginfo.si_code = SI_MESGQ; + break; + default: + errno = EINVAL; + goto bad; + } + + /* register notification */ + if (__signotify(SN_PROC, &mq_siginfo, &mqhp->mq_sigid) < 0) + goto bad; + mqhp->mq_ntype = ntype; + mqhp->mq_des = (uintptr_t)mqdp; + switch (ntype) { + case SIGEV_THREAD: + case SIGEV_PORT: + tcdp->tcd_port = port; + tcdp->tcd_msg_object = mqdp; + tcdp->tcd_msg_userval = userval; + sig_mutex_lock(&tcdp->tcd_lock); + tcdp->tcd_msg_enabled = ntype; + sig_mutex_unlock(&tcdp->tcd_lock); + (void) cond_broadcast(&tcdp->tcd_cv); + break; + } + } + + rval = 0; /* success */ +bad: + (void) mutex_unlock(&mqhp->mq_exclusive); + return (rval); +} + +int +_mq_setattr(mqd_t mqdes, const struct mq_attr *mqstat, struct mq_attr *omqstat) +{ + mqdes_t *mqdp = (mqdes_t *)mqdes; + mqhdr_t *mqhp; + uint_t flag = 0; + + if (!mq_is_valid(mqdp)) { + errno = EBADF; + return (-1); + } + + /* store current attributes */ + if (omqstat != NULL) { + int count; + + mqhp = mqdp->mqd_mq; + omqstat->mq_flags = mqdp->mqd_mqdn->mqdn_flags; + omqstat->mq_maxmsg = (long)mqhp->mq_maxmsg; + omqstat->mq_msgsize = (long)mqhp->mq_maxsz; + (void) sem_getvalue(&mqhp->mq_notempty, &count); + omqstat->mq_curmsgs = count; + } + + /* set description attributes */ + if ((mqstat->mq_flags & O_NONBLOCK) != 0) + flag = FNONBLOCK; + mqdp->mqd_mqdn->mqdn_flags = flag; + + return (0); +} + +int +_mq_getattr(mqd_t mqdes, struct mq_attr *mqstat) +{ + mqdes_t *mqdp = (mqdes_t *)mqdes; + mqhdr_t *mqhp; + int count; + + if (!mq_is_valid(mqdp)) { + errno = EBADF; + return (-1); + } + + mqhp = mqdp->mqd_mq; + + mqstat->mq_flags = mqdp->mqd_mqdn->mqdn_flags; + mqstat->mq_maxmsg = (long)mqhp->mq_maxmsg; + mqstat->mq_msgsize = (long)mqhp->mq_maxsz; + (void) sem_getvalue(&mqhp->mq_notempty, &count); + mqstat->mq_curmsgs = count; + return (0); +} + +/* + * Cleanup after fork1() in the child process. + */ +void +postfork1_child_sigev_mq(void) +{ + thread_communication_data_t *tcdp; + mqdes_t *mqdp; + + for (mqdp = mq_list; mqdp; mqdp = mqdp->mqd_next) { + if ((tcdp = mqdp->mqd_tcd) != NULL) { + mqdp->mqd_tcd = NULL; + tcd_teardown(tcdp); + } + } +} |