diff options
author | Antonin Kral <a.kral@bobek.cz> | 2011-06-18 21:24:41 +0200 |
---|---|---|
committer | Antonin Kral <a.kral@bobek.cz> | 2011-06-18 21:24:41 +0200 |
commit | 64b33ee522375a8dc15be2875dfb7db4502259b0 (patch) | |
tree | 44979e0aaf6bb576f4a737a93e071e28809b6779 | |
parent | 4d87ff4aa74d7ae975268ac43eee152dc3f5b7e9 (diff) | |
download | mongodb-64b33ee522375a8dc15be2875dfb7db4502259b0.tar.gz |
Imported Upstream version 1.8.2
74 files changed, 682 insertions, 288 deletions
diff --git a/client/clientOnly.cpp b/client/clientOnly.cpp index 726c3a9..5725e5f 100644 --- a/client/clientOnly.cpp +++ b/client/clientOnly.cpp @@ -68,5 +68,9 @@ namespace mongo { return false; } + string prettyHostName() { + assert(0); + return ""; + } } diff --git a/client/connpool.cpp b/client/connpool.cpp index a521699..23d14da 100644 --- a/client/connpool.cpp +++ b/client/connpool.cpp @@ -192,6 +192,9 @@ namespace mongo { { scoped_lock lk( _mutex ); for ( PoolMap::iterator i=_pools.begin(); i!=_pools.end(); ++i ) { + if ( i->second.numCreated() == 0 ) + continue; + string s = i->first; BSONObjBuilder temp( bb.subobjStart( s ) ); temp.append( "available" , i->second.numAvailable() ); diff --git a/client/dbclient.cpp b/client/dbclient.cpp index a68b1af..bb24199 100644 --- a/client/dbclient.cpp +++ b/client/dbclient.cpp @@ -442,15 +442,16 @@ namespace mongo { return false; } - BSONObj DBClientWithCommands::mapreduce(const string &ns, const string &jsmapf, const string &jsreducef, BSONObj query, const string& outputcolname) { + DBClientWithCommands::MROutput DBClientWithCommands::MRInline (BSON("inline" << 1)); + + BSONObj DBClientWithCommands::mapreduce(const string &ns, const string &jsmapf, const string &jsreducef, BSONObj query, MROutput output) { BSONObjBuilder b; b.append("mapreduce", nsGetCollection(ns)); b.appendCode("map", jsmapf); b.appendCode("reduce", jsreducef); if( !query.isEmpty() ) b.append("query", query); - if( !outputcolname.empty() ) - b.append("out", outputcolname); + b.append("out", output.out); BSONObj info; runCommand(nsGetDB(ns), b.done(), info); return info; diff --git a/client/dbclient.h b/client/dbclient.h index 9cb6571..9bc71fd 100644 --- a/client/dbclient.h +++ b/client/dbclient.h @@ -528,6 +528,19 @@ namespace mongo { bool setDbProfilingLevel(const string &dbname, ProfilingLevel level, BSONObj *info = 0); bool getDbProfilingLevel(const string &dbname, ProfilingLevel& level, BSONObj *info = 0); + + /** This implicitly converts from char*, string, and BSONObj to be an argument to mapreduce + You shouldn't need to explicitly construct this + */ + struct MROutput { + MROutput(const char* collection) : out(BSON("replace" << collection)) {} + MROutput(const string& collection) : out(BSON("replace" << collection)) {} + MROutput(const BSONObj& obj) : out(obj) {} + + BSONObj out; + }; + static MROutput MRInline; + /** Run a map/reduce job on the server. See http://www.mongodb.org/display/DOCS/MapReduce @@ -536,8 +549,8 @@ namespace mongo { jsmapf javascript map function code jsreducef javascript reduce function code. query optional query filter for the input - output optional permanent output collection name. if not specified server will - generate a temporary collection and return its name. + output either a string collection name or an object representing output type + if not specified uses inline output type returns a result object which contains: { result : <collection_name>, @@ -551,7 +564,7 @@ namespace mongo { result.getField("ok").trueValue() on the result to check if ok. */ - BSONObj mapreduce(const string &ns, const string &jsmapf, const string &jsreducef, BSONObj query = BSONObj(), const string& output = ""); + BSONObj mapreduce(const string &ns, const string &jsmapf, const string &jsreducef, BSONObj query = BSONObj(), MROutput output = MRInline); /** Run javascript code on the database server. dbname database SavedContext in which the code runs. The javascript variable 'db' will be assigned diff --git a/client/dbclient_rs.cpp b/client/dbclient_rs.cpp index b6ce776..ae01da3 100644 --- a/client/dbclient_rs.cpp +++ b/client/dbclient_rs.cpp @@ -74,7 +74,7 @@ namespace mongo { ReplicaSetMonitor::ReplicaSetMonitor( const string& name , const vector<HostAndPort>& servers ) - : _lock( "ReplicaSetMonitor instance" ) , _checkConnectionLock( "ReplicaSetMonitor check connection lock" ), _name( name ) , _master(-1) { + : _lock( "ReplicaSetMonitor instance" ) , _checkConnectionLock( "ReplicaSetMonitor check connection lock" ), _name( name ) , _master(-1), _nextSlave(0) { uassert( 13642 , "need at least 1 node for a replica set" , servers.size() > 0 ); @@ -85,6 +85,12 @@ namespace mongo { string errmsg; for ( unsigned i=0; i<servers.size(); i++ ) { + + bool haveAlready = false; + for ( unsigned n = 0; n < _nodes.size() && ! haveAlready; n++ ) + haveAlready = ( _nodes[n].addr == servers[i] ); + if( haveAlready ) continue; + auto_ptr<DBClientConnection> conn( new DBClientConnection( true , 0, 5.0 ) ); if (!conn->connect( servers[i] , errmsg ) ) { log(1) << "error connecting to seed " << servers[i] << ": " << errmsg << endl; @@ -221,19 +227,17 @@ namespace mongo { } HostAndPort ReplicaSetMonitor::getSlave() { - int x = rand() % _nodes.size(); - { - scoped_lock lk( _lock ); - for ( unsigned i=0; i<_nodes.size(); i++ ) { - int p = ( i + x ) % _nodes.size(); - if ( p == _master ) - continue; - if ( _nodes[p].ok ) - return _nodes[p].addr; - } + + scoped_lock lk( _lock ); + for ( unsigned i=0; i<_nodes.size(); i++ ) { + _nextSlave = ( _nextSlave + 1 ) % _nodes.size(); + if ( _nextSlave == _master ) + continue; + if ( _nodes[ _nextSlave ].ok ) + return _nodes[ _nextSlave ].addr; } - return _nodes[0].addr; + return _nodes[ 0 ].addr; } /** @@ -292,6 +296,10 @@ namespace mongo { newConn->connect( h , temp ); { scoped_lock lk( _lock ); + if ( _find_inlock( toCheck ) >= 0 ) { + // we need this check inside the lock so there isn't thread contention on adding to vector + continue; + } _nodes.push_back( Node( h , newConn ) ); } log() << "updated set (" << _name << ") to: " << getServerAddress() << endl; @@ -309,10 +317,9 @@ namespace mongo { BSONObj o; c->isMaster(isMaster, &o); - log( ! verbose ) << "ReplicaSetMonitor::_checkConnection: " << c->toString() << ' ' << o << '\n'; + log( ! verbose ) << "ReplicaSetMonitor::_checkConnection: " << c->toString() << ' ' << o << endl; // add other nodes - string maybePrimary; if ( o["hosts"].type() == Array ) { if ( o["primary"].type() == String ) maybePrimary = o["primary"].String(); @@ -394,12 +401,17 @@ namespace mongo { int ReplicaSetMonitor::_find( const string& server ) const { scoped_lock lk( _lock ); + return _find_inlock( server ); + } + + int ReplicaSetMonitor::_find_inlock( const string& server ) const { for ( unsigned i=0; i<_nodes.size(); i++ ) if ( _nodes[i].addr == server ) return i; return -1; } + int ReplicaSetMonitor::_find( const HostAndPort& server ) const { scoped_lock lk( _lock ); for ( unsigned i=0; i<_nodes.size(); i++ ) @@ -426,7 +438,7 @@ namespace mongo { DBClientConnection * DBClientReplicaSet::checkMaster() { HostAndPort h = _monitor->getMaster(); - if ( h == _masterHost ) { + if ( h == _masterHost && _master ) { // a master is selected. let's just make sure connection didn't die if ( ! _master->isFailed() ) return _master.get(); @@ -447,7 +459,7 @@ namespace mongo { DBClientConnection * DBClientReplicaSet::checkSlave() { HostAndPort h = _monitor->getSlave( _slaveHost ); - if ( h == _slaveHost ) { + if ( h == _slaveHost && _slave ) { if ( ! _slave->isFailed() ) return _slave.get(); _monitor->notifySlaveFailure( _slaveHost ); @@ -534,8 +546,8 @@ namespace mongo { try { return checkSlave()->query(ns,query,nToReturn,nToSkip,fieldsToReturn,queryOptions,batchSize); } - catch ( DBException & ) { - LOG(1) << "can't query replica set slave: " << _slaveHost << endl; + catch ( DBException &e ) { + log() << "can't query replica set slave " << i << " : " << _slaveHost << e.what() << endl; } } } @@ -552,8 +564,8 @@ namespace mongo { try { return checkSlave()->findOne(ns,query,fieldsToReturn,queryOptions); } - catch ( DBException & ) { - LOG(1) << "can't query replica set slave: " << _slaveHost << endl; + catch ( DBException &e ) { + LOG(1) << "can't findone replica set slave " << i << " : " << _slaveHost << e.what() << endl; } } } @@ -588,8 +600,8 @@ namespace mongo { *actualServer = s->getServerAddress(); return s->call( toSend , response , assertOk ); } - catch ( DBException & ) { - log(1) << "can't query replica set slave: " << _slaveHost << endl; + catch ( DBException &e ) { + LOG(1) << "can't call replica set slave " << i << " : " << _slaveHost << e.what() << endl; if ( actualServer ) *actualServer = ""; } diff --git a/client/dbclient_rs.h b/client/dbclient_rs.h index fca6e6e..e942d7b 100644 --- a/client/dbclient_rs.h +++ b/client/dbclient_rs.h @@ -124,6 +124,7 @@ namespace mongo { bool _checkConnection( DBClientConnection * c , string& maybePrimary , bool verbose ); int _find( const string& server ) const ; + int _find_inlock( const string& server ) const ; int _find( const HostAndPort& server ) const ; mutable mongo::mutex _lock; // protects _nodes @@ -147,7 +148,7 @@ namespace mongo { vector<Node> _nodes; int _master; // which node is the current master. -1 means no master is known - + int _nextSlave; // which node is the current slave static mongo::mutex _setsLock; // protects _sets static map<string,ReplicaSetMonitorPtr> _sets; // set name to Monitor diff --git a/client/examples/clientTest.cpp b/client/examples/clientTest.cpp index bd4432e..96c014e 100644 --- a/client/examples/clientTest.cpp +++ b/client/examples/clientTest.cpp @@ -224,5 +224,27 @@ int main( int argc, const char **argv ) { } } + { + //Map Reduce (this mostly just tests that it compiles with all output types) + const string ns = "test.mr"; + conn.insert(ns, BSON("a" << 1)); + conn.insert(ns, BSON("a" << 1)); + + const char* map = "function() { emit(this.a, 1); }"; + const char* reduce = "function(key, values) { return Array.sum(values); }"; + + const string outcoll = ns + ".out"; + + BSONObj out; + out = conn.mapreduce(ns, map, reduce, BSONObj()); // default to inline + //MONGO_PRINT(out); + out = conn.mapreduce(ns, map, reduce, BSONObj(), outcoll); + //MONGO_PRINT(out); + out = conn.mapreduce(ns, map, reduce, BSONObj(), outcoll.c_str()); + //MONGO_PRINT(out); + out = conn.mapreduce(ns, map, reduce, BSONObj(), BSON("reduce" << outcoll)); + //MONGO_PRINT(out); + } + cout << "client test finished!" << endl; } diff --git a/db/btree.cpp b/db/btree.cpp index d547a1b..242c534 100644 --- a/db/btree.cpp +++ b/db/btree.cpp @@ -1188,8 +1188,6 @@ namespace mongo { out() << " " << thisLoc.toString() << ".insertHere " << key.toString() << '/' << recordLoc.toString() << ' ' << lchild.toString() << ' ' << rchild.toString() << " keypos:" << keypos << endl; - DiskLoc oldLoc = thisLoc; - if ( !basicInsert(thisLoc, keypos, recordLoc, key, order) ) { thisLoc.btreemod()->split(thisLoc, keypos, recordLoc, key, order, lchild, rchild, idx); return; diff --git a/db/cmdline.cpp b/db/cmdline.cpp index 900a782..2a10fb5 100644 --- a/db/cmdline.cpp +++ b/db/cmdline.cpp @@ -20,6 +20,7 @@ #include "cmdline.h" #include "commands.h" #include "../util/processinfo.h" +#include "../util/message.h" #include "security_key.h" #ifdef _WIN32 @@ -50,6 +51,7 @@ namespace mongo { ("quiet", "quieter output") ("port", po::value<int>(&cmdLine.port), "specify port number") ("bind_ip", po::value<string>(&cmdLine.bind_ip), "comma separated list of ip addresses to listen on - all local ips by default") + ("maxConns",po::value<int>(), "max number of simultaneous connections") ("logpath", po::value<string>() , "log file to send write to instead of stdout - has to be a file, not directory" ) ("logappend" , "append to logpath instead of over-writing" ) ("pidfilepath", po::value<string>(), "full path to pidfile (if not set, no pidfile is created)") @@ -163,6 +165,19 @@ namespace mongo { cmdLine.quiet = true; } + if ( params.count( "maxConns" ) ) { + int newSize = params["maxConns"].as<int>(); + if ( newSize < 5 ) { + out() << "maxConns has to be at least 5" << endl; + dbexit( EXIT_BADOPTIONS ); + } + else if ( newSize >= 10000000 ) { + out() << "maxConns can't be greater than 10000000" << endl; + dbexit( EXIT_BADOPTIONS ); + } + connTicketHolder.resize( newSize ); + } + string logpath; #ifndef _WIN32 diff --git a/db/commands.cpp b/db/commands.cpp index 770d035..30bdc54 100644 --- a/db/commands.cpp +++ b/db/commands.cpp @@ -127,7 +127,6 @@ namespace mongo { if ( strcmp(p, ".$cmd") != 0 ) return false; bool ok = false; - bool valid = false; BSONElement e = jsobj.firstElement(); map<string,Command*>::iterator i; @@ -138,7 +137,6 @@ namespace mongo { migrated over to the command object format. */ else if ( (i = _commands->find(e.fieldName())) != _commands->end() ) { - valid = true; string errmsg; Command *c = i->second; if ( c->adminOnly() && !startsWith(ns, "admin.") ) { diff --git a/db/commands/distinct.cpp b/db/commands/distinct.cpp index 2e26bcd..7b2f6a8 100644 --- a/db/commands/distinct.cpp +++ b/db/commands/distinct.cpp @@ -109,7 +109,7 @@ namespace mongo { int now = bb.len(); - uassert(10044, "distinct too big, 4mb cap", ( now + e.size() + 1024 ) < bufSize ); + uassert(10044, "distinct too big, 16mb cap", ( now + e.size() + 1024 ) < bufSize ); arr.append( e ); BSONElement x( start + now ); diff --git a/db/commands/mr.cpp b/db/commands/mr.cpp index 16c604a..b9f5b59 100644 --- a/db/commands/mr.cpp +++ b/db/commands/mr.cpp @@ -758,7 +758,18 @@ namespace mongo { BSONObj fast_emit( const BSONObj& args ) { uassert( 10077 , "fast_emit takes 2 args" , args.nFields() == 2 ); uassert( 13069 , "an emit can't be more than half max bson size" , args.objsize() < ( BSONObjMaxUserSize / 2 ) ); - (*_tl)->emit( args ); + + if ( args.firstElement().type() == Undefined ) { + BSONObjBuilder b( args.objsize() ); + b.appendNull( "" ); + BSONObjIterator i( args ); + i.next(); + b.append( i.next() ); + (*_tl)->emit( b.obj() ); + } + else { + (*_tl)->emit( args ); + } return BSONObj(); } diff --git a/db/cursor.h b/db/cursor.h index 9797d66..d17b698 100644 --- a/db/cursor.h +++ b/db/cursor.h @@ -113,6 +113,8 @@ namespace mongo { // The implementation may return different matchers depending on the // position of the cursor. If matcher() is nonzero at the start, // matcher() should be checked each time advance() is called. + // Implementations which generate their own matcher should return this + // to avoid a matcher being set manually. virtual CoveredIndexMatcher *matcher() const { return 0; } // A convenience function for setting the value of matcher() manually @@ -639,7 +639,6 @@ int main(int argc, char* argv[]) { ("journalOptions", po::value<int>(), "journal diagnostic options") ("ipv6", "enable IPv6 support (disabled by default)") ("jsonp","allow JSONP access via http (has security implications)") - ("maxConns",po::value<int>(), "max number of simultaneous connections") ("noauth", "run without security") ("nohttpinterface", "disable http interface") ("noprealloc", "disable data file preallocation - will often hurt performance") @@ -996,18 +995,6 @@ int main(int argc, char* argv[]) { if ( params.count( "profile" ) ) { cmdLine.defaultProfile = params["profile"].as<int>(); } - if ( params.count( "maxConns" ) ) { - int newSize = params["maxConns"].as<int>(); - if ( newSize < 5 ) { - out() << "maxConns has to be at least 5" << endl; - dbexit( EXIT_BADOPTIONS ); - } - else if ( newSize >= 10000000 ) { - out() << "maxConns can't be greater than 10000000" << endl; - dbexit( EXIT_BADOPTIONS ); - } - connTicketHolder.resize( newSize ); - } if (params.count("nounixsocket")) { noUnixSocket = true; } diff --git a/db/dbcommands.cpp b/db/dbcommands.cpp index 8974bd3..cf0857a 100644 --- a/db/dbcommands.cpp +++ b/db/dbcommands.cpp @@ -94,9 +94,10 @@ namespace mongo { virtual void help( stringstream& help ) const { help << "return error status of the last operation on this connection\n" << "options:\n" - << " fsync - fsync before returning, or wait for journal commit if running with --dur\n" - << " w - await replication to w servers (including self) before returning\n" - << " wtimeout - timeout for w in milliseconds"; + << " { fsync:true } - fsync before returning, or wait for journal commit if running with --journal\n" + << " { j:true } - wait for journal commit if running with --journal\n" + << " { w:n } - await replication to n servers (including self) before returning\n" + << " { wtimeout:m} - timeout for w in m milliseconds"; } bool run(const string& dbname, BSONObj& _cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) { LastError *le = lastError.disableForCommand(); @@ -125,7 +126,17 @@ namespace mongo { } } - if ( cmdObj["fsync"].trueValue() ) { + if ( cmdObj["j"].trueValue() ) { + if( !getDur().awaitCommit() ) { + // --journal is off + result.append("jnote", "journaling not enabled on this server"); + } + if( cmdObj["fsync"].trueValue() ) { + errmsg = "fsync and j options are not used together"; + return false; + } + } + else if ( cmdObj["fsync"].trueValue() ) { Timer t; if( !getDur().awaitCommit() ) { // if get here, not running with --dur @@ -1542,7 +1553,7 @@ namespace mongo { uassert( 13049, "godinsert must specify a collection", !coll.empty() ); string ns = dbname + "." + coll; BSONObj obj = cmdObj[ "obj" ].embeddedObjectUserCheck(); - DiskLoc loc = theDataFileMgr.insertWithObjMod( ns.c_str(), obj, true ); + theDataFileMgr.insertWithObjMod( ns.c_str(), obj, true ); return true; } } cmdGodInsert; diff --git a/db/dur_commitjob.cpp b/db/dur_commitjob.cpp index 2d57ffd..c67f37c 100644 --- a/db/dur_commitjob.cpp +++ b/db/dur_commitjob.cpp @@ -200,8 +200,17 @@ namespace mongo { log() << "debug nsincecommitifneeded:" << _nSinceCommitIfNeededCall << " bytes:" << _bytes << endl; } #endif - if ( _bytes >= UncommittedBytesLimit * 3 ) { - wassert( ! "DR102 too much data written uncommitted" ); + if (_bytes > UncommittedBytesLimit * 3) { + static time_t lastComplain; + static unsigned nComplains; + // throttle logging + if( ++nComplains < 100 || time(0) - lastComplain >= 60 ) { + lastComplain = time(0); + log() << "replSet warning DR102 too much data written uncommitted " << _bytes/1000000.0 << "MB" << endl; + if( nComplains < 10 || nComplains % 10 == 0 ) { + wassert(!"replSet warning DR102 too much data written uncommitted"); + } + } } } } diff --git a/db/geo/2d.cpp b/db/geo/2d.cpp index 934ee80..d6c97f6 100644 --- a/db/geo/2d.cpp +++ b/db/geo/2d.cpp @@ -1144,6 +1144,10 @@ namespace mongo { virtual long long nscanned() { return _nscanned; } + virtual CoveredIndexMatcher *matcher() const { + return _s->_hopper->_matcher.get(); + } + shared_ptr<GeoSearch> _s; GeoHopper::Holder::iterator _cur; GeoHopper::Holder::iterator _end; @@ -1212,6 +1216,9 @@ namespace mongo { virtual DiskLoc currLoc() { assert(ok()); return _cur._loc; } virtual BSONObj currKey() const { return _cur._key; } + virtual CoveredIndexMatcher *matcher() const { + return _matcher.get(); + } virtual bool moreToDo() = 0; virtual void fillStack() = 0; diff --git a/db/index.cpp b/db/index.cpp index c696e27..218ecb3 100644 --- a/db/index.cpp +++ b/db/index.cpp @@ -127,7 +127,6 @@ namespace mongo { void getIndexChanges(vector<IndexChanges>& v, NamespaceDetails& d, BSONObj newObj, BSONObj oldObj, bool &changedId) { int z = d.nIndexesBeingBuilt(); v.resize(z); - NamespaceDetails::IndexIterator i = d.ii(); for( int i = 0; i < z; i++ ) { IndexDetails& idx = d.idx(i); BSONObj idxKey = idx.info.obj().getObjectField("key"); // eg { ts : 1 } diff --git a/db/instance.cpp b/db/instance.cpp index 3b668ee..bb2d9a5 100644 --- a/db/instance.cpp +++ b/db/instance.cpp @@ -488,7 +488,7 @@ namespace mongo { writelock lk(ns); // if this ever moves to outside of lock, need to adjust check Client::Context::_finishInit - if ( ! broadcast & handlePossibleShardedMessage( m , 0 ) ) + if ( ! broadcast && handlePossibleShardedMessage( m , 0 ) ) return; Client::Context ctx(ns); diff --git a/db/lasterror.cpp b/db/lasterror.cpp index 4643aa9..240c84b 100644 --- a/db/lasterror.cpp +++ b/db/lasterror.cpp @@ -70,8 +70,10 @@ namespace mongo { b.appendBool( "updatedExisting", updatedExisting == True ); if ( upsertedId.isSet() ) b.append( "upserted" , upsertedId ); - if ( writebackId.isSet() ) + if ( writebackId.isSet() ) { b.append( "writeback" , writebackId ); + b.append( "instanceIdent" , prettyHostName() ); // this can be any unique string + } b.appendNumber( "n", nObjects ); return ! msg.empty(); diff --git a/db/query.cpp b/db/query.cpp index 7f23ac8..671e714 100644 --- a/db/query.cpp +++ b/db/query.cpp @@ -1163,7 +1163,7 @@ namespace mongo { cc = new ClientCursor(queryOptions, multi, ns, jsobj.getOwned()); } else { - cursor->setMatcher( dqo.matcher() ); + if( ! cursor->matcher() ) cursor->setMatcher( dqo.matcher() ); cc = new ClientCursor( queryOptions, cursor, ns, jsobj.getOwned() ); } cursorid = cc->cursorid(); diff --git a/db/queryoptimizer.cpp b/db/queryoptimizer.cpp index 0b9dce7..4eb2a99 100644 --- a/db/queryoptimizer.cpp +++ b/db/queryoptimizer.cpp @@ -914,7 +914,8 @@ doneCheckOrder: } if ( !id ) { - errmsg = (string)"no index found for specified keyPattern: " + keyPattern.toString(); + errmsg = str::stream() << "no index found for specified keyPattern: " << keyPattern.toString() + << " min: " << min << " max: " << max; return 0; } diff --git a/db/queryoptimizer.h b/db/queryoptimizer.h index cf3180a..ebd264e 100644 --- a/db/queryoptimizer.h +++ b/db/queryoptimizer.h @@ -449,7 +449,8 @@ namespace mongo { auto_ptr< FieldRangeSet > frs( new FieldRangeSet( ns, query ) ); auto_ptr< FieldRangeSet > origFrs( new FieldRangeSet( *frs ) ); shared_ptr< Cursor > ret = QueryPlanSet( ns, frs, origFrs, query, sort ).getBestGuess()->newCursor(); - if ( !query.isEmpty() ) { + // If we don't already have a matcher, supply one. + if ( !query.isEmpty() && ! ret->matcher() ) { shared_ptr< CoveredIndexMatcher > matcher( new CoveredIndexMatcher( query, ret->indexKeyPattern() ) ); ret->setMatcher( matcher ); } diff --git a/db/repl/consensus.cpp b/db/repl/consensus.cpp index f764abe..dadb22e 100644 --- a/db/repl/consensus.cpp +++ b/db/repl/consensus.cpp @@ -154,6 +154,12 @@ namespace mongo { log() << "couldn't find member with id " << whoid << rsLog; vote = -10000; } + else if( primary && primary == rs._self && rs.lastOpTimeWritten >= hopeful->hbinfo().opTime ) { + // hbinfo is not updated, so we have to check the primary's last optime separately + log() << "I am already primary, " << hopeful->fullName() + << " can try again once I've stepped down" << rsLog; + vote = -10000; + } else if( primary && primary->hbinfo().opTime >= hopeful->hbinfo().opTime ) { // other members might be aware of more up-to-date nodes log() << hopeful->fullName() << " is trying to elect itself but " << diff --git a/db/repl/manager.cpp b/db/repl/manager.cpp index ed39c31..d2e0764 100644 --- a/db/repl/manager.cpp +++ b/db/repl/manager.cpp @@ -64,6 +64,9 @@ namespace mongo { void Manager::starting() { Client::initThread("rs Manager"); + if (!noauth) { + cc().getAuthenticationInfo()->authorize("local"); + } } void Manager::noteARemoteIsPrimary(const Member *m) { diff --git a/db/repl/replset_commands.cpp b/db/repl/replset_commands.cpp index dc8567a..1d110ac 100644 --- a/db/repl/replset_commands.cpp +++ b/db/repl/replset_commands.cpp @@ -274,7 +274,7 @@ namespace mongo { s << p("Not using --replSet"); else { s << p("Still starting up, or else set is not yet " + a("http://www.mongodb.org/display/DOCS/Replica+Set+Configuration#InitialSetup", "", "initiated") - + ".<br>" + ReplSet::startupStatusMsg); + + ".<br>" + ReplSet::startupStatusMsg.get()); } } else { @@ -305,7 +305,7 @@ namespace mongo { s << p("Not using --replSet"); else { s << p("Still starting up, or else set is not yet " + a("http://www.mongodb.org/display/DOCS/Replica+Set+Configuration#InitialSetup", "", "initiated") - + ".<br>" + ReplSet::startupStatusMsg); + + ".<br>" + ReplSet::startupStatusMsg.get()); } } else { diff --git a/db/repl/rs.cpp b/db/repl/rs.cpp index 90ed9f4..bbfb057 100644 --- a/db/repl/rs.cpp +++ b/db/repl/rs.cpp @@ -321,7 +321,7 @@ namespace mongo { } ReplSetImpl::StartupStatus ReplSetImpl::startupStatus = PRESTART; - string ReplSetImpl::startupStatusMsg; + DiagStr ReplSetImpl::startupStatusMsg; extern BSONObj *getLastErrorDefault; diff --git a/db/repl/rs.h b/db/repl/rs.h index 1419ad6..ea9aef1 100644 --- a/db/repl/rs.h +++ b/db/repl/rs.h @@ -244,7 +244,7 @@ namespace mongo { EMPTYUNREACHABLE=4, STARTED=5, SOON=6 }; static StartupStatus startupStatus; - static string startupStatusMsg; + static DiagStr startupStatusMsg; static string stateAsHtml(MemberState state); /* todo thread */ @@ -420,7 +420,7 @@ namespace mongo { } if( theReplSet == 0 ) { result.append("startupStatus", ReplSet::startupStatus); - errmsg = ReplSet::startupStatusMsg.empty() ? "replset unknown error 2" : ReplSet::startupStatusMsg; + errmsg = ReplSet::startupStatusMsg.empty() ? "replset unknown error 2" : ReplSet::startupStatusMsg.get(); if( ReplSet::startupStatus == 3 ) result.append("info", "run rs.initiate(...) if not yet done for the set"); return false; diff --git a/db/repl/rs_member.h b/db/repl/rs_member.h index 017b6ea..b685c04 100644 --- a/db/repl/rs_member.h +++ b/db/repl/rs_member.h @@ -19,6 +19,8 @@ #pragma once +#include "../../util/concurrency/value.h" + namespace mongo { @@ -74,7 +76,7 @@ namespace mongo { time_t upSince; long long downSince; time_t lastHeartbeat; - string lastHeartbeatMsg; + DiagStr lastHeartbeatMsg; OpTime opTime; int skew; diff --git a/db/update.cpp b/db/update.cpp index e79d3d5..e53f2af 100644 --- a/db/update.cpp +++ b/db/update.cpp @@ -987,7 +987,7 @@ namespace mongo { BSONObj newObj = mss->createNewFromMods(); checkTooLarge(newObj); assert(nsdt); - DiskLoc newLoc = theDataFileMgr.updateRecord(ns, d, nsdt, r, loc , newObj.objdata(), newObj.objsize(), debug); + theDataFileMgr.updateRecord(ns, d, nsdt, r, loc , newObj.objdata(), newObj.objsize(), debug); } if ( logop ) { diff --git a/dbtests/queryoptimizertests.cpp b/dbtests/queryoptimizertests.cpp index acf9217..2d6f752 100644 --- a/dbtests/queryoptimizertests.cpp +++ b/dbtests/queryoptimizertests.cpp @@ -1558,7 +1558,6 @@ namespace QueryOptimizerTests { theDataFileMgr.insertWithObjMod( ns(), temp ); } BSONObj hint = fromjson( "{$hint:{a:1,b:1}}" ); - BSONElement hintElt = hint.firstElement(); auto_ptr< FieldRangeSet > frs( new FieldRangeSet( ns(), fromjson( "{a:5,b:{$in:[2,3,6,9,11]}}" ) ) ); QueryPlan qp( nsd(), 1, *frs, *frs, fromjson( "{a:5,b:{$in:[2,3,6,9,11]}}" ), BSONObj() ); boost::shared_ptr<Cursor> c = qp.newCursor(); @@ -1581,7 +1580,6 @@ namespace QueryOptimizerTests { theDataFileMgr.insertWithObjMod( ns(), temp ); } BSONObj hint = fromjson( "{$hint:{a:1,b:1}}" ); - BSONElement hintElt = hint.firstElement(); auto_ptr< FieldRangeSet > frs( new FieldRangeSet( ns(), fromjson( "{a:{$gte:5},b:{$in:[2,3,6,9,11]}}" ) ) ); QueryPlan qp( nsd(), 1, *frs, *frs, fromjson( "{a:{$gte:5},b:{$in:[2,3,6,9,11]}}" ), BSONObj() ); boost::shared_ptr<Cursor> c = qp.newCursor(); diff --git a/dbtests/spin_lock_test.cpp b/dbtests/spin_lock_test.cpp index 01eb7b3..4b24aba 100644 --- a/dbtests/spin_lock_test.cpp +++ b/dbtests/spin_lock_test.cpp @@ -70,7 +70,7 @@ namespace { public: void run() { -#if defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_4) +#if defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_4) || defined(_WIN32) SpinLock spin; int counter = 0; @@ -93,10 +93,12 @@ namespace { ASSERT_EQUALS( counter, threads*incs ); #else - - // WARNING "TODO Missing spin lock in this platform." - ASSERT( true ); - + warning() << "spin lock slow on this platform" << endl; + +#if defined(__linux__) + // we don't want to have linux binaries without a fast spinlock + //ASSERT( false ); TODO SERVER-3075 +#endif #endif diff --git a/doxygenConfig b/doxygenConfig index 90bf64e..041c65f 100644 --- a/doxygenConfig +++ b/doxygenConfig @@ -3,7 +3,7 @@ #--------------------------------------------------------------------------- DOXYFILE_ENCODING = UTF-8 PROJECT_NAME = MongoDB -PROJECT_NUMBER = 1.8.1 +PROJECT_NUMBER = 1.8.2 OUTPUT_DIRECTORY = docs/doxygen CREATE_SUBDIRS = NO OUTPUT_LANGUAGE = English diff --git a/jstests/check_shard_index.js b/jstests/check_shard_index.js index a5a1fc1..e63b79d 100644 --- a/jstests/check_shard_index.js +++ b/jstests/check_shard_index.js @@ -42,4 +42,9 @@ assert.eq( 3 , f.count() , "2. count after initial insert should be 3" ); res = db.runCommand( { checkShardingIndex: "test.jstests_shardingindex" , keyPattern: {x:1, y:1} , force: true }); assert.eq( false , res.ok , "2b " + tojson(res) ); +// +res = db.runCommand( { checkShardingIndex: "test.jstests_shardingindex" , keyPattern: {_id:1} , force: true }); +assert.eq( true , res.ok , "3a " + tojson(res) ); +assert( res.idskip , "3b " + tojson(res) ) + print("PASSED"); diff --git a/jstests/mr_undef.js b/jstests/mr_undef.js new file mode 100644 index 0000000..e162f99 --- /dev/null +++ b/jstests/mr_undef.js @@ -0,0 +1,22 @@ + +t = db.mr_undef +t.drop() + +outname = "mr_undef_out" +out = db[outname] +out.drop() + +t.insert({x : 0}) + +var m = function() { emit(this.mod, this.x); } +var r = function(k,v) { total = 0; for(i in v) { total+= v[i]; } return total; } + +res = t.mapReduce(m, r, {out : outname } ) + +assert.eq( 0 , out.find( { _id : { $type : 6 } } ).itcount() , "A1" ) +assert.eq( 1 , out.find( { _id : { $type : 10 } } ).itcount() , "A2" ) + +x = out.findOne() +assert.eq( x , out.findOne( { _id : x["_id"] } ) , "A3" ) + + diff --git a/jstests/or5.js b/jstests/or5.js index baa6bd6..98ff141 100644 --- a/jstests/or5.js +++ b/jstests/or5.js @@ -70,6 +70,7 @@ assert.eq.automsg( "6", "t.find( {$or:[{a:2},{b:3},{c:4}]} ).batchSize( 2 ).itco c = t.find( {$or:[{a:2},{b:3},{c:4}]} ).batchSize( 2 ); c.next(); t.remove( {b:3} ); +db.getLastError(); assert.eq.automsg( "3", c.itcount() ); reset(); @@ -78,6 +79,7 @@ c = t.find( {$or:[{a:2},{b:3},{c:4}]} ).batchSize( 2 ); c.next(); c.next(); t.remove( {b:3} ); +db.getLastError(); assert.eq.automsg( "2", c.itcount() ); reset(); @@ -87,6 +89,7 @@ c.next(); c.next(); c.next(); t.remove( {b:3} ); +db.getLastError(); assert.eq.automsg( "3", c.itcount() ); reset(); @@ -97,6 +100,7 @@ c.next(); c.next(); c.next(); t.remove( {b:3} ); +db.getLastError(); assert.eq.automsg( "2", c.itcount() ); t.drop(); diff --git a/jstests/replsets/auth1.js b/jstests/replsets/auth1.js index 4945869..60e4b95 100644 --- a/jstests/replsets/auth1.js +++ b/jstests/replsets/auth1.js @@ -182,3 +182,15 @@ wait(function() { return results.members[3].state == 2; }); +print("make sure it has the config, too"); +assert.soon(function() { + for (var i in rs.nodes) { + rs.nodes[i].setSlaveOk(); + rs.nodes[i].getDB("admin").auth("foo","bar"); + config = rs.nodes[i].getDB("local").system.replset.findOne(); + if (config.version != 2) { + return false; + } + } + return true; + }); diff --git a/jstests/replsets/initial_sync1.js b/jstests/replsets/initial_sync1.js index ee30b4e..df978c4 100644 --- a/jstests/replsets/initial_sync1.js +++ b/jstests/replsets/initial_sync1.js @@ -114,9 +114,6 @@ wait(function() { }); -/** - * TODO: this fails on buildbot - * see SERVER-2550 print("10. Insert some stuff"); master = replTest.getMaster(); for (var i=0; i<10000; i++) { @@ -126,4 +123,4 @@ for (var i=0; i<10000; i++) { print("11. Everyone happy eventually"); replTest.awaitReplication(); -*/ + diff --git a/jstests/sharding/multi_mongos2a.js b/jstests/sharding/multi_mongos2a.js new file mode 100644 index 0000000..9b907cc --- /dev/null +++ b/jstests/sharding/multi_mongos2a.js @@ -0,0 +1,31 @@ +// multi_mongos2.js +// This tests sharding an existing collection that both shards are aware of (SERVER-2828) + + +// setup sharding with two mongos, s1 and s2 +s1 = new ShardingTest( "multi_mongos1" , 2 , 1 , 2 ); +s2 = s1._mongos[1]; + +s1.adminCommand( { enablesharding : "test" } ); +s1.adminCommand( { shardcollection : "test.foo" , key : { num : 1 } } ); + +s1.config.databases.find().forEach( printjson ) + +s1.getDB('test').existing.insert({_id:1}) +assert.eq(1, s1.getDB('test').existing.count({_id:1})); +assert.eq(1, s2.getDB('test').existing.count({_id:1})); + +s2.adminCommand( { shardcollection : "test.existing" , key : { _id : 1 } } ); +assert.eq(true, s2.getDB('test').existing.stats().sharded); + + +res = s2.getDB( "admin" ).runCommand( { moveChunk: "test.existing" , find : { _id : 1 } , to : s1.getOther( s1.getServer( "test" ) ).name } ); + +assert.eq(1 , res.ok, tojson(res)); + +s1.adminCommand( { flushRouterConfig : 1 } ) + +assert.eq(1, s1.getDB('test').existing.count({_id:1})); // SERVER-2828 +assert.eq(1, s2.getDB('test').existing.count({_id:1})); + +s1.stop(); diff --git a/jstests/sharding/shard_insert_getlasterror_w2.js b/jstests/sharding/shard_insert_getlasterror_w2.js index c722f21..5d185a5 100644 --- a/jstests/sharding/shard_insert_getlasterror_w2.js +++ b/jstests/sharding/shard_insert_getlasterror_w2.js @@ -51,7 +51,7 @@ function go() { db.foo.insert({_id:'a', x:1}); db.foo.insert({_id:'a', x:1}); var x = db.getLastErrorObj(2, 30000) - assert.neq(x.err, null, tojson(x)); + assert.neq(x.err, null, "C1 " + tojson(x)); // Add more data for (var i = N; i < 2*N; i++) { @@ -59,7 +59,7 @@ function go() { var x = db.getLastErrorObj(2, 30000) // wait to be copied to at least one secondary if (i % 30 == 0) print(i) if (i % 100 == 0 || x.err != null) printjson(x); - assert.eq(x.err, null, tojson(x)); + assert.eq(x.err, null, "C2 " + tojson(x)); } // take down the slave and make sure it fails over diff --git a/rpm/mongo.spec b/rpm/mongo.spec index 1e0cd0d..03a9bc4 100644 --- a/rpm/mongo.spec +++ b/rpm/mongo.spec @@ -1,5 +1,5 @@ Name: mongo -Version: 1.8.1 +Version: 1.8.2 Release: mongodb_1%{?dist} Summary: mongo client shell and tools License: AGPL 3.0 diff --git a/s/balance.cpp b/s/balance.cpp index ee0c992..8b01ea7 100644 --- a/s/balance.cpp +++ b/s/balance.cpp @@ -276,20 +276,21 @@ namespace mongo { try { - // first make sure we should even be running + ScopedDbConnection conn( config ); + + // ping has to be first so we keep things in the config server in sync + _ping( conn.conn() ); + + // now make sure we should even be running if ( ! grid.shouldBalance() ) { log(1) << "skipping balancing round because balancing is disabled" << endl; + conn.done(); + sleepsecs( 30 ); continue; } - - ScopedDbConnection conn( config ); - - _ping( conn.conn() ); - if ( ! _checkOIDs() ) { - uassert( 13258 , "oids broken after resetting!" , _checkOIDs() ); - } + uassert( 13258 , "oids broken after resetting!" , _checkOIDs() ); // use fresh shard state Shard::reloadShardInfo(); diff --git a/s/balancer_policy.cpp b/s/balancer_policy.cpp index 2098a1f..482fab0 100644 --- a/s/balancer_policy.cpp +++ b/s/balancer_policy.cpp @@ -40,6 +40,8 @@ namespace mongo { pair<string,unsigned> max("",0); vector<string> drainingShards; + bool maxOpsQueued = false; + for (ShardToChunksIter i = shardToChunksMap.begin(); i!=shardToChunksMap.end(); ++i ) { // Find whether this shard's capacity or availability are exhausted @@ -67,6 +69,7 @@ namespace mongo { // Draining shards take a lower priority than overloaded shards. if ( size > max.second ) { max = make_pair( shard , size ); + maxOpsQueued = opsQueued; } if ( draining && (size > 0)) { drainingShards.push_back( shard ); @@ -80,6 +83,11 @@ namespace mongo { return NULL; } + if ( maxOpsQueued ) { + log() << "biggest shard has unprocessed writebacks, waiting for completion of migrate" << endl; + return NULL; + } + log(1) << "collection : " << ns << endl; log(1) << "donor : " << max.second << " chunks on " << max.first << endl; log(1) << "receiver : " << min.second << " chunks on " << min.first << endl; diff --git a/s/chunk.cpp b/s/chunk.cpp index b2ad03d..1e473e2 100644 --- a/s/chunk.cpp +++ b/s/chunk.cpp @@ -175,7 +175,7 @@ namespace mongo { conn.done(); } - ChunkPtr Chunk::singleSplit( bool force , BSONObj& res ) { + bool Chunk::singleSplit( bool force , BSONObj& res , ChunkPtr* low, ChunkPtr* high) { vector<BSONObj> splitPoint; // if splitting is not obligatory we may return early if there are not enough data @@ -190,7 +190,7 @@ namespace mongo { // 1 split point means we have between half the chunk size to full chunk size // so we shouldn't split log(1) << "chunk not full enough to trigger auto-split" << endl; - return ChunkPtr(); + return false; } splitPoint.push_back( candidates.front() ); @@ -228,13 +228,24 @@ namespace mongo { if ( splitPoint.empty() || _min == splitPoint.front() || _max == splitPoint.front() ) { log() << "want to split chunk, but can't find split point chunk " << toString() << " got: " << ( splitPoint.empty() ? "<empty>" : splitPoint.front().toString() ) << endl; - return ChunkPtr(); + return false; } - return multiSplit( splitPoint , res ); + if (!multiSplit( splitPoint , res , true )) + return false; + + if (low && high) { + low->reset( new Chunk(_manager, _min, splitPoint[0], _shard)); + high->reset(new Chunk(_manager, splitPoint[0], _max, _shard)); + } + else { + assert(!low && !high); // can't have one without the other + } + + return true; } - ChunkPtr Chunk::multiSplit( const vector<BSONObj>& m , BSONObj& res ) { + bool Chunk::multiSplit( const vector<BSONObj>& m , BSONObj& res , bool resetIfSplit) { const size_t maxSplitPoints = 8192; uassert( 10165 , "can't split as shard doesn't have a manager" , _manager ); @@ -261,27 +272,19 @@ namespace mongo { // reloading won't stricly solve all problems, e.g. the collection's metdata lock can be taken // but we issue here so that mongos may refresh wihtout needing to be written/read against - _manager->_reload(); + grid.getDBConfig(_manager->getns())->getChunkManager(_manager->getns(), true); - return ChunkPtr(); + return false; } conn.done(); - _manager->_reload(); - // The previous multisplit logic adjusted the boundaries of 'this' chunk. Any call to 'this' object hereafter - // will see a different _max for the chunk. - // TODO Untie this dependency since, for metadata purposes, the reload() above already fixed boundaries - { - rwlock lk( _manager->_lock , true ); - - setMax(m[0].getOwned()); - DEV assert( shared_from_this() ); - _manager->_chunkMap[_max] = shared_from_this(); - } + if ( resetIfSplit ) { + // force reload of chunks + grid.getDBConfig(_manager->getns())->getChunkManager(_manager->getns(), true); + } - // return the second half, if a single split, or the first new chunk, if a multisplit. - return _manager->findChunk( m[0] ); + return true; } bool Chunk::moveAndCommit( const Shard& to , long long chunkSize /* bytes */, BSONObj& res ) { @@ -311,7 +314,7 @@ namespace mongo { // if succeeded, needs to reload to pick up the new location // if failed, mongos may be stale // reload is excessive here as the failure could be simply because collection metadata is taken - _manager->_reload(); + grid.getDBConfig(_manager->getns())->getChunkManager(_manager->getns(), true); return worked; } @@ -334,21 +337,23 @@ namespace mongo { _dataWritten = 0; // reset so we check often enough BSONObj res; - ChunkPtr newShard = singleSplit( false /* does not force a split if not enough data */ , res ); - if ( newShard.get() == NULL ) { + ChunkPtr low; + ChunkPtr high; + bool worked = singleSplit( false /* does not force a split if not enough data */ , res , &low, &high); + if ( !worked ) { // singleSplit would have issued a message if we got here _dataWritten = 0; // this means there wasn't enough data to split, so don't want to try again until considerable more data return false; } log() << "autosplitted " << _manager->getns() << " shard: " << toString() - << " on: " << newShard->getMax() << "(splitThreshold " << splitThreshold << ")" + << " on: " << low->getMax() << "(splitThreshold " << splitThreshold << ")" #ifdef _DEBUG << " size: " << getPhysicalSize() // slow - but can be usefule when debugging #endif << endl; - moveIfShould( newShard ); + low->moveIfShould( high ); return true; @@ -671,7 +676,7 @@ namespace mongo { log() << "successfully created first chunk for " << c->toString() << endl; } - ChunkPtr ChunkManager::findChunk( const BSONObj & obj , bool retry ) { + ChunkPtr ChunkManager::findChunk( const BSONObj & obj) { BSONObj key = _key.extractKey(obj); { @@ -695,20 +700,13 @@ namespace mongo { PRINT(*c); PRINT(key); - _reload_inlock(); + grid.getDBConfig(getns())->getChunkManager(getns(), true); massert(13141, "Chunk map pointed to incorrect chunk", false); } } - if ( retry ) { - stringstream ss; - ss << "couldn't find a chunk aftry retry which should be impossible extracted: " << key; - throw UserException( 8070 , ss.str() ); - } - - log() << "ChunkManager: couldn't find chunk for: " << key << " going to retry" << endl; - _reload_inlock(); - return findChunk( obj , true ); + massert(8070, str::stream() << "couldn't find a chunk aftry retry which should be impossible extracted: " << key, false); + return ChunkPtr(); // unreachable } ChunkPtr ChunkManager::findChunkOnServer( const Shard& shard ) const { @@ -874,24 +872,26 @@ namespace mongo { configServer.logChange( "dropCollection" , _ns , BSONObj() ); } - void ChunkManager::maybeChunkCollection() { + bool ChunkManager::maybeChunkCollection() { + ensureIndex_inlock(); + uassert( 13346 , "can't pre-split already splitted collection" , (_chunkMap.size() == 1) ); - + ChunkPtr soleChunk = _chunkMap.begin()->second; vector<BSONObj> splitPoints; soleChunk->pickSplitVector( splitPoints , Chunk::MaxChunkSize ); if ( splitPoints.empty() ) { log(1) << "not enough data to warrant chunking " << getns() << endl; - return; + return false; } - + BSONObj res; - ChunkPtr p; - p = soleChunk->multiSplit( splitPoints , res ); - if ( p.get() == NULL ) { + bool worked = soleChunk->multiSplit( splitPoints , res , false ); + if (!worked) { log( LL_WARNING ) << "could not split '" << getns() << "': " << res << endl; - return; + return false; } + return true; } ShardChunkVersion ChunkManager::getVersion( const Shard& shard ) const { @@ -108,18 +108,18 @@ namespace mongo { * @param force if set to true, will split the chunk regardless if the split is really necessary size wise * if set to false, will only split if the chunk has reached the currently desired maximum size * @param res the object containing details about the split execution - * @return if found a key, return a pointer to the first chunk, otherwise return a null pointer + * @return if found a key and split successfully */ - ChunkPtr singleSplit( bool force , BSONObj& res ); + bool singleSplit( bool force , BSONObj& res , ChunkPtr* low=NULL, ChunkPtr* high=NULL); /** * Splits this chunk at the given key (or keys) * * @param splitPoints the vector of keys that should be used to divide this chunk * @param res the object containing details about the split execution - * @return shared pointer to the first new Chunk or null pointer if failed + * @return if split was successful */ - ChunkPtr multiSplit( const vector<BSONObj>& splitPoints , BSONObj& res ); + bool multiSplit( const vector<BSONObj>& splitPoints , BSONObj& res , bool resetIfSplit ); /** * Asks the mongod holding this chunk to find a key that approximately divides this chunk in two @@ -308,13 +308,13 @@ namespace mongo { bool hasShardKey( const BSONObj& obj ); void createFirstChunk( const Shard& shard ); - ChunkPtr findChunk( const BSONObj& obj , bool retry = false ); + ChunkPtr findChunk( const BSONObj& obj ); ChunkPtr findChunkOnServer( const Shard& shard ) const; const ShardKeyPattern& getShardKey() const { return _key; } bool isUnique() const { return _unique; } - void maybeChunkCollection(); + bool maybeChunkCollection(); void getShardsForQuery( set<Shard>& shards , const BSONObj& query ); void getAllShards( set<Shard>& all ); diff --git a/s/client.cpp b/s/client.cpp index 95e3124..c0d25fb 100644 --- a/s/client.cpp +++ b/s/client.cpp @@ -100,7 +100,11 @@ namespace mongo { return; } - all.push_back( WBInfo( cid.numberLong() , w.OID() ) ); + string ident = ""; + if ( gle["instanceIdent"].type() == String ) + ident = gle["instanceIdent"].String(); + + all.push_back( WBInfo( WriteBackListener::ConnectionIdent( ident , cid.numberLong() ) , w.OID() ) ); } vector<BSONObj> ClientInfo::_handleWriteBacks( vector<WBInfo>& all , bool fromWriteBackListener ) { @@ -115,7 +119,7 @@ namespace mongo { } for ( unsigned i=0; i<all.size(); i++ ) { - res.push_back( WriteBackListener::waitFor( all[i].connectionId , all[i].id ) ); + res.push_back( WriteBackListener::waitFor( all[i].ident , all[i].id ) ); } return res; @@ -17,6 +17,7 @@ */ #include "../pch.h" +#include "writeback_listener.h" namespace mongo { @@ -85,8 +86,8 @@ namespace mongo { private: struct WBInfo { - WBInfo( ConnectionId c , OID o ) : connectionId( c ) , id( o ) {} - ConnectionId connectionId; + WBInfo( const WriteBackListener::ConnectionIdent& c , OID o ) : ident( c ) , id( o ) {} + WriteBackListener::ConnectionIdent ident; OID id; }; diff --git a/s/commands_admin.cpp b/s/commands_admin.cpp index 532161a..7677265 100644 --- a/s/commands_admin.cpp +++ b/s/commands_admin.cpp @@ -79,6 +79,20 @@ namespace mongo { } } netstat; + class FlushRouterConfigCmd : public GridAdminCmd { + public: + FlushRouterConfigCmd() : GridAdminCmd("flushRouterConfig") { } + virtual void help( stringstream& help ) const { + help << "flush all router config"; + } + bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool) { + grid.flushConfig(); + result.appendBool( "flushed" , true ); + return true; + } + } flushRouterConfigCmd; + + class ServerStatusCmd : public Command { public: ServerStatusCmd() : Command( "serverStatus" , true ) { @@ -512,9 +526,9 @@ namespace mongo { log() << "splitting: " << ns << " shard: " << chunk << endl; BSONObj res; - ChunkPtr p; + bool worked; if ( middle.isEmpty() ) { - p = chunk->singleSplit( true /* force a split even if not enough data */ , res ); + worked = chunk->singleSplit( true /* force a split even if not enough data */ , res ); } else { @@ -526,10 +540,10 @@ namespace mongo { vector<BSONObj> splitPoints; splitPoints.push_back( middle ); - p = chunk->multiSplit( splitPoints , res ); + worked = chunk->multiSplit( splitPoints , res , true ); } - if ( p.get() == NULL ) { + if ( !worked ) { errmsg = "split failed"; result.append( "cause" , res ); return false; diff --git a/s/commands_public.cpp b/s/commands_public.cpp index 5b1ecaf..f29205b 100644 --- a/s/commands_public.cpp +++ b/s/commands_public.cpp @@ -545,6 +545,10 @@ namespace mongo { bool ok = conn->runCommand( conf->getName() , cmdObj , res ); conn.done(); + if (!ok && res.getIntField("code") == 9996) { // code for StaleConfigException + throw StaleConfigException(fullns, "FindAndModify"); // Command code traps this and re-runs + } + result.appendElements(res); return ok; } diff --git a/s/config.cpp b/s/config.cpp index 35a3be2..9ed3207 100644 --- a/s/config.cpp +++ b/s/config.cpp @@ -143,7 +143,9 @@ namespace mongo { _save(); try { - cm->maybeChunkCollection(); + if ( cm->maybeChunkCollection() ) { + _load(); + } } catch ( UserException& e ) { // failure to chunk is not critical enough to abort the command (and undo the _save()'d configDB state) diff --git a/s/d_logic.cpp b/s/d_logic.cpp index c032883..1ab7c64 100644 --- a/s/d_logic.cpp +++ b/s/d_logic.cpp @@ -103,6 +103,7 @@ namespace mongo { b.append( "ns" , ns ); b.append( "id" , writebackID ); b.append( "connectionId" , cc().getConnectionId() ); + b.append( "instanceIdent" , prettyHostName() ); b.appendTimestamp( "version" , shardingState.getVersion( ns ) ); b.appendTimestamp( "yourVersion" , ShardedConnectionInfo::get( true )->getVersion( ns ) ); b.appendBinData( "msg" , m.header()->len , bdtCustom , (char*)(m.singleData()) ); diff --git a/s/d_migrate.cpp b/s/d_migrate.cpp index 2878276..df12e54 100644 --- a/s/d_migrate.cpp +++ b/s/d_migrate.cpp @@ -273,9 +273,12 @@ namespace mongo { void done() { readlock lk( _ns ); - _deleted.clear(); - _reload.clear(); - _cloneLocs.clear(); + { + scoped_spinlock lk( _trackerLocks ); + _deleted.clear(); + _reload.clear(); + _cloneLocs.clear(); + } _memoryUsed = 0; scoped_lock l(_m); @@ -454,6 +457,7 @@ namespace mongo { while ( cc->ok() ) { DiskLoc dl = cc->currLoc(); if ( ! isLargeChunk ) { + scoped_spinlock lk( _trackerLocks ); _cloneLocs.insert( dl ); } cc->advance(); @@ -480,7 +484,10 @@ namespace mongo { return false; } - log() << "moveChunk number of documents: " << _cloneLocs.size() << endl; + { + scoped_spinlock lk( _trackerLocks ); + log() << "moveChunk number of documents: " << _cloneLocs.size() << endl; + } return true; } @@ -490,29 +497,50 @@ namespace mongo { return false; } - readlock l( _ns ); - Client::Context ctx( _ns ); + ElapsedTracker tracker (128, 10); // same as ClientCursor::_yieldSometimesTracker - NamespaceDetails *d = nsdetails( _ns.c_str() ); - assert( d ); + int allocSize; + { + readlock l(_ns); + Client::Context ctx( _ns ); + NamespaceDetails *d = nsdetails( _ns.c_str() ); + assert( d ); + scoped_spinlock lk( _trackerLocks ); + allocSize = std::min(BSONObjMaxUserSize, (int)((12 + d->averageObjectSize()) * _cloneLocs.size())); + } + BSONArrayBuilder a (allocSize); + + while ( 1 ) { + bool filledBuffer = false; + + readlock l( _ns ); + Client::Context ctx( _ns ); + scoped_spinlock lk( _trackerLocks ); + set<DiskLoc>::iterator i = _cloneLocs.begin(); + for ( ; i!=_cloneLocs.end(); ++i ) { + if (tracker.ping()) // should I yield? + break; + + DiskLoc dl = *i; + BSONObj o = dl.obj(); + + // use the builder size instead of accumulating 'o's size so that we take into consideration + // the overhead of BSONArray indices + if ( a.len() + o.objsize() + 1024 > BSONObjMaxUserSize ) { + filledBuffer = true; // break out of outer while loop + break; + } - BSONArrayBuilder a( std::min( BSONObjMaxUserSize , (int)( ( 12 + d->averageObjectSize() )* _cloneLocs.size() ) ) ); + a.append( o ); + } - set<DiskLoc>::iterator i = _cloneLocs.begin(); - for ( ; i!=_cloneLocs.end(); ++i ) { - DiskLoc dl = *i; - BSONObj o = dl.obj(); + _cloneLocs.erase( _cloneLocs.begin() , i ); - // use the builder size instead of accumulating 'o's size so that we take into consideration - // the overhead of BSONArray indices - if ( a.len() + o.objsize() + 1024 > BSONObjMaxUserSize ) { + if ( _cloneLocs.empty() || filledBuffer ) break; - } - a.append( o ); } result.appendArray( "objects" , a.arr() ); - _cloneLocs.erase( _cloneLocs.begin() , i ); return true; } @@ -525,6 +553,11 @@ namespace mongo { if ( ! db->ownsNS( _ns ) ) return; + + // not needed right now + // but trying to prevent a future bug + scoped_spinlock lk( _trackerLocks ); + _cloneLocs.erase( dl ); } @@ -544,9 +577,13 @@ namespace mongo { BSONObj _min; BSONObj _max; + // we need the lock in case there is a malicious _migrateClone for example + // even though it shouldn't be needed under normal operation + SpinLock _trackerLocks; + // disk locs yet to be transferred from here to the other side - // no locking needed because build by 1 thread in a read lock - // depleted by 1 thread in a read lock + // no locking needed because built initially by 1 thread in a read lock + // emptied by 1 thread in a read lock // updates applied by 1 thread in a write lock set<DiskLoc> _cloneLocs; @@ -1141,6 +1178,8 @@ namespace mongo { assert( state == READY ); assert( ! min.isEmpty() ); assert( ! max.isEmpty() ); + + slaveCount = ( getSlaveCount() / 2 ) + 1; MoveTimingHelper timing( "to" , ns , min , max , 5 /* steps */ ); @@ -1236,11 +1275,32 @@ namespace mongo { break; apply( res , &lastOpApplied ); + + const int maxIterations = 3600*50; + int i; + for ( i=0;i<maxIterations; i++) { + if ( state == ABORT ) { + timing.note( "aborted" ); + return; + } + + if ( opReplicatedEnough( lastOpApplied ) ) + break; + + if ( i > 100 ) { + warning() << "secondaries having hard time keeping up with migrate" << endl; + } - if ( state == ABORT ) { - timing.note( "aborted" ); - return; + sleepmillis( 20 ); } + + if ( i == maxIterations ) { + errmsg = "secondary can't keep up with migrate"; + error() << errmsg << endl; + conn.done(); + state = FAIL; + return; + } } timing.done(4); @@ -1364,14 +1424,17 @@ namespace mongo { return didAnything; } - bool flushPendingWrites( const ReplTime& lastOpApplied ) { + bool opReplicatedEnough( const ReplTime& lastOpApplied ) { // if replication is on, try to force enough secondaries to catch up // TODO opReplicatedEnough should eventually honor priorities and geo-awareness // for now, we try to replicate to a sensible number of secondaries - const int slaveCount = getSlaveCount() / 2 + 1; - if ( ! opReplicatedEnough( lastOpApplied , slaveCount ) ) { - log( LL_WARNING ) << "migrate commit attempt timed out contacting " << slaveCount - << " slaves for '" << ns << "' " << min << " -> " << max << endl; + return mongo::opReplicatedEnough( lastOpApplied , slaveCount ); + } + + bool flushPendingWrites( const ReplTime& lastOpApplied ) { + if ( ! opReplicatedEnough( lastOpApplied ) ) { + warning() << "migrate commit attempt timed out contacting " << slaveCount + << " slaves for '" << ns << "' " << min << " -> " << max << endl; return false; } log() << "migrate commit succeeded flushing to secondaries for '" << ns << "' " << min << " -> " << max << endl; @@ -1438,6 +1501,8 @@ namespace mongo { long long clonedBytes; long long numCatchup; long long numSteady; + + int slaveCount; enum State { READY , CLONE , CATCHUP , STEADY , COMMIT_START , DONE , FAIL , ABORT } state; string errmsg; diff --git a/s/d_split.cpp b/s/d_split.cpp index 66fe38e..3ed6e9b 100644 --- a/s/d_split.cpp +++ b/s/d_split.cpp @@ -138,6 +138,11 @@ namespace mongo { const char* ns = jsobj.getStringField( "checkShardingIndex" ); BSONObj keyPattern = jsobj.getObjectField( "keyPattern" ); + if ( keyPattern.nFields() == 1 && str::equals( "_id" , keyPattern.firstElement().fieldName() ) ) { + result.appendBool( "idskip" , true ); + return true; + } + // If min and max are not provided use the "minKey" and "maxKey" for the sharding key pattern. BSONObj min = jsobj.getObjectField( "min" ); BSONObj max = jsobj.getObjectField( "max" ); @@ -211,6 +216,9 @@ namespace mongo { return false; } cc->advance(); + + if ( ! cc->yieldSometimes() ) + break; } return true; diff --git a/s/d_state.cpp b/s/d_state.cpp index 11fbcef..e10400f 100644 --- a/s/d_state.cpp +++ b/s/d_state.cpp @@ -75,7 +75,7 @@ namespace mongo { << " before [" << _shardName << "] " << " got [" << name << "] " ; - uasserted( 13298 , ss.str() ); + msgasserted( 13298 , ss.str() ); } void ShardingState::gotShardHost( string host ) { @@ -97,7 +97,7 @@ namespace mongo { << " before [" << _shardHost << "] " << " got [" << host << "] " ; - uasserted( 13299 , ss.str() ); + msgasserted( 13299 , ss.str() ); } void ShardingState::resetShardingState() { @@ -440,8 +440,14 @@ namespace mongo { return ( dbName == "local" ) || ( dbName == "admin" ) || ( dbName == "config" ); } + void Grid::flushConfig() { + scoped_lock lk( _lock ); + _databases.clear(); + } + Grid grid; + // unit tests class BalancingWindowUnitTest : public UnitTest { @@ -83,6 +83,8 @@ namespace mongo { bool shouldBalance() const; unsigned long long getNextOpTime() const; + + void flushConfig(); // exposed methods below are for testing only diff --git a/s/shard.cpp b/s/shard.cpp index dbfd8f9..c1e3b56 100644 --- a/s/shard.cpp +++ b/s/shard.cpp @@ -25,6 +25,8 @@ namespace mongo { + typedef shared_ptr<Shard> ShardPtr; + class StaticShardInfo { public: StaticShardInfo() : _mutex("StaticShardInfo") { } @@ -48,9 +50,9 @@ namespace mongo { // the config state intact. The rationale is that this way we could drop shards that // were removed without reinitializing the config DB information. - map<string,Shard>::iterator i = _lookup.find( "config" ); + ShardMap::iterator i = _lookup.find( "config" ); if ( i != _lookup.end() ) { - Shard config = i->second; + ShardPtr config = i->second; _lookup.clear(); _lookup[ "config" ] = config; } @@ -75,14 +77,14 @@ namespace mongo { isDraining = isDrainingElem.Bool(); } - Shard s( name , host , maxSize , isDraining ); + ShardPtr s( new Shard( name , host , maxSize , isDraining ) ); _lookup[name] = s; _installHost( host , s ); } } - const Shard& find( const string& ident ) { + ShardPtr find( const string& ident ) { string mykey = ident; { @@ -94,7 +96,7 @@ namespace mongo { { scoped_lock lk( _mutex ); - map<string,Shard>::iterator i = _lookup.find( mykey ); + ShardMap::iterator i = _lookup.find( mykey ); if ( i != _lookup.end() ) return i->second; @@ -104,23 +106,24 @@ namespace mongo { reload(); scoped_lock lk( _mutex ); - map<string,Shard>::iterator i = _lookup.find( mykey ); + ShardMap::iterator i = _lookup.find( mykey ); massert( 13129 , (string)"can't find shard for: " + mykey , i != _lookup.end() ); return i->second; } void set( const string& name , const Shard& s , bool setName = true , bool setAddr = true ) { scoped_lock lk( _mutex ); + ShardPtr ss( new Shard( s ) ); if ( setName ) - _lookup[name] = s; + _lookup[name] = ss; if ( setAddr ) - _installHost( s.getConnString() , s ); + _installHost( s.getConnString() , ss ); } - void _installHost( const string& host , const Shard& s ) { + void _installHost( const string& host , const ShardPtr& s ) { _lookup[host] = s; - const ConnectionString& cs = s.getAddress(); + const ConnectionString& cs = s->getAddress(); if ( cs.type() == ConnectionString::SET ) { if ( cs.getSetName().size() ) _lookup[ cs.getSetName() ] = s; @@ -134,9 +137,9 @@ namespace mongo { void remove( const string& name ) { scoped_lock lk( _mutex ); - for ( map<string,Shard>::iterator i = _lookup.begin(); i!=_lookup.end(); ) { - Shard s = i->second; - if ( s.getName() == name ) { + for ( ShardMap::iterator i = _lookup.begin(); i!=_lookup.end(); ) { + ShardPtr s = i->second; + if ( s->getName() == name ) { _lookup.erase(i++); } else { @@ -145,35 +148,49 @@ namespace mongo { } } - void getAllShards( vector<Shard>& all ) const { + void getAllShards( vector<ShardPtr>& all ) const { scoped_lock lk( _mutex ); std::set<string> seen; - for ( map<string,Shard>::const_iterator i = _lookup.begin(); i!=_lookup.end(); ++i ) { - const Shard& s = i->second; - if ( s.getName() == "config" ) + for ( ShardMap::const_iterator i = _lookup.begin(); i!=_lookup.end(); ++i ) { + const ShardPtr& s = i->second; + if ( s->getName() == "config" ) continue; - if ( seen.count( s.getName() ) ) + if ( seen.count( s->getName() ) ) continue; - seen.insert( s.getName() ); + seen.insert( s->getName() ); all.push_back( s ); } } + + void getAllShards( vector<Shard>& all ) const { + scoped_lock lk( _mutex ); + std::set<string> seen; + for ( ShardMap::const_iterator i = _lookup.begin(); i!=_lookup.end(); ++i ) { + const ShardPtr& s = i->second; + if ( s->getName() == "config" ) + continue; + if ( seen.count( s->getName() ) ) + continue; + seen.insert( s->getName() ); + all.push_back( *s ); + } + } + bool isAShardNode( const string& addr ) const { scoped_lock lk( _mutex ); // check direct nods or set names - map<string,Shard>::const_iterator i = _lookup.find( addr ); + ShardMap::const_iterator i = _lookup.find( addr ); if ( i != _lookup.end() ) return true; // check for set nodes - for ( map<string,Shard>::const_iterator i = _lookup.begin(); i!=_lookup.end(); ++i ) { + for ( ShardMap::const_iterator i = _lookup.begin(); i!=_lookup.end(); ++i ) { if ( i->first == "config" ) continue; - const Shard& s = i->second; - if ( s.containsNode( addr ) ) + if ( i->second->containsNode( addr ) ) return true; } @@ -185,8 +202,8 @@ namespace mongo { BSONObjBuilder b( _lookup.size() + 50 ); - for ( map<string,Shard>::const_iterator i = _lookup.begin(); i!=_lookup.end(); ++i ) { - b.append( i->first , i->second.getConnString() ); + for ( ShardMap::const_iterator i = _lookup.begin(); i!=_lookup.end(); ++i ) { + b.append( i->first , i->second->getConnString() ); } result.append( "map" , b.obj() ); @@ -195,7 +212,8 @@ namespace mongo { } private: - map<string,Shard> _lookup; + typedef map<string,ShardPtr> ShardMap; + ShardMap _lookup; mutable mongo::mutex _mutex; } staticShardInfo; @@ -242,14 +260,14 @@ namespace mongo { } void Shard::reset( const string& ident ) { - const Shard& s = staticShardInfo.find( ident ); - massert( 13128 , (string)"can't find shard for: " + ident , s.ok() ); - _name = s._name; - _addr = s._addr; - _cs = s._cs; + ShardPtr s = staticShardInfo.find( ident ); + massert( 13128 , (string)"can't find shard for: " + ident , s->ok() ); + _name = s->_name; + _addr = s->_addr; + _cs = s->_cs; _rsInit(); - _maxSize = s._maxSize; - _isDraining = s._isDraining; + _maxSize = s->_maxSize; + _isDraining = s->_isDraining; } bool Shard::containsNode( const string& node ) const { @@ -271,10 +289,10 @@ namespace mongo { } void Shard::printShardInfo( ostream& out ) { - vector<Shard> all; - getAllShards( all ); + vector<ShardPtr> all; + staticShardInfo.getAllShards( all ); for ( unsigned i=0; i<all.size(); i++ ) - out << all[i].toString() << "\n"; + out << all[i]->toString() << "\n"; out.flush(); } @@ -306,7 +324,7 @@ namespace mongo { } Shard Shard::pick( const Shard& current ) { - vector<Shard> all; + vector<ShardPtr> all; staticShardInfo.getAllShards( all ); if ( all.size() == 0 ) { staticShardInfo.reload(); @@ -316,13 +334,13 @@ namespace mongo { } // if current shard was provided, pick a different shard only if it is a better choice - ShardStatus best = all[0].getStatus(); + ShardStatus best = all[0]->getStatus(); if ( current != EMPTY ) { best = current.getStatus(); } for ( size_t i=0; i<all.size(); i++ ) { - ShardStatus t = all[i].getStatus(); + ShardStatus t = all[i]->getStatus(); if ( t < best ) best = t; } diff --git a/s/shard_version.cpp b/s/shard_version.cpp index 043b9bd..a189a08 100644 --- a/s/shard_version.cpp +++ b/s/shard_version.cpp @@ -97,7 +97,9 @@ namespace mongo { const bool isSharded = conf->isSharded( ns ); if ( isSharded ) { manager = conf->getChunkManager( ns , authoritative ); - officialSequenceNumber = manager->getSequenceNumber(); + // It's possible the chunk manager was reset since we checked whether sharded was true, + // so must check this here. + if( manager ) officialSequenceNumber = manager->getSequenceNumber(); } // has the ChunkManager been reloaded since the last time we updated the connection-level version? @@ -109,7 +111,7 @@ namespace mongo { ShardChunkVersion version = 0; - if ( isSharded ) { + if ( isSharded && manager ) { version = manager->getVersion( Shard::make( conn.getServerAddress() ) ); } diff --git a/s/strategy_shard.cpp b/s/strategy_shard.cpp index 2eca0c6..26ea79a 100644 --- a/s/strategy_shard.cpp +++ b/s/strategy_shard.cpp @@ -151,8 +151,10 @@ namespace mongo { // Many operations benefit from having the shard key early in the object o = manager->getShardKey().moveToFront(o); + const int maxTries = 10; + bool gotThrough = false; - for ( int i=0; i<10; i++ ) { + for ( int i=0; i<maxTries; i++ ) { try { ChunkPtr c = manager->findChunk( o ); log(4) << " server:" << c->getShard().toString() << " " << o << endl; @@ -165,7 +167,7 @@ namespace mongo { break; } catch ( StaleConfigException& ) { - log(1) << "retrying insert because of StaleConfigException: " << o << endl; + log( i < ( maxTries / 2 ) ) << "retrying insert because of StaleConfigException: " << o << endl; r.reset(); manager = r.getChunkManager(); } diff --git a/s/strategy_single.cpp b/s/strategy_single.cpp index b3b5502..3fd357a 100644 --- a/s/strategy_single.cpp +++ b/s/strategy_single.cpp @@ -88,7 +88,7 @@ namespace mongo { long long id = r.d().getInt64( 4 ); - ShardConnection conn( cursorCache.getRef( id ) , ns ); + ScopedDbConnection conn( cursorCache.getRef( id ) ); Message response; bool ok = conn->callRead( r.m() , response); diff --git a/s/writeback_listener.cpp b/s/writeback_listener.cpp index 3051013..769163e 100644 --- a/s/writeback_listener.cpp +++ b/s/writeback_listener.cpp @@ -36,7 +36,7 @@ namespace mongo { set<string> WriteBackListener::_seenSets; mongo::mutex WriteBackListener::_cacheLock("WriteBackListener"); - map<ConnectionId,WriteBackListener::WBStatus> WriteBackListener::_seenWritebacks; + map<WriteBackListener::ConnectionIdent,WriteBackListener::WBStatus> WriteBackListener::_seenWritebacks; mongo::mutex WriteBackListener::_seenWritebacksLock("WriteBackListener::seen"); WriteBackListener::WriteBackListener( const string& addr ) : _addr( addr ) { @@ -86,18 +86,18 @@ namespace mongo { } /* static */ - BSONObj WriteBackListener::waitFor( ConnectionId connectionId, const OID& oid ) { + BSONObj WriteBackListener::waitFor( const ConnectionIdent& ident, const OID& oid ) { Timer t; for ( int i=0; i<5000; i++ ) { { scoped_lock lk( _seenWritebacksLock ); - WBStatus s = _seenWritebacks[connectionId]; + WBStatus s = _seenWritebacks[ident]; if ( oid < s.id ) { // this means we're waiting for a GLE that already passed. // it should be impossible becauseonce we call GLE, no other // writebacks should happen with that connection id msgasserted( 13633 , str::stream() << "got writeback waitfor for older id " << - " oid: " << oid << " s.id: " << s.id << " connectionId: " << connectionId ); + " oid: " << oid << " s.id: " << s.id << " connection: " << ident.toString() ); } else if ( oid == s.id ) { return s.gle; @@ -142,10 +142,13 @@ namespace mongo { if ( data.getBoolField( "writeBack" ) ) { string ns = data["ns"].valuestrsafe(); - ConnectionId cid = 0; + ConnectionIdent cid( "" , 0 ); OID wid; if ( data["connectionId"].isNumber() && data["id"].type() == jstOID ) { - cid = data["connectionId"].numberLong(); + string s = ""; + if ( data["instanceIdent"].type() == String ) + s = data["instanceIdent"].String(); + cid = ConnectionIdent( s , data["connectionId"].numberLong() ); wid = data["id"].OID(); } else { @@ -226,7 +229,7 @@ namespace mongo { secsToSleep = 0; continue; } - catch ( std::exception e ) { + catch ( std::exception& e ) { if ( inShutdown() ) { // we're shutting down, so just clean up diff --git a/s/writeback_listener.h b/s/writeback_listener.h index 7335999..0125073 100644 --- a/s/writeback_listener.h +++ b/s/writeback_listener.h @@ -35,10 +35,30 @@ namespace mongo { */ class WriteBackListener : public BackgroundJob { public: + + class ConnectionIdent { + public: + ConnectionIdent( const string& ii , ConnectionId id ) + : instanceIdent( ii ) , connectionId( id ) { + } + + bool operator<(const ConnectionIdent& other) const { + if ( instanceIdent == other.instanceIdent ) + return connectionId < other.connectionId; + + return instanceIdent < other.instanceIdent; + } + + string toString() const { return str::stream() << instanceIdent << ":" << connectionId; } + + string instanceIdent; + ConnectionId connectionId; + }; + static void init( DBClientBase& conn ); static void init( const string& host ); - static BSONObj waitFor( ConnectionId connectionId, const OID& oid ); + static BSONObj waitFor( const ConnectionIdent& ident, const OID& oid ); protected: WriteBackListener( const string& addr ); @@ -59,7 +79,7 @@ namespace mongo { }; static mongo::mutex _seenWritebacksLock; // protects _seenWritbacks - static map<ConnectionId,WBStatus> _seenWritebacks; // connectionId -> last write back GLE + static map<ConnectionIdent,WBStatus> _seenWritebacks; // connectionId -> last write back GLE }; void waitForWriteback( const OID& oid ); diff --git a/scripting/engine.cpp b/scripting/engine.cpp index 60e56ae..f9be639 100644 --- a/scripting/engine.cpp +++ b/scripting/engine.cpp @@ -189,12 +189,12 @@ namespace mongo { set<string> thisTime; while ( c->more() ) { - BSONObj o = c->next(); + BSONObj o = c->nextSafe(); BSONElement n = o["_id"]; BSONElement v = o["value"]; - uassert( 10209 , "name has to be a string" , n.type() == String ); + uassert( 10209 , str::stream() << "name has to be a string: " << n , n.type() == String ); uassert( 10210 , "value has to be set" , v.type() != EOO ); setElement( n.valuestr() , v ); diff --git a/shell/shell_utils.cpp b/shell/shell_utils.cpp index 7a62030..09a3e46 100644 --- a/shell/shell_utils.cpp +++ b/shell/shell_utils.cpp @@ -265,7 +265,7 @@ namespace mongo { BSONElement e = oneArg(args); bool found = false; - path root( args.firstElement().valuestrsafe() ); + path root( e.valuestrsafe() ); if ( boost::filesystem::exists( root ) ) { found = true; boost::filesystem::remove_all( root ); diff --git a/tools/dump.cpp b/tools/dump.cpp index c99aaaa..155f84b 100644 --- a/tools/dump.cpp +++ b/tools/dump.cpp @@ -293,6 +293,8 @@ public: } } + auth("local"); + BSONObj op = conn(true).findOne(opLogName, Query().sort("$natural", -1), 0, QueryOption_SlaveOk); if (op.isEmpty()) { cout << "No operations in oplog. Please ensure you are connecting to a master." << endl; diff --git a/tools/tool.cpp b/tools/tool.cpp index f687269..54dc5df 100644 --- a/tools/tool.cpp +++ b/tools/tool.cpp @@ -400,14 +400,14 @@ namespace mongo { ProgressMeter m( fileLength ); while ( read < fileLength ) { - int readlen = fread(buf, 4, 1, file); + size_t amt = fread(buf, 1, 4, file); + assert( amt == 4 ); + int size = ((int*)buf)[0]; - if ( size >= BUF_SIZE ) { - cerr << "got an object of size: " << size << " terminating..." << endl; - } - uassert( 10264 , "invalid object size" , size < BUF_SIZE ); + uassert( 10264 , str::stream() << "invalid object size: " << size , size < BUF_SIZE ); - readlen = fread(buf+4, size-4, 1, file); + amt = fread(buf+4, 1, size-4, file); + assert( amt == (size_t)( size - 4 ) ); BSONObj o( buf ); if ( _objcheck && ! o.valid() ) { diff --git a/util/assert_util.cpp b/util/assert_util.cpp index 47be5e9..8280d8b 100644 --- a/util/assert_util.cpp +++ b/util/assert_util.cpp @@ -91,6 +91,7 @@ namespace mongo { void uasserted(int msgid, const char *msg) { assertionCount.condrollover( ++assertionCount.user ); + LOG(1) << "User Assertion: " << msgid << ":" << msg << endl; raiseError(msgid,msg); throw UserException(msgid, msg); } diff --git a/util/concurrency/spin_lock.h b/util/concurrency/spin_lock.h index d5360f7..02a8797 100644 --- a/util/concurrency/spin_lock.h +++ b/util/concurrency/spin_lock.h @@ -49,6 +49,16 @@ namespace mongo { SpinLock(SpinLock&); SpinLock& operator=(SpinLock&); }; + + struct scoped_spinlock { + scoped_spinlock( SpinLock& l ) : _l(l){ + _l.lock(); + } + ~scoped_spinlock() { + _l.unlock(); + } + SpinLock& _l; + }; } // namespace mongo diff --git a/util/concurrency/value.h b/util/concurrency/value.h index 08d5306..0a0ef85 100644 --- a/util/concurrency/value.h +++ b/util/concurrency/value.h @@ -60,28 +60,31 @@ namespace mongo { }; }; - /** this string COULD be mangled but with the double buffering, assuming writes - are infrequent, it's unlikely. thus, this is reasonable for lockless setting of - diagnostic strings, where their content isn't critical. - */ class DiagStr { - char buf1[256]; - char buf2[256]; - char *p; + string _s; + static mutex m; public: - DiagStr() { - memset(buf1, 0, 256); - memset(buf2, 0, 256); - p = buf1; + DiagStr(const DiagStr& r) : _s(r.get()) { } + DiagStr() { } + bool empty() const { + mutex::scoped_lock lk(m); + return _s.empty(); + } + string get() const { + mutex::scoped_lock lk(m); + return _s; } - - const char * get() const { return p; } void set(const char *s) { - char *q = (p==buf1) ? buf2 : buf1; - strncpy(q, s, 255); - p = q; + mutex::scoped_lock lk(m); + _s = s; + } + void set(const string& s) { + mutex::scoped_lock lk(m); + _s = s; } + operator string() const { return get(); } + void operator=(const string& s) { set(s); } }; } diff --git a/util/concurrency/vars.cpp b/util/concurrency/vars.cpp index 3d057a4..19b58eb 100644 --- a/util/concurrency/vars.cpp +++ b/util/concurrency/vars.cpp @@ -22,6 +22,8 @@ namespace mongo { + mutex DiagStr::m("diags"); + mongo::mutex _atomicMutex("_atomicMutex"); // intentional leak. otherwise destructor orders can be problematic at termination. diff --git a/util/message.cpp b/util/message.cpp index 916aa34..bcb1772 100644 --- a/util/message.cpp +++ b/util/message.cpp @@ -359,7 +359,7 @@ namespace mongo { ConnectBG(int sock, SockAddr farEnd) : _sock(sock), _farEnd(farEnd) { } void run() { _res = ::connect(_sock, _farEnd.raw(), _farEnd.addressSize); } - string name() const { return ""; /* too short lived to need to name */ } + string name() const { return "ConnectBG"; } int inError() const { return _res; } private: @@ -628,12 +628,20 @@ again: unsigned retries = 0; while( len > 0 ) { int ret = ::recv( sock , buf , len , portRecvFlags ); - if ( ret == 0 ) { + if ( ret > 0 ) { + if ( len <= 4 && ret != len ) + log(_logLevel) << "MessagingPort recv() got " << ret << " bytes wanted len=" << len << endl; + assert( ret <= len ); + len -= ret; + buf += ret; + } + else if ( ret == 0 ) { log(3) << "MessagingPort recv() conn closed? " << farEnd.toString() << endl; throw SocketException( SocketException::CLOSED ); } - if ( ret < 0 ) { + else { /* ret < 0 */ int e = errno; + #if defined(EINTR) && !defined(_WIN32) if( e == EINTR ) { if( ++retries == 1 ) { @@ -642,29 +650,18 @@ again: } } #endif - if ( e != EAGAIN || _timeout == 0 ) { - SocketException::Type t = SocketException::RECV_ERROR; -#if defined(_WINDOWS) - if( e == WSAETIMEDOUT ) t = SocketException::RECV_TIMEOUT; -#else - /* todo: what is the error code on an SO_RCVTIMEO on linux? EGAIN? EWOULDBLOCK? */ + if ( ( e == EAGAIN +#ifdef _WINDOWS + || e == WSAETIMEDOUT #endif - log(_logLevel) << "MessagingPort recv() " << errnoWithDescription(e) << " " << farEnd.toString() <<endl; - throw SocketException(t); - } - else { - if ( !serverAlive( farEnd.toString() ) ) { - log(_logLevel) << "MessagingPort recv() remote dead " << farEnd.toString() << endl; - throw SocketException( SocketException::RECV_ERROR ); - } + ) && _timeout > 0 ) { + // this is a timeout + log(_logLevel) << "MessagingPort recv() timeout " << farEnd.toString() <<endl; + throw SocketException(SocketException::RECV_TIMEOUT); } - } - else { - if ( len <= 4 && ret != len ) - log(_logLevel) << "MessagingPort recv() got " << ret << " bytes wanted len=" << len << endl; - assert( ret <= len ); - len -= ret; - buf += ret; + + log(_logLevel) << "MessagingPort recv() " << errnoWithDescription(e) << " " << farEnd.toString() <<endl; + throw SocketException(SocketException::RECV_ERROR); } } } diff --git a/util/ramlog.h b/util/ramlog.h index fc588e6..b2f3aa0 100644 --- a/util/ramlog.h +++ b/util/ramlog.h @@ -108,7 +108,6 @@ namespace mongo { vector<const char*> v; get( v ); - bool first = true; s << "<pre>\n"; for( int i = 0; i < (int)v.size(); i++ ) { assert( strlen(v[i]) > 20 ); @@ -126,7 +125,7 @@ namespace mongo { stringstream r; if( nr == 1 ) r << "repeat last line"; else r << "repeats last " << nr << " lines; ends " << string(v[last]+4,0,15); - first = false; s << html::a("", r.str(), clean(v,i,x.str())); + s << html::a("", r.str(), clean(v,i,x.str())); } else s << x.str(); s << '\n'; diff --git a/util/sock.h b/util/sock.h index 84690fe..54dfb49 100644 --- a/util/sock.h +++ b/util/sock.h @@ -199,7 +199,7 @@ namespace mongo { case AF_INET6: return memcmp(as<sockaddr_in6>().sin6_addr.s6_addr, r.as<sockaddr_in6>().sin6_addr.s6_addr, sizeof(in6_addr)) == 0; case AF_UNIX: return strcmp(as<sockaddr_un>().sun_path, r.as<sockaddr_un>().sun_path) == 0; case AF_UNSPEC: return true; // assume all unspecified addresses are the same - default: massert(SOCK_FAMILY_UNKNOWN_ERROR, "unsupported address family", false); + default: massert(SOCK_FAMILY_UNKNOWN_ERROR, "unsupported address family", false); return false; } } bool operator!=(const SockAddr& r) const { @@ -221,7 +221,7 @@ namespace mongo { case AF_INET6: return memcmp(as<sockaddr_in6>().sin6_addr.s6_addr, r.as<sockaddr_in6>().sin6_addr.s6_addr, sizeof(in6_addr)) < 0; case AF_UNIX: return strcmp(as<sockaddr_un>().sun_path, r.as<sockaddr_un>().sun_path) < 0; case AF_UNSPEC: return false; - default: massert(SOCK_FAMILY_UNKNOWN_ERROR, "unsupported address family", false); + default: massert(SOCK_FAMILY_UNKNOWN_ERROR, "unsupported address family", false); return false; } } diff --git a/util/version.cpp b/util/version.cpp index f57e256..78a31be 100644 --- a/util/version.cpp +++ b/util/version.cpp @@ -23,10 +23,11 @@ #include <string> #include "unittest.h" #include "version.h" +#include "file.h" namespace mongo { - const char versionString[] = "1.8.1"; + const char versionString[] = "1.8.2"; string mongodVersion() { stringstream ss; @@ -94,6 +95,45 @@ namespace mongo { cout << "** WARNING: You are running in OpenVZ. This is known to be broken!!!" << endl; warned = true; } + + if (boost::filesystem::exists("/sys/devices/system/node/node1")){ + // We are on a box with a NUMA enabled kernel and more than 1 numa node (they start at node0) + // Now we look at the first line of /proc/self/numa_maps + // + // Bad example: + // $ cat /proc/self/numa_maps + // 00400000 default file=/bin/cat mapped=6 N4=6 + // + // Good example: + // $ numactl --interleave=all cat /proc/self/numa_maps + // 00400000 interleave:0-7 file=/bin/cat mapped=6 N4=6 + + File f; + f.open("/proc/self/numa_maps", /*read_only*/true); + if ( f.is_open() && ! f.bad() ) { + char line[100]; //we only need the first line + f.read(0, line, sizeof(line)); + + // just in case... + line[98] = ' '; + line[99] = '\0'; + + // skip over pointer + const char* space = strchr(line, ' '); + + if ( ! space ) { + cout << "** WARNING: cannot parse numa_maps" << endl; + warned = true; + } + else if ( ! startsWith(space+1, "interleave") ) { + cout << endl; + cout << "** WARNING: You are running on a NUMA machine." << endl; + cout << "** We suggest launching mongod like this to avoid performance problems:" << endl; + cout << "** numactl --interleave=all mongod [other options]" << endl; + warned = true; + } + } + } #endif if (warned) |