diff options
Diffstat (limited to 'db/repl.h')
-rw-r--r-- | db/repl.h | 191 |
1 files changed, 15 insertions, 176 deletions
@@ -23,7 +23,6 @@ at the master: local.oplog.$<source> - local.oplog.$main is the default */ #pragma once @@ -33,16 +32,14 @@ #include "dbhelpers.h" #include "query.h" #include "queryoptimizer.h" - #include "../client/dbclient.h" - #include "../util/optime.h" +#include "oplog.h" +#include "../util/concurrency/thread_pool.h" +#include "oplogreader.h" namespace mongo { - class DBClientConnection; - class DBClientCursor; - /* replication slave? (possibly with slave or repl pair nonmaster) --slave cmd line setting -> SimpleSlave */ @@ -79,10 +76,9 @@ namespace mongo { /* A replication exception */ class SyncException : public DBException { public: - virtual const char* what() const throw() { return "sync exception"; } - virtual int getCode(){ return 10001; } + SyncException() : DBException( "sync exception" , 10001 ){} }; - + /* A Source is a source from which we can pull (replicate) data. stored in collection local.sources. @@ -94,16 +90,15 @@ namespace mongo { not done (always use main for now). */ class ReplSource { + auto_ptr<ThreadPool> tp; + bool resync(string db); /* pull some operations from the master's oplog, and apply them. */ - bool sync_pullOpLog(int& nApplied); + int sync_pullOpLog(int& nApplied); void sync_pullOpLog_applyOperation(BSONObj& op, OpTime *localLogTail); - auto_ptr<DBClientConnection> conn; - auto_ptr<DBClientCursor> cursor; - /* we only clone one database per pass, even if a lot need done. This helps us avoid overflowing the master's transaction log by doing too much work before going back to read more transactions. (Imagine a scenario of slave startup where we try to @@ -117,8 +112,6 @@ namespace mongo { // returns the dummy ns used to do the drop string resyncDrop( const char *db, const char *requester ); - // returns true if connected on return - bool connect(); // returns possibly unowned id spec for the operation. static BSONObj idForOp( const BSONObj &op, bool &mod ); static void updateSetsWithOp( const BSONObj &op, bool mayUpdateStorage ); @@ -136,6 +129,8 @@ namespace mongo { unsigned _sleepAdviceTime; public: + OplogReader oplogReader; + static void applyOperation(const BSONObj& op); bool replacing; // in "replace mode" -- see CmdReplacePeer bool paired; // --pair in use @@ -162,12 +157,11 @@ namespace mongo { typedef vector< shared_ptr< ReplSource > > SourceVector; static void loadAll(SourceVector&); explicit ReplSource(BSONObj); - bool sync(int& nApplied); + + /* -1 = error */ + int sync(int& nApplied); + void save(); // write ourself to local.sources - void resetConnection() { - cursor = auto_ptr<DBClientCursor>(0); - conn = auto_ptr<DBClientConnection>(0); - } // make a jsobj from our member fields of the form // { host: ..., source: ..., syncedTo: ... } @@ -176,7 +170,7 @@ namespace mongo { bool operator==(const ReplSource&r) const { return hostName == r.hostName && sourceName() == r.sourceName(); } - operator string() const { return sourceName() + "@" + hostName; } + string toString() const { return sourceName() + "@" + hostName; } bool haveMoreDbsToSync() const { return !addDbNextPass.empty(); } int sleepAdvice() const { @@ -191,15 +185,6 @@ namespace mongo { void forceResync( const char *requester ); }; - /* Write operation to the log (local.oplog.$main) - "i" insert - "u" update - "d" delete - "c" db cmd - "db" declares presence of a database (ns is set to the db name + '.') - */ - void logOp(const char *opstr, const char *ns, const BSONObj& obj, BSONObj *patt = 0, bool *b = 0); - // class for managing a set of ids in memory class MemIds { public: @@ -342,151 +327,5 @@ namespace mongo { bool anyReplEnabled(); void appendReplicationInfo( BSONObjBuilder& result , bool authed , int level = 0 ); - void replCheckCloseDatabase( Database * db ); - extern int __findingStartInitialTimeout; // configurable for testing - - class FindingStartCursor { - public: - FindingStartCursor( const QueryPlan & qp ) : - _qp( qp ), - _findingStart( true ), - _findingStartMode(), - _findingStartTimer( 0 ), - _findingStartCursor( 0 ) - { init(); } - bool done() const { return !_findingStart; } - auto_ptr< Cursor > cRelease() { return _c; } - void next() { - if ( !_findingStartCursor || !_findingStartCursor->c->ok() ) { - _findingStart = false; - _c = _qp.newCursor(); // on error, start from beginning - destroyClientCursor(); - return; - } - switch( _findingStartMode ) { - case Initial: { - if ( !_matcher->matches( _findingStartCursor->c->currKey(), _findingStartCursor->c->currLoc() ) ) { - _findingStart = false; // found first record out of query range, so scan normally - _c = _qp.newCursor( _findingStartCursor->c->currLoc() ); - destroyClientCursor(); - return; - } - _findingStartCursor->c->advance(); - RARELY { - if ( _findingStartTimer.seconds() >= __findingStartInitialTimeout ) { - createClientCursor( startLoc( _findingStartCursor->c->currLoc() ) ); - _findingStartMode = FindExtent; - return; - } - } - maybeRelease(); - return; - } - case FindExtent: { - if ( !_matcher->matches( _findingStartCursor->c->currKey(), _findingStartCursor->c->currLoc() ) ) { - _findingStartMode = InExtent; - return; - } - DiskLoc prev = prevLoc( _findingStartCursor->c->currLoc() ); - if ( prev.isNull() ) { // hit beginning, so start scanning from here - createClientCursor(); - _findingStartMode = InExtent; - return; - } - // There might be a more efficient implementation than creating new cursor & client cursor each time, - // not worrying about that for now - createClientCursor( prev ); - maybeRelease(); - return; - } - case InExtent: { - if ( _matcher->matches( _findingStartCursor->c->currKey(), _findingStartCursor->c->currLoc() ) ) { - _findingStart = false; // found first record in query range, so scan normally - _c = _qp.newCursor( _findingStartCursor->c->currLoc() ); - destroyClientCursor(); - return; - } - _findingStartCursor->c->advance(); - maybeRelease(); - return; - } - default: { - massert( 12600, "invalid _findingStartMode", false ); - } - } - } - private: - enum FindingStartMode { Initial, FindExtent, InExtent }; - const QueryPlan &_qp; - bool _findingStart; - FindingStartMode _findingStartMode; - auto_ptr< CoveredIndexMatcher > _matcher; - Timer _findingStartTimer; - ClientCursor * _findingStartCursor; - auto_ptr< Cursor > _c; - DiskLoc startLoc( const DiskLoc &rec ) { - Extent *e = rec.rec()->myExtent( rec ); - if ( !_qp.nsd()->capLooped() || ( e->myLoc != _qp.nsd()->capExtent ) ) - return e->firstRecord; - // Likely we are on the fresh side of capExtent, so return first fresh record. - // If we are on the stale side of capExtent, then the collection is small and it - // doesn't matter if we start the extent scan with capFirstNewRecord. - return _qp.nsd()->capFirstNewRecord; - } - - // should never have an empty extent in the oplog, so don't worry about that case - DiskLoc prevLoc( const DiskLoc &rec ) { - Extent *e = rec.rec()->myExtent( rec ); - if ( _qp.nsd()->capLooped() ) { - if ( e->xprev.isNull() ) - e = _qp.nsd()->lastExtent.ext(); - else - e = e->xprev.ext(); - if ( e->myLoc != _qp.nsd()->capExtent ) - return e->firstRecord; - } else { - if ( !e->xprev.isNull() ) { - e = e->xprev.ext(); - return e->firstRecord; - } - } - return DiskLoc(); // reached beginning of collection - } - void createClientCursor( const DiskLoc &startLoc = DiskLoc() ) { - auto_ptr<Cursor> c = _qp.newCursor( startLoc ); - _findingStartCursor = new ClientCursor(c, _qp.ns(), false); - } - void destroyClientCursor() { - if ( _findingStartCursor ) { - ClientCursor::erase( _findingStartCursor->cursorid ); - _findingStartCursor = 0; - } - } - void maybeRelease() { - RARELY { - CursorId id = _findingStartCursor->cursorid; - _findingStartCursor->updateLocation(); - { - dbtemprelease t; - } - _findingStartCursor = ClientCursor::find( id, false ); - } - } - void init() { - // Use a ClientCursor here so we can release db mutex while scanning - // oplog (can take quite a while with large oplogs). - auto_ptr<Cursor> c = _qp.newReverseCursor(); - _findingStartCursor = new ClientCursor(c, _qp.ns(), false); - _findingStartTimer.reset(); - _findingStartMode = Initial; - BSONElement tsElt = _qp.query()[ "ts" ]; - massert( 13044, "no ts field in query", !tsElt.eoo() ); - BSONObjBuilder b; - b.append( tsElt ); - BSONObj tsQuery = b.obj(); - _matcher.reset(new CoveredIndexMatcher(tsQuery, _qp.indexKey())); - } - }; - } // namespace mongo |