summaryrefslogtreecommitdiff
path: root/db/oplog.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'db/oplog.cpp')
-rw-r--r--db/oplog.cpp228
1 files changed, 131 insertions, 97 deletions
diff --git a/db/oplog.cpp b/db/oplog.cpp
index 93800c7..1557cbd 100644
--- a/db/oplog.cpp
+++ b/db/oplog.cpp
@@ -22,18 +22,19 @@
#include "repl.h"
#include "commands.h"
#include "repl/rs.h"
+#include "stats/counters.h"
namespace mongo {
void logOpForSharding( const char * opstr , const char * ns , const BSONObj& obj , BSONObj * patt );
- int __findingStartInitialTimeout = 5; // configurable for testing
+ int __findingStartInitialTimeout = 5; // configurable for testing
// cached copies of these...so don't rename them, drop them, etc.!!!
static NamespaceDetails *localOplogMainDetails = 0;
static Database *localDB = 0;
static NamespaceDetails *rsOplogDetails = 0;
- void oplogCheckCloseDatabase( Database * db ){
+ void oplogCheckCloseDatabase( Database * db ) {
localDB = 0;
localOplogMainDetails = 0;
rsOplogDetails = 0;
@@ -44,10 +45,10 @@ namespace mongo {
uassert(13288, "replSet error write op to db before replSet initialized", str::startsWith(ns, "local.") || *opstr == 'n');
}
- /** write an op to the oplog that is already built.
+ /** write an op to the oplog that is already built.
todo : make _logOpRS() call this so we don't repeat ourself?
*/
- void _logOpObjRS(const BSONObj& op) {
+ void _logOpObjRS(const BSONObj& op) {
DEV assertInWriteLock();
const OpTime ts = op["ts"]._opTime();
@@ -62,11 +63,11 @@ namespace mongo {
rsOplogDetails = nsdetails(logns);
massert(13389, "local.oplog.rs missing. did you drop it? if so restart server", rsOplogDetails);
}
- Client::Context ctx( "" , localDB, false );
+ Client::Context ctx( logns , localDB, false );
{
int len = op.objsize();
Record *r = theDataFileMgr.fast_oplog_insert(rsOplogDetails, logns, len);
- memcpy(r->data, op.objdata(), len);
+ memcpy(getDur().writingPtr(r->data, len), op.objdata(), len);
}
/* todo: now() has code to handle clock skew. but if the skew server to server is large it will get unhappy.
this code (or code in now() maybe) should be improved.
@@ -82,11 +83,42 @@ namespace mongo {
}
}
+ /** given a BSON object, create a new one at dst which is the existing (partial) object
+ with a new object element appended at the end with fieldname "o".
+
+ @param partial already build object with everything except the o member. e.g. something like:
+ { ts:..., ns:..., os2:... }
+ @param o a bson object to be added with fieldname "o"
+ @dst where to put the newly built combined object. e.g. ends up as something like:
+ { ts:..., ns:..., os2:..., o:... }
+ */
+ void append_O_Obj(char *dst, const BSONObj& partial, const BSONObj& o) {
+ const int size1 = partial.objsize() - 1; // less the EOO char
+ const int oOfs = size1+3; // 3 = byte BSONOBJTYPE + byte 'o' + byte \0
+
+ void *p = getDur().writingPtr(dst, oOfs+o.objsize()+1);
+
+ memcpy(p, partial.objdata(), size1);
+
+ // adjust overall bson object size for the o: field
+ *(static_cast<unsigned*>(p)) += o.objsize() + 1/*fieldtype byte*/ + 2/*"o" fieldname*/;
+
+ char *b = static_cast<char *>(p);
+ b += size1;
+ *b++ = (char) Object;
+ *b++ = 'o'; // { o : ... }
+ *b++ = 0; // null terminate "o" fieldname
+ memcpy(b, o.objdata(), o.objsize());
+ b += o.objsize();
+ *b = EOO;
+ }
+
static void _logOpRS(const char *opstr, const char *ns, const char *logNS, const BSONObj& obj, BSONObj *o2, bool *bb ) {
DEV assertInWriteLock();
+ // ^- static is safe as we are in write lock
static BufBuilder bufbuilder(8*1024);
-
- if ( strncmp(ns, "local.", 6) == 0 ){
+
+ if ( strncmp(ns, "local.", 6) == 0 ) {
if ( strncmp(ns, "local.slaves", 12) == 0 )
resetSlaveCache();
return;
@@ -94,15 +126,15 @@ namespace mongo {
const OpTime ts = OpTime::now();
- long long hNew;
- if( theReplSet ) {
+ long long hashNew;
+ if( theReplSet ) {
massert(13312, "replSet error : logOp() but not primary?", theReplSet->box.getState().primary());
- hNew = (theReplSet->lastH * 131 + ts.asLL()) * 17 + theReplSet->selfId();
+ hashNew = (theReplSet->lastH * 131 + ts.asLL()) * 17 + theReplSet->selfId();
}
else {
// must be initiation
assert( *ns == 0 );
- hNew = 0;
+ hashNew = 0;
}
/* we jump through a bunch of hoops here to avoid copying the obj buffer twice --
@@ -113,7 +145,7 @@ namespace mongo {
BSONObjBuilder b(bufbuilder);
b.appendTimestamp("ts", ts.asDate());
- b.append("h", hNew);
+ b.append("h", hashNew);
b.append("op", opstr);
b.append("ns", ns);
@@ -136,7 +168,7 @@ namespace mongo {
rsOplogDetails = nsdetails(logns);
massert(13347, "local.oplog.rs missing. did you drop it? if so restart server", rsOplogDetails);
}
- Client::Context ctx( "" , localDB, false );
+ Client::Context ctx( logns , localDB, false );
r = theDataFileMgr.fast_oplog_insert(rsOplogDetails, logns, len);
/* todo: now() has code to handle clock skew. but if the skew server to server is large it will get unhappy.
this code (or code in now() maybe) should be improved.
@@ -147,22 +179,13 @@ namespace mongo {
log() << "replSet " << theReplSet->isPrimary() << rsLog;
}
theReplSet->lastOpTimeWritten = ts;
- theReplSet->lastH = hNew;
+ theReplSet->lastH = hashNew;
ctx.getClient()->setLastOp( ts.asDate() );
}
}
- char *p = r->data;
- memcpy(p, partial.objdata(), posz);
- *((unsigned *)p) += obj.objsize() + 1 + 2;
- p += posz - 1;
- *p++ = (char) Object;
- *p++ = 'o';
- *p++ = 0;
- memcpy(p, obj.objdata(), obj.objsize());
- p += obj.objsize();
- *p = EOO;
-
+ append_O_Obj(r->data, partial, obj);
+
if ( logLevel >= 6 ) {
BSONObj temp(r);
log( 6 ) << "logOp:" << temp << endl;
@@ -192,9 +215,9 @@ namespace mongo {
static void _logOpOld(const char *opstr, const char *ns, const char *logNS, const BSONObj& obj, BSONObj *o2, bool *bb ) {
DEV assertInWriteLock();
static BufBuilder bufbuilder(8*1024);
-
- if ( strncmp(ns, "local.", 6) == 0 ){
- if ( strncmp(ns, "local.slaves", 12) == 0 ){
+
+ if ( strncmp(ns, "local.", 6) == 0 ) {
+ if ( strncmp(ns, "local.slaves", 12) == 0 ) {
resetSlaveCache();
}
return;
@@ -202,7 +225,7 @@ namespace mongo {
const OpTime ts = OpTime::now();
Client::Context context;
-
+
/* we jump through a bunch of hoops here to avoid copying the obj buffer twice --
instead we do a single copy to the destination position in the memory mapped file.
*/
@@ -216,9 +239,10 @@ namespace mongo {
b.appendBool("b", *bb);
if ( o2 )
b.append("o2", *o2);
- BSONObj partial = b.done();
- int posz = partial.objsize();
- int len = posz + obj.objsize() + 1 + 2 /*o:*/;
+ BSONObj partial = b.done(); // partial is everything except the o:... part.
+
+ int po_sz = partial.objsize();
+ int len = po_sz + obj.objsize() + 1 + 2 /*o:*/;
Record *r;
if( logNS == 0 ) {
@@ -230,25 +254,18 @@ namespace mongo {
localOplogMainDetails = nsdetails(logNS);
assert( localOplogMainDetails );
}
- Client::Context ctx( "" , localDB, false );
+ Client::Context ctx( logNS , localDB, false );
r = theDataFileMgr.fast_oplog_insert(localOplogMainDetails, logNS, len);
- } else {
+ }
+ else {
Client::Context ctx( logNS, dbpath, 0, false );
assert( nsdetails( logNS ) );
+ // first we allocate the space, then we fill it below.
r = theDataFileMgr.fast_oplog_insert( nsdetails( logNS ), logNS, len);
}
- char *p = r->data;
- memcpy(p, partial.objdata(), posz);
- *((unsigned *)p) += obj.objsize() + 1 + 2;
- p += posz - 1;
- *p++ = (char) Object;
- *p++ = 'o';
- *p++ = 0;
- memcpy(p, obj.objdata(), obj.objsize());
- p += obj.objsize();
- *p = EOO;
-
+ append_O_Obj(r->data, partial, obj);
+
context.getClient()->setLastOp( ts.asDate() );
if ( logLevel >= 6 ) {
@@ -259,17 +276,17 @@ namespace mongo {
}
static void (*_logOp)(const char *opstr, const char *ns, const char *logNS, const BSONObj& obj, BSONObj *o2, bool *bb ) = _logOpOld;
- void newReplUp() {
+ void newReplUp() {
replSettings.master = true;
- _logOp = _logOpRS;
+ _logOp = _logOpRS;
}
- void newRepl() {
+ void newRepl() {
replSettings.master = true;
- _logOp = _logOpUninitialized;
+ _logOp = _logOpUninitialized;
}
void oldRepl() { _logOp = _logOpOld; }
- void logKeepalive() {
+ void logKeepalive() {
_logOp("n", "", 0, BSONObj(), 0, 0);
}
void logOpComment(const BSONObj& obj) {
@@ -289,13 +306,10 @@ namespace mongo {
void logOp(const char *opstr, const char *ns, const BSONObj& obj, BSONObj *patt, bool *b) {
if ( replSettings.master ) {
_logOp(opstr, ns, 0, obj, patt, b);
- // why? :
- //char cl[ 256 ];
- //nsToDatabase( ns, cl );
}
-
+
logOpForSharding( opstr , ns , obj , patt );
- }
+ }
void createOplog() {
dblock lk;
@@ -307,15 +321,15 @@ namespace mongo {
ns = rsoplog;
Client::Context ctx(ns);
-
+
NamespaceDetails * nsd = nsdetails( ns );
if ( nsd ) {
-
- if ( cmdLine.oplogSize != 0 ){
+
+ if ( cmdLine.oplogSize != 0 ) {
int o = (int)(nsd->storageSize() / ( 1024 * 1024 ) );
int n = (int)(cmdLine.oplogSize / ( 1024 * 1024 ) );
- if ( n != o ){
+ if ( n != o ) {
stringstream ss;
ss << "cmdline oplogsize (" << n << ") different than existing (" << o << ") see: http://dochub.mongodb.org/core/increase-oplog";
log() << ss.str() << endl;
@@ -332,19 +346,19 @@ namespace mongo {
}
return;
}
-
+
/* create an oplog collection, if it doesn't yet exist. */
BSONObjBuilder b;
double sz;
if ( cmdLine.oplogSize != 0 )
sz = (double)cmdLine.oplogSize;
else {
- /* not specified. pick a default size */
+ /* not specified. pick a default size */
sz = 50.0 * 1000 * 1000;
if ( sizeof(int *) >= 8 ) {
#if defined(__APPLE__)
- // typically these are desktops (dev machines), so keep it smallish
- sz = (256-64) * 1000 * 1000;
+ // typically these are desktops (dev machines), so keep it smallish
+ sz = (256-64) * 1000 * 1000;
#else
sz = 990.0 * 1000 * 1000;
boost::intmax_t free = freeSpace(); //-1 if call not supported.
@@ -356,7 +370,7 @@ namespace mongo {
}
log() << "******" << endl;
- log() << "creating replication oplog of size: " << (int)( sz / ( 1024 * 1024 ) ) << "MB... (use --oplogSize to change)" << endl;
+ log() << "creating replication oplog of size: " << (int)( sz / ( 1024 * 1024 ) ) << "MB..." << endl;
b.append("size", sz);
b.appendBool("capped", 1);
@@ -366,7 +380,7 @@ namespace mongo {
BSONObj o = b.done();
userCreateNS(ns, o, err, false);
if( !rs )
- logOp( "n", "dummy", BSONObj() );
+ logOp( "n", "", BSONObj() );
/* sync here so we don't get any surprising lag later when we try to sync */
MemoryMappedFile::flushAll(true);
@@ -394,8 +408,8 @@ namespace mongo {
void pretouchN(vector<BSONObj>& v, unsigned a, unsigned b) {
DEV assert( !dbMutex.isWriteLocked() );
- Client *c = &cc();
- if( c == 0 ) {
+ Client *c = currentClient.get();
+ if( c == 0 ) {
Client::initThread("pretouchN");
c = &cc();
}
@@ -413,7 +427,7 @@ namespace mongo {
continue;
/* todo : other operations */
- try {
+ try {
BSONObj o = op.getObjectField(which);
BSONElement _id;
if( o.getObjectID(_id) ) {
@@ -426,7 +440,7 @@ namespace mongo {
_dummy_z += result.objsize(); // touch
}
}
- catch( DBException& e ) {
+ catch( DBException& e ) {
log() << "ignoring assertion in pretouchN() " << a << ' ' << b << ' ' << i << ' ' << e.toString() << endl;
}
}
@@ -447,7 +461,7 @@ namespace mongo {
return;
/* todo : other operations */
- try {
+ try {
BSONObj o = op.getObjectField(which);
BSONElement _id;
if( o.getObjectID(_id) ) {
@@ -461,15 +475,17 @@ namespace mongo {
_dummy_z += result.objsize(); // touch
}
}
- catch( DBException& ) {
+ catch( DBException& ) {
log() << "ignoring assertion in pretouchOperation()" << endl;
}
}
- void applyOperation_inlock(const BSONObj& op){
- if( logLevel >= 6 )
+ void applyOperation_inlock(const BSONObj& op , bool fromRepl ) {
+ OpCounters * opCounters = fromRepl ? &replOpCounters : &globalOpCounters;
+
+ if( logLevel >= 6 )
log() << "applying op: " << op << endl;
-
+
assertInWriteLock();
OpDebug debug;
@@ -479,6 +495,8 @@ namespace mongo {
const char *opType = op.getStringField("op");
if ( *opType == 'i' ) {
+ opCounters->gotInsert();
+
const char *p = strchr(ns, '.');
if ( p && strcmp(p, ".system.indexes") == 0 ) {
// updates aren't allowed for indexes -- so we will do a regular insert. if index already
@@ -499,11 +517,11 @@ namespace mongo {
else {
BSONObjBuilder b;
b.append(_id);
-
+
/* erh 10/16/2009 - this is probably not relevant any more since its auto-created, but not worth removing */
- RARELY ensureHaveIdIndex(ns); // otherwise updates will be slow
+ RARELY ensureHaveIdIndex(ns); // otherwise updates will be slow
- /* todo : it may be better to do an insert here, and then catch the dup key exception and do update
+ /* todo : it may be better to do an insert here, and then catch the dup key exception and do update
then. very few upserts will not be inserts...
*/
updateObjects(ns, o, b.done(), true, false, false , debug );
@@ -511,10 +529,14 @@ namespace mongo {
}
}
else if ( *opType == 'u' ) {
+ opCounters->gotUpdate();
+
RARELY ensureHaveIdIndex(ns); // otherwise updates will be super slow
updateObjects(ns, o, op.getObjectField("o2"), /*upsert*/ op.getBoolField("b"), /*multi*/ false, /*logop*/ false , debug );
}
else if ( *opType == 'd' ) {
+ opCounters->gotDelete();
+
if ( opType[1] == 0 )
deleteObjects(ns, o, op.getBoolField("b"));
else
@@ -523,7 +545,9 @@ namespace mongo {
else if ( *opType == 'n' ) {
// no op
}
- else if ( *opType == 'c' ){
+ else if ( *opType == 'c' ) {
+ opCounters->gotCommand();
+
BufBuilder bb;
BSONObjBuilder ob;
_runCommands(ns, o, bb, ob, true, 0);
@@ -533,9 +557,9 @@ namespace mongo {
ss << "unknown opType [" << opType << "]";
throw MsgAssertionException( 13141 , ss.str() );
}
-
+
}
-
+
class ApplyOpsCmd : public Command {
public:
virtual bool slaveOk() const { return false; }
@@ -545,17 +569,18 @@ namespace mongo {
help << "examples: { applyOps : [ ] , preCondition : [ { ns : ... , q : ... , res : ... } ] }";
}
virtual bool run(const string& dbname, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) {
-
- if ( cmdObj.firstElement().type() != Array ){
+
+ if ( cmdObj.firstElement().type() != Array ) {
errmsg = "ops has to be an array";
return false;
}
-
+
BSONObj ops = cmdObj.firstElement().Obj();
-
- { // check input
+
+ {
+ // check input
BSONObjIterator i( ops );
- while ( i.more() ){
+ while ( i.more() ) {
BSONElement e = i.next();
if ( e.type() == Object )
continue;
@@ -564,16 +589,16 @@ namespace mongo {
return false;
}
}
-
- if ( cmdObj["preCondition"].type() == Array ){
+
+ if ( cmdObj["preCondition"].type() == Array ) {
BSONObjIterator i( cmdObj["preCondition"].Obj() );
- while ( i.more() ){
+ while ( i.more() ) {
BSONObj f = i.next().Obj();
-
+
BSONObj realres = db.findOne( f["ns"].String() , f["q"].Obj() );
-
+
Matcher m( f["res"].Obj() );
- if ( ! m.matches( realres ) ){
+ if ( ! m.matches( realres ) ) {
result.append( "got" , realres );
result.append( "whatFailed" , f );
errmsg = "pre-condition failed";
@@ -581,23 +606,32 @@ namespace mongo {
}
}
}
-
+
// apply
int num = 0;
BSONObjIterator i( ops );
- while ( i.more() ){
+ while ( i.more() ) {
BSONElement e = i.next();
- applyOperation_inlock( e.Obj() );
+ applyOperation_inlock( e.Obj() , false );
num++;
}
result.append( "applied" , num );
+ if ( ! fromRepl ) {
+ // We want this applied atomically on slaves
+ // so we re-wrap without the pre-condition for speed
+
+ string tempNS = str::stream() << dbname << ".$cmd";
+
+ logOp( "c" , tempNS.c_str() , cmdObj.firstElement().wrap() );
+ }
+
return true;
}
DBDirectClient db;
-
+
} applyOpsCmd;
}