/* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License (the "License"). * You may not use this file except in compliance with the License. * * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE * or http://www.opensolaris.org/os/licensing. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at usr/src/OPENSOLARIS.LICENSE. * If applicable, add the following below this CDDL HEADER, with the * fields enclosed by brackets "[]" replaced with your own identifying * information: Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END */ /* * Copyright 2007 Sun Microsystems, Inc. All rights reserved. * Use is subject to license terms. */ #pragma ident "%Z%%M% %I% %E% SMI" #include "config.h" #include #ifdef HAVE_LWPS #include #endif #include #include "threadflow.h" #include "filebench.h" #include "flowop.h" #include "ipc.h" static threadflow_t *threadflow_define_common(procflow_t *procflow, char *name, threadflow_t *inherit, int instance); /* * Threadflows are filebench entities which manage operating system * threads. Each worker threadflow spawns a separate filebench thread, * with attributes inherited from a FLOW_MASTER threadflow created during * f model language parsing. This section contains routines to define, * create, control, and delete threadflows. * * Each thread defined in the f model creates a FLOW_MASTER * threadflow which encapsulates the defined attributes and flowops of * the f language thread, including the number of instances to create. * At runtime, a worker threadflow instance with an associated filebench * thread is created, which runs until told to quit or is specifically * deleted. */ /* * Prints information about threadflow syntax. */ void threadflow_usage(void) { (void) fprintf(stderr, " thread name=[,instances=]\n"); (void) fprintf(stderr, "\n"); (void) fprintf(stderr, " {\n"); (void) fprintf(stderr, " flowop ...\n"); (void) fprintf(stderr, " flowop ...\n"); (void) fprintf(stderr, " flowop ...\n"); (void) fprintf(stderr, " }\n"); (void) fprintf(stderr, "\n"); } /* * Creates a thread for the supplied threadflow. If interprocess * shared memory is desired, then increments the amount of shared * memory needed by the amount specified in the threadflow's * tf_memsize parameter. The thread starts in routine * flowop_start() with a poineter to the threadflow supplied * as the argument. */ static int threadflow_createthread(threadflow_t *threadflow) { int fp = 0; filebench_log(LOG_DEBUG_SCRIPT, "Creating thread %s, memory = %ld", threadflow->tf_name, *threadflow->tf_memsize); if (threadflow->tf_attrs & THREADFLOW_USEISM) filebench_shm->shm_required += (*threadflow->tf_memsize); if (pthread_create(&threadflow->tf_tid, NULL, (void *(*)(void*))flowop_start, threadflow) != 0) { filebench_log(LOG_ERROR, "thread create failed"); filebench_shutdown(1); } /* XXX */ return (fp < 0); } #ifndef USE_PROCESS_MODEL static procflow_t *my_procflow; /* * Terminates (exits) all the threads of the procflow (process). * The procflow is determined from a process private pointer * initialized by threadflow_init(). */ /* ARGSUSED */ static void threadflow_cancel(int arg1) { threadflow_t *threadflow = my_procflow->pf_threads; #ifdef HAVE_LWPS filebench_log(LOG_DEBUG_IMPL, "Thread signal handler on tid %d", _lwp_self()); #endif my_procflow->pf_running = 0; exit(0); while (threadflow) { if (threadflow->tf_tid) { (void) pthread_cancel(threadflow->tf_tid); filebench_log(LOG_DEBUG_IMPL, "Thread %d cancelled...", threadflow->tf_tid); } threadflow = threadflow->tf_next; } } #endif /* USE_PROCESS_MODEL */ /* * Creates threads for the threadflows associated with a procflow. * The routine iterates through the list of threadflows in the * supplied procflow's pf_threads list. For each threadflow on * the list, it defines tf_instances number of cloned * threadflows, and then calls threadflow_createthread() for * each to create and start the actual operating system thread. * Note that each of the newly defined threadflows will be linked * into the procflows threadflow list, but at the head of the * list, so they will not become part of the supplied set. After * all the threads have been created, threadflow_init enters * a join loop for all the threads in the newly defined * threadflows. Once all the created threads have exited, * threadflow_init will return 0. If errors are encountered, it * will return a non zero value. */ int threadflow_init(procflow_t *procflow) { threadflow_t *threadflow = procflow->pf_threads; int ret = 0; (void) ipc_mutex_lock(&filebench_shm->threadflow_lock); #ifndef USE_PROCESS_MODEL my_procflow = procflow; (void) signal(SIGUSR1, threadflow_cancel); #endif while (threadflow) { threadflow_t *newthread; int i; filebench_log(LOG_VERBOSE, "Starting %lld %s threads", *(threadflow->tf_instances), threadflow->tf_name); for (i = 1; i < *threadflow->tf_instances; i++) { /* Create threads */ newthread = threadflow_define_common(procflow, threadflow->tf_name, threadflow, i + 1); if (newthread == NULL) return (-1); ret += threadflow_createthread(newthread); } newthread = threadflow_define_common(procflow, threadflow->tf_name, threadflow, 1); if (newthread == NULL) return (-1); /* Create threads */ ret += threadflow_createthread(newthread); threadflow = threadflow->tf_next; } threadflow = procflow->pf_threads; (void) ipc_mutex_unlock(&filebench_shm->threadflow_lock); while (threadflow) { void *status; if (threadflow->tf_tid) (void) pthread_join(threadflow->tf_tid, &status); ret += *(int *)status; threadflow = threadflow->tf_next; } procflow->pf_running = 0; return (ret); } /* * Tells the threadflow's thread to stop and optionally signals * its associated process to end the thread. */ static void threadflow_kill(threadflow_t *threadflow) { /* Tell thread to finish */ threadflow->tf_abort = 1; #ifdef USE_PROCESS_MODEL #ifdef HAVE_SIGSEND (void) sigsend(P_PID, threadflow->tf_process->pf_pid, SIGUSR1); #else (void) kill(threadflow->tf_process->pf_pid, SIGUSR1); #endif #else /* USE_PROCESS_MODEL */ threadflow->tf_process->pf_running = 0; #endif /* USE_PROCESS_MODEL */ } /* * Deletes the specified threadflow from the specified threadflow * list after first terminating the threadflow's thread, deleting * the threadflow's flowops, and finally freeing the threadflow * entity. It also subtracts the threadflow's shared memory * requirements from the total amount required, shm_required. If * the specified threadflow is found, returns 0, otherwise * returns -1. */ static int threadflow_delete(threadflow_t **threadlist, threadflow_t *threadflow) { threadflow_t *entry = *threadlist; filebench_log(LOG_DEBUG_IMPL, "Deleting thread: (%s-%d)", threadflow->tf_name, threadflow->tf_instance); if (threadflow->tf_attrs & THREADFLOW_USEISM) { filebench_shm->shm_required -= (*threadflow->tf_memsize); } if (threadflow == *threadlist) { /* First on list */ filebench_log(LOG_DEBUG_IMPL, "Deleted thread: (%s-%d)", threadflow->tf_name, threadflow->tf_instance); threadflow_kill(threadflow); flowop_delete_all(&threadflow->tf_ops); *threadlist = threadflow->tf_next; ipc_free(FILEBENCH_THREADFLOW, (char *)threadflow); return (0); } while (entry->tf_next) { filebench_log(LOG_DEBUG_IMPL, "Delete thread: (%s-%d) == (%s-%d)", entry->tf_next->tf_name, entry->tf_next->tf_instance, threadflow->tf_name, threadflow->tf_instance); if (threadflow == entry->tf_next) { /* Delete */ filebench_log(LOG_DEBUG_IMPL, "Deleted thread: (%s-%d)", entry->tf_next->tf_name, entry->tf_next->tf_instance); threadflow_kill(entry->tf_next); flowop_delete_all(&entry->tf_next->tf_ops); ipc_free(FILEBENCH_THREADFLOW, (char *)threadflow); entry->tf_next = entry->tf_next->tf_next; return (0); } entry = entry->tf_next; } return (-1); } /* * Given a pointer to the thread list of a procflow, cycles * through all the threadflows on the list, deleting each one * except the FLOW_MASTER. */ void threadflow_delete_all(threadflow_t **threadlist) { threadflow_t *threadflow = *threadlist; (void) ipc_mutex_lock(&filebench_shm->threadflow_lock); filebench_log(LOG_DEBUG_IMPL, "Deleting all threads"); while (threadflow) { if (threadflow->tf_instance && (threadflow->tf_instance == FLOW_MASTER)) { threadflow = threadflow->tf_next; continue; } (void) threadflow_delete(threadlist, threadflow); threadflow = threadflow->tf_next; } (void) ipc_mutex_unlock(&filebench_shm->threadflow_lock); } /* * Waits till all threadflows are started, or a timeout occurs. * Checks through the list of threadflows, waiting up to 10 * seconds for each one to set its tf_running flag to 1. If not * set after 10 seconds, continues on to the next threadflow * anyway. */ void threadflow_allstarted(pid_t pid, threadflow_t *threadflow) { (void) ipc_mutex_lock(&filebench_shm->threadflow_lock); while (threadflow) { int waits; if ((threadflow->tf_instance == 0) || (threadflow->tf_instance == FLOW_MASTER)) { threadflow = threadflow->tf_next; continue; } filebench_log(LOG_DEBUG_IMPL, "Checking pid %d thread %s-%d", pid, threadflow->tf_name, threadflow->tf_instance); waits = 10; while (waits && threadflow->tf_running == 0) { (void) ipc_mutex_unlock( &filebench_shm->threadflow_lock); if (waits < 3) filebench_log(LOG_INFO, "Waiting for pid %d thread %s-%d", pid, threadflow->tf_name, threadflow->tf_instance); (void) sleep(1); (void) ipc_mutex_lock(&filebench_shm->threadflow_lock); waits--; } threadflow = threadflow->tf_next; } (void) ipc_mutex_unlock(&filebench_shm->threadflow_lock); } /* * Create an in-memory thread object linked to a parent procflow. * A threadflow entity is allocated from shared memory and * initialized from the "inherit" threadflow if supplied, * otherwise to zeros. The threadflow is assigned a unique * thread id, the supplied instance number, the supplied name * and added to the procflow's pf_thread list. If no name is * supplied or the threadflow can't be allocated, NULL is * returned Otherwise a pointer to the newly allocated threadflow * is returned. * * The filebench_shm->threadflow_lock must be held by the caller. */ static threadflow_t * threadflow_define_common(procflow_t *procflow, char *name, threadflow_t *inherit, int instance) { threadflow_t *threadflow; threadflow_t **threadlistp = &procflow->pf_threads; if (name == NULL) return (NULL); threadflow = (threadflow_t *)ipc_malloc(FILEBENCH_THREADFLOW); if (threadflow == NULL) return (NULL); if (inherit) (void) memcpy(threadflow, inherit, sizeof (threadflow_t)); else (void) memset(threadflow, 0, sizeof (threadflow_t)); threadflow->tf_utid = ++filebench_shm->utid; threadflow->tf_instance = instance; (void) strcpy(threadflow->tf_name, name); threadflow->tf_process = procflow; filebench_log(LOG_DEBUG_IMPL, "Defining thread %s-%d", name, instance); /* Add threadflow to list */ if (*threadlistp == NULL) { *threadlistp = threadflow; threadflow->tf_next = NULL; } else { threadflow->tf_next = *threadlistp; *threadlistp = threadflow; } return (threadflow); } /* * Create an in memory FLOW_MASTER thread object as described * by the syntax. Acquire the filebench_shm->threadflow_lock and * call threadflow_define_common() to create a threadflow entity. * Set the number of instances to create at runtime, * tf_instances, to "instances". Return the threadflow pointer * returned by the threadflow_define_common call. */ threadflow_t * threadflow_define(procflow_t *procflow, char *name, threadflow_t *inherit, var_integer_t instances) { threadflow_t *threadflow; (void) ipc_mutex_lock(&filebench_shm->threadflow_lock); if ((threadflow = threadflow_define_common(procflow, name, inherit, FLOW_MASTER)) == NULL) return (NULL); threadflow->tf_instances = instances; (void) ipc_mutex_unlock(&filebench_shm->threadflow_lock); return (threadflow); } /* * Searches the provided threadflow list for the named threadflow. * A pointer to the threadflow is returned, or NULL if threadflow * is not found. */ threadflow_t * threadflow_find(threadflow_t *threadlist, char *name) { threadflow_t *threadflow = threadlist; (void) ipc_mutex_lock(&filebench_shm->threadflow_lock); while (threadflow) { if (strcmp(name, threadflow->tf_name) == 0) { (void) ipc_mutex_unlock( &filebench_shm->threadflow_lock); return (threadflow); } threadflow = threadflow->tf_next; } (void) ipc_mutex_unlock(&filebench_shm->threadflow_lock); return (NULL); }