diff options
Diffstat (limited to 'db/repl/rs_initialsync.cpp')
-rw-r--r-- | db/repl/rs_initialsync.cpp | 205 |
1 files changed, 142 insertions, 63 deletions
diff --git a/db/repl/rs_initialsync.cpp b/db/repl/rs_initialsync.cpp index 3851c66..5a54059 100644 --- a/db/repl/rs_initialsync.cpp +++ b/db/repl/rs_initialsync.cpp @@ -15,6 +15,7 @@ */ #include "pch.h" +#include "../repl.h" #include "../client.h" #include "../../client/dbclient.h" #include "rs.h" @@ -33,15 +34,17 @@ namespace mongo { // add try/catch with sleep - void isyncassert(const char *msg, bool expr) { - if( !expr ) { + void isyncassert(const char *msg, bool expr) { + if( !expr ) { string m = str::stream() << "initial sync " << msg; theReplSet->sethbmsg(m, 0); uasserted(13404, m); } } - void ReplSetImpl::syncDoInitialSync() { + void ReplSetImpl::syncDoInitialSync() { + createOplog(); + while( 1 ) { try { _syncDoInitialSync(); @@ -54,14 +57,14 @@ namespace mongo { } } - bool cloneFrom(const char *masterHost, string& errmsg, const string& fromdb, bool logForReplication, - bool slaveOk, bool useReplAuth, bool snapshot); + bool cloneFrom(const char *masterHost, string& errmsg, const string& fromdb, bool logForReplication, + bool slaveOk, bool useReplAuth, bool snapshot); /* todo : progress metering to sethbmsg. */ static bool clone(const char *master, string db) { string err; - return cloneFrom(master, err, db, false, - /*slaveok later can be true*/ false, true, false); + return cloneFrom(master, err, db, false, + /* slave_ok */ true, true, false); } void _logOpObjRS(const BSONObj& op); @@ -71,11 +74,11 @@ namespace mongo { static void emptyOplog() { writelock lk(rsoplog); Client::Context ctx(rsoplog); - NamespaceDetails *d = nsdetails(rsoplog); + NamespaceDetails *d = nsdetails(rsoplog); - // temp - if( d && d->nrecords == 0 ) - return; // already empty, ok. + // temp + if( d && d->stats.nrecords == 0 ) + return; // already empty, ok. log(1) << "replSet empty oplog" << rsLog; d->emptyCappedCollection(rsoplog); @@ -84,10 +87,10 @@ namespace mongo { string errmsg; bob res; dropCollection(rsoplog, errmsg, res); - log() << "replSet recreated oplog so it is empty. todo optimize this..." << rsLog; - createOplog();*/ + log() << "replSet recreated oplog so it is empty. todo optimize this..." << rsLog; + createOplog();*/ - // TEMP: restart to recreate empty oplog + // TEMP: restart to recreate empty oplog //log() << "replSet FATAL error during initial sync. mongod restart required." << rsLog; //dbexit( EXIT_CLEAN ); @@ -100,106 +103,182 @@ namespace mongo { */ } - void ReplSetImpl::_syncDoInitialSync() { - sethbmsg("initial sync pending",0); + /** + * Choose a member to sync from. + * + * The initalSync option is an object with 1 k/v pair: + * + * "state" : 1|2 + * "name" : "host" + * "_id" : N + * "optime" : t + * + * All except optime are exact matches. "optime" will find a secondary with + * an optime >= to the optime given. + */ + const Member* ReplSetImpl::getMemberToSyncTo() { + BSONObj sync = myConfig().initialSync; + bool secondaryOnly = false, isOpTime = false; + char *name = 0; + int id = -1; + OpTime optime; StateBox::SP sp = box.get(); assert( !sp.state.primary() ); // wouldn't make sense if we were. - const Member *cp = sp.primary; - if( cp == 0 ) { - sethbmsg("initial sync need a member to be primary",0); + // if it exists, we've already checked that these fields are valid in + // rs_config.cpp + if ( !sync.isEmpty() ) { + if (sync.hasElement("state")) { + if (sync["state"].Number() == 1) { + if (sp.primary) { + sethbmsg( str::stream() << "syncing to primary: " << sp.primary->fullName(), 0); + return const_cast<Member*>(sp.primary); + } + else { + sethbmsg("couldn't clone from primary"); + return NULL; + } + } + else { + secondaryOnly = true; + } + } + if (sync.hasElement("name")) { + name = (char*)sync["name"].valuestr(); + } + if (sync.hasElement("_id")) { + id = (int)sync["_id"].Number(); + } + if (sync.hasElement("optime")) { + isOpTime = true; + optime = sync["optime"]._opTime(); + } + } + + for( Member *m = head(); m; m = m->next() ) { + if (!m->hbinfo().up() || + (m->state() != MemberState::RS_SECONDARY && + m->state() != MemberState::RS_PRIMARY) || + (secondaryOnly && m->state() != MemberState::RS_SECONDARY) || + (id != -1 && (int)m->id() != id) || + (name != 0 && strcmp(name, m->fullName().c_str()) != 0) || + (isOpTime && optime >= m->hbinfo().opTime)) { + continue; + } + + sethbmsg( str::stream() << "syncing to: " << m->fullName(), 0); + return const_cast<Member*>(m); + } + + sethbmsg( str::stream() << "couldn't find a member matching the sync criteria: " << + "\nstate? " << (secondaryOnly ? "2" : "none") << + "\nname? " << (name ? name : "none") << + "\n_id? " << id << + "\noptime? " << optime.toStringPretty() ); + + return NULL; + } + + /** + * Do the initial sync for this member. + */ + void ReplSetImpl::_syncDoInitialSync() { + sethbmsg("initial sync pending",0); + + const Member *source = getMemberToSyncTo(); + if (!source) { + sethbmsg("initial sync need a member to be primary or secondary to do our initial sync", 0); sleepsecs(15); return; } - string masterHostname = cp->h().toString(); + string sourceHostname = source->h().toString(); OplogReader r; - if( !r.connect(masterHostname) ) { - sethbmsg( str::stream() << "initial sync couldn't connect to " << cp->h().toString() , 0); + if( !r.connect(sourceHostname) ) { + sethbmsg( str::stream() << "initial sync couldn't connect to " << source->h().toString() , 0); sleepsecs(15); return; } BSONObj lastOp = r.getLastOp(rsoplog); - if( lastOp.isEmpty() ) { + if( lastOp.isEmpty() ) { sethbmsg("initial sync couldn't read remote oplog", 0); sleepsecs(15); return; } OpTime startingTS = lastOp["ts"]._opTime(); - - { - /* make sure things aren't too flappy */ - sleepsecs(5); - isyncassert( "flapping?", box.getPrimary() == cp ); - BSONObj o = r.getLastOp(rsoplog); - isyncassert( "flapping [2]?", !o.isEmpty() ); - } - - sethbmsg("initial sync drop all databases", 0); - dropAllDatabasesExceptLocal(); -// sethbmsg("initial sync drop oplog", 0); -// emptyOplog(); - - list<string> dbs = r.conn()->getDatabaseNames(); - for( list<string>::iterator i = dbs.begin(); i != dbs.end(); i++ ) { - string db = *i; - if( db != "local" ) { - sethbmsg( str::stream() << "initial sync cloning db: " << db , 0); - bool ok; - { - writelock lk(db); - Client::Context ctx(db); - ok = clone(masterHostname.c_str(), db); - } - if( !ok ) { - sethbmsg( str::stream() << "initial sync error clone of " << db << " failed sleeping 5 minutes" ,0); - sleepsecs(300); - return; + if (replSettings.fastsync) { + log() << "fastsync: skipping database clone" << rsLog; + } + else { + sethbmsg("initial sync drop all databases", 0); + dropAllDatabasesExceptLocal(); + + sethbmsg("initial sync clone all databases", 0); + + list<string> dbs = r.conn()->getDatabaseNames(); + for( list<string>::iterator i = dbs.begin(); i != dbs.end(); i++ ) { + string db = *i; + if( db != "local" ) { + sethbmsg( str::stream() << "initial sync cloning db: " << db , 0); + bool ok; + { + writelock lk(db); + Client::Context ctx(db); + ok = clone(sourceHostname.c_str(), db); + } + if( !ok ) { + sethbmsg( str::stream() << "initial sync error clone of " << db << " failed sleeping 5 minutes" ,0); + sleepsecs(300); + return; + } } } } sethbmsg("initial sync query minValid",0); - /* our cloned copy will be strange until we apply oplog events that occurred + isyncassert( "initial sync source must remain readable throughout our initial sync", source->state().readable() ); + + /* our cloned copy will be strange until we apply oplog events that occurred through the process. we note that time point here. */ BSONObj minValid = r.getLastOp(rsoplog); - assert( !minValid.isEmpty() ); + isyncassert( "getLastOp is empty ", !minValid.isEmpty() ); OpTime mvoptime = minValid["ts"]._opTime(); assert( !mvoptime.isNull() ); - /* copy the oplog + /* apply relevant portion of the oplog */ { - sethbmsg("initial sync copy+apply oplog"); - if( ! initialSyncOplogApplication(masterHostname, cp, startingTS, mvoptime) ) { // note we assume here that this call does not throw + sethbmsg("initial sync initial oplog application"); + isyncassert( "initial sync source must remain readable throughout our initial sync [2]", source->state().readable() ); + if( ! initialSyncOplogApplication(source, /*applyGTE*/startingTS, /*minValid*/mvoptime) ) { // note we assume here that this call does not throw log() << "replSet initial sync failed during applyoplog" << rsLog; emptyOplog(); // otherwise we'll be up! - lastOpTimeWritten = OpTime(); - lastH = 0; + lastOpTimeWritten = OpTime(); + lastH = 0; log() << "replSet cleaning up [1]" << rsLog; { writelock lk("local."); Client::Context cx( "local." ); - cx.db()->flushFiles(true); + cx.db()->flushFiles(true); } log() << "replSet cleaning up [2]" << rsLog; - sleepsecs(2); + sleepsecs(5); return; } } sethbmsg("initial sync finishing up",0); - + assert( !box.getState().primary() ); // wouldn't make sense if we were. { writelock lk("local."); Client::Context cx( "local." ); - cx.db()->flushFiles(true); + cx.db()->flushFiles(true); try { log() << "replSet set minValid=" << minValid["ts"]._opTime().toString() << rsLog; } |