Diffstat (limited to 's/d_migrate.cpp')
-rw-r--r--  s/d_migrate.cpp  126
1 file changed, 72 insertions(+), 54 deletions(-)
diff --git a/s/d_migrate.cpp b/s/d_migrate.cpp
index df12e54..6f2607d 100644
--- a/s/d_migrate.cpp
+++ b/s/d_migrate.cpp
@@ -165,59 +165,6 @@ namespace mongo {
static const char * const cleanUpThreadName = "cleanupOldData";
- void _cleanupOldData( OldDataCleanup cleanup ) {
- Client::initThread( cleanUpThreadName );
- log() << " (start) waiting to cleanup " << cleanup.ns << " from " << cleanup.min << " -> " << cleanup.max << " # cursors:" << cleanup.initial.size() << endl;
-
- int loops = 0;
- Timer t;
- while ( t.seconds() < 900 ) { // 15 minutes
- assert( dbMutex.getState() == 0 );
- sleepmillis( 20 );
-
- set<CursorId> now;
- ClientCursor::find( cleanup.ns , now );
-
- set<CursorId> left;
- for ( set<CursorId>::iterator i=cleanup.initial.begin(); i!=cleanup.initial.end(); ++i ) {
- CursorId id = *i;
- if ( now.count(id) )
- left.insert( id );
- }
-
- if ( left.size() == 0 )
- break;
- cleanup.initial = left;
-
- if ( ( loops++ % 200 ) == 0 ) {
- log() << " (looping " << loops << ") waiting to cleanup " << cleanup.ns << " from " << cleanup.min << " -> " << cleanup.max << " # cursors:" << cleanup.initial.size() << endl;
-
- stringstream ss;
- for ( set<CursorId>::iterator i=cleanup.initial.begin(); i!=cleanup.initial.end(); ++i ) {
- CursorId id = *i;
- ss << id << " ";
- }
- log() << " cursors: " << ss.str() << endl;
- }
- }
-
- cleanup.doRemove();
-
- cc().shutdown();
- }
-
- void cleanupOldData( OldDataCleanup cleanup ) {
- try {
- _cleanupOldData( cleanup );
- }
- catch ( std::exception& e ) {
- log() << " error cleaning old data:" << e.what() << endl;
- }
- catch ( ... ) {
- log() << " unknown error cleaning old data" << endl;
- }
- }
-
class ChunkCommandHelper : public Command {
public:
ChunkCommandHelper( const char * name )
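
The _cleanupOldData / cleanupOldData pair deleted above is not gone for good: the same two functions reappear at the end of this diff, below the MigrateFromStatus class, with one behavioral change — the direct cleanup.doRemove() call becomes migrateFromStatus.doRemove( cleanup ). Moving the definitions after the class is what lets them call into the migrateFromStatus object.
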
@@ -243,13 +190,14 @@ namespace mongo {
class MigrateFromStatus {
public:
- MigrateFromStatus() : _m("MigrateFromStatus") {
+ MigrateFromStatus() : _m("MigrateFromStatus") , _workLock( "MigrateFromStatus::WorkLock" ) {
_active = false;
_inCriticalSection = false;
_memoryUsed = 0;
}
void start( string ns , const BSONObj& min , const BSONObj& max ) {
+ scoped_lock lk( _workLock );
scoped_lock l(_m); // reads and writes _active
assert( ! _active );
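
The effect of the new _workLock is easiest to see on this acquiring side: start() now takes _workLock before touching _active, so a migration cannot begin while the cleanup path (added in the next hunk) holds the lock to delete the previous chunk's data. A minimal standalone sketch of that lock ordering — WorkGate and startWork are illustrative names, not part of the MongoDB source:

    #include <cassert>
    #include <mutex>

    class WorkGate {
    public:
        // Mirrors MigrateFromStatus::start(): the work lock is acquired
        // first, so starting work blocks while a removal is in progress;
        // _m only guards the _active flag, as in the real class.
        void startWork() {
            std::lock_guard<std::mutex> work( _workLock );  // serious work: migrate / remove
            std::lock_guard<std::mutex> state( _m );        // reads and writes _active
            assert( ! _active );
            _active = true;
        }

    private:
        std::mutex _workLock;
        std::mutex _m;
        bool _active = false;
    };

Taking _workLock first and _m second keeps the lock ordering consistent between the two code paths.
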
@@ -568,6 +516,20 @@ namespace mongo {
bool isActive() const { return _getActive(); }
+
+ void doRemove( OldDataCleanup& cleanup ) {
+ while ( true ) {
+ {
+ scoped_lock lk( _workLock );
+ if ( ! _active ) {
+ cleanup.doRemove();
+ return;
+ }
+ }
+ sleepmillis( 100 );
+ }
+ }
+
private:
mutable mongo::mutex _m; // protect _inCriticalSection and _active
bool _inCriticalSection;
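
The new doRemove() above is the waiting side of the same pattern: the cleanup thread polls every 100 ms until no migration is active, then performs the delete while still holding _workLock, so a new migration cannot start mid-removal. A self-contained sketch of that loop, with std::function standing in for OldDataCleanup::doRemove() and RemoveGate an illustrative name (for brevity, _active is guarded by _workLock itself here; the real class uses the separate mutex _m):

    #include <chrono>
    #include <functional>
    #include <mutex>
    #include <thread>

    class RemoveGate {
    public:
        // Mirrors MigrateFromStatus::doRemove(): spin until no migration
        // is active, then run the removal without releasing the work lock.
        void removeWhenIdle( const std::function<void()>& doRemove ) {
            for ( ;; ) {
                {
                    std::lock_guard<std::mutex> work( _workLock );
                    if ( ! _active ) {
                        doRemove();  // start() blocks on _workLock until this returns
                        return;
                    }
                }
                std::this_thread::sleep_for( std::chrono::milliseconds( 100 ) );
            }
        }

    private:
        std::mutex _workLock;
        bool _active = false;  // guarded by _workLock in this sketch
    };

Note that the 100 ms sleep happens outside the lock, so a migration waiting on _workLock can get in between polls.
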
@@ -591,6 +553,9 @@ namespace mongo {
list<BSONObj> _deleted; // objects deleted during clone that should be deleted later
long long _memoryUsed; // bytes in _reload + _deleted
+ mutable mongo::mutex _workLock; // this is used to make sure only 1 thread is doing serious work
+ // for now, this means migrate or removing old chunk data
+
bool _getActive() const { scoped_lock l(_m); return _active; }
void _setActive( bool b ) { scoped_lock l(_m); _active = b; }
@@ -605,6 +570,59 @@ namespace mongo {
}
};
+ void _cleanupOldData( OldDataCleanup cleanup ) {
+ Client::initThread( cleanUpThreadName );
+ log() << " (start) waiting to cleanup " << cleanup.ns << " from " << cleanup.min << " -> " << cleanup.max << " # cursors:" << cleanup.initial.size() << endl;
+
+ int loops = 0;
+ Timer t;
+ while ( t.seconds() < 900 ) { // 15 minutes
+ assert( dbMutex.getState() == 0 );
+ sleepmillis( 20 );
+
+ set<CursorId> now;
+ ClientCursor::find( cleanup.ns , now );
+
+ set<CursorId> left;
+ for ( set<CursorId>::iterator i=cleanup.initial.begin(); i!=cleanup.initial.end(); ++i ) {
+ CursorId id = *i;
+ if ( now.count(id) )
+ left.insert( id );
+ }
+
+ if ( left.size() == 0 )
+ break;
+ cleanup.initial = left;
+
+ if ( ( loops++ % 200 ) == 0 ) {
+ log() << " (looping " << loops << ") waiting to cleanup " << cleanup.ns << " from " << cleanup.min << " -> " << cleanup.max << " # cursors:" << cleanup.initial.size() << endl;
+
+ stringstream ss;
+ for ( set<CursorId>::iterator i=cleanup.initial.begin(); i!=cleanup.initial.end(); ++i ) {
+ CursorId id = *i;
+ ss << id << " ";
+ }
+ log() << " cursors: " << ss.str() << endl;
+ }
+ }
+
+ migrateFromStatus.doRemove( cleanup );
+
+ cc().shutdown();
+ }
+
+ void cleanupOldData( OldDataCleanup cleanup ) {
+ try {
+ _cleanupOldData( cleanup );
+ }
+ catch ( std::exception& e ) {
+ log() << " error cleaning old data:" << e.what() << endl;
+ }
+ catch ( ... ) {
+ log() << " unknown error cleaning old data" << endl;
+ }
+ }
+
void logOpForSharding( const char * opstr , const char * ns , const BSONObj& obj , BSONObj * patt ) {
migrateFromStatus.logOp( opstr , ns , obj , patt );
}
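
Taken together, the relocated _cleanupOldData runs in two phases on its background thread: first it waits up to 15 minutes (polling every 20 ms) for the cursors recorded in cleanup.initial to close, then it hands the actual range delete to migrateFromStatus.doRemove() so the delete is serialized against any new migration; the cleanupOldData wrapper swallows exceptions because the detached thread has no caller to report to. The drain phase generalizes to a small reusable loop; a sketch using the diff's constants (900 s deadline, 20 ms poll) — waitForDrain and stillOpen are illustrative, not MongoDB APIs:

    #include <chrono>
    #include <functional>
    #include <set>
    #include <thread>

    // Keep only the initially-open cursor ids that are still open; give up
    // after the deadline even if some never close (the real code proceeds
    // to the delete either way, and logs progress every 200 iterations).
    template <typename Id>
    bool waitForDrain( std::set<Id> initial,
                       const std::function<std::set<Id>()>& stillOpen,
                       std::chrono::seconds deadline = std::chrono::seconds( 900 ) ) {
        const auto start = std::chrono::steady_clock::now();
        while ( std::chrono::steady_clock::now() - start < deadline ) {
            std::this_thread::sleep_for( std::chrono::milliseconds( 20 ) );
            std::set<Id> now = stillOpen();
            std::set<Id> left;
            for ( const Id& id : initial )
                if ( now.count( id ) )
                    left.insert( id );
            if ( left.empty() )
                return true;   // all pre-migration cursors are gone
            initial.swap( left );
        }
        return false;          // timed out
    }
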