author | Robert Mustacchi <rm@joyent.com> | 2015-07-27 00:35:52 +0000 |
---|---|---|
committer | Robert Mustacchi <rm@joyent.com> | 2015-07-28 19:05:39 +0000 |
commit | 14c3d85bba96b10c225341f4c7f4af93c314b508 (patch) | |
tree | 8ef3a2513e220291581564d4db16bd5b4fbf127e /usr/src/lib/mergeq | |
parent | 9211d9b4c9ccc64292132e8e87c92ad6084b29a8 (diff) | |
download | illumos-joyent-14c3d85bba96b10c225341f4c7f4af93c314b508.tar.gz |
OS-4548 CTF Everywhere: Phase 1
OS-4549 ctfconvert should be implemented in terms of libctf
OS-4550 ctfconvert could convert multiple compilation units
OS-4553 want multi-threaded ctfmerge
OS-4552 Want general workq
OS-4551 Want general mergeq
OS-4554 ctfdiff doesn't properly handle unknown options
OS-4555 ctfdiff's symbols could be more consistently prefixed
OS-4048 new ctfmerge uses tmpfile after freeing it
OS-4556 ctfdump should drive on when incomplete files exist
OS-4557 ctf_add_encoded() assigns incorrect byte size to types
OS-4558 ctf_add_{struct,union,enum} can reuse forwards
OS-4559 ctf_add_{struct,union,enum} occasionally forget to dirty the ctf_file_t
OS-4560 ctf_add_member could better handle bitfields
OS-4561 ctf_type_size() reports wrong size for forwards
OS-4563 diffing CTF typedefs needs to walk multiple definitions
OS-4564 build scripts shouldn't hardcode CTF paths
OS-4565 ctf_fdcreate could be more flexible
OS-4566 Want libctf ctf_kind_name() function
OS-4567 Want libctf function to set struct/union size
OS-4568 Want ctfmerge altexec
Diffstat (limited to 'usr/src/lib/mergeq')
-rw-r--r-- | usr/src/lib/mergeq/mergeq.c | 606 | ||||
-rw-r--r-- | usr/src/lib/mergeq/mergeq.h | 52 | ||||
-rw-r--r-- | usr/src/lib/mergeq/workq.c | 311 | ||||
-rw-r--r-- | usr/src/lib/mergeq/workq.h | 52 |
4 files changed, 1021 insertions, 0 deletions
diff --git a/usr/src/lib/mergeq/mergeq.c b/usr/src/lib/mergeq/mergeq.c
new file mode 100644
index 0000000000..fd9a9c32ea
--- /dev/null
+++ b/usr/src/lib/mergeq/mergeq.c
@@ -0,0 +1,606 @@
+/*
+ * This file and its contents are supplied under the terms of the
+ * Common Development and Distribution License ("CDDL"), version 1.0.
+ * You may only use this file in accordance with the terms of version
+ * 1.0 of the CDDL.
+ *
+ * A full copy of the text of the CDDL should have accompanied this
+ * source. A copy of the CDDL is also available via the Internet at
+ * http://www.illumos.org/license/CDDL.
+ */
+
+/*
+ * Copyright 2015 Joyent, Inc.
+ */
+
+/*
+ * Merge queue
+ *
+ * A multi-threaded merging queue.
+ *
+ * The general constraint of the merge queue is that if a set of items are
+ * inserted into the queue in the same order, then no matter how many threads
+ * are on the scene, we will always process the items in the same order. The
+ * secondary constraint is that to support environments that must be
+ * single-threaded, we explicitly *must not* create a thread in the case where
+ * the number of requested threads is just one.
+ *
+ * To that end, we've designed our queue as a circular buffer. We will grow that
+ * buffer to contain enough space for all the input items, after which we'll
+ * then treat it as a circular buffer.
+ *
+ * Items will be issued to a processing function two at a time, until there is
+ * only one item remaining in the queue, at which point we will be done doing
+ * any merging work.
+ *
+ * A given queue has three different entries that we care about tracking:
+ *
+ *  o mq_nproc - What is the slot of the next item to process for something
+ *    looking for work.
+ *
+ *  o mq_next - What is the slot of the next item that should be inserted into
+ *    the queue.
+ *
+ *  o mq_ncommit - What is the slot of the next item that should be committed.
+ *
+ * When a thread comes and looks for work, we pop entries off of the queue based
+ * on the index provided by mq_nproc. At the same time, it also gets the slot
+ * that it should place the result in, which is mq_next. However, because we
+ * have multiple threads that are operating on the system, we want to make sure
+ * that we push things onto the queue in order. We do that by allocating a slot
+ * to each task and when it completes, it waits for its slot to be ready based
+ * on it being the value of mq_ncommit.
+ *
+ * In addition, we keep track of the number of items in the queue as well as the
+ * number of active workers. There's also a generation count that is used to
+ * figure out when the various values might lap one another.
+ *
+ * The following images show what happens when we have a queue with six items
+ * and whose capacity has been shrunk to six, to better fit on the screen.
+ *
+ *
+ * 1) This is the initial configuration of the queue right before any processing
+ * is done in the context of mergeq_merge(). Every box has an initial item for
+ * merging in it (represented by an 'X'). Here, the mq_nproc, mq_next, and
+ * mq_ncommit will all point at the initial entry. However, the mq_next has
+ * already lapped around the array and thus has a generation count of one.
+ *
+ * The '+' characters indicate which bucket each of mq_next, mq_ncommit, and
+ * mq_nproc currently refers to.
+ *
+ *                       +---++---++---++---++---++---+
+ *                       | X || X || X || X || X || X |
+ *                       +---++---++---++---++---++---+
+ * mq_next (g1)            +
+ * mq_ncommit (g0)         +
+ * mq_nproc (g0)           +
+ *
+ * 2) This shows the state right as the first thread begins to process an entry.
+ * Note in this example we will have two threads processing this queue. Note,
+ * mq_ncommit has not advanced. This is because the first thread has started
+ * processing entries, but it has not finished, and thus we can't commit it.
+ * We've incremented mq_next by one because it has gone ahead and assigned a + * single entry. We've incremented mq_nproc by two, because we have removed two + * entries and thus will have another set available. + * + * +---++---++---++---++---++---+ t1 - slot 0 + * | || || X || X || X || X | t2 - idle + * +---++---++---++---++---++---+ + * mq_next (g1) + + * mq_ncommit (g0) + + * mq_nproc (g0) + + * + * + * 3) This shows the state right after the second thread begins to process an + * entry, note that the first thread has not finished. The changes are very + * similar to the previous state, we've advanced, mq_nproc and mq_next, but not + * mq_ncommit. + * + * +---++---++---++---++---++---+ t1 - slot 0 + * | || || || || X || X | t2 - slot 1 + * +---++---++---++---++---++---+ + * mq_next (g1) + + * mq_ncommit (g0) + + * mq_nproc (g0) + + * + * 4) This shows the state after thread one has finished processing an item, but + * before it does anything else. Note that even if thread two finishes early, it + * cannot commit its item until thread one finishes. Here 'Y' refers to the + * result of merging the first two 'X's. + * + * +---++---++---++---++---++---+ t1 - idle + * | Y || || || || X || X | t2 - slot 1 + * +---++---++---++---++---++---+ + * mq_next (g1) + + * mq_ncommit (g0) + + * mq_nproc (g0) + + * + * 5) This shows the state after thread one has begun to process the next round + * and after thread two has committed, but before it begins processing the next + * item. Note that mq_nproc has wrapped around and we've bumped its generation + * counter. + * + * +---++---++---++---++---++---+ t1 - slot 2 + * | Y || Y || || || || | t2 - idle + * +---++---++---++---++---++---+ + * mq_next (g1) + + * mq_ncommit (g0) + + * mq_nproc (g0) + + * + * 6) Here, thread two, will take the next two Y values and thread 1 will commit + * its 'Y'. Thread one now must wait until thread two finishes such that it can + * do additional work. 
+ * + * +---++---++---++---++---++---+ t1 - waiting + * | || || Y || || || | t2 - slot 3 + * +---++---++---++---++---++---+ + * mq_next (g1) + + * mq_ncommit (g0) + + * mq_nproc (g0) + + * + * 7) Here, thread two has committed and thread one is about to go process the + * final entry. The character 'Z' represents the results of merging two 'Y's. + * + * +---++---++---++---++---++---+ t1 - idle + * | || || Y || Z || || | t2 - idle + * +---++---++---++---++---++---+ + * mq_next (g1) + + * mq_ncommit (g0) + + * mq_nproc (g0) + + * + * 8) Here, thread one is processing the final item. Thread two is waiting in + * mergeq_pop() for enough items to be available. In this case, it will never + * happen; however, once all threads have finished it will break out. + * + * +---++---++---++---++---++---+ t1 - slot 4 + * | || || || || || | t2 - idle + * +---++---++---++---++---++---+ + * mq_next (g1) + + * mq_ncommit (g0) + + * mq_nproc (g0) + + * + * 9) This is the final state of the queue, it has a single '*' item which is + * the final merge result. At this point, both thread one and thread two would + * stop processing and we'll return the result to the user. + * + * +---++---++---++---++---++---+ t1 - slot 4 + * | || || || || * || | t2 - idle + * +---++---++---++---++---++---+ + * mq_next (g1) + + * mq_ncommit (g0) + + * mq_nproc (g0) + + * + * + * Note, that if at any point in time the processing function fails, then all + * the merges will quiesce and that error will be propagated back to the user. 
+ */ + +#include <strings.h> +#include <sys/debug.h> +#include <thread.h> +#include <synch.h> +#include <errno.h> +#include <limits.h> +#include <stdlib.h> + +#include "mergeq.h" + +struct mergeq { + mutex_t mq_lock; /* Protects items below */ + cond_t mq_cond; /* Condition variable */ + void **mq_items; /* Array of items to process */ + size_t mq_nitems; /* Number of items in the queue */ + size_t mq_cap; /* Capacity of the items */ + size_t mq_next; /* Place to put next entry */ + size_t mq_gnext; /* Generation for next */ + size_t mq_nproc; /* Index of next thing to process */ + size_t mq_gnproc; /* Generation for next proc */ + size_t mq_ncommit; /* Index of the next thing to commit */ + size_t mq_gncommit; /* Commit generation */ + uint_t mq_nactthrs; /* Number of active threads */ + uint_t mq_ndthreads; /* Desired number of threads */ + thread_t *mq_thrs; /* Actual threads */ + mergeq_proc_f *mq_func; /* Processing function */ + void *mq_arg; /* Argument for processing */ + boolean_t mq_working; /* Are we working on processing */ + boolean_t mq_iserror; /* Have we encountered an error? 
*/ + int mq_error; +}; + +#define MERGEQ_DEFAULT_CAP 64 + +static int +mergeq_error(int err) +{ + errno = err; + return (MERGEQ_ERROR); +} + +void +mergeq_fini(mergeq_t *mqp) +{ + if (mqp == NULL) + return; + + VERIFY(mqp->mq_working != B_TRUE); + + if (mqp->mq_items != NULL) + mergeq_free(mqp->mq_items, sizeof (void *) * mqp->mq_cap); + if (mqp->mq_ndthreads > 0) { + mergeq_free(mqp->mq_thrs, sizeof (thread_t) * + mqp->mq_ndthreads); + } + VERIFY0(cond_destroy(&mqp->mq_cond)); + VERIFY0(mutex_destroy(&mqp->mq_lock)); + mergeq_free(mqp, sizeof (mergeq_t)); +} + +int +mergeq_init(mergeq_t **outp, uint_t nthrs) +{ + int ret; + mergeq_t *mqp; + + mqp = mergeq_alloc(sizeof (mergeq_t)); + if (mqp == NULL) + return (mergeq_error(ENOMEM)); + + bzero(mqp, sizeof (mergeq_t)); + mqp->mq_items = mergeq_alloc(sizeof (void *) * MERGEQ_DEFAULT_CAP); + if (mqp->mq_items == NULL) { + mergeq_free(mqp, sizeof (mergeq_t)); + return (mergeq_error(ENOMEM)); + } + bzero(mqp->mq_items, sizeof (void *) * MERGEQ_DEFAULT_CAP); + + mqp->mq_ndthreads = nthrs - 1; + if (mqp->mq_ndthreads > 0) { + mqp->mq_thrs = mergeq_alloc(sizeof (thread_t) * + mqp->mq_ndthreads); + if (mqp->mq_thrs == NULL) { + mergeq_free(mqp->mq_items, sizeof (void *) * + MERGEQ_DEFAULT_CAP); + mergeq_free(mqp, sizeof (mergeq_t)); + return (mergeq_error(ENOMEM)); + } + } + + if ((ret = mutex_init(&mqp->mq_lock, USYNC_THREAD | LOCK_ERRORCHECK, + NULL)) != 0) { + if (mqp->mq_ndthreads > 0) { + mergeq_free(mqp->mq_thrs, + sizeof (thread_t) * mqp->mq_ndthreads); + } + mergeq_free(mqp->mq_items, sizeof (void *) * + MERGEQ_DEFAULT_CAP); + mergeq_free(mqp, sizeof (mergeq_t)); + return (mergeq_error(ret)); + } + + if ((ret = cond_init(&mqp->mq_cond, USYNC_THREAD, NULL)) != 0) { + VERIFY0(mutex_destroy(&mqp->mq_lock)); + if (mqp->mq_ndthreads > 0) { + mergeq_free(mqp->mq_thrs, + sizeof (thread_t) * mqp->mq_ndthreads); + } + mergeq_free(mqp->mq_items, sizeof (void *) * + MERGEQ_DEFAULT_CAP); + mergeq_free(mqp, sizeof (mergeq_t)); + 
return (mergeq_error(ret)); + } + + mqp->mq_cap = MERGEQ_DEFAULT_CAP; + *outp = mqp; + return (0); +} + +static void +mergeq_reset(mergeq_t *mqp) +{ + VERIFY(MUTEX_HELD(&mqp->mq_lock)); + VERIFY(mqp->mq_working == B_FALSE); + if (mqp->mq_cap != 0) + bzero(mqp->mq_items, sizeof (void *) * mqp->mq_cap); + mqp->mq_nitems = 0; + mqp->mq_next = 0; + mqp->mq_gnext = 0; + mqp->mq_nproc = 0; + mqp->mq_gnproc = 0; + mqp->mq_ncommit = 0; + mqp->mq_gncommit = 0; + mqp->mq_func = NULL; + mqp->mq_arg = NULL; + mqp->mq_iserror = B_FALSE; + mqp->mq_error = 0; +} + +static int +mergeq_grow(mergeq_t *mqp) +{ + size_t ncap; + void **items; + + VERIFY(MUTEX_HELD(&mqp->mq_lock)); + VERIFY(mqp->mq_working == B_FALSE); + + if (SIZE_MAX - mqp->mq_cap < MERGEQ_DEFAULT_CAP) + return (ENOSPC); + + ncap = mqp->mq_cap + MERGEQ_DEFAULT_CAP; + items = mergeq_alloc(ncap * sizeof (void *)); + if (items == NULL) + return (ENOMEM); + + bzero(items, ncap * sizeof (void *)); + bcopy(mqp->mq_items, items, mqp->mq_cap * sizeof (void *)); + mergeq_free(mqp->mq_items, sizeof (void *) * mqp->mq_cap); + mqp->mq_items = items; + mqp->mq_cap = ncap; + return (0); +} + +int +mergeq_add(mergeq_t *mqp, void *item) +{ + VERIFY0(mutex_lock(&mqp->mq_lock)); + if (mqp->mq_working == B_TRUE) { + VERIFY0(mutex_unlock(&mqp->mq_lock)); + return (mergeq_error(ENXIO)); + } + + if (mqp->mq_next == mqp->mq_cap) { + int ret; + + if ((ret = mergeq_grow(mqp)) != 0) { + VERIFY0(mutex_unlock(&mqp->mq_lock)); + return (mergeq_error(ret)); + } + } + mqp->mq_items[mqp->mq_next] = item; + mqp->mq_next++; + mqp->mq_nitems++; + + VERIFY0(mutex_unlock(&mqp->mq_lock)); + return (0); +} + +static size_t +mergeq_slot(mergeq_t *mqp) +{ + size_t s; + + VERIFY(MUTEX_HELD(&mqp->mq_lock)); + VERIFY(mqp->mq_next < mqp->mq_cap); + + /* + * This probably should be a cv / wait thing. 
+ */ + VERIFY(mqp->mq_nproc != (mqp->mq_next + 1) % mqp->mq_cap); + + s = mqp->mq_next; + mqp->mq_next++; + if (mqp->mq_next == mqp->mq_cap) { + mqp->mq_next %= mqp->mq_cap; + mqp->mq_gnext++; + } + + return (s); +} + +/* + * Internal function to push items onto the queue which is now a circular + * buffer. This should only be used once we begin working on the queue. + */ +static void +mergeq_push(mergeq_t *mqp, size_t slot, void *item) +{ + VERIFY(MUTEX_HELD(&mqp->mq_lock)); + VERIFY(slot < mqp->mq_cap); + + /* + * We need to verify that we don't push over something that exists. + * Based on the design, this should never happen. However, in the face + * of bugs, anything is possible. + */ + while (mqp->mq_ncommit != slot && mqp->mq_iserror == B_FALSE) + (void) cond_wait(&mqp->mq_cond, &mqp->mq_lock); + + if (mqp->mq_iserror == B_TRUE) + return; + + mqp->mq_items[slot] = item; + mqp->mq_nitems++; + mqp->mq_ncommit++; + if (mqp->mq_ncommit == mqp->mq_cap) { + mqp->mq_ncommit %= mqp->mq_cap; + mqp->mq_gncommit++; + } + cond_broadcast(&mqp->mq_cond); +} + +static void * +mergeq_pop_one(mergeq_t *mqp) +{ + void *out; + + /* + * We can't move mq_nproc beyond mq_next if they're on the same + * generation. + */ + VERIFY(mqp->mq_gnext != mqp->mq_gnproc || + mqp->mq_nproc != mqp->mq_next); + + out = mqp->mq_items[mqp->mq_nproc]; + + mqp->mq_items[mqp->mq_nproc] = NULL; + mqp->mq_nproc++; + if (mqp->mq_nproc == mqp->mq_cap) { + mqp->mq_nproc %= mqp->mq_cap; + mqp->mq_gnproc++; + } + mqp->mq_nitems--; + + return (out); +} + +/* + * Pop a set of two entries from the queue. We may not have anything to process + * at the moment, eg. be waiting for someone to add something. In which case, + * we'll be sitting and waiting. 
+ */ +static boolean_t +mergeq_pop(mergeq_t *mqp, void **first, void **second) +{ + VERIFY(MUTEX_HELD(&mqp->mq_lock)); + VERIFY(mqp->mq_nproc < mqp->mq_cap); + + while (mqp->mq_nitems < 2 && mqp->mq_nactthrs > 0 && + mqp->mq_iserror == B_FALSE) + (void) cond_wait(&mqp->mq_cond, &mqp->mq_lock); + + if (mqp->mq_iserror == B_TRUE) + return (B_FALSE); + + if (mqp->mq_nitems < 2 && mqp->mq_nactthrs == 0) { + VERIFY(mqp->mq_iserror == B_TRUE || mqp->mq_nitems == 1); + return (B_FALSE); + } + VERIFY(mqp->mq_nitems >= 2); + + *first = mergeq_pop_one(mqp); + *second = mergeq_pop_one(mqp); + + return (B_TRUE); +} + +static void * +mergeq_thr_merge(void *arg) +{ + mergeq_t *mqp = arg; + + VERIFY0(mutex_lock(&mqp->mq_lock)); + + /* + * Check to make sure creation worked and if not, fail fast. + */ + if (mqp->mq_iserror == B_TRUE) { + VERIFY0(mutex_unlock(&mqp->mq_lock)); + return (NULL); + } + + for (;;) { + void *first, *second, *out; + int ret; + size_t slot; + + if (mqp->mq_nitems == 1 && mqp->mq_nactthrs == 0) { + VERIFY0(mutex_unlock(&mqp->mq_lock)); + return (NULL); + } + + if (mergeq_pop(mqp, &first, &second) == B_FALSE) { + VERIFY0(mutex_unlock(&mqp->mq_lock)); + return (NULL); + } + slot = mergeq_slot(mqp); + + mqp->mq_nactthrs++; + + VERIFY0(mutex_unlock(&mqp->mq_lock)); + ret = mqp->mq_func(first, second, &out, mqp->mq_arg); + VERIFY0(mutex_lock(&mqp->mq_lock)); + + if (ret != 0) { + if (mqp->mq_iserror == B_FALSE) { + mqp->mq_iserror = B_TRUE; + mqp->mq_error = ret; + cond_broadcast(&mqp->mq_cond); + } + mqp->mq_nactthrs--; + VERIFY0(mutex_unlock(&mqp->mq_lock)); + return (NULL); + } + mergeq_push(mqp, slot, out); + mqp->mq_nactthrs--; + } +} + +int +mergeq_merge(mergeq_t *mqp, mergeq_proc_f *func, void *arg, void **outp, + int *errp) +{ + int ret, i; + boolean_t seterr = B_FALSE; + + if (mqp == NULL || func == NULL || outp == NULL) { + return (mergeq_error(EINVAL)); + } + + VERIFY0(mutex_lock(&mqp->mq_lock)); + if (mqp->mq_working == B_TRUE) { + 
VERIFY0(mutex_unlock(&mqp->mq_lock)); + return (mergeq_error(EBUSY)); + } + + if (mqp->mq_nitems == 0) { + *outp = NULL; + mergeq_reset(mqp); + VERIFY0(mutex_unlock(&mqp->mq_lock)); + return (0); + } + + /* + * Now that we've finished adding items to the queue, turn it into a + * circular buffer. + */ + mqp->mq_func = func; + mqp->mq_arg = arg; + mqp->mq_nproc = 0; + mqp->mq_working = B_TRUE; + if (mqp->mq_next == mqp->mq_cap) { + mqp->mq_next %= mqp->mq_cap; + mqp->mq_gnext++; + } + mqp->mq_ncommit = mqp->mq_next; + + ret = 0; + for (i = 0; i < mqp->mq_ndthreads; i++) { + ret = thr_create(NULL, 0, mergeq_thr_merge, mqp, 0, + &mqp->mq_thrs[i]); + if (ret != 0) { + mqp->mq_iserror = B_TRUE; + break; + } + } + + VERIFY0(mutex_unlock(&mqp->mq_lock)); + if (ret == 0) + (void) mergeq_thr_merge(mqp); + + for (i = 0; i < mqp->mq_ndthreads; i++) { + VERIFY0(thr_join(mqp->mq_thrs[i], NULL, NULL)); + } + + VERIFY0(mutex_lock(&mqp->mq_lock)); + + VERIFY(mqp->mq_nactthrs == 0); + mqp->mq_working = B_FALSE; + if (ret == 0 && mqp->mq_iserror == B_FALSE) { + VERIFY(mqp->mq_nitems == 1); + *outp = mergeq_pop_one(mqp); + } else if (ret == 0 && mqp->mq_iserror == B_TRUE) { + ret = MERGEQ_UERROR; + if (errp != NULL) + *errp = mqp->mq_error; + } else { + seterr = B_TRUE; + } + + mergeq_reset(mqp); + VERIFY0(mutex_unlock(&mqp->mq_lock)); + + if (seterr == B_TRUE) + return (mergeq_error(ret)); + + return (ret); +} diff --git a/usr/src/lib/mergeq/mergeq.h b/usr/src/lib/mergeq/mergeq.h new file mode 100644 index 0000000000..4c1a21d696 --- /dev/null +++ b/usr/src/lib/mergeq/mergeq.h @@ -0,0 +1,52 @@ +/* + * This file and its contents are supplied under the terms of the + * Common Development and Distribution License ("CDDL"), version 1.0. + * You may only use this file in accordance with the terms of version + * 1.0 of the CDDL. + * + * A full copy of the text of the CDDL should have accompanied this + * source. 
A copy of the CDDL is also available via the Internet at + * http://www.illumos.org/license/CDDL. + */ + +/* + * Copyright 2015 Joyent, Inc. + */ + +#ifndef _MERGEQ_H +#define _MERGEQ_H + +/* + * mergeq library routines + */ + +#include <sys/types.h> +#include <stdint.h> + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct mergeq mergeq_t; +typedef int (mergeq_proc_f)(void *, void *, void **, void *); + +extern int mergeq_init(mergeq_t **, uint_t); +extern void mergeq_fini(mergeq_t *); + +extern int mergeq_add(mergeq_t *, void *); + +#define MERGEQ_ERROR -1 +#define MERGEQ_UERROR -2 +extern int mergeq_merge(mergeq_t *, mergeq_proc_f *, void *, void **, int *); + +/* + * Routines consumers need to implement + */ +extern void *mergeq_alloc(size_t); +extern void mergeq_free(void *, size_t); + +#ifdef __cplusplus +} +#endif + +#endif /* _MERGEQ_H */ diff --git a/usr/src/lib/mergeq/workq.c b/usr/src/lib/mergeq/workq.c new file mode 100644 index 0000000000..b9f1f2aa1c --- /dev/null +++ b/usr/src/lib/mergeq/workq.c @@ -0,0 +1,311 @@ +/* + * This file and its contents are supplied under the terms of the + * Common Development and Distribution License ("CDDL"), version 1.0. + * You may only use this file in accordance with the terms of version + * 1.0 of the CDDL. + * + * A full copy of the text of the CDDL should have accompanied this + * source. A copy of the CDDL is also available via the Internet at + * http://www.illumos.org/license/CDDL. + */ + +/* + * Copyright 2015 Joyent, Inc. + */ + +/* + * Work queue + * + * A multi-threaded work queue. + * + * The general design of this is to add a fixed number of items to the queue and + * then drain them with the specified number of threads. 
+ */ + +#include <strings.h> +#include <sys/debug.h> +#include <thread.h> +#include <synch.h> +#include <errno.h> +#include <limits.h> +#include <stdlib.h> + +#include "workq.h" + +struct workq { + mutex_t wq_lock; /* Protects below items */ + cond_t wq_cond; /* Condition variable */ + void **wq_items; /* Array of items to process */ + size_t wq_nitems; /* Number of items in queue */ + size_t wq_cap; /* Queue capacity */ + size_t wq_next; /* Next item to process */ + uint_t wq_ndthreads; /* Desired number of threads */ + thread_t *wq_thrs; /* Actual threads */ + workq_proc_f *wq_func; /* Processing function */ + void *wq_arg; /* Argument for processing */ + boolean_t wq_working; /* Are we actively using it? */ + boolean_t wq_iserror; /* Have we encountered an error? */ + int wq_error; /* Error value, if any */ +}; + +#define WORKQ_DEFAULT_CAP 64 + +static int +workq_error(int err) +{ + VERIFY(err != 0); + errno = err; + return (WORKQ_ERROR); +} + +void +workq_fini(workq_t *wqp) +{ + if (wqp == NULL) + return; + + VERIFY(wqp->wq_working != B_TRUE); + VERIFY0(mutex_destroy(&wqp->wq_lock)); + VERIFY0(cond_destroy(&wqp->wq_cond)); + if (wqp->wq_cap > 0) + workq_free(wqp->wq_items, sizeof (void *) * wqp->wq_cap); + if (wqp->wq_ndthreads > 0) + workq_free(wqp->wq_thrs, sizeof (thread_t) * wqp->wq_ndthreads); + workq_free(wqp, sizeof (workq_t)); +} + +int +workq_init(workq_t **outp, uint_t nthrs) +{ + int ret; + workq_t *wqp; + + wqp = workq_alloc(sizeof (workq_t)); + if (wqp == NULL) + return (workq_error(ENOMEM)); + + bzero(wqp, sizeof (workq_t)); + wqp->wq_items = workq_alloc(sizeof (void *) * WORKQ_DEFAULT_CAP); + if (wqp->wq_items == NULL) { + workq_free(wqp, sizeof (workq_t)); + return (workq_error(ENOMEM)); + } + bzero(wqp->wq_items, sizeof (void *) * WORKQ_DEFAULT_CAP); + + wqp->wq_ndthreads = nthrs - 1; + if (wqp->wq_ndthreads > 0) { + wqp->wq_thrs = workq_alloc(sizeof (thread_t) * + wqp->wq_ndthreads); + if (wqp->wq_thrs == NULL) { + workq_free(wqp->wq_items, 
sizeof (void *) * + WORKQ_DEFAULT_CAP); + workq_free(wqp, sizeof (workq_t)); + return (workq_error(ENOMEM)); + } + } + + if ((ret = mutex_init(&wqp->wq_lock, USYNC_THREAD | LOCK_ERRORCHECK, + NULL)) != 0) { + if (wqp->wq_ndthreads > 0) { + workq_free(wqp->wq_thrs, + sizeof (thread_t) * wqp->wq_ndthreads); + } + workq_free(wqp->wq_items, sizeof (void *) * WORKQ_DEFAULT_CAP); + workq_free(wqp, sizeof (workq_t)); + return (workq_error(ret)); + } + + if ((ret = cond_init(&wqp->wq_cond, USYNC_THREAD, NULL)) != 0) { + VERIFY0(mutex_destroy(&wqp->wq_lock)); + if (wqp->wq_ndthreads > 0) { + workq_free(wqp->wq_thrs, + sizeof (thread_t) * wqp->wq_ndthreads); + } + workq_free(wqp->wq_items, sizeof (void *) * WORKQ_DEFAULT_CAP); + workq_free(wqp, sizeof (workq_t)); + return (workq_error(ret)); + } + + wqp->wq_cap = WORKQ_DEFAULT_CAP; + *outp = wqp; + return (0); +} + +static void +workq_reset(workq_t *wqp) +{ + VERIFY(MUTEX_HELD(&wqp->wq_lock)); + VERIFY(wqp->wq_working == B_FALSE); + if (wqp->wq_cap > 0) + bzero(wqp->wq_items, sizeof (void *) * wqp->wq_cap); + wqp->wq_nitems = 0; + wqp->wq_next = 0; + wqp->wq_func = NULL; + wqp->wq_arg = NULL; + wqp->wq_iserror = B_FALSE; + wqp->wq_error = 0; +} + +static int +workq_grow(workq_t *wqp) +{ + size_t ncap; + void **items; + + VERIFY(MUTEX_HELD(&wqp->wq_lock)); + VERIFY(wqp->wq_working == B_FALSE); + + if (SIZE_MAX - wqp->wq_cap < WORKQ_DEFAULT_CAP) + return (ENOSPC); + + ncap = wqp->wq_cap + WORKQ_DEFAULT_CAP; + items = workq_alloc(ncap * sizeof (void *)); + if (items == NULL) + return (ENOMEM); + + bzero(items, ncap * sizeof (void *)); + bcopy(wqp->wq_items, items, wqp->wq_cap * sizeof (void *)); + workq_free(wqp->wq_items, sizeof (void *) * wqp->wq_cap); + wqp->wq_items = items; + wqp->wq_cap = ncap; + return (0); +} + +int +workq_add(workq_t *wqp, void *item) +{ + VERIFY0(mutex_lock(&wqp->wq_lock)); + if (wqp->wq_working == B_TRUE) { + VERIFY0(mutex_unlock(&wqp->wq_lock)); + return (workq_error(ENXIO)); + } + + if 
(wqp->wq_nitems == wqp->wq_cap) { + int ret; + + if ((ret = workq_grow(wqp)) != 0) { + VERIFY0(mutex_unlock(&wqp->wq_lock)); + return (workq_error(ret)); + } + } + + wqp->wq_items[wqp->wq_nitems] = item; + wqp->wq_nitems++; + + VERIFY0(mutex_unlock(&wqp->wq_lock)); + + return (0); +} + +static void * +workq_pop(workq_t *wqp) +{ + void *out; + + VERIFY(MUTEX_HELD(&wqp->wq_lock)); + VERIFY(wqp->wq_next < wqp->wq_nitems); + + out = wqp->wq_items[wqp->wq_next]; + wqp->wq_items[wqp->wq_next] = NULL; + wqp->wq_next++; + + return (out); +} + +static void * +workq_thr_work(void *arg) +{ + workq_t *wqp = arg; + + VERIFY0(mutex_lock(&wqp->wq_lock)); + VERIFY(wqp->wq_working == B_TRUE); + + for (;;) { + int ret; + void *item; + + if (wqp->wq_iserror == B_TRUE || + wqp->wq_next == wqp->wq_nitems) { + VERIFY0(mutex_unlock(&wqp->wq_lock)); + return (NULL); + } + + item = workq_pop(wqp); + + VERIFY0(mutex_unlock(&wqp->wq_lock)); + ret = wqp->wq_func(item, wqp->wq_arg); + VERIFY0(mutex_lock(&wqp->wq_lock)); + + if (ret != 0) { + if (wqp->wq_iserror == B_FALSE) { + wqp->wq_iserror = B_TRUE; + wqp->wq_error = ret; + } + VERIFY0(mutex_unlock(&wqp->wq_lock)); + return (NULL); + } + } +} + +int +workq_work(workq_t *wqp, workq_proc_f *func, void *arg, int *errp) +{ + int i, ret; + boolean_t seterr = B_FALSE; + + if (wqp == NULL || func == NULL) + return (workq_error(EINVAL)); + + VERIFY0(mutex_lock(&wqp->wq_lock)); + if (wqp->wq_working == B_TRUE) { + VERIFY0(mutex_unlock(&wqp->wq_lock)); + return (workq_error(EBUSY)); + } + + if (wqp->wq_nitems == 0) { + workq_reset(wqp); + VERIFY0(mutex_unlock(&wqp->wq_lock)); + return (0); + } + + wqp->wq_func = func; + wqp->wq_arg = arg; + wqp->wq_next = 0; + wqp->wq_working = B_TRUE; + + ret = 0; + for (i = 0; i < wqp->wq_ndthreads; i++) { + ret = thr_create(NULL, 0, workq_thr_work, wqp, 0, + &wqp->wq_thrs[i]); + if (ret != 0) { + wqp->wq_iserror = B_TRUE; + } + } + + VERIFY0(mutex_unlock(&wqp->wq_lock)); + if (ret == 0) + (void) 
workq_thr_work(wqp); + + for (i = 0; i < wqp->wq_ndthreads; i++) { + VERIFY0(thr_join(wqp->wq_thrs[i], NULL, NULL)); + } + + VERIFY0(mutex_lock(&wqp->wq_lock)); + wqp->wq_working = B_FALSE; + if (ret == 0 && wqp->wq_iserror == B_TRUE) { + ret = WORKQ_UERROR; + if (errp != NULL) + *errp = wqp->wq_error; + } else if (ret != 0) { + VERIFY(wqp->wq_iserror == B_FALSE); + seterr = B_TRUE; + } + + workq_reset(wqp); + VERIFY0(mutex_unlock(&wqp->wq_lock)); + + if (seterr == B_TRUE) + return (workq_error(ret)); + + return (ret); +} diff --git a/usr/src/lib/mergeq/workq.h b/usr/src/lib/mergeq/workq.h new file mode 100644 index 0000000000..20cfec4a95 --- /dev/null +++ b/usr/src/lib/mergeq/workq.h @@ -0,0 +1,52 @@ +/* + * This file and its contents are supplied under the terms of the + * Common Development and Distribution License ("CDDL"), version 1.0. + * You may only use this file in accordance with the terms of version + * 1.0 of the CDDL. + * + * A full copy of the text of the CDDL should have accompanied this + * source. A copy of the CDDL is also available via the Internet at + * http://www.illumos.org/license/CDDL. + */ + +/* + * Copyright 2015 Joyent, Inc. + */ + +#ifndef _WORKQ_H +#define _WORKQ_H + +/* + * workq library routines + */ + +#include <sys/types.h> +#include <stdint.h> + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct workq workq_t; +typedef int (workq_proc_f)(void *, void *); + +extern int workq_init(workq_t **, uint_t); +extern void workq_fini(workq_t *); + +extern int workq_add(workq_t *, void *); + +#define WORKQ_ERROR (-1) +#define WORKQ_UERROR (-2) +extern int workq_work(workq_t *, workq_proc_f *, void *, int *); + +/* + * Routines consumers need to implement + */ +extern void *workq_alloc(size_t); +extern void workq_free(void *, size_t); + +#ifdef __cplusplus +} +#endif + +#endif /* _WORKQ_H */ |
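
Illustrative note (not part of the commit): the merge order described in the mergeq.c block comment can be modelled with a small single-threaded sketch. It assumes a fixed six-slot ring like the one in the diagrams and uses string labels, so that the order of merging is visible in the result. The names ring_t, ring_add, ring_pop, ring_merge, merge_labels, RING_CAP and LABEL_MAX are hypothetical and do not appear in the library. The real mergeq additionally handles locking, worker threads, generation counts and in-order commits, none of which is modelled here.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

#define	RING_CAP	6	/* hypothetical: matches the six-box diagrams */
#define	LABEL_MAX	64	/* labels longer than this are truncated */

typedef struct ring {
	char	r_items[RING_CAP][LABEL_MAX];
	size_t	r_nitems;	/* number of live entries */
	size_t	r_nproc;	/* next slot to pop (cf. mq_nproc) */
	size_t	r_next;		/* next slot to fill (cf. mq_next) */
} ring_t;

/* Store a label in the next free slot, wrapping around the ring. */
static void
ring_add(ring_t *r, const char *label)
{
	assert(r->r_nitems < RING_CAP);
	(void) snprintf(r->r_items[r->r_next], LABEL_MAX, "%s", label);
	r->r_next = (r->r_next + 1) % RING_CAP;
	r->r_nitems++;
}

/* Copy out the oldest label and clear its slot. */
static void
ring_pop(ring_t *r, char *out)
{
	assert(r->r_nitems > 0);
	(void) snprintf(out, LABEL_MAX, "%s", r->r_items[r->r_nproc]);
	r->r_items[r->r_nproc][0] = '\0';
	r->r_nproc = (r->r_nproc + 1) % RING_CAP;
	r->r_nitems--;
}

/*
 * Pop two entries, merge them, and push the result until one entry is left.
 * Returns the slot that held the final result; an empty ring yields "".
 */
static size_t
ring_merge(ring_t *r, char *out)
{
	char a[LABEL_MAX], b[LABEL_MAX], m[LABEL_MAX];
	size_t slot = r->r_nproc;

	if (r->r_nitems == 0) {
		out[0] = '\0';
		return (0);
	}

	while (r->r_nitems >= 2) {
		ring_pop(r, a);
		ring_pop(r, b);
		(void) snprintf(m, sizeof (m), "(%s%s)", a, b);
		slot = r->r_next;
		ring_add(r, m);
	}
	ring_pop(r, out);
	return (slot);
}

/* Convenience wrapper: load up to RING_CAP labels and merge them. */
static size_t
merge_labels(const char **labels, size_t n, char *out)
{
	ring_t r;
	size_t i;

	(void) memset(&r, 0, sizeof (r));
	for (i = 0; i < n; i++)
		ring_add(&r, labels[i]);
	return (ring_merge(&r, out));
}
```

With the six labels "a" through "f", merge_labels() produces "((ef)((ab)(cd)))" and reports slot 4, which lines up with the final state in step 9 of the comment, where the result sits in the fifth box. Because each pair is popped and each result is pushed in a fixed order, the grouping depends only on the insertion order, which is the property the multi-threaded queue preserves through mq_ncommit.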