From 0ca01a91ae0a3562e54c226e7b9512feb2ea83d0 Mon Sep 17 00:00:00 2001 From: Antonin Kral Date: Thu, 25 Mar 2010 19:21:32 +0100 Subject: Imported Upstream version 1.4.0 --- util/message_server_asio.cpp | 82 +++++++++++++++++++++++++++++++++++++++----- 1 file changed, 74 insertions(+), 8 deletions(-) (limited to 'util/message_server_asio.cpp') diff --git a/util/message_server_asio.cpp b/util/message_server_asio.cpp index 4d5fab0..7fca29a 100644 --- a/util/message_server_asio.cpp +++ b/util/message_server_asio.cpp @@ -27,23 +27,58 @@ #include "message.h" #include "message_server.h" -#include "../util/thread_pool.h" +#include "../util/mvar.h" using namespace boost; using namespace boost::asio; using namespace boost::asio::ip; -//using namespace std; namespace mongo { + class MessageServerSession; + namespace { - ThreadPool tp; + class StickyThread{ + public: + StickyThread() + : _thread(boost::ref(*this)) + {} + + ~StickyThread(){ + _mss.put(boost::shared_ptr()); + _thread.join(); + } + + void ready(boost::shared_ptr mss){ + _mss.put(mss); + } + + void operator() (){ + boost::shared_ptr mss; + while((mss = _mss.take())){ // intentionally not using == + task(mss.get()); + mss.reset(); + } + } + + private: + boost::thread _thread; + inline void task(MessageServerSession* mss); // must be defined after MessageServerSession + + MVar > _mss; // populated when given a task + }; + + vector > thread_pool; + mongo::mutex tp_mutex; // this is only needed if io_service::run() is called from multiple threads } class MessageServerSession : public boost::enable_shared_from_this , public AbstractMessagingPort { public: - MessageServerSession( MessageHandler * handler , io_service& ioservice ) : _handler( handler ) , _socket( ioservice ){ - - } + MessageServerSession( MessageHandler * handler , io_service& ioservice ) + : _handler( handler ) + , _socket( ioservice ) + , _portCache(0) + { } + ~MessageServerSession(){ cout << "disconnect from: " << _socket.remote_endpoint() << endl; } @@ -81,7 +116,20 @@ namespace mongo { } void handleReadBody( const boost::system::error_code& error ){ - tp.schedule(&MessageServerSession::process, shared_from_this()); + if (!_myThread){ + mongo::mutex::scoped_lock(tp_mutex); + if (!thread_pool.empty()){ + _myThread = thread_pool.back(); + thread_pool.pop_back(); + } + } + + if (!_myThread) // pool is empty + _myThread.reset(new StickyThread()); + + assert(_myThread); + + _myThread->ready(shared_from_this()); } void process(){ @@ -98,6 +146,13 @@ namespace mongo { } void handleWriteDone( const boost::system::error_code& error ){ + { + // return thread to pool after we have sent data to the client + mongo::mutex::scoped_lock(tp_mutex); + assert(_myThread); + thread_pool.push_back(_myThread); + _myThread.reset(); + } _cur.reset(); _reply.reset(); _startHeaderRead(); @@ -117,7 +172,9 @@ namespace mongo { virtual unsigned remotePort(){ - return _socket.remote_endpoint().port(); + if (!_portCache) + _portCache = _socket.remote_endpoint().port(); //this is expensive + return _portCache; } private: @@ -134,7 +191,15 @@ namespace mongo { MsgData _inHeader; Message _cur; Message _reply; + + unsigned _portCache; + + boost::shared_ptr _myThread; }; + + void StickyThread::task(MessageServerSession* mss){ + mss->process(); + } class AsyncMessageServer : public MessageServer { @@ -152,6 +217,7 @@ namespace mongo { void run(){ cout << "AsyncMessageServer starting to listen on: " << _port << endl; + boost::thread other(boost::bind(&io_service::run, &_ioservice)); _ioservice.run(); cout << "AsyncMessageServer done listening on: " << _port << endl; } -- cgit v1.2.3