Diffstat (limited to 's/d_migrate.cpp')
-rw-r--r-- | s/d_migrate.cpp | 1197
1 file changed, 867 insertions, 330 deletions
diff --git a/s/d_migrate.cpp b/s/d_migrate.cpp index 8e9584c..2878276 100644 --- a/s/d_migrate.cpp +++ b/s/d_migrate.cpp @@ -25,18 +25,24 @@ #include "pch.h" #include <map> #include <string> +#include <algorithm> #include "../db/commands.h" #include "../db/jsobj.h" #include "../db/dbmessage.h" #include "../db/query.h" #include "../db/cmdline.h" +#include "../db/queryoptimizer.h" +#include "../db/btree.h" +#include "../db/repl_block.h" +#include "../db/dur.h" #include "../client/connpool.h" #include "../client/distlock.h" #include "../util/queue.h" #include "../util/unittest.h" +#include "../util/processinfo.h" #include "shard.h" #include "d_logic.h" @@ -49,131 +55,185 @@ namespace mongo { class MoveTimingHelper { public: - MoveTimingHelper( const string& where , const string& ns , BSONObj min , BSONObj max ) - : _where( where ) , _ns( ns ){ - _next = 1; + MoveTimingHelper( const string& where , const string& ns , BSONObj min , BSONObj max , int total ) + : _where( where ) , _ns( ns ) , _next( 0 ) , _total( total ) { + _nextNote = 0; _b.append( "min" , min ); _b.append( "max" , max ); } - ~MoveTimingHelper(){ - configServer.logChange( (string)"moveChunk." + _where , _ns, _b.obj() ); + ~MoveTimingHelper() { + // even if logChange doesn't throw, bson does + // sigh + try { + if ( _next != _total ) { + note( "aborted" ); + } + configServer.logChange( (string)"moveChunk." + _where , _ns, _b.obj() ); + } + catch ( const std::exception& e ) { + log( LL_WARNING ) << "couldn't record timing for moveChunk '" << _where << "': " << e.what() << endl; + } } - - void done( int step ){ - assert( step == _next++ ); - + + void done( int step ) { + assert( step == ++_next ); + assert( step <= _total ); + stringstream ss; ss << "step" << step; string s = ss.str(); - + CurOp * op = cc().curop(); if ( op ) op->setMessage( s.c_str() ); - else + else log( LL_WARNING ) << "op is null in MoveTimingHelper::done" << endl; - + _b.appendNumber( s , _t.millis() ); _t.reset(); + +#if 0 + // debugging for memory leak? + ProcessInfo pi; + ss << " v:" << pi.getVirtualMemorySize() + << " r:" << pi.getResidentSize(); + log() << ss.str() << endl; +#endif } - - + + + void note( const string& s ) { + string field = "note"; + if ( _nextNote > 0 ) { + StringBuilder buf; + buf << "note" << _nextNote; + field = buf.str(); + } + _nextNote++; + + _b.append( field , s ); + } + private: Timer _t; string _where; string _ns; - + int _next; - + int _total; // expected # of steps + int _nextNote; + BSONObjBuilder _b; + }; struct OldDataCleanup { + static AtomicUInt _numThreads; // how many threads are doing async cleanusp + string ns; BSONObj min; BSONObj max; set<CursorId> initial; - void doRemove(){ - ShardForceModeBlock sf; + + OldDataCleanup(){ + _numThreads++; + } + OldDataCleanup( const OldDataCleanup& other ) { + ns = other.ns; + min = other.min.getOwned(); + max = other.max.getOwned(); + initial = other.initial; + _numThreads++; + } + ~OldDataCleanup(){ + _numThreads--; + } + + void doRemove() { + ShardForceVersionOkModeBlock sf; writelock lk(ns); RemoveSaver rs("moveChunk",ns,"post-cleanup"); long long num = Helpers::removeRange( ns , min , max , true , false , cmdLine.moveParanoia ? 
&rs : 0 ); log() << "moveChunk deleted: " << num << endl; } + }; + AtomicUInt OldDataCleanup::_numThreads = 0; + static const char * const cleanUpThreadName = "cleanupOldData"; - - void _cleanupOldData( OldDataCleanup cleanup ){ + + void _cleanupOldData( OldDataCleanup cleanup ) { Client::initThread( cleanUpThreadName ); log() << " (start) waiting to cleanup " << cleanup.ns << " from " << cleanup.min << " -> " << cleanup.max << " # cursors:" << cleanup.initial.size() << endl; int loops = 0; Timer t; - while ( t.seconds() < 900 ){ // 15 minutes + while ( t.seconds() < 900 ) { // 15 minutes assert( dbMutex.getState() == 0 ); sleepmillis( 20 ); - + set<CursorId> now; - ClientCursor::find( cleanup.ns , now ); - + ClientCursor::find( cleanup.ns , now ); + set<CursorId> left; - for ( set<CursorId>::iterator i=cleanup.initial.begin(); i!=cleanup.initial.end(); ++i ){ + for ( set<CursorId>::iterator i=cleanup.initial.begin(); i!=cleanup.initial.end(); ++i ) { CursorId id = *i; if ( now.count(id) ) left.insert( id ); } - + if ( left.size() == 0 ) break; cleanup.initial = left; - - if ( ( loops++ % 200 ) == 0 ){ + + if ( ( loops++ % 200 ) == 0 ) { log() << " (looping " << loops << ") waiting to cleanup " << cleanup.ns << " from " << cleanup.min << " -> " << cleanup.max << " # cursors:" << cleanup.initial.size() << endl; - + stringstream ss; - for ( set<CursorId>::iterator i=cleanup.initial.begin(); i!=cleanup.initial.end(); ++i ){ + for ( set<CursorId>::iterator i=cleanup.initial.begin(); i!=cleanup.initial.end(); ++i ) { CursorId id = *i; ss << id << " "; } log() << " cursors: " << ss.str() << endl; } } - + cleanup.doRemove(); cc().shutdown(); } - void cleanupOldData( OldDataCleanup cleanup ){ + void cleanupOldData( OldDataCleanup cleanup ) { try { _cleanupOldData( cleanup ); } - catch ( std::exception& e ){ + catch ( std::exception& e ) { log() << " error cleaning old data:" << e.what() << endl; } - catch ( ... ){ + catch ( ... ) { log() << " unknown error cleaning old data" << endl; } } class ChunkCommandHelper : public Command { public: - ChunkCommandHelper( const char * name ) - : Command( name ){ + ChunkCommandHelper( const char * name ) + : Command( name ) { } - + virtual void help( stringstream& help ) const { - help << "internal should not be calling this directly" << endl; + help << "internal - should not be called directly" << endl; } virtual bool slaveOk() const { return false; } virtual bool adminOnly() const { return true; } - virtual LockType locktype() const { return NONE; } + virtual LockType locktype() const { return NONE; } }; - bool isInRange( const BSONObj& obj , const BSONObj& min , const BSONObj& max ){ + bool isInRange( const BSONObj& obj , const BSONObj& min , const BSONObj& max ) { BSONObj k = obj.extractFields( min, true ); return k.woCompare( min ) >= 0 && k.woCompare( max ) < 0; @@ -182,48 +242,57 @@ namespace mongo { class MigrateFromStatus { public: - - MigrateFromStatus() - : _mutex( "MigrateFromStatus" ){ + + MigrateFromStatus() : _m("MigrateFromStatus") { _active = false; _inCriticalSection = false; + _memoryUsed = 0; } - void start( string ns , const BSONObj& min , const BSONObj& max ){ + void start( string ns , const BSONObj& min , const BSONObj& max ) { + scoped_lock l(_m); // reads and writes _active + assert( ! _active ); - + assert( ! min.isEmpty() ); assert( ! 
max.isEmpty() ); assert( ns.size() ); - + _ns = ns; _min = min; _max = max; - - _deleted.clear(); - _reload.clear(); - + + assert( _cloneLocs.size() == 0 ); + assert( _deleted.size() == 0 ); + assert( _reload.size() == 0 ); + assert( _memoryUsed == 0 ); + _active = true; } - - void done(){ - if ( ! _active ) - return; - _active = false; - _inCriticalSection = false; - scoped_lock lk( _mutex ); + void done() { + readlock lk( _ns ); + _deleted.clear(); _reload.clear(); + _cloneLocs.clear(); + _memoryUsed = 0; + + scoped_lock l(_m); + _active = false; + _inCriticalSection = false; } - - void logOp( const char * opstr , const char * ns , const BSONObj& obj , BSONObj * patt ){ - if ( ! _active ) + + void logOp( const char * opstr , const char * ns , const BSONObj& obj , BSONObj * patt ) { + if ( ! _getActive() ) return; if ( _ns != ns ) return; - + + // no need to log if this is not an insertion, an update, or an actual deletion + // note: opstr 'db' isn't a deletion but a mention that a database exists (for replication + // machinery mostly) char op = opstr[0]; if ( op == 'n' || op =='c' || ( op == 'd' && opstr[1] == 'b' ) ) return; @@ -231,68 +300,68 @@ namespace mongo { BSONElement ide; if ( patt ) ide = patt->getField( "_id" ); - else + else ide = obj["_id"]; - - if ( ide.eoo() ){ + + if ( ide.eoo() ) { log( LL_WARNING ) << "logOpForSharding got mod with no _id, ignoring obj: " << obj << endl; return; } - + BSONObj it; - switch ( opstr[0] ){ - + switch ( opstr[0] ) { + case 'd': { - - if ( getThreadName() == cleanUpThreadName ){ + + if ( getThreadName() == cleanUpThreadName ) { // we don't want to xfer things we're cleaning // as then they'll be deleted on TO // which is bad return; } - + // can't filter deletes :( - scoped_lock lk( _mutex ); _deleted.push_back( ide.wrap() ); + _memoryUsed += ide.size() + 5; return; } - - case 'i': + + case 'i': it = obj; break; - - case 'u': - if ( ! Helpers::findById( cc() , _ns.c_str() , ide.wrap() , it ) ){ + + case 'u': + if ( ! Helpers::findById( cc() , _ns.c_str() , ide.wrap() , it ) ) { log( LL_WARNING ) << "logOpForSharding couldn't find: " << ide << " even though should have" << endl; return; } break; - + } - + if ( ! isInRange( it , _min , _max ) ) return; - - scoped_lock lk( _mutex ); + _reload.push_back( ide.wrap() ); + _memoryUsed += ide.size() + 5; } - void xfer( list<BSONObj> * l , BSONObjBuilder& b , const char * name , long long& size , bool explode ){ + void xfer( list<BSONObj> * l , BSONObjBuilder& b , const char * name , long long& size , bool explode ) { const long long maxSize = 1024 * 1024; - + if ( l->size() == 0 || size > maxSize ) return; - + BSONArrayBuilder arr(b.subarrayStart(name)); - - list<BSONObj>::iterator i = l->begin(); - - while ( i != l->end() && size < maxSize ){ + + list<BSONObj>::iterator i = l->begin(); + + while ( i != l->end() && size < maxSize ) { BSONObj t = *i; - if ( explode ){ + if ( explode ) { BSONObj it; - if ( Helpers::findById( cc() , _ns.c_str() , t, it ) ){ + if ( Helpers::findById( cc() , _ns.c_str() , t, it ) ) { arr.append( it ); size += it.objsize(); } @@ -303,12 +372,16 @@ namespace mongo { i = l->erase( i ); size += t.objsize(); } - + arr.done(); } - bool transferMods( string& errmsg , BSONObjBuilder& b ){ - if ( ! _active ){ + /** + * called from the dest of a migrate + * transfers mods from src to dest + */ + bool transferMods( string& errmsg , BSONObjBuilder& b ) { + if ( ! 
_getActive() ) { errmsg = "no active migration!"; return false; } @@ -318,8 +391,7 @@ namespace mongo { { readlock rl( _ns ); Client::Context cx( _ns ); - - scoped_lock lk( _mutex ); + xfer( &_deleted , b , "deleted" , size , false ); xfer( &_reload , b , "reload" , size , true ); } @@ -329,45 +401,201 @@ namespace mongo { return true; } - bool _inCriticalSection; + /** + * Get the disklocs that belong to the chunk migrated and sort them in _cloneLocs (to avoid seeking disk later) + * + * @param maxChunkSize number of bytes beyond which a chunk's base data (no indices) is considered too large to move + * @param errmsg filled with textual description of error if this call return false + * @return false if approximate chunk size is too big to move or true otherwise + */ + bool storeCurrentLocs( long long maxChunkSize , string& errmsg , BSONObjBuilder& result ) { + readlock l( _ns ); + Client::Context ctx( _ns ); + NamespaceDetails *d = nsdetails( _ns.c_str() ); + if ( ! d ) { + errmsg = "ns not found, should be impossible"; + return false; + } + + BSONObj keyPattern; + // the copies are needed because the indexDetailsForRange destroys the input + BSONObj min = _min.copy(); + BSONObj max = _max.copy(); + IndexDetails *idx = indexDetailsForRange( _ns.c_str() , errmsg , min , max , keyPattern ); + if ( idx == NULL ) { + errmsg = "can't find index in storeCurrentLocs"; + return false; + } + + scoped_ptr<ClientCursor> cc( new ClientCursor( QueryOption_NoCursorTimeout , + shared_ptr<Cursor>( new BtreeCursor( d , d->idxNo(*idx) , *idx , min , max , false , 1 ) ) , + _ns ) ); + + // use the average object size to estimate how many objects a full chunk would carry + // do that while traversing the chunk's range using the sharding index, below + // there's a fair amout of slack before we determine a chunk is too large because object sizes will vary + unsigned long long maxRecsWhenFull; + long long avgRecSize; + const long long totalRecs = d->stats.nrecords; + if ( totalRecs > 0 ) { + avgRecSize = d->stats.datasize / totalRecs; + maxRecsWhenFull = maxChunkSize / avgRecSize; + maxRecsWhenFull = 130 * maxRecsWhenFull / 100; // slack + } + else { + avgRecSize = 0; + maxRecsWhenFull = numeric_limits<long long>::max(); + } + + // do a full traversal of the chunk and don't stop even if we think it is a large chunk + // we want the number of records to better report, in that case + bool isLargeChunk = false; + unsigned long long recCount = 0;; + while ( cc->ok() ) { + DiskLoc dl = cc->currLoc(); + if ( ! isLargeChunk ) { + _cloneLocs.insert( dl ); + } + cc->advance(); + + // we can afford to yield here because any change to the base data that we might miss is already being + // queued and will be migrated in the 'transferMods' stage + if ( ! cc->yieldSometimes() ) { + break; + } + + if ( ++recCount > maxRecsWhenFull ) { + isLargeChunk = true; + } + } + + if ( isLargeChunk ) { + warning() << "can't move chunk of size (aprox) " << recCount * avgRecSize + << " because maximum size allowed to move is " << maxChunkSize + << " ns: " << _ns << " " << _min << " -> " << _max + << endl; + result.appendBool( "chunkTooBig" , true ); + result.appendNumber( "chunkSize" , (long long)(recCount * avgRecSize) ); + errmsg = "chunk too big to move"; + return false; + } + + log() << "moveChunk number of documents: " << _cloneLocs.size() << endl; + return true; + } + + bool clone( string& errmsg , BSONObjBuilder& result ) { + if ( ! 
_getActive() ) { + errmsg = "not active"; + return false; + } + + readlock l( _ns ); + Client::Context ctx( _ns ); + + NamespaceDetails *d = nsdetails( _ns.c_str() ); + assert( d ); + + BSONArrayBuilder a( std::min( BSONObjMaxUserSize , (int)( ( 12 + d->averageObjectSize() )* _cloneLocs.size() ) ) ); + + set<DiskLoc>::iterator i = _cloneLocs.begin(); + for ( ; i!=_cloneLocs.end(); ++i ) { + DiskLoc dl = *i; + BSONObj o = dl.obj(); + + // use the builder size instead of accumulating 'o's size so that we take into consideration + // the overhead of BSONArray indices + if ( a.len() + o.objsize() + 1024 > BSONObjMaxUserSize ) { + break; + } + a.append( o ); + } + + result.appendArray( "objects" , a.arr() ); + _cloneLocs.erase( _cloneLocs.begin() , i ); + return true; + } + + void aboutToDelete( const Database* db , const DiskLoc& dl ) { + dbMutex.assertWriteLocked(); + + if ( ! _getActive() ) + return; + + if ( ! db->ownsNS( _ns ) ) + return; + + _cloneLocs.erase( dl ); + } + + long long mbUsed() const { return _memoryUsed / ( 1024 * 1024 ); } + + bool getInCriticalSection() const { scoped_lock l(_m); return _inCriticalSection; } + void setInCriticalSection( bool b ) { scoped_lock l(_m); _inCriticalSection = b; } + + bool isActive() const { return _getActive(); } private: - + mutable mongo::mutex _m; // protect _inCriticalSection and _active + bool _inCriticalSection; bool _active; string _ns; BSONObj _min; BSONObj _max; - list<BSONObj> _reload; - list<BSONObj> _deleted; + // disk locs yet to be transferred from here to the other side + // no locking needed because build by 1 thread in a read lock + // depleted by 1 thread in a read lock + // updates applied by 1 thread in a write lock + set<DiskLoc> _cloneLocs; + + list<BSONObj> _reload; // objects that were modified that must be recloned + list<BSONObj> _deleted; // objects deleted during clone that should be deleted later + long long _memoryUsed; // bytes in _reload + _deleted + + bool _getActive() const { scoped_lock l(_m); return _active; } + void _setActive( bool b ) { scoped_lock l(_m); _active = b; } - mongo::mutex _mutex; - } migrateFromStatus; - + struct MigrateStatusHolder { - MigrateStatusHolder( string ns , const BSONObj& min , const BSONObj& max ){ + MigrateStatusHolder( string ns , const BSONObj& min , const BSONObj& max ) { migrateFromStatus.start( ns , min , max ); } - ~MigrateStatusHolder(){ + ~MigrateStatusHolder() { migrateFromStatus.done(); } }; - void logOpForSharding( const char * opstr , const char * ns , const BSONObj& obj , BSONObj * patt ){ + void logOpForSharding( const char * opstr , const char * ns , const BSONObj& obj , BSONObj * patt ) { migrateFromStatus.logOp( opstr , ns , obj , patt ); } - class TransferModsCommand : public ChunkCommandHelper{ + void aboutToDeleteForSharding( const Database* db , const DiskLoc& dl ) { + migrateFromStatus.aboutToDelete( db , dl ); + } + + class TransferModsCommand : public ChunkCommandHelper { public: - TransferModsCommand() : ChunkCommandHelper( "_transferMods" ){} + TransferModsCommand() : ChunkCommandHelper( "_transferMods" ) {} - bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){ + bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool) { return migrateFromStatus.transferMods( errmsg, result ); } } transferModsCommand; + + class InitialCloneCommand : public ChunkCommandHelper { + public: + InitialCloneCommand() : ChunkCommandHelper( "_migrateClone" ) {} + + bool run(const string& , BSONObj& cmdObj, 
string& errmsg, BSONObjBuilder& result, bool) { + return migrateFromStatus.clone( errmsg, result ); + } + } initialCloneCommand; + + /** * this is the main entry for moveChunk * called to initial a move @@ -376,20 +604,22 @@ namespace mongo { */ class MoveChunkCommand : public Command { public: - MoveChunkCommand() : Command( "moveChunk" ){} + MoveChunkCommand() : Command( "moveChunk" ) {} virtual void help( stringstream& help ) const { help << "should not be calling this directly" << endl; } virtual bool slaveOk() const { return false; } virtual bool adminOnly() const { return true; } - virtual LockType locktype() const { return NONE; } - - - bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){ + virtual LockType locktype() const { return NONE; } + + + bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool) { // 1. parse options // 2. make sure my view is complete and lock // 3. start migrate + // in a read lock, get all DiskLoc and sort so we can do as little seeking as possible + // tell to start transferring // 4. pause till migrate caught up // 5. LOCK // a) update my config, essentially locking @@ -398,10 +628,9 @@ namespace mongo { // d) logChange to config server // 6. wait for all current cursors to expire // 7. remove data locally - + // ------------------------------- - - + // 1. string ns = cmdObj.firstElement().str(); string to = cmdObj["to"].str(); @@ -409,38 +638,45 @@ namespace mongo { BSONObj min = cmdObj["min"].Obj(); BSONObj max = cmdObj["max"].Obj(); BSONElement shardId = cmdObj["shardId"]; - - if ( ns.empty() ){ + BSONElement maxSizeElem = cmdObj["maxChunkSizeBytes"]; + + if ( ns.empty() ) { errmsg = "need to specify namespace in command"; return false; } - - if ( to.empty() ){ - errmsg = "need to specify server to move shard to"; + + if ( to.empty() ) { + errmsg = "need to specify server to move chunk to"; return false; } - if ( from.empty() ){ - errmsg = "need to specify server to move shard from (redundat i know)"; + if ( from.empty() ) { + errmsg = "need to specify server to move chunk from"; return false; } - - if ( min.isEmpty() ){ + + if ( min.isEmpty() ) { errmsg = "need to specify a min"; return false; } - if ( max.isEmpty() ){ + if ( max.isEmpty() ) { errmsg = "need to specify a max"; return false; } - - if ( shardId.eoo() ){ + + if ( shardId.eoo() ) { errmsg = "need shardId"; return false; } - - if ( ! shardingState.enabled() ){ - if ( cmdObj["configdb"].type() != String ){ + + if ( maxSizeElem.eoo() || ! maxSizeElem.isNumber() ) { + errmsg = "need to specify maxChunkSizeBytes"; + return false; + } + const long long maxChunkSize = maxSizeElem.numberLong(); // in bytes + + if ( ! shardingState.enabled() ) { + if ( cmdObj["configdb"].type() != String ) { errmsg = "sharding not enabled"; return false; } @@ -449,78 +685,107 @@ namespace mongo { configServer.init( configdb ); } - MoveTimingHelper timing( "from" , ns , min , max ); + MoveTimingHelper timing( "from" , ns , min , max , 6 /* steps */); Shard fromShard( from ); Shard toShard( to ); - - log() << "got movechunk: " << cmdObj << endl; + + log() << "received moveChunk request: " << cmdObj << endl; timing.done(1); - // 2. - + + // 2. DistributedLock lockSetup( ConnectionString( shardingState.getConfigServer() , ConnectionString::SYNC ) , ns ); dist_lock_try dlk( &lockSetup , (string)"migrate-" + min.toString() ); - if ( ! dlk.got() ){ - errmsg = "someone else has the lock"; + if ( ! 
dlk.got() ) { + errmsg = "the collection's metadata lock is taken"; result.append( "who" , dlk.other() ); return false; } + BSONObj chunkInfo = BSON("min" << min << "max" << max << "from" << fromShard.getName() << "to" << toShard.getName()); + configServer.logChange( "moveChunk.start" , ns , chunkInfo ); + ShardChunkVersion maxVersion; string myOldShard; { ScopedDbConnection conn( shardingState.getConfigServer() ); - + BSONObj x = conn->findOne( ShardNS::chunk , Query( BSON( "ns" << ns ) ).sort( BSON( "lastmod" << -1 ) ) ); maxVersion = x["lastmod"]; - x = conn->findOne( ShardNS::chunk , shardId.wrap( "_id" ) ); - assert( x["shard"].type() ); - myOldShard = x["shard"].String(); - - if ( myOldShard != fromShard.getName() ){ - errmsg = "i'm out of date"; + BSONObj currChunk = conn->findOne( ShardNS::chunk , shardId.wrap( "_id" ) ); + assert( currChunk["shard"].type() ); + assert( currChunk["min"].type() ); + assert( currChunk["max"].type() ); + myOldShard = currChunk["shard"].String(); + conn.done(); + + BSONObj currMin = currChunk["min"].Obj(); + BSONObj currMax = currChunk["max"].Obj(); + if ( currMin.woCompare( min ) || currMax.woCompare( max ) ) { + errmsg = "boundaries are outdated (likely a split occurred)"; + result.append( "currMin" , currMin ); + result.append( "currMax" , currMax ); + result.append( "requestedMin" , min ); + result.append( "requestedMax" , max ); + + log( LL_WARNING ) << "aborted moveChunk because" << errmsg << ": " << min << "->" << max + << " is now " << currMin << "->" << currMax << endl; + return false; + } + + if ( myOldShard != fromShard.getName() ) { + errmsg = "location is outdated (likely balance or migrate occurred)"; result.append( "from" , fromShard.getName() ); result.append( "official" , myOldShard ); + + log( LL_WARNING ) << "aborted moveChunk because " << errmsg << ": chunk is at " << myOldShard + << " and not at " << fromShard.getName() << endl; return false; } - - if ( maxVersion < shardingState.getVersion( ns ) ){ - errmsg = "official version less than mine?";; + + if ( maxVersion < shardingState.getVersion( ns ) ) { + errmsg = "official version less than mine?"; result.appendTimestamp( "officialVersion" , maxVersion ); result.appendTimestamp( "myVersion" , shardingState.getVersion( ns ) ); + + log( LL_WARNING ) << "aborted moveChunk because " << errmsg << ": official " << maxVersion + << " mine: " << shardingState.getVersion(ns) << endl; return false; } - conn.done(); + // since this could be the first call that enable sharding we also make sure to have the chunk manager up to date + shardingState.gotShardName( myOldShard ); + ShardChunkVersion shardVersion; + shardingState.trySetVersion( ns , shardVersion /* will return updated */ ); + + log() << "moveChunk request accepted at version " << shardVersion << endl; } - + timing.done(2); - + // 3. MigrateStatusHolder statusHolder( ns , min , max ); { - dblock lk; - // this makes sure there wasn't a write inside the .cpp code we can miss - } - - { - - ScopedDbConnection conn( to ); - BSONObj res; - bool ok = conn->runCommand( "admin" , - BSON( "_recvChunkStart" << ns << - "from" << from << - "min" << min << - "max" << max << - "configServer" << configServer.modelServer() - ) , - res ); - conn.done(); + // this gets a read lock, so we know we have a checkpoint for mods + if ( ! migrateFromStatus.storeCurrentLocs( maxChunkSize , errmsg , result ) ) + return false; - if ( ! 
ok ){ - errmsg = "_recvChunkStart failed: "; + ScopedDbConnection connTo( to ); + BSONObj res; + bool ok = connTo->runCommand( "admin" , + BSON( "_recvChunkStart" << ns << + "from" << from << + "min" << min << + "max" << max << + "configServer" << configServer.modelServer() + ) , + res ); + connTo.done(); + + if ( ! ok ) { + errmsg = "moveChunk failed to engage TO-shard in the data transfer: "; assert( res["errmsg"].type() ); errmsg += res["errmsg"].String(); result.append( "cause" , res ); @@ -529,118 +794,275 @@ namespace mongo { } timing.done( 3 ); - - // 4. - for ( int i=0; i<86400; i++ ){ // don't want a single chunk move to take more than a day + + // 4. + for ( int i=0; i<86400; i++ ) { // don't want a single chunk move to take more than a day assert( dbMutex.getState() == 0 ); - sleepsecs( 1 ); + sleepsecs( 1 ); ScopedDbConnection conn( to ); BSONObj res; bool ok = conn->runCommand( "admin" , BSON( "_recvChunkStatus" << 1 ) , res ); res = res.getOwned(); conn.done(); - - log(0) << "_recvChunkStatus : " << res << endl; - - if ( ! ok || res["state"].String() == "fail" ){ - log( LL_ERROR ) << "_recvChunkStatus error : " << res << endl; - errmsg = "_recvChunkStatus error"; - result.append( "cause" ,res ); + + log(0) << "moveChunk data transfer progress: " << res << " my mem used: " << migrateFromStatus.mbUsed() << endl; + + if ( ! ok || res["state"].String() == "fail" ) { + log( LL_WARNING ) << "moveChunk error transfering data caused migration abort: " << res << endl; + errmsg = "data transfer error"; + result.append( "cause" , res ); return false; } if ( res["state"].String() == "steady" ) break; + if ( migrateFromStatus.mbUsed() > (500 * 1024 * 1024) ) { + // this is too much memory for us to use for this + // so we're going to abort the migrate + ScopedDbConnection conn( to ); + BSONObj res; + conn->runCommand( "admin" , BSON( "_recvChunkAbort" << 1 ) , res ); + res = res.getOwned(); + conn.done(); + error() << "aborting migrate because too much memory used res: " << res << endl; + errmsg = "aborting migrate because too much memory used"; + result.appendBool( "split" , true ); + return false; + } + killCurrentOp.checkForInterrupt(); } timing.done(4); // 5. - { + { // 5.a - migrateFromStatus._inCriticalSection = true; - ShardChunkVersion myVersion = maxVersion; + // we're under the collection lock here, so no other migrate can change maxVersion or ShardChunkManager state + migrateFromStatus.setInCriticalSection( true ); + ShardChunkVersion currVersion = maxVersion; + ShardChunkVersion myVersion = currVersion; myVersion.incMajor(); - + { - dblock lk; + writelock lk( ns ); assert( myVersion > shardingState.getVersion( ns ) ); - shardingState.setVersion( ns , myVersion ); - assert( myVersion == shardingState.getVersion( ns ) ); - log() << "moveChunk locking myself to: " << myVersion << endl; + + // bump the chunks manager's version up and "forget" about the chunk being moved + // this is not the commit point but in practice the state in this shard won't until the commit it done + shardingState.donateChunk( ns , min , max , myVersion ); } - + log() << "moveChunk setting version to: " << myVersion << endl; + // 5.b + // we're under the collection lock here, too, so we can undo the chunk donation because no other state change + // could be ongoing { BSONObj res; - ScopedDbConnection conn( to ); - bool ok = conn->runCommand( "admin" , - BSON( "_recvChunkCommit" << 1 ) , - res ); - conn.done(); - log() << "moveChunk commit result: " << res << endl; - if ( ! 
ok ){ - log() << "_recvChunkCommit failed: " << res << endl; + ScopedDbConnection connTo( to ); + bool ok = connTo->runCommand( "admin" , + BSON( "_recvChunkCommit" << 1 ) , + res ); + connTo.done(); + + if ( ! ok ) { + { + writelock lk( ns ); + + // revert the chunk manager back to the state before "forgetting" about the chunk + shardingState.undoDonateChunk( ns , min , max , currVersion ); + } + + log() << "movChunk migrate commit not accepted by TO-shard: " << res + << " resetting shard version to: " << currVersion << endl; + errmsg = "_recvChunkCommit failed!"; result.append( "cause" , res ); return false; } + + log() << "moveChunk migrate commit accepted by TO-shard: " << res << endl; } - + // 5.c - ScopedDbConnection conn( shardingState.getConfigServer() ); - - BSONObjBuilder temp; - temp.append( "shard" , toShard.getName() ); - temp.appendTimestamp( "lastmod" , myVersion ); - - conn->update( ShardNS::chunk , shardId.wrap( "_id" ) , BSON( "$set" << temp.obj() ) ); - - { - // update another random chunk - BSONObj x = conn->findOne( ShardNS::chunk , Query( BSON( "ns" << ns << "shard" << myOldShard ) ).sort( BSON( "lastmod" << -1 ) ) ); - if ( ! x.isEmpty() ){ - - BSONObjBuilder temp2; - myVersion.incMinor(); - - temp2.appendTimestamp( "lastmod" , myVersion ); - - shardingState.setVersion( ns , myVersion ); - - conn->update( ShardNS::chunk , x["_id"].wrap() , BSON( "$set" << temp2.obj() ) ); - - log() << "moveChunk updating self to: " << myVersion << endl; + + // version at which the next highest lastmod will be set + // if the chunk being moved is the last in the shard, nextVersion is that chunk's lastmod + // otherwise the highest version is from the chunk being bumped on the FROM-shard + ShardChunkVersion nextVersion; + + // we want to go only once to the configDB but perhaps change two chunks, the one being migrated and another + // local one (so to bump version for the entire shard) + // we use the 'applyOps' mechanism to group the two updates and make them safer + // TODO pull config update code to a module + + BSONObjBuilder cmdBuilder; + + BSONArrayBuilder updates( cmdBuilder.subarrayStart( "applyOps" ) ); + { + // update for the chunk being moved + BSONObjBuilder op; + op.append( "op" , "u" ); + op.appendBool( "b" , false /* no upserting */ ); + op.append( "ns" , ShardNS::chunk ); + + BSONObjBuilder n( op.subobjStart( "o" ) ); + n.append( "_id" , Chunk::genID( ns , min ) ); + n.appendTimestamp( "lastmod" , myVersion /* same as used on donateChunk */ ); + n.append( "ns" , ns ); + n.append( "min" , min ); + n.append( "max" , max ); + n.append( "shard" , toShard.getName() ); + n.done(); + + BSONObjBuilder q( op.subobjStart( "o2" ) ); + q.append( "_id" , Chunk::genID( ns , min ) ); + q.done(); + + updates.append( op.obj() ); + } + + nextVersion = myVersion; + + // if we have chunks left on the FROM shard, update the version of one of them as well + // we can figure that out by grabbing the chunkManager installed on 5.a + // TODO expose that manager when installing it + + ShardChunkManagerPtr chunkManager = shardingState.getShardChunkManager( ns ); + if( chunkManager->getNumChunks() > 0 ) { + + // get another chunk on that shard + BSONObj lookupKey; + BSONObj bumpMin, bumpMax; + do { + chunkManager->getNextChunk( lookupKey , &bumpMin , &bumpMax ); + lookupKey = bumpMin; + } + while( bumpMin == min ); + + BSONObjBuilder op; + op.append( "op" , "u" ); + op.appendBool( "b" , false ); + op.append( "ns" , ShardNS::chunk ); + + nextVersion.incMinor(); // same as used on donateChunk + 
BSONObjBuilder n( op.subobjStart( "o" ) ); + n.append( "_id" , Chunk::genID( ns , bumpMin ) ); + n.appendTimestamp( "lastmod" , nextVersion ); + n.append( "ns" , ns ); + n.append( "min" , bumpMin ); + n.append( "max" , bumpMax ); + n.append( "shard" , fromShard.getName() ); + n.done(); + + BSONObjBuilder q( op.subobjStart( "o2" ) ); + q.append( "_id" , Chunk::genID( ns , bumpMin ) ); + q.done(); + + updates.append( op.obj() ); + + log() << "moveChunk updating self version to: " << nextVersion << " through " + << bumpMin << " -> " << bumpMax << " for collection '" << ns << "'" << endl; + + } + else { + + log() << "moveChunk moved last chunk out for collection '" << ns << "'" << endl; + } + + updates.done(); + + BSONArrayBuilder preCond( cmdBuilder.subarrayStart( "preCondition" ) ); + { + BSONObjBuilder b; + b.append( "ns" , ShardNS::chunk ); + b.append( "q" , BSON( "query" << BSON( "ns" << ns ) << "orderby" << BSON( "lastmod" << -1 ) ) ); + { + BSONObjBuilder bb( b.subobjStart( "res" ) ); + bb.appendTimestamp( "lastmod" , maxVersion ); + bb.done(); } - else { - //++myVersion; - shardingState.setVersion( ns , 0 ); + preCond.append( b.obj() ); + } + + preCond.done(); + + BSONObj cmd = cmdBuilder.obj(); + log(7) << "moveChunk update: " << cmd << endl; + + bool ok = false; + BSONObj cmdResult; + try { + ScopedDbConnection conn( shardingState.getConfigServer() ); + ok = conn->runCommand( "config" , cmd , cmdResult ); + conn.done(); + } + catch ( DBException& e ) { + ok = false; + BSONObjBuilder b; + e.getInfo().append( b ); + cmdResult = b.obj(); + } + + if ( ! ok ) { + + // this could be a blip in the connectivity + // wait out a few seconds and check if the commit request made it + // + // if the commit made it to the config, we'll see the chunk in the new shard and there's no action + // if the commit did not make it, currently the only way to fix this state is to bounce the mongod so + // that the old state (before migrating) be brought in + + warning() << "moveChunk commit outcome ongoing: " << cmd << " for command :" << cmdResult << endl; + sleepsecs( 10 ); + + try { + ScopedDbConnection conn( shardingState.getConfigServer() ); + + // look for the chunk in this shard whose version got bumped + // we assume that if that mod made it to the config, the applyOps was successful + BSONObj doc = conn->findOne( ShardNS::chunk , Query(BSON( "ns" << ns )).sort( BSON("lastmod" << -1))); + ShardChunkVersion checkVersion = doc["lastmod"]; + + if ( checkVersion == nextVersion ) { + log() << "moveChunk commit confirmed" << endl; + + } + else { + error() << "moveChunk commit failed: version is at" + << checkVersion << " instead of " << nextVersion << endl; + error() << "TERMINATING" << endl; + dbexit( EXIT_SHARDING_ERROR ); + } + + conn.done(); - log() << "moveChunk now i'm empty" << endl; + } + catch ( ... ) { + error() << "moveChunk failed to get confirmation of commit" << endl; + error() << "TERMINATING" << endl; + dbexit( EXIT_SHARDING_ERROR ); } } - conn.done(); - migrateFromStatus._inCriticalSection = false; + migrateFromStatus.setInCriticalSection( false ); + // 5.d - configServer.logChange( "moveChunk" , ns , BSON( "min" << min << "max" << max << - "from" << fromShard.getName() << - "to" << toShard.getName() ) ); + configServer.logChange( "moveChunk.commit" , ns , chunkInfo ); } - + migrateFromStatus.done(); timing.done(5); - - { // 6. + { + // 6. 
OldDataCleanup c; c.ns = ns; c.min = min.getOwned(); c.max = max.getOwned(); ClientCursor::find( ns , c.initial ); - if ( c.initial.size() ){ + if ( c.initial.size() ) { log() << "forking for cleaning up chunk data" << endl; boost::thread t( boost::bind( &cleanupOldData , c ) ); } @@ -649,24 +1071,24 @@ namespace mongo { // 7. c.doRemove(); } - - + + } - timing.done(6); + timing.done(6); return true; - + } - + } moveChunkCmd; - bool ShardingState::inCriticalMigrateSection(){ - return migrateFromStatus._inCriticalSection; + bool ShardingState::inCriticalMigrateSection() { + return migrateFromStatus.getInCriticalSection(); } /* ----- below this are the "to" side commands - + command to initiate worker thread does initial clone @@ -679,71 +1101,74 @@ namespace mongo { class MigrateStatus { public: - - MigrateStatus(){ - active = false; - } - void prepare(){ + MigrateStatus() : m_active("MigrateStatus") { active = false; } + + void prepare() { + scoped_lock l(m_active); // reading and writing 'active' + assert( ! active ); state = READY; errmsg = ""; numCloned = 0; + clonedBytes = 0; numCatchup = 0; numSteady = 0; active = true; } - void go(){ + void go() { try { _go(); } - catch ( std::exception& e ){ + catch ( std::exception& e ) { state = FAIL; errmsg = e.what(); log( LL_ERROR ) << "migrate failed: " << e.what() << endl; } - catch ( ... ){ + catch ( ... ) { state = FAIL; errmsg = "UNKNOWN ERROR"; log( LL_ERROR ) << "migrate failed with unknown exception" << endl; } - active = false; + setActive( false ); } - - void _go(){ - assert( active ); + + void _go() { + assert( getActive() ); assert( state == READY ); assert( ! min.isEmpty() ); assert( ! max.isEmpty() ); - - MoveTimingHelper timing( "to" , ns , min , max ); - + + MoveTimingHelper timing( "to" , ns , min , max , 5 /* steps */ ); + ScopedDbConnection conn( from ); conn->getLastError(); // just test connection - { // 1. copy indexes + { + // 1. copy indexes auto_ptr<DBClientCursor> indexes = conn->getIndexes( ns ); vector<BSONObj> all; - while ( indexes->more() ){ + while ( indexes->more() ) { all.push_back( indexes->next().getOwned() ); } - + writelock lk( ns ); Client::Context ct( ns ); - + string system_indexes = cc().database()->name + ".system.indexes"; - for ( unsigned i=0; i<all.size(); i++ ){ + for ( unsigned i=0; i<all.size(); i++ ) { BSONObj idx = all[i]; - theDataFileMgr.insert( system_indexes.c_str() , idx.objdata() , idx.objsize() ); + theDataFileMgr.insertAndLog( system_indexes.c_str() , idx ); } - + timing.done(1); } - - { // 2. delete any data already in range + + { + // 2. delete any data already in range writelock lk( ns ); RemoveSaver rs( "moveChunk" , ns , "preCleanup" ); long long num = Helpers::removeRange( ns , min , max , true , false , cmdLine.moveParanoia ? &rs : 0 ); @@ -752,29 +1177,54 @@ namespace mongo { timing.done(2); } - - - { // 3. initial bulk clone + + + { + // 3. initial bulk clone state = CLONE; - auto_ptr<DBClientCursor> cursor = conn->query( ns , Query().minKey( min ).maxKey( max ) , /* QueryOption_Exhaust */ 0 ); - assert( cursor.get() ); - while ( cursor->more() ){ - BSONObj o = cursor->next().getOwned(); - { - writelock lk( ns ); - Helpers::upsert( ns , o ); + + while ( true ) { + BSONObj res; + if ( ! 
conn->runCommand( "admin" , BSON( "_migrateClone" << 1 ) , res ) ) { + state = FAIL; + errmsg = "_migrateClone failed: "; + errmsg += res.toString(); + error() << errmsg << endl; + conn.done(); + return; + } + + BSONObj arr = res["objects"].Obj(); + int thisTime = 0; + + BSONObjIterator i( arr ); + while( i.more() ) { + BSONObj o = i.next().Obj(); + { + writelock lk( ns ); + Helpers::upsert( ns , o ); + } + thisTime++; + numCloned++; + clonedBytes += o.objsize(); } - numCloned++; + + if ( thisTime == 0 ) + break; } timing.done(3); } - - { // 4. do bulk of mods + + // if running on a replicated system, we'll need to flush the docs we cloned to the secondaries + ReplTime lastOpApplied; + + { + // 4. do bulk of mods state = CATCHUP; - while ( true ){ + while ( true ) { BSONObj res; - if ( ! conn->runCommand( "admin" , BSON( "_transferMods" << 1 ) , res ) ){ + if ( ! conn->runCommand( "admin" , BSON( "_transferMods" << 1 ) , res ) ) { state = FAIL; errmsg = "_transferMods failed: "; errmsg += res.toString(); @@ -784,18 +1234,26 @@ namespace mongo { } if ( res["size"].number() == 0 ) break; - - apply( res ); + + apply( res , &lastOpApplied ); + + if ( state == ABORT ) { + timing.note( "aborted" ); + return; + } } timing.done(4); } - - { // 5. wait for commit + + { + // 5. wait for commit + Timer timeWaitingForCommit; + state = STEADY; - while ( state == STEADY || state == COMMIT_START ){ + while ( state == STEADY || state == COMMIT_START ) { BSONObj res; - if ( ! conn->runCommand( "admin" , BSON( "_transferMods" << 1 ) , res ) ){ + if ( ! conn->runCommand( "admin" , BSON( "_transferMods" << 1 ) , res ) ) { log() << "_transferMods failed in STEADY state: " << res << endl; errmsg = res.toString(); state = FAIL; @@ -803,36 +1261,48 @@ namespace mongo { return; } - if ( res["size"].number() > 0 && apply( res ) ) + if ( res["size"].number() > 0 && apply( res , &lastOpApplied ) ) continue; - - if ( state == COMMIT_START ) + + if ( state == COMMIT_START && flushPendingWrites( lastOpApplied ) ) break; sleepmillis( 10 ); } - + + if ( state == ABORT ) { + timing.note( "aborted" ); + return; + } + + if ( timeWaitingForCommit.seconds() > 86400 ) { + state = FAIL; + errmsg = "timed out waiting for commit"; + return; + } + timing.done(5); } - + state = DONE; conn.done(); } - void status( BSONObjBuilder& b ){ - b.appendBool( "active" , active ); + void status( BSONObjBuilder& b ) { + b.appendBool( "active" , getActive() ); b.append( "ns" , ns ); b.append( "from" , from ); b.append( "min" , min ); b.append( "max" , max ); - + b.append( "state" , stateString() ); if ( state == FAIL ) b.append( "errmsg" , errmsg ); { BSONObjBuilder bb( b.subobjStart( "counts" ) ); bb.append( "cloned" , numCloned ); + bb.append( "clonedBytes" , clonedBytes ); bb.append( "catchup" , numCatchup ); bb.append( "steady" , numSteady ); bb.done(); @@ -841,17 +1311,22 @@ namespace mongo { } - bool apply( const BSONObj& xfer ){ + bool apply( const BSONObj& xfer , ReplTime* lastOpApplied ) { + ReplTime dummy; + if ( lastOpApplied == NULL ) { + lastOpApplied = &dummy; + } + bool didAnything = false; - - if ( xfer["deleted"].isABSONObj() ){ + + if ( xfer["deleted"].isABSONObj() ) { writelock lk(ns); Client::Context cx(ns); - + RemoveSaver rs( "moveChunk" , ns , "removedDuring" ); BSONObjIterator i( xfer["deleted"].Obj() ); - while ( i.more() ){ + while ( i.more() ) { BSONObj id = i.next().Obj(); // do not apply deletes if they do not belong to the chunk being migrated @@ -865,27 +1340,56 @@ namespace mongo { } Helpers::removeRange( ns , id , 
id, false , true , cmdLine.moveParanoia ? &rs : 0 ); + + *lastOpApplied = cx.getClient()->getLastOp(); didAnything = true; } } - - if ( xfer["reload"].isABSONObj() ){ + + if ( xfer["reload"].isABSONObj() ) { writelock lk(ns); Client::Context cx(ns); BSONObjIterator i( xfer["reload"].Obj() ); - while ( i.more() ){ + while ( i.more() ) { BSONObj it = i.next().Obj(); + Helpers::upsert( ns , it ); + + *lastOpApplied = cx.getClient()->getLastOp(); didAnything = true; } } return didAnything; } - - string stateString(){ - switch ( state ){ + + bool flushPendingWrites( const ReplTime& lastOpApplied ) { + // if replication is on, try to force enough secondaries to catch up + // TODO opReplicatedEnough should eventually honor priorities and geo-awareness + // for now, we try to replicate to a sensible number of secondaries + const int slaveCount = getSlaveCount() / 2 + 1; + if ( ! opReplicatedEnough( lastOpApplied , slaveCount ) ) { + log( LL_WARNING ) << "migrate commit attempt timed out contacting " << slaveCount + << " slaves for '" << ns << "' " << min << " -> " << max << endl; + return false; + } + log() << "migrate commit succeeded flushing to secondaries for '" << ns << "' " << min << " -> " << max << endl; + + { + readlock lk(ns); // commitNow() currently requires it + + // if durability is on, force a write to journal + if ( getDur().commitNow() ) { + log() << "migrate commit flushed to journal for '" << ns << "' " << min << " -> " << max << endl; + } + } + + return true; + } + + string stateString() { + switch ( state ) { case READY: return "ready"; case CLONE: return "clone"; case CATCHUP: return "catchup"; @@ -893,17 +1397,18 @@ namespace mongo { case COMMIT_START: return "commitStart"; case DONE: return "done"; case FAIL: return "fail"; + case ABORT: return "abort"; } assert(0); return ""; } - bool startCommit(){ + bool startCommit() { if ( state != STEADY ) return false; state = COMMIT_START; - - for ( int i=0; i<86400; i++ ){ + + for ( int i=0; i<86400; i++ ) { sleepmillis(1); if ( state == DONE ) return true; @@ -912,42 +1417,60 @@ namespace mongo { return false; } + void abort() { + state = ABORT; + errmsg = "aborted"; + } + + bool getActive() const { scoped_lock l(m_active); return active; } + void setActive( bool b ) { scoped_lock l(m_active); active = b; } + + mutable mongo::mutex m_active; bool active; - + string ns; string from; - + BSONObj min; BSONObj max; - + long long numCloned; + long long clonedBytes; long long numCatchup; long long numSteady; - enum State { READY , CLONE , CATCHUP , STEADY , COMMIT_START , DONE , FAIL } state; + enum State { READY , CLONE , CATCHUP , STEADY , COMMIT_START , DONE , FAIL , ABORT } state; string errmsg; - + } migrateStatus; - - void migrateThread(){ + + void migrateThread() { Client::initThread( "migrateThread" ); migrateStatus.go(); cc().shutdown(); } - + class RecvChunkStartCommand : public ChunkCommandHelper { public: - RecvChunkStartCommand() : ChunkCommandHelper( "_recvChunkStart" ){} + RecvChunkStartCommand() : ChunkCommandHelper( "_recvChunkStart" ) {} virtual LockType locktype() const { return WRITE; } // this is so don't have to do locking internally - bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){ - - if ( migrateStatus.active ){ + bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool) { + + if ( migrateStatus.getActive() ) { errmsg = "migrate already in progress"; return false; } + if ( OldDataCleanup::_numThreads > 0 ) { + errmsg = + str::stream() + << 
"still waiting for a previous migrates data to get cleaned, can't accept new chunks, num threads: " + << OldDataCleanup::_numThreads; + return false; + } + if ( ! configServer.ok() ) configServer.init( cmdObj["configServer"].String() ); @@ -957,9 +1480,9 @@ namespace mongo { migrateStatus.from = cmdObj["from"].String(); migrateStatus.min = cmdObj["min"].Obj().getOwned(); migrateStatus.max = cmdObj["max"].Obj().getOwned(); - + boost::thread m( migrateThread ); - + result.appendBool( "started" , true ); return true; } @@ -968,20 +1491,20 @@ namespace mongo { class RecvChunkStatusCommand : public ChunkCommandHelper { public: - RecvChunkStatusCommand() : ChunkCommandHelper( "_recvChunkStatus" ){} + RecvChunkStatusCommand() : ChunkCommandHelper( "_recvChunkStatus" ) {} - bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){ + bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool) { migrateStatus.status( result ); return 1; } - + } recvChunkStatusCommand; class RecvChunkCommitCommand : public ChunkCommandHelper { public: - RecvChunkCommitCommand() : ChunkCommandHelper( "_recvChunkCommit" ){} - - bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){ + RecvChunkCommitCommand() : ChunkCommandHelper( "_recvChunkCommit" ) {} + + bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool) { bool ok = migrateStatus.startCommit(); migrateStatus.status( result ); return ok; @@ -989,10 +1512,22 @@ namespace mongo { } recvChunkCommitCommand; + class RecvChunkAbortCommand : public ChunkCommandHelper { + public: + RecvChunkAbortCommand() : ChunkCommandHelper( "_recvChunkAbort" ) {} + + bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool) { + migrateStatus.abort(); + migrateStatus.status( result ); + return true; + } + + } recvChunkAboortCommand; + class IsInRangeTest : public UnitTest { public: - void run(){ + void run() { BSONObj min = BSON( "x" << 1 ); BSONObj max = BSON( "x" << 5 ); @@ -1002,6 +1537,8 @@ namespace mongo { assert( isInRange( BSON( "x" << 4 ) , min , max ) ); assert( ! isInRange( BSON( "x" << 5 ) , min , max ) ); assert( ! isInRange( BSON( "x" << 6 ) , min , max ) ); + + log(1) << "isInRangeTest passed" << endl; } } isInRangeTest; } |