diff options
Diffstat (limited to 'db/repl/consensus.cpp')
-rw-r--r-- | db/repl/consensus.cpp | 342 |
1 files changed, 342 insertions, 0 deletions
diff --git a/db/repl/consensus.cpp b/db/repl/consensus.cpp new file mode 100644 index 0000000..4eba17d --- /dev/null +++ b/db/repl/consensus.cpp @@ -0,0 +1,342 @@ +/** +* Copyright (C) 2010 10gen Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "pch.h" +#include "../commands.h" +#include "rs.h" +#include "multicmd.h" + +namespace mongo { + + class CmdReplSetFresh : public ReplSetCommand { + public: + CmdReplSetFresh() : ReplSetCommand("replSetFresh") { } + private: + virtual bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) { + if( !check(errmsg, result) ) + return false; + + if( cmdObj["set"].String() != theReplSet->name() ) { + errmsg = "wrong repl set name"; + return false; + } + string who = cmdObj["who"].String(); + int cfgver = cmdObj["cfgver"].Int(); + OpTime opTime(cmdObj["opTime"].Date()); + + bool weAreFresher = false; + if( theReplSet->config().version > cfgver ) { + log() << "replSet member " << who << " is not yet aware its cfg version " << cfgver << " is stale" << rsLog; + result.append("info", "config version stale"); + weAreFresher = true; + } + else if( opTime < theReplSet->lastOpTimeWritten ) { + weAreFresher = true; + } + result.appendDate("opTime", theReplSet->lastOpTimeWritten.asDate()); + result.append("fresher", weAreFresher); + return true; + } + } cmdReplSetFresh; + + class CmdReplSetElect : public ReplSetCommand { + public: + CmdReplSetElect() : ReplSetCommand("replSetElect") { } + private: + virtual bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) { + if( !check(errmsg, result) ) + return false; + //task::lam f = boost::bind(&Consensus::electCmdReceived, &theReplSet->elect, cmdObj, &result); + //theReplSet->mgr->call(f); + theReplSet->elect.electCmdReceived(cmdObj, &result); + return true; + } + } cmdReplSetElect; + + int Consensus::totalVotes() const { + static int complain = 0; + int vTot = rs._self->config().votes; + for( Member *m = rs.head(); m; m=m->next() ) + vTot += m->config().votes; + if( vTot % 2 == 0 && vTot && complain++ == 0 ) + log() << "replSet warning total number of votes is even - considering giving one member an extra vote" << rsLog; + return vTot; + } + + bool Consensus::aMajoritySeemsToBeUp() const { + int vUp = rs._self->config().votes; + for( Member *m = rs.head(); m; m=m->next() ) + vUp += m->hbinfo().up() ? m->config().votes : 0; + return vUp * 2 > totalVotes(); + } + + static const int VETO = -10000; + + const time_t LeaseTime = 30; + + unsigned Consensus::yea(unsigned memberId) /* throws VoteException */ { + Atomic<LastYea>::tran t(ly); + LastYea &ly = t.ref(); + time_t now = time(0); + if( ly.when + LeaseTime >= now && ly.who != memberId ) { + log(1) << "replSet not voting yea for " << memberId << + " voted for " << ly.who << ' ' << now-ly.when << " secs ago" << rsLog; + throw VoteException(); + } + ly.when = now; + ly.who = memberId; + return rs._self->config().votes; + } + + /* we vote for ourself at start of election. once it fails, we can cancel the lease we had in + place instead of leaving it for a long time. + */ + void Consensus::electionFailed(unsigned meid) { + Atomic<LastYea>::tran t(ly); + LastYea &L = t.ref(); + DEV assert( L.who == meid ); // this may not always always hold, so be aware, but adding for now as a quick sanity test + if( L.who == meid ) + L.when = 0; + } + + /* todo: threading **************** !!!!!!!!!!!!!!!! */ + void Consensus::electCmdReceived(BSONObj cmd, BSONObjBuilder* _b) { + BSONObjBuilder& b = *_b; + DEV log() << "replSet received elect msg " << cmd.toString() << rsLog; + else log(2) << "replSet received elect msg " << cmd.toString() << rsLog; + string set = cmd["set"].String(); + unsigned whoid = cmd["whoid"].Int(); + int cfgver = cmd["cfgver"].Int(); + OID round = cmd["round"].OID(); + int myver = rs.config().version; + + int vote = 0; + if( set != rs.name() ) { + log() << "replSet error received an elect request for '" << set << "' but our set name is '" << rs.name() << "'" << rsLog; + + } + else if( myver < cfgver ) { + // we are stale. don't vote + } + else if( myver > cfgver ) { + // they are stale! + log() << "replSet info got stale version # during election" << rsLog; + vote = -10000; + } + else { + try { + vote = yea(whoid); + rs.relinquish(); + log() << "replSet info voting yea for " << whoid << rsLog; + } + catch(VoteException&) { + log() << "replSet voting no already voted for another" << rsLog; + } + } + + b.append("vote", vote); + b.append("round", round); + } + + void ReplSetImpl::_getTargets(list<Target>& L, int& configVersion) { + configVersion = config().version; + for( Member *m = head(); m; m=m->next() ) + if( m->hbinfo().up() ) + L.push_back( Target(m->fullName()) ); + } + + /* config version is returned as it is ok to use this unlocked. BUT, if unlocked, you would need + to check later that the config didn't change. */ + void ReplSetImpl::getTargets(list<Target>& L, int& configVersion) { + if( lockedByMe() ) { + _getTargets(L, configVersion); + return; + } + lock lk(this); + _getTargets(L, configVersion); + } + + /* Do we have the newest data of them all? + @param allUp - set to true if all members are up. Only set if true returned. + @return true if we are freshest. Note we may tie. + */ + bool Consensus::weAreFreshest(bool& allUp, int& nTies) { + const OpTime ord = theReplSet->lastOpTimeWritten; + nTies = 0; + assert( !ord.isNull() ); + BSONObj cmd = BSON( + "replSetFresh" << 1 << + "set" << rs.name() << + "opTime" << Date_t(ord.asDate()) << + "who" << rs._self->fullName() << + "cfgver" << rs._cfg->version ); + list<Target> L; + int ver; + rs.getTargets(L, ver); + multiCommand(cmd, L); + int nok = 0; + allUp = true; + for( list<Target>::iterator i = L.begin(); i != L.end(); i++ ) { + if( i->ok ) { + nok++; + if( i->result["fresher"].trueValue() ) + return false; + OpTime remoteOrd( i->result["opTime"].Date() ); + if( remoteOrd == ord ) + nTies++; + assert( remoteOrd <= ord ); + } + else { + DEV log() << "replSet freshest returns " << i->result.toString() << rsLog; + allUp = false; + } + } + DEV log() << "replSet dev we are freshest of up nodes, nok:" << nok << " nTies:" << nTies << rsLog; + assert( ord <= theReplSet->lastOpTimeWritten ); // <= as this may change while we are working... + return true; + } + + extern time_t started; + + void Consensus::multiCommand(BSONObj cmd, list<Target>& L) { + assert( !rs.lockedByMe() ); + mongo::multiCommand(cmd, L); + } + + void Consensus::_electSelf() { + if( time(0) < steppedDown ) + return; + + { + const OpTime ord = theReplSet->lastOpTimeWritten; + if( ord == 0 ) { + log() << "replSet info not trying to elect self, do not yet have a complete set of data from any point in time" << rsLog; + return; + } + } + + bool allUp; + int nTies; + if( !weAreFreshest(allUp, nTies) ) { + log() << "replSet info not electing self, we are not freshest" << rsLog; + return; + } + + rs.sethbmsg("",9); + + if( !allUp && time(0) - started < 60 * 5 ) { + /* the idea here is that if a bunch of nodes bounce all at once, we don't want to drop data + if we don't have to -- we'd rather be offline and wait a little longer instead + todo: make this configurable. + */ + rs.sethbmsg("not electing self, not all members up and we have been up less than 5 minutes"); + return; + } + + Member& me = *rs._self; + + if( nTies ) { + /* tie? we then randomly sleep to try to not collide on our voting. */ + /* todo: smarter. */ + if( me.id() == 0 || sleptLast ) { + // would be fine for one node not to sleep + // todo: biggest / highest priority nodes should be the ones that get to not sleep + } else { + assert( !rs.lockedByMe() ); // bad to go to sleep locked + unsigned ms = ((unsigned) rand()) % 1000 + 50; + DEV log() << "replSet tie " << nTies << " sleeping a little " << ms << "ms" << rsLog; + sleptLast = true; + sleepmillis(ms); + throw RetryAfterSleepException(); + } + } + sleptLast = false; + + time_t start = time(0); + unsigned meid = me.id(); + int tally = yea( meid ); + bool success = false; + try { + log() << "replSet info electSelf " << meid << rsLog; + + BSONObj electCmd = BSON( + "replSetElect" << 1 << + "set" << rs.name() << + "who" << me.fullName() << + "whoid" << me.hbinfo().id() << + "cfgver" << rs._cfg->version << + "round" << OID::gen() /* this is just for diagnostics */ + ); + + int configVersion; + list<Target> L; + rs.getTargets(L, configVersion); + multiCommand(electCmd, L); + + { + RSBase::lock lk(&rs); + for( list<Target>::iterator i = L.begin(); i != L.end(); i++ ) { + DEV log() << "replSet elect res: " << i->result.toString() << rsLog; + if( i->ok ) { + int v = i->result["vote"].Int(); + tally += v; + } + } + if( tally*2 <= totalVotes() ) { + log() << "replSet couldn't elect self, only received " << tally << " votes" << rsLog; + } + else if( time(0) - start > 30 ) { + // defensive; should never happen as we have timeouts on connection and operation for our conn + log() << "replSet too much time passed during our election, ignoring result" << rsLog; + } + else if( configVersion != rs.config().version ) { + log() << "replSet config version changed during our election, ignoring result" << rsLog; + } + else { + /* succeeded. */ + log(1) << "replSet election succeeded, assuming primary role" << rsLog; + success = true; + rs.assumePrimary(); + } + } + } catch( std::exception& ) { + if( !success ) electionFailed(meid); + throw; + } + if( !success ) electionFailed(meid); + } + + void Consensus::electSelf() { + assert( !rs.lockedByMe() ); + assert( !rs.myConfig().arbiterOnly ); + try { + _electSelf(); + } + catch(RetryAfterSleepException&) { + throw; + } + catch(VoteException& ) { + log() << "replSet not trying to elect self as responded yea to someone else recently" << rsLog; + } + catch(DBException& e) { + log() << "replSet warning caught unexpected exception in electSelf() " << e.toString() << rsLog; + } + catch(...) { + log() << "replSet warning caught unexpected exception in electSelf()" << rsLog; + } + } + +} |