diff options
Diffstat (limited to 'util/message_server_asio.cpp')
-rw-r--r-- | util/message_server_asio.cpp | 112 |
1 files changed, 56 insertions, 56 deletions
diff --git a/util/message_server_asio.cpp b/util/message_server_asio.cpp index 0c9479c..0c6a7d9 100644 --- a/util/message_server_asio.cpp +++ b/util/message_server_asio.cpp @@ -37,29 +37,29 @@ namespace mongo { class MessageServerSession; namespace { - class StickyThread{ + class StickyThread { public: StickyThread() : _thread(boost::ref(*this)) {} - ~StickyThread(){ + ~StickyThread() { _mss.put(boost::shared_ptr<MessageServerSession>()); _thread.join(); } - void ready(boost::shared_ptr<MessageServerSession> mss){ + void ready(boost::shared_ptr<MessageServerSession> mss) { _mss.put(mss); } - void operator() (){ + void operator() () { boost::shared_ptr<MessageServerSession> mss; - while((mss = _mss.take())){ // intentionally not using == + while((mss = _mss.take())) { // intentionally not using == task(mss.get()); mss.reset(); } } - + private: boost::thread _thread; inline void task(MessageServerSession* mss); // must be defined after MessageServerSession @@ -79,34 +79,34 @@ namespace mongo { , _portCache(0) { } - ~MessageServerSession(){ + ~MessageServerSession() { cout << "disconnect from: " << _socket.remote_endpoint() << endl; } - tcp::socket& socket(){ + tcp::socket& socket() { return _socket; } - void start(){ + void start() { cout << "MessageServerSession start from:" << _socket.remote_endpoint() << endl; _startHeaderRead(); } - - void handleReadHeader( const boost::system::error_code& error ){ + + void handleReadHeader( const boost::system::error_code& error ) { if ( _inHeader.len == 0 ) return; - if ( ! _inHeader.valid() ){ + if ( ! _inHeader.valid() ) { cout << " got invalid header from: " << _socket.remote_endpoint() << " closing connected" << endl; return; } - + char * raw = (char*)malloc( _inHeader.len ); - + MsgData * data = (MsgData*)raw; memcpy( data , &_inHeader , sizeof( _inHeader ) ); assert( data->len == _inHeader.len ); - + uassert( 10273 , "_cur not empty! pipelining requests not supported" , ! _cur.data ); _cur.setData( data , true ); @@ -114,11 +114,11 @@ namespace mongo { buffer( raw + sizeof( _inHeader ) , _inHeader.len - sizeof( _inHeader ) ) , boost::bind( &MessageServerSession::handleReadBody , shared_from_this() , boost::asio::placeholders::error ) ); } - - void handleReadBody( const boost::system::error_code& error ){ - if (!_myThread){ + + void handleReadBody( const boost::system::error_code& error ) { + if (!_myThread) { mongo::mutex::scoped_lock(tp_mutex); - if (!thread_pool.empty()){ + if (!thread_pool.empty()) { _myThread = thread_pool.back(); thread_pool.pop_back(); } @@ -132,20 +132,21 @@ namespace mongo { _myThread->ready(shared_from_this()); } - void process(){ + void process() { _handler->process( _cur , this ); - if (_reply.data){ + if (_reply.data) { async_write( _socket , buffer( (char*)_reply.data , _reply.data->len ) , boost::bind( &MessageServerSession::handleWriteDone , shared_from_this() , boost::asio::placeholders::error ) ); - } else { + } + else { _cur.reset(); _startHeaderRead(); } } - - void handleWriteDone( const boost::system::error_code& error ){ + + void handleWriteDone( const boost::system::error_code& error ) { { // return thread to pool after we have sent data to the client mongo::mutex::scoped_lock(tp_mutex); @@ -157,12 +158,12 @@ namespace mongo { _reply.reset(); _startHeaderRead(); } - - virtual void reply( Message& received, Message& response ){ + + virtual void reply( Message& received, Message& response ) { reply( received , response , received.data->id ); } - - virtual void reply( Message& query , Message& toSend, MSGID responseTo ){ + + virtual void reply( Message& query , Message& toSend, MSGID responseTo ) { _reply = toSend; _reply.data->id = nextMessageId(); @@ -170,22 +171,22 @@ namespace mongo { uassert( 10274 , "pipelining requests doesn't work yet" , query.data->id == _cur.data->id ); } - - virtual unsigned remotePort(){ + + virtual unsigned remotePort() { if (!_portCache) _portCache = _socket.remote_endpoint().port(); //this is expensive return _portCache; } - - private: - - void _startHeaderRead(){ + + private: + + void _startHeaderRead() { _inHeader.len = 0; - async_read( _socket , + async_read( _socket , buffer( &_inHeader , sizeof( _inHeader ) ) , boost::bind( &MessageServerSession::handleReadHeader , shared_from_this() , boost::asio::placeholders::error ) ); } - + MessageHandler * _handler; tcp::socket _socket; MsgData _inHeader; @@ -197,10 +198,10 @@ namespace mongo { boost::shared_ptr<StickyThread> _myThread; }; - void StickyThread::task(MessageServerSession* mss){ + void StickyThread::task(MessageServerSession* mss) { mss->process(); } - + class AsyncMessageServer : public MessageServer { public: @@ -209,39 +210,38 @@ namespace mongo { : _port( opts.port ) , _handler(handler) , _endpoint( tcp::v4() , opts.port ) - , _acceptor( _ioservice , _endpoint ) - { + , _acceptor( _ioservice , _endpoint ) { _accept(); } - virtual ~AsyncMessageServer(){ - + virtual ~AsyncMessageServer() { + } - void run(){ + void run() { cout << "AsyncMessageServer starting to listen on: " << _port << endl; boost::thread other(boost::bind(&io_service::run, &_ioservice)); _ioservice.run(); cout << "AsyncMessageServer done listening on: " << _port << endl; } - - void handleAccept( shared_ptr<MessageServerSession> session , - const boost::system::error_code& error ){ - if ( error ){ + + void handleAccept( shared_ptr<MessageServerSession> session , + const boost::system::error_code& error ) { + if ( error ) { cout << "handleAccept error!" << endl; return; } session->start(); _accept(); } - - void _accept( ){ + + void _accept( ) { shared_ptr<MessageServerSession> session( new MessageServerSession( _handler , _ioservice ) ); _acceptor.async_accept( session->socket() , - boost::bind( &AsyncMessageServer::handleAccept, - this, - session, - boost::asio::placeholders::error ) - ); + boost::bind( &AsyncMessageServer::handleAccept, + this, + session, + boost::asio::placeholders::error ) + ); } private: @@ -252,9 +252,9 @@ namespace mongo { tcp::acceptor _acceptor; }; - MessageServer * createServer( const MessageServer::Options& opts , MessageHandler * handler ){ + MessageServer * createServer( const MessageServer::Options& opts , MessageHandler * handler ) { return new AsyncMessageServer( opts , handler ); - } + } } |