summaryrefslogtreecommitdiff
path: root/util/message.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'util/message.cpp')
-rw-r--r--util/message.cpp42
1 files changed, 26 insertions, 16 deletions
diff --git a/util/message.cpp b/util/message.cpp
index 0fbc2d2..2c3d006 100644
--- a/util/message.cpp
+++ b/util/message.cpp
@@ -35,9 +35,11 @@ namespace mongo {
#define mmm(x)
#ifdef MSG_NOSIGNAL
- const int portSendFlags = MSG_NOSIGNAL;
+ const int portSendFlags = MSG_NOSIGNAL;
+ const int portRecvFlags = MSG_NOSIGNAL;
#else
- const int portSendFlags = 0;
+ const int portSendFlags = 0;
+ const int portRecvFlags = 0;
#endif
/* listener ------------------------------------------------------------------- */
@@ -72,7 +74,7 @@ namespace mongo {
void Listener::listen() {
static long connNumber = 0;
SockAddr from;
- while ( 1 ) {
+ while ( ! inShutdown() ) {
int s = accept(sock, (sockaddr *) &from.sa, &from.addressSize);
if ( s < 0 ) {
if ( errno == ECONNABORTED || errno == EBADF ) {
@@ -117,7 +119,7 @@ namespace mongo {
if ( _buf == _cur )
return 0;
- int x = ::send( _port->sock , _buf , len() , 0 );
+ int x = _port->send( _buf , len() );
_cur = _buf;
return x;
}
@@ -136,23 +138,22 @@ namespace mongo {
class Ports {
set<MessagingPort*>& ports;
- boost::mutex& m;
+ mongo::mutex m;
public:
// we "new" this so it is still be around when other automatic global vars
// are being destructed during termination.
- Ports() : ports( *(new set<MessagingPort*>()) ),
- m( *(new boost::mutex()) ) { }
+ Ports() : ports( *(new set<MessagingPort*>()) ) {}
void closeAll() { \
- boostlock bl(m);
+ scoped_lock bl(m);
for ( set<MessagingPort*>::iterator i = ports.begin(); i != ports.end(); i++ )
(*i)->shutdown();
}
void insert(MessagingPort* p) {
- boostlock bl(m);
+ scoped_lock bl(m);
ports.insert(p);
}
void erase(MessagingPort* p) {
- boostlock bl(m);
+ scoped_lock bl(m);
ports.erase(p);
}
} ports;
@@ -263,7 +264,7 @@ again:
char *lenbuf = (char *) &len;
int lft = 4;
while ( 1 ) {
- int x = ::recv(sock, lenbuf, lft, 0);
+ int x = recv( lenbuf, lft );
if ( x == 0 ) {
DEV out() << "MessagingPort recv() conn closed? " << farEnd.toString() << endl;
m.reset();
@@ -286,7 +287,7 @@ again:
if ( len == -1 ) {
// Endian check from the database, after connecting, to see what mode server is running in.
unsigned foo = 0x10203040;
- int x = ::send(sock, (char *) &foo, 4, portSendFlags );
+ int x = send( (char *) &foo, 4 );
if ( x <= 0 ) {
log() << "MessagingPort endian send() " << OUTPUT_ERRNO << ' ' << farEnd.toString() << endl;
return false;
@@ -301,7 +302,7 @@ again:
stringstream ss;
ss << "HTTP/1.0 200 OK\r\nConnection: close\r\nContent-Type: text/plain\r\nContent-Length: " << msg.size() << "\r\n\r\n" << msg;
string s = ss.str();
- ::send( sock , s.c_str(), s.size(), 0 );
+ send( s.c_str(), s.size() );
return false;
}
log() << "bad recv() len: " << len << '\n';
@@ -321,7 +322,7 @@ again:
char *p = (char *) &md->id;
int left = len -4;
while ( 1 ) {
- int x = ::recv(sock, p, left, 0);
+ int x = recv( p, left );
if ( x == 0 ) {
DEV out() << "MessagingPort::recv(): conn closed? " << farEnd.toString() << endl;
m.reset();
@@ -376,6 +377,7 @@ again:
}
void MessagingPort::say(Message& toSend, int responseTo) {
+ assert( toSend.data );
mmm( out() << "* say() sock:" << this->sock << " thr:" << GetCurrentThreadId() << endl; )
toSend.data->id = nextMessageId();
toSend.data->responseTo = responseTo;
@@ -395,7 +397,7 @@ again:
}
if ( x == -100 )
- x = ::send(sock, (char*)toSend.data, toSend.data->len , portSendFlags );
+ x = send( (char*)toSend.data, toSend.data->len );
if ( x <= 0 ) {
log() << "MessagingPort say send() " << OUTPUT_ERRNO << ' ' << farEnd.toString() << endl;
@@ -404,6 +406,14 @@ again:
}
+ int MessagingPort::send( const char * data , const int len ){
+ return ::send( sock , data , len , portSendFlags );
+ }
+
+ int MessagingPort::recv( char * buf , int max ){
+ return ::recv( sock , buf , max , portRecvFlags );
+ }
+
void MessagingPort::piggyBack( Message& toSend , int responseTo ) {
if ( toSend.data->len > 1300 ) {
@@ -438,7 +448,7 @@ again:
} msgstart;
MSGID nextMessageId(){
- MSGID msgid = NextMsgId.atomicIncrement();
+ MSGID msgid = NextMsgId++;
if ( usingClientIds ){
msgid = msgid & 0xFFFF;