summary | refs | log | tree | commit | diff
path: root/client
diff options
context:
space:
mode:
author	Antonin Kral <a.kral@bobek.cz>	2011-12-15 09:35:47 +0100
committer	Antonin Kral <a.kral@bobek.cz>	2011-12-15 09:35:47 +0100
commitf0d9a01bccdaeb466c12c92057914bbfef59526c (patch)
tree7679efa1f0daf7d1d906882a15dc77af6b7aef32 /client
parent5d342a758c6095b4d30aba0750b54f13b8916f51 (diff)
downloadmongodb-f0d9a01bccdaeb466c12c92057914bbfef59526c.tar.gz
Imported Upstream version 2.0.2
Diffstat (limited to 'client')
-rw-r--r--	client/connpool.cpp	| 40
-rw-r--r--	client/connpool.h	| 4
-rw-r--r--	client/dbclient.cpp	| 21
-rw-r--r--	client/dbclient.h	| 6
-rw-r--r--	client/dbclient_rs.cpp	| 66
-rw-r--r--	client/distlock.cpp	| 22
-rw-r--r--	client/distlock.h	| 28
-rw-r--r--	client/distlock_test.cpp	| 9
-rw-r--r--	client/parallel.cpp	| 8
9 files changed, 144 insertions, 60 deletions
diff --git a/client/connpool.cpp b/client/connpool.cpp
index 2d7c37b..94ce4ec 100644
--- a/client/connpool.cpp
+++ b/client/connpool.cpp
@@ -38,7 +38,7 @@ namespace mongo {
void PoolForHost::done( DBConnectionPool * pool, DBClientBase * c ) {
if ( _pool.size() >= _maxPerHost ) {
- pool->onDestory( c );
+ pool->onDestroy( c );
delete c;
}
else {
@@ -55,7 +55,7 @@ namespace mongo {
_pool.pop();
if ( ! sc.ok( now ) ) {
- pool->onDestory( sc.conn );
+ pool->onDestroy( sc.conn );
delete sc.conn;
continue;
}
@@ -145,9 +145,15 @@ namespace mongo {
PoolForHost& p = _pools[PoolKey(host,socketTimeout)];
p.createdOne( conn );
}
-
- onCreate( conn );
- onHandedOut( conn );
+
+ try {
+ onCreate( conn );
+ onHandedOut( conn );
+ }
+ catch ( std::exception& e ) {
+ delete conn;
+ throw;
+ }
return conn;
}
@@ -155,7 +161,13 @@ namespace mongo {
DBClientBase* DBConnectionPool::get(const ConnectionString& url, double socketTimeout) {
DBClientBase * c = _get( url.toString() , socketTimeout );
if ( c ) {
- onHandedOut( c );
+ try {
+ onHandedOut( c );
+ }
+ catch ( std::exception& e ) {
+ delete c;
+ throw;
+ }
return c;
}
@@ -169,7 +181,13 @@ namespace mongo {
DBClientBase* DBConnectionPool::get(const string& host, double socketTimeout) {
DBClientBase * c = _get( host , socketTimeout );
if ( c ) {
- onHandedOut( c );
+ try {
+ onHandedOut( c );
+ }
+ catch ( std::exception& e ) {
+ delete c;
+ throw;
+ }
return c;
}
@@ -185,7 +203,7 @@ namespace mongo {
void DBConnectionPool::release(const string& host, DBClientBase *c) {
if ( c->isFailed() ) {
- onDestory( c );
+ onDestroy( c );
delete c;
return;
}
@@ -228,12 +246,12 @@ namespace mongo {
}
}
- void DBConnectionPool::onDestory( DBClientBase * conn ) {
+ void DBConnectionPool::onDestroy( DBClientBase * conn ) {
if ( _hooks->size() == 0 )
return;
for ( list<DBConnectionHook*>::iterator i = _hooks->begin(); i != _hooks->end(); i++ ) {
- (*i)->onDestory( conn );
+ (*i)->onDestroy( conn );
}
}
@@ -357,7 +375,7 @@ namespace mongo {
for ( size_t i=0; i<toDelete.size(); i++ ) {
try {
- onDestory( toDelete[i] );
+ onDestroy( toDelete[i] );
delete toDelete[i];
}
catch ( ... ) {
diff --git a/client/connpool.h b/client/connpool.h
index a37dad7..8733abb 100644
--- a/client/connpool.h
+++ b/client/connpool.h
@@ -89,7 +89,7 @@ namespace mongo {
virtual ~DBConnectionHook() {}
virtual void onCreate( DBClientBase * conn ) {}
virtual void onHandedOut( DBClientBase * conn ) {}
- virtual void onDestory( DBClientBase * conn ) {}
+ virtual void onDestroy( DBClientBase * conn ) {}
};
/** Database connection pool.
@@ -119,7 +119,7 @@ namespace mongo {
void onCreate( DBClientBase * conn );
void onHandedOut( DBClientBase * conn );
- void onDestory( DBClientBase * conn );
+ void onDestroy( DBClientBase * conn );
void flush();
diff --git a/client/dbclient.cpp b/client/dbclient.cpp
index dadf7e4..5faeccf 100644
--- a/client/dbclient.cpp
+++ b/client/dbclient.cpp
@@ -247,6 +247,11 @@ namespace mongo {
return o["ok"].trueValue();
}
+ bool DBClientWithCommands::isNotMasterErrorString( const BSONElement& e ) {
+ return e.type() == String && str::contains( e.valuestr() , "not master" );
+ }
+
+
enum QueryOptions DBClientWithCommands::availableOptions() {
if ( !_haveCachedAvailableOptions ) {
BSONObj ret;
@@ -599,6 +604,19 @@ namespace mongo {
return true;
}
+
+ inline bool DBClientConnection::runCommand(const string &dbname, const BSONObj& cmd, BSONObj &info, int options) {
+ if ( DBClientWithCommands::runCommand( dbname , cmd , info , options ) )
+ return true;
+
+ if ( clientSet && isNotMasterErrorString( info["errmsg"] ) ) {
+ clientSet->isntMaster();
+ }
+
+ return false;
+ }
+
+
void DBClientConnection::_checkConnection() {
if ( !_failed )
return;
@@ -982,8 +1000,7 @@ namespace mongo {
if ( clientSet && nReturned ) {
assert(data);
BSONObj o(data);
- BSONElement e = getErrField(o);
- if ( e.type() == String && str::contains( e.valuestr() , "not master" ) ) {
+ if ( isNotMasterErrorString( getErrField(o) ) ) {
clientSet->isntMaster();
}
}
diff --git a/client/dbclient.h b/client/dbclient.h
index 2b4bb85..ea55bb4 100644
--- a/client/dbclient.h
+++ b/client/dbclient.h
@@ -721,8 +721,12 @@ namespace mongo {
}
protected:
+ /** if the result of a command is ok*/
bool isOk(const BSONObj&);
+ /** if the element contains a not master error */
+ bool isNotMasterErrorString( const BSONElement& e );
+
BSONObj _countCmd(const string &ns, const BSONObj& query, int options, int limit, int skip );
enum QueryOptions availableOptions();
@@ -892,6 +896,8 @@ namespace mongo {
unsigned long long query( boost::function<void(const BSONObj&)> f, const string& ns, Query query, const BSONObj *fieldsToReturn = 0, int queryOptions = 0);
unsigned long long query( boost::function<void(DBClientCursorBatchIterator&)> f, const string& ns, Query query, const BSONObj *fieldsToReturn = 0, int queryOptions = 0);
+ virtual bool runCommand(const string &dbname, const BSONObj& cmd, BSONObj &info, int options=0);
+
/**
@return true if this connection is currently in a failed state. When autoreconnect is on,
a connection will transition back to an ok state after reconnecting.
diff --git a/client/dbclient_rs.cpp b/client/dbclient_rs.cpp
index 2cab1f7..c57a52d 100644
--- a/client/dbclient_rs.cpp
+++ b/client/dbclient_rs.cpp
@@ -247,38 +247,27 @@ namespace mongo {
}
HostAndPort ReplicaSetMonitor::getSlave() {
+ LOG(2) << "dbclient_rs getSlave " << getServerAddress() << endl;
- LOG(2) << "selecting new slave from replica set " << getServerAddress() << endl;
-
- // Logic is to retry three times for any secondary node, if we can't find any secondary, we'll take
- // any "ok" node
- // TODO: Could this query hidden nodes?
- const int MAX = 3;
- for ( int xxx=0; xxx<MAX; xxx++ ) {
+ scoped_lock lk( _lock );
- {
- scoped_lock lk( _lock );
-
- unsigned i = 0;
- for ( ; i<_nodes.size(); i++ ) {
- _nextSlave = ( _nextSlave + 1 ) % _nodes.size();
- if ( _nextSlave == _master ){
- LOG(2) << "not selecting " << _nodes[_nextSlave] << " as it is the current master" << endl;
- continue;
- }
- if ( _nodes[ _nextSlave ].okForSecondaryQueries() || ( _nodes[ _nextSlave ].ok && ( xxx + 1 ) >= MAX ) )
- return _nodes[ _nextSlave ].addr;
-
- LOG(2) << "not selecting " << _nodes[_nextSlave] << " as it is not ok to use" << endl;
- }
-
+ for ( unsigned ii = 0; ii < _nodes.size(); ii++ ) {
+ _nextSlave = ( _nextSlave + 1 ) % _nodes.size();
+ if ( _nextSlave != _master ) {
+ if ( _nodes[ _nextSlave ].okForSecondaryQueries() )
+ return _nodes[ _nextSlave ].addr;
+ LOG(2) << "dbclient_rs getSlave not selecting " << _nodes[_nextSlave] << ", not currently okForSecondaryQueries" << endl;
}
+ }
- check(false);
+ if( _master >= 0 ) {
+ assert( static_cast<unsigned>(_master) < _nodes.size() );
+ LOG(2) << "dbclient_rs getSlave no member in secondary state found, returning primary " << _nodes[ _master ] << endl;
+ return _nodes[_master].addr;
}
- LOG(2) << "no suitable slave nodes found, returning default node " << _nodes[ 0 ] << endl;
-
+ LOG(2) << "dbclient_rs getSlave no suitable member found, returning first node " << _nodes[ 0 ] << endl;
+ assert( _nodes.size() > 0 );
return _nodes[0].addr;
}
@@ -820,10 +809,14 @@ namespace mongo {
bool DBClientReplicaSet::call( Message &toSend, Message &response, bool assertOk , string * actualServer ) {
+ const char * ns = 0;
+
if ( toSend.operation() == dbQuery ) {
// TODO: might be possible to do this faster by changing api
DbMessage dm( toSend );
QueryMessage qm( dm );
+ ns = qm.ns;
+
if ( qm.queryOptions & QueryOption_SlaveOk ) {
for ( int i=0; i<3; i++ ) {
try {
@@ -844,7 +837,26 @@ namespace mongo {
DBClientConnection* m = checkMaster();
if ( actualServer )
*actualServer = m->getServerAddress();
- return m->call( toSend , response , assertOk );
+
+ if ( ! m->call( toSend , response , assertOk ) )
+ return false;
+
+ if ( ns ) {
+ QueryResult * res = (QueryResult*)response.singleData();
+ if ( res->nReturned == 1 ) {
+ BSONObj x(res->data() );
+ if ( str::contains( ns , "$cmd" ) ) {
+ if ( isNotMasterErrorString( x["errmsg"] ) )
+ isntMaster();
+ }
+ else {
+ if ( isNotMasterErrorString( getErrField( x ) ) )
+ isntMaster();
+ }
+ }
+ }
+
+ return true;
}
}
diff --git a/client/distlock.cpp b/client/distlock.cpp
index cb71159..595fc38 100644
--- a/client/distlock.cpp
+++ b/client/distlock.cpp
@@ -22,6 +22,7 @@
namespace mongo {
LabeledLevel DistributedLock::logLvl( 1 );
+ DistributedLock::LastPings DistributedLock::lastPings;
ThreadLocalValue<string> distLockIds("");
@@ -84,7 +85,7 @@ namespace mongo {
Date_t pingTime;
try {
- ScopedDbConnection conn( addr );
+ ScopedDbConnection conn( addr, 30.0 );
pingTime = jsTime();
@@ -224,7 +225,7 @@ namespace mongo {
string s = pingThreadId( conn, processId );
// Ignore if we already have a pinging thread for this process.
- if ( _seen.count( s ) > 0 ) return "";
+ if ( _seen.count( s ) > 0 ) return s;
// Check our clock skew
try {
@@ -303,6 +304,18 @@ namespace mongo {
log( logLvl - 1 ) << "created new distributed lock for " << name << " on " << conn
<< " ( lock timeout : " << _lockTimeout
<< ", ping interval : " << _lockPing << ", process : " << asProcess << " )" << endl;
+
+
+ }
+
+ DistributedLock::PingData DistributedLock::LastPings::getLastPing( const ConnectionString& conn, const string& lockName ){
+ scoped_lock lock( _mutex );
+ return _lastPings[ std::pair< string, string >( conn.toString(), lockName ) ];
+ }
+
+ void DistributedLock::LastPings::setLastPing( const ConnectionString& conn, const string& lockName, const PingData& pd ){
+ scoped_lock lock( _mutex );
+ _lastPings[ std::pair< string, string >( conn.toString(), lockName ) ] = pd;
}
Date_t DistributedLock::getRemoteTime() {
@@ -512,6 +525,7 @@ namespace mongo {
unsigned long long elapsed = 0;
unsigned long long takeover = _lockTimeout;
+ PingData _lastPingCheck = getLastPing();
log( logLvl ) << "checking last ping for lock '" << lockName << "'" << " against process " << _lastPingCheck.get<0>() << " and ping " << _lastPingCheck.get<1>() << endl;
@@ -527,8 +541,7 @@ namespace mongo {
if( recPingChange || recTSChange ) {
// If the ping has changed since we last checked, mark the current date and time
- scoped_lock lk( _mutex );
- _lastPingCheck = boost::tuple<string, Date_t, Date_t, OID>( lastPing["_id"].String().c_str(), lastPing["ping"].Date(), remote, o["ts"].OID() );
+ setLastPing( PingData( lastPing["_id"].String().c_str(), lastPing["ping"].Date(), remote, o["ts"].OID() ) );
}
else {
@@ -540,7 +553,6 @@ namespace mongo {
else
elapsed = remote - _lastPingCheck.get<2>();
}
-
}
catch( LockException& e ) {
diff --git a/client/distlock.h b/client/distlock.h
index 8985672..106a5d0 100644
--- a/client/distlock.h
+++ b/client/distlock.h
@@ -71,6 +71,22 @@ namespace mongo {
static LabeledLevel logLvl;
+ typedef boost::tuple<string, Date_t, Date_t, OID> PingData;
+
+ class LastPings {
+ public:
+ LastPings() : _mutex( "DistributedLock::LastPings" ) {}
+ ~LastPings(){}
+
+ PingData getLastPing( const ConnectionString& conn, const string& lockName );
+ void setLastPing( const ConnectionString& conn, const string& lockName, const PingData& pd );
+
+ mongo::mutex _mutex;
+ map< std::pair<string, string>, PingData > _lastPings;
+ };
+
+ static LastPings lastPings;
+
/**
* The constructor does not connect to the configdb yet and constructing does not mean the lock was acquired.
* Construction does trigger a lock "pinging" mechanism, though.
@@ -145,16 +161,12 @@ namespace mongo {
private:
- void resetLastPing(){
- scoped_lock lk( _mutex );
- _lastPingCheck = boost::tuple<string, Date_t, Date_t, OID>();
- }
-
- mongo::mutex _mutex;
+ void resetLastPing(){ lastPings.setLastPing( _conn, _name, PingData() ); }
+ void setLastPing( const PingData& pd ){ lastPings.setLastPing( _conn, _name, pd ); }
+ PingData getLastPing(){ return lastPings.getLastPing( _conn, _name ); }
- // Data from last check of process with ping time
- boost::tuple<string, Date_t, Date_t, OID> _lastPingCheck;
// May or may not exist, depending on startup
+ mongo::mutex _mutex;
string _threadId;
};
diff --git a/client/distlock_test.cpp b/client/distlock_test.cpp
index 42a1c48..5f37e6b 100644
--- a/client/distlock_test.cpp
+++ b/client/distlock_test.cpp
@@ -195,6 +195,7 @@ namespace mongo {
boost::variate_generator<boost::mt19937&, boost::uniform_int<> > randomSkew(gen, boost::uniform_int<>(0, skewRange));
boost::variate_generator<boost::mt19937&, boost::uniform_int<> > randomWait(gen, boost::uniform_int<>(1, threadWait));
boost::variate_generator<boost::mt19937&, boost::uniform_int<> > randomSleep(gen, boost::uniform_int<>(1, threadSleep));
+ boost::variate_generator<boost::mt19937&, boost::uniform_int<> > randomNewLock(gen, boost::uniform_int<>(0, 3));
int skew = 0;
@@ -262,7 +263,7 @@ namespace mongo {
}
else {
log() << "**** Not unlocking for thread " << threadId << endl;
- DistributedLock::killPinger( *myLock );
+ assert( DistributedLock::killPinger( *myLock ) );
// We're simulating a crashed process...
break;
}
@@ -274,6 +275,12 @@ namespace mongo {
break;
}
+ // Create a new lock 1/3 of the time
+ if( randomNewLock() > 1 ){
+ lock.reset(new DistributedLock( hostConn, lockName, takeoverMS, true ));
+ myLock = lock.get();
+ }
+
sleepmillis(randomSleep());
}
diff --git a/client/parallel.cpp b/client/parallel.cpp
index 76b0168..3a33eb5 100644
--- a/client/parallel.cpp
+++ b/client/parallel.cpp
@@ -67,7 +67,7 @@ namespace mongo {
assert( cursor );
if ( cursor->hasResultFlag( ResultFlag_ShardConfigStale ) ) {
- throw StaleConfigException( _ns , "ClusteredCursor::query" );
+ throw StaleConfigException( _ns , "ClusteredCursor::_checkCursor" );
}
if ( cursor->hasResultFlag( ResultFlag_ErrSet ) ) {
@@ -90,7 +90,7 @@ namespace mongo {
if ( conn.setVersion() ) {
conn.done();
- throw StaleConfigException( _ns , "ClusteredCursor::query ShardConnection had to change" , true );
+ throw StaleConfigException( _ns , "ClusteredCursor::query" , true );
}
LOG(5) << "ClusteredCursor::query (" << type() << ") server:" << server
@@ -490,7 +490,7 @@ namespace mongo {
if ( conns[i]->setVersion() ) {
conns[i]->done();
- staleConfigExs.push_back( StaleConfigException( _ns , "ClusteredCursor::query ShardConnection had to change" , true ).what() + errLoc );
+ staleConfigExs.push_back( (string)"stale config detected for " + StaleConfigException( _ns , "ParallelCursor::_init" , true ).what() + errLoc );
break;
}
@@ -592,7 +592,7 @@ namespace mongo {
// when we throw our exception
allConfigStale = true;
- staleConfigExs.push_back( e.what() + errLoc );
+ staleConfigExs.push_back( (string)"stale config detected for " + e.what() + errLoc );
_cursors[i].reset( NULL );
conns[i]->done();
continue;