diff options
author | Antonin Kral <a.kral@bobek.cz> | 2010-08-11 12:38:57 +0200 |
---|---|---|
committer | Antonin Kral <a.kral@bobek.cz> | 2010-08-11 12:38:57 +0200 |
commit | 7645618fd3914cb8a20561625913c20d49504a49 (patch) | |
tree | 8370f846f58f6d71165b7a0e2eda04648584ec76 /db/repl | |
parent | 68c73c3c7608b4c87f07440dc3232801720b1168 (diff) | |
download | mongodb-7645618fd3914cb8a20561625913c20d49504a49.tar.gz |
Imported Upstream version 1.6.0
Diffstat (limited to 'db/repl')
-rw-r--r-- | db/repl/connections.h | 91 | ||||
-rw-r--r-- | db/repl/consensus.cpp | 342 | ||||
-rw-r--r-- | db/repl/health.cpp | 389 | ||||
-rw-r--r-- | db/repl/health.h | 50 | ||||
-rw-r--r-- | db/repl/heartbeat.cpp | 257 | ||||
-rw-r--r-- | db/repl/manager.cpp | 179 | ||||
-rw-r--r-- | db/repl/multicmd.h | 70 | ||||
-rw-r--r-- | db/repl/replset_commands.cpp | 293 | ||||
-rw-r--r-- | db/repl/rs.cpp | 500 | ||||
-rw-r--r-- | db/repl/rs.h | 415 | ||||
-rw-r--r-- | db/repl/rs_config.cpp | 315 | ||||
-rw-r--r-- | db/repl/rs_config.h | 88 | ||||
-rwxr-xr-x | db/repl/rs_exception.h | 17 | ||||
-rw-r--r-- | db/repl/rs_initialsync.cpp | 214 | ||||
-rw-r--r-- | db/repl/rs_initiate.cpp | 238 | ||||
-rw-r--r-- | db/repl/rs_member.h | 91 | ||||
-rw-r--r-- | db/repl/rs_optime.h | 58 | ||||
-rw-r--r-- | db/repl/rs_rollback.cpp | 481 | ||||
-rw-r--r-- | db/repl/rs_sync.cpp | 328 | ||||
-rw-r--r-- | db/repl/test.html | 11 | ||||
-rw-r--r-- | db/repl/testing.js | 42 |
21 files changed, 4469 insertions, 0 deletions
diff --git a/db/repl/connections.h b/db/repl/connections.h new file mode 100644 index 0000000..95defe4 --- /dev/null +++ b/db/repl/connections.h @@ -0,0 +1,91 @@ +// @file + +/* + * Copyright (C) 2010 10gen Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#pragma once + +#include <map> +#include "../../client/dbclient.h" + +namespace mongo { + + /** here we keep a single connection (with reconnect) for a set of hosts, + one each, and allow one user at a time per host. if in use already for that + host, we block. so this is an easy way to keep a 1-deep pool of connections + that many threads can share. + + thread-safe. + + Example: + { + ScopedConn c("foo.acme.com:9999"); + c->runCommand(...); + } + + throws exception on connect error (but fine to try again later with a new + scopedconn object for same host). + */ + class ScopedConn { + public: + /** throws assertions if connect failure etc. */ + ScopedConn(string hostport); + ~ScopedConn(); + DBClientConnection* operator->(); + private: + auto_ptr<scoped_lock> connLock; + static mutex mapMutex; + struct X { + mutex z; + DBClientConnection cc; + X() : z("X"), cc(/*reconnect*/true, 0, /*timeout*/10) { + cc._logLevel = 2; + } + } *x; + typedef map<string,ScopedConn::X*> M; + static M& _map; + }; + + inline ScopedConn::ScopedConn(string hostport) { + bool first = false; + { + scoped_lock lk(mapMutex); + x = _map[hostport]; + if( x == 0 ) { + x = _map[hostport] = new X(); + first = true; + connLock.reset( new scoped_lock(x->z) ); + } + } + if( !first ) { + connLock.reset( new scoped_lock(x->z) ); + return; + } + + // we already locked above... + string err; + x->cc.connect(hostport, err); + } + + inline ScopedConn::~ScopedConn() { + // conLock releases... + } + + inline DBClientConnection* ScopedConn::operator->() { + return &x->cc; + } + +} diff --git a/db/repl/consensus.cpp b/db/repl/consensus.cpp new file mode 100644 index 0000000..4eba17d --- /dev/null +++ b/db/repl/consensus.cpp @@ -0,0 +1,342 @@ +/** +* Copyright (C) 2010 10gen Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "pch.h" +#include "../commands.h" +#include "rs.h" +#include "multicmd.h" + +namespace mongo { + + class CmdReplSetFresh : public ReplSetCommand { + public: + CmdReplSetFresh() : ReplSetCommand("replSetFresh") { } + private: + virtual bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) { + if( !check(errmsg, result) ) + return false; + + if( cmdObj["set"].String() != theReplSet->name() ) { + errmsg = "wrong repl set name"; + return false; + } + string who = cmdObj["who"].String(); + int cfgver = cmdObj["cfgver"].Int(); + OpTime opTime(cmdObj["opTime"].Date()); + + bool weAreFresher = false; + if( theReplSet->config().version > cfgver ) { + log() << "replSet member " << who << " is not yet aware its cfg version " << cfgver << " is stale" << rsLog; + result.append("info", "config version stale"); + weAreFresher = true; + } + else if( opTime < theReplSet->lastOpTimeWritten ) { + weAreFresher = true; + } + result.appendDate("opTime", theReplSet->lastOpTimeWritten.asDate()); + result.append("fresher", weAreFresher); + return true; + } + } cmdReplSetFresh; + + class CmdReplSetElect : public ReplSetCommand { + public: + CmdReplSetElect() : ReplSetCommand("replSetElect") { } + private: + virtual bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) { + if( !check(errmsg, result) ) + return false; + //task::lam f = boost::bind(&Consensus::electCmdReceived, &theReplSet->elect, cmdObj, &result); + //theReplSet->mgr->call(f); + theReplSet->elect.electCmdReceived(cmdObj, &result); + return true; + } + } cmdReplSetElect; + + int Consensus::totalVotes() const { + static int complain = 0; + int vTot = rs._self->config().votes; + for( Member *m = rs.head(); m; m=m->next() ) + vTot += m->config().votes; + if( vTot % 2 == 0 && vTot && complain++ == 0 ) + log() << "replSet warning total number of votes is even - considering giving one member an extra vote" << rsLog; + return vTot; + } + + bool Consensus::aMajoritySeemsToBeUp() const { + int vUp = rs._self->config().votes; + for( Member *m = rs.head(); m; m=m->next() ) + vUp += m->hbinfo().up() ? m->config().votes : 0; + return vUp * 2 > totalVotes(); + } + + static const int VETO = -10000; + + const time_t LeaseTime = 30; + + unsigned Consensus::yea(unsigned memberId) /* throws VoteException */ { + Atomic<LastYea>::tran t(ly); + LastYea &ly = t.ref(); + time_t now = time(0); + if( ly.when + LeaseTime >= now && ly.who != memberId ) { + log(1) << "replSet not voting yea for " << memberId << + " voted for " << ly.who << ' ' << now-ly.when << " secs ago" << rsLog; + throw VoteException(); + } + ly.when = now; + ly.who = memberId; + return rs._self->config().votes; + } + + /* we vote for ourself at start of election. once it fails, we can cancel the lease we had in + place instead of leaving it for a long time. + */ + void Consensus::electionFailed(unsigned meid) { + Atomic<LastYea>::tran t(ly); + LastYea &L = t.ref(); + DEV assert( L.who == meid ); // this may not always always hold, so be aware, but adding for now as a quick sanity test + if( L.who == meid ) + L.when = 0; + } + + /* todo: threading **************** !!!!!!!!!!!!!!!! */ + void Consensus::electCmdReceived(BSONObj cmd, BSONObjBuilder* _b) { + BSONObjBuilder& b = *_b; + DEV log() << "replSet received elect msg " << cmd.toString() << rsLog; + else log(2) << "replSet received elect msg " << cmd.toString() << rsLog; + string set = cmd["set"].String(); + unsigned whoid = cmd["whoid"].Int(); + int cfgver = cmd["cfgver"].Int(); + OID round = cmd["round"].OID(); + int myver = rs.config().version; + + int vote = 0; + if( set != rs.name() ) { + log() << "replSet error received an elect request for '" << set << "' but our set name is '" << rs.name() << "'" << rsLog; + + } + else if( myver < cfgver ) { + // we are stale. don't vote + } + else if( myver > cfgver ) { + // they are stale! + log() << "replSet info got stale version # during election" << rsLog; + vote = -10000; + } + else { + try { + vote = yea(whoid); + rs.relinquish(); + log() << "replSet info voting yea for " << whoid << rsLog; + } + catch(VoteException&) { + log() << "replSet voting no already voted for another" << rsLog; + } + } + + b.append("vote", vote); + b.append("round", round); + } + + void ReplSetImpl::_getTargets(list<Target>& L, int& configVersion) { + configVersion = config().version; + for( Member *m = head(); m; m=m->next() ) + if( m->hbinfo().up() ) + L.push_back( Target(m->fullName()) ); + } + + /* config version is returned as it is ok to use this unlocked. BUT, if unlocked, you would need + to check later that the config didn't change. */ + void ReplSetImpl::getTargets(list<Target>& L, int& configVersion) { + if( lockedByMe() ) { + _getTargets(L, configVersion); + return; + } + lock lk(this); + _getTargets(L, configVersion); + } + + /* Do we have the newest data of them all? + @param allUp - set to true if all members are up. Only set if true returned. + @return true if we are freshest. Note we may tie. + */ + bool Consensus::weAreFreshest(bool& allUp, int& nTies) { + const OpTime ord = theReplSet->lastOpTimeWritten; + nTies = 0; + assert( !ord.isNull() ); + BSONObj cmd = BSON( + "replSetFresh" << 1 << + "set" << rs.name() << + "opTime" << Date_t(ord.asDate()) << + "who" << rs._self->fullName() << + "cfgver" << rs._cfg->version ); + list<Target> L; + int ver; + rs.getTargets(L, ver); + multiCommand(cmd, L); + int nok = 0; + allUp = true; + for( list<Target>::iterator i = L.begin(); i != L.end(); i++ ) { + if( i->ok ) { + nok++; + if( i->result["fresher"].trueValue() ) + return false; + OpTime remoteOrd( i->result["opTime"].Date() ); + if( remoteOrd == ord ) + nTies++; + assert( remoteOrd <= ord ); + } + else { + DEV log() << "replSet freshest returns " << i->result.toString() << rsLog; + allUp = false; + } + } + DEV log() << "replSet dev we are freshest of up nodes, nok:" << nok << " nTies:" << nTies << rsLog; + assert( ord <= theReplSet->lastOpTimeWritten ); // <= as this may change while we are working... + return true; + } + + extern time_t started; + + void Consensus::multiCommand(BSONObj cmd, list<Target>& L) { + assert( !rs.lockedByMe() ); + mongo::multiCommand(cmd, L); + } + + void Consensus::_electSelf() { + if( time(0) < steppedDown ) + return; + + { + const OpTime ord = theReplSet->lastOpTimeWritten; + if( ord == 0 ) { + log() << "replSet info not trying to elect self, do not yet have a complete set of data from any point in time" << rsLog; + return; + } + } + + bool allUp; + int nTies; + if( !weAreFreshest(allUp, nTies) ) { + log() << "replSet info not electing self, we are not freshest" << rsLog; + return; + } + + rs.sethbmsg("",9); + + if( !allUp && time(0) - started < 60 * 5 ) { + /* the idea here is that if a bunch of nodes bounce all at once, we don't want to drop data + if we don't have to -- we'd rather be offline and wait a little longer instead + todo: make this configurable. + */ + rs.sethbmsg("not electing self, not all members up and we have been up less than 5 minutes"); + return; + } + + Member& me = *rs._self; + + if( nTies ) { + /* tie? we then randomly sleep to try to not collide on our voting. */ + /* todo: smarter. */ + if( me.id() == 0 || sleptLast ) { + // would be fine for one node not to sleep + // todo: biggest / highest priority nodes should be the ones that get to not sleep + } else { + assert( !rs.lockedByMe() ); // bad to go to sleep locked + unsigned ms = ((unsigned) rand()) % 1000 + 50; + DEV log() << "replSet tie " << nTies << " sleeping a little " << ms << "ms" << rsLog; + sleptLast = true; + sleepmillis(ms); + throw RetryAfterSleepException(); + } + } + sleptLast = false; + + time_t start = time(0); + unsigned meid = me.id(); + int tally = yea( meid ); + bool success = false; + try { + log() << "replSet info electSelf " << meid << rsLog; + + BSONObj electCmd = BSON( + "replSetElect" << 1 << + "set" << rs.name() << + "who" << me.fullName() << + "whoid" << me.hbinfo().id() << + "cfgver" << rs._cfg->version << + "round" << OID::gen() /* this is just for diagnostics */ + ); + + int configVersion; + list<Target> L; + rs.getTargets(L, configVersion); + multiCommand(electCmd, L); + + { + RSBase::lock lk(&rs); + for( list<Target>::iterator i = L.begin(); i != L.end(); i++ ) { + DEV log() << "replSet elect res: " << i->result.toString() << rsLog; + if( i->ok ) { + int v = i->result["vote"].Int(); + tally += v; + } + } + if( tally*2 <= totalVotes() ) { + log() << "replSet couldn't elect self, only received " << tally << " votes" << rsLog; + } + else if( time(0) - start > 30 ) { + // defensive; should never happen as we have timeouts on connection and operation for our conn + log() << "replSet too much time passed during our election, ignoring result" << rsLog; + } + else if( configVersion != rs.config().version ) { + log() << "replSet config version changed during our election, ignoring result" << rsLog; + } + else { + /* succeeded. */ + log(1) << "replSet election succeeded, assuming primary role" << rsLog; + success = true; + rs.assumePrimary(); + } + } + } catch( std::exception& ) { + if( !success ) electionFailed(meid); + throw; + } + if( !success ) electionFailed(meid); + } + + void Consensus::electSelf() { + assert( !rs.lockedByMe() ); + assert( !rs.myConfig().arbiterOnly ); + try { + _electSelf(); + } + catch(RetryAfterSleepException&) { + throw; + } + catch(VoteException& ) { + log() << "replSet not trying to elect self as responded yea to someone else recently" << rsLog; + } + catch(DBException& e) { + log() << "replSet warning caught unexpected exception in electSelf() " << e.toString() << rsLog; + } + catch(...) { + log() << "replSet warning caught unexpected exception in electSelf()" << rsLog; + } + } + +} diff --git a/db/repl/health.cpp b/db/repl/health.cpp new file mode 100644 index 0000000..b0be25f --- /dev/null +++ b/db/repl/health.cpp @@ -0,0 +1,389 @@ +/** +* Copyright (C) 2008 10gen Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful,b +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "pch.h" +#include "rs.h" +#include "health.h" +#include "../../util/background.h" +#include "../../client/dbclient.h" +#include "../commands.h" +#include "../../util/concurrency/value.h" +#include "../../util/concurrency/task.h" +#include "../../util/mongoutils/html.h" +#include "../../util/goodies.h" +#include "../../util/ramlog.h" +#include "../helpers/dblogger.h" +#include "connections.h" +#include "../../util/unittest.h" +#include "../dbhelpers.h" + +namespace mongo { + /* decls for connections.h */ + ScopedConn::M& ScopedConn::_map = *(new ScopedConn::M()); + mutex ScopedConn::mapMutex("ScopedConn::mapMutex"); +} + +namespace mongo { + + using namespace mongoutils::html; + using namespace bson; + + static RamLog _rsLog; + Tee *rsLog = &_rsLog; + + string ago(time_t t) { + if( t == 0 ) return ""; + + time_t x = time(0) - t; + stringstream s; + if( x < 180 ) { + s << x << " sec"; + if( x != 1 ) s << 's'; + } + else if( x < 3600 ) { + s.precision(2); + s << x / 60.0 << " mins"; + } + else { + s.precision(2); + s << x / 3600.0 << " hrs"; + } + return s.str(); + } + + void Member::summarizeMember(stringstream& s) const { + s << tr(); + { + stringstream u; + u << "http://" << h().host() << ':' << (h().port() + 1000) << "/_replSet"; + s << td( a(u.str(), "", fullName()) ); + } + s << td( id() ); + double h = hbinfo().health; + bool ok = h > 0; + s << td(red(str::stream() << h,h == 0)); + s << td(ago(hbinfo().upSince)); + bool never = false; + { + string h; + time_t hb = hbinfo().lastHeartbeat; + if( hb == 0 ) { + h = "never"; + never = true; + } + else h = ago(hb) + " ago"; + s << td(h); + } + s << td(config().votes); + { + string stateText = ReplSet::stateAsStr(state()); + if( ok || stateText.empty() ) + s << td(stateText); // text blank if we've never connected + else + s << td( grey(str::stream() << "(was " << ReplSet::stateAsStr(state()) << ')', true) ); + } + s << td( grey(hbinfo().lastHeartbeatMsg,!ok) ); + stringstream q; + q << "/_replSetOplog?" << id(); + s << td( a(q.str(), "", never ? "?" : hbinfo().opTime.toString()) ); + if( hbinfo().skew > INT_MIN ) { + s << td( grey(str::stream() << hbinfo().skew,!ok) ); + } else + s << td(""); + s << _tr(); + } + + string ReplSetImpl::stateAsHtml(MemberState s) { + if( s.s == MemberState::RS_STARTUP ) return a("", "serving still starting up, or still trying to initiate the set", "STARTUP"); + if( s.s == MemberState::RS_PRIMARY ) return a("", "this server thinks it is primary", "PRIMARY"); + if( s.s == MemberState::RS_SECONDARY ) return a("", "this server thinks it is a secondary (slave mode)", "SECONDARY"); + if( s.s == MemberState::RS_RECOVERING ) return a("", "recovering/resyncing; after recovery usually auto-transitions to secondary", "RECOVERING"); + if( s.s == MemberState::RS_FATAL ) return a("", "something bad has occurred and server is not completely offline with regard to the replica set. fatal error.", "RS_FATAL"); + if( s.s == MemberState::RS_STARTUP2 ) return a("", "loaded config, still determining who is primary", "RS_STARTUP2"); + if( s.s == MemberState::RS_ARBITER ) return a("", "this server is an arbiter only", "ARBITER"); + if( s.s == MemberState::RS_DOWN ) return a("", "member is down, slow, or unreachable", "DOWN"); + return ""; + } + + string ReplSetImpl::stateAsStr(MemberState s) { + if( s.s == MemberState::RS_STARTUP ) return "STARTUP"; + if( s.s == MemberState::RS_PRIMARY ) return "PRIMARY"; + if( s.s == MemberState::RS_SECONDARY ) return "SECONDARY"; + if( s.s == MemberState::RS_RECOVERING ) return "RECOVERING"; + if( s.s == MemberState::RS_FATAL ) return "FATAL"; + if( s.s == MemberState::RS_STARTUP2 ) return "STARTUP2"; + if( s.s == MemberState::RS_ARBITER ) return "ARBITER"; + if( s.s == MemberState::RS_DOWN ) return "DOWN"; + return ""; + } + + extern time_t started; + + // oplogdiags in web ui + static void say(stringstream&ss, const bo& op) { + ss << "<tr>"; + + set<string> skip; + be e = op["ts"]; + if( e.type() == Date || e.type() == Timestamp ) { + OpTime ot = e._opTime(); + ss << td( time_t_to_String_short( ot.getSecs() ) ); + ss << td( ot.toString() ); + skip.insert("ts"); + } + else ss << td("?") << td("?"); + + e = op["h"]; + if( e.type() == NumberLong ) { + ss << "<td>" << hex << e.Long() << "</td>\n"; + skip.insert("h"); + } else + ss << td("?"); + + ss << td(op["op"].valuestrsafe()); + ss << td(op["ns"].valuestrsafe()); + skip.insert("op"); + skip.insert("ns"); + + ss << "<td>"; + for( bo::iterator i(op); i.more(); ) { + be e = i.next(); + if( skip.count(e.fieldName()) ) continue; + ss << e.toString() << ' '; + } + ss << "</td>"; + + ss << "</tr>"; + ss << '\n'; + } + + void ReplSetImpl::_getOplogDiagsAsHtml(unsigned server_id, stringstream& ss) const { + const Member *m = findById(server_id); + if( m == 0 ) { + ss << "Error : can't find a member with id: " << server_id << '\n'; + return; + } + + ss << p("Server : " + m->fullName() + "<br>ns : " + rsoplog ); + + //const bo fields = BSON( "o" << false << "o2" << false ); + const bo fields; + + ScopedConn conn(m->fullName()); + + auto_ptr<DBClientCursor> c = conn->query(rsoplog, Query().sort("$natural",1), 20, 0, &fields); + if( c.get() == 0 ) { + ss << "couldn't query " << rsoplog; + return; + } + static const char *h[] = {"ts","optime", "h","op","ns","rest",0}; + + ss << "<style type=\"text/css\" media=\"screen\">" + "table { font-size:75% }\n" +// "th { background-color:#bbb; color:#000 }\n" +// "td,th { padding:.25em }\n" + "</style>\n"; + + ss << table(h, true); + //ss << "<pre>\n"; + int n = 0; + OpTime otFirst; + OpTime otLast; + OpTime otEnd; + while( c->more() ) { + bo o = c->next(); + otLast = o["ts"]._opTime(); + if( otFirst.isNull() ) + otFirst = otLast; + say(ss, o); + n++; + } + if( n == 0 ) { + ss << rsoplog << " is empty\n"; + } + else { + auto_ptr<DBClientCursor> c = conn->query(rsoplog, Query().sort("$natural",-1), 20, 0, &fields); + if( c.get() == 0 ) { + ss << "couldn't query [2] " << rsoplog; + return; + } + string x; + bo o = c->next(); + otEnd = o["ts"]._opTime(); + while( 1 ) { + stringstream z; + if( o["ts"]._opTime() == otLast ) + break; + say(z, o); + x = z.str() + x; + if( !c->more() ) + break; + o = c->next(); + } + if( !x.empty() ) { + ss << "<tr><td>...</td><td>...</td><td>...</td><td>...</td><td>...</td></tr>\n" << x; + //ss << "\n...\n\n" << x; + } + } + ss << _table(); + ss << p(time_t_to_String_short(time(0)) + " current time"); + + //ss << "</pre>\n"; + + if( !otEnd.isNull() ) { + ss << "<p>Log length in time: "; + unsigned d = otEnd.getSecs() - otFirst.getSecs(); + double h = d / 3600.0; + ss.precision(3); + if( h < 72 ) + ss << h << " hours"; + else + ss << h / 24.0 << " days"; + ss << "</p>\n"; + } + + } + + void ReplSetImpl::_summarizeAsHtml(stringstream& s) const { + s << table(0, false); + s << tr("Set name:", _name); + s << tr("Majority up:", elect.aMajoritySeemsToBeUp()?"yes":"no" ); + s << _table(); + + const char *h[] = {"Member", + "<a title=\"member id in the replset config\">id</a>", + "Up", + "<a title=\"length of time we have been continuously connected to the other member with no reconnects (for self, shows uptime)\">cctime</a>", + "<a title=\"when this server last received a heartbeat response - includes error code responses\">Last heartbeat</a>", + "Votes", "State", "Status", + "<a title=\"how up to date this server is. this value polled every few seconds so actually lag is typically much lower than value shown here.\">optime</a>", + "<a title=\"Clock skew in seconds relative to this server. Informational; server clock variances will make the diagnostics hard to read, but otherwise are benign..\">skew</a>", + 0}; + s << table(h); + + /* this is to sort the member rows by their ordinal _id, so they show up in the same + order on all the different web ui's; that is less confusing for the operator. */ + map<int,string> mp; + + string myMinValid; + try { + readlocktry lk("local.replset.minvalid", 300); + if( lk.got() ) { + BSONObj mv; + if( Helpers::getSingleton("local.replset.minvalid", mv) ) { + myMinValid = "minvalid:" + mv["ts"]._opTime().toString(); + } + } + else myMinValid = "."; + } + catch(...) { + myMinValid = "exception fetching minvalid"; + } + + { + stringstream s; + /* self row */ + s << tr() << td(_self->fullName() + " (me)") << + td(_self->id()) << + td("1") << //up + td(ago(started)) << + td("") << // last heartbeat + td(ToString(_self->config().votes)) << + td(stateAsHtml(box.getState())); + s << td( _hbmsg ); + stringstream q; + q << "/_replSetOplog?" << _self->id(); + s << td( a(q.str(), myMinValid, theReplSet->lastOpTimeWritten.toString()) ); + s << td(""); // skew + s << _tr(); + mp[_self->hbinfo().id()] = s.str(); + } + Member *m = head(); + while( m ) { + stringstream s; + m->summarizeMember(s); + mp[m->hbinfo().id()] = s.str(); + m = m->next(); + } + + for( map<int,string>::const_iterator i = mp.begin(); i != mp.end(); i++ ) + s << i->second; + s << _table(); + } + + + void fillRsLog(stringstream& s) { + _rsLog.toHTML( s ); + } + + const Member* ReplSetImpl::findById(unsigned id) const { + if( id == _self->id() ) return _self; + for( Member *m = head(); m; m = m->next() ) + if( m->id() == id ) + return m; + return 0; + } + + void ReplSetImpl::_summarizeStatus(BSONObjBuilder& b) const { + vector<BSONObj> v; + + // add self + { + HostAndPort h(getHostName(), cmdLine.port); + + BSONObjBuilder bb; + bb.append("_id", (int) _self->id()); + bb.append("name", h.toString()); + bb.append("health", 1.0); + bb.append("state", (int) box.getState().s); + string s = _self->lhb(); + if( !s.empty() ) + bb.append("errmsg", s); + bb.append("self", true); + v.push_back(bb.obj()); + } + + Member *m =_members.head(); + while( m ) { + BSONObjBuilder bb; + bb.append("_id", (int) m->id()); + bb.append("name", m->fullName()); + bb.append("health", m->hbinfo().health); + bb.append("state", (int) m->state().s); + bb.append("uptime", (unsigned) (m->hbinfo().upSince ? (time(0)-m->hbinfo().upSince) : 0)); + bb.appendTimeT("lastHeartbeat", m->hbinfo().lastHeartbeat); + string s = m->lhb(); + if( !s.empty() ) + bb.append("errmsg", s); + v.push_back(bb.obj()); + m = m->next(); + } + sort(v.begin(), v.end()); + b.append("set", name()); + b.appendTimeT("date", time(0)); + b.append("myState", box.getState().s); + b.append("members", v); + } + + static struct Test : public UnitTest { + void run() { + HealthOptions a,b; + assert( a == b ); + assert( a.isDefault() ); + } + } test; + +} diff --git a/db/repl/health.h b/db/repl/health.h new file mode 100644 index 0000000..8b1005e --- /dev/null +++ b/db/repl/health.h @@ -0,0 +1,50 @@ +// replset.h + +/** +* Copyright (C) 2008 10gen Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#pragma once + +namespace mongo { + + /* throws */ + bool requestHeartbeat(string setname, string fromHost, string memberFullName, BSONObj& result, int myConfigVersion, int& theirConfigVersion, bool checkEmpty = false); + + struct HealthOptions { + HealthOptions() { + heartbeatSleepMillis = 2000; + heartbeatTimeoutMillis = 10000; + heartbeatConnRetries = 3; + } + + bool isDefault() const { return *this == HealthOptions(); } + + // see http://www.mongodb.org/display/DOCS/Replica+Set+Internals + unsigned heartbeatSleepMillis; + unsigned heartbeatTimeoutMillis; + unsigned heartbeatConnRetries ; + + void check() { + uassert(13112, "bad replset heartbeat option", heartbeatSleepMillis >= 10); + uassert(13113, "bad replset heartbeat option", heartbeatTimeoutMillis >= 10); + } + + bool operator==(const HealthOptions& r) const { + return heartbeatSleepMillis==r.heartbeatSleepMillis && heartbeatTimeoutMillis==r.heartbeatTimeoutMillis && heartbeatConnRetries==heartbeatConnRetries; + } + }; + +} diff --git a/db/repl/heartbeat.cpp b/db/repl/heartbeat.cpp new file mode 100644 index 0000000..78ce5d1 --- /dev/null +++ b/db/repl/heartbeat.cpp @@ -0,0 +1,257 @@ +/** +* Copyright (C) 2008 10gen Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful,b +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "pch.h" +#include "rs.h" +#include "health.h" +#include "../../util/background.h" +#include "../../client/dbclient.h" +#include "../commands.h" +#include "../../util/concurrency/value.h" +#include "../../util/concurrency/task.h" +#include "../../util/concurrency/msg.h" +#include "../../util/mongoutils/html.h" +#include "../../util/goodies.h" +#include "../../util/ramlog.h" +#include "../helpers/dblogger.h" +#include "connections.h" +#include "../../util/unittest.h" +#include "../instance.h" + +namespace mongo { + + using namespace bson; + + extern bool replSetBlind; + + // hacky + string *discoveredSeed = 0; + + /* { replSetHeartbeat : <setname> } */ + class CmdReplSetHeartbeat : public ReplSetCommand { + public: + virtual bool adminOnly() const { return false; } + CmdReplSetHeartbeat() : ReplSetCommand("replSetHeartbeat") { } + virtual bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) { + if( replSetBlind ) + return false; + + /* we don't call ReplSetCommand::check() here because heartbeat + checks many things that are pre-initialization. */ + if( !replSet ) { + errmsg = "not running with --replSet"; + return false; + } + if( cmdObj["pv"].Int() != 1 ) { + errmsg = "incompatible replset protocol version"; + return false; + } + { + string s = string(cmdObj.getStringField("replSetHeartbeat")); + if( cmdLine.ourSetName() != s ) { + errmsg = "repl set names do not match"; + log() << "cmdline: " << cmdLine._replSet << endl; + log() << "s: " << s << endl; + result.append("mismatch", true); + return false; + } + } + + result.append("rs", true); + if( cmdObj["checkEmpty"].trueValue() ) { + result.append("hasData", replHasDatabases()); + } + if( theReplSet == 0 ) { + string from( cmdObj.getStringField("from") ); + if( !from.empty() && discoveredSeed == 0 ) { + discoveredSeed = new string(from); + } + errmsg = "still initializing"; + return false; + } + + if( theReplSet->name() != cmdObj.getStringField("replSetHeartbeat") ) { + errmsg = "repl set names do not match (2)"; + result.append("mismatch", true); + return false; + } + result.append("set", theReplSet->name()); + result.append("state", theReplSet->state().s); + result.append("hbmsg", theReplSet->hbmsg()); + result.append("time", (int) time(0)); + result.appendDate("opTime", theReplSet->lastOpTimeWritten.asDate()); + int v = theReplSet->config().version; + result.append("v", v); + if( v > cmdObj["v"].Int() ) + result << "config" << theReplSet->config().asBson(); + + return true; + } + } cmdReplSetHeartbeat; + + /* throws dbexception */ + bool requestHeartbeat(string setName, string from, string memberFullName, BSONObj& result, int myCfgVersion, int& theirCfgVersion, bool checkEmpty) { + if( replSetBlind ) { + //sleepmillis( rand() ); + return false; + } + + BSONObj cmd = BSON( "replSetHeartbeat" << setName << "v" << myCfgVersion << "pv" << 1 << "checkEmpty" << checkEmpty << "from" << from ); + + // we might be talking to ourself - generally not a great idea to do outbound waiting calls in a write lock + assert( !dbMutex.isWriteLocked() ); + + // these are slow (multisecond to respond), so generally we don't want to be locked, at least not without + // thinking acarefully about it first. + assert( theReplSet == 0 || !theReplSet->lockedByMe() ); + + ScopedConn conn(memberFullName); + return conn->runCommand("admin", cmd, result); + } + + /* poll every other set member to check its status */ + class ReplSetHealthPollTask : public task::Task { + HostAndPort h; + HeartbeatInfo m; + public: + ReplSetHealthPollTask(const HostAndPort& hh, const HeartbeatInfo& mm) : h(hh), m(mm) { } + + string name() { return "ReplSetHealthPollTask"; } + void doWork() { + if ( !theReplSet ) { + log(2) << "theReplSet not initialized yet, skipping health poll this round" << rsLog; + return; + } + + HeartbeatInfo mem = m; + HeartbeatInfo old = mem; + try { + BSONObj info; + int theirConfigVersion = -10000; + + time_t before = time(0); + + bool ok = requestHeartbeat(theReplSet->name(), theReplSet->selfFullName(), h.toString(), info, theReplSet->config().version, theirConfigVersion); + + time_t after = mem.lastHeartbeat = time(0); // we set this on any response - we don't get this far if couldn't connect because exception is thrown + + try { + mem.skew = 0; + long long t = info["time"].Long(); + if( t > after ) + mem.skew = (int) (t - after); + else if( t < before ) + mem.skew = (int) (t - before); // negative + } + catch(...) { + mem.skew = INT_MIN; + } + + { + be state = info["state"]; + if( state.ok() ) + mem.hbstate = MemberState(state.Int()); + } + if( ok ) { + if( mem.upSince == 0 ) { + log() << "replSet info " << h.toString() << " is now up" << rsLog; + mem.upSince = mem.lastHeartbeat; + } + mem.health = 1.0; + mem.lastHeartbeatMsg = info["hbmsg"].String(); + if( info.hasElement("opTime") ) + mem.opTime = info["opTime"].Date(); + + be cfg = info["config"]; + if( cfg.ok() ) { + // received a new config + boost::function<void()> f = + boost::bind(&Manager::msgReceivedNewConfig, theReplSet->mgr, cfg.Obj().copy()); + theReplSet->mgr->send(f); + } + } + else { + down(mem, info.getStringField("errmsg")); + } + } + catch(...) { + down(mem, "connect/transport error"); + } + m = mem; + + theReplSet->mgr->send( boost::bind(&ReplSet::msgUpdateHBInfo, theReplSet, mem) ); + + static time_t last = 0; + time_t now = time(0); + if( mem.changed(old) || now-last>4 ) { + last = now; + theReplSet->mgr->send( boost::bind(&Manager::msgCheckNewState, theReplSet->mgr) ); + } + } + + private: + void down(HeartbeatInfo& mem, string msg) { + mem.health = 0.0; + if( mem.upSince ) { + mem.upSince = 0; + log() << "replSet info " << h.toString() << " is now down (or slow to respond)" << rsLog; + } + mem.lastHeartbeatMsg = msg; + } + }; + + void ReplSetImpl::endOldHealthTasks() { + unsigned sz = healthTasks.size(); + for( set<ReplSetHealthPollTask*>::iterator i = healthTasks.begin(); i != healthTasks.end(); i++ ) + (*i)->halt(); + healthTasks.clear(); + if( sz ) + DEV log() << "replSet debug: cleared old tasks " << sz << endl; + } + + void ReplSetImpl::startHealthTaskFor(Member *m) { + ReplSetHealthPollTask *task = new ReplSetHealthPollTask(m->h(), m->hbinfo()); + healthTasks.insert(task); + task::repeat(task, 2000); + } + + void startSyncThread(); + + /** called during repl set startup. caller expects it to return fairly quickly. + note ReplSet object is only created once we get a config - so this won't run + until the initiation. + */ + void ReplSetImpl::startThreads() { + task::fork(mgr); + + /*Member* m = _members.head(); + while( m ) { + ReplSetHealthPollTask *task = new ReplSetHealthPollTask(m->h(), m->hbinfo()); + healthTasks.insert(task); + task::repeat(shared_ptr<task::Task>(task), 2000); + m = m->next(); + }*/ + + mgr->send( boost::bind(&Manager::msgCheckNewState, theReplSet->mgr) ); + + boost::thread t(startSyncThread); + } + +} + +/* todo: + stop bg job and delete on removefromset +*/ diff --git a/db/repl/manager.cpp b/db/repl/manager.cpp new file mode 100644 index 0000000..e870688 --- /dev/null +++ b/db/repl/manager.cpp @@ -0,0 +1,179 @@ +/* @file manager.cpp +*/ + +/** +* Copyright (C) 2008 10gen Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful,b +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "pch.h" +#include "rs.h" +#include "../client.h" + +namespace mongo { + + enum { + NOPRIMARY = -2, + SELFPRIMARY = -1 + }; + + /* check members OTHER THAN US to see if they think they are primary */ + const Member * Manager::findOtherPrimary() { + Member *m = rs->head(); + Member *p = 0; + while( m ) { + if( m->state().primary() && m->hbinfo().up() ) { + if( p ) throw "twomasters"; // our polling is asynchronous, so this is often ok. + p = m; + } + m = m->next(); + } + if( p ) + noteARemoteIsPrimary(p); + return p; + } + + Manager::Manager(ReplSetImpl *_rs) : + task::Server("rs Manager"), rs(_rs), busyWithElectSelf(false), _primary(NOPRIMARY) + { + } + + Manager::~Manager() { + log() << "ERROR: ~Manager should never be called" << rsLog; + rs->mgr = 0; + assert(false); + } + + void Manager::starting() { + Client::initThread("rs Manager"); + } + + void Manager::noteARemoteIsPrimary(const Member *m) { + if( rs->box.getPrimary() == m ) + return; + rs->_self->lhb() = ""; + rs->box.set(rs->iAmArbiterOnly() ? MemberState::RS_ARBITER : MemberState::RS_RECOVERING, m); + } + + /** called as the health threads get new results */ + void Manager::msgCheckNewState() { + { + theReplSet->assertValid(); + rs->assertValid(); + + RSBase::lock lk(rs); + + if( busyWithElectSelf ) return; + + const Member *p = rs->box.getPrimary(); + if( p && p != rs->_self ) { + if( !p->hbinfo().up() || + !p->hbinfo().hbstate.primary() ) + { + p = 0; + rs->box.setOtherPrimary(0); + } + } + + const Member *p2; + try { p2 = findOtherPrimary(); } + catch(string s) { + /* two other nodes think they are primary (asynchronously polled) -- wait for things to settle down. */ + log() << "replSet warning DIAG 2 primary" << s << rsLog; + return; + } + + if( p2 ) { + /* someone else thinks they are primary. */ + if( p == p2 ) { + // we thought the same; all set. + return; + } + if( p == 0 ) { + noteARemoteIsPrimary(p2); + return; + } + // todo xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx + if( p != rs->_self ) { + // switch primary from oldremotep->newremotep2 + noteARemoteIsPrimary(p2); + return; + } + /* we thought we were primary, yet now someone else thinks they are. */ + if( !rs->elect.aMajoritySeemsToBeUp() ) { + /* we can't see a majority. so the other node is probably the right choice. */ + noteARemoteIsPrimary(p2); + return; + } + /* ignore for now, keep thinking we are master. + this could just be timing (we poll every couple seconds) or could indicate + a problem? if it happens consistently for a duration of time we should + alert the sysadmin. + */ + return; + } + + /* didn't find anyone who wants to be primary */ + + if( p ) { + /* we are already primary */ + + if( p != rs->_self ) { + rs->sethbmsg("error p != rs->self in checkNewState"); + log() << "replSet " << p->fullName() << rsLog; + log() << "replSet " << rs->_self->fullName() << rsLog; + return; + } + + if( !rs->elect.aMajoritySeemsToBeUp() ) { + log() << "replSet can't see a majority of the set, relinquishing primary" << rsLog; + rs->relinquish(); + } + + return; + } + + if( !rs->iAmPotentiallyHot() ) // if not we never try to be primary + return; + + /* TODO : CHECK PRIORITY HERE. can't be elected if priority zero. */ + + /* no one seems to be primary. shall we try to elect ourself? */ + if( !rs->elect.aMajoritySeemsToBeUp() ) { + static time_t last; + static int n; + int ll = 0; + if( ++n > 5 ) ll++; + if( last + 60 > time(0 ) ) ll++; + log(ll) << "replSet can't see a majority, will not try to elect self" << rsLog; + last = time(0); + return; + } + + busyWithElectSelf = true; // don't try to do further elections & such while we are already working on one. + } + try { + rs->elect.electSelf(); + } + catch(RetryAfterSleepException&) { + /* we want to process new inbounds before trying this again. so we just put a checkNewstate in the queue for eval later. */ + requeue(); + } + catch(...) { + log() << "replSet error unexpected assertion in rs manager" << rsLog; + } + busyWithElectSelf = false; + } + +} diff --git a/db/repl/multicmd.h b/db/repl/multicmd.h new file mode 100644 index 0000000..61c9b5f --- /dev/null +++ b/db/repl/multicmd.h @@ -0,0 +1,70 @@ +// @file multicmd.h + +/** +* Copyright (C) 2008 10gen Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#pragma once + +#include "../../util/background.h" +#include "connections.h" + +namespace mongo { + + struct Target { + Target(string hostport) : toHost(hostport), ok(false) { } + Target() : ok(false) { } + string toHost; + bool ok; + BSONObj result; + }; + + /* -- implementation ------------- */ + + class _MultiCommandJob : public BackgroundJob { + public: + BSONObj& cmd; + Target& d; + _MultiCommandJob(BSONObj& _cmd, Target& _d) : cmd(_cmd), d(_d) { } + private: + string name() { return "MultiCommandJob"; } + void run() { + try { + ScopedConn c(d.toHost); + d.ok = c->runCommand("admin", cmd, d.result); + } + catch(DBException&) { + DEV log() << "dev caught dbexception on multiCommand " << d.toHost << rsLog; + } + } + }; + + inline void multiCommand(BSONObj cmd, list<Target>& L) { + typedef shared_ptr<_MultiCommandJob> P; + list<P> jobs; + list<BackgroundJob *> _jobs; + + for( list<Target>::iterator i = L.begin(); i != L.end(); i++ ) { + Target& d = *i; + _MultiCommandJob *j = new _MultiCommandJob(cmd, d); + jobs.push_back(P(j)); + _jobs.push_back(j); + } + + BackgroundJob::go(_jobs); + BackgroundJob::wait(_jobs,5); + } + +} diff --git a/db/repl/replset_commands.cpp b/db/repl/replset_commands.cpp new file mode 100644 index 0000000..f8f46d5 --- /dev/null +++ b/db/repl/replset_commands.cpp @@ -0,0 +1,293 @@ +/** +* Copyright (C) 2008 10gen Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "pch.h" +#include "../cmdline.h" +#include "../commands.h" +#include "health.h" +#include "rs.h" +#include "rs_config.h" +#include "../dbwebserver.h" +#include "../../util/mongoutils/html.h" +#include "../../client/dbclient.h" + +namespace mongo { + + void checkMembersUpForConfigChange(const ReplSetConfig& cfg, bool initial); + + /* commands in other files: + replSetHeartbeat - health.cpp + replSetInitiate - rs_mod.cpp + */ + + bool replSetBlind = false; + + class CmdReplSetTest : public ReplSetCommand { + public: + virtual void help( stringstream &help ) const { + help << "Just for testing : do not use.\n"; + } + CmdReplSetTest() : ReplSetCommand("replSetTest") { } + virtual bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) { + if( !check(errmsg, result) ) + return false; + if( cmdObj.hasElement("blind") ) { + replSetBlind = cmdObj.getBoolField("blind"); + log() << "replSet info replSetTest command received, replSetBlind=" << replSetBlind << rsLog; + return true; + } + return false; + } + } cmdReplSetTest; + + class CmdReplSetGetRBID : public ReplSetCommand { + public: + int rbid; + virtual void help( stringstream &help ) const { + help << "internal"; + } + CmdReplSetGetRBID() : ReplSetCommand("replSetGetRBID") { + rbid = (int) curTimeMillis(); + } + virtual bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) { + if( !check(errmsg, result) ) + return false; + result.append("rbid",rbid); + return true; + } + } cmdReplSetRBID; + + using namespace bson; + int getRBID(DBClientConnection *c) { + bo info; + c->simpleCommand("admin", &info, "replSetGetRBID"); + return info["rbid"].numberInt(); + } + + class CmdReplSetGetStatus : public ReplSetCommand { + public: + virtual void help( stringstream &help ) const { + help << "Report status of a replica set from the POV of this server\n"; + help << "{ replSetGetStatus : 1 }"; + help << "\nhttp://www.mongodb.org/display/DOCS/Replica+Set+Commands"; + } + CmdReplSetGetStatus() : ReplSetCommand("replSetGetStatus", true) { } + virtual bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) { + if( !check(errmsg, result) ) + return false; + theReplSet->summarizeStatus(result); + return true; + } + } cmdReplSetGetStatus; + + class CmdReplSetReconfig : public ReplSetCommand { + RWLock mutex; /* we don't need rw but we wanted try capability. :-( */ + public: + virtual void help( stringstream &help ) const { + help << "Adjust configuration of a replica set\n"; + help << "{ replSetReconfig : config_object }"; + help << "\nhttp://www.mongodb.org/display/DOCS/Replica+Set+Commands"; + } + CmdReplSetReconfig() : ReplSetCommand("replSetReconfig"), mutex("rsreconfig") { } + virtual bool run(const string& a, BSONObj& b, string& errmsg, BSONObjBuilder& c, bool d) { + try { + rwlock_try_write lk(mutex); + return _run(a,b,errmsg,c,d); + } + catch(rwlock_try_write::exception&) { } + errmsg = "a replSetReconfig is already in progress"; + return false; + } + private: + bool _run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) { + if( !check(errmsg, result) ) + return false; + if( !theReplSet->box.getState().primary() ) { + errmsg = "replSetReconfig command must be sent to the current replica set primary."; + return false; + } + + { + // just make sure we can get a write lock before doing anything else. we'll reacquire one + // later. of course it could be stuck then, but this check lowers the risk if weird things + // are up - we probably don't want a change to apply 30 minutes after the initial attempt. + time_t t = time(0); + writelock lk(""); + if( time(0)-t > 20 ) { + errmsg = "took a long time to get write lock, so not initiating. Initiate when server less busy?"; + return false; + } + } + + if( cmdObj["replSetReconfig"].type() != Object ) { + errmsg = "no configuration specified"; + return false; + } + + /** TODO + Support changes when a majority, but not all, members of a set are up. + Determine what changes should not be allowed as they would cause erroneous states. + What should be possible when a majority is not up? + */ + try { + ReplSetConfig newConfig(cmdObj["replSetReconfig"].Obj()); + + log() << "replSet replSetReconfig config object parses ok, " << newConfig.members.size() << " members specified" << rsLog; + + if( !ReplSetConfig::legalChange(theReplSet->getConfig(), newConfig, errmsg) ) { + return false; + } + + checkMembersUpForConfigChange(newConfig,false); + + log() << "replSet replSetReconfig [2]" << rsLog; + + theReplSet->haveNewConfig(newConfig, true); + ReplSet::startupStatusMsg = "replSetReconfig'd"; + } + catch( DBException& e ) { + log() << "replSet replSetReconfig exception: " << e.what() << rsLog; + throw; + } + + return true; + } + } cmdReplSetReconfig; + + class CmdReplSetFreeze : public ReplSetCommand { + public: + virtual void help( stringstream &help ) const { + help << "Enable / disable failover for the set - locks current primary as primary even if issues occur.\nFor use during system maintenance.\n"; + help << "{ replSetFreeze : <bool> }"; + help << "\nhttp://www.mongodb.org/display/DOCS/Replica+Set+Commands"; + } + + CmdReplSetFreeze() : ReplSetCommand("replSetFreeze") { } + virtual bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) { + if( !check(errmsg, result) ) + return false; + errmsg = "not yet implemented"; /*TODO*/ + return false; + } + } cmdReplSetFreeze; + + class CmdReplSetStepDown: public ReplSetCommand { + public: + virtual void help( stringstream &help ) const { + help << "Step down as primary. Will not try to reelect self or 1 minute.\n"; + help << "(If another member with same priority takes over in the meantime, it will stay primary.)\n"; + help << "http://www.mongodb.org/display/DOCS/Replica+Set+Commands"; + } + + CmdReplSetStepDown() : ReplSetCommand("replSetStepDown") { } + virtual bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) { + if( !check(errmsg, result) ) + return false; + if( !theReplSet->box.getState().primary() ) { + errmsg = "not primary so can't step down"; + return false; + } + return theReplSet->stepDown(); + } + } cmdReplSetStepDown; + + using namespace bson; + using namespace mongoutils::html; + extern void fillRsLog(stringstream&); + + class ReplSetHandler : public DbWebHandler { + public: + ReplSetHandler() : DbWebHandler( "_replSet" , 1 , true ){} + + virtual bool handles( const string& url ) const { + return startsWith( url , "/_replSet" ); + } + + virtual void handle( const char *rq, string url, + string& responseMsg, int& responseCode, + vector<string>& headers, const SockAddr &from ){ + + string s = str::after(url, "/_replSetOplog?"); + if( !s.empty() ) + responseMsg = _replSetOplog(s); + else + responseMsg = _replSet(); + responseCode = 200; + } + + + string _replSetOplog(string parms) { + stringstream s; + string t = "Replication oplog"; + s << start(t); + s << p(t); + + if( theReplSet == 0 ) { + if( cmdLine._replSet.empty() ) + s << p("Not using --replSet"); + else { + s << p("Still starting up, or else set is not yet " + a("http://www.mongodb.org/display/DOCS/Replica+Set+Configuration#InitialSetup", "", "initiated") + + ".<br>" + ReplSet::startupStatusMsg); + } + } + else { + try { + theReplSet->getOplogDiagsAsHtml(stringToNum(parms.c_str()), s); + } + catch(std::exception& e) { + s << "error querying oplog: " << e.what() << '\n'; + } + } + + s << _end(); + return s.str(); + } + + /* /_replSet show replica set status in html format */ + string _replSet() { + stringstream s; + s << start("Replica Set Status " + prettyHostName()); + s << p( a("/", "back", "Home") + " | " + + a("/local/system.replset/?html=1", "", "View Replset Config") + " | " + + a("/replSetGetStatus?text", "", "replSetGetStatus") + " | " + + a("http://www.mongodb.org/display/DOCS/Replica+Sets", "", "Docs") + ); + + if( theReplSet == 0 ) { + if( cmdLine._replSet.empty() ) + s << p("Not using --replSet"); + else { + s << p("Still starting up, or else set is not yet " + a("http://www.mongodb.org/display/DOCS/Replica+Set+Configuration#InitialSetup", "", "initiated") + + ".<br>" + ReplSet::startupStatusMsg); + } + } + else { + try { + theReplSet->summarizeAsHtml(s); + } + catch(...) { s << "error summarizing replset status\n"; } + } + s << p("Recent replset log activity:"); + fillRsLog(s); + s << _end(); + return s.str(); + } + + + + } replSetHandler; + +} diff --git a/db/repl/rs.cpp b/db/repl/rs.cpp new file mode 100644 index 0000000..3e12e42 --- /dev/null +++ b/db/repl/rs.cpp @@ -0,0 +1,500 @@ +/** +* Copyright (C) 2008 10gen Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "pch.h" +#include "../cmdline.h" +#include "../../util/sock.h" +#include "../client.h" +#include "../../client/dbclient.h" +#include "../dbhelpers.h" +#include "rs.h" + +namespace mongo { + + using namespace bson; + + bool replSet = false; + ReplSet *theReplSet = 0; + extern string *discoveredSeed; + + void ReplSetImpl::sethbmsg(string s, int logLevel) { + static time_t lastLogged; + if( s == _hbmsg ) { + // unchanged + if( time(0)-lastLogged < 60 ) + return; + } + + unsigned sz = s.size(); + if( sz >= 256 ) + memcpy(_hbmsg, s.c_str(), 255); + else { + _hbmsg[sz] = 0; + memcpy(_hbmsg, s.c_str(), sz); + } + if( !s.empty() ) { + lastLogged = time(0); + log(logLevel) << "replSet " << s << rsLog; + } + } + + void ReplSetImpl::assumePrimary() { + assert( iAmPotentiallyHot() ); + writelock lk("admin."); // so we are synchronized with _logOp() + box.setSelfPrimary(_self); + log() << "replSet PRIMARY" << rsLog; // self (" << _self->id() << ") is now primary" << rsLog; + } + + void ReplSetImpl::changeState(MemberState s) { box.change(s, _self); } + + void ReplSetImpl::relinquish() { + if( box.getState().primary() ) { + changeState(MemberState::RS_RECOVERING); + log() << "replSet info relinquished primary state" << rsLog; + } + else if( box.getState().startup2() ) { + // ? add comment + changeState(MemberState::RS_RECOVERING); + } + } + + /* look freshly for who is primary - includes relinquishing ourself. */ + void ReplSetImpl::forgetPrimary() { + if( box.getState().primary() ) + relinquish(); + else { + box.setOtherPrimary(0); + } + } + + bool ReplSetImpl::_stepDown() { + lock lk(this); + if( box.getState().primary() ) { + changeState(MemberState::RS_RECOVERING); + elect.steppedDown = time(0) + 60; + log() << "replSet info stepped down as primary" << rsLog; + return true; + } + return false; + } + + void ReplSetImpl::msgUpdateHBInfo(HeartbeatInfo h) { + for( Member *m = _members.head(); m; m=m->next() ) { + if( m->id() == h.id() ) { + m->_hbinfo = h; + return; + } + } + } + + list<HostAndPort> ReplSetImpl::memberHostnames() const { + list<HostAndPort> L; + L.push_back(_self->h()); + for( Member *m = _members.head(); m; m = m->next() ) + L.push_back(m->h()); + return L; + } + + void ReplSetImpl::_fillIsMasterHost(const Member *m, vector<string>& hosts, vector<string>& passives, vector<string>& arbiters) { + if( m->potentiallyHot() ) { + hosts.push_back(m->h().toString()); + } + else if( !m->config().arbiterOnly ) { + passives.push_back(m->h().toString()); + } + else { + arbiters.push_back(m->h().toString()); + } + } + + void ReplSetImpl::_fillIsMaster(BSONObjBuilder& b) { + const StateBox::SP sp = box.get(); + bool isp = sp.state.primary(); + b.append("ismaster", isp); + b.append("secondary", sp.state.secondary()); + { + vector<string> hosts, passives, arbiters; + _fillIsMasterHost(_self, hosts, passives, arbiters); + + for( Member *m = _members.head(); m; m = m->next() ) { + _fillIsMasterHost(m, hosts, passives, arbiters); + } + + if( hosts.size() > 0 ) { + b.append("hosts", hosts); + } + if( passives.size() > 0 ) { + b.append("passives", passives); + } + if( arbiters.size() > 0 ) { + b.append("arbiters", arbiters); + } + } + + if( !isp ) { + const Member *m = sp.primary; + if( m ) + b.append("primary", m->h().toString()); + } + if( myConfig().arbiterOnly ) + b.append("arbiterOnly", true); + } + + /** @param cfgString <setname>/<seedhost1>,<seedhost2> */ + + void parseReplsetCmdLine(string cfgString, string& setname, vector<HostAndPort>& seeds, set<HostAndPort>& seedSet ) { + const char *p = cfgString.c_str(); + const char *slash = strchr(p, '/'); + if( slash ) + setname = string(p, slash-p); + else + setname = p; + uassert(13093, "bad --replSet config string format is: <setname>[/<seedhost1>,<seedhost2>,...]", !setname.empty()); + + if( slash == 0 ) + return; + + p = slash + 1; + while( 1 ) { + const char *comma = strchr(p, ','); + if( comma == 0 ) comma = strchr(p,0); + if( p == comma ) + break; + { + HostAndPort m; + try { + m = HostAndPort( string(p, comma-p) ); + } + catch(...) { + uassert(13114, "bad --replSet seed hostname", false); + } + uassert(13096, "bad --replSet command line config string - dups?", seedSet.count(m) == 0 ); + seedSet.insert(m); + //uassert(13101, "can't use localhost in replset host list", !m.isLocalHost()); + if( m.isSelf() ) { + log(1) << "replSet ignoring seed " << m.toString() << " (=self)" << rsLog; + } else + seeds.push_back(m); + if( *comma == 0 ) + break; + p = comma + 1; + } + } + } + + ReplSetImpl::ReplSetImpl(ReplSetCmdline& replSetCmdline) : elect(this), + _self(0), + mgr( new Manager(this) ) + { + memset(_hbmsg, 0, sizeof(_hbmsg)); + *_hbmsg = '.'; // temp...just to see + lastH = 0; + changeState(MemberState::RS_STARTUP); + + _seeds = &replSetCmdline.seeds; + //for( vector<HostAndPort>::iterator i = seeds->begin(); i != seeds->end(); i++ ) + // addMemberIfMissing(*i); + + log(1) << "replSet beginning startup..." << rsLog; + + loadConfig(); + + unsigned sss = replSetCmdline.seedSet.size(); + for( Member *m = head(); m; m = m->next() ) { + replSetCmdline.seedSet.erase(m->h()); + } + for( set<HostAndPort>::iterator i = replSetCmdline.seedSet.begin(); i != replSetCmdline.seedSet.end(); i++ ) { + if( i->isSelf() ) { + if( sss == 1 ) + log(1) << "replSet warning self is listed in the seed list and there are no other seeds listed did you intend that?" << rsLog; + } else + log() << "replSet warning command line seed " << i->toString() << " is not present in the current repl set config" << rsLog; + } + } + + void newReplUp(); + + void ReplSetImpl::loadLastOpTimeWritten() { + //assert( lastOpTimeWritten.isNull() ); + readlock lk(rsoplog); + BSONObj o; + if( Helpers::getLast(rsoplog, o) ) { + lastH = o["h"].numberLong(); + lastOpTimeWritten = o["ts"]._opTime(); + uassert(13290, "bad replSet oplog entry?", !lastOpTimeWritten.isNull()); + } + } + + /* call after constructing to start - returns fairly quickly after launching its threads */ + void ReplSetImpl::_go() { + try { + loadLastOpTimeWritten(); + } + catch(std::exception& e) { + log() << "replSet ERROR FATAL couldn't query the local " << rsoplog << " collection. Terminating mongod after 30 seconds." << rsLog; + log() << e.what() << rsLog; + sleepsecs(30); + dbexit( EXIT_REPLICATION_ERROR ); + return; + } + + changeState(MemberState::RS_STARTUP2); + startThreads(); + newReplUp(); // oplog.cpp + } + + ReplSetImpl::StartupStatus ReplSetImpl::startupStatus = PRESTART; + string ReplSetImpl::startupStatusMsg; + + // true if ok; throws if config really bad; false if config doesn't include self + bool ReplSetImpl::initFromConfig(ReplSetConfig& c) { + lock lk(this); + + { + int me = 0; + for( vector<ReplSetConfig::MemberCfg>::iterator i = c.members.begin(); i != c.members.end(); i++ ) { + const ReplSetConfig::MemberCfg& m = *i; + if( m.h.isSelf() ) { + me++; + } + } + if( me == 0 ) { + // log() << "replSet config : " << _cfg->toString() << rsLog; + log() << "replSet warning can't find self in the repl set configuration:" << rsLog; + log() << c.toString() << rsLog; + return false; + } + uassert( 13302, "replSet error self appears twice in the repl set configuration", me<=1 ); + } + + _cfg = new ReplSetConfig(c); + assert( _cfg->ok() ); + assert( _name.empty() || _name == _cfg->_id ); + _name = _cfg->_id; + assert( !_name.empty() ); + + // start with no members. if this is a reconfig, drop the old ones. + _members.orphanAll(); + + endOldHealthTasks(); + + int oldPrimaryId = -1; + { + const Member *p = box.getPrimary(); + if( p ) + oldPrimaryId = p->id(); + } + forgetPrimary(); + _self = 0; + for( vector<ReplSetConfig::MemberCfg>::iterator i = _cfg->members.begin(); i != _cfg->members.end(); i++ ) { + const ReplSetConfig::MemberCfg& m = *i; + Member *mi; + if( m.h.isSelf() ) { + assert( _self == 0 ); + mi = _self = new Member(m.h, m._id, &m, true); + if( (int)mi->id() == oldPrimaryId ) + box.setSelfPrimary(mi); + } else { + mi = new Member(m.h, m._id, &m, false); + _members.push(mi); + startHealthTaskFor(mi); + if( (int)mi->id() == oldPrimaryId ) + box.setOtherPrimary(mi); + } + } + return true; + } + + // Our own config must be the first one. + bool ReplSetImpl::_loadConfigFinish(vector<ReplSetConfig>& cfgs) { + int v = -1; + ReplSetConfig *highest = 0; + int myVersion = -2000; + int n = 0; + for( vector<ReplSetConfig>::iterator i = cfgs.begin(); i != cfgs.end(); i++ ) { + ReplSetConfig& cfg = *i; + if( ++n == 1 ) myVersion = cfg.version; + if( cfg.ok() && cfg.version > v ) { + highest = &cfg; + v = cfg.version; + } + } + assert( highest ); + + if( !initFromConfig(*highest) ) + return false; + + if( highest->version > myVersion && highest->version >= 0 ) { + log() << "replSet got config version " << highest->version << " from a remote, saving locally" << rsLog; + writelock lk("admin."); + highest->saveConfigLocally(BSONObj()); + } + return true; + } + + void ReplSetImpl::loadConfig() { + while( 1 ) { + startupStatus = LOADINGCONFIG; + startupStatusMsg = "loading " + rsConfigNs + " config (LOADINGCONFIG)"; + try { + vector<ReplSetConfig> configs; + try { + configs.push_back( ReplSetConfig(HostAndPort::me()) ); + } + catch(DBException& e) { + log() << "replSet exception loading our local replset configuration object : " << e.toString() << rsLog; + throw; + } + for( vector<HostAndPort>::const_iterator i = _seeds->begin(); i != _seeds->end(); i++ ) { + try { + configs.push_back( ReplSetConfig(*i) ); + } + catch( DBException& e ) { + log() << "replSet exception trying to load config from " << *i << " : " << e.toString() << rsLog; + } + } + + if( discoveredSeed ) { + try { + configs.push_back( ReplSetConfig(HostAndPort(*discoveredSeed)) ); + } + catch( DBException& ) { + log(1) << "replSet exception trying to load config from discovered seed " << *discoveredSeed << rsLog; + } + } + + int nok = 0; + int nempty = 0; + for( vector<ReplSetConfig>::iterator i = configs.begin(); i != configs.end(); i++ ) { + if( i->ok() ) + nok++; + if( i->empty() ) + nempty++; + } + if( nok == 0 ) { + + if( nempty == (int) configs.size() ) { + startupStatus = EMPTYCONFIG; + startupStatusMsg = "can't get " + rsConfigNs + " config from self or any seed (EMPTYCONFIG)"; + log() << "replSet can't get " << rsConfigNs << " config from self or any seed (EMPTYCONFIG)" << rsLog; + log(1) << "replSet have you ran replSetInitiate yet?" << rsLog; + if( _seeds->size() == 0 ) + log(1) << "replSet info no seed hosts were specified on the --replSet command line" << rsLog; + } + else { + startupStatus = EMPTYUNREACHABLE; + startupStatusMsg = "can't currently get " + rsConfigNs + " config from self or any seed (EMPTYUNREACHABLE)"; + log() << "replSet can't get " << rsConfigNs << " config from self or any seed (yet)" << rsLog; + } + + sleepsecs(10); + continue; + } + + if( !_loadConfigFinish(configs) ) { + log() << "replSet info Couldn't load config yet. Sleeping 20sec and will try again." << rsLog; + sleepsecs(20); + continue; + } + } + catch(DBException& e) { + startupStatus = BADCONFIG; + startupStatusMsg = "replSet error loading set config (BADCONFIG)"; + log() << "replSet error loading configurations " << e.toString() << rsLog; + log() << "replSet error replication will not start" << rsLog; + _fatal(); + throw; + } + break; + } + startupStatusMsg = "? started"; + startupStatus = STARTED; + } + + void ReplSetImpl::_fatal() + { + //lock l(this); + box.set(MemberState::RS_FATAL, 0); + sethbmsg("fatal error"); + log() << "replSet error fatal error, stopping replication" << rsLog; + } + + + void ReplSet::haveNewConfig(ReplSetConfig& newConfig, bool addComment) { + lock l(this); // convention is to lock replset before taking the db rwlock + writelock lk(""); + bo comment; + if( addComment ) + comment = BSON( "msg" << "Reconfig set" << "version" << newConfig.version ); + newConfig.saveConfigLocally(comment); + try { + initFromConfig(newConfig); + log() << "replSet replSetReconfig new config saved locally" << rsLog; + } + catch(DBException& e) { + log() << "replSet error unexpected exception in haveNewConfig() : " << e.toString() << rsLog; + _fatal(); + } + catch(...) { + log() << "replSet error unexpected exception in haveNewConfig()" << rsLog; + _fatal(); + } + } + + void Manager::msgReceivedNewConfig(BSONObj o) { + log() << "replset msgReceivedNewConfig version: " << o["version"].toString() << rsLog; + ReplSetConfig c(o); + if( c.version > rs->config().version ) + theReplSet->haveNewConfig(c, false); + else { + log() << "replSet info msgReceivedNewConfig but version isn't higher " << + c.version << ' ' << rs->config().version << rsLog; + } + } + + /* forked as a thread during startup + it can run quite a while looking for config. but once found, + a separate thread takes over as ReplSetImpl::Manager, and this thread + terminates. + */ + void startReplSets(ReplSetCmdline *replSetCmdline) { + Client::initThread("startReplSets"); + try { + assert( theReplSet == 0 ); + if( replSetCmdline == 0 ) { + assert(!replSet); + return; + } + (theReplSet = new ReplSet(*replSetCmdline))->go(); + } + catch(std::exception& e) { + log() << "replSet caught exception in startReplSets thread: " << e.what() << rsLog; + if( theReplSet ) + theReplSet->fatal(); + } + cc().shutdown(); + } + +} + +namespace boost { + + void assertion_failed(char const * expr, char const * function, char const * file, long line) + { + mongo::log() << "boost assertion failure " << expr << ' ' << function << ' ' << file << ' ' << line << endl; + } + +} diff --git a/db/repl/rs.h b/db/repl/rs.h new file mode 100644 index 0000000..17a070c --- /dev/null +++ b/db/repl/rs.h @@ -0,0 +1,415 @@ +// /db/repl/rs.h + +/** +* Copyright (C) 2008 10gen Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#pragma once + +#include "../../util/concurrency/list.h" +#include "../../util/concurrency/value.h" +#include "../../util/concurrency/msg.h" +#include "../../util/hostandport.h" +#include "../commands.h" +#include "rs_exception.h" +#include "rs_optime.h" +#include "rs_member.h" +#include "rs_config.h" + +namespace mongo { + + struct HowToFixUp; + struct Target; + class DBClientConnection; + class ReplSetImpl; + class OplogReader; + extern bool replSet; // true if using repl sets + extern class ReplSet *theReplSet; // null until initialized + extern Tee *rsLog; + + /* member of a replica set */ + class Member : public List1<Member>::Base { + public: + Member(HostAndPort h, unsigned ord, const ReplSetConfig::MemberCfg *c, bool self); + string fullName() const { return h().toString(); } + const ReplSetConfig::MemberCfg& config() const { return *_config; } + const HeartbeatInfo& hbinfo() const { return _hbinfo; } + string lhb() { return _hbinfo.lastHeartbeatMsg; } + MemberState state() const { return _hbinfo.hbstate; } + const HostAndPort& h() const { return _h; } + unsigned id() const { return _hbinfo.id(); } + bool potentiallyHot() const { return _config->potentiallyHot(); } // not arbiter, not priority 0 + + void summarizeMember(stringstream& s) const; + friend class ReplSetImpl; + private: + const ReplSetConfig::MemberCfg *_config; /* todo: when this changes??? */ + HostAndPort _h; + HeartbeatInfo _hbinfo; + }; + + class Manager : public task::Server { + ReplSetImpl *rs; + bool busyWithElectSelf; + int _primary; + const Member* findOtherPrimary(); + void noteARemoteIsPrimary(const Member *); + virtual void starting(); + public: + Manager(ReplSetImpl *rs); + ~Manager(); + void msgReceivedNewConfig(BSONObj); + void msgCheckNewState(); + }; + + struct Target; + + class Consensus { + ReplSetImpl &rs; + struct LastYea { + LastYea() : when(0), who(0xffffffff) { } + time_t when; + unsigned who; + }; + Atomic<LastYea> ly; + unsigned yea(unsigned memberId); // throws VoteException + void electionFailed(unsigned meid); + void _electSelf(); + bool weAreFreshest(bool& allUp, int& nTies); + bool sleptLast; // slept last elect() pass + public: + Consensus(ReplSetImpl *t) : rs(*t) { + sleptLast = false; + steppedDown = 0; + } + + /* if we've stepped down, this is when we are allowed to try to elect ourself again. + todo: handle possible weirdnesses at clock skews etc. + */ + time_t steppedDown; + + int totalVotes() const; + bool aMajoritySeemsToBeUp() const; + void electSelf(); + void electCmdReceived(BSONObj, BSONObjBuilder*); + void multiCommand(BSONObj cmd, list<Target>& L); + }; + + /** most operations on a ReplSet object should be done while locked. that logic implemented here. */ + class RSBase : boost::noncopyable { + public: + const unsigned magic; + void assertValid() { assert( magic == 0x12345677 ); } + private: + mutex m; + int _locked; + ThreadLocalValue<bool> _lockedByMe; + protected: + RSBase() : magic(0x12345677), m("RSBase"), _locked(0) { } + ~RSBase() { + log() << "~RSBase should never be called?" << rsLog; + assert(false); + } + + class lock { + RSBase& rsbase; + auto_ptr<scoped_lock> sl; + public: + lock(RSBase* b) : rsbase(*b) { + if( rsbase._lockedByMe.get() ) + return; // recursive is ok... + + sl.reset( new scoped_lock(rsbase.m) ); + DEV assert(rsbase._locked == 0); + rsbase._locked++; + rsbase._lockedByMe.set(true); + } + ~lock() { + if( sl.get() ) { + assert( rsbase._lockedByMe.get() ); + DEV assert(rsbase._locked == 1); + rsbase._lockedByMe.set(false); + rsbase._locked--; + } + } + }; + + public: + /* for asserts */ + bool locked() const { return _locked != 0; } + + /* if true, is locked, and was locked by this thread. note if false, it could be in the lock or not for another + just for asserts & such so we can make the contracts clear on who locks what when. + we don't use these locks that frequently, so the little bit of overhead is fine. + */ + bool lockedByMe() { return _lockedByMe.get(); } + }; + + class ReplSetHealthPollTask; + + /* safe container for our state that keeps member pointer and state variables always aligned */ + class StateBox : boost::noncopyable { + public: + struct SP { // SP is like pair<MemberState,const Member *> but nicer + SP() : state(MemberState::RS_STARTUP), primary(0) { } + MemberState state; + const Member *primary; + }; + const SP get() { + scoped_lock lk(m); + return sp; + } + MemberState getState() const { return sp.state; } + const Member* getPrimary() const { return sp.primary; } + void change(MemberState s, const Member *self) { + scoped_lock lk(m); + sp.state = s; + if( s.primary() ) { + sp.primary = self; + } + else { + if( self == sp.primary ) + sp.primary = 0; + } + } + void set(MemberState s, const Member *p) { + scoped_lock lk(m); + sp.state = s; sp.primary = p; + } + void setSelfPrimary(const Member *self) { change(MemberState::RS_PRIMARY, self); } + void setOtherPrimary(const Member *mem) { + scoped_lock lk(m); + assert( !sp.state.primary() ); + sp.primary = mem; + } + StateBox() : m("StateBox") { } + private: + mutex m; + SP sp; + }; + + void parseReplsetCmdLine(string cfgString, string& setname, vector<HostAndPort>& seeds, set<HostAndPort>& seedSet ); + + /** Parameter given to the --replSet command line option (parsed). + Syntax is "<setname>/<seedhost1>,<seedhost2>" + where setname is a name and seedhost is "<host>[:<port>]" */ + class ReplSetCmdline { + public: + ReplSetCmdline(string cfgString) { parseReplsetCmdLine(cfgString, setname, seeds, seedSet); } + string setname; + vector<HostAndPort> seeds; + set<HostAndPort> seedSet; + }; + + /* information about the entire repl set, such as the various servers in the set, and their state */ + /* note: We currently do not free mem when the set goes away - it is assumed the replset is a + singleton and long lived. + */ + class ReplSetImpl : protected RSBase { + public: + /** info on our state if the replset isn't yet "up". for example, if we are pre-initiation. */ + enum StartupStatus { + PRESTART=0, LOADINGCONFIG=1, BADCONFIG=2, EMPTYCONFIG=3, + EMPTYUNREACHABLE=4, STARTED=5, SOON=6 + }; + static StartupStatus startupStatus; + static string startupStatusMsg; + static string stateAsStr(MemberState state); + static string stateAsHtml(MemberState state); + + /* todo thread */ + void msgUpdateHBInfo(HeartbeatInfo); + + StateBox box; + + OpTime lastOpTimeWritten; + long long lastH; // hash we use to make sure we are reading the right flow of ops and aren't on an out-of-date "fork" + private: + set<ReplSetHealthPollTask*> healthTasks; + void endOldHealthTasks(); + void startHealthTaskFor(Member *m); + + private: + Consensus elect; + bool ok() const { return !box.getState().fatal(); } + + void relinquish(); + void forgetPrimary(); + + protected: + bool _stepDown(); + private: + void assumePrimary(); + void loadLastOpTimeWritten(); + void changeState(MemberState s); + + protected: + // "heartbeat message" + // sent in requestHeartbeat respond in field "hbm" + char _hbmsg[256]; // we change this unlocked, thus not an stl::string + public: + void sethbmsg(string s, int logLevel = 0); + protected: + bool initFromConfig(ReplSetConfig& c); // true if ok; throws if config really bad; false if config doesn't include self + void _fillIsMaster(BSONObjBuilder&); + void _fillIsMasterHost(const Member*, vector<string>&, vector<string>&, vector<string>&); + const ReplSetConfig& config() { return *_cfg; } + string name() const { return _name; } /* @return replica set's logical name */ + MemberState state() const { return box.getState(); } + void _fatal(); + void _getOplogDiagsAsHtml(unsigned server_id, stringstream& ss) const; + void _summarizeAsHtml(stringstream&) const; + void _summarizeStatus(BSONObjBuilder&) const; // for replSetGetStatus command + + /* throws exception if a problem initializing. */ + ReplSetImpl(ReplSetCmdline&); + + /* call afer constructing to start - returns fairly quickly after launching its threads */ + void _go(); + + private: + string _name; + const vector<HostAndPort> *_seeds; + ReplSetConfig *_cfg; + + /** load our configuration from admin.replset. try seed machines too. + @return true if ok; throws if config really bad; false if config doesn't include self + */ + bool _loadConfigFinish(vector<ReplSetConfig>& v); + void loadConfig(); + + list<HostAndPort> memberHostnames() const; + const ReplSetConfig::MemberCfg& myConfig() const { return _self->config(); } + bool iAmArbiterOnly() const { return myConfig().arbiterOnly; } + bool iAmPotentiallyHot() const { return myConfig().potentiallyHot(); } + protected: + Member *_self; + private: + List1<Member> _members; /* all members of the set EXCEPT self. */ + + public: + unsigned selfId() const { return _self->id(); } + Manager *mgr; + + private: + Member* head() const { return _members.head(); } + public: + const Member* findById(unsigned id) const; + private: + void _getTargets(list<Target>&, int &configVersion); + void getTargets(list<Target>&, int &configVersion); + void startThreads(); + friend class FeedbackThread; + friend class CmdReplSetElect; + friend class Member; + friend class Manager; + friend class Consensus; + + private: + /* pulling data from primary related - see rs_sync.cpp */ + bool initialSyncOplogApplication(string hn, const Member *primary, OpTime applyGTE, OpTime minValid); + void _syncDoInitialSync(); + void syncDoInitialSync(); + void _syncThread(); + void syncTail(); + void syncApply(const BSONObj &o); + void syncRollback(OplogReader& r); + void syncFixUp(HowToFixUp& h, OplogReader& r); + public: + void syncThread(); + }; + + class ReplSet : public ReplSetImpl { + public: + ReplSet(ReplSetCmdline& replSetCmdline) : ReplSetImpl(replSetCmdline) { } + + bool stepDown() { return _stepDown(); } + + string selfFullName() { + lock lk(this); + return _self->fullName(); + } + + /* call after constructing to start - returns fairly quickly after la[unching its threads */ + void go() { _go(); } + void fatal() { _fatal(); } + bool isPrimary(); + bool isSecondary(); + MemberState state() const { return ReplSetImpl::state(); } + string name() const { return ReplSetImpl::name(); } + const ReplSetConfig& config() { return ReplSetImpl::config(); } + void getOplogDiagsAsHtml(unsigned server_id, stringstream& ss) const { _getOplogDiagsAsHtml(server_id,ss); } + void summarizeAsHtml(stringstream& ss) const { _summarizeAsHtml(ss); } + void summarizeStatus(BSONObjBuilder& b) const { _summarizeStatus(b); } + void fillIsMaster(BSONObjBuilder& b) { _fillIsMaster(b); } + + /* we have a new config (reconfig) - apply it. + @param comment write a no-op comment to the oplog about it. only makes sense if one is primary and initiating the reconf. + */ + void haveNewConfig(ReplSetConfig& c, bool comment); + + /* if we delete old configs, this needs to assure locking. currently we don't so it is ok. */ + const ReplSetConfig& getConfig() { return config(); } + + bool lockedByMe() { return RSBase::lockedByMe(); } + + // heartbeat msg to send to others; descriptive diagnostic info + string hbmsg() const { return _hbmsg; } + }; + + /** base class for repl set commands. checks basic things such as in rs mode before the command + does its real work + */ + class ReplSetCommand : public Command { + protected: + ReplSetCommand(const char * s, bool show=false) : Command(s, show) { } + virtual bool slaveOk() const { return true; } + virtual bool adminOnly() const { return true; } + virtual bool logTheOp() { return false; } + virtual LockType locktype() const { return NONE; } + virtual void help( stringstream &help ) const { help << "internal"; } + bool check(string& errmsg, BSONObjBuilder& result) { + if( !replSet ) { + errmsg = "not running with --replSet"; + return false; + } + if( theReplSet == 0 ) { + result.append("startupStatus", ReplSet::startupStatus); + errmsg = ReplSet::startupStatusMsg.empty() ? "replset unknown error 2" : ReplSet::startupStatusMsg; + return false; + } + return true; + } + }; + + /** inlines ----------------- */ + + inline Member::Member(HostAndPort h, unsigned ord, const ReplSetConfig::MemberCfg *c, bool self) : + _config(c), _h(h), _hbinfo(ord) { + if( self ) { + _hbinfo.health = 1.0; + } + } + + inline bool ReplSet::isPrimary() { + /* todo replset */ + return box.getState().primary(); + } + + inline bool ReplSet::isSecondary() { + return box.getState().secondary(); + } + +} diff --git a/db/repl/rs_config.cpp b/db/repl/rs_config.cpp new file mode 100644 index 0000000..76b20a4 --- /dev/null +++ b/db/repl/rs_config.cpp @@ -0,0 +1,315 @@ +// rs_config.cpp + +/** +* Copyright (C) 2008 10gen Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "pch.h" +#include "rs.h" +#include "../../client/dbclient.h" +#include "../../client/syncclusterconnection.h" +#include "../../util/hostandport.h" +#include "../dbhelpers.h" +#include "connections.h" +#include "../oplog.h" + +using namespace bson; + +namespace mongo { + + void logOpInitiate(const bo&); + + list<HostAndPort> ReplSetConfig::otherMemberHostnames() const { + list<HostAndPort> L; + for( vector<MemberCfg>::const_iterator i = members.begin(); i != members.end(); i++ ) { + if( !i->h.isSelf() ) + L.push_back(i->h); + } + return L; + } + + /* comment MUST only be set when initiating the set by the initiator */ + void ReplSetConfig::saveConfigLocally(bo comment) { + check(); + log() << "replSet info saving a newer config version to local.system.replset" << rsLog; + { + writelock lk(""); + Client::Context cx( rsConfigNs ); + cx.db()->flushFiles(true); + + //theReplSet->lastOpTimeWritten = ??; + //rather than above, do a logOp()? probably + BSONObj o = asBson(); + Helpers::putSingletonGod(rsConfigNs.c_str(), o, false/*logOp=false; local db so would work regardless...*/); + if( !comment.isEmpty() ) + logOpInitiate(comment); + + cx.db()->flushFiles(true); + } + DEV log() << "replSet saveConfigLocally done" << rsLog; + } + + /*static*/ + /*void ReplSetConfig::receivedNewConfig(BSONObj cfg) { + if( theReplSet ) + return; // this is for initial setup only, so far. todo + + ReplSetConfig c(cfg); + + writelock lk("admin."); + if( theReplSet ) + return; + c.saveConfigLocally(bo()); + }*/ + + bo ReplSetConfig::MemberCfg::asBson() const { + bob b; + b << "_id" << _id; + b.append("host", h.toString()); + if( votes != 1 ) b << "votes" << votes; + if( priority != 1.0 ) b << "priority" << priority; + if( arbiterOnly ) b << "arbiterOnly" << true; + return b.obj(); + } + + bo ReplSetConfig::asBson() const { + bob b; + b.append("_id", _id).append("version", version); + if( !ho.isDefault() || !getLastErrorDefaults.isEmpty() ) { + bob settings; + if( !ho.isDefault() ) + settings << "heartbeatConnRetries " << ho.heartbeatConnRetries << + "heartbeatSleep" << ho.heartbeatSleepMillis / 1000 << + "heartbeatTimeout" << ho.heartbeatTimeoutMillis / 1000; + if( !getLastErrorDefaults.isEmpty() ) + settings << "getLastErrorDefaults" << getLastErrorDefaults; + b << "settings" << settings.obj(); + } + + BSONArrayBuilder a; + for( unsigned i = 0; i < members.size(); i++ ) + a.append( members[i].asBson() ); + b.append("members", a.arr()); + + return b.obj(); + } + + static inline void mchk(bool expr) { + uassert(13126, "bad Member config", expr); + } + + void ReplSetConfig::MemberCfg::check() const{ + mchk(_id >= 0 && _id <= 255); + mchk(priority >= 0 && priority <= 1000); + mchk(votes >= 0 && votes <= 100); + uassert(13419, "this version of mongod only supports priorities 0 and 1", priority == 0 || priority == 1); + } + + /*static*/ bool ReplSetConfig::legalChange(const ReplSetConfig& o, const ReplSetConfig& n, string& errmsg) { + if( o._id != n._id ) { + errmsg = "set name may not change"; + return false; + } + /* TODO : wonder if we need to allow o.version < n.version only, which is more lenient. + if someone had some intermediate config this node doesnt have, that could be + necessary. but then how did we become primary? so perhaps we are fine as-is. + */ + if( o.version + 1 != n.version ) { + errmsg = "version number wrong"; + return false; + } + + /* TODO : MORE CHECKS HERE */ + + log() << "replSet TODO : don't allow removal of a node until we handle it at the removed node end?" << endl; + // we could change its votes to zero perhaps instead as a short term... + + return true; + } + + void ReplSetConfig::clear() { + version = -5; + _ok = false; + } + + void ReplSetConfig::check() const { + uassert(13132, + "nonmatching repl set name in _id field; check --replSet command line", + _id == cmdLine.ourSetName()); + uassert(13308, "replSet bad config version #", version > 0); + uassert(13133, "replSet bad config no members", members.size() >= 1); + uassert(13309, "replSet bad config maximum number of members is 7 (for now)", members.size() <= 7); + } + + void ReplSetConfig::from(BSONObj o) { + md5 = o.md5(); + _id = o["_id"].String(); + if( o["version"].ok() ) { + version = o["version"].numberInt(); + uassert(13115, "bad " + rsConfigNs + " config: version", version > 0); + } + + if( o["settings"].ok() ) { + BSONObj settings = o["settings"].Obj(); + if( settings["heartbeatConnRetries "].ok() ) + ho.heartbeatConnRetries = settings["heartbeatConnRetries "].numberInt(); + if( settings["heartbeatSleep"].ok() ) + ho.heartbeatSleepMillis = (unsigned) (settings["heartbeatSleep"].Number() * 1000); + if( settings["heartbeatTimeout"].ok() ) + ho.heartbeatTimeoutMillis = (unsigned) (settings["heartbeatTimeout"].Number() * 1000); + ho.check(); + try { getLastErrorDefaults = settings["getLastErrorDefaults"].Obj(); } catch(...) { } + } + + set<string> hosts; + set<int> ords; + vector<BSONElement> members; + try { + members = o["members"].Array(); + } + catch(...) { + uasserted(13131, "replSet error parsing (or missing) 'members' field in config object"); + } + + unsigned localhosts = 0; + for( unsigned i = 0; i < members.size(); i++ ) { + BSONObj mobj = members[i].Obj(); + MemberCfg m; + try { + try { + m._id = (int) mobj["_id"].Number(); + } catch(...) { + /* TODO: use of string exceptions may be problematic for reconfig case! */ + throw "_id must be numeric"; + } + string s; + try { + s = mobj["host"].String(); + m.h = HostAndPort(s); + } + catch(...) { + throw string("bad or missing host field? ") + mobj.toString(); + } + if( m.h.isLocalHost() ) + localhosts++; + m.arbiterOnly = mobj.getBoolField("arbiterOnly"); + if( mobj.hasElement("priority") ) + m.priority = mobj["priority"].Number(); + if( mobj.hasElement("votes") ) + m.votes = (unsigned) mobj["votes"].Number(); + m.check(); + } + catch( const char * p ) { + log() << "replSet cfg parsing exception for members[" << i << "] " << p << rsLog; + stringstream ss; + ss << "replSet members[" << i << "] " << p; + uassert(13107, ss.str(), false); + } + catch(DBException& e) { + log() << "replSet cfg parsing exception for members[" << i << "] " << e.what() << rsLog; + stringstream ss; + ss << "bad config for member[" << i << "] " << e.what(); + uassert(13135, ss.str(), false); + } + if( !(ords.count(m._id) == 0 && hosts.count(m.h.toString()) == 0) ) { + log() << "replSet " << o.toString() << rsLog; + uassert(13108, "bad replset config -- duplicate hosts in the config object?", false); + } + hosts.insert(m.h.toString()); + ords.insert(m._id); + this->members.push_back(m); + } + uassert(13393, "can't use localhost in repl set member names except when using it for all members", localhosts == 0 || localhosts == members.size()); + uassert(13117, "bad " + rsConfigNs + " config", !_id.empty()); + } + + static inline void configAssert(bool expr) { + uassert(13122, "bad repl set config?", expr); + } + + ReplSetConfig::ReplSetConfig(BSONObj cfg) { + clear(); + from(cfg); + configAssert( version < 0 /*unspecified*/ || (version >= 1 && version <= 5000) ); + if( version < 1 ) + version = 1; + _ok = true; + } + + ReplSetConfig::ReplSetConfig(const HostAndPort& h) { + clear(); + int level = 2; + DEV level = 0; + //log(0) << "replSet load config from: " << h.toString() << rsLog; + + auto_ptr<DBClientCursor> c; + int v = -5; + try { + if( h.isSelf() ) { + ; + } + else { + /* first, make sure other node is configured to be a replset. just to be safe. */ + string setname = cmdLine.ourSetName(); + BSONObj cmd = BSON( "replSetHeartbeat" << setname ); + int theirVersion; + BSONObj info; + bool ok = requestHeartbeat(setname, "", h.toString(), info, -2, theirVersion); + if( info["rs"].trueValue() ) { + // yes, it is a replicate set, although perhaps not yet initialized + } + else { + if( !ok ) { + log() << "replSet TEMP !ok heartbeating " << h.toString() << " on cfg load" << rsLog; + if( !info.isEmpty() ) + log() << "replSet info " << h.toString() << " : " << info.toString() << rsLog; + return; + } + { + stringstream ss; + ss << "replSet error: member " << h.toString() << " is not in --replSet mode"; + msgassertedNoTrace(13260, ss.str().c_str()); // not caught as not a user exception - we want it not caught + //for python err# checker: uassert(13260, "", false); + } + } + } + + v = -4; + ScopedConn conn(h.toString()); + v = -3; + c = conn->query(rsConfigNs); + if( c.get() == 0 ) { + version = v; return; + } + if( !c->more() ) { + version = EMPTYCONFIG; + return; + } + version = -1; + } + catch( DBException& e) { + version = v; + log(level) << "replSet load config couldn't get from " << h.toString() << ' ' << e.what() << rsLog; + return; + } + + BSONObj o = c->nextSafe(); + uassert(13109, "multiple rows in " + rsConfigNs + " not supported", !c->more()); + from(o); + _ok = true; + log(level) << "replSet load config ok from " << (h.isSelf() ? "self" : h.toString()) << rsLog; + } + +} diff --git a/db/repl/rs_config.h b/db/repl/rs_config.h new file mode 100644 index 0000000..38df772 --- /dev/null +++ b/db/repl/rs_config.h @@ -0,0 +1,88 @@ +// rs_config.h +// repl set configuration +// + +/** +* Copyright (C) 2008 10gen Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#pragma once + +#include "../../util/hostandport.h" +#include "health.h" + +namespace mongo { + + /* singleton config object is stored here */ + const string rsConfigNs = "local.system.replset"; + + class ReplSetConfig { + enum { EMPTYCONFIG = -2 }; + public: + /* if something is misconfigured, throws an exception. + if couldn't be queried or is just blank, ok() will be false. + */ + ReplSetConfig(const HostAndPort& h); + + ReplSetConfig(BSONObj cfg); + + bool ok() const { return _ok; } + + struct MemberCfg { + MemberCfg() : _id(-1), votes(1), priority(1.0), arbiterOnly(false) { } + int _id; /* ordinal */ + unsigned votes; /* how many votes this node gets. default 1. */ + HostAndPort h; + double priority; /* 0 means can never be primary */ + bool arbiterOnly; + void check() const; /* check validity, assert if not. */ + BSONObj asBson() const; + bool potentiallyHot() const { + return !arbiterOnly && priority > 0; + } + }; + vector<MemberCfg> members; + string _id; + int version; + HealthOptions ho; + string md5; + BSONObj getLastErrorDefaults; + + list<HostAndPort> otherMemberHostnames() const; // except self + + /** @return true if could connect, and there is no cfg object there at all */ + bool empty() const { return version == EMPTYCONFIG; } + + string toString() const { return asBson().toString(); } + + /** validate the settings. does not call check() on each member, you have to do that separately. */ + void check() const; + + /** check if modification makes sense */ + static bool legalChange(const ReplSetConfig& old, const ReplSetConfig& n, string& errmsg); + + //static void receivedNewConfig(BSONObj); + void saveConfigLocally(BSONObj comment); // to local db + string saveConfigEverywhere(); // returns textual info on what happened + + BSONObj asBson() const; + + private: + bool _ok; + void from(BSONObj); + void clear(); + }; + +} diff --git a/db/repl/rs_exception.h b/db/repl/rs_exception.h new file mode 100755 index 0000000..e71cad2 --- /dev/null +++ b/db/repl/rs_exception.h @@ -0,0 +1,17 @@ +// @file rs_exception.h
+
+#pragma once
+
+namespace mongo {
+
+ class VoteException : public std::exception { + public: + const char * what() const throw () { return "VoteException"; }
+ }; + + class RetryAfterSleepException : public std::exception { + public: + const char * what() const throw () { return "RetryAfterSleepException"; } + }; + +} diff --git a/db/repl/rs_initialsync.cpp b/db/repl/rs_initialsync.cpp new file mode 100644 index 0000000..4c6bd4d --- /dev/null +++ b/db/repl/rs_initialsync.cpp @@ -0,0 +1,214 @@ +/** +* Copyright (C) 2008 10gen Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "pch.h" +#include "../client.h" +#include "../../client/dbclient.h" +#include "rs.h" +#include "../oplogreader.h" +#include "../../util/mongoutils/str.h" +#include "../dbhelpers.h" +#include "rs_optime.h" +#include "../oplog.h" + +namespace mongo { + + using namespace mongoutils; + using namespace bson; + + void dropAllDatabasesExceptLocal(); + + // add try/catch with sleep + + void isyncassert(const char *msg, bool expr) { + if( !expr ) { + string m = str::stream() << "initial sync " << msg; + theReplSet->sethbmsg(m, 0); + uasserted(13404, m); + } + } + + void ReplSetImpl::syncDoInitialSync() { + while( 1 ) { + try { + _syncDoInitialSync(); + break; + } + catch(DBException& e) { + sethbmsg("initial sync exception " + e.toString(), 0); + sleepsecs(30); + } + } + } + + bool cloneFrom(const char *masterHost, string& errmsg, const string& fromdb, bool logForReplication, + bool slaveOk, bool useReplAuth, bool snapshot); + + /* todo : progress metering to sethbmsg. */ + static bool clone(const char *master, string db) { + string err; + return cloneFrom(master, err, db, false, + /*slaveok later can be true*/ false, true, false); + } + + void _logOpObjRS(const BSONObj& op); + + bool copyCollectionFromRemote(const string& host, const string& ns, const BSONObj& query, string errmsg); + + static void emptyOplog() { + writelock lk(rsoplog); + Client::Context ctx(rsoplog); + NamespaceDetails *d = nsdetails(rsoplog); + + // temp + if( d && d->nrecords == 0 ) + return; // already empty, ok. + + log(1) << "replSet empty oplog" << rsLog; + d->emptyCappedCollection(rsoplog); + + /* + string errmsg; + bob res; + dropCollection(rsoplog, errmsg, res); + log() << "replSet recreated oplog so it is empty. todo optimize this..." << rsLog; + createOplog();*/ + + // TEMP: restart to recreate empty oplog + //log() << "replSet FATAL error during initial sync. mongod restart required." << rsLog; + //dbexit( EXIT_CLEAN ); + + /* + writelock lk(rsoplog); + Client::Context c(rsoplog, dbpath, 0, doauth/false); + NamespaceDetails *oplogDetails = nsdetails(rsoplog); + uassert(13412, str::stream() << "replSet error " << rsoplog << " is missing", oplogDetails != 0); + oplogDetails->cappedTruncateAfter(rsoplog, h.commonPointOurDiskloc, false); + */ + } + + void ReplSetImpl::_syncDoInitialSync() { + sethbmsg("initial sync pending",0); + + StateBox::SP sp = box.get(); + assert( !sp.state.primary() ); // wouldn't make sense if we were. + + const Member *cp = sp.primary; + if( cp == 0 ) { + sethbmsg("initial sync need a member to be primary",0); + sleepsecs(15); + return; + } + + string masterHostname = cp->h().toString(); + OplogReader r; + if( !r.connect(masterHostname) ) { + sethbmsg( str::stream() << "initial sync couldn't connect to " << cp->h().toString() , 0); + sleepsecs(15); + return; + } + + BSONObj lastOp = r.getLastOp(rsoplog); + if( lastOp.isEmpty() ) { + sethbmsg("initial sync couldn't read remote oplog", 0); + sleepsecs(15); + return; + } + OpTime startingTS = lastOp["ts"]._opTime(); + + { + /* make sure things aren't too flappy */ + sleepsecs(5); + isyncassert( "flapping?", box.getPrimary() == cp ); + BSONObj o = r.getLastOp(rsoplog); + isyncassert( "flapping [2]?", !o.isEmpty() ); + } + + sethbmsg("initial sync drop all databases", 0); + dropAllDatabasesExceptLocal(); + +// sethbmsg("initial sync drop oplog", 0); +// emptyOplog(); + + list<string> dbs = r.conn()->getDatabaseNames(); + for( list<string>::iterator i = dbs.begin(); i != dbs.end(); i++ ) { + string db = *i; + if( db != "local" ) { + sethbmsg( str::stream() << "initial sync cloning db: " << db , 0); + bool ok; + { + writelock lk(db); + Client::Context ctx(db); + ok = clone(masterHostname.c_str(), db); + } + if( !ok ) { + sethbmsg( str::stream() << "initial sync error clone of " << db << " failed sleeping 5 minutes" ,0); + sleepsecs(300); + return; + } + } + } + + sethbmsg("initial sync query minValid",0); + + /* our cloned copy will be strange until we apply oplog events that occurred + through the process. we note that time point here. */ + BSONObj minValid = r.getLastOp(rsoplog); + assert( !minValid.isEmpty() ); + OpTime mvoptime = minValid["ts"]._opTime(); + assert( !mvoptime.isNull() ); + + /* copy the oplog + */ + { + sethbmsg("initial sync copy+apply oplog"); + if( ! initialSyncOplogApplication(masterHostname, cp, startingTS, mvoptime) ) { // note we assume here that this call does not throw + log() << "replSet initial sync failed during applyoplog" << rsLog; + emptyOplog(); // otherwise we'll be up! + lastOpTimeWritten = OpTime(); + lastH = 0; + log() << "replSet cleaning up [1]" << rsLog; + { + writelock lk("local."); + Client::Context cx( "local." ); + cx.db()->flushFiles(true); + } + log() << "replSet cleaning up [2]" << rsLog; + sleepsecs(2); + return; + } + } + + sethbmsg("initial sync finishing up",0); + + assert( !box.getState().primary() ); // wouldn't make sense if we were. + + { + writelock lk("local."); + Client::Context cx( "local." ); + cx.db()->flushFiles(true); + try { + log() << "replSet set minValid=" << minValid["ts"]._opTime().toString() << rsLog; + } + catch(...) { } + Helpers::putSingleton("local.replset.minvalid", minValid); + cx.db()->flushFiles(true); + } + + sethbmsg("initial sync done",0); + } + +} diff --git a/db/repl/rs_initiate.cpp b/db/repl/rs_initiate.cpp new file mode 100644 index 0000000..9c74be0 --- /dev/null +++ b/db/repl/rs_initiate.cpp @@ -0,0 +1,238 @@ +/* @file rs_initiate.cpp + */ + +/** +* Copyright (C) 2008 10gen Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "pch.h" +#include "../cmdline.h" +#include "../commands.h" +#include "../../util/mmap.h" +#include "../../util/mongoutils/str.h" +#include "health.h" +#include "rs.h" +#include "rs_config.h" +#include "../dbhelpers.h" + +using namespace bson; +using namespace mongoutils; + +namespace mongo { + + /* called on a reconfig AND on initiate + throws + @param initial true when initiating + */ + void checkMembersUpForConfigChange(const ReplSetConfig& cfg, bool initial) { + int failures = 0; + int me = 0; + for( vector<ReplSetConfig::MemberCfg>::const_iterator i = cfg.members.begin(); i != cfg.members.end(); i++ ) { + if( i->h.isSelf() ) { + me++; + if( !i->potentiallyHot() ) { + uasserted(13420, "initiation and reconfiguration of a replica set must be sent to a node that can become primary"); + } + } + } + uassert(13278, "bad config - dups?", me <= 1); // dups? + uassert(13279, "can't find self in the replset config", me == 1); + + for( vector<ReplSetConfig::MemberCfg>::const_iterator i = cfg.members.begin(); i != cfg.members.end(); i++ ) { + BSONObj res; + { + bool ok = false; + try { + int theirVersion = -1000; + ok = requestHeartbeat(cfg._id, "", i->h.toString(), res, -1, theirVersion, initial/*check if empty*/); + if( theirVersion >= cfg.version ) { + stringstream ss; + ss << "replSet member " << i->h.toString() << " has too new a config version (" << theirVersion << ") to reconfigure"; + uasserted(13259, ss.str()); + } + } + catch(DBException& e) { + log() << "replSet cmufcc requestHeartbeat " << i->h.toString() << " : " << e.toString() << rsLog; + } + catch(...) { + log() << "replSet cmufcc error exception in requestHeartbeat?" << rsLog; + } + if( res.getBoolField("mismatch") ) + uasserted(13145, "set name does not match the set name host " + i->h.toString() + " expects"); + if( *res.getStringField("set") ) { + if( cfg.version <= 1 ) { + // this was to be initiation, no one shoudl be initiated already. + uasserted(13256, "member " + i->h.toString() + " is already initiated"); + } + else { + // Assure no one has a newer config. + if( res["v"].Int() >= cfg.version ) { + uasserted(13341, "member " + i->h.toString() + " has a config version >= to the new cfg version; cannot change config"); + } + } + } + if( !ok && !res["rs"].trueValue() ) { + if( !res.isEmpty() ) { + /* strange. got a response, but not "ok". log it. */ + log() << "replSet warning " << i->h.toString() << " replied: " << res.toString() << rsLog; + } + + bool allowFailure = false; + failures++; + if( res.isEmpty() && !initial && failures == 1 ) { + /* for now we are only allowing 1 node to be down on a reconfig. this can be made to be a minority + trying to keep change small as release is near. + */ + const Member* m = theReplSet->findById( i->_id ); + if( m ) { + // ok, so this was an existing member (wouldn't make sense to add to config a new member that is down) + assert( m->h().toString() == i->h.toString() ); + allowFailure = true; + } + } + + if( !allowFailure ) { + string msg = string("need members up to initiate, not ok : ") + i->h.toString(); + if( !initial ) + msg = string("need most members up to reconfigure, not ok : ") + i->h.toString(); + uasserted(13144, msg); + } + } + } + if( initial ) { + bool hasData = res["hasData"].Bool(); + uassert(13311, "member " + i->h.toString() + " has data already, cannot initiate set. All members except initiator must be empty.", + !hasData || i->h.isSelf()); + } + } + } + + class CmdReplSetInitiate : public ReplSetCommand { + public: + virtual LockType locktype() const { return NONE; } + CmdReplSetInitiate() : ReplSetCommand("replSetInitiate") { } + virtual void help(stringstream& h) const { + h << "Initiate/christen a replica set."; + h << "\nhttp://www.mongodb.org/display/DOCS/Replica+Set+Commands"; + } + virtual bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) { + log() << "replSet replSetInitiate admin command received from client" << rsLog; + + if( !replSet ) { + errmsg = "server is not running with --replSet"; + return false; + } + if( theReplSet ) { + errmsg = "already initialized"; + result.append("info", "try querying " + rsConfigNs + " to see current configuration"); + return false; + } + + { + // just make sure we can get a write lock before doing anything else. we'll reacquire one + // later. of course it could be stuck then, but this check lowers the risk if weird things + // are up. + time_t t = time(0); + writelock lk(""); + if( time(0)-t > 10 ) { + errmsg = "took a long time to get write lock, so not initiating. Initiate when server less busy?"; + return false; + } + + /* check that we don't already have an oplog. that could cause issues. + it is ok if the initiating member has *other* data than that. + */ + BSONObj o; + if( Helpers::getFirst(rsoplog, o) ) { + errmsg = rsoplog + string(" is not empty on the initiating member. cannot initiate."); + return false; + } + } + + if( ReplSet::startupStatus == ReplSet::BADCONFIG ) { + errmsg = "server already in BADCONFIG state (check logs); not initiating"; + result.append("info", ReplSet::startupStatusMsg); + return false; + } + if( ReplSet::startupStatus != ReplSet::EMPTYCONFIG ) { + result.append("startupStatus", ReplSet::startupStatus); + errmsg = "all members and seeds must be reachable to initiate set"; + result.append("info", cmdLine._replSet); + return false; + } + + BSONObj configObj; + + if( cmdObj["replSetInitiate"].type() != Object ) { + result.append("info2", "no configuration explicitly specified -- making one"); + log() << "replSet info initiate : no configuration specified. Using a default configuration for the set" << rsLog; + + string name; + vector<HostAndPort> seeds; + set<HostAndPort> seedSet; + parseReplsetCmdLine(cmdLine._replSet, name, seeds, seedSet); // may throw... + + bob b; + b.append("_id", name); + bob members; + members.append("0", BSON( "_id" << 0 << "host" << HostAndPort::Me().toString() )); + for( unsigned i = 0; i < seeds.size(); i++ ) + members.append(bob::numStr(i+1), BSON( "_id" << i+1 << "host" << seeds[i].toString())); + b.appendArray("members", members.obj()); + configObj = b.obj(); + log() << "replSet created this configuration for initiation : " << configObj.toString() << rsLog; + } + else { + configObj = cmdObj["replSetInitiate"].Obj(); + } + + bool parsed = false; + try { + ReplSetConfig newConfig(configObj); + parsed = true; + + if( newConfig.version > 1 ) { + errmsg = "can't initiate with a version number greater than 1"; + return false; + } + + log() << "replSet replSetInitiate config object parses ok, " << newConfig.members.size() << " members specified" << rsLog; + + checkMembersUpForConfigChange(newConfig, true); + + log() << "replSet replSetInitiate all members seem up" << rsLog; + + writelock lk(""); + bo comment = BSON( "msg" << "initiating set"); + newConfig.saveConfigLocally(comment); + log() << "replSet replSetInitiate config now saved locally. Should come online in about a minute." << rsLog; + result.append("info", "Config now saved locally. Should come online in about a minute."); + ReplSet::startupStatus = ReplSet::SOON; + ReplSet::startupStatusMsg = "Received replSetInitiate - should come online shortly."; + } + catch( DBException& e ) { + log() << "replSet replSetInitiate exception: " << e.what() << rsLog; + if( !parsed ) + errmsg = string("couldn't parse cfg object ") + e.what(); + else + errmsg = string("couldn't initiate : ") + e.what(); + return false; + } + + return true; + } + } cmdReplSetInitiate; + +} diff --git a/db/repl/rs_member.h b/db/repl/rs_member.h new file mode 100644 index 0000000..4f6846a --- /dev/null +++ b/db/repl/rs_member.h @@ -0,0 +1,91 @@ +// @file rsmember.h +/* + * Copyright (C) 2010 10gen Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +/** replica set member */ + +#pragma once + +namespace mongo { + + + /* + RS_STARTUP serving still starting up, or still trying to initiate the set + RS_PRIMARY this server thinks it is primary + RS_SECONDARY this server thinks it is a secondary (slave mode) + RS_RECOVERING recovering/resyncing; after recovery usually auto-transitions to secondary + RS_FATAL something bad has occurred and server is not completely offline with regard to the replica set. fatal error. + RS_STARTUP2 loaded config, still determining who is primary + */ + struct MemberState { + enum MS { + RS_STARTUP, + RS_PRIMARY, + RS_SECONDARY, + RS_RECOVERING, + RS_FATAL, + RS_STARTUP2, + RS_UNKNOWN, /* remote node not yet reached */ + RS_ARBITER, + RS_DOWN /* node not reachable for a report */ + } s; + + MemberState(MS ms = RS_UNKNOWN) : s(ms) { } + explicit MemberState(int ms) : s((MS) ms) { } + + bool primary() const { return s == RS_PRIMARY; } + bool secondary() const { return s == RS_SECONDARY; } + bool recovering() const { return s == RS_RECOVERING; } + bool startup2() const { return s == RS_STARTUP2; } + bool fatal() const { return s == RS_FATAL; } + + bool operator==(const MemberState& r) const { return s == r.s; } + bool operator!=(const MemberState& r) const { return s != r.s; } + }; + + /* this is supposed to be just basic information on a member, + and copy constructable. */ + class HeartbeatInfo { + unsigned _id; + public: + HeartbeatInfo() : _id(0xffffffff),skew(INT_MIN) { } + HeartbeatInfo(unsigned id); + bool up() const { return health > 0; } + unsigned id() const { return _id; } + MemberState hbstate; + double health; + time_t upSince; + time_t lastHeartbeat; + string lastHeartbeatMsg; + OpTime opTime; + int skew; + + /* true if changed in a way of interest to the repl set manager. */ + bool changed(const HeartbeatInfo& old) const; + }; + + inline HeartbeatInfo::HeartbeatInfo(unsigned id) : _id(id) { + health = -1.0; + lastHeartbeat = upSince = 0; + skew = INT_MIN; + } + + inline bool HeartbeatInfo::changed(const HeartbeatInfo& old) const { + return health != old.health || + hbstate != old.hbstate; + } + +} diff --git a/db/repl/rs_optime.h b/db/repl/rs_optime.h new file mode 100644 index 0000000..b3607fa --- /dev/null +++ b/db/repl/rs_optime.h @@ -0,0 +1,58 @@ +// @file rs_optime.h
+
+/*
+ * Copyright (C) 2010 10gen Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+
+#include "../../util/optime.h"
+
+namespace mongo {
+
+ const char rsoplog[] = "local.oplog.rs"; +
+ /*
+ class RSOpTime : public OpTime {
+ public:
+ bool initiated() const { return getSecs() != 0; }
+ };*/
+
+ /*struct RSOpTime {
+ unsigned long long ord;
+
+ RSOpTime() : ord(0) { }
+
+ bool initiated() const { return ord > 0; }
+
+ void initiate() {
+ assert( !initiated() );
+ ord = 1000000;
+ }
+
+ ReplTime inc() {
+ DEV assertInWriteLock();
+ return ++ord;
+ }
+
+ string toString() const { return str::stream() << ord; }
+
+ // query the oplog and set the highest value herein. acquires a db read lock. throws.
+ void load();
+ };
+
+ extern RSOpTime rsOpTime;*/
+
+}
diff --git a/db/repl/rs_rollback.cpp b/db/repl/rs_rollback.cpp new file mode 100644 index 0000000..1bb7217 --- /dev/null +++ b/db/repl/rs_rollback.cpp @@ -0,0 +1,481 @@ +/* @file rs_rollback.cpp +* +* Copyright (C) 2008 10gen Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "pch.h" +#include "../client.h" +#include "../../client/dbclient.h" +#include "rs.h" +#include "../repl.h" +#include "../query.h" + +/* Scenarios + + We went offline with ops not replicated out. + + F = node that failed and coming back. + P = node that took over, new primary + + #1: + F : a b c d e f g + P : a b c d q + + The design is "keep P". One could argue here that "keep F" has some merits, however, in most cases P + will have significantly more data. Also note that P may have a proper subset of F's stream if there were + no subsequent writes. + + For now the model is simply : get F back in sync with P. If P was really behind or something, we should have + just chosen not to fail over anyway. + + #2: + F : a b c d e f g -> a b c d + P : a b c d + + #3: + F : a b c d e f g -> a b c d q r s t u v w x z + P : a b c d.q r s t u v w x z + + Steps + find an event in common. 'd'. + undo our events beyond that by: + (1) taking copy from other server of those objects + (2) do not consider copy valid until we pass reach an optime after when we fetched the new version of object + -- i.e., reset minvalid. + (3) we could skip operations on objects that are previous in time to our capture of the object as an optimization. + +*/ + +namespace mongo { + + using namespace bson; + + struct DocID { + const char *ns; + be _id; + bool operator<(const DocID& d) const { + int c = strcmp(ns, d.ns); + if( c < 0 ) return true; + if( c > 0 ) return false; + return _id < d._id; + } + }; + + struct HowToFixUp { + /* note this is a set -- if there are many $inc's on a single document we need to rollback, we only + need to refetch it once. */ + set<DocID> toRefetch; + + /* collections to drop */ + set<string> toDrop; + + OpTime commonPoint; + DiskLoc commonPointOurDiskloc; + + int rbid; // remote server's current rollback sequence # + }; + + static void refetch(HowToFixUp& h, const BSONObj& ourObj) { + const char *op = ourObj.getStringField("op"); + if( *op == 'n' ) + return; + + unsigned long long totSize = 0; + totSize += ourObj.objsize(); + if( totSize > 512 * 1024 * 1024 ) + throw "rollback too large"; + + DocID d; + d.ns = ourObj.getStringField("ns"); + if( *d.ns == 0 ) { + log() << "replSet WARNING ignoring op on rollback no ns TODO : " << ourObj.toString() << rsLog; + return; + } + + bo o = ourObj.getObjectField(*op=='u' ? "o2" : "o"); + if( o.isEmpty() ) { + log() << "replSet warning ignoring op on rollback : " << ourObj.toString() << rsLog; + return; + } + + if( *op == 'c' ) { + be first = o.firstElement(); + NamespaceString s(d.ns); // foo.$cmd + + if( string("create") == first.fieldName() ) { + /* Create collection operation + { ts: ..., h: ..., op: "c", ns: "foo.$cmd", o: { create: "abc", ... } }
+ */ + string ns = s.db + '.' + o["create"].String(); // -> foo.abc + h.toDrop.insert(ns); + return; + } + else { + log() << "replSet WARNING can't roll back this command yet: " << o.toString() << rsLog; + } + } + + d._id = o["_id"]; + if( d._id.eoo() ) { + log() << "replSet WARNING ignoring op on rollback no _id TODO : " << d.ns << ' '<< ourObj.toString() << rsLog; + return; + } + + h.toRefetch.insert(d); + } + + int getRBID(DBClientConnection*); + + static void syncRollbackFindCommonPoint(DBClientConnection *them, HowToFixUp& h) { + static time_t last; + if( time(0)-last < 60 ) { + // this could put a lot of load on someone else, don't repeat too often + sleepsecs(10); + throw "findcommonpoint waiting a while before trying again"; + } + last = time(0); + + assert( dbMutex.atLeastReadLocked() ); + Client::Context c(rsoplog, dbpath, 0, false); + NamespaceDetails *nsd = nsdetails(rsoplog); + assert(nsd); + ReverseCappedCursor u(nsd); + if( !u.ok() ) + throw "our oplog empty or unreadable"; + + const Query q = Query().sort(reverseNaturalObj); + const bo fields = BSON( "ts" << 1 << "h" << 1 ); + + //auto_ptr<DBClientCursor> u = us->query(rsoplog, q, 0, 0, &fields, 0, 0); + + h.rbid = getRBID(them); + auto_ptr<DBClientCursor> t = them->query(rsoplog, q, 0, 0, &fields, 0, 0); + + if( t.get() == 0 || !t->more() ) throw "remote oplog empty or unreadable"; + + BSONObj ourObj = u.current(); + OpTime ourTime = ourObj["ts"]._opTime(); + BSONObj theirObj = t->nextSafe(); + OpTime theirTime = theirObj["ts"]._opTime(); + + if( 1 ) { + long long diff = (long long) ourTime.getSecs() - ((long long) theirTime.getSecs()); + /* diff could be positive, negative, or zero */ + log() << "replSet info syncRollback diff in end of log times : " << diff << " seconds" << rsLog; + if( diff > 3600 ) { + log() << "replSet syncRollback too long a time period for a rollback." << rsLog; + throw "error not willing to roll back more than one hour of data"; + } + } + + unsigned long long scanned = 0; + while( 1 ) { + scanned++; + /* todo add code to assure no excessive scanning for too long */ + if( ourTime == theirTime ) { + if( ourObj["h"].Long() == theirObj["h"].Long() ) { + // found the point back in time where we match. + // todo : check a few more just to be careful about hash collisions. + log() << "replSet rollback found matching events at " << ourTime.toStringPretty() << rsLog; + log() << "replSet rollback findcommonpoint scanned : " << scanned << rsLog; + h.commonPoint = ourTime; + h.commonPointOurDiskloc = u.currLoc(); + return; + } + + refetch(h, ourObj); + + theirObj = t->nextSafe(); + theirTime = theirObj["ts"]._opTime(); + + u.advance(); + if( !u.ok() ) throw "reached beginning of local oplog"; + ourObj = u.current(); + ourTime = ourObj["ts"]._opTime(); + } + else if( theirTime > ourTime ) { + /* todo: we could hit beginning of log here. exception thrown is ok but not descriptive, so fix up */ + theirObj = t->nextSafe(); + theirTime = theirObj["ts"]._opTime(); + } + else { + // theirTime < ourTime + refetch(h, ourObj); + u.advance(); + if( !u.ok() ) throw "reached beginning of local oplog"; + ourObj = u.current(); + ourTime = ourObj["ts"]._opTime(); + } + } + } + + struct X { + const bson::bo *op; + bson::bo goodVersionOfObject; + }; + + void ReplSetImpl::syncFixUp(HowToFixUp& h, OplogReader& r) { + DBClientConnection *them = r.conn(); + + // fetch all first so we needn't handle interruption in a fancy way + + unsigned long long totSize = 0; + + list< pair<DocID,bo> > goodVersions; + + bo newMinValid; + + DocID d; + unsigned long long n = 0; + try { + for( set<DocID>::iterator i = h.toRefetch.begin(); i != h.toRefetch.end(); i++ ) { + d = *i; + + assert( !d._id.eoo() ); + + { + /* TODO : slow. lots of round trips. */ + n++; + bo good= them->findOne(d.ns, d._id.wrap()).getOwned(); + totSize += good.objsize(); + uassert( 13410, "replSet too much data to roll back", totSize < 300 * 1024 * 1024 ); + + // note good might be eoo, indicating we should delete it + goodVersions.push_back(pair<DocID,bo>(d,good)); + } + } + newMinValid = r.getLastOp(rsoplog); + if( newMinValid.isEmpty() ) { + sethbmsg("syncRollback error newMinValid empty?"); + return; + } + } + catch(DBException& e) { + sethbmsg(str::stream() << "syncRollback re-get objects: " << e.toString(),0); + log() << "syncRollback couldn't re-get ns:" << d.ns << " _id:" << d._id << ' ' << n << '/' << h.toRefetch.size() << rsLog; + throw e; + } + + sethbmsg("syncRollback 3.5"); + if( h.rbid != getRBID(r.conn()) ) { + // our source rolled back itself. so the data we received isn't necessarily consistent. + sethbmsg("syncRollback rbid on source changed during rollback, cancelling this attempt"); + return; + } + + // update them + sethbmsg(str::stream() << "syncRollback 4 n:" << goodVersions.size()); + + bool warn = false; + + assert( !h.commonPointOurDiskloc.isNull() ); + + MemoryMappedFile::flushAll(true); + + dbMutex.assertWriteLocked(); + + /* we have items we are writing that aren't from a point-in-time. thus best not to come online + until we get to that point in freshness. */ + try { + log() << "replSet set minvalid=" << newMinValid["ts"]._opTime().toString() << rsLog; + } + catch(...){} + Helpers::putSingleton("local.replset.minvalid", newMinValid); + + /** first drop collections to drop - that might make things faster below actually if there were subsequent inserts */ + for( set<string>::iterator i = h.toDrop.begin(); i != h.toDrop.end(); i++ ) { + Client::Context c(*i, dbpath, 0, /*doauth*/false); + try { + bob res; + string errmsg; + log(1) << "replSet rollback drop: " << *i << rsLog; + dropCollection(*i, errmsg, res); + } + catch(...) { + log() << "replset rollback error dropping collection " << *i << rsLog; + } + } + + Client::Context c(rsoplog, dbpath, 0, /*doauth*/false); + NamespaceDetails *oplogDetails = nsdetails(rsoplog); + uassert(13423, str::stream() << "replSet error in rollback can't find " << rsoplog, oplogDetails); + + map<string,shared_ptr<RemoveSaver> > removeSavers; + + unsigned deletes = 0, updates = 0; + for( list<pair<DocID,bo> >::iterator i = goodVersions.begin(); i != goodVersions.end(); i++ ) { + const DocID& d = i->first; + bo pattern = d._id.wrap(); // { _id : ... } + try { + assert( d.ns && *d.ns ); + + shared_ptr<RemoveSaver>& rs = removeSavers[d.ns]; + if ( ! rs ) + rs.reset( new RemoveSaver( "rollback" , "" , d.ns ) ); + + // todo: lots of overhead in context, this can be faster + Client::Context c(d.ns, dbpath, 0, /*doauth*/false); + if( i->second.isEmpty() ) { + // wasn't on the primary; delete. + /* TODO1.6 : can't delete from a capped collection. need to handle that here. */ + deletes++; + + NamespaceDetails *nsd = nsdetails(d.ns); + if( nsd ) { + if( nsd->capped ) { + /* can't delete from a capped collection - so we truncate instead. if this item must go, + so must all successors!!! */ + try { + /** todo: IIRC cappedTrunateAfter does not handle completely empty. todo. */ + // this will crazy slow if no _id index. + long long start = Listener::getElapsedTimeMillis(); + DiskLoc loc = Helpers::findOne(d.ns, pattern, false); + if( Listener::getElapsedTimeMillis() - start > 200 ) + log() << "replSet warning roll back slow no _id index for " << d.ns << rsLog; + //would be faster but requires index: DiskLoc loc = Helpers::findById(nsd, pattern); + if( !loc.isNull() ) { + try { + nsd->cappedTruncateAfter(d.ns, loc, true); + } + catch(DBException& e) { + if( e.getCode() == 13415 ) { + // hack: need to just make cappedTruncate do this... + nsd->emptyCappedCollection(d.ns); + } else { + throw; + } + } + } + } + catch(DBException& e) { + log() << "replSet error rolling back capped collection rec " << d.ns << ' ' << e.toString() << rsLog; + } + } + else { + try { + deletes++; + deleteObjects(d.ns, pattern, /*justone*/true, /*logop*/false, /*god*/true, rs.get() ); + } + catch(...) { + log() << "replSet error rollback delete failed ns:" << d.ns << rsLog; + } + } + // did we just empty the collection? if so let's check if it even exists on the source. + if( nsd->nrecords == 0 ) { + try { + string sys = cc().database()->name + ".system.namespaces"; + bo o = them->findOne(sys, QUERY("name"<<d.ns)); + if( o.isEmpty() ) { + // we should drop + try { + bob res; + string errmsg; + dropCollection(d.ns, errmsg, res); + } + catch(...) { + log() << "replset error rolling back collection " << d.ns << rsLog; + } + } + } + catch(DBException& ) { + /* this isn't *that* big a deal, but is bad. */ + log() << "replSet warning rollback error querying for existence of " << d.ns << " at the primary, ignoring" << rsLog; + } + } + } + } + else { + // todo faster... + OpDebug debug; + updates++; + _updateObjects(/*god*/true, d.ns, i->second, pattern, /*upsert=*/true, /*multi=*/false , /*logtheop=*/false , debug, rs.get() ); + } + } + catch(DBException& e) { + log() << "replSet exception in rollback ns:" << d.ns << ' ' << pattern.toString() << ' ' << e.toString() << " ndeletes:" << deletes << rsLog; + warn = true; + } + } + + removeSavers.clear(); // this effectively closes all of them + + sethbmsg(str::stream() << "syncRollback 5 d:" << deletes << " u:" << updates); + MemoryMappedFile::flushAll(true); + sethbmsg("syncRollback 6"); + + // clean up oplog + log(2) << "replSet rollback truncate oplog after " << h.commonPoint.toStringPretty() << rsLog; + // todo: fatal error if this throws? + oplogDetails->cappedTruncateAfter(rsoplog, h.commonPointOurDiskloc, false); + + /* reset cached lastoptimewritten and h value */ + loadLastOpTimeWritten(); + + sethbmsg("syncRollback 7"); + MemoryMappedFile::flushAll(true); + + // done + if( warn ) + sethbmsg("issues during syncRollback, see log"); + else + sethbmsg("syncRollback done"); + } + + void ReplSetImpl::syncRollback(OplogReader&r) { + assert( !lockedByMe() ); + assert( !dbMutex.atLeastReadLocked() ); + + sethbmsg("syncRollback 0"); + + writelocktry lk(rsoplog, 20000); + if( !lk.got() ) { + sethbmsg("syncRollback couldn't get write lock in a reasonable time"); + sleepsecs(2); + return; + } + + HowToFixUp how; + sethbmsg("syncRollback 1"); + { + r.resetCursor(); + /*DBClientConnection us(false, 0, 0); + string errmsg; + if( !us.connect(HostAndPort::me().toString(),errmsg) ) { + sethbmsg("syncRollback connect to self failure" + errmsg); + return; + }*/ + + sethbmsg("syncRollback 2 FindCommonPoint"); + try { + syncRollbackFindCommonPoint(r.conn(), how); + } + catch( const char *p ) { + sethbmsg(string("syncRollback 2 error ") + p); + sleepsecs(10); + return; + } + catch( DBException& e ) { + sethbmsg(string("syncRollback 2 exception ") + e.toString() + "; sleeping 1 min"); + sleepsecs(60); + throw; + } + } + + sethbmsg("replSet syncRollback 3 fixup"); + + syncFixUp(how, r); + } + +} diff --git a/db/repl/rs_sync.cpp b/db/repl/rs_sync.cpp new file mode 100644 index 0000000..bece96c --- /dev/null +++ b/db/repl/rs_sync.cpp @@ -0,0 +1,328 @@ +/** +* Copyright (C) 2008 10gen Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "pch.h" +#include "../client.h" +#include "../../client/dbclient.h" +#include "rs.h" +#include "../repl.h" + +namespace mongo { + + using namespace bson; + + void startSyncThread() { + Client::initThread("rs_sync"); + theReplSet->syncThread(); + cc().shutdown(); + } + + void ReplSetImpl::syncApply(const BSONObj &o) { + //const char *op = o.getStringField("op"); + + char db[MaxDatabaseLen]; + const char *ns = o.getStringField("ns"); + nsToDatabase(ns, db); + + if ( *ns == '.' || *ns == 0 ) { + if( *o.getStringField("op") == 'n' ) + return; + log() << "replSet skipping bad op in oplog: " << o.toString() << endl; + return; + } + + Client::Context ctx(ns); + ctx.getClient()->curop()->reset(); + + /* todo : if this asserts, do we want to ignore or not? */ + applyOperation_inlock(o); + } + + bool ReplSetImpl::initialSyncOplogApplication( + string hn, + const Member *primary, + OpTime applyGTE, + OpTime minValid) + { + if( primary == 0 ) return false; + + OpTime ts; + try { + OplogReader r; + if( !r.connect(hn) ) { + log(2) << "replSet can't connect to " << hn << " to read operations" << rsLog; + return false; + } + + r.query(rsoplog, bo()); + assert( r.haveCursor() ); + + /* we lock outside the loop to avoid the overhead of locking on every operation. server isn't usable yet anyway! */ + writelock lk(""); + + { + if( !r.more() ) { + sethbmsg("replSet initial sync error reading remote oplog"); + return false; + } + bo op = r.next(); + OpTime t = op["ts"]._opTime(); + r.putBack(op); + assert( !t.isNull() ); + if( t > applyGTE ) { + sethbmsg(str::stream() << "error " << hn << " oplog wrapped during initial sync"); + return false; + } + } + + // todo : use exhaust + unsigned long long n = 0; + while( 1 ) { + if( !r.more() ) + break; + BSONObj o = r.nextSafe(); /* note we might get "not master" at some point */ + { + //writelock lk(""); + + ts = o["ts"]._opTime(); + + /* if we have become primary, we dont' want to apply things from elsewhere + anymore. assumePrimary is in the db lock so we are safe as long as + we check after we locked above. */ + const Member *p1 = box.getPrimary(); + if( p1 != primary ) { + log() << "replSet primary was:" << primary->fullName() << " now:" << + (p1 != 0 ? p1->fullName() : "none") << rsLog; + throw DBException("primary changed",0); + } + + if( ts >= applyGTE ) { + // optimes before we started copying need not be applied. + syncApply(o); + } + _logOpObjRS(o); /* with repl sets we write the ops to our oplog too */ + } + if( ++n % 100000 == 0 ) { + // simple progress metering + log() << "replSet initialSyncOplogApplication " << n << rsLog; + } + } + } + catch(DBException& e) { + if( ts <= minValid ) { + // didn't make it far enough + log() << "replSet initial sync failing, error applying oplog " << e.toString() << rsLog; + return false; + } + } + return true; + } + + void ReplSetImpl::syncTail() { + // todo : locking vis a vis the mgr... + + const Member *primary = box.getPrimary(); + if( primary == 0 ) return; + string hn = primary->h().toString(); + OplogReader r; + if( !r.connect(primary->h().toString()) ) { + log(2) << "replSet can't connect to " << hn << " to read operations" << rsLog; + return; + } + + /* first make sure we are not hopelessly out of sync by being very stale. */ + { + BSONObj remoteOldestOp = r.findOne(rsoplog, Query()); + OpTime ts = remoteOldestOp["ts"]._opTime(); + DEV log() << "remoteOldestOp: " << ts.toStringPretty() << endl; + else log(3) << "remoteOldestOp: " << ts.toStringPretty() << endl; + if( lastOpTimeWritten < ts ) { + log() << "replSet error too stale to catch up, at least from primary " << hn << rsLog; + log() << "replSet our last optime : " << lastOpTimeWritten.toStringPretty() << rsLog; + log() << "replSet oldest at " << hn << " : " << ts.toStringPretty() << rsLog; + log() << "replSet See http://www.mongodb.org/display/DOCS/Resyncing+a+Very+Stale+Replica+Set+Member" << rsLog; + sethbmsg("error too stale to catch up"); + sleepsecs(120); + return; + } + } + + r.tailingQueryGTE(rsoplog, lastOpTimeWritten); + assert( r.haveCursor() ); + assert( r.awaitCapable() ); + + { + if( !r.more() ) { + /* maybe we are ahead and need to roll back? */ + try { + bo theirLastOp = r.getLastOp(rsoplog); + if( theirLastOp.isEmpty() ) { + log() << "replSet error empty query result from " << hn << " oplog" << rsLog; + sleepsecs(2); + return; + } + OpTime theirTS = theirLastOp["ts"]._opTime(); + if( theirTS < lastOpTimeWritten ) { + log() << "replSet we are ahead of the primary, will try to roll back" << rsLog; + syncRollback(r); + return; + } + /* we're not ahead? maybe our new query got fresher data. best to come back and try again */ + log() << "replSet syncTail condition 1" << rsLog; + sleepsecs(1); + } + catch(DBException& e) { + log() << "replSet error querying " << hn << ' ' << e.toString() << rsLog; + sleepsecs(2); + } + return; + /* + log() << "replSet syncTail error querying oplog >= " << lastOpTimeWritten.toString() << " from " << hn << rsLog; + try { + log() << "replSet " << hn << " last op: " << r.getLastOp(rsoplog).toString() << rsLog; + } + catch(...) { } + sleepsecs(1); + return;*/ + } + + BSONObj o = r.nextSafe(); + OpTime ts = o["ts"]._opTime(); + long long h = o["h"].numberLong(); + if( ts != lastOpTimeWritten || h != lastH ) { + log(1) << "TEMP our last op time written: " << lastOpTimeWritten.toStringPretty() << endl; + log(1) << "TEMP primary's GTE: " << ts.toStringPretty() << endl; + /* + }*/ + + syncRollback(r); + return; + } + } + + while( 1 ) { + while( 1 ) { + if( !r.moreInCurrentBatch() ) { + /* we need to occasionally check some things. between + batches is probably a good time. */ + + /* perhaps we should check this earlier? but not before the rollback checks. */ + if( state().recovering() ) { + /* can we go to RS_SECONDARY state? we can if not too old and if minvalid achieved */ + bool golive = false; + OpTime minvalid; + { + readlock lk("local.replset.minvalid"); + BSONObj mv; + if( Helpers::getSingleton("local.replset.minvalid", mv) ) { + minvalid = mv["ts"]._opTime(); + if( minvalid <= lastOpTimeWritten ) { + golive=true; + } + } + else + golive = true; /* must have been the original member */ + } + if( golive ) { + sethbmsg(""); + log() << "replSet SECONDARY" << rsLog; + changeState(MemberState::RS_SECONDARY); + } + else { + sethbmsg(str::stream() << "still syncing, not yet to minValid optime " << minvalid.toString()); + } + + /* todo: too stale capability */ + } + + if( box.getPrimary() != primary ) + return; + } + if( !r.more() ) + break; + { + BSONObj o = r.nextSafe(); /* note we might get "not master" at some point */ + { + writelock lk(""); + + /* if we have become primary, we dont' want to apply things from elsewhere + anymore. assumePrimary is in the db lock so we are safe as long as + we check after we locked above. */ + if( box.getPrimary() != primary ) { + if( box.getState().primary() ) + log(0) << "replSet stopping syncTail we are now primary" << rsLog; + return; + } + + syncApply(o); + _logOpObjRS(o); /* with repl sets we write the ops to our oplog too: */ + } + } + } + r.tailCheck(); + if( !r.haveCursor() ) { + log() << "replSet TEMP end syncTail pass with " << hn << rsLog; + // TODO : reuse our cnonection to the primary. + return; + } + if( box.getPrimary() != primary ) + return; + // looping back is ok because this is a tailable cursor + } + } + + void ReplSetImpl::_syncThread() { + StateBox::SP sp = box.get(); + if( sp.state.primary() ) { + sleepsecs(1); + return; + } + + /* later, we can sync from up secondaries if we want. tbd. */ + if( sp.primary == 0 ) + return; + + /* do we have anything at all? */ + if( lastOpTimeWritten.isNull() ) { + syncDoInitialSync(); + return; // _syncThread will be recalled, starts from top again in case sync failed. + } + + /* we have some data. continue tailing. */ + syncTail(); + } + + void ReplSetImpl::syncThread() { + if( myConfig().arbiterOnly ) + return; + while( 1 ) { + try { + _syncThread(); + } + catch(DBException& e) { + sethbmsg("syncThread: " + e.toString()); + sleepsecs(10); + } + catch(...) { + sethbmsg("unexpected exception in syncThread()"); + // TODO : SET NOT SECONDARY here. + sleepsecs(60); + } + sleepsecs(1); + } + } + +} diff --git a/db/repl/test.html b/db/repl/test.html new file mode 100644 index 0000000..295ad2e --- /dev/null +++ b/db/repl/test.html @@ -0,0 +1,11 @@ +<HTML>
+<BODY>
+<!-- see also jstests/rs/ -->
+<iframe src="http://127.0.0.1:28000/_replSet" width="100%" height="50%" frameborder=1>
+</iframe>
+
+<iframe src="http://127.0.0.1:28001/_replSet" width="100%" height="50%" frameborder=1>
+</iframe>
+
+</BODY>
+</HTML>
diff --git a/db/repl/testing.js b/db/repl/testing.js new file mode 100644 index 0000000..d741cf3 --- /dev/null +++ b/db/repl/testing.js @@ -0,0 +1,42 @@ +// helpers for testing repl sets
+// run
+// mongo --shell <host:port> testing.js
+
+cfg = {
+ _id: 'asdf',
+ members: [
+ { _id : 0, host : "dm_hp" },
+ { _id : 2, host : "dm_hp:27002" }
+ ]
+};
+c2 = {
+ _id: 'asdf',
+ members: [
+ { _id: 0, host: "dmthink" },
+ { _id: 2, host: "dmthink:27002" }
+ ]
+};
+
+db = db.getSisterDB("admin");
+local = db.getSisterDB("local");
+
+print("\n\ndb = admin db on localhost:27017");
+print("b = admin on localhost:27002");
+print("rc(x) = db.runCommand(x)");
+print("cfg = samp replset config");
+print("i() = replSetInitiate(cfg)");
+print("ism() = rc('ismaster')");
+print("\n\n");
+
+function rc(c) { return db.runCommand(c); }
+function i() { return rc({ replSetInitiate: cfg }); }
+function ism() { return rc("isMaster"); }
+
+b = 0;
+try {
+ b = new Mongo("localhost:27002").getDB("admin");
+}
+catch (e) {
+ print("\nCouldn't connect to b mongod instance\n");
+}
+
|