diff options
| author | Sebastien Roy <seb@delphix.com> | 2017-01-12 13:27:24 -0500 |
|---|---|---|
| committer | Prakash Surya <prakash.surya@delphix.com> | 2018-05-15 15:59:28 -0700 |
| commit | 591e0e133f9980083db5d64ac33a30bcc3382ff7 (patch) | |
| tree | 4494f1d0592050bc9ada1a04db41f07e7e301fe5 /usr/src/lib/libzfs/common/libzfs_taskq.c | |
| parent | b4bf0cf0458759c67920a031021a9d96cd683cfe (diff) | |
| download | illumos-gate-591e0e133f9980083db5d64ac33a30bcc3382ff7.tar.gz | |
8115 parallel zfs mount
Reviewed by: Matthew Ahrens <mahrens@delphix.com>
Reviewed by: Pavel Zakharov <pavel.zakharov@delphix.com>
Reviewed by: Brad Lewis <brad.lewis@delphix.com>
Reviewed by: George Wilson <george.wilson@delphix.com>
Reviewed by: Paul Dagnelie <pcd@delphix.com>
Reviewed by: Prashanth Sreenivasa <pks@delphix.com>
Approved by: Matt Ahrens <mahrens@delphix.com>
Diffstat (limited to 'usr/src/lib/libzfs/common/libzfs_taskq.c')
| -rw-r--r-- | usr/src/lib/libzfs/common/libzfs_taskq.c | 297 |
1 file changed, 297 insertions, 0 deletions
diff --git a/usr/src/lib/libzfs/common/libzfs_taskq.c b/usr/src/lib/libzfs/common/libzfs_taskq.c new file mode 100644 index 0000000000..28bf649710 --- /dev/null +++ b/usr/src/lib/libzfs/common/libzfs_taskq.c @@ -0,0 +1,297 @@ +/* + * CDDL HEADER START + * + * The contents of this file are subject to the terms of the + * Common Development and Distribution License (the "License"). + * You may not use this file except in compliance with the License. + * + * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE + * or http://www.opensolaris.org/os/licensing. + * See the License for the specific language governing permissions + * and limitations under the License. + * + * When distributing Covered Code, include this CDDL HEADER in each + * file and include the License file at usr/src/OPENSOLARIS.LICENSE. + * If applicable, add the following below this CDDL HEADER, with the + * fields enclosed by brackets "[]" replaced with your own identifying + * information: Portions Copyright [yyyy] [name of copyright owner] + * + * CDDL HEADER END + */ +/* + * Copyright 2010 Sun Microsystems, Inc. All rights reserved. + * Use is subject to license terms. + */ +/* + * Copyright 2011 Nexenta Systems, Inc. All rights reserved. + * Copyright 2012 Garrett D'Amore <garrett@damore.org>. All rights reserved. + * Copyright (c) 2014, 2018 by Delphix. All rights reserved. 
+ */ + +#include <thread.h> +#include <synch.h> +#include <unistd.h> +#include <string.h> +#include <errno.h> +#include <sys/debug.h> +#include <sys/sysmacros.h> + +#include "libzfs_taskq.h" + +#define ZFS_TASKQ_ACTIVE 0x00010000 +#define ZFS_TASKQ_NAMELEN 31 + +typedef struct zfs_taskq_ent { + struct zfs_taskq_ent *ztqent_next; + struct zfs_taskq_ent *ztqent_prev; + ztask_func_t *ztqent_func; + void *ztqent_arg; + uintptr_t ztqent_flags; +} zfs_taskq_ent_t; + +struct zfs_taskq { + char ztq_name[ZFS_TASKQ_NAMELEN + 1]; + mutex_t ztq_lock; + rwlock_t ztq_threadlock; + cond_t ztq_dispatch_cv; + cond_t ztq_wait_cv; + thread_t *ztq_threadlist; + int ztq_flags; + int ztq_active; + int ztq_nthreads; + int ztq_nalloc; + int ztq_minalloc; + int ztq_maxalloc; + cond_t ztq_maxalloc_cv; + int ztq_maxalloc_wait; + zfs_taskq_ent_t *ztq_freelist; + zfs_taskq_ent_t ztq_task; +}; + +static zfs_taskq_ent_t * +ztask_alloc(zfs_taskq_t *ztq, int ztqflags) +{ + zfs_taskq_ent_t *t; + timestruc_t ts; + int err; + +again: if ((t = ztq->ztq_freelist) != NULL && + ztq->ztq_nalloc >= ztq->ztq_minalloc) { + ztq->ztq_freelist = t->ztqent_next; + } else { + if (ztq->ztq_nalloc >= ztq->ztq_maxalloc) { + if (!(ztqflags & UMEM_NOFAIL)) + return (NULL); + + /* + * We don't want to exceed ztq_maxalloc, but we can't + * wait for other tasks to complete (and thus free up + * task structures) without risking deadlock with + * the caller. So, we just delay for one second + * to throttle the allocation rate. If we have tasks + * complete before one second timeout expires then + * zfs_taskq_ent_free will signal us and we will + * immediately retry the allocation. 
+ */ + ztq->ztq_maxalloc_wait++; + + ts.tv_sec = 1; + ts.tv_nsec = 0; + err = cond_reltimedwait(&ztq->ztq_maxalloc_cv, + &ztq->ztq_lock, &ts); + + ztq->ztq_maxalloc_wait--; + if (err == 0) + goto again; /* signaled */ + } + mutex_exit(&ztq->ztq_lock); + + t = umem_alloc(sizeof (zfs_taskq_ent_t), ztqflags); + + mutex_enter(&ztq->ztq_lock); + if (t != NULL) + ztq->ztq_nalloc++; + } + return (t); +} + +static void +ztask_free(zfs_taskq_t *ztq, zfs_taskq_ent_t *t) +{ + if (ztq->ztq_nalloc <= ztq->ztq_minalloc) { + t->ztqent_next = ztq->ztq_freelist; + ztq->ztq_freelist = t; + } else { + ztq->ztq_nalloc--; + mutex_exit(&ztq->ztq_lock); + umem_free(t, sizeof (zfs_taskq_ent_t)); + mutex_enter(&ztq->ztq_lock); + } + + if (ztq->ztq_maxalloc_wait) + VERIFY0(cond_signal(&ztq->ztq_maxalloc_cv)); +} + +zfs_taskqid_t +zfs_taskq_dispatch(zfs_taskq_t *ztq, ztask_func_t func, void *arg, + uint_t ztqflags) +{ + zfs_taskq_ent_t *t; + + mutex_enter(&ztq->ztq_lock); + ASSERT(ztq->ztq_flags & ZFS_TASKQ_ACTIVE); + if ((t = ztask_alloc(ztq, ztqflags)) == NULL) { + mutex_exit(&ztq->ztq_lock); + return (0); + } + if (ztqflags & ZFS_TQ_FRONT) { + t->ztqent_next = ztq->ztq_task.ztqent_next; + t->ztqent_prev = &ztq->ztq_task; + } else { + t->ztqent_next = &ztq->ztq_task; + t->ztqent_prev = ztq->ztq_task.ztqent_prev; + } + t->ztqent_next->ztqent_prev = t; + t->ztqent_prev->ztqent_next = t; + t->ztqent_func = func; + t->ztqent_arg = arg; + t->ztqent_flags = 0; + VERIFY0(cond_signal(&ztq->ztq_dispatch_cv)); + mutex_exit(&ztq->ztq_lock); + return (1); +} + +void +zfs_taskq_wait(zfs_taskq_t *ztq) +{ + mutex_enter(&ztq->ztq_lock); + while (ztq->ztq_task.ztqent_next != &ztq->ztq_task || + ztq->ztq_active != 0) { + int ret = cond_wait(&ztq->ztq_wait_cv, &ztq->ztq_lock); + VERIFY(ret == 0 || ret == EINTR); + } + mutex_exit(&ztq->ztq_lock); +} + +static void * +zfs_taskq_thread(void *arg) +{ + zfs_taskq_t *ztq = arg; + zfs_taskq_ent_t *t; + boolean_t prealloc; + + mutex_enter(&ztq->ztq_lock); + while 
(ztq->ztq_flags & ZFS_TASKQ_ACTIVE) { + if ((t = ztq->ztq_task.ztqent_next) == &ztq->ztq_task) { + int ret; + if (--ztq->ztq_active == 0) + VERIFY0(cond_broadcast(&ztq->ztq_wait_cv)); + ret = cond_wait(&ztq->ztq_dispatch_cv, &ztq->ztq_lock); + VERIFY(ret == 0 || ret == EINTR); + ztq->ztq_active++; + continue; + } + t->ztqent_prev->ztqent_next = t->ztqent_next; + t->ztqent_next->ztqent_prev = t->ztqent_prev; + t->ztqent_next = NULL; + t->ztqent_prev = NULL; + prealloc = t->ztqent_flags & ZFS_TQENT_FLAG_PREALLOC; + mutex_exit(&ztq->ztq_lock); + + VERIFY0(rw_rdlock(&ztq->ztq_threadlock)); + t->ztqent_func(t->ztqent_arg); + VERIFY0(rw_unlock(&ztq->ztq_threadlock)); + + mutex_enter(&ztq->ztq_lock); + if (!prealloc) + ztask_free(ztq, t); + } + ztq->ztq_nthreads--; + VERIFY0(cond_broadcast(&ztq->ztq_wait_cv)); + mutex_exit(&ztq->ztq_lock); + return (NULL); +} + +/*ARGSUSED*/ +zfs_taskq_t * +zfs_taskq_create(const char *name, int nthreads, pri_t pri, int minalloc, + int maxalloc, uint_t flags) +{ + zfs_taskq_t *ztq = umem_zalloc(sizeof (zfs_taskq_t), UMEM_NOFAIL); + int t; + + ASSERT3S(nthreads, >=, 1); + + VERIFY0(rwlock_init(&ztq->ztq_threadlock, USYNC_THREAD, NULL)); + VERIFY0(cond_init(&ztq->ztq_dispatch_cv, USYNC_THREAD, NULL)); + VERIFY0(cond_init(&ztq->ztq_wait_cv, USYNC_THREAD, NULL)); + VERIFY0(cond_init(&ztq->ztq_maxalloc_cv, USYNC_THREAD, NULL)); + VERIFY0(mutex_init( + &ztq->ztq_lock, LOCK_NORMAL | LOCK_ERRORCHECK, NULL)); + + (void) strncpy(ztq->ztq_name, name, ZFS_TASKQ_NAMELEN + 1); + + ztq->ztq_flags = flags | ZFS_TASKQ_ACTIVE; + ztq->ztq_active = nthreads; + ztq->ztq_nthreads = nthreads; + ztq->ztq_minalloc = minalloc; + ztq->ztq_maxalloc = maxalloc; + ztq->ztq_task.ztqent_next = &ztq->ztq_task; + ztq->ztq_task.ztqent_prev = &ztq->ztq_task; + ztq->ztq_threadlist = + umem_alloc(nthreads * sizeof (thread_t), UMEM_NOFAIL); + + if (flags & ZFS_TASKQ_PREPOPULATE) { + mutex_enter(&ztq->ztq_lock); + while (minalloc-- > 0) + ztask_free(ztq, ztask_alloc(ztq, 
UMEM_NOFAIL)); + mutex_exit(&ztq->ztq_lock); + } + + for (t = 0; t < nthreads; t++) { + (void) thr_create(0, 0, zfs_taskq_thread, + ztq, THR_BOUND, &ztq->ztq_threadlist[t]); + } + + return (ztq); +} + +void +zfs_taskq_destroy(zfs_taskq_t *ztq) +{ + int t; + int nthreads = ztq->ztq_nthreads; + + zfs_taskq_wait(ztq); + + mutex_enter(&ztq->ztq_lock); + + ztq->ztq_flags &= ~ZFS_TASKQ_ACTIVE; + VERIFY0(cond_broadcast(&ztq->ztq_dispatch_cv)); + + while (ztq->ztq_nthreads != 0) { + int ret = cond_wait(&ztq->ztq_wait_cv, &ztq->ztq_lock); + VERIFY(ret == 0 || ret == EINTR); + } + + ztq->ztq_minalloc = 0; + while (ztq->ztq_nalloc != 0) { + ASSERT(ztq->ztq_freelist != NULL); + ztask_free(ztq, ztask_alloc(ztq, UMEM_NOFAIL)); + } + + mutex_exit(&ztq->ztq_lock); + + for (t = 0; t < nthreads; t++) + (void) thr_join(ztq->ztq_threadlist[t], NULL, NULL); + + umem_free(ztq->ztq_threadlist, nthreads * sizeof (thread_t)); + + VERIFY0(rwlock_destroy(&ztq->ztq_threadlock)); + VERIFY0(cond_destroy(&ztq->ztq_dispatch_cv)); + VERIFY0(cond_destroy(&ztq->ztq_wait_cv)); + VERIFY0(cond_destroy(&ztq->ztq_maxalloc_cv)); + VERIFY0(mutex_destroy(&ztq->ztq_lock)); + + umem_free(ztq, sizeof (zfs_taskq_t)); +} |
