summary | refs | log | tree | commit | diff
path: root/db/repl.cpp
diff options
context:
space:
mode:
author: Antonin Kral <a.kral@bobek.cz> 2011-03-17 00:05:43 +0100
committer: Antonin Kral <a.kral@bobek.cz> 2011-03-17 00:05:43 +0100
commit: 582fc32574a3b158c81e49cb00e6ae59205e66ba (patch)
tree: ac64a3243e0d2121709f685695247052858115c8 /db/repl.cpp
parent: 2761bffa96595ac1698d86bbc2e95ebb0d4d6e93 (diff)
download: mongodb-582fc32574a3b158c81e49cb00e6ae59205e66ba.tar.gz
Imported Upstream version 1.8.0
Diffstat (limited to 'db/repl.cpp')
-rw-r--r-- db/repl.cpp 631
1 files changed, 355 insertions, 276 deletions
diff --git a/db/repl.cpp b/db/repl.cpp
index ea0eab9..b14034d 100644
--- a/db/repl.cpp
+++ b/db/repl.cpp
@@ -25,7 +25,7 @@
local.sources - indicates what sources we pull from as a "slave", and the last update of each
local.oplog.$main - our op log as "master"
- local.dbinfo.<dbname>
+ local.dbinfo.<dbname> - no longer used???
local.pair.startup - can contain a special value indicating for a pair that we have the master copy.
used when replacing other half of the pair which has permanently failed.
local.pair.sync - { initialsynccomplete: 1 }
@@ -49,13 +49,13 @@
#include "repl/rs.h"
namespace mongo {
-
+
// our config from command line etc.
ReplSettings replSettings;
/* if 1 sync() is running */
volatile int syncing = 0;
- static volatile int relinquishSyncingSome = 0;
+ static volatile int relinquishSyncingSome = 0;
/* if true replace our peer in a replication pair -- don't worry about if his
local.oplog.$main is empty.
@@ -68,9 +68,9 @@ namespace mongo {
const char *replAllDead = 0;
time_t lastForcedResync = 0;
-
+
IdTracker &idTracker = *( new IdTracker() );
-
+
} // namespace mongo
#include "replpair.h"
@@ -159,8 +159,8 @@ namespace mongo {
break;
{
dbtemprelease t;
- relinquishSyncingSome = 1;
- sleepmillis(1);
+ relinquishSyncingSome = 1;
+ sleepmillis(1);
}
}
if ( syncing ) {
@@ -206,7 +206,7 @@ namespace mongo {
return true;
}
} cmdForceDead;
-
+
/* operator requested resynchronization of replication (on the slave). { resync : 1 } */
class CmdResync : public Command {
public:
@@ -221,22 +221,28 @@ namespace mongo {
void help(stringstream&h) const { h << "resync (from scratch) an out of date replica slave.\nhttp://www.mongodb.org/display/DOCS/Master+Slave"; }
CmdResync() : Command("resync") { }
virtual bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) {
+ if( cmdLine.usingReplSets() ) {
+ errmsg = "resync command not currently supported with replica sets. See RS102 info in the mongodb documentations";
+ result.append("info", "http://www.mongodb.org/display/DOCS/Resyncing+a+Very+Stale+Replica+Set+Member");
+ return false;
+ }
+
if ( cmdObj.getBoolField( "force" ) ) {
if ( !waitForSyncToFinish( errmsg ) )
return false;
replAllDead = "resync forced";
- }
+ }
if ( !replAllDead ) {
errmsg = "not dead, no need to resync";
return false;
}
if ( !waitForSyncToFinish( errmsg ) )
return false;
-
+
ReplSource::forceResyncDead( "client" );
result.append( "info", "triggered resync for all sources" );
- return true;
- }
+ return true;
+ }
bool waitForSyncToFinish( string &errmsg ) const {
// Wait for slave thread to finish syncing, so sources will be be
// reloaded with new saved state on next pass.
@@ -246,7 +252,7 @@ namespace mongo {
break;
{
dbtemprelease t;
- relinquishSyncingSome = 1;
+ relinquishSyncingSome = 1;
sleepmillis(1);
}
}
@@ -257,16 +263,31 @@ namespace mongo {
return true;
}
} cmdResync;
-
- bool anyReplEnabled(){
- return replPair || replSettings.slave || replSettings.master;
+
+ bool anyReplEnabled() {
+ return replPair || replSettings.slave || replSettings.master || theReplSet;
}
- void appendReplicationInfo( BSONObjBuilder& result , bool authed , int level ){
-
+ bool replAuthenticate(DBClientBase *conn);
+
+ void appendReplicationInfo( BSONObjBuilder& result , bool authed , int level ) {
+
+ if ( replSet ) {
+ if( theReplSet == 0 ) {
+ result.append("ismaster", false);
+ result.append("secondary", false);
+ result.append("info", ReplSet::startupStatusMsg);
+ result.append( "isreplicaset" , true );
+ return;
+ }
+
+ theReplSet->fillIsMaster(result);
+ return;
+ }
+
if ( replAllDead ) {
result.append("ismaster", 0);
- if( authed ) {
+ if( authed ) {
if ( replPair )
result.append("remote", replPair->remote);
}
@@ -285,25 +306,25 @@ namespace mongo {
result.appendBool("ismaster", _isMaster() );
}
- if ( level && replSet ){
+ if ( level && replSet ) {
result.append( "info" , "is replica set" );
}
- else if ( level ){
+ else if ( level ) {
BSONObjBuilder sources( result.subarrayStart( "sources" ) );
-
+
readlock lk( "local.sources" );
- Client::Context ctx( "local.sources" );
+ Client::Context ctx( "local.sources", dbpath, 0, authed );
shared_ptr<Cursor> c = findTableScan("local.sources", BSONObj());
int n = 0;
- while ( c->ok() ){
+ while ( c->ok() ) {
BSONObj s = c->current();
-
+
BSONObjBuilder bb;
bb.append( s["host"] );
string sourcename = s["source"].valuestr();
if ( sourcename != "main" )
bb.append( s["source"] );
-
+
{
BSONElement e = s["syncedTo"];
BSONObjBuilder t( bb.subobjStart( "syncedTo" ) );
@@ -311,23 +332,27 @@ namespace mongo {
t.append( "inc" , e.timestampInc() );
t.done();
}
-
- if ( level > 1 ){
+
+ if ( level > 1 ) {
dbtemprelease unlock;
+ // note: there is no so-style timeout on this connection; perhaps we should have one.
ScopedDbConnection conn( s["host"].valuestr() );
- BSONObj first = conn->findOne( (string)"local.oplog.$" + sourcename , Query().sort( BSON( "$natural" << 1 ) ) );
- BSONObj last = conn->findOne( (string)"local.oplog.$" + sourcename , Query().sort( BSON( "$natural" << -1 ) ) );
- bb.appendDate( "masterFirst" , first["ts"].timestampTime() );
- bb.appendDate( "masterLast" , last["ts"].timestampTime() );
- double lag = (double) (last["ts"].timestampTime() - s["syncedTo"].timestampTime());
- bb.append( "lagSeconds" , lag / 1000 );
+ DBClientConnection *cliConn = dynamic_cast< DBClientConnection* >( &conn.conn() );
+ if ( cliConn && replAuthenticate( cliConn ) ) {
+ BSONObj first = conn->findOne( (string)"local.oplog.$" + sourcename , Query().sort( BSON( "$natural" << 1 ) ) );
+ BSONObj last = conn->findOne( (string)"local.oplog.$" + sourcename , Query().sort( BSON( "$natural" << -1 ) ) );
+ bb.appendDate( "masterFirst" , first["ts"].timestampTime() );
+ bb.appendDate( "masterLast" , last["ts"].timestampTime() );
+ double lag = (double) (last["ts"].timestampTime() - s["syncedTo"].timestampTime());
+ bb.append( "lagSeconds" , lag / 1000 );
+ }
conn.done();
}
sources.append( BSONObjBuilder::numStr( n++ ) , bb.obj() );
c->advance();
}
-
+
sources.done();
}
}
@@ -345,26 +370,15 @@ namespace mongo {
virtual LockType locktype() const { return NONE; }
CmdIsMaster() : Command("isMaster", true, "ismaster") { }
virtual bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool /*fromRepl*/) {
- /* currently request to arbiter is (somewhat arbitrarily) an ismaster request that is not
- authenticated.
- we allow unauthenticated ismaster but we aren't as verbose informationally if
- one is not authenticated for admin db to be safe.
- */
-
- if( replSet ) {
- if( theReplSet == 0 ) {
- result.append("ismaster", false);
- result.append("secondary", false);
- errmsg = "replSet still trying to initialize";
- result.append("info", ReplSet::startupStatusMsg);
- return true;
- }
- theReplSet->fillIsMaster(result);
- return true;
- }
-
- bool authed = cc().getAuthenticationInfo()->isAuthorizedReads("admin");
+ /* currently request to arbiter is (somewhat arbitrarily) an ismaster request that is not
+ authenticated.
+ we allow unauthenticated ismaster but we aren't as verbose informationally if
+ one is not authenticated for admin db to be safe.
+ */
+ bool authed = cc().getAuthenticationInfo()->isAuthorizedReads("admin");
appendReplicationInfo( result , authed );
+
+ result.appendNumber("maxBsonObjectSize", BSONObjMaxUserSize);
return true;
}
} cmdismaster;
@@ -375,14 +389,14 @@ namespace mongo {
virtual bool slaveOk() const {
return true;
}
- virtual LockType locktype() const { return WRITE; }
+ virtual LockType locktype() const { return NONE; }
CmdIsInitialSyncComplete() : Command( "isinitialsynccomplete" ) {}
virtual bool run(const string&, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool /*fromRepl*/) {
result.appendBool( "initialsynccomplete", getInitialSyncCompleted() );
return true;
}
} cmdisinitialsynccomplete;
-
+
/* negotiate who is master
-1=not set (probably means we just booted)
@@ -482,7 +496,7 @@ namespace mongo {
return true;
}
} cmdnegotiatemaster;
-
+
int ReplPair::negotiate(DBClientConnection *conn, string method) {
BSONObjBuilder b;
b.append("negotiatemaster",1);
@@ -491,7 +505,7 @@ namespace mongo {
b.append("your_port", remotePort);
BSONObj cmd = b.done();
BSONObj res = conn->findOne("admin.$cmd", cmd);
- if ( ! res["ok"].trueValue() ){
+ if ( ! res["ok"].trueValue() ) {
string message = method + " negotiate failed";
problem() << message << ": " << res.toString() << '\n';
setMasterLocked(State_Confused, message.c_str());
@@ -503,7 +517,8 @@ namespace mongo {
// choose who is master.
if ( x != State_Slave && x != State_Master && x != State_Negotiating ) {
problem() << method << " negotiate: bad you_are value " << res.toString() << endl;
- } else if ( x != State_Negotiating ) {
+ }
+ else if ( x != State_Negotiating ) {
string message = method + " negotiation";
setMasterLocked(x, message.c_str());
}
@@ -542,8 +557,8 @@ namespace mongo {
break;
addDbNextPass.insert( e.fieldName() );
}
- }
-
+ }
+
dbsObj = o.getObjectField("incompleteCloneDbs");
if ( !dbsObj.isEmpty() ) {
BSONObjIterator i(dbsObj);
@@ -553,7 +568,7 @@ namespace mongo {
break;
incompleteCloneDbs.insert( e.fieldName() );
}
- }
+ }
_lastSavedLocalTs = OpTime( o.getField( "localLogTs" ).date() );
}
@@ -569,7 +584,7 @@ namespace mongo {
b.appendTimestamp("syncedTo", syncedTo.asDate());
b.appendTimestamp("localLogTs", _lastSavedLocalTs.asDate());
-
+
BSONObjBuilder dbsNextPassBuilder;
int n = 0;
for ( set<string>::iterator i = addDbNextPass.begin(); i != addDbNextPass.end(); i++ ) {
@@ -622,7 +637,7 @@ namespace mongo {
}
}
- static void addSourceToList(ReplSource::SourceVector &v, ReplSource& s, const BSONObj &spec, ReplSource::SourceVector &old) {
+ static void addSourceToList(ReplSource::SourceVector &v, ReplSource& s, ReplSource::SourceVector &old) {
if ( !s.syncedTo.isNull() ) { // Don't reuse old ReplSource if there was a forced resync.
for ( ReplSource::SourceVector::iterator i = old.begin(); i != old.end(); ) {
if ( s == **i ) {
@@ -684,11 +699,12 @@ namespace mongo {
else {
try {
massert( 10384 , "--only requires use of --source", cmdLine.only.empty());
- } catch ( ... ) {
+ }
+ catch ( ... ) {
dbexit( EXIT_BADOPTIONS );
}
}
-
+
if ( replPair ) {
const string &remote = replPair->remote;
// --pairwith host specified.
@@ -730,9 +746,9 @@ namespace mongo {
tmp.syncedTo = OpTime();
tmp.replacing = true;
}
- }
+ }
if ( ( !replPair && tmp.syncedTo.isNull() ) ||
- ( replPair && replSettings.fastsync ) ) {
+ ( replPair && replSettings.fastsync ) ) {
DBDirectClient c;
if ( c.exists( "local.oplog.$main" ) ) {
BSONObj op = c.findOne( "local.oplog.$main", QUERY( "op" << NE << "n" ).sort( BSON( "$natural" << -1 ) ) );
@@ -742,7 +758,7 @@ namespace mongo {
}
}
}
- addSourceToList(v, tmp, c->current(), old);
+ addSourceToList(v, tmp, old);
c->advance();
}
@@ -766,7 +782,7 @@ namespace mongo {
}
return false;
}
-
+
void ReplSource::forceResyncDead( const char *requester ) {
if ( !replAllDead )
return;
@@ -775,9 +791,9 @@ namespace mongo {
for( SourceVector::iterator i = sources.begin(); i != sources.end(); ++i ) {
(*i)->forceResync( requester );
}
- replAllDead = 0;
+ replAllDead = 0;
}
-
+
void ReplSource::forceResync( const char *requester ) {
BSONObj info;
{
@@ -800,7 +816,7 @@ namespace mongo {
}
}
}
- }
+ }
syncedTo = OpTime();
addDbNextPass.clear();
save();
@@ -812,7 +828,7 @@ namespace mongo {
dropDatabase(db);
return db;
}
-
+
/* grab initial copy of a database from the master */
bool ReplSource::resync(string db) {
string dummyNs = resyncDrop( db.c_str(), "internal" );
@@ -841,7 +857,7 @@ namespace mongo {
log() << "sync: caught user assertion " << e << " while applying op: " << op << endl;;
}
catch ( DBException& e ) {
- log() << "sync: caught db exception " << e << " while applying op: " << op << endl;;
+ log() << "sync: caught db exception " << e << " while applying op: " << op << endl;;
}
}
@@ -850,15 +866,17 @@ namespace mongo {
{ ts: ..., op: <optype>, ns: ..., o: <obj> , o2: <extraobj>, b: <boolflag> }
...
see logOp() comments.
+
+ @param alreadyLocked caller already put us in write lock if true
*/
- void ReplSource::sync_pullOpLog_applyOperation(BSONObj& op, OpTime *localLogTail) {
+ void ReplSource::sync_pullOpLog_applyOperation(BSONObj& op, OpTime *localLogTail, bool alreadyLocked) {
if( logLevel >= 6 ) // op.tostring is expensive so doing this check explicitly
log(6) << "processing op: " << op << endl;
if( op.getStringField("op")[0] == 'n' )
return;
- char clientName[MaxDatabaseLen];
+ char clientName[MaxDatabaseNameLen];
const char *ns = op.getStringField("ns");
nsToDatabase(ns, clientName);
@@ -867,22 +885,27 @@ namespace mongo {
return;
}
else if ( *ns == 0 ) {
- problem() << "halting replication, bad op in oplog:\n " << op.toString() << endl;
- replAllDead = "bad object in oplog";
- throw SyncException();
+ /*if( op.getStringField("op")[0] != 'n' )*/ {
+ problem() << "halting replication, bad op in oplog:\n " << op.toString() << endl;
+ replAllDead = "bad object in oplog";
+ throw SyncException();
+ }
+ //ns = "local.system.x";
+ //nsToDatabase(ns, clientName);
}
if ( !only.empty() && only != clientName )
return;
- if( cmdLine.pretouch ) {
+ if( cmdLine.pretouch && !alreadyLocked/*doesn't make sense if in write lock already*/ ) {
if( cmdLine.pretouch > 1 ) {
/* note: this is bad - should be put in ReplSource. but this is first test... */
static int countdown;
+ assert( countdown >= 0 );
if( countdown > 0 ) {
countdown--; // was pretouched on a prev pass
- assert( countdown >= 0 );
- } else {
+ }
+ else {
const int m = 4;
if( tp.get() == 0 ) {
int nthr = min(8, cmdLine.pretouch);
@@ -911,7 +934,7 @@ namespace mongo {
}
}
- dblock lk;
+ scoped_ptr<writelock> lk( alreadyLocked ? 0 : new writelock() );
if ( localLogTail && replPair && replPair->state == ReplPair::State_Master ) {
updateSetsWithLocalOps( *localLogTail, true ); // allow unlocking
@@ -923,7 +946,7 @@ namespace mongo {
log() << "replAllDead, throwing SyncException: " << replAllDead << endl;
throw SyncException();
}
-
+
Client::Context ctx( ns );
ctx.getClient()->curop()->reset();
@@ -932,14 +955,14 @@ namespace mongo {
if( logLevel >= 6 )
log(6) << "ns: " << ns << ", justCreated: " << ctx.justCreated() << ", empty: " << empty << ", incompleteClone: " << incompleteClone << endl;
-
+
// always apply admin command command
// this is a bit hacky -- the semantics of replication/commands aren't well specified
if ( strcmp( clientName, "admin" ) == 0 && *op.getStringField( "op" ) == 'c' ) {
applyOperation( op );
return;
}
-
+
if ( ctx.justCreated() || empty || incompleteClone ) {
// we must add to incomplete list now that setClient has been called
incompleteCloneDbs.insert( clientName );
@@ -950,7 +973,8 @@ namespace mongo {
clone 100 databases in one pass.)
*/
addDbNextPass.insert( clientName );
- } else {
+ }
+ else {
if ( incompleteClone ) {
log() << "An earlier initial clone of '" << clientName << "' did not complete, now resyncing." << endl;
}
@@ -962,21 +986,25 @@ namespace mongo {
incompleteCloneDbs.erase( clientName );
}
save();
- } else {
+ }
+ else {
bool mod;
if ( replPair && replPair->state == ReplPair::State_Master ) {
BSONObj id = idForOp( op, mod );
if ( !idTracker.haveId( ns, id ) ) {
- applyOperation( op );
- } else if ( idTracker.haveModId( ns, id ) ) {
+ applyOperation( op );
+ }
+ else if ( idTracker.haveModId( ns, id ) ) {
log( 6 ) << "skipping operation matching mod id object " << op << endl;
BSONObj existing;
if ( Helpers::findOne( ns, id, existing ) )
logOp( "i", ns, existing );
- } else {
+ }
+ else {
log( 6 ) << "skipping operation matching changed id object " << op << endl;
}
- } else {
+ }
+ else {
applyOperation( op );
}
addDbNextPass.erase( clientName );
@@ -988,33 +1016,33 @@ namespace mongo {
const char *opType = op.getStringField( "op" );
BSONObj o = op.getObjectField( "o" );
switch( opType[ 0 ] ) {
- case 'i': {
- BSONObjBuilder idBuilder;
- BSONElement id;
- if ( !o.getObjectID( id ) )
- return BSONObj();
- idBuilder.append( id );
- return idBuilder.obj();
- }
- case 'u': {
- BSONObj o2 = op.getObjectField( "o2" );
- if ( strcmp( o2.firstElement().fieldName(), "_id" ) != 0 )
- return BSONObj();
- if ( o.firstElement().fieldName()[ 0 ] == '$' )
- mod = true;
- return o2;
- }
- case 'd': {
- if ( opType[ 1 ] != '\0' )
- return BSONObj(); // skip "db" op type
- return o;
- }
- default:
- break;
- }
+ case 'i': {
+ BSONObjBuilder idBuilder;
+ BSONElement id;
+ if ( !o.getObjectID( id ) )
+ return BSONObj();
+ idBuilder.append( id );
+ return idBuilder.obj();
+ }
+ case 'u': {
+ BSONObj o2 = op.getObjectField( "o2" );
+ if ( strcmp( o2.firstElement().fieldName(), "_id" ) != 0 )
+ return BSONObj();
+ if ( o.firstElement().fieldName()[ 0 ] == '$' )
+ mod = true;
+ return o2;
+ }
+ case 'd': {
+ if ( opType[ 1 ] != '\0' )
+ return BSONObj(); // skip "db" op type
+ return o;
+ }
+ default:
+ break;
+ }
return BSONObj();
}
-
+
void ReplSource::updateSetsWithOp( const BSONObj &op, bool mayUnlock ) {
if ( mayUnlock ) {
idTracker.mayUpgradeStorage();
@@ -1029,42 +1057,42 @@ namespace mongo {
if ( mod )
idTracker.haveModId( ns, id, true );
idTracker.haveId( ns, id, true );
- }
+ }
}
-
+
void ReplSource::syncToTailOfRemoteLog() {
string _ns = ns();
BSONObjBuilder b;
if ( !only.empty() ) {
b.appendRegex("ns", string("^") + only);
- }
+ }
BSONObj last = oplogReader.findOne( _ns.c_str(), Query( b.done() ).sort( BSON( "$natural" << -1 ) ) );
if ( !last.isEmpty() ) {
BSONElement ts = last.getField( "ts" );
massert( 10386 , "non Date ts found: " + last.toString(), ts.type() == Date || ts.type() == Timestamp );
syncedTo = OpTime( ts.date() );
- }
+ }
}
-
+
OpTime ReplSource::nextLastSavedLocalTs() const {
Client::Context ctx( "local.oplog.$main" );
shared_ptr<Cursor> c = findTableScan( "local.oplog.$main", BSON( "$natural" << -1 ) );
if ( c->ok() )
- return OpTime( c->current().getField( "ts" ).date() );
+ return OpTime( c->current().getField( "ts" ).date() );
return OpTime();
}
-
+
void ReplSource::setLastSavedLocalTs( const OpTime &nextLocalTs ) {
_lastSavedLocalTs = nextLocalTs;
log( 3 ) << "updated _lastSavedLocalTs to: " << _lastSavedLocalTs << endl;
}
-
+
void ReplSource::resetSlave() {
log() << "**********************************************************\n";
log() << "Sending forcedead command to slave to stop its replication\n";
log() << "Host: " << hostName << " paired: " << paired << endl;
massert( 10387 , "request to kill slave replication failed",
- oplogReader.conn()->simpleCommand( "admin", 0, "forcedead" ) );
+ oplogReader.conn()->simpleCommand( "admin", 0, "forcedead" ) );
syncToTailOfRemoteLog();
{
dblock lk;
@@ -1073,7 +1101,7 @@ namespace mongo {
oplogReader.resetCursor();
}
}
-
+
bool ReplSource::updateSetsWithLocalOps( OpTime &localLogTail, bool mayUnlock ) {
Client::Context ctx( "local.oplog.$main" );
shared_ptr<Cursor> localLog = findTableScan( "local.oplog.$main", BSON( "$natural" << -1 ) );
@@ -1099,14 +1127,16 @@ namespace mongo {
dbtemprelease t;
resetSlave();
massert( 10388 , "local master log filled, forcing slave resync", false );
- }
+ }
if ( !newTail.isNull() )
localLogTail = newTail;
return true;
}
-
+
+ extern unsigned replApplyBatchSize;
+
/* slave: pull some data from the master's oplog
- note: not yet in db mutex at this point.
+ note: not yet in db mutex at this point.
@return -1 error
0 ok, don't sleep
1 ok, sleep
@@ -1126,7 +1156,7 @@ namespace mongo {
OpTime localLogTail = _lastSavedLocalTs;
bool initial = syncedTo.isNull();
-
+
if ( !oplogReader.haveCursor() || initial ) {
if ( initial ) {
// Important to grab last oplog timestamp before listing databases.
@@ -1152,13 +1182,13 @@ namespace mongo {
dblock lk;
save();
}
-
+
BSONObjBuilder q;
q.appendDate("$gte", syncedTo.asDate());
BSONObjBuilder query;
query.append("ts", q.done());
if ( !only.empty() ) {
- // note we may here skip a LOT of data table scanning, a lot of work for the master.
+ // note we may here skip a LOT of data table scanning, a lot of work for the master.
query.appendRegex("ns", string("^") + only); // maybe append "\\." here?
}
BSONObj queryObj = query.done();
@@ -1185,7 +1215,7 @@ namespace mongo {
b.append("ns", *i + '.');
b.append("op", "db");
BSONObj op = b.done();
- sync_pullOpLog_applyOperation(op, 0);
+ sync_pullOpLog_applyOperation(op, 0, false);
}
}
@@ -1195,7 +1225,8 @@ namespace mongo {
if( oplogReader.awaitCapable() )
okResultCode = 0; // don't sleep
- } else {
+ }
+ else {
log() << "repl: " << ns << " oplog is empty\n";
}
{
@@ -1207,11 +1238,11 @@ namespace mongo {
setLastSavedLocalTs( nextLastSaved );
}
}
- save();
+ save();
}
return okResultCode;
}
-
+
OpTime nextOpTime;
{
BSONObj op = oplogReader.next();
@@ -1234,32 +1265,31 @@ namespace mongo {
massert( 10391 , "repl: bad object read from remote oplog", false);
}
}
-
+
if ( replPair && replPair->state == ReplPair::State_Master ) {
-
+
OpTime next( ts.date() );
if ( !tailing && !initial && next != syncedTo ) {
log() << "remote slave log filled, forcing slave resync" << endl;
resetSlave();
return 1;
- }
-
+ }
+
dblock lk;
updateSetsWithLocalOps( localLogTail, true );
}
-
+
nextOpTime = OpTime( ts.date() );
log(2) << "repl: first op time received: " << nextOpTime.toString() << '\n';
- if ( tailing || initial ) {
- if ( initial )
- log(1) << "repl: initial run\n";
- else {
- if( !( syncedTo <= nextOpTime ) ) {
- log() << "repl ASSERTION failed : syncedTo <= nextOpTime" << endl;
- log() << "repl syncTo: " << syncedTo.toStringLong() << endl;
- log() << "repl nextOpTime: " << nextOpTime.toStringLong() << endl;
- assert(false);
- }
+ if ( initial ) {
+ log(1) << "repl: initial run\n";
+ }
+ if( tailing ) {
+ if( !( syncedTo < nextOpTime ) ) {
+ log() << "repl ASSERTION failed : syncedTo < nextOpTime" << endl;
+ log() << "repl syncTo: " << syncedTo.toStringLong() << endl;
+ log() << "repl nextOpTime: " << nextOpTime.toStringLong() << endl;
+ assert(false);
}
oplogReader.putBack( op ); // op will be processed in the loop below
nextOpTime = OpTime(); // will reread the op below
@@ -1281,14 +1311,14 @@ namespace mongo {
throw SyncException();
}
else {
- /* t == syncedTo, so the first op was applied previously. */
+ /* t == syncedTo, so the first op was applied previously or it is the first op of initial query and need not be applied. */
}
}
// apply operations
{
int n = 0;
- time_t saveLast = time(0);
+ time_t saveLast = time(0);
while ( 1 ) {
/* from a.s.:
I think the idea here is that we can establish a sync point between the local op log and the remote log with the following steps:
@@ -1316,7 +1346,8 @@ namespace mongo {
if ( getInitialSyncCompleted() ) { // if initial sync hasn't completed, break out of loop so we can set to completed or clone more dbs
continue;
}
- } else {
+ }
+ else {
setLastSavedLocalTs( nextLastSaved );
}
}
@@ -1332,109 +1363,132 @@ namespace mongo {
else {
}
- OCCASIONALLY if( n > 0 && ( n > 100000 || time(0) - saveLast > 60 ) ) {
- // periodically note our progress, in case we are doing a lot of work and crash
- dblock lk;
+ OCCASIONALLY if( n > 0 && ( n > 100000 || time(0) - saveLast > 60 ) ) {
+ // periodically note our progress, in case we are doing a lot of work and crash
+ dblock lk;
syncedTo = nextOpTime;
// can't update local log ts since there are pending operations from our peer
- save();
+ save();
log() << "repl: checkpoint applied " << n << " operations" << endl;
log() << "repl: syncedTo: " << syncedTo.toStringLong() << endl;
- saveLast = time(0);
- n = 0;
- }
+ saveLast = time(0);
+ n = 0;
+ }
BSONObj op = oplogReader.next();
- BSONElement ts = op.getField("ts");
- if( !( ts.type() == Date || ts.type() == Timestamp ) ) {
- log() << "sync error: problem querying remote oplog record\n";
- log() << "op: " << op.toString() << '\n';
- log() << "halting replication" << endl;
- replInfo = replAllDead = "sync error: no ts found querying remote oplog record";
- throw SyncException();
- }
- OpTime last = nextOpTime;
- nextOpTime = OpTime( ts.date() );
- if ( !( last < nextOpTime ) ) {
- log() << "sync error: last applied optime at slave >= nextOpTime from master" << endl;
- log() << " last: " << last.toStringLong() << '\n';
- log() << " nextOpTime: " << nextOpTime.toStringLong() << '\n';
- log() << " halting replication" << endl;
- replInfo = replAllDead = "sync error last >= nextOpTime";
- uassert( 10123 , "replication error last applied optime at slave >= nextOpTime from master", false);
- }
- if ( replSettings.slavedelay && ( unsigned( time( 0 ) ) < nextOpTime.getSecs() + replSettings.slavedelay ) ) {
- oplogReader.putBack( op );
- _sleepAdviceTime = nextOpTime.getSecs() + replSettings.slavedelay + 1;
- dblock lk;
- if ( n > 0 ) {
- syncedTo = last;
- save();
+
+ unsigned b = replApplyBatchSize;
+ bool justOne = b == 1;
+ scoped_ptr<writelock> lk( justOne ? 0 : new writelock() );
+ while( 1 ) {
+
+ BSONElement ts = op.getField("ts");
+ if( !( ts.type() == Date || ts.type() == Timestamp ) ) {
+ log() << "sync error: problem querying remote oplog record" << endl;
+ log() << "op: " << op.toString() << endl;
+ log() << "halting replication" << endl;
+ replInfo = replAllDead = "sync error: no ts found querying remote oplog record";
+ throw SyncException();
+ }
+ OpTime last = nextOpTime;
+ nextOpTime = OpTime( ts.date() );
+ if ( !( last < nextOpTime ) ) {
+ log() << "sync error: last applied optime at slave >= nextOpTime from master" << endl;
+ log() << " last: " << last.toStringLong() << endl;
+ log() << " nextOpTime: " << nextOpTime.toStringLong() << endl;
+ log() << " halting replication" << endl;
+ replInfo = replAllDead = "sync error last >= nextOpTime";
+ uassert( 10123 , "replication error last applied optime at slave >= nextOpTime from master", false);
+ }
+ if ( replSettings.slavedelay && ( unsigned( time( 0 ) ) < nextOpTime.getSecs() + replSettings.slavedelay ) ) {
+ assert( justOne );
+ oplogReader.putBack( op );
+ _sleepAdviceTime = nextOpTime.getSecs() + replSettings.slavedelay + 1;
+ dblock lk;
+ if ( n > 0 ) {
+ syncedTo = last;
+ save();
+ }
+ log() << "repl: applied " << n << " operations" << endl;
+ log() << "repl: syncedTo: " << syncedTo.toStringLong() << endl;
+ log() << "waiting until: " << _sleepAdviceTime << " to continue" << endl;
+ return okResultCode;
}
- log() << "repl: applied " << n << " operations" << endl;
- log() << "repl: syncedTo: " << syncedTo.toStringLong() << endl;
- log() << "waiting until: " << _sleepAdviceTime << " to continue" << endl;
- break;
- }
- sync_pullOpLog_applyOperation(op, &localLogTail);
- n++;
+ sync_pullOpLog_applyOperation(op, &localLogTail, !justOne);
+ n++;
+
+ if( --b == 0 )
+ break;
+ // if we get here, we are doing multiple applications in a single write lock acquisition
+ if( !oplogReader.moreInCurrentBatch() ) {
+ // break if no more in batch so we release lock while reading from the master
+ break;
+ }
+ op = oplogReader.next();
+
+ getDur().commitIfNeeded();
+ }
}
}
return okResultCode;
}
- BSONObj userReplQuery = fromjson("{\"user\":\"repl\"}");
-
- bool replAuthenticate(DBClientConnection *conn) {
- if( ! cc().isAdmin() ){
- log() << "replauthenticate: requires admin permissions, failing\n";
- return false;
- }
-
- BSONObj user;
- {
- dblock lk;
- Client::Context ctxt("local.");
- if( !Helpers::findOne("local.system.users", userReplQuery, user) ) {
- // try the first user is local
- if( !Helpers::getSingleton("local.system.users", user) ) {
- if( noauth )
- return true; // presumably we are running a --noauth setup all around.
-
- log() << "replauthenticate: no user in local.system.users to use for authentication\n";
- return false;
- }
- }
-
- }
-
- string u = user.getStringField("user");
- string p = user.getStringField("pwd");
- massert( 10392 , "bad user object? [1]", !u.empty());
- massert( 10393 , "bad user object? [2]", !p.empty());
- string err;
- if( !conn->auth("local", u.c_str(), p.c_str(), err, false) ) {
- log() << "replauthenticate: can't authenticate to master server, user:" << u << endl;
- return false;
- }
- return true;
- }
+ BSONObj userReplQuery = fromjson("{\"user\":\"repl\"}");
+
+ bool replAuthenticate(DBClientBase *conn) {
+ if( ! cc().isAdmin() ) {
+ log() << "replauthenticate: requires admin permissions, failing\n";
+ return false;
+ }
+
+ string u;
+ string p;
+ if (internalSecurity.pwd.length() > 0) {
+ u = internalSecurity.user;
+ p = internalSecurity.pwd;
+ }
+ else {
+ BSONObj user;
+ {
+ dblock lk;
+ Client::Context ctxt("local.");
+ if( !Helpers::findOne("local.system.users", userReplQuery, user) ||
+ // try the first user in local
+ !Helpers::getSingleton("local.system.users", user) ) {
+ log() << "replauthenticate: no user in local.system.users to use for authentication\n";
+ return noauth;
+ }
+ }
+ u = user.getStringField("user");
+ p = user.getStringField("pwd");
+ massert( 10392 , "bad user object? [1]", !u.empty());
+ massert( 10393 , "bad user object? [2]", !p.empty());
+ }
+
+ string err;
+ if( !conn->auth("local", u.c_str(), p.c_str(), err, false) ) {
+ log() << "replauthenticate: can't authenticate to master server, user:" << u << endl;
+ return false;
+ }
+ return true;
+ }
bool replHandshake(DBClientConnection *conn) {
-
+
BSONObj me;
{
dblock l;
- if ( ! Helpers::getSingleton( "local.me" , me ) ){
+ // local.me is an identifier for a server for getLastError w:2+
+ if ( ! Helpers::getSingleton( "local.me" , me ) ) {
BSONObjBuilder b;
b.appendOID( "_id" , 0 , true );
me = b.obj();
Helpers::putSingleton( "local.me" , me );
}
}
-
+
BSONObjBuilder cmd;
cmd.appendAs( me["_id"] , "handshake" );
@@ -1450,9 +1504,9 @@ namespace mongo {
_conn = auto_ptr<DBClientConnection>(new DBClientConnection( false, 0, replPair ? 20 : 0 /* tcp timeout */));
string errmsg;
ReplInfo r("trying to connect to sync source");
- if ( !_conn->connect(hostName.c_str(), errmsg) ||
- !replAuthenticate(_conn.get()) ||
- !replHandshake(_conn.get()) ) {
+ if ( !_conn->connect(hostName.c_str(), errmsg) ||
+ (!noauth && !replAuthenticate(_conn.get())) ||
+ !replHandshake(_conn.get()) ) {
resetConnection();
log() << "repl: " << errmsg << endl;
return false;
@@ -1460,7 +1514,7 @@ namespace mongo {
}
return true;
}
-
+
/* note: not yet in mutex at this point.
returns >= 0 if ok. return -1 if you want to reconnect.
return value of zero indicates no sleep necessary before next call
@@ -1486,14 +1540,14 @@ namespace mongo {
}
if ( !oplogReader.connect(hostName) ) {
- log(4) << "repl: can't connect to sync source" << endl;
+ log(4) << "repl: can't connect to sync source" << endl;
if ( replPair && paired ) {
assert( startsWith(hostName.c_str(), replPair->remoteHost.c_str()) );
replPair->arbitrate();
}
return -1;
}
-
+
if ( paired ) {
int remote = replPair->negotiate(oplogReader.conn(), "direct");
int nMasters = ( remote == ReplPair::State_Master ) + ( replPair->state == ReplPair::State_Master );
@@ -1504,17 +1558,17 @@ namespace mongo {
}
/*
- // get current mtime at the server.
- BSONObj o = conn->findOne("admin.$cmd", opTimeQuery);
- BSONElement e = o.getField("optime");
- if( e.eoo() ) {
- log() << "repl: failed to get cur optime from master" << endl;
- log() << " " << o.toString() << endl;
- return false;
- }
- uassert( 10124 , e.type() == Date );
- OpTime serverCurTime;
- serverCurTime.asDate() = e.date();
+ // get current mtime at the server.
+ BSONObj o = conn->findOne("admin.$cmd", opTimeQuery);
+ BSONElement e = o.getField("optime");
+ if( e.eoo() ) {
+ log() << "repl: failed to get cur optime from master" << endl;
+ log() << " " << o.toString() << endl;
+ return false;
+ }
+ uassert( 10124 , e.type() == Date );
+ OpTime serverCurTime;
+ serverCurTime.asDate() = e.date();
*/
return sync_pullOpLog(nApplied);
}
@@ -1527,7 +1581,7 @@ namespace mongo {
_ reuse that cursor when we can
*/
- /* returns: # of seconds to sleep before next pass
+ /* returns: # of seconds to sleep before next pass
0 = no sleep recommended
1 = special sentinel indicating adaptive sleep recommended
*/
@@ -1543,6 +1597,7 @@ namespace mongo {
/* replication is not configured yet (for --slave) in local.sources. Poll for config it
every 20 seconds.
*/
+ log() << "no source given, add a master to local.sources to start replication" << endl;
return 20;
}
@@ -1553,7 +1608,7 @@ namespace mongo {
try {
res = s->sync(nApplied);
bool moreToSync = s->haveMoreDbsToSync();
- if( res < 0 ) {
+ if( res < 0 ) {
sleepAdvice = 3;
}
else if( moreToSync ) {
@@ -1562,7 +1617,7 @@ namespace mongo {
else if ( s->sleepAdvice() ) {
sleepAdvice = s->sleepAdvice();
}
- else
+ else
sleepAdvice = res;
if ( res >= 0 && !moreToSync /*&& !s->syncedTo.isNull()*/ ) {
pairSync->setInitialSyncCompletedLocking();
@@ -1588,9 +1643,9 @@ namespace mongo {
}
catch ( const std::exception &e ) {
log() << "repl: std::exception " << e.what() << endl;
- replInfo = "replMain caught std::exception";
+ replInfo = "replMain caught std::exception";
}
- catch ( ... ) {
+ catch ( ... ) {
log() << "unexpected exception during replication. replication will halt" << endl;
replAllDead = "caught unexpected exception during replication";
}
@@ -1616,15 +1671,16 @@ namespace mongo {
try {
int nApplied = 0;
s = _replMain(sources, nApplied);
- if( s == 1 ) {
+ if( s == 1 ) {
if( nApplied == 0 ) s = 2;
- else if( nApplied > 100 ) {
+ else if( nApplied > 100 ) {
// sleep very little - just enought that we aren't truly hammering master
sleepmillis(75);
s = 0;
}
}
- } catch (...) {
+ }
+ catch (...) {
out() << "caught exception in _replMain" << endl;
s = 4;
}
@@ -1634,10 +1690,10 @@ namespace mongo {
syncing--;
}
- if( relinquishSyncingSome ) {
- relinquishSyncingSome = 0;
- s = 1; // sleep before going back in to syncing=1
- }
+ if( relinquishSyncingSome ) {
+ relinquishSyncingSome = 0;
+ s = 1; // sleep before going back in to syncing=1
+ }
if ( s ) {
stringstream ss;
@@ -1660,21 +1716,21 @@ namespace mongo {
while( 1 ) {
sleepsecs( toSleep );
- /* write a keep-alive like entry to the log. this will make things like
+ /* write a keep-alive like entry to the log. this will make things like
printReplicationStatus() and printSlaveReplicationStatus() stay up-to-date
even when things are idle.
*/
{
writelocktry lk("",1);
- if ( lk.got() ){
+ if ( lk.got() ) {
toSleep = 10;
-
- cc().getAuthenticationInfo()->authorize("admin");
-
- try {
+
+ cc().getAuthenticationInfo()->authorize("admin");
+
+ try {
logKeepalive();
}
- catch(...) {
+ catch(...) {
log() << "caught exception in replMasterThread()" << endl;
}
}
@@ -1690,11 +1746,11 @@ namespace mongo {
sleepsecs(1);
Client::initThread("replslave");
cc().iAmSyncThread();
-
+
{
dblock lk;
cc().getAuthenticationInfo()->authorize("admin");
-
+
BSONObj obj;
if ( Helpers::getSingleton("local.pair.startup", obj) ) {
// should be: {replacepeer:1}
@@ -1730,12 +1786,11 @@ namespace mongo {
void startReplication() {
/* if we are going to be a replica set, we aren't doing other forms of replication. */
if( !cmdLine._replSet.empty() ) {
- if( replSettings.slave || replSettings.master || replPair ) {
+ if( replSettings.slave || replSettings.master || replPair ) {
log() << "***" << endl;
log() << "ERROR: can't use --slave or --master replication options with --replSet" << endl;
log() << "***" << endl;
}
- createOplog();
newRepl();
return;
}
@@ -1773,7 +1828,7 @@ namespace mongo {
createOplog();
boost::thread t(replMasterThread);
}
-
+
while( replSettings.fastsync ) // don't allow writes until we've set up from log
sleepmillis( 50 );
}
@@ -1807,5 +1862,29 @@ namespace mongo {
}
tp.join();
}
-
+
+    class ReplApplyBatchSizeValidator : public ParameterValidator { // validator registered under the name "replApplyBatchSize"
+    public:
+        ReplApplyBatchSizeValidator() : ParameterValidator( "replApplyBatchSize" ) {}
+
+        virtual bool isValid( BSONElement e , string& errmsg ) { // returns true if e is acceptable; otherwise fills errmsg and returns false
+            int b = e.numberInt();
+            if( b < 1 || b > 1024 ) { // accepted range is inclusive at both ends: 1..1024
+                errmsg = "replApplyBatchSize has to be >= 1 and <= 1024";
+                return false;
+            }
+
+            if ( replSettings.slavedelay != 0 && b > 1 ) { // with a non-zero slavedelay only a batch size of 1 is allowed
+                errmsg = "can't use a batch size > 1 with slavedelay";
+                return false;
+            }
+            if ( ! replSettings.slave ) { // rejected whenever replSettings.slave is not set
+                errmsg = "can't set replApplyBatchSize on a non-slave machine";
+                return false;
+            }
+
+            return true;
+        }
+    } replApplyBatchSizeValidator;
+
} // namespace mongo