summary refs log tree commit diff
path: root/db/repl/heartbeat.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'db/repl/heartbeat.cpp')
-rw-r--r-- db/repl/heartbeat.cpp 229
1 files changed, 143 insertions, 86 deletions
diff --git a/db/repl/heartbeat.cpp b/db/repl/heartbeat.cpp
index 7d3f78c..138ba45 100644
--- a/db/repl/heartbeat.cpp
+++ b/db/repl/heartbeat.cpp
@@ -51,11 +51,14 @@ namespace mongo {
/* { replSetHeartbeat : <setname> } */
class CmdReplSetHeartbeat : public ReplSetCommand {
public:
- virtual bool adminOnly() const { return false; }
CmdReplSetHeartbeat() : ReplSetCommand("replSetHeartbeat") { }
virtual bool run(const string& , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool fromRepl) {
- if( replSetBlind )
+ if( replSetBlind ) {
+ if (theReplSet) {
+ errmsg = str::stream() << theReplSet->selfFullName() << " is blind";
+ }
return false;
+ }
/* we don't call ReplSetCommand::check() here because heartbeat
checks many things that are pre-initialization. */
@@ -99,8 +102,8 @@ namespace mongo {
if( !from.empty() ) {
replSettings.discoveredSeeds.insert(from);
}
- errmsg = "still initializing";
- return false;
+ result.append("hbmsg", "still initializing");
+ return true;
}
if( theReplSet->name() != cmdObj.getStringField("replSetHeartbeat") ) {
@@ -123,32 +126,54 @@ namespace mongo {
}
} cmdReplSetHeartbeat;
- /* throws dbexception */
- bool requestHeartbeat(string setName, string from, string memberFullName, BSONObj& result, int myCfgVersion, int& theirCfgVersion, bool checkEmpty) {
+ bool requestHeartbeat(string setName, string from, string memberFullName, BSONObj& result,
+ int myCfgVersion, int& theirCfgVersion, bool checkEmpty) {
if( replSetBlind ) {
- //sleepmillis( rand() );
return false;
}
- BSONObj cmd = BSON( "replSetHeartbeat" << setName << "v" << myCfgVersion << "pv" << 1 << "checkEmpty" << checkEmpty << "from" << from );
+ BSONObj cmd = BSON( "replSetHeartbeat" << setName <<
+ "v" << myCfgVersion <<
+ "pv" << 1 <<
+ "checkEmpty" << checkEmpty <<
+ "from" << from );
- // we might be talking to ourself - generally not a great idea to do outbound waiting calls in a write lock
- assert( !dbMutex.isWriteLocked() );
-
- // these are slow (multisecond to respond), so generally we don't want to be locked, at least not without
+ // generally not a great idea to do outbound waiting calls in a
+ // write lock. heartbeats can be slow (multisecond to respond), so
+ // generally we don't want to be locked, at least not without
// thinking carefully about it first.
- assert( theReplSet == 0 || !theReplSet->lockedByMe() );
+ uassert(15900, "can't heartbeat: too much lock",
+ !dbMutex.isWriteLocked() || theReplSet == 0 || !theReplSet->lockedByMe() );
ScopedConn conn(memberFullName);
return conn.runCommand("admin", cmd, result, 0);
}
- /* poll every other set member to check its status */
+ /**
+ * Poll every other set member to check its status.
+ *
+ * A detail about local machines and authentication: suppose we have 2
+ * members, A and B, on the same machine using different keyFiles. A is
+ * primary. If we're just starting the set, there are no admin users, so A
+ * and B can access each other because it's local access.
+ *
+ * Then we add a user to A. B cannot sync this user from A, because as soon
+ * as we add an admin user, A requires auth. However, A can still
+ * heartbeat B, because B *doesn't* have an admin user. So A can reach B
+ * but B cannot reach A.
+ *
+ * Once B is restarted with the correct keyFile, everything should work as
+ * expected.
+ */
class ReplSetHealthPollTask : public task::Task {
+ private:
HostAndPort h;
HeartbeatInfo m;
+ int tries;
+ const int threshold;
public:
- ReplSetHealthPollTask(const HostAndPort& hh, const HeartbeatInfo& mm) : h(hh), m(mm) { }
+ ReplSetHealthPollTask(const HostAndPort& hh, const HeartbeatInfo& mm)
+ : h(hh), m(mm), tries(0), threshold(15) { }
string name() const { return "rsHealthPoll"; }
void doWork() {
@@ -163,16 +188,7 @@ namespace mongo {
BSONObj info;
int theirConfigVersion = -10000;
- Timer timer;
-
- bool ok = requestHeartbeat(theReplSet->name(), theReplSet->selfFullName(), h.toString(), info, theReplSet->config().version, theirConfigVersion);
-
- mem.ping = (unsigned int)timer.millis();
-
- time_t before = timer.startTime() / 1000000;
- // we set this on any response - we don't get this far if
- // couldn't connect because exception is thrown
- time_t after = mem.lastHeartbeat = before + (mem.ping / 1000);
+ bool ok = _requestHeartbeat(mem, info, theirConfigVersion);
// weight new ping with old pings
// on the first ping, just use the ping value
@@ -180,68 +196,12 @@ namespace mongo {
mem.ping = (unsigned int)((old.ping * .8) + (mem.ping * .2));
}
- if ( info["time"].isNumber() ) {
- long long t = info["time"].numberLong();
- if( t > after )
- mem.skew = (int) (t - after);
- else if( t < before )
- mem.skew = (int) (t - before); // negative
- }
- else {
- // it won't be there if remote hasn't initialized yet
- if( info.hasElement("time") )
- warning() << "heatbeat.time isn't a number: " << info << endl;
- mem.skew = INT_MIN;
- }
-
- {
- be state = info["state"];
- if( state.ok() )
- mem.hbstate = MemberState(state.Int());
- }
if( ok ) {
- HeartbeatInfo::numPings++;
-
- if( mem.upSince == 0 ) {
- log() << "replSet info member " << h.toString() << " is up" << rsLog;
- mem.upSince = mem.lastHeartbeat;
- }
- mem.health = 1.0;
- mem.lastHeartbeatMsg = info["hbmsg"].String();
- if( info.hasElement("opTime") )
- mem.opTime = info["opTime"].Date();
-
- // see if this member is in the electable set
- if( info["e"].eoo() ) {
- // for backwards compatibility
- const Member *member = theReplSet->findById(mem.id());
- if (member && member->config().potentiallyHot()) {
- theReplSet->addToElectable(mem.id());
- }
- else {
- theReplSet->rmFromElectable(mem.id());
- }
- }
- // add this server to the electable set if it is within 10
- // seconds of the latest optime we know of
- else if( info["e"].trueValue() &&
- mem.opTime >= theReplSet->lastOpTimeWritten.getSecs() - 10) {
- unsigned lastOp = theReplSet->lastOtherOpTime().getSecs();
- if (lastOp > 0 && mem.opTime >= lastOp - 10) {
- theReplSet->addToElectable(mem.id());
- }
- }
- else {
- theReplSet->rmFromElectable(mem.id());
- }
-
- be cfg = info["config"];
- if( cfg.ok() ) {
- // received a new config
- boost::function<void()> f =
- boost::bind(&Manager::msgReceivedNewConfig, theReplSet->mgr, cfg.Obj().copy());
- theReplSet->mgr->send(f);
- }
+ up(info, mem);
+ }
+ else if (!info["errmsg"].eoo() &&
+ info["errmsg"].str() == "need to login") {
+ authIssue(mem);
}
else {
down(mem, info.getStringField("errmsg"));
@@ -271,7 +231,58 @@ namespace mongo {
}
private:
+ bool _requestHeartbeat(HeartbeatInfo& mem, BSONObj& info, int& theirConfigVersion) {
+ if (tries++ % threshold == (threshold - 1)) {
+ ScopedConn conn(h.toString());
+ conn.reconnect();
+ }
+
+ Timer timer;
+
+ bool ok = requestHeartbeat(theReplSet->name(), theReplSet->selfFullName(),
+ h.toString(), info, theReplSet->config().version, theirConfigVersion);
+
+ mem.ping = (unsigned int)timer.millis();
+
+ time_t before = timer.startTime() / 1000000;
+ // we set this on any response - we don't get this far if we
+ // couldn't connect, because an exception is thrown
+ time_t after = mem.lastHeartbeat = before + (mem.ping / 1000);
+
+ if ( info["time"].isNumber() ) {
+ long long t = info["time"].numberLong();
+ if( t > after )
+ mem.skew = (int) (t - after);
+ else if( t < before )
+ mem.skew = (int) (t - before); // negative
+ }
+ else {
+ // it won't be there if remote hasn't initialized yet
+ if( info.hasElement("time") )
+ warning() << "heatbeat.time isn't a number: " << info << endl;
+ mem.skew = INT_MIN;
+ }
+
+ {
+ be state = info["state"];
+ if( state.ok() )
+ mem.hbstate = MemberState(state.Int());
+ }
+
+ return ok;
+ }
+
+ void authIssue(HeartbeatInfo& mem) {
+ mem.authIssue = true;
+ mem.hbstate = MemberState::RS_UNKNOWN;
+
+ // set health to 0 so that this doesn't count towards majority
+ mem.health = 0.0;
+ theReplSet->rmFromElectable(mem.id());
+ }
+
void down(HeartbeatInfo& mem, string msg) {
+ mem.authIssue = false;
mem.health = 0.0;
mem.ping = 0;
if( mem.upSince || mem.downSince == 0 ) {
@@ -283,6 +294,52 @@ namespace mongo {
mem.lastHeartbeatMsg = msg;
theReplSet->rmFromElectable(mem.id());
}
+
+ void up(const BSONObj& info, HeartbeatInfo& mem) {
+ HeartbeatInfo::numPings++;
+ mem.authIssue = false;
+
+ if( mem.upSince == 0 ) {
+ log() << "replSet member " << h.toString() << " is up" << rsLog;
+ mem.upSince = mem.lastHeartbeat;
+ }
+ mem.health = 1.0;
+ mem.lastHeartbeatMsg = info["hbmsg"].String();
+ if( info.hasElement("opTime") )
+ mem.opTime = info["opTime"].Date();
+
+ // see if this member is in the electable set
+ if( info["e"].eoo() ) {
+ // for backwards compatibility
+ const Member *member = theReplSet->findById(mem.id());
+ if (member && member->config().potentiallyHot()) {
+ theReplSet->addToElectable(mem.id());
+ }
+ else {
+ theReplSet->rmFromElectable(mem.id());
+ }
+ }
+ // add this server to the electable set if it is within 10
+ // seconds of the latest optime we know of
+ else if( info["e"].trueValue() &&
+ mem.opTime >= theReplSet->lastOpTimeWritten.getSecs() - 10) {
+ unsigned lastOp = theReplSet->lastOtherOpTime().getSecs();
+ if (lastOp > 0 && mem.opTime >= lastOp - 10) {
+ theReplSet->addToElectable(mem.id());
+ }
+ }
+ else {
+ theReplSet->rmFromElectable(mem.id());
+ }
+
+ be cfg = info["config"];
+ if( cfg.ok() ) {
+ // received a new config
+ boost::function<void()> f =
+ boost::bind(&Manager::msgReceivedNewConfig, theReplSet->mgr, cfg.Obj().copy());
+ theReplSet->mgr->send(f);
+ }
+ }
};
void ReplSetImpl::endOldHealthTasks() {