diff options
Diffstat (limited to 's/d_migrate.cpp')
-rw-r--r-- | s/d_migrate.cpp | 126 |
1 files changed, 72 insertions, 54 deletions
diff --git a/s/d_migrate.cpp b/s/d_migrate.cpp index df12e54..6f2607d 100644 --- a/s/d_migrate.cpp +++ b/s/d_migrate.cpp @@ -165,59 +165,6 @@ namespace mongo { static const char * const cleanUpThreadName = "cleanupOldData"; - void _cleanupOldData( OldDataCleanup cleanup ) { - Client::initThread( cleanUpThreadName ); - log() << " (start) waiting to cleanup " << cleanup.ns << " from " << cleanup.min << " -> " << cleanup.max << " # cursors:" << cleanup.initial.size() << endl; - - int loops = 0; - Timer t; - while ( t.seconds() < 900 ) { // 15 minutes - assert( dbMutex.getState() == 0 ); - sleepmillis( 20 ); - - set<CursorId> now; - ClientCursor::find( cleanup.ns , now ); - - set<CursorId> left; - for ( set<CursorId>::iterator i=cleanup.initial.begin(); i!=cleanup.initial.end(); ++i ) { - CursorId id = *i; - if ( now.count(id) ) - left.insert( id ); - } - - if ( left.size() == 0 ) - break; - cleanup.initial = left; - - if ( ( loops++ % 200 ) == 0 ) { - log() << " (looping " << loops << ") waiting to cleanup " << cleanup.ns << " from " << cleanup.min << " -> " << cleanup.max << " # cursors:" << cleanup.initial.size() << endl; - - stringstream ss; - for ( set<CursorId>::iterator i=cleanup.initial.begin(); i!=cleanup.initial.end(); ++i ) { - CursorId id = *i; - ss << id << " "; - } - log() << " cursors: " << ss.str() << endl; - } - } - - cleanup.doRemove(); - - cc().shutdown(); - } - - void cleanupOldData( OldDataCleanup cleanup ) { - try { - _cleanupOldData( cleanup ); - } - catch ( std::exception& e ) { - log() << " error cleaning old data:" << e.what() << endl; - } - catch ( ... ) { - log() << " unknown error cleaning old data" << endl; - } - } - class ChunkCommandHelper : public Command { public: ChunkCommandHelper( const char * name ) @@ -243,13 +190,14 @@ namespace mongo { class MigrateFromStatus { public: - MigrateFromStatus() : _m("MigrateFromStatus") { + MigrateFromStatus() : _m("MigrateFromStatus") , _workLock( "MigrateFromStatus::WorkLock" ) { _active = false; _inCriticalSection = false; _memoryUsed = 0; } void start( string ns , const BSONObj& min , const BSONObj& max ) { + scoped_lock lk( _workLock ); scoped_lock l(_m); // reads and writes _active assert( ! _active ); @@ -568,6 +516,20 @@ namespace mongo { bool isActive() const { return _getActive(); } + + void doRemove( OldDataCleanup& cleanup ) { + while ( true ) { + { + scoped_lock lk( _workLock ); + if ( ! _active ) { + cleanup.doRemove(); + return; + } + } + sleepmillis( 100 ); + } + } + private: mutable mongo::mutex _m; // protect _inCriticalSection and _active bool _inCriticalSection; @@ -591,6 +553,9 @@ namespace mongo { list<BSONObj> _deleted; // objects deleted during clone that should be deleted later long long _memoryUsed; // bytes in _reload + _deleted + mutable mongo::mutex _workLock; // this is used to make sure only 1 thread is doing serious work + // for now, this means migrate or removing old chunk data + bool _getActive() const { scoped_lock l(_m); return _active; } void _setActive( bool b ) { scoped_lock l(_m); _active = b; } @@ -605,6 +570,59 @@ namespace mongo { } }; + void _cleanupOldData( OldDataCleanup cleanup ) { + Client::initThread( cleanUpThreadName ); + log() << " (start) waiting to cleanup " << cleanup.ns << " from " << cleanup.min << " -> " << cleanup.max << " # cursors:" << cleanup.initial.size() << endl; + + int loops = 0; + Timer t; + while ( t.seconds() < 900 ) { // 15 minutes + assert( dbMutex.getState() == 0 ); + sleepmillis( 20 ); + + set<CursorId> now; + ClientCursor::find( cleanup.ns , now ); + + set<CursorId> left; + for ( set<CursorId>::iterator i=cleanup.initial.begin(); i!=cleanup.initial.end(); ++i ) { + CursorId id = *i; + if ( now.count(id) ) + left.insert( id ); + } + + if ( left.size() == 0 ) + break; + cleanup.initial = left; + + if ( ( loops++ % 200 ) == 0 ) { + log() << " (looping " << loops << ") waiting to cleanup " << cleanup.ns << " from " << cleanup.min << " -> " << cleanup.max << " # cursors:" << cleanup.initial.size() << endl; + + stringstream ss; + for ( set<CursorId>::iterator i=cleanup.initial.begin(); i!=cleanup.initial.end(); ++i ) { + CursorId id = *i; + ss << id << " "; + } + log() << " cursors: " << ss.str() << endl; + } + } + + migrateFromStatus.doRemove( cleanup ); + + cc().shutdown(); + } + + void cleanupOldData( OldDataCleanup cleanup ) { + try { + _cleanupOldData( cleanup ); + } + catch ( std::exception& e ) { + log() << " error cleaning old data:" << e.what() << endl; + } + catch ( ... ) { + log() << " unknown error cleaning old data" << endl; + } + } + void logOpForSharding( const char * opstr , const char * ns , const BSONObj& obj , BSONObj * patt ) { migrateFromStatus.logOp( opstr , ns , obj , patt ); } |