diff options
Diffstat (limited to 'util/message_server_asio.cpp')
-rw-r--r-- | util/message_server_asio.cpp | 82 |
1 files changed, 74 insertions, 8 deletions
diff --git a/util/message_server_asio.cpp b/util/message_server_asio.cpp index 4d5fab0..7fca29a 100644 --- a/util/message_server_asio.cpp +++ b/util/message_server_asio.cpp @@ -27,23 +27,58 @@ #include "message.h" #include "message_server.h" -#include "../util/thread_pool.h" +#include "../util/mvar.h" using namespace boost; using namespace boost::asio; using namespace boost::asio::ip; -//using namespace std; namespace mongo { + class MessageServerSession; + namespace { - ThreadPool tp; + class StickyThread{ + public: + StickyThread() + : _thread(boost::ref(*this)) + {} + + ~StickyThread(){ + _mss.put(boost::shared_ptr<MessageServerSession>()); + _thread.join(); + } + + void ready(boost::shared_ptr<MessageServerSession> mss){ + _mss.put(mss); + } + + void operator() (){ + boost::shared_ptr<MessageServerSession> mss; + while((mss = _mss.take())){ // intentionally not using == + task(mss.get()); + mss.reset(); + } + } + + private: + boost::thread _thread; + inline void task(MessageServerSession* mss); // must be defined after MessageServerSession + + MVar<boost::shared_ptr<MessageServerSession> > _mss; // populated when given a task + }; + + vector<boost::shared_ptr<StickyThread> > thread_pool; + mongo::mutex tp_mutex; // this is only needed if io_service::run() is called from multiple threads } class MessageServerSession : public boost::enable_shared_from_this<MessageServerSession> , public AbstractMessagingPort { public: - MessageServerSession( MessageHandler * handler , io_service& ioservice ) : _handler( handler ) , _socket( ioservice ){ - - } + MessageServerSession( MessageHandler * handler , io_service& ioservice ) + : _handler( handler ) + , _socket( ioservice ) + , _portCache(0) + { } + ~MessageServerSession(){ cout << "disconnect from: " << _socket.remote_endpoint() << endl; } @@ -81,7 +116,20 @@ namespace mongo { } void handleReadBody( const boost::system::error_code& error ){ - tp.schedule(&MessageServerSession::process, shared_from_this()); + if (!_myThread){ + mongo::mutex::scoped_lock(tp_mutex); + if (!thread_pool.empty()){ + _myThread = thread_pool.back(); + thread_pool.pop_back(); + } + } + + if (!_myThread) // pool is empty + _myThread.reset(new StickyThread()); + + assert(_myThread); + + _myThread->ready(shared_from_this()); } void process(){ @@ -98,6 +146,13 @@ namespace mongo { } void handleWriteDone( const boost::system::error_code& error ){ + { + // return thread to pool after we have sent data to the client + mongo::mutex::scoped_lock(tp_mutex); + assert(_myThread); + thread_pool.push_back(_myThread); + _myThread.reset(); + } _cur.reset(); _reply.reset(); _startHeaderRead(); @@ -117,7 +172,9 @@ namespace mongo { virtual unsigned remotePort(){ - return _socket.remote_endpoint().port(); + if (!_portCache) + _portCache = _socket.remote_endpoint().port(); //this is expensive + return _portCache; } private: @@ -134,7 +191,15 @@ namespace mongo { MsgData _inHeader; Message _cur; Message _reply; + + unsigned _portCache; + + boost::shared_ptr<StickyThread> _myThread; }; + + void StickyThread::task(MessageServerSession* mss){ + mss->process(); + } class AsyncMessageServer : public MessageServer { @@ -152,6 +217,7 @@ namespace mongo { void run(){ cout << "AsyncMessageServer starting to listen on: " << _port << endl; + boost::thread other(boost::bind(&io_service::run, &_ioservice)); _ioservice.run(); cout << "AsyncMessageServer done listening on: " << _port << endl; } |