diff options
Diffstat (limited to 's/commands_public.cpp')
-rw-r--r-- | s/commands_public.cpp | 620 |
1 files changed, 472 insertions, 148 deletions
diff --git a/s/commands_public.cpp b/s/commands_public.cpp index 649d7d1..3dbc8ad 100644 --- a/s/commands_public.cpp +++ b/s/commands_public.cpp @@ -17,16 +17,18 @@ * along with this program. If not, see <http://www.gnu.org/licenses/>. */ -#include "stdafx.h" +#include "pch.h" #include "../util/message.h" #include "../db/dbmessage.h" #include "../client/connpool.h" #include "../client/parallel.h" #include "../db/commands.h" +#include "../db/query.h" #include "config.h" #include "chunk.h" #include "strategy.h" +#include "grid.h" namespace mongo { @@ -34,32 +36,109 @@ namespace mongo { class PublicGridCommand : public Command { public: - PublicGridCommand( const char * n ) : Command( n ){ + PublicGridCommand( const char* n, const char* oldname=NULL ) : Command( n, false, oldname ){ } - virtual bool slaveOk(){ + virtual bool slaveOk() const { return true; } - virtual bool adminOnly() { + virtual bool adminOnly() const { return false; } // all grid commands are designed not to lock - virtual LockType locktype(){ return NONE; } + virtual LockType locktype() const { return NONE; } protected: - string getDBName( string ns ){ - return ns.substr( 0 , ns.size() - 5 ); - } + bool passthrough( DBConfigPtr conf, const BSONObj& cmdObj , BSONObjBuilder& result ){ + return _passthrough(conf->getName(), conf, cmdObj, result); + } + bool adminPassthrough( DBConfigPtr conf, const BSONObj& cmdObj , BSONObjBuilder& result ){ + return _passthrough("admin", conf, cmdObj, result); + } - bool passthrough( DBConfig * conf, const BSONObj& cmdObj , BSONObjBuilder& result ){ - ScopedDbConnection conn( conf->getPrimary() ); + private: + bool _passthrough(const string& db, DBConfigPtr conf, const BSONObj& cmdObj , BSONObjBuilder& result ){ + ShardConnection conn( conf->getPrimary() , "" ); BSONObj res; - bool ok = conn->runCommand( conf->getName() , cmdObj , res ); + bool ok = conn->runCommand( db , cmdObj , res ); result.appendElements( res ); conn.done(); return ok; } }; + + class RunOnAllShardsCommand : public Command { + public: + RunOnAllShardsCommand(const char* n, const char* oldname=NULL) : Command(n, false, oldname) {} + + virtual bool slaveOk() const { return true; } + virtual bool adminOnly() const { return false; } + + // all grid commands are designed not to lock + virtual LockType locktype() const { return NONE; } + + + // default impl uses all shards for DB + virtual void getShards(const string& dbName , BSONObj& cmdObj, set<Shard>& shards){ + DBConfigPtr conf = grid.getDBConfig( dbName , false ); + conf->getAllShards(shards); + } + + virtual void aggregateResults(const vector<BSONObj>& results, BSONObjBuilder& output) {} + + // don't override + virtual bool run(const string& dbName , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& output, bool){ + set<Shard> shards; + getShards(dbName, cmdObj, shards); + + list< shared_ptr<Future::CommandResult> > futures; + for ( set<Shard>::const_iterator i=shards.begin(), end=shards.end() ; i != end ; i++ ){ + futures.push_back( Future::spawnCommand( i->getConnString() , dbName , cmdObj ) ); + } + + vector<BSONObj> results; + BSONObjBuilder subobj (output.subobjStart("raw")); + BSONObjBuilder errors; + for ( list< shared_ptr<Future::CommandResult> >::iterator i=futures.begin(); i!=futures.end(); i++ ){ + shared_ptr<Future::CommandResult> res = *i; + if ( ! res->join() ){ + errors.appendAs(res->result()["errmsg"], res->getServer()); + } + results.push_back( res->result() ); + subobj.append( res->getServer() , res->result() ); + } + + subobj.done(); + + BSONObj errobj = errors.done(); + if (! errobj.isEmpty()){ + errmsg = errobj.toString(false, true); + return false; + } + + aggregateResults(results, output); + return true; + } + + }; + + class AllShardsCollectionCommand : public RunOnAllShardsCommand { + public: + AllShardsCollectionCommand(const char* n, const char* oldname=NULL) : RunOnAllShardsCommand(n, oldname) {} + + virtual void getShards(const string& dbName , BSONObj& cmdObj, set<Shard>& shards){ + string fullns = dbName + '.' + cmdObj.firstElement().valuestrsafe(); + + DBConfigPtr conf = grid.getDBConfig( dbName , false ); + + if ( ! conf || ! conf->isShardingEnabled() || ! conf->isSharded( fullns ) ){ + shards.insert(conf->getShard(fullns)); + } else { + conf->getChunkManager(fullns)->getAllShards(shards); + } + } + }; + class NotAllowedOnShardedCollectionCmd : public PublicGridCommand { public: @@ -67,12 +146,10 @@ namespace mongo { virtual string getFullNS( const string& dbName , const BSONObj& cmdObj ) = 0; - virtual bool run(const char *ns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){ - - string dbName = getDBName( ns ); + virtual bool run(const string& dbName , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){ string fullns = getFullNS( dbName , cmdObj ); - DBConfig * conf = grid.getDBConfig( dbName , false ); + DBConfigPtr conf = grid.getDBConfig( dbName , false ); if ( ! conf || ! conf->isShardingEnabled() || ! conf->isSharded( fullns ) ){ return passthrough( conf , cmdObj , result ); @@ -84,16 +161,70 @@ namespace mongo { // ---- + class DropIndexesCmd : public AllShardsCollectionCommand { + public: + DropIndexesCmd() : AllShardsCollectionCommand("dropIndexes", "deleteIndexes") {} + } dropIndexesCmd; + + class ReIndexCmd : public AllShardsCollectionCommand { + public: + ReIndexCmd() : AllShardsCollectionCommand("reIndex") {} + } reIndexCmd; + + class ValidateCmd : public AllShardsCollectionCommand { + public: + ValidateCmd() : AllShardsCollectionCommand("validate") {} + } validateCmd; + + class RepairDatabaseCmd : public RunOnAllShardsCommand { + public: + RepairDatabaseCmd() : RunOnAllShardsCommand("repairDatabase") {} + } repairDatabaseCmd; + + class DBStatsCmd : public RunOnAllShardsCommand { + public: + DBStatsCmd() : RunOnAllShardsCommand("dbstats") {} + + virtual void aggregateResults(const vector<BSONObj>& results, BSONObjBuilder& output) { + long long objects = 0; + long long dataSize = 0; + long long storageSize = 0; + long long numExtents = 0; + long long indexes = 0; + long long indexSize = 0; + long long fileSize = 0; + + for (vector<BSONObj>::const_iterator it(results.begin()), end(results.end()); it != end; ++it){ + const BSONObj& b = *it; + objects += b["objects"].numberLong(); + dataSize += b["dataSize"].numberLong(); + storageSize += b["storageSize"].numberLong(); + numExtents += b["numExtents"].numberLong(); + indexes += b["indexes"].numberLong(); + indexSize += b["indexSize"].numberLong(); + fileSize += b["fileSize"].numberLong(); + } + + //result.appendNumber( "collections" , ncollections ); //TODO: need to find a good way to get this + output.appendNumber( "objects" , objects ); + output.append ( "avgObjSize" , double(dataSize) / double(objects) ); + output.appendNumber( "dataSize" , dataSize ); + output.appendNumber( "storageSize" , storageSize); + output.appendNumber( "numExtents" , numExtents ); + output.appendNumber( "indexes" , indexes ); + output.appendNumber( "indexSize" , indexSize ); + output.appendNumber( "fileSize" , fileSize ); + } + } DBStatsCmdObj; + class DropCmd : public PublicGridCommand { public: DropCmd() : PublicGridCommand( "drop" ){} - bool run(const char *ns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){ - - string dbName = getDBName( ns ); + bool run(const string& dbName , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){ string collection = cmdObj.firstElement().valuestrsafe(); string fullns = dbName + "." + collection; - DBConfig * conf = grid.getDBConfig( dbName , false ); + DBConfigPtr conf = grid.getDBConfig( dbName , false ); log() << "DROP: " << fullns << endl; @@ -101,10 +232,10 @@ namespace mongo { return passthrough( conf , cmdObj , result ); } - ChunkManager * cm = conf->getChunkManager( fullns ); + ChunkManagerPtr cm = conf->getChunkManager( fullns ); massert( 10418 , "how could chunk manager be null!" , cm ); - cm->drop(); + cm->drop( cm ); return 1; } @@ -113,7 +244,7 @@ namespace mongo { class DropDBCmd : public PublicGridCommand { public: DropDBCmd() : PublicGridCommand( "dropDatabase" ){} - bool run(const char *ns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){ + bool run(const string& dbName , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){ BSONElement e = cmdObj.firstElement(); @@ -122,14 +253,13 @@ namespace mongo { return 0; } - string dbName = getDBName( ns ); - DBConfig * conf = grid.getDBConfig( dbName , false ); + DBConfigPtr conf = grid.getDBConfig( dbName , false ); log() << "DROP DATABASE: " << dbName << endl; - if ( ! conf || ! conf->isShardingEnabled() ){ - log(1) << " passing though drop database for: " << dbName << endl; - return passthrough( conf , cmdObj , result ); + if ( ! conf ){ + result.append( "info" , "database didn't exist" ); + return true; } if ( ! conf->dropDatabase( errmsg ) ) @@ -140,39 +270,168 @@ namespace mongo { } } dropDBCmd; + class RenameCollectionCmd : public PublicGridCommand { + public: + RenameCollectionCmd() : PublicGridCommand( "renameCollection" ){} + bool run(const string& dbName, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){ + string fullnsFrom = cmdObj.firstElement().valuestrsafe(); + string dbNameFrom = nsToDatabase( fullnsFrom.c_str() ); + DBConfigPtr confFrom = grid.getDBConfig( dbNameFrom , false ); + + string fullnsTo = cmdObj["to"].valuestrsafe(); + string dbNameTo = nsToDatabase( fullnsTo.c_str() ); + DBConfigPtr confTo = grid.getDBConfig( dbNameTo , false ); + + uassert(13140, "Don't recognize source or target DB", confFrom && confTo); + uassert(13138, "You can't rename a sharded collection", !confFrom->isSharded(fullnsFrom)); + uassert(13139, "You can't rename to a sharded collection", !confTo->isSharded(fullnsTo)); + + const Shard& shardTo = confTo->getShard(fullnsTo); + const Shard& shardFrom = confFrom->getShard(fullnsFrom); + + uassert(13137, "Source and destination collections must be on same shard", shardFrom == shardTo); + + return adminPassthrough( confFrom , cmdObj , result ); + } + } renameCollectionCmd; + + class CopyDBCmd : public PublicGridCommand { + public: + CopyDBCmd() : PublicGridCommand( "copydb" ){} + bool run(const string& dbName, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){ + string todb = cmdObj.getStringField("todb"); + uassert(13402, "need a todb argument", !todb.empty()); + + DBConfigPtr confTo = grid.getDBConfig( todb ); + uassert(13398, "cant copy to sharded DB", !confTo->isShardingEnabled()); + + string fromhost = cmdObj.getStringField("fromhost"); + if (!fromhost.empty()){ + return adminPassthrough( confTo , cmdObj , result ); + } else { + string fromdb = cmdObj.getStringField("fromdb"); + uassert(13399, "need a fromdb argument", !fromdb.empty()); + + DBConfigPtr confFrom = grid.getDBConfig( fromdb , false ); + uassert(13400, "don't know where source DB is", confFrom); + uassert(13401, "cant copy from sharded DB", !confFrom->isShardingEnabled()); + + BSONObjBuilder b; + BSONForEach(e, cmdObj){ + if (strcmp(e.fieldName(), "fromhost") != 0) + b.append(e); + } + b.append("fromhost", confFrom->getPrimary().getConnString()); + BSONObj fixed = b.obj(); + + return adminPassthrough( confTo , fixed , result ); + } + + } + }copyDBCmd; + class CountCmd : public PublicGridCommand { public: CountCmd() : PublicGridCommand("count") { } - bool run(const char *ns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){ - - string dbName = getDBName( ns ); + bool run(const string& dbName, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool l){ string collection = cmdObj.firstElement().valuestrsafe(); string fullns = dbName + "." + collection; - BSONObj filter = cmdObj["query"].embeddedObject(); + BSONObj filter; + if ( cmdObj["query"].isABSONObj() ) + filter = cmdObj["query"].Obj(); - DBConfig * conf = grid.getDBConfig( dbName , false ); + DBConfigPtr conf = grid.getDBConfig( dbName , false ); if ( ! conf || ! conf->isShardingEnabled() || ! conf->isSharded( fullns ) ){ - ScopedDbConnection conn( conf->getPrimary() ); - result.append( "n" , (double)conn->count( fullns , filter ) ); + ShardConnection conn( conf->getPrimary() , fullns ); + + BSONObj temp; + bool ok = conn->runCommand( dbName , cmdObj , temp ); conn.done(); - return true; + + if ( ok ){ + result.append( temp["n"] ); + return true; + } + + if ( temp["code"].numberInt() != StaleConfigInContextCode ){ + errmsg = temp["errmsg"].String(); + result.appendElements( temp ); + return false; + } + + // this collection got sharded + ChunkManagerPtr cm = conf->getChunkManager( fullns , true ); + if ( ! cm ){ + errmsg = "should be sharded now"; + result.append( "root" , temp ); + return false; + } } - ChunkManager * cm = conf->getChunkManager( fullns ); - massert( 10419 , "how could chunk manager be null!" , cm ); - - vector<Chunk*> chunks; - cm->getChunksForQuery( chunks , filter ); + long long total = 0; + map<string,long long> shardCounts; - unsigned long long total = 0; - for ( vector<Chunk*>::iterator i = chunks.begin() ; i != chunks.end() ; i++ ){ - Chunk * c = *i; - total += c->countObjects( filter ); + ChunkManagerPtr cm = conf->getChunkManager( fullns ); + while ( true ){ + if ( ! cm ){ + // probably unsharded now + return run( dbName , cmdObj , errmsg , result , l ); + } + + set<Shard> shards; + cm->getShardsForQuery( shards , filter ); + assert( shards.size() ); + + bool hadToBreak = false; + + for (set<Shard>::iterator it=shards.begin(), end=shards.end(); it != end; ++it){ + ShardConnection conn(*it, fullns); + if ( conn.setVersion() ){ + total = 0; + shardCounts.clear(); + cm = conf->getChunkManager( fullns ); + conn.done(); + hadToBreak = true; + break; + } + + BSONObj temp; + bool ok = conn->runCommand( dbName , BSON( "count" << collection << "query" << filter ) , temp ); + conn.done(); + + if ( ok ){ + long long mine = temp["n"].numberLong(); + total += mine; + shardCounts[it->getName()] = mine; + continue; + } + + if ( StaleConfigInContextCode == temp["code"].numberInt() ){ + // my version is old + total = 0; + shardCounts.clear(); + cm = conf->getChunkManager( fullns , true ); + hadToBreak = true; + break; + } + + // command failed :( + errmsg = "failed on : " + it->getName(); + result.append( "cause" , temp ); + return false; + } + if ( ! hadToBreak ) + break; } - result.append( "n" , (double)total ); + total = applySkipLimit( total , cmdObj ); + result.appendNumber( "n" , total ); + BSONObjBuilder temp( result.subobjStart( "shards" ) ); + for ( map<string,long long>::iterator i=shardCounts.begin(); i!=shardCounts.end(); ++i ) + temp.appendNumber( i->first , i->second ); + temp.done(); return true; } } countCmd; @@ -180,31 +439,33 @@ namespace mongo { class CollectionStats : public PublicGridCommand { public: CollectionStats() : PublicGridCommand("collstats") { } - bool run(const char *ns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){ - string dbName = getDBName( ns ); + bool run(const string& dbName , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){ string collection = cmdObj.firstElement().valuestrsafe(); string fullns = dbName + "." + collection; - DBConfig * conf = grid.getDBConfig( dbName , false ); + DBConfigPtr conf = grid.getDBConfig( dbName , false ); if ( ! conf || ! conf->isShardingEnabled() || ! conf->isSharded( fullns ) ){ + result.append( "ns" , fullns ); result.appendBool("sharded", false); + result.append( "primary" , conf->getPrimary().getName() ); return passthrough( conf , cmdObj , result); } result.appendBool("sharded", true); - ChunkManager * cm = conf->getChunkManager( fullns ); + ChunkManagerPtr cm = conf->getChunkManager( fullns ); massert( 12594 , "how could chunk manager be null!" , cm ); - set<string> servers; - cm->getAllServers(servers); + set<Shard> servers; + cm->getAllShards(servers); BSONObjBuilder shardStats; long long count=0; long long size=0; long long storageSize=0; int nindexes=0; - for ( set<string>::iterator i=servers.begin(); i!=servers.end(); i++ ){ + bool warnedAboutIndexes = false; + for ( set<Shard>::iterator i=servers.begin(); i!=servers.end(); i++ ){ ScopedDbConnection conn( *i ); BSONObj res; if ( ! conn->runCommand( dbName , cmdObj , res ) ){ @@ -217,17 +478,33 @@ namespace mongo { size += res["size"].numberLong(); storageSize += res["storageSize"].numberLong(); - if (nindexes) - massert(12595, "nindexes should be the same on all shards!", nindexes == res["nindexes"].numberInt()); - else - nindexes = res["nindexes"].numberInt(); + int myIndexes = res["nindexes"].numberInt(); + + if ( nindexes == 0 ){ + nindexes = myIndexes; + } + else if ( nindexes == myIndexes ){ + // no-op + } + else { + // hopefully this means we're building an index + + if ( myIndexes > nindexes ) + nindexes = myIndexes; + + if ( ! warnedAboutIndexes ){ + result.append( "warning" , "indexes don't all match - ok if ensureIndex is running" ); + warnedAboutIndexes = true; + } + } - shardStats.append(*i, res); + shardStats.append(i->getName(), res); } result.append("ns", fullns); result.appendNumber("count", count); result.appendNumber("size", size); + result.append ("avgObjSize", double(size) / double(count)); result.appendNumber("storageSize", storageSize); result.append("nindexes", nindexes); @@ -241,95 +518,92 @@ namespace mongo { class FindAndModifyCmd : public PublicGridCommand { public: FindAndModifyCmd() : PublicGridCommand("findandmodify") { } - bool run(const char *ns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){ - string dbName = getDBName( ns ); + bool run(const string& dbName, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){ string collection = cmdObj.firstElement().valuestrsafe(); string fullns = dbName + "." + collection; - BSONObj filter = cmdObj.getObjectField("query"); - - DBConfig * conf = grid.getDBConfig( dbName , false ); + DBConfigPtr conf = grid.getDBConfig( dbName , false ); if ( ! conf || ! conf->isShardingEnabled() || ! conf->isSharded( fullns ) ){ return passthrough( conf , cmdObj , result); } - ChunkManager * cm = conf->getChunkManager( fullns ); + ChunkManagerPtr cm = conf->getChunkManager( fullns ); massert( 13002 , "how could chunk manager be null!" , cm ); - vector<Chunk*> chunks; - cm->getChunksForQuery( chunks , filter ); - - BSONObj sort = cmdObj.getObjectField("sort"); - if (!sort.isEmpty()){ - ShardKeyPattern& sk = cm->getShardKey(); - { - BSONObjIterator k (sk.key()); - BSONObjIterator s (sort); - bool good = true; - while (k.more()){ - if (!s.more()){ - good = false; - break; - } - - BSONElement ke = k.next(); - BSONElement se = s.next(); - - // TODO consider values when we support compound keys - if (strcmp(ke.fieldName(), se.fieldName()) != 0){ - good = false; - break; - } - } - - uassert(13001, "Sort must match shard key for sharded findandmodify", good); - } - - std::sort(chunks.begin(), chunks.end(), ChunkCmp(sort)); - } + BSONObj filter = cmdObj.getObjectField("query"); + uassert(13343, "query for sharded findAndModify must have shardkey", cm->hasShardKey(filter)); - for ( vector<Chunk*>::iterator i = chunks.begin() ; i != chunks.end() ; i++ ){ - Chunk * c = *i; + //TODO with upsert consider tracking for splits - ScopedDbConnection conn( c->getShard() ); - BSONObj res; - bool ok = conn->runCommand( conf->getName() , fixCmdObj(cmdObj, c) , res ); - conn.done(); + ChunkPtr chunk = cm->findChunk(filter); + ShardConnection conn( chunk->getShard() , fullns ); + BSONObj res; + bool ok = conn->runCommand( conf->getName() , cmdObj , res ); + conn.done(); - if (ok || (strcmp(res["errmsg"].valuestrsafe(), "No matching object found") != 0)){ - result.appendElements(res); - return ok; - } + if (ok || (strcmp(res["errmsg"].valuestrsafe(), "No matching object found") != 0)){ + result.appendElements(res); + return ok; } return true; } - private: - BSONObj fixCmdObj(const BSONObj& cmdObj, const Chunk* chunk){ - assert(chunk); + } findAndModifyCmd; - BSONObjBuilder b; - BSONObjIterator i(cmdObj); - bool foundQuery = false; - while (i.more()){ - BSONElement e = i.next(); - if (strcmp(e.fieldName(), "query") != 0){ - b.append(e); - }else{ - foundQuery = true; - b.append("query", ClusteredCursor::concatQuery(e.embeddedObjectUserCheck(), chunk->getFilter())); - } + class DataSizeCmd : public PublicGridCommand { + public: + DataSizeCmd() : PublicGridCommand("dataSize", "datasize") { } + bool run(const string& dbName, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){ + string fullns = cmdObj.firstElement().String(); + + DBConfigPtr conf = grid.getDBConfig( dbName , false ); + + if ( ! conf || ! conf->isShardingEnabled() || ! conf->isSharded( fullns ) ){ + return passthrough( conf , cmdObj , result); } + + ChunkManagerPtr cm = conf->getChunkManager( fullns ); + massert( 13407 , "how could chunk manager be null!" , cm ); + + BSONObj min = cmdObj.getObjectField( "min" ); + BSONObj max = cmdObj.getObjectField( "max" ); + BSONObj keyPattern = cmdObj.getObjectField( "keyPattern" ); - if (!foundQuery) - b.append("query", chunk->getFilter()); + uassert(13408, "keyPattern must equal shard key", cm->getShardKey().key() == keyPattern); - return b.obj(); + // yes these are doubles... + double size = 0; + double numObjects = 0; + int millis = 0; + + set<Shard> shards; + cm->getShardsForRange(shards, min, max); + for ( set<Shard>::iterator i=shards.begin(), end=shards.end() ; i != end; ++i ){ + ScopedDbConnection conn( *i ); + BSONObj res; + bool ok = conn->runCommand( conf->getName() , cmdObj , res ); + conn.done(); + + if ( ! ok ){ + result.appendElements( res ); + return false; + } + + size += res["size"].number(); + numObjects += res["numObjects"].number(); + millis += res["millis"].numberInt(); + + } + + result.append( "size", size ); + result.append( "numObjects" , numObjects ); + result.append( "millis" , millis ); + return true; } - } findAndModifyCmd; + } DataSizeCmd; class ConvertToCappedCmd : public NotAllowedOnShardedCollectionCmd { public: @@ -356,33 +630,30 @@ namespace mongo { public: DistinctCmd() : PublicGridCommand("distinct"){} virtual void help( stringstream &help ) const { - help << "{ distinct : 'collection name' , key : 'a.b' }"; + help << "{ distinct : 'collection name' , key : 'a.b' , query : {} }"; } - bool run(const char *ns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){ - - string dbName = getDBName( ns ); + bool run(const string& dbName , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){ string collection = cmdObj.firstElement().valuestrsafe(); string fullns = dbName + "." + collection; - DBConfig * conf = grid.getDBConfig( dbName , false ); + DBConfigPtr conf = grid.getDBConfig( dbName , false ); if ( ! conf || ! conf->isShardingEnabled() || ! conf->isSharded( fullns ) ){ return passthrough( conf , cmdObj , result ); } - ChunkManager * cm = conf->getChunkManager( fullns ); + ChunkManagerPtr cm = conf->getChunkManager( fullns ); massert( 10420 , "how could chunk manager be null!" , cm ); - - vector<Chunk*> chunks; - cm->getChunksForQuery( chunks , BSONObj() ); + + BSONObj query = getQuery(cmdObj); + set<Shard> shards; + cm->getShardsForQuery(shards, query); set<BSONObj,BSONObjCmp> all; int size = 32; - for ( vector<Chunk*>::iterator i = chunks.begin() ; i != chunks.end() ; i++ ){ - Chunk * c = *i; - - ScopedDbConnection conn( c->getShard() ); + for ( set<Shard>::iterator i=shards.begin(), end=shards.end() ; i != end; ++i ){ + ShardConnection conn( *i , fullns ); BSONObj res; bool ok = conn->runCommand( conf->getName() , cmdObj , res ); conn.done(); @@ -392,11 +663,11 @@ namespace mongo { return false; } - BSONObjIterator it( res["values"].embeddedObjectUserCheck() ); + BSONObjIterator it( res["values"].embeddedObject() ); while ( it.more() ){ BSONElement nxt = it.next(); BSONObjBuilder temp(32); - temp.appendAs( nxt , "x" ); + temp.appendAs( nxt , "" ); all.insert( temp.obj() ); } @@ -413,6 +684,45 @@ namespace mongo { } } disinctCmd; + class FileMD5Cmd : public PublicGridCommand { + public: + FileMD5Cmd() : PublicGridCommand("filemd5"){} + virtual void help( stringstream &help ) const { + help << " example: { filemd5 : ObjectId(aaaaaaa) , root : \"fs\" }"; + } + bool run(const string& dbName , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){ + string fullns = dbName; + fullns += "."; + { + string root = cmdObj.getStringField( "root" ); + if ( root.size() == 0 ) + root = "fs"; + fullns += root; + } + fullns += ".chunks"; + + DBConfigPtr conf = grid.getDBConfig( dbName , false ); + + if ( ! conf || ! conf->isShardingEnabled() || ! conf->isSharded( fullns ) ){ + return passthrough( conf , cmdObj , result ); + } + + ChunkManagerPtr cm = conf->getChunkManager( fullns ); + massert( 13091 , "how could chunk manager be null!" , cm ); + uassert( 13092 , "GridFS chunks collection can only be sharded on files_id", cm->getShardKey().key() == BSON("files_id" << 1)); + + ChunkPtr chunk = cm->findChunk( BSON("files_id" << cmdObj.firstElement()) ); + + ShardConnection conn( chunk->getShard() , fullns ); + BSONObj res; + bool ok = conn->runCommand( conf->getName() , cmdObj , res ); + conn.done(); + + result.appendElements(res); + return ok; + } + } fileMD5Cmd; + class MRCmd : public PublicGridCommand { public: MRCmd() : PublicGridCommand( "mapreduce" ){} @@ -451,14 +761,13 @@ namespace mongo { return b.obj(); } - bool run(const char *ns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){ + bool run(const string& dbName , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){ Timer t; - string dbName = getDBName( ns ); string collection = cmdObj.firstElement().valuestrsafe(); string fullns = dbName + "." + collection; - DBConfig * conf = grid.getDBConfig( dbName , false ); + DBConfigPtr conf = grid.getDBConfig( dbName , false ); if ( ! conf || ! conf->isShardingEnabled() || ! conf->isSharded( fullns ) ){ return passthrough( conf , cmdObj , result ); @@ -466,15 +775,15 @@ namespace mongo { BSONObjBuilder timingBuilder; - ChunkManager * cm = conf->getChunkManager( fullns ); + ChunkManagerPtr cm = conf->getChunkManager( fullns ); BSONObj q; if ( cmdObj["query"].type() == Object ){ q = cmdObj["query"].embeddedObjectUserCheck(); } - vector<Chunk*> chunks; - cm->getChunksForQuery( chunks , q ); + set<Shard> shards; + cm->getShardsForQuery( shards , q ); const string shardedOutputCollection = getTmpName( collection ); @@ -486,9 +795,8 @@ namespace mongo { list< shared_ptr<Future::CommandResult> > futures; - for ( vector<Chunk*>::iterator i = chunks.begin() ; i != chunks.end() ; i++ ){ - Chunk * c = *i; - futures.push_back( Future::spawnCommand( c->getShard() , dbName , shardedCommand ) ); + for ( set<Shard>::iterator i=shards.begin(), end=shards.end() ; i != end ; i++ ){ + futures.push_back( Future::spawnCommand( i->getConnString() , dbName , shardedCommand ) ); } BSONObjBuilder shardresults; @@ -506,9 +814,12 @@ namespace mongo { timingBuilder.append( "shards" , t.millis() ); Timer t2; - ScopedDbConnection conn( conf->getPrimary() ); + ShardConnection conn( conf->getPrimary() , fullns ); BSONObj finalResult; - if ( ! conn->runCommand( dbName , finalCmd.obj() , finalResult ) ){ + bool ok = conn->runCommand( dbName , finalCmd.obj() , finalResult ); + conn.done(); + + if ( ! ok ){ errmsg = "final reduce failed: "; errmsg += finalResult.toString(); return 0; @@ -522,5 +833,18 @@ namespace mongo { return 1; } } mrCmd; + + class ApplyOpsCmd : public PublicGridCommand { + public: + ApplyOpsCmd() : PublicGridCommand( "applyOps" ){} + + virtual bool run(const string& dbName , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){ + errmsg = "applyOps not allowed through mongos"; + return false; + } + + } applyOpsCmd; + } + } |