diff options
author | Antonin Kral <a.kral@bobek.cz> | 2011-09-14 17:08:06 +0200 |
---|---|---|
committer | Antonin Kral <a.kral@bobek.cz> | 2011-09-14 17:08:06 +0200 |
commit | 5d342a758c6095b4d30aba0750b54f13b8916f51 (patch) | |
tree | 762e9aa84781f5e3b96db2c02d356c29cf0217c0 /db/repl | |
parent | cbe2d992e9cd1ea66af9fa91df006106775d3073 (diff) | |
download | mongodb-5d342a758c6095b4d30aba0750b54f13b8916f51.tar.gz |
Imported Upstream version 2.0.0
Diffstat (limited to 'db/repl')
-rw-r--r-- | db/repl/connections.h | 15 | ||||
-rw-r--r-- | db/repl/consensus.cpp | 118 | ||||
-rw-r--r-- | db/repl/health.cpp | 55 | ||||
-rw-r--r-- | db/repl/health.h | 14 | ||||
-rw-r--r-- | db/repl/heartbeat.cpp | 89 | ||||
-rw-r--r-- | db/repl/manager.cpp | 62 | ||||
-rw-r--r-- | db/repl/multicmd.h | 6 | ||||
-rw-r--r-- | db/repl/replset_commands.cpp | 97 | ||||
-rw-r--r-- | db/repl/rs.cpp | 270 | ||||
-rw-r--r-- | db/repl/rs.h | 256 | ||||
-rw-r--r-- | db/repl/rs_config.cpp | 348 | ||||
-rw-r--r-- | db/repl/rs_config.h | 144 | ||||
-rw-r--r-- | db/repl/rs_initialsync.cpp | 135 | ||||
-rw-r--r-- | db/repl/rs_initiate.cpp | 41 | ||||
-rw-r--r-- | db/repl/rs_member.h | 19 | ||||
-rw-r--r-- | db/repl/rs_rollback.cpp | 28 | ||||
-rw-r--r-- | db/repl/rs_sync.cpp | 341 |
17 files changed, 1440 insertions, 598 deletions
diff --git a/db/repl/connections.h b/db/repl/connections.h index 7e7bfe5..78cfb30 100644 --- a/db/repl/connections.h +++ b/db/repl/connections.h @@ -20,7 +20,7 @@ #include <map> #include "../../client/dbclient.h" -#include "../security_key.h" +#include "../security_common.h" namespace mongo { @@ -44,13 +44,14 @@ namespace mongo { public: /** throws assertions if connect failure etc. */ ScopedConn(string hostport); - ~ScopedConn(); + ~ScopedConn() { + // conLock releases... + } /* If we were to run a query and not exhaust the cursor, future use of the connection would be problematic. So here what we do is wrapper known safe methods and not allow cursor-style queries at all. This makes ScopedConn limited in functionality but very safe. More non-cursor wrappers can be added here if needed. */ - bool runCommand(const string &dbname, const BSONObj& cmd, BSONObj &info, int options=0) { return conn()->runCommand(dbname, cmd, info, options); } @@ -108,12 +109,4 @@ namespace mongo { } } - inline ScopedConn::~ScopedConn() { - // conLock releases... - } - - /*inline DBClientConnection* ScopedConn::operator->() { - return &x->cc; - }*/ - } diff --git a/db/repl/consensus.cpp b/db/repl/consensus.cpp index dadb22e..fd18cdc 100644 --- a/db/repl/consensus.cpp +++ b/db/repl/consensus.cpp @@ -25,7 +25,49 @@ namespace mongo { public: CmdReplSetFresh() : ReplSetCommand("replSetFresh") { } private: - virtual bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) { + + bool shouldVeto(const BSONObj& cmdObj, string& errmsg) { + unsigned id = cmdObj["id"].Int(); + const Member* primary = theReplSet->box.getPrimary(); + const Member* hopeful = theReplSet->findById(id); + const Member *highestPriority = theReplSet->getMostElectable(); + + if( !hopeful ) { + errmsg = str::stream() << "replSet couldn't find member with id " << id; + return true; + } + else if( theReplSet->isPrimary() && theReplSet->lastOpTimeWritten >= hopeful->hbinfo().opTime ) { + // hbinfo is not updated, so we have to check the primary's last optime separately + errmsg = str::stream() << "I am already primary, " << hopeful->fullName() << + " can try again once I've stepped down"; + return true; + } + else if( primary && primary->hbinfo().opTime >= hopeful->hbinfo().opTime ) { + // other members might be aware of more up-to-date nodes + errmsg = str::stream() << hopeful->fullName() << " is trying to elect itself but " << + primary->fullName() << " is already primary and more up-to-date"; + return true; + } + else if( highestPriority && highestPriority->config().priority > hopeful->config().priority) { + errmsg = str::stream() << hopeful->fullName() << " has lower priority than " << highestPriority->fullName(); + return true; + } + + // don't veto older versions + if (cmdObj["id"].eoo()) { + // they won't be looking for the veto field + return false; + } + + if (!hopeful || !theReplSet->isElectable(id) || + (highestPriority && highestPriority->config().priority > hopeful->config().priority)) { + return true; + } + + return false; + } + + virtual bool run(const string& , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool fromRepl) { if( !check(errmsg, result) ) return false; @@ -43,11 +85,15 @@ namespace mongo { result.append("info", "config version stale"); weAreFresher = true; } - else if( opTime < theReplSet->lastOpTimeWritten ) { + // check not only our own optime, but any other member we can reach + else if( opTime < theReplSet->lastOpTimeWritten || + opTime < theReplSet->lastOtherOpTime()) { weAreFresher = true; } result.appendDate("opTime", theReplSet->lastOpTimeWritten.asDate()); result.append("fresher", weAreFresher); + result.append("veto", shouldVeto(cmdObj, errmsg)); + return true; } } cmdReplSetFresh; @@ -56,11 +102,9 @@ namespace mongo { public: CmdReplSetElect() : ReplSetCommand("replSetElect") { } private: - virtual bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) { + virtual bool run(const string& , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool fromRepl) { if( !check(errmsg, result) ) return false; - //task::lam f = boost::bind(&Consensus::electCmdReceived, &theReplSet->elect, cmdObj, &result); - //theReplSet->mgr->call(f); theReplSet->elect.electCmdReceived(cmdObj, &result); return true; } @@ -91,6 +135,10 @@ namespace mongo { if( dt < T ) vUp += m->config().votes; } + + // the manager will handle calling stepdown if another node should be + // primary due to priority + return !( vUp * 2 > totalVotes() ); } @@ -98,17 +146,19 @@ namespace mongo { const time_t LeaseTime = 30; + mutex Consensus::lyMutex("ly"); + unsigned Consensus::yea(unsigned memberId) { /* throws VoteException */ - Atomic<LastYea>::tran t(ly); - LastYea &ly = t.ref(); + mutex::scoped_lock lk(lyMutex); + LastYea &L = this->ly.ref(lk); time_t now = time(0); - if( ly.when + LeaseTime >= now && ly.who != memberId ) { - log(1) << "replSet not voting yea for " << memberId << - " voted for " << ly.who << ' ' << now-ly.when << " secs ago" << rsLog; + if( L.when + LeaseTime >= now && L.who != memberId ) { + LOG(1) << "replSet not voting yea for " << memberId << + " voted for " << L.who << ' ' << now-L.when << " secs ago" << rsLog; throw VoteException(); } - ly.when = now; - ly.who = memberId; + L.when = now; + L.who = memberId; return rs._self->config().votes; } @@ -116,8 +166,8 @@ namespace mongo { place instead of leaving it for a long time. */ void Consensus::electionFailed(unsigned meid) { - Atomic<LastYea>::tran t(ly); - LastYea &L = t.ref(); + mutex::scoped_lock lk(lyMutex); + LastYea &L = ly.ref(lk); DEV assert( L.who == meid ); // this may not always always hold, so be aware, but adding for now as a quick sanity test if( L.who == meid ) L.when = 0; @@ -127,7 +177,7 @@ namespace mongo { void Consensus::electCmdReceived(BSONObj cmd, BSONObjBuilder* _b) { BSONObjBuilder& b = *_b; DEV log() << "replSet received elect msg " << cmd.toString() << rsLog; - else log(2) << "replSet received elect msg " << cmd.toString() << rsLog; + else LOG(2) << "replSet received elect msg " << cmd.toString() << rsLog; string set = cmd["set"].String(); unsigned whoid = cmd["whoid"].Int(); int cfgver = cmd["cfgver"].Int(); @@ -136,22 +186,22 @@ namespace mongo { const Member* primary = rs.box.getPrimary(); const Member* hopeful = rs.findById(whoid); + const Member* highestPriority = rs.getMostElectable(); int vote = 0; if( set != rs.name() ) { log() << "replSet error received an elect request for '" << set << "' but our set name is '" << rs.name() << "'" << rsLog; - } else if( myver < cfgver ) { // we are stale. don't vote } else if( myver > cfgver ) { // they are stale! - log() << "replSet info got stale version # during election" << rsLog; + log() << "replSet electCmdReceived info got stale version # during election" << rsLog; vote = -10000; } else if( !hopeful ) { - log() << "couldn't find member with id " << whoid << rsLog; + log() << "replSet electCmdReceived couldn't find member with id " << whoid << rsLog; vote = -10000; } else if( primary && primary == rs._self && rs.lastOpTimeWritten >= hopeful->hbinfo().opTime ) { @@ -166,14 +216,19 @@ namespace mongo { primary->fullName() << " is already primary and more up-to-date" << rsLog; vote = -10000; } + else if( highestPriority && highestPriority->config().priority > hopeful->config().priority) { + log() << hopeful->fullName() << " has lower priority than " << highestPriority->fullName(); + vote = -10000; + } else { try { vote = yea(whoid); + dassert( hopeful->id() == whoid ); rs.relinquish(); - log() << "replSet info voting yea for " << whoid << rsLog; + log() << "replSet info voting yea for " << hopeful->fullName() << " (" << whoid << ')' << rsLog; } catch(VoteException&) { - log() << "replSet voting no already voted for another" << rsLog; + log() << "replSet voting no for " << hopeful->fullName() << " already voted for another" << rsLog; } } @@ -212,7 +267,8 @@ namespace mongo { "set" << rs.name() << "opTime" << Date_t(ord.asDate()) << "who" << rs._self->fullName() << - "cfgver" << rs._cfg->version ); + "cfgver" << rs._cfg->version << + "id" << rs._self->id()); list<Target> L; int ver; /* the following queries arbiters, even though they are never fresh. wonder if that makes sense. @@ -228,19 +284,33 @@ namespace mongo { for( list<Target>::iterator i = L.begin(); i != L.end(); i++ ) { if( i->ok ) { nok++; - if( i->result["fresher"].trueValue() ) + if( i->result["fresher"].trueValue() ) { + log() << "not electing self, we are not freshest" << rsLog; return false; + } OpTime remoteOrd( i->result["opTime"].Date() ); if( remoteOrd == ord ) nTies++; assert( remoteOrd <= ord ); + + if( i->result["veto"].trueValue() ) { + BSONElement msg = i->result["errmsg"]; + if (!msg.eoo()) { + log() << "not electing self, " << i->toHost << " would veto with '" << + msg.String() << "'" << rsLog; + } + else { + log() << "not electing self, " << i->toHost << " would veto" << rsLog; + } + return false; + } } else { DEV log() << "replSet freshest returns " << i->result.toString() << rsLog; allUp = false; } } - log(1) << "replSet dev we are freshest of up nodes, nok:" << nok << " nTies:" << nTies << rsLog; + LOG(1) << "replSet dev we are freshest of up nodes, nok:" << nok << " nTies:" << nTies << rsLog; assert( ord <= theReplSet->lastOpTimeWritten ); // <= as this may change while we are working... return true; } @@ -267,7 +337,6 @@ namespace mongo { bool allUp; int nTies; if( !weAreFreshest(allUp, nTies) ) { - log() << "replSet info not electing self, we are not freshest" << rsLog; return; } @@ -324,7 +393,6 @@ namespace mongo { multiCommand(electCmd, L); { - RSBase::lock lk(&rs); for( list<Target>::iterator i = L.begin(); i != L.end(); i++ ) { DEV log() << "replSet elect res: " << i->result.toString() << rsLog; if( i->ok ) { diff --git a/db/repl/health.cpp b/db/repl/health.cpp index 762ca90..711b457 100644 --- a/db/repl/health.cpp +++ b/db/repl/health.cpp @@ -32,7 +32,6 @@ #include "../dbhelpers.h" namespace mongo { - /* decls for connections.h */ ScopedConn::M& ScopedConn::_map = *(new ScopedConn::M()); mutex ScopedConn::mapMutex("ScopedConn::mapMutex"); @@ -43,9 +42,9 @@ namespace mongo { using namespace mongoutils::html; using namespace bson; - static RamLog _rsLog; - Tee *rsLog = &_rsLog; - extern bool replSetBlind; + static RamLog * _rsLog = new RamLog( "rs" ); + Tee *rsLog = _rsLog; + extern bool replSetBlind; // for testing string ago(time_t t) { if( t == 0 ) return ""; @@ -126,19 +125,6 @@ namespace mongo { return ""; } - string MemberState::toString() const { - if( s == MemberState::RS_STARTUP ) return "STARTUP"; - if( s == MemberState::RS_PRIMARY ) return "PRIMARY"; - if( s == MemberState::RS_SECONDARY ) return "SECONDARY"; - if( s == MemberState::RS_RECOVERING ) return "RECOVERING"; - if( s == MemberState::RS_FATAL ) return "FATAL"; - if( s == MemberState::RS_STARTUP2 ) return "STARTUP2"; - if( s == MemberState::RS_ARBITER ) return "ARBITER"; - if( s == MemberState::RS_DOWN ) return "DOWN"; - if( s == MemberState::RS_ROLLBACK ) return "ROLLBACK"; - return ""; - } - extern time_t started; // oplogdiags in web ui @@ -208,8 +194,8 @@ namespace mongo { ss << "<style type=\"text/css\" media=\"screen\">" "table { font-size:75% }\n" -// "th { background-color:#bbb; color:#000 }\n" -// "td,th { padding:.25em }\n" + // "th { background-color:#bbb; color:#000 }\n" + // "td,th { padding:.25em }\n" "</style>\n"; ss << table(h, true); @@ -306,6 +292,8 @@ namespace mongo { myMinValid = "exception fetching minvalid"; } + const Member *_self = this->_self; + assert(_self); { stringstream s; /* self row */ @@ -340,20 +328,40 @@ namespace mongo { void fillRsLog(stringstream& s) { - _rsLog.toHTML( s ); + _rsLog->toHTML( s ); } const Member* ReplSetImpl::findById(unsigned id) const { - if( id == _self->id() ) return _self; + if( _self && id == _self->id() ) return _self; + for( Member *m = head(); m; m = m->next() ) if( m->id() == id ) return m; return 0; } + + const OpTime ReplSetImpl::lastOtherOpTime() const { + OpTime closest(0,0); + + for( Member *m = _members.head(); m; m=m->next() ) { + if (!m->hbinfo().up()) { + continue; + } + + if (m->hbinfo().opTime > closest) { + closest = m->hbinfo().opTime; + } + } + + return closest; + } void ReplSetImpl::_summarizeStatus(BSONObjBuilder& b) const { vector<BSONObj> v; + const Member *_self = this->_self; + assert( _self ); + // add self { BSONObjBuilder bb; @@ -390,6 +398,7 @@ namespace mongo { bb.appendTimestamp("optime", m->hbinfo().opTime.asDate()); bb.appendDate("optimeDate", m->hbinfo().opTime.getSecs() * 1000LL); bb.appendTimeT("lastHeartbeat", m->hbinfo().lastHeartbeat); + bb.append("pingMs", m->hbinfo().ping); string s = m->lhb(); if( !s.empty() ) bb.append("errmsg", s); @@ -400,6 +409,10 @@ namespace mongo { b.append("set", name()); b.appendTimeT("date", time(0)); b.append("myState", box.getState().s); + const Member *syncTarget = _currentSyncTarget; + if (syncTarget) { + b.append("syncingTo", syncTarget->fullName()); + } b.append("members", v); if( replSetBlind ) b.append("blind",true); // to avoid confusion if set...normally never set except for testing. diff --git a/db/repl/health.h b/db/repl/health.h index a32db00..55cca93 100644 --- a/db/repl/health.h +++ b/db/repl/health.h @@ -24,11 +24,11 @@ namespace mongo { bool requestHeartbeat(string setname, string fromHost, string memberFullName, BSONObj& result, int myConfigVersion, int& theirConfigVersion, bool checkEmpty = false); struct HealthOptions { - HealthOptions() { - heartbeatSleepMillis = 2000; - heartbeatTimeoutMillis = 10000; - heartbeatConnRetries = 2; - } + HealthOptions() : + heartbeatSleepMillis(2000), + heartbeatTimeoutMillis( 10000 ), + heartbeatConnRetries(2) + { } bool isDefault() const { return *this == HealthOptions(); } @@ -43,8 +43,8 @@ namespace mongo { } bool operator==(const HealthOptions& r) const { - return heartbeatSleepMillis==r.heartbeatSleepMillis && heartbeatTimeoutMillis==r.heartbeatTimeoutMillis && heartbeatConnRetries==heartbeatConnRetries; + return heartbeatSleepMillis==r.heartbeatSleepMillis && heartbeatTimeoutMillis==r.heartbeatTimeoutMillis && heartbeatConnRetries==r.heartbeatConnRetries; } }; - + } diff --git a/db/repl/heartbeat.cpp b/db/repl/heartbeat.cpp index 3972466..7d3f78c 100644 --- a/db/repl/heartbeat.cpp +++ b/db/repl/heartbeat.cpp @@ -30,15 +30,16 @@ #include "connections.h" #include "../../util/unittest.h" #include "../instance.h" +#include "../repl.h" namespace mongo { using namespace bson; extern bool replSetBlind; + extern ReplSettings replSettings; - // hacky - string *discoveredSeed = 0; + unsigned int HeartbeatInfo::numPings; long long HeartbeatInfo::timeDown() const { if( up() ) return 0; @@ -52,7 +53,7 @@ namespace mongo { public: virtual bool adminOnly() const { return false; } CmdReplSetHeartbeat() : ReplSetCommand("replSetHeartbeat") { } - virtual bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) { + virtual bool run(const string& , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool fromRepl) { if( replSetBlind ) return false; @@ -63,9 +64,13 @@ namespace mongo { return false; } + if (!checkAuth(errmsg, result)) { + return false; + } + /* we want to keep heartbeat connections open when relinquishing primary. tag them here. */ { - MessagingPort *mp = cc().port(); + AbstractMessagingPort *mp = cc().port(); if( mp ) mp->tag |= 1; } @@ -78,8 +83,8 @@ namespace mongo { string s = string(cmdObj.getStringField("replSetHeartbeat")); if( cmdLine.ourSetName() != s ) { errmsg = "repl set names do not match"; - log() << "cmdline: " << cmdLine._replSet << endl; - log() << "s: " << s << endl; + log() << "replSet set names do not match, our cmdline: " << cmdLine._replSet << rsLog; + log() << "replSet s: " << s << rsLog; result.append("mismatch", true); return false; } @@ -91,8 +96,8 @@ namespace mongo { } if( theReplSet == 0 ) { string from( cmdObj.getStringField("from") ); - if( !from.empty() && discoveredSeed == 0 ) { - discoveredSeed = new string(from); + if( !from.empty() ) { + replSettings.discoveredSeeds.insert(from); } errmsg = "still initializing"; return false; @@ -105,6 +110,7 @@ namespace mongo { } result.append("set", theReplSet->name()); result.append("state", theReplSet->state().s); + result.append("e", theReplSet->iAmElectable()); result.append("hbmsg", theReplSet->hbmsg()); result.append("time", (long long) time(0)); result.appendDate("opTime", theReplSet->lastOpTimeWritten.asDate()); @@ -144,10 +150,10 @@ namespace mongo { public: ReplSetHealthPollTask(const HostAndPort& hh, const HeartbeatInfo& mm) : h(hh), m(mm) { } - string name() const { return "ReplSetHealthPollTask"; } + string name() const { return "rsHealthPoll"; } void doWork() { if ( !theReplSet ) { - log(2) << "theReplSet not initialized yet, skipping health poll this round" << rsLog; + LOG(2) << "replSet not initialized yet, skipping health poll this round" << rsLog; return; } @@ -157,11 +163,22 @@ namespace mongo { BSONObj info; int theirConfigVersion = -10000; - time_t before = time(0); + Timer timer; bool ok = requestHeartbeat(theReplSet->name(), theReplSet->selfFullName(), h.toString(), info, theReplSet->config().version, theirConfigVersion); - time_t after = mem.lastHeartbeat = time(0); // we set this on any response - we don't get this far if couldn't connect because exception is thrown + mem.ping = (unsigned int)timer.millis(); + + time_t before = timer.startTime() / 1000000; + // we set this on any response - we don't get this far if + // couldn't connect because exception is thrown + time_t after = mem.lastHeartbeat = before + (mem.ping / 1000); + + // weight new ping with old pings + // on the first ping, just use the ping value + if (old.ping != 0) { + mem.ping = (unsigned int)((old.ping * .8) + (mem.ping * .2)); + } if ( info["time"].isNumber() ) { long long t = info["time"].numberLong(); @@ -183,8 +200,10 @@ namespace mongo { mem.hbstate = MemberState(state.Int()); } if( ok ) { + HeartbeatInfo::numPings++; + if( mem.upSince == 0 ) { - log() << "replSet info " << h.toString() << " is up" << rsLog; + log() << "replSet info member " << h.toString() << " is up" << rsLog; mem.upSince = mem.lastHeartbeat; } mem.health = 1.0; @@ -192,6 +211,30 @@ namespace mongo { if( info.hasElement("opTime") ) mem.opTime = info["opTime"].Date(); + // see if this member is in the electable set + if( info["e"].eoo() ) { + // for backwards compatibility + const Member *member = theReplSet->findById(mem.id()); + if (member && member->config().potentiallyHot()) { + theReplSet->addToElectable(mem.id()); + } + else { + theReplSet->rmFromElectable(mem.id()); + } + } + // add this server to the electable set if it is within 10 + // seconds of the latest optime we know of + else if( info["e"].trueValue() && + mem.opTime >= theReplSet->lastOpTimeWritten.getSecs() - 10) { + unsigned lastOp = theReplSet->lastOtherOpTime().getSecs(); + if (lastOp > 0 && mem.opTime >= lastOp - 10) { + theReplSet->addToElectable(mem.id()); + } + } + else { + theReplSet->rmFromElectable(mem.id()); + } + be cfg = info["config"]; if( cfg.ok() ) { // received a new config @@ -208,7 +251,7 @@ namespace mongo { down(mem, e.what()); } catch(...) { - down(mem, "something unusual went wrong"); + down(mem, "replSet unexpected exception in ReplSetHealthPollTask"); } m = mem; @@ -219,7 +262,7 @@ namespace mongo { bool changed = mem.changed(old); if( changed ) { if( old.hbstate != mem.hbstate ) - log() << "replSet member " << h.toString() << ' ' << mem.hbstate.toString() << rsLog; + log() << "replSet member " << h.toString() << " is now in state " << mem.hbstate.toString() << rsLog; } if( changed || now-last>4 ) { last = now; @@ -230,12 +273,15 @@ namespace mongo { private: void down(HeartbeatInfo& mem, string msg) { mem.health = 0.0; + mem.ping = 0; if( mem.upSince || mem.downSince == 0 ) { mem.upSince = 0; mem.downSince = jsTime(); + mem.hbstate = MemberState::RS_DOWN; log() << "replSet info " << h.toString() << " is down (or slow to respond): " << msg << rsLog; } mem.lastHeartbeatMsg = msg; + theReplSet->rmFromElectable(mem.id()); } }; @@ -262,18 +308,13 @@ namespace mongo { */ void ReplSetImpl::startThreads() { task::fork(mgr); - - /*Member* m = _members.head(); - while( m ) { - ReplSetHealthPollTask *task = new ReplSetHealthPollTask(m->h(), m->hbinfo()); - healthTasks.insert(task); - task::repeat(shared_ptr<task::Task>(task), 2000); - m = m->next(); - }*/ - mgr->send( boost::bind(&Manager::msgCheckNewState, theReplSet->mgr) ); boost::thread t(startSyncThread); + + task::fork(ghost); + + // member heartbeats are started in ReplSetImpl::initFromConfig } } diff --git a/db/repl/manager.cpp b/db/repl/manager.cpp index d2e0764..3c4c0eb 100644 --- a/db/repl/manager.cpp +++ b/db/repl/manager.cpp @@ -19,6 +19,7 @@ #include "pch.h" #include "rs.h" +#include "connections.h" #include "../client.h" namespace mongo { @@ -50,7 +51,7 @@ namespace mongo { } Manager::Manager(ReplSetImpl *_rs) : - task::Server("rs Manager"), rs(_rs), busyWithElectSelf(false), _primary(NOPRIMARY) { + task::Server("rsMgr"), rs(_rs), busyWithElectSelf(false), _primary(NOPRIMARY) { } Manager::~Manager() { @@ -63,10 +64,8 @@ namespace mongo { } void Manager::starting() { - Client::initThread("rs Manager"); - if (!noauth) { - cc().getAuthenticationInfo()->authorize("local"); - } + Client::initThread("rsMgr"); + replLocalAuth(); } void Manager::noteARemoteIsPrimary(const Member *m) { @@ -81,6 +80,45 @@ namespace mongo { } } + void Manager::checkElectableSet() { + unsigned otherOp = rs->lastOtherOpTime().getSecs(); + + // make sure the electable set is up-to-date + if (rs->elect.aMajoritySeemsToBeUp() && + rs->iAmPotentiallyHot() && + (otherOp == 0 || rs->lastOpTimeWritten.getSecs() >= otherOp - 10)) { + theReplSet->addToElectable(rs->selfId()); + } + else { + theReplSet->rmFromElectable(rs->selfId()); + } + + // check if we should ask the primary (possibly ourselves) to step down + const Member *highestPriority = theReplSet->getMostElectable(); + const Member *primary = rs->box.getPrimary(); + + if (primary && highestPriority && + highestPriority->config().priority > primary->config().priority) { + log() << "stepping down " << primary->fullName() << endl; + + if (primary->h().isSelf()) { + // replSetStepDown tries to acquire the same lock + // msgCheckNewState takes, so we can't call replSetStepDown on + // ourselves. + rs->relinquish(); + } + else { + BSONObj cmd = BSON( "replSetStepDown" << 1 ); + ScopedConn conn(primary->fullName()); + BSONObj result; + if (!conn.runCommand("admin", cmd, result, 0)) { + log() << "stepping down " << primary->fullName() + << " failed: " << result << endl; + } + } + } + } + /** called as the health threads get new results */ void Manager::msgCheckNewState() { { @@ -90,7 +128,9 @@ namespace mongo { RSBase::lock lk(rs); if( busyWithElectSelf ) return; - + + checkElectableSet(); + const Member *p = rs->box.getPrimary(); if( p && p != rs->_self ) { if( !p->hbinfo().up() || @@ -154,7 +194,7 @@ namespace mongo { } if( rs->elect.shouldRelinquish() ) { - log() << "replSet can't see a majority of the set, relinquishing primary" << rsLog; + log() << "can't see a majority of the set, relinquishing primary" << rsLog; rs->relinquish(); } @@ -163,9 +203,7 @@ namespace mongo { if( !rs->iAmPotentiallyHot() ) // if not we never try to be primary return; - - /* TODO : CHECK PRIORITY HERE. can't be elected if priority zero. */ - + /* no one seems to be primary. shall we try to elect ourself? */ if( !rs->elect.aMajoritySeemsToBeUp() ) { static time_t last; @@ -178,6 +216,10 @@ namespace mongo { return; } + if( !rs->iAmElectable() ) { + return; + } + busyWithElectSelf = true; // don't try to do further elections & such while we are already working on one. } try { diff --git a/db/repl/multicmd.h b/db/repl/multicmd.h index df7c4e5..99dabea 100644 --- a/db/repl/multicmd.h +++ b/db/repl/multicmd.h @@ -53,16 +53,16 @@ namespace mongo { }; inline void multiCommand(BSONObj cmd, list<Target>& L) { - list<BackgroundJob *> jobs; + list< shared_ptr<BackgroundJob> > jobs; for( list<Target>::iterator i = L.begin(); i != L.end(); i++ ) { Target& d = *i; _MultiCommandJob *j = new _MultiCommandJob(cmd, d); + jobs.push_back( shared_ptr<BackgroundJob>(j) ); j->go(); - jobs.push_back(j); } - for( list<BackgroundJob*>::iterator i = jobs.begin(); i != jobs.end(); i++ ) { + for( list< shared_ptr<BackgroundJob> >::iterator i = jobs.begin(); i != jobs.end(); i++ ) { (*i)->wait(); } } diff --git a/db/repl/replset_commands.cpp b/db/repl/replset_commands.cpp index 1d110ac..68dab7e 100644 --- a/db/repl/replset_commands.cpp +++ b/db/repl/replset_commands.cpp @@ -17,6 +17,7 @@ #include "pch.h" #include "../cmdline.h" #include "../commands.h" +#include "../repl.h" #include "health.h" #include "rs.h" #include "rs_config.h" @@ -28,7 +29,7 @@ using namespace bson; namespace mongo { - void checkMembersUpForConfigChange(const ReplSetConfig& cfg, bool initial); + void checkMembersUpForConfigChange(const ReplSetConfig& cfg, BSONObjBuilder& result, bool initial); /* commands in other files: replSetHeartbeat - health.cpp @@ -44,14 +45,18 @@ namespace mongo { help << "Just for regression tests.\n"; } CmdReplSetTest() : ReplSetCommand("replSetTest") { } - virtual bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) { + virtual bool run(const string& , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool fromRepl) { log() << "replSet replSetTest command received: " << cmdObj.toString() << rsLog; + + if (!checkAuth(errmsg, result)) { + return false; + } + if( cmdObj.hasElement("forceInitialSyncFailure") ) { replSetForceInitialSyncFailure = (unsigned) cmdObj["forceInitialSyncFailure"].Number(); return true; } - // may not need this, but if removed check all tests still work: if( !check(errmsg, result) ) return false; @@ -63,7 +68,10 @@ namespace mongo { } } cmdReplSetTest; - /** get rollback id */ + /** get rollback id. used to check if a rollback happened during some interval of time. + as consumed, the rollback id is not in any particular order, it simply changes on each rollback. + @see incRBID() + */ class CmdReplSetGetRBID : public ReplSetCommand { public: /* todo: ideally this should only change on rollbacks NOT on mongod restarts also. fix... */ @@ -72,9 +80,11 @@ namespace mongo { help << "internal"; } CmdReplSetGetRBID() : ReplSetCommand("replSetGetRBID") { - rbid = (int) curTimeMillis(); + // this is ok but micros or combo with some rand() and/or 64 bits might be better -- + // imagine a restart and a clock correction simultaneously (very unlikely but possible...) + rbid = (int) curTimeMillis64(); } - virtual bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) { + virtual bool run(const string& , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool fromRepl) { if( !check(errmsg, result) ) return false; result.append("rbid",rbid); @@ -102,7 +112,7 @@ namespace mongo { help << "\nhttp://www.mongodb.org/display/DOCS/Replica+Set+Commands"; } CmdReplSetGetStatus() : ReplSetCommand("replSetGetStatus", true) { } - virtual bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) { + virtual bool run(const string& , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool fromRepl) { if ( cmdObj["forShell"].trueValue() ) lastError.disableForCommand(); @@ -122,20 +132,38 @@ namespace mongo { help << "\nhttp://www.mongodb.org/display/DOCS/Replica+Set+Commands"; } CmdReplSetReconfig() : ReplSetCommand("replSetReconfig"), mutex("rsreconfig") { } - virtual bool run(const string& a, BSONObj& b, string& errmsg, BSONObjBuilder& c, bool d) { + virtual bool run(const string& a, BSONObj& b, int e, string& errmsg, BSONObjBuilder& c, bool d) { try { rwlock_try_write lk(mutex); - return _run(a,b,errmsg,c,d); + return _run(a,b,e,errmsg,c,d); } catch(rwlock_try_write::exception&) { } errmsg = "a replSetReconfig is already in progress"; return false; } private: - bool _run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) { - if( !check(errmsg, result) ) + bool _run(const string& , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool fromRepl) { + if ( !checkAuth(errmsg, result) ) { return false; - if( !theReplSet->box.getState().primary() ) { + } + + if( cmdObj["replSetReconfig"].type() != Object ) { + errmsg = "no configuration specified"; + return false; + } + + bool force = cmdObj.hasField("force") && cmdObj["force"].trueValue(); + if( force && !theReplSet ) { + replSettings.reconfig = cmdObj["replSetReconfig"].Obj().getOwned(); + result.append("msg", "will try this config momentarily, try running rs.conf() again in a few seconds"); + return true; + } + + if ( !check(errmsg, result) ) { + return false; + } + + if( !force && !theReplSet->box.getState().primary() ) { errmsg = "replSetReconfig command must be sent to the current replica set primary."; return false; } @@ -152,18 +180,8 @@ namespace mongo { } } - if( cmdObj["replSetReconfig"].type() != Object ) { - errmsg = "no configuration specified"; - return false; - } - - /** TODO - Support changes when a majority, but not all, members of a set are up. - Determine what changes should not be allowed as they would cause erroneous states. - What should be possible when a majority is not up? - */ try { - ReplSetConfig newConfig(cmdObj["replSetReconfig"].Obj()); + ReplSetConfig newConfig(cmdObj["replSetReconfig"].Obj(), force); log() << "replSet replSetReconfig config object parses ok, " << newConfig.members.size() << " members specified" << rsLog; @@ -171,12 +189,12 @@ namespace mongo { return false; } - checkMembersUpForConfigChange(newConfig,false); + checkMembersUpForConfigChange(newConfig, result, false); log() << "replSet replSetReconfig [2]" << rsLog; theReplSet->haveNewConfig(newConfig, true); - ReplSet::startupStatusMsg = "replSetReconfig'd"; + ReplSet::startupStatusMsg.set("replSetReconfig'd"); } catch( DBException& e ) { log() << "replSet replSetReconfig exception: " << e.what() << rsLog; @@ -199,7 +217,7 @@ namespace mongo { } CmdReplSetFreeze() : ReplSetCommand("replSetFreeze") { } - virtual bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) { + virtual bool run(const string& , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool fromRepl) { if( !check(errmsg, result) ) return false; int secs = (int) cmdObj.firstElement().numberInt(); @@ -223,13 +241,38 @@ namespace mongo { } CmdReplSetStepDown() : ReplSetCommand("replSetStepDown") { } - virtual bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) { + virtual bool run(const string& , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool fromRepl) { if( !check(errmsg, result) ) return false; if( !theReplSet->box.getState().primary() ) { errmsg = "not primary so can't step down"; return false; } + + bool force = cmdObj.hasField("force") && cmdObj["force"].trueValue(); + + // only step down if there is another node synced to within 10 + // seconds of this node + if (!force) { + long long int lastOp = (long long int)theReplSet->lastOpTimeWritten.getSecs(); + long long int closest = (long long int)theReplSet->lastOtherOpTime().getSecs(); + + long long int diff = lastOp - closest; + result.append("closest", closest); + result.append("difference", diff); + + if (diff < 0) { + // not our problem, but we'll wait until thing settle down + errmsg = "someone is ahead of the primary?"; + return false; + } + + if (diff > 10) { + errmsg = "no secondaries within 10 seconds of my optime"; + return false; + } + } + int secs = (int) cmdObj.firstElement().numberInt(); if( secs == 0 ) secs = 60; diff --git a/db/repl/rs.cpp b/db/repl/rs.cpp index bbfb057..1fbbc10 100644 --- a/db/repl/rs.cpp +++ b/db/repl/rs.cpp @@ -16,7 +16,7 @@ #include "pch.h" #include "../cmdline.h" -#include "../../util/sock.h" +#include "../../util/net/sock.h" #include "../client.h" #include "../../client/dbclient.h" #include "../dbhelpers.h" @@ -24,14 +24,20 @@ #include "rs.h" #include "connections.h" #include "../repl.h" +#include "../instance.h" -namespace mongo { +using namespace std; +namespace mongo { + using namespace bson; bool replSet = false; ReplSet *theReplSet = 0; - extern string *discoveredSeed; + + bool isCurrentlyAReplSetPrimary() { + return theReplSet && theReplSet->isPrimary(); + } void ReplSetImpl::sethbmsg(string s, int logLevel) { static time_t lastLogged; @@ -57,21 +63,71 @@ namespace mongo { } void ReplSetImpl::assumePrimary() { + LOG(2) << "replSet assuming primary" << endl; assert( iAmPotentiallyHot() ); writelock lk("admin."); // so we are synchronized with _logOp() - box.setSelfPrimary(_self); - //log() << "replSet PRIMARY" << rsLog; // self (" << _self->id() << ") is now primary" << rsLog; + + // Make sure that new OpTimes are higher than existing ones even with clock skew + DBDirectClient c; + BSONObj lastOp = c.findOne( "local.oplog.rs", Query().sort(reverseNaturalObj), NULL, QueryOption_SlaveOk ); + if ( !lastOp.isEmpty() ) { + OpTime::setLast( lastOp[ "ts" ].date() ); + } + + changeState(MemberState::RS_PRIMARY); } void ReplSetImpl::changeState(MemberState s) { box.change(s, _self); } + void ReplSetImpl::setMaintenanceMode(const bool inc) { + lock lk(this); + + if (inc) { + log() << "replSet going into maintenance mode (" << _maintenanceMode << " other tasks)" << rsLog; + + _maintenanceMode++; + changeState(MemberState::RS_RECOVERING); + } + else { + _maintenanceMode--; + // no need to change state, syncTail will try to go live as a secondary soon + + log() << "leaving maintenance mode (" << _maintenanceMode << " other tasks)" << rsLog; + } + } + + Member* ReplSetImpl::getMostElectable() { + lock lk(this); + + Member *max = 0; + + for (set<unsigned>::iterator it = _electableSet.begin(); it != _electableSet.end(); it++) { + const Member *temp = findById(*it); + if (!temp) { + log() << "couldn't find member: " << *it << endl; + _electableSet.erase(*it); + continue; + } + if (!max || max->config().priority < temp->config().priority) { + max = (Member*)temp; + } + } + + return max; + } + const bool closeOnRelinquish = true; void ReplSetImpl::relinquish() { + LOG(2) << "replSet attempting to relinquish" << endl; if( box.getState().primary() ) { - log() << "replSet relinquishing primary state" << rsLog; - changeState(MemberState::RS_SECONDARY); - + { + writelock lk("admin."); // so we are synchronized with _logOp() + + log() << "replSet relinquishing primary state" << rsLog; + changeState(MemberState::RS_SECONDARY); + } + if( closeOnRelinquish ) { /* close sockets that were talking to us so they don't blithly send many writes that will fail with "not master" (of course client could check result code, but in case they are not) @@ -173,6 +229,8 @@ namespace mongo { } void ReplSetImpl::_fillIsMaster(BSONObjBuilder& b) { + lock lk(this); + const StateBox::SP sp = box.get(); bool isp = sp.state.primary(); b.append("setName", name()); @@ -203,9 +261,13 @@ namespace mongo { if( m ) b.append("primary", m->h().toString()); } + else { + b.append("primary", _self->fullName()); + } + if( myConfig().arbiterOnly ) b.append("arbiterOnly", true); - if( myConfig().priority == 0 ) + if( myConfig().priority == 0 && !myConfig().arbiterOnly) b.append("passive", true); if( myConfig().slaveDelay ) b.append("slaveDelay", myConfig().slaveDelay); @@ -213,6 +275,13 @@ namespace mongo { b.append("hidden", true); if( !myConfig().buildIndexes ) b.append("buildIndexes", false); + if( !myConfig().tags.empty() ) { + BSONObjBuilder a; + for( map<string,string>::const_iterator i = myConfig().tags.begin(); i != myConfig().tags.end(); i++ ) + a.append((*i).first, (*i).second); + b.append("tags", a.done()); + } + b.append("me", myConfig().h.toString()); } /** @param cfgString <setname>/<seedhost1>,<seedhost2> */ @@ -259,19 +328,22 @@ namespace mongo { } ReplSetImpl::ReplSetImpl(ReplSetCmdline& replSetCmdline) : elect(this), + _currentSyncTarget(0), + _hbmsgTime(0), _self(0), - mgr( new Manager(this) ) { + _maintenanceMode(0), + mgr( new Manager(this) ), + ghost( new GhostSync(this) ) { + _cfg = 0; memset(_hbmsg, 0, sizeof(_hbmsg)); - *_hbmsg = '.'; // temp...just to see + strcpy( _hbmsg , "initial startup" ); lastH = 0; changeState(MemberState::RS_STARTUP); _seeds = &replSetCmdline.seeds; - //for( vector<HostAndPort>::iterator i = seeds->begin(); i != seeds->end(); i++ ) - // addMemberIfMissing(*i); - log(1) << "replSet beginning startup..." << rsLog; + LOG(1) << "replSet beginning startup..." << rsLog; loadConfig(); @@ -282,7 +354,7 @@ namespace mongo { for( set<HostAndPort>::iterator i = replSetCmdline.seedSet.begin(); i != replSetCmdline.seedSet.end(); i++ ) { if( i->isSelf() ) { if( sss == 1 ) - log(1) << "replSet warning self is listed in the seed list and there are no other seeds listed did you intend that?" << rsLog; + LOG(1) << "replSet warning self is listed in the seed list and there are no other seeds listed did you intend that?" << rsLog; } else log() << "replSet warning command line seed " << i->toString() << " is not present in the current repl set config" << rsLog; @@ -291,14 +363,13 @@ namespace mongo { void newReplUp(); - void ReplSetImpl::loadLastOpTimeWritten() { - //assert( lastOpTimeWritten.isNull() ); + void ReplSetImpl::loadLastOpTimeWritten(bool quiet) { readlock lk(rsoplog); BSONObj o; if( Helpers::getLast(rsoplog, o) ) { lastH = o["h"].numberLong(); lastOpTimeWritten = o["ts"]._opTime(); - uassert(13290, "bad replSet oplog entry?", !lastOpTimeWritten.isNull()); + uassert(13290, "bad replSet oplog entry?", quiet || !lastOpTimeWritten.isNull()); } } @@ -326,7 +397,10 @@ namespace mongo { extern BSONObj *getLastErrorDefault; void ReplSetImpl::setSelfTo(Member *m) { + // already locked in initFromConfig _self = m; + _id = m->id(); + _config = m->config(); if( m ) _buildIndexes = m->config().buildIndexes; else _buildIndexes = true; } @@ -345,29 +419,32 @@ namespace mongo { getLastErrorDefault = new BSONObj( c.getLastErrorDefaults ); } - list<const ReplSetConfig::MemberCfg*> newOnes; + list<ReplSetConfig::MemberCfg*> newOnes; + // additive short-cuts the new config setup. If we are just adding a + // node/nodes and nothing else is changing, this is additive. If it's + // not a reconfig, we're not adding anything bool additive = reconf; { unsigned nfound = 0; int me = 0; for( vector<ReplSetConfig::MemberCfg>::iterator i = c.members.begin(); i != c.members.end(); i++ ) { - const ReplSetConfig::MemberCfg& m = *i; + + ReplSetConfig::MemberCfg& m = *i; if( m.h.isSelf() ) { - nfound++; me++; - if( !reconf || (_self && _self->id() == (unsigned) m._id) ) - ; - else { - log() << "replSet " << _self->id() << ' ' << m._id << rsLog; + } + + if( reconf ) { + if (m.h.isSelf() && (!_self || (int)_self->id() != m._id)) { + log() << "self doesn't match: " << m._id << rsLog; assert(false); } - } - else if( reconf ) { + const Member *old = findById(m._id); if( old ) { nfound++; assert( (int) old->id() == m._id ); - if( old->config() == m ) { + if( old->config() != m ) { additive = false; } } @@ -375,23 +452,21 @@ namespace mongo { newOnes.push_back(&m); } } - - // change timeout settings, if necessary - ScopedConn conn(m.h.toString()); - conn.setTimeout(c.ho.heartbeatTimeoutMillis/1000.0); } if( me == 0 ) { - // initial startup with fastsync - if (!reconf && replSettings.fastsync) { - return false; - } - // log() << "replSet config : " << _cfg->toString() << rsLog; + _members.orphanAll(); + // hbs must continue to pick up new config + // stop sync thread + box.set(MemberState::RS_STARTUP, 0); + + // go into holding pattern log() << "replSet error self not present in the repl set configuration:" << rsLog; log() << c.toString() << rsLog; - uasserted(13497, "replSet error self not present in the configuration"); + return false; } uassert( 13302, "replSet error self appears twice in the repl set configuration", me<=1 ); + // if we found different members that the original config, reload everything if( reconf && config().members.size() != nfound ) additive = false; } @@ -402,10 +477,11 @@ namespace mongo { _name = _cfg->_id; assert( !_name.empty() ); + // this is a shortcut for simple changes if( additive ) { log() << "replSet info : additive change to configuration" << rsLog; - for( list<const ReplSetConfig::MemberCfg*>::const_iterator i = newOnes.begin(); i != newOnes.end(); i++ ) { - const ReplSetConfig::MemberCfg* m = *i; + for( list<ReplSetConfig::MemberCfg*>::const_iterator i = newOnes.begin(); i != newOnes.end(); i++ ) { + ReplSetConfig::MemberCfg *m = *i; Member *mi = new Member(m->h, m->_id, m, false); /** we will indicate that new members are up() initially so that we don't relinquish our @@ -417,6 +493,11 @@ namespace mongo { _members.push(mi); startHealthTaskFor(mi); } + + // if we aren't creating new members, we may have to update the + // groups for the current ones + _cfg->updateMembers(_members); + return true; } @@ -433,21 +514,21 @@ namespace mongo { } forgetPrimary(); - bool iWasArbiterOnly = _self ? iAmArbiterOnly() : false; - setSelfTo(0); + // not setting _self to 0 as other threads use _self w/o locking + int me = 0; + + // For logging + string members = ""; + for( vector<ReplSetConfig::MemberCfg>::iterator i = _cfg->members.begin(); i != _cfg->members.end(); i++ ) { - const ReplSetConfig::MemberCfg& m = *i; + ReplSetConfig::MemberCfg& m = *i; Member *mi; + members += ( members == "" ? "" : ", " ) + m.h.toString(); if( m.h.isSelf() ) { - assert( _self == 0 ); + assert( me++ == 0 ); mi = new Member(m.h, m._id, &m, true); setSelfTo(mi); - // if the arbiter status changed - if (iWasArbiterOnly ^ iAmArbiterOnly()) { - _changeArbiterState(); - } - if( (int)mi->id() == oldPrimaryId ) box.setSelfPrimary(mi); } @@ -459,38 +540,12 @@ namespace mongo { box.setOtherPrimary(mi); } } - return true; - } - void startSyncThread(); - - void ReplSetImpl::_changeArbiterState() { - if (iAmArbiterOnly()) { - changeState(MemberState::RS_ARBITER); - - // if there is an oplog, free it - // not sure if this is necessary, maybe just leave the oplog and let - // the user delete it if they want the space? - writelock lk(rsoplog); - Client::Context c(rsoplog); - NamespaceDetails *d = nsdetails(rsoplog); - if (d) { - string errmsg; - bob res; - dropCollection(rsoplog, errmsg, res); - - // clear last op time to force initial sync (if the arbiter - // becomes a "normal" server again) - lastOpTimeWritten = OpTime(); - } + if( me == 0 ){ + log() << "replSet warning did not detect own host in full reconfig, members " << members << " config: " << c << rsLog; } - else { - changeState(MemberState::RS_RECOVERING); - // oplog will be allocated when sync begins - /* TODO : could this cause two sync threads to exist (race condition)? */ - boost::thread t(startSyncThread); - } + return true; } // Our own config must be the first one. @@ -514,7 +569,6 @@ namespace mongo { if( highest->version > myVersion && highest->version >= 0 ) { log() << "replSet got config version " << highest->version << " from a remote, saving locally" << rsLog; - writelock lk("admin."); highest->saveConfigLocally(BSONObj()); } return true; @@ -523,7 +577,7 @@ namespace mongo { void ReplSetImpl::loadConfig() { while( 1 ) { startupStatus = LOADINGCONFIG; - startupStatusMsg = "loading " + rsConfigNs + " config (LOADINGCONFIG)"; + startupStatusMsg.set("loading " + rsConfigNs + " config (LOADINGCONFIG)"); try { vector<ReplSetConfig> configs; try { @@ -531,7 +585,6 @@ namespace mongo { } catch(DBException& e) { log() << "replSet exception loading our local replset configuration object : " << e.toString() << rsLog; - throw; } for( vector<HostAndPort>::const_iterator i = _seeds->begin(); i != _seeds->end(); i++ ) { try { @@ -542,12 +595,25 @@ namespace mongo { } } - if( discoveredSeed ) { + if( replSettings.discoveredSeeds.size() > 0 ) { + for (set<string>::iterator i = replSettings.discoveredSeeds.begin(); i != replSettings.discoveredSeeds.end(); i++) { + try { + configs.push_back( ReplSetConfig(HostAndPort(*i)) ); + } + catch( DBException& ) { + log(1) << "replSet exception trying to load config from discovered seed " << *i << rsLog; + replSettings.discoveredSeeds.erase(*i); + } + } + } + + if (!replSettings.reconfig.isEmpty()) { try { - configs.push_back( ReplSetConfig(HostAndPort(*discoveredSeed)) ); + configs.push_back(ReplSetConfig(replSettings.reconfig, true)); } - catch( DBException& ) { - log(1) << "replSet exception trying to load config from discovered seed " << *discoveredSeed << rsLog; + catch( DBException& re) { + log() << "couldn't load reconfig: " << re.what() << endl; + replSettings.reconfig = BSONObj(); } } @@ -563,17 +629,17 @@ namespace mongo { if( nempty == (int) configs.size() ) { startupStatus = EMPTYCONFIG; - startupStatusMsg = "can't get " + rsConfigNs + " config from self or any seed (EMPTYCONFIG)"; + startupStatusMsg.set("can't get " + rsConfigNs + " config from self or any seed (EMPTYCONFIG)"); log() << "replSet can't get " << rsConfigNs << " config from self or any seed (EMPTYCONFIG)" << rsLog; static unsigned once; if( ++once == 1 ) log() << "replSet info you may need to run replSetInitiate -- rs.initiate() in the shell -- if that is not already done" << rsLog; if( _seeds->size() == 0 ) - log(1) << "replSet info no seed hosts were specified on the --replSet command line" << rsLog; + LOG(1) << "replSet info no seed hosts were specified on the --replSet command line" << rsLog; } else { startupStatus = EMPTYUNREACHABLE; - startupStatusMsg = "can't currently get " + rsConfigNs + " config from self or any seed (EMPTYUNREACHABLE)"; + startupStatusMsg.set("can't currently get " + rsConfigNs + " config from self or any seed (EMPTYUNREACHABLE)"); log() << "replSet can't get " << rsConfigNs << " config from self or any seed (yet)" << rsLog; } @@ -589,7 +655,7 @@ namespace mongo { } catch(DBException& e) { startupStatus = BADCONFIG; - startupStatusMsg = "replSet error loading set config (BADCONFIG)"; + startupStatusMsg.set("replSet error loading set config (BADCONFIG)"); log() << "replSet error loading configurations " << e.toString() << rsLog; log() << "replSet error replication will not start" << rsLog; sethbmsg("error loading set config"); @@ -598,27 +664,26 @@ namespace mongo { } break; } - startupStatusMsg = "? started"; + startupStatusMsg.set("? started"); startupStatus = STARTED; } void ReplSetImpl::_fatal() { - //lock l(this); box.set(MemberState::RS_FATAL, 0); - //sethbmsg("fatal error"); log() << "replSet error fatal, stopping replication" << rsLog; } void ReplSet::haveNewConfig(ReplSetConfig& newConfig, bool addComment) { - lock l(this); // convention is to lock replset before taking the db rwlock - writelock lk(""); bo comment; if( addComment ) comment = BSON( "msg" << "Reconfig set" << "version" << newConfig.version ); + newConfig.saveConfigLocally(comment); + try { - initFromConfig(newConfig, true); - log() << "replSet replSetReconfig new config saved locally" << rsLog; + if (initFromConfig(newConfig, true)) { + log() << "replSet replSetReconfig new config saved locally" << rsLog; + } } catch(DBException& e) { if( e.getCode() == 13497 /* removed from set */ ) { @@ -652,16 +717,14 @@ namespace mongo { terminates. */ void startReplSets(ReplSetCmdline *replSetCmdline) { - Client::initThread("startReplSets"); + Client::initThread("rsStart"); try { assert( theReplSet == 0 ); if( replSetCmdline == 0 ) { assert(!replSet); return; } - if( !noauth ) { - cc().getAuthenticationInfo()->authorize("local"); - } + replLocalAuth(); (theReplSet = new ReplSet(*replSetCmdline))->go(); } catch(std::exception& e) { @@ -672,6 +735,13 @@ namespace mongo { cc().shutdown(); } + void replLocalAuth() { + if ( noauth ) + return; + cc().getAuthenticationInfo()->authorize("local","_repl"); + } + + } namespace boost { diff --git a/db/repl/rs.h b/db/repl/rs.h index ea9aef1..61041a6 100644 --- a/db/repl/rs.h +++ b/db/repl/rs.h @@ -21,13 +21,26 @@ #include "../../util/concurrency/list.h" #include "../../util/concurrency/value.h" #include "../../util/concurrency/msg.h" -#include "../../util/hostandport.h" +#include "../../util/net/hostandport.h" #include "../commands.h" +#include "../oplogreader.h" #include "rs_exception.h" #include "rs_optime.h" #include "rs_member.h" #include "rs_config.h" +/** + * Order of Events + * + * On startup, if the --replSet option is present, startReplSets is called. + * startReplSets forks off a new thread for replica set activities. It creates + * the global theReplSet variable and calls go() on it. + * + * theReplSet's constructor changes the replica set's state to RS_STARTUP, + * starts the replica set manager, and loads the config (if the replica set + * has been initialized). + */ + namespace mongo { struct HowToFixUp; @@ -41,11 +54,15 @@ namespace mongo { /* member of a replica set */ class Member : public List1<Member>::Base { + private: + ~Member(); // intentionally unimplemented as should never be called -- see List1<>::Base. + Member(const Member&); public: - Member(HostAndPort h, unsigned ord, const ReplSetConfig::MemberCfg *c, bool self); + Member(HostAndPort h, unsigned ord, ReplSetConfig::MemberCfg *c, bool self); string fullName() const { return h().toString(); } const ReplSetConfig::MemberCfg& config() const { return _config; } + ReplSetConfig::MemberCfg& configw() { return _config; } const HeartbeatInfo& hbinfo() const { return _hbinfo; } HeartbeatInfo& get_hbinfo() { return _hbinfo; } string lhb() const { return _hbinfo.lastHeartbeatMsg; } @@ -58,7 +75,7 @@ namespace mongo { private: friend class ReplSetImpl; - const ReplSetConfig::MemberCfg _config; + ReplSetConfig::MemberCfg _config; const HostAndPort _h; HeartbeatInfo _hbinfo; }; @@ -75,6 +92,7 @@ namespace mongo { const Member* findOtherPrimary(bool& two); void noteARemoteIsPrimary(const Member *); + void checkElectableSet(); virtual void starting(); public: Manager(ReplSetImpl *rs); @@ -83,6 +101,47 @@ namespace mongo { void msgCheckNewState(); }; + class GhostSync : public task::Server { + struct GhostSlave { + GhostSlave() : last(0), slave(0), init(false) {} + OplogReader reader; + OpTime last; + Member* slave; + bool init; + }; + /** + * This is a cache of ghost slaves + */ + typedef map<mongo::OID,GhostSlave> MAP; + MAP _ghostCache; + RWLock _lock; // protects _ghostCache + ReplSetImpl *rs; + virtual void starting(); + public: + GhostSync(ReplSetImpl *_rs) : task::Server("rsGhostSync"), _lock("GhostSync"), rs(_rs) {} + ~GhostSync() { + log() << "~GhostSync() called" << rsLog; + } + + /** + * Replica sets can sync in a hierarchical fashion, which throws off w + * calculation on the master. percolate() faux-syncs from an upstream + * node so that the primary will know what the slaves are up to. + * + * We can't just directly sync to the primary because it could be + * unreachable, e.g., S1--->S2--->S3--->P. S2 should ghost sync from S3 + * and S3 can ghost sync from the primary. + * + * Say we have an S1--->S2--->P situation and this node is S2. rid + * would refer to S1. S2 would create a ghost slave of S1 and connect + * it to P (_currentSyncTarget). Then it would use this connection to + * pretend to be S1, replicating off of P. + */ + void percolate(const BSONObj& rid, const OpTime& last); + void associateSlave(const BSONObj& rid, const int memberId); + void updateSlave(const mongo::OID& id, const OpTime& last); + }; + struct Target; class Consensus { @@ -92,7 +151,8 @@ namespace mongo { time_t when; unsigned who; }; - Atomic<LastYea> ly; + static mutex lyMutex; + Guarded<LastYea,lyMutex> ly; unsigned yea(unsigned memberId); // throws VoteException void electionFailed(unsigned meid); void _electSelf(); @@ -117,7 +177,12 @@ namespace mongo { void multiCommand(BSONObj cmd, list<Target>& L); }; - /** most operations on a ReplSet object should be done while locked. that logic implemented here. */ + /** + * most operations on a ReplSet object should be done while locked. that + * logic implemented here. + * + * Order of locking: lock the replica set, then take a rwlock. + */ class RSBase : boost::noncopyable { public: const unsigned magic; @@ -133,6 +198,7 @@ namespace mongo { log() << "replSet ~RSBase called" << rsLog; } + public: class lock { RSBase& rsbase; auto_ptr<scoped_lock> sl; @@ -156,7 +222,6 @@ namespace mongo { } }; - public: /* for asserts */ bool locked() const { return _locked != 0; } @@ -178,13 +243,19 @@ namespace mongo { const Member *primary; }; const SP get() { - scoped_lock lk(m); + rwlock lk(m, false); return sp; } - MemberState getState() const { return sp.state; } - const Member* getPrimary() const { return sp.primary; } + MemberState getState() const { + rwlock lk(m, false); + return sp.state; + } + const Member* getPrimary() const { + rwlock lk(m, false); + return sp.primary; + } void change(MemberState s, const Member *self) { - scoped_lock lk(m); + rwlock lk(m, true); if( sp.state != s ) { log() << "replSet " << s.toString() << rsLog; } @@ -198,24 +269,25 @@ namespace mongo { } } void set(MemberState s, const Member *p) { - scoped_lock lk(m); - sp.state = s; sp.primary = p; + rwlock lk(m, true); + sp.state = s; + sp.primary = p; } void setSelfPrimary(const Member *self) { change(MemberState::RS_PRIMARY, self); } void setOtherPrimary(const Member *mem) { - scoped_lock lk(m); + rwlock lk(m, true); assert( !sp.state.primary() ); sp.primary = mem; } void noteRemoteIsPrimary(const Member *remote) { - scoped_lock lk(m); + rwlock lk(m, true); if( !sp.state.secondary() && !sp.state.fatal() ) sp.state = MemberState::RS_RECOVERING; sp.primary = remote; } StateBox() : m("StateBox") { } private: - mongo::mutex m; + RWLock m; SP sp; }; @@ -267,10 +339,17 @@ namespace mongo { bool _freeze(int secs); private: void assumePrimary(); - void loadLastOpTimeWritten(); + void loadLastOpTimeWritten(bool quiet=false); void changeState(MemberState s); + + /** + * Find the closest member (using ping time) with a higher latest optime. + */ const Member* getMemberToSyncTo(); - void _changeArbiterState(); + Member* _currentSyncTarget; + + // set of electable members' _ids + set<unsigned> _electableSet; protected: // "heartbeat message" // sent in requestHeartbeat respond in field "hbm" @@ -278,8 +357,54 @@ namespace mongo { time_t _hbmsgTime; // when it was logged public: void sethbmsg(string s, int logLevel = 0); + + /** + * Election with Priorities + * + * Each node (n) keeps a set of nodes that could be elected primary. + * Each node in this set: + * + * 1. can connect to a majority of the set + * 2. has a priority greater than 0 + * 3. has an optime within 10 seconds of the most up-to-date node + * that n can reach + * + * If a node fails to meet one or more of these criteria, it is removed + * from the list. This list is updated whenever the node receives a + * heartbeat. + * + * When a node sends an "am I freshest?" query, the node receiving the + * query checks their electable list to make sure that no one else is + * electable AND higher priority. If this check passes, the node will + * return an "ok" response, if not, it will veto. + * + * If a node is primary and there is another node with higher priority + * on the electable list (i.e., it must be synced to within 10 seconds + * of the current primary), the node (or nodes) with connections to both + * the primary and the secondary with higher priority will issue + * replSetStepDown requests to the primary to allow the higher-priority + * node to take over. + */ + void addToElectable(const unsigned m) { lock lk(this); _electableSet.insert(m); } + void rmFromElectable(const unsigned m) { lock lk(this); _electableSet.erase(m); } + bool iAmElectable() { lock lk(this); return _electableSet.find(_self->id()) != _electableSet.end(); } + bool isElectable(const unsigned id) { lock lk(this); return _electableSet.find(id) != _electableSet.end(); } + Member* getMostElectable(); protected: - bool initFromConfig(ReplSetConfig& c, bool reconf=false); // true if ok; throws if config really bad; false if config doesn't include self + /** + * Load a new config as the replica set's main config. + * + * If there is a "simple" change (just adding a node), this shortcuts + * the config. Returns true if the config was changed. Returns false + * if the config doesn't include a this node. Throws an exception if + * something goes very wrong. + * + * Behavior to note: + * - locks this + * - intentionally leaks the old _cfg and any old _members (if the + * change isn't strictly additive) + */ + bool initFromConfig(ReplSetConfig& c, bool reconf=false); void _fillIsMaster(BSONObjBuilder&); void _fillIsMasterHost(const Member*, vector<string>&, vector<string>&, vector<string>&); const ReplSetConfig& config() { return *_cfg; } @@ -301,27 +426,48 @@ namespace mongo { const vector<HostAndPort> *_seeds; ReplSetConfig *_cfg; - /** load our configuration from admin.replset. try seed machines too. - @return true if ok; throws if config really bad; false if config doesn't include self - */ + /** + * Finds the configuration with the highest version number and attempts + * load it. + */ bool _loadConfigFinish(vector<ReplSetConfig>& v); + /** + * Gather all possible configs (from command line seeds, our own config + * doc, and any hosts listed therein) and try to initiate from the most + * recent config we find. + */ void loadConfig(); list<HostAndPort> memberHostnames() const; - const ReplSetConfig::MemberCfg& myConfig() const { return _self->config(); } + const ReplSetConfig::MemberCfg& myConfig() const { return _config; } bool iAmArbiterOnly() const { return myConfig().arbiterOnly; } - bool iAmPotentiallyHot() const { return myConfig().potentiallyHot(); } + bool iAmPotentiallyHot() const { + return myConfig().potentiallyHot() && // not an arbiter + elect.steppedDown <= time(0) && // not stepped down/frozen + state() == MemberState::RS_SECONDARY; // not stale + } protected: Member *_self; bool _buildIndexes; // = _self->config().buildIndexes void setSelfTo(Member *); // use this as it sets buildIndexes var private: - List1<Member> _members; /* all members of the set EXCEPT self. */ + List1<Member> _members; // all members of the set EXCEPT _self. + ReplSetConfig::MemberCfg _config; // config of _self + unsigned _id; // _id of _self + int _maintenanceMode; // if we should stay in recovering state public: - unsigned selfId() const { return _self->id(); } + // this is called from within a writelock in logOpRS + unsigned selfId() const { return _id; } Manager *mgr; - + GhostSync *ghost; + /** + * This forces a secondary to go into recovering state and stay there + * until this is called again, passing in "false". Multiple threads can + * call this and it will leave maintenance mode once all of the callers + * have called it again, passing in false. + */ + void setMaintenanceMode(const bool inc); private: Member* head() const { return _members.head(); } public: @@ -334,6 +480,7 @@ namespace mongo { friend class CmdReplSetElect; friend class Member; friend class Manager; + friend class GhostSync; friend class Consensus; private: @@ -352,6 +499,7 @@ namespace mongo { bool _isStale(OplogReader& r, const string& hn); public: void syncThread(); + const OpTime lastOtherOpTime() const; }; class ReplSet : public ReplSetImpl { @@ -365,7 +513,7 @@ namespace mongo { bool freeze(int secs) { return _freeze(secs); } string selfFullName() { - lock lk(this); + assert( _self ); return _self->fullName(); } @@ -385,12 +533,20 @@ namespace mongo { void summarizeStatus(BSONObjBuilder& b) const { _summarizeStatus(b); } void fillIsMaster(BSONObjBuilder& b) { _fillIsMaster(b); } - /* we have a new config (reconfig) - apply it. - @param comment write a no-op comment to the oplog about it. only makes sense if one is primary and initiating the reconf. - */ + /** + * We have a new config (reconfig) - apply it. + * @param comment write a no-op comment to the oplog about it. only + * makes sense if one is primary and initiating the reconf. + * + * The slaves are updated when they get a heartbeat indicating the new + * config. The comment is a no-op. + */ void haveNewConfig(ReplSetConfig& c, bool comment); - /* if we delete old configs, this needs to assure locking. currently we don't so it is ok. */ + /** + * Pointer assignment isn't necessarily atomic, so this needs to assure + * locking, even though we don't delete old configs. + */ const ReplSetConfig& getConfig() { return config(); } bool lockedByMe() { return RSBase::lockedByMe(); } @@ -402,9 +558,10 @@ namespace mongo { } }; - /** base class for repl set commands. checks basic things such as in rs mode before the command - does its real work - */ + /** + * Base class for repl set commands. Checks basic things such if we're in + * rs mode before the command does its real work. + */ class ReplSetCommand : public Command { protected: ReplSetCommand(const char * s, bool show=false) : Command(s, show) { } @@ -413,26 +570,53 @@ namespace mongo { virtual bool logTheOp() { return false; } virtual LockType locktype() const { return NONE; } virtual void help( stringstream &help ) const { help << "internal"; } + + /** + * Some replica set commands call this and then call check(). This is + * intentional, as they might do things before theReplSet is initialized + * that still need to be checked for auth. + */ + bool checkAuth(string& errmsg, BSONObjBuilder& result) { + if( !noauth && adminOnly() ) { + AuthenticationInfo *ai = cc().getAuthenticationInfo(); + if (!ai->isAuthorizedForLock("admin", locktype())) { + errmsg = "replSet command unauthorized"; + return false; + } + } + return true; + } + bool check(string& errmsg, BSONObjBuilder& result) { if( !replSet ) { errmsg = "not running with --replSet"; return false; } + if( theReplSet == 0 ) { result.append("startupStatus", ReplSet::startupStatus); + string s; errmsg = ReplSet::startupStatusMsg.empty() ? "replset unknown error 2" : ReplSet::startupStatusMsg.get(); if( ReplSet::startupStatus == 3 ) result.append("info", "run rs.initiate(...) if not yet done for the set"); return false; } - return true; + + return checkAuth(errmsg, result); } }; + /** + * does local authentication + * directly authorizes against AuthenticationInfo + */ + void replLocalAuth(); + /** inlines ----------------- */ - inline Member::Member(HostAndPort h, unsigned ord, const ReplSetConfig::MemberCfg *c, bool self) : + inline Member::Member(HostAndPort h, unsigned ord, ReplSetConfig::MemberCfg *c, bool self) : _config(*c), _h(h), _hbinfo(ord) { + assert(c); if( self ) _hbinfo.health = 1.0; } diff --git a/db/repl/rs_config.cpp b/db/repl/rs_config.cpp index 2341fe9..13352b1 100644 --- a/db/repl/rs_config.cpp +++ b/db/repl/rs_config.cpp @@ -20,10 +20,11 @@ #include "rs.h" #include "../../client/dbclient.h" #include "../../client/syncclusterconnection.h" -#include "../../util/hostandport.h" +#include "../../util/net/hostandport.h" #include "../dbhelpers.h" #include "connections.h" #include "../oplog.h" +#include "../instance.h" using namespace bson; @@ -36,7 +37,7 @@ namespace mongo { while( i.more() ) { BSONElement e = i.next(); if( !fields.count( e.fieldName() ) ) { - uasserted(13434, str::stream() << "unexpected field '" << e.fieldName() << "'in object"); + uasserted(13434, str::stream() << "unexpected field '" << e.fieldName() << "' in object"); } } } @@ -63,27 +64,14 @@ namespace mongo { //rather than above, do a logOp()? probably BSONObj o = asBson(); Helpers::putSingletonGod(rsConfigNs.c_str(), o, false/*logOp=false; local db so would work regardless...*/); - if( !comment.isEmpty() ) + if( !comment.isEmpty() && (!theReplSet || theReplSet->isPrimary()) ) logOpInitiate(comment); cx.db()->flushFiles(true); } - DEV log() << "replSet saveConfigLocally done" << rsLog; + log() << "replSet saveConfigLocally done" << rsLog; } - /*static*/ - /*void ReplSetConfig::receivedNewConfig(BSONObj cfg) { - if( theReplSet ) - return; // this is for initial setup only, so far. todo - - ReplSetConfig c(cfg); - - writelock lk("admin."); - if( theReplSet ) - return; - c.saveConfigLocally(bo()); - }*/ - bo ReplSetConfig::MemberCfg::asBson() const { bob b; b << "_id" << _id; @@ -95,36 +83,52 @@ namespace mongo { if( hidden ) b << "hidden" << hidden; if( !buildIndexes ) b << "buildIndexes" << buildIndexes; if( !tags.empty() ) { - BSONArrayBuilder a; - for( set<string>::const_iterator i = tags.begin(); i != tags.end(); i++ ) - a.append(*i); - b.appendArray("tags", a.done()); - } - if( !initialSync.isEmpty() ) { - b << "initialSync" << initialSync; + BSONObjBuilder a; + for( map<string,string>::const_iterator i = tags.begin(); i != tags.end(); i++ ) + a.append((*i).first, (*i).second); + b.append("tags", a.done()); } return b.obj(); } + void ReplSetConfig::updateMembers(List1<Member> &dest) { + for (vector<MemberCfg>::iterator source = members.begin(); source < members.end(); source++) { + for( Member *d = dest.head(); d; d = d->next() ) { + if (d->fullName() == (*source).h.toString()) { + d->configw().groupsw() = (*source).groups(); + } + } + } + } + bo ReplSetConfig::asBson() const { bob b; b.append("_id", _id).append("version", version); - if( !ho.isDefault() || !getLastErrorDefaults.isEmpty() ) { - bob settings; - if( !ho.isDefault() ) - settings << "heartbeatConnRetries " << ho.heartbeatConnRetries << - "heartbeatSleep" << ho.heartbeatSleepMillis / 1000.0 << - "heartbeatTimeout" << ho.heartbeatTimeoutMillis / 1000.0; - if( !getLastErrorDefaults.isEmpty() ) - settings << "getLastErrorDefaults" << getLastErrorDefaults; - b << "settings" << settings.obj(); - } BSONArrayBuilder a; for( unsigned i = 0; i < members.size(); i++ ) a.append( members[i].asBson() ); b.append("members", a.arr()); + if( !ho.isDefault() || !getLastErrorDefaults.isEmpty() || !rules.empty()) { + bob settings; + if( !rules.empty() ) { + bob modes; + for (map<string,TagRule*>::const_iterator it = rules.begin(); it != rules.end(); it++) { + bob clauses; + vector<TagClause*> r = (*it).second->clauses; + for (vector<TagClause*>::iterator it2 = r.begin(); it2 < r.end(); it2++) { + clauses << (*it2)->name << (*it2)->target; + } + modes << (*it).first << clauses.obj(); + } + settings << "getLastErrorModes" << modes.obj(); + } + if( !getLastErrorDefaults.isEmpty() ) + settings << "getLastErrorDefaults" << getLastErrorDefaults; + b << "settings" << settings.obj(); + } + return b.obj(); } @@ -135,38 +139,87 @@ namespace mongo { void ReplSetConfig::MemberCfg::check() const { mchk(_id >= 0 && _id <= 255); mchk(priority >= 0 && priority <= 1000); - mchk(votes >= 0 && votes <= 100); - uassert(13419, "this version of mongod only supports priorities 0 and 1", priority == 0 || priority == 1); + mchk(votes <= 100); // votes >= 0 because it is unsigned + uassert(13419, "priorities must be between 0.0 and 100.0", priority >= 0.0 && priority <= 100.0); uassert(13437, "slaveDelay requires priority be zero", slaveDelay == 0 || priority == 0); uassert(13438, "bad slaveDelay value", slaveDelay >= 0 && slaveDelay <= 3600 * 24 * 366); uassert(13439, "priority must be 0 when hidden=true", priority == 0 || !hidden); uassert(13477, "priority must be 0 when buildIndexes=false", buildIndexes || priority == 0); + } +/* + string ReplSetConfig::TagSubgroup::toString() const { + bool first = true; + string result = "\""+name+"\": ["; + for (set<const MemberCfg*>::const_iterator i = m.begin(); i != m.end(); i++) { + if (!first) { + result += ", "; + } + first = false; + result += (*i)->h.toString(); + } + return result+"]"; + } + */ + string ReplSetConfig::TagClause::toString() const { + string result = name+": {"; + for (map<string,TagSubgroup*>::const_iterator i = subgroups.begin(); i != subgroups.end(); i++) { +//TEMP? result += (*i).second->toString()+", "; + } + result += "TagClause toString TEMPORARILY DISABLED"; + return result + "}"; + } - if (!initialSync.isEmpty()) { - static const string legal[] = {"state", "name", "_id","optime"}; - static const set<string> legals(legal, legal + 4); - assertOnlyHas(initialSync, legals); + string ReplSetConfig::TagRule::toString() const { + string result = "{"; + for (vector<TagClause*>::const_iterator it = clauses.begin(); it < clauses.end(); it++) { + result += ((TagClause*)(*it))->toString()+","; + } + return result+"}"; + } - if (initialSync.hasElement("state")) { - uassert(13525, "initialSync source state must be 1 or 2", - initialSync["state"].isNumber() && - (initialSync["state"].Number() == 1 || - initialSync["state"].Number() == 2)); - } - if (initialSync.hasElement("name")) { - uassert(13526, "initialSync source name must be a string", - initialSync["name"].type() == mongo::String); + void ReplSetConfig::TagSubgroup::updateLast(const OpTime& op) { + if (last < op) { + last = op; + + for (vector<TagClause*>::iterator it = clauses.begin(); it < clauses.end(); it++) { + (*it)->updateLast(op); } - if (initialSync.hasElement("_id")) { - uassert(13527, "initialSync source _id must be a number", - initialSync["_id"].isNumber()); + } + } + + void ReplSetConfig::TagClause::updateLast(const OpTime& op) { + if (last >= op) { + return; + } + + // check at least n subgroups greater than clause.last + int count = 0; + map<string,TagSubgroup*>::iterator it; + for (it = subgroups.begin(); it != subgroups.end(); it++) { + if ((*it).second->last >= op) { + count++; } - if (initialSync.hasElement("optime")) { - uassert(13528, "initialSync source optime must be a timestamp", - initialSync["optime"].type() == mongo::Timestamp || - initialSync["optime"].type() == mongo::Date); + } + + if (count >= actualTarget) { + last = op; + rule->updateLast(op); + } + } + + void ReplSetConfig::TagRule::updateLast(const OpTime& op) { + OpTime *earliest = (OpTime*)&op; + vector<TagClause*>::iterator it; + + for (it = clauses.begin(); it < clauses.end(); it++) { + if ((*it)->last < *earliest) { + earliest = &(*it)->last; } } + + // rules are simply and-ed clauses, so whatever the most-behind + // clause is at is what the rule is at + last = *earliest; } /** @param o old config @@ -184,18 +237,28 @@ namespace mongo { if someone had some intermediate config this node doesnt have, that could be necessary. but then how did we become primary? so perhaps we are fine as-is. */ - if( o.version + 1 != n.version ) { - errmsg = "version number wrong"; + if( o.version >= n.version ) { + errmsg = str::stream() << "version number must increase, old: " + << o.version << " new: " << n.version; return false; } map<HostAndPort,const ReplSetConfig::MemberCfg*> old; + bool isLocalHost = false; for( vector<ReplSetConfig::MemberCfg>::const_iterator i = o.members.begin(); i != o.members.end(); i++ ) { + if (i->h.isLocalHost()) { + isLocalHost = true; + } old[i->h] = &(*i); } int me = 0; for( vector<ReplSetConfig::MemberCfg>::const_iterator i = n.members.begin(); i != n.members.end(); i++ ) { const ReplSetConfig::MemberCfg& m = *i; + if ( (isLocalHost && !m.h.isLocalHost()) || (!isLocalHost && m.h.isLocalHost())) { + log() << "reconfig error, cannot switch between localhost and hostnames: " + << m.h.toString() << rsLog; + uasserted(13645, "hosts cannot switch between localhost and hostname"); + } if( old.count(m.h) ) { const ReplSetConfig::MemberCfg& oldCfg = *old[m.h]; if( oldCfg._id != m._id ) { @@ -212,6 +275,7 @@ namespace mongo { log() << "replSet reconfig error with member: " << m.h.toString() << " arbiterOnly cannot change. remove and readd the member instead " << rsLog; uasserted(13510, "arbiterOnly may not change for members"); } + uassert(14827, "arbiters cannot have tags", !m.arbiterOnly || m.tags.size() == 0 ); } if( m.h.isSelf() ) me++; @@ -250,6 +314,122 @@ namespace mongo { } } + void ReplSetConfig::_populateTagMap(map<string,TagClause> &tagMap) { + // create subgroups for each server corresponding to each of + // its tags. E.g.: + // + // A is tagged with {"server" : "A", "dc" : "ny"} + // B is tagged with {"server" : "B", "dc" : "ny"} + // + // At the end of this step, tagMap will contain: + // + // "server" => {"A" : [A], "B" : [B]} + // "dc" => {"ny" : [A,B]} + + for (unsigned i=0; i<members.size(); i++) { + MemberCfg member = members[i]; + + for (map<string,string>::iterator tag = member.tags.begin(); tag != member.tags.end(); tag++) { + string label = (*tag).first; + string value = (*tag).second; + + TagClause& clause = tagMap[label]; + clause.name = label; + + TagSubgroup* subgroup; + // search for "ny" in "dc"'s clause + if (clause.subgroups.find(value) == clause.subgroups.end()) { + clause.subgroups[value] = subgroup = new TagSubgroup(value); + } + else { + subgroup = clause.subgroups[value]; + } + + subgroup->m.insert(&members[i]); + } + } + } + + void ReplSetConfig::parseRules(const BSONObj& modes) { + map<string,TagClause> tagMap; + _populateTagMap(tagMap); + + for (BSONObj::iterator i = modes.begin(); i.more(); ) { + unsigned int primaryOnly = 0; + + // ruleName : {dc : 2, m : 3} + BSONElement rule = i.next(); + uassert(14046, "getLastErrorMode rules must be objects", rule.type() == mongo::Object); + + TagRule* r = new TagRule(); + + BSONObj clauseObj = rule.Obj(); + for (BSONObj::iterator c = clauseObj.begin(); c.more(); ) { + BSONElement clauseElem = c.next(); + uassert(14829, "getLastErrorMode criteria must be numeric", clauseElem.isNumber()); + + // get the clause, e.g., "x.y" : 3 + const char *criteria = clauseElem.fieldName(); + int value = clauseElem.numberInt(); + uassert(14828, str::stream() << "getLastErrorMode criteria must be greater than 0: " << clauseElem, value > 0); + + TagClause* node = new TagClause(tagMap[criteria]); + + int numGroups = node->subgroups.size(); + uassert(14831, str::stream() << "mode " << clauseObj << " requires " + << value << " tagged with " << criteria << ", but only " + << numGroups << " with this tag were found", numGroups >= value); + + node->name = criteria; + node->target = value; + // if any subgroups contain "me", we can decrease the target + node->actualTarget = node->target; + + // then we want to add pointers between clause & subgroup + for (map<string,TagSubgroup*>::iterator sgs = node->subgroups.begin(); + sgs != node->subgroups.end(); sgs++) { + bool foundMe = false; + (*sgs).second->clauses.push_back(node); + + // if this subgroup contains the primary, it's automatically always up-to-date + for( set<MemberCfg*>::const_iterator cfg = (*sgs).second->m.begin(); + cfg != (*sgs).second->m.end(); + cfg++) + { + if ((*cfg)->h.isSelf()) { + node->actualTarget--; + foundMe = true; + } + } + + for (set<MemberCfg *>::iterator cfg = (*sgs).second->m.begin(); + !foundMe && cfg != (*sgs).second->m.end(); cfg++) { + (*cfg)->groupsw().insert((*sgs).second); + } + } + + // if all of the members of this clause involve the primary, it's always up-to-date + if (node->actualTarget == 0) { + node->last = OpTime(INT_MAX, INT_MAX); + primaryOnly++; + } + + // this is a valid clause, so we want to add it to its rule + node->rule = r; + r->clauses.push_back(node); + } + + // if all of the clauses are satisfied by the primary, this rule is trivially true + if (primaryOnly == r->clauses.size()) { + r->last = OpTime(INT_MAX, INT_MAX); + } + + // if we got here, this is a valid rule + LOG(1) << "replSet new rule " << rule.fieldName() << ": " << r->toString() << rsLog; + rules[rule.fieldName()] = r; + } + } + void ReplSetConfig::from(BSONObj o) { static const string legal[] = {"_id","version", "members","settings"}; static const set<string> legals(legal, legal + 4); @@ -262,19 +442,6 @@ namespace mongo { uassert(13115, "bad " + rsConfigNs + " config: version", version > 0); } - if( o["settings"].ok() ) { - BSONObj settings = o["settings"].Obj(); - if( settings["heartbeatConnRetries "].ok() ) - ho.heartbeatConnRetries = settings["heartbeatConnRetries "].numberInt(); - if( settings["heartbeatSleep"].ok() ) - ho.heartbeatSleepMillis = (unsigned) (settings["heartbeatSleep"].Number() * 1000); - if( settings["heartbeatTimeout"].ok() ) - ho.heartbeatTimeoutMillis = (unsigned) (settings["heartbeatTimeout"].Number() * 1000); - ho.check(); - try { getLastErrorDefaults = settings["getLastErrorDefaults"].Obj().copy(); } - catch(...) { } - } - set<string> hosts; set<int> ords; vector<BSONElement> members; @@ -292,7 +459,7 @@ namespace mongo { try { static const string legal[] = { "_id","votes","priority","host", "hidden","slaveDelay", - "arbiterOnly","buildIndexes","tags","initialSync" + "arbiterOnly","buildIndexes","tags","initialSync" // deprecated }; static const set<string> legals(legal, legal + 10); assertOnlyHas(mobj, legals); @@ -304,10 +471,12 @@ namespace mongo { /* TODO: use of string exceptions may be problematic for reconfig case! */ throw "_id must be numeric"; } - string s; try { - s = mobj["host"].String(); + string s = mobj["host"].String(); m.h = HostAndPort(s); + if (!m.h.hasPort()) { + m.h.setPort(m.h.port()); + } } catch(...) { throw string("bad or missing host field? ") + mobj.toString(); @@ -325,12 +494,10 @@ namespace mongo { if( mobj.hasElement("votes") ) m.votes = (unsigned) mobj["votes"].Number(); if( mobj.hasElement("tags") ) { - vector<BSONElement> v = mobj["tags"].Array(); - for( unsigned i = 0; i < v.size(); i++ ) - m.tags.insert( v[i].String() ); - } - if( mobj.hasElement("initialSync")) { - m.initialSync = mobj["initialSync"].Obj().getOwned(); + const BSONObj &t = mobj["tags"].Obj(); + for (BSONObj::iterator c = t.begin(); c.more(); c.next()) { + m.tags[(*c).fieldName()] = (*c).String(); + } } m.check(); } @@ -356,22 +523,38 @@ namespace mongo { } uassert(13393, "can't use localhost in repl set member names except when using it for all members", localhosts == 0 || localhosts == members.size()); uassert(13117, "bad " + rsConfigNs + " config", !_id.empty()); + + if( o["settings"].ok() ) { + BSONObj settings = o["settings"].Obj(); + if( settings["getLastErrorModes"].ok() ) { + parseRules(settings["getLastErrorModes"].Obj()); + } + ho.check(); + try { getLastErrorDefaults = settings["getLastErrorDefaults"].Obj().copy(); } + catch(...) { } + } } static inline void configAssert(bool expr) { uassert(13122, "bad repl set config?", expr); } - ReplSetConfig::ReplSetConfig(BSONObj cfg) { + ReplSetConfig::ReplSetConfig(BSONObj cfg, bool force) { + _constructed = false; clear(); from(cfg); - configAssert( version < 0 /*unspecified*/ || (version >= 1 && version <= 5000) ); + if( force ) { + version += rand() % 100000 + 10000; + } + configAssert( version < 0 /*unspecified*/ || (version >= 1) ); if( version < 1 ) version = 1; _ok = true; + _constructed = true; } ReplSetConfig::ReplSetConfig(const HostAndPort& h) { + _constructed = false; clear(); int level = 2; DEV level = 0; @@ -447,6 +630,7 @@ namespace mongo { checkRsConfig(); _ok = true; log(level) << "replSet load config ok from " << (h.isSelf() ? "self" : h.toString()) << rsLog; + _constructed = true; } } diff --git a/db/repl/rs_config.h b/db/repl/rs_config.h index 7d43fe6..f69052a 100644 --- a/db/repl/rs_config.h +++ b/db/repl/rs_config.h @@ -20,26 +20,37 @@ #pragma once -#include "../../util/hostandport.h" +#include "../../util/net/hostandport.h" +#include "../../util/concurrency/race.h" #include "health.h" namespace mongo { - - /* singleton config object is stored here */ + class Member; const string rsConfigNs = "local.system.replset"; class ReplSetConfig { enum { EMPTYCONFIG = -2 }; + struct TagSubgroup; public: - /* if something is misconfigured, throws an exception. - if couldn't be queried or is just blank, ok() will be false. - */ + /** + * This contacts the given host and tries to get a config from them. + * + * This sends a test heartbeat to the host and, if all goes well and the + * host has a more recent config, fetches the config and loads it (see + * from(). + * + * If it's contacting itself, it skips the heartbeat (for obvious + * reasons.) If something is misconfigured, throws an exception. If the + * host couldn't be queried or is just blank, ok() will be false. + */ ReplSetConfig(const HostAndPort& h); - ReplSetConfig(BSONObj cfg); + ReplSetConfig(BSONObj cfg, bool force=false); bool ok() const { return _ok; } + struct TagRule; + struct MemberCfg { MemberCfg() : _id(-1), votes(1), priority(1.0), arbiterOnly(false), slaveDelay(0), hidden(false), buildIndexes(true) { } int _id; /* ordinal */ @@ -50,12 +61,24 @@ namespace mongo { int slaveDelay; /* seconds. int rather than unsigned for convenient to/front bson conversion. */ bool hidden; /* if set, don't advertise to drives in isMaster. for non-primaries (priority 0) */ bool buildIndexes; /* if false, do not create any non-_id indexes */ - set<string> tags; /* tagging for data center, rack, etc. */ - BSONObj initialSync; /* directions for initial sync source */ - + map<string,string> tags; /* tagging for data center, rack, etc. */ + private: + set<TagSubgroup*> _groups; // the subgroups this member belongs to + public: + const set<TagSubgroup*>& groups() const { + return _groups; + } + set<TagSubgroup*>& groupsw() { + return _groups; + } void check() const; /* check validity, assert if not. */ BSONObj asBson() const; bool potentiallyHot() const { return !arbiterOnly && priority > 0; } + void updateGroups(const OpTime& last) { + for (set<TagSubgroup*>::iterator it = _groups.begin(); it != _groups.end(); it++) { + ((TagSubgroup*)(*it))->updateLast(last); + } + } bool operator==(const MemberCfg& r) const { return _id==r._id && votes == r.votes && h == r.h && priority == r.priority && arbiterOnly == r.arbiterOnly && slaveDelay == r.slaveDelay && hidden == r.hidden && @@ -70,6 +93,7 @@ namespace mongo { HealthOptions ho; string md5; BSONObj getLastErrorDefaults; + map<string,TagRule*> rules; list<HostAndPort> otherMemberHostnames() const; // except self @@ -88,12 +112,112 @@ namespace mongo { void saveConfigLocally(BSONObj comment); // to local db string saveConfigEverywhere(); // returns textual info on what happened + /** + * Update members' groups when the config changes but members stay the same. + */ + void updateMembers(List1<Member> &dest); + BSONObj asBson() const; + bool _constructed; private: bool _ok; void from(BSONObj); void clear(); + + struct TagClause; + + /** + * This is a logical grouping of servers. It is pointed to by a set of + * servers with a certain tag. + * + * For example, suppose servers A, B, and C have the tag "dc" : "nyc". If we + * have a rule {"dc" : 2}, then we want A _or_ B _or_ C to have the + * write for one of the "dc" critiria to be fulfilled, so all three will + * point to this subgroup. When one of their oplog-tailing cursors is + * updated, this subgroup is updated. + */ + struct TagSubgroup : boost::noncopyable { + ~TagSubgroup(); // never called; not defined + TagSubgroup(string nm) : name(nm) { } + const string name; + OpTime last; + vector<TagClause*> clauses; + + // this probably won't actually point to valid members after the + // subgroup is created, as initFromConfig() makes a copy of the + // config + set<MemberCfg*> m; + + void updateLast(const OpTime& op); + + //string toString() const; + + /** + * If two tags have the same name, they should compare as equal so + * that members don't have to update two identical groups on writes. + */ + bool operator() (TagSubgroup& lhs, TagSubgroup& rhs) const { + return lhs.name < rhs.name; + } + }; + + /** + * An argument in a rule. For example, if we had the rule {dc : 2, + * machines : 3}, "dc" : 2 and "machines" : 3 would be two TagClauses. + * + * Each tag clause has a set of associated subgroups. For example, if + * we had "dc" : 2, our subgroups might be "nyc", "sf", and "hk". + */ + struct TagClause { + OpTime last; + map<string,TagSubgroup*> subgroups; + TagRule *rule; + string name; + /** + * If we have get a clause like {machines : 3} and this server is + * tagged with "machines", then it's really {machines : 2}, as we + * will always be up-to-date. So, target would be 3 and + * actualTarget would be 2, in that example. + */ + int target; + int actualTarget; + + void updateLast(const OpTime& op); + string toString() const; + }; + + /** + * Parses getLastErrorModes. + */ + void parseRules(const BSONObj& modes); + + /** + * Create a hash containing every possible clause that could be used in a + * rule and the servers related to that clause. + * + * For example, suppose we have the following servers: + * A {"dc" : "ny", "ny" : "rk1"} + * B {"dc" : "ny", "ny" : "rk1"} + * C {"dc" : "ny", "ny" : "rk2"} + * D {"dc" : "sf", "sf" : "rk1"} + * E {"dc" : "sf", "sf" : "rk2"} + * + * This would give us the possible criteria: + * "dc" -> {A, B, C},{D, E} + * "ny" -> {A, B},{C} + * "sf" -> {D},{E} + */ + void _populateTagMap(map<string,TagClause> &tagMap); + + public: + struct TagRule { + vector<TagClause*> clauses; + OpTime last; + + void updateLast(const OpTime& op); + string toString() const; + }; }; } diff --git a/db/repl/rs_initialsync.cpp b/db/repl/rs_initialsync.cpp index 5a54059..101b03a 100644 --- a/db/repl/rs_initialsync.cpp +++ b/db/repl/rs_initialsync.cpp @@ -34,7 +34,7 @@ namespace mongo { // add try/catch with sleep - void isyncassert(const char *msg, bool expr) { + void isyncassert(const string& msg, bool expr) { if( !expr ) { string m = str::stream() << "initial sync " << msg; theReplSet->sethbmsg(m, 0); @@ -57,20 +57,15 @@ namespace mongo { } } - bool cloneFrom(const char *masterHost, string& errmsg, const string& fromdb, bool logForReplication, - bool slaveOk, bool useReplAuth, bool snapshot); - /* todo : progress metering to sethbmsg. */ static bool clone(const char *master, string db) { string err; return cloneFrom(master, err, db, false, - /* slave_ok */ true, true, false); + /* slave_ok */ true, true, false, /*mayYield*/true, /*mayBeInterrupted*/false); } void _logOpObjRS(const BSONObj& op); - bool copyCollectionFromRemote(const string& host, const string& ns, const BSONObj& query, string &errmsg, bool logforrepl); - static void emptyOplog() { writelock lk(rsoplog); Client::Context ctx(rsoplog); @@ -80,104 +75,47 @@ namespace mongo { if( d && d->stats.nrecords == 0 ) return; // already empty, ok. - log(1) << "replSet empty oplog" << rsLog; + LOG(1) << "replSet empty oplog" << rsLog; d->emptyCappedCollection(rsoplog); - - /* - string errmsg; - bob res; - dropCollection(rsoplog, errmsg, res); - log() << "replSet recreated oplog so it is empty. todo optimize this..." << rsLog; - createOplog();*/ - - // TEMP: restart to recreate empty oplog - //log() << "replSet FATAL error during initial sync. mongod restart required." << rsLog; - //dbexit( EXIT_CLEAN ); - - /* - writelock lk(rsoplog); - Client::Context c(rsoplog, dbpath, 0, doauth/false); - NamespaceDetails *oplogDetails = nsdetails(rsoplog); - uassert(13412, str::stream() << "replSet error " << rsoplog << " is missing", oplogDetails != 0); - oplogDetails->cappedTruncateAfter(rsoplog, h.commonPointOurDiskloc, false); - */ } - /** - * Choose a member to sync from. - * - * The initalSync option is an object with 1 k/v pair: - * - * "state" : 1|2 - * "name" : "host" - * "_id" : N - * "optime" : t - * - * All except optime are exact matches. "optime" will find a secondary with - * an optime >= to the optime given. - */ const Member* ReplSetImpl::getMemberToSyncTo() { - BSONObj sync = myConfig().initialSync; - bool secondaryOnly = false, isOpTime = false; - char *name = 0; - int id = -1; - OpTime optime; - - StateBox::SP sp = box.get(); - assert( !sp.state.primary() ); // wouldn't make sense if we were. - - // if it exists, we've already checked that these fields are valid in - // rs_config.cpp - if ( !sync.isEmpty() ) { - if (sync.hasElement("state")) { - if (sync["state"].Number() == 1) { - if (sp.primary) { - sethbmsg( str::stream() << "syncing to primary: " << sp.primary->fullName(), 0); - return const_cast<Member*>(sp.primary); - } - else { - sethbmsg("couldn't clone from primary"); - return NULL; - } - } - else { - secondaryOnly = true; - } - } - if (sync.hasElement("name")) { - name = (char*)sync["name"].valuestr(); - } - if (sync.hasElement("_id")) { - id = (int)sync["_id"].Number(); - } - if (sync.hasElement("optime")) { - isOpTime = true; - optime = sync["optime"]._opTime(); + Member *closest = 0; + + // wait for 2N pings before choosing a sync target + if (_cfg) { + int needMorePings = config().members.size()*2 - HeartbeatInfo::numPings; + + if (needMorePings > 0) { + OCCASIONALLY log() << "waiting for " << needMorePings << " pings from other members before syncing" << endl; + return NULL; } } - for( Member *m = head(); m; m = m->next() ) { - if (!m->hbinfo().up() || - (m->state() != MemberState::RS_SECONDARY && - m->state() != MemberState::RS_PRIMARY) || - (secondaryOnly && m->state() != MemberState::RS_SECONDARY) || - (id != -1 && (int)m->id() != id) || - (name != 0 && strcmp(name, m->fullName().c_str()) != 0) || - (isOpTime && optime >= m->hbinfo().opTime)) { - continue; + // find the member with the lowest ping time that has more data than me + for (Member *m = _members.head(); m; m = m->next()) { + if (m->hbinfo().up() && + (m->state() == MemberState::RS_PRIMARY || + (m->state() == MemberState::RS_SECONDARY && m->hbinfo().opTime > lastOpTimeWritten)) && + (!closest || m->hbinfo().ping < closest->hbinfo().ping)) { + closest = m; } + } + + { + lock lk(this); - sethbmsg( str::stream() << "syncing to: " << m->fullName(), 0); - return const_cast<Member*>(m); + if (!closest) { + _currentSyncTarget = NULL; + return NULL; + } + + _currentSyncTarget = closest; } - sethbmsg( str::stream() << "couldn't find a member matching the sync criteria: " << - "\nstate? " << (secondaryOnly ? "2" : "none") << - "\nname? " << (name ? name : "none") << - "\n_id? " << id << - "\noptime? " << optime.toStringPretty() ); + sethbmsg( str::stream() << "syncing to: " << closest->fullName(), 0); - return NULL; + return const_cast<Member*>(closest); } /** @@ -186,6 +124,12 @@ namespace mongo { void ReplSetImpl::_syncDoInitialSync() { sethbmsg("initial sync pending",0); + // if this is the first node, it may have already become primary + if ( box.getState().primary() ) { + sethbmsg("I'm already primary, no need for initial sync",0); + return; + } + const Member *source = getMemberToSyncTo(); if (!source) { sethbmsg("initial sync need a member to be primary or secondary to do our initial sync", 0); @@ -252,13 +196,14 @@ namespace mongo { /* apply relevant portion of the oplog */ { - sethbmsg("initial sync initial oplog application"); - isyncassert( "initial sync source must remain readable throughout our initial sync [2]", source->state().readable() ); + isyncassert( str::stream() << "initial sync source must remain readable throughout our initial sync [2] state now: " << source->state().toString() , source->state().readable() ); if( ! initialSyncOplogApplication(source, /*applyGTE*/startingTS, /*minValid*/mvoptime) ) { // note we assume here that this call does not throw log() << "replSet initial sync failed during applyoplog" << rsLog; emptyOplog(); // otherwise we'll be up! + lastOpTimeWritten = OpTime(); lastH = 0; + log() << "replSet cleaning up [1]" << rsLog; { writelock lk("local."); diff --git a/db/repl/rs_initiate.cpp b/db/repl/rs_initiate.cpp index cf1941f..3d998a8 100644 --- a/db/repl/rs_initiate.cpp +++ b/db/repl/rs_initiate.cpp @@ -37,8 +37,8 @@ namespace mongo { throws @param initial true when initiating */ - void checkMembersUpForConfigChange(const ReplSetConfig& cfg, bool initial) { - int failures = 0; + void checkMembersUpForConfigChange(const ReplSetConfig& cfg, BSONObjBuilder& result, bool initial) { + int failures = 0, allVotes = 0, allowableFailures = 0; int me = 0; stringstream selfs; for( vector<ReplSetConfig::MemberCfg>::const_iterator i = cfg.members.begin(); i != cfg.members.end(); i++ ) { @@ -51,7 +51,10 @@ namespace mongo { uasserted(13420, "initiation and reconfiguration of a replica set must be sent to a node that can become primary"); } } + allVotes += i->votes; } + allowableFailures = allVotes - (allVotes/2 + 1); + uassert(13278, "bad config: isSelf is true for multiple hosts: " + selfs.str(), me <= 1); // dups? if( me != 1 ) { stringstream ss; @@ -61,6 +64,7 @@ namespace mongo { uasserted(13279, ss.str()); } + vector<string> down; for( vector<ReplSetConfig::MemberCfg>::const_iterator i = cfg.members.begin(); i != cfg.members.end(); i++ ) { // we know we're up if (i->h.isSelf()) { @@ -100,27 +104,27 @@ namespace mongo { } } if( !ok && !res["rs"].trueValue() ) { + down.push_back(i->h.toString()); + if( !res.isEmpty() ) { /* strange. got a response, but not "ok". log it. */ log() << "replSet warning " << i->h.toString() << " replied: " << res.toString() << rsLog; } bool allowFailure = false; - failures++; - if( res.isEmpty() && !initial && failures == 1 ) { - /* for now we are only allowing 1 node to be down on a reconfig. this can be made to be a minority - trying to keep change small as release is near. - */ + failures += i->votes; + if( !initial && failures <= allowableFailures ) { const Member* m = theReplSet->findById( i->_id ); if( m ) { - // ok, so this was an existing member (wouldn't make sense to add to config a new member that is down) assert( m->h().toString() == i->h.toString() ); - allowFailure = true; } + // it's okay if the down member isn't part of the config, + // we might be adding a new member that isn't up yet + allowFailure = true; } if( !allowFailure ) { - string msg = string("need members up to initiate, not ok : ") + i->h.toString(); + string msg = string("need all members up to initiate, not ok : ") + i->h.toString(); if( !initial ) msg = string("need most members up to reconfigure, not ok : ") + i->h.toString(); uasserted(13144, msg); @@ -133,6 +137,9 @@ namespace mongo { !hasData || i->h.isSelf()); } } + if (down.size() > 0) { + result.append("down", down); + } } class CmdReplSetInitiate : public ReplSetCommand { @@ -143,7 +150,7 @@ namespace mongo { h << "Initiate/christen a replica set."; h << "\nhttp://www.mongodb.org/display/DOCS/Replica+Set+Commands"; } - virtual bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) { + virtual bool run(const string& , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool fromRepl) { log() << "replSet replSetInitiate admin command received from client" << rsLog; if( !replSet ) { @@ -179,7 +186,7 @@ namespace mongo { if( ReplSet::startupStatus == ReplSet::BADCONFIG ) { errmsg = "server already in BADCONFIG state (check logs); not initiating"; - result.append("info", ReplSet::startupStatusMsg); + result.append("info", ReplSet::startupStatusMsg.get()); return false; } if( ReplSet::startupStatus != ReplSet::EMPTYCONFIG ) { @@ -204,6 +211,7 @@ namespace mongo { b.append("_id", name); bob members; members.append("0", BSON( "_id" << 0 << "host" << HostAndPort::Me().toString() )); + result.append("me", HostAndPort::Me().toString()); for( unsigned i = 0; i < seeds.size(); i++ ) members.append(bob::numStr(i+1), BSON( "_id" << i+1 << "host" << seeds[i].toString())); b.appendArray("members", members.obj()); @@ -226,7 +234,7 @@ namespace mongo { log() << "replSet replSetInitiate config object parses ok, " << newConfig.members.size() << " members specified" << rsLog; - checkMembersUpForConfigChange(newConfig, true); + checkMembersUpForConfigChange(newConfig, result, true); log() << "replSet replSetInitiate all members seem up" << rsLog; @@ -238,7 +246,7 @@ namespace mongo { log() << "replSet replSetInitiate config now saved locally. Should come online in about a minute." << rsLog; result.append("info", "Config now saved locally. Should come online in about a minute."); ReplSet::startupStatus = ReplSet::SOON; - ReplSet::startupStatusMsg = "Received replSetInitiate - should come online shortly."; + ReplSet::startupStatusMsg.set("Received replSetInitiate - should come online shortly."); } catch( DBException& e ) { log() << "replSet replSetInitiate exception: " << e.what() << rsLog; @@ -248,6 +256,11 @@ namespace mongo { errmsg = string("couldn't initiate : ") + e.what(); return false; } + catch( string& e2 ) { + log() << e2 << rsLog; + errmsg = e2; + return false; + } return true; } diff --git a/db/repl/rs_member.h b/db/repl/rs_member.h index b685c04..d60bb52 100644 --- a/db/repl/rs_member.h +++ b/db/repl/rs_member.h @@ -49,6 +49,7 @@ namespace mongo { MemberState(MS ms = RS_UNKNOWN) : s(ms) { } explicit MemberState(int ms) : s((MS) ms) { } + bool startup() const { return s == RS_STARTUP; } bool primary() const { return s == RS_PRIMARY; } bool secondary() const { return s == RS_SECONDARY; } bool recovering() const { return s == RS_RECOVERING; } @@ -79,6 +80,8 @@ namespace mongo { DiagStr lastHeartbeatMsg; OpTime opTime; int skew; + unsigned int ping; // milliseconds + static unsigned int numPings; bool up() const { return health > 0; } @@ -104,4 +107,20 @@ namespace mongo { hbstate != old.hbstate; } + inline string MemberState::toString() const { + switch ( s ) { + case RS_STARTUP: return "STARTUP"; + case RS_PRIMARY: return "PRIMARY"; + case RS_SECONDARY: return "SECONDARY"; + case RS_RECOVERING: return "RECOVERING"; + case RS_FATAL: return "FATAL"; + case RS_STARTUP2: return "STARTUP2"; + case RS_ARBITER: return "ARBITER"; + case RS_DOWN: return "DOWN"; + case RS_ROLLBACK: return "ROLLBACK"; + case RS_UNKNOWN: return "UNKNOWN"; + } + return ""; + } + } diff --git a/db/repl/rs_rollback.cpp b/db/repl/rs_rollback.cpp index 0b4cc28..f012e65 100644 --- a/db/repl/rs_rollback.cpp +++ b/db/repl/rs_rollback.cpp @@ -20,7 +20,10 @@ #include "../../client/dbclient.h" #include "rs.h" #include "../repl.h" -#include "../query.h" +#include "../ops/query.h" +#include "../cloner.h" +#include "../ops/update.h" +#include "../ops/delete.h" /* Scenarios @@ -62,7 +65,6 @@ namespace mongo { using namespace bson; - bool copyCollectionFromRemote(const string& host, const string& ns, const BSONObj& query, string& errmsg, bool logforrepl); void incRBID(); class rsfatal : public std::exception { @@ -227,9 +229,9 @@ namespace mongo { log() << "replSet info rollback our last optime: " << ourTime.toStringPretty() << rsLog; log() << "replSet info rollback their last optime: " << theirTime.toStringPretty() << rsLog; log() << "replSet info rollback diff in end of log times: " << diff << " seconds" << rsLog; - if( diff > 3600 ) { + if( diff > 1800 ) { log() << "replSet rollback too long a time period for a rollback." << rsLog; - throw "error not willing to roll back more than one hour of data"; + throw "error not willing to roll back more than 30 minutes of data"; } } @@ -339,7 +341,7 @@ namespace mongo { { /* TODO : slow. lots of round trips. */ n++; - bo good= them->findOne(d.ns, d._id.wrap()).getOwned(); + bo good= them->findOne(d.ns, d._id.wrap(), NULL, QueryOption_SlaveOk).getOwned(); totSize += good.objsize(); uassert( 13410, "replSet too much data to roll back", totSize < 300 * 1024 * 1024 ); @@ -393,7 +395,7 @@ namespace mongo { dropCollection(ns, errmsg, res); { dbtemprelease r; - bool ok = copyCollectionFromRemote(them->getServerAddress(), ns, bo(), errmsg, false); + bool ok = copyCollectionFromRemote(them->getServerAddress(), ns, bo(), errmsg, false, true, false); if( !ok ) { log() << "replSet rollback error resyncing collection " << ns << ' ' << errmsg << rsLog; throw "rollback error resyncing rollection [1]"; @@ -572,7 +574,7 @@ namespace mongo { sethbmsg("rollback 6"); // clean up oplog - log(2) << "replSet rollback truncate oplog after " << h.commonPoint.toStringPretty() << rsLog; + LOG(2) << "replSet rollback truncate oplog after " << h.commonPoint.toStringPretty() << rsLog; // todo: fatal error if this throws? oplogDetails->cappedTruncateAfter(rsoplog, h.commonPointOurDiskloc, false); @@ -607,26 +609,20 @@ namespace mongo { return 2; } - if( box.getState().secondary() ) { + if( state().secondary() ) { /* by doing this, we will not service reads (return an error as we aren't in secondary staate. that perhaps is moot becasue of the write lock above, but that write lock probably gets deferred or removed or yielded later anyway. also, this is better for status reporting - we know what is happening. */ - box.change(MemberState::RS_ROLLBACK, _self); + changeState(MemberState::RS_ROLLBACK); } HowToFixUp how; sethbmsg("rollback 1"); { r.resetCursor(); - /*DBClientConnection us(false, 0, 0); - string errmsg; - if( !us.connect(HostAndPort::me().toString(),errmsg) ) { - sethbmsg("rollback connect to self failure" + errmsg); - return; - }*/ sethbmsg("rollback 2 FindCommonPoint"); try { @@ -668,7 +664,7 @@ namespace mongo { /* success - leave "ROLLBACK" state can go to SECONDARY once minvalid is achieved */ - box.change(MemberState::RS_RECOVERING, _self); + changeState(MemberState::RS_RECOVERING); } return 0; diff --git a/db/repl/rs_sync.cpp b/db/repl/rs_sync.cpp index 8d06fcc..b29328b 100644 --- a/db/repl/rs_sync.cpp +++ b/db/repl/rs_sync.cpp @@ -20,28 +20,28 @@ #include "rs.h" #include "../repl.h" #include "connections.h" + namespace mongo { using namespace bson; extern unsigned replSetForceInitialSyncFailure; + void NOINLINE_DECL blank(const BSONObj& o) { + if( *o.getStringField("op") != 'n' ) { + log() << "replSet skipping bad op in oplog: " << o.toString() << rsLog; + } + } + /* apply the log op that is in param o */ void ReplSetImpl::syncApply(const BSONObj &o) { - char db[MaxDatabaseNameLen]; const char *ns = o.getStringField("ns"); - nsToDatabase(ns, db); - if ( *ns == '.' || *ns == 0 ) { - if( *o.getStringField("op") == 'n' ) - return; - log() << "replSet skipping bad op in oplog: " << o.toString() << endl; + blank(o); return; } Client::Context ctx(ns); ctx.getClient()->curop()->reset(); - - /* todo : if this asserts, do we want to ignore or not? */ applyOperation_inlock(o); } @@ -63,15 +63,11 @@ namespace mongo { return false; } - { - BSONObjBuilder q; - q.appendDate("$gte", applyGTE.asDate()); - BSONObjBuilder query; - query.append("ts", q.done()); - BSONObj queryObj = query.done(); - r.query(rsoplog, queryObj); + r.tailingQueryGTE( rsoplog, applyGTE ); + if ( !r.haveCursor() ) { + log() << "replSet initial sync oplog query error" << rsLog; + return false; } - assert( r.haveCursor() ); { if( !r.more() ) { @@ -83,7 +79,7 @@ namespace mongo { OpTime t = op["ts"]._opTime(); r.putBack(op); - if( op.firstElement().fieldName() == string("$err") ) { + if( op.firstElementFieldName() == string("$err") ) { log() << "replSet initial sync error querying " << rsoplog << " on " << hn << " : " << op.toString() << rsLog; return false; } @@ -95,6 +91,9 @@ namespace mongo { log() << "replSet initial sync but received a first optime of " << t << " from " << hn << rsLog; return false; } + + sethbmsg(str::stream() << "initial oplog application from " << hn << " starting at " + << t.toStringPretty() << " to " << minValid.toStringPretty()); } } catch(DBException& e) { @@ -107,6 +106,7 @@ namespace mongo { // todo : use exhaust OpTime ts; + time_t start = time(0); unsigned long long n = 0; while( 1 ) { try { @@ -139,18 +139,35 @@ namespace mongo { } _logOpObjRS(o); /* with repl sets we write the ops to our oplog too */ } - if( ++n % 100000 == 0 ) { - // simple progress metering - log() << "replSet initialSyncOplogApplication " << n << rsLog; + + if ( ++n % 1000 == 0 ) { + time_t now = time(0); + if (now - start > 10) { + // simple progress metering + log() << "replSet initialSyncOplogApplication applied " << n << " operations, synced to " + << ts.toStringPretty() << rsLog; + start = now; + } } getDur().commitIfNeeded(); } catch (DBException& e) { + // skip duplicate key exceptions if( e.getCode() == 11000 || e.getCode() == 11001 ) { - // skip duplicate key exceptions continue; } + + // handle cursor not found (just requery) + if( e.getCode() == 13127 ) { + r.resetCursor(); + r.tailingQueryGTE(rsoplog, ts); + if( r.haveCursor() ) { + continue; + } + } + + // TODO: handle server restart if( ts <= minValid ) { // didn't make it far enough @@ -171,6 +188,16 @@ namespace mongo { */ bool ReplSetImpl::tryToGoLiveAsASecondary(OpTime& /*out*/ minvalid) { bool golive = false; + + { + lock lk( this ); + + if (_maintenanceMode > 0) { + // we're not actually going live + return true; + } + } + { readlock lk("local.replset.minvalid"); BSONObj mv; @@ -190,35 +217,35 @@ namespace mongo { return golive; } - /** - * Checks if the oplog given is too far ahead to read from. - * - * @param r the oplog - * @param hn the hostname (for log messages) - * - * @return if we are stale compared to the oplog on hn - */ bool ReplSetImpl::_isStale(OplogReader& r, const string& hn) { BSONObj remoteOldestOp = r.findOne(rsoplog, Query()); OpTime ts = remoteOldestOp["ts"]._opTime(); DEV log() << "replSet remoteOldestOp: " << ts.toStringLong() << rsLog; - else log(3) << "replSet remoteOldestOp: " << ts.toStringLong() << rsLog; + else LOG(3) << "replSet remoteOldestOp: " << ts.toStringLong() << rsLog; DEV { - // debugging sync1.js... log() << "replSet lastOpTimeWritten: " << lastOpTimeWritten.toStringLong() << rsLog; log() << "replSet our state: " << state().toString() << rsLog; } - if( lastOpTimeWritten < ts ) { - log() << "replSet error RS102 too stale to catch up, at least from " << hn << rsLog; - log() << "replSet our last optime : " << lastOpTimeWritten.toStringLong() << rsLog; - log() << "replSet oldest at " << hn << " : " << ts.toStringLong() << rsLog; - log() << "replSet See http://www.mongodb.org/display/DOCS/Resyncing+a+Very+Stale+Replica+Set+Member" << rsLog; - sethbmsg("error RS102 too stale to catch up"); - changeState(MemberState::RS_RECOVERING); - sleepsecs(120); - return true; - } - return false; + if( lastOpTimeWritten >= ts ) { + return false; + } + + // we're stale + log() << "replSet error RS102 too stale to catch up, at least from " << hn << rsLog; + log() << "replSet our last optime : " << lastOpTimeWritten.toStringLong() << rsLog; + log() << "replSet oldest at " << hn << " : " << ts.toStringLong() << rsLog; + log() << "replSet See http://www.mongodb.org/display/DOCS/Resyncing+a+Very+Stale+Replica+Set+Member" << rsLog; + + // reset minvalid so that we can't become primary prematurely + { + writelock lk("local.replset.minvalid"); + Helpers::putSingleton("local.replset.minvalid", remoteOldestOp); + } + + sethbmsg("error RS102 too stale to catch up"); + changeState(MemberState::RS_RECOVERING); + sleepsecs(120); + return true; } /** @@ -234,7 +261,7 @@ namespace mongo { assert(r.conn() == 0); if( !r.connect(hn) ) { - log(2) << "replSet can't connect to " << hn << " to read operations" << rsLog; + LOG(2) << "replSet can't connect to " << hn << " to read operations" << rsLog; r.resetConnection(); return false; } @@ -250,8 +277,11 @@ namespace mongo { // todo : locking vis a vis the mgr... OplogReader r; string hn; + const Member *target = 0; - const Member *target = box.getPrimary(); + // if we cannot reach the master but someone else is more up-to-date + // than we are, sync from them. + target = getMemberToSyncTo(); if (target != 0) { hn = target->h().toString(); if (!_getOplogReader(r, hn)) { @@ -260,32 +290,21 @@ namespace mongo { target = 0; } } - - // if we cannot reach the master but someone else is more up-to-date - // than we are, sync from them. - if( target == 0 ) { - for(Member *m = head(); m; m=m->next()) { - hn = m->h().toString(); - if (m->hbinfo().up() && m->state().readable() && - (m->hbinfo().opTime > lastOpTimeWritten) && - m->config().slaveDelay == 0 && - _getOplogReader(r, hn)) { - target = m; - break; - } - } - - // no server found - if (target == 0) { - // if there is no one to sync from - OpTime minvalid; - tryToGoLiveAsASecondary(minvalid); - return; - } + + // no server found + if (target == 0) { + // if there is no one to sync from + OpTime minvalid; + tryToGoLiveAsASecondary(minvalid); + return; } - + r.tailingQueryGTE(rsoplog, lastOpTimeWritten); - assert( r.haveCursor() ); + // if target cut connections between connecting and querying (for + // example, because it stepped down) we might not have a cursor + if ( !r.haveCursor() ) { + return; + } uassert(1000, "replSet source for syncing doesn't seem to be await capable -- is it an older version of mongodb?", r.awaitCapable() ); @@ -314,22 +333,14 @@ namespace mongo { sleepsecs(2); } return; - /* - log() << "replSet syncTail error querying oplog >= " << lastOpTimeWritten.toString() << " from " << hn << rsLog; - try { - log() << "replSet " << hn << " last op: " << r.getLastOp(rsoplog).toString() << rsLog; - } - catch(...) { } - sleepsecs(1); - return;*/ } BSONObj o = r.nextSafe(); OpTime ts = o["ts"]._opTime(); long long h = o["h"].numberLong(); if( ts != lastOpTimeWritten || h != lastH ) { - log() << "replSet our last op time written: " << lastOpTimeWritten.toStringPretty() << endl; - log() << "replset source's GTE: " << ts.toStringPretty() << endl; + log() << "replSet our last op time written: " << lastOpTimeWritten.toStringPretty() << rsLog; + log() << "replset source's GTE: " << ts.toStringPretty() << rsLog; syncRollback(r); return; } @@ -362,15 +373,8 @@ namespace mongo { /* todo: too stale capability */ } - { - const Member *primary = box.getPrimary(); - - if( !target->hbinfo().hbstate.readable() || - // if we are not syncing from the primary, return (if - // it's up) so that we can try accessing it again - (target != primary && primary != 0)) { - return; - } + if( !target->hbinfo().hbstate.readable() ) { + return; } } if( !r.more() ) @@ -389,20 +393,22 @@ namespace mongo { long long sleeptime = sd - lag; if( sleeptime > 0 ) { uassert(12000, "rs slaveDelay differential too big check clocks and systems", sleeptime < 0x40000000); - log() << "replSet temp slavedelay sleep:" << sleeptime << rsLog; if( sleeptime < 60 ) { sleepsecs((int) sleeptime); } else { + log() << "replSet slavedelay sleep long time: " << sleeptime << rsLog; // sleep(hours) would prevent reconfigs from taking effect & such! long long waitUntil = b + sleeptime; while( 1 ) { sleepsecs(6); if( time(0) >= waitUntil ) break; + if( !target->hbinfo().hbstate.readable() ) { break; } + if( myConfig().slaveDelay != sd ) // reconf break; } @@ -411,7 +417,7 @@ namespace mongo { } - { + try { writelock lk(""); /* if we have become primary, we dont' want to apply things from elsewhere @@ -423,16 +429,22 @@ namespace mongo { } syncApply(o); - _logOpObjRS(o); /* with repl sets we write the ops to our oplog too: */ + _logOpObjRS(o); // with repl sets we write the ops to our oplog too + } + catch (DBException& e) { + sethbmsg(str::stream() << "syncTail: " << e.toString() << ", syncing: " << o); + sleepsecs(30); + return; } } } r.tailCheck(); if( !r.haveCursor() ) { - log(1) << "replSet end syncTail pass with " << hn << rsLog; + LOG(1) << "replSet end syncTail pass with " << hn << rsLog; // TODO : reuse our connection to the primary. return; } + if( !target->hbinfo().hbstate.readable() ) { return; } @@ -446,7 +458,7 @@ namespace mongo { sleepsecs(1); return; } - if( sp.state.fatal() ) { + if( sp.state.fatal() || sp.state.startup() ) { sleepsecs(5); return; } @@ -462,32 +474,23 @@ namespace mongo { } void ReplSetImpl::syncThread() { - /* test here was to force a receive timeout - ScopedConn c("localhost"); - bo info; - try { - log() << "this is temp" << endl; - c.runCommand("admin", BSON("sleep"<<120), info); - log() << info.toString() << endl; - c.runCommand("admin", BSON("sleep"<<120), info); - log() << "temp" << endl; - } - catch( DBException& e ) { - log() << e.toString() << endl; - c.runCommand("admin", BSON("sleep"<<120), info); - log() << "temp" << endl; - } - */ - while( 1 ) { - if( myConfig().arbiterOnly ) + // After a reconfig, we may not be in the replica set anymore, so + // check that we are in the set (and not an arbiter) before + // trying to sync with other replicas. + if( ! _self ) { + log() << "replSet warning did not detect own host and port, not syncing, config: " << theReplSet->config() << rsLog; + return; + } + if( myConfig().arbiterOnly ) { return; + } try { _syncThread(); } catch(DBException& e) { - sethbmsg("syncThread: " + e.toString()); + sethbmsg(str::stream() << "syncThread: " << e.toString()); sleepsecs(10); } catch(...) { @@ -501,7 +504,9 @@ namespace mongo { are no heartbeat threads, so we do it here to be sure. this is relevant if the singleton member has done a stepDown() and needs to come back up. */ - OCCASIONALLY mgr->send( boost::bind(&Manager::msgCheckNewState, theReplSet->mgr) ); + OCCASIONALLY { + mgr->send( boost::bind(&Manager::msgCheckNewState, theReplSet->mgr) ); + } } } @@ -513,13 +518,115 @@ namespace mongo { } n++; - Client::initThread("replica set sync"); - cc().iAmSyncThread(); - if (!noauth) { - cc().getAuthenticationInfo()->authorize("local"); - } + Client::initThread("rsSync"); + cc().iAmSyncThread(); // for isSyncThread() (which is used not used much, is used in secondary create index code + replLocalAuth(); theReplSet->syncThread(); cc().shutdown(); } + void GhostSync::starting() { + Client::initThread("rsGhostSync"); + replLocalAuth(); + } + + void GhostSync::associateSlave(const BSONObj& id, const int memberId) { + const OID rid = id["_id"].OID(); + rwlock lk( _lock , true ); + GhostSlave &slave = _ghostCache[rid]; + if (slave.init) { + LOG(1) << "tracking " << slave.slave->h().toString() << " as " << rid << rsLog; + return; + } + + slave.slave = (Member*)rs->findById(memberId); + if (slave.slave != 0) { + slave.init = true; + } + else { + log() << "replset couldn't find a slave with id " << memberId + << ", not tracking " << rid << rsLog; + } + } + + void GhostSync::updateSlave(const mongo::OID& rid, const OpTime& last) { + rwlock lk( _lock , false ); + MAP::iterator i = _ghostCache.find( rid ); + if ( i == _ghostCache.end() ) { + OCCASIONALLY warning() << "couldn't update slave " << rid << " no entry" << rsLog; + return; + } + + GhostSlave& slave = i->second; + if (!slave.init) { + OCCASIONALLY log() << "couldn't update slave " << rid << " not init" << rsLog; + return; + } + + ((ReplSetConfig::MemberCfg)slave.slave->config()).updateGroups(last); + } + + void GhostSync::percolate(const BSONObj& id, const OpTime& last) { + const OID rid = id["_id"].OID(); + GhostSlave* slave; + { + rwlock lk( _lock , false ); + + MAP::iterator i = _ghostCache.find( rid ); + if ( i == _ghostCache.end() ) { + OCCASIONALLY log() << "couldn't percolate slave " << rid << " no entry" << rsLog; + return; + } + + slave = &(i->second); + if (!slave->init) { + OCCASIONALLY log() << "couldn't percolate slave " << rid << " not init" << rsLog; + return; + } + } + + assert(slave->slave); + + const Member *target = rs->_currentSyncTarget; + if (!target || rs->box.getState().primary() + // we are currently syncing from someone who's syncing from us + // the target might end up with a new Member, but s.slave never + // changes so we'll compare the names + || target == slave->slave || target->fullName() == slave->slave->fullName()) { + LOG(1) << "replica set ghost target no good" << endl; + return; + } + + try { + if (!slave->reader.haveCursor()) { + if (!slave->reader.connect(id, slave->slave->id(), target->fullName())) { + // error message logged in OplogReader::connect + return; + } + slave->reader.ghostQueryGTE(rsoplog, last); + } + + LOG(1) << "replSet last: " << slave->last.toString() << " to " << last.toString() << rsLog; + if (slave->last > last) { + return; + } + + while (slave->last <= last) { + if (!slave->reader.more()) { + // we'll be back + return; + } + + BSONObj o = slave->reader.nextSafe(); + slave->last = o["ts"]._opTime(); + } + LOG(2) << "now last is " << slave->last.toString() << rsLog; + } + catch (DBException& e) { + // we'll be back + LOG(2) << "replSet ghost sync error: " << e.what() << " for " + << slave->slave->fullName() << rsLog; + slave->reader.resetConnection(); + } + } } |