diff options
Diffstat (limited to 'db/repl/rs_sync.cpp')
-rw-r--r-- | db/repl/rs_sync.cpp | 60 |
1 files changed, 38 insertions, 22 deletions
diff --git a/db/repl/rs_sync.cpp b/db/repl/rs_sync.cpp index b29328b..8cd3e14 100644 --- a/db/repl/rs_sync.cpp +++ b/db/repl/rs_sync.cpp @@ -32,17 +32,19 @@ namespace mongo { } } - /* apply the log op that is in param o */ - void ReplSetImpl::syncApply(const BSONObj &o) { + /* apply the log op that is in param o + @return bool failedUpdate + */ + bool ReplSetImpl::syncApply(const BSONObj &o) { const char *ns = o.getStringField("ns"); if ( *ns == '.' || *ns == 0 ) { blank(o); - return; + return false; } Client::Context ctx(ns); ctx.getClient()->curop()->reset(); - applyOperation_inlock(o); + return applyOperation_inlock(o); } /* initial oplog application, during initial sync, after cloning. @@ -57,6 +59,7 @@ namespace mongo { const string hn = source->h().toString(); OplogReader r; + try { if( !r.connect(hn) ) { log() << "replSet initial sync error can't connect to " << hn << " to read " << rsoplog << rsLog; @@ -113,12 +116,9 @@ namespace mongo { if( !r.more() ) break; BSONObj o = r.nextSafe(); /* note we might get "not master" at some point */ - { - ts = o["ts"]._opTime(); + ts = o["ts"]._opTime(); - /* if we have become primary, we dont' want to apply things from elsewhere - anymore. assumePrimary is in the db lock so we are safe as long as - we check after we locked above. */ + { if( (source->state() != MemberState::RS_PRIMARY && source->state() != MemberState::RS_SECONDARY) || replSetForceInitialSyncFailure ) { @@ -133,9 +133,12 @@ namespace mongo { throw DBException("primary changed",0); } - if( ts >= applyGTE ) { - // optimes before we started copying need not be applied. - syncApply(o); + if( ts >= applyGTE ) { // optimes before we started copying need not be applied. + bool failedUpdate = syncApply(o); + if( failedUpdate && shouldRetry(o, hn)) { + failedUpdate = syncApply(o); + uassert(15915, "replSet update still fails after adding missing object", !failedUpdate); + } } _logOpObjRS(o); /* with repl sets we write the ops to our oplog too */ } @@ -149,7 +152,11 @@ namespace mongo { start = now; } } - + + if ( ts > minValid ) { + break; + } + getDur().commitIfNeeded(); } catch (DBException& e) { @@ -157,7 +164,7 @@ namespace mongo { if( e.getCode() == 11000 || e.getCode() == 11001 ) { continue; } - + // handle cursor not found (just requery) if( e.getCode() == 13127 ) { r.resetCursor(); @@ -290,7 +297,7 @@ namespace mongo { target = 0; } } - + // no server found if (target == 0) { // if there is no one to sync from @@ -298,7 +305,7 @@ namespace mongo { tryToGoLiveAsASecondary(minvalid); return; } - + r.tailingQueryGTE(rsoplog, lastOpTimeWritten); // if target cut connections between connecting and querying (for // example, because it stepped down) we might not have a cursor @@ -408,7 +415,7 @@ namespace mongo { if( !target->hbinfo().hbstate.readable() ) { break; } - + if( myConfig().slaveDelay != sd ) // reconf break; } @@ -429,7 +436,7 @@ namespace mongo { } syncApply(o); - _logOpObjRS(o); // with repl sets we write the ops to our oplog too + _logOpObjRS(o); // with repl sets we write the ops to our oplog too } catch (DBException& e) { sethbmsg(str::stream() << "syncTail: " << e.toString() << ", syncing: " << o); @@ -444,7 +451,7 @@ namespace mongo { // TODO : reuse our connection to the primary. return; } - + if( !target->hbinfo().hbstate.readable() ) { return; } @@ -458,7 +465,7 @@ namespace mongo { sleepsecs(1); return; } - if( sp.state.fatal() || sp.state.startup() ) { + if( _blockSync || sp.state.fatal() || sp.state.startup() ) { sleepsecs(5); return; } @@ -530,6 +537,15 @@ namespace mongo { replLocalAuth(); } + void ReplSetImpl::blockSync(bool block) { + _blockSync = block; + if (_blockSync) { + // syncing is how we get into SECONDARY state, so we'll be stuck in + // RECOVERING until we unblock + changeState(MemberState::RS_RECOVERING); + } + } + void GhostSync::associateSlave(const BSONObj& id, const int memberId) { const OID rid = id["_id"].OID(); rwlock lk( _lock , true ); @@ -556,10 +572,10 @@ namespace mongo { OCCASIONALLY warning() << "couldn't update slave " << rid << " no entry" << rsLog; return; } - + GhostSlave& slave = i->second; if (!slave.init) { - OCCASIONALLY log() << "couldn't update slave " << rid << " not init" << rsLog; + OCCASIONALLY log() << "couldn't update slave " << rid << " not init" << rsLog; return; } |