diff options
Diffstat (limited to 'db/mr.cpp')
-rw-r--r-- | db/mr.cpp | 153 |
1 files changed, 108 insertions, 45 deletions
@@ -28,6 +28,8 @@ namespace mongo { namespace mr { + typedef vector<BSONObj> BSONList; + class MyCmp { public: MyCmp(){} @@ -38,48 +40,76 @@ namespace mongo { typedef pair<BSONObj,BSONObj> Data; //typedef list< Data > InMemory; - typedef map< BSONObj,list<BSONObj>,MyCmp > InMemory; + typedef map< BSONObj,BSONList,MyCmp > InMemory; - BSONObj reduceValues( list<BSONObj>& values , Scope * s , ScriptingFunction reduce , bool final , ScriptingFunction finalize ){ + BSONObj reduceValues( BSONList& values , Scope * s , ScriptingFunction reduce , bool final , ScriptingFunction finalize ){ uassert( 10074 , "need values" , values.size() ); int sizeEstimate = ( values.size() * values.begin()->getField( "value" ).size() ) + 128; BSONObj key; BSONObjBuilder reduceArgs( sizeEstimate ); - - BSONObjBuilder valueBuilder( sizeEstimate ); - int n = 0; - for ( list<BSONObj>::iterator i=values.begin(); i!=values.end(); i++){ - BSONObj o = *i; - BSONObjIterator j(o); + BSONArrayBuilder * valueBuilder = 0; + + int sizeSoFar = 0; + unsigned n = 0; + for ( ; n<values.size(); n++ ){ + BSONObjIterator j(values[n]); BSONElement keyE = j.next(); if ( n == 0 ){ reduceArgs.append( keyE ); - BSONObjBuilder temp; - temp.append( keyE ); - key = temp.obj(); + key = keyE.wrap(); + valueBuilder = new BSONArrayBuilder( reduceArgs.subarrayStart( "values" ) ); + sizeSoFar = 5 + keyE.size(); } - valueBuilder.appendAs( j.next() , BSONObjBuilder::numStr( n++ ).c_str() ); + + BSONElement ee = j.next(); + + uassert( 13070 , "value to large to reduce" , ee.size() < ( 2 * 1024 * 1024 ) ); + + if ( sizeSoFar + ee.size() > ( 4 * 1024 * 1024 ) ){ + assert( n > 1 ); // if not, inf. loop + break; + } + + valueBuilder->append( ee ); + sizeSoFar += ee.size(); } - - reduceArgs.appendArray( "values" , valueBuilder.obj() ); + assert(valueBuilder); + valueBuilder->done(); + delete valueBuilder; BSONObj args = reduceArgs.obj(); - + s->invokeSafe( reduce , args ); if ( s->type( "return" ) == Array ){ uassert( 10075 , "reduce -> multiple not supported yet",0); return BSONObj(); } + + int endSizeEstimate = key.objsize() + ( args.objsize() / values.size() ); + + if ( n < values.size() ){ + BSONList x; + for ( ; n < values.size(); n++ ){ + x.push_back( values[n] ); + } + BSONObjBuilder temp( endSizeEstimate ); + temp.append( key.firstElement() ); + s->append( temp , "1" , "return" ); + x.push_back( temp.obj() ); + return reduceValues( x , s , reduce , final , finalize ); + } + + if ( finalize ){ - BSONObjBuilder b; + BSONObjBuilder b(endSizeEstimate); b.appendAs( key.firstElement() , "_id" ); s->append( b , "value" , "return" ); s->invokeSafe( finalize , b.obj() ); } - BSONObjBuilder b; + BSONObjBuilder b(endSizeEstimate); b.appendAs( key.firstElement() , final ? "_id" : "0" ); s->append( b , final ? "value" : "1" , "return" ); return b.obj(); @@ -108,8 +138,12 @@ namespace mongo { if ( ! keeptemp && markAsTemp ) cc().addTempCollection( tempLong ); - if ( cmdObj["out"].type() == String ) + replicate = keeptemp; + + if ( cmdObj["out"].type() == String ){ finalShort = cmdObj["out"].valuestr(); + replicate = true; + } else finalShort = tempShort; @@ -123,8 +157,10 @@ namespace mongo { if ( cmdObj["finalize"].type() ){ finalizeCode = cmdObj["finalize"].ascode(); } + checkCodeWScope( "map" , cmdObj ); + checkCodeWScope( "reduce" , cmdObj ); + checkCodeWScope( "finalize" , cmdObj ); - if ( cmdObj["mapparams"].type() == Array ){ mapparams = cmdObj["mapparams"].embeddedObjectUserCheck(); } @@ -151,6 +187,14 @@ namespace mongo { } } + void checkCodeWScope( const char * field , const BSONObj& o ){ + BSONElement e = o[field]; + if ( e.type() != CodeWScope ) + return; + BSONObj x = e.codeWScopeObject(); + uassert( 13035 , (string)"can't use CodeWScope with map/reduce function: " + field , x.isEmpty() ); + } + /** @return number objects in collection */ @@ -171,6 +215,7 @@ namespace mongo { // options bool verbose; bool keeptemp; + bool replicate; // query options @@ -224,12 +269,13 @@ namespace mongo { db.dropCollection( setup.incLong ); writelock l( setup.incLong ); + Client::Context ctx( setup.incLong ); string err; assert( userCreateNS( setup.incLong.c_str() , BSON( "autoIndexId" << 0 ) , err , false ) ); } - void finalReduce( list<BSONObj>& values ){ + void finalReduce( BSONList& values ){ if ( values.size() == 0 ) return; @@ -237,7 +283,11 @@ namespace mongo { BSONObj res = reduceValues( values , scope.get() , reduce , 1 , finalize ); writelock l( setup.tempLong ); - theDataFileMgr.insertAndLog( setup.tempLong.c_str() , res , false ); + Client::Context ctx( setup.incLong ); + if ( setup.replicate ) + theDataFileMgr.insertAndLog( setup.tempLong.c_str() , res , false ); + else + theDataFileMgr.insert( setup.tempLong.c_str() , res , false ); } @@ -272,7 +322,7 @@ namespace mongo { for ( InMemory::iterator i=old->begin(); i!=old->end(); i++ ){ BSONObj key = i->first; - list<BSONObj>& all = i->second; + BSONList& all = i->second; if ( all.size() == 1 ){ // this key has low cardinality, so just write to db @@ -291,13 +341,14 @@ namespace mongo { void dump(){ writelock l(_state.setup.incLong); + Client::Context ctx(_state.setup.incLong); for ( InMemory::iterator i=_temp->begin(); i!=_temp->end(); i++ ){ - list<BSONObj>& all = i->second; + BSONList& all = i->second; if ( all.size() < 1 ) continue; - for ( list<BSONObj>::iterator j=all.begin(); j!=all.end(); j++ ) + for ( BSONList::iterator j=all.begin(); j!=all.end(); j++ ) write( *j ); } _temp->clear(); @@ -306,7 +357,7 @@ namespace mongo { } void insert( const BSONObj& a ){ - list<BSONObj>& all = (*_temp)[a]; + BSONList& all = (*_temp)[a]; all.push_back( a ); _size += a.objsize() + 16; } @@ -343,7 +394,8 @@ namespace mongo { boost::thread_specific_ptr<MRTL> _tlmr; BSONObj fast_emit( const BSONObj& args ){ - uassert( 10077 , "fast_emit takes 2 args" , args.nFields() == 2 ); + uassert( 10077 , "fast_emit takes 2 args" , args.nFields() == 2 ); + uassert( 13069 , "an emit can't be more than 2mb" , args.objsize() < ( 2 * 1024 * 1024 ) ); _tlmr->insert( args ); _tlmr->numEmits++; return BSONObj(); @@ -357,11 +409,14 @@ namespace mongo { virtual void help( stringstream &help ) const { help << "see http://www.mongodb.org/display/DOCS/MapReduce"; } - + virtual LockType locktype(){ return WRITE; } // TODO, READ? bool run(const char *dbname, BSONObj& cmd, string& errmsg, BSONObjBuilder& result, bool fromRepl ){ Timer t; Client::GodScope cg; - MRSetup mr( cc().database()->name , cmd ); + Client& client = cc(); + CurOp * op = client.curop(); + + MRSetup mr( client.database()->name , cmd ); log(1) << "mr ns: " << mr.ns << endl; @@ -385,7 +440,7 @@ namespace mongo { MRTL * mrtl = new MRTL( state ); _tlmr.reset( mrtl ); - ProgressMeter pm( db.count( mr.ns , mr.filter ) ); + ProgressMeter & pm = op->setMessage( "m/r: (1/3) emit phase" , db.count( mr.ns , mr.filter ) ); auto_ptr<DBClientCursor> cursor = db.query( mr.ns , mr.q ); long long mapTime = 0; Timer mt; @@ -405,6 +460,7 @@ namespace mongo { Timer t; mrtl->checkSize(); inReduce += t.micros(); + killCurrentOp.checkForInterrupt(); dbtemprelease temprlease; } pm.hit(); @@ -412,9 +468,10 @@ namespace mongo { if ( mr.limit && num >= mr.limit ) break; } + pm.finished(); - countsBuilder.append( "input" , num ); - countsBuilder.append( "emit" , mrtl->numEmits ); + countsBuilder.appendNumber( "input" , num ); + countsBuilder.appendNumber( "emit" , mrtl->numEmits ); if ( mrtl->numEmits ) shouldHaveData = true; @@ -422,7 +479,7 @@ namespace mongo { timingBuilder.append( "emitLoop" , t.millis() ); // final reduce - + op->setMessage( "m/r: (2/3) final reduce in memory" ); mrtl->reduceInMemory(); mrtl->dump(); @@ -430,16 +487,22 @@ namespace mongo { db.ensureIndex( mr.incLong , sortKey ); BSONObj prev; - list<BSONObj> all; + BSONList all; - ProgressMeter fpm( db.count( mr.incLong ) ); + assert( userCreateNS( mr.tempLong.c_str() , BSONObj() , errmsg , mr.replicate ) ); + + pm = op->setMessage( "m/r: (3/3) final reduce to collection" , db.count( mr.incLong ) ); cursor = db.query( mr.incLong, Query().sort( sortKey ) ); while ( cursor->more() ){ BSONObj o = cursor->next().getOwned(); - + pm.hit(); + if ( o.woSortOrder( prev , sortKey ) == 0 ){ all.push_back( o ); + if ( pm.hits() % 1000 == 0 ){ + dbtemprelease tl; + } continue; } @@ -448,12 +511,11 @@ namespace mongo { all.clear(); prev = o; all.push_back( o ); - fpm.hit(); + killCurrentOp.checkForInterrupt(); dbtemprelease tl; } - state.finalReduce( all ); - + pm.finished(); _tlmr.reset( 0 ); } catch ( ... ){ @@ -471,7 +533,7 @@ namespace mongo { result.append( "result" , mr.finalShort ); result.append( "timeMillis" , t.millis() ); - countsBuilder.append( "output" , finalCount ); + countsBuilder.appendNumber( "output" , finalCount ); if ( mr.verbose ) result.append( "timing" , timingBuilder.obj() ); result.append( "counts" , countsBuilder.obj() ); @@ -493,11 +555,12 @@ namespace mongo { public: MapReduceFinishCommand() : Command( "mapreduce.shardedfinish" ){} virtual bool slaveOk() { return true; } - + + virtual LockType locktype(){ return WRITE; } bool run(const char *ns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){ - dbtemprelease temprlease; // we don't touch the db directly - - string dbname = cc().database()->name; + string dbname = cc().database()->name; // this has to come before dbtemprelease + dbtemprelease temprelease; // we don't touch the db directly + string shardedOutputCollection = cmdObj["shardedOutputCollection"].valuestrsafe(); MRSetup mr( dbname , cmdObj.firstElement().embeddedObjectUserCheck() , false ); @@ -540,14 +603,14 @@ namespace mongo { if ( mr.finalizeCode.size() ) finalizeFunction = s->createFunction( mr.finalizeCode.c_str() ); - list<BSONObj> values; + BSONList values; result.append( "result" , mr.finalShort ); DBDirectClient db; while ( cursor.more() ){ - BSONObj t = cursor.next(); + BSONObj t = cursor.next().getOwned(); if ( values.size() == 0 ){ values.push_back( t ); |