summaryrefslogtreecommitdiff
path: root/db/repl/rs_sync.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'db/repl/rs_sync.cpp')
-rw-r--r--db/repl/rs_sync.cpp368
1 files changed, 247 insertions, 121 deletions
diff --git a/db/repl/rs_sync.cpp b/db/repl/rs_sync.cpp
index 9de3f60..8d06fcc 100644
--- a/db/repl/rs_sync.cpp
+++ b/db/repl/rs_sync.cpp
@@ -19,30 +19,21 @@
#include "../../client/dbclient.h"
#include "rs.h"
#include "../repl.h"
-
+#include "connections.h"
namespace mongo {
using namespace bson;
-
extern unsigned replSetForceInitialSyncFailure;
- void startSyncThread() {
- Client::initThread("rs_sync");
- cc().iAmSyncThread();
- theReplSet->syncThread();
- cc().shutdown();
- }
-
+ /* apply the log op that is in param o */
void ReplSetImpl::syncApply(const BSONObj &o) {
- //const char *op = o.getStringField("op");
-
- char db[MaxDatabaseLen];
+ char db[MaxDatabaseNameLen];
const char *ns = o.getStringField("ns");
nsToDatabase(ns, db);
if ( *ns == '.' || *ns == 0 ) {
- if( *o.getStringField("op") == 'n' )
- return;
+ if( *o.getStringField("op") == 'n' )
+ return;
log() << "replSet skipping bad op in oplog: " << o.toString() << endl;
return;
}
@@ -54,19 +45,21 @@ namespace mongo {
applyOperation_inlock(o);
}
+ /* initial oplog application, during initial sync, after cloning.
+ @return false on failure.
+ this method returns an error and doesn't throw exceptions (i think).
+ */
bool ReplSetImpl::initialSyncOplogApplication(
- string hn,
- const Member *primary,
+ const Member *source,
OpTime applyGTE,
- OpTime minValid)
- {
- if( primary == 0 ) return false;
+ OpTime minValid) {
+ if( source == 0 ) return false;
- OpTime ts;
+ const string hn = source->h().toString();
+ OplogReader r;
try {
- OplogReader r;
- if( !r.connect(hn) ) {
- log(2) << "replSet can't connect to " << hn << " to read operations" << rsLog;
+ if( !r.connect(hn) ) {
+ log() << "replSet initial sync error can't connect to " << hn << " to read " << rsoplog << rsLog;
return false;
}
@@ -80,48 +73,63 @@ namespace mongo {
}
assert( r.haveCursor() );
- /* we lock outside the loop to avoid the overhead of locking on every operation. server isn't usable yet anyway! */
- writelock lk("");
-
{
- if( !r.more() ) {
+ if( !r.more() ) {
sethbmsg("replSet initial sync error reading remote oplog");
+ log() << "replSet initial sync error remote oplog (" << rsoplog << ") on host " << hn << " is empty?" << rsLog;
return false;
}
bo op = r.next();
OpTime t = op["ts"]._opTime();
r.putBack(op);
- assert( !t.isNull() );
+
+ if( op.firstElement().fieldName() == string("$err") ) {
+ log() << "replSet initial sync error querying " << rsoplog << " on " << hn << " : " << op.toString() << rsLog;
+ return false;
+ }
+
+ uassert( 13508 , str::stream() << "no 'ts' in first op in oplog: " << op , !t.isNull() );
if( t > applyGTE ) {
sethbmsg(str::stream() << "error " << hn << " oplog wrapped during initial sync");
+ log() << "replSet initial sync expected first optime of " << applyGTE << rsLog;
+ log() << "replSet initial sync but received a first optime of " << t << " from " << hn << rsLog;
return false;
}
}
+ }
+ catch(DBException& e) {
+ log() << "replSet initial sync failing: " << e.toString() << rsLog;
+ return false;
+ }
- // todo : use exhaust
- unsigned long long n = 0;
- while( 1 ) {
+ /* we lock outside the loop to avoid the overhead of locking on every operation. */
+ writelock lk("");
+ // todo : use exhaust
+ OpTime ts;
+ unsigned long long n = 0;
+ while( 1 ) {
+ try {
if( !r.more() )
break;
BSONObj o = r.nextSafe(); /* note we might get "not master" at some point */
{
- //writelock lk("");
-
ts = o["ts"]._opTime();
/* if we have become primary, we dont' want to apply things from elsewhere
- anymore. assumePrimary is in the db lock so we are safe as long as
+ anymore. assumePrimary is in the db lock so we are safe as long as
we check after we locked above. */
- const Member *p1 = box.getPrimary();
- if( p1 != primary || replSetForceInitialSyncFailure ) {
+ if( (source->state() != MemberState::RS_PRIMARY &&
+ source->state() != MemberState::RS_SECONDARY) ||
+ replSetForceInitialSyncFailure ) {
+
int f = replSetForceInitialSyncFailure;
if( f > 0 ) {
replSetForceInitialSyncFailure = f-1;
log() << "replSet test code invoked, replSetForceInitialSyncFailure" << rsLog;
+ throw DBException("forced error",0);
}
- log() << "replSet primary was:" << primary->fullName() << " now:" <<
- (p1 != 0 ? p1->fullName() : "none") << rsLog;
+ log() << "replSet we are now primary" << rsLog;
throw DBException("primary changed",0);
}
@@ -131,38 +139,48 @@ namespace mongo {
}
_logOpObjRS(o); /* with repl sets we write the ops to our oplog too */
}
- if( ++n % 100000 == 0 ) {
+ if( ++n % 100000 == 0 ) {
// simple progress metering
log() << "replSet initialSyncOplogApplication " << n << rsLog;
}
+
+ getDur().commitIfNeeded();
}
- }
- catch(DBException& e) {
- if( ts <= minValid ) {
- // didn't make it far enough
- log() << "replSet initial sync failing, error applying oplog " << e.toString() << rsLog;
- return false;
+ catch (DBException& e) {
+ if( e.getCode() == 11000 || e.getCode() == 11001 ) {
+ // skip duplicate key exceptions
+ continue;
+ }
+
+ if( ts <= minValid ) {
+ // didn't make it far enough
+ log() << "replSet initial sync failing, error applying oplog " << e.toString() << rsLog;
+ return false;
+ }
+
+ // otherwise, whatever
+ break;
}
}
return true;
}
- /* should be in RECOVERING state on arrival here.
+ /* should be in RECOVERING state on arrival here.
readlocks
@return true if transitioned to SECONDARY
*/
- bool ReplSetImpl::tryToGoLiveAsASecondary(OpTime& /*out*/ minvalid) {
- bool golive = false;
+ bool ReplSetImpl::tryToGoLiveAsASecondary(OpTime& /*out*/ minvalid) {
+ bool golive = false;
{
readlock lk("local.replset.minvalid");
BSONObj mv;
- if( Helpers::getSingleton("local.replset.minvalid", mv) ) {
+ if( Helpers::getSingleton("local.replset.minvalid", mv) ) {
minvalid = mv["ts"]._opTime();
- if( minvalid <= lastOpTimeWritten ) {
+ if( minvalid <= lastOpTimeWritten ) {
golive=true;
}
}
- else
+ else
golive = true; /* must have been the original member */
}
if( golive ) {
@@ -172,44 +190,104 @@ namespace mongo {
return golive;
}
- /* tail the primary's oplog. ok to return, will be re-called. */
- void ReplSetImpl::syncTail() {
- // todo : locking vis a vis the mgr...
+ /**
+ * Checks if the oplog given is too far ahead to read from.
+ *
+ * Fetches one document from rsoplog on the remote with findOne and an empty
+ * query (presumably the oldest entry still in that oplog -- confirm the
+ * returned order) and compares its "ts" field with our lastOpTimeWritten.
+ * The fetched optime is logged at level 3, or unconditionally under DEV
+ * (DEV is a macro declared elsewhere; presumably an if on a debug flag,
+ * which is what makes the following else legal -- confirm).
+ *
+ * Side effects when we are stale: logs error RS102 with both optimes, sets
+ * the heartbeat message, changes our state to RS_RECOVERING and then sleeps
+ * for 120 seconds before returning, so the caller blocks for that long.
+ *
+ * @param r the oplog reader; the caller _getOplogReader connects it to hn
+ * before calling
+ * @param hn the hostname (for log messages)
+ *
+ * @return true if our lastOpTimeWritten is older than the fetched remote
+ * optime, i.e. we are stale compared to the oplog on hn; false otherwise
+ */
+ bool ReplSetImpl::_isStale(OplogReader& r, const string& hn) {
+ BSONObj remoteOldestOp = r.findOne(rsoplog, Query());
+ OpTime ts = remoteOldestOp["ts"]._opTime();
+ DEV log() << "replSet remoteOldestOp: " << ts.toStringLong() << rsLog;
+ else log(3) << "replSet remoteOldestOp: " << ts.toStringLong() << rsLog;
+ DEV {
+ // debugging sync1.js...
+ log() << "replSet lastOpTimeWritten: " << lastOpTimeWritten.toStringLong() << rsLog;
+ log() << "replSet our state: " << state().toString() << rsLog;
+ }
+ if( lastOpTimeWritten < ts ) { // the fetched remote entry is newer than our last written op
+ log() << "replSet error RS102 too stale to catch up, at least from " << hn << rsLog;
+ log() << "replSet our last optime : " << lastOpTimeWritten.toStringLong() << rsLog;
+ log() << "replSet oldest at " << hn << " : " << ts.toStringLong() << rsLog;
+ log() << "replSet See http://www.mongodb.org/display/DOCS/Resyncing+a+Very+Stale+Replica+Set+Member" << rsLog;
+ sethbmsg("error RS102 too stale to catch up");
+ changeState(MemberState::RS_RECOVERING);
+ sleepsecs(120); // blocks this thread for two minutes before reporting stale
+ return true;
+ }
+ return false;
+ }
- const Member *primary = box.getPrimary();
- if( primary == 0 ) return;
- string hn = primary->h().toString();
- OplogReader r;
- if( !r.connect(primary->h().toString()) ) {
+ /**
+ * Tries to connect the oplog reader to a potential sync source. If
+ * successful, it checks that we are not stale compared to this source.
+ *
+ * Precondition: r must not hold a connection yet (asserted on entry).
+ * On either failure the reader's connection is reset, so the same reader
+ * can be passed in again for the next candidate host.
+ * Note that the staleness check (_isStale) sleeps for 120 seconds and
+ * moves us to RS_RECOVERING when we are stale, so this call can block.
+ *
+ * @param r reader to populate
+ * @param hn hostname to try (taken by non-const reference, but not
+ * modified in this function)
+ *
+ * @return if both checks pass, it returns true, otherwise false.
+ */
+ bool ReplSetImpl::_getOplogReader(OplogReader& r, string& hn) {
+ assert(r.conn() == 0); // caller must hand in an unconnected reader
+
+ if( !r.connect(hn) ) {
log(2) << "replSet can't connect to " << hn << " to read operations" << rsLog;
- return;
+ r.resetConnection();
+ return false;
+ }
+ if( _isStale(r, hn)) { // connected, but this source's oplog no longer overlaps ours
+ r.resetConnection();
+ return false;
}
+ return true;
+ }
- /* first make sure we are not hopelessly out of sync by being very stale. */
- {
- BSONObj remoteOldestOp = r.findOne(rsoplog, Query());
- OpTime ts = remoteOldestOp["ts"]._opTime();
- DEV log() << "replSet remoteOldestOp: " << ts.toStringLong() << rsLog;
- else log(3) << "replSet remoteOldestOp: " << ts.toStringLong() << rsLog;
- DEV {
- // debugging sync1.js...
- log() << "replSet lastOpTimeWritten: " << lastOpTimeWritten.toStringLong() << rsLog;
- log() << "replSet our state: " << state().toString() << rsLog;
+ /* tail an oplog. ok to return, will be re-called. */
+ void ReplSetImpl::syncTail() {
+ // todo : locking vis a vis the mgr...
+ OplogReader r;
+ string hn;
+
+ const Member *target = box.getPrimary();
+ if (target != 0) {
+ hn = target->h().toString();
+ if (!_getOplogReader(r, hn)) {
+ // we might be stale wrt the primary, but could still sync from
+ // a secondary
+ target = 0;
+ }
+ }
+
+ // if we cannot reach the master but someone else is more up-to-date
+ // than we are, sync from them.
+ if( target == 0 ) {
+ for(Member *m = head(); m; m=m->next()) {
+ hn = m->h().toString();
+ if (m->hbinfo().up() && m->state().readable() &&
+ (m->hbinfo().opTime > lastOpTimeWritten) &&
+ m->config().slaveDelay == 0 &&
+ _getOplogReader(r, hn)) {
+ target = m;
+ break;
+ }
}
- if( lastOpTimeWritten < ts ) {
- log() << "replSet error RS102 too stale to catch up, at least from primary: " << hn << rsLog;
- log() << "replSet our last optime : " << lastOpTimeWritten.toStringLong() << rsLog;
- log() << "replSet oldest at " << hn << " : " << ts.toStringLong() << rsLog;
- log() << "replSet See http://www.mongodb.org/display/DOCS/Resyncing+a+Very+Stale+Replica+Set+Member" << rsLog;
- sethbmsg("error RS102 too stale to catch up");
- sleepsecs(120);
+
+ // no server found
+ if (target == 0) {
+ // if there is no one to sync from
+ OpTime minvalid;
+ tryToGoLiveAsASecondary(minvalid);
return;
}
}
r.tailingQueryGTE(rsoplog, lastOpTimeWritten);
assert( r.haveCursor() );
- assert( r.awaitCapable() );
+
+ uassert(1000, "replSet source for syncing doesn't seem to be await capable -- is it an older version of mongodb?", r.awaitCapable() );
{
if( !r.more() ) {
@@ -222,7 +300,7 @@ namespace mongo {
return;
}
OpTime theirTS = theirLastOp["ts"]._opTime();
- if( theirTS < lastOpTimeWritten ) {
+ if( theirTS < lastOpTimeWritten ) {
log() << "replSet we are ahead of the primary, will try to roll back" << rsLog;
syncRollback(r);
return;
@@ -231,7 +309,7 @@ namespace mongo {
log() << "replSet syncTail condition 1" << rsLog;
sleepsecs(1);
}
- catch(DBException& e) {
+ catch(DBException& e) {
log() << "replSet error querying " << hn << ' ' << e.toString() << rsLog;
sleepsecs(2);
}
@@ -249,12 +327,9 @@ namespace mongo {
BSONObj o = r.nextSafe();
OpTime ts = o["ts"]._opTime();
long long h = o["h"].numberLong();
- if( ts != lastOpTimeWritten || h != lastH ) {
- log(1) << "TEMP our last op time written: " << lastOpTimeWritten.toStringPretty() << endl;
- log(1) << "TEMP primary's GTE: " << ts.toStringPretty() << endl;
- /*
- }*/
-
+ if( ts != lastOpTimeWritten || h != lastH ) {
+ log() << "replSet our last op time written: " << lastOpTimeWritten.toStringPretty() << endl;
+ log() << "replset source's GTE: " << ts.toStringPretty() << endl;
syncRollback(r);
return;
}
@@ -268,49 +343,45 @@ namespace mongo {
while( 1 ) {
while( 1 ) {
- if( !r.moreInCurrentBatch() ) {
- /* we need to occasionally check some things. between
+ if( !r.moreInCurrentBatch() ) {
+ /* we need to occasionally check some things. between
batches is probably a good time. */
/* perhaps we should check this earlier? but not before the rollback checks. */
- if( state().recovering() ) {
+ if( state().recovering() ) {
/* can we go to RS_SECONDARY state? we can if not too old and if minvalid achieved */
OpTime minvalid;
bool golive = ReplSetImpl::tryToGoLiveAsASecondary(minvalid);
if( golive ) {
;
}
- else {
+ else {
sethbmsg(str::stream() << "still syncing, not yet to minValid optime" << minvalid.toString());
}
/* todo: too stale capability */
}
- if( box.getPrimary() != primary )
- return;
+ {
+ const Member *primary = box.getPrimary();
+
+ if( !target->hbinfo().hbstate.readable() ||
+ // if we are not syncing from the primary, return (if
+ // it's up) so that we can try accessing it again
+ (target != primary && primary != 0)) {
+ return;
+ }
+ }
}
if( !r.more() )
break;
- {
+ {
BSONObj o = r.nextSafe(); /* note we might get "not master" at some point */
- {
- writelock lk("");
- /* if we have become primary, we dont' want to apply things from elsewhere
- anymore. assumePrimary is in the db lock so we are safe as long as
- we check after we locked above. */
- if( box.getPrimary() != primary ) {
- if( box.getState().primary() )
- log(0) << "replSet stopping syncTail we are now primary" << rsLog;
- return;
- }
-
- syncApply(o);
- _logOpObjRS(o); /* with repl sets we write the ops to our oplog too: */
- }
int sd = myConfig().slaveDelay;
- if( sd ) {
+ // ignore slaveDelay if the box is still initializing. once
+ // it becomes secondary we can worry about it.
+ if( sd && box.getState().secondary() ) {
const OpTime ts = o["ts"]._opTime();
long long a = ts.getSecs();
long long b = time(0);
@@ -329,13 +400,30 @@ namespace mongo {
sleepsecs(6);
if( time(0) >= waitUntil )
break;
- if( box.getPrimary() != primary )
+ if( !target->hbinfo().hbstate.readable() ) {
break;
+ }
if( myConfig().slaveDelay != sd ) // reconf
break;
}
}
}
+
+ }
+
+ {
+ writelock lk("");
+
+ /* if we have become primary, we dont' want to apply things from elsewhere
+ anymore. assumePrimary is in the db lock so we are safe as long as
+ we check after we locked above. */
+ if( box.getState().primary() ) {
+ log(0) << "replSet stopping syncTail we are now primary" << rsLog;
+ return;
+ }
+
+ syncApply(o);
+ _logOpObjRS(o); /* with repl sets we write the ops to our oplog too: */
}
}
}
@@ -345,8 +433,9 @@ namespace mongo {
// TODO : reuse our connection to the primary.
return;
}
- if( box.getPrimary() != primary )
+ if( !target->hbinfo().hbstate.readable() ) {
return;
+ }
// looping back is ok because this is a tailable cursor
}
}
@@ -357,15 +446,11 @@ namespace mongo {
sleepsecs(1);
return;
}
- if( sp.state.fatal() ) {
+ if( sp.state.fatal() ) {
sleepsecs(5);
return;
}
- /* later, we can sync from up secondaries if we want. tbd. */
- if( sp.primary == 0 )
- return;
-
/* do we have anything at all? */
if( lastOpTimeWritten.isNull() ) {
syncDoInitialSync();
@@ -377,23 +462,64 @@ namespace mongo {
}
void ReplSetImpl::syncThread() { // sync thread body: calls _syncThread() in an endless loop; the only exit is the arbiterOnly return below
- if( myConfig().arbiterOnly )
- return;
- while( 1 ) {
+ /* disabled test code, kept for reference: it was used to force a receive timeout
+ ScopedConn c("localhost");
+ bo info;
+ try {
+ log() << "this is temp" << endl;
+ c.runCommand("admin", BSON("sleep"<<120), info);
+ log() << info.toString() << endl;
+ c.runCommand("admin", BSON("sleep"<<120), info);
+ log() << "temp" << endl;
+ }
+ catch( DBException& e ) {
+ log() << e.toString() << endl;
+ c.runCommand("admin", BSON("sleep"<<120), info);
+ log() << "temp" << endl;
+ }
+ */
+
+ while( 1 ) {
+ if( myConfig().arbiterOnly ) // evaluated on every pass; presumably so a reconfig to arbiterOnly ends the thread -- confirm
+ return;
+
try {
_syncThread();
}
- catch(DBException& e) {
+ catch(DBException& e) {
sethbmsg("syncThread: " + e.toString()); // surface the error in the heartbeat message
sleepsecs(10); // back off before the next pass
}
- catch(...) {
+ catch(...) {
sethbmsg("unexpected exception in syncThread()");
- // TODO : SET NOT SECONDARY here.
+ // TODO : SET NOT SECONDARY here?
sleepsecs(60); // longer back-off for exceptions that are not DBException
}
sleepsecs(1); // pause between passes, taken after both normal returns and handled exceptions
+
+ /* normally msgCheckNewState gets called periodically, but in a single node repl set there
+ are no heartbeat threads, so we do it here to be sure. this is relevant if the singleton
+ member has done a stepDown() and needs to come back up.
+ OCCASIONALLY is a macro declared elsewhere; presumably it runs the statement only on some passes -- confirm.
+ */
+ OCCASIONALLY mgr->send( boost::bind(&Manager::msgCheckNewState, theReplSet->mgr) );
+ }
+ }
+
+ void startSyncThread() { // entry point of the replica set sync thread; expected to be entered at most once per process
+ static int n; // count of entries into this function; zero-initialized, no locking visible here
+ if( n != 0 ) {
+ log() << "replSet ERROR : more than one sync thread?" << rsLog;
+ assert( n == 0 ); // always fails in this branch; the log line above records why
+ }
+ n++;
+
+ Client::initThread("replica set sync");
+ cc().iAmSyncThread();
+ if (!noauth) { // noauth is declared elsewhere; presumably true when authentication is disabled -- confirm
+ cc().getAuthenticationInfo()->authorize("local");
}
+ theReplSet->syncThread(); // loops until this member is configured arbiterOnly
+ cc().shutdown();
}
}