diff options
author | Antonin Kral <a.kral@bobek.cz> | 2011-09-14 17:08:06 +0200 |
---|---|---|
committer | Antonin Kral <a.kral@bobek.cz> | 2011-09-14 17:08:06 +0200 |
commit | 5d342a758c6095b4d30aba0750b54f13b8916f51 (patch) | |
tree | 762e9aa84781f5e3b96db2c02d356c29cf0217c0 /client/parallel.cpp | |
parent | cbe2d992e9cd1ea66af9fa91df006106775d3073 (diff) | |
download | mongodb-5d342a758c6095b4d30aba0750b54f13b8916f51.tar.gz |
Imported Upstream version 2.0.0
Diffstat (limited to 'client/parallel.cpp')
-rw-r--r-- | client/parallel.cpp | 369 |
1 files changed, 311 insertions, 58 deletions
diff --git a/client/parallel.cpp b/client/parallel.cpp index c4905e3..76b0168 100644 --- a/client/parallel.cpp +++ b/client/parallel.cpp @@ -63,7 +63,20 @@ namespace mongo { _init(); } - auto_ptr<DBClientCursor> ClusteredCursor::query( const string& server , int num , BSONObj extra , int skipLeft ) { + void ClusteredCursor::_checkCursor( DBClientCursor * cursor ) { + assert( cursor ); + + if ( cursor->hasResultFlag( ResultFlag_ShardConfigStale ) ) { + throw StaleConfigException( _ns , "ClusteredCursor::query" ); + } + + if ( cursor->hasResultFlag( ResultFlag_ErrSet ) ) { + BSONObj o = cursor->next(); + throw UserException( o["code"].numberInt() , o["$err"].String() ); + } + } + + auto_ptr<DBClientCursor> ClusteredCursor::query( const string& server , int num , BSONObj extra , int skipLeft , bool lazy ) { uassert( 10017 , "cursor already done" , ! _done ); assert( _didInit ); @@ -80,12 +93,10 @@ namespace mongo { throw StaleConfigException( _ns , "ClusteredCursor::query ShardConnection had to change" , true ); } - if ( logLevel >= 5 ) { - log(5) << "ClusteredCursor::query (" << type() << ") server:" << server - << " ns:" << _ns << " query:" << q << " num:" << num - << " _fields:" << _fields << " options: " << _options << endl; - } - + LOG(5) << "ClusteredCursor::query (" << type() << ") server:" << server + << " ns:" << _ns << " query:" << q << " num:" << num + << " _fields:" << _fields << " options: " << _options << endl; + auto_ptr<DBClientCursor> cursor = conn->query( _ns , q , num , 0 , ( _fields.isEmpty() ? 0 : &_fields ) , _options , _batchSize == 0 ? 0 : _batchSize + skipLeft ); @@ -97,21 +108,9 @@ namespace mongo { massert( 13633 , str::stream() << "error querying server: " << server , cursor.get() ); - if ( cursor->hasResultFlag( ResultFlag_ShardConfigStale ) ) { - conn.done(); - throw StaleConfigException( _ns , "ClusteredCursor::query" ); - } - - if ( cursor->hasResultFlag( ResultFlag_ErrSet ) ) { - conn.done(); - BSONObj o = cursor->next(); - throw UserException( o["code"].numberInt() , o["$err"].String() ); - } - - - cursor->attach( &conn ); - - conn.done(); + cursor->attach( &conn ); // this calls done on conn + assert( ! conn.ok() ); + _checkCursor( cursor.get() ); return cursor; } catch ( SocketException& e ) { @@ -228,6 +227,11 @@ namespace mongo { : _matcher( filter ) , _cursor( cursor ) , _done( cursor.get() == 0 ) { } + FilteringClientCursor::FilteringClientCursor( DBClientCursor* cursor , const BSONObj filter ) + : _matcher( filter ) , _cursor( cursor ) , _done( cursor == 0 ) { + } + + FilteringClientCursor::~FilteringClientCursor() { } @@ -237,6 +241,13 @@ namespace mongo { _done = _cursor.get() == 0; } + void FilteringClientCursor::reset( DBClientCursor* cursor ) { + _cursor.reset( cursor ); + _next = BSONObj(); + _done = cursor == 0; + } + + bool FilteringClientCursor::more() { if ( ! _next.isEmpty() ) return true; @@ -399,17 +410,245 @@ namespace mongo { } } + // TODO: Merge with futures API? We do a lot of error checking here that would be useful elsewhere. void ParallelSortClusteredCursor::_init() { + + // log() << "Starting parallel search..." << endl; + + // make sure we're not already initialized assert( ! _cursors ); _cursors = new FilteringClientCursor[_numServers]; - // TODO: parellize - int num = 0; - for ( set<ServerAndQuery>::iterator i = _servers.begin(); i!=_servers.end(); ++i ) { - const ServerAndQuery& sq = *i; - _cursors[num++].reset( query( sq._server , 0 , sq._extra , _needToSkip ) ); + bool returnPartial = ( _options & QueryOption_PartialResults ); + + vector<ServerAndQuery> queries( _servers.begin(), _servers.end() ); + set<int> retryQueries; + int finishedQueries = 0; + + vector< shared_ptr<ShardConnection> > conns; + vector<string> servers; + + // Since we may get all sorts of errors, record them all as they come and throw them later if necessary + vector<string> staleConfigExs; + vector<string> socketExs; + vector<string> otherExs; + bool allConfigStale = false; + + int retries = -1; + + // Loop through all the queries until we've finished or gotten a socket exception on all of them + // We break early for non-socket exceptions, and socket exceptions if we aren't returning partial results + do { + retries++; + + bool firstPass = retryQueries.size() == 0; + + if( ! firstPass ){ + log() << "retrying " << ( returnPartial ? "(partial) " : "" ) << "parallel connection to "; + for( set<int>::iterator it = retryQueries.begin(); it != retryQueries.end(); ++it ){ + log() << queries[*it]._server << ", "; + } + log() << finishedQueries << " finished queries." << endl; + } + + size_t num = 0; + for ( vector<ServerAndQuery>::iterator it = queries.begin(); it != queries.end(); ++it ) { + size_t i = num++; + + const ServerAndQuery& sq = *it; + + // If we're not retrying this cursor on later passes, continue + if( ! firstPass && retryQueries.find( i ) == retryQueries.end() ) continue; + + // log() << "Querying " << _query << " from " << _ns << " for " << sq._server << endl; + + BSONObj q = _query; + if ( ! sq._extra.isEmpty() ) { + q = concatQuery( q , sq._extra ); + } + + string errLoc = " @ " + sq._server; + + if( firstPass ){ + + // This may be the first time connecting to this shard, if so we can get an error here + try { + conns.push_back( shared_ptr<ShardConnection>( new ShardConnection( sq._server , _ns ) ) ); + } + catch( std::exception& e ){ + socketExs.push_back( e.what() + errLoc ); + if( ! returnPartial ){ + num--; + break; + } + conns.push_back( shared_ptr<ShardConnection>() ); + continue; + } + + servers.push_back( sq._server ); + } + + if ( conns[i]->setVersion() ) { + conns[i]->done(); + staleConfigExs.push_back( StaleConfigException( _ns , "ClusteredCursor::query ShardConnection had to change" , true ).what() + errLoc ); + break; + } + + LOG(5) << "ParallelSortClusteredCursor::init server:" << sq._server << " ns:" << _ns + << " query:" << q << " _fields:" << _fields << " options: " << _options << endl; + + if( ! _cursors[i].raw() ) + _cursors[i].reset( new DBClientCursor( conns[i]->get() , _ns , q , + 0 , // nToReturn + 0 , // nToSkip + _fields.isEmpty() ? 0 : &_fields , // fieldsToReturn + _options , + _batchSize == 0 ? 0 : _batchSize + _needToSkip // batchSize + ) ); + + try{ + _cursors[i].raw()->initLazy( ! firstPass ); + } + catch( SocketException& e ){ + socketExs.push_back( e.what() + errLoc ); + _cursors[i].reset( NULL ); + conns[i]->done(); + if( ! returnPartial ) break; + } + catch( std::exception& e){ + otherExs.push_back( e.what() + errLoc ); + _cursors[i].reset( NULL ); + conns[i]->done(); + break; + } + + } + + // Go through all the potentially started cursors and finish initializing them or log any errors and + // potentially retry + // TODO: Better error classification would make this easier, errors are indicated in all sorts of ways + // here that we need to trap. + for ( size_t i = 0; i < num; i++ ) { + + // log() << "Finishing query for " << cons[i].get()->getHost() << endl; + string errLoc = " @ " + queries[i]._server; + + if( ! _cursors[i].raw() || ( ! firstPass && retryQueries.find( i ) == retryQueries.end() ) ){ + if( conns[i] ) conns[i].get()->done(); + continue; + } + + assert( conns[i] ); + retryQueries.erase( i ); + + bool retry = false; + + try { + + if( ! _cursors[i].raw()->initLazyFinish( retry ) ) { + + warning() << "invalid result from " << conns[i]->getHost() << ( retry ? ", retrying" : "" ) << endl; + _cursors[i].reset( NULL ); + + if( ! retry ){ + socketExs.push_back( str::stream() << "error querying server: " << servers[i] ); + conns[i]->done(); + } + else { + retryQueries.insert( i ); + } + + continue; + } + } + catch ( MsgAssertionException& e ){ + socketExs.push_back( e.what() + errLoc ); + _cursors[i].reset( NULL ); + conns[i]->done(); + continue; + } + catch ( SocketException& e ) { + socketExs.push_back( e.what() + errLoc ); + _cursors[i].reset( NULL ); + conns[i]->done(); + continue; + } + catch( std::exception& e ){ + otherExs.push_back( e.what() + errLoc ); + _cursors[i].reset( NULL ); + conns[i]->done(); + continue; + } + + try { + _cursors[i].raw()->attach( conns[i].get() ); // this calls done on conn + _checkCursor( _cursors[i].raw() ); + + finishedQueries++; + } + catch ( StaleConfigException& e ){ + + // Our stored configuration data is actually stale, we need to reload it + // when we throw our exception + allConfigStale = true; + + staleConfigExs.push_back( e.what() + errLoc ); + _cursors[i].reset( NULL ); + conns[i]->done(); + continue; + } + catch( std::exception& e ){ + otherExs.push_back( e.what() + errLoc ); + _cursors[i].reset( NULL ); + conns[i]->done(); + continue; + } + } + + // Don't exceed our max retries, should not happen + assert( retries < 5 ); + } + while( retryQueries.size() > 0 /* something to retry */ && + ( socketExs.size() == 0 || returnPartial ) /* no conn issues */ && + staleConfigExs.size() == 0 /* no config issues */ && + otherExs.size() == 0 /* no other issues */); + + // Assert that our conns are all closed! + for( vector< shared_ptr<ShardConnection> >::iterator i = conns.begin(); i < conns.end(); ++i ){ + assert( ! (*i) || ! (*i)->ok() ); + } + + // Handle errors we got during initialization. + // If we're returning partial results, we can ignore socketExs, but nothing else + // Log a warning in any case, so we don't lose these messages + bool throwException = ( socketExs.size() > 0 && ! returnPartial ) || staleConfigExs.size() > 0 || otherExs.size() > 0; + + if( socketExs.size() > 0 || staleConfigExs.size() > 0 || otherExs.size() > 0 ) { + + vector<string> errMsgs; + + errMsgs.insert( errMsgs.end(), staleConfigExs.begin(), staleConfigExs.end() ); + errMsgs.insert( errMsgs.end(), otherExs.begin(), otherExs.end() ); + errMsgs.insert( errMsgs.end(), socketExs.begin(), socketExs.end() ); + + stringstream errMsg; + errMsg << "could not initialize cursor across all shards because : "; + for( vector<string>::iterator i = errMsgs.begin(); i != errMsgs.end(); i++ ){ + if( i != errMsgs.begin() ) errMsg << " :: and :: "; + errMsg << *i; + } + + if( throwException && staleConfigExs.size() > 0 ) + throw StaleConfigException( _ns , errMsg.str() , ! allConfigStale ); + else if( throwException ) + throw DBException( errMsg.str(), 14827 ); + else + warning() << errMsg.str() << endl; } + if( retries > 0 ) + log() << "successfully finished parallel query after " << retries << " retries" << endl; + } ParallelSortClusteredCursor::~ParallelSortClusteredCursor() { @@ -451,6 +690,7 @@ namespace mongo { if ( best.isEmpty() ) { best = me; bestFrom = i; + if( _sortKey.isEmpty() ) break; continue; } @@ -481,49 +721,62 @@ namespace mongo { // ---- Future ----- // ----------------- - Future::CommandResult::CommandResult( const string& server , const string& db , const BSONObj& cmd , DBClientBase * conn ) { - _server = server; - _db = db; - _cmd = cmd; - _conn = conn; - _done = false; - } + Future::CommandResult::CommandResult( const string& server , const string& db , const BSONObj& cmd , int options , DBClientBase * conn ) + :_server(server) ,_db(db) , _options(options), _cmd(cmd) ,_conn(conn) ,_done(false) + { + try { + if ( ! _conn ){ + _connHolder.reset( new ScopedDbConnection( _server ) ); + _conn = _connHolder->get(); + } - bool Future::CommandResult::join() { - _thr->join(); - assert( _done ); - return _ok; + if ( _conn->lazySupported() ) { + _cursor.reset( new DBClientCursor(_conn, _db + ".$cmd", _cmd, -1/*limit*/, 0, NULL, _options, 0)); + _cursor->initLazy(); + } + else { + _done = true; // we set _done first because even if there is an error we're done + _ok = _conn->runCommand( db , cmd , _res , options ); + } + } + catch ( std::exception& e ) { + error() << "Future::spawnComand (part 1) exception: " << e.what() << endl; + _ok = false; + _done = true; + } } - void Future::commandThread(shared_ptr<CommandResult> res) { - setThreadName( "future" ); + bool Future::CommandResult::join() { + if (_done) + return _ok; try { - DBClientBase * conn = res->_conn; - - scoped_ptr<ScopedDbConnection> myconn; - if ( ! conn ){ - myconn.reset( new ScopedDbConnection( res->_server ) ); - conn = myconn->get(); - } - - res->_ok = conn->runCommand( res->_db , res->_cmd , res->_res ); + // TODO: Allow retries? + bool retry = false; + bool finished = _cursor->initLazyFinish( retry ); + + // Shouldn't need to communicate with server any more + if ( _connHolder ) + _connHolder->done(); - if ( myconn ) - myconn->done(); + uassert(14812, str::stream() << "Error running command on server: " << _server, finished); + massert(14813, "Command returned nothing", _cursor->more()); + + _res = _cursor->nextSafe(); + _ok = _res["ok"].trueValue(); } catch ( std::exception& e ) { - error() << "Future::commandThread exception: " << e.what() << endl; - res->_ok = false; + error() << "Future::spawnComand (part 2) exception: " << e.what() << endl; + _ok = false; } - res->_done = true; - } - shared_ptr<Future::CommandResult> Future::spawnCommand( const string& server , const string& db , const BSONObj& cmd , DBClientBase * conn ) { - shared_ptr<Future::CommandResult> res (new Future::CommandResult( server , db , cmd , conn )); - res->_thr.reset( new boost::thread( boost::bind(Future::commandThread, res) ) ); + _done = true; + return _ok; + } + shared_ptr<Future::CommandResult> Future::spawnCommand( const string& server , const string& db , const BSONObj& cmd , int options , DBClientBase * conn ) { + shared_ptr<Future::CommandResult> res (new Future::CommandResult( server , db , cmd , options , conn )); return res; } |