diff options
author | Antonin Kral <a.kral@bobek.cz> | 2011-03-17 00:07:52 +0100 |
---|---|---|
committer | Antonin Kral <a.kral@bobek.cz> | 2011-03-17 00:07:52 +0100 |
commit | 98b8b639326ab4c89eed73739d9903993c4c8959 (patch) | |
tree | 0462df078bf740093774d033b75f0ea24a31fa97 /db/commands | |
parent | f5d6e97ca8d2f3e7c4cdd5c9afbf8e756ef65bc2 (diff) | |
parent | 582fc32574a3b158c81e49cb00e6ae59205e66ba (diff) | |
download | mongodb-98b8b639326ab4c89eed73739d9903993c4c8959.tar.gz |
Merge commit 'upstream/1.8.0
Diffstat (limited to 'db/commands')
-rw-r--r-- | db/commands/distinct.cpp | 150 | ||||
-rw-r--r-- | db/commands/group.cpp | 202 | ||||
-rw-r--r-- | db/commands/isself.cpp | 220 | ||||
-rw-r--r-- | db/commands/mr.cpp | 1074 | ||||
-rw-r--r-- | db/commands/mr.h | 291 |
5 files changed, 1937 insertions, 0 deletions
diff --git a/db/commands/distinct.cpp b/db/commands/distinct.cpp new file mode 100644 index 0000000..2e26bcd --- /dev/null +++ b/db/commands/distinct.cpp @@ -0,0 +1,150 @@ +// distinct.cpp + +/** +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "pch.h" +#include "../commands.h" +#include "../instance.h" +#include "../queryoptimizer.h" +#include "../clientcursor.h" + +namespace mongo { + + class DistinctCommand : public Command { + public: + DistinctCommand() : Command("distinct") {} + virtual bool slaveOk() const { return true; } + virtual LockType locktype() const { return READ; } + virtual void help( stringstream &help ) const { + help << "{ distinct : 'collection name' , key : 'a.b' , query : {} }"; + } + + bool run(const string& dbname, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl ) { + Timer t; + string ns = dbname + '.' + cmdObj.firstElement().valuestr(); + + string key = cmdObj["key"].valuestrsafe(); + BSONObj keyPattern = BSON( key << 1 ); + + BSONObj query = getQuery( cmdObj ); + + int bufSize = BSONObjMaxUserSize - 4096; + BufBuilder bb( bufSize ); + char * start = bb.buf(); + + BSONArrayBuilder arr( bb ); + BSONElementSet values; + + long long nscanned = 0; // locations looked at + long long nscannedObjects = 0; // full objects looked at + long long n = 0; // matches + MatchDetails md; + + NamespaceDetails * d = nsdetails( ns.c_str() ); + + if ( ! d ) { + result.appendArray( "values" , BSONObj() ); + result.append( "stats" , BSON( "n" << 0 << "nscanned" << 0 << "nscannedObjects" << 0 ) ); + return true; + } + + shared_ptr<Cursor> cursor; + if ( ! query.isEmpty() ) { + cursor = bestGuessCursor(ns.c_str() , query , BSONObj() ); + } + else { + + // query is empty, so lets see if we can find an index + // with the key so we don't have to hit the raw data + NamespaceDetails::IndexIterator ii = d->ii(); + while ( ii.more() ) { + IndexDetails& idx = ii.next(); + + if ( d->isMultikey( ii.pos() - 1 ) ) + continue; + + if ( idx.inKeyPattern( key ) ) { + cursor = bestGuessCursor( ns.c_str() , BSONObj() , idx.keyPattern() ); + break; + } + + } + + if ( ! cursor.get() ) + cursor = bestGuessCursor(ns.c_str() , query , BSONObj() ); + + } + + + + scoped_ptr<ClientCursor> cc (new ClientCursor(QueryOption_NoCursorTimeout, cursor, ns)); + + while ( cursor->ok() ) { + nscanned++; + bool loadedObject = false; + + if ( !cursor->matcher() || cursor->matcher()->matchesCurrent( cursor.get() , &md ) ) { + n++; + + BSONElementSet temp; + loadedObject = ! cc->getFieldsDotted( key , temp ); + + for ( BSONElementSet::iterator i=temp.begin(); i!=temp.end(); ++i ) { + BSONElement e = *i; + if ( values.count( e ) ) + continue; + + int now = bb.len(); + + uassert(10044, "distinct too big, 4mb cap", ( now + e.size() + 1024 ) < bufSize ); + + arr.append( e ); + BSONElement x( start + now ); + + values.insert( x ); + } + } + + if ( loadedObject || md.loadedObject ) + nscannedObjects++; + + cursor->advance(); + + if (!cc->yieldSometimes()) + break; + + RARELY killCurrentOp.checkForInterrupt(); + } + + assert( start == bb.buf() ); + + result.appendArray( "values" , arr.done() ); + + { + BSONObjBuilder b; + b.appendNumber( "n" , n ); + b.appendNumber( "nscanned" , nscanned ); + b.appendNumber( "nscannedObjects" , nscannedObjects ); + b.appendNumber( "timems" , t.millis() ); + result.append( "stats" , b.obj() ); + } + + return true; + } + + } distinctCmd; + +} diff --git a/db/commands/group.cpp b/db/commands/group.cpp new file mode 100644 index 0000000..0cc6ab3 --- /dev/null +++ b/db/commands/group.cpp @@ -0,0 +1,202 @@ +// group.cpp + +/** +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "pch.h" +#include "../commands.h" +#include "../instance.h" +#include "../queryoptimizer.h" + +namespace mongo { + + class GroupCommand : public Command { + public: + GroupCommand() : Command("group") {} + virtual LockType locktype() const { return READ; } + virtual bool slaveOk() const { return false; } + virtual bool slaveOverrideOk() { return true; } + virtual void help( stringstream &help ) const { + help << "http://www.mongodb.org/display/DOCS/Aggregation"; + } + + BSONObj getKey( const BSONObj& obj , const BSONObj& keyPattern , ScriptingFunction func , double avgSize , Scope * s ) { + if ( func ) { + BSONObjBuilder b( obj.objsize() + 32 ); + b.append( "0" , obj ); + int res = s->invoke( func , b.obj() ); + uassert( 10041 , (string)"invoke failed in $keyf: " + s->getError() , res == 0 ); + int type = s->type("return"); + uassert( 10042 , "return of $key has to be an object" , type == Object ); + return s->getObject( "return" ); + } + return obj.extractFields( keyPattern , true ); + } + + bool group( string realdbname , const string& ns , const BSONObj& query , + BSONObj keyPattern , string keyFunctionCode , string reduceCode , const char * reduceScope , + BSONObj initial , string finalize , + string& errmsg , BSONObjBuilder& result ) { + + + auto_ptr<Scope> s = globalScriptEngine->getPooledScope( realdbname ); + s->localConnect( realdbname.c_str() ); + + if ( reduceScope ) + s->init( reduceScope ); + + s->setObject( "$initial" , initial , true ); + + s->exec( "$reduce = " + reduceCode , "reduce setup" , false , true , true , 100 ); + s->exec( "$arr = [];" , "reduce setup 2" , false , true , true , 100 ); + ScriptingFunction f = s->createFunction( + "function(){ " + " if ( $arr[n] == null ){ " + " next = {}; " + " Object.extend( next , $key ); " + " Object.extend( next , $initial , true ); " + " $arr[n] = next; " + " next = null; " + " } " + " $reduce( obj , $arr[n] ); " + "}" ); + + ScriptingFunction keyFunction = 0; + if ( keyFunctionCode.size() ) { + keyFunction = s->createFunction( keyFunctionCode.c_str() ); + } + + + double keysize = keyPattern.objsize() * 3; + double keynum = 1; + + map<BSONObj,int,BSONObjCmp> map; + list<BSONObj> blah; + + shared_ptr<Cursor> cursor = bestGuessCursor(ns.c_str() , query , BSONObj() ); + + while ( cursor->ok() ) { + if ( cursor->matcher() && ! cursor->matcher()->matchesCurrent( cursor.get() ) ) { + cursor->advance(); + continue; + } + + BSONObj obj = cursor->current(); + cursor->advance(); + + BSONObj key = getKey( obj , keyPattern , keyFunction , keysize / keynum , s.get() ); + keysize += key.objsize(); + keynum++; + + int& n = map[key]; + if ( n == 0 ) { + n = map.size(); + s->setObject( "$key" , key , true ); + + uassert( 10043 , "group() can't handle more than 20000 unique keys" , n <= 20000 ); + } + + s->setObject( "obj" , obj , true ); + s->setNumber( "n" , n - 1 ); + if ( s->invoke( f , BSONObj() , 0 , true ) ) { + throw UserException( 9010 , (string)"reduce invoke failed: " + s->getError() ); + } + } + + if (!finalize.empty()) { + s->exec( "$finalize = " + finalize , "finalize define" , false , true , true , 100 ); + ScriptingFunction g = s->createFunction( + "function(){ " + " for(var i=0; i < $arr.length; i++){ " + " var ret = $finalize($arr[i]); " + " if (ret !== undefined) " + " $arr[i] = ret; " + " } " + "}" ); + s->invoke( g , BSONObj() , 0 , true ); + } + + result.appendArray( "retval" , s->getObject( "$arr" ) ); + result.append( "count" , keynum - 1 ); + result.append( "keys" , (int)(map.size()) ); + s->exec( "$arr = [];" , "reduce setup 2" , false , true , true , 100 ); + s->gc(); + + return true; + } + + bool run(const string& dbname, BSONObj& jsobj, string& errmsg, BSONObjBuilder& result, bool fromRepl ) { + + /* db.$cmd.findOne( { group : <p> } ) */ + const BSONObj& p = jsobj.firstElement().embeddedObjectUserCheck(); + + BSONObj q; + if ( p["cond"].type() == Object ) + q = p["cond"].embeddedObject(); + else if ( p["condition"].type() == Object ) + q = p["condition"].embeddedObject(); + else + q = getQuery( p ); + + if ( p["ns"].type() != String ) { + errmsg = "ns has to be set"; + return false; + } + + string ns = dbname + "." + p["ns"].String(); + + BSONObj key; + string keyf; + if ( p["key"].type() == Object ) { + key = p["key"].embeddedObjectUserCheck(); + if ( ! p["$keyf"].eoo() ) { + errmsg = "can't have key and $keyf"; + return false; + } + } + else if ( p["$keyf"].type() ) { + keyf = p["$keyf"]._asCode(); + } + else { + // no key specified, will use entire object as key + } + + BSONElement reduce = p["$reduce"]; + if ( reduce.eoo() ) { + errmsg = "$reduce has to be set"; + return false; + } + + BSONElement initial = p["initial"]; + if ( initial.type() != Object ) { + errmsg = "initial has to be an object"; + return false; + } + + + string finalize; + if (p["finalize"].type()) + finalize = p["finalize"]._asCode(); + + return group( dbname , ns , q , + key , keyf , reduce._asCode() , reduce.type() != CodeWScope ? 0 : reduce.codeWScopeScopeData() , + initial.embeddedObject() , finalize , + errmsg , result ); + } + + } cmdGroup; + + +} // namespace mongo diff --git a/db/commands/isself.cpp b/db/commands/isself.cpp new file mode 100644 index 0000000..b97f51e --- /dev/null +++ b/db/commands/isself.cpp @@ -0,0 +1,220 @@ +// isself.cpp + +#include "pch.h" +#include "../../util/message.h" +#include "../commands.h" +#include "../../client/dbclient.h" + +#ifndef _WIN32 +# ifndef __sunos__ +# include <ifaddrs.h> +# endif +# include <sys/resource.h> +# include <sys/stat.h> +#endif + + +namespace mongo { + +#if !defined(_WIN32) && !defined(__sunos__) + + vector<string> getMyAddrs() { + ifaddrs * addrs; + + int status = getifaddrs(&addrs); + massert(13469, "getifaddrs failure: " + errnoWithDescription(errno), status == 0); + + vector<string> out; + + // based on example code from linux getifaddrs manpage + for (ifaddrs * addr = addrs; addr != NULL; addr = addr->ifa_next) { + if ( addr->ifa_addr == NULL ) continue; + int family = addr->ifa_addr->sa_family; + char host[NI_MAXHOST]; + + if (family == AF_INET || family == AF_INET6) { + status = getnameinfo(addr->ifa_addr, + (family == AF_INET ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6)), + host, NI_MAXHOST, NULL, 0, NI_NUMERICHOST); + if ( status != 0 ) { + freeifaddrs( addrs ); + addrs = NULL; + msgasserted( 13470, string("getnameinfo() failed: ") + gai_strerror(status) ); + } + + out.push_back(host); + } + + } + + freeifaddrs( addrs ); + addrs = NULL; + + if (logLevel >= 1) { + log(1) << "getMyAddrs():"; + for (vector<string>::const_iterator it=out.begin(), end=out.end(); it!=end; ++it) { + log(1) << " [" << *it << ']'; + } + log(1) << endl; + } + + return out; + } + + vector<string> getAllIPs(StringData iporhost) { + addrinfo* addrs = NULL; + addrinfo hints; + memset(&hints, 0, sizeof(addrinfo)); + hints.ai_socktype = SOCK_STREAM; + hints.ai_family = (IPv6Enabled() ? AF_UNSPEC : AF_INET); + + static string portNum = BSONObjBuilder::numStr(cmdLine.port); + + vector<string> out; + + int ret = getaddrinfo(iporhost.data(), portNum.c_str(), &hints, &addrs); + if ( ret ) { + warning() << "getaddrinfo(\"" << iporhost.data() << "\") failed: " << gai_strerror(ret) << endl; + return out; + } + + for (addrinfo* addr = addrs; addr != NULL; addr = addr->ai_next) { + int family = addr->ai_family; + char host[NI_MAXHOST]; + + if (family == AF_INET || family == AF_INET6) { + int status = getnameinfo(addr->ai_addr, addr->ai_addrlen, host, NI_MAXHOST, NULL, 0, NI_NUMERICHOST); + + massert(13472, string("getnameinfo() failed: ") + gai_strerror(status), status == 0); + + out.push_back(host); + } + + } + + freeaddrinfo(addrs); + + if (logLevel >= 1) { + log(1) << "getallIPs(\"" << iporhost << "\"):"; + for (vector<string>::const_iterator it=out.begin(), end=out.end(); it!=end; ++it) { + log(1) << " [" << *it << ']'; + } + log(1) << endl; + } + + return out; + } +#endif + + + class IsSelfCommand : public Command { + public: + IsSelfCommand() : Command("_isSelf") , _cacheLock( "IsSelfCommand::_cacheLock" ) {} + virtual bool slaveOk() const { return true; } + virtual LockType locktype() const { return NONE; } + virtual void help( stringstream &help ) const { + help << "{ _isSelf : 1 } INTERNAL ONLY"; + } + + bool run(const string& dbname, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl ) { + init(); + result.append( "id" , _id ); + return true; + } + + void init() { + scoped_lock lk( _cacheLock ); + if ( ! _id.isSet() ) + _id.init(); + } + + OID _id; + + mongo::mutex _cacheLock; + map<string,bool> _cache; + } isSelfCommand; + + bool HostAndPort::isSelf() const { + + int p = _port == -1 ? CmdLine::DefaultDBPort : _port; + + if( p != cmdLine.port ) { + // shortcut - ports have to match at the very least + return false; + } + + string host = str::stream() << _host << ":" << p; + + { + // check cache for this host + // debatably something _could_ change, but I'm not sure right now (erh 10/14/2010) + scoped_lock lk( isSelfCommand._cacheLock ); + map<string,bool>::const_iterator i = isSelfCommand._cache.find( host ); + if ( i != isSelfCommand._cache.end() ) + return i->second; + } + +#if !defined(_WIN32) && !defined(__sunos__) + // on linux and os x we can do a quick check for an ip match + + const vector<string> myaddrs = getMyAddrs(); + const vector<string> addrs = getAllIPs(_host); + + for (vector<string>::const_iterator i=myaddrs.begin(), iend=myaddrs.end(); i!=iend; ++i) { + for (vector<string>::const_iterator j=addrs.begin(), jend=addrs.end(); j!=jend; ++j) { + string a = *i; + string b = *j; + + if ( a == b || + ( str::startsWith( a , "127." ) && str::startsWith( b , "127." ) ) // 127. is all loopback + ) { + + // add to cache + scoped_lock lk( isSelfCommand._cacheLock ); + isSelfCommand._cache[host] = true; + return true; + } + } + } + +#endif + + if ( ! Listener::getTimeTracker() ) { + // this ensures we are actually running a server + // this may return true later, so may want to retry + return false; + } + + + try { + + isSelfCommand.init(); + + DBClientConnection conn; + string errmsg; + if ( ! conn.connect( host , errmsg ) ) { + // should this go in the cache? + return false; + } + + BSONObj out; + bool ok = conn.simpleCommand( "admin" , &out , "_isSelf" ); + + bool me = ok && out["id"].type() == jstOID && isSelfCommand._id == out["id"].OID(); + + // add to cache + scoped_lock lk( isSelfCommand._cacheLock ); + isSelfCommand._cache[host] = me; + + return me; + } + catch ( std::exception& e ) { + warning() << "could't check isSelf (" << host << ") " << e.what() << endl; + } + + return false; + } + + + +} diff --git a/db/commands/mr.cpp b/db/commands/mr.cpp new file mode 100644 index 0000000..16c604a --- /dev/null +++ b/db/commands/mr.cpp @@ -0,0 +1,1074 @@ +// mr.cpp + +/** + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#include "pch.h" +#include "../db.h" +#include "../instance.h" +#include "../commands.h" +#include "../../scripting/engine.h" +#include "../../client/dbclient.h" +#include "../../client/connpool.h" +#include "../../client/parallel.h" +#include "../queryoptimizer.h" +#include "../matcher.h" +#include "../clientcursor.h" +#include "../replpair.h" +#include "../../s/d_chunk_manager.h" +#include "../../s/d_logic.h" + +#include "mr.h" + +namespace mongo { + + namespace mr { + + AtomicUInt Config::JOB_NUMBER; + + JSFunction::JSFunction( string type , const BSONElement& e ) { + _type = type; + _code = e._asCode(); + + if ( e.type() == CodeWScope ) + _wantedScope = e.codeWScopeObject(); + } + + void JSFunction::init( State * state ) { + _scope = state->scope(); + assert( _scope ); + _scope->init( &_wantedScope ); + + _func = _scope->createFunction( _code.c_str() ); + uassert( 13598 , str::stream() << "couldn't compile code for: " << _type , _func ); + } + + void JSMapper::init( State * state ) { + _func.init( state ); + _params = state->config().mapParams; + } + + /** + * Applies the map function to an object, which should internally call emit() + */ + void JSMapper::map( const BSONObj& o ) { + Scope * s = _func.scope(); + assert( s ); + s->setThis( &o ); + if ( s->invoke( _func.func() , _params , 0 , true ) ) + throw UserException( 9014, str::stream() << "map invoke failed: " + s->getError() ); + } + + /** + * Applies the finalize function to a tuple obj (key, val) + * Returns tuple obj {_id: key, value: newval} + */ + BSONObj JSFinalizer::finalize( const BSONObj& o ) { + Scope * s = _func.scope(); + + Scope::NoDBAccess no = s->disableDBAccess( "can't access db inside finalize" ); + s->invokeSafe( _func.func() , o ); + + // don't want to use o.objsize() to size b + // since there are many cases where the point of finalize + // is converting many fields to 1 + BSONObjBuilder b; + b.append( o.firstElement() ); + s->append( b , "value" , "return" ); + return b.obj(); + } + + /** + * Reduces a list of tuple objects (key, value) to a single tuple {"0": key, "1": value} + */ + BSONObj JSReducer::reduce( const BSONList& tuples ) { + if (tuples.size() <= 1) + return tuples[0]; + BSONObj key; + int endSizeEstimate = 16; + _reduce( tuples , key , endSizeEstimate ); + + BSONObjBuilder b(endSizeEstimate); + b.appendAs( key.firstElement() , "0" ); + _func.scope()->append( b , "1" , "return" ); + return b.obj(); + } + + /** + * Reduces a list of tuple object (key, value) to a single tuple {_id: key, value: val} + * Also applies a finalizer method if present. + */ + BSONObj JSReducer::finalReduce( const BSONList& tuples , Finalizer * finalizer ) { + + BSONObj res; + BSONObj key; + + if (tuples.size() == 1) { + // 1 obj, just use it + key = tuples[0]; + BSONObjBuilder b(key.objsize()); + BSONObjIterator it(key); + b.appendAs( it.next() , "_id" ); + b.appendAs( it.next() , "value" ); + res = b.obj(); + } + else { + // need to reduce + int endSizeEstimate = 16; + _reduce( tuples , key , endSizeEstimate ); + BSONObjBuilder b(endSizeEstimate); + b.appendAs( key.firstElement() , "_id" ); + _func.scope()->append( b , "value" , "return" ); + res = b.obj(); + } + + if ( finalizer ) { + res = finalizer->finalize( res ); + } + + return res; + } + + /** + * actually applies a reduce, to a list of tuples (key, value). + * After the call, tuples will hold a single tuple {"0": key, "1": value} + */ + void JSReducer::_reduce( const BSONList& tuples , BSONObj& key , int& endSizeEstimate ) { + uassert( 10074 , "need values" , tuples.size() ); + + int sizeEstimate = ( tuples.size() * tuples.begin()->getField( "value" ).size() ) + 128; + + // need to build the reduce args: ( key, [values] ) + BSONObjBuilder reduceArgs( sizeEstimate ); + boost::scoped_ptr<BSONArrayBuilder> valueBuilder; + int sizeSoFar = 0; + unsigned n = 0; + for ( ; n<tuples.size(); n++ ) { + BSONObjIterator j(tuples[n]); + BSONElement keyE = j.next(); + if ( n == 0 ) { + reduceArgs.append( keyE ); + key = keyE.wrap(); + sizeSoFar = 5 + keyE.size(); + valueBuilder.reset(new BSONArrayBuilder( reduceArgs.subarrayStart( "tuples" ) )); + } + + BSONElement ee = j.next(); + + uassert( 13070 , "value too large to reduce" , ee.size() < ( BSONObjMaxUserSize / 2 ) ); + + if ( sizeSoFar + ee.size() > BSONObjMaxUserSize ) { + assert( n > 1 ); // if not, inf. loop + break; + } + + valueBuilder->append( ee ); + sizeSoFar += ee.size(); + } + assert(valueBuilder); + valueBuilder->done(); + BSONObj args = reduceArgs.obj(); + + Scope * s = _func.scope(); + + s->invokeSafe( _func.func() , args ); + + if ( s->type( "return" ) == Array ) { + uasserted( 10075 , "reduce -> multiple not supported yet"); + return; + } + + endSizeEstimate = key.objsize() + ( args.objsize() / tuples.size() ); + + if ( n == tuples.size() ) + return; + + // the input list was too large, add the rest of elmts to new tuples and reduce again + // note: would be better to use loop instead of recursion to avoid stack overflow + BSONList x; + for ( ; n < tuples.size(); n++ ) { + x.push_back( tuples[n] ); + } + BSONObjBuilder temp( endSizeEstimate ); + temp.append( key.firstElement() ); + s->append( temp , "1" , "return" ); + x.push_back( temp.obj() ); + _reduce( x , key , endSizeEstimate ); + } + + Config::Config( const string& _dbname , const BSONObj& cmdObj ) { + + dbname = _dbname; + ns = dbname + "." + cmdObj.firstElement().valuestr(); + + verbose = cmdObj["verbose"].trueValue(); + + uassert( 13602 , "outType is no longer a valid option" , cmdObj["outType"].eoo() ); + + if ( cmdObj["out"].type() == String ) { + finalShort = cmdObj["out"].String(); + outType = REPLACE; + } + else if ( cmdObj["out"].type() == Object ) { + BSONObj o = cmdObj["out"].embeddedObject(); + + BSONElement e = o.firstElement(); + string t = e.fieldName(); + + if ( t == "normal" || t == "replace" ) { + outType = REPLACE; + finalShort = e.String(); + } + else if ( t == "merge" ) { + outType = MERGE; + finalShort = e.String(); + } + else if ( t == "reduce" ) { + outType = REDUCE; + finalShort = e.String(); + } + else if ( t == "inline" ) { + outType = INMEMORY; + } + else { + uasserted( 13522 , str::stream() << "unknown out specifier [" << t << "]" ); + } + + if (o.hasElement("db")) { + outDB = o["db"].String(); + } + } + else { + uasserted( 13606 , "'out' has to be a string or an object" ); + } + + if ( outType != INMEMORY ) { // setup names + tempLong = str::stream() << (outDB.empty() ? dbname : outDB) << ".tmp.mr." << cmdObj.firstElement().String() << "_" << finalShort << "_" << JOB_NUMBER++; + + incLong = tempLong + "_inc"; + + finalLong = str::stream() << (outDB.empty() ? dbname : outDB) << "." << finalShort; + } + + { + // scope and code + + if ( cmdObj["scope"].type() == Object ) + scopeSetup = cmdObj["scope"].embeddedObjectUserCheck(); + + mapper.reset( new JSMapper( cmdObj["map"] ) ); + reducer.reset( new JSReducer( cmdObj["reduce"] ) ); + if ( cmdObj["finalize"].type() && cmdObj["finalize"].trueValue() ) + finalizer.reset( new JSFinalizer( cmdObj["finalize"] ) ); + + if ( cmdObj["mapparams"].type() == Array ) { + mapParams = cmdObj["mapparams"].embeddedObjectUserCheck(); + } + + } + + { + // query options + BSONElement q = cmdObj["query"]; + if ( q.type() == Object ) + filter = q.embeddedObjectUserCheck(); + else + uassert( 13608 , "query has to be blank or an Object" , ! q.trueValue() ); + + + BSONElement s = cmdObj["sort"]; + if ( s.type() == Object ) + sort = s.embeddedObjectUserCheck(); + else + uassert( 13609 , "sort has to be blank or an Object" , ! s.trueValue() ); + + if ( cmdObj["limit"].isNumber() ) + limit = cmdObj["limit"].numberLong(); + else + limit = 0; + } + } + + /** + * Create temporary collection, set up indexes + */ + void State::prepTempCollection() { + if ( ! _onDisk ) + return; + + _db.dropCollection( _config.tempLong ); + + { + // create + writelock lock( _config.tempLong.c_str() ); + Client::Context ctx( _config.tempLong.c_str() ); + string errmsg; + if ( ! userCreateNS( _config.tempLong.c_str() , BSONObj() , errmsg , true ) ) { + uasserted( 13630 , str::stream() << "userCreateNS failed for mr tempLong ns: " << _config.tempLong << " err: " << errmsg ); + } + } + + + { + // copy indexes + auto_ptr<DBClientCursor> idx = _db.getIndexes( _config.finalLong ); + while ( idx->more() ) { + BSONObj i = idx->next(); + + BSONObjBuilder b( i.objsize() + 16 ); + b.append( "ns" , _config.tempLong ); + BSONObjIterator j( i ); + while ( j.more() ) { + BSONElement e = j.next(); + if ( str::equals( e.fieldName() , "_id" ) || + str::equals( e.fieldName() , "ns" ) ) + continue; + + b.append( e ); + } + + BSONObj indexToInsert = b.obj(); + insert( Namespace( _config.tempLong.c_str() ).getSisterNS( "system.indexes" ).c_str() , indexToInsert ); + } + + } + + } + + /** + * For inline mode, appends results to output object. + * Makes sure (key, value) tuple is formatted as {_id: key, value: val} + */ + void State::appendResults( BSONObjBuilder& final ) { + if ( _onDisk ) + return; + + uassert( 13604 , "too much data for in memory map/reduce" , _size < ( BSONObjMaxUserSize / 2 ) ); + + BSONArrayBuilder b( (int)(_size * 1.2) ); // _size is data size, doesn't count overhead and keys + + for ( InMemory::iterator i=_temp->begin(); i!=_temp->end(); ++i ) { + BSONObj key = i->first; + BSONList& all = i->second; + + assert( all.size() == 1 ); + + BSONObjIterator vi( all[0] ); + vi.next(); + + BSONObjBuilder temp( b.subobjStart() ); + temp.appendAs( key.firstElement() , "_id" ); + temp.appendAs( vi.next() , "value" ); + temp.done(); + } + + BSONArray res = b.arr(); + uassert( 13605 , "too much data for in memory map/reduce" , res.objsize() < ( BSONObjMaxUserSize * 2 / 3 ) ); + + final.append( "results" , res ); + } + + /** + * Does post processing on output collection. + * This may involve replacing, merging or reducing. + */ + long long State::postProcessCollection() { + if ( _onDisk == false || _config.outType == Config::INMEMORY ) + return _temp->size(); + + dblock lock; + + if ( _config.finalLong == _config.tempLong ) + return _db.count( _config.finalLong ); + + if ( _config.outType == Config::REPLACE || _db.count( _config.finalLong ) == 0 ) { + // replace: just rename from temp to final collection name, dropping previous collection + _db.dropCollection( _config.finalLong ); + BSONObj info; + uassert( 10076 , "rename failed" , + _db.runCommand( "admin" , BSON( "renameCollection" << _config.tempLong << "to" << _config.finalLong ) , info ) ); + _db.dropCollection( _config.tempLong ); + } + else if ( _config.outType == Config::MERGE ) { + // merge: upsert new docs into old collection + auto_ptr<DBClientCursor> cursor = _db.query( _config.tempLong , BSONObj() ); + while ( cursor->more() ) { + BSONObj o = cursor->next(); + Helpers::upsert( _config.finalLong , o ); + getDur().commitIfNeeded(); + } + _db.dropCollection( _config.tempLong ); + } + else if ( _config.outType == Config::REDUCE ) { + // reduce: apply reduce op on new result and existing one + BSONList values; + + auto_ptr<DBClientCursor> cursor = _db.query( _config.tempLong , BSONObj() ); + while ( cursor->more() ) { + BSONObj temp = cursor->next(); + BSONObj old; + + bool found; + { + Client::Context tx( _config.finalLong ); + found = Helpers::findOne( _config.finalLong.c_str() , temp["_id"].wrap() , old , true ); + } + + if ( found ) { + // need to reduce + values.clear(); + values.push_back( temp ); + values.push_back( old ); + Helpers::upsert( _config.finalLong , _config.reducer->finalReduce( values , _config.finalizer.get() ) ); + } + else { + Helpers::upsert( _config.finalLong , temp ); + } + getDur().commitIfNeeded(); + } + _db.dropCollection( _config.tempLong ); + } + + return _db.count( _config.finalLong ); + } + + /** + * Insert doc in collection + */ + void State::insert( const string& ns , BSONObj& o ) { + assert( _onDisk ); + + writelock l( ns ); + Client::Context ctx( ns ); + + theDataFileMgr.insertAndLog( ns.c_str() , o , false ); + } + + /** + * Insert doc into the inc collection + */ + void State::_insertToInc( BSONObj& o ) { + assert( _onDisk ); + theDataFileMgr.insertWithObjMod( _config.incLong.c_str() , o , true ); + getDur().commitIfNeeded(); + } + + State::State( const Config& c ) : _config( c ), _size(0), _numEmits(0) { + _temp.reset( new InMemory() ); + _onDisk = _config.outType != Config::INMEMORY; + } + + bool State::sourceExists() { + return _db.exists( _config.ns ); + } + + long long State::incomingDocuments() { + return _db.count( _config.ns , _config.filter , QueryOption_SlaveOk , (unsigned) _config.limit ); + } + + State::~State() { + if ( _onDisk ) { + try { + _db.dropCollection( _config.tempLong ); + _db.dropCollection( _config.incLong ); + } + catch ( std::exception& e ) { + error() << "couldn't cleanup after map reduce: " << e.what() << endl; + } + } + } + + /** + * Initialize the mapreduce operation, creating the inc collection + */ + void State::init() { + // setup js + _scope.reset(globalScriptEngine->getPooledScope( _config.dbname ).release() ); + _scope->localConnect( _config.dbname.c_str() ); + + if ( ! _config.scopeSetup.isEmpty() ) + _scope->init( &_config.scopeSetup ); + + _config.mapper->init( this ); + _config.reducer->init( this ); + if ( _config.finalizer ) + _config.finalizer->init( this ); + + _scope->injectNative( "emit" , fast_emit ); + + if ( _onDisk ) { + // clear temp collections + _db.dropCollection( _config.tempLong ); + _db.dropCollection( _config.incLong ); + + // create the inc collection and make sure we have index on "0" key + { + writelock l( _config.incLong ); + Client::Context ctx( _config.incLong ); + string err; + if ( ! userCreateNS( _config.incLong.c_str() , BSON( "autoIndexId" << 0 ) , err , false ) ) { + uasserted( 13631 , str::stream() << "userCreateNS failed for mr incLong ns: " << _config.incLong << " err: " << err ); + } + } + + BSONObj sortKey = BSON( "0" << 1 ); + _db.ensureIndex( _config.incLong , sortKey ); + + } + + } + + /** + * Applies last reduce and finalize on a list of tuples (key, val) + * Inserts single result {_id: key, value: val} into temp collection + */ + void State::finalReduce( BSONList& values ) { + if ( !_onDisk || values.size() == 0 ) + return; + + BSONObj res = _config.reducer->finalReduce( values , _config.finalizer.get() ); + insert( _config.tempLong , res ); + } + + /** + * Applies last reduce and finalize. + * After calling this method, the temp collection will be completed. + * If inline, the results will be in the in memory map + */ + void State::finalReduce( CurOp * op , ProgressMeterHolder& pm ) { + if ( ! _onDisk ) { + // all data has already been reduced, just finalize + if ( _config.finalizer ) { + long size = 0; + for ( InMemory::iterator i=_temp->begin(); i!=_temp->end(); ++i ) { + BSONObj key = i->first; + BSONList& all = i->second; + + assert( all.size() == 1 ); + + BSONObj res = _config.finalizer->finalize( all[0] ); + + all.clear(); + all.push_back( res ); + size += res.objsize(); + } + _size = size; + } + return; + } + + // use index on "0" to pull sorted data + assert( _temp->size() == 0 ); + BSONObj sortKey = BSON( "0" << 1 ); + { + bool foundIndex = false; + + auto_ptr<DBClientCursor> idx = _db.getIndexes( _config.incLong ); + while ( idx.get() && idx->more() ) { + BSONObj x = idx->next(); + if ( sortKey.woCompare( x["key"].embeddedObject() ) == 0 ) { + foundIndex = true; + break; + } + } + + assert( foundIndex ); + } + + readlock rl( _config.incLong.c_str() ); + Client::Context ctx( _config.incLong ); + + BSONObj prev; + BSONList all; + + assert( pm == op->setMessage( "m/r: (3/3) final reduce to collection" , _db.count( _config.incLong, BSONObj(), QueryOption_SlaveOk ) ) ); + + shared_ptr<Cursor> temp = bestGuessCursor( _config.incLong.c_str() , BSONObj() , sortKey ); + auto_ptr<ClientCursor> cursor( new ClientCursor( QueryOption_NoCursorTimeout , temp , _config.incLong.c_str() ) ); + + // iterate over all sorted objects + while ( cursor->ok() ) { + BSONObj o = cursor->current().getOwned(); + cursor->advance(); + + pm.hit(); + + if ( o.woSortOrder( prev , sortKey ) == 0 ) { + // object is same as previous, add to array + all.push_back( o ); + if ( pm->hits() % 1000 == 0 ) { + if ( ! cursor->yield() ) { + cursor.release(); + break; + } + killCurrentOp.checkForInterrupt(); + } + continue; + } + + ClientCursor::YieldLock yield (cursor.get()); + // reduce an finalize array + finalReduce( all ); + + all.clear(); + prev = o; + all.push_back( o ); + + if ( ! yield.stillOk() ) { + cursor.release(); + break; + } + + killCurrentOp.checkForInterrupt(); + } + + // we need to release here since we temp release below + cursor.release(); + + { + dbtempreleasecond tl; + if ( ! tl.unlocked() ) + log( LL_WARNING ) << "map/reduce can't temp release" << endl; + // reduce and finalize last array + finalReduce( all ); + } + + pm.finished(); + } + + /** + * Attempts to reduce objects in the memory map. + * A new memory map will be created to hold the results. + * If applicable, objects with unique key may be dumped to inc collection. + * Input and output objects are both {"0": key, "1": val} + */ + void State::reduceInMemory() { + + auto_ptr<InMemory> n( new InMemory() ); // for new data + long nSize = 0; + long dupCount = 0; + + for ( InMemory::iterator i=_temp->begin(); i!=_temp->end(); ++i ) { + BSONObj key = i->first; + BSONList& all = i->second; + + if ( all.size() == 1 ) { + // only 1 value for this key + if ( _onDisk ) { + // this key has low cardinality, so just write to collection + writelock l(_config.incLong); + Client::Context ctx(_config.incLong.c_str()); + _insertToInc( *(all.begin()) ); + } + else { + // add to new map + _add( n.get() , all[0] , nSize, dupCount ); + } + } + else if ( all.size() > 1 ) { + // several values, reduce and add to map + BSONObj res = _config.reducer->reduce( all ); + _add( n.get() , res , nSize, dupCount ); + } + } + + // swap maps + _temp.reset( n.release() ); + _size = nSize; + _dupCount = dupCount; + } + + /** + * Dumps the entire in memory map to the inc collection. + */ + void State::dumpToInc() { + if ( ! _onDisk ) + return; + + writelock l(_config.incLong); + Client::Context ctx(_config.incLong); + + for ( InMemory::iterator i=_temp->begin(); i!=_temp->end(); i++ ) { + BSONList& all = i->second; + if ( all.size() < 1 ) + continue; + + for ( BSONList::iterator j=all.begin(); j!=all.end(); j++ ) + _insertToInc( *j ); + } + _temp->clear(); + _size = 0; + + } + + /** + * Adds object to in memory map + */ + void State::emit( const BSONObj& a ) { + _numEmits++; + _add( _temp.get() , a , _size, _dupCount ); + } + + void State::_add( InMemory* im, const BSONObj& a , long& size, long& dupCount ) { + BSONList& all = (*im)[a]; + all.push_back( a ); + size += a.objsize() + 16; + if (all.size() > 1) + ++dupCount; + } + + /** + * this method checks the size of in memory map and potentially flushes to disk + */ + void State::checkSize() { + if ( _size < 1024 * 50 ) + return; + + // attempt to reduce in memory map, if we've seen duplicates + if ( _dupCount > 0) { + long before = _size; + reduceInMemory(); + log(1) << " mr: did reduceInMemory " << before << " -->> " << _size << endl; + } + + if ( ! _onDisk || _size < 1024 * 100 ) + return; + + dumpToInc(); + log(1) << " mr: dumping to db" << endl; + } + + boost::thread_specific_ptr<State*> _tl; + + /** + * emit that will be called by js function + */ + BSONObj fast_emit( const BSONObj& args ) { + uassert( 10077 , "fast_emit takes 2 args" , args.nFields() == 2 ); + uassert( 13069 , "an emit can't be more than half max bson size" , args.objsize() < ( BSONObjMaxUserSize / 2 ) ); + (*_tl)->emit( args ); + return BSONObj(); + } + + /** + * This class represents a map/reduce command executed on a single server + */ + class MapReduceCommand : public Command { + public: + MapReduceCommand() : Command("mapReduce", false, "mapreduce") {} + virtual bool slaveOk() const { return !replSet; } + virtual bool slaveOverrideOk() { return true; } + + virtual void help( stringstream &help ) const { + help << "Run a map/reduce operation on the server.\n"; + help << "Note this is used for aggregation, not querying, in MongoDB.\n"; + help << "http://www.mongodb.org/display/DOCS/MapReduce"; + } + virtual LockType locktype() const { return NONE; } + bool run(const string& dbname , BSONObj& cmd, string& errmsg, BSONObjBuilder& result, bool fromRepl ) { + Timer t; + Client::GodScope cg; + Client& client = cc(); + CurOp * op = client.curop(); + + Config config( dbname , cmd ); + + log(1) << "mr ns: " << config.ns << endl; + + bool shouldHaveData = false; + + long long num = 0; + long long inReduce = 0; + + BSONObjBuilder countsBuilder; + BSONObjBuilder timingBuilder; + State state( config ); + + if ( ! state.sourceExists() ) { + errmsg = "ns doesn't exist"; + return false; + } + + if (replSet && state.isOnDisk()) { + // this means that it will be doing a write operation, make sure we are on Master + // ideally this check should be in slaveOk(), but at that point config is not known + if (!isMaster(dbname.c_str())) { + errmsg = "not master"; + return false; + } + } + + try { + state.init(); + + { + State** s = new State*(); + s[0] = &state; + _tl.reset( s ); + } + + wassert( config.limit < 0x4000000 ); // see case on next line to 32 bit unsigned + ProgressMeterHolder pm( op->setMessage( "m/r: (1/3) emit phase" , state.incomingDocuments() ) ); + long long mapTime = 0; + { + readlock lock( config.ns ); + Client::Context ctx( config.ns ); + + ShardChunkManagerPtr chunkManager; + if ( shardingState.needShardChunkManager( config.ns ) ) { + chunkManager = shardingState.getShardChunkManager( config.ns ); + } + + // obtain cursor on data to apply mr to, sorted + shared_ptr<Cursor> temp = bestGuessCursor( config.ns.c_str(), config.filter, config.sort ); + auto_ptr<ClientCursor> cursor( new ClientCursor( QueryOption_NoCursorTimeout , temp , config.ns.c_str() ) ); + + Timer mt; + // go through each doc + while ( cursor->ok() ) { + // make sure we dont process duplicates in case data gets moved around during map + if ( cursor->currentIsDup() ) { + cursor->advance(); + continue; + } + + if ( ! cursor->currentMatches() ) { + cursor->advance(); + continue; + } + + BSONObj o = cursor->current(); + cursor->advance(); + + // check to see if this is a new object we don't own yet + // because of a chunk migration + if ( chunkManager && ! chunkManager->belongsToMe( o ) ) + continue; + + // do map + if ( config.verbose ) mt.reset(); + config.mapper->map( o ); + if ( config.verbose ) mapTime += mt.micros(); + + num++; + if ( num % 100 == 0 ) { + // try to yield lock regularly + ClientCursor::YieldLock yield (cursor.get()); + Timer t; + // check if map needs to be dumped to disk + state.checkSize(); + inReduce += t.micros(); + + if ( ! yield.stillOk() ) { + cursor.release(); + break; + } + + killCurrentOp.checkForInterrupt(); + } + pm.hit(); + + if ( config.limit && num >= config.limit ) + break; + } + } + pm.finished(); + + killCurrentOp.checkForInterrupt(); + // update counters + countsBuilder.appendNumber( "input" , num ); + countsBuilder.appendNumber( "emit" , state.numEmits() ); + if ( state.numEmits() ) + shouldHaveData = true; + + timingBuilder.append( "mapTime" , mapTime / 1000 ); + timingBuilder.append( "emitLoop" , t.millis() ); + + op->setMessage( "m/r: (2/3) final reduce in memory" ); + // do reduce in memory + // this will be the last reduce needed for inline mode + state.reduceInMemory(); + // if not inline: dump the in memory map to inc collection, all data is on disk + state.dumpToInc(); + state.prepTempCollection(); + // final reduce + state.finalReduce( op , pm ); + + _tl.reset(); + } + catch ( ... ) { + log() << "mr failed, removing collection" << endl; + throw; + } + + long long finalCount = state.postProcessCollection(); + state.appendResults( result ); + + timingBuilder.append( "total" , t.millis() ); + + if (!config.outDB.empty()) { + BSONObjBuilder loc; + if ( !config.outDB.empty()) + loc.append( "db" , config.outDB ); + if ( !config.finalShort.empty() ) + loc.append( "collection" , config.finalShort ); + result.append("result", loc.obj()); + } + else { + if ( !config.finalShort.empty() ) + result.append( "result" , config.finalShort ); + } + result.append( "timeMillis" , t.millis() ); + countsBuilder.appendNumber( "output" , finalCount ); + if ( config.verbose ) result.append( "timing" , timingBuilder.obj() ); + result.append( "counts" , countsBuilder.obj() ); + + if ( finalCount == 0 && shouldHaveData ) { + result.append( "cmd" , cmd ); + errmsg = "there were emits but no data!"; + return false; + } + + return true; + } + + } mapReduceCommand; + + /** + * This class represents a map/reduce command executed on the output server of a sharded env + */ + class MapReduceFinishCommand : public Command { + public: + MapReduceFinishCommand() : Command( "mapreduce.shardedfinish" ) {} + virtual bool slaveOk() const { return !replSet; } + virtual bool slaveOverrideOk() { return true; } + + virtual LockType locktype() const { return NONE; } + bool run(const string& dbname , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool) { + string shardedOutputCollection = cmdObj["shardedOutputCollection"].valuestrsafe(); + + Config config( dbname , cmdObj.firstElement().embeddedObjectUserCheck() ); + config.incLong = config.tempLong; + + set<ServerAndQuery> servers; + + BSONObjBuilder shardCounts; + map<string,long long> counts; + + BSONObj shards = cmdObj["shards"].embeddedObjectUserCheck(); + vector< auto_ptr<DBClientCursor> > shardCursors; + + { + // parse per shard results + BSONObjIterator i( shards ); + while ( i.more() ) { + BSONElement e = i.next(); + string shard = e.fieldName(); + + BSONObj res = e.embeddedObjectUserCheck(); + + uassert( 10078 , "something bad happened" , shardedOutputCollection == res["result"].valuestrsafe() ); + servers.insert( shard ); + shardCounts.appendAs( res["counts"] , shard ); + + BSONObjIterator j( res["counts"].embeddedObjectUserCheck() ); + while ( j.more() ) { + BSONElement temp = j.next(); + counts[temp.fieldName()] += temp.numberLong(); + } + + } + + } + + State state(config); + state.prepTempCollection(); + + { + // reduce from each stream + + BSONObj sortKey = BSON( "_id" << 1 ); + + ParallelSortClusteredCursor cursor( servers , dbname + "." + shardedOutputCollection , + Query().sort( sortKey ) ); + cursor.init(); + state.init(); + + BSONList values; + if (!config.outDB.empty()) { + BSONObjBuilder loc; + if ( !config.outDB.empty()) + loc.append( "db" , config.outDB ); + if ( !config.finalShort.empty() ) + loc.append( "collection" , config.finalShort ); + result.append("result", loc.obj()); + } + else { + if ( !config.finalShort.empty() ) + result.append( "result" , config.finalShort ); + } + + while ( cursor.more() ) { + BSONObj t = cursor.next().getOwned(); + + if ( values.size() == 0 ) { + values.push_back( t ); + continue; + } + + if ( t.woSortOrder( *(values.begin()) , sortKey ) == 0 ) { + values.push_back( t ); + continue; + } + + + state.emit( config.reducer->finalReduce( values , config.finalizer.get() ) ); + values.clear(); + values.push_back( t ); + } + + if ( values.size() ) + state.emit( config.reducer->finalReduce( values , config.finalizer.get() ) ); + } + + + state.dumpToInc(); + state.postProcessCollection(); + state.appendResults( result ); + + for ( set<ServerAndQuery>::iterator i=servers.begin(); i!=servers.end(); i++ ) { + ScopedDbConnection conn( i->_server ); + conn->dropCollection( dbname + "." + shardedOutputCollection ); + conn.done(); + } + + result.append( "shardCounts" , shardCounts.obj() ); + + { + BSONObjBuilder c; + for ( map<string,long long>::iterator i=counts.begin(); i!=counts.end(); i++ ) { + c.append( i->first , i->second ); + } + result.append( "counts" , c.obj() ); + } + + return 1; + } + } mapReduceFinishCommand; + + } + +} + diff --git a/db/commands/mr.h b/db/commands/mr.h new file mode 100644 index 0000000..f505a45 --- /dev/null +++ b/db/commands/mr.h @@ -0,0 +1,291 @@ +// mr.h + +/** + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#pragma once + +#include "pch.h" + +namespace mongo { + + namespace mr { + + typedef vector<BSONObj> BSONList; + + class State; + + // ------------ function interfaces ----------- + + class Mapper : boost::noncopyable { + public: + virtual ~Mapper() {} + virtual void init( State * state ) = 0; + + virtual void map( const BSONObj& o ) = 0; + }; + + class Finalizer : boost::noncopyable { + public: + virtual ~Finalizer() {} + virtual void init( State * state ) = 0; + + /** + * this takes a tuple and returns a tuple + */ + virtual BSONObj finalize( const BSONObj& tuple ) = 0; + }; + + class Reducer : boost::noncopyable { + public: + virtual ~Reducer() {} + virtual void init( State * state ) = 0; + + virtual BSONObj reduce( const BSONList& tuples ) = 0; + /** this means its a final reduce, even if there is no finalizer */ + virtual BSONObj finalReduce( const BSONList& tuples , Finalizer * finalizer ) = 0; + }; + + // ------------ js function implementations ----------- + + /** + * used as a holder for Scope and ScriptingFunction + * visitor like pattern as Scope is gotten from first access + */ + class JSFunction : boost::noncopyable { + public: + /** + * @param type (map|reduce|finalize) + */ + JSFunction( string type , const BSONElement& e ); + virtual ~JSFunction() {} + + virtual void init( State * state ); + + Scope * scope() const { return _scope; } + ScriptingFunction func() const { return _func; } + + private: + string _type; + string _code; // actual javascript code + BSONObj _wantedScope; // this is for CodeWScope + + Scope * _scope; // this is not owned by us, and might be shared + ScriptingFunction _func; + }; + + class JSMapper : public Mapper { + public: + JSMapper( const BSONElement & code ) : _func( "map" , code ) {} + virtual void map( const BSONObj& o ); + virtual void init( State * state ); + + private: + JSFunction _func; + BSONObj _params; + }; + + class JSReducer : public Reducer { + public: + JSReducer( const BSONElement& code ) : _func( "reduce" , code ) {} + virtual void init( State * state ) { _func.init( state ); } + + virtual BSONObj reduce( const BSONList& tuples ); + virtual BSONObj finalReduce( const BSONList& tuples , Finalizer * finalizer ); + + private: + + /** + * result in "return" + * @param key OUT + * @param endSizeEstimate OUT + */ + void _reduce( const BSONList& values , BSONObj& key , int& endSizeEstimate ); + + JSFunction _func; + + }; + + class JSFinalizer : public Finalizer { + public: + JSFinalizer( const BSONElement& code ) : _func( "finalize" , code ) {} + virtual BSONObj finalize( const BSONObj& o ); + virtual void init( State * state ) { _func.init( state ); } + private: + JSFunction _func; + + }; + + // ----------------- + + + class TupleKeyCmp { + public: + TupleKeyCmp() {} + bool operator()( const BSONObj &l, const BSONObj &r ) const { + return l.firstElement().woCompare( r.firstElement() ) < 0; + } + }; + + typedef map< BSONObj,BSONList,TupleKeyCmp > InMemory; // from key to list of tuples + + /** + * holds map/reduce config information + */ + class Config { + public: + Config( const string& _dbname , const BSONObj& cmdObj ); + + string dbname; + string ns; + + // options + bool verbose; + + // query options + + BSONObj filter; + BSONObj sort; + long long limit; + + // functions + + scoped_ptr<Mapper> mapper; + scoped_ptr<Reducer> reducer; + scoped_ptr<Finalizer> finalizer; + + BSONObj mapParams; + BSONObj scopeSetup; + + // output tables + string incLong; + string tempLong; + + string finalShort; + string finalLong; + + string outDB; + + enum { REPLACE , // atomically replace the collection + MERGE , // merge keys, override dups + REDUCE , // merge keys, reduce dups + INMEMORY // only store in memory, limited in size + } outType; + + static AtomicUInt JOB_NUMBER; + }; // end MRsetup + + /** + * stores information about intermediate map reduce state + * controls flow of data from map->reduce->finalize->output + */ + class State { + public: + State( const Config& c ); + ~State(); + + void init(); + + // ---- prep ----- + bool sourceExists(); + + long long incomingDocuments(); + + // ---- map stage ---- + + /** + * stages on in in-memory storage + */ + void emit( const BSONObj& a ); + + /** + * if size is big, run a reduce + * if its still big, dump to temp collection + */ + void checkSize(); + + /** + * run reduce on _temp + */ + void reduceInMemory(); + + /** + * transfers in memory storage to temp collection + */ + void dumpToInc(); + + // ------ reduce stage ----------- + + void prepTempCollection(); + + void finalReduce( BSONList& values ); + + void finalReduce( CurOp * op , ProgressMeterHolder& pm ); + + // ------- cleanup/data positioning ---------- + + /** + @return number objects in collection + */ + long long postProcessCollection(); + + /** + * if INMEMORY will append + * may also append stats or anything else it likes + */ + void appendResults( BSONObjBuilder& b ); + + // -------- util ------------ + + /** + * inserts with correct replication semantics + */ + void insert( const string& ns , BSONObj& o ); + + // ------ simple accessors ----- + + /** State maintains ownership, do no use past State lifetime */ + Scope* scope() { return _scope.get(); } + + const Config& config() { return _config; } + + const bool isOnDisk() { return _onDisk; } + + long long numEmits() const { return _numEmits; } + + protected: + + void _insertToInc( BSONObj& o ); + static void _add( InMemory* im , const BSONObj& a , long& size, long& dupCount ); + + scoped_ptr<Scope> _scope; + const Config& _config; + bool _onDisk; // if the end result of this map reduce is disk or not + + DBDirectClient _db; + + scoped_ptr<InMemory> _temp; + long _size; // bytes in _temp + long _dupCount; // number of duplicate key entries + + long long _numEmits; + }; + + BSONObj fast_emit( const BSONObj& args ); + + } // end mr namespace +} + + |