summaryrefslogtreecommitdiff
path: root/db/oplog.cpp
diff options
context:
space:
mode:
authorAntonin Kral <a.kral@bobek.cz>2011-09-14 17:08:06 +0200
committerAntonin Kral <a.kral@bobek.cz>2011-09-14 17:08:06 +0200
commit5d342a758c6095b4d30aba0750b54f13b8916f51 (patch)
tree762e9aa84781f5e3b96db2c02d356c29cf0217c0 /db/oplog.cpp
parentcbe2d992e9cd1ea66af9fa91df006106775d3073 (diff)
downloadmongodb-5d342a758c6095b4d30aba0750b54f13b8916f51.tar.gz
Imported Upstream version 2.0.0
Diffstat (limited to 'db/oplog.cpp')
-rw-r--r--db/oplog.cpp216
1 files changed, 181 insertions, 35 deletions
diff --git a/db/oplog.cpp b/db/oplog.cpp
index 1557cbd..dc9db76 100644
--- a/db/oplog.cpp
+++ b/db/oplog.cpp
@@ -23,6 +23,12 @@
#include "commands.h"
#include "repl/rs.h"
#include "stats/counters.h"
+#include "../util/file.h"
+#include "../util/unittest.h"
+#include "queryoptimizer.h"
+#include "ops/update.h"
+#include "ops/delete.h"
+#include "ops/query.h"
namespace mongo {
@@ -113,10 +119,12 @@ namespace mongo {
*b = EOO;
}
+ // global is safe as we are in write lock. we put the static outside the function to avoid the implicit mutex
+ // the compiler would use if inside the function. the reason this is static is to avoid a malloc/free for this
+ // on every logop call.
+ static BufBuilder logopbufbuilder(8*1024);
static void _logOpRS(const char *opstr, const char *ns, const char *logNS, const BSONObj& obj, BSONObj *o2, bool *bb ) {
DEV assertInWriteLock();
- // ^- static is safe as we are in write lock
- static BufBuilder bufbuilder(8*1024);
if ( strncmp(ns, "local.", 6) == 0 ) {
if ( strncmp(ns, "local.slaves", 12) == 0 )
@@ -125,7 +133,6 @@ namespace mongo {
}
const OpTime ts = OpTime::now();
-
long long hashNew;
if( theReplSet ) {
massert(13312, "replSet error : logOp() but not primary?", theReplSet->box.getState().primary());
@@ -141,12 +148,10 @@ namespace mongo {
instead we do a single copy to the destination position in the memory mapped file.
*/
- bufbuilder.reset();
- BSONObjBuilder b(bufbuilder);
-
+ logopbufbuilder.reset();
+ BSONObjBuilder b(logopbufbuilder);
b.appendTimestamp("ts", ts.asDate());
b.append("h", hashNew);
-
b.append("op", opstr);
b.append("ns", ns);
if ( bb )
@@ -361,7 +366,7 @@ namespace mongo {
sz = (256-64) * 1000 * 1000;
#else
sz = 990.0 * 1000 * 1000;
- boost::intmax_t free = freeSpace(); //-1 if call not supported.
+ boost::intmax_t free = File::freeSpace(dbpath); //-1 if call not supported.
double fivePct = free * 0.05;
if ( fivePct > sz )
sz = fivePct;
@@ -389,11 +394,151 @@ namespace mongo {
// -------------------------------------
- struct TestOpTime {
- TestOpTime() {
+ FindingStartCursor::FindingStartCursor( const QueryPlan & qp ) :
+ _qp( qp ),
+ _findingStart( true ),
+ _findingStartMode(),
+ _findingStartTimer( 0 )
+ { init(); }
+
+ void FindingStartCursor::next() {
+ if ( !_findingStartCursor || !_findingStartCursor->ok() ) {
+ _findingStart = false;
+ _c = _qp.newCursor(); // on error, start from beginning
+ destroyClientCursor();
+ return;
+ }
+ switch( _findingStartMode ) {
+ // Initial mode: scan backwards from end of collection
+ case Initial: {
+ if ( !_matcher->matchesCurrent( _findingStartCursor->c() ) ) {
+ _findingStart = false; // found first record out of query range, so scan normally
+ _c = _qp.newCursor( _findingStartCursor->currLoc() );
+ destroyClientCursor();
+ return;
+ }
+ _findingStartCursor->advance();
+ RARELY {
+ if ( _findingStartTimer.seconds() >= __findingStartInitialTimeout ) {
+ // If we've scanned enough, switch to find extent mode.
+ createClientCursor( extentFirstLoc( _findingStartCursor->currLoc() ) );
+ _findingStartMode = FindExtent;
+ return;
+ }
+ }
+ return;
+ }
+ // FindExtent mode: moving backwards through extents, check first
+ // document of each extent.
+ case FindExtent: {
+ if ( !_matcher->matchesCurrent( _findingStartCursor->c() ) ) {
+ _findingStartMode = InExtent;
+ return;
+ }
+ DiskLoc prev = prevExtentFirstLoc( _findingStartCursor->currLoc() );
+ if ( prev.isNull() ) { // hit beginning, so start scanning from here
+ createClientCursor();
+ _findingStartMode = InExtent;
+ return;
+ }
+ // There might be a more efficient implementation than creating new cursor & client cursor each time,
+ // not worrying about that for now
+ createClientCursor( prev );
+ return;
+ }
+ // InExtent mode: once an extent is chosen, find starting doc in the extent.
+ case InExtent: {
+ if ( _matcher->matchesCurrent( _findingStartCursor->c() ) ) {
+ _findingStart = false; // found first record in query range, so scan normally
+ _c = _qp.newCursor( _findingStartCursor->currLoc() );
+ destroyClientCursor();
+ return;
+ }
+ _findingStartCursor->advance();
+ return;
+ }
+ default: {
+ massert( 14038, "invalid _findingStartMode", false );
+ }
+ }
+ }
+
+ DiskLoc FindingStartCursor::extentFirstLoc( const DiskLoc &rec ) {
+ Extent *e = rec.rec()->myExtent( rec );
+ if ( !_qp.nsd()->capLooped() || ( e->myLoc != _qp.nsd()->capExtent ) )
+ return e->firstRecord;
+ // Likely we are on the fresh side of capExtent, so return first fresh record.
+ // If we are on the stale side of capExtent, then the collection is small and it
+ // doesn't matter if we start the extent scan with capFirstNewRecord.
+ return _qp.nsd()->capFirstNewRecord;
+ }
+
+ void wassertExtentNonempty( const Extent *e ) {
+ // TODO ensure this requirement is clearly enforced, or fix.
+ wassert( !e->firstRecord.isNull() );
+ }
+
+ DiskLoc FindingStartCursor::prevExtentFirstLoc( const DiskLoc &rec ) {
+ Extent *e = rec.rec()->myExtent( rec );
+ if ( _qp.nsd()->capLooped() ) {
+ if ( e->xprev.isNull() ) {
+ e = _qp.nsd()->lastExtent.ext();
+ }
+ else {
+ e = e->xprev.ext();
+ }
+ if ( e->myLoc != _qp.nsd()->capExtent ) {
+ wassertExtentNonempty( e );
+ return e->firstRecord;
+ }
+ }
+ else {
+ if ( !e->xprev.isNull() ) {
+ e = e->xprev.ext();
+ wassertExtentNonempty( e );
+ return e->firstRecord;
+ }
+ }
+ return DiskLoc(); // reached beginning of collection
+ }
+
+ void FindingStartCursor::createClientCursor( const DiskLoc &startLoc ) {
+ shared_ptr<Cursor> c = _qp.newCursor( startLoc );
+ _findingStartCursor.reset( new ClientCursor(QueryOption_NoCursorTimeout, c, _qp.ns()) );
+ }
+
+ bool FindingStartCursor::firstDocMatchesOrEmpty() const {
+ shared_ptr<Cursor> c = _qp.newCursor();
+ return !c->ok() || _matcher->matchesCurrent( c.get() );
+ }
+
+ void FindingStartCursor::init() {
+ BSONElement tsElt = _qp.originalQuery()[ "ts" ];
+ massert( 13044, "no ts field in query", !tsElt.eoo() );
+ BSONObjBuilder b;
+ b.append( tsElt );
+ BSONObj tsQuery = b.obj();
+ _matcher.reset(new CoveredIndexMatcher(tsQuery, _qp.indexKey()));
+ if ( firstDocMatchesOrEmpty() ) {
+ _c = _qp.newCursor();
+ _findingStart = false;
+ return;
+ }
+ // Use a ClientCursor here so we can release db mutex while scanning
+ // oplog (can take quite a while with large oplogs).
+ shared_ptr<Cursor> c = _qp.newReverseCursor();
+ _findingStartCursor.reset( new ClientCursor(QueryOption_NoCursorTimeout, c, _qp.ns(), BSONObj()) );
+ _findingStartTimer.reset();
+ _findingStartMode = Initial;
+ }
+
+ // -------------------------------------
+
+ struct TestOpTime : public UnitTest {
+ void run() {
OpTime t;
for ( int i = 0; i < 10; i++ ) {
- OpTime s = OpTime::now();
+ OpTime s = OpTime::now_inlock();
assert( s != t );
t = s;
}
@@ -481,18 +626,23 @@ namespace mongo {
}
void applyOperation_inlock(const BSONObj& op , bool fromRepl ) {
+ assertInWriteLock();
+ LOG(6) << "applying op: " << op << endl;
+
OpCounters * opCounters = fromRepl ? &replOpCounters : &globalOpCounters;
- if( logLevel >= 6 )
- log() << "applying op: " << op << endl;
+ const char *names[] = { "o", "ns", "op", "b" };
+ BSONElement fields[4];
+ op.getFields(4, names, fields);
- assertInWriteLock();
+ BSONObj o;
+ if( fields[0].isABSONObj() )
+ o = fields[0].embeddedObject();
+
+ const char *ns = fields[1].valuestrsafe();
- OpDebug debug;
- BSONObj o = op.getObjectField("o");
- const char *ns = op.getStringField("ns");
// operation type -- see logOp() comments for types
- const char *opType = op.getStringField("op");
+ const char *opType = fields[2].valuestrsafe();
if ( *opType == 'i' ) {
opCounters->gotInsert();
@@ -505,57 +655,53 @@ namespace mongo {
}
else {
// do upserts for inserts as we might get replayed more than once
+ OpDebug debug;
BSONElement _id;
if( !o.getObjectID(_id) ) {
/* No _id. This will be very slow. */
Timer t;
- updateObjects(ns, o, o, true, false, false , debug );
+ updateObjects(ns, o, o, true, false, false, debug );
if( t.millis() >= 2 ) {
RARELY OCCASIONALLY log() << "warning, repl doing slow updates (no _id field) for " << ns << endl;
}
}
else {
- BSONObjBuilder b;
- b.append(_id);
-
/* erh 10/16/2009 - this is probably not relevant any more since its auto-created, but not worth removing */
RARELY ensureHaveIdIndex(ns); // otherwise updates will be slow
/* todo : it may be better to do an insert here, and then catch the dup key exception and do update
then. very few upserts will not be inserts...
*/
+ BSONObjBuilder b;
+ b.append(_id);
updateObjects(ns, o, b.done(), true, false, false , debug );
}
}
}
else if ( *opType == 'u' ) {
opCounters->gotUpdate();
-
RARELY ensureHaveIdIndex(ns); // otherwise updates will be super slow
- updateObjects(ns, o, op.getObjectField("o2"), /*upsert*/ op.getBoolField("b"), /*multi*/ false, /*logop*/ false , debug );
+ OpDebug debug;
+ updateObjects(ns, o, op.getObjectField("o2"), /*upsert*/ fields[3].booleanSafe(), /*multi*/ false, /*logop*/ false , debug );
}
else if ( *opType == 'd' ) {
opCounters->gotDelete();
-
if ( opType[1] == 0 )
- deleteObjects(ns, o, op.getBoolField("b"));
+ deleteObjects(ns, o, /*justOne*/ fields[3].booleanSafe());
else
assert( opType[1] == 'b' ); // "db" advertisement
}
- else if ( *opType == 'n' ) {
- // no op
- }
else if ( *opType == 'c' ) {
opCounters->gotCommand();
-
BufBuilder bb;
BSONObjBuilder ob;
_runCommands(ns, o, bb, ob, true, 0);
}
+ else if ( *opType == 'n' ) {
+ // no op
+ }
else {
- stringstream ss;
- ss << "unknown opType [" << opType << "]";
- throw MsgAssertionException( 13141 , ss.str() );
+ throw MsgAssertionException( 14825 , ErrorMsg("error in applyOperation : unknown opType ", *opType) );
}
}
@@ -566,9 +712,9 @@ namespace mongo {
virtual LockType locktype() const { return WRITE; }
ApplyOpsCmd() : Command( "applyOps" ) {}
virtual void help( stringstream &help ) const {
- help << "examples: { applyOps : [ ] , preCondition : [ { ns : ... , q : ... , res : ... } ] }";
+ help << "internal (sharding)\n{ applyOps : [ ] , preCondition : [ { ns : ... , q : ... , res : ... } ] }";
}
- virtual bool run(const string& dbname, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) {
+ virtual bool run(const string& dbname, BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool fromRepl) {
if ( cmdObj.firstElement().type() != Array ) {
errmsg = "ops has to be an array";