path: root/db/repl.h
author    Antonin Kral <a.kral@bobek.cz>    2010-03-25 19:21:32 +0100
committer Antonin Kral <a.kral@bobek.cz>    2010-03-25 19:21:32 +0100
commit    0ca01a91ae0a3562e54c226e7b9512feb2ea83d0 (patch)
tree      2b3886e435b0217d6afd63a213b04d32bb4b4f6f /db/repl.h
parent    a696359b248adef0cc8576fce3f473535e995136 (diff)
download  mongodb-0ca01a91ae0a3562e54c226e7b9512feb2ea83d0.tar.gz
Imported Upstream version 1.4.0
Diffstat (limited to 'db/repl.h')
-rw-r--r--  db/repl.h  196
1 file changed, 181 insertions, 15 deletions
diff --git a/db/repl.h b/db/repl.h
index a4c1737..c5e0f63 100644
--- a/db/repl.h
+++ b/db/repl.h
@@ -32,6 +32,7 @@
#include "db.h"
#include "dbhelpers.h"
#include "query.h"
+#include "queryoptimizer.h"
#include "../client/dbclient.h"
@@ -46,14 +47,31 @@ namespace mongo {
--slave cmd line setting -> SimpleSlave
*/
typedef enum { NotSlave=0, SimpleSlave, ReplPairSlave } SlaveTypes;
- extern SlaveTypes slave;
- /* true means we are master and doing replication. if we are not writing to oplog (no --master or repl pairing),
- this won't be true.
- */
- extern bool master;
+ class ReplSettings {
+ public:
+ SlaveTypes slave;
+
+ /* true means we are master and doing replication. if we are not writing to oplog (no --master or repl pairing),
+ this won't be true.
+ */
+ bool master;
+
+ int opIdMem;
+
+ bool fastsync;
+
+ bool autoresync;
+
+ int slavedelay;
+
+ ReplSettings()
+ : slave(NotSlave) , master(false) , opIdMem(100000000) , fastsync() , autoresync(false), slavedelay() {
+ }
+
+ };
- extern int opIdMem;
+ extern ReplSettings replSettings;
bool cloneFrom(const char *masterHost, string& errmsg, const string& fromdb, bool logForReplication,
bool slaveOk, bool useReplAuth, bool snapshot);
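
The hunk above folds the free globals slave, master and opIdMem into a single ReplSettings object exposed as extern ReplSettings replSettings. A minimal call-site sketch of the new access pattern, under the assumption that dependent code simply swaps the old global reads for member reads (the helper names below are illustrative, not part of this diff):

    // Illustrative only: call sites that previously read the free globals
    // now consult the single replSettings instance declared above.
    namespace mongo {
        // hypothetical helper names for this sketch
        inline bool shouldWriteOplog() {
            // previously: return master;
            return replSettings.master;
        }
        inline bool isSimpleSlave() {
            // previously: return slave == SimpleSlave;
            return replSettings.slave == SimpleSlave;
        }
    }
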
@@ -115,6 +133,7 @@ namespace mongo {
// returns false if the slave has been reset
bool updateSetsWithLocalOps( OpTime &localLogTail, bool mayUnlock );
string ns() const { return string( "local.oplog.$" ) + sourceName(); }
+ unsigned _sleepAdviceTime;
public:
static void applyOperation(const BSONObj& op);
@@ -131,11 +150,11 @@ namespace mongo {
OpTime syncedTo;
/* This is for repl pairs.
- _lastSavedLocalTs is the most recent point in the local log that we know is consistent
- with the remote log ( ie say the local op log has entries ABCDE and the remote op log
- has ABCXY, then _lastSavedLocalTs won't be greater than C until we have reconciled
- the DE-XY difference.)
- */
+ _lastSavedLocalTs is the most recent point in the local log that we know is consistent
+ with the remote log ( ie say the local op log has entries ABCDE and the remote op log
+ has ABCXY, then _lastSavedLocalTs won't be greater than C until we have reconciled
+ the DE-XY difference.)
+ */
OpTime _lastSavedLocalTs;
int nClonedThisPass;
@@ -160,7 +179,13 @@ namespace mongo {
operator string() const { return sourceName() + "@" + hostName; }
bool haveMoreDbsToSync() const { return !addDbNextPass.empty(); }
-
+ int sleepAdvice() const {
+ if ( !_sleepAdviceTime )
+ return 0;
+ int wait = _sleepAdviceTime - unsigned( time( 0 ) );
+ return wait > 0 ? wait : 0;
+ }
+
static bool throttledForceResyncDead( const char *requester );
static void forceResyncDead( const char *requester );
void forceResync( const char *requester );
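
sleepAdvice() converts the absolute _sleepAdviceTime (a unix timestamp recorded elsewhere in the source) into the number of seconds still to wait, clamped at zero. A hedged sketch of a consumer in the sync loop, assuming a ReplSource pointer named s and the existing sleepsecs() helper (both are assumptions for this sketch, not shown in this diff):

    // Illustrative only: ask each source how long to back off before the next pass.
    int wait = s->sleepAdvice();
    if ( wait > 0 ) {
        log() << "repl: waiting " << wait << " seconds before next sync attempt" << endl;
        sleepsecs( wait );
    }
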
@@ -173,7 +198,6 @@ namespace mongo {
"c" db cmd
"db" declares presence of a database (ns is set to the db name + '.')
*/
- void _logOp(const char *opstr, const char *ns, const char *logNs, const BSONObj& obj, BSONObj *patt, bool *b, const OpTime &ts);
void logOp(const char *opstr, const char *ns, const BSONObj& obj, BSONObj *patt = 0, bool *b = 0);
// class for managing a set of ids in memory
@@ -239,9 +263,9 @@ namespace mongo {
dbIds_( "local.temp.replIds" ),
dbModIds_( "local.temp.replModIds" ),
inMem_( true ),
- maxMem_( opIdMem ) {
+ maxMem_( replSettings.opIdMem ) {
}
- void reset( int maxMem = opIdMem ) {
+ void reset( int maxMem = replSettings.opIdMem ) {
memIds_.reset();
memModIds_.reset();
dbIds_.reset();
@@ -312,4 +336,146 @@ namespace mongo {
int maxMem_;
};
+ bool anyReplEnabled();
+ void appendReplicationInfo( BSONObjBuilder& result , bool authed , int level = 0 );
+
+ void replCheckCloseDatabase( Database * db );
+
+ extern int __findingStartInitialTimeout; // configurable for testing
+
+ class FindingStartCursor {
+ public:
+ FindingStartCursor( const QueryPlan & qp ) :
+ _qp( qp ),
+ _findingStart( true ),
+ _findingStartMode(),
+ _findingStartTimer( 0 ),
+ _findingStartCursor( 0 )
+ { init(); }
+ bool done() const { return !_findingStart; }
+ auto_ptr< Cursor > cRelease() { return _c; }
+ void next() {
+ if ( !_findingStartCursor || !_findingStartCursor->c->ok() ) {
+ _findingStart = false;
+ _c = _qp.newCursor(); // on error, start from beginning
+ destroyClientCursor();
+ return;
+ }
+ switch( _findingStartMode ) {
+ case Initial: {
+ if ( !_matcher->matches( _findingStartCursor->c->currKey(), _findingStartCursor->c->currLoc() ) ) {
+ _findingStart = false; // found first record out of query range, so scan normally
+ _c = _qp.newCursor( _findingStartCursor->c->currLoc() );
+ destroyClientCursor();
+ return;
+ }
+ _findingStartCursor->c->advance();
+ RARELY {
+ if ( _findingStartTimer.seconds() >= __findingStartInitialTimeout ) {
+ createClientCursor( startLoc( _findingStartCursor->c->currLoc() ) );
+ _findingStartMode = FindExtent;
+ return;
+ }
+ }
+ maybeRelease();
+ return;
+ }
+ case FindExtent: {
+ if ( !_matcher->matches( _findingStartCursor->c->currKey(), _findingStartCursor->c->currLoc() ) ) {
+ _findingStartMode = InExtent;
+ return;
+ }
+ DiskLoc prev = prevLoc( _findingStartCursor->c->currLoc() );
+ if ( prev.isNull() ) { // hit beginning, so start scanning from here
+ createClientCursor();
+ _findingStartMode = InExtent;
+ return;
+ }
+ // There might be a more efficient implementation than creating new cursor & client cursor each time,
+ // not worrying about that for now
+ createClientCursor( prev );
+ maybeRelease();
+ return;
+ }
+ case InExtent: {
+ if ( _matcher->matches( _findingStartCursor->c->currKey(), _findingStartCursor->c->currLoc() ) ) {
+ _findingStart = false; // found first record in query range, so scan normally
+ _c = _qp.newCursor( _findingStartCursor->c->currLoc() );
+ destroyClientCursor();
+ return;
+ }
+ _findingStartCursor->c->advance();
+ maybeRelease();
+ return;
+ }
+ default: {
+ massert( 12600, "invalid _findingStartMode", false );
+ }
+ }
+ }
+ private:
+ enum FindingStartMode { Initial, FindExtent, InExtent };
+ const QueryPlan &_qp;
+ bool _findingStart;
+ FindingStartMode _findingStartMode;
+ auto_ptr< CoveredIndexMatcher > _matcher;
+ Timer _findingStartTimer;
+ ClientCursor * _findingStartCursor;
+ auto_ptr< Cursor > _c;
+ DiskLoc startLoc( const DiskLoc &rec ) {
+ Extent *e = rec.rec()->myExtent( rec );
+ if ( e->myLoc != _qp.nsd()->capExtent )
+ return e->firstRecord;
+ // Likely we are on the fresh side of capExtent, so return first fresh record.
+ // If we are on the stale side of capExtent, then the collection is small and it
+ // doesn't matter if we start the extent scan with capFirstNewRecord.
+ return _qp.nsd()->capFirstNewRecord;
+ }
+
+ DiskLoc prevLoc( const DiskLoc &rec ) {
+ Extent *e = rec.rec()->myExtent( rec );
+ if ( e->xprev.isNull() )
+ e = _qp.nsd()->lastExtent.ext();
+ else
+ e = e->xprev.ext();
+ if ( e->myLoc != _qp.nsd()->capExtent )
+ return e->firstRecord;
+ return DiskLoc(); // reached beginning of collection
+ }
+ void createClientCursor( const DiskLoc &startLoc = DiskLoc() ) {
+ auto_ptr<Cursor> c = _qp.newCursor( startLoc );
+ _findingStartCursor = new ClientCursor(c, _qp.ns(), false);
+ }
+ void destroyClientCursor() {
+ if ( _findingStartCursor ) {
+ ClientCursor::erase( _findingStartCursor->cursorid );
+ _findingStartCursor = 0;
+ }
+ }
+ void maybeRelease() {
+ RARELY {
+ CursorId id = _findingStartCursor->cursorid;
+ _findingStartCursor->updateLocation();
+ {
+ dbtemprelease t;
+ }
+ _findingStartCursor = ClientCursor::find( id, false );
+ }
+ }
+ void init() {
+ // Use a ClientCursor here so we can release db mutex while scanning
+ // oplog (can take quite a while with large oplogs).
+ auto_ptr<Cursor> c = _qp.newReverseCursor();
+ _findingStartCursor = new ClientCursor(c, _qp.ns(), false);
+ _findingStartTimer.reset();
+ _findingStartMode = Initial;
+ BSONElement tsElt = _qp.query()[ "ts" ];
+ massert( 13044, "no ts field in query", !tsElt.eoo() );
+ BSONObjBuilder b;
+ b.append( tsElt );
+ BSONObj tsQuery = b.obj();
+ _matcher.reset(new CoveredIndexMatcher(tsQuery, _qp.indexKey()));
+ }
+ };
+
} // namespace mongo
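
Taken together, the new FindingStartCursor walks the oplog backwards through three modes (Initial, FindExtent, InExtent) to locate the first record matching the query's ts predicate, then hands back a forward cursor positioned there. A hedged usage sketch, assuming an already-built QueryPlan qp over the oplog with a ts clause (variable names are illustrative, not taken from this diff):

    // Illustrative only: drive the finder to completion, then scan forward
    // from the start position it located.
    FindingStartCursor fsc( qp );
    while ( !fsc.done() )
        fsc.next();                   // may yield via maybeRelease() on long scans
    auto_ptr< Cursor > c = fsc.cRelease();
    for ( ; c->ok(); c->advance() ) {
        BSONObj op = c->current();    // oplog entries at or after the requested ts
        // ... apply or forward the operation ...
    }
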