summaryrefslogtreecommitdiff
path: root/util/concurrency/thread_pool.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'util/concurrency/thread_pool.cpp')
-rw-r--r--util/concurrency/thread_pool.cpp45
1 files changed, 24 insertions, 21 deletions
diff --git a/util/concurrency/thread_pool.cpp b/util/concurrency/thread_pool.cpp
index 2caac1f..1c25884 100644
--- a/util/concurrency/thread_pool.cpp
+++ b/util/concurrency/thread_pool.cpp
@@ -20,8 +20,8 @@
#include "thread_pool.h"
#include "mvar.h"
-namespace mongo{
- namespace threadpool{
+namespace mongo {
+ namespace threadpool {
// Worker thread
class Worker : boost::noncopyable {
@@ -34,12 +34,12 @@ namespace mongo{
// destructor will block until current operation is completed
// Acts as a "join" on this thread
- ~Worker(){
+ ~Worker() {
_task.put(Task());
_thread.join();
}
- void set_task(Task& func){
+ void set_task(Task& func) {
assert(!func.empty());
assert(_is_done);
_is_done = false;
@@ -47,13 +47,13 @@ namespace mongo{
_task.put(func);
}
- private:
+ private:
ThreadPool& _owner;
MVar<Task> _task;
bool _is_done; // only used for error detection
boost::thread _thread;
- void loop(){
+ void loop() {
while (true) {
Task task = _task.take();
if (task.empty())
@@ -61,9 +61,11 @@ namespace mongo{
try {
task();
- } catch (std::exception e){
+ }
+ catch (std::exception e) {
log() << "Unhandled exception in worker thread: " << e.what() << endl;;
- } catch (...){
+ }
+ catch (...) {
log() << "Unhandled non-exception in worker thread" << endl;
}
_is_done = true;
@@ -74,16 +76,15 @@ namespace mongo{
ThreadPool::ThreadPool(int nThreads)
: _mutex("ThreadPool"), _tasksRemaining(0)
- , _nThreads(nThreads)
- {
+ , _nThreads(nThreads) {
scoped_lock lock(_mutex);
- while (nThreads-- > 0){
+ while (nThreads-- > 0) {
Worker* worker = new Worker(*this);
_freeWorkers.push_front(worker);
}
}
- ThreadPool::~ThreadPool(){
+ ThreadPool::~ThreadPool() {
join();
assert(_tasks.empty());
@@ -91,40 +92,42 @@ namespace mongo{
// O(n) but n should be small
assert(_freeWorkers.size() == (unsigned)_nThreads);
- while(!_freeWorkers.empty()){
+ while(!_freeWorkers.empty()) {
delete _freeWorkers.front();
_freeWorkers.pop_front();
}
}
- void ThreadPool::join(){
+ void ThreadPool::join() {
scoped_lock lock(_mutex);
- while(_tasksRemaining){
+ while(_tasksRemaining) {
_condition.wait(lock.boost());
}
}
- void ThreadPool::schedule(Task task){
+ void ThreadPool::schedule(Task task) {
scoped_lock lock(_mutex);
_tasksRemaining++;
- if (!_freeWorkers.empty()){
+ if (!_freeWorkers.empty()) {
_freeWorkers.front()->set_task(task);
_freeWorkers.pop_front();
- }else{
+ }
+ else {
_tasks.push_back(task);
}
}
// should only be called by a worker from the worker thread
- void ThreadPool::task_done(Worker* worker){
+ void ThreadPool::task_done(Worker* worker) {
scoped_lock lock(_mutex);
- if (!_tasks.empty()){
+ if (!_tasks.empty()) {
worker->set_task(_tasks.front());
_tasks.pop_front();
- }else{
+ }
+ else {
_freeWorkers.push_front(worker);
}