summaryrefslogtreecommitdiff
path: root/db/repl
diff options
context:
space:
mode:
Diffstat (limited to 'db/repl')
-rw-r--r--db/repl/connections.h49
-rw-r--r--db/repl/consensus.cpp124
-rw-r--r--db/repl/health.cpp161
-rw-r--r--db/repl/health.h8
-rw-r--r--db/repl/heartbeat.cpp71
-rw-r--r--db/repl/manager.cpp70
-rw-r--r--db/repl/multicmd.h29
-rw-r--r--db/repl/replset_commands.cpp106
-rw-r--r--db/repl/rs.cpp282
-rw-r--r--db/repl/rs.h115
-rw-r--r--db/repl/rs_config.cpp174
-rw-r--r--db/repl/rs_config.h20
-rw-r--r--[-rwxr-xr-x]db/repl/rs_exception.h18
-rw-r--r--db/repl/rs_initialsync.cpp205
-rw-r--r--db/repl/rs_initiate.cpp66
-rw-r--r--db/repl/rs_member.h35
-rw-r--r--db/repl/rs_optime.h114
-rw-r--r--db/repl/rs_rollback.cpp661
-rw-r--r--db/repl/rs_sync.cpp368
19 files changed, 1585 insertions, 1091 deletions
diff --git a/db/repl/connections.h b/db/repl/connections.h
index cdf2fad..7e7bfe5 100644
--- a/db/repl/connections.h
+++ b/db/repl/connections.h
@@ -1,4 +1,4 @@
-// @file
+// @file
/*
* Copyright (C) 2010 10gen Inc.
@@ -20,11 +20,12 @@
#include <map>
#include "../../client/dbclient.h"
+#include "../security_key.h"
-namespace mongo {
+namespace mongo {
- /** here we keep a single connection (with reconnect) for a set of hosts,
- one each, and allow one user at a time per host. if in use already for that
+ /** here we keep a single connection (with reconnect) for a set of hosts,
+ one each, and allow one user at a time per host. if in use already for that
host, we block. so this is an easy way to keep a 1-deep pool of connections
that many threads can share.
@@ -39,35 +40,37 @@ namespace mongo {
throws exception on connect error (but fine to try again later with a new
scopedconn object for same host).
*/
- class ScopedConn {
+ class ScopedConn {
public:
/** throws assertions if connect failure etc. */
ScopedConn(string hostport);
~ScopedConn();
/* If we were to run a query and not exhaust the cursor, future use of the connection would be problematic.
- So here what we do is wrapper known safe methods and not allow cursor-style queries at all. This makes
+ So here what we do is wrapper known safe methods and not allow cursor-style queries at all. This makes
ScopedConn limited in functionality but very safe. More non-cursor wrappers can be added here if needed.
*/
bool runCommand(const string &dbname, const BSONObj& cmd, BSONObj &info, int options=0) {
return conn()->runCommand(dbname, cmd, info, options);
}
- unsigned long long count(const string &ns) {
- return conn()->count(ns);
+ unsigned long long count(const string &ns) {
+ return conn()->count(ns);
}
- BSONObj findOne(const string &ns, const Query& q, const BSONObj *fieldsToReturn = 0, int queryOptions = 0) {
+ BSONObj findOne(const string &ns, const Query& q, const BSONObj *fieldsToReturn = 0, int queryOptions = 0) {
return conn()->findOne(ns, q, fieldsToReturn, queryOptions);
}
+ void setTimeout(double to) {
+ conn()->setSoTimeout(to);
+ }
private:
auto_ptr<scoped_lock> connLock;
- static mutex mapMutex;
- struct X {
- mutex z;
+ static mongo::mutex mapMutex;
+ struct X {
+ mongo::mutex z;
DBClientConnection cc;
- X() : z("X"), cc(/*reconnect*/ true, 0,
- /*timeout*/ theReplSet ? theReplSet->config().ho.heartbeatTimeoutMillis/1000.0 : 10.0) {
+ X() : z("X"), cc(/*reconnect*/ true, 0, /*timeout*/ 10.0) {
cc._logLevel = 2;
}
} *x;
@@ -87,22 +90,30 @@ namespace mongo {
connLock.reset( new scoped_lock(x->z) );
}
}
- if( !first ) {
+ if( !first ) {
connLock.reset( new scoped_lock(x->z) );
return;
}
// we already locked above...
string err;
- x->cc.connect(hostport, err);
+ if (!x->cc.connect(hostport, err)) {
+ log() << "couldn't connect to " << hostport << ": " << err << rsLog;
+ return;
+ }
+
+ if (!noauth && !x->cc.auth("local", internalSecurity.user, internalSecurity.pwd, err, false)) {
+ log() << "could not authenticate against " << conn()->toString() << ", " << err << rsLog;
+ return;
+ }
}
- inline ScopedConn::~ScopedConn() {
+ inline ScopedConn::~ScopedConn() {
// conLock releases...
}
- /*inline DBClientConnection* ScopedConn::operator->() {
- return &x->cc;
+ /*inline DBClientConnection* ScopedConn::operator->() {
+ return &x->cc;
}*/
}
diff --git a/db/repl/consensus.cpp b/db/repl/consensus.cpp
index 1519c26..f764abe 100644
--- a/db/repl/consensus.cpp
+++ b/db/repl/consensus.cpp
@@ -19,9 +19,9 @@
#include "rs.h"
#include "multicmd.h"
-namespace mongo {
+namespace mongo {
- class CmdReplSetFresh : public ReplSetCommand {
+ class CmdReplSetFresh : public ReplSetCommand {
public:
CmdReplSetFresh() : ReplSetCommand("replSetFresh") { }
private:
@@ -29,23 +29,23 @@ namespace mongo {
if( !check(errmsg, result) )
return false;
- if( cmdObj["set"].String() != theReplSet->name() ) {
+ if( cmdObj["set"].String() != theReplSet->name() ) {
errmsg = "wrong repl set name";
return false;
}
string who = cmdObj["who"].String();
int cfgver = cmdObj["cfgver"].Int();
- OpTime opTime(cmdObj["opTime"].Date());
+ OpTime opTime(cmdObj["opTime"].Date());
bool weAreFresher = false;
- if( theReplSet->config().version > cfgver ) {
+ if( theReplSet->config().version > cfgver ) {
log() << "replSet member " << who << " is not yet aware its cfg version " << cfgver << " is stale" << rsLog;
- result.append("info", "config version stale");
+ result.append("info", "config version stale");
+ weAreFresher = true;
+ }
+ else if( opTime < theReplSet->lastOpTimeWritten ) {
weAreFresher = true;
}
- else if( opTime < theReplSet->lastOpTimeWritten ) {
- weAreFresher = true;
- }
result.appendDate("opTime", theReplSet->lastOpTimeWritten.asDate());
result.append("fresher", weAreFresher);
return true;
@@ -66,19 +66,19 @@ namespace mongo {
}
} cmdReplSetElect;
- int Consensus::totalVotes() const {
+ int Consensus::totalVotes() const {
static int complain = 0;
int vTot = rs._self->config().votes;
- for( Member *m = rs.head(); m; m=m->next() )
+ for( Member *m = rs.head(); m; m=m->next() )
vTot += m->config().votes;
if( vTot % 2 == 0 && vTot && complain++ == 0 )
- log() << "replSet warning total number of votes is even - considering giving one member an extra vote" << rsLog;
+ log() << "replSet " /*buildbot! warning */ "total number of votes is even - add arbiter or give one member an extra vote" << rsLog;
return vTot;
}
bool Consensus::aMajoritySeemsToBeUp() const {
int vUp = rs._self->config().votes;
- for( Member *m = rs.head(); m; m=m->next() )
+ for( Member *m = rs.head(); m; m=m->next() )
vUp += m->hbinfo().up() ? m->config().votes : 0;
return vUp * 2 > totalVotes();
}
@@ -98,13 +98,13 @@ namespace mongo {
const time_t LeaseTime = 30;
- unsigned Consensus::yea(unsigned memberId) /* throws VoteException */ {
+ unsigned Consensus::yea(unsigned memberId) { /* throws VoteException */
Atomic<LastYea>::tran t(ly);
LastYea &ly = t.ref();
time_t now = time(0);
if( ly.when + LeaseTime >= now && ly.who != memberId ) {
log(1) << "replSet not voting yea for " << memberId <<
- " voted for " << ly.who << ' ' << now-ly.when << " secs ago" << rsLog;
+ " voted for " << ly.who << ' ' << now-ly.when << " secs ago" << rsLog;
throw VoteException();
}
ly.when = now;
@@ -112,7 +112,7 @@ namespace mongo {
return rs._self->config().votes;
}
- /* we vote for ourself at start of election. once it fails, we can cancel the lease we had in
+ /* we vote for ourself at start of election. once it fails, we can cancel the lease we had in
place instead of leaving it for a long time.
*/
void Consensus::electionFailed(unsigned meid) {
@@ -124,7 +124,7 @@ namespace mongo {
}
/* todo: threading **************** !!!!!!!!!!!!!!!! */
- void Consensus::electCmdReceived(BSONObj cmd, BSONObjBuilder* _b) {
+ void Consensus::electCmdReceived(BSONObj cmd, BSONObjBuilder* _b) {
BSONObjBuilder& b = *_b;
DEV log() << "replSet received elect msg " << cmd.toString() << rsLog;
else log(2) << "replSet received elect msg " << cmd.toString() << rsLog;
@@ -138,14 +138,14 @@ namespace mongo {
const Member* hopeful = rs.findById(whoid);
int vote = 0;
- if( set != rs.name() ) {
+ if( set != rs.name() ) {
log() << "replSet error received an elect request for '" << set << "' but our set name is '" << rs.name() << "'" << rsLog;
}
- else if( myver < cfgver ) {
+ else if( myver < cfgver ) {
// we are stale. don't vote
}
- else if( myver > cfgver ) {
+ else if( myver > cfgver ) {
// they are stale!
log() << "replSet info got stale version # during election" << rsLog;
vote = -10000;
@@ -154,10 +154,10 @@ namespace mongo {
log() << "couldn't find member with id " << whoid << rsLog;
vote = -10000;
}
- else if( primary && primary->hbinfo().opTime > hopeful->hbinfo().opTime ) {
+ else if( primary && primary->hbinfo().opTime >= hopeful->hbinfo().opTime ) {
// other members might be aware of more up-to-date nodes
log() << hopeful->fullName() << " is trying to elect itself but " <<
- primary->fullName() << " is already primary and more up-to-date" << rsLog;
+ primary->fullName() << " is already primary and more up-to-date" << rsLog;
vote = -10000;
}
else {
@@ -166,7 +166,7 @@ namespace mongo {
rs.relinquish();
log() << "replSet info voting yea for " << whoid << rsLog;
}
- catch(VoteException&) {
+ catch(VoteException&) {
log() << "replSet voting no already voted for another" << rsLog;
}
}
@@ -182,10 +182,10 @@ namespace mongo {
L.push_back( Target(m->fullName()) );
}
- /* config version is returned as it is ok to use this unlocked. BUT, if unlocked, you would need
+ /* config version is returned as it is ok to use this unlocked. BUT, if unlocked, you would need
to check later that the config didn't change. */
void ReplSetImpl::getTargets(list<Target>& L, int& configVersion) {
- if( lockedByMe() ) {
+ if( lockedByMe() ) {
_getTargets(L, configVersion);
return;
}
@@ -200,15 +200,21 @@ namespace mongo {
bool Consensus::weAreFreshest(bool& allUp, int& nTies) {
const OpTime ord = theReplSet->lastOpTimeWritten;
nTies = 0;
- assert( !ord.isNull() );
+ assert( !ord.isNull() );
BSONObj cmd = BSON(
- "replSetFresh" << 1 <<
- "set" << rs.name() <<
- "opTime" << Date_t(ord.asDate()) <<
- "who" << rs._self->fullName() <<
- "cfgver" << rs._cfg->version );
+ "replSetFresh" << 1 <<
+ "set" << rs.name() <<
+ "opTime" << Date_t(ord.asDate()) <<
+ "who" << rs._self->fullName() <<
+ "cfgver" << rs._cfg->version );
list<Target> L;
int ver;
+ /* the following queries arbiters, even though they are never fresh. wonder if that makes sense.
+ it doesn't, but it could, if they "know" what freshness it one day. so consider removing
+ arbiters from getTargets() here. although getTargets is used elsewhere for elections; there
+ arbiters are certainly targets - so a "includeArbs" bool would be necessary if we want to make
+ not fetching them herein happen.
+ */
rs.getTargets(L, ver);
multiCommand(cmd, L);
int nok = 0;
@@ -228,25 +234,25 @@ namespace mongo {
allUp = false;
}
}
- DEV log() << "replSet dev we are freshest of up nodes, nok:" << nok << " nTies:" << nTies << rsLog;
+ log(1) << "replSet dev we are freshest of up nodes, nok:" << nok << " nTies:" << nTies << rsLog;
assert( ord <= theReplSet->lastOpTimeWritten ); // <= as this may change while we are working...
return true;
}
extern time_t started;
- void Consensus::multiCommand(BSONObj cmd, list<Target>& L) {
+ void Consensus::multiCommand(BSONObj cmd, list<Target>& L) {
assert( !rs.lockedByMe() );
mongo::multiCommand(cmd, L);
}
void Consensus::_electSelf() {
- if( time(0) < steppedDown )
+ if( time(0) < steppedDown )
return;
{
const OpTime ord = theReplSet->lastOpTimeWritten;
- if( ord == 0 ) {
+ if( ord == 0 ) {
log() << "replSet info not trying to elect self, do not yet have a complete set of data from any point in time" << rsLog;
return;
}
@@ -254,16 +260,16 @@ namespace mongo {
bool allUp;
int nTies;
- if( !weAreFreshest(allUp, nTies) ) {
+ if( !weAreFreshest(allUp, nTies) ) {
log() << "replSet info not electing self, we are not freshest" << rsLog;
return;
}
rs.sethbmsg("",9);
- if( !allUp && time(0) - started < 60 * 5 ) {
- /* the idea here is that if a bunch of nodes bounce all at once, we don't want to drop data
- if we don't have to -- we'd rather be offline and wait a little longer instead
+ if( !allUp && time(0) - started < 60 * 5 ) {
+ /* the idea here is that if a bunch of nodes bounce all at once, we don't want to drop data
+ if we don't have to -- we'd rather be offline and wait a little longer instead
todo: make this configurable.
*/
rs.sethbmsg("not electing self, not all members up and we have been up less than 5 minutes");
@@ -276,9 +282,10 @@ namespace mongo {
/* tie? we then randomly sleep to try to not collide on our voting. */
/* todo: smarter. */
if( me.id() == 0 || sleptLast ) {
- // would be fine for one node not to sleep
+ // would be fine for one node not to sleep
// todo: biggest / highest priority nodes should be the ones that get to not sleep
- } else {
+ }
+ else {
assert( !rs.lockedByMe() ); // bad to go to sleep locked
unsigned ms = ((unsigned) rand()) % 1000 + 50;
DEV log() << "replSet tie " << nTies << " sleeping a little " << ms << "ms" << rsLog;
@@ -297,13 +304,13 @@ namespace mongo {
log() << "replSet info electSelf " << meid << rsLog;
BSONObj electCmd = BSON(
- "replSetElect" << 1 <<
- "set" << rs.name() <<
- "who" << me.fullName() <<
- "whoid" << me.hbinfo().id() <<
- "cfgver" << rs._cfg->version <<
- "round" << OID::gen() /* this is just for diagnostics */
- );
+ "replSetElect" << 1 <<
+ "set" << rs.name() <<
+ "who" << me.fullName() <<
+ "whoid" << me.hbinfo().id() <<
+ "cfgver" << rs._cfg->version <<
+ "round" << OID::gen() /* this is just for diagnostics */
+ );
int configVersion;
list<Target> L;
@@ -326,7 +333,7 @@ namespace mongo {
// defensive; should never happen as we have timeouts on connection and operation for our conn
log() << "replSet too much time passed during our election, ignoring result" << rsLog;
}
- else if( configVersion != rs.config().version ) {
+ else if( configVersion != rs.config().version ) {
log() << "replSet config version changed during our election, ignoring result" << rsLog;
}
else {
@@ -334,9 +341,10 @@ namespace mongo {
log(1) << "replSet election succeeded, assuming primary role" << rsLog;
success = true;
rs.assumePrimary();
- }
+ }
}
- } catch( std::exception& ) {
+ }
+ catch( std::exception& ) {
if( !success ) electionFailed(meid);
throw;
}
@@ -347,19 +355,19 @@ namespace mongo {
assert( !rs.lockedByMe() );
assert( !rs.myConfig().arbiterOnly );
assert( rs.myConfig().slaveDelay == 0 );
- try {
- _electSelf();
- }
- catch(RetryAfterSleepException&) {
+ try {
+ _electSelf();
+ }
+ catch(RetryAfterSleepException&) {
throw;
}
- catch(VoteException& ) {
+ catch(VoteException& ) {
log() << "replSet not trying to elect self as responded yea to someone else recently" << rsLog;
}
- catch(DBException& e) {
+ catch(DBException& e) {
log() << "replSet warning caught unexpected exception in electSelf() " << e.toString() << rsLog;
}
- catch(...) {
+ catch(...) {
log() << "replSet warning caught unexpected exception in electSelf()" << rsLog;
}
}
diff --git a/db/repl/health.cpp b/db/repl/health.cpp
index c75221c..762ca90 100644
--- a/db/repl/health.cpp
+++ b/db/repl/health.cpp
@@ -32,20 +32,22 @@
#include "../dbhelpers.h"
namespace mongo {
+
/* decls for connections.h */
- ScopedConn::M& ScopedConn::_map = *(new ScopedConn::M());
+ ScopedConn::M& ScopedConn::_map = *(new ScopedConn::M());
mutex ScopedConn::mapMutex("ScopedConn::mapMutex");
}
-namespace mongo {
+namespace mongo {
using namespace mongoutils::html;
using namespace bson;
static RamLog _rsLog;
Tee *rsLog = &_rsLog;
+ extern bool replSetBlind;
- string ago(time_t t) {
+ string ago(time_t t) {
if( t == 0 ) return "";
time_t x = time(0) - t;
@@ -58,14 +60,14 @@ namespace mongo {
s.precision(2);
s << x / 60.0 << " mins";
}
- else {
+ else {
s.precision(2);
s << x / 3600.0 << " hrs";
}
return s.str();
}
- void Member::summarizeMember(stringstream& s) const {
+ void Member::summarizeMember(stringstream& s) const {
s << tr();
{
stringstream u;
@@ -89,27 +91,29 @@ namespace mongo {
s << td(h);
}
s << td(config().votes);
- {
+ s << td(config().priority);
+ {
string stateText = state().toString();
if( _config.hidden )
stateText += " (hidden)";
- if( ok || stateText.empty() )
+ if( ok || stateText.empty() )
s << td(stateText); // text blank if we've never connected
else
s << td( grey(str::stream() << "(was " << state().toString() << ')', true) );
}
s << td( grey(hbinfo().lastHeartbeatMsg,!ok) );
stringstream q;
- q << "/_replSetOplog?" << id();
+ q << "/_replSetOplog?_id=" << id();
s << td( a(q.str(), "", never ? "?" : hbinfo().opTime.toString()) );
if( hbinfo().skew > INT_MIN ) {
s << td( grey(str::stream() << hbinfo().skew,!ok) );
- } else
+ }
+ else
s << td("");
s << _tr();
}
-
- string ReplSetImpl::stateAsHtml(MemberState s) {
+
+ string ReplSetImpl::stateAsHtml(MemberState s) {
if( s.s == MemberState::RS_STARTUP ) return a("", "serving still starting up, or still trying to initiate the set", "STARTUP");
if( s.s == MemberState::RS_PRIMARY ) return a("", "this server thinks it is primary", "PRIMARY");
if( s.s == MemberState::RS_SECONDARY ) return a("", "this server thinks it is a secondary (slave mode)", "SECONDARY");
@@ -122,7 +126,7 @@ namespace mongo {
return "";
}
- string MemberState::toString() const {
+ string MemberState::toString() const {
if( s == MemberState::RS_STARTUP ) return "STARTUP";
if( s == MemberState::RS_PRIMARY ) return "PRIMARY";
if( s == MemberState::RS_SECONDARY ) return "SECONDARY";
@@ -143,9 +147,9 @@ namespace mongo {
set<string> skip;
be e = op["ts"];
- if( e.type() == Date || e.type() == Timestamp ) {
+ if( e.type() == Date || e.type() == Timestamp ) {
OpTime ot = e._opTime();
- ss << td( time_t_to_String_short( ot.getSecs() ) );
+ ss << td( time_t_to_String_short( ot.getSecs() ) );
ss << td( ot.toString() );
skip.insert("ts");
}
@@ -155,7 +159,8 @@ namespace mongo {
if( e.type() == NumberLong ) {
ss << "<td>" << hex << e.Long() << "</td>\n";
skip.insert("h");
- } else
+ }
+ else
ss << td("?");
ss << td(op["op"].valuestrsafe());
@@ -164,20 +169,17 @@ namespace mongo {
skip.insert("ns");
ss << "<td>";
- for( bo::iterator i(op); i.more(); ) {
+ for( bo::iterator i(op); i.more(); ) {
be e = i.next();
if( skip.count(e.fieldName()) ) continue;
ss << e.toString() << ' ';
}
- ss << "</td>";
-
- ss << "</tr>";
- ss << '\n';
+ ss << "</td></tr>\n";
}
- void ReplSetImpl::_getOplogDiagsAsHtml(unsigned server_id, stringstream& ss) const {
+ void ReplSetImpl::_getOplogDiagsAsHtml(unsigned server_id, stringstream& ss) const {
const Member *m = findById(server_id);
- if( m == 0 ) {
+ if( m == 0 ) {
ss << "Error : can't find a member with id: " << server_id << '\n';
return;
}
@@ -187,21 +189,29 @@ namespace mongo {
//const bo fields = BSON( "o" << false << "o2" << false );
const bo fields;
- ScopedDbConnection conn(m->fullName());
+ /** todo fix we might want an so timeout here */
+ DBClientConnection conn(false, 0, /*timeout*/ 20);
+ {
+ string errmsg;
+ if( !conn.connect(m->fullName(), errmsg) ) {
+ ss << "couldn't connect to " << m->fullName() << ' ' << errmsg;
+ return;
+ }
+ }
- auto_ptr<DBClientCursor> c = conn->query(rsoplog, Query().sort("$natural",1), 20, 0, &fields);
- if( c.get() == 0 ) {
+ auto_ptr<DBClientCursor> c = conn.query(rsoplog, Query().sort("$natural",1), 20, 0, &fields);
+ if( c.get() == 0 ) {
ss << "couldn't query " << rsoplog;
return;
}
static const char *h[] = {"ts","optime", "h","op","ns","rest",0};
ss << "<style type=\"text/css\" media=\"screen\">"
- "table { font-size:75% }\n"
+ "table { font-size:75% }\n"
// "th { background-color:#bbb; color:#000 }\n"
// "td,th { padding:.25em }\n"
- "</style>\n";
-
+ "</style>\n";
+
ss << table(h, true);
//ss << "<pre>\n";
int n = 0;
@@ -211,17 +221,17 @@ namespace mongo {
while( c->more() ) {
bo o = c->next();
otLast = o["ts"]._opTime();
- if( otFirst.isNull() )
+ if( otFirst.isNull() )
otFirst = otLast;
say(ss, o);
- n++;
+ n++;
}
if( n == 0 ) {
ss << rsoplog << " is empty\n";
}
- else {
- auto_ptr<DBClientCursor> c = conn->query(rsoplog, Query().sort("$natural",-1), 20, 0, &fields);
- if( c.get() == 0 ) {
+ else {
+ auto_ptr<DBClientCursor> c = conn.query(rsoplog, Query().sort("$natural",-1), 20, 0, &fields);
+ if( c.get() == 0 ) {
ss << "couldn't query [2] " << rsoplog;
return;
}
@@ -230,7 +240,7 @@ namespace mongo {
otEnd = o["ts"]._opTime();
while( 1 ) {
stringstream z;
- if( o["ts"]._opTime() == otLast )
+ if( o["ts"]._opTime() == otLast )
break;
say(z, o);
x = z.str() + x;
@@ -253,32 +263,31 @@ namespace mongo {
ss.precision(3);
if( h < 72 )
ss << h << " hours";
- else
+ else
ss << h / 24.0 << " days";
ss << "</p>\n";
}
-
- conn.done();
}
- void ReplSetImpl::_summarizeAsHtml(stringstream& s) const {
+ void ReplSetImpl::_summarizeAsHtml(stringstream& s) const {
s << table(0, false);
s << tr("Set name:", _name);
s << tr("Majority up:", elect.aMajoritySeemsToBeUp()?"yes":"no" );
s << _table();
- const char *h[] = {"Member",
- "<a title=\"member id in the replset config\">id</a>",
- "Up",
- "<a title=\"length of time we have been continuously connected to the other member with no reconnects (for self, shows uptime)\">cctime</a>",
- "<a title=\"when this server last received a heartbeat response - includes error code responses\">Last heartbeat</a>",
- "Votes", "State", "Status",
- "<a title=\"how up to date this server is. this value polled every few seconds so actually lag is typically much lower than value shown here.\">optime</a>",
- "<a title=\"Clock skew in seconds relative to this server. Informational; server clock variances will make the diagnostics hard to read, but otherwise are benign..\">skew</a>",
- 0};
+ const char *h[] = {"Member",
+ "<a title=\"member id in the replset config\">id</a>",
+ "Up",
+ "<a title=\"length of time we have been continuously connected to the other member with no reconnects (for self, shows uptime)\">cctime</a>",
+ "<a title=\"when this server last received a heartbeat response - includes error code responses\">Last heartbeat</a>",
+ "Votes", "Priority", "State", "Messages",
+ "<a title=\"how up to date this server is. this value polled every few seconds so actually lag is typically much lower than value shown here.\">optime</a>",
+ "<a title=\"Clock skew in seconds relative to this server. Informational; server clock variances will make the diagnostics hard to read, but otherwise are benign..\">skew</a>",
+ 0
+ };
s << table(h);
- /* this is to sort the member rows by their ordinal _id, so they show up in the same
+ /* this is to sort the member rows by their ordinal _id, so they show up in the same
order on all the different web ui's; that is less confusing for the operator. */
map<int,string> mp;
@@ -287,13 +296,13 @@ namespace mongo {
readlocktry lk("local.replset.minvalid", 300);
if( lk.got() ) {
BSONObj mv;
- if( Helpers::getSingleton("local.replset.minvalid", mv) ) {
+ if( Helpers::getSingleton("local.replset.minvalid", mv) ) {
myMinValid = "minvalid:" + mv["ts"]._opTime().toString();
}
}
else myMinValid = ".";
}
- catch(...) {
+ catch(...) {
myMinValid = "exception fetching minvalid";
}
@@ -301,25 +310,26 @@ namespace mongo {
stringstream s;
/* self row */
s << tr() << td(_self->fullName() + " (me)") <<
- td(_self->id()) <<
- td("1") << //up
- td(ago(started)) <<
- td("") << // last heartbeat
- td(ToString(_self->config().votes)) <<
- td( stateAsHtml(box.getState()) + (_self->config().hidden?" (hidden)":"") );
+ td(_self->id()) <<
+ td("1") << //up
+ td(ago(started)) <<
+ td("") << // last heartbeat
+ td(ToString(_self->config().votes)) <<
+ td(ToString(_self->config().priority)) <<
+ td( stateAsHtml(box.getState()) + (_self->config().hidden?" (hidden)":"") );
s << td( _hbmsg );
stringstream q;
- q << "/_replSetOplog?" << _self->id();
+ q << "/_replSetOplog?_id=" << _self->id();
s << td( a(q.str(), myMinValid, theReplSet->lastOpTimeWritten.toString()) );
s << td(""); // skew
s << _tr();
- mp[_self->hbinfo().id()] = s.str();
+ mp[_self->hbinfo().id()] = s.str();
}
Member *m = head();
while( m ) {
- stringstream s;
+ stringstream s;
m->summarizeMember(s);
- mp[m->hbinfo().id()] = s.str();
+ mp[m->hbinfo().id()] = s.str();
m = m->next();
}
@@ -333,26 +343,27 @@ namespace mongo {
_rsLog.toHTML( s );
}
- const Member* ReplSetImpl::findById(unsigned id) const {
+ const Member* ReplSetImpl::findById(unsigned id) const {
if( id == _self->id() ) return _self;
for( Member *m = head(); m; m = m->next() )
- if( m->id() == id )
+ if( m->id() == id )
return m;
return 0;
}
- void ReplSetImpl::_summarizeStatus(BSONObjBuilder& b) const {
+ void ReplSetImpl::_summarizeStatus(BSONObjBuilder& b) const {
vector<BSONObj> v;
// add self
{
- HostAndPort h(getHostName(), cmdLine.port);
-
BSONObjBuilder bb;
bb.append("_id", (int) _self->id());
- bb.append("name", h.toString());
+ bb.append("name", _self->fullName());
bb.append("health", 1.0);
bb.append("state", (int) box.getState().s);
+ bb.append("stateStr", box.getState().toString());
+ bb.appendTimestamp("optime", lastOpTimeWritten.asDate());
+ bb.appendDate("optimeDate", lastOpTimeWritten.getSecs() * 1000LL);
string s = _self->lhb();
if( !s.empty() )
bb.append("errmsg", s);
@@ -365,9 +376,19 @@ namespace mongo {
BSONObjBuilder bb;
bb.append("_id", (int) m->id());
bb.append("name", m->fullName());
- bb.append("health", m->hbinfo().health);
+ double h = m->hbinfo().health;
+ bb.append("health", h);
bb.append("state", (int) m->state().s);
+ if( h == 0 ) {
+ // if we can't connect the state info is from the past and could be confusing to show
+ bb.append("stateStr", "(not reachable/healthy)");
+ }
+ else {
+ bb.append("stateStr", m->state().toString());
+ }
bb.append("uptime", (unsigned) (m->hbinfo().upSince ? (time(0)-m->hbinfo().upSince) : 0));
+ bb.appendTimestamp("optime", m->hbinfo().opTime.asDate());
+ bb.appendDate("optimeDate", m->hbinfo().opTime.getSecs() * 1000LL);
bb.appendTimeT("lastHeartbeat", m->hbinfo().lastHeartbeat);
string s = m->lhb();
if( !s.empty() )
@@ -380,10 +401,12 @@ namespace mongo {
b.appendTimeT("date", time(0));
b.append("myState", box.getState().s);
b.append("members", v);
+ if( replSetBlind )
+ b.append("blind",true); // to avoid confusion if set...normally never set except for testing.
}
- static struct Test : public UnitTest {
- void run() {
+ static struct Test : public UnitTest {
+ void run() {
HealthOptions a,b;
assert( a == b );
assert( a.isDefault() );
diff --git a/db/repl/health.h b/db/repl/health.h
index 645a3b5..a32db00 100644
--- a/db/repl/health.h
+++ b/db/repl/health.h
@@ -23,8 +23,8 @@ namespace mongo {
/* throws */
bool requestHeartbeat(string setname, string fromHost, string memberFullName, BSONObj& result, int myConfigVersion, int& theirConfigVersion, bool checkEmpty = false);
- struct HealthOptions {
- HealthOptions() {
+ struct HealthOptions {
+ HealthOptions() {
heartbeatSleepMillis = 2000;
heartbeatTimeoutMillis = 10000;
heartbeatConnRetries = 2;
@@ -42,8 +42,8 @@ namespace mongo {
uassert(13113, "bad replset heartbeat option", heartbeatTimeoutMillis >= 10);
}
- bool operator==(const HealthOptions& r) const {
- return heartbeatSleepMillis==r.heartbeatSleepMillis && heartbeatTimeoutMillis==r.heartbeatTimeoutMillis && heartbeatConnRetries==heartbeatConnRetries;
+ bool operator==(const HealthOptions& r) const {
+ return heartbeatSleepMillis==r.heartbeatSleepMillis && heartbeatTimeoutMillis==r.heartbeatTimeoutMillis && heartbeatConnRetries==heartbeatConnRetries;
}
};
diff --git a/db/repl/heartbeat.cpp b/db/repl/heartbeat.cpp
index b39fad7..3972466 100644
--- a/db/repl/heartbeat.cpp
+++ b/db/repl/heartbeat.cpp
@@ -31,7 +31,7 @@
#include "../../util/unittest.h"
#include "../instance.h"
-namespace mongo {
+namespace mongo {
using namespace bson;
@@ -42,7 +42,7 @@ namespace mongo {
long long HeartbeatInfo::timeDown() const {
if( up() ) return 0;
- if( downSince == 0 )
+ if( downSince == 0 )
return 0; // still waiting on first heartbeat
return jsTime() - downSince;
}
@@ -53,10 +53,10 @@ namespace mongo {
virtual bool adminOnly() const { return false; }
CmdReplSetHeartbeat() : ReplSetCommand("replSetHeartbeat") { }
virtual bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) {
- if( replSetBlind )
+ if( replSetBlind )
return false;
- /* we don't call ReplSetCommand::check() here because heartbeat
+ /* we don't call ReplSetCommand::check() here because heartbeat
checks many things that are pre-initialization. */
if( !replSet ) {
errmsg = "not running with --replSet";
@@ -65,12 +65,12 @@ namespace mongo {
/* we want to keep heartbeat connections open when relinquishing primary. tag them here. */
{
- MessagingPort *mp = cc()._mp;
- if( mp )
+ MessagingPort *mp = cc().port();
+ if( mp )
mp->tag |= 1;
}
- if( cmdObj["pv"].Int() != 1 ) {
+ if( cmdObj["pv"].Int() != 1 ) {
errmsg = "incompatible replset protocol version";
return false;
}
@@ -86,7 +86,7 @@ namespace mongo {
}
result.append("rs", true);
- if( cmdObj["checkEmpty"].trueValue() ) {
+ if( cmdObj["checkEmpty"].trueValue() ) {
result.append("hasData", replHasDatabases());
}
if( theReplSet == 0 ) {
@@ -98,7 +98,7 @@ namespace mongo {
return false;
}
- if( theReplSet->name() != cmdObj.getStringField("replSetHeartbeat") ) {
+ if( theReplSet->name() != cmdObj.getStringField("replSetHeartbeat") ) {
errmsg = "repl set names do not match (2)";
result.append("mismatch", true);
return false;
@@ -118,8 +118,8 @@ namespace mongo {
} cmdReplSetHeartbeat;
/* throws dbexception */
- bool requestHeartbeat(string setName, string from, string memberFullName, BSONObj& result, int myCfgVersion, int& theirCfgVersion, bool checkEmpty) {
- if( replSetBlind ) {
+ bool requestHeartbeat(string setName, string from, string memberFullName, BSONObj& result, int myCfgVersion, int& theirCfgVersion, bool checkEmpty) {
+ if( replSetBlind ) {
//sleepmillis( rand() );
return false;
}
@@ -144,8 +144,8 @@ namespace mongo {
public:
ReplSetHealthPollTask(const HostAndPort& hh, const HeartbeatInfo& mm) : h(hh), m(mm) { }
- string name() { return "ReplSetHealthPollTask"; }
- void doWork() {
+ string name() const { return "ReplSetHealthPollTask"; }
+ void doWork() {
if ( !theReplSet ) {
log(2) << "theReplSet not initialized yet, skipping health poll this round" << rsLog;
return;
@@ -153,7 +153,7 @@ namespace mongo {
HeartbeatInfo mem = m;
HeartbeatInfo old = mem;
- try {
+ try {
BSONObj info;
int theirConfigVersion = -10000;
@@ -163,15 +163,17 @@ namespace mongo {
time_t after = mem.lastHeartbeat = time(0); // we set this on any response - we don't get this far if couldn't connect because exception is thrown
- try {
- mem.skew = 0;
- long long t = info["time"].Long();
- if( t > after )
+ if ( info["time"].isNumber() ) {
+ long long t = info["time"].numberLong();
+ if( t > after )
mem.skew = (int) (t - after);
- else if( t < before )
+ else if( t < before )
mem.skew = (int) (t - before); // negative
}
- catch(...) {
+ else {
+ // it won't be there if remote hasn't initialized yet
+ if( info.hasElement("time") )
+ warning() << "heatbeat.time isn't a number: " << info << endl;
mem.skew = INT_MIN;
}
@@ -182,7 +184,7 @@ namespace mongo {
}
if( ok ) {
if( mem.upSince == 0 ) {
- log() << "replSet info " << h.toString() << " is now up" << rsLog;
+ log() << "replSet info " << h.toString() << " is up" << rsLog;
mem.upSince = mem.lastHeartbeat;
}
mem.health = 1.0;
@@ -193,17 +195,20 @@ namespace mongo {
be cfg = info["config"];
if( cfg.ok() ) {
// received a new config
- boost::function<void()> f =
+ boost::function<void()> f =
boost::bind(&Manager::msgReceivedNewConfig, theReplSet->mgr, cfg.Obj().copy());
theReplSet->mgr->send(f);
}
}
- else {
+ else {
down(mem, info.getStringField("errmsg"));
}
}
- catch(...) {
- down(mem, "connect/transport error");
+ catch(DBException& e) {
+ down(mem, e.what());
+ }
+ catch(...) {
+ down(mem, "something unusual went wrong");
}
m = mem;
@@ -212,9 +217,9 @@ namespace mongo {
static time_t last = 0;
time_t now = time(0);
bool changed = mem.changed(old);
- if( changed ) {
- if( old.hbstate != mem.hbstate )
- log() << "replSet " << h.toString() << ' ' << mem.hbstate.toString() << rsLog;
+ if( changed ) {
+ if( old.hbstate != mem.hbstate )
+ log() << "replSet member " << h.toString() << ' ' << mem.hbstate.toString() << rsLog;
}
if( changed || now-last>4 ) {
last = now;
@@ -228,18 +233,18 @@ namespace mongo {
if( mem.upSince || mem.downSince == 0 ) {
mem.upSince = 0;
mem.downSince = jsTime();
- log() << "replSet info " << h.toString() << " is now down (or slow to respond)" << rsLog;
+ log() << "replSet info " << h.toString() << " is down (or slow to respond): " << msg << rsLog;
}
mem.lastHeartbeatMsg = msg;
}
};
- void ReplSetImpl::endOldHealthTasks() {
+ void ReplSetImpl::endOldHealthTasks() {
unsigned sz = healthTasks.size();
for( set<ReplSetHealthPollTask*>::iterator i = healthTasks.begin(); i != healthTasks.end(); i++ )
(*i)->halt();
healthTasks.clear();
- if( sz )
+ if( sz )
DEV log() << "replSet debug: cleared old tasks " << sz << endl;
}
@@ -251,8 +256,8 @@ namespace mongo {
void startSyncThread();
- /** called during repl set startup. caller expects it to return fairly quickly.
- note ReplSet object is only created once we get a config - so this won't run
+ /** called during repl set startup. caller expects it to return fairly quickly.
+ note ReplSet object is only created once we get a config - so this won't run
until the initiation.
*/
void ReplSetImpl::startThreads() {
diff --git a/db/repl/manager.cpp b/db/repl/manager.cpp
index 862ac46..ed39c31 100644
--- a/db/repl/manager.cpp
+++ b/db/repl/manager.cpp
@@ -1,4 +1,4 @@
-/* @file manager.cpp
+/* @file manager.cpp
*/
/**
@@ -23,20 +23,20 @@
namespace mongo {
- enum {
+ enum {
NOPRIMARY = -2,
SELFPRIMARY = -1
};
/* check members OTHER THAN US to see if they think they are primary */
- const Member * Manager::findOtherPrimary(bool& two) {
+ const Member * Manager::findOtherPrimary(bool& two) {
two = false;
Member *m = rs->head();
Member *p = 0;
while( m ) {
DEV assert( m != rs->_self );
if( m->state().primary() && m->hbinfo().up() ) {
- if( p ) {
+ if( p ) {
two = true;
return 0;
}
@@ -44,33 +44,36 @@ namespace mongo {
}
m = m->next();
}
- if( p )
+ if( p )
noteARemoteIsPrimary(p);
return p;
}
- Manager::Manager(ReplSetImpl *_rs) :
- task::Server("rs Manager"), rs(_rs), busyWithElectSelf(false), _primary(NOPRIMARY)
- {
+ Manager::Manager(ReplSetImpl *_rs) :
+ task::Server("rs Manager"), rs(_rs), busyWithElectSelf(false), _primary(NOPRIMARY) {
}
-
- Manager::~Manager() {
- log() << "ERROR: ~Manager should never be called" << rsLog;
+
+ Manager::~Manager() {
+ /* we don't destroy the replset object we sit in; however, the destructor could have thrown on init.
+ the log message below is just a reminder to come back one day and review this code more, and to
+ make it cleaner.
+ */
+ log() << "info: ~Manager called" << rsLog;
rs->mgr = 0;
- assert(false);
}
- void Manager::starting() {
+ void Manager::starting() {
Client::initThread("rs Manager");
}
- void Manager::noteARemoteIsPrimary(const Member *m) {
+ void Manager::noteARemoteIsPrimary(const Member *m) {
if( rs->box.getPrimary() == m )
return;
rs->_self->lhb() = "";
if( rs->iAmArbiterOnly() ) {
rs->box.set(MemberState::RS_ARBITER, m);
- } else {
+ }
+ else {
rs->box.noteRemoteIsPrimary(m);
}
}
@@ -87,9 +90,8 @@ namespace mongo {
const Member *p = rs->box.getPrimary();
if( p && p != rs->_self ) {
- if( !p->hbinfo().up() ||
- !p->hbinfo().hbstate.primary() )
- {
+ if( !p->hbinfo().up() ||
+ !p->hbinfo().hbstate.primary() ) {
p = 0;
rs->box.setOtherPrimary(0);
}
@@ -101,36 +103,36 @@ namespace mongo {
p2 = findOtherPrimary(two);
if( two ) {
/* two other nodes think they are primary (asynchronously polled) -- wait for things to settle down. */
- log() << "replSet warning DIAG two primaries (transiently)" << rsLog;
+ log() << "replSet info two primaries (transiently)" << rsLog;
return;
}
}
if( p2 ) {
/* someone else thinks they are primary. */
- if( p == p2 ) {
+ if( p == p2 ) {
// we thought the same; all set.
return;
}
if( p == 0 ) {
- noteARemoteIsPrimary(p2);
+ noteARemoteIsPrimary(p2);
return;
}
// todo xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
if( p != rs->_self ) {
// switch primary from oldremotep->newremotep2
- noteARemoteIsPrimary(p2);
+ noteARemoteIsPrimary(p2);
return;
}
/* we thought we were primary, yet now someone else thinks they are. */
if( !rs->elect.aMajoritySeemsToBeUp() ) {
/* we can't see a majority. so the other node is probably the right choice. */
- noteARemoteIsPrimary(p2);
+ noteARemoteIsPrimary(p2);
return;
}
- /* ignore for now, keep thinking we are master.
- this could just be timing (we poll every couple seconds) or could indicate
- a problem? if it happens consistently for a duration of time we should
+ /* ignore for now, keep thinking we are master.
+ this could just be timing (we poll every couple seconds) or could indicate
+ a problem? if it happens consistently for a duration of time we should
alert the sysadmin.
*/
return;
@@ -138,17 +140,17 @@ namespace mongo {
/* didn't find anyone who wants to be primary */
- if( p ) {
+ if( p ) {
/* we are already primary */
- if( p != rs->_self ) {
+ if( p != rs->_self ) {
rs->sethbmsg("error p != rs->self in checkNewState");
log() << "replSet " << p->fullName() << rsLog;
log() << "replSet " << rs->_self->fullName() << rsLog;
return;
}
- if( rs->elect.shouldRelinquish() ) {
+ if( rs->elect.shouldRelinquish() ) {
log() << "replSet can't see a majority of the set, relinquishing primary" << rsLog;
rs->relinquish();
}
@@ -162,7 +164,7 @@ namespace mongo {
/* TODO : CHECK PRIORITY HERE. can't be elected if priority zero. */
/* no one seems to be primary. shall we try to elect ourself? */
- if( !rs->elect.aMajoritySeemsToBeUp() ) {
+ if( !rs->elect.aMajoritySeemsToBeUp() ) {
static time_t last;
static int n;
int ll = 0;
@@ -175,15 +177,15 @@ namespace mongo {
busyWithElectSelf = true; // don't try to do further elections & such while we are already working on one.
}
- try {
- rs->elect.electSelf();
+ try {
+ rs->elect.electSelf();
}
catch(RetryAfterSleepException&) {
/* we want to process new inbounds before trying this again. so we just put a checkNewstate in the queue for eval later. */
requeue();
}
- catch(...) {
- log() << "replSet error unexpected assertion in rs manager" << rsLog;
+ catch(...) {
+ log() << "replSet error unexpected assertion in rs manager" << rsLog;
}
busyWithElectSelf = false;
}
diff --git a/db/repl/multicmd.h b/db/repl/multicmd.h
index 9eb9a17..df7c4e5 100644
--- a/db/repl/multicmd.h
+++ b/db/repl/multicmd.h
@@ -21,7 +21,7 @@
#include "../../util/background.h"
#include "connections.h"
-namespace mongo {
+namespace mongo {
struct Target {
Target(string hostport) : toHost(hostport), ok(false) { }
@@ -33,38 +33,37 @@ namespace mongo {
/* -- implementation ------------- */
- class _MultiCommandJob : public BackgroundJob {
+ class _MultiCommandJob : public BackgroundJob {
public:
BSONObj& cmd;
Target& d;
_MultiCommandJob(BSONObj& _cmd, Target& _d) : cmd(_cmd), d(_d) { }
+
private:
- string name() { return "MultiCommandJob"; }
+ string name() const { return "MultiCommandJob"; }
void run() {
- try {
+ try {
ScopedConn c(d.toHost);
d.ok = c.runCommand("admin", cmd, d.result);
}
- catch(DBException&) {
+ catch(DBException&) {
DEV log() << "dev caught dbexception on multiCommand " << d.toHost << rsLog;
}
}
};
- inline void multiCommand(BSONObj cmd, list<Target>& L) {
- typedef shared_ptr<_MultiCommandJob> P;
- list<P> jobs;
- list<BackgroundJob *> _jobs;
+ inline void multiCommand(BSONObj cmd, list<Target>& L) {
+ list<BackgroundJob *> jobs;
- for( list<Target>::iterator i = L.begin(); i != L.end(); i++ ) {
+ for( list<Target>::iterator i = L.begin(); i != L.end(); i++ ) {
Target& d = *i;
_MultiCommandJob *j = new _MultiCommandJob(cmd, d);
- jobs.push_back(P(j));
- _jobs.push_back(j);
+ j->go();
+ jobs.push_back(j);
}
- BackgroundJob::go(_jobs);
- BackgroundJob::wait(_jobs,5);
+ for( list<BackgroundJob*>::iterator i = jobs.begin(); i != jobs.end(); i++ ) {
+ (*i)->wait();
+ }
}
-
}
diff --git a/db/repl/replset_commands.cpp b/db/repl/replset_commands.cpp
index 328b0ab..dc8567a 100644
--- a/db/repl/replset_commands.cpp
+++ b/db/repl/replset_commands.cpp
@@ -24,7 +24,9 @@
#include "../../util/mongoutils/html.h"
#include "../../client/dbclient.h"
-namespace mongo {
+using namespace bson;
+
+namespace mongo {
void checkMembersUpForConfigChange(const ReplSetConfig& cfg, bool initial);
@@ -50,7 +52,7 @@ namespace mongo {
}
// may not need this, but if removed check all tests still work:
- if( !check(errmsg, result) )
+ if( !check(errmsg, result) )
return false;
if( cmdObj.hasElement("blind") ) {
@@ -61,6 +63,7 @@ namespace mongo {
}
} cmdReplSetTest;
+ /** get rollback id */
class CmdReplSetGetRBID : public ReplSetCommand {
public:
/* todo: ideally this should only change on rollbacks NOT on mongod restarts also. fix... */
@@ -68,26 +71,28 @@ namespace mongo {
virtual void help( stringstream &help ) const {
help << "internal";
}
- CmdReplSetGetRBID() : ReplSetCommand("replSetGetRBID") {
+ CmdReplSetGetRBID() : ReplSetCommand("replSetGetRBID") {
rbid = (int) curTimeMillis();
}
virtual bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) {
- if( !check(errmsg, result) )
+ if( !check(errmsg, result) )
return false;
result.append("rbid",rbid);
return true;
}
} cmdReplSetRBID;
- using namespace bson;
- void incRBID() {
+ /** we increment the rollback id on every rollback event. */
+ void incRBID() {
cmdReplSetRBID.rbid++;
}
- int getRBID(DBClientConnection *c) {
+
+ /** helper to get rollback id from another server. */
+ int getRBID(DBClientConnection *c) {
bo info;
c->simpleCommand("admin", &info, "replSetGetRBID");
return info["rbid"].numberInt();
- }
+ }
class CmdReplSetGetStatus : public ReplSetCommand {
public:
@@ -98,7 +103,10 @@ namespace mongo {
}
CmdReplSetGetStatus() : ReplSetCommand("replSetGetStatus", true) { }
virtual bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) {
- if( !check(errmsg, result) )
+ if ( cmdObj["forShell"].trueValue() )
+ lastError.disableForCommand();
+
+ if( !check(errmsg, result) )
return false;
theReplSet->summarizeStatus(result);
return true;
@@ -115,7 +123,7 @@ namespace mongo {
}
CmdReplSetReconfig() : ReplSetCommand("replSetReconfig"), mutex("rsreconfig") { }
virtual bool run(const string& a, BSONObj& b, string& errmsg, BSONObjBuilder& c, bool d) {
- try {
+ try {
rwlock_try_write lk(mutex);
return _run(a,b,errmsg,c,d);
}
@@ -125,16 +133,16 @@ namespace mongo {
}
private:
bool _run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) {
- if( !check(errmsg, result) )
+ if( !check(errmsg, result) )
return false;
- if( !theReplSet->box.getState().primary() ) {
+ if( !theReplSet->box.getState().primary() ) {
errmsg = "replSetReconfig command must be sent to the current replica set primary.";
return false;
}
{
- // just make sure we can get a write lock before doing anything else. we'll reacquire one
- // later. of course it could be stuck then, but this check lowers the risk if weird things
+ // just make sure we can get a write lock before doing anything else. we'll reacquire one
+ // later. of course it could be stuck then, but this check lowers the risk if weird things
// are up - we probably don't want a change to apply 30 minutes after the initial attempt.
time_t t = time(0);
writelock lk("");
@@ -159,7 +167,7 @@ namespace mongo {
log() << "replSet replSetReconfig config object parses ok, " << newConfig.members.size() << " members specified" << rsLog;
- if( !ReplSetConfig::legalChange(theReplSet->getConfig(), newConfig, errmsg) ) {
+ if( !ReplSetConfig::legalChange(theReplSet->getConfig(), newConfig, errmsg) ) {
return false;
}
@@ -170,7 +178,7 @@ namespace mongo {
theReplSet->haveNewConfig(newConfig, true);
ReplSet::startupStatusMsg = "replSetReconfig'd";
}
- catch( DBException& e ) {
+ catch( DBException& e ) {
log() << "replSet replSetReconfig exception: " << e.what() << rsLog;
throw;
}
@@ -182,8 +190,11 @@ namespace mongo {
class CmdReplSetFreeze : public ReplSetCommand {
public:
virtual void help( stringstream &help ) const {
- help << "Enable / disable failover for the set - locks current primary as primary even if issues occur.\nFor use during system maintenance.\n";
- help << "{ replSetFreeze : <bool> }";
+ help << "{ replSetFreeze : <seconds> }";
+ help << "'freeze' state of member to the extent we can do that. What this really means is that\n";
+ help << "this node will not attempt to become primary until the time period specified expires.\n";
+ help << "You can call again with {replSetFreeze:0} to unfreeze sooner.\n";
+ help << "A process restart unfreezes the member also.\n";
help << "\nhttp://www.mongodb.org/display/DOCS/Replica+Set+Commands";
}
@@ -191,15 +202,22 @@ namespace mongo {
virtual bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) {
if( !check(errmsg, result) )
return false;
- errmsg = "not yet implemented"; /*TODO*/
- return false;
+ int secs = (int) cmdObj.firstElement().numberInt();
+ if( theReplSet->freeze(secs) ) {
+ if( secs == 0 )
+ result.append("info","unfreezing");
+ }
+ if( secs == 1 )
+ result.append("warning", "you really want to freeze for only 1 second?");
+ return true;
}
} cmdReplSetFreeze;
class CmdReplSetStepDown: public ReplSetCommand {
public:
virtual void help( stringstream &help ) const {
- help << "Step down as primary. Will not try to reelect self or 1 minute.\n";
+ help << "{ replSetStepDown : <seconds> }\n";
+ help << "Step down as primary. Will not try to reelect self for the specified time period (1 minute if no numeric secs value specified).\n";
help << "(If another member with same priority takes over in the meantime, it will stay primary.)\n";
help << "http://www.mongodb.org/display/DOCS/Replica+Set+Commands";
}
@@ -212,7 +230,10 @@ namespace mongo {
errmsg = "not primary so can't step down";
return false;
}
- return theReplSet->stepDown();
+ int secs = (int) cmdObj.firstElement().numberInt();
+ if( secs == 0 )
+ secs = 60;
+ return theReplSet->stepDown(secs);
}
} cmdReplSetStepDown;
@@ -222,45 +243,46 @@ namespace mongo {
class ReplSetHandler : public DbWebHandler {
public:
- ReplSetHandler() : DbWebHandler( "_replSet" , 1 , true ){}
+ ReplSetHandler() : DbWebHandler( "_replSet" , 1 , true ) {}
virtual bool handles( const string& url ) const {
return startsWith( url , "/_replSet" );
}
- virtual void handle( const char *rq, string url,
+ virtual void handle( const char *rq, string url, BSONObj params,
string& responseMsg, int& responseCode,
- vector<string>& headers, const SockAddr &from ){
-
- string s = str::after(url, "/_replSetOplog?");
- if( !s.empty() )
- responseMsg = _replSetOplog(s);
+ vector<string>& headers, const SockAddr &from ) {
+
+ if( url == "/_replSetOplog" ) {
+ responseMsg = _replSetOplog(params);
+ }
else
responseMsg = _replSet();
responseCode = 200;
}
+ string _replSetOplog(bo parms) {
+ int _id = (int) str::toUnsigned( parms["_id"].String() );
- string _replSetOplog(string parms) {
stringstream s;
string t = "Replication oplog";
s << start(t);
s << p(t);
- if( theReplSet == 0 ) {
- if( cmdLine._replSet.empty() )
+ if( theReplSet == 0 ) {
+ if( cmdLine._replSet.empty() )
s << p("Not using --replSet");
else {
- s << p("Still starting up, or else set is not yet " + a("http://www.mongodb.org/display/DOCS/Replica+Set+Configuration#InitialSetup", "", "initiated")
+ s << p("Still starting up, or else set is not yet " + a("http://www.mongodb.org/display/DOCS/Replica+Set+Configuration#InitialSetup", "", "initiated")
+ ".<br>" + ReplSet::startupStatusMsg);
}
}
else {
try {
- theReplSet->getOplogDiagsAsHtml(stringToNum(parms.c_str()), s);
+ theReplSet->getOplogDiagsAsHtml(_id, s);
}
- catch(std::exception& e) {
- s << "error querying oplog: " << e.what() << '\n';
+ catch(std::exception& e) {
+ s << "error querying oplog: " << e.what() << '\n';
}
}
@@ -269,20 +291,20 @@ namespace mongo {
}
/* /_replSet show replica set status in html format */
- string _replSet() {
+ string _replSet() {
stringstream s;
s << start("Replica Set Status " + prettyHostName());
- s << p( a("/", "back", "Home") + " | " +
+ s << p( a("/", "back", "Home") + " | " +
a("/local/system.replset/?html=1", "", "View Replset Config") + " | " +
- a("/replSetGetStatus?text", "", "replSetGetStatus") + " | " +
+ a("/replSetGetStatus?text=1", "", "replSetGetStatus") + " | " +
a("http://www.mongodb.org/display/DOCS/Replica+Sets", "", "Docs")
);
- if( theReplSet == 0 ) {
- if( cmdLine._replSet.empty() )
+ if( theReplSet == 0 ) {
+ if( cmdLine._replSet.empty() )
s << p("Not using --replSet");
else {
- s << p("Still starting up, or else set is not yet " + a("http://www.mongodb.org/display/DOCS/Replica+Set+Configuration#InitialSetup", "", "initiated")
+ s << p("Still starting up, or else set is not yet " + a("http://www.mongodb.org/display/DOCS/Replica+Set+Configuration#InitialSetup", "", "initiated")
+ ".<br>" + ReplSet::startupStatusMsg);
}
}
diff --git a/db/repl/rs.cpp b/db/repl/rs.cpp
index 1c0444a..90ed9f4 100644
--- a/db/repl/rs.cpp
+++ b/db/repl/rs.cpp
@@ -20,9 +20,12 @@
#include "../client.h"
#include "../../client/dbclient.h"
#include "../dbhelpers.h"
+#include "../../s/d_logic.h"
#include "rs.h"
+#include "connections.h"
+#include "../repl.h"
-namespace mongo {
+namespace mongo {
using namespace bson;
@@ -30,18 +33,18 @@ namespace mongo {
ReplSet *theReplSet = 0;
extern string *discoveredSeed;
- void ReplSetImpl::sethbmsg(string s, int logLevel) {
+ void ReplSetImpl::sethbmsg(string s, int logLevel) {
static time_t lastLogged;
_hbmsgTime = time(0);
- if( s == _hbmsg ) {
+ if( s == _hbmsg ) {
// unchanged
if( _hbmsgTime - lastLogged < 60 )
return;
}
unsigned sz = s.size();
- if( sz >= 256 )
+ if( sz >= 256 )
memcpy(_hbmsg, s.c_str(), 255);
else {
_hbmsg[sz] = 0;
@@ -53,7 +56,7 @@ namespace mongo {
}
}
- void ReplSetImpl::assumePrimary() {
+ void ReplSetImpl::assumePrimary() {
assert( iAmPotentiallyHot() );
writelock lk("admin."); // so we are synchronized with _logOp()
box.setSelfPrimary(_self);
@@ -62,17 +65,26 @@ namespace mongo {
void ReplSetImpl::changeState(MemberState s) { box.change(s, _self); }
- void ReplSetImpl::relinquish() {
+ const bool closeOnRelinquish = true;
+
+ void ReplSetImpl::relinquish() {
if( box.getState().primary() ) {
log() << "replSet relinquishing primary state" << rsLog;
- changeState(MemberState::RS_RECOVERING);
-
- /* close sockets that were talking to us */
- /*log() << "replSet closing sockets after reqlinquishing primary" << rsLog;
- MessagingPort::closeAllSockets(1);*/
+ changeState(MemberState::RS_SECONDARY);
+
+ if( closeOnRelinquish ) {
+ /* close sockets that were talking to us so they don't blithly send many writes that will fail
+ with "not master" (of course client could check result code, but in case they are not)
+ */
+ log() << "replSet closing client sockets after reqlinquishing primary" << rsLog;
+ MessagingPort::closeAllSockets(1);
+ }
+
+ // now that all connections were closed, strip this mongod from all sharding details
+ // if and when it gets promoted to a primary again, only then it should reload the sharding state
+ // the rationale here is that this mongod won't bring stale state when it regains primaryhood
+ shardingState.resetShardingState();
- // todo: >
- //changeState(MemberState::RS_SECONDARY);
}
else if( box.getState().startup2() ) {
// ? add comment
@@ -81,26 +93,48 @@ namespace mongo {
}
/* look freshly for who is primary - includes relinquishing ourself. */
- void ReplSetImpl::forgetPrimary() {
- if( box.getState().primary() )
+ void ReplSetImpl::forgetPrimary() {
+ if( box.getState().primary() )
relinquish();
else {
box.setOtherPrimary(0);
}
}
- bool ReplSetImpl::_stepDown() {
+ // for the replSetStepDown command
+ bool ReplSetImpl::_stepDown(int secs) {
lock lk(this);
- if( box.getState().primary() ) {
- changeState(MemberState::RS_RECOVERING);
- elect.steppedDown = time(0) + 60;
- log() << "replSet info stepped down as primary" << rsLog;
+ if( box.getState().primary() ) {
+ elect.steppedDown = time(0) + secs;
+ log() << "replSet info stepping down as primary secs=" << secs << rsLog;
+ relinquish();
return true;
}
return false;
}
- void ReplSetImpl::msgUpdateHBInfo(HeartbeatInfo h) {
+ bool ReplSetImpl::_freeze(int secs) {
+ lock lk(this);
+ /* note if we are primary we remain primary but won't try to elect ourself again until
+ this time period expires.
+ */
+ if( secs == 0 ) {
+ elect.steppedDown = 0;
+ log() << "replSet info 'unfreezing'" << rsLog;
+ }
+ else {
+ if( !box.getState().primary() ) {
+ elect.steppedDown = time(0) + secs;
+ log() << "replSet info 'freezing' for " << secs << " seconds" << rsLog;
+ }
+ else {
+ log() << "replSet info received freeze command but we are primary" << rsLog;
+ }
+ }
+ return true;
+ }
+
+ void ReplSetImpl::msgUpdateHBInfo(HeartbeatInfo h) {
for( Member *m = _members.head(); m; m=m->next() ) {
if( m->id() == h.id() ) {
m->_hbinfo = h;
@@ -109,7 +143,7 @@ namespace mongo {
}
}
- list<HostAndPort> ReplSetImpl::memberHostnames() const {
+ list<HostAndPort> ReplSetImpl::memberHostnames() const {
list<HostAndPort> L;
L.push_back(_self->h());
for( Member *m = _members.head(); m; m = m->next() )
@@ -118,6 +152,7 @@ namespace mongo {
}
void ReplSetImpl::_fillIsMasterHost(const Member *m, vector<string>& hosts, vector<string>& passives, vector<string>& arbiters) {
+ assert( m );
if( m->config().hidden )
return;
@@ -126,8 +161,9 @@ namespace mongo {
}
else if( !m->config().arbiterOnly ) {
if( m->config().slaveDelay ) {
- /* hmmm - we don't list these as they are stale. */
- } else {
+ /* hmmm - we don't list these as they are stale. */
+ }
+ else {
passives.push_back(m->h().toString());
}
}
@@ -147,6 +183,7 @@ namespace mongo {
_fillIsMasterHost(_self, hosts, passives, arbiters);
for( Member *m = _members.head(); m; m = m->next() ) {
+ assert( m );
_fillIsMasterHost(m, hosts, passives, arbiters);
}
@@ -161,23 +198,27 @@ namespace mongo {
}
}
- if( !isp ) {
+ if( !isp ) {
const Member *m = sp.primary;
if( m )
b.append("primary", m->h().toString());
}
if( myConfig().arbiterOnly )
b.append("arbiterOnly", true);
+ if( myConfig().priority == 0 )
+ b.append("passive", true);
if( myConfig().slaveDelay )
b.append("slaveDelay", myConfig().slaveDelay);
if( myConfig().hidden )
b.append("hidden", true);
+ if( !myConfig().buildIndexes )
+ b.append("buildIndexes", false);
}
/** @param cfgString <setname>/<seedhost1>,<seedhost2> */
- void parseReplsetCmdLine(string cfgString, string& setname, vector<HostAndPort>& seeds, set<HostAndPort>& seedSet ) {
- const char *p = cfgString.c_str();
+ void parseReplsetCmdLine(string cfgString, string& setname, vector<HostAndPort>& seeds, set<HostAndPort>& seedSet ) {
+ const char *p = cfgString.c_str();
const char *slash = strchr(p, '/');
if( slash )
setname = string(p, slash-p);
@@ -207,7 +248,8 @@ namespace mongo {
//uassert(13101, "can't use localhost in replset host list", !m.isLocalHost());
if( m.isSelf() ) {
log(1) << "replSet ignoring seed " << m.toString() << " (=self)" << rsLog;
- } else
+ }
+ else
seeds.push_back(m);
if( *comma == 0 )
break;
@@ -216,10 +258,9 @@ namespace mongo {
}
}
- ReplSetImpl::ReplSetImpl(ReplSetCmdline& replSetCmdline) : elect(this),
- _self(0),
- mgr( new Manager(this) )
- {
+ ReplSetImpl::ReplSetImpl(ReplSetCmdline& replSetCmdline) : elect(this),
+ _self(0),
+ mgr( new Manager(this) ) {
_cfg = 0;
memset(_hbmsg, 0, sizeof(_hbmsg));
*_hbmsg = '.'; // temp...just to see
@@ -240,20 +281,21 @@ namespace mongo {
}
for( set<HostAndPort>::iterator i = replSetCmdline.seedSet.begin(); i != replSetCmdline.seedSet.end(); i++ ) {
if( i->isSelf() ) {
- if( sss == 1 )
+ if( sss == 1 )
log(1) << "replSet warning self is listed in the seed list and there are no other seeds listed did you intend that?" << rsLog;
- } else
+ }
+ else
log() << "replSet warning command line seed " << i->toString() << " is not present in the current repl set config" << rsLog;
}
}
void newReplUp();
- void ReplSetImpl::loadLastOpTimeWritten() {
+ void ReplSetImpl::loadLastOpTimeWritten() {
//assert( lastOpTimeWritten.isNull() );
readlock lk(rsoplog);
BSONObj o;
- if( Helpers::getLast(rsoplog, o) ) {
+ if( Helpers::getLast(rsoplog, o) ) {
lastH = o["h"].numberLong();
lastOpTimeWritten = o["ts"]._opTime();
uassert(13290, "bad replSet oplog entry?", !lastOpTimeWritten.isNull());
@@ -261,11 +303,11 @@ namespace mongo {
}
/* call after constructing to start - returns fairly quickly after launching its threads */
- void ReplSetImpl::_go() {
- try {
+ void ReplSetImpl::_go() {
+ try {
loadLastOpTimeWritten();
}
- catch(std::exception& e) {
+ catch(std::exception& e) {
log() << "replSet error fatal couldn't query the local " << rsoplog << " collection. Terminating mongod after 30 seconds." << rsLog;
log() << e.what() << rsLog;
sleepsecs(30);
@@ -283,11 +325,17 @@ namespace mongo {
extern BSONObj *getLastErrorDefault;
+ void ReplSetImpl::setSelfTo(Member *m) {
+ _self = m;
+ if( m ) _buildIndexes = m->config().buildIndexes;
+ else _buildIndexes = true;
+ }
+
/** @param reconf true if this is a reconfiguration and not an initial load of the configuration.
@return true if ok; throws if config really bad; false if config doesn't include self
*/
bool ReplSetImpl::initFromConfig(ReplSetConfig& c, bool reconf) {
- /* NOTE: haveNewConfig() writes the new config to disk before we get here. So
+ /* NOTE: haveNewConfig() writes the new config to disk before we get here. So
we cannot error out at this point, except fatally. Check errors earlier.
*/
lock lk(this);
@@ -302,25 +350,24 @@ namespace mongo {
{
unsigned nfound = 0;
int me = 0;
- for( vector<ReplSetConfig::MemberCfg>::iterator i = c.members.begin(); i != c.members.end(); i++ ) {
+ for( vector<ReplSetConfig::MemberCfg>::iterator i = c.members.begin(); i != c.members.end(); i++ ) {
const ReplSetConfig::MemberCfg& m = *i;
if( m.h.isSelf() ) {
nfound++;
me++;
-
if( !reconf || (_self && _self->id() == (unsigned) m._id) )
;
- else {
+ else {
log() << "replSet " << _self->id() << ' ' << m._id << rsLog;
assert(false);
}
}
- else if( reconf ) {
+ else if( reconf ) {
const Member *old = findById(m._id);
- if( old ) {
+ if( old ) {
nfound++;
assert( (int) old->id() == m._id );
- if( old->config() == m ) {
+ if( old->config() == m ) {
additive = false;
}
}
@@ -328,16 +375,24 @@ namespace mongo {
newOnes.push_back(&m);
}
}
+
+ // change timeout settings, if necessary
+ ScopedConn conn(m.h.toString());
+ conn.setTimeout(c.ho.heartbeatTimeoutMillis/1000.0);
}
if( me == 0 ) {
+ // initial startup with fastsync
+ if (!reconf && replSettings.fastsync) {
+ return false;
+ }
// log() << "replSet config : " << _cfg->toString() << rsLog;
- log() << "replSet error can't find self in the repl set configuration:" << rsLog;
+ log() << "replSet error self not present in the repl set configuration:" << rsLog;
log() << c.toString() << rsLog;
- assert(false);
+ uasserted(13497, "replSet error self not present in the configuration");
}
uassert( 13302, "replSet error self appears twice in the repl set configuration", me<=1 );
- if( reconf && config().members.size() != nfound )
+ if( reconf && config().members.size() != nfound )
additive = false;
}
@@ -347,14 +402,14 @@ namespace mongo {
_name = _cfg->_id;
assert( !_name.empty() );
- if( additive ) {
+ if( additive ) {
log() << "replSet info : additive change to configuration" << rsLog;
for( list<const ReplSetConfig::MemberCfg*>::const_iterator i = newOnes.begin(); i != newOnes.end(); i++ ) {
const ReplSetConfig::MemberCfg* m = *i;
Member *mi = new Member(m->h, m->_id, m, false);
- /** we will indicate that new members are up() initially so that we don't relinquish our
- primary state because we can't (transiently) see a majority. they should be up as we
+ /** we will indicate that new members are up() initially so that we don't relinquish our
+ primary state because we can't (transiently) see a majority. they should be up as we
check that new members are up before getting here on reconfig anyway.
*/
mi->get_hbinfo().health = 0.1;
@@ -373,20 +428,30 @@ namespace mongo {
int oldPrimaryId = -1;
{
const Member *p = box.getPrimary();
- if( p )
+ if( p )
oldPrimaryId = p->id();
}
forgetPrimary();
- _self = 0;
- for( vector<ReplSetConfig::MemberCfg>::iterator i = _cfg->members.begin(); i != _cfg->members.end(); i++ ) {
+
+ bool iWasArbiterOnly = _self ? iAmArbiterOnly() : false;
+ setSelfTo(0);
+ for( vector<ReplSetConfig::MemberCfg>::iterator i = _cfg->members.begin(); i != _cfg->members.end(); i++ ) {
const ReplSetConfig::MemberCfg& m = *i;
Member *mi;
if( m.h.isSelf() ) {
assert( _self == 0 );
- mi = _self = new Member(m.h, m._id, &m, true);
+ mi = new Member(m.h, m._id, &m, true);
+ setSelfTo(mi);
+
+ // if the arbiter status changed
+ if (iWasArbiterOnly ^ iAmArbiterOnly()) {
+ _changeArbiterState();
+ }
+
if( (int)mi->id() == oldPrimaryId )
box.setSelfPrimary(mi);
- } else {
+ }
+ else {
mi = new Member(m.h, m._id, &m, false);
_members.push(mi);
startHealthTaskFor(mi);
@@ -397,26 +462,57 @@ namespace mongo {
return true;
}
+ void startSyncThread();
+
+ void ReplSetImpl::_changeArbiterState() {
+ if (iAmArbiterOnly()) {
+ changeState(MemberState::RS_ARBITER);
+
+ // if there is an oplog, free it
+ // not sure if this is necessary, maybe just leave the oplog and let
+ // the user delete it if they want the space?
+ writelock lk(rsoplog);
+ Client::Context c(rsoplog);
+ NamespaceDetails *d = nsdetails(rsoplog);
+ if (d) {
+ string errmsg;
+ bob res;
+ dropCollection(rsoplog, errmsg, res);
+
+ // clear last op time to force initial sync (if the arbiter
+ // becomes a "normal" server again)
+ lastOpTimeWritten = OpTime();
+ }
+ }
+ else {
+ changeState(MemberState::RS_RECOVERING);
+
+ // oplog will be allocated when sync begins
+ /* TODO : could this cause two sync threads to exist (race condition)? */
+ boost::thread t(startSyncThread);
+ }
+ }
+
// Our own config must be the first one.
- bool ReplSetImpl::_loadConfigFinish(vector<ReplSetConfig>& cfgs) {
+ bool ReplSetImpl::_loadConfigFinish(vector<ReplSetConfig>& cfgs) {
int v = -1;
ReplSetConfig *highest = 0;
int myVersion = -2000;
int n = 0;
- for( vector<ReplSetConfig>::iterator i = cfgs.begin(); i != cfgs.end(); i++ ) {
+ for( vector<ReplSetConfig>::iterator i = cfgs.begin(); i != cfgs.end(); i++ ) {
ReplSetConfig& cfg = *i;
if( ++n == 1 ) myVersion = cfg.version;
- if( cfg.ok() && cfg.version > v ) {
+ if( cfg.ok() && cfg.version > v ) {
highest = &cfg;
v = cfg.version;
}
}
assert( highest );
- if( !initFromConfig(*highest) )
+ if( !initFromConfig(*highest) )
return false;
- if( highest->version > myVersion && highest->version >= 0 ) {
+ if( highest->version > myVersion && highest->version >= 0 ) {
log() << "replSet got config version " << highest->version << " from a remote, saving locally" << rsLog;
writelock lk("admin.");
highest->saveConfigLocally(BSONObj());
@@ -430,7 +526,7 @@ namespace mongo {
startupStatusMsg = "loading " + rsConfigNs + " config (LOADINGCONFIG)";
try {
vector<ReplSetConfig> configs;
- try {
+ try {
configs.push_back( ReplSetConfig(HostAndPort::me()) );
}
catch(DBException& e) {
@@ -438,26 +534,26 @@ namespace mongo {
throw;
}
for( vector<HostAndPort>::const_iterator i = _seeds->begin(); i != _seeds->end(); i++ ) {
- try {
+ try {
configs.push_back( ReplSetConfig(*i) );
}
- catch( DBException& e ) {
+ catch( DBException& e ) {
log() << "replSet exception trying to load config from " << *i << " : " << e.toString() << rsLog;
}
}
- if( discoveredSeed ) {
+ if( discoveredSeed ) {
try {
configs.push_back( ReplSetConfig(HostAndPort(*discoveredSeed)) );
}
- catch( DBException& ) {
+ catch( DBException& ) {
log(1) << "replSet exception trying to load config from discovered seed " << *discoveredSeed << rsLog;
}
}
int nok = 0;
int nempty = 0;
- for( vector<ReplSetConfig>::iterator i = configs.begin(); i != configs.end(); i++ ) {
+ for( vector<ReplSetConfig>::iterator i = configs.begin(); i != configs.end(); i++ ) {
if( i->ok() )
nok++;
if( i->empty() )
@@ -469,7 +565,9 @@ namespace mongo {
startupStatus = EMPTYCONFIG;
startupStatusMsg = "can't get " + rsConfigNs + " config from self or any seed (EMPTYCONFIG)";
log() << "replSet can't get " << rsConfigNs << " config from self or any seed (EMPTYCONFIG)" << rsLog;
- log(1) << "replSet have you ran replSetInitiate yet?" << rsLog;
+ static unsigned once;
+ if( ++once == 1 )
+ log() << "replSet info you may need to run replSetInitiate -- rs.initiate() in the shell -- if that is not already done" << rsLog;
if( _seeds->size() == 0 )
log(1) << "replSet info no seed hosts were specified on the --replSet command line" << rsLog;
}
@@ -483,13 +581,13 @@ namespace mongo {
continue;
}
- if( !_loadConfigFinish(configs) ) {
+ if( !_loadConfigFinish(configs) ) {
log() << "replSet info Couldn't load config yet. Sleeping 20sec and will try again." << rsLog;
sleepsecs(20);
continue;
}
}
- catch(DBException& e) {
+ catch(DBException& e) {
startupStatus = BADCONFIG;
startupStatusMsg = "replSet error loading set config (BADCONFIG)";
log() << "replSet error loading configurations " << e.toString() << rsLog;
@@ -504,30 +602,34 @@ namespace mongo {
startupStatus = STARTED;
}
- void ReplSetImpl::_fatal()
- {
+ void ReplSetImpl::_fatal() {
//lock l(this);
box.set(MemberState::RS_FATAL, 0);
//sethbmsg("fatal error");
- log() << "replSet error fatal, stopping replication" << rsLog;
+ log() << "replSet error fatal, stopping replication" << rsLog;
}
- void ReplSet::haveNewConfig(ReplSetConfig& newConfig, bool addComment) {
+ void ReplSet::haveNewConfig(ReplSetConfig& newConfig, bool addComment) {
lock l(this); // convention is to lock replset before taking the db rwlock
writelock lk("");
bo comment;
if( addComment )
comment = BSON( "msg" << "Reconfig set" << "version" << newConfig.version );
newConfig.saveConfigLocally(comment);
- try {
+ try {
initFromConfig(newConfig, true);
log() << "replSet replSetReconfig new config saved locally" << rsLog;
}
- catch(DBException& e) {
+ catch(DBException& e) {
+ if( e.getCode() == 13497 /* removed from set */ ) {
+ cc().shutdown();
+ dbexit( EXIT_CLEAN , "removed from replica set" ); // never returns
+ assert(0);
+ }
log() << "replSet error unexpected exception in haveNewConfig() : " << e.toString() << rsLog;
_fatal();
}
- catch(...) {
+ catch(...) {
log() << "replSet error unexpected exception in haveNewConfig()" << rsLog;
_fatal();
}
@@ -538,30 +640,33 @@ namespace mongo {
ReplSetConfig c(o);
if( c.version > rs->config().version )
theReplSet->haveNewConfig(c, false);
- else {
- log() << "replSet info msgReceivedNewConfig but version isn't higher " <<
- c.version << ' ' << rs->config().version << rsLog;
+ else {
+ log() << "replSet info msgReceivedNewConfig but version isn't higher " <<
+ c.version << ' ' << rs->config().version << rsLog;
}
}
- /* forked as a thread during startup
- it can run quite a while looking for config. but once found,
+ /* forked as a thread during startup
+ it can run quite a while looking for config. but once found,
a separate thread takes over as ReplSetImpl::Manager, and this thread
terminates.
*/
void startReplSets(ReplSetCmdline *replSetCmdline) {
Client::initThread("startReplSets");
- try {
+ try {
assert( theReplSet == 0 );
if( replSetCmdline == 0 ) {
assert(!replSet);
return;
}
+ if( !noauth ) {
+ cc().getAuthenticationInfo()->authorize("local");
+ }
(theReplSet = new ReplSet(*replSetCmdline))->go();
}
- catch(std::exception& e) {
+ catch(std::exception& e) {
log() << "replSet caught exception in startReplSets thread: " << e.what() << rsLog;
- if( theReplSet )
+ if( theReplSet )
theReplSet->fatal();
}
cc().shutdown();
@@ -569,10 +674,9 @@ namespace mongo {
}
-namespace boost {
+namespace boost {
- void assertion_failed(char const * expr, char const * function, char const * file, long line)
- {
+ void assertion_failed(char const * expr, char const * function, char const * file, long line) {
mongo::log() << "boost assertion failure " << expr << ' ' << function << ' ' << file << ' ' << line << endl;
}
diff --git a/db/repl/rs.h b/db/repl/rs.h
index 6c4d9a8..1419ad6 100644
--- a/db/repl/rs.h
+++ b/db/repl/rs.h
@@ -43,6 +43,7 @@ namespace mongo {
class Member : public List1<Member>::Base {
public:
Member(HostAndPort h, unsigned ord, const ReplSetConfig::MemberCfg *c, bool self);
+
string fullName() const { return h().toString(); }
const ReplSetConfig::MemberCfg& config() const { return _config; }
const HeartbeatInfo& hbinfo() const { return _hbinfo; }
@@ -51,10 +52,12 @@ namespace mongo {
MemberState state() const { return _hbinfo.hbstate; }
const HostAndPort& h() const { return _h; }
unsigned id() const { return _hbinfo.id(); }
+
bool potentiallyHot() const { return _config.potentiallyHot(); } // not arbiter, not priority 0
void summarizeMember(stringstream& s) const;
- friend class ReplSetImpl;
+
private:
+ friend class ReplSetImpl;
const ReplSetConfig::MemberCfg _config;
const HostAndPort _h;
HeartbeatInfo _hbinfo;
@@ -65,8 +68,8 @@ namespace mongo {
bool busyWithElectSelf;
int _primary;
- /** @param two - if true two primaries were seen. this can happen transiently, in addition to our
- polling being only occasional. in this case null is returned, but the caller should
+ /** @param two - if true two primaries were seen. this can happen transiently, in addition to our
+ polling being only occasional. in this case null is returned, but the caller should
not assume primary itself in that situation.
*/
const Member* findOtherPrimary(bool& two);
@@ -75,7 +78,7 @@ namespace mongo {
virtual void starting();
public:
Manager(ReplSetImpl *rs);
- ~Manager();
+ virtual ~Manager();
void msgReceivedNewConfig(BSONObj);
void msgCheckNewState();
};
@@ -84,7 +87,7 @@ namespace mongo {
class Consensus {
ReplSetImpl &rs;
- struct LastYea {
+ struct LastYea {
LastYea() : when(0), who(0xffffffff) { }
time_t when;
unsigned who;
@@ -96,12 +99,12 @@ namespace mongo {
bool weAreFreshest(bool& allUp, int& nTies);
bool sleptLast; // slept last elect() pass
public:
- Consensus(ReplSetImpl *t) : rs(*t) {
+ Consensus(ReplSetImpl *t) : rs(*t) {
sleptLast = false;
steppedDown = 0;
}
- /* if we've stepped down, this is when we are allowed to try to elect ourself again.
+ /* if we've stepped down, this is when we are allowed to try to elect ourself again.
todo: handle possible weirdnesses at clock skews etc.
*/
time_t steppedDown;
@@ -115,40 +118,40 @@ namespace mongo {
};
/** most operations on a ReplSet object should be done while locked. that logic implemented here. */
- class RSBase : boost::noncopyable {
+ class RSBase : boost::noncopyable {
public:
const unsigned magic;
void assertValid() { assert( magic == 0x12345677 ); }
private:
- mutex m;
+ mongo::mutex m;
int _locked;
ThreadLocalValue<bool> _lockedByMe;
protected:
RSBase() : magic(0x12345677), m("RSBase"), _locked(0) { }
- ~RSBase() {
+ ~RSBase() {
/* this can happen if we throw in the constructor; otherwise never happens. thus we log it as it is quite unusual. */
log() << "replSet ~RSBase called" << rsLog;
}
- class lock {
+ class lock {
RSBase& rsbase;
auto_ptr<scoped_lock> sl;
public:
- lock(RSBase* b) : rsbase(*b) {
+ lock(RSBase* b) : rsbase(*b) {
if( rsbase._lockedByMe.get() )
return; // recursive is ok...
sl.reset( new scoped_lock(rsbase.m) );
DEV assert(rsbase._locked == 0);
- rsbase._locked++;
+ rsbase._locked++;
rsbase._lockedByMe.set(true);
}
- ~lock() {
+ ~lock() {
if( sl.get() ) {
assert( rsbase._lockedByMe.get() );
DEV assert(rsbase._locked == 1);
rsbase._lockedByMe.set(false);
- rsbase._locked--;
+ rsbase._locked--;
}
}
};
@@ -157,11 +160,11 @@ namespace mongo {
/* for asserts */
bool locked() const { return _locked != 0; }
- /* if true, is locked, and was locked by this thread. note if false, it could be in the lock or not for another
+ /* if true, is locked, and was locked by this thread. note if false, it could be in the lock or not for another
just for asserts & such so we can make the contracts clear on who locks what when.
we don't use these locks that frequently, so the little bit of overhead is fine.
*/
- bool lockedByMe() { return _lockedByMe.get(); }
+ bool lockedByMe() { return _lockedByMe.get(); }
};
class ReplSetHealthPollTask;
@@ -174,19 +177,19 @@ namespace mongo {
MemberState state;
const Member *primary;
};
- const SP get() {
+ const SP get() {
scoped_lock lk(m);
return sp;
}
MemberState getState() const { return sp.state; }
const Member* getPrimary() const { return sp.primary; }
- void change(MemberState s, const Member *self) {
+ void change(MemberState s, const Member *self) {
scoped_lock lk(m);
- if( sp.state != s ) {
+ if( sp.state != s ) {
log() << "replSet " << s.toString() << rsLog;
}
sp.state = s;
- if( s.primary() ) {
+ if( s.primary() ) {
sp.primary = self;
}
else {
@@ -194,17 +197,17 @@ namespace mongo {
sp.primary = 0;
}
}
- void set(MemberState s, const Member *p) {
+ void set(MemberState s, const Member *p) {
scoped_lock lk(m);
sp.state = s; sp.primary = p;
}
void setSelfPrimary(const Member *self) { change(MemberState::RS_PRIMARY, self); }
- void setOtherPrimary(const Member *mem) {
+ void setOtherPrimary(const Member *mem) {
scoped_lock lk(m);
assert( !sp.state.primary() );
sp.primary = mem;
}
- void noteRemoteIsPrimary(const Member *remote) {
+ void noteRemoteIsPrimary(const Member *remote) {
scoped_lock lk(m);
if( !sp.state.secondary() && !sp.state.fatal() )
sp.state = MemberState::RS_RECOVERING;
@@ -212,10 +215,10 @@ namespace mongo {
}
StateBox() : m("StateBox") { }
private:
- mutex m;
+ mongo::mutex m;
SP sp;
};
-
+
void parseReplsetCmdLine(string cfgString, string& setname, vector<HostAndPort>& seeds, set<HostAndPort>& seedSet );
/** Parameter given to the --replSet command line option (parsed).
@@ -230,15 +233,15 @@ namespace mongo {
};
/* information about the entire repl set, such as the various servers in the set, and their state */
- /* note: We currently do not free mem when the set goes away - it is assumed the replset is a
+ /* note: We currently do not free mem when the set goes away - it is assumed the replset is a
singleton and long lived.
*/
class ReplSetImpl : protected RSBase {
public:
/** info on our state if the replset isn't yet "up". for example, if we are pre-initiation. */
- enum StartupStatus {
- PRESTART=0, LOADINGCONFIG=1, BADCONFIG=2, EMPTYCONFIG=3,
- EMPTYUNREACHABLE=4, STARTED=5, SOON=6
+ enum StartupStatus {
+ PRESTART=0, LOADINGCONFIG=1, BADCONFIG=2, EMPTYCONFIG=3,
+ EMPTYUNREACHABLE=4, STARTED=5, SOON=6
};
static StartupStatus startupStatus;
static string startupStatusMsg;
@@ -260,18 +263,21 @@ namespace mongo {
void relinquish();
void forgetPrimary();
protected:
- bool _stepDown();
+ bool _stepDown(int secs);
+ bool _freeze(int secs);
private:
void assumePrimary();
void loadLastOpTimeWritten();
void changeState(MemberState s);
+ const Member* getMemberToSyncTo();
+ void _changeArbiterState();
protected:
// "heartbeat message"
- // sent in requestHeartbeat respond in field "hbm"
+ // sent in requestHeartbeat respond in field "hbm"
char _hbmsg[256]; // we change this unlocked, thus not an stl::string
time_t _hbmsgTime; // when it was logged
public:
- void sethbmsg(string s, int logLevel = 0);
+ void sethbmsg(string s, int logLevel = 0);
protected:
bool initFromConfig(ReplSetConfig& c, bool reconf=false); // true if ok; throws if config really bad; false if config doesn't include self
void _fillIsMaster(BSONObjBuilder&);
@@ -281,7 +287,7 @@ namespace mongo {
MemberState state() const { return box.getState(); }
void _fatal();
void _getOplogDiagsAsHtml(unsigned server_id, stringstream& ss) const;
- void _summarizeAsHtml(stringstream&) const;
+ void _summarizeAsHtml(stringstream&) const;
void _summarizeStatus(BSONObjBuilder&) const; // for replSetGetStatus command
/* throws exception if a problem initializing. */
@@ -295,7 +301,7 @@ namespace mongo {
const vector<HostAndPort> *_seeds;
ReplSetConfig *_cfg;
- /** load our configuration from admin.replset. try seed machines too.
+ /** load our configuration from admin.replset. try seed machines too.
@return true if ok; throws if config really bad; false if config doesn't include self
*/
bool _loadConfigFinish(vector<ReplSetConfig>& v);
@@ -306,7 +312,9 @@ namespace mongo {
bool iAmArbiterOnly() const { return myConfig().arbiterOnly; }
bool iAmPotentiallyHot() const { return myConfig().potentiallyHot(); }
protected:
- Member *_self;
+ Member *_self;
+ bool _buildIndexes; // = _self->config().buildIndexes
+ void setSelfTo(Member *); // use this as it sets buildIndexes var
private:
List1<Member> _members; /* all members of the set EXCEPT self. */
@@ -330,7 +338,7 @@ namespace mongo {
private:
/* pulling data from primary related - see rs_sync.cpp */
- bool initialSyncOplogApplication(string hn, const Member *primary, OpTime applyGTE, OpTime minValid);
+ bool initialSyncOplogApplication(const Member *primary, OpTime applyGTE, OpTime minValid);
void _syncDoInitialSync();
void syncDoInitialSync();
void _syncThread();
@@ -340,21 +348,29 @@ namespace mongo {
unsigned _syncRollback(OplogReader& r);
void syncRollback(OplogReader& r);
void syncFixUp(HowToFixUp& h, OplogReader& r);
+ bool _getOplogReader(OplogReader& r, string& hn);
+ bool _isStale(OplogReader& r, const string& hn);
public:
void syncThread();
};
- class ReplSet : public ReplSetImpl {
+ class ReplSet : public ReplSetImpl {
public:
ReplSet(ReplSetCmdline& replSetCmdline) : ReplSetImpl(replSetCmdline) { }
- bool stepDown() { return _stepDown(); }
+ // for the replSetStepDown command
+ bool stepDown(int secs) { return _stepDown(secs); }
- string selfFullName() {
+ // for the replSetFreeze command
+ bool freeze(int secs) { return _freeze(secs); }
+
+ string selfFullName() {
lock lk(this);
return _self->fullName();
}
+ bool buildIndexes() const { return _buildIndexes; }
+
/* call after constructing to start - returns fairly quickly after la[unching its threads */
void go() { _go(); }
@@ -369,7 +385,7 @@ namespace mongo {
void summarizeStatus(BSONObjBuilder& b) const { _summarizeStatus(b); }
void fillIsMaster(BSONObjBuilder& b) { _fillIsMaster(b); }
- /* we have a new config (reconfig) - apply it.
+ /* we have a new config (reconfig) - apply it.
@param comment write a no-op comment to the oplog about it. only makes sense if one is primary and initiating the reconf.
*/
void haveNewConfig(ReplSetConfig& c, bool comment);
@@ -380,16 +396,16 @@ namespace mongo {
bool lockedByMe() { return RSBase::lockedByMe(); }
// heartbeat msg to send to others; descriptive diagnostic info
- string hbmsg() const {
+ string hbmsg() const {
if( time(0)-_hbmsgTime > 120 ) return "";
- return _hbmsg;
+ return _hbmsg;
}
};
- /** base class for repl set commands. checks basic things such as in rs mode before the command
+ /** base class for repl set commands. checks basic things such as in rs mode before the command
does its real work
*/
- class ReplSetCommand : public Command {
+ class ReplSetCommand : public Command {
protected:
ReplSetCommand(const char * s, bool show=false) : Command(s, show) { }
virtual bool slaveOk() const { return true; }
@@ -398,14 +414,14 @@ namespace mongo {
virtual LockType locktype() const { return NONE; }
virtual void help( stringstream &help ) const { help << "internal"; }
bool check(string& errmsg, BSONObjBuilder& result) {
- if( !replSet ) {
+ if( !replSet ) {
errmsg = "not running with --replSet";
return false;
}
if( theReplSet == 0 ) {
result.append("startupStatus", ReplSet::startupStatus);
errmsg = ReplSet::startupStatusMsg.empty() ? "replset unknown error 2" : ReplSet::startupStatusMsg;
- if( ReplSet::startupStatus == 3 )
+ if( ReplSet::startupStatus == 3 )
result.append("info", "run rs.initiate(...) if not yet done for the set");
return false;
}
@@ -415,9 +431,8 @@ namespace mongo {
/** inlines ----------------- */
- inline Member::Member(HostAndPort h, unsigned ord, const ReplSetConfig::MemberCfg *c, bool self) :
- _config(*c), _h(h), _hbinfo(ord)
- {
+ inline Member::Member(HostAndPort h, unsigned ord, const ReplSetConfig::MemberCfg *c, bool self) :
+ _config(*c), _h(h), _hbinfo(ord) {
if( self )
_hbinfo.health = 1.0;
}
diff --git a/db/repl/rs_config.cpp b/db/repl/rs_config.cpp
index 371507d..5998f51 100644
--- a/db/repl/rs_config.cpp
+++ b/db/repl/rs_config.cpp
@@ -27,11 +27,11 @@
using namespace bson;
-namespace mongo {
+namespace mongo {
void logOpInitiate(const bo&);
- void assertOnlyHas(BSONObj o, const set<string>& fields) {
+ void assertOnlyHas(BSONObj o, const set<string>& fields) {
BSONObj::iterator i(o);
while( i.more() ) {
BSONElement e = i.next();
@@ -41,7 +41,7 @@ namespace mongo {
}
}
- list<HostAndPort> ReplSetConfig::otherMemberHostnames() const {
+ list<HostAndPort> ReplSetConfig::otherMemberHostnames() const {
list<HostAndPort> L;
for( vector<MemberCfg>::const_iterator i = members.begin(); i != members.end(); i++ ) {
if( !i->h.isSelf() )
@@ -49,12 +49,12 @@ namespace mongo {
}
return L;
}
-
+
/* comment MUST only be set when initiating the set by the initiator */
- void ReplSetConfig::saveConfigLocally(bo comment) {
+ void ReplSetConfig::saveConfigLocally(bo comment) {
checkRsConfig();
log() << "replSet info saving a newer config version to local.system.replset" << rsLog;
- {
+ {
writelock lk("");
Client::Context cx( rsConfigNs );
cx.db()->flushFiles(true);
@@ -70,21 +70,21 @@ namespace mongo {
}
DEV log() << "replSet saveConfigLocally done" << rsLog;
}
-
- /*static*/
- /*void ReplSetConfig::receivedNewConfig(BSONObj cfg) {
+
+ /*static*/
+ /*void ReplSetConfig::receivedNewConfig(BSONObj cfg) {
if( theReplSet )
return; // this is for initial setup only, so far. todo
ReplSetConfig c(cfg);
writelock lk("admin.");
- if( theReplSet )
+ if( theReplSet )
return;
c.saveConfigLocally(bo());
}*/
- bo ReplSetConfig::MemberCfg::asBson() const {
+ bo ReplSetConfig::MemberCfg::asBson() const {
bob b;
b << "_id" << _id;
b.append("host", h.toString());
@@ -93,18 +93,28 @@ namespace mongo {
if( arbiterOnly ) b << "arbiterOnly" << true;
if( slaveDelay ) b << "slaveDelay" << slaveDelay;
if( hidden ) b << "hidden" << hidden;
+ if( !buildIndexes ) b << "buildIndexes" << buildIndexes;
+ if( !tags.empty() ) {
+ BSONArrayBuilder a;
+ for( set<string>::const_iterator i = tags.begin(); i != tags.end(); i++ )
+ a.append(*i);
+ b.appendArray("tags", a.done());
+ }
+ if( !initialSync.isEmpty() ) {
+ b << "initialSync" << initialSync;
+ }
return b.obj();
}
- bo ReplSetConfig::asBson() const {
+ bo ReplSetConfig::asBson() const {
bob b;
b.append("_id", _id).append("version", version);
if( !ho.isDefault() || !getLastErrorDefaults.isEmpty() ) {
bob settings;
if( !ho.isDefault() )
- settings << "heartbeatConnRetries " << ho.heartbeatConnRetries <<
- "heartbeatSleep" << ho.heartbeatSleepMillis / 1000 <<
- "heartbeatTimeout" << ho.heartbeatTimeoutMillis / 1000;
+ settings << "heartbeatConnRetries " << ho.heartbeatConnRetries <<
+ "heartbeatSleep" << ho.heartbeatSleepMillis / 1000.0 <<
+ "heartbeatTimeout" << ho.heartbeatTimeoutMillis / 1000.0;
if( !getLastErrorDefaults.isEmpty() )
settings << "getLastErrorDefaults" << getLastErrorDefaults;
b << "settings" << settings.obj();
@@ -122,7 +132,7 @@ namespace mongo {
uassert(13126, "bad Member config", expr);
}
- void ReplSetConfig::MemberCfg::check() const{
+ void ReplSetConfig::MemberCfg::check() const {
mchk(_id >= 0 && _id <= 255);
mchk(priority >= 0 && priority <= 1000);
mchk(votes >= 0 && votes <= 100);
@@ -130,41 +140,80 @@ namespace mongo {
uassert(13437, "slaveDelay requires priority be zero", slaveDelay == 0 || priority == 0);
uassert(13438, "bad slaveDelay value", slaveDelay >= 0 && slaveDelay <= 3600 * 24 * 366);
uassert(13439, "priority must be 0 when hidden=true", priority == 0 || !hidden);
+ uassert(13477, "priority must be 0 when buildIndexes=false", buildIndexes || priority == 0);
+
+ if (!initialSync.isEmpty()) {
+ static const string legal[] = {"state", "name", "_id","optime"};
+ static const set<string> legals(legal, legal + 4);
+ assertOnlyHas(initialSync, legals);
+
+ if (initialSync.hasElement("state")) {
+ uassert(13525, "initialSync source state must be 1 or 2",
+ initialSync["state"].isNumber() &&
+ (initialSync["state"].Number() == 1 ||
+ initialSync["state"].Number() == 2));
+ }
+ if (initialSync.hasElement("name")) {
+ uassert(13526, "initialSync source name must be a string",
+ initialSync["name"].type() == mongo::String);
+ }
+ if (initialSync.hasElement("_id")) {
+ uassert(13527, "initialSync source _id must be a number",
+ initialSync["_id"].isNumber());
+ }
+ if (initialSync.hasElement("optime")) {
+ uassert(13528, "initialSync source optime must be a timestamp",
+ initialSync["optime"].type() == mongo::Timestamp ||
+ initialSync["optime"].type() == mongo::Date);
+ }
+ }
}
/** @param o old config
- @param n new config
+ @param n new config
*/
- /*static*/ bool ReplSetConfig::legalChange(const ReplSetConfig& o, const ReplSetConfig& n, string& errmsg) {
+ /*static*/
+ bool ReplSetConfig::legalChange(const ReplSetConfig& o, const ReplSetConfig& n, string& errmsg) {
assert( theReplSet );
- if( o._id != n._id ) {
- errmsg = "set name may not change";
+ if( o._id != n._id ) {
+ errmsg = "set name may not change";
return false;
}
/* TODO : wonder if we need to allow o.version < n.version only, which is more lenient.
- if someone had some intermediate config this node doesnt have, that could be
+ if someone had some intermediate config this node doesnt have, that could be
necessary. but then how did we become primary? so perhaps we are fine as-is.
*/
- if( o.version + 1 != n.version ) {
+ if( o.version + 1 != n.version ) {
errmsg = "version number wrong";
return false;
}
map<HostAndPort,const ReplSetConfig::MemberCfg*> old;
- for( vector<ReplSetConfig::MemberCfg>::const_iterator i = o.members.begin(); i != o.members.end(); i++ ) {
+ for( vector<ReplSetConfig::MemberCfg>::const_iterator i = o.members.begin(); i != o.members.end(); i++ ) {
old[i->h] = &(*i);
}
int me = 0;
- for( vector<ReplSetConfig::MemberCfg>::const_iterator i = n.members.begin(); i != n.members.end(); i++ ) {
+ for( vector<ReplSetConfig::MemberCfg>::const_iterator i = n.members.begin(); i != n.members.end(); i++ ) {
const ReplSetConfig::MemberCfg& m = *i;
- if( old.count(m.h) ) {
- if( old[m.h]->_id != m._id ) {
+ if( old.count(m.h) ) {
+ const ReplSetConfig::MemberCfg& oldCfg = *old[m.h];
+ if( oldCfg._id != m._id ) {
log() << "replSet reconfig error with member: " << m.h.toString() << rsLog;
uasserted(13432, "_id may not change for members");
}
+ if( oldCfg.buildIndexes != m.buildIndexes ) {
+ log() << "replSet reconfig error with member: " << m.h.toString() << rsLog;
+ uasserted(13476, "buildIndexes may not change for members");
+ }
+ /* are transitions to and from arbiterOnly guaranteed safe? if not, we should disallow here.
+ there is a test at replsets/replsetarb3.js */
+ if( oldCfg.arbiterOnly != m.arbiterOnly ) {
+ log() << "replSet reconfig error with member: " << m.h.toString() << " arbiterOnly cannot change. remove and readd the member instead " << rsLog;
+ uasserted(13510, "arbiterOnly may not change for members");
+ }
}
- if( m.h.isSelf() )
+ if( m.h.isSelf() )
me++;
}
@@ -172,24 +221,33 @@ namespace mongo {
/* TODO : MORE CHECKS HERE */
- log() << "replSet TODO : don't allow removal of a node until we handle it at the removed node end?" << endl;
+ DEV log() << "replSet TODO : don't allow removal of a node until we handle it at the removed node end?" << endl;
// we could change its votes to zero perhaps instead as a short term...
return true;
}
- void ReplSetConfig::clear() {
+ void ReplSetConfig::clear() {
version = -5;
_ok = false;
}
- void ReplSetConfig::checkRsConfig() const {
+ void ReplSetConfig::checkRsConfig() const {
uassert(13132,
- "nonmatching repl set name in _id field; check --replSet command line",
- _id == cmdLine.ourSetName());
+ "nonmatching repl set name in _id field; check --replSet command line",
+ _id == cmdLine.ourSetName());
uassert(13308, "replSet bad config version #", version > 0);
uassert(13133, "replSet bad config no members", members.size() >= 1);
- uassert(13309, "replSet bad config maximum number of members is 7 (for now)", members.size() <= 7);
+ uassert(13309, "replSet bad config maximum number of members is 12", members.size() <= 12);
+ {
+ unsigned voters = 0;
+ for( vector<MemberCfg>::const_iterator i = members.begin(); i != members.end(); ++i ) {
+ if( i->votes )
+ voters++;
+ }
+ uassert(13612, "replSet bad config maximum number of voting members is 7", voters <= 7);
+ uassert(13613, "replSet bad config no voting members", voters > 0);
+ }
}
void ReplSetConfig::from(BSONObj o) {
@@ -213,7 +271,8 @@ namespace mongo {
if( settings["heartbeatTimeout"].ok() )
ho.heartbeatTimeoutMillis = (unsigned) (settings["heartbeatTimeout"].Number() * 1000);
ho.check();
- try { getLastErrorDefaults = settings["getLastErrorDefaults"].Obj().copy(); } catch(...) { }
+ try { getLastErrorDefaults = settings["getLastErrorDefaults"].Obj().copy(); }
+ catch(...) { }
}
set<string> hosts;
@@ -231,43 +290,57 @@ namespace mongo {
BSONObj mobj = members[i].Obj();
MemberCfg m;
try {
- static const string legal[] = {"_id","votes","priority","host","hidden","slaveDelay","arbiterOnly"};
- static const set<string> legals(legal, legal + 7);
+ static const string legal[] = {
+ "_id","votes","priority","host", "hidden","slaveDelay",
+ "arbiterOnly","buildIndexes","tags","initialSync"
+ };
+ static const set<string> legals(legal, legal + 10);
assertOnlyHas(mobj, legals);
- try {
+ try {
m._id = (int) mobj["_id"].Number();
- } catch(...) {
+ }
+ catch(...) {
/* TODO: use of string exceptions may be problematic for reconfig case! */
- throw "_id must be numeric";
+ throw "_id must be numeric";
}
string s;
try {
s = mobj["host"].String();
m.h = HostAndPort(s);
}
- catch(...) {
+ catch(...) {
throw string("bad or missing host field? ") + mobj.toString();
}
- if( m.h.isLocalHost() )
+ if( m.h.isLocalHost() )
localhosts++;
m.arbiterOnly = mobj.getBoolField("arbiterOnly");
m.slaveDelay = mobj["slaveDelay"].numberInt();
if( mobj.hasElement("hidden") )
m.hidden = mobj.getBoolField("hidden");
+ if( mobj.hasElement("buildIndexes") )
+ m.buildIndexes = mobj.getBoolField("buildIndexes");
if( mobj.hasElement("priority") )
m.priority = mobj["priority"].Number();
if( mobj.hasElement("votes") )
m.votes = (unsigned) mobj["votes"].Number();
+ if( mobj.hasElement("tags") ) {
+ vector<BSONElement> v = mobj["tags"].Array();
+ for( unsigned i = 0; i < v.size(); i++ )
+ m.tags.insert( v[i].String() );
+ }
+ if( mobj.hasElement("initialSync")) {
+ m.initialSync = mobj["initialSync"].Obj().getOwned();
+ }
m.check();
}
- catch( const char * p ) {
+ catch( const char * p ) {
log() << "replSet cfg parsing exception for members[" << i << "] " << p << rsLog;
stringstream ss;
ss << "replSet members[" << i << "] " << p;
uassert(13107, ss.str(), false);
}
- catch(DBException& e) {
+ catch(DBException& e) {
log() << "replSet cfg parsing exception for members[" << i << "] " << e.what() << rsLog;
stringstream ss;
ss << "bad config for member[" << i << "] " << e.what();
@@ -289,7 +362,7 @@ namespace mongo {
uassert(13122, "bad repl set config?", expr);
}
- ReplSetConfig::ReplSetConfig(BSONObj cfg) {
+ ReplSetConfig::ReplSetConfig(BSONObj cfg) {
clear();
from(cfg);
configAssert( version < 0 /*unspecified*/ || (version >= 1 && version <= 5000) );
@@ -315,18 +388,19 @@ namespace mongo {
BSONObj cmd = BSON( "replSetHeartbeat" << setname );
int theirVersion;
BSONObj info;
+ log() << "trying to contact " << h.toString() << rsLog;
bool ok = requestHeartbeat(setname, "", h.toString(), info, -2, theirVersion);
- if( info["rs"].trueValue() ) {
+ if( info["rs"].trueValue() ) {
// yes, it is a replicate set, although perhaps not yet initialized
}
else {
if( !ok ) {
log() << "replSet TEMP !ok heartbeating " << h.toString() << " on cfg load" << rsLog;
- if( !info.isEmpty() )
+ if( !info.isEmpty() )
log() << "replSet info " << h.toString() << " : " << info.toString() << rsLog;
return;
}
- {
+ {
stringstream ss;
ss << "replSet error: member " << h.toString() << " is not in --replSet mode";
msgassertedNoTrace(13260, ss.str().c_str()); // not caught as not a user exception - we want it not caught
@@ -343,7 +417,7 @@ namespace mongo {
cfg = conn.findOne(rsConfigNs, Query()).getOwned();
count = conn.count(rsConfigNs);
}
- catch ( DBException& e) {
+ catch ( DBException& ) {
if ( !h.isSelf() ) {
throw;
}
@@ -356,14 +430,14 @@ namespace mongo {
if( count > 1 )
uasserted(13109, str::stream() << "multiple rows in " << rsConfigNs << " not supported host: " << h.toString());
-
+
if( cfg.isEmpty() ) {
version = EMPTYCONFIG;
return;
}
version = -1;
}
- catch( DBException& e) {
+ catch( DBException& e) {
version = v;
log(level) << "replSet load config couldn't get from " << h.toString() << ' ' << e.what() << rsLog;
return;
diff --git a/db/repl/rs_config.h b/db/repl/rs_config.h
index e39dad7..7d43fe6 100644
--- a/db/repl/rs_config.h
+++ b/db/repl/rs_config.h
@@ -23,7 +23,7 @@
#include "../../util/hostandport.h"
#include "health.h"
-namespace mongo {
+namespace mongo {
/* singleton config object is stored here */
const string rsConfigNs = "local.system.replset";
@@ -31,7 +31,7 @@ namespace mongo {
class ReplSetConfig {
enum { EMPTYCONFIG = -2 };
public:
- /* if something is misconfigured, throws an exception.
+ /* if something is misconfigured, throws an exception.
if couldn't be queried or is just blank, ok() will be false.
*/
ReplSetConfig(const HostAndPort& h);
@@ -41,7 +41,7 @@ namespace mongo {
bool ok() const { return _ok; }
struct MemberCfg {
- MemberCfg() : _id(-1), votes(1), priority(1.0), arbiterOnly(false), slaveDelay(0), hidden(false) { }
+ MemberCfg() : _id(-1), votes(1), priority(1.0), arbiterOnly(false), slaveDelay(0), hidden(false), buildIndexes(true) { }
int _id; /* ordinal */
unsigned votes; /* how many votes this node gets. default 1. */
HostAndPort h;
@@ -49,15 +49,17 @@ namespace mongo {
bool arbiterOnly;
int slaveDelay; /* seconds. int rather than unsigned for convenient to/front bson conversion. */
bool hidden; /* if set, don't advertise to drives in isMaster. for non-primaries (priority 0) */
+ bool buildIndexes; /* if false, do not create any non-_id indexes */
+ set<string> tags; /* tagging for data center, rack, etc. */
+ BSONObj initialSync; /* directions for initial sync source */
void check() const; /* check validity, assert if not. */
BSONObj asBson() const;
- bool potentiallyHot() const {
- return !arbiterOnly && priority > 0;
- }
- bool operator==(const MemberCfg& r) const {
- return _id==r._id && votes == r.votes && h == r.h && priority == r.priority &&
- arbiterOnly == r.arbiterOnly && slaveDelay == r.slaveDelay && hidden == r.hidden;
+ bool potentiallyHot() const { return !arbiterOnly && priority > 0; }
+ bool operator==(const MemberCfg& r) const {
+ return _id==r._id && votes == r.votes && h == r.h && priority == r.priority &&
+ arbiterOnly == r.arbiterOnly && slaveDelay == r.slaveDelay && hidden == r.hidden &&
+ buildIndexes == buildIndexes;
}
bool operator!=(const MemberCfg& r) const { return !(*this == r); }
};
diff --git a/db/repl/rs_exception.h b/db/repl/rs_exception.h
index e71cad2..fc372fc 100755..100644
--- a/db/repl/rs_exception.h
+++ b/db/repl/rs_exception.h
@@ -1,15 +1,15 @@
-// @file rs_exception.h
-
-#pragma once
-
-namespace mongo {
-
- class VoteException : public std::exception {
+// @file rs_exception.h
+
+#pragma once
+
+namespace mongo {
+
+ class VoteException : public std::exception {
public:
- const char * what() const throw () { return "VoteException"; }
+ const char * what() const throw () { return "VoteException"; }
};
- class RetryAfterSleepException : public std::exception {
+ class RetryAfterSleepException : public std::exception {
public:
const char * what() const throw () { return "RetryAfterSleepException"; }
};
diff --git a/db/repl/rs_initialsync.cpp b/db/repl/rs_initialsync.cpp
index 3851c66..5a54059 100644
--- a/db/repl/rs_initialsync.cpp
+++ b/db/repl/rs_initialsync.cpp
@@ -15,6 +15,7 @@
*/
#include "pch.h"
+#include "../repl.h"
#include "../client.h"
#include "../../client/dbclient.h"
#include "rs.h"
@@ -33,15 +34,17 @@ namespace mongo {
// add try/catch with sleep
- void isyncassert(const char *msg, bool expr) {
- if( !expr ) {
+ void isyncassert(const char *msg, bool expr) {
+ if( !expr ) {
string m = str::stream() << "initial sync " << msg;
theReplSet->sethbmsg(m, 0);
uasserted(13404, m);
}
}
- void ReplSetImpl::syncDoInitialSync() {
+ void ReplSetImpl::syncDoInitialSync() {
+ createOplog();
+
while( 1 ) {
try {
_syncDoInitialSync();
@@ -54,14 +57,14 @@ namespace mongo {
}
}
- bool cloneFrom(const char *masterHost, string& errmsg, const string& fromdb, bool logForReplication,
- bool slaveOk, bool useReplAuth, bool snapshot);
+ bool cloneFrom(const char *masterHost, string& errmsg, const string& fromdb, bool logForReplication,
+ bool slaveOk, bool useReplAuth, bool snapshot);
/* todo : progress metering to sethbmsg. */
static bool clone(const char *master, string db) {
string err;
- return cloneFrom(master, err, db, false,
- /*slaveok later can be true*/ false, true, false);
+ return cloneFrom(master, err, db, false,
+ /* slave_ok */ true, true, false);
}
void _logOpObjRS(const BSONObj& op);
@@ -71,11 +74,11 @@ namespace mongo {
static void emptyOplog() {
writelock lk(rsoplog);
Client::Context ctx(rsoplog);
- NamespaceDetails *d = nsdetails(rsoplog);
+ NamespaceDetails *d = nsdetails(rsoplog);
- // temp
- if( d && d->nrecords == 0 )
- return; // already empty, ok.
+ // temp
+ if( d && d->stats.nrecords == 0 )
+ return; // already empty, ok.
log(1) << "replSet empty oplog" << rsLog;
d->emptyCappedCollection(rsoplog);
@@ -84,10 +87,10 @@ namespace mongo {
string errmsg;
bob res;
dropCollection(rsoplog, errmsg, res);
- log() << "replSet recreated oplog so it is empty. todo optimize this..." << rsLog;
- createOplog();*/
+ log() << "replSet recreated oplog so it is empty. todo optimize this..." << rsLog;
+ createOplog();*/
- // TEMP: restart to recreate empty oplog
+ // TEMP: restart to recreate empty oplog
//log() << "replSet FATAL error during initial sync. mongod restart required." << rsLog;
//dbexit( EXIT_CLEAN );
@@ -100,106 +103,182 @@ namespace mongo {
*/
}
- void ReplSetImpl::_syncDoInitialSync() {
- sethbmsg("initial sync pending",0);
+ /**
+ * Choose a member to sync from.
+ *
+ * The initalSync option is an object with 1 k/v pair:
+ *
+ * "state" : 1|2
+ * "name" : "host"
+ * "_id" : N
+ * "optime" : t
+ *
+ * All except optime are exact matches. "optime" will find a secondary with
+ * an optime >= to the optime given.
+ */
+ const Member* ReplSetImpl::getMemberToSyncTo() {
+ BSONObj sync = myConfig().initialSync;
+ bool secondaryOnly = false, isOpTime = false;
+ char *name = 0;
+ int id = -1;
+ OpTime optime;
StateBox::SP sp = box.get();
assert( !sp.state.primary() ); // wouldn't make sense if we were.
- const Member *cp = sp.primary;
- if( cp == 0 ) {
- sethbmsg("initial sync need a member to be primary",0);
+ // if it exists, we've already checked that these fields are valid in
+ // rs_config.cpp
+ if ( !sync.isEmpty() ) {
+ if (sync.hasElement("state")) {
+ if (sync["state"].Number() == 1) {
+ if (sp.primary) {
+ sethbmsg( str::stream() << "syncing to primary: " << sp.primary->fullName(), 0);
+ return const_cast<Member*>(sp.primary);
+ }
+ else {
+ sethbmsg("couldn't clone from primary");
+ return NULL;
+ }
+ }
+ else {
+ secondaryOnly = true;
+ }
+ }
+ if (sync.hasElement("name")) {
+ name = (char*)sync["name"].valuestr();
+ }
+ if (sync.hasElement("_id")) {
+ id = (int)sync["_id"].Number();
+ }
+ if (sync.hasElement("optime")) {
+ isOpTime = true;
+ optime = sync["optime"]._opTime();
+ }
+ }
+
+ for( Member *m = head(); m; m = m->next() ) {
+ if (!m->hbinfo().up() ||
+ (m->state() != MemberState::RS_SECONDARY &&
+ m->state() != MemberState::RS_PRIMARY) ||
+ (secondaryOnly && m->state() != MemberState::RS_SECONDARY) ||
+ (id != -1 && (int)m->id() != id) ||
+ (name != 0 && strcmp(name, m->fullName().c_str()) != 0) ||
+ (isOpTime && optime >= m->hbinfo().opTime)) {
+ continue;
+ }
+
+ sethbmsg( str::stream() << "syncing to: " << m->fullName(), 0);
+ return const_cast<Member*>(m);
+ }
+
+ sethbmsg( str::stream() << "couldn't find a member matching the sync criteria: " <<
+ "\nstate? " << (secondaryOnly ? "2" : "none") <<
+ "\nname? " << (name ? name : "none") <<
+ "\n_id? " << id <<
+ "\noptime? " << optime.toStringPretty() );
+
+ return NULL;
+ }
+
+ /**
+ * Do the initial sync for this member.
+ */
+ void ReplSetImpl::_syncDoInitialSync() {
+ sethbmsg("initial sync pending",0);
+
+ const Member *source = getMemberToSyncTo();
+ if (!source) {
+ sethbmsg("initial sync need a member to be primary or secondary to do our initial sync", 0);
sleepsecs(15);
return;
}
- string masterHostname = cp->h().toString();
+ string sourceHostname = source->h().toString();
OplogReader r;
- if( !r.connect(masterHostname) ) {
- sethbmsg( str::stream() << "initial sync couldn't connect to " << cp->h().toString() , 0);
+ if( !r.connect(sourceHostname) ) {
+ sethbmsg( str::stream() << "initial sync couldn't connect to " << source->h().toString() , 0);
sleepsecs(15);
return;
}
BSONObj lastOp = r.getLastOp(rsoplog);
- if( lastOp.isEmpty() ) {
+ if( lastOp.isEmpty() ) {
sethbmsg("initial sync couldn't read remote oplog", 0);
sleepsecs(15);
return;
}
OpTime startingTS = lastOp["ts"]._opTime();
-
- {
- /* make sure things aren't too flappy */
- sleepsecs(5);
- isyncassert( "flapping?", box.getPrimary() == cp );
- BSONObj o = r.getLastOp(rsoplog);
- isyncassert( "flapping [2]?", !o.isEmpty() );
- }
-
- sethbmsg("initial sync drop all databases", 0);
- dropAllDatabasesExceptLocal();
-// sethbmsg("initial sync drop oplog", 0);
-// emptyOplog();
-
- list<string> dbs = r.conn()->getDatabaseNames();
- for( list<string>::iterator i = dbs.begin(); i != dbs.end(); i++ ) {
- string db = *i;
- if( db != "local" ) {
- sethbmsg( str::stream() << "initial sync cloning db: " << db , 0);
- bool ok;
- {
- writelock lk(db);
- Client::Context ctx(db);
- ok = clone(masterHostname.c_str(), db);
- }
- if( !ok ) {
- sethbmsg( str::stream() << "initial sync error clone of " << db << " failed sleeping 5 minutes" ,0);
- sleepsecs(300);
- return;
+ if (replSettings.fastsync) {
+ log() << "fastsync: skipping database clone" << rsLog;
+ }
+ else {
+ sethbmsg("initial sync drop all databases", 0);
+ dropAllDatabasesExceptLocal();
+
+ sethbmsg("initial sync clone all databases", 0);
+
+ list<string> dbs = r.conn()->getDatabaseNames();
+ for( list<string>::iterator i = dbs.begin(); i != dbs.end(); i++ ) {
+ string db = *i;
+ if( db != "local" ) {
+ sethbmsg( str::stream() << "initial sync cloning db: " << db , 0);
+ bool ok;
+ {
+ writelock lk(db);
+ Client::Context ctx(db);
+ ok = clone(sourceHostname.c_str(), db);
+ }
+ if( !ok ) {
+ sethbmsg( str::stream() << "initial sync error clone of " << db << " failed sleeping 5 minutes" ,0);
+ sleepsecs(300);
+ return;
+ }
}
}
}
sethbmsg("initial sync query minValid",0);
- /* our cloned copy will be strange until we apply oplog events that occurred
+ isyncassert( "initial sync source must remain readable throughout our initial sync", source->state().readable() );
+
+ /* our cloned copy will be strange until we apply oplog events that occurred
through the process. we note that time point here. */
BSONObj minValid = r.getLastOp(rsoplog);
- assert( !minValid.isEmpty() );
+ isyncassert( "getLastOp is empty ", !minValid.isEmpty() );
OpTime mvoptime = minValid["ts"]._opTime();
assert( !mvoptime.isNull() );
- /* copy the oplog
+ /* apply relevant portion of the oplog
*/
{
- sethbmsg("initial sync copy+apply oplog");
- if( ! initialSyncOplogApplication(masterHostname, cp, startingTS, mvoptime) ) { // note we assume here that this call does not throw
+ sethbmsg("initial sync initial oplog application");
+ isyncassert( "initial sync source must remain readable throughout our initial sync [2]", source->state().readable() );
+ if( ! initialSyncOplogApplication(source, /*applyGTE*/startingTS, /*minValid*/mvoptime) ) { // note we assume here that this call does not throw
log() << "replSet initial sync failed during applyoplog" << rsLog;
emptyOplog(); // otherwise we'll be up!
- lastOpTimeWritten = OpTime();
- lastH = 0;
+ lastOpTimeWritten = OpTime();
+ lastH = 0;
log() << "replSet cleaning up [1]" << rsLog;
{
writelock lk("local.");
Client::Context cx( "local." );
- cx.db()->flushFiles(true);
+ cx.db()->flushFiles(true);
}
log() << "replSet cleaning up [2]" << rsLog;
- sleepsecs(2);
+ sleepsecs(5);
return;
}
}
sethbmsg("initial sync finishing up",0);
-
+
assert( !box.getState().primary() ); // wouldn't make sense if we were.
{
writelock lk("local.");
Client::Context cx( "local." );
- cx.db()->flushFiles(true);
+ cx.db()->flushFiles(true);
try {
log() << "replSet set minValid=" << minValid["ts"]._opTime().toString() << rsLog;
}
diff --git a/db/repl/rs_initiate.cpp b/db/repl/rs_initiate.cpp
index 9c74be0..cf1941f 100644
--- a/db/repl/rs_initiate.cpp
+++ b/db/repl/rs_initiate.cpp
@@ -26,47 +26,63 @@
#include "rs.h"
#include "rs_config.h"
#include "../dbhelpers.h"
+#include "../oplog.h"
using namespace bson;
using namespace mongoutils;
-namespace mongo {
+namespace mongo {
/* called on a reconfig AND on initiate
- throws
+ throws
@param initial true when initiating
*/
void checkMembersUpForConfigChange(const ReplSetConfig& cfg, bool initial) {
int failures = 0;
int me = 0;
+ stringstream selfs;
for( vector<ReplSetConfig::MemberCfg>::const_iterator i = cfg.members.begin(); i != cfg.members.end(); i++ ) {
if( i->h.isSelf() ) {
me++;
- if( !i->potentiallyHot() ) {
+ if( me > 1 )
+ selfs << ',';
+ selfs << i->h.toString();
+ if( !i->potentiallyHot() ) {
uasserted(13420, "initiation and reconfiguration of a replica set must be sent to a node that can become primary");
}
}
}
- uassert(13278, "bad config - dups?", me <= 1); // dups?
- uassert(13279, "can't find self in the replset config", me == 1);
+ uassert(13278, "bad config: isSelf is true for multiple hosts: " + selfs.str(), me <= 1); // dups?
+ if( me != 1 ) {
+ stringstream ss;
+ ss << "can't find self in the replset config";
+ if( !cmdLine.isDefaultPort() ) ss << " my port: " << cmdLine.port;
+ if( me != 0 ) ss << " found: " << me;
+ uasserted(13279, ss.str());
+ }
for( vector<ReplSetConfig::MemberCfg>::const_iterator i = cfg.members.begin(); i != cfg.members.end(); i++ ) {
+ // we know we're up
+ if (i->h.isSelf()) {
+ continue;
+ }
+
BSONObj res;
{
bool ok = false;
try {
int theirVersion = -1000;
- ok = requestHeartbeat(cfg._id, "", i->h.toString(), res, -1, theirVersion, initial/*check if empty*/);
- if( theirVersion >= cfg.version ) {
+ ok = requestHeartbeat(cfg._id, "", i->h.toString(), res, -1, theirVersion, initial/*check if empty*/);
+ if( theirVersion >= cfg.version ) {
stringstream ss;
ss << "replSet member " << i->h.toString() << " has too new a config version (" << theirVersion << ") to reconfigure";
uasserted(13259, ss.str());
}
}
- catch(DBException& e) {
+ catch(DBException& e) {
log() << "replSet cmufcc requestHeartbeat " << i->h.toString() << " : " << e.toString() << rsLog;
}
- catch(...) {
+ catch(...) {
log() << "replSet cmufcc error exception in requestHeartbeat?" << rsLog;
}
if( res.getBoolField("mismatch") )
@@ -96,7 +112,7 @@ namespace mongo {
trying to keep change small as release is near.
*/
const Member* m = theReplSet->findById( i->_id );
- if( m ) {
+ if( m ) {
// ok, so this was an existing member (wouldn't make sense to add to config a new member that is down)
assert( m->h().toString() == i->h.toString() );
allowFailure = true;
@@ -113,24 +129,24 @@ namespace mongo {
}
if( initial ) {
bool hasData = res["hasData"].Bool();
- uassert(13311, "member " + i->h.toString() + " has data already, cannot initiate set. All members except initiator must be empty.",
- !hasData || i->h.isSelf());
+ uassert(13311, "member " + i->h.toString() + " has data already, cannot initiate set. All members except initiator must be empty.",
+ !hasData || i->h.isSelf());
}
}
}
- class CmdReplSetInitiate : public ReplSetCommand {
+ class CmdReplSetInitiate : public ReplSetCommand {
public:
virtual LockType locktype() const { return NONE; }
CmdReplSetInitiate() : ReplSetCommand("replSetInitiate") { }
- virtual void help(stringstream& h) const {
- h << "Initiate/christen a replica set.";
+ virtual void help(stringstream& h) const {
+ h << "Initiate/christen a replica set.";
h << "\nhttp://www.mongodb.org/display/DOCS/Replica+Set+Commands";
}
virtual bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) {
log() << "replSet replSetInitiate admin command received from client" << rsLog;
- if( !replSet ) {
+ if( !replSet ) {
errmsg = "server is not running with --replSet";
return false;
}
@@ -141,12 +157,12 @@ namespace mongo {
}
{
- // just make sure we can get a write lock before doing anything else. we'll reacquire one
- // later. of course it could be stuck then, but this check lowers the risk if weird things
+ // just make sure we can get a write lock before doing anything else. we'll reacquire one
+ // later. of course it could be stuck then, but this check lowers the risk if weird things
// are up.
time_t t = time(0);
writelock lk("");
- if( time(0)-t > 10 ) {
+ if( time(0)-t > 10 ) {
errmsg = "took a long time to get write lock, so not initiating. Initiate when server less busy?";
return false;
}
@@ -155,7 +171,7 @@ namespace mongo {
it is ok if the initiating member has *other* data than that.
*/
BSONObj o;
- if( Helpers::getFirst(rsoplog, o) ) {
+ if( Helpers::getFirst(rsoplog, o) ) {
errmsg = rsoplog + string(" is not empty on the initiating member. cannot initiate.");
return false;
}
@@ -194,7 +210,7 @@ namespace mongo {
configObj = b.obj();
log() << "replSet created this configuration for initiation : " << configObj.toString() << rsLog;
}
- else {
+ else {
configObj = cmdObj["replSetInitiate"].Obj();
}
@@ -203,7 +219,7 @@ namespace mongo {
ReplSetConfig newConfig(configObj);
parsed = true;
- if( newConfig.version > 1 ) {
+ if( newConfig.version > 1 ) {
errmsg = "can't initiate with a version number greater than 1";
return false;
}
@@ -214,6 +230,8 @@ namespace mongo {
log() << "replSet replSetInitiate all members seem up" << rsLog;
+ createOplog();
+
writelock lk("");
bo comment = BSON( "msg" << "initiating set");
newConfig.saveConfigLocally(comment);
@@ -222,9 +240,9 @@ namespace mongo {
ReplSet::startupStatus = ReplSet::SOON;
ReplSet::startupStatusMsg = "Received replSetInitiate - should come online shortly.";
}
- catch( DBException& e ) {
+ catch( DBException& e ) {
log() << "replSet replSetInitiate exception: " << e.what() << rsLog;
- if( !parsed )
+ if( !parsed )
errmsg = string("couldn't parse cfg object ") + e.what();
else
errmsg = string("couldn't initiate : ") + e.what();
diff --git a/db/repl/rs_member.h b/db/repl/rs_member.h
index 099cb22..017b6ea 100644
--- a/db/repl/rs_member.h
+++ b/db/repl/rs_member.h
@@ -30,18 +30,18 @@ namespace mongo {
RS_FATAL something bad has occurred and server is not completely offline with regard to the replica set. fatal error.
RS_STARTUP2 loaded config, still determining who is primary
*/
- struct MemberState {
- enum MS {
- RS_STARTUP,
- RS_PRIMARY,
- RS_SECONDARY,
- RS_RECOVERING,
- RS_FATAL,
- RS_STARTUP2,
- RS_UNKNOWN, /* remote node not yet reached */
- RS_ARBITER,
- RS_DOWN, /* node not reachable for a report */
- RS_ROLLBACK
+ struct MemberState {
+ enum MS {
+ RS_STARTUP = 0,
+ RS_PRIMARY = 1,
+ RS_SECONDARY = 2,
+ RS_RECOVERING = 3,
+ RS_FATAL = 4,
+ RS_STARTUP2 = 5,
+ RS_UNKNOWN = 6, /* remote node not yet reached */
+ RS_ARBITER = 7,
+ RS_DOWN = 8, /* node not reachable for a report */
+ RS_ROLLBACK = 9
} s;
MemberState(MS ms = RS_UNKNOWN) : s(ms) { }
@@ -53,6 +53,7 @@ namespace mongo {
bool startup2() const { return s == RS_STARTUP2; }
bool fatal() const { return s == RS_FATAL; }
bool rollback() const { return s == RS_ROLLBACK; }
+ bool readable() const { return s == RS_PRIMARY || s == RS_SECONDARY; }
string toString() const;
@@ -60,9 +61,9 @@ namespace mongo {
bool operator!=(const MemberState& r) const { return s != r.s; }
};
- /* this is supposed to be just basic information on a member,
+ /* this is supposed to be just basic information on a member,
and copy constructable. */
- class HeartbeatInfo {
+ class HeartbeatInfo {
unsigned _id;
public:
HeartbeatInfo() : _id(0xffffffff),hbstate(MemberState::RS_UNKNOWN),health(-1.0),downSince(0),skew(INT_MIN) { }
@@ -88,15 +89,15 @@ namespace mongo {
bool changed(const HeartbeatInfo& old) const;
};
- inline HeartbeatInfo::HeartbeatInfo(unsigned id) : _id(id) {
+ inline HeartbeatInfo::HeartbeatInfo(unsigned id) : _id(id) {
hbstate = MemberState::RS_UNKNOWN;
health = -1.0;
downSince = 0;
- lastHeartbeat = upSince = 0;
+ lastHeartbeat = upSince = 0;
skew = INT_MIN;
}
- inline bool HeartbeatInfo::changed(const HeartbeatInfo& old) const {
+ inline bool HeartbeatInfo::changed(const HeartbeatInfo& old) const {
return health != old.health ||
hbstate != old.hbstate;
}
diff --git a/db/repl/rs_optime.h b/db/repl/rs_optime.h
index b3607fa..f0ca569 100644
--- a/db/repl/rs_optime.h
+++ b/db/repl/rs_optime.h
@@ -1,58 +1,58 @@
-// @file rs_optime.h
-
-/*
- * Copyright (C) 2010 10gen Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#pragma once
-
-#include "../../util/optime.h"
-
-namespace mongo {
-
+// @file rs_optime.h
+
+/*
+ * Copyright (C) 2010 10gen Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+
+#include "../../util/optime.h"
+
+namespace mongo {
+
const char rsoplog[] = "local.oplog.rs";
-
- /*
- class RSOpTime : public OpTime {
- public:
- bool initiated() const { return getSecs() != 0; }
- };*/
-
- /*struct RSOpTime {
- unsigned long long ord;
-
- RSOpTime() : ord(0) { }
-
- bool initiated() const { return ord > 0; }
-
- void initiate() {
- assert( !initiated() );
- ord = 1000000;
- }
-
- ReplTime inc() {
- DEV assertInWriteLock();
- return ++ord;
- }
-
- string toString() const { return str::stream() << ord; }
-
- // query the oplog and set the highest value herein. acquires a db read lock. throws.
- void load();
- };
-
- extern RSOpTime rsOpTime;*/
-
-}
+
+ /*
+ class RSOpTime : public OpTime {
+ public:
+ bool initiated() const { return getSecs() != 0; }
+ };*/
+
+ /*struct RSOpTime {
+ unsigned long long ord;
+
+ RSOpTime() : ord(0) { }
+
+ bool initiated() const { return ord > 0; }
+
+ void initiate() {
+ assert( !initiated() );
+ ord = 1000000;
+ }
+
+ ReplTime inc() {
+ DEV assertInWriteLock();
+ return ++ord;
+ }
+
+ string toString() const { return str::stream() << ord; }
+
+ // query the oplog and set the highest value herein. acquires a db read lock. throws.
+ void load();
+ };
+
+ extern RSOpTime rsOpTime;*/
+
+}
diff --git a/db/repl/rs_rollback.cpp b/db/repl/rs_rollback.cpp
index 6b2544c..0b4cc28 100644
--- a/db/repl/rs_rollback.cpp
+++ b/db/repl/rs_rollback.cpp
@@ -1,5 +1,5 @@
/* @file rs_rollback.cpp
-*
+*
* Copyright (C) 2008 10gen Inc.
*
* This program is free software: you can redistribute it and/or modify
@@ -25,7 +25,7 @@
/* Scenarios
We went offline with ops not replicated out.
-
+
F = node that failed and coming back.
P = node that took over, new primary
@@ -33,11 +33,11 @@
F : a b c d e f g
P : a b c d q
- The design is "keep P". One could argue here that "keep F" has some merits, however, in most cases P
- will have significantly more data. Also note that P may have a proper subset of F's stream if there were
+ The design is "keep P". One could argue here that "keep F" has some merits, however, in most cases P
+ will have significantly more data. Also note that P may have a proper subset of F's stream if there were
no subsequent writes.
- For now the model is simply : get F back in sync with P. If P was really behind or something, we should have
+ For now the model is simply : get F back in sync with P. If P was really behind or something, we should have
just chosen not to fail over anyway.
#2:
@@ -50,9 +50,9 @@
Steps
find an event in common. 'd'.
- undo our events beyond that by:
+ undo our events beyond that by:
(1) taking copy from other server of those objects
- (2) do not consider copy valid until we pass reach an optime after when we fetched the new version of object
+ (2) do not consider copy valid until we pass reach an optime after when we fetched the new version of object
-- i.e., reset minvalid.
(3) we could skip operations on objects that are previous in time to our capture of the object as an optimization.
@@ -65,15 +65,15 @@ namespace mongo {
bool copyCollectionFromRemote(const string& host, const string& ns, const BSONObj& query, string& errmsg, bool logforrepl);
void incRBID();
- class rsfatal : public std::exception {
+ class rsfatal : public std::exception {
public:
- virtual const char* what() const throw(){ return "replica set fatal exception"; }
+ virtual const char* what() const throw() { return "replica set fatal exception"; }
};
struct DocID {
const char *ns;
be _id;
- bool operator<(const DocID& d) const {
+ bool operator<(const DocID& d) const {
int c = strcmp(ns, d.ns);
if( c < 0 ) return true;
if( c > 0 ) return false;
@@ -82,7 +82,7 @@ namespace mongo {
};
struct HowToFixUp {
- /* note this is a set -- if there are many $inc's on a single document we need to rollback, we only
+ /* note this is a set -- if there are many $inc's on a single document we need to rollback, we only
need to refetch it once. */
set<DocID> toRefetch;
@@ -97,9 +97,9 @@ namespace mongo {
int rbid; // remote server's current rollback sequence #
};
- static void refetch(HowToFixUp& h, const BSONObj& ourObj) {
+ static void refetch(HowToFixUp& h, const BSONObj& ourObj) {
const char *op = ourObj.getStringField("op");
- if( *op == 'n' )
+ if( *op == 'n' )
return;
unsigned long long totSize = 0;
@@ -108,53 +108,54 @@ namespace mongo {
throw "rollback too large";
DocID d;
+ // NOTE The assigned ns value may become invalid if we yield.
d.ns = ourObj.getStringField("ns");
- if( *d.ns == 0 ) {
+ if( *d.ns == 0 ) {
log() << "replSet WARNING ignoring op on rollback no ns TODO : " << ourObj.toString() << rsLog;
return;
}
bo o = ourObj.getObjectField(*op=='u' ? "o2" : "o");
- if( o.isEmpty() ) {
+ if( o.isEmpty() ) {
log() << "replSet warning ignoring op on rollback : " << ourObj.toString() << rsLog;
return;
}
- if( *op == 'c' ) {
+ if( *op == 'c' ) {
be first = o.firstElement();
NamespaceString s(d.ns); // foo.$cmd
string cmdname = first.fieldName();
Command *cmd = Command::findCommand(cmdname.c_str());
- if( cmd == 0 ) {
+ if( cmd == 0 ) {
log() << "replSet warning rollback no suchcommand " << first.fieldName() << " - different mongod versions perhaps?" << rsLog;
return;
}
else {
/* findandmodify - tranlated?
- godinsert?,
+ godinsert?,
renamecollection a->b. just resync a & b
*/
if( cmdname == "create" ) {
- /* Create collection operation
- { ts: ..., h: ..., op: "c", ns: "foo.$cmd", o: { create: "abc", ... } }
+ /* Create collection operation
+ { ts: ..., h: ..., op: "c", ns: "foo.$cmd", o: { create: "abc", ... } }
*/
string ns = s.db + '.' + o["create"].String(); // -> foo.abc
h.toDrop.insert(ns);
return;
}
- else if( cmdname == "drop" ) {
+ else if( cmdname == "drop" ) {
string ns = s.db + '.' + first.valuestr();
h.collectionsToResync.insert(ns);
return;
}
- else if( cmdname == "dropIndexes" || cmdname == "deleteIndexes" ) {
+ else if( cmdname == "dropIndexes" || cmdname == "deleteIndexes" ) {
/* TODO: this is bad. we simply full resync the collection here, which could be very slow. */
log() << "replSet info rollback of dropIndexes is slow in this version of mongod" << rsLog;
string ns = s.db + '.' + first.valuestr();
h.collectionsToResync.insert(ns);
return;
}
- else if( cmdname == "renameCollection" ) {
+ else if( cmdname == "renameCollection" ) {
/* TODO: slow. */
log() << "replSet info rollback of renameCollection is slow in this version of mongod" << rsLog;
string from = first.valuestr();
@@ -163,15 +164,15 @@ namespace mongo {
h.collectionsToResync.insert(to);
return;
}
- else if( cmdname == "reIndex" ) {
+ else if( cmdname == "reIndex" ) {
return;
}
- else if( cmdname == "dropDatabase" ) {
+ else if( cmdname == "dropDatabase" ) {
log() << "replSet error rollback : can't rollback drop database full resync will be required" << rsLog;
log() << "replSet " << o.toString() << rsLog;
throw rsfatal();
}
- else {
+ else {
log() << "replSet error can't rollback this command yet: " << o.toString() << rsLog;
log() << "replSet cmdname=" << cmdname << rsLog;
throw rsfatal();
@@ -190,15 +191,15 @@ namespace mongo {
int getRBID(DBClientConnection*);
- static void syncRollbackFindCommonPoint(DBClientConnection *them, HowToFixUp& h) {
+ static void syncRollbackFindCommonPoint(DBClientConnection *them, HowToFixUp& h) {
static time_t last;
- if( time(0)-last < 60 ) {
+ if( time(0)-last < 60 ) {
throw "findcommonpoint waiting a while before trying again";
}
last = time(0);
assert( dbMutex.atLeastReadLocked() );
- Client::Context c(rsoplog, dbpath, 0, false);
+ Client::Context c(rsoplog);
NamespaceDetails *nsd = nsdetails(rsoplog);
assert(nsd);
ReverseCappedCursor u(nsd);
@@ -226,7 +227,7 @@ namespace mongo {
log() << "replSet info rollback our last optime: " << ourTime.toStringPretty() << rsLog;
log() << "replSet info rollback their last optime: " << theirTime.toStringPretty() << rsLog;
log() << "replSet info rollback diff in end of log times: " << diff << " seconds" << rsLog;
- if( diff > 3600 ) {
+ if( diff > 3600 ) {
log() << "replSet rollback too long a time period for a rollback." << rsLog;
throw "error not willing to roll back more than one hour of data";
}
@@ -236,8 +237,8 @@ namespace mongo {
while( 1 ) {
scanned++;
/* todo add code to assure no excessive scanning for too long */
- if( ourTime == theirTime ) {
- if( ourObj["h"].Long() == theirObj["h"].Long() ) {
+ if( ourTime == theirTime ) {
+ if( ourObj["h"].Long() == theirObj["h"].Long() ) {
// found the point back in time where we match.
// todo : check a few more just to be careful about hash collisions.
log() << "replSet rollback found matching events at " << ourTime.toStringPretty() << rsLog;
@@ -249,7 +250,7 @@ namespace mongo {
refetch(h, ourObj);
- if( !t->more() ) {
+ if( !t->more() ) {
log() << "replSet rollback error RS100 reached beginning of remote oplog" << rsLog;
log() << "replSet them: " << them->toString() << " scanned: " << scanned << rsLog;
log() << "replSet theirTime: " << theirTime.toStringLong() << rsLog;
@@ -270,8 +271,8 @@ namespace mongo {
ourObj = u.current();
ourTime = ourObj["ts"]._opTime();
}
- else if( theirTime > ourTime ) {
- if( !t->more() ) {
+ else if( theirTime > ourTime ) {
+ if( !t->more() ) {
log() << "replSet rollback error RS100 reached beginning of remote oplog" << rsLog;
log() << "replSet them: " << them->toString() << " scanned: " << scanned << rsLog;
log() << "replSet theirTime: " << theirTime.toStringLong() << rsLog;
@@ -281,11 +282,11 @@ namespace mongo {
theirObj = t->nextSafe();
theirTime = theirObj["ts"]._opTime();
}
- else {
+ else {
// theirTime < ourTime
refetch(h, ourObj);
u.advance();
- if( !u.ok() ) {
+ if( !u.ok() ) {
log() << "replSet rollback error RS101 reached beginning of local oplog" << rsLog;
log() << "replSet them: " << them->toString() << " scanned: " << scanned << rsLog;
log() << "replSet theirTime: " << theirTime.toStringLong() << rsLog;
@@ -298,299 +299,303 @@ namespace mongo {
}
}
- struct X {
+ struct X {
const bson::bo *op;
bson::bo goodVersionOfObject;
};
- static void setMinValid(bo newMinValid) {
- try {
- log() << "replSet minvalid=" << newMinValid["ts"]._opTime().toStringLong() << rsLog;
- }
- catch(...) { }
- {
- Helpers::putSingleton("local.replset.minvalid", newMinValid);
- Client::Context cx( "local." );
- cx.db()->flushFiles(true);
- }
+ static void setMinValid(bo newMinValid) {
+ try {
+ log() << "replSet minvalid=" << newMinValid["ts"]._opTime().toStringLong() << rsLog;
+ }
+ catch(...) { }
+ {
+ Helpers::putSingleton("local.replset.minvalid", newMinValid);
+ Client::Context cx( "local." );
+ cx.db()->flushFiles(true);
+ }
}
void ReplSetImpl::syncFixUp(HowToFixUp& h, OplogReader& r) {
- DBClientConnection *them = r.conn();
-
- // fetch all first so we needn't handle interruption in a fancy way
-
- unsigned long long totSize = 0;
-
- list< pair<DocID,bo> > goodVersions;
-
- bo newMinValid;
-
- /* fetch all the goodVersions of each document from current primary */
- DocID d;
- unsigned long long n = 0;
- try {
- for( set<DocID>::iterator i = h.toRefetch.begin(); i != h.toRefetch.end(); i++ ) {
- d = *i;
-
- assert( !d._id.eoo() );
-
- {
- /* TODO : slow. lots of round trips. */
- n++;
- bo good= them->findOne(d.ns, d._id.wrap()).getOwned();
- totSize += good.objsize();
- uassert( 13410, "replSet too much data to roll back", totSize < 300 * 1024 * 1024 );
-
- // note good might be eoo, indicating we should delete it
- goodVersions.push_back(pair<DocID,bo>(d,good));
- }
- }
- newMinValid = r.getLastOp(rsoplog);
- if( newMinValid.isEmpty() ) {
- sethbmsg("rollback error newMinValid empty?");
- return;
- }
- }
- catch(DBException& e) {
- sethbmsg(str::stream() << "rollback re-get objects: " << e.toString(),0);
- log() << "rollback couldn't re-get ns:" << d.ns << " _id:" << d._id << ' ' << n << '/' << h.toRefetch.size() << rsLog;
- throw e;
- }
-
- MemoryMappedFile::flushAll(true);
-
- sethbmsg("rollback 3.5");
- if( h.rbid != getRBID(r.conn()) ) {
- // our source rolled back itself. so the data we received isn't necessarily consistent.
- sethbmsg("rollback rbid on source changed during rollback, cancelling this attempt");
- return;
- }
-
- // update them
- sethbmsg(str::stream() << "rollback 4 n:" << goodVersions.size());
-
- bool warn = false;
-
- assert( !h.commonPointOurDiskloc.isNull() );
-
- dbMutex.assertWriteLocked();
-
- /* we have items we are writing that aren't from a point-in-time. thus best not to come online
- until we get to that point in freshness. */
- setMinValid(newMinValid);
-
- /** any full collection resyncs required? */
- if( !h.collectionsToResync.empty() ) {
- for( set<string>::iterator i = h.collectionsToResync.begin(); i != h.collectionsToResync.end(); i++ ) {
- string ns = *i;
- sethbmsg(str::stream() << "rollback 4.1 coll resync " << ns);
- Client::Context c(*i, dbpath, 0, /*doauth*/false);
- try {
- bob res;
- string errmsg;
- dropCollection(ns, errmsg, res);
- {
- dbtemprelease r;
- bool ok = copyCollectionFromRemote(them->getServerAddress(), ns, bo(), errmsg, false);
- if( !ok ) {
- log() << "replSet rollback error resyncing collection " << ns << ' ' << errmsg << rsLog;
- throw "rollback error resyncing rollection [1]";
- }
- }
- }
- catch(...) {
- log() << "replset rollback error resyncing collection " << ns << rsLog;
- throw "rollback error resyncing rollection [2]";
- }
- }
-
- /* we did more reading from primary, so check it again for a rollback (which would mess us up), and
- make minValid newer.
- */
- sethbmsg("rollback 4.2");
- {
- string err;
- try {
- newMinValid = r.getLastOp(rsoplog);
- if( newMinValid.isEmpty() ) {
- err = "can't get minvalid from primary";
- } else {
- setMinValid(newMinValid);
- }
- }
- catch(...) {
- err = "can't get/set minvalid";
- }
- if( h.rbid != getRBID(r.conn()) ) {
- // our source rolled back itself. so the data we received isn't necessarily consistent.
- // however, we've now done writes. thus we have a problem.
- err += "rbid at primary changed during resync/rollback";
- }
- if( !err.empty() ) {
- log() << "replSet error rolling back : " << err << ". A full resync will be necessary." << rsLog;
- /* todo: reset minvalid so that we are permanently in fatal state */
- /* todo: don't be fatal, but rather, get all the data first. */
- sethbmsg("rollback error");
- throw rsfatal();
- }
- }
- sethbmsg("rollback 4.3");
- }
-
- sethbmsg("rollback 4.6");
- /** drop collections to drop before doing individual fixups - that might make things faster below actually if there were subsequent inserts to rollback */
- for( set<string>::iterator i = h.toDrop.begin(); i != h.toDrop.end(); i++ ) {
- Client::Context c(*i, dbpath, 0, /*doauth*/false);
- try {
- bob res;
- string errmsg;
- log(1) << "replSet rollback drop: " << *i << rsLog;
- dropCollection(*i, errmsg, res);
- }
- catch(...) {
- log() << "replset rollback error dropping collection " << *i << rsLog;
- }
- }
-
- sethbmsg("rollback 4.7");
- Client::Context c(rsoplog, dbpath, 0, /*doauth*/false);
- NamespaceDetails *oplogDetails = nsdetails(rsoplog);
- uassert(13423, str::stream() << "replSet error in rollback can't find " << rsoplog, oplogDetails);
-
- map<string,shared_ptr<RemoveSaver> > removeSavers;
-
- unsigned deletes = 0, updates = 0;
- for( list<pair<DocID,bo> >::iterator i = goodVersions.begin(); i != goodVersions.end(); i++ ) {
- const DocID& d = i->first;
- bo pattern = d._id.wrap(); // { _id : ... }
- try {
- assert( d.ns && *d.ns );
- if( h.collectionsToResync.count(d.ns) ) {
- /* we just synced this entire collection */
- continue;
- }
-
- /* keep an archive of items rolled back */
- shared_ptr<RemoveSaver>& rs = removeSavers[d.ns];
- if ( ! rs )
- rs.reset( new RemoveSaver( "rollback" , "" , d.ns ) );
-
- // todo: lots of overhead in context, this can be faster
- Client::Context c(d.ns, dbpath, 0, /*doauth*/false);
- if( i->second.isEmpty() ) {
- // wasn't on the primary; delete.
- /* TODO1.6 : can't delete from a capped collection. need to handle that here. */
- deletes++;
-
- NamespaceDetails *nsd = nsdetails(d.ns);
- if( nsd ) {
- if( nsd->capped ) {
- /* can't delete from a capped collection - so we truncate instead. if this item must go,
- so must all successors!!! */
- try {
- /** todo: IIRC cappedTrunateAfter does not handle completely empty. todo. */
- // this will crazy slow if no _id index.
- long long start = Listener::getElapsedTimeMillis();
- DiskLoc loc = Helpers::findOne(d.ns, pattern, false);
- if( Listener::getElapsedTimeMillis() - start > 200 )
- log() << "replSet warning roll back slow no _id index for " << d.ns << " perhaps?" << rsLog;
- //would be faster but requires index: DiskLoc loc = Helpers::findById(nsd, pattern);
- if( !loc.isNull() ) {
- try {
- nsd->cappedTruncateAfter(d.ns, loc, true);
- }
- catch(DBException& e) {
- if( e.getCode() == 13415 ) {
- // hack: need to just make cappedTruncate do this...
- nsd->emptyCappedCollection(d.ns);
- } else {
- throw;
- }
- }
- }
- }
- catch(DBException& e) {
- log() << "replSet error rolling back capped collection rec " << d.ns << ' ' << e.toString() << rsLog;
- }
- }
- else {
- try {
- deletes++;
- deleteObjects(d.ns, pattern, /*justone*/true, /*logop*/false, /*god*/true, rs.get() );
- }
- catch(...) {
- log() << "replSet error rollback delete failed ns:" << d.ns << rsLog;
- }
- }
- // did we just empty the collection? if so let's check if it even exists on the source.
- if( nsd->nrecords == 0 ) {
- try {
- string sys = cc().database()->name + ".system.namespaces";
- bo o = them->findOne(sys, QUERY("name"<<d.ns));
- if( o.isEmpty() ) {
- // we should drop
- try {
- bob res;
- string errmsg;
- dropCollection(d.ns, errmsg, res);
- }
- catch(...) {
- log() << "replset error rolling back collection " << d.ns << rsLog;
- }
- }
- }
- catch(DBException& ) {
- /* this isn't *that* big a deal, but is bad. */
- log() << "replSet warning rollback error querying for existence of " << d.ns << " at the primary, ignoring" << rsLog;
- }
- }
- }
- }
- else {
- // todo faster...
- OpDebug debug;
- updates++;
- _updateObjects(/*god*/true, d.ns, i->second, pattern, /*upsert=*/true, /*multi=*/false , /*logtheop=*/false , debug, rs.get() );
- }
- }
- catch(DBException& e) {
- log() << "replSet exception in rollback ns:" << d.ns << ' ' << pattern.toString() << ' ' << e.toString() << " ndeletes:" << deletes << rsLog;
- warn = true;
- }
- }
-
- removeSavers.clear(); // this effectively closes all of them
-
- sethbmsg(str::stream() << "rollback 5 d:" << deletes << " u:" << updates);
- MemoryMappedFile::flushAll(true);
- sethbmsg("rollback 6");
-
- // clean up oplog
- log(2) << "replSet rollback truncate oplog after " << h.commonPoint.toStringPretty() << rsLog;
- // todo: fatal error if this throws?
- oplogDetails->cappedTruncateAfter(rsoplog, h.commonPointOurDiskloc, false);
-
- /* reset cached lastoptimewritten and h value */
- loadLastOpTimeWritten();
-
- sethbmsg("rollback 7");
- MemoryMappedFile::flushAll(true);
-
- // done
- if( warn )
- sethbmsg("issues during syncRollback, see log");
- else
- sethbmsg("rollback done");
- }
-
- void ReplSetImpl::syncRollback(OplogReader&r) {
+ DBClientConnection *them = r.conn();
+
+ // fetch all first so we needn't handle interruption in a fancy way
+
+ unsigned long long totSize = 0;
+
+ list< pair<DocID,bo> > goodVersions;
+
+ bo newMinValid;
+
+ /* fetch all the goodVersions of each document from current primary */
+ DocID d;
+ unsigned long long n = 0;
+ try {
+ for( set<DocID>::iterator i = h.toRefetch.begin(); i != h.toRefetch.end(); i++ ) {
+ d = *i;
+
+ assert( !d._id.eoo() );
+
+ {
+ /* TODO : slow. lots of round trips. */
+ n++;
+ bo good= them->findOne(d.ns, d._id.wrap()).getOwned();
+ totSize += good.objsize();
+ uassert( 13410, "replSet too much data to roll back", totSize < 300 * 1024 * 1024 );
+
+ // note good might be eoo, indicating we should delete it
+ goodVersions.push_back(pair<DocID,bo>(d,good));
+ }
+ }
+ newMinValid = r.getLastOp(rsoplog);
+ if( newMinValid.isEmpty() ) {
+ sethbmsg("rollback error newMinValid empty?");
+ return;
+ }
+ }
+ catch(DBException& e) {
+ sethbmsg(str::stream() << "rollback re-get objects: " << e.toString(),0);
+ log() << "rollback couldn't re-get ns:" << d.ns << " _id:" << d._id << ' ' << n << '/' << h.toRefetch.size() << rsLog;
+ throw e;
+ }
+
+ MemoryMappedFile::flushAll(true);
+
+ sethbmsg("rollback 3.5");
+ if( h.rbid != getRBID(r.conn()) ) {
+ // our source rolled back itself. so the data we received isn't necessarily consistent.
+ sethbmsg("rollback rbid on source changed during rollback, cancelling this attempt");
+ return;
+ }
+
+ // update them
+ sethbmsg(str::stream() << "rollback 4 n:" << goodVersions.size());
+
+ bool warn = false;
+
+ assert( !h.commonPointOurDiskloc.isNull() );
+
+ dbMutex.assertWriteLocked();
+
+ /* we have items we are writing that aren't from a point-in-time. thus best not to come online
+ until we get to that point in freshness. */
+ setMinValid(newMinValid);
+
+ /** any full collection resyncs required? */
+ if( !h.collectionsToResync.empty() ) {
+ for( set<string>::iterator i = h.collectionsToResync.begin(); i != h.collectionsToResync.end(); i++ ) {
+ string ns = *i;
+ sethbmsg(str::stream() << "rollback 4.1 coll resync " << ns);
+ Client::Context c(*i);
+ try {
+ bob res;
+ string errmsg;
+ dropCollection(ns, errmsg, res);
+ {
+ dbtemprelease r;
+ bool ok = copyCollectionFromRemote(them->getServerAddress(), ns, bo(), errmsg, false);
+ if( !ok ) {
+ log() << "replSet rollback error resyncing collection " << ns << ' ' << errmsg << rsLog;
+ throw "rollback error resyncing rollection [1]";
+ }
+ }
+ }
+ catch(...) {
+ log() << "replset rollback error resyncing collection " << ns << rsLog;
+ throw "rollback error resyncing rollection [2]";
+ }
+ }
+
+ /* we did more reading from primary, so check it again for a rollback (which would mess us up), and
+ make minValid newer.
+ */
+ sethbmsg("rollback 4.2");
+ {
+ string err;
+ try {
+ newMinValid = r.getLastOp(rsoplog);
+ if( newMinValid.isEmpty() ) {
+ err = "can't get minvalid from primary";
+ }
+ else {
+ setMinValid(newMinValid);
+ }
+ }
+ catch(...) {
+ err = "can't get/set minvalid";
+ }
+ if( h.rbid != getRBID(r.conn()) ) {
+ // our source rolled back itself. so the data we received isn't necessarily consistent.
+ // however, we've now done writes. thus we have a problem.
+ err += "rbid at primary changed during resync/rollback";
+ }
+ if( !err.empty() ) {
+ log() << "replSet error rolling back : " << err << ". A full resync will be necessary." << rsLog;
+ /* todo: reset minvalid so that we are permanently in fatal state */
+ /* todo: don't be fatal, but rather, get all the data first. */
+ sethbmsg("rollback error");
+ throw rsfatal();
+ }
+ }
+ sethbmsg("rollback 4.3");
+ }
+
+ sethbmsg("rollback 4.6");
+ /** drop collections to drop before doing individual fixups - that might make things faster below actually if there were subsequent inserts to rollback */
+ for( set<string>::iterator i = h.toDrop.begin(); i != h.toDrop.end(); i++ ) {
+ Client::Context c(*i);
+ try {
+ bob res;
+ string errmsg;
+ log(1) << "replSet rollback drop: " << *i << rsLog;
+ dropCollection(*i, errmsg, res);
+ }
+ catch(...) {
+ log() << "replset rollback error dropping collection " << *i << rsLog;
+ }
+ }
+
+ sethbmsg("rollback 4.7");
+ Client::Context c(rsoplog);
+ NamespaceDetails *oplogDetails = nsdetails(rsoplog);
+ uassert(13423, str::stream() << "replSet error in rollback can't find " << rsoplog, oplogDetails);
+
+ map<string,shared_ptr<RemoveSaver> > removeSavers;
+
+ unsigned deletes = 0, updates = 0;
+ for( list<pair<DocID,bo> >::iterator i = goodVersions.begin(); i != goodVersions.end(); i++ ) {
+ const DocID& d = i->first;
+ bo pattern = d._id.wrap(); // { _id : ... }
+ try {
+ assert( d.ns && *d.ns );
+ if( h.collectionsToResync.count(d.ns) ) {
+ /* we just synced this entire collection */
+ continue;
+ }
+
+ getDur().commitIfNeeded();
+
+ /* keep an archive of items rolled back */
+ shared_ptr<RemoveSaver>& rs = removeSavers[d.ns];
+ if ( ! rs )
+ rs.reset( new RemoveSaver( "rollback" , "" , d.ns ) );
+
+ // todo: lots of overhead in context, this can be faster
+ Client::Context c(d.ns);
+ if( i->second.isEmpty() ) {
+ // wasn't on the primary; delete.
+ /* TODO1.6 : can't delete from a capped collection. need to handle that here. */
+ deletes++;
+
+ NamespaceDetails *nsd = nsdetails(d.ns);
+ if( nsd ) {
+ if( nsd->capped ) {
+ /* can't delete from a capped collection - so we truncate instead. if this item must go,
+ so must all successors!!! */
+ try {
+ /** todo: IIRC cappedTrunateAfter does not handle completely empty. todo. */
+ // this will crazy slow if no _id index.
+ long long start = Listener::getElapsedTimeMillis();
+ DiskLoc loc = Helpers::findOne(d.ns, pattern, false);
+ if( Listener::getElapsedTimeMillis() - start > 200 )
+ log() << "replSet warning roll back slow no _id index for " << d.ns << " perhaps?" << rsLog;
+ //would be faster but requires index: DiskLoc loc = Helpers::findById(nsd, pattern);
+ if( !loc.isNull() ) {
+ try {
+ nsd->cappedTruncateAfter(d.ns, loc, true);
+ }
+ catch(DBException& e) {
+ if( e.getCode() == 13415 ) {
+ // hack: need to just make cappedTruncate do this...
+ nsd->emptyCappedCollection(d.ns);
+ }
+ else {
+ throw;
+ }
+ }
+ }
+ }
+ catch(DBException& e) {
+ log() << "replSet error rolling back capped collection rec " << d.ns << ' ' << e.toString() << rsLog;
+ }
+ }
+ else {
+ try {
+ deletes++;
+ deleteObjects(d.ns, pattern, /*justone*/true, /*logop*/false, /*god*/true, rs.get() );
+ }
+ catch(...) {
+ log() << "replSet error rollback delete failed ns:" << d.ns << rsLog;
+ }
+ }
+ // did we just empty the collection? if so let's check if it even exists on the source.
+ if( nsd->stats.nrecords == 0 ) {
+ try {
+ string sys = cc().database()->name + ".system.namespaces";
+ bo o = them->findOne(sys, QUERY("name"<<d.ns));
+ if( o.isEmpty() ) {
+ // we should drop
+ try {
+ bob res;
+ string errmsg;
+ dropCollection(d.ns, errmsg, res);
+ }
+ catch(...) {
+ log() << "replset error rolling back collection " << d.ns << rsLog;
+ }
+ }
+ }
+ catch(DBException& ) {
+ /* this isn't *that* big a deal, but is bad. */
+ log() << "replSet warning rollback error querying for existence of " << d.ns << " at the primary, ignoring" << rsLog;
+ }
+ }
+ }
+ }
+ else {
+ // todo faster...
+ OpDebug debug;
+ updates++;
+ _updateObjects(/*god*/true, d.ns, i->second, pattern, /*upsert=*/true, /*multi=*/false , /*logtheop=*/false , debug, rs.get() );
+ }
+ }
+ catch(DBException& e) {
+ log() << "replSet exception in rollback ns:" << d.ns << ' ' << pattern.toString() << ' ' << e.toString() << " ndeletes:" << deletes << rsLog;
+ warn = true;
+ }
+ }
+
+ removeSavers.clear(); // this effectively closes all of them
+
+ sethbmsg(str::stream() << "rollback 5 d:" << deletes << " u:" << updates);
+ MemoryMappedFile::flushAll(true);
+ sethbmsg("rollback 6");
+
+ // clean up oplog
+ log(2) << "replSet rollback truncate oplog after " << h.commonPoint.toStringPretty() << rsLog;
+ // todo: fatal error if this throws?
+ oplogDetails->cappedTruncateAfter(rsoplog, h.commonPointOurDiskloc, false);
+
+ /* reset cached lastoptimewritten and h value */
+ loadLastOpTimeWritten();
+
+ sethbmsg("rollback 7");
+ MemoryMappedFile::flushAll(true);
+
+ // done
+ if( warn )
+ sethbmsg("issues during syncRollback, see log");
+ else
+ sethbmsg("rollback done");
+ }
+
+ void ReplSetImpl::syncRollback(OplogReader&r) {
unsigned s = _syncRollback(r);
- if( s )
+ if( s )
sleepsecs(s);
}
- unsigned ReplSetImpl::_syncRollback(OplogReader&r) {
+ unsigned ReplSetImpl::_syncRollback(OplogReader&r) {
assert( !lockedByMe() );
assert( !dbMutex.atLeastReadLocked() );
@@ -604,7 +609,7 @@ namespace mongo {
if( box.getState().secondary() ) {
/* by doing this, we will not service reads (return an error as we aren't in secondary staate.
- that perhaps is moot becasue of the write lock above, but that write lock probably gets deferred
+ that perhaps is moot becasue of the write lock above, but that write lock probably gets deferred
or removed or yielded later anyway.
also, this is better for status reporting - we know what is happening.
@@ -618,7 +623,7 @@ namespace mongo {
r.resetCursor();
/*DBClientConnection us(false, 0, 0);
string errmsg;
- if( !us.connect(HostAndPort::me().toString(),errmsg) ) {
+ if( !us.connect(HostAndPort::me().toString(),errmsg) ) {
sethbmsg("rollback connect to self failure" + errmsg);
return;
}*/
@@ -627,15 +632,15 @@ namespace mongo {
try {
syncRollbackFindCommonPoint(r.conn(), how);
}
- catch( const char *p ) {
+ catch( const char *p ) {
sethbmsg(string("rollback 2 error ") + p);
return 10;
}
- catch( rsfatal& ) {
+ catch( rsfatal& ) {
_fatal();
return 2;
}
- catch( DBException& e ) {
+ catch( DBException& e ) {
sethbmsg(string("rollback 2 exception ") + e.toString() + "; sleeping 1 min");
dbtemprelease r;
sleepsecs(60);
@@ -647,20 +652,20 @@ namespace mongo {
{
incRBID();
- try {
+ try {
syncFixUp(how, r);
}
- catch( rsfatal& ) {
+ catch( rsfatal& ) {
sethbmsg("rollback fixup error");
_fatal();
return 2;
}
- catch(...) {
+ catch(...) {
incRBID(); throw;
}
incRBID();
- /* success - leave "ROLLBACK" state
+ /* success - leave "ROLLBACK" state
can go to SECONDARY once minvalid is achieved
*/
box.change(MemberState::RS_RECOVERING, _self);
diff --git a/db/repl/rs_sync.cpp b/db/repl/rs_sync.cpp
index 9de3f60..8d06fcc 100644
--- a/db/repl/rs_sync.cpp
+++ b/db/repl/rs_sync.cpp
@@ -19,30 +19,21 @@
#include "../../client/dbclient.h"
#include "rs.h"
#include "../repl.h"
-
+#include "connections.h"
namespace mongo {
using namespace bson;
-
extern unsigned replSetForceInitialSyncFailure;
- void startSyncThread() {
- Client::initThread("rs_sync");
- cc().iAmSyncThread();
- theReplSet->syncThread();
- cc().shutdown();
- }
-
+ /* apply the log op that is in param o */
void ReplSetImpl::syncApply(const BSONObj &o) {
- //const char *op = o.getStringField("op");
-
- char db[MaxDatabaseLen];
+ char db[MaxDatabaseNameLen];
const char *ns = o.getStringField("ns");
nsToDatabase(ns, db);
if ( *ns == '.' || *ns == 0 ) {
- if( *o.getStringField("op") == 'n' )
- return;
+ if( *o.getStringField("op") == 'n' )
+ return;
log() << "replSet skipping bad op in oplog: " << o.toString() << endl;
return;
}
@@ -54,19 +45,21 @@ namespace mongo {
applyOperation_inlock(o);
}
+ /* initial oplog application, during initial sync, after cloning.
+ @return false on failure.
+ this method returns an error and doesn't throw exceptions (i think).
+ */
bool ReplSetImpl::initialSyncOplogApplication(
- string hn,
- const Member *primary,
+ const Member *source,
OpTime applyGTE,
- OpTime minValid)
- {
- if( primary == 0 ) return false;
+ OpTime minValid) {
+ if( source == 0 ) return false;
- OpTime ts;
+ const string hn = source->h().toString();
+ OplogReader r;
try {
- OplogReader r;
- if( !r.connect(hn) ) {
- log(2) << "replSet can't connect to " << hn << " to read operations" << rsLog;
+ if( !r.connect(hn) ) {
+ log() << "replSet initial sync error can't connect to " << hn << " to read " << rsoplog << rsLog;
return false;
}
@@ -80,48 +73,63 @@ namespace mongo {
}
assert( r.haveCursor() );
- /* we lock outside the loop to avoid the overhead of locking on every operation. server isn't usable yet anyway! */
- writelock lk("");
-
{
- if( !r.more() ) {
+ if( !r.more() ) {
sethbmsg("replSet initial sync error reading remote oplog");
+ log() << "replSet initial sync error remote oplog (" << rsoplog << ") on host " << hn << " is empty?" << rsLog;
return false;
}
bo op = r.next();
OpTime t = op["ts"]._opTime();
r.putBack(op);
- assert( !t.isNull() );
+
+ if( op.firstElement().fieldName() == string("$err") ) {
+ log() << "replSet initial sync error querying " << rsoplog << " on " << hn << " : " << op.toString() << rsLog;
+ return false;
+ }
+
+ uassert( 13508 , str::stream() << "no 'ts' in first op in oplog: " << op , !t.isNull() );
if( t > applyGTE ) {
sethbmsg(str::stream() << "error " << hn << " oplog wrapped during initial sync");
+ log() << "replSet initial sync expected first optime of " << applyGTE << rsLog;
+ log() << "replSet initial sync but received a first optime of " << t << " from " << hn << rsLog;
return false;
}
}
+ }
+ catch(DBException& e) {
+ log() << "replSet initial sync failing: " << e.toString() << rsLog;
+ return false;
+ }
- // todo : use exhaust
- unsigned long long n = 0;
- while( 1 ) {
+ /* we lock outside the loop to avoid the overhead of locking on every operation. */
+ writelock lk("");
+ // todo : use exhaust
+ OpTime ts;
+ unsigned long long n = 0;
+ while( 1 ) {
+ try {
if( !r.more() )
break;
BSONObj o = r.nextSafe(); /* note we might get "not master" at some point */
{
- //writelock lk("");
-
ts = o["ts"]._opTime();
/* if we have become primary, we dont' want to apply things from elsewhere
- anymore. assumePrimary is in the db lock so we are safe as long as
+ anymore. assumePrimary is in the db lock so we are safe as long as
we check after we locked above. */
- const Member *p1 = box.getPrimary();
- if( p1 != primary || replSetForceInitialSyncFailure ) {
+ if( (source->state() != MemberState::RS_PRIMARY &&
+ source->state() != MemberState::RS_SECONDARY) ||
+ replSetForceInitialSyncFailure ) {
+
int f = replSetForceInitialSyncFailure;
if( f > 0 ) {
replSetForceInitialSyncFailure = f-1;
log() << "replSet test code invoked, replSetForceInitialSyncFailure" << rsLog;
+ throw DBException("forced error",0);
}
- log() << "replSet primary was:" << primary->fullName() << " now:" <<
- (p1 != 0 ? p1->fullName() : "none") << rsLog;
+ log() << "replSet we are now primary" << rsLog;
throw DBException("primary changed",0);
}
@@ -131,38 +139,48 @@ namespace mongo {
}
_logOpObjRS(o); /* with repl sets we write the ops to our oplog too */
}
- if( ++n % 100000 == 0 ) {
+ if( ++n % 100000 == 0 ) {
// simple progress metering
log() << "replSet initialSyncOplogApplication " << n << rsLog;
}
+
+ getDur().commitIfNeeded();
}
- }
- catch(DBException& e) {
- if( ts <= minValid ) {
- // didn't make it far enough
- log() << "replSet initial sync failing, error applying oplog " << e.toString() << rsLog;
- return false;
+ catch (DBException& e) {
+ if( e.getCode() == 11000 || e.getCode() == 11001 ) {
+ // skip duplicate key exceptions
+ continue;
+ }
+
+ if( ts <= minValid ) {
+ // didn't make it far enough
+ log() << "replSet initial sync failing, error applying oplog " << e.toString() << rsLog;
+ return false;
+ }
+
+ // otherwise, whatever
+ break;
}
}
return true;
}
- /* should be in RECOVERING state on arrival here.
+ /* should be in RECOVERING state on arrival here.
readlocks
@return true if transitioned to SECONDARY
*/
- bool ReplSetImpl::tryToGoLiveAsASecondary(OpTime& /*out*/ minvalid) {
- bool golive = false;
+ bool ReplSetImpl::tryToGoLiveAsASecondary(OpTime& /*out*/ minvalid) {
+ bool golive = false;
{
readlock lk("local.replset.minvalid");
BSONObj mv;
- if( Helpers::getSingleton("local.replset.minvalid", mv) ) {
+ if( Helpers::getSingleton("local.replset.minvalid", mv) ) {
minvalid = mv["ts"]._opTime();
- if( minvalid <= lastOpTimeWritten ) {
+ if( minvalid <= lastOpTimeWritten ) {
golive=true;
}
}
- else
+ else
golive = true; /* must have been the original member */
}
if( golive ) {
@@ -172,44 +190,104 @@ namespace mongo {
return golive;
}
- /* tail the primary's oplog. ok to return, will be re-called. */
- void ReplSetImpl::syncTail() {
- // todo : locking vis a vis the mgr...
+ /**
+ * Checks if the oplog given is too far ahead to read from.
+ *
+ * @param r the oplog
+ * @param hn the hostname (for log messages)
+ *
+ * @return if we are stale compared to the oplog on hn
+ */
+ bool ReplSetImpl::_isStale(OplogReader& r, const string& hn) {
+ BSONObj remoteOldestOp = r.findOne(rsoplog, Query());
+ OpTime ts = remoteOldestOp["ts"]._opTime();
+ DEV log() << "replSet remoteOldestOp: " << ts.toStringLong() << rsLog;
+ else log(3) << "replSet remoteOldestOp: " << ts.toStringLong() << rsLog;
+ DEV {
+ // debugging sync1.js...
+ log() << "replSet lastOpTimeWritten: " << lastOpTimeWritten.toStringLong() << rsLog;
+ log() << "replSet our state: " << state().toString() << rsLog;
+ }
+ if( lastOpTimeWritten < ts ) {
+ log() << "replSet error RS102 too stale to catch up, at least from " << hn << rsLog;
+ log() << "replSet our last optime : " << lastOpTimeWritten.toStringLong() << rsLog;
+ log() << "replSet oldest at " << hn << " : " << ts.toStringLong() << rsLog;
+ log() << "replSet See http://www.mongodb.org/display/DOCS/Resyncing+a+Very+Stale+Replica+Set+Member" << rsLog;
+ sethbmsg("error RS102 too stale to catch up");
+ changeState(MemberState::RS_RECOVERING);
+ sleepsecs(120);
+ return true;
+ }
+ return false;
+ }
- const Member *primary = box.getPrimary();
- if( primary == 0 ) return;
- string hn = primary->h().toString();
- OplogReader r;
- if( !r.connect(primary->h().toString()) ) {
+ /**
+ * Tries to connect the oplog reader to a potential sync source. If
+ * successful, it checks that we are not stale compared to this source.
+ *
+ * @param r reader to populate
+ * @param hn hostname to try
+ *
+ * @return if both checks pass, it returns true, otherwise false.
+ */
+ bool ReplSetImpl::_getOplogReader(OplogReader& r, string& hn) {
+ assert(r.conn() == 0);
+
+ if( !r.connect(hn) ) {
log(2) << "replSet can't connect to " << hn << " to read operations" << rsLog;
- return;
+ r.resetConnection();
+ return false;
+ }
+ if( _isStale(r, hn)) {
+ r.resetConnection();
+ return false;
}
+ return true;
+ }
- /* first make sure we are not hopelessly out of sync by being very stale. */
- {
- BSONObj remoteOldestOp = r.findOne(rsoplog, Query());
- OpTime ts = remoteOldestOp["ts"]._opTime();
- DEV log() << "replSet remoteOldestOp: " << ts.toStringLong() << rsLog;
- else log(3) << "replSet remoteOldestOp: " << ts.toStringLong() << rsLog;
- DEV {
- // debugging sync1.js...
- log() << "replSet lastOpTimeWritten: " << lastOpTimeWritten.toStringLong() << rsLog;
- log() << "replSet our state: " << state().toString() << rsLog;
+ /* tail an oplog. ok to return, will be re-called. */
+ void ReplSetImpl::syncTail() {
+ // todo : locking vis a vis the mgr...
+ OplogReader r;
+ string hn;
+
+ const Member *target = box.getPrimary();
+ if (target != 0) {
+ hn = target->h().toString();
+ if (!_getOplogReader(r, hn)) {
+ // we might be stale wrt the primary, but could still sync from
+ // a secondary
+ target = 0;
+ }
+ }
+
+ // if we cannot reach the master but someone else is more up-to-date
+ // than we are, sync from them.
+ if( target == 0 ) {
+ for(Member *m = head(); m; m=m->next()) {
+ hn = m->h().toString();
+ if (m->hbinfo().up() && m->state().readable() &&
+ (m->hbinfo().opTime > lastOpTimeWritten) &&
+ m->config().slaveDelay == 0 &&
+ _getOplogReader(r, hn)) {
+ target = m;
+ break;
+ }
}
- if( lastOpTimeWritten < ts ) {
- log() << "replSet error RS102 too stale to catch up, at least from primary: " << hn << rsLog;
- log() << "replSet our last optime : " << lastOpTimeWritten.toStringLong() << rsLog;
- log() << "replSet oldest at " << hn << " : " << ts.toStringLong() << rsLog;
- log() << "replSet See http://www.mongodb.org/display/DOCS/Resyncing+a+Very+Stale+Replica+Set+Member" << rsLog;
- sethbmsg("error RS102 too stale to catch up");
- sleepsecs(120);
+
+ // no server found
+ if (target == 0) {
+ // if there is no one to sync from
+ OpTime minvalid;
+ tryToGoLiveAsASecondary(minvalid);
return;
}
}
r.tailingQueryGTE(rsoplog, lastOpTimeWritten);
assert( r.haveCursor() );
- assert( r.awaitCapable() );
+
+ uassert(1000, "replSet source for syncing doesn't seem to be await capable -- is it an older version of mongodb?", r.awaitCapable() );
{
if( !r.more() ) {
@@ -222,7 +300,7 @@ namespace mongo {
return;
}
OpTime theirTS = theirLastOp["ts"]._opTime();
- if( theirTS < lastOpTimeWritten ) {
+ if( theirTS < lastOpTimeWritten ) {
log() << "replSet we are ahead of the primary, will try to roll back" << rsLog;
syncRollback(r);
return;
@@ -231,7 +309,7 @@ namespace mongo {
log() << "replSet syncTail condition 1" << rsLog;
sleepsecs(1);
}
- catch(DBException& e) {
+ catch(DBException& e) {
log() << "replSet error querying " << hn << ' ' << e.toString() << rsLog;
sleepsecs(2);
}
@@ -249,12 +327,9 @@ namespace mongo {
BSONObj o = r.nextSafe();
OpTime ts = o["ts"]._opTime();
long long h = o["h"].numberLong();
- if( ts != lastOpTimeWritten || h != lastH ) {
- log(1) << "TEMP our last op time written: " << lastOpTimeWritten.toStringPretty() << endl;
- log(1) << "TEMP primary's GTE: " << ts.toStringPretty() << endl;
- /*
- }*/
-
+ if( ts != lastOpTimeWritten || h != lastH ) {
+ log() << "replSet our last op time written: " << lastOpTimeWritten.toStringPretty() << endl;
+ log() << "replset source's GTE: " << ts.toStringPretty() << endl;
syncRollback(r);
return;
}
@@ -268,49 +343,45 @@ namespace mongo {
while( 1 ) {
while( 1 ) {
- if( !r.moreInCurrentBatch() ) {
- /* we need to occasionally check some things. between
+ if( !r.moreInCurrentBatch() ) {
+ /* we need to occasionally check some things. between
batches is probably a good time. */
/* perhaps we should check this earlier? but not before the rollback checks. */
- if( state().recovering() ) {
+ if( state().recovering() ) {
/* can we go to RS_SECONDARY state? we can if not too old and if minvalid achieved */
OpTime minvalid;
bool golive = ReplSetImpl::tryToGoLiveAsASecondary(minvalid);
if( golive ) {
;
}
- else {
+ else {
sethbmsg(str::stream() << "still syncing, not yet to minValid optime" << minvalid.toString());
}
/* todo: too stale capability */
}
- if( box.getPrimary() != primary )
- return;
+ {
+ const Member *primary = box.getPrimary();
+
+ if( !target->hbinfo().hbstate.readable() ||
+ // if we are not syncing from the primary, return (if
+ // it's up) so that we can try accessing it again
+ (target != primary && primary != 0)) {
+ return;
+ }
+ }
}
if( !r.more() )
break;
- {
+ {
BSONObj o = r.nextSafe(); /* note we might get "not master" at some point */
- {
- writelock lk("");
- /* if we have become primary, we dont' want to apply things from elsewhere
- anymore. assumePrimary is in the db lock so we are safe as long as
- we check after we locked above. */
- if( box.getPrimary() != primary ) {
- if( box.getState().primary() )
- log(0) << "replSet stopping syncTail we are now primary" << rsLog;
- return;
- }
-
- syncApply(o);
- _logOpObjRS(o); /* with repl sets we write the ops to our oplog too: */
- }
int sd = myConfig().slaveDelay;
- if( sd ) {
+ // ignore slaveDelay if the box is still initializing. once
+ // it becomes secondary we can worry about it.
+ if( sd && box.getState().secondary() ) {
const OpTime ts = o["ts"]._opTime();
long long a = ts.getSecs();
long long b = time(0);
@@ -329,13 +400,30 @@ namespace mongo {
sleepsecs(6);
if( time(0) >= waitUntil )
break;
- if( box.getPrimary() != primary )
+ if( !target->hbinfo().hbstate.readable() ) {
break;
+ }
if( myConfig().slaveDelay != sd ) // reconf
break;
}
}
}
+
+ }
+
+ {
+ writelock lk("");
+
+                /* if we have become primary, we don't want to apply things from elsewhere
+ anymore. assumePrimary is in the db lock so we are safe as long as
+ we check after we locked above. */
+ if( box.getState().primary() ) {
+ log(0) << "replSet stopping syncTail we are now primary" << rsLog;
+ return;
+ }
+
+ syncApply(o);
+ _logOpObjRS(o); /* with repl sets we write the ops to our oplog too: */
}
}
}
@@ -345,8 +433,9 @@ namespace mongo {
// TODO : reuse our connection to the primary.
return;
}
- if( box.getPrimary() != primary )
+ if( !target->hbinfo().hbstate.readable() ) {
return;
+ }
// looping back is ok because this is a tailable cursor
}
}
@@ -357,15 +446,11 @@ namespace mongo {
sleepsecs(1);
return;
}
- if( sp.state.fatal() ) {
+ if( sp.state.fatal() ) {
sleepsecs(5);
return;
}
- /* later, we can sync from up secondaries if we want. tbd. */
- if( sp.primary == 0 )
- return;
-
/* do we have anything at all? */
if( lastOpTimeWritten.isNull() ) {
syncDoInitialSync();
@@ -377,23 +462,64 @@ namespace mongo {
}
void ReplSetImpl::syncThread() {
- if( myConfig().arbiterOnly )
- return;
- while( 1 ) {
+ /* test here was to force a receive timeout
+ ScopedConn c("localhost");
+ bo info;
+ try {
+ log() << "this is temp" << endl;
+ c.runCommand("admin", BSON("sleep"<<120), info);
+ log() << info.toString() << endl;
+ c.runCommand("admin", BSON("sleep"<<120), info);
+ log() << "temp" << endl;
+ }
+ catch( DBException& e ) {
+ log() << e.toString() << endl;
+ c.runCommand("admin", BSON("sleep"<<120), info);
+ log() << "temp" << endl;
+ }
+ */
+
+ while( 1 ) {
+ if( myConfig().arbiterOnly )
+ return;
+
try {
_syncThread();
}
- catch(DBException& e) {
+ catch(DBException& e) {
sethbmsg("syncThread: " + e.toString());
sleepsecs(10);
}
- catch(...) {
+ catch(...) {
sethbmsg("unexpected exception in syncThread()");
- // TODO : SET NOT SECONDARY here.
+ // TODO : SET NOT SECONDARY here?
sleepsecs(60);
}
sleepsecs(1);
+
+ /* normally msgCheckNewState gets called periodically, but in a single node repl set there
+ are no heartbeat threads, so we do it here to be sure. this is relevant if the singleton
+ member has done a stepDown() and needs to come back up.
+ */
+ OCCASIONALLY mgr->send( boost::bind(&Manager::msgCheckNewState, theReplSet->mgr) );
+ }
+ }
+
+ void startSyncThread() {
+ static int n;
+ if( n != 0 ) {
+ log() << "replSet ERROR : more than one sync thread?" << rsLog;
+ assert( n == 0 );
+ }
+ n++;
+
+ Client::initThread("replica set sync");
+ cc().iAmSyncThread();
+ if (!noauth) {
+ cc().getAuthenticationInfo()->authorize("local");
}
+ theReplSet->syncThread();
+ cc().shutdown();
}
}