summaryrefslogtreecommitdiff
path: root/s/commands_public.cpp
diff options
context:
space:
mode:
Diffstat (limited to 's/commands_public.cpp')
-rw-r--r--s/commands_public.cpp559
1 files changed, 463 insertions, 96 deletions
diff --git a/s/commands_public.cpp b/s/commands_public.cpp
index f29205b..ef7110c 100644
--- a/s/commands_public.cpp
+++ b/s/commands_public.cpp
@@ -18,20 +18,28 @@
*/
#include "pch.h"
-#include "../util/message.h"
+#include "../util/net/message.h"
#include "../db/dbmessage.h"
#include "../client/connpool.h"
#include "../client/parallel.h"
#include "../db/commands.h"
-#include "../db/query.h"
+#include "../db/queryutil.h"
+#include "../scripting/engine.h"
#include "config.h"
#include "chunk.h"
#include "strategy.h"
#include "grid.h"
+#include "mr_shard.h"
+#include "client.h"
namespace mongo {
+ bool setParmsMongodSpecific(const string& dbname, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl )
+ {
+ return true;
+ }
+
namespace dbgrid_pub_cmds {
class PublicGridCommand : public Command {
@@ -45,22 +53,38 @@ namespace mongo {
return false;
}
+ // Override if passthrough should also send query options
+ // Safer as off by default, can slowly enable as we add more tests
+ virtual bool passOptions() const { return false; }
+
// all grid commands are designed not to lock
virtual LockType locktype() const { return NONE; }
protected:
+
bool passthrough( DBConfigPtr conf, const BSONObj& cmdObj , BSONObjBuilder& result ) {
- return _passthrough(conf->getName(), conf, cmdObj, result);
+ return _passthrough(conf->getName(), conf, cmdObj, 0, result);
}
bool adminPassthrough( DBConfigPtr conf, const BSONObj& cmdObj , BSONObjBuilder& result ) {
- return _passthrough("admin", conf, cmdObj, result);
+ return _passthrough("admin", conf, cmdObj, 0, result);
+ }
+
+ bool passthrough( DBConfigPtr conf, const BSONObj& cmdObj , int options, BSONObjBuilder& result ) {
+ return _passthrough(conf->getName(), conf, cmdObj, options, result);
+ }
+ bool adminPassthrough( DBConfigPtr conf, const BSONObj& cmdObj , int options, BSONObjBuilder& result ) {
+ return _passthrough("admin", conf, cmdObj, options, result);
}
private:
- bool _passthrough(const string& db, DBConfigPtr conf, const BSONObj& cmdObj , BSONObjBuilder& result ) {
+ bool _passthrough(const string& db, DBConfigPtr conf, const BSONObj& cmdObj , int options , BSONObjBuilder& result ) {
ShardConnection conn( conf->getPrimary() , "" );
BSONObj res;
- bool ok = conn->runCommand( db , cmdObj , res );
+ bool ok = conn->runCommand( db , cmdObj , res , passOptions() ? options : 0 );
+ if ( ! ok && res["code"].numberInt() == StaleConfigInContextCode ) {
+ conn.done();
+ throw StaleConfigException("foo","command failed because of stale config");
+ }
result.appendElements( res );
conn.done();
return ok;
@@ -87,13 +111,14 @@ namespace mongo {
virtual void aggregateResults(const vector<BSONObj>& results, BSONObjBuilder& output) {}
// don't override
- virtual bool run(const string& dbName , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& output, bool) {
+ virtual bool run(const string& dbName , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& output, bool) {
+ LOG(1) << "RunOnAllShardsCommand db: " << dbName << " cmd:" << cmdObj << endl;
set<Shard> shards;
getShards(dbName, cmdObj, shards);
list< shared_ptr<Future::CommandResult> > futures;
for ( set<Shard>::const_iterator i=shards.begin(), end=shards.end() ; i != end ; i++ ) {
- futures.push_back( Future::spawnCommand( i->getConnString() , dbName , cmdObj ) );
+ futures.push_back( Future::spawnCommand( i->getConnString() , dbName , cmdObj, 0 ) );
}
vector<BSONObj> results;
@@ -147,13 +172,13 @@ namespace mongo {
virtual string getFullNS( const string& dbName , const BSONObj& cmdObj ) = 0;
- virtual bool run(const string& dbName , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool) {
+ virtual bool run(const string& dbName , BSONObj& cmdObj, int options, string& errmsg, BSONObjBuilder& result, bool) {
string fullns = getFullNS( dbName , cmdObj );
DBConfigPtr conf = grid.getDBConfig( dbName , false );
if ( ! conf || ! conf->isShardingEnabled() || ! conf->isSharded( fullns ) ) {
- return passthrough( conf , cmdObj , result );
+ return passthrough( conf , cmdObj , options, result );
}
errmsg = "can't do command: " + name + " on sharded collection";
return false;
@@ -172,9 +197,41 @@ namespace mongo {
ReIndexCmd() : AllShardsCollectionCommand("reIndex") {}
} reIndexCmd;
+ class ProfileCmd : public PublicGridCommand {
+ public:
+ ProfileCmd() : PublicGridCommand("profile") {}
+ virtual bool run(const string& dbName , BSONObj& cmdObj, int options, string& errmsg, BSONObjBuilder& result, bool) {
+ errmsg = "profile currently not supported via mongos";
+ return false;
+ }
+ } profileCmd;
+
+
class ValidateCmd : public AllShardsCollectionCommand {
public:
ValidateCmd() : AllShardsCollectionCommand("validate") {}
+ virtual void aggregateResults(const vector<BSONObj>& results, BSONObjBuilder& output) {
+ for (vector<BSONObj>::const_iterator it(results.begin()), end(results.end()); it!=end; it++){
+ const BSONObj& result = *it;
+ const BSONElement valid = result["valid"];
+ if (!valid.eoo()){
+ if (!valid.trueValue()) {
+ output.appendBool("valid", false);
+ return;
+ }
+ }
+ else {
+ // Support pre-1.9.0 output with everything in a big string
+ const char* s = result["result"].valuestrsafe();
+ if (strstr(s, "exception") || strstr(s, "corrupt")){
+ output.appendBool("valid", false);
+ return;
+ }
+ }
+ }
+
+ output.appendBool("valid", true);
+ }
} validateCmd;
class RepairDatabaseCmd : public RunOnAllShardsCommand {
@@ -221,7 +278,7 @@ namespace mongo {
class DropCmd : public PublicGridCommand {
public:
DropCmd() : PublicGridCommand( "drop" ) {}
- bool run(const string& dbName , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool) {
+ bool run(const string& dbName , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool) {
string collection = cmdObj.firstElement().valuestrsafe();
string fullns = dbName + "." + collection;
@@ -246,7 +303,7 @@ namespace mongo {
class DropDBCmd : public PublicGridCommand {
public:
DropDBCmd() : PublicGridCommand( "dropDatabase" ) {}
- bool run(const string& dbName , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool) {
+ bool run(const string& dbName , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool) {
BSONElement e = cmdObj.firstElement();
@@ -275,7 +332,7 @@ namespace mongo {
class RenameCollectionCmd : public PublicGridCommand {
public:
RenameCollectionCmd() : PublicGridCommand( "renameCollection" ) {}
- bool run(const string& dbName, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool) {
+ bool run(const string& dbName, BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool) {
string fullnsFrom = cmdObj.firstElement().valuestrsafe();
string dbNameFrom = nsToDatabase( fullnsFrom.c_str() );
DBConfigPtr confFrom = grid.getDBConfig( dbNameFrom , false );
@@ -300,7 +357,7 @@ namespace mongo {
class CopyDBCmd : public PublicGridCommand {
public:
CopyDBCmd() : PublicGridCommand( "copydb" ) {}
- bool run(const string& dbName, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool) {
+ bool run(const string& dbName, BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool) {
string todb = cmdObj.getStringField("todb");
uassert(13402, "need a todb argument", !todb.empty());
@@ -336,7 +393,8 @@ namespace mongo {
class CountCmd : public PublicGridCommand {
public:
CountCmd() : PublicGridCommand("count") { }
- bool run(const string& dbName, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool l) {
+ virtual bool passOptions() const { return true; }
+ bool run(const string& dbName, BSONObj& cmdObj, int options, string& errmsg, BSONObjBuilder& result, bool) {
string collection = cmdObj.firstElement().valuestrsafe();
string fullns = dbName + "." + collection;
@@ -345,12 +403,11 @@ namespace mongo {
filter = cmdObj["query"].Obj();
DBConfigPtr conf = grid.getDBConfig( dbName , false );
-
if ( ! conf || ! conf->isShardingEnabled() || ! conf->isSharded( fullns ) ) {
ShardConnection conn( conf->getPrimary() , fullns );
BSONObj temp;
- bool ok = conn->runCommand( dbName , cmdObj , temp );
+ bool ok = conn->runCommand( dbName , cmdObj , temp, options );
conn.done();
if ( ok ) {
@@ -365,7 +422,7 @@ namespace mongo {
}
// this collection got sharded
- ChunkManagerPtr cm = conf->getChunkManager( fullns , true );
+ ChunkManagerPtr cm = conf->getChunkManagerIfExists( fullns , true );
if ( ! cm ) {
errmsg = "should be sharded now";
result.append( "root" , temp );
@@ -376,11 +433,11 @@ namespace mongo {
long long total = 0;
map<string,long long> shardCounts;
- ChunkManagerPtr cm = conf->getChunkManager( fullns );
+ ChunkManagerPtr cm = conf->getChunkManagerIfExists( fullns );
while ( true ) {
if ( ! cm ) {
// probably unsharded now
- return run( dbName , cmdObj , errmsg , result , l );
+ return run( dbName , cmdObj , options , errmsg , result, false );
}
set<Shard> shards;
@@ -394,14 +451,14 @@ namespace mongo {
if ( conn.setVersion() ) {
total = 0;
shardCounts.clear();
- cm = conf->getChunkManager( fullns );
+ cm = conf->getChunkManagerIfExists( fullns );
conn.done();
hadToBreak = true;
break;
}
BSONObj temp;
- bool ok = conn->runCommand( dbName , BSON( "count" << collection << "query" << filter ) , temp );
+ bool ok = conn->runCommand( dbName , BSON( "count" << collection << "query" << filter ) , temp, options );
conn.done();
if ( ok ) {
@@ -415,7 +472,7 @@ namespace mongo {
// my version is old
total = 0;
shardCounts.clear();
- cm = conf->getChunkManager( fullns , true );
+ cm = conf->getChunkManagerIfExists( fullns , true );
hadToBreak = true;
break;
}
@@ -442,14 +499,13 @@ namespace mongo {
class CollectionStats : public PublicGridCommand {
public:
CollectionStats() : PublicGridCommand("collStats", "collstats") { }
- bool run(const string& dbName , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool) {
+ bool run(const string& dbName , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool) {
string collection = cmdObj.firstElement().valuestrsafe();
string fullns = dbName + "." + collection;
DBConfigPtr conf = grid.getDBConfig( dbName , false );
if ( ! conf || ! conf->isShardingEnabled() || ! conf->isSharded( fullns ) ) {
- result.append( "ns" , fullns );
result.appendBool("sharded", false);
result.append( "primary" , conf->getPrimary().getName() );
return passthrough( conf , cmdObj , result);
@@ -463,9 +519,13 @@ namespace mongo {
cm->getAllShards(servers);
BSONObjBuilder shardStats;
+ map<string,long long> counts;
+ map<string,long long> indexSizes;
+ /*
long long count=0;
long long size=0;
long long storageSize=0;
+ */
int nindexes=0;
bool warnedAboutIndexes = false;
for ( set<Shard>::iterator i=servers.begin(); i!=servers.end(); i++ ) {
@@ -476,39 +536,82 @@ namespace mongo {
return false;
}
conn.done();
-
- count += res["count"].numberLong();
- size += res["size"].numberLong();
- storageSize += res["storageSize"].numberLong();
-
- int myIndexes = res["nindexes"].numberInt();
-
- if ( nindexes == 0 ) {
- nindexes = myIndexes;
- }
- else if ( nindexes == myIndexes ) {
- // no-op
- }
- else {
- // hopefully this means we're building an index
-
- if ( myIndexes > nindexes )
- nindexes = myIndexes;
-
- if ( ! warnedAboutIndexes ) {
- result.append( "warning" , "indexes don't all match - ok if ensureIndex is running" );
- warnedAboutIndexes = true;
+
+ BSONObjIterator j( res );
+ while ( j.more() ) {
+ BSONElement e = j.next();
+
+ if ( str::equals( e.fieldName() , "ns" ) ||
+ str::equals( e.fieldName() , "ok" ) ||
+ str::equals( e.fieldName() , "avgObjSize" ) ||
+ str::equals( e.fieldName() , "lastExtentSize" ) ||
+ str::equals( e.fieldName() , "paddingFactor" ) ) {
+ continue;
+ }
+ else if ( str::equals( e.fieldName() , "count" ) ||
+ str::equals( e.fieldName() , "size" ) ||
+ str::equals( e.fieldName() , "storageSize" ) ||
+ str::equals( e.fieldName() , "numExtents" ) ||
+ str::equals( e.fieldName() , "totalIndexSize" ) ) {
+ counts[e.fieldName()] += e.numberLong();
+ }
+ else if ( str::equals( e.fieldName() , "indexSizes" ) ) {
+ BSONObjIterator k( e.Obj() );
+ while ( k.more() ) {
+ BSONElement temp = k.next();
+ indexSizes[temp.fieldName()] += temp.numberLong();
+ }
+ }
+ else if ( str::equals( e.fieldName() , "flags" ) ) {
+ if ( ! result.hasField( e.fieldName() ) )
+ result.append( e );
}
+ else if ( str::equals( e.fieldName() , "nindexes" ) ) {
+ int myIndexes = e.numberInt();
+
+ if ( nindexes == 0 ) {
+ nindexes = myIndexes;
+ }
+ else if ( nindexes == myIndexes ) {
+ // no-op
+ }
+ else {
+ // hopefully this means we're building an index
+
+ if ( myIndexes > nindexes )
+ nindexes = myIndexes;
+
+ if ( ! warnedAboutIndexes ) {
+ result.append( "warning" , "indexes don't all match - ok if ensureIndex is running" );
+ warnedAboutIndexes = true;
+ }
+ }
+ }
+ else {
+ warning() << "mongos collstats doesn't know about: " << e.fieldName() << endl;
+ }
+
}
-
shardStats.append(i->getName(), res);
}
result.append("ns", fullns);
- result.appendNumber("count", count);
- result.appendNumber("size", size);
- result.append ("avgObjSize", double(size) / double(count));
- result.appendNumber("storageSize", storageSize);
+
+ for ( map<string,long long>::iterator i=counts.begin(); i!=counts.end(); ++i )
+ result.appendNumber( i->first , i->second );
+
+ {
+ BSONObjBuilder ib( result.subobjStart( "indexSizes" ) );
+ for ( map<string,long long>::iterator i=indexSizes.begin(); i!=indexSizes.end(); ++i )
+ ib.appendNumber( i->first , i->second );
+ ib.done();
+ }
+
+ if ( counts["count"] > 0 )
+ result.append("avgObjSize", (double)counts["size"] / (double)counts["count"] );
+ else
+ result.append( "avgObjSize", 0.0 );
+
result.append("nindexes", nindexes);
result.append("nchunks", cm->numChunks());
@@ -521,7 +624,7 @@ namespace mongo {
class FindAndModifyCmd : public PublicGridCommand {
public:
FindAndModifyCmd() : PublicGridCommand("findAndModify", "findandmodify") { }
- bool run(const string& dbName, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool) {
+ bool run(const string& dbName, BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool) {
string collection = cmdObj.firstElement().valuestrsafe();
string fullns = dbName + "." + collection;
@@ -532,7 +635,7 @@ namespace mongo {
}
ChunkManagerPtr cm = conf->getChunkManager( fullns );
- massert( 13002 , "how could chunk manager be null!" , cm );
+ massert( 13002 , "shard internal error chunk manager should never be null" , cm );
BSONObj filter = cmdObj.getObjectField("query");
uassert(13343, "query for sharded findAndModify must have shardkey", cm->hasShardKey(filter));
@@ -558,7 +661,7 @@ namespace mongo {
class DataSizeCmd : public PublicGridCommand {
public:
DataSizeCmd() : PublicGridCommand("dataSize", "datasize") { }
- bool run(const string& dbName, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool) {
+ bool run(const string& dbName, BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool) {
string fullns = cmdObj.firstElement().String();
DBConfigPtr conf = grid.getDBConfig( dbName , false );
@@ -622,7 +725,7 @@ namespace mongo {
class GroupCmd : public NotAllowedOnShardedCollectionCmd {
public:
GroupCmd() : NotAllowedOnShardedCollectionCmd("group") {}
-
+ virtual bool passOptions() const { return true; }
virtual string getFullNS( const string& dbName , const BSONObj& cmdObj ) {
return dbName + "." + cmdObj.firstElement().embeddedObjectUserCheck()["ns"].valuestrsafe();
}
@@ -635,14 +738,15 @@ namespace mongo {
virtual void help( stringstream &help ) const {
help << "{ distinct : 'collection name' , key : 'a.b' , query : {} }";
}
- bool run(const string& dbName , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool) {
+ virtual bool passOptions() const { return true; }
+ bool run(const string& dbName , BSONObj& cmdObj, int options, string& errmsg, BSONObjBuilder& result, bool) {
string collection = cmdObj.firstElement().valuestrsafe();
string fullns = dbName + "." + collection;
DBConfigPtr conf = grid.getDBConfig( dbName , false );
if ( ! conf || ! conf->isShardingEnabled() || ! conf->isSharded( fullns ) ) {
- return passthrough( conf , cmdObj , result );
+ return passthrough( conf , cmdObj , options, result );
}
ChunkManagerPtr cm = conf->getChunkManager( fullns );
@@ -658,7 +762,7 @@ namespace mongo {
for ( set<Shard>::iterator i=shards.begin(), end=shards.end() ; i != end; ++i ) {
ShardConnection conn( *i , fullns );
BSONObj res;
- bool ok = conn->runCommand( conf->getName() , cmdObj , res );
+ bool ok = conn->runCommand( conf->getName() , cmdObj , res, options );
conn.done();
if ( ! ok ) {
@@ -693,7 +797,7 @@ namespace mongo {
virtual void help( stringstream &help ) const {
help << " example: { filemd5 : ObjectId(aaaaaaa) , root : \"fs\" }";
}
- bool run(const string& dbName , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool) {
+ bool run(const string& dbName , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool) {
string fullns = dbName;
fullns += ".";
{
@@ -730,15 +834,15 @@ namespace mongo {
public:
Geo2dFindNearCmd() : PublicGridCommand( "geoNear" ) {}
void help(stringstream& h) const { h << "http://www.mongodb.org/display/DOCS/Geospatial+Indexing#GeospatialIndexing-geoNearCommand"; }
-
- bool run(const string& dbName , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool) {
+ virtual bool passOptions() const { return true; }
+ bool run(const string& dbName , BSONObj& cmdObj, int options, string& errmsg, BSONObjBuilder& result, bool) {
string collection = cmdObj.firstElement().valuestrsafe();
string fullns = dbName + "." + collection;
DBConfigPtr conf = grid.getDBConfig( dbName , false );
if ( ! conf || ! conf->isShardingEnabled() || ! conf->isSharded( fullns ) ) {
- return passthrough( conf , cmdObj , result );
+ return passthrough( conf , cmdObj , options, result );
}
ChunkManagerPtr cm = conf->getChunkManager( fullns );
@@ -755,7 +859,7 @@ namespace mongo {
list< shared_ptr<Future::CommandResult> > futures;
BSONArrayBuilder shardArray;
for ( set<Shard>::const_iterator i=shards.begin(), end=shards.end() ; i != end ; i++ ) {
- futures.push_back( Future::spawnCommand( i->getConnString() , dbName , cmdObj ) );
+ futures.push_back( Future::spawnCommand( i->getConnString() , dbName , cmdObj, options ) );
shardArray.append(i->getName());
}
@@ -820,12 +924,13 @@ namespace mongo {
class MRCmd : public PublicGridCommand {
public:
+ AtomicUInt JOB_NUMBER;
+
MRCmd() : PublicGridCommand( "mapreduce" ) {}
string getTmpName( const string& coll ) {
- static int inc = 1;
stringstream ss;
- ss << "tmp.mrs." << coll << "_" << time(0) << "_" << inc++;
+ ss << "tmp.mrs." << coll << "_" << time(0) << "_" << JOB_NUMBER++;
return ss.str();
}
@@ -851,8 +956,8 @@ namespace mongo {
if (fn == "out" && e.type() == Object) {
// check if there is a custom output
BSONObj out = e.embeddedObject();
- if (out.hasField("db"))
- customOut = out;
+// if (out.hasField("db"))
+ customOut = out;
}
}
else {
@@ -864,7 +969,7 @@ namespace mongo {
return b.obj();
}
- bool run(const string& dbName , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool) {
+ bool run(const string& dbName , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool) {
Timer t;
string collection = cmdObj.firstElement().valuestrsafe();
@@ -876,7 +981,7 @@ namespace mongo {
BSONObj customOut;
BSONObj shardedCommand = fixForShards( cmdObj , shardedOutputCollection, customOut , badShardedField );
- bool customOutDB = ! customOut.isEmpty() && customOut.hasField( "db" );
+ bool customOutDB = customOut.hasField( "db" );
DBConfigPtr conf = grid.getDBConfig( dbName , false );
@@ -911,26 +1016,32 @@ namespace mongo {
finalCmd.append( "shardedOutputCollection" , shardedOutputCollection );
+ set<ServerAndQuery> servers;
+ BSONObj shardCounts;
+ BSONObj aggCounts;
+ map<string,long long> countsMap;
{
// we need to use our connections to the shard
// so filtering is done correctly for un-owned docs
// so we allocate them in our thread
// and hand off
-
+ // Note: why not use pooled connections? This has been reported to create too many connections
vector< shared_ptr<ShardConnection> > shardConns;
-
list< shared_ptr<Future::CommandResult> > futures;
for ( set<Shard>::iterator i=shards.begin(), end=shards.end() ; i != end ; i++ ) {
shared_ptr<ShardConnection> temp( new ShardConnection( i->getConnString() , fullns ) );
assert( temp->get() );
- futures.push_back( Future::spawnCommand( i->getConnString() , dbName , shardedCommand , temp->get() ) );
+ futures.push_back( Future::spawnCommand( i->getConnString() , dbName , shardedCommand , 0 , temp->get() ) );
shardConns.push_back( temp );
}
bool failed = false;
-
- BSONObjBuilder shardresults;
+
+ // now wait for the result of all shards
+ BSONObjBuilder shardResultsB;
+ BSONObjBuilder shardCountsB;
+ BSONObjBuilder aggCountsB;
for ( list< shared_ptr<Future::CommandResult> >::iterator i=futures.begin(); i!=futures.end(); i++ ) {
shared_ptr<Future::CommandResult> res = *i;
if ( ! res->join() ) {
@@ -941,7 +1052,19 @@ namespace mongo {
failed = true;
continue;
}
- shardresults.append( res->getServer() , res->result() );
+ BSONObj result = res->result();
+ shardResultsB.append( res->getServer() , result );
+ BSONObj counts = result["counts"].embeddedObjectUserCheck();
+ shardCountsB.append( res->getServer() , counts );
+ servers.insert(res->getServer());
+
+ // add up the counts for each shard
+ // some of them will be fixed later like output and reduce
+ BSONObjIterator j( counts );
+ while ( j.more() ) {
+ BSONElement temp = j.next();
+ countsMap[temp.fieldName()] += temp.numberLong();
+ }
}
for ( unsigned i=0; i<shardConns.size(); i++ )
@@ -950,28 +1073,205 @@ namespace mongo {
if ( failed )
return 0;
- finalCmd.append( "shards" , shardresults.obj() );
+ finalCmd.append( "shards" , shardResultsB.obj() );
+ shardCounts = shardCountsB.obj();
+ finalCmd.append( "shardCounts" , shardCounts );
timingBuilder.append( "shards" , t.millis() );
+
+ for ( map<string,long long>::iterator i=countsMap.begin(); i!=countsMap.end(); i++ ) {
+ aggCountsB.append( i->first , i->second );
+ }
+ aggCounts = aggCountsB.obj();
+ finalCmd.append( "counts" , aggCounts );
}
Timer t2;
- // by default the target database is same as input
- Shard outServer = conf->getPrimary();
- string outns = fullns;
- if ( customOutDB ) {
- // have to figure out shard for the output DB
- BSONElement elmt = customOut.getField("db");
- string outdb = elmt.valuestrsafe();
- outns = outdb + "." + collection;
- DBConfigPtr conf2 = grid.getDBConfig( outdb , true );
- outServer = conf2->getPrimary();
- }
- log() << "customOut: " << customOut << " outServer: " << outServer << endl;
-
- ShardConnection conn( outServer , outns );
BSONObj finalResult;
- bool ok = conn->runCommand( dbName , finalCmd.obj() , finalResult );
- conn.done();
+ bool ok = false;
+ string outdb = dbName;
+ if (customOutDB) {
+ BSONElement elmt = customOut.getField("db");
+ outdb = elmt.valuestrsafe();
+ }
+
+ if (!customOut.getBoolField("sharded")) {
+ // non-sharded, use the MRFinish command on target server
+ // This will save some data transfer
+
+ // by default the target database is same as input
+ Shard outServer = conf->getPrimary();
+ string outns = fullns;
+ if ( customOutDB ) {
+ // have to figure out shard for the output DB
+ DBConfigPtr conf2 = grid.getDBConfig( outdb , true );
+ outServer = conf2->getPrimary();
+ outns = outdb + "." + collection;
+ }
+ log() << "customOut: " << customOut << " outServer: " << outServer << endl;
+
+ ShardConnection conn( outServer , outns );
+ ok = conn->runCommand( dbName , finalCmd.obj() , finalResult );
+ conn.done();
+ } else {
+ // grab records from each shard and insert back in correct shard in "temp" collection
+ // we do the final reduce in mongos since records are ordered and already reduced on each shard
+// string shardedIncLong = str::stream() << outdb << ".tmp.mr." << collection << "_" << "shardedTemp" << "_" << time(0) << "_" << JOB_NUMBER++;
+
+ mr_shard::Config config( dbName , cmdObj );
+ mr_shard::State state(config);
+ LOG(1) << "mr sharded output ns: " << config.ns << endl;
+
+ if (config.outType == mr_shard::Config::INMEMORY) {
+ errmsg = "This Map Reduce mode is not supported with sharded output";
+ return false;
+ }
+
+ if (!config.outDB.empty()) {
+ BSONObjBuilder loc;
+ if ( !config.outDB.empty())
+ loc.append( "db" , config.outDB );
+ loc.append( "collection" , config.finalShort );
+ result.append("result", loc.obj());
+ }
+ else {
+ if ( !config.finalShort.empty() )
+ result.append( "result" , config.finalShort );
+ }
+
+ string outns = config.finalLong;
+ string tempns;
+
+ // result will be inserted into a temp collection to post process
+ const string postProcessCollection = getTmpName( collection );
+ finalCmd.append("postProcessCollection", postProcessCollection);
+ tempns = dbName + "." + postProcessCollection;
+
+// if (config.outType == mr_shard::Config::REPLACE) {
+// // drop previous collection
+// BSONObj dropColCmd = BSON("drop" << config.finalShort);
+// BSONObjBuilder dropColResult(32);
+// string outdbCmd = outdb + ".$cmd";
+// bool res = Command::runAgainstRegistered(outdbCmd.c_str(), dropColCmd, dropColResult);
+// if (!res) {
+// errmsg = str::stream() << "Could not drop sharded output collection " << outns << ": " << dropColResult.obj().toString();
+// return false;
+// }
+// }
+
+ BSONObj sortKey = BSON( "_id" << 1 );
+ if (!conf->isSharded(outns)) {
+ // create the sharded collection
+
+ BSONObj shardColCmd = BSON("shardCollection" << outns << "key" << sortKey);
+ BSONObjBuilder shardColResult(32);
+ bool res = Command::runAgainstRegistered("admin.$cmd", shardColCmd, shardColResult);
+ if (!res) {
+ errmsg = str::stream() << "Could not create sharded output collection " << outns << ": " << shardColResult.obj().toString();
+ return false;
+ }
+ }
+
+ ParallelSortClusteredCursor cursor( servers , dbName + "." + shardedOutputCollection ,
+ Query().sort( sortKey ) );
+ cursor.init();
+ state.init();
+
+ mr_shard::BSONList values;
+ Strategy* s = SHARDED;
+ long long finalCount = 0;
+ int currentSize = 0;
+ while ( cursor.more() || !values.empty() ) {
+ BSONObj t;
+ if ( cursor.more() ) {
+ t = cursor.next().getOwned();
+
+ if ( values.size() == 0 || t.woSortOrder( *(values.begin()) , sortKey ) == 0 ) {
+ values.push_back( t );
+ currentSize += t.objsize();
+
+ // check size and potentially reduce
+ if (currentSize > config.maxInMemSize && values.size() > config.reduceTriggerRatio) {
+ BSONObj reduced = config.reducer->finalReduce(values, 0);
+ values.clear();
+ values.push_back( reduced );
+ currentSize = reduced.objsize();
+ }
+ continue;
+ }
+ }
+
+ BSONObj final = config.reducer->finalReduce(values, config.finalizer.get());
+ if (config.outType == mr_shard::Config::MERGE) {
+ BSONObj id = final["_id"].wrap();
+ s->updateSharded(conf, outns.c_str(), id, final, UpdateOption_Upsert, true);
+ } else {
+ // insert into temp collection, but using final collection's shard chunks
+ s->insertSharded(conf, tempns.c_str(), final, 0, true, outns.c_str());
+ }
+ ++finalCount;
+ values.clear();
+ if (!t.isEmpty()) {
+ values.push_back( t );
+ currentSize = t.objsize();
+ }
+ }
+
+ if (config.outType == mr_shard::Config::REDUCE || config.outType == mr_shard::Config::REPLACE) {
+ // results were written to temp collection, need post processing
+ vector< shared_ptr<ShardConnection> > shardConns;
+ list< shared_ptr<Future::CommandResult> > futures;
+ BSONObj finalCmdObj = finalCmd.obj();
+ for ( set<Shard>::iterator i=shards.begin(), end=shards.end() ; i != end ; i++ ) {
+ shared_ptr<ShardConnection> temp( new ShardConnection( i->getConnString() , outns ) );
+ futures.push_back( Future::spawnCommand( i->getConnString() , dbName , finalCmdObj , 0 , temp->get() ) );
+ shardConns.push_back( temp );
+ }
+
+ // now wait for the result of all shards
+ bool failed = false;
+ for ( list< shared_ptr<Future::CommandResult> >::iterator i=futures.begin(); i!=futures.end(); i++ ) {
+ shared_ptr<Future::CommandResult> res = *i;
+ if ( ! res->join() ) {
+ error() << "final reduce on sharded output m/r failed on shard: " << res->getServer() << " error: " << res->result() << endl;
+ result.append( "cause" , res->result() );
+ errmsg = "mongod mr failed: ";
+ errmsg += res->result().toString();
+ failed = true;
+ continue;
+ }
+ BSONObj result = res->result();
+ }
+
+ for ( unsigned i=0; i<shardConns.size(); i++ )
+ shardConns[i]->done();
+
+ if (failed)
+ return 0;
+ }
+
+ for ( set<ServerAndQuery>::iterator i=servers.begin(); i!=servers.end(); i++ ) {
+ ScopedDbConnection conn( i->_server );
+ conn->dropCollection( dbName + "." + shardedOutputCollection );
+ conn.done();
+ }
+
+ result.append("shardCounts", shardCounts);
+
+ // fix the global counts
+ BSONObjBuilder countsB(32);
+ BSONObjIterator j(aggCounts);
+ while (j.more()) {
+ BSONElement elmt = j.next();
+ if (!strcmp(elmt.fieldName(), "reduce"))
+ countsB.append("reduce", elmt.numberLong() + state.numReduces());
+ else if (!strcmp(elmt.fieldName(), "output"))
+ countsB.append("output", finalCount);
+ else
+ countsB.append(elmt);
+ }
+ result.append( "counts" , countsB.obj() );
+ ok = true;
+ }
if ( ! ok ) {
errmsg = "final reduce failed: ";
@@ -991,14 +1291,81 @@ namespace mongo {
class ApplyOpsCmd : public PublicGridCommand {
public:
ApplyOpsCmd() : PublicGridCommand( "applyOps" ) {}
-
- virtual bool run(const string& dbName , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool) {
+ virtual bool run(const string& dbName , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool) {
errmsg = "applyOps not allowed through mongos";
return false;
}
-
} applyOpsCmd;
+ class CompactCmd : public PublicGridCommand {
+ public:
+ CompactCmd() : PublicGridCommand( "compact" ) {}
+ virtual bool run(const string& dbName , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool) {
+ errmsg = "compact not allowed through mongos";
+ return false;
+ }
+ } compactCmd;
+
}
+ bool Command::runAgainstRegistered(const char *ns, BSONObj& jsobj, BSONObjBuilder& anObjBuilder, int queryOptions) {
+ const char *p = strchr(ns, '.');
+ if ( !p ) return false;
+ if ( strcmp(p, ".$cmd") != 0 ) return false;
+
+ bool ok = false;
+
+ BSONElement e = jsobj.firstElement();
+ map<string,Command*>::iterator i;
+
+ if ( e.eoo() )
+ ;
+ // check for properly registered command objects.
+ else if ( (i = _commands->find(e.fieldName())) != _commands->end() ) {
+ string errmsg;
+ Command *c = i->second;
+ ClientInfo *client = ClientInfo::get();
+ AuthenticationInfo *ai = client->getAuthenticationInfo();
+
+ char cl[256];
+ nsToDatabase(ns, cl);
+ if( c->requiresAuth() && !ai->isAuthorized(cl)) {
+ ok = false;
+ errmsg = "unauthorized";
+ }
+ else if( c->adminOnly() && c->localHostOnlyIfNoAuth( jsobj ) && noauth && !ai->isLocalHost ) {
+ ok = false;
+ errmsg = "unauthorized: this command must run from localhost when running db without auth";
+ log() << "command denied: " << jsobj.toString() << endl;
+ }
+ else if ( c->adminOnly() && !startsWith(ns, "admin.") ) {
+ ok = false;
+ errmsg = "access denied - use admin db";
+ }
+ else if ( jsobj.getBoolField( "help" ) ) {
+ stringstream help;
+ help << "help for: " << e.fieldName() << " ";
+ c->help( help );
+ anObjBuilder.append( "help" , help.str() );
+ }
+ else {
+ ok = c->run( nsToDatabase( ns ) , jsobj, queryOptions, errmsg, anObjBuilder, false );
+ }
+
+ BSONObj tmp = anObjBuilder.asTempObj();
+ bool have_ok = tmp.hasField("ok");
+ bool have_errmsg = tmp.hasField("errmsg");
+
+ if (!have_ok)
+ anObjBuilder.append( "ok" , ok ? 1.0 : 0.0 );
+
+ if ( !ok && !have_errmsg) {
+ anObjBuilder.append("errmsg", errmsg);
+ uassert_nothrow(errmsg.c_str());
+ }
+ return true;
+ }
+
+ return false;
+ }
}