// s/commands_public.cpp /** * Copyright (C) 2008 10gen Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ #include "pch.h" #include "../util/net/message.h" #include "../db/dbmessage.h" #include "../client/connpool.h" #include "../client/parallel.h" #include "../db/commands.h" #include "../db/queryutil.h" #include "../scripting/engine.h" #include "config.h" #include "chunk.h" #include "strategy.h" #include "grid.h" #include "mr_shard.h" #include "client.h" namespace mongo { bool setParmsMongodSpecific(const string& dbname, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl ) { return true; } namespace dbgrid_pub_cmds { class PublicGridCommand : public Command { public: PublicGridCommand( const char* n, const char* oldname=NULL ) : Command( n, false, oldname ) { } virtual bool slaveOk() const { return true; } virtual bool adminOnly() const { return false; } // Override if passthrough should also send query options // Safer as off by default, can slowly enable as we add more tests virtual bool passOptions() const { return false; } // all grid commands are designed not to lock virtual LockType locktype() const { return NONE; } protected: bool passthrough( DBConfigPtr conf, const BSONObj& cmdObj , BSONObjBuilder& result ) { return _passthrough(conf->getName(), conf, cmdObj, 0, result); } bool adminPassthrough( DBConfigPtr conf, const BSONObj& cmdObj , BSONObjBuilder& result ) { return _passthrough("admin", conf, cmdObj, 0, result); } bool passthrough( DBConfigPtr conf, const BSONObj& cmdObj , int options, BSONObjBuilder& result ) { return _passthrough(conf->getName(), conf, cmdObj, options, result); } bool adminPassthrough( DBConfigPtr conf, const BSONObj& cmdObj , int options, BSONObjBuilder& result ) { return _passthrough("admin", conf, cmdObj, options, result); } private: bool _passthrough(const string& db, DBConfigPtr conf, const BSONObj& cmdObj , int options , BSONObjBuilder& result ) { ShardConnection conn( conf->getPrimary() , "" ); BSONObj res; bool ok = conn->runCommand( db , cmdObj , res , passOptions() ? options : 0 ); if ( ! ok && res["code"].numberInt() == StaleConfigInContextCode ) { conn.done(); throw StaleConfigException( res["ns"].toString(), "command failed because of stale config" ); } result.appendElements( res ); conn.done(); return ok; } }; class RunOnAllShardsCommand : public Command { public: RunOnAllShardsCommand(const char* n, const char* oldname=NULL) : Command(n, false, oldname) {} virtual bool slaveOk() const { return true; } virtual bool adminOnly() const { return false; } // all grid commands are designed not to lock virtual LockType locktype() const { return NONE; } // default impl uses all shards for DB virtual void getShards(const string& dbName , BSONObj& cmdObj, set& shards) { DBConfigPtr conf = grid.getDBConfig( dbName , false ); conf->getAllShards(shards); } virtual void aggregateResults(const vector& results, BSONObjBuilder& output) {} // don't override virtual bool run(const string& dbName , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& output, bool) { LOG(1) << "RunOnAllShardsCommand db: " << dbName << " cmd:" << cmdObj << endl; set shards; getShards(dbName, cmdObj, shards); list< shared_ptr > futures; for ( set::const_iterator i=shards.begin(), end=shards.end() ; i != end ; i++ ) { futures.push_back( Future::spawnCommand( i->getConnString() , dbName , cmdObj, 0 ) ); } vector results; BSONObjBuilder subobj (output.subobjStart("raw")); BSONObjBuilder errors; for ( list< shared_ptr >::iterator i=futures.begin(); i!=futures.end(); i++ ) { shared_ptr res = *i; if ( ! res->join() ) { errors.appendAs(res->result()["errmsg"], res->getServer()); } results.push_back( res->result() ); subobj.append( res->getServer() , res->result() ); } subobj.done(); BSONObj errobj = errors.done(); if (! errobj.isEmpty()) { errmsg = errobj.toString(false, true); return false; } aggregateResults(results, output); return true; } }; class AllShardsCollectionCommand : public RunOnAllShardsCommand { public: AllShardsCollectionCommand(const char* n, const char* oldname=NULL) : RunOnAllShardsCommand(n, oldname) {} virtual void getShards(const string& dbName , BSONObj& cmdObj, set& shards) { string fullns = dbName + '.' + cmdObj.firstElement().valuestrsafe(); DBConfigPtr conf = grid.getDBConfig( dbName , false ); if ( ! conf || ! conf->isShardingEnabled() || ! conf->isSharded( fullns ) ) { shards.insert(conf->getShard(fullns)); } else { conf->getChunkManager(fullns)->getAllShards(shards); } } }; class NotAllowedOnShardedCollectionCmd : public PublicGridCommand { public: NotAllowedOnShardedCollectionCmd( const char * n ) : PublicGridCommand( n ) {} virtual string getFullNS( const string& dbName , const BSONObj& cmdObj ) = 0; virtual bool run(const string& dbName , BSONObj& cmdObj, int options, string& errmsg, BSONObjBuilder& result, bool) { string fullns = getFullNS( dbName , cmdObj ); DBConfigPtr conf = grid.getDBConfig( dbName , false ); if ( ! conf || ! conf->isShardingEnabled() || ! conf->isSharded( fullns ) ) { return passthrough( conf , cmdObj , options, result ); } errmsg = "can't do command: " + name + " on sharded collection"; return false; } }; // ---- class DropIndexesCmd : public AllShardsCollectionCommand { public: DropIndexesCmd() : AllShardsCollectionCommand("dropIndexes", "deleteIndexes") {} } dropIndexesCmd; class ReIndexCmd : public AllShardsCollectionCommand { public: ReIndexCmd() : AllShardsCollectionCommand("reIndex") {} } reIndexCmd; class ProfileCmd : public PublicGridCommand { public: ProfileCmd() : PublicGridCommand("profile") {} virtual bool run(const string& dbName , BSONObj& cmdObj, int options, string& errmsg, BSONObjBuilder& result, bool) { errmsg = "profile currently not supported via mongos"; return false; } } profileCmd; class ValidateCmd : public AllShardsCollectionCommand { public: ValidateCmd() : AllShardsCollectionCommand("validate") {} virtual void aggregateResults(const vector& results, BSONObjBuilder& output) { for (vector::const_iterator it(results.begin()), end(results.end()); it!=end; it++){ const BSONObj& result = *it; const BSONElement valid = result["valid"]; if (!valid.eoo()){ if (!valid.trueValue()) { output.appendBool("valid", false); return; } } else { // Support pre-1.9.0 output with everything in a big string const char* s = result["result"].valuestrsafe(); if (strstr(s, "exception") || strstr(s, "corrupt")){ output.appendBool("valid", false); return; } } } output.appendBool("valid", true); } } validateCmd; class RepairDatabaseCmd : public RunOnAllShardsCommand { public: RepairDatabaseCmd() : RunOnAllShardsCommand("repairDatabase") {} } repairDatabaseCmd; class DBStatsCmd : public RunOnAllShardsCommand { public: DBStatsCmd() : RunOnAllShardsCommand("dbStats", "dbstats") {} virtual void aggregateResults(const vector& results, BSONObjBuilder& output) { long long objects = 0; long long dataSize = 0; long long storageSize = 0; long long numExtents = 0; long long indexes = 0; long long indexSize = 0; long long fileSize = 0; for (vector::const_iterator it(results.begin()), end(results.end()); it != end; ++it) { const BSONObj& b = *it; objects += b["objects"].numberLong(); dataSize += b["dataSize"].numberLong(); storageSize += b["storageSize"].numberLong(); numExtents += b["numExtents"].numberLong(); indexes += b["indexes"].numberLong(); indexSize += b["indexSize"].numberLong(); fileSize += b["fileSize"].numberLong(); } //result.appendNumber( "collections" , ncollections ); //TODO: need to find a good way to get this output.appendNumber( "objects" , objects ); output.append ( "avgObjSize" , double(dataSize) / double(objects) ); output.appendNumber( "dataSize" , dataSize ); output.appendNumber( "storageSize" , storageSize); output.appendNumber( "numExtents" , numExtents ); output.appendNumber( "indexes" , indexes ); output.appendNumber( "indexSize" , indexSize ); output.appendNumber( "fileSize" , fileSize ); } } DBStatsCmdObj; class DropCmd : public PublicGridCommand { public: DropCmd() : PublicGridCommand( "drop" ) {} bool run(const string& dbName , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool) { string collection = cmdObj.firstElement().valuestrsafe(); string fullns = dbName + "." + collection; DBConfigPtr conf = grid.getDBConfig( dbName , false ); log() << "DROP: " << fullns << endl; if ( ! conf || ! conf->isShardingEnabled() || ! conf->isSharded( fullns ) ) { return passthrough( conf , cmdObj , result ); } ChunkManagerPtr cm = conf->getChunkManager( fullns ); massert( 10418 , "how could chunk manager be null!" , cm ); cm->drop( cm ); uassert( 13512 , "drop collection attempted on non-sharded collection" , conf->removeSharding( fullns ) ); return 1; } } dropCmd; class DropDBCmd : public PublicGridCommand { public: DropDBCmd() : PublicGridCommand( "dropDatabase" ) {} bool run(const string& dbName , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool) { BSONElement e = cmdObj.firstElement(); if ( ! e.isNumber() || e.number() != 1 ) { errmsg = "invalid params"; return 0; } DBConfigPtr conf = grid.getDBConfig( dbName , false ); log() << "DROP DATABASE: " << dbName << endl; if ( ! conf ) { result.append( "info" , "database didn't exist" ); return true; } if ( ! conf->dropDatabase( errmsg ) ) return false; result.append( "dropped" , dbName ); return true; } } dropDBCmd; class RenameCollectionCmd : public PublicGridCommand { public: RenameCollectionCmd() : PublicGridCommand( "renameCollection" ) {} bool run(const string& dbName, BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool) { string fullnsFrom = cmdObj.firstElement().valuestrsafe(); string dbNameFrom = nsToDatabase( fullnsFrom.c_str() ); DBConfigPtr confFrom = grid.getDBConfig( dbNameFrom , false ); string fullnsTo = cmdObj["to"].valuestrsafe(); string dbNameTo = nsToDatabase( fullnsTo.c_str() ); DBConfigPtr confTo = grid.getDBConfig( dbNameTo , false ); uassert(13140, "Don't recognize source or target DB", confFrom && confTo); uassert(13138, "You can't rename a sharded collection", !confFrom->isSharded(fullnsFrom)); uassert(13139, "You can't rename to a sharded collection", !confTo->isSharded(fullnsTo)); const Shard& shardTo = confTo->getShard(fullnsTo); const Shard& shardFrom = confFrom->getShard(fullnsFrom); uassert(13137, "Source and destination collections must be on same shard", shardFrom == shardTo); return adminPassthrough( confFrom , cmdObj , result ); } } renameCollectionCmd; class CopyDBCmd : public PublicGridCommand { public: CopyDBCmd() : PublicGridCommand( "copydb" ) {} bool run(const string& dbName, BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool) { string todb = cmdObj.getStringField("todb"); uassert(13402, "need a todb argument", !todb.empty()); DBConfigPtr confTo = grid.getDBConfig( todb ); uassert(13398, "cant copy to sharded DB", !confTo->isShardingEnabled()); string fromhost = cmdObj.getStringField("fromhost"); if (!fromhost.empty()) { return adminPassthrough( confTo , cmdObj , result ); } else { string fromdb = cmdObj.getStringField("fromdb"); uassert(13399, "need a fromdb argument", !fromdb.empty()); DBConfigPtr confFrom = grid.getDBConfig( fromdb , false ); uassert(13400, "don't know where source DB is", confFrom); uassert(13401, "cant copy from sharded DB", !confFrom->isShardingEnabled()); BSONObjBuilder b; BSONForEach(e, cmdObj) { if (strcmp(e.fieldName(), "fromhost") != 0) b.append(e); } b.append("fromhost", confFrom->getPrimary().getConnString()); BSONObj fixed = b.obj(); return adminPassthrough( confTo , fixed , result ); } } } copyDBCmd; class CountCmd : public PublicGridCommand { public: CountCmd() : PublicGridCommand("count") { } virtual bool passOptions() const { return true; } bool run(const string& dbName, BSONObj& cmdObj, int options, string& errmsg, BSONObjBuilder& result, bool) { string collection = cmdObj.firstElement().valuestrsafe(); string fullns = dbName + "." + collection; BSONObj filter; if ( cmdObj["query"].isABSONObj() ) filter = cmdObj["query"].Obj(); DBConfigPtr conf = grid.getDBConfig( dbName , false ); if ( ! conf || ! conf->isShardingEnabled() || ! conf->isSharded( fullns ) ) { ShardConnection conn( conf->getPrimary() , fullns ); BSONObj temp; bool ok = conn->runCommand( dbName , cmdObj , temp, options ); conn.done(); if ( ok ) { result.append( temp["n"] ); return true; } if ( temp["code"].numberInt() != StaleConfigInContextCode ) { errmsg = temp["errmsg"].String(); result.appendElements( temp ); return false; } // this collection got sharded ChunkManagerPtr cm = conf->getChunkManagerIfExists( fullns , true ); if ( ! cm ) { errmsg = "should be sharded now"; result.append( "root" , temp ); return false; } } long long total = 0; map shardCounts; int numTries = 0; bool hadToBreak = false; ChunkManagerPtr cm = conf->getChunkManagerIfExists( fullns ); while ( numTries < 5 ) { numTries++; // This all should eventually be replaced by new pcursor framework, but for now match query // retry behavior manually if( numTries >= 2 ) sleepsecs( numTries - 1 ); if ( ! cm ) { // probably unsharded now return run( dbName , cmdObj , options , errmsg , result, false ); } set shards; cm->getShardsForQuery( shards , filter ); assert( shards.size() ); hadToBreak = false; for (set::iterator it=shards.begin(), end=shards.end(); it != end; ++it) { ShardConnection conn(*it, fullns); if ( conn.setVersion() ){ ChunkManagerPtr newCM = conf->getChunkManagerIfExists( fullns ); if( newCM->getVersion() != cm->getVersion() ){ cm = newCM; total = 0; shardCounts.clear(); conn.done(); hadToBreak = true; break; } } BSONObj temp; bool ok = conn->runCommand( dbName , BSON( "count" << collection << "query" << filter ) , temp, options ); conn.done(); if ( ok ) { long long mine = temp["n"].numberLong(); total += mine; shardCounts[it->getName()] = mine; continue; } if ( StaleConfigInContextCode == temp["code"].numberInt() ) { // my version is old total = 0; shardCounts.clear(); cm = conf->getChunkManagerIfExists( fullns , true, numTries > 2 ); // Force reload on third attempt hadToBreak = true; break; } // command failed :( errmsg = "failed on : " + it->getName(); result.append( "cause" , temp ); return false; } if ( ! hadToBreak ) break; } if (hadToBreak) { errmsg = "Tried 5 times without success to get count for " + fullns + " from all shards"; return false; } total = applySkipLimit( total , cmdObj ); result.appendNumber( "n" , total ); BSONObjBuilder temp( result.subobjStart( "shards" ) ); for ( map::iterator i=shardCounts.begin(); i!=shardCounts.end(); ++i ) temp.appendNumber( i->first , i->second ); temp.done(); return true; } } countCmd; class CollectionStats : public PublicGridCommand { public: CollectionStats() : PublicGridCommand("collStats", "collstats") { } bool run(const string& dbName , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool) { string collection = cmdObj.firstElement().valuestrsafe(); string fullns = dbName + "." + collection; DBConfigPtr conf = grid.getDBConfig( dbName , false ); if ( ! conf || ! conf->isShardingEnabled() || ! conf->isSharded( fullns ) ) { result.appendBool("sharded", false); result.append( "primary" , conf->getPrimary().getName() ); return passthrough( conf , cmdObj , result); } result.appendBool("sharded", true); ChunkManagerPtr cm = conf->getChunkManager( fullns ); massert( 12594 , "how could chunk manager be null!" , cm ); set servers; cm->getAllShards(servers); BSONObjBuilder shardStats; map counts; map indexSizes; /* long long count=0; long long size=0; long long storageSize=0; */ int nindexes=0; bool warnedAboutIndexes = false; for ( set::iterator i=servers.begin(); i!=servers.end(); i++ ) { ScopedDbConnection conn( *i ); BSONObj res; if ( ! conn->runCommand( dbName , cmdObj , res ) ) { errmsg = "failed on shard: " + res.toString(); return false; } conn.done(); BSONObjIterator j( res ); while ( j.more() ) { BSONElement e = j.next(); if ( str::equals( e.fieldName() , "ns" ) || str::equals( e.fieldName() , "ok" ) || str::equals( e.fieldName() , "avgObjSize" ) || str::equals( e.fieldName() , "lastExtentSize" ) || str::equals( e.fieldName() , "paddingFactor" ) ) { continue; } else if ( str::equals( e.fieldName() , "count" ) || str::equals( e.fieldName() , "size" ) || str::equals( e.fieldName() , "storageSize" ) || str::equals( e.fieldName() , "numExtents" ) || str::equals( e.fieldName() , "totalIndexSize" ) ) { counts[e.fieldName()] += e.numberLong(); } else if ( str::equals( e.fieldName() , "indexSizes" ) ) { BSONObjIterator k( e.Obj() ); while ( k.more() ) { BSONElement temp = k.next(); indexSizes[temp.fieldName()] += temp.numberLong(); } } else if ( str::equals( e.fieldName() , "flags" ) ) { if ( ! result.hasField( e.fieldName() ) ) result.append( e ); } else if ( str::equals( e.fieldName() , "nindexes" ) ) { int myIndexes = e.numberInt(); if ( nindexes == 0 ) { nindexes = myIndexes; } else if ( nindexes == myIndexes ) { // no-op } else { // hopefully this means we're building an index if ( myIndexes > nindexes ) nindexes = myIndexes; if ( ! warnedAboutIndexes ) { result.append( "warning" , "indexes don't all match - ok if ensureIndex is running" ); warnedAboutIndexes = true; } } } else { warning() << "mongos collstats doesn't know about: " << e.fieldName() << endl; } } shardStats.append(i->getName(), res); } result.append("ns", fullns); for ( map::iterator i=counts.begin(); i!=counts.end(); ++i ) result.appendNumber( i->first , i->second ); { BSONObjBuilder ib( result.subobjStart( "indexSizes" ) ); for ( map::iterator i=indexSizes.begin(); i!=indexSizes.end(); ++i ) ib.appendNumber( i->first , i->second ); ib.done(); } if ( counts["count"] > 0 ) result.append("avgObjSize", (double)counts["size"] / (double)counts["count"] ); else result.append( "avgObjSize", 0.0 ); result.append("nindexes", nindexes); result.append("nchunks", cm->numChunks()); result.append("shards", shardStats.obj()); return true; } } collectionStatsCmd; class FindAndModifyCmd : public PublicGridCommand { public: FindAndModifyCmd() : PublicGridCommand("findAndModify", "findandmodify") { } bool run(const string& dbName, BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool) { string collection = cmdObj.firstElement().valuestrsafe(); string fullns = dbName + "." + collection; DBConfigPtr conf = grid.getDBConfig( dbName , false ); if ( ! conf || ! conf->isShardingEnabled() || ! conf->isSharded( fullns ) ) { return passthrough( conf , cmdObj , result); } ChunkManagerPtr cm = conf->getChunkManager( fullns ); massert( 13002 , "shard internal error chunk manager should never be null" , cm ); BSONObj filter = cmdObj.getObjectField("query"); uassert(13343, "query for sharded findAndModify must have shardkey", cm->hasShardKey(filter)); //TODO with upsert consider tracking for splits ChunkPtr chunk = cm->findChunk(filter); ShardConnection conn( chunk->getShard() , fullns ); BSONObj res; bool ok = conn->runCommand( conf->getName() , cmdObj , res ); conn.done(); if (!ok && res.getIntField("code") == 9996) { // code for StaleConfigException throw StaleConfigException(fullns, "FindAndModify"); // Command code traps this and re-runs } result.appendElements(res); return ok; } } findAndModifyCmd; class DataSizeCmd : public PublicGridCommand { public: DataSizeCmd() : PublicGridCommand("dataSize", "datasize") { } bool run(const string& dbName, BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool) { string fullns = cmdObj.firstElement().String(); DBConfigPtr conf = grid.getDBConfig( dbName , false ); if ( ! conf || ! conf->isShardingEnabled() || ! conf->isSharded( fullns ) ) { return passthrough( conf , cmdObj , result); } ChunkManagerPtr cm = conf->getChunkManager( fullns ); massert( 13407 , "how could chunk manager be null!" , cm ); BSONObj min = cmdObj.getObjectField( "min" ); BSONObj max = cmdObj.getObjectField( "max" ); BSONObj keyPattern = cmdObj.getObjectField( "keyPattern" ); uassert(13408, "keyPattern must equal shard key", cm->getShardKey().key() == keyPattern); // yes these are doubles... double size = 0; double numObjects = 0; int millis = 0; set shards; cm->getShardsForRange(shards, min, max); for ( set::iterator i=shards.begin(), end=shards.end() ; i != end; ++i ) { ScopedDbConnection conn( *i ); BSONObj res; bool ok = conn->runCommand( conf->getName() , cmdObj , res ); conn.done(); if ( ! ok ) { result.appendElements( res ); return false; } size += res["size"].number(); numObjects += res["numObjects"].number(); millis += res["millis"].numberInt(); } result.append( "size", size ); result.append( "numObjects" , numObjects ); result.append( "millis" , millis ); return true; } } DataSizeCmd; class ConvertToCappedCmd : public NotAllowedOnShardedCollectionCmd { public: ConvertToCappedCmd() : NotAllowedOnShardedCollectionCmd("convertToCapped") {} virtual string getFullNS( const string& dbName , const BSONObj& cmdObj ) { return dbName + "." + cmdObj.firstElement().valuestrsafe(); } } convertToCappedCmd; class GroupCmd : public NotAllowedOnShardedCollectionCmd { public: GroupCmd() : NotAllowedOnShardedCollectionCmd("group") {} virtual bool passOptions() const { return true; } virtual string getFullNS( const string& dbName , const BSONObj& cmdObj ) { return dbName + "." + cmdObj.firstElement().embeddedObjectUserCheck()["ns"].valuestrsafe(); } } groupCmd; class DistinctCmd : public PublicGridCommand { public: DistinctCmd() : PublicGridCommand("distinct") {} virtual void help( stringstream &help ) const { help << "{ distinct : 'collection name' , key : 'a.b' , query : {} }"; } virtual bool passOptions() const { return true; } bool run(const string& dbName , BSONObj& cmdObj, int options, string& errmsg, BSONObjBuilder& result, bool) { string collection = cmdObj.firstElement().valuestrsafe(); string fullns = dbName + "." + collection; DBConfigPtr conf = grid.getDBConfig( dbName , false ); if ( ! conf || ! conf->isShardingEnabled() || ! conf->isSharded( fullns ) ) { return passthrough( conf , cmdObj , options, result ); } ChunkManagerPtr cm = conf->getChunkManager( fullns ); massert( 10420 , "how could chunk manager be null!" , cm ); BSONObj query = getQuery(cmdObj); set shards; cm->getShardsForQuery(shards, query); set all; int size = 32; for ( set::iterator i=shards.begin(), end=shards.end() ; i != end; ++i ) { ShardConnection conn( *i , fullns ); BSONObj res; bool ok = conn->runCommand( conf->getName() , cmdObj , res, options ); conn.done(); if ( ! ok ) { result.appendElements( res ); return false; } BSONObjIterator it( res["values"].embeddedObject() ); while ( it.more() ) { BSONElement nxt = it.next(); BSONObjBuilder temp(32); temp.appendAs( nxt , "" ); all.insert( temp.obj() ); } } BSONObjBuilder b( size ); int n=0; for ( set::iterator i = all.begin() ; i != all.end(); i++ ) { b.appendAs( i->firstElement() , b.numStr( n++ ) ); } result.appendArray( "values" , b.obj() ); return true; } } disinctCmd; class FileMD5Cmd : public PublicGridCommand { public: FileMD5Cmd() : PublicGridCommand("filemd5") {} virtual void help( stringstream &help ) const { help << " example: { filemd5 : ObjectId(aaaaaaa) , root : \"fs\" }"; } bool run(const string& dbName , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool) { string fullns = dbName; fullns += "."; { string root = cmdObj.getStringField( "root" ); if ( root.size() == 0 ) root = "fs"; fullns += root; } fullns += ".chunks"; DBConfigPtr conf = grid.getDBConfig( dbName , false ); if ( ! conf || ! conf->isShardingEnabled() || ! conf->isSharded( fullns ) ) { return passthrough( conf , cmdObj , result ); } ChunkManagerPtr cm = conf->getChunkManager( fullns ); massert( 13091 , "how could chunk manager be null!" , cm ); uassert( 13092 , "GridFS chunks collection can only be sharded on files_id", cm->getShardKey().key() == BSON("files_id" << 1)); ChunkPtr chunk = cm->findChunk( BSON("files_id" << cmdObj.firstElement()) ); ShardConnection conn( chunk->getShard() , fullns ); BSONObj res; bool ok = conn->runCommand( conf->getName() , cmdObj , res ); conn.done(); result.appendElements(res); return ok; } } fileMD5Cmd; class Geo2dFindNearCmd : public PublicGridCommand { public: Geo2dFindNearCmd() : PublicGridCommand( "geoNear" ) {} void help(stringstream& h) const { h << "http://www.mongodb.org/display/DOCS/Geospatial+Indexing#GeospatialIndexing-geoNearCommand"; } virtual bool passOptions() const { return true; } bool run(const string& dbName , BSONObj& cmdObj, int options, string& errmsg, BSONObjBuilder& result, bool) { string collection = cmdObj.firstElement().valuestrsafe(); string fullns = dbName + "." + collection; DBConfigPtr conf = grid.getDBConfig( dbName , false ); if ( ! conf || ! conf->isShardingEnabled() || ! conf->isSharded( fullns ) ) { return passthrough( conf , cmdObj , options, result ); } ChunkManagerPtr cm = conf->getChunkManager( fullns ); massert( 13500 , "how could chunk manager be null!" , cm ); BSONObj query = getQuery(cmdObj); set shards; cm->getShardsForQuery(shards, query); int limit = 100; if (cmdObj["num"].isNumber()) limit = cmdObj["num"].numberInt(); list< shared_ptr > futures; BSONArrayBuilder shardArray; for ( set::const_iterator i=shards.begin(), end=shards.end() ; i != end ; i++ ) { futures.push_back( Future::spawnCommand( i->getConnString() , dbName , cmdObj, options ) ); shardArray.append(i->getName()); } multimap results; // TODO: maybe use merge-sort instead string nearStr; double time = 0; double btreelocs = 0; double nscanned = 0; double objectsLoaded = 0; for ( list< shared_ptr >::iterator i=futures.begin(); i!=futures.end(); i++ ) { shared_ptr res = *i; if ( ! res->join() ) { errmsg = res->result()["errmsg"].String(); return false; } nearStr = res->result()["near"].String(); time += res->result()["stats"]["time"].Number(); btreelocs += res->result()["stats"]["btreelocs"].Number(); nscanned += res->result()["stats"]["nscanned"].Number(); objectsLoaded += res->result()["stats"]["objectsLoaded"].Number(); BSONForEach(obj, res->result()["results"].embeddedObject()) { results.insert(make_pair(obj["dis"].Number(), obj.embeddedObject().getOwned())); } // TODO: maybe shrink results if size() > limit } result.append("ns" , fullns); result.append("near", nearStr); int outCount = 0; double totalDistance = 0; double maxDistance = 0; { BSONArrayBuilder sub (result.subarrayStart("results")); for (multimap::const_iterator it(results.begin()), end(results.end()); it!= end && outCount < limit; ++it, ++outCount) { totalDistance += it->first; maxDistance = it->first; // guaranteed to be highest so far sub.append(it->second); } sub.done(); } { BSONObjBuilder sub (result.subobjStart("stats")); sub.append("time", time); sub.append("btreelocs", btreelocs); sub.append("nscanned", nscanned); sub.append("objectsLoaded", objectsLoaded); sub.append("avgDistance", totalDistance / outCount); sub.append("maxDistance", maxDistance); sub.append("shards", shardArray.arr()); sub.done(); } return true; } } geo2dFindNearCmd; class MRCmd : public PublicGridCommand { public: AtomicUInt JOB_NUMBER; MRCmd() : PublicGridCommand( "mapreduce" ) {} string getTmpName( const string& coll ) { stringstream ss; ss << "tmp.mrs." << coll << "_" << time(0) << "_" << JOB_NUMBER++; return ss.str(); } BSONObj fixForShards( const BSONObj& orig , const string& output, BSONObj& customOut , string& badShardedField ) { BSONObjBuilder b; BSONObjIterator i( orig ); while ( i.more() ) { BSONElement e = i.next(); string fn = e.fieldName(); if ( fn == "map" || fn == "mapreduce" || fn == "mapparams" || fn == "reduce" || fn == "query" || fn == "sort" || fn == "scope" || fn == "verbose" ) { b.append( e ); } else if ( fn == "out" || fn == "finalize" ) { // we don't want to copy these if (fn == "out" && e.type() == Object) { // check if there is a custom output BSONObj out = e.embeddedObject(); // if (out.hasField("db")) customOut = out; } } else { badShardedField = fn; return BSONObj(); } } b.append( "out" , output ); return b.obj(); } bool run(const string& dbName , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool) { Timer t; string collection = cmdObj.firstElement().valuestrsafe(); string fullns = dbName + "." + collection; const string shardedOutputCollection = getTmpName( collection ); string badShardedField; BSONObj customOut; BSONObj shardedCommand = fixForShards( cmdObj , shardedOutputCollection, customOut , badShardedField ); bool customOutDB = customOut.hasField( "db" ); DBConfigPtr conf = grid.getDBConfig( dbName , false ); if ( ! conf || ! conf->isShardingEnabled() || ! conf->isSharded( fullns ) ) { if ( customOutDB ) { errmsg = "can't use out 'db' with non-sharded db"; return false; } return passthrough( conf , cmdObj , result ); } if ( badShardedField.size() ) { errmsg = str::stream() << "unknown m/r field for sharding: " << badShardedField; return false; } BSONObjBuilder timingBuilder; ChunkManagerPtr cm = conf->getChunkManager( fullns ); BSONObj q; if ( cmdObj["query"].type() == Object ) { q = cmdObj["query"].embeddedObjectUserCheck(); } set shards; cm->getShardsForQuery( shards , q ); BSONObjBuilder finalCmd; finalCmd.append( "mapreduce.shardedfinish" , cmdObj ); finalCmd.append( "shardedOutputCollection" , shardedOutputCollection ); set servers; BSONObj shardCounts; BSONObj aggCounts; map countsMap; { // we need to use our connections to the shard // so filtering is done correctly for un-owned docs // so we allocate them in our thread // and hand off // Note: why not use pooled connections? This has been reported to create too many connections vector< shared_ptr > shardConns; list< shared_ptr > futures; for ( set::iterator i=shards.begin(), end=shards.end() ; i != end ; i++ ) { shared_ptr temp( new ShardConnection( i->getConnString() , fullns ) ); assert( temp->get() ); futures.push_back( Future::spawnCommand( i->getConnString() , dbName , shardedCommand , 0 , temp->get() ) ); shardConns.push_back( temp ); } bool failed = false; // now wait for the result of all shards BSONObjBuilder shardResultsB; BSONObjBuilder shardCountsB; BSONObjBuilder aggCountsB; for ( list< shared_ptr >::iterator i=futures.begin(); i!=futures.end(); i++ ) { shared_ptr res = *i; if ( ! res->join() ) { error() << "sharded m/r failed on shard: " << res->getServer() << " error: " << res->result() << endl; result.append( "cause" , res->result() ); errmsg = "mongod mr failed: "; errmsg += res->result().toString(); failed = true; continue; } BSONObj result = res->result(); shardResultsB.append( res->getServer() , result ); BSONObj counts = result["counts"].embeddedObjectUserCheck(); shardCountsB.append( res->getServer() , counts ); servers.insert(res->getServer()); // add up the counts for each shard // some of them will be fixed later like output and reduce BSONObjIterator j( counts ); while ( j.more() ) { BSONElement temp = j.next(); countsMap[temp.fieldName()] += temp.numberLong(); } } for ( unsigned i=0; idone(); if ( failed ) return 0; finalCmd.append( "shards" , shardResultsB.obj() ); shardCounts = shardCountsB.obj(); finalCmd.append( "shardCounts" , shardCounts ); timingBuilder.append( "shards" , t.millis() ); for ( map::iterator i=countsMap.begin(); i!=countsMap.end(); i++ ) { aggCountsB.append( i->first , i->second ); } aggCounts = aggCountsB.obj(); finalCmd.append( "counts" , aggCounts ); } Timer t2; BSONObj finalResult; bool ok = false; string outdb = dbName; if (customOutDB) { BSONElement elmt = customOut.getField("db"); outdb = elmt.valuestrsafe(); } if (!customOut.getBoolField("sharded")) { // non-sharded, use the MRFinish command on target server // This will save some data transfer // by default the target database is same as input Shard outServer = conf->getPrimary(); string outns = fullns; if ( customOutDB ) { // have to figure out shard for the output DB DBConfigPtr conf2 = grid.getDBConfig( outdb , true ); outServer = conf2->getPrimary(); outns = outdb + "." + collection; } log() << "customOut: " << customOut << " outServer: " << outServer << endl; ShardConnection conn( outServer , outns ); ok = conn->runCommand( dbName , finalCmd.obj() , finalResult ); conn.done(); } else { // grab records from each shard and insert back in correct shard in "temp" collection // we do the final reduce in mongos since records are ordered and already reduced on each shard // string shardedIncLong = str::stream() << outdb << ".tmp.mr." << collection << "_" << "shardedTemp" << "_" << time(0) << "_" << JOB_NUMBER++; mr_shard::Config config( dbName , cmdObj ); mr_shard::State state(config); LOG(1) << "mr sharded output ns: " << config.ns << endl; if (config.outType == mr_shard::Config::INMEMORY) { errmsg = "This Map Reduce mode is not supported with sharded output"; return false; } if (!config.outDB.empty()) { BSONObjBuilder loc; if ( !config.outDB.empty()) loc.append( "db" , config.outDB ); loc.append( "collection" , config.finalShort ); result.append("result", loc.obj()); } else { if ( !config.finalShort.empty() ) result.append( "result" , config.finalShort ); } string outns = config.finalLong; string tempns; // result will be inserted into a temp collection to post process const string postProcessCollection = getTmpName( collection ); finalCmd.append("postProcessCollection", postProcessCollection); tempns = dbName + "." + postProcessCollection; // if (config.outType == mr_shard::Config::REPLACE) { // // drop previous collection // BSONObj dropColCmd = BSON("drop" << config.finalShort); // BSONObjBuilder dropColResult(32); // string outdbCmd = outdb + ".$cmd"; // bool res = Command::runAgainstRegistered(outdbCmd.c_str(), dropColCmd, dropColResult); // if (!res) { // errmsg = str::stream() << "Could not drop sharded output collection " << outns << ": " << dropColResult.obj().toString(); // return false; // } // } BSONObj sortKey = BSON( "_id" << 1 ); if (!conf->isSharded(outns)) { // create the sharded collection BSONObj shardColCmd = BSON("shardCollection" << outns << "key" << sortKey); BSONObjBuilder shardColResult(32); bool res = Command::runAgainstRegistered("admin.$cmd", shardColCmd, shardColResult); if (!res) { errmsg = str::stream() << "Could not create sharded output collection " << outns << ": " << shardColResult.obj().toString(); return false; } } ParallelSortClusteredCursor cursor( servers , dbName + "." + shardedOutputCollection , Query().sort( sortKey ) ); cursor.init(); state.init(); mr_shard::BSONList values; Strategy* s = SHARDED; long long finalCount = 0; int currentSize = 0; while ( cursor.more() || !values.empty() ) { BSONObj t; if ( cursor.more() ) { t = cursor.next().getOwned(); if ( values.size() == 0 || t.woSortOrder( *(values.begin()) , sortKey ) == 0 ) { values.push_back( t ); currentSize += t.objsize(); // check size and potentially reduce if (currentSize > config.maxInMemSize && values.size() > config.reduceTriggerRatio) { BSONObj reduced = config.reducer->finalReduce(values, 0); values.clear(); values.push_back( reduced ); currentSize = reduced.objsize(); } continue; } } BSONObj final = config.reducer->finalReduce(values, config.finalizer.get()); if (config.outType == mr_shard::Config::MERGE) { BSONObj id = final["_id"].wrap(); s->updateSharded(conf, outns.c_str(), id, final, UpdateOption_Upsert, true); } else { // insert into temp collection, but using final collection's shard chunks s->insertSharded(conf, tempns.c_str(), final, 0, true, outns.c_str()); } ++finalCount; values.clear(); if (!t.isEmpty()) { values.push_back( t ); currentSize = t.objsize(); } } if (config.outType == mr_shard::Config::REDUCE || config.outType == mr_shard::Config::REPLACE) { // results were written to temp collection, need post processing vector< shared_ptr > shardConns; list< shared_ptr > futures; BSONObj finalCmdObj = finalCmd.obj(); for ( set::iterator i=shards.begin(), end=shards.end() ; i != end ; i++ ) { shared_ptr temp( new ShardConnection( i->getConnString() , outns ) ); futures.push_back( Future::spawnCommand( i->getConnString() , dbName , finalCmdObj , 0 , temp->get() ) ); shardConns.push_back( temp ); } // now wait for the result of all shards bool failed = false; for ( list< shared_ptr >::iterator i=futures.begin(); i!=futures.end(); i++ ) { shared_ptr res = *i; if ( ! res->join() ) { error() << "final reduce on sharded output m/r failed on shard: " << res->getServer() << " error: " << res->result() << endl; result.append( "cause" , res->result() ); errmsg = "mongod mr failed: "; errmsg += res->result().toString(); failed = true; continue; } BSONObj result = res->result(); } for ( unsigned i=0; idone(); if (failed) return 0; } for ( set::iterator i=servers.begin(); i!=servers.end(); i++ ) { ScopedDbConnection conn( i->_server ); conn->dropCollection( dbName + "." + shardedOutputCollection ); conn.done(); } result.append("shardCounts", shardCounts); // fix the global counts BSONObjBuilder countsB(32); BSONObjIterator j(aggCounts); while (j.more()) { BSONElement elmt = j.next(); if (!strcmp(elmt.fieldName(), "reduce")) countsB.append("reduce", elmt.numberLong() + state.numReduces()); else if (!strcmp(elmt.fieldName(), "output")) countsB.append("output", finalCount); else countsB.append(elmt); } result.append( "counts" , countsB.obj() ); ok = true; } if ( ! ok ) { errmsg = "final reduce failed: "; errmsg += finalResult.toString(); return 0; } timingBuilder.append( "final" , t2.millis() ); result.appendElements( finalResult ); result.append( "timeMillis" , t.millis() ); result.append( "timing" , timingBuilder.obj() ); return 1; } } mrCmd; class ApplyOpsCmd : public PublicGridCommand { public: ApplyOpsCmd() : PublicGridCommand( "applyOps" ) {} virtual bool run(const string& dbName , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool) { errmsg = "applyOps not allowed through mongos"; return false; } } applyOpsCmd; class CompactCmd : public PublicGridCommand { public: CompactCmd() : PublicGridCommand( "compact" ) {} virtual bool run(const string& dbName , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool) { errmsg = "compact not allowed through mongos"; return false; } } compactCmd; } bool Command::runAgainstRegistered(const char *ns, BSONObj& jsobj, BSONObjBuilder& anObjBuilder, int queryOptions) { const char *p = strchr(ns, '.'); if ( !p ) return false; if ( strcmp(p, ".$cmd") != 0 ) return false; bool ok = false; BSONElement e = jsobj.firstElement(); map::iterator i; if ( e.eoo() ) ; // check for properly registered command objects. else if ( (i = _commands->find(e.fieldName())) != _commands->end() ) { string errmsg; Command *c = i->second; ClientInfo *client = ClientInfo::get(); AuthenticationInfo *ai = client->getAuthenticationInfo(); char cl[256]; nsToDatabase(ns, cl); if( c->requiresAuth() && !ai->isAuthorized(cl)) { ok = false; errmsg = "unauthorized"; } else if( c->adminOnly() && c->localHostOnlyIfNoAuth( jsobj ) && noauth && !ai->isLocalHost ) { ok = false; errmsg = "unauthorized: this command must run from localhost when running db without auth"; log() << "command denied: " << jsobj.toString() << endl; } else if ( c->adminOnly() && !startsWith(ns, "admin.") ) { ok = false; errmsg = "access denied - use admin db"; } else if ( jsobj.getBoolField( "help" ) ) { stringstream help; help << "help for: " << e.fieldName() << " "; c->help( help ); anObjBuilder.append( "help" , help.str() ); } else { ok = c->run( nsToDatabase( ns ) , jsobj, queryOptions, errmsg, anObjBuilder, false ); } BSONObj tmp = anObjBuilder.asTempObj(); bool have_ok = tmp.hasField("ok"); bool have_errmsg = tmp.hasField("errmsg"); if (!have_ok) anObjBuilder.append( "ok" , ok ? 1.0 : 0.0 ); if ( !ok && !have_errmsg) { anObjBuilder.append("errmsg", errmsg); uassert_nothrow(errmsg.c_str()); } return true; } return false; } }