Diffstat (limited to 'db/dur_recover.cpp')
-rw-r--r--  db/dur_recover.cpp  186
1 file changed, 132 insertions(+), 54 deletions(-)
diff --git a/db/dur_recover.cpp b/db/dur_recover.cpp
index 1480a59..3c9fee7 100644
--- a/db/dur_recover.cpp
+++ b/db/dur_recover.cpp
@@ -19,6 +19,7 @@
#include "pch.h"
#include "dur.h"
+#include "dur_stats.h"
#include "dur_recover.h"
#include "dur_journal.h"
#include "dur_journalformat.h"
@@ -26,13 +27,16 @@
#include "namespace.h"
#include "../util/mongoutils/str.h"
#include "../util/bufreader.h"
+#include "../util/concurrency/race.h"
#include "pdfile.h"
#include "database.h"
#include "db.h"
#include "../util/unittest.h"
+#include "../util/checksum.h"
#include "cmdline.h"
#include "curop.h"
#include "mongommf.h"
+#include "../util/compress.h"
#include <sys/stat.h>
#include <fcntl.h>
@@ -90,62 +94,73 @@ namespace mongo {
throws
*/
class JournalSectionIterator : boost::noncopyable {
+ auto_ptr<BufReader> _entries;
+ const JSectHeader _h;
+ const char *_lastDbName; // pointer into mmaped journal file
+ const bool _doDurOps;
+ string _uncompressed;
public:
- JournalSectionIterator(const void *p, unsigned len, bool doDurOps)
- : _br(p, len)
- , _sectHead(static_cast<const JSectHeader*>(_br.skip(sizeof(JSectHeader))))
- , _lastDbName(NULL)
- , _doDurOps(doDurOps)
- {}
+ JournalSectionIterator(const JSectHeader& h, const void *compressed, unsigned compressedLen, bool doDurOpsRecovering) :
+ _h(h),
+ _lastDbName(0)
+ , _doDurOps(doDurOpsRecovering)
+ {
+ assert( doDurOpsRecovering );
+ bool ok = uncompress((const char *)compressed, compressedLen, &_uncompressed);
+ if( !ok ) {
+ // this should always be ok (in principle), since the JSectFooter checksum is checked for the section
+ log() << "couldn't uncompress journal section" << endl;
+ msgasserted(15874, "couldn't uncompress journal section");
+ }
+ const char *p = _uncompressed.c_str();
+ assert( compressedLen == _h.sectionLen() - sizeof(JSectFooter) - sizeof(JSectHeader) );
+ _entries = auto_ptr<BufReader>( new BufReader(p, _uncompressed.size()) );
+ }
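
The recovering constructor inflates the section body before any parsing. A minimal sketch of the round trip it relies on; the write-side compress() call and the rawEntries/rawLen names are assumptions (the journal writer is not part of this diff), while the uncompress() signature is taken from the call above:

    // sketch only: the journal writer (not shown here) is assumed to compress the
    // raw JEntry stream into the section body via the util/compress.h counterpart.
    std::string body;
    compress(rawEntries, rawLen, &body);                     // hypothetical write-side call
    std::string recovered;
    if( !uncompress(body.data(), body.size(), &recovered) )  // signature as used above
        msgasserted(15874, "couldn't uncompress journal section");
    assert( recovered.size() == rawLen );                    // round trip restores the entry bytes
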
+
+ // we work with the uncompressed buffer when doing a WRITETODATAFILES (for speed)
+ JournalSectionIterator(const JSectHeader &h, const void *p, unsigned len) :
+ _entries( new BufReader((const char *) p, len) ),
+ _h(h),
+ _lastDbName(0)
+ , _doDurOps(false)
- bool atEof() const { return _br.atEof(); }
+ { }
- unsigned long long seqNumber() const { return _sectHead->seqNumber; }
+ bool atEof() const { return _entries->atEof(); }
+
+ unsigned long long seqNumber() const { return _h.seqNumber; }
/** get the next entry from the log. this function parses and combines JDbContext and JEntry's.
- * @return true if got an entry. false at successful end of section (and no entry returned).
* throws on premature end of section.
*/
- bool next(ParsedJournalEntry& e) {
+ void next(ParsedJournalEntry& e) {
unsigned lenOrOpCode;
- _br.read(lenOrOpCode);
+ _entries->read(lenOrOpCode);
if (lenOrOpCode > JEntry::OpCode_Min) {
switch( lenOrOpCode ) {
case JEntry::OpCode_Footer: {
- if (_doDurOps) {
- const char* pos = (const char*) _br.pos();
- pos -= sizeof(lenOrOpCode); // rewind to include OpCode
- const JSectFooter& footer = *(const JSectFooter*)pos;
- int len = pos - (char*)_sectHead;
- if (!footer.checkHash(_sectHead, len)) {
- massert(13594, str::stream() << "Journal checksum doesn't match. recorded: "
- << toHex(footer.hash, sizeof(footer.hash))
- << " actual: " << md5simpledigest(_sectHead, len)
- , false);
- }
- }
- return false; // false return value denotes end of section
+ assert( false );
}
case JEntry::OpCode_FileCreated:
case JEntry::OpCode_DropDb: {
e.dbName = 0;
- boost::shared_ptr<DurOp> op = DurOp::read(lenOrOpCode, _br);
+ boost::shared_ptr<DurOp> op = DurOp::read(lenOrOpCode, *_entries);
if (_doDurOps) {
e.op = op;
}
- return true;
+ return;
}
case JEntry::OpCode_DbContext: {
- _lastDbName = (const char*) _br.pos();
- const unsigned limit = std::min((unsigned)Namespace::MaxNsLen, _br.remaining());
+ _lastDbName = (const char*) _entries->pos();
+ const unsigned limit = std::min((unsigned)Namespace::MaxNsLen, _entries->remaining());
const unsigned len = strnlen(_lastDbName, limit);
massert(13533, "problem processing journal file during recovery", _lastDbName[len] == '\0');
- _br.skip(len+1); // skip '\0' too
- _br.read(lenOrOpCode);
+ _entries->skip(len+1); // skip '\0' too
+ _entries->read(lenOrOpCode); // read this for the fall through
}
// fall through, as a basic operation always follows a JDbContext and we don't have anything to return yet
@@ -157,18 +172,13 @@ namespace mongo {
// JEntry - a basic write
assert( lenOrOpCode && lenOrOpCode < JEntry::OpCode_Min );
- _br.rewind(4);
- e.e = (JEntry *) _br.skip(sizeof(JEntry));
+ _entries->rewind(4);
+ e.e = (JEntry *) _entries->skip(sizeof(JEntry));
e.dbName = e.e->isLocalDbContext() ? "local" : _lastDbName;
assert( e.e->len == lenOrOpCode );
- _br.skip(e.e->len);
- return true;
+ _entries->skip(e.e->len);
}
- private:
- BufReader _br;
- const JSectHeader* _sectHead;
- const char *_lastDbName; // pointer into mmaped journal file
- const bool _doDurOps;
+
};
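
Since next() now returns void, callers drive the iterator with atEof(), and a truncated section surfaces as a thrown exception rather than a false return. A minimal consumption sketch (h/body/bodyLen are hypothetical names; this mirrors the loop in processSection further down):

    // drain one journal section with the pull-style interface
    JournalSectionIterator it(h, body, bodyLen, /*doDurOpsRecovering*/ true);
    vector<ParsedJournalEntry> entries;
    ParsedJournalEntry e;
    while( !it.atEof() ) {
        it.next(e);               // throws on premature end of section
        entries.push_back(e);
    }
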
static string fileName(const char* dbName, int fileNo) {
@@ -204,6 +214,11 @@ namespace mongo {
}
void RecoveryJob::write(const ParsedJournalEntry& entry) {
+ //TODO(mathias): look into making some of these asserts into dasserts
+ assert(entry.e);
+ assert(entry.dbName);
+ assert(strnlen(entry.dbName, MaxDatabaseNameLen) < MaxDatabaseNameLen);
+
const string fn = fileName(entry.dbName, entry.e->getFileNo());
MongoFile* file;
{
@@ -225,8 +240,12 @@ namespace mongo {
}
if ((entry.e->ofs + entry.e->len) <= mmf->length()) {
+ assert(mmf->view_write());
+ assert(entry.e->srcData());
+
void* dest = (char*)mmf->view_write() + entry.e->ofs;
memcpy(dest, entry.e->srcData(), entry.e->len);
+ stats.curr->_writeToDataFilesBytes += entry.e->len;
}
else {
massert(13622, "Trying to write past end of file in WRITETODATAFILES", _recovering);
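
The bounds check separates the two callers of write(): a past-EOF target during a normal WRITETODATAFILES is a hard error, while during recovery it can occur legitimately (the datafile has not been grown yet) and is presumably handled by the code following the massert, outside this hunk. Condensed branch semantics:

    // in bounds: copy the payload into the mapped view and count the bytes
    if( (entry.e->ofs + entry.e->len) <= mmf->length() ) {
        memcpy((char*)mmf->view_write() + entry.e->ofs, entry.e->srcData(), entry.e->len);
        stats.curr->_writeToDataFilesBytes += entry.e->len;
    }
    else {
        // throws unless recovering; in recovery a not-yet-extended file is expected
        massert(13622, "Trying to write past end of file in WRITETODATAFILES", _recovering);
    }
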
@@ -278,27 +297,64 @@ namespace mongo {
log() << "END section" << endl;
}
- void RecoveryJob::processSection(const void *p, unsigned len) {
+ void RecoveryJob::processSection(const JSectHeader *h, const void *p, unsigned len, const JSectFooter *f) {
scoped_lock lk(_mx);
+ RACECHECK
+
+ /** todo: we should really verify the checksum here so we know seqNumber is ok.
+ that is expensive, though -- perhaps a checksum of just the header, stored
+ within the header itself, would be cheap enough.
+ */
+ if( _recovering && _lastDataSyncedFromLastRun > h->seqNumber + ExtraKeepTimeMs ) {
+ if( h->seqNumber != _lastSeqMentionedInConsoleLog ) {
+ static int n;
+ if( ++n < 10 ) {
+ log() << "recover skipping application of section seq:" << h->seqNumber << " < lsn:" << _lastDataSyncedFromLastRun << endl;
+ }
+ else if( n == 10 ) {
+ log() << "recover skipping application of section more..." << endl;
+ }
+ _lastSeqMentionedInConsoleLog = h->seqNumber;
+ }
+ return;
+ }
- vector<ParsedJournalEntry> entries;
- JournalSectionIterator i(p, len, _recovering);
+ auto_ptr<JournalSectionIterator> i;
+ if( _recovering ) {
+ i = auto_ptr<JournalSectionIterator>(new JournalSectionIterator(*h, p, len, _recovering));
+ }
+ else {
+ i = auto_ptr<JournalSectionIterator>(new JournalSectionIterator(*h, /*after header*/p, /*w/out header*/len));
+ }
- //DEV log() << "recovery processSection seq:" << i.seqNumber() << endl;
- if( _recovering && _lastDataSyncedFromLastRun > i.seqNumber() + ExtraKeepTimeMs ) {
- if( i.seqNumber() != _lastSeqMentionedInConsoleLog ) {
- log() << "recover skipping application of section seq:" << i.seqNumber() << " < lsn:" << _lastDataSyncedFromLastRun << endl;
- _lastSeqMentionedInConsoleLog = i.seqNumber();
+ // we use a static so that we don't have to reallocate on every pass. occasionally we
+ // shrink it back to a small allocation so that a spike in growth doesn't stick forever.
+ static vector<ParsedJournalEntry> entries;
+ entries.clear();
+/** TEMP uncomment
+ RARELY OCCASIONALLY {
+ if( entries.capacity() > 2048 ) {
+ entries.shrink_to_fit();
+ entries.reserve(2048);
}
- return;
}
+*/
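
The parked block likely waits on vector::shrink_to_fit, which is C++0x-only (the diff doesn't say why it is commented out). Under a pre-C++0x toolchain the same trimming can be sketched with the classic swap idiom; entries is empty at this point, having just been cleared:

    // hypothetical pre-C++0x equivalent of the shrink above
    if( entries.capacity() > 2048 ) {
        vector<ParsedJournalEntry> tmp;
        tmp.reserve(2048);
        entries.swap(tmp);   // the oversized buffer moves to tmp and is freed at '}'
    }
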
// first read all entries to make sure this section is valid
ParsedJournalEntry e;
- while( i.next(e) ) {
+ while( !i->atEof() ) {
+ i->next(e);
entries.push_back(e);
}
+ // after the entries check the footer checksum
+ if( _recovering ) {
+ assert( ((const char *)h) + sizeof(JSectHeader) == p );
+ if( !f->checkHash(h, len + sizeof(JSectHeader)) ) {
+ msgasserted(13594, "journal checksum doesn't match");
+ }
+ }
+
// got all the entries for one group commit. apply them:
applyEntries(entries);
}
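
processSection now validates everything before mutating anything: all entries of the group commit are parsed first, then (when recovering) the footer hash is verified over the header plus the still-compressed body, and only then are the writes applied. Condensed control flow, names as in the hunk above:

    // validate-then-apply ordering of processSection(), condensed
    if( _recovering && _lastDataSyncedFromLastRun > h->seqNumber + ExtraKeepTimeMs )
        return;                                                // section already durable
    while( !i->atEof() ) { i->next(e); entries.push_back(e); } // parse all entries first
    if( _recovering && !f->checkHash(h, len + sizeof(JSectHeader)) )
        msgasserted(13594, "journal checksum doesn't match");  // hash spans header + body
    applyEntries(entries);                                     // only now touch datafiles
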
@@ -334,11 +390,16 @@ namespace mongo {
if( h.fileId != fileId ) {
if( debug || (cmdLine.durOptions & CmdLine::DurDumpJournal) ) {
log() << "Ending processFileBuffer at differing fileId want:" << fileId << " got:" << h.fileId << endl;
- log() << " sect len:" << h.len << " seqnum:" << h.seqNumber << endl;
+ log() << " sect len:" << h.sectionLen() << " seqnum:" << h.seqNumber << endl;
}
return true;
}
- processSection(br.skip(h.len), h.len);
+ unsigned slen = h.sectionLen();
+ unsigned dataLen = slen - sizeof(JSectHeader) - sizeof(JSectFooter);
+ const char *hdr = (const char *) br.skip(h.sectionLenWithPadding());
+ const char *data = hdr + sizeof(JSectHeader);
+ const char *footer = data + dataLen;
+ processSection((const JSectHeader*) hdr, data, dataLen, (const JSectFooter*) footer);
// ctrl c check
killCurrentOp.checkForInterrupt(false);
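
The pointer arithmetic above slices one section out of the memory-mapped journal file. A sketch of the on-disk layout it assumes (sectionLen() and sectionLenWithPadding() are the JSectHeader accessors used above):

    // [ JSectHeader ][ body: dataLen bytes, possibly compressed ][ JSectFooter ][ pad ]
    //  ^hdr           ^data                                       ^footer
    //  |<------------------- sectionLen() ------------------------------------>|
    //  |<------------------- sectionLenWithPadding() --------------------------------->|
    // where dataLen = sectionLen() - sizeof(JSectHeader) - sizeof(JSectFooter)
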
@@ -356,6 +417,17 @@ namespace mongo {
/** apply a specific journal file */
bool RecoveryJob::processFile(path journalfile) {
log() << "recover " << journalfile.string() << endl;
+
+ try {
+ if( boost::filesystem::file_size( journalfile.string() ) == 0 ) {
+ log() << "recover info " << journalfile.string() << " has zero length" << endl;
+ return true;
+ }
+ } catch(...) {
+ // if something weird happens, like a permissions problem, keep going so the massert below can report it
+ log() << "recover exception checking filesize" << endl;
+ }
+
MemoryMappedFile f;
void *p = f.mapWithOptions(journalfile.string().c_str(), MongoFile::READONLY | MongoFile::SEQUENTIAL);
massert(13544, str::stream() << "recover error couldn't open " << journalfile.string(), p);
@@ -371,13 +443,19 @@ namespace mongo {
_lastDataSyncedFromLastRun = journalReadLSN();
log() << "recover lsn: " << _lastDataSyncedFromLastRun << endl;
+ // todo: we could truncate the journal file at rotation time to the right length, then this abruptEnd
+ // check can be turned back on. this is relevant when prealloc is being used.
for( unsigned i = 0; i != files.size(); ++i ) {
- /*bool abruptEnd = */processFile(files[i]);
- /*if( abruptEnd && i+1 < files.size() ) {
+ bool abruptEnd = processFile(files[i]);
+ if( abruptEnd && i+1 < files.size() ) {
+#if 1 // Leaving this as a warning for now. TODO: make this an error post 2.0
+ log() << "recover warning: abrupt end to file " << files[i].string() << ", yet it isn't the last journal file" << endl;
+#else
log() << "recover error: abrupt end to file " << files[i].string() << ", yet it isn't the last journal file" << endl;
close();
uasserted(13535, "recover abrupt journal file end");
- }*/
+#endif
+ }
}
close();