// mr.h /** * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ #pragma once #include "pch.h" namespace mongo { namespace mr { typedef vector BSONList; class State; // ------------ function interfaces ----------- class Mapper : boost::noncopyable { public: virtual ~Mapper() {} virtual void init( State * state ) = 0; virtual void map( const BSONObj& o ) = 0; }; class Finalizer : boost::noncopyable { public: virtual ~Finalizer() {} virtual void init( State * state ) = 0; /** * this takes a tuple and returns a tuple */ virtual BSONObj finalize( const BSONObj& tuple ) = 0; }; class Reducer : boost::noncopyable { public: Reducer() : numReduces(0) {} virtual ~Reducer() {} virtual void init( State * state ) = 0; virtual BSONObj reduce( const BSONList& tuples ) = 0; /** this means its a final reduce, even if there is no finalizer */ virtual BSONObj finalReduce( const BSONList& tuples , Finalizer * finalizer ) = 0; long long numReduces; }; // ------------ js function implementations ----------- /** * used as a holder for Scope and ScriptingFunction * visitor like pattern as Scope is gotten from first access */ class JSFunction : boost::noncopyable { public: /** * @param type (map|reduce|finalize) */ JSFunction( string type , const BSONElement& e ); virtual ~JSFunction() {} virtual void init( State * state ); Scope * scope() const { return _scope; } ScriptingFunction func() const { return _func; } private: string _type; string _code; // actual javascript code BSONObj _wantedScope; // this is for CodeWScope Scope * _scope; // this is not owned by us, and might be shared ScriptingFunction _func; }; class JSMapper : public Mapper { public: JSMapper( const BSONElement & code ) : _func( "_map" , code ) {} virtual void map( const BSONObj& o ); virtual void init( State * state ); private: JSFunction _func; BSONObj _params; }; class JSReducer : public Reducer { public: JSReducer( const BSONElement& code ) : _func( "_reduce" , code ) {} virtual void init( State * state ); virtual BSONObj reduce( const BSONList& tuples ); virtual BSONObj finalReduce( const BSONList& tuples , Finalizer * finalizer ); private: /** * result in "return" * @param key OUT * @param endSizeEstimate OUT */ void _reduce( const BSONList& values , BSONObj& key , int& endSizeEstimate ); JSFunction _func; }; class JSFinalizer : public Finalizer { public: JSFinalizer( const BSONElement& code ) : _func( "_finalize" , code ) {} virtual BSONObj finalize( const BSONObj& o ); virtual void init( State * state ) { _func.init( state ); } private: JSFunction _func; }; // ----------------- class TupleKeyCmp { public: TupleKeyCmp() {} bool operator()( const BSONObj &l, const BSONObj &r ) const { return l.firstElement().woCompare( r.firstElement() ) < 0; } }; typedef map< BSONObj,BSONList,TupleKeyCmp > InMemory; // from key to list of tuples /** * holds map/reduce config information */ class Config { public: Config( const string& _dbname , const BSONObj& cmdObj ); string dbname; string ns; // options bool verbose; bool jsMode; // query options BSONObj filter; BSONObj sort; long long limit; // functions scoped_ptr mapper; scoped_ptr reducer; scoped_ptr finalizer; BSONObj mapParams; BSONObj scopeSetup; // output tables string incLong; string tempLong; string finalShort; string finalLong; string outDB; // max number of keys allowed in JS map before switching mode long jsMaxKeys; // ratio of duplicates vs unique keys before reduce is triggered in js mode float reduceTriggerRatio; // maximum size of map before it gets dumped to disk long maxInMemSize; enum { REPLACE , // atomically replace the collection MERGE , // merge keys, override dups REDUCE , // merge keys, reduce dups INMEMORY // only store in memory, limited in size } outType; static AtomicUInt JOB_NUMBER; }; // end MRsetup /** * stores information about intermediate map reduce state * controls flow of data from map->reduce->finalize->output */ class State { public: State( const Config& c ); ~State(); void init(); // ---- prep ----- bool sourceExists(); long long incomingDocuments(); // ---- map stage ---- /** * stages on in in-memory storage */ void emit( const BSONObj& a ); /** * if size is big, run a reduce * if its still big, dump to temp collection */ void checkSize(); /** * run reduce on _temp */ void reduceInMemory(); /** * transfers in memory storage to temp collection */ void dumpToInc(); void insertToInc( BSONObj& o ); void _insertToInc( BSONObj& o ); // ------ reduce stage ----------- void prepTempCollection(); void finalReduce( BSONList& values ); void finalReduce( CurOp * op , ProgressMeterHolder& pm ); // ------- cleanup/data positioning ---------- /** @return number objects in collection */ long long postProcessCollection(); /** * if INMEMORY will append * may also append stats or anything else it likes */ void appendResults( BSONObjBuilder& b ); // -------- util ------------ /** * inserts with correct replication semantics */ void insert( const string& ns , const BSONObj& o ); // ------ simple accessors ----- /** State maintains ownership, do no use past State lifetime */ Scope* scope() { return _scope.get(); } const Config& config() { return _config; } const bool isOnDisk() { return _onDisk; } long long numEmits() const { if (_jsMode) return _scope->getNumberLongLong("_emitCt"); return _numEmits; } long long numReduces() const { if (_jsMode) return _scope->getNumberLongLong("_redCt"); return _config.reducer->numReduces; } bool jsMode() {return _jsMode;} void switchMode(bool jsMode); void bailFromJS(); const Config& _config; DBDirectClient _db; protected: void _add( InMemory* im , const BSONObj& a , long& size ); scoped_ptr _scope; bool _onDisk; // if the end result of this map reduce is disk or not scoped_ptr _temp; long _size; // bytes in _temp long _dupCount; // number of duplicate key entries long long _numEmits; bool _jsMode; ScriptingFunction _reduceAll; ScriptingFunction _reduceAndEmit; ScriptingFunction _reduceAndFinalize; ScriptingFunction _reduceAndFinalizeAndInsert; }; BSONObj fast_emit( const BSONObj& args, void* data ); BSONObj _bailFromJS( const BSONObj& args, void* data ); } // end mr namespace }