summaryrefslogtreecommitdiff
path: root/db/repl
diff options
context:
space:
mode:
Diffstat (limited to 'db/repl')
-rw-r--r--db/repl/connections.h46
-rw-r--r--db/repl/health.cpp5
-rw-r--r--db/repl/heartbeat.cpp229
-rw-r--r--db/repl/manager.cpp36
-rw-r--r--db/repl/rs.cpp1
-rw-r--r--db/repl/rs.h8
-rw-r--r--db/repl/rs_config.h16
-rw-r--r--db/repl/rs_initialsync.cpp5
-rw-r--r--db/repl/rs_member.h6
-rw-r--r--db/repl/rs_sync.cpp60
10 files changed, 282 insertions, 130 deletions
diff --git a/db/repl/connections.h b/db/repl/connections.h
index 78cfb30..61c581b 100644
--- a/db/repl/connections.h
+++ b/db/repl/connections.h
@@ -47,6 +47,10 @@ namespace mongo {
~ScopedConn() {
// conLock releases...
}
+ void reconnect() {
+ conn()->port().shutdown();
+ connect();
+ }
/* If we were to run a query and not exhaust the cursor, future use of the connection would be problematic.
So here what we do is wrapper known safe methods and not allow cursor-style queries at all. This makes
@@ -61,9 +65,6 @@ namespace mongo {
BSONObj findOne(const string &ns, const Query& q, const BSONObj *fieldsToReturn = 0, int queryOptions = 0) {
return conn()->findOne(ns, q, fieldsToReturn, queryOptions);
}
- void setTimeout(double to) {
- conn()->setSoTimeout(to);
- }
private:
auto_ptr<scoped_lock> connLock;
@@ -78,15 +79,36 @@ namespace mongo {
typedef map<string,ScopedConn::X*> M;
static M& _map;
DBClientConnection* conn() { return &x->cc; }
+ const string _hostport;
+
+ // we should already be locked...
+ bool connect() {
+ string err;
+ if (!x->cc.connect(_hostport, err)) {
+ log() << "couldn't connect to " << _hostport << ": " << err << rsLog;
+ return false;
+ }
+
+ // if we cannot authenticate against a member, then either its key file
+ // or our key file has to change. if our key file has to change, we'll
+ // be rebooting. if their file has to change, they'll be rebooted so the
+ // connection created above will go dead, reconnect, and reauth.
+ if (!noauth && !x->cc.auth("local", internalSecurity.user, internalSecurity.pwd, err, false)) {
+ log() << "could not authenticate against " << _hostport << ", " << err << rsLog;
+ return false;
+ }
+
+ return true;
+ }
};
- inline ScopedConn::ScopedConn(string hostport) {
+ inline ScopedConn::ScopedConn(string hostport) : _hostport(hostport) {
bool first = false;
{
scoped_lock lk(mapMutex);
- x = _map[hostport];
+ x = _map[_hostport];
if( x == 0 ) {
- x = _map[hostport] = new X();
+ x = _map[_hostport] = new X();
first = true;
connLock.reset( new scoped_lock(x->z) );
}
@@ -96,17 +118,7 @@ namespace mongo {
return;
}
- // we already locked above...
- string err;
- if (!x->cc.connect(hostport, err)) {
- log() << "couldn't connect to " << hostport << ": " << err << rsLog;
- return;
- }
-
- if (!noauth && !x->cc.auth("local", internalSecurity.user, internalSecurity.pwd, err, false)) {
- log() << "could not authenticate against " << conn()->toString() << ", " << err << rsLog;
- return;
- }
+ connect();
}
}
diff --git a/db/repl/health.cpp b/db/repl/health.cpp
index 711b457..7e5a39f 100644
--- a/db/repl/health.cpp
+++ b/db/repl/health.cpp
@@ -402,6 +402,11 @@ namespace mongo {
string s = m->lhb();
if( !s.empty() )
bb.append("errmsg", s);
+
+ if (m->hbinfo().authIssue) {
+ bb.append("authenticated", false);
+ }
+
v.push_back(bb.obj());
m = m->next();
}
diff --git a/db/repl/heartbeat.cpp b/db/repl/heartbeat.cpp
index 7d3f78c..138ba45 100644
--- a/db/repl/heartbeat.cpp
+++ b/db/repl/heartbeat.cpp
@@ -51,11 +51,14 @@ namespace mongo {
/* { replSetHeartbeat : <setname> } */
class CmdReplSetHeartbeat : public ReplSetCommand {
public:
- virtual bool adminOnly() const { return false; }
CmdReplSetHeartbeat() : ReplSetCommand("replSetHeartbeat") { }
virtual bool run(const string& , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool fromRepl) {
- if( replSetBlind )
+ if( replSetBlind ) {
+ if (theReplSet) {
+ errmsg = str::stream() << theReplSet->selfFullName() << " is blind";
+ }
return false;
+ }
/* we don't call ReplSetCommand::check() here because heartbeat
checks many things that are pre-initialization. */
@@ -99,8 +102,8 @@ namespace mongo {
if( !from.empty() ) {
replSettings.discoveredSeeds.insert(from);
}
- errmsg = "still initializing";
- return false;
+ result.append("hbmsg", "still initializing");
+ return true;
}
if( theReplSet->name() != cmdObj.getStringField("replSetHeartbeat") ) {
@@ -123,32 +126,54 @@ namespace mongo {
}
} cmdReplSetHeartbeat;
- /* throws dbexception */
- bool requestHeartbeat(string setName, string from, string memberFullName, BSONObj& result, int myCfgVersion, int& theirCfgVersion, bool checkEmpty) {
+ bool requestHeartbeat(string setName, string from, string memberFullName, BSONObj& result,
+ int myCfgVersion, int& theirCfgVersion, bool checkEmpty) {
if( replSetBlind ) {
- //sleepmillis( rand() );
return false;
}
- BSONObj cmd = BSON( "replSetHeartbeat" << setName << "v" << myCfgVersion << "pv" << 1 << "checkEmpty" << checkEmpty << "from" << from );
+ BSONObj cmd = BSON( "replSetHeartbeat" << setName <<
+ "v" << myCfgVersion <<
+ "pv" << 1 <<
+ "checkEmpty" << checkEmpty <<
+ "from" << from );
- // we might be talking to ourself - generally not a great idea to do outbound waiting calls in a write lock
- assert( !dbMutex.isWriteLocked() );
-
- // these are slow (multisecond to respond), so generally we don't want to be locked, at least not without
+ // generally not a great idea to do outbound waiting calls in a
+ // write lock. heartbeats can be slow (multisecond to respond), so
+ // generally we don't want to be locked, at least not without
// thinking acarefully about it first.
- assert( theReplSet == 0 || !theReplSet->lockedByMe() );
+ uassert(15900, "can't heartbeat: too much lock",
+ !dbMutex.isWriteLocked() || theReplSet == 0 || !theReplSet->lockedByMe() );
ScopedConn conn(memberFullName);
return conn.runCommand("admin", cmd, result, 0);
}
- /* poll every other set member to check its status */
+ /**
+ * Poll every other set member to check its status.
+ *
+ * A detail about local machines and authentication: suppose we have 2
+ * members, A and B, on the same machine using different keyFiles. A is
+ * primary. If we're just starting the set, there are no admin users, so A
+ * and B can access each other because it's local access.
+ *
+ * Then we add a user to A. B cannot sync this user from A, because as soon
+ * as we add a an admin user, A requires auth. However, A can still
+ * heartbeat B, because B *doesn't* have an admin user. So A can reach B
+ * but B cannot reach A.
+ *
+ * Once B is restarted with the correct keyFile, everything should work as
+ * expected.
+ */
class ReplSetHealthPollTask : public task::Task {
+ private:
HostAndPort h;
HeartbeatInfo m;
+ int tries;
+ const int threshold;
public:
- ReplSetHealthPollTask(const HostAndPort& hh, const HeartbeatInfo& mm) : h(hh), m(mm) { }
+ ReplSetHealthPollTask(const HostAndPort& hh, const HeartbeatInfo& mm)
+ : h(hh), m(mm), tries(0), threshold(15) { }
string name() const { return "rsHealthPoll"; }
void doWork() {
@@ -163,16 +188,7 @@ namespace mongo {
BSONObj info;
int theirConfigVersion = -10000;
- Timer timer;
-
- bool ok = requestHeartbeat(theReplSet->name(), theReplSet->selfFullName(), h.toString(), info, theReplSet->config().version, theirConfigVersion);
-
- mem.ping = (unsigned int)timer.millis();
-
- time_t before = timer.startTime() / 1000000;
- // we set this on any response - we don't get this far if
- // couldn't connect because exception is thrown
- time_t after = mem.lastHeartbeat = before + (mem.ping / 1000);
+ bool ok = _requestHeartbeat(mem, info, theirConfigVersion);
// weight new ping with old pings
// on the first ping, just use the ping value
@@ -180,68 +196,12 @@ namespace mongo {
mem.ping = (unsigned int)((old.ping * .8) + (mem.ping * .2));
}
- if ( info["time"].isNumber() ) {
- long long t = info["time"].numberLong();
- if( t > after )
- mem.skew = (int) (t - after);
- else if( t < before )
- mem.skew = (int) (t - before); // negative
- }
- else {
- // it won't be there if remote hasn't initialized yet
- if( info.hasElement("time") )
- warning() << "heatbeat.time isn't a number: " << info << endl;
- mem.skew = INT_MIN;
- }
-
- {
- be state = info["state"];
- if( state.ok() )
- mem.hbstate = MemberState(state.Int());
- }
if( ok ) {
- HeartbeatInfo::numPings++;
-
- if( mem.upSince == 0 ) {
- log() << "replSet info member " << h.toString() << " is up" << rsLog;
- mem.upSince = mem.lastHeartbeat;
- }
- mem.health = 1.0;
- mem.lastHeartbeatMsg = info["hbmsg"].String();
- if( info.hasElement("opTime") )
- mem.opTime = info["opTime"].Date();
-
- // see if this member is in the electable set
- if( info["e"].eoo() ) {
- // for backwards compatibility
- const Member *member = theReplSet->findById(mem.id());
- if (member && member->config().potentiallyHot()) {
- theReplSet->addToElectable(mem.id());
- }
- else {
- theReplSet->rmFromElectable(mem.id());
- }
- }
- // add this server to the electable set if it is within 10
- // seconds of the latest optime we know of
- else if( info["e"].trueValue() &&
- mem.opTime >= theReplSet->lastOpTimeWritten.getSecs() - 10) {
- unsigned lastOp = theReplSet->lastOtherOpTime().getSecs();
- if (lastOp > 0 && mem.opTime >= lastOp - 10) {
- theReplSet->addToElectable(mem.id());
- }
- }
- else {
- theReplSet->rmFromElectable(mem.id());
- }
-
- be cfg = info["config"];
- if( cfg.ok() ) {
- // received a new config
- boost::function<void()> f =
- boost::bind(&Manager::msgReceivedNewConfig, theReplSet->mgr, cfg.Obj().copy());
- theReplSet->mgr->send(f);
- }
+ up(info, mem);
+ }
+ else if (!info["errmsg"].eoo() &&
+ info["errmsg"].str() == "need to login") {
+ authIssue(mem);
}
else {
down(mem, info.getStringField("errmsg"));
@@ -271,7 +231,58 @@ namespace mongo {
}
private:
+ bool _requestHeartbeat(HeartbeatInfo& mem, BSONObj& info, int& theirConfigVersion) {
+ if (tries++ % threshold == (threshold - 1)) {
+ ScopedConn conn(h.toString());
+ conn.reconnect();
+ }
+
+ Timer timer;
+
+ bool ok = requestHeartbeat(theReplSet->name(), theReplSet->selfFullName(),
+ h.toString(), info, theReplSet->config().version, theirConfigVersion);
+
+ mem.ping = (unsigned int)timer.millis();
+
+ time_t before = timer.startTime() / 1000000;
+ // we set this on any response - we don't get this far if
+ // couldn't connect because exception is thrown
+ time_t after = mem.lastHeartbeat = before + (mem.ping / 1000);
+
+ if ( info["time"].isNumber() ) {
+ long long t = info["time"].numberLong();
+ if( t > after )
+ mem.skew = (int) (t - after);
+ else if( t < before )
+ mem.skew = (int) (t - before); // negative
+ }
+ else {
+ // it won't be there if remote hasn't initialized yet
+ if( info.hasElement("time") )
+ warning() << "heatbeat.time isn't a number: " << info << endl;
+ mem.skew = INT_MIN;
+ }
+
+ {
+ be state = info["state"];
+ if( state.ok() )
+ mem.hbstate = MemberState(state.Int());
+ }
+
+ return ok;
+ }
+
+ void authIssue(HeartbeatInfo& mem) {
+ mem.authIssue = true;
+ mem.hbstate = MemberState::RS_UNKNOWN;
+
+ // set health to 0 so that this doesn't count towards majority
+ mem.health = 0.0;
+ theReplSet->rmFromElectable(mem.id());
+ }
+
void down(HeartbeatInfo& mem, string msg) {
+ mem.authIssue = false;
mem.health = 0.0;
mem.ping = 0;
if( mem.upSince || mem.downSince == 0 ) {
@@ -283,6 +294,52 @@ namespace mongo {
mem.lastHeartbeatMsg = msg;
theReplSet->rmFromElectable(mem.id());
}
+
+ void up(const BSONObj& info, HeartbeatInfo& mem) {
+ HeartbeatInfo::numPings++;
+ mem.authIssue = false;
+
+ if( mem.upSince == 0 ) {
+ log() << "replSet member " << h.toString() << " is up" << rsLog;
+ mem.upSince = mem.lastHeartbeat;
+ }
+ mem.health = 1.0;
+ mem.lastHeartbeatMsg = info["hbmsg"].String();
+ if( info.hasElement("opTime") )
+ mem.opTime = info["opTime"].Date();
+
+ // see if this member is in the electable set
+ if( info["e"].eoo() ) {
+ // for backwards compatibility
+ const Member *member = theReplSet->findById(mem.id());
+ if (member && member->config().potentiallyHot()) {
+ theReplSet->addToElectable(mem.id());
+ }
+ else {
+ theReplSet->rmFromElectable(mem.id());
+ }
+ }
+ // add this server to the electable set if it is within 10
+ // seconds of the latest optime we know of
+ else if( info["e"].trueValue() &&
+ mem.opTime >= theReplSet->lastOpTimeWritten.getSecs() - 10) {
+ unsigned lastOp = theReplSet->lastOtherOpTime().getSecs();
+ if (lastOp > 0 && mem.opTime >= lastOp - 10) {
+ theReplSet->addToElectable(mem.id());
+ }
+ }
+ else {
+ theReplSet->rmFromElectable(mem.id());
+ }
+
+ be cfg = info["config"];
+ if( cfg.ok() ) {
+ // received a new config
+ boost::function<void()> f =
+ boost::bind(&Manager::msgReceivedNewConfig, theReplSet->mgr, cfg.Obj().copy());
+ theReplSet->mgr->send(f);
+ }
+ }
};
void ReplSetImpl::endOldHealthTasks() {
diff --git a/db/repl/manager.cpp b/db/repl/manager.cpp
index 3c4c0eb..c91adc3 100644
--- a/db/repl/manager.cpp
+++ b/db/repl/manager.cpp
@@ -119,6 +119,39 @@ namespace mongo {
}
}
+ void Manager::checkAuth() {
+ int down = 0, authIssue = 0, total = 0;
+
+ for( Member *m = rs->head(); m; m=m->next() ) {
+ total++;
+
+ // all authIssue servers will also be not up
+ if (!m->hbinfo().up()) {
+ down++;
+ if (m->hbinfo().authIssue) {
+ authIssue++;
+ }
+ }
+ }
+
+ // if all nodes are down or failed auth AND at least one failed
+ // auth, go into recovering. If all nodes are down, stay a
+ // secondary.
+ if (authIssue > 0 && down == total) {
+ log() << "replset error could not reach/authenticate against any members" << endl;
+
+ if (rs->box.getPrimary() == rs->_self) {
+ log() << "auth problems, relinquishing primary" << rsLog;
+ rs->relinquish();
+ }
+
+ rs->blockSync(true);
+ }
+ else {
+ rs->blockSync(false);
+ }
+ }
+
/** called as the health threads get new results */
void Manager::msgCheckNewState() {
{
@@ -130,7 +163,8 @@ namespace mongo {
if( busyWithElectSelf ) return;
checkElectableSet();
-
+ checkAuth();
+
const Member *p = rs->box.getPrimary();
if( p && p != rs->_self ) {
if( !p->hbinfo().up() ||
diff --git a/db/repl/rs.cpp b/db/repl/rs.cpp
index 1fbbc10..f827291 100644
--- a/db/repl/rs.cpp
+++ b/db/repl/rs.cpp
@@ -329,6 +329,7 @@ namespace mongo {
ReplSetImpl::ReplSetImpl(ReplSetCmdline& replSetCmdline) : elect(this),
_currentSyncTarget(0),
+ _blockSync(false),
_hbmsgTime(0),
_self(0),
_maintenanceMode(0),
diff --git a/db/repl/rs.h b/db/repl/rs.h
index 61041a6..2b3ea9b 100644
--- a/db/repl/rs.h
+++ b/db/repl/rs.h
@@ -93,6 +93,7 @@ namespace mongo {
void noteARemoteIsPrimary(const Member *);
void checkElectableSet();
+ void checkAuth();
virtual void starting();
public:
Manager(ReplSetImpl *rs);
@@ -348,6 +349,9 @@ namespace mongo {
const Member* getMemberToSyncTo();
Member* _currentSyncTarget;
+ bool _blockSync;
+ void blockSync(bool block);
+
// set of electable members' _ids
set<unsigned> _electableSet;
protected:
@@ -491,7 +495,7 @@ namespace mongo {
void _syncThread();
bool tryToGoLiveAsASecondary(OpTime&); // readlocks
void syncTail();
- void syncApply(const BSONObj &o);
+ bool syncApply(const BSONObj &o);
unsigned _syncRollback(OplogReader& r);
void syncRollback(OplogReader& r);
void syncFixUp(HowToFixUp& h, OplogReader& r);
@@ -577,7 +581,7 @@ namespace mongo {
* that still need to be checked for auth.
*/
bool checkAuth(string& errmsg, BSONObjBuilder& result) {
- if( !noauth && adminOnly() ) {
+ if( !noauth ) {
AuthenticationInfo *ai = cc().getAuthenticationInfo();
if (!ai->isAuthorizedForLock("admin", locktype())) {
errmsg = "replSet command unauthorized";
diff --git a/db/repl/rs_config.h b/db/repl/rs_config.h
index f69052a..b22b61e 100644
--- a/db/repl/rs_config.h
+++ b/db/repl/rs_config.h
@@ -80,6 +80,22 @@ namespace mongo {
}
}
bool operator==(const MemberCfg& r) const {
+ if (!tags.empty() || !r.tags.empty()) {
+ if (tags.size() != r.tags.size()) {
+ return false;
+ }
+
+ // if they are the same size and not equal, at least one
+ // element in A must be different in B
+ for (map<string,string>::const_iterator lit = tags.begin(); lit != tags.end(); lit++) {
+ map<string,string>::const_iterator rit = r.tags.find((*lit).first);
+
+ if (rit == r.tags.end() || (*lit).second != (*rit).second) {
+ return false;
+ }
+ }
+ }
+
return _id==r._id && votes == r.votes && h == r.h && priority == r.priority &&
arbiterOnly == r.arbiterOnly && slaveDelay == r.slaveDelay && hidden == r.hidden &&
buildIndexes == buildIndexes;
diff --git a/db/repl/rs_initialsync.cpp b/db/repl/rs_initialsync.cpp
index 101b03a..112d739 100644
--- a/db/repl/rs_initialsync.cpp
+++ b/db/repl/rs_initialsync.cpp
@@ -81,6 +81,7 @@ namespace mongo {
const Member* ReplSetImpl::getMemberToSyncTo() {
Member *closest = 0;
+ bool buildIndexes = true;
// wait for 2N pings before choosing a sync target
if (_cfg) {
@@ -90,11 +91,15 @@ namespace mongo {
OCCASIONALLY log() << "waiting for " << needMorePings << " pings from other members before syncing" << endl;
return NULL;
}
+
+ buildIndexes = myConfig().buildIndexes;
}
// find the member with the lowest ping time that has more data than me
for (Member *m = _members.head(); m; m = m->next()) {
if (m->hbinfo().up() &&
+ // make sure members with buildIndexes sync from other members w/indexes
+ (!buildIndexes || (buildIndexes && m->config().buildIndexes)) &&
(m->state() == MemberState::RS_PRIMARY ||
(m->state() == MemberState::RS_SECONDARY && m->hbinfo().opTime > lastOpTimeWritten)) &&
(!closest || m->hbinfo().ping < closest->hbinfo().ping)) {
diff --git a/db/repl/rs_member.h b/db/repl/rs_member.h
index d60bb52..38b6c9b 100644
--- a/db/repl/rs_member.h
+++ b/db/repl/rs_member.h
@@ -69,7 +69,8 @@ namespace mongo {
class HeartbeatInfo {
unsigned _id;
public:
- HeartbeatInfo() : _id(0xffffffff),hbstate(MemberState::RS_UNKNOWN),health(-1.0),downSince(0),skew(INT_MIN) { }
+ HeartbeatInfo() : _id(0xffffffff), hbstate(MemberState::RS_UNKNOWN), health(-1.0),
+ downSince(0), skew(INT_MIN), authIssue(false) { }
HeartbeatInfo(unsigned id);
unsigned id() const { return _id; }
MemberState hbstate;
@@ -80,6 +81,7 @@ namespace mongo {
DiagStr lastHeartbeatMsg;
OpTime opTime;
int skew;
+ bool authIssue;
unsigned int ping; // milliseconds
static unsigned int numPings;
@@ -94,7 +96,7 @@ namespace mongo {
bool changed(const HeartbeatInfo& old) const;
};
- inline HeartbeatInfo::HeartbeatInfo(unsigned id) : _id(id) {
+ inline HeartbeatInfo::HeartbeatInfo(unsigned id) : _id(id), authIssue(false) {
hbstate = MemberState::RS_UNKNOWN;
health = -1.0;
downSince = 0;
diff --git a/db/repl/rs_sync.cpp b/db/repl/rs_sync.cpp
index b29328b..8cd3e14 100644
--- a/db/repl/rs_sync.cpp
+++ b/db/repl/rs_sync.cpp
@@ -32,17 +32,19 @@ namespace mongo {
}
}
- /* apply the log op that is in param o */
- void ReplSetImpl::syncApply(const BSONObj &o) {
+ /* apply the log op that is in param o
+ @return bool failedUpdate
+ */
+ bool ReplSetImpl::syncApply(const BSONObj &o) {
const char *ns = o.getStringField("ns");
if ( *ns == '.' || *ns == 0 ) {
blank(o);
- return;
+ return false;
}
Client::Context ctx(ns);
ctx.getClient()->curop()->reset();
- applyOperation_inlock(o);
+ return applyOperation_inlock(o);
}
/* initial oplog application, during initial sync, after cloning.
@@ -57,6 +59,7 @@ namespace mongo {
const string hn = source->h().toString();
OplogReader r;
+
try {
if( !r.connect(hn) ) {
log() << "replSet initial sync error can't connect to " << hn << " to read " << rsoplog << rsLog;
@@ -113,12 +116,9 @@ namespace mongo {
if( !r.more() )
break;
BSONObj o = r.nextSafe(); /* note we might get "not master" at some point */
- {
- ts = o["ts"]._opTime();
+ ts = o["ts"]._opTime();
- /* if we have become primary, we dont' want to apply things from elsewhere
- anymore. assumePrimary is in the db lock so we are safe as long as
- we check after we locked above. */
+ {
if( (source->state() != MemberState::RS_PRIMARY &&
source->state() != MemberState::RS_SECONDARY) ||
replSetForceInitialSyncFailure ) {
@@ -133,9 +133,12 @@ namespace mongo {
throw DBException("primary changed",0);
}
- if( ts >= applyGTE ) {
- // optimes before we started copying need not be applied.
- syncApply(o);
+ if( ts >= applyGTE ) { // optimes before we started copying need not be applied.
+ bool failedUpdate = syncApply(o);
+ if( failedUpdate && shouldRetry(o, hn)) {
+ failedUpdate = syncApply(o);
+ uassert(15915, "replSet update still fails after adding missing object", !failedUpdate);
+ }
}
_logOpObjRS(o); /* with repl sets we write the ops to our oplog too */
}
@@ -149,7 +152,11 @@ namespace mongo {
start = now;
}
}
-
+
+ if ( ts > minValid ) {
+ break;
+ }
+
getDur().commitIfNeeded();
}
catch (DBException& e) {
@@ -157,7 +164,7 @@ namespace mongo {
if( e.getCode() == 11000 || e.getCode() == 11001 ) {
continue;
}
-
+
// handle cursor not found (just requery)
if( e.getCode() == 13127 ) {
r.resetCursor();
@@ -290,7 +297,7 @@ namespace mongo {
target = 0;
}
}
-
+
// no server found
if (target == 0) {
// if there is no one to sync from
@@ -298,7 +305,7 @@ namespace mongo {
tryToGoLiveAsASecondary(minvalid);
return;
}
-
+
r.tailingQueryGTE(rsoplog, lastOpTimeWritten);
// if target cut connections between connecting and querying (for
// example, because it stepped down) we might not have a cursor
@@ -408,7 +415,7 @@ namespace mongo {
if( !target->hbinfo().hbstate.readable() ) {
break;
}
-
+
if( myConfig().slaveDelay != sd ) // reconf
break;
}
@@ -429,7 +436,7 @@ namespace mongo {
}
syncApply(o);
- _logOpObjRS(o); // with repl sets we write the ops to our oplog too
+ _logOpObjRS(o); // with repl sets we write the ops to our oplog too
}
catch (DBException& e) {
sethbmsg(str::stream() << "syncTail: " << e.toString() << ", syncing: " << o);
@@ -444,7 +451,7 @@ namespace mongo {
// TODO : reuse our connection to the primary.
return;
}
-
+
if( !target->hbinfo().hbstate.readable() ) {
return;
}
@@ -458,7 +465,7 @@ namespace mongo {
sleepsecs(1);
return;
}
- if( sp.state.fatal() || sp.state.startup() ) {
+ if( _blockSync || sp.state.fatal() || sp.state.startup() ) {
sleepsecs(5);
return;
}
@@ -530,6 +537,15 @@ namespace mongo {
replLocalAuth();
}
+ void ReplSetImpl::blockSync(bool block) {
+ _blockSync = block;
+ if (_blockSync) {
+ // syncing is how we get into SECONDARY state, so we'll be stuck in
+ // RECOVERING until we unblock
+ changeState(MemberState::RS_RECOVERING);
+ }
+ }
+
void GhostSync::associateSlave(const BSONObj& id, const int memberId) {
const OID rid = id["_id"].OID();
rwlock lk( _lock , true );
@@ -556,10 +572,10 @@ namespace mongo {
OCCASIONALLY warning() << "couldn't update slave " << rid << " no entry" << rsLog;
return;
}
-
+
GhostSlave& slave = i->second;
if (!slave.init) {
- OCCASIONALLY log() << "couldn't update slave " << rid << " not init" << rsLog;
+ OCCASIONALLY log() << "couldn't update slave " << rid << " not init" << rsLog;
return;
}