diff options
Diffstat (limited to 'client/dbclient_rs.cpp')
-rw-r--r-- | client/dbclient_rs.cpp | 335 |
1 files changed, 266 insertions, 69 deletions
diff --git a/client/dbclient_rs.cpp b/client/dbclient_rs.cpp index 37f6225..2cab1f7 100644 --- a/client/dbclient_rs.cpp +++ b/client/dbclient_rs.cpp @@ -54,9 +54,9 @@ namespace mongo { void run() { log() << "starting" << endl; while ( ! inShutdown() ) { - sleepsecs( 20 ); + sleepsecs( 10 ); try { - ReplicaSetMonitor::checkAll(); + ReplicaSetMonitor::checkAll( true ); } catch ( std::exception& e ) { error() << "check failed: " << e.what() << endl; @@ -99,17 +99,14 @@ namespace mongo { } _nodes.push_back( Node( servers[i] , conn.release() ) ); - + + int myLoc = _nodes.size() - 1; string maybePrimary; - if (_checkConnection( _nodes[_nodes.size()-1].conn , maybePrimary, false)) { - break; - } + _checkConnection( _nodes[myLoc].conn.get() , maybePrimary, false, myLoc ); } } ReplicaSetMonitor::~ReplicaSetMonitor() { - for ( unsigned i=0; i<_nodes.size(); i++ ) - delete _nodes[i].conn; _nodes.clear(); _master = -1; } @@ -125,7 +122,16 @@ namespace mongo { return m; } - void ReplicaSetMonitor::checkAll() { + ReplicaSetMonitorPtr ReplicaSetMonitor::get( const string& name ) { + scoped_lock lk( _setsLock ); + map<string,ReplicaSetMonitorPtr>::const_iterator i = _sets.find( name ); + if ( i == _sets.end() ) + return ReplicaSetMonitorPtr(); + return i->second; + } + + + void ReplicaSetMonitor::checkAll( bool checkAllSecondaries ) { set<string> seen; while ( true ) { @@ -146,7 +152,7 @@ namespace mongo { if ( ! m ) break; - m->check(); + m->check( checkAllSecondaries ); } @@ -202,7 +208,7 @@ namespace mongo { return _nodes[_master].addr; } - _check(); + _check( false ); scoped_lock lk( _lock ); uassert( 10009 , str::stream() << "ReplicaSetMonitor no master found for set: " << _name , _master >= 0 ); @@ -210,34 +216,70 @@ namespace mongo { } HostAndPort ReplicaSetMonitor::getSlave( const HostAndPort& prev ) { - // make sure its valid - if ( prev.port() > 0 ) { + // make sure its valid + + bool wasFound = false; + + // This is always true, since checked in port() + assert( prev.port() >= 0 ); + if( prev.host().size() ){ scoped_lock lk( _lock ); for ( unsigned i=0; i<_nodes.size(); i++ ) { if ( prev != _nodes[i].addr ) continue; - if ( _nodes[i].ok ) + wasFound = true; + + if ( _nodes[i].okForSecondaryQueries() ) return prev; + break; } } + if( prev.host().size() ){ + if( wasFound ){ LOG(1) << "slave '" << prev << "' is no longer ok to use" << endl; } + else{ LOG(1) << "slave '" << prev << "' was not found in the replica set" << endl; } + } + else LOG(1) << "slave '" << prev << "' is not initialized or invalid" << endl; + return getSlave(); } HostAndPort ReplicaSetMonitor::getSlave() { - scoped_lock lk( _lock ); - for ( unsigned i=0; i<_nodes.size(); i++ ) { - _nextSlave = ( _nextSlave + 1 ) % _nodes.size(); - if ( _nextSlave == _master ) - continue; - if ( _nodes[ _nextSlave ].ok ) - return _nodes[ _nextSlave ].addr; + LOG(2) << "selecting new slave from replica set " << getServerAddress() << endl; + + // Logic is to retry three times for any secondary node, if we can't find any secondary, we'll take + // any "ok" node + // TODO: Could this query hidden nodes? + const int MAX = 3; + for ( int xxx=0; xxx<MAX; xxx++ ) { + + { + scoped_lock lk( _lock ); + + unsigned i = 0; + for ( ; i<_nodes.size(); i++ ) { + _nextSlave = ( _nextSlave + 1 ) % _nodes.size(); + if ( _nextSlave == _master ){ + LOG(2) << "not selecting " << _nodes[_nextSlave] << " as it is the current master" << endl; + continue; + } + if ( _nodes[ _nextSlave ].okForSecondaryQueries() || ( _nodes[ _nextSlave ].ok && ( xxx + 1 ) >= MAX ) ) + return _nodes[ _nextSlave ].addr; + + LOG(2) << "not selecting " << _nodes[_nextSlave] << " as it is not ok to use" << endl; + } + + } + + check(false); } + + LOG(2) << "no suitable slave nodes found, returning default node " << _nodes[ 0 ] << endl; - return _nodes[ 0 ].addr; + return _nodes[0].addr; } /** @@ -266,7 +308,7 @@ namespace mongo { string host = member["name"].String(); int m = -1; - if ((m = _find(host)) <= 0) { + if ((m = _find(host)) < 0) { continue; } @@ -309,16 +351,34 @@ namespace mongo { - bool ReplicaSetMonitor::_checkConnection( DBClientConnection * c , string& maybePrimary , bool verbose ) { + bool ReplicaSetMonitor::_checkConnection( DBClientConnection * c , string& maybePrimary , bool verbose , int nodesOffset ) { scoped_lock lk( _checkConnectionLock ); bool isMaster = false; bool changed = false; try { + Timer t; BSONObj o; c->isMaster(isMaster, &o); + + if ( o["setName"].type() != String || o["setName"].String() != _name ) { + warning() << "node: " << c->getServerAddress() << " isn't a part of set: " << _name + << " ismaster: " << o << endl; + if ( nodesOffset >= 0 ) + _nodes[nodesOffset].ok = false; + return false; + } - log( ! verbose ) << "ReplicaSetMonitor::_checkConnection: " << c->toString() << ' ' << o << endl; + if ( nodesOffset >= 0 ) { + _nodes[nodesOffset].pingTimeMillis = t.millis(); + _nodes[nodesOffset].hidden = o["hidden"].trueValue(); + _nodes[nodesOffset].secondary = o["secondary"].trueValue(); + _nodes[nodesOffset].ismaster = o["ismaster"].trueValue(); + + _nodes[nodesOffset].lastIsMaster = o.copy(); + } + log( ! verbose ) << "ReplicaSetMonitor::_checkConnection: " << c->toString() << ' ' << o << endl; + // add other nodes if ( o["hosts"].type() == Array ) { if ( o["primary"].type() == String ) @@ -329,11 +389,14 @@ namespace mongo { if (o.hasField("passives") && o["passives"].type() == Array) { _checkHosts(o["passives"].Obj(), changed); } - + _checkStatus(c); + + } catch ( std::exception& e ) { log( ! verbose ) << "ReplicaSetMonitor::_checkConnection: caught exception " << c->toString() << ' ' << e.what() << endl; + _nodes[nodesOffset].ok = false; } if ( changed && _hook ) @@ -342,24 +405,28 @@ namespace mongo { return isMaster; } - void ReplicaSetMonitor::_check() { + void ReplicaSetMonitor::_check( bool checkAllSecondaries ) { bool triedQuickCheck = false; LOG(1) << "_check : " << getServerAddress() << endl; + int newMaster = -1; + for ( int retry = 0; retry < 2; retry++ ) { for ( unsigned i=0; i<_nodes.size(); i++ ) { - DBClientConnection * c; + shared_ptr<DBClientConnection> c; { scoped_lock lk( _lock ); c = _nodes[i].conn; } string maybePrimary; - if ( _checkConnection( c , maybePrimary , retry ) ) { + if ( _checkConnection( c.get() , maybePrimary , retry , i ) ) { _master = i; - return; + newMaster = i; + if ( ! checkAllSecondaries ) + return; } if ( ! triedQuickCheck && maybePrimary.size() ) { @@ -367,36 +434,44 @@ namespace mongo { if ( x >= 0 ) { triedQuickCheck = true; string dummy; - DBClientConnection * testConn; + shared_ptr<DBClientConnection> testConn; { scoped_lock lk( _lock ); testConn = _nodes[x].conn; } - if ( _checkConnection( testConn , dummy , false ) ) { + if ( _checkConnection( testConn.get() , dummy , false , x ) ) { _master = x; - return; + newMaster = x; + if ( ! checkAllSecondaries ) + return; } } } } + + if ( newMaster >= 0 ) + return; + sleepsecs(1); } } - void ReplicaSetMonitor::check() { + void ReplicaSetMonitor::check( bool checkAllSecondaries ) { // first see if the current master is fine if ( _master >= 0 ) { string temp; - if ( _checkConnection( _nodes[_master].conn , temp , false ) ) { - // current master is fine, so we're done - return; + if ( _checkConnection( _nodes[_master].conn.get() , temp , false , _master ) ) { + if ( ! checkAllSecondaries ) { + // current master is fine, so we're done + return; + } } } // we either have no master, or the current is dead - _check(); + _check( checkAllSecondaries ); } int ReplicaSetMonitor::_find( const string& server ) const { @@ -419,7 +494,26 @@ namespace mongo { return i; return -1; } - + + void ReplicaSetMonitor::appendInfo( BSONObjBuilder& b ) const { + scoped_lock lk( _lock ); + BSONArrayBuilder hosts( b.subarrayStart( "hosts" ) ); + for ( unsigned i=0; i<_nodes.size(); i++ ) { + hosts.append( BSON( "addr" << _nodes[i].addr << + // "lastIsMaster" << _nodes[i].lastIsMaster << // this is a potential race, so only used when debugging + "ok" << _nodes[i].ok << + "ismaster" << _nodes[i].ismaster << + "hidden" << _nodes[i].hidden << + "secondary" << _nodes[i].secondary << + "pingTimeMillis" << _nodes[i].pingTimeMillis ) ); + + } + hosts.done(); + + b.append( "master" , _master ); + b.append( "nextSlave" , _nextSlave ); + } + mongo::mutex ReplicaSetMonitor::_setsLock( "ReplicaSetMonitor" ); map<string,ReplicaSetMonitorPtr> ReplicaSetMonitor::_sets; @@ -428,8 +522,9 @@ namespace mongo { // ----- DBClientReplicaSet --------- // -------------------------------- - DBClientReplicaSet::DBClientReplicaSet( const string& name , const vector<HostAndPort>& servers ) - : _monitor( ReplicaSetMonitor::get( name , servers ) ) { + DBClientReplicaSet::DBClientReplicaSet( const string& name , const vector<HostAndPort>& servers, double so_timeout ) + : _monitor( ReplicaSetMonitor::get( name , servers ) ), + _so_timeout( so_timeout ) { } DBClientReplicaSet::~DBClientReplicaSet() { @@ -446,7 +541,7 @@ namespace mongo { } _masterHost = _monitor->getMaster(); - _master.reset( new DBClientConnection( true , this ) ); + _master.reset( new DBClientConnection( true , this , _so_timeout ) ); string errmsg; if ( ! _master->connect( _masterHost , errmsg ) ) { _monitor->notifyFailure( _masterHost ); @@ -464,12 +559,12 @@ namespace mongo { return _slave.get(); _monitor->notifySlaveFailure( _slaveHost ); _slaveHost = _monitor->getSlave(); - } + } else { _slaveHost = h; } - _slave.reset( new DBClientConnection( true , this ) ); + _slave.reset( new DBClientConnection( true , this , _so_timeout ) ); _slave->connect( _slaveHost ); _auth( _slave.get() ); return _slave.get(); @@ -522,12 +617,12 @@ namespace mongo { // ------------- simple functions ----------------- - void DBClientReplicaSet::insert( const string &ns , BSONObj obj ) { - checkMaster()->insert(ns, obj); + void DBClientReplicaSet::insert( const string &ns , BSONObj obj , int flags) { + checkMaster()->insert(ns, obj, flags); } - void DBClientReplicaSet::insert( const string &ns, const vector< BSONObj >& v ) { - checkMaster()->insert(ns, v); + void DBClientReplicaSet::insert( const string &ns, const vector< BSONObj >& v , int flags) { + checkMaster()->insert(ns, v, flags); } void DBClientReplicaSet::remove( const string &ns , Query obj , bool justOne ) { @@ -545,12 +640,12 @@ namespace mongo { // we're ok sending to a slave // we'll try 2 slaves before just using master // checkSlave will try a different slave automatically after a failure - for ( int i=0; i<2; i++ ) { + for ( int i=0; i<3; i++ ) { try { return checkSlaveQueryResult( checkSlave()->query(ns,query,nToReturn,nToSkip,fieldsToReturn,queryOptions,batchSize) ); } catch ( DBException &e ) { - log() << "can't query replica set slave " << i << " : " << _slaveHost << e.what() << endl; + LOG(1) << "can't query replica set slave " << i << " : " << _slaveHost << causedBy( e ) << endl; } } } @@ -563,12 +658,12 @@ namespace mongo { // we're ok sending to a slave // we'll try 2 slaves before just using master // checkSlave will try a different slave automatically after a failure - for ( int i=0; i<2; i++ ) { + for ( int i=0; i<3; i++ ) { try { return checkSlave()->findOne(ns,query,fieldsToReturn,queryOptions); } catch ( DBException &e ) { - LOG(1) << "can't findone replica set slave " << i << " : " << _slaveHost << e.what() << endl; + LOG(1) << "can't findone replica set slave " << i << " : " << _slaveHost << causedBy( e ) << endl; } } } @@ -584,23 +679,22 @@ namespace mongo { assert(0); } - auto_ptr<DBClientCursor> DBClientReplicaSet::checkSlaveQueryResult( auto_ptr<DBClientCursor> result ){ + void DBClientReplicaSet::isntMaster() { + log() << "got not master for: " << _masterHost << endl; + _monitor->notifyFailure( _masterHost ); + _master.reset(); + } - bool isError = result->hasResultFlag( ResultFlag_ErrSet ); + auto_ptr<DBClientCursor> DBClientReplicaSet::checkSlaveQueryResult( auto_ptr<DBClientCursor> result ){ + BSONObj error; + bool isError = result->peekError( &error ); if( ! isError ) return result; - BSONObj error = result->peekOne(); - - BSONElement code = error["code"]; - if( code.eoo() || ! code.isNumber() ){ - warning() << "no code for error from secondary host " << _slaveHost << ", error was " << error << endl; - return result; - } - // We only check for "not master or secondary" errors here // If the error code here ever changes, we need to change this code also - if( code.Int() == 13436 /* not master or secondary */ ){ + BSONElement code = error["code"]; + if( code.isNumber() && code.Int() == 13436 /* not master or secondary */ ){ isntSecondary(); throw DBException( str::stream() << "slave " << _slaveHost.toString() << " is no longer secondary", 14812 ); } @@ -615,20 +709,123 @@ namespace mongo { _slave.reset(); } + void DBClientReplicaSet::say( Message& toSend, bool isRetry ) { - void DBClientReplicaSet::isntMaster() { - log() << "got not master for: " << _masterHost << endl; - _monitor->notifyFailure( _masterHost ); - _master.reset(); + if( ! isRetry ) + _lazyState = LazyState(); + + int lastOp = -1; + bool slaveOk = false; + + if ( ( lastOp = toSend.operation() ) == dbQuery ) { + // TODO: might be possible to do this faster by changing api + DbMessage dm( toSend ); + QueryMessage qm( dm ); + if ( ( slaveOk = ( qm.queryOptions & QueryOption_SlaveOk ) ) ) { + + for ( int i = _lazyState._retries; i < 3; i++ ) { + try { + DBClientConnection* slave = checkSlave(); + slave->say( toSend ); + + _lazyState._lastOp = lastOp; + _lazyState._slaveOk = slaveOk; + _lazyState._retries = i; + _lazyState._lastClient = slave; + return; + } + catch ( DBException &e ) { + LOG(1) << "can't callLazy replica set slave " << i << " : " << _slaveHost << causedBy( e ) << endl; + } + } + } + } + + DBClientConnection* master = checkMaster(); + master->say( toSend ); + + _lazyState._lastOp = lastOp; + _lazyState._slaveOk = slaveOk; + _lazyState._retries = 3; + _lazyState._lastClient = master; + return; + } + + bool DBClientReplicaSet::recv( Message& m ) { + + assert( _lazyState._lastClient ); + + // TODO: It would be nice if we could easily wrap a conn error as a result error + try { + return _lazyState._lastClient->recv( m ); + } + catch( DBException& e ){ + log() << "could not receive data from " << _lazyState._lastClient << causedBy( e ) << endl; + return false; + } + } + + void DBClientReplicaSet::checkResponse( const char* data, int nReturned, bool* retry, string* targetHost ){ + + // For now, do exactly as we did before, so as not to break things. In general though, we + // should fix this so checkResponse has a more consistent contract. + if( ! retry ){ + if( _lazyState._lastClient ) + return _lazyState._lastClient->checkResponse( data, nReturned ); + else + return checkMaster()->checkResponse( data, nReturned ); + } + + *retry = false; + if( targetHost && _lazyState._lastClient ) *targetHost = _lazyState._lastClient->getServerAddress(); + else if (targetHost) *targetHost = ""; + + if( ! _lazyState._lastClient ) return; + if( nReturned != 1 && nReturned != -1 ) return; + + BSONObj dataObj; + if( nReturned == 1 ) dataObj = BSONObj( data ); + + // Check if we should retry here + if( _lazyState._lastOp == dbQuery && _lazyState._slaveOk ){ + + // Check the error code for a slave not secondary error + if( nReturned == -1 || + ( hasErrField( dataObj ) && ! dataObj["code"].eoo() && dataObj["code"].Int() == 13436 ) ){ + + bool wasMaster = false; + if( _lazyState._lastClient == _slave.get() ){ + isntSecondary(); + } + else if( _lazyState._lastClient == _master.get() ){ + wasMaster = true; + isntMaster(); + } + else + warning() << "passed " << dataObj << " but last rs client " << _lazyState._lastClient->toString() << " is not master or secondary" << endl; + + if( _lazyState._retries < 3 ){ + _lazyState._retries++; + *retry = true; + } + else{ + (void)wasMaster; // silence set-but-not-used warning + // assert( wasMaster ); + // printStackTrace(); + log() << "too many retries (" << _lazyState._retries << "), could not get data from replica set" << endl; + } + } + } } + bool DBClientReplicaSet::call( Message &toSend, Message &response, bool assertOk , string * actualServer ) { if ( toSend.operation() == dbQuery ) { // TODO: might be possible to do this faster by changing api DbMessage dm( toSend ); QueryMessage qm( dm ); if ( qm.queryOptions & QueryOption_SlaveOk ) { - for ( int i=0; i<2; i++ ) { + for ( int i=0; i<3; i++ ) { try { DBClientConnection* s = checkSlave(); if ( actualServer ) @@ -636,7 +833,7 @@ namespace mongo { return s->call( toSend , response , assertOk ); } catch ( DBException &e ) { - LOG(1) << "can't call replica set slave " << i << " : " << _slaveHost << e.what() << endl; + LOG(1) << "can't call replica set slave " << i << " : " << _slaveHost << causedBy( e ) << endl; if ( actualServer ) *actualServer = ""; } |