diff options
author | Antonin Kral <a.kral@bobek.cz> | 2011-03-17 00:05:43 +0100 |
---|---|---|
committer | Antonin Kral <a.kral@bobek.cz> | 2011-03-17 00:05:43 +0100 |
commit | 582fc32574a3b158c81e49cb00e6ae59205e66ba (patch) | |
tree | ac64a3243e0d2121709f685695247052858115c8 /db/repl.cpp | |
parent | 2761bffa96595ac1698d86bbc2e95ebb0d4d6e93 (diff) | |
download | mongodb-582fc32574a3b158c81e49cb00e6ae59205e66ba.tar.gz |
Imported Upstream version 1.8.0
Diffstat (limited to 'db/repl.cpp')
-rw-r--r-- | db/repl.cpp | 631 |
1 files changed, 355 insertions, 276 deletions
diff --git a/db/repl.cpp b/db/repl.cpp index ea0eab9..b14034d 100644 --- a/db/repl.cpp +++ b/db/repl.cpp @@ -25,7 +25,7 @@ local.sources - indicates what sources we pull from as a "slave", and the last update of each local.oplog.$main - our op log as "master" - local.dbinfo.<dbname> + local.dbinfo.<dbname> - no longer used??? local.pair.startup - can contain a special value indicating for a pair that we have the master copy. used when replacing other half of the pair which has permanently failed. local.pair.sync - { initialsynccomplete: 1 } @@ -49,13 +49,13 @@ #include "repl/rs.h" namespace mongo { - + // our config from command line etc. ReplSettings replSettings; /* if 1 sync() is running */ volatile int syncing = 0; - static volatile int relinquishSyncingSome = 0; + static volatile int relinquishSyncingSome = 0; /* if true replace our peer in a replication pair -- don't worry about if his local.oplog.$main is empty. @@ -68,9 +68,9 @@ namespace mongo { const char *replAllDead = 0; time_t lastForcedResync = 0; - + IdTracker &idTracker = *( new IdTracker() ); - + } // namespace mongo #include "replpair.h" @@ -159,8 +159,8 @@ namespace mongo { break; { dbtemprelease t; - relinquishSyncingSome = 1; - sleepmillis(1); + relinquishSyncingSome = 1; + sleepmillis(1); } } if ( syncing ) { @@ -206,7 +206,7 @@ namespace mongo { return true; } } cmdForceDead; - + /* operator requested resynchronization of replication (on the slave). { resync : 1 } */ class CmdResync : public Command { public: @@ -221,22 +221,28 @@ namespace mongo { void help(stringstream&h) const { h << "resync (from scratch) an out of date replica slave.\nhttp://www.mongodb.org/display/DOCS/Master+Slave"; } CmdResync() : Command("resync") { } virtual bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) { + if( cmdLine.usingReplSets() ) { + errmsg = "resync command not currently supported with replica sets. See RS102 info in the mongodb documentations"; + result.append("info", "http://www.mongodb.org/display/DOCS/Resyncing+a+Very+Stale+Replica+Set+Member"); + return false; + } + if ( cmdObj.getBoolField( "force" ) ) { if ( !waitForSyncToFinish( errmsg ) ) return false; replAllDead = "resync forced"; - } + } if ( !replAllDead ) { errmsg = "not dead, no need to resync"; return false; } if ( !waitForSyncToFinish( errmsg ) ) return false; - + ReplSource::forceResyncDead( "client" ); result.append( "info", "triggered resync for all sources" ); - return true; - } + return true; + } bool waitForSyncToFinish( string &errmsg ) const { // Wait for slave thread to finish syncing, so sources will be be // reloaded with new saved state on next pass. @@ -246,7 +252,7 @@ namespace mongo { break; { dbtemprelease t; - relinquishSyncingSome = 1; + relinquishSyncingSome = 1; sleepmillis(1); } } @@ -257,16 +263,31 @@ namespace mongo { return true; } } cmdResync; - - bool anyReplEnabled(){ - return replPair || replSettings.slave || replSettings.master; + + bool anyReplEnabled() { + return replPair || replSettings.slave || replSettings.master || theReplSet; } - void appendReplicationInfo( BSONObjBuilder& result , bool authed , int level ){ - + bool replAuthenticate(DBClientBase *conn); + + void appendReplicationInfo( BSONObjBuilder& result , bool authed , int level ) { + + if ( replSet ) { + if( theReplSet == 0 ) { + result.append("ismaster", false); + result.append("secondary", false); + result.append("info", ReplSet::startupStatusMsg); + result.append( "isreplicaset" , true ); + return; + } + + theReplSet->fillIsMaster(result); + return; + } + if ( replAllDead ) { result.append("ismaster", 0); - if( authed ) { + if( authed ) { if ( replPair ) result.append("remote", replPair->remote); } @@ -285,25 +306,25 @@ namespace mongo { result.appendBool("ismaster", _isMaster() ); } - if ( level && replSet ){ + if ( level && replSet ) { result.append( "info" , "is replica set" ); } - else if ( level ){ + else if ( level ) { BSONObjBuilder sources( result.subarrayStart( "sources" ) ); - + readlock lk( "local.sources" ); - Client::Context ctx( "local.sources" ); + Client::Context ctx( "local.sources", dbpath, 0, authed ); shared_ptr<Cursor> c = findTableScan("local.sources", BSONObj()); int n = 0; - while ( c->ok() ){ + while ( c->ok() ) { BSONObj s = c->current(); - + BSONObjBuilder bb; bb.append( s["host"] ); string sourcename = s["source"].valuestr(); if ( sourcename != "main" ) bb.append( s["source"] ); - + { BSONElement e = s["syncedTo"]; BSONObjBuilder t( bb.subobjStart( "syncedTo" ) ); @@ -311,23 +332,27 @@ namespace mongo { t.append( "inc" , e.timestampInc() ); t.done(); } - - if ( level > 1 ){ + + if ( level > 1 ) { dbtemprelease unlock; + // note: there is no so-style timeout on this connection; perhaps we should have one. ScopedDbConnection conn( s["host"].valuestr() ); - BSONObj first = conn->findOne( (string)"local.oplog.$" + sourcename , Query().sort( BSON( "$natural" << 1 ) ) ); - BSONObj last = conn->findOne( (string)"local.oplog.$" + sourcename , Query().sort( BSON( "$natural" << -1 ) ) ); - bb.appendDate( "masterFirst" , first["ts"].timestampTime() ); - bb.appendDate( "masterLast" , last["ts"].timestampTime() ); - double lag = (double) (last["ts"].timestampTime() - s["syncedTo"].timestampTime()); - bb.append( "lagSeconds" , lag / 1000 ); + DBClientConnection *cliConn = dynamic_cast< DBClientConnection* >( &conn.conn() ); + if ( cliConn && replAuthenticate( cliConn ) ) { + BSONObj first = conn->findOne( (string)"local.oplog.$" + sourcename , Query().sort( BSON( "$natural" << 1 ) ) ); + BSONObj last = conn->findOne( (string)"local.oplog.$" + sourcename , Query().sort( BSON( "$natural" << -1 ) ) ); + bb.appendDate( "masterFirst" , first["ts"].timestampTime() ); + bb.appendDate( "masterLast" , last["ts"].timestampTime() ); + double lag = (double) (last["ts"].timestampTime() - s["syncedTo"].timestampTime()); + bb.append( "lagSeconds" , lag / 1000 ); + } conn.done(); } sources.append( BSONObjBuilder::numStr( n++ ) , bb.obj() ); c->advance(); } - + sources.done(); } } @@ -345,26 +370,15 @@ namespace mongo { virtual LockType locktype() const { return NONE; } CmdIsMaster() : Command("isMaster", true, "ismaster") { } virtual bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool /*fromRepl*/) { - /* currently request to arbiter is (somewhat arbitrarily) an ismaster request that is not - authenticated. - we allow unauthenticated ismaster but we aren't as verbose informationally if - one is not authenticated for admin db to be safe. - */ - - if( replSet ) { - if( theReplSet == 0 ) { - result.append("ismaster", false); - result.append("secondary", false); - errmsg = "replSet still trying to initialize"; - result.append("info", ReplSet::startupStatusMsg); - return true; - } - theReplSet->fillIsMaster(result); - return true; - } - - bool authed = cc().getAuthenticationInfo()->isAuthorizedReads("admin"); + /* currently request to arbiter is (somewhat arbitrarily) an ismaster request that is not + authenticated. + we allow unauthenticated ismaster but we aren't as verbose informationally if + one is not authenticated for admin db to be safe. + */ + bool authed = cc().getAuthenticationInfo()->isAuthorizedReads("admin"); appendReplicationInfo( result , authed ); + + result.appendNumber("maxBsonObjectSize", BSONObjMaxUserSize); return true; } } cmdismaster; @@ -375,14 +389,14 @@ namespace mongo { virtual bool slaveOk() const { return true; } - virtual LockType locktype() const { return WRITE; } + virtual LockType locktype() const { return NONE; } CmdIsInitialSyncComplete() : Command( "isinitialsynccomplete" ) {} virtual bool run(const string&, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool /*fromRepl*/) { result.appendBool( "initialsynccomplete", getInitialSyncCompleted() ); return true; } } cmdisinitialsynccomplete; - + /* negotiate who is master -1=not set (probably means we just booted) @@ -482,7 +496,7 @@ namespace mongo { return true; } } cmdnegotiatemaster; - + int ReplPair::negotiate(DBClientConnection *conn, string method) { BSONObjBuilder b; b.append("negotiatemaster",1); @@ -491,7 +505,7 @@ namespace mongo { b.append("your_port", remotePort); BSONObj cmd = b.done(); BSONObj res = conn->findOne("admin.$cmd", cmd); - if ( ! res["ok"].trueValue() ){ + if ( ! res["ok"].trueValue() ) { string message = method + " negotiate failed"; problem() << message << ": " << res.toString() << '\n'; setMasterLocked(State_Confused, message.c_str()); @@ -503,7 +517,8 @@ namespace mongo { // choose who is master. if ( x != State_Slave && x != State_Master && x != State_Negotiating ) { problem() << method << " negotiate: bad you_are value " << res.toString() << endl; - } else if ( x != State_Negotiating ) { + } + else if ( x != State_Negotiating ) { string message = method + " negotiation"; setMasterLocked(x, message.c_str()); } @@ -542,8 +557,8 @@ namespace mongo { break; addDbNextPass.insert( e.fieldName() ); } - } - + } + dbsObj = o.getObjectField("incompleteCloneDbs"); if ( !dbsObj.isEmpty() ) { BSONObjIterator i(dbsObj); @@ -553,7 +568,7 @@ namespace mongo { break; incompleteCloneDbs.insert( e.fieldName() ); } - } + } _lastSavedLocalTs = OpTime( o.getField( "localLogTs" ).date() ); } @@ -569,7 +584,7 @@ namespace mongo { b.appendTimestamp("syncedTo", syncedTo.asDate()); b.appendTimestamp("localLogTs", _lastSavedLocalTs.asDate()); - + BSONObjBuilder dbsNextPassBuilder; int n = 0; for ( set<string>::iterator i = addDbNextPass.begin(); i != addDbNextPass.end(); i++ ) { @@ -622,7 +637,7 @@ namespace mongo { } } - static void addSourceToList(ReplSource::SourceVector &v, ReplSource& s, const BSONObj &spec, ReplSource::SourceVector &old) { + static void addSourceToList(ReplSource::SourceVector &v, ReplSource& s, ReplSource::SourceVector &old) { if ( !s.syncedTo.isNull() ) { // Don't reuse old ReplSource if there was a forced resync. for ( ReplSource::SourceVector::iterator i = old.begin(); i != old.end(); ) { if ( s == **i ) { @@ -684,11 +699,12 @@ namespace mongo { else { try { massert( 10384 , "--only requires use of --source", cmdLine.only.empty()); - } catch ( ... ) { + } + catch ( ... ) { dbexit( EXIT_BADOPTIONS ); } } - + if ( replPair ) { const string &remote = replPair->remote; // --pairwith host specified. @@ -730,9 +746,9 @@ namespace mongo { tmp.syncedTo = OpTime(); tmp.replacing = true; } - } + } if ( ( !replPair && tmp.syncedTo.isNull() ) || - ( replPair && replSettings.fastsync ) ) { + ( replPair && replSettings.fastsync ) ) { DBDirectClient c; if ( c.exists( "local.oplog.$main" ) ) { BSONObj op = c.findOne( "local.oplog.$main", QUERY( "op" << NE << "n" ).sort( BSON( "$natural" << -1 ) ) ); @@ -742,7 +758,7 @@ namespace mongo { } } } - addSourceToList(v, tmp, c->current(), old); + addSourceToList(v, tmp, old); c->advance(); } @@ -766,7 +782,7 @@ namespace mongo { } return false; } - + void ReplSource::forceResyncDead( const char *requester ) { if ( !replAllDead ) return; @@ -775,9 +791,9 @@ namespace mongo { for( SourceVector::iterator i = sources.begin(); i != sources.end(); ++i ) { (*i)->forceResync( requester ); } - replAllDead = 0; + replAllDead = 0; } - + void ReplSource::forceResync( const char *requester ) { BSONObj info; { @@ -800,7 +816,7 @@ namespace mongo { } } } - } + } syncedTo = OpTime(); addDbNextPass.clear(); save(); @@ -812,7 +828,7 @@ namespace mongo { dropDatabase(db); return db; } - + /* grab initial copy of a database from the master */ bool ReplSource::resync(string db) { string dummyNs = resyncDrop( db.c_str(), "internal" ); @@ -841,7 +857,7 @@ namespace mongo { log() << "sync: caught user assertion " << e << " while applying op: " << op << endl;; } catch ( DBException& e ) { - log() << "sync: caught db exception " << e << " while applying op: " << op << endl;; + log() << "sync: caught db exception " << e << " while applying op: " << op << endl;; } } @@ -850,15 +866,17 @@ namespace mongo { { ts: ..., op: <optype>, ns: ..., o: <obj> , o2: <extraobj>, b: <boolflag> } ... see logOp() comments. + + @param alreadyLocked caller already put us in write lock if true */ - void ReplSource::sync_pullOpLog_applyOperation(BSONObj& op, OpTime *localLogTail) { + void ReplSource::sync_pullOpLog_applyOperation(BSONObj& op, OpTime *localLogTail, bool alreadyLocked) { if( logLevel >= 6 ) // op.tostring is expensive so doing this check explicitly log(6) << "processing op: " << op << endl; if( op.getStringField("op")[0] == 'n' ) return; - char clientName[MaxDatabaseLen]; + char clientName[MaxDatabaseNameLen]; const char *ns = op.getStringField("ns"); nsToDatabase(ns, clientName); @@ -867,22 +885,27 @@ namespace mongo { return; } else if ( *ns == 0 ) { - problem() << "halting replication, bad op in oplog:\n " << op.toString() << endl; - replAllDead = "bad object in oplog"; - throw SyncException(); + /*if( op.getStringField("op")[0] != 'n' )*/ { + problem() << "halting replication, bad op in oplog:\n " << op.toString() << endl; + replAllDead = "bad object in oplog"; + throw SyncException(); + } + //ns = "local.system.x"; + //nsToDatabase(ns, clientName); } if ( !only.empty() && only != clientName ) return; - if( cmdLine.pretouch ) { + if( cmdLine.pretouch && !alreadyLocked/*doesn't make sense if in write lock already*/ ) { if( cmdLine.pretouch > 1 ) { /* note: this is bad - should be put in ReplSource. but this is first test... */ static int countdown; + assert( countdown >= 0 ); if( countdown > 0 ) { countdown--; // was pretouched on a prev pass - assert( countdown >= 0 ); - } else { + } + else { const int m = 4; if( tp.get() == 0 ) { int nthr = min(8, cmdLine.pretouch); @@ -911,7 +934,7 @@ namespace mongo { } } - dblock lk; + scoped_ptr<writelock> lk( alreadyLocked ? 0 : new writelock() ); if ( localLogTail && replPair && replPair->state == ReplPair::State_Master ) { updateSetsWithLocalOps( *localLogTail, true ); // allow unlocking @@ -923,7 +946,7 @@ namespace mongo { log() << "replAllDead, throwing SyncException: " << replAllDead << endl; throw SyncException(); } - + Client::Context ctx( ns ); ctx.getClient()->curop()->reset(); @@ -932,14 +955,14 @@ namespace mongo { if( logLevel >= 6 ) log(6) << "ns: " << ns << ", justCreated: " << ctx.justCreated() << ", empty: " << empty << ", incompleteClone: " << incompleteClone << endl; - + // always apply admin command command // this is a bit hacky -- the semantics of replication/commands aren't well specified if ( strcmp( clientName, "admin" ) == 0 && *op.getStringField( "op" ) == 'c' ) { applyOperation( op ); return; } - + if ( ctx.justCreated() || empty || incompleteClone ) { // we must add to incomplete list now that setClient has been called incompleteCloneDbs.insert( clientName ); @@ -950,7 +973,8 @@ namespace mongo { clone 100 databases in one pass.) */ addDbNextPass.insert( clientName ); - } else { + } + else { if ( incompleteClone ) { log() << "An earlier initial clone of '" << clientName << "' did not complete, now resyncing." << endl; } @@ -962,21 +986,25 @@ namespace mongo { incompleteCloneDbs.erase( clientName ); } save(); - } else { + } + else { bool mod; if ( replPair && replPair->state == ReplPair::State_Master ) { BSONObj id = idForOp( op, mod ); if ( !idTracker.haveId( ns, id ) ) { - applyOperation( op ); - } else if ( idTracker.haveModId( ns, id ) ) { + applyOperation( op ); + } + else if ( idTracker.haveModId( ns, id ) ) { log( 6 ) << "skipping operation matching mod id object " << op << endl; BSONObj existing; if ( Helpers::findOne( ns, id, existing ) ) logOp( "i", ns, existing ); - } else { + } + else { log( 6 ) << "skipping operation matching changed id object " << op << endl; } - } else { + } + else { applyOperation( op ); } addDbNextPass.erase( clientName ); @@ -988,33 +1016,33 @@ namespace mongo { const char *opType = op.getStringField( "op" ); BSONObj o = op.getObjectField( "o" ); switch( opType[ 0 ] ) { - case 'i': { - BSONObjBuilder idBuilder; - BSONElement id; - if ( !o.getObjectID( id ) ) - return BSONObj(); - idBuilder.append( id ); - return idBuilder.obj(); - } - case 'u': { - BSONObj o2 = op.getObjectField( "o2" ); - if ( strcmp( o2.firstElement().fieldName(), "_id" ) != 0 ) - return BSONObj(); - if ( o.firstElement().fieldName()[ 0 ] == '$' ) - mod = true; - return o2; - } - case 'd': { - if ( opType[ 1 ] != '\0' ) - return BSONObj(); // skip "db" op type - return o; - } - default: - break; - } + case 'i': { + BSONObjBuilder idBuilder; + BSONElement id; + if ( !o.getObjectID( id ) ) + return BSONObj(); + idBuilder.append( id ); + return idBuilder.obj(); + } + case 'u': { + BSONObj o2 = op.getObjectField( "o2" ); + if ( strcmp( o2.firstElement().fieldName(), "_id" ) != 0 ) + return BSONObj(); + if ( o.firstElement().fieldName()[ 0 ] == '$' ) + mod = true; + return o2; + } + case 'd': { + if ( opType[ 1 ] != '\0' ) + return BSONObj(); // skip "db" op type + return o; + } + default: + break; + } return BSONObj(); } - + void ReplSource::updateSetsWithOp( const BSONObj &op, bool mayUnlock ) { if ( mayUnlock ) { idTracker.mayUpgradeStorage(); @@ -1029,42 +1057,42 @@ namespace mongo { if ( mod ) idTracker.haveModId( ns, id, true ); idTracker.haveId( ns, id, true ); - } + } } - + void ReplSource::syncToTailOfRemoteLog() { string _ns = ns(); BSONObjBuilder b; if ( !only.empty() ) { b.appendRegex("ns", string("^") + only); - } + } BSONObj last = oplogReader.findOne( _ns.c_str(), Query( b.done() ).sort( BSON( "$natural" << -1 ) ) ); if ( !last.isEmpty() ) { BSONElement ts = last.getField( "ts" ); massert( 10386 , "non Date ts found: " + last.toString(), ts.type() == Date || ts.type() == Timestamp ); syncedTo = OpTime( ts.date() ); - } + } } - + OpTime ReplSource::nextLastSavedLocalTs() const { Client::Context ctx( "local.oplog.$main" ); shared_ptr<Cursor> c = findTableScan( "local.oplog.$main", BSON( "$natural" << -1 ) ); if ( c->ok() ) - return OpTime( c->current().getField( "ts" ).date() ); + return OpTime( c->current().getField( "ts" ).date() ); return OpTime(); } - + void ReplSource::setLastSavedLocalTs( const OpTime &nextLocalTs ) { _lastSavedLocalTs = nextLocalTs; log( 3 ) << "updated _lastSavedLocalTs to: " << _lastSavedLocalTs << endl; } - + void ReplSource::resetSlave() { log() << "**********************************************************\n"; log() << "Sending forcedead command to slave to stop its replication\n"; log() << "Host: " << hostName << " paired: " << paired << endl; massert( 10387 , "request to kill slave replication failed", - oplogReader.conn()->simpleCommand( "admin", 0, "forcedead" ) ); + oplogReader.conn()->simpleCommand( "admin", 0, "forcedead" ) ); syncToTailOfRemoteLog(); { dblock lk; @@ -1073,7 +1101,7 @@ namespace mongo { oplogReader.resetCursor(); } } - + bool ReplSource::updateSetsWithLocalOps( OpTime &localLogTail, bool mayUnlock ) { Client::Context ctx( "local.oplog.$main" ); shared_ptr<Cursor> localLog = findTableScan( "local.oplog.$main", BSON( "$natural" << -1 ) ); @@ -1099,14 +1127,16 @@ namespace mongo { dbtemprelease t; resetSlave(); massert( 10388 , "local master log filled, forcing slave resync", false ); - } + } if ( !newTail.isNull() ) localLogTail = newTail; return true; } - + + extern unsigned replApplyBatchSize; + /* slave: pull some data from the master's oplog - note: not yet in db mutex at this point. + note: not yet in db mutex at this point. @return -1 error 0 ok, don't sleep 1 ok, sleep @@ -1126,7 +1156,7 @@ namespace mongo { OpTime localLogTail = _lastSavedLocalTs; bool initial = syncedTo.isNull(); - + if ( !oplogReader.haveCursor() || initial ) { if ( initial ) { // Important to grab last oplog timestamp before listing databases. @@ -1152,13 +1182,13 @@ namespace mongo { dblock lk; save(); } - + BSONObjBuilder q; q.appendDate("$gte", syncedTo.asDate()); BSONObjBuilder query; query.append("ts", q.done()); if ( !only.empty() ) { - // note we may here skip a LOT of data table scanning, a lot of work for the master. + // note we may here skip a LOT of data table scanning, a lot of work for the master. query.appendRegex("ns", string("^") + only); // maybe append "\\." here? } BSONObj queryObj = query.done(); @@ -1185,7 +1215,7 @@ namespace mongo { b.append("ns", *i + '.'); b.append("op", "db"); BSONObj op = b.done(); - sync_pullOpLog_applyOperation(op, 0); + sync_pullOpLog_applyOperation(op, 0, false); } } @@ -1195,7 +1225,8 @@ namespace mongo { if( oplogReader.awaitCapable() ) okResultCode = 0; // don't sleep - } else { + } + else { log() << "repl: " << ns << " oplog is empty\n"; } { @@ -1207,11 +1238,11 @@ namespace mongo { setLastSavedLocalTs( nextLastSaved ); } } - save(); + save(); } return okResultCode; } - + OpTime nextOpTime; { BSONObj op = oplogReader.next(); @@ -1234,32 +1265,31 @@ namespace mongo { massert( 10391 , "repl: bad object read from remote oplog", false); } } - + if ( replPair && replPair->state == ReplPair::State_Master ) { - + OpTime next( ts.date() ); if ( !tailing && !initial && next != syncedTo ) { log() << "remote slave log filled, forcing slave resync" << endl; resetSlave(); return 1; - } - + } + dblock lk; updateSetsWithLocalOps( localLogTail, true ); } - + nextOpTime = OpTime( ts.date() ); log(2) << "repl: first op time received: " << nextOpTime.toString() << '\n'; - if ( tailing || initial ) { - if ( initial ) - log(1) << "repl: initial run\n"; - else { - if( !( syncedTo <= nextOpTime ) ) { - log() << "repl ASSERTION failed : syncedTo <= nextOpTime" << endl; - log() << "repl syncTo: " << syncedTo.toStringLong() << endl; - log() << "repl nextOpTime: " << nextOpTime.toStringLong() << endl; - assert(false); - } + if ( initial ) { + log(1) << "repl: initial run\n"; + } + if( tailing ) { + if( !( syncedTo < nextOpTime ) ) { + log() << "repl ASSERTION failed : syncedTo < nextOpTime" << endl; + log() << "repl syncTo: " << syncedTo.toStringLong() << endl; + log() << "repl nextOpTime: " << nextOpTime.toStringLong() << endl; + assert(false); } oplogReader.putBack( op ); // op will be processed in the loop below nextOpTime = OpTime(); // will reread the op below @@ -1281,14 +1311,14 @@ namespace mongo { throw SyncException(); } else { - /* t == syncedTo, so the first op was applied previously. */ + /* t == syncedTo, so the first op was applied previously or it is the first op of initial query and need not be applied. */ } } // apply operations { int n = 0; - time_t saveLast = time(0); + time_t saveLast = time(0); while ( 1 ) { /* from a.s.: I think the idea here is that we can establish a sync point between the local op log and the remote log with the following steps: @@ -1316,7 +1346,8 @@ namespace mongo { if ( getInitialSyncCompleted() ) { // if initial sync hasn't completed, break out of loop so we can set to completed or clone more dbs continue; } - } else { + } + else { setLastSavedLocalTs( nextLastSaved ); } } @@ -1332,109 +1363,132 @@ namespace mongo { else { } - OCCASIONALLY if( n > 0 && ( n > 100000 || time(0) - saveLast > 60 ) ) { - // periodically note our progress, in case we are doing a lot of work and crash - dblock lk; + OCCASIONALLY if( n > 0 && ( n > 100000 || time(0) - saveLast > 60 ) ) { + // periodically note our progress, in case we are doing a lot of work and crash + dblock lk; syncedTo = nextOpTime; // can't update local log ts since there are pending operations from our peer - save(); + save(); log() << "repl: checkpoint applied " << n << " operations" << endl; log() << "repl: syncedTo: " << syncedTo.toStringLong() << endl; - saveLast = time(0); - n = 0; - } + saveLast = time(0); + n = 0; + } BSONObj op = oplogReader.next(); - BSONElement ts = op.getField("ts"); - if( !( ts.type() == Date || ts.type() == Timestamp ) ) { - log() << "sync error: problem querying remote oplog record\n"; - log() << "op: " << op.toString() << '\n'; - log() << "halting replication" << endl; - replInfo = replAllDead = "sync error: no ts found querying remote oplog record"; - throw SyncException(); - } - OpTime last = nextOpTime; - nextOpTime = OpTime( ts.date() ); - if ( !( last < nextOpTime ) ) { - log() << "sync error: last applied optime at slave >= nextOpTime from master" << endl; - log() << " last: " << last.toStringLong() << '\n'; - log() << " nextOpTime: " << nextOpTime.toStringLong() << '\n'; - log() << " halting replication" << endl; - replInfo = replAllDead = "sync error last >= nextOpTime"; - uassert( 10123 , "replication error last applied optime at slave >= nextOpTime from master", false); - } - if ( replSettings.slavedelay && ( unsigned( time( 0 ) ) < nextOpTime.getSecs() + replSettings.slavedelay ) ) { - oplogReader.putBack( op ); - _sleepAdviceTime = nextOpTime.getSecs() + replSettings.slavedelay + 1; - dblock lk; - if ( n > 0 ) { - syncedTo = last; - save(); + + unsigned b = replApplyBatchSize; + bool justOne = b == 1; + scoped_ptr<writelock> lk( justOne ? 0 : new writelock() ); + while( 1 ) { + + BSONElement ts = op.getField("ts"); + if( !( ts.type() == Date || ts.type() == Timestamp ) ) { + log() << "sync error: problem querying remote oplog record" << endl; + log() << "op: " << op.toString() << endl; + log() << "halting replication" << endl; + replInfo = replAllDead = "sync error: no ts found querying remote oplog record"; + throw SyncException(); + } + OpTime last = nextOpTime; + nextOpTime = OpTime( ts.date() ); + if ( !( last < nextOpTime ) ) { + log() << "sync error: last applied optime at slave >= nextOpTime from master" << endl; + log() << " last: " << last.toStringLong() << endl; + log() << " nextOpTime: " << nextOpTime.toStringLong() << endl; + log() << " halting replication" << endl; + replInfo = replAllDead = "sync error last >= nextOpTime"; + uassert( 10123 , "replication error last applied optime at slave >= nextOpTime from master", false); + } + if ( replSettings.slavedelay && ( unsigned( time( 0 ) ) < nextOpTime.getSecs() + replSettings.slavedelay ) ) { + assert( justOne ); + oplogReader.putBack( op ); + _sleepAdviceTime = nextOpTime.getSecs() + replSettings.slavedelay + 1; + dblock lk; + if ( n > 0 ) { + syncedTo = last; + save(); + } + log() << "repl: applied " << n << " operations" << endl; + log() << "repl: syncedTo: " << syncedTo.toStringLong() << endl; + log() << "waiting until: " << _sleepAdviceTime << " to continue" << endl; + return okResultCode; } - log() << "repl: applied " << n << " operations" << endl; - log() << "repl: syncedTo: " << syncedTo.toStringLong() << endl; - log() << "waiting until: " << _sleepAdviceTime << " to continue" << endl; - break; - } - sync_pullOpLog_applyOperation(op, &localLogTail); - n++; + sync_pullOpLog_applyOperation(op, &localLogTail, !justOne); + n++; + + if( --b == 0 ) + break; + // if to here, we are doing mulpile applications in a singel write lock acquisition + if( !oplogReader.moreInCurrentBatch() ) { + // break if no more in batch so we release lock while reading from the master + break; + } + op = oplogReader.next(); + + getDur().commitIfNeeded(); + } } } return okResultCode; } - BSONObj userReplQuery = fromjson("{\"user\":\"repl\"}"); - - bool replAuthenticate(DBClientConnection *conn) { - if( ! cc().isAdmin() ){ - log() << "replauthenticate: requires admin permissions, failing\n"; - return false; - } - - BSONObj user; - { - dblock lk; - Client::Context ctxt("local."); - if( !Helpers::findOne("local.system.users", userReplQuery, user) ) { - // try the first user is local - if( !Helpers::getSingleton("local.system.users", user) ) { - if( noauth ) - return true; // presumably we are running a --noauth setup all around. - - log() << "replauthenticate: no user in local.system.users to use for authentication\n"; - return false; - } - } - - } - - string u = user.getStringField("user"); - string p = user.getStringField("pwd"); - massert( 10392 , "bad user object? [1]", !u.empty()); - massert( 10393 , "bad user object? [2]", !p.empty()); - string err; - if( !conn->auth("local", u.c_str(), p.c_str(), err, false) ) { - log() << "replauthenticate: can't authenticate to master server, user:" << u << endl; - return false; - } - return true; - } + BSONObj userReplQuery = fromjson("{\"user\":\"repl\"}"); + + bool replAuthenticate(DBClientBase *conn) { + if( ! cc().isAdmin() ) { + log() << "replauthenticate: requires admin permissions, failing\n"; + return false; + } + + string u; + string p; + if (internalSecurity.pwd.length() > 0) { + u = internalSecurity.user; + p = internalSecurity.pwd; + } + else { + BSONObj user; + { + dblock lk; + Client::Context ctxt("local."); + if( !Helpers::findOne("local.system.users", userReplQuery, user) || + // try the first user in local + !Helpers::getSingleton("local.system.users", user) ) { + log() << "replauthenticate: no user in local.system.users to use for authentication\n"; + return noauth; + } + } + u = user.getStringField("user"); + p = user.getStringField("pwd"); + massert( 10392 , "bad user object? [1]", !u.empty()); + massert( 10393 , "bad user object? [2]", !p.empty()); + } + + string err; + if( !conn->auth("local", u.c_str(), p.c_str(), err, false) ) { + log() << "replauthenticate: can't authenticate to master server, user:" << u << endl; + return false; + } + return true; + } bool replHandshake(DBClientConnection *conn) { - + BSONObj me; { dblock l; - if ( ! Helpers::getSingleton( "local.me" , me ) ){ + // local.me is an identifier for a server for getLastError w:2+ + if ( ! Helpers::getSingleton( "local.me" , me ) ) { BSONObjBuilder b; b.appendOID( "_id" , 0 , true ); me = b.obj(); Helpers::putSingleton( "local.me" , me ); } } - + BSONObjBuilder cmd; cmd.appendAs( me["_id"] , "handshake" ); @@ -1450,9 +1504,9 @@ namespace mongo { _conn = auto_ptr<DBClientConnection>(new DBClientConnection( false, 0, replPair ? 20 : 0 /* tcp timeout */)); string errmsg; ReplInfo r("trying to connect to sync source"); - if ( !_conn->connect(hostName.c_str(), errmsg) || - !replAuthenticate(_conn.get()) || - !replHandshake(_conn.get()) ) { + if ( !_conn->connect(hostName.c_str(), errmsg) || + (!noauth && !replAuthenticate(_conn.get())) || + !replHandshake(_conn.get()) ) { resetConnection(); log() << "repl: " << errmsg << endl; return false; @@ -1460,7 +1514,7 @@ namespace mongo { } return true; } - + /* note: not yet in mutex at this point. returns >= 0 if ok. return -1 if you want to reconnect. return value of zero indicates no sleep necessary before next call @@ -1486,14 +1540,14 @@ namespace mongo { } if ( !oplogReader.connect(hostName) ) { - log(4) << "repl: can't connect to sync source" << endl; + log(4) << "repl: can't connect to sync source" << endl; if ( replPair && paired ) { assert( startsWith(hostName.c_str(), replPair->remoteHost.c_str()) ); replPair->arbitrate(); } return -1; } - + if ( paired ) { int remote = replPair->negotiate(oplogReader.conn(), "direct"); int nMasters = ( remote == ReplPair::State_Master ) + ( replPair->state == ReplPair::State_Master ); @@ -1504,17 +1558,17 @@ namespace mongo { } /* - // get current mtime at the server. - BSONObj o = conn->findOne("admin.$cmd", opTimeQuery); - BSONElement e = o.getField("optime"); - if( e.eoo() ) { - log() << "repl: failed to get cur optime from master" << endl; - log() << " " << o.toString() << endl; - return false; - } - uassert( 10124 , e.type() == Date ); - OpTime serverCurTime; - serverCurTime.asDate() = e.date(); + // get current mtime at the server. + BSONObj o = conn->findOne("admin.$cmd", opTimeQuery); + BSONElement e = o.getField("optime"); + if( e.eoo() ) { + log() << "repl: failed to get cur optime from master" << endl; + log() << " " << o.toString() << endl; + return false; + } + uassert( 10124 , e.type() == Date ); + OpTime serverCurTime; + serverCurTime.asDate() = e.date(); */ return sync_pullOpLog(nApplied); } @@ -1527,7 +1581,7 @@ namespace mongo { _ reuse that cursor when we can */ - /* returns: # of seconds to sleep before next pass + /* returns: # of seconds to sleep before next pass 0 = no sleep recommended 1 = special sentinel indicating adaptive sleep recommended */ @@ -1543,6 +1597,7 @@ namespace mongo { /* replication is not configured yet (for --slave) in local.sources. Poll for config it every 20 seconds. */ + log() << "no source given, add a master to local.sources to start replication" << endl; return 20; } @@ -1553,7 +1608,7 @@ namespace mongo { try { res = s->sync(nApplied); bool moreToSync = s->haveMoreDbsToSync(); - if( res < 0 ) { + if( res < 0 ) { sleepAdvice = 3; } else if( moreToSync ) { @@ -1562,7 +1617,7 @@ namespace mongo { else if ( s->sleepAdvice() ) { sleepAdvice = s->sleepAdvice(); } - else + else sleepAdvice = res; if ( res >= 0 && !moreToSync /*&& !s->syncedTo.isNull()*/ ) { pairSync->setInitialSyncCompletedLocking(); @@ -1588,9 +1643,9 @@ namespace mongo { } catch ( const std::exception &e ) { log() << "repl: std::exception " << e.what() << endl; - replInfo = "replMain caught std::exception"; + replInfo = "replMain caught std::exception"; } - catch ( ... ) { + catch ( ... ) { log() << "unexpected exception during replication. replication will halt" << endl; replAllDead = "caught unexpected exception during replication"; } @@ -1616,15 +1671,16 @@ namespace mongo { try { int nApplied = 0; s = _replMain(sources, nApplied); - if( s == 1 ) { + if( s == 1 ) { if( nApplied == 0 ) s = 2; - else if( nApplied > 100 ) { + else if( nApplied > 100 ) { // sleep very little - just enought that we aren't truly hammering master sleepmillis(75); s = 0; } } - } catch (...) { + } + catch (...) { out() << "caught exception in _replMain" << endl; s = 4; } @@ -1634,10 +1690,10 @@ namespace mongo { syncing--; } - if( relinquishSyncingSome ) { - relinquishSyncingSome = 0; - s = 1; // sleep before going back in to syncing=1 - } + if( relinquishSyncingSome ) { + relinquishSyncingSome = 0; + s = 1; // sleep before going back in to syncing=1 + } if ( s ) { stringstream ss; @@ -1660,21 +1716,21 @@ namespace mongo { while( 1 ) { sleepsecs( toSleep ); - /* write a keep-alive like entry to the log. this will make things like + /* write a keep-alive like entry to the log. this will make things like printReplicationStatus() and printSlaveReplicationStatus() stay up-to-date even when things are idle. */ { writelocktry lk("",1); - if ( lk.got() ){ + if ( lk.got() ) { toSleep = 10; - - cc().getAuthenticationInfo()->authorize("admin"); - - try { + + cc().getAuthenticationInfo()->authorize("admin"); + + try { logKeepalive(); } - catch(...) { + catch(...) { log() << "caught exception in replMasterThread()" << endl; } } @@ -1690,11 +1746,11 @@ namespace mongo { sleepsecs(1); Client::initThread("replslave"); cc().iAmSyncThread(); - + { dblock lk; cc().getAuthenticationInfo()->authorize("admin"); - + BSONObj obj; if ( Helpers::getSingleton("local.pair.startup", obj) ) { // should be: {replacepeer:1} @@ -1730,12 +1786,11 @@ namespace mongo { void startReplication() { /* if we are going to be a replica set, we aren't doing other forms of replication. */ if( !cmdLine._replSet.empty() ) { - if( replSettings.slave || replSettings.master || replPair ) { + if( replSettings.slave || replSettings.master || replPair ) { log() << "***" << endl; log() << "ERROR: can't use --slave or --master replication options with --replSet" << endl; log() << "***" << endl; } - createOplog(); newRepl(); return; } @@ -1773,7 +1828,7 @@ namespace mongo { createOplog(); boost::thread t(replMasterThread); } - + while( replSettings.fastsync ) // don't allow writes until we've set up from log sleepmillis( 50 ); } @@ -1807,5 +1862,29 @@ namespace mongo { } tp.join(); } - + + class ReplApplyBatchSizeValidator : public ParameterValidator { + public: + ReplApplyBatchSizeValidator() : ParameterValidator( "replApplyBatchSize" ) {} + + virtual bool isValid( BSONElement e , string& errmsg ) { + int b = e.numberInt(); + if( b < 1 || b > 1024 ) { + errmsg = "replApplyBatchSize has to be >= 1 and < 1024"; + return false; + } + + if ( replSettings.slavedelay != 0 && b > 1 ) { + errmsg = "can't use a batch size > 1 with slavedelay"; + return false; + } + if ( ! replSettings.slave ) { + errmsg = "can't set replApplyBatchSize on a non-slave machine"; + return false; + } + + return true; + } + } replApplyBatchSizeValidator; + } // namespace mongo |