summaryrefslogtreecommitdiff
path: root/s
diff options
context:
space:
mode:
Diffstat (limited to 's')
-rw-r--r--s/chunk.cpp6
-rw-r--r--s/chunk.h1
-rw-r--r--s/client.cpp4
-rw-r--r--s/config.cpp48
-rw-r--r--s/config.h3
-rw-r--r--s/d_migrate.cpp126
-rw-r--r--s/shard.h8
-rw-r--r--s/shardconnection.cpp72
-rw-r--r--s/strategy_shard.cpp2
-rw-r--r--s/writeback_listener.cpp15
10 files changed, 197 insertions, 88 deletions
diff --git a/s/chunk.cpp b/s/chunk.cpp
index 1e473e2..2d0ad5d 100644
--- a/s/chunk.cpp
+++ b/s/chunk.cpp
@@ -749,7 +749,7 @@ namespace mongo {
FieldRange range = frs->range(_key.key().firstElement().fieldName());
if ( !range.nontrivial() ) {
DEV PRINT(range.nontrivial());
- getAllShards(shards);
+ getAllShards_inlock(shards);
return;
}
}
@@ -806,6 +806,10 @@ namespace mongo {
void ChunkManager::getAllShards( set<Shard>& all ) {
rwlock lk( _lock , false );
+ getAllShards_inlock( all );
+ }
+
+ void ChunkManager::getAllShards_inlock( set<Shard>& all ){
all.insert(_shards.begin(), _shards.end());
}
diff --git a/s/chunk.h b/s/chunk.h
index 21e1fbf..6054afc 100644
--- a/s/chunk.h
+++ b/s/chunk.h
@@ -350,6 +350,7 @@ namespace mongo {
void _load();
void ensureIndex_inlock();
+ void getAllShards_inlock( set<Shard>& all );
string _ns;
ShardKeyPattern _key;
diff --git a/s/client.cpp b/s/client.cpp
index c0d25fb..c053289 100644
--- a/s/client.cpp
+++ b/s/client.cpp
@@ -141,7 +141,7 @@ namespace mongo {
if ( shards->size() == 1 ) {
string theShard = *(shards->begin() );
- ShardConnection conn( theShard , "" );
+ ShardConnection conn( theShard , "", true );
BSONObj res;
bool ok = false;
@@ -211,7 +211,7 @@ namespace mongo {
for ( set<string>::iterator i = shards->begin(); i != shards->end(); i++ ) {
string theShard = *i;
bbb.append( theShard );
- ShardConnection conn( theShard , "" );
+ ShardConnection conn( theShard , "", true );
BSONObj res;
bool ok = false;
try {
diff --git a/s/config.cpp b/s/config.cpp
index 9ed3207..0766717 100644
--- a/s/config.cpp
+++ b/s/config.cpp
@@ -20,6 +20,8 @@
#include "../util/message.h"
#include "../util/stringutils.h"
#include "../util/unittest.h"
+#include "../util/timer.h"
+
#include "../client/connpool.h"
#include "../client/model.h"
#include "../db/pdfile.h"
@@ -53,8 +55,15 @@ namespace mongo {
DBConfig::CollectionInfo::CollectionInfo( const BSONObj& in ) {
_dirty = false;
_dropped = in["dropped"].trueValue();
- if ( in["key"].isABSONObj() )
+ if ( in["key"].isABSONObj() ) {
+ Timer t;
shard( in["_id"].String() , in["key"].Obj() , in["unique"].trueValue() );
+ log() << "creating ChunkManager ns: " << in["_id"]
+ << " took: " << t.millis() << "ms"
+ << " sequenceNumber: " << _cm->getSequenceNumber()
+ << endl;
+ _dirty = false;
+ }
}
@@ -87,6 +96,32 @@ namespace mongo {
_dirty = false;
}
+ bool DBConfig::CollectionInfo::needsReloading( DBClientBase * conn , const BSONObj& collectionInfo ) {
+ if ( ! _cm ) {
+ return true;
+ }
+
+ if ( _dirty || _dropped ) {
+ return true;
+ }
+
+ if ( collectionInfo["dropped"].trueValue() ) {
+ return true;
+ }
+
+ BSONObj newest = conn->findOne( ShardNS::chunk ,
+ Query( BSON( "ns" << collectionInfo["_id"].String() ) ).sort( "lastmod" , -1 ) );
+
+ if ( newest.isEmpty() ) {
+ // either a drop or something else weird
+ return true;
+ }
+
+ ShardChunkVersion fromdb = newest["lastmod"];
+ ShardChunkVersion inmemory = _cm->getVersion();
+ return fromdb != inmemory;
+ }
+
bool DBConfig::isSharded( const string& ns ) {
if ( ! _shardingEnabled )
return false;
@@ -232,13 +267,20 @@ namespace mongo {
unserialize( o );
BSONObjBuilder b;
- b.appendRegex( "_id" , (string)"^" + _name + "." );
+ b.appendRegex( "_id" , (string)"^" + _name + "\\." );
auto_ptr<DBClientCursor> cursor = conn->query( ShardNS::collection ,b.obj() );
assert( cursor.get() );
while ( cursor->more() ) {
BSONObj o = cursor->next();
- _collections[o["_id"].String()] = CollectionInfo( o );
+ string ns = o["_id"].String();
+
+ Collections::iterator i = _collections.find( ns );
+ if ( i != _collections.end() && ! i->second.needsReloading( conn.get() , o ) ) {
+ continue;
+ }
+
+ _collections[ns] = CollectionInfo( o );
}
conn.done();
diff --git a/s/config.h b/s/config.h
index 0636835..13afe23 100644
--- a/s/config.h
+++ b/s/config.h
@@ -88,7 +88,8 @@ namespace mongo {
bool wasDropped() const { return _dropped; }
void save( const string& ns , DBClientBase* conn );
-
+
+ bool needsReloading( DBClientBase * conn , const BSONObj& collectionInfo );
private:
ChunkManagerPtr _cm;
diff --git a/s/d_migrate.cpp b/s/d_migrate.cpp
index df12e54..6f2607d 100644
--- a/s/d_migrate.cpp
+++ b/s/d_migrate.cpp
@@ -165,59 +165,6 @@ namespace mongo {
static const char * const cleanUpThreadName = "cleanupOldData";
- void _cleanupOldData( OldDataCleanup cleanup ) {
- Client::initThread( cleanUpThreadName );
- log() << " (start) waiting to cleanup " << cleanup.ns << " from " << cleanup.min << " -> " << cleanup.max << " # cursors:" << cleanup.initial.size() << endl;
-
- int loops = 0;
- Timer t;
- while ( t.seconds() < 900 ) { // 15 minutes
- assert( dbMutex.getState() == 0 );
- sleepmillis( 20 );
-
- set<CursorId> now;
- ClientCursor::find( cleanup.ns , now );
-
- set<CursorId> left;
- for ( set<CursorId>::iterator i=cleanup.initial.begin(); i!=cleanup.initial.end(); ++i ) {
- CursorId id = *i;
- if ( now.count(id) )
- left.insert( id );
- }
-
- if ( left.size() == 0 )
- break;
- cleanup.initial = left;
-
- if ( ( loops++ % 200 ) == 0 ) {
- log() << " (looping " << loops << ") waiting to cleanup " << cleanup.ns << " from " << cleanup.min << " -> " << cleanup.max << " # cursors:" << cleanup.initial.size() << endl;
-
- stringstream ss;
- for ( set<CursorId>::iterator i=cleanup.initial.begin(); i!=cleanup.initial.end(); ++i ) {
- CursorId id = *i;
- ss << id << " ";
- }
- log() << " cursors: " << ss.str() << endl;
- }
- }
-
- cleanup.doRemove();
-
- cc().shutdown();
- }
-
- void cleanupOldData( OldDataCleanup cleanup ) {
- try {
- _cleanupOldData( cleanup );
- }
- catch ( std::exception& e ) {
- log() << " error cleaning old data:" << e.what() << endl;
- }
- catch ( ... ) {
- log() << " unknown error cleaning old data" << endl;
- }
- }
-
class ChunkCommandHelper : public Command {
public:
ChunkCommandHelper( const char * name )
@@ -243,13 +190,14 @@ namespace mongo {
class MigrateFromStatus {
public:
- MigrateFromStatus() : _m("MigrateFromStatus") {
+ MigrateFromStatus() : _m("MigrateFromStatus") , _workLock( "MigrateFromStatus::WorkLock" ) {
_active = false;
_inCriticalSection = false;
_memoryUsed = 0;
}
void start( string ns , const BSONObj& min , const BSONObj& max ) {
+ scoped_lock lk( _workLock );
scoped_lock l(_m); // reads and writes _active
assert( ! _active );
@@ -568,6 +516,20 @@ namespace mongo {
bool isActive() const { return _getActive(); }
+
+ void doRemove( OldDataCleanup& cleanup ) {
+ while ( true ) {
+ {
+ scoped_lock lk( _workLock );
+ if ( ! _active ) {
+ cleanup.doRemove();
+ return;
+ }
+ }
+ sleepmillis( 100 );
+ }
+ }
+
private:
mutable mongo::mutex _m; // protect _inCriticalSection and _active
bool _inCriticalSection;
@@ -591,6 +553,9 @@ namespace mongo {
list<BSONObj> _deleted; // objects deleted during clone that should be deleted later
long long _memoryUsed; // bytes in _reload + _deleted
+ mutable mongo::mutex _workLock; // this is used to make sure only 1 thread is doing serious work
+ // for now, this means migrate or removing old chunk data
+
bool _getActive() const { scoped_lock l(_m); return _active; }
void _setActive( bool b ) { scoped_lock l(_m); _active = b; }
@@ -605,6 +570,59 @@ namespace mongo {
}
};
+ void _cleanupOldData( OldDataCleanup cleanup ) {
+ Client::initThread( cleanUpThreadName );
+ log() << " (start) waiting to cleanup " << cleanup.ns << " from " << cleanup.min << " -> " << cleanup.max << " # cursors:" << cleanup.initial.size() << endl;
+
+ int loops = 0;
+ Timer t;
+ while ( t.seconds() < 900 ) { // 15 minutes
+ assert( dbMutex.getState() == 0 );
+ sleepmillis( 20 );
+
+ set<CursorId> now;
+ ClientCursor::find( cleanup.ns , now );
+
+ set<CursorId> left;
+ for ( set<CursorId>::iterator i=cleanup.initial.begin(); i!=cleanup.initial.end(); ++i ) {
+ CursorId id = *i;
+ if ( now.count(id) )
+ left.insert( id );
+ }
+
+ if ( left.size() == 0 )
+ break;
+ cleanup.initial = left;
+
+ if ( ( loops++ % 200 ) == 0 ) {
+ log() << " (looping " << loops << ") waiting to cleanup " << cleanup.ns << " from " << cleanup.min << " -> " << cleanup.max << " # cursors:" << cleanup.initial.size() << endl;
+
+ stringstream ss;
+ for ( set<CursorId>::iterator i=cleanup.initial.begin(); i!=cleanup.initial.end(); ++i ) {
+ CursorId id = *i;
+ ss << id << " ";
+ }
+ log() << " cursors: " << ss.str() << endl;
+ }
+ }
+
+ migrateFromStatus.doRemove( cleanup );
+
+ cc().shutdown();
+ }
+
+ void cleanupOldData( OldDataCleanup cleanup ) {
+ try {
+ _cleanupOldData( cleanup );
+ }
+ catch ( std::exception& e ) {
+ log() << " error cleaning old data:" << e.what() << endl;
+ }
+ catch ( ... ) {
+ log() << " unknown error cleaning old data" << endl;
+ }
+ }
+
void logOpForSharding( const char * opstr , const char * ns , const BSONObj& obj , BSONObj * patt ) {
migrateFromStatus.logOp( opstr , ns , obj , patt );
}
diff --git a/s/shard.h b/s/shard.h
index 836ffe7..70e478c 100644
--- a/s/shard.h
+++ b/s/shard.h
@@ -213,9 +213,9 @@ namespace mongo {
class ShardConnection : public AScopedConnection {
public:
- ShardConnection( const Shard * s , const string& ns );
- ShardConnection( const Shard& s , const string& ns );
- ShardConnection( const string& addr , const string& ns );
+ ShardConnection( const Shard * s , const string& ns, bool ignoreDirect = false );
+ ShardConnection( const Shard& s , const string& ns, bool ignoreDirect = false );
+ ShardConnection( const string& addr , const string& ns, bool ignoreDirect = false );
~ShardConnection();
@@ -265,7 +265,7 @@ namespace mongo {
static void checkMyConnectionVersions( const string & ns );
private:
- void _init();
+ void _init( bool ignoreDirect = false );
void _finishInit();
bool _finishedInit;
diff --git a/s/shardconnection.cpp b/s/shardconnection.cpp
index d05f5b1..ec14139 100644
--- a/s/shardconnection.cpp
+++ b/s/shardconnection.cpp
@@ -41,6 +41,9 @@ namespace mongo {
boost::function4<bool, DBClientBase&, const string&, bool, int> checkShardVersionCB = defaultCheckShardVersion;
boost::function1<void, DBClientBase*> resetShardVersionCB = defaultResetShardVersion;
+ // Only print the non-top-level-shard-conn warning once if not verbose
+ volatile bool printedShardConnWarning = false;
+
/**
* holds all the actual db connections for a client to various servers
* 1 pre thread, so don't have to worry about thread safety
@@ -76,9 +79,35 @@ namespace mongo {
_hosts.clear();
}
- DBClientBase * get( const string& addr , const string& ns ) {
+ DBClientBase * get( const string& addr , const string& ns, bool ignoreDirect = false ) {
_check( ns );
+ // Determine if non-shard conn is RS member for warning
+ // All shards added to _hosts if not present in _check()
+ if( ( logLevel >= 1 || ! printedShardConnWarning ) && ! ignoreDirect && _hosts.find( addr ) == _hosts.end() ){
+
+ vector<Shard> all;
+ Shard::getAllShards( all );
+
+ bool isRSMember = false;
+ string parentShard;
+ for ( unsigned i = 0; i < all.size(); i++ ) {
+ string connString = all[i].getConnString();
+ if( connString.find( addr ) != string::npos && connString.find( '/' ) != string::npos ){
+ isRSMember = true;
+ parentShard = connString;
+ break;
+ }
+ }
+
+ if( isRSMember ){
+ printedShardConnWarning = true;
+ warning() << "adding shard sub-connection " << addr << " (parent " << parentShard << ") as sharded, this is safe but unexpected" << endl;
+ printStackTrace();
+ }
+ }
+
+
Status* &s = _hosts[addr];
if ( ! s )
s = new Status();
@@ -120,22 +149,25 @@ namespace mongo {
}
void checkVersions( const string& ns ) {
+
vector<Shard> all;
Shard::getAllShards( all );
+
+ // Now only check top-level shard connections
for ( unsigned i=0; i<all.size(); i++ ) {
- Status* &s = _hosts[all[i].getConnString()];
- if ( ! s )
+
+ string sconnString = all[i].getConnString();
+ Status* &s = _hosts[ sconnString ];
+
+ if ( ! s ){
s = new Status();
- }
+ }
+
+ if( ! s->avail )
+ s->avail = pool.get( sconnString );
+
+ checkShardVersionCB( *s->avail, ns, false, 1 );
- for ( HostMap::iterator i=_hosts.begin(); i!=_hosts.end(); ++i ) {
- if ( ! Shard::isAShardNode( i->first ) )
- continue;
- Status* ss = i->second;
- assert( ss );
- if ( ! ss->avail )
- ss->avail = pool.get( i->first );
- checkShardVersionCB( *ss->avail , ns , false , 1 );
}
}
@@ -189,24 +221,24 @@ namespace mongo {
thread_specific_ptr<ClientConnections> ClientConnections::_perThread;
- ShardConnection::ShardConnection( const Shard * s , const string& ns )
+ ShardConnection::ShardConnection( const Shard * s , const string& ns, bool ignoreDirect )
: _addr( s->getConnString() ) , _ns( ns ) {
- _init();
+ _init( ignoreDirect );
}
- ShardConnection::ShardConnection( const Shard& s , const string& ns )
+ ShardConnection::ShardConnection( const Shard& s , const string& ns, bool ignoreDirect )
: _addr( s.getConnString() ) , _ns( ns ) {
- _init();
+ _init( ignoreDirect );
}
- ShardConnection::ShardConnection( const string& addr , const string& ns )
+ ShardConnection::ShardConnection( const string& addr , const string& ns, bool ignoreDirect )
: _addr( addr ) , _ns( ns ) {
- _init();
+ _init( ignoreDirect );
}
- void ShardConnection::_init() {
+ void ShardConnection::_init( bool ignoreDirect ) {
assert( _addr.size() );
- _conn = ClientConnections::threadInstance()->get( _addr , _ns );
+ _conn = ClientConnections::threadInstance()->get( _addr , _ns, ignoreDirect );
_finishedInit = false;
}
diff --git a/s/strategy_shard.cpp b/s/strategy_shard.cpp
index 26ea79a..337fa58 100644
--- a/s/strategy_shard.cpp
+++ b/s/strategy_shard.cpp
@@ -151,7 +151,7 @@ namespace mongo {
// Many operations benefit from having the shard key early in the object
o = manager->getShardKey().moveToFront(o);
- const int maxTries = 10;
+ const int maxTries = 30;
bool gotThrough = false;
for ( int i=0; i<maxTries; i++ ) {
diff --git a/s/writeback_listener.cpp b/s/writeback_listener.cpp
index 769163e..df7cc35 100644
--- a/s/writeback_listener.cpp
+++ b/s/writeback_listener.cpp
@@ -167,7 +167,9 @@ namespace mongo {
if ( logLevel ) log(1) << debugString( m ) << endl;
- if ( needVersion.isSet() && needVersion <= db->getChunkManager( ns )->getVersion() ) {
+ ShardChunkVersion start = db->getChunkManager( ns )->getVersion();
+
+ if ( needVersion.isSet() && needVersion <= start ) {
// this means when the write went originally, the version was old
// if we're here, it means we've already updated the config, so don't need to do again
//db->getChunkManager( ns , true ); // SERVER-1349
@@ -176,7 +178,16 @@ namespace mongo {
// we received a writeback object that was sent to a previous version of a shard
// the actual shard may not have the object the writeback operation is for
// we need to reload the chunk manager and get the new shard versions
- db->getChunkManager( ns , true );
+ bool good = false;
+ for ( int i=0; i<100; i++ ) {
+ if ( db->getChunkManager( ns , true )->getVersion() >= needVersion ) {
+ good = true;
+ break;
+ }
+ log() << "writeback getChunkManager didn't update?" << endl;
+ sleepmillis(10);
+ }
+ assert( good );
}
// do request and then call getLastError