summaryrefslogtreecommitdiff
path: root/usr/src/lib/libc/port/rt/mqueue.c
diff options
context:
space:
mode:
Diffstat (limited to 'usr/src/lib/libc/port/rt/mqueue.c')
-rw-r--r--usr/src/lib/libc/port/rt/mqueue.c1101
1 files changed, 1101 insertions, 0 deletions
diff --git a/usr/src/lib/libc/port/rt/mqueue.c b/usr/src/lib/libc/port/rt/mqueue.c
new file mode 100644
index 0000000000..ebab58a259
--- /dev/null
+++ b/usr/src/lib/libc/port/rt/mqueue.c
@@ -0,0 +1,1101 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License (the "License").
+ * You may not use this file except in compliance with the License.
+ *
+ * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
+ * or http://www.opensolaris.org/os/licensing.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information: Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ */
+
+/*
+ * Copyright 2006 Sun Microsystems, Inc. All rights reserved.
+ * Use is subject to license terms.
+ */
+
+#pragma ident "%Z%%M% %I% %E% SMI"
+
+#pragma weak mq_open = _mq_open
+#pragma weak mq_close = _mq_close
+#pragma weak mq_unlink = _mq_unlink
+#pragma weak mq_send = _mq_send
+#pragma weak mq_timedsend = _mq_timedsend
+#pragma weak mq_reltimedsend_np = _mq_reltimedsend_np
+#pragma weak mq_receive = _mq_receive
+#pragma weak mq_timedreceive = _mq_timedreceive
+#pragma weak mq_reltimedreceive_np = _mq_reltimedreceive_np
+#pragma weak mq_notify = _mq_notify
+#pragma weak mq_setattr = _mq_setattr
+#pragma weak mq_getattr = _mq_getattr
+
+#include "synonyms.h"
+#include "mtlib.h"
+#define _KMEMUSER
+#include <sys/param.h> /* _MQ_OPEN_MAX, _MQ_PRIO_MAX, _SEM_VALUE_MAX */
+#undef _KMEMUSER
+#include <mqueue.h>
+#include <sys/types.h>
+#include <sys/file.h>
+#include <sys/mman.h>
+#include <errno.h>
+#include <stdarg.h>
+#include <limits.h>
+#include <pthread.h>
+#include <assert.h>
+#include <string.h>
+#include <unistd.h>
+#include <stdlib.h>
+#include <sys/stat.h>
+#include <inttypes.h>
+#include "sigev_thread.h"
+#include "pos4obj.h"
+
+/*
+ * Default values per message queue
+ */
+#define MQ_MAXMSG 128
+#define MQ_MAXSIZE 1024
+
+#define MQ_MAGIC 0x4d534751 /* "MSGQ" */
+
+/*
+ * Message header which is part of messages in link list
+ */
+typedef struct {
+ uint64_t msg_next; /* offset of next message in the link */
+ uint64_t msg_len; /* length of the message */
+} msghdr_t;
+
+/*
+ * message queue description
+ */
+struct mq_dn {
+ size_t mqdn_flags; /* open description flags */
+};
+
+/*
+ * message queue descriptor structure
+ */
+typedef struct mq_des {
+ struct mq_des *mqd_next; /* list of all open mq descriptors, */
+ struct mq_des *mqd_prev; /* needed for fork-safety */
+ int mqd_magic; /* magic # to identify mq_des */
+ int mqd_flags; /* operation flag per open */
+ struct mq_header *mqd_mq; /* address pointer of message Q */
+ struct mq_dn *mqd_mqdn; /* open description */
+ thread_communication_data_t *mqd_tcd; /* SIGEV_THREAD notification */
+} mqdes_t;
+
+/*
+ * message queue common header, part of the mmap()ed file.
+ * Since message queues may be shared between 32- and 64-bit processes,
+ * care must be taken to make sure that the elements of this structure
+ * are identical for both _LP64 and _ILP32 cases.
+ */
+typedef struct mq_header {
+ /* first field must be mq_totsize, DO NOT insert before this */
+ int64_t mq_totsize; /* total size of the Queue */
+ int64_t mq_maxsz; /* max size of each message */
+ uint32_t mq_maxmsg; /* max messages in the queue */
+ uint32_t mq_maxprio; /* maximum mqueue priority */
+ uint32_t mq_curmaxprio; /* current maximum MQ priority */
+ uint32_t mq_mask; /* priority bitmask */
+ uint64_t mq_freep; /* free message's head pointer */
+ uint64_t mq_headpp; /* pointer to head pointers */
+ uint64_t mq_tailpp; /* pointer to tail pointers */
+ signotify_id_t mq_sigid; /* notification id (3 int's) */
+ uint32_t mq_ntype; /* notification type (SIGEV_*) */
+ uint64_t mq_des; /* pointer to msg Q descriptor */
+ mutex_t mq_exclusive; /* acquire for exclusive access */
+ sem_t mq_rblocked; /* number of processes rblocked */
+ sem_t mq_notfull; /* mq_send()'s block on this */
+ sem_t mq_notempty; /* mq_receive()'s block on this */
+ sem_t mq_spawner; /* spawner thread blocks on this */
+} mqhdr_t;
+
+/*
+ * The code assumes that _MQ_OPEN_MAX == -1 or "no fixed implementation limit".
+ * If this assumption is somehow invalidated, mq_open() needs to be changed
+ * back to the old version which kept a count and enforced a limit.
+ * We make sure that this is pointed out to those changing <sys/param.h>
+ * by checking _MQ_OPEN_MAX at compile time.
+ */
+#if _MQ_OPEN_MAX != -1
+#error "mq_open() no longer enforces _MQ_OPEN_MAX and needs fixing."
+#endif
+
+#define MQ_ALIGNSIZE 8 /* 64-bit alignment */
+
+#ifdef DEBUG
+#define MQ_ASSERT(x) assert(x);
+
+#define MQ_ASSERT_PTR(_m, _p) \
+ assert((_p) != NULL && !((uintptr_t)(_p) & (MQ_ALIGNSIZE -1)) && \
+ !((uintptr_t)_m + (uintptr_t)(_p) >= (uintptr_t)_m + \
+ _m->mq_totsize));
+
+#define MQ_ASSERT_SEMVAL_LEQ(sem, val) { \
+ int _val; \
+ (void) sem_getvalue((sem), &_val); \
+ assert((_val) <= val); }
+#else
+#define MQ_ASSERT(x)
+#define MQ_ASSERT_PTR(_m, _p)
+#define MQ_ASSERT_SEMVAL_LEQ(sem, val)
+#endif
+
+#define MQ_PTR(m, n) ((msghdr_t *)((uintptr_t)m + (uintptr_t)n))
+#define HEAD_PTR(m, n) ((uint64_t *)((uintptr_t)m + \
+ (uintptr_t)m->mq_headpp + n * sizeof (uint64_t)))
+#define TAIL_PTR(m, n) ((uint64_t *)((uintptr_t)m + \
+ (uintptr_t)m->mq_tailpp + n * sizeof (uint64_t)))
+
+#define MQ_RESERVED ((mqdes_t *)-1)
+
+#define ABS_TIME 0
+#define REL_TIME 1
+
+static mutex_t mq_list_lock = DEFAULTMUTEX;
+static mqdes_t *mq_list = NULL;
+
+extern int __signotify(int cmd, siginfo_t *sigonfo, signotify_id_t *sn_id);
+
+static int
+mq_is_valid(mqdes_t *mqdp)
+{
+ /*
+ * Any use of a message queue after it was closed is
+ * undefined. But the standard strongly favours EBADF
+ * returns. Before we dereference which could be fatal,
+ * we first do some pointer sanity checks.
+ */
+ if (mqdp != NULL && mqdp != MQ_RESERVED &&
+ ((uintptr_t)mqdp & 0x7) == 0) {
+ return (mqdp->mqd_magic == MQ_MAGIC);
+ }
+
+ return (0);
+}
+
+static void
+mq_init(mqhdr_t *mqhp, size_t msgsize, ssize_t maxmsg)
+{
+ int i;
+ uint64_t temp;
+ uint64_t currentp;
+ uint64_t nextp;
+
+ /*
+ * We only need to initialize the non-zero fields. The use of
+ * ftruncate() on the message queue file assures that the
+ * pages will be zfod.
+ */
+ (void) mutex_init(&mqhp->mq_exclusive, USYNC_PROCESS, NULL);
+ (void) sem_init(&mqhp->mq_rblocked, 1, 0);
+ (void) sem_init(&mqhp->mq_notempty, 1, 0);
+ (void) sem_init(&mqhp->mq_spawner, 1, 0);
+ (void) sem_init(&mqhp->mq_notfull, 1, (uint_t)maxmsg);
+
+ mqhp->mq_maxsz = msgsize;
+ mqhp->mq_maxmsg = maxmsg;
+
+ /*
+ * As of this writing (1997), there are 32 message queue priorities.
+ * If this is to change, then the size of the mq_mask will
+ * also have to change. If DEBUG is defined, assert that
+ * _MQ_PRIO_MAX hasn't changed.
+ */
+ mqhp->mq_maxprio = _MQ_PRIO_MAX;
+#if defined(DEBUG)
+ /* LINTED always true */
+ MQ_ASSERT(sizeof (mqhp->mq_mask) * 8 >= _MQ_PRIO_MAX);
+#endif
+
+ /*
+ * Since the message queue can be mapped into different
+ * virtual address ranges by different processes, we don't
+ * keep track of pointers, only offsets into the shared region.
+ */
+ mqhp->mq_headpp = sizeof (mqhdr_t);
+ mqhp->mq_tailpp = mqhp->mq_headpp +
+ mqhp->mq_maxprio * sizeof (uint64_t);
+ mqhp->mq_freep = mqhp->mq_tailpp +
+ mqhp->mq_maxprio * sizeof (uint64_t);
+
+ currentp = mqhp->mq_freep;
+ MQ_PTR(mqhp, currentp)->msg_next = 0;
+
+ temp = (mqhp->mq_maxsz + MQ_ALIGNSIZE - 1) & ~(MQ_ALIGNSIZE - 1);
+ for (i = 1; i < mqhp->mq_maxmsg; i++) {
+ nextp = currentp + sizeof (msghdr_t) + temp;
+ MQ_PTR(mqhp, currentp)->msg_next = nextp;
+ MQ_PTR(mqhp, nextp)->msg_next = 0;
+ currentp = nextp;
+ }
+}
+
+static size_t
+mq_getmsg(mqhdr_t *mqhp, char *msgp, uint_t *msg_prio)
+{
+ uint64_t currentp;
+ msghdr_t *curbuf;
+ uint64_t *headpp;
+ uint64_t *tailpp;
+
+ MQ_ASSERT(MUTEX_HELD(&mqhp->mq_exclusive));
+
+ /*
+ * Get the head and tail pointers for the queue of maximum
+ * priority. We shouldn't be here unless there is a message for
+ * us, so it's fair to assert that both the head and tail
+ * pointers are non-NULL.
+ */
+ headpp = HEAD_PTR(mqhp, mqhp->mq_curmaxprio);
+ tailpp = TAIL_PTR(mqhp, mqhp->mq_curmaxprio);
+
+ if (msg_prio != NULL)
+ *msg_prio = mqhp->mq_curmaxprio;
+
+ currentp = *headpp;
+ MQ_ASSERT_PTR(mqhp, currentp);
+ curbuf = MQ_PTR(mqhp, currentp);
+
+ if ((*headpp = curbuf->msg_next) == NULL) {
+ /*
+ * We just nuked the last message in this priority's queue.
+ * Twiddle this priority's bit, and then find the next bit
+ * tipped.
+ */
+ uint_t prio = mqhp->mq_curmaxprio;
+
+ mqhp->mq_mask &= ~(1u << prio);
+
+ for (; prio != 0; prio--)
+ if (mqhp->mq_mask & (1u << prio))
+ break;
+ mqhp->mq_curmaxprio = prio;
+
+ *tailpp = NULL;
+ }
+
+ /*
+ * Copy the message, and put the buffer back on the free list.
+ */
+ (void) memcpy(msgp, (char *)&curbuf[1], curbuf->msg_len);
+ curbuf->msg_next = mqhp->mq_freep;
+ mqhp->mq_freep = currentp;
+
+ return (curbuf->msg_len);
+}
+
+
+static void
+mq_putmsg(mqhdr_t *mqhp, const char *msgp, ssize_t len, uint_t prio)
+{
+ uint64_t currentp;
+ msghdr_t *curbuf;
+ uint64_t *headpp;
+ uint64_t *tailpp;
+
+ MQ_ASSERT(MUTEX_HELD(&mqhp->mq_exclusive));
+
+ /*
+ * Grab a free message block, and link it in. We shouldn't
+ * be here unless there is room in the queue for us; it's
+ * fair to assert that the free pointer is non-NULL.
+ */
+ currentp = mqhp->mq_freep;
+ MQ_ASSERT_PTR(mqhp, currentp);
+ curbuf = MQ_PTR(mqhp, currentp);
+
+ /*
+ * Remove a message from the free list, and copy in the new contents.
+ */
+ mqhp->mq_freep = curbuf->msg_next;
+ curbuf->msg_next = NULL;
+ (void) memcpy((char *)&curbuf[1], msgp, len);
+ curbuf->msg_len = len;
+
+ headpp = HEAD_PTR(mqhp, prio);
+ tailpp = TAIL_PTR(mqhp, prio);
+
+ if (*tailpp == 0) {
+ /*
+ * This is the first message on this queue. Set the
+ * head and tail pointers, and tip the appropriate bit
+ * in the priority mask.
+ */
+ *headpp = currentp;
+ *tailpp = currentp;
+ mqhp->mq_mask |= (1u << prio);
+ if (prio > mqhp->mq_curmaxprio)
+ mqhp->mq_curmaxprio = prio;
+ } else {
+ MQ_ASSERT_PTR(mqhp, *tailpp);
+ MQ_PTR(mqhp, *tailpp)->msg_next = currentp;
+ *tailpp = currentp;
+ }
+}
+
+mqd_t
+_mq_open(const char *path, int oflag, /* mode_t mode, mq_attr *attr */ ...)
+{
+ va_list ap;
+ mode_t mode;
+ struct mq_attr *attr;
+ int fd;
+ int err;
+ int cr_flag = 0;
+ int locked = 0;
+ uint64_t total_size;
+ size_t msgsize;
+ ssize_t maxmsg;
+ uint64_t temp;
+ void *ptr;
+ mqdes_t *mqdp;
+ mqhdr_t *mqhp;
+ struct mq_dn *mqdnp;
+
+ if (__pos4obj_check(path) == -1)
+ return ((mqd_t)-1);
+
+ /* acquire MSGQ lock to have atomic operation */
+ if (__pos4obj_lock(path, MQ_LOCK_TYPE) < 0)
+ goto out;
+ locked = 1;
+
+ va_start(ap, oflag);
+ /* filter oflag to have READ/WRITE/CREATE modes only */
+ oflag = oflag & (O_RDONLY|O_WRONLY|O_RDWR|O_CREAT|O_EXCL|O_NONBLOCK);
+ if ((oflag & O_CREAT) != 0) {
+ mode = va_arg(ap, mode_t);
+ attr = va_arg(ap, struct mq_attr *);
+ }
+ va_end(ap);
+
+ if ((fd = __pos4obj_open(path, MQ_PERM_TYPE, oflag,
+ mode, &cr_flag)) < 0)
+ goto out;
+
+ /* closing permission file */
+ (void) __close_nc(fd);
+
+ /* Try to open/create data file */
+ if (cr_flag) {
+ cr_flag = PFILE_CREATE;
+ if (attr == NULL) {
+ maxmsg = MQ_MAXMSG;
+ msgsize = MQ_MAXSIZE;
+ } else if (attr->mq_maxmsg <= 0 || attr->mq_msgsize <= 0) {
+ errno = EINVAL;
+ goto out;
+ } else if (attr->mq_maxmsg > _SEM_VALUE_MAX) {
+ errno = ENOSPC;
+ goto out;
+ } else {
+ maxmsg = attr->mq_maxmsg;
+ msgsize = attr->mq_msgsize;
+ }
+
+ /* adjust for message size at word boundary */
+ temp = (msgsize + MQ_ALIGNSIZE - 1) & ~(MQ_ALIGNSIZE - 1);
+
+ total_size = sizeof (mqhdr_t) +
+ maxmsg * (temp + sizeof (msghdr_t)) +
+ 2 * _MQ_PRIO_MAX * sizeof (uint64_t);
+
+ if (total_size > SSIZE_MAX) {
+ errno = ENOSPC;
+ goto out;
+ }
+
+ /*
+ * data file is opened with read/write to those
+ * who have read or write permission
+ */
+ mode = mode | (mode & 0444) >> 1 | (mode & 0222) << 1;
+ if ((fd = __pos4obj_open(path, MQ_DATA_TYPE,
+ (O_RDWR|O_CREAT|O_EXCL), mode, &err)) < 0)
+ goto out;
+
+ cr_flag |= DFILE_CREATE | DFILE_OPEN;
+
+ /* force permissions to avoid umask effect */
+ if (fchmod(fd, mode) < 0)
+ goto out;
+
+ if (ftruncate64(fd, (off64_t)total_size) < 0)
+ goto out;
+ } else {
+ if ((fd = __pos4obj_open(path, MQ_DATA_TYPE,
+ O_RDWR, 0666, &err)) < 0)
+ goto out;
+ cr_flag = DFILE_OPEN;
+
+ /* Message queue has not been initialized yet */
+ if (read(fd, &total_size, sizeof (total_size)) !=
+ sizeof (total_size) || total_size == 0) {
+ errno = ENOENT;
+ goto out;
+ }
+
+ /* Message queue too big for this process to handle */
+ if (total_size > SSIZE_MAX) {
+ errno = EFBIG;
+ goto out;
+ }
+ }
+
+ if ((mqdp = (mqdes_t *)malloc(sizeof (mqdes_t))) == NULL) {
+ errno = ENOMEM;
+ goto out;
+ }
+ cr_flag |= ALLOC_MEM;
+
+ if ((ptr = mmap64(NULL, total_size, PROT_READ|PROT_WRITE,
+ MAP_SHARED, fd, (off64_t)0)) == MAP_FAILED)
+ goto out;
+ mqhp = ptr;
+ cr_flag |= DFILE_MMAP;
+
+ /* closing data file */
+ (void) __close_nc(fd);
+ cr_flag &= ~DFILE_OPEN;
+
+ /*
+ * create, unlink, size, mmap, and close description file
+ * all for a flag word in anonymous shared memory
+ */
+ if ((fd = __pos4obj_open(path, MQ_DSCN_TYPE, O_RDWR | O_CREAT,
+ 0666, &err)) < 0)
+ goto out;
+ cr_flag |= DFILE_OPEN;
+ (void) __pos4obj_unlink(path, MQ_DSCN_TYPE);
+ if (ftruncate64(fd, (off64_t)sizeof (struct mq_dn)) < 0)
+ goto out;
+
+ if ((ptr = mmap64(NULL, sizeof (struct mq_dn),
+ PROT_READ | PROT_WRITE, MAP_SHARED, fd, (off64_t)0)) == MAP_FAILED)
+ goto out;
+ mqdnp = ptr;
+ cr_flag |= MQDNP_MMAP;
+
+ (void) __close_nc(fd);
+ cr_flag &= ~DFILE_OPEN;
+
+ /*
+ * we follow the same strategy as filesystem open() routine,
+ * where fcntl.h flags are changed to flags defined in file.h.
+ */
+ mqdp->mqd_flags = (oflag - FOPEN) & (FREAD|FWRITE);
+ mqdnp->mqdn_flags = (oflag - FOPEN) & (FNONBLOCK);
+
+ /* new message queue requires initialization */
+ if ((cr_flag & DFILE_CREATE) != 0) {
+ /* message queue header has to be initialized */
+ mq_init(mqhp, msgsize, maxmsg);
+ mqhp->mq_totsize = total_size;
+ }
+ mqdp->mqd_mq = mqhp;
+ mqdp->mqd_mqdn = mqdnp;
+ mqdp->mqd_magic = MQ_MAGIC;
+ mqdp->mqd_tcd = NULL;
+ if (__pos4obj_unlock(path, MQ_LOCK_TYPE) == 0) {
+ lmutex_lock(&mq_list_lock);
+ mqdp->mqd_next = mq_list;
+ mqdp->mqd_prev = NULL;
+ if (mq_list)
+ mq_list->mqd_prev = mqdp;
+ mq_list = mqdp;
+ lmutex_unlock(&mq_list_lock);
+ return ((mqd_t)mqdp);
+ }
+
+ locked = 0; /* fall into the error case */
+out:
+ err = errno;
+ if ((cr_flag & DFILE_OPEN) != 0)
+ (void) __close_nc(fd);
+ if ((cr_flag & DFILE_CREATE) != 0)
+ (void) __pos4obj_unlink(path, MQ_DATA_TYPE);
+ if ((cr_flag & PFILE_CREATE) != 0)
+ (void) __pos4obj_unlink(path, MQ_PERM_TYPE);
+ if ((cr_flag & ALLOC_MEM) != 0)
+ free((void *)mqdp);
+ if ((cr_flag & DFILE_MMAP) != 0)
+ (void) munmap((caddr_t)mqhp, (size_t)total_size);
+ if ((cr_flag & MQDNP_MMAP) != 0)
+ (void) munmap((caddr_t)mqdnp, sizeof (struct mq_dn));
+ if (locked)
+ (void) __pos4obj_unlock(path, MQ_LOCK_TYPE);
+ errno = err;
+ return ((mqd_t)-1);
+}
+
+static void
+mq_close_cleanup(mqdes_t *mqdp)
+{
+ mqhdr_t *mqhp = mqdp->mqd_mq;
+ struct mq_dn *mqdnp = mqdp->mqd_mqdn;
+
+ /* invalidate the descriptor before freeing it */
+ mqdp->mqd_magic = 0;
+ (void) mutex_unlock(&mqhp->mq_exclusive);
+
+ lmutex_lock(&mq_list_lock);
+ if (mqdp->mqd_next)
+ mqdp->mqd_next->mqd_prev = mqdp->mqd_prev;
+ if (mqdp->mqd_prev)
+ mqdp->mqd_prev->mqd_next = mqdp->mqd_next;
+ if (mq_list == mqdp)
+ mq_list = mqdp->mqd_next;
+ lmutex_unlock(&mq_list_lock);
+
+ free(mqdp);
+ (void) munmap((caddr_t)mqdnp, sizeof (struct mq_dn));
+ (void) munmap((caddr_t)mqhp, (size_t)mqhp->mq_totsize);
+}
+
+int
+_mq_close(mqd_t mqdes)
+{
+ mqdes_t *mqdp = (mqdes_t *)mqdes;
+ mqhdr_t *mqhp;
+ thread_communication_data_t *tcdp;
+
+ if (!mq_is_valid(mqdp)) {
+ errno = EBADF;
+ return (-1);
+ }
+
+ mqhp = mqdp->mqd_mq;
+ (void) mutex_lock(&mqhp->mq_exclusive);
+
+ if (mqhp->mq_des == (uintptr_t)mqdp &&
+ mqhp->mq_sigid.sn_pid == getpid()) {
+ /* notification is set for this descriptor, remove it */
+ (void) __signotify(SN_CANCEL, NULL, &mqhp->mq_sigid);
+ mqhp->mq_ntype = 0;
+ mqhp->mq_des = 0;
+ }
+
+ pthread_cleanup_push(mq_close_cleanup, mqdp);
+ if ((tcdp = mqdp->mqd_tcd) != NULL) {
+ mqdp->mqd_tcd = NULL;
+ del_sigev_mq(tcdp); /* possible cancellation point */
+ }
+ pthread_cleanup_pop(1); /* finish in the cleanup handler */
+
+ return (0);
+}
+
+int
+_mq_unlink(const char *path)
+{
+ int err;
+
+ if (__pos4obj_check(path) < 0)
+ return (-1);
+
+ if (__pos4obj_lock(path, MQ_LOCK_TYPE) < 0) {
+ return (-1);
+ }
+
+ err = __pos4obj_unlink(path, MQ_PERM_TYPE);
+
+ if (err == 0 || (err == -1 && errno == EEXIST)) {
+ errno = 0;
+ err = __pos4obj_unlink(path, MQ_DATA_TYPE);
+ }
+
+ if (__pos4obj_unlock(path, MQ_LOCK_TYPE) < 0)
+ return (-1);
+
+ return (err);
+
+}
+
+static int
+__mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
+ uint_t msg_prio, const timespec_t *timeout, int abs_rel)
+{
+ mqdes_t *mqdp = (mqdes_t *)mqdes;
+ mqhdr_t *mqhp;
+ int err;
+ int notify = 0;
+
+ /*
+ * sem_*wait() does cancellation, if called.
+ * pthread_testcancel() ensures that cancellation takes place if
+ * there is a cancellation pending when mq_*send() is called.
+ */
+ pthread_testcancel();
+
+ if (!mq_is_valid(mqdp) || (mqdp->mqd_flags & FWRITE) == 0) {
+ errno = EBADF;
+ return (-1);
+ }
+
+ mqhp = mqdp->mqd_mq;
+
+ if (msg_prio >= mqhp->mq_maxprio) {
+ errno = EINVAL;
+ return (-1);
+ }
+ if (msg_len > mqhp->mq_maxsz) {
+ errno = EMSGSIZE;
+ return (-1);
+ }
+
+ if (mqdp->mqd_mqdn->mqdn_flags & O_NONBLOCK)
+ err = sem_trywait(&mqhp->mq_notfull);
+ else {
+ /*
+ * We might get cancelled here...
+ */
+ if (timeout == NULL)
+ err = sem_wait(&mqhp->mq_notfull);
+ else if (abs_rel == ABS_TIME)
+ err = sem_timedwait(&mqhp->mq_notfull, timeout);
+ else
+ err = sem_reltimedwait_np(&mqhp->mq_notfull, timeout);
+ }
+ if (err == -1) {
+ /*
+ * errno has been set to EAGAIN / EINTR / ETIMEDOUT
+ * by sem_*wait(), so we can just return.
+ */
+ return (-1);
+ }
+
+ /*
+ * By the time we're here, we know that we've got the capacity
+ * to add to the queue...now acquire the exclusive lock.
+ */
+ (void) mutex_lock(&mqhp->mq_exclusive);
+
+ /*
+ * Now determine if we want to kick the notification. POSIX
+ * requires that if a process has registered for notification,
+ * we must kick it when the queue makes an empty to non-empty
+ * transition, and there are no blocked receivers. Note that
+ * this mechanism does _not_ guarantee that the kicked process
+ * will be able to receive a message without blocking;
+ * another receiver could intervene in the meantime. Thus,
+ * the notification mechanism is inherently racy; all we can
+ * do is hope to minimize the window as much as possible.
+ * In general, we want to avoid kicking the notification when
+ * there are clearly receivers blocked. We'll determine if
+ * we want to kick the notification before the mq_putmsg(),
+ * but the actual signotify() won't be done until the message
+ * is on the queue.
+ */
+ if (mqhp->mq_sigid.sn_pid != 0) {
+ int nmessages, nblocked;
+
+ (void) sem_getvalue(&mqhp->mq_notempty, &nmessages);
+ (void) sem_getvalue(&mqhp->mq_rblocked, &nblocked);
+
+ if (nmessages == 0 && nblocked == 0)
+ notify = 1;
+ }
+
+ mq_putmsg(mqhp, msg_ptr, (ssize_t)msg_len, msg_prio);
+ (void) sem_post(&mqhp->mq_notempty);
+
+ if (notify) {
+ /* notify and also delete the registration */
+ (void) __signotify(SN_SEND, NULL, &mqhp->mq_sigid);
+ if (mqhp->mq_ntype == SIGEV_THREAD ||
+ mqhp->mq_ntype == SIGEV_PORT)
+ (void) sem_post(&mqhp->mq_spawner);
+ mqhp->mq_ntype = 0;
+ mqhp->mq_des = 0;
+ }
+
+ MQ_ASSERT_SEMVAL_LEQ(&mqhp->mq_notempty, ((int)mqhp->mq_maxmsg));
+ (void) mutex_unlock(&mqhp->mq_exclusive);
+
+ return (0);
+}
+
+int
+_mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, uint_t msg_prio)
+{
+ return (__mq_timedsend(mqdes, msg_ptr, msg_len, msg_prio,
+ NULL, ABS_TIME));
+}
+
+int
+_mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
+ uint_t msg_prio, const timespec_t *abs_timeout)
+{
+ return (__mq_timedsend(mqdes, msg_ptr, msg_len, msg_prio,
+ abs_timeout, ABS_TIME));
+}
+
+int
+_mq_reltimedsend_np(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
+ uint_t msg_prio, const timespec_t *rel_timeout)
+{
+ return (__mq_timedsend(mqdes, msg_ptr, msg_len, msg_prio,
+ rel_timeout, REL_TIME));
+}
+
+static void
+decrement_rblocked(mqhdr_t *mqhp)
+{
+ int canstate;
+
+ (void) pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &canstate);
+ while (sem_wait(&mqhp->mq_rblocked) == -1)
+ continue;
+ (void) pthread_setcancelstate(canstate, NULL);
+}
+
+static ssize_t
+__mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len,
+ uint_t *msg_prio, const timespec_t *timeout, int abs_rel)
+{
+ mqdes_t *mqdp = (mqdes_t *)mqdes;
+ mqhdr_t *mqhp;
+ ssize_t msg_size;
+ int err;
+
+ /*
+ * sem_*wait() does cancellation, if called.
+ * pthread_testcancel() ensures that cancellation takes place if
+ * there is a cancellation pending when mq_*receive() is called.
+ */
+ pthread_testcancel();
+
+ if (!mq_is_valid(mqdp) || (mqdp->mqd_flags & FREAD) == 0) {
+ errno = EBADF;
+ return (ssize_t)(-1);
+ }
+
+ mqhp = mqdp->mqd_mq;
+
+ if (msg_len < mqhp->mq_maxsz) {
+ errno = EMSGSIZE;
+ return (ssize_t)(-1);
+ }
+
+ /*
+ * The semaphoring scheme for mq_[timed]receive is a little hairy
+ * thanks to POSIX.1b's arcane notification mechanism. First,
+ * we try to take the common case and do a sem_trywait().
+ * If that doesn't work, and O_NONBLOCK hasn't been set,
+ * then note that we're going to sleep by incrementing the rblocked
+ * semaphore. We decrement that semaphore after waking up.
+ */
+ if (sem_trywait(&mqhp->mq_notempty) == -1) {
+ if ((mqdp->mqd_mqdn->mqdn_flags & O_NONBLOCK) != 0) {
+ /*
+ * errno has been set to EAGAIN or EINTR by
+ * sem_trywait(), so we can just return.
+ */
+ return (-1);
+ }
+ /*
+ * If we're here, then we're probably going to block...
+ * increment the rblocked semaphore. If we get
+ * cancelled, decrement_rblocked() will decrement it.
+ */
+ (void) sem_post(&mqhp->mq_rblocked);
+
+ pthread_cleanup_push(decrement_rblocked, mqhp);
+ if (timeout == NULL)
+ err = sem_wait(&mqhp->mq_notempty);
+ else if (abs_rel == ABS_TIME)
+ err = sem_timedwait(&mqhp->mq_notempty, timeout);
+ else
+ err = sem_reltimedwait_np(&mqhp->mq_notempty, timeout);
+ pthread_cleanup_pop(1);
+
+ if (err == -1) {
+ /*
+ * We took a signal or timeout while waiting
+ * on mq_notempty...
+ */
+ return (-1);
+ }
+ }
+
+ (void) mutex_lock(&mqhp->mq_exclusive);
+ msg_size = mq_getmsg(mqhp, msg_ptr, msg_prio);
+ (void) sem_post(&mqhp->mq_notfull);
+ MQ_ASSERT_SEMVAL_LEQ(&mqhp->mq_notfull, ((int)mqhp->mq_maxmsg));
+ (void) mutex_unlock(&mqhp->mq_exclusive);
+
+ return (msg_size);
+}
+
+ssize_t
+_mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, uint_t *msg_prio)
+{
+ return (__mq_timedreceive(mqdes, msg_ptr, msg_len, msg_prio,
+ NULL, ABS_TIME));
+}
+
+ssize_t
+_mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len,
+ uint_t *msg_prio, const timespec_t *abs_timeout)
+{
+ return (__mq_timedreceive(mqdes, msg_ptr, msg_len, msg_prio,
+ abs_timeout, ABS_TIME));
+}
+
+ssize_t
+_mq_reltimedreceive_np(mqd_t mqdes, char *msg_ptr, size_t msg_len,
+ uint_t *msg_prio, const timespec_t *rel_timeout)
+{
+ return (__mq_timedreceive(mqdes, msg_ptr, msg_len, msg_prio,
+ rel_timeout, REL_TIME));
+}
+
+/*
+ * Only used below, in _mq_notify().
+ * We already have a spawner thread.
+ * Verify that the attributes match; cancel it if necessary.
+ */
+static int
+cancel_if_necessary(thread_communication_data_t *tcdp,
+ const struct sigevent *sigevp)
+{
+ int do_cancel = !_pthread_attr_equal(tcdp->tcd_attrp,
+ sigevp->sigev_notify_attributes);
+
+ if (do_cancel) {
+ /*
+ * Attributes don't match, cancel the spawner thread.
+ */
+ (void) pthread_cancel(tcdp->tcd_server_id);
+ } else {
+ /*
+ * Reuse the existing spawner thread with possibly
+ * changed notification function and value.
+ */
+ tcdp->tcd_notif.sigev_notify = SIGEV_THREAD;
+ tcdp->tcd_notif.sigev_signo = 0;
+ tcdp->tcd_notif.sigev_value = sigevp->sigev_value;
+ tcdp->tcd_notif.sigev_notify_function =
+ sigevp->sigev_notify_function;
+ }
+
+ return (do_cancel);
+}
+
+int
+_mq_notify(mqd_t mqdes, const struct sigevent *sigevp)
+{
+ mqdes_t *mqdp = (mqdes_t *)mqdes;
+ mqhdr_t *mqhp;
+ thread_communication_data_t *tcdp;
+ siginfo_t mq_siginfo;
+ struct sigevent sigevent;
+ struct stat64 statb;
+ port_notify_t *pn;
+ void *userval;
+ int rval = -1;
+ int ntype;
+ int port;
+
+ if (!mq_is_valid(mqdp)) {
+ errno = EBADF;
+ return (-1);
+ }
+
+ mqhp = mqdp->mqd_mq;
+
+ (void) mutex_lock(&mqhp->mq_exclusive);
+
+ if (sigevp == NULL) { /* remove notification */
+ if (mqhp->mq_des == (uintptr_t)mqdp &&
+ mqhp->mq_sigid.sn_pid == getpid()) {
+ /* notification is set for this descriptor, remove it */
+ (void) __signotify(SN_CANCEL, NULL, &mqhp->mq_sigid);
+ if ((tcdp = mqdp->mqd_tcd) != NULL) {
+ sig_mutex_lock(&tcdp->tcd_lock);
+ if (tcdp->tcd_msg_enabled) {
+ /* cancel the spawner thread */
+ tcdp = mqdp->mqd_tcd;
+ mqdp->mqd_tcd = NULL;
+ (void) pthread_cancel(
+ tcdp->tcd_server_id);
+ }
+ sig_mutex_unlock(&tcdp->tcd_lock);
+ }
+ mqhp->mq_ntype = 0;
+ mqhp->mq_des = 0;
+ } else {
+ /* notification is not set for this descriptor */
+ errno = EBUSY;
+ goto bad;
+ }
+ } else { /* register notification with this process */
+ switch (ntype = sigevp->sigev_notify) {
+ case SIGEV_THREAD:
+ userval = sigevp->sigev_value.sival_ptr;
+ port = -1;
+ break;
+ case SIGEV_PORT:
+ pn = sigevp->sigev_value.sival_ptr;
+ userval = pn->portnfy_user;
+ port = pn->portnfy_port;
+ if (fstat64(port, &statb) != 0 ||
+ !S_ISPORT(statb.st_mode)) {
+ errno = EBADF;
+ goto bad;
+ }
+ (void) memset(&sigevent, 0, sizeof (sigevent));
+ sigevent.sigev_notify = SIGEV_PORT;
+ sigevp = &sigevent;
+ break;
+ }
+ switch (ntype) {
+ case SIGEV_NONE:
+ mq_siginfo.si_signo = 0;
+ mq_siginfo.si_code = SI_MESGQ;
+ break;
+ case SIGEV_SIGNAL:
+ mq_siginfo.si_signo = sigevp->sigev_signo;
+ mq_siginfo.si_value = sigevp->sigev_value;
+ mq_siginfo.si_code = SI_MESGQ;
+ break;
+ case SIGEV_THREAD:
+ if ((tcdp = mqdp->mqd_tcd) != NULL &&
+ cancel_if_necessary(tcdp, sigevp))
+ mqdp->mqd_tcd = NULL;
+ /* FALLTHROUGH */
+ case SIGEV_PORT:
+ if ((tcdp = mqdp->mqd_tcd) == NULL) {
+ /* we must create a spawner thread */
+ tcdp = setup_sigev_handler(sigevp, MQ);
+ if (tcdp == NULL) {
+ errno = EBADF;
+ goto bad;
+ }
+ tcdp->tcd_msg_enabled = 0;
+ tcdp->tcd_msg_closing = 0;
+ tcdp->tcd_msg_avail = &mqhp->mq_spawner;
+ if (launch_spawner(tcdp) != 0) {
+ free_sigev_handler(tcdp);
+ goto bad;
+ }
+ mqdp->mqd_tcd = tcdp;
+ }
+ mq_siginfo.si_signo = 0;
+ mq_siginfo.si_code = SI_MESGQ;
+ break;
+ default:
+ errno = EINVAL;
+ goto bad;
+ }
+
+ /* register notification */
+ if (__signotify(SN_PROC, &mq_siginfo, &mqhp->mq_sigid) < 0)
+ goto bad;
+ mqhp->mq_ntype = ntype;
+ mqhp->mq_des = (uintptr_t)mqdp;
+ switch (ntype) {
+ case SIGEV_THREAD:
+ case SIGEV_PORT:
+ tcdp->tcd_port = port;
+ tcdp->tcd_msg_object = mqdp;
+ tcdp->tcd_msg_userval = userval;
+ sig_mutex_lock(&tcdp->tcd_lock);
+ tcdp->tcd_msg_enabled = ntype;
+ sig_mutex_unlock(&tcdp->tcd_lock);
+ (void) cond_broadcast(&tcdp->tcd_cv);
+ break;
+ }
+ }
+
+ rval = 0; /* success */
+bad:
+ (void) mutex_unlock(&mqhp->mq_exclusive);
+ return (rval);
+}
+
+int
+_mq_setattr(mqd_t mqdes, const struct mq_attr *mqstat, struct mq_attr *omqstat)
+{
+ mqdes_t *mqdp = (mqdes_t *)mqdes;
+ mqhdr_t *mqhp;
+ uint_t flag = 0;
+
+ if (!mq_is_valid(mqdp)) {
+ errno = EBADF;
+ return (-1);
+ }
+
+ /* store current attributes */
+ if (omqstat != NULL) {
+ int count;
+
+ mqhp = mqdp->mqd_mq;
+ omqstat->mq_flags = mqdp->mqd_mqdn->mqdn_flags;
+ omqstat->mq_maxmsg = (long)mqhp->mq_maxmsg;
+ omqstat->mq_msgsize = (long)mqhp->mq_maxsz;
+ (void) sem_getvalue(&mqhp->mq_notempty, &count);
+ omqstat->mq_curmsgs = count;
+ }
+
+ /* set description attributes */
+ if ((mqstat->mq_flags & O_NONBLOCK) != 0)
+ flag = FNONBLOCK;
+ mqdp->mqd_mqdn->mqdn_flags = flag;
+
+ return (0);
+}
+
+int
+_mq_getattr(mqd_t mqdes, struct mq_attr *mqstat)
+{
+ mqdes_t *mqdp = (mqdes_t *)mqdes;
+ mqhdr_t *mqhp;
+ int count;
+
+ if (!mq_is_valid(mqdp)) {
+ errno = EBADF;
+ return (-1);
+ }
+
+ mqhp = mqdp->mqd_mq;
+
+ mqstat->mq_flags = mqdp->mqd_mqdn->mqdn_flags;
+ mqstat->mq_maxmsg = (long)mqhp->mq_maxmsg;
+ mqstat->mq_msgsize = (long)mqhp->mq_maxsz;
+ (void) sem_getvalue(&mqhp->mq_notempty, &count);
+ mqstat->mq_curmsgs = count;
+ return (0);
+}
+
+/*
+ * Cleanup after fork1() in the child process.
+ */
+void
+postfork1_child_sigev_mq(void)
+{
+ thread_communication_data_t *tcdp;
+ mqdes_t *mqdp;
+
+ for (mqdp = mq_list; mqdp; mqdp = mqdp->mqd_next) {
+ if ((tcdp = mqdp->mqd_tcd) != NULL) {
+ mqdp->mqd_tcd = NULL;
+ tcd_teardown(tcdp);
+ }
+ }
+}