diff options
author | Antonin Kral <a.kral@bobek.cz> | 2012-05-10 06:57:54 +0200 |
---|---|---|
committer | Antonin Kral <a.kral@bobek.cz> | 2012-05-10 06:57:54 +0200 |
commit | 61619b3142c1de8f60f91964ff2656054d4f11a6 (patch) | |
tree | d3aaf9d1e70cac8efa0856e5b5ba39e2fb9dc526 /client | |
parent | eaaa7b30c99b89b5483e0a372bb73fe8c8695185 (diff) | |
download | mongodb-61619b3142c1de8f60f91964ff2656054d4f11a6.tar.gz |
Imported Upstream version 2.0.5
Diffstat (limited to 'client')
-rw-r--r-- | client/dbclient.cpp | 1 | ||||
-rw-r--r-- | client/dbclient_rs.cpp | 203 | ||||
-rw-r--r-- | client/dbclient_rs.h | 44 | ||||
-rw-r--r-- | client/dbclientcursor.cpp | 15 | ||||
-rw-r--r-- | client/parallel.cpp | 14 | ||||
-rw-r--r-- | client/redef_macros.h | 53 | ||||
-rw-r--r-- | client/undef_macros.h | 32 |
7 files changed, 284 insertions, 78 deletions
diff --git a/client/dbclient.cpp b/client/dbclient.cpp index 67ecea0..6b9631b 100644 --- a/client/dbclient.cpp +++ b/client/dbclient.cpp @@ -952,6 +952,7 @@ namespace mongo { an exception. we should make it return void and just throw an exception anytime it fails */ + checkConnection(); try { if ( !port().call(toSend, response) ) { _failed = true; diff --git a/client/dbclient_rs.cpp b/client/dbclient_rs.cpp index 0189700..4a8112b 100644 --- a/client/dbclient_rs.cpp +++ b/client/dbclient_rs.cpp @@ -211,6 +211,7 @@ namespace mongo { void ReplicaSetMonitor::notifyFailure( const HostAndPort& server ) { scoped_lock lk( _lock ); + if ( _master >= 0 && _master < (int)_nodes.size() ) { if ( server == _nodes[_master].addr ) { _nodes[_master].ok = false; @@ -292,7 +293,7 @@ namespace mongo { } /** - * notify the monitor that server has faild + * notify the monitor that server has failed */ void ReplicaSetMonitor::notifySlaveFailure( const HostAndPort& server ) { int x = _find( server ); @@ -402,15 +403,15 @@ namespace mongo { // Our host list may have changed while waiting for another thread in the meantime, // so double-check here - // TODO: Do we really need this much protection, this should be pretty rare and not triggered - // from lots of threads, duping old behavior for safety + // TODO: Do we really need this much protection, this should be pretty rare and not + // triggered from lots of threads, duping old behavior for safety if( ! 
_shouldChangeHosts( hostList, true ) ){ changed = false; return; } - // LogLevel can be pretty low, since replica set reconfiguration should be pretty rare and we - // want to record our changes + // LogLevel can be pretty low, since replica set reconfiguration should be pretty rare and + // we want to record our changes log() << "changing hosts to " << hostList << " from " << _getServerAddress_inlock() << endl; NodeDiff diff = _getHostDiff_inlock( hostList ); @@ -424,7 +425,6 @@ namespace mongo { for( set<int>::reverse_iterator i = removed.rbegin(), end = removed.rend(); i != end; ++i ){ log() << "erasing host " << _nodes[ *i ] << " from replica set " << this->_name << endl; - _nodes.erase( _nodes.begin() + *i ); } @@ -450,29 +450,52 @@ namespace mongo { _nodes.push_back( Node( h , newConn ) ); } - } - - bool ReplicaSetMonitor::_checkConnection( DBClientConnection * c , string& maybePrimary , bool verbose , int nodesOffset ) { - assert( c ); + bool ReplicaSetMonitor::_checkConnection( DBClientConnection* conn, + string& maybePrimary, bool verbose, int nodesOffset ) { + + assert( conn ); + scoped_lock lk( _checkConnectionLock ); bool isMaster = false; bool changed = false; + bool errorOccured = false; + + if ( nodesOffset >= 0 ){ + scoped_lock lk( _lock ); + if ( !_checkConnMatch_inlock( conn, nodesOffset )) { + /* Another thread modified _nodes -> invariant broken. + * This also implies that another thread just passed + * through here and refreshed _nodes. So no need to do + * duplicate work. 
+ */ + return false; + } + } + try { Timer t; BSONObj o; - c->isMaster(isMaster, &o); + conn->isMaster( isMaster, &o ); + if ( o["setName"].type() != String || o["setName"].String() != _name ) { - warning() << "node: " << c->getServerAddress() << " isn't a part of set: " << _name + warning() << "node: " << conn->getServerAddress() + << " isn't a part of set: " << _name << " ismaster: " << o << endl; - if ( nodesOffset >= 0 ) + + if ( nodesOffset >= 0 ) { + scoped_lock lk( _lock ); _nodes[nodesOffset].ok = false; + } + return false; } if ( nodesOffset >= 0 ) { + scoped_lock lk( _lock ); + _nodes[nodesOffset].pingTimeMillis = t.millis(); _nodes[nodesOffset].hidden = o["hidden"].trueValue(); _nodes[nodesOffset].secondary = o["secondary"].trueValue(); @@ -481,7 +504,8 @@ namespace mongo { _nodes[nodesOffset].lastIsMaster = o.copy(); } - log( ! verbose ) << "ReplicaSetMonitor::_checkConnection: " << c->toString() << ' ' << o << endl; + log( ! verbose ) << "ReplicaSetMonitor::_checkConnection: " << conn->toString() + << ' ' << o << endl; // add other nodes BSONArrayBuilder b; @@ -492,18 +516,25 @@ namespace mongo { BSONObjIterator it( o["hosts"].Obj() ); while( it.more() ) b.append( it.next() ); } + if (o.hasField("passives") && o["passives"].type() == Array) { BSONObjIterator it( o["passives"].Obj() ); while( it.more() ) b.append( it.next() ); } _checkHosts( b.arr(), changed); - _checkStatus(c); + _checkStatus( conn ); - } catch ( std::exception& e ) { - log( ! verbose ) << "ReplicaSetMonitor::_checkConnection: caught exception " << c->toString() << ' ' << e.what() << endl; + log( ! 
verbose ) << "ReplicaSetMonitor::_checkConnection: caught exception " + << conn->toString() << ' ' << e.what() << endl; + + errorOccured = true; + } + + if ( errorOccured ) { + scoped_lock lk( _lock ); _nodes[nodesOffset].ok = false; } @@ -514,63 +545,114 @@ namespace mongo { } void ReplicaSetMonitor::_check( bool checkAllSecondaries ) { - - bool triedQuickCheck = false; - LOG(1) << "_check : " << getServerAddress() << endl; int newMaster = -1; + shared_ptr<DBClientConnection> nodeConn; for ( int retry = 0; retry < 2; retry++ ) { - for ( unsigned i=0; i<_nodes.size(); i++ ) { - shared_ptr<DBClientConnection> c; + bool triedQuickCheck = false; + + if ( !checkAllSecondaries ) { + scoped_lock lk( _lock ); + if ( _master >= 0 ) { + /* Nothing else to do since another thread already + * found the _master + */ + return; + } + } + + for ( unsigned i = 0; /* should not check while outside of lock! */ ; i++ ) { { scoped_lock lk( _lock ); - c = _nodes[i].conn; + if ( i >= _nodes.size() ) break; + nodeConn = _nodes[i].conn; } string maybePrimary; - if ( _checkConnection( c.get() , maybePrimary , retry , i ) ) { - _master = i; - newMaster = i; - if ( ! checkAllSecondaries ) - return; + if ( _checkConnection( nodeConn.get(), maybePrimary, retry, i ) ) { + scoped_lock lk( _lock ); + if ( _checkConnMatch_inlock( nodeConn.get(), i )) { + _master = i; + newMaster = i; + + if ( !checkAllSecondaries ) + return; + } + else { + /* + * Somebody modified _nodes and most likely set the new + * _master, so try again. + */ + break; + } } - if ( ! triedQuickCheck && maybePrimary.size() ) { - int x = _find( maybePrimary ); - if ( x >= 0 ) { + + if ( ! triedQuickCheck && ! 
maybePrimary.empty() ) { + int probablePrimaryIdx = -1; + shared_ptr<DBClientConnection> probablePrimaryConn; + + { + scoped_lock lk( _lock ); + probablePrimaryIdx = _find_inlock( maybePrimary ); + probablePrimaryConn = _nodes[probablePrimaryIdx].conn; + } + + if ( probablePrimaryIdx >= 0 ) { triedQuickCheck = true; + string dummy; - shared_ptr<DBClientConnection> testConn; - { + if ( _checkConnection( probablePrimaryConn.get(), dummy, + false, probablePrimaryIdx ) ) { + scoped_lock lk( _lock ); - testConn = _nodes[x].conn; - } - if ( _checkConnection( testConn.get() , dummy , false , x ) ) { - _master = x; - newMaster = x; - if ( ! checkAllSecondaries ) - return; + + if ( _checkConnMatch_inlock( probablePrimaryConn.get(), + probablePrimaryIdx )) { + + _master = probablePrimaryIdx; + newMaster = probablePrimaryIdx; + + if ( ! checkAllSecondaries ) + return; + } + else { + /* + * Somebody modified _nodes and most likely set the + * new _master, so try again. + */ + break; + } } } } - } if ( newMaster >= 0 ) return; - sleepsecs(1); + sleepsecs( 1 ); } - } void ReplicaSetMonitor::check( bool checkAllSecondaries ) { - // first see if the current master is fine - if ( _master >= 0 ) { + shared_ptr<DBClientConnection> masterConn; + + { + scoped_lock lk( _lock ); + + // first see if the current master is fine + if ( _master >= 0 ) { + masterConn = _nodes[_master].conn; + } + } + + if ( masterConn.get() != NULL ) { string temp; - if ( _checkConnection( _nodes[_master].conn.get() , temp , false , _master ) ) { + + if ( _checkConnection( masterConn.get(), temp, false, _master )) { if ( ! 
checkAllSecondaries ) { // current master is fine, so we're done return; @@ -588,21 +670,17 @@ namespace mongo { } int ReplicaSetMonitor::_find_inlock( const string& server ) const { - for ( unsigned i=0; i<_nodes.size(); i++ ) - if ( _nodes[i].addr == server ) - return i; - return -1; - } - + const size_t size = _nodes.size(); - int ReplicaSetMonitor::_find( const HostAndPort& server ) const { - scoped_lock lk( _lock ); - for ( unsigned i=0; i<_nodes.size(); i++ ) - if ( _nodes[i].addr == server ) + for ( unsigned i = 0; i < size; i++ ) { + if ( _nodes[i].addr == server ) { return i; + } + } + return -1; } - + void ReplicaSetMonitor::appendInfo( BSONObjBuilder& b ) const { scoped_lock lk( _lock ); BSONArrayBuilder hosts( b.subarrayStart( "hosts" ) ); @@ -622,6 +700,13 @@ namespace mongo { b.append( "nextSlave" , _nextSlave ); } + bool ReplicaSetMonitor::_checkConnMatch_inlock( DBClientConnection* conn, + size_t nodeOffset ) const { + + return ( nodeOffset < _nodes.size() && + conn->getServerAddress() == _nodes[nodeOffset].conn->getServerAddress() ); + } + mongo::mutex ReplicaSetMonitor::_setsLock( "ReplicaSetMonitor" ); map<string,ReplicaSetMonitorPtr> ReplicaSetMonitor::_sets; @@ -645,6 +730,7 @@ namespace mongo { // a master is selected. let's just make sure connection didn't die if ( ! _master->isFailed() ) return _master.get(); + _monitor->notifyFailure( _masterHost ); } @@ -687,7 +773,6 @@ namespace mongo { warning() << "cached auth failed for set: " << _monitor->getName() << " db: " << a.dbname << " user: " << a.username << endl; } - } DBClientConnection& DBClientReplicaSet::masterConn() { @@ -794,6 +879,8 @@ namespace mongo { } auto_ptr<DBClientCursor> DBClientReplicaSet::checkSlaveQueryResult( auto_ptr<DBClientCursor> result ){ + if ( result.get() == NULL ) return result; + BSONObj error; bool isError = result->peekError( &error ); if( ! 
isError ) return result; diff --git a/client/dbclient_rs.h b/client/dbclient_rs.h index b68af29..bf91f09 100644 --- a/client/dbclient_rs.h +++ b/client/dbclient_rs.h @@ -107,6 +107,13 @@ namespace mongo { */ ReplicaSetMonitor( const string& name , const vector<HostAndPort>& servers ); + /** + * Checks all connections from the host list and sets the current + * master. + * + * @param checkAllSecondaries if set to false, stop immediately when + * the master is found or when _master is not -1. + */ void _check( bool checkAllSecondaries ); /** @@ -125,25 +132,50 @@ namespace mongo { /** * Updates host list. - * @param c the connection to check + * Invariant: if nodesOffset is >= 0, _nodes[nodesOffset].conn should be + * equal to conn. + * + * @param conn the connection to check * @param maybePrimary OUT * @param verbose * @param nodesOffset - offset into _nodes array, -1 for not in it - * @return if the connection is good + * + * @return true if the connection is good or false if invariant + * is broken */ - bool _checkConnection( DBClientConnection * c , string& maybePrimary , bool verbose , int nodesOffset ); + bool _checkConnection( DBClientConnection* conn, string& maybePrimary, + bool verbose, int nodesOffset ); string _getServerAddress_inlock() const; NodeDiff _getHostDiff_inlock( const BSONObj& hostList ); bool _shouldChangeHosts( const BSONObj& hostList, bool inlock ); - + /** + * @return the index to _nodes corresponding to the server address. + */ int _find( const string& server ) const ; int _find_inlock( const string& server ) const ; - int _find( const HostAndPort& server ) const ; - mutable mongo::mutex _lock; // protects _nodes + /** + * Checks whether the given connection matches the connection stored in _nodes. + * Mainly used for sanity checking to confirm that nodeOffset still + * refers to the right connection after releasing and reacquiring + * a mutex. 
+ */ + bool _checkConnMatch_inlock( DBClientConnection* conn, size_t nodeOffset ) const; + + // protects _nodes and indices pointing to it (_master & _nextSlave) + mutable mongo::mutex _lock; + + /** + * "Synchronizes" the _checkConnection method. Should ideally be one mutex per + * connection object being used. The purpose of this lock is to make sure that + * the reply from the connection the lock holder got is the actual response + * to what it sent. + * + * Deadlock WARNING: never acquire this while holding _lock + */ mutable mongo::mutex _checkConnectionLock; string _name; diff --git a/client/dbclientcursor.cpp b/client/dbclientcursor.cpp index 5db360e..9e7e8a6 100644 --- a/client/dbclientcursor.cpp +++ b/client/dbclientcursor.cpp @@ -290,12 +290,23 @@ namespace mongo { m.setData( dbKillCursors , b.buf() , b.len() ); if ( _client ) { - _client->sayPiggyBack( m ); + + // Kill the cursor the same way the connection itself would. Usually, non-lazily + if( DBClientConnection::getLazyKillCursor() ) + _client->sayPiggyBack( m ); + else + _client->say( m ); + } else { assert( _scopedHost.size() ); ScopedDbConnection conn( _scopedHost ); - conn->sayPiggyBack( m ); + + if( DBClientConnection::getLazyKillCursor() ) + conn->sayPiggyBack( m ); + else + conn->say( m ); + conn.done(); } } diff --git a/client/parallel.cpp b/client/parallel.cpp index 3a33eb5..d7975a6 100644 --- a/client/parallel.cpp +++ b/client/parallel.cpp @@ -503,7 +503,19 @@ namespace mongo { 0 , // nToSkip _fields.isEmpty() ? 0 : &_fields , // fieldsToReturn _options , - _batchSize == 0 ? 0 : _batchSize + _needToSkip // batchSize + // NtoReturn is weird. + // If zero, it means use default size, so we do that for all cursors + // If positive, it's the batch size (we don't want this cursor limiting results), that's + // done at a higher level + // If negative, it's the batch size, but we don't create a cursor - so we don't want + // to create a child cursor either. 
+ // Either way, if non-zero, we want to pull back the batch size + the skip amount as + // quickly as possible. Potentially, for a cursor on a single shard or if we keep better track of + // chunks, we can actually add the skip value into the cursor and/or make some assumptions about the + // return value size ( (batch size + skip amount) / num_servers ). + _batchSize == 0 ? 0 : + ( _batchSize > 0 ? _batchSize + _needToSkip : + _batchSize - _needToSkip ) // batchSize ) ); try{ diff --git a/client/redef_macros.h b/client/redef_macros.h index 897912d..5a39561 100644 --- a/client/redef_macros.h +++ b/client/redef_macros.h @@ -1,4 +1,4 @@ -/** @file redef_macros.h macros the implementation uses. +/** @file redef_macros.h macros for mongo internals @see undef_macros.h undefines these after use to minimize name pollution. */ @@ -20,42 +20,83 @@ // If you define a new global un-prefixed macro, please add it here and in undef_macros -// #pragma once // this file is intended to be processed multiple times - -#if defined(MONGO_MACROS_CLEANED) +#define MONGO_MACROS_PUSHED 1 // util/allocator.h +#pragma push_macro("malloc") +#undef malloc #define malloc MONGO_malloc +#pragma push_macro("realloc") +#undef realloc #define realloc MONGO_realloc // util/assert_util.h +#pragma push_macro("assert") +#undef assert #define assert MONGO_assert +#pragma push_macro("verify") +#undef verify +#define verify MONGO_verify +#pragma push_macro("dassert") +#undef dassert #define dassert MONGO_dassert +#pragma push_macro("wassert") +#undef wassert #define wassert MONGO_wassert +#pragma push_macro("massert") +#undef massert #define massert MONGO_massert +#pragma push_macro("uassert") +#undef uassert #define uassert MONGO_uassert #define BOOST_CHECK_EXCEPTION MONGO_BOOST_CHECK_EXCEPTION +#pragma push_macro("DESTRUCTOR_GUARD") +#undef DESTRUCTOR_GUARD #define DESTRUCTOR_GUARD MONGO_DESTRUCTOR_GUARD // util/goodies.h +#pragma push_macro("PRINT") +#undef PRINT #define PRINT MONGO_PRINT +#pragma push_macro("PRINTFL") +#undef 
PRINTFL #define PRINTFL MONGO_PRINTFL +#pragma push_macro("asctime") +#undef asctime #define asctime MONGO_asctime +#pragma push_macro("gmtime") +#undef gmtime #define gmtime MONGO_gmtime +#pragma push_macro("localtime") +#undef localtime #define localtime MONGO_localtime +#pragma push_macro("ctime") +#undef ctime #define ctime MONGO_ctime // util/debug_util.h +#pragma push_macro("DEV") +#undef DEV #define DEV MONGO_DEV +#pragma push_macro("DEBUGGING") +#undef DEBUGGING #define DEBUGGING MONGO_DEBUGGING +#pragma push_macro("SOMETIMES") +#undef SOMETIMES #define SOMETIMES MONGO_SOMETIMES +#pragma push_macro("OCCASIONALLY") +#undef OCCASIONALLY #define OCCASIONALLY MONGO_OCCASIONALLY +#pragma push_macro("RARELY") +#undef RARELY #define RARELY MONGO_RARELY +#pragma push_macro("ONCE") +#undef ONCE #define ONCE MONGO_ONCE // util/log.h +#pragma push_macro("LOG") +#undef LOG #define LOG MONGO_LOG -#undef MONGO_MACROS_CLEANED -#endif diff --git a/client/undef_macros.h b/client/undef_macros.h index 30ece61..c880f06 100644 --- a/client/undef_macros.h +++ b/client/undef_macros.h @@ -19,43 +19,65 @@ // #pragma once // this file is intended to be processed multiple times +#if !defined (MONGO_EXPOSE_MACROS) -/** MONGO_EXPOSE_MACROS - when defined, indicates that you are compiling a mongo program rather - than just using the C++ driver. 
-*/ -#if !defined(MONGO_EXPOSE_MACROS) && !defined(MONGO_MACROS_CLEANED) +#ifdef MONGO_MACROS_PUSHED // util/allocator.h #undef malloc +#pragma pop_macro("malloc") #undef realloc +#pragma pop_macro("realloc") // util/assert_util.h #undef assert +#pragma pop_macro("assert") #undef dassert +#pragma pop_macro("dassert") #undef wassert +#pragma pop_macro("wassert") #undef massert +#pragma pop_macro("massert") #undef uassert +#pragma pop_macro("uassert") #undef BOOST_CHECK_EXCEPTION +#undef verify +#pragma pop_macro("verify") #undef DESTRUCTOR_GUARD +#pragma pop_macro("DESTRUCTOR_GUARD") // util/goodies.h #undef PRINT +#pragma pop_macro("PRINT") #undef PRINTFL +#pragma pop_macro("PRINTFL") #undef asctime +#pragma pop_macro("asctime") #undef gmtime +#pragma pop_macro("gmtime") #undef localtime +#pragma pop_macro("localtime") #undef ctime +#pragma pop_macro("ctime") // util/debug_util.h #undef DEV +#pragma pop_macro("DEV") #undef DEBUGGING +#pragma pop_macro("DEBUGGING") #undef SOMETIMES +#pragma pop_macro("SOMETIMES") #undef OCCASIONALLY +#pragma pop_macro("OCCASIONALLY") #undef RARELY +#pragma pop_macro("RARELY") #undef ONCE +#pragma pop_macro("ONCE") // util/log.h #undef LOG +#pragma pop_macro("LOG") -#define MONGO_MACROS_CLEANED +#undef MONGO_MACROS_PUSHED +#endif #endif |