summary | refs | log | tree | commit | diff
path: root/client
diff options
context:
space:
mode:
author: Antonin Kral <a.kral@bobek.cz> 2012-05-10 06:57:54 +0200
committer: Antonin Kral <a.kral@bobek.cz> 2012-05-10 06:57:54 +0200
commit: 61619b3142c1de8f60f91964ff2656054d4f11a6 (patch)
tree: d3aaf9d1e70cac8efa0856e5b5ba39e2fb9dc526 /client
parent: eaaa7b30c99b89b5483e0a372bb73fe8c8695185 (diff)
download: mongodb-61619b3142c1de8f60f91964ff2656054d4f11a6.tar.gz
Imported Upstream version 2.0.5
Diffstat (limited to 'client')
-rw-r--r-- client/dbclient.cpp 1
-rw-r--r-- client/dbclient_rs.cpp 203
-rw-r--r-- client/dbclient_rs.h 44
-rw-r--r-- client/dbclientcursor.cpp 15
-rw-r--r-- client/parallel.cpp 14
-rw-r--r-- client/redef_macros.h 53
-rw-r--r-- client/undef_macros.h 32
7 files changed, 284 insertions, 78 deletions
diff --git a/client/dbclient.cpp b/client/dbclient.cpp
index 67ecea0..6b9631b 100644
--- a/client/dbclient.cpp
+++ b/client/dbclient.cpp
@@ -952,6 +952,7 @@ namespace mongo {
an exception. we should make it return void and just throw an exception anytime
it fails
*/
+ checkConnection();
try {
if ( !port().call(toSend, response) ) {
_failed = true;
diff --git a/client/dbclient_rs.cpp b/client/dbclient_rs.cpp
index 0189700..4a8112b 100644
--- a/client/dbclient_rs.cpp
+++ b/client/dbclient_rs.cpp
@@ -211,6 +211,7 @@ namespace mongo {
void ReplicaSetMonitor::notifyFailure( const HostAndPort& server ) {
scoped_lock lk( _lock );
+
if ( _master >= 0 && _master < (int)_nodes.size() ) {
if ( server == _nodes[_master].addr ) {
_nodes[_master].ok = false;
@@ -292,7 +293,7 @@ namespace mongo {
}
/**
- * notify the monitor that server has faild
+ * notify the monitor that server has failed
*/
void ReplicaSetMonitor::notifySlaveFailure( const HostAndPort& server ) {
int x = _find( server );
@@ -402,15 +403,15 @@ namespace mongo {
// Our host list may have changed while waiting for another thread in the meantime,
// so double-check here
- // TODO: Do we really need this much protection, this should be pretty rare and not triggered
- // from lots of threads, duping old behavior for safety
+ // TODO: Do we really need this much protection, this should be pretty rare and not
+ // triggered from lots of threads, duping old behavior for safety
if( ! _shouldChangeHosts( hostList, true ) ){
changed = false;
return;
}
- // LogLevel can be pretty low, since replica set reconfiguration should be pretty rare and we
- // want to record our changes
+ // LogLevel can be pretty low, since replica set reconfiguration should be pretty rare and
+ // we want to record our changes
log() << "changing hosts to " << hostList << " from " << _getServerAddress_inlock() << endl;
NodeDiff diff = _getHostDiff_inlock( hostList );
@@ -424,7 +425,6 @@ namespace mongo {
for( set<int>::reverse_iterator i = removed.rbegin(), end = removed.rend(); i != end; ++i ){
log() << "erasing host " << _nodes[ *i ] << " from replica set " << this->_name << endl;
-
_nodes.erase( _nodes.begin() + *i );
}
@@ -450,29 +450,52 @@ namespace mongo {
_nodes.push_back( Node( h , newConn ) );
}
-
}
-
- bool ReplicaSetMonitor::_checkConnection( DBClientConnection * c , string& maybePrimary , bool verbose , int nodesOffset ) {
- assert( c );
+ bool ReplicaSetMonitor::_checkConnection( DBClientConnection* conn,
+ string& maybePrimary, bool verbose, int nodesOffset ) {
+
+ assert( conn );
+
scoped_lock lk( _checkConnectionLock );
bool isMaster = false;
bool changed = false;
+ bool errorOccured = false;
+
+ if ( nodesOffset >= 0 ){
+ scoped_lock lk( _lock );
+ if ( !_checkConnMatch_inlock( conn, nodesOffset )) {
+ /* Another thread modified _nodes -> invariant broken.
+ * This also implies that another thread just passed
+ * through here and refreshed _nodes. So no need to do
+ * duplicate work.
+ */
+ return false;
+ }
+ }
+
try {
Timer t;
BSONObj o;
- c->isMaster(isMaster, &o);
+ conn->isMaster( isMaster, &o );
+
if ( o["setName"].type() != String || o["setName"].String() != _name ) {
- warning() << "node: " << c->getServerAddress() << " isn't a part of set: " << _name
+ warning() << "node: " << conn->getServerAddress()
+ << " isn't a part of set: " << _name
<< " ismaster: " << o << endl;
- if ( nodesOffset >= 0 )
+
+ if ( nodesOffset >= 0 ) {
+ scoped_lock lk( _lock );
_nodes[nodesOffset].ok = false;
+ }
+
return false;
}
if ( nodesOffset >= 0 ) {
+ scoped_lock lk( _lock );
+
_nodes[nodesOffset].pingTimeMillis = t.millis();
_nodes[nodesOffset].hidden = o["hidden"].trueValue();
_nodes[nodesOffset].secondary = o["secondary"].trueValue();
@@ -481,7 +504,8 @@ namespace mongo {
_nodes[nodesOffset].lastIsMaster = o.copy();
}
- log( ! verbose ) << "ReplicaSetMonitor::_checkConnection: " << c->toString() << ' ' << o << endl;
+ log( ! verbose ) << "ReplicaSetMonitor::_checkConnection: " << conn->toString()
+ << ' ' << o << endl;
// add other nodes
BSONArrayBuilder b;
@@ -492,18 +516,25 @@ namespace mongo {
BSONObjIterator it( o["hosts"].Obj() );
while( it.more() ) b.append( it.next() );
}
+
if (o.hasField("passives") && o["passives"].type() == Array) {
BSONObjIterator it( o["passives"].Obj() );
while( it.more() ) b.append( it.next() );
}
_checkHosts( b.arr(), changed);
- _checkStatus(c);
+ _checkStatus( conn );
-
}
catch ( std::exception& e ) {
- log( ! verbose ) << "ReplicaSetMonitor::_checkConnection: caught exception " << c->toString() << ' ' << e.what() << endl;
+ log( ! verbose ) << "ReplicaSetMonitor::_checkConnection: caught exception "
+ << conn->toString() << ' ' << e.what() << endl;
+
+ errorOccured = true;
+ }
+
+ if ( errorOccured ) {
+ scoped_lock lk( _lock );
_nodes[nodesOffset].ok = false;
}
@@ -514,63 +545,114 @@ namespace mongo {
}
void ReplicaSetMonitor::_check( bool checkAllSecondaries ) {
-
- bool triedQuickCheck = false;
-
LOG(1) << "_check : " << getServerAddress() << endl;
int newMaster = -1;
+ shared_ptr<DBClientConnection> nodeConn;
for ( int retry = 0; retry < 2; retry++ ) {
- for ( unsigned i=0; i<_nodes.size(); i++ ) {
- shared_ptr<DBClientConnection> c;
+ bool triedQuickCheck = false;
+
+ if ( !checkAllSecondaries ) {
+ scoped_lock lk( _lock );
+ if ( _master >= 0 ) {
+ /* Nothing else to do since another thread already
+ * found the _master
+ */
+ return;
+ }
+ }
+
+ for ( unsigned i = 0; /* should not check while outside of lock! */ ; i++ ) {
{
scoped_lock lk( _lock );
- c = _nodes[i].conn;
+ if ( i >= _nodes.size() ) break;
+ nodeConn = _nodes[i].conn;
}
string maybePrimary;
- if ( _checkConnection( c.get() , maybePrimary , retry , i ) ) {
- _master = i;
- newMaster = i;
- if ( ! checkAllSecondaries )
- return;
+ if ( _checkConnection( nodeConn.get(), maybePrimary, retry, i ) ) {
+ scoped_lock lk( _lock );
+ if ( _checkConnMatch_inlock( nodeConn.get(), i )) {
+ _master = i;
+ newMaster = i;
+
+ if ( !checkAllSecondaries )
+ return;
+ }
+ else {
+ /*
+ * Somebody modified _nodes and most likely set the new
+ * _master, so try again.
+ */
+ break;
+ }
}
- if ( ! triedQuickCheck && maybePrimary.size() ) {
- int x = _find( maybePrimary );
- if ( x >= 0 ) {
+
+ if ( ! triedQuickCheck && ! maybePrimary.empty() ) {
+ int probablePrimaryIdx = -1;
+ shared_ptr<DBClientConnection> probablePrimaryConn;
+
+ {
+ scoped_lock lk( _lock );
+ probablePrimaryIdx = _find_inlock( maybePrimary );
+ probablePrimaryConn = _nodes[probablePrimaryIdx].conn;
+ }
+
+ if ( probablePrimaryIdx >= 0 ) {
triedQuickCheck = true;
+
string dummy;
- shared_ptr<DBClientConnection> testConn;
- {
+ if ( _checkConnection( probablePrimaryConn.get(), dummy,
+ false, probablePrimaryIdx ) ) {
+
scoped_lock lk( _lock );
- testConn = _nodes[x].conn;
- }
- if ( _checkConnection( testConn.get() , dummy , false , x ) ) {
- _master = x;
- newMaster = x;
- if ( ! checkAllSecondaries )
- return;
+
+ if ( _checkConnMatch_inlock( probablePrimaryConn.get(),
+ probablePrimaryIdx )) {
+
+ _master = probablePrimaryIdx;
+ newMaster = probablePrimaryIdx;
+
+ if ( ! checkAllSecondaries )
+ return;
+ }
+ else {
+ /*
+ * Somebody modified _nodes and most likely set the
+ * new _master, so try again.
+ */
+ break;
+ }
}
}
}
-
}
if ( newMaster >= 0 )
return;
- sleepsecs(1);
+ sleepsecs( 1 );
}
-
}
void ReplicaSetMonitor::check( bool checkAllSecondaries ) {
- // first see if the current master is fine
- if ( _master >= 0 ) {
+ shared_ptr<DBClientConnection> masterConn;
+
+ {
+ scoped_lock lk( _lock );
+
+ // first see if the current master is fine
+ if ( _master >= 0 ) {
+ masterConn = _nodes[_master].conn;
+ }
+ }
+
+ if ( masterConn.get() != NULL ) {
string temp;
- if ( _checkConnection( _nodes[_master].conn.get() , temp , false , _master ) ) {
+
+ if ( _checkConnection( masterConn.get(), temp, false, _master )) {
if ( ! checkAllSecondaries ) {
// current master is fine, so we're done
return;
@@ -588,21 +670,17 @@ namespace mongo {
}
int ReplicaSetMonitor::_find_inlock( const string& server ) const {
- for ( unsigned i=0; i<_nodes.size(); i++ )
- if ( _nodes[i].addr == server )
- return i;
- return -1;
- }
-
+ const size_t size = _nodes.size();
- int ReplicaSetMonitor::_find( const HostAndPort& server ) const {
- scoped_lock lk( _lock );
- for ( unsigned i=0; i<_nodes.size(); i++ )
- if ( _nodes[i].addr == server )
+ for ( unsigned i = 0; i < size; i++ ) {
+ if ( _nodes[i].addr == server ) {
return i;
+ }
+ }
+
return -1;
}
-
+
void ReplicaSetMonitor::appendInfo( BSONObjBuilder& b ) const {
scoped_lock lk( _lock );
BSONArrayBuilder hosts( b.subarrayStart( "hosts" ) );
@@ -622,6 +700,13 @@ namespace mongo {
b.append( "nextSlave" , _nextSlave );
}
+ bool ReplicaSetMonitor::_checkConnMatch_inlock( DBClientConnection* conn,
+ size_t nodeOffset ) const {
+
+ return ( nodeOffset < _nodes.size() &&
+ conn->getServerAddress() == _nodes[nodeOffset].conn->getServerAddress() );
+ }
+
mongo::mutex ReplicaSetMonitor::_setsLock( "ReplicaSetMonitor" );
map<string,ReplicaSetMonitorPtr> ReplicaSetMonitor::_sets;
@@ -645,6 +730,7 @@ namespace mongo {
// a master is selected. let's just make sure connection didn't die
if ( ! _master->isFailed() )
return _master.get();
+
_monitor->notifyFailure( _masterHost );
}
@@ -687,7 +773,6 @@ namespace mongo {
warning() << "cached auth failed for set: " << _monitor->getName() << " db: " << a.dbname << " user: " << a.username << endl;
}
-
}
DBClientConnection& DBClientReplicaSet::masterConn() {
@@ -794,6 +879,8 @@ namespace mongo {
}
auto_ptr<DBClientCursor> DBClientReplicaSet::checkSlaveQueryResult( auto_ptr<DBClientCursor> result ){
+ if ( result.get() == NULL ) return result;
+
BSONObj error;
bool isError = result->peekError( &error );
if( ! isError ) return result;
diff --git a/client/dbclient_rs.h b/client/dbclient_rs.h
index b68af29..bf91f09 100644
--- a/client/dbclient_rs.h
+++ b/client/dbclient_rs.h
@@ -107,6 +107,13 @@ namespace mongo {
*/
ReplicaSetMonitor( const string& name , const vector<HostAndPort>& servers );
+ /**
+ * Checks all connections from the host list and sets the current
+ * master.
+ *
+ * @param checkAllSecondaries if set to false, stop immediately when
+ * the master is found or when _master is not -1.
+ */
void _check( bool checkAllSecondaries );
/**
@@ -125,25 +132,50 @@ namespace mongo {
/**
* Updates host list.
- * @param c the connection to check
+ * Invariant: if nodesOffset is >= 0, _nodes[nodesOffset].conn should be
+ * equal to conn.
+ *
+ * @param conn the connection to check
* @param maybePrimary OUT
* @param verbose
* @param nodesOffset - offset into _nodes array, -1 for not in it
- * @return if the connection is good
+ *
+ * @return true if the connection is good or false if invariant
+ * is broken
*/
- bool _checkConnection( DBClientConnection * c , string& maybePrimary , bool verbose , int nodesOffset );
+ bool _checkConnection( DBClientConnection* conn, string& maybePrimary,
+ bool verbose, int nodesOffset );
string _getServerAddress_inlock() const;
NodeDiff _getHostDiff_inlock( const BSONObj& hostList );
bool _shouldChangeHosts( const BSONObj& hostList, bool inlock );
-
+ /**
+ * @return the index to _nodes corresponding to the server address.
+ */
int _find( const string& server ) const ;
int _find_inlock( const string& server ) const ;
- int _find( const HostAndPort& server ) const ;
- mutable mongo::mutex _lock; // protects _nodes
+ /**
+ * Checks whether the given connection matches the connection stored in _nodes.
+ * Mainly used for sanity checking to confirm that nodeOffset still
+ * refers to the right connection after releasing and reacquiring
+ * a mutex.
+ */
+ bool _checkConnMatch_inlock( DBClientConnection* conn, size_t nodeOffset ) const;
+
+ // protects _nodes and indices pointing to it (_master & _nextSlave)
+ mutable mongo::mutex _lock;
+
+ /**
+ * "Synchronizes" the _checkConnection method. Should ideally be one mutex per
+ * connection object being used. The purpose of this lock is to make sure that
+ * the reply from the connection the lock holder got is the actual response
+ * to what it sent.
+ *
+ * Deadlock WARNING: never acquire this while holding _lock
+ */
mutable mongo::mutex _checkConnectionLock;
string _name;
diff --git a/client/dbclientcursor.cpp b/client/dbclientcursor.cpp
index 5db360e..9e7e8a6 100644
--- a/client/dbclientcursor.cpp
+++ b/client/dbclientcursor.cpp
@@ -290,12 +290,23 @@ namespace mongo {
m.setData( dbKillCursors , b.buf() , b.len() );
if ( _client ) {
- _client->sayPiggyBack( m );
+
+ // Kill the cursor the same way the connection itself would. Usually, non-lazily
+ if( DBClientConnection::getLazyKillCursor() )
+ _client->sayPiggyBack( m );
+ else
+ _client->say( m );
+
}
else {
assert( _scopedHost.size() );
ScopedDbConnection conn( _scopedHost );
- conn->sayPiggyBack( m );
+
+ if( DBClientConnection::getLazyKillCursor() )
+ conn->sayPiggyBack( m );
+ else
+ conn->say( m );
+
conn.done();
}
}
diff --git a/client/parallel.cpp b/client/parallel.cpp
index 3a33eb5..d7975a6 100644
--- a/client/parallel.cpp
+++ b/client/parallel.cpp
@@ -503,7 +503,19 @@ namespace mongo {
0 , // nToSkip
_fields.isEmpty() ? 0 : &_fields , // fieldsToReturn
_options ,
- _batchSize == 0 ? 0 : _batchSize + _needToSkip // batchSize
+ // NtoReturn is weird.
+ // If zero, it means use default size, so we do that for all cursors
+ // If positive, it's the batch size (we don't want this cursor limiting results), that's
+ // done at a higher level
+ // If negative, it's the batch size, but we don't create a cursor - so we don't want
+ // to create a child cursor either.
+ // Either way, if non-zero, we want to pull back the batch size + the skip amount as
+ // quickly as possible. Potentially, for a cursor on a single shard or if we keep better track of
+ // chunks, we can actually add the skip value into the cursor and/or make some assumptions about the
+ // return value size ( (batch size + skip amount) / num_servers ).
+ _batchSize == 0 ? 0 :
+ ( _batchSize > 0 ? _batchSize + _needToSkip :
+ _batchSize - _needToSkip ) // batchSize
) );
try{
diff --git a/client/redef_macros.h b/client/redef_macros.h
index 897912d..5a39561 100644
--- a/client/redef_macros.h
+++ b/client/redef_macros.h
@@ -1,4 +1,4 @@
-/** @file redef_macros.h macros the implementation uses.
+/** @file redef_macros.h macros for mongo internals
@see undef_macros.h undefines these after use to minimize name pollution.
*/
@@ -20,42 +20,83 @@
// If you define a new global un-prefixed macro, please add it here and in undef_macros
-// #pragma once // this file is intended to be processed multiple times
-
-#if defined(MONGO_MACROS_CLEANED)
+#define MONGO_MACROS_PUSHED 1
// util/allocator.h
+#pragma push_macro("malloc")
+#undef malloc
#define malloc MONGO_malloc
+#pragma push_macro("realloc")
+#undef realloc
#define realloc MONGO_realloc
// util/assert_util.h
+#pragma push_macro("assert")
+#undef assert
#define assert MONGO_assert
+#pragma push_macro("verify")
+#undef verify
+#define verify MONGO_verify
+#pragma push_macro("dassert")
+#undef dassert
#define dassert MONGO_dassert
+#pragma push_macro("wassert")
+#undef wassert
#define wassert MONGO_wassert
+#pragma push_macro("massert")
+#undef massert
#define massert MONGO_massert
+#pragma push_macro("uassert")
+#undef uassert
#define uassert MONGO_uassert
#define BOOST_CHECK_EXCEPTION MONGO_BOOST_CHECK_EXCEPTION
+#pragma push_macro("DESTRUCTOR_GUARD")
+#undef DESTRUCTOR_GUARD
#define DESTRUCTOR_GUARD MONGO_DESTRUCTOR_GUARD
// util/goodies.h
+#pragma push_macro("PRINT")
+#undef PRINT
#define PRINT MONGO_PRINT
+#pragma push_macro("PRINTFL")
+#undef PRINTFL
#define PRINTFL MONGO_PRINTFL
+#pragma push_macro("asctime")
+#undef asctime
#define asctime MONGO_asctime
+#pragma push_macro("gmtime")
+#undef gmtime
#define gmtime MONGO_gmtime
+#pragma push_macro("localtime")
+#undef localtime
#define localtime MONGO_localtime
+#pragma push_macro("ctime")
+#undef ctime
#define ctime MONGO_ctime
// util/debug_util.h
+#pragma push_macro("DEV")
+#undef DEV
#define DEV MONGO_DEV
+#pragma push_macro("DEBUGGING")
+#undef DEBUGGING
#define DEBUGGING MONGO_DEBUGGING
+#pragma push_macro("SOMETIMES")
+#undef SOMETIMES
#define SOMETIMES MONGO_SOMETIMES
+#pragma push_macro("OCCASIONALLY")
+#undef OCCASIONALLY
#define OCCASIONALLY MONGO_OCCASIONALLY
+#pragma push_macro("RARELY")
+#undef RARELY
#define RARELY MONGO_RARELY
+#pragma push_macro("ONCE")
+#undef ONCE
#define ONCE MONGO_ONCE
// util/log.h
+#pragma push_macro("LOG")
+#undef LOG
#define LOG MONGO_LOG
-#undef MONGO_MACROS_CLEANED
-#endif
diff --git a/client/undef_macros.h b/client/undef_macros.h
index 30ece61..c880f06 100644
--- a/client/undef_macros.h
+++ b/client/undef_macros.h
@@ -19,43 +19,65 @@
// #pragma once // this file is intended to be processed multiple times
+#if !defined (MONGO_EXPOSE_MACROS)
-/** MONGO_EXPOSE_MACROS - when defined, indicates that you are compiling a mongo program rather
- than just using the C++ driver.
-*/
-#if !defined(MONGO_EXPOSE_MACROS) && !defined(MONGO_MACROS_CLEANED)
+#ifdef MONGO_MACROS_PUSHED
// util/allocator.h
#undef malloc
+#pragma pop_macro("malloc")
#undef realloc
+#pragma pop_macro("realloc")
// util/assert_util.h
#undef assert
+#pragma pop_macro("assert")
#undef dassert
+#pragma pop_macro("dassert")
#undef wassert
+#pragma pop_macro("wassert")
#undef massert
+#pragma pop_macro("massert")
#undef uassert
+#pragma pop_macro("uassert")
#undef BOOST_CHECK_EXCEPTION
+#undef verify
+#pragma pop_macro("verify")
#undef DESTRUCTOR_GUARD
+#pragma pop_macro("DESTRUCTOR_GUARD")
// util/goodies.h
#undef PRINT
+#pragma pop_macro("PRINT")
#undef PRINTFL
+#pragma pop_macro("PRINTFL")
#undef asctime
+#pragma pop_macro("asctime")
#undef gmtime
+#pragma pop_macro("gmtime")
#undef localtime
+#pragma pop_macro("localtime")
#undef ctime
+#pragma pop_macro("ctime")
// util/debug_util.h
#undef DEV
+#pragma pop_macro("DEV")
#undef DEBUGGING
+#pragma pop_macro("DEBUGGING")
#undef SOMETIMES
+#pragma pop_macro("SOMETIMES")
#undef OCCASIONALLY
+#pragma pop_macro("OCCASIONALLY")
#undef RARELY
+#pragma pop_macro("RARELY")
#undef ONCE
+#pragma pop_macro("ONCE")
// util/log.h
#undef LOG
+#pragma pop_macro("LOG")
-#define MONGO_MACROS_CLEANED
+#undef MONGO_MACROS_PUSHED
+#endif
#endif