summaryrefslogtreecommitdiff
path: root/util/message_server_port.cpp
diff options
context:
space:
mode:
authorAntonin Kral <a.kral@bobek.cz>2011-03-17 00:05:43 +0100
committerAntonin Kral <a.kral@bobek.cz>2011-03-17 00:05:43 +0100
commit582fc32574a3b158c81e49cb00e6ae59205e66ba (patch)
treeac64a3243e0d2121709f685695247052858115c8 /util/message_server_port.cpp
parent2761bffa96595ac1698d86bbc2e95ebb0d4d6e93 (diff)
downloadmongodb-582fc32574a3b158c81e49cb00e6ae59205e66ba.tar.gz
Imported Upstream version 1.8.0
Diffstat (limited to 'util/message_server_port.cpp')
-rw-r--r--util/message_server_port.cpp55
1 files changed, 30 insertions, 25 deletions
diff --git a/util/message_server_port.cpp b/util/message_server_port.cpp
index 9649e45..6d00628 100644
--- a/util/message_server_port.cpp
+++ b/util/message_server_port.cpp
@@ -23,29 +23,32 @@
#include "message_server.h"
#include "../db/cmdline.h"
+#include "../db/stats/counters.h"
namespace mongo {
namespace pms {
MessageHandler * handler;
-
- void threadRun( MessagingPort * inPort){
- assert( inPort );
+
+ void threadRun( MessagingPort * inPort) {
+ TicketHolderReleaser connTicketReleaser( &connTicketHolder );
+ assert( inPort );
+
setThreadName( "conn" );
- TicketHolderReleaser connTicketReleaser( &connTicketHolder );
auto_ptr<MessagingPort> p( inPort );
-
+
string otherSide;
-
+
Message m;
try {
otherSide = p->farEnd.toString();
- while ( 1 ){
+ while ( 1 ) {
m.reset();
+ p->clearCounters();
if ( ! p->recv(m) ) {
if( !cmdLine.quiet )
@@ -53,20 +56,21 @@ namespace mongo {
p->shutdown();
break;
}
-
+
handler->process( m , p.get() );
+ networkCounter.hit( p->getBytesIn() , p->getBytesOut() );
}
}
- catch ( const SocketException& ){
+ catch ( const SocketException& ) {
log() << "unclean socket shutdown from: " << otherSide << endl;
}
- catch ( const std::exception& e ){
+ catch ( const std::exception& e ) {
problem() << "uncaught exception (" << e.what() << ")(" << demangleName( typeid(e) ) <<") in PortMessageServer::threadRun, closing connection" << endl;
}
- catch ( ... ){
+ catch ( ... ) {
problem() << "uncaught exception in PortMessageServer::threadRun, closing connection" << endl;
- }
-
+ }
+
handler->disconnected( p.get() );
}
@@ -74,16 +78,16 @@ namespace mongo {
class PortMessageServer : public MessageServer , public Listener {
public:
- PortMessageServer( const MessageServer::Options& opts, MessageHandler * handler ) :
- Listener( opts.ipList, opts.port ){
-
+ PortMessageServer( const MessageServer::Options& opts, MessageHandler * handler ) :
+ Listener( opts.ipList, opts.port ) {
+
uassert( 10275 , "multiple PortMessageServer not supported" , ! pms::handler );
pms::handler = handler;
}
-
+
virtual void accepted(MessagingPort * p) {
-
- if ( ! connTicketHolder.tryAcquire() ){
+
+ if ( ! connTicketHolder.tryAcquire() ) {
log() << "connection refused because too many open connections: " << connTicketHolder.used() << endl;
// TODO: would be nice if we notified them...
@@ -97,7 +101,8 @@ namespace mongo {
try {
boost::thread thr( boost::bind( &pms::threadRun , p ) );
}
- catch ( boost::thread_resource_error& ){
+ catch ( boost::thread_resource_error& ) {
+ connTicketHolder.release();
log() << "can't create new thread, closing connection" << endl;
p->shutdown();
@@ -106,21 +111,21 @@ namespace mongo {
sleepmillis(2);
}
}
-
- virtual void setAsTimeTracker(){
+
+ virtual void setAsTimeTracker() {
Listener::setAsTimeTracker();
}
- void run(){
+ void run() {
initAndListen();
}
};
- MessageServer * createServer( const MessageServer::Options& opts , MessageHandler * handler ){
+ MessageServer * createServer( const MessageServer::Options& opts , MessageHandler * handler ) {
return new PortMessageServer( opts , handler );
- }
+ }
}