summaryrefslogtreecommitdiff
path: root/s/commands_public.cpp
diff options
context:
space:
mode:
Diffstat (limited to 's/commands_public.cpp')
-rw-r--r--s/commands_public.cpp541
1 files changed, 347 insertions, 194 deletions
diff --git a/s/commands_public.cpp b/s/commands_public.cpp
index 80d5cc9..02000a0 100644
--- a/s/commands_public.cpp
+++ b/s/commands_public.cpp
@@ -33,10 +33,10 @@
namespace mongo {
namespace dbgrid_pub_cmds {
-
+
class PublicGridCommand : public Command {
public:
- PublicGridCommand( const char* n, const char* oldname=NULL ) : Command( n, false, oldname ){
+ PublicGridCommand( const char* n, const char* oldname=NULL ) : Command( n, false, oldname ) {
}
virtual bool slaveOk() const {
return true;
@@ -46,18 +46,18 @@ namespace mongo {
}
// all grid commands are designed not to lock
- virtual LockType locktype() const { return NONE; }
+ virtual LockType locktype() const { return NONE; }
protected:
- bool passthrough( DBConfigPtr conf, const BSONObj& cmdObj , BSONObjBuilder& result ){
+ bool passthrough( DBConfigPtr conf, const BSONObj& cmdObj , BSONObjBuilder& result ) {
return _passthrough(conf->getName(), conf, cmdObj, result);
}
- bool adminPassthrough( DBConfigPtr conf, const BSONObj& cmdObj , BSONObjBuilder& result ){
+ bool adminPassthrough( DBConfigPtr conf, const BSONObj& cmdObj , BSONObjBuilder& result ) {
return _passthrough("admin", conf, cmdObj, result);
}
-
+
private:
- bool _passthrough(const string& db, DBConfigPtr conf, const BSONObj& cmdObj , BSONObjBuilder& result ){
+ bool _passthrough(const string& db, DBConfigPtr conf, const BSONObj& cmdObj , BSONObjBuilder& result ) {
ShardConnection conn( conf->getPrimary() , "" );
BSONObj res;
bool ok = conn->runCommand( db , cmdObj , res );
@@ -75,33 +75,33 @@ namespace mongo {
virtual bool adminOnly() const { return false; }
// all grid commands are designed not to lock
- virtual LockType locktype() const { return NONE; }
+ virtual LockType locktype() const { return NONE; }
// default impl uses all shards for DB
- virtual void getShards(const string& dbName , BSONObj& cmdObj, set<Shard>& shards){
+ virtual void getShards(const string& dbName , BSONObj& cmdObj, set<Shard>& shards) {
DBConfigPtr conf = grid.getDBConfig( dbName , false );
conf->getAllShards(shards);
}
-
+
virtual void aggregateResults(const vector<BSONObj>& results, BSONObjBuilder& output) {}
// don't override
- virtual bool run(const string& dbName , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& output, bool){
+ virtual bool run(const string& dbName , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& output, bool) {
set<Shard> shards;
getShards(dbName, cmdObj, shards);
list< shared_ptr<Future::CommandResult> > futures;
- for ( set<Shard>::const_iterator i=shards.begin(), end=shards.end() ; i != end ; i++ ){
+ for ( set<Shard>::const_iterator i=shards.begin(), end=shards.end() ; i != end ; i++ ) {
futures.push_back( Future::spawnCommand( i->getConnString() , dbName , cmdObj ) );
}
-
+
vector<BSONObj> results;
BSONObjBuilder subobj (output.subobjStart("raw"));
BSONObjBuilder errors;
- for ( list< shared_ptr<Future::CommandResult> >::iterator i=futures.begin(); i!=futures.end(); i++ ){
+ for ( list< shared_ptr<Future::CommandResult> >::iterator i=futures.begin(); i!=futures.end(); i++ ) {
shared_ptr<Future::CommandResult> res = *i;
- if ( ! res->join() ){
+ if ( ! res->join() ) {
errors.appendAs(res->result()["errmsg"], res->getServer());
}
results.push_back( res->result() );
@@ -111,11 +111,11 @@ namespace mongo {
subobj.done();
BSONObj errobj = errors.done();
- if (! errobj.isEmpty()){
+ if (! errobj.isEmpty()) {
errmsg = errobj.toString(false, true);
return false;
}
-
+
aggregateResults(results, output);
return true;
}
@@ -126,39 +126,40 @@ namespace mongo {
public:
AllShardsCollectionCommand(const char* n, const char* oldname=NULL) : RunOnAllShardsCommand(n, oldname) {}
- virtual void getShards(const string& dbName , BSONObj& cmdObj, set<Shard>& shards){
+ virtual void getShards(const string& dbName , BSONObj& cmdObj, set<Shard>& shards) {
string fullns = dbName + '.' + cmdObj.firstElement().valuestrsafe();
-
+
DBConfigPtr conf = grid.getDBConfig( dbName , false );
-
- if ( ! conf || ! conf->isShardingEnabled() || ! conf->isSharded( fullns ) ){
+
+ if ( ! conf || ! conf->isShardingEnabled() || ! conf->isSharded( fullns ) ) {
shards.insert(conf->getShard(fullns));
- } else {
+ }
+ else {
conf->getChunkManager(fullns)->getAllShards(shards);
}
}
};
-
+
class NotAllowedOnShardedCollectionCmd : public PublicGridCommand {
public:
- NotAllowedOnShardedCollectionCmd( const char * n ) : PublicGridCommand( n ){}
+ NotAllowedOnShardedCollectionCmd( const char * n ) : PublicGridCommand( n ) {}
virtual string getFullNS( const string& dbName , const BSONObj& cmdObj ) = 0;
-
- virtual bool run(const string& dbName , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){
+
+ virtual bool run(const string& dbName , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool) {
string fullns = getFullNS( dbName , cmdObj );
-
+
DBConfigPtr conf = grid.getDBConfig( dbName , false );
-
- if ( ! conf || ! conf->isShardingEnabled() || ! conf->isSharded( fullns ) ){
+
+ if ( ! conf || ! conf->isShardingEnabled() || ! conf->isSharded( fullns ) ) {
return passthrough( conf , cmdObj , result );
}
errmsg = "can't do command: " + name + " on sharded collection";
return false;
}
};
-
+
// ----
class DropIndexesCmd : public AllShardsCollectionCommand {
@@ -194,7 +195,7 @@ namespace mongo {
long long indexSize = 0;
long long fileSize = 0;
- for (vector<BSONObj>::const_iterator it(results.begin()), end(results.end()); it != end; ++it){
+ for (vector<BSONObj>::const_iterator it(results.begin()), end(results.end()); it != end; ++it) {
const BSONObj& b = *it;
objects += b["objects"].numberLong();
dataSize += b["dataSize"].numberLong();
@@ -219,23 +220,24 @@ namespace mongo {
class DropCmd : public PublicGridCommand {
public:
- DropCmd() : PublicGridCommand( "drop" ){}
- bool run(const string& dbName , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){
+ DropCmd() : PublicGridCommand( "drop" ) {}
+ bool run(const string& dbName , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool) {
string collection = cmdObj.firstElement().valuestrsafe();
string fullns = dbName + "." + collection;
-
+
DBConfigPtr conf = grid.getDBConfig( dbName , false );
-
+
log() << "DROP: " << fullns << endl;
-
- if ( ! conf || ! conf->isShardingEnabled() || ! conf->isSharded( fullns ) ){
+
+ if ( ! conf || ! conf->isShardingEnabled() || ! conf->isSharded( fullns ) ) {
return passthrough( conf , cmdObj , result );
}
-
+
ChunkManagerPtr cm = conf->getChunkManager( fullns );
massert( 10418 , "how could chunk manager be null!" , cm );
-
+
cm->drop( cm );
+ uassert( 13512 , "drop collection attempted on non-sharded collection" , conf->removeSharding( fullns ) );
return 1;
}
@@ -243,25 +245,25 @@ namespace mongo {
class DropDBCmd : public PublicGridCommand {
public:
- DropDBCmd() : PublicGridCommand( "dropDatabase" ){}
- bool run(const string& dbName , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){
-
+ DropDBCmd() : PublicGridCommand( "dropDatabase" ) {}
+ bool run(const string& dbName , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool) {
+
BSONElement e = cmdObj.firstElement();
-
- if ( ! e.isNumber() || e.number() != 1 ){
+
+ if ( ! e.isNumber() || e.number() != 1 ) {
errmsg = "invalid params";
return 0;
}
-
+
DBConfigPtr conf = grid.getDBConfig( dbName , false );
-
+
log() << "DROP DATABASE: " << dbName << endl;
- if ( ! conf ){
+ if ( ! conf ) {
result.append( "info" , "database didn't exist" );
return true;
}
-
+
if ( ! conf->dropDatabase( errmsg ) )
return false;
@@ -272,8 +274,8 @@ namespace mongo {
class RenameCollectionCmd : public PublicGridCommand {
public:
- RenameCollectionCmd() : PublicGridCommand( "renameCollection" ){}
- bool run(const string& dbName, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){
+ RenameCollectionCmd() : PublicGridCommand( "renameCollection" ) {}
+ bool run(const string& dbName, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool) {
string fullnsFrom = cmdObj.firstElement().valuestrsafe();
string dbNameFrom = nsToDatabase( fullnsFrom.c_str() );
DBConfigPtr confFrom = grid.getDBConfig( dbNameFrom , false );
@@ -297,18 +299,19 @@ namespace mongo {
class CopyDBCmd : public PublicGridCommand {
public:
- CopyDBCmd() : PublicGridCommand( "copydb" ){}
- bool run(const string& dbName, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){
+ CopyDBCmd() : PublicGridCommand( "copydb" ) {}
+ bool run(const string& dbName, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool) {
string todb = cmdObj.getStringField("todb");
uassert(13402, "need a todb argument", !todb.empty());
-
+
DBConfigPtr confTo = grid.getDBConfig( todb );
uassert(13398, "cant copy to sharded DB", !confTo->isShardingEnabled());
string fromhost = cmdObj.getStringField("fromhost");
- if (!fromhost.empty()){
+ if (!fromhost.empty()) {
return adminPassthrough( confTo , cmdObj , result );
- } else {
+ }
+ else {
string fromdb = cmdObj.getStringField("fromdb");
uassert(13399, "need a fromdb argument", !fromdb.empty());
@@ -317,7 +320,7 @@ namespace mongo {
uassert(13401, "cant copy from sharded DB", !confFrom->isShardingEnabled());
BSONObjBuilder b;
- BSONForEach(e, cmdObj){
+ BSONForEach(e, cmdObj) {
if (strcmp(e.fieldName(), "fromhost") != 0)
b.append(e);
}
@@ -328,67 +331,67 @@ namespace mongo {
}
}
- }copyDBCmd;
+ } copyDBCmd;
class CountCmd : public PublicGridCommand {
public:
CountCmd() : PublicGridCommand("count") { }
- bool run(const string& dbName, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool l){
+ bool run(const string& dbName, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool l) {
string collection = cmdObj.firstElement().valuestrsafe();
string fullns = dbName + "." + collection;
-
+
BSONObj filter;
if ( cmdObj["query"].isABSONObj() )
filter = cmdObj["query"].Obj();
-
+
DBConfigPtr conf = grid.getDBConfig( dbName , false );
-
- if ( ! conf || ! conf->isShardingEnabled() || ! conf->isSharded( fullns ) ){
+
+ if ( ! conf || ! conf->isShardingEnabled() || ! conf->isSharded( fullns ) ) {
ShardConnection conn( conf->getPrimary() , fullns );
BSONObj temp;
bool ok = conn->runCommand( dbName , cmdObj , temp );
conn.done();
-
- if ( ok ){
+
+ if ( ok ) {
result.append( temp["n"] );
return true;
}
-
- if ( temp["code"].numberInt() != StaleConfigInContextCode ){
+
+ if ( temp["code"].numberInt() != StaleConfigInContextCode ) {
errmsg = temp["errmsg"].String();
result.appendElements( temp );
return false;
}
-
+
// this collection got sharded
ChunkManagerPtr cm = conf->getChunkManager( fullns , true );
- if ( ! cm ){
+ if ( ! cm ) {
errmsg = "should be sharded now";
result.append( "root" , temp );
return false;
}
}
-
+
long long total = 0;
map<string,long long> shardCounts;
-
+
ChunkManagerPtr cm = conf->getChunkManager( fullns );
- while ( true ){
- if ( ! cm ){
+ while ( true ) {
+ if ( ! cm ) {
// probably unsharded now
return run( dbName , cmdObj , errmsg , result , l );
}
-
+
set<Shard> shards;
cm->getShardsForQuery( shards , filter );
assert( shards.size() );
-
+
bool hadToBreak = false;
- for (set<Shard>::iterator it=shards.begin(), end=shards.end(); it != end; ++it){
+ for (set<Shard>::iterator it=shards.begin(), end=shards.end(); it != end; ++it) {
ShardConnection conn(*it, fullns);
- if ( conn.setVersion() ){
+ if ( conn.setVersion() ) {
total = 0;
shardCounts.clear();
cm = conf->getChunkManager( fullns );
@@ -396,19 +399,19 @@ namespace mongo {
hadToBreak = true;
break;
}
-
+
BSONObj temp;
bool ok = conn->runCommand( dbName , BSON( "count" << collection << "query" << filter ) , temp );
conn.done();
-
- if ( ok ){
+
+ if ( ok ) {
long long mine = temp["n"].numberLong();
total += mine;
shardCounts[it->getName()] = mine;
continue;
}
-
- if ( StaleConfigInContextCode == temp["code"].numberInt() ){
+
+ if ( StaleConfigInContextCode == temp["code"].numberInt() ) {
// my version is old
total = 0;
shardCounts.clear();
@@ -425,7 +428,7 @@ namespace mongo {
if ( ! hadToBreak )
break;
}
-
+
total = applySkipLimit( total , cmdObj );
result.appendNumber( "n" , total );
BSONObjBuilder temp( result.subobjStart( "shards" ) );
@@ -439,13 +442,13 @@ namespace mongo {
class CollectionStats : public PublicGridCommand {
public:
CollectionStats() : PublicGridCommand("collStats", "collstats") { }
- bool run(const string& dbName , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){
+ bool run(const string& dbName , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool) {
string collection = cmdObj.firstElement().valuestrsafe();
string fullns = dbName + "." + collection;
-
+
DBConfigPtr conf = grid.getDBConfig( dbName , false );
-
- if ( ! conf || ! conf->isShardingEnabled() || ! conf->isSharded( fullns ) ){
+
+ if ( ! conf || ! conf->isShardingEnabled() || ! conf->isSharded( fullns ) ) {
result.append( "ns" , fullns );
result.appendBool("sharded", false);
result.append( "primary" , conf->getPrimary().getName() );
@@ -458,17 +461,17 @@ namespace mongo {
set<Shard> servers;
cm->getAllShards(servers);
-
+
BSONObjBuilder shardStats;
long long count=0;
long long size=0;
long long storageSize=0;
int nindexes=0;
bool warnedAboutIndexes = false;
- for ( set<Shard>::iterator i=servers.begin(); i!=servers.end(); i++ ){
+ for ( set<Shard>::iterator i=servers.begin(); i!=servers.end(); i++ ) {
ScopedDbConnection conn( *i );
BSONObj res;
- if ( ! conn->runCommand( dbName , cmdObj , res ) ){
+ if ( ! conn->runCommand( dbName , cmdObj , res ) ) {
errmsg = "failed on shard: " + res.toString();
return false;
}
@@ -480,19 +483,19 @@ namespace mongo {
int myIndexes = res["nindexes"].numberInt();
- if ( nindexes == 0 ){
+ if ( nindexes == 0 ) {
nindexes = myIndexes;
}
- else if ( nindexes == myIndexes ){
+ else if ( nindexes == myIndexes ) {
// no-op
}
else {
// hopefully this means we're building an index
-
+
if ( myIndexes > nindexes )
nindexes = myIndexes;
-
- if ( ! warnedAboutIndexes ){
+
+ if ( ! warnedAboutIndexes ) {
result.append( "warning" , "indexes don't all match - ok if ensureIndex is running" );
warnedAboutIndexes = true;
}
@@ -510,7 +513,7 @@ namespace mongo {
result.append("nchunks", cm->numChunks());
result.append("shards", shardStats.obj());
-
+
return true;
}
} collectionStatsCmd;
@@ -518,19 +521,19 @@ namespace mongo {
class FindAndModifyCmd : public PublicGridCommand {
public:
FindAndModifyCmd() : PublicGridCommand("findAndModify", "findandmodify") { }
- bool run(const string& dbName, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){
+ bool run(const string& dbName, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool) {
string collection = cmdObj.firstElement().valuestrsafe();
string fullns = dbName + "." + collection;
-
+
DBConfigPtr conf = grid.getDBConfig( dbName , false );
-
- if ( ! conf || ! conf->isShardingEnabled() || ! conf->isSharded( fullns ) ){
+
+ if ( ! conf || ! conf->isShardingEnabled() || ! conf->isSharded( fullns ) ) {
return passthrough( conf , cmdObj , result);
}
-
+
ChunkManagerPtr cm = conf->getChunkManager( fullns );
massert( 13002 , "how could chunk manager be null!" , cm );
-
+
BSONObj filter = cmdObj.getObjectField("query");
uassert(13343, "query for sharded findAndModify must have shardkey", cm->hasShardKey(filter));
@@ -542,11 +545,11 @@ namespace mongo {
bool ok = conn->runCommand( conf->getName() , cmdObj , res );
conn.done();
- if (ok || (strcmp(res["errmsg"].valuestrsafe(), "No matching object found") != 0)){
+ if (ok || (strcmp(res["errmsg"].valuestrsafe(), "No matching object found") != 0)) {
result.appendElements(res);
return ok;
}
-
+
return true;
}
@@ -555,18 +558,18 @@ namespace mongo {
class DataSizeCmd : public PublicGridCommand {
public:
DataSizeCmd() : PublicGridCommand("dataSize", "datasize") { }
- bool run(const string& dbName, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){
+ bool run(const string& dbName, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool) {
string fullns = cmdObj.firstElement().String();
-
+
DBConfigPtr conf = grid.getDBConfig( dbName , false );
-
- if ( ! conf || ! conf->isShardingEnabled() || ! conf->isSharded( fullns ) ){
+
+ if ( ! conf || ! conf->isShardingEnabled() || ! conf->isSharded( fullns ) ) {
return passthrough( conf , cmdObj , result);
}
-
+
ChunkManagerPtr cm = conf->getChunkManager( fullns );
massert( 13407 , "how could chunk manager be null!" , cm );
-
+
BSONObj min = cmdObj.getObjectField( "min" );
BSONObj max = cmdObj.getObjectField( "max" );
BSONObj keyPattern = cmdObj.getObjectField( "keyPattern" );
@@ -580,13 +583,13 @@ namespace mongo {
set<Shard> shards;
cm->getShardsForRange(shards, min, max);
- for ( set<Shard>::iterator i=shards.begin(), end=shards.end() ; i != end; ++i ){
+ for ( set<Shard>::iterator i=shards.begin(), end=shards.end() ; i != end; ++i ) {
ScopedDbConnection conn( *i );
BSONObj res;
bool ok = conn->runCommand( conf->getName() , cmdObj , res );
conn.done();
-
- if ( ! ok ){
+
+ if ( ! ok ) {
result.appendElements( res );
return false;
}
@@ -607,64 +610,64 @@ namespace mongo {
class ConvertToCappedCmd : public NotAllowedOnShardedCollectionCmd {
public:
- ConvertToCappedCmd() : NotAllowedOnShardedCollectionCmd("convertToCapped"){}
-
- virtual string getFullNS( const string& dbName , const BSONObj& cmdObj ){
+ ConvertToCappedCmd() : NotAllowedOnShardedCollectionCmd("convertToCapped") {}
+
+ virtual string getFullNS( const string& dbName , const BSONObj& cmdObj ) {
return dbName + "." + cmdObj.firstElement().valuestrsafe();
}
-
+
} convertToCappedCmd;
class GroupCmd : public NotAllowedOnShardedCollectionCmd {
public:
- GroupCmd() : NotAllowedOnShardedCollectionCmd("group"){}
-
- virtual string getFullNS( const string& dbName , const BSONObj& cmdObj ){
+ GroupCmd() : NotAllowedOnShardedCollectionCmd("group") {}
+
+ virtual string getFullNS( const string& dbName , const BSONObj& cmdObj ) {
return dbName + "." + cmdObj.firstElement().embeddedObjectUserCheck()["ns"].valuestrsafe();
}
-
+
} groupCmd;
class DistinctCmd : public PublicGridCommand {
public:
- DistinctCmd() : PublicGridCommand("distinct"){}
+ DistinctCmd() : PublicGridCommand("distinct") {}
virtual void help( stringstream &help ) const {
help << "{ distinct : 'collection name' , key : 'a.b' , query : {} }";
}
- bool run(const string& dbName , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){
+ bool run(const string& dbName , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool) {
string collection = cmdObj.firstElement().valuestrsafe();
string fullns = dbName + "." + collection;
DBConfigPtr conf = grid.getDBConfig( dbName , false );
-
- if ( ! conf || ! conf->isShardingEnabled() || ! conf->isSharded( fullns ) ){
+
+ if ( ! conf || ! conf->isShardingEnabled() || ! conf->isSharded( fullns ) ) {
return passthrough( conf , cmdObj , result );
}
-
+
ChunkManagerPtr cm = conf->getChunkManager( fullns );
massert( 10420 , "how could chunk manager be null!" , cm );
BSONObj query = getQuery(cmdObj);
set<Shard> shards;
cm->getShardsForQuery(shards, query);
-
+
set<BSONObj,BSONObjCmp> all;
int size = 32;
-
- for ( set<Shard>::iterator i=shards.begin(), end=shards.end() ; i != end; ++i ){
+
+ for ( set<Shard>::iterator i=shards.begin(), end=shards.end() ; i != end; ++i ) {
ShardConnection conn( *i , fullns );
BSONObj res;
bool ok = conn->runCommand( conf->getName() , cmdObj , res );
conn.done();
-
- if ( ! ok ){
+
+ if ( ! ok ) {
result.appendElements( res );
return false;
}
-
+
BSONObjIterator it( res["values"].embeddedObject() );
- while ( it.more() ){
+ while ( it.more() ) {
BSONElement nxt = it.next();
BSONObjBuilder temp(32);
temp.appendAs( nxt , "" );
@@ -672,13 +675,13 @@ namespace mongo {
}
}
-
+
BSONObjBuilder b( size );
int n=0;
- for ( set<BSONObj,BSONObjCmp>::iterator i = all.begin() ; i != all.end(); i++ ){
- b.appendAs( i->firstElement() , b.numStr( n++ ).c_str() );
+ for ( set<BSONObj,BSONObjCmp>::iterator i = all.begin() ; i != all.end(); i++ ) {
+ b.appendAs( i->firstElement() , b.numStr( n++ ) );
}
-
+
result.appendArray( "values" , b.obj() );
return true;
}
@@ -686,11 +689,11 @@ namespace mongo {
class FileMD5Cmd : public PublicGridCommand {
public:
- FileMD5Cmd() : PublicGridCommand("filemd5"){}
+ FileMD5Cmd() : PublicGridCommand("filemd5") {}
virtual void help( stringstream &help ) const {
help << " example: { filemd5 : ObjectId(aaaaaaa) , root : \"fs\" }";
}
- bool run(const string& dbName , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){
+ bool run(const string& dbName , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool) {
string fullns = dbName;
fullns += ".";
{
@@ -702,17 +705,17 @@ namespace mongo {
fullns += ".chunks";
DBConfigPtr conf = grid.getDBConfig( dbName , false );
-
- if ( ! conf || ! conf->isShardingEnabled() || ! conf->isSharded( fullns ) ){
+
+ if ( ! conf || ! conf->isShardingEnabled() || ! conf->isSharded( fullns ) ) {
return passthrough( conf , cmdObj , result );
}
-
+
ChunkManagerPtr cm = conf->getChunkManager( fullns );
massert( 13091 , "how could chunk manager be null!" , cm );
uassert( 13092 , "GridFS chunks collection can only be sharded on files_id", cm->getShardKey().key() == BSON("files_id" << 1));
ChunkPtr chunk = cm->findChunk( BSON("files_id" << cmdObj.firstElement()) );
-
+
ShardConnection conn( chunk->getShard() , fullns );
BSONObj res;
bool ok = conn->runCommand( conf->getName() , cmdObj , res );
@@ -723,104 +726,254 @@ namespace mongo {
}
} fileMD5Cmd;
+ class Geo2dFindNearCmd : public PublicGridCommand {
+ public:
+ Geo2dFindNearCmd() : PublicGridCommand( "geoNear" ) {}
+ void help(stringstream& h) const { h << "http://www.mongodb.org/display/DOCS/Geospatial+Indexing#GeospatialIndexing-geoNearCommand"; }
+
+ bool run(const string& dbName , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool) {
+ string collection = cmdObj.firstElement().valuestrsafe();
+ string fullns = dbName + "." + collection;
+
+ DBConfigPtr conf = grid.getDBConfig( dbName , false );
+
+ if ( ! conf || ! conf->isShardingEnabled() || ! conf->isSharded( fullns ) ) {
+ return passthrough( conf , cmdObj , result );
+ }
+
+ ChunkManagerPtr cm = conf->getChunkManager( fullns );
+ massert( 13500 , "how could chunk manager be null!" , cm );
+
+ BSONObj query = getQuery(cmdObj);
+ set<Shard> shards;
+ cm->getShardsForQuery(shards, query);
+
+ int limit = 100;
+ if (cmdObj["num"].isNumber())
+ limit = cmdObj["num"].numberInt();
+
+ list< shared_ptr<Future::CommandResult> > futures;
+ BSONArrayBuilder shardArray;
+ for ( set<Shard>::const_iterator i=shards.begin(), end=shards.end() ; i != end ; i++ ) {
+ futures.push_back( Future::spawnCommand( i->getConnString() , dbName , cmdObj ) );
+ shardArray.append(i->getName());
+ }
+
+ multimap<double, BSONObj> results; // TODO: maybe use merge-sort instead
+ string nearStr;
+ double time = 0;
+ double btreelocs = 0;
+ double nscanned = 0;
+ double objectsLoaded = 0;
+ for ( list< shared_ptr<Future::CommandResult> >::iterator i=futures.begin(); i!=futures.end(); i++ ) {
+ shared_ptr<Future::CommandResult> res = *i;
+ if ( ! res->join() ) {
+ errmsg = res->result()["errmsg"].String();
+ return false;
+ }
+
+ nearStr = res->result()["near"].String();
+ time += res->result()["stats"]["time"].Number();
+ btreelocs += res->result()["stats"]["btreelocs"].Number();
+ nscanned += res->result()["stats"]["nscanned"].Number();
+ objectsLoaded += res->result()["stats"]["objectsLoaded"].Number();
+
+ BSONForEach(obj, res->result()["results"].embeddedObject()) {
+ results.insert(make_pair(obj["dis"].Number(), obj.embeddedObject().getOwned()));
+ }
+
+ // TODO: maybe shrink results if size() > limit
+ }
+
+ result.append("ns" , fullns);
+ result.append("near", nearStr);
+
+ int outCount = 0;
+ double totalDistance = 0;
+ double maxDistance = 0;
+ {
+ BSONArrayBuilder sub (result.subarrayStart("results"));
+ for (multimap<double, BSONObj>::const_iterator it(results.begin()), end(results.end()); it!= end && outCount < limit; ++it, ++outCount) {
+ totalDistance += it->first;
+ maxDistance = it->first; // guaranteed to be highest so far
+
+ sub.append(it->second);
+ }
+ sub.done();
+ }
+
+ {
+ BSONObjBuilder sub (result.subobjStart("stats"));
+ sub.append("time", time);
+ sub.append("btreelocs", btreelocs);
+ sub.append("nscanned", nscanned);
+ sub.append("objectsLoaded", objectsLoaded);
+ sub.append("avgDistance", totalDistance / outCount);
+ sub.append("maxDistance", maxDistance);
+ sub.append("shards", shardArray.arr());
+ sub.done();
+ }
+
+ return true;
+ }
+ } geo2dFindNearCmd;
+
class MRCmd : public PublicGridCommand {
public:
- MRCmd() : PublicGridCommand( "mapreduce" ){}
-
- string getTmpName( const string& coll ){
+ MRCmd() : PublicGridCommand( "mapreduce" ) {}
+
+ string getTmpName( const string& coll ) {
static int inc = 1;
stringstream ss;
ss << "tmp.mrs." << coll << "_" << time(0) << "_" << inc++;
return ss.str();
}
- BSONObj fixForShards( const BSONObj& orig , const string& output ){
+ BSONObj fixForShards( const BSONObj& orig , const string& output, BSONObj& customOut , string& badShardedField ) {
BSONObjBuilder b;
BSONObjIterator i( orig );
- while ( i.more() ){
+ while ( i.more() ) {
BSONElement e = i.next();
string fn = e.fieldName();
- if ( fn == "map" ||
- fn == "mapreduce" ||
- fn == "reduce" ||
- fn == "query" ||
- fn == "sort" ||
- fn == "scope" ||
- fn == "verbose" ){
+ if ( fn == "map" ||
+ fn == "mapreduce" ||
+ fn == "mapparams" ||
+ fn == "reduce" ||
+ fn == "query" ||
+ fn == "sort" ||
+ fn == "scope" ||
+ fn == "verbose" ) {
b.append( e );
}
- else if ( fn == "keeptemp" ||
- fn == "out" ||
- fn == "finalize" ){
+ else if ( fn == "out" ||
+ fn == "finalize" ) {
// we don't want to copy these
+ if (fn == "out" && e.type() == Object) {
+ // check if there is a custom output
+ BSONObj out = e.embeddedObject();
+ if (out.hasField("db"))
+ customOut = out;
+ }
}
else {
- uassert( 10177 , (string)"don't know mr field: " + fn , 0 );
+ badShardedField = fn;
+ return BSONObj();
}
}
b.append( "out" , output );
return b.obj();
}
-
- bool run(const string& dbName , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){
+
+ bool run(const string& dbName , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool) {
Timer t;
string collection = cmdObj.firstElement().valuestrsafe();
string fullns = dbName + "." + collection;
+ const string shardedOutputCollection = getTmpName( collection );
+
+ string badShardedField;
+ BSONObj customOut;
+ BSONObj shardedCommand = fixForShards( cmdObj , shardedOutputCollection, customOut , badShardedField );
+
+ bool customOutDB = ! customOut.isEmpty() && customOut.hasField( "db" );
+
DBConfigPtr conf = grid.getDBConfig( dbName , false );
- if ( ! conf || ! conf->isShardingEnabled() || ! conf->isSharded( fullns ) ){
+ if ( ! conf || ! conf->isShardingEnabled() || ! conf->isSharded( fullns ) ) {
+ if ( customOutDB ) {
+ errmsg = "can't use out 'db' with non-sharded db";
+ return false;
+ }
return passthrough( conf , cmdObj , result );
}
-
+
+ if ( badShardedField.size() ) {
+ errmsg = str::stream() << "unknown m/r field for sharding: " << badShardedField;
+ return false;
+ }
+
BSONObjBuilder timingBuilder;
ChunkManagerPtr cm = conf->getChunkManager( fullns );
BSONObj q;
- if ( cmdObj["query"].type() == Object ){
+ if ( cmdObj["query"].type() == Object ) {
q = cmdObj["query"].embeddedObjectUserCheck();
}
-
+
set<Shard> shards;
cm->getShardsForQuery( shards , q );
-
- const string shardedOutputCollection = getTmpName( collection );
-
- BSONObj shardedCommand = fixForShards( cmdObj , shardedOutputCollection );
-
+
+
BSONObjBuilder finalCmd;
finalCmd.append( "mapreduce.shardedfinish" , cmdObj );
finalCmd.append( "shardedOutputCollection" , shardedOutputCollection );
- list< shared_ptr<Future::CommandResult> > futures;
-
- for ( set<Shard>::iterator i=shards.begin(), end=shards.end() ; i != end ; i++ ){
- futures.push_back( Future::spawnCommand( i->getConnString() , dbName , shardedCommand ) );
- }
- BSONObjBuilder shardresults;
- for ( list< shared_ptr<Future::CommandResult> >::iterator i=futures.begin(); i!=futures.end(); i++ ){
- shared_ptr<Future::CommandResult> res = *i;
- if ( ! res->join() ){
- errmsg = "mongod mr failed: ";
- errmsg += res->result().toString();
- return 0;
+ {
+ // we need to use our connections to the shard
+ // so filtering is done correctly for un-owned docs
+ // so we allocate them in our thread
+ // and hand off
+
+ vector< shared_ptr<ShardConnection> > shardConns;
+
+ list< shared_ptr<Future::CommandResult> > futures;
+
+ for ( set<Shard>::iterator i=shards.begin(), end=shards.end() ; i != end ; i++ ) {
+ shared_ptr<ShardConnection> temp( new ShardConnection( i->getConnString() , fullns ) );
+ assert( temp->get() );
+ futures.push_back( Future::spawnCommand( i->getConnString() , dbName , shardedCommand , temp->get() ) );
+ shardConns.push_back( temp );
+ }
+
+ bool failed = false;
+
+ BSONObjBuilder shardresults;
+ for ( list< shared_ptr<Future::CommandResult> >::iterator i=futures.begin(); i!=futures.end(); i++ ) {
+ shared_ptr<Future::CommandResult> res = *i;
+ if ( ! res->join() ) {
+ error() << "sharded m/r failed on shard: " << res->getServer() << " error: " << res->result() << endl;
+ result.append( "cause" , res->result() );
+ errmsg = "mongod mr failed: ";
+ errmsg += res->result().toString();
+ failed = true;
+ continue;
+ }
+ shardresults.append( res->getServer() , res->result() );
}
- shardresults.append( res->getServer() , res->result() );
+
+ for ( unsigned i=0; i<shardConns.size(); i++ )
+ shardConns[i]->done();
+
+ if ( failed )
+ return 0;
+
+ finalCmd.append( "shards" , shardresults.obj() );
+ timingBuilder.append( "shards" , t.millis() );
}
-
- finalCmd.append( "shards" , shardresults.obj() );
- timingBuilder.append( "shards" , t.millis() );
Timer t2;
- ShardConnection conn( conf->getPrimary() , fullns );
+ // by default the target database is same as input
+ Shard outServer = conf->getPrimary();
+ string outns = fullns;
+ if ( customOutDB ) {
+ // have to figure out shard for the output DB
+ BSONElement elmt = customOut.getField("db");
+ string outdb = elmt.valuestrsafe();
+ outns = outdb + "." + collection;
+ DBConfigPtr conf2 = grid.getDBConfig( outdb , true );
+ outServer = conf2->getPrimary();
+ }
+ log() << "customOut: " << customOut << " outServer: " << outServer << endl;
+
+ ShardConnection conn( outServer , outns );
BSONObj finalResult;
bool ok = conn->runCommand( dbName , finalCmd.obj() , finalResult );
conn.done();
- if ( ! ok ){
+ if ( ! ok ) {
errmsg = "final reduce failed: ";
errmsg += finalResult.toString();
return 0;
@@ -830,22 +983,22 @@ namespace mongo {
result.appendElements( finalResult );
result.append( "timeMillis" , t.millis() );
result.append( "timing" , timingBuilder.obj() );
-
+
return 1;
}
} mrCmd;
-
+
class ApplyOpsCmd : public PublicGridCommand {
public:
- ApplyOpsCmd() : PublicGridCommand( "applyOps" ){}
-
- virtual bool run(const string& dbName , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){
+ ApplyOpsCmd() : PublicGridCommand( "applyOps" ) {}
+
+ virtual bool run(const string& dbName , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool) {
errmsg = "applyOps not allowed through mongos";
return false;
}
-
+
} applyOpsCmd;
-
+
}
}