Diffstat (limited to 's/grid.cpp')
 s/grid.cpp | 257 +++++++++++++++++++++++++++++++++++++++++++++-----------------
 1 file changed, 191 insertions(+), 66 deletions(-)
@@ -19,46 +19,47 @@
 #include "pch.h"
 #include <iomanip>
-
 #include "../client/connpool.h"
 #include "../util/stringutils.h"
+#include "../util/unittest.h"
 #include "grid.h"
 #include "shard.h"
 
 namespace mongo {
-
-    DBConfigPtr Grid::getDBConfig( string database , bool create , const string& shardNameHint ){
+
+    DBConfigPtr Grid::getDBConfig( string database , bool create , const string& shardNameHint ) {
         {
             string::size_type i = database.find( "." );
             if ( i != string::npos )
                 database = database.substr( 0 , i );
         }
-
+
         if ( database == "config" )
             return configServerPtr;
 
         scoped_lock l( _lock );
 
         DBConfigPtr& cc = _databases[database];
-        if ( !cc ){
+        if ( !cc ) {
             cc.reset(new DBConfig( database ));
-            if ( ! cc->load() ){
-                if ( create ){
+            if ( ! cc->load() ) {
+                if ( create ) {
                     // note here that cc->primary == 0.
                     log() << "couldn't find database [" << database << "] in config db" << endl;
-
-                    { // lets check case
+
+                    {
+                        // lets check case
                         ScopedDbConnection conn( configServer.modelServer() );
                         BSONObjBuilder b;
                         b.appendRegex( "_id" , (string)"^" + database + "$" , "i" );
                         BSONObj d = conn->findOne( ShardNS::database , b.obj() );
                         conn.done();
 
-                        if ( ! d.isEmpty() ){
+                        if ( ! d.isEmpty() ) {
                             cc.reset();
                             stringstream ss;
-                            ss << "can't have 2 databases that just differ on case "
+                            ss << "can't have 2 databases that just differ on case "
                                << " have: " << d["_id"].String()
                                << " want to add: " << database;
@@ -67,20 +68,22 @@ namespace mongo {
                         }
 
                     Shard primary;
-                    if ( database == "admin" ){
+                    if ( database == "admin" ) {
                         primary = configServer.getPrimary();
-                    } else if ( shardNameHint.empty() ){
+                    }
+                    else if ( shardNameHint.empty() ) {
                         primary = Shard::pick();
-                    } else {
+                    }
+                    else {
                         // use the shard name if provided
                         Shard shard;
                         shard.reset( shardNameHint );
                         primary = shard;
                     }
 
-                    if ( primary.ok() ){
+                    if ( primary.ok() ) {
                         cc->setPrimary( primary.getName() ); // saves 'cc' to configDB
                         log() << "\t put [" << database << "] on: " << primary << endl;
                     }
@@ -94,53 +97,63 @@ namespace mongo {
                     cc.reset();
                 }
             }
-
+
         }
-
+
         return cc;
     }
 
-    void Grid::removeDB( string database ){
+    void Grid::removeDB( string database ) {
         uassert( 10186 , "removeDB expects db name" , database.find( '.' ) == string::npos );
         scoped_lock l( _lock );
         _databases.erase( database );
-
+
     }
 
     bool Grid::allowLocalHost() const {
         return _allowLocalShard;
    }
 
-    void Grid::setAllowLocalHost( bool allow ){
+    void Grid::setAllowLocalHost( bool allow ) {
         _allowLocalShard = allow;
     }
 
-    bool Grid::addShard( string* name , const ConnectionString& servers , long long maxSize , string& errMsg ){
+    bool Grid::addShard( string* name , const ConnectionString& servers , long long maxSize , string& errMsg ) {
         // name can be NULL, so privide a dummy one here to avoid testing it elsewhere
         string nameInternal;
         if ( ! name ) {
             name = &nameInternal;
         }
 
-        // Check whether the host (or set) exists and run several sanity checks on this request. 
+        // Check whether the host (or set) exists and run several sanity checks on this request.
         // There are two set of sanity checks: making sure adding this particular shard is consistent
-        // with the replica set state (if it exists) and making sure this shards databases can be 
+        // with the replica set state (if it exists) and making sure this shards databases can be
         // brought into the grid without conflict.
 
         vector<string> dbNames;
         try {
             ScopedDbConnection newShardConn( servers );
             newShardConn->getLastError();
-
-            if ( newShardConn->type() == ConnectionString::SYNC ){
+
+            if ( newShardConn->type() == ConnectionString::SYNC ) {
                 newShardConn.done();
                 errMsg = "can't use sync cluster as a shard. for replica set, have to use <setname>/<server1>,<server2>,...";
                 return false;
             }
 
+            BSONObj resIsMongos;
+            bool ok = newShardConn->runCommand( "admin" , BSON( "isdbgrid" << 1 ) , resIsMongos );
+
+            // should return ok=0, cmd not found if it's a normal mongod
+            if ( ok ) {
+                errMsg = "can't add a mongos process as a shard";
+                newShardConn.done();
+                return false;
+            }
+
             BSONObj resIsMaster;
-            bool ok = newShardConn->runCommand( "admin" , BSON( "isMaster" << 1 ) , resIsMaster );
-            if ( !ok ){
+            ok = newShardConn->runCommand( "admin" , BSON( "isMaster" << 1 ) , resIsMaster );
+            if ( !ok ) {
                 ostringstream ss;
                 ss << "failed running isMaster: " << resIsMaster;
                 errMsg = ss.str();
@@ -151,7 +164,7 @@ namespace mongo {
             // if the shard has only one host, make sure it is not part of a replica set
             string setName = resIsMaster["setName"].str();
             string commandSetName = servers.getSetName();
-            if ( commandSetName.empty() && ! setName.empty() ){
+            if ( commandSetName.empty() && ! setName.empty() ) {
                 ostringstream ss;
                 ss << "host is part of set: " << setName << " use replica set url format <setname>/<server1>,<server2>,....";
                 errMsg = ss.str();
@@ -160,7 +173,7 @@ namespace mongo {
             }
 
             // if the shard is part of replica set, make sure it is the right one
-            if ( ! commandSetName.empty() && ( commandSetName != setName ) ){
+            if ( ! commandSetName.empty() && ( commandSetName != setName ) ) {
                 ostringstream ss;
                 ss << "host is part of a different set: " << setName;
                 errMsg = ss.str();
@@ -168,30 +181,39 @@ namespace mongo {
                 return false;
             }
 
-            // if the shard is part of a replica set, make sure all the hosts mentioned in 'servers' are part of 
+            // if the shard is part of a replica set, make sure all the hosts mentioned in 'servers' are part of
             // the set. It is fine if not all members of the set are present in 'servers'.
             bool foundAll = true;
             string offendingHost;
-            if ( ! commandSetName.empty() ){
+            if ( ! commandSetName.empty() ) {
                 set<string> hostSet;
                 BSONObjIterator iter( resIsMaster["hosts"].Obj() );
-                while ( iter.more() ){
+                while ( iter.more() ) {
                     hostSet.insert( iter.next().String() ); // host:port
                 }
+                if ( resIsMaster["passives"].isABSONObj() ) {
+                    BSONObjIterator piter( resIsMaster["passives"].Obj() );
+                    while ( piter.more() ) {
+                        hostSet.insert( piter.next().String() ); // host:port
+                    }
+                }
 
                 vector<HostAndPort> hosts = servers.getServers();
-                for ( size_t i = 0 ; i < hosts.size() ; i++ ){
+                for ( size_t i = 0 ; i < hosts.size() ; i++ ) {
+                    if (!hosts[i].hasPort()) {
+                        hosts[i].setPort(CmdLine::DefaultDBPort);
+                    }
                     string host = hosts[i].toString(); // host:port
-                    if ( hostSet.find( host ) == hostSet.end() ){
+                    if ( hostSet.find( host ) == hostSet.end() ) {
                         offendingHost = host;
                         foundAll = false;
                         break;
                     }
                 }
            }
-            if ( ! foundAll ){
+            if ( ! foundAll ) {
                 ostringstream ss;
-                ss << "host " << offendingHost << " does not belong to replica set " << setName;;
+                ss << "host " << offendingHost << " does not belong to replica set as a non-passive member" << setName;;
                 errMsg = ss.str();
                 newShardConn.done();
                 return false;
@@ -199,15 +221,15 @@ namespace mongo {
 
             // shard name defaults to the name of the replica set
             if ( name->empty() && ! setName.empty() )
-                *name = setName;
+                *name = setName;
 
-            // In order to be accepted as a new shard, that mongod must not have any database name that exists already
-            // in any other shards. If that test passes, the new shard's databases are going to be entered as 
+            // In order to be accepted as a new shard, that mongod must not have any database name that exists already
+            // in any other shards. If that test passes, the new shard's databases are going to be entered as
             // non-sharded db's whose primary is the newly added shard.
             BSONObj resListDB;
             ok = newShardConn->runCommand( "admin" , BSON( "listDatabases" << 1 ) , resListDB );
-            if ( !ok ){
+            if ( !ok ) {
                 ostringstream ss;
                 ss << "failed listing " << servers.toString() << "'s databases:" << resListDB;
                 errMsg = ss.str();
@@ -216,20 +238,21 @@ namespace mongo {
             }
 
             BSONObjIterator i( resListDB["databases"].Obj() );
-            while ( i.more() ){
+            while ( i.more() ) {
                 BSONObj dbEntry = i.next().Obj();
                 const string& dbName = dbEntry["name"].String();
-                if ( _isSpecialLocalDB( dbName ) ){
+                if ( _isSpecialLocalDB( dbName ) ) {
                     // 'local', 'admin', and 'config' are system DBs and should be excluded here
                     continue;
-                } else {
+                }
+                else {
                     dbNames.push_back( dbName );
                 }
             }
 
             newShardConn.done();
         }
-        catch ( DBException& e ){
+        catch ( DBException& e ) {
             ostringstream ss;
             ss << "couldn't connect to new shard ";
             ss << e.what();
@@ -238,9 +261,9 @@ namespace mongo {
         }
 
         // check that none of the existing shard candidate's db's exist elsewhere
-        for ( vector<string>::const_iterator it = dbNames.begin(); it != dbNames.end(); ++it ){
+        for ( vector<string>::const_iterator it = dbNames.begin(); it != dbNames.end(); ++it ) {
             DBConfigPtr config = getDBConfig( *it , false );
-            if ( config.get() != NULL ){
+            if ( config.get() != NULL ) {
                 ostringstream ss;
                 ss << "can't add shard " << servers.toString() << " because a local database '" << *it;
                 ss << "' exists in another " << config->getPrimary().toString();
@@ -250,26 +273,26 @@ namespace mongo {
         }
 
         // if a name for a shard wasn't provided, pick one.
-        if ( name->empty() && ! _getNewShardName( name ) ){
+        if ( name->empty() && ! _getNewShardName( name ) ) {
             errMsg = "error generating new shard name";
             return false;
         }
-
+
         // build the ConfigDB shard document
         BSONObjBuilder b;
         b.append( "_id" , *name );
         b.append( "host" , servers.toString() );
-        if ( maxSize > 0 ){
+        if ( maxSize > 0 ) {
             b.append( ShardFields::maxSize.name() , maxSize );
         }
         BSONObj shardDoc = b.obj();
 
         {
             ScopedDbConnection conn( configServer.getPrimary() );
-
+
             // check whether the set of hosts (or single host) is not an already a known shard
             BSONObj old = conn->findOne( ShardNS::shard , BSON( "host" << servers.toString() ) );
-            if ( ! old.isEmpty() ){
+            if ( ! old.isEmpty() ) {
                 errMsg = "host already used";
                 conn.done();
                 return false;
@@ -279,7 +302,7 @@ namespace mongo {
 
             conn->insert( ShardNS::shard , shardDoc );
             errMsg = conn->getLastError();
-            if ( ! errMsg.empty() ){
+            if ( ! errMsg.empty() ) {
                 log() << "error adding shard: " << shardDoc << " err: " << errMsg << endl;
                 conn.done();
                 return false;
@@ -291,37 +314,37 @@ namespace mongo {
         Shard::reloadShardInfo();
 
         // add all databases of the new shard
-        for ( vector<string>::const_iterator it = dbNames.begin(); it != dbNames.end(); ++it ){
+        for ( vector<string>::const_iterator it = dbNames.begin(); it != dbNames.end(); ++it ) {
             DBConfigPtr config = getDBConfig( *it , true , *name );
-            if ( ! config ){
-                log() << "adding shard " << servers << " even though could not add database " << *it << endl;
+            if ( ! config ) {
+                log() << "adding shard " << servers << " even though could not add database " << *it << endl;
             }
         }
 
         return true;
     }
-
-    bool Grid::knowAboutShard( const string& name ) const{
+
+    bool Grid::knowAboutShard( const string& name ) const {
         ShardConnection conn( configServer.getPrimary() , "" );
         BSONObj shard = conn->findOne( ShardNS::shard , BSON( "host" << name ) );
         conn.done();
         return ! shard.isEmpty();
     }
 
-    bool Grid::_getNewShardName( string* name ) const{
+    bool Grid::_getNewShardName( string* name ) const {
         DEV assert( name );
 
         bool ok = false;
-        int count = 0; 
+        int count = 0;
 
         ShardConnection conn( configServer.getPrimary() , "" );
-        BSONObj o = conn->findOne( ShardNS::shard , Query( fromjson ( "{_id: /^shard/}" ) ).sort( BSON( "_id" << -1 ) ) ); 
+        BSONObj o = conn->findOne( ShardNS::shard , Query( fromjson ( "{_id: /^shard/}" ) ).sort( BSON( "_id" << -1 ) ) );
         if ( ! o.isEmpty() ) {
             string last = o["_id"].String();
             istringstream is( last.substr( 5 ) );
             is >> count;
             count++;
-        } 
+        }
         if (count < 9999) {
             stringstream ss;
             ss << "shard" << setfill('0') << setw(4) << count;
@@ -337,14 +360,75 @@ namespace mongo {
         ShardConnection conn( configServer.getPrimary() , "" );
 
         // look for the stop balancer marker
-        BSONObj stopMarker = conn->findOne( ShardNS::settings, BSON( "_id" << "balancer" << "stopped" << true ) );
+        BSONObj balancerDoc = conn->findOne( ShardNS::settings, BSON( "_id" << "balancer" ) );
         conn.done();
-        return stopMarker.isEmpty();
+
+        boost::posix_time::ptime now = boost::posix_time::second_clock::local_time();
+        if ( _balancerStopped( balancerDoc ) || ! _inBalancingWindow( balancerDoc , now ) ) {
+            return false;
+        }
+
+        return true;
+    }
+
+    bool Grid::_balancerStopped( const BSONObj& balancerDoc ) {
+        // check the 'stopped' marker maker
+        // if present, it is a simple bool
+        BSONElement stoppedElem = balancerDoc["stopped"];
+        if ( ! stoppedElem.eoo() && stoppedElem.isBoolean() ) {
+            return stoppedElem.boolean();
+        }
+        return false;
+    }
+
+    bool Grid::_inBalancingWindow( const BSONObj& balancerDoc , const boost::posix_time::ptime& now ) {
+        // check the 'activeWindow' marker
+        // if present, it is an interval during the day when the balancer should be active
+        // { start: "08:00" , stop: "19:30" }, strftime format is %H:%M
+        BSONElement windowElem = balancerDoc["activeWindow"];
+        if ( windowElem.eoo() ) {
+            return true;
+        }
+
+        // check if both 'start' and 'stop' are present
+        if ( ! windowElem.isABSONObj() ) {
+            log(1) << "'activeWindow' format is { start: \"hh:mm\" , stop: ... }" << balancerDoc << endl;
+            return true;
+        }
+        BSONObj intervalDoc = windowElem.Obj();
+        const string start = intervalDoc["start"].str();
+        const string stop = intervalDoc["stop"].str();
+        if ( start.empty() || stop.empty() ) {
+            log(1) << "must specify both start and end of balancing window: " << intervalDoc << endl;
+            return true;
+        }
+
+        // check that both 'start' and 'stop' are valid time-of-day
+        boost::posix_time::ptime startTime, stopTime;
+        if ( ! toPointInTime( start , &startTime ) || ! toPointInTime( stop , &stopTime ) ) {
+            log(1) << "cannot parse active window (use hh:mm 24hs format): " << intervalDoc << endl;
+            return true;
+        }
+
+        // allow balancing if during the activeWindow
+        // note that a window may be open during the night
+        if ( stopTime > startTime ) {
+            if ( ( now >= startTime ) && ( now <= stopTime ) ) {
+                return true;
+            }
+        }
+        else if ( startTime > stopTime ) {
+            if ( ( now >=startTime ) || ( now <= stopTime ) ) {
+                return true;
+            }
+        }
+
+        return false;
     }
 
     unsigned long long Grid::getNextOpTime() const {
         ScopedDbConnection conn( configServer.getPrimary() );
-
+
         BSONObj result;
         massert( 10421 , "getoptime failed" , conn->simpleCommand( "admin" , &result , "getoptime" ) );
         conn.done();
@@ -352,10 +436,51 @@ namespace mongo {
         return result["optime"]._numberLong();
     }
 
-    bool Grid::_isSpecialLocalDB( const string& dbName ){
+    bool Grid::_isSpecialLocalDB( const string& dbName ) {
         return ( dbName == "local" ) || ( dbName == "admin" ) || ( dbName == "config" );
     }
 
     Grid grid;
 
-}
+    // unit tests
+
+    class BalancingWindowUnitTest : public UnitTest {
+    public:
+        void run() {
+            // T0 < T1 < now < T2 < T3 and Error
+            const string T0 = "9:00";
+            const string T1 = "11:00";
+            boost::posix_time::ptime now( currentDate(), boost::posix_time::hours( 13 ) + boost::posix_time::minutes( 48 ) );
+            const string T2 = "17:00";
+            const string T3 = "21:30";
+            const string E = "28:35";
+
+            BSONObj w1 = BSON( "activeWindow" << BSON( "start" << T0 << "stop" << T1 ) ); // closed in the past
+            BSONObj w2 = BSON( "activeWindow" << BSON( "start" << T2 << "stop" << T3 ) ); // not opened until the future
+            BSONObj w3 = BSON( "activeWindow" << BSON( "start" << T1 << "stop" << T2 ) ); // open now
+            BSONObj w4 = BSON( "activeWindow" << BSON( "start" << T3 << "stop" << T2 ) ); // open since last day
+
+            assert( ! Grid::_inBalancingWindow( w1 , now ) );
+            assert( ! Grid::_inBalancingWindow( w2 , now ) );
+            assert( Grid::_inBalancingWindow( w3 , now ) );
+            assert( Grid::_inBalancingWindow( w4 , now ) );
+
+            // bad input should not stop the balancer
+
+            BSONObj w5; // empty window
+            BSONObj w6 = BSON( "activeWindow" << BSON( "start" << 1 ) ); // missing stop
+            BSONObj w7 = BSON( "activeWindow" << BSON( "stop" << 1 ) ); // missing start
+            BSONObj w8 = BSON( "wrongMarker" << 1 << "start" << 1 << "stop" << 1 ); // active window marker missing
+            BSONObj w9 = BSON( "activeWindow" << BSON( "start" << T3 << "stop" << E ) ); // garbage in window
+
+            assert( Grid::_inBalancingWindow( w5 , now ) );
+            assert( Grid::_inBalancingWindow( w6 , now ) );
+            assert( Grid::_inBalancingWindow( w7 , now ) );
+            assert( Grid::_inBalancingWindow( w8 , now ) );
+            assert( Grid::_inBalancingWindow( w9 , now ) );
+
+            log(1) << "BalancingWidowObjTest passed" << endl;
+        }
+    } BalancingWindowObjTest;
+
+}
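
A minimal standalone sketch (not part of the commit above) of the activeWindow check that _inBalancingWindow() introduces: it parses "hh:mm" strings with boost::posix_time and treats a window whose stop time precedes its start time as one that stays open across midnight, the "open since last day" case exercised by BalancingWindowUnitTest. The helpers parseHHMM and inWindow are hypothetical names used only for illustration, not MongoDB's actual implementation.

    // Sketch only; assumes boost::date_time is available.
    #include <cstdio>
    #include <iostream>
    #include <string>
    #include <boost/date_time/gregorian/gregorian.hpp>
    #include <boost/date_time/posix_time/posix_time.hpp>

    using namespace boost::posix_time;

    // Hypothetical helper: turn "hh:mm" into a ptime on today's date.
    static bool parseHHMM( const std::string& s , ptime* out ) {
        int h = -1 , m = -1;
        if ( std::sscanf( s.c_str() , "%d:%d" , &h , &m ) != 2 ) return false;
        if ( h < 0 || h > 23 || m < 0 || m > 59 ) return false;
        *out = ptime( boost::gregorian::day_clock::local_day() , hours( h ) + minutes( m ) );
        return true;
    }

    // Hypothetical helper: is 'now' inside [start, stop]? A stop earlier than
    // start means the window spans midnight, mirroring _inBalancingWindow().
    static bool inWindow( const ptime& now , const ptime& start , const ptime& stop ) {
        if ( stop > start ) return now >= start && now <= stop; // same-day window
        if ( start > stop ) return now >= start || now <= stop; // window spans midnight
        return false;                                           // start == stop: degenerate
    }

    int main() {
        ptime now = second_clock::local_time();
        ptime start , stop;
        if ( parseHHMM( "23:00" , &start ) && parseHHMM( "06:00" , &stop ) )
            std::cout << "in balancing window: " << inWindow( now , start , stop ) << std::endl;
        return 0;
    }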