diff options
Diffstat (limited to 'util/message.cpp')
-rw-r--r-- | util/message.cpp | 42 |
1 files changed, 26 insertions, 16 deletions
diff --git a/util/message.cpp b/util/message.cpp index 0fbc2d2..2c3d006 100644 --- a/util/message.cpp +++ b/util/message.cpp @@ -35,9 +35,11 @@ namespace mongo { #define mmm(x) #ifdef MSG_NOSIGNAL - const int portSendFlags = MSG_NOSIGNAL; + const int portSendFlags = MSG_NOSIGNAL; + const int portRecvFlags = MSG_NOSIGNAL; #else - const int portSendFlags = 0; + const int portSendFlags = 0; + const int portRecvFlags = 0; #endif /* listener ------------------------------------------------------------------- */ @@ -72,7 +74,7 @@ namespace mongo { void Listener::listen() { static long connNumber = 0; SockAddr from; - while ( 1 ) { + while ( ! inShutdown() ) { int s = accept(sock, (sockaddr *) &from.sa, &from.addressSize); if ( s < 0 ) { if ( errno == ECONNABORTED || errno == EBADF ) { @@ -117,7 +119,7 @@ namespace mongo { if ( _buf == _cur ) return 0; - int x = ::send( _port->sock , _buf , len() , 0 ); + int x = _port->send( _buf , len() ); _cur = _buf; return x; } @@ -136,23 +138,22 @@ namespace mongo { class Ports { set<MessagingPort*>& ports; - boost::mutex& m; + mongo::mutex m; public: // we "new" this so it is still be around when other automatic global vars // are being destructed during termination. - Ports() : ports( *(new set<MessagingPort*>()) ), - m( *(new boost::mutex()) ) { } + Ports() : ports( *(new set<MessagingPort*>()) ) {} void closeAll() { \ - boostlock bl(m); + scoped_lock bl(m); for ( set<MessagingPort*>::iterator i = ports.begin(); i != ports.end(); i++ ) (*i)->shutdown(); } void insert(MessagingPort* p) { - boostlock bl(m); + scoped_lock bl(m); ports.insert(p); } void erase(MessagingPort* p) { - boostlock bl(m); + scoped_lock bl(m); ports.erase(p); } } ports; @@ -263,7 +264,7 @@ again: char *lenbuf = (char *) &len; int lft = 4; while ( 1 ) { - int x = ::recv(sock, lenbuf, lft, 0); + int x = recv( lenbuf, lft ); if ( x == 0 ) { DEV out() << "MessagingPort recv() conn closed? " << farEnd.toString() << endl; m.reset(); @@ -286,7 +287,7 @@ again: if ( len == -1 ) { // Endian check from the database, after connecting, to see what mode server is running in. unsigned foo = 0x10203040; - int x = ::send(sock, (char *) &foo, 4, portSendFlags ); + int x = send( (char *) &foo, 4 ); if ( x <= 0 ) { log() << "MessagingPort endian send() " << OUTPUT_ERRNO << ' ' << farEnd.toString() << endl; return false; @@ -301,7 +302,7 @@ again: stringstream ss; ss << "HTTP/1.0 200 OK\r\nConnection: close\r\nContent-Type: text/plain\r\nContent-Length: " << msg.size() << "\r\n\r\n" << msg; string s = ss.str(); - ::send( sock , s.c_str(), s.size(), 0 ); + send( s.c_str(), s.size() ); return false; } log() << "bad recv() len: " << len << '\n'; @@ -321,7 +322,7 @@ again: char *p = (char *) &md->id; int left = len -4; while ( 1 ) { - int x = ::recv(sock, p, left, 0); + int x = recv( p, left ); if ( x == 0 ) { DEV out() << "MessagingPort::recv(): conn closed? " << farEnd.toString() << endl; m.reset(); @@ -376,6 +377,7 @@ again: } void MessagingPort::say(Message& toSend, int responseTo) { + assert( toSend.data ); mmm( out() << "* say() sock:" << this->sock << " thr:" << GetCurrentThreadId() << endl; ) toSend.data->id = nextMessageId(); toSend.data->responseTo = responseTo; @@ -395,7 +397,7 @@ again: } if ( x == -100 ) - x = ::send(sock, (char*)toSend.data, toSend.data->len , portSendFlags ); + x = send( (char*)toSend.data, toSend.data->len ); if ( x <= 0 ) { log() << "MessagingPort say send() " << OUTPUT_ERRNO << ' ' << farEnd.toString() << endl; @@ -404,6 +406,14 @@ again: } + int MessagingPort::send( const char * data , const int len ){ + return ::send( sock , data , len , portSendFlags ); + } + + int MessagingPort::recv( char * buf , int max ){ + return ::recv( sock , buf , max , portRecvFlags ); + } + void MessagingPort::piggyBack( Message& toSend , int responseTo ) { if ( toSend.data->len > 1300 ) { @@ -438,7 +448,7 @@ again: } msgstart; MSGID nextMessageId(){ - MSGID msgid = NextMsgId.atomicIncrement(); + MSGID msgid = NextMsgId++; if ( usingClientIds ){ msgid = msgid & 0xFFFF; |