summaryrefslogtreecommitdiff
path: root/util/message.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'util/message.cpp')
-rw-r--r--util/message.cpp41
1 files changed, 15 insertions, 26 deletions
diff --git a/util/message.cpp b/util/message.cpp
index cfff420..a809c1f 100644
--- a/util/message.cpp
+++ b/util/message.cpp
@@ -201,10 +201,12 @@ namespace mongo {
if ( x == ECONNABORTED || x == EBADF ) {
log() << "Listener on port " << _port << " aborted" << endl;
return;
- } if ( x == 0 && inShutdown() ){
+ }
+ if ( x == 0 && inShutdown() ) {
return; // socket closed
}
- log() << "Listener: accept() returns " << s << " " << errnoWithDescription(x) << endl;
+ if( !inShutdown() )
+ log() << "Listener: accept() returns " << s << " " << errnoWithDescription(x) << endl;
continue;
}
if (from.getType() != AF_UNIX)
@@ -255,14 +257,10 @@ namespace mongo {
_cur = _buf;
}
- int len() {
- return _cur - _buf;
- }
+ int len() const { return _cur - _buf; }
private:
-
MessagingPort* _port;
-
char * _buf;
char * _cur;
};
@@ -272,10 +270,13 @@ namespace mongo {
mongo::mutex m;
public:
Ports() : ports(), m("Ports") {}
- void closeAll() { \
+ void closeAll(unsigned skip_mask) {
scoped_lock bl(m);
- for ( set<MessagingPort*>::iterator i = ports.begin(); i != ports.end(); i++ )
+ for ( set<MessagingPort*>::iterator i = ports.begin(); i != ports.end(); i++ ) {
+ if( (*i)->tag & skip_mask )
+ continue;
(*i)->shutdown();
+ }
}
void insert(MessagingPort* p) {
scoped_lock bl(m);
@@ -291,18 +292,16 @@ namespace mongo {
// are being destructed during termination.
Ports& ports = *(new Ports());
-
-
- void closeAllSockets() {
- ports.closeAll();
+ void MessagingPort::closeAllSockets(unsigned mask) {
+ ports.closeAll(mask);
}
- MessagingPort::MessagingPort(int _sock, const SockAddr& _far) : sock(_sock), piggyBackData(0), farEnd(_far), _timeout() {
+ MessagingPort::MessagingPort(int _sock, const SockAddr& _far) : sock(_sock), piggyBackData(0), farEnd(_far), _timeout(), tag(0) {
_logLevel = 0;
ports.insert(this);
}
- MessagingPort::MessagingPort( int timeout, int ll ) {
+ MessagingPort::MessagingPort( int timeout, int ll ) : tag(0) {
_logLevel = ll;
ports.insert(this);
sock = -1;
@@ -341,7 +340,7 @@ namespace mongo {
sock = socket(farEnd.getType(), SOCK_STREAM, 0);
if ( sock == INVALID_SOCKET ) {
- log(_logLevel) << "ERROR: connect(): invalid socket? " << errnoWithDescription() << endl;
+ log(_logLevel) << "ERROR: connect invalid socket " << errnoWithDescription() << endl;
return false;
}
@@ -630,7 +629,6 @@ namespace mongo {
MSGID NextMsgId;
- bool usingClientIds = 0;
ThreadLocalValue<int> clientId;
struct MsgStart {
@@ -642,12 +640,6 @@ namespace mongo {
MSGID nextMessageId(){
MSGID msgid = NextMsgId++;
-
- if ( usingClientIds ){
- msgid = msgid & 0xFFFF;
- msgid = msgid | clientId.get();
- }
-
return msgid;
}
@@ -656,9 +648,6 @@ namespace mongo {
}
void setClientId( int id ){
- usingClientIds = true;
- id = id & 0xFFFF0000;
- massert( 10445 , "invalid id" , id );
clientId.set( id );
}