Diffstat (limited to 's/chunk.cpp')
-rw-r--r-- | s/chunk.cpp | 1203
1 file changed, 898 insertions, 305 deletions
diff --git a/s/chunk.cpp b/s/chunk.cpp
index 73d17d9..5df3b69 100644
--- a/s/chunk.cpp
+++ b/s/chunk.cpp
@@ -16,28 +16,52 @@
  * along with this program. If not, see <http://www.gnu.org/licenses/>.
  */
 
-#include "stdafx.h"
+#include "pch.h"
 #include "chunk.h"
 #include "config.h"
+#include "grid.h"
 #include "../util/unittest.h"
 #include "../client/connpool.h"
+#include "../client/distlock.h"
+#include "../db/queryutil.h"
 #include "cursors.h"
 #include "strategy.h"
 
 namespace mongo {
 
+    inline bool allOfType(BSONType type, const BSONObj& o){
+        BSONObjIterator it(o);
+        while(it.more()){
+            if (it.next().type() != type)
+                return false;
+        }
+        return true;
+    }
+
+    RWLock chunkSplitLock("rw:chunkSplitLock");
+
     // ------- Shard --------
 
-    int Chunk::MaxChunkSize = 1024 * 1204 * 200;
+    int Chunk::MaxChunkSize = 1024 * 1024 * 200;
 
-    Chunk::Chunk( ChunkManager * manager ) : _manager( manager ){
-        _modified = false;
-        _lastmod = 0;
-        _dataWritten = 0;
+    Chunk::Chunk( ChunkManager * manager )
+      : _manager(manager),
+        _lastmod(0), _modified(false), _dataWritten(0)
+    {}
+
+    Chunk::Chunk(ChunkManager * info , const BSONObj& min, const BSONObj& max, const Shard& shard)
+      : _manager(info), _min(min), _max(max), _shard(shard),
+        _lastmod(0), _modified(false), _dataWritten(0)
+    {}
+
+    string Chunk::getns() const {
+        assert( _manager );
+        return _manager->getns();
     }
 
-    void Chunk::setShard( string s ){
+    void Chunk::setShard( const Shard& s ){
         _shard = s;
+        _manager->_migrationNotification(this);
         _markModified();
     }
 
@@ -47,18 +71,33 @@ namespace mongo {
             _manager->getShardKey().compare( obj , getMax() ) < 0;
     }
 
+    bool ChunkRange::contains(const BSONObj& obj) const {
+        // same as Chunk method
+        return
+            _manager->getShardKey().compare( getMin() , obj ) <= 0 &&
+            _manager->getShardKey().compare( obj , getMax() ) < 0;
+    }
+
+    bool Chunk::minIsInf() const {
+        return _manager->getShardKey().globalMin().woCompare( getMin() ) == 0;
+    }
+
+    bool Chunk::maxIsInf() const {
+        return _manager->getShardKey().globalMax().woCompare( getMax() ) == 0;
+    }
+
     BSONObj Chunk::pickSplitPoint() const{
         int sort = 0;
 
-        if ( _manager->getShardKey().globalMin().woCompare( getMin() ) == 0 ){
+        if ( minIsInf() ){
             sort = 1;
         }
-        else if ( _manager->getShardKey().globalMax().woCompare( getMax() ) == 0 ){
+        else if ( maxIsInf() ){
             sort = -1;
         }
 
         if ( sort ){
-            ScopedDbConnection conn( getShard() );
+            ShardConnection conn( getShard().getConnString() , _manager->getns() );
             Query q;
             if ( sort == 1 )
                 q.sort( _manager->getShardKey().key() );
@@ -75,212 +114,313 @@ namespace mongo {
                 q.sort( r.obj() );
             }
 
-            BSONObj end = conn->findOne( _ns , q );
+            BSONObj end = conn->findOne( _manager->getns() , q );
             conn.done();
 
             if ( ! end.isEmpty() )
                 return _manager->getShardKey().extractKey( end );
         }
 
-        ScopedDbConnection conn( getShard() );
+        BSONObj cmd = BSON( "medianKey" << _manager->getns()
+                            << "keyPattern" << _manager->getShardKey().key()
+                            << "min" << getMin()
+                            << "max" << getMax() );
+
+        ScopedDbConnection conn( getShard().getConnString() );
         BSONObj result;
-        if ( ! conn->runCommand( "admin" , BSON( "medianKey" << _ns
-                                                 << "keyPattern" << _manager->getShardKey().key()
-                                                 << "min" << getMin()
-                                                 << "max" << getMax()
-                                                 ) , result ) ){
+        if ( !
conn->runCommand( "admin" , cmd , result ) ){ stringstream ss; ss << "medianKey command failed: " << result; uassert( 10164 , ss.str() , 0 ); } - BSONObj median = result.getObjectField( "median" ); - if (median == getMin()){ - //TODO compound support - BSONElement key = getMin().firstElement(); - BSONObjBuilder b; - b.appendAs("$gt", key); + BSONObj median = result.getObjectField( "median" ).getOwned(); + conn.done(); - Query q = QUERY(key.fieldName() << b.obj()); + + if (median == getMin()){ + Query q; + q.minKey(_min).maxKey(_max); q.sort(_manager->getShardKey().key()); - median = conn->findOne(_ns, q); + median = conn->findOne(_manager->getns(), q); median = _manager->getShardKey().extractKey( median ); - PRINT(median); } + + if ( median < getMin() || median >= getMax() ){ + stringstream ss; + ss << "medianKey returned value out of range. " + << " cmd: " << cmd + << " result: " << result; + uasserted( 13394 , ss.str() ); + } + + return median; + } + void Chunk::pickSplitVector( vector<BSONObj>* splitPoints ) const { + // Ask the mongod holding this chunk to figure out the split points. + ScopedDbConnection conn( getShard().getConnString() ); + BSONObj result; + BSONObjBuilder cmd; + cmd.append( "splitVector" , _manager->getns() ); + cmd.append( "keyPattern" , _manager->getShardKey().key() ); + cmd.append( "maxChunkSize" , Chunk::MaxChunkSize / (1<<20) ); + BSONObj cmdObj = cmd.obj(); + + if ( ! conn->runCommand( "admin" , cmdObj , result )){ + ostringstream os; + os << "splitVector command failed: " << result; + uassert( 13345 , os.str() , 0 ); + } + + BSONObjIterator it( result.getObjectField( "splitKeys" ) ); + while ( it.more() ){ + splitPoints->push_back( it.next().Obj().getOwned() ); + } conn.done(); - - return median.getOwned(); } - Chunk * Chunk::split(){ - return split( pickSplitPoint() ); + ChunkPtr Chunk::split(){ + vector<BSONObj> splitPoints; + splitPoints.push_back( pickSplitPoint() ); + return multiSplit( splitPoints ); } - Chunk * Chunk::split( const BSONObj& m ){ - uassert( 10165 , "can't split as shard that doesn't have a manager" , _manager ); - - log(1) << " before split on: " << m << "\n" - << "\t self : " << toString() << endl; + ChunkPtr Chunk::multiSplit( const vector<BSONObj>& m ){ + const size_t maxSplitPoints = 256; - uassert( 10166 , "locking namespace on server failed" , lockNamespaceOnServer( getShard() , _ns ) ); - uassert( 13003 , "can't split chunk. does it have only one distinct value?" 
, - !m.isEmpty() && _min.woCompare(m) && _max.woCompare(m)); + uassert( 10165 , "can't split as shard doesn't have a manager" , _manager ); + uassert( 13332 , "need a split key to split chunk" , !m.empty() ); + uassert( 13333 , "can't split a chunk in that many parts", m.size() < maxSplitPoints ); + uassert( 13003 , "can't split a chunk with only one distinct value" , _min.woCompare(_max) ); - Chunk * s = new Chunk( _manager ); - s->_ns = _ns; - s->_shard = _shard; - s->setMin(m.getOwned()); - s->setMax(_max); + DistributedLock lockSetup( ConnectionString( modelServer() , ConnectionString::SYNC ) , getns() ); + dist_lock_try dlk( &lockSetup , string("split-") + toString() ); + uassert( 10166 , "locking namespace failed" , dlk.got() ); - s->_markModified(); + { + ShardChunkVersion onServer = getVersionOnConfigServer(); + ShardChunkVersion mine = _lastmod; + if ( onServer > mine ){ + stringstream ss; + ss << "mulitSplit failing because config not up to date" + << " onServer: " << onServer.toString() + << " mine: " << mine.toString(); + + //reload config + grid.getDBConfig(_manager->_ns)->getChunkManager(_manager->_ns, true); + + uasserted( 13387 , ss.str() ); + } + } + + BSONObjBuilder detail; + appendShortVersion( "before" , detail ); + log(1) << "before split on " << m.size() << " points " << toString() << endl; + + // Iterate over the split points in 'm', splitting off a new chunk per entry. That chunk's range + // covers until the next entry in 'm' or _max . + vector<ChunkPtr> newChunks; + vector<BSONObj>::const_iterator i = m.begin(); + BSONObj nextPoint = i->getOwned(); _markModified(); + do { + BSONObj splitPoint = nextPoint; + log(4) << "splitPoint: " << splitPoint << endl; + nextPoint = (++i != m.end()) ? i->getOwned() : _max.getOwned(); + log(4) << "nextPoint: " << nextPoint << endl; + + if ( nextPoint <= splitPoint) { + stringstream ss; + ss << "multiSplit failing because keys min: " << splitPoint << " and max: " << nextPoint + << " do not define a valid chunk"; + uasserted( 13395, ss.str() ); + } + + ChunkPtr s( new Chunk( _manager, splitPoint , nextPoint , _shard) ); + s->_markModified(); + newChunks.push_back(s); + } while ( i != m.end() ); + + // Have the chunk manager reflect the key change for the first chunk and create an entry for every + // new chunk spawned by it. + { + rwlock lk( _manager->_lock , true ); + + setMax(m[0].getOwned()); + DEV assert( shared_from_this() ); + _manager->_chunkMap[_max] = shared_from_this(); + + for ( vector<ChunkPtr>::const_iterator it = newChunks.begin(); it != newChunks.end(); ++it ){ + ChunkPtr s = *it; + _manager->_chunkMap[s->getMax()] = s; + } + } - _manager->_chunks.push_back( s ); - - setMax(m.getOwned()); - - log(1) << " after split:\n" - << "\t left : " << toString() << "\n" - << "\t right: "<< s->toString() << endl; - - + log(1) << "after split adjusted range: " << toString() << endl; + for ( vector<ChunkPtr>::const_iterator it = newChunks.begin(); it != newChunks.end(); ++it ){ + ChunkPtr s = *it; + log(1) << "after split created new chunk: " << s->toString() << endl; + } + + // Save the new key boundaries in the configDB. _manager->save(); - - return s; - } - bool Chunk::moveAndCommit( const string& to , string& errmsg ){ - uassert( 10167 , "can't move shard to its current location!" , to != getShard() ); + // Log all these changes in the configDB's log. We log a simple split differently than a multi-split. 
+ if ( newChunks.size() == 1) { + appendShortVersion( "left" , detail ); + newChunks[0]->appendShortVersion( "right" , detail ); + configServer.logChange( "split" , _manager->getns(), detail.obj() ); + + } else { + BSONObj beforeDetailObj = detail.obj(); + BSONObj firstDetailObj = beforeDetailObj.getOwned(); + const int newChunksSize = newChunks.size(); + + BSONObjBuilder firstDetail; + firstDetail.appendElements( beforeDetailObj ); + firstDetail.append( "number" , 0 ); + firstDetail.append( "of" , newChunksSize ); + appendShortVersion( "chunk" , firstDetail ); + configServer.logChange( "multi-split" , _manager->getns() , firstDetail.obj() ); + + for ( int i=0; i < newChunksSize; i++ ){ + BSONObjBuilder chunkDetail; + chunkDetail.appendElements( beforeDetailObj ); + chunkDetail.append( "number", i+1 ); + chunkDetail.append( "of" , newChunksSize ); + newChunks[i]->appendShortVersion( "chunk" , chunkDetail ); + configServer.logChange( "multi-split" , _manager->getns() , chunkDetail.obj() ); + } + } + + return newChunks[0]; + } - log() << "moving chunk ns: " << _ns << " moving chunk: " << toString() << " " << _shard << " -> " << to << endl; + bool Chunk::moveAndCommit( const Shard& to , string& errmsg ){ + uassert( 10167 , "can't move shard to its current location!" , getShard() != to ); - string from = _shard; - ShardChunkVersion oldVersion = _manager->getVersion( from ); + log() << "moving chunk ns: " << _manager->getns() << " moving ( " << toString() << ") " << _shard.toString() << " -> " << to.toString() << endl; - BSONObj filter; - { - BSONObjBuilder b; - getFilter( b ); - filter = b.obj(); - } + Shard from = _shard; - ScopedDbConnection fromconn( from ); + ScopedDbConnection fromconn( from); - BSONObj startRes; + BSONObj res; bool worked = fromconn->runCommand( "admin" , - BSON( "movechunk.start" << _ns << - "from" << from << - "to" << to << - "filter" << filter + BSON( "moveChunk" << _manager->getns() << + "from" << from.getConnString() << + "to" << to.getConnString() << + "min" << _min << + "max" << _max << + "shardId" << genID() << + "configdb" << configServer.modelServer() ) , - startRes + res ); - if ( ! worked ){ - errmsg = (string)"movechunk.start failed: " + startRes.toString(); - fromconn.done(); - return false; - } - - // update config db - setShard( to ); - - // need to increment version # for old server - Chunk * randomChunkOnOldServer = _manager->findChunkOnServer( from ); - if ( randomChunkOnOldServer ) - randomChunkOnOldServer->_markModified(); - - _manager->save(); - - BSONObj finishRes; - { + fromconn.done(); - ShardChunkVersion newVersion = _manager->getVersion( from ); - if ( newVersion == 0 && oldVersion > 0 ){ - newVersion = oldVersion; - newVersion++; - _manager->save(); - } - else if ( newVersion <= oldVersion ){ - log() << "newVersion: " << newVersion << " oldVersion: " << oldVersion << endl; - uassert( 10168 , "version has to be higher" , newVersion > oldVersion ); - } - - BSONObjBuilder b; - b << "movechunk.finish" << _ns; - b << "to" << to; - b.appendTimestamp( "newVersion" , newVersion ); - b.append( startRes["finishToken"] ); - - worked = fromconn->runCommand( "admin" , - b.done() , - finishRes ); - } - - if ( ! 
worked ){ - errmsg = (string)"movechunk.finish failed: " + finishRes.toString(); - fromconn.done(); - return false; + if ( worked ){ + _manager->_reload(); + return true; } - fromconn.done(); - return true; + errmsg = res["errmsg"].String(); + errmsg += " " + res.toString(); + return false; } bool Chunk::splitIfShould( long dataWritten ){ + LastError::Disabled d( lastError.get() ); + try { + return _splitIfShould( dataWritten ); + } + catch ( std::exception& e ){ + log( LL_ERROR ) << "splitIfShould failed: " << e.what() << endl; + return false; + } + } + + bool Chunk::_splitIfShould( long dataWritten ){ _dataWritten += dataWritten; - if ( _dataWritten < MaxChunkSize / 5 ) + // split faster in early chunks helps spread out an initial load better + int splitThreshold; + const int minChunkSize = 1 << 20; // 1 MBytes + int numChunks = getManager()->numChunks(); + if ( numChunks < 10 ){ + splitThreshold = max( MaxChunkSize / 4 , minChunkSize ); + } else if ( numChunks < 20 ){ + splitThreshold = max( MaxChunkSize / 2 , minChunkSize ); + } else { + splitThreshold = max( MaxChunkSize , minChunkSize ); + } + + if ( minIsInf() || maxIsInf() ){ + splitThreshold = (int) ((double)splitThreshold * .9); + } + + if ( _dataWritten < splitThreshold / 5 ) + return false; + + if ( ! chunkSplitLock.lock_try(0) ) return false; - log(1) << "\t want to split chunk : " << this << endl; + rwlock lk( chunkSplitLock , 1 , true ); + + log(3) << "\t splitIfShould : " << *this << endl; _dataWritten = 0; - BSONObj split_point = pickSplitPoint(); - if ( split_point.isEmpty() || _min == split_point || _max == split_point) { + BSONObj splitPoint = pickSplitPoint(); + if ( splitPoint.isEmpty() || _min == splitPoint || _max == splitPoint) { log() << "SHARD PROBLEM** shard is too big, but can't split: " << toString() << endl; return false; } long size = getPhysicalSize(); - if ( size < MaxChunkSize ) + if ( size < splitThreshold ) return false; - log() << "autosplitting " << _ns << " size: " << size << " shard: " << toString() << endl; - Chunk * newShard = split(split_point); + log() << "autosplitting " << _manager->getns() << " size: " << size << " shard: " << toString() + << " on: " << splitPoint << "(splitThreshold " << splitThreshold << ")" << endl; + + vector<BSONObj> splitPoints; + splitPoints.push_back( splitPoint ); + ChunkPtr newShard = multiSplit( splitPoints ); moveIfShould( newShard ); return true; } - bool Chunk::moveIfShould( Chunk * newChunk ){ - Chunk * toMove = 0; + bool Chunk::moveIfShould( ChunkPtr newChunk ){ + ChunkPtr toMove; - if ( newChunk->countObjects() <= 1 ){ + if ( newChunk->countObjects(2) <= 1 ){ toMove = newChunk; } - else if ( this->countObjects() <= 1 ){ - toMove = this; + else if ( this->countObjects(2) <= 1 ){ + DEV assert( shared_from_this() ); + toMove = shared_from_this(); } else { - log(1) << "don't know how to decide if i should move inner shard" << endl; + // moving middle shards is handled by balancer + return false; } - if ( ! toMove ) - return false; + assert( toMove ); - string newLocation = grid.pickShardForNewDB(); - if ( newLocation == getShard() ){ + Shard newLocation = Shard::pick(); + if ( getShard() == newLocation ){ // if this is the best server, then we shouldn't do anything! 
- log(1) << "not moving chunk: " << toString() << " b/c would move to same place " << newLocation << " -> " << getShard() << endl; + log(1) << "not moving chunk: " << toString() << " b/c would move to same place " << newLocation.toString() << " -> " << getShard().toString() << endl; return 0; } - log() << "moving chunk (auto): " << toMove->toString() << " to: " << newLocation << " #objcets: " << toMove->countObjects() << endl; + log() << "moving chunk (auto): " << toMove->toString() << " to: " << newLocation.toString() << " #objects: " << toMove->countObjects() << endl; string errmsg; massert( 10412 , (string)"moveAndCommit failed: " + errmsg , @@ -290,32 +430,43 @@ namespace mongo { } long Chunk::getPhysicalSize() const{ - ScopedDbConnection conn( getShard() ); + ScopedDbConnection conn( getShard().getConnString() ); BSONObj result; - uassert( 10169 , "datasize failed!" , conn->runCommand( "admin" , BSON( "datasize" << _ns - << "keyPattern" << _manager->getShardKey().key() - << "min" << getMin() - << "max" << getMax() - ) , result ) ); + uassert( 10169 , "datasize failed!" , conn->runCommand( "admin" , + BSON( "datasize" << _manager->getns() + << "keyPattern" << _manager->getShardKey().key() + << "min" << getMin() + << "max" << getMax() + << "maxSize" << ( MaxChunkSize + 1 ) + ) , result ) ); conn.done(); return (long)result["size"].number(); } - - long Chunk::countObjects( const BSONObj& filter ) const{ - ScopedDbConnection conn( getShard() ); - - BSONObj f = getFilter(); - if ( ! filter.isEmpty() ) - f = ClusteredCursor::concatQuery( f , filter ); + int Chunk::countObjects(int maxCount) const { + static const BSONObj fields = BSON("_id" << 1 ); - BSONObj result; - unsigned long long n = conn->count( _ns , f ); + ShardConnection conn( getShard() , _manager->getns() ); + // not using regular count as this is more flexible and supports $min/$max + Query q = Query().minKey(_min).maxKey(_max); + int n; + { + auto_ptr<DBClientCursor> c = conn->query(_manager->getns(), q, maxCount, 0, &fields); + assert( c.get() ); + n = c->itcount(); + } conn.done(); - return (long)n; + return n; + } + + void Chunk::appendShortVersion( const char * name , BSONObjBuilder& b ){ + BSONObjBuilder bb( b.subobjStart( name ) ); + bb.append( "min" , _min ); + bb.append( "max" , _max ); + bb.done(); } bool Chunk::operator==( const Chunk& s ) const{ @@ -325,81 +476,86 @@ namespace mongo { ; } - void Chunk::getFilter( BSONObjBuilder& b ) const{ - _manager->_key.getFilter( b , _min , _max ); - } - - void Chunk::serialize(BSONObjBuilder& to){ - if ( _lastmod ) + void Chunk::serialize(BSONObjBuilder& to,ShardChunkVersion myLastMod){ + + to.append( "_id" , genID( _manager->getns() , _min ) ); + + if ( myLastMod.isSet() ){ + to.appendTimestamp( "lastmod" , myLastMod ); + } + else if ( _lastmod.isSet() ){ + assert( _lastmod > 0 && _lastmod < 1000 ); to.appendTimestamp( "lastmod" , _lastmod ); - else - to.appendTimestamp( "lastmod" ); + } + else { + assert(0); + } - to << "ns" << _ns; + to << "ns" << _manager->getns(); to << "min" << _min; to << "max" << _max; - to << "shard" << _shard; + to << "shard" << _shard.getName(); + } + + string Chunk::genID( const string& ns , const BSONObj& o ) { + StringBuilder buf( ns.size() + o.objsize() + 16 ); + buf << ns << "-"; + + BSONObjIterator i(o); + while ( i.more() ){ + BSONElement e = i.next(); + buf << e.fieldName() << "_" << e.toString(false, true); + } + + return buf.str(); } void Chunk::unserialize(const BSONObj& from){ - _ns = from.getStringField( "ns" ); - _shard = 
from.getStringField( "shard" ); - _lastmod = from.hasField( "lastmod" ) ? from["lastmod"]._numberLong() : 0; + string ns = from.getStringField( "ns" ); + _shard.reset( from.getStringField( "shard" ) ); + + _lastmod = from["lastmod"]; + assert( _lastmod > 0 ); BSONElement e = from["minDotted"]; - cout << from << endl; + if (e.eoo()){ _min = from.getObjectField( "min" ).getOwned(); _max = from.getObjectField( "max" ).getOwned(); - } else { // TODO delete this case after giving people a chance to migrate + } + else { // TODO delete this case after giving people a chance to migrate _min = e.embeddedObject().getOwned(); _max = from.getObjectField( "maxDotted" ).getOwned(); } - uassert( 10170 , "Chunk needs a ns" , ! _ns.empty() ); - uassert( 10171 , "Chunk needs a server" , ! _ns.empty() ); + uassert( 10170 , "Chunk needs a ns" , ! ns.empty() ); + uassert( 13327 , "Chunk ns must match server ns" , ns == _manager->getns() ); + + uassert( 10171 , "Chunk needs a server" , _shard.ok() ); uassert( 10172 , "Chunk needs a min" , ! _min.isEmpty() ); uassert( 10173 , "Chunk needs a max" , ! _max.isEmpty() ); } - string Chunk::modelServer() { + string Chunk::modelServer() const { // TODO: this could move around? return configServer.modelServer(); } - void Chunk::_markModified(){ - _modified = true; - // set to 0 so that the config server sets it - _lastmod = 0; + ShardChunkVersion Chunk::getVersionOnConfigServer() const { + ScopedDbConnection conn( modelServer() ); + BSONObj o = conn->findOne( ShardNS::chunk , BSON( "_id" << genID() ) ); + conn.done(); + return o["lastmod"]; } - void Chunk::save( bool check ){ - bool reload = ! _lastmod; - Model::save( check ); - if ( reload ){ - // need to do this so that we get the new _lastMod and therefore version number - massert( 10413 , "_id has to be filled in already" , ! _id.isEmpty() ); - - string b = toString(); - BSONObj q = _id.copy(); - massert( 10414 , "how could load fail?" , load( q ) ); - log(2) << "before: " << q << "\t" << b << endl; - log(2) << "after : " << _id << "\t" << toString() << endl; - massert( 10415 , "chunk reload changed content!" , b == toString() ); - massert( 10416 , "id changed!" 
, q["_id"] == _id["_id"] ); - } - } - - void Chunk::ensureIndex(){ - ScopedDbConnection conn( getShard() ); - conn->ensureIndex( _ns , _manager->getShardKey().key() , _manager->_unique ); - conn.done(); + void Chunk::_markModified(){ + _modified = true; } string Chunk::toString() const { stringstream ss; - ss << "shard ns:" << _ns << " shard: " << _shard << " min: " << _min << " max: " << _max; + ss << "ns:" << _manager->getns() << " at: " << _shard.toString() << " lastmod: " << _lastmod.toString() << " min: " << _min << " max: " << _max; return ss.str(); } @@ -410,139 +566,291 @@ namespace mongo { // ------- ChunkManager -------- - unsigned long long ChunkManager::NextSequenceNumber = 1; + AtomicUInt ChunkManager::NextSequenceNumber = 1; ChunkManager::ChunkManager( DBConfig * config , string ns , ShardKeyPattern pattern , bool unique ) : - _config( config ) , _ns( ns ) , _key( pattern ) , _unique( unique ){ - Chunk temp(0); + _config( config ) , _ns( ns ) , + _key( pattern ) , _unique( unique ) , + _sequenceNumber( ++NextSequenceNumber ), _lock("rw:ChunkManager") + { + _reload_inlock(); + + if ( _chunkMap.empty() ){ + ChunkPtr c( new Chunk(this, _key.globalMin(), _key.globalMax(), config->getPrimary()) ); + c->_markModified(); + + _chunkMap[c->getMax()] = c; + _chunkRanges.reloadAll(_chunkMap); + + _shards.insert(c->getShard()); + + save_inlock(); + log() << "no chunks for:" << ns << " so creating first: " << c->toString() << endl; + } + } + + ChunkManager::~ChunkManager(){ + _chunkMap.clear(); + _chunkRanges.clear(); + _shards.clear(); + } + + void ChunkManager::_reload(){ + rwlock lk( _lock , true ); + _reload_inlock(); + } + + void ChunkManager::_reload_inlock(){ + int tries = 3; + while (tries--){ + _chunkMap.clear(); + _chunkRanges.clear(); + _shards.clear(); + _load(); + + if (_isValid()){ + _chunkRanges.reloadAll(_chunkMap); + return; + } + + if (_chunkMap.size() < 10){ + _printChunks(); + } + sleepmillis(10 * (3-tries)); + sleepsecs(10); + } + msgasserted(13282, "Couldn't load a valid config for " + _ns + " after 3 tries. Giving up"); + + } + + void ChunkManager::_load(){ + static Chunk temp(0); ScopedDbConnection conn( temp.modelServer() ); - auto_ptr<DBClientCursor> cursor = conn->query( temp.getNS() , BSON( "ns" << ns ) ); + + auto_ptr<DBClientCursor> cursor = conn->query(temp.getNS(), QUERY("ns" << _ns).sort("lastmod",1), 0, 0, 0, 0, + (DEBUG_BUILD ? 2 : 1000000)); // batch size. 
Try to induce potential race conditions in debug builds + assert( cursor.get() ); while ( cursor->more() ){ BSONObj d = cursor->next(); if ( d["isMaxMarker"].trueValue() ){ continue; } - - Chunk * c = new Chunk( this ); + + ChunkPtr c( new Chunk( this ) ); c->unserialize( d ); - _chunks.push_back( c ); - c->_id = d["_id"].wrap().getOwned(); + + _chunkMap[c->getMax()] = c; + _shards.insert(c->getShard()); + } conn.done(); - - if ( _chunks.size() == 0 ){ - Chunk * c = new Chunk( this ); - c->_ns = ns; - c->setMin(_key.globalMin()); - c->setMax(_key.globalMax()); - c->_shard = config->getPrimary(); - c->_markModified(); - - _chunks.push_back( c ); - - log() << "no chunks for:" << ns << " so creating first: " << c->toString() << endl; + } + + bool ChunkManager::_isValid() const { +#define ENSURE(x) do { if(!(x)) { log() << "ChunkManager::_isValid failed: " #x << endl; return false; } } while(0) + + if (_chunkMap.empty()) + return true; + + // Check endpoints + ENSURE(allOfType(MinKey, _chunkMap.begin()->second->getMin())); + ENSURE(allOfType(MaxKey, prior(_chunkMap.end())->second->getMax())); + + // Make sure there are no gaps or overlaps + for (ChunkMap::const_iterator it=boost::next(_chunkMap.begin()), end=_chunkMap.end(); it != end; ++it){ + ChunkMap::const_iterator last = prior(it); + + if (!(it->second->getMin() == last->second->getMax())){ + PRINT(it->second->toString()); + PRINT(it->second->getMin()); + PRINT(last->second->getMax()); + } + ENSURE(it->second->getMin() == last->second->getMax()); } - _sequenceNumber = ++NextSequenceNumber; + return true; + +#undef ENSURE } - - ChunkManager::~ChunkManager(){ - for ( vector<Chunk*>::iterator i=_chunks.begin(); i != _chunks.end(); i++ ){ - delete( *i ); + + void ChunkManager::_printChunks() const { + for (ChunkMap::const_iterator it=_chunkMap.begin(), end=_chunkMap.end(); it != end; ++it) { + log() << *it->second << endl; } - _chunks.clear(); } bool ChunkManager::hasShardKey( const BSONObj& obj ){ return _key.hasShardKey( obj ); } - Chunk& ChunkManager::findChunk( const BSONObj & obj ){ + ChunkPtr ChunkManager::findChunk( const BSONObj & obj , bool retry ){ + BSONObj key = _key.extractKey(obj); - for ( vector<Chunk*>::iterator i=_chunks.begin(); i != _chunks.end(); i++ ){ - Chunk * c = *i; - if ( c->contains( obj ) ) - return *c; + { + rwlock lk( _lock , false ); + + BSONObj foo; + ChunkPtr c; + { + ChunkMap::iterator it = _chunkMap.upper_bound(key); + if (it != _chunkMap.end()){ + foo = it->first; + c = it->second; + } + } + + if ( c ){ + if ( c->contains( obj ) ) + return c; + + PRINT(foo); + PRINT(*c); + PRINT(key); + + _reload_inlock(); + massert(13141, "Chunk map pointed to incorrect chunk", false); + } } - stringstream ss; - ss << "couldn't find a chunk which should be impossible extracted: " << _key.extractKey( obj ); - throw UserException( 8070 , ss.str() ); - } - Chunk* ChunkManager::findChunkOnServer( const string& server ) const { + if ( retry ){ + stringstream ss; + ss << "couldn't find a chunk aftry retry which should be impossible extracted: " << key; + throw UserException( 8070 , ss.str() ); + } + + log() << "ChunkManager: couldn't find chunk for: " << key << " going to retry" << endl; + _reload_inlock(); + return findChunk( obj , true ); + } - for ( vector<Chunk*>::const_iterator i=_chunks.begin(); i!=_chunks.end(); i++ ){ - Chunk * c = *i; - if ( c->getShard() == server ) + ChunkPtr ChunkManager::findChunkOnServer( const Shard& shard ) const { + rwlock lk( _lock , false ); + + for ( ChunkMap::const_iterator 
i=_chunkMap.begin(); i!=_chunkMap.end(); ++i ){ + ChunkPtr c = i->second; + if ( c->getShard() == shard ) return c; } - return 0; + return ChunkPtr(); } - int ChunkManager::getChunksForQuery( vector<Chunk*>& chunks , const BSONObj& query ){ - int added = 0; - - for ( vector<Chunk*>::iterator i=_chunks.begin(); i != _chunks.end(); i++ ){ - Chunk * c = *i; - if ( _key.relevantForQuery( query , c ) ){ - chunks.push_back( c ); - added++; + void ChunkManager::getShardsForQuery( set<Shard>& shards , const BSONObj& query ){ + rwlock lk( _lock , false ); + DEV PRINT(query); + + //TODO look into FieldRangeSetOr + FieldRangeOrSet fros(_ns.c_str(), query, false); + uassert(13088, "no support for special queries yet", fros.getSpecial().empty()); + + do { + boost::scoped_ptr<FieldRangeSet> frs (fros.topFrs()); + { + // special case if most-significant field isn't in query + FieldRange range = frs->range(_key.key().firstElement().fieldName()); + if ( !range.nontrivial() ){ + DEV PRINT(range.nontrivial()); + getAllShards(shards); + return; + } + } + + BoundList ranges = frs->indexBounds(_key.key(), 1); + for (BoundList::const_iterator it=ranges.begin(), end=ranges.end(); it != end; ++it){ + BSONObj minObj = it->first.replaceFieldNames(_key.key()); + BSONObj maxObj = it->second.replaceFieldNames(_key.key()); + + DEV PRINT(minObj); + DEV PRINT(maxObj); + + ChunkRangeMap::const_iterator min, max; + min = _chunkRanges.upper_bound(minObj); + max = _chunkRanges.upper_bound(maxObj); + + assert(min != _chunkRanges.ranges().end()); + + // make max non-inclusive like end iterators + if(max != _chunkRanges.ranges().end()) + ++max; + + for (ChunkRangeMap::const_iterator it=min; it != max; ++it){ + shards.insert(it->second->getShard()); + } + + // once we know we need to visit all shards no need to keep looping + //if (shards.size() == _shards.size()) + //return; } + + if (fros.moreOrClauses()) + fros.popOrClause(); + + } while (fros.moreOrClauses()); + } + + void ChunkManager::getShardsForRange(set<Shard>& shards, const BSONObj& min, const BSONObj& max){ + uassert(13405, "min must have shard key", hasShardKey(min)); + uassert(13406, "max must have shard key", hasShardKey(max)); + + ChunkRangeMap::const_iterator it = _chunkRanges.upper_bound(min); + ChunkRangeMap::const_iterator end = _chunkRanges.lower_bound(max); + + for (; it!=end; ++ it){ + shards.insert(it->second->getShard()); + + // once we know we need to visit all shards no need to keep looping + if (shards.size() == _shards.size()) + break; } - return added; } - void ChunkManager::getAllServers( set<string>& allServers ){ - for ( vector<Chunk*>::iterator i=_chunks.begin(); i != _chunks.end(); i++ ){ - allServers.insert( (*i)->getShard() ); - } + void ChunkManager::getAllShards( set<Shard>& all ){ + rwlock lk( _lock , false ); + all.insert(_shards.begin(), _shards.end()); } - void ChunkManager::ensureIndex(){ - set<string> seen; - - for ( vector<Chunk*>::const_iterator i=_chunks.begin(); i!=_chunks.end(); i++ ){ - Chunk * c = *i; - if ( seen.count( c->getShard() ) ) - continue; - seen.insert( c->getShard() ); - c->ensureIndex(); + void ChunkManager::ensureIndex_inlock(){ + //TODO in parallel? 
+ for ( set<Shard>::const_iterator i=_shards.begin(); i!=_shards.end(); ++i ){ + ScopedDbConnection conn( i->getConnString() ); + conn->ensureIndex( getns() , getShardKey().key() , _unique ); + conn.done(); } } - void ChunkManager::drop(){ + void ChunkManager::drop( ChunkManagerPtr me ){ + rwlock lk( _lock , true ); + + configServer.logChange( "dropCollection.start" , _ns , BSONObj() ); + + DistributedLock lockSetup( ConnectionString( configServer.modelServer() , ConnectionString::SYNC ) , getns() ); + dist_lock_try dlk( &lockSetup , "drop" ); + uassert( 13331 , "locking namespace failed" , dlk.got() ); + uassert( 10174 , "config servers not all up" , configServer.allUp() ); - map<string,ShardChunkVersion> seen; + set<Shard> seen; log(1) << "ChunkManager::drop : " << _ns << endl; // lock all shards so no one can do a split/migrate - for ( vector<Chunk*>::const_iterator i=_chunks.begin(); i!=_chunks.end(); i++ ){ - Chunk * c = *i; - ShardChunkVersion& version = seen[ c->getShard() ]; - if ( version ) - continue; - version = lockNamespaceOnServer( c->getShard() , _ns ); - if ( version ) - continue; - - // rollback - uassert( 10175 , "don't know how to rollback locks b/c drop can't lock all shards" , 0 ); + for ( ChunkMap::const_iterator i=_chunkMap.begin(); i!=_chunkMap.end(); ++i ){ + ChunkPtr c = i->second; + seen.insert( c->getShard() ); } log(1) << "ChunkManager::drop : " << _ns << "\t all locked" << endl; // wipe my meta-data - _chunks.clear(); + _chunkMap.clear(); + _chunkRanges.clear(); + _shards.clear(); // delete data from mongod - for ( map<string,ShardChunkVersion>::iterator i=seen.begin(); i!=seen.end(); i++ ){ - string shard = i->first; - ScopedDbConnection conn( shard ); + for ( set<Shard>::iterator i=seen.begin(); i!=seen.end(); i++ ){ + ScopedDbConnection conn( *i ); conn->dropCollection( _ns ); conn.done(); } @@ -551,18 +859,16 @@ namespace mongo { // clean up database meta-data uassert( 10176 , "no sharding data?" , _config->removeSharding( _ns ) ); - _config->save(); - // remove chunk data - Chunk temp(0); + static Chunk temp(0); ScopedDbConnection conn( temp.modelServer() ); conn->remove( temp.getNS() , BSON( "ns" << _ns ) ); conn.done(); log(1) << "ChunkManager::drop : " << _ns << "\t removed chunk data" << endl; - for ( map<string,ShardChunkVersion>::iterator i=seen.begin(); i!=seen.end(); i++ ){ - ScopedDbConnection conn( i->first ); + for ( set<Shard>::iterator i=seen.begin(); i!=seen.end(); i++ ){ + ScopedDbConnection conn( *i ); BSONObj res; if ( ! 
setShardVersion( conn.conn() , _ns , 0 , true , res ) ) throw UserException( 8071 , (string)"OH KNOW, cleaning up after drop failed: " + res.toString() ); @@ -571,50 +877,156 @@ namespace mongo { log(1) << "ChunkManager::drop : " << _ns << "\t DONE" << endl; + configServer.logChange( "dropCollection" , _ns , BSONObj() ); } void ChunkManager::save(){ - ShardChunkVersion a = getVersion(); + rwlock lk( _lock , true ); + save_inlock(); + } + + void ChunkManager::save_inlock(){ + + ShardChunkVersion a = getVersion_inlock(); + assert( a > 0 || _chunkMap.size() <= 1 ); + ShardChunkVersion nextChunkVersion = a.incMajor(); + vector<ChunkPtr> toFix; + vector<ShardChunkVersion> newVersions; + + BSONObjBuilder cmdBuilder; + BSONArrayBuilder updates( cmdBuilder.subarrayStart( "applyOps" ) ); - set<string> withRealChunks; - for ( vector<Chunk*>::const_iterator i=_chunks.begin(); i!=_chunks.end(); i++ ){ - Chunk* c = *i; + int numOps = 0; + for ( ChunkMap::const_iterator i=_chunkMap.begin(); i!=_chunkMap.end(); ++i ){ + ChunkPtr c = i->second; if ( ! c->_modified ) continue; - c->save( true ); + + numOps++; _sequenceNumber = ++NextSequenceNumber; - withRealChunks.insert( c->getShard() ); + ShardChunkVersion myVersion = nextChunkVersion; + ++nextChunkVersion; + toFix.push_back( c ); + newVersions.push_back( myVersion ); + + BSONObjBuilder op; + op.append( "op" , "u" ); + op.appendBool( "b" , true ); + op.append( "ns" , ShardNS::chunk ); + + BSONObjBuilder n( op.subobjStart( "o" ) ); + c->serialize( n , myVersion ); + n.done(); + + BSONObjBuilder q( op.subobjStart( "o2" ) ); + q.append( "_id" , c->genID() ); + q.done(); + + updates.append( op.obj() ); } - massert( 10417 , "how did version get smalled" , getVersion() >= a ); + if ( numOps == 0 ) + return; + + updates.done(); + + if ( a > 0 || _chunkMap.size() > 1 ){ + BSONArrayBuilder temp( cmdBuilder.subarrayStart( "preCondition" ) ); + BSONObjBuilder b; + b.append( "ns" , ShardNS::chunk ); + b.append( "q" , BSON( "query" << BSON( "ns" << _ns ) << "orderby" << BSON( "lastmod" << -1 ) ) ); + { + BSONObjBuilder bb( b.subobjStart( "res" ) ); + bb.appendTimestamp( "lastmod" , a ); + bb.done(); + } + temp.append( b.obj() ); + temp.done(); + } + + BSONObj cmd = cmdBuilder.obj(); + + log(7) << "ChunkManager::save update: " << cmd << endl; + + ScopedDbConnection conn( Chunk(0).modelServer() ); + BSONObj res; + bool ok = conn->runCommand( "config" , cmd , res ); + conn.done(); + + if ( ! ok ){ + stringstream ss; + ss << "saving chunks failed. 
cmd: " << cmd << " result: " << res; + log( LL_ERROR ) << ss.str() << endl; + msgasserted( 13327 , ss.str() ); + } + + for ( unsigned i=0; i<toFix.size(); i++ ){ + toFix[i]->_lastmod = newVersions[i]; + } - ensureIndex(); // TODO: this is too aggressive - but not really sooo bad + massert( 10417 , "how did version get smalled" , getVersion_inlock() >= a ); + + ensureIndex_inlock(); // TODO: this is too aggressive - but not really sooo bad } - ShardChunkVersion ChunkManager::getVersion( const string& server ) const{ + void ChunkManager::maybeChunkCollection() { + uassert( 13346 , "can't pre-split already splitted collection" , (_chunkMap.size() == 1) ); + + ChunkPtr soleChunk = _chunkMap.begin()->second; + vector<BSONObj> splitPoints; + soleChunk->pickSplitVector( &splitPoints ); + if ( splitPoints.empty() ){ + log(1) << "not enough data to warrant chunking " << getns() << endl; + return; + } + + soleChunk->multiSplit( splitPoints ); + } + + ShardChunkVersion ChunkManager::getVersionOnConfigServer() const { + static Chunk temp(0); + + ScopedDbConnection conn( temp.modelServer() ); + + auto_ptr<DBClientCursor> cursor = conn->query(temp.getNS(), QUERY("ns" << _ns).sort("lastmod",1), 1 ); + assert( cursor.get() ); + BSONObj o; + if ( cursor->more() ) + o = cursor->next(); + conn.done(); + + return o["lastmod"]; + } + + ShardChunkVersion ChunkManager::getVersion( const Shard& shard ) const{ + rwlock lk( _lock , false ); // TODO: cache or something? ShardChunkVersion max = 0; - for ( vector<Chunk*>::const_iterator i=_chunks.begin(); i!=_chunks.end(); i++ ){ - Chunk* c = *i; - if ( c->getShard() != server ) + for ( ChunkMap::const_iterator i=_chunkMap.begin(); i!=_chunkMap.end(); ++i ){ + ChunkPtr c = i->second; + DEV assert( c ); + if ( c->getShard() != shard ) continue; - if ( c->_lastmod > max ) max = c->_lastmod; } - return max; } ShardChunkVersion ChunkManager::getVersion() const{ + rwlock lk( _lock , false ); + return getVersion_inlock(); + } + + ShardChunkVersion ChunkManager::getVersion_inlock() const{ ShardChunkVersion max = 0; - - for ( vector<Chunk*>::const_iterator i=_chunks.begin(); i!=_chunks.end(); i++ ){ - Chunk* c = *i; + + for ( ChunkMap::const_iterator i=_chunkMap.begin(); i!=_chunkMap.end(); ++i ){ + ChunkPtr c = i->second; if ( c->_lastmod > max ) max = c->_lastmod; } @@ -623,27 +1035,208 @@ namespace mongo { } string ChunkManager::toString() const { + rwlock lk( _lock , false ); + stringstream ss; - ss << "ChunkManager: " << _ns << " key:" << _key.toString() << "\n"; - for ( vector<Chunk*>::const_iterator i=_chunks.begin(); i!=_chunks.end(); i++ ){ - const Chunk* c = *i; - ss << "\t" << c->toString() << "\n"; + ss << "ChunkManager: " << _ns << " key:" << _key.toString() << '\n'; + for ( ChunkMap::const_iterator i=_chunkMap.begin(); i!=_chunkMap.end(); ++i ){ + const ChunkPtr c = i->second; + ss << "\t" << c->toString() << '\n'; } return ss.str(); } + + void ChunkManager::_migrationNotification(Chunk* c){ + _chunkRanges.reloadRange(_chunkMap, c->getMin(), c->getMax()); + _shards.insert(c->getShard()); + } + + void ChunkRangeManager::assertValid() const{ + if (_ranges.empty()) + return; + + try { + // No Nulls + for (ChunkRangeMap::const_iterator it=_ranges.begin(), end=_ranges.end(); it != end; ++it){ + assert(it->second); + } + + // Check endpoints + assert(allOfType(MinKey, _ranges.begin()->second->getMin())); + assert(allOfType(MaxKey, prior(_ranges.end())->second->getMax())); + + // Make sure there are no gaps or overlaps + for (ChunkRangeMap::const_iterator 
it=boost::next(_ranges.begin()), end=_ranges.end(); it != end; ++it){ + ChunkRangeMap::const_iterator last = prior(it); + assert(it->second->getMin() == last->second->getMax()); + } + + // Check Map keys + for (ChunkRangeMap::const_iterator it=_ranges.begin(), end=_ranges.end(); it != end; ++it){ + assert(it->first == it->second->getMax()); + } + + // Make sure we match the original chunks + const ChunkMap chunks = _ranges.begin()->second->getManager()->_chunkMap; + for ( ChunkMap::const_iterator i=chunks.begin(); i!=chunks.end(); ++i ){ + const ChunkPtr chunk = i->second; + + ChunkRangeMap::const_iterator min = _ranges.upper_bound(chunk->getMin()); + ChunkRangeMap::const_iterator max = _ranges.lower_bound(chunk->getMax()); + + assert(min != _ranges.end()); + assert(max != _ranges.end()); + assert(min == max); + assert(min->second->getShard() == chunk->getShard()); + assert(min->second->contains( chunk->getMin() )); + assert(min->second->contains( chunk->getMax() ) || (min->second->getMax() == chunk->getMax())); + } + + } catch (...) { + log( LL_ERROR ) << "\t invalid ChunkRangeMap! printing ranges:" << endl; + + for (ChunkRangeMap::const_iterator it=_ranges.begin(), end=_ranges.end(); it != end; ++it) + cout << it->first << ": " << *it->second << endl; + + throw; + } + } + + void ChunkRangeManager::reloadRange(const ChunkMap& chunks, const BSONObj& min, const BSONObj& max){ + if (_ranges.empty()){ + reloadAll(chunks); + return; + } + + ChunkRangeMap::iterator low = _ranges.upper_bound(min); + ChunkRangeMap::iterator high = _ranges.lower_bound(max); + + assert(low != _ranges.end()); + assert(high != _ranges.end()); + assert(low->second); + assert(high->second); + + ChunkMap::const_iterator begin = chunks.upper_bound(low->second->getMin()); + ChunkMap::const_iterator end = chunks.lower_bound(high->second->getMax()); + + assert(begin != chunks.end()); + assert(end != chunks.end()); + + // C++ end iterators are one-past-last + ++high; + ++end; + + // update ranges + _ranges.erase(low, high); // invalidates low + _insertRange(begin, end); + + assert(!_ranges.empty()); + DEV assertValid(); + + // merge low-end if possible + low = _ranges.upper_bound(min); + assert(low != _ranges.end()); + if (low != _ranges.begin()){ + shared_ptr<ChunkRange> a = prior(low)->second; + shared_ptr<ChunkRange> b = low->second; + if (a->getShard() == b->getShard()){ + shared_ptr<ChunkRange> cr (new ChunkRange(*a, *b)); + _ranges.erase(prior(low)); + _ranges.erase(low); // invalidates low + _ranges[cr->getMax()] = cr; + } + } + + DEV assertValid(); + + // merge high-end if possible + high = _ranges.lower_bound(max); + if (high != prior(_ranges.end())){ + shared_ptr<ChunkRange> a = high->second; + shared_ptr<ChunkRange> b = boost::next(high)->second; + if (a->getShard() == b->getShard()){ + shared_ptr<ChunkRange> cr (new ChunkRange(*a, *b)); + _ranges.erase(boost::next(high)); + _ranges.erase(high); //invalidates high + _ranges[cr->getMax()] = cr; + } + } + + DEV assertValid(); + } + + void ChunkRangeManager::reloadAll(const ChunkMap& chunks){ + _ranges.clear(); + _insertRange(chunks.begin(), chunks.end()); + + DEV assertValid(); + } + + void ChunkRangeManager::_insertRange(ChunkMap::const_iterator begin, const ChunkMap::const_iterator end){ + while (begin != end){ + ChunkMap::const_iterator first = begin; + Shard shard = first->second->getShard(); + while (begin != end && (begin->second->getShard() == shard)) + ++begin; + + shared_ptr<ChunkRange> cr (new ChunkRange(first, begin)); + _ranges[cr->getMax()] = cr; + } + 
     }
 
     class ChunkObjUnitTest : public UnitTest {
     public:
         void runShard(){
-
+            ChunkPtr c;
+            assert( ! c );
+            c.reset( new Chunk( 0 ) );
+            assert( c );
         }
 
+        void runShardChunkVersion(){
+            vector<ShardChunkVersion> all;
+            all.push_back( ShardChunkVersion(1,1) );
+            all.push_back( ShardChunkVersion(1,2) );
+            all.push_back( ShardChunkVersion(2,1) );
+            all.push_back( ShardChunkVersion(2,2) );
+
+            for ( unsigned i=0; i<all.size(); i++ ){
+                for ( unsigned j=i+1; j<all.size(); j++ ){
+                    assert( all[i] < all[j] );
+                }
+            }
+
+        }
+
         void run(){
             runShard();
+            runShardChunkVersion();
             log(1) << "shardObjTest passed" << endl;
         }
     } shardObjTest;
 
+    // ----- to be removed ---
+    extern OID serverID;
+    bool setShardVersion( DBClientBase & conn , const string& ns , ShardChunkVersion version , bool authoritative , BSONObj& result ){
+        BSONObjBuilder cmdBuilder;
+        cmdBuilder.append( "setShardVersion" , ns.c_str() );
+        cmdBuilder.append( "configdb" , configServer.modelServer() );
+        cmdBuilder.appendTimestamp( "version" , version.toLong() );
+        cmdBuilder.appendOID( "serverID" , &serverID );
+        if ( authoritative )
+            cmdBuilder.appendBool( "authoritative" , 1 );
+
+        Shard s = Shard::make( conn.getServerAddress() );
+        cmdBuilder.append( "shard" , s.getName() );
+        cmdBuilder.append( "shardHost" , s.getConnString() );
+        BSONObj cmd = cmdBuilder.obj();
+
+        log(1) << " setShardVersion " << s.getName() << " " << conn.getServerAddress() << " " << ns << " " << cmd << " " << &conn << endl;
+
+        return conn.runCommand( "admin" , cmd , result );
+    }
+
+
 } // namespace mongo
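Note on the autosplit heuristic added in this patch: Chunk::_splitIfShould() no longer compares against a fixed MaxChunkSize. The effective split threshold now grows with the number of chunks in the collection (MaxChunkSize/4 below 10 chunks, MaxChunkSize/2 below 20, the full size afterwards, with a 1 MB floor) and is relaxed by 10% for chunks that touch MinKey or MaxKey. The following is a minimal standalone sketch of that calculation only; the free-function form, the name computeSplitThreshold, and the values in main() are illustrative and not part of the patch.

    #include <algorithm>
    #include <iostream>

    // Sketch of the graduated threshold used by Chunk::_splitIfShould().
    // maxChunkSize mirrors Chunk::MaxChunkSize (bytes), numChunks mirrors
    // ChunkManager::numChunks(), and edgeChunk is true when minIsInf() or
    // maxIsInf() holds for the chunk.
    long computeSplitThreshold( long maxChunkSize , int numChunks , bool edgeChunk ){
        const long minChunkSize = 1 << 20; // 1 MB floor

        long threshold;
        if ( numChunks < 10 )
            threshold = std::max( maxChunkSize / 4 , minChunkSize ); // split young collections sooner
        else if ( numChunks < 20 )
            threshold = std::max( maxChunkSize / 2 , minChunkSize );
        else
            threshold = std::max( maxChunkSize , minChunkSize );

        if ( edgeChunk )
            threshold = (long)( threshold * 0.9 ); // edge chunks absorb appends, split a bit earlier

        return threshold;
    }

    int main(){
        // A young collection (5 chunks, 200 MB MaxChunkSize) splits at ~50 MB,
        // a mature one (25 chunks) waits for the full 200 MB.
        std::cout << computeSplitThreshold( 200L * 1024 * 1024 , 5  , false ) << std::endl; // 52428800
        std::cout << computeSplitThreshold( 200L * 1024 * 1024 , 25 , false ) << std::endl; // 209715200
        return 0;
    }

In the patch itself a split is only attempted once the bytes written since the last check exceed one fifth of this threshold, and the chunk is actually split only if its reported physical size also exceeds the threshold.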
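A second structural change worth calling out: the rewritten ChunkManager keeps its chunks in a map keyed by each chunk's exclusive max bound (_chunkMap[c->getMax()] = c), so ChunkManager::findChunk() locates the owner of a key with a single upper_bound() lookup instead of the old linear scan. A toy sketch of the lookup idea with integer keys; the names, the std::map<int, std::string> shape, and the shard labels are illustrative only.

    #include <cassert>
    #include <map>
    #include <string>

    int main(){
        // Chunks cover half-open ranges [min, max); the map key is each chunk's
        // max bound, mirroring _chunkMap keyed by Chunk::getMax().
        std::map<int, std::string> chunkByMax;
        chunkByMax[10] = "chunk [-inf, 10) on shardA";
        chunkByMax[20] = "chunk [10, 20) on shardB";
        chunkByMax[30] = "chunk [20, +inf) on shardA";

        // upper_bound(k) returns the first chunk whose max is strictly greater
        // than k, i.e. the chunk whose half-open range contains k.
        assert( chunkByMax.upper_bound(5)->second  == "chunk [-inf, 10) on shardA" );
        assert( chunkByMax.upper_bound(10)->second == "chunk [10, 20) on shardB" );
        assert( chunkByMax.upper_bound(25)->second == "chunk [20, +inf) on shardA" );
        return 0;
    }

If the chunk found this way does not actually contain the key, the patch reloads the chunk map from the config server and retries once before asserting, as seen in findChunk() above.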