Diffstat (limited to 'db/compact.cpp')
 db/compact.cpp | 361 ++++++++++++++++++++++++++++++++-----------------
 1 file changed, 230 insertions(+), 131 deletions(-)
diff --git a/db/compact.cpp b/db/compact.cpp
index 6bafd91..c6e5f77 100644
--- a/db/compact.cpp
+++ b/db/compact.cpp
@@ -1,4 +1,4 @@
-/* @file compact.cpp
+/** @file compact.cpp
compaction of deleted space in pdfiles (datafiles)
*/
@@ -25,174 +25,273 @@
#include "concurrency.h"
#include "commands.h"
#include "curop-inl.h"
+#include "background.h"
+#include "extsort.h"
+#include "compact.h"
#include "../util/concurrency/task.h"
namespace mongo {
- class CompactJob : public task::Task {
- public:
- CompactJob(string ns) : _ns(ns) { }
- private:
- virtual string name() const { return "compact"; }
- virtual void doWork();
- NamespaceDetails * beginBlock();
- void doBatch();
- void prep();
- const string _ns;
- unsigned long long _nrecords;
- unsigned long long _ncompacted;
- DiskLoc _firstExtent;
- };
+ char faux;
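+ // sink for the page-touching loop in compactExtent below, so the reads are not optimized away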
- // lock & set context first. this checks that collection still exists, and that it hasn't
- // morphed into a capped collection between locks (which is possible)
- NamespaceDetails * CompactJob::beginBlock() {
- NamespaceDetails *nsd = nsdetails(_ns.c_str());
- if( nsd == 0 ) throw "ns no longer present";
- if( nsd->firstExtent.isNull() )
- throw "no first extent";
- if( nsd->capped )
- throw "capped collection";
- return nsd;
- }
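+ // record/extent helpers defined in the pdfile layer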
+ void addRecordToRecListInExtent(Record *r, DiskLoc loc);
+ DiskLoc allocateSpaceForANewRecord(const char *ns, NamespaceDetails *d, int lenWHdr, bool god);
+ void freeExtents(DiskLoc firstExt, DiskLoc lastExt);
+
+ /** @return number of skipped (invalid) documents */
+ unsigned compactExtent(const char *ns, NamespaceDetails *d, const DiskLoc ext, int n,
+ const scoped_array<IndexSpec> &indexSpecs,
+ scoped_array<SortPhaseOne>& phase1, int nidx, bool validate)
+ {
+ log() << "compact extent #" << n << endl;
+
+ Extent *e = ext.ext();
+ e->assertOk();
+ assert( e->validates() );
+ unsigned skipped = 0;
- void CompactJob::doBatch() {
- unsigned n = 0;
{
- /* pre-touch records in a read lock so that paging happens in read not write lock.
- note we are only touching the records though; if indexes aren't in RAM, they will
- page later. So the concept is only partial.
- */
- readlock lk;
+ // the next/prev pointers within the extent might not be in order, so we first page the whole
+ // thing in sequentially
+ log() << "compact paging in len=" << e->length/1000000.0 << "MB" << endl;
Timer t;
- Client::Context ctx(_ns);
- NamespaceDetails *nsd = beginBlock();
- if( nsd->firstExtent != _firstExtent ) {
- // TEMP DEV - stop after 1st extent
- throw "change of first extent";
- }
- DiskLoc loc = nsd->firstExtent.ext()->firstRecord;
- while( !loc.isNull() ) {
- Record *r = loc.rec();
- loc = r->getNext(loc);
- if( ++n >= 100 || (n % 8 == 0 && t.millis() > 50) )
- break;
+ MAdvise adv(e, e->length, MAdvise::Sequential);
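+ // touch one byte per 4KB page so the whole extent is faulted in before we start copying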
+ const char *p = (const char *) e;
+ for( int i = 0; i < e->length; i += 4096 ) {
+ faux += *p;
}
+ int ms = t.millis();
+ if( ms > 1000 )
+ log() << "compact end paging in " << ms << "ms " << e->length/1000000.0/ms << "MB/sec" << endl;
}
+
{
- writelock lk;
- Client::Context ctx(_ns);
- NamespaceDetails *nsd = beginBlock();
- for( unsigned i = 0; i < n; i++ ) {
- if( nsd->firstExtent != _firstExtent ) {
- // TEMP DEV - stop after 1st extent
- throw "change of first extent (or it is now null)";
+ log() << "compact copying records" << endl;
+ unsigned totalSize = 0;
+ int nrecs = 0;
+ DiskLoc L = e->firstRecord;
+ if( !L.isNull() )
+ while( 1 ) {
+ Record *recOld = L.rec();
+ L = recOld->nextInExtent(L);
+ nrecs++;
+ BSONObj objOld(recOld);
+
+ if( !validate || objOld.valid() ) {
+ unsigned sz = objOld.objsize();
+ unsigned lenWHdr = sz + Record::HeaderSize;
+ totalSize += lenWHdr;
+ DiskLoc extentLoc;
+ DiskLoc loc = allocateSpaceForANewRecord(ns, d, lenWHdr, false);
+ uassert(14024, "compact error out of space during compaction", !loc.isNull());
+ Record *recNew = loc.rec();
+ recNew = (Record *) getDur().writingPtr(recNew, lenWHdr);
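+ // link the new record into its extent's record list, then copy just the BSON payload after the header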
+ addRecordToRecListInExtent(recNew, loc);
+ memcpy(recNew->data, objOld.objdata(), sz);
+
+ {
+ // extract keys for all indexes we will be rebuilding
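+ // each index's keys go into its external sorter (SortPhaseOne); the b-trees are rebuilt from the sorted keys after all extents are copied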
+ for( int x = 0; x < nidx; x++ ) {
+ phase1[x].addKeys(indexSpecs[x], objOld, loc);
+ }
+ }
}
- DiskLoc loc = nsd->firstExtent.ext()->firstRecord;
- Record *rec = loc.rec();
- BSONObj o = loc.obj().getOwned(); // todo: inefficient, double mem copy...
- try {
- theDataFileMgr.deleteRecord(_ns.c_str(), rec, loc, false);
+ else {
+ if( ++skipped <= 10 )
+ log() << "compact skipping invalid object" << endl;
}
- catch(DBException&) { throw "error deleting record"; }
- try {
- theDataFileMgr.insertNoReturnVal(_ns.c_str(), o);
+
+ if( L.isNull() ) {
+ // we just did the very last record from the old extent. it's still pointed to
+ // by the old extent ext, but that will be fixed below after this loop
+ break;
}
- catch(DBException&) {
- /* todo: save the record somehow??? try again with 'avoid' logic? */
- log() << "compact: error re-inserting record ns:" << _ns << " n:" << _nrecords << " _id:" << o["_id"].toString() << endl;
- throw "error re-inserting record";
+
+ // remove the old records (orphan them) periodically so our commit block doesn't get too large
+ bool stopping = false;
+ RARELY stopping = *killCurrentOp.checkForInterruptNoAssert(false) != 0;
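+ // RARELY samples this check so we do not pay the interrupt-check cost on every record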
+ if( stopping || getDur().aCommitIsNeeded() ) {
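+ // make L the extent's new first record; everything before it has been copied already, so orphaning those records is safe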
+ e->firstRecord.writing() = L;
+ Record *r = L.rec();
+ getDur().writingInt(r->prevOfs) = DiskLoc::NullOfs;
+ getDur().commitIfNeeded();
+ killCurrentOp.checkForInterrupt(false);
}
- ++_ncompacted;
- if( killCurrentOp.globalInterruptCheck() )
- throw "interrupted";
}
+
+ assert( d->firstExtent == ext );
+ assert( d->lastExtent != ext );
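+ // unlink the now-empty extent from the front of the chain and put it on the free list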
+ DiskLoc newFirst = e->xnext;
+ d->firstExtent.writing() = newFirst;
+ newFirst.ext()->xprev.writing().Null();
+ getDur().writing(e)->markEmpty();
+ freeExtents(ext,ext);
+ getDur().commitIfNeeded();
+
+ log() << "compact " << nrecs << " documents " << totalSize/1000000.0 << "MB" << endl;
}
- }
- void CompactJob::prep() {
- readlock lk;
- Client::Context ctx(_ns);
- NamespaceDetails *nsd = beginBlock();
- DiskLoc L = nsd->firstExtent;
- assert( !L.isNull() );
- _firstExtent = L;
- _nrecords = nsd->stats.nrecords;
- _ncompacted = 0;
+ return skipped;
}
- static mutex m("compact");
- static volatile bool running;
-
- void CompactJob::doWork() {
- Client::initThread("compact");
- cc().curop()->reset();
- cc().curop()->setNS(_ns.c_str());
- cc().curop()->markCommand();
- sleepsecs(60);
- try {
- prep();
- while( _ncompacted < _nrecords )
- doBatch();
+ extern SortPhaseOne *precalced;
+
+ bool _compact(const char *ns, NamespaceDetails *d, string& errmsg, bool validate, BSONObjBuilder& result) {
+ //int les = d->lastExtentSize;
+
+ // this is a big job, so might as well make things tidy before we start just to be nice.
+ getDur().commitNow();
+
+ list<DiskLoc> extents;
+ for( DiskLoc L = d->firstExtent; !L.isNull(); L = L.ext()->xnext )
+ extents.push_back(L);
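+ // snapshot the extent chain up front; compactExtent unlinks d->firstExtent as it goes, so the chain cannot be walked live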
+ log() << "compact " << extents.size() << " extents" << endl;
+
+ ProgressMeterHolder pm( cc().curop()->setMessage( "compact extent" , extents.size() ) );
+
+ // same data, but queries may perform a little differently after compaction, so clear the cached query plans
+ NamespaceDetailsTransient::get_w(ns).clearQueryCache();
+
+ int nidx = d->nIndexes;
+ scoped_array<IndexSpec> indexSpecs( new IndexSpec[nidx] );
+ scoped_array<SortPhaseOne> phase1( new SortPhaseOne[nidx] );
+ {
+ NamespaceDetails::IndexIterator ii = d->ii();
+ int x = 0;
+ while( ii.more() ) {
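+ // copy the index spec, dropping the "v" and "background" fields so the rebuild below creates the index fresh and in the foreground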
+ BSONObjBuilder b;
+ IndexDetails& idx = ii.next();
+ BSONObj::iterator i(idx.info.obj());
+ while( i.more() ) {
+ BSONElement e = i.next();
+ if( !str::equals(e.fieldName(), "v") && !str::equals(e.fieldName(), "background") ) {
+ b.append(e);
+ }
+ }
+ BSONObj o = b.obj().getOwned();
+ phase1[x].sorter.reset( new BSONObjExternalSorter( idx.idxInterface(), o.getObjectField("key") ) );
+ phase1[x].sorter->hintNumObjects( d->stats.nrecords );
+ indexSpecs[x++].reset(o);
+ }
}
- catch(const char *p) {
- log() << "info: exception compact " << p << endl;
+
+ log() << "compact orphan deleted lists" << endl;
+ for( int i = 0; i < Buckets; i++ ) {
+ d->deletedList[i].writing().Null();
}
- catch(...) {
- log() << "info: exception compact" << endl;
+
+ // before dropping indexes, at least make sure we can allocate one extent!
+ uassert(14025, "compact error no space available to allocate", !allocateSpaceForANewRecord(ns, d, Record::HeaderSize+1, false).isNull());
+
+ // note that the drop indexes call also invalidates all clientcursors for the namespace, which is important and wanted here
+ log() << "compact dropping indexes" << endl;
+ BSONObjBuilder b;
+ if( !dropIndexes(d, ns, "*", errmsg, b, true) ) {
+ errmsg = "compact drop indexes failed";
+ log() << errmsg << endl;
+ return false;
}
- mongo::running = false;
- cc().shutdown();
- }
- /* --- CompactCmd --- */
+ getDur().commitNow();
- class CompactCmd : public Command {
- public:
- virtual bool run(const string& db, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) {
- string coll = cmdObj.firstElement().valuestr();
- if( coll.empty() || db.empty() ) {
- errmsg = "no collection name specified";
- return false;
+ long long skipped = 0;
+ int n = 0;
+ for( list<DiskLoc>::iterator i = extents.begin(); i != extents.end(); i++ ) {
+ skipped += compactExtent(ns, d, *i, n++, indexSpecs, phase1, nidx, validate);
+ pm.hit();
+ }
+
+ if( skipped ) {
+ result.append("invalidObjects", skipped);
+ }
+
+ assert( d->firstExtent.ext()->xprev.isNull() );
+
+ // the index builds below will display their own progress meters
+ pm.finished();
+
+ // build indexes
+ NamespaceString s(ns);
+ string si = s.db + ".system.indexes";
+ for( int i = 0; i < nidx; i++ ) {
+ killCurrentOp.checkForInterrupt(false);
+ BSONObj info = indexSpecs[i].info;
+ log() << "compact create index " << info["key"].Obj().toString() << endl;
+ try {
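+ // inserting the spec into system.indexes triggers the index build, which consumes 'precalced' (the presorted phase-one keys) instead of rescanning the collection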
+ precalced = &phase1[i];
+ theDataFileMgr.insert(si.c_str(), info.objdata(), info.objsize());
}
- string ns = db + '.' + coll;
- assert( isANormalNSName(ns.c_str()) );
- {
- readlock lk;
- Client::Context ctx(ns);
- if( nsdetails(ns.c_str()) == 0 ) {
- errmsg = "namespace " + ns + " does not exist";
- return false;
- }
+ catch(...) {
+ precalced = 0;
+ throw;
}
- {
- scoped_lock lk(m);
- if( running ) {
- errmsg = "a compaction is already running";
- return false;
- }
- running = true;
- task::fork( new CompactJob(ns) );
- return true;
+ precalced = 0;
+ }
+
+ return true;
+ }
+
+ bool compact(const string& ns, string &errmsg, bool validate, BSONObjBuilder& result) {
+ massert( 14028, "bad ns", NamespaceString::normal(ns.c_str()) );
+ massert( 14027, "can't compact a system namespace", !str::contains(ns, ".system.") ); // items in system.indexes cannot be moved; there are pointers to those DiskLocs in NamespaceDetails
+
+ bool ok;
+ {
+ writelock lk;
+ BackgroundOperation::assertNoBgOpInProgForNs(ns.c_str());
+ Client::Context ctx(ns);
+ NamespaceDetails *d = nsdetails(ns.c_str());
+ massert( 13660, str::stream() << "namespace " << ns << " does not exist", d );
+ massert( 13661, "cannot compact capped collection", !d->capped );
+ log() << "compact " << ns << " begin" << endl;
+ try {
+ ok = _compact(ns.c_str(), d, errmsg, validate, result);
}
- errmsg = "not done";
- return false;
+ catch(...) {
+ log() << "compact " << ns << " end (with error)" << endl;
+ throw;
+ }
+ log() << "compact " << ns << " end" << endl;
}
+ return ok;
+ }
+
+ bool isCurrentlyAReplSetPrimary();
+ class CompactCmd : public Command {
+ public:
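+ // locktype NONE: compact() takes the write lock itself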
virtual LockType locktype() const { return NONE; }
virtual bool adminOnly() const { return false; }
virtual bool slaveOk() const { return true; }
+ virtual bool maintenanceMode() const { return true; }
virtual bool logTheOp() { return false; }
virtual void help( stringstream& help ) const {
- help << "compact / defragment a collection in the background, slowly, attempting to minimize disruptions to other operations\n"
- "{ compact : <collection> }";
+ help << "compact collection\n"
+ "warning: this operation blocks the server and is slow. you can cancel with cancelOp()\n"
+ "{ compact : <collection_name>, [force:true], [validate:true] }\n"
+ " force - allows to run on a replica set primary\n"
+ " validate - check records are noncorrupt before adding to newly compacting extents. slower but safer (default is true in this version)\n";
}
virtual bool requiresAuth() { return true; }
-
- /** @param webUI expose the command in the web ui as localhost:28017/<name>
- @param oldName an optional old, deprecated name for the command
- */
CompactCmd() : Command("compact") { }
+
+ virtual bool run(const string& db, BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool fromRepl) {
+ string coll = cmdObj.firstElement().valuestr();
+ if( coll.empty() || db.empty() ) {
+ errmsg = "no collection name specified";
+ return false;
+ }
+
+ if( isCurrentlyAReplSetPrimary() && !cmdObj["force"].trueValue() ) {
+ errmsg = "will not run compact on an active replica set primary as this is a slow blocking operation. use force:true to force";
+ return false;
+ }
+
+ string ns = db + '.' + coll;
+ bool validate = !cmdObj.hasElement("validate") || cmdObj["validate"].trueValue(); // default is true at the moment
+ bool ok = compact(ns, errmsg, validate, result);
+ return ok;
+ }
};
static CompactCmd compactCmd;