summaryrefslogtreecommitdiff
path: root/client/dbclient_rs.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'client/dbclient_rs.cpp')
-rw-r--r--client/dbclient_rs.cpp191
1 files changed, 155 insertions, 36 deletions
diff --git a/client/dbclient_rs.cpp b/client/dbclient_rs.cpp
index c57a52d..0189700 100644
--- a/client/dbclient_rs.cpp
+++ b/client/dbclient_rs.cpp
@@ -72,6 +72,15 @@ namespace mongo {
} replicaSetMonitorWatcher;
+ string seedString( const vector<HostAndPort>& servers ){
+ string seedStr;
+ for ( unsigned i = 0; i < servers.size(); i++ ){
+ seedStr += servers[i].toString();
+ if( i < servers.size() - 1 ) seedStr += ",";
+ }
+
+ return seedStr;
+ }
ReplicaSetMonitor::ReplicaSetMonitor( const string& name , const vector<HostAndPort>& servers )
: _lock( "ReplicaSetMonitor instance" ) , _checkConnectionLock( "ReplicaSetMonitor check connection lock" ), _name( name ) , _master(-1), _nextSlave(0) {
@@ -82,28 +91,36 @@ namespace mongo {
warning() << "replica set name empty, first node: " << servers[0] << endl;
}
- string errmsg;
+ log() << "starting new replica set monitor for replica set " << _name << " with seed of " << seedString( servers ) << endl;
- for ( unsigned i=0; i<servers.size(); i++ ) {
+ string errmsg;
+ for ( unsigned i = 0; i < servers.size(); i++ ) {
- bool haveAlready = false;
- for ( unsigned n = 0; n < _nodes.size() && ! haveAlready; n++ )
- haveAlready = ( _nodes[n].addr == servers[i] );
- if( haveAlready ) continue;
+ // Don't check servers we have already
+ if( _find_inlock( servers[i] ) >= 0 ) continue;
auto_ptr<DBClientConnection> conn( new DBClientConnection( true , 0, 5.0 ) );
- if (!conn->connect( servers[i] , errmsg ) ) {
- log(1) << "error connecting to seed " << servers[i] << ": " << errmsg << endl;
+ try{
+ if( ! conn->connect( servers[i] , errmsg ) ){
+ throw DBException( errmsg, 15928 );
+ }
+ log() << "successfully connected to seed " << servers[i] << " for replica set " << this->_name << endl;
+ }
+ catch( DBException& e ){
+ log() << "error connecting to seed " << servers[i] << causedBy( e ) << endl;
// skip seeds that don't work
continue;
}
- _nodes.push_back( Node( servers[i] , conn.release() ) );
-
- int myLoc = _nodes.size() - 1;
string maybePrimary;
- _checkConnection( _nodes[myLoc].conn.get() , maybePrimary, false, myLoc );
+ _checkConnection( conn.get(), maybePrimary, false, -1 );
}
+
+ // Check everything to get the first data
+ _check( true );
+
+ log() << "replica set monitor for replica set " << _name << " started, address is " << getServerAddress() << endl;
+
}
ReplicaSetMonitor::~ReplicaSetMonitor() {
@@ -164,18 +181,21 @@ namespace mongo {
}
string ReplicaSetMonitor::getServerAddress() const {
+ scoped_lock lk( _lock );
+ return _getServerAddress_inlock();
+ }
+
+ string ReplicaSetMonitor::_getServerAddress_inlock() const {
StringBuilder ss;
if ( _name.size() )
ss << _name << "/";
- {
- scoped_lock lk( _lock );
- for ( unsigned i=0; i<_nodes.size(); i++ ) {
- if ( i > 0 )
- ss << ",";
- ss << _nodes[i].addr.toString();
- }
+ for ( unsigned i=0; i<_nodes.size(); i++ ) {
+ if ( i > 0 )
+ ss << ",";
+ ss << _nodes[i].addr.toString();
}
+
return ss.str();
}
@@ -313,34 +333,130 @@ namespace mongo {
}
}
- void ReplicaSetMonitor::_checkHosts( const BSONObj& hostList, bool& changed ) {
+ NodeDiff ReplicaSetMonitor::_getHostDiff_inlock( const BSONObj& hostList ){
+
+ NodeDiff diff;
+ set<int> nodesFound;
+
+ int index = 0;
+ BSONObjIterator hi( hostList );
+ while( hi.more() ){
+
+ string toCheck = hi.next().String();
+ int nodeIndex = _find_inlock( toCheck );
+
+ // Node-to-add
+ if( nodeIndex < 0 ) diff.first.insert( toCheck );
+ else nodesFound.insert( nodeIndex );
+
+ index++;
+ }
+
+ for( size_t i = 0; i < _nodes.size(); i++ ){
+ if( nodesFound.find( static_cast<int>(i) ) == nodesFound.end() ) diff.second.insert( static_cast<int>(i) );
+ }
+
+ return diff;
+ }
+
+ bool ReplicaSetMonitor::_shouldChangeHosts( const BSONObj& hostList, bool inlock ){
+
+ int origHosts = 0;
+ if( ! inlock ){
+ scoped_lock lk( _lock );
+ origHosts = _nodes.size();
+ }
+ else origHosts = _nodes.size();
+ int numHosts = 0;
+ bool changed = false;
+
BSONObjIterator hi(hostList);
while ( hi.more() ) {
string toCheck = hi.next().String();
- if ( _find( toCheck ) >= 0 )
- continue;
+ numHosts++;
+ int index = 0;
+ if( ! inlock ) index = _find( toCheck );
+ else index = _find_inlock( toCheck );
+
+ if ( index >= 0 ) continue;
+
+ changed = true;
+ break;
+ }
+
+ return changed || origHosts != numHosts;
+
+ }
+
+ void ReplicaSetMonitor::_checkHosts( const BSONObj& hostList, bool& changed ) {
+
+ // Fast path, still requires intermittent locking
+ if( ! _shouldChangeHosts( hostList, false ) ){
+ changed = false;
+ return;
+ }
+
+ // Slow path, double-checked though
+ scoped_lock lk( _lock );
+
+ // Our host list may have changed while waiting for another thread in the meantime,
+ // so double-check here
+ // TODO: Do we really need this much protection, this should be pretty rare and not triggered
+ // from lots of threads, duping old behavior for safety
+ if( ! _shouldChangeHosts( hostList, true ) ){
+ changed = false;
+ return;
+ }
+
+ // LogLevel can be pretty low, since replica set reconfiguration should be pretty rare and we
+ // want to record our changes
+ log() << "changing hosts to " << hostList << " from " << _getServerAddress_inlock() << endl;
- HostAndPort h( toCheck );
+ NodeDiff diff = _getHostDiff_inlock( hostList );
+ set<string> added = diff.first;
+ set<int> removed = diff.second;
+
+ assert( added.size() > 0 || removed.size() > 0 );
+ changed = true;
+
+ // Delete from the end so we don't invalidate as we delete, delete indices are ascending
+ for( set<int>::reverse_iterator i = removed.rbegin(), end = removed.rend(); i != end; ++i ){
+
+ log() << "erasing host " << _nodes[ *i ] << " from replica set " << this->_name << endl;
+
+ _nodes.erase( _nodes.begin() + *i );
+ }
+
+ // Add new nodes
+ for( set<string>::iterator i = added.begin(), end = added.end(); i != end; ++i ){
+
+ log() << "trying to add new host " << *i << " to replica set " << this->_name << endl;
+
+ // Connect to new node
+ HostAndPort h( *i );
DBClientConnection * newConn = new DBClientConnection( true, 0, 5.0 );
- string temp;
- newConn->connect( h , temp );
- {
- scoped_lock lk( _lock );
- if ( _find_inlock( toCheck ) >= 0 ) {
- // we need this check inside the lock so there isn't thread contention on adding to vector
- continue;
+
+ string errmsg;
+ try{
+ if( ! newConn->connect( h , errmsg ) ){
+ throw DBException( errmsg, 15927 );
}
- _nodes.push_back( Node( h , newConn ) );
+ log() << "successfully connected to new host " << *i << " in replica set " << this->_name << endl;
}
- log() << "updated set (" << _name << ") to: " << getServerAddress() << endl;
- changed = true;
+ catch( DBException& e ){
+ warning() << "cannot connect to new host " << *i << " to replica set " << this->_name << causedBy( e ) << endl;
+ }
+
+ _nodes.push_back( Node( h , newConn ) );
}
+
}
bool ReplicaSetMonitor::_checkConnection( DBClientConnection * c , string& maybePrimary , bool verbose , int nodesOffset ) {
+ assert( c );
scoped_lock lk( _checkConnectionLock );
bool isMaster = false;
bool changed = false;
@@ -348,7 +464,6 @@ namespace mongo {
Timer t;
BSONObj o;
c->isMaster(isMaster, &o);
-
if ( o["setName"].type() != String || o["setName"].String() != _name ) {
warning() << "node: " << c->getServerAddress() << " isn't a part of set: " << _name
<< " ismaster: " << o << endl;
@@ -369,16 +484,20 @@ namespace mongo {
log( ! verbose ) << "ReplicaSetMonitor::_checkConnection: " << c->toString() << ' ' << o << endl;
// add other nodes
+ BSONArrayBuilder b;
if ( o["hosts"].type() == Array ) {
if ( o["primary"].type() == String )
maybePrimary = o["primary"].String();
- _checkHosts(o["hosts"].Obj(), changed);
+ BSONObjIterator it( o["hosts"].Obj() );
+ while( it.more() ) b.append( it.next() );
}
if (o.hasField("passives") && o["passives"].type() == Array) {
- _checkHosts(o["passives"].Obj(), changed);
+ BSONObjIterator it( o["passives"].Obj() );
+ while( it.more() ) b.append( it.next() );
}
+ _checkHosts( b.arr(), changed);
_checkStatus(c);