Diffstat (limited to 'db/dur_recover.cpp')
-rw-r--r-- | db/dur_recover.cpp | 186 |
1 file changed, 132 insertions(+), 54 deletions(-)
diff --git a/db/dur_recover.cpp b/db/dur_recover.cpp
index 1480a59..3c9fee7 100644
--- a/db/dur_recover.cpp
+++ b/db/dur_recover.cpp
@@ -19,6 +19,7 @@
 #include "pch.h"
 
 #include "dur.h"
+#include "dur_stats.h"
 #include "dur_recover.h"
 #include "dur_journal.h"
 #include "dur_journalformat.h"
@@ -26,13 +27,16 @@
 #include "namespace.h"
 #include "../util/mongoutils/str.h"
 #include "../util/bufreader.h"
+#include "../util/concurrency/race.h"
 #include "pdfile.h"
 #include "database.h"
 #include "db.h"
 #include "../util/unittest.h"
+#include "../util/checksum.h"
 #include "cmdline.h"
 #include "curop.h"
 #include "mongommf.h"
+#include "../util/compress.h"
 #include <sys/stat.h>
 #include <fcntl.h>
 
@@ -90,62 +94,73 @@ namespace mongo {
            throws
         */
     class JournalSectionIterator : boost::noncopyable {
+        auto_ptr<BufReader> _entries;
+        const JSectHeader _h;
+        const char *_lastDbName; // pointer into mmaped journal file
+        const bool _doDurOps;
+        string _uncompressed;
     public:
-        JournalSectionIterator(const void *p, unsigned len, bool doDurOps)
-            : _br(p, len)
-            , _sectHead(static_cast<const JSectHeader*>(_br.skip(sizeof(JSectHeader))))
-            , _lastDbName(NULL)
-            , _doDurOps(doDurOps)
-        {}
+        JournalSectionIterator(const JSectHeader& h, const void *compressed, unsigned compressedLen, bool doDurOpsRecovering) :
+            _h(h),
+            _lastDbName(0)
+            , _doDurOps(doDurOpsRecovering)
+        {
+            assert( doDurOpsRecovering );
+            bool ok = uncompress((const char *)compressed, compressedLen, &_uncompressed);
+            if( !ok ) {
+                // it should always be ok (i think?) as there is a previous check to see that the JSectFooter is ok
+                log() << "couldn't uncompress journal section" << endl;
+                msgasserted(15874, "couldn't uncompress journal section");
+            }
+            const char *p = _uncompressed.c_str();
+            assert( compressedLen == _h.sectionLen() - sizeof(JSectFooter) - sizeof(JSectHeader) );
+            _entries = auto_ptr<BufReader>( new BufReader(p, _uncompressed.size()) );
+        }
+
+        // we work with the uncompressed buffer when doing a WRITETODATAFILES (for speed)
+        JournalSectionIterator(const JSectHeader &h, const void *p, unsigned len) :
+            _entries( new BufReader((const char *) p, len) ),
+            _h(h),
+            _lastDbName(0)
+            , _doDurOps(false)
 
-        bool atEof() const { return _br.atEof(); }
+        { }
 
-        unsigned long long seqNumber() const { return _sectHead->seqNumber; }
+        bool atEof() const { return _entries->atEof(); }
+
+        unsigned long long seqNumber() const { return _h.seqNumber; }
 
         /** get the next entry from the log.  this function parses and combines JDbContext and JEntry's.
-         *  @return true if got an entry.  false at successful end of section (and no entry returned).
          *  throws on premature end of section.
          */
-        bool next(ParsedJournalEntry& e) {
+        void next(ParsedJournalEntry& e) {
             unsigned lenOrOpCode;
-            _br.read(lenOrOpCode);
+            _entries->read(lenOrOpCode);
 
             if (lenOrOpCode > JEntry::OpCode_Min) {
                 switch( lenOrOpCode ) {
 
                 case JEntry::OpCode_Footer: {
-                    if (_doDurOps) {
-                        const char* pos = (const char*) _br.pos();
-                        pos -= sizeof(lenOrOpCode); // rewind to include OpCode
-                        const JSectFooter& footer = *(const JSectFooter*)pos;
-                        int len = pos - (char*)_sectHead;
-                        if (!footer.checkHash(_sectHead, len)) {
-                            massert(13594, str::stream() << "Journal checksum doesn't match. recorded: "
-                                    << toHex(footer.hash, sizeof(footer.hash))
-                                    << " actual: " << md5simpledigest(_sectHead, len)
-                                    , false);
-                        }
-                    }
-                    return false; // false return value denotes end of section
+                    assert( false );
                 }
 
                 case JEntry::OpCode_FileCreated:
                 case JEntry::OpCode_DropDb: {
                     e.dbName = 0;
-                    boost::shared_ptr<DurOp> op = DurOp::read(lenOrOpCode, _br);
+                    boost::shared_ptr<DurOp> op = DurOp::read(lenOrOpCode, *_entries);
                     if (_doDurOps) {
                         e.op = op;
                     }
-                    return true;
+                    return;
                 }
 
                 case JEntry::OpCode_DbContext: {
-                    _lastDbName = (const char*) _br.pos();
-                    const unsigned limit = std::min((unsigned)Namespace::MaxNsLen, _br.remaining());
+                    _lastDbName = (const char*) _entries->pos();
+                    const unsigned limit = std::min((unsigned)Namespace::MaxNsLen, _entries->remaining());
                     const unsigned len = strnlen(_lastDbName, limit);
                     massert(13533, "problem processing journal file during recovery", _lastDbName[len] == '\0');
-                    _br.skip(len+1); // skip '\0' too
-                    _br.read(lenOrOpCode);
+                    _entries->skip(len+1); // skip '\0' too
+                    _entries->read(lenOrOpCode); // read this for the fall through
                 }
                 // fall through as a basic operation always follows jdbcontext, and we don't have anything to return yet
@@ -157,18 +172,13 @@ namespace mongo {
             // JEntry - a basic write
             assert( lenOrOpCode && lenOrOpCode < JEntry::OpCode_Min );
-            _br.rewind(4);
-            e.e = (JEntry *) _br.skip(sizeof(JEntry));
+            _entries->rewind(4);
+            e.e = (JEntry *) _entries->skip(sizeof(JEntry));
             e.dbName = e.e->isLocalDbContext() ? "local" : _lastDbName;
             assert( e.e->len == lenOrOpCode );
-            _br.skip(e.e->len);
-            return true;
+            _entries->skip(e.e->len);
         }
 
-    private:
-        BufReader _br;
-        const JSectHeader* _sectHead;
-        const char *_lastDbName; // pointer into mmaped journal file
-        const bool _doDurOps;
+
     };
 
     static string fileName(const char* dbName, int fileNo) {
@@ -204,6 +214,11 @@
     }
 
     void RecoveryJob::write(const ParsedJournalEntry& entry) {
+        //TODO(mathias): look into making some of these dasserts
+        assert(entry.e);
+        assert(entry.dbName);
+        assert(strnlen(entry.dbName, MaxDatabaseNameLen) < MaxDatabaseNameLen);
+
         const string fn = fileName(entry.dbName, entry.e->getFileNo());
         MongoFile* file;
         {
@@ -225,8 +240,12 @@
         }
 
         if ((entry.e->ofs + entry.e->len) <= mmf->length()) {
+            assert(mmf->view_write());
+            assert(entry.e->srcData());
+
             void* dest = (char*)mmf->view_write() + entry.e->ofs;
             memcpy(dest, entry.e->srcData(), entry.e->len);
+            stats.curr->_writeToDataFilesBytes += entry.e->len;
         }
         else {
             massert(13622, "Trying to write past end of file in WRITETODATAFILES", _recovering);
@@ -278,27 +297,64 @@
             log() << "END section" << endl;
     }
 
-    void RecoveryJob::processSection(const void *p, unsigned len) {
+    void RecoveryJob::processSection(const JSectHeader *h, const void *p, unsigned len, const JSectFooter *f) {
         scoped_lock lk(_mx);
+        RACECHECK
+
+        /** todo: we should really verify the checksum to see that seqNumber is ok?
+                  that is expensive maybe there is some sort of checksum of just the header
+                  within the header itself
+        */
+        if( _recovering && _lastDataSyncedFromLastRun > h->seqNumber + ExtraKeepTimeMs ) {
+            if( h->seqNumber != _lastSeqMentionedInConsoleLog ) {
+                static int n;
+                if( ++n < 10 ) {
+                    log() << "recover skipping application of section seq:" << h->seqNumber << " < lsn:" << _lastDataSyncedFromLastRun << endl;
+                }
+                else if( n == 10 ) {
+                    log() << "recover skipping application of section more..." << endl;
+                }
+                _lastSeqMentionedInConsoleLog = h->seqNumber;
+            }
+            return;
+        }
 
-        vector<ParsedJournalEntry> entries;
-        JournalSectionIterator i(p, len, _recovering);
+        auto_ptr<JournalSectionIterator> i;
+        if( _recovering ) {
+            i = auto_ptr<JournalSectionIterator>(new JournalSectionIterator(*h, p, len, _recovering));
+        }
+        else {
+            i = auto_ptr<JournalSectionIterator>(new JournalSectionIterator(*h, /*after header*/p, /*w/out header*/len));
+        }
 
-        //DEV log() << "recovery processSection seq:" << i.seqNumber() << endl;
-        if( _recovering && _lastDataSyncedFromLastRun > i.seqNumber() + ExtraKeepTimeMs ) {
-            if( i.seqNumber() != _lastSeqMentionedInConsoleLog ) {
-                log() << "recover skipping application of section seq:" << i.seqNumber() << " < lsn:" << _lastDataSyncedFromLastRun << endl;
-                _lastSeqMentionedInConsoleLog = i.seqNumber();
+        // we use a static so that we don't have to reallocate every time through. occasionally we
+        // go back to a small allocation so that if there were a spiky growth it won't stick forever.
+        static vector<ParsedJournalEntry> entries;
+        entries.clear();
+/** TEMP uncomment
+        RARELY OCCASIONALLY {
+            if( entries.capacity() > 2048 ) {
+                entries.shrink_to_fit();
+                entries.reserve(2048);
             }
-            return;
         }
+*/
 
         // first read all entries to make sure this section is valid
         ParsedJournalEntry e;
-        while( i.next(e) ) {
+        while( !i->atEof() ) {
+            i->next(e);
             entries.push_back(e);
         }
 
+        // after the entries check the footer checksum
+        if( _recovering ) {
+            assert( ((const char *)h) + sizeof(JSectHeader) == p );
+            if( !f->checkHash(h, len + sizeof(JSectHeader)) ) {
+                msgasserted(13594, "journal checksum doesn't match");
+            }
+        }
+
         // got all the entries for one group commit. apply them:
         applyEntries(entries);
     }
@@ -334,11 +390,16 @@
                 if( h.fileId != fileId ) {
                     if( debug || (cmdLine.durOptions & CmdLine::DurDumpJournal) ) {
                         log() << "Ending processFileBuffer at differing fileId want:" << fileId << " got:" << h.fileId << endl;
-                        log() << " sect len:" << h.len << " seqnum:" << h.seqNumber << endl;
+                        log() << " sect len:" << h.sectionLen() << " seqnum:" << h.seqNumber << endl;
                     }
                     return true;
                 }
-                processSection(br.skip(h.len), h.len);
+                unsigned slen = h.sectionLen();
+                unsigned dataLen = slen - sizeof(JSectHeader) - sizeof(JSectFooter);
+                const char *hdr = (const char *) br.skip(h.sectionLenWithPadding());
+                const char *data = hdr + sizeof(JSectHeader);
+                const char *footer = data + dataLen;
+                processSection((const JSectHeader*) hdr, data, dataLen, (const JSectFooter*) footer);
 
                 // ctrl c check
                 killCurrentOp.checkForInterrupt(false);
@@ -356,6 +417,17 @@
         /** apply a specific journal file */
         bool RecoveryJob::processFile(path journalfile) {
             log() << "recover " << journalfile.string() << endl;
+
+            try {
+                if( boost::filesystem::file_size( journalfile.string() ) == 0 ) {
+                    log() << "recover info " << journalfile.string() << " has zero length" << endl;
+                    return true;
+                }
+            } catch(...) {
+                // if something weird like a permissions problem keep going so the massert down below can happen (presumably)
+                log() << "recover exception checking filesize" << endl;
+            }
+
             MemoryMappedFile f;
             void *p = f.mapWithOptions(journalfile.string().c_str(), MongoFile::READONLY | MongoFile::SEQUENTIAL);
             massert(13544, str::stream() << "recover error couldn't open " << journalfile.string(), p);
@@ -371,13 +443,19 @@
             _lastDataSyncedFromLastRun = journalReadLSN();
             log() << "recover lsn: " << _lastDataSyncedFromLastRun << endl;
 
+            // todo: we could truncate the journal file at rotation time to the right length, then this abruptEnd
+            // check can be turned back on. this is relevant when prealloc is being used.
             for( unsigned i = 0; i != files.size(); ++i ) {
-                /*bool abruptEnd = */processFile(files[i]);
-                /*if( abruptEnd && i+1 < files.size() ) {
+                bool abruptEnd = processFile(files[i]);
+                if( abruptEnd && i+1 < files.size() ) {
+#if 1 // Leaving this as a warning for now. TODO: make this an error post 2.0
+                    log() << "recover warning: abrupt end to file " << files[i].string() << ", yet it isn't the last journal file" << endl;
+#else
                     log() << "recover error: abrupt end to file " << files[i].string() << ", yet it isn't the last journal file" << endl;
                    close();
                    uasserted(13535, "recover abrupt journal file end");
-                }*/
+#endif
+                }
            }
 
            close();
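
For readers following the pointer arithmetic this patch introduces in processFileBuffer, here is a minimal, self-contained sketch of the on-disk layout it assumes: a section is [ header | compressed entry payload | footer ], and the header's section length is used to carve out the payload and footer before handing them to processSection. SectHeader, SectFooter, and carveSection below are illustrative stand-ins, not the real JSectHeader/JSectFooter from dur_journalformat.h; field names, sizes, and the absence of padding are assumptions.

    #include <cassert>
    #include <cstring>
    #include <iostream>
    #include <vector>

    // Hypothetical simplifications of JSectHeader/JSectFooter.
    struct SectHeader {
        unsigned len;                 // total section length: header + payload + footer
        unsigned long long seqNumber; // journal sequence number (lsn)
        unsigned sectionLen() const { return len; }
        unsigned sectionLenWithPadding() const { return len; } // real format may pad
    };

    struct SectFooter {
        unsigned char hash[16]; // checksum covering header + payload in the real format
    };

    // Mirrors the carve-up processFileBuffer does after this patch:
    //   [ header | payload (compressed JEntry stream) | footer ]
    void carveSection(const char* raw) {
        const SectHeader* h = reinterpret_cast<const SectHeader*>(raw);
        assert(h->sectionLen() >= sizeof(SectHeader) + sizeof(SectFooter));
        unsigned dataLen = h->sectionLen() - sizeof(SectHeader) - sizeof(SectFooter);
        const char* data = raw + sizeof(SectHeader);
        const SectFooter* f = reinterpret_cast<const SectFooter*>(data + dataLen);
        // Recovery would now uncompress(data, dataLen) and verify f->hash over
        // the (dataLen + sizeof(SectHeader)) bytes starting at raw, matching
        // f->checkHash(h, len + sizeof(JSectHeader)) in the patch.
        std::cout << "seq:" << h->seqNumber << " payload bytes:" << dataLen << std::endl;
        (void)f;
    }

    int main() {
        SectHeader h;
        h.len = sizeof(SectHeader) + 8 + sizeof(SectFooter); // 8-byte dummy payload
        h.seqNumber = 42;
        std::vector<char> buf(h.len, 0); // heap storage is suitably aligned
        std::memcpy(&buf[0], &h, sizeof(h));
        carveSection(&buf[0]);
        return 0;
    }

Note the design consequence visible in the diff: because the footer hash is now verified once over header + payload in processSection, the per-entry OpCode_Footer branch inside JournalSectionIterator::next() becomes unreachable (hence the assert(false)), and next() no longer needs a boolean end-of-section return value.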