From 64b33ee522375a8dc15be2875dfb7db4502259b0 Mon Sep 17 00:00:00 2001 From: Antonin Kral Date: Sat, 18 Jun 2011 21:24:41 +0200 Subject: Imported Upstream version 1.8.2 --- s/balance.cpp | 17 ++++--- s/balancer_policy.cpp | 8 +++ s/chunk.cpp | 88 ++++++++++++++++----------------- s/chunk.h | 12 ++--- s/client.cpp | 8 ++- s/client.h | 5 +- s/commands_admin.cpp | 22 +++++++-- s/commands_public.cpp | 4 ++ s/config.cpp | 4 +- s/d_logic.cpp | 1 + s/d_migrate.cpp | 123 ++++++++++++++++++++++++++++++++++++----------- s/d_split.cpp | 8 +++ s/d_state.cpp | 4 +- s/grid.cpp | 6 +++ s/grid.h | 2 + s/shard.cpp | 96 +++++++++++++++++++++--------------- s/shard_version.cpp | 6 ++- s/strategy_shard.cpp | 6 ++- s/strategy_single.cpp | 2 +- s/writeback_listener.cpp | 17 ++++--- s/writeback_listener.h | 24 ++++++++- 21 files changed, 312 insertions(+), 151 deletions(-) (limited to 's') diff --git a/s/balance.cpp b/s/balance.cpp index ee0c992..8b01ea7 100644 --- a/s/balance.cpp +++ b/s/balance.cpp @@ -276,20 +276,21 @@ namespace mongo { try { - // first make sure we should even be running + ScopedDbConnection conn( config ); + + // ping has to be first so we keep things in the config server in sync + _ping( conn.conn() ); + + // now make sure we should even be running if ( ! grid.shouldBalance() ) { log(1) << "skipping balancing round because balancing is disabled" << endl; + conn.done(); + sleepsecs( 30 ); continue; } - - ScopedDbConnection conn( config ); - - _ping( conn.conn() ); - if ( ! _checkOIDs() ) { - uassert( 13258 , "oids broken after resetting!" , _checkOIDs() ); - } + uassert( 13258 , "oids broken after resetting!" , _checkOIDs() ); // use fresh shard state Shard::reloadShardInfo(); diff --git a/s/balancer_policy.cpp b/s/balancer_policy.cpp index 2098a1f..482fab0 100644 --- a/s/balancer_policy.cpp +++ b/s/balancer_policy.cpp @@ -40,6 +40,8 @@ namespace mongo { pair max("",0); vector drainingShards; + bool maxOpsQueued = false; + for (ShardToChunksIter i = shardToChunksMap.begin(); i!=shardToChunksMap.end(); ++i ) { // Find whether this shard's capacity or availability are exhausted @@ -67,6 +69,7 @@ namespace mongo { // Draining shards take a lower priority than overloaded shards. 
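The balancer_policy.cpp hunks here and in the next block pick the most-loaded shard as the donor and the least-loaded shard as the receiver, keep draining shards at a lower priority, and (as the following hunk adds) refuse to use a donor that still has unprocessed writebacks. Below is a minimal standalone sketch of that selection idea only; the ShardInfo struct, shard names, and imbalance threshold are made up for illustration and are not the BalancerPolicy types.

    // balance_sketch.cpp -- illustrative only, not the mongos BalancerPolicy code.
    #include <iostream>
    #include <map>
    #include <string>

    struct ShardInfo {
        int  chunks;      // number of chunks this shard currently holds
        bool draining;    // shard is being removed and should shed chunks
        bool opsQueued;   // shard still has unprocessed writebacks
    };

    // Pick (donor, receiver); returns false if no migration should happen.
    bool pickDonorAndReceiver(const std::map<std::string, ShardInfo>& shards,
                              std::string& donor, std::string& receiver) {
        int maxChunks = -1, minChunks = 1 << 30;
        bool donorOpsQueued = false;
        for (const auto& kv : shards) {
            const ShardInfo& s = kv.second;
            if (!s.draining && s.chunks < minChunks) { minChunks = s.chunks; receiver = kv.first; }
            if (s.chunks > maxChunks) { maxChunks = s.chunks; donor = kv.first; donorOpsQueued = s.opsQueued; }
        }
        if (maxChunks < 0 || minChunks == 1 << 30) return false;  // nothing usable
        if (donorOpsQueued) return false;                         // wait for writebacks to drain first
        return maxChunks - minChunks >= 8;                        // imbalance threshold (illustrative)
    }

    int main() {
        std::map<std::string, ShardInfo> shards = {
            {"shard0000", {20, false, false}},
            {"shard0001", {3,  false, false}},
        };
        std::string donor, receiver;
        if (pickDonorAndReceiver(shards, donor, receiver))
            std::cout << "move a chunk from " << donor << " to " << receiver << "\n";
    }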
if ( size > max.second ) { max = make_pair( shard , size ); + maxOpsQueued = opsQueued; } if ( draining && (size > 0)) { drainingShards.push_back( shard ); @@ -80,6 +83,11 @@ namespace mongo { return NULL; } + if ( maxOpsQueued ) { + log() << "biggest shard has unprocessed writebacks, waiting for completion of migrate" << endl; + return NULL; + } + log(1) << "collection : " << ns << endl; log(1) << "donor : " << max.second << " chunks on " << max.first << endl; log(1) << "receiver : " << min.second << " chunks on " << min.first << endl; diff --git a/s/chunk.cpp b/s/chunk.cpp index b2ad03d..1e473e2 100644 --- a/s/chunk.cpp +++ b/s/chunk.cpp @@ -175,7 +175,7 @@ namespace mongo { conn.done(); } - ChunkPtr Chunk::singleSplit( bool force , BSONObj& res ) { + bool Chunk::singleSplit( bool force , BSONObj& res , ChunkPtr* low, ChunkPtr* high) { vector splitPoint; // if splitting is not obligatory we may return early if there are not enough data @@ -190,7 +190,7 @@ namespace mongo { // 1 split point means we have between half the chunk size to full chunk size // so we shouldn't split log(1) << "chunk not full enough to trigger auto-split" << endl; - return ChunkPtr(); + return false; } splitPoint.push_back( candidates.front() ); @@ -228,13 +228,24 @@ namespace mongo { if ( splitPoint.empty() || _min == splitPoint.front() || _max == splitPoint.front() ) { log() << "want to split chunk, but can't find split point chunk " << toString() << " got: " << ( splitPoint.empty() ? "" : splitPoint.front().toString() ) << endl; - return ChunkPtr(); + return false; } - return multiSplit( splitPoint , res ); + if (!multiSplit( splitPoint , res , true )) + return false; + + if (low && high) { + low->reset( new Chunk(_manager, _min, splitPoint[0], _shard)); + high->reset(new Chunk(_manager, splitPoint[0], _max, _shard)); + } + else { + assert(!low && !high); // can't have one without the other + } + + return true; } - ChunkPtr Chunk::multiSplit( const vector& m , BSONObj& res ) { + bool Chunk::multiSplit( const vector& m , BSONObj& res , bool resetIfSplit) { const size_t maxSplitPoints = 8192; uassert( 10165 , "can't split as shard doesn't have a manager" , _manager ); @@ -261,27 +272,19 @@ namespace mongo { // reloading won't stricly solve all problems, e.g. the collection's metdata lock can be taken // but we issue here so that mongos may refresh wihtout needing to be written/read against - _manager->_reload(); + grid.getDBConfig(_manager->getns())->getChunkManager(_manager->getns(), true); - return ChunkPtr(); + return false; } conn.done(); - _manager->_reload(); - // The previous multisplit logic adjusted the boundaries of 'this' chunk. Any call to 'this' object hereafter - // will see a different _max for the chunk. - // TODO Untie this dependency since, for metadata purposes, the reload() above already fixed boundaries - { - rwlock lk( _manager->_lock , true ); - - setMax(m[0].getOwned()); - DEV assert( shared_from_this() ); - _manager->_chunkMap[_max] = shared_from_this(); - } + if ( resetIfSplit ) { + // force reload of chunks + grid.getDBConfig(_manager->getns())->getChunkManager(_manager->getns(), true); + } - // return the second half, if a single split, or the first new chunk, if a multisplit. 
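The singleSplit/multiSplit changes above switch from returning a ChunkPtr to returning a success flag, with the two resulting halves handed back through optional low/high out-pointers. The following is a small standalone sketch of that calling convention only; the Range type and splitRange function are hypothetical stand-ins, not the Chunk API.

    // split_convention_sketch.cpp -- shows the bool + optional out-pointer style, not mongos code.
    #include <cassert>
    #include <iostream>
    #include <memory>

    struct Range {
        int min, max;
        Range(int lo, int hi) : min(lo), max(hi) {}
    };
    typedef std::shared_ptr<Range> RangePtr;

    // Returns true on success; if both 'low' and 'high' are supplied, they receive the halves.
    bool splitRange(const Range& r, int splitPoint, RangePtr* low = NULL, RangePtr* high = NULL) {
        if (splitPoint <= r.min || splitPoint >= r.max)
            return false;                          // no usable split point
        if (low && high) {
            low->reset(new Range(r.min, splitPoint));
            high->reset(new Range(splitPoint, r.max));
        } else {
            assert(!low && !high);                 // can't have one without the other
        }
        return true;
    }

    int main() {
        Range chunk(0, 100);
        RangePtr low, high;
        if (splitRange(chunk, 50, &low, &high))
            std::cout << "low [" << low->min << "," << low->max << ")"
                      << " high [" << high->min << "," << high->max << ")\n";
    }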
- return _manager->findChunk( m[0] ); + return true; } bool Chunk::moveAndCommit( const Shard& to , long long chunkSize /* bytes */, BSONObj& res ) { @@ -311,7 +314,7 @@ namespace mongo { // if succeeded, needs to reload to pick up the new location // if failed, mongos may be stale // reload is excessive here as the failure could be simply because collection metadata is taken - _manager->_reload(); + grid.getDBConfig(_manager->getns())->getChunkManager(_manager->getns(), true); return worked; } @@ -334,21 +337,23 @@ namespace mongo { _dataWritten = 0; // reset so we check often enough BSONObj res; - ChunkPtr newShard = singleSplit( false /* does not force a split if not enough data */ , res ); - if ( newShard.get() == NULL ) { + ChunkPtr low; + ChunkPtr high; + bool worked = singleSplit( false /* does not force a split if not enough data */ , res , &low, &high); + if ( !worked ) { // singleSplit would have issued a message if we got here _dataWritten = 0; // this means there wasn't enough data to split, so don't want to try again until considerable more data return false; } log() << "autosplitted " << _manager->getns() << " shard: " << toString() - << " on: " << newShard->getMax() << "(splitThreshold " << splitThreshold << ")" + << " on: " << low->getMax() << "(splitThreshold " << splitThreshold << ")" #ifdef _DEBUG << " size: " << getPhysicalSize() // slow - but can be usefule when debugging #endif << endl; - moveIfShould( newShard ); + low->moveIfShould( high ); return true; @@ -671,7 +676,7 @@ namespace mongo { log() << "successfully created first chunk for " << c->toString() << endl; } - ChunkPtr ChunkManager::findChunk( const BSONObj & obj , bool retry ) { + ChunkPtr ChunkManager::findChunk( const BSONObj & obj) { BSONObj key = _key.extractKey(obj); { @@ -695,20 +700,13 @@ namespace mongo { PRINT(*c); PRINT(key); - _reload_inlock(); + grid.getDBConfig(getns())->getChunkManager(getns(), true); massert(13141, "Chunk map pointed to incorrect chunk", false); } } - if ( retry ) { - stringstream ss; - ss << "couldn't find a chunk aftry retry which should be impossible extracted: " << key; - throw UserException( 8070 , ss.str() ); - } - - log() << "ChunkManager: couldn't find chunk for: " << key << " going to retry" << endl; - _reload_inlock(); - return findChunk( obj , true ); + massert(8070, str::stream() << "couldn't find a chunk aftry retry which should be impossible extracted: " << key, false); + return ChunkPtr(); // unreachable } ChunkPtr ChunkManager::findChunkOnServer( const Shard& shard ) const { @@ -874,24 +872,26 @@ namespace mongo { configServer.logChange( "dropCollection" , _ns , BSONObj() ); } - void ChunkManager::maybeChunkCollection() { + bool ChunkManager::maybeChunkCollection() { + ensureIndex_inlock(); + uassert( 13346 , "can't pre-split already splitted collection" , (_chunkMap.size() == 1) ); - + ChunkPtr soleChunk = _chunkMap.begin()->second; vector splitPoints; soleChunk->pickSplitVector( splitPoints , Chunk::MaxChunkSize ); if ( splitPoints.empty() ) { log(1) << "not enough data to warrant chunking " << getns() << endl; - return; + return false; } - + BSONObj res; - ChunkPtr p; - p = soleChunk->multiSplit( splitPoints , res ); - if ( p.get() == NULL ) { + bool worked = soleChunk->multiSplit( splitPoints , res , false ); + if (!worked) { log( LL_WARNING ) << "could not split '" << getns() << "': " << res << endl; - return; + return false; } + return true; } ShardChunkVersion ChunkManager::getVersion( const Shard& shard ) const { diff --git a/s/chunk.h b/s/chunk.h 
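The splitIfShould path above accumulates bytes written to a chunk, resets the counter once a threshold is crossed, attempts a non-forced singleSplit, and on success lets the lower half decide whether to migrate. Below is a rough standalone sketch of that accumulate-and-check flow; the class, threshold, and sizes are illustrative and do not reproduce the real Chunk logic.

    // autosplit_sketch.cpp -- rough illustration of a write-triggered split check, not the Chunk class.
    #include <iostream>

    class ChunkLike {
    public:
        explicit ChunkLike(long long maxSize) : _maxSize(maxSize), _dataWritten(0) {}

        // Called after every write that lands on this chunk.
        bool splitIfShould(long long dataWritten) {
            _dataWritten += dataWritten;
            long long splitThreshold = _maxSize / 5;   // check well before the chunk is full
            if (_dataWritten < splitThreshold)
                return false;

            _dataWritten = 0;                          // reset so we check often enough
            if (!trySplit()) {
                // not enough data to split; don't retry until considerably more is written
                return false;
            }
            std::cout << "autosplit at threshold " << splitThreshold << "\n";
            return true;
        }

    private:
        bool trySplit() { return true; }               // stand-in for singleSplit(false, ...)
        long long _maxSize;
        long long _dataWritten;
    };

    int main() {
        ChunkLike c(64 * 1024 * 1024);                 // 64MB max chunk size (illustrative)
        for (int i = 0; i < 100; i++)
            c.splitIfShould(1024 * 1024);              // pretend each batch writes ~1MB
    }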
index b4bcc35..21e1fbf 100644 --- a/s/chunk.h +++ b/s/chunk.h @@ -108,18 +108,18 @@ namespace mongo { * @param force if set to true, will split the chunk regardless if the split is really necessary size wise * if set to false, will only split if the chunk has reached the currently desired maximum size * @param res the object containing details about the split execution - * @return if found a key, return a pointer to the first chunk, otherwise return a null pointer + * @return if found a key and split successfully */ - ChunkPtr singleSplit( bool force , BSONObj& res ); + bool singleSplit( bool force , BSONObj& res , ChunkPtr* low=NULL, ChunkPtr* high=NULL); /** * Splits this chunk at the given key (or keys) * * @param splitPoints the vector of keys that should be used to divide this chunk * @param res the object containing details about the split execution - * @return shared pointer to the first new Chunk or null pointer if failed + * @return if split was successful */ - ChunkPtr multiSplit( const vector& splitPoints , BSONObj& res ); + bool multiSplit( const vector& splitPoints , BSONObj& res , bool resetIfSplit ); /** * Asks the mongod holding this chunk to find a key that approximately divides this chunk in two @@ -308,13 +308,13 @@ namespace mongo { bool hasShardKey( const BSONObj& obj ); void createFirstChunk( const Shard& shard ); - ChunkPtr findChunk( const BSONObj& obj , bool retry = false ); + ChunkPtr findChunk( const BSONObj& obj ); ChunkPtr findChunkOnServer( const Shard& shard ) const; const ShardKeyPattern& getShardKey() const { return _key; } bool isUnique() const { return _unique; } - void maybeChunkCollection(); + bool maybeChunkCollection(); void getShardsForQuery( set& shards , const BSONObj& query ); void getAllShards( set& all ); diff --git a/s/client.cpp b/s/client.cpp index 95e3124..c0d25fb 100644 --- a/s/client.cpp +++ b/s/client.cpp @@ -100,7 +100,11 @@ namespace mongo { return; } - all.push_back( WBInfo( cid.numberLong() , w.OID() ) ); + string ident = ""; + if ( gle["instanceIdent"].type() == String ) + ident = gle["instanceIdent"].String(); + + all.push_back( WBInfo( WriteBackListener::ConnectionIdent( ident , cid.numberLong() ) , w.OID() ) ); } vector ClientInfo::_handleWriteBacks( vector& all , bool fromWriteBackListener ) { @@ -115,7 +119,7 @@ namespace mongo { } for ( unsigned i=0; isingleSplit( true /* force a split even if not enough data */ , res ); + worked = chunk->singleSplit( true /* force a split even if not enough data */ , res ); } else { @@ -526,10 +540,10 @@ namespace mongo { vector splitPoints; splitPoints.push_back( middle ); - p = chunk->multiSplit( splitPoints , res ); + worked = chunk->multiSplit( splitPoints , res , true ); } - if ( p.get() == NULL ) { + if ( !worked ) { errmsg = "split failed"; result.append( "cause" , res ); return false; diff --git a/s/commands_public.cpp b/s/commands_public.cpp index 5b1ecaf..f29205b 100644 --- a/s/commands_public.cpp +++ b/s/commands_public.cpp @@ -545,6 +545,10 @@ namespace mongo { bool ok = conn->runCommand( conf->getName() , cmdObj , res ); conn.done(); + if (!ok && res.getIntField("code") == 9996) { // code for StaleConfigException + throw StaleConfigException(fullns, "FindAndModify"); // Command code traps this and re-runs + } + result.appendElements(res); return ok; } diff --git a/s/config.cpp b/s/config.cpp index 35a3be2..9ed3207 100644 --- a/s/config.cpp +++ b/s/config.cpp @@ -143,7 +143,9 @@ namespace mongo { _save(); try { - cm->maybeChunkCollection(); + if ( cm->maybeChunkCollection() ) { + 
_load(); + } } catch ( UserException& e ) { // failure to chunk is not critical enough to abort the command (and undo the _save()'d configDB state) diff --git a/s/d_logic.cpp b/s/d_logic.cpp index c032883..1ab7c64 100644 --- a/s/d_logic.cpp +++ b/s/d_logic.cpp @@ -103,6 +103,7 @@ namespace mongo { b.append( "ns" , ns ); b.append( "id" , writebackID ); b.append( "connectionId" , cc().getConnectionId() ); + b.append( "instanceIdent" , prettyHostName() ); b.appendTimestamp( "version" , shardingState.getVersion( ns ) ); b.appendTimestamp( "yourVersion" , ShardedConnectionInfo::get( true )->getVersion( ns ) ); b.appendBinData( "msg" , m.header()->len , bdtCustom , (char*)(m.singleData()) ); diff --git a/s/d_migrate.cpp b/s/d_migrate.cpp index 2878276..df12e54 100644 --- a/s/d_migrate.cpp +++ b/s/d_migrate.cpp @@ -273,9 +273,12 @@ namespace mongo { void done() { readlock lk( _ns ); - _deleted.clear(); - _reload.clear(); - _cloneLocs.clear(); + { + scoped_spinlock lk( _trackerLocks ); + _deleted.clear(); + _reload.clear(); + _cloneLocs.clear(); + } _memoryUsed = 0; scoped_lock l(_m); @@ -454,6 +457,7 @@ namespace mongo { while ( cc->ok() ) { DiskLoc dl = cc->currLoc(); if ( ! isLargeChunk ) { + scoped_spinlock lk( _trackerLocks ); _cloneLocs.insert( dl ); } cc->advance(); @@ -480,7 +484,10 @@ namespace mongo { return false; } - log() << "moveChunk number of documents: " << _cloneLocs.size() << endl; + { + scoped_spinlock lk( _trackerLocks ); + log() << "moveChunk number of documents: " << _cloneLocs.size() << endl; + } return true; } @@ -490,29 +497,50 @@ namespace mongo { return false; } - readlock l( _ns ); - Client::Context ctx( _ns ); + ElapsedTracker tracker (128, 10); // same as ClientCursor::_yieldSometimesTracker - NamespaceDetails *d = nsdetails( _ns.c_str() ); - assert( d ); + int allocSize; + { + readlock l(_ns); + Client::Context ctx( _ns ); + NamespaceDetails *d = nsdetails( _ns.c_str() ); + assert( d ); + scoped_spinlock lk( _trackerLocks ); + allocSize = std::min(BSONObjMaxUserSize, (int)((12 + d->averageObjectSize()) * _cloneLocs.size())); + } + BSONArrayBuilder a (allocSize); + + while ( 1 ) { + bool filledBuffer = false; + + readlock l( _ns ); + Client::Context ctx( _ns ); + scoped_spinlock lk( _trackerLocks ); + set::iterator i = _cloneLocs.begin(); + for ( ; i!=_cloneLocs.end(); ++i ) { + if (tracker.ping()) // should I yield? + break; + + DiskLoc dl = *i; + BSONObj o = dl.obj(); + + // use the builder size instead of accumulating 'o's size so that we take into consideration + // the overhead of BSONArray indices + if ( a.len() + o.objsize() + 1024 > BSONObjMaxUserSize ) { + filledBuffer = true; // break out of outer while loop + break; + } - BSONArrayBuilder a( std::min( BSONObjMaxUserSize , (int)( ( 12 + d->averageObjectSize() )* _cloneLocs.size() ) ) ); + a.append( o ); + } - set::iterator i = _cloneLocs.begin(); - for ( ; i!=_cloneLocs.end(); ++i ) { - DiskLoc dl = *i; - BSONObj o = dl.obj(); + _cloneLocs.erase( _cloneLocs.begin() , i ); - // use the builder size instead of accumulating 'o's size so that we take into consideration - // the overhead of BSONArray indices - if ( a.len() + o.objsize() + 1024 > BSONObjMaxUserSize ) { + if ( _cloneLocs.empty() || filledBuffer ) break; - } - a.append( o ); } result.appendArray( "objects" , a.arr() ); - _cloneLocs.erase( _cloneLocs.begin() , i ); return true; } @@ -525,6 +553,11 @@ namespace mongo { if ( ! 
db->ownsNS( _ns ) ) return; + + // not needed right now + // but trying to prevent a future bug + scoped_spinlock lk( _trackerLocks ); + _cloneLocs.erase( dl ); } @@ -544,9 +577,13 @@ namespace mongo { BSONObj _min; BSONObj _max; + // we need the lock in case there is a malicious _migrateClone for example + // even though it shouldn't be needed under normal operation + SpinLock _trackerLocks; + // disk locs yet to be transferred from here to the other side - // no locking needed because build by 1 thread in a read lock - // depleted by 1 thread in a read lock + // no locking needed because built initially by 1 thread in a read lock + // emptied by 1 thread in a read lock // updates applied by 1 thread in a write lock set _cloneLocs; @@ -1141,6 +1178,8 @@ namespace mongo { assert( state == READY ); assert( ! min.isEmpty() ); assert( ! max.isEmpty() ); + + slaveCount = ( getSlaveCount() / 2 ) + 1; MoveTimingHelper timing( "to" , ns , min , max , 5 /* steps */ ); @@ -1236,11 +1275,32 @@ namespace mongo { break; apply( res , &lastOpApplied ); + + const int maxIterations = 3600*50; + int i; + for ( i=0;i 100 ) { + warning() << "secondaries having hard time keeping up with migrate" << endl; + } - if ( state == ABORT ) { - timing.note( "aborted" ); - return; + sleepmillis( 20 ); } + + if ( i == maxIterations ) { + errmsg = "secondary can't keep up with migrate"; + error() << errmsg << endl; + conn.done(); + state = FAIL; + return; + } } timing.done(4); @@ -1364,14 +1424,17 @@ namespace mongo { return didAnything; } - bool flushPendingWrites( const ReplTime& lastOpApplied ) { + bool opReplicatedEnough( const ReplTime& lastOpApplied ) { // if replication is on, try to force enough secondaries to catch up // TODO opReplicatedEnough should eventually honor priorities and geo-awareness // for now, we try to replicate to a sensible number of secondaries - const int slaveCount = getSlaveCount() / 2 + 1; - if ( ! opReplicatedEnough( lastOpApplied , slaveCount ) ) { - log( LL_WARNING ) << "migrate commit attempt timed out contacting " << slaveCount - << " slaves for '" << ns << "' " << min << " -> " << max << endl; + return mongo::opReplicatedEnough( lastOpApplied , slaveCount ); + } + + bool flushPendingWrites( const ReplTime& lastOpApplied ) { + if ( ! opReplicatedEnough( lastOpApplied ) ) { + warning() << "migrate commit attempt timed out contacting " << slaveCount + << " slaves for '" << ns << "' " << min << " -> " << max << endl; return false; } log() << "migrate commit succeeded flushing to secondaries for '" << ns << "' " << min << " -> " << max << endl; @@ -1438,6 +1501,8 @@ namespace mongo { long long clonedBytes; long long numCatchup; long long numSteady; + + int slaveCount; enum State { READY , CLONE , CATCHUP , STEADY , COMMIT_START , DONE , FAIL , ABORT } state; string errmsg; diff --git a/s/d_split.cpp b/s/d_split.cpp index 66fe38e..3ed6e9b 100644 --- a/s/d_split.cpp +++ b/s/d_split.cpp @@ -138,6 +138,11 @@ namespace mongo { const char* ns = jsobj.getStringField( "checkShardingIndex" ); BSONObj keyPattern = jsobj.getObjectField( "keyPattern" ); + if ( keyPattern.nFields() == 1 && str::equals( "_id" , keyPattern.firstElement().fieldName() ) ) { + result.appendBool( "idskip" , true ); + return true; + } + // If min and max are not provided use the "minKey" and "maxKey" for the sharding key pattern. BSONObj min = jsobj.getObjectField( "min" ); BSONObj max = jsobj.getObjectField( "max" ); @@ -211,6 +216,9 @@ namespace mongo { return false; } cc->advance(); + + if ( ! 
cc->yieldSometimes() ) + break; } return true; diff --git a/s/d_state.cpp b/s/d_state.cpp index 11fbcef..e10400f 100644 --- a/s/d_state.cpp +++ b/s/d_state.cpp @@ -75,7 +75,7 @@ namespace mongo { << " before [" << _shardName << "] " << " got [" << name << "] " ; - uasserted( 13298 , ss.str() ); + msgasserted( 13298 , ss.str() ); } void ShardingState::gotShardHost( string host ) { @@ -97,7 +97,7 @@ namespace mongo { << " before [" << _shardHost << "] " << " got [" << host << "] " ; - uasserted( 13299 , ss.str() ); + msgasserted( 13299 , ss.str() ); } void ShardingState::resetShardingState() { diff --git a/s/grid.cpp b/s/grid.cpp index 0045754..0646507 100644 --- a/s/grid.cpp +++ b/s/grid.cpp @@ -440,8 +440,14 @@ namespace mongo { return ( dbName == "local" ) || ( dbName == "admin" ) || ( dbName == "config" ); } + void Grid::flushConfig() { + scoped_lock lk( _lock ); + _databases.clear(); + } + Grid grid; + // unit tests class BalancingWindowUnitTest : public UnitTest { diff --git a/s/grid.h b/s/grid.h index 5692a82..e5af33f 100644 --- a/s/grid.h +++ b/s/grid.h @@ -83,6 +83,8 @@ namespace mongo { bool shouldBalance() const; unsigned long long getNextOpTime() const; + + void flushConfig(); // exposed methods below are for testing only diff --git a/s/shard.cpp b/s/shard.cpp index dbfd8f9..c1e3b56 100644 --- a/s/shard.cpp +++ b/s/shard.cpp @@ -25,6 +25,8 @@ namespace mongo { + typedef shared_ptr ShardPtr; + class StaticShardInfo { public: StaticShardInfo() : _mutex("StaticShardInfo") { } @@ -48,9 +50,9 @@ namespace mongo { // the config state intact. The rationale is that this way we could drop shards that // were removed without reinitializing the config DB information. - map::iterator i = _lookup.find( "config" ); + ShardMap::iterator i = _lookup.find( "config" ); if ( i != _lookup.end() ) { - Shard config = i->second; + ShardPtr config = i->second; _lookup.clear(); _lookup[ "config" ] = config; } @@ -75,14 +77,14 @@ namespace mongo { isDraining = isDrainingElem.Bool(); } - Shard s( name , host , maxSize , isDraining ); + ShardPtr s( new Shard( name , host , maxSize , isDraining ) ); _lookup[name] = s; _installHost( host , s ); } } - const Shard& find( const string& ident ) { + ShardPtr find( const string& ident ) { string mykey = ident; { @@ -94,7 +96,7 @@ namespace mongo { { scoped_lock lk( _mutex ); - map::iterator i = _lookup.find( mykey ); + ShardMap::iterator i = _lookup.find( mykey ); if ( i != _lookup.end() ) return i->second; @@ -104,23 +106,24 @@ namespace mongo { reload(); scoped_lock lk( _mutex ); - map::iterator i = _lookup.find( mykey ); + ShardMap::iterator i = _lookup.find( mykey ); massert( 13129 , (string)"can't find shard for: " + mykey , i != _lookup.end() ); return i->second; } void set( const string& name , const Shard& s , bool setName = true , bool setAddr = true ) { scoped_lock lk( _mutex ); + ShardPtr ss( new Shard( s ) ); if ( setName ) - _lookup[name] = s; + _lookup[name] = ss; if ( setAddr ) - _installHost( s.getConnString() , s ); + _installHost( s.getConnString() , ss ); } - void _installHost( const string& host , const Shard& s ) { + void _installHost( const string& host , const ShardPtr& s ) { _lookup[host] = s; - const ConnectionString& cs = s.getAddress(); + const ConnectionString& cs = s->getAddress(); if ( cs.type() == ConnectionString::SET ) { if ( cs.getSetName().size() ) _lookup[ cs.getSetName() ] = s; @@ -134,9 +137,9 @@ namespace mongo { void remove( const string& name ) { scoped_lock lk( _mutex ); - for ( map::iterator i = _lookup.begin(); 
i!=_lookup.end(); ) { - Shard s = i->second; - if ( s.getName() == name ) { + for ( ShardMap::iterator i = _lookup.begin(); i!=_lookup.end(); ) { + ShardPtr s = i->second; + if ( s->getName() == name ) { _lookup.erase(i++); } else { @@ -145,35 +148,49 @@ namespace mongo { } } - void getAllShards( vector& all ) const { + void getAllShards( vector& all ) const { scoped_lock lk( _mutex ); std::set seen; - for ( map::const_iterator i = _lookup.begin(); i!=_lookup.end(); ++i ) { - const Shard& s = i->second; - if ( s.getName() == "config" ) + for ( ShardMap::const_iterator i = _lookup.begin(); i!=_lookup.end(); ++i ) { + const ShardPtr& s = i->second; + if ( s->getName() == "config" ) continue; - if ( seen.count( s.getName() ) ) + if ( seen.count( s->getName() ) ) continue; - seen.insert( s.getName() ); + seen.insert( s->getName() ); all.push_back( s ); } } + + void getAllShards( vector& all ) const { + scoped_lock lk( _mutex ); + std::set seen; + for ( ShardMap::const_iterator i = _lookup.begin(); i!=_lookup.end(); ++i ) { + const ShardPtr& s = i->second; + if ( s->getName() == "config" ) + continue; + if ( seen.count( s->getName() ) ) + continue; + seen.insert( s->getName() ); + all.push_back( *s ); + } + } + bool isAShardNode( const string& addr ) const { scoped_lock lk( _mutex ); // check direct nods or set names - map::const_iterator i = _lookup.find( addr ); + ShardMap::const_iterator i = _lookup.find( addr ); if ( i != _lookup.end() ) return true; // check for set nodes - for ( map::const_iterator i = _lookup.begin(); i!=_lookup.end(); ++i ) { + for ( ShardMap::const_iterator i = _lookup.begin(); i!=_lookup.end(); ++i ) { if ( i->first == "config" ) continue; - const Shard& s = i->second; - if ( s.containsNode( addr ) ) + if ( i->second->containsNode( addr ) ) return true; } @@ -185,8 +202,8 @@ namespace mongo { BSONObjBuilder b( _lookup.size() + 50 ); - for ( map::const_iterator i = _lookup.begin(); i!=_lookup.end(); ++i ) { - b.append( i->first , i->second.getConnString() ); + for ( ShardMap::const_iterator i = _lookup.begin(); i!=_lookup.end(); ++i ) { + b.append( i->first , i->second->getConnString() ); } result.append( "map" , b.obj() ); @@ -195,7 +212,8 @@ namespace mongo { } private: - map _lookup; + typedef map ShardMap; + ShardMap _lookup; mutable mongo::mutex _mutex; } staticShardInfo; @@ -242,14 +260,14 @@ namespace mongo { } void Shard::reset( const string& ident ) { - const Shard& s = staticShardInfo.find( ident ); - massert( 13128 , (string)"can't find shard for: " + ident , s.ok() ); - _name = s._name; - _addr = s._addr; - _cs = s._cs; + ShardPtr s = staticShardInfo.find( ident ); + massert( 13128 , (string)"can't find shard for: " + ident , s->ok() ); + _name = s->_name; + _addr = s->_addr; + _cs = s->_cs; _rsInit(); - _maxSize = s._maxSize; - _isDraining = s._isDraining; + _maxSize = s->_maxSize; + _isDraining = s->_isDraining; } bool Shard::containsNode( const string& node ) const { @@ -271,10 +289,10 @@ namespace mongo { } void Shard::printShardInfo( ostream& out ) { - vector all; - getAllShards( all ); + vector all; + staticShardInfo.getAllShards( all ); for ( unsigned i=0; itoString() << "\n"; out.flush(); } @@ -306,7 +324,7 @@ namespace mongo { } Shard Shard::pick( const Shard& current ) { - vector all; + vector all; staticShardInfo.getAllShards( all ); if ( all.size() == 0 ) { staticShardInfo.reload(); @@ -316,13 +334,13 @@ namespace mongo { } // if current shard was provided, pick a different shard only if it is a better choice - ShardStatus best = 
all[0].getStatus(); + ShardStatus best = all[0]->getStatus(); if ( current != EMPTY ) { best = current.getStatus(); } for ( size_t i=0; igetStatus(); if ( t < best ) best = t; } diff --git a/s/shard_version.cpp b/s/shard_version.cpp index 043b9bd..a189a08 100644 --- a/s/shard_version.cpp +++ b/s/shard_version.cpp @@ -97,7 +97,9 @@ namespace mongo { const bool isSharded = conf->isSharded( ns ); if ( isSharded ) { manager = conf->getChunkManager( ns , authoritative ); - officialSequenceNumber = manager->getSequenceNumber(); + // It's possible the chunk manager was reset since we checked whether sharded was true, + // so must check this here. + if( manager ) officialSequenceNumber = manager->getSequenceNumber(); } // has the ChunkManager been reloaded since the last time we updated the connection-level version? @@ -109,7 +111,7 @@ namespace mongo { ShardChunkVersion version = 0; - if ( isSharded ) { + if ( isSharded && manager ) { version = manager->getVersion( Shard::make( conn.getServerAddress() ) ); } diff --git a/s/strategy_shard.cpp b/s/strategy_shard.cpp index 2eca0c6..26ea79a 100644 --- a/s/strategy_shard.cpp +++ b/s/strategy_shard.cpp @@ -151,8 +151,10 @@ namespace mongo { // Many operations benefit from having the shard key early in the object o = manager->getShardKey().moveToFront(o); + const int maxTries = 10; + bool gotThrough = false; - for ( int i=0; i<10; i++ ) { + for ( int i=0; ifindChunk( o ); log(4) << " server:" << c->getShard().toString() << " " << o << endl; @@ -165,7 +167,7 @@ namespace mongo { break; } catch ( StaleConfigException& ) { - log(1) << "retrying insert because of StaleConfigException: " << o << endl; + log( i < ( maxTries / 2 ) ) << "retrying insert because of StaleConfigException: " << o << endl; r.reset(); manager = r.getChunkManager(); } diff --git a/s/strategy_single.cpp b/s/strategy_single.cpp index b3b5502..3fd357a 100644 --- a/s/strategy_single.cpp +++ b/s/strategy_single.cpp @@ -88,7 +88,7 @@ namespace mongo { long long id = r.d().getInt64( 4 ); - ShardConnection conn( cursorCache.getRef( id ) , ns ); + ScopedDbConnection conn( cursorCache.getRef( id ) ); Message response; bool ok = conn->callRead( r.m() , response); diff --git a/s/writeback_listener.cpp b/s/writeback_listener.cpp index 3051013..769163e 100644 --- a/s/writeback_listener.cpp +++ b/s/writeback_listener.cpp @@ -36,7 +36,7 @@ namespace mongo { set WriteBackListener::_seenSets; mongo::mutex WriteBackListener::_cacheLock("WriteBackListener"); - map WriteBackListener::_seenWritebacks; + map WriteBackListener::_seenWritebacks; mongo::mutex WriteBackListener::_seenWritebacksLock("WriteBackListener::seen"); WriteBackListener::WriteBackListener( const string& addr ) : _addr( addr ) { @@ -86,18 +86,18 @@ namespace mongo { } /* static */ - BSONObj WriteBackListener::waitFor( ConnectionId connectionId, const OID& oid ) { + BSONObj WriteBackListener::waitFor( const ConnectionIdent& ident, const OID& oid ) { Timer t; for ( int i=0; i<5000; i++ ) { { scoped_lock lk( _seenWritebacksLock ); - WBStatus s = _seenWritebacks[connectionId]; + WBStatus s = _seenWritebacks[ident]; if ( oid < s.id ) { // this means we're waiting for a GLE that already passed. 
// it should be impossible becauseonce we call GLE, no other // writebacks should happen with that connection id msgasserted( 13633 , str::stream() << "got writeback waitfor for older id " << - " oid: " << oid << " s.id: " << s.id << " connectionId: " << connectionId ); + " oid: " << oid << " s.id: " << s.id << " connection: " << ident.toString() ); } else if ( oid == s.id ) { return s.gle; @@ -142,10 +142,13 @@ namespace mongo { if ( data.getBoolField( "writeBack" ) ) { string ns = data["ns"].valuestrsafe(); - ConnectionId cid = 0; + ConnectionIdent cid( "" , 0 ); OID wid; if ( data["connectionId"].isNumber() && data["id"].type() == jstOID ) { - cid = data["connectionId"].numberLong(); + string s = ""; + if ( data["instanceIdent"].type() == String ) + s = data["instanceIdent"].String(); + cid = ConnectionIdent( s , data["connectionId"].numberLong() ); wid = data["id"].OID(); } else { @@ -226,7 +229,7 @@ namespace mongo { secsToSleep = 0; continue; } - catch ( std::exception e ) { + catch ( std::exception& e ) { if ( inShutdown() ) { // we're shutting down, so just clean up diff --git a/s/writeback_listener.h b/s/writeback_listener.h index 7335999..0125073 100644 --- a/s/writeback_listener.h +++ b/s/writeback_listener.h @@ -35,10 +35,30 @@ namespace mongo { */ class WriteBackListener : public BackgroundJob { public: + + class ConnectionIdent { + public: + ConnectionIdent( const string& ii , ConnectionId id ) + : instanceIdent( ii ) , connectionId( id ) { + } + + bool operator<(const ConnectionIdent& other) const { + if ( instanceIdent == other.instanceIdent ) + return connectionId < other.connectionId; + + return instanceIdent < other.instanceIdent; + } + + string toString() const { return str::stream() << instanceIdent << ":" << connectionId; } + + string instanceIdent; + ConnectionId connectionId; + }; + static void init( DBClientBase& conn ); static void init( const string& host ); - static BSONObj waitFor( ConnectionId connectionId, const OID& oid ); + static BSONObj waitFor( const ConnectionIdent& ident, const OID& oid ); protected: WriteBackListener( const string& addr ); @@ -59,7 +79,7 @@ namespace mongo { }; static mongo::mutex _seenWritebacksLock; // protects _seenWritbacks - static map _seenWritebacks; // connectionId -> last write back GLE + static map _seenWritebacks; // connectionId -> last write back GLE }; void waitForWriteback( const OID& oid ); -- cgit v1.2.3
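The new ConnectionIdent above keys the seen-writebacks map by (instanceIdent, connectionId) rather than a bare ConnectionId, so GLE results coming from different mongod instances can no longer collide, and waitFor polls that map for the expected writeback id. The following is a minimal standalone sketch of that composite-key-plus-polling pattern in standard C++ only; WaitResult, the sleep interval, and the retry count are illustrative, not the WriteBackListener API.

    // connection_ident_sketch.cpp -- composite map key + bounded polling, not the WriteBackListener code.
    #include <chrono>
    #include <iostream>
    #include <map>
    #include <string>
    #include <thread>

    struct ConnectionIdent {
        std::string instanceIdent;   // which mongod instance issued the writeback
        long long   connectionId;    // connection id on that instance

        bool operator<(const ConnectionIdent& other) const {
            if (instanceIdent == other.instanceIdent)
                return connectionId < other.connectionId;
            return instanceIdent < other.instanceIdent;
        }
    };

    struct WaitResult { long long opId; std::string gle; };
    static std::map<ConnectionIdent, WaitResult> seenWritebacks;   // guarded by a mutex in real code

    // Poll until a result with the wanted op id shows up, or give up after maxTries.
    bool waitFor(const ConnectionIdent& ident, long long opId, std::string& gleOut, int maxTries = 50) {
        for (int i = 0; i < maxTries; i++) {
            std::map<ConnectionIdent, WaitResult>::iterator it = seenWritebacks.find(ident);
            if (it != seenWritebacks.end() && it->second.opId == opId) {
                gleOut = it->second.gle;
                return true;
            }
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
        }
        return false;
    }

    int main() {
        ConnectionIdent id = {"host-a:27018", 42};
        seenWritebacks[id] = WaitResult{7, "{ ok: 1 }"};
        std::string gle;
        if (waitFor(id, 7, gle))
            std::cout << "got GLE: " << gle << "\n";
    }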