diff options
Diffstat (limited to 'db')
39 files changed, 977 insertions, 356 deletions
diff --git a/db/client.cpp b/db/client.cpp index 9781041..f9653f5 100644 --- a/db/client.cpp +++ b/db/client.cpp @@ -35,16 +35,18 @@ namespace mongo { + Client* Client::syncThread; mongo::mutex Client::clientsMutex("clientsMutex"); set<Client*> Client::clients; // always be in clientsMutex when manipulating this boost::thread_specific_ptr<Client> currentClient; - Client::Client(const char *desc) : + Client::Client(const char *desc, MessagingPort *p) : _context(0), _shutdown(false), _desc(desc), _god(0), - _lastOp(0) + _lastOp(0), + _mp(p) { _curOp = new CurOp( this ); scoped_lock bl(clientsMutex); @@ -52,15 +54,21 @@ namespace mongo { } Client::~Client() { - delete _curOp; _god = 0; if ( _context ) - cout << "ERROR: Client::~Client _context should be NULL: " << _desc << endl; - if ( !_shutdown ) - cout << "ERROR: Client::shutdown not called: " << _desc << endl; - } + error() << "Client::~Client _context should be null but is not; client:" << _desc << endl; + if ( ! _shutdown ) { + error() << "Client::shutdown not called: " << _desc << endl; + } + + scoped_lock bl(clientsMutex); + if ( ! _shutdown ) + clients.erase(this); + delete _curOp; + } + void Client::_dropns( const string& ns ){ Top::global.collectionDropped( ns ); @@ -75,7 +83,7 @@ namespace mongo { dropCollection( ns , err , b ); } catch ( ... ){ - log() << "error dropping temp collection: " << ns << endl; + warning() << "error dropping temp collection: " << ns << endl; } } @@ -196,12 +204,18 @@ namespace mongo { if ( doauth ) _auth( lockState ); - if ( _client->_curOp->getOp() != dbGetMore ){ // getMore's are special and should be handled else where + switch ( _client->_curOp->getOp() ){ + case dbGetMore: // getMore's are special and should be handled else where + case dbUpdate: // update & delete check shard version in instance.cpp, so don't check here as well + case dbDelete: + break; + default: { string errmsg; - if ( ! shardVersionOk( _ns , errmsg ) ){ + if ( ! shardVersionOk( _ns , lockState > 0 , errmsg ) ){ msgasserted( StaleConfigInContextCode , (string)"[" + _ns + "] shard version not ok in Client::Context: " + errmsg ); } } + } } void Client::Context::_auth( int lockState ){ @@ -237,7 +251,7 @@ namespace mongo { string sayClientState(){ Client* c = currentClient.get(); - if ( ! c ) + if ( !c ) return "no client"; return c->toString(); } @@ -259,6 +273,15 @@ namespace mongo { } } + CurOp::~CurOp(){ + if ( _wrapped ){ + scoped_lock bl(Client::clientsMutex); + _client->_curOp = _wrapped; + } + + _client = 0; + } + BSONObj CurOp::query( bool threadSafe ) { if( querySize() == 1 ) { return _tooBig; diff --git a/db/client.h b/db/client.h index 2456d7f..d0600e3 100644 --- a/db/client.h +++ b/db/client.h @@ -29,27 +29,33 @@ #include "namespace.h" #include "lasterror.h" #include "stats/top.h" -//#include "repl/rs.h" namespace mongo { extern class ReplSet *theReplSet; - class AuthenticationInfo; class Database; class CurOp; class Command; class Client; + class MessagingPort; extern boost::thread_specific_ptr<Client> currentClient; class Client : boost::noncopyable { public: + static Client *syncThread; + void iAmSyncThread() { + wassert( syncThread == 0 ); + syncThread = this; + } + bool isSyncThread() const { return this == syncThread; } // true if this client is the replication secondary pull thread + static mongo::mutex clientsMutex; static set<Client*> clients; // always be in clientsMutex when manipulating this - static int recommendedYieldMicros( int * writers = 0 , int * readers = 0 ); + /* set _god=true temporarily, safely */ class GodScope { bool _prev; public: @@ -148,9 +154,11 @@ namespace mongo { } friend class CurOp; - }; + }; // class Client::Context private: + void _dropns( const string& ns ); + CurOp * _curOp; Context * _context; bool _shutdown; @@ -162,9 +170,9 @@ namespace mongo { BSONObj _handshake; BSONObj _remoteId; - void _dropns( const string& ns ); - public: + MessagingPort * const _mp; + string clientAddress() const; AuthenticationInfo * getAuthenticationInfo(){ return &_ai; } bool isAdmin() { return _ai.isAuthorized( "admin" ); } @@ -174,24 +182,19 @@ namespace mongo { const char *ns() const { return _context->ns(); } const char *desc() const { return _desc; } - Client(const char *desc); + Client(const char *desc, MessagingPort *p = 0); ~Client(); void addTempCollection( const string& ns ); void _invalidateDB(const string& db); static void invalidateDB(const string& db); - static void invalidateNS( const string& ns ); - void setLastOp( ReplTime op ) { - _lastOp = op; - } - - ReplTime getLastOp() const { - return _lastOp; - } + void setLastOp( ReplTime op ) { _lastOp = op; } + ReplTime getLastOp() const { return _lastOp; } + /* report what the last operation was. used by getlasterror */ void appendLastOp( BSONObjBuilder& b ) { if( theReplSet ) { b.append("lastOp" , (long long) _lastOp); @@ -206,14 +209,13 @@ namespace mongo { /* each thread which does db operations has a Client object in TLS. call this when your thread starts. */ - static void initThread(const char *desc); + static Client& initThread(const char *desc, MessagingPort *mp = 0); /* this has to be called as the client goes away, but before thread termination @return true if anything was done */ bool shutdown(); - /* this is for map/reduce writes */ bool isGod() const { return _god; } @@ -221,13 +223,12 @@ namespace mongo { friend class CurOp; string toString() const; - void gotHandshake( const BSONObj& o ); - BSONObj getRemoteID() const { return _remoteId; } BSONObj getHandshake() const { return _handshake; } }; + /** get the Client object for this thread. */ inline Client& cc() { Client * c = currentClient.get(); assert( c ); @@ -237,11 +238,13 @@ namespace mongo { /* each thread which does db operations has a Client object in TLS. call this when your thread starts. */ - inline void Client::initThread(const char *desc) { + inline Client& Client::initThread(const char *desc, MessagingPort *mp) { setThreadName(desc); assert( currentClient.get() == 0 ); - currentClient.reset( new Client(desc) ); + Client *c = new Client(desc, mp); + currentClient.reset(c); mongo::lastError.initThread(); + return *c; } inline Client::GodScope::GodScope(){ @@ -276,8 +279,5 @@ namespace mongo { string sayClientState(); - inline bool haveClient(){ - return currentClient.get() > 0; - } + inline bool haveClient() { return currentClient.get() > 0; } }; - diff --git a/db/clientcursor.h b/db/clientcursor.h index 6f79dcf..b895c17 100644 --- a/db/clientcursor.h +++ b/db/clientcursor.h @@ -138,17 +138,18 @@ namespace mongo { /*const*/ CursorId cursorid; const string ns; const shared_ptr<Cursor> c; - int pos; // # objects into the cursor so far - BSONObj query; + int pos; // # objects into the cursor so far + const BSONObj query; // used for logging diags only; optional in constructor const int _queryOptions; // see enum QueryOptions dbclient.h OpTime _slaveReadTill; Database * const _db; - ClientCursor(int queryOptions, shared_ptr<Cursor>& _c, const string& _ns) : + ClientCursor(int queryOptions, shared_ptr<Cursor>& _c, const string& _ns, BSONObj _query = BSONObj()) : _idleAgeMillis(0), _pinValue(0), _doingDeletes(false), _yieldSometimesTracker(128,10), ns(_ns), c(_c), - pos(0), _queryOptions(queryOptions), + pos(0), query(_query), + _queryOptions(queryOptions), _db( cc().database() ) { assert( _db ); diff --git a/db/cloner.cpp b/db/cloner.cpp index 96890bf..9177a00 100644 --- a/db/cloner.cpp +++ b/db/cloner.cpp @@ -49,7 +49,7 @@ namespace mongo { void setConnection( DBClientWithCommands *c ) { conn.reset( c ); } bool go(const char *masterHost, string& errmsg, const string& fromdb, bool logForRepl, bool slaveOk, bool useReplAuth, bool snapshot); - bool copyCollection( const string& from , const string& ns , const BSONObj& query , string& errmsg , bool copyIndexes = true ); + bool copyCollection( const string& from , const string& ns , const BSONObj& query , string& errmsg , bool copyIndexes = true, bool logForRepl = true ); }; /* for index info object: @@ -198,12 +198,12 @@ namespace mongo { } } - bool copyCollectionFromRemote(const string& host, const string& ns, const BSONObj& query, string errmsg) { + bool copyCollectionFromRemote(const string& host, const string& ns, const BSONObj& query, string& errmsg, bool logForRepl) { Cloner c; - return c.copyCollection(host, ns, query, errmsg , /*copyIndexes*/ true); + return c.copyCollection(host, ns, query, errmsg , /*copyIndexes*/ true, logForRepl); } - bool Cloner::copyCollection( const string& from , const string& ns , const BSONObj& query , string& errmsg , bool copyIndexes ){ + bool Cloner::copyCollection( const string& from , const string& ns , const BSONObj& query , string& errmsg , bool copyIndexes, bool logForRepl ) { auto_ptr<DBClientConnection> myconn; myconn.reset( new DBClientConnection() ); if ( ! myconn->connect( from , errmsg ) ) @@ -223,12 +223,17 @@ namespace mongo { } { // main data - copy( ns.c_str() , ns.c_str() , false , true , false , true , Query(query).snapshot() ); + copy( ns.c_str() , ns.c_str() , /*isindex*/false , logForRepl , false , true , Query(query).snapshot() ); } + /* TODO : copyIndexes bool does not seem to be implemented! */ + if( !copyIndexes ) { + log() << "ERROR copy collection copyIndexes not implemented? " << ns << endl; + } + { // indexes string temp = ctx.db()->name + ".system.indexes"; - copy( temp.c_str() , temp.c_str() , true , true , false , true , BSON( "ns" << ns ) ); + copy( temp.c_str() , temp.c_str() , /*isindex*/true , logForRepl , false , true , BSON( "ns" << ns ) ); } return true; } diff --git a/db/commands.h b/db/commands.h index eea4a71..a8a61c4 100644 --- a/db/commands.h +++ b/db/commands.h @@ -85,9 +85,7 @@ namespace mongo { Note if run() returns false, we do NOT log. */ - virtual bool logTheOp() { - return false; - } + virtual bool logTheOp() { return false; } virtual void help( stringstream& help ) const; @@ -222,10 +222,7 @@ namespace mongo { memset(_queryBuf, 0, sizeof(_queryBuf)); } - ~CurOp(){ - if ( _wrapped ) - _client->_curOp = _wrapped; - } + ~CurOp(); BSONObj info() { if( ! cc().getAuthenticationInfo()->isAuthorized("admin") ) { @@ -67,7 +67,6 @@ namespace mongo { #endif void setupSignals(); - void closeAllSockets(); void startReplSets(ReplSetCmdline*); void startReplication(); void pairWith(const char *remoteEnd, const char *arb); @@ -104,7 +103,7 @@ namespace mongo { virtual void accepted(MessagingPort *mp) { if ( ! connTicketHolder.tryAcquire() ){ - log() << "connection refused because too many open connections: " << connTicketHolder.used() << endl; + log() << "connection refused because too many open connections: " << connTicketHolder.used() << " of " << connTicketHolder.outof() << endl; // TODO: would be nice if we notified them... mp->shutdown(); delete mp; @@ -207,16 +206,14 @@ namespace mongo { void connThread( MessagingPort * inPort ) { TicketHolderReleaser connTicketReleaser( &connTicketHolder ); - Client::initThread("conn"); /* todo: move to Client object */ LastError *le = new LastError(); lastError.reset(le); + inPort->_logLevel = 1; auto_ptr<MessagingPort> dbMsgPort( inPort ); - - dbMsgPort->_logLevel = 1; - Client& c = cc(); + Client& c = Client::initThread("conn", inPort); try { @@ -522,7 +519,7 @@ sendmore: l << ( is32bit ? " 32" : " 64" ) << "-bit " << endl; } DEV log() << "_DEBUG build (which is slower)" << endl; - show_32_warning(); + show_warnings(); log() << mongodVersion() << endl; printGitVersion(); printSysInfo(); @@ -629,7 +626,7 @@ using namespace mongo; namespace po = boost::program_options; void show_help_text(po::options_description options) { - show_32_warning(); + show_warnings(); cout << options << endl; }; @@ -1111,9 +1108,6 @@ int main(int argc, char* argv[], char *envp[] ) namespace mongo { - /* we do not use log() below as it uses a mutex and that could cause deadlocks. - */ - string getDbContext(); #undef out diff --git a/db/db.vcxproj b/db/db.vcxproj index 61f81fe..0cabbd0 100644 --- a/db/db.vcxproj +++ b/db/db.vcxproj @@ -574,14 +574,26 @@ <ClCompile Include="repl\rs_config.cpp" />
</ItemGroup>
<ItemGroup>
- <None Include="..\jstests\rs\rs_basic.js" />
- <None Include="..\jstests\rs\test_framework.js" />
+ <None Include="..\jstests\replsets\replset1.js" />
+ <None Include="..\jstests\replsets\replset2.js" />
+ <None Include="..\jstests\replsets\replset3.js" />
+ <None Include="..\jstests\replsets\replset4.js" />
+ <None Include="..\jstests\replsets\replset5.js" />
+ <None Include="..\jstests\replsets\replsetadd.js" />
+ <None Include="..\jstests\replsets\replsetarb1.js" />
+ <None Include="..\jstests\replsets\replsetarb2.js" />
+ <None Include="..\jstests\replsets\replsetprio1.js" />
+ <None Include="..\jstests\replsets\replsetrestart1.js" />
+ <None Include="..\jstests\replsets\replsetrestart2.js" />
+ <None Include="..\jstests\replsets\replset_remove_node.js" />
+ <None Include="..\jstests\replsets\rollback.js" />
+ <None Include="..\jstests\replsets\rollback2.js" />
+ <None Include="..\jstests\replsets\sync1.js" />
+ <None Include="..\jstests\replsets\twosets.js" />
<None Include="..\SConstruct" />
<None Include="..\util\mongoutils\README" />
<None Include="mongo.ico" />
<None Include="repl\notes.txt" />
- <None Include="repl\test.html" />
- <None Include="repl\testing.js" />
</ItemGroup>
<ItemGroup>
<ClInclude Include="..\client\dbclientcursor.h" />
diff --git a/db/db.vcxproj.filters b/db/db.vcxproj.filters index f8d72a6..bf30b4e 100755 --- a/db/db.vcxproj.filters +++ b/db/db.vcxproj.filters @@ -840,7 +840,7 @@ <Filter Include="replSets">
<UniqueIdentifier>{3b73f786-d352-446f-a5f5-df49384baf7a}</UniqueIdentifier>
</Filter>
- <Filter Include="replSets\test stuff">
+ <Filter Include="replSets\testing">
<UniqueIdentifier>{4a1ea357-1077-4ad1-85b4-db48a6e1eb46}</UniqueIdentifier>
</Filter>
</ItemGroup>
@@ -851,24 +851,60 @@ <None Include="..\util\mongoutils\README">
<Filter>util\mongoutils</Filter>
</None>
- <None Include="repl\testing.js">
- <Filter>replSets\test stuff</Filter>
- </None>
- <None Include="repl\test.html">
- <Filter>replSets\test stuff</Filter>
- </None>
<None Include="..\SConstruct">
<Filter>db</Filter>
</None>
- <None Include="..\jstests\rs\rs_basic.js">
- <Filter>replSets\test stuff</Filter>
- </None>
- <None Include="..\jstests\rs\test_framework.js">
- <Filter>replSets\test stuff</Filter>
- </None>
<None Include="mongo.ico">
<Filter>Resource Files</Filter>
</None>
+ <None Include="..\jstests\replsets\replset_remove_node.js">
+ <Filter>replSets\testing</Filter>
+ </None>
+ <None Include="..\jstests\replsets\replset1.js">
+ <Filter>replSets\testing</Filter>
+ </None>
+ <None Include="..\jstests\replsets\replset2.js">
+ <Filter>replSets\testing</Filter>
+ </None>
+ <None Include="..\jstests\replsets\replset3.js">
+ <Filter>replSets\testing</Filter>
+ </None>
+ <None Include="..\jstests\replsets\replset4.js">
+ <Filter>replSets\testing</Filter>
+ </None>
+ <None Include="..\jstests\replsets\replset5.js">
+ <Filter>replSets\testing</Filter>
+ </None>
+ <None Include="..\jstests\replsets\replsetadd.js">
+ <Filter>replSets\testing</Filter>
+ </None>
+ <None Include="..\jstests\replsets\replsetarb1.js">
+ <Filter>replSets\testing</Filter>
+ </None>
+ <None Include="..\jstests\replsets\replsetarb2.js">
+ <Filter>replSets\testing</Filter>
+ </None>
+ <None Include="..\jstests\replsets\replsetprio1.js">
+ <Filter>replSets\testing</Filter>
+ </None>
+ <None Include="..\jstests\replsets\replsetrestart1.js">
+ <Filter>replSets\testing</Filter>
+ </None>
+ <None Include="..\jstests\replsets\replsetrestart2.js">
+ <Filter>replSets\testing</Filter>
+ </None>
+ <None Include="..\jstests\replsets\rollback.js">
+ <Filter>replSets\testing</Filter>
+ </None>
+ <None Include="..\jstests\replsets\rollback2.js">
+ <Filter>replSets\testing</Filter>
+ </None>
+ <None Include="..\jstests\replsets\sync1.js">
+ <Filter>replSets\testing</Filter>
+ </None>
+ <None Include="..\jstests\replsets\twosets.js">
+ <Filter>replSets\testing</Filter>
+ </None>
</ItemGroup>
<ItemGroup>
<Library Include="..\..\js\js64r.lib">
diff --git a/db/db_10.sln b/db/db_10.sln index 0aa382f..d68d897 100644 --- a/db/db_10.sln +++ b/db/db_10.sln @@ -12,7 +12,6 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "tools", "tools", "{2B262D59 ..\tools\bsondump.cpp = ..\tools\bsondump.cpp
..\tools\dump.cpp = ..\tools\dump.cpp
..\tools\export.cpp = ..\tools\export.cpp
- ..\tools\files.cpp = ..\tools\files.cpp
..\tools\import.cpp = ..\tools\import.cpp
..\tools\restore.cpp = ..\tools\restore.cpp
..\tools\sniffer.cpp = ..\tools\sniffer.cpp
@@ -29,9 +28,6 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "unix files", "unix files", EndProjectSection
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "shell", "shell", "{407B4B88-3451-433C-B74F-31B31FEB5791}"
- ProjectSection(SolutionItems) = preProject
- ..\shell\utils.h = ..\shell\utils.h
- EndProjectSection
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "other", "other", "{12B11474-2D74-48C3-BB3D-F03249BEA88F}"
EndProject
diff --git a/db/dbcommands.cpp b/db/dbcommands.cpp index 22b0457..96374d9 100644 --- a/db/dbcommands.cpp +++ b/db/dbcommands.cpp @@ -74,6 +74,14 @@ namespace mongo { } } cmdResetError; + /* set by replica sets if specified in the configuration. + a pointer is used to avoid any possible locking issues with lockless reading (see below locktype() is NONE + and would like to keep that) + (for now, it simply orphans any old copy as config changes should be extremely rare). + note: once non-null, never goes to null again. + */ + BSONObj *getLastErrorDefault = 0; + class CmdGetLastError : public Command { public: virtual LockType locktype() const { return NONE; } @@ -88,7 +96,7 @@ namespace mongo { help << "return error status of the last operation on this connection"; } CmdGetLastError() : Command("getLastError", false, "getlasterror") {} - bool run(const string& dbnamne, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) { + bool run(const string& dbnamne, BSONObj& _cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) { LastError *le = lastError.disableForCommand(); if ( le->nPrev != 1 ) LastError::noError.appendSelf( result ); @@ -98,6 +106,18 @@ namespace mongo { Client& c = cc(); c.appendLastOp( result ); + BSONObj cmdObj = _cmdObj; + { + BSONObj::iterator i(_cmdObj); + i.next(); + if( !i.more() ) { + /* empty, use default */ + BSONObj *def = getLastErrorDefault; + if( def ) + cmdObj = *def; + } + } + if ( cmdObj["fsync"].trueValue() ){ log() << "fsync from getlasterror" << endl; result.append( "fsyncFiles" , MemoryMappedFile::flushAll( true ) ); @@ -389,8 +409,11 @@ namespace mongo { if ( ! authed ) result.append( "note" , "run against admin for more info" ); - if ( Listener::getElapsedTimeMillis() - start > 1000 ) - result.append( "timing" , timeBuilder.obj() ); + if ( Listener::getElapsedTimeMillis() - start > 1000 ){ + BSONObj t = timeBuilder.obj(); + log() << "serverStatus was very slow: " << t << endl; + result.append( "timing" , t ); + } return true; } @@ -690,12 +713,8 @@ namespace mongo { class CmdReIndex : public Command { public: - virtual bool logTheOp() { - return true; - } - virtual bool slaveOk() const { - return false; - } + virtual bool logTheOp() { return false; } // only reindexes on the one node + virtual bool slaveOk() const { return true; } // can reindex on a secondary virtual LockType locktype() const { return WRITE; } virtual void help( stringstream& help ) const { help << "re-index a collection"; @@ -745,9 +764,6 @@ namespace mongo { class CmdListDatabases : public Command { public: - virtual bool logTheOp() { - return false; - } virtual bool slaveOk() const { return true; } @@ -925,15 +941,34 @@ namespace mongo { "\nnote: This command may take a while to run"; } bool run(const string& dbname, BSONObj& jsobj, string& errmsg, BSONObjBuilder& result, bool fromRepl ){ + Timer timer; + string ns = jsobj.firstElement().String(); BSONObj min = jsobj.getObjectField( "min" ); BSONObj max = jsobj.getObjectField( "max" ); BSONObj keyPattern = jsobj.getObjectField( "keyPattern" ); + bool estimate = jsobj["estimate"].trueValue(); Client::Context ctx( ns ); + NamespaceDetails *d = nsdetails(ns.c_str()); + + if ( ! d || d->nrecords == 0 ){ + result.appendNumber( "size" , 0 ); + result.appendNumber( "numObjects" , 0 ); + result.append( "millis" , timer.millis() ); + return true; + } + result.appendBool( "estimate" , estimate ); + shared_ptr<Cursor> c; if ( min.isEmpty() && max.isEmpty() ) { + if ( estimate ){ + result.appendNumber( "size" , d->datasize ); + result.appendNumber( "numObjects" , d->nrecords ); + result.append( "millis" , timer.millis() ); + return 1; + } c = theDataFileMgr.findAll( ns.c_str() ); } else if ( min.isEmpty() || max.isEmpty() ) { @@ -944,18 +979,24 @@ namespace mongo { IndexDetails *idx = cmdIndexDetailsForRange( ns.c_str(), errmsg, min, max, keyPattern ); if ( idx == 0 ) return false; - NamespaceDetails *d = nsdetails(ns.c_str()); + c.reset( new BtreeCursor( d, d->idxNo(*idx), *idx, min, max, false, 1 ) ); } + long long avgObjSize = d->datasize / d->nrecords; + long long maxSize = jsobj["maxSize"].numberLong(); long long maxObjects = jsobj["maxObjects"].numberLong(); - Timer timer; long long size = 0; long long numObjects = 0; while( c->ok() ) { - size += c->currLoc().rec()->netLength(); + + if ( estimate ) + size += avgObjSize; + else + size += c->currLoc().rec()->netLength(); + numObjects++; if ( ( maxSize && size > maxSize ) || @@ -974,8 +1015,8 @@ namespace mongo { } logIfSlow( timer , os.str() ); - result.append( "size", (double)size ); - result.append( "numObjects" , (double)numObjects ); + result.appendNumber( "size", size ); + result.appendNumber( "numObjects" , numObjects ); result.append( "millis" , timer.millis() ); return true; } @@ -1555,9 +1596,6 @@ namespace mongo { class CmdWhatsMyUri : public Command { public: CmdWhatsMyUri() : Command("whatsmyuri") { } - virtual bool logTheOp() { - return false; // the modification will be logged directly - } virtual bool slaveOk() const { return true; } @@ -1693,12 +1731,8 @@ namespace mongo { public: virtual LockType locktype() const { return NONE; } virtual bool adminOnly() const { return true; } - virtual bool logTheOp() { - return false; - } - virtual bool slaveOk() const { - return true; - } + virtual bool logTheOp() { return false; } + virtual bool slaveOk() const { return true; } virtual void help( stringstream& help ) const { help << "internal testing command. Makes db block (in a read lock) for 100 seconds\n"; help << "w:true write lock"; @@ -1798,7 +1832,7 @@ namespace mongo { if ( c->adminOnly() && ! fromRepl && dbname != "admin" ) { - result.append( "errmsg" , "access denied- use admin db" ); + result.append( "errmsg" , "access denied; use admin db" ); log() << "command denied: " << cmdObj.toString() << endl; return false; } diff --git a/db/dbcommands_generic.cpp b/db/dbcommands_generic.cpp index 340f31c..25c6a93 100644 --- a/db/dbcommands_generic.cpp +++ b/db/dbcommands_generic.cpp @@ -196,8 +196,9 @@ namespace mongo { CmdShutdown() : Command("shutdown") {} bool run(const string& dbname, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) { Client * c = currentClient.get(); - if ( c ) + if ( c ) { c->shutdown(); + } log() << "terminating, shutdown command received" << endl; dbexit( EXIT_CLEAN ); // this never returns return true; diff --git a/db/instance.cpp b/db/instance.cpp index ec3b793..9e81464 100644 --- a/db/instance.cpp +++ b/db/instance.cpp @@ -62,7 +62,6 @@ namespace mongo { bool useCursors = true; bool useHints = true; - void closeAllSockets(); void flushOpLog( stringstream &ss ) { if( _diaglog.f && _diaglog.f->is_open() ) { ss << "flushing op log and files\n"; @@ -332,9 +331,10 @@ namespace mongo { } } catch ( AssertionException& e ) { - tlog() << " Caught Assertion in " << opToString(op) << " , continuing" << endl; + static int n; + tlog(3) << " Caught Assertion in " << opToString(op) << ", continuing" << endl; ss << " exception " + e.toString(); - log = true; + log = ++n < 10; } } } @@ -446,6 +446,7 @@ namespace mongo { mongolock lk(1); + // if this ever moves to outside of lock, need to adjust check Client::Context::_finishInit if ( ! broadcast && handlePossibleShardedMessage( m , 0 ) ) return; @@ -472,8 +473,10 @@ namespace mongo { } writelock lk(ns); + // if this ever moves to outside of lock, need to adjust check Client::Context::_finishInit if ( ! broadcast & handlePossibleShardedMessage( m , 0 ) ) return; + Client::Context ctx(ns); long long n = deleteObjects(ns, pattern, justOne, true); @@ -709,32 +712,32 @@ namespace mongo { } catch (...) { } - tryToOutputFatal( "dbexit: really exiting now\n" ); + tryToOutputFatal( "dbexit: really exiting now" ); if ( c ) c->shutdown(); ::exit(rc); } void shutdown() { - log() << "\t shutdown: going to close listening sockets..." << endl; + log() << "shutdown: going to close listening sockets..." << endl; ListeningSockets::get()->closeAll(); - log() << "\t shutdown: going to flush oplog..." << endl; + log() << "shutdown: going to flush oplog..." << endl; stringstream ss2; flushOpLog( ss2 ); rawOut( ss2.str() ); /* must do this before unmapping mem or you may get a seg fault */ - log() << "\t shutdown: going to close sockets..." << endl; - boost::thread close_socket_thread(closeAllSockets); + log() << "shutdown: going to close sockets..." << endl; + boost::thread close_socket_thread( boost::bind(MessagingPort::closeAllSockets, 0) ); // wait until file preallocation finishes // we would only hang here if the file_allocator code generates a // synchronous signal, which we don't expect - log() << "\t shutdown: waiting for fs preallocator..." << endl; + log() << "shutdown: waiting for fs preallocator..." << endl; theFileAllocator().waitUntilFinished(); - log() << "\t shutdown: closing all files..." << endl; + log() << "shutdown: closing all files..." << endl; stringstream ss3; MemoryMappedFile::closeAllFiles( ss3 ); rawOut( ss3.str() ); @@ -744,9 +747,9 @@ namespace mongo { #if !defined(_WIN32) && !defined(__sunos__) if ( lockFile ){ - log() << "\t shutdown: removing fs lock..." << endl; + log() << "shutdown: removing fs lock..." << endl; if( ftruncate( lockFile , 0 ) ) - log() << "\t couldn't remove fs lock " << errnoWithDescription() << endl; + log() << "couldn't remove fs lock " << errnoWithDescription() << endl; flock( lockFile, LOCK_UN ); } #endif @@ -766,12 +769,14 @@ namespace mongo { bool oldFile = false; - if ( boost::filesystem::exists( name ) && boost::filesystem::file_size( name ) > 0 ){ + if ( boost::filesystem::exists( name ) && boost::filesystem::file_size( name ) > 0 ) { oldFile = true; } lockFile = open( name.c_str(), O_RDWR | O_CREAT , S_IRWXU | S_IRWXG | S_IRWXO ); - uassert( 10309 , "Unable to create / open lock file for lockfilepath: " + name, lockFile > 0 ); + if( lockFile <= 0 ) { + uasserted( 10309 , str::stream() << "Unable to create / open lock file for lockfilepath: " << name << ' ' << errnoWithDescription()); + } if (flock( lockFile, LOCK_EX | LOCK_NB ) != 0) { close ( lockFile ); lockFile = 0; diff --git a/db/lasterror.cpp b/db/lasterror.cpp index 630fcfb..12fc694 100644 --- a/db/lasterror.cpp +++ b/db/lasterror.cpp @@ -34,8 +34,11 @@ namespace mongo { void raiseError(int code , const char *msg) { LastError *le = lastError.get(); if ( le == 0 ) { - /* might be intentional (non-user thread) */ - OCCASIONALLY DEV if( !isShell ) log() << "warning dev: lastError==0 won't report:" << msg << endl; + /* might be intentional (non-user thread) */ + DEV { + static unsigned n; + if( ++n < 4 && !isShell ) log() << "dev: lastError==0 won't report:" << msg << endl; + } } else if ( le->disabled ) { log() << "lastError disabled, can't report: " << code << ":" << msg << endl; } else { diff --git a/db/matcher.cpp b/db/matcher.cpp index 681a6dc..cd62563 100644 --- a/db/matcher.cpp +++ b/db/matcher.cpp @@ -259,7 +259,7 @@ namespace mongo { } void Matcher::parseOr( const BSONElement &e, bool subMatcher, list< shared_ptr< Matcher > > &matchers ) { - uassert( 13090, "recursive $or/$nor not allowed", !subMatcher ); + uassert( 13090, "nested $or/$nor not allowed", !subMatcher ); uassert( 13086, "$or/$nor must be a nonempty array", e.type() == Array && e.embeddedObject().nFields() > 0 ); BSONObjIterator j( e.embeddedObject() ); while( j.more() ) { diff --git a/db/oplog.cpp b/db/oplog.cpp index 4ad4ca9..93800c7 100644 --- a/db/oplog.cpp +++ b/db/oplog.cpp @@ -142,8 +142,10 @@ namespace mongo { this code (or code in now() maybe) should be improved. */ if( theReplSet ) { - if( !(theReplSet->lastOpTimeWritten<ts) ) - log() << "replSet ERROR possible failover clock skew issue? " << theReplSet->lastOpTimeWritten << ' ' << ts << endl; + if( !(theReplSet->lastOpTimeWritten<ts) ) { + log() << "replSet ERROR possible failover clock skew issue? " << theReplSet->lastOpTimeWritten << ' ' << ts << rsLog; + log() << "replSet " << theReplSet->isPrimary() << rsLog; + } theReplSet->lastOpTimeWritten = ts; theReplSet->lastH = hNew; ctx.getClient()->setLastOp( ts.asDate() ); @@ -195,7 +195,7 @@ namespace mongo { // Use a ClientCursor here so we can release db mutex while scanning // oplog (can take quite a while with large oplogs). shared_ptr<Cursor> c = _qp.newReverseCursor(); - _findingStartCursor = new ClientCursor(QueryOption_NoCursorTimeout, c, _qp.ns()); + _findingStartCursor = new ClientCursor(QueryOption_NoCursorTimeout, c, _qp.ns(), BSONObj()); _findingStartTimer.reset(); _findingStartMode = Initial; BSONElement tsElt = _qp.originalQuery()[ "ts" ]; diff --git a/db/pdfile.cpp b/db/pdfile.cpp index cf7cb22..216f21a 100644 --- a/db/pdfile.cpp +++ b/db/pdfile.cpp @@ -56,8 +56,6 @@ namespace mongo { } }; - const int MaxExtentSize = 0x7ff00000; - map<string, unsigned> BackgroundOperation::dbsInProg; set<string> BackgroundOperation::nsInProg; @@ -157,7 +155,7 @@ namespace mongo { sz = 1000000000; int z = ((int)sz) & 0xffffff00; assert( z > len ); - DEV tlog() << "initialExtentSize(" << len << ") returns " << z << endl; + //DEV tlog() << "initialExtentSize(" << len << ") returns " << z << endl; return z; } @@ -272,10 +270,19 @@ namespace mongo { /*---------------------------------------------------------------------*/ int MongoDataFile::maxSize() { - if ( sizeof( int* ) == 4 ) + if ( sizeof( int* ) == 4 ) { return 512 * 1024 * 1024; - else + } else if ( cmdLine.smallfiles ) { + return 0x7ff00000 >> 2; + } else { return 0x7ff00000; + } + } + + void MongoDataFile::badOfs(int ofs) const { + stringstream ss; + ss << "bad offset:" << ofs << " accessing file: " << mmf.filename() << " - consider repairing database"; + uasserted(13440, ss.str()); } int MongoDataFile::defaultSize( const char *filename ) const { @@ -380,7 +387,7 @@ namespace mongo { Extent* MongoDataFile::createExtent(const char *ns, int approxSize, bool newCapped, int loops) { massert( 10357 , "shutdown in progress", !goingAway ); - massert( 10358 , "bad new extent size", approxSize >= 0 && approxSize <= MaxExtentSize ); + massert( 10358 , "bad new extent size", approxSize >= 0 && approxSize <= Extent::maxSize() ); massert( 10359 , "header==0 on new extent: 32 bit mmap space exceeded?", header ); // null if file open failed int ExtentSize = approxSize <= header->unusedLength ? approxSize : header->unusedLength; DiskLoc loc; @@ -403,8 +410,8 @@ namespace mongo { addNewExtentToNamespace(ns, e, loc, emptyLoc, newCapped); - DEV tlog() << "new extent " << ns << " size: 0x" << hex << ExtentSize << " loc: 0x" << hex << offset - << " emptyLoc:" << hex << emptyLoc.getOfs() << dec << endl; + DEV tlog(1) << "new extent " << ns << " size: 0x" << hex << ExtentSize << " loc: 0x" << hex << offset + << " emptyLoc:" << hex << emptyLoc.getOfs() << dec << endl; return e; } @@ -568,6 +575,14 @@ namespace mongo { } */ + int Extent::maxSize() { + int maxExtentSize = 0x7ff00000; + if ( cmdLine.smallfiles ) { + maxExtentSize >>= 2; + } + return maxExtentSize; + } + /*---------------------------------------------------------------------*/ shared_ptr<Cursor> DataFileMgr::findAll(const char *ns, const DiskLoc &startLoc) { @@ -897,7 +912,7 @@ namespace mongo { if ( toupdate->netLength() < objNew.objsize() ) { // doesn't fit. reallocate ----------------------------------------------------- - uassert( 10003 , "E10003 failing update: objects in a capped ns cannot grow", !(d && d->capped)); + uassert( 10003 , "failing update: objects in a capped ns cannot grow", !(d && d->capped)); d->paddingTooSmall(); if ( cc().database()->profile ) ss << " moved "; @@ -950,15 +965,15 @@ namespace mongo { } int followupExtentSize(int len, int lastExtentLen) { - assert( len < MaxExtentSize ); + assert( len < Extent::maxSize() ); int x = initialExtentSize(len); int y = (int) (lastExtentLen < 4000000 ? lastExtentLen * 4.0 : lastExtentLen * 1.2); int sz = y > x ? y : x; if ( sz < lastExtentLen ) sz = lastExtentLen; - else if ( sz > MaxExtentSize ) - sz = MaxExtentSize; + else if ( sz > Extent::maxSize() ) + sz = Extent::maxSize(); sz = ((int)sz) & 0xffffff00; assert( sz > len ); @@ -1029,7 +1044,7 @@ namespace mongo { Timer t; - tlog() << "Buildindex " << ns << " idxNo:" << idxNo << ' ' << idx.info.obj().toString() << endl; + tlog(1) << "fastBuildIndex " << ns << " idxNo:" << idxNo << ' ' << idx.info.obj().toString() << endl; bool dupsAllowed = !idx.unique(); bool dropDups = idx.dropDups() || inDBRepair; @@ -1514,6 +1529,13 @@ namespace mongo { BSONObj info = loc.obj(); bool background = info["background"].trueValue(); + if( background && cc().isSyncThread() ) { + /* don't do background indexing on slaves. there are nuances. this could be added later + but requires more code. + */ + log() << "info: indexing in foreground on this replica; was a background index build on the primary" << endl; + background = false; + } int idxNo = tableToIndex->nIndexes; IndexDetails& idx = tableToIndex->addIndex(tabletoidxns.c_str(), !background); // clear transient info caches so they refresh; increments nIndexes diff --git a/db/pdfile.h b/db/pdfile.h index 084a542..d268aac 100644 --- a/db/pdfile.h +++ b/db/pdfile.h @@ -79,6 +79,8 @@ namespace mongo { void flush( bool sync ); private: + void badOfs(int) const; + int defaultSize( const char *filename ) const; Extent* getExtent(DiskLoc loc); @@ -255,6 +257,8 @@ namespace mongo { Extent* getPrevExtent() { return xprev.isNull() ? 0 : DataFileMgr::getExtent(xprev); } + + static int maxSize(); }; /* @@ -339,7 +343,7 @@ namespace mongo { inline Record* MongoDataFile::recordAt(DiskLoc dl) { int ofs = dl.getOfs(); - assert( ofs >= DataFileHeader::HeaderSize ); + if( ofs < DataFileHeader::HeaderSize ) badOfs(ofs); // will uassert - external call to keep out of the normal code path return (Record*) _p.at(ofs, -1); } diff --git a/db/query.cpp b/db/query.cpp index bfe845c..5bd7b00 100644 --- a/db/query.cpp +++ b/db/query.cpp @@ -1044,14 +1044,13 @@ namespace mongo { if ( moreClauses ) { // this MultiCursor will use a dumb NoOp to advance(), so no need to specify mayYield shared_ptr< Cursor > multi( new MultiCursor( mps, cursor, dqo.matcher(), dqo ) ); - cc = new ClientCursor(queryOptions, multi, ns); + cc = new ClientCursor(queryOptions, multi, ns, jsobj.getOwned()); } else { cursor->setMatcher( dqo.matcher() ); - cc = new ClientCursor( queryOptions, cursor, ns ); + cc = new ClientCursor( queryOptions, cursor, ns, jsobj.getOwned() ); } cursorid = cc->cursorid; - cc->query = jsobj.getOwned(); - DEV tlog() << "query has more, cursorid: " << cursorid << endl; + DEV tlog(2) << "query has more, cursorid: " << cursorid << endl; cc->pos = n; cc->pq = pq_shared; cc->fields = pq.getFieldPtr(); diff --git a/db/queryoptimizer.cpp b/db/queryoptimizer.cpp index 3d4cbd0..e7068c2 100644 --- a/db/queryoptimizer.cpp +++ b/db/queryoptimizer.cpp @@ -361,7 +361,7 @@ namespace mongo { const IndexSpec& spec = ii.getSpec(); if ( spec.getTypeName() == _special && spec.suitability( _originalQuery , order_ ) ){ usingPrerecordedPlan_ = true; - mayRecordPlan_ = true; + mayRecordPlan_ = false; plans_.push_back( PlanPtr( new QueryPlan( d , j , *fbs_ , _originalQuery, order_ , BSONObj() , BSONObj() , _special ) ) ); return; diff --git a/db/queryutil.cpp b/db/queryutil.cpp index 791096f..007a1ce 100644 --- a/db/queryutil.cpp +++ b/db/queryutil.cpp @@ -944,6 +944,8 @@ namespace mongo { return ( l % 2 == 0 ); // if we're inside an interval } + // binary search for interval containing the specified element + // an even return value indicates that the element is contained within a valid interval int FieldRangeVector::matchingLowElement( const BSONElement &e, int i, bool forward ) const { int l = -1; int h = _ranges[ i ].intervals().size() * 2; @@ -1007,9 +1009,13 @@ namespace mongo { int FieldRangeVector::Iterator::advance( const BSONObj &curr ) { BSONObjIterator j( curr ); BSONObjIterator o( _v._keyPattern ); + // track first field for which we are not at the end of the valid values, + // since we may need to advance from the key prefix ending with this field int latestNonEndpoint = -1; + // iterate over fields to determine appropriate advance method for( int i = 0; i < (int)_i.size(); ++i ) { if ( i > 0 && !_v._ranges[ i - 1 ].intervals()[ _i[ i - 1 ] ].equality() ) { + // if last bound was inequality, we don't know anything about where we are for this field // TODO if possible avoid this certain cases when field in prev key is the same setMinus( i ); } @@ -1017,9 +1023,9 @@ namespace mongo { BSONElement oo = o.next(); bool reverse = ( ( oo.number() < 0 ) ^ ( _v._direction < 0 ) ); BSONElement jj = j.next(); - if ( _i[ i ] == -1 ) { + if ( _i[ i ] == -1 ) { // unknown position for this field, do binary search int l = _v.matchingLowElement( jj, i, !reverse ); - if ( l % 2 == 0 ) { + if ( l % 2 == 0 ) { // we are in a valid range for this field _i[ i ] = l / 2; int diff = (int)_v._ranges[ i ].intervals().size() - _i[ i ]; if ( diff > 1 ) { @@ -1031,7 +1037,8 @@ namespace mongo { } } continue; - } else { + } else { // not in a valid range for this field - determine if and how to advance + // check if we're after the last interval for this field if ( l == (int)_v._ranges[ i ].intervals().size() * 2 - 1 ) { if ( latestNonEndpoint == -1 ) { return -2; @@ -1053,7 +1060,11 @@ namespace mongo { } } bool first = true; + // _i[ i ] != -1, so we have a starting interval for this field + // which serves as a lower/equal bound on the first iteration - + // we advance from this interval to find a matching interval while( _i[ i ] < (int)_v._ranges[ i ].intervals().size() ) { + // compare to current interval's upper bound int x = _v._ranges[ i ].intervals()[ _i[ i ] ]._upper._bound.woCompare( jj, false ); if ( reverse ) { x = -x; @@ -1062,16 +1073,22 @@ namespace mongo { eq = true; break; } + // see if we're less than the upper bound if ( x > 0 ) { if ( i == 0 && first ) { - break; // the value of 1st field won't go backward + // the value of 1st field won't go backward, so don't check lower bound + // TODO maybe we can check first only? + break; } + // if it's an equality interval, don't need to compare separately to lower bound if ( !_v._ranges[ i ].intervals()[ _i[ i ] ].equality() ) { + // compare to current interval's lower bound x = _v._ranges[ i ].intervals()[ _i[ i ] ]._lower._bound.woCompare( jj, false ); if ( reverse ) { x = -x; } } + // if we're less than the lower bound, advance if ( x > 0 ) { setZero( i + 1 ); // skip to curr / i / nextbounds @@ -1084,17 +1101,20 @@ namespace mongo { break; } } + // we're above the upper bound, so try next interval and reset remaining fields ++_i[ i ]; setZero( i + 1 ); first = false; } int diff = (int)_v._ranges[ i ].intervals().size() - _i[ i ]; if ( diff > 1 || ( !eq && diff == 1 ) ) { + // check if we're not at the end of valid values for this field latestNonEndpoint = i; - } else if ( diff == 0 ) { + } else if ( diff == 0 ) { // check if we're past the last interval for this field if ( latestNonEndpoint == -1 ) { return -2; } + // more values possible, skip... setZero( latestNonEndpoint + 1 ); // skip to curr / latestNonEndpoint + 1 / superlative for( int j = latestNonEndpoint + 1; j < (int)_i.size(); ++j ) { diff --git a/db/repl.cpp b/db/repl.cpp index 37197ba..085ae64 100644 --- a/db/repl.cpp +++ b/db/repl.cpp @@ -137,9 +137,6 @@ namespace mongo { virtual bool adminOnly() const { return true; } - virtual bool logTheOp() { - return false; - } virtual LockType locktype() const { return WRITE; } void help(stringstream&h) const { h << "replace a node in a replica pair"; } CmdReplacePeer() : Command("replacePeer", false, "replacepeer") { } @@ -199,9 +196,6 @@ namespace mongo { virtual bool adminOnly() const { return true; } - virtual bool logTheOp() { - return false; - } virtual void help(stringstream& h) const { h << "internal"; } virtual LockType locktype() const { return WRITE; } CmdForceDead() : Command("forcedead") { } @@ -222,9 +216,7 @@ namespace mongo { virtual bool adminOnly() const { return true; } - virtual bool logTheOp() { - return false; - } + virtual bool logTheOp() { return false; } virtual LockType locktype() const { return WRITE; } void help(stringstream&h) const { h << "resync (from scratch) an out of date replica slave.\nhttp://www.mongodb.org/display/DOCS/Master+Slave"; } CmdResync() : Command("resync") { } @@ -861,7 +853,6 @@ namespace mongo { if( logLevel >= 6 ) // op.tostring is expensive so doing this check explicitly log(6) << "processing op: " << op << endl; - // skip no-op if( op.getStringField("op")[0] == 'n' ) return; @@ -1260,8 +1251,14 @@ namespace mongo { if ( tailing || initial ) { if ( initial ) log(1) << "repl: initial run\n"; - else - assert( syncedTo < nextOpTime ); + else { + if( !( syncedTo <= nextOpTime ) ) { + log() << "repl ASSERTION failed : syncedTo <= nextOpTime" << endl; + log() << "repl syncTo: " << syncedTo.toStringLong() << endl; + log() << "repl nextOpTime: " << nextOpTime.toStringLong() << endl; + assert(false); + } + } oplogReader.putBack( op ); // op will be processed in the loop below nextOpTime = OpTime(); // will reread the op below } @@ -1690,6 +1687,7 @@ namespace mongo { void replSlaveThread() { sleepsecs(1); Client::initThread("replslave"); + cc().iAmSyncThread(); { dblock lk; diff --git a/db/repl/consensus.cpp b/db/repl/consensus.cpp index 4eba17d..4044538 100644 --- a/db/repl/consensus.cpp +++ b/db/repl/consensus.cpp @@ -83,6 +83,17 @@ namespace mongo { return vUp * 2 > totalVotes(); } + bool Consensus::shouldRelinquish() const { + int vUp = rs._self->config().votes; + const long long T = rs.config().ho.heartbeatTimeoutMillis * rs.config().ho.heartbeatConnRetries; + for( Member *m = rs.head(); m; m=m->next() ) { + long long dt = m->hbinfo().timeDown(); + if( dt < T ) + vUp += m->config().votes; + } + return !( vUp * 2 > totalVotes() ); + } + static const int VETO = -10000; const time_t LeaseTime = 30; @@ -322,6 +333,7 @@ namespace mongo { void Consensus::electSelf() { assert( !rs.lockedByMe() ); assert( !rs.myConfig().arbiterOnly ); + assert( rs.myConfig().slaveDelay == 0 ); try { _electSelf(); } diff --git a/db/repl/health.cpp b/db/repl/health.cpp index b0be25f..72396fe 100644 --- a/db/repl/health.cpp +++ b/db/repl/health.cpp @@ -89,11 +89,13 @@ namespace mongo { } s << td(config().votes); { - string stateText = ReplSet::stateAsStr(state()); + string stateText = state().toString(); + if( _config.hidden ) + stateText += " (hidden)"; if( ok || stateText.empty() ) s << td(stateText); // text blank if we've never connected else - s << td( grey(str::stream() << "(was " << ReplSet::stateAsStr(state()) << ')', true) ); + s << td( grey(str::stream() << "(was " << state().toString() << ')', true) ); } s << td( grey(hbinfo().lastHeartbeatMsg,!ok) ); stringstream q; @@ -105,28 +107,30 @@ namespace mongo { s << td(""); s << _tr(); } - + string ReplSetImpl::stateAsHtml(MemberState s) { if( s.s == MemberState::RS_STARTUP ) return a("", "serving still starting up, or still trying to initiate the set", "STARTUP"); if( s.s == MemberState::RS_PRIMARY ) return a("", "this server thinks it is primary", "PRIMARY"); if( s.s == MemberState::RS_SECONDARY ) return a("", "this server thinks it is a secondary (slave mode)", "SECONDARY"); if( s.s == MemberState::RS_RECOVERING ) return a("", "recovering/resyncing; after recovery usually auto-transitions to secondary", "RECOVERING"); - if( s.s == MemberState::RS_FATAL ) return a("", "something bad has occurred and server is not completely offline with regard to the replica set. fatal error.", "RS_FATAL"); - if( s.s == MemberState::RS_STARTUP2 ) return a("", "loaded config, still determining who is primary", "RS_STARTUP2"); + if( s.s == MemberState::RS_FATAL ) return a("", "something bad has occurred and server is not completely offline with regard to the replica set. fatal error.", "FATAL"); + if( s.s == MemberState::RS_STARTUP2 ) return a("", "loaded config, still determining who is primary", "STARTUP2"); if( s.s == MemberState::RS_ARBITER ) return a("", "this server is an arbiter only", "ARBITER"); if( s.s == MemberState::RS_DOWN ) return a("", "member is down, slow, or unreachable", "DOWN"); + if( s.s == MemberState::RS_ROLLBACK ) return a("", "rolling back operations to get in sync", "ROLLBACK"); return ""; } - string ReplSetImpl::stateAsStr(MemberState s) { - if( s.s == MemberState::RS_STARTUP ) return "STARTUP"; - if( s.s == MemberState::RS_PRIMARY ) return "PRIMARY"; - if( s.s == MemberState::RS_SECONDARY ) return "SECONDARY"; - if( s.s == MemberState::RS_RECOVERING ) return "RECOVERING"; - if( s.s == MemberState::RS_FATAL ) return "FATAL"; - if( s.s == MemberState::RS_STARTUP2 ) return "STARTUP2"; - if( s.s == MemberState::RS_ARBITER ) return "ARBITER"; - if( s.s == MemberState::RS_DOWN ) return "DOWN"; + string MemberState::toString() const { + if( s == MemberState::RS_STARTUP ) return "STARTUP"; + if( s == MemberState::RS_PRIMARY ) return "PRIMARY"; + if( s == MemberState::RS_SECONDARY ) return "SECONDARY"; + if( s == MemberState::RS_RECOVERING ) return "RECOVERING"; + if( s == MemberState::RS_FATAL ) return "FATAL"; + if( s == MemberState::RS_STARTUP2 ) return "STARTUP2"; + if( s == MemberState::RS_ARBITER ) return "ARBITER"; + if( s == MemberState::RS_DOWN ) return "DOWN"; + if( s == MemberState::RS_ROLLBACK ) return "ROLLBACK"; return ""; } @@ -302,7 +306,7 @@ namespace mongo { td(ago(started)) << td("") << // last heartbeat td(ToString(_self->config().votes)) << - td(stateAsHtml(box.getState())); + td( stateAsHtml(box.getState()) + (_self->config().hidden?" (hidden)":"") ); s << td( _hbmsg ); stringstream q; q << "/_replSetOplog?" << _self->id(); diff --git a/db/repl/health.h b/db/repl/health.h index 8b1005e..645a3b5 100644 --- a/db/repl/health.h +++ b/db/repl/health.h @@ -27,7 +27,7 @@ namespace mongo { HealthOptions() { heartbeatSleepMillis = 2000; heartbeatTimeoutMillis = 10000; - heartbeatConnRetries = 3; + heartbeatConnRetries = 2; } bool isDefault() const { return *this == HealthOptions(); } diff --git a/db/repl/heartbeat.cpp b/db/repl/heartbeat.cpp index 78ce5d1..4f28897 100644 --- a/db/repl/heartbeat.cpp +++ b/db/repl/heartbeat.cpp @@ -40,6 +40,13 @@ namespace mongo { // hacky string *discoveredSeed = 0; + long long HeartbeatInfo::timeDown() const { + if( up() ) return 0; + if( downSince == 0 ) + return 0; // still waiting on first heartbeat + return jsTime() - downSince; + } + /* { replSetHeartbeat : <setname> } */ class CmdReplSetHeartbeat : public ReplSetCommand { public: @@ -55,6 +62,14 @@ namespace mongo { errmsg = "not running with --replSet"; return false; } + + /* we want to keep heartbeat connections open when relinquishing primary. tag them here. */ + { + MessagingPort *mp = cc()._mp; + if( mp ) + mp->tag |= 1; + } + if( cmdObj["pv"].Int() != 1 ) { errmsg = "incompatible replset protocol version"; return false; @@ -91,7 +106,7 @@ namespace mongo { result.append("set", theReplSet->name()); result.append("state", theReplSet->state().s); result.append("hbmsg", theReplSet->hbmsg()); - result.append("time", (int) time(0)); + result.append("time", (long long) time(0)); result.appendDate("opTime", theReplSet->lastOpTimeWritten.asDate()); int v = theReplSet->config().version; result.append("v", v); @@ -196,7 +211,12 @@ namespace mongo { static time_t last = 0; time_t now = time(0); - if( mem.changed(old) || now-last>4 ) { + bool changed = mem.changed(old); + if( changed ) { + if( old.hbstate != mem.hbstate ) + log() << "replSet " << h.toString() << ' ' << mem.hbstate.toString() << rsLog; + } + if( changed || now-last>4 ) { last = now; theReplSet->mgr->send( boost::bind(&Manager::msgCheckNewState, theReplSet->mgr) ); } @@ -205,8 +225,9 @@ namespace mongo { private: void down(HeartbeatInfo& mem, string msg) { mem.health = 0.0; - if( mem.upSince ) { + if( mem.upSince || mem.downSince == 0 ) { mem.upSince = 0; + mem.downSince = jsTime(); log() << "replSet info " << h.toString() << " is now down (or slow to respond)" << rsLog; } mem.lastHeartbeatMsg = msg; diff --git a/db/repl/manager.cpp b/db/repl/manager.cpp index e870688..862ac46 100644 --- a/db/repl/manager.cpp +++ b/db/repl/manager.cpp @@ -29,12 +29,17 @@ namespace mongo { }; /* check members OTHER THAN US to see if they think they are primary */ - const Member * Manager::findOtherPrimary() { + const Member * Manager::findOtherPrimary(bool& two) { + two = false; Member *m = rs->head(); Member *p = 0; while( m ) { + DEV assert( m != rs->_self ); if( m->state().primary() && m->hbinfo().up() ) { - if( p ) throw "twomasters"; // our polling is asynchronous, so this is often ok. + if( p ) { + two = true; + return 0; + } p = m; } m = m->next(); @@ -63,7 +68,11 @@ namespace mongo { if( rs->box.getPrimary() == m ) return; rs->_self->lhb() = ""; - rs->box.set(rs->iAmArbiterOnly() ? MemberState::RS_ARBITER : MemberState::RS_RECOVERING, m); + if( rs->iAmArbiterOnly() ) { + rs->box.set(MemberState::RS_ARBITER, m); + } else { + rs->box.noteRemoteIsPrimary(m); + } } /** called as the health threads get new results */ @@ -87,11 +96,14 @@ namespace mongo { } const Member *p2; - try { p2 = findOtherPrimary(); } - catch(string s) { - /* two other nodes think they are primary (asynchronously polled) -- wait for things to settle down. */ - log() << "replSet warning DIAG 2 primary" << s << rsLog; - return; + { + bool two; + p2 = findOtherPrimary(two); + if( two ) { + /* two other nodes think they are primary (asynchronously polled) -- wait for things to settle down. */ + log() << "replSet warning DIAG two primaries (transiently)" << rsLog; + return; + } } if( p2 ) { @@ -136,7 +148,7 @@ namespace mongo { return; } - if( !rs->elect.aMajoritySeemsToBeUp() ) { + if( rs->elect.shouldRelinquish() ) { log() << "replSet can't see a majority of the set, relinquishing primary" << rsLog; rs->relinquish(); } diff --git a/db/repl/replset_commands.cpp b/db/repl/replset_commands.cpp index f8f46d5..328b0ab 100644 --- a/db/repl/replset_commands.cpp +++ b/db/repl/replset_commands.cpp @@ -34,19 +34,27 @@ namespace mongo { */ bool replSetBlind = false; + unsigned replSetForceInitialSyncFailure = 0; class CmdReplSetTest : public ReplSetCommand { public: virtual void help( stringstream &help ) const { - help << "Just for testing : do not use.\n"; + help << "Just for regression tests.\n"; } CmdReplSetTest() : ReplSetCommand("replSetTest") { } virtual bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) { + log() << "replSet replSetTest command received: " << cmdObj.toString() << rsLog; + if( cmdObj.hasElement("forceInitialSyncFailure") ) { + replSetForceInitialSyncFailure = (unsigned) cmdObj["forceInitialSyncFailure"].Number(); + return true; + } + + // may not need this, but if removed check all tests still work: if( !check(errmsg, result) ) return false; + if( cmdObj.hasElement("blind") ) { replSetBlind = cmdObj.getBoolField("blind"); - log() << "replSet info replSetTest command received, replSetBlind=" << replSetBlind << rsLog; return true; } return false; @@ -55,6 +63,7 @@ namespace mongo { class CmdReplSetGetRBID : public ReplSetCommand { public: + /* todo: ideally this should only change on rollbacks NOT on mongod restarts also. fix... */ int rbid; virtual void help( stringstream &help ) const { help << "internal"; @@ -65,12 +74,15 @@ namespace mongo { virtual bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) { if( !check(errmsg, result) ) return false; - result.append("rbid",rbid); + result.append("rbid",rbid); return true; } } cmdReplSetRBID; using namespace bson; + void incRBID() { + cmdReplSetRBID.rbid++; + } int getRBID(DBClientConnection *c) { bo info; c->simpleCommand("admin", &info, "replSetGetRBID"); diff --git a/db/repl/rs.cpp b/db/repl/rs.cpp index a6737be..1c0444a 100644 --- a/db/repl/rs.cpp +++ b/db/repl/rs.cpp @@ -31,39 +31,48 @@ namespace mongo { extern string *discoveredSeed; void ReplSetImpl::sethbmsg(string s, int logLevel) { - static time_t lastLogged; - if( s == _hbmsg ) { - // unchanged - if( time(0)-lastLogged < 60 ) - return; - } - - unsigned sz = s.size(); - if( sz >= 256 ) - memcpy(_hbmsg, s.c_str(), 255); - else { - _hbmsg[sz] = 0; - memcpy(_hbmsg, s.c_str(), sz); - } - if( !s.empty() ) { - lastLogged = time(0); - log(logLevel) << "replSet " << s << rsLog; - } + static time_t lastLogged; + _hbmsgTime = time(0); + + if( s == _hbmsg ) { + // unchanged + if( _hbmsgTime - lastLogged < 60 ) + return; + } + + unsigned sz = s.size(); + if( sz >= 256 ) + memcpy(_hbmsg, s.c_str(), 255); + else { + _hbmsg[sz] = 0; + memcpy(_hbmsg, s.c_str(), sz); + } + if( !s.empty() ) { + lastLogged = _hbmsgTime; + log(logLevel) << "replSet " << s << rsLog; + } } void ReplSetImpl::assumePrimary() { assert( iAmPotentiallyHot() ); writelock lk("admin."); // so we are synchronized with _logOp() box.setSelfPrimary(_self); - log() << "replSet PRIMARY" << rsLog; // self (" << _self->id() << ") is now primary" << rsLog; + //log() << "replSet PRIMARY" << rsLog; // self (" << _self->id() << ") is now primary" << rsLog; } void ReplSetImpl::changeState(MemberState s) { box.change(s, _self); } void ReplSetImpl::relinquish() { if( box.getState().primary() ) { + log() << "replSet relinquishing primary state" << rsLog; changeState(MemberState::RS_RECOVERING); - log() << "replSet info relinquished primary state" << rsLog; + + /* close sockets that were talking to us */ + /*log() << "replSet closing sockets after reqlinquishing primary" << rsLog; + MessagingPort::closeAllSockets(1);*/ + + // todo: > + //changeState(MemberState::RS_SECONDARY); } else if( box.getState().startup2() ) { // ? add comment @@ -109,11 +118,18 @@ namespace mongo { } void ReplSetImpl::_fillIsMasterHost(const Member *m, vector<string>& hosts, vector<string>& passives, vector<string>& arbiters) { + if( m->config().hidden ) + return; + if( m->potentiallyHot() ) { hosts.push_back(m->h().toString()); } else if( !m->config().arbiterOnly ) { - passives.push_back(m->h().toString()); + if( m->config().slaveDelay ) { + /* hmmm - we don't list these as they are stale. */ + } else { + passives.push_back(m->h().toString()); + } } else { arbiters.push_back(m->h().toString()); @@ -152,6 +168,10 @@ namespace mongo { } if( myConfig().arbiterOnly ) b.append("arbiterOnly", true); + if( myConfig().slaveDelay ) + b.append("slaveDelay", myConfig().slaveDelay); + if( myConfig().hidden ) + b.append("hidden", true); } /** @param cfgString <setname>/<seedhost1>,<seedhost2> */ @@ -200,6 +220,7 @@ namespace mongo { _self(0), mgr( new Manager(this) ) { + _cfg = 0; memset(_hbmsg, 0, sizeof(_hbmsg)); *_hbmsg = '.'; // temp...just to see lastH = 0; @@ -245,7 +266,7 @@ namespace mongo { loadLastOpTimeWritten(); } catch(std::exception& e) { - log() << "replSet ERROR FATAL couldn't query the local " << rsoplog << " collection. Terminating mongod after 30 seconds." << rsLog; + log() << "replSet error fatal couldn't query the local " << rsoplog << " collection. Terminating mongod after 30 seconds." << rsLog; log() << e.what() << rsLog; sleepsecs(30); dbexit( EXIT_REPLICATION_ERROR ); @@ -260,25 +281,64 @@ namespace mongo { ReplSetImpl::StartupStatus ReplSetImpl::startupStatus = PRESTART; string ReplSetImpl::startupStatusMsg; - // true if ok; throws if config really bad; false if config doesn't include self - bool ReplSetImpl::initFromConfig(ReplSetConfig& c) { + extern BSONObj *getLastErrorDefault; + + /** @param reconf true if this is a reconfiguration and not an initial load of the configuration. + @return true if ok; throws if config really bad; false if config doesn't include self + */ + bool ReplSetImpl::initFromConfig(ReplSetConfig& c, bool reconf) { + /* NOTE: haveNewConfig() writes the new config to disk before we get here. So + we cannot error out at this point, except fatally. Check errors earlier. + */ lock lk(this); + if( getLastErrorDefault || !c.getLastErrorDefaults.isEmpty() ) { + // see comment in dbcommands.cpp for getlasterrordefault + getLastErrorDefault = new BSONObj( c.getLastErrorDefaults ); + } + + list<const ReplSetConfig::MemberCfg*> newOnes; + bool additive = reconf; { + unsigned nfound = 0; int me = 0; for( vector<ReplSetConfig::MemberCfg>::iterator i = c.members.begin(); i != c.members.end(); i++ ) { const ReplSetConfig::MemberCfg& m = *i; if( m.h.isSelf() ) { + nfound++; me++; + + if( !reconf || (_self && _self->id() == (unsigned) m._id) ) + ; + else { + log() << "replSet " << _self->id() << ' ' << m._id << rsLog; + assert(false); + } + } + else if( reconf ) { + const Member *old = findById(m._id); + if( old ) { + nfound++; + assert( (int) old->id() == m._id ); + if( old->config() == m ) { + additive = false; + } + } + else { + newOnes.push_back(&m); + } } } if( me == 0 ) { // log() << "replSet config : " << _cfg->toString() << rsLog; - log() << "replSet warning can't find self in the repl set configuration:" << rsLog; + log() << "replSet error can't find self in the repl set configuration:" << rsLog; log() << c.toString() << rsLog; - return false; + assert(false); } uassert( 13302, "replSet error self appears twice in the repl set configuration", me<=1 ); + + if( reconf && config().members.size() != nfound ) + additive = false; } _cfg = new ReplSetConfig(c); @@ -287,6 +347,24 @@ namespace mongo { _name = _cfg->_id; assert( !_name.empty() ); + if( additive ) { + log() << "replSet info : additive change to configuration" << rsLog; + for( list<const ReplSetConfig::MemberCfg*>::const_iterator i = newOnes.begin(); i != newOnes.end(); i++ ) { + const ReplSetConfig::MemberCfg* m = *i; + Member *mi = new Member(m->h, m->_id, m, false); + + /** we will indicate that new members are up() initially so that we don't relinquish our + primary state because we can't (transiently) see a majority. they should be up as we + check that new members are up before getting here on reconfig anyway. + */ + mi->get_hbinfo().health = 0.1; + + _members.push(mi); + startHealthTaskFor(mi); + } + return true; + } + // start with no members. if this is a reconfig, drop the old ones. _members.orphanAll(); @@ -416,6 +494,7 @@ namespace mongo { startupStatusMsg = "replSet error loading set config (BADCONFIG)"; log() << "replSet error loading configurations " << e.toString() << rsLog; log() << "replSet error replication will not start" << rsLog; + sethbmsg("error loading set config"); _fatal(); throw; } @@ -429,11 +508,10 @@ namespace mongo { { //lock l(this); box.set(MemberState::RS_FATAL, 0); - sethbmsg("fatal error"); - log() << "replSet error fatal error, stopping replication" << rsLog; + //sethbmsg("fatal error"); + log() << "replSet error fatal, stopping replication" << rsLog; } - void ReplSet::haveNewConfig(ReplSetConfig& newConfig, bool addComment) { lock l(this); // convention is to lock replset before taking the db rwlock writelock lk(""); @@ -442,7 +520,7 @@ namespace mongo { comment = BSON( "msg" << "Reconfig set" << "version" << newConfig.version ); newConfig.saveConfigLocally(comment); try { - initFromConfig(newConfig); + initFromConfig(newConfig, true); log() << "replSet replSetReconfig new config saved locally" << rsLog; } catch(DBException& e) { diff --git a/db/repl/rs.h b/db/repl/rs.h index 17a070c..6c4d9a8 100644 --- a/db/repl/rs.h +++ b/db/repl/rs.h @@ -44,19 +44,19 @@ namespace mongo { public: Member(HostAndPort h, unsigned ord, const ReplSetConfig::MemberCfg *c, bool self); string fullName() const { return h().toString(); } - const ReplSetConfig::MemberCfg& config() const { return *_config; } + const ReplSetConfig::MemberCfg& config() const { return _config; } const HeartbeatInfo& hbinfo() const { return _hbinfo; } - string lhb() { return _hbinfo.lastHeartbeatMsg; } + HeartbeatInfo& get_hbinfo() { return _hbinfo; } + string lhb() const { return _hbinfo.lastHeartbeatMsg; } MemberState state() const { return _hbinfo.hbstate; } const HostAndPort& h() const { return _h; } unsigned id() const { return _hbinfo.id(); } - bool potentiallyHot() const { return _config->potentiallyHot(); } // not arbiter, not priority 0 - + bool potentiallyHot() const { return _config.potentiallyHot(); } // not arbiter, not priority 0 void summarizeMember(stringstream& s) const; friend class ReplSetImpl; private: - const ReplSetConfig::MemberCfg *_config; /* todo: when this changes??? */ - HostAndPort _h; + const ReplSetConfig::MemberCfg _config; + const HostAndPort _h; HeartbeatInfo _hbinfo; }; @@ -64,7 +64,13 @@ namespace mongo { ReplSetImpl *rs; bool busyWithElectSelf; int _primary; - const Member* findOtherPrimary(); + + /** @param two - if true two primaries were seen. this can happen transiently, in addition to our + polling being only occasional. in this case null is returned, but the caller should + not assume primary itself in that situation. + */ + const Member* findOtherPrimary(bool& two); + void noteARemoteIsPrimary(const Member *); virtual void starting(); public: @@ -102,6 +108,7 @@ namespace mongo { int totalVotes() const; bool aMajoritySeemsToBeUp() const; + bool shouldRelinquish() const; void electSelf(); void electCmdReceived(BSONObj, BSONObjBuilder*); void multiCommand(BSONObj cmd, list<Target>& L); @@ -119,8 +126,8 @@ namespace mongo { protected: RSBase() : magic(0x12345677), m("RSBase"), _locked(0) { } ~RSBase() { - log() << "~RSBase should never be called?" << rsLog; - assert(false); + /* this can happen if we throw in the constructor; otherwise never happens. thus we log it as it is quite unusual. */ + log() << "replSet ~RSBase called" << rsLog; } class lock { @@ -175,6 +182,9 @@ namespace mongo { const Member* getPrimary() const { return sp.primary; } void change(MemberState s, const Member *self) { scoped_lock lk(m); + if( sp.state != s ) { + log() << "replSet " << s.toString() << rsLog; + } sp.state = s; if( s.primary() ) { sp.primary = self; @@ -194,6 +204,12 @@ namespace mongo { assert( !sp.state.primary() ); sp.primary = mem; } + void noteRemoteIsPrimary(const Member *remote) { + scoped_lock lk(m); + if( !sp.state.secondary() && !sp.state.fatal() ) + sp.state = MemberState::RS_RECOVERING; + sp.primary = remote; + } StateBox() : m("StateBox") { } private: mutex m; @@ -226,7 +242,6 @@ namespace mongo { }; static StartupStatus startupStatus; static string startupStatusMsg; - static string stateAsStr(MemberState state); static string stateAsHtml(MemberState state); /* todo thread */ @@ -241,28 +256,24 @@ namespace mongo { void endOldHealthTasks(); void startHealthTaskFor(Member *m); - private: Consensus elect; - bool ok() const { return !box.getState().fatal(); } - void relinquish(); void forgetPrimary(); - protected: bool _stepDown(); private: void assumePrimary(); void loadLastOpTimeWritten(); void changeState(MemberState s); - protected: // "heartbeat message" // sent in requestHeartbeat respond in field "hbm" char _hbmsg[256]; // we change this unlocked, thus not an stl::string + time_t _hbmsgTime; // when it was logged public: void sethbmsg(string s, int logLevel = 0); protected: - bool initFromConfig(ReplSetConfig& c); // true if ok; throws if config really bad; false if config doesn't include self + bool initFromConfig(ReplSetConfig& c, bool reconf=false); // true if ok; throws if config really bad; false if config doesn't include self void _fillIsMaster(BSONObjBuilder&); void _fillIsMasterHost(const Member*, vector<string>&, vector<string>&, vector<string>&); const ReplSetConfig& config() { return *_cfg; } @@ -323,8 +334,10 @@ namespace mongo { void _syncDoInitialSync(); void syncDoInitialSync(); void _syncThread(); + bool tryToGoLiveAsASecondary(OpTime&); // readlocks void syncTail(); void syncApply(const BSONObj &o); + unsigned _syncRollback(OplogReader& r); void syncRollback(OplogReader& r); void syncFixUp(HowToFixUp& h, OplogReader& r); public: @@ -344,9 +357,10 @@ namespace mongo { /* call after constructing to start - returns fairly quickly after la[unching its threads */ void go() { _go(); } + void fatal() { _fatal(); } - bool isPrimary(); - bool isSecondary(); + bool isPrimary() { return box.getState().primary(); } + bool isSecondary() { return box.getState().secondary(); } MemberState state() const { return ReplSetImpl::state(); } string name() const { return ReplSetImpl::name(); } const ReplSetConfig& config() { return ReplSetImpl::config(); } @@ -366,7 +380,10 @@ namespace mongo { bool lockedByMe() { return RSBase::lockedByMe(); } // heartbeat msg to send to others; descriptive diagnostic info - string hbmsg() const { return _hbmsg; } + string hbmsg() const { + if( time(0)-_hbmsgTime > 120 ) return ""; + return _hbmsg; + } }; /** base class for repl set commands. checks basic things such as in rs mode before the command @@ -388,6 +405,8 @@ namespace mongo { if( theReplSet == 0 ) { result.append("startupStatus", ReplSet::startupStatus); errmsg = ReplSet::startupStatusMsg.empty() ? "replset unknown error 2" : ReplSet::startupStatusMsg; + if( ReplSet::startupStatus == 3 ) + result.append("info", "run rs.initiate(...) if not yet done for the set"); return false; } return true; @@ -397,19 +416,10 @@ namespace mongo { /** inlines ----------------- */ inline Member::Member(HostAndPort h, unsigned ord, const ReplSetConfig::MemberCfg *c, bool self) : - _config(c), _h(h), _hbinfo(ord) { - if( self ) { - _hbinfo.health = 1.0; - } - } - - inline bool ReplSet::isPrimary() { - /* todo replset */ - return box.getState().primary(); - } - - inline bool ReplSet::isSecondary() { - return box.getState().secondary(); + _config(*c), _h(h), _hbinfo(ord) + { + if( self ) + _hbinfo.health = 1.0; } } diff --git a/db/repl/rs_config.cpp b/db/repl/rs_config.cpp index 76b20a4..85c9a46 100644 --- a/db/repl/rs_config.cpp +++ b/db/repl/rs_config.cpp @@ -31,6 +31,16 @@ namespace mongo { void logOpInitiate(const bo&); + void assertOnlyHas(BSONObj o, const set<string>& fields) { + BSONObj::iterator i(o); + while( i.more() ) { + BSONElement e = i.next(); + if( !fields.count( e.fieldName() ) ) { + uasserted(13434, str::stream() << "unexpected field '" << e.fieldName() << "'in object"); + } + } + } + list<HostAndPort> ReplSetConfig::otherMemberHostnames() const { list<HostAndPort> L; for( vector<MemberCfg>::const_iterator i = members.begin(); i != members.end(); i++ ) { @@ -42,7 +52,7 @@ namespace mongo { /* comment MUST only be set when initiating the set by the initiator */ void ReplSetConfig::saveConfigLocally(bo comment) { - check(); + checkRsConfig(); log() << "replSet info saving a newer config version to local.system.replset" << rsLog; { writelock lk(""); @@ -81,6 +91,8 @@ namespace mongo { if( votes != 1 ) b << "votes" << votes; if( priority != 1.0 ) b << "priority" << priority; if( arbiterOnly ) b << "arbiterOnly" << true; + if( slaveDelay ) b << "slaveDelay" << slaveDelay; + if( hidden ) b << "hidden" << hidden; return b.obj(); } @@ -115,9 +127,17 @@ namespace mongo { mchk(priority >= 0 && priority <= 1000); mchk(votes >= 0 && votes <= 100); uassert(13419, "this version of mongod only supports priorities 0 and 1", priority == 0 || priority == 1); + uassert(13437, "slaveDelay requires priority be zero", slaveDelay == 0 || priority == 0); + uassert(13438, "bad slaveDelay value", slaveDelay >= 0 && slaveDelay <= 3600 * 24 * 366); + uassert(13439, "priority must be 0 when hidden=true", priority == 0 || !hidden); } + /** @param o old config + @param n new config + */ /*static*/ bool ReplSetConfig::legalChange(const ReplSetConfig& o, const ReplSetConfig& n, string& errmsg) { + assert( theReplSet ); + if( o._id != n._id ) { errmsg = "set name may not change"; return false; @@ -131,6 +151,25 @@ namespace mongo { return false; } + map<HostAndPort,const ReplSetConfig::MemberCfg*> old; + for( vector<ReplSetConfig::MemberCfg>::const_iterator i = o.members.begin(); i != o.members.end(); i++ ) { + old[i->h] = &(*i); + } + int me = 0; + for( vector<ReplSetConfig::MemberCfg>::const_iterator i = n.members.begin(); i != n.members.end(); i++ ) { + const ReplSetConfig::MemberCfg& m = *i; + if( old.count(m.h) ) { + if( old[m.h]->_id != m._id ) { + log() << "replSet reconfig error with member: " << m.h.toString() << rsLog; + uasserted(13432, "_id may not change for members"); + } + } + if( m.h.isSelf() ) + me++; + } + + uassert(13433, "can't find self in new replset config", me == 1); + /* TODO : MORE CHECKS HERE */ log() << "replSet TODO : don't allow removal of a node until we handle it at the removed node end?" << endl; @@ -144,7 +183,7 @@ namespace mongo { _ok = false; } - void ReplSetConfig::check() const { + void ReplSetConfig::checkRsConfig() const { uassert(13132, "nonmatching repl set name in _id field; check --replSet command line", _id == cmdLine.ourSetName()); @@ -154,6 +193,10 @@ namespace mongo { } void ReplSetConfig::from(BSONObj o) { + static const string legal[] = {"_id","version", "members","settings"}; + static const set<string> legals(legal, legal + 4); + assertOnlyHas(o, legals); + md5 = o.md5(); _id = o["_id"].String(); if( o["version"].ok() ) { @@ -170,7 +213,7 @@ namespace mongo { if( settings["heartbeatTimeout"].ok() ) ho.heartbeatTimeoutMillis = (unsigned) (settings["heartbeatTimeout"].Number() * 1000); ho.check(); - try { getLastErrorDefaults = settings["getLastErrorDefaults"].Obj(); } catch(...) { } + try { getLastErrorDefaults = settings["getLastErrorDefaults"].Obj().copy(); } catch(...) { } } set<string> hosts; @@ -188,6 +231,10 @@ namespace mongo { BSONObj mobj = members[i].Obj(); MemberCfg m; try { + static const string legal[] = {"_id","votes","priority","host","hidden","slaveDelay","arbiterOnly"}; + static const set<string> legals(legal, legal + 7); + assertOnlyHas(mobj, legals); + try { m._id = (int) mobj["_id"].Number(); } catch(...) { @@ -205,6 +252,9 @@ namespace mongo { if( m.h.isLocalHost() ) localhosts++; m.arbiterOnly = mobj.getBoolField("arbiterOnly"); + m.slaveDelay = mobj["slaveDelay"].numberInt(); + if( mobj.hasElement("hidden") ) + m.hidden = mobj.getBoolField("hidden"); if( mobj.hasElement("priority") ) m.priority = mobj["priority"].Number(); if( mobj.hasElement("votes") ) @@ -308,6 +358,7 @@ namespace mongo { BSONObj o = c->nextSafe(); uassert(13109, "multiple rows in " + rsConfigNs + " not supported", !c->more()); from(o); + checkRsConfig(); _ok = true; log(level) << "replSet load config ok from " << (h.isSelf() ? "self" : h.toString()) << rsLog; } diff --git a/db/repl/rs_config.h b/db/repl/rs_config.h index 38df772..e39dad7 100644 --- a/db/repl/rs_config.h +++ b/db/repl/rs_config.h @@ -41,18 +41,27 @@ namespace mongo { bool ok() const { return _ok; } struct MemberCfg { - MemberCfg() : _id(-1), votes(1), priority(1.0), arbiterOnly(false) { } + MemberCfg() : _id(-1), votes(1), priority(1.0), arbiterOnly(false), slaveDelay(0), hidden(false) { } int _id; /* ordinal */ unsigned votes; /* how many votes this node gets. default 1. */ HostAndPort h; double priority; /* 0 means can never be primary */ bool arbiterOnly; + int slaveDelay; /* seconds. int rather than unsigned for convenient to/front bson conversion. */ + bool hidden; /* if set, don't advertise to drives in isMaster. for non-primaries (priority 0) */ + void check() const; /* check validity, assert if not. */ BSONObj asBson() const; bool potentiallyHot() const { return !arbiterOnly && priority > 0; } + bool operator==(const MemberCfg& r) const { + return _id==r._id && votes == r.votes && h == r.h && priority == r.priority && + arbiterOnly == r.arbiterOnly && slaveDelay == r.slaveDelay && hidden == r.hidden; + } + bool operator!=(const MemberCfg& r) const { return !(*this == r); } }; + vector<MemberCfg> members; string _id; int version; @@ -68,7 +77,7 @@ namespace mongo { string toString() const { return asBson().toString(); } /** validate the settings. does not call check() on each member, you have to do that separately. */ - void check() const; + void checkRsConfig() const; /** check if modification makes sense */ static bool legalChange(const ReplSetConfig& old, const ReplSetConfig& n, string& errmsg); diff --git a/db/repl/rs_initialsync.cpp b/db/repl/rs_initialsync.cpp index 4c6bd4d..3851c66 100644 --- a/db/repl/rs_initialsync.cpp +++ b/db/repl/rs_initialsync.cpp @@ -66,7 +66,7 @@ namespace mongo { void _logOpObjRS(const BSONObj& op); - bool copyCollectionFromRemote(const string& host, const string& ns, const BSONObj& query, string errmsg); + bool copyCollectionFromRemote(const string& host, const string& ns, const BSONObj& query, string &errmsg, bool logforrepl); static void emptyOplog() { writelock lk(rsoplog); diff --git a/db/repl/rs_member.h b/db/repl/rs_member.h index 4f6846a..6a797b5 100644 --- a/db/repl/rs_member.h +++ b/db/repl/rs_member.h @@ -40,7 +40,8 @@ namespace mongo { RS_STARTUP2, RS_UNKNOWN, /* remote node not yet reached */ RS_ARBITER, - RS_DOWN /* node not reachable for a report */ + RS_DOWN, /* node not reachable for a report */ + RS_ROLLBACK } s; MemberState(MS ms = RS_UNKNOWN) : s(ms) { } @@ -51,6 +52,9 @@ namespace mongo { bool recovering() const { return s == RS_RECOVERING; } bool startup2() const { return s == RS_STARTUP2; } bool fatal() const { return s == RS_FATAL; } + bool rollback() const { return s == RS_ROLLBACK; } + + string toString() const; bool operator==(const MemberState& r) const { return s == r.s; } bool operator!=(const MemberState& r) const { return s != r.s; } @@ -61,26 +65,31 @@ namespace mongo { class HeartbeatInfo { unsigned _id; public: - HeartbeatInfo() : _id(0xffffffff),skew(INT_MIN) { } + HeartbeatInfo() : _id(0xffffffff),hbstate(MemberState::RS_UNKNOWN),health(-1.0),downSince(0),skew(INT_MIN) { } HeartbeatInfo(unsigned id); bool up() const { return health > 0; } unsigned id() const { return _id; } MemberState hbstate; double health; time_t upSince; + long long downSince; time_t lastHeartbeat; string lastHeartbeatMsg; OpTime opTime; int skew; + long long timeDown() const; // ms + /* true if changed in a way of interest to the repl set manager. */ bool changed(const HeartbeatInfo& old) const; }; inline HeartbeatInfo::HeartbeatInfo(unsigned id) : _id(id) { - health = -1.0; - lastHeartbeat = upSince = 0; - skew = INT_MIN; + hbstate = MemberState::RS_UNKNOWN; + health = -1.0; + downSince = 0; + lastHeartbeat = upSince = 0; + skew = INT_MIN; } inline bool HeartbeatInfo::changed(const HeartbeatInfo& old) const { diff --git a/db/repl/rs_rollback.cpp b/db/repl/rs_rollback.cpp index 1bb7217..6b2544c 100644 --- a/db/repl/rs_rollback.cpp +++ b/db/repl/rs_rollback.cpp @@ -62,6 +62,14 @@ namespace mongo { using namespace bson; + bool copyCollectionFromRemote(const string& host, const string& ns, const BSONObj& query, string& errmsg, bool logforrepl); + void incRBID(); + + class rsfatal : public std::exception { + public: + virtual const char* what() const throw(){ return "replica set fatal exception"; } + }; + struct DocID { const char *ns; be _id; @@ -81,6 +89,8 @@ namespace mongo { /* collections to drop */ set<string> toDrop; + set<string> collectionsToResync; + OpTime commonPoint; DiskLoc commonPointOurDiskloc; @@ -113,17 +123,59 @@ namespace mongo { if( *op == 'c' ) { be first = o.firstElement(); NamespaceString s(d.ns); // foo.$cmd - - if( string("create") == first.fieldName() ) { - /* Create collection operation - { ts: ..., h: ..., op: "c", ns: "foo.$cmd", o: { create: "abc", ... } }
- */ - string ns = s.db + '.' + o["create"].String(); // -> foo.abc - h.toDrop.insert(ns); + string cmdname = first.fieldName(); + Command *cmd = Command::findCommand(cmdname.c_str()); + if( cmd == 0 ) { + log() << "replSet warning rollback no suchcommand " << first.fieldName() << " - different mongod versions perhaps?" << rsLog; return; } - else { - log() << "replSet WARNING can't roll back this command yet: " << o.toString() << rsLog; + else { + /* findandmodify - tranlated? + godinsert?, + renamecollection a->b. just resync a & b + */ + if( cmdname == "create" ) { + /* Create collection operation + { ts: ..., h: ..., op: "c", ns: "foo.$cmd", o: { create: "abc", ... } }
+ */ + string ns = s.db + '.' + o["create"].String(); // -> foo.abc + h.toDrop.insert(ns); + return; + } + else if( cmdname == "drop" ) { + string ns = s.db + '.' + first.valuestr(); + h.collectionsToResync.insert(ns); + return; + } + else if( cmdname == "dropIndexes" || cmdname == "deleteIndexes" ) { + /* TODO: this is bad. we simply full resync the collection here, which could be very slow. */ + log() << "replSet info rollback of dropIndexes is slow in this version of mongod" << rsLog; + string ns = s.db + '.' + first.valuestr(); + h.collectionsToResync.insert(ns); + return; + } + else if( cmdname == "renameCollection" ) { + /* TODO: slow. */ + log() << "replSet info rollback of renameCollection is slow in this version of mongod" << rsLog; + string from = first.valuestr(); + string to = o["to"].String(); + h.collectionsToResync.insert(from); + h.collectionsToResync.insert(to); + return; + } + else if( cmdname == "reIndex" ) { + return; + } + else if( cmdname == "dropDatabase" ) { + log() << "replSet error rollback : can't rollback drop database full resync will be required" << rsLog; + log() << "replSet " << o.toString() << rsLog; + throw rsfatal(); + } + else { + log() << "replSet error can't rollback this command yet: " << o.toString() << rsLog; + log() << "replSet cmdname=" << cmdname << rsLog; + throw rsfatal(); + } } } @@ -141,8 +193,6 @@ namespace mongo { static void syncRollbackFindCommonPoint(DBClientConnection *them, HowToFixUp& h) { static time_t last; if( time(0)-last < 60 ) { - // this could put a lot of load on someone else, don't repeat too often - sleepsecs(10); throw "findcommonpoint waiting a while before trying again"; } last = time(0); @@ -170,12 +220,14 @@ namespace mongo { BSONObj theirObj = t->nextSafe(); OpTime theirTime = theirObj["ts"]._opTime(); - if( 1 ) { + { long long diff = (long long) ourTime.getSecs() - ((long long) theirTime.getSecs()); /* diff could be positive, negative, or zero */ - log() << "replSet info syncRollback diff in end of log times : " << diff << " seconds" << rsLog; + log() << "replSet info rollback our last optime: " << ourTime.toStringPretty() << rsLog; + log() << "replSet info rollback their last optime: " << theirTime.toStringPretty() << rsLog; + log() << "replSet info rollback diff in end of log times: " << diff << " seconds" << rsLog; if( diff > 3600 ) { - log() << "replSet syncRollback too long a time period for a rollback." << rsLog; + log() << "replSet rollback too long a time period for a rollback." << rsLog; throw "error not willing to roll back more than one hour of data"; } } @@ -197,16 +249,35 @@ namespace mongo { refetch(h, ourObj); + if( !t->more() ) { + log() << "replSet rollback error RS100 reached beginning of remote oplog" << rsLog; + log() << "replSet them: " << them->toString() << " scanned: " << scanned << rsLog; + log() << "replSet theirTime: " << theirTime.toStringLong() << rsLog; + log() << "replSet ourTime: " << ourTime.toStringLong() << rsLog; + throw "RS100 reached beginning of remote oplog [2]"; + } theirObj = t->nextSafe(); theirTime = theirObj["ts"]._opTime(); u.advance(); - if( !u.ok() ) throw "reached beginning of local oplog"; + if( !u.ok() ) { + log() << "replSet rollback error RS101 reached beginning of local oplog" << rsLog; + log() << "replSet them: " << them->toString() << " scanned: " << scanned << rsLog; + log() << "replSet theirTime: " << theirTime.toStringLong() << rsLog; + log() << "replSet ourTime: " << ourTime.toStringLong() << rsLog; + throw "RS101 reached beginning of local oplog [1]"; + } ourObj = u.current(); ourTime = ourObj["ts"]._opTime(); } else if( theirTime > ourTime ) { - /* todo: we could hit beginning of log here. exception thrown is ok but not descriptive, so fix up */ + if( !t->more() ) { + log() << "replSet rollback error RS100 reached beginning of remote oplog" << rsLog; + log() << "replSet them: " << them->toString() << " scanned: " << scanned << rsLog; + log() << "replSet theirTime: " << theirTime.toStringLong() << rsLog; + log() << "replSet ourTime: " << ourTime.toStringLong() << rsLog; + throw "RS100 reached beginning of remote oplog [1]"; + } theirObj = t->nextSafe(); theirTime = theirObj["ts"]._opTime(); } @@ -214,7 +285,13 @@ namespace mongo { // theirTime < ourTime refetch(h, ourObj); u.advance(); - if( !u.ok() ) throw "reached beginning of local oplog"; + if( !u.ok() ) { + log() << "replSet rollback error RS101 reached beginning of local oplog" << rsLog; + log() << "replSet them: " << them->toString() << " scanned: " << scanned << rsLog; + log() << "replSet theirTime: " << theirTime.toStringLong() << rsLog; + log() << "replSet ourTime: " << ourTime.toStringLong() << rsLog; + throw "RS101 reached beginning of local oplog [2]"; + } ourObj = u.current(); ourTime = ourObj["ts"]._opTime(); } @@ -226,7 +303,19 @@ namespace mongo { bson::bo goodVersionOfObject; }; - void ReplSetImpl::syncFixUp(HowToFixUp& h, OplogReader& r) { + static void setMinValid(bo newMinValid) { + try { + log() << "replSet minvalid=" << newMinValid["ts"]._opTime().toStringLong() << rsLog; + } + catch(...) { } + { + Helpers::putSingleton("local.replset.minvalid", newMinValid); + Client::Context cx( "local." ); + cx.db()->flushFiles(true); + } + } + + void ReplSetImpl::syncFixUp(HowToFixUp& h, OplogReader& r) { DBClientConnection *them = r.conn(); // fetch all first so we needn't handle interruption in a fancy way @@ -237,6 +326,7 @@ namespace mongo { bo newMinValid; + /* fetch all the goodVersions of each document from current primary */ DocID d; unsigned long long n = 0; try { @@ -258,43 +348,98 @@ namespace mongo { } newMinValid = r.getLastOp(rsoplog); if( newMinValid.isEmpty() ) { - sethbmsg("syncRollback error newMinValid empty?"); + sethbmsg("rollback error newMinValid empty?"); return; } } catch(DBException& e) { - sethbmsg(str::stream() << "syncRollback re-get objects: " << e.toString(),0); - log() << "syncRollback couldn't re-get ns:" << d.ns << " _id:" << d._id << ' ' << n << '/' << h.toRefetch.size() << rsLog; + sethbmsg(str::stream() << "rollback re-get objects: " << e.toString(),0); + log() << "rollback couldn't re-get ns:" << d.ns << " _id:" << d._id << ' ' << n << '/' << h.toRefetch.size() << rsLog; throw e; } - sethbmsg("syncRollback 3.5"); + MemoryMappedFile::flushAll(true); + + sethbmsg("rollback 3.5"); if( h.rbid != getRBID(r.conn()) ) { // our source rolled back itself. so the data we received isn't necessarily consistent. - sethbmsg("syncRollback rbid on source changed during rollback, cancelling this attempt"); + sethbmsg("rollback rbid on source changed during rollback, cancelling this attempt"); return; } // update them - sethbmsg(str::stream() << "syncRollback 4 n:" << goodVersions.size()); + sethbmsg(str::stream() << "rollback 4 n:" << goodVersions.size()); bool warn = false; assert( !h.commonPointOurDiskloc.isNull() ); - MemoryMappedFile::flushAll(true); - dbMutex.assertWriteLocked(); /* we have items we are writing that aren't from a point-in-time. thus best not to come online until we get to that point in freshness. */ - try { - log() << "replSet set minvalid=" << newMinValid["ts"]._opTime().toString() << rsLog; + setMinValid(newMinValid); + + /** any full collection resyncs required? */ + if( !h.collectionsToResync.empty() ) { + for( set<string>::iterator i = h.collectionsToResync.begin(); i != h.collectionsToResync.end(); i++ ) { + string ns = *i; + sethbmsg(str::stream() << "rollback 4.1 coll resync " << ns); + Client::Context c(*i, dbpath, 0, /*doauth*/false); + try { + bob res; + string errmsg; + dropCollection(ns, errmsg, res); + { + dbtemprelease r; + bool ok = copyCollectionFromRemote(them->getServerAddress(), ns, bo(), errmsg, false); + if( !ok ) { + log() << "replSet rollback error resyncing collection " << ns << ' ' << errmsg << rsLog; + throw "rollback error resyncing rollection [1]"; + } + } + } + catch(...) { + log() << "replset rollback error resyncing collection " << ns << rsLog; + throw "rollback error resyncing rollection [2]"; + } + } + + /* we did more reading from primary, so check it again for a rollback (which would mess us up), and + make minValid newer. + */ + sethbmsg("rollback 4.2"); + { + string err; + try { + newMinValid = r.getLastOp(rsoplog); + if( newMinValid.isEmpty() ) { + err = "can't get minvalid from primary"; + } else { + setMinValid(newMinValid); + } + } + catch(...) { + err = "can't get/set minvalid"; + } + if( h.rbid != getRBID(r.conn()) ) { + // our source rolled back itself. so the data we received isn't necessarily consistent. + // however, we've now done writes. thus we have a problem. + err += "rbid at primary changed during resync/rollback"; + } + if( !err.empty() ) { + log() << "replSet error rolling back : " << err << ". A full resync will be necessary." << rsLog; + /* todo: reset minvalid so that we are permanently in fatal state */ + /* todo: don't be fatal, but rather, get all the data first. */ + sethbmsg("rollback error"); + throw rsfatal(); + } + } + sethbmsg("rollback 4.3"); } - catch(...){} - Helpers::putSingleton("local.replset.minvalid", newMinValid); - /** first drop collections to drop - that might make things faster below actually if there were subsequent inserts */ + sethbmsg("rollback 4.6"); + /** drop collections to drop before doing individual fixups - that might make things faster below actually if there were subsequent inserts to rollback */ for( set<string>::iterator i = h.toDrop.begin(); i != h.toDrop.end(); i++ ) { Client::Context c(*i, dbpath, 0, /*doauth*/false); try { @@ -308,6 +453,7 @@ namespace mongo { } } + sethbmsg("rollback 4.7"); Client::Context c(rsoplog, dbpath, 0, /*doauth*/false); NamespaceDetails *oplogDetails = nsdetails(rsoplog); uassert(13423, str::stream() << "replSet error in rollback can't find " << rsoplog, oplogDetails); @@ -320,7 +466,12 @@ namespace mongo { bo pattern = d._id.wrap(); // { _id : ... } try { assert( d.ns && *d.ns ); - + if( h.collectionsToResync.count(d.ns) ) { + /* we just synced this entire collection */ + continue; + } + + /* keep an archive of items rolled back */ shared_ptr<RemoveSaver>& rs = removeSavers[d.ns]; if ( ! rs ) rs.reset( new RemoveSaver( "rollback" , "" , d.ns ) ); @@ -343,7 +494,7 @@ namespace mongo { long long start = Listener::getElapsedTimeMillis(); DiskLoc loc = Helpers::findOne(d.ns, pattern, false); if( Listener::getElapsedTimeMillis() - start > 200 ) - log() << "replSet warning roll back slow no _id index for " << d.ns << rsLog; + log() << "replSet warning roll back slow no _id index for " << d.ns << " perhaps?" << rsLog; //would be faster but requires index: DiskLoc loc = Helpers::findById(nsd, pattern); if( !loc.isNull() ) { try { @@ -411,9 +562,9 @@ namespace mongo { removeSavers.clear(); // this effectively closes all of them - sethbmsg(str::stream() << "syncRollback 5 d:" << deletes << " u:" << updates); + sethbmsg(str::stream() << "rollback 5 d:" << deletes << " u:" << updates); MemoryMappedFile::flushAll(true); - sethbmsg("syncRollback 6"); + sethbmsg("rollback 6"); // clean up oplog log(2) << "replSet rollback truncate oplog after " << h.commonPoint.toStringPretty() << rsLog; @@ -423,59 +574,99 @@ namespace mongo { /* reset cached lastoptimewritten and h value */ loadLastOpTimeWritten(); - sethbmsg("syncRollback 7"); + sethbmsg("rollback 7"); MemoryMappedFile::flushAll(true); // done if( warn ) sethbmsg("issues during syncRollback, see log"); else - sethbmsg("syncRollback done"); + sethbmsg("rollback done"); } void ReplSetImpl::syncRollback(OplogReader&r) { + unsigned s = _syncRollback(r); + if( s ) + sleepsecs(s); + } + + unsigned ReplSetImpl::_syncRollback(OplogReader&r) { assert( !lockedByMe() ); assert( !dbMutex.atLeastReadLocked() ); - sethbmsg("syncRollback 0"); + sethbmsg("rollback 0"); writelocktry lk(rsoplog, 20000); if( !lk.got() ) { - sethbmsg("syncRollback couldn't get write lock in a reasonable time"); - sleepsecs(2); - return; + sethbmsg("rollback couldn't get write lock in a reasonable time"); + return 2; + } + + if( box.getState().secondary() ) { + /* by doing this, we will not service reads (return an error as we aren't in secondary staate. + that perhaps is moot becasue of the write lock above, but that write lock probably gets deferred + or removed or yielded later anyway. + + also, this is better for status reporting - we know what is happening. + */ + box.change(MemberState::RS_ROLLBACK, _self); } HowToFixUp how; - sethbmsg("syncRollback 1"); + sethbmsg("rollback 1"); { r.resetCursor(); /*DBClientConnection us(false, 0, 0); string errmsg; if( !us.connect(HostAndPort::me().toString(),errmsg) ) { - sethbmsg("syncRollback connect to self failure" + errmsg); + sethbmsg("rollback connect to self failure" + errmsg); return; }*/ - sethbmsg("syncRollback 2 FindCommonPoint"); + sethbmsg("rollback 2 FindCommonPoint"); try { syncRollbackFindCommonPoint(r.conn(), how); } catch( const char *p ) { - sethbmsg(string("syncRollback 2 error ") + p); - sleepsecs(10); - return; + sethbmsg(string("rollback 2 error ") + p); + return 10; + } + catch( rsfatal& ) { + _fatal(); + return 2; } catch( DBException& e ) { - sethbmsg(string("syncRollback 2 exception ") + e.toString() + "; sleeping 1 min"); + sethbmsg(string("rollback 2 exception ") + e.toString() + "; sleeping 1 min"); + dbtemprelease r; sleepsecs(60); throw; } } - sethbmsg("replSet syncRollback 3 fixup"); + sethbmsg("replSet rollback 3 fixup"); + + { + incRBID(); + try { + syncFixUp(how, r); + } + catch( rsfatal& ) { + sethbmsg("rollback fixup error"); + _fatal(); + return 2; + } + catch(...) { + incRBID(); throw; + } + incRBID(); + + /* success - leave "ROLLBACK" state + can go to SECONDARY once minvalid is achieved + */ + box.change(MemberState::RS_RECOVERING, _self); + } - syncFixUp(how, r); + return 0; } } diff --git a/db/repl/rs_sync.cpp b/db/repl/rs_sync.cpp index bece96c..9ea65cf 100644 --- a/db/repl/rs_sync.cpp +++ b/db/repl/rs_sync.cpp @@ -24,8 +24,11 @@ namespace mongo { using namespace bson; + extern unsigned replSetForceInitialSyncFailure; + void startSyncThread() { Client::initThread("rs_sync"); + cc().iAmSyncThread(); theReplSet->syncThread(); cc().shutdown(); } @@ -91,6 +94,7 @@ namespace mongo { // todo : use exhaust unsigned long long n = 0; while( 1 ) { + if( !r.more() ) break; BSONObj o = r.nextSafe(); /* note we might get "not master" at some point */ @@ -103,9 +107,14 @@ namespace mongo { anymore. assumePrimary is in the db lock so we are safe as long as we check after we locked above. */ const Member *p1 = box.getPrimary(); - if( p1 != primary ) { - log() << "replSet primary was:" << primary->fullName() << " now:" << - (p1 != 0 ? p1->fullName() : "none") << rsLog; + if( p1 != primary || replSetForceInitialSyncFailure ) { + int f = replSetForceInitialSyncFailure; + if( f > 0 ) { + replSetForceInitialSyncFailure = f-1; + log() << "replSet test code invoked, replSetForceInitialSyncFailure" << rsLog; + } + log() << "replSet primary was:" << primary->fullName() << " now:" << + (p1 != 0 ? p1->fullName() : "none") << rsLog; throw DBException("primary changed",0); } @@ -131,6 +140,32 @@ namespace mongo { return true; } + /* should be in RECOVERING state on arrival here. + readlocks + @return true if transitioned to SECONDARY + */ + bool ReplSetImpl::tryToGoLiveAsASecondary(OpTime& /*out*/ minvalid) { + bool golive = false; + { + readlock lk("local.replset.minvalid"); + BSONObj mv; + if( Helpers::getSingleton("local.replset.minvalid", mv) ) { + minvalid = mv["ts"]._opTime(); + if( minvalid <= lastOpTimeWritten ) { + golive=true; + } + } + else + golive = true; /* must have been the original member */ + } + if( golive ) { + sethbmsg(""); + changeState(MemberState::RS_SECONDARY); + } + return golive; + } + + /* tail the primary's oplog. ok to return, will be re-called. */ void ReplSetImpl::syncTail() { // todo : locking vis a vis the mgr... @@ -147,14 +182,19 @@ namespace mongo { { BSONObj remoteOldestOp = r.findOne(rsoplog, Query()); OpTime ts = remoteOldestOp["ts"]._opTime(); - DEV log() << "remoteOldestOp: " << ts.toStringPretty() << endl; - else log(3) << "remoteOldestOp: " << ts.toStringPretty() << endl; + DEV log() << "replSet remoteOldestOp: " << ts.toStringLong() << rsLog; + else log(3) << "replSet remoteOldestOp: " << ts.toStringLong() << rsLog; + DEV { + // debugging sync1.js... + log() << "replSet lastOpTimeWritten: " << lastOpTimeWritten.toStringLong() << rsLog; + log() << "replSet our state: " << state().toString() << rsLog; + } if( lastOpTimeWritten < ts ) { - log() << "replSet error too stale to catch up, at least from primary " << hn << rsLog; - log() << "replSet our last optime : " << lastOpTimeWritten.toStringPretty() << rsLog; - log() << "replSet oldest at " << hn << " : " << ts.toStringPretty() << rsLog; + log() << "replSet error RS102 too stale to catch up, at least from primary: " << hn << rsLog; + log() << "replSet our last optime : " << lastOpTimeWritten.toStringLong() << rsLog; + log() << "replSet oldest at " << hn << " : " << ts.toStringLong() << rsLog; log() << "replSet See http://www.mongodb.org/display/DOCS/Resyncing+a+Very+Stale+Replica+Set+Member" << rsLog; - sethbmsg("error too stale to catch up"); + sethbmsg("error RS102 too stale to catch up"); sleepsecs(120); return; } @@ -213,7 +253,13 @@ namespace mongo { } } - while( 1 ) { + /* we have now checked if we need to rollback and we either don't have to or did it. */ + { + OpTime minvalid; + tryToGoLiveAsASecondary(minvalid); + } + + while( 1 ) { while( 1 ) { if( !r.moreInCurrentBatch() ) { /* we need to occasionally check some things. between @@ -222,27 +268,13 @@ namespace mongo { /* perhaps we should check this earlier? but not before the rollback checks. */ if( state().recovering() ) { /* can we go to RS_SECONDARY state? we can if not too old and if minvalid achieved */ - bool golive = false; OpTime minvalid; - { - readlock lk("local.replset.minvalid"); - BSONObj mv; - if( Helpers::getSingleton("local.replset.minvalid", mv) ) { - minvalid = mv["ts"]._opTime(); - if( minvalid <= lastOpTimeWritten ) { - golive=true; - } - } - else - golive = true; /* must have been the original member */ - } + bool golive = ReplSetImpl::tryToGoLiveAsASecondary(minvalid); if( golive ) { - sethbmsg(""); - log() << "replSet SECONDARY" << rsLog; - changeState(MemberState::RS_SECONDARY); + ; } else { - sethbmsg(str::stream() << "still syncing, not yet to minValid optime " << minvalid.toString()); + sethbmsg(str::stream() << "still syncing, not yet to minValid optime" << minvalid.toString()); } /* todo: too stale capability */ @@ -270,12 +302,40 @@ namespace mongo { syncApply(o); _logOpObjRS(o); /* with repl sets we write the ops to our oplog too: */ } + int sd = myConfig().slaveDelay; + if( sd ) { + const OpTime ts = o["ts"]._opTime(); + long long a = ts.getSecs(); + long long b = time(0); + long long lag = b - a; + long long sleeptime = sd - lag; + if( sleeptime > 0 ) { + uassert(12000, "rs slaveDelay differential too big check clocks and systems", sleeptime < 0x40000000); + log() << "replSet temp slavedelay sleep:" << sleeptime << rsLog; + if( sleeptime < 60 ) { + sleepsecs((int) sleeptime); + } + else { + // sleep(hours) would prevent reconfigs from taking effect & such! + long long waitUntil = b + sleeptime; + while( 1 ) { + sleepsecs(6); + if( time(0) >= waitUntil ) + break; + if( box.getPrimary() != primary ) + break; + if( myConfig().slaveDelay != sd ) // reconf + break; + } + } + } + } } } r.tailCheck(); if( !r.haveCursor() ) { - log() << "replSet TEMP end syncTail pass with " << hn << rsLog; - // TODO : reuse our cnonection to the primary. + log(1) << "replSet end syncTail pass with " << hn << rsLog; + // TODO : reuse our connection to the primary. return; } if( box.getPrimary() != primary ) @@ -290,6 +350,10 @@ namespace mongo { sleepsecs(1); return; } + if( sp.state.fatal() ) { + sleepsecs(5); + return; + } /* later, we can sync from up secondaries if we want. tbd. */ if( sp.primary == 0 ) diff --git a/db/security_commands.cpp b/db/security_commands.cpp index af03294..7bf2813 100644 --- a/db/security_commands.cpp +++ b/db/security_commands.cpp @@ -49,9 +49,7 @@ namespace mongo { class CmdGetNonce : public Command { public: virtual bool requiresAuth() { return false; } - virtual bool logTheOp() { - return false; - } + virtual bool logTheOp() { return false; } virtual bool slaveOk() const { return true; } diff --git a/db/stats/snapshots.cpp b/db/stats/snapshots.cpp index b439b03..3ce80ca 100644 --- a/db/stats/snapshots.cpp +++ b/db/stats/snapshots.cpp @@ -163,7 +163,7 @@ namespace mongo { ss << usage.count; ss << "</td><td>"; double per = 100 * ((double)usage.time)/elapsed; - ss << setprecision(4) << fixed << per << "%"; + ss << setprecision(1) << fixed << per << "%"; ss << "</td>"; } |