diff options
author | Antonin Kral <a.kral@bobek.cz> | 2010-03-25 19:21:32 +0100 |
---|---|---|
committer | Antonin Kral <a.kral@bobek.cz> | 2010-03-25 19:21:32 +0100 |
commit | 0ca01a91ae0a3562e54c226e7b9512feb2ea83d0 (patch) | |
tree | 2b3886e435b0217d6afd63a213b04d32bb4b4f6f /db/repl.h | |
parent | a696359b248adef0cc8576fce3f473535e995136 (diff) | |
download | mongodb-0ca01a91ae0a3562e54c226e7b9512feb2ea83d0.tar.gz |
Imported Upstream version 1.4.0
Diffstat (limited to 'db/repl.h')
-rw-r--r-- | db/repl.h | 196 |
1 files changed, 181 insertions, 15 deletions
@@ -32,6 +32,7 @@ #include "db.h" #include "dbhelpers.h" #include "query.h" +#include "queryoptimizer.h" #include "../client/dbclient.h" @@ -46,14 +47,31 @@ namespace mongo { --slave cmd line setting -> SimpleSlave */ typedef enum { NotSlave=0, SimpleSlave, ReplPairSlave } SlaveTypes; - extern SlaveTypes slave; - /* true means we are master and doing replication. if we are not writing to oplog (no --master or repl pairing), - this won't be true. - */ - extern bool master; + class ReplSettings { + public: + SlaveTypes slave; + + /* true means we are master and doing replication. if we are not writing to oplog (no --master or repl pairing), + this won't be true. + */ + bool master; + + int opIdMem; + + bool fastsync; + + bool autoresync; + + int slavedelay; + + ReplSettings() + : slave(NotSlave) , master(false) , opIdMem(100000000) , fastsync() , autoresync(false), slavedelay() { + } + + }; - extern int opIdMem; + extern ReplSettings replSettings; bool cloneFrom(const char *masterHost, string& errmsg, const string& fromdb, bool logForReplication, bool slaveOk, bool useReplAuth, bool snapshot); @@ -115,6 +133,7 @@ namespace mongo { // returns false if the slave has been reset bool updateSetsWithLocalOps( OpTime &localLogTail, bool mayUnlock ); string ns() const { return string( "local.oplog.$" ) + sourceName(); } + unsigned _sleepAdviceTime; public: static void applyOperation(const BSONObj& op); @@ -131,11 +150,11 @@ namespace mongo { OpTime syncedTo; /* This is for repl pairs. - _lastSavedLocalTs is the most recent point in the local log that we know is consistent
- with the remote log ( ie say the local op log has entries ABCDE and the remote op log
- has ABCXY, then _lastSavedLocalTs won't be greater than C until we have reconciled
- the DE-XY difference.)
- */
+ _lastSavedLocalTs is the most recent point in the local log that we know is consistent + with the remote log ( ie say the local op log has entries ABCDE and the remote op log + has ABCXY, then _lastSavedLocalTs won't be greater than C until we have reconciled + the DE-XY difference.) + */ OpTime _lastSavedLocalTs; int nClonedThisPass; @@ -160,7 +179,13 @@ namespace mongo { operator string() const { return sourceName() + "@" + hostName; } bool haveMoreDbsToSync() const { return !addDbNextPass.empty(); } - + int sleepAdvice() const { + if ( !_sleepAdviceTime ) + return 0; + int wait = _sleepAdviceTime - unsigned( time( 0 ) ); + return wait > 0 ? wait : 0; + } + static bool throttledForceResyncDead( const char *requester ); static void forceResyncDead( const char *requester ); void forceResync( const char *requester ); @@ -173,7 +198,6 @@ namespace mongo { "c" db cmd "db" declares presence of a database (ns is set to the db name + '.') */ - void _logOp(const char *opstr, const char *ns, const char *logNs, const BSONObj& obj, BSONObj *patt, bool *b, const OpTime &ts); void logOp(const char *opstr, const char *ns, const BSONObj& obj, BSONObj *patt = 0, bool *b = 0); // class for managing a set of ids in memory @@ -239,9 +263,9 @@ namespace mongo { dbIds_( "local.temp.replIds" ), dbModIds_( "local.temp.replModIds" ), inMem_( true ), - maxMem_( opIdMem ) { + maxMem_( replSettings.opIdMem ) { } - void reset( int maxMem = opIdMem ) { + void reset( int maxMem = replSettings.opIdMem ) { memIds_.reset(); memModIds_.reset(); dbIds_.reset(); @@ -312,4 +336,146 @@ namespace mongo { int maxMem_; }; + bool anyReplEnabled(); + void appendReplicationInfo( BSONObjBuilder& result , bool authed , int level = 0 ); + + void replCheckCloseDatabase( Database * db ); + + extern int __findingStartInitialTimeout; // configurable for testing + + class FindingStartCursor { + public: + FindingStartCursor( const QueryPlan & qp ) : + _qp( qp ), + _findingStart( true ), + _findingStartMode(), + _findingStartTimer( 0 ), + _findingStartCursor( 0 ) + { init(); } + bool done() const { return !_findingStart; } + auto_ptr< Cursor > cRelease() { return _c; } + void next() { + if ( !_findingStartCursor || !_findingStartCursor->c->ok() ) { + _findingStart = false; + _c = _qp.newCursor(); // on error, start from beginning + destroyClientCursor(); + return; + } + switch( _findingStartMode ) { + case Initial: { + if ( !_matcher->matches( _findingStartCursor->c->currKey(), _findingStartCursor->c->currLoc() ) ) { + _findingStart = false; // found first record out of query range, so scan normally + _c = _qp.newCursor( _findingStartCursor->c->currLoc() ); + destroyClientCursor(); + return; + } + _findingStartCursor->c->advance(); + RARELY { + if ( _findingStartTimer.seconds() >= __findingStartInitialTimeout ) { + createClientCursor( startLoc( _findingStartCursor->c->currLoc() ) ); + _findingStartMode = FindExtent; + return; + } + } + maybeRelease(); + return; + } + case FindExtent: { + if ( !_matcher->matches( _findingStartCursor->c->currKey(), _findingStartCursor->c->currLoc() ) ) { + _findingStartMode = InExtent; + return; + } + DiskLoc prev = prevLoc( _findingStartCursor->c->currLoc() ); + if ( prev.isNull() ) { // hit beginning, so start scanning from here + createClientCursor(); + _findingStartMode = InExtent; + return; + } + // There might be a more efficient implementation than creating new cursor & client cursor each time, + // not worrying about that for now + createClientCursor( prev ); + maybeRelease(); + return; + } + case InExtent: { + if ( _matcher->matches( _findingStartCursor->c->currKey(), _findingStartCursor->c->currLoc() ) ) { + _findingStart = false; // found first record in query range, so scan normally + _c = _qp.newCursor( _findingStartCursor->c->currLoc() ); + destroyClientCursor(); + return; + } + _findingStartCursor->c->advance(); + maybeRelease(); + return; + } + default: { + massert( 12600, "invalid _findingStartMode", false ); + } + } + } + private: + enum FindingStartMode { Initial, FindExtent, InExtent }; + const QueryPlan &_qp; + bool _findingStart; + FindingStartMode _findingStartMode; + auto_ptr< CoveredIndexMatcher > _matcher; + Timer _findingStartTimer; + ClientCursor * _findingStartCursor; + auto_ptr< Cursor > _c; + DiskLoc startLoc( const DiskLoc &rec ) { + Extent *e = rec.rec()->myExtent( rec ); + if ( e->myLoc != _qp.nsd()->capExtent ) + return e->firstRecord; + // Likely we are on the fresh side of capExtent, so return first fresh record. + // If we are on the stale side of capExtent, then the collection is small and it + // doesn't matter if we start the extent scan with capFirstNewRecord. + return _qp.nsd()->capFirstNewRecord; + } + + DiskLoc prevLoc( const DiskLoc &rec ) { + Extent *e = rec.rec()->myExtent( rec ); + if ( e->xprev.isNull() ) + e = _qp.nsd()->lastExtent.ext(); + else + e = e->xprev.ext(); + if ( e->myLoc != _qp.nsd()->capExtent ) + return e->firstRecord; + return DiskLoc(); // reached beginning of collection + } + void createClientCursor( const DiskLoc &startLoc = DiskLoc() ) { + auto_ptr<Cursor> c = _qp.newCursor( startLoc ); + _findingStartCursor = new ClientCursor(c, _qp.ns(), false); + } + void destroyClientCursor() { + if ( _findingStartCursor ) { + ClientCursor::erase( _findingStartCursor->cursorid ); + _findingStartCursor = 0; + } + } + void maybeRelease() { + RARELY { + CursorId id = _findingStartCursor->cursorid; + _findingStartCursor->updateLocation(); + { + dbtemprelease t; + } + _findingStartCursor = ClientCursor::find( id, false ); + } + } + void init() { + // Use a ClientCursor here so we can release db mutex while scanning + // oplog (can take quite a while with large oplogs). + auto_ptr<Cursor> c = _qp.newReverseCursor(); + _findingStartCursor = new ClientCursor(c, _qp.ns(), false); + _findingStartTimer.reset(); + _findingStartMode = Initial; + BSONElement tsElt = _qp.query()[ "ts" ]; + massert( 13044, "no ts field in query", !tsElt.eoo() ); + BSONObjBuilder b; + b.append( tsElt ); + BSONObj tsQuery = b.obj(); + _matcher.reset(new CoveredIndexMatcher(tsQuery, _qp.indexKey())); + } + }; + } // namespace mongo |