diff options
author | Antonin Kral <a.kral@bobek.cz> | 2011-12-15 09:35:47 +0100 |
---|---|---|
committer | Antonin Kral <a.kral@bobek.cz> | 2011-12-15 09:35:47 +0100 |
commit | f0d9a01bccdaeb466c12c92057914bbfef59526c (patch) | |
tree | 7679efa1f0daf7d1d906882a15dc77af6b7aef32 /client | |
parent | 5d342a758c6095b4d30aba0750b54f13b8916f51 (diff) | |
download | mongodb-f0d9a01bccdaeb466c12c92057914bbfef59526c.tar.gz |
Imported Upstream version 2.0.2
Diffstat (limited to 'client')
-rw-r--r-- | client/connpool.cpp | 40 | ||||
-rw-r--r-- | client/connpool.h | 4 | ||||
-rw-r--r-- | client/dbclient.cpp | 21 | ||||
-rw-r--r-- | client/dbclient.h | 6 | ||||
-rw-r--r-- | client/dbclient_rs.cpp | 66 | ||||
-rw-r--r-- | client/distlock.cpp | 22 | ||||
-rw-r--r-- | client/distlock.h | 28 | ||||
-rw-r--r-- | client/distlock_test.cpp | 9 | ||||
-rw-r--r-- | client/parallel.cpp | 8 |
9 files changed, 144 insertions, 60 deletions
diff --git a/client/connpool.cpp b/client/connpool.cpp index 2d7c37b..94ce4ec 100644 --- a/client/connpool.cpp +++ b/client/connpool.cpp @@ -38,7 +38,7 @@ namespace mongo { void PoolForHost::done( DBConnectionPool * pool, DBClientBase * c ) { if ( _pool.size() >= _maxPerHost ) { - pool->onDestory( c ); + pool->onDestroy( c ); delete c; } else { @@ -55,7 +55,7 @@ namespace mongo { _pool.pop(); if ( ! sc.ok( now ) ) { - pool->onDestory( sc.conn ); + pool->onDestroy( sc.conn ); delete sc.conn; continue; } @@ -145,9 +145,15 @@ namespace mongo { PoolForHost& p = _pools[PoolKey(host,socketTimeout)]; p.createdOne( conn ); } - - onCreate( conn ); - onHandedOut( conn ); + + try { + onCreate( conn ); + onHandedOut( conn ); + } + catch ( std::exception& e ) { + delete conn; + throw; + } return conn; } @@ -155,7 +161,13 @@ namespace mongo { DBClientBase* DBConnectionPool::get(const ConnectionString& url, double socketTimeout) { DBClientBase * c = _get( url.toString() , socketTimeout ); if ( c ) { - onHandedOut( c ); + try { + onHandedOut( c ); + } + catch ( std::exception& e ) { + delete c; + throw; + } return c; } @@ -169,7 +181,13 @@ namespace mongo { DBClientBase* DBConnectionPool::get(const string& host, double socketTimeout) { DBClientBase * c = _get( host , socketTimeout ); if ( c ) { - onHandedOut( c ); + try { + onHandedOut( c ); + } + catch ( std::exception& e ) { + delete c; + throw; + } return c; } @@ -185,7 +203,7 @@ namespace mongo { void DBConnectionPool::release(const string& host, DBClientBase *c) { if ( c->isFailed() ) { - onDestory( c ); + onDestroy( c ); delete c; return; } @@ -228,12 +246,12 @@ namespace mongo { } } - void DBConnectionPool::onDestory( DBClientBase * conn ) { + void DBConnectionPool::onDestroy( DBClientBase * conn ) { if ( _hooks->size() == 0 ) return; for ( list<DBConnectionHook*>::iterator i = _hooks->begin(); i != _hooks->end(); i++ ) { - (*i)->onDestory( conn ); + (*i)->onDestroy( conn ); } } @@ -357,7 +375,7 @@ namespace mongo { for ( size_t i=0; i<toDelete.size(); i++ ) { try { - onDestory( toDelete[i] ); + onDestroy( toDelete[i] ); delete toDelete[i]; } catch ( ... ) { diff --git a/client/connpool.h b/client/connpool.h index a37dad7..8733abb 100644 --- a/client/connpool.h +++ b/client/connpool.h @@ -89,7 +89,7 @@ namespace mongo { virtual ~DBConnectionHook() {} virtual void onCreate( DBClientBase * conn ) {} virtual void onHandedOut( DBClientBase * conn ) {} - virtual void onDestory( DBClientBase * conn ) {} + virtual void onDestroy( DBClientBase * conn ) {} }; /** Database connection pool. @@ -119,7 +119,7 @@ namespace mongo { void onCreate( DBClientBase * conn ); void onHandedOut( DBClientBase * conn ); - void onDestory( DBClientBase * conn ); + void onDestroy( DBClientBase * conn ); void flush(); diff --git a/client/dbclient.cpp b/client/dbclient.cpp index dadf7e4..5faeccf 100644 --- a/client/dbclient.cpp +++ b/client/dbclient.cpp @@ -247,6 +247,11 @@ namespace mongo { return o["ok"].trueValue(); } + bool DBClientWithCommands::isNotMasterErrorString( const BSONElement& e ) { + return e.type() == String && str::contains( e.valuestr() , "not master" ); + } + + enum QueryOptions DBClientWithCommands::availableOptions() { if ( !_haveCachedAvailableOptions ) { BSONObj ret; @@ -599,6 +604,19 @@ namespace mongo { return true; } + + inline bool DBClientConnection::runCommand(const string &dbname, const BSONObj& cmd, BSONObj &info, int options) { + if ( DBClientWithCommands::runCommand( dbname , cmd , info , options ) ) + return true; + + if ( clientSet && isNotMasterErrorString( info["errmsg"] ) ) { + clientSet->isntMaster(); + } + + return false; + } + + void DBClientConnection::_checkConnection() { if ( !_failed ) return; @@ -982,8 +1000,7 @@ namespace mongo { if ( clientSet && nReturned ) { assert(data); BSONObj o(data); - BSONElement e = getErrField(o); - if ( e.type() == String && str::contains( e.valuestr() , "not master" ) ) { + if ( isNotMasterErrorString( getErrField(o) ) ) { clientSet->isntMaster(); } } diff --git a/client/dbclient.h b/client/dbclient.h index 2b4bb85..ea55bb4 100644 --- a/client/dbclient.h +++ b/client/dbclient.h @@ -721,8 +721,12 @@ namespace mongo { } protected: + /** if the result of a command is ok*/ bool isOk(const BSONObj&); + /** if the element contains a not master error */ + bool isNotMasterErrorString( const BSONElement& e ); + BSONObj _countCmd(const string &ns, const BSONObj& query, int options, int limit, int skip ); enum QueryOptions availableOptions(); @@ -892,6 +896,8 @@ namespace mongo { unsigned long long query( boost::function<void(const BSONObj&)> f, const string& ns, Query query, const BSONObj *fieldsToReturn = 0, int queryOptions = 0); unsigned long long query( boost::function<void(DBClientCursorBatchIterator&)> f, const string& ns, Query query, const BSONObj *fieldsToReturn = 0, int queryOptions = 0); + virtual bool runCommand(const string &dbname, const BSONObj& cmd, BSONObj &info, int options=0); + /** @return true if this connection is currently in a failed state. When autoreconnect is on, a connection will transition back to an ok state after reconnecting. diff --git a/client/dbclient_rs.cpp b/client/dbclient_rs.cpp index 2cab1f7..c57a52d 100644 --- a/client/dbclient_rs.cpp +++ b/client/dbclient_rs.cpp @@ -247,38 +247,27 @@ namespace mongo { } HostAndPort ReplicaSetMonitor::getSlave() { + LOG(2) << "dbclient_rs getSlave " << getServerAddress() << endl; - LOG(2) << "selecting new slave from replica set " << getServerAddress() << endl; - - // Logic is to retry three times for any secondary node, if we can't find any secondary, we'll take - // any "ok" node - // TODO: Could this query hidden nodes? - const int MAX = 3; - for ( int xxx=0; xxx<MAX; xxx++ ) { + scoped_lock lk( _lock ); - { - scoped_lock lk( _lock ); - - unsigned i = 0; - for ( ; i<_nodes.size(); i++ ) { - _nextSlave = ( _nextSlave + 1 ) % _nodes.size(); - if ( _nextSlave == _master ){ - LOG(2) << "not selecting " << _nodes[_nextSlave] << " as it is the current master" << endl; - continue; - } - if ( _nodes[ _nextSlave ].okForSecondaryQueries() || ( _nodes[ _nextSlave ].ok && ( xxx + 1 ) >= MAX ) ) - return _nodes[ _nextSlave ].addr; - - LOG(2) << "not selecting " << _nodes[_nextSlave] << " as it is not ok to use" << endl; - } - + for ( unsigned ii = 0; ii < _nodes.size(); ii++ ) { + _nextSlave = ( _nextSlave + 1 ) % _nodes.size(); + if ( _nextSlave != _master ) { + if ( _nodes[ _nextSlave ].okForSecondaryQueries() ) + return _nodes[ _nextSlave ].addr; + LOG(2) << "dbclient_rs getSlave not selecting " << _nodes[_nextSlave] << ", not currently okForSecondaryQueries" << endl; } + } - check(false); + if( _master >= 0 ) { + assert( static_cast<unsigned>(_master) < _nodes.size() ); + LOG(2) << "dbclient_rs getSlave no member in secondary state found, returning primary " << _nodes[ _master ] << endl; + return _nodes[_master].addr; } - LOG(2) << "no suitable slave nodes found, returning default node " << _nodes[ 0 ] << endl; - + LOG(2) << "dbclient_rs getSlave no suitable member found, returning first node " << _nodes[ 0 ] << endl; + assert( _nodes.size() > 0 ); return _nodes[0].addr; } @@ -820,10 +809,14 @@ namespace mongo { bool DBClientReplicaSet::call( Message &toSend, Message &response, bool assertOk , string * actualServer ) { + const char * ns = 0; + if ( toSend.operation() == dbQuery ) { // TODO: might be possible to do this faster by changing api DbMessage dm( toSend ); QueryMessage qm( dm ); + ns = qm.ns; + if ( qm.queryOptions & QueryOption_SlaveOk ) { for ( int i=0; i<3; i++ ) { try { @@ -844,7 +837,26 @@ namespace mongo { DBClientConnection* m = checkMaster(); if ( actualServer ) *actualServer = m->getServerAddress(); - return m->call( toSend , response , assertOk ); + + if ( ! m->call( toSend , response , assertOk ) ) + return false; + + if ( ns ) { + QueryResult * res = (QueryResult*)response.singleData(); + if ( res->nReturned == 1 ) { + BSONObj x(res->data() ); + if ( str::contains( ns , "$cmd" ) ) { + if ( isNotMasterErrorString( x["errmsg"] ) ) + isntMaster(); + } + else { + if ( isNotMasterErrorString( getErrField( x ) ) ) + isntMaster(); + } + } + } + + return true; } } diff --git a/client/distlock.cpp b/client/distlock.cpp index cb71159..595fc38 100644 --- a/client/distlock.cpp +++ b/client/distlock.cpp @@ -22,6 +22,7 @@ namespace mongo { LabeledLevel DistributedLock::logLvl( 1 ); + DistributedLock::LastPings DistributedLock::lastPings; ThreadLocalValue<string> distLockIds(""); @@ -84,7 +85,7 @@ namespace mongo { Date_t pingTime; try { - ScopedDbConnection conn( addr ); + ScopedDbConnection conn( addr, 30.0 ); pingTime = jsTime(); @@ -224,7 +225,7 @@ namespace mongo { string s = pingThreadId( conn, processId ); // Ignore if we already have a pinging thread for this process. - if ( _seen.count( s ) > 0 ) return ""; + if ( _seen.count( s ) > 0 ) return s; // Check our clock skew try { @@ -303,6 +304,18 @@ namespace mongo { log( logLvl - 1 ) << "created new distributed lock for " << name << " on " << conn << " ( lock timeout : " << _lockTimeout << ", ping interval : " << _lockPing << ", process : " << asProcess << " )" << endl; + + + } + + DistributedLock::PingData DistributedLock::LastPings::getLastPing( const ConnectionString& conn, const string& lockName ){ + scoped_lock lock( _mutex ); + return _lastPings[ std::pair< string, string >( conn.toString(), lockName ) ]; + } + + void DistributedLock::LastPings::setLastPing( const ConnectionString& conn, const string& lockName, const PingData& pd ){ + scoped_lock lock( _mutex ); + _lastPings[ std::pair< string, string >( conn.toString(), lockName ) ] = pd; } Date_t DistributedLock::getRemoteTime() { @@ -512,6 +525,7 @@ namespace mongo { unsigned long long elapsed = 0; unsigned long long takeover = _lockTimeout; + PingData _lastPingCheck = getLastPing(); log( logLvl ) << "checking last ping for lock '" << lockName << "'" << " against process " << _lastPingCheck.get<0>() << " and ping " << _lastPingCheck.get<1>() << endl; @@ -527,8 +541,7 @@ namespace mongo { if( recPingChange || recTSChange ) { // If the ping has changed since we last checked, mark the current date and time - scoped_lock lk( _mutex ); - _lastPingCheck = boost::tuple<string, Date_t, Date_t, OID>( lastPing["_id"].String().c_str(), lastPing["ping"].Date(), remote, o["ts"].OID() ); + setLastPing( PingData( lastPing["_id"].String().c_str(), lastPing["ping"].Date(), remote, o["ts"].OID() ) ); } else { @@ -540,7 +553,6 @@ namespace mongo { else elapsed = remote - _lastPingCheck.get<2>(); } - } catch( LockException& e ) { diff --git a/client/distlock.h b/client/distlock.h index 8985672..106a5d0 100644 --- a/client/distlock.h +++ b/client/distlock.h @@ -71,6 +71,22 @@ namespace mongo { static LabeledLevel logLvl; + typedef boost::tuple<string, Date_t, Date_t, OID> PingData; + + class LastPings { + public: + LastPings() : _mutex( "DistributedLock::LastPings" ) {} + ~LastPings(){} + + PingData getLastPing( const ConnectionString& conn, const string& lockName ); + void setLastPing( const ConnectionString& conn, const string& lockName, const PingData& pd ); + + mongo::mutex _mutex; + map< std::pair<string, string>, PingData > _lastPings; + }; + + static LastPings lastPings; + /** * The constructor does not connect to the configdb yet and constructing does not mean the lock was acquired. * Construction does trigger a lock "pinging" mechanism, though. @@ -145,16 +161,12 @@ namespace mongo { private: - void resetLastPing(){ - scoped_lock lk( _mutex ); - _lastPingCheck = boost::tuple<string, Date_t, Date_t, OID>(); - } - - mongo::mutex _mutex; + void resetLastPing(){ lastPings.setLastPing( _conn, _name, PingData() ); } + void setLastPing( const PingData& pd ){ lastPings.setLastPing( _conn, _name, pd ); } + PingData getLastPing(){ return lastPings.getLastPing( _conn, _name ); } - // Data from last check of process with ping time - boost::tuple<string, Date_t, Date_t, OID> _lastPingCheck; // May or may not exist, depending on startup + mongo::mutex _mutex; string _threadId; }; diff --git a/client/distlock_test.cpp b/client/distlock_test.cpp index 42a1c48..5f37e6b 100644 --- a/client/distlock_test.cpp +++ b/client/distlock_test.cpp @@ -195,6 +195,7 @@ namespace mongo { boost::variate_generator<boost::mt19937&, boost::uniform_int<> > randomSkew(gen, boost::uniform_int<>(0, skewRange)); boost::variate_generator<boost::mt19937&, boost::uniform_int<> > randomWait(gen, boost::uniform_int<>(1, threadWait)); boost::variate_generator<boost::mt19937&, boost::uniform_int<> > randomSleep(gen, boost::uniform_int<>(1, threadSleep)); + boost::variate_generator<boost::mt19937&, boost::uniform_int<> > randomNewLock(gen, boost::uniform_int<>(0, 3)); int skew = 0; @@ -262,7 +263,7 @@ namespace mongo { } else { log() << "**** Not unlocking for thread " << threadId << endl; - DistributedLock::killPinger( *myLock ); + assert( DistributedLock::killPinger( *myLock ) ); // We're simulating a crashed process... break; } @@ -274,6 +275,12 @@ namespace mongo { break; } + // Create a new lock 1/3 of the time + if( randomNewLock() > 1 ){ + lock.reset(new DistributedLock( hostConn, lockName, takeoverMS, true )); + myLock = lock.get(); + } + sleepmillis(randomSleep()); } diff --git a/client/parallel.cpp b/client/parallel.cpp index 76b0168..3a33eb5 100644 --- a/client/parallel.cpp +++ b/client/parallel.cpp @@ -67,7 +67,7 @@ namespace mongo { assert( cursor ); if ( cursor->hasResultFlag( ResultFlag_ShardConfigStale ) ) { - throw StaleConfigException( _ns , "ClusteredCursor::query" ); + throw StaleConfigException( _ns , "ClusteredCursor::_checkCursor" ); } if ( cursor->hasResultFlag( ResultFlag_ErrSet ) ) { @@ -90,7 +90,7 @@ namespace mongo { if ( conn.setVersion() ) { conn.done(); - throw StaleConfigException( _ns , "ClusteredCursor::query ShardConnection had to change" , true ); + throw StaleConfigException( _ns , "ClusteredCursor::query" , true ); } LOG(5) << "ClusteredCursor::query (" << type() << ") server:" << server @@ -490,7 +490,7 @@ namespace mongo { if ( conns[i]->setVersion() ) { conns[i]->done(); - staleConfigExs.push_back( StaleConfigException( _ns , "ClusteredCursor::query ShardConnection had to change" , true ).what() + errLoc ); + staleConfigExs.push_back( (string)"stale config detected for " + StaleConfigException( _ns , "ParallelCursor::_init" , true ).what() + errLoc ); break; } @@ -592,7 +592,7 @@ namespace mongo { // when we throw our exception allConfigStale = true; - staleConfigExs.push_back( e.what() + errLoc ); + staleConfigExs.push_back( (string)"stale config detected for " + e.what() + errLoc ); _cursors[i].reset( NULL ); conns[i]->done(); continue; |