summaryrefslogtreecommitdiff
path: root/s/shard_version.cpp
diff options
context:
space:
mode:
Diffstat (limited to 's/shard_version.cpp')
-rw-r--r--s/shard_version.cpp121
1 files changed, 98 insertions, 23 deletions
diff --git a/s/shard_version.cpp b/s/shard_version.cpp
index 8782c8e..9c55019 100644
--- a/s/shard_version.cpp
+++ b/s/shard_version.cpp
@@ -31,6 +31,8 @@ namespace mongo {
// when running in sharded mode, use chunk shard version control
+ static bool isVersionable( DBClientBase * conn );
+ static bool initShardVersion( DBClientBase & conn, BSONObj& result );
static bool checkShardVersion( DBClientBase & conn , const string& ns , bool authoritative = false , int tryNumber = 1 );
static void resetShardVersion( DBClientBase * conn );
@@ -40,6 +42,8 @@ namespace mongo {
//
// TODO: Better encapsulate this mechanism.
//
+ isVersionableCB = isVersionable;
+ initShardVersionCB = initShardVersion;
checkShardVersionCB = checkShardVersion;
resetShardVersionCB = resetShardVersion;
}
@@ -52,6 +56,16 @@ namespace mongo {
: _mutex( "ConnectionShardStatus" ) {
}
+ bool isInitialized( DBClientBase * conn ){
+ scoped_lock lk( _mutex );
+ return _init.find( conn ) != _init.end();
+ }
+
+ void setInitialized( DBClientBase * conn ){
+ scoped_lock lk( _mutex );
+ _init.insert( conn );
+ }
+
S getSequence( DBClientBase * conn , const string& ns ) {
scoped_lock lk( _mutex );
return _map[conn][ns];
@@ -65,13 +79,15 @@ namespace mongo {
void reset( DBClientBase * conn ) {
scoped_lock lk( _mutex );
_map.erase( conn );
+ _init.erase( conn );
}
- // protects _map
+ // protects _maps
mongo::mutex _mutex;
// a map from a connection into ChunkManager's sequence number for each namespace
map<DBClientBase*, map<string,unsigned long long> > _map;
+ set<DBClientBase*> _init;
} connectionShardStatus;
@@ -79,6 +95,75 @@ namespace mongo {
connectionShardStatus.reset( conn );
}
+ bool isVersionable( DBClientBase* conn ){
+ return conn->type() == ConnectionString::MASTER || conn->type() == ConnectionString::SET;
+ }
+
+ DBClientBase* getVersionable( DBClientBase* conn ){
+
+ switch ( conn->type() ) {
+ case ConnectionString::INVALID:
+ massert( 15904, str::stream() << "cannot set version on invalid connection " << conn->toString(), false );
+ return NULL;
+ case ConnectionString::MASTER:
+ return conn;
+ case ConnectionString::PAIR:
+ massert( 15905, str::stream() << "cannot set version or shard on pair connection " << conn->toString(), false );
+ return NULL;
+ case ConnectionString::SYNC:
+ massert( 15906, str::stream() << "cannot set version or shard on sync connection " << conn->toString(), false );
+ return NULL;
+ case ConnectionString::SET:
+ DBClientReplicaSet* set = (DBClientReplicaSet*) conn;
+ return &( set->masterConn() );
+ }
+
+ assert( false );
+ return NULL;
+ }
+
+ extern OID serverID;
+
+ bool initShardVersion( DBClientBase& conn_in, BSONObj& result ){
+
+ WriteBackListener::init( conn_in );
+
+ DBClientBase* conn = getVersionable( &conn_in );
+ assert( conn ); // errors thrown above
+
+ BSONObjBuilder cmdBuilder;
+
+ cmdBuilder.append( "setShardVersion" , "" );
+ cmdBuilder.appendBool( "init", true );
+ cmdBuilder.append( "configdb" , configServer.modelServer() );
+ cmdBuilder.appendOID( "serverID" , &serverID );
+ cmdBuilder.appendBool( "authoritative" , true );
+
+ BSONObj cmd = cmdBuilder.obj();
+
+ LOG(1) << "initializing shard connection to " << conn->toString() << endl;
+ LOG(2) << "initial sharding settings : " << cmd << endl;
+
+ bool ok = conn->runCommand( "admin" , cmd , result );
+ connectionShardStatus.setInitialized( conn );
+
+ // HACK for backwards compatibility with v1.8.x, v2.0.0 and v2.0.1
+ // Result is false, but will still initialize serverID and configdb
+ // Not master does not initialize serverID and configdb, but we ignore since if the connection is not master,
+ // we are not setting the shard version at all
+ if( ! ok && ! result["errmsg"].eoo() && ( result["errmsg"].String() == "need to specify namespace"/* 2.0.1/2 */ ||
+ result["errmsg"].String() == "need to speciy namespace" /* 1.8 */ ||
+ result["errmsg"].String() == "not master" /* both */ ) )
+ {
+ ok = true;
+ }
+
+ LOG(3) << "initial sharding result : " << result << endl;
+
+ return ok;
+
+ }
+
/**
* @return true if had to do something
*/
@@ -91,31 +176,14 @@ namespace mongo {
if ( ! conf )
return false;
- DBClientBase* conn = 0;
+ DBClientBase* conn = getVersionable( &conn_in );
+ assert(conn); // errors thrown above
- switch ( conn_in.type() ) {
- case ConnectionString::INVALID:
- assert(0);
- break;
- case ConnectionString::MASTER:
- // great
- conn = &conn_in;
- break;
- case ConnectionString::PAIR:
- assert( ! "pair not support for sharding" );
- break;
- case ConnectionString::SYNC:
- // TODO: we should check later that we aren't actually sharded on this
- conn = &conn_in;
- break;
- case ConnectionString::SET:
- DBClientReplicaSet* set = (DBClientReplicaSet*)&conn_in;
- conn = &(set->masterConn());
- break;
+ if( ! connectionShardStatus.isInitialized( conn ) ){
+ BSONObj result;
+ uassert( 15918, str::stream() << "cannot initialize version on shard " << conn->getServerAddress() << causedBy( result.toString() ), initShardVersion( *conn, result ) );
}
- assert(conn);
-
unsigned long long officialSequenceNumber = 0;
ChunkManagerPtr manager;
@@ -140,6 +208,13 @@ namespace mongo {
version = manager->getVersion( Shard::make( conn->getServerAddress() ) );
}
+ if( version == 0 ){
+ LOG(2) << "resetting shard version of " << ns << " on " << conn->getServerAddress() << ", " <<
+ ( ! isSharded ? "no longer sharded" :
+ ( ! manager ? "no chunk manager found" :
+ "version is zero" ) ) << endl;
+ }
+
LOG(2) << " have to set shard version for conn: " << conn << " ns:" << ns
<< " my last seq: " << sequenceNumber << " current: " << officialSequenceNumber
<< " version: " << version << " manager: " << manager.get()