summaryrefslogtreecommitdiff
path: root/db/repl/rs_sync.cpp
diff options
context:
space:
mode:
authorAntonin Kral <a.kral@bobek.cz>2011-12-15 09:35:47 +0100
committerAntonin Kral <a.kral@bobek.cz>2011-12-15 09:35:47 +0100
commitf0d9a01bccdaeb466c12c92057914bbfef59526c (patch)
tree7679efa1f0daf7d1d906882a15dc77af6b7aef32 /db/repl/rs_sync.cpp
parent5d342a758c6095b4d30aba0750b54f13b8916f51 (diff)
downloadmongodb-f0d9a01bccdaeb466c12c92057914bbfef59526c.tar.gz
Imported Upstream version 2.0.2
Diffstat (limited to 'db/repl/rs_sync.cpp')
-rw-r--r--db/repl/rs_sync.cpp60
1 files changed, 38 insertions, 22 deletions
diff --git a/db/repl/rs_sync.cpp b/db/repl/rs_sync.cpp
index b29328b..8cd3e14 100644
--- a/db/repl/rs_sync.cpp
+++ b/db/repl/rs_sync.cpp
@@ -32,17 +32,19 @@ namespace mongo {
}
}
- /* apply the log op that is in param o */
- void ReplSetImpl::syncApply(const BSONObj &o) {
+ /* apply the log op that is in param o
+ @return bool failedUpdate
+ */
+ bool ReplSetImpl::syncApply(const BSONObj &o) {
const char *ns = o.getStringField("ns");
if ( *ns == '.' || *ns == 0 ) {
blank(o);
- return;
+ return false;
}
Client::Context ctx(ns);
ctx.getClient()->curop()->reset();
- applyOperation_inlock(o);
+ return applyOperation_inlock(o);
}
/* initial oplog application, during initial sync, after cloning.
@@ -57,6 +59,7 @@ namespace mongo {
const string hn = source->h().toString();
OplogReader r;
+
try {
if( !r.connect(hn) ) {
log() << "replSet initial sync error can't connect to " << hn << " to read " << rsoplog << rsLog;
@@ -113,12 +116,9 @@ namespace mongo {
if( !r.more() )
break;
BSONObj o = r.nextSafe(); /* note we might get "not master" at some point */
- {
- ts = o["ts"]._opTime();
+ ts = o["ts"]._opTime();
- /* if we have become primary, we dont' want to apply things from elsewhere
- anymore. assumePrimary is in the db lock so we are safe as long as
- we check after we locked above. */
+ {
if( (source->state() != MemberState::RS_PRIMARY &&
source->state() != MemberState::RS_SECONDARY) ||
replSetForceInitialSyncFailure ) {
@@ -133,9 +133,12 @@ namespace mongo {
throw DBException("primary changed",0);
}
- if( ts >= applyGTE ) {
- // optimes before we started copying need not be applied.
- syncApply(o);
+ if( ts >= applyGTE ) { // optimes before we started copying need not be applied.
+ bool failedUpdate = syncApply(o);
+ if( failedUpdate && shouldRetry(o, hn)) {
+ failedUpdate = syncApply(o);
+ uassert(15915, "replSet update still fails after adding missing object", !failedUpdate);
+ }
}
_logOpObjRS(o); /* with repl sets we write the ops to our oplog too */
}
@@ -149,7 +152,11 @@ namespace mongo {
start = now;
}
}
-
+
+ if ( ts > minValid ) {
+ break;
+ }
+
getDur().commitIfNeeded();
}
catch (DBException& e) {
@@ -157,7 +164,7 @@ namespace mongo {
if( e.getCode() == 11000 || e.getCode() == 11001 ) {
continue;
}
-
+
// handle cursor not found (just requery)
if( e.getCode() == 13127 ) {
r.resetCursor();
@@ -290,7 +297,7 @@ namespace mongo {
target = 0;
}
}
-
+
// no server found
if (target == 0) {
// if there is no one to sync from
@@ -298,7 +305,7 @@ namespace mongo {
tryToGoLiveAsASecondary(minvalid);
return;
}
-
+
r.tailingQueryGTE(rsoplog, lastOpTimeWritten);
// if target cut connections between connecting and querying (for
// example, because it stepped down) we might not have a cursor
@@ -408,7 +415,7 @@ namespace mongo {
if( !target->hbinfo().hbstate.readable() ) {
break;
}
-
+
if( myConfig().slaveDelay != sd ) // reconf
break;
}
@@ -429,7 +436,7 @@ namespace mongo {
}
syncApply(o);
- _logOpObjRS(o); // with repl sets we write the ops to our oplog too
+ _logOpObjRS(o); // with repl sets we write the ops to our oplog too
}
catch (DBException& e) {
sethbmsg(str::stream() << "syncTail: " << e.toString() << ", syncing: " << o);
@@ -444,7 +451,7 @@ namespace mongo {
// TODO : reuse our connection to the primary.
return;
}
-
+
if( !target->hbinfo().hbstate.readable() ) {
return;
}
@@ -458,7 +465,7 @@ namespace mongo {
sleepsecs(1);
return;
}
- if( sp.state.fatal() || sp.state.startup() ) {
+ if( _blockSync || sp.state.fatal() || sp.state.startup() ) {
sleepsecs(5);
return;
}
@@ -530,6 +537,15 @@ namespace mongo {
replLocalAuth();
}
+ void ReplSetImpl::blockSync(bool block) {
+ _blockSync = block;
+ if (_blockSync) {
+ // syncing is how we get into SECONDARY state, so we'll be stuck in
+ // RECOVERING until we unblock
+ changeState(MemberState::RS_RECOVERING);
+ }
+ }
+
void GhostSync::associateSlave(const BSONObj& id, const int memberId) {
const OID rid = id["_id"].OID();
rwlock lk( _lock , true );
@@ -556,10 +572,10 @@ namespace mongo {
OCCASIONALLY warning() << "couldn't update slave " << rid << " no entry" << rsLog;
return;
}
-
+
GhostSlave& slave = i->second;
if (!slave.init) {
- OCCASIONALLY log() << "couldn't update slave " << rid << " not init" << rsLog;
+ OCCASIONALLY log() << "couldn't update slave " << rid << " not init" << rsLog;
return;
}