diff options
Diffstat (limited to 'db/commands/mr.cpp')
-rw-r--r-- | db/commands/mr.cpp | 465 |
1 files changed, 315 insertions, 150 deletions
diff --git a/db/commands/mr.cpp b/db/commands/mr.cpp index b9f5b59..56e9770 100644 --- a/db/commands/mr.cpp +++ b/db/commands/mr.cpp @@ -26,7 +26,7 @@ #include "../queryoptimizer.h" #include "../matcher.h" #include "../clientcursor.h" -#include "../replpair.h" +#include "../replutil.h" #include "../../s/d_chunk_manager.h" #include "../../s/d_logic.h" @@ -53,6 +53,9 @@ namespace mongo { _func = _scope->createFunction( _code.c_str() ); uassert( 13598 , str::stream() << "couldn't compile code for: " << _type , _func ); + + // install in JS scope so that it can be called in JS mode + _scope->setFunction(_type.c_str(), _code.c_str()); } void JSMapper::init( State * state ) { @@ -66,8 +69,7 @@ namespace mongo { void JSMapper::map( const BSONObj& o ) { Scope * s = _func.scope(); assert( s ); - s->setThis( &o ); - if ( s->invoke( _func.func() , _params , 0 , true ) ) + if ( s->invoke( _func.func() , &_params, &o , 0 , true, false, true ) ) throw UserException( 9014, str::stream() << "map invoke failed: " + s->getError() ); } @@ -79,7 +81,7 @@ namespace mongo { Scope * s = _func.scope(); Scope::NoDBAccess no = s->disableDBAccess( "can't access db inside finalize" ); - s->invokeSafe( _func.func() , o ); + s->invokeSafe( _func.func() , &o, 0 ); // don't want to use o.objsize() to size b // since there are many cases where the point of finalize @@ -90,6 +92,10 @@ namespace mongo { return b.obj(); } + void JSReducer::init( State * state ) { + _func.init( state ); + } + /** * Reduces a list of tuple objects (key, value) to a single tuple {"0": key, "1": value} */ @@ -183,7 +189,8 @@ namespace mongo { Scope * s = _func.scope(); - s->invokeSafe( _func.func() , args ); + s->invokeSafe( _func.func() , &args, 0 ); + ++numReduces; if ( s->type( "return" ) == Array ) { uasserted( 10075 , "reduce -> multiple not supported yet"); @@ -214,6 +221,11 @@ namespace mongo { ns = dbname + "." + cmdObj.firstElement().valuestr(); verbose = cmdObj["verbose"].trueValue(); + jsMode = cmdObj["jsMode"].trueValue(); + + jsMaxKeys = 500000; + reduceTriggerRatio = 2.0; + maxInMemSize = 5 * 1024 * 1024; uassert( 13602 , "outType is no longer a valid option" , cmdObj["outType"].eoo() ); @@ -255,7 +267,7 @@ namespace mongo { } if ( outType != INMEMORY ) { // setup names - tempLong = str::stream() << (outDB.empty() ? dbname : outDB) << ".tmp.mr." << cmdObj.firstElement().String() << "_" << finalShort << "_" << JOB_NUMBER++; + tempLong = str::stream() << (outDB.empty() ? dbname : outDB) << ".tmp.mr." << cmdObj.firstElement().String() << "_" << JOB_NUMBER++; incLong = tempLong + "_inc"; @@ -308,10 +320,25 @@ namespace mongo { if ( ! _onDisk ) return; - _db.dropCollection( _config.tempLong ); + if (_config.incLong != _config.tempLong) { + // create the inc collection and make sure we have index on "0" key + _db.dropCollection( _config.incLong ); + { + writelock l( _config.incLong ); + Client::Context ctx( _config.incLong ); + string err; + if ( ! userCreateNS( _config.incLong.c_str() , BSON( "autoIndexId" << 0 ) , err , false ) ) { + uasserted( 13631 , str::stream() << "userCreateNS failed for mr incLong ns: " << _config.incLong << " err: " << err ); + } + } + BSONObj sortKey = BSON( "0" << 1 ); + _db.ensureIndex( _config.incLong , sortKey ); + } + + // create temp collection + _db.dropCollection( _config.tempLong ); { - // create writelock lock( _config.tempLong.c_str() ); Client::Context ctx( _config.tempLong.c_str() ); string errmsg; @@ -320,7 +347,6 @@ namespace mongo { } } - { // copy indexes auto_ptr<DBClientCursor> idx = _db.getIndexes( _config.finalLong ); @@ -355,6 +381,14 @@ namespace mongo { if ( _onDisk ) return; + if (_jsMode) { + ScriptingFunction getResult = _scope->createFunction("var map = _mrMap; var result = []; for (key in map) { result.push({_id: key, value: map[key]}) } return result;"); + _scope->invoke(getResult, 0, 0, 0, false); + BSONObj obj = _scope->getObject("return"); + final.append("results", BSONArray(obj)); + return; + } + uassert( 13604 , "too much data for in memory map/reduce" , _size < ( BSONObjMaxUserSize / 2 ) ); BSONArrayBuilder b( (int)(_size * 1.2) ); // _size is data size, doesn't count overhead and keys @@ -397,8 +431,10 @@ namespace mongo { // replace: just rename from temp to final collection name, dropping previous collection _db.dropCollection( _config.finalLong ); BSONObj info; - uassert( 10076 , "rename failed" , - _db.runCommand( "admin" , BSON( "renameCollection" << _config.tempLong << "to" << _config.finalLong ) , info ) ); + if ( ! _db.runCommand( "admin" , BSON( "renameCollection" << _config.tempLong << "to" << _config.finalLong ) , info ) ) { + uasserted( 10076 , str::stream() << "rename failed: " << info ); + } + _db.dropCollection( _config.tempLong ); } else if ( _config.outType == Config::MERGE ) { @@ -447,7 +483,7 @@ namespace mongo { /** * Insert doc in collection */ - void State::insert( const string& ns , BSONObj& o ) { + void State::insert( const string& ns , const BSONObj& o ) { assert( _onDisk ); writelock l( ns ); @@ -457,6 +493,15 @@ namespace mongo { } /** + * Insert doc into the inc collection, taking proper lock + */ + void State::insertToInc( BSONObj& o ) { + writelock l(_config.incLong); + Client::Context ctx(_config.incLong); + _insertToInc(o); + } + + /** * Insert doc into the inc collection */ void State::_insertToInc( BSONObj& o ) { @@ -465,7 +510,7 @@ namespace mongo { getDur().commitIfNeeded(); } - State::State( const Config& c ) : _config( c ), _size(0), _numEmits(0) { + State::State( const Config& c ) : _config( c ), _size(0), _dupCount(0), _numEmits(0) { _temp.reset( new InMemory() ); _onDisk = _config.outType != Config::INMEMORY; } @@ -488,6 +533,12 @@ namespace mongo { error() << "couldn't cleanup after map reduce: " << e.what() << endl; } } + + if (_scope) { + // cleanup js objects + ScriptingFunction cleanup = _scope->createFunction("delete _emitCt; delete _keyCt; delete _mrMap;"); + _scope->invoke(cleanup, 0, 0, 0, true); + } } /** @@ -505,29 +556,50 @@ namespace mongo { _config.reducer->init( this ); if ( _config.finalizer ) _config.finalizer->init( this ); + _scope->setBoolean("_doFinal", _config.finalizer); + + // by default start in JS mode, will be faster for small jobs + _jsMode = _config.jsMode; +// _jsMode = true; + switchMode(_jsMode); + + // global JS map/reduce hashmap + // we use a standard JS object which means keys are only simple types + // we could also add a real hashmap from a library, still we need to add object comparison methods +// _scope->setObject("_mrMap", BSONObj(), false); + ScriptingFunction init = _scope->createFunction("_emitCt = 0; _keyCt = 0; _dupCt = 0; _redCt = 0; if (typeof(_mrMap) === 'undefined') { _mrMap = {}; }"); + _scope->invoke(init, 0, 0, 0, true); + + // js function to run reduce on all keys +// redfunc = _scope->createFunction("for (var key in hashmap) { print('Key is ' + key); list = hashmap[key]; ret = reduce(key, list); print('Value is ' + ret); };"); + _reduceAll = _scope->createFunction("var map = _mrMap; var list, ret; for (var key in map) { list = map[key]; if (list.length != 1) { ret = _reduce(key, list); map[key] = [ret]; ++_redCt; } } _dupCt = 0;"); + _reduceAndEmit = _scope->createFunction("var map = _mrMap; var list, ret; for (var key in map) { list = map[key]; if (list.length == 1) { ret = list[0]; } else { ret = _reduce(key, list); ++_redCt; } emit(key, ret); }; delete _mrMap;"); + _reduceAndFinalize = _scope->createFunction("var map = _mrMap; var list, ret; for (var key in map) { list = map[key]; if (list.length == 1) { if (!_doFinal) {continue;} ret = list[0]; } else { ret = _reduce(key, list); ++_redCt; }; if (_doFinal){ ret = _finalize(ret); } map[key] = ret; }"); + _reduceAndFinalizeAndInsert = _scope->createFunction("var map = _mrMap; var list, ret; for (var key in map) { list = map[key]; if (list.length == 1) { ret = list[0]; } else { ret = _reduce(key, list); ++_redCt; }; if (_doFinal){ ret = _finalize(ret); } _nativeToTemp({_id: key, value: ret}); }"); - _scope->injectNative( "emit" , fast_emit ); - - if ( _onDisk ) { - // clear temp collections - _db.dropCollection( _config.tempLong ); - _db.dropCollection( _config.incLong ); - - // create the inc collection and make sure we have index on "0" key - { - writelock l( _config.incLong ); - Client::Context ctx( _config.incLong ); - string err; - if ( ! userCreateNS( _config.incLong.c_str() , BSON( "autoIndexId" << 0 ) , err , false ) ) { - uasserted( 13631 , str::stream() << "userCreateNS failed for mr incLong ns: " << _config.incLong << " err: " << err ); - } - } - - BSONObj sortKey = BSON( "0" << 1 ); - _db.ensureIndex( _config.incLong , sortKey ); + } + void State::switchMode(bool jsMode) { + _jsMode = jsMode; + if (jsMode) { + // emit function that stays in JS + _scope->setFunction("emit", "function(key, value) { if (typeof(key) === 'object') { _bailFromJS(key, value); return; }; ++_emitCt; var map = _mrMap; var list = map[key]; if (!list) { ++_keyCt; list = []; map[key] = list; } else { ++_dupCt; } list.push(value); }"); + _scope->injectNative("_bailFromJS", _bailFromJS, this); + } else { + // emit now populates C++ map + _scope->injectNative( "emit" , fast_emit, this ); } + } + + void State::bailFromJS() { + log(1) << "M/R: Switching from JS mode to mixed mode" << endl; + // reduce and reemit into c++ + switchMode(false); + _scope->invoke(_reduceAndEmit, 0, 0, 0, true); + // need to get the real number emitted so far + _numEmits = _scope->getNumberInt("_emitCt"); + _config.reducer->numReduces = _scope->getNumberInt("_redCt"); } /** @@ -542,12 +614,40 @@ namespace mongo { insert( _config.tempLong , res ); } + BSONObj _nativeToTemp( const BSONObj& args, void* data ) { + State* state = (State*) data; + BSONObjIterator it(args); + state->insert(state->_config.tempLong, it.next().Obj()); + return BSONObj(); + } + +// BSONObj _nativeToInc( const BSONObj& args, void* data ) { +// State* state = (State*) data; +// BSONObjIterator it(args); +// const BSONObj& obj = it.next().Obj(); +// state->_insertToInc(const_cast<BSONObj&>(obj)); +// return BSONObj(); +// } + /** * Applies last reduce and finalize. * After calling this method, the temp collection will be completed. * If inline, the results will be in the in memory map */ void State::finalReduce( CurOp * op , ProgressMeterHolder& pm ) { + + if (_jsMode) { + // apply the reduce within JS + if (_onDisk) { + _scope->injectNative("_nativeToTemp", _nativeToTemp, this); + _scope->invoke(_reduceAndFinalizeAndInsert, 0, 0, 0, true); + return; + } else { + _scope->invoke(_reduceAndFinalize, 0, 0, 0, true); + return; + } + } + if ( ! _onDisk ) { // all data has already been reduced, just finalize if ( _config.finalizer ) { @@ -619,8 +719,16 @@ namespace mongo { } ClientCursor::YieldLock yield (cursor.get()); - // reduce an finalize array - finalReduce( all ); + + try { + // reduce a finalize array + finalReduce( all ); + } + catch (...) { + yield.relock(); + cursor.release(); + throw; + } all.clear(); prev = o; @@ -656,9 +764,14 @@ namespace mongo { */ void State::reduceInMemory() { + if (_jsMode) { + // in js mode the reduce is applied when writing to collection + return; + } + auto_ptr<InMemory> n( new InMemory() ); // for new data long nSize = 0; - long dupCount = 0; + _dupCount = 0; for ( InMemory::iterator i=_temp->begin(); i!=_temp->end(); ++i ) { BSONObj key = i->first; @@ -674,20 +787,19 @@ namespace mongo { } else { // add to new map - _add( n.get() , all[0] , nSize, dupCount ); + _add( n.get() , all[0] , nSize ); } } else if ( all.size() > 1 ) { // several values, reduce and add to map BSONObj res = _config.reducer->reduce( all ); - _add( n.get() , res , nSize, dupCount ); + _add( n.get() , res , nSize ); } } // swap maps _temp.reset( n.release() ); _size = nSize; - _dupCount = dupCount; } /** @@ -718,57 +830,87 @@ namespace mongo { */ void State::emit( const BSONObj& a ) { _numEmits++; - _add( _temp.get() , a , _size, _dupCount ); + _add( _temp.get() , a , _size ); } - void State::_add( InMemory* im, const BSONObj& a , long& size, long& dupCount ) { + void State::_add( InMemory* im, const BSONObj& a , long& size ) { BSONList& all = (*im)[a]; all.push_back( a ); size += a.objsize() + 16; if (all.size() > 1) - ++dupCount; + ++_dupCount; } /** * this method checks the size of in memory map and potentially flushes to disk */ void State::checkSize() { - if ( _size < 1024 * 50 ) + if (_jsMode) { + // try to reduce if it is beneficial + int dupCt = _scope->getNumberInt("_dupCt"); + int keyCt = _scope->getNumberInt("_keyCt"); + + if (keyCt > _config.jsMaxKeys) { + // too many keys for JS, switch to mixed + _bailFromJS(BSONObj(), this); + // then fall through to check map size + } else if (dupCt > (keyCt * _config.reduceTriggerRatio)) { + // reduce now to lower mem usage + _scope->invoke(_reduceAll, 0, 0, 0, true); + return; + } + } + + if (_jsMode) return; + bool dump = _onDisk && _size > _config.maxInMemSize; // attempt to reduce in memory map, if we've seen duplicates - if ( _dupCount > 0) { + if ( dump || _dupCount > (_temp->size() * _config.reduceTriggerRatio)) { long before = _size; reduceInMemory(); log(1) << " mr: did reduceInMemory " << before << " -->> " << _size << endl; } - if ( ! _onDisk || _size < 1024 * 100 ) - return; - - dumpToInc(); - log(1) << " mr: dumping to db" << endl; + // reevaluate size and potentially dump + if ( dump && _size > _config.maxInMemSize) { + dumpToInc(); + log(1) << " mr: dumping to db" << endl; + } } - boost::thread_specific_ptr<State*> _tl; - /** * emit that will be called by js function */ - BSONObj fast_emit( const BSONObj& args ) { + BSONObj fast_emit( const BSONObj& args, void* data ) { uassert( 10077 , "fast_emit takes 2 args" , args.nFields() == 2 ); uassert( 13069 , "an emit can't be more than half max bson size" , args.objsize() < ( BSONObjMaxUserSize / 2 ) ); + State* state = (State*) data; if ( args.firstElement().type() == Undefined ) { BSONObjBuilder b( args.objsize() ); b.appendNull( "" ); BSONObjIterator i( args ); i.next(); b.append( i.next() ); - (*_tl)->emit( b.obj() ); + state->emit( b.obj() ); } else { - (*_tl)->emit( args ); + state->emit( args ); + } + return BSONObj(); + } + + /** + * function is called when we realize we cant use js mode for m/r on the 1st key + */ + BSONObj _bailFromJS( const BSONObj& args, void* data ) { + State* state = (State*) data; + state->bailFromJS(); + + // emit this particular key if there is one + if (!args.isEmpty()) { + fast_emit(args, data); } return BSONObj(); } @@ -788,7 +930,7 @@ namespace mongo { help << "http://www.mongodb.org/display/DOCS/MapReduce"; } virtual LockType locktype() const { return NONE; } - bool run(const string& dbname , BSONObj& cmd, string& errmsg, BSONObjBuilder& result, bool fromRepl ) { + bool run(const string& dbname , BSONObj& cmd, int, string& errmsg, BSONObjBuilder& result, bool fromRepl ) { Timer t; Client::GodScope cg; Client& client = cc(); @@ -806,7 +948,6 @@ namespace mongo { BSONObjBuilder countsBuilder; BSONObjBuilder timingBuilder; State state( config ); - if ( ! state.sourceExists() ) { errmsg = "ns doesn't exist"; return false; @@ -823,12 +964,7 @@ namespace mongo { try { state.init(); - - { - State** s = new State*(); - s[0] = &state; - _tl.reset( s ); - } + state.prepTempCollection(); wassert( config.limit < 0x4000000 ); // see case on next line to 32 bit unsigned ProgressMeterHolder pm( op->setMessage( "m/r: (1/3) emit phase" , state.incomingDocuments() ) ); @@ -843,23 +979,26 @@ namespace mongo { } // obtain cursor on data to apply mr to, sorted - shared_ptr<Cursor> temp = bestGuessCursor( config.ns.c_str(), config.filter, config.sort ); + shared_ptr<Cursor> temp = NamespaceDetailsTransient::getCursor( config.ns.c_str(), config.filter, config.sort ); + uassert( 15876, str::stream() << "could not create cursor over " << config.ns << " for query : " << config.filter << " sort : " << config.sort, temp.get() ); auto_ptr<ClientCursor> cursor( new ClientCursor( QueryOption_NoCursorTimeout , temp , config.ns.c_str() ) ); + uassert( 15877, str::stream() << "could not create client cursor over " << config.ns << " for query : " << config.filter << " sort : " << config.sort, cursor.get() ); Timer mt; // go through each doc while ( cursor->ok() ) { - // make sure we dont process duplicates in case data gets moved around during map - if ( cursor->currentIsDup() ) { + if ( ! cursor->currentMatches() ) { cursor->advance(); continue; } - if ( ! cursor->currentMatches() ) { + // make sure we dont process duplicates in case data gets moved around during map + // TODO This won't actually help when data gets moved, it's to handle multikeys. + if ( cursor->currentIsDup() ) { cursor->advance(); continue; } - + BSONObj o = cursor->current(); cursor->advance(); @@ -874,7 +1013,7 @@ namespace mongo { if ( config.verbose ) mapTime += mt.micros(); num++; - if ( num % 100 == 0 ) { + if ( num % 1000 == 0 ) { // try to yield lock regularly ClientCursor::YieldLock yield (cursor.get()); Timer t; @@ -908,19 +1047,31 @@ namespace mongo { timingBuilder.append( "emitLoop" , t.millis() ); op->setMessage( "m/r: (2/3) final reduce in memory" ); + Timer t; // do reduce in memory // this will be the last reduce needed for inline mode state.reduceInMemory(); // if not inline: dump the in memory map to inc collection, all data is on disk state.dumpToInc(); - state.prepTempCollection(); // final reduce state.finalReduce( op , pm ); - - _tl.reset(); + inReduce += t.micros(); + countsBuilder.appendNumber( "reduce" , state.numReduces() ); + timingBuilder.append( "reduceTime" , inReduce / 1000 ); + timingBuilder.append( "mode" , state.jsMode() ? "js" : "mixed" ); + } + // TODO: The error handling code for queries is v. fragile, + // *requires* rethrow AssertionExceptions - should probably fix. + catch ( AssertionException& e ){ + log() << "mr failed, removing collection" << causedBy(e) << endl; + throw e; + } + catch ( std::exception& e ){ + log() << "mr failed, removing collection" << causedBy(e) << endl; + throw e; } catch ( ... ) { - log() << "mr failed, removing collection" << endl; + log() << "mr failed for unknown reason, removing collection" << endl; throw; } @@ -967,113 +1118,127 @@ namespace mongo { virtual bool slaveOverrideOk() { return true; } virtual LockType locktype() const { return NONE; } - bool run(const string& dbname , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool) { + bool run(const string& dbname , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool) { string shardedOutputCollection = cmdObj["shardedOutputCollection"].valuestrsafe(); + string postProcessCollection = cmdObj["postProcessCollection"].valuestrsafe(); + bool postProcessOnly = !(postProcessCollection.empty()); Config config( dbname , cmdObj.firstElement().embeddedObjectUserCheck() ); + State state(config); + state.init(); + if (postProcessOnly) { + // the temp collection has been decided by mongos + config.tempLong = dbname + "." + postProcessCollection; + } + // no need for incremental collection because records are already sorted config.incLong = config.tempLong; - set<ServerAndQuery> servers; - - BSONObjBuilder shardCounts; - map<string,long long> counts; - BSONObj shards = cmdObj["shards"].embeddedObjectUserCheck(); - vector< auto_ptr<DBClientCursor> > shardCursors; - - { - // parse per shard results - BSONObjIterator i( shards ); - while ( i.more() ) { - BSONElement e = i.next(); - string shard = e.fieldName(); - - BSONObj res = e.embeddedObjectUserCheck(); - - uassert( 10078 , "something bad happened" , shardedOutputCollection == res["result"].valuestrsafe() ); - servers.insert( shard ); - shardCounts.appendAs( res["counts"] , shard ); - - BSONObjIterator j( res["counts"].embeddedObjectUserCheck() ); - while ( j.more() ) { - BSONElement temp = j.next(); - counts[temp.fieldName()] += temp.numberLong(); - } + BSONObj shardCounts = cmdObj["shardCounts"].embeddedObjectUserCheck(); + BSONObj counts = cmdObj["counts"].embeddedObjectUserCheck(); + if (postProcessOnly) { + if (!state._db.exists(config.tempLong)) { + // nothing to do + return 1; } + } else { + set<ServerAndQuery> servers; + vector< auto_ptr<DBClientCursor> > shardCursors; - } - - State state(config); - state.prepTempCollection(); + { + // parse per shard results + BSONObjIterator i( shards ); + while ( i.more() ) { + BSONElement e = i.next(); + string shard = e.fieldName(); - { - // reduce from each stream + BSONObj res = e.embeddedObjectUserCheck(); - BSONObj sortKey = BSON( "_id" << 1 ); + uassert( 10078 , "something bad happened" , shardedOutputCollection == res["result"].valuestrsafe() ); + servers.insert( shard ); - ParallelSortClusteredCursor cursor( servers , dbname + "." + shardedOutputCollection , - Query().sort( sortKey ) ); - cursor.init(); - state.init(); + } - BSONList values; - if (!config.outDB.empty()) { - BSONObjBuilder loc; - if ( !config.outDB.empty()) - loc.append( "db" , config.outDB ); - if ( !config.finalShort.empty() ) - loc.append( "collection" , config.finalShort ); - result.append("result", loc.obj()); - } - else { - if ( !config.finalShort.empty() ) - result.append( "result" , config.finalShort ); } - while ( cursor.more() ) { - BSONObj t = cursor.next().getOwned(); + state.prepTempCollection(); - if ( values.size() == 0 ) { - values.push_back( t ); - continue; + { + // reduce from each stream + + BSONObj sortKey = BSON( "_id" << 1 ); + + ParallelSortClusteredCursor cursor( servers , dbname + "." + shardedOutputCollection , + Query().sort( sortKey ) ); + cursor.init(); + + BSONList values; + if (!config.outDB.empty()) { + BSONObjBuilder loc; + if ( !config.outDB.empty()) + loc.append( "db" , config.outDB ); + if ( !config.finalShort.empty() ) + loc.append( "collection" , config.finalShort ); + result.append("result", loc.obj()); } - - if ( t.woSortOrder( *(values.begin()) , sortKey ) == 0 ) { - values.push_back( t ); - continue; + else { + if ( !config.finalShort.empty() ) + result.append( "result" , config.finalShort ); } + while ( cursor.more() || !values.empty() ) { + BSONObj t; + if (cursor.more()) { + t = cursor.next().getOwned(); - state.emit( config.reducer->finalReduce( values , config.finalizer.get() ) ); - values.clear(); - values.push_back( t ); - } + if ( values.size() == 0 ) { + values.push_back( t ); + continue; + } - if ( values.size() ) - state.emit( config.reducer->finalReduce( values , config.finalizer.get() ) ); - } + if ( t.woSortOrder( *(values.begin()) , sortKey ) == 0 ) { + values.push_back( t ); + continue; + } + } + BSONObj res = config.reducer->finalReduce( values , config.finalizer.get()); + if (state.isOnDisk()) + state.insertToInc(res); + else + state.emit(res); + values.clear(); + if (!t.isEmpty()) + values.push_back( t ); + } + } - state.dumpToInc(); - state.postProcessCollection(); - state.appendResults( result ); + for ( set<ServerAndQuery>::iterator i=servers.begin(); i!=servers.end(); i++ ) { + ScopedDbConnection conn( i->_server ); + conn->dropCollection( dbname + "." + shardedOutputCollection ); + conn.done(); + } - for ( set<ServerAndQuery>::iterator i=servers.begin(); i!=servers.end(); i++ ) { - ScopedDbConnection conn( i->_server ); - conn->dropCollection( dbname + "." + shardedOutputCollection ); - conn.done(); + result.append( "shardCounts" , shardCounts ); } - result.append( "shardCounts" , shardCounts.obj() ); + long long finalCount = state.postProcessCollection(); + state.appendResults( result ); - { - BSONObjBuilder c; - for ( map<string,long long>::iterator i=counts.begin(); i!=counts.end(); i++ ) { - c.append( i->first , i->second ); - } - result.append( "counts" , c.obj() ); + // fix the global counts + BSONObjBuilder countsB(32); + BSONObjIterator j(counts); + while (j.more()) { + BSONElement elmt = j.next(); + if (!strcmp(elmt.fieldName(), "reduce")) + countsB.append("reduce", elmt.numberLong() + state.numReduces()); + else if (!strcmp(elmt.fieldName(), "output")) + countsB.append("output", finalCount); + else + countsB.append(elmt); } + result.append( "counts" , countsB.obj() ); return 1; } |