diff options
author | dv142724 <none@none> | 2007-05-03 03:28:00 -0700 |
---|---|---|
committer | dv142724 <none@none> | 2007-05-03 03:28:00 -0700 |
commit | 2c5b6df145c068c61f714a0ccd0f4a3e64037fb5 (patch) | |
tree | ef6fa38c0fde41d79c8fa9f3c42ffbb2c175d748 /usr/src/uts/common/os/msg.c | |
parent | d9976468b7ae1e0b4133ee59b2fa5678de9e9cf2 (diff) | |
download | illumos-joyent-2c5b6df145c068c61f714a0ccd0f4a3e64037fb5.tar.gz |
6449436 msgsnd and msgrcv causing performance issues.
Diffstat (limited to 'usr/src/uts/common/os/msg.c')
-rw-r--r-- | usr/src/uts/common/os/msg.c | 725 |
1 files changed, 576 insertions, 149 deletions
diff --git a/usr/src/uts/common/os/msg.c b/usr/src/uts/common/os/msg.c index 22f04d9e16..e4d49ea1b3 100644 --- a/usr/src/uts/common/os/msg.c +++ b/usr/src/uts/common/os/msg.c @@ -19,7 +19,7 @@ * CDDL HEADER END */ /* - * Copyright 2006 Sun Microsystems, Inc. All rights reserved. + * Copyright 2007 Sun Microsystems, Inc. All rights reserved. * Use is subject to license terms. */ @@ -158,6 +158,139 @@ static struct modlsys modlsys32 = { }; #endif +/* + * Big Theory statement for message queue correctness + * + * The msgrcv and msgsnd functions no longer uses cv_broadcast to wake up + * receivers who are waiting for an event. Using the cv_broadcast method + * resulted in negative scaling when the number of waiting receivers are large + * (the thundering herd problem). Instead, the receivers waiting to receive a + * message are now linked in a queue-like fashion and awaken one at a time in + * a controlled manner. + * + * Receivers can block on two different classes of waiting list: + * 1) "sendwait" list, which is the more complex list of the two. The + * receiver will be awakened by a sender posting a new message. There + * are two types of "sendwait" list used: + * a) msg_wait_snd: handles all receivers who are looking for + * a message type >= 0, but was unable to locate a match. + * + * slot 0: reserved for receivers that have designated they + * will take any message type. + * rest: consist of receivers requesting a specific type + * but the type was not present. The entries are + * hashed into a bucket in an attempt to keep + * any list search relatively short. + * b) msg_wait_snd_ngt: handles all receivers that have designated + * a negative message type. Unlike msg_wait_snd, the hash bucket + * serves a range of negative message types (-1 to -5, -6 to -10 + * and so forth), where the last bucket is reserved for all the + * negative message types that hash outside of MSG_MAX_QNUM - 1. + * This is done this way to simplify the operation of locating a + * negative message type. + * + * 2) "copyout" list, where the receiver is awakened by another + * receiver after a message is copied out. This is a linked list + * of waiters that are awakened one at a time. Although the solution is + * not optimal, the complexity that would be added in for waking + * up the right entry far exceeds any potential pay back (too many + * correctness and corner case issues). + * + * The lists are doubly linked. In the case of the "sendwait" + * list, this allows the thread to remove itself from the list without having + * to traverse the list. In the case of the "copyout" list it simply allows + * us to use common functions with the "sendwait" list. + * + * To make sure receivers are not hung out to dry, we must guarantee: + * 1. If any queued message matches any receiver, then at least one + * matching receiver must be processing the request. + * 2. Blocking on the copyout queue is only temporary while messages + * are being copied out. The process is guaranted to wakeup + * when it gets to front of the queue (copyout is a FIFO). + * + * Rules for blocking and waking up: + * 1. A receiver entering msgrcv must examine all messages for a match + * before blocking on a sendwait queue. + * 2. If the receiver blocks because the message it chose is already + * being copied out, then when it wakes up needs to start start + * checking the messages from the beginning. + * 3) When ever a process returns from msgrcv for any reason, if it + * had attempted to copy a message or blocked waiting for a copy + * to complete it needs to wakeup the next receiver blocked on + * a copy out. + * 4) When a message is sent, the sender selects a process waiting + * for that type of message. This selection process rotates between + * receivers types of 0, negative and positive to prevent starvation of + * any one particular receiver type. + * 5) The following are the scenarios for processes that are awakened + * by a msgsnd: + * a) The process finds the message and is able to copy + * it out. Once complete, the process returns. + * b) The message that was sent that triggered the wakeup is no + * longer available (another process found the message first). + * We issue a wakeup on copy queue and then go back to + * sleep waiting for another matching message to be sent. + * c) The message that was supposed to be processed was + * already serviced by another process. However a different + * message is present which we can service. The message + * is copied and the process returns. + * d) The message is found, but some sort of error occurs that + * prevents the message from being copied. The receiver + * wakes up the next sender that can service this message + * type and returns an error to the caller. + * e) The message is found, but it is marked as being copied + * out. The receiver then goes to sleep on the copyout + * queue where it will be awakened again sometime in the future. + * + * + * 6) Whenever a message is found that matches the message type designated, + * but is being copied out we have to block on the copyout queue. + * After process copying finishes the copy out, it must wakeup (either + * directly or indirectly) all receivers who blocked on its copyout, + * so they are guaranteed a chance to examine the remaining messages. + * This is implemented via a chain of wakeups: Y wakes X, who wakes Z, + * and so on. The chain cannot be broken. This leads to the following + * cases: + * a) A receiver is finished copying the message (or encountered) + * an error), the first entry on the copyout queue is woken + * up. + * b) When the receiver is woken up, it attempts to locate + * a message type match. + * c) If a message type is found and + * -- MSG_RCVCOPY flag is not set, the message is + * marked for copying out. Regardless of the copyout + * success the next entry on the copyout queue is + * awakened and the operation is completed. + * -- MSG_RCVCOPY is set, we simply go back to sleep again + * on the copyout queue. + * d) If the message type is not found then we wakeup the next + * process on the copyout queue. + */ + +static ulong_t msg_type_hash(long); +static int msgq_check_err(kmsqid_t *qp, int cvres); +static int msg_rcvq_sleep(list_t *, msgq_wakeup_t *, kmutex_t **, + kmsqid_t *); +static int msg_copyout(kmsqid_t *, long, kmutex_t **, size_t *, size_t, + struct msg *, struct ipcmsgbuf *, int); +static void msg_rcvq_wakeup_all(list_t *); +static void msg_wakeup_rdr(kmsqid_t *, msg_select_t **, long); +static msgq_wakeup_t *msg_fnd_any_snd(kmsqid_t *, int, long); +static msgq_wakeup_t *msg_fnd_any_rdr(kmsqid_t *, int, long); +static msgq_wakeup_t *msg_fnd_neg_snd(kmsqid_t *, int, long); +static msgq_wakeup_t *msg_fnd_spc_snd(kmsqid_t *, int, long); +static struct msg *msgrcv_lookup(kmsqid_t *, long); + +msg_select_t msg_fnd_sndr[] = { + { msg_fnd_any_snd, &msg_fnd_sndr[1] }, + { msg_fnd_spc_snd, &msg_fnd_sndr[2] }, + { msg_fnd_neg_snd, &msg_fnd_sndr[0] } +}; + +msg_select_t msg_fnd_rdr[1] = { + { msg_fnd_any_rdr, &msg_fnd_rdr[0] }, +}; + static struct modlinkage modlinkage = { MODREV_1, &modlsys, @@ -205,8 +338,14 @@ msg_dtor(kipc_perm_t *perm) kmsqid_t *qp = (kmsqid_t *)perm; int ii; - for (ii = 0; ii < MAX_QNUM_CV; ii++) - ASSERT(qp->msg_rcv_cnt[ii] == 0); + for (ii = 0; ii <= MSG_MAX_QNUM; ii++) { + ASSERT(list_is_empty(&qp->msg_wait_snd[ii])); + ASSERT(list_is_empty(&qp->msg_wait_snd_ngt[ii])); + list_destroy(&qp->msg_wait_snd[ii]); + list_destroy(&qp->msg_wait_snd_ngt[ii]); + } + ASSERT(list_is_empty(&qp->msg_cpy_block)); + list_destroy(&qp->msg_cpy_block); ASSERT(qp->msg_snd_cnt == 0); ASSERT(qp->msg_cbytes == 0); list_destroy(&qp->msg_list); @@ -261,10 +400,15 @@ msg_rmid(kipc_perm_t *perm) msgunlink(qp, mp); ASSERT(qp->msg_cbytes == 0); - for (ii = 0; ii < MAX_QNUM_CV; ii++) { - if (qp->msg_rcv_cnt[ii]) - cv_broadcast(&qp->msg_rcv_cv[ii]); + /* + * Wake up everyone who is in a wait state of some sort + * for this message queue. + */ + for (ii = 0; ii <= MSG_MAX_QNUM; ii++) { + msg_rcvq_wakeup_all(&qp->msg_wait_snd[ii]); + msg_rcvq_wakeup_all(&qp->msg_wait_snd_ngt[ii]); } + msg_rcvq_wakeup_all(&qp->msg_cpy_block); if (qp->msg_snd_cnt) cv_broadcast(&qp->msg_snd_cv); } @@ -280,7 +424,7 @@ msgctl(int msgid, int cmd, void *arg) { STRUCT_DECL(msqid_ds, ds); /* SVR4 queue work area */ kmsqid_t *qp; /* ptr to associated q */ - int error, ii; + int error; struct cred *cr; model_t mdl = get_udatamodel(); struct msqid_ds64 ds64; @@ -338,12 +482,8 @@ msgctl(int msgid, int cmd, void *arg) return (set_errno(error)); } - for (ii = 0; ii < MAX_QNUM_CV; ii++) { - if (qp->msg_rcv_cnt[ii]) { - qp->msg_perm.ipc_mode |= MSG_RWAIT; - break; - } - } + if (qp->msg_rcv_cnt) + qp->msg_perm.ipc_mode |= MSG_RWAIT; if (qp->msg_snd_cnt) qp->msg_perm.ipc_mode |= MSG_WWAIT; ipcperm_stat(&STRUCT_BUF(ds)->msg_perm, &qp->msg_perm, mdl); @@ -381,12 +521,8 @@ msgctl(int msgid, int cmd, void *arg) break; case IPC_STAT64: - for (ii = 0; ii < MAX_QNUM_CV; ii++) { - if (qp->msg_rcv_cnt[ii]) { - qp->msg_perm.ipc_mode |= MSG_RWAIT; - break; - } - } + if (qp->msg_rcv_cnt) + qp->msg_perm.ipc_mode |= MSG_RWAIT; if (qp->msg_snd_cnt) qp->msg_perm.ipc_mode |= MSG_WWAIT; ipcperm_stat64(&ds64.msgx_perm, &qp->msg_perm); @@ -463,8 +599,29 @@ top: qp->msg_lspid = qp->msg_lrpid = 0; qp->msg_stime = qp->msg_rtime = 0; qp->msg_ctime = gethrestime_sec(); - for (ii = 0; ii < MAX_QNUM_CV; ii++) - qp->msg_rcv_cnt[ii] = 0; + qp->msg_ngt_cnt = 0; + qp->msg_neg_copy = 0; + for (ii = 0; ii <= MSG_MAX_QNUM; ii++) { + list_create(&qp->msg_wait_snd[ii], + sizeof (msgq_wakeup_t), + offsetof(msgq_wakeup_t, msgw_list)); + list_create(&qp->msg_wait_snd_ngt[ii], + sizeof (msgq_wakeup_t), + offsetof(msgq_wakeup_t, msgw_list)); + } + /* + * The proper initialization of msg_lowest_type is to the + * highest possible value. By doing this we guarantee that + * when the first send happens, the lowest type will be set + * properly. + */ + qp->msg_lowest_type = -1; + list_create(&qp->msg_cpy_block, + sizeof (msgq_wakeup_t), + offsetof(msgq_wakeup_t, msgw_list)); + qp->msg_fnd_sndr = &msg_fnd_sndr[0]; + qp->msg_fnd_rdr = &msg_fnd_rdr[0]; + qp->msg_rcv_cnt = 0; qp->msg_snd_cnt = 0; if (error = ipc_commit_begin(msq_svc, key, msgflg, @@ -488,162 +645,269 @@ top: return (id); } -/* - * msgrcv system call. - */ static ssize_t msgrcv(int msqid, struct ipcmsgbuf *msgp, size_t msgsz, long msgtyp, int msgflg) { - struct msg *mp; /* ptr to msg on q */ struct msg *smp; /* ptr to best msg on q */ kmsqid_t *qp; /* ptr to associated q */ kmutex_t *lock; size_t xtsz; /* transfer byte count */ - int error = 0, copyerror = 0; + int error = 0; int cvres; - STRUCT_HANDLE(ipcmsgbuf, umsgp); - model_t mdl = get_udatamodel(); + ulong_t msg_hash; + msgq_wakeup_t msg_entry; CPU_STATS_ADDQ(CPU, sys, msg, 1); /* bump msg send/rcv count */ - STRUCT_SET_HANDLE(umsgp, mdl, msgp); - if ((lock = ipc_lookup(msq_svc, msqid, (kipc_perm_t **)&qp)) == NULL) + msg_hash = msg_type_hash(msgtyp); + if ((lock = ipc_lookup(msq_svc, msqid, (kipc_perm_t **)&qp)) == NULL) { return ((ssize_t)set_errno(EINVAL)); + } ipc_hold(msq_svc, (kipc_perm_t *)qp); - if (error = ipcperm_access(&qp->msg_perm, MSG_R, CRED())) + if (error = ipcperm_access(&qp->msg_perm, MSG_R, CRED())) { goto msgrcv_out; + } + /* + * Various information (including the condvar_t) required for the + * process to sleep is provided by it's stack. + */ + msg_entry.msgw_thrd = curthread; + msg_entry.msgw_snd_wake = 0; + msg_entry.msgw_type = msgtyp; findmsg: - smp = NULL; - mp = list_head(&qp->msg_list); - if (msgtyp == 0) { - smp = mp; - } else { - for (; mp; mp = list_next(&qp->msg_list, mp)) { - if (msgtyp > 0) { - if (msgtyp != mp->msg_type) - continue; - smp = mp; - break; - } - if (mp->msg_type <= -msgtyp) { - if (smp && smp->msg_type <= mp->msg_type) - continue; - smp = mp; - } - } - } + smp = msgrcv_lookup(qp, msgtyp); if (smp) { /* - * Message found. + * We found a possible message to copy out. */ if ((smp->msg_flags & MSG_RCVCOPY) == 0) { /* - * No one else is copying this message. Copy it. + * It is available, attempt to copy it. */ - if (msgsz < smp->msg_size) { - if ((msgflg & MSG_NOERROR) == 0) { - error = E2BIG; - goto msgrcv_out; - } else { - xtsz = msgsz; - } - } else { - xtsz = smp->msg_size; - } - + error = msg_copyout(qp, msgtyp, &lock, &xtsz, msgsz, + smp, msgp, msgflg); /* - * Mark message as being copied out. Release mutex - * while copying out. + * Don't forget to wakeup a sleeper that blocked because + * we were copying things out. */ - ASSERT((smp->msg_flags & MSG_RCVCOPY) == 0); - smp->msg_flags |= MSG_RCVCOPY; - msg_hold(smp); - mutex_exit(lock); + msg_wakeup_rdr(qp, &qp->msg_fnd_rdr, 0); + goto msgrcv_out; + } + /* + * The selected message is being copied out, so block. We do + * not need to wake the next person up on the msg_cpy_block list + * due to the fact some one is copying out and they will get + * things moving again once the copy is completed. + */ + cvres = msg_rcvq_sleep(&qp->msg_cpy_block, + &msg_entry, &lock, qp); + error = msgq_check_err(qp, cvres); + if (error) { + goto msgrcv_out; + } + goto findmsg; + } + /* + * There isn't a message to copy out that matches the designated + * criteria. + */ + if (msgflg & IPC_NOWAIT) { + error = ENOMSG; + goto msgrcv_out; + } + msg_wakeup_rdr(qp, &qp->msg_fnd_rdr, 0); - if (mdl == DATAMODEL_NATIVE) { - copyerror = copyout(&smp->msg_type, msgp, - sizeof (smp->msg_type)); - } else { - /* - * 32-bit callers need an imploded msg type. - */ - int32_t msg_type32 = smp->msg_type; + /* + * Wait for new message. We keep the negative and positive types + * separate for performance reasons. + */ + msg_entry.msgw_snd_wake = 0; + if (msgtyp >= 0) { + cvres = msg_rcvq_sleep(&qp->msg_wait_snd[msg_hash], + &msg_entry, &lock, qp); + } else { + qp->msg_ngt_cnt++; + cvres = msg_rcvq_sleep(&qp->msg_wait_snd_ngt[msg_hash], + &msg_entry, &lock, qp); + qp->msg_ngt_cnt--; + } - copyerror = copyout(&msg_type32, msgp, - sizeof (msg_type32)); - } + if (!(error = msgq_check_err(qp, cvres))) { + goto findmsg; + } - if (copyerror == 0 && xtsz) - copyerror = copyout(smp->msg_addr, - STRUCT_FADDR(umsgp, mtext), xtsz); +msgrcv_out: + if (error) { + msg_wakeup_rdr(qp, &qp->msg_fnd_rdr, 0); + if (msg_entry.msgw_snd_wake) { + msg_wakeup_rdr(qp, &qp->msg_fnd_sndr, + msg_entry.msgw_snd_wake); + } + ipc_rele(msq_svc, (kipc_perm_t *)qp); + return ((ssize_t)set_errno(error)); + } + ipc_rele(msq_svc, (kipc_perm_t *)qp); + return ((ssize_t)xtsz); +} - /* - * Reclaim mutex, make sure queue still exists, - * and remove message. - */ - lock = ipc_lock(msq_svc, qp->msg_perm.ipc_id); - ASSERT(smp->msg_flags & MSG_RCVCOPY); - smp->msg_flags &= ~MSG_RCVCOPY; - msg_rele(smp); - - if (IPC_FREE(&qp->msg_perm)) { - error = EIDRM; - goto msgrcv_out; - } - /* - * MSG_RCVCOPY was set while we dropped and reaquired - * the lock. A thread looking for same message type - * might have entered during that interval and seeing - * MSG_RCVCOPY set, would have landed up in the sleepq. - */ - cv_broadcast(&qp->msg_rcv_cv[MSG_QNUM(smp->msg_type)]); - cv_broadcast(&qp->msg_rcv_cv[0]); +static int +msgq_check_err(kmsqid_t *qp, int cvres) +{ + if (IPC_FREE(&qp->msg_perm)) { + return (EIDRM); + } - if (copyerror) { - error = EFAULT; - goto msgrcv_out; - } - qp->msg_lrpid = ttoproc(curthread)->p_pid; - qp->msg_rtime = gethrestime_sec(); - msgunlink(qp, smp); - goto msgrcv_out; + if (cvres == 0) { + return (EINTR); + } + + return (0); +} + +static int +msg_copyout(kmsqid_t *qp, long msgtyp, kmutex_t **lock, size_t *xtsz_ret, + size_t msgsz, struct msg *smp, struct ipcmsgbuf *msgp, int msgflg) +{ + size_t xtsz; + STRUCT_HANDLE(ipcmsgbuf, umsgp); + model_t mdl = get_udatamodel(); + int copyerror = 0; + + STRUCT_SET_HANDLE(umsgp, mdl, msgp); + if (msgsz < smp->msg_size) { + if ((msgflg & MSG_NOERROR) == 0) { + return (E2BIG); + } else { + xtsz = msgsz; } + } else { + xtsz = smp->msg_size; + } + *xtsz_ret = xtsz; + + /* + * To prevent a DOS attack we mark the message as being + * copied out and release mutex. When the copy is completed + * we need to acquire the mutex and make the appropriate updates. + */ + ASSERT((smp->msg_flags & MSG_RCVCOPY) == 0); + smp->msg_flags |= MSG_RCVCOPY; + msg_hold(smp); + if (msgtyp < 0) { + ASSERT(qp->msg_neg_copy == 0); + qp->msg_neg_copy = 1; + } + mutex_exit(*lock); + if (mdl == DATAMODEL_NATIVE) { + copyerror = copyout(&smp->msg_type, msgp, + sizeof (smp->msg_type)); } else { /* - * No message found. + * 32-bit callers need an imploded msg type. */ - if (msgflg & IPC_NOWAIT) { - error = ENOMSG; - goto msgrcv_out; - } + int32_t msg_type32 = smp->msg_type; + + copyerror = copyout(&msg_type32, msgp, + sizeof (msg_type32)); + } + + if (copyerror == 0 && xtsz) { + copyerror = copyout(smp->msg_addr, + STRUCT_FADDR(umsgp, mtext), xtsz); } - /* Wait for new message */ - qp->msg_rcv_cnt[MSG_QNUM(msgtyp)]++; - cvres = cv_wait_sig(&qp->msg_rcv_cv[MSG_QNUM(msgtyp)], lock); - lock = ipc_relock(msq_svc, qp->msg_perm.ipc_id, lock); - qp->msg_rcv_cnt[MSG_QNUM(msgtyp)]--; + /* + * Reclaim the mutex and make sure the message queue still exists. + */ + *lock = ipc_lock(msq_svc, qp->msg_perm.ipc_id); + if (msgtyp < 0) { + qp->msg_neg_copy = 0; + } + ASSERT(smp->msg_flags & MSG_RCVCOPY); + smp->msg_flags &= ~MSG_RCVCOPY; + msg_rele(smp); if (IPC_FREE(&qp->msg_perm)) { - error = EIDRM; - goto msgrcv_out; + return (EIDRM); } - if (cvres == 0) { - error = EINTR; - goto msgrcv_out; + if (copyerror) { + return (EFAULT); } + qp->msg_lrpid = ttoproc(curthread)->p_pid; + qp->msg_rtime = gethrestime_sec(); + msgunlink(qp, smp); + return (0); +} - goto findmsg; +static struct msg * +msgrcv_lookup(kmsqid_t *qp, long msgtyp) +{ + struct msg *smp = NULL; + int qp_low; + struct msg *mp; /* ptr to msg on q */ + int low_msgtype; + static struct msg neg_copy_smp; -msgrcv_out: - ipc_rele(msq_svc, (kipc_perm_t *)qp); - if (error) - return ((ssize_t)set_errno(error)); - return ((ssize_t)xtsz); + mp = list_head(&qp->msg_list); + if (msgtyp == 0) { + smp = mp; + } else { + qp_low = qp->msg_lowest_type; + if (msgtyp > 0) { + /* + * If our lowest possible message type is larger than + * the message type desired, then we know there is + * no entry present. + */ + if (qp_low > msgtyp) { + return (NULL); + } + + for (; mp; mp = list_next(&qp->msg_list, mp)) { + if (msgtyp == mp->msg_type) { + smp = mp; + break; + } + } + } else { + /* + * We have kept track of the lowest possible message + * type on the send queue. This allows us to terminate + * the search early if we find a message type of that + * type. Note, the lowest type may not be the actual + * lowest value in the system, it is only guaranteed + * that there isn't a value lower than that. + */ + low_msgtype = -msgtyp; + if (low_msgtype++ < qp_low) { + return (NULL); + } + if (qp->msg_neg_copy) { + neg_copy_smp.msg_flags = MSG_RCVCOPY; + return (&neg_copy_smp); + } + for (; mp; mp = list_next(&qp->msg_list, mp)) { + if (mp->msg_type < low_msgtype) { + smp = mp; + low_msgtype = mp->msg_type; + if (low_msgtype == qp_low) { + break; + } + } + } + if (smp) { + /* + * Update the lowest message type. + */ + qp->msg_lowest_type = smp->msg_type; + } + } + } + return (smp); } /* @@ -879,13 +1143,7 @@ top: lock = ipc_relock(msq_svc, qp->msg_perm.ipc_id, lock); qp->msg_snd_cnt--; - if (IPC_FREE(&qp->msg_perm)) { - error = EIDRM; - goto msgsnd_out; - } - - if (cvres == 0) { - error = EINTR; + if (error = msgq_check_err(qp, cvres)) { goto msgsnd_out; } } @@ -922,18 +1180,13 @@ top: qp->msg_lspid = curproc->p_pid; qp->msg_stime = gethrestime_sec(); mp->msg_type = type; - mp->msg_flags = 0; + if (qp->msg_lowest_type > type) + qp->msg_lowest_type = type; list_insert_tail(&qp->msg_list, mp); /* - * For all message type >= 1. + * Get the proper receiver going. */ - if (qp->msg_rcv_cnt[MSG_QNUM(type)]) - cv_broadcast(&qp->msg_rcv_cv[MSG_QNUM(type)]); - /* - * For all message type < 1. - */ - if (qp->msg_rcv_cnt[0]) - cv_broadcast(&qp->msg_rcv_cv[0]); + msg_wakeup_rdr(qp, &qp->msg_fnd_sndr, type); msgsnd_out: if (lock) @@ -948,6 +1201,180 @@ msgsnd_out: return (0); } +static void +msg_wakeup_rdr(kmsqid_t *qp, msg_select_t **flist, long type) +{ + msg_select_t *walker = *flist; + msgq_wakeup_t *wakeup; + ulong_t msg_hash; + + msg_hash = msg_type_hash(type); + + do { + wakeup = walker->selection(qp, msg_hash, type); + walker = walker->next_selection; + } while (!wakeup && walker != *flist); + + *flist = (*flist)->next_selection; + if (wakeup) { + if (type) { + wakeup->msgw_snd_wake = type; + } + cv_signal(&wakeup->msgw_wake_cv); + } +} + +static ulong_t +msg_type_hash(long msg_type) +{ + long temp; + ulong_t hash; + + if (msg_type < 0) { + /* + * Negative message types are hashed over an + * interval. Any message type that hashes + * beyond MSG_MAX_QNUM is automatically placed + * in the last bucket. + */ + temp = -msg_type; + hash = temp / MSG_NEG_INTERVAL; + if (hash > MSG_MAX_QNUM) { + hash = MSG_MAX_QNUM; + } + return (hash); + } + + /* + * 0 or positive message type. The first bucket is reserved for + * message receivers of type 0, the other buckets we hash into. + */ + if (msg_type) { + return (1 + (msg_type % (MSG_MAX_QNUM))); + } + return (0); +} + +/* + * Routines to see if we have a receiver of type 0 either blocked waiting + * for a message. Simply return the first guy on the list. + */ + +static msgq_wakeup_t * +/* LINTED */ +msg_fnd_any_snd(kmsqid_t *qp, int msg_hash, long type) +{ + return (list_head(&qp->msg_wait_snd[0])); +} + +static msgq_wakeup_t * +/* LINTED */ +msg_fnd_any_rdr(kmsqid_t *qp, int msg_hash, long type) +{ + return (list_head(&qp->msg_cpy_block)); +} + +static msgq_wakeup_t * +msg_fnd_spc_snd(kmsqid_t *qp, int msg_hash, long type) +{ + msgq_wakeup_t *walker; + + walker = list_head(&qp->msg_wait_snd[msg_hash]); + + while (walker && walker->msgw_type != type && + (walker = list_next(&qp->msg_wait_snd[msg_hash], walker))); + return (walker); +} + +static msgq_wakeup_t * +/* LINTED */ +msg_fnd_neg_snd(kmsqid_t *qp, int msg_hash, long type) +{ + msgq_wakeup_t *qptr; + int count; + int check_index; + int neg_index; + int nbuckets; + + if (!qp->msg_ngt_cnt) { + return (NULL); + } + neg_index = msg_type_hash(-type); + + /* + * Check for a match among the negative type queues. Any buckets + * at neg_index or larger can match the type. Use the last send + * time to randomize the starting bucket to prevent starvation. + * Search all buckets from neg_index to MSG_MAX_QNUM, starting + * from the random starting point, and wrapping around after + * MSG_MAX_QNUM. + */ + + nbuckets = MSG_MAX_QNUM - neg_index + 1; + check_index = neg_index + (qp->msg_stime % nbuckets); + + for (count = nbuckets; count > 0; count--) { + qptr = list_head(&qp->msg_wait_snd_ngt[check_index]); + while (qptr) { + /* + * The lowest hash bucket may actually contain + * message types that are not valid for this + * request. This can happen due to the fact that + * the message buckets actually contain a consecutive + * range of types. + */ + if (-qptr->msgw_type >= type) { + return (qptr); + } + qptr = list_next(&qp->msg_wait_snd_ngt[msg_hash], qptr); + } + + if (++check_index > MSG_MAX_QNUM) { + check_index = neg_index; + } + } + return (NULL); +} + +static int +msg_rcvq_sleep(list_t *queue, msgq_wakeup_t *entry, kmutex_t **lock, + kmsqid_t *qp) +{ + int cvres; + + cv_init(&entry->msgw_wake_cv, NULL, 0, NULL); + + list_insert_tail(queue, entry); + + qp->msg_rcv_cnt++; + cvres = cv_wait_sig(&entry->msgw_wake_cv, *lock); + *lock = ipc_relock(msq_svc, qp->msg_perm.ipc_id, *lock); + qp->msg_rcv_cnt--; + /* + * We have woken up, so remove ourselves from the waiter list. + */ + if (!IPC_FREE(&qp->msg_perm)) { + list_remove(queue, entry); + } + + return (cvres); +} + +static void +msg_rcvq_wakeup_all(list_t *q_ptr) +{ + msgq_wakeup_t *q_walk; + + q_walk = (msgq_wakeup_t *)list_head(q_ptr); + while (q_walk) { + /* + * Walk the entire list, wake every process up. + */ + cv_signal(&q_walk->msgw_wake_cv); + q_walk = list_next(q_ptr, q_walk); + } +} + /* * msgsys - System entry point for msgctl, msgget, msgrcv, and msgsnd * system calls. |