summaryrefslogtreecommitdiff
path: root/util/concurrency
diff options
context:
space:
mode:
authorAntonin Kral <a.kral@bobek.cz>2011-09-14 17:08:06 +0200
committerAntonin Kral <a.kral@bobek.cz>2011-09-14 17:08:06 +0200
commit5d342a758c6095b4d30aba0750b54f13b8916f51 (patch)
tree762e9aa84781f5e3b96db2c02d356c29cf0217c0 /util/concurrency
parentcbe2d992e9cd1ea66af9fa91df006106775d3073 (diff)
downloadmongodb-5d342a758c6095b4d30aba0750b54f13b8916f51.tar.gz
Imported Upstream version 2.0.0
Diffstat (limited to 'util/concurrency')
-rw-r--r--util/concurrency/list.h28
-rw-r--r--util/concurrency/mutex.h103
-rw-r--r--util/concurrency/race.h49
-rw-r--r--util/concurrency/rwlock.h250
-rw-r--r--[-rwxr-xr-x]util/concurrency/shared_mutex_win.hpp29
-rw-r--r--util/concurrency/spin_lock.cpp45
-rw-r--r--util/concurrency/spin_lock.h33
-rw-r--r--util/concurrency/synchronization.cpp36
-rw-r--r--util/concurrency/synchronization.h18
-rw-r--r--util/concurrency/value.h51
-rw-r--r--util/concurrency/vars.cpp4
11 files changed, 465 insertions, 181 deletions
diff --git a/util/concurrency/list.h b/util/concurrency/list.h
index e5eaec6..01bae6f 100644
--- a/util/concurrency/list.h
+++ b/util/concurrency/list.h
@@ -42,38 +42,54 @@ namespace mongo {
friend class List1;
T *_next;
public:
+ Base() : _next(0){}
+ ~Base() { wassert(false); } // we never want this to happen
T* next() const { return _next; }
};
- T* head() const { return _head; }
+ /** note this is safe:
+
+ T* p = mylist.head();
+ if( p )
+ use(p);
+
+ and this is not:
+
+ if( mylist.head() )
+ use( mylist.head() ); // could become 0
+ */
+ T* head() const { return (T*) _head; }
void push(T* t) {
+ assert( t->_next == 0 );
scoped_lock lk(_m);
- t->_next = _head;
+ t->_next = (T*) _head;
_head = t;
}
- // intentionally leak.
+ // intentionally leaks.
void orphanAll() {
+ scoped_lock lk(_m);
_head = 0;
}
/* t is not deleted, but is removed from the list. (orphaned) */
void orphan(T* t) {
scoped_lock lk(_m);
- T *&prev = _head;
+ T *&prev = (T*&) _head;
T *n = prev;
while( n != t ) {
+ uassert( 14050 , "List1: item to orphan not in list", n );
prev = n->_next;
n = prev;
}
prev = t->_next;
if( ++_orphans > 500 )
- log() << "warning orphans=" << _orphans << '\n';
+ log() << "warning List1 orphans=" << _orphans << '\n';
}
private:
- T *_head;
+ volatile T *_head;
mongo::mutex _m;
int _orphans;
};
diff --git a/util/concurrency/mutex.h b/util/concurrency/mutex.h
index c463498..f17c3f0 100644
--- a/util/concurrency/mutex.h
+++ b/util/concurrency/mutex.h
@@ -19,11 +19,12 @@
#include <map>
#include <set>
-
#include "../heapcheck.h"
namespace mongo {
+ void printStackTrace( ostream &o );
+
class mutex;
inline boost::xtime incxtimemillis( long long s ) {
@@ -50,7 +51,6 @@ namespace mongo {
map< mid, set<mid> > followers;
boost::mutex &x;
unsigned magic;
-
void aBreakPoint() { } // for debugging
public:
// set these to create an assert that
@@ -147,20 +147,16 @@ namespace mongo {
~StaticObserver() { _destroyingStatics = true; }
};
- /** On pthread systems, it is an error to destroy a mutex while held. Static global
- * mutexes may be held upon shutdown in our implementation, and this way we avoid
- * destroying them.
- * NOT recursive.
+ /** On pthread systems, it is an error to destroy a mutex while held (boost mutex
+ * may use pthread). Static global mutexes may be held upon shutdown in our
+ * implementation, and this way we avoid destroying them.
+ * NOT recursive.
*/
class mutex : boost::noncopyable {
public:
#if defined(_DEBUG)
const char * const _name;
-#endif
-
-#if defined(_DEBUG)
- mutex(const char *name)
- : _name(name)
+ mutex(const char *name) : _name(name)
#else
mutex(const char *)
#endif
@@ -184,44 +180,47 @@ namespace mongo {
#else
ok( _l.locked() )
#endif
- {
- }
-
- ~try_lock() {
- }
-
+ { }
private:
boost::timed_mutex::scoped_timed_lock _l;
-
public:
const bool ok;
};
-
class scoped_lock : boost::noncopyable {
+ public:
#if defined(_DEBUG)
- mongo::mutex *mut;
+ struct PostStaticCheck {
+ PostStaticCheck() {
+ if ( StaticObserver::_destroyingStatics ) {
+ cout << "trying to lock a mongo::mutex during static shutdown" << endl;
+ printStackTrace( cout );
+ }
+ }
+ };
+
+ PostStaticCheck _check;
+ mongo::mutex * const _mut;
#endif
- public:
- scoped_lock( mongo::mutex &m ) : _l( m.boost() ) {
+ scoped_lock( mongo::mutex &m ) :
+#if defined(_DEBUG)
+ _mut(&m),
+#endif
+ _l( m.boost() ) {
#if defined(_DEBUG)
- mut = &m;
- mutexDebugger.entering(mut->_name);
+ mutexDebugger.entering(_mut->_name);
#endif
}
~scoped_lock() {
#if defined(_DEBUG)
- mutexDebugger.leaving(mut->_name);
+ mutexDebugger.leaving(_mut->_name);
#endif
}
boost::timed_mutex::scoped_lock &boost() { return _l; }
private:
boost::timed_mutex::scoped_lock _l;
};
-
-
private:
-
boost::timed_mutex &boost() { return *_m; }
boost::timed_mutex *_m;
};
@@ -229,4 +228,52 @@ namespace mongo {
typedef mutex::scoped_lock scoped_lock;
typedef boost::recursive_mutex::scoped_lock recursive_scoped_lock;
+ /** The concept with SimpleMutex is that it is a basic lock/unlock with no
+ special functionality (such as try and try timeout). Thus it can be
+ implemented using OS-specific facilities in all environments (if desired).
+ On Windows, the implementation below is faster than boost mutex.
+ */
+#if defined(_WIN32)
+ class SimpleMutex : boost::noncopyable {
+ CRITICAL_SECTION _cs;
+ public:
+ SimpleMutex(const char *name) { InitializeCriticalSection(&_cs); }
+ ~SimpleMutex() { DeleteCriticalSection(&_cs); }
+
+ void lock() { EnterCriticalSection(&_cs); }
+ void unlock() { LeaveCriticalSection(&_cs); }
+
+ class scoped_lock : boost::noncopyable {
+ SimpleMutex& _m;
+ public:
+ scoped_lock( SimpleMutex &m ) : _m(m) { _m.lock(); }
+ ~scoped_lock() { _m.unlock(); }
+ };
+ };
+#else
+ class SimpleMutex : boost::noncopyable {
+ public:
+ SimpleMutex(const char* name) { assert( pthread_mutex_init(&_lock,0) == 0 ); }
+ ~SimpleMutex(){
+ if ( ! StaticObserver::_destroyingStatics ) {
+ assert( pthread_mutex_destroy(&_lock) == 0 );
+ }
+ }
+
+ void lock() { assert( pthread_mutex_lock(&_lock) == 0 ); }
+ void unlock() { assert( pthread_mutex_unlock(&_lock) == 0 ); }
+
+ class scoped_lock : boost::noncopyable {
+ SimpleMutex& _m;
+ public:
+ scoped_lock( SimpleMutex &m ) : _m(m) { _m.lock(); }
+ ~scoped_lock() { _m.unlock(); }
+ };
+
+ private:
+ pthread_mutex_t _lock;
+ };
+
+#endif
+
}
diff --git a/util/concurrency/race.h b/util/concurrency/race.h
index 0b8338c..4644e37 100644
--- a/util/concurrency/race.h
+++ b/util/concurrency/race.h
@@ -19,15 +19,56 @@ namespace mongo {
the same time. Also detects and disallows recursion.
*/
+#ifdef _WIN32
+ typedef unsigned threadId_t;
+#else
+ typedef pthread_t threadId_t;
+#endif
+
+
#if defined(_DEBUG)
+ namespace race {
+
+ class CodePoint {
+ public:
+ string lastName;
+ threadId_t lastTid;
+ string file;
+ CodePoint(string f) : lastTid(0), file(f) { }
+ };
+ class Check {
+ public:
+ Check(CodePoint& p) {
+ threadId_t t = GetCurrentThreadId();
+ if( p.lastTid == 0 ) {
+ p.lastTid = t;
+ p.lastName = getThreadName();
+ }
+ else if( t != p.lastTid ) {
+ log() << "\n\n\n\n\nRACE? error assert\n " << p.file << '\n'
+ << " " << p.lastName
+ << " " << getThreadName() << "\n\n" << endl;
+ mongoAbort("racecheck");
+ }
+ };
+ };
+
+ }
+
+#define RACECHECK
+ // dm TODO - the right code for this file is in a different branch at the moment (merge)
+ //#define RACECHECK
+ //static race::CodePoint __cp(__FILE__);
+ //race::Check __ck(__cp);
+
class CodeBlock {
volatile int n;
- unsigned tid;
+ threadId_t tid;
void fail() {
log() << "synchronization (race condition) failure" << endl;
printStackTrace();
- abort();
+ ::abort();
}
void enter() {
if( ++n != 1 ) fail();
@@ -58,6 +99,8 @@ namespace mongo {
#else
+#define RACECHECK
+
class CodeBlock{
public:
class Within {
@@ -69,4 +112,4 @@ namespace mongo {
#endif
-}
+} // namespace
diff --git a/util/concurrency/rwlock.h b/util/concurrency/rwlock.h
index ca81a9f..d8a11ea 100644
--- a/util/concurrency/rwlock.h
+++ b/util/concurrency/rwlock.h
@@ -21,9 +21,11 @@
#include "mutex.h"
#include "../time_support.h"
-// this requires Vista+ to work
+// this requires newer windows versions
// it works better than sharable_mutex under high contention
+#if defined(_WIN64)
//#define MONGO_USE_SRW_ON_WINDOWS 1
+#endif
#if !defined(MONGO_USE_SRW_ON_WINDOWS)
@@ -55,131 +57,153 @@ namespace mongo {
#if defined(MONGO_USE_SRW_ON_WINDOWS) && defined(_WIN32)
- class RWLock {
+ // Windows RWLock implementation (requires newer versions of windows thus the above macro)
+ class RWLock : boost::noncopyable {
public:
- RWLock(const char *) { InitializeSRWLock(&_lock); }
+ RWLock(const char *, int lowPriorityWaitMS=0 ) : _lowPriorityWaitMS(lowPriorityWaitMS)
+ { InitializeSRWLock(&_lock); }
~RWLock() { }
+ const char * implType() const { return "WINSRW"; }
+ int lowPriorityWaitMS() const { return _lowPriorityWaitMS; }
void lock() { AcquireSRWLockExclusive(&_lock); }
void unlock() { ReleaseSRWLockExclusive(&_lock); }
void lock_shared() { AcquireSRWLockShared(&_lock); }
void unlock_shared() { ReleaseSRWLockShared(&_lock); }
bool lock_shared_try( int millis ) {
+ if( TryAcquireSRWLockShared(&_lock) )
+ return true;
+ if( millis == 0 )
+ return false;
unsigned long long end = curTimeMicros64() + millis*1000;
while( 1 ) {
+ Sleep(1);
if( TryAcquireSRWLockShared(&_lock) )
return true;
if( curTimeMicros64() >= end )
break;
- Sleep(1);
}
return false;
}
bool lock_try( int millis = 0 ) {
+ if( TryAcquireSRWLockExclusive(&_lock) ) // quick check to optimistically avoid calling curTimeMicros64
+ return true;
+ if( millis == 0 )
+ return false;
unsigned long long end = curTimeMicros64() + millis*1000;
- while( 1 ) {
+ do {
+ Sleep(1);
if( TryAcquireSRWLockExclusive(&_lock) )
return true;
- if( curTimeMicros64() >= end )
- break;
- Sleep(1);
- }
+ } while( curTimeMicros64() < end );
return false;
}
private:
SRWLOCK _lock;
+ const int _lowPriorityWaitMS;
};
#elif defined(BOOST_RWLOCK)
- class RWLock {
+
+ // Boost based RWLock implementation
+ class RWLock : boost::noncopyable {
shared_mutex _m;
+ const int _lowPriorityWaitMS;
public:
-#if defined(_DEBUG)
- const char *_name;
- RWLock(const char *name) : _name(name) { }
-#else
- RWLock(const char *) { }
-#endif
+ const char * const _name;
+
+ RWLock(const char *name, int lowPriorityWait=0) : _lowPriorityWaitMS(lowPriorityWait) , _name(name) { }
+
+ const char * implType() const { return "boost"; }
+
+ int lowPriorityWaitMS() const { return _lowPriorityWaitMS; }
+
void lock() {
- _m.lock();
-#if defined(_DEBUG)
- mutexDebugger.entering(_name);
-#endif
+ _m.lock();
+ DEV mutexDebugger.entering(_name);
}
+
+ /*void lock() {
+ // This sequence gives us the lock semantics we want: specifically that write lock acquisition is
+ // greedy EXCEPT when someone already is in upgradable state.
+ lockAsUpgradable();
+ upgrade();
+ DEV mutexDebugger.entering(_name);
+ }*/
+
void unlock() {
-#if defined(_DEBUG)
- mutexDebugger.leaving(_name);
-#endif
+ DEV mutexDebugger.leaving(_name);
_m.unlock();
}
+ void lockAsUpgradable() {
+ _m.lock_upgrade();
+ }
+ void unlockFromUpgradable() { // upgradable -> unlocked
+ _m.unlock_upgrade();
+ }
+ void upgrade() { // upgradable -> exclusive lock
+ _m.unlock_upgrade_and_lock();
+ }
+
void lock_shared() {
_m.lock_shared();
}
-
void unlock_shared() {
_m.unlock_shared();
}
bool lock_shared_try( int millis ) {
- boost::system_time until = get_system_time();
- until += boost::posix_time::milliseconds(millis);
- if( _m.timed_lock_shared( until ) ) {
+ if( _m.timed_lock_shared( boost::posix_time::milliseconds(millis) ) ) {
return true;
}
return false;
}
bool lock_try( int millis = 0 ) {
- boost::system_time until = get_system_time();
- until += boost::posix_time::milliseconds(millis);
- if( _m.timed_lock( until ) ) {
-#if defined(_DEBUG)
- mutexDebugger.entering(_name);
-#endif
+ if( _m.timed_lock( boost::posix_time::milliseconds(millis) ) ) {
+ DEV mutexDebugger.entering(_name);
return true;
}
return false;
}
-
-
};
+
#else
- class RWLock {
- pthread_rwlock_t _lock;
- inline void check( int x ) {
- if( x == 0 )
+ // Posix RWLock implementation
+ class RWLock : boost::noncopyable {
+ pthread_rwlock_t _lock;
+ const int _lowPriorityWaitMS;
+ static void check( int x ) {
+ if( MONGO_likely(x == 0) )
return;
log() << "pthread rwlock failed: " << x << endl;
assert( x == 0 );
}
-
+
public:
-#if defined(_DEBUG)
const char *_name;
- RWLock(const char *name) : _name(name) {
-#else
- RWLock(const char *) {
-#endif
+ RWLock(const char *name, int lowPriorityWaitMS=0) : _lowPriorityWaitMS(lowPriorityWaitMS), _name(name)
+ {
check( pthread_rwlock_init( &_lock , 0 ) );
}
-
+
~RWLock() {
if ( ! StaticObserver::_destroyingStatics ) {
- check( pthread_rwlock_destroy( &_lock ) );
+ wassert( pthread_rwlock_destroy( &_lock ) == 0 ); // wassert as don't want to throw from a destructor
}
}
+ const char * implType() const { return "posix"; }
+
+ int lowPriorityWaitMS() const { return _lowPriorityWaitMS; }
+
void lock() {
check( pthread_rwlock_wrlock( &_lock ) );
-#if defined(_DEBUG)
- mutexDebugger.entering(_name);
-#endif
+ DEV mutexDebugger.entering(_name);
}
void unlock() {
-#if defined(_DEBUG)
mutexDebugger.leaving(_name);
-#endif
check( pthread_rwlock_unlock( &_lock ) );
}
@@ -197,9 +221,7 @@ namespace mongo {
bool lock_try( int millis = 0 ) {
if( _try( millis , true ) ) {
-#if defined(_DEBUG)
- mutexDebugger.entering(_name);
-#endif
+ DEV mutexDebugger.entering(_name);
return true;
}
return false;
@@ -233,7 +255,7 @@ namespace mongo {
#endif
/** throws on failure to acquire in the specified time period. */
- class rwlock_try_write {
+ class rwlock_try_write : boost::noncopyable {
public:
struct exception { };
rwlock_try_write(RWLock& l, int millis = 0) : _l(l) {
@@ -245,16 +267,57 @@ namespace mongo {
RWLock& _l;
};
+ class rwlock_shared : boost::noncopyable {
+ public:
+ rwlock_shared(RWLock& rwlock) : _r(rwlock) {_r.lock_shared(); }
+ ~rwlock_shared() { _r.unlock_shared(); }
+ private:
+ RWLock& _r;
+ };
+
/* scoped lock for RWLock */
- class rwlock {
+ class rwlock : boost::noncopyable {
public:
- rwlock( const RWLock& lock , bool write , bool alreadyHaveLock = false )
+ /**
+ * @param write acquire write lock if true sharable if false
+ * @param lowPriority if > 0, will try to get the lock non-greedily for that many ms
+ */
+ rwlock( const RWLock& lock , bool write, /* bool alreadyHaveLock = false , */int lowPriorityWaitMS = 0 )
: _lock( (RWLock&)lock ) , _write( write ) {
- if ( ! alreadyHaveLock ) {
- if ( _write )
- _lock.lock();
- else
+
+ {
+ if ( _write ) {
+
+ if ( ! lowPriorityWaitMS && lock.lowPriorityWaitMS() )
+ lowPriorityWaitMS = lock.lowPriorityWaitMS();
+
+ if ( lowPriorityWaitMS ) {
+ bool got = false;
+ for ( int i=0; i<lowPriorityWaitMS; i++ ) {
+ if ( _lock.lock_try(0) ) {
+ got = true;
+ break;
+ }
+
+ int sleep = 1;
+ if ( i > ( lowPriorityWaitMS / 20 ) )
+ sleep = 10;
+ sleepmillis(sleep);
+ i += ( sleep - 1 );
+ }
+ if ( ! got ) {
+ log() << "couldn't get lazy rwlock" << endl;
+ _lock.lock();
+ }
+ }
+ else {
+ _lock.lock();
+ }
+
+ }
+ else {
_lock.lock_shared();
+ }
}
}
~rwlock() {
@@ -267,4 +330,67 @@ namespace mongo {
RWLock& _lock;
const bool _write;
};
+
+ /** recursive on shared locks is ok for this implementation */
+ class RWLockRecursive : boost::noncopyable {
+ ThreadLocalValue<int> _state;
+ RWLock _lk;
+ friend class Exclusive;
+ public:
+ /** @param lpwaitms lazy wait */
+ RWLockRecursive(const char *name, int lpwaitms) : _lk(name, lpwaitms) { }
+
+ void assertExclusivelyLocked() {
+ dassert( _state.get() < 0 );
+ }
+
+ // RWLockRecursive::Exclusive scoped lock
+ class Exclusive : boost::noncopyable {
+ RWLockRecursive& _r;
+ rwlock *_scopedLock;
+ public:
+ Exclusive(RWLockRecursive& r) : _r(r), _scopedLock(0) {
+ int s = _r._state.get();
+ dassert( s <= 0 );
+ if( s == 0 )
+ _scopedLock = new rwlock(_r._lk, true);
+ _r._state.set(s-1);
+ }
+ ~Exclusive() {
+ int s = _r._state.get();
+ DEV wassert( s < 0 ); // wassert: don't throw from destructors
+ _r._state.set(s+1);
+ delete _scopedLock;
+ }
+ };
+
+ // RWLockRecursive::Shared scoped lock
+ class Shared : boost::noncopyable {
+ RWLockRecursive& _r;
+ bool _alreadyExclusive;
+ public:
+ Shared(RWLockRecursive& r) : _r(r) {
+ int s = _r._state.get();
+ _alreadyExclusive = s < 0;
+ if( !_alreadyExclusive ) {
+ dassert( s >= 0 ); // -1 would mean exclusive
+ if( s == 0 )
+ _r._lk.lock_shared();
+ _r._state.set(s+1);
+ }
+ }
+ ~Shared() {
+ if( _alreadyExclusive ) {
+ DEV wassert( _r._state.get() < 0 );
+ }
+ else {
+ int s = _r._state.get() - 1;
+ if( s == 0 )
+ _r._lk.unlock_shared();
+ _r._state.set(s);
+ DEV wassert( s >= 0 );
+ }
+ }
+ };
+ };
}
diff --git a/util/concurrency/shared_mutex_win.hpp b/util/concurrency/shared_mutex_win.hpp
index 5356cf2..e850fc6 100755..100644
--- a/util/concurrency/shared_mutex_win.hpp
+++ b/util/concurrency/shared_mutex_win.hpp
@@ -7,10 +7,31 @@
// accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)
-// MongoDB :
-//
-// Slightly modified boost file to not die above 127 pending writes
-//
+/* MongoDB :
+ Slightly modified boost file to not die above 127 pending writes
+ Here is what changed (from boost 1.42.0 shared_mutex.hpp):
+ 1,2c1,2
+ < #ifndef BOOST_THREAD_WIN32_SHARED_MUTEX_HPP
+ < #define BOOST_THREAD_WIN32_SHARED_MUTEX_HPP
+ ---
+ > #ifndef BOOST_THREAD_WIN32_SHARED_MUTEX_HPP_MODIFIED
+ > #define BOOST_THREAD_WIN32_SHARED_MUTEX_HPP_MODIFIED
+ 22c27
+ < class shared_mutex:
+ ---
+ > class modified_shared_mutex:
+ 73c78
+ < shared_mutex():
+ ---
+ > modified_shared_mutex():
+ 84c89
+ < ~shared_mutex()
+ ---
+ > ~modified_shared_mutex()
+ 283a289,290
+ > if( new_state.exclusive_waiting == 127 ) // the maximum already!
+ > break;
+*/
#include <boost/assert.hpp>
#include <boost/detail/interlocked.hpp>
diff --git a/util/concurrency/spin_lock.cpp b/util/concurrency/spin_lock.cpp
index 0f33609..1811f15 100644
--- a/util/concurrency/spin_lock.cpp
+++ b/util/concurrency/spin_lock.cpp
@@ -25,20 +25,28 @@ namespace mongo {
SpinLock::~SpinLock() {
#if defined(_WIN32)
DeleteCriticalSection(&_cs);
+#elif defined(__USE_XOPEN2K)
+ pthread_spin_destroy(&_lock);
#endif
}
SpinLock::SpinLock()
-#if defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_4)
- : _locked( false ) { }
-#elif defined(_WIN32)
+#if defined(_WIN32)
{ InitializeCriticalSectionAndSpinCount(&_cs, 4000); }
+#elif defined(__USE_XOPEN2K)
+ { pthread_spin_init( &_lock , 0 ); }
+#elif defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_4)
+ : _locked( false ) { }
#else
- : _mutex( "SpinLock" ) { }
+ : _mutex( "SpinLock" ) { }
#endif
void SpinLock::lock() {
-#if defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_4)
+#if defined(_WIN32)
+ EnterCriticalSection(&_cs);
+#elif defined(__USE_XOPEN2K)
+ pthread_spin_lock( &_lock );
+#elif defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_4)
// fast path
if (!_locked && !__sync_lock_test_and_set(&_locked, true)) {
return;
@@ -55,8 +63,6 @@ namespace mongo {
while (__sync_lock_test_and_set(&_locked, true)) {
nanosleep(&t, NULL);
}
-#elif defined(_WIN32)
- EnterCriticalSection(&_cs);
#else
// WARNING Missing spin lock in this platform. This can potentially
// be slow.
@@ -66,19 +72,28 @@ namespace mongo {
}
void SpinLock::unlock() {
-#if defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_4)
-
- __sync_lock_release(&_locked);
-
-#elif defined(WIN32)
-
+#if defined(_WIN32)
LeaveCriticalSection(&_cs);
-
+#elif defined(__USE_XOPEN2K)
+ pthread_spin_unlock(&_lock);
+#elif defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_4)
+ __sync_lock_release(&_locked);
#else
-
_mutex.unlock();
+#endif
+ }
+ bool SpinLock::isfast() {
+#if defined(_WIN32)
+ return true;
+#elif defined(__USE_XOPEN2K)
+ return true;
+#elif defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_4)
+ return true;
+#else
+ return false;
#endif
}
+
} // namespace mongo
diff --git a/util/concurrency/spin_lock.h b/util/concurrency/spin_lock.h
index 02a8797..65ecb15 100644
--- a/util/concurrency/spin_lock.h
+++ b/util/concurrency/spin_lock.h
@@ -18,8 +18,7 @@
#pragma once
-#include "pch.h"
-#include "rwlock.h"
+#include "mutex.h"
namespace mongo {
@@ -27,7 +26,7 @@ namespace mongo {
* The spinlock currently requires late GCC support routines to be efficient.
* Other platforms default to a mutex implemenation.
*/
- class SpinLock {
+ class SpinLock : boost::noncopyable {
public:
SpinLock();
~SpinLock();
@@ -35,30 +34,30 @@ namespace mongo {
void lock();
void unlock();
+ static bool isfast(); // true if a real spinlock on this platform
+
private:
-#if defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_4)
- volatile bool _locked;
-#elif defined(_WIN32)
+#if defined(_WIN32)
CRITICAL_SECTION _cs;
+#elif defined(__USE_XOPEN2K)
+ pthread_spinlock_t _lock;
+#elif defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_4)
+ volatile bool _locked;
#else
- // default to a scoped mutex if not implemented
- RWLock _mutex;
+ // default to a mutex if not implemented
+ SimpleMutex _mutex;
#endif
-
- // Non-copyable, non-assignable
- SpinLock(SpinLock&);
- SpinLock& operator=(SpinLock&);
};
- struct scoped_spinlock {
- scoped_spinlock( SpinLock& l ) : _l(l){
+ class scoped_spinlock : boost::noncopyable {
+ public:
+ scoped_spinlock( SpinLock& l ) : _l(l) {
_l.lock();
}
~scoped_spinlock() {
- _l.unlock();
- }
+ _l.unlock();}
+ private:
SpinLock& _l;
};
} // namespace mongo
-
diff --git a/util/concurrency/synchronization.cpp b/util/concurrency/synchronization.cpp
index 12e2894..ce2547c 100644
--- a/util/concurrency/synchronization.cpp
+++ b/util/concurrency/synchronization.cpp
@@ -20,7 +20,8 @@
namespace mongo {
- Notification::Notification() : _mutex ( "Notification" ) , _notified( false ) { }
+ Notification::Notification() : _mutex ( "Notification" ) , _notified( false ) {
+ }
Notification::~Notification() { }
@@ -37,19 +38,40 @@ namespace mongo {
_condition.notify_one();
}
- NotifyAll::NotifyAll() : _mutex("NotifyAll"), _counter(0) { }
+ /* --- NotifyAll --- */
+
+ NotifyAll::NotifyAll() : _mutex("NotifyAll") {
+ _lastDone = 0;
+ _lastReturned = 0;
+ _nWaiting = 0;
+ }
+
+ NotifyAll::When NotifyAll::now() {
+ scoped_lock lock( _mutex );
+ return ++_lastReturned;
+ }
+
+ void NotifyAll::waitFor(When e) {
+ scoped_lock lock( _mutex );
+ ++_nWaiting;
+ while( _lastDone < e ) {
+ _condition.wait( lock.boost() );
+ }
+ }
- void NotifyAll::wait() {
+ void NotifyAll::awaitBeyondNow() {
scoped_lock lock( _mutex );
- unsigned long long old = _counter;
- while( old == _counter ) {
+ ++_nWaiting;
+ When e = ++_lastReturned;
+ while( _lastDone <= e ) {
_condition.wait( lock.boost() );
}
}
- void NotifyAll::notifyAll() {
+ void NotifyAll::notifyAll(When e) {
scoped_lock lock( _mutex );
- ++_counter;
+ _lastDone = e;
+ _nWaiting = 0;
_condition.notify_all();
}
diff --git a/util/concurrency/synchronization.h b/util/concurrency/synchronization.h
index ac2fcab..a0e89f7 100644
--- a/util/concurrency/synchronization.h
+++ b/util/concurrency/synchronization.h
@@ -56,18 +56,30 @@ namespace mongo {
public:
NotifyAll();
+ typedef unsigned long long When;
+
+ When now();
+
/** awaits the next notifyAll() call by another thread. notifications that precede this
call are ignored -- we are looking for a fresh event.
*/
- void wait();
+ void waitFor(When);
+
+ /** a bit faster than waitFor( now() ) */
+ void awaitBeyondNow();
/** may be called multiple times. notifies all waiters */
- void notifyAll();
+ void notifyAll(When);
+
+ /** indicates how many threads are waiting for a notify. */
+ unsigned nWaiting() const { return _nWaiting; }
private:
mongo::mutex _mutex;
- unsigned long long _counter;
boost::condition _condition;
+ When _lastDone;
+ When _lastReturned;
+ unsigned _nWaiting;
};
} // namespace mongo
diff --git a/util/concurrency/value.h b/util/concurrency/value.h
index 0a0ef85..c66977b 100644
--- a/util/concurrency/value.h
+++ b/util/concurrency/value.h
@@ -1,5 +1,5 @@
/* @file value.h
- concurrency helpers Atomic<T> and DiagStr
+ concurrency helpers DiagStr, Guarded
*/
/**
@@ -20,44 +20,29 @@
#pragma once
+#include "mutex.h"
+
namespace mongo {
- extern mutex _atomicMutex;
+ /** declare that a variable that is "guarded" by a mutex.
- /** atomic wrapper for a value. enters a mutex on each access. must
- be copyable.
- */
- template<typename T>
- class Atomic : boost::noncopyable {
- T val;
- public:
- Atomic<T>() { }
+ The decl documents the rule. For example "counta and countb are guarded by xyzMutex":
- void operator=(const T& a) {
- scoped_lock lk(_atomicMutex);
- val = a;
- }
+ Guarded<int, xyzMutex> counta;
+ Guarded<int, xyzMutex> countb;
- operator T() const {
- scoped_lock lk(_atomicMutex);
- return val;
+ Upon use, specify the scoped_lock object. This makes it hard for someone
+ later to forget to be in the lock. Check is made that it is the right lock in _DEBUG
+ builds at runtime.
+ */
+ template <typename T, mutex& BY>
+ class Guarded {
+ T _val;
+ public:
+ T& ref(const scoped_lock& lk) {
+ dassert( lk._mut == &BY );
+ return _val;
}
-
- /** example:
- Atomic<int> q;
- ...
- {
- Atomic<int>::tran t(q);
- if( q.ref() > 0 )
- q.ref()--;
- }
- */
- class tran : private scoped_lock {
- Atomic<T>& _a;
- public:
- tran(Atomic<T>& a) : scoped_lock(_atomicMutex), _a(a) { }
- T& ref() { return _a.val; }
- };
};
class DiagStr {
diff --git a/util/concurrency/vars.cpp b/util/concurrency/vars.cpp
index 19b58eb..213e576 100644
--- a/util/concurrency/vars.cpp
+++ b/util/concurrency/vars.cpp
@@ -17,15 +17,13 @@
*/
#include "pch.h"
-#include "value.h"
#include "mutex.h"
+#include "value.h"
namespace mongo {
mutex DiagStr::m("diags");
- mongo::mutex _atomicMutex("_atomicMutex");
-
// intentional leak. otherwise destructor orders can be problematic at termination.
MutexDebugger &mutexDebugger = *(new MutexDebugger());