author     Antonin Kral <a.kral@bobek.cz>    2011-09-14 17:08:06 +0200
committer  Antonin Kral <a.kral@bobek.cz>    2011-09-14 17:08:06 +0200
commit     5d342a758c6095b4d30aba0750b54f13b8916f51 (patch)
tree       762e9aa84781f5e3b96db2c02d356c29cf0217c0 /s/chunk.cpp
parent     cbe2d992e9cd1ea66af9fa91df006106775d3073 (diff)
download   mongodb-5d342a758c6095b4d30aba0750b54f13b8916f51.tar.gz
Imported Upstream version 2.0.0
Diffstat (limited to 's/chunk.cpp')
-rw-r--r--  s/chunk.cpp  557
1 file changed, 211 insertions, 346 deletions
diff --git a/s/chunk.cpp b/s/chunk.cpp
index 2d0ad5d..09dc994 100644
--- a/s/chunk.cpp
+++ b/s/chunk.cpp
@@ -19,8 +19,10 @@
 #include "pch.h"
 #include "../client/connpool.h"
+#include "../db/querypattern.h"
 #include "../db/queryutil.h"
 #include "../util/unittest.h"
+#include "../util/timer.h"
 #include "chunk.h"
 #include "config.h"
@@ -48,17 +50,34 @@ namespace mongo {

     int Chunk::MaxObjectPerChunk = 250000;

-    Chunk::Chunk( ChunkManager * manager ) : _manager(manager), _lastmod(0) {
-        _setDataWritten();
-    }
+    Chunk::Chunk(const ChunkManager * manager, BSONObj from)
+        : _manager(manager), _lastmod(0), _dataWritten(mkDataWritten())
+    {
+        string ns = from.getStringField( "ns" );
+        _shard.reset( from.getStringField( "shard" ) );
+
+        _lastmod = from["lastmod"];
+        assert( _lastmod > 0 );

-    Chunk::Chunk(ChunkManager * info , const BSONObj& min, const BSONObj& max, const Shard& shard)
-        : _manager(info), _min(min), _max(max), _shard(shard), _lastmod(0) {
-        _setDataWritten();
+        _min = from.getObjectField( "min" ).getOwned();
+        _max = from.getObjectField( "max" ).getOwned();
+
+        uassert( 10170 , "Chunk needs a ns" , ! ns.empty() );
+        uassert( 13327 , "Chunk ns must match server ns" , ns == _manager->getns() );
+
+        uassert( 10171 , "Chunk needs a server" , _shard.ok() );
+
+        uassert( 10172 , "Chunk needs a min" , ! _min.isEmpty() );
+        uassert( 10173 , "Chunk needs a max" , ! _max.isEmpty() );
     }
-    void Chunk::_setDataWritten() {
-        _dataWritten = rand() % ( MaxChunkSize / 5 );
+
+    Chunk::Chunk(const ChunkManager * info , const BSONObj& min, const BSONObj& max, const Shard& shard)
+        : _manager(info), _min(min), _max(max), _shard(shard), _lastmod(0), _dataWritten(mkDataWritten())
+    {}
+
+    long Chunk::mkDataWritten() {
+        return rand() % ( MaxChunkSize / 5 );
     }

     string Chunk::getns() const {
@@ -175,7 +194,7 @@ namespace mongo {
         conn.done();
     }

-    bool Chunk::singleSplit( bool force , BSONObj& res , ChunkPtr* low, ChunkPtr* high) {
+    BSONObj Chunk::singleSplit( bool force , BSONObj& res ) const {
         vector<BSONObj> splitPoint;

         // if splitting is not obligatory we may return early if there are not enough data
@@ -189,8 +208,8 @@ namespace mongo {
             // no split points means there isn't enough data to split on
             // 1 split point means we have between half the chunk size to full chunk size
             // so we shouldn't split
-            log(1) << "chunk not full enough to trigger auto-split" << endl;
-            return false;
+            LOG(1) << "chunk not full enough to trigger auto-split" << endl;
+            return BSONObj();
         }

         splitPoint.push_back( candidates.front() );
@@ -228,24 +247,16 @@ namespace mongo {
         if ( splitPoint.empty() || _min == splitPoint.front() || _max == splitPoint.front() ) {
             log() << "want to split chunk, but can't find split point chunk " << toString()
                   << " got: " << ( splitPoint.empty() ? "<empty>" : splitPoint.front().toString() ) << endl;
-            return false;
-        }
-
-        if (!multiSplit( splitPoint , res , true ))
-            return false;
-
-        if (low && high) {
-            low->reset( new Chunk(_manager, _min, splitPoint[0], _shard));
-            high->reset(new Chunk(_manager, splitPoint[0], _max, _shard));
-        }
-        else {
-            assert(!low && !high); // can't have one without the other
+            return BSONObj();
         }
-
-        return true;
+
+        if (multiSplit( splitPoint , res ))
+            return splitPoint.front();
+        else
+            return BSONObj();
     }

-    bool Chunk::multiSplit( const vector<BSONObj>& m , BSONObj& res , bool resetIfSplit) {
+    bool Chunk::multiSplit( const vector<BSONObj>& m , BSONObj& res ) const {
         const size_t maxSplitPoints = 8192;

         uassert( 10165 , "can't split as shard doesn't have a manager" , _manager );
@@ -270,24 +281,22 @@ namespace mongo {
             warning() << "splitChunk failed - cmd: " << cmdObj << " result: " << res << endl;
             conn.done();

-            // reloading won't stricly solve all problems, e.g. the collection's metdata lock can be taken
-            // but we issue here so that mongos may refresh wihtout needing to be written/read against
-            grid.getDBConfig(_manager->getns())->getChunkManager(_manager->getns(), true);
+            // reloading won't strictly solve all problems, e.g. the collection's metadata lock can be taken
+            // but we issue here so that mongos may refresh without needing to be written/read against
+            _manager->reload();

             return false;
         }

         conn.done();
-
-        if ( resetIfSplit ) {
-            // force reload of chunks
-            grid.getDBConfig(_manager->getns())->getChunkManager(_manager->getns(), true);
-        }
+
+        // force reload of config
+        _manager->reload();

         return true;
     }

-    bool Chunk::moveAndCommit( const Shard& to , long long chunkSize /* bytes */, BSONObj& res ) {
+    bool Chunk::moveAndCommit( const Shard& to , long long chunkSize /* bytes */, BSONObj& res ) const {
         uassert( 10167 , "can't move shard to its current location!" , getShard() != to );

         log() << "moving chunk ns: " << _manager->getns() << " moving ( " << toString() << ") " << _shard.toString() << " -> " << to.toString() << endl;
@@ -311,15 +320,17 @@ namespace mongo {

         fromconn.done();

+        log( worked ) << "moveChunk result: " << res << endl;
+
         // if succeeded, needs to reload to pick up the new location
         // if failed, mongos may be stale
         // reload is excessive here as the failure could be simply because collection metadata is taken
-        grid.getDBConfig(_manager->getns())->getChunkManager(_manager->getns(), true);
+        _manager->reload();

         return worked;
     }

-    bool Chunk::splitIfShould( long dataWritten ) {
+    bool Chunk::splitIfShould( long dataWritten ) const {
         LastError::Disabled d( lastError.get() );

         try {
@@ -332,28 +343,63 @@ namespace mongo {
             if ( _dataWritten < splitThreshold / 5 )
                 return false;

-            log(1) << "about to initiate autosplit: " << *this << " dataWritten: " << _dataWritten << " splitThreshold: " << splitThreshold << endl;
+            // this is a bit ugly
+            // we need it so that mongos blocks for the writes to actually be committed
+            // this does mean mongos has more back pressure than mongod alone
+            // since it nots 100% tcp queue bound
+            // this was implicit before since we did a splitVector on the same socket
+            ShardConnection::sync();
+
+            LOG(1) << "about to initiate autosplit: " << *this << " dataWritten: " << _dataWritten << " splitThreshold: " << splitThreshold << endl;

             _dataWritten = 0; // reset so we check often enough

             BSONObj res;
-            ChunkPtr low;
-            ChunkPtr high;
-            bool worked = singleSplit( false /* does not force a split if not enough data */ , res , &low, &high);
-            if ( !worked ) {
+            BSONObj splitPoint = singleSplit( false /* does not force a split if not enough data */ , res );
+            if ( splitPoint.isEmpty() ) {
                 // singleSplit would have issued a message if we got here
                 _dataWritten = 0; // this means there wasn't enough data to split, so don't want to try again until considerable more data
                 return false;
             }

             log() << "autosplitted " << _manager->getns() << " shard: " << toString()
-                  << " on: " << low->getMax() << "(splitThreshold " << splitThreshold << ")"
+                  << " on: " << splitPoint << "(splitThreshold " << splitThreshold << ")"
 #ifdef _DEBUG
-                  << " size: " << getPhysicalSize() // slow - but can be usefule when debugging
+                  << " size: " << getPhysicalSize() // slow - but can be useful when debugging
 #endif
                   << endl;

-            low->moveIfShould( high );
+            BSONElement shouldMigrate = res["shouldMigrate"]; // not in mongod < 1.9.1 but that is ok
+            if (!shouldMigrate.eoo() && grid.shouldBalance()){
+                BSONObj range = shouldMigrate.embeddedObject();
+                BSONObj min = range["min"].embeddedObject();
+                BSONObj max = range["max"].embeddedObject();
+
+                Shard newLocation = Shard::pick( getShard() );
+                if ( getShard() == newLocation ) {
+                    // if this is the best shard, then we shouldn't do anything (Shard::pick already logged our shard).
+                    LOG(1) << "recently split chunk: " << range << " already in the best shard: " << getShard() << endl;
+                    return true; // we did split even if we didn't migrate
+                }
+
+                ChunkManagerPtr cm = _manager->reload(false/*just reloaded in mulitsplit*/);
+                ChunkPtr toMove = cm->findChunk(min);
+
+                if ( ! (toMove->getMin() == min && toMove->getMax() == max) ){
+                    LOG(1) << "recently split chunk: " << range << " modified before we could migrate " << toMove << endl;
+                    return true;
+                }
+
+                log() << "moving chunk (auto): " << toMove << " to: " << newLocation.toString() << endl;
+
+                BSONObj res;
+                massert( 10412 ,
+                         str::stream() << "moveAndCommit failed: " << res ,
+                         toMove->moveAndCommit( newLocation , MaxChunkSize , res ) );
+
+                // update our config
+                _manager->reload();
+            }

             return true;

@@ -365,40 +411,6 @@ namespace mongo {
         }
     }

-    bool Chunk::moveIfShould( ChunkPtr newChunk ) {
-        ChunkPtr toMove;
-
-        if ( newChunk->countObjects(2) <= 1 ) {
-            toMove = newChunk;
-        }
-        else if ( this->countObjects(2) <= 1 ) {
-            DEV assert( shared_from_this() );
-            toMove = shared_from_this();
-        }
-        else {
-            // moving middle shards is handled by balancer
-            return false;
-        }
-
-        assert( toMove );
-
-        Shard newLocation = Shard::pick( getShard() );
-        if ( getShard() == newLocation ) {
-            // if this is the best shard, then we shouldn't do anything (Shard::pick already logged our shard).
-            log(1) << "recently split chunk: " << toString() << "already in the best shard" << endl;
-            return 0;
-        }
-
-        log() << "moving chunk (auto): " << toMove->toString() << " to: " << newLocation.toString() << " #objects: " << toMove->countObjects() << endl;
-
-        BSONObj res;
-        massert( 10412 ,
-                 str::stream() << "moveAndCommit failed: " << res ,
-                 toMove->moveAndCommit( newLocation , MaxChunkSize , res ) );
-
-        return true;
-    }
-
     long Chunk::getPhysicalSize() const {
         ScopedDbConnection conn( getShard().getConnString() );
@@ -416,24 +428,7 @@ namespace mongo {
         return (long)result["size"].number();
     }

-    int Chunk::countObjects(int maxCount) const {
-        static const BSONObj fields = BSON("_id" << 1 );
-
-        ShardConnection conn( getShard() , _manager->getns() );
-
-        // not using regular count as this is more flexible and supports $min/$max
-        Query q = Query().minKey(_min).maxKey(_max);
-        int n;
-        {
-            auto_ptr<DBClientCursor> c = conn->query(_manager->getns(), q, maxCount, 0, &fields);
-            assert( c.get() );
-            n = c->itcount();
-        }
-        conn.done();
-        return n;
-    }
-
-    void Chunk::appendShortVersion( const char * name , BSONObjBuilder& b ) {
+    void Chunk::appendShortVersion( const char * name , BSONObjBuilder& b ) const {
         BSONObjBuilder bb( b.subobjStart( name ) );
         bb.append( "min" , _min );
         bb.append( "max" , _max );
@@ -481,33 +476,6 @@ namespace mongo {
         return buf.str();
     }

-    void Chunk::unserialize(const BSONObj& from) {
-        string ns = from.getStringField( "ns" );
-        _shard.reset( from.getStringField( "shard" ) );
-
-        _lastmod = from["lastmod"];
-        assert( _lastmod > 0 );
-
-        BSONElement e = from["minDotted"];
-
-        if (e.eoo()) {
-            _min = from.getObjectField( "min" ).getOwned();
-            _max = from.getObjectField( "max" ).getOwned();
-        }
-        else { // TODO delete this case after giving people a chance to migrate
-            _min = e.embeddedObject().getOwned();
-            _max = from.getObjectField( "maxDotted" ).getOwned();
-        }
-
-        uassert( 10170 , "Chunk needs a ns" , ! ns.empty() );
-        uassert( 13327 , "Chunk ns must match server ns" , ns == _manager->getns() );
-
-        uassert( 10171 , "Chunk needs a server" , _shard.ok() );
-
-        uassert( 10172 , "Chunk needs a min" , ! _min.isEmpty() );
-        uassert( 10173 , "Chunk needs a max" , ! _max.isEmpty() );
-    }
-
     string Chunk::toString() const {
         stringstream ss;
         ss << "ns:" << _manager->getns() << " at: " << _shard.toString() << " lastmod: " << _lastmod.toString() << " min: " << _min << " max: " << _max;
@@ -523,57 +491,63 @@ namespace mongo {

     AtomicUInt ChunkManager::NextSequenceNumber = 1;

     ChunkManager::ChunkManager( string ns , ShardKeyPattern pattern , bool unique ) :
-        _ns( ns ) , _key( pattern ) , _unique( unique ) , _lock("rw:ChunkManager"),
-        _nsLock( ConnectionString( configServer.modelServer() , ConnectionString::SYNC ) , ns ) {
-        _reload_inlock(); // will set _sequenceNumber
-    }
-
-    ChunkManager::~ChunkManager() {
-        _chunkMap.clear();
-        _chunkRanges.clear();
-        _shards.clear();
-    }
+        _ns( ns ) , _key( pattern ) , _unique( unique ) , _chunkRanges(), _mutex("ChunkManager"),
+        _nsLock( ConnectionString( configServer.modelServer() , ConnectionString::SYNC ) , ns ),

-    void ChunkManager::_reload() {
-        rwlock lk( _lock , true );
-        _reload_inlock();
-    }
+        // The shard versioning mechanism hinges on keeping track of the number of times we reloaded ChunkManager's.
+        // Increasing this number here will prompt checkShardVersion() to refresh the connection-level versions to
+        // the most up to date value.
+        _sequenceNumber(++NextSequenceNumber)

-    void ChunkManager::_reload_inlock() {
+    {
         int tries = 3;
         while (tries--) {
-            _chunkMap.clear();
-            _chunkRanges.clear();
-            _shards.clear();
-            _load();
-
-            if (_isValid()) {
-                _chunkRanges.reloadAll(_chunkMap);
-
-                // The shard versioning mechanism hinges on keeping track of the number of times we reloaded ChunkManager's.
-                // Increasing this number here will prompt checkShardVersion() to refresh the connection-level versions to
-                // the most up to date value.
-                _sequenceNumber = ++NextSequenceNumber;
+            ChunkMap chunkMap;
+            set<Shard> shards;
+            ShardVersionMap shardVersions;
+            Timer t;
+            _load(chunkMap, shards, shardVersions);
+            {
+                int ms = t.millis();
+                log() << "ChunkManager: time to load chunks for " << ns << ": " << ms << "ms"
+                      << " sequenceNumber: " << _sequenceNumber
+                      << " version: " << _version.toString()
+                      << endl;
+            }
+            if (_isValid(chunkMap)) {
+                // These variables are const for thread-safety. Since the
+                // constructor can only be called from one thread, we don't have
+                // to worry about that here.
+                const_cast<ChunkMap&>(_chunkMap).swap(chunkMap);
+                const_cast<set<Shard>&>(_shards).swap(shards);
+                const_cast<ShardVersionMap&>(_shardVersions).swap(shardVersions);
+                const_cast<ChunkRangeManager&>(_chunkRanges).reloadAll(_chunkMap);
                 return;
             }
-
+
             if (_chunkMap.size() < 10) {
                 _printChunks();
            }
+
+            warning() << "ChunkManager loaded an invalid config, trying again" << endl;

             sleepmillis(10 * (3-tries));
         }

+        // this will abort construction so we should never have a reference to an invalid config
         msgasserted(13282, "Couldn't load a valid config for " + _ns + " after 3 attempts. Please try again.");
+    }

+    ChunkManagerPtr ChunkManager::reload(bool force) const {
+        return grid.getDBConfig(getns())->getChunkManager(getns(), force);
     }

-    void ChunkManager::_load() {
+    void ChunkManager::_load(ChunkMap& chunkMap, set<Shard>& shards, ShardVersionMap& shardVersions) {
         ScopedDbConnection conn( configServer.modelServer() );

         // TODO really need the sort?
-        auto_ptr<DBClientCursor> cursor = conn->query( Chunk::chunkMetadataNS, QUERY("ns" << _ns).sort("lastmod",1), 0, 0, 0, 0,
+        auto_ptr<DBClientCursor> cursor = conn->query( Chunk::chunkMetadataNS, QUERY("ns" << _ns).sort("lastmod",-1), 0, 0, 0, 0,
                                                        (DEBUG_BUILD ? 2 : 1000000)); // batch size. Try to induce potential race conditions in debug builds

         assert( cursor.get() );
         while ( cursor->more() ) {
@@ -582,28 +556,36 @@ namespace mongo {
                 continue;
             }

-            ChunkPtr c( new Chunk( this ) );
-            c->unserialize( d );
+            ChunkPtr c( new Chunk( this, d ) );

-            _chunkMap[c->getMax()] = c;
-            _shards.insert(c->getShard());
+            chunkMap[c->getMax()] = c;
+            shards.insert(c->getShard());
+
+            // set global max
+            if ( c->getLastmod() > _version )
+                _version = c->getLastmod();
+
+            // set shard max
+            ShardChunkVersion& shardMax = shardVersions[c->getShard()];
+            if ( c->getLastmod() > shardMax )
+                shardMax = c->getLastmod();
         }

         conn.done();
     }

-    bool ChunkManager::_isValid() const {
+    bool ChunkManager::_isValid(const ChunkMap& chunkMap) {
 #define ENSURE(x) do { if(!(x)) { log() << "ChunkManager::_isValid failed: " #x << endl; return false; } } while(0)

-        if (_chunkMap.empty())
+        if (chunkMap.empty())
             return true;

         // Check endpoints
-        ENSURE(allOfType(MinKey, _chunkMap.begin()->second->getMin()));
-        ENSURE(allOfType(MaxKey, prior(_chunkMap.end())->second->getMax()));
+        ENSURE(allOfType(MinKey, chunkMap.begin()->second->getMin()));
+        ENSURE(allOfType(MaxKey, prior(chunkMap.end())->second->getMax()));

         // Make sure there are no gaps or overlaps
-        for (ChunkMap::const_iterator it=boost::next(_chunkMap.begin()), end=_chunkMap.end(); it != end; ++it) {
+        for (ChunkMap::const_iterator it=boost::next(chunkMap.begin()), end=chunkMap.end(); it != end; ++it) {
             ChunkMap::const_iterator last = prior(it);

             if (!(it->second->getMin() == last->second->getMax())) {
@@ -625,14 +607,15 @@ namespace mongo {
         }
     }

-    bool ChunkManager::hasShardKey( const BSONObj& obj ) {
+    bool ChunkManager::hasShardKey( const BSONObj& obj ) const {
         return _key.hasShardKey( obj );
     }

-    void ChunkManager::createFirstChunk( const Shard& shard ) {
+    void ChunkManager::createFirstChunk( const Shard& shard ) const {
+        // TODO distlock?
         assert( _chunkMap.size() == 0 );

-        ChunkPtr c( new Chunk(this, _key.globalMin(), _key.globalMax(), shard ) );
+        Chunk c (this, _key.globalMin(), _key.globalMax(), shard);

         // this is the first chunk; start the versioning from scratch
         ShardChunkVersion version;
@@ -640,52 +623,42 @@ namespace mongo {

         // build update for the chunk collection
         BSONObjBuilder chunkBuilder;
-        c->serialize( chunkBuilder , version );
+        c.serialize( chunkBuilder , version );
         BSONObj chunkCmd = chunkBuilder.obj();

         log() << "about to create first chunk for: " << _ns << endl;

         ScopedDbConnection conn( configServer.modelServer() );
         BSONObj res;
-        conn->update( Chunk::chunkMetadataNS, QUERY( "_id" << c->genID() ), chunkCmd, true, false );
+        conn->update( Chunk::chunkMetadataNS, QUERY( "_id" << c.genID() ), chunkCmd, true, false );

         string errmsg = conn->getLastError();
         if ( errmsg.size() ) {
             stringstream ss;
             ss << "saving first chunk failed. cmd: " << chunkCmd << " result: " << errmsg;
             log( LL_ERROR ) << ss.str() << endl;
-            msgasserted( 13592 , ss.str() ); // assert(13592)
+            msgasserted( 13592 , ss.str() );
         }

         conn.done();

-        // every instance of ChunkManager has a unique sequence number; callers of ChunkManager may
-        // inquiry about whether there were changes in chunk configuration (see re/load() calls) since
-        // the last access to ChunkManager by checking the sequence number
-        _sequenceNumber = ++NextSequenceNumber;
-
-        _chunkMap[c->getMax()] = c;
-        _chunkRanges.reloadAll(_chunkMap);
-        _shards.insert(c->getShard());
-        c->setLastmod(version);
-
         // the ensure index will have the (desired) indirect effect of creating the collection on the
         // assigned shard, as it sets up the index over the sharding keys.
-        ensureIndex_inlock();
+        ScopedDbConnection shardConn( c.getShard().getConnString() );
+        shardConn->ensureIndex( getns() , getShardKey().key() , _unique , "" , false /* do not cache ensureIndex SERVER-1691 */ );
+        shardConn.done();

-        log() << "successfully created first chunk for " << c->toString() << endl;
+        log() << "successfully created first chunk for " << c.toString() << endl;
     }

-    ChunkPtr ChunkManager::findChunk( const BSONObj & obj) {
+    ChunkPtr ChunkManager::findChunk( const BSONObj & obj ) const {
         BSONObj key = _key.extractKey(obj);

         {
-            rwlock lk( _lock , false );
-
             BSONObj foo;
             ChunkPtr c;
             {
-                ChunkMap::iterator it = _chunkMap.upper_bound(key);
+                ChunkMap::const_iterator it = _chunkMap.upper_bound(key);
                 if (it != _chunkMap.end()) {
                     foo = it->first;
                     c = it->second;
@@ -693,25 +666,24 @@ namespace mongo {
             }

             if ( c ) {
-                if ( c->contains( obj ) )
+                if ( c->contains( key ) ){
+                    dassert(c->contains(key)); // doesn't use fast-path in extractKey
                     return c;
+                }

                 PRINT(foo);
                 PRINT(*c);
                 PRINT(key);

-                grid.getDBConfig(getns())->getChunkManager(getns(), true);
+                reload();

                 massert(13141, "Chunk map pointed to incorrect chunk", false);
             }
         }

-        massert(8070, str::stream() << "couldn't find a chunk aftry retry which should be impossible extracted: " << key, false);
-        return ChunkPtr(); // unreachable
+        throw UserException( 8070 , str::stream() << "couldn't find a chunk which should be impossible: " << key );
     }

     ChunkPtr ChunkManager::findChunkOnServer( const Shard& shard ) const {
-        rwlock lk( _lock , false );
-
         for ( ChunkMap::const_iterator i=_chunkMap.begin(); i!=_chunkMap.end(); ++i ) {
             ChunkPtr c = i->second;
             if ( c->getShard() == shard )
@@ -721,14 +693,11 @@ namespace mongo {
         return ChunkPtr();
     }

-    void ChunkManager::getShardsForQuery( set<Shard>& shards , const BSONObj& query ) {
-        rwlock lk( _lock , false );
-        DEV PRINT(query);
-
+    void ChunkManager::getShardsForQuery( set<Shard>& shards , const BSONObj& query ) const {
         //TODO look into FieldRangeSetOr
-        FieldRangeOrSet fros(_ns.c_str(), query, false);
+        OrRangeGenerator org(_ns.c_str(), query, false);

-        const string special = fros.getSpecial();
+        const string special = org.getSpecial();
         if (special == "2d") {
             BSONForEach(field, query) {
                 if (getGtLtOp(field) == BSONObj::opNEAR) {
@@ -743,25 +712,22 @@ namespace mongo {
         }

         do {
-            boost::scoped_ptr<FieldRangeSet> frs (fros.topFrs());
+            boost::scoped_ptr<FieldRangeSetPair> frsp (org.topFrsp());
             {
                 // special case if most-significant field isn't in query
-                FieldRange range = frs->range(_key.key().firstElement().fieldName());
+                FieldRange range = frsp->singleKeyRange(_key.key().firstElementFieldName());
                 if ( !range.nontrivial() ) {
                     DEV PRINT(range.nontrivial());
-                    getAllShards_inlock(shards);
+                    getAllShards(shards);
                     return;
                 }
             }

-            BoundList ranges = frs->indexBounds(_key.key(), 1);
+            BoundList ranges = frsp->singleKeyIndexBounds(_key.key(), 1);

             for (BoundList::const_iterator it=ranges.begin(), end=ranges.end(); it != end; ++it) {
                 BSONObj minObj = it->first.replaceFieldNames(_key.key());
                 BSONObj maxObj = it->second.replaceFieldNames(_key.key());

-                DEV PRINT(minObj);
-                DEV PRINT(maxObj);
-
                 ChunkRangeMap::const_iterator min, max;
                 min = _chunkRanges.upper_bound(minObj);
                 max = _chunkRanges.upper_bound(maxObj);
@@ -781,14 +747,14 @@ namespace mongo {
                 //return;
             }

-            if (fros.moreOrClauses())
-                fros.popOrClause();
+            if (org.moreOrClauses())
+                org.popOrClauseSingleKey();

         }
-        while (fros.moreOrClauses());
+        while (org.moreOrClauses());
     }

-    void ChunkManager::getShardsForRange(set<Shard>& shards, const BSONObj& min, const BSONObj& max) {
+    void ChunkManager::getShardsForRange(set<Shard>& shards, const BSONObj& min, const BSONObj& max) const {
         uassert(13405, "min must have shard key", hasShardKey(min));
         uassert(13406, "max must have shard key", hasShardKey(max));
@@ -804,37 +770,30 @@ namespace mongo {
         }
     }

-    void ChunkManager::getAllShards( set<Shard>& all ) {
-        rwlock lk( _lock , false );
-        getAllShards_inlock( all );
-    }
-
-    void ChunkManager::getAllShards_inlock( set<Shard>& all ){
+    void ChunkManager::getAllShards( set<Shard>& all ) const {
         all.insert(_shards.begin(), _shards.end());
     }

-    void ChunkManager::ensureIndex_inlock() {
-        //TODO in parallel?
-        for ( set<Shard>::const_iterator i=_shards.begin(); i!=_shards.end(); ++i ) {
-            ScopedDbConnection conn( i->getConnString() );
-            conn->ensureIndex( getns() , getShardKey().key() , _unique , "" , false /* do not cache ensureIndex SERVER-1691 */ );
-            conn.done();
-        }
-    }
-
-    void ChunkManager::drop( ChunkManagerPtr me ) {
-        rwlock lk( _lock , true );
+    void ChunkManager::drop( ChunkManagerPtr me ) const {
+        scoped_lock lk( _mutex );

         configServer.logChange( "dropCollection.start" , _ns , BSONObj() );

-        dist_lock_try dlk( &_nsLock , "drop" );
+        dist_lock_try dlk;
+        try{
+            dlk = dist_lock_try( &_nsLock , "drop" );
+        }
+        catch( LockException& e ){
+            uassert( 14022, str::stream() << "Error locking distributed lock for chunk drop." << causedBy( e ), false);
+        }
+
         uassert( 13331 , "collection's metadata is undergoing changes. Please try again." , dlk.got() );

         uassert( 10174 , "config servers not all up" , configServer.allUp() );

         set<Shard> seen;

-        log(1) << "ChunkManager::drop : " << _ns << endl;
+        LOG(1) << "ChunkManager::drop : " << _ns << endl;

         // lock all shards so no one can do a split/migrate
         for ( ChunkMap::const_iterator i=_chunkMap.begin(); i!=_chunkMap.end(); ++i ) {
@@ -842,12 +801,7 @@ namespace mongo {
             seen.insert( c->getShard() );
         }

-        log(1) << "ChunkManager::drop : " << _ns << "\t all locked" << endl;
-
-        // wipe my meta-data
-        _chunkMap.clear();
-        _chunkRanges.clear();
-        _shards.clear();
+        LOG(1) << "ChunkManager::drop : " << _ns << "\t all locked" << endl;

         // delete data from mongod
         for ( set<Shard>::iterator i=seen.begin(); i!=seen.end(); i++ ) {
@@ -856,82 +810,64 @@ namespace mongo {
             conn.done();
         }

-        log(1) << "ChunkManager::drop : " << _ns << "\t removed shard data" << endl;
+        LOG(1) << "ChunkManager::drop : " << _ns << "\t removed shard data" << endl;

         // remove chunk data
         ScopedDbConnection conn( configServer.modelServer() );
         conn->remove( Chunk::chunkMetadataNS , BSON( "ns" << _ns ) );
         conn.done();

-        log(1) << "ChunkManager::drop : " << _ns << "\t removed chunk data" << endl;
+        LOG(1) << "ChunkManager::drop : " << _ns << "\t removed chunk data" << endl;

         for ( set<Shard>::iterator i=seen.begin(); i!=seen.end(); i++ ) {
             ScopedDbConnection conn( *i );
             BSONObj res;
+
+            // this is horrible
+            // we need a special command for dropping on the d side
+            // this hack works for the moment
+
             if ( ! setShardVersion( conn.conn() , _ns , 0 , true , res ) )
                 throw UserException( 8071 , str::stream() << "cleaning up after drop failed: " << res );
+
             conn->simpleCommand( "admin", 0, "unsetSharding" );
             conn.done();
         }

-        log(1) << "ChunkManager::drop : " << _ns << "\t DONE" << endl;
+        LOG(1) << "ChunkManager::drop : " << _ns << "\t DONE" << endl;

         configServer.logChange( "dropCollection" , _ns , BSONObj() );
     }

-    bool ChunkManager::maybeChunkCollection() {
-        ensureIndex_inlock();
-
+    void ChunkManager::maybeChunkCollection() const {
         uassert( 13346 , "can't pre-split already splitted collection" , (_chunkMap.size() == 1) );
-
+
         ChunkPtr soleChunk = _chunkMap.begin()->second;
         vector<BSONObj> splitPoints;
         soleChunk->pickSplitVector( splitPoints , Chunk::MaxChunkSize );
         if ( splitPoints.empty() ) {
-            log(1) << "not enough data to warrant chunking " << getns() << endl;
-            return false;
+            LOG(1) << "not enough data to warrant chunking " << getns() << endl;
+            return;
         }
-
+
         BSONObj res;
-        bool worked = soleChunk->multiSplit( splitPoints , res , false );
+        ChunkPtr p;
+        bool worked = soleChunk->multiSplit( splitPoints , res );
         if (!worked) {
             log( LL_WARNING ) << "could not split '" << getns() << "': " << res << endl;
-            return false;
+            return;
         }
-        return true;
     }

     ShardChunkVersion ChunkManager::getVersion( const Shard& shard ) const {
-        rwlock lk( _lock , false );
-        // TODO: cache or something?
-
-        ShardChunkVersion max = 0;
-
-        for ( ChunkMap::const_iterator i=_chunkMap.begin(); i!=_chunkMap.end(); ++i ) {
-            ChunkPtr c = i->second;
-            DEV assert( c );
-            if ( c->getShard() != shard )
-                continue;
-            if ( c->getLastmod() > max )
-                max = c->getLastmod();
-        }
-        return max;
+        ShardVersionMap::const_iterator i = _shardVersions.find( shard );
+        if ( i == _shardVersions.end() )
+            return 0;
+        return i->second;
     }

     ShardChunkVersion ChunkManager::getVersion() const {
-        rwlock lk( _lock , false );
-
-        ShardChunkVersion max = 0;
-
-        for ( ChunkMap::const_iterator i=_chunkMap.begin(); i!=_chunkMap.end(); ++i ) {
-            ChunkPtr c = i->second;
-            if ( c->getLastmod() > max )
-                max = c->getLastmod();
-        }
-
-        return max;
+        return _version;
     }

     string ChunkManager::toString() const {
-        rwlock lk( _lock , false );
-
         stringstream ss;
         ss << "ChunkManager: " << _ns << " key:" << _key.toString() << '\n';
         for ( ChunkMap::const_iterator i=_chunkMap.begin(); i!=_chunkMap.end(); ++i ) {
@@ -993,69 +929,6 @@ namespace mongo {
         }
     }

-    void ChunkRangeManager::reloadRange(const ChunkMap& chunks, const BSONObj& min, const BSONObj& max) {
-        if (_ranges.empty()) {
-            reloadAll(chunks);
-            return;
-        }
-
-        ChunkRangeMap::iterator low = _ranges.upper_bound(min);
-        ChunkRangeMap::iterator high = _ranges.lower_bound(max);
-
-        assert(low != _ranges.end());
-        assert(high != _ranges.end());
-        assert(low->second);
-        assert(high->second);
-
-        ChunkMap::const_iterator begin = chunks.upper_bound(low->second->getMin());
-        ChunkMap::const_iterator end = chunks.lower_bound(high->second->getMax());
-
-        assert(begin != chunks.end());
-        assert(end != chunks.end());
-
-        // C++ end iterators are one-past-last
-        ++high;
-        ++end;
-
-        // update ranges
-        _ranges.erase(low, high); // invalidates low
-        _insertRange(begin, end);
-
-        assert(!_ranges.empty());
-        DEV assertValid();
-
-        // merge low-end if possible
-        low = _ranges.upper_bound(min);
-        assert(low != _ranges.end());
-        if (low != _ranges.begin()) {
-            shared_ptr<ChunkRange> a = prior(low)->second;
-            shared_ptr<ChunkRange> b = low->second;
-            if (a->getShard() == b->getShard()) {
-                shared_ptr<ChunkRange> cr (new ChunkRange(*a, *b));
-                _ranges.erase(prior(low));
-                _ranges.erase(low); // invalidates low
-                _ranges[cr->getMax()] = cr;
-            }
-        }
-
-        DEV assertValid();
-
-        // merge high-end if possible
-        high = _ranges.lower_bound(max);
-        if (high != prior(_ranges.end())) {
-            shared_ptr<ChunkRange> a = high->second;
-            shared_ptr<ChunkRange> b = boost::next(high)->second;
-            if (a->getShard() == b->getShard()) {
-                shared_ptr<ChunkRange> cr (new ChunkRange(*a, *b));
-                _ranges.erase(boost::next(high));
-                _ranges.erase(high); //invalidates high
-                _ranges[cr->getMax()] = cr;
-            }
-        }
-
-        DEV assertValid();
-    }
-
     void ChunkRangeManager::reloadAll(const ChunkMap& chunks) {
         _ranges.clear();
         _insertRange(chunks.begin(), chunks.end());
@@ -1095,13 +968,6 @@ namespace mongo {

     class ChunkObjUnitTest : public UnitTest {
     public:
-        void runShard() {
-            ChunkPtr c;
-            assert( ! c );
-            c.reset( new Chunk( 0 ) );
-            assert( c );
-        }
-
         void runShardChunkVersion() {
             vector<ShardChunkVersion> all;
             all.push_back( ShardChunkVersion(1,1) );
@@ -1118,9 +984,8 @@ namespace mongo {
         }

         void run() {
-            runShard();
             runShardChunkVersion();
-            log(1) << "shardObjTest passed" << endl;
+            LOG(1) << "shardObjTest passed" << endl;
         }
     } shardObjTest;

@@ -1145,7 +1010,7 @@ namespace mongo {

         cmdBuilder.append( "shardHost" , s.getConnString() );
         BSONObj cmd = cmdBuilder.obj();

-        log(1) << " setShardVersion " << s.getName() << " " << conn.getServerAddress() << " " << ns << " " << cmd << " " << &conn << endl;
+        LOG(1) << " setShardVersion " << s.getName() << " " << conn.getServerAddress() << " " << ns << " " << cmd << " " << &conn << endl;

         return conn.runCommand( "admin" , cmd , result );
     }
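
Note on the auto-split bookkeeping: the patch replaces the old `_setDataWritten()` mutator with a `mkDataWritten()` helper used in the constructors' initializer lists, so each mongos-side chunk proxy starts its write counter at a random fraction of the chunk size, and `splitIfShould()` only attempts a split once roughly a fifth of the split threshold has been written since the last check. The standalone sketch below illustrates just that bookkeeping; it is not mongos code, and the 64 MB `MaxChunkSize` value, the `ChunkCounter` type and the `justWritten`/`splitThreshold` parameters are illustrative stand-ins.

```cpp
#include <cstdlib>
#include <iostream>

// Stand-in for Chunk::MaxChunkSize (the configured maximum chunk size in bytes);
// 64 MB is used here purely as an example value.
const long MaxChunkSize = 64 * 1024 * 1024;

// Each chunk proxy starts its write counter at a random value (mkDataWritten in the
// patch) so that many mongos processes watching the same chunk do not all hit the
// split threshold at the same moment.
long mkDataWritten() {
    return std::rand() % ( MaxChunkSize / 5 );
}

struct ChunkCounter {
    long dataWritten;
    ChunkCounter() : dataWritten( mkDataWritten() ) {}

    // Mirrors the threshold check in splitIfShould(): only once roughly a fifth of the
    // split threshold has accumulated does the caller go on to call singleSplit()
    // (and possibly migrate the resulting top chunk).
    bool splitIfShould( long justWritten , long splitThreshold ) {
        dataWritten += justWritten;
        if ( dataWritten < splitThreshold / 5 )
            return false;
        dataWritten = 0;   // reset so we check often enough
        return true;       // a real auto-split attempt would start here
    }
};

int main() {
    ChunkCounter c;
    std::cout << "initial dataWritten: " << c.dataWritten << std::endl;
    std::cout << "would attempt split: "
              << c.splitIfShould( 4 * 1024 * 1024 , MaxChunkSize ) << std::endl;
    return 0;
}
```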
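The reworked `_load()` also builds a per-shard version map (`ShardVersionMap`) and a global `_version` while it scans `config.chunks`, so `getVersion()` and `getVersion(shard)` become simple lookups instead of scans over the whole chunk map. A minimal sketch of that single-pass idea, using plain `std::string` and `long` as stand-ins for `Shard` and `ShardChunkVersion`:

```cpp
#include <iostream>
#include <map>
#include <string>
#include <vector>

// One record from config.chunks, reduced to the two fields the version bookkeeping
// cares about; the long version number is a simplified stand-in for ShardChunkVersion.
struct ChunkDoc {
    std::string shard;
    long lastmod;
    ChunkDoc( const std::string& s , long v ) : shard(s), lastmod(v) {}
};

int main() {
    std::vector<ChunkDoc> chunks;
    chunks.push_back( ChunkDoc( "shard0000" , 3 ) );
    chunks.push_back( ChunkDoc( "shard0001" , 7 ) );
    chunks.push_back( ChunkDoc( "shard0000" , 5 ) );

    long globalVersion = 0;                       // analogue of _version
    std::map<std::string, long> shardVersions;    // analogue of ShardVersionMap

    // single pass, as in the reworked _load(): track the global max and the per-shard max
    for ( size_t i = 0; i < chunks.size(); i++ ) {
        const ChunkDoc& c = chunks[i];

        if ( c.lastmod > globalVersion )
            globalVersion = c.lastmod;

        long& shardMax = shardVersions[c.shard];  // value-initialized to 0 on first use
        if ( c.lastmod > shardMax )
            shardMax = c.lastmod;
    }

    // getVersion() / getVersion(shard) now just read the precomputed values
    std::cout << "global version: " << globalVersion << std::endl;
    std::cout << "shard0000 version: " << shardVersions["shard0000"] << std::endl;
    return 0;
}
```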