diff options
author | Antonin Kral <a.kral@bobek.cz> | 2011-09-14 17:08:06 +0200 |
---|---|---|
committer | Antonin Kral <a.kral@bobek.cz> | 2011-09-14 17:08:06 +0200 |
commit | 5d342a758c6095b4d30aba0750b54f13b8916f51 (patch) | |
tree | 762e9aa84781f5e3b96db2c02d356c29cf0217c0 /util/concurrency | |
parent | cbe2d992e9cd1ea66af9fa91df006106775d3073 (diff) | |
download | mongodb-5d342a758c6095b4d30aba0750b54f13b8916f51.tar.gz |
Imported Upstream version 2.0.0
Diffstat (limited to 'util/concurrency')
-rw-r--r-- | util/concurrency/list.h | 28 | ||||
-rw-r--r-- | util/concurrency/mutex.h | 103 | ||||
-rw-r--r-- | util/concurrency/race.h | 49 | ||||
-rw-r--r-- | util/concurrency/rwlock.h | 250 | ||||
-rw-r--r--[-rwxr-xr-x] | util/concurrency/shared_mutex_win.hpp | 29 | ||||
-rw-r--r-- | util/concurrency/spin_lock.cpp | 45 | ||||
-rw-r--r-- | util/concurrency/spin_lock.h | 33 | ||||
-rw-r--r-- | util/concurrency/synchronization.cpp | 36 | ||||
-rw-r--r-- | util/concurrency/synchronization.h | 18 | ||||
-rw-r--r-- | util/concurrency/value.h | 51 | ||||
-rw-r--r-- | util/concurrency/vars.cpp | 4 |
11 files changed, 465 insertions, 181 deletions
diff --git a/util/concurrency/list.h b/util/concurrency/list.h index e5eaec6..01bae6f 100644 --- a/util/concurrency/list.h +++ b/util/concurrency/list.h @@ -42,38 +42,54 @@ namespace mongo { friend class List1; T *_next; public: + Base() : _next(0){} + ~Base() { wassert(false); } // we never want this to happen T* next() const { return _next; } }; - T* head() const { return _head; } + /** note this is safe: + + T* p = mylist.head(); + if( p ) + use(p); + + and this is not: + + if( mylist.head() ) + use( mylist.head() ); // could become 0 + */ + T* head() const { return (T*) _head; } void push(T* t) { + assert( t->_next == 0 ); scoped_lock lk(_m); - t->_next = _head; + t->_next = (T*) _head; _head = t; } - // intentionally leak. + // intentionally leaks. void orphanAll() { + scoped_lock lk(_m); _head = 0; } /* t is not deleted, but is removed from the list. (orphaned) */ void orphan(T* t) { scoped_lock lk(_m); - T *&prev = _head; + T *&prev = (T*&) _head; T *n = prev; while( n != t ) { + uassert( 14050 , "List1: item to orphan not in list", n ); prev = n->_next; n = prev; } prev = t->_next; if( ++_orphans > 500 ) - log() << "warning orphans=" << _orphans << '\n'; + log() << "warning List1 orphans=" << _orphans << '\n'; } private: - T *_head; + volatile T *_head; mongo::mutex _m; int _orphans; }; diff --git a/util/concurrency/mutex.h b/util/concurrency/mutex.h index c463498..f17c3f0 100644 --- a/util/concurrency/mutex.h +++ b/util/concurrency/mutex.h @@ -19,11 +19,12 @@ #include <map> #include <set> - #include "../heapcheck.h" namespace mongo { + void printStackTrace( ostream &o ); + class mutex; inline boost::xtime incxtimemillis( long long s ) { @@ -50,7 +51,6 @@ namespace mongo { map< mid, set<mid> > followers; boost::mutex &x; unsigned magic; - void aBreakPoint() { } // for debugging public: // set these to create an assert that @@ -147,20 +147,16 @@ namespace mongo { ~StaticObserver() { _destroyingStatics = true; } }; - /** On pthread systems, it is an error to destroy a mutex while held. Static global - * mutexes may be held upon shutdown in our implementation, and this way we avoid - * destroying them. - * NOT recursive. + /** On pthread systems, it is an error to destroy a mutex while held (boost mutex + * may use pthread). Static global mutexes may be held upon shutdown in our + * implementation, and this way we avoid destroying them. + * NOT recursive. */ class mutex : boost::noncopyable { public: #if defined(_DEBUG) const char * const _name; -#endif - -#if defined(_DEBUG) - mutex(const char *name) - : _name(name) + mutex(const char *name) : _name(name) #else mutex(const char *) #endif @@ -184,44 +180,47 @@ namespace mongo { #else ok( _l.locked() ) #endif - { - } - - ~try_lock() { - } - + { } private: boost::timed_mutex::scoped_timed_lock _l; - public: const bool ok; }; - class scoped_lock : boost::noncopyable { + public: #if defined(_DEBUG) - mongo::mutex *mut; + struct PostStaticCheck { + PostStaticCheck() { + if ( StaticObserver::_destroyingStatics ) { + cout << "trying to lock a mongo::mutex during static shutdown" << endl; + printStackTrace( cout ); + } + } + }; + + PostStaticCheck _check; + mongo::mutex * const _mut; #endif - public: - scoped_lock( mongo::mutex &m ) : _l( m.boost() ) { + scoped_lock( mongo::mutex &m ) : +#if defined(_DEBUG) + _mut(&m), +#endif + _l( m.boost() ) { #if defined(_DEBUG) - mut = &m; - mutexDebugger.entering(mut->_name); + mutexDebugger.entering(_mut->_name); #endif } ~scoped_lock() { #if defined(_DEBUG) - mutexDebugger.leaving(mut->_name); + mutexDebugger.leaving(_mut->_name); #endif } boost::timed_mutex::scoped_lock &boost() { return _l; } private: boost::timed_mutex::scoped_lock _l; }; - - private: - boost::timed_mutex &boost() { return *_m; } boost::timed_mutex *_m; }; @@ -229,4 +228,52 @@ namespace mongo { typedef mutex::scoped_lock scoped_lock; typedef boost::recursive_mutex::scoped_lock recursive_scoped_lock; + /** The concept with SimpleMutex is that it is a basic lock/unlock with no + special functionality (such as try and try timeout). Thus it can be + implemented using OS-specific facilities in all environments (if desired). + On Windows, the implementation below is faster than boost mutex. + */ +#if defined(_WIN32) + class SimpleMutex : boost::noncopyable { + CRITICAL_SECTION _cs; + public: + SimpleMutex(const char *name) { InitializeCriticalSection(&_cs); } + ~SimpleMutex() { DeleteCriticalSection(&_cs); } + + void lock() { EnterCriticalSection(&_cs); } + void unlock() { LeaveCriticalSection(&_cs); } + + class scoped_lock : boost::noncopyable { + SimpleMutex& _m; + public: + scoped_lock( SimpleMutex &m ) : _m(m) { _m.lock(); } + ~scoped_lock() { _m.unlock(); } + }; + }; +#else + class SimpleMutex : boost::noncopyable { + public: + SimpleMutex(const char* name) { assert( pthread_mutex_init(&_lock,0) == 0 ); } + ~SimpleMutex(){ + if ( ! StaticObserver::_destroyingStatics ) { + assert( pthread_mutex_destroy(&_lock) == 0 ); + } + } + + void lock() { assert( pthread_mutex_lock(&_lock) == 0 ); } + void unlock() { assert( pthread_mutex_unlock(&_lock) == 0 ); } + + class scoped_lock : boost::noncopyable { + SimpleMutex& _m; + public: + scoped_lock( SimpleMutex &m ) : _m(m) { _m.lock(); } + ~scoped_lock() { _m.unlock(); } + }; + + private: + pthread_mutex_t _lock; + }; + +#endif + } diff --git a/util/concurrency/race.h b/util/concurrency/race.h index 0b8338c..4644e37 100644 --- a/util/concurrency/race.h +++ b/util/concurrency/race.h @@ -19,15 +19,56 @@ namespace mongo { the same time. Also detects and disallows recursion. */ +#ifdef _WIN32 + typedef unsigned threadId_t; +#else + typedef pthread_t threadId_t; +#endif + + #if defined(_DEBUG) + namespace race { + + class CodePoint { + public: + string lastName; + threadId_t lastTid; + string file; + CodePoint(string f) : lastTid(0), file(f) { } + }; + class Check { + public: + Check(CodePoint& p) { + threadId_t t = GetCurrentThreadId(); + if( p.lastTid == 0 ) { + p.lastTid = t; + p.lastName = getThreadName(); + } + else if( t != p.lastTid ) { + log() << "\n\n\n\n\nRACE? error assert\n " << p.file << '\n' + << " " << p.lastName + << " " << getThreadName() << "\n\n" << endl; + mongoAbort("racecheck"); + } + }; + }; + + } + +#define RACECHECK + // dm TODO - the right code for this file is in a different branch at the moment (merge) + //#define RACECHECK + //static race::CodePoint __cp(__FILE__); + //race::Check __ck(__cp); + class CodeBlock { volatile int n; - unsigned tid; + threadId_t tid; void fail() { log() << "synchronization (race condition) failure" << endl; printStackTrace(); - abort(); + ::abort(); } void enter() { if( ++n != 1 ) fail(); @@ -58,6 +99,8 @@ namespace mongo { #else +#define RACECHECK + class CodeBlock{ public: class Within { @@ -69,4 +112,4 @@ namespace mongo { #endif -} +} // namespace diff --git a/util/concurrency/rwlock.h b/util/concurrency/rwlock.h index ca81a9f..d8a11ea 100644 --- a/util/concurrency/rwlock.h +++ b/util/concurrency/rwlock.h @@ -21,9 +21,11 @@ #include "mutex.h" #include "../time_support.h" -// this requires Vista+ to work +// this requires newer windows versions // it works better than sharable_mutex under high contention +#if defined(_WIN64) //#define MONGO_USE_SRW_ON_WINDOWS 1 +#endif #if !defined(MONGO_USE_SRW_ON_WINDOWS) @@ -55,131 +57,153 @@ namespace mongo { #if defined(MONGO_USE_SRW_ON_WINDOWS) && defined(_WIN32) - class RWLock { + // Windows RWLock implementation (requires newer versions of windows thus the above macro) + class RWLock : boost::noncopyable { public: - RWLock(const char *) { InitializeSRWLock(&_lock); } + RWLock(const char *, int lowPriorityWaitMS=0 ) : _lowPriorityWaitMS(lowPriorityWaitMS) + { InitializeSRWLock(&_lock); } ~RWLock() { } + const char * implType() const { return "WINSRW"; } + int lowPriorityWaitMS() const { return _lowPriorityWaitMS; } void lock() { AcquireSRWLockExclusive(&_lock); } void unlock() { ReleaseSRWLockExclusive(&_lock); } void lock_shared() { AcquireSRWLockShared(&_lock); } void unlock_shared() { ReleaseSRWLockShared(&_lock); } bool lock_shared_try( int millis ) { + if( TryAcquireSRWLockShared(&_lock) ) + return true; + if( millis == 0 ) + return false; unsigned long long end = curTimeMicros64() + millis*1000; while( 1 ) { + Sleep(1); if( TryAcquireSRWLockShared(&_lock) ) return true; if( curTimeMicros64() >= end ) break; - Sleep(1); } return false; } bool lock_try( int millis = 0 ) { + if( TryAcquireSRWLockExclusive(&_lock) ) // quick check to optimistically avoid calling curTimeMicros64 + return true; + if( millis == 0 ) + return false; unsigned long long end = curTimeMicros64() + millis*1000; - while( 1 ) { + do { + Sleep(1); if( TryAcquireSRWLockExclusive(&_lock) ) return true; - if( curTimeMicros64() >= end ) - break; - Sleep(1); - } + } while( curTimeMicros64() < end ); return false; } private: SRWLOCK _lock; + const int _lowPriorityWaitMS; }; #elif defined(BOOST_RWLOCK) - class RWLock { + + // Boost based RWLock implementation + class RWLock : boost::noncopyable { shared_mutex _m; + const int _lowPriorityWaitMS; public: -#if defined(_DEBUG) - const char *_name; - RWLock(const char *name) : _name(name) { } -#else - RWLock(const char *) { } -#endif + const char * const _name; + + RWLock(const char *name, int lowPriorityWait=0) : _lowPriorityWaitMS(lowPriorityWait) , _name(name) { } + + const char * implType() const { return "boost"; } + + int lowPriorityWaitMS() const { return _lowPriorityWaitMS; } + void lock() { - _m.lock(); -#if defined(_DEBUG) - mutexDebugger.entering(_name); -#endif + _m.lock(); + DEV mutexDebugger.entering(_name); } + + /*void lock() { + // This sequence gives us the lock semantics we want: specifically that write lock acquisition is + // greedy EXCEPT when someone already is in upgradable state. + lockAsUpgradable(); + upgrade(); + DEV mutexDebugger.entering(_name); + }*/ + void unlock() { -#if defined(_DEBUG) - mutexDebugger.leaving(_name); -#endif + DEV mutexDebugger.leaving(_name); _m.unlock(); } + void lockAsUpgradable() { + _m.lock_upgrade(); + } + void unlockFromUpgradable() { // upgradable -> unlocked + _m.unlock_upgrade(); + } + void upgrade() { // upgradable -> exclusive lock + _m.unlock_upgrade_and_lock(); + } + void lock_shared() { _m.lock_shared(); } - void unlock_shared() { _m.unlock_shared(); } bool lock_shared_try( int millis ) { - boost::system_time until = get_system_time(); - until += boost::posix_time::milliseconds(millis); - if( _m.timed_lock_shared( until ) ) { + if( _m.timed_lock_shared( boost::posix_time::milliseconds(millis) ) ) { return true; } return false; } bool lock_try( int millis = 0 ) { - boost::system_time until = get_system_time(); - until += boost::posix_time::milliseconds(millis); - if( _m.timed_lock( until ) ) { -#if defined(_DEBUG) - mutexDebugger.entering(_name); -#endif + if( _m.timed_lock( boost::posix_time::milliseconds(millis) ) ) { + DEV mutexDebugger.entering(_name); return true; } return false; } - - }; + #else - class RWLock { - pthread_rwlock_t _lock; - inline void check( int x ) { - if( x == 0 ) + // Posix RWLock implementation + class RWLock : boost::noncopyable { + pthread_rwlock_t _lock; + const int _lowPriorityWaitMS; + static void check( int x ) { + if( MONGO_likely(x == 0) ) return; log() << "pthread rwlock failed: " << x << endl; assert( x == 0 ); } - + public: -#if defined(_DEBUG) const char *_name; - RWLock(const char *name) : _name(name) { -#else - RWLock(const char *) { -#endif + RWLock(const char *name, int lowPriorityWaitMS=0) : _lowPriorityWaitMS(lowPriorityWaitMS), _name(name) + { check( pthread_rwlock_init( &_lock , 0 ) ); } - + ~RWLock() { if ( ! StaticObserver::_destroyingStatics ) { - check( pthread_rwlock_destroy( &_lock ) ); + wassert( pthread_rwlock_destroy( &_lock ) == 0 ); // wassert as don't want to throw from a destructor } } + const char * implType() const { return "posix"; } + + int lowPriorityWaitMS() const { return _lowPriorityWaitMS; } + void lock() { check( pthread_rwlock_wrlock( &_lock ) ); -#if defined(_DEBUG) - mutexDebugger.entering(_name); -#endif + DEV mutexDebugger.entering(_name); } void unlock() { -#if defined(_DEBUG) mutexDebugger.leaving(_name); -#endif check( pthread_rwlock_unlock( &_lock ) ); } @@ -197,9 +221,7 @@ namespace mongo { bool lock_try( int millis = 0 ) { if( _try( millis , true ) ) { -#if defined(_DEBUG) - mutexDebugger.entering(_name); -#endif + DEV mutexDebugger.entering(_name); return true; } return false; @@ -233,7 +255,7 @@ namespace mongo { #endif /** throws on failure to acquire in the specified time period. */ - class rwlock_try_write { + class rwlock_try_write : boost::noncopyable { public: struct exception { }; rwlock_try_write(RWLock& l, int millis = 0) : _l(l) { @@ -245,16 +267,57 @@ namespace mongo { RWLock& _l; }; + class rwlock_shared : boost::noncopyable { + public: + rwlock_shared(RWLock& rwlock) : _r(rwlock) {_r.lock_shared(); } + ~rwlock_shared() { _r.unlock_shared(); } + private: + RWLock& _r; + }; + /* scoped lock for RWLock */ - class rwlock { + class rwlock : boost::noncopyable { public: - rwlock( const RWLock& lock , bool write , bool alreadyHaveLock = false ) + /** + * @param write acquire write lock if true sharable if false + * @param lowPriority if > 0, will try to get the lock non-greedily for that many ms + */ + rwlock( const RWLock& lock , bool write, /* bool alreadyHaveLock = false , */int lowPriorityWaitMS = 0 ) : _lock( (RWLock&)lock ) , _write( write ) { - if ( ! alreadyHaveLock ) { - if ( _write ) - _lock.lock(); - else + + { + if ( _write ) { + + if ( ! lowPriorityWaitMS && lock.lowPriorityWaitMS() ) + lowPriorityWaitMS = lock.lowPriorityWaitMS(); + + if ( lowPriorityWaitMS ) { + bool got = false; + for ( int i=0; i<lowPriorityWaitMS; i++ ) { + if ( _lock.lock_try(0) ) { + got = true; + break; + } + + int sleep = 1; + if ( i > ( lowPriorityWaitMS / 20 ) ) + sleep = 10; + sleepmillis(sleep); + i += ( sleep - 1 ); + } + if ( ! got ) { + log() << "couldn't get lazy rwlock" << endl; + _lock.lock(); + } + } + else { + _lock.lock(); + } + + } + else { _lock.lock_shared(); + } } } ~rwlock() { @@ -267,4 +330,67 @@ namespace mongo { RWLock& _lock; const bool _write; }; + + /** recursive on shared locks is ok for this implementation */ + class RWLockRecursive : boost::noncopyable { + ThreadLocalValue<int> _state; + RWLock _lk; + friend class Exclusive; + public: + /** @param lpwaitms lazy wait */ + RWLockRecursive(const char *name, int lpwaitms) : _lk(name, lpwaitms) { } + + void assertExclusivelyLocked() { + dassert( _state.get() < 0 ); + } + + // RWLockRecursive::Exclusive scoped lock + class Exclusive : boost::noncopyable { + RWLockRecursive& _r; + rwlock *_scopedLock; + public: + Exclusive(RWLockRecursive& r) : _r(r), _scopedLock(0) { + int s = _r._state.get(); + dassert( s <= 0 ); + if( s == 0 ) + _scopedLock = new rwlock(_r._lk, true); + _r._state.set(s-1); + } + ~Exclusive() { + int s = _r._state.get(); + DEV wassert( s < 0 ); // wassert: don't throw from destructors + _r._state.set(s+1); + delete _scopedLock; + } + }; + + // RWLockRecursive::Shared scoped lock + class Shared : boost::noncopyable { + RWLockRecursive& _r; + bool _alreadyExclusive; + public: + Shared(RWLockRecursive& r) : _r(r) { + int s = _r._state.get(); + _alreadyExclusive = s < 0; + if( !_alreadyExclusive ) { + dassert( s >= 0 ); // -1 would mean exclusive + if( s == 0 ) + _r._lk.lock_shared(); + _r._state.set(s+1); + } + } + ~Shared() { + if( _alreadyExclusive ) { + DEV wassert( _r._state.get() < 0 ); + } + else { + int s = _r._state.get() - 1; + if( s == 0 ) + _r._lk.unlock_shared(); + _r._state.set(s); + DEV wassert( s >= 0 ); + } + } + }; + }; } diff --git a/util/concurrency/shared_mutex_win.hpp b/util/concurrency/shared_mutex_win.hpp index 5356cf2..e850fc6 100755..100644 --- a/util/concurrency/shared_mutex_win.hpp +++ b/util/concurrency/shared_mutex_win.hpp @@ -7,10 +7,31 @@ // accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)
-// MongoDB :
-//
-// Slightly modified boost file to not die above 127 pending writes
-//
+/* MongoDB :
+ Slightly modified boost file to not die above 127 pending writes
+ Here is what changed (from boost 1.42.0 shared_mutex.hpp):
+ 1,2c1,2
+ < #ifndef BOOST_THREAD_WIN32_SHARED_MUTEX_HPP
+ < #define BOOST_THREAD_WIN32_SHARED_MUTEX_HPP
+ ---
+ > #ifndef BOOST_THREAD_WIN32_SHARED_MUTEX_HPP_MODIFIED
+ > #define BOOST_THREAD_WIN32_SHARED_MUTEX_HPP_MODIFIED
+ 22c27
+ < class shared_mutex:
+ ---
+ > class modified_shared_mutex:
+ 73c78
+ < shared_mutex():
+ ---
+ > modified_shared_mutex():
+ 84c89
+ < ~shared_mutex()
+ ---
+ > ~modified_shared_mutex()
+ 283a289,290
+ > if( new_state.exclusive_waiting == 127 ) // the maximum already!
+ > break;
+*/
#include <boost/assert.hpp>
#include <boost/detail/interlocked.hpp>
diff --git a/util/concurrency/spin_lock.cpp b/util/concurrency/spin_lock.cpp index 0f33609..1811f15 100644 --- a/util/concurrency/spin_lock.cpp +++ b/util/concurrency/spin_lock.cpp @@ -25,20 +25,28 @@ namespace mongo { SpinLock::~SpinLock() { #if defined(_WIN32) DeleteCriticalSection(&_cs); +#elif defined(__USE_XOPEN2K) + pthread_spin_destroy(&_lock); #endif } SpinLock::SpinLock() -#if defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_4) - : _locked( false ) { } -#elif defined(_WIN32) +#if defined(_WIN32) { InitializeCriticalSectionAndSpinCount(&_cs, 4000); } +#elif defined(__USE_XOPEN2K) + { pthread_spin_init( &_lock , 0 ); } +#elif defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_4) + : _locked( false ) { } #else - : _mutex( "SpinLock" ) { } + : _mutex( "SpinLock" ) { } #endif void SpinLock::lock() { -#if defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_4) +#if defined(_WIN32) + EnterCriticalSection(&_cs); +#elif defined(__USE_XOPEN2K) + pthread_spin_lock( &_lock ); +#elif defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_4) // fast path if (!_locked && !__sync_lock_test_and_set(&_locked, true)) { return; @@ -55,8 +63,6 @@ namespace mongo { while (__sync_lock_test_and_set(&_locked, true)) { nanosleep(&t, NULL); } -#elif defined(_WIN32) - EnterCriticalSection(&_cs); #else // WARNING Missing spin lock in this platform. This can potentially // be slow. @@ -66,19 +72,28 @@ namespace mongo { } void SpinLock::unlock() { -#if defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_4) - - __sync_lock_release(&_locked); - -#elif defined(WIN32) - +#if defined(_WIN32) LeaveCriticalSection(&_cs); - +#elif defined(__USE_XOPEN2K) + pthread_spin_unlock(&_lock); +#elif defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_4) + __sync_lock_release(&_locked); #else - _mutex.unlock(); +#endif + } + bool SpinLock::isfast() { +#if defined(_WIN32) + return true; +#elif defined(__USE_XOPEN2K) + return true; +#elif defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_4) + return true; +#else + return false; #endif } + } // namespace mongo diff --git a/util/concurrency/spin_lock.h b/util/concurrency/spin_lock.h index 02a8797..65ecb15 100644 --- a/util/concurrency/spin_lock.h +++ b/util/concurrency/spin_lock.h @@ -18,8 +18,7 @@ #pragma once -#include "pch.h" -#include "rwlock.h" +#include "mutex.h" namespace mongo { @@ -27,7 +26,7 @@ namespace mongo { * The spinlock currently requires late GCC support routines to be efficient. * Other platforms default to a mutex implemenation. */ - class SpinLock { + class SpinLock : boost::noncopyable { public: SpinLock(); ~SpinLock(); @@ -35,30 +34,30 @@ namespace mongo { void lock(); void unlock(); + static bool isfast(); // true if a real spinlock on this platform + private: -#if defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_4) - volatile bool _locked; -#elif defined(_WIN32) +#if defined(_WIN32) CRITICAL_SECTION _cs; +#elif defined(__USE_XOPEN2K) + pthread_spinlock_t _lock; +#elif defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_4) + volatile bool _locked; #else - // default to a scoped mutex if not implemented - RWLock _mutex; + // default to a mutex if not implemented + SimpleMutex _mutex; #endif - - // Non-copyable, non-assignable - SpinLock(SpinLock&); - SpinLock& operator=(SpinLock&); }; - struct scoped_spinlock { - scoped_spinlock( SpinLock& l ) : _l(l){ + class scoped_spinlock : boost::noncopyable { + public: + scoped_spinlock( SpinLock& l ) : _l(l) { _l.lock(); } ~scoped_spinlock() { - _l.unlock(); - } + _l.unlock();} + private: SpinLock& _l; }; } // namespace mongo - diff --git a/util/concurrency/synchronization.cpp b/util/concurrency/synchronization.cpp index 12e2894..ce2547c 100644 --- a/util/concurrency/synchronization.cpp +++ b/util/concurrency/synchronization.cpp @@ -20,7 +20,8 @@ namespace mongo { - Notification::Notification() : _mutex ( "Notification" ) , _notified( false ) { } + Notification::Notification() : _mutex ( "Notification" ) , _notified( false ) { + } Notification::~Notification() { } @@ -37,19 +38,40 @@ namespace mongo { _condition.notify_one(); } - NotifyAll::NotifyAll() : _mutex("NotifyAll"), _counter(0) { } + /* --- NotifyAll --- */ + + NotifyAll::NotifyAll() : _mutex("NotifyAll") { + _lastDone = 0; + _lastReturned = 0; + _nWaiting = 0; + } + + NotifyAll::When NotifyAll::now() { + scoped_lock lock( _mutex ); + return ++_lastReturned; + } + + void NotifyAll::waitFor(When e) { + scoped_lock lock( _mutex ); + ++_nWaiting; + while( _lastDone < e ) { + _condition.wait( lock.boost() ); + } + } - void NotifyAll::wait() { + void NotifyAll::awaitBeyondNow() { scoped_lock lock( _mutex ); - unsigned long long old = _counter; - while( old == _counter ) { + ++_nWaiting; + When e = ++_lastReturned; + while( _lastDone <= e ) { _condition.wait( lock.boost() ); } } - void NotifyAll::notifyAll() { + void NotifyAll::notifyAll(When e) { scoped_lock lock( _mutex ); - ++_counter; + _lastDone = e; + _nWaiting = 0; _condition.notify_all(); } diff --git a/util/concurrency/synchronization.h b/util/concurrency/synchronization.h index ac2fcab..a0e89f7 100644 --- a/util/concurrency/synchronization.h +++ b/util/concurrency/synchronization.h @@ -56,18 +56,30 @@ namespace mongo { public: NotifyAll(); + typedef unsigned long long When; + + When now(); + /** awaits the next notifyAll() call by another thread. notifications that precede this call are ignored -- we are looking for a fresh event. */ - void wait(); + void waitFor(When); + + /** a bit faster than waitFor( now() ) */ + void awaitBeyondNow(); /** may be called multiple times. notifies all waiters */ - void notifyAll(); + void notifyAll(When); + + /** indicates how many threads are waiting for a notify. */ + unsigned nWaiting() const { return _nWaiting; } private: mongo::mutex _mutex; - unsigned long long _counter; boost::condition _condition; + When _lastDone; + When _lastReturned; + unsigned _nWaiting; }; } // namespace mongo diff --git a/util/concurrency/value.h b/util/concurrency/value.h index 0a0ef85..c66977b 100644 --- a/util/concurrency/value.h +++ b/util/concurrency/value.h @@ -1,5 +1,5 @@ /* @file value.h - concurrency helpers Atomic<T> and DiagStr + concurrency helpers DiagStr, Guarded */ /** @@ -20,44 +20,29 @@ #pragma once +#include "mutex.h" + namespace mongo { - extern mutex _atomicMutex; + /** declare that a variable that is "guarded" by a mutex. - /** atomic wrapper for a value. enters a mutex on each access. must - be copyable. - */ - template<typename T> - class Atomic : boost::noncopyable { - T val; - public: - Atomic<T>() { } + The decl documents the rule. For example "counta and countb are guarded by xyzMutex": - void operator=(const T& a) { - scoped_lock lk(_atomicMutex); - val = a; - } + Guarded<int, xyzMutex> counta; + Guarded<int, xyzMutex> countb; - operator T() const { - scoped_lock lk(_atomicMutex); - return val; + Upon use, specify the scoped_lock object. This makes it hard for someone + later to forget to be in the lock. Check is made that it is the right lock in _DEBUG + builds at runtime. + */ + template <typename T, mutex& BY> + class Guarded { + T _val; + public: + T& ref(const scoped_lock& lk) { + dassert( lk._mut == &BY ); + return _val; } - - /** example: - Atomic<int> q; - ... - { - Atomic<int>::tran t(q); - if( q.ref() > 0 ) - q.ref()--; - } - */ - class tran : private scoped_lock { - Atomic<T>& _a; - public: - tran(Atomic<T>& a) : scoped_lock(_atomicMutex), _a(a) { } - T& ref() { return _a.val; } - }; }; class DiagStr { diff --git a/util/concurrency/vars.cpp b/util/concurrency/vars.cpp index 19b58eb..213e576 100644 --- a/util/concurrency/vars.cpp +++ b/util/concurrency/vars.cpp @@ -17,15 +17,13 @@ */ #include "pch.h" -#include "value.h" #include "mutex.h" +#include "value.h" namespace mongo { mutex DiagStr::m("diags"); - mongo::mutex _atomicMutex("_atomicMutex"); - // intentional leak. otherwise destructor orders can be problematic at termination. MutexDebugger &mutexDebugger = *(new MutexDebugger()); |