Diffstat (limited to 'db/mr.cpp')
-rw-r--r--  db/mr.cpp  153
1 file changed, 108 insertions(+), 45 deletions(-)
diff --git a/db/mr.cpp b/db/mr.cpp
index ff88d9e..210dfca 100644
--- a/db/mr.cpp
+++ b/db/mr.cpp
@@ -28,6 +28,8 @@ namespace mongo {
namespace mr {
+ typedef vector<BSONObj> BSONList;
+
class MyCmp {
public:
MyCmp(){}
@@ -38,48 +40,76 @@ namespace mongo {
typedef pair<BSONObj,BSONObj> Data;
//typedef list< Data > InMemory;
- typedef map< BSONObj,list<BSONObj>,MyCmp > InMemory;
+ typedef map< BSONObj,BSONList,MyCmp > InMemory;
- BSONObj reduceValues( list<BSONObj>& values , Scope * s , ScriptingFunction reduce , bool final , ScriptingFunction finalize ){
+ BSONObj reduceValues( BSONList& values , Scope * s , ScriptingFunction reduce , bool final , ScriptingFunction finalize ){
uassert( 10074 , "need values" , values.size() );
int sizeEstimate = ( values.size() * values.begin()->getField( "value" ).size() ) + 128;
BSONObj key;
BSONObjBuilder reduceArgs( sizeEstimate );
-
- BSONObjBuilder valueBuilder( sizeEstimate );
- int n = 0;
- for ( list<BSONObj>::iterator i=values.begin(); i!=values.end(); i++){
- BSONObj o = *i;
- BSONObjIterator j(o);
+ BSONArrayBuilder * valueBuilder = 0;
+
+ int sizeSoFar = 0;
+ unsigned n = 0;
+ for ( ; n<values.size(); n++ ){
+ BSONObjIterator j(values[n]);
BSONElement keyE = j.next();
if ( n == 0 ){
reduceArgs.append( keyE );
- BSONObjBuilder temp;
- temp.append( keyE );
- key = temp.obj();
+ key = keyE.wrap();
+ valueBuilder = new BSONArrayBuilder( reduceArgs.subarrayStart( "values" ) );
+ sizeSoFar = 5 + keyE.size();
}
- valueBuilder.appendAs( j.next() , BSONObjBuilder::numStr( n++ ).c_str() );
+
+ BSONElement ee = j.next();
+
+ uassert( 13070 , "value too large to reduce" , ee.size() < ( 2 * 1024 * 1024 ) );
+
+ if ( sizeSoFar + ee.size() > ( 4 * 1024 * 1024 ) ){
+ assert( n > 1 ); // if not, inf. loop
+ break;
+ }
+
+ valueBuilder->append( ee );
+ sizeSoFar += ee.size();
}
-
- reduceArgs.appendArray( "values" , valueBuilder.obj() );
+ assert(valueBuilder);
+ valueBuilder->done();
+ delete valueBuilder;
BSONObj args = reduceArgs.obj();
-
+
s->invokeSafe( reduce , args );
if ( s->type( "return" ) == Array ){
uassert( 10075 , "reduce -> multiple not supported yet",0);
return BSONObj();
}
+
+ int endSizeEstimate = key.objsize() + ( args.objsize() / values.size() );
+
+ if ( n < values.size() ){
+ BSONList x;
+ for ( ; n < values.size(); n++ ){
+ x.push_back( values[n] );
+ }
+ BSONObjBuilder temp( endSizeEstimate );
+ temp.append( key.firstElement() );
+ s->append( temp , "1" , "return" );
+ x.push_back( temp.obj() );
+ return reduceValues( x , s , reduce , final , finalize );
+ }
+
+
if ( finalize ){
- BSONObjBuilder b;
+ BSONObjBuilder b(endSizeEstimate);
b.appendAs( key.firstElement() , "_id" );
s->append( b , "value" , "return" );
s->invokeSafe( finalize , b.obj() );
}
- BSONObjBuilder b;
+ BSONObjBuilder b(endSizeEstimate);
b.appendAs( key.firstElement() , final ? "_id" : "0" );
s->append( b , final ? "value" : "1" , "return" );
return b.obj();
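The rewritten reduceValues() above caps the work of a single reduce invocation: any one value over 2MB is rejected, the argument array is cut off once it approaches 4MB, and whatever was left unreduced is combined with the partial result and reduced again recursively. A minimal stand-alone sketch of that chunk-and-re-reduce pattern, using plain integers and an item-count cap in place of BSON objects and byte sizes (nothing below is mongod code):

```cpp
#include <algorithm>
#include <cassert>
#include <numeric>
#include <vector>

// Stand-in for one reduce() call over a bounded chunk of values.
static int reduceOnce(const std::vector<int>& vals) {
    return std::accumulate(vals.begin(), vals.end(), 0);   // e.g. a sum-reduce
}

// Reduce at most maxPerCall values per invocation; if anything is left over,
// append the partial result to the remainder and recurse, mirroring the
// BSONList x that reduceValues() builds when n < values.size().
int reduceChunked(const std::vector<int>& values, size_t maxPerCall) {
    assert(!values.empty());
    size_t n = std::min(values.size(), maxPerCall);
    if (n == values.size())
        return reduceOnce(values);               // everything fit in one call
    assert(n > 1);                               // same guard as the "inf. loop" assert
    std::vector<int> chunk(values.begin(), values.begin() + n);
    std::vector<int> rest(values.begin() + n, values.end());
    rest.push_back(reduceOnce(chunk));           // partial result joins the leftovers
    return reduceChunked(rest, maxPerCall);
}
```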
@@ -108,8 +138,12 @@ namespace mongo {
if ( ! keeptemp && markAsTemp )
cc().addTempCollection( tempLong );
- if ( cmdObj["out"].type() == String )
+ replicate = keeptemp;
+
+ if ( cmdObj["out"].type() == String ){
finalShort = cmdObj["out"].valuestr();
+ replicate = true;
+ }
else
finalShort = tempShort;
@@ -123,8 +157,10 @@ namespace mongo {
if ( cmdObj["finalize"].type() ){
finalizeCode = cmdObj["finalize"].ascode();
}
+ checkCodeWScope( "map" , cmdObj );
+ checkCodeWScope( "reduce" , cmdObj );
+ checkCodeWScope( "finalize" , cmdObj );
-
if ( cmdObj["mapparams"].type() == Array ){
mapparams = cmdObj["mapparams"].embeddedObjectUserCheck();
}
@@ -151,6 +187,14 @@ namespace mongo {
}
}
+ void checkCodeWScope( const char * field , const BSONObj& o ){
+ BSONElement e = o[field];
+ if ( e.type() != CodeWScope )
+ return;
+ BSONObj x = e.codeWScopeObject();
+ uassert( 13035 , (string)"can't use CodeWScope with map/reduce function: " + field , x.isEmpty() );
+ }
+
/**
@return number objects in collection
*/
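The new checkCodeWScope() helper above rejects a map, reduce, or finalize function supplied as CodeWScope whose scope object is non-empty, since that scope cannot be honoured. Roughly the same check with a hypothetical stand-in type instead of BSONElement (illustrative only):

```cpp
#include <map>
#include <stdexcept>
#include <string>

// Hypothetical stand-in for a user-supplied function element: either plain
// Code (scope empty) or CodeWScope (scope may carry bound variables).
struct JSFunction {
    std::string code;
    std::map<std::string, std::string> scope;
};

// Mirrors the intent of checkCodeWScope(): a scope that actually carries
// values can't be used by map/reduce, so fail loudly instead of ignoring it.
void checkScope(const char* field, const JSFunction& f) {
    if (!f.scope.empty())
        throw std::invalid_argument(
            std::string("can't use CodeWScope with map/reduce function: ") + field);
}
```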
@@ -171,6 +215,7 @@ namespace mongo {
// options
bool verbose;
bool keeptemp;
+ bool replicate;
// query options
@@ -224,12 +269,13 @@ namespace mongo {
db.dropCollection( setup.incLong );
writelock l( setup.incLong );
+ Client::Context ctx( setup.incLong );
string err;
assert( userCreateNS( setup.incLong.c_str() , BSON( "autoIndexId" << 0 ) , err , false ) );
}
- void finalReduce( list<BSONObj>& values ){
+ void finalReduce( BSONList& values ){
if ( values.size() == 0 )
return;
@@ -237,7 +283,11 @@ namespace mongo {
BSONObj res = reduceValues( values , scope.get() , reduce , 1 , finalize );
writelock l( setup.tempLong );
- theDataFileMgr.insertAndLog( setup.tempLong.c_str() , res , false );
+ Client::Context ctx( setup.incLong );
+ if ( setup.replicate )
+ theDataFileMgr.insertAndLog( setup.tempLong.c_str() , res , false );
+ else
+ theDataFileMgr.insert( setup.tempLong.c_str() , res , false );
}
@@ -272,7 +322,7 @@ namespace mongo {
for ( InMemory::iterator i=old->begin(); i!=old->end(); i++ ){
BSONObj key = i->first;
- list<BSONObj>& all = i->second;
+ BSONList& all = i->second;
if ( all.size() == 1 ){
// this key has low cardinality, so just write to db
@@ -291,13 +341,14 @@ namespace mongo {
void dump(){
writelock l(_state.setup.incLong);
+ Client::Context ctx(_state.setup.incLong);
for ( InMemory::iterator i=_temp->begin(); i!=_temp->end(); i++ ){
- list<BSONObj>& all = i->second;
+ BSONList& all = i->second;
if ( all.size() < 1 )
continue;
- for ( list<BSONObj>::iterator j=all.begin(); j!=all.end(); j++ )
+ for ( BSONList::iterator j=all.begin(); j!=all.end(); j++ )
write( *j );
}
_temp->clear();
@@ -306,7 +357,7 @@ namespace mongo {
}
void insert( const BSONObj& a ){
- list<BSONObj>& all = (*_temp)[a];
+ BSONList& all = (*_temp)[a];
all.push_back( a );
_size += a.objsize() + 16;
}
@@ -343,7 +394,8 @@ namespace mongo {
boost::thread_specific_ptr<MRTL> _tlmr;
BSONObj fast_emit( const BSONObj& args ){
- uassert( 10077 , "fast_emit takes 2 args" , args.nFields() == 2 );
+ uassert( 10077 , "fast_emit takes 2 args" , args.nFields() == 2 );
+ uassert( 13069 , "an emit can't be more than 2mb" , args.objsize() < ( 2 * 1024 * 1024 ) );
_tlmr->insert( args );
_tlmr->numEmits++;
return BSONObj();
@@ -357,11 +409,14 @@ namespace mongo {
virtual void help( stringstream &help ) const {
help << "see http://www.mongodb.org/display/DOCS/MapReduce";
}
-
+ virtual LockType locktype(){ return WRITE; } // TODO, READ?
bool run(const char *dbname, BSONObj& cmd, string& errmsg, BSONObjBuilder& result, bool fromRepl ){
Timer t;
Client::GodScope cg;
- MRSetup mr( cc().database()->name , cmd );
+ Client& client = cc();
+ CurOp * op = client.curop();
+
+ MRSetup mr( client.database()->name , cmd );
log(1) << "mr ns: " << mr.ns << endl;
@@ -385,7 +440,7 @@ namespace mongo {
MRTL * mrtl = new MRTL( state );
_tlmr.reset( mrtl );
- ProgressMeter pm( db.count( mr.ns , mr.filter ) );
+ ProgressMeter & pm = op->setMessage( "m/r: (1/3) emit phase" , db.count( mr.ns , mr.filter ) );
auto_ptr<DBClientCursor> cursor = db.query( mr.ns , mr.q );
long long mapTime = 0;
Timer mt;
@@ -405,6 +460,7 @@ namespace mongo {
Timer t;
mrtl->checkSize();
inReduce += t.micros();
+ killCurrentOp.checkForInterrupt();
dbtemprelease temprlease;
}
pm.hit();
@@ -412,9 +468,10 @@ namespace mongo {
if ( mr.limit && num >= mr.limit )
break;
}
+ pm.finished();
- countsBuilder.append( "input" , num );
- countsBuilder.append( "emit" , mrtl->numEmits );
+ countsBuilder.appendNumber( "input" , num );
+ countsBuilder.appendNumber( "emit" , mrtl->numEmits );
if ( mrtl->numEmits )
shouldHaveData = true;
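The emit loop now drives a CurOp progress message ("m/r: (1/3) emit phase"), records a hit per document, and checks for a kill request at the periodic yield points. A generic sketch of that progress-plus-interrupt shape, with an atomic flag standing in for killCurrentOp (not the real ProgressMeter/CurOp API):

```cpp
#include <atomic>
#include <cstdio>
#include <stdexcept>
#include <vector>

std::atomic<bool> killRequested{false};           // stand-in for killCurrentOp

void emitPhase(const std::vector<int>& docs) {
    size_t done = 0;
    for (int doc : docs) {
        (void)doc;                                // map()/emit() would run here
        ++done;
        if (done % 100 == 0) {                    // periodic yield point
            if (killRequested.load())             // checkForInterrupt() equivalent
                throw std::runtime_error("m/r interrupted");
            std::printf("m/r: (1/3) emit phase %zu/%zu\n", done, docs.size());
        }
    }
}
```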
@@ -422,7 +479,7 @@ namespace mongo {
timingBuilder.append( "emitLoop" , t.millis() );
// final reduce
-
+ op->setMessage( "m/r: (2/3) final reduce in memory" );
mrtl->reduceInMemory();
mrtl->dump();
@@ -430,16 +487,22 @@ namespace mongo {
db.ensureIndex( mr.incLong , sortKey );
BSONObj prev;
- list<BSONObj> all;
+ BSONList all;
- ProgressMeter fpm( db.count( mr.incLong ) );
+ assert( userCreateNS( mr.tempLong.c_str() , BSONObj() , errmsg , mr.replicate ) );
+
+ pm = op->setMessage( "m/r: (3/3) final reduce to collection" , db.count( mr.incLong ) );
cursor = db.query( mr.incLong, Query().sort( sortKey ) );
while ( cursor->more() ){
BSONObj o = cursor->next().getOwned();
-
+ pm.hit();
+
if ( o.woSortOrder( prev , sortKey ) == 0 ){
all.push_back( o );
+ if ( pm.hits() % 1000 == 0 ){
+ dbtemprelease tl;
+ }
continue;
}
@@ -448,12 +511,11 @@ namespace mongo {
all.clear();
prev = o;
all.push_back( o );
- fpm.hit();
+ killCurrentOp.checkForInterrupt();
dbtemprelease tl;
}
-
state.finalReduce( all );
-
+ pm.finished();
_tlmr.reset( 0 );
}
catch ( ... ){
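The final-reduce loop in the two hunks above is a sorted group-by: documents come back from mr.incLong ordered by the sort key, consecutive documents with an equal key are buffered in all, each run is reduced when the key changes, and the trailing run is reduced after the loop via state.finalReduce(all). A self-contained sketch of that shape, with (key, value) pairs standing in for BSON documents and a sum standing in for the JS reduce function:

```cpp
#include <string>
#include <utility>
#include <vector>

using Doc = std::pair<std::string, int>;          // (key, value) stand-in for a BSONObj

// Reduce one run of values that share a key (here: a simple sum).
static Doc reduceRun(const std::string& key, const std::vector<int>& vals) {
    int sum = 0;
    for (int v : vals) sum += v;
    return {key, sum};
}

// Walk documents already sorted by key, buffer consecutive equal keys ("all"),
// reduce each run when the key changes, and reduce the trailing run at the end.
std::vector<Doc> finalReduce(const std::vector<Doc>& sorted) {
    std::vector<Doc> out;
    std::vector<int> all;
    std::string prev;
    for (const Doc& d : sorted) {
        if (!all.empty() && d.first == prev) {    // same key: keep accumulating
            all.push_back(d.second);
            continue;
        }
        if (!all.empty())                         // key changed: flush the run
            out.push_back(reduceRun(prev, all));
        all.assign(1, d.second);
        prev = d.first;
    }
    if (!all.empty())                             // trailing run after the loop
        out.push_back(reduceRun(prev, all));
    return out;
}
```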
@@ -471,7 +533,7 @@ namespace mongo {
result.append( "result" , mr.finalShort );
result.append( "timeMillis" , t.millis() );
- countsBuilder.append( "output" , finalCount );
+ countsBuilder.appendNumber( "output" , finalCount );
if ( mr.verbose ) result.append( "timing" , timingBuilder.obj() );
result.append( "counts" , countsBuilder.obj() );
@@ -493,11 +555,12 @@ namespace mongo {
public:
MapReduceFinishCommand() : Command( "mapreduce.shardedfinish" ){}
virtual bool slaveOk() { return true; }
-
+
+ virtual LockType locktype(){ return WRITE; }
bool run(const char *ns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){
- dbtemprelease temprlease; // we don't touch the db directly
-
- string dbname = cc().database()->name;
+ string dbname = cc().database()->name; // this has to come before dbtemprelease
+ dbtemprelease temprelease; // we don't touch the db directly
+
string shardedOutputCollection = cmdObj["shardedOutputCollection"].valuestrsafe();
MRSetup mr( dbname , cmdObj.firstElement().embeddedObjectUserCheck() , false );
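The reordering in this hunk is the point: the database name must be copied out of the Client while the write lock is still held, because once dbtemprelease yields the lock the current-database context can change underneath the command. A small sketch of that copy-before-release rule using a plain mutex and hypothetical names (not the real lock machinery):

```cpp
#include <mutex>
#include <string>

struct ClientState {
    std::mutex lock;                              // plays the role of the db write lock
    std::string currentDbName = "test";           // only valid while the lock is held
};

std::string shardedFinish(ClientState& client) {
    std::unique_lock<std::mutex> held(client.lock);   // lock held on entry, as in run()
    std::string dbname = client.currentDbName;    // copy BEFORE releasing (the fix above)
    held.unlock();                                // stand-in for dbtemprelease
    // ... long-running merge work that must not hold the lock ...
    return dbname;                                // later steps use the stable copy
}
```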
@@ -540,14 +603,14 @@ namespace mongo {
if ( mr.finalizeCode.size() )
finalizeFunction = s->createFunction( mr.finalizeCode.c_str() );
- list<BSONObj> values;
+ BSONList values;
result.append( "result" , mr.finalShort );
DBDirectClient db;
while ( cursor.more() ){
- BSONObj t = cursor.next();
+ BSONObj t = cursor.next().getOwned();
if ( values.size() == 0 ){
values.push_back( t );