diff options
Diffstat (limited to 'db/repl/heartbeat.cpp')
-rw-r--r-- | db/repl/heartbeat.cpp | 229 |
1 files changed, 143 insertions, 86 deletions
diff --git a/db/repl/heartbeat.cpp b/db/repl/heartbeat.cpp index 7d3f78c..138ba45 100644 --- a/db/repl/heartbeat.cpp +++ b/db/repl/heartbeat.cpp @@ -51,11 +51,14 @@ namespace mongo { /* { replSetHeartbeat : <setname> } */ class CmdReplSetHeartbeat : public ReplSetCommand { public: - virtual bool adminOnly() const { return false; } CmdReplSetHeartbeat() : ReplSetCommand("replSetHeartbeat") { } virtual bool run(const string& , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool fromRepl) { - if( replSetBlind ) + if( replSetBlind ) { + if (theReplSet) { + errmsg = str::stream() << theReplSet->selfFullName() << " is blind"; + } return false; + } /* we don't call ReplSetCommand::check() here because heartbeat checks many things that are pre-initialization. */ @@ -99,8 +102,8 @@ namespace mongo { if( !from.empty() ) { replSettings.discoveredSeeds.insert(from); } - errmsg = "still initializing"; - return false; + result.append("hbmsg", "still initializing"); + return true; } if( theReplSet->name() != cmdObj.getStringField("replSetHeartbeat") ) { @@ -123,32 +126,54 @@ namespace mongo { } } cmdReplSetHeartbeat; - /* throws dbexception */ - bool requestHeartbeat(string setName, string from, string memberFullName, BSONObj& result, int myCfgVersion, int& theirCfgVersion, bool checkEmpty) { + bool requestHeartbeat(string setName, string from, string memberFullName, BSONObj& result, + int myCfgVersion, int& theirCfgVersion, bool checkEmpty) { if( replSetBlind ) { - //sleepmillis( rand() ); return false; } - BSONObj cmd = BSON( "replSetHeartbeat" << setName << "v" << myCfgVersion << "pv" << 1 << "checkEmpty" << checkEmpty << "from" << from ); + BSONObj cmd = BSON( "replSetHeartbeat" << setName << + "v" << myCfgVersion << + "pv" << 1 << + "checkEmpty" << checkEmpty << + "from" << from ); - // we might be talking to ourself - generally not a great idea to do outbound waiting calls in a write lock - assert( !dbMutex.isWriteLocked() ); - - // these are slow (multisecond to respond), so generally we don't want to be locked, at least not without + // generally not a great idea to do outbound waiting calls in a + // write lock. heartbeats can be slow (multisecond to respond), so + // generally we don't want to be locked, at least not without // thinking acarefully about it first. - assert( theReplSet == 0 || !theReplSet->lockedByMe() ); + uassert(15900, "can't heartbeat: too much lock", + !dbMutex.isWriteLocked() || theReplSet == 0 || !theReplSet->lockedByMe() ); ScopedConn conn(memberFullName); return conn.runCommand("admin", cmd, result, 0); } - /* poll every other set member to check its status */ + /** + * Poll every other set member to check its status. + * + * A detail about local machines and authentication: suppose we have 2 + * members, A and B, on the same machine using different keyFiles. A is + * primary. If we're just starting the set, there are no admin users, so A + * and B can access each other because it's local access. + * + * Then we add a user to A. B cannot sync this user from A, because as soon + * as we add a an admin user, A requires auth. However, A can still + * heartbeat B, because B *doesn't* have an admin user. So A can reach B + * but B cannot reach A. + * + * Once B is restarted with the correct keyFile, everything should work as + * expected. + */ class ReplSetHealthPollTask : public task::Task { + private: HostAndPort h; HeartbeatInfo m; + int tries; + const int threshold; public: - ReplSetHealthPollTask(const HostAndPort& hh, const HeartbeatInfo& mm) : h(hh), m(mm) { } + ReplSetHealthPollTask(const HostAndPort& hh, const HeartbeatInfo& mm) + : h(hh), m(mm), tries(0), threshold(15) { } string name() const { return "rsHealthPoll"; } void doWork() { @@ -163,16 +188,7 @@ namespace mongo { BSONObj info; int theirConfigVersion = -10000; - Timer timer; - - bool ok = requestHeartbeat(theReplSet->name(), theReplSet->selfFullName(), h.toString(), info, theReplSet->config().version, theirConfigVersion); - - mem.ping = (unsigned int)timer.millis(); - - time_t before = timer.startTime() / 1000000; - // we set this on any response - we don't get this far if - // couldn't connect because exception is thrown - time_t after = mem.lastHeartbeat = before + (mem.ping / 1000); + bool ok = _requestHeartbeat(mem, info, theirConfigVersion); // weight new ping with old pings // on the first ping, just use the ping value @@ -180,68 +196,12 @@ namespace mongo { mem.ping = (unsigned int)((old.ping * .8) + (mem.ping * .2)); } - if ( info["time"].isNumber() ) { - long long t = info["time"].numberLong(); - if( t > after ) - mem.skew = (int) (t - after); - else if( t < before ) - mem.skew = (int) (t - before); // negative - } - else { - // it won't be there if remote hasn't initialized yet - if( info.hasElement("time") ) - warning() << "heatbeat.time isn't a number: " << info << endl; - mem.skew = INT_MIN; - } - - { - be state = info["state"]; - if( state.ok() ) - mem.hbstate = MemberState(state.Int()); - } if( ok ) { - HeartbeatInfo::numPings++; - - if( mem.upSince == 0 ) { - log() << "replSet info member " << h.toString() << " is up" << rsLog; - mem.upSince = mem.lastHeartbeat; - } - mem.health = 1.0; - mem.lastHeartbeatMsg = info["hbmsg"].String(); - if( info.hasElement("opTime") ) - mem.opTime = info["opTime"].Date(); - - // see if this member is in the electable set - if( info["e"].eoo() ) { - // for backwards compatibility - const Member *member = theReplSet->findById(mem.id()); - if (member && member->config().potentiallyHot()) { - theReplSet->addToElectable(mem.id()); - } - else { - theReplSet->rmFromElectable(mem.id()); - } - } - // add this server to the electable set if it is within 10 - // seconds of the latest optime we know of - else if( info["e"].trueValue() && - mem.opTime >= theReplSet->lastOpTimeWritten.getSecs() - 10) { - unsigned lastOp = theReplSet->lastOtherOpTime().getSecs(); - if (lastOp > 0 && mem.opTime >= lastOp - 10) { - theReplSet->addToElectable(mem.id()); - } - } - else { - theReplSet->rmFromElectable(mem.id()); - } - - be cfg = info["config"]; - if( cfg.ok() ) { - // received a new config - boost::function<void()> f = - boost::bind(&Manager::msgReceivedNewConfig, theReplSet->mgr, cfg.Obj().copy()); - theReplSet->mgr->send(f); - } + up(info, mem); + } + else if (!info["errmsg"].eoo() && + info["errmsg"].str() == "need to login") { + authIssue(mem); } else { down(mem, info.getStringField("errmsg")); @@ -271,7 +231,58 @@ namespace mongo { } private: + bool _requestHeartbeat(HeartbeatInfo& mem, BSONObj& info, int& theirConfigVersion) { + if (tries++ % threshold == (threshold - 1)) { + ScopedConn conn(h.toString()); + conn.reconnect(); + } + + Timer timer; + + bool ok = requestHeartbeat(theReplSet->name(), theReplSet->selfFullName(), + h.toString(), info, theReplSet->config().version, theirConfigVersion); + + mem.ping = (unsigned int)timer.millis(); + + time_t before = timer.startTime() / 1000000; + // we set this on any response - we don't get this far if + // couldn't connect because exception is thrown + time_t after = mem.lastHeartbeat = before + (mem.ping / 1000); + + if ( info["time"].isNumber() ) { + long long t = info["time"].numberLong(); + if( t > after ) + mem.skew = (int) (t - after); + else if( t < before ) + mem.skew = (int) (t - before); // negative + } + else { + // it won't be there if remote hasn't initialized yet + if( info.hasElement("time") ) + warning() << "heatbeat.time isn't a number: " << info << endl; + mem.skew = INT_MIN; + } + + { + be state = info["state"]; + if( state.ok() ) + mem.hbstate = MemberState(state.Int()); + } + + return ok; + } + + void authIssue(HeartbeatInfo& mem) { + mem.authIssue = true; + mem.hbstate = MemberState::RS_UNKNOWN; + + // set health to 0 so that this doesn't count towards majority + mem.health = 0.0; + theReplSet->rmFromElectable(mem.id()); + } + void down(HeartbeatInfo& mem, string msg) { + mem.authIssue = false; mem.health = 0.0; mem.ping = 0; if( mem.upSince || mem.downSince == 0 ) { @@ -283,6 +294,52 @@ namespace mongo { mem.lastHeartbeatMsg = msg; theReplSet->rmFromElectable(mem.id()); } + + void up(const BSONObj& info, HeartbeatInfo& mem) { + HeartbeatInfo::numPings++; + mem.authIssue = false; + + if( mem.upSince == 0 ) { + log() << "replSet member " << h.toString() << " is up" << rsLog; + mem.upSince = mem.lastHeartbeat; + } + mem.health = 1.0; + mem.lastHeartbeatMsg = info["hbmsg"].String(); + if( info.hasElement("opTime") ) + mem.opTime = info["opTime"].Date(); + + // see if this member is in the electable set + if( info["e"].eoo() ) { + // for backwards compatibility + const Member *member = theReplSet->findById(mem.id()); + if (member && member->config().potentiallyHot()) { + theReplSet->addToElectable(mem.id()); + } + else { + theReplSet->rmFromElectable(mem.id()); + } + } + // add this server to the electable set if it is within 10 + // seconds of the latest optime we know of + else if( info["e"].trueValue() && + mem.opTime >= theReplSet->lastOpTimeWritten.getSecs() - 10) { + unsigned lastOp = theReplSet->lastOtherOpTime().getSecs(); + if (lastOp > 0 && mem.opTime >= lastOp - 10) { + theReplSet->addToElectable(mem.id()); + } + } + else { + theReplSet->rmFromElectable(mem.id()); + } + + be cfg = info["config"]; + if( cfg.ok() ) { + // received a new config + boost::function<void()> f = + boost::bind(&Manager::msgReceivedNewConfig, theReplSet->mgr, cfg.Obj().copy()); + theReplSet->mgr->send(f); + } + } }; void ReplSetImpl::endOldHealthTasks() { |