Diffstat (limited to 'db/compact.cpp')
-rw-r--r--  db/compact.cpp | 361
1 file changed, 230 insertions, 131 deletions
diff --git a/db/compact.cpp b/db/compact.cpp
index 6bafd91..c6e5f77 100644
--- a/db/compact.cpp
+++ b/db/compact.cpp
@@ -1,4 +1,4 @@
-/* @file compact.cpp
+/** @file compact.cpp
     compaction of deleted space in pdfiles (datafiles)
 */
@@ -25,174 +25,273 @@
 #include "concurrency.h"
 #include "commands.h"
 #include "curop-inl.h"
+#include "background.h"
+#include "extsort.h"
+#include "compact.h"
 #include "../util/concurrency/task.h"
 
 namespace mongo {
 
-    class CompactJob : public task::Task {
-    public:
-        CompactJob(string ns) : _ns(ns) { }
-    private:
-        virtual string name() const { return "compact"; }
-        virtual void doWork();
-        NamespaceDetails * beginBlock();
-        void doBatch();
-        void prep();
-        const string _ns;
-        unsigned long long _nrecords;
-        unsigned long long _ncompacted;
-        DiskLoc _firstExtent;
-    };
+    char faux;
 
-    // lock & set context first. this checks that collection still exists, and that it hasn't
-    // morphed into a capped collection between locks (which is possible)
-    NamespaceDetails * CompactJob::beginBlock() {
-        NamespaceDetails *nsd = nsdetails(_ns.c_str());
-        if( nsd == 0 ) throw "ns no longer present";
-        if( nsd->firstExtent.isNull() )
-            throw "no first extent";
-        if( nsd->capped )
-            throw "capped collection";
-        return nsd;
-    }
+    void addRecordToRecListInExtent(Record *r, DiskLoc loc);
+    DiskLoc allocateSpaceForANewRecord(const char *ns, NamespaceDetails *d, int lenWHdr, bool god);
+    void freeExtents(DiskLoc firstExt, DiskLoc lastExt);
+
+    /** @return number of skipped (invalid) documents */
+    unsigned compactExtent(const char *ns, NamespaceDetails *d, const DiskLoc ext, int n,
+                           const scoped_array<IndexSpec> &indexSpecs,
+                           scoped_array<SortPhaseOne>& phase1, int nidx, bool validate)
+    {
+        log() << "compact extent #" << n << endl;
+
+        Extent *e = ext.ext();
+        e->assertOk();
+        assert( e->validates() );
+        unsigned skipped = 0;
 
-    void CompactJob::doBatch() {
-        unsigned n = 0;
         {
-            /* pre-touch records in a read lock so that paging happens in read not write lock.
-               note we are only touching the records though; if indexes aren't in RAM, they will
-               page later. So the concept is only partial.
-            */
-            readlock lk;
+            // the next/prev pointers within the extent might not be in order so we first page the whole thing in
+            // sequentially
+            log() << "compact paging in len=" << e->length/1000000.0 << "MB" << endl;
             Timer t;
-            Client::Context ctx(_ns);
-            NamespaceDetails *nsd = beginBlock();
-            if( nsd->firstExtent != _firstExtent ) {
-                // TEMP DEV - stop after 1st extent
-                throw "change of first extent";
-            }
-            DiskLoc loc = nsd->firstExtent.ext()->firstRecord;
-            while( !loc.isNull() ) {
-                Record *r = loc.rec();
-                loc = r->getNext(loc);
-                if( ++n >= 100 || (n % 8 == 0 && t.millis() > 50) )
-                    break;
+            MAdvise adv(e, e->length, MAdvise::Sequential);
+            const char *p = (const char *) e;
+            for( int i = 0; i < e->length; i += 4096 ) {
+                faux += p[i];
             }
+            int ms = t.millis();
+            if( ms > 1000 )
+                log() << "compact end paging in " << ms << "ms " << e->length/1000000.0/ms << "MB/sec" << endl;
         }
+
         {
-            writelock lk;
-            Client::Context ctx(_ns);
-            NamespaceDetails *nsd = beginBlock();
-            for( unsigned i = 0; i < n; i++ ) {
-                if( nsd->firstExtent != _firstExtent ) {
-                    // TEMP DEV - stop after 1st extent
-                    throw "change of first extent (or it is now null)";
+            log() << "compact copying records" << endl;
+            unsigned totalSize = 0;
+            int nrecs = 0;
+            DiskLoc L = e->firstRecord;
+            if( !L.isNull() )
+                while( 1 ) {
+                    Record *recOld = L.rec();
+                    L = recOld->nextInExtent(L);
+                    nrecs++;
+                    BSONObj objOld(recOld);
+
+                    if( !validate || objOld.valid() ) {
+                        unsigned sz = objOld.objsize();
+                        unsigned lenWHdr = sz + Record::HeaderSize;
+                        totalSize += lenWHdr;
+                        DiskLoc extentLoc;
+                        DiskLoc loc = allocateSpaceForANewRecord(ns, d, lenWHdr, false);
+                        uassert(14024, "compact error out of space during compaction", !loc.isNull());
+                        Record *recNew = loc.rec();
+                        recNew = (Record *) getDur().writingPtr(recNew, lenWHdr);
+                        addRecordToRecListInExtent(recNew, loc);
+                        memcpy(recNew->data, objOld.objdata(), sz);
+
+                        {
+                            // extract keys for all indexes we will be rebuilding
+                            for( int x = 0; x < nidx; x++ ) {
+                                phase1[x].addKeys(indexSpecs[x], objOld, loc);
+                            }
+                        }
                     }
-                DiskLoc loc = nsd->firstExtent.ext()->firstRecord;
-                Record *rec = loc.rec();
-                BSONObj o = loc.obj().getOwned(); // todo: inefficient, double mem copy...
-                try {
-                    theDataFileMgr.deleteRecord(_ns.c_str(), rec, loc, false);
+                    else {
+                        if( ++skipped <= 10 )
+                            log() << "compact skipping invalid object" << endl;
                     }
-                catch(DBException&) { throw "error deleting record"; }
-                try {
-                    theDataFileMgr.insertNoReturnVal(_ns.c_str(), o);
+
+                    if( L.isNull() ) {
+                        // we just did the very last record from the old extent. it's still pointed to
+                        // by the old extent ext, but that will be fixed below after this loop
+                        break;
                     }
-                catch(DBException&) {
-                    /* todo: save the record somehow??? try again with 'avoid' logic? */
-                    log() << "compact: error re-inserting record ns:" << _ns << " n:" << _nrecords << " _id:" << o["_id"].toString() << endl;
-                    throw "error re-inserting record";
+
+                    // remove the old records (orphan them) periodically so our commit block doesn't get too large
+                    bool stopping = false;
+                    RARELY stopping = *killCurrentOp.checkForInterruptNoAssert(false) != 0;
+                    if( stopping || getDur().aCommitIsNeeded() ) {
+                        e->firstRecord.writing() = L;
+                        Record *r = L.rec();
+                        getDur().writingInt(r->prevOfs) = DiskLoc::NullOfs;
+                        getDur().commitIfNeeded();
+                        killCurrentOp.checkForInterrupt(false);
                     }
-                ++_ncompacted;
-                if( killCurrentOp.globalInterruptCheck() )
-                    throw "interrupted";
                 }
+
+            assert( d->firstExtent == ext );
+            assert( d->lastExtent != ext );
+            DiskLoc newFirst = e->xnext;
+            d->firstExtent.writing() = newFirst;
+            newFirst.ext()->xprev.writing().Null();
+            getDur().writing(e)->markEmpty();
+            freeExtents(ext,ext);
+            getDur().commitIfNeeded();
+
+            log() << "compact " << nrecs << " documents " << totalSize/1000000.0 << "MB" << endl;
         }
-    }
 
-    void CompactJob::prep() {
-        readlock lk;
-        Client::Context ctx(_ns);
-        NamespaceDetails *nsd = beginBlock();
-        DiskLoc L = nsd->firstExtent;
-        assert( !L.isNull() );
-        _firstExtent = L;
-        _nrecords = nsd->stats.nrecords;
-        _ncompacted = 0;
+        return skipped;
     }
 
-    static mutex m("compact");
-    static volatile bool running;
-
-    void CompactJob::doWork() {
-        Client::initThread("compact");
-        cc().curop()->reset();
-        cc().curop()->setNS(_ns.c_str());
-        cc().curop()->markCommand();
-        sleepsecs(60);
-        try {
-            prep();
-            while( _ncompacted < _nrecords )
-                doBatch();
+    extern SortPhaseOne *precalced;
+
+    bool _compact(const char *ns, NamespaceDetails *d, string& errmsg, bool validate, BSONObjBuilder& result) {
+        //int les = d->lastExtentSize;
+
+        // this is a big job, so might as well make things tidy before we start just to be nice.
+        getDur().commitNow();
+
+        list<DiskLoc> extents;
+        for( DiskLoc L = d->firstExtent; !L.isNull(); L = L.ext()->xnext )
+            extents.push_back(L);
+        log() << "compact " << extents.size() << " extents" << endl;
+
+        ProgressMeterHolder pm( cc().curop()->setMessage( "compact extent" , extents.size() ) );
+
+        // same data, but might perform a little different after compact?
+        NamespaceDetailsTransient::get_w(ns).clearQueryCache();
+
+        int nidx = d->nIndexes;
+        scoped_array<IndexSpec> indexSpecs( new IndexSpec[nidx] );
+        scoped_array<SortPhaseOne> phase1( new SortPhaseOne[nidx] );
+        {
+            NamespaceDetails::IndexIterator ii = d->ii();
+            int x = 0;
+            while( ii.more() ) {
+                BSONObjBuilder b;
+                IndexDetails& idx = ii.next();
+                BSONObj::iterator i(idx.info.obj());
+                while( i.more() ) {
+                    BSONElement e = i.next();
+                    if( !str::equals(e.fieldName(), "v") && !str::equals(e.fieldName(), "background") ) {
+                        b.append(e);
+                    }
+                }
+                BSONObj o = b.obj().getOwned();
+                phase1[x].sorter.reset( new BSONObjExternalSorter( idx.idxInterface(), o.getObjectField("key") ) );
+                phase1[x].sorter->hintNumObjects( d->stats.nrecords );
+                indexSpecs[x++].reset(o);
+            }
         }
-        catch(const char *p) {
-            log() << "info: exception compact " << p << endl;
+
+        log() << "compact orphan deleted lists" << endl;
+        for( int i = 0; i < Buckets; i++ ) {
+            d->deletedList[i].writing().Null();
         }
-        catch(...) {
-            log() << "info: exception compact" << endl;
+
+        // before dropping indexes, at least make sure we can allocate one extent!
+        uassert(14025, "compact error no space available to allocate", !allocateSpaceForANewRecord(ns, d, Record::HeaderSize+1, false).isNull());
+
+        // note that the drop indexes call also invalidates all clientcursors for the namespace, which is important and wanted here
+        log() << "compact dropping indexes" << endl;
+        BSONObjBuilder b;
+        if( !dropIndexes(d, ns, "*", errmsg, b, true) ) {
+            errmsg = "compact drop indexes failed";
+            log() << errmsg << endl;
+            return false;
         }
-        mongo::running = false;
-        cc().shutdown();
-    }
 
-    /* --- CompactCmd --- */
+        getDur().commitNow();
 
-    class CompactCmd : public Command {
-    public:
-        virtual bool run(const string& db, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) {
-            string coll = cmdObj.firstElement().valuestr();
-            if( coll.empty() || db.empty() ) {
-                errmsg = "no collection name specified";
-                return false;
+        long long skipped = 0;
+        int n = 0;
+        for( list<DiskLoc>::iterator i = extents.begin(); i != extents.end(); i++ ) {
+            skipped += compactExtent(ns, d, *i, n++, indexSpecs, phase1, nidx, validate);
+            pm.hit();
+        }
+
+        if( skipped ) {
+            result.append("invalidObjects", skipped);
+        }
+
+        assert( d->firstExtent.ext()->xprev.isNull() );
+
+        // indexes will do their own progress meter?
+        pm.finished();
+
+        // build indexes
+        NamespaceString s(ns);
+        string si = s.db + ".system.indexes";
+        for( int i = 0; i < nidx; i++ ) {
+            killCurrentOp.checkForInterrupt(false);
+            BSONObj info = indexSpecs[i].info;
+            log() << "compact create index " << info["key"].Obj().toString() << endl;
+            try {
+                precalced = &phase1[i];
+                theDataFileMgr.insert(si.c_str(), info.objdata(), info.objsize());
             }
-            string ns = db + '.' + coll;
-            assert( isANormalNSName(ns.c_str()) );
-            {
-                readlock lk;
-                Client::Context ctx(ns);
-                if( nsdetails(ns.c_str()) == 0 ) {
-                    errmsg = "namespace " + ns + " does not exist";
-                    return false;
-                }
+            catch(...) {
+                precalced = 0;
+                throw;
             }
-            {
-                scoped_lock lk(m);
-                if( running ) {
-                    errmsg = "a compaction is already running";
-                    return false;
-                }
-                running = true;
-                task::fork( new CompactJob(ns) );
-                return true;
+            precalced = 0;
+        }
+
+        return true;
+    }
+
+    bool compact(const string& ns, string &errmsg, bool validate, BSONObjBuilder& result) {
+        massert( 14028, "bad ns", NamespaceString::normal(ns.c_str()) );
+        massert( 14027, "can't compact a system namespace", !str::contains(ns, ".system.") ); // items in system.indexes cannot be moved; there are pointers to those disklocs in NamespaceDetails
+
+        bool ok;
+        {
+            writelock lk;
+            BackgroundOperation::assertNoBgOpInProgForNs(ns.c_str());
+            Client::Context ctx(ns);
+            NamespaceDetails *d = nsdetails(ns.c_str());
+            massert( 13660, str::stream() << "namespace " << ns << " does not exist", d );
+            massert( 13661, "cannot compact capped collection", !d->capped );
+            log() << "compact " << ns << " begin" << endl;
+            try {
+                ok = _compact(ns.c_str(), d, errmsg, validate, result);
             }
-            errmsg = "not done";
-            return false;
+            catch(...) {
+                log() << "compact " << ns << " end (with error)" << endl;
+                throw;
+            }
+            log() << "compact " << ns << " end" << endl;
         }
+        return ok;
+    }
 
+    bool isCurrentlyAReplSetPrimary();
+
+    class CompactCmd : public Command {
+    public:
         virtual LockType locktype() const { return NONE; }
         virtual bool adminOnly() const { return false; }
         virtual bool slaveOk() const { return true; }
+        virtual bool maintenanceMode() const { return true; }
         virtual bool logTheOp() { return false; }
         virtual void help( stringstream& help ) const {
-            help << "compact / defragment a collection in the background, slowly, attempting to minimize disruptions to other operations\n"
-                "{ compact : <collection> }";
+            help << "compact collection\n"
+                "warning: this operation blocks the server and is slow. you can cancel with cancelOp()\n"
+                "{ compact : <collection_name>, [force:true], [validate:true] }\n"
+                "  force - allows to run on a replica set primary\n"
+                "  validate - check records are noncorrupt before adding to newly compacting extents. slower but safer (default is true in this version)\n";
        }
         virtual bool requiresAuth() { return true; }
-
-        /** @param webUI expose the command in the web ui as localhost:28017/<name>
-            @param oldName an optional old, deprecated name for the command
-        */
         CompactCmd() : Command("compact") { }
+
+        virtual bool run(const string& db, BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool fromRepl) {
+            string coll = cmdObj.firstElement().valuestr();
+            if( coll.empty() || db.empty() ) {
+                errmsg = "no collection name specified";
+                return false;
+            }
+
+            if( isCurrentlyAReplSetPrimary() && !cmdObj["force"].trueValue() ) {
+                errmsg = "will not run compact on an active replica set primary as this is a slow blocking operation. use force:true to force";
+                return false;
+            }
+
+            string ns = db + '.' + coll;
+            bool validate = !cmdObj.hasElement("validate") || cmdObj["validate"].trueValue(); // default is true at the moment
+            bool ok = compact(ns, errmsg, validate, result);
+            return ok;
+        }
     };
     static CompactCmd compactCmd;
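
Implementation note: compactExtent pre-faults each extent before copying by advising the
kernel of sequential access and touching one byte per 4KB page, so page faults are paid
up front rather than in the middle of the record copy. A standalone sketch of the same
technique against a POSIX mmap'ed region; pageInSequentially is a hypothetical helper
for illustration, not part of this change:

    #include <sys/mman.h>
    #include <cstddef>

    static volatile char faux; // sink so the compiler does not optimize the reads away

    void pageInSequentially(const void *base, size_t len) {
        // hint the kernel that we will scan this range once, front to back
        madvise(const_cast<void *>(base), len, MADV_SEQUENTIAL);
        const char *p = static_cast<const char *>(base);
        for( size_t i = 0; i < len; i += 4096 )
            faux += p[i]; // touch one byte per page to fault it in
    }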
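Usage note: the reworked command runs in the foreground under a write lock rather than as a
background task. A minimal sketch of invoking it from a client with the C++ driver's
runCommand; the "localhost:27017" address and the test.foo collection are placeholders:

    #include <iostream>
    #include "client/dbclient.h"

    using namespace mongo;

    int main() {
        DBClientConnection c;
        string errmsg;
        if( !c.connect("localhost:27017", errmsg) ) {
            std::cout << "connect failed: " << errmsg << std::endl;
            return 1;
        }
        BSONObj res;
        // runs { compact : "foo", validate : true } against db "test";
        // force:true would also be needed if this node were a replica set primary
        bool ok = c.runCommand("test", BSON("compact" << "foo" << "validate" << true), res);
        std::cout << res.toString() << std::endl;
        return ok ? 0 : 1;
    }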