summary | refs | log | tree | commit | diff
path: root/db/repl.h
diff options
context:
space:
mode:
Diffstat (limited to 'db/repl.h')
-rw-r--r-- db/repl.h 191
1 files changed, 15 insertions, 176 deletions
diff --git a/db/repl.h b/db/repl.h
index eb1cb26..f33acad 100644
--- a/db/repl.h
+++ b/db/repl.h
@@ -23,7 +23,6 @@
at the master:
local.oplog.$<source>
- local.oplog.$main is the default
*/
#pragma once
@@ -33,16 +32,14 @@
#include "dbhelpers.h"
#include "query.h"
#include "queryoptimizer.h"
-
#include "../client/dbclient.h"
-
#include "../util/optime.h"
+#include "oplog.h"
+#include "../util/concurrency/thread_pool.h"
+#include "oplogreader.h"
namespace mongo {
- class DBClientConnection;
- class DBClientCursor;
-
/* replication slave? (possibly with slave or repl pair nonmaster)
--slave cmd line setting -> SimpleSlave
*/
@@ -79,10 +76,9 @@ namespace mongo {
/* A replication exception */
class SyncException : public DBException {
public:
- virtual const char* what() const throw() { return "sync exception"; }
- virtual int getCode(){ return 10001; }
+ SyncException() : DBException( "sync exception" , 10001 ){}
};
-
+
/* A Source is a source from which we can pull (replicate) data.
stored in collection local.sources.
@@ -94,16 +90,15 @@ namespace mongo {
not done (always use main for now).
*/
class ReplSource {
+ auto_ptr<ThreadPool> tp;
+
bool resync(string db);
/* pull some operations from the master's oplog, and apply them. */
- bool sync_pullOpLog(int& nApplied);
+ int sync_pullOpLog(int& nApplied);
void sync_pullOpLog_applyOperation(BSONObj& op, OpTime *localLogTail);
- auto_ptr<DBClientConnection> conn;
- auto_ptr<DBClientCursor> cursor;
-
/* we only clone one database per pass, even if a lot need done. This helps us
avoid overflowing the master's transaction log by doing too much work before going
back to read more transactions. (Imagine a scenario of slave startup where we try to
@@ -117,8 +112,6 @@ namespace mongo {
// returns the dummy ns used to do the drop
string resyncDrop( const char *db, const char *requester );
- // returns true if connected on return
- bool connect();
// returns possibly unowned id spec for the operation.
static BSONObj idForOp( const BSONObj &op, bool &mod );
static void updateSetsWithOp( const BSONObj &op, bool mayUpdateStorage );
@@ -136,6 +129,8 @@ namespace mongo {
unsigned _sleepAdviceTime;
public:
+ OplogReader oplogReader;
+
static void applyOperation(const BSONObj& op);
bool replacing; // in "replace mode" -- see CmdReplacePeer
bool paired; // --pair in use
@@ -162,12 +157,11 @@ namespace mongo {
typedef vector< shared_ptr< ReplSource > > SourceVector;
static void loadAll(SourceVector&);
explicit ReplSource(BSONObj);
- bool sync(int& nApplied);
+
+ /* -1 = error */
+ int sync(int& nApplied);
+
void save(); // write ourself to local.sources
- void resetConnection() {
- cursor = auto_ptr<DBClientCursor>(0);
- conn = auto_ptr<DBClientConnection>(0);
- }
// make a jsobj from our member fields of the form
// { host: ..., source: ..., syncedTo: ... }
@@ -176,7 +170,7 @@ namespace mongo {
bool operator==(const ReplSource&r) const {
return hostName == r.hostName && sourceName() == r.sourceName();
}
- operator string() const { return sourceName() + "@" + hostName; }
+ string toString() const { return sourceName() + "@" + hostName; }
bool haveMoreDbsToSync() const { return !addDbNextPass.empty(); }
int sleepAdvice() const {
@@ -191,15 +185,6 @@ namespace mongo {
void forceResync( const char *requester );
};
- /* Write operation to the log (local.oplog.$main)
- "i" insert
- "u" update
- "d" delete
- "c" db cmd
- "db" declares presence of a database (ns is set to the db name + '.')
- */
- void logOp(const char *opstr, const char *ns, const BSONObj& obj, BSONObj *patt = 0, bool *b = 0);
-
// class for managing a set of ids in memory
class MemIds {
public:
@@ -342,151 +327,5 @@ namespace mongo {
bool anyReplEnabled();
void appendReplicationInfo( BSONObjBuilder& result , bool authed , int level = 0 );
- void replCheckCloseDatabase( Database * db );
- extern int __findingStartInitialTimeout; // configurable for testing
-
- class FindingStartCursor {
- public:
- FindingStartCursor( const QueryPlan & qp ) :
- _qp( qp ),
- _findingStart( true ),
- _findingStartMode(),
- _findingStartTimer( 0 ),
- _findingStartCursor( 0 )
- { init(); }
- bool done() const { return !_findingStart; }
- auto_ptr< Cursor > cRelease() { return _c; }
- void next() {
- if ( !_findingStartCursor || !_findingStartCursor->c->ok() ) {
- _findingStart = false;
- _c = _qp.newCursor(); // on error, start from beginning
- destroyClientCursor();
- return;
- }
- switch( _findingStartMode ) {
- case Initial: {
- if ( !_matcher->matches( _findingStartCursor->c->currKey(), _findingStartCursor->c->currLoc() ) ) {
- _findingStart = false; // found first record out of query range, so scan normally
- _c = _qp.newCursor( _findingStartCursor->c->currLoc() );
- destroyClientCursor();
- return;
- }
- _findingStartCursor->c->advance();
- RARELY {
- if ( _findingStartTimer.seconds() >= __findingStartInitialTimeout ) {
- createClientCursor( startLoc( _findingStartCursor->c->currLoc() ) );
- _findingStartMode = FindExtent;
- return;
- }
- }
- maybeRelease();
- return;
- }
- case FindExtent: {
- if ( !_matcher->matches( _findingStartCursor->c->currKey(), _findingStartCursor->c->currLoc() ) ) {
- _findingStartMode = InExtent;
- return;
- }
- DiskLoc prev = prevLoc( _findingStartCursor->c->currLoc() );
- if ( prev.isNull() ) { // hit beginning, so start scanning from here
- createClientCursor();
- _findingStartMode = InExtent;
- return;
- }
- // There might be a more efficient implementation than creating new cursor & client cursor each time,
- // not worrying about that for now
- createClientCursor( prev );
- maybeRelease();
- return;
- }
- case InExtent: {
- if ( _matcher->matches( _findingStartCursor->c->currKey(), _findingStartCursor->c->currLoc() ) ) {
- _findingStart = false; // found first record in query range, so scan normally
- _c = _qp.newCursor( _findingStartCursor->c->currLoc() );
- destroyClientCursor();
- return;
- }
- _findingStartCursor->c->advance();
- maybeRelease();
- return;
- }
- default: {
- massert( 12600, "invalid _findingStartMode", false );
- }
- }
- }
- private:
- enum FindingStartMode { Initial, FindExtent, InExtent };
- const QueryPlan &_qp;
- bool _findingStart;
- FindingStartMode _findingStartMode;
- auto_ptr< CoveredIndexMatcher > _matcher;
- Timer _findingStartTimer;
- ClientCursor * _findingStartCursor;
- auto_ptr< Cursor > _c;
- DiskLoc startLoc( const DiskLoc &rec ) {
- Extent *e = rec.rec()->myExtent( rec );
- if ( !_qp.nsd()->capLooped() || ( e->myLoc != _qp.nsd()->capExtent ) )
- return e->firstRecord;
- // Likely we are on the fresh side of capExtent, so return first fresh record.
- // If we are on the stale side of capExtent, then the collection is small and it
- // doesn't matter if we start the extent scan with capFirstNewRecord.
- return _qp.nsd()->capFirstNewRecord;
- }
-
- // should never have an empty extent in the oplog, so don't worry about that case
- DiskLoc prevLoc( const DiskLoc &rec ) {
- Extent *e = rec.rec()->myExtent( rec );
- if ( _qp.nsd()->capLooped() ) {
- if ( e->xprev.isNull() )
- e = _qp.nsd()->lastExtent.ext();
- else
- e = e->xprev.ext();
- if ( e->myLoc != _qp.nsd()->capExtent )
- return e->firstRecord;
- } else {
- if ( !e->xprev.isNull() ) {
- e = e->xprev.ext();
- return e->firstRecord;
- }
- }
- return DiskLoc(); // reached beginning of collection
- }
- void createClientCursor( const DiskLoc &startLoc = DiskLoc() ) {
- auto_ptr<Cursor> c = _qp.newCursor( startLoc );
- _findingStartCursor = new ClientCursor(c, _qp.ns(), false);
- }
- void destroyClientCursor() {
- if ( _findingStartCursor ) {
- ClientCursor::erase( _findingStartCursor->cursorid );
- _findingStartCursor = 0;
- }
- }
- void maybeRelease() {
- RARELY {
- CursorId id = _findingStartCursor->cursorid;
- _findingStartCursor->updateLocation();
- {
- dbtemprelease t;
- }
- _findingStartCursor = ClientCursor::find( id, false );
- }
- }
- void init() {
- // Use a ClientCursor here so we can release db mutex while scanning
- // oplog (can take quite a while with large oplogs).
- auto_ptr<Cursor> c = _qp.newReverseCursor();
- _findingStartCursor = new ClientCursor(c, _qp.ns(), false);
- _findingStartTimer.reset();
- _findingStartMode = Initial;
- BSONElement tsElt = _qp.query()[ "ts" ];
- massert( 13044, "no ts field in query", !tsElt.eoo() );
- BSONObjBuilder b;
- b.append( tsElt );
- BSONObj tsQuery = b.obj();
- _matcher.reset(new CoveredIndexMatcher(tsQuery, _qp.indexKey()));
- }
- };
-
} // namespace mongo