summaryrefslogtreecommitdiff
path: root/util/message_server_asio.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'util/message_server_asio.cpp')
-rw-r--r--util/message_server_asio.cpp82
1 files changed, 74 insertions, 8 deletions
diff --git a/util/message_server_asio.cpp b/util/message_server_asio.cpp
index 4d5fab0..7fca29a 100644
--- a/util/message_server_asio.cpp
+++ b/util/message_server_asio.cpp
@@ -27,23 +27,58 @@
#include "message.h"
#include "message_server.h"
-#include "../util/thread_pool.h"
+#include "../util/mvar.h"
using namespace boost;
using namespace boost::asio;
using namespace boost::asio::ip;
-//using namespace std;
namespace mongo {
+ class MessageServerSession;
+
namespace {
- ThreadPool tp;
+ class StickyThread{
+ public:
+ StickyThread()
+ : _thread(boost::ref(*this))
+ {}
+
+ ~StickyThread(){
+ _mss.put(boost::shared_ptr<MessageServerSession>());
+ _thread.join();
+ }
+
+ void ready(boost::shared_ptr<MessageServerSession> mss){
+ _mss.put(mss);
+ }
+
+ void operator() (){
+ boost::shared_ptr<MessageServerSession> mss;
+ while((mss = _mss.take())){ // intentionally not using ==
+ task(mss.get());
+ mss.reset();
+ }
+ }
+
+ private:
+ boost::thread _thread;
+ inline void task(MessageServerSession* mss); // must be defined after MessageServerSession
+
+ MVar<boost::shared_ptr<MessageServerSession> > _mss; // populated when given a task
+ };
+
+ vector<boost::shared_ptr<StickyThread> > thread_pool;
+ mongo::mutex tp_mutex; // this is only needed if io_service::run() is called from multiple threads
}
class MessageServerSession : public boost::enable_shared_from_this<MessageServerSession> , public AbstractMessagingPort {
public:
- MessageServerSession( MessageHandler * handler , io_service& ioservice ) : _handler( handler ) , _socket( ioservice ){
-
- }
+ MessageServerSession( MessageHandler * handler , io_service& ioservice )
+ : _handler( handler )
+ , _socket( ioservice )
+ , _portCache(0)
+ { }
+
~MessageServerSession(){
cout << "disconnect from: " << _socket.remote_endpoint() << endl;
}
@@ -81,7 +116,20 @@ namespace mongo {
}
void handleReadBody( const boost::system::error_code& error ){
- tp.schedule(&MessageServerSession::process, shared_from_this());
+ if (!_myThread){
+ mongo::mutex::scoped_lock(tp_mutex);
+ if (!thread_pool.empty()){
+ _myThread = thread_pool.back();
+ thread_pool.pop_back();
+ }
+ }
+
+ if (!_myThread) // pool is empty
+ _myThread.reset(new StickyThread());
+
+ assert(_myThread);
+
+ _myThread->ready(shared_from_this());
}
void process(){
@@ -98,6 +146,13 @@ namespace mongo {
}
void handleWriteDone( const boost::system::error_code& error ){
+ {
+ // return thread to pool after we have sent data to the client
+ mongo::mutex::scoped_lock(tp_mutex);
+ assert(_myThread);
+ thread_pool.push_back(_myThread);
+ _myThread.reset();
+ }
_cur.reset();
_reply.reset();
_startHeaderRead();
@@ -117,7 +172,9 @@ namespace mongo {
virtual unsigned remotePort(){
- return _socket.remote_endpoint().port();
+ if (!_portCache)
+ _portCache = _socket.remote_endpoint().port(); //this is expensive
+ return _portCache;
}
private:
@@ -134,7 +191,15 @@ namespace mongo {
MsgData _inHeader;
Message _cur;
Message _reply;
+
+ unsigned _portCache;
+
+ boost::shared_ptr<StickyThread> _myThread;
};
+
+ void StickyThread::task(MessageServerSession* mss){
+ mss->process();
+ }
class AsyncMessageServer : public MessageServer {
@@ -152,6 +217,7 @@ namespace mongo {
void run(){
cout << "AsyncMessageServer starting to listen on: " << _port << endl;
+ boost::thread other(boost::bind(&io_service::run, &_ioservice));
_ioservice.run();
cout << "AsyncMessageServer done listening on: " << _port << endl;
}