Diffstat (limited to 's')
-rw-r--r--   s/chunk.cpp              |   6
-rw-r--r--   s/chunk.h                |   1
-rw-r--r--   s/client.cpp             |   4
-rw-r--r--   s/config.cpp             |  48
-rw-r--r--   s/config.h               |   3
-rw-r--r--   s/d_migrate.cpp          | 126
-rw-r--r--   s/shard.h                |   8
-rw-r--r--   s/shardconnection.cpp    |  72
-rw-r--r--   s/strategy_shard.cpp     |   2
-rw-r--r--   s/writeback_listener.cpp |  15
10 files changed, 197 insertions, 88 deletions
diff --git a/s/chunk.cpp b/s/chunk.cpp
index 1e473e2..2d0ad5d 100644
--- a/s/chunk.cpp
+++ b/s/chunk.cpp
@@ -749,7 +749,7 @@ namespace mongo {
             FieldRange range = frs->range(_key.key().firstElement().fieldName());
             if ( !range.nontrivial() ) {
                 DEV PRINT(range.nontrivial());
-                getAllShards(shards);
+                getAllShards_inlock(shards);
                 return;
             }
         }
@@ -806,6 +806,10 @@ namespace mongo {
     void ChunkManager::getAllShards( set<Shard>& all ) {
         rwlock lk( _lock , false );
+        getAllShards_inlock( all );
+    }
+
+    void ChunkManager::getAllShards_inlock( set<Shard>& all ){
         all.insert(_shards.begin(), _shards.end());
     }
diff --git a/s/chunk.h b/s/chunk.h
--- a/s/chunk.h
+++ b/s/chunk.h
@@ -350,6 +350,7 @@ namespace mongo {
         void _load();
         void ensureIndex_inlock();
+        void getAllShards_inlock( set<Shard>& all );
 
         string _ns;
         ShardKeyPattern _key;
diff --git a/s/client.cpp b/s/client.cpp
index c0d25fb..c053289 100644
--- a/s/client.cpp
+++ b/s/client.cpp
@@ -141,7 +141,7 @@ namespace mongo {
 
         if ( shards->size() == 1 ) {
             string theShard = *(shards->begin() );
-            ShardConnection conn( theShard , "" );
+            ShardConnection conn( theShard , "", true );
 
             BSONObj res;
             bool ok = false;
@@ -211,7 +211,7 @@ namespace mongo {
         for ( set<string>::iterator i = shards->begin(); i != shards->end(); i++ ) {
             string theShard = *i;
             bbb.append( theShard );
-            ShardConnection conn( theShard , "" );
+            ShardConnection conn( theShard , "", true );
             BSONObj res;
             bool ok = false;
             try {
diff --git a/s/config.cpp b/s/config.cpp
index 9ed3207..0766717 100644
--- a/s/config.cpp
+++ b/s/config.cpp
@@ -20,6 +20,8 @@
 #include "../util/message.h"
 #include "../util/stringutils.h"
 #include "../util/unittest.h"
+#include "../util/timer.h"
+
 #include "../client/connpool.h"
 #include "../client/model.h"
 #include "../db/pdfile.h"
@@ -53,8 +55,15 @@ namespace mongo {
     DBConfig::CollectionInfo::CollectionInfo( const BSONObj& in ) {
         _dirty = false;
         _dropped = in["dropped"].trueValue();
-        if ( in["key"].isABSONObj() )
+        if ( in["key"].isABSONObj() ) {
+            Timer t;
             shard( in["_id"].String() , in["key"].Obj() , in["unique"].trueValue() );
+            log() << "creating ChunkManager ns: " << in["_id"]
+                  << " took: " << t.millis() << "ms"
+                  << " sequenceNumber: " << _cm->getSequenceNumber()
+                  << endl;
+            _dirty = false;
+        }
     }
@@ -87,6 +96,32 @@ namespace mongo {
         _dirty = false;
     }
 
+    bool DBConfig::CollectionInfo::needsReloading( DBClientBase * conn , const BSONObj& collectionInfo ) {
+        if ( ! _cm ) {
+            return true;
+        }
+
+        if ( _dirty || _dropped ) {
+            return true;
+        }
+
+        if ( collectionInfo["dropped"].trueValue() ) {
+            return true;
+        }
+
+        BSONObj newest = conn->findOne( ShardNS::chunk ,
+                                        Query( BSON( "ns" << collectionInfo["_id"].String() ) ).sort( "lastmod" , -1 ) );
+
+        if ( newest.isEmpty() ) {
+            // either a drop or something else weird
+            return true;
+        }
+
+        ShardChunkVersion fromdb = newest["lastmod"];
+        ShardChunkVersion inmemory = _cm->getVersion();
+        return fromdb != inmemory;
+    }
+
     bool DBConfig::isSharded( const string& ns ) {
         if ( ! _shardingEnabled )
             return false;
@@ -232,13 +267,20 @@ namespace mongo {
         unserialize( o );
 
         BSONObjBuilder b;
-        b.appendRegex( "_id" , (string)"^" + _name + "." );
+        b.appendRegex( "_id" , (string)"^" + _name + "\\." );
 
         auto_ptr<DBClientCursor> cursor = conn->query( ShardNS::collection ,b.obj() );
         assert( cursor.get() );
         while ( cursor->more() ) {
             BSONObj o = cursor->next();
-            _collections[o["_id"].String()] = CollectionInfo( o );
+            string ns = o["_id"].String();
+
+            Collections::iterator i = _collections.find( ns );
+            if ( i != _collections.end() && ! i->second.needsReloading( conn.get() , o ) ) {
+                continue;
+            }
+
+            _collections[ns] = CollectionInfo( o );
         }
 
         conn.done();
diff --git a/s/config.h b/s/config.h
--- a/s/config.h
+++ b/s/config.h
@@ -88,7 +88,8 @@ namespace mongo {
             bool wasDropped() const { return _dropped; }
 
             void save( const string& ns , DBClientBase* conn );
-
+
+            bool needsReloading( DBClientBase * conn , const BSONObj& collectionInfo );
         private:
 
             ChunkManagerPtr _cm;
diff --git a/s/d_migrate.cpp b/s/d_migrate.cpp
index df12e54..6f2607d 100644
--- a/s/d_migrate.cpp
+++ b/s/d_migrate.cpp
@@ -165,59 +165,6 @@ namespace mongo {
 
     static const char * const cleanUpThreadName = "cleanupOldData";
 
-    void _cleanupOldData( OldDataCleanup cleanup ) {
-        Client::initThread( cleanUpThreadName );
-        log() << " (start) waiting to cleanup " << cleanup.ns << " from " << cleanup.min << " -> " << cleanup.max << " # cursors:" << cleanup.initial.size() << endl;
-
-        int loops = 0;
-        Timer t;
-        while ( t.seconds() < 900 ) { // 15 minutes
-            assert( dbMutex.getState() == 0 );
-            sleepmillis( 20 );
-
-            set<CursorId> now;
-            ClientCursor::find( cleanup.ns , now );
-
-            set<CursorId> left;
-            for ( set<CursorId>::iterator i=cleanup.initial.begin(); i!=cleanup.initial.end(); ++i ) {
-                CursorId id = *i;
-                if ( now.count(id) )
-                    left.insert( id );
-            }
-
-            if ( left.size() == 0 )
-                break;
-            cleanup.initial = left;
-
-            if ( ( loops++ % 200 ) == 0 ) {
-                log() << " (looping " << loops << ") waiting to cleanup " << cleanup.ns << " from " << cleanup.min << " -> " << cleanup.max << " # cursors:" << cleanup.initial.size() << endl;
-
-                stringstream ss;
-                for ( set<CursorId>::iterator i=cleanup.initial.begin(); i!=cleanup.initial.end(); ++i ) {
-                    CursorId id = *i;
-                    ss << id << " ";
-                }
-                log() << " cursors: " << ss.str() << endl;
-            }
-        }
-
-        cleanup.doRemove();
-
-        cc().shutdown();
-    }
-
-    void cleanupOldData( OldDataCleanup cleanup ) {
-        try {
-            _cleanupOldData( cleanup );
-        }
-        catch ( std::exception& e ) {
-            log() << " error cleaning old data:" << e.what() << endl;
-        }
-        catch ( ... ) {
-            log() << " unknown error cleaning old data" << endl;
-        }
-    }
-
     class ChunkCommandHelper : public Command {
     public:
         ChunkCommandHelper( const char * name )
@@ -243,13 +190,14 @@ namespace mongo {
     class MigrateFromStatus {
     public:
-        MigrateFromStatus() : _m("MigrateFromStatus") {
+        MigrateFromStatus() : _m("MigrateFromStatus") , _workLock( "MigrateFromStatus::WorkLock" ) {
             _active = false;
             _inCriticalSection = false;
             _memoryUsed = 0;
         }
 
         void start( string ns , const BSONObj& min , const BSONObj& max ) {
+            scoped_lock lk( _workLock );
             scoped_lock l(_m); // reads and writes _active
 
             assert( ! _active );
@@ -568,6 +516,20 @@ namespace mongo {
 
         bool isActive() const { return _getActive(); }
 
+        void doRemove( OldDataCleanup& cleanup ) {
+            while ( true ) {
+                {
+                    scoped_lock lk( _workLock );
+                    if ( ! _active ) {
+                        cleanup.doRemove();
+                        return;
+                    }
+                }
+                sleepmillis( 100 );
+            }
+        }
+
     private:
         mutable mongo::mutex _m; // protect _inCriticalSection and _active
         bool _inCriticalSection;
@@ -591,6 +553,9 @@ namespace mongo {
         list<BSONObj> _deleted; // objects deleted during clone that should be deleted later
        long long _memoryUsed; // bytes in _reload + _deleted
 
+        mutable mongo::mutex _workLock; // this is used to make sure only 1 thread is doing serious work
+                                        // for now, this means migrate or removing old chunk data
+
         bool _getActive() const { scoped_lock l(_m); return _active; }
         void _setActive( bool b ) { scoped_lock l(_m); _active = b; }
@@ -605,6 +570,59 @@ namespace mongo {
         }
 
     };
 
+    void _cleanupOldData( OldDataCleanup cleanup ) {
+        Client::initThread( cleanUpThreadName );
+        log() << " (start) waiting to cleanup " << cleanup.ns << " from " << cleanup.min << " -> " << cleanup.max << " # cursors:" << cleanup.initial.size() << endl;
+
+        int loops = 0;
+        Timer t;
+        while ( t.seconds() < 900 ) { // 15 minutes
+            assert( dbMutex.getState() == 0 );
+            sleepmillis( 20 );
+
+            set<CursorId> now;
+            ClientCursor::find( cleanup.ns , now );
+
+            set<CursorId> left;
+            for ( set<CursorId>::iterator i=cleanup.initial.begin(); i!=cleanup.initial.end(); ++i ) {
+                CursorId id = *i;
+                if ( now.count(id) )
+                    left.insert( id );
+            }
+
+            if ( left.size() == 0 )
+                break;
+            cleanup.initial = left;
+
+            if ( ( loops++ % 200 ) == 0 ) {
+                log() << " (looping " << loops << ") waiting to cleanup " << cleanup.ns << " from " << cleanup.min << " -> " << cleanup.max << " # cursors:" << cleanup.initial.size() << endl;
+
+                stringstream ss;
+                for ( set<CursorId>::iterator i=cleanup.initial.begin(); i!=cleanup.initial.end(); ++i ) {
+                    CursorId id = *i;
+                    ss << id << " ";
+                }
+                log() << " cursors: " << ss.str() << endl;
+            }
+        }
+
+        migrateFromStatus.doRemove( cleanup );
+
+        cc().shutdown();
+    }
+
+    void cleanupOldData( OldDataCleanup cleanup ) {
+        try {
+            _cleanupOldData( cleanup );
+        }
+        catch ( std::exception& e ) {
+            log() << " error cleaning old data:" << e.what() << endl;
+        }
+        catch ( ... ) {
+            log() << " unknown error cleaning old data" << endl;
+        }
+    }
+
     void logOpForSharding( const char * opstr , const char * ns , const BSONObj& obj , BSONObj * patt ) {
         migrateFromStatus.logOp( opstr , ns , obj , patt );
     }
diff --git a/s/shard.h b/s/shard.h
--- a/s/shard.h
+++ b/s/shard.h
@@ -213,9 +213,9 @@ namespace mongo {
     class ShardConnection : public AScopedConnection {
     public:
-        ShardConnection( const Shard * s , const string& ns );
-        ShardConnection( const Shard& s , const string& ns );
-        ShardConnection( const string& addr , const string& ns );
+        ShardConnection( const Shard * s , const string& ns, bool ignoreDirect = false );
+        ShardConnection( const Shard& s , const string& ns, bool ignoreDirect = false );
+        ShardConnection( const string& addr , const string& ns, bool ignoreDirect = false );
 
         ~ShardConnection();
 
@@ -265,7 +265,7 @@ namespace mongo {
         static void checkMyConnectionVersions( const string & ns );
 
     private:
-        void _init();
+        void _init( bool ignoreDirect = false );
         void _finishInit();
 
         bool _finishedInit;
diff --git a/s/shardconnection.cpp b/s/shardconnection.cpp
index d05f5b1..ec14139 100644
--- a/s/shardconnection.cpp
+++ b/s/shardconnection.cpp
@@ -41,6 +41,9 @@ namespace mongo {
     boost::function4<bool, DBClientBase&, const string&, bool, int> checkShardVersionCB = defaultCheckShardVersion;
     boost::function1<void, DBClientBase*> resetShardVersionCB = defaultResetShardVersion;
 
+    // Only print the non-top-level-shard-conn warning once if not verbose
+    volatile bool printedShardConnWarning = false;
+
     /**
      * holds all the actual db connections for a client to various servers
      * 1 pre thread, so don't have to worry about thread safety
@@ -76,9 +79,35 @@ namespace mongo {
             _hosts.clear();
         }
 
-        DBClientBase * get( const string& addr , const string& ns ) {
+        DBClientBase * get( const string& addr , const string& ns, bool ignoreDirect = false ) {
             _check( ns );
 
+            // Determine if non-shard conn is RS member for warning
+            // All shards added to _hosts if not present in _check()
+            if( ( logLevel >= 1 || ! printedShardConnWarning ) && ! ignoreDirect && _hosts.find( addr ) == _hosts.end() ){
+
+                vector<Shard> all;
+                Shard::getAllShards( all );
+
+                bool isRSMember = false;
+                string parentShard;
+                for ( unsigned i = 0; i < all.size(); i++ ) {
+                    string connString = all[i].getConnString();
+                    if( connString.find( addr ) != string::npos && connString.find( '/' ) != string::npos ){
+                        isRSMember = true;
+                        parentShard = connString;
+                        break;
+                    }
+                }
+
+                if( isRSMember ){
+                    printedShardConnWarning = true;
+                    warning() << "adding shard sub-connection " << addr << " (parent " << parentShard << ") as sharded, this is safe but unexpected" << endl;
+                    printStackTrace();
+                }
+            }
+
             Status* &s = _hosts[addr];
             if ( ! s )
                 s = new Status();
@@ -120,22 +149,25 @@ namespace mongo {
         }
 
         void checkVersions( const string& ns ) {
+
             vector<Shard> all;
             Shard::getAllShards( all );
+
+            // Now only check top-level shard connections
             for ( unsigned i=0; i<all.size(); i++ ) {
-                Status* &s = _hosts[all[i].getConnString()];
-                if ( ! s )
+
+                string sconnString = all[i].getConnString();
+                Status* &s = _hosts[ sconnString ];
+
+                if ( ! s ){
                     s = new Status();
-            }
+                }
+
+                if( ! s->avail )
+                    s->avail = pool.get( sconnString );
+
+                checkShardVersionCB( *s->avail, ns, false, 1 );
 
-            for ( HostMap::iterator i=_hosts.begin(); i!=_hosts.end(); ++i ) {
-                if ( ! Shard::isAShardNode( i->first ) )
-                    continue;
-                Status* ss = i->second;
-                assert( ss );
-                if ( ! ss->avail )
-                    ss->avail = pool.get( i->first );
-                checkShardVersionCB( *ss->avail , ns , false , 1 );
             }
         }
@@ -189,24 +221,24 @@ namespace mongo {
 
     thread_specific_ptr<ClientConnections> ClientConnections::_perThread;
 
-    ShardConnection::ShardConnection( const Shard * s , const string& ns )
+    ShardConnection::ShardConnection( const Shard * s , const string& ns, bool ignoreDirect )
         : _addr( s->getConnString() ) , _ns( ns ) {
-        _init();
+        _init( ignoreDirect );
     }
 
-    ShardConnection::ShardConnection( const Shard& s , const string& ns )
+    ShardConnection::ShardConnection( const Shard& s , const string& ns, bool ignoreDirect )
         : _addr( s.getConnString() ) , _ns( ns ) {
-        _init();
+        _init( ignoreDirect );
     }
 
-    ShardConnection::ShardConnection( const string& addr , const string& ns )
+    ShardConnection::ShardConnection( const string& addr , const string& ns, bool ignoreDirect )
         : _addr( addr ) , _ns( ns ) {
-        _init();
+        _init( ignoreDirect );
     }
 
-    void ShardConnection::_init() {
+    void ShardConnection::_init( bool ignoreDirect ) {
         assert( _addr.size() );
-        _conn = ClientConnections::threadInstance()->get( _addr , _ns );
+        _conn = ClientConnections::threadInstance()->get( _addr , _ns, ignoreDirect );
         _finishedInit = false;
     }
diff --git a/s/strategy_shard.cpp b/s/strategy_shard.cpp
index 26ea79a..337fa58 100644
--- a/s/strategy_shard.cpp
+++ b/s/strategy_shard.cpp
@@ -151,7 +151,7 @@ namespace mongo {
                 // Many operations benefit from having the shard key early in the object
                 o = manager->getShardKey().moveToFront(o);
 
-                const int maxTries = 10;
+                const int maxTries = 30;
 
                 bool gotThrough = false;
                 for ( int i=0; i<maxTries; i++ ) {
diff --git a/s/writeback_listener.cpp b/s/writeback_listener.cpp
index 769163e..df7cc35 100644
--- a/s/writeback_listener.cpp
+++ b/s/writeback_listener.cpp
@@ -167,7 +167,9 @@ namespace mongo {
 
                 if ( logLevel ) log(1) << debugString( m ) << endl;
 
-                if ( needVersion.isSet() && needVersion <= db->getChunkManager( ns )->getVersion() ) {
+                ShardChunkVersion start = db->getChunkManager( ns )->getVersion();
+
+                if ( needVersion.isSet() && needVersion <= start ) {
                     // this means when the write went originally, the version was old
                     // if we're here, it means we've already updated the config, so don't need to do again
                     //db->getChunkManager( ns , true ); // SERVER-1349
@@ -176,7 +178,16 @@ namespace mongo {
                     // we received a writeback object that was sent to a previous version of a shard
                     // the actual shard may not have the object the writeback operation is for
                     // we need to reload the chunk manager and get the new shard versions
-                    db->getChunkManager( ns , true );
+                    bool good = false;
+                    for ( int i=0; i<100; i++ ) {
+                        if ( db->getChunkManager( ns , true )->getVersion() >= needVersion ) {
+                            good = true;
+                            break;
+                        }
+                        log() << "writeback getChunkManager didn't update?" << endl;
+                        sleepmillis(10);
+                    }
+                    assert( good );
                 }
 
                 // do request and then call getLastError