summaryrefslogtreecommitdiff
path: root/usr/src/uts/common/os/msg.c
diff options
context:
space:
mode:
authordv142724 <none@none>2007-05-03 03:28:00 -0700
committerdv142724 <none@none>2007-05-03 03:28:00 -0700
commit2c5b6df145c068c61f714a0ccd0f4a3e64037fb5 (patch)
treeef6fa38c0fde41d79c8fa9f3c42ffbb2c175d748 /usr/src/uts/common/os/msg.c
parentd9976468b7ae1e0b4133ee59b2fa5678de9e9cf2 (diff)
downloadillumos-joyent-2c5b6df145c068c61f714a0ccd0f4a3e64037fb5.tar.gz
6449436 msgsnd and msgrcv causing performance issues.
Diffstat (limited to 'usr/src/uts/common/os/msg.c')
-rw-r--r--usr/src/uts/common/os/msg.c725
1 files changed, 576 insertions, 149 deletions
diff --git a/usr/src/uts/common/os/msg.c b/usr/src/uts/common/os/msg.c
index 22f04d9e16..e4d49ea1b3 100644
--- a/usr/src/uts/common/os/msg.c
+++ b/usr/src/uts/common/os/msg.c
@@ -19,7 +19,7 @@
* CDDL HEADER END
*/
/*
- * Copyright 2006 Sun Microsystems, Inc. All rights reserved.
+ * Copyright 2007 Sun Microsystems, Inc. All rights reserved.
* Use is subject to license terms.
*/
@@ -158,6 +158,139 @@ static struct modlsys modlsys32 = {
};
#endif
+/*
+ * Big Theory statement for message queue correctness
+ *
+ * The msgrcv and msgsnd functions no longer use cv_broadcast to wake up
+ * receivers who are waiting for an event. Using the cv_broadcast method
+ * resulted in negative scaling when the number of waiting receivers is large
+ * (the thundering herd problem). Instead, the receivers waiting to receive a
+ * message are now linked in a queue-like fashion and awakened one at a time
+ * in a controlled manner.
+ *
+ * Receivers can block on two different classes of waiting list:
+ * 1) "sendwait" list, which is the more complex list of the two. The
+ * receiver will be awakened by a sender posting a new message. There
+ * are two types of "sendwait" list used:
+ * a) msg_wait_snd: handles all receivers who are looking for
+ * a message type >= 0, but were unable to locate a match.
+ *
+ * slot 0: reserved for receivers that have designated they
+ * will take any message type.
+ * rest: consist of receivers requesting a specific type
+ * but the type was not present. The entries are
+ * hashed into a bucket in an attempt to keep
+ * any list search relatively short.
+ * b) msg_wait_snd_ngt: handles all receivers that have designated
+ * a negative message type. Unlike msg_wait_snd, the hash bucket
+ * serves a range of negative message types (-1 to -5, -6 to -10
+ * and so forth), where the last bucket is reserved for all the
+ * negative message types that hash outside of MSG_MAX_QNUM - 1.
+ * This is done this way to simplify the operation of locating a
+ * negative message type.
+ *
+ * 2) "copyout" list, where the receiver is awakened by another
+ * receiver after a message is copied out. This is a linked list
+ * of waiters that are awakened one at a time. Although the solution is
+ * not optimal, the complexity that would be added in for waking
+ * up the right entry far exceeds any potential pay back (too many
+ * correctness and corner case issues).
+ *
+ * The lists are doubly linked. In the case of the "sendwait"
+ * list, this allows the thread to remove itself from the list without having
+ * to traverse the list. In the case of the "copyout" list it simply allows
+ * us to use common functions with the "sendwait" list.
+ *
+ * To make sure receivers are not hung out to dry, we must guarantee:
+ * 1. If any queued message matches any receiver, then at least one
+ * matching receiver must be processing the request.
+ * 2. Blocking on the copyout queue is only temporary while messages
+ * are being copied out. The process is guaranteed to wake up
+ * when it gets to the front of the queue (copyout is a FIFO).
+ *
+ * Rules for blocking and waking up:
+ * 1. A receiver entering msgrcv must examine all messages for a match
+ * before blocking on a sendwait queue.
+ * 2. If the receiver blocks because the message it chose is already
+ * being copied out, then when it wakes up it needs to start
+ * checking the messages from the beginning.
+ * 3) Whenever a process returns from msgrcv for any reason, if it
+ * had attempted to copy a message or blocked waiting for a copy
+ * to complete it needs to wake up the next receiver blocked on
+ * a copy out.
+ * 4) When a message is sent, the sender selects a process waiting
+ * for that type of message. This selection process rotates between
+ * receiver types of 0, negative and positive to prevent starvation of
+ * any one particular receiver type.
+ * 5) The following are the scenarios for processes that are awakened
+ * by a msgsnd:
+ * a) The process finds the message and is able to copy
+ * it out. Once complete, the process returns.
+ * b) The message that was sent that triggered the wakeup is no
+ * longer available (another process found the message first).
+ * We issue a wakeup on copy queue and then go back to
+ * sleep waiting for another matching message to be sent.
+ * c) The message that was supposed to be processed was
+ * already serviced by another process. However a different
+ * message is present which we can service. The message
+ * is copied and the process returns.
+ * d) The message is found, but some sort of error occurs that
+ * prevents the message from being copied. The receiver
+ * wakes up the next receiver on the sendwait lists that can
+ * service this message type and returns an error to the caller.
+ * e) The message is found, but it is marked as being copied
+ * out. The receiver then goes to sleep on the copyout
+ * queue where it will be awakened again sometime in the future.
+ *
+ *
+ * 6) Whenever a message is found that matches the message type designated,
+ * but is being copied out we have to block on the copyout queue.
+ * After the copying process finishes the copy out, it must wake up
+ * (either directly or indirectly) all receivers who blocked on its
+ * copyout, so they are guaranteed a chance to examine the remaining
+ * messages.
+ * This is implemented via a chain of wakeups: Y wakes X, who wakes Z,
+ * and so on. The chain cannot be broken. This leads to the following
+ * cases:
+ * a) When a receiver has finished copying the message (or
+ * encountered an error), the first entry on the copyout queue
+ * is woken up.
+ * b) When the receiver is woken up, it attempts to locate
+ * a message type match.
+ * c) If a message type is found and
+ * -- MSG_RCVCOPY flag is not set, the message is
+ * marked for copying out. Regardless of the copyout
+ * success the next entry on the copyout queue is
+ * awakened and the operation is completed.
+ * -- MSG_RCVCOPY is set, we simply go back to sleep again
+ * on the copyout queue.
+ * d) If the message type is not found then we wakeup the next
+ * process on the copyout queue.
+ */
+
+static ulong_t msg_type_hash(long);
+static int msgq_check_err(kmsqid_t *qp, int cvres);
+static int msg_rcvq_sleep(list_t *, msgq_wakeup_t *, kmutex_t **,
+ kmsqid_t *);
+static int msg_copyout(kmsqid_t *, long, kmutex_t **, size_t *, size_t,
+ struct msg *, struct ipcmsgbuf *, int);
+static void msg_rcvq_wakeup_all(list_t *);
+static void msg_wakeup_rdr(kmsqid_t *, msg_select_t **, long);
+static msgq_wakeup_t *msg_fnd_any_snd(kmsqid_t *, int, long);
+static msgq_wakeup_t *msg_fnd_any_rdr(kmsqid_t *, int, long);
+static msgq_wakeup_t *msg_fnd_neg_snd(kmsqid_t *, int, long);
+static msgq_wakeup_t *msg_fnd_spc_snd(kmsqid_t *, int, long);
+static struct msg *msgrcv_lookup(kmsqid_t *, long);
+
+/*
+ * Finders tried by msg_wakeup_rdr() when looking for a receiver blocked on
+ * one of the "sendwait" lists. The second member of each entry chains the
+ * entries into a ring: any type -> specific type -> negative type -> any
+ * type.
+ */
+msg_select_t msg_fnd_sndr[] = {
+ { msg_fnd_any_snd, &msg_fnd_sndr[1] },
+ { msg_fnd_spc_snd, &msg_fnd_sndr[2] },
+ { msg_fnd_neg_snd, &msg_fnd_sndr[0] }
+};
+
+/*
+ * Finder used by msg_wakeup_rdr() for receivers blocked on the "copyout"
+ * list. There is a single entry, which links back to itself.
+ */
+msg_select_t msg_fnd_rdr[1] = {
+ { msg_fnd_any_rdr, &msg_fnd_rdr[0] },
+};
+
static struct modlinkage modlinkage = {
MODREV_1,
&modlsys,
@@ -205,8 +338,14 @@ msg_dtor(kipc_perm_t *perm)
kmsqid_t *qp = (kmsqid_t *)perm;
int ii;
- for (ii = 0; ii < MAX_QNUM_CV; ii++)
- ASSERT(qp->msg_rcv_cnt[ii] == 0);
+ for (ii = 0; ii <= MSG_MAX_QNUM; ii++) {
+ ASSERT(list_is_empty(&qp->msg_wait_snd[ii]));
+ ASSERT(list_is_empty(&qp->msg_wait_snd_ngt[ii]));
+ list_destroy(&qp->msg_wait_snd[ii]);
+ list_destroy(&qp->msg_wait_snd_ngt[ii]);
+ }
+ ASSERT(list_is_empty(&qp->msg_cpy_block));
+ list_destroy(&qp->msg_cpy_block);
ASSERT(qp->msg_snd_cnt == 0);
ASSERT(qp->msg_cbytes == 0);
list_destroy(&qp->msg_list);
@@ -261,10 +400,15 @@ msg_rmid(kipc_perm_t *perm)
msgunlink(qp, mp);
ASSERT(qp->msg_cbytes == 0);
- for (ii = 0; ii < MAX_QNUM_CV; ii++) {
- if (qp->msg_rcv_cnt[ii])
- cv_broadcast(&qp->msg_rcv_cv[ii]);
+ /*
+ * Wake up everyone who is in a wait state of some sort
+ * for this message queue.
+ */
+ for (ii = 0; ii <= MSG_MAX_QNUM; ii++) {
+ msg_rcvq_wakeup_all(&qp->msg_wait_snd[ii]);
+ msg_rcvq_wakeup_all(&qp->msg_wait_snd_ngt[ii]);
}
+ msg_rcvq_wakeup_all(&qp->msg_cpy_block);
if (qp->msg_snd_cnt)
cv_broadcast(&qp->msg_snd_cv);
}
@@ -280,7 +424,7 @@ msgctl(int msgid, int cmd, void *arg)
{
STRUCT_DECL(msqid_ds, ds); /* SVR4 queue work area */
kmsqid_t *qp; /* ptr to associated q */
- int error, ii;
+ int error;
struct cred *cr;
model_t mdl = get_udatamodel();
struct msqid_ds64 ds64;
@@ -338,12 +482,8 @@ msgctl(int msgid, int cmd, void *arg)
return (set_errno(error));
}
- for (ii = 0; ii < MAX_QNUM_CV; ii++) {
- if (qp->msg_rcv_cnt[ii]) {
- qp->msg_perm.ipc_mode |= MSG_RWAIT;
- break;
- }
- }
+ if (qp->msg_rcv_cnt)
+ qp->msg_perm.ipc_mode |= MSG_RWAIT;
if (qp->msg_snd_cnt)
qp->msg_perm.ipc_mode |= MSG_WWAIT;
ipcperm_stat(&STRUCT_BUF(ds)->msg_perm, &qp->msg_perm, mdl);
@@ -381,12 +521,8 @@ msgctl(int msgid, int cmd, void *arg)
break;
case IPC_STAT64:
- for (ii = 0; ii < MAX_QNUM_CV; ii++) {
- if (qp->msg_rcv_cnt[ii]) {
- qp->msg_perm.ipc_mode |= MSG_RWAIT;
- break;
- }
- }
+ if (qp->msg_rcv_cnt)
+ qp->msg_perm.ipc_mode |= MSG_RWAIT;
if (qp->msg_snd_cnt)
qp->msg_perm.ipc_mode |= MSG_WWAIT;
ipcperm_stat64(&ds64.msgx_perm, &qp->msg_perm);
@@ -463,8 +599,29 @@ top:
qp->msg_lspid = qp->msg_lrpid = 0;
qp->msg_stime = qp->msg_rtime = 0;
qp->msg_ctime = gethrestime_sec();
- for (ii = 0; ii < MAX_QNUM_CV; ii++)
- qp->msg_rcv_cnt[ii] = 0;
+ qp->msg_ngt_cnt = 0;
+ qp->msg_neg_copy = 0;
+ for (ii = 0; ii <= MSG_MAX_QNUM; ii++) {
+ list_create(&qp->msg_wait_snd[ii],
+ sizeof (msgq_wakeup_t),
+ offsetof(msgq_wakeup_t, msgw_list));
+ list_create(&qp->msg_wait_snd_ngt[ii],
+ sizeof (msgq_wakeup_t),
+ offsetof(msgq_wakeup_t, msgw_list));
+ }
+ /*
+ * The proper initialization of msg_lowest_type is to the
+ * highest possible value. By doing this we guarantee that
+ * when the first send happens, the lowest type will be set
+ * properly.
+ */
+ qp->msg_lowest_type = -1;
+ list_create(&qp->msg_cpy_block,
+ sizeof (msgq_wakeup_t),
+ offsetof(msgq_wakeup_t, msgw_list));
+ qp->msg_fnd_sndr = &msg_fnd_sndr[0];
+ qp->msg_fnd_rdr = &msg_fnd_rdr[0];
+ qp->msg_rcv_cnt = 0;
qp->msg_snd_cnt = 0;
if (error = ipc_commit_begin(msq_svc, key, msgflg,
@@ -488,162 +645,269 @@ top:
return (id);
}
-/*
- * msgrcv system call.
- */
+/*
+ * msgrcv system call.
+ *
+ * Receive a message matching msgtyp from queue msqid into the user buffer
+ * msgp, transferring at most msgsz bytes of message text.
+ *
+ * Returns the number of text bytes transferred. On failure errno is set
+ * and the result of set_errno() is returned: EINVAL if the queue lookup
+ * fails, the ipcperm_access() error if read access is denied, ENOMSG if
+ * IPC_NOWAIT is set and nothing matches, EIDRM or EINTR from
+ * msgq_check_err() after a wait, or whatever msg_copyout() reports.
+ */
static ssize_t
msgrcv(int msqid, struct ipcmsgbuf *msgp, size_t msgsz, long msgtyp, int msgflg)
{
- struct msg *mp; /* ptr to msg on q */
struct msg *smp; /* ptr to best msg on q */
kmsqid_t *qp; /* ptr to associated q */
kmutex_t *lock;
size_t xtsz; /* transfer byte count */
- int error = 0, copyerror = 0;
+ int error = 0;
int cvres;
- STRUCT_HANDLE(ipcmsgbuf, umsgp);
- model_t mdl = get_udatamodel();
+ ulong_t msg_hash;
+ msgq_wakeup_t msg_entry;
CPU_STATS_ADDQ(CPU, sys, msg, 1); /* bump msg send/rcv count */
- STRUCT_SET_HANDLE(umsgp, mdl, msgp);
- if ((lock = ipc_lookup(msq_svc, msqid, (kipc_perm_t **)&qp)) == NULL)
+ msg_hash = msg_type_hash(msgtyp);
+ /*
+ * Various information (including the condvar_t) required for the
+ * process to sleep is provided by its stack. This must be set up
+ * before the first "goto msgrcv_out", because the error path there
+ * reads msg_entry.msgw_snd_wake.
+ */
+ msg_entry.msgw_thrd = curthread;
+ msg_entry.msgw_snd_wake = 0;
+ msg_entry.msgw_type = msgtyp;
+ if ((lock = ipc_lookup(msq_svc, msqid, (kipc_perm_t **)&qp)) == NULL) {
return ((ssize_t)set_errno(EINVAL));
+ }
ipc_hold(msq_svc, (kipc_perm_t *)qp);
- if (error = ipcperm_access(&qp->msg_perm, MSG_R, CRED()))
+ if (error = ipcperm_access(&qp->msg_perm, MSG_R, CRED())) {
goto msgrcv_out;
+ }
findmsg:
- smp = NULL;
- mp = list_head(&qp->msg_list);
- if (msgtyp == 0) {
- smp = mp;
- } else {
- for (; mp; mp = list_next(&qp->msg_list, mp)) {
- if (msgtyp > 0) {
- if (msgtyp != mp->msg_type)
- continue;
- smp = mp;
- break;
- }
- if (mp->msg_type <= -msgtyp) {
- if (smp && smp->msg_type <= mp->msg_type)
- continue;
- smp = mp;
- }
- }
- }
+ smp = msgrcv_lookup(qp, msgtyp);
if (smp) {
/*
- * Message found.
+ * We found a possible message to copy out.
*/
if ((smp->msg_flags & MSG_RCVCOPY) == 0) {
/*
- * No one else is copying this message. Copy it.
+ * It is available, attempt to copy it.
*/
- if (msgsz < smp->msg_size) {
- if ((msgflg & MSG_NOERROR) == 0) {
- error = E2BIG;
- goto msgrcv_out;
- } else {
- xtsz = msgsz;
- }
- } else {
- xtsz = smp->msg_size;
- }
-
+ error = msg_copyout(qp, msgtyp, &lock, &xtsz, msgsz,
+ smp, msgp, msgflg);
/*
- * Mark message as being copied out. Release mutex
- * while copying out.
+ * Don't forget to wakeup a sleeper that blocked because
+ * we were copying things out.
*/
- ASSERT((smp->msg_flags & MSG_RCVCOPY) == 0);
- smp->msg_flags |= MSG_RCVCOPY;
- msg_hold(smp);
- mutex_exit(lock);
+ msg_wakeup_rdr(qp, &qp->msg_fnd_rdr, 0);
+ goto msgrcv_out;
+ }
+ /*
+ * The selected message is being copied out, so block. We do
+ * not need to wake the next person up on the msg_cpy_block list
+ * due to the fact some one is copying out and they will get
+ * things moving again once the copy is completed.
+ */
+ cvres = msg_rcvq_sleep(&qp->msg_cpy_block,
+ &msg_entry, &lock, qp);
+ error = msgq_check_err(qp, cvres);
+ if (error) {
+ goto msgrcv_out;
+ }
+ goto findmsg;
+ }
+ /*
+ * There isn't a message to copy out that matches the designated
+ * criteria.
+ */
+ if (msgflg & IPC_NOWAIT) {
+ error = ENOMSG;
+ goto msgrcv_out;
+ }
+ msg_wakeup_rdr(qp, &qp->msg_fnd_rdr, 0);
- if (mdl == DATAMODEL_NATIVE) {
- copyerror = copyout(&smp->msg_type, msgp,
- sizeof (smp->msg_type));
- } else {
- /*
- * 32-bit callers need an imploded msg type.
- */
- int32_t msg_type32 = smp->msg_type;
+ /*
+ * Wait for new message. We keep the negative and positive types
+ * separate for performance reasons.
+ */
+ msg_entry.msgw_snd_wake = 0;
+ if (msgtyp >= 0) {
+ cvres = msg_rcvq_sleep(&qp->msg_wait_snd[msg_hash],
+ &msg_entry, &lock, qp);
+ } else {
+ qp->msg_ngt_cnt++;
+ cvres = msg_rcvq_sleep(&qp->msg_wait_snd_ngt[msg_hash],
+ &msg_entry, &lock, qp);
+ qp->msg_ngt_cnt--;
+ }
- copyerror = copyout(&msg_type32, msgp,
- sizeof (msg_type32));
- }
+ if (!(error = msgq_check_err(qp, cvres))) {
+ goto findmsg;
+ }
- if (copyerror == 0 && xtsz)
- copyerror = copyout(smp->msg_addr,
- STRUCT_FADDR(umsgp, mtext), xtsz);
+msgrcv_out:
+ if (error) {
+ msg_wakeup_rdr(qp, &qp->msg_fnd_rdr, 0);
+ if (msg_entry.msgw_snd_wake) {
+ msg_wakeup_rdr(qp, &qp->msg_fnd_sndr,
+ msg_entry.msgw_snd_wake);
+ }
+ ipc_rele(msq_svc, (kipc_perm_t *)qp);
+ return ((ssize_t)set_errno(error));
+ }
+ ipc_rele(msq_svc, (kipc_perm_t *)qp);
+ return ((ssize_t)xtsz);
+}
- /*
- * Reclaim mutex, make sure queue still exists,
- * and remove message.
- */
- lock = ipc_lock(msq_svc, qp->msg_perm.ipc_id);
- ASSERT(smp->msg_flags & MSG_RCVCOPY);
- smp->msg_flags &= ~MSG_RCVCOPY;
- msg_rele(smp);
-
- if (IPC_FREE(&qp->msg_perm)) {
- error = EIDRM;
- goto msgrcv_out;
- }
- /*
- * MSG_RCVCOPY was set while we dropped and reaquired
- * the lock. A thread looking for same message type
- * might have entered during that interval and seeing
- * MSG_RCVCOPY set, would have landed up in the sleepq.
- */
- cv_broadcast(&qp->msg_rcv_cv[MSG_QNUM(smp->msg_type)]);
- cv_broadcast(&qp->msg_rcv_cv[0]);
+/*
+ * Decide whether a thread that has just woken from a wait on queue qp may
+ * carry on. cvres is the result of that wait, e.g. the cv_wait_sig() value
+ * handed back by msg_rcvq_sleep().
+ *
+ * Returns EIDRM if IPC_FREE() reports the queue as freed, EINTR if cvres
+ * is 0, and 0 otherwise. The freed-queue check takes precedence.
+ */
+static int
+msgq_check_err(kmsqid_t *qp, int cvres)
+{
+ if (IPC_FREE(&qp->msg_perm)) {
+ return (EIDRM);
+ }
- if (copyerror) {
- error = EFAULT;
- goto msgrcv_out;
- }
- qp->msg_lrpid = ttoproc(curthread)->p_pid;
- qp->msg_rtime = gethrestime_sec();
- msgunlink(qp, smp);
- goto msgrcv_out;
+ if (cvres == 0) {
+ return (EINTR);
+ }
+
+ return (0);
+}
+
+/*
+ * Copy message smp from queue qp out to the user buffer msgp on behalf of
+ * a msgrcv() request for type msgtyp.
+ *
+ * At most msgsz bytes of message text are transferred. A longer message is
+ * truncated if MSG_NOERROR is set in msgflg and rejected with E2BIG
+ * otherwise. The transfer size is stored in *xtsz_ret before the copy is
+ * attempted.
+ *
+ * *lock is dropped for the duration of the copy and replaced afterwards
+ * with the value returned by ipc_lock(). While it is dropped the message
+ * carries MSG_RCVCOPY and a hold, and qp->msg_neg_copy is set if msgtyp is
+ * negative.
+ *
+ * Returns 0 after unlinking the message from the queue. Returns E2BIG,
+ * EIDRM (queue freed during the copy) or EFAULT (a copyout failed) without
+ * unlinking it.
+ */
+static int
+msg_copyout(kmsqid_t *qp, long msgtyp, kmutex_t **lock, size_t *xtsz_ret,
+ size_t msgsz, struct msg *smp, struct ipcmsgbuf *msgp, int msgflg)
+{
+ size_t xtsz;
+ STRUCT_HANDLE(ipcmsgbuf, umsgp);
+ model_t mdl = get_udatamodel();
+ int copyerror = 0;
+
+ STRUCT_SET_HANDLE(umsgp, mdl, msgp);
+ if (msgsz < smp->msg_size) {
+ if ((msgflg & MSG_NOERROR) == 0) {
+ return (E2BIG);
+ } else {
+ xtsz = msgsz;
}
+ } else {
+ xtsz = smp->msg_size;
+ }
+ *xtsz_ret = xtsz;
+
+ /*
+ * To prevent a DOS attack we mark the message as being
+ * copied out and release mutex. When the copy is completed
+ * we need to acquire the mutex and make the appropriate updates.
+ */
+ ASSERT((smp->msg_flags & MSG_RCVCOPY) == 0);
+ smp->msg_flags |= MSG_RCVCOPY;
+ msg_hold(smp);
+ if (msgtyp < 0) {
+ ASSERT(qp->msg_neg_copy == 0);
+ qp->msg_neg_copy = 1;
+ }
+ mutex_exit(*lock);
+ if (mdl == DATAMODEL_NATIVE) {
+ copyerror = copyout(&smp->msg_type, msgp,
+ sizeof (smp->msg_type));
} else {
/*
- * No message found.
+ * 32-bit callers need an imploded msg type.
*/
- if (msgflg & IPC_NOWAIT) {
- error = ENOMSG;
- goto msgrcv_out;
- }
+ int32_t msg_type32 = smp->msg_type;
+
+ copyerror = copyout(&msg_type32, msgp,
+ sizeof (msg_type32));
+ }
+
+ if (copyerror == 0 && xtsz) {
+ copyerror = copyout(smp->msg_addr,
+ STRUCT_FADDR(umsgp, mtext), xtsz);
}
- /* Wait for new message */
- qp->msg_rcv_cnt[MSG_QNUM(msgtyp)]++;
- cvres = cv_wait_sig(&qp->msg_rcv_cv[MSG_QNUM(msgtyp)], lock);
- lock = ipc_relock(msq_svc, qp->msg_perm.ipc_id, lock);
- qp->msg_rcv_cnt[MSG_QNUM(msgtyp)]--;
+ /*
+ * Reclaim the mutex and make sure the message queue still exists.
+ */
+ *lock = ipc_lock(msq_svc, qp->msg_perm.ipc_id);
+ if (msgtyp < 0) {
+ qp->msg_neg_copy = 0;
+ }
+ ASSERT(smp->msg_flags & MSG_RCVCOPY);
+ smp->msg_flags &= ~MSG_RCVCOPY;
+ msg_rele(smp);
if (IPC_FREE(&qp->msg_perm)) {
- error = EIDRM;
- goto msgrcv_out;
+ return (EIDRM);
}
- if (cvres == 0) {
- error = EINTR;
- goto msgrcv_out;
+ if (copyerror) {
+ return (EFAULT);
}
+ qp->msg_lrpid = ttoproc(curthread)->p_pid;
+ qp->msg_rtime = gethrestime_sec();
+ msgunlink(qp, smp);
+ return (0);
+}
- goto findmsg;
+/*
+ * Select the message on queue qp that a msgrcv() request for msgtyp
+ * should receive.
+ *
+ * msgtyp == 0: the first message on the queue.
+ * msgtyp > 0: the first message whose type equals msgtyp.
+ * msgtyp < 0: the message with the lowest type that is <= -msgtyp; the
+ * earliest such message wins a tie.
+ *
+ * Returns NULL if nothing matches. For a negative msgtyp, if
+ * qp->msg_neg_copy is set a static placeholder flagged MSG_RCVCOPY is
+ * returned instead of a queued message, so the caller sees a copy in
+ * progress. A successful negative lookup also stores the chosen type in
+ * qp->msg_lowest_type.
+ *
+ * The working values are kept as long, the same width as msgtyp, so that
+ * type values are not truncated.
+ */
+static struct msg *
+msgrcv_lookup(kmsqid_t *qp, long msgtyp)
+{
+ struct msg *smp = NULL;
+ long qp_low;
+ struct msg *mp; /* ptr to msg on q */
+ long low_msgtype;
+ static struct msg neg_copy_smp;
-msgrcv_out:
- ipc_rele(msq_svc, (kipc_perm_t *)qp);
- if (error)
- return ((ssize_t)set_errno(error));
- return ((ssize_t)xtsz);
+ mp = list_head(&qp->msg_list);
+ if (msgtyp == 0) {
+ smp = mp;
+ } else {
+ qp_low = qp->msg_lowest_type;
+ if (msgtyp > 0) {
+ /*
+ * If our lowest possible message type is larger than
+ * the message type desired, then we know there is
+ * no entry present.
+ */
+ if (qp_low > msgtyp) {
+ return (NULL);
+ }
+
+ for (; mp; mp = list_next(&qp->msg_list, mp)) {
+ if (msgtyp == mp->msg_type) {
+ smp = mp;
+ break;
+ }
+ }
+ } else {
+ /*
+ * We have kept track of the lowest possible message
+ * type on the send queue. This allows us to terminate
+ * the search early if we find a message type of that
+ * type. Note, the lowest type may not be the actual
+ * lowest value in the system, it is only guaranteed
+ * that there isn't a value lower than that.
+ */
+ low_msgtype = -msgtyp;
+ /*
+ * After the post-increment low_msgtype is an exclusive
+ * upper bound (-msgtyp + 1) for the scan below.
+ */
+ if (low_msgtype++ < qp_low) {
+ return (NULL);
+ }
+ if (qp->msg_neg_copy) {
+ neg_copy_smp.msg_flags = MSG_RCVCOPY;
+ return (&neg_copy_smp);
+ }
+ for (; mp; mp = list_next(&qp->msg_list, mp)) {
+ if (mp->msg_type < low_msgtype) {
+ smp = mp;
+ low_msgtype = mp->msg_type;
+ if (low_msgtype == qp_low) {
+ break;
+ }
+ }
+ }
+ if (smp) {
+ /*
+ * Update the lowest message type.
+ */
+ qp->msg_lowest_type = smp->msg_type;
+ }
+ }
+ }
+ return (smp);
}
/*
@@ -879,13 +1143,7 @@ top:
lock = ipc_relock(msq_svc, qp->msg_perm.ipc_id, lock);
qp->msg_snd_cnt--;
- if (IPC_FREE(&qp->msg_perm)) {
- error = EIDRM;
- goto msgsnd_out;
- }
-
- if (cvres == 0) {
- error = EINTR;
+ if (error = msgq_check_err(qp, cvres)) {
goto msgsnd_out;
}
}
@@ -922,18 +1180,13 @@ top:
qp->msg_lspid = curproc->p_pid;
qp->msg_stime = gethrestime_sec();
mp->msg_type = type;
- mp->msg_flags = 0;
+ if (qp->msg_lowest_type > type)
+ qp->msg_lowest_type = type;
list_insert_tail(&qp->msg_list, mp);
/*
- * For all message type >= 1.
+ * Get the proper receiver going.
*/
- if (qp->msg_rcv_cnt[MSG_QNUM(type)])
- cv_broadcast(&qp->msg_rcv_cv[MSG_QNUM(type)]);
- /*
- * For all message type < 1.
- */
- if (qp->msg_rcv_cnt[0])
- cv_broadcast(&qp->msg_rcv_cv[0]);
+ msg_wakeup_rdr(qp, &qp->msg_fnd_sndr, type);
msgsnd_out:
if (lock)
@@ -948,6 +1201,180 @@ msgsnd_out:
return (0);
}
+/*
+ * Wake at most one blocked receiver on queue qp.
+ *
+ * flist points at the queue's current position in a ring of finder
+ * entries. Starting there, each finder is asked for a waiter until one
+ * answers or the ring has been walked once. The position is advanced by
+ * one entry on every call, whether or not a waiter was found, so the next
+ * call starts with a different finder.
+ *
+ * A non-zero type is recorded in the chosen waiter's msgw_snd_wake before
+ * it is signalled.
+ */
+static void
+msg_wakeup_rdr(kmsqid_t *qp, msg_select_t **flist, long type)
+{
+	msg_select_t	*start = *flist;
+	msg_select_t	*finder = start;
+	msgq_wakeup_t	*waiter;
+	ulong_t		bucket;
+
+	bucket = msg_type_hash(type);
+
+	for (;;) {
+		waiter = finder->selection(qp, bucket, type);
+		finder = finder->next_selection;
+		if (waiter != NULL || finder == start) {
+			break;
+		}
+	}
+
+	/* Rotate the starting finder for the next caller. */
+	*flist = start->next_selection;
+
+	if (waiter == NULL) {
+		return;
+	}
+	if (type != 0) {
+		waiter->msgw_snd_wake = type;
+	}
+	cv_signal(&waiter->msgw_wake_cv);
+}
+
+/*
+ * Map a message type to the index of the wait-list bucket that serves it.
+ *
+ * Type 0 maps to bucket 0. A positive type maps to
+ * 1 + (type % MSG_MAX_QNUM). A negative type is hashed by magnitude over
+ * intervals of MSG_NEG_INTERVAL, and anything that lands beyond
+ * MSG_MAX_QNUM is placed in the last bucket.
+ */
+static ulong_t
+msg_type_hash(long msg_type)
+{
+	long	magnitude;
+	ulong_t	bucket;
+
+	if (msg_type == 0) {
+		/* The first bucket is reserved for receivers of type 0. */
+		return (0);
+	}
+
+	if (msg_type > 0) {
+		return (1 + (msg_type % (MSG_MAX_QNUM)));
+	}
+
+	/* Negative types share a bucket per interval, capped at the end. */
+	magnitude = -msg_type;
+	bucket = magnitude / MSG_NEG_INTERVAL;
+	return (bucket > MSG_MAX_QNUM ? MSG_MAX_QNUM : bucket);
+}
+
+/*
+ * Routines to see if we have a receiver of type 0 blocked waiting for a
+ * message (msg_fnd_any_snd), or any receiver blocked waiting for a copyout
+ * to complete (msg_fnd_any_rdr). Simply return the first entry on the list.
+ */
+
+/*
+ * Return the first receiver waiting on msg_wait_snd[0], or NULL if that
+ * list is empty. msg_hash and type are not used.
+ */
+static msgq_wakeup_t *
+/* LINTED */
+msg_fnd_any_snd(kmsqid_t *qp, int msg_hash, long type)
+{
+	list_t	*any_type_waiters = &qp->msg_wait_snd[0];
+
+	return (list_head(any_type_waiters));
+}
+
+/*
+ * Return the first receiver waiting on the msg_cpy_block list, or NULL if
+ * that list is empty. msg_hash and type are not used.
+ */
+static msgq_wakeup_t *
+/* LINTED */
+msg_fnd_any_rdr(kmsqid_t *qp, int msg_hash, long type)
+{
+	list_t	*copy_waiters = &qp->msg_cpy_block;
+
+	return (list_head(copy_waiters));
+}
+
+/*
+ * Search bucket msg_hash of msg_wait_snd for a receiver that asked for
+ * exactly this message type. Returns the first such waiter, or NULL if the
+ * bucket holds none.
+ */
+static msgq_wakeup_t *
+msg_fnd_spc_snd(kmsqid_t *qp, int msg_hash, long type)
+{
+	list_t		*bucket = &qp->msg_wait_snd[msg_hash];
+	msgq_wakeup_t	*waiter;
+
+	for (waiter = list_head(bucket); waiter != NULL;
+	    waiter = list_next(bucket, waiter)) {
+		if (waiter->msgw_type == type) {
+			break;
+		}
+	}
+	return (waiter);
+}
+
+/*
+ * Find a receiver that asked for a negative message type and is able to
+ * take a message of the given type.
+ *
+ * Returns the first waiter found whose requested type, negated, is at
+ * least type, or NULL if there are no negative-type waiters or none of
+ * them matches. msg_hash is not used: the buckets to search are derived
+ * from the hash of -type.
+ */
+static msgq_wakeup_t *
+/* LINTED */
+msg_fnd_neg_snd(kmsqid_t *qp, int msg_hash, long type)
+{
+	msgq_wakeup_t	*qptr;
+	list_t		*bucket;
+	int		count;
+	int		check_index;
+	int		neg_index;
+	int		nbuckets;
+
+	if (!qp->msg_ngt_cnt) {
+		return (NULL);
+	}
+	neg_index = msg_type_hash(-type);
+
+	/*
+	 * Check for a match among the negative type queues. Any buckets
+	 * at neg_index or larger can match the type. Use the last send
+	 * time to randomize the starting bucket to prevent starvation.
+	 * Search all buckets from neg_index to MSG_MAX_QNUM, starting
+	 * from the random starting point, and wrapping around after
+	 * MSG_MAX_QNUM.
+	 */
+
+	nbuckets = MSG_MAX_QNUM - neg_index + 1;
+	check_index = neg_index + (qp->msg_stime % nbuckets);
+
+	for (count = nbuckets; count > 0; count--) {
+		/*
+		 * Take the head and every successor from the same bucket;
+		 * walking a list with a different list_t is not valid.
+		 */
+		bucket = &qp->msg_wait_snd_ngt[check_index];
+		for (qptr = list_head(bucket); qptr != NULL;
+		    qptr = list_next(bucket, qptr)) {
+			/*
+			 * The lowest hash bucket may actually contain
+			 * message types that are not valid for this
+			 * request. This can happen due to the fact that
+			 * the message buckets actually contain a consecutive
+			 * range of types.
+			 */
+			if (-qptr->msgw_type >= type) {
+				return (qptr);
+			}
+		}
+
+		if (++check_index > MSG_MAX_QNUM) {
+			check_index = neg_index;
+		}
+	}
+	return (NULL);
+}
+
+/*
+ * Block the calling receiver on the given wait list until it is signalled
+ * or interrupted.
+ *
+ * entry is the caller-supplied wait record; its condition variable is
+ * initialized here and the record is appended to the tail of queue.
+ * qp->msg_rcv_cnt is raised for the duration of the wait. *lock is the
+ * mutex passed to cv_wait_sig(); after the wait it is replaced with the
+ * value returned by ipc_relock().
+ *
+ * Returns the cv_wait_sig() result; callers hand it to msgq_check_err(),
+ * which treats 0 as EINTR.
+ */
+static int
+msg_rcvq_sleep(list_t *queue, msgq_wakeup_t *entry, kmutex_t **lock,
+ kmsqid_t *qp)
+{
+ int cvres;
+
+ cv_init(&entry->msgw_wake_cv, NULL, 0, NULL);
+
+ list_insert_tail(queue, entry);
+
+ qp->msg_rcv_cnt++;
+ cvres = cv_wait_sig(&entry->msgw_wake_cv, *lock);
+ *lock = ipc_relock(msq_svc, qp->msg_perm.ipc_id, *lock);
+ qp->msg_rcv_cnt--;
+ /*
+ * We have woken up, so remove ourselves from the waiter list.
+ * The removal is skipped when IPC_FREE() reports the queue as freed.
+ */
+ if (!IPC_FREE(&qp->msg_perm)) {
+ list_remove(queue, entry);
+ }
+
+ return (cvres);
+}
+
+/*
+ * Signal every waiter currently linked on the list q_ptr. The entries are
+ * left on the list.
+ */
+static void
+msg_rcvq_wakeup_all(list_t *q_ptr)
+{
+	msgq_wakeup_t	*waiter;
+
+	for (waiter = list_head(q_ptr); waiter != NULL;
+	    waiter = list_next(q_ptr, waiter)) {
+		cv_signal(&waiter->msgw_wake_cv);
+	}
+}
+
/*
* msgsys - System entry point for msgctl, msgget, msgrcv, and msgsnd
* system calls.