diff options
Diffstat (limited to 'usr/src/cmd/filebench/common/flowop.c')
-rw-r--r-- | usr/src/cmd/filebench/common/flowop.c | 725 |
1 files changed, 725 insertions, 0 deletions
diff --git a/usr/src/cmd/filebench/common/flowop.c b/usr/src/cmd/filebench/common/flowop.c new file mode 100644 index 0000000000..99602c446c --- /dev/null +++ b/usr/src/cmd/filebench/common/flowop.c @@ -0,0 +1,725 @@ +/* + * CDDL HEADER START + * + * The contents of this file are subject to the terms of the + * Common Development and Distribution License (the "License"). + * You may not use this file except in compliance with the License. + * + * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE + * or http://www.opensolaris.org/os/licensing. + * See the License for the specific language governing permissions + * and limitations under the License. + * + * When distributing Covered Code, include this CDDL HEADER in each + * file and include the License file at usr/src/OPENSOLARIS.LICENSE. + * If applicable, add the following below this CDDL HEADER, with the + * fields enclosed by brackets "[]" replaced with your own identifying + * information: Portions Copyright [yyyy] [name of copyright owner] + * + * CDDL HEADER END + */ +/* + * Copyright 2007 Sun Microsystems, Inc. All rights reserved. + * Use is subject to license terms. + */ + +#pragma ident "%Z%%M% %I% %E% SMI" + +#include "config.h" + +#ifdef HAVE_LWPS +#include <sys/lwp.h> +#endif +#include <fcntl.h> +#include "filebench.h" +#include "flowop.h" +#include "stats.h" + +#ifdef LINUX_PORT +#include <sys/types.h> +#include <linux/unistd.h> +#endif + +static flowop_t *flowop_define_common(threadflow_t *threadflow, char *name, + flowop_t *inherit, int instance, int type); + + +/* + * A collection of flowop support functions. The actual code that + * implements the various flowops is in flowop_library.c. + * + * Routines for defining, creating, initializing and destroying + * flowops, cyclically invoking the flowops on each threadflow's flowop + * list, collecting statistics about flowop execution, and other + * housekeeping duties are included in this file. + */ + + +/* + * Prints the name and instance number of each flowop in + * the supplied list to the filebench log. + */ +int +flowop_printlist(flowop_t *list) +{ + flowop_t *flowop = list; + + while (flowop) { + filebench_log(LOG_DEBUG_IMPL, "flowop-list %s-%d", + flowop->fo_name, flowop->fo_instance); + flowop = flowop->fo_threadnext; + } + return (0); +} + +#define TIMESPEC_TO_HRTIME(s, e) (((e.tv_sec - s.tv_sec) * 1000000000LL) + \ + (e.tv_nsec - s.tv_nsec)) +/* + * Puts current high resolution time in start time entry + * for threadflow and may also calculate running filebench + * overhead statistics. + */ +void +flowop_beginop(threadflow_t *threadflow, flowop_t *flowop) +{ +#ifdef HAVE_PROCFS + if ((noproc == 0) && (threadflow->tf_lwpusagefd == 0)) { + char procname[128]; + + (void) snprintf(procname, sizeof (procname), + "/proc/%d/lwp/%d/lwpusage", pid, _lwp_self()); + threadflow->tf_lwpusagefd = open(procname, O_RDONLY); + } + + (void) pread(threadflow->tf_lwpusagefd, + &threadflow->tf_susage, + sizeof (struct prusage), 0); + + /* Compute overhead time in this thread around op */ + if (threadflow->tf_eusage.pr_stime.tv_nsec) { + flowop->fo_stats.fs_mstate[FLOW_MSTATE_OHEAD] += + TIMESPEC_TO_HRTIME(threadflow->tf_eusage.pr_utime, + threadflow->tf_susage.pr_utime) + + TIMESPEC_TO_HRTIME(threadflow->tf_eusage.pr_ttime, + threadflow->tf_susage.pr_ttime) + + TIMESPEC_TO_HRTIME(threadflow->tf_eusage.pr_stime, + threadflow->tf_susage.pr_stime); + } +#endif + /* Start of op for this thread */ + threadflow->tf_stime = gethrtime(); +} + +flowstat_t controlstats; +static int controlstats_zeroed = 0; + +/* + * Updates flowop's latency statistics, using saved start + * time and current high resolution time. Updates flowop's + * io count and transferred bytes statistics. Also updates + * threadflow's and flowop's cumulative read or write byte + * and io count statistics. + */ +void +flowop_endop(threadflow_t *threadflow, flowop_t *flowop) +{ + hrtime_t t; + + flowop->fo_stats.fs_mstate[FLOW_MSTATE_LAT] += + (gethrtime() - threadflow->tf_stime); +#ifdef HAVE_PROCFS + if ((pread(threadflow->tf_lwpusagefd, &threadflow->tf_eusage, + sizeof (struct prusage), 0)) != sizeof (struct prusage)) + filebench_log(LOG_ERROR, "cannot read /proc"); + + t = + TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_utime, + threadflow->tf_eusage.pr_utime) + + TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_ttime, + threadflow->tf_eusage.pr_ttime) + + TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_stime, + threadflow->tf_eusage.pr_stime); + flowop->fo_stats.fs_mstate[FLOW_MSTATE_CPU] += t; + + flowop->fo_stats.fs_mstate[FLOW_MSTATE_WAIT] += + TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_tftime, + threadflow->tf_eusage.pr_tftime) + + TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_dftime, + threadflow->tf_eusage.pr_dftime) + + TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_kftime, + threadflow->tf_eusage.pr_kftime) + + TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_kftime, + threadflow->tf_eusage.pr_kftime) + + TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_slptime, + threadflow->tf_eusage.pr_slptime); +#endif + + flowop->fo_stats.fs_count++; + flowop->fo_stats.fs_bytes += *flowop->fo_iosize; + if ((flowop->fo_type & FLOW_TYPE_IO) || + (flowop->fo_type & FLOW_TYPE_AIO)) { + controlstats.fs_count++; + controlstats.fs_bytes += *flowop->fo_iosize; + } + if (flowop->fo_attrs & FLOW_ATTR_READ) { + threadflow->tf_stats.fs_rbytes += *flowop->fo_iosize; + threadflow->tf_stats.fs_rcount++; + flowop->fo_stats.fs_rcount++; + controlstats.fs_rbytes += *flowop->fo_iosize; + controlstats.fs_rcount++; + } else if (flowop->fo_attrs & FLOW_ATTR_WRITE) { + threadflow->tf_stats.fs_wbytes += *flowop->fo_iosize; + threadflow->tf_stats.fs_wcount++; + flowop->fo_stats.fs_wcount++; + controlstats.fs_wbytes += *flowop->fo_iosize; + controlstats.fs_wcount++; + } +} + +/* + * Calls the flowop's initialization function, pointed to by + * flowop->fo_init. + */ +static int +flowop_initflow(flowop_t *flowop) +{ + if ((*flowop->fo_init)(flowop) < 0) { + filebench_log(LOG_ERROR, "flowop %s-%d init failed", + flowop->fo_name, flowop->fo_instance); + return (-1); + } + return (0); +} + +/* + * The final initialization and main execution loop for the + * worker threads. Sets threadflow and flowop start times, + * waits for all process to start, then creates the runtime + * flowops from those defined by the F language workload + * script. It does some more initialization, then enters a + * loop to repeatedly execute the flowops on the flowop list + * until an abort condition is detected, at which time it exits. + * This is the starting routine for the new worker thread + * created by threadflow_createthread(), and is not currently + * called from anywhere else. + */ +void +flowop_start(threadflow_t *threadflow) +{ + flowop_t *flowop; + size_t memsize; + int ret = 0; + + pid = getpid(); + +#ifdef HAVE_PROCFS + if (noproc == 0) { + char procname[128]; + long ctl[2] = {PCSET, PR_MSACCT}; + int pfd; + + (void) snprintf(procname, sizeof (procname), + "/proc/%d/lwp/%d/lwpctl", pid, _lwp_self()); + pfd = open(procname, O_WRONLY); + (void) pwrite(pfd, &ctl, sizeof (ctl), 0); + (void) close(pfd); + } +#endif + + if (!controlstats_zeroed) { + (void) memset(&controlstats, 0, sizeof (controlstats)); + controlstats_zeroed = 1; + } + + flowop = threadflow->tf_ops; + threadflow->tf_stats.fs_stime = gethrtime(); + flowop->fo_stats.fs_stime = gethrtime(); + + /* Hold the flowop find lock as reader to prevent lookups */ + (void) pthread_rwlock_rdlock(&filebench_shm->flowop_find_lock); + + /* + * Block until all processes have started, acting like + * a barrier. The original filebench process initially + * holds the run_lock as a reader, preventing any of the + * threads from obtaining the writer lock, and hence + * passing this point. Once all processes and threads + * have been created, the original process unlocks + * run_lock, allowing each waiting thread to lock + * and then immediately unlock it, then begin running. + */ + (void) pthread_rwlock_wrlock(&filebench_shm->run_lock); + (void) pthread_rwlock_unlock(&filebench_shm->run_lock); + + /* Create the runtime flowops from those defined by the script */ + (void) ipc_mutex_lock(&filebench_shm->flowop_lock); + while (flowop) { + flowop_t *newflowop; + + if (flowop == threadflow->tf_ops) + threadflow->tf_ops = NULL; + newflowop = flowop_define_common(threadflow, flowop->fo_name, + flowop, 1, 0); + if (newflowop == NULL) + return; + if (flowop_initflow(newflowop) < 0) { + filebench_log(LOG_ERROR, "Flowop init of %s failed", + newflowop->fo_name); + } + flowop = flowop->fo_threadnext; + } + (void) ipc_mutex_unlock(&filebench_shm->flowop_lock); + + /* Release the find lock as reader to allow lookups */ + (void) pthread_rwlock_unlock(&filebench_shm->flowop_find_lock); + + /* Set to the start of the new flowop list */ + flowop = threadflow->tf_ops; + + threadflow->tf_abort = 0; + threadflow->tf_running = 1; + + /* If we are going to use ISM, allocate later */ + if (threadflow->tf_attrs & THREADFLOW_USEISM) { + threadflow->tf_mem = + ipc_ismmalloc((size_t)*threadflow->tf_memsize); + } else { + threadflow->tf_mem = malloc((size_t)*threadflow->tf_memsize); + } + + memsize = *threadflow->tf_memsize; + (void) memset(threadflow->tf_mem, 0, memsize); + filebench_log(LOG_DEBUG_SCRIPT, "Thread allocated %d bytes", memsize); + +#ifdef HAVE_LWPS + filebench_log(LOG_DEBUG_SCRIPT, "Thread %zx (%d) started", + threadflow, + _lwp_self()); +#endif + + /* Main filebench worker loop */ + /* CONSTCOND */ + while (1) { + int i; + + /* Abort if asked */ + if (threadflow->tf_abort || filebench_shm->f_abort) { + (void) ipc_mutex_lock(&threadflow->tf_lock); + threadflow->tf_running = 0; + (void) ipc_mutex_unlock(&threadflow->tf_lock); + break; + } + + /* Be quiet while stats are gathered */ + if (filebench_shm->bequiet) { + (void) sleep(1); + continue; + } + + /* Take it easy until everyone is ready to go */ + if (!filebench_shm->allrunning) + (void) sleep(1); + + if (flowop->fo_stats.fs_stime == 0) + flowop->fo_stats.fs_stime = gethrtime(); + + if (flowop == NULL) { + filebench_log(LOG_ERROR, "flowop_read null flowop"); + return; + } + + if (threadflow->tf_memsize == 0) { + filebench_log(LOG_ERROR, + "Zero memory size for thread %s", + threadflow->tf_name); + return; + } + + filebench_log(LOG_DEBUG_SCRIPT, "%s: executing flowop %s-%d", + threadflow->tf_name, flowop->fo_name, flowop->fo_instance); + + /* Execute the flowop for fo_iters times */ + for (i = 0; i < *flowop->fo_iters; i++) { + filebench_log(LOG_DEBUG_SCRIPT, "%s: executing flowop " + "%s-%d", threadflow->tf_name, flowop->fo_name, + flowop->fo_instance); + ret = (*flowop->fo_func)(threadflow, flowop); + filebench_log(LOG_DEBUG_SCRIPT, "%s: executing flowop " + "%s-%d", threadflow->tf_name, flowop->fo_name, + flowop->fo_instance); + + /* Return value > 0 means "stop the filebench run" */ + if (ret > 0) { + filebench_log(LOG_VERBOSE, + "%s: exiting flowop %s-%d", + threadflow->tf_name, flowop->fo_name, + flowop->fo_instance); + (void) ipc_mutex_lock(&threadflow->tf_lock); + threadflow->tf_abort = 1; + filebench_shm->f_abort = 1; + threadflow->tf_running = 0; + (void) ipc_mutex_unlock(&threadflow->tf_lock); + break; + } + /* + * Return value < 0 means "flowop failed, stop the + * filebench run" + */ + if (ret < 0) { + filebench_log(LOG_ERROR, "flowop %s failed", + flowop->fo_name); + (void) ipc_mutex_lock(&threadflow->tf_lock); + threadflow->tf_abort = 1; + filebench_shm->f_abort = 1; + threadflow->tf_running = 0; + (void) ipc_mutex_unlock(&threadflow->tf_lock); + break; + } + } + + /* advance to next flowop */ + flowop = flowop->fo_threadnext; + + /* but if at end of list, start over from the beginning */ + if (flowop == NULL) { + flowop = threadflow->tf_ops; + threadflow->tf_stats.fs_count++; + } + } + +#ifdef HAVE_LWPS + filebench_log(LOG_DEBUG_SCRIPT, "Thread %d exiting", + _lwp_self()); +#endif + + pthread_exit(&ret); +} + +void +flowop_init(void) +{ + flowoplib_init(); +} + +/* + * Calls the flowop's destruct function, pointed to by + * flowop->fo_destruct. + */ +static void +flowop_destructflow(flowop_t *flowop) +{ + (*flowop->fo_destruct)(flowop); +} + +/* + * Delete the designated flowop from the thread's flowop list. + * After removal from the list, the flowop is destroyed with + * flowop_destructflow(). + */ +static void +flowop_delete(flowop_t **flowoplist, flowop_t *flowop) +{ + flowop_t *entry = *flowoplist; + int found = 0; + + filebench_log(LOG_DEBUG_IMPL, "Deleting flowop (%s-%d)", + flowop->fo_name, + flowop->fo_instance); + + /* Delete from thread's flowop list */ + if (flowop == *flowoplist) { + /* First on list */ + *flowoplist = flowop->fo_threadnext; + filebench_log(LOG_DEBUG_IMPL, + "Delete0 flowop: (%s-%d)", + flowop->fo_name, + flowop->fo_instance); + } else { + while (entry->fo_threadnext) { + filebench_log(LOG_DEBUG_IMPL, + "Delete0 flowop: (%s-%d) == (%s-%d)", + entry->fo_threadnext->fo_name, + entry->fo_threadnext->fo_instance, + flowop->fo_name, + flowop->fo_instance); + + if (flowop == entry->fo_threadnext) { + /* Delete */ + filebench_log(LOG_DEBUG_IMPL, + "Deleted0 flowop: (%s-%d)", + entry->fo_threadnext->fo_name, + entry->fo_threadnext->fo_instance); + entry->fo_threadnext = + entry->fo_threadnext->fo_threadnext; + break; + } + entry = entry->fo_threadnext; + } + } + + /* Call destructor */ + flowop_destructflow(flowop); + +#ifdef HAVE_PROCFS + /* Close /proc stats */ + if (flowop->fo_thread) + (void) close(flowop->fo_thread->tf_lwpusagefd); +#endif + + /* Delete from global list */ + entry = filebench_shm->flowoplist; + + if (flowop == filebench_shm->flowoplist) { + /* First on list */ + filebench_shm->flowoplist = flowop->fo_next; + found = 1; + } else { + while (entry->fo_next) { + filebench_log(LOG_DEBUG_IMPL, + "Delete flowop: (%s-%d) == (%s-%d)", + entry->fo_next->fo_name, + entry->fo_next->fo_instance, + flowop->fo_name, + flowop->fo_instance); + + if (flowop == entry->fo_next) { + /* Delete */ + entry->fo_next = entry->fo_next->fo_next; + found = 1; + break; + } + + entry = entry->fo_next; + } + } + if (found) { + filebench_log(LOG_DEBUG_IMPL, + "Deleted flowop: (%s-%d)", + flowop->fo_name, + flowop->fo_instance); + ipc_free(FILEBENCH_FLOWOP, (char *)flowop); + } else { + filebench_log(LOG_DEBUG_IMPL, "Flowop %s-%d not found!", + flowop->fo_name, + flowop->fo_instance); + } +} + +/* + * Deletes all the flowops from a flowop list. + */ +void +flowop_delete_all(flowop_t **flowoplist) +{ + flowop_t *flowop = *flowoplist; + + filebench_log(LOG_DEBUG_IMPL, "Deleting all flowops..."); + while (flowop) { + filebench_log(LOG_DEBUG_IMPL, "Deleting all flowops (%s-%d)", + flowop->fo_name, flowop->fo_instance); + flowop = flowop->fo_threadnext; + } + + flowop = *flowoplist; + + (void) ipc_mutex_lock(&filebench_shm->flowop_lock); + + while (flowop) { + if (flowop->fo_instance && + (flowop->fo_instance == FLOW_MASTER)) { + flowop = flowop->fo_threadnext; + continue; + } + flowop_delete(flowoplist, flowop); + flowop = flowop->fo_threadnext; + } + + (void) ipc_mutex_unlock(&filebench_shm->flowop_lock); +} + +/* + * Allocates a flowop entity and initializes it with inherited + * contents from the "inherit" flowop, if it is supplied, or + * with zeros otherwise. In either case the file descriptor + * (fo_fd) is set to -1, and the fo_next and fo_threadnext + * pointers are set to NULL, and fo_thread is set to point to + * the owning threadflow. The initialized flowop is placed at + * the head of the global flowop list, and also placed on the + * tail of thethreadflow's tf_ops list. The routine locks the + * flowop's fo_lock and leaves it held on return. If successful, + * it returns a pointer to the allocated and initialized flowop, + * otherwise NULL. + * + * filebench_shm->flowop_lock must be held by caller. + */ +static flowop_t * +flowop_define_common(threadflow_t *threadflow, char *name, flowop_t *inherit, + int instance, int type) +{ + flowop_t *flowop; + + if (name == NULL) + return (NULL); + + if ((flowop = (flowop_t *)ipc_malloc(FILEBENCH_FLOWOP)) == NULL) { + filebench_log(LOG_ERROR, + "flowop_define: Can't malloc flowop"); + return (NULL); + } + + filebench_log(LOG_DEBUG_IMPL, "defining flowops %s-%d, addr %zx", + name, instance, flowop); + + if (flowop == NULL) + return (NULL); + + if (inherit) { + (void) memcpy(flowop, inherit, sizeof (flowop_t)); + (void) pthread_mutex_init(&flowop->fo_lock, ipc_mutexattr()); + (void) ipc_mutex_lock(&flowop->fo_lock); + flowop->fo_next = NULL; + flowop->fo_threadnext = NULL; + flowop->fo_fd = -1; + filebench_log(LOG_DEBUG_IMPL, + "flowop %s-%d calling init", name, instance); + } else { + (void) memset(flowop, 0, sizeof (flowop_t)); + flowop->fo_fd = -1; + flowop->fo_iters = integer_alloc(1); + flowop->fo_type = type; + (void) pthread_mutex_init(&flowop->fo_lock, ipc_mutexattr()); + (void) ipc_mutex_lock(&flowop->fo_lock); + } + + /* Create backpointer to thread */ + flowop->fo_thread = threadflow; + + /* Add flowop to global list */ + if (filebench_shm->flowoplist == NULL) { + filebench_shm->flowoplist = flowop; + flowop->fo_next = NULL; + } else { + flowop->fo_next = filebench_shm->flowoplist; + filebench_shm->flowoplist = flowop; + } + + (void) strcpy(flowop->fo_name, name); + flowop->fo_instance = instance; + + if (threadflow == NULL) + return (flowop); + + /* Add flowop to thread op list */ + if (threadflow->tf_ops == NULL) { + threadflow->tf_ops = flowop; + flowop->fo_threadnext = NULL; + } else { + flowop_t *flowend; + + /* Find the end of the thread list */ + flowend = threadflow->tf_ops; + while (flowend->fo_threadnext != NULL) + flowend = flowend->fo_threadnext; + flowend->fo_threadnext = flowop; + flowop->fo_threadnext = NULL; + } + + return (flowop); +} + +/* + * Calls flowop_define_common() to allocate and initialize a + * flowop, and holds the shared flowop_lock during the call. + * It releases the created flowop's fo_lock when done. + */ +flowop_t * +flowop_define(threadflow_t *threadflow, char *name, flowop_t *inherit, + int instance, int type) +{ + flowop_t *flowop; + + (void) ipc_mutex_lock(&filebench_shm->flowop_lock); + flowop = flowop_define_common(threadflow, name, + inherit, instance, type); + (void) ipc_mutex_unlock(&filebench_shm->flowop_lock); + + if (flowop == NULL) + return (NULL); + + (void) ipc_mutex_unlock(&flowop->fo_lock); + + return (flowop); +} + +/* + * Attempts to take a write lock on the flowop_find_lock that is + * defined in interprocess shared memory. Since each call to + * flowop_start() holds a read lock on flowop_find_lock, this + * routine effectively blocks until all instances of + * flowop_start() have finished. The flowop_find() routine calls + * this routine so that flowops won't be searched for until all + * flowops have been created by flowop_start. + */ +static void +flowop_find_barrier(void) +{ + /* Block on wrlock to ensure find waits for all creates */ + (void) pthread_rwlock_wrlock(&filebench_shm->flowop_find_lock); + (void) pthread_rwlock_unlock(&filebench_shm->flowop_find_lock); +} + +/* + * Returns a list of flowops named "name" from the master + * flowop list. + */ +flowop_t * +flowop_find(char *name) +{ + flowop_t *flowop = filebench_shm->flowoplist; + flowop_t *result = NULL; + + flowop_find_barrier(); + + (void) ipc_mutex_lock(&filebench_shm->flowop_lock); + + while (flowop) { + if (strcmp(name, flowop->fo_name) == 0) { + + /* Add flowop to result list */ + if (result == NULL) { + result = flowop; + flowop->fo_resultnext = NULL; + } else { + flowop->fo_resultnext = result; + result = flowop; + } + } + flowop = flowop->fo_next; + } + + (void) ipc_mutex_unlock(&filebench_shm->flowop_lock); + + + return (result); +} + +/* + * Returns a pointer to the specified instance of flowop + * "name" from the list returned by flowop_find(). + */ +flowop_t * +flowop_find_one(char *name, int instance) +{ + flowop_t *result; + + result = flowop_find(name); + + while (result) { + if ((strcmp(name, result->fo_name) == 0) && + (instance == result->fo_instance)) + break; + result = result->fo_next; + } + + return (result); +} |