path: root/util/concurrency
author    Antonin Kral <a.kral@bobek.cz>  2011-03-17 00:05:43 +0100
committer Antonin Kral <a.kral@bobek.cz>  2011-03-17 00:05:43 +0100
commit    582fc32574a3b158c81e49cb00e6ae59205e66ba (patch)
tree      ac64a3243e0d2121709f685695247052858115c8 /util/concurrency
parent    2761bffa96595ac1698d86bbc2e95ebb0d4d6e93 (diff)
download  mongodb-582fc32574a3b158c81e49cb00e6ae59205e66ba.tar.gz

Imported Upstream version 1.8.0
Diffstat (limited to 'util/concurrency')
-rw-r--r--   util/concurrency/README (renamed from util/concurrency/readme.txt)    20
-rw-r--r--   util/concurrency/list.h                                                96
-rw-r--r--   util/concurrency/msg.h                                                  8
-rw-r--r--   util/concurrency/mutex.h                                              129
-rw-r--r--   util/concurrency/mvar.h                                                28
-rw-r--r--   util/concurrency/race.h                                                72
-rw-r--r--   util/concurrency/rwlock.h                                             170
-rwxr-xr-x   util/concurrency/shared_mutex_win.hpp                                 573
-rw-r--r--   util/concurrency/spin_lock.cpp                                         34
-rw-r--r--   util/concurrency/spin_lock.h                                           26
-rw-r--r--   util/concurrency/synchronization.cpp                                   56
-rw-r--r--   util/concurrency/synchronization.h                                     73
-rw-r--r--   util/concurrency/task.cpp                                              56
-rw-r--r--   util/concurrency/task.h                                                14
-rw-r--r--   util/concurrency/thread_pool.cpp                                       45
-rw-r--r--   util/concurrency/thread_pool.h                                        110
-rw-r--r--   util/concurrency/value.h                                               24
-rw-r--r--   util/concurrency/vars.cpp                                              24
18 files changed, 1237 insertions, 321 deletions
diff --git a/util/concurrency/readme.txt b/util/concurrency/README
index 6f308f5..1d72a1c 100644
--- a/util/concurrency/readme.txt
+++ b/util/concurrency/README
@@ -1,11 +1,10 @@
-util/concurrency/ files
-
-list.h - a list class that is lock-free for reads
-rwlock.h - read/write locks (RWLock)
-msg.h - message passing between threads
-task.h - an abstraction around threads
-mutex.h - small enhancements that wrap boost::mutex
-thread_pool.h
+util/concurrency/ files
+
+list.h - a list class that is lock-free for reads
+rwlock.h - read/write locks (RWLock)
+msg.h - message passing between threads
+task.h - an abstraction around threads
+mutex.h - small enhancements that wrap boost::mutex
mvar.h
This is based on haskell's MVar synchronization primitive:
http://www.haskell.org/ghc/docs/latest/html/libraries/base-4.2.0.0/Control-Concurrent-MVar.html
@@ -13,3 +12,8 @@ mvar.h
You can also think of it as a box that can be either full or empty.
value.h
Atomic wrapper for values/objects that are copy constructable / assignable
+thread_pool.h
+spinlock.h
+synchronization.h
+ A class to establish a synchronization point between two threads. One thread is the waiter and one
+ is the notifier. After the notification event, both proceed normally.
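
The Notification class named above is the one-shot waiter/notifier handoff described in synchronization.h. A minimal usage sketch, assuming the util/concurrency headers from this tree are on the include path; the worker function and variable names are illustrative:

    // sketch only: "ready" and worker() are hypothetical names
    #include <boost/thread/thread.hpp>
    #include "util/concurrency/synchronization.h"

    static mongo::Notification ready;

    void worker() {
        // ... finish startup work ...
        ready.notifyOne();              // may be called exactly once
    }

    int main() {
        boost::thread t(worker);
        ready.waitToBeNotified();       // blocks until worker() notifies
        t.join();
        return 0;
    }
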
diff --git a/util/concurrency/list.h b/util/concurrency/list.h
index 968ff4d..e5eaec6 100644
--- a/util/concurrency/list.h
+++ b/util/concurrency/list.h
@@ -18,64 +18,64 @@
#pragma once
-namespace mongo {
+namespace mongo {
-/* this class uses a mutex for writes, but not for reads.
- we can get fancier later...
+ /* this class uses a mutex for writes, but not for reads.
+ we can get fancier later...
- struct Member : public List1<Member>::Base {
- const char *host;
- int port;
- };
- List1<Member> _members;
- _members.head()->next();
+ struct Member : public List1<Member>::Base {
+ const char *host;
+ int port;
+ };
+ List1<Member> _members;
+ _members.head()->next();
-*/
-template<typename T>
-class List1 : boost::noncopyable {
-public:
- /* next() and head() return 0 at end of list */
+ */
+ template<typename T>
+ class List1 : boost::noncopyable {
+ public:
+ /* next() and head() return 0 at end of list */
- List1() : _head(0), _m("List1"), _orphans(0) { }
+ List1() : _head(0), _m("List1"), _orphans(0) { }
- class Base {
- friend class List1;
- T *_next;
- public:
- T* next() const { return _next; }
- };
+ class Base {
+ friend class List1;
+ T *_next;
+ public:
+ T* next() const { return _next; }
+ };
- T* head() const { return _head; }
+ T* head() const { return _head; }
- void push(T* t) {
- scoped_lock lk(_m);
- t->_next = _head;
- _head = t;
- }
+ void push(T* t) {
+ scoped_lock lk(_m);
+ t->_next = _head;
+ _head = t;
+ }
- // intentionally leak.
- void orphanAll() {
- _head = 0;
- }
+ // intentionally leak.
+ void orphanAll() {
+ _head = 0;
+ }
- /* t is not deleted, but is removed from the list. (orphaned) */
- void orphan(T* t) {
- scoped_lock lk(_m);
- T *&prev = _head;
- T *n = prev;
- while( n != t ) {
- prev = n->_next;
- n = prev;
+ /* t is not deleted, but is removed from the list. (orphaned) */
+ void orphan(T* t) {
+ scoped_lock lk(_m);
+ T *&prev = _head;
+ T *n = prev;
+ while( n != t ) {
+ prev = n->_next;
+ n = prev;
+ }
+ prev = t->_next;
+ if( ++_orphans > 500 )
+ log() << "warning orphans=" << _orphans << '\n';
}
- prev = t->_next;
- if( ++_orphans > 500 )
- log() << "warning orphans=" << _orphans << '\n';
- }
-private:
- T *_head;
- mutex _m;
- int _orphans;
-};
+ private:
+ T *_head;
+ mongo::mutex _m;
+ int _orphans;
+ };
};
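
The header comment above already sketches the intent of List1: writers take the mutex in push()/orphan(), while readers walk head()/next() without locking. A slightly fuller sketch under the same include-path assumption; Member and the function names are illustrative, and nodes are intentionally never freed, only orphaned:

    #include "util/concurrency/list.h"

    struct Member : public mongo::List1<Member>::Base {
        const char *host;   // illustrative payload
        int port;
    };

    mongo::List1<Member> members;

    void addMember(const char *h, int p) {
        Member *m = new Member();   // never deleted; orphan() only unlinks
        m->host = h;
        m->port = p;
        members.push(m);            // takes the internal mutex
    }

    int countMembers() {            // lock-free read path
        int n = 0;
        for( Member *m = members.head(); m; m = m->next() )
            n++;
        return n;
    }
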
diff --git a/util/concurrency/msg.h b/util/concurrency/msg.h
index a5b07d3..f7c6788 100644
--- a/util/concurrency/msg.h
+++ b/util/concurrency/msg.h
@@ -21,14 +21,14 @@
#include <deque>
#include "task.h"
-namespace mongo {
+namespace mongo {
- namespace task {
+ namespace task {
typedef boost::function<void()> lam;
/** typical usage is: task::fork( new Server("threadname") ); */
- class Server : public Task {
+ class Server : public Task {
public:
/** send a message to the port */
void send(lam);
@@ -47,7 +47,7 @@ namespace mongo {
private:
virtual bool initClient() { return true; }
- virtual string name() { return _name; }
+ virtual string name() const { return _name; }
void doWork();
deque<lam> d;
boost::mutex m;
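
Per the "typical usage" comment above, a task::Server owns a thread and runs, in order, whatever closures are sent to it. A hedged sketch: the thread name and sayHello are illustrative, and the Server constructor is assumed from that usage comment rather than shown in this hunk:

    #include <iostream>
    #include <boost/bind.hpp>
    #include "util/concurrency/msg.h"

    void sayHello(int i) {                      // hypothetical work item
        std::cout << "hello " << i << std::endl;
    }

    void startServer() {
        mongo::task::Server *s = new mongo::task::Server("demothread");
        mongo::task::fork(s);                   // spawns the server's thread
        s->send( boost::bind(sayHello, 3) );    // queued, runs on that thread
    }
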
diff --git a/util/concurrency/mutex.h b/util/concurrency/mutex.h
index 797ab77..c463498 100644
--- a/util/concurrency/mutex.h
+++ b/util/concurrency/mutex.h
@@ -20,13 +20,29 @@
#include <map>
#include <set>
-namespace mongo {
+#include "../heapcheck.h"
+
+namespace mongo {
- extern bool __destroyingStatics;
class mutex;
- // only used on _DEBUG builds:
- class MutexDebugger {
+ inline boost::xtime incxtimemillis( long long s ) {
+ boost::xtime xt;
+ boost::xtime_get(&xt, boost::TIME_UTC);
+ xt.sec += (int)( s / 1000 );
+ xt.nsec += (int)(( s % 1000 ) * 1000000);
+ if ( xt.nsec >= 1000000000 ) {
+ xt.nsec -= 1000000000;
+ xt.sec++;
+ }
+ return xt;
+ }
+
+ /** only used on _DEBUG builds.
+ MutexDebugger checks that we always acquire locks for multiple mutexes in a consistent (acyclic) order.
+ If we were inconsistent we could deadlock.
+ */
+ class MutexDebugger {
typedef const char * mid; // mid = mutex ID
typedef map<mid,int> Preceeding;
map< mid, int > maxNest;
@@ -34,36 +50,41 @@ namespace mongo {
map< mid, set<mid> > followers;
boost::mutex &x;
unsigned magic;
+
+ void aBreakPoint() { } // for debugging
public:
// set these to create an assert that
// b must never be locked before a
- // so
+ // so
// a.lock(); b.lock(); is fine
// b.lock(); alone is fine too
// only checked on _DEBUG builds.
string a,b;
-
- void aBreakPoint(){}
+
+ /** outputs some diagnostic info on mutexes (on _DEBUG builds) */
void programEnding();
+
MutexDebugger();
+
void entering(mid m) {
- if( magic != 0x12345678 ) return;
+ if( this == 0 ) return;
+ assert( magic == 0x12345678 );
Preceeding *_preceeding = us.get();
if( _preceeding == 0 )
us.reset( _preceeding = new Preceeding() );
Preceeding &preceeding = *_preceeding;
- if( a == m ) {
+ if( a == m ) {
aBreakPoint();
if( preceeding[b.c_str()] ) {
- cout << "mutex problem " << b << " was locked before " << a << endl;
+ cout << "****** MutexDebugger error! warning " << b << " was locked before " << a << endl;
assert(false);
}
}
preceeding[m]++;
- if( preceeding[m] > 1 ) {
+ if( preceeding[m] > 1 ) {
// recursive re-locking.
if( preceeding[m] > maxNest[m] )
maxNest[m] = preceeding[m];
@@ -75,19 +96,19 @@ namespace mongo {
{
boost::mutex::scoped_lock lk(x);
followers[m];
- for( Preceeding::iterator i = preceeding.begin(); i != preceeding.end(); i++ ) {
+ for( Preceeding::iterator i = preceeding.begin(); i != preceeding.end(); i++ ) {
if( m != i->first && i->second > 0 ) {
followers[i->first].insert(m);
- if( followers[m].count(i->first) != 0 ){
+ if( followers[m].count(i->first) != 0 ) {
failed = true;
stringstream ss;
mid bad = i->first;
ss << "mutex problem" <<
- "\n when locking " << m <<
- "\n " << bad << " was already locked and should not be."
- "\n set a and b above to debug.\n";
+ "\n when locking " << m <<
+ "\n " << bad << " was already locked and should not be."
+ "\n set a and b above to debug.\n";
stringstream q;
- for( Preceeding::iterator i = preceeding.begin(); i != preceeding.end(); i++ ) {
+ for( Preceeding::iterator i = preceeding.begin(); i != preceeding.end(); i++ ) {
if( i->first != m && i->first != bad && i->second > 0 )
q << " " << i->first << '\n';
}
@@ -105,8 +126,8 @@ namespace mongo {
assert( 0 );
}
}
- void leaving(mid m) {
- if( magic != 0x12345678 ) return;
+ void leaving(mid m) {
+ if( this == 0 ) return; // still in startup pre-main()
Preceeding& preceeding = *us.get();
preceeding[m]--;
if( preceeding[m] < 0 ) {
@@ -116,38 +137,67 @@ namespace mongo {
}
};
extern MutexDebugger &mutexDebugger;
-
+
// If you create a local static instance of this class, that instance will be destroyed
- // before all global static objects are destroyed, so __destroyingStatics will be set
+ // before all global static objects are destroyed, so _destroyingStatics will be set
// to true before the global static variables are destroyed.
class StaticObserver : boost::noncopyable {
public:
- ~StaticObserver() { __destroyingStatics = true; }
+ static bool _destroyingStatics;
+ ~StaticObserver() { _destroyingStatics = true; }
};
- // On pthread systems, it is an error to destroy a mutex while held. Static global
- // mutexes may be held upon shutdown in our implementation, and this way we avoid
- // destroying them.
+ /** On pthread systems, it is an error to destroy a mutex while held. Static global
+ * mutexes may be held upon shutdown in our implementation, and this way we avoid
+ * destroying them.
+ * NOT recursive.
+ */
class mutex : boost::noncopyable {
public:
#if defined(_DEBUG)
- const char *_name;
+ const char * const _name;
#endif
#if defined(_DEBUG)
- mutex(const char *name)
- : _name(name)
+ mutex(const char *name)
+ : _name(name)
#else
- mutex(const char *)
+ mutex(const char *)
#endif
- {
- _m = new boost::mutex();
+ {
+ _m = new boost::timed_mutex();
+ IGNORE_OBJECT( _m ); // Turn-off heap checking on _m
}
~mutex() {
- if( !__destroyingStatics ) {
+ if( !StaticObserver::_destroyingStatics ) {
+ UNIGNORE_OBJECT( _m );
delete _m;
}
}
+
+ class try_lock : boost::noncopyable {
+ public:
+ try_lock( mongo::mutex &m , int millis = 0 )
+ : _l( m.boost() , incxtimemillis( millis ) ) ,
+#if BOOST_VERSION >= 103500
+ ok( _l.owns_lock() )
+#else
+ ok( _l.locked() )
+#endif
+ {
+ }
+
+ ~try_lock() {
+ }
+
+ private:
+ boost::timed_mutex::scoped_timed_lock _l;
+
+ public:
+ const bool ok;
+ };
+
+
class scoped_lock : boost::noncopyable {
#if defined(_DEBUG)
mongo::mutex *mut;
@@ -159,20 +209,23 @@ namespace mongo {
mutexDebugger.entering(mut->_name);
#endif
}
- ~scoped_lock() {
+ ~scoped_lock() {
#if defined(_DEBUG)
mutexDebugger.leaving(mut->_name);
#endif
}
- boost::mutex::scoped_lock &boost() { return _l; }
+ boost::timed_mutex::scoped_lock &boost() { return _l; }
private:
- boost::mutex::scoped_lock _l;
+ boost::timed_mutex::scoped_lock _l;
};
+
+
private:
- boost::mutex &boost() { return *_m; }
- boost::mutex *_m;
+
+ boost::timed_mutex &boost() { return *_m; }
+ boost::timed_mutex *_m;
};
-
+
typedef mutex::scoped_lock scoped_lock;
typedef boost::recursive_mutex::scoped_lock recursive_scoped_lock;
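
The try_lock helper added above wraps boost::timed_mutex's timed acquisition (incxtimemillis builds the deadline). A minimal sketch of the intended pattern, assuming this tree's mutex.h; the mutex and function names are illustrative:

    #include "util/concurrency/mutex.h"

    mongo::mutex stateMutex("stateMutex");            // illustrative

    bool tryUpdateState() {
        mongo::mutex::try_lock lk( stateMutex, 50 );  // wait up to ~50 ms
        if( !lk.ok )
            return false;         // timed out; caller can retry later
        // ... mutate state guarded by stateMutex ...
        return true;              // released when lk goes out of scope
    }
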
diff --git a/util/concurrency/mvar.h b/util/concurrency/mvar.h
index 7d17051..9c7a505 100644
--- a/util/concurrency/mvar.h
+++ b/util/concurrency/mvar.h
@@ -31,18 +31,18 @@ namespace mongo {
// create an empty MVar
MVar()
- : _state(EMPTY)
+ : _state(EMPTY)
{}
// creates a full MVar
MVar(const T& val)
- : _state(FULL)
- , _value(val)
+ : _state(FULL)
+ , _value(val)
{}
// puts val into the MVar and returns true or returns false if full
// never blocks
- bool tryPut(const T& val){
+ bool tryPut(const T& val) {
// intentionally repeat test before and after lock
if (_state == FULL) return false;
Mutex::scoped_lock lock(_mutex);
@@ -59,17 +59,17 @@ namespace mongo {
// puts val into the MVar
// will block if the MVar is already full
- void put(const T& val){
+ void put(const T& val) {
Mutex::scoped_lock lock(_mutex);
- while (!tryPut(val)){
- // unlocks lock while waiting and relocks before returning
+ while (!tryPut(val)) {
+ // unlocks lock while waiting and relocks before returning
_condition.wait(lock);
- }
+ }
}
// takes val out of the MVar and returns true or returns false if empty
// never blocks
- bool tryTake(T& out){
+ bool tryTake(T& out) {
// intentionally repeat test before and after lock
if (_state == EMPTY) return false;
Mutex::scoped_lock lock(_mutex);
@@ -86,14 +86,14 @@ namespace mongo {
// takes val out of the MVar
// will block if the MVar is empty
- T take(){
+ T take() {
T ret = T();
Mutex::scoped_lock lock(_mutex);
- while (!tryTake(ret)){
- // unlocks lock while waiting and relocks before returning
+ while (!tryTake(ret)) {
+ // unlocks lock while waiting and relocks before returning
_condition.wait(lock);
- }
+ }
return ret;
}
@@ -102,7 +102,7 @@ namespace mongo {
// Note: this is fast because there is no locking, but state could
// change before you get a chance to act on it.
// Mainly useful for sanity checks / asserts.
- State getState(){ return _state; }
+ State getState() { return _state; }
private:
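
MVar behaves like a one-element channel: put() blocks while the box is full, take() blocks while it is empty. A small producer/consumer sketch under the same include-path assumption; names are illustrative:

    #include <iostream>
    #include <boost/thread/thread.hpp>
    #include "util/concurrency/mvar.h"

    mongo::MVar<int> box;                // starts EMPTY

    void producer() {
        for( int i = 0; i < 10; i++ )
            box.put(i);                  // blocks while the box is still full
    }

    void consumer() {
        for( int i = 0; i < 10; i++ )
            std::cout << box.take() << std::endl;   // blocks while empty
    }

    int main() {
        boost::thread p(producer), c(consumer);
        p.join();
        c.join();
        return 0;
    }
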
diff --git a/util/concurrency/race.h b/util/concurrency/race.h
new file mode 100644
index 0000000..0b8338c
--- /dev/null
+++ b/util/concurrency/race.h
@@ -0,0 +1,72 @@
+#pragma once
+
+#include "../goodies.h" // printStackTrace
+
+namespace mongo {
+
+ /** some self-testing of synchronization and attempts to catch race conditions.
+
+ use something like:
+
+ CodeBlock myBlock;
+
+ void foo() {
+ CodeBlock::Within w(myBlock);
+ ...
+ }
+
+ In _DEBUG builds, will (sometimes/maybe) fail if two threads are in the same code block at
+ the same time. Also detects and disallows recursion.
+ */
+
+#if defined(_DEBUG)
+
+ class CodeBlock {
+ volatile int n;
+ unsigned tid;
+ void fail() {
+ log() << "synchronization (race condition) failure" << endl;
+ printStackTrace();
+ abort();
+ }
+ void enter() {
+ if( ++n != 1 ) fail();
+#if defined(_WIN32)
+ tid = GetCurrentThreadId();
+#endif
+ }
+ void leave() {
+ if( --n != 0 ) fail();
+ }
+ public:
+ CodeBlock() : n(0) { }
+
+ class Within {
+ CodeBlock& _s;
+ public:
+ Within(CodeBlock& s) : _s(s) { _s.enter(); }
+ ~Within() { _s.leave(); }
+ };
+
+ void assertWithin() {
+ assert( n == 1 );
+#if defined(_WIN32)
+ assert( GetCurrentThreadId() == tid );
+#endif
+ }
+ };
+
+#else
+
+ class CodeBlock{
+ public:
+ class Within {
+ public:
+ Within(CodeBlock&) { }
+ };
+ void assertWithin() { }
+ };
+
+#endif
+
+}
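
To make the comment at the top of race.h concrete: a CodeBlock guards a region that must never be entered concurrently or recursively, and the checks are compiled in only on _DEBUG builds. A sketch with illustrative names:

    #include "util/concurrency/race.h"

    mongo::CodeBlock initBlock;          // illustrative

    void helper() {
        initBlock.assertWithin();        // must only be reached from initOnce()
        // ...
    }

    void initOnce() {
        mongo::CodeBlock::Within w(initBlock);   // aborts if already entered
        helper();
        // ...
    }
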
diff --git a/util/concurrency/rwlock.h b/util/concurrency/rwlock.h
index 75169b2..ca81a9f 100644
--- a/util/concurrency/rwlock.h
+++ b/util/concurrency/rwlock.h
@@ -1,4 +1,4 @@
-// rwlock.h
+// @file rwlock.h generic reader-writer lock (cross platform support)
/*
* Copyright (C) 2010 10gen Inc.
@@ -19,30 +19,79 @@
#pragma once
#include "mutex.h"
+#include "../time_support.h"
+
+// this requires Vista+ to work
+// it works better than sharable_mutex under high contention
+//#define MONGO_USE_SRW_ON_WINDOWS 1
+
+#if !defined(MONGO_USE_SRW_ON_WINDOWS)
#if BOOST_VERSION >= 103500
- #define BOOST_RWLOCK
+# define BOOST_RWLOCK
#else
+# if defined(_WIN32)
+# error need boost >= 1.35 for windows
+# endif
+# include <pthread.h>
+#endif
- #if defined(_WIN32)
- #error need boost >= 1.35 for windows
- #endif
-
- #include <pthread.h>
-
+#if defined(_WIN32)
+# include "shared_mutex_win.hpp"
+namespace mongo {
+ typedef boost::modified_shared_mutex shared_mutex;
+}
+# undef assert
+# define assert MONGO_assert
+#elif defined(BOOST_RWLOCK)
+# include <boost/thread/shared_mutex.hpp>
+# undef assert
+# define assert MONGO_assert
#endif
-#ifdef BOOST_RWLOCK
-#include <boost/thread/shared_mutex.hpp>
-#undef assert
-#define assert MONGO_assert
#endif
namespace mongo {
-#ifdef BOOST_RWLOCK
+#if defined(MONGO_USE_SRW_ON_WINDOWS) && defined(_WIN32)
+
class RWLock {
- boost::shared_mutex _m;
+ public:
+ RWLock(const char *) { InitializeSRWLock(&_lock); }
+ ~RWLock() { }
+ void lock() { AcquireSRWLockExclusive(&_lock); }
+ void unlock() { ReleaseSRWLockExclusive(&_lock); }
+ void lock_shared() { AcquireSRWLockShared(&_lock); }
+ void unlock_shared() { ReleaseSRWLockShared(&_lock); }
+ bool lock_shared_try( int millis ) {
+ unsigned long long end = curTimeMicros64() + millis*1000;
+ while( 1 ) {
+ if( TryAcquireSRWLockShared(&_lock) )
+ return true;
+ if( curTimeMicros64() >= end )
+ break;
+ Sleep(1);
+ }
+ return false;
+ }
+ bool lock_try( int millis = 0 ) {
+ unsigned long long end = curTimeMicros64() + millis*1000;
+ while( 1 ) {
+ if( TryAcquireSRWLockExclusive(&_lock) )
+ return true;
+ if( curTimeMicros64() >= end )
+ break;
+ Sleep(1);
+ }
+ return false;
+ }
+ private:
+ SRWLOCK _lock;
+ };
+
+#elif defined(BOOST_RWLOCK)
+ class RWLock {
+ shared_mutex _m;
public:
#if defined(_DEBUG)
const char *_name;
@@ -50,40 +99,40 @@ namespace mongo {
#else
RWLock(const char *) { }
#endif
- void lock(){
+ void lock() {
_m.lock();
#if defined(_DEBUG)
mutexDebugger.entering(_name);
#endif
}
- void unlock(){
+ void unlock() {
#if defined(_DEBUG)
mutexDebugger.leaving(_name);
#endif
_m.unlock();
}
-
- void lock_shared(){
+
+ void lock_shared() {
_m.lock_shared();
}
-
- void unlock_shared(){
+
+ void unlock_shared() {
_m.unlock_shared();
}
- bool lock_shared_try( int millis ){
+ bool lock_shared_try( int millis ) {
boost::system_time until = get_system_time();
until += boost::posix_time::milliseconds(millis);
- if( _m.timed_lock_shared( until ) ) {
+ if( _m.timed_lock_shared( until ) ) {
return true;
}
return false;
}
- bool lock_try( int millis = 0 ){
+ bool lock_try( int millis = 0 ) {
boost::system_time until = get_system_time();
until += boost::posix_time::milliseconds(millis);
- if( _m.timed_lock( until ) ) {
+ if( _m.timed_lock( until ) ) {
#if defined(_DEBUG)
mutexDebugger.entering(_name);
#endif
@@ -98,7 +147,7 @@ namespace mongo {
class RWLock {
pthread_rwlock_t _lock;
- inline void check( int x ){
+ inline void check( int x ) {
if( x == 0 )
return;
log() << "pthread rwlock failed: " << x << endl;
@@ -114,40 +163,40 @@ namespace mongo {
#endif
check( pthread_rwlock_init( &_lock , 0 ) );
}
-
- ~RWLock(){
- if ( ! __destroyingStatics ){
+
+ ~RWLock() {
+ if ( ! StaticObserver::_destroyingStatics ) {
check( pthread_rwlock_destroy( &_lock ) );
}
}
- void lock(){
+ void lock() {
check( pthread_rwlock_wrlock( &_lock ) );
#if defined(_DEBUG)
mutexDebugger.entering(_name);
#endif
}
- void unlock(){
+ void unlock() {
#if defined(_DEBUG)
mutexDebugger.leaving(_name);
#endif
check( pthread_rwlock_unlock( &_lock ) );
}
-
- void lock_shared(){
+
+ void lock_shared() {
check( pthread_rwlock_rdlock( &_lock ) );
}
-
- void unlock_shared(){
+
+ void unlock_shared() {
check( pthread_rwlock_unlock( &_lock ) );
}
-
- bool lock_shared_try( int millis ){
+
+ bool lock_shared_try( int millis ) {
return _try( millis , false );
}
- bool lock_try( int millis = 0 ){
- if( _try( millis , true ) ) {
+ bool lock_try( int millis = 0 ) {
+ if( _try( millis , true ) ) {
#if defined(_DEBUG)
mutexDebugger.entering(_name);
#endif
@@ -156,65 +205,66 @@ namespace mongo {
return false;
}
- bool _try( int millis , bool write ){
+ bool _try( int millis , bool write ) {
while ( true ) {
- int x = write ?
- pthread_rwlock_trywrlock( &_lock ) :
- pthread_rwlock_tryrdlock( &_lock );
-
+ int x = write ?
+ pthread_rwlock_trywrlock( &_lock ) :
+ pthread_rwlock_tryrdlock( &_lock );
+
if ( x <= 0 ) {
return true;
}
-
+
if ( millis-- <= 0 )
return false;
-
- if ( x == EBUSY ){
+
+ if ( x == EBUSY ) {
sleepmillis(1);
continue;
}
check(x);
- }
-
+ }
+
return false;
}
};
-
#endif
+ /** throws on failure to acquire in the specified time period. */
class rwlock_try_write {
- RWLock& _l;
public:
struct exception { };
rwlock_try_write(RWLock& l, int millis = 0) : _l(l) {
- if( !l.lock_try(millis) ) throw exception();
+ if( !l.lock_try(millis) )
+ throw exception();
}
~rwlock_try_write() { _l.unlock(); }
+ private:
+ RWLock& _l;
};
- /* scoped lock */
- struct rwlock {
+ /* scoped lock for RWLock */
+ class rwlock {
+ public:
rwlock( const RWLock& lock , bool write , bool alreadyHaveLock = false )
- : _lock( (RWLock&)lock ) , _write( write ){
-
- if ( ! alreadyHaveLock ){
+ : _lock( (RWLock&)lock ) , _write( write ) {
+ if ( ! alreadyHaveLock ) {
if ( _write )
_lock.lock();
else
_lock.lock_shared();
}
}
-
- ~rwlock(){
+ ~rwlock() {
if ( _write )
_lock.unlock();
else
_lock.unlock_shared();
}
-
+ private:
RWLock& _lock;
- bool _write;
+ const bool _write;
};
}
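
The scoped helpers at the bottom of rwlock.h are the usual entry points: rwlock for plain shared/exclusive scopes, and rwlock_try_write when a writer would rather give up after a timeout than block. A sketch, again assuming this tree's headers; the lock and counter are illustrative:

    #include "util/concurrency/rwlock.h"

    mongo::RWLock configLock("configLock");     // illustrative
    int configVersion = 0;

    int readVersion() {
        mongo::rwlock lk( configLock, /*write=*/false );   // shared lock
        return configVersion;
    }

    bool tryBumpVersion() {
        try {
            mongo::rwlock_try_write lk( configLock, 100 ); // wait up to ~100 ms
            configVersion++;
            return true;
        }
        catch( mongo::rwlock_try_write::exception& ) {
            return false;                                  // could not acquire
        }
    }
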
diff --git a/util/concurrency/shared_mutex_win.hpp b/util/concurrency/shared_mutex_win.hpp
new file mode 100755
index 0000000..5356cf2
--- /dev/null
+++ b/util/concurrency/shared_mutex_win.hpp
@@ -0,0 +1,573 @@
+#ifndef BOOST_THREAD_WIN32_SHARED_MUTEX_HPP_MODIFIED
+#define BOOST_THREAD_WIN32_SHARED_MUTEX_HPP_MODIFIED
+
+// (C) Copyright 2006-8 Anthony Williams
+//
+// Distributed under the Boost Software License, Version 1.0. (See
+// accompanying file LICENSE_1_0.txt or copy at
+// http://www.boost.org/LICENSE_1_0.txt)
+
+// MongoDB :
+//
+// Slightly modified boost file to not die above 127 pending writes
+//
+
+#include <boost/assert.hpp>
+#include <boost/detail/interlocked.hpp>
+#include <boost/thread/win32/thread_primitives.hpp>
+#include <boost/static_assert.hpp>
+#include <limits.h>
+#include <boost/utility.hpp>
+#include <boost/thread/thread_time.hpp>
+
+#include <boost/config/abi_prefix.hpp>
+
+namespace boost
+{
+ class modified_shared_mutex:
+ private boost::noncopyable
+ {
+ private:
+ struct state_data
+ {
+ unsigned shared_count:11,
+ shared_waiting:11,
+ exclusive:1,
+ upgrade:1,
+ exclusive_waiting:7,
+ exclusive_waiting_blocked:1;
+
+ friend bool operator==(state_data const& lhs,state_data const& rhs)
+ {
+ return *reinterpret_cast<unsigned const*>(&lhs)==*reinterpret_cast<unsigned const*>(&rhs);
+ }
+ };
+
+
+ template<typename T>
+ T interlocked_compare_exchange(T* target,T new_value,T comparand)
+ {
+ BOOST_STATIC_ASSERT(sizeof(T)==sizeof(long));
+ long const res=BOOST_INTERLOCKED_COMPARE_EXCHANGE(reinterpret_cast<long*>(target),
+ *reinterpret_cast<long*>(&new_value),
+ *reinterpret_cast<long*>(&comparand));
+ return *reinterpret_cast<T const*>(&res);
+ }
+
+ state_data state;
+ detail::win32::handle semaphores[2];
+ detail::win32::handle &unlock_sem;
+ detail::win32::handle &exclusive_sem;
+ detail::win32::handle upgrade_sem;
+
+ void release_waiters(state_data old_state)
+ {
+ if(old_state.exclusive_waiting)
+ {
+ BOOST_VERIFY(detail::win32::ReleaseSemaphore(exclusive_sem,1,0)!=0);
+ }
+
+ if(old_state.shared_waiting || old_state.exclusive_waiting)
+ {
+ BOOST_VERIFY(detail::win32::ReleaseSemaphore(unlock_sem,old_state.shared_waiting + (old_state.exclusive_waiting?1:0),0)!=0);
+ }
+ }
+
+
+ public:
+ modified_shared_mutex():
+ unlock_sem(semaphores[0]),
+ exclusive_sem(semaphores[1])
+ {
+ unlock_sem=detail::win32::create_anonymous_semaphore(0,LONG_MAX);
+ exclusive_sem=detail::win32::create_anonymous_semaphore(0,LONG_MAX);
+ upgrade_sem=detail::win32::create_anonymous_semaphore(0,LONG_MAX);
+ state_data state_={0};
+ state=state_;
+ }
+
+ ~modified_shared_mutex()
+ {
+ detail::win32::CloseHandle(upgrade_sem);
+ detail::win32::CloseHandle(unlock_sem);
+ detail::win32::CloseHandle(exclusive_sem);
+ }
+
+ bool try_lock_shared()
+ {
+ state_data old_state=state;
+ for(;;)
+ {
+ state_data new_state=old_state;
+ if(!new_state.exclusive && !new_state.exclusive_waiting_blocked)
+ {
+ ++new_state.shared_count;
+ }
+
+ state_data const current_state=interlocked_compare_exchange(&state,new_state,old_state);
+ if(current_state==old_state)
+ {
+ break;
+ }
+ old_state=current_state;
+ }
+ return !(old_state.exclusive| old_state.exclusive_waiting_blocked);
+ }
+
+ void lock_shared()
+ {
+ BOOST_VERIFY(timed_lock_shared(::boost::detail::get_system_time_sentinel()));
+ }
+
+ template<typename TimeDuration>
+ bool timed_lock_shared(TimeDuration const & relative_time)
+ {
+ return timed_lock_shared(get_system_time()+relative_time);
+ }
+
+ bool timed_lock_shared(boost::system_time const& wait_until)
+ {
+ for(;;)
+ {
+ state_data old_state=state;
+ for(;;)
+ {
+ state_data new_state=old_state;
+ if(new_state.exclusive || new_state.exclusive_waiting_blocked)
+ {
+ ++new_state.shared_waiting;
+ }
+ else
+ {
+ ++new_state.shared_count;
+ }
+
+ state_data const current_state=interlocked_compare_exchange(&state,new_state,old_state);
+ if(current_state==old_state)
+ {
+ break;
+ }
+ old_state=current_state;
+ }
+
+ if(!(old_state.exclusive| old_state.exclusive_waiting_blocked))
+ {
+ return true;
+ }
+
+ unsigned long const res=detail::win32::WaitForSingleObject(unlock_sem,::boost::detail::get_milliseconds_until(wait_until));
+ if(res==detail::win32::timeout)
+ {
+ for(;;)
+ {
+ state_data new_state=old_state;
+ if(new_state.exclusive || new_state.exclusive_waiting_blocked)
+ {
+ if(new_state.shared_waiting)
+ {
+ --new_state.shared_waiting;
+ }
+ }
+ else
+ {
+ ++new_state.shared_count;
+ }
+
+ state_data const current_state=interlocked_compare_exchange(&state,new_state,old_state);
+ if(current_state==old_state)
+ {
+ break;
+ }
+ old_state=current_state;
+ }
+
+ if(!(old_state.exclusive| old_state.exclusive_waiting_blocked))
+ {
+ return true;
+ }
+ return false;
+ }
+
+ BOOST_ASSERT(res==0);
+ }
+ }
+
+ void unlock_shared()
+ {
+ state_data old_state=state;
+ for(;;)
+ {
+ state_data new_state=old_state;
+ bool const last_reader=!--new_state.shared_count;
+
+ if(last_reader)
+ {
+ if(new_state.upgrade)
+ {
+ new_state.upgrade=false;
+ new_state.exclusive=true;
+ }
+ else
+ {
+ if(new_state.exclusive_waiting)
+ {
+ --new_state.exclusive_waiting;
+ new_state.exclusive_waiting_blocked=false;
+ }
+ new_state.shared_waiting=0;
+ }
+ }
+
+ state_data const current_state=interlocked_compare_exchange(&state,new_state,old_state);
+ if(current_state==old_state)
+ {
+ if(last_reader)
+ {
+ if(old_state.upgrade)
+ {
+ BOOST_VERIFY(detail::win32::ReleaseSemaphore(upgrade_sem,1,0)!=0);
+ }
+ else
+ {
+ release_waiters(old_state);
+ }
+ }
+ break;
+ }
+ old_state=current_state;
+ }
+ }
+
+ void lock()
+ {
+ BOOST_VERIFY(timed_lock(::boost::detail::get_system_time_sentinel()));
+ }
+
+ template<typename TimeDuration>
+ bool timed_lock(TimeDuration const & relative_time)
+ {
+ return timed_lock(get_system_time()+relative_time);
+ }
+
+ bool try_lock()
+ {
+ state_data old_state=state;
+ for(;;)
+ {
+ state_data new_state=old_state;
+ if(new_state.shared_count || new_state.exclusive)
+ {
+ return false;
+ }
+ else
+ {
+ new_state.exclusive=true;
+ }
+
+ state_data const current_state=interlocked_compare_exchange(&state,new_state,old_state);
+ if(current_state==old_state)
+ {
+ break;
+ }
+ old_state=current_state;
+ }
+ return true;
+ }
+
+
+ bool timed_lock(boost::system_time const& wait_until)
+ {
+ for(;;)
+ {
+ state_data old_state=state;
+
+ for(;;)
+ {
+ state_data new_state=old_state;
+ if(new_state.shared_count || new_state.exclusive)
+ {
+ if( new_state.exclusive_waiting == 127 ) // the maximum already!
+ break;
+ ++new_state.exclusive_waiting;
+ new_state.exclusive_waiting_blocked=true;
+ }
+ else
+ {
+ new_state.exclusive=true;
+ }
+
+ state_data const current_state=interlocked_compare_exchange(&state,new_state,old_state);
+ if(current_state==old_state)
+ {
+ break;
+ }
+ old_state=current_state;
+ }
+
+ if(!old_state.shared_count && !old_state.exclusive)
+ {
+ return true;
+ }
+ unsigned long const wait_res=detail::win32::WaitForMultipleObjects(2,semaphores,true,::boost::detail::get_milliseconds_until(wait_until));
+ if(wait_res==detail::win32::timeout)
+ {
+ for(;;)
+ {
+ state_data new_state=old_state;
+ if(new_state.shared_count || new_state.exclusive)
+ {
+ if(new_state.exclusive_waiting)
+ {
+ if(!--new_state.exclusive_waiting)
+ {
+ new_state.exclusive_waiting_blocked=false;
+ }
+ }
+ }
+ else
+ {
+ new_state.exclusive=true;
+ }
+
+ state_data const current_state=interlocked_compare_exchange(&state,new_state,old_state);
+ if(current_state==old_state)
+ {
+ break;
+ }
+ old_state=current_state;
+ }
+ if(!old_state.shared_count && !old_state.exclusive)
+ {
+ return true;
+ }
+ return false;
+ }
+ BOOST_ASSERT(wait_res<2);
+ }
+ }
+
+ void unlock()
+ {
+ state_data old_state=state;
+ for(;;)
+ {
+ state_data new_state=old_state;
+ new_state.exclusive=false;
+ if(new_state.exclusive_waiting)
+ {
+ --new_state.exclusive_waiting;
+ new_state.exclusive_waiting_blocked=false;
+ }
+ new_state.shared_waiting=0;
+
+ state_data const current_state=interlocked_compare_exchange(&state,new_state,old_state);
+ if(current_state==old_state)
+ {
+ break;
+ }
+ old_state=current_state;
+ }
+ release_waiters(old_state);
+ }
+
+ void lock_upgrade()
+ {
+ for(;;)
+ {
+ state_data old_state=state;
+ for(;;)
+ {
+ state_data new_state=old_state;
+ if(new_state.exclusive || new_state.exclusive_waiting_blocked || new_state.upgrade)
+ {
+ ++new_state.shared_waiting;
+ }
+ else
+ {
+ ++new_state.shared_count;
+ new_state.upgrade=true;
+ }
+
+ state_data const current_state=interlocked_compare_exchange(&state,new_state,old_state);
+ if(current_state==old_state)
+ {
+ break;
+ }
+ old_state=current_state;
+ }
+
+ if(!(old_state.exclusive|| old_state.exclusive_waiting_blocked|| old_state.upgrade))
+ {
+ return;
+ }
+
+ BOOST_VERIFY(!detail::win32::WaitForSingleObject(unlock_sem,detail::win32::infinite));
+ }
+ }
+
+ bool try_lock_upgrade()
+ {
+ state_data old_state=state;
+ for(;;)
+ {
+ state_data new_state=old_state;
+ if(new_state.exclusive || new_state.exclusive_waiting_blocked || new_state.upgrade)
+ {
+ return false;
+ }
+ else
+ {
+ ++new_state.shared_count;
+ new_state.upgrade=true;
+ }
+
+ state_data const current_state=interlocked_compare_exchange(&state,new_state,old_state);
+ if(current_state==old_state)
+ {
+ break;
+ }
+ old_state=current_state;
+ }
+ return true;
+ }
+
+ void unlock_upgrade()
+ {
+ state_data old_state=state;
+ for(;;)
+ {
+ state_data new_state=old_state;
+ new_state.upgrade=false;
+ bool const last_reader=!--new_state.shared_count;
+
+ if(last_reader)
+ {
+ if(new_state.exclusive_waiting)
+ {
+ --new_state.exclusive_waiting;
+ new_state.exclusive_waiting_blocked=false;
+ }
+ new_state.shared_waiting=0;
+ }
+
+ state_data const current_state=interlocked_compare_exchange(&state,new_state,old_state);
+ if(current_state==old_state)
+ {
+ if(last_reader)
+ {
+ release_waiters(old_state);
+ }
+ break;
+ }
+ old_state=current_state;
+ }
+ }
+
+ void unlock_upgrade_and_lock()
+ {
+ state_data old_state=state;
+ for(;;)
+ {
+ state_data new_state=old_state;
+ bool const last_reader=!--new_state.shared_count;
+
+ if(last_reader)
+ {
+ new_state.upgrade=false;
+ new_state.exclusive=true;
+ }
+
+ state_data const current_state=interlocked_compare_exchange(&state,new_state,old_state);
+ if(current_state==old_state)
+ {
+ if(!last_reader)
+ {
+ BOOST_VERIFY(!detail::win32::WaitForSingleObject(upgrade_sem,detail::win32::infinite));
+ }
+ break;
+ }
+ old_state=current_state;
+ }
+ }
+
+ void unlock_and_lock_upgrade()
+ {
+ state_data old_state=state;
+ for(;;)
+ {
+ state_data new_state=old_state;
+ new_state.exclusive=false;
+ new_state.upgrade=true;
+ ++new_state.shared_count;
+ if(new_state.exclusive_waiting)
+ {
+ --new_state.exclusive_waiting;
+ new_state.exclusive_waiting_blocked=false;
+ }
+ new_state.shared_waiting=0;
+
+ state_data const current_state=interlocked_compare_exchange(&state,new_state,old_state);
+ if(current_state==old_state)
+ {
+ break;
+ }
+ old_state=current_state;
+ }
+ release_waiters(old_state);
+ }
+
+ void unlock_and_lock_shared()
+ {
+ state_data old_state=state;
+ for(;;)
+ {
+ state_data new_state=old_state;
+ new_state.exclusive=false;
+ ++new_state.shared_count;
+ if(new_state.exclusive_waiting)
+ {
+ --new_state.exclusive_waiting;
+ new_state.exclusive_waiting_blocked=false;
+ }
+ new_state.shared_waiting=0;
+
+ state_data const current_state=interlocked_compare_exchange(&state,new_state,old_state);
+ if(current_state==old_state)
+ {
+ break;
+ }
+ old_state=current_state;
+ }
+ release_waiters(old_state);
+ }
+
+ void unlock_upgrade_and_lock_shared()
+ {
+ state_data old_state=state;
+ for(;;)
+ {
+ state_data new_state=old_state;
+ new_state.upgrade=false;
+ if(new_state.exclusive_waiting)
+ {
+ --new_state.exclusive_waiting;
+ new_state.exclusive_waiting_blocked=false;
+ }
+ new_state.shared_waiting=0;
+
+ state_data const current_state=interlocked_compare_exchange(&state,new_state,old_state);
+ if(current_state==old_state)
+ {
+ break;
+ }
+ old_state=current_state;
+ }
+ release_waiters(old_state);
+ }
+
+ };
+}
+
+#include <boost/config/abi_suffix.hpp>
+
+#endif
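
The "not die above 127 pending writes" modification comes down to the bitfield layout: exclusive_waiting is 7 bits wide, so its largest representable value is 2^7 - 1 = 127, and the added check in timed_lock() simply stops counting at that ceiling instead of letting the field wrap. A standalone illustration of the wrap-around being avoided:

    #include <iostream>

    struct state_like {                  // same field width as state_data above
        unsigned exclusive_waiting : 7;
    };

    int main() {
        state_like s = { 127 };                          // at the maximum
        std::cout << s.exclusive_waiting << std::endl;   // prints 127
        s.exclusive_waiting++;                           // 7-bit field wraps
        std::cout << s.exclusive_waiting << std::endl;   // prints 0
        return 0;
    }
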
diff --git a/util/concurrency/spin_lock.cpp b/util/concurrency/spin_lock.cpp
index b3e689a..0f33609 100644
--- a/util/concurrency/spin_lock.cpp
+++ b/util/concurrency/spin_lock.cpp
@@ -16,18 +16,29 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
+#include "pch.h"
#include <time.h>
#include "spin_lock.h"
namespace mongo {
- SpinLock::SpinLock() : _locked( false ){}
-
- SpinLock::~SpinLock(){}
+ SpinLock::~SpinLock() {
+#if defined(_WIN32)
+ DeleteCriticalSection(&_cs);
+#endif
+ }
- void SpinLock::lock(){
+ SpinLock::SpinLock()
#if defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_4)
+ : _locked( false ) { }
+#elif defined(_WIN32)
+ { InitializeCriticalSectionAndSpinCount(&_cs, 4000); }
+#else
+ : _mutex( "SpinLock" ) { }
+#endif
+ void SpinLock::lock() {
+#if defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_4)
// fast path
if (!_locked && !__sync_lock_test_and_set(&_locked, true)) {
return;
@@ -44,21 +55,28 @@ namespace mongo {
while (__sync_lock_test_and_set(&_locked, true)) {
nanosleep(&t, NULL);
}
+#elif defined(_WIN32)
+ EnterCriticalSection(&_cs);
#else
-
- // WARNING "TODO Missing spin lock in this platform."
+ // WARNING Missing spin lock in this platform. This can potentially
+ // be slow.
+ _mutex.lock();
#endif
}
- void SpinLock::unlock(){
+ void SpinLock::unlock() {
#if defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_4)
__sync_lock_release(&_locked);
+#elif defined(WIN32)
+
+ LeaveCriticalSection(&_cs);
+
#else
- // WARNING "TODO Missing spin lock in this platform."
+ _mutex.unlock();
#endif
}
diff --git a/util/concurrency/spin_lock.h b/util/concurrency/spin_lock.h
index 110290d..d5360f7 100644
--- a/util/concurrency/spin_lock.h
+++ b/util/concurrency/spin_lock.h
@@ -16,18 +16,18 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#ifndef CONCURRENCY_SPINLOCK_HEADER
-#define CONCURRENCY_SPINLOCK_HEADER
+#pragma once
+
+#include "pch.h"
+#include "rwlock.h"
namespace mongo {
/**
- * BIG WARNING - COMPILES, BUT NOT READY FOR USE - BIG WARNING
- *
- * The spinlock currently requires late GCC support
- * routines. Support for other platforms will be added soon.
+ * The spinlock currently requires late GCC support routines to be efficient.
+ * Other platforms default to a mutex implementation.
*/
- class SpinLock{
+ class SpinLock {
public:
SpinLock();
~SpinLock();
@@ -36,13 +36,19 @@ namespace mongo {
void unlock();
private:
- bool _locked;
+#if defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_4)
+ volatile bool _locked;
+#elif defined(_WIN32)
+ CRITICAL_SECTION _cs;
+#else
+ // default to a scoped mutex if not implemented
+ RWLock _mutex;
+#endif
// Non-copyable, non-assignable
SpinLock(SpinLock&);
SpinLock& operator=(SpinLock&);
- };
+ };
} // namespace mongo
-#endif // CONCURRENCY_SPINLOCK_HEADER
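
The GCC fast path used by SpinLock is __sync_lock_test_and_set, which atomically sets the flag and returns its previous value, paired with __sync_lock_release. A stripped-down standalone sketch of that technique (no backoff, GCC-compatible compiler assumed; this is an illustration of the idea, not the class above):

    class MiniSpinLock {                 // illustrative, not mongo::SpinLock
        volatile bool _locked;
    public:
        MiniSpinLock() : _locked(false) { }
        void lock() {
            // spin until the previous value was false, i.e. we took the lock
            while( __sync_lock_test_and_set(&_locked, true) ) { }
        }
        void unlock() {
            __sync_lock_release(&_locked);
        }
    };
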
diff --git a/util/concurrency/synchronization.cpp b/util/concurrency/synchronization.cpp
new file mode 100644
index 0000000..12e2894
--- /dev/null
+++ b/util/concurrency/synchronization.cpp
@@ -0,0 +1,56 @@
+// synchronization.cpp
+
+/* Copyright 2010 10gen Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "pch.h"
+#include "synchronization.h"
+
+namespace mongo {
+
+ Notification::Notification() : _mutex ( "Notification" ) , _notified( false ) { }
+
+ Notification::~Notification() { }
+
+ void Notification::waitToBeNotified() {
+ scoped_lock lock( _mutex );
+ while ( ! _notified )
+ _condition.wait( lock.boost() );
+ }
+
+ void Notification::notifyOne() {
+ scoped_lock lock( _mutex );
+ assert( !_notified );
+ _notified = true;
+ _condition.notify_one();
+ }
+
+ NotifyAll::NotifyAll() : _mutex("NotifyAll"), _counter(0) { }
+
+ void NotifyAll::wait() {
+ scoped_lock lock( _mutex );
+ unsigned long long old = _counter;
+ while( old == _counter ) {
+ _condition.wait( lock.boost() );
+ }
+ }
+
+ void NotifyAll::notifyAll() {
+ scoped_lock lock( _mutex );
+ ++_counter;
+ _condition.notify_all();
+ }
+
+} // namespace mongo
diff --git a/util/concurrency/synchronization.h b/util/concurrency/synchronization.h
new file mode 100644
index 0000000..ac2fcab
--- /dev/null
+++ b/util/concurrency/synchronization.h
@@ -0,0 +1,73 @@
+// synchronization.h
+
+/* Copyright 2010 10gen Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <boost/thread/condition.hpp>
+#include "mutex.h"
+
+namespace mongo {
+
+ /*
+ * A class to establish a synchronization point between two threads. One thread is the waiter and one is
+ * the notifier. After the notification event, both proceed normally.
+ *
+ * This class is thread-safe.
+ */
+ class Notification {
+ public:
+ Notification();
+ ~Notification();
+
+ /*
+ * Blocks until the method 'notifyOne()' is called.
+ */
+ void waitToBeNotified();
+
+ /*
+ * Notifies the waiter of '*this' that it can proceed. Can only be called once.
+ */
+ void notifyOne();
+
+ private:
+ mongo::mutex _mutex; // protects state below
+ bool _notified; // was notifyOne() issued?
+ boost::condition _condition; // cond over _notified being true
+ };
+
+ /** establishes a synchronization point between threads. N threads are waits and one is notifier.
+ threadsafe.
+ */
+ class NotifyAll : boost::noncopyable {
+ public:
+ NotifyAll();
+
+ /** awaits the next notifyAll() call by another thread. notifications that precede this
+ call are ignored -- we are looking for a fresh event.
+ */
+ void wait();
+
+ /** may be called multiple times. notifies all waiters */
+ void notifyAll();
+
+ private:
+ mongo::mutex _mutex;
+ unsigned long long _counter;
+ boost::condition _condition;
+ };
+
+} // namespace mongo
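
NotifyAll differs from Notification in that it is an edge rather than a latch: wait() blocks until the next notifyAll() issued after the call, and notifyAll() may be invoked repeatedly. A sketch with illustrative names, under the same include-path assumption:

    #include "util/concurrency/synchronization.h"

    mongo::NotifyAll commitEvent;        // illustrative

    void waitForNextCommit() {
        commitEvent.wait();              // blocks until the counter advances
    }

    void commitFinished() {
        commitEvent.notifyAll();         // wakes every thread currently waiting
    }
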
diff --git a/util/concurrency/task.cpp b/util/concurrency/task.cpp
index 99288bb..d84cd71 100644
--- a/util/concurrency/task.cpp
+++ b/util/concurrency/task.cpp
@@ -17,16 +17,19 @@
*/
#include "pch.h"
+
+#include <boost/thread/condition.hpp>
+
#include "task.h"
#include "../goodies.h"
#include "../unittest.h"
-#include "boost/thread/condition.hpp"
+#include "../time_support.h"
-namespace mongo {
+namespace mongo {
- namespace task {
+ namespace task {
- /*void foo() {
+ /*void foo() {
boost::mutex m;
boost::mutex::scoped_lock lk(m);
boost::condition cond;
@@ -34,21 +37,21 @@ namespace mongo {
cond.notify_one();
}*/
- Task::Task() {
+ Task::Task()
+ : BackgroundJob( true /* deleteSelf */ ) {
n = 0;
repeat = 0;
- deleteSelf = true;
}
void Task::halt() { repeat = 0; }
- void Task::run() {
+ void Task::run() {
assert( n == 0 );
while( 1 ) {
n++;
- try {
+ try {
doWork();
- }
+ }
catch(...) { }
if( repeat == 0 )
break;
@@ -62,11 +65,11 @@ namespace mongo {
go();
}
- void fork(Task *t) {
+ void fork(Task *t) {
t->begin();
}
- void repeat(Task *t, unsigned millis) {
+ void repeat(Task *t, unsigned millis) {
t->repeat = millis;
t->begin();
}
@@ -107,7 +110,7 @@ namespace mongo {
}
}
- void Server::send( lam msg ) {
+ void Server::send( lam msg ) {
{
boost::mutex::scoped_lock lk(m);
d.push_back(msg);
@@ -115,9 +118,9 @@ namespace mongo {
c.notify_one();
}
- void Server::doWork() {
+ void Server::doWork() {
starting();
- while( 1 ) {
+ while( 1 ) {
lam f;
try {
boost::mutex::scoped_lock lk(m);
@@ -126,7 +129,7 @@ namespace mongo {
f = d.front();
d.pop_front();
}
- catch(...) {
+ catch(...) {
log() << "ERROR exception in Server:doWork?" << endl;
}
try {
@@ -138,27 +141,28 @@ namespace mongo {
d.push_back(f);
}
}
- } catch(std::exception& e) {
- log() << "Server::doWork task:" << name() << " exception:" << e.what() << endl;
- }
- catch(const char *p) {
- log() << "Server::doWork task:" << name() << " unknown c exception:" <<
- ((p&&strlen(p)<800)?p:"?") << endl;
- }
- catch(...) {
- log() << "Server::doWork unknown exception task:" << name() << endl;
+ }
+ catch(std::exception& e) {
+ log() << "Server::doWork task:" << name() << " exception:" << e.what() << endl;
+ }
+ catch(const char *p) {
+ log() << "Server::doWork task:" << name() << " unknown c exception:" <<
+ ((p&&strlen(p)<800)?p:"?") << endl;
+ }
+ catch(...) {
+ log() << "Server::doWork unknown exception task:" << name() << endl;
}
}
}
static Server *s;
- static void abc(int i) {
+ static void abc(int i) {
cout << "Hello " << i << endl;
s->requeue();
}
class TaskUnitTest : public mongo::UnitTest {
public:
- virtual void run() {
+ virtual void run() {
lam f = boost::bind(abc, 3);
//f();
diff --git a/util/concurrency/task.h b/util/concurrency/task.h
index b3a2ece..d7b45ee 100644
--- a/util/concurrency/task.h
+++ b/util/concurrency/task.h
@@ -20,9 +20,9 @@
#include "../background.h"
-namespace mongo {
+namespace mongo {
- namespace task {
+ namespace task {
/** abstraction around threads. simpler than BackgroundJob which is used behind the scenes.
allocate the Task dynamically. when the thread terminates, the Task object will delete itself.
@@ -30,11 +30,11 @@ namespace mongo {
class Task : private BackgroundJob {
protected:
virtual void doWork() = 0; // implement the task here.
- virtual string name() = 0; // name the thread
+ virtual string name() const = 0; // name the thread
public:
Task();
- /** for a repeating task, stop after current invocation ends. can be called by other threads
+ /** for a repeating task, stop after current invocation ends. can be called by other threads
as long as the Task is still in scope.
*/
void halt();
@@ -43,7 +43,7 @@ namespace mongo {
friend void fork(Task* t);
friend void repeat(Task* t, unsigned millis);
virtual void run();
- virtual void ending() { }
+ //virtual void ending() { }
void begin();
};
@@ -54,8 +54,8 @@ namespace mongo {
void repeat(Task *t, unsigned millis);
/*** Example ***
- inline void sample() {
- class Sample : public Task {
+ inline void sample() {
+ class Sample : public Task {
public:
int result;
virtual void doWork() { result = 1234; }
diff --git a/util/concurrency/thread_pool.cpp b/util/concurrency/thread_pool.cpp
index 2caac1f..1c25884 100644
--- a/util/concurrency/thread_pool.cpp
+++ b/util/concurrency/thread_pool.cpp
@@ -20,8 +20,8 @@
#include "thread_pool.h"
#include "mvar.h"
-namespace mongo{
- namespace threadpool{
+namespace mongo {
+ namespace threadpool {
// Worker thread
class Worker : boost::noncopyable {
@@ -34,12 +34,12 @@ namespace mongo{
// destructor will block until current operation is completed
// Acts as a "join" on this thread
- ~Worker(){
+ ~Worker() {
_task.put(Task());
_thread.join();
}
- void set_task(Task& func){
+ void set_task(Task& func) {
assert(!func.empty());
assert(_is_done);
_is_done = false;
@@ -47,13 +47,13 @@ namespace mongo{
_task.put(func);
}
- private:
+ private:
ThreadPool& _owner;
MVar<Task> _task;
bool _is_done; // only used for error detection
boost::thread _thread;
- void loop(){
+ void loop() {
while (true) {
Task task = _task.take();
if (task.empty())
@@ -61,9 +61,11 @@ namespace mongo{
try {
task();
- } catch (std::exception e){
+ }
+ catch (std::exception e) {
log() << "Unhandled exception in worker thread: " << e.what() << endl;;
- } catch (...){
+ }
+ catch (...) {
log() << "Unhandled non-exception in worker thread" << endl;
}
_is_done = true;
@@ -74,16 +76,15 @@ namespace mongo{
ThreadPool::ThreadPool(int nThreads)
: _mutex("ThreadPool"), _tasksRemaining(0)
- , _nThreads(nThreads)
- {
+ , _nThreads(nThreads) {
scoped_lock lock(_mutex);
- while (nThreads-- > 0){
+ while (nThreads-- > 0) {
Worker* worker = new Worker(*this);
_freeWorkers.push_front(worker);
}
}
- ThreadPool::~ThreadPool(){
+ ThreadPool::~ThreadPool() {
join();
assert(_tasks.empty());
@@ -91,40 +92,42 @@ namespace mongo{
// O(n) but n should be small
assert(_freeWorkers.size() == (unsigned)_nThreads);
- while(!_freeWorkers.empty()){
+ while(!_freeWorkers.empty()) {
delete _freeWorkers.front();
_freeWorkers.pop_front();
}
}
- void ThreadPool::join(){
+ void ThreadPool::join() {
scoped_lock lock(_mutex);
- while(_tasksRemaining){
+ while(_tasksRemaining) {
_condition.wait(lock.boost());
}
}
- void ThreadPool::schedule(Task task){
+ void ThreadPool::schedule(Task task) {
scoped_lock lock(_mutex);
_tasksRemaining++;
- if (!_freeWorkers.empty()){
+ if (!_freeWorkers.empty()) {
_freeWorkers.front()->set_task(task);
_freeWorkers.pop_front();
- }else{
+ }
+ else {
_tasks.push_back(task);
}
}
// should only be called by a worker from the worker thread
- void ThreadPool::task_done(Worker* worker){
+ void ThreadPool::task_done(Worker* worker) {
scoped_lock lock(_mutex);
- if (!_tasks.empty()){
+ if (!_tasks.empty()) {
worker->set_task(_tasks.front());
_tasks.pop_front();
- }else{
+ }
+ else {
_freeWorkers.push_front(worker);
}
diff --git a/util/concurrency/thread_pool.h b/util/concurrency/thread_pool.h
index f0fe8f1..b348ed1 100644
--- a/util/concurrency/thread_pool.h
+++ b/util/concurrency/thread_pool.h
@@ -15,6 +15,8 @@
* limitations under the License.
*/
+#pragma once
+
#include <boost/function.hpp>
#include <boost/bind.hpp>
#undef assert
@@ -22,59 +24,59 @@
namespace mongo {
-namespace threadpool {
- class Worker;
-
- typedef boost::function<void(void)> Task; //nullary function or functor
-
- // exported to the mongo namespace
- class ThreadPool : boost::noncopyable{
- public:
- explicit ThreadPool(int nThreads=8);
-
- // blocks until all tasks are complete (tasks_remaining() == 0)
- // You should not call schedule while in the destructor
- ~ThreadPool();
-
- // blocks until all tasks are complete (tasks_remaining() == 0)
- // does not prevent new tasks from being scheduled so could wait forever.
- // Also, new tasks could be scheduled after this returns.
- void join();
-
- // task will be copied a few times so make sure it's relatively cheap
- void schedule(Task task);
-
- // Helpers that wrap schedule and boost::bind.
- // Functor and args will be copied a few times so make sure it's relatively cheap
- template<typename F, typename A>
- void schedule(F f, A a){ schedule(boost::bind(f,a)); }
- template<typename F, typename A, typename B>
- void schedule(F f, A a, B b){ schedule(boost::bind(f,a,b)); }
- template<typename F, typename A, typename B, typename C>
- void schedule(F f, A a, B b, C c){ schedule(boost::bind(f,a,b,c)); }
- template<typename F, typename A, typename B, typename C, typename D>
- void schedule(F f, A a, B b, C c, D d){ schedule(boost::bind(f,a,b,c,d)); }
- template<typename F, typename A, typename B, typename C, typename D, typename E>
- void schedule(F f, A a, B b, C c, D d, E e){ schedule(boost::bind(f,a,b,c,d,e)); }
-
- int tasks_remaining() { return _tasksRemaining; }
-
- private:
- mongo::mutex _mutex;
- boost::condition _condition;
-
- list<Worker*> _freeWorkers; //used as LIFO stack (always front)
- list<Task> _tasks; //used as FIFO queue (push_back, pop_front)
- int _tasksRemaining; // in queue + currently processing
- int _nThreads; // only used for sanity checking. could be removed in the future.
-
- // should only be called by a worker from the worker's thread
- void task_done(Worker* worker);
- friend class Worker;
- };
-
-} //namespace threadpool
-
-using threadpool::ThreadPool;
+ namespace threadpool {
+ class Worker;
+
+ typedef boost::function<void(void)> Task; //nullary function or functor
+
+ // exported to the mongo namespace
+ class ThreadPool : boost::noncopyable {
+ public:
+ explicit ThreadPool(int nThreads=8);
+
+ // blocks until all tasks are complete (tasks_remaining() == 0)
+ // You should not call schedule while in the destructor
+ ~ThreadPool();
+
+ // blocks until all tasks are complete (tasks_remaining() == 0)
+ // does not prevent new tasks from being scheduled so could wait forever.
+ // Also, new tasks could be scheduled after this returns.
+ void join();
+
+ // task will be copied a few times so make sure it's relatively cheap
+ void schedule(Task task);
+
+ // Helpers that wrap schedule and boost::bind.
+ // Functor and args will be copied a few times so make sure it's relatively cheap
+ template<typename F, typename A>
+ void schedule(F f, A a) { schedule(boost::bind(f,a)); }
+ template<typename F, typename A, typename B>
+ void schedule(F f, A a, B b) { schedule(boost::bind(f,a,b)); }
+ template<typename F, typename A, typename B, typename C>
+ void schedule(F f, A a, B b, C c) { schedule(boost::bind(f,a,b,c)); }
+ template<typename F, typename A, typename B, typename C, typename D>
+ void schedule(F f, A a, B b, C c, D d) { schedule(boost::bind(f,a,b,c,d)); }
+ template<typename F, typename A, typename B, typename C, typename D, typename E>
+ void schedule(F f, A a, B b, C c, D d, E e) { schedule(boost::bind(f,a,b,c,d,e)); }
+
+ int tasks_remaining() { return _tasksRemaining; }
+
+ private:
+ mongo::mutex _mutex;
+ boost::condition _condition;
+
+ list<Worker*> _freeWorkers; //used as LIFO stack (always front)
+ list<Task> _tasks; //used as FIFO queue (push_back, pop_front)
+ int _tasksRemaining; // in queue + currently processing
+ int _nThreads; // only used for sanity checking. could be removed in the future.
+
+ // should only be called by a worker from the worker's thread
+ void task_done(Worker* worker);
+ friend class Worker;
+ };
+
+ } //namespace threadpool
+
+ using threadpool::ThreadPool;
} //namespace mongo
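
A sketch of the ThreadPool interface above, using the boost::bind helper overloads; work() and the chunk count are illustrative:

    #include "util/concurrency/thread_pool.h"

    void work(int chunk) {               // hypothetical per-chunk job
        // ... process one chunk ...
    }

    void processAll() {
        mongo::ThreadPool pool(8);       // 8 worker threads
        for( int i = 0; i < 100; i++ )
            pool.schedule(work, i);      // wraps boost::bind(work, i)
        pool.join();                     // block until tasks_remaining() == 0
    }                                    // destructor joins and frees the workers
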
diff --git a/util/concurrency/value.h b/util/concurrency/value.h
index dabeb95..08d5306 100644
--- a/util/concurrency/value.h
+++ b/util/concurrency/value.h
@@ -20,11 +20,11 @@
#pragma once
-namespace mongo {
+namespace mongo {
extern mutex _atomicMutex;
- /** atomic wrapper for a value. enters a mutex on each access. must
+ /** atomic wrapper for a value. enters a mutex on each access. must
be copyable.
*/
template<typename T>
@@ -33,20 +33,22 @@ namespace mongo {
public:
Atomic<T>() { }
- void operator=(const T& a) {
+ void operator=(const T& a) {
scoped_lock lk(_atomicMutex);
- val = a; }
+ val = a;
+ }
- operator T() const {
+ operator T() const {
scoped_lock lk(_atomicMutex);
- return val; }
-
+ return val;
+ }
+
/** example:
Atomic<int> q;
...
{
Atomic<int>::tran t(q);
- if( q.ref() > 0 )
+ if( q.ref() > 0 )
q.ref()--;
}
*/
@@ -58,11 +60,11 @@ namespace mongo {
};
};
- /** this string COULD be mangled but with the double buffering, assuming writes
- are infrequent, it's unlikely. thus, this is reasonable for lockless setting of
+ /** this string COULD be mangled but with the double buffering, assuming writes
+ are infrequent, it's unlikely. thus, this is reasonable for lockless setting of
diagnostic strings, where their content isn't critical.
*/
- class DiagStr {
+ class DiagStr {
char buf1[256];
char buf2[256];
char *p;
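
Atomic<T> serializes every read and write through the global _atomicMutex (defined in vars.cpp below). A sketch using only the operators visible in this hunk; the counter is illustrative:

    #include "util/concurrency/value.h"

    mongo::Atomic<int> lastOpTime;       // illustrative shared value

    void recordOp(int t) {
        lastOpTime = t;                  // operator= locks, assigns, unlocks
    }

    int readOp() {
        return lastOpTime;               // operator T() locks, copies, unlocks
    }
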
diff --git a/util/concurrency/vars.cpp b/util/concurrency/vars.cpp
index 8863a27..3d057a4 100644
--- a/util/concurrency/vars.cpp
+++ b/util/concurrency/vars.cpp
@@ -20,28 +20,28 @@
#include "value.h"
#include "mutex.h"
-namespace mongo {
+namespace mongo {
- mutex _atomicMutex("_atomicMutex");
+ mongo::mutex _atomicMutex("_atomicMutex");
// intentional leak. otherwise destructor orders can be problematic at termination.
MutexDebugger &mutexDebugger = *(new MutexDebugger());
- MutexDebugger::MutexDebugger() :
- x( *(new boost::mutex()) ), magic(0x12345678) {
- // optional way to debug lock order
- /*
- a = "a_lock";
- b = "b_lock";
- */
+ MutexDebugger::MutexDebugger() :
+ x( *(new boost::mutex()) ), magic(0x12345678) {
+ // optional way to debug lock order
+ /*
+ a = "a_lock";
+ b = "b_lock";
+ */
}
- void MutexDebugger::programEnding() {
+ void MutexDebugger::programEnding() {
if( logLevel>=1 && followers.size() ) {
std::cout << followers.size() << " mutexes in program" << endl;
- for( map< mid, set<mid> >::iterator i = followers.begin(); i != followers.end(); i++ ) {
+ for( map< mid, set<mid> >::iterator i = followers.begin(); i != followers.end(); i++ ) {
cout << i->first;
- if( maxNest[i->first] > 1 )
+ if( maxNest[i->first] > 1 )
cout << " maxNest:" << maxNest[i->first];
cout << '\n';
for( set<mid>::iterator j = i->second.begin(); j != i->second.end(); j++ )