Diffstat (limited to 'db/repl')
-rw-r--r--             | db/repl/connections.h         |  49
-rw-r--r--             | db/repl/consensus.cpp         | 124
-rw-r--r--             | db/repl/health.cpp            | 161
-rw-r--r--             | db/repl/health.h              |   8
-rw-r--r--             | db/repl/heartbeat.cpp         |  71
-rw-r--r--             | db/repl/manager.cpp           |  70
-rw-r--r--             | db/repl/multicmd.h            |  29
-rw-r--r--             | db/repl/replset_commands.cpp  | 106
-rw-r--r--             | db/repl/rs.cpp                | 282
-rw-r--r--             | db/repl/rs.h                  | 115
-rw-r--r--             | db/repl/rs_config.cpp         | 174
-rw-r--r--             | db/repl/rs_config.h           |  20
-rw-r--r--[-rwxr-xr-x] | db/repl/rs_exception.h        |  18
-rw-r--r--             | db/repl/rs_initialsync.cpp    | 205
-rw-r--r--             | db/repl/rs_initiate.cpp       |  66
-rw-r--r--             | db/repl/rs_member.h           |  35
-rw-r--r--             | db/repl/rs_optime.h           | 114
-rw-r--r--             | db/repl/rs_rollback.cpp       | 661
-rw-r--r--             | db/repl/rs_sync.cpp           | 368
19 files changed, 1585 insertions, 1091 deletions
diff --git a/db/repl/connections.h b/db/repl/connections.h index cdf2fad..7e7bfe5 100644 --- a/db/repl/connections.h +++ b/db/repl/connections.h @@ -1,4 +1,4 @@ -// @file +// @file /* * Copyright (C) 2010 10gen Inc. @@ -20,11 +20,12 @@ #include <map> #include "../../client/dbclient.h" +#include "../security_key.h" -namespace mongo { +namespace mongo { - /** here we keep a single connection (with reconnect) for a set of hosts, - one each, and allow one user at a time per host. if in use already for that + /** here we keep a single connection (with reconnect) for a set of hosts, + one each, and allow one user at a time per host. if in use already for that host, we block. so this is an easy way to keep a 1-deep pool of connections that many threads can share. @@ -39,35 +40,37 @@ namespace mongo { throws exception on connect error (but fine to try again later with a new scopedconn object for same host). */ - class ScopedConn { + class ScopedConn { public: /** throws assertions if connect failure etc. */ ScopedConn(string hostport); ~ScopedConn(); /* If we were to run a query and not exhaust the cursor, future use of the connection would be problematic. - So here what we do is wrapper known safe methods and not allow cursor-style queries at all. This makes + So here what we do is wrapper known safe methods and not allow cursor-style queries at all. This makes ScopedConn limited in functionality but very safe. More non-cursor wrappers can be added here if needed. */ bool runCommand(const string &dbname, const BSONObj& cmd, BSONObj &info, int options=0) { return conn()->runCommand(dbname, cmd, info, options); } - unsigned long long count(const string &ns) { - return conn()->count(ns); + unsigned long long count(const string &ns) { + return conn()->count(ns); } - BSONObj findOne(const string &ns, const Query& q, const BSONObj *fieldsToReturn = 0, int queryOptions = 0) { + BSONObj findOne(const string &ns, const Query& q, const BSONObj *fieldsToReturn = 0, int queryOptions = 0) { return conn()->findOne(ns, q, fieldsToReturn, queryOptions); } + void setTimeout(double to) { + conn()->setSoTimeout(to); + } private: auto_ptr<scoped_lock> connLock; - static mutex mapMutex; - struct X { - mutex z; + static mongo::mutex mapMutex; + struct X { + mongo::mutex z; DBClientConnection cc; - X() : z("X"), cc(/*reconnect*/ true, 0, - /*timeout*/ theReplSet ? theReplSet->config().ho.heartbeatTimeoutMillis/1000.0 : 10.0) { + X() : z("X"), cc(/*reconnect*/ true, 0, /*timeout*/ 10.0) { cc._logLevel = 2; } } *x; @@ -87,22 +90,30 @@ namespace mongo { connLock.reset( new scoped_lock(x->z) ); } } - if( !first ) { + if( !first ) { connLock.reset( new scoped_lock(x->z) ); return; } // we already locked above... string err; - x->cc.connect(hostport, err); + if (!x->cc.connect(hostport, err)) { + log() << "couldn't connect to " << hostport << ": " << err << rsLog; + return; + } + + if (!noauth && !x->cc.auth("local", internalSecurity.user, internalSecurity.pwd, err, false)) { + log() << "could not authenticate against " << conn()->toString() << ", " << err << rsLog; + return; + } } - inline ScopedConn::~ScopedConn() { + inline ScopedConn::~ScopedConn() { // conLock releases... 
} - /*inline DBClientConnection* ScopedConn::operator->() { - return &x->cc; + /*inline DBClientConnection* ScopedConn::operator->() { + return &x->cc; }*/ } diff --git a/db/repl/consensus.cpp b/db/repl/consensus.cpp index 1519c26..f764abe 100644 --- a/db/repl/consensus.cpp +++ b/db/repl/consensus.cpp @@ -19,9 +19,9 @@ #include "rs.h" #include "multicmd.h" -namespace mongo { +namespace mongo { - class CmdReplSetFresh : public ReplSetCommand { + class CmdReplSetFresh : public ReplSetCommand { public: CmdReplSetFresh() : ReplSetCommand("replSetFresh") { } private: @@ -29,23 +29,23 @@ namespace mongo { if( !check(errmsg, result) ) return false; - if( cmdObj["set"].String() != theReplSet->name() ) { + if( cmdObj["set"].String() != theReplSet->name() ) { errmsg = "wrong repl set name"; return false; } string who = cmdObj["who"].String(); int cfgver = cmdObj["cfgver"].Int(); - OpTime opTime(cmdObj["opTime"].Date()); + OpTime opTime(cmdObj["opTime"].Date()); bool weAreFresher = false; - if( theReplSet->config().version > cfgver ) { + if( theReplSet->config().version > cfgver ) { log() << "replSet member " << who << " is not yet aware its cfg version " << cfgver << " is stale" << rsLog; - result.append("info", "config version stale"); + result.append("info", "config version stale"); + weAreFresher = true; + } + else if( opTime < theReplSet->lastOpTimeWritten ) { weAreFresher = true; } - else if( opTime < theReplSet->lastOpTimeWritten ) { - weAreFresher = true; - } result.appendDate("opTime", theReplSet->lastOpTimeWritten.asDate()); result.append("fresher", weAreFresher); return true; @@ -66,19 +66,19 @@ namespace mongo { } } cmdReplSetElect; - int Consensus::totalVotes() const { + int Consensus::totalVotes() const { static int complain = 0; int vTot = rs._self->config().votes; - for( Member *m = rs.head(); m; m=m->next() ) + for( Member *m = rs.head(); m; m=m->next() ) vTot += m->config().votes; if( vTot % 2 == 0 && vTot && complain++ == 0 ) - log() << "replSet warning total number of votes is even - considering giving one member an extra vote" << rsLog; + log() << "replSet " /*buildbot! warning */ "total number of votes is even - add arbiter or give one member an extra vote" << rsLog; return vTot; } bool Consensus::aMajoritySeemsToBeUp() const { int vUp = rs._self->config().votes; - for( Member *m = rs.head(); m; m=m->next() ) + for( Member *m = rs.head(); m; m=m->next() ) vUp += m->hbinfo().up() ? m->config().votes : 0; return vUp * 2 > totalVotes(); } @@ -98,13 +98,13 @@ namespace mongo { const time_t LeaseTime = 30; - unsigned Consensus::yea(unsigned memberId) /* throws VoteException */ { + unsigned Consensus::yea(unsigned memberId) { /* throws VoteException */ Atomic<LastYea>::tran t(ly); LastYea &ly = t.ref(); time_t now = time(0); if( ly.when + LeaseTime >= now && ly.who != memberId ) { log(1) << "replSet not voting yea for " << memberId << - " voted for " << ly.who << ' ' << now-ly.when << " secs ago" << rsLog; + " voted for " << ly.who << ' ' << now-ly.when << " secs ago" << rsLog; throw VoteException(); } ly.when = now; @@ -112,7 +112,7 @@ namespace mongo { return rs._self->config().votes; } - /* we vote for ourself at start of election. once it fails, we can cancel the lease we had in + /* we vote for ourself at start of election. once it fails, we can cancel the lease we had in place instead of leaving it for a long time. */ void Consensus::electionFailed(unsigned meid) { @@ -124,7 +124,7 @@ namespace mongo { } /* todo: threading **************** !!!!!!!!!!!!!!!! 
*/ - void Consensus::electCmdReceived(BSONObj cmd, BSONObjBuilder* _b) { + void Consensus::electCmdReceived(BSONObj cmd, BSONObjBuilder* _b) { BSONObjBuilder& b = *_b; DEV log() << "replSet received elect msg " << cmd.toString() << rsLog; else log(2) << "replSet received elect msg " << cmd.toString() << rsLog; @@ -138,14 +138,14 @@ namespace mongo { const Member* hopeful = rs.findById(whoid); int vote = 0; - if( set != rs.name() ) { + if( set != rs.name() ) { log() << "replSet error received an elect request for '" << set << "' but our set name is '" << rs.name() << "'" << rsLog; } - else if( myver < cfgver ) { + else if( myver < cfgver ) { // we are stale. don't vote } - else if( myver > cfgver ) { + else if( myver > cfgver ) { // they are stale! log() << "replSet info got stale version # during election" << rsLog; vote = -10000; @@ -154,10 +154,10 @@ namespace mongo { log() << "couldn't find member with id " << whoid << rsLog; vote = -10000; } - else if( primary && primary->hbinfo().opTime > hopeful->hbinfo().opTime ) { + else if( primary && primary->hbinfo().opTime >= hopeful->hbinfo().opTime ) { // other members might be aware of more up-to-date nodes log() << hopeful->fullName() << " is trying to elect itself but " << - primary->fullName() << " is already primary and more up-to-date" << rsLog; + primary->fullName() << " is already primary and more up-to-date" << rsLog; vote = -10000; } else { @@ -166,7 +166,7 @@ namespace mongo { rs.relinquish(); log() << "replSet info voting yea for " << whoid << rsLog; } - catch(VoteException&) { + catch(VoteException&) { log() << "replSet voting no already voted for another" << rsLog; } } @@ -182,10 +182,10 @@ namespace mongo { L.push_back( Target(m->fullName()) ); } - /* config version is returned as it is ok to use this unlocked. BUT, if unlocked, you would need + /* config version is returned as it is ok to use this unlocked. BUT, if unlocked, you would need to check later that the config didn't change. */ void ReplSetImpl::getTargets(list<Target>& L, int& configVersion) { - if( lockedByMe() ) { + if( lockedByMe() ) { _getTargets(L, configVersion); return; } @@ -200,15 +200,21 @@ namespace mongo { bool Consensus::weAreFreshest(bool& allUp, int& nTies) { const OpTime ord = theReplSet->lastOpTimeWritten; nTies = 0; - assert( !ord.isNull() ); + assert( !ord.isNull() ); BSONObj cmd = BSON( - "replSetFresh" << 1 << - "set" << rs.name() << - "opTime" << Date_t(ord.asDate()) << - "who" << rs._self->fullName() << - "cfgver" << rs._cfg->version ); + "replSetFresh" << 1 << + "set" << rs.name() << + "opTime" << Date_t(ord.asDate()) << + "who" << rs._self->fullName() << + "cfgver" << rs._cfg->version ); list<Target> L; int ver; + /* the following queries arbiters, even though they are never fresh. wonder if that makes sense. + it doesn't, but it could, if they "know" what freshness it one day. so consider removing + arbiters from getTargets() here. although getTargets is used elsewhere for elections; there + arbiters are certainly targets - so a "includeArbs" bool would be necessary if we want to make + not fetching them herein happen. + */ rs.getTargets(L, ver); multiCommand(cmd, L); int nok = 0; @@ -228,25 +234,25 @@ namespace mongo { allUp = false; } } - DEV log() << "replSet dev we are freshest of up nodes, nok:" << nok << " nTies:" << nTies << rsLog; + log(1) << "replSet dev we are freshest of up nodes, nok:" << nok << " nTies:" << nTies << rsLog; assert( ord <= theReplSet->lastOpTimeWritten ); // <= as this may change while we are working... 
return true; } extern time_t started; - void Consensus::multiCommand(BSONObj cmd, list<Target>& L) { + void Consensus::multiCommand(BSONObj cmd, list<Target>& L) { assert( !rs.lockedByMe() ); mongo::multiCommand(cmd, L); } void Consensus::_electSelf() { - if( time(0) < steppedDown ) + if( time(0) < steppedDown ) return; { const OpTime ord = theReplSet->lastOpTimeWritten; - if( ord == 0 ) { + if( ord == 0 ) { log() << "replSet info not trying to elect self, do not yet have a complete set of data from any point in time" << rsLog; return; } @@ -254,16 +260,16 @@ namespace mongo { bool allUp; int nTies; - if( !weAreFreshest(allUp, nTies) ) { + if( !weAreFreshest(allUp, nTies) ) { log() << "replSet info not electing self, we are not freshest" << rsLog; return; } rs.sethbmsg("",9); - if( !allUp && time(0) - started < 60 * 5 ) { - /* the idea here is that if a bunch of nodes bounce all at once, we don't want to drop data - if we don't have to -- we'd rather be offline and wait a little longer instead + if( !allUp && time(0) - started < 60 * 5 ) { + /* the idea here is that if a bunch of nodes bounce all at once, we don't want to drop data + if we don't have to -- we'd rather be offline and wait a little longer instead todo: make this configurable. */ rs.sethbmsg("not electing self, not all members up and we have been up less than 5 minutes"); @@ -276,9 +282,10 @@ namespace mongo { /* tie? we then randomly sleep to try to not collide on our voting. */ /* todo: smarter. */ if( me.id() == 0 || sleptLast ) { - // would be fine for one node not to sleep + // would be fine for one node not to sleep // todo: biggest / highest priority nodes should be the ones that get to not sleep - } else { + } + else { assert( !rs.lockedByMe() ); // bad to go to sleep locked unsigned ms = ((unsigned) rand()) % 1000 + 50; DEV log() << "replSet tie " << nTies << " sleeping a little " << ms << "ms" << rsLog; @@ -297,13 +304,13 @@ namespace mongo { log() << "replSet info electSelf " << meid << rsLog; BSONObj electCmd = BSON( - "replSetElect" << 1 << - "set" << rs.name() << - "who" << me.fullName() << - "whoid" << me.hbinfo().id() << - "cfgver" << rs._cfg->version << - "round" << OID::gen() /* this is just for diagnostics */ - ); + "replSetElect" << 1 << + "set" << rs.name() << + "who" << me.fullName() << + "whoid" << me.hbinfo().id() << + "cfgver" << rs._cfg->version << + "round" << OID::gen() /* this is just for diagnostics */ + ); int configVersion; list<Target> L; @@ -326,7 +333,7 @@ namespace mongo { // defensive; should never happen as we have timeouts on connection and operation for our conn log() << "replSet too much time passed during our election, ignoring result" << rsLog; } - else if( configVersion != rs.config().version ) { + else if( configVersion != rs.config().version ) { log() << "replSet config version changed during our election, ignoring result" << rsLog; } else { @@ -334,9 +341,10 @@ namespace mongo { log(1) << "replSet election succeeded, assuming primary role" << rsLog; success = true; rs.assumePrimary(); - } + } } - } catch( std::exception& ) { + } + catch( std::exception& ) { if( !success ) electionFailed(meid); throw; } @@ -347,19 +355,19 @@ namespace mongo { assert( !rs.lockedByMe() ); assert( !rs.myConfig().arbiterOnly ); assert( rs.myConfig().slaveDelay == 0 ); - try { - _electSelf(); - } - catch(RetryAfterSleepException&) { + try { + _electSelf(); + } + catch(RetryAfterSleepException&) { throw; } - catch(VoteException& ) { + catch(VoteException& ) { log() << "replSet not trying to elect self 
as responded yea to someone else recently" << rsLog; } - catch(DBException& e) { + catch(DBException& e) { log() << "replSet warning caught unexpected exception in electSelf() " << e.toString() << rsLog; } - catch(...) { + catch(...) { log() << "replSet warning caught unexpected exception in electSelf()" << rsLog; } } diff --git a/db/repl/health.cpp b/db/repl/health.cpp index c75221c..762ca90 100644 --- a/db/repl/health.cpp +++ b/db/repl/health.cpp @@ -32,20 +32,22 @@ #include "../dbhelpers.h" namespace mongo { + /* decls for connections.h */ - ScopedConn::M& ScopedConn::_map = *(new ScopedConn::M()); + ScopedConn::M& ScopedConn::_map = *(new ScopedConn::M()); mutex ScopedConn::mapMutex("ScopedConn::mapMutex"); } -namespace mongo { +namespace mongo { using namespace mongoutils::html; using namespace bson; static RamLog _rsLog; Tee *rsLog = &_rsLog; + extern bool replSetBlind; - string ago(time_t t) { + string ago(time_t t) { if( t == 0 ) return ""; time_t x = time(0) - t; @@ -58,14 +60,14 @@ namespace mongo { s.precision(2); s << x / 60.0 << " mins"; } - else { + else { s.precision(2); s << x / 3600.0 << " hrs"; } return s.str(); } - void Member::summarizeMember(stringstream& s) const { + void Member::summarizeMember(stringstream& s) const { s << tr(); { stringstream u; @@ -89,27 +91,29 @@ namespace mongo { s << td(h); } s << td(config().votes); - { + s << td(config().priority); + { string stateText = state().toString(); if( _config.hidden ) stateText += " (hidden)"; - if( ok || stateText.empty() ) + if( ok || stateText.empty() ) s << td(stateText); // text blank if we've never connected else s << td( grey(str::stream() << "(was " << state().toString() << ')', true) ); } s << td( grey(hbinfo().lastHeartbeatMsg,!ok) ); stringstream q; - q << "/_replSetOplog?" << id(); + q << "/_replSetOplog?_id=" << id(); s << td( a(q.str(), "", never ? "?" 
: hbinfo().opTime.toString()) ); if( hbinfo().skew > INT_MIN ) { s << td( grey(str::stream() << hbinfo().skew,!ok) ); - } else + } + else s << td(""); s << _tr(); } - - string ReplSetImpl::stateAsHtml(MemberState s) { + + string ReplSetImpl::stateAsHtml(MemberState s) { if( s.s == MemberState::RS_STARTUP ) return a("", "serving still starting up, or still trying to initiate the set", "STARTUP"); if( s.s == MemberState::RS_PRIMARY ) return a("", "this server thinks it is primary", "PRIMARY"); if( s.s == MemberState::RS_SECONDARY ) return a("", "this server thinks it is a secondary (slave mode)", "SECONDARY"); @@ -122,7 +126,7 @@ namespace mongo { return ""; } - string MemberState::toString() const { + string MemberState::toString() const { if( s == MemberState::RS_STARTUP ) return "STARTUP"; if( s == MemberState::RS_PRIMARY ) return "PRIMARY"; if( s == MemberState::RS_SECONDARY ) return "SECONDARY"; @@ -143,9 +147,9 @@ namespace mongo { set<string> skip; be e = op["ts"]; - if( e.type() == Date || e.type() == Timestamp ) { + if( e.type() == Date || e.type() == Timestamp ) { OpTime ot = e._opTime(); - ss << td( time_t_to_String_short( ot.getSecs() ) ); + ss << td( time_t_to_String_short( ot.getSecs() ) ); ss << td( ot.toString() ); skip.insert("ts"); } @@ -155,7 +159,8 @@ namespace mongo { if( e.type() == NumberLong ) { ss << "<td>" << hex << e.Long() << "</td>\n"; skip.insert("h"); - } else + } + else ss << td("?"); ss << td(op["op"].valuestrsafe()); @@ -164,20 +169,17 @@ namespace mongo { skip.insert("ns"); ss << "<td>"; - for( bo::iterator i(op); i.more(); ) { + for( bo::iterator i(op); i.more(); ) { be e = i.next(); if( skip.count(e.fieldName()) ) continue; ss << e.toString() << ' '; } - ss << "</td>"; - - ss << "</tr>"; - ss << '\n'; + ss << "</td></tr>\n"; } - void ReplSetImpl::_getOplogDiagsAsHtml(unsigned server_id, stringstream& ss) const { + void ReplSetImpl::_getOplogDiagsAsHtml(unsigned server_id, stringstream& ss) const { const Member *m = findById(server_id); - if( m == 0 ) { + if( m == 0 ) { ss << "Error : can't find a member with id: " << server_id << '\n'; return; } @@ -187,21 +189,29 @@ namespace mongo { //const bo fields = BSON( "o" << false << "o2" << false ); const bo fields; - ScopedDbConnection conn(m->fullName()); + /** todo fix we might want an so timeout here */ + DBClientConnection conn(false, 0, /*timeout*/ 20); + { + string errmsg; + if( !conn.connect(m->fullName(), errmsg) ) { + ss << "couldn't connect to " << m->fullName() << ' ' << errmsg; + return; + } + } - auto_ptr<DBClientCursor> c = conn->query(rsoplog, Query().sort("$natural",1), 20, 0, &fields); - if( c.get() == 0 ) { + auto_ptr<DBClientCursor> c = conn.query(rsoplog, Query().sort("$natural",1), 20, 0, &fields); + if( c.get() == 0 ) { ss << "couldn't query " << rsoplog; return; } static const char *h[] = {"ts","optime", "h","op","ns","rest",0}; ss << "<style type=\"text/css\" media=\"screen\">" - "table { font-size:75% }\n" + "table { font-size:75% }\n" // "th { background-color:#bbb; color:#000 }\n" // "td,th { padding:.25em }\n" - "</style>\n"; - + "</style>\n"; + ss << table(h, true); //ss << "<pre>\n"; int n = 0; @@ -211,17 +221,17 @@ namespace mongo { while( c->more() ) { bo o = c->next(); otLast = o["ts"]._opTime(); - if( otFirst.isNull() ) + if( otFirst.isNull() ) otFirst = otLast; say(ss, o); - n++; + n++; } if( n == 0 ) { ss << rsoplog << " is empty\n"; } - else { - auto_ptr<DBClientCursor> c = conn->query(rsoplog, Query().sort("$natural",-1), 20, 0, &fields); - if( c.get() == 0 ) { + else { + 
auto_ptr<DBClientCursor> c = conn.query(rsoplog, Query().sort("$natural",-1), 20, 0, &fields); + if( c.get() == 0 ) { ss << "couldn't query [2] " << rsoplog; return; } @@ -230,7 +240,7 @@ namespace mongo { otEnd = o["ts"]._opTime(); while( 1 ) { stringstream z; - if( o["ts"]._opTime() == otLast ) + if( o["ts"]._opTime() == otLast ) break; say(z, o); x = z.str() + x; @@ -253,32 +263,31 @@ namespace mongo { ss.precision(3); if( h < 72 ) ss << h << " hours"; - else + else ss << h / 24.0 << " days"; ss << "</p>\n"; } - - conn.done(); } - void ReplSetImpl::_summarizeAsHtml(stringstream& s) const { + void ReplSetImpl::_summarizeAsHtml(stringstream& s) const { s << table(0, false); s << tr("Set name:", _name); s << tr("Majority up:", elect.aMajoritySeemsToBeUp()?"yes":"no" ); s << _table(); - const char *h[] = {"Member", - "<a title=\"member id in the replset config\">id</a>", - "Up", - "<a title=\"length of time we have been continuously connected to the other member with no reconnects (for self, shows uptime)\">cctime</a>", - "<a title=\"when this server last received a heartbeat response - includes error code responses\">Last heartbeat</a>", - "Votes", "State", "Status", - "<a title=\"how up to date this server is. this value polled every few seconds so actually lag is typically much lower than value shown here.\">optime</a>", - "<a title=\"Clock skew in seconds relative to this server. Informational; server clock variances will make the diagnostics hard to read, but otherwise are benign..\">skew</a>", - 0}; + const char *h[] = {"Member", + "<a title=\"member id in the replset config\">id</a>", + "Up", + "<a title=\"length of time we have been continuously connected to the other member with no reconnects (for self, shows uptime)\">cctime</a>", + "<a title=\"when this server last received a heartbeat response - includes error code responses\">Last heartbeat</a>", + "Votes", "Priority", "State", "Messages", + "<a title=\"how up to date this server is. this value polled every few seconds so actually lag is typically much lower than value shown here.\">optime</a>", + "<a title=\"Clock skew in seconds relative to this server. Informational; server clock variances will make the diagnostics hard to read, but otherwise are benign..\">skew</a>", + 0 + }; s << table(h); - /* this is to sort the member rows by their ordinal _id, so they show up in the same + /* this is to sort the member rows by their ordinal _id, so they show up in the same order on all the different web ui's; that is less confusing for the operator. */ map<int,string> mp; @@ -287,13 +296,13 @@ namespace mongo { readlocktry lk("local.replset.minvalid", 300); if( lk.got() ) { BSONObj mv; - if( Helpers::getSingleton("local.replset.minvalid", mv) ) { + if( Helpers::getSingleton("local.replset.minvalid", mv) ) { myMinValid = "minvalid:" + mv["ts"]._opTime().toString(); } } else myMinValid = "."; } - catch(...) { + catch(...) { myMinValid = "exception fetching minvalid"; } @@ -301,25 +310,26 @@ namespace mongo { stringstream s; /* self row */ s << tr() << td(_self->fullName() + " (me)") << - td(_self->id()) << - td("1") << //up - td(ago(started)) << - td("") << // last heartbeat - td(ToString(_self->config().votes)) << - td( stateAsHtml(box.getState()) + (_self->config().hidden?" (hidden)":"") ); + td(_self->id()) << + td("1") << //up + td(ago(started)) << + td("") << // last heartbeat + td(ToString(_self->config().votes)) << + td(ToString(_self->config().priority)) << + td( stateAsHtml(box.getState()) + (_self->config().hidden?" 
(hidden)":"") ); s << td( _hbmsg ); stringstream q; - q << "/_replSetOplog?" << _self->id(); + q << "/_replSetOplog?_id=" << _self->id(); s << td( a(q.str(), myMinValid, theReplSet->lastOpTimeWritten.toString()) ); s << td(""); // skew s << _tr(); - mp[_self->hbinfo().id()] = s.str(); + mp[_self->hbinfo().id()] = s.str(); } Member *m = head(); while( m ) { - stringstream s; + stringstream s; m->summarizeMember(s); - mp[m->hbinfo().id()] = s.str(); + mp[m->hbinfo().id()] = s.str(); m = m->next(); } @@ -333,26 +343,27 @@ namespace mongo { _rsLog.toHTML( s ); } - const Member* ReplSetImpl::findById(unsigned id) const { + const Member* ReplSetImpl::findById(unsigned id) const { if( id == _self->id() ) return _self; for( Member *m = head(); m; m = m->next() ) - if( m->id() == id ) + if( m->id() == id ) return m; return 0; } - void ReplSetImpl::_summarizeStatus(BSONObjBuilder& b) const { + void ReplSetImpl::_summarizeStatus(BSONObjBuilder& b) const { vector<BSONObj> v; // add self { - HostAndPort h(getHostName(), cmdLine.port); - BSONObjBuilder bb; bb.append("_id", (int) _self->id()); - bb.append("name", h.toString()); + bb.append("name", _self->fullName()); bb.append("health", 1.0); bb.append("state", (int) box.getState().s); + bb.append("stateStr", box.getState().toString()); + bb.appendTimestamp("optime", lastOpTimeWritten.asDate()); + bb.appendDate("optimeDate", lastOpTimeWritten.getSecs() * 1000LL); string s = _self->lhb(); if( !s.empty() ) bb.append("errmsg", s); @@ -365,9 +376,19 @@ namespace mongo { BSONObjBuilder bb; bb.append("_id", (int) m->id()); bb.append("name", m->fullName()); - bb.append("health", m->hbinfo().health); + double h = m->hbinfo().health; + bb.append("health", h); bb.append("state", (int) m->state().s); + if( h == 0 ) { + // if we can't connect the state info is from the past and could be confusing to show + bb.append("stateStr", "(not reachable/healthy)"); + } + else { + bb.append("stateStr", m->state().toString()); + } bb.append("uptime", (unsigned) (m->hbinfo().upSince ? (time(0)-m->hbinfo().upSince) : 0)); + bb.appendTimestamp("optime", m->hbinfo().opTime.asDate()); + bb.appendDate("optimeDate", m->hbinfo().opTime.getSecs() * 1000LL); bb.appendTimeT("lastHeartbeat", m->hbinfo().lastHeartbeat); string s = m->lhb(); if( !s.empty() ) @@ -380,10 +401,12 @@ namespace mongo { b.appendTimeT("date", time(0)); b.append("myState", box.getState().s); b.append("members", v); + if( replSetBlind ) + b.append("blind",true); // to avoid confusion if set...normally never set except for testing. 
} - static struct Test : public UnitTest { - void run() { + static struct Test : public UnitTest { + void run() { HealthOptions a,b; assert( a == b ); assert( a.isDefault() ); diff --git a/db/repl/health.h b/db/repl/health.h index 645a3b5..a32db00 100644 --- a/db/repl/health.h +++ b/db/repl/health.h @@ -23,8 +23,8 @@ namespace mongo { /* throws */ bool requestHeartbeat(string setname, string fromHost, string memberFullName, BSONObj& result, int myConfigVersion, int& theirConfigVersion, bool checkEmpty = false); - struct HealthOptions { - HealthOptions() { + struct HealthOptions { + HealthOptions() { heartbeatSleepMillis = 2000; heartbeatTimeoutMillis = 10000; heartbeatConnRetries = 2; @@ -42,8 +42,8 @@ namespace mongo { uassert(13113, "bad replset heartbeat option", heartbeatTimeoutMillis >= 10); } - bool operator==(const HealthOptions& r) const { - return heartbeatSleepMillis==r.heartbeatSleepMillis && heartbeatTimeoutMillis==r.heartbeatTimeoutMillis && heartbeatConnRetries==heartbeatConnRetries; + bool operator==(const HealthOptions& r) const { + return heartbeatSleepMillis==r.heartbeatSleepMillis && heartbeatTimeoutMillis==r.heartbeatTimeoutMillis && heartbeatConnRetries==heartbeatConnRetries; } }; diff --git a/db/repl/heartbeat.cpp b/db/repl/heartbeat.cpp index b39fad7..3972466 100644 --- a/db/repl/heartbeat.cpp +++ b/db/repl/heartbeat.cpp @@ -31,7 +31,7 @@ #include "../../util/unittest.h" #include "../instance.h" -namespace mongo { +namespace mongo { using namespace bson; @@ -42,7 +42,7 @@ namespace mongo { long long HeartbeatInfo::timeDown() const { if( up() ) return 0; - if( downSince == 0 ) + if( downSince == 0 ) return 0; // still waiting on first heartbeat return jsTime() - downSince; } @@ -53,10 +53,10 @@ namespace mongo { virtual bool adminOnly() const { return false; } CmdReplSetHeartbeat() : ReplSetCommand("replSetHeartbeat") { } virtual bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) { - if( replSetBlind ) + if( replSetBlind ) return false; - /* we don't call ReplSetCommand::check() here because heartbeat + /* we don't call ReplSetCommand::check() here because heartbeat checks many things that are pre-initialization. */ if( !replSet ) { errmsg = "not running with --replSet"; @@ -65,12 +65,12 @@ namespace mongo { /* we want to keep heartbeat connections open when relinquishing primary. tag them here. 
*/ { - MessagingPort *mp = cc()._mp; - if( mp ) + MessagingPort *mp = cc().port(); + if( mp ) mp->tag |= 1; } - if( cmdObj["pv"].Int() != 1 ) { + if( cmdObj["pv"].Int() != 1 ) { errmsg = "incompatible replset protocol version"; return false; } @@ -86,7 +86,7 @@ namespace mongo { } result.append("rs", true); - if( cmdObj["checkEmpty"].trueValue() ) { + if( cmdObj["checkEmpty"].trueValue() ) { result.append("hasData", replHasDatabases()); } if( theReplSet == 0 ) { @@ -98,7 +98,7 @@ namespace mongo { return false; } - if( theReplSet->name() != cmdObj.getStringField("replSetHeartbeat") ) { + if( theReplSet->name() != cmdObj.getStringField("replSetHeartbeat") ) { errmsg = "repl set names do not match (2)"; result.append("mismatch", true); return false; @@ -118,8 +118,8 @@ namespace mongo { } cmdReplSetHeartbeat; /* throws dbexception */ - bool requestHeartbeat(string setName, string from, string memberFullName, BSONObj& result, int myCfgVersion, int& theirCfgVersion, bool checkEmpty) { - if( replSetBlind ) { + bool requestHeartbeat(string setName, string from, string memberFullName, BSONObj& result, int myCfgVersion, int& theirCfgVersion, bool checkEmpty) { + if( replSetBlind ) { //sleepmillis( rand() ); return false; } @@ -144,8 +144,8 @@ namespace mongo { public: ReplSetHealthPollTask(const HostAndPort& hh, const HeartbeatInfo& mm) : h(hh), m(mm) { } - string name() { return "ReplSetHealthPollTask"; } - void doWork() { + string name() const { return "ReplSetHealthPollTask"; } + void doWork() { if ( !theReplSet ) { log(2) << "theReplSet not initialized yet, skipping health poll this round" << rsLog; return; @@ -153,7 +153,7 @@ namespace mongo { HeartbeatInfo mem = m; HeartbeatInfo old = mem; - try { + try { BSONObj info; int theirConfigVersion = -10000; @@ -163,15 +163,17 @@ namespace mongo { time_t after = mem.lastHeartbeat = time(0); // we set this on any response - we don't get this far if couldn't connect because exception is thrown - try { - mem.skew = 0; - long long t = info["time"].Long(); - if( t > after ) + if ( info["time"].isNumber() ) { + long long t = info["time"].numberLong(); + if( t > after ) mem.skew = (int) (t - after); - else if( t < before ) + else if( t < before ) mem.skew = (int) (t - before); // negative } - catch(...) { + else { + // it won't be there if remote hasn't initialized yet + if( info.hasElement("time") ) + warning() << "heatbeat.time isn't a number: " << info << endl; mem.skew = INT_MIN; } @@ -182,7 +184,7 @@ namespace mongo { } if( ok ) { if( mem.upSince == 0 ) { - log() << "replSet info " << h.toString() << " is now up" << rsLog; + log() << "replSet info " << h.toString() << " is up" << rsLog; mem.upSince = mem.lastHeartbeat; } mem.health = 1.0; @@ -193,17 +195,20 @@ namespace mongo { be cfg = info["config"]; if( cfg.ok() ) { // received a new config - boost::function<void()> f = + boost::function<void()> f = boost::bind(&Manager::msgReceivedNewConfig, theReplSet->mgr, cfg.Obj().copy()); theReplSet->mgr->send(f); } } - else { + else { down(mem, info.getStringField("errmsg")); } } - catch(...) { - down(mem, "connect/transport error"); + catch(DBException& e) { + down(mem, e.what()); + } + catch(...) 
{ + down(mem, "something unusual went wrong"); } m = mem; @@ -212,9 +217,9 @@ namespace mongo { static time_t last = 0; time_t now = time(0); bool changed = mem.changed(old); - if( changed ) { - if( old.hbstate != mem.hbstate ) - log() << "replSet " << h.toString() << ' ' << mem.hbstate.toString() << rsLog; + if( changed ) { + if( old.hbstate != mem.hbstate ) + log() << "replSet member " << h.toString() << ' ' << mem.hbstate.toString() << rsLog; } if( changed || now-last>4 ) { last = now; @@ -228,18 +233,18 @@ namespace mongo { if( mem.upSince || mem.downSince == 0 ) { mem.upSince = 0; mem.downSince = jsTime(); - log() << "replSet info " << h.toString() << " is now down (or slow to respond)" << rsLog; + log() << "replSet info " << h.toString() << " is down (or slow to respond): " << msg << rsLog; } mem.lastHeartbeatMsg = msg; } }; - void ReplSetImpl::endOldHealthTasks() { + void ReplSetImpl::endOldHealthTasks() { unsigned sz = healthTasks.size(); for( set<ReplSetHealthPollTask*>::iterator i = healthTasks.begin(); i != healthTasks.end(); i++ ) (*i)->halt(); healthTasks.clear(); - if( sz ) + if( sz ) DEV log() << "replSet debug: cleared old tasks " << sz << endl; } @@ -251,8 +256,8 @@ namespace mongo { void startSyncThread(); - /** called during repl set startup. caller expects it to return fairly quickly. - note ReplSet object is only created once we get a config - so this won't run + /** called during repl set startup. caller expects it to return fairly quickly. + note ReplSet object is only created once we get a config - so this won't run until the initiation. */ void ReplSetImpl::startThreads() { diff --git a/db/repl/manager.cpp b/db/repl/manager.cpp index 862ac46..ed39c31 100644 --- a/db/repl/manager.cpp +++ b/db/repl/manager.cpp @@ -1,4 +1,4 @@ -/* @file manager.cpp +/* @file manager.cpp */ /** @@ -23,20 +23,20 @@ namespace mongo { - enum { + enum { NOPRIMARY = -2, SELFPRIMARY = -1 }; /* check members OTHER THAN US to see if they think they are primary */ - const Member * Manager::findOtherPrimary(bool& two) { + const Member * Manager::findOtherPrimary(bool& two) { two = false; Member *m = rs->head(); Member *p = 0; while( m ) { DEV assert( m != rs->_self ); if( m->state().primary() && m->hbinfo().up() ) { - if( p ) { + if( p ) { two = true; return 0; } @@ -44,33 +44,36 @@ namespace mongo { } m = m->next(); } - if( p ) + if( p ) noteARemoteIsPrimary(p); return p; } - Manager::Manager(ReplSetImpl *_rs) : - task::Server("rs Manager"), rs(_rs), busyWithElectSelf(false), _primary(NOPRIMARY) - { + Manager::Manager(ReplSetImpl *_rs) : + task::Server("rs Manager"), rs(_rs), busyWithElectSelf(false), _primary(NOPRIMARY) { } - - Manager::~Manager() { - log() << "ERROR: ~Manager should never be called" << rsLog; + + Manager::~Manager() { + /* we don't destroy the replset object we sit in; however, the destructor could have thrown on init. + the log message below is just a reminder to come back one day and review this code more, and to + make it cleaner. 
+ */ + log() << "info: ~Manager called" << rsLog; rs->mgr = 0; - assert(false); } - void Manager::starting() { + void Manager::starting() { Client::initThread("rs Manager"); } - void Manager::noteARemoteIsPrimary(const Member *m) { + void Manager::noteARemoteIsPrimary(const Member *m) { if( rs->box.getPrimary() == m ) return; rs->_self->lhb() = ""; if( rs->iAmArbiterOnly() ) { rs->box.set(MemberState::RS_ARBITER, m); - } else { + } + else { rs->box.noteRemoteIsPrimary(m); } } @@ -87,9 +90,8 @@ namespace mongo { const Member *p = rs->box.getPrimary(); if( p && p != rs->_self ) { - if( !p->hbinfo().up() || - !p->hbinfo().hbstate.primary() ) - { + if( !p->hbinfo().up() || + !p->hbinfo().hbstate.primary() ) { p = 0; rs->box.setOtherPrimary(0); } @@ -101,36 +103,36 @@ namespace mongo { p2 = findOtherPrimary(two); if( two ) { /* two other nodes think they are primary (asynchronously polled) -- wait for things to settle down. */ - log() << "replSet warning DIAG two primaries (transiently)" << rsLog; + log() << "replSet info two primaries (transiently)" << rsLog; return; } } if( p2 ) { /* someone else thinks they are primary. */ - if( p == p2 ) { + if( p == p2 ) { // we thought the same; all set. return; } if( p == 0 ) { - noteARemoteIsPrimary(p2); + noteARemoteIsPrimary(p2); return; } // todo xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx if( p != rs->_self ) { // switch primary from oldremotep->newremotep2 - noteARemoteIsPrimary(p2); + noteARemoteIsPrimary(p2); return; } /* we thought we were primary, yet now someone else thinks they are. */ if( !rs->elect.aMajoritySeemsToBeUp() ) { /* we can't see a majority. so the other node is probably the right choice. */ - noteARemoteIsPrimary(p2); + noteARemoteIsPrimary(p2); return; } - /* ignore for now, keep thinking we are master. - this could just be timing (we poll every couple seconds) or could indicate - a problem? if it happens consistently for a duration of time we should + /* ignore for now, keep thinking we are master. + this could just be timing (we poll every couple seconds) or could indicate + a problem? if it happens consistently for a duration of time we should alert the sysadmin. */ return; @@ -138,17 +140,17 @@ namespace mongo { /* didn't find anyone who wants to be primary */ - if( p ) { + if( p ) { /* we are already primary */ - if( p != rs->_self ) { + if( p != rs->_self ) { rs->sethbmsg("error p != rs->self in checkNewState"); log() << "replSet " << p->fullName() << rsLog; log() << "replSet " << rs->_self->fullName() << rsLog; return; } - if( rs->elect.shouldRelinquish() ) { + if( rs->elect.shouldRelinquish() ) { log() << "replSet can't see a majority of the set, relinquishing primary" << rsLog; rs->relinquish(); } @@ -162,7 +164,7 @@ namespace mongo { /* TODO : CHECK PRIORITY HERE. can't be elected if priority zero. */ /* no one seems to be primary. shall we try to elect ourself? */ - if( !rs->elect.aMajoritySeemsToBeUp() ) { + if( !rs->elect.aMajoritySeemsToBeUp() ) { static time_t last; static int n; int ll = 0; @@ -175,15 +177,15 @@ namespace mongo { busyWithElectSelf = true; // don't try to do further elections & such while we are already working on one. } - try { - rs->elect.electSelf(); + try { + rs->elect.electSelf(); } catch(RetryAfterSleepException&) { /* we want to process new inbounds before trying this again. so we just put a checkNewstate in the queue for eval later. */ requeue(); } - catch(...) { - log() << "replSet error unexpected assertion in rs manager" << rsLog; + catch(...) 
{ + log() << "replSet error unexpected assertion in rs manager" << rsLog; } busyWithElectSelf = false; } diff --git a/db/repl/multicmd.h b/db/repl/multicmd.h index 9eb9a17..df7c4e5 100644 --- a/db/repl/multicmd.h +++ b/db/repl/multicmd.h @@ -21,7 +21,7 @@ #include "../../util/background.h" #include "connections.h" -namespace mongo { +namespace mongo { struct Target { Target(string hostport) : toHost(hostport), ok(false) { } @@ -33,38 +33,37 @@ namespace mongo { /* -- implementation ------------- */ - class _MultiCommandJob : public BackgroundJob { + class _MultiCommandJob : public BackgroundJob { public: BSONObj& cmd; Target& d; _MultiCommandJob(BSONObj& _cmd, Target& _d) : cmd(_cmd), d(_d) { } + private: - string name() { return "MultiCommandJob"; } + string name() const { return "MultiCommandJob"; } void run() { - try { + try { ScopedConn c(d.toHost); d.ok = c.runCommand("admin", cmd, d.result); } - catch(DBException&) { + catch(DBException&) { DEV log() << "dev caught dbexception on multiCommand " << d.toHost << rsLog; } } }; - inline void multiCommand(BSONObj cmd, list<Target>& L) { - typedef shared_ptr<_MultiCommandJob> P; - list<P> jobs; - list<BackgroundJob *> _jobs; + inline void multiCommand(BSONObj cmd, list<Target>& L) { + list<BackgroundJob *> jobs; - for( list<Target>::iterator i = L.begin(); i != L.end(); i++ ) { + for( list<Target>::iterator i = L.begin(); i != L.end(); i++ ) { Target& d = *i; _MultiCommandJob *j = new _MultiCommandJob(cmd, d); - jobs.push_back(P(j)); - _jobs.push_back(j); + j->go(); + jobs.push_back(j); } - BackgroundJob::go(_jobs); - BackgroundJob::wait(_jobs,5); + for( list<BackgroundJob*>::iterator i = jobs.begin(); i != jobs.end(); i++ ) { + (*i)->wait(); + } } - } diff --git a/db/repl/replset_commands.cpp b/db/repl/replset_commands.cpp index 328b0ab..dc8567a 100644 --- a/db/repl/replset_commands.cpp +++ b/db/repl/replset_commands.cpp @@ -24,7 +24,9 @@ #include "../../util/mongoutils/html.h" #include "../../client/dbclient.h" -namespace mongo { +using namespace bson; + +namespace mongo { void checkMembersUpForConfigChange(const ReplSetConfig& cfg, bool initial); @@ -50,7 +52,7 @@ namespace mongo { } // may not need this, but if removed check all tests still work: - if( !check(errmsg, result) ) + if( !check(errmsg, result) ) return false; if( cmdObj.hasElement("blind") ) { @@ -61,6 +63,7 @@ namespace mongo { } } cmdReplSetTest; + /** get rollback id */ class CmdReplSetGetRBID : public ReplSetCommand { public: /* todo: ideally this should only change on rollbacks NOT on mongod restarts also. fix... */ @@ -68,26 +71,28 @@ namespace mongo { virtual void help( stringstream &help ) const { help << "internal"; } - CmdReplSetGetRBID() : ReplSetCommand("replSetGetRBID") { + CmdReplSetGetRBID() : ReplSetCommand("replSetGetRBID") { rbid = (int) curTimeMillis(); } virtual bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) { - if( !check(errmsg, result) ) + if( !check(errmsg, result) ) return false; result.append("rbid",rbid); return true; } } cmdReplSetRBID; - using namespace bson; - void incRBID() { + /** we increment the rollback id on every rollback event. */ + void incRBID() { cmdReplSetRBID.rbid++; } - int getRBID(DBClientConnection *c) { + + /** helper to get rollback id from another server. 
*/ + int getRBID(DBClientConnection *c) { bo info; c->simpleCommand("admin", &info, "replSetGetRBID"); return info["rbid"].numberInt(); - } + } class CmdReplSetGetStatus : public ReplSetCommand { public: @@ -98,7 +103,10 @@ namespace mongo { } CmdReplSetGetStatus() : ReplSetCommand("replSetGetStatus", true) { } virtual bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) { - if( !check(errmsg, result) ) + if ( cmdObj["forShell"].trueValue() ) + lastError.disableForCommand(); + + if( !check(errmsg, result) ) return false; theReplSet->summarizeStatus(result); return true; @@ -115,7 +123,7 @@ namespace mongo { } CmdReplSetReconfig() : ReplSetCommand("replSetReconfig"), mutex("rsreconfig") { } virtual bool run(const string& a, BSONObj& b, string& errmsg, BSONObjBuilder& c, bool d) { - try { + try { rwlock_try_write lk(mutex); return _run(a,b,errmsg,c,d); } @@ -125,16 +133,16 @@ namespace mongo { } private: bool _run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) { - if( !check(errmsg, result) ) + if( !check(errmsg, result) ) return false; - if( !theReplSet->box.getState().primary() ) { + if( !theReplSet->box.getState().primary() ) { errmsg = "replSetReconfig command must be sent to the current replica set primary."; return false; } { - // just make sure we can get a write lock before doing anything else. we'll reacquire one - // later. of course it could be stuck then, but this check lowers the risk if weird things + // just make sure we can get a write lock before doing anything else. we'll reacquire one + // later. of course it could be stuck then, but this check lowers the risk if weird things // are up - we probably don't want a change to apply 30 minutes after the initial attempt. time_t t = time(0); writelock lk(""); @@ -159,7 +167,7 @@ namespace mongo { log() << "replSet replSetReconfig config object parses ok, " << newConfig.members.size() << " members specified" << rsLog; - if( !ReplSetConfig::legalChange(theReplSet->getConfig(), newConfig, errmsg) ) { + if( !ReplSetConfig::legalChange(theReplSet->getConfig(), newConfig, errmsg) ) { return false; } @@ -170,7 +178,7 @@ namespace mongo { theReplSet->haveNewConfig(newConfig, true); ReplSet::startupStatusMsg = "replSetReconfig'd"; } - catch( DBException& e ) { + catch( DBException& e ) { log() << "replSet replSetReconfig exception: " << e.what() << rsLog; throw; } @@ -182,8 +190,11 @@ namespace mongo { class CmdReplSetFreeze : public ReplSetCommand { public: virtual void help( stringstream &help ) const { - help << "Enable / disable failover for the set - locks current primary as primary even if issues occur.\nFor use during system maintenance.\n"; - help << "{ replSetFreeze : <bool> }"; + help << "{ replSetFreeze : <seconds> }"; + help << "'freeze' state of member to the extent we can do that. 
What this really means is that\n"; + help << "this node will not attempt to become primary until the time period specified expires.\n"; + help << "You can call again with {replSetFreeze:0} to unfreeze sooner.\n"; + help << "A process restart unfreezes the member also.\n"; help << "\nhttp://www.mongodb.org/display/DOCS/Replica+Set+Commands"; } @@ -191,15 +202,22 @@ namespace mongo { virtual bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) { if( !check(errmsg, result) ) return false; - errmsg = "not yet implemented"; /*TODO*/ - return false; + int secs = (int) cmdObj.firstElement().numberInt(); + if( theReplSet->freeze(secs) ) { + if( secs == 0 ) + result.append("info","unfreezing"); + } + if( secs == 1 ) + result.append("warning", "you really want to freeze for only 1 second?"); + return true; } } cmdReplSetFreeze; class CmdReplSetStepDown: public ReplSetCommand { public: virtual void help( stringstream &help ) const { - help << "Step down as primary. Will not try to reelect self or 1 minute.\n"; + help << "{ replSetStepDown : <seconds> }\n"; + help << "Step down as primary. Will not try to reelect self for the specified time period (1 minute if no numeric secs value specified).\n"; help << "(If another member with same priority takes over in the meantime, it will stay primary.)\n"; help << "http://www.mongodb.org/display/DOCS/Replica+Set+Commands"; } @@ -212,7 +230,10 @@ namespace mongo { errmsg = "not primary so can't step down"; return false; } - return theReplSet->stepDown(); + int secs = (int) cmdObj.firstElement().numberInt(); + if( secs == 0 ) + secs = 60; + return theReplSet->stepDown(secs); } } cmdReplSetStepDown; @@ -222,45 +243,46 @@ namespace mongo { class ReplSetHandler : public DbWebHandler { public: - ReplSetHandler() : DbWebHandler( "_replSet" , 1 , true ){} + ReplSetHandler() : DbWebHandler( "_replSet" , 1 , true ) {} virtual bool handles( const string& url ) const { return startsWith( url , "/_replSet" ); } - virtual void handle( const char *rq, string url, + virtual void handle( const char *rq, string url, BSONObj params, string& responseMsg, int& responseCode, - vector<string>& headers, const SockAddr &from ){ - - string s = str::after(url, "/_replSetOplog?"); - if( !s.empty() ) - responseMsg = _replSetOplog(s); + vector<string>& headers, const SockAddr &from ) { + + if( url == "/_replSetOplog" ) { + responseMsg = _replSetOplog(params); + } else responseMsg = _replSet(); responseCode = 200; } + string _replSetOplog(bo parms) { + int _id = (int) str::toUnsigned( parms["_id"].String() ); - string _replSetOplog(string parms) { stringstream s; string t = "Replication oplog"; s << start(t); s << p(t); - if( theReplSet == 0 ) { - if( cmdLine._replSet.empty() ) + if( theReplSet == 0 ) { + if( cmdLine._replSet.empty() ) s << p("Not using --replSet"); else { - s << p("Still starting up, or else set is not yet " + a("http://www.mongodb.org/display/DOCS/Replica+Set+Configuration#InitialSetup", "", "initiated") + s << p("Still starting up, or else set is not yet " + a("http://www.mongodb.org/display/DOCS/Replica+Set+Configuration#InitialSetup", "", "initiated") + ".<br>" + ReplSet::startupStatusMsg); } } else { try { - theReplSet->getOplogDiagsAsHtml(stringToNum(parms.c_str()), s); + theReplSet->getOplogDiagsAsHtml(_id, s); } - catch(std::exception& e) { - s << "error querying oplog: " << e.what() << '\n'; + catch(std::exception& e) { + s << "error querying oplog: " << e.what() << '\n'; } } @@ -269,20 +291,20 @@ namespace mongo { } /* 
/_replSet show replica set status in html format */ - string _replSet() { + string _replSet() { stringstream s; s << start("Replica Set Status " + prettyHostName()); - s << p( a("/", "back", "Home") + " | " + + s << p( a("/", "back", "Home") + " | " + a("/local/system.replset/?html=1", "", "View Replset Config") + " | " + - a("/replSetGetStatus?text", "", "replSetGetStatus") + " | " + + a("/replSetGetStatus?text=1", "", "replSetGetStatus") + " | " + a("http://www.mongodb.org/display/DOCS/Replica+Sets", "", "Docs") ); - if( theReplSet == 0 ) { - if( cmdLine._replSet.empty() ) + if( theReplSet == 0 ) { + if( cmdLine._replSet.empty() ) s << p("Not using --replSet"); else { - s << p("Still starting up, or else set is not yet " + a("http://www.mongodb.org/display/DOCS/Replica+Set+Configuration#InitialSetup", "", "initiated") + s << p("Still starting up, or else set is not yet " + a("http://www.mongodb.org/display/DOCS/Replica+Set+Configuration#InitialSetup", "", "initiated") + ".<br>" + ReplSet::startupStatusMsg); } } diff --git a/db/repl/rs.cpp b/db/repl/rs.cpp index 1c0444a..90ed9f4 100644 --- a/db/repl/rs.cpp +++ b/db/repl/rs.cpp @@ -20,9 +20,12 @@ #include "../client.h" #include "../../client/dbclient.h" #include "../dbhelpers.h" +#include "../../s/d_logic.h" #include "rs.h" +#include "connections.h" +#include "../repl.h" -namespace mongo { +namespace mongo { using namespace bson; @@ -30,18 +33,18 @@ namespace mongo { ReplSet *theReplSet = 0; extern string *discoveredSeed; - void ReplSetImpl::sethbmsg(string s, int logLevel) { + void ReplSetImpl::sethbmsg(string s, int logLevel) { static time_t lastLogged; _hbmsgTime = time(0); - if( s == _hbmsg ) { + if( s == _hbmsg ) { // unchanged if( _hbmsgTime - lastLogged < 60 ) return; } unsigned sz = s.size(); - if( sz >= 256 ) + if( sz >= 256 ) memcpy(_hbmsg, s.c_str(), 255); else { _hbmsg[sz] = 0; @@ -53,7 +56,7 @@ namespace mongo { } } - void ReplSetImpl::assumePrimary() { + void ReplSetImpl::assumePrimary() { assert( iAmPotentiallyHot() ); writelock lk("admin."); // so we are synchronized with _logOp() box.setSelfPrimary(_self); @@ -62,17 +65,26 @@ namespace mongo { void ReplSetImpl::changeState(MemberState s) { box.change(s, _self); } - void ReplSetImpl::relinquish() { + const bool closeOnRelinquish = true; + + void ReplSetImpl::relinquish() { if( box.getState().primary() ) { log() << "replSet relinquishing primary state" << rsLog; - changeState(MemberState::RS_RECOVERING); - - /* close sockets that were talking to us */ - /*log() << "replSet closing sockets after reqlinquishing primary" << rsLog; - MessagingPort::closeAllSockets(1);*/ + changeState(MemberState::RS_SECONDARY); + + if( closeOnRelinquish ) { + /* close sockets that were talking to us so they don't blithly send many writes that will fail + with "not master" (of course client could check result code, but in case they are not) + */ + log() << "replSet closing client sockets after reqlinquishing primary" << rsLog; + MessagingPort::closeAllSockets(1); + } + + // now that all connections were closed, strip this mongod from all sharding details + // if and when it gets promoted to a primary again, only then it should reload the sharding state + // the rationale here is that this mongod won't bring stale state when it regains primaryhood + shardingState.resetShardingState(); - // todo: > - //changeState(MemberState::RS_SECONDARY); } else if( box.getState().startup2() ) { // ? 
add comment @@ -81,26 +93,48 @@ namespace mongo { } /* look freshly for who is primary - includes relinquishing ourself. */ - void ReplSetImpl::forgetPrimary() { - if( box.getState().primary() ) + void ReplSetImpl::forgetPrimary() { + if( box.getState().primary() ) relinquish(); else { box.setOtherPrimary(0); } } - bool ReplSetImpl::_stepDown() { + // for the replSetStepDown command + bool ReplSetImpl::_stepDown(int secs) { lock lk(this); - if( box.getState().primary() ) { - changeState(MemberState::RS_RECOVERING); - elect.steppedDown = time(0) + 60; - log() << "replSet info stepped down as primary" << rsLog; + if( box.getState().primary() ) { + elect.steppedDown = time(0) + secs; + log() << "replSet info stepping down as primary secs=" << secs << rsLog; + relinquish(); return true; } return false; } - void ReplSetImpl::msgUpdateHBInfo(HeartbeatInfo h) { + bool ReplSetImpl::_freeze(int secs) { + lock lk(this); + /* note if we are primary we remain primary but won't try to elect ourself again until + this time period expires. + */ + if( secs == 0 ) { + elect.steppedDown = 0; + log() << "replSet info 'unfreezing'" << rsLog; + } + else { + if( !box.getState().primary() ) { + elect.steppedDown = time(0) + secs; + log() << "replSet info 'freezing' for " << secs << " seconds" << rsLog; + } + else { + log() << "replSet info received freeze command but we are primary" << rsLog; + } + } + return true; + } + + void ReplSetImpl::msgUpdateHBInfo(HeartbeatInfo h) { for( Member *m = _members.head(); m; m=m->next() ) { if( m->id() == h.id() ) { m->_hbinfo = h; @@ -109,7 +143,7 @@ namespace mongo { } } - list<HostAndPort> ReplSetImpl::memberHostnames() const { + list<HostAndPort> ReplSetImpl::memberHostnames() const { list<HostAndPort> L; L.push_back(_self->h()); for( Member *m = _members.head(); m; m = m->next() ) @@ -118,6 +152,7 @@ namespace mongo { } void ReplSetImpl::_fillIsMasterHost(const Member *m, vector<string>& hosts, vector<string>& passives, vector<string>& arbiters) { + assert( m ); if( m->config().hidden ) return; @@ -126,8 +161,9 @@ namespace mongo { } else if( !m->config().arbiterOnly ) { if( m->config().slaveDelay ) { - /* hmmm - we don't list these as they are stale. */ - } else { + /* hmmm - we don't list these as they are stale. 
*/ + } + else { passives.push_back(m->h().toString()); } } @@ -147,6 +183,7 @@ namespace mongo { _fillIsMasterHost(_self, hosts, passives, arbiters); for( Member *m = _members.head(); m; m = m->next() ) { + assert( m ); _fillIsMasterHost(m, hosts, passives, arbiters); } @@ -161,23 +198,27 @@ namespace mongo { } } - if( !isp ) { + if( !isp ) { const Member *m = sp.primary; if( m ) b.append("primary", m->h().toString()); } if( myConfig().arbiterOnly ) b.append("arbiterOnly", true); + if( myConfig().priority == 0 ) + b.append("passive", true); if( myConfig().slaveDelay ) b.append("slaveDelay", myConfig().slaveDelay); if( myConfig().hidden ) b.append("hidden", true); + if( !myConfig().buildIndexes ) + b.append("buildIndexes", false); } /** @param cfgString <setname>/<seedhost1>,<seedhost2> */ - void parseReplsetCmdLine(string cfgString, string& setname, vector<HostAndPort>& seeds, set<HostAndPort>& seedSet ) { - const char *p = cfgString.c_str(); + void parseReplsetCmdLine(string cfgString, string& setname, vector<HostAndPort>& seeds, set<HostAndPort>& seedSet ) { + const char *p = cfgString.c_str(); const char *slash = strchr(p, '/'); if( slash ) setname = string(p, slash-p); @@ -207,7 +248,8 @@ namespace mongo { //uassert(13101, "can't use localhost in replset host list", !m.isLocalHost()); if( m.isSelf() ) { log(1) << "replSet ignoring seed " << m.toString() << " (=self)" << rsLog; - } else + } + else seeds.push_back(m); if( *comma == 0 ) break; @@ -216,10 +258,9 @@ namespace mongo { } } - ReplSetImpl::ReplSetImpl(ReplSetCmdline& replSetCmdline) : elect(this), - _self(0), - mgr( new Manager(this) ) - { + ReplSetImpl::ReplSetImpl(ReplSetCmdline& replSetCmdline) : elect(this), + _self(0), + mgr( new Manager(this) ) { _cfg = 0; memset(_hbmsg, 0, sizeof(_hbmsg)); *_hbmsg = '.'; // temp...just to see @@ -240,20 +281,21 @@ namespace mongo { } for( set<HostAndPort>::iterator i = replSetCmdline.seedSet.begin(); i != replSetCmdline.seedSet.end(); i++ ) { if( i->isSelf() ) { - if( sss == 1 ) + if( sss == 1 ) log(1) << "replSet warning self is listed in the seed list and there are no other seeds listed did you intend that?" << rsLog; - } else + } + else log() << "replSet warning command line seed " << i->toString() << " is not present in the current repl set config" << rsLog; } } void newReplUp(); - void ReplSetImpl::loadLastOpTimeWritten() { + void ReplSetImpl::loadLastOpTimeWritten() { //assert( lastOpTimeWritten.isNull() ); readlock lk(rsoplog); BSONObj o; - if( Helpers::getLast(rsoplog, o) ) { + if( Helpers::getLast(rsoplog, o) ) { lastH = o["h"].numberLong(); lastOpTimeWritten = o["ts"]._opTime(); uassert(13290, "bad replSet oplog entry?", !lastOpTimeWritten.isNull()); @@ -261,11 +303,11 @@ namespace mongo { } /* call after constructing to start - returns fairly quickly after launching its threads */ - void ReplSetImpl::_go() { - try { + void ReplSetImpl::_go() { + try { loadLastOpTimeWritten(); } - catch(std::exception& e) { + catch(std::exception& e) { log() << "replSet error fatal couldn't query the local " << rsoplog << " collection. Terminating mongod after 30 seconds." << rsLog; log() << e.what() << rsLog; sleepsecs(30); @@ -283,11 +325,17 @@ namespace mongo { extern BSONObj *getLastErrorDefault; + void ReplSetImpl::setSelfTo(Member *m) { + _self = m; + if( m ) _buildIndexes = m->config().buildIndexes; + else _buildIndexes = true; + } + /** @param reconf true if this is a reconfiguration and not an initial load of the configuration. 
@return true if ok; throws if config really bad; false if config doesn't include self */ bool ReplSetImpl::initFromConfig(ReplSetConfig& c, bool reconf) { - /* NOTE: haveNewConfig() writes the new config to disk before we get here. So + /* NOTE: haveNewConfig() writes the new config to disk before we get here. So we cannot error out at this point, except fatally. Check errors earlier. */ lock lk(this); @@ -302,25 +350,24 @@ namespace mongo { { unsigned nfound = 0; int me = 0; - for( vector<ReplSetConfig::MemberCfg>::iterator i = c.members.begin(); i != c.members.end(); i++ ) { + for( vector<ReplSetConfig::MemberCfg>::iterator i = c.members.begin(); i != c.members.end(); i++ ) { const ReplSetConfig::MemberCfg& m = *i; if( m.h.isSelf() ) { nfound++; me++; - if( !reconf || (_self && _self->id() == (unsigned) m._id) ) ; - else { + else { log() << "replSet " << _self->id() << ' ' << m._id << rsLog; assert(false); } } - else if( reconf ) { + else if( reconf ) { const Member *old = findById(m._id); - if( old ) { + if( old ) { nfound++; assert( (int) old->id() == m._id ); - if( old->config() == m ) { + if( old->config() == m ) { additive = false; } } @@ -328,16 +375,24 @@ namespace mongo { newOnes.push_back(&m); } } + + // change timeout settings, if necessary + ScopedConn conn(m.h.toString()); + conn.setTimeout(c.ho.heartbeatTimeoutMillis/1000.0); } if( me == 0 ) { + // initial startup with fastsync + if (!reconf && replSettings.fastsync) { + return false; + } // log() << "replSet config : " << _cfg->toString() << rsLog; - log() << "replSet error can't find self in the repl set configuration:" << rsLog; + log() << "replSet error self not present in the repl set configuration:" << rsLog; log() << c.toString() << rsLog; - assert(false); + uasserted(13497, "replSet error self not present in the configuration"); } uassert( 13302, "replSet error self appears twice in the repl set configuration", me<=1 ); - if( reconf && config().members.size() != nfound ) + if( reconf && config().members.size() != nfound ) additive = false; } @@ -347,14 +402,14 @@ namespace mongo { _name = _cfg->_id; assert( !_name.empty() ); - if( additive ) { + if( additive ) { log() << "replSet info : additive change to configuration" << rsLog; for( list<const ReplSetConfig::MemberCfg*>::const_iterator i = newOnes.begin(); i != newOnes.end(); i++ ) { const ReplSetConfig::MemberCfg* m = *i; Member *mi = new Member(m->h, m->_id, m, false); - /** we will indicate that new members are up() initially so that we don't relinquish our - primary state because we can't (transiently) see a majority. they should be up as we + /** we will indicate that new members are up() initially so that we don't relinquish our + primary state because we can't (transiently) see a majority. they should be up as we check that new members are up before getting here on reconfig anyway. */ mi->get_hbinfo().health = 0.1; @@ -373,20 +428,30 @@ namespace mongo { int oldPrimaryId = -1; { const Member *p = box.getPrimary(); - if( p ) + if( p ) oldPrimaryId = p->id(); } forgetPrimary(); - _self = 0; - for( vector<ReplSetConfig::MemberCfg>::iterator i = _cfg->members.begin(); i != _cfg->members.end(); i++ ) { + + bool iWasArbiterOnly = _self ? 
iAmArbiterOnly() : false; + setSelfTo(0); + for( vector<ReplSetConfig::MemberCfg>::iterator i = _cfg->members.begin(); i != _cfg->members.end(); i++ ) { const ReplSetConfig::MemberCfg& m = *i; Member *mi; if( m.h.isSelf() ) { assert( _self == 0 ); - mi = _self = new Member(m.h, m._id, &m, true); + mi = new Member(m.h, m._id, &m, true); + setSelfTo(mi); + + // if the arbiter status changed + if (iWasArbiterOnly ^ iAmArbiterOnly()) { + _changeArbiterState(); + } + if( (int)mi->id() == oldPrimaryId ) box.setSelfPrimary(mi); - } else { + } + else { mi = new Member(m.h, m._id, &m, false); _members.push(mi); startHealthTaskFor(mi); @@ -397,26 +462,57 @@ namespace mongo { return true; } + void startSyncThread(); + + void ReplSetImpl::_changeArbiterState() { + if (iAmArbiterOnly()) { + changeState(MemberState::RS_ARBITER); + + // if there is an oplog, free it + // not sure if this is necessary, maybe just leave the oplog and let + // the user delete it if they want the space? + writelock lk(rsoplog); + Client::Context c(rsoplog); + NamespaceDetails *d = nsdetails(rsoplog); + if (d) { + string errmsg; + bob res; + dropCollection(rsoplog, errmsg, res); + + // clear last op time to force initial sync (if the arbiter + // becomes a "normal" server again) + lastOpTimeWritten = OpTime(); + } + } + else { + changeState(MemberState::RS_RECOVERING); + + // oplog will be allocated when sync begins + /* TODO : could this cause two sync threads to exist (race condition)? */ + boost::thread t(startSyncThread); + } + } + // Our own config must be the first one. - bool ReplSetImpl::_loadConfigFinish(vector<ReplSetConfig>& cfgs) { + bool ReplSetImpl::_loadConfigFinish(vector<ReplSetConfig>& cfgs) { int v = -1; ReplSetConfig *highest = 0; int myVersion = -2000; int n = 0; - for( vector<ReplSetConfig>::iterator i = cfgs.begin(); i != cfgs.end(); i++ ) { + for( vector<ReplSetConfig>::iterator i = cfgs.begin(); i != cfgs.end(); i++ ) { ReplSetConfig& cfg = *i; if( ++n == 1 ) myVersion = cfg.version; - if( cfg.ok() && cfg.version > v ) { + if( cfg.ok() && cfg.version > v ) { highest = &cfg; v = cfg.version; } } assert( highest ); - if( !initFromConfig(*highest) ) + if( !initFromConfig(*highest) ) return false; - if( highest->version > myVersion && highest->version >= 0 ) { + if( highest->version > myVersion && highest->version >= 0 ) { log() << "replSet got config version " << highest->version << " from a remote, saving locally" << rsLog; writelock lk("admin."); highest->saveConfigLocally(BSONObj()); @@ -430,7 +526,7 @@ namespace mongo { startupStatusMsg = "loading " + rsConfigNs + " config (LOADINGCONFIG)"; try { vector<ReplSetConfig> configs; - try { + try { configs.push_back( ReplSetConfig(HostAndPort::me()) ); } catch(DBException& e) { @@ -438,26 +534,26 @@ namespace mongo { throw; } for( vector<HostAndPort>::const_iterator i = _seeds->begin(); i != _seeds->end(); i++ ) { - try { + try { configs.push_back( ReplSetConfig(*i) ); } - catch( DBException& e ) { + catch( DBException& e ) { log() << "replSet exception trying to load config from " << *i << " : " << e.toString() << rsLog; } } - if( discoveredSeed ) { + if( discoveredSeed ) { try { configs.push_back( ReplSetConfig(HostAndPort(*discoveredSeed)) ); } - catch( DBException& ) { + catch( DBException& ) { log(1) << "replSet exception trying to load config from discovered seed " << *discoveredSeed << rsLog; } } int nok = 0; int nempty = 0; - for( vector<ReplSetConfig>::iterator i = configs.begin(); i != configs.end(); i++ ) { + for( vector<ReplSetConfig>::iterator 
i = configs.begin(); i != configs.end(); i++ ) { if( i->ok() ) nok++; if( i->empty() ) @@ -469,7 +565,9 @@ namespace mongo { startupStatus = EMPTYCONFIG; startupStatusMsg = "can't get " + rsConfigNs + " config from self or any seed (EMPTYCONFIG)"; log() << "replSet can't get " << rsConfigNs << " config from self or any seed (EMPTYCONFIG)" << rsLog; - log(1) << "replSet have you ran replSetInitiate yet?" << rsLog; + static unsigned once; + if( ++once == 1 ) + log() << "replSet info you may need to run replSetInitiate -- rs.initiate() in the shell -- if that is not already done" << rsLog; if( _seeds->size() == 0 ) log(1) << "replSet info no seed hosts were specified on the --replSet command line" << rsLog; } @@ -483,13 +581,13 @@ namespace mongo { continue; } - if( !_loadConfigFinish(configs) ) { + if( !_loadConfigFinish(configs) ) { log() << "replSet info Couldn't load config yet. Sleeping 20sec and will try again." << rsLog; sleepsecs(20); continue; } } - catch(DBException& e) { + catch(DBException& e) { startupStatus = BADCONFIG; startupStatusMsg = "replSet error loading set config (BADCONFIG)"; log() << "replSet error loading configurations " << e.toString() << rsLog; @@ -504,30 +602,34 @@ namespace mongo { startupStatus = STARTED; } - void ReplSetImpl::_fatal() - { + void ReplSetImpl::_fatal() { //lock l(this); box.set(MemberState::RS_FATAL, 0); //sethbmsg("fatal error"); - log() << "replSet error fatal, stopping replication" << rsLog; + log() << "replSet error fatal, stopping replication" << rsLog; } - void ReplSet::haveNewConfig(ReplSetConfig& newConfig, bool addComment) { + void ReplSet::haveNewConfig(ReplSetConfig& newConfig, bool addComment) { lock l(this); // convention is to lock replset before taking the db rwlock writelock lk(""); bo comment; if( addComment ) comment = BSON( "msg" << "Reconfig set" << "version" << newConfig.version ); newConfig.saveConfigLocally(comment); - try { + try { initFromConfig(newConfig, true); log() << "replSet replSetReconfig new config saved locally" << rsLog; } - catch(DBException& e) { + catch(DBException& e) { + if( e.getCode() == 13497 /* removed from set */ ) { + cc().shutdown(); + dbexit( EXIT_CLEAN , "removed from replica set" ); // never returns + assert(0); + } log() << "replSet error unexpected exception in haveNewConfig() : " << e.toString() << rsLog; _fatal(); } - catch(...) { + catch(...) { log() << "replSet error unexpected exception in haveNewConfig()" << rsLog; _fatal(); } @@ -538,30 +640,33 @@ namespace mongo { ReplSetConfig c(o); if( c.version > rs->config().version ) theReplSet->haveNewConfig(c, false); - else { - log() << "replSet info msgReceivedNewConfig but version isn't higher " << - c.version << ' ' << rs->config().version << rsLog; + else { + log() << "replSet info msgReceivedNewConfig but version isn't higher " << + c.version << ' ' << rs->config().version << rsLog; } } - /* forked as a thread during startup - it can run quite a while looking for config. but once found, + /* forked as a thread during startup + it can run quite a while looking for config. but once found, a separate thread takes over as ReplSetImpl::Manager, and this thread terminates. 
*/ void startReplSets(ReplSetCmdline *replSetCmdline) { Client::initThread("startReplSets"); - try { + try { assert( theReplSet == 0 ); if( replSetCmdline == 0 ) { assert(!replSet); return; } + if( !noauth ) { + cc().getAuthenticationInfo()->authorize("local"); + } (theReplSet = new ReplSet(*replSetCmdline))->go(); } - catch(std::exception& e) { + catch(std::exception& e) { log() << "replSet caught exception in startReplSets thread: " << e.what() << rsLog; - if( theReplSet ) + if( theReplSet ) theReplSet->fatal(); } cc().shutdown(); @@ -569,10 +674,9 @@ namespace mongo { } -namespace boost { +namespace boost { - void assertion_failed(char const * expr, char const * function, char const * file, long line) - { + void assertion_failed(char const * expr, char const * function, char const * file, long line) { mongo::log() << "boost assertion failure " << expr << ' ' << function << ' ' << file << ' ' << line << endl; } diff --git a/db/repl/rs.h b/db/repl/rs.h index 6c4d9a8..1419ad6 100644 --- a/db/repl/rs.h +++ b/db/repl/rs.h @@ -43,6 +43,7 @@ namespace mongo { class Member : public List1<Member>::Base { public: Member(HostAndPort h, unsigned ord, const ReplSetConfig::MemberCfg *c, bool self); + string fullName() const { return h().toString(); } const ReplSetConfig::MemberCfg& config() const { return _config; } const HeartbeatInfo& hbinfo() const { return _hbinfo; } @@ -51,10 +52,12 @@ namespace mongo { MemberState state() const { return _hbinfo.hbstate; } const HostAndPort& h() const { return _h; } unsigned id() const { return _hbinfo.id(); } + bool potentiallyHot() const { return _config.potentiallyHot(); } // not arbiter, not priority 0 void summarizeMember(stringstream& s) const; - friend class ReplSetImpl; + private: + friend class ReplSetImpl; const ReplSetConfig::MemberCfg _config; const HostAndPort _h; HeartbeatInfo _hbinfo; @@ -65,8 +68,8 @@ namespace mongo { bool busyWithElectSelf; int _primary; - /** @param two - if true two primaries were seen. this can happen transiently, in addition to our - polling being only occasional. in this case null is returned, but the caller should + /** @param two - if true two primaries were seen. this can happen transiently, in addition to our + polling being only occasional. in this case null is returned, but the caller should not assume primary itself in that situation. */ const Member* findOtherPrimary(bool& two); @@ -75,7 +78,7 @@ namespace mongo { virtual void starting(); public: Manager(ReplSetImpl *rs); - ~Manager(); + virtual ~Manager(); void msgReceivedNewConfig(BSONObj); void msgCheckNewState(); }; @@ -84,7 +87,7 @@ namespace mongo { class Consensus { ReplSetImpl &rs; - struct LastYea { + struct LastYea { LastYea() : when(0), who(0xffffffff) { } time_t when; unsigned who; @@ -96,12 +99,12 @@ namespace mongo { bool weAreFreshest(bool& allUp, int& nTies); bool sleptLast; // slept last elect() pass public: - Consensus(ReplSetImpl *t) : rs(*t) { + Consensus(ReplSetImpl *t) : rs(*t) { sleptLast = false; steppedDown = 0; } - /* if we've stepped down, this is when we are allowed to try to elect ourself again. + /* if we've stepped down, this is when we are allowed to try to elect ourself again. todo: handle possible weirdnesses at clock skews etc. */ time_t steppedDown; @@ -115,40 +118,40 @@ namespace mongo { }; /** most operations on a ReplSet object should be done while locked. that logic implemented here. 
*/ - class RSBase : boost::noncopyable { + class RSBase : boost::noncopyable { public: const unsigned magic; void assertValid() { assert( magic == 0x12345677 ); } private: - mutex m; + mongo::mutex m; int _locked; ThreadLocalValue<bool> _lockedByMe; protected: RSBase() : magic(0x12345677), m("RSBase"), _locked(0) { } - ~RSBase() { + ~RSBase() { /* this can happen if we throw in the constructor; otherwise never happens. thus we log it as it is quite unusual. */ log() << "replSet ~RSBase called" << rsLog; } - class lock { + class lock { RSBase& rsbase; auto_ptr<scoped_lock> sl; public: - lock(RSBase* b) : rsbase(*b) { + lock(RSBase* b) : rsbase(*b) { if( rsbase._lockedByMe.get() ) return; // recursive is ok... sl.reset( new scoped_lock(rsbase.m) ); DEV assert(rsbase._locked == 0); - rsbase._locked++; + rsbase._locked++; rsbase._lockedByMe.set(true); } - ~lock() { + ~lock() { if( sl.get() ) { assert( rsbase._lockedByMe.get() ); DEV assert(rsbase._locked == 1); rsbase._lockedByMe.set(false); - rsbase._locked--; + rsbase._locked--; } } }; @@ -157,11 +160,11 @@ namespace mongo { /* for asserts */ bool locked() const { return _locked != 0; } - /* if true, is locked, and was locked by this thread. note if false, it could be in the lock or not for another + /* if true, is locked, and was locked by this thread. note if false, it could be in the lock or not for another just for asserts & such so we can make the contracts clear on who locks what when. we don't use these locks that frequently, so the little bit of overhead is fine. */ - bool lockedByMe() { return _lockedByMe.get(); } + bool lockedByMe() { return _lockedByMe.get(); } }; class ReplSetHealthPollTask; @@ -174,19 +177,19 @@ namespace mongo { MemberState state; const Member *primary; }; - const SP get() { + const SP get() { scoped_lock lk(m); return sp; } MemberState getState() const { return sp.state; } const Member* getPrimary() const { return sp.primary; } - void change(MemberState s, const Member *self) { + void change(MemberState s, const Member *self) { scoped_lock lk(m); - if( sp.state != s ) { + if( sp.state != s ) { log() << "replSet " << s.toString() << rsLog; } sp.state = s; - if( s.primary() ) { + if( s.primary() ) { sp.primary = self; } else { @@ -194,17 +197,17 @@ namespace mongo { sp.primary = 0; } } - void set(MemberState s, const Member *p) { + void set(MemberState s, const Member *p) { scoped_lock lk(m); sp.state = s; sp.primary = p; } void setSelfPrimary(const Member *self) { change(MemberState::RS_PRIMARY, self); } - void setOtherPrimary(const Member *mem) { + void setOtherPrimary(const Member *mem) { scoped_lock lk(m); assert( !sp.state.primary() ); sp.primary = mem; } - void noteRemoteIsPrimary(const Member *remote) { + void noteRemoteIsPrimary(const Member *remote) { scoped_lock lk(m); if( !sp.state.secondary() && !sp.state.fatal() ) sp.state = MemberState::RS_RECOVERING; @@ -212,10 +215,10 @@ namespace mongo { } StateBox() : m("StateBox") { } private: - mutex m; + mongo::mutex m; SP sp; }; - + void parseReplsetCmdLine(string cfgString, string& setname, vector<HostAndPort>& seeds, set<HostAndPort>& seedSet ); /** Parameter given to the --replSet command line option (parsed). 
@@ -230,15 +233,15 @@ namespace mongo { }; /* information about the entire repl set, such as the various servers in the set, and their state */ - /* note: We currently do not free mem when the set goes away - it is assumed the replset is a + /* note: We currently do not free mem when the set goes away - it is assumed the replset is a singleton and long lived. */ class ReplSetImpl : protected RSBase { public: /** info on our state if the replset isn't yet "up". for example, if we are pre-initiation. */ - enum StartupStatus { - PRESTART=0, LOADINGCONFIG=1, BADCONFIG=2, EMPTYCONFIG=3, - EMPTYUNREACHABLE=4, STARTED=5, SOON=6 + enum StartupStatus { + PRESTART=0, LOADINGCONFIG=1, BADCONFIG=2, EMPTYCONFIG=3, + EMPTYUNREACHABLE=4, STARTED=5, SOON=6 }; static StartupStatus startupStatus; static string startupStatusMsg; @@ -260,18 +263,21 @@ namespace mongo { void relinquish(); void forgetPrimary(); protected: - bool _stepDown(); + bool _stepDown(int secs); + bool _freeze(int secs); private: void assumePrimary(); void loadLastOpTimeWritten(); void changeState(MemberState s); + const Member* getMemberToSyncTo(); + void _changeArbiterState(); protected: // "heartbeat message" - // sent in requestHeartbeat respond in field "hbm" + // sent in requestHeartbeat respond in field "hbm" char _hbmsg[256]; // we change this unlocked, thus not an stl::string time_t _hbmsgTime; // when it was logged public: - void sethbmsg(string s, int logLevel = 0); + void sethbmsg(string s, int logLevel = 0); protected: bool initFromConfig(ReplSetConfig& c, bool reconf=false); // true if ok; throws if config really bad; false if config doesn't include self void _fillIsMaster(BSONObjBuilder&); @@ -281,7 +287,7 @@ namespace mongo { MemberState state() const { return box.getState(); } void _fatal(); void _getOplogDiagsAsHtml(unsigned server_id, stringstream& ss) const; - void _summarizeAsHtml(stringstream&) const; + void _summarizeAsHtml(stringstream&) const; void _summarizeStatus(BSONObjBuilder&) const; // for replSetGetStatus command /* throws exception if a problem initializing. */ @@ -295,7 +301,7 @@ namespace mongo { const vector<HostAndPort> *_seeds; ReplSetConfig *_cfg; - /** load our configuration from admin.replset. try seed machines too. + /** load our configuration from admin.replset. try seed machines too. @return true if ok; throws if config really bad; false if config doesn't include self */ bool _loadConfigFinish(vector<ReplSetConfig>& v); @@ -306,7 +312,9 @@ namespace mongo { bool iAmArbiterOnly() const { return myConfig().arbiterOnly; } bool iAmPotentiallyHot() const { return myConfig().potentiallyHot(); } protected: - Member *_self; + Member *_self; + bool _buildIndexes; // = _self->config().buildIndexes + void setSelfTo(Member *); // use this as it sets buildIndexes var private: List1<Member> _members; /* all members of the set EXCEPT self. 
*/ @@ -330,7 +338,7 @@ namespace mongo { private: /* pulling data from primary related - see rs_sync.cpp */ - bool initialSyncOplogApplication(string hn, const Member *primary, OpTime applyGTE, OpTime minValid); + bool initialSyncOplogApplication(const Member *primary, OpTime applyGTE, OpTime minValid); void _syncDoInitialSync(); void syncDoInitialSync(); void _syncThread(); @@ -340,21 +348,29 @@ namespace mongo { unsigned _syncRollback(OplogReader& r); void syncRollback(OplogReader& r); void syncFixUp(HowToFixUp& h, OplogReader& r); + bool _getOplogReader(OplogReader& r, string& hn); + bool _isStale(OplogReader& r, const string& hn); public: void syncThread(); }; - class ReplSet : public ReplSetImpl { + class ReplSet : public ReplSetImpl { public: ReplSet(ReplSetCmdline& replSetCmdline) : ReplSetImpl(replSetCmdline) { } - bool stepDown() { return _stepDown(); } + // for the replSetStepDown command + bool stepDown(int secs) { return _stepDown(secs); } - string selfFullName() { + // for the replSetFreeze command + bool freeze(int secs) { return _freeze(secs); } + + string selfFullName() { lock lk(this); return _self->fullName(); } + bool buildIndexes() const { return _buildIndexes; } + /* call after constructing to start - returns fairly quickly after la[unching its threads */ void go() { _go(); } @@ -369,7 +385,7 @@ namespace mongo { void summarizeStatus(BSONObjBuilder& b) const { _summarizeStatus(b); } void fillIsMaster(BSONObjBuilder& b) { _fillIsMaster(b); } - /* we have a new config (reconfig) - apply it. + /* we have a new config (reconfig) - apply it. @param comment write a no-op comment to the oplog about it. only makes sense if one is primary and initiating the reconf. */ void haveNewConfig(ReplSetConfig& c, bool comment); @@ -380,16 +396,16 @@ namespace mongo { bool lockedByMe() { return RSBase::lockedByMe(); } // heartbeat msg to send to others; descriptive diagnostic info - string hbmsg() const { + string hbmsg() const { if( time(0)-_hbmsgTime > 120 ) return ""; - return _hbmsg; + return _hbmsg; } }; - /** base class for repl set commands. checks basic things such as in rs mode before the command + /** base class for repl set commands. checks basic things such as in rs mode before the command does its real work */ - class ReplSetCommand : public Command { + class ReplSetCommand : public Command { protected: ReplSetCommand(const char * s, bool show=false) : Command(s, show) { } virtual bool slaveOk() const { return true; } @@ -398,14 +414,14 @@ namespace mongo { virtual LockType locktype() const { return NONE; } virtual void help( stringstream &help ) const { help << "internal"; } bool check(string& errmsg, BSONObjBuilder& result) { - if( !replSet ) { + if( !replSet ) { errmsg = "not running with --replSet"; return false; } if( theReplSet == 0 ) { result.append("startupStatus", ReplSet::startupStatus); errmsg = ReplSet::startupStatusMsg.empty() ? "replset unknown error 2" : ReplSet::startupStatusMsg; - if( ReplSet::startupStatus == 3 ) + if( ReplSet::startupStatus == 3 ) result.append("info", "run rs.initiate(...) 
if not yet done for the set"); return false; } @@ -415,9 +431,8 @@ namespace mongo { /** inlines ----------------- */ - inline Member::Member(HostAndPort h, unsigned ord, const ReplSetConfig::MemberCfg *c, bool self) : - _config(*c), _h(h), _hbinfo(ord) - { + inline Member::Member(HostAndPort h, unsigned ord, const ReplSetConfig::MemberCfg *c, bool self) : + _config(*c), _h(h), _hbinfo(ord) { if( self ) _hbinfo.health = 1.0; } diff --git a/db/repl/rs_config.cpp b/db/repl/rs_config.cpp index 371507d..5998f51 100644 --- a/db/repl/rs_config.cpp +++ b/db/repl/rs_config.cpp @@ -27,11 +27,11 @@ using namespace bson; -namespace mongo { +namespace mongo { void logOpInitiate(const bo&); - void assertOnlyHas(BSONObj o, const set<string>& fields) { + void assertOnlyHas(BSONObj o, const set<string>& fields) { BSONObj::iterator i(o); while( i.more() ) { BSONElement e = i.next(); @@ -41,7 +41,7 @@ namespace mongo { } } - list<HostAndPort> ReplSetConfig::otherMemberHostnames() const { + list<HostAndPort> ReplSetConfig::otherMemberHostnames() const { list<HostAndPort> L; for( vector<MemberCfg>::const_iterator i = members.begin(); i != members.end(); i++ ) { if( !i->h.isSelf() ) @@ -49,12 +49,12 @@ namespace mongo { } return L; } - + /* comment MUST only be set when initiating the set by the initiator */ - void ReplSetConfig::saveConfigLocally(bo comment) { + void ReplSetConfig::saveConfigLocally(bo comment) { checkRsConfig(); log() << "replSet info saving a newer config version to local.system.replset" << rsLog; - { + { writelock lk(""); Client::Context cx( rsConfigNs ); cx.db()->flushFiles(true); @@ -70,21 +70,21 @@ namespace mongo { } DEV log() << "replSet saveConfigLocally done" << rsLog; } - - /*static*/ - /*void ReplSetConfig::receivedNewConfig(BSONObj cfg) { + + /*static*/ + /*void ReplSetConfig::receivedNewConfig(BSONObj cfg) { if( theReplSet ) return; // this is for initial setup only, so far. 
todo ReplSetConfig c(cfg); writelock lk("admin."); - if( theReplSet ) + if( theReplSet ) return; c.saveConfigLocally(bo()); }*/ - bo ReplSetConfig::MemberCfg::asBson() const { + bo ReplSetConfig::MemberCfg::asBson() const { bob b; b << "_id" << _id; b.append("host", h.toString()); @@ -93,18 +93,28 @@ namespace mongo { if( arbiterOnly ) b << "arbiterOnly" << true; if( slaveDelay ) b << "slaveDelay" << slaveDelay; if( hidden ) b << "hidden" << hidden; + if( !buildIndexes ) b << "buildIndexes" << buildIndexes; + if( !tags.empty() ) { + BSONArrayBuilder a; + for( set<string>::const_iterator i = tags.begin(); i != tags.end(); i++ ) + a.append(*i); + b.appendArray("tags", a.done()); + } + if( !initialSync.isEmpty() ) { + b << "initialSync" << initialSync; + } return b.obj(); } - bo ReplSetConfig::asBson() const { + bo ReplSetConfig::asBson() const { bob b; b.append("_id", _id).append("version", version); if( !ho.isDefault() || !getLastErrorDefaults.isEmpty() ) { bob settings; if( !ho.isDefault() ) - settings << "heartbeatConnRetries " << ho.heartbeatConnRetries << - "heartbeatSleep" << ho.heartbeatSleepMillis / 1000 << - "heartbeatTimeout" << ho.heartbeatTimeoutMillis / 1000; + settings << "heartbeatConnRetries " << ho.heartbeatConnRetries << + "heartbeatSleep" << ho.heartbeatSleepMillis / 1000.0 << + "heartbeatTimeout" << ho.heartbeatTimeoutMillis / 1000.0; if( !getLastErrorDefaults.isEmpty() ) settings << "getLastErrorDefaults" << getLastErrorDefaults; b << "settings" << settings.obj(); @@ -122,7 +132,7 @@ namespace mongo { uassert(13126, "bad Member config", expr); } - void ReplSetConfig::MemberCfg::check() const{ + void ReplSetConfig::MemberCfg::check() const { mchk(_id >= 0 && _id <= 255); mchk(priority >= 0 && priority <= 1000); mchk(votes >= 0 && votes <= 100); @@ -130,41 +140,80 @@ namespace mongo { uassert(13437, "slaveDelay requires priority be zero", slaveDelay == 0 || priority == 0); uassert(13438, "bad slaveDelay value", slaveDelay >= 0 && slaveDelay <= 3600 * 24 * 366); uassert(13439, "priority must be 0 when hidden=true", priority == 0 || !hidden); + uassert(13477, "priority must be 0 when buildIndexes=false", buildIndexes || priority == 0); + + if (!initialSync.isEmpty()) { + static const string legal[] = {"state", "name", "_id","optime"}; + static const set<string> legals(legal, legal + 4); + assertOnlyHas(initialSync, legals); + + if (initialSync.hasElement("state")) { + uassert(13525, "initialSync source state must be 1 or 2", + initialSync["state"].isNumber() && + (initialSync["state"].Number() == 1 || + initialSync["state"].Number() == 2)); + } + if (initialSync.hasElement("name")) { + uassert(13526, "initialSync source name must be a string", + initialSync["name"].type() == mongo::String); + } + if (initialSync.hasElement("_id")) { + uassert(13527, "initialSync source _id must be a number", + initialSync["_id"].isNumber()); + } + if (initialSync.hasElement("optime")) { + uassert(13528, "initialSync source optime must be a timestamp", + initialSync["optime"].type() == mongo::Timestamp || + initialSync["optime"].type() == mongo::Date); + } + } } /** @param o old config - @param n new config + @param n new config */ - /*static*/ bool ReplSetConfig::legalChange(const ReplSetConfig& o, const ReplSetConfig& n, string& errmsg) { + /*static*/ + bool ReplSetConfig::legalChange(const ReplSetConfig& o, const ReplSetConfig& n, string& errmsg) { assert( theReplSet ); - if( o._id != n._id ) { - errmsg = "set name may not change"; + if( o._id != n._id ) { + errmsg = "set name may not 
change"; return false; } /* TODO : wonder if we need to allow o.version < n.version only, which is more lenient. - if someone had some intermediate config this node doesnt have, that could be + if someone had some intermediate config this node doesnt have, that could be necessary. but then how did we become primary? so perhaps we are fine as-is. */ - if( o.version + 1 != n.version ) { + if( o.version + 1 != n.version ) { errmsg = "version number wrong"; return false; } map<HostAndPort,const ReplSetConfig::MemberCfg*> old; - for( vector<ReplSetConfig::MemberCfg>::const_iterator i = o.members.begin(); i != o.members.end(); i++ ) { + for( vector<ReplSetConfig::MemberCfg>::const_iterator i = o.members.begin(); i != o.members.end(); i++ ) { old[i->h] = &(*i); } int me = 0; - for( vector<ReplSetConfig::MemberCfg>::const_iterator i = n.members.begin(); i != n.members.end(); i++ ) { + for( vector<ReplSetConfig::MemberCfg>::const_iterator i = n.members.begin(); i != n.members.end(); i++ ) { const ReplSetConfig::MemberCfg& m = *i; - if( old.count(m.h) ) { - if( old[m.h]->_id != m._id ) { + if( old.count(m.h) ) { + const ReplSetConfig::MemberCfg& oldCfg = *old[m.h]; + if( oldCfg._id != m._id ) { log() << "replSet reconfig error with member: " << m.h.toString() << rsLog; uasserted(13432, "_id may not change for members"); } + if( oldCfg.buildIndexes != m.buildIndexes ) { + log() << "replSet reconfig error with member: " << m.h.toString() << rsLog; + uasserted(13476, "buildIndexes may not change for members"); + } + /* are transitions to and from arbiterOnly guaranteed safe? if not, we should disallow here. + there is a test at replsets/replsetarb3.js */ + if( oldCfg.arbiterOnly != m.arbiterOnly ) { + log() << "replSet reconfig error with member: " << m.h.toString() << " arbiterOnly cannot change. remove and readd the member instead " << rsLog; + uasserted(13510, "arbiterOnly may not change for members"); + } } - if( m.h.isSelf() ) + if( m.h.isSelf() ) me++; } @@ -172,24 +221,33 @@ namespace mongo { /* TODO : MORE CHECKS HERE */ - log() << "replSet TODO : don't allow removal of a node until we handle it at the removed node end?" << endl; + DEV log() << "replSet TODO : don't allow removal of a node until we handle it at the removed node end?" << endl; // we could change its votes to zero perhaps instead as a short term... 
return true; } - void ReplSetConfig::clear() { + void ReplSetConfig::clear() { version = -5; _ok = false; } - void ReplSetConfig::checkRsConfig() const { + void ReplSetConfig::checkRsConfig() const { uassert(13132, - "nonmatching repl set name in _id field; check --replSet command line", - _id == cmdLine.ourSetName()); + "nonmatching repl set name in _id field; check --replSet command line", + _id == cmdLine.ourSetName()); uassert(13308, "replSet bad config version #", version > 0); uassert(13133, "replSet bad config no members", members.size() >= 1); - uassert(13309, "replSet bad config maximum number of members is 7 (for now)", members.size() <= 7); + uassert(13309, "replSet bad config maximum number of members is 12", members.size() <= 12); + { + unsigned voters = 0; + for( vector<MemberCfg>::const_iterator i = members.begin(); i != members.end(); ++i ) { + if( i->votes ) + voters++; + } + uassert(13612, "replSet bad config maximum number of voting members is 7", voters <= 7); + uassert(13613, "replSet bad config no voting members", voters > 0); + } } void ReplSetConfig::from(BSONObj o) { @@ -213,7 +271,8 @@ namespace mongo { if( settings["heartbeatTimeout"].ok() ) ho.heartbeatTimeoutMillis = (unsigned) (settings["heartbeatTimeout"].Number() * 1000); ho.check(); - try { getLastErrorDefaults = settings["getLastErrorDefaults"].Obj().copy(); } catch(...) { } + try { getLastErrorDefaults = settings["getLastErrorDefaults"].Obj().copy(); } + catch(...) { } } set<string> hosts; @@ -231,43 +290,57 @@ namespace mongo { BSONObj mobj = members[i].Obj(); MemberCfg m; try { - static const string legal[] = {"_id","votes","priority","host","hidden","slaveDelay","arbiterOnly"}; - static const set<string> legals(legal, legal + 7); + static const string legal[] = { + "_id","votes","priority","host", "hidden","slaveDelay", + "arbiterOnly","buildIndexes","tags","initialSync" + }; + static const set<string> legals(legal, legal + 10); assertOnlyHas(mobj, legals); - try { + try { m._id = (int) mobj["_id"].Number(); - } catch(...) { + } + catch(...) { /* TODO: use of string exceptions may be problematic for reconfig case! */ - throw "_id must be numeric"; + throw "_id must be numeric"; } string s; try { s = mobj["host"].String(); m.h = HostAndPort(s); } - catch(...) { + catch(...) { throw string("bad or missing host field? 
") + mobj.toString(); } - if( m.h.isLocalHost() ) + if( m.h.isLocalHost() ) localhosts++; m.arbiterOnly = mobj.getBoolField("arbiterOnly"); m.slaveDelay = mobj["slaveDelay"].numberInt(); if( mobj.hasElement("hidden") ) m.hidden = mobj.getBoolField("hidden"); + if( mobj.hasElement("buildIndexes") ) + m.buildIndexes = mobj.getBoolField("buildIndexes"); if( mobj.hasElement("priority") ) m.priority = mobj["priority"].Number(); if( mobj.hasElement("votes") ) m.votes = (unsigned) mobj["votes"].Number(); + if( mobj.hasElement("tags") ) { + vector<BSONElement> v = mobj["tags"].Array(); + for( unsigned i = 0; i < v.size(); i++ ) + m.tags.insert( v[i].String() ); + } + if( mobj.hasElement("initialSync")) { + m.initialSync = mobj["initialSync"].Obj().getOwned(); + } m.check(); } - catch( const char * p ) { + catch( const char * p ) { log() << "replSet cfg parsing exception for members[" << i << "] " << p << rsLog; stringstream ss; ss << "replSet members[" << i << "] " << p; uassert(13107, ss.str(), false); } - catch(DBException& e) { + catch(DBException& e) { log() << "replSet cfg parsing exception for members[" << i << "] " << e.what() << rsLog; stringstream ss; ss << "bad config for member[" << i << "] " << e.what(); @@ -289,7 +362,7 @@ namespace mongo { uassert(13122, "bad repl set config?", expr); } - ReplSetConfig::ReplSetConfig(BSONObj cfg) { + ReplSetConfig::ReplSetConfig(BSONObj cfg) { clear(); from(cfg); configAssert( version < 0 /*unspecified*/ || (version >= 1 && version <= 5000) ); @@ -315,18 +388,19 @@ namespace mongo { BSONObj cmd = BSON( "replSetHeartbeat" << setname ); int theirVersion; BSONObj info; + log() << "trying to contact " << h.toString() << rsLog; bool ok = requestHeartbeat(setname, "", h.toString(), info, -2, theirVersion); - if( info["rs"].trueValue() ) { + if( info["rs"].trueValue() ) { // yes, it is a replicate set, although perhaps not yet initialized } else { if( !ok ) { log() << "replSet TEMP !ok heartbeating " << h.toString() << " on cfg load" << rsLog; - if( !info.isEmpty() ) + if( !info.isEmpty() ) log() << "replSet info " << h.toString() << " : " << info.toString() << rsLog; return; } - { + { stringstream ss; ss << "replSet error: member " << h.toString() << " is not in --replSet mode"; msgassertedNoTrace(13260, ss.str().c_str()); // not caught as not a user exception - we want it not caught @@ -343,7 +417,7 @@ namespace mongo { cfg = conn.findOne(rsConfigNs, Query()).getOwned(); count = conn.count(rsConfigNs); } - catch ( DBException& e) { + catch ( DBException& ) { if ( !h.isSelf() ) { throw; } @@ -356,14 +430,14 @@ namespace mongo { if( count > 1 ) uasserted(13109, str::stream() << "multiple rows in " << rsConfigNs << " not supported host: " << h.toString()); - + if( cfg.isEmpty() ) { version = EMPTYCONFIG; return; } version = -1; } - catch( DBException& e) { + catch( DBException& e) { version = v; log(level) << "replSet load config couldn't get from " << h.toString() << ' ' << e.what() << rsLog; return; diff --git a/db/repl/rs_config.h b/db/repl/rs_config.h index e39dad7..7d43fe6 100644 --- a/db/repl/rs_config.h +++ b/db/repl/rs_config.h @@ -23,7 +23,7 @@ #include "../../util/hostandport.h" #include "health.h" -namespace mongo { +namespace mongo { /* singleton config object is stored here */ const string rsConfigNs = "local.system.replset"; @@ -31,7 +31,7 @@ namespace mongo { class ReplSetConfig { enum { EMPTYCONFIG = -2 }; public: - /* if something is misconfigured, throws an exception. + /* if something is misconfigured, throws an exception. 
if couldn't be queried or is just blank, ok() will be false. */ ReplSetConfig(const HostAndPort& h); @@ -41,7 +41,7 @@ namespace mongo { bool ok() const { return _ok; } struct MemberCfg { - MemberCfg() : _id(-1), votes(1), priority(1.0), arbiterOnly(false), slaveDelay(0), hidden(false) { } + MemberCfg() : _id(-1), votes(1), priority(1.0), arbiterOnly(false), slaveDelay(0), hidden(false), buildIndexes(true) { } int _id; /* ordinal */ unsigned votes; /* how many votes this node gets. default 1. */ HostAndPort h; @@ -49,15 +49,17 @@ namespace mongo { bool arbiterOnly; int slaveDelay; /* seconds. int rather than unsigned for convenient to/front bson conversion. */ bool hidden; /* if set, don't advertise to drives in isMaster. for non-primaries (priority 0) */ + bool buildIndexes; /* if false, do not create any non-_id indexes */ + set<string> tags; /* tagging for data center, rack, etc. */ + BSONObj initialSync; /* directions for initial sync source */ void check() const; /* check validity, assert if not. */ BSONObj asBson() const; - bool potentiallyHot() const { - return !arbiterOnly && priority > 0; - } - bool operator==(const MemberCfg& r) const { - return _id==r._id && votes == r.votes && h == r.h && priority == r.priority && - arbiterOnly == r.arbiterOnly && slaveDelay == r.slaveDelay && hidden == r.hidden; + bool potentiallyHot() const { return !arbiterOnly && priority > 0; } + bool operator==(const MemberCfg& r) const { + return _id==r._id && votes == r.votes && h == r.h && priority == r.priority && + arbiterOnly == r.arbiterOnly && slaveDelay == r.slaveDelay && hidden == r.hidden && + buildIndexes == buildIndexes; } bool operator!=(const MemberCfg& r) const { return !(*this == r); } }; diff --git a/db/repl/rs_exception.h b/db/repl/rs_exception.h index e71cad2..fc372fc 100755..100644 --- a/db/repl/rs_exception.h +++ b/db/repl/rs_exception.h @@ -1,15 +1,15 @@ -// @file rs_exception.h
-
-#pragma once
-
-namespace mongo {
-
- class VoteException : public std::exception { +// @file rs_exception.h + +#pragma once + +namespace mongo { + + class VoteException : public std::exception { public: - const char * what() const throw () { return "VoteException"; }
+ const char * what() const throw () { return "VoteException"; } }; - class RetryAfterSleepException : public std::exception { + class RetryAfterSleepException : public std::exception { public: const char * what() const throw () { return "RetryAfterSleepException"; } }; diff --git a/db/repl/rs_initialsync.cpp b/db/repl/rs_initialsync.cpp index 3851c66..5a54059 100644 --- a/db/repl/rs_initialsync.cpp +++ b/db/repl/rs_initialsync.cpp @@ -15,6 +15,7 @@ */ #include "pch.h" +#include "../repl.h" #include "../client.h" #include "../../client/dbclient.h" #include "rs.h" @@ -33,15 +34,17 @@ namespace mongo { // add try/catch with sleep - void isyncassert(const char *msg, bool expr) { - if( !expr ) { + void isyncassert(const char *msg, bool expr) { + if( !expr ) { string m = str::stream() << "initial sync " << msg; theReplSet->sethbmsg(m, 0); uasserted(13404, m); } } - void ReplSetImpl::syncDoInitialSync() { + void ReplSetImpl::syncDoInitialSync() { + createOplog(); + while( 1 ) { try { _syncDoInitialSync(); @@ -54,14 +57,14 @@ namespace mongo { } } - bool cloneFrom(const char *masterHost, string& errmsg, const string& fromdb, bool logForReplication, - bool slaveOk, bool useReplAuth, bool snapshot); + bool cloneFrom(const char *masterHost, string& errmsg, const string& fromdb, bool logForReplication, + bool slaveOk, bool useReplAuth, bool snapshot); /* todo : progress metering to sethbmsg. */ static bool clone(const char *master, string db) { string err; - return cloneFrom(master, err, db, false, - /*slaveok later can be true*/ false, true, false); + return cloneFrom(master, err, db, false, + /* slave_ok */ true, true, false); } void _logOpObjRS(const BSONObj& op); @@ -71,11 +74,11 @@ namespace mongo { static void emptyOplog() { writelock lk(rsoplog); Client::Context ctx(rsoplog); - NamespaceDetails *d = nsdetails(rsoplog); + NamespaceDetails *d = nsdetails(rsoplog); - // temp - if( d && d->nrecords == 0 ) - return; // already empty, ok. + // temp + if( d && d->stats.nrecords == 0 ) + return; // already empty, ok. log(1) << "replSet empty oplog" << rsLog; d->emptyCappedCollection(rsoplog); @@ -84,10 +87,10 @@ namespace mongo { string errmsg; bob res; dropCollection(rsoplog, errmsg, res); - log() << "replSet recreated oplog so it is empty. todo optimize this..." << rsLog; - createOplog();*/ + log() << "replSet recreated oplog so it is empty. todo optimize this..." << rsLog; + createOplog();*/ - // TEMP: restart to recreate empty oplog + // TEMP: restart to recreate empty oplog //log() << "replSet FATAL error during initial sync. mongod restart required." << rsLog; //dbexit( EXIT_CLEAN ); @@ -100,106 +103,182 @@ namespace mongo { */ } - void ReplSetImpl::_syncDoInitialSync() { - sethbmsg("initial sync pending",0); + /** + * Choose a member to sync from. + * + * The initalSync option is an object with 1 k/v pair: + * + * "state" : 1|2 + * "name" : "host" + * "_id" : N + * "optime" : t + * + * All except optime are exact matches. "optime" will find a secondary with + * an optime >= to the optime given. + */ + const Member* ReplSetImpl::getMemberToSyncTo() { + BSONObj sync = myConfig().initialSync; + bool secondaryOnly = false, isOpTime = false; + char *name = 0; + int id = -1; + OpTime optime; StateBox::SP sp = box.get(); assert( !sp.state.primary() ); // wouldn't make sense if we were. 
- const Member *cp = sp.primary; - if( cp == 0 ) { - sethbmsg("initial sync need a member to be primary",0); + // if it exists, we've already checked that these fields are valid in + // rs_config.cpp + if ( !sync.isEmpty() ) { + if (sync.hasElement("state")) { + if (sync["state"].Number() == 1) { + if (sp.primary) { + sethbmsg( str::stream() << "syncing to primary: " << sp.primary->fullName(), 0); + return const_cast<Member*>(sp.primary); + } + else { + sethbmsg("couldn't clone from primary"); + return NULL; + } + } + else { + secondaryOnly = true; + } + } + if (sync.hasElement("name")) { + name = (char*)sync["name"].valuestr(); + } + if (sync.hasElement("_id")) { + id = (int)sync["_id"].Number(); + } + if (sync.hasElement("optime")) { + isOpTime = true; + optime = sync["optime"]._opTime(); + } + } + + for( Member *m = head(); m; m = m->next() ) { + if (!m->hbinfo().up() || + (m->state() != MemberState::RS_SECONDARY && + m->state() != MemberState::RS_PRIMARY) || + (secondaryOnly && m->state() != MemberState::RS_SECONDARY) || + (id != -1 && (int)m->id() != id) || + (name != 0 && strcmp(name, m->fullName().c_str()) != 0) || + (isOpTime && optime >= m->hbinfo().opTime)) { + continue; + } + + sethbmsg( str::stream() << "syncing to: " << m->fullName(), 0); + return const_cast<Member*>(m); + } + + sethbmsg( str::stream() << "couldn't find a member matching the sync criteria: " << + "\nstate? " << (secondaryOnly ? "2" : "none") << + "\nname? " << (name ? name : "none") << + "\n_id? " << id << + "\noptime? " << optime.toStringPretty() ); + + return NULL; + } + + /** + * Do the initial sync for this member. + */ + void ReplSetImpl::_syncDoInitialSync() { + sethbmsg("initial sync pending",0); + + const Member *source = getMemberToSyncTo(); + if (!source) { + sethbmsg("initial sync need a member to be primary or secondary to do our initial sync", 0); sleepsecs(15); return; } - string masterHostname = cp->h().toString(); + string sourceHostname = source->h().toString(); OplogReader r; - if( !r.connect(masterHostname) ) { - sethbmsg( str::stream() << "initial sync couldn't connect to " << cp->h().toString() , 0); + if( !r.connect(sourceHostname) ) { + sethbmsg( str::stream() << "initial sync couldn't connect to " << source->h().toString() , 0); sleepsecs(15); return; } BSONObj lastOp = r.getLastOp(rsoplog); - if( lastOp.isEmpty() ) { + if( lastOp.isEmpty() ) { sethbmsg("initial sync couldn't read remote oplog", 0); sleepsecs(15); return; } OpTime startingTS = lastOp["ts"]._opTime(); - - { - /* make sure things aren't too flappy */ - sleepsecs(5); - isyncassert( "flapping?", box.getPrimary() == cp ); - BSONObj o = r.getLastOp(rsoplog); - isyncassert( "flapping [2]?", !o.isEmpty() ); - } - - sethbmsg("initial sync drop all databases", 0); - dropAllDatabasesExceptLocal(); -// sethbmsg("initial sync drop oplog", 0); -// emptyOplog(); - - list<string> dbs = r.conn()->getDatabaseNames(); - for( list<string>::iterator i = dbs.begin(); i != dbs.end(); i++ ) { - string db = *i; - if( db != "local" ) { - sethbmsg( str::stream() << "initial sync cloning db: " << db , 0); - bool ok; - { - writelock lk(db); - Client::Context ctx(db); - ok = clone(masterHostname.c_str(), db); - } - if( !ok ) { - sethbmsg( str::stream() << "initial sync error clone of " << db << " failed sleeping 5 minutes" ,0); - sleepsecs(300); - return; + if (replSettings.fastsync) { + log() << "fastsync: skipping database clone" << rsLog; + } + else { + sethbmsg("initial sync drop all databases", 0); + dropAllDatabasesExceptLocal(); + + 
sethbmsg("initial sync clone all databases", 0); + + list<string> dbs = r.conn()->getDatabaseNames(); + for( list<string>::iterator i = dbs.begin(); i != dbs.end(); i++ ) { + string db = *i; + if( db != "local" ) { + sethbmsg( str::stream() << "initial sync cloning db: " << db , 0); + bool ok; + { + writelock lk(db); + Client::Context ctx(db); + ok = clone(sourceHostname.c_str(), db); + } + if( !ok ) { + sethbmsg( str::stream() << "initial sync error clone of " << db << " failed sleeping 5 minutes" ,0); + sleepsecs(300); + return; + } } } } sethbmsg("initial sync query minValid",0); - /* our cloned copy will be strange until we apply oplog events that occurred + isyncassert( "initial sync source must remain readable throughout our initial sync", source->state().readable() ); + + /* our cloned copy will be strange until we apply oplog events that occurred through the process. we note that time point here. */ BSONObj minValid = r.getLastOp(rsoplog); - assert( !minValid.isEmpty() ); + isyncassert( "getLastOp is empty ", !minValid.isEmpty() ); OpTime mvoptime = minValid["ts"]._opTime(); assert( !mvoptime.isNull() ); - /* copy the oplog + /* apply relevant portion of the oplog */ { - sethbmsg("initial sync copy+apply oplog"); - if( ! initialSyncOplogApplication(masterHostname, cp, startingTS, mvoptime) ) { // note we assume here that this call does not throw + sethbmsg("initial sync initial oplog application"); + isyncassert( "initial sync source must remain readable throughout our initial sync [2]", source->state().readable() ); + if( ! initialSyncOplogApplication(source, /*applyGTE*/startingTS, /*minValid*/mvoptime) ) { // note we assume here that this call does not throw log() << "replSet initial sync failed during applyoplog" << rsLog; emptyOplog(); // otherwise we'll be up! - lastOpTimeWritten = OpTime(); - lastH = 0; + lastOpTimeWritten = OpTime(); + lastH = 0; log() << "replSet cleaning up [1]" << rsLog; { writelock lk("local."); Client::Context cx( "local." ); - cx.db()->flushFiles(true); + cx.db()->flushFiles(true); } log() << "replSet cleaning up [2]" << rsLog; - sleepsecs(2); + sleepsecs(5); return; } } sethbmsg("initial sync finishing up",0); - + assert( !box.getState().primary() ); // wouldn't make sense if we were. { writelock lk("local."); Client::Context cx( "local." ); - cx.db()->flushFiles(true); + cx.db()->flushFiles(true); try { log() << "replSet set minValid=" << minValid["ts"]._opTime().toString() << rsLog; } diff --git a/db/repl/rs_initiate.cpp b/db/repl/rs_initiate.cpp index 9c74be0..cf1941f 100644 --- a/db/repl/rs_initiate.cpp +++ b/db/repl/rs_initiate.cpp @@ -26,47 +26,63 @@ #include "rs.h" #include "rs_config.h" #include "../dbhelpers.h" +#include "../oplog.h" using namespace bson; using namespace mongoutils; -namespace mongo { +namespace mongo { /* called on a reconfig AND on initiate - throws + throws @param initial true when initiating */ void checkMembersUpForConfigChange(const ReplSetConfig& cfg, bool initial) { int failures = 0; int me = 0; + stringstream selfs; for( vector<ReplSetConfig::MemberCfg>::const_iterator i = cfg.members.begin(); i != cfg.members.end(); i++ ) { if( i->h.isSelf() ) { me++; - if( !i->potentiallyHot() ) { + if( me > 1 ) + selfs << ','; + selfs << i->h.toString(); + if( !i->potentiallyHot() ) { uasserted(13420, "initiation and reconfiguration of a replica set must be sent to a node that can become primary"); } } } - uassert(13278, "bad config - dups?", me <= 1); // dups? 
- uassert(13279, "can't find self in the replset config", me == 1); + uassert(13278, "bad config: isSelf is true for multiple hosts: " + selfs.str(), me <= 1); // dups? + if( me != 1 ) { + stringstream ss; + ss << "can't find self in the replset config"; + if( !cmdLine.isDefaultPort() ) ss << " my port: " << cmdLine.port; + if( me != 0 ) ss << " found: " << me; + uasserted(13279, ss.str()); + } for( vector<ReplSetConfig::MemberCfg>::const_iterator i = cfg.members.begin(); i != cfg.members.end(); i++ ) { + // we know we're up + if (i->h.isSelf()) { + continue; + } + BSONObj res; { bool ok = false; try { int theirVersion = -1000; - ok = requestHeartbeat(cfg._id, "", i->h.toString(), res, -1, theirVersion, initial/*check if empty*/); - if( theirVersion >= cfg.version ) { + ok = requestHeartbeat(cfg._id, "", i->h.toString(), res, -1, theirVersion, initial/*check if empty*/); + if( theirVersion >= cfg.version ) { stringstream ss; ss << "replSet member " << i->h.toString() << " has too new a config version (" << theirVersion << ") to reconfigure"; uasserted(13259, ss.str()); } } - catch(DBException& e) { + catch(DBException& e) { log() << "replSet cmufcc requestHeartbeat " << i->h.toString() << " : " << e.toString() << rsLog; } - catch(...) { + catch(...) { log() << "replSet cmufcc error exception in requestHeartbeat?" << rsLog; } if( res.getBoolField("mismatch") ) @@ -96,7 +112,7 @@ namespace mongo { trying to keep change small as release is near. */ const Member* m = theReplSet->findById( i->_id ); - if( m ) { + if( m ) { // ok, so this was an existing member (wouldn't make sense to add to config a new member that is down) assert( m->h().toString() == i->h.toString() ); allowFailure = true; @@ -113,24 +129,24 @@ namespace mongo { } if( initial ) { bool hasData = res["hasData"].Bool(); - uassert(13311, "member " + i->h.toString() + " has data already, cannot initiate set. All members except initiator must be empty.", - !hasData || i->h.isSelf()); + uassert(13311, "member " + i->h.toString() + " has data already, cannot initiate set. All members except initiator must be empty.", + !hasData || i->h.isSelf()); } } } - class CmdReplSetInitiate : public ReplSetCommand { + class CmdReplSetInitiate : public ReplSetCommand { public: virtual LockType locktype() const { return NONE; } CmdReplSetInitiate() : ReplSetCommand("replSetInitiate") { } - virtual void help(stringstream& h) const { - h << "Initiate/christen a replica set."; + virtual void help(stringstream& h) const { + h << "Initiate/christen a replica set."; h << "\nhttp://www.mongodb.org/display/DOCS/Replica+Set+Commands"; } virtual bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) { log() << "replSet replSetInitiate admin command received from client" << rsLog; - if( !replSet ) { + if( !replSet ) { errmsg = "server is not running with --replSet"; return false; } @@ -141,12 +157,12 @@ namespace mongo { } { - // just make sure we can get a write lock before doing anything else. we'll reacquire one - // later. of course it could be stuck then, but this check lowers the risk if weird things + // just make sure we can get a write lock before doing anything else. we'll reacquire one + // later. of course it could be stuck then, but this check lowers the risk if weird things // are up. time_t t = time(0); writelock lk(""); - if( time(0)-t > 10 ) { + if( time(0)-t > 10 ) { errmsg = "took a long time to get write lock, so not initiating. 
Initiate when server less busy?"; return false; } @@ -155,7 +171,7 @@ namespace mongo { it is ok if the initiating member has *other* data than that. */ BSONObj o; - if( Helpers::getFirst(rsoplog, o) ) { + if( Helpers::getFirst(rsoplog, o) ) { errmsg = rsoplog + string(" is not empty on the initiating member. cannot initiate."); return false; } @@ -194,7 +210,7 @@ namespace mongo { configObj = b.obj(); log() << "replSet created this configuration for initiation : " << configObj.toString() << rsLog; } - else { + else { configObj = cmdObj["replSetInitiate"].Obj(); } @@ -203,7 +219,7 @@ namespace mongo { ReplSetConfig newConfig(configObj); parsed = true; - if( newConfig.version > 1 ) { + if( newConfig.version > 1 ) { errmsg = "can't initiate with a version number greater than 1"; return false; } @@ -214,6 +230,8 @@ namespace mongo { log() << "replSet replSetInitiate all members seem up" << rsLog; + createOplog(); + writelock lk(""); bo comment = BSON( "msg" << "initiating set"); newConfig.saveConfigLocally(comment); @@ -222,9 +240,9 @@ namespace mongo { ReplSet::startupStatus = ReplSet::SOON; ReplSet::startupStatusMsg = "Received replSetInitiate - should come online shortly."; } - catch( DBException& e ) { + catch( DBException& e ) { log() << "replSet replSetInitiate exception: " << e.what() << rsLog; - if( !parsed ) + if( !parsed ) errmsg = string("couldn't parse cfg object ") + e.what(); else errmsg = string("couldn't initiate : ") + e.what(); diff --git a/db/repl/rs_member.h b/db/repl/rs_member.h index 099cb22..017b6ea 100644 --- a/db/repl/rs_member.h +++ b/db/repl/rs_member.h @@ -30,18 +30,18 @@ namespace mongo { RS_FATAL something bad has occurred and server is not completely offline with regard to the replica set. fatal error. RS_STARTUP2 loaded config, still determining who is primary */ - struct MemberState { - enum MS { - RS_STARTUP, - RS_PRIMARY, - RS_SECONDARY, - RS_RECOVERING, - RS_FATAL, - RS_STARTUP2, - RS_UNKNOWN, /* remote node not yet reached */ - RS_ARBITER, - RS_DOWN, /* node not reachable for a report */ - RS_ROLLBACK + struct MemberState { + enum MS { + RS_STARTUP = 0, + RS_PRIMARY = 1, + RS_SECONDARY = 2, + RS_RECOVERING = 3, + RS_FATAL = 4, + RS_STARTUP2 = 5, + RS_UNKNOWN = 6, /* remote node not yet reached */ + RS_ARBITER = 7, + RS_DOWN = 8, /* node not reachable for a report */ + RS_ROLLBACK = 9 } s; MemberState(MS ms = RS_UNKNOWN) : s(ms) { } @@ -53,6 +53,7 @@ namespace mongo { bool startup2() const { return s == RS_STARTUP2; } bool fatal() const { return s == RS_FATAL; } bool rollback() const { return s == RS_ROLLBACK; } + bool readable() const { return s == RS_PRIMARY || s == RS_SECONDARY; } string toString() const; @@ -60,9 +61,9 @@ namespace mongo { bool operator!=(const MemberState& r) const { return s != r.s; } }; - /* this is supposed to be just basic information on a member, + /* this is supposed to be just basic information on a member, and copy constructable. 
*/ - class HeartbeatInfo { + class HeartbeatInfo { unsigned _id; public: HeartbeatInfo() : _id(0xffffffff),hbstate(MemberState::RS_UNKNOWN),health(-1.0),downSince(0),skew(INT_MIN) { } @@ -88,15 +89,15 @@ namespace mongo { bool changed(const HeartbeatInfo& old) const; }; - inline HeartbeatInfo::HeartbeatInfo(unsigned id) : _id(id) { + inline HeartbeatInfo::HeartbeatInfo(unsigned id) : _id(id) { hbstate = MemberState::RS_UNKNOWN; health = -1.0; downSince = 0; - lastHeartbeat = upSince = 0; + lastHeartbeat = upSince = 0; skew = INT_MIN; } - inline bool HeartbeatInfo::changed(const HeartbeatInfo& old) const { + inline bool HeartbeatInfo::changed(const HeartbeatInfo& old) const { return health != old.health || hbstate != old.hbstate; } diff --git a/db/repl/rs_optime.h b/db/repl/rs_optime.h index b3607fa..f0ca569 100644 --- a/db/repl/rs_optime.h +++ b/db/repl/rs_optime.h @@ -1,58 +1,58 @@ -// @file rs_optime.h
-
-/*
- * Copyright (C) 2010 10gen Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#pragma once
-
-#include "../../util/optime.h"
-
-namespace mongo {
-
+// @file rs_optime.h + +/* + * Copyright (C) 2010 10gen Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#pragma once + +#include "../../util/optime.h" + +namespace mongo { + const char rsoplog[] = "local.oplog.rs"; -
- /*
- class RSOpTime : public OpTime {
- public:
- bool initiated() const { return getSecs() != 0; }
- };*/
-
- /*struct RSOpTime {
- unsigned long long ord;
-
- RSOpTime() : ord(0) { }
-
- bool initiated() const { return ord > 0; }
-
- void initiate() {
- assert( !initiated() );
- ord = 1000000;
- }
-
- ReplTime inc() {
- DEV assertInWriteLock();
- return ++ord;
- }
-
- string toString() const { return str::stream() << ord; }
-
- // query the oplog and set the highest value herein. acquires a db read lock. throws.
- void load();
- };
-
- extern RSOpTime rsOpTime;*/
-
-}
+ + /* + class RSOpTime : public OpTime { + public: + bool initiated() const { return getSecs() != 0; } + };*/ + + /*struct RSOpTime { + unsigned long long ord; + + RSOpTime() : ord(0) { } + + bool initiated() const { return ord > 0; } + + void initiate() { + assert( !initiated() ); + ord = 1000000; + } + + ReplTime inc() { + DEV assertInWriteLock(); + return ++ord; + } + + string toString() const { return str::stream() << ord; } + + // query the oplog and set the highest value herein. acquires a db read lock. throws. + void load(); + }; + + extern RSOpTime rsOpTime;*/ + +} diff --git a/db/repl/rs_rollback.cpp b/db/repl/rs_rollback.cpp index 6b2544c..0b4cc28 100644 --- a/db/repl/rs_rollback.cpp +++ b/db/repl/rs_rollback.cpp @@ -1,5 +1,5 @@ /* @file rs_rollback.cpp -* +* * Copyright (C) 2008 10gen Inc. * * This program is free software: you can redistribute it and/or modify @@ -25,7 +25,7 @@ /* Scenarios We went offline with ops not replicated out. - + F = node that failed and coming back. P = node that took over, new primary @@ -33,11 +33,11 @@ F : a b c d e f g P : a b c d q - The design is "keep P". One could argue here that "keep F" has some merits, however, in most cases P - will have significantly more data. Also note that P may have a proper subset of F's stream if there were + The design is "keep P". One could argue here that "keep F" has some merits, however, in most cases P + will have significantly more data. Also note that P may have a proper subset of F's stream if there were no subsequent writes. - For now the model is simply : get F back in sync with P. If P was really behind or something, we should have + For now the model is simply : get F back in sync with P. If P was really behind or something, we should have just chosen not to fail over anyway. #2: @@ -50,9 +50,9 @@ Steps find an event in common. 'd'. - undo our events beyond that by: + undo our events beyond that by: (1) taking copy from other server of those objects - (2) do not consider copy valid until we pass reach an optime after when we fetched the new version of object + (2) do not consider copy valid until we pass reach an optime after when we fetched the new version of object -- i.e., reset minvalid. (3) we could skip operations on objects that are previous in time to our capture of the object as an optimization. @@ -65,15 +65,15 @@ namespace mongo { bool copyCollectionFromRemote(const string& host, const string& ns, const BSONObj& query, string& errmsg, bool logforrepl); void incRBID(); - class rsfatal : public std::exception { + class rsfatal : public std::exception { public: - virtual const char* what() const throw(){ return "replica set fatal exception"; } + virtual const char* what() const throw() { return "replica set fatal exception"; } }; struct DocID { const char *ns; be _id; - bool operator<(const DocID& d) const { + bool operator<(const DocID& d) const { int c = strcmp(ns, d.ns); if( c < 0 ) return true; if( c > 0 ) return false; @@ -82,7 +82,7 @@ namespace mongo { }; struct HowToFixUp { - /* note this is a set -- if there are many $inc's on a single document we need to rollback, we only + /* note this is a set -- if there are many $inc's on a single document we need to rollback, we only need to refetch it once. 
*/ set<DocID> toRefetch; @@ -97,9 +97,9 @@ namespace mongo { int rbid; // remote server's current rollback sequence # }; - static void refetch(HowToFixUp& h, const BSONObj& ourObj) { + static void refetch(HowToFixUp& h, const BSONObj& ourObj) { const char *op = ourObj.getStringField("op"); - if( *op == 'n' ) + if( *op == 'n' ) return; unsigned long long totSize = 0; @@ -108,53 +108,54 @@ namespace mongo { throw "rollback too large"; DocID d; + // NOTE The assigned ns value may become invalid if we yield. d.ns = ourObj.getStringField("ns"); - if( *d.ns == 0 ) { + if( *d.ns == 0 ) { log() << "replSet WARNING ignoring op on rollback no ns TODO : " << ourObj.toString() << rsLog; return; } bo o = ourObj.getObjectField(*op=='u' ? "o2" : "o"); - if( o.isEmpty() ) { + if( o.isEmpty() ) { log() << "replSet warning ignoring op on rollback : " << ourObj.toString() << rsLog; return; } - if( *op == 'c' ) { + if( *op == 'c' ) { be first = o.firstElement(); NamespaceString s(d.ns); // foo.$cmd string cmdname = first.fieldName(); Command *cmd = Command::findCommand(cmdname.c_str()); - if( cmd == 0 ) { + if( cmd == 0 ) { log() << "replSet warning rollback no suchcommand " << first.fieldName() << " - different mongod versions perhaps?" << rsLog; return; } else { /* findandmodify - tranlated? - godinsert?, + godinsert?, renamecollection a->b. just resync a & b */ if( cmdname == "create" ) { - /* Create collection operation - { ts: ..., h: ..., op: "c", ns: "foo.$cmd", o: { create: "abc", ... } }
+ /* Create collection operation + { ts: ..., h: ..., op: "c", ns: "foo.$cmd", o: { create: "abc", ... } } */ string ns = s.db + '.' + o["create"].String(); // -> foo.abc h.toDrop.insert(ns); return; } - else if( cmdname == "drop" ) { + else if( cmdname == "drop" ) { string ns = s.db + '.' + first.valuestr(); h.collectionsToResync.insert(ns); return; } - else if( cmdname == "dropIndexes" || cmdname == "deleteIndexes" ) { + else if( cmdname == "dropIndexes" || cmdname == "deleteIndexes" ) { /* TODO: this is bad. we simply full resync the collection here, which could be very slow. */ log() << "replSet info rollback of dropIndexes is slow in this version of mongod" << rsLog; string ns = s.db + '.' + first.valuestr(); h.collectionsToResync.insert(ns); return; } - else if( cmdname == "renameCollection" ) { + else if( cmdname == "renameCollection" ) { /* TODO: slow. */ log() << "replSet info rollback of renameCollection is slow in this version of mongod" << rsLog; string from = first.valuestr(); @@ -163,15 +164,15 @@ namespace mongo { h.collectionsToResync.insert(to); return; } - else if( cmdname == "reIndex" ) { + else if( cmdname == "reIndex" ) { return; } - else if( cmdname == "dropDatabase" ) { + else if( cmdname == "dropDatabase" ) { log() << "replSet error rollback : can't rollback drop database full resync will be required" << rsLog; log() << "replSet " << o.toString() << rsLog; throw rsfatal(); } - else { + else { log() << "replSet error can't rollback this command yet: " << o.toString() << rsLog; log() << "replSet cmdname=" << cmdname << rsLog; throw rsfatal(); @@ -190,15 +191,15 @@ namespace mongo { int getRBID(DBClientConnection*); - static void syncRollbackFindCommonPoint(DBClientConnection *them, HowToFixUp& h) { + static void syncRollbackFindCommonPoint(DBClientConnection *them, HowToFixUp& h) { static time_t last; - if( time(0)-last < 60 ) { + if( time(0)-last < 60 ) { throw "findcommonpoint waiting a while before trying again"; } last = time(0); assert( dbMutex.atLeastReadLocked() ); - Client::Context c(rsoplog, dbpath, 0, false); + Client::Context c(rsoplog); NamespaceDetails *nsd = nsdetails(rsoplog); assert(nsd); ReverseCappedCursor u(nsd); @@ -226,7 +227,7 @@ namespace mongo { log() << "replSet info rollback our last optime: " << ourTime.toStringPretty() << rsLog; log() << "replSet info rollback their last optime: " << theirTime.toStringPretty() << rsLog; log() << "replSet info rollback diff in end of log times: " << diff << " seconds" << rsLog; - if( diff > 3600 ) { + if( diff > 3600 ) { log() << "replSet rollback too long a time period for a rollback." << rsLog; throw "error not willing to roll back more than one hour of data"; } @@ -236,8 +237,8 @@ namespace mongo { while( 1 ) { scanned++; /* todo add code to assure no excessive scanning for too long */ - if( ourTime == theirTime ) { - if( ourObj["h"].Long() == theirObj["h"].Long() ) { + if( ourTime == theirTime ) { + if( ourObj["h"].Long() == theirObj["h"].Long() ) { // found the point back in time where we match. // todo : check a few more just to be careful about hash collisions. 
log() << "replSet rollback found matching events at " << ourTime.toStringPretty() << rsLog; @@ -249,7 +250,7 @@ namespace mongo { refetch(h, ourObj); - if( !t->more() ) { + if( !t->more() ) { log() << "replSet rollback error RS100 reached beginning of remote oplog" << rsLog; log() << "replSet them: " << them->toString() << " scanned: " << scanned << rsLog; log() << "replSet theirTime: " << theirTime.toStringLong() << rsLog; @@ -270,8 +271,8 @@ namespace mongo { ourObj = u.current(); ourTime = ourObj["ts"]._opTime(); } - else if( theirTime > ourTime ) { - if( !t->more() ) { + else if( theirTime > ourTime ) { + if( !t->more() ) { log() << "replSet rollback error RS100 reached beginning of remote oplog" << rsLog; log() << "replSet them: " << them->toString() << " scanned: " << scanned << rsLog; log() << "replSet theirTime: " << theirTime.toStringLong() << rsLog; @@ -281,11 +282,11 @@ namespace mongo { theirObj = t->nextSafe(); theirTime = theirObj["ts"]._opTime(); } - else { + else { // theirTime < ourTime refetch(h, ourObj); u.advance(); - if( !u.ok() ) { + if( !u.ok() ) { log() << "replSet rollback error RS101 reached beginning of local oplog" << rsLog; log() << "replSet them: " << them->toString() << " scanned: " << scanned << rsLog; log() << "replSet theirTime: " << theirTime.toStringLong() << rsLog; @@ -298,299 +299,303 @@ namespace mongo { } } - struct X { + struct X { const bson::bo *op; bson::bo goodVersionOfObject; }; - static void setMinValid(bo newMinValid) { - try { - log() << "replSet minvalid=" << newMinValid["ts"]._opTime().toStringLong() << rsLog; - } - catch(...) { } - { - Helpers::putSingleton("local.replset.minvalid", newMinValid); - Client::Context cx( "local." ); - cx.db()->flushFiles(true); - } + static void setMinValid(bo newMinValid) { + try { + log() << "replSet minvalid=" << newMinValid["ts"]._opTime().toStringLong() << rsLog; + } + catch(...) { } + { + Helpers::putSingleton("local.replset.minvalid", newMinValid); + Client::Context cx( "local." ); + cx.db()->flushFiles(true); + } } void ReplSetImpl::syncFixUp(HowToFixUp& h, OplogReader& r) { - DBClientConnection *them = r.conn(); - - // fetch all first so we needn't handle interruption in a fancy way - - unsigned long long totSize = 0; - - list< pair<DocID,bo> > goodVersions; - - bo newMinValid; - - /* fetch all the goodVersions of each document from current primary */ - DocID d; - unsigned long long n = 0; - try { - for( set<DocID>::iterator i = h.toRefetch.begin(); i != h.toRefetch.end(); i++ ) { - d = *i; - - assert( !d._id.eoo() ); - - { - /* TODO : slow. lots of round trips. */ - n++; - bo good= them->findOne(d.ns, d._id.wrap()).getOwned(); - totSize += good.objsize(); - uassert( 13410, "replSet too much data to roll back", totSize < 300 * 1024 * 1024 ); - - // note good might be eoo, indicating we should delete it - goodVersions.push_back(pair<DocID,bo>(d,good)); - } - } - newMinValid = r.getLastOp(rsoplog); - if( newMinValid.isEmpty() ) { - sethbmsg("rollback error newMinValid empty?"); - return; - } - } - catch(DBException& e) { - sethbmsg(str::stream() << "rollback re-get objects: " << e.toString(),0); - log() << "rollback couldn't re-get ns:" << d.ns << " _id:" << d._id << ' ' << n << '/' << h.toRefetch.size() << rsLog; - throw e; - } - - MemoryMappedFile::flushAll(true); - - sethbmsg("rollback 3.5"); - if( h.rbid != getRBID(r.conn()) ) { - // our source rolled back itself. so the data we received isn't necessarily consistent. 
- sethbmsg("rollback rbid on source changed during rollback, cancelling this attempt"); - return; - } - - // update them - sethbmsg(str::stream() << "rollback 4 n:" << goodVersions.size()); - - bool warn = false; - - assert( !h.commonPointOurDiskloc.isNull() ); - - dbMutex.assertWriteLocked(); - - /* we have items we are writing that aren't from a point-in-time. thus best not to come online - until we get to that point in freshness. */ - setMinValid(newMinValid); - - /** any full collection resyncs required? */ - if( !h.collectionsToResync.empty() ) { - for( set<string>::iterator i = h.collectionsToResync.begin(); i != h.collectionsToResync.end(); i++ ) { - string ns = *i; - sethbmsg(str::stream() << "rollback 4.1 coll resync " << ns); - Client::Context c(*i, dbpath, 0, /*doauth*/false); - try { - bob res; - string errmsg; - dropCollection(ns, errmsg, res); - { - dbtemprelease r; - bool ok = copyCollectionFromRemote(them->getServerAddress(), ns, bo(), errmsg, false); - if( !ok ) { - log() << "replSet rollback error resyncing collection " << ns << ' ' << errmsg << rsLog; - throw "rollback error resyncing rollection [1]"; - } - } - } - catch(...) { - log() << "replset rollback error resyncing collection " << ns << rsLog; - throw "rollback error resyncing rollection [2]"; - } - } - - /* we did more reading from primary, so check it again for a rollback (which would mess us up), and - make minValid newer. - */ - sethbmsg("rollback 4.2"); - { - string err; - try { - newMinValid = r.getLastOp(rsoplog); - if( newMinValid.isEmpty() ) { - err = "can't get minvalid from primary"; - } else { - setMinValid(newMinValid); - } - } - catch(...) { - err = "can't get/set minvalid"; - } - if( h.rbid != getRBID(r.conn()) ) { - // our source rolled back itself. so the data we received isn't necessarily consistent. - // however, we've now done writes. thus we have a problem. - err += "rbid at primary changed during resync/rollback"; - } - if( !err.empty() ) { - log() << "replSet error rolling back : " << err << ". A full resync will be necessary." << rsLog; - /* todo: reset minvalid so that we are permanently in fatal state */ - /* todo: don't be fatal, but rather, get all the data first. */ - sethbmsg("rollback error"); - throw rsfatal(); - } - } - sethbmsg("rollback 4.3"); - } - - sethbmsg("rollback 4.6"); - /** drop collections to drop before doing individual fixups - that might make things faster below actually if there were subsequent inserts to rollback */ - for( set<string>::iterator i = h.toDrop.begin(); i != h.toDrop.end(); i++ ) { - Client::Context c(*i, dbpath, 0, /*doauth*/false); - try { - bob res; - string errmsg; - log(1) << "replSet rollback drop: " << *i << rsLog; - dropCollection(*i, errmsg, res); - } - catch(...) { - log() << "replset rollback error dropping collection " << *i << rsLog; - } - } - - sethbmsg("rollback 4.7"); - Client::Context c(rsoplog, dbpath, 0, /*doauth*/false); - NamespaceDetails *oplogDetails = nsdetails(rsoplog); - uassert(13423, str::stream() << "replSet error in rollback can't find " << rsoplog, oplogDetails); - - map<string,shared_ptr<RemoveSaver> > removeSavers; - - unsigned deletes = 0, updates = 0; - for( list<pair<DocID,bo> >::iterator i = goodVersions.begin(); i != goodVersions.end(); i++ ) { - const DocID& d = i->first; - bo pattern = d._id.wrap(); // { _id : ... 
} - try { - assert( d.ns && *d.ns ); - if( h.collectionsToResync.count(d.ns) ) { - /* we just synced this entire collection */ - continue; - } - - /* keep an archive of items rolled back */ - shared_ptr<RemoveSaver>& rs = removeSavers[d.ns]; - if ( ! rs ) - rs.reset( new RemoveSaver( "rollback" , "" , d.ns ) ); - - // todo: lots of overhead in context, this can be faster - Client::Context c(d.ns, dbpath, 0, /*doauth*/false); - if( i->second.isEmpty() ) { - // wasn't on the primary; delete. - /* TODO1.6 : can't delete from a capped collection. need to handle that here. */ - deletes++; - - NamespaceDetails *nsd = nsdetails(d.ns); - if( nsd ) { - if( nsd->capped ) { - /* can't delete from a capped collection - so we truncate instead. if this item must go, - so must all successors!!! */ - try { - /** todo: IIRC cappedTrunateAfter does not handle completely empty. todo. */ - // this will crazy slow if no _id index. - long long start = Listener::getElapsedTimeMillis(); - DiskLoc loc = Helpers::findOne(d.ns, pattern, false); - if( Listener::getElapsedTimeMillis() - start > 200 ) - log() << "replSet warning roll back slow no _id index for " << d.ns << " perhaps?" << rsLog; - //would be faster but requires index: DiskLoc loc = Helpers::findById(nsd, pattern); - if( !loc.isNull() ) { - try { - nsd->cappedTruncateAfter(d.ns, loc, true); - } - catch(DBException& e) { - if( e.getCode() == 13415 ) { - // hack: need to just make cappedTruncate do this... - nsd->emptyCappedCollection(d.ns); - } else { - throw; - } - } - } - } - catch(DBException& e) { - log() << "replSet error rolling back capped collection rec " << d.ns << ' ' << e.toString() << rsLog; - } - } - else { - try { - deletes++; - deleteObjects(d.ns, pattern, /*justone*/true, /*logop*/false, /*god*/true, rs.get() ); - } - catch(...) { - log() << "replSet error rollback delete failed ns:" << d.ns << rsLog; - } - } - // did we just empty the collection? if so let's check if it even exists on the source. - if( nsd->nrecords == 0 ) { - try { - string sys = cc().database()->name + ".system.namespaces"; - bo o = them->findOne(sys, QUERY("name"<<d.ns)); - if( o.isEmpty() ) { - // we should drop - try { - bob res; - string errmsg; - dropCollection(d.ns, errmsg, res); - } - catch(...) { - log() << "replset error rolling back collection " << d.ns << rsLog; - } - } - } - catch(DBException& ) { - /* this isn't *that* big a deal, but is bad. */ - log() << "replSet warning rollback error querying for existence of " << d.ns << " at the primary, ignoring" << rsLog; - } - } - } - } - else { - // todo faster... - OpDebug debug; - updates++; - _updateObjects(/*god*/true, d.ns, i->second, pattern, /*upsert=*/true, /*multi=*/false , /*logtheop=*/false , debug, rs.get() ); - } - } - catch(DBException& e) { - log() << "replSet exception in rollback ns:" << d.ns << ' ' << pattern.toString() << ' ' << e.toString() << " ndeletes:" << deletes << rsLog; - warn = true; - } - } - - removeSavers.clear(); // this effectively closes all of them - - sethbmsg(str::stream() << "rollback 5 d:" << deletes << " u:" << updates); - MemoryMappedFile::flushAll(true); - sethbmsg("rollback 6"); - - // clean up oplog - log(2) << "replSet rollback truncate oplog after " << h.commonPoint.toStringPretty() << rsLog; - // todo: fatal error if this throws? 
- oplogDetails->cappedTruncateAfter(rsoplog, h.commonPointOurDiskloc, false); - - /* reset cached lastoptimewritten and h value */ - loadLastOpTimeWritten(); - - sethbmsg("rollback 7"); - MemoryMappedFile::flushAll(true); - - // done - if( warn ) - sethbmsg("issues during syncRollback, see log"); - else - sethbmsg("rollback done"); - } - - void ReplSetImpl::syncRollback(OplogReader&r) { + DBClientConnection *them = r.conn(); + + // fetch all first so we needn't handle interruption in a fancy way + + unsigned long long totSize = 0; + + list< pair<DocID,bo> > goodVersions; + + bo newMinValid; + + /* fetch all the goodVersions of each document from current primary */ + DocID d; + unsigned long long n = 0; + try { + for( set<DocID>::iterator i = h.toRefetch.begin(); i != h.toRefetch.end(); i++ ) { + d = *i; + + assert( !d._id.eoo() ); + + { + /* TODO : slow. lots of round trips. */ + n++; + bo good= them->findOne(d.ns, d._id.wrap()).getOwned(); + totSize += good.objsize(); + uassert( 13410, "replSet too much data to roll back", totSize < 300 * 1024 * 1024 ); + + // note good might be eoo, indicating we should delete it + goodVersions.push_back(pair<DocID,bo>(d,good)); + } + } + newMinValid = r.getLastOp(rsoplog); + if( newMinValid.isEmpty() ) { + sethbmsg("rollback error newMinValid empty?"); + return; + } + } + catch(DBException& e) { + sethbmsg(str::stream() << "rollback re-get objects: " << e.toString(),0); + log() << "rollback couldn't re-get ns:" << d.ns << " _id:" << d._id << ' ' << n << '/' << h.toRefetch.size() << rsLog; + throw e; + } + + MemoryMappedFile::flushAll(true); + + sethbmsg("rollback 3.5"); + if( h.rbid != getRBID(r.conn()) ) { + // our source rolled back itself. so the data we received isn't necessarily consistent. + sethbmsg("rollback rbid on source changed during rollback, cancelling this attempt"); + return; + } + + // update them + sethbmsg(str::stream() << "rollback 4 n:" << goodVersions.size()); + + bool warn = false; + + assert( !h.commonPointOurDiskloc.isNull() ); + + dbMutex.assertWriteLocked(); + + /* we have items we are writing that aren't from a point-in-time. thus best not to come online + until we get to that point in freshness. */ + setMinValid(newMinValid); + + /** any full collection resyncs required? */ + if( !h.collectionsToResync.empty() ) { + for( set<string>::iterator i = h.collectionsToResync.begin(); i != h.collectionsToResync.end(); i++ ) { + string ns = *i; + sethbmsg(str::stream() << "rollback 4.1 coll resync " << ns); + Client::Context c(*i); + try { + bob res; + string errmsg; + dropCollection(ns, errmsg, res); + { + dbtemprelease r; + bool ok = copyCollectionFromRemote(them->getServerAddress(), ns, bo(), errmsg, false); + if( !ok ) { + log() << "replSet rollback error resyncing collection " << ns << ' ' << errmsg << rsLog; + throw "rollback error resyncing rollection [1]"; + } + } + } + catch(...) { + log() << "replset rollback error resyncing collection " << ns << rsLog; + throw "rollback error resyncing rollection [2]"; + } + } + + /* we did more reading from primary, so check it again for a rollback (which would mess us up), and + make minValid newer. + */ + sethbmsg("rollback 4.2"); + { + string err; + try { + newMinValid = r.getLastOp(rsoplog); + if( newMinValid.isEmpty() ) { + err = "can't get minvalid from primary"; + } + else { + setMinValid(newMinValid); + } + } + catch(...) { + err = "can't get/set minvalid"; + } + if( h.rbid != getRBID(r.conn()) ) { + // our source rolled back itself. 
so the data we received isn't necessarily consistent. + // however, we've now done writes. thus we have a problem. + err += "rbid at primary changed during resync/rollback"; + } + if( !err.empty() ) { + log() << "replSet error rolling back : " << err << ". A full resync will be necessary." << rsLog; + /* todo: reset minvalid so that we are permanently in fatal state */ + /* todo: don't be fatal, but rather, get all the data first. */ + sethbmsg("rollback error"); + throw rsfatal(); + } + } + sethbmsg("rollback 4.3"); + } + + sethbmsg("rollback 4.6"); + /** drop collections to drop before doing individual fixups - that might make things faster below actually if there were subsequent inserts to rollback */ + for( set<string>::iterator i = h.toDrop.begin(); i != h.toDrop.end(); i++ ) { + Client::Context c(*i); + try { + bob res; + string errmsg; + log(1) << "replSet rollback drop: " << *i << rsLog; + dropCollection(*i, errmsg, res); + } + catch(...) { + log() << "replset rollback error dropping collection " << *i << rsLog; + } + } + + sethbmsg("rollback 4.7"); + Client::Context c(rsoplog); + NamespaceDetails *oplogDetails = nsdetails(rsoplog); + uassert(13423, str::stream() << "replSet error in rollback can't find " << rsoplog, oplogDetails); + + map<string,shared_ptr<RemoveSaver> > removeSavers; + + unsigned deletes = 0, updates = 0; + for( list<pair<DocID,bo> >::iterator i = goodVersions.begin(); i != goodVersions.end(); i++ ) { + const DocID& d = i->first; + bo pattern = d._id.wrap(); // { _id : ... } + try { + assert( d.ns && *d.ns ); + if( h.collectionsToResync.count(d.ns) ) { + /* we just synced this entire collection */ + continue; + } + + getDur().commitIfNeeded(); + + /* keep an archive of items rolled back */ + shared_ptr<RemoveSaver>& rs = removeSavers[d.ns]; + if ( ! rs ) + rs.reset( new RemoveSaver( "rollback" , "" , d.ns ) ); + + // todo: lots of overhead in context, this can be faster + Client::Context c(d.ns); + if( i->second.isEmpty() ) { + // wasn't on the primary; delete. + /* TODO1.6 : can't delete from a capped collection. need to handle that here. */ + deletes++; + + NamespaceDetails *nsd = nsdetails(d.ns); + if( nsd ) { + if( nsd->capped ) { + /* can't delete from a capped collection - so we truncate instead. if this item must go, + so must all successors!!! */ + try { + /** todo: IIRC cappedTrunateAfter does not handle completely empty. todo. */ + // this will crazy slow if no _id index. + long long start = Listener::getElapsedTimeMillis(); + DiskLoc loc = Helpers::findOne(d.ns, pattern, false); + if( Listener::getElapsedTimeMillis() - start > 200 ) + log() << "replSet warning roll back slow no _id index for " << d.ns << " perhaps?" << rsLog; + //would be faster but requires index: DiskLoc loc = Helpers::findById(nsd, pattern); + if( !loc.isNull() ) { + try { + nsd->cappedTruncateAfter(d.ns, loc, true); + } + catch(DBException& e) { + if( e.getCode() == 13415 ) { + // hack: need to just make cappedTruncate do this... + nsd->emptyCappedCollection(d.ns); + } + else { + throw; + } + } + } + } + catch(DBException& e) { + log() << "replSet error rolling back capped collection rec " << d.ns << ' ' << e.toString() << rsLog; + } + } + else { + try { + deletes++; + deleteObjects(d.ns, pattern, /*justone*/true, /*logop*/false, /*god*/true, rs.get() ); + } + catch(...) { + log() << "replSet error rollback delete failed ns:" << d.ns << rsLog; + } + } + // did we just empty the collection? if so let's check if it even exists on the source. 
+ if( nsd->stats.nrecords == 0 ) { + try { + string sys = cc().database()->name + ".system.namespaces"; + bo o = them->findOne(sys, QUERY("name"<<d.ns)); + if( o.isEmpty() ) { + // we should drop + try { + bob res; + string errmsg; + dropCollection(d.ns, errmsg, res); + } + catch(...) { + log() << "replset error rolling back collection " << d.ns << rsLog; + } + } + } + catch(DBException& ) { + /* this isn't *that* big a deal, but is bad. */ + log() << "replSet warning rollback error querying for existence of " << d.ns << " at the primary, ignoring" << rsLog; + } + } + } + } + else { + // todo faster... + OpDebug debug; + updates++; + _updateObjects(/*god*/true, d.ns, i->second, pattern, /*upsert=*/true, /*multi=*/false , /*logtheop=*/false , debug, rs.get() ); + } + } + catch(DBException& e) { + log() << "replSet exception in rollback ns:" << d.ns << ' ' << pattern.toString() << ' ' << e.toString() << " ndeletes:" << deletes << rsLog; + warn = true; + } + } + + removeSavers.clear(); // this effectively closes all of them + + sethbmsg(str::stream() << "rollback 5 d:" << deletes << " u:" << updates); + MemoryMappedFile::flushAll(true); + sethbmsg("rollback 6"); + + // clean up oplog + log(2) << "replSet rollback truncate oplog after " << h.commonPoint.toStringPretty() << rsLog; + // todo: fatal error if this throws? + oplogDetails->cappedTruncateAfter(rsoplog, h.commonPointOurDiskloc, false); + + /* reset cached lastoptimewritten and h value */ + loadLastOpTimeWritten(); + + sethbmsg("rollback 7"); + MemoryMappedFile::flushAll(true); + + // done + if( warn ) + sethbmsg("issues during syncRollback, see log"); + else + sethbmsg("rollback done"); + } + + void ReplSetImpl::syncRollback(OplogReader&r) { unsigned s = _syncRollback(r); - if( s ) + if( s ) sleepsecs(s); } - unsigned ReplSetImpl::_syncRollback(OplogReader&r) { + unsigned ReplSetImpl::_syncRollback(OplogReader&r) { assert( !lockedByMe() ); assert( !dbMutex.atLeastReadLocked() ); @@ -604,7 +609,7 @@ namespace mongo { if( box.getState().secondary() ) { /* by doing this, we will not service reads (return an error as we aren't in secondary staate. - that perhaps is moot becasue of the write lock above, but that write lock probably gets deferred + that perhaps is moot becasue of the write lock above, but that write lock probably gets deferred or removed or yielded later anyway. also, this is better for status reporting - we know what is happening. @@ -618,7 +623,7 @@ namespace mongo { r.resetCursor(); /*DBClientConnection us(false, 0, 0); string errmsg; - if( !us.connect(HostAndPort::me().toString(),errmsg) ) { + if( !us.connect(HostAndPort::me().toString(),errmsg) ) { sethbmsg("rollback connect to self failure" + errmsg); return; }*/ @@ -627,15 +632,15 @@ namespace mongo { try { syncRollbackFindCommonPoint(r.conn(), how); } - catch( const char *p ) { + catch( const char *p ) { sethbmsg(string("rollback 2 error ") + p); return 10; } - catch( rsfatal& ) { + catch( rsfatal& ) { _fatal(); return 2; } - catch( DBException& e ) { + catch( DBException& e ) { sethbmsg(string("rollback 2 exception ") + e.toString() + "; sleeping 1 min"); dbtemprelease r; sleepsecs(60); @@ -647,20 +652,20 @@ namespace mongo { { incRBID(); - try { + try { syncFixUp(how, r); } - catch( rsfatal& ) { + catch( rsfatal& ) { sethbmsg("rollback fixup error"); _fatal(); return 2; } - catch(...) { + catch(...) 
{ incRBID(); throw; } incRBID(); - /* success - leave "ROLLBACK" state + /* success - leave "ROLLBACK" state can go to SECONDARY once minvalid is achieved */ box.change(MemberState::RS_RECOVERING, _self); diff --git a/db/repl/rs_sync.cpp b/db/repl/rs_sync.cpp index 9de3f60..8d06fcc 100644 --- a/db/repl/rs_sync.cpp +++ b/db/repl/rs_sync.cpp @@ -19,30 +19,21 @@ #include "../../client/dbclient.h" #include "rs.h" #include "../repl.h" - +#include "connections.h" namespace mongo { using namespace bson; - extern unsigned replSetForceInitialSyncFailure; - void startSyncThread() { - Client::initThread("rs_sync"); - cc().iAmSyncThread(); - theReplSet->syncThread(); - cc().shutdown(); - } - + /* apply the log op that is in param o */ void ReplSetImpl::syncApply(const BSONObj &o) { - //const char *op = o.getStringField("op"); - - char db[MaxDatabaseLen]; + char db[MaxDatabaseNameLen]; const char *ns = o.getStringField("ns"); nsToDatabase(ns, db); if ( *ns == '.' || *ns == 0 ) { - if( *o.getStringField("op") == 'n' ) - return; + if( *o.getStringField("op") == 'n' ) + return; log() << "replSet skipping bad op in oplog: " << o.toString() << endl; return; } @@ -54,19 +45,21 @@ namespace mongo { applyOperation_inlock(o); } + /* initial oplog application, during initial sync, after cloning. + @return false on failure. + this method returns an error and doesn't throw exceptions (i think). + */ bool ReplSetImpl::initialSyncOplogApplication( - string hn, - const Member *primary, + const Member *source, OpTime applyGTE, - OpTime minValid) - { - if( primary == 0 ) return false; + OpTime minValid) { + if( source == 0 ) return false; - OpTime ts; + const string hn = source->h().toString(); + OplogReader r; try { - OplogReader r; - if( !r.connect(hn) ) { - log(2) << "replSet can't connect to " << hn << " to read operations" << rsLog; + if( !r.connect(hn) ) { + log() << "replSet initial sync error can't connect to " << hn << " to read " << rsoplog << rsLog; return false; } @@ -80,48 +73,63 @@ namespace mongo { } assert( r.haveCursor() ); - /* we lock outside the loop to avoid the overhead of locking on every operation. server isn't usable yet anyway! */ - writelock lk(""); - { - if( !r.more() ) { + if( !r.more() ) { sethbmsg("replSet initial sync error reading remote oplog"); + log() << "replSet initial sync error remote oplog (" << rsoplog << ") on host " << hn << " is empty?" << rsLog; return false; } bo op = r.next(); OpTime t = op["ts"]._opTime(); r.putBack(op); - assert( !t.isNull() ); + + if( op.firstElement().fieldName() == string("$err") ) { + log() << "replSet initial sync error querying " << rsoplog << " on " << hn << " : " << op.toString() << rsLog; + return false; + } + + uassert( 13508 , str::stream() << "no 'ts' in first op in oplog: " << op , !t.isNull() ); if( t > applyGTE ) { sethbmsg(str::stream() << "error " << hn << " oplog wrapped during initial sync"); + log() << "replSet initial sync expected first optime of " << applyGTE << rsLog; + log() << "replSet initial sync but received a first optime of " << t << " from " << hn << rsLog; return false; } } + } + catch(DBException& e) { + log() << "replSet initial sync failing: " << e.toString() << rsLog; + return false; + } - // todo : use exhaust - unsigned long long n = 0; - while( 1 ) { + /* we lock outside the loop to avoid the overhead of locking on every operation. 
*/ + writelock lk(""); + // todo : use exhaust + OpTime ts; + unsigned long long n = 0; + while( 1 ) { + try { if( !r.more() ) break; BSONObj o = r.nextSafe(); /* note we might get "not master" at some point */ { - //writelock lk(""); - ts = o["ts"]._opTime(); /* if we have become primary, we dont' want to apply things from elsewhere - anymore. assumePrimary is in the db lock so we are safe as long as + anymore. assumePrimary is in the db lock so we are safe as long as we check after we locked above. */ - const Member *p1 = box.getPrimary(); - if( p1 != primary || replSetForceInitialSyncFailure ) { + if( (source->state() != MemberState::RS_PRIMARY && + source->state() != MemberState::RS_SECONDARY) || + replSetForceInitialSyncFailure ) { + int f = replSetForceInitialSyncFailure; if( f > 0 ) { replSetForceInitialSyncFailure = f-1; log() << "replSet test code invoked, replSetForceInitialSyncFailure" << rsLog; + throw DBException("forced error",0); } - log() << "replSet primary was:" << primary->fullName() << " now:" << - (p1 != 0 ? p1->fullName() : "none") << rsLog; + log() << "replSet we are now primary" << rsLog; throw DBException("primary changed",0); } @@ -131,38 +139,48 @@ namespace mongo { } _logOpObjRS(o); /* with repl sets we write the ops to our oplog too */ } - if( ++n % 100000 == 0 ) { + if( ++n % 100000 == 0 ) { // simple progress metering log() << "replSet initialSyncOplogApplication " << n << rsLog; } + + getDur().commitIfNeeded(); } - } - catch(DBException& e) { - if( ts <= minValid ) { - // didn't make it far enough - log() << "replSet initial sync failing, error applying oplog " << e.toString() << rsLog; - return false; + catch (DBException& e) { + if( e.getCode() == 11000 || e.getCode() == 11001 ) { + // skip duplicate key exceptions + continue; + } + + if( ts <= minValid ) { + // didn't make it far enough + log() << "replSet initial sync failing, error applying oplog " << e.toString() << rsLog; + return false; + } + + // otherwise, whatever + break; } } return true; } - /* should be in RECOVERING state on arrival here. + /* should be in RECOVERING state on arrival here. readlocks @return true if transitioned to SECONDARY */ - bool ReplSetImpl::tryToGoLiveAsASecondary(OpTime& /*out*/ minvalid) { - bool golive = false; + bool ReplSetImpl::tryToGoLiveAsASecondary(OpTime& /*out*/ minvalid) { + bool golive = false; { readlock lk("local.replset.minvalid"); BSONObj mv; - if( Helpers::getSingleton("local.replset.minvalid", mv) ) { + if( Helpers::getSingleton("local.replset.minvalid", mv) ) { minvalid = mv["ts"]._opTime(); - if( minvalid <= lastOpTimeWritten ) { + if( minvalid <= lastOpTimeWritten ) { golive=true; } } - else + else golive = true; /* must have been the original member */ } if( golive ) { @@ -172,44 +190,104 @@ namespace mongo { return golive; } - /* tail the primary's oplog. ok to return, will be re-called. */ - void ReplSetImpl::syncTail() { - // todo : locking vis a vis the mgr... + /** + * Checks if the oplog given is too far ahead to read from. + * + * @param r the oplog + * @param hn the hostname (for log messages) + * + * @return if we are stale compared to the oplog on hn + */ + bool ReplSetImpl::_isStale(OplogReader& r, const string& hn) { + BSONObj remoteOldestOp = r.findOne(rsoplog, Query()); + OpTime ts = remoteOldestOp["ts"]._opTime(); + DEV log() << "replSet remoteOldestOp: " << ts.toStringLong() << rsLog; + else log(3) << "replSet remoteOldestOp: " << ts.toStringLong() << rsLog; + DEV { + // debugging sync1.js... 
+ log() << "replSet lastOpTimeWritten: " << lastOpTimeWritten.toStringLong() << rsLog; + log() << "replSet our state: " << state().toString() << rsLog; + } + if( lastOpTimeWritten < ts ) { + log() << "replSet error RS102 too stale to catch up, at least from " << hn << rsLog; + log() << "replSet our last optime : " << lastOpTimeWritten.toStringLong() << rsLog; + log() << "replSet oldest at " << hn << " : " << ts.toStringLong() << rsLog; + log() << "replSet See http://www.mongodb.org/display/DOCS/Resyncing+a+Very+Stale+Replica+Set+Member" << rsLog; + sethbmsg("error RS102 too stale to catch up"); + changeState(MemberState::RS_RECOVERING); + sleepsecs(120); + return true; + } + return false; + } - const Member *primary = box.getPrimary(); - if( primary == 0 ) return; - string hn = primary->h().toString(); - OplogReader r; - if( !r.connect(primary->h().toString()) ) { + /** + * Tries to connect the oplog reader to a potential sync source. If + * successful, it checks that we are not stale compared to this source. + * + * @param r reader to populate + * @param hn hostname to try + * + * @return if both checks pass, it returns true, otherwise false. + */ + bool ReplSetImpl::_getOplogReader(OplogReader& r, string& hn) { + assert(r.conn() == 0); + + if( !r.connect(hn) ) { log(2) << "replSet can't connect to " << hn << " to read operations" << rsLog; - return; + r.resetConnection(); + return false; + } + if( _isStale(r, hn)) { + r.resetConnection(); + return false; } + return true; + } - /* first make sure we are not hopelessly out of sync by being very stale. */ - { - BSONObj remoteOldestOp = r.findOne(rsoplog, Query()); - OpTime ts = remoteOldestOp["ts"]._opTime(); - DEV log() << "replSet remoteOldestOp: " << ts.toStringLong() << rsLog; - else log(3) << "replSet remoteOldestOp: " << ts.toStringLong() << rsLog; - DEV { - // debugging sync1.js... - log() << "replSet lastOpTimeWritten: " << lastOpTimeWritten.toStringLong() << rsLog; - log() << "replSet our state: " << state().toString() << rsLog; + /* tail an oplog. ok to return, will be re-called. */ + void ReplSetImpl::syncTail() { + // todo : locking vis a vis the mgr... + OplogReader r; + string hn; + + const Member *target = box.getPrimary(); + if (target != 0) { + hn = target->h().toString(); + if (!_getOplogReader(r, hn)) { + // we might be stale wrt the primary, but could still sync from + // a secondary + target = 0; + } + } + + // if we cannot reach the master but someone else is more up-to-date + // than we are, sync from them. 
+ if( target == 0 ) { + for(Member *m = head(); m; m=m->next()) { + hn = m->h().toString(); + if (m->hbinfo().up() && m->state().readable() && + (m->hbinfo().opTime > lastOpTimeWritten) && + m->config().slaveDelay == 0 && + _getOplogReader(r, hn)) { + target = m; + break; + } } - if( lastOpTimeWritten < ts ) { - log() << "replSet error RS102 too stale to catch up, at least from primary: " << hn << rsLog; - log() << "replSet our last optime : " << lastOpTimeWritten.toStringLong() << rsLog; - log() << "replSet oldest at " << hn << " : " << ts.toStringLong() << rsLog; - log() << "replSet See http://www.mongodb.org/display/DOCS/Resyncing+a+Very+Stale+Replica+Set+Member" << rsLog; - sethbmsg("error RS102 too stale to catch up"); - sleepsecs(120); + + // no server found + if (target == 0) { + // if there is no one to sync from + OpTime minvalid; + tryToGoLiveAsASecondary(minvalid); return; } } r.tailingQueryGTE(rsoplog, lastOpTimeWritten); assert( r.haveCursor() ); - assert( r.awaitCapable() ); + + uassert(1000, "replSet source for syncing doesn't seem to be await capable -- is it an older version of mongodb?", r.awaitCapable() ); { if( !r.more() ) { @@ -222,7 +300,7 @@ namespace mongo { return; } OpTime theirTS = theirLastOp["ts"]._opTime(); - if( theirTS < lastOpTimeWritten ) { + if( theirTS < lastOpTimeWritten ) { log() << "replSet we are ahead of the primary, will try to roll back" << rsLog; syncRollback(r); return; @@ -231,7 +309,7 @@ namespace mongo { log() << "replSet syncTail condition 1" << rsLog; sleepsecs(1); } - catch(DBException& e) { + catch(DBException& e) { log() << "replSet error querying " << hn << ' ' << e.toString() << rsLog; sleepsecs(2); } @@ -249,12 +327,9 @@ namespace mongo { BSONObj o = r.nextSafe(); OpTime ts = o["ts"]._opTime(); long long h = o["h"].numberLong(); - if( ts != lastOpTimeWritten || h != lastH ) { - log(1) << "TEMP our last op time written: " << lastOpTimeWritten.toStringPretty() << endl; - log(1) << "TEMP primary's GTE: " << ts.toStringPretty() << endl; - /* - }*/ - + if( ts != lastOpTimeWritten || h != lastH ) { + log() << "replSet our last op time written: " << lastOpTimeWritten.toStringPretty() << endl; + log() << "replset source's GTE: " << ts.toStringPretty() << endl; syncRollback(r); return; } @@ -268,49 +343,45 @@ namespace mongo { while( 1 ) { while( 1 ) { - if( !r.moreInCurrentBatch() ) { - /* we need to occasionally check some things. between + if( !r.moreInCurrentBatch() ) { + /* we need to occasionally check some things. between batches is probably a good time. */ /* perhaps we should check this earlier? but not before the rollback checks. */ - if( state().recovering() ) { + if( state().recovering() ) { /* can we go to RS_SECONDARY state? 
we can if not too old and if minvalid achieved */ OpTime minvalid; bool golive = ReplSetImpl::tryToGoLiveAsASecondary(minvalid); if( golive ) { ; } - else { + else { sethbmsg(str::stream() << "still syncing, not yet to minValid optime" << minvalid.toString()); } /* todo: too stale capability */ } - if( box.getPrimary() != primary ) - return; + { + const Member *primary = box.getPrimary(); + + if( !target->hbinfo().hbstate.readable() || + // if we are not syncing from the primary, return (if + // it's up) so that we can try accessing it again + (target != primary && primary != 0)) { + return; + } + } } if( !r.more() ) break; - { + { BSONObj o = r.nextSafe(); /* note we might get "not master" at some point */ - { - writelock lk(""); - /* if we have become primary, we dont' want to apply things from elsewhere - anymore. assumePrimary is in the db lock so we are safe as long as - we check after we locked above. */ - if( box.getPrimary() != primary ) { - if( box.getState().primary() ) - log(0) << "replSet stopping syncTail we are now primary" << rsLog; - return; - } - - syncApply(o); - _logOpObjRS(o); /* with repl sets we write the ops to our oplog too: */ - } int sd = myConfig().slaveDelay; - if( sd ) { + // ignore slaveDelay if the box is still initializing. once + // it becomes secondary we can worry about it. + if( sd && box.getState().secondary() ) { const OpTime ts = o["ts"]._opTime(); long long a = ts.getSecs(); long long b = time(0); @@ -329,13 +400,30 @@ namespace mongo { sleepsecs(6); if( time(0) >= waitUntil ) break; - if( box.getPrimary() != primary ) + if( !target->hbinfo().hbstate.readable() ) { break; + } if( myConfig().slaveDelay != sd ) // reconf break; } } } + + } + + { + writelock lk(""); + + /* if we have become primary, we dont' want to apply things from elsewhere + anymore. assumePrimary is in the db lock so we are safe as long as + we check after we locked above. */ + if( box.getState().primary() ) { + log(0) << "replSet stopping syncTail we are now primary" << rsLog; + return; + } + + syncApply(o); + _logOpObjRS(o); /* with repl sets we write the ops to our oplog too: */ } } } @@ -345,8 +433,9 @@ namespace mongo { // TODO : reuse our connection to the primary. return; } - if( box.getPrimary() != primary ) + if( !target->hbinfo().hbstate.readable() ) { return; + } // looping back is ok because this is a tailable cursor } } @@ -357,15 +446,11 @@ namespace mongo { sleepsecs(1); return; } - if( sp.state.fatal() ) { + if( sp.state.fatal() ) { sleepsecs(5); return; } - /* later, we can sync from up secondaries if we want. tbd. */ - if( sp.primary == 0 ) - return; - /* do we have anything at all? */ if( lastOpTimeWritten.isNull() ) { syncDoInitialSync(); @@ -377,23 +462,64 @@ namespace mongo { } void ReplSetImpl::syncThread() { - if( myConfig().arbiterOnly ) - return; - while( 1 ) { + /* test here was to force a receive timeout + ScopedConn c("localhost"); + bo info; + try { + log() << "this is temp" << endl; + c.runCommand("admin", BSON("sleep"<<120), info); + log() << info.toString() << endl; + c.runCommand("admin", BSON("sleep"<<120), info); + log() << "temp" << endl; + } + catch( DBException& e ) { + log() << e.toString() << endl; + c.runCommand("admin", BSON("sleep"<<120), info); + log() << "temp" << endl; + } + */ + + while( 1 ) { + if( myConfig().arbiterOnly ) + return; + try { _syncThread(); } - catch(DBException& e) { + catch(DBException& e) { sethbmsg("syncThread: " + e.toString()); sleepsecs(10); } - catch(...) { + catch(...) 
{ sethbmsg("unexpected exception in syncThread()"); - // TODO : SET NOT SECONDARY here. + // TODO : SET NOT SECONDARY here? sleepsecs(60); } sleepsecs(1); + + /* normally msgCheckNewState gets called periodically, but in a single node repl set there + are no heartbeat threads, so we do it here to be sure. this is relevant if the singleton + member has done a stepDown() and needs to come back up. + */ + OCCASIONALLY mgr->send( boost::bind(&Manager::msgCheckNewState, theReplSet->mgr) ); + } + } + + void startSyncThread() { + static int n; + if( n != 0 ) { + log() << "replSet ERROR : more than one sync thread?" << rsLog; + assert( n == 0 ); + } + n++; + + Client::initThread("replica set sync"); + cc().iAmSyncThread(); + if (!noauth) { + cc().getAuthenticationInfo()->authorize("local"); } + theReplSet->syncThread(); + cc().shutdown(); } } |