diff options
Diffstat (limited to 'db/repl')
-rw-r--r-- | db/repl/consensus.cpp | 12 | ||||
-rw-r--r-- | db/repl/health.cpp | 34 | ||||
-rw-r--r-- | db/repl/health.h | 2 | ||||
-rw-r--r-- | db/repl/heartbeat.cpp | 27 | ||||
-rw-r--r-- | db/repl/manager.cpp | 30 | ||||
-rw-r--r-- | db/repl/replset_commands.cpp | 18 | ||||
-rw-r--r-- | db/repl/rs.cpp | 138 | ||||
-rw-r--r-- | db/repl/rs.h | 74 | ||||
-rw-r--r-- | db/repl/rs_config.cpp | 57 | ||||
-rw-r--r-- | db/repl/rs_config.h | 13 | ||||
-rw-r--r-- | db/repl/rs_initialsync.cpp | 2 | ||||
-rw-r--r-- | db/repl/rs_member.h | 19 | ||||
-rw-r--r-- | db/repl/rs_rollback.cpp | 291 | ||||
-rw-r--r-- | db/repl/rs_sync.cpp | 122 |
14 files changed, 656 insertions, 183 deletions
diff --git a/db/repl/consensus.cpp b/db/repl/consensus.cpp index 4eba17d..4044538 100644 --- a/db/repl/consensus.cpp +++ b/db/repl/consensus.cpp @@ -83,6 +83,17 @@ namespace mongo { return vUp * 2 > totalVotes(); } + bool Consensus::shouldRelinquish() const { + int vUp = rs._self->config().votes; + const long long T = rs.config().ho.heartbeatTimeoutMillis * rs.config().ho.heartbeatConnRetries; + for( Member *m = rs.head(); m; m=m->next() ) { + long long dt = m->hbinfo().timeDown(); + if( dt < T ) + vUp += m->config().votes; + } + return !( vUp * 2 > totalVotes() ); + } + static const int VETO = -10000; const time_t LeaseTime = 30; @@ -322,6 +333,7 @@ namespace mongo { void Consensus::electSelf() { assert( !rs.lockedByMe() ); assert( !rs.myConfig().arbiterOnly ); + assert( rs.myConfig().slaveDelay == 0 ); try { _electSelf(); } diff --git a/db/repl/health.cpp b/db/repl/health.cpp index b0be25f..72396fe 100644 --- a/db/repl/health.cpp +++ b/db/repl/health.cpp @@ -89,11 +89,13 @@ namespace mongo { } s << td(config().votes); { - string stateText = ReplSet::stateAsStr(state()); + string stateText = state().toString(); + if( _config.hidden ) + stateText += " (hidden)"; if( ok || stateText.empty() ) s << td(stateText); // text blank if we've never connected else - s << td( grey(str::stream() << "(was " << ReplSet::stateAsStr(state()) << ')', true) ); + s << td( grey(str::stream() << "(was " << state().toString() << ')', true) ); } s << td( grey(hbinfo().lastHeartbeatMsg,!ok) ); stringstream q; @@ -105,28 +107,30 @@ namespace mongo { s << td(""); s << _tr(); } - + string ReplSetImpl::stateAsHtml(MemberState s) { if( s.s == MemberState::RS_STARTUP ) return a("", "serving still starting up, or still trying to initiate the set", "STARTUP"); if( s.s == MemberState::RS_PRIMARY ) return a("", "this server thinks it is primary", "PRIMARY"); if( s.s == MemberState::RS_SECONDARY ) return a("", "this server thinks it is a secondary (slave mode)", "SECONDARY"); if( s.s == MemberState::RS_RECOVERING ) return a("", "recovering/resyncing; after recovery usually auto-transitions to secondary", "RECOVERING"); - if( s.s == MemberState::RS_FATAL ) return a("", "something bad has occurred and server is not completely offline with regard to the replica set. fatal error.", "RS_FATAL"); - if( s.s == MemberState::RS_STARTUP2 ) return a("", "loaded config, still determining who is primary", "RS_STARTUP2"); + if( s.s == MemberState::RS_FATAL ) return a("", "something bad has occurred and server is not completely offline with regard to the replica set. fatal error.", "FATAL"); + if( s.s == MemberState::RS_STARTUP2 ) return a("", "loaded config, still determining who is primary", "STARTUP2"); if( s.s == MemberState::RS_ARBITER ) return a("", "this server is an arbiter only", "ARBITER"); if( s.s == MemberState::RS_DOWN ) return a("", "member is down, slow, or unreachable", "DOWN"); + if( s.s == MemberState::RS_ROLLBACK ) return a("", "rolling back operations to get in sync", "ROLLBACK"); return ""; } - string ReplSetImpl::stateAsStr(MemberState s) { - if( s.s == MemberState::RS_STARTUP ) return "STARTUP"; - if( s.s == MemberState::RS_PRIMARY ) return "PRIMARY"; - if( s.s == MemberState::RS_SECONDARY ) return "SECONDARY"; - if( s.s == MemberState::RS_RECOVERING ) return "RECOVERING"; - if( s.s == MemberState::RS_FATAL ) return "FATAL"; - if( s.s == MemberState::RS_STARTUP2 ) return "STARTUP2"; - if( s.s == MemberState::RS_ARBITER ) return "ARBITER"; - if( s.s == MemberState::RS_DOWN ) return "DOWN"; + string MemberState::toString() const { + if( s == MemberState::RS_STARTUP ) return "STARTUP"; + if( s == MemberState::RS_PRIMARY ) return "PRIMARY"; + if( s == MemberState::RS_SECONDARY ) return "SECONDARY"; + if( s == MemberState::RS_RECOVERING ) return "RECOVERING"; + if( s == MemberState::RS_FATAL ) return "FATAL"; + if( s == MemberState::RS_STARTUP2 ) return "STARTUP2"; + if( s == MemberState::RS_ARBITER ) return "ARBITER"; + if( s == MemberState::RS_DOWN ) return "DOWN"; + if( s == MemberState::RS_ROLLBACK ) return "ROLLBACK"; return ""; } @@ -302,7 +306,7 @@ namespace mongo { td(ago(started)) << td("") << // last heartbeat td(ToString(_self->config().votes)) << - td(stateAsHtml(box.getState())); + td( stateAsHtml(box.getState()) + (_self->config().hidden?" (hidden)":"") ); s << td( _hbmsg ); stringstream q; q << "/_replSetOplog?" << _self->id(); diff --git a/db/repl/health.h b/db/repl/health.h index 8b1005e..645a3b5 100644 --- a/db/repl/health.h +++ b/db/repl/health.h @@ -27,7 +27,7 @@ namespace mongo { HealthOptions() { heartbeatSleepMillis = 2000; heartbeatTimeoutMillis = 10000; - heartbeatConnRetries = 3; + heartbeatConnRetries = 2; } bool isDefault() const { return *this == HealthOptions(); } diff --git a/db/repl/heartbeat.cpp b/db/repl/heartbeat.cpp index 78ce5d1..4f28897 100644 --- a/db/repl/heartbeat.cpp +++ b/db/repl/heartbeat.cpp @@ -40,6 +40,13 @@ namespace mongo { // hacky string *discoveredSeed = 0; + long long HeartbeatInfo::timeDown() const { + if( up() ) return 0; + if( downSince == 0 ) + return 0; // still waiting on first heartbeat + return jsTime() - downSince; + } + /* { replSetHeartbeat : <setname> } */ class CmdReplSetHeartbeat : public ReplSetCommand { public: @@ -55,6 +62,14 @@ namespace mongo { errmsg = "not running with --replSet"; return false; } + + /* we want to keep heartbeat connections open when relinquishing primary. tag them here. */ + { + MessagingPort *mp = cc()._mp; + if( mp ) + mp->tag |= 1; + } + if( cmdObj["pv"].Int() != 1 ) { errmsg = "incompatible replset protocol version"; return false; @@ -91,7 +106,7 @@ namespace mongo { result.append("set", theReplSet->name()); result.append("state", theReplSet->state().s); result.append("hbmsg", theReplSet->hbmsg()); - result.append("time", (int) time(0)); + result.append("time", (long long) time(0)); result.appendDate("opTime", theReplSet->lastOpTimeWritten.asDate()); int v = theReplSet->config().version; result.append("v", v); @@ -196,7 +211,12 @@ namespace mongo { static time_t last = 0; time_t now = time(0); - if( mem.changed(old) || now-last>4 ) { + bool changed = mem.changed(old); + if( changed ) { + if( old.hbstate != mem.hbstate ) + log() << "replSet " << h.toString() << ' ' << mem.hbstate.toString() << rsLog; + } + if( changed || now-last>4 ) { last = now; theReplSet->mgr->send( boost::bind(&Manager::msgCheckNewState, theReplSet->mgr) ); } @@ -205,8 +225,9 @@ namespace mongo { private: void down(HeartbeatInfo& mem, string msg) { mem.health = 0.0; - if( mem.upSince ) { + if( mem.upSince || mem.downSince == 0 ) { mem.upSince = 0; + mem.downSince = jsTime(); log() << "replSet info " << h.toString() << " is now down (or slow to respond)" << rsLog; } mem.lastHeartbeatMsg = msg; diff --git a/db/repl/manager.cpp b/db/repl/manager.cpp index e870688..862ac46 100644 --- a/db/repl/manager.cpp +++ b/db/repl/manager.cpp @@ -29,12 +29,17 @@ namespace mongo { }; /* check members OTHER THAN US to see if they think they are primary */ - const Member * Manager::findOtherPrimary() { + const Member * Manager::findOtherPrimary(bool& two) { + two = false; Member *m = rs->head(); Member *p = 0; while( m ) { + DEV assert( m != rs->_self ); if( m->state().primary() && m->hbinfo().up() ) { - if( p ) throw "twomasters"; // our polling is asynchronous, so this is often ok. + if( p ) { + two = true; + return 0; + } p = m; } m = m->next(); @@ -63,7 +68,11 @@ namespace mongo { if( rs->box.getPrimary() == m ) return; rs->_self->lhb() = ""; - rs->box.set(rs->iAmArbiterOnly() ? MemberState::RS_ARBITER : MemberState::RS_RECOVERING, m); + if( rs->iAmArbiterOnly() ) { + rs->box.set(MemberState::RS_ARBITER, m); + } else { + rs->box.noteRemoteIsPrimary(m); + } } /** called as the health threads get new results */ @@ -87,11 +96,14 @@ namespace mongo { } const Member *p2; - try { p2 = findOtherPrimary(); } - catch(string s) { - /* two other nodes think they are primary (asynchronously polled) -- wait for things to settle down. */ - log() << "replSet warning DIAG 2 primary" << s << rsLog; - return; + { + bool two; + p2 = findOtherPrimary(two); + if( two ) { + /* two other nodes think they are primary (asynchronously polled) -- wait for things to settle down. */ + log() << "replSet warning DIAG two primaries (transiently)" << rsLog; + return; + } } if( p2 ) { @@ -136,7 +148,7 @@ namespace mongo { return; } - if( !rs->elect.aMajoritySeemsToBeUp() ) { + if( rs->elect.shouldRelinquish() ) { log() << "replSet can't see a majority of the set, relinquishing primary" << rsLog; rs->relinquish(); } diff --git a/db/repl/replset_commands.cpp b/db/repl/replset_commands.cpp index f8f46d5..328b0ab 100644 --- a/db/repl/replset_commands.cpp +++ b/db/repl/replset_commands.cpp @@ -34,19 +34,27 @@ namespace mongo { */ bool replSetBlind = false; + unsigned replSetForceInitialSyncFailure = 0; class CmdReplSetTest : public ReplSetCommand { public: virtual void help( stringstream &help ) const { - help << "Just for testing : do not use.\n"; + help << "Just for regression tests.\n"; } CmdReplSetTest() : ReplSetCommand("replSetTest") { } virtual bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) { + log() << "replSet replSetTest command received: " << cmdObj.toString() << rsLog; + if( cmdObj.hasElement("forceInitialSyncFailure") ) { + replSetForceInitialSyncFailure = (unsigned) cmdObj["forceInitialSyncFailure"].Number(); + return true; + } + + // may not need this, but if removed check all tests still work: if( !check(errmsg, result) ) return false; + if( cmdObj.hasElement("blind") ) { replSetBlind = cmdObj.getBoolField("blind"); - log() << "replSet info replSetTest command received, replSetBlind=" << replSetBlind << rsLog; return true; } return false; @@ -55,6 +63,7 @@ namespace mongo { class CmdReplSetGetRBID : public ReplSetCommand { public: + /* todo: ideally this should only change on rollbacks NOT on mongod restarts also. fix... */ int rbid; virtual void help( stringstream &help ) const { help << "internal"; @@ -65,12 +74,15 @@ namespace mongo { virtual bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) { if( !check(errmsg, result) ) return false; - result.append("rbid",rbid); + result.append("rbid",rbid); return true; } } cmdReplSetRBID; using namespace bson; + void incRBID() { + cmdReplSetRBID.rbid++; + } int getRBID(DBClientConnection *c) { bo info; c->simpleCommand("admin", &info, "replSetGetRBID"); diff --git a/db/repl/rs.cpp b/db/repl/rs.cpp index a6737be..1c0444a 100644 --- a/db/repl/rs.cpp +++ b/db/repl/rs.cpp @@ -31,39 +31,48 @@ namespace mongo { extern string *discoveredSeed; void ReplSetImpl::sethbmsg(string s, int logLevel) { - static time_t lastLogged; - if( s == _hbmsg ) { - // unchanged - if( time(0)-lastLogged < 60 ) - return; - } - - unsigned sz = s.size(); - if( sz >= 256 ) - memcpy(_hbmsg, s.c_str(), 255); - else { - _hbmsg[sz] = 0; - memcpy(_hbmsg, s.c_str(), sz); - } - if( !s.empty() ) { - lastLogged = time(0); - log(logLevel) << "replSet " << s << rsLog; - } + static time_t lastLogged; + _hbmsgTime = time(0); + + if( s == _hbmsg ) { + // unchanged + if( _hbmsgTime - lastLogged < 60 ) + return; + } + + unsigned sz = s.size(); + if( sz >= 256 ) + memcpy(_hbmsg, s.c_str(), 255); + else { + _hbmsg[sz] = 0; + memcpy(_hbmsg, s.c_str(), sz); + } + if( !s.empty() ) { + lastLogged = _hbmsgTime; + log(logLevel) << "replSet " << s << rsLog; + } } void ReplSetImpl::assumePrimary() { assert( iAmPotentiallyHot() ); writelock lk("admin."); // so we are synchronized with _logOp() box.setSelfPrimary(_self); - log() << "replSet PRIMARY" << rsLog; // self (" << _self->id() << ") is now primary" << rsLog; + //log() << "replSet PRIMARY" << rsLog; // self (" << _self->id() << ") is now primary" << rsLog; } void ReplSetImpl::changeState(MemberState s) { box.change(s, _self); } void ReplSetImpl::relinquish() { if( box.getState().primary() ) { + log() << "replSet relinquishing primary state" << rsLog; changeState(MemberState::RS_RECOVERING); - log() << "replSet info relinquished primary state" << rsLog; + + /* close sockets that were talking to us */ + /*log() << "replSet closing sockets after reqlinquishing primary" << rsLog; + MessagingPort::closeAllSockets(1);*/ + + // todo: > + //changeState(MemberState::RS_SECONDARY); } else if( box.getState().startup2() ) { // ? add comment @@ -109,11 +118,18 @@ namespace mongo { } void ReplSetImpl::_fillIsMasterHost(const Member *m, vector<string>& hosts, vector<string>& passives, vector<string>& arbiters) { + if( m->config().hidden ) + return; + if( m->potentiallyHot() ) { hosts.push_back(m->h().toString()); } else if( !m->config().arbiterOnly ) { - passives.push_back(m->h().toString()); + if( m->config().slaveDelay ) { + /* hmmm - we don't list these as they are stale. */ + } else { + passives.push_back(m->h().toString()); + } } else { arbiters.push_back(m->h().toString()); @@ -152,6 +168,10 @@ namespace mongo { } if( myConfig().arbiterOnly ) b.append("arbiterOnly", true); + if( myConfig().slaveDelay ) + b.append("slaveDelay", myConfig().slaveDelay); + if( myConfig().hidden ) + b.append("hidden", true); } /** @param cfgString <setname>/<seedhost1>,<seedhost2> */ @@ -200,6 +220,7 @@ namespace mongo { _self(0), mgr( new Manager(this) ) { + _cfg = 0; memset(_hbmsg, 0, sizeof(_hbmsg)); *_hbmsg = '.'; // temp...just to see lastH = 0; @@ -245,7 +266,7 @@ namespace mongo { loadLastOpTimeWritten(); } catch(std::exception& e) { - log() << "replSet ERROR FATAL couldn't query the local " << rsoplog << " collection. Terminating mongod after 30 seconds." << rsLog; + log() << "replSet error fatal couldn't query the local " << rsoplog << " collection. Terminating mongod after 30 seconds." << rsLog; log() << e.what() << rsLog; sleepsecs(30); dbexit( EXIT_REPLICATION_ERROR ); @@ -260,25 +281,64 @@ namespace mongo { ReplSetImpl::StartupStatus ReplSetImpl::startupStatus = PRESTART; string ReplSetImpl::startupStatusMsg; - // true if ok; throws if config really bad; false if config doesn't include self - bool ReplSetImpl::initFromConfig(ReplSetConfig& c) { + extern BSONObj *getLastErrorDefault; + + /** @param reconf true if this is a reconfiguration and not an initial load of the configuration. + @return true if ok; throws if config really bad; false if config doesn't include self + */ + bool ReplSetImpl::initFromConfig(ReplSetConfig& c, bool reconf) { + /* NOTE: haveNewConfig() writes the new config to disk before we get here. So + we cannot error out at this point, except fatally. Check errors earlier. + */ lock lk(this); + if( getLastErrorDefault || !c.getLastErrorDefaults.isEmpty() ) { + // see comment in dbcommands.cpp for getlasterrordefault + getLastErrorDefault = new BSONObj( c.getLastErrorDefaults ); + } + + list<const ReplSetConfig::MemberCfg*> newOnes; + bool additive = reconf; { + unsigned nfound = 0; int me = 0; for( vector<ReplSetConfig::MemberCfg>::iterator i = c.members.begin(); i != c.members.end(); i++ ) { const ReplSetConfig::MemberCfg& m = *i; if( m.h.isSelf() ) { + nfound++; me++; + + if( !reconf || (_self && _self->id() == (unsigned) m._id) ) + ; + else { + log() << "replSet " << _self->id() << ' ' << m._id << rsLog; + assert(false); + } + } + else if( reconf ) { + const Member *old = findById(m._id); + if( old ) { + nfound++; + assert( (int) old->id() == m._id ); + if( old->config() == m ) { + additive = false; + } + } + else { + newOnes.push_back(&m); + } } } if( me == 0 ) { // log() << "replSet config : " << _cfg->toString() << rsLog; - log() << "replSet warning can't find self in the repl set configuration:" << rsLog; + log() << "replSet error can't find self in the repl set configuration:" << rsLog; log() << c.toString() << rsLog; - return false; + assert(false); } uassert( 13302, "replSet error self appears twice in the repl set configuration", me<=1 ); + + if( reconf && config().members.size() != nfound ) + additive = false; } _cfg = new ReplSetConfig(c); @@ -287,6 +347,24 @@ namespace mongo { _name = _cfg->_id; assert( !_name.empty() ); + if( additive ) { + log() << "replSet info : additive change to configuration" << rsLog; + for( list<const ReplSetConfig::MemberCfg*>::const_iterator i = newOnes.begin(); i != newOnes.end(); i++ ) { + const ReplSetConfig::MemberCfg* m = *i; + Member *mi = new Member(m->h, m->_id, m, false); + + /** we will indicate that new members are up() initially so that we don't relinquish our + primary state because we can't (transiently) see a majority. they should be up as we + check that new members are up before getting here on reconfig anyway. + */ + mi->get_hbinfo().health = 0.1; + + _members.push(mi); + startHealthTaskFor(mi); + } + return true; + } + // start with no members. if this is a reconfig, drop the old ones. _members.orphanAll(); @@ -416,6 +494,7 @@ namespace mongo { startupStatusMsg = "replSet error loading set config (BADCONFIG)"; log() << "replSet error loading configurations " << e.toString() << rsLog; log() << "replSet error replication will not start" << rsLog; + sethbmsg("error loading set config"); _fatal(); throw; } @@ -429,11 +508,10 @@ namespace mongo { { //lock l(this); box.set(MemberState::RS_FATAL, 0); - sethbmsg("fatal error"); - log() << "replSet error fatal error, stopping replication" << rsLog; + //sethbmsg("fatal error"); + log() << "replSet error fatal, stopping replication" << rsLog; } - void ReplSet::haveNewConfig(ReplSetConfig& newConfig, bool addComment) { lock l(this); // convention is to lock replset before taking the db rwlock writelock lk(""); @@ -442,7 +520,7 @@ namespace mongo { comment = BSON( "msg" << "Reconfig set" << "version" << newConfig.version ); newConfig.saveConfigLocally(comment); try { - initFromConfig(newConfig); + initFromConfig(newConfig, true); log() << "replSet replSetReconfig new config saved locally" << rsLog; } catch(DBException& e) { diff --git a/db/repl/rs.h b/db/repl/rs.h index 17a070c..6c4d9a8 100644 --- a/db/repl/rs.h +++ b/db/repl/rs.h @@ -44,19 +44,19 @@ namespace mongo { public: Member(HostAndPort h, unsigned ord, const ReplSetConfig::MemberCfg *c, bool self); string fullName() const { return h().toString(); } - const ReplSetConfig::MemberCfg& config() const { return *_config; } + const ReplSetConfig::MemberCfg& config() const { return _config; } const HeartbeatInfo& hbinfo() const { return _hbinfo; } - string lhb() { return _hbinfo.lastHeartbeatMsg; } + HeartbeatInfo& get_hbinfo() { return _hbinfo; } + string lhb() const { return _hbinfo.lastHeartbeatMsg; } MemberState state() const { return _hbinfo.hbstate; } const HostAndPort& h() const { return _h; } unsigned id() const { return _hbinfo.id(); } - bool potentiallyHot() const { return _config->potentiallyHot(); } // not arbiter, not priority 0 - + bool potentiallyHot() const { return _config.potentiallyHot(); } // not arbiter, not priority 0 void summarizeMember(stringstream& s) const; friend class ReplSetImpl; private: - const ReplSetConfig::MemberCfg *_config; /* todo: when this changes??? */ - HostAndPort _h; + const ReplSetConfig::MemberCfg _config; + const HostAndPort _h; HeartbeatInfo _hbinfo; }; @@ -64,7 +64,13 @@ namespace mongo { ReplSetImpl *rs; bool busyWithElectSelf; int _primary; - const Member* findOtherPrimary(); + + /** @param two - if true two primaries were seen. this can happen transiently, in addition to our + polling being only occasional. in this case null is returned, but the caller should + not assume primary itself in that situation. + */ + const Member* findOtherPrimary(bool& two); + void noteARemoteIsPrimary(const Member *); virtual void starting(); public: @@ -102,6 +108,7 @@ namespace mongo { int totalVotes() const; bool aMajoritySeemsToBeUp() const; + bool shouldRelinquish() const; void electSelf(); void electCmdReceived(BSONObj, BSONObjBuilder*); void multiCommand(BSONObj cmd, list<Target>& L); @@ -119,8 +126,8 @@ namespace mongo { protected: RSBase() : magic(0x12345677), m("RSBase"), _locked(0) { } ~RSBase() { - log() << "~RSBase should never be called?" << rsLog; - assert(false); + /* this can happen if we throw in the constructor; otherwise never happens. thus we log it as it is quite unusual. */ + log() << "replSet ~RSBase called" << rsLog; } class lock { @@ -175,6 +182,9 @@ namespace mongo { const Member* getPrimary() const { return sp.primary; } void change(MemberState s, const Member *self) { scoped_lock lk(m); + if( sp.state != s ) { + log() << "replSet " << s.toString() << rsLog; + } sp.state = s; if( s.primary() ) { sp.primary = self; @@ -194,6 +204,12 @@ namespace mongo { assert( !sp.state.primary() ); sp.primary = mem; } + void noteRemoteIsPrimary(const Member *remote) { + scoped_lock lk(m); + if( !sp.state.secondary() && !sp.state.fatal() ) + sp.state = MemberState::RS_RECOVERING; + sp.primary = remote; + } StateBox() : m("StateBox") { } private: mutex m; @@ -226,7 +242,6 @@ namespace mongo { }; static StartupStatus startupStatus; static string startupStatusMsg; - static string stateAsStr(MemberState state); static string stateAsHtml(MemberState state); /* todo thread */ @@ -241,28 +256,24 @@ namespace mongo { void endOldHealthTasks(); void startHealthTaskFor(Member *m); - private: Consensus elect; - bool ok() const { return !box.getState().fatal(); } - void relinquish(); void forgetPrimary(); - protected: bool _stepDown(); private: void assumePrimary(); void loadLastOpTimeWritten(); void changeState(MemberState s); - protected: // "heartbeat message" // sent in requestHeartbeat respond in field "hbm" char _hbmsg[256]; // we change this unlocked, thus not an stl::string + time_t _hbmsgTime; // when it was logged public: void sethbmsg(string s, int logLevel = 0); protected: - bool initFromConfig(ReplSetConfig& c); // true if ok; throws if config really bad; false if config doesn't include self + bool initFromConfig(ReplSetConfig& c, bool reconf=false); // true if ok; throws if config really bad; false if config doesn't include self void _fillIsMaster(BSONObjBuilder&); void _fillIsMasterHost(const Member*, vector<string>&, vector<string>&, vector<string>&); const ReplSetConfig& config() { return *_cfg; } @@ -323,8 +334,10 @@ namespace mongo { void _syncDoInitialSync(); void syncDoInitialSync(); void _syncThread(); + bool tryToGoLiveAsASecondary(OpTime&); // readlocks void syncTail(); void syncApply(const BSONObj &o); + unsigned _syncRollback(OplogReader& r); void syncRollback(OplogReader& r); void syncFixUp(HowToFixUp& h, OplogReader& r); public: @@ -344,9 +357,10 @@ namespace mongo { /* call after constructing to start - returns fairly quickly after la[unching its threads */ void go() { _go(); } + void fatal() { _fatal(); } - bool isPrimary(); - bool isSecondary(); + bool isPrimary() { return box.getState().primary(); } + bool isSecondary() { return box.getState().secondary(); } MemberState state() const { return ReplSetImpl::state(); } string name() const { return ReplSetImpl::name(); } const ReplSetConfig& config() { return ReplSetImpl::config(); } @@ -366,7 +380,10 @@ namespace mongo { bool lockedByMe() { return RSBase::lockedByMe(); } // heartbeat msg to send to others; descriptive diagnostic info - string hbmsg() const { return _hbmsg; } + string hbmsg() const { + if( time(0)-_hbmsgTime > 120 ) return ""; + return _hbmsg; + } }; /** base class for repl set commands. checks basic things such as in rs mode before the command @@ -388,6 +405,8 @@ namespace mongo { if( theReplSet == 0 ) { result.append("startupStatus", ReplSet::startupStatus); errmsg = ReplSet::startupStatusMsg.empty() ? "replset unknown error 2" : ReplSet::startupStatusMsg; + if( ReplSet::startupStatus == 3 ) + result.append("info", "run rs.initiate(...) if not yet done for the set"); return false; } return true; @@ -397,19 +416,10 @@ namespace mongo { /** inlines ----------------- */ inline Member::Member(HostAndPort h, unsigned ord, const ReplSetConfig::MemberCfg *c, bool self) : - _config(c), _h(h), _hbinfo(ord) { - if( self ) { - _hbinfo.health = 1.0; - } - } - - inline bool ReplSet::isPrimary() { - /* todo replset */ - return box.getState().primary(); - } - - inline bool ReplSet::isSecondary() { - return box.getState().secondary(); + _config(*c), _h(h), _hbinfo(ord) + { + if( self ) + _hbinfo.health = 1.0; } } diff --git a/db/repl/rs_config.cpp b/db/repl/rs_config.cpp index 76b20a4..85c9a46 100644 --- a/db/repl/rs_config.cpp +++ b/db/repl/rs_config.cpp @@ -31,6 +31,16 @@ namespace mongo { void logOpInitiate(const bo&); + void assertOnlyHas(BSONObj o, const set<string>& fields) { + BSONObj::iterator i(o); + while( i.more() ) { + BSONElement e = i.next(); + if( !fields.count( e.fieldName() ) ) { + uasserted(13434, str::stream() << "unexpected field '" << e.fieldName() << "'in object"); + } + } + } + list<HostAndPort> ReplSetConfig::otherMemberHostnames() const { list<HostAndPort> L; for( vector<MemberCfg>::const_iterator i = members.begin(); i != members.end(); i++ ) { @@ -42,7 +52,7 @@ namespace mongo { /* comment MUST only be set when initiating the set by the initiator */ void ReplSetConfig::saveConfigLocally(bo comment) { - check(); + checkRsConfig(); log() << "replSet info saving a newer config version to local.system.replset" << rsLog; { writelock lk(""); @@ -81,6 +91,8 @@ namespace mongo { if( votes != 1 ) b << "votes" << votes; if( priority != 1.0 ) b << "priority" << priority; if( arbiterOnly ) b << "arbiterOnly" << true; + if( slaveDelay ) b << "slaveDelay" << slaveDelay; + if( hidden ) b << "hidden" << hidden; return b.obj(); } @@ -115,9 +127,17 @@ namespace mongo { mchk(priority >= 0 && priority <= 1000); mchk(votes >= 0 && votes <= 100); uassert(13419, "this version of mongod only supports priorities 0 and 1", priority == 0 || priority == 1); + uassert(13437, "slaveDelay requires priority be zero", slaveDelay == 0 || priority == 0); + uassert(13438, "bad slaveDelay value", slaveDelay >= 0 && slaveDelay <= 3600 * 24 * 366); + uassert(13439, "priority must be 0 when hidden=true", priority == 0 || !hidden); } + /** @param o old config + @param n new config + */ /*static*/ bool ReplSetConfig::legalChange(const ReplSetConfig& o, const ReplSetConfig& n, string& errmsg) { + assert( theReplSet ); + if( o._id != n._id ) { errmsg = "set name may not change"; return false; @@ -131,6 +151,25 @@ namespace mongo { return false; } + map<HostAndPort,const ReplSetConfig::MemberCfg*> old; + for( vector<ReplSetConfig::MemberCfg>::const_iterator i = o.members.begin(); i != o.members.end(); i++ ) { + old[i->h] = &(*i); + } + int me = 0; + for( vector<ReplSetConfig::MemberCfg>::const_iterator i = n.members.begin(); i != n.members.end(); i++ ) { + const ReplSetConfig::MemberCfg& m = *i; + if( old.count(m.h) ) { + if( old[m.h]->_id != m._id ) { + log() << "replSet reconfig error with member: " << m.h.toString() << rsLog; + uasserted(13432, "_id may not change for members"); + } + } + if( m.h.isSelf() ) + me++; + } + + uassert(13433, "can't find self in new replset config", me == 1); + /* TODO : MORE CHECKS HERE */ log() << "replSet TODO : don't allow removal of a node until we handle it at the removed node end?" << endl; @@ -144,7 +183,7 @@ namespace mongo { _ok = false; } - void ReplSetConfig::check() const { + void ReplSetConfig::checkRsConfig() const { uassert(13132, "nonmatching repl set name in _id field; check --replSet command line", _id == cmdLine.ourSetName()); @@ -154,6 +193,10 @@ namespace mongo { } void ReplSetConfig::from(BSONObj o) { + static const string legal[] = {"_id","version", "members","settings"}; + static const set<string> legals(legal, legal + 4); + assertOnlyHas(o, legals); + md5 = o.md5(); _id = o["_id"].String(); if( o["version"].ok() ) { @@ -170,7 +213,7 @@ namespace mongo { if( settings["heartbeatTimeout"].ok() ) ho.heartbeatTimeoutMillis = (unsigned) (settings["heartbeatTimeout"].Number() * 1000); ho.check(); - try { getLastErrorDefaults = settings["getLastErrorDefaults"].Obj(); } catch(...) { } + try { getLastErrorDefaults = settings["getLastErrorDefaults"].Obj().copy(); } catch(...) { } } set<string> hosts; @@ -188,6 +231,10 @@ namespace mongo { BSONObj mobj = members[i].Obj(); MemberCfg m; try { + static const string legal[] = {"_id","votes","priority","host","hidden","slaveDelay","arbiterOnly"}; + static const set<string> legals(legal, legal + 7); + assertOnlyHas(mobj, legals); + try { m._id = (int) mobj["_id"].Number(); } catch(...) { @@ -205,6 +252,9 @@ namespace mongo { if( m.h.isLocalHost() ) localhosts++; m.arbiterOnly = mobj.getBoolField("arbiterOnly"); + m.slaveDelay = mobj["slaveDelay"].numberInt(); + if( mobj.hasElement("hidden") ) + m.hidden = mobj.getBoolField("hidden"); if( mobj.hasElement("priority") ) m.priority = mobj["priority"].Number(); if( mobj.hasElement("votes") ) @@ -308,6 +358,7 @@ namespace mongo { BSONObj o = c->nextSafe(); uassert(13109, "multiple rows in " + rsConfigNs + " not supported", !c->more()); from(o); + checkRsConfig(); _ok = true; log(level) << "replSet load config ok from " << (h.isSelf() ? "self" : h.toString()) << rsLog; } diff --git a/db/repl/rs_config.h b/db/repl/rs_config.h index 38df772..e39dad7 100644 --- a/db/repl/rs_config.h +++ b/db/repl/rs_config.h @@ -41,18 +41,27 @@ namespace mongo { bool ok() const { return _ok; } struct MemberCfg { - MemberCfg() : _id(-1), votes(1), priority(1.0), arbiterOnly(false) { } + MemberCfg() : _id(-1), votes(1), priority(1.0), arbiterOnly(false), slaveDelay(0), hidden(false) { } int _id; /* ordinal */ unsigned votes; /* how many votes this node gets. default 1. */ HostAndPort h; double priority; /* 0 means can never be primary */ bool arbiterOnly; + int slaveDelay; /* seconds. int rather than unsigned for convenient to/front bson conversion. */ + bool hidden; /* if set, don't advertise to drives in isMaster. for non-primaries (priority 0) */ + void check() const; /* check validity, assert if not. */ BSONObj asBson() const; bool potentiallyHot() const { return !arbiterOnly && priority > 0; } + bool operator==(const MemberCfg& r) const { + return _id==r._id && votes == r.votes && h == r.h && priority == r.priority && + arbiterOnly == r.arbiterOnly && slaveDelay == r.slaveDelay && hidden == r.hidden; + } + bool operator!=(const MemberCfg& r) const { return !(*this == r); } }; + vector<MemberCfg> members; string _id; int version; @@ -68,7 +77,7 @@ namespace mongo { string toString() const { return asBson().toString(); } /** validate the settings. does not call check() on each member, you have to do that separately. */ - void check() const; + void checkRsConfig() const; /** check if modification makes sense */ static bool legalChange(const ReplSetConfig& old, const ReplSetConfig& n, string& errmsg); diff --git a/db/repl/rs_initialsync.cpp b/db/repl/rs_initialsync.cpp index 4c6bd4d..3851c66 100644 --- a/db/repl/rs_initialsync.cpp +++ b/db/repl/rs_initialsync.cpp @@ -66,7 +66,7 @@ namespace mongo { void _logOpObjRS(const BSONObj& op); - bool copyCollectionFromRemote(const string& host, const string& ns, const BSONObj& query, string errmsg); + bool copyCollectionFromRemote(const string& host, const string& ns, const BSONObj& query, string &errmsg, bool logforrepl); static void emptyOplog() { writelock lk(rsoplog); diff --git a/db/repl/rs_member.h b/db/repl/rs_member.h index 4f6846a..6a797b5 100644 --- a/db/repl/rs_member.h +++ b/db/repl/rs_member.h @@ -40,7 +40,8 @@ namespace mongo { RS_STARTUP2, RS_UNKNOWN, /* remote node not yet reached */ RS_ARBITER, - RS_DOWN /* node not reachable for a report */ + RS_DOWN, /* node not reachable for a report */ + RS_ROLLBACK } s; MemberState(MS ms = RS_UNKNOWN) : s(ms) { } @@ -51,6 +52,9 @@ namespace mongo { bool recovering() const { return s == RS_RECOVERING; } bool startup2() const { return s == RS_STARTUP2; } bool fatal() const { return s == RS_FATAL; } + bool rollback() const { return s == RS_ROLLBACK; } + + string toString() const; bool operator==(const MemberState& r) const { return s == r.s; } bool operator!=(const MemberState& r) const { return s != r.s; } @@ -61,26 +65,31 @@ namespace mongo { class HeartbeatInfo { unsigned _id; public: - HeartbeatInfo() : _id(0xffffffff),skew(INT_MIN) { } + HeartbeatInfo() : _id(0xffffffff),hbstate(MemberState::RS_UNKNOWN),health(-1.0),downSince(0),skew(INT_MIN) { } HeartbeatInfo(unsigned id); bool up() const { return health > 0; } unsigned id() const { return _id; } MemberState hbstate; double health; time_t upSince; + long long downSince; time_t lastHeartbeat; string lastHeartbeatMsg; OpTime opTime; int skew; + long long timeDown() const; // ms + /* true if changed in a way of interest to the repl set manager. */ bool changed(const HeartbeatInfo& old) const; }; inline HeartbeatInfo::HeartbeatInfo(unsigned id) : _id(id) { - health = -1.0; - lastHeartbeat = upSince = 0; - skew = INT_MIN; + hbstate = MemberState::RS_UNKNOWN; + health = -1.0; + downSince = 0; + lastHeartbeat = upSince = 0; + skew = INT_MIN; } inline bool HeartbeatInfo::changed(const HeartbeatInfo& old) const { diff --git a/db/repl/rs_rollback.cpp b/db/repl/rs_rollback.cpp index 1bb7217..6b2544c 100644 --- a/db/repl/rs_rollback.cpp +++ b/db/repl/rs_rollback.cpp @@ -62,6 +62,14 @@ namespace mongo { using namespace bson; + bool copyCollectionFromRemote(const string& host, const string& ns, const BSONObj& query, string& errmsg, bool logforrepl); + void incRBID(); + + class rsfatal : public std::exception { + public: + virtual const char* what() const throw(){ return "replica set fatal exception"; } + }; + struct DocID { const char *ns; be _id; @@ -81,6 +89,8 @@ namespace mongo { /* collections to drop */ set<string> toDrop; + set<string> collectionsToResync; + OpTime commonPoint; DiskLoc commonPointOurDiskloc; @@ -113,17 +123,59 @@ namespace mongo { if( *op == 'c' ) { be first = o.firstElement(); NamespaceString s(d.ns); // foo.$cmd - - if( string("create") == first.fieldName() ) { - /* Create collection operation - { ts: ..., h: ..., op: "c", ns: "foo.$cmd", o: { create: "abc", ... } }
- */ - string ns = s.db + '.' + o["create"].String(); // -> foo.abc - h.toDrop.insert(ns); + string cmdname = first.fieldName(); + Command *cmd = Command::findCommand(cmdname.c_str()); + if( cmd == 0 ) { + log() << "replSet warning rollback no suchcommand " << first.fieldName() << " - different mongod versions perhaps?" << rsLog; return; } - else { - log() << "replSet WARNING can't roll back this command yet: " << o.toString() << rsLog; + else { + /* findandmodify - tranlated? + godinsert?, + renamecollection a->b. just resync a & b + */ + if( cmdname == "create" ) { + /* Create collection operation + { ts: ..., h: ..., op: "c", ns: "foo.$cmd", o: { create: "abc", ... } }
+ */ + string ns = s.db + '.' + o["create"].String(); // -> foo.abc + h.toDrop.insert(ns); + return; + } + else if( cmdname == "drop" ) { + string ns = s.db + '.' + first.valuestr(); + h.collectionsToResync.insert(ns); + return; + } + else if( cmdname == "dropIndexes" || cmdname == "deleteIndexes" ) { + /* TODO: this is bad. we simply full resync the collection here, which could be very slow. */ + log() << "replSet info rollback of dropIndexes is slow in this version of mongod" << rsLog; + string ns = s.db + '.' + first.valuestr(); + h.collectionsToResync.insert(ns); + return; + } + else if( cmdname == "renameCollection" ) { + /* TODO: slow. */ + log() << "replSet info rollback of renameCollection is slow in this version of mongod" << rsLog; + string from = first.valuestr(); + string to = o["to"].String(); + h.collectionsToResync.insert(from); + h.collectionsToResync.insert(to); + return; + } + else if( cmdname == "reIndex" ) { + return; + } + else if( cmdname == "dropDatabase" ) { + log() << "replSet error rollback : can't rollback drop database full resync will be required" << rsLog; + log() << "replSet " << o.toString() << rsLog; + throw rsfatal(); + } + else { + log() << "replSet error can't rollback this command yet: " << o.toString() << rsLog; + log() << "replSet cmdname=" << cmdname << rsLog; + throw rsfatal(); + } } } @@ -141,8 +193,6 @@ namespace mongo { static void syncRollbackFindCommonPoint(DBClientConnection *them, HowToFixUp& h) { static time_t last; if( time(0)-last < 60 ) { - // this could put a lot of load on someone else, don't repeat too often - sleepsecs(10); throw "findcommonpoint waiting a while before trying again"; } last = time(0); @@ -170,12 +220,14 @@ namespace mongo { BSONObj theirObj = t->nextSafe(); OpTime theirTime = theirObj["ts"]._opTime(); - if( 1 ) { + { long long diff = (long long) ourTime.getSecs() - ((long long) theirTime.getSecs()); /* diff could be positive, negative, or zero */ - log() << "replSet info syncRollback diff in end of log times : " << diff << " seconds" << rsLog; + log() << "replSet info rollback our last optime: " << ourTime.toStringPretty() << rsLog; + log() << "replSet info rollback their last optime: " << theirTime.toStringPretty() << rsLog; + log() << "replSet info rollback diff in end of log times: " << diff << " seconds" << rsLog; if( diff > 3600 ) { - log() << "replSet syncRollback too long a time period for a rollback." << rsLog; + log() << "replSet rollback too long a time period for a rollback." << rsLog; throw "error not willing to roll back more than one hour of data"; } } @@ -197,16 +249,35 @@ namespace mongo { refetch(h, ourObj); + if( !t->more() ) { + log() << "replSet rollback error RS100 reached beginning of remote oplog" << rsLog; + log() << "replSet them: " << them->toString() << " scanned: " << scanned << rsLog; + log() << "replSet theirTime: " << theirTime.toStringLong() << rsLog; + log() << "replSet ourTime: " << ourTime.toStringLong() << rsLog; + throw "RS100 reached beginning of remote oplog [2]"; + } theirObj = t->nextSafe(); theirTime = theirObj["ts"]._opTime(); u.advance(); - if( !u.ok() ) throw "reached beginning of local oplog"; + if( !u.ok() ) { + log() << "replSet rollback error RS101 reached beginning of local oplog" << rsLog; + log() << "replSet them: " << them->toString() << " scanned: " << scanned << rsLog; + log() << "replSet theirTime: " << theirTime.toStringLong() << rsLog; + log() << "replSet ourTime: " << ourTime.toStringLong() << rsLog; + throw "RS101 reached beginning of local oplog [1]"; + } ourObj = u.current(); ourTime = ourObj["ts"]._opTime(); } else if( theirTime > ourTime ) { - /* todo: we could hit beginning of log here. exception thrown is ok but not descriptive, so fix up */ + if( !t->more() ) { + log() << "replSet rollback error RS100 reached beginning of remote oplog" << rsLog; + log() << "replSet them: " << them->toString() << " scanned: " << scanned << rsLog; + log() << "replSet theirTime: " << theirTime.toStringLong() << rsLog; + log() << "replSet ourTime: " << ourTime.toStringLong() << rsLog; + throw "RS100 reached beginning of remote oplog [1]"; + } theirObj = t->nextSafe(); theirTime = theirObj["ts"]._opTime(); } @@ -214,7 +285,13 @@ namespace mongo { // theirTime < ourTime refetch(h, ourObj); u.advance(); - if( !u.ok() ) throw "reached beginning of local oplog"; + if( !u.ok() ) { + log() << "replSet rollback error RS101 reached beginning of local oplog" << rsLog; + log() << "replSet them: " << them->toString() << " scanned: " << scanned << rsLog; + log() << "replSet theirTime: " << theirTime.toStringLong() << rsLog; + log() << "replSet ourTime: " << ourTime.toStringLong() << rsLog; + throw "RS101 reached beginning of local oplog [2]"; + } ourObj = u.current(); ourTime = ourObj["ts"]._opTime(); } @@ -226,7 +303,19 @@ namespace mongo { bson::bo goodVersionOfObject; }; - void ReplSetImpl::syncFixUp(HowToFixUp& h, OplogReader& r) { + static void setMinValid(bo newMinValid) { + try { + log() << "replSet minvalid=" << newMinValid["ts"]._opTime().toStringLong() << rsLog; + } + catch(...) { } + { + Helpers::putSingleton("local.replset.minvalid", newMinValid); + Client::Context cx( "local." ); + cx.db()->flushFiles(true); + } + } + + void ReplSetImpl::syncFixUp(HowToFixUp& h, OplogReader& r) { DBClientConnection *them = r.conn(); // fetch all first so we needn't handle interruption in a fancy way @@ -237,6 +326,7 @@ namespace mongo { bo newMinValid; + /* fetch all the goodVersions of each document from current primary */ DocID d; unsigned long long n = 0; try { @@ -258,43 +348,98 @@ namespace mongo { } newMinValid = r.getLastOp(rsoplog); if( newMinValid.isEmpty() ) { - sethbmsg("syncRollback error newMinValid empty?"); + sethbmsg("rollback error newMinValid empty?"); return; } } catch(DBException& e) { - sethbmsg(str::stream() << "syncRollback re-get objects: " << e.toString(),0); - log() << "syncRollback couldn't re-get ns:" << d.ns << " _id:" << d._id << ' ' << n << '/' << h.toRefetch.size() << rsLog; + sethbmsg(str::stream() << "rollback re-get objects: " << e.toString(),0); + log() << "rollback couldn't re-get ns:" << d.ns << " _id:" << d._id << ' ' << n << '/' << h.toRefetch.size() << rsLog; throw e; } - sethbmsg("syncRollback 3.5"); + MemoryMappedFile::flushAll(true); + + sethbmsg("rollback 3.5"); if( h.rbid != getRBID(r.conn()) ) { // our source rolled back itself. so the data we received isn't necessarily consistent. - sethbmsg("syncRollback rbid on source changed during rollback, cancelling this attempt"); + sethbmsg("rollback rbid on source changed during rollback, cancelling this attempt"); return; } // update them - sethbmsg(str::stream() << "syncRollback 4 n:" << goodVersions.size()); + sethbmsg(str::stream() << "rollback 4 n:" << goodVersions.size()); bool warn = false; assert( !h.commonPointOurDiskloc.isNull() ); - MemoryMappedFile::flushAll(true); - dbMutex.assertWriteLocked(); /* we have items we are writing that aren't from a point-in-time. thus best not to come online until we get to that point in freshness. */ - try { - log() << "replSet set minvalid=" << newMinValid["ts"]._opTime().toString() << rsLog; + setMinValid(newMinValid); + + /** any full collection resyncs required? */ + if( !h.collectionsToResync.empty() ) { + for( set<string>::iterator i = h.collectionsToResync.begin(); i != h.collectionsToResync.end(); i++ ) { + string ns = *i; + sethbmsg(str::stream() << "rollback 4.1 coll resync " << ns); + Client::Context c(*i, dbpath, 0, /*doauth*/false); + try { + bob res; + string errmsg; + dropCollection(ns, errmsg, res); + { + dbtemprelease r; + bool ok = copyCollectionFromRemote(them->getServerAddress(), ns, bo(), errmsg, false); + if( !ok ) { + log() << "replSet rollback error resyncing collection " << ns << ' ' << errmsg << rsLog; + throw "rollback error resyncing rollection [1]"; + } + } + } + catch(...) { + log() << "replset rollback error resyncing collection " << ns << rsLog; + throw "rollback error resyncing rollection [2]"; + } + } + + /* we did more reading from primary, so check it again for a rollback (which would mess us up), and + make minValid newer. + */ + sethbmsg("rollback 4.2"); + { + string err; + try { + newMinValid = r.getLastOp(rsoplog); + if( newMinValid.isEmpty() ) { + err = "can't get minvalid from primary"; + } else { + setMinValid(newMinValid); + } + } + catch(...) { + err = "can't get/set minvalid"; + } + if( h.rbid != getRBID(r.conn()) ) { + // our source rolled back itself. so the data we received isn't necessarily consistent. + // however, we've now done writes. thus we have a problem. + err += "rbid at primary changed during resync/rollback"; + } + if( !err.empty() ) { + log() << "replSet error rolling back : " << err << ". A full resync will be necessary." << rsLog; + /* todo: reset minvalid so that we are permanently in fatal state */ + /* todo: don't be fatal, but rather, get all the data first. */ + sethbmsg("rollback error"); + throw rsfatal(); + } + } + sethbmsg("rollback 4.3"); } - catch(...){} - Helpers::putSingleton("local.replset.minvalid", newMinValid); - /** first drop collections to drop - that might make things faster below actually if there were subsequent inserts */ + sethbmsg("rollback 4.6"); + /** drop collections to drop before doing individual fixups - that might make things faster below actually if there were subsequent inserts to rollback */ for( set<string>::iterator i = h.toDrop.begin(); i != h.toDrop.end(); i++ ) { Client::Context c(*i, dbpath, 0, /*doauth*/false); try { @@ -308,6 +453,7 @@ namespace mongo { } } + sethbmsg("rollback 4.7"); Client::Context c(rsoplog, dbpath, 0, /*doauth*/false); NamespaceDetails *oplogDetails = nsdetails(rsoplog); uassert(13423, str::stream() << "replSet error in rollback can't find " << rsoplog, oplogDetails); @@ -320,7 +466,12 @@ namespace mongo { bo pattern = d._id.wrap(); // { _id : ... } try { assert( d.ns && *d.ns ); - + if( h.collectionsToResync.count(d.ns) ) { + /* we just synced this entire collection */ + continue; + } + + /* keep an archive of items rolled back */ shared_ptr<RemoveSaver>& rs = removeSavers[d.ns]; if ( ! rs ) rs.reset( new RemoveSaver( "rollback" , "" , d.ns ) ); @@ -343,7 +494,7 @@ namespace mongo { long long start = Listener::getElapsedTimeMillis(); DiskLoc loc = Helpers::findOne(d.ns, pattern, false); if( Listener::getElapsedTimeMillis() - start > 200 ) - log() << "replSet warning roll back slow no _id index for " << d.ns << rsLog; + log() << "replSet warning roll back slow no _id index for " << d.ns << " perhaps?" << rsLog; //would be faster but requires index: DiskLoc loc = Helpers::findById(nsd, pattern); if( !loc.isNull() ) { try { @@ -411,9 +562,9 @@ namespace mongo { removeSavers.clear(); // this effectively closes all of them - sethbmsg(str::stream() << "syncRollback 5 d:" << deletes << " u:" << updates); + sethbmsg(str::stream() << "rollback 5 d:" << deletes << " u:" << updates); MemoryMappedFile::flushAll(true); - sethbmsg("syncRollback 6"); + sethbmsg("rollback 6"); // clean up oplog log(2) << "replSet rollback truncate oplog after " << h.commonPoint.toStringPretty() << rsLog; @@ -423,59 +574,99 @@ namespace mongo { /* reset cached lastoptimewritten and h value */ loadLastOpTimeWritten(); - sethbmsg("syncRollback 7"); + sethbmsg("rollback 7"); MemoryMappedFile::flushAll(true); // done if( warn ) sethbmsg("issues during syncRollback, see log"); else - sethbmsg("syncRollback done"); + sethbmsg("rollback done"); } void ReplSetImpl::syncRollback(OplogReader&r) { + unsigned s = _syncRollback(r); + if( s ) + sleepsecs(s); + } + + unsigned ReplSetImpl::_syncRollback(OplogReader&r) { assert( !lockedByMe() ); assert( !dbMutex.atLeastReadLocked() ); - sethbmsg("syncRollback 0"); + sethbmsg("rollback 0"); writelocktry lk(rsoplog, 20000); if( !lk.got() ) { - sethbmsg("syncRollback couldn't get write lock in a reasonable time"); - sleepsecs(2); - return; + sethbmsg("rollback couldn't get write lock in a reasonable time"); + return 2; + } + + if( box.getState().secondary() ) { + /* by doing this, we will not service reads (return an error as we aren't in secondary staate. + that perhaps is moot becasue of the write lock above, but that write lock probably gets deferred + or removed or yielded later anyway. + + also, this is better for status reporting - we know what is happening. + */ + box.change(MemberState::RS_ROLLBACK, _self); } HowToFixUp how; - sethbmsg("syncRollback 1"); + sethbmsg("rollback 1"); { r.resetCursor(); /*DBClientConnection us(false, 0, 0); string errmsg; if( !us.connect(HostAndPort::me().toString(),errmsg) ) { - sethbmsg("syncRollback connect to self failure" + errmsg); + sethbmsg("rollback connect to self failure" + errmsg); return; }*/ - sethbmsg("syncRollback 2 FindCommonPoint"); + sethbmsg("rollback 2 FindCommonPoint"); try { syncRollbackFindCommonPoint(r.conn(), how); } catch( const char *p ) { - sethbmsg(string("syncRollback 2 error ") + p); - sleepsecs(10); - return; + sethbmsg(string("rollback 2 error ") + p); + return 10; + } + catch( rsfatal& ) { + _fatal(); + return 2; } catch( DBException& e ) { - sethbmsg(string("syncRollback 2 exception ") + e.toString() + "; sleeping 1 min"); + sethbmsg(string("rollback 2 exception ") + e.toString() + "; sleeping 1 min"); + dbtemprelease r; sleepsecs(60); throw; } } - sethbmsg("replSet syncRollback 3 fixup"); + sethbmsg("replSet rollback 3 fixup"); + + { + incRBID(); + try { + syncFixUp(how, r); + } + catch( rsfatal& ) { + sethbmsg("rollback fixup error"); + _fatal(); + return 2; + } + catch(...) { + incRBID(); throw; + } + incRBID(); + + /* success - leave "ROLLBACK" state + can go to SECONDARY once minvalid is achieved + */ + box.change(MemberState::RS_RECOVERING, _self); + } - syncFixUp(how, r); + return 0; } } diff --git a/db/repl/rs_sync.cpp b/db/repl/rs_sync.cpp index bece96c..9ea65cf 100644 --- a/db/repl/rs_sync.cpp +++ b/db/repl/rs_sync.cpp @@ -24,8 +24,11 @@ namespace mongo { using namespace bson; + extern unsigned replSetForceInitialSyncFailure; + void startSyncThread() { Client::initThread("rs_sync"); + cc().iAmSyncThread(); theReplSet->syncThread(); cc().shutdown(); } @@ -91,6 +94,7 @@ namespace mongo { // todo : use exhaust unsigned long long n = 0; while( 1 ) { + if( !r.more() ) break; BSONObj o = r.nextSafe(); /* note we might get "not master" at some point */ @@ -103,9 +107,14 @@ namespace mongo { anymore. assumePrimary is in the db lock so we are safe as long as we check after we locked above. */ const Member *p1 = box.getPrimary(); - if( p1 != primary ) { - log() << "replSet primary was:" << primary->fullName() << " now:" << - (p1 != 0 ? p1->fullName() : "none") << rsLog; + if( p1 != primary || replSetForceInitialSyncFailure ) { + int f = replSetForceInitialSyncFailure; + if( f > 0 ) { + replSetForceInitialSyncFailure = f-1; + log() << "replSet test code invoked, replSetForceInitialSyncFailure" << rsLog; + } + log() << "replSet primary was:" << primary->fullName() << " now:" << + (p1 != 0 ? p1->fullName() : "none") << rsLog; throw DBException("primary changed",0); } @@ -131,6 +140,32 @@ namespace mongo { return true; } + /* should be in RECOVERING state on arrival here. + readlocks + @return true if transitioned to SECONDARY + */ + bool ReplSetImpl::tryToGoLiveAsASecondary(OpTime& /*out*/ minvalid) { + bool golive = false; + { + readlock lk("local.replset.minvalid"); + BSONObj mv; + if( Helpers::getSingleton("local.replset.minvalid", mv) ) { + minvalid = mv["ts"]._opTime(); + if( minvalid <= lastOpTimeWritten ) { + golive=true; + } + } + else + golive = true; /* must have been the original member */ + } + if( golive ) { + sethbmsg(""); + changeState(MemberState::RS_SECONDARY); + } + return golive; + } + + /* tail the primary's oplog. ok to return, will be re-called. */ void ReplSetImpl::syncTail() { // todo : locking vis a vis the mgr... @@ -147,14 +182,19 @@ namespace mongo { { BSONObj remoteOldestOp = r.findOne(rsoplog, Query()); OpTime ts = remoteOldestOp["ts"]._opTime(); - DEV log() << "remoteOldestOp: " << ts.toStringPretty() << endl; - else log(3) << "remoteOldestOp: " << ts.toStringPretty() << endl; + DEV log() << "replSet remoteOldestOp: " << ts.toStringLong() << rsLog; + else log(3) << "replSet remoteOldestOp: " << ts.toStringLong() << rsLog; + DEV { + // debugging sync1.js... + log() << "replSet lastOpTimeWritten: " << lastOpTimeWritten.toStringLong() << rsLog; + log() << "replSet our state: " << state().toString() << rsLog; + } if( lastOpTimeWritten < ts ) { - log() << "replSet error too stale to catch up, at least from primary " << hn << rsLog; - log() << "replSet our last optime : " << lastOpTimeWritten.toStringPretty() << rsLog; - log() << "replSet oldest at " << hn << " : " << ts.toStringPretty() << rsLog; + log() << "replSet error RS102 too stale to catch up, at least from primary: " << hn << rsLog; + log() << "replSet our last optime : " << lastOpTimeWritten.toStringLong() << rsLog; + log() << "replSet oldest at " << hn << " : " << ts.toStringLong() << rsLog; log() << "replSet See http://www.mongodb.org/display/DOCS/Resyncing+a+Very+Stale+Replica+Set+Member" << rsLog; - sethbmsg("error too stale to catch up"); + sethbmsg("error RS102 too stale to catch up"); sleepsecs(120); return; } @@ -213,7 +253,13 @@ namespace mongo { } } - while( 1 ) { + /* we have now checked if we need to rollback and we either don't have to or did it. */ + { + OpTime minvalid; + tryToGoLiveAsASecondary(minvalid); + } + + while( 1 ) { while( 1 ) { if( !r.moreInCurrentBatch() ) { /* we need to occasionally check some things. between @@ -222,27 +268,13 @@ namespace mongo { /* perhaps we should check this earlier? but not before the rollback checks. */ if( state().recovering() ) { /* can we go to RS_SECONDARY state? we can if not too old and if minvalid achieved */ - bool golive = false; OpTime minvalid; - { - readlock lk("local.replset.minvalid"); - BSONObj mv; - if( Helpers::getSingleton("local.replset.minvalid", mv) ) { - minvalid = mv["ts"]._opTime(); - if( minvalid <= lastOpTimeWritten ) { - golive=true; - } - } - else - golive = true; /* must have been the original member */ - } + bool golive = ReplSetImpl::tryToGoLiveAsASecondary(minvalid); if( golive ) { - sethbmsg(""); - log() << "replSet SECONDARY" << rsLog; - changeState(MemberState::RS_SECONDARY); + ; } else { - sethbmsg(str::stream() << "still syncing, not yet to minValid optime " << minvalid.toString()); + sethbmsg(str::stream() << "still syncing, not yet to minValid optime" << minvalid.toString()); } /* todo: too stale capability */ @@ -270,12 +302,40 @@ namespace mongo { syncApply(o); _logOpObjRS(o); /* with repl sets we write the ops to our oplog too: */ } + int sd = myConfig().slaveDelay; + if( sd ) { + const OpTime ts = o["ts"]._opTime(); + long long a = ts.getSecs(); + long long b = time(0); + long long lag = b - a; + long long sleeptime = sd - lag; + if( sleeptime > 0 ) { + uassert(12000, "rs slaveDelay differential too big check clocks and systems", sleeptime < 0x40000000); + log() << "replSet temp slavedelay sleep:" << sleeptime << rsLog; + if( sleeptime < 60 ) { + sleepsecs((int) sleeptime); + } + else { + // sleep(hours) would prevent reconfigs from taking effect & such! + long long waitUntil = b + sleeptime; + while( 1 ) { + sleepsecs(6); + if( time(0) >= waitUntil ) + break; + if( box.getPrimary() != primary ) + break; + if( myConfig().slaveDelay != sd ) // reconf + break; + } + } + } + } } } r.tailCheck(); if( !r.haveCursor() ) { - log() << "replSet TEMP end syncTail pass with " << hn << rsLog; - // TODO : reuse our cnonection to the primary. + log(1) << "replSet end syncTail pass with " << hn << rsLog; + // TODO : reuse our connection to the primary. return; } if( box.getPrimary() != primary ) @@ -290,6 +350,10 @@ namespace mongo { sleepsecs(1); return; } + if( sp.state.fatal() ) { + sleepsecs(5); + return; + } /* later, we can sync from up secondaries if we want. tbd. */ if( sp.primary == 0 ) |