/**
*    Copyright (C) 2010 10gen Inc.
*
*    This program is free software: you can redistribute it and/or  modify
*    it under the terms of the GNU Affero General Public License, version 3,
*    as published by the Free Software Foundation.
*
*    This program is distributed in the hope that it will be useful,
*    but WITHOUT ANY WARRANTY; without even the implied warranty of
*    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
*    GNU Affero General Public License for more details.
*
*    You should have received a copy of the GNU Affero General Public License
*    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

#include "pch.h"
#include "../commands.h"
#include "rs.h"
#include "multicmd.h"

namespace mongo {

    /** replSetFresh: a peer that is considering electing itself asks whether we
        are "fresher" than it is.

        Request fields read: set, who, cfgver, opTime.
        Reply fields written: opTime (our last written optime), fresher (bool),
        and info when the caller's config version is stale.
    */
    class CmdReplSetFresh : public ReplSetCommand {
    public:
        CmdReplSetFresh() : ReplSetCommand("replSetFresh") { }
    private:
        virtual bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) {
            if( !check(errmsg, result) )
                return false;

            if( cmdObj["set"].String() != theReplSet->name() ) {
                errmsg = "wrong repl set name";
                return false;
            }
            string who = cmdObj["who"].String();
            int cfgver = cmdObj["cfgver"].Int();
            OpTime opTime(cmdObj["opTime"].Date());

            // Snapshot our last written optime exactly once.  The optime we compare
            // against must be the optime we report.  If they could differ, we might
            // answer fresher:false while reporting an optime newer than the caller's.
            // That would trip the caller's assert( remoteOrd <= ord ) in
            // Consensus::weAreFreshest().
            OpTime ourLastWritten = theReplSet->lastOpTimeWritten;

            bool weAreFresher = false;
            if( theReplSet->config().version > cfgver ) {
                log() << "replSet member " << who << " is not yet aware its cfg version " << cfgver << " is stale" << rsLog;
                result.append("info", "config version stale");
                weAreFresher = true;
            }
            else if( opTime < ourLastWritten ) {
                weAreFresher = true;
            }
            result.appendDate("opTime", ourLastWritten.asDate());
            result.append("fresher", weAreFresher);
            return true;
        }
    } cmdReplSetFresh;

    /** replSetElect: a peer asks for our vote to become primary.
        The vote decision is delegated to Consensus::electCmdReceived(), which
        writes "vote" and "round" into the reply.
    */
    class CmdReplSetElect : public ReplSetCommand {
    public:
        CmdReplSetElect() : ReplSetCommand("replSetElect") { }
    private:
        virtual bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) {
            if( !check(errmsg, result) )
                return false;
            //task::lam f = boost::bind(&Consensus::electCmdReceived, &theReplSet->elect, cmdObj, &result);
            //theReplSet->mgr->call(f);
            theReplSet->elect.electCmdReceived(cmdObj, &result);
            return true;
        }
    } cmdReplSetElect;

    /** @return the sum of configured votes of every member, including ourself.
        Logs a warning, once per process, the first time the total is even and
        non-zero.
    */
    int Consensus::totalVotes() const {
        // A one-shot flag rather than a counter.  This function runs repeatedly,
        // and an ever-incrementing signed int would eventually overflow (undefined
        // behaviour).
        static bool complained = false;
        int vTot = rs._self->config().votes;
        for( Member *m = rs.head(); m; m=m->next() )
            vTot += m->config().votes;
        if( vTot % 2 == 0 && vTot && !complained ) {
            complained = true;
            log() << "replSet " /*buildbot! warning */ "total number of votes is even - add arbiter or give one member an extra vote" << rsLog;
        }
        return vTot;
    }

    /** @return true if our own votes, plus the votes of members whose heartbeat
        info reports them up, form a strict majority of totalVotes().
    */
    bool Consensus::aMajoritySeemsToBeUp() const {
        int vUp = rs._self->config().votes;
        for( Member *m = rs.head(); m; m=m->next() )
            vUp += m->hbinfo().up() ? m->config().votes : 0;
        return vUp * 2 > totalVotes();
    }

    /** @return true when the votes we can still count on are not a strict majority.
        Counted votes are our own plus those of every member whose timeDown() is
        below heartbeatTimeoutMillis * heartbeatConnRetries.
        NOTE(review): assumes timeDown() is in milliseconds, like the threshold.
    */
    bool Consensus::shouldRelinquish() const {
        int vUp = rs._self->config().votes;
        // Widen before multiplying so the product cannot overflow in the operands'
        // narrower type.
        const long long T = static_cast<long long>(rs.config().ho.heartbeatTimeoutMillis) * rs.config().ho.heartbeatConnRetries;
        for( Member *m = rs.head(); m; m=m->next() ) {
            long long dt = m->hbinfo().timeDown();
            if( dt < T )
                vUp += m->config().votes;
        }
        return !( vUp * 2 > totalVotes() );
    }

    /* vote value sent in reply to replSetElect to block a candidate */
    static const int VETO = -10000;

    /* seconds for which a yea vote we have cast stays binding */
    const time_t LeaseTime = 30;

    /** Grant our vote to memberId.
        @throws VoteException if we already voted yea for a *different* member
                within the last LeaseTime seconds.
        @return our configured number of votes.
        Records (who, when) of this vote in ly so later requests honour the lease.
    */
    unsigned Consensus::yea(unsigned memberId) { /* throws VoteException */
        Atomic<LastYea>::tran t(ly);
        LastYea &ly = t.ref();
        time_t now = time(0);
        if( ly.when + LeaseTime >= now && ly.who != memberId ) {
            log(1) << "replSet not voting yea for " << memberId <<
                   " voted for " << ly.who << ' ' << now-ly.when << " secs ago" << rsLog;
            throw VoteException();
        }
        ly.when = now;
        ly.who = memberId;
        return rs._self->config().votes;
    }

    /* we vote for ourself at start of election.  once it fails, we can cancel the lease we had in
       place instead of leaving it for a long time.
       */
    void Consensus::electionFailed(unsigned meid) {
        Atomic<LastYea>::tran t(ly);
        LastYea &L = t.ref();
        DEV assert( L.who == meid ); // this may not always always hold, so be aware, but adding for now as a quick sanity test
        if( L.who == meid )
            L.when = 0;
    }

    /* todo: threading **************** !!!!!!!!!!!!!!!! */

    /** Decide our vote on a replSetElect request and write "vote" and "round"
        into *_b.

        The vote is 0 (abstain) when:
          - the set name differs, or
          - our config version is older than the candidate's, or
          - we already voted yea for someone else within LeaseTime.
        The vote is VETO when:
          - the candidate's config version is stale, or
          - the candidate is unknown, or
          - we are primary with an optime >= the candidate's, or
          - the current primary's optime is >= the candidate's.
        Otherwise we vote yea (our vote count) and call rs.relinquish().
    */
    void Consensus::electCmdReceived(BSONObj cmd, BSONObjBuilder* _b) {
        BSONObjBuilder& b = *_b;
        DEV log() << "replSet received elect msg " << cmd.toString() << rsLog;
        else log(2) << "replSet received elect msg " << cmd.toString() << rsLog;
        string set = cmd["set"].String();
        unsigned whoid = cmd["whoid"].Int();
        int cfgver = cmd["cfgver"].Int();
        OID round = cmd["round"].OID();
        int myver = rs.config().version;

        const Member* primary = rs.box.getPrimary();
        const Member* hopeful = rs.findById(whoid);

        int vote = 0;
        if( set != rs.name() ) {
            log() << "replSet error received an elect request for '" << set << "' but our set name is '" << rs.name() << "'" << rsLog;
        }
        else if( myver < cfgver ) {
            // we are stale.  don't vote
        }
        else if( myver > cfgver ) {
            // they are stale!
            log() << "replSet info got stale version # during election" << rsLog;
            vote = VETO;
        }
        else if( !hopeful ) {
            log() << "couldn't find member with id " << whoid << rsLog;
            vote = VETO;
        }
        else if( primary && primary == rs._self && rs.lastOpTimeWritten >= hopeful->hbinfo().opTime ) {
            // hbinfo is not updated, so we have to check the primary's last optime separately
            log() << "I am already primary, " << hopeful->fullName()
                  << " can try again once I've stepped down" << rsLog;
            vote = VETO;
        }
        else if( primary && primary->hbinfo().opTime >= hopeful->hbinfo().opTime ) {
            // other members might be aware of more up-to-date nodes
            log() << hopeful->fullName() << " is trying to elect itself but " <<
                  primary->fullName() << " is already primary and more up-to-date" << rsLog;
            vote = VETO;
        }
        else {
            try {
                vote = yea(whoid);
                rs.relinquish();
                log() << "replSet info voting yea for " << whoid << rsLog;
            }
            catch(VoteException&) {
                log() << "replSet voting no already voted for another" << rsLog;
            }
        }

        b.append("vote", vote);
        b.append("round", round);
    }

    /** Append a Target for every member whose heartbeat info says it may be up.
        Also report the config version those targets were taken from.
        Caller is expected to hold the lock (see getTargets()).
    */
    void ReplSetImpl::_getTargets(list<Target>& L, int& configVersion) {
        configVersion = config().version;
        for( Member *m = head(); m; m=m->next() )
            if( m->hbinfo().maybeUp() )
                L.push_back( Target(m->fullName()) );
    }

    /* config version is returned as it is ok to use this unlocked.  BUT, if unlocked, you would need
       to check later that the config didn't change. */
    void ReplSetImpl::getTargets(list<Target>& L, int& configVersion) {
        if( lockedByMe() ) {
            _getTargets(L, configVersion);
            return;
        }
        lock lk(this);
        _getTargets(L, configVersion);
    }

    /* Do we have the newest data of them all?
       @param allUp - set to true if all members are up.  Only set if true returned.
       @param nTies - number of responding members whose optime equals ours.
       @return true if we are freshest.  Note we may tie.
    */
    bool Consensus::weAreFreshest(bool& allUp, int& nTies) {
        const OpTime ord = theReplSet->lastOpTimeWritten;
        nTies = 0;
        assert( !ord.isNull() );
        BSONObj cmd = BSON(
                          "replSetFresh" << 1 <<
                          "set" << rs.name() <<
                          "opTime" << Date_t(ord.asDate()) <<
                          "who" << rs._self->fullName() <<
                          "cfgver" << rs._cfg->version );
        list<Target> L;
        int ver;
        /* the following queries arbiters, even though they are never fresh.  wonder if that makes sense.
           it doesn't, but it could, if they "know" what freshness it one day.  so consider removing
           arbiters from getTargets() here.  although getTargets is used elsewhere for elections; there
           arbiters are certainly targets - so a "includeArbs" bool would be necessary if we want to make
           not fetching them herein happen.
           */
        rs.getTargets(L, ver);
        multiCommand(cmd, L);
        int nok = 0;
        allUp = true;
        for( list<Target>::iterator i = L.begin(); i != L.end(); i++ ) {
            if( i->ok ) {
                nok++;
                if( i->result["fresher"].trueValue() )
                    return false;
                OpTime remoteOrd( i->result["opTime"].Date() );
                if( remoteOrd == ord )
                    nTies++;
                assert( remoteOrd <= ord );
            }
            else {
                DEV log() << "replSet freshest returns " << i->result.toString() << rsLog;
                allUp = false;
            }
        }
        log(1) << "replSet dev we are freshest of up nodes, nok:" << nok << " nTies:" << nTies << rsLog;
        assert( ord <= theReplSet->lastOpTimeWritten ); // <= as this may change while we are working...
        return true;
    }

    extern time_t started;

    /** Send cmd to every target in L via mongo::multiCommand().
        Must not be called while holding the replset lock.
    */
    void Consensus::multiCommand(BSONObj cmd, list<Target>& L) {
        assert( !rs.lockedByMe() );
        mongo::multiCommand(cmd, L);
    }

    /** Attempt to become primary.  Throws RetryAfterSleepException after a
        tie-breaking sleep, and VoteException if yea() refuses our self-vote.

        Steps:
          - bail out if we have recently stepped down or have no data;
          - confirm via replSetFresh that no reachable member is fresher;
          - vote for ourself (taking the LeaseTime lease);
          - ask every target for its vote via replSetElect;
          - assume primary if a strict majority is reached, no more than 30 secs
            have elapsed, and the config version is unchanged.
        On any other outcome the self-vote lease is released via electionFailed().
    */
    void Consensus::_electSelf() {
        if( time(0) < steppedDown )
            return;

        {
            const OpTime ord = theReplSet->lastOpTimeWritten;
            // Same predicate weAreFreshest() asserts on, so that assert cannot fire
            // for a null optime that slipped past this guard.
            if( ord.isNull() ) {
                log() << "replSet info not trying to elect self, do not yet have a complete set of data from any point in time" << rsLog;
                return;
            }
        }

        bool allUp;
        int nTies;
        if( !weAreFreshest(allUp, nTies) ) {
            log() << "replSet info not electing self, we are not freshest" << rsLog;
            return;
        }

        rs.sethbmsg("",9);

        if( !allUp && time(0) - started < 60 * 5 ) {
            /* the idea here is that if a bunch of nodes bounce all at once, we don't want to drop data
               if we don't have to -- we'd rather be offline and wait a little longer instead
               todo: make this configurable.
               */
            rs.sethbmsg("not electing self, not all members up and we have been up less than 5 minutes");
            return;
        }

        Member& me = *rs._self;

        if( nTies ) {
            /* tie?  we then randomly sleep to try to not collide on our voting. */
            /* todo: smarter. */
            if( me.id() == 0 || sleptLast ) {
                // would be fine for one node not to sleep
                // todo: biggest / highest priority nodes should be the ones that get to not sleep
            }
            else {
                assert( !rs.lockedByMe() ); // bad to go to sleep locked
                unsigned ms = ((unsigned) rand()) % 1000 + 50;
                DEV log() << "replSet tie " << nTies << " sleeping a little " << ms << "ms" << rsLog;
                sleptLast = true;
                sleepmillis(ms);
                throw RetryAfterSleepException();
            }
        }
        sleptLast = false;

        time_t start = time(0);
        unsigned meid = me.id();
        int tally = yea( meid );
        bool success = false;
        try {
            log() << "replSet info electSelf " << meid << rsLog;

            BSONObj electCmd = BSON(
                                   "replSetElect" << 1 <<
                                   "set" << rs.name() <<
                                   "who" << me.fullName() <<
                                   "whoid" << me.hbinfo().id() <<
                                   "cfgver" << rs._cfg->version <<
                                   "round" << OID::gen() /* this is just for diagnostics */
                               );

            int configVersion;
            list<Target> L;
            rs.getTargets(L, configVersion);
            multiCommand(electCmd, L);

            {
                RSBase::lock lk(&rs);
                for( list<Target>::iterator i = L.begin(); i != L.end(); i++ ) {
                    DEV log() << "replSet elect res: " << i->result.toString() << rsLog;
                    if( i->ok ) {
                        int v = i->result["vote"].Int();
                        tally += v;
                    }
                }
                if( tally*2 <= totalVotes() ) {
                    log() << "replSet couldn't elect self, only received " << tally << " votes" << rsLog;
                }
                else if( time(0) - start > 30 ) {
                    // defensive; should never happen as we have timeouts on connection and operation for our conn
                    log() << "replSet too much time passed during our election, ignoring result" << rsLog;
                }
                else if( configVersion != rs.config().version ) {
                    log() << "replSet config version changed during our election, ignoring result" << rsLog;
                }
                else {
                    /* succeeded. */
                    log(1) << "replSet election succeeded, assuming primary role" << rsLog;
                    success = true;
                    rs.assumePrimary();
                }
            }
        }
        catch( ... ) {
            // Release our self-vote lease whatever was thrown.  Catching only
            // std::exception would leave it held until LeaseTime expires for any
            // other exception type.
            if( !success ) electionFailed(meid);
            throw;
        }
        if( !success ) electionFailed(meid);
    }

    /** Public entry point for trying to become primary.
        Must be called unlocked, and only by a non-arbiter member with no
        slaveDelay.  RetryAfterSleepException is propagated.  VoteException,
        DBException and any other exception are logged and swallowed.
    */
    void Consensus::electSelf() {
        assert( !rs.lockedByMe() );
        assert( !rs.myConfig().arbiterOnly );
        assert( rs.myConfig().slaveDelay == 0 );
        try {
            _electSelf();
        }
        catch(RetryAfterSleepException&) {
            throw;
        }
        catch(VoteException& ) {
            log() << "replSet not trying to elect self as responded yea to someone else recently" << rsLog;
        }
        catch(DBException& e) {
            log() << "replSet warning caught unexpected exception in electSelf() " << e.toString() << rsLog;
        }
        catch(...) {
            log() << "replSet warning caught unexpected exception in electSelf()" << rsLog;
        }
    }

}