diff options
Diffstat (limited to 's/shard_version.cpp')
-rw-r--r-- | s/shard_version.cpp | 121 |
1 files changed, 98 insertions, 23 deletions
diff --git a/s/shard_version.cpp b/s/shard_version.cpp index 8782c8e..9c55019 100644 --- a/s/shard_version.cpp +++ b/s/shard_version.cpp @@ -31,6 +31,8 @@ namespace mongo { // when running in sharded mode, use chunk shard version control + static bool isVersionable( DBClientBase * conn ); + static bool initShardVersion( DBClientBase & conn, BSONObj& result ); static bool checkShardVersion( DBClientBase & conn , const string& ns , bool authoritative = false , int tryNumber = 1 ); static void resetShardVersion( DBClientBase * conn ); @@ -40,6 +42,8 @@ namespace mongo { // // TODO: Better encapsulate this mechanism. // + isVersionableCB = isVersionable; + initShardVersionCB = initShardVersion; checkShardVersionCB = checkShardVersion; resetShardVersionCB = resetShardVersion; } @@ -52,6 +56,16 @@ namespace mongo { : _mutex( "ConnectionShardStatus" ) { } + bool isInitialized( DBClientBase * conn ){ + scoped_lock lk( _mutex ); + return _init.find( conn ) != _init.end(); + } + + void setInitialized( DBClientBase * conn ){ + scoped_lock lk( _mutex ); + _init.insert( conn ); + } + S getSequence( DBClientBase * conn , const string& ns ) { scoped_lock lk( _mutex ); return _map[conn][ns]; @@ -65,13 +79,15 @@ namespace mongo { void reset( DBClientBase * conn ) { scoped_lock lk( _mutex ); _map.erase( conn ); + _init.erase( conn ); } - // protects _map + // protects _maps mongo::mutex _mutex; // a map from a connection into ChunkManager's sequence number for each namespace map<DBClientBase*, map<string,unsigned long long> > _map; + set<DBClientBase*> _init; } connectionShardStatus; @@ -79,6 +95,75 @@ namespace mongo { connectionShardStatus.reset( conn ); } + bool isVersionable( DBClientBase* conn ){ + return conn->type() == ConnectionString::MASTER || conn->type() == ConnectionString::SET; + } + + DBClientBase* getVersionable( DBClientBase* conn ){ + + switch ( conn->type() ) { + case ConnectionString::INVALID: + massert( 15904, str::stream() << "cannot set version on invalid connection " << conn->toString(), false ); + return NULL; + case ConnectionString::MASTER: + return conn; + case ConnectionString::PAIR: + massert( 15905, str::stream() << "cannot set version or shard on pair connection " << conn->toString(), false ); + return NULL; + case ConnectionString::SYNC: + massert( 15906, str::stream() << "cannot set version or shard on sync connection " << conn->toString(), false ); + return NULL; + case ConnectionString::SET: + DBClientReplicaSet* set = (DBClientReplicaSet*) conn; + return &( set->masterConn() ); + } + + assert( false ); + return NULL; + } + + extern OID serverID; + + bool initShardVersion( DBClientBase& conn_in, BSONObj& result ){ + + WriteBackListener::init( conn_in ); + + DBClientBase* conn = getVersionable( &conn_in ); + assert( conn ); // errors thrown above + + BSONObjBuilder cmdBuilder; + + cmdBuilder.append( "setShardVersion" , "" ); + cmdBuilder.appendBool( "init", true ); + cmdBuilder.append( "configdb" , configServer.modelServer() ); + cmdBuilder.appendOID( "serverID" , &serverID ); + cmdBuilder.appendBool( "authoritative" , true ); + + BSONObj cmd = cmdBuilder.obj(); + + LOG(1) << "initializing shard connection to " << conn->toString() << endl; + LOG(2) << "initial sharding settings : " << cmd << endl; + + bool ok = conn->runCommand( "admin" , cmd , result ); + connectionShardStatus.setInitialized( conn ); + + // HACK for backwards compatibility with v1.8.x, v2.0.0 and v2.0.1 + // Result is false, but will still initialize serverID and configdb + // Not master does not initialize serverID and configdb, but we ignore since if the connection is not master, + // we are not setting the shard version at all + if( ! ok && ! result["errmsg"].eoo() && ( result["errmsg"].String() == "need to specify namespace"/* 2.0.1/2 */ || + result["errmsg"].String() == "need to speciy namespace" /* 1.8 */ || + result["errmsg"].String() == "not master" /* both */ ) ) + { + ok = true; + } + + LOG(3) << "initial sharding result : " << result << endl; + + return ok; + + } + /** * @return true if had to do something */ @@ -91,31 +176,14 @@ namespace mongo { if ( ! conf ) return false; - DBClientBase* conn = 0; + DBClientBase* conn = getVersionable( &conn_in ); + assert(conn); // errors thrown above - switch ( conn_in.type() ) { - case ConnectionString::INVALID: - assert(0); - break; - case ConnectionString::MASTER: - // great - conn = &conn_in; - break; - case ConnectionString::PAIR: - assert( ! "pair not support for sharding" ); - break; - case ConnectionString::SYNC: - // TODO: we should check later that we aren't actually sharded on this - conn = &conn_in; - break; - case ConnectionString::SET: - DBClientReplicaSet* set = (DBClientReplicaSet*)&conn_in; - conn = &(set->masterConn()); - break; + if( ! connectionShardStatus.isInitialized( conn ) ){ + BSONObj result; + uassert( 15918, str::stream() << "cannot initialize version on shard " << conn->getServerAddress() << causedBy( result.toString() ), initShardVersion( *conn, result ) ); } - assert(conn); - unsigned long long officialSequenceNumber = 0; ChunkManagerPtr manager; @@ -140,6 +208,13 @@ namespace mongo { version = manager->getVersion( Shard::make( conn->getServerAddress() ) ); } + if( version == 0 ){ + LOG(2) << "resetting shard version of " << ns << " on " << conn->getServerAddress() << ", " << + ( ! isSharded ? "no longer sharded" : + ( ! manager ? "no chunk manager found" : + "version is zero" ) ) << endl; + } + LOG(2) << " have to set shard version for conn: " << conn << " ns:" << ns << " my last seq: " << sequenceNumber << " current: " << officialSequenceNumber << " version: " << version << " manager: " << manager.get() |