diff options
Diffstat (limited to 'db/repl/rs_sync.cpp')
-rw-r--r-- | db/repl/rs_sync.cpp | 368 |
1 files changed, 247 insertions, 121 deletions
diff --git a/db/repl/rs_sync.cpp b/db/repl/rs_sync.cpp index 9de3f60..8d06fcc 100644 --- a/db/repl/rs_sync.cpp +++ b/db/repl/rs_sync.cpp @@ -19,30 +19,21 @@ #include "../../client/dbclient.h" #include "rs.h" #include "../repl.h" - +#include "connections.h" namespace mongo { using namespace bson; - extern unsigned replSetForceInitialSyncFailure; - void startSyncThread() { - Client::initThread("rs_sync"); - cc().iAmSyncThread(); - theReplSet->syncThread(); - cc().shutdown(); - } - + /* apply the log op that is in param o */ void ReplSetImpl::syncApply(const BSONObj &o) { - //const char *op = o.getStringField("op"); - - char db[MaxDatabaseLen]; + char db[MaxDatabaseNameLen]; const char *ns = o.getStringField("ns"); nsToDatabase(ns, db); if ( *ns == '.' || *ns == 0 ) { - if( *o.getStringField("op") == 'n' ) - return; + if( *o.getStringField("op") == 'n' ) + return; log() << "replSet skipping bad op in oplog: " << o.toString() << endl; return; } @@ -54,19 +45,21 @@ namespace mongo { applyOperation_inlock(o); } + /* initial oplog application, during initial sync, after cloning. + @return false on failure. + this method returns an error and doesn't throw exceptions (i think). + */ bool ReplSetImpl::initialSyncOplogApplication( - string hn, - const Member *primary, + const Member *source, OpTime applyGTE, - OpTime minValid) - { - if( primary == 0 ) return false; + OpTime minValid) { + if( source == 0 ) return false; - OpTime ts; + const string hn = source->h().toString(); + OplogReader r; try { - OplogReader r; - if( !r.connect(hn) ) { - log(2) << "replSet can't connect to " << hn << " to read operations" << rsLog; + if( !r.connect(hn) ) { + log() << "replSet initial sync error can't connect to " << hn << " to read " << rsoplog << rsLog; return false; } @@ -80,48 +73,63 @@ namespace mongo { } assert( r.haveCursor() ); - /* we lock outside the loop to avoid the overhead of locking on every operation. server isn't usable yet anyway! */ - writelock lk(""); - { - if( !r.more() ) { + if( !r.more() ) { sethbmsg("replSet initial sync error reading remote oplog"); + log() << "replSet initial sync error remote oplog (" << rsoplog << ") on host " << hn << " is empty?" << rsLog; return false; } bo op = r.next(); OpTime t = op["ts"]._opTime(); r.putBack(op); - assert( !t.isNull() ); + + if( op.firstElement().fieldName() == string("$err") ) { + log() << "replSet initial sync error querying " << rsoplog << " on " << hn << " : " << op.toString() << rsLog; + return false; + } + + uassert( 13508 , str::stream() << "no 'ts' in first op in oplog: " << op , !t.isNull() ); if( t > applyGTE ) { sethbmsg(str::stream() << "error " << hn << " oplog wrapped during initial sync"); + log() << "replSet initial sync expected first optime of " << applyGTE << rsLog; + log() << "replSet initial sync but received a first optime of " << t << " from " << hn << rsLog; return false; } } + } + catch(DBException& e) { + log() << "replSet initial sync failing: " << e.toString() << rsLog; + return false; + } - // todo : use exhaust - unsigned long long n = 0; - while( 1 ) { + /* we lock outside the loop to avoid the overhead of locking on every operation. */ + writelock lk(""); + // todo : use exhaust + OpTime ts; + unsigned long long n = 0; + while( 1 ) { + try { if( !r.more() ) break; BSONObj o = r.nextSafe(); /* note we might get "not master" at some point */ { - //writelock lk(""); - ts = o["ts"]._opTime(); /* if we have become primary, we dont' want to apply things from elsewhere - anymore. assumePrimary is in the db lock so we are safe as long as + anymore. assumePrimary is in the db lock so we are safe as long as we check after we locked above. */ - const Member *p1 = box.getPrimary(); - if( p1 != primary || replSetForceInitialSyncFailure ) { + if( (source->state() != MemberState::RS_PRIMARY && + source->state() != MemberState::RS_SECONDARY) || + replSetForceInitialSyncFailure ) { + int f = replSetForceInitialSyncFailure; if( f > 0 ) { replSetForceInitialSyncFailure = f-1; log() << "replSet test code invoked, replSetForceInitialSyncFailure" << rsLog; + throw DBException("forced error",0); } - log() << "replSet primary was:" << primary->fullName() << " now:" << - (p1 != 0 ? p1->fullName() : "none") << rsLog; + log() << "replSet we are now primary" << rsLog; throw DBException("primary changed",0); } @@ -131,38 +139,48 @@ namespace mongo { } _logOpObjRS(o); /* with repl sets we write the ops to our oplog too */ } - if( ++n % 100000 == 0 ) { + if( ++n % 100000 == 0 ) { // simple progress metering log() << "replSet initialSyncOplogApplication " << n << rsLog; } + + getDur().commitIfNeeded(); } - } - catch(DBException& e) { - if( ts <= minValid ) { - // didn't make it far enough - log() << "replSet initial sync failing, error applying oplog " << e.toString() << rsLog; - return false; + catch (DBException& e) { + if( e.getCode() == 11000 || e.getCode() == 11001 ) { + // skip duplicate key exceptions + continue; + } + + if( ts <= minValid ) { + // didn't make it far enough + log() << "replSet initial sync failing, error applying oplog " << e.toString() << rsLog; + return false; + } + + // otherwise, whatever + break; } } return true; } - /* should be in RECOVERING state on arrival here. + /* should be in RECOVERING state on arrival here. readlocks @return true if transitioned to SECONDARY */ - bool ReplSetImpl::tryToGoLiveAsASecondary(OpTime& /*out*/ minvalid) { - bool golive = false; + bool ReplSetImpl::tryToGoLiveAsASecondary(OpTime& /*out*/ minvalid) { + bool golive = false; { readlock lk("local.replset.minvalid"); BSONObj mv; - if( Helpers::getSingleton("local.replset.minvalid", mv) ) { + if( Helpers::getSingleton("local.replset.minvalid", mv) ) { minvalid = mv["ts"]._opTime(); - if( minvalid <= lastOpTimeWritten ) { + if( minvalid <= lastOpTimeWritten ) { golive=true; } } - else + else golive = true; /* must have been the original member */ } if( golive ) { @@ -172,44 +190,104 @@ namespace mongo { return golive; } - /* tail the primary's oplog. ok to return, will be re-called. */ - void ReplSetImpl::syncTail() { - // todo : locking vis a vis the mgr... + /** + * Checks if the oplog given is too far ahead to read from. + * + * @param r the oplog + * @param hn the hostname (for log messages) + * + * @return if we are stale compared to the oplog on hn + */ + bool ReplSetImpl::_isStale(OplogReader& r, const string& hn) { + BSONObj remoteOldestOp = r.findOne(rsoplog, Query()); + OpTime ts = remoteOldestOp["ts"]._opTime(); + DEV log() << "replSet remoteOldestOp: " << ts.toStringLong() << rsLog; + else log(3) << "replSet remoteOldestOp: " << ts.toStringLong() << rsLog; + DEV { + // debugging sync1.js... + log() << "replSet lastOpTimeWritten: " << lastOpTimeWritten.toStringLong() << rsLog; + log() << "replSet our state: " << state().toString() << rsLog; + } + if( lastOpTimeWritten < ts ) { + log() << "replSet error RS102 too stale to catch up, at least from " << hn << rsLog; + log() << "replSet our last optime : " << lastOpTimeWritten.toStringLong() << rsLog; + log() << "replSet oldest at " << hn << " : " << ts.toStringLong() << rsLog; + log() << "replSet See http://www.mongodb.org/display/DOCS/Resyncing+a+Very+Stale+Replica+Set+Member" << rsLog; + sethbmsg("error RS102 too stale to catch up"); + changeState(MemberState::RS_RECOVERING); + sleepsecs(120); + return true; + } + return false; + } - const Member *primary = box.getPrimary(); - if( primary == 0 ) return; - string hn = primary->h().toString(); - OplogReader r; - if( !r.connect(primary->h().toString()) ) { + /** + * Tries to connect the oplog reader to a potential sync source. If + * successful, it checks that we are not stale compared to this source. + * + * @param r reader to populate + * @param hn hostname to try + * + * @return if both checks pass, it returns true, otherwise false. + */ + bool ReplSetImpl::_getOplogReader(OplogReader& r, string& hn) { + assert(r.conn() == 0); + + if( !r.connect(hn) ) { log(2) << "replSet can't connect to " << hn << " to read operations" << rsLog; - return; + r.resetConnection(); + return false; + } + if( _isStale(r, hn)) { + r.resetConnection(); + return false; } + return true; + } - /* first make sure we are not hopelessly out of sync by being very stale. */ - { - BSONObj remoteOldestOp = r.findOne(rsoplog, Query()); - OpTime ts = remoteOldestOp["ts"]._opTime(); - DEV log() << "replSet remoteOldestOp: " << ts.toStringLong() << rsLog; - else log(3) << "replSet remoteOldestOp: " << ts.toStringLong() << rsLog; - DEV { - // debugging sync1.js... - log() << "replSet lastOpTimeWritten: " << lastOpTimeWritten.toStringLong() << rsLog; - log() << "replSet our state: " << state().toString() << rsLog; + /* tail an oplog. ok to return, will be re-called. */ + void ReplSetImpl::syncTail() { + // todo : locking vis a vis the mgr... + OplogReader r; + string hn; + + const Member *target = box.getPrimary(); + if (target != 0) { + hn = target->h().toString(); + if (!_getOplogReader(r, hn)) { + // we might be stale wrt the primary, but could still sync from + // a secondary + target = 0; + } + } + + // if we cannot reach the master but someone else is more up-to-date + // than we are, sync from them. + if( target == 0 ) { + for(Member *m = head(); m; m=m->next()) { + hn = m->h().toString(); + if (m->hbinfo().up() && m->state().readable() && + (m->hbinfo().opTime > lastOpTimeWritten) && + m->config().slaveDelay == 0 && + _getOplogReader(r, hn)) { + target = m; + break; + } } - if( lastOpTimeWritten < ts ) { - log() << "replSet error RS102 too stale to catch up, at least from primary: " << hn << rsLog; - log() << "replSet our last optime : " << lastOpTimeWritten.toStringLong() << rsLog; - log() << "replSet oldest at " << hn << " : " << ts.toStringLong() << rsLog; - log() << "replSet See http://www.mongodb.org/display/DOCS/Resyncing+a+Very+Stale+Replica+Set+Member" << rsLog; - sethbmsg("error RS102 too stale to catch up"); - sleepsecs(120); + + // no server found + if (target == 0) { + // if there is no one to sync from + OpTime minvalid; + tryToGoLiveAsASecondary(minvalid); return; } } r.tailingQueryGTE(rsoplog, lastOpTimeWritten); assert( r.haveCursor() ); - assert( r.awaitCapable() ); + + uassert(1000, "replSet source for syncing doesn't seem to be await capable -- is it an older version of mongodb?", r.awaitCapable() ); { if( !r.more() ) { @@ -222,7 +300,7 @@ namespace mongo { return; } OpTime theirTS = theirLastOp["ts"]._opTime(); - if( theirTS < lastOpTimeWritten ) { + if( theirTS < lastOpTimeWritten ) { log() << "replSet we are ahead of the primary, will try to roll back" << rsLog; syncRollback(r); return; @@ -231,7 +309,7 @@ namespace mongo { log() << "replSet syncTail condition 1" << rsLog; sleepsecs(1); } - catch(DBException& e) { + catch(DBException& e) { log() << "replSet error querying " << hn << ' ' << e.toString() << rsLog; sleepsecs(2); } @@ -249,12 +327,9 @@ namespace mongo { BSONObj o = r.nextSafe(); OpTime ts = o["ts"]._opTime(); long long h = o["h"].numberLong(); - if( ts != lastOpTimeWritten || h != lastH ) { - log(1) << "TEMP our last op time written: " << lastOpTimeWritten.toStringPretty() << endl; - log(1) << "TEMP primary's GTE: " << ts.toStringPretty() << endl; - /* - }*/ - + if( ts != lastOpTimeWritten || h != lastH ) { + log() << "replSet our last op time written: " << lastOpTimeWritten.toStringPretty() << endl; + log() << "replset source's GTE: " << ts.toStringPretty() << endl; syncRollback(r); return; } @@ -268,49 +343,45 @@ namespace mongo { while( 1 ) { while( 1 ) { - if( !r.moreInCurrentBatch() ) { - /* we need to occasionally check some things. between + if( !r.moreInCurrentBatch() ) { + /* we need to occasionally check some things. between batches is probably a good time. */ /* perhaps we should check this earlier? but not before the rollback checks. */ - if( state().recovering() ) { + if( state().recovering() ) { /* can we go to RS_SECONDARY state? we can if not too old and if minvalid achieved */ OpTime minvalid; bool golive = ReplSetImpl::tryToGoLiveAsASecondary(minvalid); if( golive ) { ; } - else { + else { sethbmsg(str::stream() << "still syncing, not yet to minValid optime" << minvalid.toString()); } /* todo: too stale capability */ } - if( box.getPrimary() != primary ) - return; + { + const Member *primary = box.getPrimary(); + + if( !target->hbinfo().hbstate.readable() || + // if we are not syncing from the primary, return (if + // it's up) so that we can try accessing it again + (target != primary && primary != 0)) { + return; + } + } } if( !r.more() ) break; - { + { BSONObj o = r.nextSafe(); /* note we might get "not master" at some point */ - { - writelock lk(""); - /* if we have become primary, we dont' want to apply things from elsewhere - anymore. assumePrimary is in the db lock so we are safe as long as - we check after we locked above. */ - if( box.getPrimary() != primary ) { - if( box.getState().primary() ) - log(0) << "replSet stopping syncTail we are now primary" << rsLog; - return; - } - - syncApply(o); - _logOpObjRS(o); /* with repl sets we write the ops to our oplog too: */ - } int sd = myConfig().slaveDelay; - if( sd ) { + // ignore slaveDelay if the box is still initializing. once + // it becomes secondary we can worry about it. + if( sd && box.getState().secondary() ) { const OpTime ts = o["ts"]._opTime(); long long a = ts.getSecs(); long long b = time(0); @@ -329,13 +400,30 @@ namespace mongo { sleepsecs(6); if( time(0) >= waitUntil ) break; - if( box.getPrimary() != primary ) + if( !target->hbinfo().hbstate.readable() ) { break; + } if( myConfig().slaveDelay != sd ) // reconf break; } } } + + } + + { + writelock lk(""); + + /* if we have become primary, we dont' want to apply things from elsewhere + anymore. assumePrimary is in the db lock so we are safe as long as + we check after we locked above. */ + if( box.getState().primary() ) { + log(0) << "replSet stopping syncTail we are now primary" << rsLog; + return; + } + + syncApply(o); + _logOpObjRS(o); /* with repl sets we write the ops to our oplog too: */ } } } @@ -345,8 +433,9 @@ namespace mongo { // TODO : reuse our connection to the primary. return; } - if( box.getPrimary() != primary ) + if( !target->hbinfo().hbstate.readable() ) { return; + } // looping back is ok because this is a tailable cursor } } @@ -357,15 +446,11 @@ namespace mongo { sleepsecs(1); return; } - if( sp.state.fatal() ) { + if( sp.state.fatal() ) { sleepsecs(5); return; } - /* later, we can sync from up secondaries if we want. tbd. */ - if( sp.primary == 0 ) - return; - /* do we have anything at all? */ if( lastOpTimeWritten.isNull() ) { syncDoInitialSync(); @@ -377,23 +462,64 @@ namespace mongo { } void ReplSetImpl::syncThread() { - if( myConfig().arbiterOnly ) - return; - while( 1 ) { + /* test here was to force a receive timeout + ScopedConn c("localhost"); + bo info; + try { + log() << "this is temp" << endl; + c.runCommand("admin", BSON("sleep"<<120), info); + log() << info.toString() << endl; + c.runCommand("admin", BSON("sleep"<<120), info); + log() << "temp" << endl; + } + catch( DBException& e ) { + log() << e.toString() << endl; + c.runCommand("admin", BSON("sleep"<<120), info); + log() << "temp" << endl; + } + */ + + while( 1 ) { + if( myConfig().arbiterOnly ) + return; + try { _syncThread(); } - catch(DBException& e) { + catch(DBException& e) { sethbmsg("syncThread: " + e.toString()); sleepsecs(10); } - catch(...) { + catch(...) { sethbmsg("unexpected exception in syncThread()"); - // TODO : SET NOT SECONDARY here. + // TODO : SET NOT SECONDARY here? sleepsecs(60); } sleepsecs(1); + + /* normally msgCheckNewState gets called periodically, but in a single node repl set there + are no heartbeat threads, so we do it here to be sure. this is relevant if the singleton + member has done a stepDown() and needs to come back up. + */ + OCCASIONALLY mgr->send( boost::bind(&Manager::msgCheckNewState, theReplSet->mgr) ); + } + } + + void startSyncThread() { + static int n; + if( n != 0 ) { + log() << "replSet ERROR : more than one sync thread?" << rsLog; + assert( n == 0 ); + } + n++; + + Client::initThread("replica set sync"); + cc().iAmSyncThread(); + if (!noauth) { + cc().getAuthenticationInfo()->authorize("local"); } + theReplSet->syncThread(); + cc().shutdown(); } } |