diff options
Diffstat (limited to 's/commands_public.cpp')
-rw-r--r-- | s/commands_public.cpp | 559 |
1 files changed, 463 insertions, 96 deletions
diff --git a/s/commands_public.cpp b/s/commands_public.cpp index f29205b..ef7110c 100644 --- a/s/commands_public.cpp +++ b/s/commands_public.cpp @@ -18,20 +18,28 @@ */ #include "pch.h" -#include "../util/message.h" +#include "../util/net/message.h" #include "../db/dbmessage.h" #include "../client/connpool.h" #include "../client/parallel.h" #include "../db/commands.h" -#include "../db/query.h" +#include "../db/queryutil.h" +#include "../scripting/engine.h" #include "config.h" #include "chunk.h" #include "strategy.h" #include "grid.h" +#include "mr_shard.h" +#include "client.h" namespace mongo { + bool setParmsMongodSpecific(const string& dbname, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl ) + { + return true; + } + namespace dbgrid_pub_cmds { class PublicGridCommand : public Command { @@ -45,22 +53,38 @@ namespace mongo { return false; } + // Override if passthrough should also send query options + // Safer as off by default, can slowly enable as we add more tests + virtual bool passOptions() const { return false; } + // all grid commands are designed not to lock virtual LockType locktype() const { return NONE; } protected: + bool passthrough( DBConfigPtr conf, const BSONObj& cmdObj , BSONObjBuilder& result ) { - return _passthrough(conf->getName(), conf, cmdObj, result); + return _passthrough(conf->getName(), conf, cmdObj, 0, result); } bool adminPassthrough( DBConfigPtr conf, const BSONObj& cmdObj , BSONObjBuilder& result ) { - return _passthrough("admin", conf, cmdObj, result); + return _passthrough("admin", conf, cmdObj, 0, result); + } + + bool passthrough( DBConfigPtr conf, const BSONObj& cmdObj , int options, BSONObjBuilder& result ) { + return _passthrough(conf->getName(), conf, cmdObj, options, result); + } + bool adminPassthrough( DBConfigPtr conf, const BSONObj& cmdObj , int options, BSONObjBuilder& result ) { + return _passthrough("admin", conf, cmdObj, options, result); } private: - bool _passthrough(const string& db, DBConfigPtr conf, const BSONObj& cmdObj , BSONObjBuilder& result ) { + bool _passthrough(const string& db, DBConfigPtr conf, const BSONObj& cmdObj , int options , BSONObjBuilder& result ) { ShardConnection conn( conf->getPrimary() , "" ); BSONObj res; - bool ok = conn->runCommand( db , cmdObj , res ); + bool ok = conn->runCommand( db , cmdObj , res , passOptions() ? options : 0 ); + if ( ! ok && res["code"].numberInt() == StaleConfigInContextCode ) { + conn.done(); + throw StaleConfigException("foo","command failed because of stale config"); + } result.appendElements( res ); conn.done(); return ok; @@ -87,13 +111,14 @@ namespace mongo { virtual void aggregateResults(const vector<BSONObj>& results, BSONObjBuilder& output) {} // don't override - virtual bool run(const string& dbName , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& output, bool) { + virtual bool run(const string& dbName , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& output, bool) { + LOG(1) << "RunOnAllShardsCommand db: " << dbName << " cmd:" << cmdObj << endl; set<Shard> shards; getShards(dbName, cmdObj, shards); list< shared_ptr<Future::CommandResult> > futures; for ( set<Shard>::const_iterator i=shards.begin(), end=shards.end() ; i != end ; i++ ) { - futures.push_back( Future::spawnCommand( i->getConnString() , dbName , cmdObj ) ); + futures.push_back( Future::spawnCommand( i->getConnString() , dbName , cmdObj, 0 ) ); } vector<BSONObj> results; @@ -147,13 +172,13 @@ namespace mongo { virtual string getFullNS( const string& dbName , const BSONObj& cmdObj ) = 0; - virtual bool run(const string& dbName , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool) { + virtual bool run(const string& dbName , BSONObj& cmdObj, int options, string& errmsg, BSONObjBuilder& result, bool) { string fullns = getFullNS( dbName , cmdObj ); DBConfigPtr conf = grid.getDBConfig( dbName , false ); if ( ! conf || ! conf->isShardingEnabled() || ! conf->isSharded( fullns ) ) { - return passthrough( conf , cmdObj , result ); + return passthrough( conf , cmdObj , options, result ); } errmsg = "can't do command: " + name + " on sharded collection"; return false; @@ -172,9 +197,41 @@ namespace mongo { ReIndexCmd() : AllShardsCollectionCommand("reIndex") {} } reIndexCmd; + class ProfileCmd : public PublicGridCommand { + public: + ProfileCmd() : PublicGridCommand("profile") {} + virtual bool run(const string& dbName , BSONObj& cmdObj, int options, string& errmsg, BSONObjBuilder& result, bool) { + errmsg = "profile currently not supported via mongos"; + return false; + } + } profileCmd; + + class ValidateCmd : public AllShardsCollectionCommand { public: ValidateCmd() : AllShardsCollectionCommand("validate") {} + virtual void aggregateResults(const vector<BSONObj>& results, BSONObjBuilder& output) { + for (vector<BSONObj>::const_iterator it(results.begin()), end(results.end()); it!=end; it++){ + const BSONObj& result = *it; + const BSONElement valid = result["valid"]; + if (!valid.eoo()){ + if (!valid.trueValue()) { + output.appendBool("valid", false); + return; + } + } + else { + // Support pre-1.9.0 output with everything in a big string + const char* s = result["result"].valuestrsafe(); + if (strstr(s, "exception") || strstr(s, "corrupt")){ + output.appendBool("valid", false); + return; + } + } + } + + output.appendBool("valid", true); + } } validateCmd; class RepairDatabaseCmd : public RunOnAllShardsCommand { @@ -221,7 +278,7 @@ namespace mongo { class DropCmd : public PublicGridCommand { public: DropCmd() : PublicGridCommand( "drop" ) {} - bool run(const string& dbName , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool) { + bool run(const string& dbName , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool) { string collection = cmdObj.firstElement().valuestrsafe(); string fullns = dbName + "." + collection; @@ -246,7 +303,7 @@ namespace mongo { class DropDBCmd : public PublicGridCommand { public: DropDBCmd() : PublicGridCommand( "dropDatabase" ) {} - bool run(const string& dbName , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool) { + bool run(const string& dbName , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool) { BSONElement e = cmdObj.firstElement(); @@ -275,7 +332,7 @@ namespace mongo { class RenameCollectionCmd : public PublicGridCommand { public: RenameCollectionCmd() : PublicGridCommand( "renameCollection" ) {} - bool run(const string& dbName, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool) { + bool run(const string& dbName, BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool) { string fullnsFrom = cmdObj.firstElement().valuestrsafe(); string dbNameFrom = nsToDatabase( fullnsFrom.c_str() ); DBConfigPtr confFrom = grid.getDBConfig( dbNameFrom , false ); @@ -300,7 +357,7 @@ namespace mongo { class CopyDBCmd : public PublicGridCommand { public: CopyDBCmd() : PublicGridCommand( "copydb" ) {} - bool run(const string& dbName, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool) { + bool run(const string& dbName, BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool) { string todb = cmdObj.getStringField("todb"); uassert(13402, "need a todb argument", !todb.empty()); @@ -336,7 +393,8 @@ namespace mongo { class CountCmd : public PublicGridCommand { public: CountCmd() : PublicGridCommand("count") { } - bool run(const string& dbName, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool l) { + virtual bool passOptions() const { return true; } + bool run(const string& dbName, BSONObj& cmdObj, int options, string& errmsg, BSONObjBuilder& result, bool) { string collection = cmdObj.firstElement().valuestrsafe(); string fullns = dbName + "." + collection; @@ -345,12 +403,11 @@ namespace mongo { filter = cmdObj["query"].Obj(); DBConfigPtr conf = grid.getDBConfig( dbName , false ); - if ( ! conf || ! conf->isShardingEnabled() || ! conf->isSharded( fullns ) ) { ShardConnection conn( conf->getPrimary() , fullns ); BSONObj temp; - bool ok = conn->runCommand( dbName , cmdObj , temp ); + bool ok = conn->runCommand( dbName , cmdObj , temp, options ); conn.done(); if ( ok ) { @@ -365,7 +422,7 @@ namespace mongo { } // this collection got sharded - ChunkManagerPtr cm = conf->getChunkManager( fullns , true ); + ChunkManagerPtr cm = conf->getChunkManagerIfExists( fullns , true ); if ( ! cm ) { errmsg = "should be sharded now"; result.append( "root" , temp ); @@ -376,11 +433,11 @@ namespace mongo { long long total = 0; map<string,long long> shardCounts; - ChunkManagerPtr cm = conf->getChunkManager( fullns ); + ChunkManagerPtr cm = conf->getChunkManagerIfExists( fullns ); while ( true ) { if ( ! cm ) { // probably unsharded now - return run( dbName , cmdObj , errmsg , result , l ); + return run( dbName , cmdObj , options , errmsg , result, false ); } set<Shard> shards; @@ -394,14 +451,14 @@ namespace mongo { if ( conn.setVersion() ) { total = 0; shardCounts.clear(); - cm = conf->getChunkManager( fullns ); + cm = conf->getChunkManagerIfExists( fullns ); conn.done(); hadToBreak = true; break; } BSONObj temp; - bool ok = conn->runCommand( dbName , BSON( "count" << collection << "query" << filter ) , temp ); + bool ok = conn->runCommand( dbName , BSON( "count" << collection << "query" << filter ) , temp, options ); conn.done(); if ( ok ) { @@ -415,7 +472,7 @@ namespace mongo { // my version is old total = 0; shardCounts.clear(); - cm = conf->getChunkManager( fullns , true ); + cm = conf->getChunkManagerIfExists( fullns , true ); hadToBreak = true; break; } @@ -442,14 +499,13 @@ namespace mongo { class CollectionStats : public PublicGridCommand { public: CollectionStats() : PublicGridCommand("collStats", "collstats") { } - bool run(const string& dbName , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool) { + bool run(const string& dbName , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool) { string collection = cmdObj.firstElement().valuestrsafe(); string fullns = dbName + "." + collection; DBConfigPtr conf = grid.getDBConfig( dbName , false ); if ( ! conf || ! conf->isShardingEnabled() || ! conf->isSharded( fullns ) ) { - result.append( "ns" , fullns ); result.appendBool("sharded", false); result.append( "primary" , conf->getPrimary().getName() ); return passthrough( conf , cmdObj , result); @@ -463,9 +519,13 @@ namespace mongo { cm->getAllShards(servers); BSONObjBuilder shardStats; + map<string,long long> counts; + map<string,long long> indexSizes; + /* long long count=0; long long size=0; long long storageSize=0; + */ int nindexes=0; bool warnedAboutIndexes = false; for ( set<Shard>::iterator i=servers.begin(); i!=servers.end(); i++ ) { @@ -476,39 +536,82 @@ namespace mongo { return false; } conn.done(); - - count += res["count"].numberLong(); - size += res["size"].numberLong(); - storageSize += res["storageSize"].numberLong(); - - int myIndexes = res["nindexes"].numberInt(); - - if ( nindexes == 0 ) { - nindexes = myIndexes; - } - else if ( nindexes == myIndexes ) { - // no-op - } - else { - // hopefully this means we're building an index - - if ( myIndexes > nindexes ) - nindexes = myIndexes; - - if ( ! warnedAboutIndexes ) { - result.append( "warning" , "indexes don't all match - ok if ensureIndex is running" ); - warnedAboutIndexes = true; + + BSONObjIterator j( res ); + while ( j.more() ) { + BSONElement e = j.next(); + + if ( str::equals( e.fieldName() , "ns" ) || + str::equals( e.fieldName() , "ok" ) || + str::equals( e.fieldName() , "avgObjSize" ) || + str::equals( e.fieldName() , "lastExtentSize" ) || + str::equals( e.fieldName() , "paddingFactor" ) ) { + continue; + } + else if ( str::equals( e.fieldName() , "count" ) || + str::equals( e.fieldName() , "size" ) || + str::equals( e.fieldName() , "storageSize" ) || + str::equals( e.fieldName() , "numExtents" ) || + str::equals( e.fieldName() , "totalIndexSize" ) ) { + counts[e.fieldName()] += e.numberLong(); + } + else if ( str::equals( e.fieldName() , "indexSizes" ) ) { + BSONObjIterator k( e.Obj() ); + while ( k.more() ) { + BSONElement temp = k.next(); + indexSizes[temp.fieldName()] += temp.numberLong(); + } + } + else if ( str::equals( e.fieldName() , "flags" ) ) { + if ( ! result.hasField( e.fieldName() ) ) + result.append( e ); } + else if ( str::equals( e.fieldName() , "nindexes" ) ) { + int myIndexes = e.numberInt(); + + if ( nindexes == 0 ) { + nindexes = myIndexes; + } + else if ( nindexes == myIndexes ) { + // no-op + } + else { + // hopefully this means we're building an index + + if ( myIndexes > nindexes ) + nindexes = myIndexes; + + if ( ! warnedAboutIndexes ) { + result.append( "warning" , "indexes don't all match - ok if ensureIndex is running" ); + warnedAboutIndexes = true; + } + } + } + else { + warning() << "mongos collstats doesn't know about: " << e.fieldName() << endl; + } + } - shardStats.append(i->getName(), res); } result.append("ns", fullns); - result.appendNumber("count", count); - result.appendNumber("size", size); - result.append ("avgObjSize", double(size) / double(count)); - result.appendNumber("storageSize", storageSize); + + for ( map<string,long long>::iterator i=counts.begin(); i!=counts.end(); ++i ) + result.appendNumber( i->first , i->second ); + + { + BSONObjBuilder ib( result.subobjStart( "indexSizes" ) ); + for ( map<string,long long>::iterator i=indexSizes.begin(); i!=indexSizes.end(); ++i ) + ib.appendNumber( i->first , i->second ); + ib.done(); + } + + if ( counts["count"] > 0 ) + result.append("avgObjSize", (double)counts["size"] / (double)counts["count"] ); + else + result.append( "avgObjSize", 0.0 ); + result.append("nindexes", nindexes); result.append("nchunks", cm->numChunks()); @@ -521,7 +624,7 @@ namespace mongo { class FindAndModifyCmd : public PublicGridCommand { public: FindAndModifyCmd() : PublicGridCommand("findAndModify", "findandmodify") { } - bool run(const string& dbName, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool) { + bool run(const string& dbName, BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool) { string collection = cmdObj.firstElement().valuestrsafe(); string fullns = dbName + "." + collection; @@ -532,7 +635,7 @@ namespace mongo { } ChunkManagerPtr cm = conf->getChunkManager( fullns ); - massert( 13002 , "how could chunk manager be null!" , cm ); + massert( 13002 , "shard internal error chunk manager should never be null" , cm ); BSONObj filter = cmdObj.getObjectField("query"); uassert(13343, "query for sharded findAndModify must have shardkey", cm->hasShardKey(filter)); @@ -558,7 +661,7 @@ namespace mongo { class DataSizeCmd : public PublicGridCommand { public: DataSizeCmd() : PublicGridCommand("dataSize", "datasize") { } - bool run(const string& dbName, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool) { + bool run(const string& dbName, BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool) { string fullns = cmdObj.firstElement().String(); DBConfigPtr conf = grid.getDBConfig( dbName , false ); @@ -622,7 +725,7 @@ namespace mongo { class GroupCmd : public NotAllowedOnShardedCollectionCmd { public: GroupCmd() : NotAllowedOnShardedCollectionCmd("group") {} - + virtual bool passOptions() const { return true; } virtual string getFullNS( const string& dbName , const BSONObj& cmdObj ) { return dbName + "." + cmdObj.firstElement().embeddedObjectUserCheck()["ns"].valuestrsafe(); } @@ -635,14 +738,15 @@ namespace mongo { virtual void help( stringstream &help ) const { help << "{ distinct : 'collection name' , key : 'a.b' , query : {} }"; } - bool run(const string& dbName , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool) { + virtual bool passOptions() const { return true; } + bool run(const string& dbName , BSONObj& cmdObj, int options, string& errmsg, BSONObjBuilder& result, bool) { string collection = cmdObj.firstElement().valuestrsafe(); string fullns = dbName + "." + collection; DBConfigPtr conf = grid.getDBConfig( dbName , false ); if ( ! conf || ! conf->isShardingEnabled() || ! conf->isSharded( fullns ) ) { - return passthrough( conf , cmdObj , result ); + return passthrough( conf , cmdObj , options, result ); } ChunkManagerPtr cm = conf->getChunkManager( fullns ); @@ -658,7 +762,7 @@ namespace mongo { for ( set<Shard>::iterator i=shards.begin(), end=shards.end() ; i != end; ++i ) { ShardConnection conn( *i , fullns ); BSONObj res; - bool ok = conn->runCommand( conf->getName() , cmdObj , res ); + bool ok = conn->runCommand( conf->getName() , cmdObj , res, options ); conn.done(); if ( ! ok ) { @@ -693,7 +797,7 @@ namespace mongo { virtual void help( stringstream &help ) const { help << " example: { filemd5 : ObjectId(aaaaaaa) , root : \"fs\" }"; } - bool run(const string& dbName , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool) { + bool run(const string& dbName , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool) { string fullns = dbName; fullns += "."; { @@ -730,15 +834,15 @@ namespace mongo { public: Geo2dFindNearCmd() : PublicGridCommand( "geoNear" ) {} void help(stringstream& h) const { h << "http://www.mongodb.org/display/DOCS/Geospatial+Indexing#GeospatialIndexing-geoNearCommand"; } - - bool run(const string& dbName , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool) { + virtual bool passOptions() const { return true; } + bool run(const string& dbName , BSONObj& cmdObj, int options, string& errmsg, BSONObjBuilder& result, bool) { string collection = cmdObj.firstElement().valuestrsafe(); string fullns = dbName + "." + collection; DBConfigPtr conf = grid.getDBConfig( dbName , false ); if ( ! conf || ! conf->isShardingEnabled() || ! conf->isSharded( fullns ) ) { - return passthrough( conf , cmdObj , result ); + return passthrough( conf , cmdObj , options, result ); } ChunkManagerPtr cm = conf->getChunkManager( fullns ); @@ -755,7 +859,7 @@ namespace mongo { list< shared_ptr<Future::CommandResult> > futures; BSONArrayBuilder shardArray; for ( set<Shard>::const_iterator i=shards.begin(), end=shards.end() ; i != end ; i++ ) { - futures.push_back( Future::spawnCommand( i->getConnString() , dbName , cmdObj ) ); + futures.push_back( Future::spawnCommand( i->getConnString() , dbName , cmdObj, options ) ); shardArray.append(i->getName()); } @@ -820,12 +924,13 @@ namespace mongo { class MRCmd : public PublicGridCommand { public: + AtomicUInt JOB_NUMBER; + MRCmd() : PublicGridCommand( "mapreduce" ) {} string getTmpName( const string& coll ) { - static int inc = 1; stringstream ss; - ss << "tmp.mrs." << coll << "_" << time(0) << "_" << inc++; + ss << "tmp.mrs." << coll << "_" << time(0) << "_" << JOB_NUMBER++; return ss.str(); } @@ -851,8 +956,8 @@ namespace mongo { if (fn == "out" && e.type() == Object) { // check if there is a custom output BSONObj out = e.embeddedObject(); - if (out.hasField("db")) - customOut = out; +// if (out.hasField("db")) + customOut = out; } } else { @@ -864,7 +969,7 @@ namespace mongo { return b.obj(); } - bool run(const string& dbName , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool) { + bool run(const string& dbName , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool) { Timer t; string collection = cmdObj.firstElement().valuestrsafe(); @@ -876,7 +981,7 @@ namespace mongo { BSONObj customOut; BSONObj shardedCommand = fixForShards( cmdObj , shardedOutputCollection, customOut , badShardedField ); - bool customOutDB = ! customOut.isEmpty() && customOut.hasField( "db" ); + bool customOutDB = customOut.hasField( "db" ); DBConfigPtr conf = grid.getDBConfig( dbName , false ); @@ -911,26 +1016,32 @@ namespace mongo { finalCmd.append( "shardedOutputCollection" , shardedOutputCollection ); + set<ServerAndQuery> servers; + BSONObj shardCounts; + BSONObj aggCounts; + map<string,long long> countsMap; { // we need to use our connections to the shard // so filtering is done correctly for un-owned docs // so we allocate them in our thread // and hand off - + // Note: why not use pooled connections? This has been reported to create too many connections vector< shared_ptr<ShardConnection> > shardConns; - list< shared_ptr<Future::CommandResult> > futures; for ( set<Shard>::iterator i=shards.begin(), end=shards.end() ; i != end ; i++ ) { shared_ptr<ShardConnection> temp( new ShardConnection( i->getConnString() , fullns ) ); assert( temp->get() ); - futures.push_back( Future::spawnCommand( i->getConnString() , dbName , shardedCommand , temp->get() ) ); + futures.push_back( Future::spawnCommand( i->getConnString() , dbName , shardedCommand , 0 , temp->get() ) ); shardConns.push_back( temp ); } bool failed = false; - - BSONObjBuilder shardresults; + + // now wait for the result of all shards + BSONObjBuilder shardResultsB; + BSONObjBuilder shardCountsB; + BSONObjBuilder aggCountsB; for ( list< shared_ptr<Future::CommandResult> >::iterator i=futures.begin(); i!=futures.end(); i++ ) { shared_ptr<Future::CommandResult> res = *i; if ( ! res->join() ) { @@ -941,7 +1052,19 @@ namespace mongo { failed = true; continue; } - shardresults.append( res->getServer() , res->result() ); + BSONObj result = res->result(); + shardResultsB.append( res->getServer() , result ); + BSONObj counts = result["counts"].embeddedObjectUserCheck(); + shardCountsB.append( res->getServer() , counts ); + servers.insert(res->getServer()); + + // add up the counts for each shard + // some of them will be fixed later like output and reduce + BSONObjIterator j( counts ); + while ( j.more() ) { + BSONElement temp = j.next(); + countsMap[temp.fieldName()] += temp.numberLong(); + } } for ( unsigned i=0; i<shardConns.size(); i++ ) @@ -950,28 +1073,205 @@ namespace mongo { if ( failed ) return 0; - finalCmd.append( "shards" , shardresults.obj() ); + finalCmd.append( "shards" , shardResultsB.obj() ); + shardCounts = shardCountsB.obj(); + finalCmd.append( "shardCounts" , shardCounts ); timingBuilder.append( "shards" , t.millis() ); + + for ( map<string,long long>::iterator i=countsMap.begin(); i!=countsMap.end(); i++ ) { + aggCountsB.append( i->first , i->second ); + } + aggCounts = aggCountsB.obj(); + finalCmd.append( "counts" , aggCounts ); } Timer t2; - // by default the target database is same as input - Shard outServer = conf->getPrimary(); - string outns = fullns; - if ( customOutDB ) { - // have to figure out shard for the output DB - BSONElement elmt = customOut.getField("db"); - string outdb = elmt.valuestrsafe(); - outns = outdb + "." + collection; - DBConfigPtr conf2 = grid.getDBConfig( outdb , true ); - outServer = conf2->getPrimary(); - } - log() << "customOut: " << customOut << " outServer: " << outServer << endl; - - ShardConnection conn( outServer , outns ); BSONObj finalResult; - bool ok = conn->runCommand( dbName , finalCmd.obj() , finalResult ); - conn.done(); + bool ok = false; + string outdb = dbName; + if (customOutDB) { + BSONElement elmt = customOut.getField("db"); + outdb = elmt.valuestrsafe(); + } + + if (!customOut.getBoolField("sharded")) { + // non-sharded, use the MRFinish command on target server + // This will save some data transfer + + // by default the target database is same as input + Shard outServer = conf->getPrimary(); + string outns = fullns; + if ( customOutDB ) { + // have to figure out shard for the output DB + DBConfigPtr conf2 = grid.getDBConfig( outdb , true ); + outServer = conf2->getPrimary(); + outns = outdb + "." + collection; + } + log() << "customOut: " << customOut << " outServer: " << outServer << endl; + + ShardConnection conn( outServer , outns ); + ok = conn->runCommand( dbName , finalCmd.obj() , finalResult ); + conn.done(); + } else { + // grab records from each shard and insert back in correct shard in "temp" collection + // we do the final reduce in mongos since records are ordered and already reduced on each shard +// string shardedIncLong = str::stream() << outdb << ".tmp.mr." << collection << "_" << "shardedTemp" << "_" << time(0) << "_" << JOB_NUMBER++; + + mr_shard::Config config( dbName , cmdObj ); + mr_shard::State state(config); + LOG(1) << "mr sharded output ns: " << config.ns << endl; + + if (config.outType == mr_shard::Config::INMEMORY) { + errmsg = "This Map Reduce mode is not supported with sharded output"; + return false; + } + + if (!config.outDB.empty()) { + BSONObjBuilder loc; + if ( !config.outDB.empty()) + loc.append( "db" , config.outDB ); + loc.append( "collection" , config.finalShort ); + result.append("result", loc.obj()); + } + else { + if ( !config.finalShort.empty() ) + result.append( "result" , config.finalShort ); + } + + string outns = config.finalLong; + string tempns; + + // result will be inserted into a temp collection to post process + const string postProcessCollection = getTmpName( collection ); + finalCmd.append("postProcessCollection", postProcessCollection); + tempns = dbName + "." + postProcessCollection; + +// if (config.outType == mr_shard::Config::REPLACE) { +// // drop previous collection +// BSONObj dropColCmd = BSON("drop" << config.finalShort); +// BSONObjBuilder dropColResult(32); +// string outdbCmd = outdb + ".$cmd"; +// bool res = Command::runAgainstRegistered(outdbCmd.c_str(), dropColCmd, dropColResult); +// if (!res) { +// errmsg = str::stream() << "Could not drop sharded output collection " << outns << ": " << dropColResult.obj().toString(); +// return false; +// } +// } + + BSONObj sortKey = BSON( "_id" << 1 ); + if (!conf->isSharded(outns)) { + // create the sharded collection + + BSONObj shardColCmd = BSON("shardCollection" << outns << "key" << sortKey); + BSONObjBuilder shardColResult(32); + bool res = Command::runAgainstRegistered("admin.$cmd", shardColCmd, shardColResult); + if (!res) { + errmsg = str::stream() << "Could not create sharded output collection " << outns << ": " << shardColResult.obj().toString(); + return false; + } + } + + ParallelSortClusteredCursor cursor( servers , dbName + "." + shardedOutputCollection , + Query().sort( sortKey ) ); + cursor.init(); + state.init(); + + mr_shard::BSONList values; + Strategy* s = SHARDED; + long long finalCount = 0; + int currentSize = 0; + while ( cursor.more() || !values.empty() ) { + BSONObj t; + if ( cursor.more() ) { + t = cursor.next().getOwned(); + + if ( values.size() == 0 || t.woSortOrder( *(values.begin()) , sortKey ) == 0 ) { + values.push_back( t ); + currentSize += t.objsize(); + + // check size and potentially reduce + if (currentSize > config.maxInMemSize && values.size() > config.reduceTriggerRatio) { + BSONObj reduced = config.reducer->finalReduce(values, 0); + values.clear(); + values.push_back( reduced ); + currentSize = reduced.objsize(); + } + continue; + } + } + + BSONObj final = config.reducer->finalReduce(values, config.finalizer.get()); + if (config.outType == mr_shard::Config::MERGE) { + BSONObj id = final["_id"].wrap(); + s->updateSharded(conf, outns.c_str(), id, final, UpdateOption_Upsert, true); + } else { + // insert into temp collection, but using final collection's shard chunks + s->insertSharded(conf, tempns.c_str(), final, 0, true, outns.c_str()); + } + ++finalCount; + values.clear(); + if (!t.isEmpty()) { + values.push_back( t ); + currentSize = t.objsize(); + } + } + + if (config.outType == mr_shard::Config::REDUCE || config.outType == mr_shard::Config::REPLACE) { + // results were written to temp collection, need post processing + vector< shared_ptr<ShardConnection> > shardConns; + list< shared_ptr<Future::CommandResult> > futures; + BSONObj finalCmdObj = finalCmd.obj(); + for ( set<Shard>::iterator i=shards.begin(), end=shards.end() ; i != end ; i++ ) { + shared_ptr<ShardConnection> temp( new ShardConnection( i->getConnString() , outns ) ); + futures.push_back( Future::spawnCommand( i->getConnString() , dbName , finalCmdObj , 0 , temp->get() ) ); + shardConns.push_back( temp ); + } + + // now wait for the result of all shards + bool failed = false; + for ( list< shared_ptr<Future::CommandResult> >::iterator i=futures.begin(); i!=futures.end(); i++ ) { + shared_ptr<Future::CommandResult> res = *i; + if ( ! res->join() ) { + error() << "final reduce on sharded output m/r failed on shard: " << res->getServer() << " error: " << res->result() << endl; + result.append( "cause" , res->result() ); + errmsg = "mongod mr failed: "; + errmsg += res->result().toString(); + failed = true; + continue; + } + BSONObj result = res->result(); + } + + for ( unsigned i=0; i<shardConns.size(); i++ ) + shardConns[i]->done(); + + if (failed) + return 0; + } + + for ( set<ServerAndQuery>::iterator i=servers.begin(); i!=servers.end(); i++ ) { + ScopedDbConnection conn( i->_server ); + conn->dropCollection( dbName + "." + shardedOutputCollection ); + conn.done(); + } + + result.append("shardCounts", shardCounts); + + // fix the global counts + BSONObjBuilder countsB(32); + BSONObjIterator j(aggCounts); + while (j.more()) { + BSONElement elmt = j.next(); + if (!strcmp(elmt.fieldName(), "reduce")) + countsB.append("reduce", elmt.numberLong() + state.numReduces()); + else if (!strcmp(elmt.fieldName(), "output")) + countsB.append("output", finalCount); + else + countsB.append(elmt); + } + result.append( "counts" , countsB.obj() ); + ok = true; + } if ( ! ok ) { errmsg = "final reduce failed: "; @@ -991,14 +1291,81 @@ namespace mongo { class ApplyOpsCmd : public PublicGridCommand { public: ApplyOpsCmd() : PublicGridCommand( "applyOps" ) {} - - virtual bool run(const string& dbName , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool) { + virtual bool run(const string& dbName , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool) { errmsg = "applyOps not allowed through mongos"; return false; } - } applyOpsCmd; + class CompactCmd : public PublicGridCommand { + public: + CompactCmd() : PublicGridCommand( "compact" ) {} + virtual bool run(const string& dbName , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool) { + errmsg = "compact not allowed through mongos"; + return false; + } + } compactCmd; + } + bool Command::runAgainstRegistered(const char *ns, BSONObj& jsobj, BSONObjBuilder& anObjBuilder, int queryOptions) { + const char *p = strchr(ns, '.'); + if ( !p ) return false; + if ( strcmp(p, ".$cmd") != 0 ) return false; + + bool ok = false; + + BSONElement e = jsobj.firstElement(); + map<string,Command*>::iterator i; + + if ( e.eoo() ) + ; + // check for properly registered command objects. + else if ( (i = _commands->find(e.fieldName())) != _commands->end() ) { + string errmsg; + Command *c = i->second; + ClientInfo *client = ClientInfo::get(); + AuthenticationInfo *ai = client->getAuthenticationInfo(); + + char cl[256]; + nsToDatabase(ns, cl); + if( c->requiresAuth() && !ai->isAuthorized(cl)) { + ok = false; + errmsg = "unauthorized"; + } + else if( c->adminOnly() && c->localHostOnlyIfNoAuth( jsobj ) && noauth && !ai->isLocalHost ) { + ok = false; + errmsg = "unauthorized: this command must run from localhost when running db without auth"; + log() << "command denied: " << jsobj.toString() << endl; + } + else if ( c->adminOnly() && !startsWith(ns, "admin.") ) { + ok = false; + errmsg = "access denied - use admin db"; + } + else if ( jsobj.getBoolField( "help" ) ) { + stringstream help; + help << "help for: " << e.fieldName() << " "; + c->help( help ); + anObjBuilder.append( "help" , help.str() ); + } + else { + ok = c->run( nsToDatabase( ns ) , jsobj, queryOptions, errmsg, anObjBuilder, false ); + } + + BSONObj tmp = anObjBuilder.asTempObj(); + bool have_ok = tmp.hasField("ok"); + bool have_errmsg = tmp.hasField("errmsg"); + + if (!have_ok) + anObjBuilder.append( "ok" , ok ? 1.0 : 0.0 ); + + if ( !ok && !have_errmsg) { + anObjBuilder.append("errmsg", errmsg); + uassert_nothrow(errmsg.c_str()); + } + return true; + } + + return false; + } } |