summaryrefslogtreecommitdiff
path: root/db/repl
diff options
context:
space:
mode:
authorAntonin Kral <a.kral@bobek.cz>2010-08-11 12:38:57 +0200
committerAntonin Kral <a.kral@bobek.cz>2010-08-11 12:38:57 +0200
commit7645618fd3914cb8a20561625913c20d49504a49 (patch)
tree8370f846f58f6d71165b7a0e2eda04648584ec76 /db/repl
parent68c73c3c7608b4c87f07440dc3232801720b1168 (diff)
downloadmongodb-7645618fd3914cb8a20561625913c20d49504a49.tar.gz
Imported Upstream version 1.6.0
Diffstat (limited to 'db/repl')
-rw-r--r--db/repl/connections.h91
-rw-r--r--db/repl/consensus.cpp342
-rw-r--r--db/repl/health.cpp389
-rw-r--r--db/repl/health.h50
-rw-r--r--db/repl/heartbeat.cpp257
-rw-r--r--db/repl/manager.cpp179
-rw-r--r--db/repl/multicmd.h70
-rw-r--r--db/repl/replset_commands.cpp293
-rw-r--r--db/repl/rs.cpp500
-rw-r--r--db/repl/rs.h415
-rw-r--r--db/repl/rs_config.cpp315
-rw-r--r--db/repl/rs_config.h88
-rwxr-xr-xdb/repl/rs_exception.h17
-rw-r--r--db/repl/rs_initialsync.cpp214
-rw-r--r--db/repl/rs_initiate.cpp238
-rw-r--r--db/repl/rs_member.h91
-rw-r--r--db/repl/rs_optime.h58
-rw-r--r--db/repl/rs_rollback.cpp481
-rw-r--r--db/repl/rs_sync.cpp328
-rw-r--r--db/repl/test.html11
-rw-r--r--db/repl/testing.js42
21 files changed, 4469 insertions, 0 deletions
diff --git a/db/repl/connections.h b/db/repl/connections.h
new file mode 100644
index 0000000..95defe4
--- /dev/null
+++ b/db/repl/connections.h
@@ -0,0 +1,91 @@
+// @file
+
+/*
+ * Copyright (C) 2010 10gen Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+
+#include <map>
+#include "../../client/dbclient.h"
+
+namespace mongo {
+
+ /** here we keep a single connection (with reconnect) for a set of hosts,
+ one each, and allow one user at a time per host. if in use already for that
+ host, we block. so this is an easy way to keep a 1-deep pool of connections
+ that many threads can share.
+
+ thread-safe.
+
+ Example:
+ {
+ ScopedConn c("foo.acme.com:9999");
+ c->runCommand(...);
+ }
+
+ throws exception on connect error (but fine to try again later with a new
+ scopedconn object for same host).
+ */
+ class ScopedConn {
+ public:
+ /** throws assertions if connect failure etc. NOTE(review): the inline definition ignores connect()'s return value -- confirm where a connect failure actually surfaces. */
+ ScopedConn(string hostport);
+ ~ScopedConn();
+ DBClientConnection* operator->(); // the shared connection for this object's host
+ private:
+ auto_ptr<scoped_lock> connLock; // holds x->z; set in the constructor and owned until this object is destroyed
+ static mutex mapMutex; // taken by the constructor around its _map lookup/insert
+ struct X { // one per host: the connection plus the mutex that serializes its users
+ mutex z;
+ DBClientConnection cc;
+ X() : z("X"), cc(/*reconnect*/true, 0, /*timeout*/10) {
+ cc._logLevel = 2;
+ }
+ } *x; // entry for this object's host, taken from _map
+ typedef map<string,ScopedConn::X*> M;
+ static M& _map; // hostport -> X*
+ };
+
+ inline ScopedConn::ScopedConn(string hostport) { // locks the per-host entry; the first user of a host also creates the entry and calls connect()
+ bool first = false;
+ {
+ scoped_lock lk(mapMutex);
+ x = _map[hostport]; // map operator[] default-inserts a null pointer when the host is not present yet
+ if( x == 0 ) {
+ x = _map[hostport] = new X();
+ first = true;
+ connLock.reset( new scoped_lock(x->z) ); // lock the new entry while mapMutex is still held
+ }
+ }
+ if( !first ) {
+ connLock.reset( new scoped_lock(x->z) ); // existing entry: its mutex is taken only after mapMutex has been released
+ return;
+ }
+
+ // we already locked above...
+ string err;
+ x->cc.connect(hostport, err); // NOTE(review): neither the return value nor err is checked here
+ }
+
+ inline ScopedConn::~ScopedConn() {
+ // nothing explicit to do: connLock (auto_ptr<scoped_lock>) is destroyed with this object, releasing the per-host lock
+ }
+
+ inline DBClientConnection* ScopedConn::operator->() { // gives access to the host's shared connection, e.g. c->runCommand(...)
+ return &x->cc;
+ }
+
+}
diff --git a/db/repl/consensus.cpp b/db/repl/consensus.cpp
new file mode 100644
index 0000000..4eba17d
--- /dev/null
+++ b/db/repl/consensus.cpp
@@ -0,0 +1,342 @@
+/**
+* Copyright (C) 2010 10gen Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "pch.h"
+#include "../commands.h"
+#include "rs.h"
+#include "multicmd.h"
+
+namespace mongo {
+
+ class CmdReplSetFresh : public ReplSetCommand { // replSetFresh: reports our last written opTime and whether we consider ourselves fresher than the caller
+ public:
+ CmdReplSetFresh() : ReplSetCommand("replSetFresh") { }
+ private:
+ virtual bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) {
+ if( !check(errmsg, result) )
+ return false;
+
+ if( cmdObj["set"].String() != theReplSet->name() ) {
+ errmsg = "wrong repl set name";
+ return false;
+ }
+ string who = cmdObj["who"].String();
+ int cfgver = cmdObj["cfgver"].Int();
+ OpTime opTime(cmdObj["opTime"].Date());
+
+ bool weAreFresher = false; // set when our config version or our last written opTime is newer than the caller's
+ if( theReplSet->config().version > cfgver ) {
+ log() << "replSet member " << who << " is not yet aware its cfg version " << cfgver << " is stale" << rsLog;
+ result.append("info", "config version stale");
+ weAreFresher = true;
+ }
+ else if( opTime < theReplSet->lastOpTimeWritten ) {
+ weAreFresher = true;
+ }
+ result.appendDate("opTime", theReplSet->lastOpTimeWritten.asDate()); // read back by Consensus::weAreFreshest()
+ result.append("fresher", weAreFresher);
+ return true;
+ }
+ } cmdReplSetFresh;
+
+ class CmdReplSetElect : public ReplSetCommand { // replSetElect: hands an incoming election request to Consensus::electCmdReceived
+ public:
+ CmdReplSetElect() : ReplSetCommand("replSetElect") { }
+ private:
+ virtual bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) {
+ if( !check(errmsg, result) )
+ return false;
+ //task::lam f = boost::bind(&Consensus::electCmdReceived, &theReplSet->elect, cmdObj, &result);
+ //theReplSet->mgr->call(f);
+ theReplSet->elect.electCmdReceived(cmdObj, &result); // called directly on this thread; appends "vote" and "round" to result
+ return true;
+ }
+ } cmdReplSetElect;
+
+ int Consensus::totalVotes() const { // sum of config().votes over ourself and every other member; warns once if that sum is even
+     static int complain = 0;
+     int sum = rs._self->config().votes;
+     for( Member *mem = rs.head(); mem != 0; mem = mem->next() )
+         sum += mem->config().votes;
+     const bool evenAndNonZero = sum != 0 && sum % 2 == 0;
+     if( evenAndNonZero && complain++ == 0 ) // complain is only bumped for an even, non-zero total
+         log() << "replSet warning total number of votes is even - considering giving one member an extra vote" << rsLog;
+     return sum; }
+
+ bool Consensus::aMajoritySeemsToBeUp() const { // true when our votes plus those of members seen as up exceed half of totalVotes()
+     int upVotes = rs._self->config().votes;
+     for( Member *mem = rs.head(); mem; mem = mem->next() )
+         if( mem->hbinfo().up() ) upVotes += mem->config().votes;
+     return upVotes * 2 > totalVotes();
+ }
+
+ static const int VETO = -10000; // same value electCmdReceived() replies with as its vote when the requester's config version is stale
+
+ const time_t LeaseTime = 30; // added to LastYea::when (a time(0) value) in yea(): a yea blocks votes for other members for this long
+
+ unsigned Consensus::yea(unsigned memberId) /* throws VoteException */ { // records a yea for memberId and returns our own number of votes
+ Atomic<LastYea>::tran t(ly);
+ LastYea &ly = t.ref(); // local reference that shadows the member ly it was obtained from
+ time_t now = time(0);
+ if( ly.when + LeaseTime >= now && ly.who != memberId ) { // a yea for a different member is still within its lease
+ log(1) << "replSet not voting yea for " << memberId <<
+ " voted for " << ly.who << ' ' << now-ly.when << " secs ago" << rsLog;
+ throw VoteException();
+ }
+ ly.when = now; // voting again for the same member renews the lease
+ ly.who = memberId;
+ return rs._self->config().votes;
+ }
+
+ /* we vote for ourself at start of election. once it fails, we can cancel the lease we had in
+ place instead of leaving it for a long time.
+ */
+ void Consensus::electionFailed(unsigned meid) { // clears the yea lease if it is still recorded for meid
+ Atomic<LastYea>::tran t(ly);
+ LastYea &L = t.ref();
+ DEV assert( L.who == meid ); // this may not always hold, so be aware, but adding for now as a quick sanity test
+ if( L.who == meid )
+ L.when = 0; // with when == 0 the lease test in yea() no longer blocks votes for other members
+ }
+
+ /* todo: threading **************** !!!!!!!!!!!!!!!! */
+ void Consensus::electCmdReceived(BSONObj cmd, BSONObjBuilder* _b) { // answers a replSetElect request: appends "vote" and "round" to *_b
+     BSONObjBuilder& b = *_b;
+     DEV log() << "replSet received elect msg " << cmd.toString() << rsLog;
+     else log(2) << "replSet received elect msg " << cmd.toString() << rsLog;
+     string set = cmd["set"].String();
+     unsigned whoid = cmd["whoid"].Int();
+     int cfgver = cmd["cfgver"].Int();
+     OID round = cmd["round"].OID();
+     int myver = rs.config().version;
+     // vote stays 0 (abstain) unless we veto or say yea below
+     int vote = 0;
+     if( set != rs.name() ) {
+         log() << "replSet error received an elect request for '" << set << "' but our set name is '" << rs.name() << "'" << rsLog;
+         // wrong set name: abstain
+     }
+     else if( myver < cfgver ) {
+         // we are stale. don't vote
+     }
+     else if( myver > cfgver ) {
+         // they are stale!
+         log() << "replSet info got stale version # during election" << rsLog;
+         vote = VETO; // named constant instead of repeating the literal -10000
+     }
+     else {
+         try {
+             vote = yea(whoid); // throws VoteException if we recently said yea to a different member
+             rs.relinquish();
+             log() << "replSet info voting yea for " << whoid << rsLog;
+         }
+         catch(VoteException&) {
+             log() << "replSet voting no already voted for another" << rsLog;
+         }
+     }
+     // "round" is echoed back unchanged
+     b.append("vote", vote);
+     b.append("round", round);
+ }
+
+ void ReplSetImpl::_getTargets(list<Target>& L, int& configVersion) { // appends a Target for each member currently seen as up; reports our config version
+     configVersion = config().version;
+     Member *mem = head();
+     for( ; mem; mem = mem->next() )
+         if( mem->hbinfo().up() ) L.push_back( Target(mem->fullName()) );
+ }
+
+ /* config version is returned as it is ok to use this unlocked. BUT, if unlocked, you would need
+ to check later that the config didn't change. */
+ void ReplSetImpl::getTargets(list<Target>& L, int& configVersion) { // _getTargets(), taking the lock first unless this thread already holds it
+     if( !lockedByMe() ) {
+         lock lk(this); // held only while the member list is scanned
+         _getTargets(L, configVersion);
+         return;
+     }
+     _getTargets(L, configVersion);
+ }
+
+ /* Do we have the newest data of them all? Sends replSetFresh to every member currently seen as up.
+ @param allUp - set to true if every target answered ok (false if any did not). Only meaningful if true returned. @param nTies - number of targets whose opTime equals ours.
+ @return true if no responding target reported being fresher. Note we may tie.
+ */
+ bool Consensus::weAreFreshest(bool& allUp, int& nTies) {
+ const OpTime ord = theReplSet->lastOpTimeWritten;
+ nTies = 0;
+ assert( !ord.isNull() );
+ BSONObj cmd = BSON(
+ "replSetFresh" << 1 <<
+ "set" << rs.name() <<
+ "opTime" << Date_t(ord.asDate()) <<
+ "who" << rs._self->fullName() <<
+ "cfgver" << rs._cfg->version );
+ list<Target> L;
+ int ver;
+ rs.getTargets(L, ver); // ver is not used afterwards in this function
+ multiCommand(cmd, L);
+ int nok = 0;
+ allUp = true;
+ for( list<Target>::iterator i = L.begin(); i != L.end(); i++ ) {
+ if( i->ok ) {
+ nok++;
+ if( i->result["fresher"].trueValue() )
+ return false; // early exit: allUp and nTies may be only partially computed here
+ OpTime remoteOrd( i->result["opTime"].Date() );
+ if( remoteOrd == ord )
+ nTies++;
+ assert( remoteOrd <= ord ); // a target that did not say "fresher" must not be ahead of us
+ }
+ else {
+ DEV log() << "replSet freshest returns " << i->result.toString() << rsLog;
+ allUp = false;
+ }
+ }
+ DEV log() << "replSet dev we are freshest of up nodes, nok:" << nok << " nTies:" << nTies << rsLog;
+ assert( ord <= theReplSet->lastOpTimeWritten ); // <= as this may change while we are working...
+ return true;
+ }
+
+ extern time_t started;
+
+ void Consensus::multiCommand(BSONObj cmd, list<Target>& L) { // forwards to mongo::multiCommand; asserts the calling thread does not hold the rs lock
+ assert( !rs.lockedByMe() );
+ mongo::multiCommand(cmd, L);
+ }
+
+ void Consensus::_electSelf() { // one election attempt: eligibility checks, vote for ourself, collect replSetElect votes, assume primary on a strict majority
+ if( time(0) < steppedDown ) // steppedDown is compared against the clock: no attempt before that time
+ return;
+
+ {
+ const OpTime ord = theReplSet->lastOpTimeWritten;
+ if( ord == 0 ) {
+ log() << "replSet info not trying to elect self, do not yet have a complete set of data from any point in time" << rsLog;
+ return;
+ }
+ }
+
+ bool allUp;
+ int nTies;
+ if( !weAreFreshest(allUp, nTies) ) {
+ log() << "replSet info not electing self, we are not freshest" << rsLog;
+ return;
+ }
+
+ rs.sethbmsg("",9);
+
+ if( !allUp && time(0) - started < 60 * 5 ) {
+ /* the idea here is that if a bunch of nodes bounce all at once, we don't want to drop data
+ if we don't have to -- we'd rather be offline and wait a little longer instead
+ todo: make this configurable.
+ */
+ rs.sethbmsg("not electing self, not all members up and we have been up less than 5 minutes");
+ return;
+ }
+
+ Member& me = *rs._self;
+
+ if( nTies ) {
+ /* tie? we then randomly sleep to try to not collide on our voting. */
+ /* todo: smarter. */
+ if( me.id() == 0 || sleptLast ) {
+ // would be fine for one node not to sleep
+ // todo: biggest / highest priority nodes should be the ones that get to not sleep
+ } else {
+ assert( !rs.lockedByMe() ); // bad to go to sleep locked
+ unsigned ms = ((unsigned) rand()) % 1000 + 50;
+ DEV log() << "replSet tie " << nTies << " sleeping a little " << ms << "ms" << rsLog;
+ sleptLast = true;
+ sleepmillis(ms);
+ throw RetryAfterSleepException(); // rethrown unchanged by electSelf()
+ }
+ }
+ sleptLast = false;
+
+ time_t start = time(0);
+ unsigned meid = me.id();
+ int tally = yea( meid ); // our own vote; yea() throws VoteException if we recently said yea to another member
+ bool success = false;
+ try {
+ log() << "replSet info electSelf " << meid << rsLog;
+
+ BSONObj electCmd = BSON(
+ "replSetElect" << 1 <<
+ "set" << rs.name() <<
+ "who" << me.fullName() <<
+ "whoid" << me.hbinfo().id() <<
+ "cfgver" << rs._cfg->version <<
+ "round" << OID::gen() /* this is just for diagnostics */
+ );
+
+ int configVersion;
+ list<Target> L;
+ rs.getTargets(L, configVersion); // configVersion is re-checked below after the votes come back
+ multiCommand(electCmd, L);
+
+ {
+ RSBase::lock lk(&rs);
+ for( list<Target>::iterator i = L.begin(); i != L.end(); i++ ) {
+ DEV log() << "replSet elect res: " << i->result.toString() << rsLog;
+ if( i->ok ) {
+ int v = i->result["vote"].Int(); // may be a large negative value when the voter vetoes (see electCmdReceived)
+ tally += v;
+ }
+ }
+ if( tally*2 <= totalVotes() ) { // need strictly more than half of all votes
+ log() << "replSet couldn't elect self, only received " << tally << " votes" << rsLog;
+ }
+ else if( time(0) - start > 30 ) {
+ // defensive; should never happen as we have timeouts on connection and operation for our conn
+ log() << "replSet too much time passed during our election, ignoring result" << rsLog;
+ }
+ else if( configVersion != rs.config().version ) {
+ log() << "replSet config version changed during our election, ignoring result" << rsLog;
+ }
+ else {
+ /* succeeded. */
+ log(1) << "replSet election succeeded, assuming primary role" << rsLog;
+ success = true;
+ rs.assumePrimary();
+ }
+ }
+ } catch( std::exception& ) {
+ if( !success ) electionFailed(meid); // drop our self-vote lease before propagating the error
+ throw;
+ }
+ if( !success ) electionFailed(meid);
+ }
+
+ void Consensus::electSelf() { // runs _electSelf(); logs and swallows every exception except RetryAfterSleepException
+ assert( !rs.lockedByMe() );
+ assert( !rs.myConfig().arbiterOnly );
+ try {
+ _electSelf();
+ }
+ catch(RetryAfterSleepException&) {
+ throw; // the only exception passed on to the caller
+ }
+ catch(VoteException& ) {
+ log() << "replSet not trying to elect self as responded yea to someone else recently" << rsLog;
+ }
+ catch(DBException& e) {
+ log() << "replSet warning caught unexpected exception in electSelf() " << e.toString() << rsLog;
+ }
+ catch(...) {
+ log() << "replSet warning caught unexpected exception in electSelf()" << rsLog;
+ }
+ }
+
+}
diff --git a/db/repl/health.cpp b/db/repl/health.cpp
new file mode 100644
index 0000000..b0be25f
--- /dev/null
+++ b/db/repl/health.cpp
@@ -0,0 +1,389 @@
+/**
+* Copyright (C) 2008 10gen Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "pch.h"
+#include "rs.h"
+#include "health.h"
+#include "../../util/background.h"
+#include "../../client/dbclient.h"
+#include "../commands.h"
+#include "../../util/concurrency/value.h"
+#include "../../util/concurrency/task.h"
+#include "../../util/mongoutils/html.h"
+#include "../../util/goodies.h"
+#include "../../util/ramlog.h"
+#include "../helpers/dblogger.h"
+#include "connections.h"
+#include "../../util/unittest.h"
+#include "../dbhelpers.h"
+
+namespace mongo {
+ /* definitions of the ScopedConn static members declared in connections.h */
+ ScopedConn::M& ScopedConn::_map = *(new ScopedConn::M()); // reference to a heap-allocated map; no matching delete in the visible code
+ mutex ScopedConn::mapMutex("ScopedConn::mapMutex");
+}
+
+namespace mongo {
+
+ using namespace mongoutils::html;
+ using namespace bson;
+
+ static RamLog _rsLog;
+ Tee *rsLog = &_rsLog;
+
+ string ago(time_t t) { // elapsed time since t as text: "N secs" under 180s, minutes under an hour, hours beyond; "" when t is 0
+     if( t == 0 ) return "";
+
+     const time_t elapsed = time(0) - t;
+     stringstream out;
+     if( elapsed >= 3600 ) {
+         out.precision(2);
+         out << elapsed / 3600.0 << " hrs";
+     }
+     else if( elapsed >= 180 ) {
+         out.precision(2);
+         out << elapsed / 60.0 << " mins";
+     }
+     else {
+         out << elapsed << " sec";
+         if( elapsed != 1 ) out << 's'; // plural for everything but exactly one second
+     }
+     return out.str();
+ }
+
+ void Member::summarizeMember(stringstream& s) const { // appends one HTML table row describing this member to s
+ s << tr();
+ {
+ stringstream u;
+ u << "http://" << h().host() << ':' << (h().port() + 1000) << "/_replSet"; // link targets the member's port + 1000
+ s << td( a(u.str(), "", fullName()) );
+ }
+ s << td( id() );
+ double h = hbinfo().health; // from here on the local h hides the h() accessor used above
+ bool ok = h > 0;
+ s << td(red(str::stream() << h,h == 0));
+ s << td(ago(hbinfo().upSince));
+ bool never = false; // true when no heartbeat reply has been recorded (lastHeartbeat == 0)
+ {
+ string h;
+ time_t hb = hbinfo().lastHeartbeat;
+ if( hb == 0 ) {
+ h = "never";
+ never = true;
+ }
+ else h = ago(hb) + " ago";
+ s << td(h);
+ }
+ s << td(config().votes);
+ {
+ string stateText = ReplSet::stateAsStr(state());
+ if( ok || stateText.empty() )
+ s << td(stateText); // text blank if we've never connected
+ else
+ s << td( grey(str::stream() << "(was " << ReplSet::stateAsStr(state()) << ')', true) );
+ }
+ s << td( grey(hbinfo().lastHeartbeatMsg,!ok) );
+ stringstream q;
+ q << "/_replSetOplog?" << id();
+ s << td( a(q.str(), "", never ? "?" : hbinfo().opTime.toString()) );
+ if( hbinfo().skew > INT_MIN ) { // the health poll task stores INT_MIN when it could not read a time from the reply
+ s << td( grey(str::stream() << hbinfo().skew,!ok) );
+ } else
+ s << td("");
+ s << _tr();
+ }
+
+ string ReplSetImpl::stateAsHtml(MemberState s) { // state label wrapped by a("", <description>, <label>); "" for a state not listed here
+ if( s.s == MemberState::RS_STARTUP ) return a("", "serving still starting up, or still trying to initiate the set", "STARTUP");
+ if( s.s == MemberState::RS_PRIMARY ) return a("", "this server thinks it is primary", "PRIMARY");
+ if( s.s == MemberState::RS_SECONDARY ) return a("", "this server thinks it is a secondary (slave mode)", "SECONDARY");
+ if( s.s == MemberState::RS_RECOVERING ) return a("", "recovering/resyncing; after recovery usually auto-transitions to secondary", "RECOVERING");
+ if( s.s == MemberState::RS_FATAL ) return a("", "something bad has occurred and server is not completely offline with regard to the replica set. fatal error.", "RS_FATAL"); // NOTE(review): label keeps the RS_ prefix, unlike stateAsStr()
+ if( s.s == MemberState::RS_STARTUP2 ) return a("", "loaded config, still determining who is primary", "RS_STARTUP2"); // NOTE(review): same RS_ prefix difference
+ if( s.s == MemberState::RS_ARBITER ) return a("", "this server is an arbiter only", "ARBITER");
+ if( s.s == MemberState::RS_DOWN ) return a("", "member is down, slow, or unreachable", "DOWN");
+ return "";
+ }
+
+ string ReplSetImpl::stateAsStr(MemberState s) { // plain-text name of the state; "" for a state not listed here
+ if( s.s == MemberState::RS_STARTUP ) return "STARTUP";
+ if( s.s == MemberState::RS_PRIMARY ) return "PRIMARY";
+ if( s.s == MemberState::RS_SECONDARY ) return "SECONDARY";
+ if( s.s == MemberState::RS_RECOVERING ) return "RECOVERING";
+ if( s.s == MemberState::RS_FATAL ) return "FATAL";
+ if( s.s == MemberState::RS_STARTUP2 ) return "STARTUP2";
+ if( s.s == MemberState::RS_ARBITER ) return "ARBITER";
+ if( s.s == MemberState::RS_DOWN ) return "DOWN";
+ return ""; // summarizeMember() relies on the empty string to detect this case
+ }
+
+ extern time_t started;
+
+ // oplogdiags in web ui: appends one <tr> for oplog entry op -- time, optime, h (hex), op, ns, then every remaining field
+ static void say(stringstream&ss, const bo& op) {
+ ss << "<tr>";
+
+ set<string> skip; // field names already shown in their own column
+ be e = op["ts"];
+ if( e.type() == Date || e.type() == Timestamp ) {
+ OpTime ot = e._opTime();
+ ss << td( time_t_to_String_short( ot.getSecs() ) );
+ ss << td( ot.toString() );
+ skip.insert("ts");
+ }
+ else ss << td("?") << td("?");
+
+ e = op["h"];
+ if( e.type() == NumberLong ) {
+ ss << "<td>" << hex << e.Long() << "</td>\n"; // note: hex stays set on the caller's stream after this call
+ skip.insert("h");
+ } else
+ ss << td("?");
+
+ ss << td(op["op"].valuestrsafe());
+ ss << td(op["ns"].valuestrsafe());
+ skip.insert("op");
+ skip.insert("ns");
+
+ ss << "<td>";
+ for( bo::iterator i(op); i.more(); ) {
+ be e = i.next();
+ if( skip.count(e.fieldName()) ) continue;
+ ss << e.toString() << ' ';
+ }
+ ss << "</td>";
+
+ ss << "</tr>";
+ ss << '\n';
+ }
+
+ void ReplSetImpl::_getOplogDiagsAsHtml(unsigned server_id, stringstream& ss) const { // writes an HTML view of the first and last entries of member server_id's oplog to ss
+ const Member *m = findById(server_id);
+ if( m == 0 ) {
+ ss << "Error : can't find a member with id: " << server_id << '\n';
+ return;
+ }
+
+ ss << p("Server : " + m->fullName() + "<br>ns : " + rsoplog );
+
+ //const bo fields = BSON( "o" << false << "o2" << false );
+ const bo fields;
+
+ ScopedConn conn(m->fullName());
+
+ auto_ptr<DBClientCursor> c = conn->query(rsoplog, Query().sort("$natural",1), 20, 0, &fields); // first query: ascending $natural order, limit argument 20
+ if( c.get() == 0 ) {
+ ss << "couldn't query " << rsoplog;
+ return;
+ }
+ static const char *h[] = {"ts","optime", "h","op","ns","rest",0};
+
+ ss << "<style type=\"text/css\" media=\"screen\">"
+ "table { font-size:75% }\n"
+// "th { background-color:#bbb; color:#000 }\n"
+// "td,th { padding:.25em }\n"
+ "</style>\n";
+
+ ss << table(h, true);
+ //ss << "<pre>\n";
+ int n = 0;
+ OpTime otFirst; // ts of the first entry returned by the ascending query
+ OpTime otLast; // ts of the last entry returned by the ascending query
+ OpTime otEnd; // ts of the first entry returned by the descending query
+ while( c->more() ) {
+ bo o = c->next();
+ otLast = o["ts"]._opTime();
+ if( otFirst.isNull() )
+ otFirst = otLast;
+ say(ss, o);
+ n++;
+ }
+ if( n == 0 ) {
+ ss << rsoplog << " is empty\n";
+ }
+ else {
+ auto_ptr<DBClientCursor> c = conn->query(rsoplog, Query().sort("$natural",-1), 20, 0, &fields); // second query: descending order
+ if( c.get() == 0 ) {
+ ss << "couldn't query [2] " << rsoplog;
+ return;
+ }
+ string x; // rows from the descending query, accumulated in reverse so they print oldest first
+ bo o = c->next(); // NOTE(review): no more() check before this first next() -- confirm the cursor cannot be empty here
+ otEnd = o["ts"]._opTime();
+ while( 1 ) {
+ stringstream z;
+ if( o["ts"]._opTime() == otLast ) // reached the last row already printed by the first pass
+ break;
+ say(z, o);
+ x = z.str() + x;
+ if( !c->more() )
+ break;
+ o = c->next();
+ }
+ if( !x.empty() ) {
+ ss << "<tr><td>...</td><td>...</td><td>...</td><td>...</td><td>...</td></tr>\n" << x;
+ //ss << "\n...\n\n" << x;
+ }
+ }
+ ss << _table();
+ ss << p(time_t_to_String_short(time(0)) + " current time");
+
+ //ss << "</pre>\n";
+
+ if( !otEnd.isNull() ) {
+ ss << "<p>Log length in time: ";
+ unsigned d = otEnd.getSecs() - otFirst.getSecs();
+ double h = d / 3600.0; // hides the static column-header array h declared above
+ ss.precision(3);
+ if( h < 72 )
+ ss << h << " hours";
+ else
+ ss << h / 24.0 << " days";
+ ss << "</p>\n";
+ }
+
+ }
+
+ void ReplSetImpl::_summarizeAsHtml(stringstream& s) const { // writes the set summary plus one table row per member (self included), ordered by member id, to s
+ s << table(0, false);
+ s << tr("Set name:", _name);
+ s << tr("Majority up:", elect.aMajoritySeemsToBeUp()?"yes":"no" );
+ s << _table();
+
+ const char *h[] = {"Member",
+ "<a title=\"member id in the replset config\">id</a>",
+ "Up",
+ "<a title=\"length of time we have been continuously connected to the other member with no reconnects (for self, shows uptime)\">cctime</a>",
+ "<a title=\"when this server last received a heartbeat response - includes error code responses\">Last heartbeat</a>",
+ "Votes", "State", "Status",
+ "<a title=\"how up to date this server is. this value polled every few seconds so actually lag is typically much lower than value shown here.\">optime</a>",
+ "<a title=\"Clock skew in seconds relative to this server. Informational; server clock variances will make the diagnostics hard to read, but otherwise are benign..\">skew</a>",
+ 0};
+ s << table(h);
+
+ /* this is to sort the member rows by their ordinal _id, so they show up in the same
+ order on all the different web ui's; that is less confusing for the operator. */
+ map<int,string> mp;
+
+ string myMinValid; // passed as the middle argument of a() for our own optime link below
+ try {
+ readlocktry lk("local.replset.minvalid", 300);
+ if( lk.got() ) {
+ BSONObj mv;
+ if( Helpers::getSingleton("local.replset.minvalid", mv) ) {
+ myMinValid = "minvalid:" + mv["ts"]._opTime().toString();
+ }
+ }
+ else myMinValid = "."; // lock not obtained
+ }
+ catch(...) {
+ myMinValid = "exception fetching minvalid";
+ }
+
+ {
+ stringstream s; // row buffer; hides the output parameter s inside this scope
+ /* self row */
+ s << tr() << td(_self->fullName() + " (me)") <<
+ td(_self->id()) <<
+ td("1") << //up
+ td(ago(started)) <<
+ td("") << // last heartbeat
+ td(ToString(_self->config().votes)) <<
+ td(stateAsHtml(box.getState()));
+ s << td( _hbmsg );
+ stringstream q;
+ q << "/_replSetOplog?" << _self->id();
+ s << td( a(q.str(), myMinValid, theReplSet->lastOpTimeWritten.toString()) );
+ s << td(""); // skew
+ s << _tr();
+ mp[_self->hbinfo().id()] = s.str();
+ }
+ Member *m = head();
+ while( m ) {
+ stringstream s; // per-member row buffer, again hiding the parameter
+ m->summarizeMember(s);
+ mp[m->hbinfo().id()] = s.str();
+ m = m->next();
+ }
+
+ for( map<int,string>::const_iterator i = mp.begin(); i != mp.end(); i++ )
+ s << i->second;
+ s << _table();
+ }
+
+
+ void fillRsLog(stringstream& s) { // renders the file-local replica set RamLog (_rsLog) into s via toHTML
+ _rsLog.toHTML( s );
+ }
+
+ const Member* ReplSetImpl::findById(unsigned id) const { // member with the given id: ourself first, then the member list; 0 if none
+     if( _self->id() == id ) return _self;
+     Member *cur = head();
+     while( cur && cur->id() != id )
+         cur = cur->next();
+     return cur; // null when the list was exhausted without a match
+ }
+
+ void ReplSetImpl::_summarizeStatus(BSONObjBuilder& b) const { // appends set name, current date, our state and a sorted "members" array (self included) to b
+ vector<BSONObj> v;
+
+ // add self
+ {
+ HostAndPort h(getHostName(), cmdLine.port);
+
+ BSONObjBuilder bb;
+ bb.append("_id", (int) _self->id());
+ bb.append("name", h.toString());
+ bb.append("health", 1.0); // self is always reported with health 1.0
+ bb.append("state", (int) box.getState().s);
+ string s = _self->lhb();
+ if( !s.empty() )
+ bb.append("errmsg", s);
+ bb.append("self", true);
+ v.push_back(bb.obj());
+ }
+
+ Member *m =_members.head();
+ while( m ) {
+ BSONObjBuilder bb;
+ bb.append("_id", (int) m->id());
+ bb.append("name", m->fullName());
+ bb.append("health", m->hbinfo().health);
+ bb.append("state", (int) m->state().s);
+ bb.append("uptime", (unsigned) (m->hbinfo().upSince ? (time(0)-m->hbinfo().upSince) : 0)); // seconds since upSince, or 0 when upSince is unset
+ bb.appendTimeT("lastHeartbeat", m->hbinfo().lastHeartbeat);
+ string s = m->lhb();
+ if( !s.empty() )
+ bb.append("errmsg", s);
+ v.push_back(bb.obj());
+ m = m->next();
+ }
+ sort(v.begin(), v.end()); // orders the member documents with BSONObj's own comparison
+ b.append("set", name());
+ b.appendTimeT("date", time(0));
+ b.append("myState", box.getState().s);
+ b.append("members", v);
+ }
+
+ static struct Test : public UnitTest { // sanity check: two default-constructed HealthOptions compare equal and report isDefault()
+ void run() {
+ HealthOptions a,b;
+ assert( a == b );
+ assert( a.isDefault() );
+ }
+ } test;
+
+}
diff --git a/db/repl/health.h b/db/repl/health.h
new file mode 100644
index 0000000..8b1005e
--- /dev/null
+++ b/db/repl/health.h
@@ -0,0 +1,50 @@
+// health.h
+
+/**
+* Copyright (C) 2008 10gen Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#pragma once
+
+namespace mongo {
+
+ /* throws. sends a replSetHeartbeat command to memberFullName; reply goes into result, return value is the command's ok status (defined in heartbeat.cpp) */
+ bool requestHeartbeat(string setname, string fromHost, string memberFullName, BSONObj& result, int myConfigVersion, int& theirConfigVersion, bool checkEmpty = false);
+
+ struct HealthOptions { // heartbeat tunables for a replica set; the constructor installs the defaults
+     HealthOptions() {
+         heartbeatSleepMillis = 2000;
+         heartbeatTimeoutMillis = 10000;
+         heartbeatConnRetries = 3;
+     }
+     /** @return true when every option still equals its constructor default */
+     bool isDefault() const { return *this == HealthOptions(); }
+
+     // see http://www.mongodb.org/display/DOCS/Replica+Set+Internals
+     unsigned heartbeatSleepMillis;
+     unsigned heartbeatTimeoutMillis;
+     unsigned heartbeatConnRetries ;
+     /** uasserts (13112 / 13113) when either millisecond option is below 10 */
+     void check() {
+         uassert(13112, "bad replset heartbeat option", heartbeatSleepMillis >= 10);
+         uassert(13113, "bad replset heartbeat option", heartbeatTimeoutMillis >= 10);
+     }
+     /** member-wise equality over all three options (the retries term used to compare the field with itself) */
+     bool operator==(const HealthOptions& r) const {
+         return heartbeatSleepMillis==r.heartbeatSleepMillis && heartbeatTimeoutMillis==r.heartbeatTimeoutMillis && heartbeatConnRetries==r.heartbeatConnRetries;
+     }
+ };
+
+}
diff --git a/db/repl/heartbeat.cpp b/db/repl/heartbeat.cpp
new file mode 100644
index 0000000..78ce5d1
--- /dev/null
+++ b/db/repl/heartbeat.cpp
@@ -0,0 +1,257 @@
+/**
+* Copyright (C) 2008 10gen Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "pch.h"
+#include "rs.h"
+#include "health.h"
+#include "../../util/background.h"
+#include "../../client/dbclient.h"
+#include "../commands.h"
+#include "../../util/concurrency/value.h"
+#include "../../util/concurrency/task.h"
+#include "../../util/concurrency/msg.h"
+#include "../../util/mongoutils/html.h"
+#include "../../util/goodies.h"
+#include "../../util/ramlog.h"
+#include "../helpers/dblogger.h"
+#include "connections.h"
+#include "../../util/unittest.h"
+#include "../instance.h"
+
+namespace mongo {
+
+ using namespace bson;
+
+ extern bool replSetBlind;
+
+ // hacky
+ string *discoveredSeed = 0;
+
+ /* { replSetHeartbeat : <setname> } -- replies with our set name, state, hbmsg, time, opTime and config version (plus the config when ours is newer) */
+ class CmdReplSetHeartbeat : public ReplSetCommand {
+ public:
+ virtual bool adminOnly() const { return false; }
+ CmdReplSetHeartbeat() : ReplSetCommand("replSetHeartbeat") { }
+ virtual bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) {
+ if( replSetBlind ) // when set, fail without filling in errmsg
+ return false;
+
+ /* we don't call ReplSetCommand::check() here because heartbeat
+ checks many things that are pre-initialization. */
+ if( !replSet ) {
+ errmsg = "not running with --replSet";
+ return false;
+ }
+ if( cmdObj["pv"].Int() != 1 ) {
+ errmsg = "incompatible replset protocol version";
+ return false;
+ }
+ {
+ string s = string(cmdObj.getStringField("replSetHeartbeat"));
+ if( cmdLine.ourSetName() != s ) {
+ errmsg = "repl set names do not match";
+ log() << "cmdline: " << cmdLine._replSet << endl;
+ log() << "s: " << s << endl;
+ result.append("mismatch", true);
+ return false;
+ }
+ }
+
+ result.append("rs", true);
+ if( cmdObj["checkEmpty"].trueValue() ) {
+ result.append("hasData", replHasDatabases());
+ }
+ if( theReplSet == 0 ) {
+ string from( cmdObj.getStringField("from") );
+ if( !from.empty() && discoveredSeed == 0 ) {
+ discoveredSeed = new string(from); // remember the first sender seen while we have no set object yet
+ }
+ errmsg = "still initializing";
+ return false;
+ }
+
+ if( theReplSet->name() != cmdObj.getStringField("replSetHeartbeat") ) {
+ errmsg = "repl set names do not match (2)";
+ result.append("mismatch", true);
+ return false;
+ }
+ result.append("set", theReplSet->name());
+ result.append("state", theReplSet->state().s);
+ result.append("hbmsg", theReplSet->hbmsg());
+ result.append("time", (int) time(0)); // appended as an int
+ result.appendDate("opTime", theReplSet->lastOpTimeWritten.asDate());
+ int v = theReplSet->config().version;
+ result.append("v", v);
+ if( v > cmdObj["v"].Int() ) // caller's config is older than ours: ship ours along
+ result << "config" << theReplSet->config().asBson();
+
+ return true;
+ }
+ } cmdReplSetHeartbeat;
+
+ /* throws dbexception. runs replSetHeartbeat against memberFullName over a ScopedConn; returns runCommand's result, or false right away when replSetBlind is set */
+ bool requestHeartbeat(string setName, string from, string memberFullName, BSONObj& result, int myCfgVersion, int& theirCfgVersion, bool checkEmpty) {
+ if( replSetBlind ) {
+ //sleepmillis( rand() );
+ return false;
+ }
+
+ BSONObj cmd = BSON( "replSetHeartbeat" << setName << "v" << myCfgVersion << "pv" << 1 << "checkEmpty" << checkEmpty << "from" << from );
+
+ // we might be talking to ourself - generally not a great idea to do outbound waiting calls in a write lock
+ assert( !dbMutex.isWriteLocked() );
+
+ // these are slow (multisecond to respond), so generally we don't want to be locked, at least not without
+ // thinking carefully about it first.
+ assert( theReplSet == 0 || !theReplSet->lockedByMe() );
+
+ ScopedConn conn(memberFullName);
+ return conn->runCommand("admin", cmd, result); // note: theirCfgVersion is never written in this function
+ }
+
+ /* poll every other set member to check its status: one task per member, each doWork() sends one heartbeat and posts the outcome to the manager */
+ class ReplSetHealthPollTask : public task::Task {
+ HostAndPort h;
+ HeartbeatInfo m; // last heartbeat info computed by this task; starting point for the next doWork()
+ public:
+ ReplSetHealthPollTask(const HostAndPort& hh, const HeartbeatInfo& mm) : h(hh), m(mm) { }
+
+ string name() { return "ReplSetHealthPollTask"; }
+ void doWork() {
+ if ( !theReplSet ) {
+ log(2) << "theReplSet not initialized yet, skipping health poll this round" << rsLog;
+ return;
+ }
+
+ HeartbeatInfo mem = m;
+ HeartbeatInfo old = mem; // kept for the changed() comparison at the end
+ try {
+ BSONObj info;
+ int theirConfigVersion = -10000;
+
+ time_t before = time(0);
+
+ bool ok = requestHeartbeat(theReplSet->name(), theReplSet->selfFullName(), h.toString(), info, theReplSet->config().version, theirConfigVersion);
+
+ time_t after = mem.lastHeartbeat = time(0); // we set this on any response - we don't get this far if couldn't connect because exception is thrown
+
+ try {
+ mem.skew = 0;
+ long long t = info["time"].Long(); // NOTE(review): CmdReplSetHeartbeat appends "time" as an int; confirm Long() accepts that, otherwise this always lands in the catch below
+ if( t > after )
+ mem.skew = (int) (t - after);
+ else if( t < before )
+ mem.skew = (int) (t - before); // negative
+ }
+ catch(...) {
+ mem.skew = INT_MIN; // marks the skew as unknown
+ }
+
+ {
+ be state = info["state"];
+ if( state.ok() )
+ mem.hbstate = MemberState(state.Int());
+ }
+ if( ok ) {
+ if( mem.upSince == 0 ) {
+ log() << "replSet info " << h.toString() << " is now up" << rsLog;
+ mem.upSince = mem.lastHeartbeat;
+ }
+ mem.health = 1.0;
+ mem.lastHeartbeatMsg = info["hbmsg"].String();
+ if( info.hasElement("opTime") )
+ mem.opTime = info["opTime"].Date();
+
+ be cfg = info["config"];
+ if( cfg.ok() ) {
+ // received a new config
+ boost::function<void()> f =
+ boost::bind(&Manager::msgReceivedNewConfig, theReplSet->mgr, cfg.Obj().copy());
+ theReplSet->mgr->send(f);
+ }
+ }
+ else {
+ down(mem, info.getStringField("errmsg"));
+ }
+ }
+ catch(...) {
+ down(mem, "connect/transport error");
+ }
+ m = mem;
+
+ theReplSet->mgr->send( boost::bind(&ReplSet::msgUpdateHBInfo, theReplSet, mem) );
+
+ static time_t last = 0; // function-local static: a single value shared by all instances of this task class
+ time_t now = time(0);
+ if( mem.changed(old) || now-last>4 ) {
+ last = now;
+ theReplSet->mgr->send( boost::bind(&Manager::msgCheckNewState, theReplSet->mgr) );
+ }
+ }
+
+ private:
+ void down(HeartbeatInfo& mem, string msg) { // marks mem as unhealthy with msg; logs only on the up -> down transition
+ mem.health = 0.0;
+ if( mem.upSince ) {
+ mem.upSince = 0;
+ log() << "replSet info " << h.toString() << " is now down (or slow to respond)" << rsLog;
+ }
+ mem.lastHeartbeatMsg = msg;
+ }
+ };
+
+ void ReplSetImpl::endOldHealthTasks() { // halts every registered health poll task and empties healthTasks
+ unsigned sz = healthTasks.size();
+ for( set<ReplSetHealthPollTask*>::iterator i = healthTasks.begin(); i != healthTasks.end(); i++ )
+ (*i)->halt(); // tasks are halted, not deleted, here
+ healthTasks.clear();
+ if( sz )
+ DEV log() << "replSet debug: cleared old tasks " << sz << endl;
+ }
+
+ void ReplSetImpl::startHealthTaskFor(Member *m) { // creates a poll task for m, records it in healthTasks and schedules it with task::repeat
+ ReplSetHealthPollTask *task = new ReplSetHealthPollTask(m->h(), m->hbinfo());
+ healthTasks.insert(task);
+ task::repeat(task, 2000); // hard-coded interval; presumably milliseconds -- confirm against task::repeat
+ }
+
+ void startSyncThread();
+
+ /** called during repl set startup. caller expects it to return fairly quickly.
+ note ReplSet object is only created once we get a config - so this won't run
+ until the initiation. forks the manager task, queues a first msgCheckNewState, and starts the sync thread.
+ */
+ void ReplSetImpl::startThreads() {
+ task::fork(mgr);
+
+ /*Member* m = _members.head();
+ while( m ) {
+ ReplSetHealthPollTask *task = new ReplSetHealthPollTask(m->h(), m->hbinfo());
+ healthTasks.insert(task);
+ task::repeat(shared_ptr<task::Task>(task), 2000);
+ m = m->next();
+ }*/
+
+ mgr->send( boost::bind(&Manager::msgCheckNewState, theReplSet->mgr) );
+
+ boost::thread t(startSyncThread); // the thread object goes out of scope right away; it is never joined here
+ }
+
+}
+
+/* todo:
+ stop bg job and delete on removefromset
+*/
diff --git a/db/repl/manager.cpp b/db/repl/manager.cpp
new file mode 100644
index 0000000..e870688
--- /dev/null
+++ b/db/repl/manager.cpp
@@ -0,0 +1,179 @@
+/* @file manager.cpp
+*/
+
+/**
+* Copyright (C) 2008 10gen Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "pch.h"
+#include "rs.h"
+#include "../client.h"
+
+namespace mongo {
+
+ /* sentinel values; NOPRIMARY is the initial value given to Manager::_primary by the constructor */
+ enum {
+ NOPRIMARY = -2,
+ SELFPRIMARY = -1
+ };
+
+ /* check members OTHER THAN US to see if they think they are primary
+    @return the one remote member that is up and reports itself primary (it is also passed to
+            noteARemoteIsPrimary), or 0 if no remote claims to be primary.
+    @throws std::string("twomasters") when more than one remote claims to be primary.
+            The exception must be a std::string rather than a string literal: the handler in
+            Manager::msgCheckNewState() is catch(string), which does not match a const char*.
+ */
+ const Member * Manager::findOtherPrimary() {
+     Member *p = 0;
+     for( Member *m = rs->head(); m; m = m->next() ) {
+         if( m->state().primary() && m->hbinfo().up() ) {
+             if( p ) throw string("twomasters"); // our polling is asynchronous, so this is often ok.
+             p = m;
+         }
+     }
+     if( p )
+         noteARemoteIsPrimary(p);
+     return p;
+ }
+
+ /* Construct the manager task server for replica set _rs; starts with no election in progress and no primary noted. */
+ Manager::Manager(ReplSetImpl *_rs) :
+ task::Server("rs Manager"), rs(_rs), busyWithElectSelf(false), _primary(NOPRIMARY)
+ {
+ }
+
+ /* The manager is expected to live for the life of the process: destruction is logged,
+    the owner's pointer is cleared, and then an assertion is raised. */
+ Manager::~Manager() {
+ log() << "ERROR: ~Manager should never be called" << rsLog;
+ rs->mgr = 0;
+ assert(false);
+ }
+
+ /* Registers a Client for the manager thread; presumably invoked by task::Server on that thread at start - confirm. */
+ void Manager::starting() {
+ Client::initThread("rs Manager");
+ }
+
+ /* Record remote member m as the primary. No-op if m is already the noted primary.
+    Our own state becomes RS_ARBITER if we are arbiter-only, otherwise RS_RECOVERING. */
+ void Manager::noteARemoteIsPrimary(const Member *m) {
+ if( rs->box.getPrimary() == m )
+ return;
+ // NOTE(review): Member::lhb() is declared in rs.h as returning string by value, so this assigns
+ // to a temporary and does not appear to clear the stored heartbeat message - confirm intent.
+ rs->_self->lhb() = "";
+ rs->box.set(rs->iAmArbiterOnly() ? MemberState::RS_ARBITER : MemberState::RS_RECOVERING, m);
+ }
+
+ /** called as the health threads get new results
+
+ Re-evaluates who is primary from the latest heartbeat info. Under the replset lock it:
+ - drops a noted remote primary that is down or no longer reports primary,
+ - adopts a remote that claims primary (unless we are primary and can still see a majority),
+ - relinquishes if we are primary but cannot see a majority,
+ - otherwise, if we are eligible and a majority is up, marks an election as in progress.
+ The election itself (electSelf) runs after the lock scope has ended.
+ */
+ void Manager::msgCheckNewState() {
+ {
+ theReplSet->assertValid();
+ rs->assertValid();
+
+ RSBase::lock lk(rs);
+
+ if( busyWithElectSelf ) return;
+
+ // p : who we currently believe is primary (may be ourself, or 0)
+ const Member *p = rs->box.getPrimary();
+ if( p && p != rs->_self ) {
+ if( !p->hbinfo().up() ||
+ !p->hbinfo().hbstate.primary() )
+ {
+ p = 0;
+ rs->box.setOtherPrimary(0);
+ }
+ }
+
+ // p2 : a remote member that currently claims to be primary (or 0)
+ const Member *p2;
+ try { p2 = findOtherPrimary(); }
+ // NOTE(review): this handler only matches exceptions of type std::string; confirm that
+ // findOtherPrimary() throws a std::string and not a string literal (const char*).
+ catch(string s) {
+ /* two other nodes think they are primary (asynchronously polled) -- wait for things to settle down. */
+ log() << "replSet warning DIAG 2 primary" << s << rsLog;
+ return;
+ }
+
+ if( p2 ) {
+ /* someone else thinks they are primary. */
+ if( p == p2 ) {
+ // we thought the same; all set.
+ return;
+ }
+ if( p == 0 ) {
+ noteARemoteIsPrimary(p2);
+ return;
+ }
+ // todo xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
+ if( p != rs->_self ) {
+ // switch primary from oldremotep->newremotep2
+ noteARemoteIsPrimary(p2);
+ return;
+ }
+ /* we thought we were primary, yet now someone else thinks they are. */
+ if( !rs->elect.aMajoritySeemsToBeUp() ) {
+ /* we can't see a majority. so the other node is probably the right choice. */
+ noteARemoteIsPrimary(p2);
+ return;
+ }
+ /* ignore for now, keep thinking we are master.
+ this could just be timing (we poll every couple seconds) or could indicate
+ a problem? if it happens consistently for a duration of time we should
+ alert the sysadmin.
+ */
+ return;
+ }
+
+ /* didn't find anyone who wants to be primary */
+
+ if( p ) {
+ /* we are already primary */
+
+ if( p != rs->_self ) {
+ rs->sethbmsg("error p != rs->self in checkNewState");
+ log() << "replSet " << p->fullName() << rsLog;
+ log() << "replSet " << rs->_self->fullName() << rsLog;
+ return;
+ }
+
+ if( !rs->elect.aMajoritySeemsToBeUp() ) {
+ log() << "replSet can't see a majority of the set, relinquishing primary" << rsLog;
+ rs->relinquish();
+ }
+
+ return;
+ }
+
+ if( !rs->iAmPotentiallyHot() ) // if not we never try to be primary
+ return;
+
+ /* TODO : CHECK PRIORITY HERE. can't be elected if priority zero. */
+
+ /* no one seems to be primary. shall we try to elect ourself? */
+ if( !rs->elect.aMajoritySeemsToBeUp() ) {
+ // ll is the log level: raised by one after the 5th occurrence, and by one more
+ // if the previous occurrence was under 60 seconds ago, to reduce log noise
+ static time_t last;
+ static int n;
+ int ll = 0;
+ if( ++n > 5 ) ll++;
+ if( last + 60 > time(0 ) ) ll++;
+ log(ll) << "replSet can't see a majority, will not try to elect self" << rsLog;
+ last = time(0);
+ return;
+ }
+
+ busyWithElectSelf = true; // don't try to do further elections & such while we are already working on one.
+ }
+ // the lock taken above has been released here; busyWithElectSelf guards against re-entry
+ try {
+ rs->elect.electSelf();
+ }
+ catch(RetryAfterSleepException&) {
+ /* we want to process new inbounds before trying this again. so we just put a checkNewstate in the queue for eval later. */
+ requeue();
+ }
+ catch(...) {
+ log() << "replSet error unexpected assertion in rs manager" << rsLog;
+ }
+ busyWithElectSelf = false;
+ }
+
+}
diff --git a/db/repl/multicmd.h b/db/repl/multicmd.h
new file mode 100644
index 0000000..61c9b5f
--- /dev/null
+++ b/db/repl/multicmd.h
@@ -0,0 +1,70 @@
+// @file multicmd.h
+
+/**
+* Copyright (C) 2008 10gen Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#pragma once
+
+#include "../../util/background.h"
+#include "connections.h"
+
+namespace mongo {
+
+ /* One destination of a multiCommand() call together with its outcome. */
+ struct Target {
+ Target(string hostport) : toHost(hostport), ok(false) { }
+ Target() : ok(false) { }
+ string toHost; // host the command is sent to
+ bool ok; // set from runCommand()'s return value; stays false if the job threw or never ran
+ BSONObj result; // reply object filled in by runCommand()
+ };
+
+ /* -- implementation ------------- */
+
+ /* Background job that runs one command against one Target on the "admin" database.
+    Holds REFERENCES to the command and the target, so both must outlive the job.
+ */
+ class _MultiCommandJob : public BackgroundJob {
+ public:
+ BSONObj& cmd;
+ Target& d;
+ _MultiCommandJob(BSONObj& _cmd, Target& _d) : cmd(_cmd), d(_d) { }
+ private:
+ string name() { return "MultiCommandJob"; }
+ void run() {
+ try {
+ ScopedConn c(d.toHost);
+ d.ok = c->runCommand("admin", cmd, d.result);
+ }
+ // a DBException leaves d.ok at its prior value; it is only logged in DEV builds
+ catch(DBException&) {
+ DEV log() << "dev caught dbexception on multiCommand " << d.toHost << rsLog;
+ }
+ }
+ };
+
+ /* Run cmd against every Target in L using one background job per target, then wait on them.
+    Outcomes are written into each Target (ok / result).
+    NOTE(review): the jobs reference the local cmd and the caller's targets; the 5 passed to
+    wait() looks like a timeout - confirm jobs cannot still be running when this returns.
+ */
+ inline void multiCommand(BSONObj cmd, list<Target>& L) {
+     typedef shared_ptr<_MultiCommandJob> JobPtr;
+     list<JobPtr> owners;            // owns the jobs until this function returns
+     list<BackgroundJob *> pending;  // non-owning view handed to BackgroundJob
+
+     for( list<Target>::iterator t = L.begin(); t != L.end(); ++t ) {
+         JobPtr job( new _MultiCommandJob(cmd, *t) );
+         owners.push_back(job);
+         pending.push_back(job.get());
+     }
+
+     BackgroundJob::go(pending);
+     BackgroundJob::wait(pending,5);
+ }
+
+}
diff --git a/db/repl/replset_commands.cpp b/db/repl/replset_commands.cpp
new file mode 100644
index 0000000..f8f46d5
--- /dev/null
+++ b/db/repl/replset_commands.cpp
@@ -0,0 +1,293 @@
+/**
+* Copyright (C) 2008 10gen Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "pch.h"
+#include "../cmdline.h"
+#include "../commands.h"
+#include "health.h"
+#include "rs.h"
+#include "rs_config.h"
+#include "../dbwebserver.h"
+#include "../../util/mongoutils/html.h"
+#include "../../client/dbclient.h"
+
+namespace mongo {
+
+ void checkMembersUpForConfigChange(const ReplSetConfig& cfg, bool initial);
+
+ /* commands in other files:
+ replSetHeartbeat - health.cpp
+ replSetInitiate - rs_initiate.cpp
+ */
+
+ bool replSetBlind = false;
+
+ /* replSetTest : testing hook. { blind : <bool> } sets the global replSetBlind flag. */
+ class CmdReplSetTest : public ReplSetCommand {
+ public:
+     virtual void help( stringstream &help ) const {
+         help << "Just for testing : do not use.\n";
+     }
+     CmdReplSetTest() : ReplSetCommand("replSetTest") { }
+     virtual bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) {
+         if( !check(errmsg, result) )
+             return false;
+         // "blind" is the only option understood; anything else fails
+         if( !cmdObj.hasElement("blind") )
+             return false;
+         replSetBlind = cmdObj.getBoolField("blind");
+         log() << "replSet info replSetTest command received, replSetBlind=" << replSetBlind << rsLog;
+         return true;
+     }
+ } cmdReplSetTest;
+
+ /* replSetGetRBID : internal command reporting this process's rbid value,
+    which is fixed at construction from the current time in milliseconds. */
+ class CmdReplSetGetRBID : public ReplSetCommand {
+ public:
+     int rbid;
+     virtual void help( stringstream &help ) const {
+         help << "internal";
+     }
+     CmdReplSetGetRBID() : ReplSetCommand("replSetGetRBID"), rbid((int) curTimeMillis()) { }
+     virtual bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) {
+         if( check(errmsg, result) ) {
+             result.append("rbid",rbid);
+             return true;
+         }
+         return false;
+     }
+ } cmdReplSetRBID;
+
+ using namespace bson;
+ /* Ask the server behind connection c for its rbid via the replSetGetRBID command.
+    NOTE(review): the return value of simpleCommand() is ignored, so a failed command is not
+    distinguished here from a successful one - confirm what numberInt() yields in that case.
+ */
+ int getRBID(DBClientConnection *c) {
+ bo info;
+ c->simpleCommand("admin", &info, "replSetGetRBID");
+ return info["rbid"].numberInt();
+ }
+
+ /* replSetGetStatus : report replica set status as seen by this server. */
+ class CmdReplSetGetStatus : public ReplSetCommand {
+ public:
+     virtual void help( stringstream &help ) const {
+         help << "Report status of a replica set from the POV of this server\n";
+         help << "{ replSetGetStatus : 1 }";
+         help << "\nhttp://www.mongodb.org/display/DOCS/Replica+Set+Commands";
+     }
+     CmdReplSetGetStatus() : ReplSetCommand("replSetGetStatus", true) { }
+     virtual bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) {
+         const bool ready = check(errmsg, result);
+         if( ready )
+             theReplSet->summarizeStatus(result);
+         return ready;
+     }
+ } cmdReplSetGetStatus;
+
+ /* replSetReconfig : apply a new configuration object to the set.
+    Only one reconfig may run at a time; a concurrent attempt fails with an error message.
+ */
+ class CmdReplSetReconfig : public ReplSetCommand {
+ RWLock mutex; /* we don't need rw but we wanted try capability. :-( */
+ public:
+ virtual void help( stringstream &help ) const {
+ help << "Adjust configuration of a replica set\n";
+ help << "{ replSetReconfig : config_object }";
+ help << "\nhttp://www.mongodb.org/display/DOCS/Replica+Set+Commands";
+ }
+ CmdReplSetReconfig() : ReplSetCommand("replSetReconfig"), mutex("rsreconfig") { }
+ virtual bool run(const string& a, BSONObj& b, string& errmsg, BSONObjBuilder& c, bool d) {
+ try {
+ rwlock_try_write lk(mutex);
+ return _run(a,b,errmsg,c,d);
+ }
+ // only a failed try-lock reaches the code below; exceptions thrown by _run() propagate
+ catch(rwlock_try_write::exception&) { }
+ errmsg = "a replSetReconfig is already in progress";
+ return false;
+ }
+ private:
+ /* Does the actual work; the caller holds the reconfig mutex.
+ Returns false with errmsg set on a precondition failure; rethrows DBException after logging it. */
+ bool _run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) {
+ if( !check(errmsg, result) )
+ return false;
+ if( !theReplSet->box.getState().primary() ) {
+ errmsg = "replSetReconfig command must be sent to the current replica set primary.";
+ return false;
+ }
+
+ {
+ // just make sure we can get a write lock before doing anything else. we'll reacquire one
+ // later. of course it could be stuck then, but this check lowers the risk if weird things
+ // are up - we probably don't want a change to apply 30 minutes after the initial attempt.
+ time_t t = time(0);
+ writelock lk("");
+ if( time(0)-t > 20 ) {
+ errmsg = "took a long time to get write lock, so not initiating. Initiate when server less busy?";
+ return false;
+ }
+ }
+
+ if( cmdObj["replSetReconfig"].type() != Object ) {
+ errmsg = "no configuration specified";
+ return false;
+ }
+
+ /** TODO
+ Support changes when a majority, but not all, members of a set are up.
+ Determine what changes should not be allowed as they would cause erroneous states.
+ What should be possible when a majority is not up?
+ */
+ try {
+ ReplSetConfig newConfig(cmdObj["replSetReconfig"].Obj());
+
+ log() << "replSet replSetReconfig config object parses ok, " << newConfig.members.size() << " members specified" << rsLog;
+
+ if( !ReplSetConfig::legalChange(theReplSet->getConfig(), newConfig, errmsg) ) {
+ return false;
+ }
+
+ checkMembersUpForConfigChange(newConfig,false);
+
+ log() << "replSet replSetReconfig [2]" << rsLog;
+
+ // saves the new config and rebuilds the member list from it
+ theReplSet->haveNewConfig(newConfig, true);
+ ReplSet::startupStatusMsg = "replSetReconfig'd";
+ }
+ catch( DBException& e ) {
+ log() << "replSet replSetReconfig exception: " << e.what() << rsLog;
+ throw;
+ }
+
+ return true;
+ }
+ } cmdReplSetReconfig;
+
+ /* replSetFreeze : placeholder, always fails ("not yet implemented" once the replset check passes). */
+ class CmdReplSetFreeze : public ReplSetCommand {
+ public:
+     virtual void help( stringstream &help ) const {
+         help << "Enable / disable failover for the set - locks current primary as primary even if issues occur.\nFor use during system maintenance.\n";
+         help << "{ replSetFreeze : <bool> }";
+         help << "\nhttp://www.mongodb.org/display/DOCS/Replica+Set+Commands";
+     }
+
+     CmdReplSetFreeze() : ReplSetCommand("replSetFreeze") { }
+     virtual bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) {
+         // when check() fails it is left to report the reason itself
+         if( check(errmsg, result) )
+             errmsg = "not yet implemented"; /*TODO*/
+         return false;
+     }
+ } cmdReplSetFreeze;
+
+ /* replSetStepDown : ask the current primary to step down.
+    Fails with errmsg set if this member is not primary; otherwise returns the result of
+    ReplSet::stepDown().
+ */
+ class CmdReplSetStepDown: public ReplSetCommand {
+ public:
+     virtual void help( stringstream &help ) const {
+         // typo fix: the text previously read "or 1 minute"
+         help << "Step down as primary. Will not try to reelect self for 1 minute.\n";
+         help << "(If another member with same priority takes over in the meantime, it will stay primary.)\n";
+         help << "http://www.mongodb.org/display/DOCS/Replica+Set+Commands";
+     }
+
+     CmdReplSetStepDown() : ReplSetCommand("replSetStepDown") { }
+     virtual bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) {
+         if( !check(errmsg, result) )
+             return false;
+         if( !theReplSet->box.getState().primary() ) {
+             errmsg = "not primary so can't step down";
+             return false;
+         }
+         return theReplSet->stepDown();
+     }
+ } cmdReplSetStepDown;
+
+ using namespace bson;
+ using namespace mongoutils::html;
+ extern void fillRsLog(stringstream&);
+
+ /* Web handler for URLs starting with /_replSet : an html status page, and an oplog
+    diagnostics page for /_replSetOplog?<parms>. */
+ class ReplSetHandler : public DbWebHandler {
+ public:
+     ReplSetHandler() : DbWebHandler( "_replSet" , 1 , true ){}
+
+     virtual bool handles( const string& url ) const {
+         return startsWith( url , "/_replSet" );
+     }
+
+     virtual void handle( const char *rq, string url,
+                          string& responseMsg, int& responseCode,
+                          vector<string>& headers, const SockAddr &from ){
+
+         // a non-empty remainder after "/_replSetOplog?" selects the oplog page
+         string oplogParms = str::after(url, "/_replSetOplog?");
+         if( oplogParms.empty() )
+             responseMsg = _replSet();
+         else
+             responseMsg = _replSetOplog(oplogParms);
+         responseCode = 200;
+     }
+
+
+     /* /_replSetOplog?<parms>  show oplog diagnostics in html format */
+     string _replSetOplog(string parms) {
+         stringstream s;
+         string t = "Replication oplog";
+         s << start(t);
+         s << p(t);
+
+         if( theReplSet == 0 ) {
+             _noReplSetYet(s);
+         }
+         else {
+             try {
+                 theReplSet->getOplogDiagsAsHtml(stringToNum(parms.c_str()), s);
+             }
+             catch(std::exception& e) {
+                 s << "error querying oplog: " << e.what() << '\n';
+             }
+         }
+
+         s << _end();
+         return s.str();
+     }
+
+     /* /_replSet show replica set status in html format */
+     string _replSet() {
+         stringstream s;
+         s << start("Replica Set Status " + prettyHostName());
+         s << p( a("/", "back", "Home") + " | " +
+                 a("/local/system.replset/?html=1", "", "View Replset Config") + " | " +
+                 a("/replSetGetStatus?text", "", "replSetGetStatus") + " | " +
+                 a("http://www.mongodb.org/display/DOCS/Replica+Sets", "", "Docs")
+               );
+
+         if( theReplSet == 0 ) {
+             _noReplSetYet(s);
+         }
+         else {
+             try {
+                 theReplSet->summarizeAsHtml(s);
+             }
+             catch(...) { s << "error summarizing replset status\n"; }
+         }
+         s << p("Recent replset log activity:");
+         fillRsLog(s);
+         s << _end();
+         return s.str();
+     }
+
+ private:
+     /* Paragraph shown while theReplSet is still null: either --replSet was not given,
+        or the set is still starting / not yet initiated. */
+     void _noReplSetYet(stringstream& s) {
+         if( cmdLine._replSet.empty() )
+             s << p("Not using --replSet");
+         else {
+             s << p("Still starting up, or else set is not yet " + a("http://www.mongodb.org/display/DOCS/Replica+Set+Configuration#InitialSetup", "", "initiated")
+                    + ".<br>" + ReplSet::startupStatusMsg);
+         }
+     }
+
+ } replSetHandler;
+
+}
diff --git a/db/repl/rs.cpp b/db/repl/rs.cpp
new file mode 100644
index 0000000..3e12e42
--- /dev/null
+++ b/db/repl/rs.cpp
@@ -0,0 +1,500 @@
+/**
+* Copyright (C) 2008 10gen Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "pch.h"
+#include "../cmdline.h"
+#include "../../util/sock.h"
+#include "../client.h"
+#include "../../client/dbclient.h"
+#include "../dbhelpers.h"
+#include "rs.h"
+
+namespace mongo {
+
+ using namespace bson;
+
+ bool replSet = false;
+ ReplSet *theReplSet = 0;
+ extern string *discoveredSeed;
+
+ /* Set the heartbeat status message stored in _hbmsg and, when non-empty, log it.
+    @param s new message; only its first 255 characters are stored
+    @param logLevel level passed to log()
+    An unchanged message that was logged less than 60 seconds ago is ignored entirely.
+ */
+ void ReplSetImpl::sethbmsg(string s, int logLevel) {
+ static time_t lastLogged;
+ if( s == _hbmsg ) {
+ // unchanged
+ if( time(0)-lastLogged < 60 )
+ return;
+ }
+
+ unsigned sz = s.size();
+ // truncation case writes no terminator: assumes _hbmsg is at least 256 bytes and that
+ // byte 255 is still the zero left by the constructor's memset - TODO confirm declaration
+ if( sz >= 256 )
+ memcpy(_hbmsg, s.c_str(), 255);
+ else {
+ // terminator is written before the text is copied in
+ _hbmsg[sz] = 0;
+ memcpy(_hbmsg, s.c_str(), sz);
+ }
+ if( !s.empty() ) {
+ lastLogged = time(0);
+ log(logLevel) << "replSet " << s << rsLog;
+ }
+ }
+
+ /* Make this member the primary. Asserts that we are eligible (iAmPotentiallyHot). */
+ void ReplSetImpl::assumePrimary() {
+ assert( iAmPotentiallyHot() );
+ writelock lk("admin."); // so we are synchronized with _logOp()
+ box.setSelfPrimary(_self);
+ log() << "replSet PRIMARY" << rsLog; // self (" << _self->id() << ") is now primary" << rsLog;
+ }
+
+ /* Change our own member state to s via the state box. */
+ void ReplSetImpl::changeState(MemberState s) { box.change(s, _self); }
+
+ /* Give up primary: a primary (or a member in startup2) moves to RS_RECOVERING;
+    any other state is left untouched. */
+ void ReplSetImpl::relinquish() {
+     if( box.getState().primary() ) {
+         changeState(MemberState::RS_RECOVERING);
+         log() << "replSet info relinquished primary state" << rsLog;
+         return;
+     }
+     if( !box.getState().startup2() )
+         return;
+     // ? add comment
+     changeState(MemberState::RS_RECOVERING);
+ }
+
+ /* look freshly for who is primary - includes relinquishing ourself. */
+ void ReplSetImpl::forgetPrimary() {
+     if( !box.getState().primary() ) {
+         // we are not primary: just clear whichever remote we had noted
+         box.setOtherPrimary(0);
+         return;
+     }
+     relinquish();
+ }
+
+ /* Step down if we are primary.
+    @return true if we were primary and have moved to RS_RECOVERING, false otherwise.
+    elect.steppedDown is set to 60 seconds from now.
+ */
+ bool ReplSetImpl::_stepDown() {
+     lock lk(this);
+     if( !box.getState().primary() )
+         return false;
+     changeState(MemberState::RS_RECOVERING);
+     elect.steppedDown = time(0) + 60;
+     log() << "replSet info stepped down as primary" << rsLog;
+     return true;
+ }
+
+ /* Store heartbeat info h on the first member whose id matches; ignored if there is none. */
+ void ReplSetImpl::msgUpdateHBInfo(HeartbeatInfo h) {
+     Member *cur = _members.head();
+     while( cur ) {
+         if( cur->id() == h.id() ) {
+             cur->_hbinfo = h;
+             return;
+         }
+         cur = cur->next();
+     }
+ }
+
+ /* @return host/port of every member of the set, ourself first. */
+ list<HostAndPort> ReplSetImpl::memberHostnames() const {
+     list<HostAndPort> hosts;
+     hosts.push_back(_self->h());
+     Member *cur = _members.head();
+     while( cur ) {
+         hosts.push_back(cur->h());
+         cur = cur->next();
+     }
+     return hosts;
+ }
+
+ /* Append m's host string to exactly one list: hosts if it is potentially hot,
+    arbiters if it is arbiter-only, otherwise passives. */
+ void ReplSetImpl::_fillIsMasterHost(const Member *m, vector<string>& hosts, vector<string>& passives, vector<string>& arbiters) {
+     vector<string>& bucket =
+         m->potentiallyHot() ? hosts
+                             : ( m->config().arbiterOnly ? arbiters : passives );
+     bucket.push_back(m->h().toString());
+ }
+
+ /* Fill the replica-set fields of an isMaster reply: ismaster / secondary flags, the
+    hosts / passives / arbiters lists (each only when non-empty), the known primary when
+    it is not us, and arbiterOnly when that applies to us. */
+ void ReplSetImpl::_fillIsMaster(BSONObjBuilder& b) {
+     const StateBox::SP sp = box.get();
+     const bool weArePrimary = sp.state.primary();
+     b.append("ismaster", weArePrimary);
+     b.append("secondary", sp.state.secondary());
+
+     vector<string> hosts, passives, arbiters;
+     _fillIsMasterHost(_self, hosts, passives, arbiters);
+     for( Member *m = _members.head(); m; m = m->next() )
+         _fillIsMasterHost(m, hosts, passives, arbiters);
+
+     if( !hosts.empty() )
+         b.append("hosts", hosts);
+     if( !passives.empty() )
+         b.append("passives", passives);
+     if( !arbiters.empty() )
+         b.append("arbiters", arbiters);
+
+     if( !weArePrimary && sp.primary )
+         b.append("primary", sp.primary->h().toString());
+     if( myConfig().arbiterOnly )
+         b.append("arbiterOnly", true);
+ }
+
+ /** Parse the --replSet command line value.
+ @param cfgString <setname>/<seedhost1>,<seedhost2>
+ @param setname out: text before the first '/', or the whole string when there is no '/'
+ @param seeds out: parsed seed hosts, excluding any that is this server itself
+ @param seedSet out: every parsed seed host including self; also used to reject duplicates
+ Raises uassert 13093 (empty set name), 13114 (unparsable host) or 13096 (duplicate seed).
+ */
+
+ void parseReplsetCmdLine(string cfgString, string& setname, vector<HostAndPort>& seeds, set<HostAndPort>& seedSet ) {
+ const char *p = cfgString.c_str();
+ const char *slash = strchr(p, '/');
+ if( slash )
+ setname = string(p, slash-p);
+ else
+ setname = p;
+ uassert(13093, "bad --replSet config string format is: <setname>[/<seedhost1>,<seedhost2>,...]", !setname.empty());
+
+ if( slash == 0 )
+ return;
+
+ // walk the comma separated host list that follows the slash
+ p = slash + 1;
+ while( 1 ) {
+ const char *comma = strchr(p, ',');
+ // no further comma: treat the terminating NUL as the end of the last entry
+ if( comma == 0 ) comma = strchr(p,0);
+ // an empty entry (including an empty list) ends parsing
+ if( p == comma )
+ break;
+ {
+ HostAndPort m;
+ try {
+ m = HostAndPort( string(p, comma-p) );
+ }
+ catch(...) {
+ uassert(13114, "bad --replSet seed hostname", false);
+ }
+ uassert(13096, "bad --replSet command line config string - dups?", seedSet.count(m) == 0 );
+ seedSet.insert(m);
+ //uassert(13101, "can't use localhost in replset host list", !m.isLocalHost());
+ if( m.isSelf() ) {
+ log(1) << "replSet ignoring seed " << m.toString() << " (=self)" << rsLog;
+ } else
+ seeds.push_back(m);
+ if( *comma == 0 )
+ break;
+ p = comma + 1;
+ }
+ }
+ }
+
+ /* Construct the replica set object: enter RS_STARTUP, load the set configuration
+    (loadConfig() retries in a loop until a usable config is found), then warn about
+    command line seeds that are not members of the loaded config. */
+ ReplSetImpl::ReplSetImpl(ReplSetCmdline& replSetCmdline) : elect(this),
+ _self(0),
+ mgr( new Manager(this) )
+ {
+ memset(_hbmsg, 0, sizeof(_hbmsg));
+ *_hbmsg = '.'; // temp...just to see
+ lastH = 0;
+ changeState(MemberState::RS_STARTUP);
+
+ // a pointer into the caller's command line object is kept
+ _seeds = &replSetCmdline.seeds;
+ //for( vector<HostAndPort>::iterator i = seeds->begin(); i != seeds->end(); i++ )
+ // addMemberIfMissing(*i);
+
+ log(1) << "replSet beginning startup..." << rsLog;
+
+ loadConfig();
+
+ // sss : number of seeds given on the command line, taken before configured members are removed
+ unsigned sss = replSetCmdline.seedSet.size();
+ for( Member *m = head(); m; m = m->next() ) {
+ replSetCmdline.seedSet.erase(m->h());
+ }
+ // whatever is left in seedSet was on the command line but is not a remote member of the config
+ for( set<HostAndPort>::iterator i = replSetCmdline.seedSet.begin(); i != replSetCmdline.seedSet.end(); i++ ) {
+ if( i->isSelf() ) {
+ if( sss == 1 )
+ log(1) << "replSet warning self is listed in the seed list and there are no other seeds listed did you intend that?" << rsLog;
+ } else
+ log() << "replSet warning command line seed " << i->toString() << " is not present in the current repl set config" << rsLog;
+ }
+ }
+
+ void newReplUp();
+
+ /* Initialise lastH and lastOpTimeWritten from the last entry of the rs oplog, if there is one.
+    Raises uassert 13290 when that entry has a null timestamp. */
+ void ReplSetImpl::loadLastOpTimeWritten() {
+     //assert( lastOpTimeWritten.isNull() );
+     readlock lk(rsoplog);
+     BSONObj lastEntry;
+     if( !Helpers::getLast(rsoplog, lastEntry) )
+         return; // no entry found: leave both fields as they are
+     lastH = lastEntry["h"].numberLong();
+     lastOpTimeWritten = lastEntry["ts"]._opTime();
+     uassert(13290, "bad replSet oplog entry?", !lastOpTimeWritten.isNull());
+ }
+
+ /* call after constructing to start - returns fairly quickly after launching its threads
+    If the oplog position cannot be loaded, the error is logged and, after a 30 second
+    sleep, dbexit( EXIT_REPLICATION_ERROR ) is called. */
+ void ReplSetImpl::_go() {
+ try {
+ loadLastOpTimeWritten();
+ }
+ catch(std::exception& e) {
+ log() << "replSet ERROR FATAL couldn't query the local " << rsoplog << " collection. Terminating mongod after 30 seconds." << rsLog;
+ log() << e.what() << rsLog;
+ sleepsecs(30);
+ dbexit( EXIT_REPLICATION_ERROR );
+ return;
+ }
+
+ changeState(MemberState::RS_STARTUP2);
+ startThreads();
+ newReplUp(); // oplog.cpp
+ }
+
+ ReplSetImpl::StartupStatus ReplSetImpl::startupStatus = PRESTART;
+ string ReplSetImpl::startupStatusMsg;
+
+ // true if ok; throws if config really bad; false if config doesn't include self
+ // Installs c as the current config and rebuilds the member list from it: old members are
+ // orphaned, old health tasks halted, and a health task is started for every remote member.
+ // The previously noted primary is re-noted if a member with the same id is still configured.
+ bool ReplSetImpl::initFromConfig(ReplSetConfig& c) {
+ lock lk(this);
+
+ {
+ // count how many configured members are this server; exactly one is required
+ int me = 0;
+ for( vector<ReplSetConfig::MemberCfg>::iterator i = c.members.begin(); i != c.members.end(); i++ ) {
+ const ReplSetConfig::MemberCfg& m = *i;
+ if( m.h.isSelf() ) {
+ me++;
+ }
+ }
+ if( me == 0 ) {
+ // log() << "replSet config : " << _cfg->toString() << rsLog;
+ log() << "replSet warning can't find self in the repl set configuration:" << rsLog;
+ log() << c.toString() << rsLog;
+ return false;
+ }
+ uassert( 13302, "replSet error self appears twice in the repl set configuration", me<=1 );
+ }
+
+ // NOTE(review): any previous _cfg is overwritten without being deleted here; Member objects
+ // created below keep pointers into _cfg->members - confirm ownership before freeing it.
+ _cfg = new ReplSetConfig(c);
+ assert( _cfg->ok() );
+ assert( _name.empty() || _name == _cfg->_id );
+ _name = _cfg->_id;
+ assert( !_name.empty() );
+
+ // start with no members. if this is a reconfig, drop the old ones.
+ _members.orphanAll();
+
+ endOldHealthTasks();
+
+ // remember who was primary (by id) so it can be restored after the member objects are rebuilt
+ int oldPrimaryId = -1;
+ {
+ const Member *p = box.getPrimary();
+ if( p )
+ oldPrimaryId = p->id();
+ }
+ forgetPrimary();
+ _self = 0;
+ for( vector<ReplSetConfig::MemberCfg>::iterator i = _cfg->members.begin(); i != _cfg->members.end(); i++ ) {
+ const ReplSetConfig::MemberCfg& m = *i;
+ Member *mi;
+ if( m.h.isSelf() ) {
+ assert( _self == 0 );
+ mi = _self = new Member(m.h, m._id, &m, true);
+ if( (int)mi->id() == oldPrimaryId )
+ box.setSelfPrimary(mi);
+ } else {
+ mi = new Member(m.h, m._id, &m, false);
+ _members.push(mi);
+ startHealthTaskFor(mi);
+ if( (int)mi->id() == oldPrimaryId )
+ box.setOtherPrimary(mi);
+ }
+ }
+ return true;
+ }
+
+ // Our own config must be the first one.
+ // Picks the ok() config with the highest version (earliest wins on a tie), initialises from
+ // it, and saves it locally when it is newer than our own. Returns false when
+ // initFromConfig() rejects the chosen config.
+ bool ReplSetImpl::_loadConfigFinish(vector<ReplSetConfig>& cfgs) {
+     // -2000 stands for "no local config seen"
+     const int myVersion = cfgs.empty() ? -2000 : cfgs.front().version;
+
+     ReplSetConfig *best = 0;
+     int bestVersion = -1;
+     for( unsigned k = 0; k < cfgs.size(); k++ ) {
+         ReplSetConfig& candidate = cfgs[k];
+         if( !candidate.ok() )
+             continue;
+         if( candidate.version > bestVersion ) {
+             bestVersion = candidate.version;
+             best = &candidate;
+         }
+     }
+     assert( best );
+
+     if( !initFromConfig(*best) )
+         return false;
+
+     const bool remoteIsNewer = best->version > myVersion && best->version >= 0;
+     if( remoteIsNewer ) {
+         log() << "replSet got config version " << best->version << " from a remote, saving locally" << rsLog;
+         writelock lk("admin.");
+         best->saveConfigLocally(BSONObj());
+     }
+     return true;
+ }
+
+ /* Load the replica set configuration, retrying until one is usable.
+    Candidates are gathered from our own local config (always first), each command line seed
+    and the discovered seed, if any. With no ok() candidate it sleeps 10s and retries; when
+    _loadConfigFinish() declines the chosen one it sleeps 20s and retries.
+    A DBException marks the set fatal (BADCONFIG) and is rethrown. */
+ void ReplSetImpl::loadConfig() {
+ while( 1 ) {
+ startupStatus = LOADINGCONFIG;
+ startupStatusMsg = "loading " + rsConfigNs + " config (LOADINGCONFIG)";
+ try {
+ vector<ReplSetConfig> configs;
+ // failure to read our own config is not tolerated: it is logged and rethrown
+ try {
+ configs.push_back( ReplSetConfig(HostAndPort::me()) );
+ }
+ catch(DBException& e) {
+ log() << "replSet exception loading our local replset configuration object : " << e.toString() << rsLog;
+ throw;
+ }
+ // failures reading from seeds are logged and skipped
+ for( vector<HostAndPort>::const_iterator i = _seeds->begin(); i != _seeds->end(); i++ ) {
+ try {
+ configs.push_back( ReplSetConfig(*i) );
+ }
+ catch( DBException& e ) {
+ log() << "replSet exception trying to load config from " << *i << " : " << e.toString() << rsLog;
+ }
+ }
+
+ if( discoveredSeed ) {
+ try {
+ configs.push_back( ReplSetConfig(HostAndPort(*discoveredSeed)) );
+ }
+ catch( DBException& ) {
+ log(1) << "replSet exception trying to load config from discovered seed " << *discoveredSeed << rsLog;
+ }
+ }
+
+ int nok = 0;
+ int nempty = 0;
+ for( vector<ReplSetConfig>::iterator i = configs.begin(); i != configs.end(); i++ ) {
+ if( i->ok() )
+ nok++;
+ if( i->empty() )
+ nempty++;
+ }
+ if( nok == 0 ) {
+
+ // every candidate empty => EMPTYCONFIG; otherwise => EMPTYUNREACHABLE
+ if( nempty == (int) configs.size() ) {
+ startupStatus = EMPTYCONFIG;
+ startupStatusMsg = "can't get " + rsConfigNs + " config from self or any seed (EMPTYCONFIG)";
+ log() << "replSet can't get " << rsConfigNs << " config from self or any seed (EMPTYCONFIG)" << rsLog;
+ log(1) << "replSet have you ran replSetInitiate yet?" << rsLog;
+ if( _seeds->size() == 0 )
+ log(1) << "replSet info no seed hosts were specified on the --replSet command line" << rsLog;
+ }
+ else {
+ startupStatus = EMPTYUNREACHABLE;
+ startupStatusMsg = "can't currently get " + rsConfigNs + " config from self or any seed (EMPTYUNREACHABLE)";
+ log() << "replSet can't get " << rsConfigNs << " config from self or any seed (yet)" << rsLog;
+ }
+
+ sleepsecs(10);
+ continue;
+ }
+
+ if( !_loadConfigFinish(configs) ) {
+ log() << "replSet info Couldn't load config yet. Sleeping 20sec and will try again." << rsLog;
+ sleepsecs(20);
+ continue;
+ }
+ }
+ catch(DBException& e) {
+ startupStatus = BADCONFIG;
+ startupStatusMsg = "replSet error loading set config (BADCONFIG)";
+ log() << "replSet error loading configurations " << e.toString() << rsLog;
+ log() << "replSet error replication will not start" << rsLog;
+ _fatal();
+ throw;
+ }
+ break;
+ }
+ startupStatusMsg = "? started";
+ startupStatus = STARTED;
+ }
+
+ /* Put the set into RS_FATAL with no primary, record "fatal error" as the heartbeat message and log it.
+    Note that the replset lock is not taken here (the lock line is commented out). */
+ void ReplSetImpl::_fatal()
+ {
+ //lock l(this);
+ box.set(MemberState::RS_FATAL, 0);
+ sethbmsg("fatal error");
+ log() << "replSet error fatal error, stopping replication" << rsLog;
+ }
+
+
+ /* Persist newConfig locally and re-initialise the set from it.
+    @param addComment when true a { msg, version } comment object is passed to saveConfigLocally()
+    Any exception from initFromConfig() is logged and puts the set into the fatal state.
+    NOTE(review): the config is saved before initFromConfig() runs, and that call's false
+    return ("self not in config") is not checked - confirm this is intended. */
+ void ReplSet::haveNewConfig(ReplSetConfig& newConfig, bool addComment) {
+ lock l(this); // convention is to lock replset before taking the db rwlock
+ writelock lk("");
+ bo comment;
+ if( addComment )
+ comment = BSON( "msg" << "Reconfig set" << "version" << newConfig.version );
+ newConfig.saveConfigLocally(comment);
+ try {
+ initFromConfig(newConfig);
+ log() << "replSet replSetReconfig new config saved locally" << rsLog;
+ }
+ catch(DBException& e) {
+ log() << "replSet error unexpected exception in haveNewConfig() : " << e.toString() << rsLog;
+ _fatal();
+ }
+ catch(...) {
+ log() << "replSet error unexpected exception in haveNewConfig()" << rsLog;
+ _fatal();
+ }
+ }
+
+ /* Handle a config object received from elsewhere: adopt it only when its version is
+    strictly higher than the one we currently run with. */
+ void Manager::msgReceivedNewConfig(BSONObj o) {
+     log() << "replset msgReceivedNewConfig version: " << o["version"].toString() << rsLog;
+     ReplSetConfig c(o);
+     if( !( c.version > rs->config().version ) ) {
+         log() << "replSet info msgReceivedNewConfig but version isn't higher " <<
+               c.version << ' ' << rs->config().version << rsLog;
+         return;
+     }
+     theReplSet->haveNewConfig(c, false);
+ }
+
+ /* forked as a thread during startup
+ it can run quite a while looking for config. but once found,
+ a separate thread takes over as ReplSetImpl::Manager, and this thread
+ terminates.
+
+ @param replSetCmdline parsed --replSet options; null means replica sets are not in use
+ */
+ void startReplSets(ReplSetCmdline *replSetCmdline) {
+ Client::initThread("startReplSets");
+ try {
+ assert( theReplSet == 0 );
+ if( replSetCmdline == 0 ) {
+ assert(!replSet);
+ // NOTE(review): this early return skips the cc().shutdown() at the end of the function - confirm intended
+ return;
+ }
+ // theReplSet is assigned before go() is called on it
+ (theReplSet = new ReplSet(*replSetCmdline))->go();
+ }
+ catch(std::exception& e) {
+ log() << "replSet caught exception in startReplSets thread: " << e.what() << rsLog;
+ if( theReplSet )
+ theReplSet->fatal();
+ }
+ cc().shutdown();
+ }
+
+}
+
+ namespace boost {
+
+ /* Logs a failed boost assertion (expression, function, file, line) and returns.
+    Presumably the hook Boost calls for BOOST_ASSERT failures when BOOST_ENABLE_ASSERT_HANDLER
+    is defined - confirm against the build flags. */
+ void assertion_failed(char const * expr, char const * function, char const * file, long line)
+ {
+ mongo::log() << "boost assertion failure " << expr << ' ' << function << ' ' << file << ' ' << line << endl;
+ }
+
+ }
diff --git a/db/repl/rs.h b/db/repl/rs.h
new file mode 100644
index 0000000..17a070c
--- /dev/null
+++ b/db/repl/rs.h
@@ -0,0 +1,415 @@
+// /db/repl/rs.h
+
+/**
+* Copyright (C) 2008 10gen Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#pragma once
+
+#include "../../util/concurrency/list.h"
+#include "../../util/concurrency/value.h"
+#include "../../util/concurrency/msg.h"
+#include "../../util/hostandport.h"
+#include "../commands.h"
+#include "rs_exception.h"
+#include "rs_optime.h"
+#include "rs_member.h"
+#include "rs_config.h"
+
+namespace mongo {
+
+ struct HowToFixUp;
+ struct Target;
+ class DBClientConnection;
+ class ReplSetImpl;
+ class OplogReader;
+ extern bool replSet; // true if using repl sets
+ extern class ReplSet *theReplSet; // null until initialized
+ extern Tee *rsLog;
+
+ /* member of a replica set.
+    Ties together a member's host/port, its entry in the set configuration and the
+    most recent heartbeat information held for it. */
+ class Member : public List1<Member>::Base {
+ public:
+     /** @param h    host and port of the member
+         @param ord  ordinal handed to the member's HeartbeatInfo
+         @param c    the member's config entry; the pointer is stored, not copied
+         @param self true when this object describes the local node */
+     Member(HostAndPort h, unsigned ord, const ReplSetConfig::MemberCfg *c, bool self);
+     string fullName() const { return h().toString(); }
+     const ReplSetConfig::MemberCfg& config() const { return *_config; }
+     const HeartbeatInfo& hbinfo() const { return _hbinfo; }
+     /** @return the last heartbeat message recorded for this member.
+         const like the other read accessors, so it can be used through a const Member. */
+     string lhb() const { return _hbinfo.lastHeartbeatMsg; }
+     MemberState state() const { return _hbinfo.hbstate; }
+     const HostAndPort& h() const { return _h; }
+     unsigned id() const { return _hbinfo.id(); }
+     bool potentiallyHot() const { return _config->potentiallyHot(); } // not arbiter, not priority 0
+
+     void summarizeMember(stringstream& s) const;
+     friend class ReplSetImpl;
+ private:
+     const ReplSetConfig::MemberCfg *_config; /* todo: when this changes??? */
+     HostAndPort _h;
+     HeartbeatInfo _hbinfo;
+ };
+
+ /* Task server attached to one ReplSetImpl. It exposes two message entry points:
+    one for a newly received config object and one asking it to re-check state. */
+ class Manager : public task::Server {
+ ReplSetImpl *rs;
+ bool busyWithElectSelf;
+ int _primary;
+ const Member* findOtherPrimary();
+ void noteARemoteIsPrimary(const Member *);
+ virtual void starting();
+ public:
+ Manager(ReplSetImpl *rs);
+ ~Manager();
+ // adopts the given config when its version is higher than the one currently held
+ void msgReceivedNewConfig(BSONObj);
+ void msgCheckNewState();
+ };
+
+ struct Target;
+
+ /* Election bookkeeping for one ReplSetImpl: the last "yea" vote we gave, whether the
+    previous elect pass slept, and the time before which we may not stand again. */
+ class Consensus {
+     ReplSetImpl &rs;
+     /* most recent vote cast: when it was cast and for which member id */
+     struct LastYea {
+         LastYea() : when(0), who(0xffffffff) { }
+         time_t when;
+         unsigned who;
+     };
+     Atomic<LastYea> ly;
+     unsigned yea(unsigned memberId); // throws VoteException
+     void electionFailed(unsigned meid);
+     void _electSelf();
+     bool weAreFreshest(bool& allUp, int& nTies);
+     bool sleptLast; // slept last elect() pass
+ public:
+     Consensus(ReplSetImpl *t) : rs(*t), sleptLast(false), steppedDown(0) { }
+
+     /* if we've stepped down, this is when we are allowed to try to elect ourself again.
+        todo: handle possible weirdnesses at clock skews etc.
+     */
+     time_t steppedDown;
+
+     int totalVotes() const;
+     bool aMajoritySeemsToBeUp() const;
+     void electSelf();
+     void electCmdReceived(BSONObj, BSONObjBuilder*);
+     void multiCommand(BSONObj cmd, list<Target>& L);
+ };
+
+ /** most operations on a ReplSet object should be done while locked. that logic implemented here. */
+ class RSBase : boost::noncopyable {
+ public:
+ // fixed marker value set at construction; assertValid() checks it is still intact
+ const unsigned magic;
+ void assertValid() { assert( magic == 0x12345677 ); }
+ private:
+ mutex m;
+ // 1 while some thread holds the lock through the nested lock class, else 0
+ int _locked;
+ // per-thread flag: true while the current thread holds the lock
+ ThreadLocalValue<bool> _lockedByMe;
+ protected:
+ RSBase() : magic(0x12345677), m("RSBase"), _locked(0) { }
+ // destruction is treated as a programming error: it logs and then asserts
+ ~RSBase() {
+ log() << "~RSBase should never be called?" << rsLog;
+ assert(false);
+ }
+
+ /* Scoped lock on an RSBase. If the calling thread already holds the lock the
+    constructor does nothing, so nested use on one thread is allowed; only the
+    instance that actually took the mutex releases it and clears the flags. */
+ class lock {
+ RSBase& rsbase;
+ // set only when this instance acquired the mutex
+ auto_ptr<scoped_lock> sl;
+ public:
+ lock(RSBase* b) : rsbase(*b) {
+ if( rsbase._lockedByMe.get() )
+ return; // recursive is ok...
+
+ sl.reset( new scoped_lock(rsbase.m) );
+ DEV assert(rsbase._locked == 0);
+ rsbase._locked++;
+ rsbase._lockedByMe.set(true);
+ }
+ ~lock() {
+ // nothing to undo when this was a nested (no-op) acquisition
+ if( sl.get() ) {
+ assert( rsbase._lockedByMe.get() );
+ DEV assert(rsbase._locked == 1);
+ rsbase._lockedByMe.set(false);
+ rsbase._locked--;
+ }
+ }
+ };
+
+ public:
+ /* for asserts */
+ bool locked() const { return _locked != 0; }
+
+ /* if true, is locked, and was locked by this thread. note if false, it could be in the lock or not for another
+ just for asserts & such so we can make the contracts clear on who locks what when.
+ we don't use these locks that frequently, so the little bit of overhead is fine.
+ */
+ bool lockedByMe() { return _lockedByMe.get(); }
+ };
+
+ class ReplSetHealthPollTask;
+
+ /* safe container for our state that keeps member pointer and state variables always aligned */
+ class StateBox : boost::noncopyable {
+ public:
+ struct SP { // SP is like pair<MemberState,const Member *> but nicer
+ SP() : state(MemberState::RS_STARTUP), primary(0) { }
+ MemberState state;
+ const Member *primary;
+ };
+ // returns a copy of state and primary taken together under the mutex
+ const SP get() {
+ scoped_lock lk(m);
+ return sp;
+ }
+ // note: the two accessors below read a single field without taking the mutex
+ MemberState getState() const { return sp.state; }
+ const Member* getPrimary() const { return sp.primary; }
+ // sets our state to s. if s is a primary state, self becomes the recorded primary;
+ // otherwise the recorded primary is cleared only when it was self.
+ void change(MemberState s, const Member *self) {
+ scoped_lock lk(m);
+ sp.state = s;
+ if( s.primary() ) {
+ sp.primary = self;
+ }
+ else {
+ if( self == sp.primary )
+ sp.primary = 0;
+ }
+ }
+ // overwrites both fields unconditionally
+ void set(MemberState s, const Member *p) {
+ scoped_lock lk(m);
+ sp.state = s; sp.primary = p;
+ }
+ void setSelfPrimary(const Member *self) { change(MemberState::RS_PRIMARY, self); }
+ // records another member as primary; asserts that our own state is not primary
+ void setOtherPrimary(const Member *mem) {
+ scoped_lock lk(m);
+ assert( !sp.state.primary() );
+ sp.primary = mem;
+ }
+ StateBox() : m("StateBox") { }
+ private:
+ mutex m;
+ SP sp;
+ };
+
+ // parses cfgString and fills the three output arguments; defined elsewhere
+ void parseReplsetCmdLine(string cfgString, string& setname, vector<HostAndPort>& seeds, set<HostAndPort>& seedSet );
+
+ /** Parameter given to the --replSet command line option (parsed).
+ Syntax is "<setname>/<seedhost1>,<seedhost2>"
+ where setname is a name and seedhost is "<host>[:<port>]" */
+ class ReplSetCmdline {
+ public:
+ // all parsing is delegated to parseReplsetCmdLine(), which fills the members below
+ ReplSetCmdline(string cfgString) { parseReplsetCmdLine(cfgString, setname, seeds, seedSet); }
+ string setname;
+ vector<HostAndPort> seeds;
+ set<HostAndPort> seedSet;
+ };
+
+ /* information about the entire repl set, such as the various servers in the set, and their state */
+ /* note: We currently do not free mem when the set goes away - it is assumed the replset is a
+ singleton and long lived.
+ */
+ class ReplSetImpl : protected RSBase {
+ public:
+ /** info on our state if the replset isn't yet "up". for example, if we are pre-initiation. */
+ enum StartupStatus {
+ PRESTART=0, LOADINGCONFIG=1, BADCONFIG=2, EMPTYCONFIG=3,
+ EMPTYUNREACHABLE=4, STARTED=5, SOON=6
+ };
+ static StartupStatus startupStatus;
+ static string startupStatusMsg;
+ static string stateAsStr(MemberState state);
+ static string stateAsHtml(MemberState state);
+
+ /* todo thread */
+ void msgUpdateHBInfo(HeartbeatInfo);
+
+ // our member state and the member currently recorded as primary, kept together
+ StateBox box;
+
+ OpTime lastOpTimeWritten;
+ long long lastH; // hash we use to make sure we are reading the right flow of ops and aren't on an out-of-date "fork"
+ private:
+ set<ReplSetHealthPollTask*> healthTasks;
+ void endOldHealthTasks();
+ void startHealthTaskFor(Member *m);
+
+ private:
+ Consensus elect;
+ // true unless our state is fatal
+ bool ok() const { return !box.getState().fatal(); }
+
+ void relinquish();
+ void forgetPrimary();
+
+ protected:
+ bool _stepDown();
+ private:
+ void assumePrimary();
+ void loadLastOpTimeWritten();
+ void changeState(MemberState s);
+
+ protected:
+ // "heartbeat message"
+ // sent in requestHeartbeat respond in field "hbm"
+ char _hbmsg[256]; // we change this unlocked, thus not an stl::string
+ public:
+ void sethbmsg(string s, int logLevel = 0);
+ protected:
+ bool initFromConfig(ReplSetConfig& c); // true if ok; throws if config really bad; false if config doesn't include self
+ void _fillIsMaster(BSONObjBuilder&);
+ void _fillIsMasterHost(const Member*, vector<string>&, vector<string>&, vector<string>&);
+ const ReplSetConfig& config() { return *_cfg; }
+ string name() const { return _name; } /* @return replica set's logical name */
+ MemberState state() const { return box.getState(); }
+ void _fatal();
+ void _getOplogDiagsAsHtml(unsigned server_id, stringstream& ss) const;
+ void _summarizeAsHtml(stringstream&) const;
+ void _summarizeStatus(BSONObjBuilder&) const; // for replSetGetStatus command
+
+ /* throws exception if a problem initializing. */
+ ReplSetImpl(ReplSetCmdline&);
+
+ /* call after constructing to start - returns fairly quickly after launching its threads */
+ void _go();
+
+ private:
+ string _name;
+ const vector<HostAndPort> *_seeds;
+ ReplSetConfig *_cfg;
+
+ /** load our configuration from admin.replset. try seed machines too.
+ @return true if ok; throws if config really bad; false if config doesn't include self
+ */
+ bool _loadConfigFinish(vector<ReplSetConfig>& v);
+ void loadConfig();
+
+ list<HostAndPort> memberHostnames() const;
+ // the helpers below read this node's own entry in the set config through _self
+ const ReplSetConfig::MemberCfg& myConfig() const { return _self->config(); }
+ bool iAmArbiterOnly() const { return myConfig().arbiterOnly; }
+ bool iAmPotentiallyHot() const { return myConfig().potentiallyHot(); }
+ protected:
+ Member *_self;
+ private:
+ List1<Member> _members; /* all members of the set EXCEPT self. */
+
+ public:
+ unsigned selfId() const { return _self->id(); }
+ Manager *mgr;
+
+ private:
+ Member* head() const { return _members.head(); }
+ public:
+ const Member* findById(unsigned id) const;
+ private:
+ void _getTargets(list<Target>&, int &configVersion);
+ void getTargets(list<Target>&, int &configVersion);
+ void startThreads();
+ friend class FeedbackThread;
+ friend class CmdReplSetElect;
+ friend class Member;
+ friend class Manager;
+ friend class Consensus;
+
+ private:
+ /* pulling data from primary related - see rs_sync.cpp */
+ bool initialSyncOplogApplication(string hn, const Member *primary, OpTime applyGTE, OpTime minValid);
+ void _syncDoInitialSync();
+ void syncDoInitialSync();
+ void _syncThread();
+ void syncTail();
+ void syncApply(const BSONObj &o);
+ void syncRollback(OplogReader& r);
+ void syncFixUp(HowToFixUp& h, OplogReader& r);
+ public:
+ void syncThread();
+ };
+
+ /* Public face of ReplSetImpl. Most members simply forward to the protected
+    underscore-prefixed implementation in the base class. */
+ class ReplSet : public ReplSetImpl {
+ public:
+ ReplSet(ReplSetCmdline& replSetCmdline) : ReplSetImpl(replSetCmdline) { }
+
+ bool stepDown() { return _stepDown(); }
+
+ // takes the replset lock while reading our own member's host name
+ string selfFullName() {
+ lock lk(this);
+ return _self->fullName();
+ }
+
+ /* call after constructing to start - returns fairly quickly after launching its threads */
+ void go() { _go(); }
+ void fatal() { _fatal(); }
+ bool isPrimary();
+ bool isSecondary();
+ MemberState state() const { return ReplSetImpl::state(); }
+ string name() const { return ReplSetImpl::name(); }
+ const ReplSetConfig& config() { return ReplSetImpl::config(); }
+ void getOplogDiagsAsHtml(unsigned server_id, stringstream& ss) const { _getOplogDiagsAsHtml(server_id,ss); }
+ void summarizeAsHtml(stringstream& ss) const { _summarizeAsHtml(ss); }
+ void summarizeStatus(BSONObjBuilder& b) const { _summarizeStatus(b); }
+ void fillIsMaster(BSONObjBuilder& b) { _fillIsMaster(b); }
+
+ /* we have a new config (reconfig) - apply it.
+ @param comment write a no-op comment to the oplog about it. only makes sense if one is primary and initiating the reconf.
+ */
+ void haveNewConfig(ReplSetConfig& c, bool comment);
+
+ /* if we delete old configs, this needs to assure locking. currently we don't so it is ok. */
+ const ReplSetConfig& getConfig() { return config(); }
+
+ bool lockedByMe() { return RSBase::lockedByMe(); }
+
+ // heartbeat msg to send to others; descriptive diagnostic info
+ // (returns a copy of the fixed-size _hbmsg buffer as a string)
+ string hbmsg() const { return _hbmsg; }
+ };
+
+ /** Common base for the replica set commands. It fixes the command properties they
+     share and offers check(), which verifies that the process runs in replica set mode
+     and that the set object exists before a command does its real work.
+ */
+ class ReplSetCommand : public Command {
+ protected:
+     ReplSetCommand(const char * s, bool show=false) : Command(s, show) { }
+     virtual bool slaveOk() const { return true; }
+     virtual bool adminOnly() const { return true; }
+     virtual bool logTheOp() { return false; }
+     virtual LockType locktype() const { return NONE; }
+     virtual void help( stringstream &help ) const { help << "internal"; }
+
+     /** @return true when the command may proceed. Otherwise errmsg is filled in, and
+         when the set simply is not up yet the startup status is appended to result. */
+     bool check(string& errmsg, BSONObjBuilder& result) {
+         if( replSet && theReplSet != 0 )
+             return true;
+
+         if( !replSet ) {
+             errmsg = "not running with --replSet";
+         }
+         else {
+             result.append("startupStatus", ReplSet::startupStatus);
+             if( ReplSet::startupStatusMsg.empty() )
+                 errmsg = "replset unknown error 2";
+             else
+                 errmsg = ReplSet::startupStatusMsg;
+         }
+         return false;
+     }
+ };
+
+ /** inlines ----------------- */
+
+ /* stores the config pointer and host, seeds the heartbeat info with the ordinal, and
+    marks the local node (self == true) as fully healthy from the start */
+ inline Member::Member(HostAndPort h, unsigned ord, const ReplSetConfig::MemberCfg *c, bool self) :
+     _config(c), _h(h), _hbinfo(ord) {
+     if( !self )
+         return;
+     _hbinfo.health = 1.0;
+ }
+
+ /* @return true when the state currently held in the state box is a primary state */
+ inline bool ReplSet::isPrimary() {
+     /* todo replset */
+     const MemberState current = box.getState();
+     return current.primary();
+ }
+
+ /* @return true when the state currently held in the state box is a secondary state */
+ inline bool ReplSet::isSecondary() {
+     const MemberState current = box.getState();
+     return current.secondary();
+ }
+
+}
diff --git a/db/repl/rs_config.cpp b/db/repl/rs_config.cpp
new file mode 100644
index 0000000..76b20a4
--- /dev/null
+++ b/db/repl/rs_config.cpp
@@ -0,0 +1,315 @@
+// rs_config.cpp
+
+/**
+* Copyright (C) 2008 10gen Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "pch.h"
+#include "rs.h"
+#include "../../client/dbclient.h"
+#include "../../client/syncclusterconnection.h"
+#include "../../util/hostandport.h"
+#include "../dbhelpers.h"
+#include "connections.h"
+#include "../oplog.h"
+
+using namespace bson;
+
+namespace mongo {
+
+ void logOpInitiate(const bo&);
+
+ /** @return the host of every configured member except this node, in config order */
+ list<HostAndPort> ReplSetConfig::otherMemberHostnames() const {
+     list<HostAndPort> others;
+     for( unsigned idx = 0; idx < members.size(); idx++ ) {
+         const MemberCfg& m = members[idx];
+         if( m.h.isSelf() )
+             continue;
+         others.push_back(m.h);
+     }
+     return others;
+ }
+
+ /* comment MUST only be set when initiating the set by the initiator */
+ /* Validates this config with check(), then writes it as the singleton document of
+    rsConfigNs under a write lock. Database files are flushed before and after the
+    write. A non-empty comment is additionally passed to logOpInitiate(). */
+ void ReplSetConfig::saveConfigLocally(bo comment) {
+     check();
+     log() << "replSet info saving a newer config version to local.system.replset" << rsLog;
+     {
+         writelock lk("");
+         Client::Context ctx( rsConfigNs );
+         ctx.db()->flushFiles(true);
+
+         //theReplSet->lastOpTimeWritten = ??;
+         //rather than above, do a logOp()? probably
+         BSONObj serialized = asBson();
+         Helpers::putSingletonGod(rsConfigNs.c_str(), serialized, false/*logOp=false; local db so would work regardless...*/);
+         const bool haveComment = !comment.isEmpty();
+         if( haveComment ) {
+             logOpInitiate(comment);
+         }
+
+         ctx.db()->flushFiles(true);
+     }
+     DEV log() << "replSet saveConfigLocally done" << rsLog;
+ }
+
+ /*static*/
+ /*void ReplSetConfig::receivedNewConfig(BSONObj cfg) {
+ if( theReplSet )
+ return; // this is for initial setup only, so far. todo
+
+ ReplSetConfig c(cfg);
+
+ writelock lk("admin.");
+ if( theReplSet )
+ return;
+ c.saveConfigLocally(bo());
+ }*/
+
+ /** Serialize one member entry. "_id" and "host" are always written; votes, priority
+     and arbiterOnly are written only when they differ from 1, 1.0 and false. */
+ bo ReplSetConfig::MemberCfg::asBson() const {
+     bob out;
+     out << "_id" << _id;
+     out.append("host", h.toString());
+     if( votes != 1 ) {
+         out << "votes" << votes;
+     }
+     if( priority != 1.0 ) {
+         out << "priority" << priority;
+     }
+     if( arbiterOnly ) {
+         out << "arbiterOnly" << true;
+     }
+     return out.obj();
+ }
+
+ /** Serialize the whole set configuration.
+     "settings" is written only when the heartbeat options differ from their defaults or
+     getLastErrorDefaults is non-empty. Heartbeat intervals are held in milliseconds and
+     written in seconds. The division is done in floating point so that sub-second
+     values survive a save followed by from(), which multiplies the stored number by
+     1000; integer division would turn e.g. 500 ms into 0.
+ */
+ bo ReplSetConfig::asBson() const {
+     bob b;
+     b.append("_id", _id).append("version", version);
+     if( !ho.isDefault() || !getLastErrorDefaults.isEmpty() ) {
+         bob settings;
+         if( !ho.isDefault() ) {
+             // the trailing space in "heartbeatConnRetries " is the key from() looks up; keep them in step
+             settings << "heartbeatConnRetries " << ho.heartbeatConnRetries <<
+                 "heartbeatSleep" << ho.heartbeatSleepMillis / 1000.0 <<
+                 "heartbeatTimeout" << ho.heartbeatTimeoutMillis / 1000.0;
+         }
+         if( !getLastErrorDefaults.isEmpty() )
+             settings << "getLastErrorDefaults" << getLastErrorDefaults;
+         b << "settings" << settings.obj();
+     }
+
+     BSONArrayBuilder a;
+     for( unsigned i = 0; i < members.size(); i++ )
+         a.append( members[i].asBson() );
+     b.append("members", a.arr());
+
+     return b.obj();
+ }
+
+ /* raises user assertion 13126 ("bad Member config") when expr is false */
+ static inline void mchk(bool expr) {
+     if( !expr )
+         uasserted(13126, "bad Member config");
+ }
+
+ /** Validate one member entry; raises a user assertion on the first violation.
+     Accepted ranges: _id 0..255, priority 0..1000, votes 0..100. On top of that this
+     version accepts only priority 0 or 1. */
+ void ReplSetConfig::MemberCfg::check() const{
+     const bool idInRange = _id >= 0 && _id <= 255;
+     mchk(idInRange);
+
+     const bool priorityInRange = priority >= 0 && priority <= 1000;
+     mchk(priorityInRange);
+
+     // votes is unsigned, so only the upper bound can fail
+     const bool votesInRange = votes <= 100;
+     mchk(votesInRange);
+
+     uassert(13419, "this version of mongod only supports priorities 0 and 1", priority == 0 || priority == 1);
+ }
+
+ /** Decide whether config n may replace config o.
+     The set name must be unchanged and n.version must be exactly o.version + 1.
+     @param errmsg receives the reason when the change is rejected
+     @return true when the change is acceptable
+ */
+ /*static*/ bool ReplSetConfig::legalChange(const ReplSetConfig& o, const ReplSetConfig& n, string& errmsg) {
+     const bool sameSet = o._id == n._id;
+     if( !sameSet ) {
+         errmsg = "set name may not change";
+         return false;
+     }
+
+     /* TODO : wonder if we need to allow o.version < n.version only, which is more lenient.
+        if someone had some intermediate config this node doesnt have, that could be
+        necessary. but then how did we become primary? so perhaps we are fine as-is.
+     */
+     const bool nextVersion = o.version + 1 == n.version;
+     if( !nextVersion ) {
+         errmsg = "version number wrong";
+         return false;
+     }
+
+     /* TODO : MORE CHECKS HERE */
+
+     log() << "replSet TODO : don't allow removal of a node until we handle it at the removed node end?" << endl;
+     // we could change its votes to zero perhaps instead as a short term...
+
+     return true;
+ }
+
+ /* reset to the "nothing loaded" state: not ok, and version at its initial sentinel (-5) */
+ void ReplSetConfig::clear() {
+     _ok = false;
+     version = -5;
+ }
+
+ /** Validate the set-level fields; raises a user assertion on the first violation.
+     The set name must match the one given on the command line, the version must be
+     positive, and there must be between 1 and 7 members. */
+ void ReplSetConfig::check() const {
+     const bool nameMatches = _id == cmdLine.ourSetName();
+     uassert(13132,
+         "nonmatching repl set name in _id field; check --replSet command line",
+         nameMatches);
+
+     uassert(13308, "replSet bad config version #", version > 0);
+
+     const vector<MemberCfg>::size_type memberCount = members.size();
+     uassert(13133, "replSet bad config no members", memberCount >= 1);
+     uassert(13309, "replSet bad config maximum number of members is 7 (for now)", memberCount <= 7);
+ }
+
+ /** Populate this config from its BSON form.
+     Reads _id, the optional version and settings, and every entry of "members".
+     Raises a user assertion when the object is malformed: bad version, missing or
+     unparsable members, an invalid member entry, duplicate ids or hosts, localhost
+     used for only some of the members, or an empty set name.
+
+     Member parsing reports some problems by throwing plain text (a C string or a
+     std::string). Both are turned into user assertion 13107 here, so none of them can
+     leave this function as a raw string.
+ */
+ void ReplSetConfig::from(BSONObj o) {
+     md5 = o.md5();
+     _id = o["_id"].String();
+     if( o["version"].ok() ) {
+         version = o["version"].numberInt();
+         uassert(13115, "bad " + rsConfigNs + " config: version", version > 0);
+     }
+
+     if( o["settings"].ok() ) {
+         BSONObj settings = o["settings"].Obj();
+         // asBson() writes this key with a trailing space. Accept the unpadded spelling
+         // as well, so a hand-written config is not silently ignored.
+         if( settings["heartbeatConnRetries "].ok() )
+             ho.heartbeatConnRetries = settings["heartbeatConnRetries "].numberInt();
+         else if( settings["heartbeatConnRetries"].ok() )
+             ho.heartbeatConnRetries = settings["heartbeatConnRetries"].numberInt();
+         if( settings["heartbeatSleep"].ok() )
+             ho.heartbeatSleepMillis = (unsigned) (settings["heartbeatSleep"].Number() * 1000);
+         if( settings["heartbeatTimeout"].ok() )
+             ho.heartbeatTimeoutMillis = (unsigned) (settings["heartbeatTimeout"].Number() * 1000);
+         ho.check();
+         // getLastErrorDefaults is optional: leave it untouched when absent or not an object
+         try { getLastErrorDefaults = settings["getLastErrorDefaults"].Obj(); } catch(...) { }
+     }
+
+     set<string> hosts;
+     set<int> ords;
+     vector<BSONElement> members;
+     try {
+         members = o["members"].Array();
+     }
+     catch(...) {
+         uasserted(13131, "replSet error parsing (or missing) 'members' field in config object");
+     }
+
+     unsigned localhosts = 0;
+     for( unsigned i = 0; i < members.size(); i++ ) {
+         BSONObj mobj = members[i].Obj();
+         MemberCfg m;
+         string parseError; // set when parsing this member failed with a plain-text error
+         try {
+             try {
+                 m._id = (int) mobj["_id"].Number();
+             } catch(...) {
+                 /* TODO: use of string exceptions may be problematic for reconfig case! */
+                 throw "_id must be numeric";
+             }
+             string s;
+             try {
+                 s = mobj["host"].String();
+                 m.h = HostAndPort(s);
+             }
+             catch(...) {
+                 throw string("bad or missing host field? ") + mobj.toString();
+             }
+             if( m.h.isLocalHost() )
+                 localhosts++;
+             m.arbiterOnly = mobj.getBoolField("arbiterOnly");
+             if( mobj.hasElement("priority") )
+                 m.priority = mobj["priority"].Number();
+             if( mobj.hasElement("votes") )
+                 m.votes = (unsigned) mobj["votes"].Number();
+             m.check();
+         }
+         catch( const char * p ) {
+             parseError = p;
+         }
+         catch( const string& s ) {
+             // the host-field error above is thrown as a std::string, not a C string
+             parseError = s;
+         }
+         catch(DBException& e) {
+             log() << "replSet cfg parsing exception for members[" << i << "] " << e.what() << rsLog;
+             stringstream ss;
+             ss << "bad config for member[" << i << "] " << e.what();
+             uassert(13135, ss.str(), false);
+         }
+         if( !parseError.empty() ) {
+             // one shared report, so error code 13107 appears exactly once in the source
+             log() << "replSet cfg parsing exception for members[" << i << "] " << parseError << rsLog;
+             stringstream ss;
+             ss << "replSet members[" << i << "] " << parseError;
+             uassert(13107, ss.str(), false);
+         }
+         if( !(ords.count(m._id) == 0 && hosts.count(m.h.toString()) == 0) ) {
+             log() << "replSet " << o.toString() << rsLog;
+             uassert(13108, "bad replset config -- duplicate hosts in the config object?", false);
+         }
+         hosts.insert(m.h.toString());
+         ords.insert(m._id);
+         this->members.push_back(m);
+     }
+     // "members" here is the local element array parsed above, i.e. the entries of this object
+     uassert(13393, "can't use localhost in repl set member names except when using it for all members", localhosts == 0 || localhosts == members.size());
+     uassert(13117, "bad " + rsConfigNs + " config", !_id.empty());
+ }
+
+ /* raises user assertion 13122 ("bad repl set config?") when expr is false */
+ static inline void configAssert(bool expr) {
+     if( !expr )
+         uasserted(13122, "bad repl set config?");
+ }
+
+ /** Build a config from a BSON object supplied by the caller.
+     A version left unspecified in the object ends up as 1; a specified version must
+     lie in 1..5000. Throws when the object is not a usable config. */
+ ReplSetConfig::ReplSetConfig(BSONObj cfg) {
+     clear();
+     from(cfg);
+
+     const bool unspecified = version < 0;
+     const bool inRange = version >= 1 && version <= 5000;
+     configAssert( unspecified || inRange );
+
+     if( version < 1 ) {
+         version = 1;
+     }
+     _ok = true;
+ }
+
+ /** Load the config stored on host h (which may be this node itself).
+
+     For a remote host a heartbeat is sent first to make sure it runs in --replSet mode.
+     When the config cannot be obtained the object is left with ok() == false, and
+     version tells how far the attempt got:
+       -5  nothing done yet / the heartbeat step failed
+       -4  connecting to the host failed
+       -3  the query failed or returned no cursor
+       EMPTYCONFIG (-2)  connected fine, but no config document is stored there
+     Throws when the remote answers but is not in --replSet mode, when more than one
+     config document is stored, or when the stored document is invalid.
+ */
+ ReplSetConfig::ReplSetConfig(const HostAndPort& h) {
+     clear();
+     int level = 2;
+     DEV level = 0;
+     //log(0) << "replSet load config from: " << h.toString() << rsLog;
+
+     auto_ptr<DBClientCursor> c;
+     int v = -5;
+     try {
+         if( !h.isSelf() ) {
+             /* first, make sure other node is configured to be a replset. just to be safe. */
+             string setname = cmdLine.ourSetName();
+             int theirVersion;
+             BSONObj info;
+             bool ok = requestHeartbeat(setname, "", h.toString(), info, -2, theirVersion);
+             if( !info["rs"].trueValue() ) {
+                 if( !ok ) {
+                     log() << "replSet TEMP !ok heartbeating " << h.toString() << " on cfg load" << rsLog;
+                     if( !info.isEmpty() )
+                         log() << "replSet info " << h.toString() << " : " << info.toString() << rsLog;
+                     return;
+                 }
+                 {
+                     stringstream ss;
+                     ss << "replSet error: member " << h.toString() << " is not in --replSet mode";
+                     msgassertedNoTrace(13260, ss.str().c_str()); // not caught as not a user exception - we want it not caught
+                     //for python err# checker: uassert(13260, "", false);
+                 }
+             }
+             // otherwise: yes, it is a replica set, although perhaps not yet initialized
+         }
+
+         v = -4;
+         ScopedConn conn(h.toString());
+         v = -3;
+         c = conn->query(rsConfigNs);
+         if( c.get() == 0 ) {
+             version = v; return;
+         }
+         if( !c->more() ) {
+             version = EMPTYCONFIG;
+             return;
+         }
+         version = -1;
+     }
+     catch( DBException& e) {
+         version = v;
+         log(level) << "replSet load config couldn't get from " << h.toString() << ' ' << e.what() << rsLog;
+         return;
+     }
+
+     BSONObj o = c->nextSafe();
+     uassert(13109, "multiple rows in " + rsConfigNs + " not supported", !c->more());
+     from(o);
+     _ok = true;
+     log(level) << "replSet load config ok from " << (h.isSelf() ? "self" : h.toString()) << rsLog;
+ }
+
+}
diff --git a/db/repl/rs_config.h b/db/repl/rs_config.h
new file mode 100644
index 0000000..38df772
--- /dev/null
+++ b/db/repl/rs_config.h
@@ -0,0 +1,88 @@
+// rs_config.h
+// repl set configuration
+//
+
+/**
+* Copyright (C) 2008 10gen Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#pragma once
+
+#include "../../util/hostandport.h"
+#include "health.h"
+
+namespace mongo {
+
+ /* singleton config object is stored here */
+ const string rsConfigNs = "local.system.replset";
+
+ /* In-memory form of a replica set configuration: set name, version, settings and
+    the per-member entries. Can be built from a BSON object or loaded from a host. */
+ class ReplSetConfig {
+ // version sentinel meaning "could connect, but no config object is stored there"
+ enum { EMPTYCONFIG = -2 };
+ public:
+ /* if something is misconfigured, throws an exception.
+ if couldn't be queried or is just blank, ok() will be false.
+ */
+ ReplSetConfig(const HostAndPort& h);
+
+ ReplSetConfig(BSONObj cfg);
+
+ bool ok() const { return _ok; }
+
+ /* one entry of the "members" array */
+ struct MemberCfg {
+ MemberCfg() : _id(-1), votes(1), priority(1.0), arbiterOnly(false) { }
+ int _id; /* ordinal */
+ unsigned votes; /* how many votes this node gets. default 1. */
+ HostAndPort h;
+ double priority; /* 0 means can never be primary */
+ bool arbiterOnly;
+ void check() const; /* check validity, assert if not. */
+ BSONObj asBson() const;
+ // a member can become primary only if it is not an arbiter and has priority > 0
+ bool potentiallyHot() const {
+ return !arbiterOnly && priority > 0;
+ }
+ };
+ vector<MemberCfg> members;
+ string _id;
+ int version;
+ HealthOptions ho;
+ string md5;
+ BSONObj getLastErrorDefaults;
+
+ list<HostAndPort> otherMemberHostnames() const; // except self
+
+ /** @return true if could connect, and there is no cfg object there at all */
+ bool empty() const { return version == EMPTYCONFIG; }
+
+ string toString() const { return asBson().toString(); }
+
+ /** validate the settings. does not call check() on each member, you have to do that separately. */
+ void check() const;
+
+ /** check if modification makes sense */
+ static bool legalChange(const ReplSetConfig& old, const ReplSetConfig& n, string& errmsg);
+
+ //static void receivedNewConfig(BSONObj);
+ void saveConfigLocally(BSONObj comment); // to local db
+ string saveConfigEverywhere(); // returns textual info on what happened
+
+ BSONObj asBson() const;
+
+ private:
+ bool _ok;
+ // fills this object from a BSON config document
+ void from(BSONObj);
+ // resets _ok and version before loading
+ void clear();
+ };
+
+}
diff --git a/db/repl/rs_exception.h b/db/repl/rs_exception.h
new file mode 100755
index 0000000..e71cad2
--- /dev/null
+++ b/db/repl/rs_exception.h
@@ -0,0 +1,17 @@
+// @file rs_exception.h
+
+#pragma once
+
+namespace mongo {
+
+ /* exception type carrying no data beyond its name; what() returns "VoteException" */
+ class VoteException : public std::exception {
+ public:
+     virtual const char * what() const throw () {
+         return "VoteException";
+     }
+ };
+
+ /* exception type carrying no data beyond its name; what() returns "RetryAfterSleepException" */
+ class RetryAfterSleepException : public std::exception {
+ public:
+     virtual const char * what() const throw () {
+         return "RetryAfterSleepException";
+     }
+ };
+
+}
diff --git a/db/repl/rs_initialsync.cpp b/db/repl/rs_initialsync.cpp
new file mode 100644
index 0000000..4c6bd4d
--- /dev/null
+++ b/db/repl/rs_initialsync.cpp
@@ -0,0 +1,214 @@
+/**
+* Copyright (C) 2008 10gen Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "pch.h"
+#include "../client.h"
+#include "../../client/dbclient.h"
+#include "rs.h"
+#include "../oplogreader.h"
+#include "../../util/mongoutils/str.h"
+#include "../dbhelpers.h"
+#include "rs_optime.h"
+#include "../oplog.h"
+
+namespace mongo {
+
+ using namespace mongoutils;
+ using namespace bson;
+
+ void dropAllDatabasesExceptLocal();
+
+ // add try/catch with sleep
+
+ /* Assertion helper for initial sync. When expr is false, "initial sync <msg>" is
+    published as the heartbeat message and raised as user assertion 13404. */
+ void isyncassert(const char *msg, bool expr) {
+     if( expr )
+         return;
+     string m = str::stream() << "initial sync " << msg;
+     theReplSet->sethbmsg(m, 0);
+     uasserted(13404, m);
+ }
+
+ /* Runs one initial sync attempt and returns once that attempt comes back without
+    throwing. A DBException is published as the heartbeat message and the attempt is
+    repeated after a 30 second pause. */
+ void ReplSetImpl::syncDoInitialSync() {
+     bool attemptReturned = false;
+     while( !attemptReturned ) {
+         try {
+             _syncDoInitialSync();
+             attemptReturned = true;
+         }
+         catch(DBException& e) {
+             sethbmsg("initial sync exception " + e.toString(), 0);
+             sleepsecs(30);
+         }
+     }
+ }
+
+ bool cloneFrom(const char *masterHost, string& errmsg, const string& fromdb, bool logForReplication,
+ bool slaveOk, bool useReplAuth, bool snapshot);
+
+ /* todo : progress metering to sethbmsg. */
+ /** Clone database db from host master via cloneFrom().
+     @return true on success. On failure the error text supplied by cloneFrom() is
+     logged, so the reason is not lost; the caller only reports that the clone failed.
+ */
+ static bool clone(const char *master, string db) {
+     string err;
+     const bool ok = cloneFrom(master, err, db, false,
+         /*slaveok later can be true*/ false, true, false);
+     if( !ok ) {
+         log() << "replSet initial sync clone of " << db << " from " << master
+               << " failed: " << err << rsLog;
+     }
+     return ok;
+ }
+
+ void _logOpObjRS(const BSONObj& op);
+
+ bool copyCollectionFromRemote(const string& host, const string& ns, const BSONObj& query, string errmsg);
+
+ /** Remove every record from the replica set oplog, under a write lock.
+     Returns without doing anything when the oplog collection does not exist or is
+     already empty. The former case used to dereference a null pointer.
+ */
+ static void emptyOplog() {
+     writelock lk(rsoplog);
+     Client::Context ctx(rsoplog);
+     NamespaceDetails *d = nsdetails(rsoplog);
+
+     if( d == 0 ) {
+         log() << "replSet empty oplog: " << rsoplog << " does not exist, nothing to empty" << rsLog;
+         return;
+     }
+
+     // temp
+     if( d->nrecords == 0 )
+         return; // already empty, ok.
+
+     log(1) << "replSet empty oplog" << rsLog;
+     d->emptyCappedCollection(rsoplog);
+
+     /*
+     string errmsg;
+     bob res;
+     dropCollection(rsoplog, errmsg, res);
+     log() << "replSet recreated oplog so it is empty. todo optimize this..." << rsLog;
+     createOplog();*/
+
+     // TEMP: restart to recreate empty oplog
+     //log() << "replSet FATAL error during initial sync. mongod restart required." << rsLog;
+     //dbexit( EXIT_CLEAN );
+
+     /*
+     writelock lk(rsoplog);
+     Client::Context c(rsoplog, dbpath, 0, doauth/false);
+     NamespaceDetails *oplogDetails = nsdetails(rsoplog);
+     uassert(13412, str::stream() << "replSet error " << rsoplog << " is missing", oplogDetails != 0);
+     oplogDetails->cappedTruncateAfter(rsoplog, h.commonPointOurDiskloc, false);
+     */
+ }
+
+ /* One attempt at an initial sync from the current primary.
+    Steps, in order: find the primary and connect to it, note the primary's last oplog
+    entry, drop all local databases except "local", clone every remote database except
+    "local", note the primary's last oplog entry again as minValid, apply the oplog
+    entries in between, and finally store minValid in local.replset.minvalid.
+    Progress is published through sethbmsg(). Most failures sleep and then return
+    normally (the attempt is simply over); failed isyncassert checks throw. */
+ void ReplSetImpl::_syncDoInitialSync() {
+ sethbmsg("initial sync pending",0);
+
+ StateBox::SP sp = box.get();
+ assert( !sp.state.primary() ); // wouldn't make sense if we were.
+
+ const Member *cp = sp.primary;
+ if( cp == 0 ) {
+ sethbmsg("initial sync need a member to be primary",0);
+ sleepsecs(15);
+ return;
+ }
+
+ string masterHostname = cp->h().toString();
+ OplogReader r;
+ if( !r.connect(masterHostname) ) {
+ sethbmsg( str::stream() << "initial sync couldn't connect to " << cp->h().toString() , 0);
+ sleepsecs(15);
+ return;
+ }
+
+ BSONObj lastOp = r.getLastOp(rsoplog);
+ if( lastOp.isEmpty() ) {
+ sethbmsg("initial sync couldn't read remote oplog", 0);
+ sleepsecs(15);
+ return;
+ }
+ // oplog position on the primary before cloning starts
+ OpTime startingTS = lastOp["ts"]._opTime();
+
+ {
+ /* make sure things aren't too flappy */
+ sleepsecs(5);
+ isyncassert( "flapping?", box.getPrimary() == cp );
+ BSONObj o = r.getLastOp(rsoplog);
+ isyncassert( "flapping [2]?", !o.isEmpty() );
+ }
+
+ sethbmsg("initial sync drop all databases", 0);
+ dropAllDatabasesExceptLocal();
+
+ // sethbmsg("initial sync drop oplog", 0);
+ // emptyOplog();
+
+ // clone every database of the primary except "local", each under its own write lock
+ list<string> dbs = r.conn()->getDatabaseNames();
+ for( list<string>::iterator i = dbs.begin(); i != dbs.end(); i++ ) {
+ string db = *i;
+ if( db != "local" ) {
+ sethbmsg( str::stream() << "initial sync cloning db: " << db , 0);
+ bool ok;
+ {
+ writelock lk(db);
+ Client::Context ctx(db);
+ ok = clone(masterHostname.c_str(), db);
+ }
+ if( !ok ) {
+ sethbmsg( str::stream() << "initial sync error clone of " << db << " failed sleeping 5 minutes" ,0);
+ sleepsecs(300);
+ return;
+ }
+ }
+ }
+
+ sethbmsg("initial sync query minValid",0);
+
+ /* our cloned copy will be strange until we apply oplog events that occurred
+ through the process. we note that time point here. */
+ BSONObj minValid = r.getLastOp(rsoplog);
+ assert( !minValid.isEmpty() );
+ OpTime mvoptime = minValid["ts"]._opTime();
+ assert( !mvoptime.isNull() );
+
+ /* copy the oplog
+ */
+ {
+ sethbmsg("initial sync copy+apply oplog");
+ if( ! initialSyncOplogApplication(masterHostname, cp, startingTS, mvoptime) ) { // note we assume here that this call does not throw
+ log() << "replSet initial sync failed during applyoplog" << rsLog;
+ emptyOplog(); // otherwise we'll be up!
+ // forget our oplog position as well
+ lastOpTimeWritten = OpTime();
+ lastH = 0;
+ log() << "replSet cleaning up [1]" << rsLog;
+ {
+ writelock lk("local.");
+ Client::Context cx( "local." );
+ cx.db()->flushFiles(true);
+ }
+ log() << "replSet cleaning up [2]" << rsLog;
+ sleepsecs(2);
+ return;
+ }
+ }
+
+ sethbmsg("initial sync finishing up",0);
+
+ assert( !box.getState().primary() ); // wouldn't make sense if we were.
+
+ // persist minValid in the local db, flushing files before and after the write
+ {
+ writelock lk("local.");
+ Client::Context cx( "local." );
+ cx.db()->flushFiles(true);
+ // logging only; a failure to format the optime is ignored
+ try {
+ log() << "replSet set minValid=" << minValid["ts"]._opTime().toString() << rsLog;
+ }
+ catch(...) { }
+ Helpers::putSingleton("local.replset.minvalid", minValid);
+ cx.db()->flushFiles(true);
+ }
+
+ sethbmsg("initial sync done",0);
+ }
+
+}
diff --git a/db/repl/rs_initiate.cpp b/db/repl/rs_initiate.cpp
new file mode 100644
index 0000000..9c74be0
--- /dev/null
+++ b/db/repl/rs_initiate.cpp
@@ -0,0 +1,238 @@
+/* @file rs_initiate.cpp
+ */
+
+/**
+* Copyright (C) 2008 10gen Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "pch.h"
+#include "../cmdline.h"
+#include "../commands.h"
+#include "../../util/mmap.h"
+#include "../../util/mongoutils/str.h"
+#include "health.h"
+#include "rs.h"
+#include "rs_config.h"
+#include "../dbhelpers.h"
+
+using namespace bson;
+using namespace mongoutils;
+
+namespace mongo {
+
+ /** Checks, by heartbeating each member of a proposed config, that the config change may go ahead.
+ called on a reconfig AND on initiate
+ throws
+ @param cfg the proposed replica set configuration
+ @param initial true when initiating
+ */
+ void checkMembersUpForConfigChange(const ReplSetConfig& cfg, bool initial) {
+ int failures = 0;
+ int me = 0;
+ // pass 1: count the config entries that refer to this node, and require that this node can become primary
+ for( vector<ReplSetConfig::MemberCfg>::const_iterator i = cfg.members.begin(); i != cfg.members.end(); i++ ) {
+ if( i->h.isSelf() ) {
+ me++;
+ if( !i->potentiallyHot() ) {
+ uasserted(13420, "initiation and reconfiguration of a replica set must be sent to a node that can become primary");
+ }
+ }
+ }
+ // this node must appear in the config exactly once
+ uassert(13278, "bad config - dups?", me <= 1); // dups?
+ uassert(13279, "can't find self in the replset config", me == 1);
+
+ // pass 2: request a heartbeat from every member (including ourselves) and vet the reply
+ for( vector<ReplSetConfig::MemberCfg>::const_iterator i = cfg.members.begin(); i != cfg.members.end(); i++ ) {
+ BSONObj res;
+ {
+ bool ok = false;
+ try {
+ int theirVersion = -1000;
+ ok = requestHeartbeat(cfg._id, "", i->h.toString(), res, -1, theirVersion, initial/*check if empty*/);
+ if( theirVersion >= cfg.version ) {
+ stringstream ss;
+ ss << "replSet member " << i->h.toString() << " has too new a config version (" << theirVersion << ") to reconfigure";
+ // NOTE(review): this is raised inside the try block; if uasserted() throws a DBException-derived
+ // type, the catch just below only logs it and the loop carries on -- confirm that is intended.
+ uasserted(13259, ss.str());
+ }
+ }
+ catch(DBException& e) {
+ log() << "replSet cmufcc requestHeartbeat " << i->h.toString() << " : " << e.toString() << rsLog;
+ }
+ catch(...) {
+ log() << "replSet cmufcc error exception in requestHeartbeat?" << rsLog;
+ }
+ // from here on we work with whatever reply (possibly empty) ended up in res
+ if( res.getBoolField("mismatch") )
+ uasserted(13145, "set name does not match the set name host " + i->h.toString() + " expects");
+ // a non-empty "set" field in the reply is treated as "this member already has a config"
+ if( *res.getStringField("set") ) {
+ if( cfg.version <= 1 ) {
+ // this was to be initiation, no one should be initiated already.
+ uasserted(13256, "member " + i->h.toString() + " is already initiated");
+ }
+ else {
+ // Assure no one has a newer config.
+ if( res["v"].Int() >= cfg.version ) {
+ uasserted(13341, "member " + i->h.toString() + " has a config version >= to the new cfg version; cannot change config");
+ }
+ }
+ }
+ // the member counts as failed when the heartbeat was not ok and the reply has no true "rs" field
+ if( !ok && !res["rs"].trueValue() ) {
+ if( !res.isEmpty() ) {
+ /* strange. got a response, but not "ok". log it. */
+ log() << "replSet warning " << i->h.toString() << " replied: " << res.toString() << rsLog;
+ }
+
+ bool allowFailure = false;
+ failures++;
+ // only the first failure, with an empty reply, on a reconfig can be tolerated
+ if( res.isEmpty() && !initial && failures == 1 ) {
+ /* for now we are only allowing 1 node to be down on a reconfig. this can be made to be a minority
+ trying to keep change small as release is near.
+ */
+ const Member* m = theReplSet->findById( i->_id );
+ if( m ) {
+ // ok, so this was an existing member (wouldn't make sense to add to config a new member that is down)
+ assert( m->h().toString() == i->h.toString() );
+ allowFailure = true;
+ }
+ }
+
+ if( !allowFailure ) {
+ string msg = string("need members up to initiate, not ok : ") + i->h.toString();
+ if( !initial )
+ msg = string("need most members up to reconfigure, not ok : ") + i->h.toString();
+ uasserted(13144, msg);
+ }
+ }
+ }
+ // on initiate, only the node receiving the command may report hasData
+ if( initial ) {
+ bool hasData = res["hasData"].Bool();
+ uassert(13311, "member " + i->h.toString() + " has data already, cannot initiate set. All members except initiator must be empty.",
+ !hasData || i->h.isSelf());
+ }
+ }
+ }
+
+ /** The replSetInitiate admin command: validates a configuration (given in the command, or
+ built from the --replSet command line value) and saves it locally.
+ */
+ class CmdReplSetInitiate : public ReplSetCommand {
+ public:
+ // locks are taken explicitly inside run()
+ virtual LockType locktype() const { return NONE; }
+ CmdReplSetInitiate() : ReplSetCommand("replSetInitiate") { }
+ virtual void help(stringstream& h) const {
+ h << "Initiate/christen a replica set.";
+ h << "\nhttp://www.mongodb.org/display/DOCS/Replica+Set+Commands";
+ }
+ /** @param cmdObj the command; if its "replSetInitiate" field is an object, that object is used as the config
+ @param errmsg set to a description of the problem when false is returned
+ @param result receives informational fields ("info", "info2", "startupStatus")
+ @return true if the config was saved locally, false otherwise
+ */
+ virtual bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) {
+ log() << "replSet replSetInitiate admin command received from client" << rsLog;
+
+ if( !replSet ) {
+ errmsg = "server is not running with --replSet";
+ return false;
+ }
+ if( theReplSet ) {
+ errmsg = "already initialized";
+ result.append("info", "try querying " + rsConfigNs + " to see current configuration");
+ return false;
+ }
+
+ {
+ // just make sure we can get a write lock before doing anything else. we'll reacquire one
+ // later. of course it could be stuck then, but this check lowers the risk if weird things
+ // are up.
+ time_t t = time(0);
+ writelock lk("");
+ // more than 10 seconds elapsed while waiting for the lock
+ if( time(0)-t > 10 ) {
+ errmsg = "took a long time to get write lock, so not initiating. Initiate when server less busy?";
+ return false;
+ }
+
+ /* check that we don't already have an oplog. that could cause issues.
+ it is ok if the initiating member has *other* data than that.
+ */
+ BSONObj o;
+ if( Helpers::getFirst(rsoplog, o) ) {
+ errmsg = rsoplog + string(" is not empty on the initiating member. cannot initiate.");
+ return false;
+ }
+ }
+
+ // initiation is only attempted when startup found an empty config (EMPTYCONFIG)
+ if( ReplSet::startupStatus == ReplSet::BADCONFIG ) {
+ errmsg = "server already in BADCONFIG state (check logs); not initiating";
+ result.append("info", ReplSet::startupStatusMsg);
+ return false;
+ }
+ if( ReplSet::startupStatus != ReplSet::EMPTYCONFIG ) {
+ result.append("startupStatus", ReplSet::startupStatus);
+ errmsg = "all members and seeds must be reachable to initiate set";
+ result.append("info", cmdLine._replSet);
+ return false;
+ }
+
+ BSONObj configObj;
+
+ if( cmdObj["replSetInitiate"].type() != Object ) {
+ // no config object supplied: build one with this host as member 0 followed by the command line seeds
+ result.append("info2", "no configuration explicitly specified -- making one");
+ log() << "replSet info initiate : no configuration specified. Using a default configuration for the set" << rsLog;
+
+ string name;
+ vector<HostAndPort> seeds;
+ set<HostAndPort> seedSet;
+ parseReplsetCmdLine(cmdLine._replSet, name, seeds, seedSet); // may throw...
+
+ bob b;
+ b.append("_id", name);
+ bob members;
+ members.append("0", BSON( "_id" << 0 << "host" << HostAndPort::Me().toString() ));
+ for( unsigned i = 0; i < seeds.size(); i++ )
+ members.append(bob::numStr(i+1), BSON( "_id" << i+1 << "host" << seeds[i].toString()));
+ b.appendArray("members", members.obj());
+ configObj = b.obj();
+ log() << "replSet created this configuration for initiation : " << configObj.toString() << rsLog;
+ }
+ else {
+ configObj = cmdObj["replSetInitiate"].Obj();
+ }
+
+ // 'parsed' records how far we got, so the catch below can pick the right error text
+ bool parsed = false;
+ try {
+ ReplSetConfig newConfig(configObj);
+ parsed = true;
+
+ if( newConfig.version > 1 ) {
+ errmsg = "can't initiate with a version number greater than 1";
+ return false;
+ }
+
+ log() << "replSet replSetInitiate config object parses ok, " << newConfig.members.size() << " members specified" << rsLog;
+
+ checkMembersUpForConfigChange(newConfig, true);
+
+ log() << "replSet replSetInitiate all members seem up" << rsLog;
+
+ // save the config under a write lock and record that the set should start soon
+ writelock lk("");
+ bo comment = BSON( "msg" << "initiating set");
+ newConfig.saveConfigLocally(comment);
+ log() << "replSet replSetInitiate config now saved locally. Should come online in about a minute." << rsLog;
+ result.append("info", "Config now saved locally. Should come online in about a minute.");
+ ReplSet::startupStatus = ReplSet::SOON;
+ ReplSet::startupStatusMsg = "Received replSetInitiate - should come online shortly.";
+ }
+ catch( DBException& e ) {
+ log() << "replSet replSetInitiate exception: " << e.what() << rsLog;
+ if( !parsed )
+ errmsg = string("couldn't parse cfg object ") + e.what();
+ else
+ errmsg = string("couldn't initiate : ") + e.what();
+ return false;
+ }
+
+ return true;
+ }
+ } cmdReplSetInitiate;
+
+}
diff --git a/db/repl/rs_member.h b/db/repl/rs_member.h
new file mode 100644
index 0000000..4f6846a
--- /dev/null
+++ b/db/repl/rs_member.h
@@ -0,0 +1,91 @@
+ // @file rs_member.h
+/*
+ * Copyright (C) 2010 10gen Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+/** replica set member */
+
+#pragma once
+
+namespace mongo {
+
+
+ /*
+ RS_STARTUP server still starting up, or still trying to initiate the set
+ RS_PRIMARY this server thinks it is primary
+ RS_SECONDARY this server thinks it is a secondary (slave mode)
+ RS_RECOVERING recovering/resyncing; after recovery usually auto-transitions to secondary
+ RS_FATAL something bad has occurred and server is not completely offline with regard to the replica set. fatal error.
+ RS_STARTUP2 loaded config, still determining who is primary
+ */
+    /** Thin wrapper around the replica set member state enum, with convenience predicates. */
+    struct MemberState {
+        /* numeric values follow declaration order, starting at zero */
+        enum MS {
+            RS_STARTUP,
+            RS_PRIMARY,
+            RS_SECONDARY,
+            RS_RECOVERING,
+            RS_FATAL,
+            RS_STARTUP2,
+            RS_UNKNOWN, /* remote node not yet reached */
+            RS_ARBITER,
+            RS_DOWN /* node not reachable for a report */
+        };
+
+        /* the wrapped state */
+        MS s;
+
+        /* defaults to RS_UNKNOWN when no state is given */
+        MemberState(MS initialState = RS_UNKNOWN) { s = initialState; }
+        /* builds a state from its numeric value; the value is not range checked */
+        explicit MemberState(int rawValue) { s = static_cast<MS>(rawValue); }
+
+        bool primary() const { return RS_PRIMARY == s; }
+        bool secondary() const { return RS_SECONDARY == s; }
+        bool recovering() const { return RS_RECOVERING == s; }
+        bool startup2() const { return RS_STARTUP2 == s; }
+        bool fatal() const { return RS_FATAL == s; }
+
+        bool operator==(const MemberState& other) const { return other.s == s; }
+        bool operator!=(const MemberState& other) const { return !( *this == other ); }
+    };
+
+    /* this is supposed to be just basic information on a member,
+       and copy constructable.
+
+       Both constructors leave the object in the same "nothing heard yet" condition
+       (health -1, timestamps 0, skew INT_MIN), so up() is well defined on any instance.
+    */
+    class HeartbeatInfo {
+        unsigned _id;
+    public:
+        /* default: id 0xffffffff. previously health/upSince/lastHeartbeat were left
+           uninitialized here, so up() read an indeterminate value. */
+        HeartbeatInfo() : _id(0xffffffff), health(-1.0), upSince(0), lastHeartbeat(0), skew(INT_MIN) { }
+        HeartbeatInfo(unsigned id);
+        /* a member is considered up when its health is positive */
+        bool up() const { return health > 0; }
+        unsigned id() const { return _id; }
+        MemberState hbstate;
+        double health;
+        time_t upSince;
+        time_t lastHeartbeat;
+        string lastHeartbeatMsg;
+        OpTime opTime;
+        int skew;
+
+        /* true if changed in a way of interest to the repl set manager. */
+        bool changed(const HeartbeatInfo& old) const;
+    };
+
+    /* same initial values as the default constructor, but with a caller supplied id */
+    inline HeartbeatInfo::HeartbeatInfo(unsigned id) :
+        _id(id), health(-1.0), upSince(0), lastHeartbeat(0), skew(INT_MIN) {
+    }
+
+    /* only a change of health or of heartbeat state counts as a change */
+    inline bool HeartbeatInfo::changed(const HeartbeatInfo& old) const {
+        return health != old.health ||
+               hbstate != old.hbstate;
+    }
+
+}
diff --git a/db/repl/rs_optime.h b/db/repl/rs_optime.h
new file mode 100644
index 0000000..b3607fa
--- /dev/null
+++ b/db/repl/rs_optime.h
@@ -0,0 +1,58 @@
+// @file rs_optime.h
+
+/*
+ * Copyright (C) 2010 10gen Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+
+#include "../../util/optime.h"
+
+namespace mongo {
+
+ /* namespace string ("local.oplog.rs") that the replica set code passes when querying, reading or truncating its oplog */
+ const char rsoplog[] = "local.oplog.rs";
+
+ /*
+ class RSOpTime : public OpTime {
+ public:
+ bool initiated() const { return getSecs() != 0; }
+ };*/
+
+ /*struct RSOpTime {
+ unsigned long long ord;
+
+ RSOpTime() : ord(0) { }
+
+ bool initiated() const { return ord > 0; }
+
+ void initiate() {
+ assert( !initiated() );
+ ord = 1000000;
+ }
+
+ ReplTime inc() {
+ DEV assertInWriteLock();
+ return ++ord;
+ }
+
+ string toString() const { return str::stream() << ord; }
+
+ // query the oplog and set the highest value herein. acquires a db read lock. throws.
+ void load();
+ };
+
+ extern RSOpTime rsOpTime;*/
+
+}
diff --git a/db/repl/rs_rollback.cpp b/db/repl/rs_rollback.cpp
new file mode 100644
index 0000000..1bb7217
--- /dev/null
+++ b/db/repl/rs_rollback.cpp
@@ -0,0 +1,481 @@
+/* @file rs_rollback.cpp
+*
+* Copyright (C) 2008 10gen Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "pch.h"
+#include "../client.h"
+#include "../../client/dbclient.h"
+#include "rs.h"
+#include "../repl.h"
+#include "../query.h"
+
+/* Scenarios
+
+ We went offline with ops not replicated out.
+
+ F = node that failed and coming back.
+ P = node that took over, new primary
+
+ #1:
+ F : a b c d e f g
+ P : a b c d q
+
+ The design is "keep P". One could argue here that "keep F" has some merits, however, in most cases P
+ will have significantly more data. Also note that P may have a proper subset of F's stream if there were
+ no subsequent writes.
+
+ For now the model is simply : get F back in sync with P. If P was really behind or something, we should have
+ just chosen not to fail over anyway.
+
+ #2:
+ F : a b c d e f g -> a b c d
+ P : a b c d
+
+ #3:
+ F : a b c d e f g -> a b c d q r s t u v w x z
+ P : a b c d.q r s t u v w x z
+
+ Steps
+ find an event in common. 'd'.
+ undo our events beyond that by:
+ (1) taking copy from other server of those objects
+ (2) do not consider the copy valid until we reach an optime later than the point at which we fetched the new version of the object
+ -- i.e., reset minvalid.
+ (3) we could skip operations on objects that are previous in time to our capture of the object as an optimization.
+
+*/
+
+namespace mongo {
+
+ using namespace bson;
+
+    /** Identifies one document: the namespace it lives in plus its _id element.
+        Note ns is a borrowed pointer; this struct does not copy or own the string.
+    */
+    struct DocID {
+        const char *ns;
+        be _id;
+
+        /* order by namespace first (strcmp), then by _id within the same namespace */
+        bool operator<(const DocID& rhs) const {
+            const int nsOrder = strcmp(ns, rhs.ns);
+            if( nsOrder != 0 )
+                return nsOrder < 0;
+            return _id < rhs._id;
+        }
+    };
+
+    /** Accumulates what has to be repaired locally once the common point has been found. */
+    struct HowToFixUp {
+        HowToFixUp() : rbid(0), totSize(0) { }
+
+        /* note this is a set -- if there are many $inc's on a single document we need to rollback, we only
+           need to refetch it once. */
+        set<DocID> toRefetch;
+
+        /* collections to drop */
+        set<string> toDrop;
+
+        OpTime commonPoint;
+        DiskLoc commonPointOurDiskloc;
+
+        int rbid; // remote server's current rollback sequence #
+
+        /* running total (bytes) of the local oplog entries handed to refetch(); used to cap rollback size */
+        unsigned long long totSize;
+    };
+
+    /** Record in h what must be done to undo one local oplog entry.
+        @param h      work list being built; its running size total is updated here
+        @param ourObj a local oplog entry that lies beyond the common point
+        throws const char* ("rollback too large") once the entries seen so far exceed 512MB in total.
+    */
+    static void refetch(HowToFixUp& h, const BSONObj& ourObj) {
+        const char *op = ourObj.getStringField("op");
+        if( *op == 'n' )
+            return;
+
+        /* the total must survive across calls: a per-call local (as before) is reset every time,
+           so the limit below could never be reached. */
+        h.totSize += ourObj.objsize();
+        if( h.totSize > 512 * 1024 * 1024 )
+            throw "rollback too large";
+
+        DocID d;
+        d.ns = ourObj.getStringField("ns");
+        if( *d.ns == 0 ) {
+            log() << "replSet WARNING ignoring op on rollback no ns TODO : " << ourObj.toString() << rsLog;
+            return;
+        }
+
+        // for updates the document is identified by "o2"; for everything else by "o"
+        bo o = ourObj.getObjectField(*op=='u' ? "o2" : "o");
+        if( o.isEmpty() ) {
+            log() << "replSet warning ignoring op on rollback : " << ourObj.toString() << rsLog;
+            return;
+        }
+
+        if( *op == 'c' ) {
+            be first = o.firstElement();
+            NamespaceString s(d.ns); // foo.$cmd
+
+            if( string("create") == first.fieldName() ) {
+                /* Create collection operation
+                   { ts: ..., h: ..., op: "c", ns: "foo.$cmd", o: { create: "abc", ... } }
+                */
+                string ns = s.db + '.' + o["create"].String(); // -> foo.abc
+                h.toDrop.insert(ns);
+                return;
+            }
+            else {
+                log() << "replSet WARNING can't roll back this command yet: " << o.toString() << rsLog;
+                // falls through: a command object without _id is then reported and skipped below
+            }
+        }
+
+        d._id = o["_id"];
+        if( d._id.eoo() ) {
+            log() << "replSet WARNING ignoring op on rollback no _id TODO : " << d.ns << ' '<< ourObj.toString() << rsLog;
+            return;
+        }
+
+        h.toRefetch.insert(d);
+    }
+
+ int getRBID(DBClientConnection*);
+
+    /** Walk our oplog and the remote oplog backwards in step until an entry with the same
+        ts and h is found in both; every local entry passed on the way is handed to refetch().
+        On success h.commonPoint / h.commonPointOurDiskloc / h.rbid are filled in.
+        @param them connection to the node we are rolling back against
+        @param h    receives the common point and the fix-up work list
+        throws const char* describing the problem when no common point can be determined;
+        DBException may also propagate from the remote queries.
+        Asserts that the caller holds at least a read lock.
+    */
+    static void syncRollbackFindCommonPoint(DBClientConnection *them, HowToFixUp& h) {
+        static time_t last;
+        if( time(0)-last < 60 ) {
+            // this could put a lot of load on someone else, don't repeat too often
+            sleepsecs(10);
+            throw "findcommonpoint waiting a while before trying again";
+        }
+        last = time(0);
+
+        assert( dbMutex.atLeastReadLocked() );
+        Client::Context c(rsoplog, dbpath, 0, false);
+        NamespaceDetails *nsd = nsdetails(rsoplog);
+        assert(nsd);
+        ReverseCappedCursor u(nsd);
+        if( !u.ok() )
+            throw "our oplog empty or unreadable";
+
+        const Query q = Query().sort(reverseNaturalObj);
+        const bo fields = BSON( "ts" << 1 << "h" << 1 );
+
+        h.rbid = getRBID(them);
+        auto_ptr<DBClientCursor> t = them->query(rsoplog, q, 0, 0, &fields, 0, 0);
+
+        if( t.get() == 0 || !t->more() ) throw "remote oplog empty or unreadable";
+
+        BSONObj ourObj = u.current();
+        OpTime ourTime = ourObj["ts"]._opTime();
+        BSONObj theirObj = t->nextSafe();
+        OpTime theirTime = theirObj["ts"]._opTime();
+
+        {
+            long long diff = (long long) ourTime.getSecs() - ((long long) theirTime.getSecs());
+            /* diff could be positive, negative, or zero */
+            log() << "replSet info syncRollback diff in end of log times : " << diff << " seconds" << rsLog;
+            if( diff > 3600 ) {
+                log() << "replSet syncRollback too long a time period for a rollback." << rsLog;
+                throw "error not willing to roll back more than one hour of data";
+            }
+        }
+
+        unsigned long long scanned = 0;
+        while( 1 ) {
+            scanned++;
+            /* todo add code to assure no excessive scanning for too long */
+            if( ourTime == theirTime ) {
+                if( ourObj["h"].Long() == theirObj["h"].Long() ) {
+                    // found the point back in time where we match.
+                    // todo : check a few more just to be careful about hash collisions.
+                    log() << "replSet rollback found matching events at " << ourTime.toStringPretty() << rsLog;
+                    log() << "replSet rollback findcommonpoint scanned : " << scanned << rsLog;
+                    h.commonPoint = ourTime;
+                    h.commonPointOurDiskloc = u.currLoc();
+                    return;
+                }
+
+                // same time but different hash: step both sides back
+                refetch(h, ourObj);
+
+                /* check explicitly rather than letting nextSafe() fail with a non-descriptive error */
+                if( !t->more() ) throw "reached beginning of remote oplog";
+                theirObj = t->nextSafe();
+                theirTime = theirObj["ts"]._opTime();
+
+                u.advance();
+                if( !u.ok() ) throw "reached beginning of local oplog";
+                ourObj = u.current();
+                ourTime = ourObj["ts"]._opTime();
+            }
+            else if( theirTime > ourTime ) {
+                // remote is ahead of us at this position: step the remote side back only
+                if( !t->more() ) throw "reached beginning of remote oplog";
+                theirObj = t->nextSafe();
+                theirTime = theirObj["ts"]._opTime();
+            }
+            else {
+                // theirTime < ourTime
+                refetch(h, ourObj);
+                u.advance();
+                if( !u.ok() ) throw "reached beginning of local oplog";
+                ourObj = u.current();
+                ourTime = ourObj["ts"]._opTime();
+            }
+        }
+    }
+
+ /* Pairs a pointer to an op with a "good" version of the object it touched.
+ NOTE(review): not referenced anywhere in the visible part of this file -- confirm whether it is still needed. */
+ struct X {
+ const bson::bo *op;
+ bson::bo goodVersionOfObject;
+ };
+
+    /** Apply the fix-ups gathered while locating the common point:
+          1. fetch the current version of every document in h.toRefetch from the remote node
+          2. record a new minvalid (the remote's last op at the time of fetching)
+          3. drop the collections in h.toDrop
+          4. upsert / delete each refetched document locally
+          5. truncate our oplog after the common point and reload the cached last optime
+        @param h what to fix up; h.commonPointOurDiskloc must be set
+        @param r reader connected to the node we are rolling back against
+        Asserts that the caller holds the write lock. Rethrows DBException raised while refetching.
+    */
+    void ReplSetImpl::syncFixUp(HowToFixUp& h, OplogReader& r) {
+        DBClientConnection *them = r.conn();
+
+        // fetch all first so we needn't handle interruption in a fancy way
+
+        unsigned long long totSize = 0;
+
+        list< pair<DocID,bo> > goodVersions;
+
+        bo newMinValid;
+
+        DocID d;
+        d.ns = ""; // the catch block below logs d.ns even if the loop never ran; never leave it uninitialized
+        unsigned long long n = 0;
+        try {
+            for( set<DocID>::iterator i = h.toRefetch.begin(); i != h.toRefetch.end(); i++ ) {
+                d = *i;
+
+                assert( !d._id.eoo() );
+
+                {
+                    /* TODO : slow. lots of round trips. */
+                    n++;
+                    bo good= them->findOne(d.ns, d._id.wrap()).getOwned();
+                    totSize += good.objsize();
+                    uassert( 13410, "replSet too much data to roll back", totSize < 300 * 1024 * 1024 );
+
+                    // note good might be eoo, indicating we should delete it
+                    goodVersions.push_back(pair<DocID,bo>(d,good));
+                }
+            }
+            newMinValid = r.getLastOp(rsoplog);
+            if( newMinValid.isEmpty() ) {
+                sethbmsg("syncRollback error newMinValid empty?");
+                return;
+            }
+        }
+        catch(DBException& e) {
+            sethbmsg(str::stream() << "syncRollback re-get objects: " << e.toString(),0);
+            log() << "syncRollback couldn't re-get ns:" << d.ns << " _id:" << d._id << ' ' << n << '/' << h.toRefetch.size() << rsLog;
+            throw; // rethrow the original object; "throw e" would copy it as a plain DBException
+        }
+
+        sethbmsg("syncRollback 3.5");
+        if( h.rbid != getRBID(r.conn()) ) {
+            // our source rolled back itself. so the data we received isn't necessarily consistent.
+            sethbmsg("syncRollback rbid on source changed during rollback, cancelling this attempt");
+            return;
+        }
+
+        // update them
+        sethbmsg(str::stream() << "syncRollback 4 n:" << goodVersions.size());
+
+        bool warn = false;
+
+        assert( !h.commonPointOurDiskloc.isNull() );
+
+        MemoryMappedFile::flushAll(true);
+
+        dbMutex.assertWriteLocked();
+
+        /* we have items we are writing that aren't from a point-in-time. thus best not to come online
+           until we get to that point in freshness. */
+        try {
+            log() << "replSet set minvalid=" << newMinValid["ts"]._opTime().toString() << rsLog;
+        }
+        catch(...){}
+        Helpers::putSingleton("local.replset.minvalid", newMinValid);
+
+        /** first drop collections to drop - that might make things faster below actually if there were subsequent inserts */
+        for( set<string>::iterator i = h.toDrop.begin(); i != h.toDrop.end(); i++ ) {
+            Client::Context c(*i, dbpath, 0, /*doauth*/false);
+            try {
+                bob res;
+                string errmsg;
+                log(1) << "replSet rollback drop: " << *i << rsLog;
+                dropCollection(*i, errmsg, res);
+            }
+            catch(...) {
+                log() << "replset rollback error dropping collection " << *i << rsLog;
+            }
+        }
+
+        Client::Context c(rsoplog, dbpath, 0, /*doauth*/false);
+        NamespaceDetails *oplogDetails = nsdetails(rsoplog);
+        uassert(13423, str::stream() << "replSet error in rollback can't find " << rsoplog, oplogDetails);
+
+        map<string,shared_ptr<RemoveSaver> > removeSavers;
+
+        unsigned deletes = 0, updates = 0;
+        for( list<pair<DocID,bo> >::iterator i = goodVersions.begin(); i != goodVersions.end(); i++ ) {
+            const DocID& d = i->first;
+            bo pattern = d._id.wrap(); // { _id : ... }
+            try {
+                assert( d.ns && *d.ns );
+
+                // one RemoveSaver per namespace, created on first use
+                shared_ptr<RemoveSaver>& rs = removeSavers[d.ns];
+                if ( ! rs )
+                    rs.reset( new RemoveSaver( "rollback" , "" , d.ns ) );
+
+                // todo: lots of overhead in context, this can be faster
+                Client::Context c(d.ns, dbpath, 0, /*doauth*/false);
+                if( i->second.isEmpty() ) {
+                    // wasn't on the primary; delete.
+                    /* TODO1.6 : can't delete from a capped collection. need to handle that here. */
+                    deletes++; // counted once per document, whichever path below is taken
+
+                    NamespaceDetails *nsd = nsdetails(d.ns);
+                    if( nsd ) {
+                        if( nsd->capped ) {
+                            /* can't delete from a capped collection - so we truncate instead. if this item must go,
+                               so must all successors!!! */
+                            try {
+                                /** todo: IIRC cappedTruncateAfter does not handle completely empty. todo. */
+                                // this will crazy slow if no _id index.
+                                long long start = Listener::getElapsedTimeMillis();
+                                DiskLoc loc = Helpers::findOne(d.ns, pattern, false);
+                                if( Listener::getElapsedTimeMillis() - start > 200 )
+                                    log() << "replSet warning roll back slow no _id index for " << d.ns << rsLog;
+                                //would be faster but requires index: DiskLoc loc = Helpers::findById(nsd, pattern);
+                                if( !loc.isNull() ) {
+                                    try {
+                                        nsd->cappedTruncateAfter(d.ns, loc, true);
+                                    }
+                                    catch(DBException& e) {
+                                        if( e.getCode() == 13415 ) {
+                                            // hack: need to just make cappedTruncate do this...
+                                            nsd->emptyCappedCollection(d.ns);
+                                        } else {
+                                            throw;
+                                        }
+                                    }
+                                }
+                            }
+                            catch(DBException& e) {
+                                log() << "replSet error rolling back capped collection rec " << d.ns << ' ' << e.toString() << rsLog;
+                            }
+                        }
+                        else {
+                            try {
+                                deleteObjects(d.ns, pattern, /*justone*/true, /*logop*/false, /*god*/true, rs.get() );
+                            }
+                            catch(...) {
+                                log() << "replSet error rollback delete failed ns:" << d.ns << rsLog;
+                            }
+                        }
+                        // did we just empty the collection? if so let's check if it even exists on the source.
+                        if( nsd->nrecords == 0 ) {
+                            try {
+                                string sys = cc().database()->name + ".system.namespaces";
+                                bo o = them->findOne(sys, QUERY("name"<<d.ns));
+                                if( o.isEmpty() ) {
+                                    // we should drop
+                                    try {
+                                        bob res;
+                                        string errmsg;
+                                        dropCollection(d.ns, errmsg, res);
+                                    }
+                                    catch(...) {
+                                        log() << "replset error rolling back collection " << d.ns << rsLog;
+                                    }
+                                }
+                            }
+                            catch(DBException& ) {
+                                /* this isn't *that* big a deal, but is bad. */
+                                log() << "replSet warning rollback error querying for existence of " << d.ns << " at the primary, ignoring" << rsLog;
+                            }
+                        }
+                    }
+                }
+                else {
+                    // todo faster...
+                    OpDebug debug;
+                    updates++;
+                    _updateObjects(/*god*/true, d.ns, i->second, pattern, /*upsert=*/true, /*multi=*/false , /*logtheop=*/false , debug, rs.get() );
+                }
+            }
+            catch(DBException& e) {
+                log() << "replSet exception in rollback ns:" << d.ns << ' ' << pattern.toString() << ' ' << e.toString() << " ndeletes:" << deletes << rsLog;
+                warn = true;
+            }
+        }
+
+        removeSavers.clear(); // this effectively closes all of them
+
+        sethbmsg(str::stream() << "syncRollback 5 d:" << deletes << " u:" << updates);
+        MemoryMappedFile::flushAll(true);
+        sethbmsg("syncRollback 6");
+
+        // clean up oplog
+        log(2) << "replSet rollback truncate oplog after " << h.commonPoint.toStringPretty() << rsLog;
+        // todo: fatal error if this throws?
+        oplogDetails->cappedTruncateAfter(rsoplog, h.commonPointOurDiskloc, false);
+
+        /* reset cached lastoptimewritten and h value */
+        loadLastOpTimeWritten();
+
+        sethbmsg("syncRollback 7");
+        MemoryMappedFile::flushAll(true);
+
+        // done
+        if( warn )
+            sethbmsg("issues during syncRollback, see log");
+        else
+            sethbmsg("syncRollback done");
+    }
+
+    /** Entry point for a rollback: take the write lock, locate the common point with the
+        remote oplog, then hand over to syncFixUp().
+        Must be called without the replica set lock and without any db lock held (asserted).
+        Returns quietly (after a short sleep) if the lock cannot be obtained or the common
+        point search reports a problem as a const char*; a DBException from the search is
+        rethrown after a one minute sleep.
+        @param r reader connected to the node we roll back against
+    */
+    void ReplSetImpl::syncRollback(OplogReader&r) {
+        assert( !lockedByMe() );
+        assert( !dbMutex.atLeastReadLocked() );
+
+        sethbmsg("syncRollback 0");
+
+        // bounded wait for the write lock; give up for now if we don't get it
+        writelocktry lk(rsoplog, 20000);
+        const bool haveLock = lk.got();
+        if( !haveLock ) {
+            sethbmsg("syncRollback couldn't get write lock in a reasonable time");
+            sleepsecs(2);
+            return;
+        }
+
+        HowToFixUp fixups;
+        sethbmsg("syncRollback 1");
+        r.resetCursor();
+
+        sethbmsg("syncRollback 2 FindCommonPoint");
+        try {
+            syncRollbackFindCommonPoint(r.conn(), fixups);
+        }
+        catch( const char *reason ) {
+            const string msg = string("syncRollback 2 error ") + reason;
+            sethbmsg(msg);
+            sleepsecs(10);
+            return;
+        }
+        catch( DBException& ex ) {
+            const string msg = string("syncRollback 2 exception ") + ex.toString() + "; sleeping 1 min";
+            sethbmsg(msg);
+            sleepsecs(60);
+            throw;
+        }
+
+        sethbmsg("replSet syncRollback 3 fixup");
+
+        syncFixUp(fixups, r);
+    }
+
+}
diff --git a/db/repl/rs_sync.cpp b/db/repl/rs_sync.cpp
new file mode 100644
index 0000000..bece96c
--- /dev/null
+++ b/db/repl/rs_sync.cpp
@@ -0,0 +1,328 @@
+/**
+* Copyright (C) 2008 10gen Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "pch.h"
+#include "../client.h"
+#include "../../client/dbclient.h"
+#include "rs.h"
+#include "../repl.h"
+
+namespace mongo {
+
+ using namespace bson;
+
+ /* Thread entry point: registers this thread as client "rs_sync", runs the replica set's
+ syncThread(), and shuts the client down once syncThread() returns. */
+ void startSyncThread() {
+ Client::initThread("rs_sync");
+ theReplSet->syncThread();
+ cc().shutdown();
+ }
+
+    /** Apply a single oplog entry locally.
+        Entries whose "ns" is empty or begins with '.' are skipped; such an entry is logged
+        unless its op is 'n'.
+        @param o the oplog entry to apply
+    */
+    void ReplSetImpl::syncApply(const BSONObj &o) {
+        const char * const ns = o.getStringField("ns");
+
+        char dbName[MaxDatabaseLen];
+        nsToDatabase(ns, dbName);
+
+        const bool nsUnusable = ( *ns == 0 || *ns == '.' );
+        if( nsUnusable ) {
+            const bool isNoop = ( *o.getStringField("op") == 'n' );
+            if( !isNoop )
+                log() << "replSet skipping bad op in oplog: " << o.toString() << endl;
+            return;
+        }
+
+        Client::Context ctx(ns);
+        ctx.getClient()->curop()->reset();
+
+        /* todo : if this asserts, do we want to ignore or not? */
+        applyOperation_inlock(o);
+    }
+
+ /** Read the oplog of host hn and bring our copy up to date with it: entries with ts >= applyGTE
+ are applied via syncApply(), and every entry read is written to our own oplog.
+ @param hn host to read the oplog from
+ @param primary the member that was primary when the caller started; we stop if box.getPrimary() differs
+ @param applyGTE entries with an optime before this are logged but not applied
+ @param minValid if a DBException occurs, we only report success when we had already passed this optime
+ @return false if primary is null, we cannot connect, the remote oplog yields nothing, the remote
+ oplog's first entry is newer than applyGTE, or a DBException occurred with ts <= minValid; true otherwise
+ */
+ bool ReplSetImpl::initialSyncOplogApplication(
+ string hn,
+ const Member *primary,
+ OpTime applyGTE,
+ OpTime minValid)
+ {
+ if( primary == 0 ) return false;
+
+ // optime of the last entry read; declared outside the try so the catch block can inspect it
+ OpTime ts;
+ try {
+ OplogReader r;
+ if( !r.connect(hn) ) {
+ log(2) << "replSet can't connect to " << hn << " to read operations" << rsLog;
+ return false;
+ }
+
+ r.query(rsoplog, bo());
+ assert( r.haveCursor() );
+
+ /* we lock outside the loop to avoid the overhead of locking on every operation. server isn't usable yet anyway! */
+ writelock lk("");
+
+ {
+ if( !r.more() ) {
+ sethbmsg("replSet initial sync error reading remote oplog");
+ return false;
+ }
+ // peek at the first entry (it is put back): if it is already newer than applyGTE,
+ // the ops we need are no longer in the remote oplog
+ bo op = r.next();
+ OpTime t = op["ts"]._opTime();
+ r.putBack(op);
+ assert( !t.isNull() );
+ if( t > applyGTE ) {
+ sethbmsg(str::stream() << "error " << hn << " oplog wrapped during initial sync");
+ return false;
+ }
+ }
+
+ // todo : use exhaust
+ unsigned long long n = 0;
+ while( 1 ) {
+ if( !r.more() )
+ break;
+ BSONObj o = r.nextSafe(); /* note we might get "not master" at some point */
+ {
+ //writelock lk("");
+
+ ts = o["ts"]._opTime();
+
+ /* if we have become primary, we don't want to apply things from elsewhere
+ anymore. assumePrimary is in the db lock so we are safe as long as
+ we check after we locked above. */
+ const Member *p1 = box.getPrimary();
+ if( p1 != primary ) {
+ log() << "replSet primary was:" << primary->fullName() << " now:" <<
+ (p1 != 0 ? p1->fullName() : "none") << rsLog;
+ throw DBException("primary changed",0);
+ }
+
+ if( ts >= applyGTE ) {
+ // optimes before we started copying need not be applied.
+ syncApply(o);
+ }
+ _logOpObjRS(o); /* with repl sets we write the ops to our oplog too */
+ }
+ if( ++n % 100000 == 0 ) {
+ // simple progress metering
+ log() << "replSet initialSyncOplogApplication " << n << rsLog;
+ }
+ }
+ }
+ catch(DBException& e) {
+ // an error after we have passed minValid is tolerated: fall through and return true
+ if( ts <= minValid ) {
+ // didn't make it far enough
+ log() << "replSet initial sync failing, error applying oplog " << e.toString() << rsLog;
+ return false;
+ }
+ }
+ return true;
+ }
+
+ void ReplSetImpl::syncTail() { // one pass of tailing the current primary's oplog and applying its ops locally; returns on any problem or primary change
+ // todo : locking vis a vis the mgr...
+
+ const Member *primary = box.getPrimary();
+ if( primary == 0 ) return; // no primary known, nothing to tail
+ string hn = primary->h().toString(); // primary's host string, used in the log messages below
+ OplogReader r;
+ if( !r.connect(primary->h().toString()) ) {
+ log(2) << "replSet can't connect to " << hn << " to read operations" << rsLog;
+ return;
+ }
+
+ /* first make sure we are not hopelessly out of sync by being very stale: compare our last written optime with the "ts" of the entry read as the primary's oldest op. */
+ {
+ BSONObj remoteOldestOp = r.findOne(rsoplog, Query());
+ OpTime ts = remoteOldestOp["ts"]._opTime();
+ DEV log() << "remoteOldestOp: " << ts.toStringPretty() << endl;
+ else log(3) << "remoteOldestOp: " << ts.toStringPretty() << endl; // NOTE(review): the dangling else implies DEV expands to an if(...) - confirm
+ if( lastOpTimeWritten < ts ) { // our last op is older than remoteOldestOp: report and give up on this pass
+ log() << "replSet error too stale to catch up, at least from primary " << hn << rsLog;
+ log() << "replSet our last optime : " << lastOpTimeWritten.toStringPretty() << rsLog;
+ log() << "replSet oldest at " << hn << " : " << ts.toStringPretty() << rsLog;
+ log() << "replSet See http://www.mongodb.org/display/DOCS/Resyncing+a+Very+Stale+Replica+Set+Member" << rsLog;
+ sethbmsg("error too stale to catch up");
+ sleepsecs(120); // long pause before handing control back to the caller
+ return;
+ }
+ }
+
+ r.tailingQueryGTE(rsoplog, lastOpTimeWritten); // query the primary's oplog starting at our last written optime
+ assert( r.haveCursor() );
+ assert( r.awaitCapable() );
+
+ { // validate the start of the result set before applying anything
+ if( !r.more() ) { // the query produced no results
+ /* maybe we are ahead and need to roll back? */
+ try {
+ bo theirLastOp = r.getLastOp(rsoplog);
+ if( theirLastOp.isEmpty() ) {
+ log() << "replSet error empty query result from " << hn << " oplog" << rsLog;
+ sleepsecs(2);
+ return;
+ }
+ OpTime theirTS = theirLastOp["ts"]._opTime();
+ if( theirTS < lastOpTimeWritten ) {
+ log() << "replSet we are ahead of the primary, will try to roll back" << rsLog;
+ syncRollback(r);
+ return;
+ }
+ /* we're not ahead? maybe our new query got fresher data. best to come back and try again */
+ log() << "replSet syncTail condition 1" << rsLog;
+ sleepsecs(1);
+ }
+ catch(DBException& e) {
+ log() << "replSet error querying " << hn << ' ' << e.toString() << rsLog;
+ sleepsecs(2);
+ }
+ return; // reached after "condition 1" and after the catch; the pass ends either way
+ /*
+ log() << "replSet syncTail error querying oplog >= " << lastOpTimeWritten.toString() << " from " << hn << rsLog;
+ try {
+ log() << "replSet " << hn << " last op: " << r.getLastOp(rsoplog).toString() << rsLog;
+ }
+ catch(...) { }
+ sleepsecs(1);
+ return;*/
+ }
+
+ BSONObj o = r.nextSafe(); // first result; only compared against our own last op, never applied here
+ OpTime ts = o["ts"]._opTime();
+ long long h = o["h"].numberLong();
+ if( ts != lastOpTimeWritten || h != lastH ) { // first result must match our last op in both "ts" and "h"; otherwise roll back
+ log(1) << "TEMP our last op time written: " << lastOpTimeWritten.toStringPretty() << endl;
+ log(1) << "TEMP primary's GTE: " << ts.toStringPretty() << endl;
+ /*
+ }*/
+
+ syncRollback(r);
+ return;
+ }
+ }
+
+ while( 1 ) { // outer loop: repeats after each tailCheck() for as long as the cursor survives
+ while( 1 ) { // inner loop: consumes one op per iteration
+ if( !r.moreInCurrentBatch() ) {
+ /* we need to occasionally check some things. between
+ batches is probably a good time. */
+
+ /* perhaps we should check this earlier? but not before the rollback checks. */
+ if( state().recovering() ) {
+ /* can we go to RS_SECONDARY state? we can if not too old and if minvalid achieved */
+ bool golive = false;
+ OpTime minvalid;
+ {
+ readlock lk("local.replset.minvalid");
+ BSONObj mv;
+ if( Helpers::getSingleton("local.replset.minvalid", mv) ) {
+ minvalid = mv["ts"]._opTime();
+ if( minvalid <= lastOpTimeWritten ) { // we have written at least up to the recorded minvalid optime
+ golive=true;
+ }
+ }
+ else
+ golive = true; /* must have been the original member */
+ }
+ if( golive ) {
+ sethbmsg("");
+ log() << "replSet SECONDARY" << rsLog;
+ changeState(MemberState::RS_SECONDARY);
+ }
+ else {
+ sethbmsg(str::stream() << "still syncing, not yet to minValid optime " << minvalid.toString());
+ }
+
+ /* todo: too stale capability */
+ }
+
+ if( box.getPrimary() != primary ) // the primary is no longer the one we started with: end this pass
+ return;
+ }
+ if( !r.more() ) // nothing more available right now: leave the inner loop
+ break;
+ {
+ BSONObj o = r.nextSafe(); /* note we might get "not master" at some point */
+ {
+ writelock lk(""); // held for the primary re-check, the apply and the oplog write below
+
+ /* if we have become primary, we don't want to apply things from elsewhere
+ anymore. assumePrimary is in the db lock so we are safe as long as
+ we check after we locked above. */
+ if( box.getPrimary() != primary ) {
+ if( box.getState().primary() )
+ log(0) << "replSet stopping syncTail we are now primary" << rsLog;
+ return;
+ }
+
+ syncApply(o);
+ _logOpObjRS(o); /* with repl sets we write the ops to our oplog too: */
+ }
+ }
+ }
+ r.tailCheck(); // NOTE(review): presumably discards the cursor once it is no longer usable - confirm in OplogReader
+ if( !r.haveCursor() ) {
+ log() << "replSet TEMP end syncTail pass with " << hn << rsLog;
+ // TODO : reuse our connection to the primary.
+ return;
+ }
+ if( box.getPrimary() != primary )
+ return;
+ // looping back is ok because this is a tailable cursor
+ }
+ }
+
+ void ReplSetImpl::_syncThread() {
+     StateBox::SP snapshot = box.get();
+     const bool weArePrimary = snapshot.state.primary();
+     const bool havePrimary = snapshot.primary != 0;
+
+     if( weArePrimary ) {
+         // nothing to pull while we are the primary ourselves; pause briefly and let the caller retry
+         sleepsecs(1);
+     }
+     else if( !havePrimary ) {
+         // no primary to sync from. (syncing from secondaries that are up may be added later. tbd.)
+     }
+     else if( lastOpTimeWritten.isNull() ) {
+         // nothing written locally yet; the caller invokes us again, so a failed initial sync restarts from the top
+         syncDoInitialSync();
+     }
+     else {
+         syncTail(); // we already have some data: continue tailing
+     }
+ }
+
+ void ReplSetImpl::syncThread() {
+     // Runs _syncThread() in an endless loop, reporting failures via sethbmsg and pausing longer after them.
+     if( myConfig().arbiterOnly ) return; // members configured arbiterOnly never enter the sync loop
+     for( ;; ) {
+         int backoffSecs = 0; // extra pause, set only when the pass threw
+         try { _syncThread(); }
+         catch(DBException& ex) {
+             sethbmsg("syncThread: " + ex.toString());
+             backoffSecs = 10;
+         }
+         catch(...) {
+             sethbmsg("unexpected exception in syncThread()");
+             // TODO : SET NOT SECONDARY here.
+             backoffSecs = 60;
+         }
+         if( backoffSecs > 0 ) sleepsecs(backoffSecs);
+         sleepsecs(1); // regular pause between passes, taken after every pass
+     }
+ }
+
+}
diff --git a/db/repl/test.html b/db/repl/test.html
new file mode 100644
index 0000000..295ad2e
--- /dev/null
+++ b/db/repl/test.html
@@ -0,0 +1,11 @@
+<HTML>
+<BODY>
+<!-- see also jstests/rs/ -->
+<iframe src="http://127.0.0.1:28000/_replSet" width="100%" height="50%" frameborder=1>
+</iframe>
+
+<iframe src="http://127.0.0.1:28001/_replSet" width="100%" height="50%" frameborder=1>
+</iframe>
+
+</BODY>
+</HTML>
diff --git a/db/repl/testing.js b/db/repl/testing.js
new file mode 100644
index 0000000..d741cf3
--- /dev/null
+++ b/db/repl/testing.js
@@ -0,0 +1,42 @@
+// helpers for testing repl sets
+// run
+// mongo --shell <host:port> testing.js
+
+cfg = {
+ _id: 'asdf',
+ members: [
+ { _id : 0, host : "dm_hp" },
+ { _id : 2, host : "dm_hp:27002" }
+ ]
+};
+c2 = {
+ _id: 'asdf',
+ members: [
+ { _id: 0, host: "dmthink" },
+ { _id: 2, host: "dmthink:27002" }
+ ]
+};
+
+db = db.getSisterDB("admin");
+local = db.getSisterDB("local");
+
+print("\n\ndb = admin db on localhost:27017");
+print("b = admin on localhost:27002");
+print("rc(x) = db.runCommand(x)");
+print("cfg = samp replset config");
+print("i() = replSetInitiate(cfg)");
+print("ism() = rc('ismaster')");
+print("\n\n");
+
+function rc(c) { return db.runCommand(c); }
+function i() { return rc({ replSetInitiate: cfg }); }
+function ism() { return rc("isMaster"); }
+
+b = 0;
+try {
+ b = new Mongo("localhost:27002").getDB("admin");
+}
+catch (e) {
+ print("\nCouldn't connect to b mongod instance\n");
+}
+