summaryrefslogtreecommitdiff
path: root/util/message_server_asio.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'util/message_server_asio.cpp')
-rw-r--r--util/message_server_asio.cpp112
1 files changed, 56 insertions, 56 deletions
diff --git a/util/message_server_asio.cpp b/util/message_server_asio.cpp
index 0c9479c..0c6a7d9 100644
--- a/util/message_server_asio.cpp
+++ b/util/message_server_asio.cpp
@@ -37,29 +37,29 @@ namespace mongo {
class MessageServerSession;
namespace {
- class StickyThread{
+ class StickyThread {
public:
StickyThread()
: _thread(boost::ref(*this))
{}
- ~StickyThread(){
+ ~StickyThread() {
_mss.put(boost::shared_ptr<MessageServerSession>());
_thread.join();
}
- void ready(boost::shared_ptr<MessageServerSession> mss){
+ void ready(boost::shared_ptr<MessageServerSession> mss) {
_mss.put(mss);
}
- void operator() (){
+ void operator() () {
boost::shared_ptr<MessageServerSession> mss;
- while((mss = _mss.take())){ // intentionally not using ==
+ while((mss = _mss.take())) { // intentionally not using ==
task(mss.get());
mss.reset();
}
}
-
+
private:
boost::thread _thread;
inline void task(MessageServerSession* mss); // must be defined after MessageServerSession
@@ -79,34 +79,34 @@ namespace mongo {
, _portCache(0)
{ }
- ~MessageServerSession(){
+ ~MessageServerSession() {
cout << "disconnect from: " << _socket.remote_endpoint() << endl;
}
- tcp::socket& socket(){
+ tcp::socket& socket() {
return _socket;
}
- void start(){
+ void start() {
cout << "MessageServerSession start from:" << _socket.remote_endpoint() << endl;
_startHeaderRead();
}
-
- void handleReadHeader( const boost::system::error_code& error ){
+
+ void handleReadHeader( const boost::system::error_code& error ) {
if ( _inHeader.len == 0 )
return;
- if ( ! _inHeader.valid() ){
+ if ( ! _inHeader.valid() ) {
cout << " got invalid header from: " << _socket.remote_endpoint() << " closing connected" << endl;
return;
}
-
+
char * raw = (char*)malloc( _inHeader.len );
-
+
MsgData * data = (MsgData*)raw;
memcpy( data , &_inHeader , sizeof( _inHeader ) );
assert( data->len == _inHeader.len );
-
+
uassert( 10273 , "_cur not empty! pipelining requests not supported" , ! _cur.data );
_cur.setData( data , true );
@@ -114,11 +114,11 @@ namespace mongo {
buffer( raw + sizeof( _inHeader ) , _inHeader.len - sizeof( _inHeader ) ) ,
boost::bind( &MessageServerSession::handleReadBody , shared_from_this() , boost::asio::placeholders::error ) );
}
-
- void handleReadBody( const boost::system::error_code& error ){
- if (!_myThread){
+
+ void handleReadBody( const boost::system::error_code& error ) {
+ if (!_myThread) {
mongo::mutex::scoped_lock(tp_mutex);
- if (!thread_pool.empty()){
+ if (!thread_pool.empty()) {
_myThread = thread_pool.back();
thread_pool.pop_back();
}
@@ -132,20 +132,21 @@ namespace mongo {
_myThread->ready(shared_from_this());
}
- void process(){
+ void process() {
_handler->process( _cur , this );
- if (_reply.data){
+ if (_reply.data) {
async_write( _socket ,
buffer( (char*)_reply.data , _reply.data->len ) ,
boost::bind( &MessageServerSession::handleWriteDone , shared_from_this() , boost::asio::placeholders::error ) );
- } else {
+ }
+ else {
_cur.reset();
_startHeaderRead();
}
}
-
- void handleWriteDone( const boost::system::error_code& error ){
+
+ void handleWriteDone( const boost::system::error_code& error ) {
{
// return thread to pool after we have sent data to the client
mongo::mutex::scoped_lock(tp_mutex);
@@ -157,12 +158,12 @@ namespace mongo {
_reply.reset();
_startHeaderRead();
}
-
- virtual void reply( Message& received, Message& response ){
+
+ virtual void reply( Message& received, Message& response ) {
reply( received , response , received.data->id );
}
-
- virtual void reply( Message& query , Message& toSend, MSGID responseTo ){
+
+ virtual void reply( Message& query , Message& toSend, MSGID responseTo ) {
_reply = toSend;
_reply.data->id = nextMessageId();
@@ -170,22 +171,22 @@ namespace mongo {
uassert( 10274 , "pipelining requests doesn't work yet" , query.data->id == _cur.data->id );
}
-
- virtual unsigned remotePort(){
+
+ virtual unsigned remotePort() {
if (!_portCache)
_portCache = _socket.remote_endpoint().port(); //this is expensive
return _portCache;
}
-
- private:
-
- void _startHeaderRead(){
+
+ private:
+
+ void _startHeaderRead() {
_inHeader.len = 0;
- async_read( _socket ,
+ async_read( _socket ,
buffer( &_inHeader , sizeof( _inHeader ) ) ,
boost::bind( &MessageServerSession::handleReadHeader , shared_from_this() , boost::asio::placeholders::error ) );
}
-
+
MessageHandler * _handler;
tcp::socket _socket;
MsgData _inHeader;
@@ -197,10 +198,10 @@ namespace mongo {
boost::shared_ptr<StickyThread> _myThread;
};
- void StickyThread::task(MessageServerSession* mss){
+ void StickyThread::task(MessageServerSession* mss) {
mss->process();
}
-
+
class AsyncMessageServer : public MessageServer {
public:
@@ -209,39 +210,38 @@ namespace mongo {
: _port( opts.port )
, _handler(handler)
, _endpoint( tcp::v4() , opts.port )
- , _acceptor( _ioservice , _endpoint )
- {
+ , _acceptor( _ioservice , _endpoint ) {
_accept();
}
- virtual ~AsyncMessageServer(){
-
+ virtual ~AsyncMessageServer() {
+
}
- void run(){
+ void run() {
cout << "AsyncMessageServer starting to listen on: " << _port << endl;
boost::thread other(boost::bind(&io_service::run, &_ioservice));
_ioservice.run();
cout << "AsyncMessageServer done listening on: " << _port << endl;
}
-
- void handleAccept( shared_ptr<MessageServerSession> session ,
- const boost::system::error_code& error ){
- if ( error ){
+
+ void handleAccept( shared_ptr<MessageServerSession> session ,
+ const boost::system::error_code& error ) {
+ if ( error ) {
cout << "handleAccept error!" << endl;
return;
}
session->start();
_accept();
}
-
- void _accept( ){
+
+ void _accept( ) {
shared_ptr<MessageServerSession> session( new MessageServerSession( _handler , _ioservice ) );
_acceptor.async_accept( session->socket() ,
- boost::bind( &AsyncMessageServer::handleAccept,
- this,
- session,
- boost::asio::placeholders::error )
- );
+ boost::bind( &AsyncMessageServer::handleAccept,
+ this,
+ session,
+ boost::asio::placeholders::error )
+ );
}
private:
@@ -252,9 +252,9 @@ namespace mongo {
tcp::acceptor _acceptor;
};
- MessageServer * createServer( const MessageServer::Options& opts , MessageHandler * handler ){
+ MessageServer * createServer( const MessageServer::Options& opts , MessageHandler * handler ) {
return new AsyncMessageServer( opts , handler );
- }
+ }
}