summaryrefslogtreecommitdiff
path: root/client/parallel.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'client/parallel.cpp')
-rw-r--r--client/parallel.cpp316
1 files changed, 265 insertions, 51 deletions
diff --git a/client/parallel.cpp b/client/parallel.cpp
index bd29013..eeadb89 100644
--- a/client/parallel.cpp
+++ b/client/parallel.cpp
@@ -16,12 +16,13 @@
*/
-#include "stdafx.h"
+#include "pch.h"
#include "parallel.h"
#include "connpool.h"
#include "../db/queryutil.h"
#include "../db/dbmessage.h"
#include "../s/util.h"
+#include "../s/shard.h"
namespace mongo {
@@ -31,8 +32,13 @@ namespace mongo {
_ns = q.ns;
_query = q.query.copy();
_options = q.queryOptions;
- _fields = q.fields;
+ _fields = q.fields.copy();
+ _batchSize = q.ntoreturn;
+ if ( _batchSize == 1 )
+ _batchSize = 2;
+
_done = false;
+ _didInit = false;
}
ClusteredCursor::ClusteredCursor( const string& ns , const BSONObj& q , int options , const BSONObj& fields ){
@@ -40,37 +46,84 @@ namespace mongo {
_query = q.getOwned();
_options = options;
_fields = fields.getOwned();
+ _batchSize = 0;
+
_done = false;
+ _didInit = false;
}
ClusteredCursor::~ClusteredCursor(){
_done = true; // just in case
}
+
+    /**
+     * Performs the one-time setup of this cursor by delegating to _init().
+     * The _didInit flag is raised before _init() runs, so every later call
+     * (including one made after _init() threw) returns without doing anything.
+     */
+    void ClusteredCursor::init(){
+        if ( ! _didInit ){
+            _didInit = true;
+            _init();
+        }
+    }
- auto_ptr<DBClientCursor> ClusteredCursor::query( const string& server , int num , BSONObj extra ){
+ auto_ptr<DBClientCursor> ClusteredCursor::query( const string& server , int num , BSONObj extra , int skipLeft ){
uassert( 10017 , "cursor already done" , ! _done );
+ assert( _didInit );
BSONObj q = _query;
if ( ! extra.isEmpty() ){
q = concatQuery( q , extra );
}
- ScopedDbConnection conn( server );
- checkShardVersion( conn.conn() , _ns );
+ ShardConnection conn( server , _ns );
+
+ if ( conn.setVersion() ){
+ conn.done();
+ throw StaleConfigException( _ns , "ClusteredCursor::query ShardConnection had to change" , true );
+ }
+
+ if ( logLevel >= 5 ){
+ log(5) << "ClusteredCursor::query (" << type() << ") server:" << server
+ << " ns:" << _ns << " query:" << q << " num:" << num
+ << " _fields:" << _fields << " options: " << _options << endl;
+ }
+
+ auto_ptr<DBClientCursor> cursor =
+ conn->query( _ns , q , num , 0 , ( _fields.isEmpty() ? 0 : &_fields ) , _options , _batchSize == 0 ? 0 : _batchSize + skipLeft );
- log(5) << "ClusteredCursor::query server:" << server << " ns:" << _ns << " query:" << q << " num:" << num << " _fields:" << _fields << " options: " << _options << endl;
- auto_ptr<DBClientCursor> cursor = conn->query( _ns.c_str() , q , num , 0 , ( _fields.isEmpty() ? 0 : &_fields ) , _options );
- if ( cursor->hasResultFlag( QueryResult::ResultFlag_ShardConfigStale ) )
+ assert( cursor.get() );
+
+ if ( cursor->hasResultFlag( ResultFlag_ShardConfigStale ) ){
+ conn.done();
throw StaleConfigException( _ns , "ClusteredCursor::query" );
+ }
+
+ if ( cursor->hasResultFlag( ResultFlag_ErrSet ) ){
+ conn.done();
+ BSONObj o = cursor->next();
+ throw UserException( o["code"].numberInt() , o["$err"].String() );
+ }
+
+
+ cursor->attach( &conn );
conn.done();
return cursor;
}
+    /**
+     * Asks a single server for the explain output of this cursor's query.
+     * @param server  address handed to ShardConnection
+     * @param extra   additional filter; merged into _query through concatQuery() when non-empty
+     * @return the document findOne() returns for the explain form of the query
+     */
+    BSONObj ClusteredCursor::explain( const string& server , BSONObj extra ){
+        BSONObj q = extra.isEmpty() ? _query : concatQuery( _query , extra );
+
+        ShardConnection conn( server , _ns );
+        BSONObj plan = conn->findOne( _ns , Query( q ).explain() );
+        conn.done();
+        return plan;
+    }
+
BSONObj ClusteredCursor::concatQuery( const BSONObj& query , const BSONObj& extraFilter ){
if ( ! query.hasField( "query" ) )
return _concatFilter( query , extraFilter );
-
+
BSONObjBuilder b;
BSONObjIterator i( query );
while ( i.more() ){
@@ -94,6 +147,112 @@ namespace mongo {
// TODO: should do some simplification here if possibl ideally
}
+    /**
+     * Builds the aggregate explain document for the whole clustered query.
+     *
+     * The per-shard explain documents are gathered through _explain() and echoed
+     * under "shards" (one array per shard name). Their "nscanned",
+     * "nscannedObjects", "n" and "millis" fields are summed and reported as
+     * nscanned, nscannedObjects, n and millisTotal, together with millisAvg,
+     * numQueries (number of explain documents) and numShards (number of map keys).
+     *
+     * @return the assembled explain object
+     */
+    BSONObj ClusteredCursor::explain(){
+        BSONObjBuilder b;
+        b.append( "clusteredType" , type() );
+
+        long long nscanned = 0;
+        long long nscannedObjects = 0;
+        long long n = 0;
+        long long millis = 0;
+        int numExplains = 0;
+
+        typedef map<string,list<BSONObj> > ExplainMap;
+        ExplainMap out;
+        {
+            _explain( out );
+
+            BSONObjBuilder x( b.subobjStart( "shards" ) );
+            for ( ExplainMap::const_iterator i=out.begin(); i!=out.end(); ++i ){
+                // bind by reference: the shard name and its list are only read here
+                const string& shard = i->first;
+                const list<BSONObj>& l = i->second;
+                BSONArrayBuilder y( x.subarrayStart( shard.c_str() ) );
+                for ( list<BSONObj>::const_iterator j=l.begin(); j!=l.end(); ++j ){
+                    const BSONObj& temp = *j;
+                    y.append( temp );
+
+                    nscanned += temp["nscanned"].numberLong();
+                    nscannedObjects += temp["nscannedObjects"].numberLong();
+                    n += temp["n"].numberLong();
+                    millis += temp["millis"].numberLong();
+                    numExplains++;
+                }
+                y.done();
+            }
+            x.done();
+        }
+
+        // With no explain documents the average is reported as 0: dividing by
+        // zero would give NaN, and converting NaN to int is undefined behaviour.
+        const int millisAvg = numExplains > 0
+            ? static_cast<int>( static_cast<double>( millis ) / numExplains )
+            : 0;
+
+        b.appendNumber( "nscanned" , nscanned );
+        b.appendNumber( "nscannedObjects" , nscannedObjects );
+        b.appendNumber( "n" , n );
+        b.appendNumber( "millisTotal" , millis );
+        b.append( "millisAvg" , millisAvg );
+        b.append( "numQueries" , numExplains );
+        b.append( "numShards" , (int)out.size() );
+
+        return b.obj();
+    }
+
+ // -------- FilteringClientCursor -----------
+    /**
+     * Creates a filtering cursor that has no underlying DBClientCursor yet.
+     * It starts out done; reset() recomputes that state from the cursor it is given.
+     * @param filter  handed to the matcher that screens documents
+     */
+    FilteringClientCursor::FilteringClientCursor( const BSONObj filter )
+        : _matcher( filter ),
+          _done( true )
+    {
+    }
+
+    /**
+     * Wraps an existing cursor so that only documents accepted by the matcher
+     * are handed out.
+     * @param cursor  cursor to take ownership of; a null pointer leaves this object done
+     * @param filter  handed to the matcher that screens documents
+     */
+    FilteringClientCursor::FilteringClientCursor( auto_ptr<DBClientCursor> cursor , const BSONObj filter )
+        : _matcher( filter ) , _cursor( cursor ) , _done( true ){
+        // `_cursor( cursor )` moves ownership out of the auto_ptr parameter, and members
+        // are initialized in declaration order, so inspecting `cursor` from another
+        // initializer may see an already-emptied pointer. Derive the flag from the
+        // member once every member is constructed, the same way reset() does.
+        _done = _cursor.get() == 0;
+    }
+
+ FilteringClientCursor::~FilteringClientCursor(){ // empty body: no explicit cleanup is performed here
+ }
+
+    /**
+     * Replaces the wrapped cursor and drops any buffered document.
+     * The object is marked done exactly when the newly stored cursor pointer is null.
+     * @param cursor  cursor to take over (may be null)
+     */
+    void FilteringClientCursor::reset( auto_ptr<DBClientCursor> cursor ){
+        _cursor = cursor;
+        _next = BSONObj();
+        _done = ! _cursor.get();
+    }
+
+    /**
+     * Reports whether a matching document is available.
+     * A document already buffered in _next answers immediately; otherwise, unless
+     * the cursor is done, _advance() is asked to buffer the next match.
+     * @return true when _next holds a document after this call
+     */
+    bool FilteringClientCursor::more(){
+        if ( _next.isEmpty() ){
+            if ( _done )
+                return false;
+
+            _advance();
+        }
+        return ! _next.isEmpty();
+    }
+
+    /**
+     * Hands out the buffered document and immediately buffers the following match.
+     * Asserts that a document is buffered and that the cursor is not done, so
+     * callers should have seen more() return true first.
+     * @return the document that was buffered on entry
+     */
+    BSONObj FilteringClientCursor::next(){
+        assert( ! _next.isEmpty() );
+        assert( ! _done );
+
+        BSONObj current = _next;
+        // _advance() requires an empty buffer, so clear it before refilling
+        _next = BSONObj();
+        _advance();
+        return current;
+    }
+
+    /**
+     * Returns the next matching document without consuming it.
+     * When nothing is buffered, _advance() is called first; the result is an
+     * empty object if that leaves the buffer empty.
+     */
+    BSONObj FilteringClientCursor::peek(){
+        if ( ! _next.isEmpty() )
+            return _next;
+
+        _advance();
+        return _next;
+    }
+
+    /**
+     * Pulls documents from the wrapped cursor until one satisfies the matcher and
+     * leaves it in _next; marks the cursor done when the wrapped cursor runs out.
+     * Must be entered with an empty _next (asserted). Does nothing when there is
+     * no wrapped cursor or the cursor is already done.
+     */
+    void FilteringClientCursor::_advance(){
+        assert( _next.isEmpty() );
+        if ( _done || ! _cursor.get() )
+            return;
+
+        for (;;){
+            if ( ! _cursor->more() ){
+                _done = true;
+                return;
+            }
+
+            _next = _cursor->next();
+            if ( ! _matcher.matches( _next ) ){
+                // rejected by the filter: clear the buffer and keep scanning
+                _next = BSONObj();
+                continue;
+            }
+
+            // last document of the current batch: keep an owned copy
+            // (presumably so it stays valid once the cursor fetches more; confirm)
+            if ( ! _cursor->moreInCurrentBatch() )
+                _next = _next.getOwned();
+            return;
+        }
+    }
// -------- SerialServerClusteredCursor -----------
@@ -107,10 +266,21 @@ namespace mongo {
sort( _servers.rbegin() , _servers.rend() );
_serverIndex = 0;
+
+ _needToSkip = q.ntoskip;
}
bool SerialServerClusteredCursor::more(){
- if ( _current.get() && _current->more() )
+
+ // TODO: optimize this by sending on first query and then back counting
+ // tricky in case where 1st server doesn't have any after
+ // need it to send n skipped
+ while ( _needToSkip > 0 && _current.more() ){
+ _current.next();
+ _needToSkip--;
+ }
+
+ if ( _current.more() )
return true;
if ( _serverIndex >= _servers.size() ){
@@ -119,17 +289,21 @@ namespace mongo {
ServerAndQuery& sq = _servers[_serverIndex++];
- _current = query( sq._server , 0 , sq._extra );
- if ( _current->more() )
- return true;
-
- // this sq has nothing, so keep looking
+ _current.reset( query( sq._server , 0 , sq._extra ) );
return more();
}
BSONObj SerialServerClusteredCursor::next(){
uassert( 10018 , "no more items" , more() );
- return _current->next();
+ return _current.next();
+ }
+
+ void SerialServerClusteredCursor::_explain( map< string,list<BSONObj> >& out ){
+ for ( unsigned i=0; i<_servers.size(); i++ ){
+ ServerAndQuery& sq = _servers[i];
+ list<BSONObj> & l = out[sq._server];
+ l.push_back( explain( sq._server , sq._extra ) );
+ }
}
// -------- ParallelSortClusteredCursor -----------
@@ -138,7 +312,8 @@ namespace mongo {
const BSONObj& sortKey )
: ClusteredCursor( q ) , _servers( servers ){
_sortKey = sortKey.getOwned();
- _init();
+ _needToSkip = q.ntoskip;
+ _finishCons();
}
ParallelSortClusteredCursor::ParallelSortClusteredCursor( const set<ServerAndQuery>& servers , const string& ns ,
@@ -146,85 +321,123 @@ namespace mongo {
int options , const BSONObj& fields )
: ClusteredCursor( ns , q.obj , options , fields ) , _servers( servers ){
_sortKey = q.getSort().copy();
- _init();
+ _needToSkip = 0;
+ _finishCons();
}
- void ParallelSortClusteredCursor::_init(){
+ void ParallelSortClusteredCursor::_finishCons(){
_numServers = _servers.size();
- _cursors = new auto_ptr<DBClientCursor>[_numServers];
- _nexts = new BSONObj[_numServers];
+ _cursors = 0;
+
+ if ( ! _sortKey.isEmpty() && ! _fields.isEmpty() ){
+ // we need to make sure the sort key is in the project
+ bool isNegative = false;
+ BSONObjBuilder b;
+ {
+ BSONObjIterator i( _fields );
+ while ( i.more() ){
+ BSONElement e = i.next();
+ b.append( e );
+ if ( ! e.trueValue() )
+ isNegative = true;
+ }
+ }
+
+ {
+ BSONObjIterator i( _sortKey );
+ while ( i.more() ){
+ BSONElement e = i.next();
+ BSONElement f = _fields.getField( e.fieldName() );
+ if ( isNegative ){
+ uassert( 13431 , "have to have sort key in projection and removing it" , f.eoo() );
+ }
+ else if ( f.eoo() ){
+ // add to projection
+ b.append( e );
+ }
+ }
+ }
+
+ _fields = b.obj();
+ }
+ }
+
+ void ParallelSortClusteredCursor::_init(){
+ assert( ! _cursors );
+ _cursors = new FilteringClientCursor[_numServers];
// TODO: parellize
int num = 0;
- for ( set<ServerAndQuery>::iterator i = _servers.begin(); i!=_servers.end(); i++ ){
+ for ( set<ServerAndQuery>::iterator i = _servers.begin(); i!=_servers.end(); ++i ){
const ServerAndQuery& sq = *i;
- _cursors[num++] = query( sq._server , 0 , sq._extra );
+ _cursors[num++].reset( query( sq._server , 0 , sq._extra , _needToSkip ) );
}
}
ParallelSortClusteredCursor::~ParallelSortClusteredCursor(){
delete [] _cursors;
- delete [] _nexts;
+ _cursors = 0;
}
bool ParallelSortClusteredCursor::more(){
- for ( int i=0; i<_numServers; i++ ){
- if ( ! _nexts[i].isEmpty() )
- return true;
- if ( _cursors[i].get() && _cursors[i]->more() )
+ if ( _needToSkip > 0 ){
+ int n = _needToSkip;
+ _needToSkip = 0;
+
+ while ( n > 0 && more() ){
+ BSONObj x = next();
+ n--;
+ }
+
+ _needToSkip = n;
+ }
+
+ for ( int i=0; i<_numServers; i++ ){
+ if ( _cursors[i].more() )
return true;
}
return false;
}
BSONObj ParallelSortClusteredCursor::next(){
- advance();
-
BSONObj best = BSONObj();
int bestFrom = -1;
for ( int i=0; i<_numServers; i++){
- if ( _nexts[i].isEmpty() )
+ if ( ! _cursors[i].more() )
continue;
+
+ BSONObj me = _cursors[i].peek();
if ( best.isEmpty() ){
- best = _nexts[i];
+ best = me;
bestFrom = i;
continue;
}
- int comp = best.woSortOrder( _nexts[i] , _sortKey );
+ int comp = best.woSortOrder( me , _sortKey , true );
if ( comp < 0 )
continue;
- best = _nexts[i];
+ best = me;
bestFrom = i;
}
-
+
uassert( 10019 , "no more elements" , ! best.isEmpty() );
- _nexts[bestFrom] = BSONObj();
+ _cursors[bestFrom].next();
return best;
}
- void ParallelSortClusteredCursor::advance(){
- for ( int i=0; i<_numServers; i++ ){
-
- if ( ! _nexts[i].isEmpty() ){
- // already have a good object there
- continue;
- }
-
- if ( ! _cursors[i]->more() ){
- // cursor is dead, oh well
- continue;
- }
-
- _nexts[i] = _cursors[i]->next();
+ void ParallelSortClusteredCursor::_explain( map< string,list<BSONObj> >& out ){
+ for ( set<ServerAndQuery>::iterator i=_servers.begin(); i!=_servers.end(); ++i ){
+ const ServerAndQuery& sq = *i;
+ list<BSONObj> & l = out[sq._server];
+ l.push_back( explain( sq._server , sq._extra ) );
}
-
+
}
// -----------------
@@ -252,6 +465,7 @@ namespace mongo {
ScopedDbConnection conn( res->_server );
res->_ok = conn->runCommand( res->_db , res->_cmd , res->_res );
res->_done = true;
+ conn.done();
}
shared_ptr<Future::CommandResult> Future::spawnCommand( const string& server , const string& db , const BSONObj& cmd ){