Diffstat (limited to 'client/parallel.cpp')
-rw-r--r--   client/parallel.cpp   316
1 files changed, 265 insertions, 51 deletions
diff --git a/client/parallel.cpp b/client/parallel.cpp
index bd29013..eeadb89 100644
--- a/client/parallel.cpp
+++ b/client/parallel.cpp
@@ -16,12 +16,13 @@
  */
 
-#include "stdafx.h"
+#include "pch.h"
 #include "parallel.h"
 #include "connpool.h"
 #include "../db/queryutil.h"
 #include "../db/dbmessage.h"
 #include "../s/util.h"
+#include "../s/shard.h"
 
 namespace mongo {
@@ -31,8 +32,13 @@ namespace mongo {
         _ns = q.ns;
         _query = q.query.copy();
         _options = q.queryOptions;
-        _fields = q.fields;
+        _fields = q.fields.copy();
+        _batchSize = q.ntoreturn;
+        if ( _batchSize == 1 )
+            _batchSize = 2;
+        _done = false;
+        _didInit = false;
     }
 
     ClusteredCursor::ClusteredCursor( const string& ns , const BSONObj& q , int options , const BSONObj& fields ){
@@ -40,37 +46,84 @@ namespace mongo {
         _query = q.getOwned();
         _options = options;
         _fields = fields.getOwned();
+        _batchSize = 0;
+        _done = false;
+        _didInit = false;
     }
 
     ClusteredCursor::~ClusteredCursor(){
         _done = true; // just in case
     }
+
+    void ClusteredCursor::init(){
+        if ( _didInit )
+            return;
+        _didInit = true;
+        _init();
+    }
 
-    auto_ptr<DBClientCursor> ClusteredCursor::query( const string& server , int num , BSONObj extra ){
+    auto_ptr<DBClientCursor> ClusteredCursor::query( const string& server , int num , BSONObj extra , int skipLeft ){
         uassert( 10017 , "cursor already done" , ! _done );
+        assert( _didInit );
 
         BSONObj q = _query;
         if ( ! extra.isEmpty() ){
             q = concatQuery( q , extra );
         }
 
-        ScopedDbConnection conn( server );
-        checkShardVersion( conn.conn() , _ns );
+        ShardConnection conn( server , _ns );
+
+        if ( conn.setVersion() ){
+            conn.done();
+            throw StaleConfigException( _ns , "ClusteredCursor::query ShardConnection had to change" , true );
+        }
+
+        if ( logLevel >= 5 ){
+            log(5) << "ClusteredCursor::query (" << type() << ") server:" << server
+                   << " ns:" << _ns << " query:" << q << " num:" << num
+                   << " _fields:" << _fields << " options: " << _options << endl;
+        }
+
+        auto_ptr<DBClientCursor> cursor =
+            conn->query( _ns , q , num , 0 , ( _fields.isEmpty() ? 0 : &_fields ) , _options , _batchSize == 0 ? 0 : _batchSize + skipLeft );
 
-        log(5) << "ClusteredCursor::query server:" << server << " ns:" << _ns << " query:" << q << " num:" << num << " _fields:" << _fields << " options: " << _options << endl;
-        auto_ptr<DBClientCursor> cursor = conn->query( _ns.c_str() , q , num , 0 , ( _fields.isEmpty() ? 0 : &_fields ) , _options );
-        if ( cursor->hasResultFlag( QueryResult::ResultFlag_ShardConfigStale ) )
+        assert( cursor.get() );
+
+        if ( cursor->hasResultFlag( ResultFlag_ShardConfigStale ) ){
+            conn.done();
             throw StaleConfigException( _ns , "ClusteredCursor::query" );
+        }
+
+        if ( cursor->hasResultFlag( ResultFlag_ErrSet ) ){
+            conn.done();
+            BSONObj o = cursor->next();
+            throw UserException( o["code"].numberInt() , o["$err"].String() );
+        }
+
+        cursor->attach( &conn );
 
         conn.done();
         return cursor;
     }
 
+    BSONObj ClusteredCursor::explain( const string& server , BSONObj extra ){
+        BSONObj q = _query;
+        if ( ! extra.isEmpty() ){
+            q = concatQuery( q , extra );
+        }
+
+        ShardConnection conn( server , _ns );
+        BSONObj o = conn->findOne( _ns , Query( q ).explain() );
+        conn.done();
+        return o;
+    }
+
     BSONObj ClusteredCursor::concatQuery( const BSONObj& query , const BSONObj& extraFilter ){
         if ( ! query.hasField( "query" ) )
             return _concatFilter( query , extraFilter );
-        
+
         BSONObjBuilder b;
         BSONObjIterator i( query );
         while ( i.more() ){
@@ -94,6 +147,112 @@ namespace mongo {
         // TODO: should do some simplification here if possibl ideally
     }
 
+    BSONObj ClusteredCursor::explain(){
+        BSONObjBuilder b;
+        b.append( "clusteredType" , type() );
+
+        long long nscanned = 0;
+        long long nscannedObjects = 0;
+        long long n = 0;
+        long long millis = 0;
+        double numExplains = 0;
+
+        map<string,list<BSONObj> > out;
+        {
+            _explain( out );
+
+            BSONObjBuilder x( b.subobjStart( "shards" ) );
+            for ( map<string,list<BSONObj> >::iterator i=out.begin(); i!=out.end(); ++i ){
+                string shard = i->first;
+                list<BSONObj> l = i->second;
+                BSONArrayBuilder y( x.subarrayStart( shard.c_str() ) );
+                for ( list<BSONObj>::iterator j=l.begin(); j!=l.end(); ++j ){
+                    BSONObj temp = *j;
+                    y.append( temp );
+
+                    nscanned += temp["nscanned"].numberLong();
+                    nscannedObjects += temp["nscannedObjects"].numberLong();
+                    n += temp["n"].numberLong();
+                    millis += temp["millis"].numberLong();
+                    numExplains++;
+                }
+                y.done();
+            }
+            x.done();
+        }
+
+        b.appendNumber( "nscanned" , nscanned );
+        b.appendNumber( "nscannedObjects" , nscannedObjects );
+        b.appendNumber( "n" , n );
+        b.appendNumber( "millisTotal" , millis );
+        b.append( "millisAvg" , (int)((double)millis / numExplains ) );
+        b.append( "numQueries" , (int)numExplains );
+        b.append( "numShards" , (int)out.size() );
+
+        return b.obj();
+    }
+
+    // --------  FilteringClientCursor  -----------
+    FilteringClientCursor::FilteringClientCursor( const BSONObj filter )
+        : _matcher( filter ) , _done( true ){
+    }
+
+    FilteringClientCursor::FilteringClientCursor( auto_ptr<DBClientCursor> cursor , const BSONObj filter )
+        : _matcher( filter ) , _cursor( cursor ) , _done( cursor.get() == 0 ){
+    }
+
+    FilteringClientCursor::~FilteringClientCursor(){
+    }
+
+    void FilteringClientCursor::reset( auto_ptr<DBClientCursor> cursor ){
+        _cursor = cursor;
+        _next = BSONObj();
+        _done = _cursor.get() == 0;
+    }
+
+    bool FilteringClientCursor::more(){
+        if ( ! _next.isEmpty() )
+            return true;
+
+        if ( _done )
+            return false;
+
+        _advance();
+        return ! _next.isEmpty();
+    }
+
+    BSONObj FilteringClientCursor::next(){
+        assert( ! _next.isEmpty() );
+        assert( ! _done );
+
+        BSONObj ret = _next;
+        _next = BSONObj();
+        _advance();
+        return ret;
+    }
+
+    BSONObj FilteringClientCursor::peek(){
+        if ( _next.isEmpty() )
+            _advance();
+        return _next;
+    }
+
+    void FilteringClientCursor::_advance(){
+        assert( _next.isEmpty() );
+        if ( ! _cursor.get() || _done )
+            return;
+
+        while ( _cursor->more() ){
+            _next = _cursor->next();
+            if ( _matcher.matches( _next ) ){
+                if ( ! _cursor->moreInCurrentBatch() )
+                    _next = _next.getOwned();
+                return;
+            }
+            _next = BSONObj();
+        }
+        _done = true;
+    }
 
     // --------  SerialServerClusteredCursor -----------
@@ -107,10 +266,21 @@ namespace mongo {
         sort( _servers.rbegin() , _servers.rend() );
         _serverIndex = 0;
+
+        _needToSkip = q.ntoskip;
     }
 
     bool SerialServerClusteredCursor::more(){
-        if ( _current.get() && _current->more() )
+
+        // TODO: optimize this by sending on first query and then back counting
+        // tricky in case where 1st server doesn't have any after
+        // need it to send n skipped
+        while ( _needToSkip > 0 && _current.more() ){
+            _current.next();
+            _needToSkip--;
+        }
+
+        if ( _current.more() )
             return true;
 
         if ( _serverIndex >= _servers.size() ){
@@ -119,17 +289,21 @@
         ServerAndQuery& sq = _servers[_serverIndex++];
 
-        _current = query( sq._server , 0 , sq._extra );
-        if ( _current->more() )
-            return true;
-
-        // this sq has nothing, so keep looking
+        _current.reset( query( sq._server , 0 , sq._extra ) );
         return more();
     }
 
     BSONObj SerialServerClusteredCursor::next(){
         uassert( 10018 , "no more items" , more() );
-        return _current->next();
+        return _current.next();
+    }
+
+    void SerialServerClusteredCursor::_explain( map< string,list<BSONObj> >& out ){
+        for ( unsigned i=0; i<_servers.size(); i++ ){
+            ServerAndQuery& sq = _servers[i];
+            list<BSONObj> & l = out[sq._server];
+            l.push_back( explain( sq._server , sq._extra ) );
+        }
     }
 
     // --------  ParallelSortClusteredCursor -----------
@@ -138,7 +312,8 @@ namespace mongo {
                                                               const BSONObj& sortKey )
         : ClusteredCursor( q ) , _servers( servers ){
         _sortKey = sortKey.getOwned();
-        _init();
+        _needToSkip = q.ntoskip;
+        _finishCons();
     }
 
     ParallelSortClusteredCursor::ParallelSortClusteredCursor( const set<ServerAndQuery>& servers , const string& ns ,
@@ -146,85 +321,123 @@
                                                               int options , const BSONObj& fields )
         : ClusteredCursor( ns , q.obj , options , fields ) , _servers( servers ){
         _sortKey = q.getSort().copy();
-        _init();
+        _needToSkip = 0;
+        _finishCons();
     }
 
-    void ParallelSortClusteredCursor::_init(){
+    void ParallelSortClusteredCursor::_finishCons(){
         _numServers = _servers.size();
-        _cursors = new auto_ptr<DBClientCursor>[_numServers];
-        _nexts = new BSONObj[_numServers];
+        _cursors = 0;
+
+        if ( ! _sortKey.isEmpty() && ! _fields.isEmpty() ){
+            // we need to make sure the sort key is in the project
+            bool isNegative = false;
+            BSONObjBuilder b;
+            {
+                BSONObjIterator i( _fields );
+                while ( i.more() ){
+                    BSONElement e = i.next();
+                    b.append( e );
+                    if ( ! e.trueValue() )
+                        isNegative = true;
+                }
+            }
+
+            {
+                BSONObjIterator i( _sortKey );
+                while ( i.more() ){
+                    BSONElement e = i.next();
+                    BSONElement f = _fields.getField( e.fieldName() );
+                    if ( isNegative ){
+                        uassert( 13431 , "have to have sort key in projection and removing it" , f.eoo() );
+                    }
+                    else if ( f.eoo() ){
+                        // add to projection
+                        b.append( e );
+                    }
+                }
+            }
+
+            _fields = b.obj();
+        }
+    }
+
+    void ParallelSortClusteredCursor::_init(){
+        assert( ! _cursors );
+        _cursors = new FilteringClientCursor[_numServers];
 
         // TODO: parellize
         int num = 0;
-        for ( set<ServerAndQuery>::iterator i = _servers.begin(); i!=_servers.end(); i++ ){
+        for ( set<ServerAndQuery>::iterator i = _servers.begin(); i!=_servers.end(); ++i ){
             const ServerAndQuery& sq = *i;
-            _cursors[num++] = query( sq._server , 0 , sq._extra );
+            _cursors[num++].reset( query( sq._server , 0 , sq._extra , _needToSkip ) );
         }
     }
 
     ParallelSortClusteredCursor::~ParallelSortClusteredCursor(){
         delete [] _cursors;
-        delete [] _nexts;
+        _cursors = 0;
     }
 
     bool ParallelSortClusteredCursor::more(){
-        for ( int i=0; i<_numServers; i++ ){
-            if ( ! _nexts[i].isEmpty() )
-                return true;
-
-            if ( _cursors[i].get() && _cursors[i]->more() )
+        if ( _needToSkip > 0 ){
+            int n = _needToSkip;
+            _needToSkip = 0;
+
+            while ( n > 0 && more() ){
+                BSONObj x = next();
+                n--;
+            }
+
+            _needToSkip = n;
+        }
+
+        for ( int i=0; i<_numServers; i++ ){
+            if ( _cursors[i].more() )
                 return true;
         }
         return false;
     }
 
     BSONObj ParallelSortClusteredCursor::next(){
-        advance();
-
         BSONObj best = BSONObj();
         int bestFrom = -1;
 
         for ( int i=0; i<_numServers; i++){
-            if ( _nexts[i].isEmpty() )
+            if ( ! _cursors[i].more() )
                 continue;
+
+            BSONObj me = _cursors[i].peek();
 
             if ( best.isEmpty() ){
-                best = _nexts[i];
+                best = me;
                 bestFrom = i;
                 continue;
             }
 
-            int comp = best.woSortOrder( _nexts[i] , _sortKey );
+            int comp = best.woSortOrder( me , _sortKey , true );
             if ( comp < 0 )
                 continue;
 
-            best = _nexts[i];
+            best = me;
             bestFrom = i;
         }
-
+
         uassert( 10019 , "no more elements" , ! best.isEmpty() );
-        _nexts[bestFrom] = BSONObj();
+        _cursors[bestFrom].next();
 
         return best;
     }
 
-    void ParallelSortClusteredCursor::advance(){
-        for ( int i=0; i<_numServers; i++ ){
-
-            if ( ! _nexts[i].isEmpty() ){
-                // already have a good object there
-                continue;
-            }
-
-            if ( ! _cursors[i]->more() ){
-                // cursor is dead, oh well
-                continue;
-            }
-
-            _nexts[i] = _cursors[i]->next();
+    void ParallelSortClusteredCursor::_explain( map< string,list<BSONObj> >& out ){
+        for ( set<ServerAndQuery>::iterator i=_servers.begin(); i!=_servers.end(); ++i ){
+            const ServerAndQuery& sq = *i;
+            list<BSONObj> & l = out[sq._server];
+            l.push_back( explain( sq._server , sq._extra ) );
         }
-
+    }
 
     // -----------------
@@ -252,6 +465,7 @@ namespace mongo {
         ScopedDbConnection conn( res->_server );
        res->_ok = conn->runCommand( res->_db , res->_cmd , res->_res );
         res->_done = true;
+        conn.done();
     }
 
     shared_ptr<Future::CommandResult> Future::spawnCommand( const string& server , const string& db , const BSONObj& cmd ){
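
The heart of the new ParallelSortClusteredCursor::next() above is a k-way merge: it peeks at the next document from each shard's cursor, keeps the one that sorts first under _sortKey, and advances only the winning cursor. A minimal standalone sketch of that same pattern, simplified to integer keys (FakeShardCursor and mergedNext are illustrative stand-ins, not types from this patch):

#include <iostream>
#include <stdexcept>
#include <vector>

// Each "shard cursor" is modeled as a position in a vector that is already
// sorted by the sort key, standing in for a per-shard DBClientCursor.
struct FakeShardCursor {
    std::vector<int> docs;
    size_t pos;

    explicit FakeShardCursor( std::vector<int> d ) : docs( d ), pos( 0 ) {}

    bool more() const { return pos < docs.size(); }
    int peek() const { return docs[pos]; }   // look at the next doc without consuming it
    int next()       { return docs[pos++]; } // consume the next doc
};

// Mirror of the peek/compare/advance loop in ParallelSortClusteredCursor::next():
// scan every cursor, remember which one currently holds the smallest document,
// then advance only that cursor.
int mergedNext( std::vector<FakeShardCursor>& shards ){
    int bestFrom = -1;
    for ( size_t i = 0; i < shards.size(); i++ ){
        if ( ! shards[i].more() )
            continue;
        if ( bestFrom == -1 || shards[i].peek() < shards[bestFrom].peek() )
            bestFrom = (int)i;
    }
    if ( bestFrom == -1 )
        throw std::runtime_error( "no more elements" );
    return shards[bestFrom].next();
}

int main(){
    std::vector<FakeShardCursor> shards;
    shards.push_back( FakeShardCursor( std::vector<int>{ 1, 4, 9 } ) );
    shards.push_back( FakeShardCursor( std::vector<int>{ 2, 3, 10 } ) );
    shards.push_back( FakeShardCursor( std::vector<int>{ 5 } ) );

    // Prints: 1 2 3 4 5 9 10 -- one globally sorted stream from three shards.
    for ( int i = 0; i < 7; i++ )
        std::cout << mergedNext( shards ) << " ";
    std::cout << std::endl;
    return 0;
}

The real code compares documents with BSONObj::woSortOrder under an arbitrary sort spec, and relies on FilteringClientCursor::peek()/next() so a shard's document is not consumed until it actually wins the comparison.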