summary refs log tree commit diff
path: root/db/commands/mr.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'db/commands/mr.cpp')
-rw-r--r-- db/commands/mr.cpp 465
1 files changed, 315 insertions, 150 deletions
diff --git a/db/commands/mr.cpp b/db/commands/mr.cpp
index b9f5b59..56e9770 100644
--- a/db/commands/mr.cpp
+++ b/db/commands/mr.cpp
@@ -26,7 +26,7 @@
#include "../queryoptimizer.h"
#include "../matcher.h"
#include "../clientcursor.h"
-#include "../replpair.h"
+#include "../replutil.h"
#include "../../s/d_chunk_manager.h"
#include "../../s/d_logic.h"
@@ -53,6 +53,9 @@ namespace mongo {
_func = _scope->createFunction( _code.c_str() );
uassert( 13598 , str::stream() << "couldn't compile code for: " << _type , _func );
+
+ // install in JS scope so that it can be called in JS mode
+ _scope->setFunction(_type.c_str(), _code.c_str());
}
void JSMapper::init( State * state ) {
@@ -66,8 +69,7 @@ namespace mongo {
void JSMapper::map( const BSONObj& o ) {
Scope * s = _func.scope();
assert( s );
- s->setThis( &o );
- if ( s->invoke( _func.func() , _params , 0 , true ) )
+ if ( s->invoke( _func.func() , &_params, &o , 0 , true, false, true ) )
throw UserException( 9014, str::stream() << "map invoke failed: " + s->getError() );
}
@@ -79,7 +81,7 @@ namespace mongo {
Scope * s = _func.scope();
Scope::NoDBAccess no = s->disableDBAccess( "can't access db inside finalize" );
- s->invokeSafe( _func.func() , o );
+ s->invokeSafe( _func.func() , &o, 0 );
// don't want to use o.objsize() to size b
// since there are many cases where the point of finalize
@@ -90,6 +92,10 @@ namespace mongo {
return b.obj();
}
+ void JSReducer::init( State * state ) {
+ _func.init( state );
+ }
+
/**
* Reduces a list of tuple objects (key, value) to a single tuple {"0": key, "1": value}
*/
@@ -183,7 +189,8 @@ namespace mongo {
Scope * s = _func.scope();
- s->invokeSafe( _func.func() , args );
+ s->invokeSafe( _func.func() , &args, 0 );
+ ++numReduces;
if ( s->type( "return" ) == Array ) {
uasserted( 10075 , "reduce -> multiple not supported yet");
@@ -214,6 +221,11 @@ namespace mongo {
ns = dbname + "." + cmdObj.firstElement().valuestr();
verbose = cmdObj["verbose"].trueValue();
+ jsMode = cmdObj["jsMode"].trueValue();
+
+ jsMaxKeys = 500000;
+ reduceTriggerRatio = 2.0;
+ maxInMemSize = 5 * 1024 * 1024;
uassert( 13602 , "outType is no longer a valid option" , cmdObj["outType"].eoo() );
@@ -255,7 +267,7 @@ namespace mongo {
}
if ( outType != INMEMORY ) { // setup names
- tempLong = str::stream() << (outDB.empty() ? dbname : outDB) << ".tmp.mr." << cmdObj.firstElement().String() << "_" << finalShort << "_" << JOB_NUMBER++;
+ tempLong = str::stream() << (outDB.empty() ? dbname : outDB) << ".tmp.mr." << cmdObj.firstElement().String() << "_" << JOB_NUMBER++;
incLong = tempLong + "_inc";
@@ -308,10 +320,25 @@ namespace mongo {
if ( ! _onDisk )
return;
- _db.dropCollection( _config.tempLong );
+ if (_config.incLong != _config.tempLong) {
+ // create the inc collection and make sure we have index on "0" key
+ _db.dropCollection( _config.incLong );
+ {
+ writelock l( _config.incLong );
+ Client::Context ctx( _config.incLong );
+ string err;
+ if ( ! userCreateNS( _config.incLong.c_str() , BSON( "autoIndexId" << 0 ) , err , false ) ) {
+ uasserted( 13631 , str::stream() << "userCreateNS failed for mr incLong ns: " << _config.incLong << " err: " << err );
+ }
+ }
+ BSONObj sortKey = BSON( "0" << 1 );
+ _db.ensureIndex( _config.incLong , sortKey );
+ }
+
+ // create temp collection
+ _db.dropCollection( _config.tempLong );
{
- // create
writelock lock( _config.tempLong.c_str() );
Client::Context ctx( _config.tempLong.c_str() );
string errmsg;
@@ -320,7 +347,6 @@ namespace mongo {
}
}
-
{
// copy indexes
auto_ptr<DBClientCursor> idx = _db.getIndexes( _config.finalLong );
@@ -355,6 +381,14 @@ namespace mongo {
if ( _onDisk )
return;
+ if (_jsMode) {
+ ScriptingFunction getResult = _scope->createFunction("var map = _mrMap; var result = []; for (key in map) { result.push({_id: key, value: map[key]}) } return result;");
+ _scope->invoke(getResult, 0, 0, 0, false);
+ BSONObj obj = _scope->getObject("return");
+ final.append("results", BSONArray(obj));
+ return;
+ }
+
uassert( 13604 , "too much data for in memory map/reduce" , _size < ( BSONObjMaxUserSize / 2 ) );
BSONArrayBuilder b( (int)(_size * 1.2) ); // _size is data size, doesn't count overhead and keys
@@ -397,8 +431,10 @@ namespace mongo {
// replace: just rename from temp to final collection name, dropping previous collection
_db.dropCollection( _config.finalLong );
BSONObj info;
- uassert( 10076 , "rename failed" ,
- _db.runCommand( "admin" , BSON( "renameCollection" << _config.tempLong << "to" << _config.finalLong ) , info ) );
+ if ( ! _db.runCommand( "admin" , BSON( "renameCollection" << _config.tempLong << "to" << _config.finalLong ) , info ) ) {
+ uasserted( 10076 , str::stream() << "rename failed: " << info );
+ }
+
_db.dropCollection( _config.tempLong );
}
else if ( _config.outType == Config::MERGE ) {
@@ -447,7 +483,7 @@ namespace mongo {
/**
* Insert doc in collection
*/
- void State::insert( const string& ns , BSONObj& o ) {
+ void State::insert( const string& ns , const BSONObj& o ) {
assert( _onDisk );
writelock l( ns );
@@ -457,6 +493,15 @@ namespace mongo {
}
/**
+ * Insert doc into the inc collection, taking proper lock
+ */
+ void State::insertToInc( BSONObj& o ) {
+ writelock l(_config.incLong);
+ Client::Context ctx(_config.incLong);
+ _insertToInc(o);
+ }
+
+ /**
* Insert doc into the inc collection
*/
void State::_insertToInc( BSONObj& o ) {
@@ -465,7 +510,7 @@ namespace mongo {
getDur().commitIfNeeded();
}
- State::State( const Config& c ) : _config( c ), _size(0), _numEmits(0) {
+ State::State( const Config& c ) : _config( c ), _size(0), _dupCount(0), _numEmits(0) {
_temp.reset( new InMemory() );
_onDisk = _config.outType != Config::INMEMORY;
}
@@ -488,6 +533,12 @@ namespace mongo {
error() << "couldn't cleanup after map reduce: " << e.what() << endl;
}
}
+
+ if (_scope) {
+ // cleanup js objects
+ ScriptingFunction cleanup = _scope->createFunction("delete _emitCt; delete _keyCt; delete _mrMap;");
+ _scope->invoke(cleanup, 0, 0, 0, true);
+ }
}
/**
@@ -505,29 +556,50 @@ namespace mongo {
_config.reducer->init( this );
if ( _config.finalizer )
_config.finalizer->init( this );
+ _scope->setBoolean("_doFinal", _config.finalizer);
+
+ // start in whichever mode the command's jsMode option selected; JS mode is faster for small jobs
+ _jsMode = _config.jsMode;
+// _jsMode = true;
+ switchMode(_jsMode);
+
+ // global JS map/reduce hashmap
+ // we use a standard JS object which means keys are only simple types
+ // we could also add a real hashmap from a library, still we need to add object comparison methods
+// _scope->setObject("_mrMap", BSONObj(), false);
+ ScriptingFunction init = _scope->createFunction("_emitCt = 0; _keyCt = 0; _dupCt = 0; _redCt = 0; if (typeof(_mrMap) === 'undefined') { _mrMap = {}; }");
+ _scope->invoke(init, 0, 0, 0, true);
+
+ // js function to run reduce on all keys
+// redfunc = _scope->createFunction("for (var key in hashmap) { print('Key is ' + key); list = hashmap[key]; ret = reduce(key, list); print('Value is ' + ret); };");
+ _reduceAll = _scope->createFunction("var map = _mrMap; var list, ret; for (var key in map) { list = map[key]; if (list.length != 1) { ret = _reduce(key, list); map[key] = [ret]; ++_redCt; } } _dupCt = 0;");
+ _reduceAndEmit = _scope->createFunction("var map = _mrMap; var list, ret; for (var key in map) { list = map[key]; if (list.length == 1) { ret = list[0]; } else { ret = _reduce(key, list); ++_redCt; } emit(key, ret); }; delete _mrMap;");
+ _reduceAndFinalize = _scope->createFunction("var map = _mrMap; var list, ret; for (var key in map) { list = map[key]; if (list.length == 1) { if (!_doFinal) {continue;} ret = list[0]; } else { ret = _reduce(key, list); ++_redCt; }; if (_doFinal){ ret = _finalize(ret); } map[key] = ret; }");
+ _reduceAndFinalizeAndInsert = _scope->createFunction("var map = _mrMap; var list, ret; for (var key in map) { list = map[key]; if (list.length == 1) { ret = list[0]; } else { ret = _reduce(key, list); ++_redCt; }; if (_doFinal){ ret = _finalize(ret); } _nativeToTemp({_id: key, value: ret}); }");
- _scope->injectNative( "emit" , fast_emit );
-
- if ( _onDisk ) {
- // clear temp collections
- _db.dropCollection( _config.tempLong );
- _db.dropCollection( _config.incLong );
-
- // create the inc collection and make sure we have index on "0" key
- {
- writelock l( _config.incLong );
- Client::Context ctx( _config.incLong );
- string err;
- if ( ! userCreateNS( _config.incLong.c_str() , BSON( "autoIndexId" << 0 ) , err , false ) ) {
- uasserted( 13631 , str::stream() << "userCreateNS failed for mr incLong ns: " << _config.incLong << " err: " << err );
- }
- }
-
- BSONObj sortKey = BSON( "0" << 1 );
- _db.ensureIndex( _config.incLong , sortKey );
+ }
+ void State::switchMode(bool jsMode) {
+ _jsMode = jsMode;
+ if (jsMode) {
+ // emit function that stays in JS
+ _scope->setFunction("emit", "function(key, value) { if (typeof(key) === 'object') { _bailFromJS(key, value); return; }; ++_emitCt; var map = _mrMap; var list = map[key]; if (!list) { ++_keyCt; list = []; map[key] = list; } else { ++_dupCt; } list.push(value); }");
+ _scope->injectNative("_bailFromJS", _bailFromJS, this);
+ } else {
+ // emit now populates C++ map
+ _scope->injectNative( "emit" , fast_emit, this );
}
+ }
+
+ void State::bailFromJS() {
+ log(1) << "M/R: Switching from JS mode to mixed mode" << endl;
+ // reduce and reemit into c++
+ switchMode(false);
+ _scope->invoke(_reduceAndEmit, 0, 0, 0, true);
+ // need to get the real number emitted so far
+ _numEmits = _scope->getNumberInt("_emitCt");
+ _config.reducer->numReduces = _scope->getNumberInt("_redCt");
}
/**
@@ -542,12 +614,40 @@ namespace mongo {
insert( _config.tempLong , res );
}
+ BSONObj _nativeToTemp( const BSONObj& args, void* data ) {
+ State* state = (State*) data;
+ BSONObjIterator it(args);
+ state->insert(state->_config.tempLong, it.next().Obj());
+ return BSONObj();
+ }
+
+// BSONObj _nativeToInc( const BSONObj& args, void* data ) {
+// State* state = (State*) data;
+// BSONObjIterator it(args);
+// const BSONObj& obj = it.next().Obj();
+// state->_insertToInc(const_cast<BSONObj&>(obj));
+// return BSONObj();
+// }
+
/**
* Applies last reduce and finalize.
* After calling this method, the temp collection will be completed.
* If inline, the results will be in the in memory map
*/
void State::finalReduce( CurOp * op , ProgressMeterHolder& pm ) {
+
+ if (_jsMode) {
+ // apply the reduce within JS
+ if (_onDisk) {
+ _scope->injectNative("_nativeToTemp", _nativeToTemp, this);
+ _scope->invoke(_reduceAndFinalizeAndInsert, 0, 0, 0, true);
+ return;
+ } else {
+ _scope->invoke(_reduceAndFinalize, 0, 0, 0, true);
+ return;
+ }
+ }
+
if ( ! _onDisk ) {
// all data has already been reduced, just finalize
if ( _config.finalizer ) {
@@ -619,8 +719,16 @@ namespace mongo {
}
ClientCursor::YieldLock yield (cursor.get());
- // reduce an finalize array
- finalReduce( all );
+
+ try {
+ // reduce and finalize array
+ finalReduce( all );
+ }
+ catch (...) {
+ yield.relock();
+ cursor.release();
+ throw;
+ }
all.clear();
prev = o;
@@ -656,9 +764,14 @@ namespace mongo {
*/
void State::reduceInMemory() {
+ if (_jsMode) {
+ // in js mode the reduce is applied when writing to collection
+ return;
+ }
+
auto_ptr<InMemory> n( new InMemory() ); // for new data
long nSize = 0;
- long dupCount = 0;
+ _dupCount = 0;
for ( InMemory::iterator i=_temp->begin(); i!=_temp->end(); ++i ) {
BSONObj key = i->first;
@@ -674,20 +787,19 @@ namespace mongo {
}
else {
// add to new map
- _add( n.get() , all[0] , nSize, dupCount );
+ _add( n.get() , all[0] , nSize );
}
}
else if ( all.size() > 1 ) {
// several values, reduce and add to map
BSONObj res = _config.reducer->reduce( all );
- _add( n.get() , res , nSize, dupCount );
+ _add( n.get() , res , nSize );
}
}
// swap maps
_temp.reset( n.release() );
_size = nSize;
- _dupCount = dupCount;
}
/**
@@ -718,57 +830,87 @@ namespace mongo {
*/
void State::emit( const BSONObj& a ) {
_numEmits++;
- _add( _temp.get() , a , _size, _dupCount );
+ _add( _temp.get() , a , _size );
}
- void State::_add( InMemory* im, const BSONObj& a , long& size, long& dupCount ) {
+ void State::_add( InMemory* im, const BSONObj& a , long& size ) {
BSONList& all = (*im)[a];
all.push_back( a );
size += a.objsize() + 16;
if (all.size() > 1)
- ++dupCount;
+ ++_dupCount;
}
/**
* this method checks the size of in memory map and potentially flushes to disk
*/
void State::checkSize() {
- if ( _size < 1024 * 50 )
+ if (_jsMode) {
+ // try to reduce if it is beneficial
+ int dupCt = _scope->getNumberInt("_dupCt");
+ int keyCt = _scope->getNumberInt("_keyCt");
+
+ if (keyCt > _config.jsMaxKeys) {
+ // too many keys for JS, switch to mixed
+ _bailFromJS(BSONObj(), this);
+ // then fall through to check map size
+ } else if (dupCt > (keyCt * _config.reduceTriggerRatio)) {
+ // reduce now to lower mem usage
+ _scope->invoke(_reduceAll, 0, 0, 0, true);
+ return;
+ }
+ }
+
+ if (_jsMode)
return;
+ bool dump = _onDisk && _size > _config.maxInMemSize;
// attempt to reduce in memory map, if we've seen duplicates
- if ( _dupCount > 0) {
+ if ( dump || _dupCount > (_temp->size() * _config.reduceTriggerRatio)) {
long before = _size;
reduceInMemory();
log(1) << " mr: did reduceInMemory " << before << " -->> " << _size << endl;
}
- if ( ! _onDisk || _size < 1024 * 100 )
- return;
-
- dumpToInc();
- log(1) << " mr: dumping to db" << endl;
+ // reevaluate size and potentially dump
+ if ( dump && _size > _config.maxInMemSize) {
+ dumpToInc();
+ log(1) << " mr: dumping to db" << endl;
+ }
}
- boost::thread_specific_ptr<State*> _tl;
-
/**
* emit that will be called by js function
*/
- BSONObj fast_emit( const BSONObj& args ) {
+ BSONObj fast_emit( const BSONObj& args, void* data ) {
uassert( 10077 , "fast_emit takes 2 args" , args.nFields() == 2 );
uassert( 13069 , "an emit can't be more than half max bson size" , args.objsize() < ( BSONObjMaxUserSize / 2 ) );
+ State* state = (State*) data;
if ( args.firstElement().type() == Undefined ) {
BSONObjBuilder b( args.objsize() );
b.appendNull( "" );
BSONObjIterator i( args );
i.next();
b.append( i.next() );
- (*_tl)->emit( b.obj() );
+ state->emit( b.obj() );
}
else {
- (*_tl)->emit( args );
+ state->emit( args );
+ }
+ return BSONObj();
+ }
+
+ /**
+ * called when we realize we can't use js mode for m/r: on the first emitted key that is an object, or when there are too many keys
+ */
+ BSONObj _bailFromJS( const BSONObj& args, void* data ) {
+ State* state = (State*) data;
+ state->bailFromJS();
+
+ // emit this particular key if there is one
+ if (!args.isEmpty()) {
+ fast_emit(args, data);
}
return BSONObj();
}
@@ -788,7 +930,7 @@ namespace mongo {
help << "http://www.mongodb.org/display/DOCS/MapReduce";
}
virtual LockType locktype() const { return NONE; }
- bool run(const string& dbname , BSONObj& cmd, string& errmsg, BSONObjBuilder& result, bool fromRepl ) {
+ bool run(const string& dbname , BSONObj& cmd, int, string& errmsg, BSONObjBuilder& result, bool fromRepl ) {
Timer t;
Client::GodScope cg;
Client& client = cc();
@@ -806,7 +948,6 @@ namespace mongo {
BSONObjBuilder countsBuilder;
BSONObjBuilder timingBuilder;
State state( config );
-
if ( ! state.sourceExists() ) {
errmsg = "ns doesn't exist";
return false;
@@ -823,12 +964,7 @@ namespace mongo {
try {
state.init();
-
- {
- State** s = new State*();
- s[0] = &state;
- _tl.reset( s );
- }
+ state.prepTempCollection();
wassert( config.limit < 0x4000000 ); // see case on next line to 32 bit unsigned
ProgressMeterHolder pm( op->setMessage( "m/r: (1/3) emit phase" , state.incomingDocuments() ) );
@@ -843,23 +979,26 @@ namespace mongo {
}
// obtain cursor on data to apply mr to, sorted
- shared_ptr<Cursor> temp = bestGuessCursor( config.ns.c_str(), config.filter, config.sort );
+ shared_ptr<Cursor> temp = NamespaceDetailsTransient::getCursor( config.ns.c_str(), config.filter, config.sort );
+ uassert( 15876, str::stream() << "could not create cursor over " << config.ns << " for query : " << config.filter << " sort : " << config.sort, temp.get() );
auto_ptr<ClientCursor> cursor( new ClientCursor( QueryOption_NoCursorTimeout , temp , config.ns.c_str() ) );
+ uassert( 15877, str::stream() << "could not create client cursor over " << config.ns << " for query : " << config.filter << " sort : " << config.sort, cursor.get() );
Timer mt;
// go through each doc
while ( cursor->ok() ) {
- // make sure we dont process duplicates in case data gets moved around during map
- if ( cursor->currentIsDup() ) {
+ if ( ! cursor->currentMatches() ) {
cursor->advance();
continue;
}
- if ( ! cursor->currentMatches() ) {
+ // make sure we don't process duplicates in case data gets moved around during map
+ // TODO This won't actually help when data gets moved, it's to handle multikeys.
+ if ( cursor->currentIsDup() ) {
cursor->advance();
continue;
}
-
+
BSONObj o = cursor->current();
cursor->advance();
@@ -874,7 +1013,7 @@ namespace mongo {
if ( config.verbose ) mapTime += mt.micros();
num++;
- if ( num % 100 == 0 ) {
+ if ( num % 1000 == 0 ) {
// try to yield lock regularly
ClientCursor::YieldLock yield (cursor.get());
Timer t;
@@ -908,19 +1047,31 @@ namespace mongo {
timingBuilder.append( "emitLoop" , t.millis() );
op->setMessage( "m/r: (2/3) final reduce in memory" );
+ Timer t;
// do reduce in memory
// this will be the last reduce needed for inline mode
state.reduceInMemory();
// if not inline: dump the in memory map to inc collection, all data is on disk
state.dumpToInc();
- state.prepTempCollection();
// final reduce
state.finalReduce( op , pm );
-
- _tl.reset();
+ inReduce += t.micros();
+ countsBuilder.appendNumber( "reduce" , state.numReduces() );
+ timingBuilder.append( "reduceTime" , inReduce / 1000 );
+ timingBuilder.append( "mode" , state.jsMode() ? "js" : "mixed" );
+ }
+ // TODO: The error handling code for queries is very fragile,
+ // *requires* rethrowing AssertionExceptions - should probably fix.
+ catch ( AssertionException& e ){
+ log() << "mr failed, removing collection" << causedBy(e) << endl;
+ throw e;
+ }
+ catch ( std::exception& e ){
+ log() << "mr failed, removing collection" << causedBy(e) << endl;
+ throw e;
}
catch ( ... ) {
- log() << "mr failed, removing collection" << endl;
+ log() << "mr failed for unknown reason, removing collection" << endl;
throw;
}
@@ -967,113 +1118,127 @@ namespace mongo {
virtual bool slaveOverrideOk() { return true; }
virtual LockType locktype() const { return NONE; }
- bool run(const string& dbname , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool) {
+ bool run(const string& dbname , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool) {
string shardedOutputCollection = cmdObj["shardedOutputCollection"].valuestrsafe();
+ string postProcessCollection = cmdObj["postProcessCollection"].valuestrsafe();
+ bool postProcessOnly = !(postProcessCollection.empty());
Config config( dbname , cmdObj.firstElement().embeddedObjectUserCheck() );
+ State state(config);
+ state.init();
+ if (postProcessOnly) {
+ // the temp collection has been decided by mongos
+ config.tempLong = dbname + "." + postProcessCollection;
+ }
+ // no need for incremental collection because records are already sorted
config.incLong = config.tempLong;
- set<ServerAndQuery> servers;
-
- BSONObjBuilder shardCounts;
- map<string,long long> counts;
-
BSONObj shards = cmdObj["shards"].embeddedObjectUserCheck();
- vector< auto_ptr<DBClientCursor> > shardCursors;
-
- {
- // parse per shard results
- BSONObjIterator i( shards );
- while ( i.more() ) {
- BSONElement e = i.next();
- string shard = e.fieldName();
-
- BSONObj res = e.embeddedObjectUserCheck();
-
- uassert( 10078 , "something bad happened" , shardedOutputCollection == res["result"].valuestrsafe() );
- servers.insert( shard );
- shardCounts.appendAs( res["counts"] , shard );
-
- BSONObjIterator j( res["counts"].embeddedObjectUserCheck() );
- while ( j.more() ) {
- BSONElement temp = j.next();
- counts[temp.fieldName()] += temp.numberLong();
- }
+ BSONObj shardCounts = cmdObj["shardCounts"].embeddedObjectUserCheck();
+ BSONObj counts = cmdObj["counts"].embeddedObjectUserCheck();
+ if (postProcessOnly) {
+ if (!state._db.exists(config.tempLong)) {
+ // nothing to do
+ return 1;
}
+ } else {
+ set<ServerAndQuery> servers;
+ vector< auto_ptr<DBClientCursor> > shardCursors;
- }
-
- State state(config);
- state.prepTempCollection();
+ {
+ // parse per shard results
+ BSONObjIterator i( shards );
+ while ( i.more() ) {
+ BSONElement e = i.next();
+ string shard = e.fieldName();
- {
- // reduce from each stream
+ BSONObj res = e.embeddedObjectUserCheck();
- BSONObj sortKey = BSON( "_id" << 1 );
+ uassert( 10078 , "something bad happened" , shardedOutputCollection == res["result"].valuestrsafe() );
+ servers.insert( shard );
- ParallelSortClusteredCursor cursor( servers , dbname + "." + shardedOutputCollection ,
- Query().sort( sortKey ) );
- cursor.init();
- state.init();
+ }
- BSONList values;
- if (!config.outDB.empty()) {
- BSONObjBuilder loc;
- if ( !config.outDB.empty())
- loc.append( "db" , config.outDB );
- if ( !config.finalShort.empty() )
- loc.append( "collection" , config.finalShort );
- result.append("result", loc.obj());
- }
- else {
- if ( !config.finalShort.empty() )
- result.append( "result" , config.finalShort );
}
- while ( cursor.more() ) {
- BSONObj t = cursor.next().getOwned();
+ state.prepTempCollection();
- if ( values.size() == 0 ) {
- values.push_back( t );
- continue;
+ {
+ // reduce from each stream
+
+ BSONObj sortKey = BSON( "_id" << 1 );
+
+ ParallelSortClusteredCursor cursor( servers , dbname + "." + shardedOutputCollection ,
+ Query().sort( sortKey ) );
+ cursor.init();
+
+ BSONList values;
+ if (!config.outDB.empty()) {
+ BSONObjBuilder loc;
+ if ( !config.outDB.empty())
+ loc.append( "db" , config.outDB );
+ if ( !config.finalShort.empty() )
+ loc.append( "collection" , config.finalShort );
+ result.append("result", loc.obj());
}
-
- if ( t.woSortOrder( *(values.begin()) , sortKey ) == 0 ) {
- values.push_back( t );
- continue;
+ else {
+ if ( !config.finalShort.empty() )
+ result.append( "result" , config.finalShort );
}
+ while ( cursor.more() || !values.empty() ) {
+ BSONObj t;
+ if (cursor.more()) {
+ t = cursor.next().getOwned();
- state.emit( config.reducer->finalReduce( values , config.finalizer.get() ) );
- values.clear();
- values.push_back( t );
- }
+ if ( values.size() == 0 ) {
+ values.push_back( t );
+ continue;
+ }
- if ( values.size() )
- state.emit( config.reducer->finalReduce( values , config.finalizer.get() ) );
- }
+ if ( t.woSortOrder( *(values.begin()) , sortKey ) == 0 ) {
+ values.push_back( t );
+ continue;
+ }
+ }
+ BSONObj res = config.reducer->finalReduce( values , config.finalizer.get());
+ if (state.isOnDisk())
+ state.insertToInc(res);
+ else
+ state.emit(res);
+ values.clear();
+ if (!t.isEmpty())
+ values.push_back( t );
+ }
+ }
- state.dumpToInc();
- state.postProcessCollection();
- state.appendResults( result );
+ for ( set<ServerAndQuery>::iterator i=servers.begin(); i!=servers.end(); i++ ) {
+ ScopedDbConnection conn( i->_server );
+ conn->dropCollection( dbname + "." + shardedOutputCollection );
+ conn.done();
+ }
- for ( set<ServerAndQuery>::iterator i=servers.begin(); i!=servers.end(); i++ ) {
- ScopedDbConnection conn( i->_server );
- conn->dropCollection( dbname + "." + shardedOutputCollection );
- conn.done();
+ result.append( "shardCounts" , shardCounts );
}
- result.append( "shardCounts" , shardCounts.obj() );
+ long long finalCount = state.postProcessCollection();
+ state.appendResults( result );
- {
- BSONObjBuilder c;
- for ( map<string,long long>::iterator i=counts.begin(); i!=counts.end(); i++ ) {
- c.append( i->first , i->second );
- }
- result.append( "counts" , c.obj() );
+ // fix the global counts
+ BSONObjBuilder countsB(32);
+ BSONObjIterator j(counts);
+ while (j.more()) {
+ BSONElement elmt = j.next();
+ if (!strcmp(elmt.fieldName(), "reduce"))
+ countsB.append("reduce", elmt.numberLong() + state.numReduces());
+ else if (!strcmp(elmt.fieldName(), "output"))
+ countsB.append("output", finalCount);
+ else
+ countsB.append(elmt);
}
+ result.append( "counts" , countsB.obj() );
return 1;
}