diff options
author | Antonin Kral <a.kral@bobek.cz> | 2011-09-14 17:08:06 +0200 |
---|---|---|
committer | Antonin Kral <a.kral@bobek.cz> | 2011-09-14 17:08:06 +0200 |
commit | 5d342a758c6095b4d30aba0750b54f13b8916f51 (patch) | |
tree | 762e9aa84781f5e3b96db2c02d356c29cf0217c0 /db/repl.cpp | |
parent | cbe2d992e9cd1ea66af9fa91df006106775d3073 (diff) | |
download | mongodb-5d342a758c6095b4d30aba0750b54f13b8916f51.tar.gz |
Imported Upstream version 2.0.0
Diffstat (limited to 'db/repl.cpp')
-rw-r--r-- | db/repl.cpp | 832 |
1 files changed, 226 insertions, 606 deletions
diff --git a/db/repl.cpp b/db/repl.cpp index b14034d..a18d725 100644 --- a/db/repl.cpp +++ b/db/repl.cpp @@ -26,27 +26,30 @@ local.sources - indicates what sources we pull from as a "slave", and the last update of each local.oplog.$main - our op log as "master" local.dbinfo.<dbname> - no longer used??? - local.pair.startup - can contain a special value indicating for a pair that we have the master copy. + local.pair.startup - [deprecated] can contain a special value indicating for a pair that we have the master copy. used when replacing other half of the pair which has permanently failed. - local.pair.sync - { initialsynccomplete: 1 } + local.pair.sync - [deprecated] { initialsynccomplete: 1 } */ #include "pch.h" #include "jsobj.h" #include "../util/goodies.h" #include "repl.h" -#include "../util/message.h" +#include "../util/net/message.h" #include "../util/background.h" #include "../client/dbclient.h" #include "../client/connpool.h" #include "pdfile.h" -#include "query.h" +#include "ops/query.h" #include "db.h" #include "commands.h" #include "security.h" #include "cmdline.h" #include "repl_block.h" #include "repl/rs.h" +#include "replutil.h" +#include "repl/connections.h" +#include "ops/update.h" namespace mongo { @@ -57,11 +60,6 @@ namespace mongo { volatile int syncing = 0; static volatile int relinquishSyncingSome = 0; - /* if true replace our peer in a replication pair -- don't worry about if his - local.oplog.$main is empty. - */ - bool replacePeer = false; - /* "dead" means something really bad happened like replication falling completely out of sync. when non-null, we are dead and the string is informational */ @@ -69,23 +67,10 @@ namespace mongo { time_t lastForcedResync = 0; - IdTracker &idTracker = *( new IdTracker() ); - } // namespace mongo -#include "replpair.h" - namespace mongo { - PairSync *pairSync = new PairSync(); - bool getInitialSyncCompleted() { - return pairSync->initialSyncCompleted(); - } - - /* --- ReplPair -------------------------------- */ - - ReplPair *replPair = 0; - /* output by the web console */ const char *replInfo = ""; struct ReplInfo { @@ -97,116 +82,6 @@ namespace mongo { } }; - void ReplPair::setMaster(int n, const char *_comment ) { - if ( n == State_Master && !getInitialSyncCompleted() ) - return; - info = _comment; - if ( n != state && !cmdLine.quiet ) - tlog() << "pair: setting master=" << n << " was " << state << endl; - state = n; - } - - /* peer unreachable, try our arbiter */ - void ReplPair::arbitrate() { - ReplInfo r("arbitrate"); - - if ( arbHost == "-" ) { - // no arbiter. we are up, let's assume partner is down and network is not partitioned. - setMasterLocked(State_Master, "remote unreachable"); - return; - } - - auto_ptr<DBClientConnection> conn( newClientConnection() ); - string errmsg; - if ( !conn->connect(arbHost.c_str(), errmsg) ) { - tlog() << "repl: cantconn arbiter " << errmsg << endl; - setMasterLocked(State_CantArb, "can't connect to arb"); - return; - } - - negotiate( conn.get(), "arbiter" ); - } - - /* --------------------------------------------- */ - - class CmdReplacePeer : public Command { - public: - virtual bool slaveOk() const { - return true; - } - virtual bool adminOnly() const { - return true; - } - virtual LockType locktype() const { return WRITE; } - void help(stringstream&h) const { h << "replace a node in a replica pair"; } - CmdReplacePeer() : Command("replacePeer", false, "replacepeer") { } - virtual bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) { - if ( replPair == 0 ) { - errmsg = "not paired"; - return false; - } - if ( !getInitialSyncCompleted() ) { - errmsg = "not caught up cannot replace peer"; - return false; - } - if ( syncing < 0 ) { - errmsg = "replacepeer already invoked"; - return false; - } - Timer t; - while ( 1 ) { - if ( syncing == 0 || t.millis() > 30000 ) - break; - { - dbtemprelease t; - relinquishSyncingSome = 1; - sleepmillis(1); - } - } - if ( syncing ) { - assert( syncing > 0 ); - errmsg = "timeout waiting for sync() to finish"; - return false; - } - { - ReplSource::SourceVector sources; - ReplSource::loadAll(sources); - if ( sources.size() != 1 ) { - errmsg = "local.sources.count() != 1, cannot replace peer"; - return false; - } - } - { - Helpers::emptyCollection("local.sources"); - BSONObj o = fromjson("{\"replacepeer\":1}"); - Helpers::putSingleton("local.pair.startup", o); - } - syncing = -1; - replAllDead = "replacepeer invoked -- adjust local.sources hostname then restart this db process"; - result.append("info", "adjust local.sources hostname; db restart now required"); - return true; - } - } cmdReplacePeer; - - class CmdForceDead : public Command { - public: - virtual bool slaveOk() const { - return true; - } - virtual bool adminOnly() const { - return true; - } - virtual void help(stringstream& h) const { h << "internal"; } - virtual LockType locktype() const { return WRITE; } - CmdForceDead() : Command("forcedead") { } - virtual bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) { - replAllDead = "replication forced to stop by 'forcedead' command"; - log() << "*********************************************************\n"; - log() << "received 'forcedead' command, replication forced to stop" << endl; - return true; - } - } cmdForceDead; - /* operator requested resynchronization of replication (on the slave). { resync : 1 } */ class CmdResync : public Command { public: @@ -220,7 +95,7 @@ namespace mongo { virtual LockType locktype() const { return WRITE; } void help(stringstream&h) const { h << "resync (from scratch) an out of date replica slave.\nhttp://www.mongodb.org/display/DOCS/Master+Slave"; } CmdResync() : Command("resync") { } - virtual bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) { + virtual bool run(const string& , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool fromRepl) { if( cmdLine.usingReplSets() ) { errmsg = "resync command not currently supported with replica sets. See RS102 info in the mongodb documentations"; result.append("info", "http://www.mongodb.org/display/DOCS/Resyncing+a+Very+Stale+Replica+Set+Member"); @@ -265,7 +140,7 @@ namespace mongo { } cmdResync; bool anyReplEnabled() { - return replPair || replSettings.slave || replSettings.master || theReplSet; + return replSettings.slave || replSettings.master || theReplSet; } bool replAuthenticate(DBClientBase *conn); @@ -276,7 +151,7 @@ namespace mongo { if( theReplSet == 0 ) { result.append("ismaster", false); result.append("secondary", false); - result.append("info", ReplSet::startupStatusMsg); + result.append("info", ReplSet::startupStatusMsg.get()); result.append( "isreplicaset" , true ); return; } @@ -287,21 +162,9 @@ namespace mongo { if ( replAllDead ) { result.append("ismaster", 0); - if( authed ) { - if ( replPair ) - result.append("remote", replPair->remote); - } string s = string("dead: ") + replAllDead; result.append("info", s); } - else if ( replPair ) { - result.append("ismaster", replPair->state); - if( authed ) { - result.append("remote", replPair->remote); - if ( !replPair->info.empty() ) - result.append("info", replPair->info.toString()); - } - } else { result.appendBool("ismaster", _isMaster() ); } @@ -369,7 +232,7 @@ namespace mongo { } virtual LockType locktype() const { return NONE; } CmdIsMaster() : Command("isMaster", true, "ismaster") { } - virtual bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool /*fromRepl*/) { + virtual bool run(const string& , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool /*fromRepl*/) { /* currently request to arbiter is (somewhat arbitrarily) an ismaster request that is not authenticated. we allow unauthenticated ismaster but we aren't as verbose informationally if @@ -383,159 +246,11 @@ namespace mongo { } } cmdismaster; - class CmdIsInitialSyncComplete : public Command { - public: - virtual bool requiresAuth() { return false; } - virtual bool slaveOk() const { - return true; - } - virtual LockType locktype() const { return NONE; } - CmdIsInitialSyncComplete() : Command( "isinitialsynccomplete" ) {} - virtual bool run(const string&, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool /*fromRepl*/) { - result.appendBool( "initialsynccomplete", getInitialSyncCompleted() ); - return true; - } - } cmdisinitialsynccomplete; - - /* negotiate who is master - - -1=not set (probably means we just booted) - 0=was slave - 1=was master - - remote,local -> new remote,local - !1,1 -> 0,1 - 1,!1 -> 1,0 - -1,-1 -> dominant->1, nondom->0 - 0,0 -> dominant->1, nondom->0 - 1,1 -> dominant->1, nondom->0 - - { negotiatemaster:1, i_was:<state>, your_name:<hostname> } - returns: - { ok:1, you_are:..., i_am:... } - */ - class CmdNegotiateMaster : public Command { - public: - CmdNegotiateMaster() : Command("negotiatemaster") { } - virtual bool slaveOk() const { - return true; - } - virtual bool adminOnly() const { - return true; - } - virtual LockType locktype() const { return WRITE; } - virtual bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool) { - if ( replPair == 0 ) { - massert( 10383 , "Another mongod instance believes incorrectly that this node is its peer", !cmdObj.getBoolField( "fromArbiter" ) ); - // assume that we are an arbiter and should forward the request - string host = cmdObj.getStringField("your_name"); - int port = cmdObj.getIntField( "your_port" ); - if ( port == INT_MIN ) { - errmsg = "no port specified"; - problem() << errmsg << endl; - return false; - } - stringstream ss; - ss << host << ":" << port; - string remote = ss.str(); - BSONObj ret; - { - dbtemprelease t; - auto_ptr<DBClientConnection> conn( new DBClientConnection() ); - if ( !conn->connect( remote.c_str(), errmsg ) ) { - result.append( "you_are", ReplPair::State_Master ); - return true; - } - BSONObjBuilder forwardCommand; - forwardCommand.appendElements( cmdObj ); - forwardCommand.appendBool( "fromArbiter", true ); - ret = conn->findOne( "admin.$cmd", forwardCommand.done() ); - } - BSONObjIterator i( ret ); - while( i.moreWithEOO() ) { - BSONElement e = i.next(); - if ( e.eoo() ) - break; - if ( e.fieldName() != string( "ok" ) ) - result.append( e ); - } - return ret["ok"].trueValue(); - } - - int was = cmdObj.getIntField("i_was"); - string myname = cmdObj.getStringField("your_name"); - if ( myname.empty() || was < -3 ) { - errmsg = "your_name/i_was not specified"; - return false; - } - - int N = ReplPair::State_Negotiating; - int M = ReplPair::State_Master; - int S = ReplPair::State_Slave; - - if ( !replPair->dominant( myname ) ) { - result.append( "you_are", N ); - result.append( "i_am", replPair->state ); - return true; - } - - int me, you; - if ( !getInitialSyncCompleted() || ( replPair->state != M && was == M ) ) { - me=S; - you=M; - } - else { - me=M; - you=S; - } - replPair->setMaster( me, "CmdNegotiateMaster::run()" ); - - result.append("you_are", you); - result.append("i_am", me); - - return true; - } - } cmdnegotiatemaster; - - int ReplPair::negotiate(DBClientConnection *conn, string method) { - BSONObjBuilder b; - b.append("negotiatemaster",1); - b.append("i_was", state); - b.append("your_name", remoteHost); - b.append("your_port", remotePort); - BSONObj cmd = b.done(); - BSONObj res = conn->findOne("admin.$cmd", cmd); - if ( ! res["ok"].trueValue() ) { - string message = method + " negotiate failed"; - problem() << message << ": " << res.toString() << '\n'; - setMasterLocked(State_Confused, message.c_str()); - return State_Confused; - } - int x = res.getIntField("you_are"); - int remote = res.getIntField("i_am"); - // State_Negotiating means the remote node is not dominant and cannot - // choose who is master. - if ( x != State_Slave && x != State_Master && x != State_Negotiating ) { - problem() << method << " negotiate: bad you_are value " << res.toString() << endl; - } - else if ( x != State_Negotiating ) { - string message = method + " negotiation"; - setMasterLocked(x, message.c_str()); - } - return remote; - } - - /* --------------------------------------------------------------*/ - ReplSource::ReplSource() { - replacing = false; nClonedThisPass = 0; - paired = false; } ReplSource::ReplSource(BSONObj o) : nClonedThisPass(0) { - replacing = false; - paired = false; only = o.getStringField("only"); hostName = o.getStringField("host"); _sourceName = o.getStringField("source"); @@ -569,8 +284,6 @@ namespace mongo { incompleteCloneDbs.insert( e.fieldName() ); } } - - _lastSavedLocalTs = OpTime( o.getField( "localLogTs" ).date() ); } /* Turn our C++ Source object into a BSONObj */ @@ -583,8 +296,6 @@ namespace mongo { if ( !syncedTo.isNull() ) b.appendTimestamp("syncedTo", syncedTo.asDate()); - b.appendTimestamp("localLogTs", _lastSavedLocalTs.asDate()); - BSONObjBuilder dbsNextPassBuilder; int n = 0; for ( set<string>::iterator i = addDbNextPass.begin(); i != addDbNextPass.end(); i++ ) { @@ -625,16 +336,6 @@ namespace mongo { assert( ! res.mod ); assert( res.num == 1 ); } - - if ( replacing ) { - /* if we were in "replace" mode, we now have synced up with the replacement, - so turn that off. - */ - replacing = false; - wassert( replacePeer ); - replacePeer = false; - Helpers::emptyCollection("local.pair.startup"); - } } static void addSourceToList(ReplSource::SourceVector &v, ReplSource& s, ReplSource::SourceVector &old) { @@ -660,8 +361,6 @@ namespace mongo { SourceVector old = v; v.clear(); - bool gotPairWith = false; - if ( !cmdLine.source.empty() ) { // --source <host> specified. // check that no items are in sources other than that @@ -705,71 +404,21 @@ namespace mongo { } } - if ( replPair ) { - const string &remote = replPair->remote; - // --pairwith host specified. - if ( replSettings.fastsync ) { - Helpers::emptyCollection( "local.sources" ); // ignore saved sources - } - // check that no items are in sources other than that - // add if missing - shared_ptr<Cursor> c = findTableScan("local.sources", BSONObj()); - int n = 0; - while ( c->ok() ) { - n++; - ReplSource tmp(c->current()); - if ( tmp.hostName != remote ) { - log() << "pairwith " << remote << " != " << tmp.hostName << " from local.sources collection" << endl; - log() << "terminating after 30 seconds" << endl; - sleepsecs(30); - dbexit( EXIT_REPLICATION_ERROR ); - } - c->advance(); - } - uassert( 10122 , "local.sources collection corrupt?", n<2 ); - if ( n == 0 ) { - // source missing. add. - ReplSource s; - s.hostName = remote; - s.save(); - } - } - shared_ptr<Cursor> c = findTableScan("local.sources", BSONObj()); while ( c->ok() ) { ReplSource tmp(c->current()); - if ( replPair && tmp.hostName == replPair->remote && tmp.sourceName() == "main" ) { - gotPairWith = true; - tmp.paired = true; - if ( replacePeer ) { - // peer was replaced -- start back at the beginning. - tmp.syncedTo = OpTime(); - tmp.replacing = true; - } - } - if ( ( !replPair && tmp.syncedTo.isNull() ) || - ( replPair && replSettings.fastsync ) ) { + if ( tmp.syncedTo.isNull() ) { DBDirectClient c; if ( c.exists( "local.oplog.$main" ) ) { BSONObj op = c.findOne( "local.oplog.$main", QUERY( "op" << NE << "n" ).sort( BSON( "$natural" << -1 ) ) ); if ( !op.isEmpty() ) { tmp.syncedTo = op[ "ts" ].date(); - tmp._lastSavedLocalTs = op[ "ts" ].date(); } } } addSourceToList(v, tmp, old); c->advance(); } - - if ( !gotPairWith && replPair ) { - /* add the --pairwith server */ - shared_ptr< ReplSource > s( new ReplSource() ); - s->paired = true; - s->hostName = replPair->remote; - s->replacing = replacePeer; - v.push_back(s); - } } BSONObj opTimeQuery = fromjson("{\"getoptime\":1}"); @@ -789,6 +438,7 @@ namespace mongo { SourceVector sources; ReplSource::loadAll(sources); for( SourceVector::iterator i = sources.begin(); i != sources.end(); ++i ) { + log() << requester << " forcing resync from " << (*i)->hostName << endl; (*i)->forceResync( requester ); } replAllDead = 0; @@ -798,7 +448,9 @@ namespace mongo { BSONObj info; { dbtemprelease t; - oplogReader.connect(hostName); + if (!oplogReader.connect(hostName)) { + msgassertedNoTrace( 14051 , "unable to connect to resync"); + } /* todo use getDatabaseNames() method here */ bool ok = oplogReader.conn()->runCommand( "admin", BSON( "listDatabases" << 1 ), info ); massert( 10385 , "Unable to get database list", ok ); @@ -830,22 +482,132 @@ namespace mongo { } /* grab initial copy of a database from the master */ - bool ReplSource::resync(string db) { + void ReplSource::resync(string db) { string dummyNs = resyncDrop( db.c_str(), "internal" ); Client::Context ctx( dummyNs ); { log() << "resync: cloning database " << db << " to get an initial copy" << endl; ReplInfo r("resync: cloning a database"); string errmsg; - bool ok = cloneFrom(hostName.c_str(), errmsg, cc().database()->name, false, /*slaveok*/ true, /*replauth*/ true, /*snapshot*/false); + int errCode = 0; + bool ok = cloneFrom(hostName.c_str(), errmsg, cc().database()->name, false, /*slaveok*/ true, /*replauth*/ true, /*snapshot*/false, /*mayYield*/true, /*mayBeInterrupted*/false, &errCode); if ( !ok ) { - problem() << "resync of " << db << " from " << hostName << " failed " << errmsg << endl; - throw SyncException(); + if ( errCode == DatabaseDifferCaseCode ) { + resyncDrop( db.c_str(), "internal" ); + log() << "resync: database " << db << " not valid on the master due to a name conflict, dropping." << endl; + return; + } + else { + problem() << "resync of " << db << " from " << hostName << " failed " << errmsg << endl; + throw SyncException(); + } } } log() << "resync: done with initial clone for db: " << db << endl; + return; + } + + DatabaseIgnorer ___databaseIgnorer; + + void DatabaseIgnorer::doIgnoreUntilAfter( const string &db, const OpTime &futureOplogTime ) { + if ( futureOplogTime > _ignores[ db ] ) { + _ignores[ db ] = futureOplogTime; + } + } + + bool DatabaseIgnorer::ignoreAt( const string &db, const OpTime ¤tOplogTime ) { + if ( _ignores[ db ].isNull() ) { + return false; + } + if ( _ignores[ db ] >= currentOplogTime ) { + return true; + } else { + // The ignore state has expired, so clear it. + _ignores.erase( db ); + return false; + } + } + + bool ReplSource::handleDuplicateDbName( const BSONObj &op, const char *ns, const char *db ) { + if ( dbHolder.isLoaded( ns, dbpath ) ) { + // Database is already present. + return true; + } + BSONElement ts = op.getField( "ts" ); + if ( ( ts.type() == Date || ts.type() == Timestamp ) && ___databaseIgnorer.ignoreAt( db, ts.date() ) ) { + // Database is ignored due to a previous indication that it is + // missing from master after optime "ts". + return false; + } + if ( Database::duplicateUncasedName( db, dbpath ).empty() ) { + // No duplicate database names are present. + return true; + } + + OpTime lastTime; + bool dbOk = false; + { + dbtemprelease release; + + // We always log an operation after executing it (never before), so + // a database list will always be valid as of an oplog entry generated + // before it was retrieved. + + BSONObj last = oplogReader.findOne( this->ns().c_str(), Query().sort( BSON( "$natural" << -1 ) ) ); + if ( !last.isEmpty() ) { + BSONElement ts = last.getField( "ts" ); + massert( 14032, "Invalid 'ts' in remote log", ts.type() == Date || ts.type() == Timestamp ); + lastTime = OpTime( ts.date() ); + } + + BSONObj info; + bool ok = oplogReader.conn()->runCommand( "admin", BSON( "listDatabases" << 1 ), info ); + massert( 14033, "Unable to get database list", ok ); + BSONObjIterator i( info.getField( "databases" ).embeddedObject() ); + while( i.more() ) { + BSONElement e = i.next(); + + const char * name = e.embeddedObject().getField( "name" ).valuestr(); + if ( strcasecmp( name, db ) != 0 ) + continue; + + if ( strcmp( name, db ) == 0 ) { + // The db exists on master, still need to check that no conflicts exist there. + dbOk = true; + continue; + } + + // The master has a db name that conflicts with the requested name. + dbOk = false; + break; + } + } + + if ( !dbOk ) { + ___databaseIgnorer.doIgnoreUntilAfter( db, lastTime ); + incompleteCloneDbs.erase(db); + addDbNextPass.erase(db); + return false; + } + + // Check for duplicates again, since we released the lock above. + set< string > duplicates; + Database::duplicateUncasedName( db, dbpath, &duplicates ); + + // The database is present on the master and no conflicting databases + // are present on the master. Drop any local conflicts. + for( set< string >::const_iterator i = duplicates.begin(); i != duplicates.end(); ++i ) { + ___databaseIgnorer.doIgnoreUntilAfter( *i, lastTime ); + incompleteCloneDbs.erase(*i); + addDbNextPass.erase(*i); + Client::Context ctx(*i); + dropDatabase(*i); + } + + massert( 14034, "Duplicate database names present after attempting to delete duplicates", + Database::duplicateUncasedName( db, dbpath ).empty() ); return true; } @@ -869,7 +631,7 @@ namespace mongo { @param alreadyLocked caller already put us in write lock if true */ - void ReplSource::sync_pullOpLog_applyOperation(BSONObj& op, OpTime *localLogTail, bool alreadyLocked) { + void ReplSource::sync_pullOpLog_applyOperation(BSONObj& op, bool alreadyLocked) { if( logLevel >= 6 ) // op.tostring is expensive so doing this check explicitly log(6) << "processing op: " << op << endl; @@ -936,17 +698,16 @@ namespace mongo { scoped_ptr<writelock> lk( alreadyLocked ? 0 : new writelock() ); - if ( localLogTail && replPair && replPair->state == ReplPair::State_Master ) { - updateSetsWithLocalOps( *localLogTail, true ); // allow unlocking - updateSetsWithLocalOps( *localLogTail, false ); // don't allow unlocking or conversion to db backed storage - } - if ( replAllDead ) { // hmmm why is this check here and not at top of this function? does it get set between top and here? log() << "replAllDead, throwing SyncException: " << replAllDead << endl; throw SyncException(); } + if ( !handleDuplicateDbName( op, ns, clientName ) ) { + return; + } + Client::Context ctx( ns ); ctx.getClient()->curop()->reset(); @@ -988,78 +749,11 @@ namespace mongo { save(); } else { - bool mod; - if ( replPair && replPair->state == ReplPair::State_Master ) { - BSONObj id = idForOp( op, mod ); - if ( !idTracker.haveId( ns, id ) ) { - applyOperation( op ); - } - else if ( idTracker.haveModId( ns, id ) ) { - log( 6 ) << "skipping operation matching mod id object " << op << endl; - BSONObj existing; - if ( Helpers::findOne( ns, id, existing ) ) - logOp( "i", ns, existing ); - } - else { - log( 6 ) << "skipping operation matching changed id object " << op << endl; - } - } - else { - applyOperation( op ); - } + applyOperation( op ); addDbNextPass.erase( clientName ); } } - BSONObj ReplSource::idForOp( const BSONObj &op, bool &mod ) { - mod = false; - const char *opType = op.getStringField( "op" ); - BSONObj o = op.getObjectField( "o" ); - switch( opType[ 0 ] ) { - case 'i': { - BSONObjBuilder idBuilder; - BSONElement id; - if ( !o.getObjectID( id ) ) - return BSONObj(); - idBuilder.append( id ); - return idBuilder.obj(); - } - case 'u': { - BSONObj o2 = op.getObjectField( "o2" ); - if ( strcmp( o2.firstElement().fieldName(), "_id" ) != 0 ) - return BSONObj(); - if ( o.firstElement().fieldName()[ 0 ] == '$' ) - mod = true; - return o2; - } - case 'd': { - if ( opType[ 1 ] != '\0' ) - return BSONObj(); // skip "db" op type - return o; - } - default: - break; - } - return BSONObj(); - } - - void ReplSource::updateSetsWithOp( const BSONObj &op, bool mayUnlock ) { - if ( mayUnlock ) { - idTracker.mayUpgradeStorage(); - } - bool mod; - BSONObj id = idForOp( op, mod ); - if ( !id.isEmpty() ) { - const char *ns = op.getStringField( "ns" ); - // Since our range of local ops may not be the same as our peer's - // range of unapplied ops, it is always necessary to rewrite objects - // to the oplog after a mod update. - if ( mod ) - idTracker.haveModId( ns, id, true ); - idTracker.haveId( ns, id, true ); - } - } - void ReplSource::syncToTailOfRemoteLog() { string _ns = ns(); BSONObjBuilder b; @@ -1074,65 +768,6 @@ namespace mongo { } } - OpTime ReplSource::nextLastSavedLocalTs() const { - Client::Context ctx( "local.oplog.$main" ); - shared_ptr<Cursor> c = findTableScan( "local.oplog.$main", BSON( "$natural" << -1 ) ); - if ( c->ok() ) - return OpTime( c->current().getField( "ts" ).date() ); - return OpTime(); - } - - void ReplSource::setLastSavedLocalTs( const OpTime &nextLocalTs ) { - _lastSavedLocalTs = nextLocalTs; - log( 3 ) << "updated _lastSavedLocalTs to: " << _lastSavedLocalTs << endl; - } - - void ReplSource::resetSlave() { - log() << "**********************************************************\n"; - log() << "Sending forcedead command to slave to stop its replication\n"; - log() << "Host: " << hostName << " paired: " << paired << endl; - massert( 10387 , "request to kill slave replication failed", - oplogReader.conn()->simpleCommand( "admin", 0, "forcedead" ) ); - syncToTailOfRemoteLog(); - { - dblock lk; - setLastSavedLocalTs( nextLastSavedLocalTs() ); - save(); - oplogReader.resetCursor(); - } - } - - bool ReplSource::updateSetsWithLocalOps( OpTime &localLogTail, bool mayUnlock ) { - Client::Context ctx( "local.oplog.$main" ); - shared_ptr<Cursor> localLog = findTableScan( "local.oplog.$main", BSON( "$natural" << -1 ) ); - OpTime newTail; - for( ; localLog->ok(); localLog->advance() ) { - BSONObj op = localLog->current(); - OpTime ts( localLog->current().getField( "ts" ).date() ); - if ( newTail.isNull() ) { - newTail = ts; - } - if ( !( localLogTail < ts ) ) - break; - updateSetsWithOp( op, mayUnlock ); - if ( mayUnlock ) { - RARELY { - dbtemprelease t; - } - } - } - if ( !localLogTail.isNull() && !localLog->ok() ) { - // local log filled up - idTracker.reset(); - dbtemprelease t; - resetSlave(); - massert( 10388 , "local master log filled, forcing slave resync", false ); - } - if ( !newTail.isNull() ) - localLogTail = newTail; - return true; - } - extern unsigned replApplyBatchSize; /* slave: pull some data from the master's oplog @@ -1149,12 +784,6 @@ namespace mongo { bool tailing = true; oplogReader.tailCheck(); - if ( replPair && replPair->state == ReplPair::State_Master ) { - dblock lk; - idTracker.reset(); - } - OpTime localLogTail = _lastSavedLocalTs; - bool initial = syncedTo.isNull(); if ( !oplogReader.haveCursor() || initial ) { @@ -1215,7 +844,7 @@ namespace mongo { b.append("ns", *i + '.'); b.append("op", "db"); BSONObj op = b.done(); - sync_pullOpLog_applyOperation(op, 0, false); + sync_pullOpLog_applyOperation(op, false); } } @@ -1231,13 +860,6 @@ namespace mongo { } { dblock lk; - OpTime nextLastSaved = nextLastSavedLocalTs(); - { - dbtemprelease t; - if ( !oplogReader.more() ) { - setLastSavedLocalTs( nextLastSaved ); - } - } save(); } return okResultCode; @@ -1266,19 +888,6 @@ namespace mongo { } } - if ( replPair && replPair->state == ReplPair::State_Master ) { - - OpTime next( ts.date() ); - if ( !tailing && !initial && next != syncedTo ) { - log() << "remote slave log filled, forcing slave resync" << endl; - resetSlave(); - return 1; - } - - dblock lk; - updateSetsWithLocalOps( localLogTail, true ); - } - nextOpTime = OpTime( ts.date() ); log(2) << "repl: first op time received: " << nextOpTime.toString() << '\n'; if ( initial ) { @@ -1320,37 +929,21 @@ namespace mongo { int n = 0; time_t saveLast = time(0); while ( 1 ) { - /* from a.s.: - I think the idea here is that we can establish a sync point between the local op log and the remote log with the following steps: - - 1) identify most recent op in local log -- call it O - 2) ask "does nextOpTime reflect the tail of the remote op log?" (in other words, is more() false?) - If yes, all subsequent ops after nextOpTime in the remote log must have occurred after O. If no, we can't establish a sync point. - - Note that we can't do step (2) followed by step (1) because if we do so ops may be added to both machines between steps (2) and (1) and we can't establish a sync point. (In particular, between (2) and (1) an op may be added to the remote log before a different op is added to the local log. In this case, the newest remote op will have occurred after nextOpTime but before O.) - - Now, for performance reasons we don't want to have to identify the most recent op in the local log every time we call c->more() because in performance sensitive situations more() will be true most of the time. So we do: - - 0) more()? - 1) find most recent op in local log - 2) more()? - */ bool moreInitialSyncsPending = !addDbNextPass.empty() && n; // we need "&& n" to assure we actually process at least one op to get a sync point recorded in the first place. if ( moreInitialSyncsPending || !oplogReader.more() ) { dblock lk; - OpTime nextLastSaved = nextLastSavedLocalTs(); + + // NOTE aaron 2011-03-29 This block may be unnecessary, but I'm leaving it in place to avoid changing timing behavior. { dbtemprelease t; if ( !moreInitialSyncsPending && oplogReader.more() ) { - if ( getInitialSyncCompleted() ) { // if initial sync hasn't completed, break out of loop so we can set to completed or clone more dbs - continue; - } - } - else { - setLastSavedLocalTs( nextLastSaved ); + continue; } + // otherwise, break out of loop so we can set to completed or clone more dbs } + if( oplogReader.awaitCapable() && tailing ) okResultCode = 0; // don't sleep syncedTo = nextOpTime; @@ -1415,7 +1008,7 @@ namespace mongo { return okResultCode; } - sync_pullOpLog_applyOperation(op, &localLogTail, !justOne); + sync_pullOpLog_applyOperation(op, !justOne); n++; if( --b == 0 ) @@ -1438,6 +1031,9 @@ namespace mongo { BSONObj userReplQuery = fromjson("{\"user\":\"repl\"}"); bool replAuthenticate(DBClientBase *conn) { + if( noauth ) { + return true; + } if( ! cc().isAdmin() ) { log() << "replauthenticate: requires admin permissions, failing\n"; return false; @@ -1458,7 +1054,7 @@ namespace mongo { // try the first user in local !Helpers::getSingleton("local.system.users", user) ) { log() << "replauthenticate: no user in local.system.users to use for authentication\n"; - return noauth; + return false; } } u = user.getStringField("user"); @@ -1477,13 +1073,24 @@ namespace mongo { bool replHandshake(DBClientConnection *conn) { + string myname = getHostName(); + BSONObj me; { + dblock l; // local.me is an identifier for a server for getLastError w:2+ - if ( ! Helpers::getSingleton( "local.me" , me ) ) { + if ( ! Helpers::getSingleton( "local.me" , me ) || + ! me.hasField("host") || + me["host"].String() != myname ) { + + // clean out local.me + Helpers::emptyCollection("local.me"); + + // repopulate BSONObjBuilder b; b.appendOID( "_id" , 0 , true ); + b.append( "host", myname ); me = b.obj(); Helpers::putSingleton( "local.me" , me ); } @@ -1491,6 +1098,9 @@ namespace mongo { BSONObjBuilder cmd; cmd.appendAs( me["_id"] , "handshake" ); + if (theReplSet) { + cmd.append("member", theReplSet->selfId()); + } BSONObj res; bool ok = conn->runCommand( "admin" , cmd.obj() , res ); @@ -1499,14 +1109,13 @@ namespace mongo { return true; } - bool OplogReader::connect(string hostName) { + bool OplogReader::commonConnect(const string& hostName) { if( conn() == 0 ) { - _conn = auto_ptr<DBClientConnection>(new DBClientConnection( false, 0, replPair ? 20 : 0 /* tcp timeout */)); + _conn = shared_ptr<DBClientConnection>(new DBClientConnection( false, 0, 0 /* tcp timeout */)); string errmsg; ReplInfo r("trying to connect to sync source"); if ( !_conn->connect(hostName.c_str(), errmsg) || - (!noauth && !replAuthenticate(_conn.get())) || - !replHandshake(_conn.get()) ) { + (!noauth && !replAuthenticate(_conn.get())) ) { resetConnection(); log() << "repl: " << errmsg << endl; return false; @@ -1514,6 +1123,37 @@ namespace mongo { } return true; } + + bool OplogReader::connect(string hostName) { + if (conn() != 0) { + return true; + } + + if (commonConnect(hostName)) { + return replHandshake(_conn.get()); + } + return false; + } + + bool OplogReader::connect(const BSONObj& rid, const int from, const string& to) { + if (conn() != 0) { + return true; + } + if (commonConnect(to)) { + log() << "handshake between " << from << " and " << to << endl; + return passthroughHandshake(rid, from); + } + return false; + } + + bool OplogReader::passthroughHandshake(const BSONObj& rid, const int f) { + BSONObjBuilder cmd; + cmd.appendAs( rid["_id"], "handshake" ); + cmd.append( "member" , f ); + + BSONObj res; + return conn()->runCommand( "admin" , cmd.obj() , res ); + } /* note: not yet in mutex at this point. returns >= 0 if ok. return -1 if you want to reconnect. @@ -1541,22 +1181,9 @@ namespace mongo { if ( !oplogReader.connect(hostName) ) { log(4) << "repl: can't connect to sync source" << endl; - if ( replPair && paired ) { - assert( startsWith(hostName.c_str(), replPair->remoteHost.c_str()) ); - replPair->arbitrate(); - } return -1; } - if ( paired ) { - int remote = replPair->negotiate(oplogReader.conn(), "direct"); - int nMasters = ( remote == ReplPair::State_Master ) + ( replPair->state == ReplPair::State_Master ); - if ( getInitialSyncCompleted() && nMasters != 1 ) { - log() << ( nMasters == 0 ? "no master" : "two masters" ) << ", deferring oplog pull" << endl; - return 1; - } - } - /* // get current mtime at the server. BSONObj o = conn->findOne("admin.$cmd", opTimeQuery); @@ -1619,9 +1246,6 @@ namespace mongo { } else sleepAdvice = res; - if ( res >= 0 && !moreToSync /*&& !s->syncedTo.isNull()*/ ) { - pairSync->setInitialSyncCompletedLocking(); - } } catch ( const SyncException& ) { log() << "caught SyncException" << endl; @@ -1662,8 +1286,11 @@ namespace mongo { { dblock lk; if ( replAllDead ) { - if ( !replSettings.autoresync || !ReplSource::throttledForceResyncDead( "auto" ) ) + // throttledForceResyncDead can throw + if ( !replSettings.autoresync || !ReplSource::throttledForceResyncDead( "auto" ) ) { + log() << "all sources dead: " << replAllDead << ", sleeping for 5 seconds" << endl; break; + } } assert( syncing == 0 ); // i.e., there is only one sync thread running. we will want to change/fix this. syncing++; @@ -1697,7 +1324,7 @@ namespace mongo { if ( s ) { stringstream ss; - ss << "repl: sleep " << s << "sec before next pass"; + ss << "repl: sleep " << s << " sec before next pass"; string msg = ss.str(); if ( ! cmdLine.quiet ) log() << msg << endl; @@ -1707,8 +1334,6 @@ namespace mongo { } } - int debug_stop_repl = 0; - static void replMasterThread() { sleepsecs(4); Client::initThread("replmaster"); @@ -1725,7 +1350,7 @@ namespace mongo { if ( lk.got() ) { toSleep = 10; - cc().getAuthenticationInfo()->authorize("admin"); + replLocalAuth(); try { logKeepalive(); @@ -1749,21 +1374,12 @@ namespace mongo { { dblock lk; - cc().getAuthenticationInfo()->authorize("admin"); - - BSONObj obj; - if ( Helpers::getSingleton("local.pair.startup", obj) ) { - // should be: {replacepeer:1} - replacePeer = true; - pairSync->setInitialSyncCompleted(); // we are the half that has all the data - } + replLocalAuth(); } while ( 1 ) { try { replMain(); - if ( debug_stop_repl ) - break; sleepsecs(5); } catch ( AssertionException& ) { @@ -1771,6 +1387,15 @@ namespace mongo { problem() << "Assertion in replSlaveThread(): sleeping 5 minutes before retry" << endl; sleepsecs(300); } + catch ( DBException& e ) { + problem() << "exception in replSlaveThread(): " << e.what() + << ", sleeping 5 minutes before retry" << endl; + sleepsecs(300); + } + catch ( ... ) { + problem() << "error in replSlaveThread(): sleeping 5 minutes before retry" << endl; + sleepsecs(300); + } } } @@ -1783,15 +1408,21 @@ namespace mongo { void newRepl(); void oldRepl(); + void startReplSets(ReplSetCmdline*); void startReplication() { /* if we are going to be a replica set, we aren't doing other forms of replication. */ if( !cmdLine._replSet.empty() ) { - if( replSettings.slave || replSettings.master || replPair ) { + if( replSettings.slave || replSettings.master ) { log() << "***" << endl; log() << "ERROR: can't use --slave or --master replication options with --replSet" << endl; log() << "***" << endl; } newRepl(); + + replSet = true; + ReplSetCmdline *replSetCmdline = new ReplSetCmdline(cmdLine._replSet); + boost::thread t( boost::bind( &startReplSets, replSetCmdline) ); + return; } @@ -1802,28 +1433,22 @@ namespace mongo { */ //boost::thread tempt(tempThread); - if( !replSettings.slave && !replSettings.master && !replPair ) + if( !replSettings.slave && !replSettings.master ) return; { dblock lk; - cc().getAuthenticationInfo()->authorize("admin"); - pairSync->init(); + replLocalAuth(); } - if ( replSettings.slave || replPair ) { - if ( replSettings.slave ) { - assert( replSettings.slave == SimpleSlave ); - log(1) << "slave=true" << endl; - } - else - replSettings.slave = ReplPairSlave; + if ( replSettings.slave ) { + assert( replSettings.slave == SimpleSlave ); + log(1) << "slave=true" << endl; boost::thread repl_thread(replSlaveThread); } - if ( replSettings.master || replPair ) { - if ( replSettings.master ) - log(1) << "master=true" << endl; + if ( replSettings.master ) { + log(1) << "master=true" << endl; replSettings.master = true; createOplog(); boost::thread t(replMasterThread); @@ -1833,11 +1458,6 @@ namespace mongo { sleepmillis( 50 ); } - /* called from main at server startup */ - void pairWith(const char *remoteEnd, const char *arb) { - replPair = new ReplPair(remoteEnd, arb); - } - void testPretouch() { int nthr = min(8, 8); nthr = max(nthr, 1); |