diff options
author:    Antonin Kral <a.kral@bobek.cz>  (2011-09-14 17:08:06 +0200)
committer: Antonin Kral <a.kral@bobek.cz>  (2011-09-14 17:08:06 +0200)
commit:    5d342a758c6095b4d30aba0750b54f13b8916f51 (patch)
tree:      762e9aa84781f5e3b96db2c02d356c29cf0217c0 /db/oplog.cpp
parent:    cbe2d992e9cd1ea66af9fa91df006106775d3073 (diff)
download:  mongodb-5d342a758c6095b4d30aba0750b54f13b8916f51.tar.gz
Imported Upstream version 2.0.0
Diffstat (limited to 'db/oplog.cpp')
-rw-r--r--  db/oplog.cpp | 216
1 file changed, 181 insertions(+), 35 deletions(-)
diff --git a/db/oplog.cpp b/db/oplog.cpp index 1557cbd..dc9db76 100644 --- a/db/oplog.cpp +++ b/db/oplog.cpp @@ -23,6 +23,12 @@ #include "commands.h" #include "repl/rs.h" #include "stats/counters.h" +#include "../util/file.h" +#include "../util/unittest.h" +#include "queryoptimizer.h" +#include "ops/update.h" +#include "ops/delete.h" +#include "ops/query.h" namespace mongo { @@ -113,10 +119,12 @@ namespace mongo { *b = EOO; } + // global is safe as we are in write lock. we put the static outside the function to avoid the implicit mutex + // the compiler would use if inside the function. the reason this is static is to avoid a malloc/free for this + // on every logop call. + static BufBuilder logopbufbuilder(8*1024); static void _logOpRS(const char *opstr, const char *ns, const char *logNS, const BSONObj& obj, BSONObj *o2, bool *bb ) { DEV assertInWriteLock(); - // ^- static is safe as we are in write lock - static BufBuilder bufbuilder(8*1024); if ( strncmp(ns, "local.", 6) == 0 ) { if ( strncmp(ns, "local.slaves", 12) == 0 ) @@ -125,7 +133,6 @@ namespace mongo { } const OpTime ts = OpTime::now(); - long long hashNew; if( theReplSet ) { massert(13312, "replSet error : logOp() but not primary?", theReplSet->box.getState().primary()); @@ -141,12 +148,10 @@ namespace mongo { instead we do a single copy to the destination position in the memory mapped file. */ - bufbuilder.reset(); - BSONObjBuilder b(bufbuilder); - + logopbufbuilder.reset(); + BSONObjBuilder b(logopbufbuilder); b.appendTimestamp("ts", ts.asDate()); b.append("h", hashNew); - b.append("op", opstr); b.append("ns", ns); if ( bb ) @@ -361,7 +366,7 @@ namespace mongo { sz = (256-64) * 1000 * 1000; #else sz = 990.0 * 1000 * 1000; - boost::intmax_t free = freeSpace(); //-1 if call not supported. + boost::intmax_t free = File::freeSpace(dbpath); //-1 if call not supported. 
double fivePct = free * 0.05; if ( fivePct > sz ) sz = fivePct; @@ -389,11 +394,151 @@ namespace mongo { // ------------------------------------- - struct TestOpTime { - TestOpTime() { + FindingStartCursor::FindingStartCursor( const QueryPlan & qp ) : + _qp( qp ), + _findingStart( true ), + _findingStartMode(), + _findingStartTimer( 0 ) + { init(); } + + void FindingStartCursor::next() { + if ( !_findingStartCursor || !_findingStartCursor->ok() ) { + _findingStart = false; + _c = _qp.newCursor(); // on error, start from beginning + destroyClientCursor(); + return; + } + switch( _findingStartMode ) { + // Initial mode: scan backwards from end of collection + case Initial: { + if ( !_matcher->matchesCurrent( _findingStartCursor->c() ) ) { + _findingStart = false; // found first record out of query range, so scan normally + _c = _qp.newCursor( _findingStartCursor->currLoc() ); + destroyClientCursor(); + return; + } + _findingStartCursor->advance(); + RARELY { + if ( _findingStartTimer.seconds() >= __findingStartInitialTimeout ) { + // If we've scanned enough, switch to find extent mode. + createClientCursor( extentFirstLoc( _findingStartCursor->currLoc() ) ); + _findingStartMode = FindExtent; + return; + } + } + return; + } + // FindExtent mode: moving backwards through extents, check first + // document of each extent. + case FindExtent: { + if ( !_matcher->matchesCurrent( _findingStartCursor->c() ) ) { + _findingStartMode = InExtent; + return; + } + DiskLoc prev = prevExtentFirstLoc( _findingStartCursor->currLoc() ); + if ( prev.isNull() ) { // hit beginning, so start scanning from here + createClientCursor(); + _findingStartMode = InExtent; + return; + } + // There might be a more efficient implementation than creating new cursor & client cursor each time, + // not worrying about that for now + createClientCursor( prev ); + return; + } + // InExtent mode: once an extent is chosen, find starting doc in the extent. 
+ case InExtent: { + if ( _matcher->matchesCurrent( _findingStartCursor->c() ) ) { + _findingStart = false; // found first record in query range, so scan normally + _c = _qp.newCursor( _findingStartCursor->currLoc() ); + destroyClientCursor(); + return; + } + _findingStartCursor->advance(); + return; + } + default: { + massert( 14038, "invalid _findingStartMode", false ); + } + } + } + + DiskLoc FindingStartCursor::extentFirstLoc( const DiskLoc &rec ) { + Extent *e = rec.rec()->myExtent( rec ); + if ( !_qp.nsd()->capLooped() || ( e->myLoc != _qp.nsd()->capExtent ) ) + return e->firstRecord; + // Likely we are on the fresh side of capExtent, so return first fresh record. + // If we are on the stale side of capExtent, then the collection is small and it + // doesn't matter if we start the extent scan with capFirstNewRecord. + return _qp.nsd()->capFirstNewRecord; + } + + void wassertExtentNonempty( const Extent *e ) { + // TODO ensure this requirement is clearly enforced, or fix. + wassert( !e->firstRecord.isNull() ); + } + + DiskLoc FindingStartCursor::prevExtentFirstLoc( const DiskLoc &rec ) { + Extent *e = rec.rec()->myExtent( rec ); + if ( _qp.nsd()->capLooped() ) { + if ( e->xprev.isNull() ) { + e = _qp.nsd()->lastExtent.ext(); + } + else { + e = e->xprev.ext(); + } + if ( e->myLoc != _qp.nsd()->capExtent ) { + wassertExtentNonempty( e ); + return e->firstRecord; + } + } + else { + if ( !e->xprev.isNull() ) { + e = e->xprev.ext(); + wassertExtentNonempty( e ); + return e->firstRecord; + } + } + return DiskLoc(); // reached beginning of collection + } + + void FindingStartCursor::createClientCursor( const DiskLoc &startLoc ) { + shared_ptr<Cursor> c = _qp.newCursor( startLoc ); + _findingStartCursor.reset( new ClientCursor(QueryOption_NoCursorTimeout, c, _qp.ns()) ); + } + + bool FindingStartCursor::firstDocMatchesOrEmpty() const { + shared_ptr<Cursor> c = _qp.newCursor(); + return !c->ok() || _matcher->matchesCurrent( c.get() ); + } + + void 
FindingStartCursor::init() { + BSONElement tsElt = _qp.originalQuery()[ "ts" ]; + massert( 13044, "no ts field in query", !tsElt.eoo() ); + BSONObjBuilder b; + b.append( tsElt ); + BSONObj tsQuery = b.obj(); + _matcher.reset(new CoveredIndexMatcher(tsQuery, _qp.indexKey())); + if ( firstDocMatchesOrEmpty() ) { + _c = _qp.newCursor(); + _findingStart = false; + return; + } + // Use a ClientCursor here so we can release db mutex while scanning + // oplog (can take quite a while with large oplogs). + shared_ptr<Cursor> c = _qp.newReverseCursor(); + _findingStartCursor.reset( new ClientCursor(QueryOption_NoCursorTimeout, c, _qp.ns(), BSONObj()) ); + _findingStartTimer.reset(); + _findingStartMode = Initial; + } + + // ------------------------------------- + + struct TestOpTime : public UnitTest { + void run() { OpTime t; for ( int i = 0; i < 10; i++ ) { - OpTime s = OpTime::now(); + OpTime s = OpTime::now_inlock(); assert( s != t ); t = s; } @@ -481,18 +626,23 @@ namespace mongo { } void applyOperation_inlock(const BSONObj& op , bool fromRepl ) { + assertInWriteLock(); + LOG(6) << "applying op: " << op << endl; + OpCounters * opCounters = fromRepl ? 
&replOpCounters : &globalOpCounters; - if( logLevel >= 6 ) - log() << "applying op: " << op << endl; + const char *names[] = { "o", "ns", "op", "b" }; + BSONElement fields[4]; + op.getFields(4, names, fields); - assertInWriteLock(); + BSONObj o; + if( fields[0].isABSONObj() ) + o = fields[0].embeddedObject(); + + const char *ns = fields[1].valuestrsafe(); - OpDebug debug; - BSONObj o = op.getObjectField("o"); - const char *ns = op.getStringField("ns"); // operation type -- see logOp() comments for types - const char *opType = op.getStringField("op"); + const char *opType = fields[2].valuestrsafe(); if ( *opType == 'i' ) { opCounters->gotInsert(); @@ -505,57 +655,53 @@ namespace mongo { } else { // do upserts for inserts as we might get replayed more than once + OpDebug debug; BSONElement _id; if( !o.getObjectID(_id) ) { /* No _id. This will be very slow. */ Timer t; - updateObjects(ns, o, o, true, false, false , debug ); + updateObjects(ns, o, o, true, false, false, debug ); if( t.millis() >= 2 ) { RARELY OCCASIONALLY log() << "warning, repl doing slow updates (no _id field) for " << ns << endl; } } else { - BSONObjBuilder b; - b.append(_id); - /* erh 10/16/2009 - this is probably not relevant any more since its auto-created, but not worth removing */ RARELY ensureHaveIdIndex(ns); // otherwise updates will be slow /* todo : it may be better to do an insert here, and then catch the dup key exception and do update then. very few upserts will not be inserts... 
*/ + BSONObjBuilder b; + b.append(_id); updateObjects(ns, o, b.done(), true, false, false , debug ); } } } else if ( *opType == 'u' ) { opCounters->gotUpdate(); - RARELY ensureHaveIdIndex(ns); // otherwise updates will be super slow - updateObjects(ns, o, op.getObjectField("o2"), /*upsert*/ op.getBoolField("b"), /*multi*/ false, /*logop*/ false , debug ); + OpDebug debug; + updateObjects(ns, o, op.getObjectField("o2"), /*upsert*/ fields[3].booleanSafe(), /*multi*/ false, /*logop*/ false , debug ); } else if ( *opType == 'd' ) { opCounters->gotDelete(); - if ( opType[1] == 0 ) - deleteObjects(ns, o, op.getBoolField("b")); + deleteObjects(ns, o, /*justOne*/ fields[3].booleanSafe()); else assert( opType[1] == 'b' ); // "db" advertisement } - else if ( *opType == 'n' ) { - // no op - } else if ( *opType == 'c' ) { opCounters->gotCommand(); - BufBuilder bb; BSONObjBuilder ob; _runCommands(ns, o, bb, ob, true, 0); } + else if ( *opType == 'n' ) { + // no op + } else { - stringstream ss; - ss << "unknown opType [" << opType << "]"; - throw MsgAssertionException( 13141 , ss.str() ); + throw MsgAssertionException( 14825 , ErrorMsg("error in applyOperation : unknown opType ", *opType) ); } } @@ -566,9 +712,9 @@ namespace mongo { virtual LockType locktype() const { return WRITE; } ApplyOpsCmd() : Command( "applyOps" ) {} virtual void help( stringstream &help ) const { - help << "examples: { applyOps : [ ] , preCondition : [ { ns : ... , q : ... , res : ... } ] }"; + help << "internal (sharding)\n{ applyOps : [ ] , preCondition : [ { ns : ... , q : ... , res : ... } ] }"; } - virtual bool run(const string& dbname, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) { + virtual bool run(const string& dbname, BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool fromRepl) { if ( cmdObj.firstElement().type() != Array ) { errmsg = "ops has to be an array"; |