summaryrefslogtreecommitdiff
path: root/db/repl/heartbeat.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'db/repl/heartbeat.cpp')
-rw-r--r--db/repl/heartbeat.cpp257
1 files changed, 257 insertions, 0 deletions
diff --git a/db/repl/heartbeat.cpp b/db/repl/heartbeat.cpp
new file mode 100644
index 0000000..78ce5d1
--- /dev/null
+++ b/db/repl/heartbeat.cpp
@@ -0,0 +1,257 @@
+/**
+* Copyright (C) 2008 10gen Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,b
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "pch.h"
+#include "rs.h"
+#include "health.h"
+#include "../../util/background.h"
+#include "../../client/dbclient.h"
+#include "../commands.h"
+#include "../../util/concurrency/value.h"
+#include "../../util/concurrency/task.h"
+#include "../../util/concurrency/msg.h"
+#include "../../util/mongoutils/html.h"
+#include "../../util/goodies.h"
+#include "../../util/ramlog.h"
+#include "../helpers/dblogger.h"
+#include "connections.h"
+#include "../../util/unittest.h"
+#include "../instance.h"
+
+namespace mongo {
+
+ using namespace bson;
+
+ extern bool replSetBlind;
+
+ // hacky
+ string *discoveredSeed = 0;
+
+ /* { replSetHeartbeat : <setname> } */
+ class CmdReplSetHeartbeat : public ReplSetCommand {
+ public:
+ virtual bool adminOnly() const { return false; }
+ CmdReplSetHeartbeat() : ReplSetCommand("replSetHeartbeat") { }
+ virtual bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) {
+ if( replSetBlind )
+ return false;
+
+ /* we don't call ReplSetCommand::check() here because heartbeat
+ checks many things that are pre-initialization. */
+ if( !replSet ) {
+ errmsg = "not running with --replSet";
+ return false;
+ }
+ if( cmdObj["pv"].Int() != 1 ) {
+ errmsg = "incompatible replset protocol version";
+ return false;
+ }
+ {
+ string s = string(cmdObj.getStringField("replSetHeartbeat"));
+ if( cmdLine.ourSetName() != s ) {
+ errmsg = "repl set names do not match";
+ log() << "cmdline: " << cmdLine._replSet << endl;
+ log() << "s: " << s << endl;
+ result.append("mismatch", true);
+ return false;
+ }
+ }
+
+ result.append("rs", true);
+ if( cmdObj["checkEmpty"].trueValue() ) {
+ result.append("hasData", replHasDatabases());
+ }
+ if( theReplSet == 0 ) {
+ string from( cmdObj.getStringField("from") );
+ if( !from.empty() && discoveredSeed == 0 ) {
+ discoveredSeed = new string(from);
+ }
+ errmsg = "still initializing";
+ return false;
+ }
+
+ if( theReplSet->name() != cmdObj.getStringField("replSetHeartbeat") ) {
+ errmsg = "repl set names do not match (2)";
+ result.append("mismatch", true);
+ return false;
+ }
+ result.append("set", theReplSet->name());
+ result.append("state", theReplSet->state().s);
+ result.append("hbmsg", theReplSet->hbmsg());
+ result.append("time", (int) time(0));
+ result.appendDate("opTime", theReplSet->lastOpTimeWritten.asDate());
+ int v = theReplSet->config().version;
+ result.append("v", v);
+ if( v > cmdObj["v"].Int() )
+ result << "config" << theReplSet->config().asBson();
+
+ return true;
+ }
+ } cmdReplSetHeartbeat;
+
+ /* throws dbexception */
+ bool requestHeartbeat(string setName, string from, string memberFullName, BSONObj& result, int myCfgVersion, int& theirCfgVersion, bool checkEmpty) {
+ if( replSetBlind ) {
+ //sleepmillis( rand() );
+ return false;
+ }
+
+ BSONObj cmd = BSON( "replSetHeartbeat" << setName << "v" << myCfgVersion << "pv" << 1 << "checkEmpty" << checkEmpty << "from" << from );
+
+ // we might be talking to ourself - generally not a great idea to do outbound waiting calls in a write lock
+ assert( !dbMutex.isWriteLocked() );
+
+ // these are slow (multisecond to respond), so generally we don't want to be locked, at least not without
+ // thinking acarefully about it first.
+ assert( theReplSet == 0 || !theReplSet->lockedByMe() );
+
+ ScopedConn conn(memberFullName);
+ return conn->runCommand("admin", cmd, result);
+ }
+
+ /* poll every other set member to check its status */
+ class ReplSetHealthPollTask : public task::Task {
+ HostAndPort h;
+ HeartbeatInfo m;
+ public:
+ ReplSetHealthPollTask(const HostAndPort& hh, const HeartbeatInfo& mm) : h(hh), m(mm) { }
+
+ string name() { return "ReplSetHealthPollTask"; }
+ void doWork() {
+ if ( !theReplSet ) {
+ log(2) << "theReplSet not initialized yet, skipping health poll this round" << rsLog;
+ return;
+ }
+
+ HeartbeatInfo mem = m;
+ HeartbeatInfo old = mem;
+ try {
+ BSONObj info;
+ int theirConfigVersion = -10000;
+
+ time_t before = time(0);
+
+ bool ok = requestHeartbeat(theReplSet->name(), theReplSet->selfFullName(), h.toString(), info, theReplSet->config().version, theirConfigVersion);
+
+ time_t after = mem.lastHeartbeat = time(0); // we set this on any response - we don't get this far if couldn't connect because exception is thrown
+
+ try {
+ mem.skew = 0;
+ long long t = info["time"].Long();
+ if( t > after )
+ mem.skew = (int) (t - after);
+ else if( t < before )
+ mem.skew = (int) (t - before); // negative
+ }
+ catch(...) {
+ mem.skew = INT_MIN;
+ }
+
+ {
+ be state = info["state"];
+ if( state.ok() )
+ mem.hbstate = MemberState(state.Int());
+ }
+ if( ok ) {
+ if( mem.upSince == 0 ) {
+ log() << "replSet info " << h.toString() << " is now up" << rsLog;
+ mem.upSince = mem.lastHeartbeat;
+ }
+ mem.health = 1.0;
+ mem.lastHeartbeatMsg = info["hbmsg"].String();
+ if( info.hasElement("opTime") )
+ mem.opTime = info["opTime"].Date();
+
+ be cfg = info["config"];
+ if( cfg.ok() ) {
+ // received a new config
+ boost::function<void()> f =
+ boost::bind(&Manager::msgReceivedNewConfig, theReplSet->mgr, cfg.Obj().copy());
+ theReplSet->mgr->send(f);
+ }
+ }
+ else {
+ down(mem, info.getStringField("errmsg"));
+ }
+ }
+ catch(...) {
+ down(mem, "connect/transport error");
+ }
+ m = mem;
+
+ theReplSet->mgr->send( boost::bind(&ReplSet::msgUpdateHBInfo, theReplSet, mem) );
+
+ static time_t last = 0;
+ time_t now = time(0);
+ if( mem.changed(old) || now-last>4 ) {
+ last = now;
+ theReplSet->mgr->send( boost::bind(&Manager::msgCheckNewState, theReplSet->mgr) );
+ }
+ }
+
+ private:
+ void down(HeartbeatInfo& mem, string msg) {
+ mem.health = 0.0;
+ if( mem.upSince ) {
+ mem.upSince = 0;
+ log() << "replSet info " << h.toString() << " is now down (or slow to respond)" << rsLog;
+ }
+ mem.lastHeartbeatMsg = msg;
+ }
+ };
+
+ void ReplSetImpl::endOldHealthTasks() {
+ unsigned sz = healthTasks.size();
+ for( set<ReplSetHealthPollTask*>::iterator i = healthTasks.begin(); i != healthTasks.end(); i++ )
+ (*i)->halt();
+ healthTasks.clear();
+ if( sz )
+ DEV log() << "replSet debug: cleared old tasks " << sz << endl;
+ }
+
+ void ReplSetImpl::startHealthTaskFor(Member *m) {
+ ReplSetHealthPollTask *task = new ReplSetHealthPollTask(m->h(), m->hbinfo());
+ healthTasks.insert(task);
+ task::repeat(task, 2000);
+ }
+
+ void startSyncThread();
+
+ /** called during repl set startup. caller expects it to return fairly quickly.
+ note ReplSet object is only created once we get a config - so this won't run
+ until the initiation.
+ */
+ void ReplSetImpl::startThreads() {
+ task::fork(mgr);
+
+ /*Member* m = _members.head();
+ while( m ) {
+ ReplSetHealthPollTask *task = new ReplSetHealthPollTask(m->h(), m->hbinfo());
+ healthTasks.insert(task);
+ task::repeat(shared_ptr<task::Task>(task), 2000);
+ m = m->next();
+ }*/
+
+ mgr->send( boost::bind(&Manager::msgCheckNewState, theReplSet->mgr) );
+
+ boost::thread t(startSyncThread);
+ }
+
+}
+
+/* todo:
+ stop bg job and delete on removefromset
+*/