diff options
Diffstat (limited to 'db/repl/rs.cpp')
-rw-r--r-- | db/repl/rs.cpp | 282 |
1 files changed, 193 insertions, 89 deletions
diff --git a/db/repl/rs.cpp b/db/repl/rs.cpp index 1c0444a..90ed9f4 100644 --- a/db/repl/rs.cpp +++ b/db/repl/rs.cpp @@ -20,9 +20,12 @@ #include "../client.h" #include "../../client/dbclient.h" #include "../dbhelpers.h" +#include "../../s/d_logic.h" #include "rs.h" +#include "connections.h" +#include "../repl.h" -namespace mongo { +namespace mongo { using namespace bson; @@ -30,18 +33,18 @@ namespace mongo { ReplSet *theReplSet = 0; extern string *discoveredSeed; - void ReplSetImpl::sethbmsg(string s, int logLevel) { + void ReplSetImpl::sethbmsg(string s, int logLevel) { static time_t lastLogged; _hbmsgTime = time(0); - if( s == _hbmsg ) { + if( s == _hbmsg ) { // unchanged if( _hbmsgTime - lastLogged < 60 ) return; } unsigned sz = s.size(); - if( sz >= 256 ) + if( sz >= 256 ) memcpy(_hbmsg, s.c_str(), 255); else { _hbmsg[sz] = 0; @@ -53,7 +56,7 @@ namespace mongo { } } - void ReplSetImpl::assumePrimary() { + void ReplSetImpl::assumePrimary() { assert( iAmPotentiallyHot() ); writelock lk("admin."); // so we are synchronized with _logOp() box.setSelfPrimary(_self); @@ -62,17 +65,26 @@ namespace mongo { void ReplSetImpl::changeState(MemberState s) { box.change(s, _self); } - void ReplSetImpl::relinquish() { + const bool closeOnRelinquish = true; + + void ReplSetImpl::relinquish() { if( box.getState().primary() ) { log() << "replSet relinquishing primary state" << rsLog; - changeState(MemberState::RS_RECOVERING); - - /* close sockets that were talking to us */ - /*log() << "replSet closing sockets after reqlinquishing primary" << rsLog; - MessagingPort::closeAllSockets(1);*/ + changeState(MemberState::RS_SECONDARY); + + if( closeOnRelinquish ) { + /* close sockets that were talking to us so they don't blithly send many writes that will fail + with "not master" (of course client could check result code, but in case they are not) + */ + log() << "replSet closing client sockets after reqlinquishing primary" << rsLog; + MessagingPort::closeAllSockets(1); + } + + // now that all connections were closed, strip this mongod from all sharding details + // if and when it gets promoted to a primary again, only then it should reload the sharding state + // the rationale here is that this mongod won't bring stale state when it regains primaryhood + shardingState.resetShardingState(); - // todo: > - //changeState(MemberState::RS_SECONDARY); } else if( box.getState().startup2() ) { // ? add comment @@ -81,26 +93,48 @@ namespace mongo { } /* look freshly for who is primary - includes relinquishing ourself. */ - void ReplSetImpl::forgetPrimary() { - if( box.getState().primary() ) + void ReplSetImpl::forgetPrimary() { + if( box.getState().primary() ) relinquish(); else { box.setOtherPrimary(0); } } - bool ReplSetImpl::_stepDown() { + // for the replSetStepDown command + bool ReplSetImpl::_stepDown(int secs) { lock lk(this); - if( box.getState().primary() ) { - changeState(MemberState::RS_RECOVERING); - elect.steppedDown = time(0) + 60; - log() << "replSet info stepped down as primary" << rsLog; + if( box.getState().primary() ) { + elect.steppedDown = time(0) + secs; + log() << "replSet info stepping down as primary secs=" << secs << rsLog; + relinquish(); return true; } return false; } - void ReplSetImpl::msgUpdateHBInfo(HeartbeatInfo h) { + bool ReplSetImpl::_freeze(int secs) { + lock lk(this); + /* note if we are primary we remain primary but won't try to elect ourself again until + this time period expires. + */ + if( secs == 0 ) { + elect.steppedDown = 0; + log() << "replSet info 'unfreezing'" << rsLog; + } + else { + if( !box.getState().primary() ) { + elect.steppedDown = time(0) + secs; + log() << "replSet info 'freezing' for " << secs << " seconds" << rsLog; + } + else { + log() << "replSet info received freeze command but we are primary" << rsLog; + } + } + return true; + } + + void ReplSetImpl::msgUpdateHBInfo(HeartbeatInfo h) { for( Member *m = _members.head(); m; m=m->next() ) { if( m->id() == h.id() ) { m->_hbinfo = h; @@ -109,7 +143,7 @@ namespace mongo { } } - list<HostAndPort> ReplSetImpl::memberHostnames() const { + list<HostAndPort> ReplSetImpl::memberHostnames() const { list<HostAndPort> L; L.push_back(_self->h()); for( Member *m = _members.head(); m; m = m->next() ) @@ -118,6 +152,7 @@ namespace mongo { } void ReplSetImpl::_fillIsMasterHost(const Member *m, vector<string>& hosts, vector<string>& passives, vector<string>& arbiters) { + assert( m ); if( m->config().hidden ) return; @@ -126,8 +161,9 @@ namespace mongo { } else if( !m->config().arbiterOnly ) { if( m->config().slaveDelay ) { - /* hmmm - we don't list these as they are stale. */ - } else { + /* hmmm - we don't list these as they are stale. */ + } + else { passives.push_back(m->h().toString()); } } @@ -147,6 +183,7 @@ namespace mongo { _fillIsMasterHost(_self, hosts, passives, arbiters); for( Member *m = _members.head(); m; m = m->next() ) { + assert( m ); _fillIsMasterHost(m, hosts, passives, arbiters); } @@ -161,23 +198,27 @@ namespace mongo { } } - if( !isp ) { + if( !isp ) { const Member *m = sp.primary; if( m ) b.append("primary", m->h().toString()); } if( myConfig().arbiterOnly ) b.append("arbiterOnly", true); + if( myConfig().priority == 0 ) + b.append("passive", true); if( myConfig().slaveDelay ) b.append("slaveDelay", myConfig().slaveDelay); if( myConfig().hidden ) b.append("hidden", true); + if( !myConfig().buildIndexes ) + b.append("buildIndexes", false); } /** @param cfgString <setname>/<seedhost1>,<seedhost2> */ - void parseReplsetCmdLine(string cfgString, string& setname, vector<HostAndPort>& seeds, set<HostAndPort>& seedSet ) { - const char *p = cfgString.c_str(); + void parseReplsetCmdLine(string cfgString, string& setname, vector<HostAndPort>& seeds, set<HostAndPort>& seedSet ) { + const char *p = cfgString.c_str(); const char *slash = strchr(p, '/'); if( slash ) setname = string(p, slash-p); @@ -207,7 +248,8 @@ namespace mongo { //uassert(13101, "can't use localhost in replset host list", !m.isLocalHost()); if( m.isSelf() ) { log(1) << "replSet ignoring seed " << m.toString() << " (=self)" << rsLog; - } else + } + else seeds.push_back(m); if( *comma == 0 ) break; @@ -216,10 +258,9 @@ namespace mongo { } } - ReplSetImpl::ReplSetImpl(ReplSetCmdline& replSetCmdline) : elect(this), - _self(0), - mgr( new Manager(this) ) - { + ReplSetImpl::ReplSetImpl(ReplSetCmdline& replSetCmdline) : elect(this), + _self(0), + mgr( new Manager(this) ) { _cfg = 0; memset(_hbmsg, 0, sizeof(_hbmsg)); *_hbmsg = '.'; // temp...just to see @@ -240,20 +281,21 @@ namespace mongo { } for( set<HostAndPort>::iterator i = replSetCmdline.seedSet.begin(); i != replSetCmdline.seedSet.end(); i++ ) { if( i->isSelf() ) { - if( sss == 1 ) + if( sss == 1 ) log(1) << "replSet warning self is listed in the seed list and there are no other seeds listed did you intend that?" << rsLog; - } else + } + else log() << "replSet warning command line seed " << i->toString() << " is not present in the current repl set config" << rsLog; } } void newReplUp(); - void ReplSetImpl::loadLastOpTimeWritten() { + void ReplSetImpl::loadLastOpTimeWritten() { //assert( lastOpTimeWritten.isNull() ); readlock lk(rsoplog); BSONObj o; - if( Helpers::getLast(rsoplog, o) ) { + if( Helpers::getLast(rsoplog, o) ) { lastH = o["h"].numberLong(); lastOpTimeWritten = o["ts"]._opTime(); uassert(13290, "bad replSet oplog entry?", !lastOpTimeWritten.isNull()); @@ -261,11 +303,11 @@ namespace mongo { } /* call after constructing to start - returns fairly quickly after launching its threads */ - void ReplSetImpl::_go() { - try { + void ReplSetImpl::_go() { + try { loadLastOpTimeWritten(); } - catch(std::exception& e) { + catch(std::exception& e) { log() << "replSet error fatal couldn't query the local " << rsoplog << " collection. Terminating mongod after 30 seconds." << rsLog; log() << e.what() << rsLog; sleepsecs(30); @@ -283,11 +325,17 @@ namespace mongo { extern BSONObj *getLastErrorDefault; + void ReplSetImpl::setSelfTo(Member *m) { + _self = m; + if( m ) _buildIndexes = m->config().buildIndexes; + else _buildIndexes = true; + } + /** @param reconf true if this is a reconfiguration and not an initial load of the configuration. @return true if ok; throws if config really bad; false if config doesn't include self */ bool ReplSetImpl::initFromConfig(ReplSetConfig& c, bool reconf) { - /* NOTE: haveNewConfig() writes the new config to disk before we get here. So + /* NOTE: haveNewConfig() writes the new config to disk before we get here. So we cannot error out at this point, except fatally. Check errors earlier. */ lock lk(this); @@ -302,25 +350,24 @@ namespace mongo { { unsigned nfound = 0; int me = 0; - for( vector<ReplSetConfig::MemberCfg>::iterator i = c.members.begin(); i != c.members.end(); i++ ) { + for( vector<ReplSetConfig::MemberCfg>::iterator i = c.members.begin(); i != c.members.end(); i++ ) { const ReplSetConfig::MemberCfg& m = *i; if( m.h.isSelf() ) { nfound++; me++; - if( !reconf || (_self && _self->id() == (unsigned) m._id) ) ; - else { + else { log() << "replSet " << _self->id() << ' ' << m._id << rsLog; assert(false); } } - else if( reconf ) { + else if( reconf ) { const Member *old = findById(m._id); - if( old ) { + if( old ) { nfound++; assert( (int) old->id() == m._id ); - if( old->config() == m ) { + if( old->config() == m ) { additive = false; } } @@ -328,16 +375,24 @@ namespace mongo { newOnes.push_back(&m); } } + + // change timeout settings, if necessary + ScopedConn conn(m.h.toString()); + conn.setTimeout(c.ho.heartbeatTimeoutMillis/1000.0); } if( me == 0 ) { + // initial startup with fastsync + if (!reconf && replSettings.fastsync) { + return false; + } // log() << "replSet config : " << _cfg->toString() << rsLog; - log() << "replSet error can't find self in the repl set configuration:" << rsLog; + log() << "replSet error self not present in the repl set configuration:" << rsLog; log() << c.toString() << rsLog; - assert(false); + uasserted(13497, "replSet error self not present in the configuration"); } uassert( 13302, "replSet error self appears twice in the repl set configuration", me<=1 ); - if( reconf && config().members.size() != nfound ) + if( reconf && config().members.size() != nfound ) additive = false; } @@ -347,14 +402,14 @@ namespace mongo { _name = _cfg->_id; assert( !_name.empty() ); - if( additive ) { + if( additive ) { log() << "replSet info : additive change to configuration" << rsLog; for( list<const ReplSetConfig::MemberCfg*>::const_iterator i = newOnes.begin(); i != newOnes.end(); i++ ) { const ReplSetConfig::MemberCfg* m = *i; Member *mi = new Member(m->h, m->_id, m, false); - /** we will indicate that new members are up() initially so that we don't relinquish our - primary state because we can't (transiently) see a majority. they should be up as we + /** we will indicate that new members are up() initially so that we don't relinquish our + primary state because we can't (transiently) see a majority. they should be up as we check that new members are up before getting here on reconfig anyway. */ mi->get_hbinfo().health = 0.1; @@ -373,20 +428,30 @@ namespace mongo { int oldPrimaryId = -1; { const Member *p = box.getPrimary(); - if( p ) + if( p ) oldPrimaryId = p->id(); } forgetPrimary(); - _self = 0; - for( vector<ReplSetConfig::MemberCfg>::iterator i = _cfg->members.begin(); i != _cfg->members.end(); i++ ) { + + bool iWasArbiterOnly = _self ? iAmArbiterOnly() : false; + setSelfTo(0); + for( vector<ReplSetConfig::MemberCfg>::iterator i = _cfg->members.begin(); i != _cfg->members.end(); i++ ) { const ReplSetConfig::MemberCfg& m = *i; Member *mi; if( m.h.isSelf() ) { assert( _self == 0 ); - mi = _self = new Member(m.h, m._id, &m, true); + mi = new Member(m.h, m._id, &m, true); + setSelfTo(mi); + + // if the arbiter status changed + if (iWasArbiterOnly ^ iAmArbiterOnly()) { + _changeArbiterState(); + } + if( (int)mi->id() == oldPrimaryId ) box.setSelfPrimary(mi); - } else { + } + else { mi = new Member(m.h, m._id, &m, false); _members.push(mi); startHealthTaskFor(mi); @@ -397,26 +462,57 @@ namespace mongo { return true; } + void startSyncThread(); + + void ReplSetImpl::_changeArbiterState() { + if (iAmArbiterOnly()) { + changeState(MemberState::RS_ARBITER); + + // if there is an oplog, free it + // not sure if this is necessary, maybe just leave the oplog and let + // the user delete it if they want the space? + writelock lk(rsoplog); + Client::Context c(rsoplog); + NamespaceDetails *d = nsdetails(rsoplog); + if (d) { + string errmsg; + bob res; + dropCollection(rsoplog, errmsg, res); + + // clear last op time to force initial sync (if the arbiter + // becomes a "normal" server again) + lastOpTimeWritten = OpTime(); + } + } + else { + changeState(MemberState::RS_RECOVERING); + + // oplog will be allocated when sync begins + /* TODO : could this cause two sync threads to exist (race condition)? */ + boost::thread t(startSyncThread); + } + } + // Our own config must be the first one. - bool ReplSetImpl::_loadConfigFinish(vector<ReplSetConfig>& cfgs) { + bool ReplSetImpl::_loadConfigFinish(vector<ReplSetConfig>& cfgs) { int v = -1; ReplSetConfig *highest = 0; int myVersion = -2000; int n = 0; - for( vector<ReplSetConfig>::iterator i = cfgs.begin(); i != cfgs.end(); i++ ) { + for( vector<ReplSetConfig>::iterator i = cfgs.begin(); i != cfgs.end(); i++ ) { ReplSetConfig& cfg = *i; if( ++n == 1 ) myVersion = cfg.version; - if( cfg.ok() && cfg.version > v ) { + if( cfg.ok() && cfg.version > v ) { highest = &cfg; v = cfg.version; } } assert( highest ); - if( !initFromConfig(*highest) ) + if( !initFromConfig(*highest) ) return false; - if( highest->version > myVersion && highest->version >= 0 ) { + if( highest->version > myVersion && highest->version >= 0 ) { log() << "replSet got config version " << highest->version << " from a remote, saving locally" << rsLog; writelock lk("admin."); highest->saveConfigLocally(BSONObj()); @@ -430,7 +526,7 @@ namespace mongo { startupStatusMsg = "loading " + rsConfigNs + " config (LOADINGCONFIG)"; try { vector<ReplSetConfig> configs; - try { + try { configs.push_back( ReplSetConfig(HostAndPort::me()) ); } catch(DBException& e) { @@ -438,26 +534,26 @@ namespace mongo { throw; } for( vector<HostAndPort>::const_iterator i = _seeds->begin(); i != _seeds->end(); i++ ) { - try { + try { configs.push_back( ReplSetConfig(*i) ); } - catch( DBException& e ) { + catch( DBException& e ) { log() << "replSet exception trying to load config from " << *i << " : " << e.toString() << rsLog; } } - if( discoveredSeed ) { + if( discoveredSeed ) { try { configs.push_back( ReplSetConfig(HostAndPort(*discoveredSeed)) ); } - catch( DBException& ) { + catch( DBException& ) { log(1) << "replSet exception trying to load config from discovered seed " << *discoveredSeed << rsLog; } } int nok = 0; int nempty = 0; - for( vector<ReplSetConfig>::iterator i = configs.begin(); i != configs.end(); i++ ) { + for( vector<ReplSetConfig>::iterator i = configs.begin(); i != configs.end(); i++ ) { if( i->ok() ) nok++; if( i->empty() ) @@ -469,7 +565,9 @@ namespace mongo { startupStatus = EMPTYCONFIG; startupStatusMsg = "can't get " + rsConfigNs + " config from self or any seed (EMPTYCONFIG)"; log() << "replSet can't get " << rsConfigNs << " config from self or any seed (EMPTYCONFIG)" << rsLog; - log(1) << "replSet have you ran replSetInitiate yet?" << rsLog; + static unsigned once; + if( ++once == 1 ) + log() << "replSet info you may need to run replSetInitiate -- rs.initiate() in the shell -- if that is not already done" << rsLog; if( _seeds->size() == 0 ) log(1) << "replSet info no seed hosts were specified on the --replSet command line" << rsLog; } @@ -483,13 +581,13 @@ namespace mongo { continue; } - if( !_loadConfigFinish(configs) ) { + if( !_loadConfigFinish(configs) ) { log() << "replSet info Couldn't load config yet. Sleeping 20sec and will try again." << rsLog; sleepsecs(20); continue; } } - catch(DBException& e) { + catch(DBException& e) { startupStatus = BADCONFIG; startupStatusMsg = "replSet error loading set config (BADCONFIG)"; log() << "replSet error loading configurations " << e.toString() << rsLog; @@ -504,30 +602,34 @@ namespace mongo { startupStatus = STARTED; } - void ReplSetImpl::_fatal() - { + void ReplSetImpl::_fatal() { //lock l(this); box.set(MemberState::RS_FATAL, 0); //sethbmsg("fatal error"); - log() << "replSet error fatal, stopping replication" << rsLog; + log() << "replSet error fatal, stopping replication" << rsLog; } - void ReplSet::haveNewConfig(ReplSetConfig& newConfig, bool addComment) { + void ReplSet::haveNewConfig(ReplSetConfig& newConfig, bool addComment) { lock l(this); // convention is to lock replset before taking the db rwlock writelock lk(""); bo comment; if( addComment ) comment = BSON( "msg" << "Reconfig set" << "version" << newConfig.version ); newConfig.saveConfigLocally(comment); - try { + try { initFromConfig(newConfig, true); log() << "replSet replSetReconfig new config saved locally" << rsLog; } - catch(DBException& e) { + catch(DBException& e) { + if( e.getCode() == 13497 /* removed from set */ ) { + cc().shutdown(); + dbexit( EXIT_CLEAN , "removed from replica set" ); // never returns + assert(0); + } log() << "replSet error unexpected exception in haveNewConfig() : " << e.toString() << rsLog; _fatal(); } - catch(...) { + catch(...) { log() << "replSet error unexpected exception in haveNewConfig()" << rsLog; _fatal(); } @@ -538,30 +640,33 @@ namespace mongo { ReplSetConfig c(o); if( c.version > rs->config().version ) theReplSet->haveNewConfig(c, false); - else { - log() << "replSet info msgReceivedNewConfig but version isn't higher " << - c.version << ' ' << rs->config().version << rsLog; + else { + log() << "replSet info msgReceivedNewConfig but version isn't higher " << + c.version << ' ' << rs->config().version << rsLog; } } - /* forked as a thread during startup - it can run quite a while looking for config. but once found, + /* forked as a thread during startup + it can run quite a while looking for config. but once found, a separate thread takes over as ReplSetImpl::Manager, and this thread terminates. */ void startReplSets(ReplSetCmdline *replSetCmdline) { Client::initThread("startReplSets"); - try { + try { assert( theReplSet == 0 ); if( replSetCmdline == 0 ) { assert(!replSet); return; } + if( !noauth ) { + cc().getAuthenticationInfo()->authorize("local"); + } (theReplSet = new ReplSet(*replSetCmdline))->go(); } - catch(std::exception& e) { + catch(std::exception& e) { log() << "replSet caught exception in startReplSets thread: " << e.what() << rsLog; - if( theReplSet ) + if( theReplSet ) theReplSet->fatal(); } cc().shutdown(); @@ -569,10 +674,9 @@ namespace mongo { } -namespace boost { +namespace boost { - void assertion_failed(char const * expr, char const * function, char const * file, long line) - { + void assertion_failed(char const * expr, char const * function, char const * file, long line) { mongo::log() << "boost assertion failure " << expr << ' ' << function << ' ' << file << ' ' << line << endl; } |