author     Antonin Kral <a.kral@bobek.cz>    2011-06-18 21:24:41 +0200
committer  Antonin Kral <a.kral@bobek.cz>    2011-06-18 21:24:41 +0200
commit     64b33ee522375a8dc15be2875dfb7db4502259b0 (patch)
tree       44979e0aaf6bb576f4a737a93e071e28809b6779 /s
parent     4d87ff4aa74d7ae975268ac43eee152dc3f5b7e9 (diff)
download   mongodb-64b33ee522375a8dc15be2875dfb7db4502259b0.tar.gz
Imported Upstream version 1.8.2
Diffstat (limited to 's')
-rw-r--r--  s/balance.cpp              17
-rw-r--r--  s/balancer_policy.cpp       8
-rw-r--r--  s/chunk.cpp                88
-rw-r--r--  s/chunk.h                  12
-rw-r--r--  s/client.cpp                8
-rw-r--r--  s/client.h                  5
-rw-r--r--  s/commands_admin.cpp       22
-rw-r--r--  s/commands_public.cpp       4
-rw-r--r--  s/config.cpp                4
-rw-r--r--  s/d_logic.cpp               1
-rw-r--r--  s/d_migrate.cpp           123
-rw-r--r--  s/d_split.cpp               8
-rw-r--r--  s/d_state.cpp               4
-rw-r--r--  s/grid.cpp                  6
-rw-r--r--  s/grid.h                    2
-rw-r--r--  s/shard.cpp                96
-rw-r--r--  s/shard_version.cpp         6
-rw-r--r--  s/strategy_shard.cpp        6
-rw-r--r--  s/strategy_single.cpp       2
-rw-r--r--  s/writeback_listener.cpp   17
-rw-r--r--  s/writeback_listener.h     24
21 files changed, 312 insertions, 151 deletions
diff --git a/s/balance.cpp b/s/balance.cpp
index ee0c992..8b01ea7 100644
--- a/s/balance.cpp
+++ b/s/balance.cpp
@@ -276,20 +276,21 @@ namespace mongo {
try {
- // first make sure we should even be running
+ ScopedDbConnection conn( config );
+
+ // ping has to be first so we keep things in the config server in sync
+ _ping( conn.conn() );
+
+ // now make sure we should even be running
if ( ! grid.shouldBalance() ) {
log(1) << "skipping balancing round because balancing is disabled" << endl;
+ conn.done();
+
sleepsecs( 30 );
continue;
}
-
- ScopedDbConnection conn( config );
-
- _ping( conn.conn() );
- if ( ! _checkOIDs() ) {
- uassert( 13258 , "oids broken after resetting!" , _checkOIDs() );
- }
+ uassert( 13258 , "oids broken after resetting!" , _checkOIDs() );
// use fresh shard state
Shard::reloadShardInfo();
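
Editor's note: the balance.cpp hunk reorders the balancer round so the config-server ping happens on every pass, before the shouldBalance() check, and so the scoped connection is handed back before the 30-second sleep. A minimal standalone sketch of that ordering follows; ScopedConn, ping() and shouldBalance() are illustrative stand-ins, not the mongos classes.

    // Sketch: ping first, and release the pooled connection before a long sleep.
    #include <chrono>
    #include <iostream>
    #include <thread>

    struct ScopedConn {                  // stand-in for a pooled, scoped connection
        bool open = true;
        void ping() { std::cout << "ping config server\n"; }
        void done() { open = false; }    // return the connection to the pool
        ~ScopedConn() { if (open) std::cout << "warning: connection still checked out\n"; }
    };

    bool shouldBalance() { return false; }   // pretend balancing is disabled

    int main() {
        for (int round = 0; round < 2; ++round) {
            ScopedConn conn;
            conn.ping();                 // keep config-server state fresh every round
            if (!shouldBalance()) {
                conn.done();             // release before sleeping, not after
                std::this_thread::sleep_for(std::chrono::milliseconds(10));
                continue;
            }
            // ... balancing work would go here ...
            conn.done();
        }
    }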
diff --git a/s/balancer_policy.cpp b/s/balancer_policy.cpp
index 2098a1f..482fab0 100644
--- a/s/balancer_policy.cpp
+++ b/s/balancer_policy.cpp
@@ -40,6 +40,8 @@ namespace mongo {
pair<string,unsigned> max("",0);
vector<string> drainingShards;
+ bool maxOpsQueued = false;
+
for (ShardToChunksIter i = shardToChunksMap.begin(); i!=shardToChunksMap.end(); ++i ) {
// Find whether this shard's capacity or availability are exhausted
@@ -67,6 +69,7 @@ namespace mongo {
// Draining shards take a lower priority than overloaded shards.
if ( size > max.second ) {
max = make_pair( shard , size );
+ maxOpsQueued = opsQueued;
}
if ( draining && (size > 0)) {
drainingShards.push_back( shard );
@@ -80,6 +83,11 @@ namespace mongo {
return NULL;
}
+ if ( maxOpsQueued ) {
+ log() << "biggest shard has unprocessed writebacks, waiting for completion of migrate" << endl;
+ return NULL;
+ }
+
log(1) << "collection : " << ns << endl;
log(1) << "donor : " << max.second << " chunks on " << max.first << endl;
log(1) << "receiver : " << min.second << " chunks on " << min.first << endl;
diff --git a/s/chunk.cpp b/s/chunk.cpp
index b2ad03d..1e473e2 100644
--- a/s/chunk.cpp
+++ b/s/chunk.cpp
@@ -175,7 +175,7 @@ namespace mongo {
conn.done();
}
- ChunkPtr Chunk::singleSplit( bool force , BSONObj& res ) {
+ bool Chunk::singleSplit( bool force , BSONObj& res , ChunkPtr* low, ChunkPtr* high) {
vector<BSONObj> splitPoint;
// if splitting is not obligatory we may return early if there are not enough data
@@ -190,7 +190,7 @@ namespace mongo {
// 1 split point means we have between half the chunk size to full chunk size
// so we shouldn't split
log(1) << "chunk not full enough to trigger auto-split" << endl;
- return ChunkPtr();
+ return false;
}
splitPoint.push_back( candidates.front() );
@@ -228,13 +228,24 @@ namespace mongo {
if ( splitPoint.empty() || _min == splitPoint.front() || _max == splitPoint.front() ) {
log() << "want to split chunk, but can't find split point chunk " << toString()
<< " got: " << ( splitPoint.empty() ? "<empty>" : splitPoint.front().toString() ) << endl;
- return ChunkPtr();
+ return false;
}
- return multiSplit( splitPoint , res );
+ if (!multiSplit( splitPoint , res , true ))
+ return false;
+
+ if (low && high) {
+ low->reset( new Chunk(_manager, _min, splitPoint[0], _shard));
+ high->reset(new Chunk(_manager, splitPoint[0], _max, _shard));
+ }
+ else {
+ assert(!low && !high); // can't have one without the other
+ }
+
+ return true;
}
- ChunkPtr Chunk::multiSplit( const vector<BSONObj>& m , BSONObj& res ) {
+ bool Chunk::multiSplit( const vector<BSONObj>& m , BSONObj& res , bool resetIfSplit) {
const size_t maxSplitPoints = 8192;
uassert( 10165 , "can't split as shard doesn't have a manager" , _manager );
@@ -261,27 +272,19 @@ namespace mongo {
// reloading won't stricly solve all problems, e.g. the collection's metdata lock can be taken
// but we issue here so that mongos may refresh wihtout needing to be written/read against
- _manager->_reload();
+ grid.getDBConfig(_manager->getns())->getChunkManager(_manager->getns(), true);
- return ChunkPtr();
+ return false;
}
conn.done();
- _manager->_reload();
- // The previous multisplit logic adjusted the boundaries of 'this' chunk. Any call to 'this' object hereafter
- // will see a different _max for the chunk.
- // TODO Untie this dependency since, for metadata purposes, the reload() above already fixed boundaries
- {
- rwlock lk( _manager->_lock , true );
-
- setMax(m[0].getOwned());
- DEV assert( shared_from_this() );
- _manager->_chunkMap[_max] = shared_from_this();
- }
+ if ( resetIfSplit ) {
+ // force reload of chunks
+ grid.getDBConfig(_manager->getns())->getChunkManager(_manager->getns(), true);
+ }
- // return the second half, if a single split, or the first new chunk, if a multisplit.
- return _manager->findChunk( m[0] );
+ return true;
}
bool Chunk::moveAndCommit( const Shard& to , long long chunkSize /* bytes */, BSONObj& res ) {
@@ -311,7 +314,7 @@ namespace mongo {
// if succeeded, needs to reload to pick up the new location
// if failed, mongos may be stale
// reload is excessive here as the failure could be simply because collection metadata is taken
- _manager->_reload();
+ grid.getDBConfig(_manager->getns())->getChunkManager(_manager->getns(), true);
return worked;
}
@@ -334,21 +337,23 @@ namespace mongo {
_dataWritten = 0; // reset so we check often enough
BSONObj res;
- ChunkPtr newShard = singleSplit( false /* does not force a split if not enough data */ , res );
- if ( newShard.get() == NULL ) {
+ ChunkPtr low;
+ ChunkPtr high;
+ bool worked = singleSplit( false /* does not force a split if not enough data */ , res , &low, &high);
+ if ( !worked ) {
// singleSplit would have issued a message if we got here
_dataWritten = 0; // this means there wasn't enough data to split, so don't want to try again until considerable more data
return false;
}
log() << "autosplitted " << _manager->getns() << " shard: " << toString()
- << " on: " << newShard->getMax() << "(splitThreshold " << splitThreshold << ")"
+ << " on: " << low->getMax() << "(splitThreshold " << splitThreshold << ")"
#ifdef _DEBUG
<< " size: " << getPhysicalSize() // slow - but can be usefule when debugging
#endif
<< endl;
- moveIfShould( newShard );
+ low->moveIfShould( high );
return true;
@@ -671,7 +676,7 @@ namespace mongo {
log() << "successfully created first chunk for " << c->toString() << endl;
}
- ChunkPtr ChunkManager::findChunk( const BSONObj & obj , bool retry ) {
+ ChunkPtr ChunkManager::findChunk( const BSONObj & obj) {
BSONObj key = _key.extractKey(obj);
{
@@ -695,20 +700,13 @@ namespace mongo {
PRINT(*c);
PRINT(key);
- _reload_inlock();
+ grid.getDBConfig(getns())->getChunkManager(getns(), true);
massert(13141, "Chunk map pointed to incorrect chunk", false);
}
}
- if ( retry ) {
- stringstream ss;
- ss << "couldn't find a chunk aftry retry which should be impossible extracted: " << key;
- throw UserException( 8070 , ss.str() );
- }
-
- log() << "ChunkManager: couldn't find chunk for: " << key << " going to retry" << endl;
- _reload_inlock();
- return findChunk( obj , true );
+ massert(8070, str::stream() << "couldn't find a chunk aftry retry which should be impossible extracted: " << key, false);
+ return ChunkPtr(); // unreachable
}
ChunkPtr ChunkManager::findChunkOnServer( const Shard& shard ) const {
@@ -874,24 +872,26 @@ namespace mongo {
configServer.logChange( "dropCollection" , _ns , BSONObj() );
}
- void ChunkManager::maybeChunkCollection() {
+ bool ChunkManager::maybeChunkCollection() {
+ ensureIndex_inlock();
+
uassert( 13346 , "can't pre-split already splitted collection" , (_chunkMap.size() == 1) );
-
+
ChunkPtr soleChunk = _chunkMap.begin()->second;
vector<BSONObj> splitPoints;
soleChunk->pickSplitVector( splitPoints , Chunk::MaxChunkSize );
if ( splitPoints.empty() ) {
log(1) << "not enough data to warrant chunking " << getns() << endl;
- return;
+ return false;
}
-
+
BSONObj res;
- ChunkPtr p;
- p = soleChunk->multiSplit( splitPoints , res );
- if ( p.get() == NULL ) {
+ bool worked = soleChunk->multiSplit( splitPoints , res , false );
+ if (!worked) {
log( LL_WARNING ) << "could not split '" << getns() << "': " << res << endl;
- return;
+ return false;
}
+ return true;
}
ShardChunkVersion ChunkManager::getVersion( const Shard& shard ) const {
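
Editor's note: chunk.cpp switches singleSplit/multiSplit from returning a ChunkPtr to returning a success bool, with the two halves of a single split optionally handed back through pointer out-parameters. A small standalone sketch of that calling convention; Range, splitRange and the field names are illustrative, not the Chunk API.

    // Sketch: bool return for success, optional out-parameters for the results.
    #include <cassert>
    #include <iostream>
    #include <memory>

    struct Range { int min, max; };
    using RangePtr = std::shared_ptr<Range>;

    // Splits [min,max) at mid; returns false if the split point is degenerate.
    // 'low' and 'high' must be supplied together or not at all.
    bool splitRange(const Range& r, int mid, RangePtr* low = nullptr, RangePtr* high = nullptr) {
        if (mid <= r.min || mid >= r.max)
            return false;                       // nothing sensible to split on
        if (low && high) {
            low->reset(new Range{ r.min, mid });
            high->reset(new Range{ mid, r.max });
        } else {
            assert(!low && !high);              // can't have one without the other
        }
        return true;
    }

    int main() {
        Range chunk{ 0, 100 };
        RangePtr low, high;
        if (splitRange(chunk, 40, &low, &high))
            std::cout << "low [" << low->min << "," << low->max << ") "
                      << "high [" << high->min << "," << high->max << ")\n";
    }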
diff --git a/s/chunk.h b/s/chunk.h
index b4bcc35..21e1fbf 100644
--- a/s/chunk.h
+++ b/s/chunk.h
@@ -108,18 +108,18 @@ namespace mongo {
* @param force if set to true, will split the chunk regardless if the split is really necessary size wise
* if set to false, will only split if the chunk has reached the currently desired maximum size
* @param res the object containing details about the split execution
- * @return if found a key, return a pointer to the first chunk, otherwise return a null pointer
+ * @return if found a key and split successfully
*/
- ChunkPtr singleSplit( bool force , BSONObj& res );
+ bool singleSplit( bool force , BSONObj& res , ChunkPtr* low=NULL, ChunkPtr* high=NULL);
/**
* Splits this chunk at the given key (or keys)
*
* @param splitPoints the vector of keys that should be used to divide this chunk
* @param res the object containing details about the split execution
- * @return shared pointer to the first new Chunk or null pointer if failed
+ * @return if split was successful
*/
- ChunkPtr multiSplit( const vector<BSONObj>& splitPoints , BSONObj& res );
+ bool multiSplit( const vector<BSONObj>& splitPoints , BSONObj& res , bool resetIfSplit );
/**
* Asks the mongod holding this chunk to find a key that approximately divides this chunk in two
@@ -308,13 +308,13 @@ namespace mongo {
bool hasShardKey( const BSONObj& obj );
void createFirstChunk( const Shard& shard );
- ChunkPtr findChunk( const BSONObj& obj , bool retry = false );
+ ChunkPtr findChunk( const BSONObj& obj );
ChunkPtr findChunkOnServer( const Shard& shard ) const;
const ShardKeyPattern& getShardKey() const { return _key; }
bool isUnique() const { return _unique; }
- void maybeChunkCollection();
+ bool maybeChunkCollection();
void getShardsForQuery( set<Shard>& shards , const BSONObj& query );
void getAllShards( set<Shard>& all );
diff --git a/s/client.cpp b/s/client.cpp
index 95e3124..c0d25fb 100644
--- a/s/client.cpp
+++ b/s/client.cpp
@@ -100,7 +100,11 @@ namespace mongo {
return;
}
- all.push_back( WBInfo( cid.numberLong() , w.OID() ) );
+ string ident = "";
+ if ( gle["instanceIdent"].type() == String )
+ ident = gle["instanceIdent"].String();
+
+ all.push_back( WBInfo( WriteBackListener::ConnectionIdent( ident , cid.numberLong() ) , w.OID() ) );
}
vector<BSONObj> ClientInfo::_handleWriteBacks( vector<WBInfo>& all , bool fromWriteBackListener ) {
@@ -115,7 +119,7 @@ namespace mongo {
}
for ( unsigned i=0; i<all.size(); i++ ) {
- res.push_back( WriteBackListener::waitFor( all[i].connectionId , all[i].id ) );
+ res.push_back( WriteBackListener::waitFor( all[i].ident , all[i].id ) );
}
return res;
diff --git a/s/client.h b/s/client.h
index 2e9fefe..a01b1de 100644
--- a/s/client.h
+++ b/s/client.h
@@ -17,6 +17,7 @@
*/
#include "../pch.h"
+#include "writeback_listener.h"
namespace mongo {
@@ -85,8 +86,8 @@ namespace mongo {
private:
struct WBInfo {
- WBInfo( ConnectionId c , OID o ) : connectionId( c ) , id( o ) {}
- ConnectionId connectionId;
+ WBInfo( const WriteBackListener::ConnectionIdent& c , OID o ) : ident( c ) , id( o ) {}
+ WriteBackListener::ConnectionIdent ident;
OID id;
};
diff --git a/s/commands_admin.cpp b/s/commands_admin.cpp
index 532161a..7677265 100644
--- a/s/commands_admin.cpp
+++ b/s/commands_admin.cpp
@@ -79,6 +79,20 @@ namespace mongo {
}
} netstat;
+ class FlushRouterConfigCmd : public GridAdminCmd {
+ public:
+ FlushRouterConfigCmd() : GridAdminCmd("flushRouterConfig") { }
+ virtual void help( stringstream& help ) const {
+ help << "flush all router config";
+ }
+ bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool) {
+ grid.flushConfig();
+ result.appendBool( "flushed" , true );
+ return true;
+ }
+ } flushRouterConfigCmd;
+
+
class ServerStatusCmd : public Command {
public:
ServerStatusCmd() : Command( "serverStatus" , true ) {
@@ -512,9 +526,9 @@ namespace mongo {
log() << "splitting: " << ns << " shard: " << chunk << endl;
BSONObj res;
- ChunkPtr p;
+ bool worked;
if ( middle.isEmpty() ) {
- p = chunk->singleSplit( true /* force a split even if not enough data */ , res );
+ worked = chunk->singleSplit( true /* force a split even if not enough data */ , res );
}
else {
@@ -526,10 +540,10 @@ namespace mongo {
vector<BSONObj> splitPoints;
splitPoints.push_back( middle );
- p = chunk->multiSplit( splitPoints , res );
+ worked = chunk->multiSplit( splitPoints , res , true );
}
- if ( p.get() == NULL ) {
+ if ( !worked ) {
errmsg = "split failed";
result.append( "cause" , res );
return false;
diff --git a/s/commands_public.cpp b/s/commands_public.cpp
index 5b1ecaf..f29205b 100644
--- a/s/commands_public.cpp
+++ b/s/commands_public.cpp
@@ -545,6 +545,10 @@ namespace mongo {
bool ok = conn->runCommand( conf->getName() , cmdObj , res );
conn.done();
+ if (!ok && res.getIntField("code") == 9996) { // code for StaleConfigException
+ throw StaleConfigException(fullns, "FindAndModify"); // Command code traps this and re-runs
+ }
+
result.appendElements(res);
return ok;
}
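
Editor's note: the commands_public.cpp hunk inspects the reply of a command proxied to a shard and, when the error code indicates stale shard configuration (9996 here), rethrows a typed exception so the command layer can refresh routing info and retry. A hedged sketch of that detect-and-rethrow shape; StaleConfig, Reply and runOnShard are placeholders, not the mongos types.

    // Sketch: turn an error code in a reply into a typed exception for an upper retry layer.
    #include <iostream>
    #include <stdexcept>
    #include <string>

    struct StaleConfig : std::runtime_error {
        explicit StaleConfig(const std::string& what) : std::runtime_error(what) {}
    };

    struct Reply { bool ok; int code; };

    Reply runOnShard() { return { false, 9996 }; }   // pretend the shard reports stale config

    void findAndModifyPassthrough(const std::string& ns) {
        Reply res = runOnShard();
        if (!res.ok && res.code == 9996)                 // code used for stale config in this sketch
            throw StaleConfig(ns + ": FindAndModify");   // upper layer traps this and re-runs
        std::cout << "command ok\n";
    }

    int main() {
        try {
            findAndModifyPassthrough("test.coll");
        } catch (const StaleConfig& e) {
            std::cout << "would refresh routing info and retry: " << e.what() << "\n";
        }
    }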
diff --git a/s/config.cpp b/s/config.cpp
index 35a3be2..9ed3207 100644
--- a/s/config.cpp
+++ b/s/config.cpp
@@ -143,7 +143,9 @@ namespace mongo {
_save();
try {
- cm->maybeChunkCollection();
+ if ( cm->maybeChunkCollection() ) {
+ _load();
+ }
}
catch ( UserException& e ) {
// failure to chunk is not critical enough to abort the command (and undo the _save()'d configDB state)
diff --git a/s/d_logic.cpp b/s/d_logic.cpp
index c032883..1ab7c64 100644
--- a/s/d_logic.cpp
+++ b/s/d_logic.cpp
@@ -103,6 +103,7 @@ namespace mongo {
b.append( "ns" , ns );
b.append( "id" , writebackID );
b.append( "connectionId" , cc().getConnectionId() );
+ b.append( "instanceIdent" , prettyHostName() );
b.appendTimestamp( "version" , shardingState.getVersion( ns ) );
b.appendTimestamp( "yourVersion" , ShardedConnectionInfo::get( true )->getVersion( ns ) );
b.appendBinData( "msg" , m.header()->len , bdtCustom , (char*)(m.singleData()) );
diff --git a/s/d_migrate.cpp b/s/d_migrate.cpp
index 2878276..df12e54 100644
--- a/s/d_migrate.cpp
+++ b/s/d_migrate.cpp
@@ -273,9 +273,12 @@ namespace mongo {
void done() {
readlock lk( _ns );
- _deleted.clear();
- _reload.clear();
- _cloneLocs.clear();
+ {
+ scoped_spinlock lk( _trackerLocks );
+ _deleted.clear();
+ _reload.clear();
+ _cloneLocs.clear();
+ }
_memoryUsed = 0;
scoped_lock l(_m);
@@ -454,6 +457,7 @@ namespace mongo {
while ( cc->ok() ) {
DiskLoc dl = cc->currLoc();
if ( ! isLargeChunk ) {
+ scoped_spinlock lk( _trackerLocks );
_cloneLocs.insert( dl );
}
cc->advance();
@@ -480,7 +484,10 @@ namespace mongo {
return false;
}
- log() << "moveChunk number of documents: " << _cloneLocs.size() << endl;
+ {
+ scoped_spinlock lk( _trackerLocks );
+ log() << "moveChunk number of documents: " << _cloneLocs.size() << endl;
+ }
return true;
}
@@ -490,29 +497,50 @@ namespace mongo {
return false;
}
- readlock l( _ns );
- Client::Context ctx( _ns );
+ ElapsedTracker tracker (128, 10); // same as ClientCursor::_yieldSometimesTracker
- NamespaceDetails *d = nsdetails( _ns.c_str() );
- assert( d );
+ int allocSize;
+ {
+ readlock l(_ns);
+ Client::Context ctx( _ns );
+ NamespaceDetails *d = nsdetails( _ns.c_str() );
+ assert( d );
+ scoped_spinlock lk( _trackerLocks );
+ allocSize = std::min(BSONObjMaxUserSize, (int)((12 + d->averageObjectSize()) * _cloneLocs.size()));
+ }
+ BSONArrayBuilder a (allocSize);
+
+ while ( 1 ) {
+ bool filledBuffer = false;
+
+ readlock l( _ns );
+ Client::Context ctx( _ns );
+ scoped_spinlock lk( _trackerLocks );
+ set<DiskLoc>::iterator i = _cloneLocs.begin();
+ for ( ; i!=_cloneLocs.end(); ++i ) {
+ if (tracker.ping()) // should I yield?
+ break;
+
+ DiskLoc dl = *i;
+ BSONObj o = dl.obj();
+
+ // use the builder size instead of accumulating 'o's size so that we take into consideration
+ // the overhead of BSONArray indices
+ if ( a.len() + o.objsize() + 1024 > BSONObjMaxUserSize ) {
+ filledBuffer = true; // break out of outer while loop
+ break;
+ }
- BSONArrayBuilder a( std::min( BSONObjMaxUserSize , (int)( ( 12 + d->averageObjectSize() )* _cloneLocs.size() ) ) );
+ a.append( o );
+ }
- set<DiskLoc>::iterator i = _cloneLocs.begin();
- for ( ; i!=_cloneLocs.end(); ++i ) {
- DiskLoc dl = *i;
- BSONObj o = dl.obj();
+ _cloneLocs.erase( _cloneLocs.begin() , i );
- // use the builder size instead of accumulating 'o's size so that we take into consideration
- // the overhead of BSONArray indices
- if ( a.len() + o.objsize() + 1024 > BSONObjMaxUserSize ) {
+ if ( _cloneLocs.empty() || filledBuffer )
break;
- }
- a.append( o );
}
result.appendArray( "objects" , a.arr() );
- _cloneLocs.erase( _cloneLocs.begin() , i );
return true;
}
@@ -525,6 +553,11 @@ namespace mongo {
if ( ! db->ownsNS( _ns ) )
return;
+
+ // not needed right now
+ // but trying to prevent a future bug
+ scoped_spinlock lk( _trackerLocks );
+
_cloneLocs.erase( dl );
}
@@ -544,9 +577,13 @@ namespace mongo {
BSONObj _min;
BSONObj _max;
+ // we need the lock in case there is a malicious _migrateClone for example
+ // even though it shouldn't be needed under normal operation
+ SpinLock _trackerLocks;
+
// disk locs yet to be transferred from here to the other side
- // no locking needed because build by 1 thread in a read lock
- // depleted by 1 thread in a read lock
+ // no locking needed because built initially by 1 thread in a read lock
+ // emptied by 1 thread in a read lock
// updates applied by 1 thread in a write lock
set<DiskLoc> _cloneLocs;
@@ -1141,6 +1178,8 @@ namespace mongo {
assert( state == READY );
assert( ! min.isEmpty() );
assert( ! max.isEmpty() );
+
+ slaveCount = ( getSlaveCount() / 2 ) + 1;
MoveTimingHelper timing( "to" , ns , min , max , 5 /* steps */ );
@@ -1236,11 +1275,32 @@ namespace mongo {
break;
apply( res , &lastOpApplied );
+
+ const int maxIterations = 3600*50;
+ int i;
+ for ( i=0;i<maxIterations; i++) {
+ if ( state == ABORT ) {
+ timing.note( "aborted" );
+ return;
+ }
+
+ if ( opReplicatedEnough( lastOpApplied ) )
+ break;
+
+ if ( i > 100 ) {
+ warning() << "secondaries having hard time keeping up with migrate" << endl;
+ }
- if ( state == ABORT ) {
- timing.note( "aborted" );
- return;
+ sleepmillis( 20 );
}
+
+ if ( i == maxIterations ) {
+ errmsg = "secondary can't keep up with migrate";
+ error() << errmsg << endl;
+ conn.done();
+ state = FAIL;
+ return;
+ }
}
timing.done(4);
@@ -1364,14 +1424,17 @@ namespace mongo {
return didAnything;
}
- bool flushPendingWrites( const ReplTime& lastOpApplied ) {
+ bool opReplicatedEnough( const ReplTime& lastOpApplied ) {
// if replication is on, try to force enough secondaries to catch up
// TODO opReplicatedEnough should eventually honor priorities and geo-awareness
// for now, we try to replicate to a sensible number of secondaries
- const int slaveCount = getSlaveCount() / 2 + 1;
- if ( ! opReplicatedEnough( lastOpApplied , slaveCount ) ) {
- log( LL_WARNING ) << "migrate commit attempt timed out contacting " << slaveCount
- << " slaves for '" << ns << "' " << min << " -> " << max << endl;
+ return mongo::opReplicatedEnough( lastOpApplied , slaveCount );
+ }
+
+ bool flushPendingWrites( const ReplTime& lastOpApplied ) {
+ if ( ! opReplicatedEnough( lastOpApplied ) ) {
+ warning() << "migrate commit attempt timed out contacting " << slaveCount
+ << " slaves for '" << ns << "' " << min << " -> " << max << endl;
return false;
}
log() << "migrate commit succeeded flushing to secondaries for '" << ns << "' " << min << " -> " << max << endl;
@@ -1438,6 +1501,8 @@ namespace mongo {
long long clonedBytes;
long long numCatchup;
long long numSteady;
+
+ int slaveCount;
enum State { READY , CLONE , CATCHUP , STEADY , COMMIT_START , DONE , FAIL , ABORT } state;
string errmsg;
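
Editor's note: the donor side of d_migrate.cpp now guards the set of disk locations still to be cloned with a spinlock and drains it in batches, stopping a batch when the serialized size would exceed the response limit and erasing only the entries actually consumed. A standalone sketch of that batched drain under a lock; plain integers, a byte budget and std::mutex stand in for DiskLocs, BSON sizes and the spinlock.

    // Sketch: drain a lock-protected set in size-bounded batches, erasing only what was consumed.
    #include <iostream>
    #include <mutex>
    #include <set>
    #include <vector>

    std::mutex trackerLock;                 // stand-in for the spinlock in the patch
    std::set<int> cloneLocs = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };

    const int kMaxBatchBytes = 16;          // tiny budget so the example produces several batches
    int sizeOf(int) { return 4; }           // pretend every record serializes to 4 bytes

    // Fills 'batch' with as many entries as fit in the budget; returns false when nothing is left.
    bool nextBatch(std::vector<int>& batch) {
        std::lock_guard<std::mutex> lk(trackerLock);
        if (cloneLocs.empty())
            return false;
        int bytes = 0;
        auto it = cloneLocs.begin();
        for (; it != cloneLocs.end(); ++it) {
            if (bytes + sizeOf(*it) > kMaxBatchBytes)
                break;                      // batch is full; stop before overflowing
            batch.push_back(*it);
            bytes += sizeOf(*it);
        }
        cloneLocs.erase(cloneLocs.begin(), it);   // drop only the entries we actually copied
        return true;
    }

    int main() {
        std::vector<int> batch;
        while (nextBatch(batch)) {
            std::cout << "batch of " << batch.size() << "\n";
            batch.clear();
        }
    }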
diff --git a/s/d_split.cpp b/s/d_split.cpp
index 66fe38e..3ed6e9b 100644
--- a/s/d_split.cpp
+++ b/s/d_split.cpp
@@ -138,6 +138,11 @@ namespace mongo {
const char* ns = jsobj.getStringField( "checkShardingIndex" );
BSONObj keyPattern = jsobj.getObjectField( "keyPattern" );
+ if ( keyPattern.nFields() == 1 && str::equals( "_id" , keyPattern.firstElement().fieldName() ) ) {
+ result.appendBool( "idskip" , true );
+ return true;
+ }
+
// If min and max are not provided use the "minKey" and "maxKey" for the sharding key pattern.
BSONObj min = jsobj.getObjectField( "min" );
BSONObj max = jsobj.getObjectField( "max" );
@@ -211,6 +216,9 @@ namespace mongo {
return false;
}
cc->advance();
+
+ if ( ! cc->yieldSometimes() )
+ break;
}
return true;
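
Editor's note: d_split.cpp adds an early "idskip" return when the shard key is just _id, and lets the checkShardingIndex scan yield periodically, abandoning the scan if the cursor does not survive the yield. A hedged sketch of a long scan with a periodic yield point that may end it early; yieldSometimes here is a stand-in that returns false when the scan can no longer continue.

    // Sketch: long scan with a periodic yield point that can end the scan early.
    #include <iostream>
    #include <vector>

    // Stand-in for a cursor's yieldSometimes(): every N records, give up the lock/CPU;
    // return false if the scan cannot continue afterwards.
    bool yieldSometimes(long scanned) {
        if (scanned % 128 != 0)
            return true;                 // not at a yield point yet
        // ... release and reacquire locks here ...
        return scanned < 1000;           // pretend the cursor dies after ~1000 records
    }

    int main() {
        std::vector<int> records(10000, 42);
        long scanned = 0;
        for (int r : records) {
            (void)r;                     // ... examine the record ...
            ++scanned;
            if (!yieldSometimes(scanned))
                break;                   // cursor invalidated while yielding; stop scanning
        }
        std::cout << "scanned " << scanned << " records\n";
    }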
diff --git a/s/d_state.cpp b/s/d_state.cpp
index 11fbcef..e10400f 100644
--- a/s/d_state.cpp
+++ b/s/d_state.cpp
@@ -75,7 +75,7 @@ namespace mongo {
<< " before [" << _shardName << "] "
<< " got [" << name << "] "
;
- uasserted( 13298 , ss.str() );
+ msgasserted( 13298 , ss.str() );
}
void ShardingState::gotShardHost( string host ) {
@@ -97,7 +97,7 @@ namespace mongo {
<< " before [" << _shardHost << "] "
<< " got [" << host << "] "
;
- uasserted( 13299 , ss.str() );
+ msgasserted( 13299 , ss.str() );
}
void ShardingState::resetShardingState() {
diff --git a/s/grid.cpp b/s/grid.cpp
index 0045754..0646507 100644
--- a/s/grid.cpp
+++ b/s/grid.cpp
@@ -440,8 +440,14 @@ namespace mongo {
return ( dbName == "local" ) || ( dbName == "admin" ) || ( dbName == "config" );
}
+ void Grid::flushConfig() {
+ scoped_lock lk( _lock );
+ _databases.clear();
+ }
+
Grid grid;
+
// unit tests
class BalancingWindowUnitTest : public UnitTest {
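
Editor's note: Grid::flushConfig() clears the per-database config cache under the grid lock, which is what the new flushRouterConfig admin command (commands_admin.cpp above) calls into; subsequent lookups reload lazily. A minimal sketch of a lockable cache with a flush operation; ConfigCache and its members are illustrative names.

    // Sketch: a mutex-protected cache that can be flushed wholesale, forcing lazy reloads.
    #include <iostream>
    #include <map>
    #include <mutex>
    #include <string>

    class ConfigCache {
    public:
        std::string get(const std::string& db) {
            std::lock_guard<std::mutex> lk(_lock);
            auto it = _databases.find(db);
            if (it == _databases.end()) {
                std::cout << "loading config for " << db << "\n";   // pretend config-server read
                it = _databases.emplace(db, "config-for-" + db).first;
            }
            return it->second;
        }
        void flush() {                       // analogous role to Grid::flushConfig()
            std::lock_guard<std::mutex> lk(_lock);
            _databases.clear();              // next get() reloads from the source of truth
        }
    private:
        std::mutex _lock;
        std::map<std::string, std::string> _databases;
    };

    int main() {
        ConfigCache grid;
        grid.get("test");      // loads
        grid.get("test");      // cached
        grid.flush();
        grid.get("test");      // loads again
    }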
diff --git a/s/grid.h b/s/grid.h
index 5692a82..e5af33f 100644
--- a/s/grid.h
+++ b/s/grid.h
@@ -83,6 +83,8 @@ namespace mongo {
bool shouldBalance() const;
unsigned long long getNextOpTime() const;
+
+ void flushConfig();
// exposed methods below are for testing only
diff --git a/s/shard.cpp b/s/shard.cpp
index dbfd8f9..c1e3b56 100644
--- a/s/shard.cpp
+++ b/s/shard.cpp
@@ -25,6 +25,8 @@
namespace mongo {
+ typedef shared_ptr<Shard> ShardPtr;
+
class StaticShardInfo {
public:
StaticShardInfo() : _mutex("StaticShardInfo") { }
@@ -48,9 +50,9 @@ namespace mongo {
// the config state intact. The rationale is that this way we could drop shards that
// were removed without reinitializing the config DB information.
- map<string,Shard>::iterator i = _lookup.find( "config" );
+ ShardMap::iterator i = _lookup.find( "config" );
if ( i != _lookup.end() ) {
- Shard config = i->second;
+ ShardPtr config = i->second;
_lookup.clear();
_lookup[ "config" ] = config;
}
@@ -75,14 +77,14 @@ namespace mongo {
isDraining = isDrainingElem.Bool();
}
- Shard s( name , host , maxSize , isDraining );
+ ShardPtr s( new Shard( name , host , maxSize , isDraining ) );
_lookup[name] = s;
_installHost( host , s );
}
}
- const Shard& find( const string& ident ) {
+ ShardPtr find( const string& ident ) {
string mykey = ident;
{
@@ -94,7 +96,7 @@ namespace mongo {
{
scoped_lock lk( _mutex );
- map<string,Shard>::iterator i = _lookup.find( mykey );
+ ShardMap::iterator i = _lookup.find( mykey );
if ( i != _lookup.end() )
return i->second;
@@ -104,23 +106,24 @@ namespace mongo {
reload();
scoped_lock lk( _mutex );
- map<string,Shard>::iterator i = _lookup.find( mykey );
+ ShardMap::iterator i = _lookup.find( mykey );
massert( 13129 , (string)"can't find shard for: " + mykey , i != _lookup.end() );
return i->second;
}
void set( const string& name , const Shard& s , bool setName = true , bool setAddr = true ) {
scoped_lock lk( _mutex );
+ ShardPtr ss( new Shard( s ) );
if ( setName )
- _lookup[name] = s;
+ _lookup[name] = ss;
if ( setAddr )
- _installHost( s.getConnString() , s );
+ _installHost( s.getConnString() , ss );
}
- void _installHost( const string& host , const Shard& s ) {
+ void _installHost( const string& host , const ShardPtr& s ) {
_lookup[host] = s;
- const ConnectionString& cs = s.getAddress();
+ const ConnectionString& cs = s->getAddress();
if ( cs.type() == ConnectionString::SET ) {
if ( cs.getSetName().size() )
_lookup[ cs.getSetName() ] = s;
@@ -134,9 +137,9 @@ namespace mongo {
void remove( const string& name ) {
scoped_lock lk( _mutex );
- for ( map<string,Shard>::iterator i = _lookup.begin(); i!=_lookup.end(); ) {
- Shard s = i->second;
- if ( s.getName() == name ) {
+ for ( ShardMap::iterator i = _lookup.begin(); i!=_lookup.end(); ) {
+ ShardPtr s = i->second;
+ if ( s->getName() == name ) {
_lookup.erase(i++);
}
else {
@@ -145,35 +148,49 @@ namespace mongo {
}
}
- void getAllShards( vector<Shard>& all ) const {
+ void getAllShards( vector<ShardPtr>& all ) const {
scoped_lock lk( _mutex );
std::set<string> seen;
- for ( map<string,Shard>::const_iterator i = _lookup.begin(); i!=_lookup.end(); ++i ) {
- const Shard& s = i->second;
- if ( s.getName() == "config" )
+ for ( ShardMap::const_iterator i = _lookup.begin(); i!=_lookup.end(); ++i ) {
+ const ShardPtr& s = i->second;
+ if ( s->getName() == "config" )
continue;
- if ( seen.count( s.getName() ) )
+ if ( seen.count( s->getName() ) )
continue;
- seen.insert( s.getName() );
+ seen.insert( s->getName() );
all.push_back( s );
}
}
+
+ void getAllShards( vector<Shard>& all ) const {
+ scoped_lock lk( _mutex );
+ std::set<string> seen;
+ for ( ShardMap::const_iterator i = _lookup.begin(); i!=_lookup.end(); ++i ) {
+ const ShardPtr& s = i->second;
+ if ( s->getName() == "config" )
+ continue;
+ if ( seen.count( s->getName() ) )
+ continue;
+ seen.insert( s->getName() );
+ all.push_back( *s );
+ }
+ }
+
bool isAShardNode( const string& addr ) const {
scoped_lock lk( _mutex );
// check direct nods or set names
- map<string,Shard>::const_iterator i = _lookup.find( addr );
+ ShardMap::const_iterator i = _lookup.find( addr );
if ( i != _lookup.end() )
return true;
// check for set nodes
- for ( map<string,Shard>::const_iterator i = _lookup.begin(); i!=_lookup.end(); ++i ) {
+ for ( ShardMap::const_iterator i = _lookup.begin(); i!=_lookup.end(); ++i ) {
if ( i->first == "config" )
continue;
- const Shard& s = i->second;
- if ( s.containsNode( addr ) )
+ if ( i->second->containsNode( addr ) )
return true;
}
@@ -185,8 +202,8 @@ namespace mongo {
BSONObjBuilder b( _lookup.size() + 50 );
- for ( map<string,Shard>::const_iterator i = _lookup.begin(); i!=_lookup.end(); ++i ) {
- b.append( i->first , i->second.getConnString() );
+ for ( ShardMap::const_iterator i = _lookup.begin(); i!=_lookup.end(); ++i ) {
+ b.append( i->first , i->second->getConnString() );
}
result.append( "map" , b.obj() );
@@ -195,7 +212,8 @@ namespace mongo {
}
private:
- map<string,Shard> _lookup;
+ typedef map<string,ShardPtr> ShardMap;
+ ShardMap _lookup;
mutable mongo::mutex _mutex;
} staticShardInfo;
@@ -242,14 +260,14 @@ namespace mongo {
}
void Shard::reset( const string& ident ) {
- const Shard& s = staticShardInfo.find( ident );
- massert( 13128 , (string)"can't find shard for: " + ident , s.ok() );
- _name = s._name;
- _addr = s._addr;
- _cs = s._cs;
+ ShardPtr s = staticShardInfo.find( ident );
+ massert( 13128 , (string)"can't find shard for: " + ident , s->ok() );
+ _name = s->_name;
+ _addr = s->_addr;
+ _cs = s->_cs;
_rsInit();
- _maxSize = s._maxSize;
- _isDraining = s._isDraining;
+ _maxSize = s->_maxSize;
+ _isDraining = s->_isDraining;
}
bool Shard::containsNode( const string& node ) const {
@@ -271,10 +289,10 @@ namespace mongo {
}
void Shard::printShardInfo( ostream& out ) {
- vector<Shard> all;
- getAllShards( all );
+ vector<ShardPtr> all;
+ staticShardInfo.getAllShards( all );
for ( unsigned i=0; i<all.size(); i++ )
- out << all[i].toString() << "\n";
+ out << all[i]->toString() << "\n";
out.flush();
}
@@ -306,7 +324,7 @@ namespace mongo {
}
Shard Shard::pick( const Shard& current ) {
- vector<Shard> all;
+ vector<ShardPtr> all;
staticShardInfo.getAllShards( all );
if ( all.size() == 0 ) {
staticShardInfo.reload();
@@ -316,13 +334,13 @@ namespace mongo {
}
// if current shard was provided, pick a different shard only if it is a better choice
- ShardStatus best = all[0].getStatus();
+ ShardStatus best = all[0]->getStatus();
if ( current != EMPTY ) {
best = current.getStatus();
}
for ( size_t i=0; i<all.size(); i++ ) {
- ShardStatus t = all[i].getStatus();
+ ShardStatus t = all[i]->getStatus();
if ( t < best )
best = t;
}
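
Editor's note: shard.cpp switches the registry from map<string,Shard> to a map of shared pointers, so one Shard object can be installed under several keys (name, host string, replica-set name) and removed by walking the map. A standalone sketch of that aliased-pointer registry; Shard here is a trivial struct, not the mongos class.

    // Sketch: one shared object reachable under several lookup keys.
    #include <iostream>
    #include <map>
    #include <memory>
    #include <string>

    struct Shard { std::string name, host; };
    using ShardPtr = std::shared_ptr<Shard>;
    using ShardMap = std::map<std::string, ShardPtr>;

    int main() {
        ShardMap lookup;
        ShardPtr s = std::make_shared<Shard>(Shard{ "shard0", "rs0/h1:27018,h2:27018" });
        lookup[s->name] = s;          // by shard name
        lookup[s->host] = s;          // by connection string
        lookup["rs0"]   = s;          // by replica-set name; all three entries alias one object

        // Remove by name: erase every key that points at a shard with that name.
        for (auto it = lookup.begin(); it != lookup.end(); ) {
            if (it->second->name == "shard0")
                it = lookup.erase(it);
            else
                ++it;
        }
        std::cout << "entries left: " << lookup.size() << "\n";
    }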
diff --git a/s/shard_version.cpp b/s/shard_version.cpp
index 043b9bd..a189a08 100644
--- a/s/shard_version.cpp
+++ b/s/shard_version.cpp
@@ -97,7 +97,9 @@ namespace mongo {
const bool isSharded = conf->isSharded( ns );
if ( isSharded ) {
manager = conf->getChunkManager( ns , authoritative );
- officialSequenceNumber = manager->getSequenceNumber();
+ // It's possible the chunk manager was reset since we checked whether sharded was true,
+ // so must check this here.
+ if( manager ) officialSequenceNumber = manager->getSequenceNumber();
}
// has the ChunkManager been reloaded since the last time we updated the connection-level version?
@@ -109,7 +111,7 @@ namespace mongo {
ShardChunkVersion version = 0;
- if ( isSharded ) {
+ if ( isSharded && manager ) {
version = manager->getVersion( Shard::make( conn.getServerAddress() ) );
}
diff --git a/s/strategy_shard.cpp b/s/strategy_shard.cpp
index 2eca0c6..26ea79a 100644
--- a/s/strategy_shard.cpp
+++ b/s/strategy_shard.cpp
@@ -151,8 +151,10 @@ namespace mongo {
// Many operations benefit from having the shard key early in the object
o = manager->getShardKey().moveToFront(o);
+ const int maxTries = 10;
+
bool gotThrough = false;
- for ( int i=0; i<10; i++ ) {
+ for ( int i=0; i<maxTries; i++ ) {
try {
ChunkPtr c = manager->findChunk( o );
log(4) << " server:" << c->getShard().toString() << " " << o << endl;
@@ -165,7 +167,7 @@ namespace mongo {
break;
}
catch ( StaleConfigException& ) {
- log(1) << "retrying insert because of StaleConfigException: " << o << endl;
+ log( i < ( maxTries / 2 ) ) << "retrying insert because of StaleConfigException: " << o << endl;
r.reset();
manager = r.getChunkManager();
}
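
Editor's note: strategy_shard.cpp names the retry bound and raises the log severity once an insert has been retried more than half of maxTries after a StaleConfigException. A hedged sketch of bounded retries with escalating logging; insertOnce and StaleConfig are placeholders for the real insert path and exception type.

    // Sketch: bounded retry loop that logs quietly at first and loudly once retries pile up.
    #include <iostream>
    #include <stdexcept>

    struct StaleConfig : std::runtime_error {
        StaleConfig() : std::runtime_error("stale config") {}
    };

    int attemptsNeeded = 7;                       // pretend routing info settles on the 7th try

    void insertOnce(int attempt) {
        if (attempt + 1 < attemptsNeeded)
            throw StaleConfig();                  // routing info was stale; caller should retry
    }

    int main() {
        const int maxTries = 10;
        bool gotThrough = false;
        for (int i = 0; i < maxTries; ++i) {
            try {
                insertOnce(i);
                gotThrough = true;
                break;
            } catch (const StaleConfig&) {
                // quiet (debug-level) for the first few retries, warn once we pass maxTries/2
                if (i < maxTries / 2)
                    std::cout << "debug: retrying insert after stale config\n";
                else
                    std::cout << "warning: still retrying insert after stale config\n";
                // ... refresh the chunk manager here before retrying ...
            }
        }
        std::cout << (gotThrough ? "insert succeeded\n" : "insert gave up\n");
    }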
diff --git a/s/strategy_single.cpp b/s/strategy_single.cpp
index b3b5502..3fd357a 100644
--- a/s/strategy_single.cpp
+++ b/s/strategy_single.cpp
@@ -88,7 +88,7 @@ namespace mongo {
long long id = r.d().getInt64( 4 );
- ShardConnection conn( cursorCache.getRef( id ) , ns );
+ ScopedDbConnection conn( cursorCache.getRef( id ) );
Message response;
bool ok = conn->callRead( r.m() , response);
diff --git a/s/writeback_listener.cpp b/s/writeback_listener.cpp
index 3051013..769163e 100644
--- a/s/writeback_listener.cpp
+++ b/s/writeback_listener.cpp
@@ -36,7 +36,7 @@ namespace mongo {
set<string> WriteBackListener::_seenSets;
mongo::mutex WriteBackListener::_cacheLock("WriteBackListener");
- map<ConnectionId,WriteBackListener::WBStatus> WriteBackListener::_seenWritebacks;
+ map<WriteBackListener::ConnectionIdent,WriteBackListener::WBStatus> WriteBackListener::_seenWritebacks;
mongo::mutex WriteBackListener::_seenWritebacksLock("WriteBackListener::seen");
WriteBackListener::WriteBackListener( const string& addr ) : _addr( addr ) {
@@ -86,18 +86,18 @@ namespace mongo {
}
/* static */
- BSONObj WriteBackListener::waitFor( ConnectionId connectionId, const OID& oid ) {
+ BSONObj WriteBackListener::waitFor( const ConnectionIdent& ident, const OID& oid ) {
Timer t;
for ( int i=0; i<5000; i++ ) {
{
scoped_lock lk( _seenWritebacksLock );
- WBStatus s = _seenWritebacks[connectionId];
+ WBStatus s = _seenWritebacks[ident];
if ( oid < s.id ) {
// this means we're waiting for a GLE that already passed.
// it should be impossible becauseonce we call GLE, no other
// writebacks should happen with that connection id
msgasserted( 13633 , str::stream() << "got writeback waitfor for older id " <<
- " oid: " << oid << " s.id: " << s.id << " connectionId: " << connectionId );
+ " oid: " << oid << " s.id: " << s.id << " connection: " << ident.toString() );
}
else if ( oid == s.id ) {
return s.gle;
@@ -142,10 +142,13 @@ namespace mongo {
if ( data.getBoolField( "writeBack" ) ) {
string ns = data["ns"].valuestrsafe();
- ConnectionId cid = 0;
+ ConnectionIdent cid( "" , 0 );
OID wid;
if ( data["connectionId"].isNumber() && data["id"].type() == jstOID ) {
- cid = data["connectionId"].numberLong();
+ string s = "";
+ if ( data["instanceIdent"].type() == String )
+ s = data["instanceIdent"].String();
+ cid = ConnectionIdent( s , data["connectionId"].numberLong() );
wid = data["id"].OID();
}
else {
@@ -226,7 +229,7 @@ namespace mongo {
secsToSleep = 0;
continue;
}
- catch ( std::exception e ) {
+ catch ( std::exception& e ) {
if ( inShutdown() ) {
// we're shutting down, so just clean up
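
Editor's note: one small but real fix in the writeback_listener.cpp hunk is catching std::exception by reference instead of by value, so a thrown subclass keeps its dynamic type and message rather than being sliced down to the base class. A tiny standalone illustration of the difference (the message text is only an example):

    // Sketch: catching by value slices a derived exception; catching by reference preserves it.
    #include <iostream>
    #include <stdexcept>

    void failingWork() { throw std::runtime_error("socket exception: connection reset"); }

    int main() {
        try {
            failingWork();
        } catch (std::exception e) {            // sliced copy: what() is the base class's text
            std::cout << "by value:     " << e.what() << "\n";
        }
        try {
            failingWork();
        } catch (std::exception& e) {           // reference: dynamic type and message preserved
            std::cout << "by reference: " << e.what() << "\n";
        }
    }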
diff --git a/s/writeback_listener.h b/s/writeback_listener.h
index 7335999..0125073 100644
--- a/s/writeback_listener.h
+++ b/s/writeback_listener.h
@@ -35,10 +35,30 @@ namespace mongo {
*/
class WriteBackListener : public BackgroundJob {
public:
+
+ class ConnectionIdent {
+ public:
+ ConnectionIdent( const string& ii , ConnectionId id )
+ : instanceIdent( ii ) , connectionId( id ) {
+ }
+
+ bool operator<(const ConnectionIdent& other) const {
+ if ( instanceIdent == other.instanceIdent )
+ return connectionId < other.connectionId;
+
+ return instanceIdent < other.instanceIdent;
+ }
+
+ string toString() const { return str::stream() << instanceIdent << ":" << connectionId; }
+
+ string instanceIdent;
+ ConnectionId connectionId;
+ };
+
static void init( DBClientBase& conn );
static void init( const string& host );
- static BSONObj waitFor( ConnectionId connectionId, const OID& oid );
+ static BSONObj waitFor( const ConnectionIdent& ident, const OID& oid );
protected:
WriteBackListener( const string& addr );
@@ -59,7 +79,7 @@ namespace mongo {
};
static mongo::mutex _seenWritebacksLock; // protects _seenWritbacks
- static map<ConnectionId,WBStatus> _seenWritebacks; // connectionId -> last write back GLE
+ static map<ConnectionIdent,WBStatus> _seenWritebacks; // connectionId -> last write back GLE
};
void waitForWriteback( const OID& oid );
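
Editor's note: the new WriteBackListener::ConnectionIdent keys the seen-writebacks map on (instanceIdent, connectionId) instead of the bare connection id, which works because the type supplies a strict weak ordering for std::map. A small standalone sketch of such a composite map key; the field names mirror the patch, the rest is illustrative.

    // Sketch: a two-field composite key with operator< so it can index a std::map.
    #include <cstdint>
    #include <iostream>
    #include <map>
    #include <sstream>
    #include <string>

    struct ConnectionIdent {
        std::string instanceIdent;   // which mongod instance issued the writeback
        std::int64_t connectionId;   // connection id local to that instance

        bool operator<(const ConnectionIdent& other) const {
            if (instanceIdent == other.instanceIdent)
                return connectionId < other.connectionId;
            return instanceIdent < other.instanceIdent;
        }
        std::string toString() const {
            std::ostringstream ss;
            ss << instanceIdent << ":" << connectionId;
            return ss.str();
        }
    };

    int main() {
        std::map<ConnectionIdent, std::string> seenWritebacks;
        ConnectionIdent a{ "hostA:27018", 42 };
        ConnectionIdent b{ "hostB:27018", 42 };   // same id, different instance: distinct key
        seenWritebacks[a] = "gle-1";
        seenWritebacks[b] = "gle-2";
        for (const auto& kv : seenWritebacks)
            std::cout << kv.first.toString() << " -> " << kv.second << "\n";
    }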