Diffstat (limited to 's/d_migrate.cpp')
-rw-r--r--   s/d_migrate.cpp   1197
1 files changed, 867 insertions, 330 deletions
diff --git a/s/d_migrate.cpp b/s/d_migrate.cpp
index 8e9584c..2878276 100644
--- a/s/d_migrate.cpp
+++ b/s/d_migrate.cpp
@@ -25,18 +25,24 @@
#include "pch.h"
#include <map>
#include <string>
+#include <algorithm>
#include "../db/commands.h"
#include "../db/jsobj.h"
#include "../db/dbmessage.h"
#include "../db/query.h"
#include "../db/cmdline.h"
+#include "../db/queryoptimizer.h"
+#include "../db/btree.h"
+#include "../db/repl_block.h"
+#include "../db/dur.h"
#include "../client/connpool.h"
#include "../client/distlock.h"
#include "../util/queue.h"
#include "../util/unittest.h"
+#include "../util/processinfo.h"
#include "shard.h"
#include "d_logic.h"
@@ -49,131 +55,185 @@ namespace mongo {
class MoveTimingHelper {
public:
- MoveTimingHelper( const string& where , const string& ns , BSONObj min , BSONObj max )
- : _where( where ) , _ns( ns ){
- _next = 1;
+ MoveTimingHelper( const string& where , const string& ns , BSONObj min , BSONObj max , int total )
+ : _where( where ) , _ns( ns ) , _next( 0 ) , _total( total ) {
+ _nextNote = 0;
_b.append( "min" , min );
_b.append( "max" , max );
}
- ~MoveTimingHelper(){
- configServer.logChange( (string)"moveChunk." + _where , _ns, _b.obj() );
+ ~MoveTimingHelper() {
+ // even if logChange doesn't throw, bson does
+ // sigh
+ try {
+ if ( _next != _total ) {
+ note( "aborted" );
+ }
+ configServer.logChange( (string)"moveChunk." + _where , _ns, _b.obj() );
+ }
+ catch ( const std::exception& e ) {
+ log( LL_WARNING ) << "couldn't record timing for moveChunk '" << _where << "': " << e.what() << endl;
+ }
}
-
- void done( int step ){
- assert( step == _next++ );
-
+
+ void done( int step ) {
+ assert( step == ++_next );
+ assert( step <= _total );
+
stringstream ss;
ss << "step" << step;
string s = ss.str();
-
+
CurOp * op = cc().curop();
if ( op )
op->setMessage( s.c_str() );
- else
+ else
log( LL_WARNING ) << "op is null in MoveTimingHelper::done" << endl;
-
+
_b.appendNumber( s , _t.millis() );
_t.reset();
+
+#if 0
+ // debugging for memory leak?
+ ProcessInfo pi;
+ ss << " v:" << pi.getVirtualMemorySize()
+ << " r:" << pi.getResidentSize();
+ log() << ss.str() << endl;
+#endif
}
-
-
+
+
+ void note( const string& s ) {
+ string field = "note";
+ if ( _nextNote > 0 ) {
+ StringBuilder buf;
+ buf << "note" << _nextNote;
+ field = buf.str();
+ }
+ _nextNote++;
+
+ _b.append( field , s );
+ }
+
private:
Timer _t;
string _where;
string _ns;
-
+
int _next;
-
+ int _total; // expected # of steps
+ int _nextNote;
+
BSONObjBuilder _b;
+
};
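The helper is now told the total number of steps up front, so its destructor can tell a completed move from an aborted one. A minimal usage sketch (step contents are hypothetical, but the call pattern mirrors how MoveChunkCommand drives it further down):

    // illustrative only -- mirrors the call pattern MoveChunkCommand uses below
    void exampleTiming( const string& ns , const BSONObj& min , const BSONObj& max ) {
        MoveTimingHelper timing( "from" , ns , min , max , 6 /* expected steps */ );
        // ... parse and validate the request ...
        timing.done( 1 );                      // steps must be hit in order 1..6
        // ... take the distributed lock, verify chunk boundaries ...
        timing.done( 2 );
        timing.note( "example annotation" );   // extra notes land as note, note1, note2, ...
        // returning before done(6) makes the destructor append an "aborted" note
        // before it logs the whole timing document through configServer.logChange()
    }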
struct OldDataCleanup {
+ static AtomicUInt _numThreads; // how many threads are doing async cleanup
+
string ns;
BSONObj min;
BSONObj max;
set<CursorId> initial;
- void doRemove(){
- ShardForceModeBlock sf;
+
+ OldDataCleanup(){
+ _numThreads++;
+ }
+ OldDataCleanup( const OldDataCleanup& other ) {
+ ns = other.ns;
+ min = other.min.getOwned();
+ max = other.max.getOwned();
+ initial = other.initial;
+ _numThreads++;
+ }
+ ~OldDataCleanup(){
+ _numThreads--;
+ }
+
+ void doRemove() {
+ ShardForceVersionOkModeBlock sf;
writelock lk(ns);
RemoveSaver rs("moveChunk",ns,"post-cleanup");
long long num = Helpers::removeRange( ns , min , max , true , false , cmdLine.moveParanoia ? &rs : 0 );
log() << "moveChunk deleted: " << num << endl;
}
+
};
+ AtomicUInt OldDataCleanup::_numThreads = 0;
+
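The static counter turns every live OldDataCleanup value into a reference: the default and copy constructors increment it and the destructor decrements it, so RecvChunkStartCommand further down can refuse new chunks while background cleanup copies are still alive. A self-contained sketch of the same pattern, using std::atomic in place of AtomicUInt:

    #include <atomic>

    // counting pattern sketch -- every live copy of the struct counts as one
    struct CountedCleanup {
        static std::atomic<unsigned> numThreads;

        CountedCleanup()                        { ++numThreads; }
        CountedCleanup( const CountedCleanup& ) { ++numThreads; }
        ~CountedCleanup()                       { --numThreads; }
    };
    std::atomic<unsigned> CountedCleanup::numThreads{ 0 };

    // a receiving shard can then refuse work while copies are still alive:
    //     if ( CountedCleanup::numThreads > 0 ) { /* reject the new chunk */ }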
static const char * const cleanUpThreadName = "cleanupOldData";
-
- void _cleanupOldData( OldDataCleanup cleanup ){
+
+ void _cleanupOldData( OldDataCleanup cleanup ) {
Client::initThread( cleanUpThreadName );
log() << " (start) waiting to cleanup " << cleanup.ns << " from " << cleanup.min << " -> " << cleanup.max << " # cursors:" << cleanup.initial.size() << endl;
int loops = 0;
Timer t;
- while ( t.seconds() < 900 ){ // 15 minutes
+ while ( t.seconds() < 900 ) { // 15 minutes
assert( dbMutex.getState() == 0 );
sleepmillis( 20 );
-
+
set<CursorId> now;
- ClientCursor::find( cleanup.ns , now );
-
+ ClientCursor::find( cleanup.ns , now );
+
set<CursorId> left;
- for ( set<CursorId>::iterator i=cleanup.initial.begin(); i!=cleanup.initial.end(); ++i ){
+ for ( set<CursorId>::iterator i=cleanup.initial.begin(); i!=cleanup.initial.end(); ++i ) {
CursorId id = *i;
if ( now.count(id) )
left.insert( id );
}
-
+
if ( left.size() == 0 )
break;
cleanup.initial = left;
-
- if ( ( loops++ % 200 ) == 0 ){
+
+ if ( ( loops++ % 200 ) == 0 ) {
log() << " (looping " << loops << ") waiting to cleanup " << cleanup.ns << " from " << cleanup.min << " -> " << cleanup.max << " # cursors:" << cleanup.initial.size() << endl;
-
+
stringstream ss;
- for ( set<CursorId>::iterator i=cleanup.initial.begin(); i!=cleanup.initial.end(); ++i ){
+ for ( set<CursorId>::iterator i=cleanup.initial.begin(); i!=cleanup.initial.end(); ++i ) {
CursorId id = *i;
ss << id << " ";
}
log() << " cursors: " << ss.str() << endl;
}
}
-
+
cleanup.doRemove();
cc().shutdown();
}
- void cleanupOldData( OldDataCleanup cleanup ){
+ void cleanupOldData( OldDataCleanup cleanup ) {
try {
_cleanupOldData( cleanup );
}
- catch ( std::exception& e ){
+ catch ( std::exception& e ) {
log() << " error cleaning old data:" << e.what() << endl;
}
- catch ( ... ){
+ catch ( ... ) {
log() << " unknown error cleaning old data" << endl;
}
}
class ChunkCommandHelper : public Command {
public:
- ChunkCommandHelper( const char * name )
- : Command( name ){
+ ChunkCommandHelper( const char * name )
+ : Command( name ) {
}
-
+
virtual void help( stringstream& help ) const {
- help << "internal should not be calling this directly" << endl;
+ help << "internal - should not be called directly" << endl;
}
virtual bool slaveOk() const { return false; }
virtual bool adminOnly() const { return true; }
- virtual LockType locktype() const { return NONE; }
+ virtual LockType locktype() const { return NONE; }
};
- bool isInRange( const BSONObj& obj , const BSONObj& min , const BSONObj& max ){
+ bool isInRange( const BSONObj& obj , const BSONObj& min , const BSONObj& max ) {
BSONObj k = obj.extractFields( min, true );
return k.woCompare( min ) >= 0 && k.woCompare( max ) < 0;
@@ -182,48 +242,57 @@ namespace mongo {
class MigrateFromStatus {
public:
-
- MigrateFromStatus()
- : _mutex( "MigrateFromStatus" ){
+
+ MigrateFromStatus() : _m("MigrateFromStatus") {
_active = false;
_inCriticalSection = false;
+ _memoryUsed = 0;
}
- void start( string ns , const BSONObj& min , const BSONObj& max ){
+ void start( string ns , const BSONObj& min , const BSONObj& max ) {
+ scoped_lock l(_m); // reads and writes _active
+
assert( ! _active );
-
+
assert( ! min.isEmpty() );
assert( ! max.isEmpty() );
assert( ns.size() );
-
+
_ns = ns;
_min = min;
_max = max;
-
- _deleted.clear();
- _reload.clear();
-
+
+ assert( _cloneLocs.size() == 0 );
+ assert( _deleted.size() == 0 );
+ assert( _reload.size() == 0 );
+ assert( _memoryUsed == 0 );
+
_active = true;
}
-
- void done(){
- if ( ! _active )
- return;
- _active = false;
- _inCriticalSection = false;
- scoped_lock lk( _mutex );
+ void done() {
+ readlock lk( _ns );
+
_deleted.clear();
_reload.clear();
+ _cloneLocs.clear();
+ _memoryUsed = 0;
+
+ scoped_lock l(_m);
+ _active = false;
+ _inCriticalSection = false;
}
-
- void logOp( const char * opstr , const char * ns , const BSONObj& obj , BSONObj * patt ){
- if ( ! _active )
+
+ void logOp( const char * opstr , const char * ns , const BSONObj& obj , BSONObj * patt ) {
+ if ( ! _getActive() )
return;
if ( _ns != ns )
return;
-
+
+ // no need to log if this is not an insertion, an update, or an actual deletion
+ // note: opstr 'db' isn't a deletion but a mention that a database exists (for replication
+ // machinery mostly)
char op = opstr[0];
if ( op == 'n' || op =='c' || ( op == 'd' && opstr[1] == 'b' ) )
return;
@@ -231,68 +300,68 @@ namespace mongo {
BSONElement ide;
if ( patt )
ide = patt->getField( "_id" );
- else
+ else
ide = obj["_id"];
-
- if ( ide.eoo() ){
+
+ if ( ide.eoo() ) {
log( LL_WARNING ) << "logOpForSharding got mod with no _id, ignoring obj: " << obj << endl;
return;
}
-
+
BSONObj it;
- switch ( opstr[0] ){
-
+ switch ( opstr[0] ) {
+
case 'd': {
-
- if ( getThreadName() == cleanUpThreadName ){
+
+ if ( getThreadName() == cleanUpThreadName ) {
// we don't want to xfer things we're cleaning
// as then they'll be deleted on TO
// which is bad
return;
}
-
+
// can't filter deletes :(
- scoped_lock lk( _mutex );
_deleted.push_back( ide.wrap() );
+ _memoryUsed += ide.size() + 5;
return;
}
-
- case 'i':
+
+ case 'i':
it = obj;
break;
-
- case 'u':
- if ( ! Helpers::findById( cc() , _ns.c_str() , ide.wrap() , it ) ){
+
+ case 'u':
+ if ( ! Helpers::findById( cc() , _ns.c_str() , ide.wrap() , it ) ) {
log( LL_WARNING ) << "logOpForSharding couldn't find: " << ide << " even though should have" << endl;
return;
}
break;
-
+
}
-
+
if ( ! isInRange( it , _min , _max ) )
return;
-
- scoped_lock lk( _mutex );
+
_reload.push_back( ide.wrap() );
+ _memoryUsed += ide.size() + 5;
}
- void xfer( list<BSONObj> * l , BSONObjBuilder& b , const char * name , long long& size , bool explode ){
+ void xfer( list<BSONObj> * l , BSONObjBuilder& b , const char * name , long long& size , bool explode ) {
const long long maxSize = 1024 * 1024;
-
+
if ( l->size() == 0 || size > maxSize )
return;
-
+
BSONArrayBuilder arr(b.subarrayStart(name));
-
- list<BSONObj>::iterator i = l->begin();
-
- while ( i != l->end() && size < maxSize ){
+
+ list<BSONObj>::iterator i = l->begin();
+
+ while ( i != l->end() && size < maxSize ) {
BSONObj t = *i;
- if ( explode ){
+ if ( explode ) {
BSONObj it;
- if ( Helpers::findById( cc() , _ns.c_str() , t, it ) ){
+ if ( Helpers::findById( cc() , _ns.c_str() , t, it ) ) {
arr.append( it );
size += it.objsize();
}
@@ -303,12 +372,16 @@ namespace mongo {
i = l->erase( i );
size += t.objsize();
}
-
+
arr.done();
}
- bool transferMods( string& errmsg , BSONObjBuilder& b ){
- if ( ! _active ){
+ /**
+ * called from the dest of a migrate
+ * transfers mods from src to dest
+ */
+ bool transferMods( string& errmsg , BSONObjBuilder& b ) {
+ if ( ! _getActive() ) {
errmsg = "no active migration!";
return false;
}
@@ -318,8 +391,7 @@ namespace mongo {
{
readlock rl( _ns );
Client::Context cx( _ns );
-
- scoped_lock lk( _mutex );
+
xfer( &_deleted , b , "deleted" , size , false );
xfer( &_reload , b , "reload" , size , true );
}
@@ -329,45 +401,201 @@ namespace mongo {
return true;
}
- bool _inCriticalSection;
+ /**
+ * Get the disklocs that belong to the chunk being migrated and sort them in _cloneLocs (to avoid disk seeks later)
+ *
+ * @param maxChunkSize number of bytes beyond which a chunk's base data (no indices) is considered too large to move
+ * @param errmsg filled with textual description of error if this call returns false
+ * @return false if approximate chunk size is too big to move or true otherwise
+ */
+ bool storeCurrentLocs( long long maxChunkSize , string& errmsg , BSONObjBuilder& result ) {
+ readlock l( _ns );
+ Client::Context ctx( _ns );
+ NamespaceDetails *d = nsdetails( _ns.c_str() );
+ if ( ! d ) {
+ errmsg = "ns not found, should be impossible";
+ return false;
+ }
+
+ BSONObj keyPattern;
+ // the copies are needed because the indexDetailsForRange destroys the input
+ BSONObj min = _min.copy();
+ BSONObj max = _max.copy();
+ IndexDetails *idx = indexDetailsForRange( _ns.c_str() , errmsg , min , max , keyPattern );
+ if ( idx == NULL ) {
+ errmsg = "can't find index in storeCurrentLocs";
+ return false;
+ }
+
+ scoped_ptr<ClientCursor> cc( new ClientCursor( QueryOption_NoCursorTimeout ,
+ shared_ptr<Cursor>( new BtreeCursor( d , d->idxNo(*idx) , *idx , min , max , false , 1 ) ) ,
+ _ns ) );
+
+ // use the average object size to estimate how many objects a full chunk would carry
+ // do that while traversing the chunk's range using the sharding index, below
+ // there's a fair amount of slack before we determine a chunk is too large because object sizes will vary
+ unsigned long long maxRecsWhenFull;
+ long long avgRecSize;
+ const long long totalRecs = d->stats.nrecords;
+ if ( totalRecs > 0 ) {
+ avgRecSize = d->stats.datasize / totalRecs;
+ maxRecsWhenFull = maxChunkSize / avgRecSize;
+ maxRecsWhenFull = 130 * maxRecsWhenFull / 100; // slack
+ }
+ else {
+ avgRecSize = 0;
+ maxRecsWhenFull = numeric_limits<long long>::max();
+ }
+
+ // do a full traversal of the chunk and don't stop even if we think it is a large chunk
+ // we want the full record count so we can report it in that case
+ bool isLargeChunk = false;
+ unsigned long long recCount = 0;
+ while ( cc->ok() ) {
+ DiskLoc dl = cc->currLoc();
+ if ( ! isLargeChunk ) {
+ _cloneLocs.insert( dl );
+ }
+ cc->advance();
+
+ // we can afford to yield here because any change to the base data that we might miss is already being
+ // queued and will be migrated in the 'transferMods' stage
+ if ( ! cc->yieldSometimes() ) {
+ break;
+ }
+
+ if ( ++recCount > maxRecsWhenFull ) {
+ isLargeChunk = true;
+ }
+ }
+
+ if ( isLargeChunk ) {
+ warning() << "can't move chunk of size (aprox) " << recCount * avgRecSize
+ << " because maximum size allowed to move is " << maxChunkSize
+ << " ns: " << _ns << " " << _min << " -> " << _max
+ << endl;
+ result.appendBool( "chunkTooBig" , true );
+ result.appendNumber( "chunkSize" , (long long)(recCount * avgRecSize) );
+ errmsg = "chunk too big to move";
+ return false;
+ }
+
+ log() << "moveChunk number of documents: " << _cloneLocs.size() << endl;
+ return true;
+ }
+
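The "chunk too big" decision above reduces to simple arithmetic on the collection stats: average record size, the number of records a full chunk could hold, and 30% slack for size variance. A self-contained sketch with made-up collection numbers:

    #include <iostream>

    int main() {
        const long long maxChunkSize = 64 * 1024 * 1024;    // 64 MB, passed in by the caller
        const long long totalRecs    = 1000000;             // hypothetical d->stats.nrecords
        const long long datasize     = 512LL * 1024 * 1024; // hypothetical d->stats.datasize

        long long avgRecSize = datasize / totalRecs;                    // ~536 bytes
        unsigned long long maxRecsWhenFull = maxChunkSize / avgRecSize; // ~125k records
        maxRecsWhenFull = 130 * maxRecsWhenFull / 100;                  // 30% slack for size variance

        // while walking the chunk's btree range, a count above maxRecsWhenFull
        // marks the chunk as too big and the move is refused with chunkTooBig:true
        std::cout << "refuse the move past " << maxRecsWhenFull << " records" << std::endl;
        return 0;
    }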
+ bool clone( string& errmsg , BSONObjBuilder& result ) {
+ if ( ! _getActive() ) {
+ errmsg = "not active";
+ return false;
+ }
+
+ readlock l( _ns );
+ Client::Context ctx( _ns );
+
+ NamespaceDetails *d = nsdetails( _ns.c_str() );
+ assert( d );
+
+ BSONArrayBuilder a( std::min( BSONObjMaxUserSize , (int)( ( 12 + d->averageObjectSize() )* _cloneLocs.size() ) ) );
+
+ set<DiskLoc>::iterator i = _cloneLocs.begin();
+ for ( ; i!=_cloneLocs.end(); ++i ) {
+ DiskLoc dl = *i;
+ BSONObj o = dl.obj();
+
+ // use the builder size instead of accumulating 'o's size so that we take into consideration
+ // the overhead of BSONArray indices
+ if ( a.len() + o.objsize() + 1024 > BSONObjMaxUserSize ) {
+ break;
+ }
+ a.append( o );
+ }
+
+ result.appendArray( "objects" , a.arr() );
+ _cloneLocs.erase( _cloneLocs.begin() , i );
+ return true;
+ }
+
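clone() caps each reply by the builder's running length rather than by summing document sizes, and keeps roughly 1KB of headroom so the BSON array index overhead can never push a batch over the user size limit. A self-contained sketch of that cut-off rule, operating on plain byte counts instead of real documents:

    #include <cstddef>
    #include <vector>

    // returns how many of the given object sizes fit into one reply batch
    size_t batchCount( const std::vector<int>& objSizes ) {
        const int maxUserSize = 16 * 1024 * 1024; // stand-in for BSONObjMaxUserSize
        const int headroom    = 1024;             // covers BSON array index overhead
        int used = 0;
        size_t n = 0;
        for ( int size : objSizes ) {
            if ( used + size + headroom > maxUserSize )
                break;                            // this object starts the next batch
            used += size;
            ++n;
        }
        return n;
    }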
+ void aboutToDelete( const Database* db , const DiskLoc& dl ) {
+ dbMutex.assertWriteLocked();
+
+ if ( ! _getActive() )
+ return;
+
+ if ( ! db->ownsNS( _ns ) )
+ return;
+
+ _cloneLocs.erase( dl );
+ }
+
+ long long mbUsed() const { return _memoryUsed / ( 1024 * 1024 ); }
+
+ bool getInCriticalSection() const { scoped_lock l(_m); return _inCriticalSection; }
+ void setInCriticalSection( bool b ) { scoped_lock l(_m); _inCriticalSection = b; }
+
+ bool isActive() const { return _getActive(); }
private:
-
+ mutable mongo::mutex _m; // protect _inCriticalSection and _active
+ bool _inCriticalSection;
bool _active;
string _ns;
BSONObj _min;
BSONObj _max;
- list<BSONObj> _reload;
- list<BSONObj> _deleted;
+ // disk locs yet to be transferred from here to the other side
+ // no locking needed because built by 1 thread in a read lock
+ // depleted by 1 thread in a read lock
+ // updates applied by 1 thread in a write lock
+ set<DiskLoc> _cloneLocs;
+
+ list<BSONObj> _reload; // objects that were modified that must be recloned
+ list<BSONObj> _deleted; // objects deleted during clone that should be deleted later
+ long long _memoryUsed; // bytes in _reload + _deleted
+
+ bool _getActive() const { scoped_lock l(_m); return _active; }
+ void _setActive( bool b ) { scoped_lock l(_m); _active = b; }
- mongo::mutex _mutex;
-
} migrateFromStatus;
-
+
struct MigrateStatusHolder {
- MigrateStatusHolder( string ns , const BSONObj& min , const BSONObj& max ){
+ MigrateStatusHolder( string ns , const BSONObj& min , const BSONObj& max ) {
migrateFromStatus.start( ns , min , max );
}
- ~MigrateStatusHolder(){
+ ~MigrateStatusHolder() {
migrateFromStatus.done();
}
};
- void logOpForSharding( const char * opstr , const char * ns , const BSONObj& obj , BSONObj * patt ){
+ void logOpForSharding( const char * opstr , const char * ns , const BSONObj& obj , BSONObj * patt ) {
migrateFromStatus.logOp( opstr , ns , obj , patt );
}
- class TransferModsCommand : public ChunkCommandHelper{
+ void aboutToDeleteForSharding( const Database* db , const DiskLoc& dl ) {
+ migrateFromStatus.aboutToDelete( db , dl );
+ }
+
+ class TransferModsCommand : public ChunkCommandHelper {
public:
- TransferModsCommand() : ChunkCommandHelper( "_transferMods" ){}
+ TransferModsCommand() : ChunkCommandHelper( "_transferMods" ) {}
- bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){
+ bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool) {
return migrateFromStatus.transferMods( errmsg, result );
}
} transferModsCommand;
+
+ class InitialCloneCommand : public ChunkCommandHelper {
+ public:
+ InitialCloneCommand() : ChunkCommandHelper( "_migrateClone" ) {}
+
+ bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool) {
+ return migrateFromStatus.clone( errmsg, result );
+ }
+ } initialCloneCommand;
+
+
/**
* this is the main entry for moveChunk
* called to initiate a move
@@ -376,20 +604,22 @@ namespace mongo {
*/
class MoveChunkCommand : public Command {
public:
- MoveChunkCommand() : Command( "moveChunk" ){}
+ MoveChunkCommand() : Command( "moveChunk" ) {}
virtual void help( stringstream& help ) const {
help << "should not be calling this directly" << endl;
}
virtual bool slaveOk() const { return false; }
virtual bool adminOnly() const { return true; }
- virtual LockType locktype() const { return NONE; }
-
-
- bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){
+ virtual LockType locktype() const { return NONE; }
+
+
+ bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool) {
// 1. parse options
// 2. make sure my view is complete and lock
// 3. start migrate
+ // in a read lock, get all DiskLocs and sort so we can do as little seeking as possible
+ // tell the recipient shard to start transferring
// 4. pause till migrate caught up
// 5. LOCK
// a) update my config, essentially locking
@@ -398,10 +628,9 @@ namespace mongo {
// d) logChange to config server
// 6. wait for all current cursors to expire
// 7. remove data locally
-
+
// -------------------------------
-
-
+
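For reference, the request parsed in step 1 below looks roughly like the following; the field names match what the code reads out of cmdObj, while every concrete value here is invented for illustration:

    // hypothetical request document, built with the same BSON() macro used elsewhere in this file
    BSONObj exampleRequest =
        BSON( "moveChunk" << "test.foo"               // firstElement: the namespace
           << "to"   << "shard0001:27018"
           << "from" << "shard0000:27018"
           << "min"  << BSON( "x" << 0 )
           << "max"  << BSON( "x" << 100 )
           << "shardId" << "test.foo-x_0"             // _id of the chunk document in config.chunks
           << "maxChunkSizeBytes" << 64 * 1024 * 1024 );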
// 1.
string ns = cmdObj.firstElement().str();
string to = cmdObj["to"].str();
@@ -409,38 +638,45 @@ namespace mongo {
BSONObj min = cmdObj["min"].Obj();
BSONObj max = cmdObj["max"].Obj();
BSONElement shardId = cmdObj["shardId"];
-
- if ( ns.empty() ){
+ BSONElement maxSizeElem = cmdObj["maxChunkSizeBytes"];
+
+ if ( ns.empty() ) {
errmsg = "need to specify namespace in command";
return false;
}
-
- if ( to.empty() ){
- errmsg = "need to specify server to move shard to";
+
+ if ( to.empty() ) {
+ errmsg = "need to specify server to move chunk to";
return false;
}
- if ( from.empty() ){
- errmsg = "need to specify server to move shard from (redundat i know)";
+ if ( from.empty() ) {
+ errmsg = "need to specify server to move chunk from";
return false;
}
-
- if ( min.isEmpty() ){
+
+ if ( min.isEmpty() ) {
errmsg = "need to specify a min";
return false;
}
- if ( max.isEmpty() ){
+ if ( max.isEmpty() ) {
errmsg = "need to specify a max";
return false;
}
-
- if ( shardId.eoo() ){
+
+ if ( shardId.eoo() ) {
errmsg = "need shardId";
return false;
}
-
- if ( ! shardingState.enabled() ){
- if ( cmdObj["configdb"].type() != String ){
+
+ if ( maxSizeElem.eoo() || ! maxSizeElem.isNumber() ) {
+ errmsg = "need to specify maxChunkSizeBytes";
+ return false;
+ }
+ const long long maxChunkSize = maxSizeElem.numberLong(); // in bytes
+
+ if ( ! shardingState.enabled() ) {
+ if ( cmdObj["configdb"].type() != String ) {
errmsg = "sharding not enabled";
return false;
}
@@ -449,78 +685,107 @@ namespace mongo {
configServer.init( configdb );
}
- MoveTimingHelper timing( "from" , ns , min , max );
+ MoveTimingHelper timing( "from" , ns , min , max , 6 /* steps */);
Shard fromShard( from );
Shard toShard( to );
-
- log() << "got movechunk: " << cmdObj << endl;
+
+ log() << "received moveChunk request: " << cmdObj << endl;
timing.done(1);
- // 2.
-
+
+ // 2.
DistributedLock lockSetup( ConnectionString( shardingState.getConfigServer() , ConnectionString::SYNC ) , ns );
dist_lock_try dlk( &lockSetup , (string)"migrate-" + min.toString() );
- if ( ! dlk.got() ){
- errmsg = "someone else has the lock";
+ if ( ! dlk.got() ) {
+ errmsg = "the collection's metadata lock is taken";
result.append( "who" , dlk.other() );
return false;
}
+ BSONObj chunkInfo = BSON("min" << min << "max" << max << "from" << fromShard.getName() << "to" << toShard.getName());
+ configServer.logChange( "moveChunk.start" , ns , chunkInfo );
+
ShardChunkVersion maxVersion;
string myOldShard;
{
ScopedDbConnection conn( shardingState.getConfigServer() );
-
+
BSONObj x = conn->findOne( ShardNS::chunk , Query( BSON( "ns" << ns ) ).sort( BSON( "lastmod" << -1 ) ) );
maxVersion = x["lastmod"];
- x = conn->findOne( ShardNS::chunk , shardId.wrap( "_id" ) );
- assert( x["shard"].type() );
- myOldShard = x["shard"].String();
-
- if ( myOldShard != fromShard.getName() ){
- errmsg = "i'm out of date";
+ BSONObj currChunk = conn->findOne( ShardNS::chunk , shardId.wrap( "_id" ) );
+ assert( currChunk["shard"].type() );
+ assert( currChunk["min"].type() );
+ assert( currChunk["max"].type() );
+ myOldShard = currChunk["shard"].String();
+ conn.done();
+
+ BSONObj currMin = currChunk["min"].Obj();
+ BSONObj currMax = currChunk["max"].Obj();
+ if ( currMin.woCompare( min ) || currMax.woCompare( max ) ) {
+ errmsg = "boundaries are outdated (likely a split occurred)";
+ result.append( "currMin" , currMin );
+ result.append( "currMax" , currMax );
+ result.append( "requestedMin" , min );
+ result.append( "requestedMax" , max );
+
+ log( LL_WARNING ) << "aborted moveChunk because" << errmsg << ": " << min << "->" << max
+ << " is now " << currMin << "->" << currMax << endl;
+ return false;
+ }
+
+ if ( myOldShard != fromShard.getName() ) {
+ errmsg = "location is outdated (likely balance or migrate occurred)";
result.append( "from" , fromShard.getName() );
result.append( "official" , myOldShard );
+
+ log( LL_WARNING ) << "aborted moveChunk because " << errmsg << ": chunk is at " << myOldShard
+ << " and not at " << fromShard.getName() << endl;
return false;
}
-
- if ( maxVersion < shardingState.getVersion( ns ) ){
- errmsg = "official version less than mine?";;
+
+ if ( maxVersion < shardingState.getVersion( ns ) ) {
+ errmsg = "official version less than mine?";
result.appendTimestamp( "officialVersion" , maxVersion );
result.appendTimestamp( "myVersion" , shardingState.getVersion( ns ) );
+
+ log( LL_WARNING ) << "aborted moveChunk because " << errmsg << ": official " << maxVersion
+ << " mine: " << shardingState.getVersion(ns) << endl;
return false;
}
- conn.done();
+ // since this could be the first call that enables sharding we also make sure to have the chunk manager up to date
+ shardingState.gotShardName( myOldShard );
+ ShardChunkVersion shardVersion;
+ shardingState.trySetVersion( ns , shardVersion /* will return updated */ );
+
+ log() << "moveChunk request accepted at version " << shardVersion << endl;
}
-
+
timing.done(2);
-
+
// 3.
MigrateStatusHolder statusHolder( ns , min , max );
{
- dblock lk;
- // this makes sure there wasn't a write inside the .cpp code we can miss
- }
-
- {
-
- ScopedDbConnection conn( to );
- BSONObj res;
- bool ok = conn->runCommand( "admin" ,
- BSON( "_recvChunkStart" << ns <<
- "from" << from <<
- "min" << min <<
- "max" << max <<
- "configServer" << configServer.modelServer()
- ) ,
- res );
- conn.done();
+ // this gets a read lock, so we know we have a checkpoint for mods
+ if ( ! migrateFromStatus.storeCurrentLocs( maxChunkSize , errmsg , result ) )
+ return false;
- if ( ! ok ){
- errmsg = "_recvChunkStart failed: ";
+ ScopedDbConnection connTo( to );
+ BSONObj res;
+ bool ok = connTo->runCommand( "admin" ,
+ BSON( "_recvChunkStart" << ns <<
+ "from" << from <<
+ "min" << min <<
+ "max" << max <<
+ "configServer" << configServer.modelServer()
+ ) ,
+ res );
+ connTo.done();
+
+ if ( ! ok ) {
+ errmsg = "moveChunk failed to engage TO-shard in the data transfer: ";
assert( res["errmsg"].type() );
errmsg += res["errmsg"].String();
result.append( "cause" , res );
@@ -529,118 +794,275 @@ namespace mongo {
}
timing.done( 3 );
-
- // 4.
- for ( int i=0; i<86400; i++ ){ // don't want a single chunk move to take more than a day
+
+ // 4.
+ for ( int i=0; i<86400; i++ ) { // don't want a single chunk move to take more than a day
assert( dbMutex.getState() == 0 );
- sleepsecs( 1 );
+ sleepsecs( 1 );
ScopedDbConnection conn( to );
BSONObj res;
bool ok = conn->runCommand( "admin" , BSON( "_recvChunkStatus" << 1 ) , res );
res = res.getOwned();
conn.done();
-
- log(0) << "_recvChunkStatus : " << res << endl;
-
- if ( ! ok || res["state"].String() == "fail" ){
- log( LL_ERROR ) << "_recvChunkStatus error : " << res << endl;
- errmsg = "_recvChunkStatus error";
- result.append( "cause" ,res );
+
+ log(0) << "moveChunk data transfer progress: " << res << " my mem used: " << migrateFromStatus.mbUsed() << endl;
+
+ if ( ! ok || res["state"].String() == "fail" ) {
+ log( LL_WARNING ) << "moveChunk error transfering data caused migration abort: " << res << endl;
+ errmsg = "data transfer error";
+ result.append( "cause" , res );
return false;
}
if ( res["state"].String() == "steady" )
break;
+ if ( migrateFromStatus.mbUsed() > 500 ) { // mbUsed() reports megabytes, so this caps memory at 500 MB
+ // this is too much memory for us to use for this
+ // so we're going to abort the migrate
+ ScopedDbConnection conn( to );
+ BSONObj res;
+ conn->runCommand( "admin" , BSON( "_recvChunkAbort" << 1 ) , res );
+ res = res.getOwned();
+ conn.done();
+ error() << "aborting migrate because too much memory used res: " << res << endl;
+ errmsg = "aborting migrate because too much memory used";
+ result.appendBool( "split" , true );
+ return false;
+ }
+
killCurrentOp.checkForInterrupt();
}
timing.done(4);
// 5.
- {
+ {
// 5.a
- migrateFromStatus._inCriticalSection = true;
- ShardChunkVersion myVersion = maxVersion;
+ // we're under the collection lock here, so no other migrate can change maxVersion or ShardChunkManager state
+ migrateFromStatus.setInCriticalSection( true );
+ ShardChunkVersion currVersion = maxVersion;
+ ShardChunkVersion myVersion = currVersion;
myVersion.incMajor();
-
+
{
- dblock lk;
+ writelock lk( ns );
assert( myVersion > shardingState.getVersion( ns ) );
- shardingState.setVersion( ns , myVersion );
- assert( myVersion == shardingState.getVersion( ns ) );
- log() << "moveChunk locking myself to: " << myVersion << endl;
+
+ // bump the chunks manager's version up and "forget" about the chunk being moved
+ // this is not the commit point but in practice the state in this shard won't change until the commit is done
+ shardingState.donateChunk( ns , min , max , myVersion );
}
-
+ log() << "moveChunk setting version to: " << myVersion << endl;
+
// 5.b
+ // we're under the collection lock here, too, so we can undo the chunk donation because no other state change
+ // could be ongoing
{
BSONObj res;
- ScopedDbConnection conn( to );
- bool ok = conn->runCommand( "admin" ,
- BSON( "_recvChunkCommit" << 1 ) ,
- res );
- conn.done();
- log() << "moveChunk commit result: " << res << endl;
- if ( ! ok ){
- log() << "_recvChunkCommit failed: " << res << endl;
+ ScopedDbConnection connTo( to );
+ bool ok = connTo->runCommand( "admin" ,
+ BSON( "_recvChunkCommit" << 1 ) ,
+ res );
+ connTo.done();
+
+ if ( ! ok ) {
+ {
+ writelock lk( ns );
+
+ // revert the chunk manager back to the state before "forgetting" about the chunk
+ shardingState.undoDonateChunk( ns , min , max , currVersion );
+ }
+
+ log() << "movChunk migrate commit not accepted by TO-shard: " << res
+ << " resetting shard version to: " << currVersion << endl;
+
errmsg = "_recvChunkCommit failed!";
result.append( "cause" , res );
return false;
}
+
+ log() << "moveChunk migrate commit accepted by TO-shard: " << res << endl;
}
-
+
// 5.c
- ScopedDbConnection conn( shardingState.getConfigServer() );
-
- BSONObjBuilder temp;
- temp.append( "shard" , toShard.getName() );
- temp.appendTimestamp( "lastmod" , myVersion );
-
- conn->update( ShardNS::chunk , shardId.wrap( "_id" ) , BSON( "$set" << temp.obj() ) );
-
- {
- // update another random chunk
- BSONObj x = conn->findOne( ShardNS::chunk , Query( BSON( "ns" << ns << "shard" << myOldShard ) ).sort( BSON( "lastmod" << -1 ) ) );
- if ( ! x.isEmpty() ){
-
- BSONObjBuilder temp2;
- myVersion.incMinor();
-
- temp2.appendTimestamp( "lastmod" , myVersion );
-
- shardingState.setVersion( ns , myVersion );
-
- conn->update( ShardNS::chunk , x["_id"].wrap() , BSON( "$set" << temp2.obj() ) );
-
- log() << "moveChunk updating self to: " << myVersion << endl;
+
+ // version at which the next highest lastmod will be set
+ // if the chunk being moved is the last in the shard, nextVersion is that chunk's lastmod
+ // otherwise the highest version is from the chunk being bumped on the FROM-shard
+ ShardChunkVersion nextVersion;
+
+ // we want to go only once to the configDB but perhaps change two chunks, the one being migrated and another
+ // local one (so as to bump the version for the entire shard)
+ // we use the 'applyOps' mechanism to group the two updates and make them safer
+ // TODO pull config update code to a module
+
+ BSONObjBuilder cmdBuilder;
+
+ BSONArrayBuilder updates( cmdBuilder.subarrayStart( "applyOps" ) );
+ {
+ // update for the chunk being moved
+ BSONObjBuilder op;
+ op.append( "op" , "u" );
+ op.appendBool( "b" , false /* no upserting */ );
+ op.append( "ns" , ShardNS::chunk );
+
+ BSONObjBuilder n( op.subobjStart( "o" ) );
+ n.append( "_id" , Chunk::genID( ns , min ) );
+ n.appendTimestamp( "lastmod" , myVersion /* same as used on donateChunk */ );
+ n.append( "ns" , ns );
+ n.append( "min" , min );
+ n.append( "max" , max );
+ n.append( "shard" , toShard.getName() );
+ n.done();
+
+ BSONObjBuilder q( op.subobjStart( "o2" ) );
+ q.append( "_id" , Chunk::genID( ns , min ) );
+ q.done();
+
+ updates.append( op.obj() );
+ }
+
+ nextVersion = myVersion;
+
+ // if we have chunks left on the FROM shard, update the version of one of them as well
+ // we can figure that out by grabbing the chunkManager installed in step 5.a
+ // TODO expose that manager when installing it
+
+ ShardChunkManagerPtr chunkManager = shardingState.getShardChunkManager( ns );
+ if( chunkManager->getNumChunks() > 0 ) {
+
+ // get another chunk on that shard
+ BSONObj lookupKey;
+ BSONObj bumpMin, bumpMax;
+ do {
+ chunkManager->getNextChunk( lookupKey , &bumpMin , &bumpMax );
+ lookupKey = bumpMin;
+ }
+ while( bumpMin == min );
+
+ BSONObjBuilder op;
+ op.append( "op" , "u" );
+ op.appendBool( "b" , false );
+ op.append( "ns" , ShardNS::chunk );
+
+ nextVersion.incMinor(); // same as used on donateChunk
+ BSONObjBuilder n( op.subobjStart( "o" ) );
+ n.append( "_id" , Chunk::genID( ns , bumpMin ) );
+ n.appendTimestamp( "lastmod" , nextVersion );
+ n.append( "ns" , ns );
+ n.append( "min" , bumpMin );
+ n.append( "max" , bumpMax );
+ n.append( "shard" , fromShard.getName() );
+ n.done();
+
+ BSONObjBuilder q( op.subobjStart( "o2" ) );
+ q.append( "_id" , Chunk::genID( ns , bumpMin ) );
+ q.done();
+
+ updates.append( op.obj() );
+
+ log() << "moveChunk updating self version to: " << nextVersion << " through "
+ << bumpMin << " -> " << bumpMax << " for collection '" << ns << "'" << endl;
+
+ }
+ else {
+
+ log() << "moveChunk moved last chunk out for collection '" << ns << "'" << endl;
+ }
+
+ updates.done();
+
+ BSONArrayBuilder preCond( cmdBuilder.subarrayStart( "preCondition" ) );
+ {
+ BSONObjBuilder b;
+ b.append( "ns" , ShardNS::chunk );
+ b.append( "q" , BSON( "query" << BSON( "ns" << ns ) << "orderby" << BSON( "lastmod" << -1 ) ) );
+ {
+ BSONObjBuilder bb( b.subobjStart( "res" ) );
+ bb.appendTimestamp( "lastmod" , maxVersion );
+ bb.done();
}
- else {
- //++myVersion;
- shardingState.setVersion( ns , 0 );
+ preCond.append( b.obj() );
+ }
+
+ preCond.done();
+
+ BSONObj cmd = cmdBuilder.obj();
+ log(7) << "moveChunk update: " << cmd << endl;
+
+ bool ok = false;
+ BSONObj cmdResult;
+ try {
+ ScopedDbConnection conn( shardingState.getConfigServer() );
+ ok = conn->runCommand( "config" , cmd , cmdResult );
+ conn.done();
+ }
+ catch ( DBException& e ) {
+ ok = false;
+ BSONObjBuilder b;
+ e.getInfo().append( b );
+ cmdResult = b.obj();
+ }
+
+ if ( ! ok ) {
+
+ // this could be a blip in the connectivity
+ // wait out a few seconds and check if the commit request made it
+ //
+ // if the commit made it to the config, we'll see the chunk in the new shard and there's nothing more to do
+ // if the commit did not make it, currently the only way to fix this state is to bounce the mongod so
+ // that the old state (before migrating) is brought back in
+
+ warning() << "moveChunk commit outcome ongoing: " << cmd << " for command :" << cmdResult << endl;
+ sleepsecs( 10 );
+
+ try {
+ ScopedDbConnection conn( shardingState.getConfigServer() );
+
+ // look for the chunk in this shard whose version got bumped
+ // we assume that if that mod made it to the config, the applyOps was successful
+ BSONObj doc = conn->findOne( ShardNS::chunk , Query(BSON( "ns" << ns )).sort( BSON("lastmod" << -1)));
+ ShardChunkVersion checkVersion = doc["lastmod"];
+
+ if ( checkVersion == nextVersion ) {
+ log() << "moveChunk commit confirmed" << endl;
+
+ }
+ else {
+ error() << "moveChunk commit failed: version is at"
+ << checkVersion << " instead of " << nextVersion << endl;
+ error() << "TERMINATING" << endl;
+ dbexit( EXIT_SHARDING_ERROR );
+ }
+
+ conn.done();
- log() << "moveChunk now i'm empty" << endl;
+ }
+ catch ( ... ) {
+ error() << "moveChunk failed to get confirmation of commit" << endl;
+ error() << "TERMINATING" << endl;
+ dbexit( EXIT_SHARDING_ERROR );
}
}
- conn.done();
- migrateFromStatus._inCriticalSection = false;
+ migrateFromStatus.setInCriticalSection( false );
+
// 5.d
- configServer.logChange( "moveChunk" , ns , BSON( "min" << min << "max" << max <<
- "from" << fromShard.getName() <<
- "to" << toShard.getName() ) );
+ configServer.logChange( "moveChunk.commit" , ns , chunkInfo );
}
-
+
migrateFromStatus.done();
timing.done(5);
-
- { // 6.
+ {
+ // 6.
OldDataCleanup c;
c.ns = ns;
c.min = min.getOwned();
c.max = max.getOwned();
ClientCursor::find( ns , c.initial );
- if ( c.initial.size() ){
+ if ( c.initial.size() ) {
log() << "forking for cleaning up chunk data" << endl;
boost::thread t( boost::bind( &cleanupOldData , c ) );
}
@@ -649,24 +1071,24 @@ namespace mongo {
// 7.
c.doRemove();
}
-
-
+
+
}
- timing.done(6);
+ timing.done(6);
return true;
-
+
}
-
+
} moveChunkCmd;
- bool ShardingState::inCriticalMigrateSection(){
- return migrateFromStatus._inCriticalSection;
+ bool ShardingState::inCriticalMigrateSection() {
+ return migrateFromStatus.getInCriticalSection();
}
/* -----
below this are the "to" side commands
-
+
command to initiate
worker thread
does initial clone
@@ -679,71 +1101,74 @@ namespace mongo {
class MigrateStatus {
public:
-
- MigrateStatus(){
- active = false;
- }
- void prepare(){
+ MigrateStatus() : m_active("MigrateStatus") { active = false; }
+
+ void prepare() {
+ scoped_lock l(m_active); // reading and writing 'active'
+
assert( ! active );
state = READY;
errmsg = "";
numCloned = 0;
+ clonedBytes = 0;
numCatchup = 0;
numSteady = 0;
active = true;
}
- void go(){
+ void go() {
try {
_go();
}
- catch ( std::exception& e ){
+ catch ( std::exception& e ) {
state = FAIL;
errmsg = e.what();
log( LL_ERROR ) << "migrate failed: " << e.what() << endl;
}
- catch ( ... ){
+ catch ( ... ) {
state = FAIL;
errmsg = "UNKNOWN ERROR";
log( LL_ERROR ) << "migrate failed with unknown exception" << endl;
}
- active = false;
+ setActive( false );
}
-
- void _go(){
- assert( active );
+
+ void _go() {
+ assert( getActive() );
assert( state == READY );
assert( ! min.isEmpty() );
assert( ! max.isEmpty() );
-
- MoveTimingHelper timing( "to" , ns , min , max );
-
+
+ MoveTimingHelper timing( "to" , ns , min , max , 5 /* steps */ );
+
ScopedDbConnection conn( from );
conn->getLastError(); // just test connection
- { // 1. copy indexes
+ {
+ // 1. copy indexes
auto_ptr<DBClientCursor> indexes = conn->getIndexes( ns );
vector<BSONObj> all;
- while ( indexes->more() ){
+ while ( indexes->more() ) {
all.push_back( indexes->next().getOwned() );
}
-
+
writelock lk( ns );
Client::Context ct( ns );
-
+
string system_indexes = cc().database()->name + ".system.indexes";
- for ( unsigned i=0; i<all.size(); i++ ){
+ for ( unsigned i=0; i<all.size(); i++ ) {
BSONObj idx = all[i];
- theDataFileMgr.insert( system_indexes.c_str() , idx.objdata() , idx.objsize() );
+ theDataFileMgr.insertAndLog( system_indexes.c_str() , idx );
}
-
+
timing.done(1);
}
-
- { // 2. delete any data already in range
+
+ {
+ // 2. delete any data already in range
writelock lk( ns );
RemoveSaver rs( "moveChunk" , ns , "preCleanup" );
long long num = Helpers::removeRange( ns , min , max , true , false , cmdLine.moveParanoia ? &rs : 0 );
@@ -752,29 +1177,54 @@ namespace mongo {
timing.done(2);
}
-
-
- { // 3. initial bulk clone
+
+
+ {
+ // 3. initial bulk clone
state = CLONE;
- auto_ptr<DBClientCursor> cursor = conn->query( ns , Query().minKey( min ).maxKey( max ) , /* QueryOption_Exhaust */ 0 );
- assert( cursor.get() );
- while ( cursor->more() ){
- BSONObj o = cursor->next().getOwned();
- {
- writelock lk( ns );
- Helpers::upsert( ns , o );
+
+ while ( true ) {
+ BSONObj res;
+ if ( ! conn->runCommand( "admin" , BSON( "_migrateClone" << 1 ) , res ) ) {
+ state = FAIL;
+ errmsg = "_migrateClone failed: ";
+ errmsg += res.toString();
+ error() << errmsg << endl;
+ conn.done();
+ return;
+ }
+
+ BSONObj arr = res["objects"].Obj();
+ int thisTime = 0;
+
+ BSONObjIterator i( arr );
+ while( i.more() ) {
+ BSONObj o = i.next().Obj();
+ {
+ writelock lk( ns );
+ Helpers::upsert( ns , o );
+ }
+ thisTime++;
+ numCloned++;
+ clonedBytes += o.objsize();
}
- numCloned++;
+
+ if ( thisTime == 0 )
+ break;
}
timing.done(3);
}
-
- { // 4. do bulk of mods
+
+ // if running on a replicated system, we'll need to flush the docs we cloned to the secondaries
+ ReplTime lastOpApplied;
+
+ {
+ // 4. do bulk of mods
state = CATCHUP;
- while ( true ){
+ while ( true ) {
BSONObj res;
- if ( ! conn->runCommand( "admin" , BSON( "_transferMods" << 1 ) , res ) ){
+ if ( ! conn->runCommand( "admin" , BSON( "_transferMods" << 1 ) , res ) ) {
state = FAIL;
errmsg = "_transferMods failed: ";
errmsg += res.toString();
@@ -784,18 +1234,26 @@ namespace mongo {
}
if ( res["size"].number() == 0 )
break;
-
- apply( res );
+
+ apply( res , &lastOpApplied );
+
+ if ( state == ABORT ) {
+ timing.note( "aborted" );
+ return;
+ }
}
timing.done(4);
}
-
- { // 5. wait for commit
+
+ {
+ // 5. wait for commit
+ Timer timeWaitingForCommit;
+
state = STEADY;
- while ( state == STEADY || state == COMMIT_START ){
+ while ( state == STEADY || state == COMMIT_START ) {
BSONObj res;
- if ( ! conn->runCommand( "admin" , BSON( "_transferMods" << 1 ) , res ) ){
+ if ( ! conn->runCommand( "admin" , BSON( "_transferMods" << 1 ) , res ) ) {
log() << "_transferMods failed in STEADY state: " << res << endl;
errmsg = res.toString();
state = FAIL;
@@ -803,36 +1261,48 @@ namespace mongo {
return;
}
- if ( res["size"].number() > 0 && apply( res ) )
+ if ( res["size"].number() > 0 && apply( res , &lastOpApplied ) )
continue;
-
- if ( state == COMMIT_START )
+
+ if ( state == COMMIT_START && flushPendingWrites( lastOpApplied ) )
break;
sleepmillis( 10 );
}
-
+
+ if ( state == ABORT ) {
+ timing.note( "aborted" );
+ return;
+ }
+
+ if ( timeWaitingForCommit.seconds() > 86400 ) {
+ state = FAIL;
+ errmsg = "timed out waiting for commit";
+ return;
+ }
+
timing.done(5);
}
-
+
state = DONE;
conn.done();
}
- void status( BSONObjBuilder& b ){
- b.appendBool( "active" , active );
+ void status( BSONObjBuilder& b ) {
+ b.appendBool( "active" , getActive() );
b.append( "ns" , ns );
b.append( "from" , from );
b.append( "min" , min );
b.append( "max" , max );
-
+
b.append( "state" , stateString() );
if ( state == FAIL )
b.append( "errmsg" , errmsg );
{
BSONObjBuilder bb( b.subobjStart( "counts" ) );
bb.append( "cloned" , numCloned );
+ bb.append( "clonedBytes" , clonedBytes );
bb.append( "catchup" , numCatchup );
bb.append( "steady" , numSteady );
bb.done();
@@ -841,17 +1311,22 @@ namespace mongo {
}
- bool apply( const BSONObj& xfer ){
+ bool apply( const BSONObj& xfer , ReplTime* lastOpApplied ) {
+ ReplTime dummy;
+ if ( lastOpApplied == NULL ) {
+ lastOpApplied = &dummy;
+ }
+
bool didAnything = false;
-
- if ( xfer["deleted"].isABSONObj() ){
+
+ if ( xfer["deleted"].isABSONObj() ) {
writelock lk(ns);
Client::Context cx(ns);
-
+
RemoveSaver rs( "moveChunk" , ns , "removedDuring" );
BSONObjIterator i( xfer["deleted"].Obj() );
- while ( i.more() ){
+ while ( i.more() ) {
BSONObj id = i.next().Obj();
// do not apply deletes if they do not belong to the chunk being migrated
@@ -865,27 +1340,56 @@ namespace mongo {
}
Helpers::removeRange( ns , id , id, false , true , cmdLine.moveParanoia ? &rs : 0 );
+
+ *lastOpApplied = cx.getClient()->getLastOp();
didAnything = true;
}
}
-
- if ( xfer["reload"].isABSONObj() ){
+
+ if ( xfer["reload"].isABSONObj() ) {
writelock lk(ns);
Client::Context cx(ns);
BSONObjIterator i( xfer["reload"].Obj() );
- while ( i.more() ){
+ while ( i.more() ) {
BSONObj it = i.next().Obj();
+
Helpers::upsert( ns , it );
+
+ *lastOpApplied = cx.getClient()->getLastOp();
didAnything = true;
}
}
return didAnything;
}
-
- string stateString(){
- switch ( state ){
+
+ bool flushPendingWrites( const ReplTime& lastOpApplied ) {
+ // if replication is on, try to force enough secondaries to catch up
+ // TODO opReplicatedEnough should eventually honor priorities and geo-awareness
+ // for now, we try to replicate to a sensible number of secondaries
+ const int slaveCount = getSlaveCount() / 2 + 1;
+ if ( ! opReplicatedEnough( lastOpApplied , slaveCount ) ) {
+ log( LL_WARNING ) << "migrate commit attempt timed out contacting " << slaveCount
+ << " slaves for '" << ns << "' " << min << " -> " << max << endl;
+ return false;
+ }
+ log() << "migrate commit succeeded flushing to secondaries for '" << ns << "' " << min << " -> " << max << endl;
+
+ {
+ readlock lk(ns); // commitNow() currently requires it
+
+ // if durability is on, force a write to journal
+ if ( getDur().commitNow() ) {
+ log() << "migrate commit flushed to journal for '" << ns << "' " << min << " -> " << max << endl;
+ }
+ }
+
+ return true;
+ }
+
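The write-concern-like count above is just half the known secondaries plus one; the COMMIT_START loop keeps calling flushPendingWrites() until that many have caught up to lastOpApplied. A self-contained sketch of that retry shape, where replicatedTo() is only a stand-in for opReplicatedEnough():

    #include <chrono>
    #include <thread>

    // stand-in for opReplicatedEnough( lastOpApplied , needed ); trivially true here
    bool replicatedTo( int /*needed*/ ) { return true; }

    bool waitForCommitFlush( int knownSecondaries ) {
        const int needed = knownSecondaries / 2 + 1;       // same formula as flushPendingWrites()
        for ( int attempt = 0; attempt < 10; ++attempt ) { // bounded retry, purely illustrative
            if ( replicatedTo( needed ) )
                return true;                               // safe to acknowledge the commit
            std::this_thread::sleep_for( std::chrono::milliseconds( 10 ) );
        }
        return false;                                      // caller keeps draining mods and retries
    }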
+ string stateString() {
+ switch ( state ) {
case READY: return "ready";
case CLONE: return "clone";
case CATCHUP: return "catchup";
@@ -893,17 +1397,18 @@ namespace mongo {
case COMMIT_START: return "commitStart";
case DONE: return "done";
case FAIL: return "fail";
+ case ABORT: return "abort";
}
assert(0);
return "";
}
- bool startCommit(){
+ bool startCommit() {
if ( state != STEADY )
return false;
state = COMMIT_START;
-
- for ( int i=0; i<86400; i++ ){
+
+ for ( int i=0; i<86400; i++ ) {
sleepmillis(1);
if ( state == DONE )
return true;
@@ -912,42 +1417,60 @@ namespace mongo {
return false;
}
+ void abort() {
+ state = ABORT;
+ errmsg = "aborted";
+ }
+
+ bool getActive() const { scoped_lock l(m_active); return active; }
+ void setActive( bool b ) { scoped_lock l(m_active); active = b; }
+
+ mutable mongo::mutex m_active;
bool active;
-
+
string ns;
string from;
-
+
BSONObj min;
BSONObj max;
-
+
long long numCloned;
+ long long clonedBytes;
long long numCatchup;
long long numSteady;
- enum State { READY , CLONE , CATCHUP , STEADY , COMMIT_START , DONE , FAIL } state;
+ enum State { READY , CLONE , CATCHUP , STEADY , COMMIT_START , DONE , FAIL , ABORT } state;
string errmsg;
-
+
} migrateStatus;
-
- void migrateThread(){
+
+ void migrateThread() {
Client::initThread( "migrateThread" );
migrateStatus.go();
cc().shutdown();
}
-
+
class RecvChunkStartCommand : public ChunkCommandHelper {
public:
- RecvChunkStartCommand() : ChunkCommandHelper( "_recvChunkStart" ){}
+ RecvChunkStartCommand() : ChunkCommandHelper( "_recvChunkStart" ) {}
virtual LockType locktype() const { return WRITE; } // this is so we don't have to do locking internally
- bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){
-
- if ( migrateStatus.active ){
+ bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool) {
+
+ if ( migrateStatus.getActive() ) {
errmsg = "migrate already in progress";
return false;
}
+ if ( OldDataCleanup::_numThreads > 0 ) {
+ errmsg =
+ str::stream()
+ << "still waiting for a previous migrates data to get cleaned, can't accept new chunks, num threads: "
+ << OldDataCleanup::_numThreads;
+ return false;
+ }
+
if ( ! configServer.ok() )
configServer.init( cmdObj["configServer"].String() );
@@ -957,9 +1480,9 @@ namespace mongo {
migrateStatus.from = cmdObj["from"].String();
migrateStatus.min = cmdObj["min"].Obj().getOwned();
migrateStatus.max = cmdObj["max"].Obj().getOwned();
-
+
boost::thread m( migrateThread );
-
+
result.appendBool( "started" , true );
return true;
}
@@ -968,20 +1491,20 @@ namespace mongo {
class RecvChunkStatusCommand : public ChunkCommandHelper {
public:
- RecvChunkStatusCommand() : ChunkCommandHelper( "_recvChunkStatus" ){}
+ RecvChunkStatusCommand() : ChunkCommandHelper( "_recvChunkStatus" ) {}
- bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){
+ bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool) {
migrateStatus.status( result );
return 1;
}
-
+
} recvChunkStatusCommand;
class RecvChunkCommitCommand : public ChunkCommandHelper {
public:
- RecvChunkCommitCommand() : ChunkCommandHelper( "_recvChunkCommit" ){}
-
- bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){
+ RecvChunkCommitCommand() : ChunkCommandHelper( "_recvChunkCommit" ) {}
+
+ bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool) {
bool ok = migrateStatus.startCommit();
migrateStatus.status( result );
return ok;
@@ -989,10 +1512,22 @@ namespace mongo {
} recvChunkCommitCommand;
+ class RecvChunkAbortCommand : public ChunkCommandHelper {
+ public:
+ RecvChunkAbortCommand() : ChunkCommandHelper( "_recvChunkAbort" ) {}
+
+ bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool) {
+ migrateStatus.abort();
+ migrateStatus.status( result );
+ return true;
+ }
+
+ } recvChunkAbortCommand;
+
class IsInRangeTest : public UnitTest {
public:
- void run(){
+ void run() {
BSONObj min = BSON( "x" << 1 );
BSONObj max = BSON( "x" << 5 );
@@ -1002,6 +1537,8 @@ namespace mongo {
assert( isInRange( BSON( "x" << 4 ) , min , max ) );
assert( ! isInRange( BSON( "x" << 5 ) , min , max ) );
assert( ! isInRange( BSON( "x" << 6 ) , min , max ) );
+
+ log(1) << "isInRangeTest passed" << endl;
}
} isInRangeTest;
}