summaryrefslogtreecommitdiff
path: root/client/parallel.cpp
diff options
context:
space:
mode:
author Antonin Kral <a.kral@bobek.cz> 2011-09-14 17:08:06 +0200
committer Antonin Kral <a.kral@bobek.cz> 2011-09-14 17:08:06 +0200
commit 5d342a758c6095b4d30aba0750b54f13b8916f51 (patch)
tree 762e9aa84781f5e3b96db2c02d356c29cf0217c0 /client/parallel.cpp
parent cbe2d992e9cd1ea66af9fa91df006106775d3073 (diff)
download mongodb-5d342a758c6095b4d30aba0750b54f13b8916f51.tar.gz
Imported Upstream version 2.0.0
Diffstat (limited to 'client/parallel.cpp')
-rw-r--r-- client/parallel.cpp 369
1 files changed, 311 insertions, 58 deletions
diff --git a/client/parallel.cpp b/client/parallel.cpp
index c4905e3..76b0168 100644
--- a/client/parallel.cpp
+++ b/client/parallel.cpp
@@ -63,7 +63,20 @@ namespace mongo {
_init();
}
- auto_ptr<DBClientCursor> ClusteredCursor::query( const string& server , int num , BSONObj extra , int skipLeft ) {
+ void ClusteredCursor::_checkCursor( DBClientCursor * cursor ) { // Inspects a freshly opened cursor's result flags and throws if one of the two flags below is set; returns normally otherwise.
+ assert( cursor ); // callers must pass a non-null cursor
+ // Stale shard config is checked first. The message still says "ClusteredCursor::query" because this check used to live inline in query().
+ if ( cursor->hasResultFlag( ResultFlag_ShardConfigStale ) ) {
+ throw StaleConfigException( _ns , "ClusteredCursor::query" );
+ }
+ // With the error flag set, the next document is read from the cursor and its "code" / "$err" fields are rethrown as a UserException.
+ if ( cursor->hasResultFlag( ResultFlag_ErrSet ) ) {
+ BSONObj o = cursor->next();
+ throw UserException( o["code"].numberInt() , o["$err"].String() );
+ }
+ }
+
+ auto_ptr<DBClientCursor> ClusteredCursor::query( const string& server , int num , BSONObj extra , int skipLeft , bool lazy ) {
uassert( 10017 , "cursor already done" , ! _done );
assert( _didInit );
@@ -80,12 +93,10 @@ namespace mongo {
throw StaleConfigException( _ns , "ClusteredCursor::query ShardConnection had to change" , true );
}
- if ( logLevel >= 5 ) {
- log(5) << "ClusteredCursor::query (" << type() << ") server:" << server
- << " ns:" << _ns << " query:" << q << " num:" << num
- << " _fields:" << _fields << " options: " << _options << endl;
- }
-
+ LOG(5) << "ClusteredCursor::query (" << type() << ") server:" << server
+ << " ns:" << _ns << " query:" << q << " num:" << num
+ << " _fields:" << _fields << " options: " << _options << endl;
+
auto_ptr<DBClientCursor> cursor =
conn->query( _ns , q , num , 0 , ( _fields.isEmpty() ? 0 : &_fields ) , _options , _batchSize == 0 ? 0 : _batchSize + skipLeft );
@@ -97,21 +108,9 @@ namespace mongo {
massert( 13633 , str::stream() << "error querying server: " << server , cursor.get() );
- if ( cursor->hasResultFlag( ResultFlag_ShardConfigStale ) ) {
- conn.done();
- throw StaleConfigException( _ns , "ClusteredCursor::query" );
- }
-
- if ( cursor->hasResultFlag( ResultFlag_ErrSet ) ) {
- conn.done();
- BSONObj o = cursor->next();
- throw UserException( o["code"].numberInt() , o["$err"].String() );
- }
-
-
- cursor->attach( &conn );
-
- conn.done();
+ cursor->attach( &conn ); // this calls done on conn
+ assert( ! conn.ok() );
+ _checkCursor( cursor.get() );
return cursor;
}
catch ( SocketException& e ) {
@@ -228,6 +227,11 @@ namespace mongo {
: _matcher( filter ) , _cursor( cursor ) , _done( cursor.get() == 0 ) {
}
+ FilteringClientCursor::FilteringClientCursor( DBClientCursor* cursor , const BSONObj filter ) // raw-pointer overload; `filter` initializes _matcher
+ : _matcher( filter ) , _cursor( cursor ) , _done( cursor == 0 ) { // a null cursor starts out already done; presumably _cursor takes ownership of the pointer - TODO confirm
+ }
+
+
FilteringClientCursor::~FilteringClientCursor() {
}
@@ -237,6 +241,13 @@ namespace mongo {
_done = _cursor.get() == 0;
}
+ void FilteringClientCursor::reset( DBClientCursor* cursor ) { // raw-pointer overload: swaps in a new underlying cursor (may be null)
+ _cursor.reset( cursor );
+ _next = BSONObj(); // clear the pending document (more() returns true while _next is non-empty)
+ _done = cursor == 0; // a null cursor means there is nothing to iterate
+ }
+
+
bool FilteringClientCursor::more() {
if ( ! _next.isEmpty() )
return true;
@@ -399,17 +410,245 @@ namespace mongo {
}
}
+ // TODO: Merge with futures API? We do a lot of error checking here that would be useful elsewhere.
void ParallelSortClusteredCursor::_init() {
+ // _cursors[i], conns[i] and queries[i] refer to the same shard. Each pass of the do/while below first sends the queries (initLazy), then collects the replies (initLazyFinish); entries that ask for a retry are re-sent on the next pass.
+ // log() << "Starting parallel search..." << endl;
+
+ // make sure we're not already initialized
assert( ! _cursors );
_cursors = new FilteringClientCursor[_numServers];
- // TODO: parellize
- int num = 0;
- for ( set<ServerAndQuery>::iterator i = _servers.begin(); i!=_servers.end(); ++i ) {
- const ServerAndQuery& sq = *i;
- _cursors[num++].reset( query( sq._server , 0 , sq._extra , _needToSkip ) );
+ bool returnPartial = ( _options & QueryOption_PartialResults ); // when set, socket errors on individual shards are tolerated (see the checks on returnPartial below)
+
+ vector<ServerAndQuery> queries( _servers.begin(), _servers.end() ); // indexable copy of _servers so shards can be addressed by position
+ set<int> retryQueries; // positions in queries that must be re-sent on the next pass
+ int finishedQueries = 0; // number of cursors that were attached and passed _checkCursor
+
+ vector< shared_ptr<ShardConnection> > conns; // an empty pointer marks a shard we failed to connect to
+ vector<string> servers;
+ // NOTE(review): conns gets one entry per query attempted on the first pass (possibly empty), but servers only gets one when the connect succeeded, so servers[i] can drift from queries[i] after a failed connect with returnPartial set.
+ // Since we may get all sorts of errors, record them all as they come and throw them later if necessary
+ vector<string> staleConfigExs;
+ vector<string> socketExs;
+ vector<string> otherExs;
+ bool allConfigStale = false; // set when _checkCursor reports stale config; negated to form the third StaleConfigException argument below
+
+ int retries = -1; // incremented at the top of every pass, so it is 0 during the first pass
+
+ // Loop through all the queries until we've finished or gotten a socket exception on all of them
+ // We break early for non-socket exceptions, and socket exceptions if we aren't returning partial results
+ do {
+ retries++;
+
+ bool firstPass = retryQueries.size() == 0; // the loop only repeats while retryQueries is non-empty, so empty means first pass
+
+ if( ! firstPass ){
+ log() << "retrying " << ( returnPartial ? "(partial) " : "" ) << "parallel connection to ";
+ for( set<int>::iterator it = retryQueries.begin(); it != retryQueries.end(); ++it ){
+ log() << queries[*it]._server << ", ";
+ }
+ log() << finishedQueries << " finished queries." << endl;
+ }
+ // Phase 1: send. After this loop, num bounds the positions the second loop will visit ([0, num)).
+ size_t num = 0;
+ for ( vector<ServerAndQuery>::iterator it = queries.begin(); it != queries.end(); ++it ) {
+ size_t i = num++;
+
+ const ServerAndQuery& sq = *it;
+
+ // If we're not retrying this cursor on later passes, continue
+ if( ! firstPass && retryQueries.find( i ) == retryQueries.end() ) continue;
+
+ // log() << "Querying " << _query << " from " << _ns << " for " << sq._server << endl;
+
+ BSONObj q = _query;
+ if ( ! sq._extra.isEmpty() ) {
+ q = concatQuery( q , sq._extra );
+ }
+
+ string errLoc = " @ " + sq._server; // suffix appended to every recorded error message for this shard
+
+ if( firstPass ){
+
+ // This may be the first time connecting to this shard, if so we can get an error here
+ try {
+ conns.push_back( shared_ptr<ShardConnection>( new ShardConnection( sq._server , _ns ) ) );
+ }
+ catch( std::exception& e ){
+ socketExs.push_back( e.what() + errLoc );
+ if( ! returnPartial ){
+ num--; // nothing was pushed to conns for position i, so exclude it from the second loop
+ break;
+ }
+ conns.push_back( shared_ptr<ShardConnection>() ); // placeholder keeps conns aligned with queries
+ continue;
+ }
+
+ servers.push_back( sq._server );
+ }
+ // A true return from setVersion() is recorded as a stale-config error and ends the send loop.
+ if ( conns[i]->setVersion() ) {
+ conns[i]->done();
+ staleConfigExs.push_back( StaleConfigException( _ns , "ClusteredCursor::query ShardConnection had to change" , true ).what() + errLoc );
+ break;
+ }
+
+ LOG(5) << "ParallelSortClusteredCursor::init server:" << sq._server << " ns:" << _ns
+ << " query:" << q << " _fields:" << _fields << " options: " << _options << endl;
+ // Create the cursor unless this slot already holds one.
+ if( ! _cursors[i].raw() )
+ _cursors[i].reset( new DBClientCursor( conns[i]->get() , _ns , q ,
+ 0 , // nToReturn
+ 0 , // nToSkip
+ _fields.isEmpty() ? 0 : &_fields , // fieldsToReturn
+ _options ,
+ _batchSize == 0 ? 0 : _batchSize + _needToSkip // batchSize
+ ) );
+
+ try{
+ _cursors[i].raw()->initLazy( ! firstPass ); // passes true on retry passes; the parameter's meaning is defined by DBClientCursor - TODO confirm
+ }
+ catch( SocketException& e ){
+ socketExs.push_back( e.what() + errLoc );
+ _cursors[i].reset( NULL );
+ conns[i]->done();
+ if( ! returnPartial ) break; // with partial results allowed, keep sending to the remaining shards
+ }
+ catch( std::exception& e){
+ otherExs.push_back( e.what() + errLoc );
+ _cursors[i].reset( NULL );
+ conns[i]->done();
+ break; // non-socket errors always stop the send loop
+ }
+
+ }
+
+ // Go through all the potentially started cursors and finish initializing them or log any errors and
+ // potentially retry
+ // TODO: Better error classification would make this easier, errors are indicated in all sorts of ways
+ // here that we need to trap.
+ for ( size_t i = 0; i < num; i++ ) {
+
+ // log() << "Finishing query for " << cons[i].get()->getHost() << endl;
+ string errLoc = " @ " + queries[i]._server;
+ // Skip slots that have no cursor or were not part of this pass; done() is still called on their connection if there is one.
+ if( ! _cursors[i].raw() || ( ! firstPass && retryQueries.find( i ) == retryQueries.end() ) ){
+ if( conns[i] ) conns[i].get()->done();
+ continue;
+ }
+
+ assert( conns[i] );
+ retryQueries.erase( i ); // re-inserted below if initLazyFinish asks for a retry
+
+ bool retry = false; // out-parameter filled in by initLazyFinish
+
+ try {
+
+ if( ! _cursors[i].raw()->initLazyFinish( retry ) ) {
+
+ warning() << "invalid result from " << conns[i]->getHost() << ( retry ? ", retrying" : "" ) << endl;
+ _cursors[i].reset( NULL );
+
+ if( ! retry ){
+ socketExs.push_back( str::stream() << "error querying server: " << servers[i] );
+ conns[i]->done();
+ }
+ else {
+ retryQueries.insert( i ); // connection is kept open here; it is reused on the next pass
+ }
+
+ continue;
+ }
+ }
+ catch ( MsgAssertionException& e ){
+ socketExs.push_back( e.what() + errLoc ); // recorded in the socket-error list, so it is tolerated when returnPartial is set
+ _cursors[i].reset( NULL );
+ conns[i]->done();
+ continue;
+ }
+ catch ( SocketException& e ) {
+ socketExs.push_back( e.what() + errLoc );
+ _cursors[i].reset( NULL );
+ conns[i]->done();
+ continue;
+ }
+ catch( std::exception& e ){
+ otherExs.push_back( e.what() + errLoc );
+ _cursors[i].reset( NULL );
+ conns[i]->done();
+ continue;
+ }
+
+ try {
+ _cursors[i].raw()->attach( conns[i].get() ); // this calls done on conn
+ _checkCursor( _cursors[i].raw() );
+
+ finishedQueries++;
+ }
+ catch ( StaleConfigException& e ){
+
+ // Our stored configuration data is actually stale, we need to reload it
+ // when we throw our exception
+ allConfigStale = true;
+
+ staleConfigExs.push_back( e.what() + errLoc );
+ _cursors[i].reset( NULL );
+ conns[i]->done();
+ continue;
+ }
+ catch( std::exception& e ){
+ otherExs.push_back( e.what() + errLoc );
+ _cursors[i].reset( NULL );
+ conns[i]->done();
+ continue;
+ }
+ }
+
+ // Don't exceed our max retries, should not happen
+ assert( retries < 5 );
+ }
+ while( retryQueries.size() > 0 /* something to retry */ &&
+ ( socketExs.size() == 0 || returnPartial ) /* no conn issues */ &&
+ staleConfigExs.size() == 0 /* no config issues */ &&
+ otherExs.size() == 0 /* no other issues */);
+
+ // Assert that our conns are all closed!
+ for( vector< shared_ptr<ShardConnection> >::iterator i = conns.begin(); i < conns.end(); ++i ){
+ assert( ! (*i) || ! (*i)->ok() ); // empty placeholders (failed connects) are accepted as well
+ }
+
+ // Handle errors we got during initialization.
+ // If we're returning partial results, we can ignore socketExs, but nothing else
+ // Log a warning in any case, so we don't lose these messages
+ bool throwException = ( socketExs.size() > 0 && ! returnPartial ) || staleConfigExs.size() > 0 || otherExs.size() > 0;
+
+ if( socketExs.size() > 0 || staleConfigExs.size() > 0 || otherExs.size() > 0 ) {
+
+ vector<string> errMsgs;
+ // Message order: stale-config errors, then other errors, then socket errors.
+ errMsgs.insert( errMsgs.end(), staleConfigExs.begin(), staleConfigExs.end() );
+ errMsgs.insert( errMsgs.end(), otherExs.begin(), otherExs.end() );
+ errMsgs.insert( errMsgs.end(), socketExs.begin(), socketExs.end() );
+
+ stringstream errMsg;
+ errMsg << "could not initialize cursor across all shards because : ";
+ for( vector<string>::iterator i = errMsgs.begin(); i != errMsgs.end(); i++ ){
+ if( i != errMsgs.begin() ) errMsg << " :: and :: ";
+ errMsg << *i;
+ }
+ // Stale-config errors take precedence over every other kind; if nothing is fatal (only socket errors with returnPartial set) the combined message is just logged.
+ if( throwException && staleConfigExs.size() > 0 )
+ throw StaleConfigException( _ns , errMsg.str() , ! allConfigStale );
+ else if( throwException )
+ throw DBException( errMsg.str(), 14827 );
+ else
+ warning() << errMsg.str() << endl;
 }
+ if( retries > 0 )
+ log() << "successfully finished parallel query after " << retries << " retries" << endl;
+
}
ParallelSortClusteredCursor::~ParallelSortClusteredCursor() {
@@ -451,6 +690,7 @@ namespace mongo {
if ( best.isEmpty() ) {
best = me;
bestFrom = i;
+ if( _sortKey.isEmpty() ) break;
continue;
}
@@ -481,49 +721,62 @@ namespace mongo {
// ---- Future -----
// -----------------
- Future::CommandResult::CommandResult( const string& server , const string& db , const BSONObj& cmd , DBClientBase * conn ) {
- _server = server;
- _db = db;
- _cmd = cmd;
- _conn = conn;
- _done = false;
- }
+ Future::CommandResult::CommandResult( const string& server , const string& db , const BSONObj& cmd , int options , DBClientBase * conn ) // Starts the command immediately; join() collects the outcome.
+ :_server(server) ,_db(db) , _options(options), _cmd(cmd) ,_conn(conn) ,_done(false)
+ {
+ try {
+ if ( ! _conn ){ // no connection supplied: obtain one for _server and keep it in _connHolder
+ _connHolder.reset( new ScopedDbConnection( _server ) );
+ _conn = _connHolder->get();
+ }
- bool Future::CommandResult::join() {
- _thr->join();
- assert( _done );
- return _ok;
+ if ( _conn->lazySupported() ) { // lazy path: only send the request here; join() finishes it with initLazyFinish()
+ _cursor.reset( new DBClientCursor(_conn, _db + ".$cmd", _cmd, -1/*limit*/, 0, NULL, _options, 0));
+ _cursor->initLazy();
+ }
+ else { // no lazy support: run the command synchronously right now
+ _done = true; // we set _done first because even if there is an error we're done
+ _ok = _conn->runCommand( db , cmd , _res , options ); // NOTE(review): _connHolder, if set, is not done() on this path and join() returns early - presumably released by its destructor; confirm
+ }
+ }
+ catch ( std::exception& e ) { // any failure marks the result finished-and-failed, so join() returns false without touching _cursor
+ error() << "Future::spawnComand (part 1) exception: " << e.what() << endl;
+ _ok = false;
+ _done = true;
+ }
 }
- void Future::commandThread(shared_ptr<CommandResult> res) {
- setThreadName( "future" );
+ bool Future::CommandResult::join() { // Completes a command started by the constructor and returns whether it succeeded.
+ if (_done) // already resolved: synchronous path, earlier failure, or a previous join()
+ return _ok;
 try {
- DBClientBase * conn = res->_conn;
-
- scoped_ptr<ScopedDbConnection> myconn;
- if ( ! conn ){
- myconn.reset( new ScopedDbConnection( res->_server ) );
- conn = myconn->get();
- }
-
- res->_ok = conn->runCommand( res->_db , res->_cmd , res->_res );
+ // TODO: Allow retries?
+ bool retry = false; // out-parameter of initLazyFinish; its value is not used afterwards
+ bool finished = _cursor->initLazyFinish( retry );
+
+ // Shouldn't need to communicate with server any more
+ if ( _connHolder )
+ _connHolder->done();
- if ( myconn )
- myconn->done();
+ uassert(14812, str::stream() << "Error running command on server: " << _server, finished);
+ massert(14813, "Command returned nothing", _cursor->more());
+
+ _res = _cursor->nextSafe();
+ _ok = _res["ok"].trueValue(); // success is taken from the "ok" field of the reply document
 }
 catch ( std::exception& e ) {
- error() << "Future::commandThread exception: " << e.what() << endl;
- res->_ok = false;
+ error() << "Future::spawnComand (part 2) exception: " << e.what() << endl;
+ _ok = false; // failures are logged and reported as false rather than propagated to the caller
 }
- res->_done = true;
- }
- shared_ptr<Future::CommandResult> Future::spawnCommand( const string& server , const string& db , const BSONObj& cmd , DBClientBase * conn ) {
- shared_ptr<Future::CommandResult> res (new Future::CommandResult( server , db , cmd , conn ));
- res->_thr.reset( new boost::thread( boost::bind(Future::commandThread, res) ) );
+ _done = true; // later calls take the early return at the top
+ return _ok;
+ }
+ shared_ptr<Future::CommandResult> Future::spawnCommand( const string& server , const string& db , const BSONObj& cmd , int options , DBClientBase * conn ) { // Factory for a CommandResult; call join() on the returned object to obtain the outcome.
+ shared_ptr<Future::CommandResult> res (new Future::CommandResult( server , db , cmd , options , conn )); // the constructor itself starts the command, so no boost::thread is created here any more
 return res;
}