diff options
113 files changed, 2034 insertions, 569 deletions
@@ -32,6 +32,16 @@ def findSettingsSetup(): sys.path.append( ".." ) sys.path.append( "../../" ) +def getThirdPartyShortNames(): + lst = [] + for x in os.listdir( "third_party" ): + if not x.endswith( ".py" ) or x.find( "#" ) >= 0: + continue + + lst.append( x.rpartition( "." )[0] ) + return lst + + # --- options ---- options = {} @@ -135,6 +145,8 @@ add_option( "staticlibpath", "comma separated list of dirs to search for staticl add_option( "boost-compiler", "compiler used for boost (gcc41)" , 1 , True , "boostCompiler" ) add_option( "boost-version", "boost version for linking(1_38)" , 1 , True , "boostVersion" ) +add_option( "no-glibc-check" , "don't check for new versions of glibc" , 0 , False ) + # experimental features add_option( "mm", "use main memory instead of memory mapped files" , 0 , True ) add_option( "asio" , "Use Asynchronous IO (NOT READY YET)" , 0 , True ) @@ -170,6 +182,11 @@ add_option( "heapcheck", "link to heap-checking malloc-lib and look for memory l add_option("smokedbprefix", "prefix to dbpath et al. 
for smoke tests", 1 , False ) +for shortName in getThirdPartyShortNames(): + add_option( "use-system-" + shortName , "use system version of library " + shortName , 0 , True ) + +add_option( "use-system-all" , "use all system libraries " + shortName , 0 , True ) + # --- environment setup --- def removeIfInList( lst , thing ): @@ -327,7 +344,7 @@ if has_option( "full" ): # ------ SOURCE FILE SETUP ----------- -commonFiles = Split( "pch.cpp buildinfo.cpp db/indexkey.cpp db/jsobj.cpp bson/oid.cpp db/json.cpp db/lasterror.cpp db/nonce.cpp db/queryutil.cpp db/querypattern.cpp db/projection.cpp shell/mongo.cpp db/security_common.cpp db/security_commands.cpp" ) +commonFiles = Split( "pch.cpp buildinfo.cpp db/indexkey.cpp db/jsobj.cpp bson/oid.cpp db/json.cpp db/lasterror.cpp db/nonce.cpp db/queryutil.cpp db/querypattern.cpp db/projection.cpp shell/mongo.cpp" ) commonFiles += [ "util/background.cpp" , "util/util.cpp" , "util/file_allocator.cpp" , "util/assert_util.cpp" , "util/log.cpp" , "util/ramlog.cpp" , "util/md5main.cpp" , "util/base64.cpp", "util/concurrency/vars.cpp", "util/concurrency/task.cpp", "util/debug_util.cpp", "util/concurrency/thread_pool.cpp", "util/password.cpp", "util/version.cpp", "util/signal_handlers.cpp", @@ -343,8 +360,9 @@ coreDbFiles = [ "db/commands.cpp" ] coreServerFiles = [ "util/net/message_server_port.cpp" , "client/parallel.cpp" , "db/common.cpp", "util/net/miniwebserver.cpp" , "db/dbwebserver.cpp" , - "db/matcher.cpp" , "db/dbcommands_generic.cpp" , "db/dbmessage.cpp" ] - + "db/matcher.cpp" , "db/dbcommands_generic.cpp" , "db/dbmessage.cpp", + "db/security_common.cpp", "db/security_commands.cpp", + ] mmapFiles = [ "util/mmap.cpp" ] if has_option( "mm" ): @@ -757,21 +775,20 @@ if not windows: keyfile = "jstests/libs/key%s" % keysuffix os.chmod( keyfile , stat.S_IWUSR|stat.S_IRUSR ) -for x in os.listdir( "third_party" ): - if not x.endswith( ".py" ) or x.find( "#" ) >= 0: - continue - - shortName = x.rpartition( "." 
)[0] - path = "third_party/%s" % x - - +moduleFiles = {} +for shortName in getThirdPartyShortNames(): + path = "third_party/%s.py" % shortName myModule = imp.load_module( "third_party_%s" % shortName , open( path , "r" ) , path , ( ".py" , "r" , imp.PY_SOURCE ) ) fileLists = { "commonFiles" : commonFiles , "serverOnlyFiles" : serverOnlyFiles , "scriptingFiles" : scriptingFiles } options_topass["windows"] = windows options_topass["nix"] = nix - myModule.configure( env , fileLists , options_topass ) + if has_option( "use-system-" + shortName ) or has_option( "use-system-all" ): + print( "using system version of: " + shortName ) + myModule.configureSystem( env , fileLists , options_topass ) + else: + myModule.configure( env , fileLists , options_topass ) coreServerFiles += scriptingFiles @@ -1131,7 +1148,7 @@ if darwin or clientEnv["_HAVEPCAP"]: sniffEnv.Append( LIBS=[ "wpcap" ] ) sniffEnv.Prepend( LIBPATH=["."] ) - sniffEnv.Append( LIBS=[ "mongotestfiles" ] ) + sniffEnv.Prepend( LIBS=[ "mongotestfiles" ] ) sniffEnv.Program( "mongosniff" , "tools/sniffer.cpp" ) @@ -1166,6 +1183,7 @@ elif not onlyServer: shellEnv = doConfigure( shellEnv , shell=True ) shellEnv.Prepend( LIBS=[ "mongoshellfiles"] ) + mongo = shellEnv.Program( "mongo" , coreShellFiles ) @@ -1431,7 +1449,7 @@ def installBinary( e , name ): if (solaris or linux) and (not has_option("nostrip")): e.AddPostAction( inst, e.Action( 'strip ' + fullInstallName ) ) - if linux and len( COMMAND_LINE_TARGETS ) == 1 and str( COMMAND_LINE_TARGETS[0] ) == "s3dist": + if not has_option( "no-glibc-check" ) and linux and len( COMMAND_LINE_TARGETS ) == 1 and str( COMMAND_LINE_TARGETS[0] ) == "s3dist": e.AddPostAction( inst , checkGlibc ) if nix: diff --git a/bson/bsonelement.h b/bson/bsonelement.h index 5487d8d..bf0ccc3 100644 --- a/bson/bsonelement.h +++ b/bson/bsonelement.h @@ -308,6 +308,8 @@ namespace mongo { bool operator==(const BSONElement& r) const { return woCompare( r , true ) == 0; } + /** Returns true if elements 
are unequal. */ + bool operator!=(const BSONElement& r) const { return !operator==(r); } /** Well ordered comparison. @return <0: l<r. 0:l==r. >0:l>r diff --git a/bson/bsonobj.h b/bson/bsonobj.h index 9e948f3..486a3e6 100644 --- a/bson/bsonobj.h +++ b/bson/bsonobj.h @@ -254,6 +254,11 @@ namespace mongo { BSONElement getFieldUsingIndexNames(const char *fieldName, const BSONObj &indexKey) const; + /** arrays are bson objects with numeric and increasing field names + @return true if field names are numeric and increasing + */ + bool couldBeArray() const; + /** @return the raw data of the object */ const char *objdata() const { return _objdata; @@ -360,6 +365,7 @@ namespace mongo { string md5() const; bool operator==( const BSONObj& other ) const { return equal( other ); } + bool operator!=(const BSONObj& other) const { return !operator==( other); } enum MatchType { Equality = 0, diff --git a/bson/bsonobjbuilder.h b/bson/bsonobjbuilder.h index 86a52ac..f89d225 100644 --- a/bson/bsonobjbuilder.h +++ b/bson/bsonobjbuilder.h @@ -469,17 +469,14 @@ namespace mongo { Use BinDataGeneral if you don't care about the type. @param data the byte array */ - BSONObjBuilder& appendBinData( const StringData& fieldName, int len, BinDataType type, const char *data ) { + BSONObjBuilder& appendBinData( const StringData& fieldName, int len, BinDataType type, const void *data ) { _b.appendNum( (char) BinData ); _b.appendStr( fieldName ); _b.appendNum( len ); _b.appendNum( (char) type ); - _b.appendBuf( (void *) data, len ); + _b.appendBuf( data, len ); return *this; } - BSONObjBuilder& appendBinData( const StringData& fieldName, int len, BinDataType type, const unsigned char *data ) { - return appendBinData(fieldName, len, type, (const char *) data); - } /** Subtype 2 is deprecated. 
@@ -487,13 +484,13 @@ namespace mongo { @param data a byte array @param len the length of data */ - BSONObjBuilder& appendBinDataArrayDeprecated( const char * fieldName , const char * data , int len ) { + BSONObjBuilder& appendBinDataArrayDeprecated( const char * fieldName , const void * data , int len ) { _b.appendNum( (char) BinData ); _b.appendStr( fieldName ); _b.appendNum( len + 4 ); _b.appendNum( (char)0x2 ); _b.appendNum( len ); - _b.appendBuf( (void *) data, len ); + _b.appendBuf( data, len ); return *this; } diff --git a/bson/stringdata.h b/bson/stringdata.h index 352dc51..1fb4e7d 100644 --- a/bson/stringdata.h +++ b/bson/stringdata.h @@ -61,7 +61,7 @@ namespace mongo { // accessors const char* data() const { return _data; } - const unsigned size() const { return _size; } + unsigned size() const { return _size; } private: const char* const _data; // is always null terminated diff --git a/bson/util/builder.h b/bson/util/builder.h index 710c2d4..f189f58 100644 --- a/bson/util/builder.h +++ b/bson/util/builder.h @@ -65,6 +65,8 @@ namespace mongo { if( p == buf ) { if( sz <= SZ ) return buf; void *d = malloc(sz); + if ( d == 0 ) + msgasserted( 15912 , "out of memory StackAllocator::Realloc" ); memcpy(d, p, SZ); return d; } @@ -113,6 +115,8 @@ namespace mongo { if ( maxSize && size > maxSize ) { al.Free(data); data = (char*)al.Malloc(maxSize); + if ( data == 0 ) + msgasserted( 15913 , "out of memory BufBuilder::reset" ); size = maxSize; } } diff --git a/client/connpool.cpp b/client/connpool.cpp index 2d7c37b..94ce4ec 100644 --- a/client/connpool.cpp +++ b/client/connpool.cpp @@ -38,7 +38,7 @@ namespace mongo { void PoolForHost::done( DBConnectionPool * pool, DBClientBase * c ) { if ( _pool.size() >= _maxPerHost ) { - pool->onDestory( c ); + pool->onDestroy( c ); delete c; } else { @@ -55,7 +55,7 @@ namespace mongo { _pool.pop(); if ( ! 
sc.ok( now ) ) { - pool->onDestory( sc.conn ); + pool->onDestroy( sc.conn ); delete sc.conn; continue; } @@ -145,9 +145,15 @@ namespace mongo { PoolForHost& p = _pools[PoolKey(host,socketTimeout)]; p.createdOne( conn ); } - - onCreate( conn ); - onHandedOut( conn ); + + try { + onCreate( conn ); + onHandedOut( conn ); + } + catch ( std::exception& e ) { + delete conn; + throw; + } return conn; } @@ -155,7 +161,13 @@ namespace mongo { DBClientBase* DBConnectionPool::get(const ConnectionString& url, double socketTimeout) { DBClientBase * c = _get( url.toString() , socketTimeout ); if ( c ) { - onHandedOut( c ); + try { + onHandedOut( c ); + } + catch ( std::exception& e ) { + delete c; + throw; + } return c; } @@ -169,7 +181,13 @@ namespace mongo { DBClientBase* DBConnectionPool::get(const string& host, double socketTimeout) { DBClientBase * c = _get( host , socketTimeout ); if ( c ) { - onHandedOut( c ); + try { + onHandedOut( c ); + } + catch ( std::exception& e ) { + delete c; + throw; + } return c; } @@ -185,7 +203,7 @@ namespace mongo { void DBConnectionPool::release(const string& host, DBClientBase *c) { if ( c->isFailed() ) { - onDestory( c ); + onDestroy( c ); delete c; return; } @@ -228,12 +246,12 @@ namespace mongo { } } - void DBConnectionPool::onDestory( DBClientBase * conn ) { + void DBConnectionPool::onDestroy( DBClientBase * conn ) { if ( _hooks->size() == 0 ) return; for ( list<DBConnectionHook*>::iterator i = _hooks->begin(); i != _hooks->end(); i++ ) { - (*i)->onDestory( conn ); + (*i)->onDestroy( conn ); } } @@ -357,7 +375,7 @@ namespace mongo { for ( size_t i=0; i<toDelete.size(); i++ ) { try { - onDestory( toDelete[i] ); + onDestroy( toDelete[i] ); delete toDelete[i]; } catch ( ... 
) { diff --git a/client/connpool.h b/client/connpool.h index a37dad7..8733abb 100644 --- a/client/connpool.h +++ b/client/connpool.h @@ -89,7 +89,7 @@ namespace mongo { virtual ~DBConnectionHook() {} virtual void onCreate( DBClientBase * conn ) {} virtual void onHandedOut( DBClientBase * conn ) {} - virtual void onDestory( DBClientBase * conn ) {} + virtual void onDestroy( DBClientBase * conn ) {} }; /** Database connection pool. @@ -119,7 +119,7 @@ namespace mongo { void onCreate( DBClientBase * conn ); void onHandedOut( DBClientBase * conn ); - void onDestory( DBClientBase * conn ); + void onDestroy( DBClientBase * conn ); void flush(); diff --git a/client/dbclient.cpp b/client/dbclient.cpp index dadf7e4..5faeccf 100644 --- a/client/dbclient.cpp +++ b/client/dbclient.cpp @@ -247,6 +247,11 @@ namespace mongo { return o["ok"].trueValue(); } + bool DBClientWithCommands::isNotMasterErrorString( const BSONElement& e ) { + return e.type() == String && str::contains( e.valuestr() , "not master" ); + } + + enum QueryOptions DBClientWithCommands::availableOptions() { if ( !_haveCachedAvailableOptions ) { BSONObj ret; @@ -599,6 +604,19 @@ namespace mongo { return true; } + + inline bool DBClientConnection::runCommand(const string &dbname, const BSONObj& cmd, BSONObj &info, int options) { + if ( DBClientWithCommands::runCommand( dbname , cmd , info , options ) ) + return true; + + if ( clientSet && isNotMasterErrorString( info["errmsg"] ) ) { + clientSet->isntMaster(); + } + + return false; + } + + void DBClientConnection::_checkConnection() { if ( !_failed ) return; @@ -982,8 +1000,7 @@ namespace mongo { if ( clientSet && nReturned ) { assert(data); BSONObj o(data); - BSONElement e = getErrField(o); - if ( e.type() == String && str::contains( e.valuestr() , "not master" ) ) { + if ( isNotMasterErrorString( getErrField(o) ) ) { clientSet->isntMaster(); } } diff --git a/client/dbclient.h b/client/dbclient.h index 2b4bb85..ea55bb4 100644 --- a/client/dbclient.h +++ 
b/client/dbclient.h @@ -721,8 +721,12 @@ namespace mongo { } protected: + /** if the result of a command is ok*/ bool isOk(const BSONObj&); + /** if the element contains a not master error */ + bool isNotMasterErrorString( const BSONElement& e ); + BSONObj _countCmd(const string &ns, const BSONObj& query, int options, int limit, int skip ); enum QueryOptions availableOptions(); @@ -892,6 +896,8 @@ namespace mongo { unsigned long long query( boost::function<void(const BSONObj&)> f, const string& ns, Query query, const BSONObj *fieldsToReturn = 0, int queryOptions = 0); unsigned long long query( boost::function<void(DBClientCursorBatchIterator&)> f, const string& ns, Query query, const BSONObj *fieldsToReturn = 0, int queryOptions = 0); + virtual bool runCommand(const string &dbname, const BSONObj& cmd, BSONObj &info, int options=0); + /** @return true if this connection is currently in a failed state. When autoreconnect is on, a connection will transition back to an ok state after reconnecting. diff --git a/client/dbclient_rs.cpp b/client/dbclient_rs.cpp index 2cab1f7..c57a52d 100644 --- a/client/dbclient_rs.cpp +++ b/client/dbclient_rs.cpp @@ -247,38 +247,27 @@ namespace mongo { } HostAndPort ReplicaSetMonitor::getSlave() { + LOG(2) << "dbclient_rs getSlave " << getServerAddress() << endl; - LOG(2) << "selecting new slave from replica set " << getServerAddress() << endl; - - // Logic is to retry three times for any secondary node, if we can't find any secondary, we'll take - // any "ok" node - // TODO: Could this query hidden nodes? 
- const int MAX = 3; - for ( int xxx=0; xxx<MAX; xxx++ ) { + scoped_lock lk( _lock ); - { - scoped_lock lk( _lock ); - - unsigned i = 0; - for ( ; i<_nodes.size(); i++ ) { - _nextSlave = ( _nextSlave + 1 ) % _nodes.size(); - if ( _nextSlave == _master ){ - LOG(2) << "not selecting " << _nodes[_nextSlave] << " as it is the current master" << endl; - continue; - } - if ( _nodes[ _nextSlave ].okForSecondaryQueries() || ( _nodes[ _nextSlave ].ok && ( xxx + 1 ) >= MAX ) ) - return _nodes[ _nextSlave ].addr; - - LOG(2) << "not selecting " << _nodes[_nextSlave] << " as it is not ok to use" << endl; - } - + for ( unsigned ii = 0; ii < _nodes.size(); ii++ ) { + _nextSlave = ( _nextSlave + 1 ) % _nodes.size(); + if ( _nextSlave != _master ) { + if ( _nodes[ _nextSlave ].okForSecondaryQueries() ) + return _nodes[ _nextSlave ].addr; + LOG(2) << "dbclient_rs getSlave not selecting " << _nodes[_nextSlave] << ", not currently okForSecondaryQueries" << endl; } + } - check(false); + if( _master >= 0 ) { + assert( static_cast<unsigned>(_master) < _nodes.size() ); + LOG(2) << "dbclient_rs getSlave no member in secondary state found, returning primary " << _nodes[ _master ] << endl; + return _nodes[_master].addr; } - LOG(2) << "no suitable slave nodes found, returning default node " << _nodes[ 0 ] << endl; - + LOG(2) << "dbclient_rs getSlave no suitable member found, returning first node " << _nodes[ 0 ] << endl; + assert( _nodes.size() > 0 ); return _nodes[0].addr; } @@ -820,10 +809,14 @@ namespace mongo { bool DBClientReplicaSet::call( Message &toSend, Message &response, bool assertOk , string * actualServer ) { + const char * ns = 0; + if ( toSend.operation() == dbQuery ) { // TODO: might be possible to do this faster by changing api DbMessage dm( toSend ); QueryMessage qm( dm ); + ns = qm.ns; + if ( qm.queryOptions & QueryOption_SlaveOk ) { for ( int i=0; i<3; i++ ) { try { @@ -844,7 +837,26 @@ namespace mongo { DBClientConnection* m = checkMaster(); if ( actualServer ) 
*actualServer = m->getServerAddress(); - return m->call( toSend , response , assertOk ); + + if ( ! m->call( toSend , response , assertOk ) ) + return false; + + if ( ns ) { + QueryResult * res = (QueryResult*)response.singleData(); + if ( res->nReturned == 1 ) { + BSONObj x(res->data() ); + if ( str::contains( ns , "$cmd" ) ) { + if ( isNotMasterErrorString( x["errmsg"] ) ) + isntMaster(); + } + else { + if ( isNotMasterErrorString( getErrField( x ) ) ) + isntMaster(); + } + } + } + + return true; } } diff --git a/client/distlock.cpp b/client/distlock.cpp index cb71159..595fc38 100644 --- a/client/distlock.cpp +++ b/client/distlock.cpp @@ -22,6 +22,7 @@ namespace mongo { LabeledLevel DistributedLock::logLvl( 1 ); + DistributedLock::LastPings DistributedLock::lastPings; ThreadLocalValue<string> distLockIds(""); @@ -84,7 +85,7 @@ namespace mongo { Date_t pingTime; try { - ScopedDbConnection conn( addr ); + ScopedDbConnection conn( addr, 30.0 ); pingTime = jsTime(); @@ -224,7 +225,7 @@ namespace mongo { string s = pingThreadId( conn, processId ); // Ignore if we already have a pinging thread for this process. 
- if ( _seen.count( s ) > 0 ) return ""; + if ( _seen.count( s ) > 0 ) return s; // Check our clock skew try { @@ -303,6 +304,18 @@ namespace mongo { log( logLvl - 1 ) << "created new distributed lock for " << name << " on " << conn << " ( lock timeout : " << _lockTimeout << ", ping interval : " << _lockPing << ", process : " << asProcess << " )" << endl; + + + } + + DistributedLock::PingData DistributedLock::LastPings::getLastPing( const ConnectionString& conn, const string& lockName ){ + scoped_lock lock( _mutex ); + return _lastPings[ std::pair< string, string >( conn.toString(), lockName ) ]; + } + + void DistributedLock::LastPings::setLastPing( const ConnectionString& conn, const string& lockName, const PingData& pd ){ + scoped_lock lock( _mutex ); + _lastPings[ std::pair< string, string >( conn.toString(), lockName ) ] = pd; } Date_t DistributedLock::getRemoteTime() { @@ -512,6 +525,7 @@ namespace mongo { unsigned long long elapsed = 0; unsigned long long takeover = _lockTimeout; + PingData _lastPingCheck = getLastPing(); log( logLvl ) << "checking last ping for lock '" << lockName << "'" << " against process " << _lastPingCheck.get<0>() << " and ping " << _lastPingCheck.get<1>() << endl; @@ -527,8 +541,7 @@ namespace mongo { if( recPingChange || recTSChange ) { // If the ping has changed since we last checked, mark the current date and time - scoped_lock lk( _mutex ); - _lastPingCheck = boost::tuple<string, Date_t, Date_t, OID>( lastPing["_id"].String().c_str(), lastPing["ping"].Date(), remote, o["ts"].OID() ); + setLastPing( PingData( lastPing["_id"].String().c_str(), lastPing["ping"].Date(), remote, o["ts"].OID() ) ); } else { @@ -540,7 +553,6 @@ namespace mongo { else elapsed = remote - _lastPingCheck.get<2>(); } - } catch( LockException& e ) { diff --git a/client/distlock.h b/client/distlock.h index 8985672..106a5d0 100644 --- a/client/distlock.h +++ b/client/distlock.h @@ -71,6 +71,22 @@ namespace mongo { static LabeledLevel logLvl; + typedef 
boost::tuple<string, Date_t, Date_t, OID> PingData; + + class LastPings { + public: + LastPings() : _mutex( "DistributedLock::LastPings" ) {} + ~LastPings(){} + + PingData getLastPing( const ConnectionString& conn, const string& lockName ); + void setLastPing( const ConnectionString& conn, const string& lockName, const PingData& pd ); + + mongo::mutex _mutex; + map< std::pair<string, string>, PingData > _lastPings; + }; + + static LastPings lastPings; + /** * The constructor does not connect to the configdb yet and constructing does not mean the lock was acquired. * Construction does trigger a lock "pinging" mechanism, though. @@ -145,16 +161,12 @@ namespace mongo { private: - void resetLastPing(){ - scoped_lock lk( _mutex ); - _lastPingCheck = boost::tuple<string, Date_t, Date_t, OID>(); - } - - mongo::mutex _mutex; + void resetLastPing(){ lastPings.setLastPing( _conn, _name, PingData() ); } + void setLastPing( const PingData& pd ){ lastPings.setLastPing( _conn, _name, pd ); } + PingData getLastPing(){ return lastPings.getLastPing( _conn, _name ); } - // Data from last check of process with ping time - boost::tuple<string, Date_t, Date_t, OID> _lastPingCheck; // May or may not exist, depending on startup + mongo::mutex _mutex; string _threadId; }; diff --git a/client/distlock_test.cpp b/client/distlock_test.cpp index 42a1c48..5f37e6b 100644 --- a/client/distlock_test.cpp +++ b/client/distlock_test.cpp @@ -195,6 +195,7 @@ namespace mongo { boost::variate_generator<boost::mt19937&, boost::uniform_int<> > randomSkew(gen, boost::uniform_int<>(0, skewRange)); boost::variate_generator<boost::mt19937&, boost::uniform_int<> > randomWait(gen, boost::uniform_int<>(1, threadWait)); boost::variate_generator<boost::mt19937&, boost::uniform_int<> > randomSleep(gen, boost::uniform_int<>(1, threadSleep)); + boost::variate_generator<boost::mt19937&, boost::uniform_int<> > randomNewLock(gen, boost::uniform_int<>(0, 3)); int skew = 0; @@ -262,7 +263,7 @@ namespace mongo { } else { 
log() << "**** Not unlocking for thread " << threadId << endl; - DistributedLock::killPinger( *myLock ); + assert( DistributedLock::killPinger( *myLock ) ); // We're simulating a crashed process... break; } @@ -274,6 +275,12 @@ namespace mongo { break; } + // Create a new lock 1/3 of the time + if( randomNewLock() > 1 ){ + lock.reset(new DistributedLock( hostConn, lockName, takeoverMS, true )); + myLock = lock.get(); + } + sleepmillis(randomSleep()); } diff --git a/client/parallel.cpp b/client/parallel.cpp index 76b0168..3a33eb5 100644 --- a/client/parallel.cpp +++ b/client/parallel.cpp @@ -67,7 +67,7 @@ namespace mongo { assert( cursor ); if ( cursor->hasResultFlag( ResultFlag_ShardConfigStale ) ) { - throw StaleConfigException( _ns , "ClusteredCursor::query" ); + throw StaleConfigException( _ns , "ClusteredCursor::_checkCursor" ); } if ( cursor->hasResultFlag( ResultFlag_ErrSet ) ) { @@ -90,7 +90,7 @@ namespace mongo { if ( conn.setVersion() ) { conn.done(); - throw StaleConfigException( _ns , "ClusteredCursor::query ShardConnection had to change" , true ); + throw StaleConfigException( _ns , "ClusteredCursor::query" , true ); } LOG(5) << "ClusteredCursor::query (" << type() << ") server:" << server @@ -490,7 +490,7 @@ namespace mongo { if ( conns[i]->setVersion() ) { conns[i]->done(); - staleConfigExs.push_back( StaleConfigException( _ns , "ClusteredCursor::query ShardConnection had to change" , true ).what() + errLoc ); + staleConfigExs.push_back( (string)"stale config detected for " + StaleConfigException( _ns , "ParallelCursor::_init" , true ).what() + errLoc ); break; } @@ -592,7 +592,7 @@ namespace mongo { // when we throw our exception allConfigStale = true; - staleConfigExs.push_back( e.what() + errLoc ); + staleConfigExs.push_back( (string)"stale config detected for " + e.what() + errLoc ); _cursors[i].reset( NULL ); conns[i]->done(); continue; diff --git a/db/cloner.cpp b/db/cloner.cpp index 8956133..f13ea52 100644 --- a/db/cloner.cpp +++ 
b/db/cloner.cpp @@ -83,6 +83,12 @@ namespace mongo { BSONElement e = i.next(); if ( e.eoo() ) break; + + // for now, skip the "v" field so that v:0 indexes will be upgraded to v:1 + if ( string("v") == e.fieldName() ) { + continue; + } + if ( string("ns") == e.fieldName() ) { uassert( 10024 , "bad ns field for index during dbcopy", e.type() == String); const char *p = strchr(e.valuestr(), '.'); diff --git a/db/commands/isself.cpp b/db/commands/isself.cpp index 5a868de..7b1cea4 100644 --- a/db/commands/isself.cpp +++ b/db/commands/isself.cpp @@ -4,6 +4,7 @@ #include "../../util/net/listen.h" #include "../commands.h" #include "../../client/dbclient.h" +#include "../security.h" #ifndef _WIN32 # ifndef __sunos__ @@ -211,6 +212,11 @@ namespace mongo { return false; } + if (!noauth && cmdLine.keyFile && + !conn.auth("local", internalSecurity.user, internalSecurity.pwd, errmsg, false)) { + return false; + } + BSONObj out; bool ok = conn.simpleCommand( "admin" , &out , "_isSelf" ); diff --git a/db/commands/mr.cpp b/db/commands/mr.cpp index 56e9770..b79e62b 100644 --- a/db/commands/mr.cpp +++ b/db/commands/mr.cpp @@ -1119,6 +1119,7 @@ namespace mongo { virtual LockType locktype() const { return NONE; } bool run(const string& dbname , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool) { + ShardedConnectionInfo::addHook(); string shardedOutputCollection = cmdObj["shardedOutputCollection"].valuestrsafe(); string postProcessCollection = cmdObj["postProcessCollection"].valuestrsafe(); bool postProcessOnly = !(postProcessCollection.empty()); @@ -708,6 +708,12 @@ int main(int argc, char* argv[]) { else { dbpath = "/data/db/"; } +#ifdef _WIN32 + if (dbpath.size() > 1 && dbpath[dbpath.size()-1] == '/') { + // size() check is for the unlikely possibility of --dbpath "/" + dbpath = dbpath.erase(dbpath.size()-1); + } +#endif if ( params.count("directoryperdb")) { directoryperdb = true; diff --git a/db/dbcommands.cpp b/db/dbcommands.cpp index 31f4b7f..b2e6218 100644 --- 
a/db/dbcommands.cpp +++ b/db/dbcommands.cpp @@ -510,9 +510,11 @@ namespace mongo { t.appendNumber( "mappedWithJournal" , m ); } - if( v - m > 5000 ) { + int overhead = v - m - connTicketHolder.used(); + + if( overhead > 4000 ) { t.append("note", "virtual minus mapped is large. could indicate a memory leak"); - log() << "warning: virtual size (" << v << "MB) - mapped size (" << m << "MB) is large. could indicate a memory leak" << endl; + log() << "warning: virtual size (" << v << "MB) - mapped size (" << m << "MB) is large (" << overhead << "MB). could indicate a memory leak" << endl; } t.done(); @@ -949,7 +951,7 @@ namespace mongo { } list<BSONObj> all; - auto_ptr<DBClientCursor> i = db.getIndexes( toDeleteNs ); + auto_ptr<DBClientCursor> i = db.query( dbname + ".system.indexes" , BSON( "ns" << toDeleteNs ) , 0 , 0 , 0 , QueryOption_SlaveOk ); BSONObjBuilder b; while ( i->more() ) { BSONObj o = i->next().removeField("v").getOwned(); @@ -1104,6 +1106,10 @@ namespace mongo { BSONObj sort = BSON( "files_id" << 1 << "n" << 1 ); shared_ptr<Cursor> cursor = bestGuessCursor(ns.c_str(), query, sort); + if ( ! 
cursor ) { + errmsg = "need an index on { files_id : 1 , n : 1 }"; + return false; + } auto_ptr<ClientCursor> cc (new ClientCursor(QueryOption_NoCursorTimeout, cursor, ns.c_str())); int n = 0; diff --git a/db/dbcommands_generic.cpp b/db/dbcommands_generic.cpp index 69b51c7..22cee22 100644 --- a/db/dbcommands_generic.cpp +++ b/db/dbcommands_generic.cpp @@ -51,7 +51,8 @@ namespace mongo { public: CmdBuildInfo() : Command( "buildInfo", true, "buildinfo" ) {} virtual bool slaveOk() const { return true; } - virtual bool adminOnly() const { return true; } + virtual bool adminOnly() const { return false; } + virtual bool requiresAuth() { return false; } virtual LockType locktype() const { return NONE; } virtual void help( stringstream &help ) const { help << "get version #, etc.\n"; diff --git a/db/dbhelpers.cpp b/db/dbhelpers.cpp index cc4fdba..33ac9b7 100644 --- a/db/dbhelpers.cpp +++ b/db/dbhelpers.cpp @@ -157,6 +157,7 @@ namespace mongo { } DiskLoc Helpers::findById(NamespaceDetails *d, BSONObj idquery) { + assert(d); int idxNo = d->findIdIndex(); uassert(13430, "no _id index", idxNo>=0); IndexDetails& i = d->idx( idxNo ); diff --git a/db/geo/2d.cpp b/db/geo/2d.cpp index b873490..40df5e2 100644 --- a/db/geo/2d.cpp +++ b/db/geo/2d.cpp @@ -2647,7 +2647,10 @@ namespace mongo { BSONObjBuilder bb( arr.subobjStart( BSONObjBuilder::numStr( x++ ) ) ); bb.append( "dis" , dis ); - if( includeLocs ) bb.append( "loc" , p._pt ); + if( includeLocs ){ + if( p._pt.couldBeArray() ) bb.append( "loc", BSONArray( p._pt ) ); + else bb.append( "loc" , p._pt ); + } bb.append( "obj" , p._o ); bb.done(); } diff --git a/db/instance.cpp b/db/instance.cpp index 6727867..764571d 100644 --- a/db/instance.cpp +++ b/db/instance.cpp @@ -353,20 +353,19 @@ namespace mongo { } currentOp.ensureStarted(); currentOp.done(); - int ms = currentOp.totalTimeMillis(); + debug.executionTime = currentOp.totalTimeMillis(); //DEV log = true; - if ( log || ms > logThreshold ) { - if( logLevel < 3 && op == dbGetMore 
&& strstr(ns, ".oplog.") && ms < 4300 && !log ) { + if ( log || debug.executionTime > logThreshold ) { + if( logLevel < 3 && op == dbGetMore && strstr(ns, ".oplog.") && debug.executionTime < 4300 && !log ) { /* it's normal for getMore on the oplog to be slow because of use of awaitdata flag. */ } else { - debug.executionTime = ms; mongo::tlog() << debug << endl; } } - if ( currentOp.shouldDBProfile( ms ) ) { + if ( currentOp.shouldDBProfile( debug.executionTime ) ) { // performance profiling is on if ( dbMutex.getState() < 0 ) { mongo::log(1) << "note: not profiling because recursive read lock" << endl; diff --git a/db/jsobj.cpp b/db/jsobj.cpp index dcb7744..9644a87 100644 --- a/db/jsobj.cpp +++ b/db/jsobj.cpp @@ -753,6 +753,21 @@ namespace mongo { return n; } + bool BSONObj::couldBeArray() const { + BSONObjIterator i( *this ); + int index = 0; + while( i.moreWithEOO() ){ + BSONElement e = i.next(); + if( e.eoo() ) break; + + // TODO: If actually important, may be able to do int->char* much faster + if( strcmp( e.fieldName(), ((string)( str::stream() << index )).c_str() ) != 0 ) + return false; + index++; + } + return true; + } + BSONObj BSONObj::clientReadable() const { BSONObjBuilder b; BSONObjIterator i( *this ); diff --git a/db/oplog.cpp b/db/oplog.cpp index dc9db76..5c1671c 100644 --- a/db/oplog.cpp +++ b/db/oplog.cpp @@ -625,9 +625,47 @@ namespace mongo { } } - void applyOperation_inlock(const BSONObj& op , bool fromRepl ) { + bool shouldRetry(const BSONObj& o, const string& hn) { + OplogReader missingObjReader; + + // we don't have the object yet, which is possible on initial sync. get it. 
+ log() << "replication info adding missing object" << endl; // rare enough we can log + uassert(15916, str::stream() << "Can no longer connect to initial sync source: " << hn, missingObjReader.connect(hn)); + + const char *ns = o.getStringField("ns"); + // might be more than just _id in the update criteria + BSONObj query = BSONObjBuilder().append(o.getObjectField("o2")["_id"]).obj(); + BSONObj missingObj; + try { + missingObj = missingObjReader.findOne(ns, query); + } catch(DBException& e) { + log() << "replication assertion fetching missing object: " << e.what() << endl; + throw; + } + + if( missingObj.isEmpty() ) { + log() << "replication missing object not found on source. presumably deleted later in oplog" << endl; + log() << "replication o2: " << o.getObjectField("o2").toString() << endl; + log() << "replication o firstfield: " << o.getObjectField("o").firstElementFieldName() << endl; + + return false; + } + else { + Client::Context ctx(ns); + DiskLoc d = theDataFileMgr.insert(ns, (void*) missingObj.objdata(), missingObj.objsize()); + uassert(15917, "Got bad disk location when attempting to insert", !d.isNull()); + + return true; + } + } + + /** @param fromRepl false if from ApplyOpsCmd + @return true if was and update should have happened and the document DNE. see replset initial sync code. + */ + bool applyOperation_inlock(const BSONObj& op , bool fromRepl ) { assertInWriteLock(); LOG(6) << "applying op: " << op << endl; + bool failedUpdate = false; OpCounters * opCounters = fromRepl ? &replOpCounters : &globalOpCounters; @@ -680,9 +718,45 @@ namespace mongo { } else if ( *opType == 'u' ) { opCounters->gotUpdate(); + // dm do we create this for a capped collection? + // - if not, updates would be slow + // - but if were by id would be slow on primary too so maybe ok + // - if on primary was by another key and there are other indexes, this could be very bad w/out an index + // - if do create, odd to have on secondary but not primary. 
also can cause secondary to block for + // quite a while on creation. RARELY ensureHaveIdIndex(ns); // otherwise updates will be super slow OpDebug debug; - updateObjects(ns, o, op.getObjectField("o2"), /*upsert*/ fields[3].booleanSafe(), /*multi*/ false, /*logop*/ false , debug ); + BSONObj updateCriteria = op.getObjectField("o2"); + bool upsert = fields[3].booleanSafe(); + UpdateResult ur = updateObjects(ns, o, updateCriteria, upsert, /*multi*/ false, /*logop*/ false , debug ); + if( ur.num == 0 ) { + if( ur.mod ) { + if( updateCriteria.nFields() == 1 ) { + // was a simple { _id : ... } update criteria + failedUpdate = true; + // todo: probably should assert in these failedUpdate cases if not in initialSync + } + // need to check to see if it isn't present so we can set failedUpdate correctly. + // note that adds some overhead for this extra check in some cases, such as an updateCriteria + // of the form + // { _id:..., { x : {$size:...} } + // thus this is not ideal. + else if( nsdetails(ns) == NULL || Helpers::findById(nsdetails(ns), updateCriteria).isNull() ) { + failedUpdate = true; + } + else { + // it's present; zero objects were updated because of additional specifiers in the query for idempotence + } + } + else { + // this could happen benignly on an oplog duplicate replay of an upsert + // (because we are idempotent), + // if an regular non-mod update fails the item is (presumably) missing. + if( !upsert ) { + failedUpdate = true; + } + } + } } else if ( *opType == 'd' ) { opCounters->gotDelete(); @@ -703,7 +777,7 @@ namespace mongo { else { throw MsgAssertionException( 14825 , ErrorMsg("error in applyOperation : unknown opType ", *opType) ); } - + return failedUpdate; } class ApplyOpsCmd : public Command { @@ -129,6 +129,12 @@ namespace mongo { * take an op and apply locally * used for applying from an oplog * @param fromRepl really from replication or for testing/internal/command/etc... 
+ * Returns if the op was an update that could not be applied (true on failure) */ - void applyOperation_inlock(const BSONObj& op , bool fromRepl = true ); + bool applyOperation_inlock(const BSONObj& op , bool fromRepl = true ); + + /** + * If applyOperation_inlock should be called again after an update fails. + */ + bool shouldRetry(const BSONObj& op , const string& hn); } diff --git a/db/ops/update.cpp b/db/ops/update.cpp index fd9798a..6a7aad4 100644 --- a/db/ops/update.cpp +++ b/db/ops/update.cpp @@ -1354,7 +1354,8 @@ namespace mongo { logOp( "i", ns, no ); return UpdateResult( 0 , 0 , 1 , no ); } - return UpdateResult( 0 , 0 , 0 ); + + return UpdateResult( 0 , isOperatorUpdate , 0 ); } UpdateResult updateObjects(const char *ns, const BSONObj& updateobj, BSONObj patternOrig, bool upsert, bool multi, bool logop , OpDebug& debug ) { diff --git a/db/queryutil.h b/db/queryutil.h index 104cde2..5d86194 100644 --- a/db/queryutil.h +++ b/db/queryutil.h @@ -328,7 +328,7 @@ namespace mongo { bool matchesElement( const BSONElement &e, int i, bool direction ) const; bool matchesKey( const BSONObj &key ) const; vector<FieldRange> _ranges; - const IndexSpec &_indexSpec; + IndexSpec _indexSpec; int _direction; vector<BSONObj> _queries; // make sure mem owned friend class FieldRangeVectorIterator; diff --git a/db/record.cpp b/db/record.cpp index 51dc520..a8a3e43 100644 --- a/db/record.cpp +++ b/db/record.cpp @@ -112,7 +112,8 @@ namespace mongo { class Rolling { public: - Rolling() { + Rolling() + : _lock( "ps::Rolling" ){ _curSlice = 0; _lastRotate = Listener::getElapsedTimeMillis(); } @@ -126,8 +127,8 @@ namespace mongo { bool access( size_t region , short offset , bool doHalf ) { int regionHash = hash(region); - scoped_spinlock lk( _lock ); - + SimpleMutex::scoped_lock lk( _lock ); + static int rarely_count = 0; if ( rarely_count++ % 2048 == 0 ) { long long now = Listener::getElapsedTimeMillis(); @@ -174,7 +175,7 @@ namespace mongo { long long _lastRotate; Slice 
_slices[NumSlices]; - SpinLock _lock; + SimpleMutex _lock; } rolling; } diff --git a/db/repl.cpp b/db/repl.cpp index a18d725..5edf0c2 100644 --- a/db/repl.cpp +++ b/db/repl.cpp @@ -508,12 +508,12 @@ namespace mongo { return; } - + DatabaseIgnorer ___databaseIgnorer; - + void DatabaseIgnorer::doIgnoreUntilAfter( const string &db, const OpTime &futureOplogTime ) { if ( futureOplogTime > _ignores[ db ] ) { - _ignores[ db ] = futureOplogTime; + _ignores[ db ] = futureOplogTime; } } @@ -533,28 +533,28 @@ namespace mongo { bool ReplSource::handleDuplicateDbName( const BSONObj &op, const char *ns, const char *db ) { if ( dbHolder.isLoaded( ns, dbpath ) ) { // Database is already present. - return true; + return true; } BSONElement ts = op.getField( "ts" ); if ( ( ts.type() == Date || ts.type() == Timestamp ) && ___databaseIgnorer.ignoreAt( db, ts.date() ) ) { // Database is ignored due to a previous indication that it is // missing from master after optime "ts". - return false; + return false; } if ( Database::duplicateUncasedName( db, dbpath ).empty() ) { // No duplicate database names are present. return true; } - + OpTime lastTime; bool dbOk = false; { dbtemprelease release; - + // We always log an operation after executing it (never before), so // a database list will always be valid as of an oplog entry generated // before it was retrieved. - + BSONObj last = oplogReader.findOne( this->ns().c_str(), Query().sort( BSON( "$natural" << -1 ) ) ); if ( !last.isEmpty() ) { BSONElement ts = last.getField( "ts" ); @@ -568,34 +568,34 @@ namespace mongo { BSONObjIterator i( info.getField( "databases" ).embeddedObject() ); while( i.more() ) { BSONElement e = i.next(); - + const char * name = e.embeddedObject().getField( "name" ).valuestr(); if ( strcasecmp( name, db ) != 0 ) continue; - + if ( strcmp( name, db ) == 0 ) { // The db exists on master, still need to check that no conflicts exist there. 
dbOk = true; continue; } - + // The master has a db name that conflicts with the requested name. dbOk = false; break; } } - + if ( !dbOk ) { ___databaseIgnorer.doIgnoreUntilAfter( db, lastTime ); incompleteCloneDbs.erase(db); addDbNextPass.erase(db); - return false; + return false; } - + // Check for duplicates again, since we released the lock above. set< string > duplicates; Database::duplicateUncasedName( db, dbpath, &duplicates ); - + // The database is present on the master and no conflicting databases // are present on the master. Drop any local conflicts. for( set< string >::const_iterator i = duplicates.begin(); i != duplicates.end(); ++i ) { @@ -605,7 +605,7 @@ namespace mongo { Client::Context ctx(*i); dropDatabase(*i); } - + massert( 14034, "Duplicate database names present after attempting to delete duplicates", Database::duplicateUncasedName( db, dbpath ).empty() ); return true; @@ -613,7 +613,11 @@ namespace mongo { void ReplSource::applyOperation(const BSONObj& op) { try { - applyOperation_inlock( op ); + bool failedUpdate = applyOperation_inlock( op ); + if (failedUpdate && shouldRetry(op, hostName)) { + failedUpdate = applyOperation_inlock( op ); + uassert(15914, "Failure retrying initial sync update", ! failedUpdate ); + } } catch ( UserException& e ) { log() << "sync: caught user assertion " << e << " while applying op: " << op << endl;; @@ -705,9 +709,9 @@ namespace mongo { } if ( !handleDuplicateDbName( op, ns, clientName ) ) { - return; + return; } - + Client::Context ctx( ns ); ctx.getClient()->curop()->reset(); @@ -943,7 +947,7 @@ namespace mongo { } // otherwise, break out of loop so we can set to completed or clone more dbs } - + if( oplogReader.awaitCapable() && tailing ) okResultCode = 0; // don't sleep syncedTo = nextOpTime; @@ -1077,7 +1081,7 @@ namespace mongo { BSONObj me; { - + dblock l; // local.me is an identifier for a server for getLastError w:2+ if ( ! 
Helpers::getSingleton( "local.me" , me ) || @@ -1123,7 +1127,7 @@ namespace mongo { } return true; } - + bool OplogReader::connect(string hostName) { if (conn() != 0) { return true; @@ -122,11 +122,11 @@ namespace mongo { * @return true iff an op with the specified ns may be applied. */ bool handleDuplicateDbName( const BSONObj &op, const char *ns, const char *db ); - + public: OplogReader oplogReader; - static void applyOperation(const BSONObj& op); + void applyOperation(const BSONObj& op); string hostName; // ip addr or hostname plus optionally, ":<port>" string _sourceName; // a logical source name. string sourceName() const { return _sourceName.empty() ? "main" : _sourceName; } diff --git a/db/repl/connections.h b/db/repl/connections.h index 78cfb30..61c581b 100644 --- a/db/repl/connections.h +++ b/db/repl/connections.h @@ -47,6 +47,10 @@ namespace mongo { ~ScopedConn() { // conLock releases... } + void reconnect() { + conn()->port().shutdown(); + connect(); + } /* If we were to run a query and not exhaust the cursor, future use of the connection would be problematic. So here what we do is wrapper known safe methods and not allow cursor-style queries at all. This makes @@ -61,9 +65,6 @@ namespace mongo { BSONObj findOne(const string &ns, const Query& q, const BSONObj *fieldsToReturn = 0, int queryOptions = 0) { return conn()->findOne(ns, q, fieldsToReturn, queryOptions); } - void setTimeout(double to) { - conn()->setSoTimeout(to); - } private: auto_ptr<scoped_lock> connLock; @@ -78,15 +79,36 @@ namespace mongo { typedef map<string,ScopedConn::X*> M; static M& _map; DBClientConnection* conn() { return &x->cc; } + const string _hostport; + + // we should already be locked... + bool connect() { + string err; + if (!x->cc.connect(_hostport, err)) { + log() << "couldn't connect to " << _hostport << ": " << err << rsLog; + return false; + } + + // if we cannot authenticate against a member, then either its key file + // or our key file has to change. 
if our key file has to change, we'll + // be rebooting. if their file has to change, they'll be rebooted so the + // connection created above will go dead, reconnect, and reauth. + if (!noauth && !x->cc.auth("local", internalSecurity.user, internalSecurity.pwd, err, false)) { + log() << "could not authenticate against " << _hostport << ", " << err << rsLog; + return false; + } + + return true; + } }; - inline ScopedConn::ScopedConn(string hostport) { + inline ScopedConn::ScopedConn(string hostport) : _hostport(hostport) { bool first = false; { scoped_lock lk(mapMutex); - x = _map[hostport]; + x = _map[_hostport]; if( x == 0 ) { - x = _map[hostport] = new X(); + x = _map[_hostport] = new X(); first = true; connLock.reset( new scoped_lock(x->z) ); } @@ -96,17 +118,7 @@ namespace mongo { return; } - // we already locked above... - string err; - if (!x->cc.connect(hostport, err)) { - log() << "couldn't connect to " << hostport << ": " << err << rsLog; - return; - } - - if (!noauth && !x->cc.auth("local", internalSecurity.user, internalSecurity.pwd, err, false)) { - log() << "could not authenticate against " << conn()->toString() << ", " << err << rsLog; - return; - } + connect(); } } diff --git a/db/repl/health.cpp b/db/repl/health.cpp index 711b457..7e5a39f 100644 --- a/db/repl/health.cpp +++ b/db/repl/health.cpp @@ -402,6 +402,11 @@ namespace mongo { string s = m->lhb(); if( !s.empty() ) bb.append("errmsg", s); + + if (m->hbinfo().authIssue) { + bb.append("authenticated", false); + } + v.push_back(bb.obj()); m = m->next(); } diff --git a/db/repl/heartbeat.cpp b/db/repl/heartbeat.cpp index 7d3f78c..138ba45 100644 --- a/db/repl/heartbeat.cpp +++ b/db/repl/heartbeat.cpp @@ -51,11 +51,14 @@ namespace mongo { /* { replSetHeartbeat : <setname> } */ class CmdReplSetHeartbeat : public ReplSetCommand { public: - virtual bool adminOnly() const { return false; } CmdReplSetHeartbeat() : ReplSetCommand("replSetHeartbeat") { } virtual bool run(const string& , BSONObj& cmdObj, int, 
string& errmsg, BSONObjBuilder& result, bool fromRepl) { - if( replSetBlind ) + if( replSetBlind ) { + if (theReplSet) { + errmsg = str::stream() << theReplSet->selfFullName() << " is blind"; + } return false; + } /* we don't call ReplSetCommand::check() here because heartbeat checks many things that are pre-initialization. */ @@ -99,8 +102,8 @@ namespace mongo { if( !from.empty() ) { replSettings.discoveredSeeds.insert(from); } - errmsg = "still initializing"; - return false; + result.append("hbmsg", "still initializing"); + return true; } if( theReplSet->name() != cmdObj.getStringField("replSetHeartbeat") ) { @@ -123,32 +126,54 @@ namespace mongo { } } cmdReplSetHeartbeat; - /* throws dbexception */ - bool requestHeartbeat(string setName, string from, string memberFullName, BSONObj& result, int myCfgVersion, int& theirCfgVersion, bool checkEmpty) { + bool requestHeartbeat(string setName, string from, string memberFullName, BSONObj& result, + int myCfgVersion, int& theirCfgVersion, bool checkEmpty) { if( replSetBlind ) { - //sleepmillis( rand() ); return false; } - BSONObj cmd = BSON( "replSetHeartbeat" << setName << "v" << myCfgVersion << "pv" << 1 << "checkEmpty" << checkEmpty << "from" << from ); + BSONObj cmd = BSON( "replSetHeartbeat" << setName << + "v" << myCfgVersion << + "pv" << 1 << + "checkEmpty" << checkEmpty << + "from" << from ); - // we might be talking to ourself - generally not a great idea to do outbound waiting calls in a write lock - assert( !dbMutex.isWriteLocked() ); - - // these are slow (multisecond to respond), so generally we don't want to be locked, at least not without + // generally not a great idea to do outbound waiting calls in a + // write lock. heartbeats can be slow (multisecond to respond), so + // generally we don't want to be locked, at least not without // thinking acarefully about it first. 
- assert( theReplSet == 0 || !theReplSet->lockedByMe() ); + uassert(15900, "can't heartbeat: too much lock", + !dbMutex.isWriteLocked() || theReplSet == 0 || !theReplSet->lockedByMe() ); ScopedConn conn(memberFullName); return conn.runCommand("admin", cmd, result, 0); } - /* poll every other set member to check its status */ + /** + * Poll every other set member to check its status. + * + * A detail about local machines and authentication: suppose we have 2 + * members, A and B, on the same machine using different keyFiles. A is + * primary. If we're just starting the set, there are no admin users, so A + * and B can access each other because it's local access. + * + * Then we add a user to A. B cannot sync this user from A, because as soon + * as we add an admin user, A requires auth. However, A can still + * heartbeat B, because B *doesn't* have an admin user. So A can reach B + * but B cannot reach A. + * + * Once B is restarted with the correct keyFile, everything should work as + * expected. 
+ */ class ReplSetHealthPollTask : public task::Task { + private: HostAndPort h; HeartbeatInfo m; + int tries; + const int threshold; public: - ReplSetHealthPollTask(const HostAndPort& hh, const HeartbeatInfo& mm) : h(hh), m(mm) { } + ReplSetHealthPollTask(const HostAndPort& hh, const HeartbeatInfo& mm) + : h(hh), m(mm), tries(0), threshold(15) { } string name() const { return "rsHealthPoll"; } void doWork() { @@ -163,16 +188,7 @@ namespace mongo { BSONObj info; int theirConfigVersion = -10000; - Timer timer; - - bool ok = requestHeartbeat(theReplSet->name(), theReplSet->selfFullName(), h.toString(), info, theReplSet->config().version, theirConfigVersion); - - mem.ping = (unsigned int)timer.millis(); - - time_t before = timer.startTime() / 1000000; - // we set this on any response - we don't get this far if - // couldn't connect because exception is thrown - time_t after = mem.lastHeartbeat = before + (mem.ping / 1000); + bool ok = _requestHeartbeat(mem, info, theirConfigVersion); // weight new ping with old pings // on the first ping, just use the ping value @@ -180,68 +196,12 @@ namespace mongo { mem.ping = (unsigned int)((old.ping * .8) + (mem.ping * .2)); } - if ( info["time"].isNumber() ) { - long long t = info["time"].numberLong(); - if( t > after ) - mem.skew = (int) (t - after); - else if( t < before ) - mem.skew = (int) (t - before); // negative - } - else { - // it won't be there if remote hasn't initialized yet - if( info.hasElement("time") ) - warning() << "heatbeat.time isn't a number: " << info << endl; - mem.skew = INT_MIN; - } - - { - be state = info["state"]; - if( state.ok() ) - mem.hbstate = MemberState(state.Int()); - } if( ok ) { - HeartbeatInfo::numPings++; - - if( mem.upSince == 0 ) { - log() << "replSet info member " << h.toString() << " is up" << rsLog; - mem.upSince = mem.lastHeartbeat; - } - mem.health = 1.0; - mem.lastHeartbeatMsg = info["hbmsg"].String(); - if( info.hasElement("opTime") ) - mem.opTime = info["opTime"].Date(); - - // see 
if this member is in the electable set - if( info["e"].eoo() ) { - // for backwards compatibility - const Member *member = theReplSet->findById(mem.id()); - if (member && member->config().potentiallyHot()) { - theReplSet->addToElectable(mem.id()); - } - else { - theReplSet->rmFromElectable(mem.id()); - } - } - // add this server to the electable set if it is within 10 - // seconds of the latest optime we know of - else if( info["e"].trueValue() && - mem.opTime >= theReplSet->lastOpTimeWritten.getSecs() - 10) { - unsigned lastOp = theReplSet->lastOtherOpTime().getSecs(); - if (lastOp > 0 && mem.opTime >= lastOp - 10) { - theReplSet->addToElectable(mem.id()); - } - } - else { - theReplSet->rmFromElectable(mem.id()); - } - - be cfg = info["config"]; - if( cfg.ok() ) { - // received a new config - boost::function<void()> f = - boost::bind(&Manager::msgReceivedNewConfig, theReplSet->mgr, cfg.Obj().copy()); - theReplSet->mgr->send(f); - } + up(info, mem); + } + else if (!info["errmsg"].eoo() && + info["errmsg"].str() == "need to login") { + authIssue(mem); } else { down(mem, info.getStringField("errmsg")); @@ -271,7 +231,58 @@ namespace mongo { } private: + bool _requestHeartbeat(HeartbeatInfo& mem, BSONObj& info, int& theirConfigVersion) { + if (tries++ % threshold == (threshold - 1)) { + ScopedConn conn(h.toString()); + conn.reconnect(); + } + + Timer timer; + + bool ok = requestHeartbeat(theReplSet->name(), theReplSet->selfFullName(), + h.toString(), info, theReplSet->config().version, theirConfigVersion); + + mem.ping = (unsigned int)timer.millis(); + + time_t before = timer.startTime() / 1000000; + // we set this on any response - we don't get this far if + // couldn't connect because exception is thrown + time_t after = mem.lastHeartbeat = before + (mem.ping / 1000); + + if ( info["time"].isNumber() ) { + long long t = info["time"].numberLong(); + if( t > after ) + mem.skew = (int) (t - after); + else if( t < before ) + mem.skew = (int) (t - before); // negative + 
} + else { + // it won't be there if remote hasn't initialized yet + if( info.hasElement("time") ) + warning() << "heatbeat.time isn't a number: " << info << endl; + mem.skew = INT_MIN; + } + + { + be state = info["state"]; + if( state.ok() ) + mem.hbstate = MemberState(state.Int()); + } + + return ok; + } + + void authIssue(HeartbeatInfo& mem) { + mem.authIssue = true; + mem.hbstate = MemberState::RS_UNKNOWN; + + // set health to 0 so that this doesn't count towards majority + mem.health = 0.0; + theReplSet->rmFromElectable(mem.id()); + } + void down(HeartbeatInfo& mem, string msg) { + mem.authIssue = false; mem.health = 0.0; mem.ping = 0; if( mem.upSince || mem.downSince == 0 ) { @@ -283,6 +294,52 @@ namespace mongo { mem.lastHeartbeatMsg = msg; theReplSet->rmFromElectable(mem.id()); } + + void up(const BSONObj& info, HeartbeatInfo& mem) { + HeartbeatInfo::numPings++; + mem.authIssue = false; + + if( mem.upSince == 0 ) { + log() << "replSet member " << h.toString() << " is up" << rsLog; + mem.upSince = mem.lastHeartbeat; + } + mem.health = 1.0; + mem.lastHeartbeatMsg = info["hbmsg"].String(); + if( info.hasElement("opTime") ) + mem.opTime = info["opTime"].Date(); + + // see if this member is in the electable set + if( info["e"].eoo() ) { + // for backwards compatibility + const Member *member = theReplSet->findById(mem.id()); + if (member && member->config().potentiallyHot()) { + theReplSet->addToElectable(mem.id()); + } + else { + theReplSet->rmFromElectable(mem.id()); + } + } + // add this server to the electable set if it is within 10 + // seconds of the latest optime we know of + else if( info["e"].trueValue() && + mem.opTime >= theReplSet->lastOpTimeWritten.getSecs() - 10) { + unsigned lastOp = theReplSet->lastOtherOpTime().getSecs(); + if (lastOp > 0 && mem.opTime >= lastOp - 10) { + theReplSet->addToElectable(mem.id()); + } + } + else { + theReplSet->rmFromElectable(mem.id()); + } + + be cfg = info["config"]; + if( cfg.ok() ) { + // received a new config + 
boost::function<void()> f = + boost::bind(&Manager::msgReceivedNewConfig, theReplSet->mgr, cfg.Obj().copy()); + theReplSet->mgr->send(f); + } + } }; void ReplSetImpl::endOldHealthTasks() { diff --git a/db/repl/manager.cpp b/db/repl/manager.cpp index 3c4c0eb..c91adc3 100644 --- a/db/repl/manager.cpp +++ b/db/repl/manager.cpp @@ -119,6 +119,39 @@ namespace mongo { } } + void Manager::checkAuth() { + int down = 0, authIssue = 0, total = 0; + + for( Member *m = rs->head(); m; m=m->next() ) { + total++; + + // all authIssue servers will also be not up + if (!m->hbinfo().up()) { + down++; + if (m->hbinfo().authIssue) { + authIssue++; + } + } + } + + // if all nodes are down or failed auth AND at least one failed + // auth, go into recovering. If all nodes are down, stay a + // secondary. + if (authIssue > 0 && down == total) { + log() << "replset error could not reach/authenticate against any members" << endl; + + if (rs->box.getPrimary() == rs->_self) { + log() << "auth problems, relinquishing primary" << rsLog; + rs->relinquish(); + } + + rs->blockSync(true); + } + else { + rs->blockSync(false); + } + } + /** called as the health threads get new results */ void Manager::msgCheckNewState() { { @@ -130,7 +163,8 @@ namespace mongo { if( busyWithElectSelf ) return; checkElectableSet(); - + checkAuth(); + const Member *p = rs->box.getPrimary(); if( p && p != rs->_self ) { if( !p->hbinfo().up() || diff --git a/db/repl/rs.cpp b/db/repl/rs.cpp index 1fbbc10..f827291 100644 --- a/db/repl/rs.cpp +++ b/db/repl/rs.cpp @@ -329,6 +329,7 @@ namespace mongo { ReplSetImpl::ReplSetImpl(ReplSetCmdline& replSetCmdline) : elect(this), _currentSyncTarget(0), + _blockSync(false), _hbmsgTime(0), _self(0), _maintenanceMode(0), diff --git a/db/repl/rs.h b/db/repl/rs.h index 61041a6..2b3ea9b 100644 --- a/db/repl/rs.h +++ b/db/repl/rs.h @@ -93,6 +93,7 @@ namespace mongo { void noteARemoteIsPrimary(const Member *); void checkElectableSet(); + void checkAuth(); virtual void starting(); public: 
Manager(ReplSetImpl *rs); @@ -348,6 +349,9 @@ namespace mongo { const Member* getMemberToSyncTo(); Member* _currentSyncTarget; + bool _blockSync; + void blockSync(bool block); + // set of electable members' _ids set<unsigned> _electableSet; protected: @@ -491,7 +495,7 @@ namespace mongo { void _syncThread(); bool tryToGoLiveAsASecondary(OpTime&); // readlocks void syncTail(); - void syncApply(const BSONObj &o); + bool syncApply(const BSONObj &o); unsigned _syncRollback(OplogReader& r); void syncRollback(OplogReader& r); void syncFixUp(HowToFixUp& h, OplogReader& r); @@ -577,7 +581,7 @@ namespace mongo { * that still need to be checked for auth. */ bool checkAuth(string& errmsg, BSONObjBuilder& result) { - if( !noauth && adminOnly() ) { + if( !noauth ) { AuthenticationInfo *ai = cc().getAuthenticationInfo(); if (!ai->isAuthorizedForLock("admin", locktype())) { errmsg = "replSet command unauthorized"; diff --git a/db/repl/rs_config.h b/db/repl/rs_config.h index f69052a..b22b61e 100644 --- a/db/repl/rs_config.h +++ b/db/repl/rs_config.h @@ -80,6 +80,22 @@ namespace mongo { } } bool operator==(const MemberCfg& r) const { + if (!tags.empty() || !r.tags.empty()) { + if (tags.size() != r.tags.size()) { + return false; + } + + // if they are the same size and not equal, at least one + // element in A must be different in B + for (map<string,string>::const_iterator lit = tags.begin(); lit != tags.end(); lit++) { + map<string,string>::const_iterator rit = r.tags.find((*lit).first); + + if (rit == r.tags.end() || (*lit).second != (*rit).second) { + return false; + } + } + } + return _id==r._id && votes == r.votes && h == r.h && priority == r.priority && arbiterOnly == r.arbiterOnly && slaveDelay == r.slaveDelay && hidden == r.hidden && buildIndexes == buildIndexes; diff --git a/db/repl/rs_initialsync.cpp b/db/repl/rs_initialsync.cpp index 101b03a..112d739 100644 --- a/db/repl/rs_initialsync.cpp +++ b/db/repl/rs_initialsync.cpp @@ -81,6 +81,7 @@ namespace mongo { const Member* 
ReplSetImpl::getMemberToSyncTo() { Member *closest = 0; + bool buildIndexes = true; // wait for 2N pings before choosing a sync target if (_cfg) { @@ -90,11 +91,15 @@ namespace mongo { OCCASIONALLY log() << "waiting for " << needMorePings << " pings from other members before syncing" << endl; return NULL; } + + buildIndexes = myConfig().buildIndexes; } // find the member with the lowest ping time that has more data than me for (Member *m = _members.head(); m; m = m->next()) { if (m->hbinfo().up() && + // make sure members with buildIndexes sync from other members w/indexes + (!buildIndexes || (buildIndexes && m->config().buildIndexes)) && (m->state() == MemberState::RS_PRIMARY || (m->state() == MemberState::RS_SECONDARY && m->hbinfo().opTime > lastOpTimeWritten)) && (!closest || m->hbinfo().ping < closest->hbinfo().ping)) { diff --git a/db/repl/rs_member.h b/db/repl/rs_member.h index d60bb52..38b6c9b 100644 --- a/db/repl/rs_member.h +++ b/db/repl/rs_member.h @@ -69,7 +69,8 @@ namespace mongo { class HeartbeatInfo { unsigned _id; public: - HeartbeatInfo() : _id(0xffffffff),hbstate(MemberState::RS_UNKNOWN),health(-1.0),downSince(0),skew(INT_MIN) { } + HeartbeatInfo() : _id(0xffffffff), hbstate(MemberState::RS_UNKNOWN), health(-1.0), + downSince(0), skew(INT_MIN), authIssue(false) { } HeartbeatInfo(unsigned id); unsigned id() const { return _id; } MemberState hbstate; @@ -80,6 +81,7 @@ namespace mongo { DiagStr lastHeartbeatMsg; OpTime opTime; int skew; + bool authIssue; unsigned int ping; // milliseconds static unsigned int numPings; @@ -94,7 +96,7 @@ namespace mongo { bool changed(const HeartbeatInfo& old) const; }; - inline HeartbeatInfo::HeartbeatInfo(unsigned id) : _id(id) { + inline HeartbeatInfo::HeartbeatInfo(unsigned id) : _id(id), authIssue(false) { hbstate = MemberState::RS_UNKNOWN; health = -1.0; downSince = 0; diff --git a/db/repl/rs_sync.cpp b/db/repl/rs_sync.cpp index b29328b..8cd3e14 100644 --- a/db/repl/rs_sync.cpp +++ b/db/repl/rs_sync.cpp @@ -32,17 
+32,19 @@ namespace mongo { } } - /* apply the log op that is in param o */ - void ReplSetImpl::syncApply(const BSONObj &o) { + /* apply the log op that is in param o + @return bool failedUpdate + */ + bool ReplSetImpl::syncApply(const BSONObj &o) { const char *ns = o.getStringField("ns"); if ( *ns == '.' || *ns == 0 ) { blank(o); - return; + return false; } Client::Context ctx(ns); ctx.getClient()->curop()->reset(); - applyOperation_inlock(o); + return applyOperation_inlock(o); } /* initial oplog application, during initial sync, after cloning. @@ -57,6 +59,7 @@ namespace mongo { const string hn = source->h().toString(); OplogReader r; + try { if( !r.connect(hn) ) { log() << "replSet initial sync error can't connect to " << hn << " to read " << rsoplog << rsLog; @@ -113,12 +116,9 @@ namespace mongo { if( !r.more() ) break; BSONObj o = r.nextSafe(); /* note we might get "not master" at some point */ - { - ts = o["ts"]._opTime(); + ts = o["ts"]._opTime(); - /* if we have become primary, we dont' want to apply things from elsewhere - anymore. assumePrimary is in the db lock so we are safe as long as - we check after we locked above. */ + { if( (source->state() != MemberState::RS_PRIMARY && source->state() != MemberState::RS_SECONDARY) || replSetForceInitialSyncFailure ) { @@ -133,9 +133,12 @@ namespace mongo { throw DBException("primary changed",0); } - if( ts >= applyGTE ) { - // optimes before we started copying need not be applied. - syncApply(o); + if( ts >= applyGTE ) { // optimes before we started copying need not be applied. 
+ bool failedUpdate = syncApply(o); + if( failedUpdate && shouldRetry(o, hn)) { + failedUpdate = syncApply(o); + uassert(15915, "replSet update still fails after adding missing object", !failedUpdate); + } } _logOpObjRS(o); /* with repl sets we write the ops to our oplog too */ } @@ -149,7 +152,11 @@ namespace mongo { start = now; } } - + + if ( ts > minValid ) { + break; + } + getDur().commitIfNeeded(); } catch (DBException& e) { @@ -157,7 +164,7 @@ namespace mongo { if( e.getCode() == 11000 || e.getCode() == 11001 ) { continue; } - + // handle cursor not found (just requery) if( e.getCode() == 13127 ) { r.resetCursor(); @@ -290,7 +297,7 @@ namespace mongo { target = 0; } } - + // no server found if (target == 0) { // if there is no one to sync from @@ -298,7 +305,7 @@ namespace mongo { tryToGoLiveAsASecondary(minvalid); return; } - + r.tailingQueryGTE(rsoplog, lastOpTimeWritten); // if target cut connections between connecting and querying (for // example, because it stepped down) we might not have a cursor @@ -408,7 +415,7 @@ namespace mongo { if( !target->hbinfo().hbstate.readable() ) { break; } - + if( myConfig().slaveDelay != sd ) // reconf break; } @@ -429,7 +436,7 @@ namespace mongo { } syncApply(o); - _logOpObjRS(o); // with repl sets we write the ops to our oplog too + _logOpObjRS(o); // with repl sets we write the ops to our oplog too } catch (DBException& e) { sethbmsg(str::stream() << "syncTail: " << e.toString() << ", syncing: " << o); @@ -444,7 +451,7 @@ namespace mongo { // TODO : reuse our connection to the primary. 
return; } - + if( !target->hbinfo().hbstate.readable() ) { return; } @@ -458,7 +465,7 @@ namespace mongo { sleepsecs(1); return; } - if( sp.state.fatal() || sp.state.startup() ) { + if( _blockSync || sp.state.fatal() || sp.state.startup() ) { sleepsecs(5); return; } @@ -530,6 +537,15 @@ namespace mongo { replLocalAuth(); } + void ReplSetImpl::blockSync(bool block) { + _blockSync = block; + if (_blockSync) { + // syncing is how we get into SECONDARY state, so we'll be stuck in + // RECOVERING until we unblock + changeState(MemberState::RS_RECOVERING); + } + } + void GhostSync::associateSlave(const BSONObj& id, const int memberId) { const OID rid = id["_id"].OID(); rwlock lk( _lock , true ); @@ -556,10 +572,10 @@ namespace mongo { OCCASIONALLY warning() << "couldn't update slave " << rid << " no entry" << rsLog; return; } - + GhostSlave& slave = i->second; if (!slave.init) { - OCCASIONALLY log() << "couldn't update slave " << rid << " not init" << rsLog; + OCCASIONALLY log() << "couldn't update slave " << rid << " not init" << rsLog; return; } diff --git a/dbtests/repltests.cpp b/dbtests/repltests.cpp index 0b53d36..40650eb 100644 --- a/dbtests/repltests.cpp +++ b/dbtests/repltests.cpp @@ -28,6 +28,8 @@ #include "../db/oplog.h" #include "../db/queryoptimizer.h" +#include "../db/repl/rs.h" + namespace mongo { void createOplog(); } @@ -107,12 +109,6 @@ namespace ReplTests { return count; } static void applyAllOperations() { - class Applier : public ReplSource { - public: - static void apply( const BSONObj &op ) { - ReplSource::applyOperation( op ); - } - }; dblock lk; vector< BSONObj > ops; { @@ -122,8 +118,13 @@ namespace ReplTests { } { Client::Context ctx( ns() ); - for( vector< BSONObj >::iterator i = ops.begin(); i != ops.end(); ++i ) - Applier::apply( *i ); + BSONObjBuilder b; + b.append("host", "localhost"); + b.appendTimestamp("syncedTo", 0); + ReplSource a(b.obj()); + for( vector< BSONObj >::iterator i = ops.begin(); i != ops.end(); ++i ) { + a.applyOperation( 
*i ); + } } } static void printAll( const char *ns ) { @@ -1014,7 +1015,7 @@ namespace ReplTests { ASSERT( !one( BSON( "_id" << 2 ) ).isEmpty() ); } }; - + class DatabaseIgnorerBasic { public: void run() { @@ -1047,10 +1048,10 @@ namespace ReplTests { d.doIgnoreUntilAfter( "a", OpTime( 5, 0 ) ); ASSERT( d.ignoreAt( "a", OpTime( 5, 5 ) ) ); ASSERT( d.ignoreAt( "a", OpTime( 6, 0 ) ) ); - ASSERT( !d.ignoreAt( "a", OpTime( 6, 1 ) ) ); + ASSERT( !d.ignoreAt( "a", OpTime( 6, 1 ) ) ); } }; - + /** * Check against oldest document in the oplog before scanning backward * from the newest document. @@ -1075,7 +1076,7 @@ namespace ReplTests { ASSERT_EQUALS( 0, fsc.cursor()->current()[ "o" ].Obj()[ "_id" ].Int() ); } }; - + /** Check unsuccessful yield recovery with FindingStartCursor */ class FindingStartCursorYield : public Base { public: @@ -1101,7 +1102,26 @@ namespace ReplTests { ASSERT_EXCEPTION( fsc.recoverFromYield(), MsgAssertionException ); } }; - + + /** Check ReplSetConfig::MemberCfg equality */ + class ReplSetMemberCfgEquality : public Base { + public: + void run() { + ReplSetConfig::MemberCfg m1, m2; + assert(m1 == m2); + m1.tags["x"] = "foo"; + assert(m1 != m2); + m2.tags["y"] = "bar"; + assert(m1 != m2); + m1.tags["y"] = "bar"; + assert(m1 != m2); + m2.tags["x"] = "foo"; + assert(m1 == m2); + m1.tags.clear(); + assert(m1 != m2); + } + }; + class All : public Suite { public: All() : Suite( "repl" ) { @@ -1158,6 +1178,7 @@ namespace ReplTests { add< DatabaseIgnorerUpdate >(); add< FindingStartCursorStale >(); add< FindingStartCursorYield >(); + add< ReplSetMemberCfgEquality >(); } } myall; diff --git a/dbtests/spin_lock_test.cpp b/dbtests/spin_lock_test.cpp index dbd637e..ed1f1ae 100644 --- a/dbtests/spin_lock_test.cpp +++ b/dbtests/spin_lock_test.cpp @@ -20,6 +20,7 @@ #include <boost/thread/thread.hpp> #include "dbtests.h" #include "../util/concurrency/spin_lock.h" +#include "../util/timer.h" namespace { @@ -73,8 +74,10 @@ namespace { int counter = 0; const int 
threads = 64; - const int incs = 10000; + const int incs = 50000; LockTester* testers[threads]; + + Timer timer; for ( int i = 0; i < threads; i++ ) { testers[i] = new LockTester( &spin, &counter ); @@ -87,7 +90,10 @@ namespace { ASSERT_EQUALS( testers[i]->requests(), incs ); delete testers[i]; } - + + int ms = timer.millis(); + log() << "spinlock ConcurrentIncs time: " << ms << endl; + ASSERT_EQUALS( counter, threads*incs ); #if defined(__linux__) ASSERT( SpinLock::isfast() ); diff --git a/distsrc/THIRD-PARTY-NOTICES b/distsrc/THIRD-PARTY-NOTICES index 76b7f1e..6c32a6f 100644 --- a/distsrc/THIRD-PARTY-NOTICES +++ b/distsrc/THIRD-PARTY-NOTICES @@ -188,4 +188,46 @@ freely, subject to the following restrictions: L. Peter Deutsch ghost@aladdin.com +5) License notice for Snappy - http://code.google.com/p/snappy/ +--------------------------------- + Copyright 2005 and onwards Google Inc. + + Redistribution and use in source and binary forms, with or without + modification, are permitted provided that the following conditions are + met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above + copyright notice, this list of conditions and the following disclaimer + in the documentation and/or other materials provided with the + distribution. + * Neither the name of Google Inc. nor the names of its + contributors may be used to endorse or promote products derived from + this software without specific prior written permission. + + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + A PARTICULAR PURPOSE ARE DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT + OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + + A light-weight compression algorithm. It is designed for speed of + compression and decompression, rather than for the utmost in space + savings. + + For getting better compression ratios when you are compressing data + with long repeated sequences or compressing data that is similar to + other data, while still compressing fast, you might look at first + using BMDiff and then compressing the output of BMDiff with + Snappy. + + + End diff --git a/doxygenConfig b/doxygenConfig index 94eea98..be8ef9e 100644 --- a/doxygenConfig +++ b/doxygenConfig @@ -3,7 +3,7 @@ #--------------------------------------------------------------------------- DOXYFILE_ENCODING = UTF-8 PROJECT_NAME = MongoDB -PROJECT_NUMBER = 2.0.0 +PROJECT_NUMBER = 2.0.2 OUTPUT_DIRECTORY = docs/doxygen CREATE_SUBDIRS = NO OUTPUT_LANGUAGE = English diff --git a/jstests/filemd5.js b/jstests/filemd5.js new file mode 100644 index 0000000..41d03a1 --- /dev/null +++ b/jstests/filemd5.js @@ -0,0 +1,11 @@ +
+db.fs.chunks.drop();
+db.fs.chunks.insert({files_id:1,n:0,data:new BinData(0,"test")})
+
+x = db.runCommand({"filemd5":1,"root":"fs"});
+assert( ! x.ok , tojson(x) )
+
+db.fs.chunks.ensureIndex({files_id:1,n:1})
+x = db.runCommand({"filemd5":1,"root":"fs"});
+assert( x.ok , tojson(x) )
+
diff --git a/jstests/profile1.js b/jstests/profile1.js index 9654357..ba01d59 100644 --- a/jstests/profile1.js +++ b/jstests/profile1.js @@ -55,12 +55,18 @@ try { db.eval("sleep(1)") // pre-load system.js - db.setProfilingLevel(2); - before = db.system.profile.count(); + function resetProfile( level , slowms ) { + db.setProfilingLevel(0); + db.system.profile.drop(); + db.setProfilingLevel(level,slowms); + } + + resetProfile(2); + db.eval( "sleep(25)" ) db.eval( "sleep(120)" ) - after = db.system.profile.count() - assert.eq( before + 3 , after , "X1" ) + + assert.eq( 2 , db.system.profile.find( { "command.$eval" : /^sleep/ } ).count() ); /* sleep() could be inaccurate on certain platforms. let's check */ print("\nsleep 2 time actual:"); @@ -90,24 +96,20 @@ try { return actual >= max ? 1 : 0; } - db.setProfilingLevel(1,100); - before = db.system.profile.count(); + resetProfile(1,100); var delta = 0; delta += evalSleepMoreThan( 15 , 100 ); delta += evalSleepMoreThan( 120 , 100 ); - after = db.system.profile.count() - assert.eq( before + delta , after , "X2 : " + getProfileAString() ) + assert.eq( delta , db.system.profile.find( { "command.$eval" : /^sleep/ } ).count() , "X2 : " + getProfileAString() ) - db.setProfilingLevel(1,20); - before = db.system.profile.count(); + resetProfile(1,20); delta = 0; delta += evalSleepMoreThan( 5 , 20 ); delta += evalSleepMoreThan( 120 , 20 ); - after = db.system.profile.count() - assert.eq( before + delta , after , "X3 : " + getProfileAString() ) + assert.eq( delta , db.system.profile.find( { "command.$eval" : /^sleep/ } ).count() , "X3 : " + getProfileAString() ) - db.profile.drop(); - db.setProfilingLevel(2) + resetProfile(2); + db.profile1.drop(); var q = { _id : 5 }; var u = { $inc : { x : 1 } }; db.profile1.update( q , u ); diff --git a/jstests/replsets/auth1.js b/jstests/replsets/auth1.js index 71ab2d9..40a131a 100644 --- a/jstests/replsets/auth1.js +++ b/jstests/replsets/auth1.js @@ -8,11 +8,11 @@ var path = "jstests/libs/"; 
print("try starting mongod with auth"); -var m = runMongoProgram( "mongod", "--auth", "--port", port[4], "--dbpath", "/data/db/wrong-auth"); +var pargs = new MongodRunner( port[4], "/data/db/wrong-auth", false, false, + ["--auth"], {no_bind : true} ); +var m = pargs.start(); -assert.throws(function() { - m.getDB("local").auth("__system", ""); -}); +assert.eq(m.getDB("local").auth("__system", ""), 0); stopMongod(port[4]); @@ -25,7 +25,6 @@ run("chmod", "644", path+"key2"); print("try starting mongod"); m = runMongoProgram( "mongod", "--keyFile", path+"key1", "--port", port[0], "--dbpath", "/data/db/" + name); - print("should fail with wrong permissions"); assert.eq(m, 2, "mongod should exit w/ 2: permissions too open"); stopMongod(port[0]); diff --git a/jstests/replsets/auth2.js b/jstests/replsets/auth2.js new file mode 100644 index 0000000..0fe1ae4 --- /dev/null +++ b/jstests/replsets/auth2.js @@ -0,0 +1,103 @@ +var name = "rs_auth2"; +var port = allocatePorts(3); +var path = "jstests/libs/"; + +print("change permissions on #1 & #2"); +run("chmod", "600", path+"key1"); +run("chmod", "600", path+"key2"); + +var setupReplSet = function() { + print("start up rs"); + var rs = new ReplSetTest({"name" : name, "nodes" : 3, "startPort" : port[0]}); + rs.startSet(); + rs.initiate(); + + print("getting master"); + rs.getMaster(); + + print("getting secondaries"); + assert.soon(function() { + var result1 = rs.nodes[1].getDB("admin").runCommand({isMaster: 1}); + var result2 = rs.nodes[2].getDB("admin").runCommand({isMaster: 1}); + return result1.secondary && result2.secondary; + }); + + return rs; +}; + +var checkNoAuth = function() { + print("without an admin user, things should work"); + + master.getDB("foo").bar.insert({x:1}); + var result = master.getDB("admin").runCommand({getLastError:1}); + + printjson(result); + assert.eq(result.err, null); +} + +var checkInvalidAuthStates = function() { + print("check that 0 is in recovering"); + assert.soon(function() { + try { + var 
result = m.getDB("admin").runCommand({isMaster: 1}); + return !result.ismaster && !result.secondary; + } + catch ( e ) { + print( e ); + } + }); + + print("shut down 1, 0 still in recovering."); + rs.stop(1); + sleep(5); + + assert.soon(function() { + var result = m.getDB("admin").runCommand({isMaster: 1}); + return !result.ismaster && !result.secondary; + }); + + print("shut down 2, 0 becomes a secondary."); + rs.stop(2); + + assert.soon(function() { + var result = m.getDB("admin").runCommand({isMaster: 1}); + return result.secondary; + }); + + rs.restart(1, {"keyFile" : path+"key1"}); + rs.restart(2, {"keyFile" : path+"key1"}); +}; + +var checkValidAuthState = function() { + assert.soon(function() { + var result = m.getDB("admin").runCommand({isMaster : 1}); + return result.secondary; + }); +}; + +var rs = setupReplSet(); +var master = rs.getMaster(); + +print("add an admin user"); +master.getDB("admin").addUser("foo","bar"); +m = rs.nodes[0]; + +print("starting 1 and 2 with key file"); +rs.stop(1); +rs.restart(1, {"keyFile" : path+"key1"}); +rs.stop(2); +rs.restart(2, {"keyFile" : path+"key1"}); + +checkInvalidAuthStates(); + +print("restart mongod with bad keyFile"); + +rs.stop(0); +m = rs.restart(0, {"keyFile" : path+"key2"}); + +checkInvalidAuthStates(); + +rs.stop(0); +m = rs.restart(0, {"keyFile" : path+"key1"}); + +print("0 becomes a secondary"); diff --git a/jstests/replsets/initial_sync3.js b/jstests/replsets/initial_sync3.js index 7f2af94..ef45581 100644 --- a/jstests/replsets/initial_sync3.js +++ b/jstests/replsets/initial_sync3.js @@ -1,11 +1,6 @@ /* test initial sync options * - * {state : 1} - * {state : 2} - * {name : host+":"+port} - * {_id : 2} - * {optime : now} - * {optime : 1970} + * Make sure member can't sync from a member with a different buildIndexes setting. 
*/ load("jstests/replsets/rslib.js"); @@ -14,46 +9,44 @@ var host = getHostName(); var port = allocatePorts(7); print("Start set with three nodes"); -var replTest = new ReplSetTest( {name: name, nodes: 7} ); +var replTest = new ReplSetTest( {name: name, nodes: 3} ); var nodes = replTest.startSet(); replTest.initiate({ _id : name, members : [ - {_id:0, host : host+":"+port[0]}, - {_id:1, host : host+":"+port[1], initialSync : {state : 1}}, - {_id:2, host : host+":"+port[2], initialSync : {state : 2}}, - {_id:3, host : host+":"+port[3], initialSync : {name : host+":"+port[2]}}, - {_id:4, host : host+":"+port[4], initialSync : {_id : 2}}, - {_id:5, host : host+":"+port[5], initialSync : {optime : new Date()}}, - {_id:6, host : host+":"+port[6], initialSync : {optime : new Date(0)}} + {_id:0, host : host+":"+port[0]}, + {_id:1, host : host+":"+port[1]}, + {_id:2, host : host+":"+port[2], priority : 0, buildIndexes : false}, ]}); var master = replTest.getMaster(); print("Initial sync"); master.getDB("foo").bar.baz.insert({x:1}); +replTest.awaitReplication(); -print("Make sure everyone's secondary"); -wait(function() { - var status = master.getDB("admin").runCommand({replSetGetStatus:1}); - occasionally(function() { - printjson(status); - }); +replTest.stop(0); +replTest.stop(1); - if (!status.members) { - return false; - } +print("restart 1, clearing its data directory so it has to resync"); +replTest.start(1); - for (i=0; i<7; i++) { - if (status.members[i].state != 1 && status.members[i].state != 2) { - return false; - } - } - return true; +print("make sure 1 does not become a secondary (because it cannot clone from 2)"); +sleep(10000); +reconnect(nodes[1]); +var result = nodes[1].getDB("admin").runCommand({isMaster : 1}); +assert(!result.ismaster, tojson(result)); +assert(!result.secondary, tojson(result)); - }); +print("bring 0 back up"); +replTest.restart(0); -replTest.awaitReplication(); +print("now 1 should be able to initial sync"); +assert.soon(function() { + 
var result = nodes[1].getDB("admin").runCommand({isMaster : 1}); + printjson(result); + return result.secondary; +}); replTest.stopSet(); diff --git a/jstests/replsets/reconfig.js b/jstests/replsets/reconfig.js index b7dca03..55ee505 100644 --- a/jstests/replsets/reconfig.js +++ b/jstests/replsets/reconfig.js @@ -64,6 +64,6 @@ result = nodes[0].getDB("admin").runCommand({replSetInitiate : {_id : "testSet2" {_id : 0, tags : ["member0"]} ]}}); -assert(result.errmsg.match(/bad or missing host field/)); +assert(result.errmsg.match(/bad or missing host field/) , "error message doesn't match, got result:" + tojson(result) ); replTest2.stopSet(); diff --git a/jstests/replsets/stepdown.js b/jstests/replsets/stepdown.js index 3a17b0e..67f10f9 100644 --- a/jstests/replsets/stepdown.js +++ b/jstests/replsets/stepdown.js @@ -36,12 +36,15 @@ printjson(result); assert.eq(result.ok, 0); print("\n do stepdown that should work"); +var threw = false; try { master.getDB("admin").runCommand({replSetStepDown: 50, force : true}); } catch (e) { print(e); + threw = true; } +assert(threw); var r2 = master.getDB("admin").runCommand({ismaster : 1}); assert.eq(r2.ismaster, false); diff --git a/jstests/sharding/auth.js b/jstests/sharding/auth.js index 8d8d7d7..c9bf1e1 100644 --- a/jstests/sharding/auth.js +++ b/jstests/sharding/auth.js @@ -146,13 +146,17 @@ for (i=0; i<num; i++) { s.getDB("test").foo.insert({x:i, abc : "defg", date : new Date(), str : "all the talk on the market"}); } -var d1Chunks = s.getDB("config").chunks.count({shard : "d1"}); -var d2Chunks = s.getDB("config").chunks.count({shard : "d2"}); -var totalChunks = s.getDB("config").chunks.count({ns : "test.foo"}); +assert.soon( function(){ -print("chunks: " + d1Chunks+" "+d2Chunks+" "+totalChunks); + var d1Chunks = s.getDB("config").chunks.count({shard : "d1"}); + var d2Chunks = s.getDB("config").chunks.count({shard : "d2"}); + var totalChunks = s.getDB("config").chunks.count({ns : "test.foo"}); -assert(d1Chunks > 0 && d2Chunks 
> 0 && d1Chunks+d2Chunks == totalChunks); + print("chunks: " + d1Chunks+" "+d2Chunks+" "+totalChunks); + + return d1Chunks > 0 && d2Chunks > 0 && d1Chunks+d2Chunks == totalChunks; + } + ) assert.eq(s.getDB("test").foo.count(), num+1); diff --git a/jstests/sharding/auto1.js b/jstests/sharding/auto1.js index bdd43e9..57b0a00 100644 --- a/jstests/sharding/auto1.js +++ b/jstests/sharding/auto1.js @@ -58,7 +58,8 @@ counts.push( s.config.chunks.count() ); assert( counts[counts.length-1] > counts[0] , "counts 1 : " + tojson( counts ) ) sorted = counts.slice(0) -sorted.sort(); +// Sort doesn't sort numbers correctly by default, resulting in fail +sorted.sort( function(a, b){ return a - b } ) assert.eq( counts , sorted , "counts 2 : " + tojson( counts ) ) print( counts ) diff --git a/jstests/sharding/bouncing_count.js b/jstests/sharding/bouncing_count.js new file mode 100644 index 0000000..d9630a4 --- /dev/null +++ b/jstests/sharding/bouncing_count.js @@ -0,0 +1,49 @@ +// Tests whether new sharding is detected on insert by mongos + +var st = new ShardingTest( name = "test", shards = 10, verbose = 0, mongos = 3 ) + +var mongosA = st.s0 +var mongosB = st.s1 +var mongosC = st.s2 + +var admin = mongosA.getDB("admin") +var config = mongosA.getDB("config") + +var collA = mongosA.getCollection( "foo.bar" ) +var collB = mongosB.getCollection( "" + collA ) +var collC = mongosB.getCollection( "" + collA ) + +admin.runCommand({ enableSharding : "" + collA.getDB() }) +admin.runCommand({ shardCollection : "" + collA, key : { _id : 1 } }) + +var shards = config.shards.find().sort({ _id : 1 }).toArray() + +jsTestLog( "Splitting up the collection..." 
) + +// Split up the collection +for( var i = 0; i < shards.length; i++ ){ + printjson( admin.runCommand({ split : "" + collA, middle : { _id : i } }) ) + printjson( admin.runCommand({ moveChunk : "" + collA, find : { _id : i }, to : shards[i]._id }) ) +} + +mongosB.getDB("admin").runCommand({ flushRouterConfig : 1 }) +mongosC.getDB("admin").runCommand({ flushRouterConfig : 1 }) +printjson( collB.count() ) +printjson( collC.count() ) + +// Change up all the versions... +for( var i = 0; i < shards.length; i++ ){ + printjson( admin.runCommand({ moveChunk : "" + collA, find : { _id : i }, to : shards[ (i + 1) % shards.length ]._id }) ) +} + +// Make sure mongos A is up-to-date +mongosA.getDB("admin").runCommand({ flushRouterConfig : 1 }) + +config.printShardingStatus( true ) + +jsTestLog( "Running count!" ) + +printjson( collB.count() ) +printjson( collC.find().toArray() ) + +st.stop()
\ No newline at end of file diff --git a/jstests/sharding/migrateBig.js b/jstests/sharding/migrateBig.js index 917f152..1613f17 100644 --- a/jstests/sharding/migrateBig.js +++ b/jstests/sharding/migrateBig.js @@ -35,8 +35,16 @@ db.printShardingStatus() assert.throws( function(){ s.adminCommand( { movechunk : "test.foo" , find : { x : 50 } , to : s.getOther( s.getServer( "test" ) ).name } ); } , [] , "move should fail" ) -for ( i=0; i<20; i+= 2 ) - s.adminCommand( { split : "test.foo" , middle : { x : i } } ) +for ( i=0; i<20; i+= 2 ) { + try { + s.adminCommand( { split : "test.foo" , middle : { x : i } } ); + } + catch ( e ) { + // we may have auto split on some of these + // which is ok + print(e); + } +} db.printShardingStatus() diff --git a/jstests/sharding/reset_shard_version.js b/jstests/sharding/reset_shard_version.js new file mode 100644 index 0000000..ea4a76c --- /dev/null +++ b/jstests/sharding/reset_shard_version.js @@ -0,0 +1,51 @@ +// Tests whether a reset sharding version triggers errors + +jsTestLog( "Starting sharded cluster..." ) + +var st = new ShardingTest( { shards : 1, mongos : 2 } ) + +var mongosA = st.s0 +var mongosB = st.s1 + +var collA = mongosA.getCollection( jsTestName() + ".coll" ) +collA.drop() +var collB = mongosB.getCollection( "" + collA ) + +st.shardColl( collA, { _id : 1 }, false ) + +jsTestLog( "Inserting data..." ) + +// Insert some data +for ( var i = 0; i < 100; i++ ) { + collA.insert( { _id : i } ) +} + +jsTestLog( "Setting connection versions on both mongoses..." ) + +assert.eq( collA.find().itcount(), 100 ) +assert.eq( collB.find().itcount(), 100 ) + +jsTestLog( "Resetting connection version on shard..." ) + +var admin = st.shard0.getDB( "admin" ) + +printjson( admin.runCommand( { + setShardVersion : "" + collA, version : new Timestamp( 0, 0 ), configdb : st._configDB, serverID : new ObjectId(), + authoritative : true } ) ) + +jsTestLog( "Querying with version reset..." 
) + +// This will cause a version check +assert.eq(0, collA.findOne({_id:0})['_id']) + +jsTestLog( "Resetting connection version on shard again..." ) + +printjson( admin.runCommand( { + setShardVersion : "" + collA, version : new Timestamp( 0, 0 ), configdb : st._configDB, serverID : new ObjectId(), + authoritative : true } ) ) + +jsTestLog( "Doing count command with version reset..." ) + +assert.eq(100, collA.count()) // Test for SERVER-4196 + +st.stop()
\ No newline at end of file diff --git a/jstests/sharding/shard_existing.js b/jstests/sharding/shard_existing.js new file mode 100644 index 0000000..315445e --- /dev/null +++ b/jstests/sharding/shard_existing.js @@ -0,0 +1,33 @@ + +s = new ShardingTest( "shard_existing" , 2 /* numShards */, 1 /* verboseLevel */, 1 /* numMongos */, { chunksize : 1 } ) + +db = s.getDB( "test" ) + +stringSize = 10000 + +// we want a lot of data, so lets make a string to cheat :) +bigString = ""; +while ( bigString.length < stringSize ) + bigString += "this is a big string. "; + +dataSize = 20 * 1024 * 1024; + +numToInsert = dataSize / stringSize +print( "numToInsert: " + numToInsert ) + +for ( i=0; i<(dataSize/stringSize); i++ ) { + db.data.insert( { _id : i , s : bigString } ) +} + +db.getLastError(); + +assert.lt( dataSize , db.data.stats().size ) + +s.adminCommand( { enablesharding : "test" } ); +res = s.adminCommand( { shardcollection : "test.data" , key : { _id : 1 } } ); +printjson( res ); + +assert.eq( 40 , s.config.chunks.find().itcount() , "not right number of chunks" ); + + +s.stop(); diff --git a/jstests/sharding/sort1.js b/jstests/sharding/sort1.js index e2b287e..235e5c0 100644 --- a/jstests/sharding/sort1.js +++ b/jstests/sharding/sort1.js @@ -22,11 +22,18 @@ s.adminCommand( { split : "test.data" , middle : { 'sub.num' : 66 } } ) s.adminCommand( { movechunk : "test.data" , find : { 'sub.num' : 50 } , to : s.getOther( s.getServer( "test" ) ).name } ); -assert.eq( 3 , s.config.chunks.find().itcount() , "A1" ); +assert.lte( 3 , s.config.chunks.find().itcount() , "A1" ); temp = s.config.chunks.find().sort( { min : 1 } ).toArray(); -assert.eq( temp[0].shard , temp[2].shard , "A2" ); -assert.neq( temp[0].shard , temp[1].shard , "A3" ); +temp.forEach( printjsononeline ) + +z = 0; +for ( ; z<temp.length; z++ ) + if ( temp[z].min["sub.num"] <= 50 && temp[z].max["sub.num"] > 50 ) + break; + +assert.eq( temp[z-1].shard , temp[z+1].shard , "A2" ); +assert.neq( temp[z-1].shard , 
temp[z].shard , "A3" ); temp = db.data.find().sort( { 'sub.num' : 1 } ).toArray(); assert.eq( N , temp.length , "B1" ); diff --git a/jstests/sharding/writeback_shard_version.js b/jstests/sharding/writeback_shard_version.js new file mode 100644 index 0000000..34af1f0 --- /dev/null +++ b/jstests/sharding/writeback_shard_version.js @@ -0,0 +1,42 @@ +// Tests whether a newly sharded collection can be handled by the wbl + +jsTestLog( "Starting sharded cluster..." ) + +// Need to start as a replica set here, just because there's no other way to trigger separate configs, +// See SERVER-4222 +var st = new ShardingTest( { shards : 1, mongos : 2, verbose : 2, other : { rs : true } } ) + +st.setBalancer( false ) + +var mongosA = st.s0 +var mongosB = st.s1 + +jsTestLog( "Adding new collections...") + +var collA = mongosA.getCollection( jsTestName() + ".coll" ) +collA.insert({ hello : "world" }) +assert.eq( null, collA.getDB().getLastError() ) + +var collB = mongosB.getCollection( "" + collA ) +collB.insert({ hello : "world" }) +assert.eq( null, collB.getDB().getLastError() ) + +jsTestLog( "Enabling sharding..." ) + +printjson( mongosA.getDB( "admin" ).runCommand({ enableSharding : "" + collA.getDB() }) ) +printjson( mongosA.getDB( "admin" ).runCommand({ shardCollection : "" + collA, key : { _id : 1 } }) ) + +// MongoD doesn't know about the config shard version *until* MongoS tells it +collA.findOne() + +jsTestLog( "Trigger wbl..." ) + +collB.insert({ goodbye : "world" }) +assert.eq( null, collB.getDB().getLastError() ) + +print( "Inserted..." ) + +assert.eq( 3, collA.find().itcount() ) +assert.eq( 3, collB.find().itcount() ) + +st.stop()
\ No newline at end of file diff --git a/jstests/slowNightly/large_chunk.js b/jstests/slowNightly/large_chunk.js index 6cf40e3..1f0b2e6 100644 --- a/jstests/slowNightly/large_chunk.js +++ b/jstests/slowNightly/large_chunk.js @@ -13,10 +13,6 @@ db = s.getDB( "test" ); // Step 1 - Test moving a large chunk // -// Turn on sharding on the 'test.foo' collection and generate a large chunk -s.adminCommand( { enablesharding : "test" } ); -s.adminCommand( { shardcollection : "test.foo" , key : { _id : 1 } } ); - bigString = "" while ( bigString.length < 10000 ) bigString += "asdasdasdasdadasdasdasdasdasdasdasdasda"; @@ -28,6 +24,11 @@ while ( inserted < ( 400 * 1024 * 1024 ) ){ inserted += bigString.length; } db.getLastError(); + +// Turn on sharding on the 'test.foo' collection and generate a large chunk +s.adminCommand( { enablesharding : "test" } ); +s.adminCommand( { shardcollection : "test.foo" , key : { _id : 1 } } ); + assert.eq( 1 , s.config.chunks.count() , "step 1 - need one large chunk" ); primary = s.getServer( "test" ).getDB( "test" ); @@ -48,4 +49,4 @@ assert.neq( before[0].shard , after[0].shard , "move chunk did not work" ); s.config.changelog.find().forEach( printjson ) -s.stop();
\ No newline at end of file +s.stop(); diff --git a/jstests/slowNightly/replica_set_shard_version.js b/jstests/slowNightly/replica_set_shard_version.js new file mode 100644 index 0000000..6221f4c --- /dev/null +++ b/jstests/slowNightly/replica_set_shard_version.js @@ -0,0 +1,83 @@ +// Tests whether a Replica Set in a mongos cluster can cause versioning problems + +jsTestLog( "Starting sharded cluster..." ) + +var st = new ShardingTest( { shards : 1, mongos : 2, other : { rs : true } } ) + +var mongosA = st.s0 +var mongosB = st.s1 +var rs = st._rs[0].test +var shard = st.shard0 + +var sadmin = shard.getDB( "admin" ) + +jsTestLog( "Stepping down replica set member..." ) + +try{ + sadmin.runCommand({ replSetStepDown : 3000, force : true }) +} +catch( e ){ + // stepdown errors out our conn to the shard + printjson( e ) +} + +jsTestLog( "Reconnecting..." ) + +sadmin = new Mongo( st.shard0.host ).getDB("admin") + +assert.soon( + function(){ + var res = sadmin.runCommand( "replSetGetStatus" ); + for ( var i=0; i<res.members.length; i++ ) { + if ( res.members[i].state == 1 ) + return true; + } + return false; + } +); + +jsTestLog( "New primary elected..." ) + +coll = mongosA.getCollection( jsTestName() + ".coll" ); + +start = new Date(); + +ReplSetTest.awaitRSClientHosts( coll.getMongo(), rs.getPrimary(), { ismaster : true }, rs ) + +try{ + coll.findOne() +} +catch( e ){ + printjson( e ) + assert( false ) +} + +end = new Date(); + +print( "time to work for primary: " + ( ( end.getTime() - start.getTime() ) / 1000 ) + " seconds" ); + +jsTestLog( "Found data from collection..." ) + +// now check secondary + +try{ + sadmin.runCommand({ replSetStepDown : 3000, force : true }) +} +catch( e ){ + // expected, since all conns closed + printjson( e ) +} + +sadmin = new Mongo( st.shard0.host ).getDB("admin") + +jsTestLog( "Stepped down secondary..." 
) + +other = new Mongo( mongosA.host ); +other.setSlaveOk( true ); +other = other.getCollection( jsTestName() + ".coll" ); + +print( "eliot: " + tojson( other.findOne() ) ); + + + +st.stop() diff --git a/jstests/slowNightly/sharding_migrateBigObject.js b/jstests/slowNightly/sharding_migrateBigObject.js index 5ad9ed1..8bf6713 100644 --- a/jstests/slowNightly/sharding_migrateBigObject.js +++ b/jstests/slowNightly/sharding_migrateBigObject.js @@ -51,7 +51,7 @@ assert.soon( return res.length > 1 && Math.abs( res[0].nChunks - res[1].nChunks ) <= 3; } , - "never migrated" , 180000 , 1000 ); + "never migrated" , 9 * 60 * 1000 , 1000 ); stopMongod( 30000 ); stopMongod( 29999 ); diff --git a/jstests/slowNightly/sharding_rs2.js b/jstests/slowNightly/sharding_rs2.js index 4de935b..162a9c6 100644 --- a/jstests/slowNightly/sharding_rs2.js +++ b/jstests/slowNightly/sharding_rs2.js @@ -71,6 +71,11 @@ for ( i=0; i<10; i++ ) assert.eq( 17 , ts.findOne().x , "B1" ) m.setSlaveOk() + +// Confusingly, v2.0 mongos does not actually update the secondary status of any members until after the first +// ReplicaSetMonitorWatcher round. Wait for that here. 
+ReplSetTest.awaitRSClientHosts( m, rs.test.getSecondaries()[0], { secondary : true } ) + for ( i=0; i<10; i++ ) assert.eq( 17 , ts.findOne().x , "B2" ) diff --git a/jstests/tool/dumpsecondary.js b/jstests/tool/dumpsecondary.js new file mode 100644 index 0000000..4edb3f1 --- /dev/null +++ b/jstests/tool/dumpsecondary.js @@ -0,0 +1,32 @@ +var replTest = new ReplSetTest( {name: 'testSet', nodes: 2} ); + +var nodes = replTest.startSet(); +replTest.initiate(); + +var master = replTest.getMaster(); +db = master.getDB("foo") +db.foo.save({a: 1000}); +replTest.awaitReplication(); +replTest.awaitSecondaryNodes(); + +assert.eq( 1 , db.foo.count() , "setup" ); + +var slaves = replTest.liveNodes.slaves; +assert( slaves.length == 1, "Expected 1 slave but length was " + slaves.length ); +slave = slaves[0]; + +runMongoProgram.apply(null, ['mongodump', '-h', slave.host, '--out', '/data/db/jstests_tool_dumpsecondary_external/']) + +db.foo.drop() + +assert.eq( 0 , db.foo.count() , "after drop" ); + +runMongoProgram.apply(null, ['mongorestore', '-h', master.host, '/data/db/jstests_tool_dumpsecondary_external/']) + +assert.soon( "db.foo.findOne()" , "no data after sleep" ); +assert.eq( 1 , db.foo.count() , "after restore" ); +assert.eq( 1000 , db.foo.findOne().a , "after restore 2" ); + +resetDbpath('/data/db/jstests_tool_dumpsecondary_external') + +replTest.stopSet(15)
\ No newline at end of file diff --git a/jstests/tool/exportimport3.js b/jstests/tool/exportimport3.js new file mode 100644 index 0000000..f18ba6c --- /dev/null +++ b/jstests/tool/exportimport3.js @@ -0,0 +1,27 @@ +// exportimport3.js + +t = new ToolTest( "exportimport3" ); + +c = t.startDB( "foo" ); +assert.eq( 0 , c.count() , "setup1" ); +c.save({a:1}) +c.save({a:2}) +c.save({a:3}) +c.save({a:4}) +c.save({a:5}) + +assert.eq( 5 , c.count() , "setup2" ); + + +t.runTool( "export" , "--jsonArray" , "--out" , t.extFile , "-d" , t.baseName , "-c" , "foo" ); + +c.drop(); +assert.eq( 0 , c.count() , "after drop" , "-d" , t.baseName , "-c" , "foo" );; + +t.runTool( "import" , "--jsonArray" , "--file" , t.extFile , "-d" , t.baseName , "-c" , "foo" ); + +assert.soon( "c.findOne()" , "no data after sleep" ); +assert.eq( 5 , c.count() , "after restore 2" ); + + +t.stop(); diff --git a/rpm/mongo.spec b/rpm/mongo.spec index 3506882..38e2bb6 100644 --- a/rpm/mongo.spec +++ b/rpm/mongo.spec @@ -1,5 +1,5 @@ Name: mongo -Version: 2.0.0 +Version: 2.0.2 Release: mongodb_1%{?dist} Summary: mongo client shell and tools License: AGPL 3.0 diff --git a/s/balancer_policy.cpp b/s/balancer_policy.cpp index f1b4bf1..bd1763d 100644 --- a/s/balancer_policy.cpp +++ b/s/balancer_policy.cpp @@ -92,7 +92,7 @@ namespace mongo { } if ( maxOpsQueued ) { - log() << "biggest shard has unprocessed writebacks, waiting for completion of migrate" << endl; + log() << "biggest shard " << max.first << " has unprocessed writebacks, waiting for completion of migrate" << endl; return NULL; } diff --git a/s/chunk.cpp b/s/chunk.cpp index 09dc994..066265e 100644 --- a/s/chunk.cpp +++ b/s/chunk.cpp @@ -208,7 +208,7 @@ namespace mongo { // no split points means there isn't enough data to split on // 1 split point means we have between half the chunk size to full chunk size // so we shouldn't split - LOG(1) << "chunk not full enough to trigger auto-split" << endl; + LOG(1) << "chunk not full enough to trigger 
auto-split " << ( candidates.size() == 0 ? "no split entry" : candidates[0].toString() ) << endl; return BSONObj(); } @@ -342,6 +342,12 @@ namespace mongo { if ( _dataWritten < splitThreshold / 5 ) return false; + + if ( ! getManager()->_splitTickets.tryAcquire() ) { + LOG(1) << "won't auto split becaue not enough tickets: " << getManager()->getns() << endl; + return false; + } + TicketHolderReleaser releaser( &getManager()->_splitTickets ); // this is a bit ugly // we need it so that mongos blocks for the writes to actually be committed @@ -352,8 +358,6 @@ namespace mongo { LOG(1) << "about to initiate autosplit: " << *this << " dataWritten: " << _dataWritten << " splitThreshold: " << splitThreshold << endl; - _dataWritten = 0; // reset so we check often enough - BSONObj res; BSONObj splitPoint = singleSplit( false /* does not force a split if not enough data */ , res ); if ( splitPoint.isEmpty() ) { @@ -361,6 +365,15 @@ namespace mongo { _dataWritten = 0; // this means there wasn't enough data to split, so don't want to try again until considerable more data return false; } + + if ( maxIsInf() || minIsInf() ) { + // we don't want to reset _dataWritten since we kind of want to check the other side right away + } + else { + _dataWritten = 0; // we're splitting, so should wait a bit + } + + log() << "autosplitted " << _manager->getns() << " shard: " << toString() << " on: " << splitPoint << "(splitThreshold " << splitThreshold << ")" @@ -497,7 +510,9 @@ namespace mongo { // The shard versioning mechanism hinges on keeping track of the number of times we reloaded ChunkManager's. // Increasing this number here will prompt checkShardVersion() to refresh the connection-level versions to // the most up to date value. 
- _sequenceNumber(++NextSequenceNumber) + _sequenceNumber(++NextSequenceNumber), + + _splitTickets( 5 ) { int tries = 3; @@ -611,44 +626,65 @@ namespace mongo { return _key.hasShardKey( obj ); } - void ChunkManager::createFirstChunk( const Shard& shard ) const { + void ChunkManager::createFirstChunks( const Shard& shard ) const { // TODO distlock? assert( _chunkMap.size() == 0 ); - Chunk c (this, _key.globalMin(), _key.globalMax(), shard); + unsigned long long numObjects = 0; + { + // get stats to see if there is any data + ScopedDbConnection shardConn( shard.getConnString() ); + numObjects = shardConn->count( getns() ); + shardConn.done(); + } // this is the first chunk; start the versioning from scratch ShardChunkVersion version; version.incMajor(); - // build update for the chunk collection - BSONObjBuilder chunkBuilder; - c.serialize( chunkBuilder , version ); - BSONObj chunkCmd = chunkBuilder.obj(); + Chunk c(this, _key.globalMin(), _key.globalMax(), shard); - log() << "about to create first chunk for: " << _ns << endl; + vector<BSONObj> splitPoints; + if ( numObjects > 0 ) + c.pickSplitVector( splitPoints , Chunk::MaxChunkSize ); + + log() << "going to create " << splitPoints.size() + 1 << " chunk(s) for: " << _ns << endl; + - ScopedDbConnection conn( configServer.modelServer() ); - BSONObj res; - conn->update( Chunk::chunkMetadataNS, QUERY( "_id" << c.genID() ), chunkCmd, true, false ); + ScopedDbConnection conn( configServer.modelServer() ); + + for ( unsigned i=0; i<=splitPoints.size(); i++ ) { + BSONObj min = i == 0 ? _key.globalMin() : splitPoints[i-1]; + BSONObj max = i < splitPoints.size() ? 
splitPoints[i] : _key.globalMax(); + + Chunk temp( this , min , max , shard ); + + BSONObjBuilder chunkBuilder; + temp.serialize( chunkBuilder , version ); + BSONObj chunkObj = chunkBuilder.obj(); + + conn->update( Chunk::chunkMetadataNS, QUERY( "_id" << temp.genID() ), chunkObj, true, false ); + + version.incMinor(); + } string errmsg = conn->getLastError(); if ( errmsg.size() ) { - stringstream ss; - ss << "saving first chunk failed. cmd: " << chunkCmd << " result: " << errmsg; - log( LL_ERROR ) << ss.str() << endl; - msgasserted( 13592 , ss.str() ); + string ss = str::stream() << "creating first chunks failed. result: " << errmsg; + error() << ss << endl; + msgasserted( 15903 , ss ); } - + conn.done(); - // the ensure index will have the (desired) indirect effect of creating the collection on the - // assigned shard, as it sets up the index over the sharding keys. - ScopedDbConnection shardConn( c.getShard().getConnString() ); - shardConn->ensureIndex( getns() , getShardKey().key() , _unique , "" , false /* do not cache ensureIndex SERVER-1691 */ ); - shardConn.done(); + if ( numObjects == 0 ) { + // the ensure index will have the (desired) indirect effect of creating the collection on the + // assigned shard, as it sets up the index over the sharding keys. 
+ ScopedDbConnection shardConn( c.getShard().getConnString() ); + shardConn->ensureIndex( getns() , getShardKey().key() , _unique , "" , false ); // do not cache ensureIndex SERVER-1691 + shardConn.done(); + } - log() << "successfully created first chunk for " << c.toString() << endl; } ChunkPtr ChunkManager::findChunk( const BSONObj & obj ) const { @@ -836,26 +872,6 @@ namespace mongo { configServer.logChange( "dropCollection" , _ns , BSONObj() ); } - void ChunkManager::maybeChunkCollection() const { - uassert( 13346 , "can't pre-split already splitted collection" , (_chunkMap.size() == 1) ); - - ChunkPtr soleChunk = _chunkMap.begin()->second; - vector<BSONObj> splitPoints; - soleChunk->pickSplitVector( splitPoints , Chunk::MaxChunkSize ); - if ( splitPoints.empty() ) { - LOG(1) << "not enough data to warrant chunking " << getns() << endl; - return; - } - - BSONObj res; - ChunkPtr p; - bool worked = soleChunk->multiSplit( splitPoints , res ); - if (!worked) { - log( LL_WARNING ) << "could not split '" << getns() << "': " << res << endl; - return; - } - } - ShardChunkVersion ChunkManager::getVersion( const Shard& shard ) const { ShardVersionMap::const_iterator i = _shardVersions.find( shard ); if ( i == _shardVersions.end() ) @@ -956,7 +972,13 @@ namespace mongo { int nc = numChunks(); - if ( nc < 10 ) { + if ( nc <= 1 ) { + return 1024; + } + else if ( nc < 3 ) { + return minChunkSize / 2; + } + else if ( nc < 10 ) { splitThreshold = max( splitThreshold / 4 , minChunkSize ); } else if ( nc < 20 ) { @@ -292,15 +292,13 @@ namespace mongo { int numChunks() const { return _chunkMap.size(); } bool hasShardKey( const BSONObj& obj ) const; - void createFirstChunk( const Shard& shard ) const; // only call from DBConfig::shardCollection + void createFirstChunks( const Shard& shard ) const; // only call from DBConfig::shardCollection ChunkPtr findChunk( const BSONObj& obj ) const; ChunkPtr findChunkOnServer( const Shard& shard ) const; const ShardKeyPattern& getShardKey() 
const { return _key; } bool isUnique() const { return _unique; } - void maybeChunkCollection() const; - void getShardsForQuery( set<Shard>& shards , const BSONObj& query ) const; void getAllShards( set<Shard>& all ) const; void getShardsForRange(set<Shard>& shards, const BSONObj& min, const BSONObj& max) const; // [min, max) @@ -355,6 +353,8 @@ namespace mongo { const unsigned long long _sequenceNumber; + mutable TicketHolder _splitTickets; // number of concurrent splitVector we can do from a splitIfShould per collection + friend class Chunk; friend class ChunkRangeManager; // only needed for CRM::assertValid() static AtomicUInt NextSequenceNumber; diff --git a/s/client.cpp b/s/client.cpp index 0da05b6..9058b31 100644 --- a/s/client.cpp +++ b/s/client.cpp @@ -141,7 +141,7 @@ namespace mongo { if ( shards->size() == 1 ) { string theShard = *(shards->begin() ); - ShardConnection conn( theShard , "", true ); + ShardConnection conn( theShard , "" ); BSONObj res; bool ok = false; @@ -150,7 +150,7 @@ namespace mongo { } catch( std::exception &e ){ - warning() << "could not get last error." << causedBy( e ) << endl; + warning() << "could not get last error from shard " << theShard << causedBy( e ) << endl; // Catch everything that happens here, since we need to ensure we return our connection when we're // finished. 
@@ -172,7 +172,14 @@ namespace mongo { continue; ShardConnection conn( temp , "" ); - _addWriteBack( writebacks , conn->getLastErrorDetailed() ); + + try { + _addWriteBack( writebacks , conn->getLastErrorDetailed() ); + } + catch( std::exception &e ){ + warning() << "could not clear last error from shard " << temp << causedBy( e ) << endl; + } + conn.done(); } clearSinceLastGetError(); @@ -183,7 +190,13 @@ namespace mongo { // ok } else { - assert( v.size() == 1 ); + // this will usually be 1 + // it can be greater than 1 if a write to a different shard + // than the last write op had a writeback + // all we're going to report is the first + // since that's the current write + // but we block for all + assert( v.size() >= 1 ); result.appendElements( v[0] ); result.appendElementsUnique( res ); result.append( "writebackGLE" , v[0] ); @@ -211,7 +224,7 @@ namespace mongo { for ( set<string>::iterator i = shards->begin(); i != shards->end(); i++ ) { string theShard = *i; bbb.append( theShard ); - ShardConnection conn( theShard , "", true ); + ShardConnection conn( theShard , "" ); BSONObj res; bool ok = false; try { @@ -223,7 +236,7 @@ namespace mongo { // Safe to return here, since we haven't started any extra processing yet, just collecting // responses. - warning() << "could not get last error." 
<< causedBy( e ) << endl; + warning() << "could not get last error from a shard " << theShard << causedBy( e ) << endl; conn.done(); return false; @@ -262,7 +275,12 @@ namespace mongo { continue; ShardConnection conn( temp , "" ); - _addWriteBack( writebacks, conn->getLastErrorDetailed() ); + try { + _addWriteBack( writebacks, conn->getLastErrorDetailed() ); + } + catch( std::exception &e ){ + warning() << "could not clear last error from a shard " << temp << causedBy( e ) << endl; + } conn.done(); } clearSinceLastGetError(); diff --git a/s/commands_public.cpp b/s/commands_public.cpp index ef7110c..23dd7fe 100644 --- a/s/commands_public.cpp +++ b/s/commands_public.cpp @@ -432,9 +432,17 @@ namespace mongo { long long total = 0; map<string,long long> shardCounts; + int numTries = 0; + bool hadToBreak = false; ChunkManagerPtr cm = conf->getChunkManagerIfExists( fullns ); - while ( true ) { + while ( numTries < 5 ) { + numTries++; + + // This all should eventually be replaced by new pcursor framework, but for now match query + // retry behavior manually + if( numTries >= 2 ) sleepsecs( numTries - 1 ); + if ( ! 
cm ) { // probably unsharded now return run( dbName , cmdObj , options , errmsg , result, false ); @@ -444,17 +452,20 @@ namespace mongo { cm->getShardsForQuery( shards , filter ); assert( shards.size() ); - bool hadToBreak = false; + hadToBreak = false; for (set<Shard>::iterator it=shards.begin(), end=shards.end(); it != end; ++it) { ShardConnection conn(*it, fullns); - if ( conn.setVersion() ) { - total = 0; - shardCounts.clear(); - cm = conf->getChunkManagerIfExists( fullns ); - conn.done(); - hadToBreak = true; - break; + if ( conn.setVersion() ){ + ChunkManagerPtr newCM = conf->getChunkManagerIfExists( fullns ); + if( newCM->getVersion() != cm->getVersion() ){ + cm = newCM; + total = 0; + shardCounts.clear(); + conn.done(); + hadToBreak = true; + break; + } } BSONObj temp; @@ -472,7 +483,7 @@ namespace mongo { // my version is old total = 0; shardCounts.clear(); - cm = conf->getChunkManagerIfExists( fullns , true ); + cm = conf->getChunkManagerIfExists( fullns , true, numTries > 2 ); // Force reload on third attempt hadToBreak = true; break; } @@ -485,6 +496,10 @@ namespace mongo { if ( ! hadToBreak ) break; } + if (hadToBreak) { + errmsg = "Tried 5 times without success to get count for " + fullns + " from all shards"; + return false; + } total = applySkipLimit( total , cmdObj ); result.appendNumber( "n" , total ); diff --git a/s/config.cpp b/s/config.cpp index 23475eb..645e923 100644 --- a/s/config.cpp +++ b/s/config.cpp @@ -143,25 +143,14 @@ namespace mongo { log() << "enable sharding on: " << ns << " with shard key: " << fieldsAndOrder << endl; - // From this point on, 'ns' is going to be treated as a sharded collection. We assume this is the first - // time it is seen by the sharded system and thus create the first chunk for the collection. All the remaining - // chunks will be created as a by-product of splitting. 
ci.shard( ns , fieldsAndOrder , unique ); ChunkManagerPtr cm = ci.getCM(); uassert( 13449 , "collections already sharded" , (cm->numChunks() == 0) ); - cm->createFirstChunk( getPrimary() ); + cm->createFirstChunks( getPrimary() ); _save(); } - try { - getChunkManager(ns, true)->maybeChunkCollection(); - } - catch ( UserException& e ) { - // failure to chunk is not critical enough to abort the command (and undo the _save()'d configDB state) - log() << "couldn't chunk recently created collection: " << ns << " " << e << endl; - } - - return getChunkManager(ns); + return getChunkManager(ns,true,true); } bool DBConfig::removeSharding( const string& ns ) { @@ -185,9 +174,9 @@ namespace mongo { return true; } - ChunkManagerPtr DBConfig::getChunkManagerIfExists( const string& ns, bool shouldReload ){ + ChunkManagerPtr DBConfig::getChunkManagerIfExists( const string& ns, bool shouldReload, bool forceReload ){ try{ - return getChunkManager( ns, shouldReload ); + return getChunkManager( ns, shouldReload, forceReload ); } catch( AssertionException& e ){ warning() << "chunk manager not found for " << ns << causedBy( e ) << endl; @@ -195,7 +184,7 @@ namespace mongo { } } - ChunkManagerPtr DBConfig::getChunkManager( const string& ns , bool shouldReload ) { + ChunkManagerPtr DBConfig::getChunkManager( const string& ns , bool shouldReload, bool forceReload ) { BSONObj key; bool unique; ShardChunkVersion oldVersion; @@ -205,7 +194,7 @@ namespace mongo { CollectionInfo& ci = _collections[ns]; - bool earlyReload = ! ci.isSharded() && shouldReload; + bool earlyReload = ! ci.isSharded() && ( shouldReload || forceReload ); if ( earlyReload ) { // this is to catch cases where there this is a new sharded collection _reload(); @@ -214,7 +203,7 @@ namespace mongo { massert( 10181 , (string)"not sharded:" + ns , ci.isSharded() ); assert( ! ci.key().isEmpty() ); - if ( ! shouldReload || earlyReload ) + if ( ! 
( shouldReload || forceReload ) || earlyReload ) return ci.getCM(); key = ci.key().copy(); @@ -225,10 +214,11 @@ namespace mongo { assert( ! key.isEmpty() ); - if ( oldVersion > 0 ) { + BSONObj newest; + if ( oldVersion > 0 && ! forceReload ) { ScopedDbConnection conn( configServer.modelServer() , 30.0 ); - BSONObj newest = conn->findOne( ShardNS::chunk , - Query( BSON( "ns" << ns ) ).sort( "lastmod" , -1 ) ); + newest = conn->findOne( ShardNS::chunk , + Query( BSON( "ns" << ns ) ).sort( "lastmod" , -1 ) ); conn.done(); if ( ! newest.isEmpty() ) { @@ -240,16 +230,41 @@ namespace mongo { return ci.getCM(); } } - + + } + else if( oldVersion == 0 ){ + warning() << "version 0 found when " << ( forceReload ? "reloading" : "checking" ) << " chunk manager" + << ", collection '" << ns << "' initially detected as sharded" << endl; } // we are not locked now, and want to load a new ChunkManager - auto_ptr<ChunkManager> temp( new ChunkManager( ns , key , unique ) ); - if ( temp->numChunks() == 0 ) { - // maybe we're not sharded any more - reload(); // this is a full reload - return getChunkManager( ns , false ); + auto_ptr<ChunkManager> temp; + + { + scoped_lock lll ( _hitConfigServerLock ); + + if ( ! newest.isEmpty() && ! 
forceReload ) { + // if we have a target we're going for + // see if we've hit already + + scoped_lock lk( _lock ); + CollectionInfo& ci = _collections[ns]; + if ( ci.isSharded() && ci.getCM() ) { + ShardChunkVersion currentVersion = newest["lastmod"]; + if ( currentVersion == ci.getCM()->getVersion() ) { + return ci.getCM(); + } + } + + } + + temp.reset( new ChunkManager( ns , key , unique ) ); + if ( temp->numChunks() == 0 ) { + // maybe we're not sharded any more + reload(); // this is a full reload + return getChunkManager( ns , false ); + } } scoped_lock lk( _lock ); @@ -257,8 +272,15 @@ namespace mongo { CollectionInfo& ci = _collections[ns]; massert( 14822 , (string)"state changed in the middle: " + ns , ci.isSharded() ); - if ( temp->getVersion() > ci.getCM()->getVersion() ) { - // we only want to reset if we're newer + bool forced = false; + if ( temp->getVersion() > ci.getCM()->getVersion() || + (forced = (temp->getVersion() == ci.getCM()->getVersion() && forceReload ) ) ) { + + if( forced ){ + warning() << "chunk manager reload forced for collection '" << ns << "', config version is " << temp->getVersion() << endl; + } + + // we only want to reset if we're newer or equal and forced // otherwise we go into a bad cycle ci.resetCM( temp.release() ); } @@ -115,7 +115,8 @@ namespace mongo { : _name( name ) , _primary("config","") , _shardingEnabled(false), - _lock("DBConfig") { + _lock("DBConfig") , + _hitConfigServerLock( "DBConfig::_hitConfigServerLock" ) { assert( name.size() ); } virtual ~DBConfig() {} @@ -142,8 +143,8 @@ namespace mongo { */ bool isSharded( const string& ns ); - ChunkManagerPtr getChunkManager( const string& ns , bool reload = false ); - ChunkManagerPtr getChunkManagerIfExists( const string& ns , bool reload = false ); + ChunkManagerPtr getChunkManager( const string& ns , bool reload = false, bool forceReload = false ); + ChunkManagerPtr getChunkManagerIfExists( const string& ns , bool reload = false, bool forceReload = false ); /** * 
@return the correct for shard for the ns @@ -195,6 +196,7 @@ namespace mongo { Collections _collections; mutable mongo::mutex _lock; // TODO: change to r/w lock ?? + mutable mongo::mutex _hitConfigServerLock; }; class ConfigServer : public DBConfig { diff --git a/s/d_logic.cpp b/s/d_logic.cpp index 9d4fd74..fd281ac 100644 --- a/s/d_logic.cpp +++ b/s/d_logic.cpp @@ -107,7 +107,10 @@ namespace mongo { b.append( "connectionId" , cc().getConnectionId() ); b.append( "instanceIdent" , prettyHostName() ); b.appendTimestamp( "version" , shardingState.getVersion( ns ) ); - b.appendTimestamp( "yourVersion" , ShardedConnectionInfo::get( true )->getVersion( ns ) ); + + ShardedConnectionInfo* info = ShardedConnectionInfo::get( false ); + b.appendTimestamp( "yourVersion" , info ? info->getVersion(ns) : (ConfigVersion)0 ); + b.appendBinData( "msg" , m.header()->len , bdtCustom , (char*)(m.singleData()) ); LOG(2) << "writing back msg with len: " << m.header()->len << " op: " << m.operation() << endl; writeBackManager.queueWriteBack( clientID.str() , b.obj() ); diff --git a/s/d_state.cpp b/s/d_state.cpp index f43865b..929d2a8 100644 --- a/s/d_state.cpp +++ b/s/d_state.cpp @@ -29,7 +29,7 @@ #include "../db/commands.h" #include "../db/jsobj.h" #include "../db/db.h" - +#include "../db/replutil.h" #include "../client/connpool.h" #include "../util/queue.h" @@ -318,6 +318,7 @@ namespace mongo { if (!done) { LOG(1) << "adding sharding hook" << endl; pool.addHook(new ShardingConnectionHook(false)); + shardConnectionPool.addHook(new ShardingConnectionHook(true)); done = true; } } @@ -395,6 +396,7 @@ namespace mongo { help << " example: { setShardVersion : 'alleyinsider.foo' , version : 1 , configdb : '' } "; } + virtual bool slaveOk() const { return true; } virtual LockType locktype() const { return NONE; } bool checkConfigOrInit( const string& configdb , bool authoritative , string& errmsg , BSONObjBuilder& result , bool locked=false ) const { @@ -430,8 +432,11 @@ namespace mongo { return 
checkConfigOrInit( configdb , authoritative , errmsg , result , true ); } - bool checkMongosID( ShardedConnectionInfo* info, const BSONElement& id, string errmsg ) { + bool checkMongosID( ShardedConnectionInfo* info, const BSONElement& id, string& errmsg ) { if ( id.type() != jstOID ) { + if ( ! info->hasID() ) { + warning() << "bad serverID set in setShardVersion and none in info: " << id << endl; + } // TODO: fix this //errmsg = "need serverID to be an OID"; //return 0; @@ -465,6 +470,10 @@ namespace mongo { lastError.disableForCommand(); ShardedConnectionInfo* info = ShardedConnectionInfo::get( true ); + // make sure we have the mongos id for writebacks + if ( ! checkMongosID( info , cmdObj["serverID"] , errmsg ) ) + return false; + bool authoritative = cmdObj.getBoolField( "authoritative" ); // check config server is ok or enable sharding @@ -477,9 +486,19 @@ namespace mongo { shardingState.gotShardHost( cmdObj["shardHost"].String() ); } - // make sure we have the mongos id for writebacks - if ( ! checkMongosID( info , cmdObj["serverID"] , errmsg ) ) + + // Handle initial shard connection + if( cmdObj["version"].eoo() && cmdObj["init"].trueValue() ){ + result.append( "initialized", true ); + return true; + } + + // we can run on a slave up to here + if ( ! isMaster( "admin" ) ) { + result.append( "errmsg" , "not master" ); + result.append( "note" , "from post init in setShardVersion" ); return false; + } // step 2 @@ -657,6 +676,11 @@ namespace mongo { if ( ! shardingState.enabled() ) return true; + if ( ! isMasterNs( ns.c_str() ) ) { + // right now connections to secondaries aren't versioned at all + return true; + } + ShardedConnectionInfo* info = ShardedConnectionInfo::get( false ); if ( ! 
info ) { diff --git a/s/request.cpp b/s/request.cpp index 36488cb..98740ae 100644 --- a/s/request.cpp +++ b/s/request.cpp @@ -58,7 +58,7 @@ namespace mongo { reset(); } - void Request::reset( bool reload ) { + void Request::reset( bool reload, bool forceReload ) { if ( _m.operation() == dbKillCursors ) { return; } @@ -70,7 +70,7 @@ namespace mongo { _config = grid.getDBConfig( nsStr ); if ( reload ) { if ( _config->isSharded( nsStr ) ) - _config->getChunkManager( nsStr , true ); + _config->getChunkManager( nsStr , true, forceReload ); else _config->reload(); } @@ -137,7 +137,7 @@ namespace mongo { ShardConnection::checkMyConnectionVersions( getns() ); if (!staleConfig.justConnection() ) sleepsecs( attempt ); - reset( ! staleConfig.justConnection() ); + reset( ! staleConfig.justConnection(), attempt >= 2 ); _d.markReset(); process( attempt + 1 ); return; diff --git a/s/request.h b/s/request.h index 86a484e..d60b95d 100644 --- a/s/request.h +++ b/s/request.h @@ -91,7 +91,7 @@ namespace mongo { void init(); - void reset( bool reload=false ); + void reset( bool reload=false, bool forceReload = false ); private: Message& _m; diff --git a/s/shard.cpp b/s/shard.cpp index 75326e0..7d06a31 100644 --- a/s/shard.cpp +++ b/s/shard.cpp @@ -365,12 +365,26 @@ namespace mongo { conn->auth("local", internalSecurity.user, internalSecurity.pwd, err, false)); } - if ( _shardedConnections ) { - conn->simpleCommand( "admin" , 0 , "setShardVersion" ); + if ( _shardedConnections && isVersionableCB( conn ) ) { + + // We must initialize sharding on all connections, so that we get exceptions if sharding is enabled on + // the collection. + BSONObj result; + bool ok = initShardVersionCB( *conn, result ); + + // assert that we actually successfully setup sharding + uassert( 15907, str::stream() << "could not initialize sharding on connection " << (*conn).toString() << + ( result["errmsg"].type() == String ? 
causedBy( result["errmsg"].String() ) : + causedBy( (string)"unknown failure : " + result.toString() ) ), ok ); + } } - void ShardingConnectionHook::onDestory( DBClientBase * conn ) { - resetShardVersionCB( conn ); + void ShardingConnectionHook::onDestroy( DBClientBase * conn ) { + + if( _shardedConnections && isVersionableCB( conn ) ){ + resetShardVersionCB( conn ); + } + } } @@ -213,9 +213,9 @@ namespace mongo { class ShardConnection : public AScopedConnection { public: - ShardConnection( const Shard * s , const string& ns, bool ignoreDirect = false ); - ShardConnection( const Shard& s , const string& ns, bool ignoreDirect = false ); - ShardConnection( const string& addr , const string& ns, bool ignoreDirect = false ); + ShardConnection( const Shard * s , const string& ns ); + ShardConnection( const Shard& s , const string& ns ); + ShardConnection( const string& addr , const string& ns ); ~ShardConnection(); @@ -267,7 +267,7 @@ namespace mongo { static void checkMyConnectionVersions( const string & ns ); private: - void _init( bool ignoreDirect = false ); + void _init(); void _finishInit(); bool _finishedInit; @@ -290,7 +290,7 @@ namespace mongo { virtual void onCreate( DBClientBase * conn ); virtual void onHandedOut( DBClientBase * conn ); - virtual void onDestory( DBClientBase * conn ); + virtual void onDestroy( DBClientBase * conn ); bool _shardedConnections; }; diff --git a/s/shard_version.cpp b/s/shard_version.cpp index 8782c8e..9c55019 100644 --- a/s/shard_version.cpp +++ b/s/shard_version.cpp @@ -31,6 +31,8 @@ namespace mongo { // when running in sharded mode, use chunk shard version control + static bool isVersionable( DBClientBase * conn ); + static bool initShardVersion( DBClientBase & conn, BSONObj& result ); static bool checkShardVersion( DBClientBase & conn , const string& ns , bool authoritative = false , int tryNumber = 1 ); static void resetShardVersion( DBClientBase * conn ); @@ -40,6 +42,8 @@ namespace mongo { // // TODO: Better encapsulate this 
mechanism. // + isVersionableCB = isVersionable; + initShardVersionCB = initShardVersion; checkShardVersionCB = checkShardVersion; resetShardVersionCB = resetShardVersion; } @@ -52,6 +56,16 @@ namespace mongo { : _mutex( "ConnectionShardStatus" ) { } + bool isInitialized( DBClientBase * conn ){ + scoped_lock lk( _mutex ); + return _init.find( conn ) != _init.end(); + } + + void setInitialized( DBClientBase * conn ){ + scoped_lock lk( _mutex ); + _init.insert( conn ); + } + S getSequence( DBClientBase * conn , const string& ns ) { scoped_lock lk( _mutex ); return _map[conn][ns]; @@ -65,13 +79,15 @@ namespace mongo { void reset( DBClientBase * conn ) { scoped_lock lk( _mutex ); _map.erase( conn ); + _init.erase( conn ); } - // protects _map + // protects _maps mongo::mutex _mutex; // a map from a connection into ChunkManager's sequence number for each namespace map<DBClientBase*, map<string,unsigned long long> > _map; + set<DBClientBase*> _init; } connectionShardStatus; @@ -79,6 +95,75 @@ namespace mongo { connectionShardStatus.reset( conn ); } + bool isVersionable( DBClientBase* conn ){ + return conn->type() == ConnectionString::MASTER || conn->type() == ConnectionString::SET; + } + + DBClientBase* getVersionable( DBClientBase* conn ){ + + switch ( conn->type() ) { + case ConnectionString::INVALID: + massert( 15904, str::stream() << "cannot set version on invalid connection " << conn->toString(), false ); + return NULL; + case ConnectionString::MASTER: + return conn; + case ConnectionString::PAIR: + massert( 15905, str::stream() << "cannot set version or shard on pair connection " << conn->toString(), false ); + return NULL; + case ConnectionString::SYNC: + massert( 15906, str::stream() << "cannot set version or shard on sync connection " << conn->toString(), false ); + return NULL; + case ConnectionString::SET: + DBClientReplicaSet* set = (DBClientReplicaSet*) conn; + return &( set->masterConn() ); + } + + assert( false ); + return NULL; + } + + extern OID 
serverID; + + bool initShardVersion( DBClientBase& conn_in, BSONObj& result ){ + + WriteBackListener::init( conn_in ); + + DBClientBase* conn = getVersionable( &conn_in ); + assert( conn ); // errors thrown above + + BSONObjBuilder cmdBuilder; + + cmdBuilder.append( "setShardVersion" , "" ); + cmdBuilder.appendBool( "init", true ); + cmdBuilder.append( "configdb" , configServer.modelServer() ); + cmdBuilder.appendOID( "serverID" , &serverID ); + cmdBuilder.appendBool( "authoritative" , true ); + + BSONObj cmd = cmdBuilder.obj(); + + LOG(1) << "initializing shard connection to " << conn->toString() << endl; + LOG(2) << "initial sharding settings : " << cmd << endl; + + bool ok = conn->runCommand( "admin" , cmd , result ); + connectionShardStatus.setInitialized( conn ); + + // HACK for backwards compatibility with v1.8.x, v2.0.0 and v2.0.1 + // Result is false, but will still initialize serverID and configdb + // Not master does not initialize serverID and configdb, but we ignore since if the connection is not master, + // we are not setting the shard version at all + if( ! ok && ! result["errmsg"].eoo() && ( result["errmsg"].String() == "need to specify namespace"/* 2.0.1/2 */ || + result["errmsg"].String() == "need to speciy namespace" /* 1.8 */ || + result["errmsg"].String() == "not master" /* both */ ) ) + { + ok = true; + } + + LOG(3) << "initial sharding result : " << result << endl; + + return ok; + + } + /** * @return true if had to do something */ @@ -91,31 +176,14 @@ namespace mongo { if ( ! conf ) return false; - DBClientBase* conn = 0; + DBClientBase* conn = getVersionable( &conn_in ); + assert(conn); // errors thrown above - switch ( conn_in.type() ) { - case ConnectionString::INVALID: - assert(0); - break; - case ConnectionString::MASTER: - // great - conn = &conn_in; - break; - case ConnectionString::PAIR: - assert( ! 
"pair not support for sharding" ); - break; - case ConnectionString::SYNC: - // TODO: we should check later that we aren't actually sharded on this - conn = &conn_in; - break; - case ConnectionString::SET: - DBClientReplicaSet* set = (DBClientReplicaSet*)&conn_in; - conn = &(set->masterConn()); - break; + if( ! connectionShardStatus.isInitialized( conn ) ){ + BSONObj result; + uassert( 15918, str::stream() << "cannot initialize version on shard " << conn->getServerAddress() << causedBy( result.toString() ), initShardVersion( *conn, result ) ); } - assert(conn); - unsigned long long officialSequenceNumber = 0; ChunkManagerPtr manager; @@ -140,6 +208,13 @@ namespace mongo { version = manager->getVersion( Shard::make( conn->getServerAddress() ) ); } + if( version == 0 ){ + LOG(2) << "resetting shard version of " << ns << " on " << conn->getServerAddress() << ", " << + ( ! isSharded ? "no longer sharded" : + ( ! manager ? "no chunk manager found" : + "version is zero" ) ) << endl; + } + LOG(2) << " have to set shard version for conn: " << conn << " ns:" << ns << " my last seq: " << sequenceNumber << " current: " << officialSequenceNumber << " version: " << version << " manager: " << manager.get() diff --git a/s/shardconnection.cpp b/s/shardconnection.cpp index 04b49f2..89b3839 100644 --- a/s/shardconnection.cpp +++ b/s/shardconnection.cpp @@ -29,6 +29,14 @@ namespace mongo { // // TODO: better encapsulate this mechanism. 
+ bool defaultIsVersionable( DBClientBase * conn ){ + return false; + } + + bool defaultInitShardVersion( DBClientBase & conn, BSONObj& result ){ + return false; + } + bool defaultCheckShardVersion( DBClientBase & conn , const string& ns , bool authoritative , int tryNumber ) { // no-op in mongod return false; @@ -38,14 +46,13 @@ namespace mongo { // no-op in mongod } + boost::function1<bool, DBClientBase* > isVersionableCB = defaultIsVersionable; + boost::function2<bool, DBClientBase&, BSONObj& > initShardVersionCB = defaultInitShardVersion; boost::function4<bool, DBClientBase&, const string&, bool, int> checkShardVersionCB = defaultCheckShardVersion; boost::function1<void, DBClientBase*> resetShardVersionCB = defaultResetShardVersion; DBConnectionPool shardConnectionPool; - // Only print the non-top-level-shard-conn warning once if not verbose - volatile bool printedShardConnWarning = false; - /** * holds all the actual db connections for a client to various servers * 1 per thread, so doesn't have to be thread safe @@ -71,7 +78,7 @@ namespace mongo { /* if we're shutting down, don't want to initiate release mechanism as it is slow, and isn't needed since all connections will be closed anyway */ if ( inShutdown() ) { - resetShardVersionCB( ss->avail ); + if( isVersionableCB( ss->avail ) ) resetShardVersionCB( ss->avail ); delete ss->avail; } else @@ -83,35 +90,9 @@ namespace mongo { _hosts.clear(); } - DBClientBase * get( const string& addr , const string& ns, bool ignoreDirect = false ) { + DBClientBase * get( const string& addr , const string& ns ) { _check( ns ); - // Determine if non-shard conn is RS member for warning - // All shards added to _hosts if not present in _check() - if( ( logLevel >= 1 || ! printedShardConnWarning ) && ! 
ignoreDirect && _hosts.find( addr ) == _hosts.end() ){ - - vector<Shard> all; - Shard::getAllShards( all ); - - bool isRSMember = false; - string parentShard; - for ( unsigned i = 0; i < all.size(); i++ ) { - string connString = all[i].getConnString(); - if( connString.find( addr ) != string::npos && connString.find( '/' ) != string::npos ){ - isRSMember = true; - parentShard = connString; - break; - } - } - - if( isRSMember ){ - printedShardConnWarning = true; - warning() << "adding shard sub-connection " << addr << " (parent " << parentShard << ") as sharded, this is safe but unexpected" << endl; - printStackTrace(); - } - } - - Status* &s = _hosts[addr]; if ( ! s ) s = new Status(); @@ -119,7 +100,13 @@ namespace mongo { if ( s->avail ) { DBClientBase* c = s->avail; s->avail = 0; - shardConnectionPool.onHandedOut( c ); + try { + shardConnectionPool.onHandedOut( c ); + } + catch ( std::exception& e ) { + delete c; + throw; + } return c; } @@ -200,24 +187,24 @@ namespace mongo { thread_specific_ptr<ClientConnections> ClientConnections::_perThread; - ShardConnection::ShardConnection( const Shard * s , const string& ns, bool ignoreDirect ) + ShardConnection::ShardConnection( const Shard * s , const string& ns ) : _addr( s->getConnString() ) , _ns( ns ) { - _init( ignoreDirect ); + _init(); } - ShardConnection::ShardConnection( const Shard& s , const string& ns, bool ignoreDirect ) + ShardConnection::ShardConnection( const Shard& s , const string& ns ) : _addr( s.getConnString() ) , _ns( ns ) { - _init( ignoreDirect ); + _init(); } - ShardConnection::ShardConnection( const string& addr , const string& ns, bool ignoreDirect ) + ShardConnection::ShardConnection( const string& addr , const string& ns ) : _addr( addr ) , _ns( ns ) { - _init( ignoreDirect ); + _init(); } - void ShardConnection::_init( bool ignoreDirect ) { + void ShardConnection::_init() { assert( _addr.size() ); - _conn = ClientConnections::threadInstance()->get( _addr , _ns, ignoreDirect ); + _conn = 
ClientConnections::threadInstance()->get( _addr , _ns ); _finishedInit = false; } @@ -226,7 +213,7 @@ namespace mongo { return; _finishedInit = true; - if ( _ns.size() ) { + if ( _ns.size() && isVersionableCB( _conn ) ) { _setVersion = checkShardVersionCB( *_conn , _ns , false , 1 ); } else { @@ -245,7 +232,7 @@ namespace mongo { void ShardConnection::kill() { if ( _conn ) { - resetShardVersionCB( _conn ); + if( isVersionableCB( _conn ) ) resetShardVersionCB( _conn ); delete _conn; _conn = 0; _finishedInit = true; diff --git a/s/strategy_single.cpp b/s/strategy_single.cpp index 012be5f..fc206e5 100644 --- a/s/strategy_single.cpp +++ b/s/strategy_single.cpp @@ -182,6 +182,9 @@ namespace mongo { vector<Shard> shards; if ( strcmp( ns , "inprog" ) == 0 ) { + + r.checkAuth(); + Shard::getAllShards( shards ); BSONArrayBuilder arr( b.subarrayStart( "inprog" ) ); @@ -219,6 +222,9 @@ namespace mongo { arr.done(); } else if ( strcmp( ns , "killop" ) == 0 ) { + + r.checkAuth(); + BSONElement e = q.query["op"]; if ( strstr( r.getns() , "admin." 
) != 0 ) { b.append( "err" , "unauthorized" ); @@ -152,6 +152,8 @@ namespace mongo { string _ns; }; + extern boost::function1<bool, DBClientBase* > isVersionableCB; + extern boost::function2<bool, DBClientBase&, BSONObj& > initShardVersionCB; extern boost::function4<bool, DBClientBase&, const string&, bool, int> checkShardVersionCB; extern boost::function1<void, DBClientBase*> resetShardVersionCB; diff --git a/s/writeback_listener.cpp b/s/writeback_listener.cpp index 5f320d3..ebdefb0 100644 --- a/s/writeback_listener.cpp +++ b/s/writeback_listener.cpp @@ -165,13 +165,16 @@ namespace mongo { DBConfigPtr db = grid.getDBConfig( ns ); ShardChunkVersion needVersion( data["version"] ); + // TODO: The logic here could be refactored, but keeping to the original codepath for safety for now + ChunkManagerPtr manager = db->getChunkManagerIfExists( ns ); + LOG(1) << "connectionId: " << cid << " writebackId: " << wid << " needVersion : " << needVersion.toString() - << " mine : " << db->getChunkManager( ns )->getVersion().toString() + << " mine : " << ( manager ? 
manager->getVersion().toString() : "(unknown)" ) << endl; LOG(1) << m.toString() << endl; - if ( needVersion.isSet() && needVersion <= db->getChunkManager( ns )->getVersion() ) { + if ( needVersion.isSet() && manager && needVersion <= manager->getVersion() ) { // this means when the write went originally, the version was old // if we're here, it means we've already updated the config, so don't need to do again //db->getChunkManager( ns , true ); // SERVER-1349 @@ -180,7 +183,7 @@ namespace mongo { // we received a writeback object that was sent to a previous version of a shard // the actual shard may not have the object the writeback operation is for // we need to reload the chunk manager and get the new shard versions - db->getChunkManager( ns , true ); + manager = db->getChunkManager( ns , true ); } // do request and then call getLastError @@ -216,7 +219,7 @@ namespace mongo { if ( gle["code"].numberInt() == 9517 ) { log() << "writeback failed because of stale config, retrying attempts: " << attempts << endl; - if( ! db->getChunkManagerIfExists( ns , true ) ){ + if( ! 
db->getChunkManagerIfExists( ns , true, attempts > 2 ) ){ uassert( 15884, str::stream() << "Could not reload chunk manager after " << attempts << " attempts.", attempts <= 4 ); sleepsecs( attempts - 1 ); } diff --git a/scripting/engine.cpp b/scripting/engine.cpp index 1982940..7d13cb7 100644 --- a/scripting/engine.cpp +++ b/scripting/engine.cpp @@ -26,7 +26,7 @@ namespace mongo { int Scope::_numScopes = 0; - Scope::Scope() : _localDBName("") , _loadedVersion(0) { + Scope::Scope() : _localDBName("") , _loadedVersion(0), _numTimeUsed(0) { _numScopes++; } @@ -284,7 +284,8 @@ namespace mongo { void done( const string& pool , Scope * s ) { scoped_lock lk( _mutex ); list<Scope*> & l = _pools[pool]; - if ( l.size() > 10 ) { + // make we dont keep to many contexts, or use them for too long + if ( l.size() > 10 || s->getTimeUsed() > 100 ) { delete s; } else { @@ -302,6 +303,7 @@ namespace mongo { Scope * s = l.back(); l.pop_back(); s->reset(); + s->incTimeUsed(); return s; } diff --git a/scripting/engine.h b/scripting/engine.h index 1f9f1f5..9dd5f1d 100644 --- a/scripting/engine.h +++ b/scripting/engine.h @@ -132,6 +132,11 @@ namespace mongo { static void validateObjectIdString( const string &str ); + /** increments the number of times a scope was used */ + void incTimeUsed() { ++_numTimeUsed; } + /** gets the number of times a scope was used */ + int getTimeUsed() { return _numTimeUsed; } + protected: virtual ScriptingFunction _createFunction( const char * code ) = 0; @@ -141,6 +146,7 @@ namespace mongo { set<string> _storedNames; static long long _lastVersion; map<string,ScriptingFunction> _cachedFunctions; + int _numTimeUsed; static int _numScopes; }; @@ -168,7 +174,12 @@ namespace mongo { static void setup(); + /** gets a scope from the pool or a new one if pool is empty + * @param pool An identifier for the pool, usually the db name + * @return the scope */ auto_ptr<Scope> getPooledScope( const string& pool ); + + /** call this method to release some JS resources when 
a thread is done */ void threadDone(); struct Unlocker { virtual ~Unlocker() {} }; diff --git a/scripting/engine_v8.cpp b/scripting/engine_v8.cpp index fd69d66..2173391 100644 --- a/scripting/engine_v8.cpp +++ b/scripting/engine_v8.cpp @@ -1034,7 +1034,7 @@ namespace mongo { } case mongo::BinData: { - Local<v8::Object> b = readOnly ? readOnlyObjects->NewInstance() : internalFieldObjects->NewInstance(); + /*Local<v8::Object> b = */ readOnly ? readOnlyObjects->NewInstance() : internalFieldObjects->NewInstance(); int len; const char *data = f.binData( len ); @@ -1060,7 +1060,7 @@ namespace mongo { } case mongo::NumberLong: { - Local<v8::Object> sub = readOnly ? readOnlyObjects->NewInstance() : internalFieldObjects->NewInstance(); + /* Local<v8::Object> sub = */ readOnly ? readOnlyObjects->NewInstance() : internalFieldObjects->NewInstance(); unsigned long long val = f.numberLong(); v8::Function* numberLong = getNamedCons( "NumberLong" ); double floatApprox = (double)(long long)val; @@ -1244,7 +1244,7 @@ namespace mongo { } case mongo::NumberLong: { - Local<v8::Object> sub = internalFieldObjects->NewInstance(); + /* Local<v8::Object> sub = */ internalFieldObjects->NewInstance(); unsigned long long val = f.numberLong(); v8::Function* numberLong = getNamedCons( "NumberLong" ); if ( (long long)val == (long long)(double)(long long)(val) ) { diff --git a/scripting/v8_db.cpp b/scripting/v8_db.cpp index bda549c..f2f5333 100644 --- a/scripting/v8_db.cpp +++ b/scripting/v8_db.cpp @@ -128,7 +128,7 @@ namespace mongo { v8::Handle<v8::FunctionTemplate> getTimestampFunctionTemplate(V8Scope* scope) { v8::Handle<v8::FunctionTemplate> ts = scope->createV8Function(dbTimestampInit); - v8::Local<v8::Template> proto = ts->PrototypeTemplate(); + /* v8::Local<v8::Template> proto = */ ts->PrototypeTemplate(); ts->InstanceTemplate()->SetInternalFieldCount( 1 ); return ts; diff --git a/shell/collection.js b/shell/collection.js index 1e6fe03..cb7035d 100644 --- a/shell/collection.js +++ 
b/shell/collection.js @@ -62,6 +62,7 @@ DBCollection.prototype.help = function () { print("\tdb." + shortName + ".update(query, object[, upsert_bool, multi_bool])"); print("\tdb." + shortName + ".validate( <full> ) - SLOW");; print("\tdb." + shortName + ".getShardVersion() - only for use with sharding"); + print("\tdb." + shortName + ".getShardDistribution() - prints statistics about data distribution in the cluster"); return __magicNoPrint; } @@ -654,3 +655,190 @@ DBCollection.autocomplete = function(obj){ } return ret; } + + +// Sharding additions + +/* +Usage : + +mongo <mongos> +> load('path-to-file/shardingAdditions.js') +Loading custom sharding extensions... +true + +> var collection = db.getMongo().getCollection("foo.bar") +> collection.getShardDistribution() // prints statistics related to the collection's data distribution + +> collection.getSplitKeysForChunks() // generates split points for all chunks in the collection, based on the + // default maxChunkSize or alternately a specified chunk size +> collection.getSplitKeysForChunks( 10 ) // Mb + +> var splitter = collection.getSplitKeysForChunks() // by default, the chunks are not split, the keys are just + // found. A splitter function is returned which will actually + // do the splits. + +> splitter() // ! Actually executes the splits on the cluster ! + +*/ + +DBCollection.prototype.getShardDistribution = function(){ + + var stats = this.stats() + + if( ! stats.sharded ){ + print( "Collection " + this + " is not sharded." 
) + return + } + + var config = this.getMongo().getDB("config") + + var numChunks = 0 + + for( var shard in stats.shards ){ + + var shardDoc = config.shards.findOne({ _id : shard }) + + print( "\nShard " + shard + " at " + shardDoc.host ) + + var shardStats = stats.shards[ shard ] + + var chunks = config.chunks.find({ _id : sh._collRE( coll ), shard : shard }).toArray() + + numChunks += chunks.length + + var estChunkData = shardStats.size / chunks.length + var estChunkCount = Math.floor( shardStats.count / chunks.length ) + + print( " data : " + sh._dataFormat( shardStats.size ) + + " docs : " + shardStats.count + + " chunks : " + chunks.length ) + print( " estimated data per chunk : " + sh._dataFormat( estChunkData ) ) + print( " estimated docs per chunk : " + estChunkCount ) + + } + + print( "\nTotals" ) + print( " data : " + sh._dataFormat( stats.size ) + + " docs : " + stats.count + + " chunks : " + numChunks ) + for( var shard in stats.shards ){ + + var shardStats = stats.shards[ shard ] + + var estDataPercent = Math.floor( shardStats.size / stats.size * 10000 ) / 100 + var estDocPercent = Math.floor( shardStats.count / stats.count * 10000 ) / 100 + + print( " Shard " + shard + " contains " + estDataPercent + "% data, " + estDocPercent + "% docs in cluster, " + + "avg obj size on shard : " + sh._dataFormat( stats.shards[ shard ].avgObjSize ) ) + } + + print( "\n" ) + +} + +// In testing phase, use with caution +DBCollection.prototype._getSplitKeysForChunks = function( chunkSize ){ + + var stats = this.stats() + + if( ! stats.sharded ){ + print( "Collection " + this + " is not sharded." ) + return + } + + var config = this.getMongo().getDB("config") + + if( ! 
chunkSize ){ + chunkSize = config.settings.findOne({ _id : "chunksize" }).value + print( "Chunk size not set, using default of " + chunkSize + "Mb" ) + } + else{ + print( "Using chunk size of " + chunkSize + "Mb" ) + } + + var shardDocs = config.shards.find().toArray() + + var allSplitPoints = {} + var numSplits = 0 + + for( var i = 0; i < shardDocs.length; i++ ){ + + var shardDoc = shardDocs[i] + var shard = shardDoc._id + var host = shardDoc.host + var sconn = new Mongo( host ) + + var chunks = config.chunks.find({ _id : sh._collRE( this ), shard : shard }).toArray() + + print( "\nGetting split points for chunks on shard " + shard + " at " + host ) + + var splitPoints = [] + + for( var j = 0; j < chunks.length; j++ ){ + var chunk = chunks[j] + var result = sconn.getDB("admin").runCommand({ splitVector : this + "", min : chunk.min, max : chunk.max, maxChunkSize : chunkSize }) + if( ! result.ok ){ + print( " Had trouble getting split keys for chunk " + sh._pchunk( chunk ) + " :\n" ) + printjson( result ) + } + else{ + splitPoints = splitPoints.concat( result.splitKeys ) + + if( result.splitKeys.length > 0 ) + print( " Added " + result.splitKeys.length + " split points for chunk " + sh._pchunk( chunk ) ) + } + } + + print( "Total splits for shard " + shard + " : " + splitPoints.length ) + + numSplits += splitPoints.length + allSplitPoints[ shard ] = splitPoints + + } + + // Get most recent migration + var migration = config.changelog.find({ what : /^move.*/ }).sort({ time : -1 }).limit( 1 ).toArray() + if( migration.length == 0 ) + print( "\nNo migrations found in changelog." ) + else { + migration = migration[0] + print( "\nMost recent migration activity was on " + migration.ns + " at " + migration.time ) + } + + var admin = this.getMongo().getDB("admin") + var coll = this + var splitFunction = function(){ + + // Turn off the balancer, just to be safe + print( "Turning off balancer..." 
) + config.settings.update({ _id : "balancer" }, { $set : { stopped : true } }, true ) + print( "Sleeping for 30s to allow balancers to detect change. To be extra safe, check config.changelog" + + " for recent migrations." ) + sleep( 30000 ) + + for( shard in allSplitPoints ){ + for( var i = 0; i < allSplitPoints[ shard ].length; i++ ){ + var splitKey = allSplitPoints[ shard ][i] + print( "Splitting at " + tojson( splitKey ) ) + printjson( admin.runCommand({ split : coll + "", middle : splitKey }) ) + } + } + + print( "Turning the balancer back on." ) + config.settings.update({ _id : "balancer" }, { $set : { stopped : false } } ) + sleep( 1 ) + } + + print( "\nGenerated " + numSplits + " split keys, run output function to perform splits.\n" + + " ex : \n" + + " > var splitter = <collection>.getSplitKeysForChunks()\n" + + " > splitter() // Execute splits on cluster !\n" ) + + return splitFunction + +} + + + + diff --git a/shell/dbshell.cpp b/shell/dbshell.cpp index f3122c7..443973f 100644 --- a/shell/dbshell.cpp +++ b/shell/dbshell.cpp @@ -398,13 +398,14 @@ string finishCode( string code ) { while ( ! isBalanced( code ) ) { inMultiLine = 1; code += "\n"; + // cancel multiline if two blank lines are entered + if ( code.find("\n\n\n") != string::npos ) + return ";"; char * line = shellReadline("... " , 1 ); if ( gotInterrupted ) return ""; if ( ! line ) return ""; - if ( code.find("\n\n") != string::npos ) // cancel multiline if two blank lines are entered - return ";"; while (startsWith(line, "... 
")) line += 4; diff --git a/shell/mongo_vstudio.cpp b/shell/mongo_vstudio.cpp index 5496ddb..208d734 100644 --- a/shell/mongo_vstudio.cpp +++ b/shell/mongo_vstudio.cpp @@ -1005,8 +1005,8 @@ const StringData _jscode_raw_utils = "return {}\n" "}\n" "\n" -"testLog = function(x){\n" -"print( jsTestFile() + \" - \" + x )\n" +"jsTestLog = function(msg){\n" +"print( \"\\n\\n----\\n\" + msg + \"\\n----\\n\\n\" )\n" "}\n" "\n" "shellPrintHelper = function (x) {\n" diff --git a/shell/servers.js b/shell/servers.js index ad3b5eb..efbd9b6 100755 --- a/shell/servers.js +++ b/shell/servers.js @@ -235,7 +235,8 @@ ShardingTest = function( testName , numShards , verboseLevel , numMongos , other rs.awaitReplication(); var xxx = new Mongo( rs.getURL() ); xxx.name = rs.getURL(); - this._connections.push( xxx ); + this._connections.push( xxx ) + this["shard" + i] = xxx } this._configServers = [] @@ -260,6 +261,7 @@ ShardingTest = function( testName , numShards , verboseLevel , numMongos , other var conn = startMongodTest( 30000 + i , testName + i, 0, options ); this._alldbpaths.push( testName +i ) this._connections.push( conn ); + this["shard" + i] = conn } if ( otherParams.sync ){ @@ -544,7 +546,7 @@ printShardingStatus = function( configDB , verbose ){ output( "\t" + tojsononeline(db,"",true) ); if (db.partitioned){ - configDB.collections.find( { _id : new RegExp( "^" + db._id + "\." ) } ).sort( { _id : 1 } ).forEach( + configDB.collections.find( { _id : new RegExp( "^" + db._id + "\\." ) } ).sort( { _id : 1 } ).forEach( function( coll ){ if ( coll.dropped == false ){ output("\t\t" + coll._id + " chunks:"); @@ -760,8 +762,8 @@ ShardingTest.prototype.isSharded = function( collName ){ ShardingTest.prototype.shardGo = function( collName , key , split , move , dbName ){ - split = split || key; - move = move || split; + split = ( split != false ? ( split || key ) : split ) + move = ( split != false && move != false ? 
( move || split ) : false ) if( collName.getDB ) dbName = "" + collName.getDB() @@ -782,12 +784,16 @@ ShardingTest.prototype.shardGo = function( collName , key , split , move , dbNam assert( false ) } + if( split == false ) return + result = this.s.adminCommand( { split : c , middle : split } ); if( ! result.ok ){ printjson( result ) assert( false ) } + if( move == false ) return + var result = null for( var i = 0; i < 5; i++ ){ result = this.s.adminCommand( { movechunk : c , find : move , to : this.getOther( this.getServer( dbName ) ).name } ); @@ -1854,7 +1860,11 @@ ReplSetTest.prototype.waitForIndicator = function( node, states, ind, timeout ){ printjson( status ) lastTime = new Date().getTime() } - + + if (typeof status.members == 'undefined') { + return false; + } + for( var i = 0; i < status.members.length; i++ ){ if( status.members[i].name == node.host ){ for( var j = 0; j < states.length; j++ ){ diff --git a/shell/utils.js b/shell/utils.js index 8380607..7d7a23b 100644 --- a/shell/utils.js +++ b/shell/utils.js @@ -1000,8 +1000,8 @@ jsTestOptions = function(){ return {} } -testLog = function(x){ - print( jsTestFile() + " - " + x ) +jsTestLog = function(msg){ + print( "\n\n----\n" + msg + "\n----\n\n" ) } shellPrintHelper = function (x) { diff --git a/shell/utils_sh.js b/shell/utils_sh.js index 5bd449b..297643f 100644 --- a/shell/utils_sh.js +++ b/shell/utils_sh.js @@ -23,6 +23,22 @@ sh._adminCommand = function( cmd , skipCheck ) { return res; } + +sh._dataFormat = function( bytes ){ + if( bytes < 1024 ) return Math.floor( bytes ) + "b" + if( bytes < 1024 * 1024 ) return Math.floor( bytes / 1024 ) + "kb" + if( bytes < 1024 * 1024 * 1024 ) return Math.floor( ( Math.floor( bytes / 1024 ) / 1024 ) * 100 ) / 100 + "Mb" + return Math.floor( ( Math.floor( bytes / ( 1024 * 1024 ) ) / 1024 ) * 100 ) / 100 + "Gb" +} + +sh._collRE = function( coll ){ + return RegExp( "^" + (coll + "").replace(/\./g, "\\.") + "-.*" ) +} + +sh._pchunk = function( chunk ){ + return "[" + 
tojson( chunk.min ) + " -> " + tojson( chunk.max ) + "]" +} + sh.help = function() { print( "\tsh.addShard( host ) server:port OR setname/server:port" ) print( "\tsh.enableSharding(dbname) enables sharding on the database dbname" ) diff --git a/third_party/pcre.py b/third_party/pcre.py index a200722..ad9d6bb 100644 --- a/third_party/pcre.py +++ b/third_party/pcre.py @@ -31,6 +31,9 @@ def configure( env , fileLists , options ): myenv.Append( CPPDEFINES=["HAVE_CONFIG_H"] ) fileLists["commonFiles"] += [ myenv.Object(f) for f in getFiles() ] +def configureSystem( env , fileLists , options ): + + env.Append( LIBS=[ "pcrecpp" ] ) if __name__ == "__main__": diff --git a/third_party/sm.py b/third_party/sm.py index 53e7984..9927be8 100644 --- a/third_party/sm.py +++ b/third_party/sm.py @@ -42,15 +42,20 @@ root = "third_party/js-1.7" def r(x): return "%s/%s" % ( root , x ) -def configure( env , fileLists , options ): - if not options["usesm"]: - return - +def configureBasics( env , fileLists , options ): if options["windows"]: env.Append( CPPDEFINES=[ "XP_WIN" ] ) else: env.Append( CPPDEFINES=[ "XP_UNIX" ] ) + + +def configure( env , fileLists , options ): + if not options["usesm"]: + return + + configureBasics( env , fileLists , options ) + env.Prepend( CPPPATH=[root] ) myenv = env.Clone() @@ -98,3 +103,12 @@ def configure( env , fileLists , options ): myenv.Auto( r("jsautocfg.h") , [ jscpucfg ] ) myenv.Depends( r("jsscan.c") , r("jsautokw.h") ) + + +def configureSystem( env , fileLists , options ): + if not options["usesm"]: + return + + configureBasics( env , fileLists , options ) + + env.Append( LIBS=[ "js" ] ) diff --git a/third_party/snappy.py b/third_party/snappy.py index c70cb28..e53ee63 100644 --- a/third_party/snappy.py +++ b/third_party/snappy.py @@ -9,3 +9,6 @@ def configure( env , fileLists , options ): files = ["third_party/snappy/snappy.cc", "third_party/snappy/snappy-sinksource.cc"] fileLists["serverOnlyFiles"] += [ myenv.Object(f) for f in files ] + +def 
configureSystem( env , fileLists , options ): + configure( env , fileLists , options ) diff --git a/tools/import.cpp b/tools/import.cpp index 16980b0..bd77bcd 100644 --- a/tools/import.cpp +++ b/tools/import.cpp @@ -396,7 +396,7 @@ public: break; } len += bytesProcessed; - line += len; + line += bytesProcessed; } else { if (!parseRow(in, o, len)) { diff --git a/tools/stat.cpp b/tools/stat.cpp index 7483222..e1eda8d 100644 --- a/tools/stat.cpp +++ b/tools/stat.cpp @@ -423,6 +423,8 @@ namespace mongo { int rowCount = getParam( "rowcount" , 0 ); int rowNum = 0; + auth(); + BSONObj prev = stats(); if ( prev.isEmpty() ) return -1; diff --git a/tools/tool.cpp b/tools/tool.cpp index e8c23d5..ab464c7 100644 --- a/tools/tool.cpp +++ b/tools/tool.cpp @@ -402,7 +402,7 @@ namespace mongo { // findOne throws an AssertionException if it's not authenticated. if (_coll.size() > 0) { // BSONTools don't have a collection - conn().findOne(getNS(), Query("{}")); + conn().findOne(getNS(), Query("{}"), 0, QueryOption_SlaveOk); } return; } diff --git a/util/concurrency/rwlock.h b/util/concurrency/rwlock.h index d8a11ea..ed5bda0 100644 --- a/util/concurrency/rwlock.h +++ b/util/concurrency/rwlock.h @@ -203,7 +203,7 @@ namespace mongo { DEV mutexDebugger.entering(_name); } void unlock() { - mutexDebugger.leaving(_name); + DEV mutexDebugger.leaving(_name); check( pthread_rwlock_unlock( &_lock ) ); } diff --git a/util/concurrency/spin_lock.cpp b/util/concurrency/spin_lock.cpp index 1811f15..52bd218 100644 --- a/util/concurrency/spin_lock.cpp +++ b/util/concurrency/spin_lock.cpp @@ -45,7 +45,36 @@ namespace mongo { #if defined(_WIN32) EnterCriticalSection(&_cs); #elif defined(__USE_XOPEN2K) - pthread_spin_lock( &_lock ); + + /** + * this is designed to perform close to the default spin lock + * the reason for the mild insanity is to prevent horrible performance + * when contention spikes + * it allows spinlocks to be used in many more places + * which is good because even with this change 
they are about 8x faster on linux + */ + + if ( pthread_spin_trylock( &_lock ) == 0 ) + return; + + for ( int i=0; i<1000; i++ ) + if ( pthread_spin_trylock( &_lock ) == 0 ) + return; + + for ( int i=0; i<1000; i++ ) { + if ( pthread_spin_trylock( &_lock ) == 0 ) + return; + pthread_yield(); + } + + struct timespec t; + t.tv_sec = 0; + t.tv_nsec = 5000000; + + while ( pthread_spin_trylock( &_lock ) != 0 ) { + nanosleep(&t, NULL); + } + #elif defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_4) // fast path if (!_locked && !__sync_lock_test_and_set(&_locked, true)) { diff --git a/util/concurrency/value.h b/util/concurrency/value.h index c66977b..897fa95 100644 --- a/util/concurrency/value.h +++ b/util/concurrency/value.h @@ -21,6 +21,7 @@ #pragma once #include "mutex.h" +#include "spin_lock.h" namespace mongo { @@ -36,7 +37,7 @@ namespace mongo { builds at runtime. */ template <typename T, mutex& BY> - class Guarded { + class Guarded : boost::noncopyable { T _val; public: T& ref(const scoped_lock& lk) { @@ -47,29 +48,85 @@ namespace mongo { class DiagStr { string _s; - static mutex m; + mutable SpinLock m; public: DiagStr(const DiagStr& r) : _s(r.get()) { } DiagStr() { } bool empty() const { - mutex::scoped_lock lk(m); + scoped_spinlock lk(m); return _s.empty(); } string get() const { - mutex::scoped_lock lk(m); + scoped_spinlock lk(m); return _s; } void set(const char *s) { - mutex::scoped_lock lk(m); + scoped_spinlock lk(m); _s = s; } void set(const string& s) { - mutex::scoped_lock lk(m); + scoped_spinlock lk(m); _s = s; } operator string() const { return get(); } void operator=(const string& s) { set(s); } + void operator=(const DiagStr& rhs) { + scoped_spinlock lk(m); + _s = rhs.get(); + } }; +#if 0 // not including in 2.0 + + /** Thread safe map. + Be careful not to use this too much or it could make things slow; + if not a hot code path no problem. 
+ + Examples: + + mapsf<int,int> mp; + + int x = mp.get(); + + map<int,int> two; + mp.swap(two); + + { + mapsf<int,int>::ref r(mp); + r[9] = 1; + map<int,int>::iterator i = r.r.begin(); + } + + */ + template< class K, class V > + struct mapsf : boost::noncopyable { + SimpleMutex m; + map<K,V> val; + friend struct ref; + public: + mapsf() : m("mapsf") { } + void swap(map<K,V>& rhs) { + SimpleMutex::scoped_lock lk(m); + val.swap(rhs); + } + // safe as we pass by value: + V get(K k) { + SimpleMutex::scoped_lock lk(m); + map<K,V>::iterator i = val.find(k); + if( i == val.end() ) + return K(); + return i->second; + } + // think about deadlocks when using ref. the other methods + // above will always be safe as they are "leaf" operations. + struct ref { + SimpleMutex::scoped_lock lk; + public: + map<K,V> const &r; + ref(mapsf<K,V> &m) : lk(m.m), r(m.val) { } + V& operator[](const K& k) { return r[k]; } + }; + }; +#endif } diff --git a/util/concurrency/vars.cpp b/util/concurrency/vars.cpp index 213e576..b561ccc 100644 --- a/util/concurrency/vars.cpp +++ b/util/concurrency/vars.cpp @@ -22,8 +22,6 @@ namespace mongo { - mutex DiagStr::m("diags"); - // intentional leak. otherwise destructor orders can be problematic at termination. 
MutexDebugger &mutexDebugger = *(new MutexDebugger()); diff --git a/util/net/httpclient.h b/util/net/httpclient.h index c3f8c82..126969f 100644 --- a/util/net/httpclient.h +++ b/util/net/httpclient.h @@ -35,7 +35,7 @@ namespace mongo { return _entireResponse; } - const Headers getHeaders() const { + Headers getHeaders() const { return _headers; } @@ -76,4 +76,3 @@ namespace mongo { #endif }; } - diff --git a/util/net/message_port.cpp b/util/net/message_port.cpp index 9abfaf7..29764c3 100644 --- a/util/net/message_port.cpp +++ b/util/net/message_port.cpp @@ -139,6 +139,7 @@ namespace mongo { MessagingPort::MessagingPort( Socket& sock ) : Socket( sock ) , piggyBackData( 0 ) { + ports.insert(this); } void MessagingPort::shutdown() { diff --git a/util/net/message_server_port.cpp b/util/net/message_server_port.cpp index ca0b13d..ac13712 100644 --- a/util/net/message_server_port.cpp +++ b/util/net/message_server_port.cpp @@ -87,6 +87,10 @@ namespace mongo { log() << "ClockSkewException - shutting down" << endl; exitCleanly( EXIT_CLOCK_SKEW ); } + catch ( const DBException& e ) { // must be right above std::exception to avoid catching subclasses + log() << "DBException handling request, closing client connection: " << e << endl; + p->shutdown(); + } catch ( std::exception &e ) { error() << "Uncaught std::exception: " << e.what() << ", terminating" << endl; dbexit( EXIT_UNCAUGHT ); diff --git a/util/net/miniwebserver.cpp b/util/net/miniwebserver.cpp index 0793100..f0b5856 100644 --- a/util/net/miniwebserver.cpp +++ b/util/net/miniwebserver.cpp @@ -166,8 +166,13 @@ namespace mongo { ss << responseMsg; string response = ss.str(); - sock.send( response.c_str(), response.size() , "http response" ); - sock.close(); + try { + sock.send( response.c_str(), response.size() , "http response" ); + sock.close(); + } + catch ( SocketException& e ) { + log(1) << "couldn't send data to http client: " << e << endl; + } } string MiniWebServer::getHeader( const char * req , string wanted ) 
{ diff --git a/util/processinfo_win32.cpp b/util/processinfo_win32.cpp index ec66aec..fdd88d3 100644 --- a/util/processinfo_win32.cpp +++ b/util/processinfo_win32.cpp @@ -19,7 +19,7 @@ #include "processinfo.h" #include <iostream> #include <psapi.h> - +#include "../bson/bsonobjbuilder.h" using namespace std; int getpid() { @@ -57,7 +57,20 @@ namespace mongo { return _wconvertmtos( pmc.WorkingSetSize ); } - void ProcessInfo::getExtraInfo(BSONObjBuilder& info) {} + void ProcessInfo::getExtraInfo(BSONObjBuilder& info) { + MEMORYSTATUSEX mse; + mse.dwLength = sizeof(mse); + PROCESS_MEMORY_COUNTERS pmc; + if( GetProcessMemoryInfo( GetCurrentProcess() , &pmc, sizeof(pmc) ) ) { + info.append("page_faults", static_cast<int>(pmc.PageFaultCount)); + info.append("usagePageFileMB", static_cast<int>(pmc.PagefileUsage / 1024 / 1024)); + } + if( GlobalMemoryStatusEx( &mse ) ) { + info.append("totalPageFileMB", static_cast<int>(mse.ullTotalPageFile / 1024 / 1024)); + info.append("availPageFileMB", static_cast<int>(mse.ullAvailPageFile / 1024 / 1024)); + info.append("ramMB", static_cast<int>(mse.ullTotalPhys / 1024 / 1024)); + } + } bool ProcessInfo::blockCheckSupported() { return false; diff --git a/util/ramlog.cpp b/util/ramlog.cpp index 69ffc17..d7a839a 100644 --- a/util/ramlog.cpp +++ b/util/ramlog.cpp @@ -135,7 +135,7 @@ namespace mongo { assert( strlen(v[i]) > 20 ); int r = repeats(v, i); if( r < 0 ) { - s << color( linkify( clean(v,i).c_str() ) ); + s << color( linkify( clean(v,i).c_str() ) ) << '\n'; } else { stringstream x; diff --git a/util/version.cpp b/util/version.cpp index a6efbd5..c43180c 100644 --- a/util/version.cpp +++ b/util/version.cpp @@ -38,7 +38,7 @@ namespace mongo { * 1.2.3-rc4-pre- * If you really need to do something else you'll need to fix _versionArray() */ - const char versionString[] = "2.0.0"; + const char versionString[] = "2.0.2"; // See unit test for example outputs static BSONArray _versionArray(const char* version){ @@ -168,7 +168,7 @@ namespace 
mongo { f.open("/proc/self/numa_maps", /*read_only*/true); if ( f.is_open() && ! f.bad() ) { char line[100]; //we only need the first line - f.read(0, line, sizeof(line)); + assert( read(f.fd, line, sizeof(line)) <= 100 ); // just in case... line[98] = ' '; |