summaryrefslogtreecommitdiff
path: root/client/dbclient_rs.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'client/dbclient_rs.cpp')
-rw-r--r--client/dbclient_rs.cpp335
1 files changed, 266 insertions, 69 deletions
diff --git a/client/dbclient_rs.cpp b/client/dbclient_rs.cpp
index 37f6225..2cab1f7 100644
--- a/client/dbclient_rs.cpp
+++ b/client/dbclient_rs.cpp
@@ -54,9 +54,9 @@ namespace mongo {
void run() {
log() << "starting" << endl;
while ( ! inShutdown() ) {
- sleepsecs( 20 );
+ sleepsecs( 10 );
try {
- ReplicaSetMonitor::checkAll();
+ ReplicaSetMonitor::checkAll( true );
}
catch ( std::exception& e ) {
error() << "check failed: " << e.what() << endl;
@@ -99,17 +99,14 @@ namespace mongo {
}
_nodes.push_back( Node( servers[i] , conn.release() ) );
-
+
+ int myLoc = _nodes.size() - 1;
string maybePrimary;
- if (_checkConnection( _nodes[_nodes.size()-1].conn , maybePrimary, false)) {
- break;
- }
+ _checkConnection( _nodes[myLoc].conn.get() , maybePrimary, false, myLoc );
}
}
ReplicaSetMonitor::~ReplicaSetMonitor() {
- for ( unsigned i=0; i<_nodes.size(); i++ )
- delete _nodes[i].conn;
_nodes.clear();
_master = -1;
}
@@ -125,7 +122,16 @@ namespace mongo {
return m;
}
- void ReplicaSetMonitor::checkAll() {
+ ReplicaSetMonitorPtr ReplicaSetMonitor::get( const string& name ) {
+ scoped_lock lk( _setsLock );
+ map<string,ReplicaSetMonitorPtr>::const_iterator i = _sets.find( name );
+ if ( i == _sets.end() )
+ return ReplicaSetMonitorPtr();
+ return i->second;
+ }
+
+
+ void ReplicaSetMonitor::checkAll( bool checkAllSecondaries ) {
set<string> seen;
while ( true ) {
@@ -146,7 +152,7 @@ namespace mongo {
if ( ! m )
break;
- m->check();
+ m->check( checkAllSecondaries );
}
@@ -202,7 +208,7 @@ namespace mongo {
return _nodes[_master].addr;
}
- _check();
+ _check( false );
scoped_lock lk( _lock );
uassert( 10009 , str::stream() << "ReplicaSetMonitor no master found for set: " << _name , _master >= 0 );
@@ -210,34 +216,70 @@ namespace mongo {
}
HostAndPort ReplicaSetMonitor::getSlave( const HostAndPort& prev ) {
- // make sure its valid
- if ( prev.port() > 0 ) {
+ // make sure its valid
+
+ bool wasFound = false;
+
+ // This is always true, since checked in port()
+ assert( prev.port() >= 0 );
+ if( prev.host().size() ){
scoped_lock lk( _lock );
for ( unsigned i=0; i<_nodes.size(); i++ ) {
if ( prev != _nodes[i].addr )
continue;
- if ( _nodes[i].ok )
+ wasFound = true;
+
+ if ( _nodes[i].okForSecondaryQueries() )
return prev;
+
break;
}
}
+ if( prev.host().size() ){
+ if( wasFound ){ LOG(1) << "slave '" << prev << "' is no longer ok to use" << endl; }
+ else{ LOG(1) << "slave '" << prev << "' was not found in the replica set" << endl; }
+ }
+ else LOG(1) << "slave '" << prev << "' is not initialized or invalid" << endl;
+
return getSlave();
}
HostAndPort ReplicaSetMonitor::getSlave() {
- scoped_lock lk( _lock );
- for ( unsigned i=0; i<_nodes.size(); i++ ) {
- _nextSlave = ( _nextSlave + 1 ) % _nodes.size();
- if ( _nextSlave == _master )
- continue;
- if ( _nodes[ _nextSlave ].ok )
- return _nodes[ _nextSlave ].addr;
+ LOG(2) << "selecting new slave from replica set " << getServerAddress() << endl;
+
+ // Logic is to retry three times for any secondary node, if we can't find any secondary, we'll take
+ // any "ok" node
+ // TODO: Could this query hidden nodes?
+ const int MAX = 3;
+ for ( int xxx=0; xxx<MAX; xxx++ ) {
+
+ {
+ scoped_lock lk( _lock );
+
+ unsigned i = 0;
+ for ( ; i<_nodes.size(); i++ ) {
+ _nextSlave = ( _nextSlave + 1 ) % _nodes.size();
+ if ( _nextSlave == _master ){
+ LOG(2) << "not selecting " << _nodes[_nextSlave] << " as it is the current master" << endl;
+ continue;
+ }
+ if ( _nodes[ _nextSlave ].okForSecondaryQueries() || ( _nodes[ _nextSlave ].ok && ( xxx + 1 ) >= MAX ) )
+ return _nodes[ _nextSlave ].addr;
+
+ LOG(2) << "not selecting " << _nodes[_nextSlave] << " as it is not ok to use" << endl;
+ }
+
+ }
+
+ check(false);
}
+
+ LOG(2) << "no suitable slave nodes found, returning default node " << _nodes[ 0 ] << endl;
- return _nodes[ 0 ].addr;
+ return _nodes[0].addr;
}
/**
@@ -266,7 +308,7 @@ namespace mongo {
string host = member["name"].String();
int m = -1;
- if ((m = _find(host)) <= 0) {
+ if ((m = _find(host)) < 0) {
continue;
}
@@ -309,16 +351,34 @@ namespace mongo {
- bool ReplicaSetMonitor::_checkConnection( DBClientConnection * c , string& maybePrimary , bool verbose ) {
+ bool ReplicaSetMonitor::_checkConnection( DBClientConnection * c , string& maybePrimary , bool verbose , int nodesOffset ) {
scoped_lock lk( _checkConnectionLock );
bool isMaster = false;
bool changed = false;
try {
+ Timer t;
BSONObj o;
c->isMaster(isMaster, &o);
+
+ if ( o["setName"].type() != String || o["setName"].String() != _name ) {
+ warning() << "node: " << c->getServerAddress() << " isn't a part of set: " << _name
+ << " ismaster: " << o << endl;
+ if ( nodesOffset >= 0 )
+ _nodes[nodesOffset].ok = false;
+ return false;
+ }
- log( ! verbose ) << "ReplicaSetMonitor::_checkConnection: " << c->toString() << ' ' << o << endl;
+ if ( nodesOffset >= 0 ) {
+ _nodes[nodesOffset].pingTimeMillis = t.millis();
+ _nodes[nodesOffset].hidden = o["hidden"].trueValue();
+ _nodes[nodesOffset].secondary = o["secondary"].trueValue();
+ _nodes[nodesOffset].ismaster = o["ismaster"].trueValue();
+
+ _nodes[nodesOffset].lastIsMaster = o.copy();
+ }
+ log( ! verbose ) << "ReplicaSetMonitor::_checkConnection: " << c->toString() << ' ' << o << endl;
+
// add other nodes
if ( o["hosts"].type() == Array ) {
if ( o["primary"].type() == String )
@@ -329,11 +389,14 @@ namespace mongo {
if (o.hasField("passives") && o["passives"].type() == Array) {
_checkHosts(o["passives"].Obj(), changed);
}
-
+
_checkStatus(c);
+
+
}
catch ( std::exception& e ) {
log( ! verbose ) << "ReplicaSetMonitor::_checkConnection: caught exception " << c->toString() << ' ' << e.what() << endl;
+ _nodes[nodesOffset].ok = false;
}
if ( changed && _hook )
@@ -342,24 +405,28 @@ namespace mongo {
return isMaster;
}
- void ReplicaSetMonitor::_check() {
+ void ReplicaSetMonitor::_check( bool checkAllSecondaries ) {
bool triedQuickCheck = false;
LOG(1) << "_check : " << getServerAddress() << endl;
+ int newMaster = -1;
+
for ( int retry = 0; retry < 2; retry++ ) {
for ( unsigned i=0; i<_nodes.size(); i++ ) {
- DBClientConnection * c;
+ shared_ptr<DBClientConnection> c;
{
scoped_lock lk( _lock );
c = _nodes[i].conn;
}
string maybePrimary;
- if ( _checkConnection( c , maybePrimary , retry ) ) {
+ if ( _checkConnection( c.get() , maybePrimary , retry , i ) ) {
_master = i;
- return;
+ newMaster = i;
+ if ( ! checkAllSecondaries )
+ return;
}
if ( ! triedQuickCheck && maybePrimary.size() ) {
@@ -367,36 +434,44 @@ namespace mongo {
if ( x >= 0 ) {
triedQuickCheck = true;
string dummy;
- DBClientConnection * testConn;
+ shared_ptr<DBClientConnection> testConn;
{
scoped_lock lk( _lock );
testConn = _nodes[x].conn;
}
- if ( _checkConnection( testConn , dummy , false ) ) {
+ if ( _checkConnection( testConn.get() , dummy , false , x ) ) {
_master = x;
- return;
+ newMaster = x;
+ if ( ! checkAllSecondaries )
+ return;
}
}
}
}
+
+ if ( newMaster >= 0 )
+ return;
+
sleepsecs(1);
}
}
- void ReplicaSetMonitor::check() {
+ void ReplicaSetMonitor::check( bool checkAllSecondaries ) {
// first see if the current master is fine
if ( _master >= 0 ) {
string temp;
- if ( _checkConnection( _nodes[_master].conn , temp , false ) ) {
- // current master is fine, so we're done
- return;
+ if ( _checkConnection( _nodes[_master].conn.get() , temp , false , _master ) ) {
+ if ( ! checkAllSecondaries ) {
+ // current master is fine, so we're done
+ return;
+ }
}
}
// we either have no master, or the current is dead
- _check();
+ _check( checkAllSecondaries );
}
int ReplicaSetMonitor::_find( const string& server ) const {
@@ -419,7 +494,26 @@ namespace mongo {
return i;
return -1;
}
-
+
+ void ReplicaSetMonitor::appendInfo( BSONObjBuilder& b ) const {
+ scoped_lock lk( _lock );
+ BSONArrayBuilder hosts( b.subarrayStart( "hosts" ) );
+ for ( unsigned i=0; i<_nodes.size(); i++ ) {
+ hosts.append( BSON( "addr" << _nodes[i].addr <<
+ // "lastIsMaster" << _nodes[i].lastIsMaster << // this is a potential race, so only used when debugging
+ "ok" << _nodes[i].ok <<
+ "ismaster" << _nodes[i].ismaster <<
+ "hidden" << _nodes[i].hidden <<
+ "secondary" << _nodes[i].secondary <<
+ "pingTimeMillis" << _nodes[i].pingTimeMillis ) );
+
+ }
+ hosts.done();
+
+ b.append( "master" , _master );
+ b.append( "nextSlave" , _nextSlave );
+ }
+
mongo::mutex ReplicaSetMonitor::_setsLock( "ReplicaSetMonitor" );
map<string,ReplicaSetMonitorPtr> ReplicaSetMonitor::_sets;
@@ -428,8 +522,9 @@ namespace mongo {
// ----- DBClientReplicaSet ---------
// --------------------------------
- DBClientReplicaSet::DBClientReplicaSet( const string& name , const vector<HostAndPort>& servers )
- : _monitor( ReplicaSetMonitor::get( name , servers ) ) {
+ DBClientReplicaSet::DBClientReplicaSet( const string& name , const vector<HostAndPort>& servers, double so_timeout )
+ : _monitor( ReplicaSetMonitor::get( name , servers ) ),
+ _so_timeout( so_timeout ) {
}
DBClientReplicaSet::~DBClientReplicaSet() {
@@ -446,7 +541,7 @@ namespace mongo {
}
_masterHost = _monitor->getMaster();
- _master.reset( new DBClientConnection( true , this ) );
+ _master.reset( new DBClientConnection( true , this , _so_timeout ) );
string errmsg;
if ( ! _master->connect( _masterHost , errmsg ) ) {
_monitor->notifyFailure( _masterHost );
@@ -464,12 +559,12 @@ namespace mongo {
return _slave.get();
_monitor->notifySlaveFailure( _slaveHost );
_slaveHost = _monitor->getSlave();
- }
+ }
else {
_slaveHost = h;
}
- _slave.reset( new DBClientConnection( true , this ) );
+ _slave.reset( new DBClientConnection( true , this , _so_timeout ) );
_slave->connect( _slaveHost );
_auth( _slave.get() );
return _slave.get();
@@ -522,12 +617,12 @@ namespace mongo {
// ------------- simple functions -----------------
- void DBClientReplicaSet::insert( const string &ns , BSONObj obj ) {
- checkMaster()->insert(ns, obj);
+ void DBClientReplicaSet::insert( const string &ns , BSONObj obj , int flags) {
+ checkMaster()->insert(ns, obj, flags);
}
- void DBClientReplicaSet::insert( const string &ns, const vector< BSONObj >& v ) {
- checkMaster()->insert(ns, v);
+ void DBClientReplicaSet::insert( const string &ns, const vector< BSONObj >& v , int flags) {
+ checkMaster()->insert(ns, v, flags);
}
void DBClientReplicaSet::remove( const string &ns , Query obj , bool justOne ) {
@@ -545,12 +640,12 @@ namespace mongo {
// we're ok sending to a slave
// we'll try 2 slaves before just using master
// checkSlave will try a different slave automatically after a failure
- for ( int i=0; i<2; i++ ) {
+ for ( int i=0; i<3; i++ ) {
try {
return checkSlaveQueryResult( checkSlave()->query(ns,query,nToReturn,nToSkip,fieldsToReturn,queryOptions,batchSize) );
}
catch ( DBException &e ) {
- log() << "can't query replica set slave " << i << " : " << _slaveHost << e.what() << endl;
+ LOG(1) << "can't query replica set slave " << i << " : " << _slaveHost << causedBy( e ) << endl;
}
}
}
@@ -563,12 +658,12 @@ namespace mongo {
// we're ok sending to a slave
// we'll try 2 slaves before just using master
// checkSlave will try a different slave automatically after a failure
- for ( int i=0; i<2; i++ ) {
+ for ( int i=0; i<3; i++ ) {
try {
return checkSlave()->findOne(ns,query,fieldsToReturn,queryOptions);
}
catch ( DBException &e ) {
- LOG(1) << "can't findone replica set slave " << i << " : " << _slaveHost << e.what() << endl;
+ LOG(1) << "can't findone replica set slave " << i << " : " << _slaveHost << causedBy( e ) << endl;
}
}
}
@@ -584,23 +679,22 @@ namespace mongo {
assert(0);
}
- auto_ptr<DBClientCursor> DBClientReplicaSet::checkSlaveQueryResult( auto_ptr<DBClientCursor> result ){
+ void DBClientReplicaSet::isntMaster() {
+ log() << "got not master for: " << _masterHost << endl;
+ _monitor->notifyFailure( _masterHost );
+ _master.reset();
+ }
- bool isError = result->hasResultFlag( ResultFlag_ErrSet );
+ auto_ptr<DBClientCursor> DBClientReplicaSet::checkSlaveQueryResult( auto_ptr<DBClientCursor> result ){
+ BSONObj error;
+ bool isError = result->peekError( &error );
if( ! isError ) return result;
- BSONObj error = result->peekOne();
-
- BSONElement code = error["code"];
- if( code.eoo() || ! code.isNumber() ){
- warning() << "no code for error from secondary host " << _slaveHost << ", error was " << error << endl;
- return result;
- }
-
// We only check for "not master or secondary" errors here
// If the error code here ever changes, we need to change this code also
- if( code.Int() == 13436 /* not master or secondary */ ){
+ BSONElement code = error["code"];
+ if( code.isNumber() && code.Int() == 13436 /* not master or secondary */ ){
isntSecondary();
throw DBException( str::stream() << "slave " << _slaveHost.toString() << " is no longer secondary", 14812 );
}
@@ -615,20 +709,123 @@ namespace mongo {
_slave.reset();
}
+ void DBClientReplicaSet::say( Message& toSend, bool isRetry ) {
- void DBClientReplicaSet::isntMaster() {
- log() << "got not master for: " << _masterHost << endl;
- _monitor->notifyFailure( _masterHost );
- _master.reset();
+ if( ! isRetry )
+ _lazyState = LazyState();
+
+ int lastOp = -1;
+ bool slaveOk = false;
+
+ if ( ( lastOp = toSend.operation() ) == dbQuery ) {
+ // TODO: might be possible to do this faster by changing api
+ DbMessage dm( toSend );
+ QueryMessage qm( dm );
+ if ( ( slaveOk = ( qm.queryOptions & QueryOption_SlaveOk ) ) ) {
+
+ for ( int i = _lazyState._retries; i < 3; i++ ) {
+ try {
+ DBClientConnection* slave = checkSlave();
+ slave->say( toSend );
+
+ _lazyState._lastOp = lastOp;
+ _lazyState._slaveOk = slaveOk;
+ _lazyState._retries = i;
+ _lazyState._lastClient = slave;
+ return;
+ }
+ catch ( DBException &e ) {
+ LOG(1) << "can't callLazy replica set slave " << i << " : " << _slaveHost << causedBy( e ) << endl;
+ }
+ }
+ }
+ }
+
+ DBClientConnection* master = checkMaster();
+ master->say( toSend );
+
+ _lazyState._lastOp = lastOp;
+ _lazyState._slaveOk = slaveOk;
+ _lazyState._retries = 3;
+ _lazyState._lastClient = master;
+ return;
+ }
+
+ bool DBClientReplicaSet::recv( Message& m ) {
+
+ assert( _lazyState._lastClient );
+
+ // TODO: It would be nice if we could easily wrap a conn error as a result error
+ try {
+ return _lazyState._lastClient->recv( m );
+ }
+ catch( DBException& e ){
+ log() << "could not receive data from " << _lazyState._lastClient << causedBy( e ) << endl;
+ return false;
+ }
+ }
+
+ void DBClientReplicaSet::checkResponse( const char* data, int nReturned, bool* retry, string* targetHost ){
+
+ // For now, do exactly as we did before, so as not to break things. In general though, we
+ // should fix this so checkResponse has a more consistent contract.
+ if( ! retry ){
+ if( _lazyState._lastClient )
+ return _lazyState._lastClient->checkResponse( data, nReturned );
+ else
+ return checkMaster()->checkResponse( data, nReturned );
+ }
+
+ *retry = false;
+ if( targetHost && _lazyState._lastClient ) *targetHost = _lazyState._lastClient->getServerAddress();
+ else if (targetHost) *targetHost = "";
+
+ if( ! _lazyState._lastClient ) return;
+ if( nReturned != 1 && nReturned != -1 ) return;
+
+ BSONObj dataObj;
+ if( nReturned == 1 ) dataObj = BSONObj( data );
+
+ // Check if we should retry here
+ if( _lazyState._lastOp == dbQuery && _lazyState._slaveOk ){
+
+ // Check the error code for a slave not secondary error
+ if( nReturned == -1 ||
+ ( hasErrField( dataObj ) && ! dataObj["code"].eoo() && dataObj["code"].Int() == 13436 ) ){
+
+ bool wasMaster = false;
+ if( _lazyState._lastClient == _slave.get() ){
+ isntSecondary();
+ }
+ else if( _lazyState._lastClient == _master.get() ){
+ wasMaster = true;
+ isntMaster();
+ }
+ else
+ warning() << "passed " << dataObj << " but last rs client " << _lazyState._lastClient->toString() << " is not master or secondary" << endl;
+
+ if( _lazyState._retries < 3 ){
+ _lazyState._retries++;
+ *retry = true;
+ }
+ else{
+ (void)wasMaster; // silence set-but-not-used warning
+ // assert( wasMaster );
+ // printStackTrace();
+ log() << "too many retries (" << _lazyState._retries << "), could not get data from replica set" << endl;
+ }
+ }
+ }
}
+
bool DBClientReplicaSet::call( Message &toSend, Message &response, bool assertOk , string * actualServer ) {
if ( toSend.operation() == dbQuery ) {
// TODO: might be possible to do this faster by changing api
DbMessage dm( toSend );
QueryMessage qm( dm );
if ( qm.queryOptions & QueryOption_SlaveOk ) {
- for ( int i=0; i<2; i++ ) {
+ for ( int i=0; i<3; i++ ) {
try {
DBClientConnection* s = checkSlave();
if ( actualServer )
@@ -636,7 +833,7 @@ namespace mongo {
return s->call( toSend , response , assertOk );
}
catch ( DBException &e ) {
- LOG(1) << "can't call replica set slave " << i << " : " << _slaveHost << e.what() << endl;
+ LOG(1) << "can't call replica set slave " << i << " : " << _slaveHost << causedBy( e ) << endl;
if ( actualServer )
*actualServer = "";
}