diff options
author | Antonin Kral <a.kral@bobek.cz> | 2011-03-17 00:05:43 +0100 |
---|---|---|
committer | Antonin Kral <a.kral@bobek.cz> | 2011-03-17 00:05:43 +0100 |
commit | 582fc32574a3b158c81e49cb00e6ae59205e66ba (patch) | |
tree | ac64a3243e0d2121709f685695247052858115c8 /util/concurrency | |
parent | 2761bffa96595ac1698d86bbc2e95ebb0d4d6e93 (diff) | |
download | mongodb-582fc32574a3b158c81e49cb00e6ae59205e66ba.tar.gz |
Imported Upstream version 1.8.0
Diffstat (limited to 'util/concurrency')
-rw-r--r-- | util/concurrency/README (renamed from util/concurrency/readme.txt) | 20 | ||||
-rw-r--r-- | util/concurrency/list.h | 96 | ||||
-rw-r--r-- | util/concurrency/msg.h | 8 | ||||
-rw-r--r-- | util/concurrency/mutex.h | 129 | ||||
-rw-r--r-- | util/concurrency/mvar.h | 28 | ||||
-rw-r--r-- | util/concurrency/race.h | 72 | ||||
-rw-r--r-- | util/concurrency/rwlock.h | 170 | ||||
-rwxr-xr-x | util/concurrency/shared_mutex_win.hpp | 573 | ||||
-rw-r--r-- | util/concurrency/spin_lock.cpp | 34 | ||||
-rw-r--r-- | util/concurrency/spin_lock.h | 26 | ||||
-rw-r--r-- | util/concurrency/synchronization.cpp | 56 | ||||
-rw-r--r-- | util/concurrency/synchronization.h | 73 | ||||
-rw-r--r-- | util/concurrency/task.cpp | 56 | ||||
-rw-r--r-- | util/concurrency/task.h | 14 | ||||
-rw-r--r-- | util/concurrency/thread_pool.cpp | 45 | ||||
-rw-r--r-- | util/concurrency/thread_pool.h | 110 | ||||
-rw-r--r-- | util/concurrency/value.h | 24 | ||||
-rw-r--r-- | util/concurrency/vars.cpp | 24 |
18 files changed, 1237 insertions, 321 deletions
diff --git a/util/concurrency/readme.txt b/util/concurrency/README index 6f308f5..1d72a1c 100644 --- a/util/concurrency/readme.txt +++ b/util/concurrency/README @@ -1,11 +1,10 @@ -util/concurrency/ files
-
-list.h - a list class that is lock-free for reads
-rwlock.h - read/write locks (RWLock)
-msg.h - message passing between threads
-task.h - an abstraction around threads
-mutex.h - small enhancements that wrap boost::mutex
-thread_pool.h
+util/concurrency/ files + +list.h - a list class that is lock-free for reads +rwlock.h - read/write locks (RWLock) +msg.h - message passing between threads +task.h - an abstraction around threads +mutex.h - small enhancements that wrap boost::mutex mvar.h This is based on haskell's MVar synchronization primitive: http://www.haskell.org/ghc/docs/latest/html/libraries/base-4.2.0.0/Control-Concurrent-MVar.html @@ -13,3 +12,8 @@ mvar.h You can also think of it as a box that can be either full or empty. value.h Atomic wrapper for values/objects that are copy constructable / assignable +thread_pool.h +spinlock.h +synchronization.h + A class to establish a sinchronization point between two threads. One thread is the waiter and one + is the notifier. After the notification event, both proceed normally. diff --git a/util/concurrency/list.h b/util/concurrency/list.h index 968ff4d..e5eaec6 100644 --- a/util/concurrency/list.h +++ b/util/concurrency/list.h @@ -18,64 +18,64 @@ #pragma once -namespace mongo { +namespace mongo { -/* this class uses a mutex for writes, but not for reads. - we can get fancier later... + /* this class uses a mutex for writes, but not for reads. + we can get fancier later... - struct Member : public List1<Member>::Base { - const char *host; - int port; - }; - List1<Member> _members; - _members.head()->next(); + struct Member : public List1<Member>::Base { + const char *host; + int port; + }; + List1<Member> _members; + _members.head()->next(); -*/ -template<typename T> -class List1 : boost::noncopyable { -public: - /* next() and head() return 0 at end of list */ + */ + template<typename T> + class List1 : boost::noncopyable { + public: + /* next() and head() return 0 at end of list */ - List1() : _head(0), _m("List1"), _orphans(0) { } + List1() : _head(0), _m("List1"), _orphans(0) { } - class Base { - friend class List1; - T *_next; - public: - T* next() const { return _next; } - }; + class Base { + friend class List1; + T *_next; + public: + T* next() const { return _next; } + }; - T* head() const { return _head; } + T* head() const { return _head; } - void push(T* t) { - scoped_lock lk(_m); - t->_next = _head; - _head = t; - } + void push(T* t) { + scoped_lock lk(_m); + t->_next = _head; + _head = t; + } - // intentionally leak. - void orphanAll() { - _head = 0; - } + // intentionally leak. + void orphanAll() { + _head = 0; + } - /* t is not deleted, but is removed from the list. (orphaned) */ - void orphan(T* t) { - scoped_lock lk(_m); - T *&prev = _head; - T *n = prev; - while( n != t ) { - prev = n->_next; - n = prev; + /* t is not deleted, but is removed from the list. (orphaned) */ + void orphan(T* t) { + scoped_lock lk(_m); + T *&prev = _head; + T *n = prev; + while( n != t ) { + prev = n->_next; + n = prev; + } + prev = t->_next; + if( ++_orphans > 500 ) + log() << "warning orphans=" << _orphans << '\n'; } - prev = t->_next; - if( ++_orphans > 500 ) - log() << "warning orphans=" << _orphans << '\n'; - } -private: - T *_head; - mutex _m; - int _orphans; -}; + private: + T *_head; + mongo::mutex _m; + int _orphans; + }; }; diff --git a/util/concurrency/msg.h b/util/concurrency/msg.h index a5b07d3..f7c6788 100644 --- a/util/concurrency/msg.h +++ b/util/concurrency/msg.h @@ -21,14 +21,14 @@ #include <deque> #include "task.h" -namespace mongo { +namespace mongo { - namespace task { + namespace task { typedef boost::function<void()> lam; /** typical usage is: task::fork( new Server("threadname") ); */ - class Server : public Task { + class Server : public Task { public: /** send a message to the port */ void send(lam); @@ -47,7 +47,7 @@ namespace mongo { private: virtual bool initClient() { return true; } - virtual string name() { return _name; } + virtual string name() const { return _name; } void doWork(); deque<lam> d; boost::mutex m; diff --git a/util/concurrency/mutex.h b/util/concurrency/mutex.h index 797ab77..c463498 100644 --- a/util/concurrency/mutex.h +++ b/util/concurrency/mutex.h @@ -20,13 +20,29 @@ #include <map> #include <set> -namespace mongo { +#include "../heapcheck.h" + +namespace mongo { - extern bool __destroyingStatics; class mutex; - // only used on _DEBUG builds: - class MutexDebugger { + inline boost::xtime incxtimemillis( long long s ) { + boost::xtime xt; + boost::xtime_get(&xt, boost::TIME_UTC); + xt.sec += (int)( s / 1000 ); + xt.nsec += (int)(( s % 1000 ) * 1000000); + if ( xt.nsec >= 1000000000 ) { + xt.nsec -= 1000000000; + xt.sec++; + } + return xt; + } + + /** only used on _DEBUG builds. + MutexDebugger checks that we always acquire locks for multiple mutexes in a consistant (acyclic) order. + If we were inconsistent we could deadlock. + */ + class MutexDebugger { typedef const char * mid; // mid = mutex ID typedef map<mid,int> Preceeding; map< mid, int > maxNest; @@ -34,36 +50,41 @@ namespace mongo { map< mid, set<mid> > followers; boost::mutex &x; unsigned magic; + + void aBreakPoint() { } // for debugging public: // set these to create an assert that // b must never be locked before a - // so + // so // a.lock(); b.lock(); is fine // b.lock(); alone is fine too // only checked on _DEBUG builds. string a,b; - - void aBreakPoint(){} + + /** outputs some diagnostic info on mutexes (on _DEBUG builds) */ void programEnding(); + MutexDebugger(); + void entering(mid m) { - if( magic != 0x12345678 ) return; + if( this == 0 ) return; + assert( magic == 0x12345678 ); Preceeding *_preceeding = us.get(); if( _preceeding == 0 ) us.reset( _preceeding = new Preceeding() ); Preceeding &preceeding = *_preceeding; - if( a == m ) { + if( a == m ) { aBreakPoint(); if( preceeding[b.c_str()] ) { - cout << "mutex problem " << b << " was locked before " << a << endl; + cout << "****** MutexDebugger error! warning " << b << " was locked before " << a << endl; assert(false); } } preceeding[m]++; - if( preceeding[m] > 1 ) { + if( preceeding[m] > 1 ) { // recursive re-locking. if( preceeding[m] > maxNest[m] ) maxNest[m] = preceeding[m]; @@ -75,19 +96,19 @@ namespace mongo { { boost::mutex::scoped_lock lk(x); followers[m]; - for( Preceeding::iterator i = preceeding.begin(); i != preceeding.end(); i++ ) { + for( Preceeding::iterator i = preceeding.begin(); i != preceeding.end(); i++ ) { if( m != i->first && i->second > 0 ) { followers[i->first].insert(m); - if( followers[m].count(i->first) != 0 ){ + if( followers[m].count(i->first) != 0 ) { failed = true; stringstream ss; mid bad = i->first; ss << "mutex problem" << - "\n when locking " << m << - "\n " << bad << " was already locked and should not be." - "\n set a and b above to debug.\n"; + "\n when locking " << m << + "\n " << bad << " was already locked and should not be." + "\n set a and b above to debug.\n"; stringstream q; - for( Preceeding::iterator i = preceeding.begin(); i != preceeding.end(); i++ ) { + for( Preceeding::iterator i = preceeding.begin(); i != preceeding.end(); i++ ) { if( i->first != m && i->first != bad && i->second > 0 ) q << " " << i->first << '\n'; } @@ -105,8 +126,8 @@ namespace mongo { assert( 0 ); } } - void leaving(mid m) { - if( magic != 0x12345678 ) return; + void leaving(mid m) { + if( this == 0 ) return; // still in startup pre-main() Preceeding& preceeding = *us.get(); preceeding[m]--; if( preceeding[m] < 0 ) { @@ -116,38 +137,67 @@ namespace mongo { } }; extern MutexDebugger &mutexDebugger; - + // If you create a local static instance of this class, that instance will be destroyed - // before all global static objects are destroyed, so __destroyingStatics will be set + // before all global static objects are destroyed, so _destroyingStatics will be set // to true before the global static variables are destroyed. class StaticObserver : boost::noncopyable { public: - ~StaticObserver() { __destroyingStatics = true; } + static bool _destroyingStatics; + ~StaticObserver() { _destroyingStatics = true; } }; - // On pthread systems, it is an error to destroy a mutex while held. Static global - // mutexes may be held upon shutdown in our implementation, and this way we avoid - // destroying them. + /** On pthread systems, it is an error to destroy a mutex while held. Static global + * mutexes may be held upon shutdown in our implementation, and this way we avoid + * destroying them. + * NOT recursive. + */ class mutex : boost::noncopyable { public: #if defined(_DEBUG) - const char *_name; + const char * const _name; #endif #if defined(_DEBUG) - mutex(const char *name) - : _name(name) + mutex(const char *name) + : _name(name) #else - mutex(const char *) + mutex(const char *) #endif - { - _m = new boost::mutex(); + { + _m = new boost::timed_mutex(); + IGNORE_OBJECT( _m ); // Turn-off heap checking on _m } ~mutex() { - if( !__destroyingStatics ) { + if( !StaticObserver::_destroyingStatics ) { + UNIGNORE_OBJECT( _m ); delete _m; } } + + class try_lock : boost::noncopyable { + public: + try_lock( mongo::mutex &m , int millis = 0 ) + : _l( m.boost() , incxtimemillis( millis ) ) , +#if BOOST_VERSION >= 103500 + ok( _l.owns_lock() ) +#else + ok( _l.locked() ) +#endif + { + } + + ~try_lock() { + } + + private: + boost::timed_mutex::scoped_timed_lock _l; + + public: + const bool ok; + }; + + class scoped_lock : boost::noncopyable { #if defined(_DEBUG) mongo::mutex *mut; @@ -159,20 +209,23 @@ namespace mongo { mutexDebugger.entering(mut->_name); #endif } - ~scoped_lock() { + ~scoped_lock() { #if defined(_DEBUG) mutexDebugger.leaving(mut->_name); #endif } - boost::mutex::scoped_lock &boost() { return _l; } + boost::timed_mutex::scoped_lock &boost() { return _l; } private: - boost::mutex::scoped_lock _l; + boost::timed_mutex::scoped_lock _l; }; + + private: - boost::mutex &boost() { return *_m; } - boost::mutex *_m; + + boost::timed_mutex &boost() { return *_m; } + boost::timed_mutex *_m; }; - + typedef mutex::scoped_lock scoped_lock; typedef boost::recursive_mutex::scoped_lock recursive_scoped_lock; diff --git a/util/concurrency/mvar.h b/util/concurrency/mvar.h index 7d17051..9c7a505 100644 --- a/util/concurrency/mvar.h +++ b/util/concurrency/mvar.h @@ -31,18 +31,18 @@ namespace mongo { // create an empty MVar MVar() - : _state(EMPTY) + : _state(EMPTY) {} // creates a full MVar MVar(const T& val) - : _state(FULL) - , _value(val) + : _state(FULL) + , _value(val) {} // puts val into the MVar and returns true or returns false if full // never blocks - bool tryPut(const T& val){ + bool tryPut(const T& val) { // intentionally repeat test before and after lock if (_state == FULL) return false; Mutex::scoped_lock lock(_mutex); @@ -59,17 +59,17 @@ namespace mongo { // puts val into the MVar // will block if the MVar is already full - void put(const T& val){ + void put(const T& val) { Mutex::scoped_lock lock(_mutex); - while (!tryPut(val)){ - // unlocks lock while waiting and relocks before returning + while (!tryPut(val)) { + // unlocks lock while waiting and relocks before returning _condition.wait(lock); - } + } } // takes val out of the MVar and returns true or returns false if empty // never blocks - bool tryTake(T& out){ + bool tryTake(T& out) { // intentionally repeat test before and after lock if (_state == EMPTY) return false; Mutex::scoped_lock lock(_mutex); @@ -86,14 +86,14 @@ namespace mongo { // takes val out of the MVar // will block if the MVar is empty - T take(){ + T take() { T ret = T(); Mutex::scoped_lock lock(_mutex); - while (!tryTake(ret)){ - // unlocks lock while waiting and relocks before returning + while (!tryTake(ret)) { + // unlocks lock while waiting and relocks before returning _condition.wait(lock); - } + } return ret; } @@ -102,7 +102,7 @@ namespace mongo { // Note: this is fast because there is no locking, but state could // change before you get a chance to act on it. // Mainly useful for sanity checks / asserts. - State getState(){ return _state; } + State getState() { return _state; } private: diff --git a/util/concurrency/race.h b/util/concurrency/race.h new file mode 100644 index 0000000..0b8338c --- /dev/null +++ b/util/concurrency/race.h @@ -0,0 +1,72 @@ +#pragma once + +#include "../goodies.h" // printStackTrace + +namespace mongo { + + /** some self-testing of synchronization and attempts to catch race conditions. + + use something like: + + CodeBlock myBlock; + + void foo() { + CodeBlock::Within w(myBlock); + ... + } + + In _DEBUG builds, will (sometimes/maybe) fail if two threads are in the same code block at + the same time. Also detects and disallows recursion. + */ + +#if defined(_DEBUG) + + class CodeBlock { + volatile int n; + unsigned tid; + void fail() { + log() << "synchronization (race condition) failure" << endl; + printStackTrace(); + abort(); + } + void enter() { + if( ++n != 1 ) fail(); +#if defined(_WIN32) + tid = GetCurrentThreadId(); +#endif + } + void leave() { + if( --n != 0 ) fail(); + } + public: + CodeBlock() : n(0) { } + + class Within { + CodeBlock& _s; + public: + Within(CodeBlock& s) : _s(s) { _s.enter(); } + ~Within() { _s.leave(); } + }; + + void assertWithin() { + assert( n == 1 ); +#if defined(_WIN32) + assert( GetCurrentThreadId() == tid ); +#endif + } + }; + +#else + + class CodeBlock{ + public: + class Within { + public: + Within(CodeBlock&) { } + }; + void assertWithin() { } + }; + +#endif + +} diff --git a/util/concurrency/rwlock.h b/util/concurrency/rwlock.h index 75169b2..ca81a9f 100644 --- a/util/concurrency/rwlock.h +++ b/util/concurrency/rwlock.h @@ -1,4 +1,4 @@ -// rwlock.h +// @file rwlock.h generic reader-writer lock (cross platform support) /* * Copyright (C) 2010 10gen Inc. @@ -19,30 +19,79 @@ #pragma once #include "mutex.h" +#include "../time_support.h" + +// this requires Vista+ to work +// it works better than sharable_mutex under high contention +//#define MONGO_USE_SRW_ON_WINDOWS 1 + +#if !defined(MONGO_USE_SRW_ON_WINDOWS) #if BOOST_VERSION >= 103500 - #define BOOST_RWLOCK +# define BOOST_RWLOCK #else +# if defined(_WIN32) +# error need boost >= 1.35 for windows +# endif +# include <pthread.h> +#endif - #if defined(_WIN32) - #error need boost >= 1.35 for windows - #endif - - #include <pthread.h> - +#if defined(_WIN32) +# include "shared_mutex_win.hpp" +namespace mongo { + typedef boost::modified_shared_mutex shared_mutex; +} +# undef assert +# define assert MONGO_assert +#elif defined(BOOST_RWLOCK) +# include <boost/thread/shared_mutex.hpp> +# undef assert +# define assert MONGO_assert #endif -#ifdef BOOST_RWLOCK -#include <boost/thread/shared_mutex.hpp> -#undef assert -#define assert MONGO_assert #endif namespace mongo { -#ifdef BOOST_RWLOCK +#if defined(MONGO_USE_SRW_ON_WINDOWS) && defined(_WIN32) + class RWLock { - boost::shared_mutex _m; + public: + RWLock(const char *) { InitializeSRWLock(&_lock); } + ~RWLock() { } + void lock() { AcquireSRWLockExclusive(&_lock); } + void unlock() { ReleaseSRWLockExclusive(&_lock); } + void lock_shared() { AcquireSRWLockShared(&_lock); } + void unlock_shared() { ReleaseSRWLockShared(&_lock); } + bool lock_shared_try( int millis ) { + unsigned long long end = curTimeMicros64() + millis*1000; + while( 1 ) { + if( TryAcquireSRWLockShared(&_lock) ) + return true; + if( curTimeMicros64() >= end ) + break; + Sleep(1); + } + return false; + } + bool lock_try( int millis = 0 ) { + unsigned long long end = curTimeMicros64() + millis*1000; + while( 1 ) { + if( TryAcquireSRWLockExclusive(&_lock) ) + return true; + if( curTimeMicros64() >= end ) + break; + Sleep(1); + } + return false; + } + private: + SRWLOCK _lock; + }; + +#elif defined(BOOST_RWLOCK) + class RWLock { + shared_mutex _m; public: #if defined(_DEBUG) const char *_name; @@ -50,40 +99,40 @@ namespace mongo { #else RWLock(const char *) { } #endif - void lock(){ + void lock() { _m.lock(); #if defined(_DEBUG) mutexDebugger.entering(_name); #endif } - void unlock(){ + void unlock() { #if defined(_DEBUG) mutexDebugger.leaving(_name); #endif _m.unlock(); } - - void lock_shared(){ + + void lock_shared() { _m.lock_shared(); } - - void unlock_shared(){ + + void unlock_shared() { _m.unlock_shared(); } - bool lock_shared_try( int millis ){ + bool lock_shared_try( int millis ) { boost::system_time until = get_system_time(); until += boost::posix_time::milliseconds(millis); - if( _m.timed_lock_shared( until ) ) { + if( _m.timed_lock_shared( until ) ) { return true; } return false; } - bool lock_try( int millis = 0 ){ + bool lock_try( int millis = 0 ) { boost::system_time until = get_system_time(); until += boost::posix_time::milliseconds(millis); - if( _m.timed_lock( until ) ) { + if( _m.timed_lock( until ) ) { #if defined(_DEBUG) mutexDebugger.entering(_name); #endif @@ -98,7 +147,7 @@ namespace mongo { class RWLock { pthread_rwlock_t _lock; - inline void check( int x ){ + inline void check( int x ) { if( x == 0 ) return; log() << "pthread rwlock failed: " << x << endl; @@ -114,40 +163,40 @@ namespace mongo { #endif check( pthread_rwlock_init( &_lock , 0 ) ); } - - ~RWLock(){ - if ( ! __destroyingStatics ){ + + ~RWLock() { + if ( ! StaticObserver::_destroyingStatics ) { check( pthread_rwlock_destroy( &_lock ) ); } } - void lock(){ + void lock() { check( pthread_rwlock_wrlock( &_lock ) ); #if defined(_DEBUG) mutexDebugger.entering(_name); #endif } - void unlock(){ + void unlock() { #if defined(_DEBUG) mutexDebugger.leaving(_name); #endif check( pthread_rwlock_unlock( &_lock ) ); } - - void lock_shared(){ + + void lock_shared() { check( pthread_rwlock_rdlock( &_lock ) ); } - - void unlock_shared(){ + + void unlock_shared() { check( pthread_rwlock_unlock( &_lock ) ); } - - bool lock_shared_try( int millis ){ + + bool lock_shared_try( int millis ) { return _try( millis , false ); } - bool lock_try( int millis = 0 ){ - if( _try( millis , true ) ) { + bool lock_try( int millis = 0 ) { + if( _try( millis , true ) ) { #if defined(_DEBUG) mutexDebugger.entering(_name); #endif @@ -156,65 +205,66 @@ namespace mongo { return false; } - bool _try( int millis , bool write ){ + bool _try( int millis , bool write ) { while ( true ) { - int x = write ? - pthread_rwlock_trywrlock( &_lock ) : - pthread_rwlock_tryrdlock( &_lock ); - + int x = write ? + pthread_rwlock_trywrlock( &_lock ) : + pthread_rwlock_tryrdlock( &_lock ); + if ( x <= 0 ) { return true; } - + if ( millis-- <= 0 ) return false; - - if ( x == EBUSY ){ + + if ( x == EBUSY ) { sleepmillis(1); continue; } check(x); - } - + } + return false; } }; - #endif + /** throws on failure to acquire in the specified time period. */ class rwlock_try_write { - RWLock& _l; public: struct exception { }; rwlock_try_write(RWLock& l, int millis = 0) : _l(l) { - if( !l.lock_try(millis) ) throw exception(); + if( !l.lock_try(millis) ) + throw exception(); } ~rwlock_try_write() { _l.unlock(); } + private: + RWLock& _l; }; - /* scoped lock */ - struct rwlock { + /* scoped lock for RWLock */ + class rwlock { + public: rwlock( const RWLock& lock , bool write , bool alreadyHaveLock = false ) - : _lock( (RWLock&)lock ) , _write( write ){ - - if ( ! alreadyHaveLock ){ + : _lock( (RWLock&)lock ) , _write( write ) { + if ( ! alreadyHaveLock ) { if ( _write ) _lock.lock(); else _lock.lock_shared(); } } - - ~rwlock(){ + ~rwlock() { if ( _write ) _lock.unlock(); else _lock.unlock_shared(); } - + private: RWLock& _lock; - bool _write; + const bool _write; }; } diff --git a/util/concurrency/shared_mutex_win.hpp b/util/concurrency/shared_mutex_win.hpp new file mode 100755 index 0000000..5356cf2 --- /dev/null +++ b/util/concurrency/shared_mutex_win.hpp @@ -0,0 +1,573 @@ +#ifndef BOOST_THREAD_WIN32_SHARED_MUTEX_HPP_MODIFIED
+#define BOOST_THREAD_WIN32_SHARED_MUTEX_HPP_MODIFIED
+
+// (C) Copyright 2006-8 Anthony Williams
+//
+// Distributed under the Boost Software License, Version 1.0. (See
+// accompanying file LICENSE_1_0.txt or copy at
+// http://www.boost.org/LICENSE_1_0.txt)
+
+// MongoDB :
+//
+// Slightly modified boost file to not die above 127 pending writes
+//
+
+#include <boost/assert.hpp>
+#include <boost/detail/interlocked.hpp>
+#include <boost/thread/win32/thread_primitives.hpp>
+#include <boost/static_assert.hpp>
+#include <limits.h>
+#include <boost/utility.hpp>
+#include <boost/thread/thread_time.hpp>
+
+#include <boost/config/abi_prefix.hpp>
+
+namespace boost
+{
+ class modified_shared_mutex:
+ private boost::noncopyable
+ {
+ private:
+ struct state_data
+ {
+ unsigned shared_count:11,
+ shared_waiting:11,
+ exclusive:1,
+ upgrade:1,
+ exclusive_waiting:7,
+ exclusive_waiting_blocked:1;
+
+ friend bool operator==(state_data const& lhs,state_data const& rhs)
+ {
+ return *reinterpret_cast<unsigned const*>(&lhs)==*reinterpret_cast<unsigned const*>(&rhs);
+ }
+ };
+
+
+ template<typename T>
+ T interlocked_compare_exchange(T* target,T new_value,T comparand)
+ {
+ BOOST_STATIC_ASSERT(sizeof(T)==sizeof(long));
+ long const res=BOOST_INTERLOCKED_COMPARE_EXCHANGE(reinterpret_cast<long*>(target),
+ *reinterpret_cast<long*>(&new_value),
+ *reinterpret_cast<long*>(&comparand));
+ return *reinterpret_cast<T const*>(&res);
+ }
+
+ state_data state;
+ detail::win32::handle semaphores[2];
+ detail::win32::handle &unlock_sem;
+ detail::win32::handle &exclusive_sem;
+ detail::win32::handle upgrade_sem;
+
+ void release_waiters(state_data old_state)
+ {
+ if(old_state.exclusive_waiting)
+ {
+ BOOST_VERIFY(detail::win32::ReleaseSemaphore(exclusive_sem,1,0)!=0);
+ }
+
+ if(old_state.shared_waiting || old_state.exclusive_waiting)
+ {
+ BOOST_VERIFY(detail::win32::ReleaseSemaphore(unlock_sem,old_state.shared_waiting + (old_state.exclusive_waiting?1:0),0)!=0);
+ }
+ }
+
+
+ public:
+ modified_shared_mutex():
+ unlock_sem(semaphores[0]),
+ exclusive_sem(semaphores[1])
+ {
+ unlock_sem=detail::win32::create_anonymous_semaphore(0,LONG_MAX);
+ exclusive_sem=detail::win32::create_anonymous_semaphore(0,LONG_MAX);
+ upgrade_sem=detail::win32::create_anonymous_semaphore(0,LONG_MAX);
+ state_data state_={0};
+ state=state_;
+ }
+
+ ~modified_shared_mutex()
+ {
+ detail::win32::CloseHandle(upgrade_sem);
+ detail::win32::CloseHandle(unlock_sem);
+ detail::win32::CloseHandle(exclusive_sem);
+ }
+
+ bool try_lock_shared()
+ {
+ state_data old_state=state;
+ for(;;)
+ {
+ state_data new_state=old_state;
+ if(!new_state.exclusive && !new_state.exclusive_waiting_blocked)
+ {
+ ++new_state.shared_count;
+ }
+
+ state_data const current_state=interlocked_compare_exchange(&state,new_state,old_state);
+ if(current_state==old_state)
+ {
+ break;
+ }
+ old_state=current_state;
+ }
+ return !(old_state.exclusive| old_state.exclusive_waiting_blocked);
+ }
+
+ void lock_shared()
+ {
+ BOOST_VERIFY(timed_lock_shared(::boost::detail::get_system_time_sentinel()));
+ }
+
+ template<typename TimeDuration>
+ bool timed_lock_shared(TimeDuration const & relative_time)
+ {
+ return timed_lock_shared(get_system_time()+relative_time);
+ }
+
+ bool timed_lock_shared(boost::system_time const& wait_until)
+ {
+ for(;;)
+ {
+ state_data old_state=state;
+ for(;;)
+ {
+ state_data new_state=old_state;
+ if(new_state.exclusive || new_state.exclusive_waiting_blocked)
+ {
+ ++new_state.shared_waiting;
+ }
+ else
+ {
+ ++new_state.shared_count;
+ }
+
+ state_data const current_state=interlocked_compare_exchange(&state,new_state,old_state);
+ if(current_state==old_state)
+ {
+ break;
+ }
+ old_state=current_state;
+ }
+
+ if(!(old_state.exclusive| old_state.exclusive_waiting_blocked))
+ {
+ return true;
+ }
+
+ unsigned long const res=detail::win32::WaitForSingleObject(unlock_sem,::boost::detail::get_milliseconds_until(wait_until));
+ if(res==detail::win32::timeout)
+ {
+ for(;;)
+ {
+ state_data new_state=old_state;
+ if(new_state.exclusive || new_state.exclusive_waiting_blocked)
+ {
+ if(new_state.shared_waiting)
+ {
+ --new_state.shared_waiting;
+ }
+ }
+ else
+ {
+ ++new_state.shared_count;
+ }
+
+ state_data const current_state=interlocked_compare_exchange(&state,new_state,old_state);
+ if(current_state==old_state)
+ {
+ break;
+ }
+ old_state=current_state;
+ }
+
+ if(!(old_state.exclusive| old_state.exclusive_waiting_blocked))
+ {
+ return true;
+ }
+ return false;
+ }
+
+ BOOST_ASSERT(res==0);
+ }
+ }
+
+ void unlock_shared()
+ {
+ state_data old_state=state;
+ for(;;)
+ {
+ state_data new_state=old_state;
+ bool const last_reader=!--new_state.shared_count;
+
+ if(last_reader)
+ {
+ if(new_state.upgrade)
+ {
+ new_state.upgrade=false;
+ new_state.exclusive=true;
+ }
+ else
+ {
+ if(new_state.exclusive_waiting)
+ {
+ --new_state.exclusive_waiting;
+ new_state.exclusive_waiting_blocked=false;
+ }
+ new_state.shared_waiting=0;
+ }
+ }
+
+ state_data const current_state=interlocked_compare_exchange(&state,new_state,old_state);
+ if(current_state==old_state)
+ {
+ if(last_reader)
+ {
+ if(old_state.upgrade)
+ {
+ BOOST_VERIFY(detail::win32::ReleaseSemaphore(upgrade_sem,1,0)!=0);
+ }
+ else
+ {
+ release_waiters(old_state);
+ }
+ }
+ break;
+ }
+ old_state=current_state;
+ }
+ }
+
+ void lock()
+ {
+ BOOST_VERIFY(timed_lock(::boost::detail::get_system_time_sentinel()));
+ }
+
+ template<typename TimeDuration>
+ bool timed_lock(TimeDuration const & relative_time)
+ {
+ return timed_lock(get_system_time()+relative_time);
+ }
+
+ bool try_lock()
+ {
+ state_data old_state=state;
+ for(;;)
+ {
+ state_data new_state=old_state;
+ if(new_state.shared_count || new_state.exclusive)
+ {
+ return false;
+ }
+ else
+ {
+ new_state.exclusive=true;
+ }
+
+ state_data const current_state=interlocked_compare_exchange(&state,new_state,old_state);
+ if(current_state==old_state)
+ {
+ break;
+ }
+ old_state=current_state;
+ }
+ return true;
+ }
+
+
+ bool timed_lock(boost::system_time const& wait_until)
+ {
+ for(;;)
+ {
+ state_data old_state=state;
+
+ for(;;)
+ {
+ state_data new_state=old_state;
+ if(new_state.shared_count || new_state.exclusive)
+ {
+ if( new_state.exclusive_waiting == 127 ) // the maximum already!
+ break;
+ ++new_state.exclusive_waiting;
+ new_state.exclusive_waiting_blocked=true;
+ }
+ else
+ {
+ new_state.exclusive=true;
+ }
+
+ state_data const current_state=interlocked_compare_exchange(&state,new_state,old_state);
+ if(current_state==old_state)
+ {
+ break;
+ }
+ old_state=current_state;
+ }
+
+ if(!old_state.shared_count && !old_state.exclusive)
+ {
+ return true;
+ }
+ unsigned long const wait_res=detail::win32::WaitForMultipleObjects(2,semaphores,true,::boost::detail::get_milliseconds_until(wait_until));
+ if(wait_res==detail::win32::timeout)
+ {
+ for(;;)
+ {
+ state_data new_state=old_state;
+ if(new_state.shared_count || new_state.exclusive)
+ {
+ if(new_state.exclusive_waiting)
+ {
+ if(!--new_state.exclusive_waiting)
+ {
+ new_state.exclusive_waiting_blocked=false;
+ }
+ }
+ }
+ else
+ {
+ new_state.exclusive=true;
+ }
+
+ state_data const current_state=interlocked_compare_exchange(&state,new_state,old_state);
+ if(current_state==old_state)
+ {
+ break;
+ }
+ old_state=current_state;
+ }
+ if(!old_state.shared_count && !old_state.exclusive)
+ {
+ return true;
+ }
+ return false;
+ }
+ BOOST_ASSERT(wait_res<2);
+ }
+ }
+
+ void unlock()
+ {
+ state_data old_state=state;
+ for(;;)
+ {
+ state_data new_state=old_state;
+ new_state.exclusive=false;
+ if(new_state.exclusive_waiting)
+ {
+ --new_state.exclusive_waiting;
+ new_state.exclusive_waiting_blocked=false;
+ }
+ new_state.shared_waiting=0;
+
+ state_data const current_state=interlocked_compare_exchange(&state,new_state,old_state);
+ if(current_state==old_state)
+ {
+ break;
+ }
+ old_state=current_state;
+ }
+ release_waiters(old_state);
+ }
+
+ void lock_upgrade()
+ {
+ for(;;)
+ {
+ state_data old_state=state;
+ for(;;)
+ {
+ state_data new_state=old_state;
+ if(new_state.exclusive || new_state.exclusive_waiting_blocked || new_state.upgrade)
+ {
+ ++new_state.shared_waiting;
+ }
+ else
+ {
+ ++new_state.shared_count;
+ new_state.upgrade=true;
+ }
+
+ state_data const current_state=interlocked_compare_exchange(&state,new_state,old_state);
+ if(current_state==old_state)
+ {
+ break;
+ }
+ old_state=current_state;
+ }
+
+ if(!(old_state.exclusive|| old_state.exclusive_waiting_blocked|| old_state.upgrade))
+ {
+ return;
+ }
+
+ BOOST_VERIFY(!detail::win32::WaitForSingleObject(unlock_sem,detail::win32::infinite));
+ }
+ }
+
+ bool try_lock_upgrade()
+ {
+ state_data old_state=state;
+ for(;;)
+ {
+ state_data new_state=old_state;
+ if(new_state.exclusive || new_state.exclusive_waiting_blocked || new_state.upgrade)
+ {
+ return false;
+ }
+ else
+ {
+ ++new_state.shared_count;
+ new_state.upgrade=true;
+ }
+
+ state_data const current_state=interlocked_compare_exchange(&state,new_state,old_state);
+ if(current_state==old_state)
+ {
+ break;
+ }
+ old_state=current_state;
+ }
+ return true;
+ }
+
+ void unlock_upgrade()
+ {
+ state_data old_state=state;
+ for(;;)
+ {
+ state_data new_state=old_state;
+ new_state.upgrade=false;
+ bool const last_reader=!--new_state.shared_count;
+
+ if(last_reader)
+ {
+ if(new_state.exclusive_waiting)
+ {
+ --new_state.exclusive_waiting;
+ new_state.exclusive_waiting_blocked=false;
+ }
+ new_state.shared_waiting=0;
+ }
+
+ state_data const current_state=interlocked_compare_exchange(&state,new_state,old_state);
+ if(current_state==old_state)
+ {
+ if(last_reader)
+ {
+ release_waiters(old_state);
+ }
+ break;
+ }
+ old_state=current_state;
+ }
+ }
+
+ void unlock_upgrade_and_lock()
+ {
+ state_data old_state=state;
+ for(;;)
+ {
+ state_data new_state=old_state;
+ bool const last_reader=!--new_state.shared_count;
+
+ if(last_reader)
+ {
+ new_state.upgrade=false;
+ new_state.exclusive=true;
+ }
+
+ state_data const current_state=interlocked_compare_exchange(&state,new_state,old_state);
+ if(current_state==old_state)
+ {
+ if(!last_reader)
+ {
+ BOOST_VERIFY(!detail::win32::WaitForSingleObject(upgrade_sem,detail::win32::infinite));
+ }
+ break;
+ }
+ old_state=current_state;
+ }
+ }
+
+ void unlock_and_lock_upgrade()
+ {
+ state_data old_state=state;
+ for(;;)
+ {
+ state_data new_state=old_state;
+ new_state.exclusive=false;
+ new_state.upgrade=true;
+ ++new_state.shared_count;
+ if(new_state.exclusive_waiting)
+ {
+ --new_state.exclusive_waiting;
+ new_state.exclusive_waiting_blocked=false;
+ }
+ new_state.shared_waiting=0;
+
+ state_data const current_state=interlocked_compare_exchange(&state,new_state,old_state);
+ if(current_state==old_state)
+ {
+ break;
+ }
+ old_state=current_state;
+ }
+ release_waiters(old_state);
+ }
+
+ void unlock_and_lock_shared()
+ {
+ state_data old_state=state;
+ for(;;)
+ {
+ state_data new_state=old_state;
+ new_state.exclusive=false;
+ ++new_state.shared_count;
+ if(new_state.exclusive_waiting)
+ {
+ --new_state.exclusive_waiting;
+ new_state.exclusive_waiting_blocked=false;
+ }
+ new_state.shared_waiting=0;
+
+ state_data const current_state=interlocked_compare_exchange(&state,new_state,old_state);
+ if(current_state==old_state)
+ {
+ break;
+ }
+ old_state=current_state;
+ }
+ release_waiters(old_state);
+ }
+
+ void unlock_upgrade_and_lock_shared()
+ {
+ state_data old_state=state;
+ for(;;)
+ {
+ state_data new_state=old_state;
+ new_state.upgrade=false;
+ if(new_state.exclusive_waiting)
+ {
+ --new_state.exclusive_waiting;
+ new_state.exclusive_waiting_blocked=false;
+ }
+ new_state.shared_waiting=0;
+
+ state_data const current_state=interlocked_compare_exchange(&state,new_state,old_state);
+ if(current_state==old_state)
+ {
+ break;
+ }
+ old_state=current_state;
+ }
+ release_waiters(old_state);
+ }
+
+ };
+}
+
+#include <boost/config/abi_suffix.hpp>
+
+#endif
diff --git a/util/concurrency/spin_lock.cpp b/util/concurrency/spin_lock.cpp index b3e689a..0f33609 100644 --- a/util/concurrency/spin_lock.cpp +++ b/util/concurrency/spin_lock.cpp @@ -16,18 +16,29 @@ * along with this program. If not, see <http://www.gnu.org/licenses/>. */ +#include "pch.h" #include <time.h> #include "spin_lock.h" namespace mongo { - SpinLock::SpinLock() : _locked( false ){} - - SpinLock::~SpinLock(){} + SpinLock::~SpinLock() { +#if defined(_WIN32) + DeleteCriticalSection(&_cs); +#endif + } - void SpinLock::lock(){ + SpinLock::SpinLock() #if defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_4) + : _locked( false ) { } +#elif defined(_WIN32) + { InitializeCriticalSectionAndSpinCount(&_cs, 4000); } +#else + : _mutex( "SpinLock" ) { } +#endif + void SpinLock::lock() { +#if defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_4) // fast path if (!_locked && !__sync_lock_test_and_set(&_locked, true)) { return; @@ -44,21 +55,28 @@ namespace mongo { while (__sync_lock_test_and_set(&_locked, true)) { nanosleep(&t, NULL); } +#elif defined(_WIN32) + EnterCriticalSection(&_cs); #else - - // WARNING "TODO Missing spin lock in this platform." + // WARNING Missing spin lock in this platform. This can potentially + // be slow. + _mutex.lock(); #endif } - void SpinLock::unlock(){ + void SpinLock::unlock() { #if defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_4) __sync_lock_release(&_locked); +#elif defined(WIN32) + + LeaveCriticalSection(&_cs); + #else - // WARNING "TODO Missing spin lock in this platform." + _mutex.unlock(); #endif } diff --git a/util/concurrency/spin_lock.h b/util/concurrency/spin_lock.h index 110290d..d5360f7 100644 --- a/util/concurrency/spin_lock.h +++ b/util/concurrency/spin_lock.h @@ -16,18 +16,18 @@ * along with this program. If not, see <http://www.gnu.org/licenses/>. */ -#ifndef CONCURRENCY_SPINLOCK_HEADER -#define CONCURRENCY_SPINLOCK_HEADER +#pragma once + +#include "pch.h" +#include "rwlock.h" namespace mongo { /** - * BIG WARNING - COMPILES, BUT NOT READY FOR USE - BIG WARNING - * - * The spinlock currently requires late GCC support - * routines. Support for other platforms will be added soon. + * The spinlock currently requires late GCC support routines to be efficient. + * Other platforms default to a mutex implemenation. */ - class SpinLock{ + class SpinLock { public: SpinLock(); ~SpinLock(); @@ -36,13 +36,19 @@ namespace mongo { void unlock(); private: - bool _locked; +#if defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_4) + volatile bool _locked; +#elif defined(_WIN32) + CRITICAL_SECTION _cs; +#else + // default to a scoped mutex if not implemented + RWLock _mutex; +#endif // Non-copyable, non-assignable SpinLock(SpinLock&); SpinLock& operator=(SpinLock&); - }; + }; } // namespace mongo -#endif // CONCURRENCY_SPINLOCK_HEADER diff --git a/util/concurrency/synchronization.cpp b/util/concurrency/synchronization.cpp new file mode 100644 index 0000000..12e2894 --- /dev/null +++ b/util/concurrency/synchronization.cpp @@ -0,0 +1,56 @@ +// synchronization.cpp + +/* Copyright 2010 10gen Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "pch.h" +#include "synchronization.h" + +namespace mongo { + + Notification::Notification() : _mutex ( "Notification" ) , _notified( false ) { } + + Notification::~Notification() { } + + void Notification::waitToBeNotified() { + scoped_lock lock( _mutex ); + while ( ! _notified ) + _condition.wait( lock.boost() ); + } + + void Notification::notifyOne() { + scoped_lock lock( _mutex ); + assert( !_notified ); + _notified = true; + _condition.notify_one(); + } + + NotifyAll::NotifyAll() : _mutex("NotifyAll"), _counter(0) { } + + void NotifyAll::wait() { + scoped_lock lock( _mutex ); + unsigned long long old = _counter; + while( old == _counter ) { + _condition.wait( lock.boost() ); + } + } + + void NotifyAll::notifyAll() { + scoped_lock lock( _mutex ); + ++_counter; + _condition.notify_all(); + } + +} // namespace mongo diff --git a/util/concurrency/synchronization.h b/util/concurrency/synchronization.h new file mode 100644 index 0000000..ac2fcab --- /dev/null +++ b/util/concurrency/synchronization.h @@ -0,0 +1,73 @@ +// synchronization.h + +/* Copyright 2010 10gen Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <boost/thread/condition.hpp> +#include "mutex.h" + +namespace mongo { + + /* + * A class to establish a synchronization point between two threads. One thread is the waiter and one is + * the notifier. After the notification event, both proceed normally. + * + * This class is thread-safe. + */ + class Notification { + public: + Notification(); + ~Notification(); + + /* + * Blocks until the method 'notifyOne()' is called. + */ + void waitToBeNotified(); + + /* + * Notifies the waiter of '*this' that it can proceed. Can only be called once. + */ + void notifyOne(); + + private: + mongo::mutex _mutex; // protects state below + bool _notified; // was notifyOne() issued? + boost::condition _condition; // cond over _notified being true + }; + + /** establishes a synchronization point between threads. N threads are waits and one is notifier. + threadsafe. + */ + class NotifyAll : boost::noncopyable { + public: + NotifyAll(); + + /** awaits the next notifyAll() call by another thread. notifications that precede this + call are ignored -- we are looking for a fresh event. + */ + void wait(); + + /** may be called multiple times. notifies all waiters */ + void notifyAll(); + + private: + mongo::mutex _mutex; + unsigned long long _counter; + boost::condition _condition; + }; + +} // namespace mongo diff --git a/util/concurrency/task.cpp b/util/concurrency/task.cpp index 99288bb..d84cd71 100644 --- a/util/concurrency/task.cpp +++ b/util/concurrency/task.cpp @@ -17,16 +17,19 @@ */ #include "pch.h" + +#include <boost/thread/condition.hpp> + #include "task.h" #include "../goodies.h" #include "../unittest.h" -#include "boost/thread/condition.hpp" +#include "../time_support.h" -namespace mongo { +namespace mongo { - namespace task { + namespace task { - /*void foo() { + /*void foo() { boost::mutex m; boost::mutex::scoped_lock lk(m); boost::condition cond; @@ -34,21 +37,21 @@ namespace mongo { cond.notify_one(); }*/ - Task::Task() { + Task::Task() + : BackgroundJob( true /* deleteSelf */ ) { n = 0; repeat = 0; - deleteSelf = true; } void Task::halt() { repeat = 0; } - void Task::run() { + void Task::run() { assert( n == 0 ); while( 1 ) { n++; - try { + try { doWork(); - } + } catch(...) { } if( repeat == 0 ) break; @@ -62,11 +65,11 @@ namespace mongo { go(); } - void fork(Task *t) { + void fork(Task *t) { t->begin(); } - void repeat(Task *t, unsigned millis) { + void repeat(Task *t, unsigned millis) { t->repeat = millis; t->begin(); } @@ -107,7 +110,7 @@ namespace mongo { } } - void Server::send( lam msg ) { + void Server::send( lam msg ) { { boost::mutex::scoped_lock lk(m); d.push_back(msg); @@ -115,9 +118,9 @@ namespace mongo { c.notify_one(); } - void Server::doWork() { + void Server::doWork() { starting(); - while( 1 ) { + while( 1 ) { lam f; try { boost::mutex::scoped_lock lk(m); @@ -126,7 +129,7 @@ namespace mongo { f = d.front(); d.pop_front(); } - catch(...) { + catch(...) { log() << "ERROR exception in Server:doWork?" << endl; } try { @@ -138,27 +141,28 @@ namespace mongo { d.push_back(f); } } - } catch(std::exception& e) { - log() << "Server::doWork task:" << name() << " exception:" << e.what() << endl; - } - catch(const char *p) { - log() << "Server::doWork task:" << name() << " unknown c exception:" << - ((p&&strlen(p)<800)?p:"?") << endl; - } - catch(...) { - log() << "Server::doWork unknown exception task:" << name() << endl; + } + catch(std::exception& e) { + log() << "Server::doWork task:" << name() << " exception:" << e.what() << endl; + } + catch(const char *p) { + log() << "Server::doWork task:" << name() << " unknown c exception:" << + ((p&&strlen(p)<800)?p:"?") << endl; + } + catch(...) { + log() << "Server::doWork unknown exception task:" << name() << endl; } } } static Server *s; - static void abc(int i) { + static void abc(int i) { cout << "Hello " << i << endl; s->requeue(); } class TaskUnitTest : public mongo::UnitTest { public: - virtual void run() { + virtual void run() { lam f = boost::bind(abc, 3); //f(); diff --git a/util/concurrency/task.h b/util/concurrency/task.h index b3a2ece..d7b45ee 100644 --- a/util/concurrency/task.h +++ b/util/concurrency/task.h @@ -20,9 +20,9 @@ #include "../background.h" -namespace mongo { +namespace mongo { - namespace task { + namespace task { /** abstraction around threads. simpler than BackgroundJob which is used behind the scenes. allocate the Task dynamically. when the thread terminates, the Task object will delete itself. @@ -30,11 +30,11 @@ namespace mongo { class Task : private BackgroundJob { protected: virtual void doWork() = 0; // implement the task here. - virtual string name() = 0; // name the threada + virtual string name() const = 0; // name the threada public: Task(); - /** for a repeating task, stop after current invocation ends. can be called by other threads + /** for a repeating task, stop after current invocation ends. can be called by other threads as long as the Task is still in scope. */ void halt(); @@ -43,7 +43,7 @@ namespace mongo { friend void fork(Task* t); friend void repeat(Task* t, unsigned millis); virtual void run(); - virtual void ending() { } + //virtual void ending() { } void begin(); }; @@ -54,8 +54,8 @@ namespace mongo { void repeat(Task *t, unsigned millis); /*** Example *** - inline void sample() { - class Sample : public Task { + inline void sample() { + class Sample : public Task { public: int result; virtual void doWork() { result = 1234; } diff --git a/util/concurrency/thread_pool.cpp b/util/concurrency/thread_pool.cpp index 2caac1f..1c25884 100644 --- a/util/concurrency/thread_pool.cpp +++ b/util/concurrency/thread_pool.cpp @@ -20,8 +20,8 @@ #include "thread_pool.h" #include "mvar.h" -namespace mongo{ - namespace threadpool{ +namespace mongo { + namespace threadpool { // Worker thread class Worker : boost::noncopyable { @@ -34,12 +34,12 @@ namespace mongo{ // destructor will block until current operation is completed // Acts as a "join" on this thread - ~Worker(){ + ~Worker() { _task.put(Task()); _thread.join(); } - void set_task(Task& func){ + void set_task(Task& func) { assert(!func.empty()); assert(_is_done); _is_done = false; @@ -47,13 +47,13 @@ namespace mongo{ _task.put(func); } - private: + private: ThreadPool& _owner; MVar<Task> _task; bool _is_done; // only used for error detection boost::thread _thread; - void loop(){ + void loop() { while (true) { Task task = _task.take(); if (task.empty()) @@ -61,9 +61,11 @@ namespace mongo{ try { task(); - } catch (std::exception e){ + } + catch (std::exception e) { log() << "Unhandled exception in worker thread: " << e.what() << endl;; - } catch (...){ + } + catch (...) { log() << "Unhandled non-exception in worker thread" << endl; } _is_done = true; @@ -74,16 +76,15 @@ namespace mongo{ ThreadPool::ThreadPool(int nThreads) : _mutex("ThreadPool"), _tasksRemaining(0) - , _nThreads(nThreads) - { + , _nThreads(nThreads) { scoped_lock lock(_mutex); - while (nThreads-- > 0){ + while (nThreads-- > 0) { Worker* worker = new Worker(*this); _freeWorkers.push_front(worker); } } - ThreadPool::~ThreadPool(){ + ThreadPool::~ThreadPool() { join(); assert(_tasks.empty()); @@ -91,40 +92,42 @@ namespace mongo{ // O(n) but n should be small assert(_freeWorkers.size() == (unsigned)_nThreads); - while(!_freeWorkers.empty()){ + while(!_freeWorkers.empty()) { delete _freeWorkers.front(); _freeWorkers.pop_front(); } } - void ThreadPool::join(){ + void ThreadPool::join() { scoped_lock lock(_mutex); - while(_tasksRemaining){ + while(_tasksRemaining) { _condition.wait(lock.boost()); } } - void ThreadPool::schedule(Task task){ + void ThreadPool::schedule(Task task) { scoped_lock lock(_mutex); _tasksRemaining++; - if (!_freeWorkers.empty()){ + if (!_freeWorkers.empty()) { _freeWorkers.front()->set_task(task); _freeWorkers.pop_front(); - }else{ + } + else { _tasks.push_back(task); } } // should only be called by a worker from the worker thread - void ThreadPool::task_done(Worker* worker){ + void ThreadPool::task_done(Worker* worker) { scoped_lock lock(_mutex); - if (!_tasks.empty()){ + if (!_tasks.empty()) { worker->set_task(_tasks.front()); _tasks.pop_front(); - }else{ + } + else { _freeWorkers.push_front(worker); } diff --git a/util/concurrency/thread_pool.h b/util/concurrency/thread_pool.h index f0fe8f1..b348ed1 100644 --- a/util/concurrency/thread_pool.h +++ b/util/concurrency/thread_pool.h @@ -15,6 +15,8 @@ * limitations under the License. */ +#pragma once + #include <boost/function.hpp> #include <boost/bind.hpp> #undef assert @@ -22,59 +24,59 @@ namespace mongo { -namespace threadpool { - class Worker; - - typedef boost::function<void(void)> Task; //nullary function or functor - - // exported to the mongo namespace - class ThreadPool : boost::noncopyable{ - public: - explicit ThreadPool(int nThreads=8); - - // blocks until all tasks are complete (tasks_remaining() == 0) - // You should not call schedule while in the destructor - ~ThreadPool(); - - // blocks until all tasks are complete (tasks_remaining() == 0) - // does not prevent new tasks from being scheduled so could wait forever. - // Also, new tasks could be scheduled after this returns. - void join(); - - // task will be copied a few times so make sure it's relatively cheap - void schedule(Task task); - - // Helpers that wrap schedule and boost::bind. - // Functor and args will be copied a few times so make sure it's relatively cheap - template<typename F, typename A> - void schedule(F f, A a){ schedule(boost::bind(f,a)); } - template<typename F, typename A, typename B> - void schedule(F f, A a, B b){ schedule(boost::bind(f,a,b)); } - template<typename F, typename A, typename B, typename C> - void schedule(F f, A a, B b, C c){ schedule(boost::bind(f,a,b,c)); } - template<typename F, typename A, typename B, typename C, typename D> - void schedule(F f, A a, B b, C c, D d){ schedule(boost::bind(f,a,b,c,d)); } - template<typename F, typename A, typename B, typename C, typename D, typename E> - void schedule(F f, A a, B b, C c, D d, E e){ schedule(boost::bind(f,a,b,c,d,e)); } - - int tasks_remaining() { return _tasksRemaining; } - - private: - mongo::mutex _mutex; - boost::condition _condition; - - list<Worker*> _freeWorkers; //used as LIFO stack (always front) - list<Task> _tasks; //used as FIFO queue (push_back, pop_front) - int _tasksRemaining; // in queue + currently processing - int _nThreads; // only used for sanity checking. could be removed in the future. - - // should only be called by a worker from the worker's thread - void task_done(Worker* worker); - friend class Worker; - }; - -} //namespace threadpool - -using threadpool::ThreadPool; + namespace threadpool { + class Worker; + + typedef boost::function<void(void)> Task; //nullary function or functor + + // exported to the mongo namespace + class ThreadPool : boost::noncopyable { + public: + explicit ThreadPool(int nThreads=8); + + // blocks until all tasks are complete (tasks_remaining() == 0) + // You should not call schedule while in the destructor + ~ThreadPool(); + + // blocks until all tasks are complete (tasks_remaining() == 0) + // does not prevent new tasks from being scheduled so could wait forever. + // Also, new tasks could be scheduled after this returns. + void join(); + + // task will be copied a few times so make sure it's relatively cheap + void schedule(Task task); + + // Helpers that wrap schedule and boost::bind. + // Functor and args will be copied a few times so make sure it's relatively cheap + template<typename F, typename A> + void schedule(F f, A a) { schedule(boost::bind(f,a)); } + template<typename F, typename A, typename B> + void schedule(F f, A a, B b) { schedule(boost::bind(f,a,b)); } + template<typename F, typename A, typename B, typename C> + void schedule(F f, A a, B b, C c) { schedule(boost::bind(f,a,b,c)); } + template<typename F, typename A, typename B, typename C, typename D> + void schedule(F f, A a, B b, C c, D d) { schedule(boost::bind(f,a,b,c,d)); } + template<typename F, typename A, typename B, typename C, typename D, typename E> + void schedule(F f, A a, B b, C c, D d, E e) { schedule(boost::bind(f,a,b,c,d,e)); } + + int tasks_remaining() { return _tasksRemaining; } + + private: + mongo::mutex _mutex; + boost::condition _condition; + + list<Worker*> _freeWorkers; //used as LIFO stack (always front) + list<Task> _tasks; //used as FIFO queue (push_back, pop_front) + int _tasksRemaining; // in queue + currently processing + int _nThreads; // only used for sanity checking. could be removed in the future. + + // should only be called by a worker from the worker's thread + void task_done(Worker* worker); + friend class Worker; + }; + + } //namespace threadpool + + using threadpool::ThreadPool; } //namespace mongo diff --git a/util/concurrency/value.h b/util/concurrency/value.h index dabeb95..08d5306 100644 --- a/util/concurrency/value.h +++ b/util/concurrency/value.h @@ -20,11 +20,11 @@ #pragma once -namespace mongo { +namespace mongo { extern mutex _atomicMutex; - /** atomic wrapper for a value. enters a mutex on each access. must + /** atomic wrapper for a value. enters a mutex on each access. must be copyable. */ template<typename T> @@ -33,20 +33,22 @@ namespace mongo { public: Atomic<T>() { } - void operator=(const T& a) { + void operator=(const T& a) { scoped_lock lk(_atomicMutex); - val = a; } + val = a; + } - operator T() const { + operator T() const { scoped_lock lk(_atomicMutex); - return val; } - + return val; + } + /** example: Atomic<int> q; ... { Atomic<int>::tran t(q); - if( q.ref() > 0 ) + if( q.ref() > 0 ) q.ref()--; } */ @@ -58,11 +60,11 @@ namespace mongo { }; }; - /** this string COULD be mangled but with the double buffering, assuming writes - are infrequent, it's unlikely. thus, this is reasonable for lockless setting of + /** this string COULD be mangled but with the double buffering, assuming writes + are infrequent, it's unlikely. thus, this is reasonable for lockless setting of diagnostic strings, where their content isn't critical. */ - class DiagStr { + class DiagStr { char buf1[256]; char buf2[256]; char *p; diff --git a/util/concurrency/vars.cpp b/util/concurrency/vars.cpp index 8863a27..3d057a4 100644 --- a/util/concurrency/vars.cpp +++ b/util/concurrency/vars.cpp @@ -20,28 +20,28 @@ #include "value.h" #include "mutex.h" -namespace mongo { +namespace mongo { - mutex _atomicMutex("_atomicMutex"); + mongo::mutex _atomicMutex("_atomicMutex"); // intentional leak. otherwise destructor orders can be problematic at termination. MutexDebugger &mutexDebugger = *(new MutexDebugger()); - MutexDebugger::MutexDebugger() : - x( *(new boost::mutex()) ), magic(0x12345678) { - // optional way to debug lock order - /* - a = "a_lock"; - b = "b_lock"; - */ + MutexDebugger::MutexDebugger() : + x( *(new boost::mutex()) ), magic(0x12345678) { + // optional way to debug lock order + /* + a = "a_lock"; + b = "b_lock"; + */ } - void MutexDebugger::programEnding() { + void MutexDebugger::programEnding() { if( logLevel>=1 && followers.size() ) { std::cout << followers.size() << " mutexes in program" << endl; - for( map< mid, set<mid> >::iterator i = followers.begin(); i != followers.end(); i++ ) { + for( map< mid, set<mid> >::iterator i = followers.begin(); i != followers.end(); i++ ) { cout << i->first; - if( maxNest[i->first] > 1 ) + if( maxNest[i->first] > 1 ) cout << " maxNest:" << maxNest[i->first]; cout << '\n'; for( set<mid>::iterator j = i->second.begin(); j != i->second.end(); j++ ) |