Diffstat (limited to 's/d_migrate.cpp')
-rw-r--r--  s/d_migrate.cpp  123
1 file changed, 94 insertions, 29 deletions
diff --git a/s/d_migrate.cpp b/s/d_migrate.cpp
index 2878276..df12e54 100644
--- a/s/d_migrate.cpp
+++ b/s/d_migrate.cpp
@@ -273,9 +273,12 @@ namespace mongo {
void done() {
readlock lk( _ns );
- _deleted.clear();
- _reload.clear();
- _cloneLocs.clear();
+ {
+ scoped_spinlock lk( _trackerLocks );
+ _deleted.clear();
+ _reload.clear();
+ _cloneLocs.clear();
+ }
_memoryUsed = 0;
scoped_lock l(_m);
@@ -454,6 +457,7 @@ namespace mongo {
while ( cc->ok() ) {
DiskLoc dl = cc->currLoc();
if ( ! isLargeChunk ) {
+ scoped_spinlock lk( _trackerLocks );
_cloneLocs.insert( dl );
}
cc->advance();
@@ -480,7 +484,10 @@ namespace mongo {
return false;
}
- log() << "moveChunk number of documents: " << _cloneLocs.size() << endl;
+ {
+ scoped_spinlock lk( _trackerLocks );
+ log() << "moveChunk number of documents: " << _cloneLocs.size() << endl;
+ }
return true;
}
@@ -490,29 +497,50 @@ namespace mongo {
return false;
}
- readlock l( _ns );
- Client::Context ctx( _ns );
+ ElapsedTracker tracker (128, 10); // same as ClientCursor::_yieldSometimesTracker
- NamespaceDetails *d = nsdetails( _ns.c_str() );
- assert( d );
+ int allocSize;
+ {
+ readlock l(_ns);
+ Client::Context ctx( _ns );
+ NamespaceDetails *d = nsdetails( _ns.c_str() );
+ assert( d );
+ scoped_spinlock lk( _trackerLocks );
+ allocSize = std::min(BSONObjMaxUserSize, (int)((12 + d->averageObjectSize()) * _cloneLocs.size()));
+ }
+ BSONArrayBuilder a (allocSize);
+
+ while ( 1 ) {
+ bool filledBuffer = false;
+
+ readlock l( _ns );
+ Client::Context ctx( _ns );
+ scoped_spinlock lk( _trackerLocks );
+ set<DiskLoc>::iterator i = _cloneLocs.begin();
+ for ( ; i!=_cloneLocs.end(); ++i ) {
+ if (tracker.ping()) // should I yield?
+ break;
+
+ DiskLoc dl = *i;
+ BSONObj o = dl.obj();
+
+ // use the builder size instead of accumulating 'o's size so that we take into consideration
+ // the overhead of BSONArray indices
+ if ( a.len() + o.objsize() + 1024 > BSONObjMaxUserSize ) {
+ filledBuffer = true; // break out of outer while loop
+ break;
+ }
- BSONArrayBuilder a( std::min( BSONObjMaxUserSize , (int)( ( 12 + d->averageObjectSize() )* _cloneLocs.size() ) ) );
+ a.append( o );
+ }
- set<DiskLoc>::iterator i = _cloneLocs.begin();
- for ( ; i!=_cloneLocs.end(); ++i ) {
- DiskLoc dl = *i;
- BSONObj o = dl.obj();
+ _cloneLocs.erase( _cloneLocs.begin() , i );
- // use the builder size instead of accumulating 'o's size so that we take into consideration
- // the overhead of BSONArray indices
- if ( a.len() + o.objsize() + 1024 > BSONObjMaxUserSize ) {
+ if ( _cloneLocs.empty() || filledBuffer )
break;
- }
- a.append( o );
}
result.appendArray( "objects" , a.arr() );
- _cloneLocs.erase( _cloneLocs.begin() , i );
return true;
}
@@ -525,6 +553,11 @@ namespace mongo {
if ( ! db->ownsNS( _ns ) )
return;
+
+ // not needed right now
+ // but trying to prevent a future bug
+ scoped_spinlock lk( _trackerLocks );
+
_cloneLocs.erase( dl );
}
@@ -544,9 +577,13 @@ namespace mongo {
BSONObj _min;
BSONObj _max;
+ // we need the lock in case there is a malicious _migrateClone for example
+ // even though it shouldn't be needed under normal operation
+ SpinLock _trackerLocks;
+
// disk locs yet to be transferred from here to the other side
- // no locking needed because build by 1 thread in a read lock
- // depleted by 1 thread in a read lock
+ // no locking needed because built initially by 1 thread in a read lock
+ // emptied by 1 thread in a read lock
// updates applied by 1 thread in a write lock
set<DiskLoc> _cloneLocs;
@@ -1141,6 +1178,8 @@ namespace mongo {
assert( state == READY );
assert( ! min.isEmpty() );
assert( ! max.isEmpty() );
+
+ slaveCount = ( getSlaveCount() / 2 ) + 1;
MoveTimingHelper timing( "to" , ns , min , max , 5 /* steps */ );
@@ -1236,11 +1275,32 @@ namespace mongo {
break;
apply( res , &lastOpApplied );
+
+ const int maxIterations = 3600*50;
+ int i;
+ for ( i=0;i<maxIterations; i++) {
+ if ( state == ABORT ) {
+ timing.note( "aborted" );
+ return;
+ }
+
+ if ( opReplicatedEnough( lastOpApplied ) )
+ break;
+
+ if ( i > 100 ) {
+ warning() << "secondaries having hard time keeping up with migrate" << endl;
+ }
- if ( state == ABORT ) {
- timing.note( "aborted" );
- return;
+ sleepmillis( 20 );
}
+
+ if ( i == maxIterations ) {
+ errmsg = "secondary can't keep up with migrate";
+ error() << errmsg << endl;
+ conn.done();
+ state = FAIL;
+ return;
+ }
}
timing.done(4);
@@ -1364,14 +1424,17 @@ namespace mongo {
return didAnything;
}
- bool flushPendingWrites( const ReplTime& lastOpApplied ) {
+ bool opReplicatedEnough( const ReplTime& lastOpApplied ) {
// if replication is on, try to force enough secondaries to catch up
// TODO opReplicatedEnough should eventually honor priorities and geo-awareness
// for now, we try to replicate to a sensible number of secondaries
- const int slaveCount = getSlaveCount() / 2 + 1;
- if ( ! opReplicatedEnough( lastOpApplied , slaveCount ) ) {
- log( LL_WARNING ) << "migrate commit attempt timed out contacting " << slaveCount
- << " slaves for '" << ns << "' " << min << " -> " << max << endl;
+ return mongo::opReplicatedEnough( lastOpApplied , slaveCount );
+ }
+
+ bool flushPendingWrites( const ReplTime& lastOpApplied ) {
+ if ( ! opReplicatedEnough( lastOpApplied ) ) {
+ warning() << "migrate commit attempt timed out contacting " << slaveCount
+ << " slaves for '" << ns << "' " << min << " -> " << max << endl;
return false;
}
log() << "migrate commit succeeded flushing to secondaries for '" << ns << "' " << min << " -> " << max << endl;
@@ -1438,6 +1501,8 @@ namespace mongo {
long long clonedBytes;
long long numCatchup;
long long numSteady;
+
+ int slaveCount;
enum State { READY , CLONE , CATCHUP , STEADY , COMMIT_START , DONE , FAIL , ABORT } state;
string errmsg;
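
Note: the retry loop added in the "@@ -1236,11 +1275,32 @@" hunk above follows a plain poll-and-sleep pattern: check whether enough secondaries have applied the last op, warn once the wait grows long, and give up after a bounded number of iterations. The sketch below is a minimal standalone illustration of that pattern only, not the d_migrate.cpp code; waitForReplication, replicatedEnough, and the toy predicate in main are illustrative stand-ins for mongo::opReplicatedEnough(), sleepmillis(), and the real replication state.

// Minimal sketch of a bounded replication wait (assumed names, not the
// d_migrate.cpp API). Polls every 20 ms, warns after ~2 s, and gives up
// after 3600*50 iterations (~1 hour), mirroring the constants in the diff.
#include <chrono>
#include <functional>
#include <iostream>
#include <thread>

bool waitForReplication( const std::function<bool()>& replicatedEnough ) {
    const int maxIterations = 3600 * 50;
    for ( int i = 0; i < maxIterations; i++ ) {
        if ( replicatedEnough() )
            return true;                          // secondaries caught up
        if ( i > 100 )                            // roughly 2 seconds of waiting
            std::cerr << "secondaries having a hard time keeping up" << std::endl;
        std::this_thread::sleep_for( std::chrono::milliseconds( 20 ) );
    }
    return false;                                 // caller should mark the migrate FAIL
}

int main() {
    int polls = 0;
    // Toy predicate: pretend the secondaries catch up after five polls.
    bool ok = waitForReplication( [&] { return ++polls > 5; } );
    std::cout << ( ok ? "replicated" : "timed out" ) << std::endl;
    return 0;
}

Capping the loop at 3600*50 iterations of 20 ms bounds the wait at roughly an hour, so a permanently lagging secondary fails the migrate instead of hanging it; the loop in the diff additionally re-checks the ABORT state on every iteration.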