diff options
Diffstat (limited to 'util/concurrency/thread_pool.cpp')
-rw-r--r-- | util/concurrency/thread_pool.cpp | 45 |
1 files changed, 24 insertions, 21 deletions
diff --git a/util/concurrency/thread_pool.cpp b/util/concurrency/thread_pool.cpp index 2caac1f..1c25884 100644 --- a/util/concurrency/thread_pool.cpp +++ b/util/concurrency/thread_pool.cpp @@ -20,8 +20,8 @@ #include "thread_pool.h" #include "mvar.h" -namespace mongo{ - namespace threadpool{ +namespace mongo { + namespace threadpool { // Worker thread class Worker : boost::noncopyable { @@ -34,12 +34,12 @@ namespace mongo{ // destructor will block until current operation is completed // Acts as a "join" on this thread - ~Worker(){ + ~Worker() { _task.put(Task()); _thread.join(); } - void set_task(Task& func){ + void set_task(Task& func) { assert(!func.empty()); assert(_is_done); _is_done = false; @@ -47,13 +47,13 @@ namespace mongo{ _task.put(func); } - private: + private: ThreadPool& _owner; MVar<Task> _task; bool _is_done; // only used for error detection boost::thread _thread; - void loop(){ + void loop() { while (true) { Task task = _task.take(); if (task.empty()) @@ -61,9 +61,11 @@ namespace mongo{ try { task(); - } catch (std::exception e){ + } + catch (std::exception e) { log() << "Unhandled exception in worker thread: " << e.what() << endl;; - } catch (...){ + } + catch (...) { log() << "Unhandled non-exception in worker thread" << endl; } _is_done = true; @@ -74,16 +76,15 @@ namespace mongo{ ThreadPool::ThreadPool(int nThreads) : _mutex("ThreadPool"), _tasksRemaining(0) - , _nThreads(nThreads) - { + , _nThreads(nThreads) { scoped_lock lock(_mutex); - while (nThreads-- > 0){ + while (nThreads-- > 0) { Worker* worker = new Worker(*this); _freeWorkers.push_front(worker); } } - ThreadPool::~ThreadPool(){ + ThreadPool::~ThreadPool() { join(); assert(_tasks.empty()); @@ -91,40 +92,42 @@ namespace mongo{ // O(n) but n should be small assert(_freeWorkers.size() == (unsigned)_nThreads); - while(!_freeWorkers.empty()){ + while(!_freeWorkers.empty()) { delete _freeWorkers.front(); _freeWorkers.pop_front(); } } - void ThreadPool::join(){ + void ThreadPool::join() { scoped_lock lock(_mutex); - while(_tasksRemaining){ + while(_tasksRemaining) { _condition.wait(lock.boost()); } } - void ThreadPool::schedule(Task task){ + void ThreadPool::schedule(Task task) { scoped_lock lock(_mutex); _tasksRemaining++; - if (!_freeWorkers.empty()){ + if (!_freeWorkers.empty()) { _freeWorkers.front()->set_task(task); _freeWorkers.pop_front(); - }else{ + } + else { _tasks.push_back(task); } } // should only be called by a worker from the worker thread - void ThreadPool::task_done(Worker* worker){ + void ThreadPool::task_done(Worker* worker) { scoped_lock lock(_mutex); - if (!_tasks.empty()){ + if (!_tasks.empty()) { worker->set_task(_tasks.front()); _tasks.pop_front(); - }else{ + } + else { _freeWorkers.push_front(worker); } |