diff options
Diffstat (limited to 'db/oplog.cpp')
-rw-r--r-- | db/oplog.cpp | 228 |
1 files changed, 131 insertions, 97 deletions
diff --git a/db/oplog.cpp b/db/oplog.cpp index 93800c7..1557cbd 100644 --- a/db/oplog.cpp +++ b/db/oplog.cpp @@ -22,18 +22,19 @@ #include "repl.h" #include "commands.h" #include "repl/rs.h" +#include "stats/counters.h" namespace mongo { void logOpForSharding( const char * opstr , const char * ns , const BSONObj& obj , BSONObj * patt ); - int __findingStartInitialTimeout = 5; // configurable for testing + int __findingStartInitialTimeout = 5; // configurable for testing // cached copies of these...so don't rename them, drop them, etc.!!! static NamespaceDetails *localOplogMainDetails = 0; static Database *localDB = 0; static NamespaceDetails *rsOplogDetails = 0; - void oplogCheckCloseDatabase( Database * db ){ + void oplogCheckCloseDatabase( Database * db ) { localDB = 0; localOplogMainDetails = 0; rsOplogDetails = 0; @@ -44,10 +45,10 @@ namespace mongo { uassert(13288, "replSet error write op to db before replSet initialized", str::startsWith(ns, "local.") || *opstr == 'n'); } - /** write an op to the oplog that is already built. + /** write an op to the oplog that is already built. todo : make _logOpRS() call this so we don't repeat ourself? */ - void _logOpObjRS(const BSONObj& op) { + void _logOpObjRS(const BSONObj& op) { DEV assertInWriteLock(); const OpTime ts = op["ts"]._opTime(); @@ -62,11 +63,11 @@ namespace mongo { rsOplogDetails = nsdetails(logns); massert(13389, "local.oplog.rs missing. did you drop it? if so restart server", rsOplogDetails); } - Client::Context ctx( "" , localDB, false ); + Client::Context ctx( logns , localDB, false ); { int len = op.objsize(); Record *r = theDataFileMgr.fast_oplog_insert(rsOplogDetails, logns, len); - memcpy(r->data, op.objdata(), len); + memcpy(getDur().writingPtr(r->data, len), op.objdata(), len); } /* todo: now() has code to handle clock skew. but if the skew server to server is large it will get unhappy. this code (or code in now() maybe) should be improved. @@ -82,11 +83,42 @@ namespace mongo { } } + /** given a BSON object, create a new one at dst which is the existing (partial) object + with a new object element appended at the end with fieldname "o". + + @param partial already build object with everything except the o member. e.g. something like: + { ts:..., ns:..., os2:... } + @param o a bson object to be added with fieldname "o" + @dst where to put the newly built combined object. e.g. ends up as something like: + { ts:..., ns:..., os2:..., o:... } + */ + void append_O_Obj(char *dst, const BSONObj& partial, const BSONObj& o) { + const int size1 = partial.objsize() - 1; // less the EOO char + const int oOfs = size1+3; // 3 = byte BSONOBJTYPE + byte 'o' + byte \0 + + void *p = getDur().writingPtr(dst, oOfs+o.objsize()+1); + + memcpy(p, partial.objdata(), size1); + + // adjust overall bson object size for the o: field + *(static_cast<unsigned*>(p)) += o.objsize() + 1/*fieldtype byte*/ + 2/*"o" fieldname*/; + + char *b = static_cast<char *>(p); + b += size1; + *b++ = (char) Object; + *b++ = 'o'; // { o : ... } + *b++ = 0; // null terminate "o" fieldname + memcpy(b, o.objdata(), o.objsize()); + b += o.objsize(); + *b = EOO; + } + static void _logOpRS(const char *opstr, const char *ns, const char *logNS, const BSONObj& obj, BSONObj *o2, bool *bb ) { DEV assertInWriteLock(); + // ^- static is safe as we are in write lock static BufBuilder bufbuilder(8*1024); - - if ( strncmp(ns, "local.", 6) == 0 ){ + + if ( strncmp(ns, "local.", 6) == 0 ) { if ( strncmp(ns, "local.slaves", 12) == 0 ) resetSlaveCache(); return; @@ -94,15 +126,15 @@ namespace mongo { const OpTime ts = OpTime::now(); - long long hNew; - if( theReplSet ) { + long long hashNew; + if( theReplSet ) { massert(13312, "replSet error : logOp() but not primary?", theReplSet->box.getState().primary()); - hNew = (theReplSet->lastH * 131 + ts.asLL()) * 17 + theReplSet->selfId(); + hashNew = (theReplSet->lastH * 131 + ts.asLL()) * 17 + theReplSet->selfId(); } else { // must be initiation assert( *ns == 0 ); - hNew = 0; + hashNew = 0; } /* we jump through a bunch of hoops here to avoid copying the obj buffer twice -- @@ -113,7 +145,7 @@ namespace mongo { BSONObjBuilder b(bufbuilder); b.appendTimestamp("ts", ts.asDate()); - b.append("h", hNew); + b.append("h", hashNew); b.append("op", opstr); b.append("ns", ns); @@ -136,7 +168,7 @@ namespace mongo { rsOplogDetails = nsdetails(logns); massert(13347, "local.oplog.rs missing. did you drop it? if so restart server", rsOplogDetails); } - Client::Context ctx( "" , localDB, false ); + Client::Context ctx( logns , localDB, false ); r = theDataFileMgr.fast_oplog_insert(rsOplogDetails, logns, len); /* todo: now() has code to handle clock skew. but if the skew server to server is large it will get unhappy. this code (or code in now() maybe) should be improved. @@ -147,22 +179,13 @@ namespace mongo { log() << "replSet " << theReplSet->isPrimary() << rsLog; } theReplSet->lastOpTimeWritten = ts; - theReplSet->lastH = hNew; + theReplSet->lastH = hashNew; ctx.getClient()->setLastOp( ts.asDate() ); } } - char *p = r->data; - memcpy(p, partial.objdata(), posz); - *((unsigned *)p) += obj.objsize() + 1 + 2; - p += posz - 1; - *p++ = (char) Object; - *p++ = 'o'; - *p++ = 0; - memcpy(p, obj.objdata(), obj.objsize()); - p += obj.objsize(); - *p = EOO; - + append_O_Obj(r->data, partial, obj); + if ( logLevel >= 6 ) { BSONObj temp(r); log( 6 ) << "logOp:" << temp << endl; @@ -192,9 +215,9 @@ namespace mongo { static void _logOpOld(const char *opstr, const char *ns, const char *logNS, const BSONObj& obj, BSONObj *o2, bool *bb ) { DEV assertInWriteLock(); static BufBuilder bufbuilder(8*1024); - - if ( strncmp(ns, "local.", 6) == 0 ){ - if ( strncmp(ns, "local.slaves", 12) == 0 ){ + + if ( strncmp(ns, "local.", 6) == 0 ) { + if ( strncmp(ns, "local.slaves", 12) == 0 ) { resetSlaveCache(); } return; @@ -202,7 +225,7 @@ namespace mongo { const OpTime ts = OpTime::now(); Client::Context context; - + /* we jump through a bunch of hoops here to avoid copying the obj buffer twice -- instead we do a single copy to the destination position in the memory mapped file. */ @@ -216,9 +239,10 @@ namespace mongo { b.appendBool("b", *bb); if ( o2 ) b.append("o2", *o2); - BSONObj partial = b.done(); - int posz = partial.objsize(); - int len = posz + obj.objsize() + 1 + 2 /*o:*/; + BSONObj partial = b.done(); // partial is everything except the o:... part. + + int po_sz = partial.objsize(); + int len = po_sz + obj.objsize() + 1 + 2 /*o:*/; Record *r; if( logNS == 0 ) { @@ -230,25 +254,18 @@ namespace mongo { localOplogMainDetails = nsdetails(logNS); assert( localOplogMainDetails ); } - Client::Context ctx( "" , localDB, false ); + Client::Context ctx( logNS , localDB, false ); r = theDataFileMgr.fast_oplog_insert(localOplogMainDetails, logNS, len); - } else { + } + else { Client::Context ctx( logNS, dbpath, 0, false ); assert( nsdetails( logNS ) ); + // first we allocate the space, then we fill it below. r = theDataFileMgr.fast_oplog_insert( nsdetails( logNS ), logNS, len); } - char *p = r->data; - memcpy(p, partial.objdata(), posz); - *((unsigned *)p) += obj.objsize() + 1 + 2; - p += posz - 1; - *p++ = (char) Object; - *p++ = 'o'; - *p++ = 0; - memcpy(p, obj.objdata(), obj.objsize()); - p += obj.objsize(); - *p = EOO; - + append_O_Obj(r->data, partial, obj); + context.getClient()->setLastOp( ts.asDate() ); if ( logLevel >= 6 ) { @@ -259,17 +276,17 @@ namespace mongo { } static void (*_logOp)(const char *opstr, const char *ns, const char *logNS, const BSONObj& obj, BSONObj *o2, bool *bb ) = _logOpOld; - void newReplUp() { + void newReplUp() { replSettings.master = true; - _logOp = _logOpRS; + _logOp = _logOpRS; } - void newRepl() { + void newRepl() { replSettings.master = true; - _logOp = _logOpUninitialized; + _logOp = _logOpUninitialized; } void oldRepl() { _logOp = _logOpOld; } - void logKeepalive() { + void logKeepalive() { _logOp("n", "", 0, BSONObj(), 0, 0); } void logOpComment(const BSONObj& obj) { @@ -289,13 +306,10 @@ namespace mongo { void logOp(const char *opstr, const char *ns, const BSONObj& obj, BSONObj *patt, bool *b) { if ( replSettings.master ) { _logOp(opstr, ns, 0, obj, patt, b); - // why? : - //char cl[ 256 ]; - //nsToDatabase( ns, cl ); } - + logOpForSharding( opstr , ns , obj , patt ); - } + } void createOplog() { dblock lk; @@ -307,15 +321,15 @@ namespace mongo { ns = rsoplog; Client::Context ctx(ns); - + NamespaceDetails * nsd = nsdetails( ns ); if ( nsd ) { - - if ( cmdLine.oplogSize != 0 ){ + + if ( cmdLine.oplogSize != 0 ) { int o = (int)(nsd->storageSize() / ( 1024 * 1024 ) ); int n = (int)(cmdLine.oplogSize / ( 1024 * 1024 ) ); - if ( n != o ){ + if ( n != o ) { stringstream ss; ss << "cmdline oplogsize (" << n << ") different than existing (" << o << ") see: http://dochub.mongodb.org/core/increase-oplog"; log() << ss.str() << endl; @@ -332,19 +346,19 @@ namespace mongo { } return; } - + /* create an oplog collection, if it doesn't yet exist. */ BSONObjBuilder b; double sz; if ( cmdLine.oplogSize != 0 ) sz = (double)cmdLine.oplogSize; else { - /* not specified. pick a default size */ + /* not specified. pick a default size */ sz = 50.0 * 1000 * 1000; if ( sizeof(int *) >= 8 ) { #if defined(__APPLE__) - // typically these are desktops (dev machines), so keep it smallish - sz = (256-64) * 1000 * 1000; + // typically these are desktops (dev machines), so keep it smallish + sz = (256-64) * 1000 * 1000; #else sz = 990.0 * 1000 * 1000; boost::intmax_t free = freeSpace(); //-1 if call not supported. @@ -356,7 +370,7 @@ namespace mongo { } log() << "******" << endl; - log() << "creating replication oplog of size: " << (int)( sz / ( 1024 * 1024 ) ) << "MB... (use --oplogSize to change)" << endl; + log() << "creating replication oplog of size: " << (int)( sz / ( 1024 * 1024 ) ) << "MB..." << endl; b.append("size", sz); b.appendBool("capped", 1); @@ -366,7 +380,7 @@ namespace mongo { BSONObj o = b.done(); userCreateNS(ns, o, err, false); if( !rs ) - logOp( "n", "dummy", BSONObj() ); + logOp( "n", "", BSONObj() ); /* sync here so we don't get any surprising lag later when we try to sync */ MemoryMappedFile::flushAll(true); @@ -394,8 +408,8 @@ namespace mongo { void pretouchN(vector<BSONObj>& v, unsigned a, unsigned b) { DEV assert( !dbMutex.isWriteLocked() ); - Client *c = &cc(); - if( c == 0 ) { + Client *c = currentClient.get(); + if( c == 0 ) { Client::initThread("pretouchN"); c = &cc(); } @@ -413,7 +427,7 @@ namespace mongo { continue; /* todo : other operations */ - try { + try { BSONObj o = op.getObjectField(which); BSONElement _id; if( o.getObjectID(_id) ) { @@ -426,7 +440,7 @@ namespace mongo { _dummy_z += result.objsize(); // touch } } - catch( DBException& e ) { + catch( DBException& e ) { log() << "ignoring assertion in pretouchN() " << a << ' ' << b << ' ' << i << ' ' << e.toString() << endl; } } @@ -447,7 +461,7 @@ namespace mongo { return; /* todo : other operations */ - try { + try { BSONObj o = op.getObjectField(which); BSONElement _id; if( o.getObjectID(_id) ) { @@ -461,15 +475,17 @@ namespace mongo { _dummy_z += result.objsize(); // touch } } - catch( DBException& ) { + catch( DBException& ) { log() << "ignoring assertion in pretouchOperation()" << endl; } } - void applyOperation_inlock(const BSONObj& op){ - if( logLevel >= 6 ) + void applyOperation_inlock(const BSONObj& op , bool fromRepl ) { + OpCounters * opCounters = fromRepl ? &replOpCounters : &globalOpCounters; + + if( logLevel >= 6 ) log() << "applying op: " << op << endl; - + assertInWriteLock(); OpDebug debug; @@ -479,6 +495,8 @@ namespace mongo { const char *opType = op.getStringField("op"); if ( *opType == 'i' ) { + opCounters->gotInsert(); + const char *p = strchr(ns, '.'); if ( p && strcmp(p, ".system.indexes") == 0 ) { // updates aren't allowed for indexes -- so we will do a regular insert. if index already @@ -499,11 +517,11 @@ namespace mongo { else { BSONObjBuilder b; b.append(_id); - + /* erh 10/16/2009 - this is probably not relevant any more since its auto-created, but not worth removing */ - RARELY ensureHaveIdIndex(ns); // otherwise updates will be slow + RARELY ensureHaveIdIndex(ns); // otherwise updates will be slow - /* todo : it may be better to do an insert here, and then catch the dup key exception and do update + /* todo : it may be better to do an insert here, and then catch the dup key exception and do update then. very few upserts will not be inserts... */ updateObjects(ns, o, b.done(), true, false, false , debug ); @@ -511,10 +529,14 @@ namespace mongo { } } else if ( *opType == 'u' ) { + opCounters->gotUpdate(); + RARELY ensureHaveIdIndex(ns); // otherwise updates will be super slow updateObjects(ns, o, op.getObjectField("o2"), /*upsert*/ op.getBoolField("b"), /*multi*/ false, /*logop*/ false , debug ); } else if ( *opType == 'd' ) { + opCounters->gotDelete(); + if ( opType[1] == 0 ) deleteObjects(ns, o, op.getBoolField("b")); else @@ -523,7 +545,9 @@ namespace mongo { else if ( *opType == 'n' ) { // no op } - else if ( *opType == 'c' ){ + else if ( *opType == 'c' ) { + opCounters->gotCommand(); + BufBuilder bb; BSONObjBuilder ob; _runCommands(ns, o, bb, ob, true, 0); @@ -533,9 +557,9 @@ namespace mongo { ss << "unknown opType [" << opType << "]"; throw MsgAssertionException( 13141 , ss.str() ); } - + } - + class ApplyOpsCmd : public Command { public: virtual bool slaveOk() const { return false; } @@ -545,17 +569,18 @@ namespace mongo { help << "examples: { applyOps : [ ] , preCondition : [ { ns : ... , q : ... , res : ... } ] }"; } virtual bool run(const string& dbname, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) { - - if ( cmdObj.firstElement().type() != Array ){ + + if ( cmdObj.firstElement().type() != Array ) { errmsg = "ops has to be an array"; return false; } - + BSONObj ops = cmdObj.firstElement().Obj(); - - { // check input + + { + // check input BSONObjIterator i( ops ); - while ( i.more() ){ + while ( i.more() ) { BSONElement e = i.next(); if ( e.type() == Object ) continue; @@ -564,16 +589,16 @@ namespace mongo { return false; } } - - if ( cmdObj["preCondition"].type() == Array ){ + + if ( cmdObj["preCondition"].type() == Array ) { BSONObjIterator i( cmdObj["preCondition"].Obj() ); - while ( i.more() ){ + while ( i.more() ) { BSONObj f = i.next().Obj(); - + BSONObj realres = db.findOne( f["ns"].String() , f["q"].Obj() ); - + Matcher m( f["res"].Obj() ); - if ( ! m.matches( realres ) ){ + if ( ! m.matches( realres ) ) { result.append( "got" , realres ); result.append( "whatFailed" , f ); errmsg = "pre-condition failed"; @@ -581,23 +606,32 @@ namespace mongo { } } } - + // apply int num = 0; BSONObjIterator i( ops ); - while ( i.more() ){ + while ( i.more() ) { BSONElement e = i.next(); - applyOperation_inlock( e.Obj() ); + applyOperation_inlock( e.Obj() , false ); num++; } result.append( "applied" , num ); + if ( ! fromRepl ) { + // We want this applied atomically on slaves + // so we re-wrap without the pre-condition for speed + + string tempNS = str::stream() << dbname << ".$cmd"; + + logOp( "c" , tempNS.c_str() , cmdObj.firstElement().wrap() ); + } + return true; } DBDirectClient db; - + } applyOpsCmd; } |