summary refs log tree commit diff
path: root/db/repl
diff options
context:
space:
mode:
Diffstat (limited to 'db/repl')
-rw-r--r-- db/repl/consensus.cpp 12
-rw-r--r-- db/repl/health.cpp 34
-rw-r--r-- db/repl/health.h 2
-rw-r--r-- db/repl/heartbeat.cpp 27
-rw-r--r-- db/repl/manager.cpp 30
-rw-r--r-- db/repl/replset_commands.cpp 18
-rw-r--r-- db/repl/rs.cpp 138
-rw-r--r-- db/repl/rs.h 74
-rw-r--r-- db/repl/rs_config.cpp 57
-rw-r--r-- db/repl/rs_config.h 13
-rw-r--r-- db/repl/rs_initialsync.cpp 2
-rw-r--r-- db/repl/rs_member.h 19
-rw-r--r-- db/repl/rs_rollback.cpp 291
-rw-r--r-- db/repl/rs_sync.cpp 122
14 files changed, 656 insertions, 183 deletions
diff --git a/db/repl/consensus.cpp b/db/repl/consensus.cpp
index 4eba17d..4044538 100644
--- a/db/repl/consensus.cpp
+++ b/db/repl/consensus.cpp
@@ -83,6 +83,17 @@ namespace mongo {
return vUp * 2 > totalVotes();
}
+ bool Consensus::shouldRelinquish() const {
+ int vUp = rs._self->config().votes;
+ const long long T = rs.config().ho.heartbeatTimeoutMillis * rs.config().ho.heartbeatConnRetries;
+ for( Member *m = rs.head(); m; m=m->next() ) {
+ long long dt = m->hbinfo().timeDown();
+ if( dt < T )
+ vUp += m->config().votes;
+ }
+ return !( vUp * 2 > totalVotes() );
+ }
+
static const int VETO = -10000;
const time_t LeaseTime = 30;
@@ -322,6 +333,7 @@ namespace mongo {
void Consensus::electSelf() {
assert( !rs.lockedByMe() );
assert( !rs.myConfig().arbiterOnly );
+ assert( rs.myConfig().slaveDelay == 0 );
try {
_electSelf();
}
diff --git a/db/repl/health.cpp b/db/repl/health.cpp
index b0be25f..72396fe 100644
--- a/db/repl/health.cpp
+++ b/db/repl/health.cpp
@@ -89,11 +89,13 @@ namespace mongo {
}
s << td(config().votes);
{
- string stateText = ReplSet::stateAsStr(state());
+ string stateText = state().toString();
+ if( _config.hidden )
+ stateText += " (hidden)";
if( ok || stateText.empty() )
s << td(stateText); // text blank if we've never connected
else
- s << td( grey(str::stream() << "(was " << ReplSet::stateAsStr(state()) << ')', true) );
+ s << td( grey(str::stream() << "(was " << state().toString() << ')', true) );
}
s << td( grey(hbinfo().lastHeartbeatMsg,!ok) );
stringstream q;
@@ -105,28 +107,30 @@ namespace mongo {
s << td("");
s << _tr();
}
-
+
string ReplSetImpl::stateAsHtml(MemberState s) {
if( s.s == MemberState::RS_STARTUP ) return a("", "serving still starting up, or still trying to initiate the set", "STARTUP");
if( s.s == MemberState::RS_PRIMARY ) return a("", "this server thinks it is primary", "PRIMARY");
if( s.s == MemberState::RS_SECONDARY ) return a("", "this server thinks it is a secondary (slave mode)", "SECONDARY");
if( s.s == MemberState::RS_RECOVERING ) return a("", "recovering/resyncing; after recovery usually auto-transitions to secondary", "RECOVERING");
- if( s.s == MemberState::RS_FATAL ) return a("", "something bad has occurred and server is not completely offline with regard to the replica set. fatal error.", "RS_FATAL");
- if( s.s == MemberState::RS_STARTUP2 ) return a("", "loaded config, still determining who is primary", "RS_STARTUP2");
+ if( s.s == MemberState::RS_FATAL ) return a("", "something bad has occurred and server is not completely offline with regard to the replica set. fatal error.", "FATAL");
+ if( s.s == MemberState::RS_STARTUP2 ) return a("", "loaded config, still determining who is primary", "STARTUP2");
if( s.s == MemberState::RS_ARBITER ) return a("", "this server is an arbiter only", "ARBITER");
if( s.s == MemberState::RS_DOWN ) return a("", "member is down, slow, or unreachable", "DOWN");
+ if( s.s == MemberState::RS_ROLLBACK ) return a("", "rolling back operations to get in sync", "ROLLBACK");
return "";
}
- string ReplSetImpl::stateAsStr(MemberState s) {
- if( s.s == MemberState::RS_STARTUP ) return "STARTUP";
- if( s.s == MemberState::RS_PRIMARY ) return "PRIMARY";
- if( s.s == MemberState::RS_SECONDARY ) return "SECONDARY";
- if( s.s == MemberState::RS_RECOVERING ) return "RECOVERING";
- if( s.s == MemberState::RS_FATAL ) return "FATAL";
- if( s.s == MemberState::RS_STARTUP2 ) return "STARTUP2";
- if( s.s == MemberState::RS_ARBITER ) return "ARBITER";
- if( s.s == MemberState::RS_DOWN ) return "DOWN";
+ string MemberState::toString() const {
+ if( s == MemberState::RS_STARTUP ) return "STARTUP";
+ if( s == MemberState::RS_PRIMARY ) return "PRIMARY";
+ if( s == MemberState::RS_SECONDARY ) return "SECONDARY";
+ if( s == MemberState::RS_RECOVERING ) return "RECOVERING";
+ if( s == MemberState::RS_FATAL ) return "FATAL";
+ if( s == MemberState::RS_STARTUP2 ) return "STARTUP2";
+ if( s == MemberState::RS_ARBITER ) return "ARBITER";
+ if( s == MemberState::RS_DOWN ) return "DOWN";
+ if( s == MemberState::RS_ROLLBACK ) return "ROLLBACK";
return "";
}
@@ -302,7 +306,7 @@ namespace mongo {
td(ago(started)) <<
td("") << // last heartbeat
td(ToString(_self->config().votes)) <<
- td(stateAsHtml(box.getState()));
+ td( stateAsHtml(box.getState()) + (_self->config().hidden?" (hidden)":"") );
s << td( _hbmsg );
stringstream q;
q << "/_replSetOplog?" << _self->id();
diff --git a/db/repl/health.h b/db/repl/health.h
index 8b1005e..645a3b5 100644
--- a/db/repl/health.h
+++ b/db/repl/health.h
@@ -27,7 +27,7 @@ namespace mongo {
HealthOptions() {
heartbeatSleepMillis = 2000;
heartbeatTimeoutMillis = 10000;
- heartbeatConnRetries = 3;
+ heartbeatConnRetries = 2;
}
bool isDefault() const { return *this == HealthOptions(); }
diff --git a/db/repl/heartbeat.cpp b/db/repl/heartbeat.cpp
index 78ce5d1..4f28897 100644
--- a/db/repl/heartbeat.cpp
+++ b/db/repl/heartbeat.cpp
@@ -40,6 +40,13 @@ namespace mongo {
// hacky
string *discoveredSeed = 0;
+ long long HeartbeatInfo::timeDown() const {
+ if( up() ) return 0;
+ if( downSince == 0 )
+ return 0; // still waiting on first heartbeat
+ return jsTime() - downSince;
+ }
+
/* { replSetHeartbeat : <setname> } */
class CmdReplSetHeartbeat : public ReplSetCommand {
public:
@@ -55,6 +62,14 @@ namespace mongo {
errmsg = "not running with --replSet";
return false;
}
+
+ /* we want to keep heartbeat connections open when relinquishing primary. tag them here. */
+ {
+ MessagingPort *mp = cc()._mp;
+ if( mp )
+ mp->tag |= 1;
+ }
+
if( cmdObj["pv"].Int() != 1 ) {
errmsg = "incompatible replset protocol version";
return false;
@@ -91,7 +106,7 @@ namespace mongo {
result.append("set", theReplSet->name());
result.append("state", theReplSet->state().s);
result.append("hbmsg", theReplSet->hbmsg());
- result.append("time", (int) time(0));
+ result.append("time", (long long) time(0));
result.appendDate("opTime", theReplSet->lastOpTimeWritten.asDate());
int v = theReplSet->config().version;
result.append("v", v);
@@ -196,7 +211,12 @@ namespace mongo {
static time_t last = 0;
time_t now = time(0);
- if( mem.changed(old) || now-last>4 ) {
+ bool changed = mem.changed(old);
+ if( changed ) {
+ if( old.hbstate != mem.hbstate )
+ log() << "replSet " << h.toString() << ' ' << mem.hbstate.toString() << rsLog;
+ }
+ if( changed || now-last>4 ) {
last = now;
theReplSet->mgr->send( boost::bind(&Manager::msgCheckNewState, theReplSet->mgr) );
}
@@ -205,8 +225,9 @@ namespace mongo {
private:
void down(HeartbeatInfo& mem, string msg) {
mem.health = 0.0;
- if( mem.upSince ) {
+ if( mem.upSince || mem.downSince == 0 ) {
mem.upSince = 0;
+ mem.downSince = jsTime();
log() << "replSet info " << h.toString() << " is now down (or slow to respond)" << rsLog;
}
mem.lastHeartbeatMsg = msg;
diff --git a/db/repl/manager.cpp b/db/repl/manager.cpp
index e870688..862ac46 100644
--- a/db/repl/manager.cpp
+++ b/db/repl/manager.cpp
@@ -29,12 +29,17 @@ namespace mongo {
};
/* check members OTHER THAN US to see if they think they are primary */
- const Member * Manager::findOtherPrimary() {
+ const Member * Manager::findOtherPrimary(bool& two) {
+ two = false;
Member *m = rs->head();
Member *p = 0;
while( m ) {
+ DEV assert( m != rs->_self );
if( m->state().primary() && m->hbinfo().up() ) {
- if( p ) throw "twomasters"; // our polling is asynchronous, so this is often ok.
+ if( p ) {
+ two = true;
+ return 0;
+ }
p = m;
}
m = m->next();
@@ -63,7 +68,11 @@ namespace mongo {
if( rs->box.getPrimary() == m )
return;
rs->_self->lhb() = "";
- rs->box.set(rs->iAmArbiterOnly() ? MemberState::RS_ARBITER : MemberState::RS_RECOVERING, m);
+ if( rs->iAmArbiterOnly() ) {
+ rs->box.set(MemberState::RS_ARBITER, m);
+ } else {
+ rs->box.noteRemoteIsPrimary(m);
+ }
}
/** called as the health threads get new results */
@@ -87,11 +96,14 @@ namespace mongo {
}
const Member *p2;
- try { p2 = findOtherPrimary(); }
- catch(string s) {
- /* two other nodes think they are primary (asynchronously polled) -- wait for things to settle down. */
- log() << "replSet warning DIAG 2 primary" << s << rsLog;
- return;
+ {
+ bool two;
+ p2 = findOtherPrimary(two);
+ if( two ) {
+ /* two other nodes think they are primary (asynchronously polled) -- wait for things to settle down. */
+ log() << "replSet warning DIAG two primaries (transiently)" << rsLog;
+ return;
+ }
}
if( p2 ) {
@@ -136,7 +148,7 @@ namespace mongo {
return;
}
- if( !rs->elect.aMajoritySeemsToBeUp() ) {
+ if( rs->elect.shouldRelinquish() ) {
log() << "replSet can't see a majority of the set, relinquishing primary" << rsLog;
rs->relinquish();
}
diff --git a/db/repl/replset_commands.cpp b/db/repl/replset_commands.cpp
index f8f46d5..328b0ab 100644
--- a/db/repl/replset_commands.cpp
+++ b/db/repl/replset_commands.cpp
@@ -34,19 +34,27 @@ namespace mongo {
*/
bool replSetBlind = false;
+ unsigned replSetForceInitialSyncFailure = 0;
class CmdReplSetTest : public ReplSetCommand {
public:
virtual void help( stringstream &help ) const {
- help << "Just for testing : do not use.\n";
+ help << "Just for regression tests.\n";
}
CmdReplSetTest() : ReplSetCommand("replSetTest") { }
virtual bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) {
+ log() << "replSet replSetTest command received: " << cmdObj.toString() << rsLog;
+ if( cmdObj.hasElement("forceInitialSyncFailure") ) {
+ replSetForceInitialSyncFailure = (unsigned) cmdObj["forceInitialSyncFailure"].Number();
+ return true;
+ }
+
+ // may not need this, but if removed check all tests still work:
if( !check(errmsg, result) )
return false;
+
if( cmdObj.hasElement("blind") ) {
replSetBlind = cmdObj.getBoolField("blind");
- log() << "replSet info replSetTest command received, replSetBlind=" << replSetBlind << rsLog;
return true;
}
return false;
@@ -55,6 +63,7 @@ namespace mongo {
class CmdReplSetGetRBID : public ReplSetCommand {
public:
+ /* todo: ideally this should only change on rollbacks NOT on mongod restarts also. fix... */
int rbid;
virtual void help( stringstream &help ) const {
help << "internal";
@@ -65,12 +74,15 @@ namespace mongo {
virtual bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) {
if( !check(errmsg, result) )
return false;
- result.append("rbid",rbid);
+ result.append("rbid",rbid);
return true;
}
} cmdReplSetRBID;
using namespace bson;
+ void incRBID() {
+ cmdReplSetRBID.rbid++;
+ }
int getRBID(DBClientConnection *c) {
bo info;
c->simpleCommand("admin", &info, "replSetGetRBID");
diff --git a/db/repl/rs.cpp b/db/repl/rs.cpp
index a6737be..1c0444a 100644
--- a/db/repl/rs.cpp
+++ b/db/repl/rs.cpp
@@ -31,39 +31,48 @@ namespace mongo {
extern string *discoveredSeed;
void ReplSetImpl::sethbmsg(string s, int logLevel) {
- static time_t lastLogged;
- if( s == _hbmsg ) {
- // unchanged
- if( time(0)-lastLogged < 60 )
- return;
- }
-
- unsigned sz = s.size();
- if( sz >= 256 )
- memcpy(_hbmsg, s.c_str(), 255);
- else {
- _hbmsg[sz] = 0;
- memcpy(_hbmsg, s.c_str(), sz);
- }
- if( !s.empty() ) {
- lastLogged = time(0);
- log(logLevel) << "replSet " << s << rsLog;
- }
+ static time_t lastLogged;
+ _hbmsgTime = time(0);
+
+ if( s == _hbmsg ) {
+ // unchanged
+ if( _hbmsgTime - lastLogged < 60 )
+ return;
+ }
+
+ unsigned sz = s.size();
+ if( sz >= 256 )
+ memcpy(_hbmsg, s.c_str(), 255);
+ else {
+ _hbmsg[sz] = 0;
+ memcpy(_hbmsg, s.c_str(), sz);
+ }
+ if( !s.empty() ) {
+ lastLogged = _hbmsgTime;
+ log(logLevel) << "replSet " << s << rsLog;
+ }
}
void ReplSetImpl::assumePrimary() {
assert( iAmPotentiallyHot() );
writelock lk("admin."); // so we are synchronized with _logOp()
box.setSelfPrimary(_self);
- log() << "replSet PRIMARY" << rsLog; // self (" << _self->id() << ") is now primary" << rsLog;
+ //log() << "replSet PRIMARY" << rsLog; // self (" << _self->id() << ") is now primary" << rsLog;
}
void ReplSetImpl::changeState(MemberState s) { box.change(s, _self); }
void ReplSetImpl::relinquish() {
if( box.getState().primary() ) {
+ log() << "replSet relinquishing primary state" << rsLog;
changeState(MemberState::RS_RECOVERING);
- log() << "replSet info relinquished primary state" << rsLog;
+
+ /* close sockets that were talking to us */
+ /*log() << "replSet closing sockets after relinquishing primary" << rsLog;
+ MessagingPort::closeAllSockets(1);*/
+
+ // todo: >
+ //changeState(MemberState::RS_SECONDARY);
}
else if( box.getState().startup2() ) {
// ? add comment
@@ -109,11 +118,18 @@ namespace mongo {
}
void ReplSetImpl::_fillIsMasterHost(const Member *m, vector<string>& hosts, vector<string>& passives, vector<string>& arbiters) {
+ if( m->config().hidden )
+ return;
+
if( m->potentiallyHot() ) {
hosts.push_back(m->h().toString());
}
else if( !m->config().arbiterOnly ) {
- passives.push_back(m->h().toString());
+ if( m->config().slaveDelay ) {
+ /* hmmm - we don't list these as they are stale. */
+ } else {
+ passives.push_back(m->h().toString());
+ }
}
else {
arbiters.push_back(m->h().toString());
@@ -152,6 +168,10 @@ namespace mongo {
}
if( myConfig().arbiterOnly )
b.append("arbiterOnly", true);
+ if( myConfig().slaveDelay )
+ b.append("slaveDelay", myConfig().slaveDelay);
+ if( myConfig().hidden )
+ b.append("hidden", true);
}
/** @param cfgString <setname>/<seedhost1>,<seedhost2> */
@@ -200,6 +220,7 @@ namespace mongo {
_self(0),
mgr( new Manager(this) )
{
+ _cfg = 0;
memset(_hbmsg, 0, sizeof(_hbmsg));
*_hbmsg = '.'; // temp...just to see
lastH = 0;
@@ -245,7 +266,7 @@ namespace mongo {
loadLastOpTimeWritten();
}
catch(std::exception& e) {
- log() << "replSet ERROR FATAL couldn't query the local " << rsoplog << " collection. Terminating mongod after 30 seconds." << rsLog;
+ log() << "replSet error fatal couldn't query the local " << rsoplog << " collection. Terminating mongod after 30 seconds." << rsLog;
log() << e.what() << rsLog;
sleepsecs(30);
dbexit( EXIT_REPLICATION_ERROR );
@@ -260,25 +281,64 @@ namespace mongo {
ReplSetImpl::StartupStatus ReplSetImpl::startupStatus = PRESTART;
string ReplSetImpl::startupStatusMsg;
- // true if ok; throws if config really bad; false if config doesn't include self
- bool ReplSetImpl::initFromConfig(ReplSetConfig& c) {
+ extern BSONObj *getLastErrorDefault;
+
+ /** @param reconf true if this is a reconfiguration and not an initial load of the configuration.
+ @return true if ok; throws if config really bad; false if config doesn't include self
+ */
+ bool ReplSetImpl::initFromConfig(ReplSetConfig& c, bool reconf) {
+ /* NOTE: haveNewConfig() writes the new config to disk before we get here. So
+ we cannot error out at this point, except fatally. Check errors earlier.
+ */
lock lk(this);
+ if( getLastErrorDefault || !c.getLastErrorDefaults.isEmpty() ) {
+ // see comment in dbcommands.cpp for getlasterrordefault
+ getLastErrorDefault = new BSONObj( c.getLastErrorDefaults );
+ }
+
+ list<const ReplSetConfig::MemberCfg*> newOnes;
+ bool additive = reconf;
{
+ unsigned nfound = 0;
int me = 0;
for( vector<ReplSetConfig::MemberCfg>::iterator i = c.members.begin(); i != c.members.end(); i++ ) {
const ReplSetConfig::MemberCfg& m = *i;
if( m.h.isSelf() ) {
+ nfound++;
me++;
+
+ if( !reconf || (_self && _self->id() == (unsigned) m._id) )
+ ;
+ else {
+ log() << "replSet " << _self->id() << ' ' << m._id << rsLog;
+ assert(false);
+ }
+ }
+ else if( reconf ) {
+ const Member *old = findById(m._id);
+ if( old ) {
+ nfound++;
+ assert( (int) old->id() == m._id );
+ if( old->config() == m ) {
+ additive = false;
+ }
+ }
+ else {
+ newOnes.push_back(&m);
+ }
}
}
if( me == 0 ) {
// log() << "replSet config : " << _cfg->toString() << rsLog;
- log() << "replSet warning can't find self in the repl set configuration:" << rsLog;
+ log() << "replSet error can't find self in the repl set configuration:" << rsLog;
log() << c.toString() << rsLog;
- return false;
+ assert(false);
}
uassert( 13302, "replSet error self appears twice in the repl set configuration", me<=1 );
+
+ if( reconf && config().members.size() != nfound )
+ additive = false;
}
_cfg = new ReplSetConfig(c);
@@ -287,6 +347,24 @@ namespace mongo {
_name = _cfg->_id;
assert( !_name.empty() );
+ if( additive ) {
+ log() << "replSet info : additive change to configuration" << rsLog;
+ for( list<const ReplSetConfig::MemberCfg*>::const_iterator i = newOnes.begin(); i != newOnes.end(); i++ ) {
+ const ReplSetConfig::MemberCfg* m = *i;
+ Member *mi = new Member(m->h, m->_id, m, false);
+
+ /** we will indicate that new members are up() initially so that we don't relinquish our
+ primary state because we can't (transiently) see a majority. they should be up as we
+ check that new members are up before getting here on reconfig anyway.
+ */
+ mi->get_hbinfo().health = 0.1;
+
+ _members.push(mi);
+ startHealthTaskFor(mi);
+ }
+ return true;
+ }
+
// start with no members. if this is a reconfig, drop the old ones.
_members.orphanAll();
@@ -416,6 +494,7 @@ namespace mongo {
startupStatusMsg = "replSet error loading set config (BADCONFIG)";
log() << "replSet error loading configurations " << e.toString() << rsLog;
log() << "replSet error replication will not start" << rsLog;
+ sethbmsg("error loading set config");
_fatal();
throw;
}
@@ -429,11 +508,10 @@ namespace mongo {
{
//lock l(this);
box.set(MemberState::RS_FATAL, 0);
- sethbmsg("fatal error");
- log() << "replSet error fatal error, stopping replication" << rsLog;
+ //sethbmsg("fatal error");
+ log() << "replSet error fatal, stopping replication" << rsLog;
}
-
void ReplSet::haveNewConfig(ReplSetConfig& newConfig, bool addComment) {
lock l(this); // convention is to lock replset before taking the db rwlock
writelock lk("");
@@ -442,7 +520,7 @@ namespace mongo {
comment = BSON( "msg" << "Reconfig set" << "version" << newConfig.version );
newConfig.saveConfigLocally(comment);
try {
- initFromConfig(newConfig);
+ initFromConfig(newConfig, true);
log() << "replSet replSetReconfig new config saved locally" << rsLog;
}
catch(DBException& e) {
diff --git a/db/repl/rs.h b/db/repl/rs.h
index 17a070c..6c4d9a8 100644
--- a/db/repl/rs.h
+++ b/db/repl/rs.h
@@ -44,19 +44,19 @@ namespace mongo {
public:
Member(HostAndPort h, unsigned ord, const ReplSetConfig::MemberCfg *c, bool self);
string fullName() const { return h().toString(); }
- const ReplSetConfig::MemberCfg& config() const { return *_config; }
+ const ReplSetConfig::MemberCfg& config() const { return _config; }
const HeartbeatInfo& hbinfo() const { return _hbinfo; }
- string lhb() { return _hbinfo.lastHeartbeatMsg; }
+ HeartbeatInfo& get_hbinfo() { return _hbinfo; }
+ string lhb() const { return _hbinfo.lastHeartbeatMsg; }
MemberState state() const { return _hbinfo.hbstate; }
const HostAndPort& h() const { return _h; }
unsigned id() const { return _hbinfo.id(); }
- bool potentiallyHot() const { return _config->potentiallyHot(); } // not arbiter, not priority 0
-
+ bool potentiallyHot() const { return _config.potentiallyHot(); } // not arbiter, not priority 0
void summarizeMember(stringstream& s) const;
friend class ReplSetImpl;
private:
- const ReplSetConfig::MemberCfg *_config; /* todo: when this changes??? */
- HostAndPort _h;
+ const ReplSetConfig::MemberCfg _config;
+ const HostAndPort _h;
HeartbeatInfo _hbinfo;
};
@@ -64,7 +64,13 @@ namespace mongo {
ReplSetImpl *rs;
bool busyWithElectSelf;
int _primary;
- const Member* findOtherPrimary();
+
+ /** @param two - if true two primaries were seen. this can happen transiently, in addition to our
+ polling being only occasional. in this case null is returned, but the caller should
+ not assume primary itself in that situation.
+ */
+ const Member* findOtherPrimary(bool& two);
+
void noteARemoteIsPrimary(const Member *);
virtual void starting();
public:
@@ -102,6 +108,7 @@ namespace mongo {
int totalVotes() const;
bool aMajoritySeemsToBeUp() const;
+ bool shouldRelinquish() const;
void electSelf();
void electCmdReceived(BSONObj, BSONObjBuilder*);
void multiCommand(BSONObj cmd, list<Target>& L);
@@ -119,8 +126,8 @@ namespace mongo {
protected:
RSBase() : magic(0x12345677), m("RSBase"), _locked(0) { }
~RSBase() {
- log() << "~RSBase should never be called?" << rsLog;
- assert(false);
+ /* this can happen if we throw in the constructor; otherwise never happens. thus we log it as it is quite unusual. */
+ log() << "replSet ~RSBase called" << rsLog;
}
class lock {
@@ -175,6 +182,9 @@ namespace mongo {
const Member* getPrimary() const { return sp.primary; }
void change(MemberState s, const Member *self) {
scoped_lock lk(m);
+ if( sp.state != s ) {
+ log() << "replSet " << s.toString() << rsLog;
+ }
sp.state = s;
if( s.primary() ) {
sp.primary = self;
@@ -194,6 +204,12 @@ namespace mongo {
assert( !sp.state.primary() );
sp.primary = mem;
}
+ void noteRemoteIsPrimary(const Member *remote) {
+ scoped_lock lk(m);
+ if( !sp.state.secondary() && !sp.state.fatal() )
+ sp.state = MemberState::RS_RECOVERING;
+ sp.primary = remote;
+ }
StateBox() : m("StateBox") { }
private:
mutex m;
@@ -226,7 +242,6 @@ namespace mongo {
};
static StartupStatus startupStatus;
static string startupStatusMsg;
- static string stateAsStr(MemberState state);
static string stateAsHtml(MemberState state);
/* todo thread */
@@ -241,28 +256,24 @@ namespace mongo {
void endOldHealthTasks();
void startHealthTaskFor(Member *m);
- private:
Consensus elect;
- bool ok() const { return !box.getState().fatal(); }
-
void relinquish();
void forgetPrimary();
-
protected:
bool _stepDown();
private:
void assumePrimary();
void loadLastOpTimeWritten();
void changeState(MemberState s);
-
protected:
// "heartbeat message"
// sent in requestHeartbeat respond in field "hbm"
char _hbmsg[256]; // we change this unlocked, thus not an stl::string
+ time_t _hbmsgTime; // when it was logged
public:
void sethbmsg(string s, int logLevel = 0);
protected:
- bool initFromConfig(ReplSetConfig& c); // true if ok; throws if config really bad; false if config doesn't include self
+ bool initFromConfig(ReplSetConfig& c, bool reconf=false); // true if ok; throws if config really bad; false if config doesn't include self
void _fillIsMaster(BSONObjBuilder&);
void _fillIsMasterHost(const Member*, vector<string>&, vector<string>&, vector<string>&);
const ReplSetConfig& config() { return *_cfg; }
@@ -323,8 +334,10 @@ namespace mongo {
void _syncDoInitialSync();
void syncDoInitialSync();
void _syncThread();
+ bool tryToGoLiveAsASecondary(OpTime&); // readlocks
void syncTail();
void syncApply(const BSONObj &o);
+ unsigned _syncRollback(OplogReader& r);
void syncRollback(OplogReader& r);
void syncFixUp(HowToFixUp& h, OplogReader& r);
public:
@@ -344,9 +357,10 @@ namespace mongo {
/* call after constructing to start - returns fairly quickly after launching its threads */
void go() { _go(); }
+
void fatal() { _fatal(); }
- bool isPrimary();
- bool isSecondary();
+ bool isPrimary() { return box.getState().primary(); }
+ bool isSecondary() { return box.getState().secondary(); }
MemberState state() const { return ReplSetImpl::state(); }
string name() const { return ReplSetImpl::name(); }
const ReplSetConfig& config() { return ReplSetImpl::config(); }
@@ -366,7 +380,10 @@ namespace mongo {
bool lockedByMe() { return RSBase::lockedByMe(); }
// heartbeat msg to send to others; descriptive diagnostic info
- string hbmsg() const { return _hbmsg; }
+ string hbmsg() const {
+ if( time(0)-_hbmsgTime > 120 ) return "";
+ return _hbmsg;
+ }
};
/** base class for repl set commands. checks basic things such as in rs mode before the command
@@ -388,6 +405,8 @@ namespace mongo {
if( theReplSet == 0 ) {
result.append("startupStatus", ReplSet::startupStatus);
errmsg = ReplSet::startupStatusMsg.empty() ? "replset unknown error 2" : ReplSet::startupStatusMsg;
+ if( ReplSet::startupStatus == 3 )
+ result.append("info", "run rs.initiate(...) if not yet done for the set");
return false;
}
return true;
@@ -397,19 +416,10 @@ namespace mongo {
/** inlines ----------------- */
inline Member::Member(HostAndPort h, unsigned ord, const ReplSetConfig::MemberCfg *c, bool self) :
- _config(c), _h(h), _hbinfo(ord) {
- if( self ) {
- _hbinfo.health = 1.0;
- }
- }
-
- inline bool ReplSet::isPrimary() {
- /* todo replset */
- return box.getState().primary();
- }
-
- inline bool ReplSet::isSecondary() {
- return box.getState().secondary();
+ _config(*c), _h(h), _hbinfo(ord)
+ {
+ if( self )
+ _hbinfo.health = 1.0;
}
}
diff --git a/db/repl/rs_config.cpp b/db/repl/rs_config.cpp
index 76b20a4..85c9a46 100644
--- a/db/repl/rs_config.cpp
+++ b/db/repl/rs_config.cpp
@@ -31,6 +31,16 @@ namespace mongo {
void logOpInitiate(const bo&);
+ void assertOnlyHas(BSONObj o, const set<string>& fields) {
+ BSONObj::iterator i(o);
+ while( i.more() ) {
+ BSONElement e = i.next();
+ if( !fields.count( e.fieldName() ) ) {
+ uasserted(13434, str::stream() << "unexpected field '" << e.fieldName() << "'in object");
+ }
+ }
+ }
+
list<HostAndPort> ReplSetConfig::otherMemberHostnames() const {
list<HostAndPort> L;
for( vector<MemberCfg>::const_iterator i = members.begin(); i != members.end(); i++ ) {
@@ -42,7 +52,7 @@ namespace mongo {
/* comment MUST only be set when initiating the set by the initiator */
void ReplSetConfig::saveConfigLocally(bo comment) {
- check();
+ checkRsConfig();
log() << "replSet info saving a newer config version to local.system.replset" << rsLog;
{
writelock lk("");
@@ -81,6 +91,8 @@ namespace mongo {
if( votes != 1 ) b << "votes" << votes;
if( priority != 1.0 ) b << "priority" << priority;
if( arbiterOnly ) b << "arbiterOnly" << true;
+ if( slaveDelay ) b << "slaveDelay" << slaveDelay;
+ if( hidden ) b << "hidden" << hidden;
return b.obj();
}
@@ -115,9 +127,17 @@ namespace mongo {
mchk(priority >= 0 && priority <= 1000);
mchk(votes >= 0 && votes <= 100);
uassert(13419, "this version of mongod only supports priorities 0 and 1", priority == 0 || priority == 1);
+ uassert(13437, "slaveDelay requires priority be zero", slaveDelay == 0 || priority == 0);
+ uassert(13438, "bad slaveDelay value", slaveDelay >= 0 && slaveDelay <= 3600 * 24 * 366);
+ uassert(13439, "priority must be 0 when hidden=true", priority == 0 || !hidden);
}
+ /** @param o old config
+ @param n new config
+ */
/*static*/ bool ReplSetConfig::legalChange(const ReplSetConfig& o, const ReplSetConfig& n, string& errmsg) {
+ assert( theReplSet );
+
if( o._id != n._id ) {
errmsg = "set name may not change";
return false;
@@ -131,6 +151,25 @@ namespace mongo {
return false;
}
+ map<HostAndPort,const ReplSetConfig::MemberCfg*> old;
+ for( vector<ReplSetConfig::MemberCfg>::const_iterator i = o.members.begin(); i != o.members.end(); i++ ) {
+ old[i->h] = &(*i);
+ }
+ int me = 0;
+ for( vector<ReplSetConfig::MemberCfg>::const_iterator i = n.members.begin(); i != n.members.end(); i++ ) {
+ const ReplSetConfig::MemberCfg& m = *i;
+ if( old.count(m.h) ) {
+ if( old[m.h]->_id != m._id ) {
+ log() << "replSet reconfig error with member: " << m.h.toString() << rsLog;
+ uasserted(13432, "_id may not change for members");
+ }
+ }
+ if( m.h.isSelf() )
+ me++;
+ }
+
+ uassert(13433, "can't find self in new replset config", me == 1);
+
/* TODO : MORE CHECKS HERE */
log() << "replSet TODO : don't allow removal of a node until we handle it at the removed node end?" << endl;
@@ -144,7 +183,7 @@ namespace mongo {
_ok = false;
}
- void ReplSetConfig::check() const {
+ void ReplSetConfig::checkRsConfig() const {
uassert(13132,
"nonmatching repl set name in _id field; check --replSet command line",
_id == cmdLine.ourSetName());
@@ -154,6 +193,10 @@ namespace mongo {
}
void ReplSetConfig::from(BSONObj o) {
+ static const string legal[] = {"_id","version", "members","settings"};
+ static const set<string> legals(legal, legal + 4);
+ assertOnlyHas(o, legals);
+
md5 = o.md5();
_id = o["_id"].String();
if( o["version"].ok() ) {
@@ -170,7 +213,7 @@ namespace mongo {
if( settings["heartbeatTimeout"].ok() )
ho.heartbeatTimeoutMillis = (unsigned) (settings["heartbeatTimeout"].Number() * 1000);
ho.check();
- try { getLastErrorDefaults = settings["getLastErrorDefaults"].Obj(); } catch(...) { }
+ try { getLastErrorDefaults = settings["getLastErrorDefaults"].Obj().copy(); } catch(...) { }
}
set<string> hosts;
@@ -188,6 +231,10 @@ namespace mongo {
BSONObj mobj = members[i].Obj();
MemberCfg m;
try {
+ static const string legal[] = {"_id","votes","priority","host","hidden","slaveDelay","arbiterOnly"};
+ static const set<string> legals(legal, legal + 7);
+ assertOnlyHas(mobj, legals);
+
try {
m._id = (int) mobj["_id"].Number();
} catch(...) {
@@ -205,6 +252,9 @@ namespace mongo {
if( m.h.isLocalHost() )
localhosts++;
m.arbiterOnly = mobj.getBoolField("arbiterOnly");
+ m.slaveDelay = mobj["slaveDelay"].numberInt();
+ if( mobj.hasElement("hidden") )
+ m.hidden = mobj.getBoolField("hidden");
if( mobj.hasElement("priority") )
m.priority = mobj["priority"].Number();
if( mobj.hasElement("votes") )
@@ -308,6 +358,7 @@ namespace mongo {
BSONObj o = c->nextSafe();
uassert(13109, "multiple rows in " + rsConfigNs + " not supported", !c->more());
from(o);
+ checkRsConfig();
_ok = true;
log(level) << "replSet load config ok from " << (h.isSelf() ? "self" : h.toString()) << rsLog;
}
diff --git a/db/repl/rs_config.h b/db/repl/rs_config.h
index 38df772..e39dad7 100644
--- a/db/repl/rs_config.h
+++ b/db/repl/rs_config.h
@@ -41,18 +41,27 @@ namespace mongo {
bool ok() const { return _ok; }
struct MemberCfg {
- MemberCfg() : _id(-1), votes(1), priority(1.0), arbiterOnly(false) { }
+ MemberCfg() : _id(-1), votes(1), priority(1.0), arbiterOnly(false), slaveDelay(0), hidden(false) { }
int _id; /* ordinal */
unsigned votes; /* how many votes this node gets. default 1. */
HostAndPort h;
double priority; /* 0 means can never be primary */
bool arbiterOnly;
+ int slaveDelay; /* seconds. int rather than unsigned for convenient to/from bson conversion. */
+ bool hidden; /* if set, don't advertise to drivers in isMaster. for non-primaries (priority 0) */
+
void check() const; /* check validity, assert if not. */
BSONObj asBson() const;
bool potentiallyHot() const {
return !arbiterOnly && priority > 0;
}
+ bool operator==(const MemberCfg& r) const {
+ return _id==r._id && votes == r.votes && h == r.h && priority == r.priority &&
+ arbiterOnly == r.arbiterOnly && slaveDelay == r.slaveDelay && hidden == r.hidden;
+ }
+ bool operator!=(const MemberCfg& r) const { return !(*this == r); }
};
+
vector<MemberCfg> members;
string _id;
int version;
@@ -68,7 +77,7 @@ namespace mongo {
string toString() const { return asBson().toString(); }
/** validate the settings. does not call check() on each member, you have to do that separately. */
- void check() const;
+ void checkRsConfig() const;
/** check if modification makes sense */
static bool legalChange(const ReplSetConfig& old, const ReplSetConfig& n, string& errmsg);
diff --git a/db/repl/rs_initialsync.cpp b/db/repl/rs_initialsync.cpp
index 4c6bd4d..3851c66 100644
--- a/db/repl/rs_initialsync.cpp
+++ b/db/repl/rs_initialsync.cpp
@@ -66,7 +66,7 @@ namespace mongo {
void _logOpObjRS(const BSONObj& op);
- bool copyCollectionFromRemote(const string& host, const string& ns, const BSONObj& query, string errmsg);
+ bool copyCollectionFromRemote(const string& host, const string& ns, const BSONObj& query, string &errmsg, bool logforrepl);
static void emptyOplog() {
writelock lk(rsoplog);
diff --git a/db/repl/rs_member.h b/db/repl/rs_member.h
index 4f6846a..6a797b5 100644
--- a/db/repl/rs_member.h
+++ b/db/repl/rs_member.h
@@ -40,7 +40,8 @@ namespace mongo {
RS_STARTUP2,
RS_UNKNOWN, /* remote node not yet reached */
RS_ARBITER,
- RS_DOWN /* node not reachable for a report */
+ RS_DOWN, /* node not reachable for a report */
+ RS_ROLLBACK
} s;
MemberState(MS ms = RS_UNKNOWN) : s(ms) { }
@@ -51,6 +52,9 @@ namespace mongo {
bool recovering() const { return s == RS_RECOVERING; }
bool startup2() const { return s == RS_STARTUP2; }
bool fatal() const { return s == RS_FATAL; }
+ bool rollback() const { return s == RS_ROLLBACK; }
+
+ string toString() const;
bool operator==(const MemberState& r) const { return s == r.s; }
bool operator!=(const MemberState& r) const { return s != r.s; }
@@ -61,26 +65,31 @@ namespace mongo {
class HeartbeatInfo {
unsigned _id;
public:
- HeartbeatInfo() : _id(0xffffffff),skew(INT_MIN) { }
+ HeartbeatInfo() : _id(0xffffffff),hbstate(MemberState::RS_UNKNOWN),health(-1.0),downSince(0),skew(INT_MIN) { }
HeartbeatInfo(unsigned id);
bool up() const { return health > 0; }
unsigned id() const { return _id; }
MemberState hbstate;
double health;
time_t upSince;
+ long long downSince;
time_t lastHeartbeat;
string lastHeartbeatMsg;
OpTime opTime;
int skew;
+ long long timeDown() const; // ms
+
/* true if changed in a way of interest to the repl set manager. */
bool changed(const HeartbeatInfo& old) const;
};
inline HeartbeatInfo::HeartbeatInfo(unsigned id) : _id(id) {
- health = -1.0;
- lastHeartbeat = upSince = 0;
- skew = INT_MIN;
+ hbstate = MemberState::RS_UNKNOWN;
+ health = -1.0;
+ downSince = 0;
+ lastHeartbeat = upSince = 0;
+ skew = INT_MIN;
}
inline bool HeartbeatInfo::changed(const HeartbeatInfo& old) const {
diff --git a/db/repl/rs_rollback.cpp b/db/repl/rs_rollback.cpp
index 1bb7217..6b2544c 100644
--- a/db/repl/rs_rollback.cpp
+++ b/db/repl/rs_rollback.cpp
@@ -62,6 +62,14 @@ namespace mongo {
using namespace bson;
+ bool copyCollectionFromRemote(const string& host, const string& ns, const BSONObj& query, string& errmsg, bool logforrepl);
+ void incRBID();
+
+ class rsfatal : public std::exception {
+ public:
+ virtual const char* what() const throw(){ return "replica set fatal exception"; }
+ };
+
struct DocID {
const char *ns;
be _id;
@@ -81,6 +89,8 @@ namespace mongo {
/* collections to drop */
set<string> toDrop;
+ set<string> collectionsToResync;
+
OpTime commonPoint;
DiskLoc commonPointOurDiskloc;
@@ -113,17 +123,59 @@ namespace mongo {
if( *op == 'c' ) {
be first = o.firstElement();
NamespaceString s(d.ns); // foo.$cmd
-
- if( string("create") == first.fieldName() ) {
- /* Create collection operation
- { ts: ..., h: ..., op: "c", ns: "foo.$cmd", o: { create: "abc", ... } }
- */
- string ns = s.db + '.' + o["create"].String(); // -> foo.abc
- h.toDrop.insert(ns);
+ string cmdname = first.fieldName();
+ Command *cmd = Command::findCommand(cmdname.c_str());
+ if( cmd == 0 ) {
+ log() << "replSet warning rollback no suchcommand " << first.fieldName() << " - different mongod versions perhaps?" << rsLog;
return;
}
- else {
- log() << "replSet WARNING can't roll back this command yet: " << o.toString() << rsLog;
+ else {
+ /* findandmodify - translated?
+ godinsert?,
+ renamecollection a->b. just resync a & b
+ */
+ if( cmdname == "create" ) {
+ /* Create collection operation
+ { ts: ..., h: ..., op: "c", ns: "foo.$cmd", o: { create: "abc", ... } }
+ */
+ string ns = s.db + '.' + o["create"].String(); // -> foo.abc
+ h.toDrop.insert(ns);
+ return;
+ }
+ else if( cmdname == "drop" ) {
+ string ns = s.db + '.' + first.valuestr();
+ h.collectionsToResync.insert(ns);
+ return;
+ }
+ else if( cmdname == "dropIndexes" || cmdname == "deleteIndexes" ) {
+ /* TODO: this is bad. we simply full resync the collection here, which could be very slow. */
+ log() << "replSet info rollback of dropIndexes is slow in this version of mongod" << rsLog;
+ string ns = s.db + '.' + first.valuestr();
+ h.collectionsToResync.insert(ns);
+ return;
+ }
+ else if( cmdname == "renameCollection" ) {
+ /* TODO: slow. */
+ log() << "replSet info rollback of renameCollection is slow in this version of mongod" << rsLog;
+ string from = first.valuestr();
+ string to = o["to"].String();
+ h.collectionsToResync.insert(from);
+ h.collectionsToResync.insert(to);
+ return;
+ }
+ else if( cmdname == "reIndex" ) {
+ return;
+ }
+ else if( cmdname == "dropDatabase" ) {
+ log() << "replSet error rollback : can't rollback drop database full resync will be required" << rsLog;
+ log() << "replSet " << o.toString() << rsLog;
+ throw rsfatal();
+ }
+ else {
+ log() << "replSet error can't rollback this command yet: " << o.toString() << rsLog;
+ log() << "replSet cmdname=" << cmdname << rsLog;
+ throw rsfatal();
+ }
}
}
@@ -141,8 +193,6 @@ namespace mongo {
static void syncRollbackFindCommonPoint(DBClientConnection *them, HowToFixUp& h) {
static time_t last;
if( time(0)-last < 60 ) {
- // this could put a lot of load on someone else, don't repeat too often
- sleepsecs(10);
throw "findcommonpoint waiting a while before trying again";
}
last = time(0);
@@ -170,12 +220,14 @@ namespace mongo {
BSONObj theirObj = t->nextSafe();
OpTime theirTime = theirObj["ts"]._opTime();
- if( 1 ) {
+ {
long long diff = (long long) ourTime.getSecs() - ((long long) theirTime.getSecs());
/* diff could be positive, negative, or zero */
- log() << "replSet info syncRollback diff in end of log times : " << diff << " seconds" << rsLog;
+ log() << "replSet info rollback our last optime: " << ourTime.toStringPretty() << rsLog;
+ log() << "replSet info rollback their last optime: " << theirTime.toStringPretty() << rsLog;
+ log() << "replSet info rollback diff in end of log times: " << diff << " seconds" << rsLog;
if( diff > 3600 ) {
- log() << "replSet syncRollback too long a time period for a rollback." << rsLog;
+ log() << "replSet rollback too long a time period for a rollback." << rsLog;
throw "error not willing to roll back more than one hour of data";
}
}
@@ -197,16 +249,35 @@ namespace mongo {
refetch(h, ourObj);
+ if( !t->more() ) {
+ log() << "replSet rollback error RS100 reached beginning of remote oplog" << rsLog;
+ log() << "replSet them: " << them->toString() << " scanned: " << scanned << rsLog;
+ log() << "replSet theirTime: " << theirTime.toStringLong() << rsLog;
+ log() << "replSet ourTime: " << ourTime.toStringLong() << rsLog;
+ throw "RS100 reached beginning of remote oplog [2]";
+ }
theirObj = t->nextSafe();
theirTime = theirObj["ts"]._opTime();
u.advance();
- if( !u.ok() ) throw "reached beginning of local oplog";
+ if( !u.ok() ) {
+ log() << "replSet rollback error RS101 reached beginning of local oplog" << rsLog;
+ log() << "replSet them: " << them->toString() << " scanned: " << scanned << rsLog;
+ log() << "replSet theirTime: " << theirTime.toStringLong() << rsLog;
+ log() << "replSet ourTime: " << ourTime.toStringLong() << rsLog;
+ throw "RS101 reached beginning of local oplog [1]";
+ }
ourObj = u.current();
ourTime = ourObj["ts"]._opTime();
}
else if( theirTime > ourTime ) {
- /* todo: we could hit beginning of log here. exception thrown is ok but not descriptive, so fix up */
+ if( !t->more() ) {
+ log() << "replSet rollback error RS100 reached beginning of remote oplog" << rsLog;
+ log() << "replSet them: " << them->toString() << " scanned: " << scanned << rsLog;
+ log() << "replSet theirTime: " << theirTime.toStringLong() << rsLog;
+ log() << "replSet ourTime: " << ourTime.toStringLong() << rsLog;
+ throw "RS100 reached beginning of remote oplog [1]";
+ }
theirObj = t->nextSafe();
theirTime = theirObj["ts"]._opTime();
}
@@ -214,7 +285,13 @@ namespace mongo {
// theirTime < ourTime
refetch(h, ourObj);
u.advance();
- if( !u.ok() ) throw "reached beginning of local oplog";
+ if( !u.ok() ) {
+ log() << "replSet rollback error RS101 reached beginning of local oplog" << rsLog;
+ log() << "replSet them: " << them->toString() << " scanned: " << scanned << rsLog;
+ log() << "replSet theirTime: " << theirTime.toStringLong() << rsLog;
+ log() << "replSet ourTime: " << ourTime.toStringLong() << rsLog;
+ throw "RS101 reached beginning of local oplog [2]";
+ }
ourObj = u.current();
ourTime = ourObj["ts"]._opTime();
}
@@ -226,7 +303,19 @@ namespace mongo {
bson::bo goodVersionOfObject;
};
- void ReplSetImpl::syncFixUp(HowToFixUp& h, OplogReader& r) {
+ static void setMinValid(bo newMinValid) {
+ try {
+ log() << "replSet minvalid=" << newMinValid["ts"]._opTime().toStringLong() << rsLog;
+ }
+ catch(...) { }
+ {
+ Helpers::putSingleton("local.replset.minvalid", newMinValid);
+ Client::Context cx( "local." );
+ cx.db()->flushFiles(true);
+ }
+ }
+
+ void ReplSetImpl::syncFixUp(HowToFixUp& h, OplogReader& r) {
DBClientConnection *them = r.conn();
// fetch all first so we needn't handle interruption in a fancy way
@@ -237,6 +326,7 @@ namespace mongo {
bo newMinValid;
+ /* fetch all the goodVersions of each document from current primary */
DocID d;
unsigned long long n = 0;
try {
@@ -258,43 +348,98 @@ namespace mongo {
}
newMinValid = r.getLastOp(rsoplog);
if( newMinValid.isEmpty() ) {
- sethbmsg("syncRollback error newMinValid empty?");
+ sethbmsg("rollback error newMinValid empty?");
return;
}
}
catch(DBException& e) {
- sethbmsg(str::stream() << "syncRollback re-get objects: " << e.toString(),0);
- log() << "syncRollback couldn't re-get ns:" << d.ns << " _id:" << d._id << ' ' << n << '/' << h.toRefetch.size() << rsLog;
+ sethbmsg(str::stream() << "rollback re-get objects: " << e.toString(),0);
+ log() << "rollback couldn't re-get ns:" << d.ns << " _id:" << d._id << ' ' << n << '/' << h.toRefetch.size() << rsLog;
throw e;
}
- sethbmsg("syncRollback 3.5");
+ MemoryMappedFile::flushAll(true);
+
+ sethbmsg("rollback 3.5");
if( h.rbid != getRBID(r.conn()) ) {
// our source rolled back itself. so the data we received isn't necessarily consistent.
- sethbmsg("syncRollback rbid on source changed during rollback, cancelling this attempt");
+ sethbmsg("rollback rbid on source changed during rollback, cancelling this attempt");
return;
}
// update them
- sethbmsg(str::stream() << "syncRollback 4 n:" << goodVersions.size());
+ sethbmsg(str::stream() << "rollback 4 n:" << goodVersions.size());
bool warn = false;
assert( !h.commonPointOurDiskloc.isNull() );
- MemoryMappedFile::flushAll(true);
-
dbMutex.assertWriteLocked();
/* we have items we are writing that aren't from a point-in-time. thus best not to come online
until we get to that point in freshness. */
- try {
- log() << "replSet set minvalid=" << newMinValid["ts"]._opTime().toString() << rsLog;
+ setMinValid(newMinValid);
+
+ /** any full collection resyncs required? */
+ if( !h.collectionsToResync.empty() ) {
+ for( set<string>::iterator i = h.collectionsToResync.begin(); i != h.collectionsToResync.end(); i++ ) {
+ string ns = *i;
+ sethbmsg(str::stream() << "rollback 4.1 coll resync " << ns);
+ Client::Context c(*i, dbpath, 0, /*doauth*/false);
+ try {
+ bob res;
+ string errmsg;
+ dropCollection(ns, errmsg, res);
+ {
+ dbtemprelease r;
+ bool ok = copyCollectionFromRemote(them->getServerAddress(), ns, bo(), errmsg, false);
+ if( !ok ) {
+ log() << "replSet rollback error resyncing collection " << ns << ' ' << errmsg << rsLog;
+ throw "rollback error resyncing rollection [1]";
+ }
+ }
+ }
+ catch(...) {
+ log() << "replset rollback error resyncing collection " << ns << rsLog;
+ throw "rollback error resyncing rollection [2]";
+ }
+ }
+
+ /* we did more reading from primary, so check it again for a rollback (which would mess us up), and
+ make minValid newer.
+ */
+ sethbmsg("rollback 4.2");
+ {
+ string err;
+ try {
+ newMinValid = r.getLastOp(rsoplog);
+ if( newMinValid.isEmpty() ) {
+ err = "can't get minvalid from primary";
+ } else {
+ setMinValid(newMinValid);
+ }
+ }
+ catch(...) {
+ err = "can't get/set minvalid";
+ }
+ if( h.rbid != getRBID(r.conn()) ) {
+ // our source rolled back itself. so the data we received isn't necessarily consistent.
+ // however, we've now done writes. thus we have a problem.
+ err += "rbid at primary changed during resync/rollback";
+ }
+ if( !err.empty() ) {
+ log() << "replSet error rolling back : " << err << ". A full resync will be necessary." << rsLog;
+ /* todo: reset minvalid so that we are permanently in fatal state */
+ /* todo: don't be fatal, but rather, get all the data first. */
+ sethbmsg("rollback error");
+ throw rsfatal();
+ }
+ }
+ sethbmsg("rollback 4.3");
}
- catch(...){}
- Helpers::putSingleton("local.replset.minvalid", newMinValid);
- /** first drop collections to drop - that might make things faster below actually if there were subsequent inserts */
+ sethbmsg("rollback 4.6");
+ /** drop collections to drop before doing individual fixups - that might make things faster below actually if there were subsequent inserts to rollback */
for( set<string>::iterator i = h.toDrop.begin(); i != h.toDrop.end(); i++ ) {
Client::Context c(*i, dbpath, 0, /*doauth*/false);
try {
@@ -308,6 +453,7 @@ namespace mongo {
}
}
+ sethbmsg("rollback 4.7");
Client::Context c(rsoplog, dbpath, 0, /*doauth*/false);
NamespaceDetails *oplogDetails = nsdetails(rsoplog);
uassert(13423, str::stream() << "replSet error in rollback can't find " << rsoplog, oplogDetails);
@@ -320,7 +466,12 @@ namespace mongo {
bo pattern = d._id.wrap(); // { _id : ... }
try {
assert( d.ns && *d.ns );
-
+ if( h.collectionsToResync.count(d.ns) ) {
+ /* we just synced this entire collection */
+ continue;
+ }
+
+ /* keep an archive of items rolled back */
shared_ptr<RemoveSaver>& rs = removeSavers[d.ns];
if ( ! rs )
rs.reset( new RemoveSaver( "rollback" , "" , d.ns ) );
@@ -343,7 +494,7 @@ namespace mongo {
long long start = Listener::getElapsedTimeMillis();
DiskLoc loc = Helpers::findOne(d.ns, pattern, false);
if( Listener::getElapsedTimeMillis() - start > 200 )
- log() << "replSet warning roll back slow no _id index for " << d.ns << rsLog;
+ log() << "replSet warning roll back slow no _id index for " << d.ns << " perhaps?" << rsLog;
//would be faster but requires index: DiskLoc loc = Helpers::findById(nsd, pattern);
if( !loc.isNull() ) {
try {
@@ -411,9 +562,9 @@ namespace mongo {
removeSavers.clear(); // this effectively closes all of them
- sethbmsg(str::stream() << "syncRollback 5 d:" << deletes << " u:" << updates);
+ sethbmsg(str::stream() << "rollback 5 d:" << deletes << " u:" << updates);
MemoryMappedFile::flushAll(true);
- sethbmsg("syncRollback 6");
+ sethbmsg("rollback 6");
// clean up oplog
log(2) << "replSet rollback truncate oplog after " << h.commonPoint.toStringPretty() << rsLog;
@@ -423,59 +574,99 @@ namespace mongo {
/* reset cached lastoptimewritten and h value */
loadLastOpTimeWritten();
- sethbmsg("syncRollback 7");
+ sethbmsg("rollback 7");
MemoryMappedFile::flushAll(true);
// done
if( warn )
sethbmsg("issues during syncRollback, see log");
else
- sethbmsg("syncRollback done");
+ sethbmsg("rollback done");
}
void ReplSetImpl::syncRollback(OplogReader&r) {
+ unsigned s = _syncRollback(r);
+ if( s )
+ sleepsecs(s);
+ }
+
+ unsigned ReplSetImpl::_syncRollback(OplogReader&r) {
assert( !lockedByMe() );
assert( !dbMutex.atLeastReadLocked() );
- sethbmsg("syncRollback 0");
+ sethbmsg("rollback 0");
writelocktry lk(rsoplog, 20000);
if( !lk.got() ) {
- sethbmsg("syncRollback couldn't get write lock in a reasonable time");
- sleepsecs(2);
- return;
+ sethbmsg("rollback couldn't get write lock in a reasonable time");
+ return 2;
+ }
+
+ if( box.getState().secondary() ) {
+ /* by doing this, we will not service reads (return an error as we aren't in secondary state).
+ that perhaps is moot because of the write lock above, but that write lock probably gets deferred
+ or removed or yielded later anyway.
+
+ also, this is better for status reporting - we know what is happening.
+ */
+ box.change(MemberState::RS_ROLLBACK, _self);
}
HowToFixUp how;
- sethbmsg("syncRollback 1");
+ sethbmsg("rollback 1");
{
r.resetCursor();
/*DBClientConnection us(false, 0, 0);
string errmsg;
if( !us.connect(HostAndPort::me().toString(),errmsg) ) {
- sethbmsg("syncRollback connect to self failure" + errmsg);
+ sethbmsg("rollback connect to self failure" + errmsg);
return;
}*/
- sethbmsg("syncRollback 2 FindCommonPoint");
+ sethbmsg("rollback 2 FindCommonPoint");
try {
syncRollbackFindCommonPoint(r.conn(), how);
}
catch( const char *p ) {
- sethbmsg(string("syncRollback 2 error ") + p);
- sleepsecs(10);
- return;
+ sethbmsg(string("rollback 2 error ") + p);
+ return 10;
+ }
+ catch( rsfatal& ) {
+ _fatal();
+ return 2;
}
catch( DBException& e ) {
- sethbmsg(string("syncRollback 2 exception ") + e.toString() + "; sleeping 1 min");
+ sethbmsg(string("rollback 2 exception ") + e.toString() + "; sleeping 1 min");
+ dbtemprelease r;
sleepsecs(60);
throw;
}
}
- sethbmsg("replSet syncRollback 3 fixup");
+ sethbmsg("replSet rollback 3 fixup");
+
+ {
+ incRBID();
+ try {
+ syncFixUp(how, r);
+ }
+ catch( rsfatal& ) {
+ sethbmsg("rollback fixup error");
+ _fatal();
+ return 2;
+ }
+ catch(...) {
+ incRBID(); throw;
+ }
+ incRBID();
+
+ /* success - leave "ROLLBACK" state
+ can go to SECONDARY once minvalid is achieved
+ */
+ box.change(MemberState::RS_RECOVERING, _self);
+ }
- syncFixUp(how, r);
+ return 0;
}
}
diff --git a/db/repl/rs_sync.cpp b/db/repl/rs_sync.cpp
index bece96c..9ea65cf 100644
--- a/db/repl/rs_sync.cpp
+++ b/db/repl/rs_sync.cpp
@@ -24,8 +24,11 @@ namespace mongo {
using namespace bson;
+ extern unsigned replSetForceInitialSyncFailure;
+
void startSyncThread() {
Client::initThread("rs_sync");
+ cc().iAmSyncThread();
theReplSet->syncThread();
cc().shutdown();
}
@@ -91,6 +94,7 @@ namespace mongo {
// todo : use exhaust
unsigned long long n = 0;
while( 1 ) {
+
if( !r.more() )
break;
BSONObj o = r.nextSafe(); /* note we might get "not master" at some point */
@@ -103,9 +107,14 @@ namespace mongo {
anymore. assumePrimary is in the db lock so we are safe as long as
we check after we locked above. */
const Member *p1 = box.getPrimary();
- if( p1 != primary ) {
- log() << "replSet primary was:" << primary->fullName() << " now:" <<
- (p1 != 0 ? p1->fullName() : "none") << rsLog;
+ if( p1 != primary || replSetForceInitialSyncFailure ) {
+ int f = replSetForceInitialSyncFailure;
+ if( f > 0 ) {
+ replSetForceInitialSyncFailure = f-1;
+ log() << "replSet test code invoked, replSetForceInitialSyncFailure" << rsLog;
+ }
+ log() << "replSet primary was:" << primary->fullName() << " now:" <<
+ (p1 != 0 ? p1->fullName() : "none") << rsLog;
throw DBException("primary changed",0);
}
@@ -131,6 +140,32 @@ namespace mongo {
return true;
}
+ /* should be in RECOVERING state on arrival here.
+ readlocks
+ @return true if transitioned to SECONDARY
+ */
+ bool ReplSetImpl::tryToGoLiveAsASecondary(OpTime& /*out*/ minvalid) {
+ bool golive = false;
+ {
+ readlock lk("local.replset.minvalid");
+ BSONObj mv;
+ if( Helpers::getSingleton("local.replset.minvalid", mv) ) {
+ minvalid = mv["ts"]._opTime();
+ if( minvalid <= lastOpTimeWritten ) {
+ golive=true;
+ }
+ }
+ else
+ golive = true; /* must have been the original member */
+ }
+ if( golive ) {
+ sethbmsg("");
+ changeState(MemberState::RS_SECONDARY);
+ }
+ return golive;
+ }
+
+ /* tail the primary's oplog. ok to return, will be re-called. */
void ReplSetImpl::syncTail() {
// todo : locking vis a vis the mgr...
@@ -147,14 +182,19 @@ namespace mongo {
{
BSONObj remoteOldestOp = r.findOne(rsoplog, Query());
OpTime ts = remoteOldestOp["ts"]._opTime();
- DEV log() << "remoteOldestOp: " << ts.toStringPretty() << endl;
- else log(3) << "remoteOldestOp: " << ts.toStringPretty() << endl;
+ DEV log() << "replSet remoteOldestOp: " << ts.toStringLong() << rsLog;
+ else log(3) << "replSet remoteOldestOp: " << ts.toStringLong() << rsLog;
+ DEV {
+ // debugging sync1.js...
+ log() << "replSet lastOpTimeWritten: " << lastOpTimeWritten.toStringLong() << rsLog;
+ log() << "replSet our state: " << state().toString() << rsLog;
+ }
if( lastOpTimeWritten < ts ) {
- log() << "replSet error too stale to catch up, at least from primary " << hn << rsLog;
- log() << "replSet our last optime : " << lastOpTimeWritten.toStringPretty() << rsLog;
- log() << "replSet oldest at " << hn << " : " << ts.toStringPretty() << rsLog;
+ log() << "replSet error RS102 too stale to catch up, at least from primary: " << hn << rsLog;
+ log() << "replSet our last optime : " << lastOpTimeWritten.toStringLong() << rsLog;
+ log() << "replSet oldest at " << hn << " : " << ts.toStringLong() << rsLog;
log() << "replSet See http://www.mongodb.org/display/DOCS/Resyncing+a+Very+Stale+Replica+Set+Member" << rsLog;
- sethbmsg("error too stale to catch up");
+ sethbmsg("error RS102 too stale to catch up");
sleepsecs(120);
return;
}
@@ -213,7 +253,13 @@ namespace mongo {
}
}
- while( 1 ) {
+ /* we have now checked if we need to rollback and we either don't have to or did it. */
+ {
+ OpTime minvalid;
+ tryToGoLiveAsASecondary(minvalid);
+ }
+
+ while( 1 ) {
while( 1 ) {
if( !r.moreInCurrentBatch() ) {
/* we need to occasionally check some things. between
@@ -222,27 +268,13 @@ namespace mongo {
/* perhaps we should check this earlier? but not before the rollback checks. */
if( state().recovering() ) {
/* can we go to RS_SECONDARY state? we can if not too old and if minvalid achieved */
- bool golive = false;
OpTime minvalid;
- {
- readlock lk("local.replset.minvalid");
- BSONObj mv;
- if( Helpers::getSingleton("local.replset.minvalid", mv) ) {
- minvalid = mv["ts"]._opTime();
- if( minvalid <= lastOpTimeWritten ) {
- golive=true;
- }
- }
- else
- golive = true; /* must have been the original member */
- }
+ bool golive = ReplSetImpl::tryToGoLiveAsASecondary(minvalid);
if( golive ) {
- sethbmsg("");
- log() << "replSet SECONDARY" << rsLog;
- changeState(MemberState::RS_SECONDARY);
+ ;
}
else {
- sethbmsg(str::stream() << "still syncing, not yet to minValid optime " << minvalid.toString());
+ sethbmsg(str::stream() << "still syncing, not yet to minValid optime" << minvalid.toString());
}
/* todo: too stale capability */
@@ -270,12 +302,40 @@ namespace mongo {
syncApply(o);
_logOpObjRS(o); /* with repl sets we write the ops to our oplog too: */
}
+ int sd = myConfig().slaveDelay;
+ if( sd ) {
+ const OpTime ts = o["ts"]._opTime();
+ long long a = ts.getSecs();
+ long long b = time(0);
+ long long lag = b - a;
+ long long sleeptime = sd - lag;
+ if( sleeptime > 0 ) {
+ uassert(12000, "rs slaveDelay differential too big check clocks and systems", sleeptime < 0x40000000);
+ log() << "replSet temp slavedelay sleep:" << sleeptime << rsLog;
+ if( sleeptime < 60 ) {
+ sleepsecs((int) sleeptime);
+ }
+ else {
+ // sleep(hours) would prevent reconfigs from taking effect & such!
+ long long waitUntil = b + sleeptime;
+ while( 1 ) {
+ sleepsecs(6);
+ if( time(0) >= waitUntil )
+ break;
+ if( box.getPrimary() != primary )
+ break;
+ if( myConfig().slaveDelay != sd ) // reconf
+ break;
+ }
+ }
+ }
+ }
}
}
r.tailCheck();
if( !r.haveCursor() ) {
- log() << "replSet TEMP end syncTail pass with " << hn << rsLog;
- // TODO : reuse our cnonection to the primary.
+ log(1) << "replSet end syncTail pass with " << hn << rsLog;
+ // TODO : reuse our connection to the primary.
return;
}
if( box.getPrimary() != primary )
@@ -290,6 +350,10 @@ namespace mongo {
sleepsecs(1);
return;
}
+ if( sp.state.fatal() ) {
+ sleepsecs(5);
+ return;
+ }
/* later, we can sync from up secondaries if we want. tbd. */
if( sp.primary == 0 )