summaryrefslogtreecommitdiff
path: root/usr/src/lib/mergeq/workq.c
diff options
context:
space:
mode:
authorRobert Mustacchi <rm@joyent.com>2015-07-27 00:35:52 +0000
committerRobert Mustacchi <rm@joyent.com>2015-07-28 19:05:39 +0000
commit14c3d85bba96b10c225341f4c7f4af93c314b508 (patch)
tree8ef3a2513e220291581564d4db16bd5b4fbf127e /usr/src/lib/mergeq/workq.c
parent9211d9b4c9ccc64292132e8e87c92ad6084b29a8 (diff)
downloadillumos-joyent-14c3d85bba96b10c225341f4c7f4af93c314b508.tar.gz
OS-4548 CTF Everywhere: Phase 1
OS-4549 ctfconvert should be implemented in terms of libctf OS-4550 ctfconvert could convert multiple compilation units OS-4553 want multi-threaded ctfmerge OS-4552 Want general workq OS-4551 Want general mergeq OS-4554 ctfdiff doesn't properly handle unknown options OS-4555 ctfdiff's symbols could be more consistently prefixed OS-4048 new ctfmerge uses tmpfile after freeing it OS-4556 ctfdump should drive on when incomplete files exist OS-4557 ctf_add_encoded assigns() incorrect byte size to types OS-4558 ctf_add_{struct,union,enum} can reuse forwards OS-4559 ctf_add_{struct,union,enum} occasionally forget to dirty the ctf_file_t OS-4560 ctf_add_member could better handle bitfields OS-4561 ctf_type_size() reports wrong size for forwards OS-4563 diffing CTF typedefs needs to walk multiple definitions OS-4564 build scripts shouldn't hardcode CTF paths OS-4565 ctf_fdcreate could be more flexible OS-4566 Want libctf ctf_kind_name() function OS-4567 Want libctf function to set struct/union size OS-4568 Want ctfmerge altexec
Diffstat (limited to 'usr/src/lib/mergeq/workq.c')
-rw-r--r--usr/src/lib/mergeq/workq.c311
1 files changed, 311 insertions, 0 deletions
diff --git a/usr/src/lib/mergeq/workq.c b/usr/src/lib/mergeq/workq.c
new file mode 100644
index 0000000000..b9f1f2aa1c
--- /dev/null
+++ b/usr/src/lib/mergeq/workq.c
@@ -0,0 +1,311 @@
+/*
+ * This file and its contents are supplied under the terms of the
+ * Common Development and Distribution License ("CDDL"), version 1.0.
+ * You may only use this file in accordance with the terms of version
+ * 1.0 of the CDDL.
+ *
+ * A full copy of the text of the CDDL should have accompanied this
+ * source. A copy of the CDDL is also available via the Internet at
+ * http://www.illumos.org/license/CDDL.
+ */
+
+/*
+ * Copyright 2015 Joyent, Inc.
+ */
+
+/*
+ * Work queue
+ *
+ * A multi-threaded work queue.
+ *
+ * The general design of this is to add a fixed number of items to the queue and
+ * then drain them with the specified number of threads.
+ */
+
+#include <strings.h>
+#include <sys/debug.h>
+#include <thread.h>
+#include <synch.h>
+#include <errno.h>
+#include <limits.h>
+#include <stdlib.h>
+
+#include "workq.h"
+
+struct workq {
+ mutex_t wq_lock; /* Protects below items */
+ cond_t wq_cond; /* Condition variable */
+ void **wq_items; /* Array of items to process */
+ size_t wq_nitems; /* Number of items in queue */
+ size_t wq_cap; /* Queue capacity */
+ size_t wq_next; /* Next item to process */
+ uint_t wq_ndthreads; /* Desired number of threads */
+ thread_t *wq_thrs; /* Actual threads */
+ workq_proc_f *wq_func; /* Processing function */
+ void *wq_arg; /* Argument for processing */
+ boolean_t wq_working; /* Are we actively using it? */
+ boolean_t wq_iserror; /* Have we encountered an error? */
+ int wq_error; /* Error value, if any */
+};
+
+#define WORKQ_DEFAULT_CAP 64
+
+static int
+workq_error(int err)
+{
+ VERIFY(err != 0);
+ errno = err;
+ return (WORKQ_ERROR);
+}
+
+void
+workq_fini(workq_t *wqp)
+{
+ if (wqp == NULL)
+ return;
+
+ VERIFY(wqp->wq_working != B_TRUE);
+ VERIFY0(mutex_destroy(&wqp->wq_lock));
+ VERIFY0(cond_destroy(&wqp->wq_cond));
+ if (wqp->wq_cap > 0)
+ workq_free(wqp->wq_items, sizeof (void *) * wqp->wq_cap);
+ if (wqp->wq_ndthreads > 0)
+ workq_free(wqp->wq_thrs, sizeof (thread_t) * wqp->wq_ndthreads);
+ workq_free(wqp, sizeof (workq_t));
+}
+
+int
+workq_init(workq_t **outp, uint_t nthrs)
+{
+ int ret;
+ workq_t *wqp;
+
+ wqp = workq_alloc(sizeof (workq_t));
+ if (wqp == NULL)
+ return (workq_error(ENOMEM));
+
+ bzero(wqp, sizeof (workq_t));
+ wqp->wq_items = workq_alloc(sizeof (void *) * WORKQ_DEFAULT_CAP);
+ if (wqp->wq_items == NULL) {
+ workq_free(wqp, sizeof (workq_t));
+ return (workq_error(ENOMEM));
+ }
+ bzero(wqp->wq_items, sizeof (void *) * WORKQ_DEFAULT_CAP);
+
+ wqp->wq_ndthreads = nthrs - 1;
+ if (wqp->wq_ndthreads > 0) {
+ wqp->wq_thrs = workq_alloc(sizeof (thread_t) *
+ wqp->wq_ndthreads);
+ if (wqp->wq_thrs == NULL) {
+ workq_free(wqp->wq_items, sizeof (void *) *
+ WORKQ_DEFAULT_CAP);
+ workq_free(wqp, sizeof (workq_t));
+ return (workq_error(ENOMEM));
+ }
+ }
+
+ if ((ret = mutex_init(&wqp->wq_lock, USYNC_THREAD | LOCK_ERRORCHECK,
+ NULL)) != 0) {
+ if (wqp->wq_ndthreads > 0) {
+ workq_free(wqp->wq_thrs,
+ sizeof (thread_t) * wqp->wq_ndthreads);
+ }
+ workq_free(wqp->wq_items, sizeof (void *) * WORKQ_DEFAULT_CAP);
+ workq_free(wqp, sizeof (workq_t));
+ return (workq_error(ret));
+ }
+
+ if ((ret = cond_init(&wqp->wq_cond, USYNC_THREAD, NULL)) != 0) {
+ VERIFY0(mutex_destroy(&wqp->wq_lock));
+ if (wqp->wq_ndthreads > 0) {
+ workq_free(wqp->wq_thrs,
+ sizeof (thread_t) * wqp->wq_ndthreads);
+ }
+ workq_free(wqp->wq_items, sizeof (void *) * WORKQ_DEFAULT_CAP);
+ workq_free(wqp, sizeof (workq_t));
+ return (workq_error(ret));
+ }
+
+ wqp->wq_cap = WORKQ_DEFAULT_CAP;
+ *outp = wqp;
+ return (0);
+}
+
+static void
+workq_reset(workq_t *wqp)
+{
+ VERIFY(MUTEX_HELD(&wqp->wq_lock));
+ VERIFY(wqp->wq_working == B_FALSE);
+ if (wqp->wq_cap > 0)
+ bzero(wqp->wq_items, sizeof (void *) * wqp->wq_cap);
+ wqp->wq_nitems = 0;
+ wqp->wq_next = 0;
+ wqp->wq_func = NULL;
+ wqp->wq_arg = NULL;
+ wqp->wq_iserror = B_FALSE;
+ wqp->wq_error = 0;
+}
+
+static int
+workq_grow(workq_t *wqp)
+{
+ size_t ncap;
+ void **items;
+
+ VERIFY(MUTEX_HELD(&wqp->wq_lock));
+ VERIFY(wqp->wq_working == B_FALSE);
+
+ if (SIZE_MAX - wqp->wq_cap < WORKQ_DEFAULT_CAP)
+ return (ENOSPC);
+
+ ncap = wqp->wq_cap + WORKQ_DEFAULT_CAP;
+ items = workq_alloc(ncap * sizeof (void *));
+ if (items == NULL)
+ return (ENOMEM);
+
+ bzero(items, ncap * sizeof (void *));
+ bcopy(wqp->wq_items, items, wqp->wq_cap * sizeof (void *));
+ workq_free(wqp->wq_items, sizeof (void *) * wqp->wq_cap);
+ wqp->wq_items = items;
+ wqp->wq_cap = ncap;
+ return (0);
+}
+
+int
+workq_add(workq_t *wqp, void *item)
+{
+ VERIFY0(mutex_lock(&wqp->wq_lock));
+ if (wqp->wq_working == B_TRUE) {
+ VERIFY0(mutex_unlock(&wqp->wq_lock));
+ return (workq_error(ENXIO));
+ }
+
+ if (wqp->wq_nitems == wqp->wq_cap) {
+ int ret;
+
+ if ((ret = workq_grow(wqp)) != 0) {
+ VERIFY0(mutex_unlock(&wqp->wq_lock));
+ return (workq_error(ret));
+ }
+ }
+
+ wqp->wq_items[wqp->wq_nitems] = item;
+ wqp->wq_nitems++;
+
+ VERIFY0(mutex_unlock(&wqp->wq_lock));
+
+ return (0);
+}
+
+static void *
+workq_pop(workq_t *wqp)
+{
+ void *out;
+
+ VERIFY(MUTEX_HELD(&wqp->wq_lock));
+ VERIFY(wqp->wq_next < wqp->wq_nitems);
+
+ out = wqp->wq_items[wqp->wq_next];
+ wqp->wq_items[wqp->wq_next] = NULL;
+ wqp->wq_next++;
+
+ return (out);
+}
+
+static void *
+workq_thr_work(void *arg)
+{
+ workq_t *wqp = arg;
+
+ VERIFY0(mutex_lock(&wqp->wq_lock));
+ VERIFY(wqp->wq_working == B_TRUE);
+
+ for (;;) {
+ int ret;
+ void *item;
+
+ if (wqp->wq_iserror == B_TRUE ||
+ wqp->wq_next == wqp->wq_nitems) {
+ VERIFY0(mutex_unlock(&wqp->wq_lock));
+ return (NULL);
+ }
+
+ item = workq_pop(wqp);
+
+ VERIFY0(mutex_unlock(&wqp->wq_lock));
+ ret = wqp->wq_func(item, wqp->wq_arg);
+ VERIFY0(mutex_lock(&wqp->wq_lock));
+
+ if (ret != 0) {
+ if (wqp->wq_iserror == B_FALSE) {
+ wqp->wq_iserror = B_TRUE;
+ wqp->wq_error = ret;
+ }
+ VERIFY0(mutex_unlock(&wqp->wq_lock));
+ return (NULL);
+ }
+ }
+}
+
+int
+workq_work(workq_t *wqp, workq_proc_f *func, void *arg, int *errp)
+{
+ int i, ret;
+ boolean_t seterr = B_FALSE;
+
+ if (wqp == NULL || func == NULL)
+ return (workq_error(EINVAL));
+
+ VERIFY0(mutex_lock(&wqp->wq_lock));
+ if (wqp->wq_working == B_TRUE) {
+ VERIFY0(mutex_unlock(&wqp->wq_lock));
+ return (workq_error(EBUSY));
+ }
+
+ if (wqp->wq_nitems == 0) {
+ workq_reset(wqp);
+ VERIFY0(mutex_unlock(&wqp->wq_lock));
+ return (0);
+ }
+
+ wqp->wq_func = func;
+ wqp->wq_arg = arg;
+ wqp->wq_next = 0;
+ wqp->wq_working = B_TRUE;
+
+ ret = 0;
+ for (i = 0; i < wqp->wq_ndthreads; i++) {
+ ret = thr_create(NULL, 0, workq_thr_work, wqp, 0,
+ &wqp->wq_thrs[i]);
+ if (ret != 0) {
+ wqp->wq_iserror = B_TRUE;
+ }
+ }
+
+ VERIFY0(mutex_unlock(&wqp->wq_lock));
+ if (ret == 0)
+ (void) workq_thr_work(wqp);
+
+ for (i = 0; i < wqp->wq_ndthreads; i++) {
+ VERIFY0(thr_join(wqp->wq_thrs[i], NULL, NULL));
+ }
+
+ VERIFY0(mutex_lock(&wqp->wq_lock));
+ wqp->wq_working = B_FALSE;
+ if (ret == 0 && wqp->wq_iserror == B_TRUE) {
+ ret = WORKQ_UERROR;
+ if (errp != NULL)
+ *errp = wqp->wq_error;
+ } else if (ret != 0) {
+ VERIFY(wqp->wq_iserror == B_FALSE);
+ seterr = B_TRUE;
+ }
+
+ workq_reset(wqp);
+ VERIFY0(mutex_unlock(&wqp->wq_lock));
+
+ if (seterr == B_TRUE)
+ return (workq_error(ret));
+
+ return (ret);
+}