summaryrefslogtreecommitdiff
path: root/s/strategy_single.cpp
diff options
context:
space:
mode:
authorAntonin Kral <a.kral@bobek.cz>2010-08-11 12:38:57 +0200
committerAntonin Kral <a.kral@bobek.cz>2010-08-11 12:38:57 +0200
commit7645618fd3914cb8a20561625913c20d49504a49 (patch)
tree8370f846f58f6d71165b7a0e2eda04648584ec76 /s/strategy_single.cpp
parent68c73c3c7608b4c87f07440dc3232801720b1168 (diff)
downloadmongodb-7645618fd3914cb8a20561625913c20d49504a49.tar.gz
Imported Upstream version 1.6.0
Diffstat (limited to 's/strategy_single.cpp')
-rw-r--r--s/strategy_single.cpp185
1 files changed, 154 insertions, 31 deletions
diff --git a/s/strategy_single.cpp b/s/strategy_single.cpp
index 8f157d5..b840c9b 100644
--- a/s/strategy_single.cpp
+++ b/s/strategy_single.cpp
@@ -16,7 +16,7 @@
// strategy_simple.cpp
-#include "stdafx.h"
+#include "pch.h"
#include "request.h"
#include "../client/connpool.h"
#include "../db/commands.h"
@@ -40,29 +40,59 @@ namespace mongo {
log(3) << "single query: " << q.ns << " " << q.query << " ntoreturn: " << q.ntoreturn << endl;
try {
- if ( ( q.ntoreturn == -1 || q.ntoreturn == 1 ) && strstr(q.ns, ".$cmd") ) {
- BSONObjBuilder builder;
- bool ok = Command::runAgainstRegistered(q.ns, q.query, builder);
- if ( ok ) {
- BSONObj x = builder.done();
- replyToQuery(0, r.p(), r.m(), x);
+ if ( r.isCommand() ){
+
+ if ( handleSpecialNamespaces( r , q ) )
return;
+
+ int loops = 5;
+ while ( true ){
+ BSONObjBuilder builder;
+ try {
+ bool ok = Command::runAgainstRegistered(q.ns, q.query, builder);
+ if ( ok ) {
+ BSONObj x = builder.done();
+ replyToQuery(0, r.p(), r.m(), x);
+ return;
+ }
+ break;
+ }
+ catch ( StaleConfigException& e ){
+ if ( loops <= 0 )
+ throw e;
+
+ loops--;
+ log() << "retrying command: " << q.query << endl;
+ ShardConnection::checkMyConnectionVersions( e.getns() );
+ }
+ catch ( AssertionException& e ){
+ e.getInfo().append( builder , "assertion" , "assertionCode" );
+ builder.append( "errmsg" , "db assertion failure" );
+ builder.append( "ok" , 0 );
+ BSONObj x = builder.done();
+ replyToQuery(0, r.p(), r.m(), x);
+ return;
+ }
}
string commandName = q.query.firstElement().fieldName();
- if ( ! _commandsSafeToPass.count( commandName ) )
- log() << "passing through unknown command: " << commandName << " " << q.query << endl;
- }
+ uassert(13390, "unrecognized command: " + commandName, _commandsSafeToPass.count(commandName) != 0);
+ }
+
lateAssert = true;
- doQuery( r , r.singleServerName() );
+ doQuery( r , r.primaryShard() );
}
catch ( AssertionException& e ) {
- assert( !lateAssert );
+ if ( lateAssert ){
+ log() << "lateAssert: " << e.getInfo() << endl;
+ assert( !lateAssert );
+ }
+
BSONObjBuilder err;
- err.append("$err", string("mongos: ") + (e.msg.empty() ? "assertion during query" : e.msg));
+ e.getInfo().append( err );
BSONObj errObj = err.done();
- replyToQuery(QueryResult::ResultFlag_ErrSet, r.p() , r.m() , errObj);
+ replyToQuery(ResultFlag_ErrSet, r.p() , r.m() , errObj);
return;
}
@@ -73,18 +103,14 @@ namespace mongo {
log(3) << "single getmore: " << ns << endl;
- ScopedDbConnection dbcon( r.singleServerName() );
- DBClientBase& _c = dbcon.conn();
-
- // TODO
- DBClientConnection &c = dynamic_cast<DBClientConnection&>(_c);
+ ShardConnection conn( r.primaryShard() , ns );
Message response;
- bool ok = c.port().call( r.m() , response);
+ bool ok = conn->callRead( r.m() , response);
uassert( 10204 , "dbgrid: getmore: error calling db", ok);
- r.reply( response );
+ r.reply( response , conn->getServerAddress() );
- dbcon.done();
+ conn.done();
}
@@ -97,18 +123,26 @@ namespace mongo {
BSONObj o = d.nextJsObj();
const char * ns = o["ns"].valuestr();
if ( r.getConfig()->isSharded( ns ) ){
+ BSONObj newIndexKey = o["key"].embeddedObjectUserCheck();
+
uassert( 10205 , (string)"can't use unique indexes with sharding ns:" + ns +
" key: " + o["key"].embeddedObjectUserCheck().toString() ,
- IndexDetails::isIdIndexPattern( o["key"].embeddedObjectUserCheck() ) ||
- ! o["unique"].trueValue() );
- ChunkManager * cm = r.getConfig()->getChunkManager( ns );
+ IndexDetails::isIdIndexPattern( newIndexKey ) ||
+ ! o["unique"].trueValue() ||
+ r.getConfig()->getChunkManager( ns )->getShardKey().isPrefixOf( newIndexKey ) );
+
+ ChunkManagerPtr cm = r.getConfig()->getChunkManager( ns );
assert( cm );
- for ( int i=0; i<cm->numChunks();i++)
- doWrite( op , r , cm->getChunk(i)->getShard() );
+
+ set<Shard> shards;
+ cm->getAllShards(shards);
+ for (set<Shard>::const_iterator it=shards.begin(), end=shards.end(); it != end; ++it)
+ doWrite( op , r , *it );
}
else {
- doWrite( op , r , r.singleServerName() );
+ doWrite( op , r , r.primaryShard() );
}
+ r.gotInsert();
}
}
else if ( op == dbUpdate ){
@@ -129,15 +163,104 @@ namespace mongo {
const char *ns = r.getns();
if ( r.isShardingEnabled() &&
- strstr( ns , ".system.indexes" ) == strstr( ns , "." ) &&
- strstr( ns , "." ) ){
+ strstr( ns , ".system.indexes" ) == strchr( ns , '.' ) &&
+ strchr( ns , '.' ) ) {
log(1) << " .system.indexes write for: " << ns << endl;
handleIndexWrite( op , r );
return;
}
log(3) << "single write: " << ns << endl;
- doWrite( op , r , r.singleServerName() );
+ doWrite( op , r , r.primaryShard() );
+ r.gotInsert(); // Won't handle mulit-insert correctly. Not worth parsing the request.
+ }
+
+ bool handleSpecialNamespaces( Request& r , QueryMessage& q ){
+ const char * ns = r.getns();
+ ns = strstr( r.getns() , ".$cmd.sys." );
+ if ( ! ns )
+ return false;
+ ns += 10;
+
+ BSONObjBuilder b;
+ vector<Shard> shards;
+
+ if ( strcmp( ns , "inprog" ) == 0 ){
+ Shard::getAllShards( shards );
+
+ BSONArrayBuilder arr( b.subarrayStart( "inprog" ) );
+
+ for ( unsigned i=0; i<shards.size(); i++ ){
+ Shard shard = shards[i];
+ ScopedDbConnection conn( shard );
+ BSONObj temp = conn->findOne( r.getns() , BSONObj() );
+ if ( temp["inprog"].isABSONObj() ){
+ BSONObjIterator i( temp["inprog"].Obj() );
+ while ( i.more() ){
+ BSONObjBuilder x;
+
+ BSONObjIterator j( i.next().Obj() );
+ while( j.more() ){
+ BSONElement e = j.next();
+ if ( strcmp( e.fieldName() , "opid" ) == 0 ){
+ stringstream ss;
+ ss << shard.getName() << ':' << e.numberInt();
+ x.append( "opid" , ss.str() );
+ }
+ else {
+ x.append( e );
+ }
+ }
+ arr.append( x.obj() );
+ }
+ }
+ conn.done();
+ }
+
+ arr.done();
+ }
+ else if ( strcmp( ns , "killop" ) == 0 ){
+ BSONElement e = q.query["op"];
+ if ( strstr( r.getns() , "admin." ) != 0 ){
+ b.append( "err" , "unauthorized" );
+ }
+ else if ( e.type() != String ){
+ b.append( "err" , "bad op" );
+ b.append( e );
+ }
+ else {
+ b.append( e );
+ string s = e.String();
+ string::size_type i = s.find( ':' );
+ if ( i == string::npos ){
+ b.append( "err" , "bad opid" );
+ }
+ else {
+ string shard = s.substr( 0 , i );
+ int opid = atoi( s.substr( i + 1 ).c_str() );
+ b.append( "shard" , shard );
+ b.append( "shardid" , opid );
+
+ log() << "want to kill op: " << e << endl;
+ Shard s(shard);
+
+ ScopedDbConnection conn( s );
+ conn->findOne( r.getns() , BSON( "op" << opid ) );
+ conn.done();
+ }
+ }
+ }
+ else if ( strcmp( ns , "unlock" ) == 0 ){
+ b.append( "err" , "can't do unlock through mongos" );
+ }
+ else {
+ log( LL_WARNING ) << "unknown sys command [" << ns << "]" << endl;
+ return false;
+ }
+
+ BSONObj x = b.done();
+ replyToQuery(0, r.p(), r.m(), x);
+ return true;
}
set<string> _commandsSafeToPass;