Diffstat (limited to 's/chunk.cpp')
-rw-r--r--  s/chunk.cpp | 1203
1 file changed, 898 insertions(+), 305 deletions(-)
diff --git a/s/chunk.cpp b/s/chunk.cpp
index 73d17d9..5df3b69 100644
--- a/s/chunk.cpp
+++ b/s/chunk.cpp
@@ -16,28 +16,52 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#include "stdafx.h"
+#include "pch.h"
#include "chunk.h"
#include "config.h"
+#include "grid.h"
#include "../util/unittest.h"
#include "../client/connpool.h"
+#include "../client/distlock.h"
+#include "../db/queryutil.h"
#include "cursors.h"
#include "strategy.h"
namespace mongo {
+ inline bool allOfType(BSONType type, const BSONObj& o){
+ BSONObjIterator it(o);
+ while(it.more()){
+ if (it.next().type() != type)
+ return false;
+ }
+ return true;
+ }
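
allOfType() is used by the validity checks later in this diff (ChunkManager::_isValid and ChunkRangeManager::assertValid) to confirm that the first chunk's min and the last chunk's max are pure $minKey/$maxKey documents. A minimal standalone restatement of the helper, with a toy enum standing in for BSONType:

    // Sketch: true iff every field of the "document" carries the given
    // type tag (toy enum and vector in place of BSONType/BSONObj).
    #include <cassert>
    #include <vector>

    enum ToyType { ToyMinKey, ToyMaxKey, ToyNumber };

    bool allOfType(ToyType type, const std::vector<ToyType>& doc) {
        for (size_t i = 0; i < doc.size(); i++)
            if (doc[i] != type)
                return false;
        return true;
    }

    int main() {
        std::vector<ToyType> doc(2, ToyMinKey);   // e.g. { a: MinKey, b: MinKey }
        assert(allOfType(ToyMinKey, doc));
        doc.push_back(ToyNumber);                 // mixed types now
        assert(!allOfType(ToyMinKey, doc));
    }
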
+
+ RWLock chunkSplitLock("rw:chunkSplitLock");
+
// ------- Shard --------
- int Chunk::MaxChunkSize = 1024 * 1204 * 200;
+ int Chunk::MaxChunkSize = 1024 * 1024 * 200;
- Chunk::Chunk( ChunkManager * manager ) : _manager( manager ){
- _modified = false;
- _lastmod = 0;
- _dataWritten = 0;
+ Chunk::Chunk( ChunkManager * manager )
+ : _manager(manager),
+ _lastmod(0), _modified(false), _dataWritten(0)
+ {}
+
+ Chunk::Chunk(ChunkManager * info , const BSONObj& min, const BSONObj& max, const Shard& shard)
+ : _manager(info), _min(min), _max(max), _shard(shard),
+ _lastmod(0), _modified(false), _dataWritten(0)
+ {}
+
+ string Chunk::getns() const {
+ assert( _manager );
+ return _manager->getns();
}
- void Chunk::setShard( string s ){
+ void Chunk::setShard( const Shard& s ){
_shard = s;
+ _manager->_migrationNotification(this);
_markModified();
}
@@ -47,18 +71,33 @@ namespace mongo {
_manager->getShardKey().compare( obj , getMax() ) < 0;
}
+ bool ChunkRange::contains(const BSONObj& obj) const {
+ // same as Chunk method
+ return
+ _manager->getShardKey().compare( getMin() , obj ) <= 0 &&
+ _manager->getShardKey().compare( obj , getMax() ) < 0;
+ }
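
Both contains() methods test a half-open interval, min <= key < max. That convention is what lets adjacent chunks tile the key space with no gaps and no double ownership, and the split and lookup code below relies on it. A minimal illustration with toy int keys:

    // Sketch: half-open containment. Adjacent ranges [a,b) and [b,c)
    // share no keys, and every key lands in exactly one range.
    #include <cassert>

    bool contains(int min, int max, int key) { return min <= key && key < max; }

    int main() {
        assert(contains(0, 10, 0));     // lower bound inclusive
        assert(!contains(0, 10, 10));   // upper bound exclusive...
        assert(contains(10, 20, 10));   // ...10 belongs to the next range
    }
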
+
+ bool Chunk::minIsInf() const {
+ return _manager->getShardKey().globalMin().woCompare( getMin() ) == 0;
+ }
+
+ bool Chunk::maxIsInf() const {
+ return _manager->getShardKey().globalMax().woCompare( getMax() ) == 0;
+ }
+
BSONObj Chunk::pickSplitPoint() const{
int sort = 0;
- if ( _manager->getShardKey().globalMin().woCompare( getMin() ) == 0 ){
+ if ( minIsInf() ){
sort = 1;
}
- else if ( _manager->getShardKey().globalMax().woCompare( getMax() ) == 0 ){
+ else if ( maxIsInf() ){
sort = -1;
}
if ( sort ){
- ScopedDbConnection conn( getShard() );
+ ShardConnection conn( getShard().getConnString() , _manager->getns() );
Query q;
if ( sort == 1 )
q.sort( _manager->getShardKey().key() );
@@ -75,212 +114,313 @@ namespace mongo {
q.sort( r.obj() );
}
- BSONObj end = conn->findOne( _ns , q );
+ BSONObj end = conn->findOne( _manager->getns() , q );
conn.done();
if ( ! end.isEmpty() )
return _manager->getShardKey().extractKey( end );
}
- ScopedDbConnection conn( getShard() );
+ BSONObj cmd = BSON( "medianKey" << _manager->getns()
+ << "keyPattern" << _manager->getShardKey().key()
+ << "min" << getMin()
+ << "max" << getMax() );
+
+ ScopedDbConnection conn( getShard().getConnString() );
BSONObj result;
- if ( ! conn->runCommand( "admin" , BSON( "medianKey" << _ns
- << "keyPattern" << _manager->getShardKey().key()
- << "min" << getMin()
- << "max" << getMax()
- ) , result ) ){
+ if ( ! conn->runCommand( "admin" , cmd , result ) ){
stringstream ss;
ss << "medianKey command failed: " << result;
uassert( 10164 , ss.str() , 0 );
}
- BSONObj median = result.getObjectField( "median" );
- if (median == getMin()){
- //TODO compound support
- BSONElement key = getMin().firstElement();
- BSONObjBuilder b;
- b.appendAs("$gt", key);
+ BSONObj median = result.getObjectField( "median" ).getOwned();
+ conn.done();
- Query q = QUERY(key.fieldName() << b.obj());
+
+ if (median == getMin()){
+ Query q;
+ q.minKey(_min).maxKey(_max);
q.sort(_manager->getShardKey().key());
- median = conn->findOne(_ns, q);
+ median = conn->findOne(_manager->getns(), q);
median = _manager->getShardKey().extractKey( median );
- PRINT(median);
}
+
+ if ( median < getMin() || median >= getMax() ){
+ stringstream ss;
+ ss << "medianKey returned value out of range. "
+ << " cmd: " << cmd
+ << " result: " << result;
+ uasserted( 13394 , ss.str() );
+ }
+
+ return median;
+ }
+ void Chunk::pickSplitVector( vector<BSONObj>* splitPoints ) const {
+ // Ask the mongod holding this chunk to figure out the split points.
+ ScopedDbConnection conn( getShard().getConnString() );
+ BSONObj result;
+ BSONObjBuilder cmd;
+ cmd.append( "splitVector" , _manager->getns() );
+ cmd.append( "keyPattern" , _manager->getShardKey().key() );
+ cmd.append( "maxChunkSize" , Chunk::MaxChunkSize / (1<<20) );
+ BSONObj cmdObj = cmd.obj();
+
+ if ( ! conn->runCommand( "admin" , cmdObj , result )){
+ ostringstream os;
+ os << "splitVector command failed: " << result;
+ uassert( 13345 , os.str() , 0 );
+ }
+
+ BSONObjIterator it( result.getObjectField( "splitKeys" ) );
+ while ( it.more() ){
+ splitPoints->push_back( it.next().Obj().getOwned() );
+ }
conn.done();
-
- return median.getOwned();
}
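
pickSplitVector() delegates point selection to the mongod that owns the chunk via the splitVector command, which replies with a splitKeys array. A toy model of the idea, assuming a constant document size (the real command works from collection statistics on the shard):

    // Sketch: walk keys in shard-key order and emit a split point
    // whenever the running chunk would exceed maxChunkBytes.
    #include <cassert>
    #include <string>
    #include <vector>

    std::vector<std::string> splitVector(const std::vector<std::string>& keys,
                                         size_t avgDocBytes, size_t maxChunkBytes) {
        std::vector<std::string> points;
        size_t bytes = 0;
        for (size_t i = 0; i < keys.size(); i++) {
            bytes += avgDocBytes;
            if (bytes >= maxChunkBytes) {   // current chunk is "full": cut here
                points.push_back(keys[i]);
                bytes = 0;
            }
        }
        return points;
    }

    int main() {
        std::vector<std::string> keys;
        keys.push_back("a"); keys.push_back("b"); keys.push_back("c"); keys.push_back("d");
        assert(splitVector(keys, 512, 1024).size() == 2);   // cuts after "b" and "d"
    }
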
- Chunk * Chunk::split(){
- return split( pickSplitPoint() );
+ ChunkPtr Chunk::split(){
+ vector<BSONObj> splitPoints;
+ splitPoints.push_back( pickSplitPoint() );
+ return multiSplit( splitPoints );
}
- Chunk * Chunk::split( const BSONObj& m ){
- uassert( 10165 , "can't split as shard that doesn't have a manager" , _manager );
-
- log(1) << " before split on: " << m << "\n"
- << "\t self : " << toString() << endl;
+ ChunkPtr Chunk::multiSplit( const vector<BSONObj>& m ){
+ const size_t maxSplitPoints = 256;
- uassert( 10166 , "locking namespace on server failed" , lockNamespaceOnServer( getShard() , _ns ) );
- uassert( 13003 , "can't split chunk. does it have only one distinct value?" ,
- !m.isEmpty() && _min.woCompare(m) && _max.woCompare(m));
+ uassert( 10165 , "can't split as shard doesn't have a manager" , _manager );
+ uassert( 13332 , "need a split key to split chunk" , !m.empty() );
+ uassert( 13333 , "can't split a chunk in that many parts", m.size() < maxSplitPoints );
+ uassert( 13003 , "can't split a chunk with only one distinct value" , _min.woCompare(_max) );
- Chunk * s = new Chunk( _manager );
- s->_ns = _ns;
- s->_shard = _shard;
- s->setMin(m.getOwned());
- s->setMax(_max);
+ DistributedLock lockSetup( ConnectionString( modelServer() , ConnectionString::SYNC ) , getns() );
+ dist_lock_try dlk( &lockSetup , string("split-") + toString() );
+ uassert( 10166 , "locking namespace failed" , dlk.got() );
- s->_markModified();
+ {
+ ShardChunkVersion onServer = getVersionOnConfigServer();
+ ShardChunkVersion mine = _lastmod;
+ if ( onServer > mine ){
+ stringstream ss;
+ ss << "mulitSplit failing because config not up to date"
+ << " onServer: " << onServer.toString()
+ << " mine: " << mine.toString();
+
+ //reload config
+ grid.getDBConfig(_manager->_ns)->getChunkManager(_manager->_ns, true);
+
+ uasserted( 13387 , ss.str() );
+ }
+ }
+
+ BSONObjBuilder detail;
+ appendShortVersion( "before" , detail );
+ log(1) << "before split on " << m.size() << " points " << toString() << endl;
+
+ // Iterate over the split points in 'm', splitting off a new chunk per entry. That chunk's range
+ // covers until the next entry in 'm' or _max.
+ vector<ChunkPtr> newChunks;
+ vector<BSONObj>::const_iterator i = m.begin();
+ BSONObj nextPoint = i->getOwned();
_markModified();
+ do {
+ BSONObj splitPoint = nextPoint;
+ log(4) << "splitPoint: " << splitPoint << endl;
+ nextPoint = (++i != m.end()) ? i->getOwned() : _max.getOwned();
+ log(4) << "nextPoint: " << nextPoint << endl;
+
+ if ( nextPoint <= splitPoint) {
+ stringstream ss;
+ ss << "multiSplit failing because keys min: " << splitPoint << " and max: " << nextPoint
+ << " do not define a valid chunk";
+ uasserted( 13395, ss.str() );
+ }
+
+ ChunkPtr s( new Chunk( _manager, splitPoint , nextPoint , _shard) );
+ s->_markModified();
+ newChunks.push_back(s);
+ } while ( i != m.end() );
+
+ // Have the chunk manager reflect the key change for the first chunk and create an entry for every
+ // new chunk spawned by it.
+ {
+ rwlock lk( _manager->_lock , true );
+
+ setMax(m[0].getOwned());
+ DEV assert( shared_from_this() );
+ _manager->_chunkMap[_max] = shared_from_this();
+
+ for ( vector<ChunkPtr>::const_iterator it = newChunks.begin(); it != newChunks.end(); ++it ){
+ ChunkPtr s = *it;
+ _manager->_chunkMap[s->getMax()] = s;
+ }
+ }
- _manager->_chunks.push_back( s );
-
- setMax(m.getOwned());
-
- log(1) << " after split:\n"
- << "\t left : " << toString() << "\n"
- << "\t right: "<< s->toString() << endl;
-
-
+ log(1) << "after split adjusted range: " << toString() << endl;
+ for ( vector<ChunkPtr>::const_iterator it = newChunks.begin(); it != newChunks.end(); ++it ){
+ ChunkPtr s = *it;
+ log(1) << "after split created new chunk: " << s->toString() << endl;
+ }
+
+ // Save the new key boundaries in the configDB.
_manager->save();
-
- return s;
- }
- bool Chunk::moveAndCommit( const string& to , string& errmsg ){
- uassert( 10167 , "can't move shard to its current location!" , to != getShard() );
+ // Log all these changes in the configDB's log. We log a simple split differently than a multi-split.
+ if ( newChunks.size() == 1) {
+ appendShortVersion( "left" , detail );
+ newChunks[0]->appendShortVersion( "right" , detail );
+ configServer.logChange( "split" , _manager->getns(), detail.obj() );
+
+ } else {
+ BSONObj beforeDetailObj = detail.obj();
+ BSONObj firstDetailObj = beforeDetailObj.getOwned();
+ const int newChunksSize = newChunks.size();
+
+ BSONObjBuilder firstDetail;
+ firstDetail.appendElements( beforeDetailObj );
+ firstDetail.append( "number" , 0 );
+ firstDetail.append( "of" , newChunksSize );
+ appendShortVersion( "chunk" , firstDetail );
+ configServer.logChange( "multi-split" , _manager->getns() , firstDetail.obj() );
+
+ for ( int i=0; i < newChunksSize; i++ ){
+ BSONObjBuilder chunkDetail;
+ chunkDetail.appendElements( beforeDetailObj );
+ chunkDetail.append( "number", i+1 );
+ chunkDetail.append( "of" , newChunksSize );
+ newChunks[i]->appendShortVersion( "chunk" , chunkDetail );
+ configServer.logChange( "multi-split" , _manager->getns() , chunkDetail.obj() );
+ }
+ }
+
+ return newChunks[0];
+ }
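
In summary, multiSplit() keeps [_min, m[0]) for the original chunk and creates one new chunk per split point, each ending at the next point or at _max. A standalone sketch of that range arithmetic with toy int keys:

    // Sketch: sorted split points p1 < ... < pk carve one chunk
    // [min, max) into k+1 half-open pieces.
    #include <cassert>
    #include <utility>
    #include <vector>

    typedef std::pair<int, int> Range;              // (min, max), max exclusive

    std::vector<Range> multiSplit(Range chunk, const std::vector<int>& points) {
        std::vector<Range> out;
        int lo = chunk.first;
        for (size_t i = 0; i < points.size(); i++) {
            assert(lo < points[i] && points[i] < chunk.second);  // cf. uassert 13395
            out.push_back(Range(lo, points[i]));
            lo = points[i];
        }
        out.push_back(Range(lo, chunk.second));
        return out;
    }

    int main() {
        std::vector<int> pts;
        pts.push_back(10); pts.push_back(20);
        std::vector<Range> r = multiSplit(Range(0, 30), pts);
        assert(r.size() == 3 && r[1] == Range(10, 20));
    }
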
- log() << "moving chunk ns: " << _ns << " moving chunk: " << toString() << " " << _shard << " -> " << to << endl;
+ bool Chunk::moveAndCommit( const Shard& to , string& errmsg ){
+ uassert( 10167 , "can't move shard to its current location!" , getShard() != to );
- string from = _shard;
- ShardChunkVersion oldVersion = _manager->getVersion( from );
+ log() << "moving chunk ns: " << _manager->getns() << " moving ( " << toString() << ") " << _shard.toString() << " -> " << to.toString() << endl;
- BSONObj filter;
- {
- BSONObjBuilder b;
- getFilter( b );
- filter = b.obj();
- }
+ Shard from = _shard;
- ScopedDbConnection fromconn( from );
+ ScopedDbConnection fromconn( from);
- BSONObj startRes;
+ BSONObj res;
bool worked = fromconn->runCommand( "admin" ,
- BSON( "movechunk.start" << _ns <<
- "from" << from <<
- "to" << to <<
- "filter" << filter
+ BSON( "moveChunk" << _manager->getns() <<
+ "from" << from.getConnString() <<
+ "to" << to.getConnString() <<
+ "min" << _min <<
+ "max" << _max <<
+ "shardId" << genID() <<
+ "configdb" << configServer.modelServer()
) ,
- startRes
+ res
);
- if ( ! worked ){
- errmsg = (string)"movechunk.start failed: " + startRes.toString();
- fromconn.done();
- return false;
- }
-
- // update config db
- setShard( to );
-
- // need to increment version # for old server
- Chunk * randomChunkOnOldServer = _manager->findChunkOnServer( from );
- if ( randomChunkOnOldServer )
- randomChunkOnOldServer->_markModified();
-
- _manager->save();
-
- BSONObj finishRes;
- {
+ fromconn.done();
- ShardChunkVersion newVersion = _manager->getVersion( from );
- if ( newVersion == 0 && oldVersion > 0 ){
- newVersion = oldVersion;
- newVersion++;
- _manager->save();
- }
- else if ( newVersion <= oldVersion ){
- log() << "newVersion: " << newVersion << " oldVersion: " << oldVersion << endl;
- uassert( 10168 , "version has to be higher" , newVersion > oldVersion );
- }
-
- BSONObjBuilder b;
- b << "movechunk.finish" << _ns;
- b << "to" << to;
- b.appendTimestamp( "newVersion" , newVersion );
- b.append( startRes["finishToken"] );
-
- worked = fromconn->runCommand( "admin" ,
- b.done() ,
- finishRes );
- }
-
- if ( ! worked ){
- errmsg = (string)"movechunk.finish failed: " + finishRes.toString();
- fromconn.done();
- return false;
+ if ( worked ){
+ _manager->_reload();
+ return true;
}
- fromconn.done();
- return true;
+ errmsg = res["errmsg"].String();
+ errmsg += " " + res.toString();
+ return false;
}
bool Chunk::splitIfShould( long dataWritten ){
+ LastError::Disabled d( lastError.get() );
+ try {
+ return _splitIfShould( dataWritten );
+ }
+ catch ( std::exception& e ){
+ log( LL_ERROR ) << "splitIfShould failed: " << e.what() << endl;
+ return false;
+ }
+ }
+
+ bool Chunk::_splitIfShould( long dataWritten ){
_dataWritten += dataWritten;
- if ( _dataWritten < MaxChunkSize / 5 )
+ // splitting faster in early chunks helps spread out an initial load better
+ int splitThreshold;
+ const int minChunkSize = 1 << 20; // 1 MBytes
+ int numChunks = getManager()->numChunks();
+ if ( numChunks < 10 ){
+ splitThreshold = max( MaxChunkSize / 4 , minChunkSize );
+ } else if ( numChunks < 20 ){
+ splitThreshold = max( MaxChunkSize / 2 , minChunkSize );
+ } else {
+ splitThreshold = max( MaxChunkSize , minChunkSize );
+ }
+
+ if ( minIsInf() || maxIsInf() ){
+ splitThreshold = (int) ((double)splitThreshold * .9);
+ }
+
+ if ( _dataWritten < splitThreshold / 5 )
+ return false;
+
+ if ( ! chunkSplitLock.lock_try(0) )
return false;
- log(1) << "\t want to split chunk : " << this << endl;
+ rwlock lk( chunkSplitLock , 1 , true );
+
+ log(3) << "\t splitIfShould : " << *this << endl;
_dataWritten = 0;
- BSONObj split_point = pickSplitPoint();
- if ( split_point.isEmpty() || _min == split_point || _max == split_point) {
+ BSONObj splitPoint = pickSplitPoint();
+ if ( splitPoint.isEmpty() || _min == splitPoint || _max == splitPoint) {
log() << "SHARD PROBLEM** shard is too big, but can't split: " << toString() << endl;
return false;
}
long size = getPhysicalSize();
- if ( size < MaxChunkSize )
+ if ( size < splitThreshold )
return false;
- log() << "autosplitting " << _ns << " size: " << size << " shard: " << toString() << endl;
- Chunk * newShard = split(split_point);
+ log() << "autosplitting " << _manager->getns() << " size: " << size << " shard: " << toString()
+ << " on: " << splitPoint << "(splitThreshold " << splitThreshold << ")" << endl;
+
+ vector<BSONObj> splitPoints;
+ splitPoints.push_back( splitPoint );
+ ChunkPtr newShard = multiSplit( splitPoints );
moveIfShould( newShard );
return true;
}
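
The threshold schedule in _splitIfShould() above can be restated compactly; a standalone sketch of the same logic (toy restatement, not shared code):

    // Sketch: young collections split at a fraction of MaxChunkSize so
    // an initial load spreads across shards sooner; chunks touching
    // $minKey/$maxKey split slightly earlier still.
    #include <algorithm>
    #include <cassert>

    int splitThreshold(int numChunks, int maxChunkSize, bool hasInfEdge) {
        const int minChunkSize = 1 << 20;   // 1 MB floor
        int t = numChunks < 10 ? maxChunkSize / 4
              : numChunks < 20 ? maxChunkSize / 2
              : maxChunkSize;
        t = std::max(t, minChunkSize);
        if (hasInfEdge)                     // edge chunks split a bit earlier
            t = (int)(t * 0.9);
        return t;
    }

    int main() {
        assert(splitThreshold(5, 200 << 20, false) == (50 << 20));  // quarter size
    }
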
- bool Chunk::moveIfShould( Chunk * newChunk ){
- Chunk * toMove = 0;
+ bool Chunk::moveIfShould( ChunkPtr newChunk ){
+ ChunkPtr toMove;
- if ( newChunk->countObjects() <= 1 ){
+ if ( newChunk->countObjects(2) <= 1 ){
toMove = newChunk;
}
- else if ( this->countObjects() <= 1 ){
- toMove = this;
+ else if ( this->countObjects(2) <= 1 ){
+ DEV assert( shared_from_this() );
+ toMove = shared_from_this();
}
else {
- log(1) << "don't know how to decide if i should move inner shard" << endl;
+ // moving middle shards is handled by balancer
+ return false;
}
- if ( ! toMove )
- return false;
+ assert( toMove );
- string newLocation = grid.pickShardForNewDB();
- if ( newLocation == getShard() ){
+ Shard newLocation = Shard::pick();
+ if ( getShard() == newLocation ){
// if this is the best server, then we shouldn't do anything!
- log(1) << "not moving chunk: " << toString() << " b/c would move to same place " << newLocation << " -> " << getShard() << endl;
+ log(1) << "not moving chunk: " << toString() << " b/c would move to same place " << newLocation.toString() << " -> " << getShard().toString() << endl;
return 0;
}
- log() << "moving chunk (auto): " << toMove->toString() << " to: " << newLocation << " #objcets: " << toMove->countObjects() << endl;
+ log() << "moving chunk (auto): " << toMove->toString() << " to: " << newLocation.toString() << " #objects: " << toMove->countObjects() << endl;
string errmsg;
massert( 10412 , (string)"moveAndCommit failed: " + errmsg ,
@@ -290,32 +430,43 @@ namespace mongo {
}
long Chunk::getPhysicalSize() const{
- ScopedDbConnection conn( getShard() );
+ ScopedDbConnection conn( getShard().getConnString() );
BSONObj result;
- uassert( 10169 , "datasize failed!" , conn->runCommand( "admin" , BSON( "datasize" << _ns
- << "keyPattern" << _manager->getShardKey().key()
- << "min" << getMin()
- << "max" << getMax()
- ) , result ) );
+ uassert( 10169 , "datasize failed!" , conn->runCommand( "admin" ,
+ BSON( "datasize" << _manager->getns()
+ << "keyPattern" << _manager->getShardKey().key()
+ << "min" << getMin()
+ << "max" << getMax()
+ << "maxSize" << ( MaxChunkSize + 1 )
+ ) , result ) );
conn.done();
return (long)result["size"].number();
}
-
- long Chunk::countObjects( const BSONObj& filter ) const{
- ScopedDbConnection conn( getShard() );
-
- BSONObj f = getFilter();
- if ( ! filter.isEmpty() )
- f = ClusteredCursor::concatQuery( f , filter );
+ int Chunk::countObjects(int maxCount) const {
+ static const BSONObj fields = BSON("_id" << 1 );
- BSONObj result;
- unsigned long long n = conn->count( _ns , f );
+ ShardConnection conn( getShard() , _manager->getns() );
+ // not using regular count as this is more flexible and supports $min/$max
+ Query q = Query().minKey(_min).maxKey(_max);
+ int n;
+ {
+ auto_ptr<DBClientCursor> c = conn->query(_manager->getns(), q, maxCount, 0, &fields);
+ assert( c.get() );
+ n = c->itcount();
+ }
conn.done();
- return (long)n;
+ return n;
+ }
+
+ void Chunk::appendShortVersion( const char * name , BSONObjBuilder& b ){
+ BSONObjBuilder bb( b.subobjStart( name ) );
+ bb.append( "min" , _min );
+ bb.append( "max" , _max );
+ bb.done();
}
bool Chunk::operator==( const Chunk& s ) const{
@@ -325,81 +476,86 @@ namespace mongo {
;
}
- void Chunk::getFilter( BSONObjBuilder& b ) const{
- _manager->_key.getFilter( b , _min , _max );
- }
-
- void Chunk::serialize(BSONObjBuilder& to){
- if ( _lastmod )
+ void Chunk::serialize(BSONObjBuilder& to,ShardChunkVersion myLastMod){
+
+ to.append( "_id" , genID( _manager->getns() , _min ) );
+
+ if ( myLastMod.isSet() ){
+ to.appendTimestamp( "lastmod" , myLastMod );
+ }
+ else if ( _lastmod.isSet() ){
+ assert( _lastmod > 0 && _lastmod < 1000 );
to.appendTimestamp( "lastmod" , _lastmod );
- else
- to.appendTimestamp( "lastmod" );
+ }
+ else {
+ assert(0);
+ }
- to << "ns" << _ns;
+ to << "ns" << _manager->getns();
to << "min" << _min;
to << "max" << _max;
- to << "shard" << _shard;
+ to << "shard" << _shard.getName();
+ }
+
+ string Chunk::genID( const string& ns , const BSONObj& o ) {
+ StringBuilder buf( ns.size() + o.objsize() + 16 );
+ buf << ns << "-";
+
+ BSONObjIterator i(o);
+ while ( i.more() ){
+ BSONElement e = i.next();
+ buf << e.fieldName() << "_" << e.toString(false, true);
+ }
+
+ return buf.str();
}
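
genID() turns a namespace plus a chunk's min key into a stable document _id of the form ns-field_value..., so for ns "test.foo" and min { x: 42 } the _id is "test.foo-x_42". A standalone sketch of the same formatting, with plain strings standing in for BSON:

    // Sketch: build "<ns>-<field>_<value>..." from (field, value) pairs.
    #include <cassert>
    #include <sstream>
    #include <string>
    #include <utility>
    #include <vector>

    std::string genID(const std::string& ns,
                      const std::vector<std::pair<std::string, std::string> >& minKey) {
        std::ostringstream buf;
        buf << ns << "-";
        for (size_t i = 0; i < minKey.size(); i++)
            buf << minKey[i].first << "_" << minKey[i].second;
        return buf.str();
    }

    int main() {
        std::vector<std::pair<std::string, std::string> > k;
        k.push_back(std::make_pair("x", "42"));
        assert(genID("test.foo", k) == "test.foo-x_42");
    }
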
void Chunk::unserialize(const BSONObj& from){
- _ns = from.getStringField( "ns" );
- _shard = from.getStringField( "shard" );
- _lastmod = from.hasField( "lastmod" ) ? from["lastmod"]._numberLong() : 0;
+ string ns = from.getStringField( "ns" );
+ _shard.reset( from.getStringField( "shard" ) );
+
+ _lastmod = from["lastmod"];
+ assert( _lastmod > 0 );
BSONElement e = from["minDotted"];
- cout << from << endl;
+
if (e.eoo()){
_min = from.getObjectField( "min" ).getOwned();
_max = from.getObjectField( "max" ).getOwned();
- } else { // TODO delete this case after giving people a chance to migrate
+ }
+ else { // TODO delete this case after giving people a chance to migrate
_min = e.embeddedObject().getOwned();
_max = from.getObjectField( "maxDotted" ).getOwned();
}
- uassert( 10170 , "Chunk needs a ns" , ! _ns.empty() );
- uassert( 10171 , "Chunk needs a server" , ! _ns.empty() );
+ uassert( 10170 , "Chunk needs a ns" , ! ns.empty() );
+ uassert( 13327 , "Chunk ns must match server ns" , ns == _manager->getns() );
+
+ uassert( 10171 , "Chunk needs a server" , _shard.ok() );
uassert( 10172 , "Chunk needs a min" , ! _min.isEmpty() );
uassert( 10173 , "Chunk needs a max" , ! _max.isEmpty() );
}
- string Chunk::modelServer() {
+ string Chunk::modelServer() const {
// TODO: this could move around?
return configServer.modelServer();
}
- void Chunk::_markModified(){
- _modified = true;
- // set to 0 so that the config server sets it
- _lastmod = 0;
+ ShardChunkVersion Chunk::getVersionOnConfigServer() const {
+ ScopedDbConnection conn( modelServer() );
+ BSONObj o = conn->findOne( ShardNS::chunk , BSON( "_id" << genID() ) );
+ conn.done();
+ return o["lastmod"];
}
- void Chunk::save( bool check ){
- bool reload = ! _lastmod;
- Model::save( check );
- if ( reload ){
- // need to do this so that we get the new _lastMod and therefore version number
- massert( 10413 , "_id has to be filled in already" , ! _id.isEmpty() );
-
- string b = toString();
- BSONObj q = _id.copy();
- massert( 10414 , "how could load fail?" , load( q ) );
- log(2) << "before: " << q << "\t" << b << endl;
- log(2) << "after : " << _id << "\t" << toString() << endl;
- massert( 10415 , "chunk reload changed content!" , b == toString() );
- massert( 10416 , "id changed!" , q["_id"] == _id["_id"] );
- }
- }
-
- void Chunk::ensureIndex(){
- ScopedDbConnection conn( getShard() );
- conn->ensureIndex( _ns , _manager->getShardKey().key() , _manager->_unique );
- conn.done();
+ void Chunk::_markModified(){
+ _modified = true;
}
string Chunk::toString() const {
stringstream ss;
- ss << "shard ns:" << _ns << " shard: " << _shard << " min: " << _min << " max: " << _max;
+ ss << "ns:" << _manager->getns() << " at: " << _shard.toString() << " lastmod: " << _lastmod.toString() << " min: " << _min << " max: " << _max;
return ss.str();
}
@@ -410,139 +566,291 @@ namespace mongo {
// ------- ChunkManager --------
- unsigned long long ChunkManager::NextSequenceNumber = 1;
+ AtomicUInt ChunkManager::NextSequenceNumber = 1;
ChunkManager::ChunkManager( DBConfig * config , string ns , ShardKeyPattern pattern , bool unique ) :
- _config( config ) , _ns( ns ) , _key( pattern ) , _unique( unique ){
- Chunk temp(0);
+ _config( config ) , _ns( ns ) ,
+ _key( pattern ) , _unique( unique ) ,
+ _sequenceNumber( ++NextSequenceNumber ), _lock("rw:ChunkManager")
+ {
+ _reload_inlock();
+
+ if ( _chunkMap.empty() ){
+ ChunkPtr c( new Chunk(this, _key.globalMin(), _key.globalMax(), config->getPrimary()) );
+ c->_markModified();
+
+ _chunkMap[c->getMax()] = c;
+ _chunkRanges.reloadAll(_chunkMap);
+
+ _shards.insert(c->getShard());
+
+ save_inlock();
+ log() << "no chunks for:" << ns << " so creating first: " << c->toString() << endl;
+ }
+ }
+
+ ChunkManager::~ChunkManager(){
+ _chunkMap.clear();
+ _chunkRanges.clear();
+ _shards.clear();
+ }
+
+ void ChunkManager::_reload(){
+ rwlock lk( _lock , true );
+ _reload_inlock();
+ }
+
+ void ChunkManager::_reload_inlock(){
+ int tries = 3;
+ while (tries--){
+ _chunkMap.clear();
+ _chunkRanges.clear();
+ _shards.clear();
+ _load();
+
+ if (_isValid()){
+ _chunkRanges.reloadAll(_chunkMap);
+ return;
+ }
+
+ if (_chunkMap.size() < 10){
+ _printChunks();
+ }
+ sleepmillis(10 * (3-tries));
+ sleepsecs(10);
+ }
+ msgasserted(13282, "Couldn't load a valid config for " + _ns + " after 3 tries. Giving up");
+
+ }
+
+ void ChunkManager::_load(){
+ static Chunk temp(0);
ScopedDbConnection conn( temp.modelServer() );
- auto_ptr<DBClientCursor> cursor = conn->query( temp.getNS() , BSON( "ns" << ns ) );
+
+ auto_ptr<DBClientCursor> cursor = conn->query(temp.getNS(), QUERY("ns" << _ns).sort("lastmod",1), 0, 0, 0, 0,
+ (DEBUG_BUILD ? 2 : 1000000)); // batch size. Try to induce potential race conditions in debug builds
+ assert( cursor.get() );
while ( cursor->more() ){
BSONObj d = cursor->next();
if ( d["isMaxMarker"].trueValue() ){
continue;
}
-
- Chunk * c = new Chunk( this );
+
+ ChunkPtr c( new Chunk( this ) );
c->unserialize( d );
- _chunks.push_back( c );
- c->_id = d["_id"].wrap().getOwned();
+
+ _chunkMap[c->getMax()] = c;
+ _shards.insert(c->getShard());
+
}
conn.done();
-
- if ( _chunks.size() == 0 ){
- Chunk * c = new Chunk( this );
- c->_ns = ns;
- c->setMin(_key.globalMin());
- c->setMax(_key.globalMax());
- c->_shard = config->getPrimary();
- c->_markModified();
-
- _chunks.push_back( c );
-
- log() << "no chunks for:" << ns << " so creating first: " << c->toString() << endl;
+ }
+
+ bool ChunkManager::_isValid() const {
+#define ENSURE(x) do { if(!(x)) { log() << "ChunkManager::_isValid failed: " #x << endl; return false; } } while(0)
+
+ if (_chunkMap.empty())
+ return true;
+
+ // Check endpoints
+ ENSURE(allOfType(MinKey, _chunkMap.begin()->second->getMin()));
+ ENSURE(allOfType(MaxKey, prior(_chunkMap.end())->second->getMax()));
+
+ // Make sure there are no gaps or overlaps
+ for (ChunkMap::const_iterator it=boost::next(_chunkMap.begin()), end=_chunkMap.end(); it != end; ++it){
+ ChunkMap::const_iterator last = prior(it);
+
+ if (!(it->second->getMin() == last->second->getMax())){
+ PRINT(it->second->toString());
+ PRINT(it->second->getMin());
+ PRINT(last->second->getMax());
+ }
+ ENSURE(it->second->getMin() == last->second->getMax());
}
- _sequenceNumber = ++NextSequenceNumber;
+ return true;
+
+#undef ENSURE
}
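
The core invariant _isValid() enforces is that the chunks, ordered by max key, tile the key space: each chunk's min equals its predecessor's max. A standalone sketch of that gap/overlap check (endpoint checks omitted, toy string keys):

    // Sketch: chunks keyed by max, mapping to their min. Valid iff
    // every min matches the previous chunk's max.
    #include <cassert>
    #include <map>
    #include <string>

    typedef std::map<std::string, std::string> ChunkMap;   // max -> min

    bool isValid(const ChunkMap& chunks) {
        std::string prevMax;
        bool first = true;
        for (ChunkMap::const_iterator it = chunks.begin(); it != chunks.end(); ++it) {
            if (!first && it->second != prevMax)
                return false;            // gap or overlap between neighbours
            prevMax = it->first;
            first = false;
        }
        return true;
    }

    int main() {
        ChunkMap m;
        m["g"] = "a"; m["z"] = "g";      // [a,g) [g,z): contiguous
        assert(isValid(m));
        m["z"] = "h";                    // [a,g) [h,z): gap at [g,h)
        assert(!isValid(m));
    }
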
-
- ChunkManager::~ChunkManager(){
- for ( vector<Chunk*>::iterator i=_chunks.begin(); i != _chunks.end(); i++ ){
- delete( *i );
+
+ void ChunkManager::_printChunks() const {
+ for (ChunkMap::const_iterator it=_chunkMap.begin(), end=_chunkMap.end(); it != end; ++it) {
+ log() << *it->second << endl;
}
- _chunks.clear();
}
bool ChunkManager::hasShardKey( const BSONObj& obj ){
return _key.hasShardKey( obj );
}
- Chunk& ChunkManager::findChunk( const BSONObj & obj ){
+ ChunkPtr ChunkManager::findChunk( const BSONObj & obj , bool retry ){
+ BSONObj key = _key.extractKey(obj);
- for ( vector<Chunk*>::iterator i=_chunks.begin(); i != _chunks.end(); i++ ){
- Chunk * c = *i;
- if ( c->contains( obj ) )
- return *c;
+ {
+ rwlock lk( _lock , false );
+
+ BSONObj foo;
+ ChunkPtr c;
+ {
+ ChunkMap::iterator it = _chunkMap.upper_bound(key);
+ if (it != _chunkMap.end()){
+ foo = it->first;
+ c = it->second;
+ }
+ }
+
+ if ( c ){
+ if ( c->contains( obj ) )
+ return c;
+
+ PRINT(foo);
+ PRINT(*c);
+ PRINT(key);
+
+ _reload_inlock();
+ massert(13141, "Chunk map pointed to incorrect chunk", false);
+ }
}
- stringstream ss;
- ss << "couldn't find a chunk which should be impossible extracted: " << _key.extractKey( obj );
- throw UserException( 8070 , ss.str() );
- }
- Chunk* ChunkManager::findChunkOnServer( const string& server ) const {
+ if ( retry ){
+ stringstream ss;
+ ss << "couldn't find a chunk aftry retry which should be impossible extracted: " << key;
+ throw UserException( 8070 , ss.str() );
+ }
+
+ log() << "ChunkManager: couldn't find chunk for: " << key << " going to retry" << endl;
+ _reload_inlock();
+ return findChunk( obj , true );
+ }
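
findChunk() works because _chunkMap is keyed by each chunk's max: upper_bound(key) yields the first chunk whose max is strictly greater than the key, and with half-open [min, max) ranges that is exactly the owning chunk. A minimal demonstration on a std::map:

    // Sketch: chunk lookup by upper_bound on the max-keyed map.
    #include <cassert>
    #include <map>
    #include <string>

    int main() {
        std::map<int, std::string> chunkByMax;   // max key -> chunk name (toy)
        chunkByMax[10] = "[0,10)";
        chunkByMax[20] = "[10,20)";

        assert(chunkByMax.upper_bound(9)->second == "[0,10)");
        assert(chunkByMax.upper_bound(10)->second == "[10,20)");  // 10 is in the next chunk
        assert(chunkByMax.upper_bound(20) == chunkByMax.end());   // out of range
    }
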
- for ( vector<Chunk*>::const_iterator i=_chunks.begin(); i!=_chunks.end(); i++ ){
- Chunk * c = *i;
- if ( c->getShard() == server )
+ ChunkPtr ChunkManager::findChunkOnServer( const Shard& shard ) const {
+ rwlock lk( _lock , false );
+
+ for ( ChunkMap::const_iterator i=_chunkMap.begin(); i!=_chunkMap.end(); ++i ){
+ ChunkPtr c = i->second;
+ if ( c->getShard() == shard )
return c;
}
- return 0;
+ return ChunkPtr();
}
- int ChunkManager::getChunksForQuery( vector<Chunk*>& chunks , const BSONObj& query ){
- int added = 0;
-
- for ( vector<Chunk*>::iterator i=_chunks.begin(); i != _chunks.end(); i++ ){
- Chunk * c = *i;
- if ( _key.relevantForQuery( query , c ) ){
- chunks.push_back( c );
- added++;
+ void ChunkManager::getShardsForQuery( set<Shard>& shards , const BSONObj& query ){
+ rwlock lk( _lock , false );
+ DEV PRINT(query);
+
+ //TODO look into FieldRangeSetOr
+ FieldRangeOrSet fros(_ns.c_str(), query, false);
+ uassert(13088, "no support for special queries yet", fros.getSpecial().empty());
+
+ do {
+ boost::scoped_ptr<FieldRangeSet> frs (fros.topFrs());
+ {
+ // special case if most-significant field isn't in query
+ FieldRange range = frs->range(_key.key().firstElement().fieldName());
+ if ( !range.nontrivial() ){
+ DEV PRINT(range.nontrivial());
+ getAllShards(shards);
+ return;
+ }
+ }
+
+ BoundList ranges = frs->indexBounds(_key.key(), 1);
+ for (BoundList::const_iterator it=ranges.begin(), end=ranges.end(); it != end; ++it){
+ BSONObj minObj = it->first.replaceFieldNames(_key.key());
+ BSONObj maxObj = it->second.replaceFieldNames(_key.key());
+
+ DEV PRINT(minObj);
+ DEV PRINT(maxObj);
+
+ ChunkRangeMap::const_iterator min, max;
+ min = _chunkRanges.upper_bound(minObj);
+ max = _chunkRanges.upper_bound(maxObj);
+
+ assert(min != _chunkRanges.ranges().end());
+
+ // make max non-inclusive like end iterators
+ if(max != _chunkRanges.ranges().end())
+ ++max;
+
+ for (ChunkRangeMap::const_iterator it=min; it != max; ++it){
+ shards.insert(it->second->getShard());
+ }
+
+ // once we know we need to visit all shards no need to keep looping
+ //if (shards.size() == _shards.size())
+ //return;
}
+
+ if (fros.moreOrClauses())
+ fros.popOrClause();
+
+ } while (fros.moreOrClauses());
+ }
+
+ void ChunkManager::getShardsForRange(set<Shard>& shards, const BSONObj& min, const BSONObj& max){
+ uassert(13405, "min must have shard key", hasShardKey(min));
+ uassert(13406, "max must have shard key", hasShardKey(max));
+
+ ChunkRangeMap::const_iterator it = _chunkRanges.upper_bound(min);
+ ChunkRangeMap::const_iterator end = _chunkRanges.lower_bound(max);
+
+ for (; it!=end; ++ it){
+ shards.insert(it->second->getShard());
+
+ // once we know we need to visit all shards no need to keep looping
+ if (shards.size() == _shards.size())
+ break;
}
- return added;
}
- void ChunkManager::getAllServers( set<string>& allServers ){
- for ( vector<Chunk*>::iterator i=_chunks.begin(); i != _chunks.end(); i++ ){
- allServers.insert( (*i)->getShard() );
- }
+ void ChunkManager::getAllShards( set<Shard>& all ){
+ rwlock lk( _lock , false );
+ all.insert(_shards.begin(), _shards.end());
}
- void ChunkManager::ensureIndex(){
- set<string> seen;
-
- for ( vector<Chunk*>::const_iterator i=_chunks.begin(); i!=_chunks.end(); i++ ){
- Chunk * c = *i;
- if ( seen.count( c->getShard() ) )
- continue;
- seen.insert( c->getShard() );
- c->ensureIndex();
+ void ChunkManager::ensureIndex_inlock(){
+ //TODO in parallel?
+ for ( set<Shard>::const_iterator i=_shards.begin(); i!=_shards.end(); ++i ){
+ ScopedDbConnection conn( i->getConnString() );
+ conn->ensureIndex( getns() , getShardKey().key() , _unique );
+ conn.done();
}
}
- void ChunkManager::drop(){
+ void ChunkManager::drop( ChunkManagerPtr me ){
+ rwlock lk( _lock , true );
+
+ configServer.logChange( "dropCollection.start" , _ns , BSONObj() );
+
+ DistributedLock lockSetup( ConnectionString( configServer.modelServer() , ConnectionString::SYNC ) , getns() );
+ dist_lock_try dlk( &lockSetup , "drop" );
+ uassert( 13331 , "locking namespace failed" , dlk.got() );
+
uassert( 10174 , "config servers not all up" , configServer.allUp() );
- map<string,ShardChunkVersion> seen;
+ set<Shard> seen;
log(1) << "ChunkManager::drop : " << _ns << endl;
// lock all shards so no one can do a split/migrate
- for ( vector<Chunk*>::const_iterator i=_chunks.begin(); i!=_chunks.end(); i++ ){
- Chunk * c = *i;
- ShardChunkVersion& version = seen[ c->getShard() ];
- if ( version )
- continue;
- version = lockNamespaceOnServer( c->getShard() , _ns );
- if ( version )
- continue;
-
- // rollback
- uassert( 10175 , "don't know how to rollback locks b/c drop can't lock all shards" , 0 );
+ for ( ChunkMap::const_iterator i=_chunkMap.begin(); i!=_chunkMap.end(); ++i ){
+ ChunkPtr c = i->second;
+ seen.insert( c->getShard() );
}
log(1) << "ChunkManager::drop : " << _ns << "\t all locked" << endl;
// wipe my meta-data
- _chunks.clear();
+ _chunkMap.clear();
+ _chunkRanges.clear();
+ _shards.clear();
// delete data from mongod
- for ( map<string,ShardChunkVersion>::iterator i=seen.begin(); i!=seen.end(); i++ ){
- string shard = i->first;
- ScopedDbConnection conn( shard );
+ for ( set<Shard>::iterator i=seen.begin(); i!=seen.end(); i++ ){
+ ScopedDbConnection conn( *i );
conn->dropCollection( _ns );
conn.done();
}
@@ -551,18 +859,16 @@ namespace mongo {
// clean up database meta-data
uassert( 10176 , "no sharding data?" , _config->removeSharding( _ns ) );
- _config->save();
-
// remove chunk data
- Chunk temp(0);
+ static Chunk temp(0);
ScopedDbConnection conn( temp.modelServer() );
conn->remove( temp.getNS() , BSON( "ns" << _ns ) );
conn.done();
log(1) << "ChunkManager::drop : " << _ns << "\t removed chunk data" << endl;
- for ( map<string,ShardChunkVersion>::iterator i=seen.begin(); i!=seen.end(); i++ ){
- ScopedDbConnection conn( i->first );
+ for ( set<Shard>::iterator i=seen.begin(); i!=seen.end(); i++ ){
+ ScopedDbConnection conn( *i );
BSONObj res;
if ( ! setShardVersion( conn.conn() , _ns , 0 , true , res ) )
throw UserException( 8071 , (string)"OH NO, cleaning up after drop failed: " + res.toString() );
@@ -571,50 +877,156 @@ namespace mongo {
log(1) << "ChunkManager::drop : " << _ns << "\t DONE" << endl;
+ configServer.logChange( "dropCollection" , _ns , BSONObj() );
}
void ChunkManager::save(){
- ShardChunkVersion a = getVersion();
+ rwlock lk( _lock , true );
+ save_inlock();
+ }
+
+ void ChunkManager::save_inlock(){
+
+ ShardChunkVersion a = getVersion_inlock();
+ assert( a > 0 || _chunkMap.size() <= 1 );
+ ShardChunkVersion nextChunkVersion = a.incMajor();
+ vector<ChunkPtr> toFix;
+ vector<ShardChunkVersion> newVersions;
+
+ BSONObjBuilder cmdBuilder;
+ BSONArrayBuilder updates( cmdBuilder.subarrayStart( "applyOps" ) );
- set<string> withRealChunks;
- for ( vector<Chunk*>::const_iterator i=_chunks.begin(); i!=_chunks.end(); i++ ){
- Chunk* c = *i;
+ int numOps = 0;
+ for ( ChunkMap::const_iterator i=_chunkMap.begin(); i!=_chunkMap.end(); ++i ){
+ ChunkPtr c = i->second;
if ( ! c->_modified )
continue;
- c->save( true );
+
+ numOps++;
_sequenceNumber = ++NextSequenceNumber;
- withRealChunks.insert( c->getShard() );
+ ShardChunkVersion myVersion = nextChunkVersion;
+ ++nextChunkVersion;
+ toFix.push_back( c );
+ newVersions.push_back( myVersion );
+
+ BSONObjBuilder op;
+ op.append( "op" , "u" );
+ op.appendBool( "b" , true );
+ op.append( "ns" , ShardNS::chunk );
+
+ BSONObjBuilder n( op.subobjStart( "o" ) );
+ c->serialize( n , myVersion );
+ n.done();
+
+ BSONObjBuilder q( op.subobjStart( "o2" ) );
+ q.append( "_id" , c->genID() );
+ q.done();
+
+ updates.append( op.obj() );
}
- massert( 10417 , "how did version get smalled" , getVersion() >= a );
+ if ( numOps == 0 )
+ return;
+
+ updates.done();
+
+ if ( a > 0 || _chunkMap.size() > 1 ){
+ BSONArrayBuilder temp( cmdBuilder.subarrayStart( "preCondition" ) );
+ BSONObjBuilder b;
+ b.append( "ns" , ShardNS::chunk );
+ b.append( "q" , BSON( "query" << BSON( "ns" << _ns ) << "orderby" << BSON( "lastmod" << -1 ) ) );
+ {
+ BSONObjBuilder bb( b.subobjStart( "res" ) );
+ bb.appendTimestamp( "lastmod" , a );
+ bb.done();
+ }
+ temp.append( b.obj() );
+ temp.done();
+ }
+
+ BSONObj cmd = cmdBuilder.obj();
+
+ log(7) << "ChunkManager::save update: " << cmd << endl;
+
+ ScopedDbConnection conn( Chunk(0).modelServer() );
+ BSONObj res;
+ bool ok = conn->runCommand( "config" , cmd , res );
+ conn.done();
+
+ if ( ! ok ){
+ stringstream ss;
+ ss << "saving chunks failed. cmd: " << cmd << " result: " << res;
+ log( LL_ERROR ) << ss.str() << endl;
+ msgasserted( 13327 , ss.str() );
+ }
+
+ for ( unsigned i=0; i<toFix.size(); i++ ){
+ toFix[i]->_lastmod = newVersions[i];
+ }
- ensureIndex(); // TODO: this is too aggressive - but not really sooo bad
+ massert( 10417 , "how did version get smaller" , getVersion_inlock() >= a );
+
+ ensureIndex_inlock(); // TODO: this is too aggressive - but not really sooo bad
}
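
save_inlock() batches every modified chunk into one applyOps command guarded by a preCondition on the collection's highest lastmod, i.e. optimistic concurrency: if another mongos saved first, the whole batch is rejected and the caller must reload. A standalone sketch of that compare-then-apply pattern (toy in-memory "config", not the real command):

    // Sketch: apply a batched update only if the stored version still
    // matches what we read earlier.
    #include <cassert>

    struct ConfigDoc { int lastmod; };

    bool applyOpsWithPrecondition(ConfigDoc& doc, int expectedLastmod, int newLastmod) {
        if (doc.lastmod != expectedLastmod)
            return false;            // someone else updated the chunks: abort
        doc.lastmod = newLastmod;    // ...apply the batched chunk updates...
        return true;
    }

    int main() {
        ConfigDoc d = { 3 };
        assert(applyOpsWithPrecondition(d, 3, 4));   // matches: commit
        assert(!applyOpsWithPrecondition(d, 3, 5));  // stale read: rejected
    }
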
- ShardChunkVersion ChunkManager::getVersion( const string& server ) const{
+ void ChunkManager::maybeChunkCollection() {
+ uassert( 13346 , "can't pre-split an already split collection" , (_chunkMap.size() == 1) );
+
+ ChunkPtr soleChunk = _chunkMap.begin()->second;
+ vector<BSONObj> splitPoints;
+ soleChunk->pickSplitVector( &splitPoints );
+ if ( splitPoints.empty() ){
+ log(1) << "not enough data to warrant chunking " << getns() << endl;
+ return;
+ }
+
+ soleChunk->multiSplit( splitPoints );
+ }
+
+ ShardChunkVersion ChunkManager::getVersionOnConfigServer() const {
+ static Chunk temp(0);
+
+ ScopedDbConnection conn( temp.modelServer() );
+
+ auto_ptr<DBClientCursor> cursor = conn->query(temp.getNS(), QUERY("ns" << _ns).sort("lastmod",1), 1 );
+ assert( cursor.get() );
+ BSONObj o;
+ if ( cursor->more() )
+ o = cursor->next();
+ conn.done();
+
+ return o["lastmod"];
+ }
+
+ ShardChunkVersion ChunkManager::getVersion( const Shard& shard ) const{
+ rwlock lk( _lock , false );
// TODO: cache or something?
ShardChunkVersion max = 0;
- for ( vector<Chunk*>::const_iterator i=_chunks.begin(); i!=_chunks.end(); i++ ){
- Chunk* c = *i;
- if ( c->getShard() != server )
+ for ( ChunkMap::const_iterator i=_chunkMap.begin(); i!=_chunkMap.end(); ++i ){
+ ChunkPtr c = i->second;
+ DEV assert( c );
+ if ( c->getShard() != shard )
continue;
-
if ( c->_lastmod > max )
max = c->_lastmod;
}
-
return max;
}
ShardChunkVersion ChunkManager::getVersion() const{
+ rwlock lk( _lock , false );
+ return getVersion_inlock();
+ }
+
+ ShardChunkVersion ChunkManager::getVersion_inlock() const{
ShardChunkVersion max = 0;
-
- for ( vector<Chunk*>::const_iterator i=_chunks.begin(); i!=_chunks.end(); i++ ){
- Chunk* c = *i;
+
+ for ( ChunkMap::const_iterator i=_chunkMap.begin(); i!=_chunkMap.end(); ++i ){
+ ChunkPtr c = i->second;
if ( c->_lastmod > max )
max = c->_lastmod;
}
@@ -623,27 +1035,208 @@ namespace mongo {
}
string ChunkManager::toString() const {
+ rwlock lk( _lock , false );
+
stringstream ss;
- ss << "ChunkManager: " << _ns << " key:" << _key.toString() << "\n";
- for ( vector<Chunk*>::const_iterator i=_chunks.begin(); i!=_chunks.end(); i++ ){
- const Chunk* c = *i;
- ss << "\t" << c->toString() << "\n";
+ ss << "ChunkManager: " << _ns << " key:" << _key.toString() << '\n';
+ for ( ChunkMap::const_iterator i=_chunkMap.begin(); i!=_chunkMap.end(); ++i ){
+ const ChunkPtr c = i->second;
+ ss << "\t" << c->toString() << '\n';
}
return ss.str();
}
+
+ void ChunkManager::_migrationNotification(Chunk* c){
+ _chunkRanges.reloadRange(_chunkMap, c->getMin(), c->getMax());
+ _shards.insert(c->getShard());
+ }
+
+ void ChunkRangeManager::assertValid() const{
+ if (_ranges.empty())
+ return;
+
+ try {
+ // No Nulls
+ for (ChunkRangeMap::const_iterator it=_ranges.begin(), end=_ranges.end(); it != end; ++it){
+ assert(it->second);
+ }
+
+ // Check endpoints
+ assert(allOfType(MinKey, _ranges.begin()->second->getMin()));
+ assert(allOfType(MaxKey, prior(_ranges.end())->second->getMax()));
+
+ // Make sure there are no gaps or overlaps
+ for (ChunkRangeMap::const_iterator it=boost::next(_ranges.begin()), end=_ranges.end(); it != end; ++it){
+ ChunkRangeMap::const_iterator last = prior(it);
+ assert(it->second->getMin() == last->second->getMax());
+ }
+
+ // Check Map keys
+ for (ChunkRangeMap::const_iterator it=_ranges.begin(), end=_ranges.end(); it != end; ++it){
+ assert(it->first == it->second->getMax());
+ }
+
+ // Make sure we match the original chunks
+ const ChunkMap chunks = _ranges.begin()->second->getManager()->_chunkMap;
+ for ( ChunkMap::const_iterator i=chunks.begin(); i!=chunks.end(); ++i ){
+ const ChunkPtr chunk = i->second;
+
+ ChunkRangeMap::const_iterator min = _ranges.upper_bound(chunk->getMin());
+ ChunkRangeMap::const_iterator max = _ranges.lower_bound(chunk->getMax());
+
+ assert(min != _ranges.end());
+ assert(max != _ranges.end());
+ assert(min == max);
+ assert(min->second->getShard() == chunk->getShard());
+ assert(min->second->contains( chunk->getMin() ));
+ assert(min->second->contains( chunk->getMax() ) || (min->second->getMax() == chunk->getMax()));
+ }
+
+ } catch (...) {
+ log( LL_ERROR ) << "\t invalid ChunkRangeMap! printing ranges:" << endl;
+
+ for (ChunkRangeMap::const_iterator it=_ranges.begin(), end=_ranges.end(); it != end; ++it)
+ cout << it->first << ": " << *it->second << endl;
+
+ throw;
+ }
+ }
+
+ void ChunkRangeManager::reloadRange(const ChunkMap& chunks, const BSONObj& min, const BSONObj& max){
+ if (_ranges.empty()){
+ reloadAll(chunks);
+ return;
+ }
+
+ ChunkRangeMap::iterator low = _ranges.upper_bound(min);
+ ChunkRangeMap::iterator high = _ranges.lower_bound(max);
+
+ assert(low != _ranges.end());
+ assert(high != _ranges.end());
+ assert(low->second);
+ assert(high->second);
+
+ ChunkMap::const_iterator begin = chunks.upper_bound(low->second->getMin());
+ ChunkMap::const_iterator end = chunks.lower_bound(high->second->getMax());
+
+ assert(begin != chunks.end());
+ assert(end != chunks.end());
+
+ // C++ end iterators are one-past-last
+ ++high;
+ ++end;
+
+ // update ranges
+ _ranges.erase(low, high); // invalidates low
+ _insertRange(begin, end);
+
+ assert(!_ranges.empty());
+ DEV assertValid();
+
+ // merge low-end if possible
+ low = _ranges.upper_bound(min);
+ assert(low != _ranges.end());
+ if (low != _ranges.begin()){
+ shared_ptr<ChunkRange> a = prior(low)->second;
+ shared_ptr<ChunkRange> b = low->second;
+ if (a->getShard() == b->getShard()){
+ shared_ptr<ChunkRange> cr (new ChunkRange(*a, *b));
+ _ranges.erase(prior(low));
+ _ranges.erase(low); // invalidates low
+ _ranges[cr->getMax()] = cr;
+ }
+ }
+
+ DEV assertValid();
+
+ // merge high-end if possible
+ high = _ranges.lower_bound(max);
+ if (high != prior(_ranges.end())){
+ shared_ptr<ChunkRange> a = high->second;
+ shared_ptr<ChunkRange> b = boost::next(high)->second;
+ if (a->getShard() == b->getShard()){
+ shared_ptr<ChunkRange> cr (new ChunkRange(*a, *b));
+ _ranges.erase(boost::next(high));
+ _ranges.erase(high); //invalidates high
+ _ranges[cr->getMax()] = cr;
+ }
+ }
+
+ DEV assertValid();
+ }
+
+ void ChunkRangeManager::reloadAll(const ChunkMap& chunks){
+ _ranges.clear();
+ _insertRange(chunks.begin(), chunks.end());
+
+ DEV assertValid();
+ }
+
+ void ChunkRangeManager::_insertRange(ChunkMap::const_iterator begin, const ChunkMap::const_iterator end){
+ while (begin != end){
+ ChunkMap::const_iterator first = begin;
+ Shard shard = first->second->getShard();
+ while (begin != end && (begin->second->getShard() == shard))
+ ++begin;
+
+ shared_ptr<ChunkRange> cr (new ChunkRange(first, begin));
+ _ranges[cr->getMax()] = cr;
+ }
+ }
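
_insertRange() collapses each run of adjacent chunks living on the same shard into one ChunkRange, shrinking the range map that getShardsForQuery consults. A standalone sketch of the coalescing step (toy structs, no key bounds):

    // Sketch: adjacent chunks on the same shard merge into one range.
    #include <cassert>
    #include <string>
    #include <vector>

    struct Chunk { std::string shard; };
    struct Range { std::string shard; size_t nChunks; };

    std::vector<Range> coalesce(const std::vector<Chunk>& chunks) {
        std::vector<Range> out;
        for (size_t i = 0; i < chunks.size(); ) {
            size_t first = i;
            while (i < chunks.size() && chunks[i].shard == chunks[first].shard)
                ++i;                                 // extend the run over one shard
            Range r = { chunks[first].shard, i - first };
            out.push_back(r);
        }
        return out;
    }

    int main() {
        Chunk a = {"s0"}, b = {"s0"}, c = {"s1"};
        std::vector<Chunk> chunks;
        chunks.push_back(a); chunks.push_back(b); chunks.push_back(c);
        std::vector<Range> r = coalesce(chunks);
        assert(r.size() == 2 && r[0].nChunks == 2 && r[1].shard == "s1");
    }
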
class ChunkObjUnitTest : public UnitTest {
public:
void runShard(){
-
+ ChunkPtr c;
+ assert( ! c );
+ c.reset( new Chunk( 0 ) );
+ assert( c );
}
+ void runShardChunkVersion(){
+ vector<ShardChunkVersion> all;
+ all.push_back( ShardChunkVersion(1,1) );
+ all.push_back( ShardChunkVersion(1,2) );
+ all.push_back( ShardChunkVersion(2,1) );
+ all.push_back( ShardChunkVersion(2,2) );
+
+ for ( unsigned i=0; i<all.size(); i++ ){
+ for ( unsigned j=i+1; j<all.size(); j++ ){
+ assert( all[i] < all[j] );
+ }
+ }
+
+ }
+
void run(){
runShard();
+ runShardChunkVersion();
log(1) << "shardObjTest passed" << endl;
}
} shardObjTest;
+ // ----- to be removed ---
+ extern OID serverID;
+ bool setShardVersion( DBClientBase & conn , const string& ns , ShardChunkVersion version , bool authoritative , BSONObj& result ){
+ BSONObjBuilder cmdBuilder;
+ cmdBuilder.append( "setShardVersion" , ns.c_str() );
+ cmdBuilder.append( "configdb" , configServer.modelServer() );
+ cmdBuilder.appendTimestamp( "version" , version.toLong() );
+ cmdBuilder.appendOID( "serverID" , &serverID );
+ if ( authoritative )
+ cmdBuilder.appendBool( "authoritative" , 1 );
+
+ Shard s = Shard::make( conn.getServerAddress() );
+ cmdBuilder.append( "shard" , s.getName() );
+ cmdBuilder.append( "shardHost" , s.getConnString() );
+ BSONObj cmd = cmdBuilder.obj();
+
+ log(1) << " setShardVersion " << s.getName() << " " << conn.getServerAddress() << " " << ns << " " << cmd << " " << &conn << endl;
+
+ return conn.runCommand( "admin" , cmd , result );
+ }
+
+
} // namespace mongo