Diffstat (limited to 's/d_migrate.cpp')
-rw-r--r-- | s/d_migrate.cpp | 123 |
1 file changed, 94 insertions, 29 deletions
diff --git a/s/d_migrate.cpp b/s/d_migrate.cpp
index 2878276..df12e54 100644
--- a/s/d_migrate.cpp
+++ b/s/d_migrate.cpp
@@ -273,9 +273,12 @@ namespace mongo {
 
         void done() {
             readlock lk( _ns );
-            _deleted.clear();
-            _reload.clear();
-            _cloneLocs.clear();
+            {
+                scoped_spinlock lk( _trackerLocks );
+                _deleted.clear();
+                _reload.clear();
+                _cloneLocs.clear();
+            }
             _memoryUsed = 0;
 
             scoped_lock l(_m);
@@ -454,6 +457,7 @@ namespace mongo {
                 while ( cc->ok() ) {
                     DiskLoc dl = cc->currLoc();
                     if ( ! isLargeChunk ) {
+                        scoped_spinlock lk( _trackerLocks );
                         _cloneLocs.insert( dl );
                     }
                     cc->advance();
@@ -480,7 +484,10 @@ namespace mongo {
                 return false;
             }
 
-            log() << "moveChunk number of documents: " << _cloneLocs.size() << endl;
+            {
+                scoped_spinlock lk( _trackerLocks );
+                log() << "moveChunk number of documents: " << _cloneLocs.size() << endl;
+            }
             return true;
         }
 
@@ -490,29 +497,50 @@ namespace mongo {
                 return false;
             }
 
-            readlock l( _ns );
-            Client::Context ctx( _ns );
+            ElapsedTracker tracker (128, 10); // same as ClientCursor::_yieldSometimesTracker
 
-            NamespaceDetails *d = nsdetails( _ns.c_str() );
-            assert( d );
+            int allocSize;
+            {
+                readlock l(_ns);
+                Client::Context ctx( _ns );
+                NamespaceDetails *d = nsdetails( _ns.c_str() );
+                assert( d );
+                scoped_spinlock lk( _trackerLocks );
+                allocSize = std::min(BSONObjMaxUserSize, (int)((12 + d->averageObjectSize()) * _cloneLocs.size()));
+            }
+            BSONArrayBuilder a (allocSize);
+
+            while ( 1 ) {
+                bool filledBuffer = false;
+
+                readlock l( _ns );
+                Client::Context ctx( _ns );
+                scoped_spinlock lk( _trackerLocks );
+                set<DiskLoc>::iterator i = _cloneLocs.begin();
+                for ( ; i!=_cloneLocs.end(); ++i ) {
+                    if (tracker.ping()) // should I yield?
+                        break;
+
+                    DiskLoc dl = *i;
+                    BSONObj o = dl.obj();
+
+                    // use the builder size instead of accumulating 'o's size so that we take into consideration
+                    // the overhead of BSONArray indices
+                    if ( a.len() + o.objsize() + 1024 > BSONObjMaxUserSize ) {
+                        filledBuffer = true; // break out of outer while loop
+                        break;
+                    }
 
-            BSONArrayBuilder a( std::min( BSONObjMaxUserSize , (int)( ( 12 + d->averageObjectSize() )* _cloneLocs.size() ) ) );
+                    a.append( o );
+                }
 
-            set<DiskLoc>::iterator i = _cloneLocs.begin();
-            for ( ; i!=_cloneLocs.end(); ++i ) {
-                DiskLoc dl = *i;
-                BSONObj o = dl.obj();
+                _cloneLocs.erase( _cloneLocs.begin() , i );
 
-                // use the builder size instead of accumulating 'o's size so that we take into consideration
-                // the overhead of BSONArray indices
-                if ( a.len() + o.objsize() + 1024 > BSONObjMaxUserSize ) {
+                if ( _cloneLocs.empty() || filledBuffer )
                     break;
-                }
-                a.append( o );
             }
 
             result.appendArray( "objects" , a.arr() );
-            _cloneLocs.erase( _cloneLocs.begin() , i );
             return true;
         }
 
@@ -525,6 +553,11 @@ namespace mongo {
 
             if ( ! db->ownsNS( _ns ) )
                 return;
+
+            // not needed right now
+            // but trying to prevent a future bug
+            scoped_spinlock lk( _trackerLocks );
+
             _cloneLocs.erase( dl );
         }
 
@@ -544,9 +577,13 @@ namespace mongo {
         BSONObj _min;
         BSONObj _max;
 
+        // we need the lock in case there is a malicious _migrateClone for example
+        // even though it shouldn't be needed under normal operation
+        SpinLock _trackerLocks;
+
         // disk locs yet to be transferred from here to the other side
-        // no locking needed because build by 1 thread in a read lock
-        // depleted by 1 thread in a read lock
+        // no locking needed because built initially by 1 thread in a read lock
+        // emptied by 1 thread in a read lock
         // updates applied by 1 thread in a write lock
         set<DiskLoc> _cloneLocs;
 
@@ -1141,6 +1178,8 @@ namespace mongo {
             assert( state == READY );
             assert( ! min.isEmpty() );
             assert( ! max.isEmpty() );
+
+            slaveCount = ( getSlaveCount() / 2 ) + 1;
 
             MoveTimingHelper timing( "to" , ns , min , max , 5 /* steps */ );
 
@@ -1236,11 +1275,32 @@ namespace mongo {
                     break;
 
                 apply( res , &lastOpApplied );
+
+                const int maxIterations = 3600*50;
+                int i;
+                for ( i=0;i<maxIterations; i++) {
+                    if ( state == ABORT ) {
+                        timing.note( "aborted" );
+                        return;
+                    }
+
+                    if ( opReplicatedEnough( lastOpApplied ) )
+                        break;
+
+                    if ( i > 100 ) {
+                        warning() << "secondaries having hard time keeping up with migrate" << endl;
+                    }
 
-                if ( state == ABORT ) {
-                    timing.note( "aborted" );
-                    return;
+                    sleepmillis( 20 );
                 }
+
+                if ( i == maxIterations ) {
+                    errmsg = "secondary can't keep up with migrate";
+                    error() << errmsg << endl;
+                    conn.done();
+                    state = FAIL;
+                    return;
+                }
             }
 
             timing.done(4);
@@ -1364,14 +1424,17 @@ namespace mongo {
             return didAnything;
         }
 
-        bool flushPendingWrites( const ReplTime& lastOpApplied ) {
+        bool opReplicatedEnough( const ReplTime& lastOpApplied ) {
             // if replication is on, try to force enough secondaries to catch up
             // TODO opReplicatedEnough should eventually honor priorities and geo-awareness
             // for now, we try to replicate to a sensible number of secondaries
-            const int slaveCount = getSlaveCount() / 2 + 1;
-            if ( ! opReplicatedEnough( lastOpApplied , slaveCount ) ) {
-                log( LL_WARNING ) << "migrate commit attempt timed out contacting " << slaveCount
-                                  << " slaves for '" << ns << "' " << min << " -> " << max << endl;
+            return mongo::opReplicatedEnough( lastOpApplied , slaveCount );
+        }
+
+        bool flushPendingWrites( const ReplTime& lastOpApplied ) {
+            if ( ! opReplicatedEnough( lastOpApplied ) ) {
+                warning() << "migrate commit attempt timed out contacting " << slaveCount
+                          << " slaves for '" << ns << "' " << min << " -> " << max << endl;
                 return false;
            }
            log() << "migrate commit succeeded flushing to secondaries for '" << ns << "' " << min << " -> " << max << endl;
@@ -1438,6 +1501,8 @@ namespace mongo {
         long long clonedBytes;
         long long numCatchup;
         long long numSteady;
+
+        int slaveCount;
 
         enum State { READY , CLONE , CATCHUP , STEADY , COMMIT_START , DONE , FAIL , ABORT } state;
         string errmsg;
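
For context, the first several hunks all apply one pattern: every read or write of _cloneLocs now happens under the new _trackerLocks spinlock, taken via a scoped RAII guard that releases at the end of the enclosing block. Below is a minimal, self-contained sketch of that pattern; the SpinLock and scoped_spinlock stand-ins are simplified analogues written for illustration, not MongoDB's actual classes, and MigrateTracker is an invented name.

    #include <atomic>
    #include <set>

    // Simplified stand-ins for the SpinLock / scoped_spinlock used in the diff.
    class SpinLock {
    public:
        void lock()   { while ( _flag.test_and_set( std::memory_order_acquire ) ) {} }
        void unlock() { _flag.clear( std::memory_order_release ); }
    private:
        std::atomic_flag _flag = ATOMIC_FLAG_INIT;
    };

    class scoped_spinlock {
    public:
        explicit scoped_spinlock( SpinLock& l ) : _l( l ) { _l.lock(); }
        ~scoped_spinlock() { _l.unlock(); }   // released when the guard leaves scope
    private:
        SpinLock& _l;
    };

    struct DiskLoc {
        long long ofs;
        bool operator<( const DiskLoc& o ) const { return ofs < o.ofs; }
    };

    // Hypothetical container mirroring the patch: every touch of _cloneLocs,
    // even a plain size() for logging, goes through the spinlock.
    struct MigrateTracker {
        SpinLock _trackerLocks;
        std::set<DiskLoc> _cloneLocs;

        void add( const DiskLoc& dl ) {
            scoped_spinlock lk( _trackerLocks );
            _cloneLocs.insert( dl );
        }

        size_t count() {
            scoped_spinlock lk( _trackerLocks );
            return _cloneLocs.size();
        }
    };

A spinlock fits here because each critical section is a handful of instructions on an in-memory set; the diff's own comment notes the lock mostly guards against an abnormal caller (e.g. a malicious _migrateClone) rather than normal operation.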
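The largest hunk restructures _migrateClone: instead of one pass over _cloneLocs, the new code loops, copying documents into the array builder until either the batch nears the BSON size cap or the ElapsedTracker says it is time to yield, then erases exactly the prefix it consumed and re-acquires the locks for the next pass. Here is a standalone sketch of that resumable-drain shape under assumed names: kMaxBatchBytes, sizeOf(), and ElapsedTrackerSketch are placeholders for BSONObjMaxUserSize, o.objsize(), and the real ElapsedTracker.

    #include <cstddef>
    #include <set>
    #include <vector>

    static const std::size_t kMaxBatchBytes = 16 * 1024 * 1024;   // ~BSONObjMaxUserSize

    // Crude stand-in for ElapsedTracker(128, 10): "yield" every 128 checks.
    struct ElapsedTrackerSketch {
        int calls;
        ElapsedTrackerSketch() : calls( 0 ) {}
        bool ping() { return ++calls % 128 == 0; }
    };

    std::size_t sizeOf( int /*doc*/ ) { return 1024; }   // stand-in for o.objsize()

    // Build ONE batch, possibly over several lock acquisitions (simulated here
    // by the outer while), erasing only the prefix actually consumed each pass.
    std::vector<int> nextBatch( std::set<int>& cloneLocs ) {
        ElapsedTrackerSketch tracker;
        std::vector<int> batch;
        std::size_t bytes = 0;

        while ( true ) {
            bool filledBuffer = false;

            // In the real code this pass runs under readlock + scoped_spinlock,
            // both dropped between passes so other threads can make progress.
            std::set<int>::iterator i = cloneLocs.begin();
            for ( ; i != cloneLocs.end(); ++i ) {
                if ( tracker.ping() )          // time to yield; resume next pass
                    break;
                if ( bytes + sizeOf( *i ) + 1024 > kMaxBatchBytes ) {
                    filledBuffer = true;       // batch full: stop the outer loop too
                    break;
                }
                batch.push_back( *i );
                bytes += sizeOf( *i );
            }

            // The doc at 'i' was never appended, so it survives for the next batch.
            cloneLocs.erase( cloneLocs.begin(), i );

            if ( cloneLocs.empty() || filledBuffer )
                break;
        }
        return batch;
    }

Erasing eagerly after each pass (rather than once at the end, as the old code did) is what makes the yield safe: the next pass always restarts from begin() and never re-sends a document it already shipped.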
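On the receiving ("to") side, the catch-up phase no longer applies batches blindly: after each apply() it polls until a majority of slaves (slaveCount = getSlaveCount() / 2 + 1, computed once up front) confirm the last op, sleeping 20 ms per iteration, warning past ~100 iterations (about 2 seconds), and failing the migrate outright after 3600*50 iterations (roughly an hour). A hedged sketch of that wait loop, with replicatedEnough() and aborted() as placeholder callbacks for opReplicatedEnough( lastOpApplied ) and the state == ABORT check:

    #include <chrono>
    #include <iostream>
    #include <string>
    #include <thread>

    // Returns true once replication has caught up; false on abort or timeout.
    // The caller is expected to set state = FAIL / note "aborted" accordingly.
    bool waitForReplication( bool (*replicatedEnough)(), bool (*aborted)(),
                             std::string& errmsg ) {
        const int maxIterations = 3600 * 50;   // same budget as the patch: ~1 hour
        int i;
        for ( i = 0; i < maxIterations; i++ ) {
            if ( aborted() )
                return false;
            if ( replicatedEnough() )
                break;
            if ( i > 100 )                     // past ~2s, complain on every pass
                std::cerr << "secondaries having hard time keeping up with migrate\n";
            std::this_thread::sleep_for( std::chrono::milliseconds( 20 ) );
        }
        if ( i == maxIterations ) {
            errmsg = "secondary can't keep up with migrate";
            return false;
        }
        return true;
    }

Note the matching refactor at the bottom of the diff: flushPendingWrites() no longer recomputes slaveCount on every call; it and the new wait loop share the member, computed once when the migrate begins, and the old log( LL_WARNING ) call becomes warning().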