diff options
Diffstat (limited to 'db/repl/rs_rollback.cpp')
-rw-r--r-- | db/repl/rs_rollback.cpp | 661 |
1 file changed, 333 insertions(+), 328 deletions(-)
diff --git a/db/repl/rs_rollback.cpp b/db/repl/rs_rollback.cpp index 6b2544c..0b4cc28 100644 --- a/db/repl/rs_rollback.cpp +++ b/db/repl/rs_rollback.cpp @@ -1,5 +1,5 @@ /* @file rs_rollback.cpp -* +* * Copyright (C) 2008 10gen Inc. * * This program is free software: you can redistribute it and/or modify @@ -25,7 +25,7 @@ /* Scenarios We went offline with ops not replicated out. - + F = node that failed and coming back. P = node that took over, new primary @@ -33,11 +33,11 @@ F : a b c d e f g P : a b c d q - The design is "keep P". One could argue here that "keep F" has some merits, however, in most cases P - will have significantly more data. Also note that P may have a proper subset of F's stream if there were + The design is "keep P". One could argue here that "keep F" has some merits, however, in most cases P + will have significantly more data. Also note that P may have a proper subset of F's stream if there were no subsequent writes. - For now the model is simply : get F back in sync with P. If P was really behind or something, we should have + For now the model is simply : get F back in sync with P. If P was really behind or something, we should have just chosen not to fail over anyway. #2: @@ -50,9 +50,9 @@ Steps find an event in common. 'd'. - undo our events beyond that by: + undo our events beyond that by: (1) taking copy from other server of those objects - (2) do not consider copy valid until we pass reach an optime after when we fetched the new version of object + (2) do not consider copy valid until we pass reach an optime after when we fetched the new version of object -- i.e., reset minvalid. (3) we could skip operations on objects that are previous in time to our capture of the object as an optimization. 
@@ -65,15 +65,15 @@ namespace mongo { bool copyCollectionFromRemote(const string& host, const string& ns, const BSONObj& query, string& errmsg, bool logforrepl); void incRBID(); - class rsfatal : public std::exception { + class rsfatal : public std::exception { public: - virtual const char* what() const throw(){ return "replica set fatal exception"; } + virtual const char* what() const throw() { return "replica set fatal exception"; } }; struct DocID { const char *ns; be _id; - bool operator<(const DocID& d) const { + bool operator<(const DocID& d) const { int c = strcmp(ns, d.ns); if( c < 0 ) return true; if( c > 0 ) return false; @@ -82,7 +82,7 @@ namespace mongo { }; struct HowToFixUp { - /* note this is a set -- if there are many $inc's on a single document we need to rollback, we only + /* note this is a set -- if there are many $inc's on a single document we need to rollback, we only need to refetch it once. */ set<DocID> toRefetch; @@ -97,9 +97,9 @@ namespace mongo { int rbid; // remote server's current rollback sequence # }; - static void refetch(HowToFixUp& h, const BSONObj& ourObj) { + static void refetch(HowToFixUp& h, const BSONObj& ourObj) { const char *op = ourObj.getStringField("op"); - if( *op == 'n' ) + if( *op == 'n' ) return; unsigned long long totSize = 0; @@ -108,53 +108,54 @@ namespace mongo { throw "rollback too large"; DocID d; + // NOTE The assigned ns value may become invalid if we yield. d.ns = ourObj.getStringField("ns"); - if( *d.ns == 0 ) { + if( *d.ns == 0 ) { log() << "replSet WARNING ignoring op on rollback no ns TODO : " << ourObj.toString() << rsLog; return; } bo o = ourObj.getObjectField(*op=='u' ? 
"o2" : "o"); - if( o.isEmpty() ) { + if( o.isEmpty() ) { log() << "replSet warning ignoring op on rollback : " << ourObj.toString() << rsLog; return; } - if( *op == 'c' ) { + if( *op == 'c' ) { be first = o.firstElement(); NamespaceString s(d.ns); // foo.$cmd string cmdname = first.fieldName(); Command *cmd = Command::findCommand(cmdname.c_str()); - if( cmd == 0 ) { + if( cmd == 0 ) { log() << "replSet warning rollback no suchcommand " << first.fieldName() << " - different mongod versions perhaps?" << rsLog; return; } else { /* findandmodify - tranlated? - godinsert?, + godinsert?, renamecollection a->b. just resync a & b */ if( cmdname == "create" ) { - /* Create collection operation - { ts: ..., h: ..., op: "c", ns: "foo.$cmd", o: { create: "abc", ... } }
+ /* Create collection operation + { ts: ..., h: ..., op: "c", ns: "foo.$cmd", o: { create: "abc", ... } } */ string ns = s.db + '.' + o["create"].String(); // -> foo.abc h.toDrop.insert(ns); return; } - else if( cmdname == "drop" ) { + else if( cmdname == "drop" ) { string ns = s.db + '.' + first.valuestr(); h.collectionsToResync.insert(ns); return; } - else if( cmdname == "dropIndexes" || cmdname == "deleteIndexes" ) { + else if( cmdname == "dropIndexes" || cmdname == "deleteIndexes" ) { /* TODO: this is bad. we simply full resync the collection here, which could be very slow. */ log() << "replSet info rollback of dropIndexes is slow in this version of mongod" << rsLog; string ns = s.db + '.' + first.valuestr(); h.collectionsToResync.insert(ns); return; } - else if( cmdname == "renameCollection" ) { + else if( cmdname == "renameCollection" ) { /* TODO: slow. */ log() << "replSet info rollback of renameCollection is slow in this version of mongod" << rsLog; string from = first.valuestr(); @@ -163,15 +164,15 @@ namespace mongo { h.collectionsToResync.insert(to); return; } - else if( cmdname == "reIndex" ) { + else if( cmdname == "reIndex" ) { return; } - else if( cmdname == "dropDatabase" ) { + else if( cmdname == "dropDatabase" ) { log() << "replSet error rollback : can't rollback drop database full resync will be required" << rsLog; log() << "replSet " << o.toString() << rsLog; throw rsfatal(); } - else { + else { log() << "replSet error can't rollback this command yet: " << o.toString() << rsLog; log() << "replSet cmdname=" << cmdname << rsLog; throw rsfatal(); @@ -190,15 +191,15 @@ namespace mongo { int getRBID(DBClientConnection*); - static void syncRollbackFindCommonPoint(DBClientConnection *them, HowToFixUp& h) { + static void syncRollbackFindCommonPoint(DBClientConnection *them, HowToFixUp& h) { static time_t last; - if( time(0)-last < 60 ) { + if( time(0)-last < 60 ) { throw "findcommonpoint waiting a while before trying again"; } last = time(0); assert( 
dbMutex.atLeastReadLocked() ); - Client::Context c(rsoplog, dbpath, 0, false); + Client::Context c(rsoplog); NamespaceDetails *nsd = nsdetails(rsoplog); assert(nsd); ReverseCappedCursor u(nsd); @@ -226,7 +227,7 @@ namespace mongo { log() << "replSet info rollback our last optime: " << ourTime.toStringPretty() << rsLog; log() << "replSet info rollback their last optime: " << theirTime.toStringPretty() << rsLog; log() << "replSet info rollback diff in end of log times: " << diff << " seconds" << rsLog; - if( diff > 3600 ) { + if( diff > 3600 ) { log() << "replSet rollback too long a time period for a rollback." << rsLog; throw "error not willing to roll back more than one hour of data"; } @@ -236,8 +237,8 @@ namespace mongo { while( 1 ) { scanned++; /* todo add code to assure no excessive scanning for too long */ - if( ourTime == theirTime ) { - if( ourObj["h"].Long() == theirObj["h"].Long() ) { + if( ourTime == theirTime ) { + if( ourObj["h"].Long() == theirObj["h"].Long() ) { // found the point back in time where we match. // todo : check a few more just to be careful about hash collisions. 
log() << "replSet rollback found matching events at " << ourTime.toStringPretty() << rsLog; @@ -249,7 +250,7 @@ namespace mongo { refetch(h, ourObj); - if( !t->more() ) { + if( !t->more() ) { log() << "replSet rollback error RS100 reached beginning of remote oplog" << rsLog; log() << "replSet them: " << them->toString() << " scanned: " << scanned << rsLog; log() << "replSet theirTime: " << theirTime.toStringLong() << rsLog; @@ -270,8 +271,8 @@ namespace mongo { ourObj = u.current(); ourTime = ourObj["ts"]._opTime(); } - else if( theirTime > ourTime ) { - if( !t->more() ) { + else if( theirTime > ourTime ) { + if( !t->more() ) { log() << "replSet rollback error RS100 reached beginning of remote oplog" << rsLog; log() << "replSet them: " << them->toString() << " scanned: " << scanned << rsLog; log() << "replSet theirTime: " << theirTime.toStringLong() << rsLog; @@ -281,11 +282,11 @@ namespace mongo { theirObj = t->nextSafe(); theirTime = theirObj["ts"]._opTime(); } - else { + else { // theirTime < ourTime refetch(h, ourObj); u.advance(); - if( !u.ok() ) { + if( !u.ok() ) { log() << "replSet rollback error RS101 reached beginning of local oplog" << rsLog; log() << "replSet them: " << them->toString() << " scanned: " << scanned << rsLog; log() << "replSet theirTime: " << theirTime.toStringLong() << rsLog; @@ -298,299 +299,303 @@ namespace mongo { } } - struct X { + struct X { const bson::bo *op; bson::bo goodVersionOfObject; }; - static void setMinValid(bo newMinValid) { - try { - log() << "replSet minvalid=" << newMinValid["ts"]._opTime().toStringLong() << rsLog; - } - catch(...) { } - { - Helpers::putSingleton("local.replset.minvalid", newMinValid); - Client::Context cx( "local." ); - cx.db()->flushFiles(true); - } + static void setMinValid(bo newMinValid) { + try { + log() << "replSet minvalid=" << newMinValid["ts"]._opTime().toStringLong() << rsLog; + } + catch(...) 
{ } + { + Helpers::putSingleton("local.replset.minvalid", newMinValid); + Client::Context cx( "local." ); + cx.db()->flushFiles(true); + } } void ReplSetImpl::syncFixUp(HowToFixUp& h, OplogReader& r) { - DBClientConnection *them = r.conn(); - - // fetch all first so we needn't handle interruption in a fancy way - - unsigned long long totSize = 0; - - list< pair<DocID,bo> > goodVersions; - - bo newMinValid; - - /* fetch all the goodVersions of each document from current primary */ - DocID d; - unsigned long long n = 0; - try { - for( set<DocID>::iterator i = h.toRefetch.begin(); i != h.toRefetch.end(); i++ ) { - d = *i; - - assert( !d._id.eoo() ); - - { - /* TODO : slow. lots of round trips. */ - n++; - bo good= them->findOne(d.ns, d._id.wrap()).getOwned(); - totSize += good.objsize(); - uassert( 13410, "replSet too much data to roll back", totSize < 300 * 1024 * 1024 ); - - // note good might be eoo, indicating we should delete it - goodVersions.push_back(pair<DocID,bo>(d,good)); - } - } - newMinValid = r.getLastOp(rsoplog); - if( newMinValid.isEmpty() ) { - sethbmsg("rollback error newMinValid empty?"); - return; - } - } - catch(DBException& e) { - sethbmsg(str::stream() << "rollback re-get objects: " << e.toString(),0); - log() << "rollback couldn't re-get ns:" << d.ns << " _id:" << d._id << ' ' << n << '/' << h.toRefetch.size() << rsLog; - throw e; - } - - MemoryMappedFile::flushAll(true); - - sethbmsg("rollback 3.5"); - if( h.rbid != getRBID(r.conn()) ) { - // our source rolled back itself. so the data we received isn't necessarily consistent. - sethbmsg("rollback rbid on source changed during rollback, cancelling this attempt"); - return; - } - - // update them - sethbmsg(str::stream() << "rollback 4 n:" << goodVersions.size()); - - bool warn = false; - - assert( !h.commonPointOurDiskloc.isNull() ); - - dbMutex.assertWriteLocked(); - - /* we have items we are writing that aren't from a point-in-time. 
thus best not to come online - until we get to that point in freshness. */ - setMinValid(newMinValid); - - /** any full collection resyncs required? */ - if( !h.collectionsToResync.empty() ) { - for( set<string>::iterator i = h.collectionsToResync.begin(); i != h.collectionsToResync.end(); i++ ) { - string ns = *i; - sethbmsg(str::stream() << "rollback 4.1 coll resync " << ns); - Client::Context c(*i, dbpath, 0, /*doauth*/false); - try { - bob res; - string errmsg; - dropCollection(ns, errmsg, res); - { - dbtemprelease r; - bool ok = copyCollectionFromRemote(them->getServerAddress(), ns, bo(), errmsg, false); - if( !ok ) { - log() << "replSet rollback error resyncing collection " << ns << ' ' << errmsg << rsLog; - throw "rollback error resyncing rollection [1]"; - } - } - } - catch(...) { - log() << "replset rollback error resyncing collection " << ns << rsLog; - throw "rollback error resyncing rollection [2]"; - } - } - - /* we did more reading from primary, so check it again for a rollback (which would mess us up), and - make minValid newer. - */ - sethbmsg("rollback 4.2"); - { - string err; - try { - newMinValid = r.getLastOp(rsoplog); - if( newMinValid.isEmpty() ) { - err = "can't get minvalid from primary"; - } else { - setMinValid(newMinValid); - } - } - catch(...) { - err = "can't get/set minvalid"; - } - if( h.rbid != getRBID(r.conn()) ) { - // our source rolled back itself. so the data we received isn't necessarily consistent. - // however, we've now done writes. thus we have a problem. - err += "rbid at primary changed during resync/rollback"; - } - if( !err.empty() ) { - log() << "replSet error rolling back : " << err << ". A full resync will be necessary." << rsLog; - /* todo: reset minvalid so that we are permanently in fatal state */ - /* todo: don't be fatal, but rather, get all the data first. 
*/ - sethbmsg("rollback error"); - throw rsfatal(); - } - } - sethbmsg("rollback 4.3"); - } - - sethbmsg("rollback 4.6"); - /** drop collections to drop before doing individual fixups - that might make things faster below actually if there were subsequent inserts to rollback */ - for( set<string>::iterator i = h.toDrop.begin(); i != h.toDrop.end(); i++ ) { - Client::Context c(*i, dbpath, 0, /*doauth*/false); - try { - bob res; - string errmsg; - log(1) << "replSet rollback drop: " << *i << rsLog; - dropCollection(*i, errmsg, res); - } - catch(...) { - log() << "replset rollback error dropping collection " << *i << rsLog; - } - } - - sethbmsg("rollback 4.7"); - Client::Context c(rsoplog, dbpath, 0, /*doauth*/false); - NamespaceDetails *oplogDetails = nsdetails(rsoplog); - uassert(13423, str::stream() << "replSet error in rollback can't find " << rsoplog, oplogDetails); - - map<string,shared_ptr<RemoveSaver> > removeSavers; - - unsigned deletes = 0, updates = 0; - for( list<pair<DocID,bo> >::iterator i = goodVersions.begin(); i != goodVersions.end(); i++ ) { - const DocID& d = i->first; - bo pattern = d._id.wrap(); // { _id : ... } - try { - assert( d.ns && *d.ns ); - if( h.collectionsToResync.count(d.ns) ) { - /* we just synced this entire collection */ - continue; - } - - /* keep an archive of items rolled back */ - shared_ptr<RemoveSaver>& rs = removeSavers[d.ns]; - if ( ! rs ) - rs.reset( new RemoveSaver( "rollback" , "" , d.ns ) ); - - // todo: lots of overhead in context, this can be faster - Client::Context c(d.ns, dbpath, 0, /*doauth*/false); - if( i->second.isEmpty() ) { - // wasn't on the primary; delete. - /* TODO1.6 : can't delete from a capped collection. need to handle that here. */ - deletes++; - - NamespaceDetails *nsd = nsdetails(d.ns); - if( nsd ) { - if( nsd->capped ) { - /* can't delete from a capped collection - so we truncate instead. if this item must go, - so must all successors!!! 
*/ - try { - /** todo: IIRC cappedTrunateAfter does not handle completely empty. todo. */ - // this will crazy slow if no _id index. - long long start = Listener::getElapsedTimeMillis(); - DiskLoc loc = Helpers::findOne(d.ns, pattern, false); - if( Listener::getElapsedTimeMillis() - start > 200 ) - log() << "replSet warning roll back slow no _id index for " << d.ns << " perhaps?" << rsLog; - //would be faster but requires index: DiskLoc loc = Helpers::findById(nsd, pattern); - if( !loc.isNull() ) { - try { - nsd->cappedTruncateAfter(d.ns, loc, true); - } - catch(DBException& e) { - if( e.getCode() == 13415 ) { - // hack: need to just make cappedTruncate do this... - nsd->emptyCappedCollection(d.ns); - } else { - throw; - } - } - } - } - catch(DBException& e) { - log() << "replSet error rolling back capped collection rec " << d.ns << ' ' << e.toString() << rsLog; - } - } - else { - try { - deletes++; - deleteObjects(d.ns, pattern, /*justone*/true, /*logop*/false, /*god*/true, rs.get() ); - } - catch(...) { - log() << "replSet error rollback delete failed ns:" << d.ns << rsLog; - } - } - // did we just empty the collection? if so let's check if it even exists on the source. - if( nsd->nrecords == 0 ) { - try { - string sys = cc().database()->name + ".system.namespaces"; - bo o = them->findOne(sys, QUERY("name"<<d.ns)); - if( o.isEmpty() ) { - // we should drop - try { - bob res; - string errmsg; - dropCollection(d.ns, errmsg, res); - } - catch(...) { - log() << "replset error rolling back collection " << d.ns << rsLog; - } - } - } - catch(DBException& ) { - /* this isn't *that* big a deal, but is bad. */ - log() << "replSet warning rollback error querying for existence of " << d.ns << " at the primary, ignoring" << rsLog; - } - } - } - } - else { - // todo faster... 
- OpDebug debug; - updates++; - _updateObjects(/*god*/true, d.ns, i->second, pattern, /*upsert=*/true, /*multi=*/false , /*logtheop=*/false , debug, rs.get() ); - } - } - catch(DBException& e) { - log() << "replSet exception in rollback ns:" << d.ns << ' ' << pattern.toString() << ' ' << e.toString() << " ndeletes:" << deletes << rsLog; - warn = true; - } - } - - removeSavers.clear(); // this effectively closes all of them - - sethbmsg(str::stream() << "rollback 5 d:" << deletes << " u:" << updates); - MemoryMappedFile::flushAll(true); - sethbmsg("rollback 6"); - - // clean up oplog - log(2) << "replSet rollback truncate oplog after " << h.commonPoint.toStringPretty() << rsLog; - // todo: fatal error if this throws? - oplogDetails->cappedTruncateAfter(rsoplog, h.commonPointOurDiskloc, false); - - /* reset cached lastoptimewritten and h value */ - loadLastOpTimeWritten(); - - sethbmsg("rollback 7"); - MemoryMappedFile::flushAll(true); - - // done - if( warn ) - sethbmsg("issues during syncRollback, see log"); - else - sethbmsg("rollback done"); - } - - void ReplSetImpl::syncRollback(OplogReader&r) { + DBClientConnection *them = r.conn(); + + // fetch all first so we needn't handle interruption in a fancy way + + unsigned long long totSize = 0; + + list< pair<DocID,bo> > goodVersions; + + bo newMinValid; + + /* fetch all the goodVersions of each document from current primary */ + DocID d; + unsigned long long n = 0; + try { + for( set<DocID>::iterator i = h.toRefetch.begin(); i != h.toRefetch.end(); i++ ) { + d = *i; + + assert( !d._id.eoo() ); + + { + /* TODO : slow. lots of round trips. 
*/ + n++; + bo good= them->findOne(d.ns, d._id.wrap()).getOwned(); + totSize += good.objsize(); + uassert( 13410, "replSet too much data to roll back", totSize < 300 * 1024 * 1024 ); + + // note good might be eoo, indicating we should delete it + goodVersions.push_back(pair<DocID,bo>(d,good)); + } + } + newMinValid = r.getLastOp(rsoplog); + if( newMinValid.isEmpty() ) { + sethbmsg("rollback error newMinValid empty?"); + return; + } + } + catch(DBException& e) { + sethbmsg(str::stream() << "rollback re-get objects: " << e.toString(),0); + log() << "rollback couldn't re-get ns:" << d.ns << " _id:" << d._id << ' ' << n << '/' << h.toRefetch.size() << rsLog; + throw e; + } + + MemoryMappedFile::flushAll(true); + + sethbmsg("rollback 3.5"); + if( h.rbid != getRBID(r.conn()) ) { + // our source rolled back itself. so the data we received isn't necessarily consistent. + sethbmsg("rollback rbid on source changed during rollback, cancelling this attempt"); + return; + } + + // update them + sethbmsg(str::stream() << "rollback 4 n:" << goodVersions.size()); + + bool warn = false; + + assert( !h.commonPointOurDiskloc.isNull() ); + + dbMutex.assertWriteLocked(); + + /* we have items we are writing that aren't from a point-in-time. thus best not to come online + until we get to that point in freshness. */ + setMinValid(newMinValid); + + /** any full collection resyncs required? 
*/ + if( !h.collectionsToResync.empty() ) { + for( set<string>::iterator i = h.collectionsToResync.begin(); i != h.collectionsToResync.end(); i++ ) { + string ns = *i; + sethbmsg(str::stream() << "rollback 4.1 coll resync " << ns); + Client::Context c(*i); + try { + bob res; + string errmsg; + dropCollection(ns, errmsg, res); + { + dbtemprelease r; + bool ok = copyCollectionFromRemote(them->getServerAddress(), ns, bo(), errmsg, false); + if( !ok ) { + log() << "replSet rollback error resyncing collection " << ns << ' ' << errmsg << rsLog; + throw "rollback error resyncing rollection [1]"; + } + } + } + catch(...) { + log() << "replset rollback error resyncing collection " << ns << rsLog; + throw "rollback error resyncing rollection [2]"; + } + } + + /* we did more reading from primary, so check it again for a rollback (which would mess us up), and + make minValid newer. + */ + sethbmsg("rollback 4.2"); + { + string err; + try { + newMinValid = r.getLastOp(rsoplog); + if( newMinValid.isEmpty() ) { + err = "can't get minvalid from primary"; + } + else { + setMinValid(newMinValid); + } + } + catch(...) { + err = "can't get/set minvalid"; + } + if( h.rbid != getRBID(r.conn()) ) { + // our source rolled back itself. so the data we received isn't necessarily consistent. + // however, we've now done writes. thus we have a problem. + err += "rbid at primary changed during resync/rollback"; + } + if( !err.empty() ) { + log() << "replSet error rolling back : " << err << ". A full resync will be necessary." << rsLog; + /* todo: reset minvalid so that we are permanently in fatal state */ + /* todo: don't be fatal, but rather, get all the data first. 
*/ + sethbmsg("rollback error"); + throw rsfatal(); + } + } + sethbmsg("rollback 4.3"); + } + + sethbmsg("rollback 4.6"); + /** drop collections to drop before doing individual fixups - that might make things faster below actually if there were subsequent inserts to rollback */ + for( set<string>::iterator i = h.toDrop.begin(); i != h.toDrop.end(); i++ ) { + Client::Context c(*i); + try { + bob res; + string errmsg; + log(1) << "replSet rollback drop: " << *i << rsLog; + dropCollection(*i, errmsg, res); + } + catch(...) { + log() << "replset rollback error dropping collection " << *i << rsLog; + } + } + + sethbmsg("rollback 4.7"); + Client::Context c(rsoplog); + NamespaceDetails *oplogDetails = nsdetails(rsoplog); + uassert(13423, str::stream() << "replSet error in rollback can't find " << rsoplog, oplogDetails); + + map<string,shared_ptr<RemoveSaver> > removeSavers; + + unsigned deletes = 0, updates = 0; + for( list<pair<DocID,bo> >::iterator i = goodVersions.begin(); i != goodVersions.end(); i++ ) { + const DocID& d = i->first; + bo pattern = d._id.wrap(); // { _id : ... } + try { + assert( d.ns && *d.ns ); + if( h.collectionsToResync.count(d.ns) ) { + /* we just synced this entire collection */ + continue; + } + + getDur().commitIfNeeded(); + + /* keep an archive of items rolled back */ + shared_ptr<RemoveSaver>& rs = removeSavers[d.ns]; + if ( ! rs ) + rs.reset( new RemoveSaver( "rollback" , "" , d.ns ) ); + + // todo: lots of overhead in context, this can be faster + Client::Context c(d.ns); + if( i->second.isEmpty() ) { + // wasn't on the primary; delete. + /* TODO1.6 : can't delete from a capped collection. need to handle that here. */ + deletes++; + + NamespaceDetails *nsd = nsdetails(d.ns); + if( nsd ) { + if( nsd->capped ) { + /* can't delete from a capped collection - so we truncate instead. if this item must go, + so must all successors!!! */ + try { + /** todo: IIRC cappedTrunateAfter does not handle completely empty. todo. 
*/ + // this will crazy slow if no _id index. + long long start = Listener::getElapsedTimeMillis(); + DiskLoc loc = Helpers::findOne(d.ns, pattern, false); + if( Listener::getElapsedTimeMillis() - start > 200 ) + log() << "replSet warning roll back slow no _id index for " << d.ns << " perhaps?" << rsLog; + //would be faster but requires index: DiskLoc loc = Helpers::findById(nsd, pattern); + if( !loc.isNull() ) { + try { + nsd->cappedTruncateAfter(d.ns, loc, true); + } + catch(DBException& e) { + if( e.getCode() == 13415 ) { + // hack: need to just make cappedTruncate do this... + nsd->emptyCappedCollection(d.ns); + } + else { + throw; + } + } + } + } + catch(DBException& e) { + log() << "replSet error rolling back capped collection rec " << d.ns << ' ' << e.toString() << rsLog; + } + } + else { + try { + deletes++; + deleteObjects(d.ns, pattern, /*justone*/true, /*logop*/false, /*god*/true, rs.get() ); + } + catch(...) { + log() << "replSet error rollback delete failed ns:" << d.ns << rsLog; + } + } + // did we just empty the collection? if so let's check if it even exists on the source. + if( nsd->stats.nrecords == 0 ) { + try { + string sys = cc().database()->name + ".system.namespaces"; + bo o = them->findOne(sys, QUERY("name"<<d.ns)); + if( o.isEmpty() ) { + // we should drop + try { + bob res; + string errmsg; + dropCollection(d.ns, errmsg, res); + } + catch(...) { + log() << "replset error rolling back collection " << d.ns << rsLog; + } + } + } + catch(DBException& ) { + /* this isn't *that* big a deal, but is bad. */ + log() << "replSet warning rollback error querying for existence of " << d.ns << " at the primary, ignoring" << rsLog; + } + } + } + } + else { + // todo faster... 
+ OpDebug debug; + updates++; + _updateObjects(/*god*/true, d.ns, i->second, pattern, /*upsert=*/true, /*multi=*/false , /*logtheop=*/false , debug, rs.get() ); + } + } + catch(DBException& e) { + log() << "replSet exception in rollback ns:" << d.ns << ' ' << pattern.toString() << ' ' << e.toString() << " ndeletes:" << deletes << rsLog; + warn = true; + } + } + + removeSavers.clear(); // this effectively closes all of them + + sethbmsg(str::stream() << "rollback 5 d:" << deletes << " u:" << updates); + MemoryMappedFile::flushAll(true); + sethbmsg("rollback 6"); + + // clean up oplog + log(2) << "replSet rollback truncate oplog after " << h.commonPoint.toStringPretty() << rsLog; + // todo: fatal error if this throws? + oplogDetails->cappedTruncateAfter(rsoplog, h.commonPointOurDiskloc, false); + + /* reset cached lastoptimewritten and h value */ + loadLastOpTimeWritten(); + + sethbmsg("rollback 7"); + MemoryMappedFile::flushAll(true); + + // done + if( warn ) + sethbmsg("issues during syncRollback, see log"); + else + sethbmsg("rollback done"); + } + + void ReplSetImpl::syncRollback(OplogReader&r) { unsigned s = _syncRollback(r); - if( s ) + if( s ) sleepsecs(s); } - unsigned ReplSetImpl::_syncRollback(OplogReader&r) { + unsigned ReplSetImpl::_syncRollback(OplogReader&r) { assert( !lockedByMe() ); assert( !dbMutex.atLeastReadLocked() ); @@ -604,7 +609,7 @@ namespace mongo { if( box.getState().secondary() ) { /* by doing this, we will not service reads (return an error as we aren't in secondary staate. - that perhaps is moot becasue of the write lock above, but that write lock probably gets deferred + that perhaps is moot becasue of the write lock above, but that write lock probably gets deferred or removed or yielded later anyway. also, this is better for status reporting - we know what is happening. 
@@ -618,7 +623,7 @@ namespace mongo { r.resetCursor(); /*DBClientConnection us(false, 0, 0); string errmsg; - if( !us.connect(HostAndPort::me().toString(),errmsg) ) { + if( !us.connect(HostAndPort::me().toString(),errmsg) ) { sethbmsg("rollback connect to self failure" + errmsg); return; }*/ @@ -627,15 +632,15 @@ namespace mongo { try { syncRollbackFindCommonPoint(r.conn(), how); } - catch( const char *p ) { + catch( const char *p ) { sethbmsg(string("rollback 2 error ") + p); return 10; } - catch( rsfatal& ) { + catch( rsfatal& ) { _fatal(); return 2; } - catch( DBException& e ) { + catch( DBException& e ) { sethbmsg(string("rollback 2 exception ") + e.toString() + "; sleeping 1 min"); dbtemprelease r; sleepsecs(60); @@ -647,20 +652,20 @@ namespace mongo { { incRBID(); - try { + try { syncFixUp(how, r); } - catch( rsfatal& ) { + catch( rsfatal& ) { sethbmsg("rollback fixup error"); _fatal(); return 2; } - catch(...) { + catch(...) { incRBID(); throw; } incRBID(); - /* success - leave "ROLLBACK" state + /* success - leave "ROLLBACK" state can go to SECONDARY once minvalid is achieved */ box.change(MemberState::RS_RECOVERING, _self); |