Diffstat (limited to 'db')
-rw-r--r--  db/client.cpp                  45
-rw-r--r--  db/client.h                    50
-rw-r--r--  db/clientcursor.h               9
-rw-r--r--  db/cloner.cpp                  17
-rw-r--r--  db/commands.h                   4
-rw-r--r--  db/curop.h                      5
-rw-r--r--  db/db.cpp                      16
-rw-r--r--  db/db.vcxproj                  20
-rwxr-xr-x  db/db.vcxproj.filters          62
-rw-r--r--  db/db_10.sln                    4
-rw-r--r--  db/dbcommands.cpp              88
-rw-r--r--  db/dbcommands_generic.cpp       3
-rw-r--r--  db/instance.cpp                33
-rw-r--r--  db/lasterror.cpp                7
-rw-r--r--  db/matcher.cpp                  2
-rw-r--r--  db/oplog.cpp                    6
-rw-r--r--  db/oplog.h                      2
-rw-r--r--  db/pdfile.cpp                  48
-rw-r--r--  db/pdfile.h                     6
-rw-r--r--  db/query.cpp                    7
-rw-r--r--  db/queryoptimizer.cpp           2
-rw-r--r--  db/queryutil.cpp               30
-rw-r--r--  db/repl.cpp                    22
-rw-r--r--  db/repl/consensus.cpp          12
-rw-r--r--  db/repl/health.cpp             34
-rw-r--r--  db/repl/health.h                2
-rw-r--r--  db/repl/heartbeat.cpp          27
-rw-r--r--  db/repl/manager.cpp            30
-rw-r--r--  db/repl/replset_commands.cpp   18
-rw-r--r--  db/repl/rs.cpp                138
-rw-r--r--  db/repl/rs.h                   74
-rw-r--r--  db/repl/rs_config.cpp          57
-rw-r--r--  db/repl/rs_config.h            13
-rw-r--r--  db/repl/rs_initialsync.cpp      2
-rw-r--r--  db/repl/rs_member.h            19
-rw-r--r--  db/repl/rs_rollback.cpp       291
-rw-r--r--  db/repl/rs_sync.cpp           122
-rw-r--r--  db/security_commands.cpp        4
-rw-r--r--  db/stats/snapshots.cpp          2
39 files changed, 977 insertions(+), 356 deletions(-)
diff --git a/db/client.cpp b/db/client.cpp
index 9781041..f9653f5 100644
--- a/db/client.cpp
+++ b/db/client.cpp
@@ -35,16 +35,18 @@
namespace mongo {
+ Client* Client::syncThread;
mongo::mutex Client::clientsMutex("clientsMutex");
set<Client*> Client::clients; // always be in clientsMutex when manipulating this
boost::thread_specific_ptr<Client> currentClient;
- Client::Client(const char *desc) :
+ Client::Client(const char *desc, MessagingPort *p) :
_context(0),
_shutdown(false),
_desc(desc),
_god(0),
- _lastOp(0)
+ _lastOp(0),
+ _mp(p)
{
_curOp = new CurOp( this );
scoped_lock bl(clientsMutex);
@@ -52,15 +54,21 @@ namespace mongo {
}
Client::~Client() {
- delete _curOp;
_god = 0;
if ( _context )
- cout << "ERROR: Client::~Client _context should be NULL: " << _desc << endl;
- if ( !_shutdown )
- cout << "ERROR: Client::shutdown not called: " << _desc << endl;
- }
+ error() << "Client::~Client _context should be null but is not; client:" << _desc << endl;
+ if ( ! _shutdown ) {
+ error() << "Client::shutdown not called: " << _desc << endl;
+ }
+
+ scoped_lock bl(clientsMutex);
+ if ( ! _shutdown )
+ clients.erase(this);
+ delete _curOp;
+ }
+
void Client::_dropns( const string& ns ){
Top::global.collectionDropped( ns );
@@ -75,7 +83,7 @@ namespace mongo {
dropCollection( ns , err , b );
}
catch ( ... ){
- log() << "error dropping temp collection: " << ns << endl;
+ warning() << "error dropping temp collection: " << ns << endl;
}
}
@@ -196,12 +204,18 @@ namespace mongo {
if ( doauth )
_auth( lockState );
- if ( _client->_curOp->getOp() != dbGetMore ){ // getMore's are special and should be handled else where
+ switch ( _client->_curOp->getOp() ){
+ case dbGetMore: // getMore's are special and should be handled else where
+ case dbUpdate: // update & delete check shard version in instance.cpp, so don't check here as well
+ case dbDelete:
+ break;
+ default: {
string errmsg;
- if ( ! shardVersionOk( _ns , errmsg ) ){
+ if ( ! shardVersionOk( _ns , lockState > 0 , errmsg ) ){
msgasserted( StaleConfigInContextCode , (string)"[" + _ns + "] shard version not ok in Client::Context: " + errmsg );
}
}
+ }
}
void Client::Context::_auth( int lockState ){
@@ -237,7 +251,7 @@ namespace mongo {
string sayClientState(){
Client* c = currentClient.get();
- if ( ! c )
+ if ( !c )
return "no client";
return c->toString();
}
@@ -259,6 +273,15 @@ namespace mongo {
}
}
+ CurOp::~CurOp(){
+ if ( _wrapped ){
+ scoped_lock bl(Client::clientsMutex);
+ _client->_curOp = _wrapped;
+ }
+
+ _client = 0;
+ }
+
BSONObj CurOp::query( bool threadSafe ) {
if( querySize() == 1 ) {
return _tooBig;
diff --git a/db/client.h b/db/client.h
index 2456d7f..d0600e3 100644
--- a/db/client.h
+++ b/db/client.h
@@ -29,27 +29,33 @@
#include "namespace.h"
#include "lasterror.h"
#include "stats/top.h"
-//#include "repl/rs.h"
namespace mongo {
extern class ReplSet *theReplSet;
-
class AuthenticationInfo;
class Database;
class CurOp;
class Command;
class Client;
+ class MessagingPort;
extern boost::thread_specific_ptr<Client> currentClient;
class Client : boost::noncopyable {
public:
+ static Client *syncThread;
+ void iAmSyncThread() {
+ wassert( syncThread == 0 );
+ syncThread = this;
+ }
+ bool isSyncThread() const { return this == syncThread; } // true if this client is the replication secondary pull thread
+
static mongo::mutex clientsMutex;
static set<Client*> clients; // always be in clientsMutex when manipulating this
-
static int recommendedYieldMicros( int * writers = 0 , int * readers = 0 );
+ /* set _god=true temporarily, safely */
class GodScope {
bool _prev;
public:
@@ -148,9 +154,11 @@ namespace mongo {
}
friend class CurOp;
- };
+ }; // class Client::Context
private:
+ void _dropns( const string& ns );
+
CurOp * _curOp;
Context * _context;
bool _shutdown;
@@ -162,9 +170,9 @@ namespace mongo {
BSONObj _handshake;
BSONObj _remoteId;
- void _dropns( const string& ns );
-
public:
+ MessagingPort * const _mp;
+
string clientAddress() const;
AuthenticationInfo * getAuthenticationInfo(){ return &_ai; }
bool isAdmin() { return _ai.isAuthorized( "admin" ); }
@@ -174,24 +182,19 @@ namespace mongo {
const char *ns() const { return _context->ns(); }
const char *desc() const { return _desc; }
- Client(const char *desc);
+ Client(const char *desc, MessagingPort *p = 0);
~Client();
void addTempCollection( const string& ns );
void _invalidateDB(const string& db);
static void invalidateDB(const string& db);
-
static void invalidateNS( const string& ns );
- void setLastOp( ReplTime op ) {
- _lastOp = op;
- }
-
- ReplTime getLastOp() const {
- return _lastOp;
- }
+ void setLastOp( ReplTime op ) { _lastOp = op; }
+ ReplTime getLastOp() const { return _lastOp; }
+ /* report what the last operation was. used by getlasterror */
void appendLastOp( BSONObjBuilder& b ) {
if( theReplSet ) {
b.append("lastOp" , (long long) _lastOp);
@@ -206,14 +209,13 @@ namespace mongo {
/* each thread which does db operations has a Client object in TLS.
call this when your thread starts.
*/
- static void initThread(const char *desc);
+ static Client& initThread(const char *desc, MessagingPort *mp = 0);
/*
this has to be called as the client goes away, but before thread termination
@return true if anything was done
*/
bool shutdown();
-
/* this is for map/reduce writes */
bool isGod() const { return _god; }
@@ -221,13 +223,12 @@ namespace mongo {
friend class CurOp;
string toString() const;
-
void gotHandshake( const BSONObj& o );
-
BSONObj getRemoteID() const { return _remoteId; }
BSONObj getHandshake() const { return _handshake; }
};
+ /** get the Client object for this thread. */
inline Client& cc() {
Client * c = currentClient.get();
assert( c );
@@ -237,11 +238,13 @@ namespace mongo {
/* each thread which does db operations has a Client object in TLS.
call this when your thread starts.
*/
- inline void Client::initThread(const char *desc) {
+ inline Client& Client::initThread(const char *desc, MessagingPort *mp) {
setThreadName(desc);
assert( currentClient.get() == 0 );
- currentClient.reset( new Client(desc) );
+ Client *c = new Client(desc, mp);
+ currentClient.reset(c);
mongo::lastError.initThread();
+ return *c;
}
inline Client::GodScope::GodScope(){
@@ -276,8 +279,5 @@ namespace mongo {
string sayClientState();
- inline bool haveClient(){
- return currentClient.get() > 0;
- }
+ inline bool haveClient() { return currentClient.get() > 0; }
};
-
diff --git a/db/clientcursor.h b/db/clientcursor.h
index 6f79dcf..b895c17 100644
--- a/db/clientcursor.h
+++ b/db/clientcursor.h
@@ -138,17 +138,18 @@ namespace mongo {
/*const*/ CursorId cursorid;
const string ns;
const shared_ptr<Cursor> c;
- int pos; // # objects into the cursor so far
- BSONObj query;
+ int pos; // # objects into the cursor so far
+ const BSONObj query; // used for logging diags only; optional in constructor
const int _queryOptions; // see enum QueryOptions dbclient.h
OpTime _slaveReadTill;
Database * const _db;
- ClientCursor(int queryOptions, shared_ptr<Cursor>& _c, const string& _ns) :
+ ClientCursor(int queryOptions, shared_ptr<Cursor>& _c, const string& _ns, BSONObj _query = BSONObj()) :
_idleAgeMillis(0), _pinValue(0),
_doingDeletes(false), _yieldSometimesTracker(128,10),
ns(_ns), c(_c),
- pos(0), _queryOptions(queryOptions),
+ pos(0), query(_query),
+ _queryOptions(queryOptions),
_db( cc().database() )
{
assert( _db );
diff --git a/db/cloner.cpp b/db/cloner.cpp
index 96890bf..9177a00 100644
--- a/db/cloner.cpp
+++ b/db/cloner.cpp
@@ -49,7 +49,7 @@ namespace mongo {
void setConnection( DBClientWithCommands *c ) { conn.reset( c ); }
bool go(const char *masterHost, string& errmsg, const string& fromdb, bool logForRepl, bool slaveOk, bool useReplAuth, bool snapshot);
- bool copyCollection( const string& from , const string& ns , const BSONObj& query , string& errmsg , bool copyIndexes = true );
+ bool copyCollection( const string& from , const string& ns , const BSONObj& query , string& errmsg , bool copyIndexes = true, bool logForRepl = true );
};
/* for index info object:
@@ -198,12 +198,12 @@ namespace mongo {
}
}
- bool copyCollectionFromRemote(const string& host, const string& ns, const BSONObj& query, string errmsg) {
+ bool copyCollectionFromRemote(const string& host, const string& ns, const BSONObj& query, string& errmsg, bool logForRepl) {
Cloner c;
- return c.copyCollection(host, ns, query, errmsg , /*copyIndexes*/ true);
+ return c.copyCollection(host, ns, query, errmsg , /*copyIndexes*/ true, logForRepl);
}
- bool Cloner::copyCollection( const string& from , const string& ns , const BSONObj& query , string& errmsg , bool copyIndexes ){
+ bool Cloner::copyCollection( const string& from , const string& ns , const BSONObj& query , string& errmsg , bool copyIndexes, bool logForRepl ) {
auto_ptr<DBClientConnection> myconn;
myconn.reset( new DBClientConnection() );
if ( ! myconn->connect( from , errmsg ) )
@@ -223,12 +223,17 @@ namespace mongo {
}
{ // main data
- copy( ns.c_str() , ns.c_str() , false , true , false , true , Query(query).snapshot() );
+ copy( ns.c_str() , ns.c_str() , /*isindex*/false , logForRepl , false , true , Query(query).snapshot() );
}
+ /* TODO : copyIndexes bool does not seem to be implemented! */
+ if( !copyIndexes ) {
+ log() << "ERROR copy collection copyIndexes not implemented? " << ns << endl;
+ }
+
{ // indexes
string temp = ctx.db()->name + ".system.indexes";
- copy( temp.c_str() , temp.c_str() , true , true , false , true , BSON( "ns" << ns ) );
+ copy( temp.c_str() , temp.c_str() , /*isindex*/true , logForRepl , false , true , BSON( "ns" << ns ) );
}
return true;
}
diff --git a/db/commands.h b/db/commands.h
index eea4a71..a8a61c4 100644
--- a/db/commands.h
+++ b/db/commands.h
@@ -85,9 +85,7 @@ namespace mongo {
Note if run() returns false, we do NOT log.
*/
- virtual bool logTheOp() {
- return false;
- }
+ virtual bool logTheOp() { return false; }
virtual void help( stringstream& help ) const;
diff --git a/db/curop.h b/db/curop.h
index fbeda9f..bf06a69 100644
--- a/db/curop.h
+++ b/db/curop.h
@@ -222,10 +222,7 @@ namespace mongo {
memset(_queryBuf, 0, sizeof(_queryBuf));
}
- ~CurOp(){
- if ( _wrapped )
- _client->_curOp = _wrapped;
- }
+ ~CurOp();
BSONObj info() {
if( ! cc().getAuthenticationInfo()->isAuthorized("admin") ) {
diff --git a/db/db.cpp b/db/db.cpp
index 2b91956..d5b9339 100644
--- a/db/db.cpp
+++ b/db/db.cpp
@@ -67,7 +67,6 @@ namespace mongo {
#endif
void setupSignals();
- void closeAllSockets();
void startReplSets(ReplSetCmdline*);
void startReplication();
void pairWith(const char *remoteEnd, const char *arb);
@@ -104,7 +103,7 @@ namespace mongo {
virtual void accepted(MessagingPort *mp) {
if ( ! connTicketHolder.tryAcquire() ){
- log() << "connection refused because too many open connections: " << connTicketHolder.used() << endl;
+ log() << "connection refused because too many open connections: " << connTicketHolder.used() << " of " << connTicketHolder.outof() << endl;
// TODO: would be nice if we notified them...
mp->shutdown();
delete mp;
@@ -207,16 +206,14 @@ namespace mongo {
void connThread( MessagingPort * inPort )
{
TicketHolderReleaser connTicketReleaser( &connTicketHolder );
- Client::initThread("conn");
/* todo: move to Client object */
LastError *le = new LastError();
lastError.reset(le);
+ inPort->_logLevel = 1;
auto_ptr<MessagingPort> dbMsgPort( inPort );
-
- dbMsgPort->_logLevel = 1;
- Client& c = cc();
+ Client& c = Client::initThread("conn", inPort);
try {
@@ -522,7 +519,7 @@ sendmore:
l << ( is32bit ? " 32" : " 64" ) << "-bit " << endl;
}
DEV log() << "_DEBUG build (which is slower)" << endl;
- show_32_warning();
+ show_warnings();
log() << mongodVersion() << endl;
printGitVersion();
printSysInfo();
@@ -629,7 +626,7 @@ using namespace mongo;
namespace po = boost::program_options;
void show_help_text(po::options_description options) {
- show_32_warning();
+ show_warnings();
cout << options << endl;
};
@@ -1111,9 +1108,6 @@ int main(int argc, char* argv[], char *envp[] )
namespace mongo {
- /* we do not use log() below as it uses a mutex and that could cause deadlocks.
- */
-
string getDbContext();
#undef out
diff --git a/db/db.vcxproj b/db/db.vcxproj
index 61f81fe..0cabbd0 100644
--- a/db/db.vcxproj
+++ b/db/db.vcxproj
@@ -574,14 +574,26 @@
<ClCompile Include="repl\rs_config.cpp" />
</ItemGroup>
<ItemGroup>
- <None Include="..\jstests\rs\rs_basic.js" />
- <None Include="..\jstests\rs\test_framework.js" />
+ <None Include="..\jstests\replsets\replset1.js" />
+ <None Include="..\jstests\replsets\replset2.js" />
+ <None Include="..\jstests\replsets\replset3.js" />
+ <None Include="..\jstests\replsets\replset4.js" />
+ <None Include="..\jstests\replsets\replset5.js" />
+ <None Include="..\jstests\replsets\replsetadd.js" />
+ <None Include="..\jstests\replsets\replsetarb1.js" />
+ <None Include="..\jstests\replsets\replsetarb2.js" />
+ <None Include="..\jstests\replsets\replsetprio1.js" />
+ <None Include="..\jstests\replsets\replsetrestart1.js" />
+ <None Include="..\jstests\replsets\replsetrestart2.js" />
+ <None Include="..\jstests\replsets\replset_remove_node.js" />
+ <None Include="..\jstests\replsets\rollback.js" />
+ <None Include="..\jstests\replsets\rollback2.js" />
+ <None Include="..\jstests\replsets\sync1.js" />
+ <None Include="..\jstests\replsets\twosets.js" />
<None Include="..\SConstruct" />
<None Include="..\util\mongoutils\README" />
<None Include="mongo.ico" />
<None Include="repl\notes.txt" />
- <None Include="repl\test.html" />
- <None Include="repl\testing.js" />
</ItemGroup>
<ItemGroup>
<ClInclude Include="..\client\dbclientcursor.h" />
diff --git a/db/db.vcxproj.filters b/db/db.vcxproj.filters
index f8d72a6..bf30b4e 100755
--- a/db/db.vcxproj.filters
+++ b/db/db.vcxproj.filters
@@ -840,7 +840,7 @@
<Filter Include="replSets">
<UniqueIdentifier>{3b73f786-d352-446f-a5f5-df49384baf7a}</UniqueIdentifier>
</Filter>
- <Filter Include="replSets\test stuff">
+ <Filter Include="replSets\testing">
<UniqueIdentifier>{4a1ea357-1077-4ad1-85b4-db48a6e1eb46}</UniqueIdentifier>
</Filter>
</ItemGroup>
@@ -851,24 +851,60 @@
<None Include="..\util\mongoutils\README">
<Filter>util\mongoutils</Filter>
</None>
- <None Include="repl\testing.js">
- <Filter>replSets\test stuff</Filter>
- </None>
- <None Include="repl\test.html">
- <Filter>replSets\test stuff</Filter>
- </None>
<None Include="..\SConstruct">
<Filter>db</Filter>
</None>
- <None Include="..\jstests\rs\rs_basic.js">
- <Filter>replSets\test stuff</Filter>
- </None>
- <None Include="..\jstests\rs\test_framework.js">
- <Filter>replSets\test stuff</Filter>
- </None>
<None Include="mongo.ico">
<Filter>Resource Files</Filter>
</None>
+ <None Include="..\jstests\replsets\replset_remove_node.js">
+ <Filter>replSets\testing</Filter>
+ </None>
+ <None Include="..\jstests\replsets\replset1.js">
+ <Filter>replSets\testing</Filter>
+ </None>
+ <None Include="..\jstests\replsets\replset2.js">
+ <Filter>replSets\testing</Filter>
+ </None>
+ <None Include="..\jstests\replsets\replset3.js">
+ <Filter>replSets\testing</Filter>
+ </None>
+ <None Include="..\jstests\replsets\replset4.js">
+ <Filter>replSets\testing</Filter>
+ </None>
+ <None Include="..\jstests\replsets\replset5.js">
+ <Filter>replSets\testing</Filter>
+ </None>
+ <None Include="..\jstests\replsets\replsetadd.js">
+ <Filter>replSets\testing</Filter>
+ </None>
+ <None Include="..\jstests\replsets\replsetarb1.js">
+ <Filter>replSets\testing</Filter>
+ </None>
+ <None Include="..\jstests\replsets\replsetarb2.js">
+ <Filter>replSets\testing</Filter>
+ </None>
+ <None Include="..\jstests\replsets\replsetprio1.js">
+ <Filter>replSets\testing</Filter>
+ </None>
+ <None Include="..\jstests\replsets\replsetrestart1.js">
+ <Filter>replSets\testing</Filter>
+ </None>
+ <None Include="..\jstests\replsets\replsetrestart2.js">
+ <Filter>replSets\testing</Filter>
+ </None>
+ <None Include="..\jstests\replsets\rollback.js">
+ <Filter>replSets\testing</Filter>
+ </None>
+ <None Include="..\jstests\replsets\rollback2.js">
+ <Filter>replSets\testing</Filter>
+ </None>
+ <None Include="..\jstests\replsets\sync1.js">
+ <Filter>replSets\testing</Filter>
+ </None>
+ <None Include="..\jstests\replsets\twosets.js">
+ <Filter>replSets\testing</Filter>
+ </None>
</ItemGroup>
<ItemGroup>
<Library Include="..\..\js\js64r.lib">
diff --git a/db/db_10.sln b/db/db_10.sln
index 0aa382f..d68d897 100644
--- a/db/db_10.sln
+++ b/db/db_10.sln
@@ -12,7 +12,6 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "tools", "tools", "{2B262D59
..\tools\bsondump.cpp = ..\tools\bsondump.cpp
..\tools\dump.cpp = ..\tools\dump.cpp
..\tools\export.cpp = ..\tools\export.cpp
- ..\tools\files.cpp = ..\tools\files.cpp
..\tools\import.cpp = ..\tools\import.cpp
..\tools\restore.cpp = ..\tools\restore.cpp
..\tools\sniffer.cpp = ..\tools\sniffer.cpp
@@ -29,9 +28,6 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "unix files", "unix files",
EndProjectSection
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "shell", "shell", "{407B4B88-3451-433C-B74F-31B31FEB5791}"
- ProjectSection(SolutionItems) = preProject
- ..\shell\utils.h = ..\shell\utils.h
- EndProjectSection
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "other", "other", "{12B11474-2D74-48C3-BB3D-F03249BEA88F}"
EndProject
diff --git a/db/dbcommands.cpp b/db/dbcommands.cpp
index 22b0457..96374d9 100644
--- a/db/dbcommands.cpp
+++ b/db/dbcommands.cpp
@@ -74,6 +74,14 @@ namespace mongo {
}
} cmdResetError;
+ /* set by replica sets if specified in the configuration.
+ a pointer is used to avoid any possible locking issues with lockless reading (see below locktype() is NONE
+ and would like to keep that)
+ (for now, it simply orphans any old copy as config changes should be extremely rare).
+ note: once non-null, never goes to null again.
+ */
+ BSONObj *getLastErrorDefault = 0;
+
class CmdGetLastError : public Command {
public:
virtual LockType locktype() const { return NONE; }
@@ -88,7 +96,7 @@ namespace mongo {
help << "return error status of the last operation on this connection";
}
CmdGetLastError() : Command("getLastError", false, "getlasterror") {}
- bool run(const string& dbnamne, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) {
+ bool run(const string& dbnamne, BSONObj& _cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) {
LastError *le = lastError.disableForCommand();
if ( le->nPrev != 1 )
LastError::noError.appendSelf( result );
@@ -98,6 +106,18 @@ namespace mongo {
Client& c = cc();
c.appendLastOp( result );
+ BSONObj cmdObj = _cmdObj;
+ {
+ BSONObj::iterator i(_cmdObj);
+ i.next();
+ if( !i.more() ) {
+ /* empty, use default */
+ BSONObj *def = getLastErrorDefault;
+ if( def )
+ cmdObj = *def;
+ }
+ }
+
if ( cmdObj["fsync"].trueValue() ){
log() << "fsync from getlasterror" << endl;
result.append( "fsyncFiles" , MemoryMappedFile::flushAll( true ) );
@@ -389,8 +409,11 @@ namespace mongo {
if ( ! authed )
result.append( "note" , "run against admin for more info" );
- if ( Listener::getElapsedTimeMillis() - start > 1000 )
- result.append( "timing" , timeBuilder.obj() );
+ if ( Listener::getElapsedTimeMillis() - start > 1000 ){
+ BSONObj t = timeBuilder.obj();
+ log() << "serverStatus was very slow: " << t << endl;
+ result.append( "timing" , t );
+ }
return true;
}
@@ -690,12 +713,8 @@ namespace mongo {
class CmdReIndex : public Command {
public:
- virtual bool logTheOp() {
- return true;
- }
- virtual bool slaveOk() const {
- return false;
- }
+ virtual bool logTheOp() { return false; } // only reindexes on the one node
+ virtual bool slaveOk() const { return true; } // can reindex on a secondary
virtual LockType locktype() const { return WRITE; }
virtual void help( stringstream& help ) const {
help << "re-index a collection";
@@ -745,9 +764,6 @@ namespace mongo {
class CmdListDatabases : public Command {
public:
- virtual bool logTheOp() {
- return false;
- }
virtual bool slaveOk() const {
return true;
}
@@ -925,15 +941,34 @@ namespace mongo {
"\nnote: This command may take a while to run";
}
bool run(const string& dbname, BSONObj& jsobj, string& errmsg, BSONObjBuilder& result, bool fromRepl ){
+ Timer timer;
+
string ns = jsobj.firstElement().String();
BSONObj min = jsobj.getObjectField( "min" );
BSONObj max = jsobj.getObjectField( "max" );
BSONObj keyPattern = jsobj.getObjectField( "keyPattern" );
+ bool estimate = jsobj["estimate"].trueValue();
Client::Context ctx( ns );
+ NamespaceDetails *d = nsdetails(ns.c_str());
+
+ if ( ! d || d->nrecords == 0 ){
+ result.appendNumber( "size" , 0 );
+ result.appendNumber( "numObjects" , 0 );
+ result.append( "millis" , timer.millis() );
+ return true;
+ }
+ result.appendBool( "estimate" , estimate );
+
shared_ptr<Cursor> c;
if ( min.isEmpty() && max.isEmpty() ) {
+ if ( estimate ){
+ result.appendNumber( "size" , d->datasize );
+ result.appendNumber( "numObjects" , d->nrecords );
+ result.append( "millis" , timer.millis() );
+ return 1;
+ }
c = theDataFileMgr.findAll( ns.c_str() );
}
else if ( min.isEmpty() || max.isEmpty() ) {
@@ -944,18 +979,24 @@ namespace mongo {
IndexDetails *idx = cmdIndexDetailsForRange( ns.c_str(), errmsg, min, max, keyPattern );
if ( idx == 0 )
return false;
- NamespaceDetails *d = nsdetails(ns.c_str());
+
c.reset( new BtreeCursor( d, d->idxNo(*idx), *idx, min, max, false, 1 ) );
}
+ long long avgObjSize = d->datasize / d->nrecords;
+
long long maxSize = jsobj["maxSize"].numberLong();
long long maxObjects = jsobj["maxObjects"].numberLong();
- Timer timer;
long long size = 0;
long long numObjects = 0;
while( c->ok() ) {
- size += c->currLoc().rec()->netLength();
+
+ if ( estimate )
+ size += avgObjSize;
+ else
+ size += c->currLoc().rec()->netLength();
+
numObjects++;
if ( ( maxSize && size > maxSize ) ||
@@ -974,8 +1015,8 @@ namespace mongo {
}
logIfSlow( timer , os.str() );
- result.append( "size", (double)size );
- result.append( "numObjects" , (double)numObjects );
+ result.appendNumber( "size", size );
+ result.appendNumber( "numObjects" , numObjects );
result.append( "millis" , timer.millis() );
return true;
}
@@ -1555,9 +1596,6 @@ namespace mongo {
class CmdWhatsMyUri : public Command {
public:
CmdWhatsMyUri() : Command("whatsmyuri") { }
- virtual bool logTheOp() {
- return false; // the modification will be logged directly
- }
virtual bool slaveOk() const {
return true;
}
@@ -1693,12 +1731,8 @@ namespace mongo {
public:
virtual LockType locktype() const { return NONE; }
virtual bool adminOnly() const { return true; }
- virtual bool logTheOp() {
- return false;
- }
- virtual bool slaveOk() const {
- return true;
- }
+ virtual bool logTheOp() { return false; }
+ virtual bool slaveOk() const { return true; }
virtual void help( stringstream& help ) const {
help << "internal testing command. Makes db block (in a read lock) for 100 seconds\n";
help << "w:true write lock";
@@ -1798,7 +1832,7 @@ namespace mongo {
if ( c->adminOnly() && ! fromRepl && dbname != "admin" ) {
- result.append( "errmsg" , "access denied- use admin db" );
+ result.append( "errmsg" , "access denied; use admin db" );
log() << "command denied: " << cmdObj.toString() << endl;
return false;
}
diff --git a/db/dbcommands_generic.cpp b/db/dbcommands_generic.cpp
index 340f31c..25c6a93 100644
--- a/db/dbcommands_generic.cpp
+++ b/db/dbcommands_generic.cpp
@@ -196,8 +196,9 @@ namespace mongo {
CmdShutdown() : Command("shutdown") {}
bool run(const string& dbname, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) {
Client * c = currentClient.get();
- if ( c )
+ if ( c ) {
c->shutdown();
+ }
log() << "terminating, shutdown command received" << endl;
dbexit( EXIT_CLEAN ); // this never returns
return true;
diff --git a/db/instance.cpp b/db/instance.cpp
index ec3b793..9e81464 100644
--- a/db/instance.cpp
+++ b/db/instance.cpp
@@ -62,7 +62,6 @@ namespace mongo {
bool useCursors = true;
bool useHints = true;
- void closeAllSockets();
void flushOpLog( stringstream &ss ) {
if( _diaglog.f && _diaglog.f->is_open() ) {
ss << "flushing op log and files\n";
@@ -332,9 +331,10 @@ namespace mongo {
}
}
catch ( AssertionException& e ) {
- tlog() << " Caught Assertion in " << opToString(op) << " , continuing" << endl;
+ static int n;
+ tlog(3) << " Caught Assertion in " << opToString(op) << ", continuing" << endl;
ss << " exception " + e.toString();
- log = true;
+ log = ++n < 10;
}
}
}
@@ -446,6 +446,7 @@ namespace mongo {
mongolock lk(1);
+ // if this ever moves to outside of lock, need to adjust check Client::Context::_finishInit
if ( ! broadcast && handlePossibleShardedMessage( m , 0 ) )
return;
@@ -472,8 +473,10 @@ namespace mongo {
}
writelock lk(ns);
+ // if this ever moves to outside of lock, need to adjust check Client::Context::_finishInit
if ( ! broadcast & handlePossibleShardedMessage( m , 0 ) )
return;
+
Client::Context ctx(ns);
long long n = deleteObjects(ns, pattern, justOne, true);
@@ -709,32 +712,32 @@ namespace mongo {
}
catch (...) { }
- tryToOutputFatal( "dbexit: really exiting now\n" );
+ tryToOutputFatal( "dbexit: really exiting now" );
if ( c ) c->shutdown();
::exit(rc);
}
void shutdown() {
- log() << "\t shutdown: going to close listening sockets..." << endl;
+ log() << "shutdown: going to close listening sockets..." << endl;
ListeningSockets::get()->closeAll();
- log() << "\t shutdown: going to flush oplog..." << endl;
+ log() << "shutdown: going to flush oplog..." << endl;
stringstream ss2;
flushOpLog( ss2 );
rawOut( ss2.str() );
/* must do this before unmapping mem or you may get a seg fault */
- log() << "\t shutdown: going to close sockets..." << endl;
- boost::thread close_socket_thread(closeAllSockets);
+ log() << "shutdown: going to close sockets..." << endl;
+ boost::thread close_socket_thread( boost::bind(MessagingPort::closeAllSockets, 0) );
// wait until file preallocation finishes
// we would only hang here if the file_allocator code generates a
// synchronous signal, which we don't expect
- log() << "\t shutdown: waiting for fs preallocator..." << endl;
+ log() << "shutdown: waiting for fs preallocator..." << endl;
theFileAllocator().waitUntilFinished();
- log() << "\t shutdown: closing all files..." << endl;
+ log() << "shutdown: closing all files..." << endl;
stringstream ss3;
MemoryMappedFile::closeAllFiles( ss3 );
rawOut( ss3.str() );
@@ -744,9 +747,9 @@ namespace mongo {
#if !defined(_WIN32) && !defined(__sunos__)
if ( lockFile ){
- log() << "\t shutdown: removing fs lock..." << endl;
+ log() << "shutdown: removing fs lock..." << endl;
if( ftruncate( lockFile , 0 ) )
- log() << "\t couldn't remove fs lock " << errnoWithDescription() << endl;
+ log() << "couldn't remove fs lock " << errnoWithDescription() << endl;
flock( lockFile, LOCK_UN );
}
#endif
@@ -766,12 +769,14 @@ namespace mongo {
bool oldFile = false;
- if ( boost::filesystem::exists( name ) && boost::filesystem::file_size( name ) > 0 ){
+ if ( boost::filesystem::exists( name ) && boost::filesystem::file_size( name ) > 0 ) {
oldFile = true;
}
lockFile = open( name.c_str(), O_RDWR | O_CREAT , S_IRWXU | S_IRWXG | S_IRWXO );
- uassert( 10309 , "Unable to create / open lock file for lockfilepath: " + name, lockFile > 0 );
+ if( lockFile <= 0 ) {
+ uasserted( 10309 , str::stream() << "Unable to create / open lock file for lockfilepath: " << name << ' ' << errnoWithDescription());
+ }
if (flock( lockFile, LOCK_EX | LOCK_NB ) != 0) {
close ( lockFile );
lockFile = 0;
diff --git a/db/lasterror.cpp b/db/lasterror.cpp
index 630fcfb..12fc694 100644
--- a/db/lasterror.cpp
+++ b/db/lasterror.cpp
@@ -34,8 +34,11 @@ namespace mongo {
void raiseError(int code , const char *msg) {
LastError *le = lastError.get();
if ( le == 0 ) {
- /* might be intentional (non-user thread) */
- OCCASIONALLY DEV if( !isShell ) log() << "warning dev: lastError==0 won't report:" << msg << endl;
+ /* might be intentional (non-user thread) */
+ DEV {
+ static unsigned n;
+ if( ++n < 4 && !isShell ) log() << "dev: lastError==0 won't report:" << msg << endl;
+ }
} else if ( le->disabled ) {
log() << "lastError disabled, can't report: " << code << ":" << msg << endl;
} else {
diff --git a/db/matcher.cpp b/db/matcher.cpp
index 681a6dc..cd62563 100644
--- a/db/matcher.cpp
+++ b/db/matcher.cpp
@@ -259,7 +259,7 @@ namespace mongo {
}
void Matcher::parseOr( const BSONElement &e, bool subMatcher, list< shared_ptr< Matcher > > &matchers ) {
- uassert( 13090, "recursive $or/$nor not allowed", !subMatcher );
+ uassert( 13090, "nested $or/$nor not allowed", !subMatcher );
uassert( 13086, "$or/$nor must be a nonempty array", e.type() == Array && e.embeddedObject().nFields() > 0 );
BSONObjIterator j( e.embeddedObject() );
while( j.more() ) {
diff --git a/db/oplog.cpp b/db/oplog.cpp
index 4ad4ca9..93800c7 100644
--- a/db/oplog.cpp
+++ b/db/oplog.cpp
@@ -142,8 +142,10 @@ namespace mongo {
this code (or code in now() maybe) should be improved.
*/
if( theReplSet ) {
- if( !(theReplSet->lastOpTimeWritten<ts) )
- log() << "replSet ERROR possible failover clock skew issue? " << theReplSet->lastOpTimeWritten << ' ' << ts << endl;
+ if( !(theReplSet->lastOpTimeWritten<ts) ) {
+ log() << "replSet ERROR possible failover clock skew issue? " << theReplSet->lastOpTimeWritten << ' ' << ts << rsLog;
+ log() << "replSet " << theReplSet->isPrimary() << rsLog;
+ }
theReplSet->lastOpTimeWritten = ts;
theReplSet->lastH = hNew;
ctx.getClient()->setLastOp( ts.asDate() );
diff --git a/db/oplog.h b/db/oplog.h
index d1e4990..34c345f 100644
--- a/db/oplog.h
+++ b/db/oplog.h
@@ -195,7 +195,7 @@ namespace mongo {
// Use a ClientCursor here so we can release db mutex while scanning
// oplog (can take quite a while with large oplogs).
shared_ptr<Cursor> c = _qp.newReverseCursor();
- _findingStartCursor = new ClientCursor(QueryOption_NoCursorTimeout, c, _qp.ns());
+ _findingStartCursor = new ClientCursor(QueryOption_NoCursorTimeout, c, _qp.ns(), BSONObj());
_findingStartTimer.reset();
_findingStartMode = Initial;
BSONElement tsElt = _qp.originalQuery()[ "ts" ];
diff --git a/db/pdfile.cpp b/db/pdfile.cpp
index cf7cb22..216f21a 100644
--- a/db/pdfile.cpp
+++ b/db/pdfile.cpp
@@ -56,8 +56,6 @@ namespace mongo {
}
};
- const int MaxExtentSize = 0x7ff00000;
-
map<string, unsigned> BackgroundOperation::dbsInProg;
set<string> BackgroundOperation::nsInProg;
@@ -157,7 +155,7 @@ namespace mongo {
sz = 1000000000;
int z = ((int)sz) & 0xffffff00;
assert( z > len );
- DEV tlog() << "initialExtentSize(" << len << ") returns " << z << endl;
+ //DEV tlog() << "initialExtentSize(" << len << ") returns " << z << endl;
return z;
}
@@ -272,10 +270,19 @@ namespace mongo {
/*---------------------------------------------------------------------*/
int MongoDataFile::maxSize() {
- if ( sizeof( int* ) == 4 )
+ if ( sizeof( int* ) == 4 ) {
return 512 * 1024 * 1024;
- else
+ } else if ( cmdLine.smallfiles ) {
+ return 0x7ff00000 >> 2;
+ } else {
return 0x7ff00000;
+ }
+ }
+
+ void MongoDataFile::badOfs(int ofs) const {
+ stringstream ss;
+ ss << "bad offset:" << ofs << " accessing file: " << mmf.filename() << " - consider repairing database";
+ uasserted(13440, ss.str());
}
int MongoDataFile::defaultSize( const char *filename ) const {
@@ -380,7 +387,7 @@ namespace mongo {
Extent* MongoDataFile::createExtent(const char *ns, int approxSize, bool newCapped, int loops) {
massert( 10357 , "shutdown in progress", !goingAway );
- massert( 10358 , "bad new extent size", approxSize >= 0 && approxSize <= MaxExtentSize );
+ massert( 10358 , "bad new extent size", approxSize >= 0 && approxSize <= Extent::maxSize() );
massert( 10359 , "header==0 on new extent: 32 bit mmap space exceeded?", header ); // null if file open failed
int ExtentSize = approxSize <= header->unusedLength ? approxSize : header->unusedLength;
DiskLoc loc;
@@ -403,8 +410,8 @@ namespace mongo {
addNewExtentToNamespace(ns, e, loc, emptyLoc, newCapped);
- DEV tlog() << "new extent " << ns << " size: 0x" << hex << ExtentSize << " loc: 0x" << hex << offset
- << " emptyLoc:" << hex << emptyLoc.getOfs() << dec << endl;
+ DEV tlog(1) << "new extent " << ns << " size: 0x" << hex << ExtentSize << " loc: 0x" << hex << offset
+ << " emptyLoc:" << hex << emptyLoc.getOfs() << dec << endl;
return e;
}
@@ -568,6 +575,14 @@ namespace mongo {
}
*/
+ int Extent::maxSize() {
+ int maxExtentSize = 0x7ff00000;
+ if ( cmdLine.smallfiles ) {
+ maxExtentSize >>= 2;
+ }
+ return maxExtentSize;
+ }
+
/*---------------------------------------------------------------------*/
shared_ptr<Cursor> DataFileMgr::findAll(const char *ns, const DiskLoc &startLoc) {
@@ -897,7 +912,7 @@ namespace mongo {
if ( toupdate->netLength() < objNew.objsize() ) {
// doesn't fit. reallocate -----------------------------------------------------
- uassert( 10003 , "E10003 failing update: objects in a capped ns cannot grow", !(d && d->capped));
+ uassert( 10003 , "failing update: objects in a capped ns cannot grow", !(d && d->capped));
d->paddingTooSmall();
if ( cc().database()->profile )
ss << " moved ";
@@ -950,15 +965,15 @@ namespace mongo {
}
int followupExtentSize(int len, int lastExtentLen) {
- assert( len < MaxExtentSize );
+ assert( len < Extent::maxSize() );
int x = initialExtentSize(len);
int y = (int) (lastExtentLen < 4000000 ? lastExtentLen * 4.0 : lastExtentLen * 1.2);
int sz = y > x ? y : x;
if ( sz < lastExtentLen )
sz = lastExtentLen;
- else if ( sz > MaxExtentSize )
- sz = MaxExtentSize;
+ else if ( sz > Extent::maxSize() )
+ sz = Extent::maxSize();
sz = ((int)sz) & 0xffffff00;
assert( sz > len );
@@ -1029,7 +1044,7 @@ namespace mongo {
Timer t;
- tlog() << "Buildindex " << ns << " idxNo:" << idxNo << ' ' << idx.info.obj().toString() << endl;
+ tlog(1) << "fastBuildIndex " << ns << " idxNo:" << idxNo << ' ' << idx.info.obj().toString() << endl;
bool dupsAllowed = !idx.unique();
bool dropDups = idx.dropDups() || inDBRepair;
@@ -1514,6 +1529,13 @@ namespace mongo {
BSONObj info = loc.obj();
bool background = info["background"].trueValue();
+ if( background && cc().isSyncThread() ) {
+ /* don't do background indexing on slaves. there are nuances. this could be added later
+ but requires more code.
+ */
+ log() << "info: indexing in foreground on this replica; was a background index build on the primary" << endl;
+ background = false;
+ }
int idxNo = tableToIndex->nIndexes;
IndexDetails& idx = tableToIndex->addIndex(tabletoidxns.c_str(), !background); // clear transient info caches so they refresh; increments nIndexes
diff --git a/db/pdfile.h b/db/pdfile.h
index 084a542..d268aac 100644
--- a/db/pdfile.h
+++ b/db/pdfile.h
@@ -79,6 +79,8 @@ namespace mongo {
void flush( bool sync );
private:
+ void badOfs(int) const;
+
int defaultSize( const char *filename ) const;
Extent* getExtent(DiskLoc loc);
@@ -255,6 +257,8 @@ namespace mongo {
Extent* getPrevExtent() {
return xprev.isNull() ? 0 : DataFileMgr::getExtent(xprev);
}
+
+ static int maxSize();
};
/*
@@ -339,7 +343,7 @@ namespace mongo {
inline Record* MongoDataFile::recordAt(DiskLoc dl) {
int ofs = dl.getOfs();
- assert( ofs >= DataFileHeader::HeaderSize );
+ if( ofs < DataFileHeader::HeaderSize ) badOfs(ofs); // will uassert - external call to keep out of the normal code path
return (Record*) _p.at(ofs, -1);
}
diff --git a/db/query.cpp b/db/query.cpp
index bfe845c..5bd7b00 100644
--- a/db/query.cpp
+++ b/db/query.cpp
@@ -1044,14 +1044,13 @@ namespace mongo {
if ( moreClauses ) {
// this MultiCursor will use a dumb NoOp to advance(), so no need to specify mayYield
shared_ptr< Cursor > multi( new MultiCursor( mps, cursor, dqo.matcher(), dqo ) );
- cc = new ClientCursor(queryOptions, multi, ns);
+ cc = new ClientCursor(queryOptions, multi, ns, jsobj.getOwned());
} else {
cursor->setMatcher( dqo.matcher() );
- cc = new ClientCursor( queryOptions, cursor, ns );
+ cc = new ClientCursor( queryOptions, cursor, ns, jsobj.getOwned() );
}
cursorid = cc->cursorid;
- cc->query = jsobj.getOwned();
- DEV tlog() << "query has more, cursorid: " << cursorid << endl;
+ DEV tlog(2) << "query has more, cursorid: " << cursorid << endl;
cc->pos = n;
cc->pq = pq_shared;
cc->fields = pq.getFieldPtr();
diff --git a/db/queryoptimizer.cpp b/db/queryoptimizer.cpp
index 3d4cbd0..e7068c2 100644
--- a/db/queryoptimizer.cpp
+++ b/db/queryoptimizer.cpp
@@ -361,7 +361,7 @@ namespace mongo {
const IndexSpec& spec = ii.getSpec();
if ( spec.getTypeName() == _special && spec.suitability( _originalQuery , order_ ) ){
usingPrerecordedPlan_ = true;
- mayRecordPlan_ = true;
+ mayRecordPlan_ = false;
plans_.push_back( PlanPtr( new QueryPlan( d , j , *fbs_ , _originalQuery, order_ ,
BSONObj() , BSONObj() , _special ) ) );
return;
diff --git a/db/queryutil.cpp b/db/queryutil.cpp
index 791096f..007a1ce 100644
--- a/db/queryutil.cpp
+++ b/db/queryutil.cpp
@@ -944,6 +944,8 @@ namespace mongo {
return ( l % 2 == 0 ); // if we're inside an interval
}
+ // binary search for interval containing the specified element
+ // an even return value indicates that the element is contained within a valid interval
int FieldRangeVector::matchingLowElement( const BSONElement &e, int i, bool forward ) const {
int l = -1;
int h = _ranges[ i ].intervals().size() * 2;
@@ -1007,9 +1009,13 @@ namespace mongo {
int FieldRangeVector::Iterator::advance( const BSONObj &curr ) {
BSONObjIterator j( curr );
BSONObjIterator o( _v._keyPattern );
+ // track first field for which we are not at the end of the valid values,
+ // since we may need to advance from the key prefix ending with this field
int latestNonEndpoint = -1;
+ // iterate over fields to determine appropriate advance method
for( int i = 0; i < (int)_i.size(); ++i ) {
if ( i > 0 && !_v._ranges[ i - 1 ].intervals()[ _i[ i - 1 ] ].equality() ) {
+ // if last bound was inequality, we don't know anything about where we are for this field
// TODO if possible avoid this certain cases when field in prev key is the same
setMinus( i );
}
@@ -1017,9 +1023,9 @@ namespace mongo {
BSONElement oo = o.next();
bool reverse = ( ( oo.number() < 0 ) ^ ( _v._direction < 0 ) );
BSONElement jj = j.next();
- if ( _i[ i ] == -1 ) {
+ if ( _i[ i ] == -1 ) { // unknown position for this field, do binary search
int l = _v.matchingLowElement( jj, i, !reverse );
- if ( l % 2 == 0 ) {
+ if ( l % 2 == 0 ) { // we are in a valid range for this field
_i[ i ] = l / 2;
int diff = (int)_v._ranges[ i ].intervals().size() - _i[ i ];
if ( diff > 1 ) {
@@ -1031,7 +1037,8 @@ namespace mongo {
}
}
continue;
- } else {
+ } else { // not in a valid range for this field - determine if and how to advance
+ // check if we're after the last interval for this field
if ( l == (int)_v._ranges[ i ].intervals().size() * 2 - 1 ) {
if ( latestNonEndpoint == -1 ) {
return -2;
@@ -1053,7 +1060,11 @@ namespace mongo {
}
}
bool first = true;
+ // _i[ i ] != -1, so we have a starting interval for this field
+ // which serves as a lower/equal bound on the first iteration -
+ // we advance from this interval to find a matching interval
while( _i[ i ] < (int)_v._ranges[ i ].intervals().size() ) {
+ // compare to current interval's upper bound
int x = _v._ranges[ i ].intervals()[ _i[ i ] ]._upper._bound.woCompare( jj, false );
if ( reverse ) {
x = -x;
@@ -1062,16 +1073,22 @@ namespace mongo {
eq = true;
break;
}
+ // see if we're less than the upper bound
if ( x > 0 ) {
if ( i == 0 && first ) {
- break; // the value of 1st field won't go backward
+ // the value of 1st field won't go backward, so don't check lower bound
+ // TODO maybe we can check first only?
+ break;
}
+ // if it's an equality interval, don't need to compare separately to lower bound
if ( !_v._ranges[ i ].intervals()[ _i[ i ] ].equality() ) {
+ // compare to current interval's lower bound
x = _v._ranges[ i ].intervals()[ _i[ i ] ]._lower._bound.woCompare( jj, false );
if ( reverse ) {
x = -x;
}
}
+ // if we're less than the lower bound, advance
if ( x > 0 ) {
setZero( i + 1 );
// skip to curr / i / nextbounds
@@ -1084,17 +1101,20 @@ namespace mongo {
break;
}
}
+ // we're above the upper bound, so try next interval and reset remaining fields
++_i[ i ];
setZero( i + 1 );
first = false;
}
int diff = (int)_v._ranges[ i ].intervals().size() - _i[ i ];
if ( diff > 1 || ( !eq && diff == 1 ) ) {
+ // check if we're not at the end of valid values for this field
latestNonEndpoint = i;
- } else if ( diff == 0 ) {
+ } else if ( diff == 0 ) { // check if we're past the last interval for this field
if ( latestNonEndpoint == -1 ) {
return -2;
}
+ // more values possible, skip...
setZero( latestNonEndpoint + 1 );
// skip to curr / latestNonEndpoint + 1 / superlative
for( int j = latestNonEndpoint + 1; j < (int)_i.size(); ++j ) {
diff --git a/db/repl.cpp b/db/repl.cpp
index 37197ba..085ae64 100644
--- a/db/repl.cpp
+++ b/db/repl.cpp
@@ -137,9 +137,6 @@ namespace mongo {
virtual bool adminOnly() const {
return true;
}
- virtual bool logTheOp() {
- return false;
- }
virtual LockType locktype() const { return WRITE; }
void help(stringstream&h) const { h << "replace a node in a replica pair"; }
CmdReplacePeer() : Command("replacePeer", false, "replacepeer") { }
@@ -199,9 +196,6 @@ namespace mongo {
virtual bool adminOnly() const {
return true;
}
- virtual bool logTheOp() {
- return false;
- }
virtual void help(stringstream& h) const { h << "internal"; }
virtual LockType locktype() const { return WRITE; }
CmdForceDead() : Command("forcedead") { }
@@ -222,9 +216,7 @@ namespace mongo {
virtual bool adminOnly() const {
return true;
}
- virtual bool logTheOp() {
- return false;
- }
+ virtual bool logTheOp() { return false; }
virtual LockType locktype() const { return WRITE; }
void help(stringstream&h) const { h << "resync (from scratch) an out of date replica slave.\nhttp://www.mongodb.org/display/DOCS/Master+Slave"; }
CmdResync() : Command("resync") { }
@@ -861,7 +853,6 @@ namespace mongo {
if( logLevel >= 6 ) // op.tostring is expensive so doing this check explicitly
log(6) << "processing op: " << op << endl;
- // skip no-op
if( op.getStringField("op")[0] == 'n' )
return;
@@ -1260,8 +1251,14 @@ namespace mongo {
if ( tailing || initial ) {
if ( initial )
log(1) << "repl: initial run\n";
- else
- assert( syncedTo < nextOpTime );
+ else {
+ if( !( syncedTo <= nextOpTime ) ) {
+ log() << "repl ASSERTION failed : syncedTo <= nextOpTime" << endl;
+ log() << "repl syncTo: " << syncedTo.toStringLong() << endl;
+ log() << "repl nextOpTime: " << nextOpTime.toStringLong() << endl;
+ assert(false);
+ }
+ }
oplogReader.putBack( op ); // op will be processed in the loop below
nextOpTime = OpTime(); // will reread the op below
}
@@ -1690,6 +1687,7 @@ namespace mongo {
void replSlaveThread() {
sleepsecs(1);
Client::initThread("replslave");
+ cc().iAmSyncThread();
{
dblock lk;
diff --git a/db/repl/consensus.cpp b/db/repl/consensus.cpp
index 4eba17d..4044538 100644
--- a/db/repl/consensus.cpp
+++ b/db/repl/consensus.cpp
@@ -83,6 +83,17 @@ namespace mongo {
return vUp * 2 > totalVotes();
}
+ bool Consensus::shouldRelinquish() const {
+ int vUp = rs._self->config().votes;
+ const long long T = rs.config().ho.heartbeatTimeoutMillis * rs.config().ho.heartbeatConnRetries;
+ for( Member *m = rs.head(); m; m=m->next() ) {
+ long long dt = m->hbinfo().timeDown();
+ if( dt < T )
+ vUp += m->config().votes;
+ }
+ return !( vUp * 2 > totalVotes() );
+ }
+
static const int VETO = -10000;
const time_t LeaseTime = 30;
@@ -322,6 +333,7 @@ namespace mongo {
void Consensus::electSelf() {
assert( !rs.lockedByMe() );
assert( !rs.myConfig().arbiterOnly );
+ assert( rs.myConfig().slaveDelay == 0 );
try {
_electSelf();
}
diff --git a/db/repl/health.cpp b/db/repl/health.cpp
index b0be25f..72396fe 100644
--- a/db/repl/health.cpp
+++ b/db/repl/health.cpp
@@ -89,11 +89,13 @@ namespace mongo {
}
s << td(config().votes);
{
- string stateText = ReplSet::stateAsStr(state());
+ string stateText = state().toString();
+ if( _config.hidden )
+ stateText += " (hidden)";
if( ok || stateText.empty() )
s << td(stateText); // text blank if we've never connected
else
- s << td( grey(str::stream() << "(was " << ReplSet::stateAsStr(state()) << ')', true) );
+ s << td( grey(str::stream() << "(was " << state().toString() << ')', true) );
}
s << td( grey(hbinfo().lastHeartbeatMsg,!ok) );
stringstream q;
@@ -105,28 +107,30 @@ namespace mongo {
s << td("");
s << _tr();
}
-
+
string ReplSetImpl::stateAsHtml(MemberState s) {
if( s.s == MemberState::RS_STARTUP ) return a("", "serving still starting up, or still trying to initiate the set", "STARTUP");
if( s.s == MemberState::RS_PRIMARY ) return a("", "this server thinks it is primary", "PRIMARY");
if( s.s == MemberState::RS_SECONDARY ) return a("", "this server thinks it is a secondary (slave mode)", "SECONDARY");
if( s.s == MemberState::RS_RECOVERING ) return a("", "recovering/resyncing; after recovery usually auto-transitions to secondary", "RECOVERING");
- if( s.s == MemberState::RS_FATAL ) return a("", "something bad has occurred and server is not completely offline with regard to the replica set. fatal error.", "RS_FATAL");
- if( s.s == MemberState::RS_STARTUP2 ) return a("", "loaded config, still determining who is primary", "RS_STARTUP2");
+ if( s.s == MemberState::RS_FATAL ) return a("", "something bad has occurred and server is not completely offline with regard to the replica set. fatal error.", "FATAL");
+ if( s.s == MemberState::RS_STARTUP2 ) return a("", "loaded config, still determining who is primary", "STARTUP2");
if( s.s == MemberState::RS_ARBITER ) return a("", "this server is an arbiter only", "ARBITER");
if( s.s == MemberState::RS_DOWN ) return a("", "member is down, slow, or unreachable", "DOWN");
+ if( s.s == MemberState::RS_ROLLBACK ) return a("", "rolling back operations to get in sync", "ROLLBACK");
return "";
}
- string ReplSetImpl::stateAsStr(MemberState s) {
- if( s.s == MemberState::RS_STARTUP ) return "STARTUP";
- if( s.s == MemberState::RS_PRIMARY ) return "PRIMARY";
- if( s.s == MemberState::RS_SECONDARY ) return "SECONDARY";
- if( s.s == MemberState::RS_RECOVERING ) return "RECOVERING";
- if( s.s == MemberState::RS_FATAL ) return "FATAL";
- if( s.s == MemberState::RS_STARTUP2 ) return "STARTUP2";
- if( s.s == MemberState::RS_ARBITER ) return "ARBITER";
- if( s.s == MemberState::RS_DOWN ) return "DOWN";
+ string MemberState::toString() const {
+ if( s == MemberState::RS_STARTUP ) return "STARTUP";
+ if( s == MemberState::RS_PRIMARY ) return "PRIMARY";
+ if( s == MemberState::RS_SECONDARY ) return "SECONDARY";
+ if( s == MemberState::RS_RECOVERING ) return "RECOVERING";
+ if( s == MemberState::RS_FATAL ) return "FATAL";
+ if( s == MemberState::RS_STARTUP2 ) return "STARTUP2";
+ if( s == MemberState::RS_ARBITER ) return "ARBITER";
+ if( s == MemberState::RS_DOWN ) return "DOWN";
+ if( s == MemberState::RS_ROLLBACK ) return "ROLLBACK";
return "";
}
@@ -302,7 +306,7 @@ namespace mongo {
td(ago(started)) <<
td("") << // last heartbeat
td(ToString(_self->config().votes)) <<
- td(stateAsHtml(box.getState()));
+ td( stateAsHtml(box.getState()) + (_self->config().hidden?" (hidden)":"") );
s << td( _hbmsg );
stringstream q;
q << "/_replSetOplog?" << _self->id();
diff --git a/db/repl/health.h b/db/repl/health.h
index 8b1005e..645a3b5 100644
--- a/db/repl/health.h
+++ b/db/repl/health.h
@@ -27,7 +27,7 @@ namespace mongo {
HealthOptions() {
heartbeatSleepMillis = 2000;
heartbeatTimeoutMillis = 10000;
- heartbeatConnRetries = 3;
+ heartbeatConnRetries = 2;
}
bool isDefault() const { return *this == HealthOptions(); }
diff --git a/db/repl/heartbeat.cpp b/db/repl/heartbeat.cpp
index 78ce5d1..4f28897 100644
--- a/db/repl/heartbeat.cpp
+++ b/db/repl/heartbeat.cpp
@@ -40,6 +40,13 @@ namespace mongo {
// hacky
string *discoveredSeed = 0;
+ long long HeartbeatInfo::timeDown() const {
+ if( up() ) return 0;
+ if( downSince == 0 )
+ return 0; // still waiting on first heartbeat
+ return jsTime() - downSince;
+ }
+
/* { replSetHeartbeat : <setname> } */
class CmdReplSetHeartbeat : public ReplSetCommand {
public:
@@ -55,6 +62,14 @@ namespace mongo {
errmsg = "not running with --replSet";
return false;
}
+
+ /* we want to keep heartbeat connections open when relinquishing primary. tag them here. */
+ {
+ MessagingPort *mp = cc()._mp;
+ if( mp )
+ mp->tag |= 1;
+ }
+
if( cmdObj["pv"].Int() != 1 ) {
errmsg = "incompatible replset protocol version";
return false;
@@ -91,7 +106,7 @@ namespace mongo {
result.append("set", theReplSet->name());
result.append("state", theReplSet->state().s);
result.append("hbmsg", theReplSet->hbmsg());
- result.append("time", (int) time(0));
+ result.append("time", (long long) time(0));
result.appendDate("opTime", theReplSet->lastOpTimeWritten.asDate());
int v = theReplSet->config().version;
result.append("v", v);
@@ -196,7 +211,12 @@ namespace mongo {
static time_t last = 0;
time_t now = time(0);
- if( mem.changed(old) || now-last>4 ) {
+ bool changed = mem.changed(old);
+ if( changed ) {
+ if( old.hbstate != mem.hbstate )
+ log() << "replSet " << h.toString() << ' ' << mem.hbstate.toString() << rsLog;
+ }
+ if( changed || now-last>4 ) {
last = now;
theReplSet->mgr->send( boost::bind(&Manager::msgCheckNewState, theReplSet->mgr) );
}
@@ -205,8 +225,9 @@ namespace mongo {
private:
void down(HeartbeatInfo& mem, string msg) {
mem.health = 0.0;
- if( mem.upSince ) {
+ if( mem.upSince || mem.downSince == 0 ) {
mem.upSince = 0;
+ mem.downSince = jsTime();
log() << "replSet info " << h.toString() << " is now down (or slow to respond)" << rsLog;
}
mem.lastHeartbeatMsg = msg;
diff --git a/db/repl/manager.cpp b/db/repl/manager.cpp
index e870688..862ac46 100644
--- a/db/repl/manager.cpp
+++ b/db/repl/manager.cpp
@@ -29,12 +29,17 @@ namespace mongo {
};
/* check members OTHER THAN US to see if they think they are primary */
- const Member * Manager::findOtherPrimary() {
+ const Member * Manager::findOtherPrimary(bool& two) {
+ two = false;
Member *m = rs->head();
Member *p = 0;
while( m ) {
+ DEV assert( m != rs->_self );
if( m->state().primary() && m->hbinfo().up() ) {
- if( p ) throw "twomasters"; // our polling is asynchronous, so this is often ok.
+ if( p ) {
+ two = true;
+ return 0;
+ }
p = m;
}
m = m->next();
@@ -63,7 +68,11 @@ namespace mongo {
if( rs->box.getPrimary() == m )
return;
rs->_self->lhb() = "";
- rs->box.set(rs->iAmArbiterOnly() ? MemberState::RS_ARBITER : MemberState::RS_RECOVERING, m);
+ if( rs->iAmArbiterOnly() ) {
+ rs->box.set(MemberState::RS_ARBITER, m);
+ } else {
+ rs->box.noteRemoteIsPrimary(m);
+ }
}
/** called as the health threads get new results */
@@ -87,11 +96,14 @@ namespace mongo {
}
const Member *p2;
- try { p2 = findOtherPrimary(); }
- catch(string s) {
- /* two other nodes think they are primary (asynchronously polled) -- wait for things to settle down. */
- log() << "replSet warning DIAG 2 primary" << s << rsLog;
- return;
+ {
+ bool two;
+ p2 = findOtherPrimary(two);
+ if( two ) {
+ /* two other nodes think they are primary (asynchronously polled) -- wait for things to settle down. */
+ log() << "replSet warning DIAG two primaries (transiently)" << rsLog;
+ return;
+ }
}
if( p2 ) {
@@ -136,7 +148,7 @@ namespace mongo {
return;
}
- if( !rs->elect.aMajoritySeemsToBeUp() ) {
+ if( rs->elect.shouldRelinquish() ) {
log() << "replSet can't see a majority of the set, relinquishing primary" << rsLog;
rs->relinquish();
}
diff --git a/db/repl/replset_commands.cpp b/db/repl/replset_commands.cpp
index f8f46d5..328b0ab 100644
--- a/db/repl/replset_commands.cpp
+++ b/db/repl/replset_commands.cpp
@@ -34,19 +34,27 @@ namespace mongo {
*/
bool replSetBlind = false;
+ unsigned replSetForceInitialSyncFailure = 0;
class CmdReplSetTest : public ReplSetCommand {
public:
virtual void help( stringstream &help ) const {
- help << "Just for testing : do not use.\n";
+ help << "Just for regression tests.\n";
}
CmdReplSetTest() : ReplSetCommand("replSetTest") { }
virtual bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) {
+ log() << "replSet replSetTest command received: " << cmdObj.toString() << rsLog;
+ if( cmdObj.hasElement("forceInitialSyncFailure") ) {
+ replSetForceInitialSyncFailure = (unsigned) cmdObj["forceInitialSyncFailure"].Number();
+ return true;
+ }
+
+ // may not need this, but if removed check all tests still work:
if( !check(errmsg, result) )
return false;
+
if( cmdObj.hasElement("blind") ) {
replSetBlind = cmdObj.getBoolField("blind");
- log() << "replSet info replSetTest command received, replSetBlind=" << replSetBlind << rsLog;
return true;
}
return false;
@@ -55,6 +63,7 @@ namespace mongo {
class CmdReplSetGetRBID : public ReplSetCommand {
public:
+ /* todo: ideally this should only change on rollbacks NOT on mongod restarts also. fix... */
int rbid;
virtual void help( stringstream &help ) const {
help << "internal";
@@ -65,12 +74,15 @@ namespace mongo {
virtual bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) {
if( !check(errmsg, result) )
return false;
- result.append("rbid",rbid);
+ result.append("rbid",rbid);
return true;
}
} cmdReplSetRBID;
using namespace bson;
+ void incRBID() {
+ cmdReplSetRBID.rbid++;
+ }
int getRBID(DBClientConnection *c) {
bo info;
c->simpleCommand("admin", &info, "replSetGetRBID");
diff --git a/db/repl/rs.cpp b/db/repl/rs.cpp
index a6737be..1c0444a 100644
--- a/db/repl/rs.cpp
+++ b/db/repl/rs.cpp
@@ -31,39 +31,48 @@ namespace mongo {
extern string *discoveredSeed;
void ReplSetImpl::sethbmsg(string s, int logLevel) {
- static time_t lastLogged;
- if( s == _hbmsg ) {
- // unchanged
- if( time(0)-lastLogged < 60 )
- return;
- }
-
- unsigned sz = s.size();
- if( sz >= 256 )
- memcpy(_hbmsg, s.c_str(), 255);
- else {
- _hbmsg[sz] = 0;
- memcpy(_hbmsg, s.c_str(), sz);
- }
- if( !s.empty() ) {
- lastLogged = time(0);
- log(logLevel) << "replSet " << s << rsLog;
- }
+ static time_t lastLogged;
+ _hbmsgTime = time(0);
+
+ if( s == _hbmsg ) {
+ // unchanged
+ if( _hbmsgTime - lastLogged < 60 )
+ return;
+ }
+
+ unsigned sz = s.size();
+ if( sz >= 256 )
+ memcpy(_hbmsg, s.c_str(), 255);
+ else {
+ _hbmsg[sz] = 0;
+ memcpy(_hbmsg, s.c_str(), sz);
+ }
+ if( !s.empty() ) {
+ lastLogged = _hbmsgTime;
+ log(logLevel) << "replSet " << s << rsLog;
+ }
}
void ReplSetImpl::assumePrimary() {
assert( iAmPotentiallyHot() );
writelock lk("admin."); // so we are synchronized with _logOp()
box.setSelfPrimary(_self);
- log() << "replSet PRIMARY" << rsLog; // self (" << _self->id() << ") is now primary" << rsLog;
+ //log() << "replSet PRIMARY" << rsLog; // self (" << _self->id() << ") is now primary" << rsLog;
}
void ReplSetImpl::changeState(MemberState s) { box.change(s, _self); }
void ReplSetImpl::relinquish() {
if( box.getState().primary() ) {
+ log() << "replSet relinquishing primary state" << rsLog;
changeState(MemberState::RS_RECOVERING);
- log() << "replSet info relinquished primary state" << rsLog;
+
+ /* close sockets that were talking to us */
+ /*log() << "replSet closing sockets after reqlinquishing primary" << rsLog;
+ MessagingPort::closeAllSockets(1);*/
+
+ // todo: >
+ //changeState(MemberState::RS_SECONDARY);
}
else if( box.getState().startup2() ) {
// ? add comment
@@ -109,11 +118,18 @@ namespace mongo {
}
void ReplSetImpl::_fillIsMasterHost(const Member *m, vector<string>& hosts, vector<string>& passives, vector<string>& arbiters) {
+ if( m->config().hidden )
+ return;
+
if( m->potentiallyHot() ) {
hosts.push_back(m->h().toString());
}
else if( !m->config().arbiterOnly ) {
- passives.push_back(m->h().toString());
+ if( m->config().slaveDelay ) {
+ /* hmmm - we don't list these as they are stale. */
+ } else {
+ passives.push_back(m->h().toString());
+ }
}
else {
arbiters.push_back(m->h().toString());
@@ -152,6 +168,10 @@ namespace mongo {
}
if( myConfig().arbiterOnly )
b.append("arbiterOnly", true);
+ if( myConfig().slaveDelay )
+ b.append("slaveDelay", myConfig().slaveDelay);
+ if( myConfig().hidden )
+ b.append("hidden", true);
}
/** @param cfgString <setname>/<seedhost1>,<seedhost2> */
@@ -200,6 +220,7 @@ namespace mongo {
_self(0),
mgr( new Manager(this) )
{
+ _cfg = 0;
memset(_hbmsg, 0, sizeof(_hbmsg));
*_hbmsg = '.'; // temp...just to see
lastH = 0;
@@ -245,7 +266,7 @@ namespace mongo {
loadLastOpTimeWritten();
}
catch(std::exception& e) {
- log() << "replSet ERROR FATAL couldn't query the local " << rsoplog << " collection. Terminating mongod after 30 seconds." << rsLog;
+ log() << "replSet error fatal couldn't query the local " << rsoplog << " collection. Terminating mongod after 30 seconds." << rsLog;
log() << e.what() << rsLog;
sleepsecs(30);
dbexit( EXIT_REPLICATION_ERROR );
@@ -260,25 +281,64 @@ namespace mongo {
ReplSetImpl::StartupStatus ReplSetImpl::startupStatus = PRESTART;
string ReplSetImpl::startupStatusMsg;
- // true if ok; throws if config really bad; false if config doesn't include self
- bool ReplSetImpl::initFromConfig(ReplSetConfig& c) {
+ extern BSONObj *getLastErrorDefault;
+
+ /** @param reconf true if this is a reconfiguration and not an initial load of the configuration.
+ @return true if ok; throws if config really bad; false if config doesn't include self
+ */
+ bool ReplSetImpl::initFromConfig(ReplSetConfig& c, bool reconf) {
+ /* NOTE: haveNewConfig() writes the new config to disk before we get here. So
+ we cannot error out at this point, except fatally. Check errors earlier.
+ */
lock lk(this);
+ if( getLastErrorDefault || !c.getLastErrorDefaults.isEmpty() ) {
+ // see comment in dbcommands.cpp for getlasterrordefault
+ getLastErrorDefault = new BSONObj( c.getLastErrorDefaults );
+ }
+
+ list<const ReplSetConfig::MemberCfg*> newOnes;
+ bool additive = reconf;
{
+ unsigned nfound = 0;
int me = 0;
for( vector<ReplSetConfig::MemberCfg>::iterator i = c.members.begin(); i != c.members.end(); i++ ) {
const ReplSetConfig::MemberCfg& m = *i;
if( m.h.isSelf() ) {
+ nfound++;
me++;
+
+ if( !reconf || (_self && _self->id() == (unsigned) m._id) )
+ ;
+ else {
+ log() << "replSet " << _self->id() << ' ' << m._id << rsLog;
+ assert(false);
+ }
+ }
+ else if( reconf ) {
+ const Member *old = findById(m._id);
+ if( old ) {
+ nfound++;
+ assert( (int) old->id() == m._id );
+                        if( old->config() != m ) {
+ additive = false;
+ }
+ }
+ else {
+ newOnes.push_back(&m);
+ }
}
}
if( me == 0 ) {
// log() << "replSet config : " << _cfg->toString() << rsLog;
- log() << "replSet warning can't find self in the repl set configuration:" << rsLog;
+ log() << "replSet error can't find self in the repl set configuration:" << rsLog;
log() << c.toString() << rsLog;
- return false;
+ assert(false);
}
uassert( 13302, "replSet error self appears twice in the repl set configuration", me<=1 );
+
+ if( reconf && config().members.size() != nfound )
+ additive = false;
}
_cfg = new ReplSetConfig(c);
@@ -287,6 +347,24 @@ namespace mongo {
_name = _cfg->_id;
assert( !_name.empty() );
+ if( additive ) {
+ log() << "replSet info : additive change to configuration" << rsLog;
+ for( list<const ReplSetConfig::MemberCfg*>::const_iterator i = newOnes.begin(); i != newOnes.end(); i++ ) {
+ const ReplSetConfig::MemberCfg* m = *i;
+ Member *mi = new Member(m->h, m->_id, m, false);
+
+ /** we will indicate that new members are up() initially so that we don't relinquish our
+ primary state because we can't (transiently) see a majority. they should be up as we
+ check that new members are up before getting here on reconfig anyway.
+ */
+ mi->get_hbinfo().health = 0.1;
+
+ _members.push(mi);
+ startHealthTaskFor(mi);
+ }
+ return true;
+ }
+
// start with no members. if this is a reconfig, drop the old ones.
_members.orphanAll();
@@ -416,6 +494,7 @@ namespace mongo {
startupStatusMsg = "replSet error loading set config (BADCONFIG)";
log() << "replSet error loading configurations " << e.toString() << rsLog;
log() << "replSet error replication will not start" << rsLog;
+ sethbmsg("error loading set config");
_fatal();
throw;
}
@@ -429,11 +508,10 @@ namespace mongo {
{
//lock l(this);
box.set(MemberState::RS_FATAL, 0);
- sethbmsg("fatal error");
- log() << "replSet error fatal error, stopping replication" << rsLog;
+ //sethbmsg("fatal error");
+ log() << "replSet error fatal, stopping replication" << rsLog;
}
-
void ReplSet::haveNewConfig(ReplSetConfig& newConfig, bool addComment) {
lock l(this); // convention is to lock replset before taking the db rwlock
writelock lk("");
@@ -442,7 +520,7 @@ namespace mongo {
comment = BSON( "msg" << "Reconfig set" << "version" << newConfig.version );
newConfig.saveConfigLocally(comment);
try {
- initFromConfig(newConfig);
+ initFromConfig(newConfig, true);
log() << "replSet replSetReconfig new config saved locally" << rsLog;
}
catch(DBException& e) {
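The reconf path above applies a new configuration "additively" (just start health tasks for the new members) only when every existing member appears unchanged in the new config. A minimal sketch of that decision, with made-up MemberCfg fields and std containers in place of BSON; not the mongod code:

#include <iostream>
#include <map>
#include <string>
#include <vector>

struct MemberCfg {
    int _id;
    std::string host;
    int slaveDelay;
    bool hidden;
    bool operator==(const MemberCfg& r) const {
        return _id == r._id && host == r.host &&
               slaveDelay == r.slaveDelay && hidden == r.hidden;
    }
};

// returns the members to add when the change is additive; otherwise additive is set false
std::vector<MemberCfg> diffConfigs(const std::vector<MemberCfg>& oldCfg,
                                   const std::vector<MemberCfg>& newCfg,
                                   bool& additive) {
    std::map<int, const MemberCfg*> old;
    for( const MemberCfg& m : oldCfg ) old[m._id] = &m;

    additive = true;
    std::vector<MemberCfg> newOnes;
    size_t nfound = 0;
    for( const MemberCfg& m : newCfg ) {
        std::map<int, const MemberCfg*>::iterator it = old.find(m._id);
        if( it == old.end() )
            newOnes.push_back(m);            // a brand new member
        else {
            nfound++;
            if( !(*it->second == m) )        // an existing member changed
                additive = false;
        }
    }
    if( nfound != old.size() )               // a member was removed
        additive = false;
    if( !additive )
        newOnes.clear();
    return newOnes;
}

int main() {
    std::vector<MemberCfg> oldCfg = { {0, "a:27017", 0, false}, {1, "b:27017", 0, false} };
    std::vector<MemberCfg> newCfg = { {0, "a:27017", 0, false}, {1, "b:27017", 0, false},
                                      {2, "c:27017", 0, false} };
    bool additive;
    std::vector<MemberCfg> toAdd = diffConfigs(oldCfg, newCfg, additive);
    std::cout << "additive=" << additive << " toAdd=" << toAdd.size() << "\n";
}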
diff --git a/db/repl/rs.h b/db/repl/rs.h
index 17a070c..6c4d9a8 100644
--- a/db/repl/rs.h
+++ b/db/repl/rs.h
@@ -44,19 +44,19 @@ namespace mongo {
public:
Member(HostAndPort h, unsigned ord, const ReplSetConfig::MemberCfg *c, bool self);
string fullName() const { return h().toString(); }
- const ReplSetConfig::MemberCfg& config() const { return *_config; }
+ const ReplSetConfig::MemberCfg& config() const { return _config; }
const HeartbeatInfo& hbinfo() const { return _hbinfo; }
- string lhb() { return _hbinfo.lastHeartbeatMsg; }
+ HeartbeatInfo& get_hbinfo() { return _hbinfo; }
+ string lhb() const { return _hbinfo.lastHeartbeatMsg; }
MemberState state() const { return _hbinfo.hbstate; }
const HostAndPort& h() const { return _h; }
unsigned id() const { return _hbinfo.id(); }
- bool potentiallyHot() const { return _config->potentiallyHot(); } // not arbiter, not priority 0
-
+ bool potentiallyHot() const { return _config.potentiallyHot(); } // not arbiter, not priority 0
void summarizeMember(stringstream& s) const;
friend class ReplSetImpl;
private:
- const ReplSetConfig::MemberCfg *_config; /* todo: when this changes??? */
- HostAndPort _h;
+ const ReplSetConfig::MemberCfg _config;
+ const HostAndPort _h;
HeartbeatInfo _hbinfo;
};
@@ -64,7 +64,13 @@ namespace mongo {
ReplSetImpl *rs;
bool busyWithElectSelf;
int _primary;
- const Member* findOtherPrimary();
+
+        /** @param two - set to true if two primaries were seen. this can happen transiently, in addition to our
+                         polling being only occasional. in this case null is returned, but the caller should
+                         not assume the primary role itself in that situation.
+ */
+ const Member* findOtherPrimary(bool& two);
+
void noteARemoteIsPrimary(const Member *);
virtual void starting();
public:
@@ -102,6 +108,7 @@ namespace mongo {
int totalVotes() const;
bool aMajoritySeemsToBeUp() const;
+ bool shouldRelinquish() const;
void electSelf();
void electCmdReceived(BSONObj, BSONObjBuilder*);
void multiCommand(BSONObj cmd, list<Target>& L);
@@ -119,8 +126,8 @@ namespace mongo {
protected:
RSBase() : magic(0x12345677), m("RSBase"), _locked(0) { }
~RSBase() {
- log() << "~RSBase should never be called?" << rsLog;
- assert(false);
+ /* this can happen if we throw in the constructor; otherwise never happens. thus we log it as it is quite unusual. */
+ log() << "replSet ~RSBase called" << rsLog;
}
class lock {
@@ -175,6 +182,9 @@ namespace mongo {
const Member* getPrimary() const { return sp.primary; }
void change(MemberState s, const Member *self) {
scoped_lock lk(m);
+ if( sp.state != s ) {
+ log() << "replSet " << s.toString() << rsLog;
+ }
sp.state = s;
if( s.primary() ) {
sp.primary = self;
@@ -194,6 +204,12 @@ namespace mongo {
assert( !sp.state.primary() );
sp.primary = mem;
}
+ void noteRemoteIsPrimary(const Member *remote) {
+ scoped_lock lk(m);
+ if( !sp.state.secondary() && !sp.state.fatal() )
+ sp.state = MemberState::RS_RECOVERING;
+ sp.primary = remote;
+ }
StateBox() : m("StateBox") { }
private:
mutex m;
@@ -226,7 +242,6 @@ namespace mongo {
};
static StartupStatus startupStatus;
static string startupStatusMsg;
- static string stateAsStr(MemberState state);
static string stateAsHtml(MemberState state);
/* todo thread */
@@ -241,28 +256,24 @@ namespace mongo {
void endOldHealthTasks();
void startHealthTaskFor(Member *m);
- private:
Consensus elect;
- bool ok() const { return !box.getState().fatal(); }
-
void relinquish();
void forgetPrimary();
-
protected:
bool _stepDown();
private:
void assumePrimary();
void loadLastOpTimeWritten();
void changeState(MemberState s);
-
protected:
// "heartbeat message"
// sent in requestHeartbeat respond in field "hbm"
            char _hbmsg[256]; // we change this unlocked, thus not an std::string
+ time_t _hbmsgTime; // when it was logged
public:
void sethbmsg(string s, int logLevel = 0);
protected:
- bool initFromConfig(ReplSetConfig& c); // true if ok; throws if config really bad; false if config doesn't include self
+ bool initFromConfig(ReplSetConfig& c, bool reconf=false); // true if ok; throws if config really bad; false if config doesn't include self
void _fillIsMaster(BSONObjBuilder&);
void _fillIsMasterHost(const Member*, vector<string>&, vector<string>&, vector<string>&);
const ReplSetConfig& config() { return *_cfg; }
@@ -323,8 +334,10 @@ namespace mongo {
void _syncDoInitialSync();
void syncDoInitialSync();
void _syncThread();
+ bool tryToGoLiveAsASecondary(OpTime&); // readlocks
void syncTail();
void syncApply(const BSONObj &o);
+ unsigned _syncRollback(OplogReader& r);
void syncRollback(OplogReader& r);
void syncFixUp(HowToFixUp& h, OplogReader& r);
public:
@@ -344,9 +357,10 @@ namespace mongo {
        /* call after constructing to start - returns fairly quickly after launching its threads */
void go() { _go(); }
+
void fatal() { _fatal(); }
- bool isPrimary();
- bool isSecondary();
+ bool isPrimary() { return box.getState().primary(); }
+ bool isSecondary() { return box.getState().secondary(); }
MemberState state() const { return ReplSetImpl::state(); }
string name() const { return ReplSetImpl::name(); }
const ReplSetConfig& config() { return ReplSetImpl::config(); }
@@ -366,7 +380,10 @@ namespace mongo {
bool lockedByMe() { return RSBase::lockedByMe(); }
// heartbeat msg to send to others; descriptive diagnostic info
- string hbmsg() const { return _hbmsg; }
+ string hbmsg() const {
+ if( time(0)-_hbmsgTime > 120 ) return "";
+ return _hbmsg;
+ }
};
/** base class for repl set commands. checks basic things such as in rs mode before the command
@@ -388,6 +405,8 @@ namespace mongo {
if( theReplSet == 0 ) {
result.append("startupStatus", ReplSet::startupStatus);
errmsg = ReplSet::startupStatusMsg.empty() ? "replset unknown error 2" : ReplSet::startupStatusMsg;
+ if( ReplSet::startupStatus == 3 )
+ result.append("info", "run rs.initiate(...) if not yet done for the set");
return false;
}
return true;
@@ -397,19 +416,10 @@ namespace mongo {
/** inlines ----------------- */
inline Member::Member(HostAndPort h, unsigned ord, const ReplSetConfig::MemberCfg *c, bool self) :
- _config(c), _h(h), _hbinfo(ord) {
- if( self ) {
- _hbinfo.health = 1.0;
- }
- }
-
- inline bool ReplSet::isPrimary() {
- /* todo replset */
- return box.getState().primary();
- }
-
- inline bool ReplSet::isSecondary() {
- return box.getState().secondary();
+ _config(*c), _h(h), _hbinfo(ord)
+ {
+ if( self )
+ _hbinfo.health = 1.0;
}
}
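The hbmsg()/_hbmsgTime change above makes the heartbeat message expire rather than being reported forever once it goes stale. A tiny sketch of that idea with invented names:

#include <ctime>
#include <iostream>
#include <string>

class StatusMsg {
    std::string _msg;
    time_t _when = 0;
public:
    void set(const std::string& s) { _msg = s; _when = time(0); }
    // report the message only while it is fresh (here: younger than 120 seconds)
    std::string get() const {
        if( time(0) - _when > 120 ) return "";
        return _msg;
    }
};

int main() {
    StatusMsg m;
    m.set("rollback 2 FindCommonPoint");
    std::cout << "[" << m.get() << "]\n";   // prints the message while it is still fresh
}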
diff --git a/db/repl/rs_config.cpp b/db/repl/rs_config.cpp
index 76b20a4..85c9a46 100644
--- a/db/repl/rs_config.cpp
+++ b/db/repl/rs_config.cpp
@@ -31,6 +31,16 @@ namespace mongo {
void logOpInitiate(const bo&);
+ void assertOnlyHas(BSONObj o, const set<string>& fields) {
+ BSONObj::iterator i(o);
+ while( i.more() ) {
+ BSONElement e = i.next();
+ if( !fields.count( e.fieldName() ) ) {
+                uasserted(13434, str::stream() << "unexpected field '" << e.fieldName() << "' in object");
+ }
+ }
+ }
+
list<HostAndPort> ReplSetConfig::otherMemberHostnames() const {
list<HostAndPort> L;
for( vector<MemberCfg>::const_iterator i = members.begin(); i != members.end(); i++ ) {
@@ -42,7 +52,7 @@ namespace mongo {
/* comment MUST only be set when initiating the set by the initiator */
void ReplSetConfig::saveConfigLocally(bo comment) {
- check();
+ checkRsConfig();
log() << "replSet info saving a newer config version to local.system.replset" << rsLog;
{
writelock lk("");
@@ -81,6 +91,8 @@ namespace mongo {
if( votes != 1 ) b << "votes" << votes;
if( priority != 1.0 ) b << "priority" << priority;
if( arbiterOnly ) b << "arbiterOnly" << true;
+ if( slaveDelay ) b << "slaveDelay" << slaveDelay;
+ if( hidden ) b << "hidden" << hidden;
return b.obj();
}
@@ -115,9 +127,17 @@ namespace mongo {
mchk(priority >= 0 && priority <= 1000);
mchk(votes >= 0 && votes <= 100);
uassert(13419, "this version of mongod only supports priorities 0 and 1", priority == 0 || priority == 1);
+ uassert(13437, "slaveDelay requires priority be zero", slaveDelay == 0 || priority == 0);
+ uassert(13438, "bad slaveDelay value", slaveDelay >= 0 && slaveDelay <= 3600 * 24 * 366);
+ uassert(13439, "priority must be 0 when hidden=true", priority == 0 || !hidden);
}
+ /** @param o old config
+ @param n new config
+ */
/*static*/ bool ReplSetConfig::legalChange(const ReplSetConfig& o, const ReplSetConfig& n, string& errmsg) {
+ assert( theReplSet );
+
if( o._id != n._id ) {
errmsg = "set name may not change";
return false;
@@ -131,6 +151,25 @@ namespace mongo {
return false;
}
+ map<HostAndPort,const ReplSetConfig::MemberCfg*> old;
+ for( vector<ReplSetConfig::MemberCfg>::const_iterator i = o.members.begin(); i != o.members.end(); i++ ) {
+ old[i->h] = &(*i);
+ }
+ int me = 0;
+ for( vector<ReplSetConfig::MemberCfg>::const_iterator i = n.members.begin(); i != n.members.end(); i++ ) {
+ const ReplSetConfig::MemberCfg& m = *i;
+ if( old.count(m.h) ) {
+ if( old[m.h]->_id != m._id ) {
+ log() << "replSet reconfig error with member: " << m.h.toString() << rsLog;
+ uasserted(13432, "_id may not change for members");
+ }
+ }
+ if( m.h.isSelf() )
+ me++;
+ }
+
+ uassert(13433, "can't find self in new replset config", me == 1);
+
/* TODO : MORE CHECKS HERE */
log() << "replSet TODO : don't allow removal of a node until we handle it at the removed node end?" << endl;
@@ -144,7 +183,7 @@ namespace mongo {
_ok = false;
}
- void ReplSetConfig::check() const {
+ void ReplSetConfig::checkRsConfig() const {
uassert(13132,
"nonmatching repl set name in _id field; check --replSet command line",
_id == cmdLine.ourSetName());
@@ -154,6 +193,10 @@ namespace mongo {
}
void ReplSetConfig::from(BSONObj o) {
+ static const string legal[] = {"_id","version", "members","settings"};
+ static const set<string> legals(legal, legal + 4);
+ assertOnlyHas(o, legals);
+
md5 = o.md5();
_id = o["_id"].String();
if( o["version"].ok() ) {
@@ -170,7 +213,7 @@ namespace mongo {
if( settings["heartbeatTimeout"].ok() )
ho.heartbeatTimeoutMillis = (unsigned) (settings["heartbeatTimeout"].Number() * 1000);
ho.check();
- try { getLastErrorDefaults = settings["getLastErrorDefaults"].Obj(); } catch(...) { }
+ try { getLastErrorDefaults = settings["getLastErrorDefaults"].Obj().copy(); } catch(...) { }
}
set<string> hosts;
@@ -188,6 +231,10 @@ namespace mongo {
BSONObj mobj = members[i].Obj();
MemberCfg m;
try {
+ static const string legal[] = {"_id","votes","priority","host","hidden","slaveDelay","arbiterOnly"};
+ static const set<string> legals(legal, legal + 7);
+ assertOnlyHas(mobj, legals);
+
try {
m._id = (int) mobj["_id"].Number();
} catch(...) {
@@ -205,6 +252,9 @@ namespace mongo {
if( m.h.isLocalHost() )
localhosts++;
m.arbiterOnly = mobj.getBoolField("arbiterOnly");
+ m.slaveDelay = mobj["slaveDelay"].numberInt();
+ if( mobj.hasElement("hidden") )
+ m.hidden = mobj.getBoolField("hidden");
if( mobj.hasElement("priority") )
m.priority = mobj["priority"].Number();
if( mobj.hasElement("votes") )
@@ -308,6 +358,7 @@ namespace mongo {
BSONObj o = c->nextSafe();
uassert(13109, "multiple rows in " + rsConfigNs + " not supported", !c->more());
from(o);
+ checkRsConfig();
_ok = true;
log(level) << "replSet load config ok from " << (h.isSelf() ? "self" : h.toString()) << rsLog;
}
diff --git a/db/repl/rs_config.h b/db/repl/rs_config.h
index 38df772..e39dad7 100644
--- a/db/repl/rs_config.h
+++ b/db/repl/rs_config.h
@@ -41,18 +41,27 @@ namespace mongo {
bool ok() const { return _ok; }
struct MemberCfg {
- MemberCfg() : _id(-1), votes(1), priority(1.0), arbiterOnly(false) { }
+ MemberCfg() : _id(-1), votes(1), priority(1.0), arbiterOnly(false), slaveDelay(0), hidden(false) { }
int _id; /* ordinal */
unsigned votes; /* how many votes this node gets. default 1. */
HostAndPort h;
double priority; /* 0 means can never be primary */
bool arbiterOnly;
+            int slaveDelay;      /* seconds.  int rather than unsigned for convenient to/from bson conversion. */
+            bool hidden;         /* if set, don't advertise to drivers in isMaster. for non-primaries (priority 0) */
+
void check() const; /* check validity, assert if not. */
BSONObj asBson() const;
bool potentiallyHot() const {
return !arbiterOnly && priority > 0;
}
+ bool operator==(const MemberCfg& r) const {
+ return _id==r._id && votes == r.votes && h == r.h && priority == r.priority &&
+ arbiterOnly == r.arbiterOnly && slaveDelay == r.slaveDelay && hidden == r.hidden;
+ }
+ bool operator!=(const MemberCfg& r) const { return !(*this == r); }
};
+
vector<MemberCfg> members;
string _id;
int version;
@@ -68,7 +77,7 @@ namespace mongo {
string toString() const { return asBson().toString(); }
/** validate the settings. does not call check() on each member, you have to do that separately. */
- void check() const;
+ void checkRsConfig() const;
/** check if modification makes sense */
static bool legalChange(const ReplSetConfig& old, const ReplSetConfig& n, string& errmsg);
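assertOnlyHas() in rs_config.cpp rejects any field that is not on a whitelist, so a misspelled option in a replica set config fails loudly instead of being silently ignored. A minimal sketch of the same check, using std::map in place of BSON; names other than assertOnlyHas are invented:

#include <iostream>
#include <map>
#include <set>
#include <stdexcept>
#include <string>

void assertOnlyHas(const std::map<std::string, std::string>& obj,
                   const std::set<std::string>& allowed) {
    for( std::map<std::string, std::string>::const_iterator i = obj.begin(); i != obj.end(); i++ ) {
        if( !allowed.count(i->first) )
            throw std::runtime_error("unexpected field '" + i->first + "' in object");
    }
}

int main() {
    std::set<std::string> legalMemberFields =
        {"_id", "votes", "priority", "host", "hidden", "slaveDelay", "arbiterOnly"};
    std::map<std::string, std::string> member =
        {{"_id", "2"}, {"host", "c:27017"}, {"slavedelay", "600"}};   // note the lowercase typo
    try {
        assertOnlyHas(member, legalMemberFields);
    }
    catch( const std::exception& e ) {
        std::cout << e.what() << "\n";        // unexpected field 'slavedelay' in object
    }
}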
diff --git a/db/repl/rs_initialsync.cpp b/db/repl/rs_initialsync.cpp
index 4c6bd4d..3851c66 100644
--- a/db/repl/rs_initialsync.cpp
+++ b/db/repl/rs_initialsync.cpp
@@ -66,7 +66,7 @@ namespace mongo {
void _logOpObjRS(const BSONObj& op);
- bool copyCollectionFromRemote(const string& host, const string& ns, const BSONObj& query, string errmsg);
+ bool copyCollectionFromRemote(const string& host, const string& ns, const BSONObj& query, string &errmsg, bool logforrepl);
static void emptyOplog() {
writelock lk(rsoplog);
diff --git a/db/repl/rs_member.h b/db/repl/rs_member.h
index 4f6846a..6a797b5 100644
--- a/db/repl/rs_member.h
+++ b/db/repl/rs_member.h
@@ -40,7 +40,8 @@ namespace mongo {
RS_STARTUP2,
RS_UNKNOWN, /* remote node not yet reached */
RS_ARBITER,
- RS_DOWN /* node not reachable for a report */
+ RS_DOWN, /* node not reachable for a report */
+ RS_ROLLBACK
} s;
MemberState(MS ms = RS_UNKNOWN) : s(ms) { }
@@ -51,6 +52,9 @@ namespace mongo {
bool recovering() const { return s == RS_RECOVERING; }
bool startup2() const { return s == RS_STARTUP2; }
bool fatal() const { return s == RS_FATAL; }
+ bool rollback() const { return s == RS_ROLLBACK; }
+
+ string toString() const;
bool operator==(const MemberState& r) const { return s == r.s; }
bool operator!=(const MemberState& r) const { return s != r.s; }
@@ -61,26 +65,31 @@ namespace mongo {
class HeartbeatInfo {
unsigned _id;
public:
- HeartbeatInfo() : _id(0xffffffff),skew(INT_MIN) { }
+ HeartbeatInfo() : _id(0xffffffff),hbstate(MemberState::RS_UNKNOWN),health(-1.0),downSince(0),skew(INT_MIN) { }
HeartbeatInfo(unsigned id);
bool up() const { return health > 0; }
unsigned id() const { return _id; }
MemberState hbstate;
double health;
time_t upSince;
+ long long downSince;
time_t lastHeartbeat;
string lastHeartbeatMsg;
OpTime opTime;
int skew;
+ long long timeDown() const; // ms
+
/* true if changed in a way of interest to the repl set manager. */
bool changed(const HeartbeatInfo& old) const;
};
inline HeartbeatInfo::HeartbeatInfo(unsigned id) : _id(id) {
- health = -1.0;
- lastHeartbeat = upSince = 0;
- skew = INT_MIN;
+ hbstate = MemberState::RS_UNKNOWN;
+ health = -1.0;
+ downSince = 0;
+ lastHeartbeat = upSince = 0;
+ skew = INT_MIN;
}
inline bool HeartbeatInfo::changed(const HeartbeatInfo& old) const {
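HeartbeatInfo now carries downSince and timeDown() so callers can report how long a member has been unreachable. A small sketch of that bookkeeping with invented names; not the mongod code:

#include <ctime>
#include <iostream>

struct PeerHealth {
    double health = -1.0;          // <= 0 means down or unknown
    long long downSinceMillis = 0; // wall-clock ms when the member was first seen down

    void markDown() {
        if( health > 0 || downSinceMillis == 0 )
            downSinceMillis = (long long)time(0) * 1000;
        health = 0.0;
    }
    void markUp() { health = 1.0; downSinceMillis = 0; }

    long long timeDown() const {   // ms; 0 if not currently down
        if( health > 0 || downSinceMillis == 0 ) return 0;
        return (long long)time(0) * 1000 - downSinceMillis;
    }
};

int main() {
    PeerHealth p;
    p.markDown();
    std::cout << "down for ~" << p.timeDown() << " ms\n";
}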
diff --git a/db/repl/rs_rollback.cpp b/db/repl/rs_rollback.cpp
index 1bb7217..6b2544c 100644
--- a/db/repl/rs_rollback.cpp
+++ b/db/repl/rs_rollback.cpp
@@ -62,6 +62,14 @@ namespace mongo {
using namespace bson;
+ bool copyCollectionFromRemote(const string& host, const string& ns, const BSONObj& query, string& errmsg, bool logforrepl);
+ void incRBID();
+
+ class rsfatal : public std::exception {
+ public:
+ virtual const char* what() const throw(){ return "replica set fatal exception"; }
+ };
+
struct DocID {
const char *ns;
be _id;
@@ -81,6 +89,8 @@ namespace mongo {
/* collections to drop */
set<string> toDrop;
+ set<string> collectionsToResync;
+
OpTime commonPoint;
DiskLoc commonPointOurDiskloc;
@@ -113,17 +123,59 @@ namespace mongo {
if( *op == 'c' ) {
be first = o.firstElement();
NamespaceString s(d.ns); // foo.$cmd
-
- if( string("create") == first.fieldName() ) {
- /* Create collection operation
- { ts: ..., h: ..., op: "c", ns: "foo.$cmd", o: { create: "abc", ... } }
- */
- string ns = s.db + '.' + o["create"].String(); // -> foo.abc
- h.toDrop.insert(ns);
+ string cmdname = first.fieldName();
+ Command *cmd = Command::findCommand(cmdname.c_str());
+ if( cmd == 0 ) {
+                log() << "replSet warning rollback no such command " << first.fieldName() << " - different mongod versions perhaps?" << rsLog;
return;
}
- else {
- log() << "replSet WARNING can't roll back this command yet: " << o.toString() << rsLog;
+ else {
+                /* findandmodify - translated?
+ godinsert?,
+ renamecollection a->b. just resync a & b
+ */
+ if( cmdname == "create" ) {
+ /* Create collection operation
+ { ts: ..., h: ..., op: "c", ns: "foo.$cmd", o: { create: "abc", ... } }
+ */
+ string ns = s.db + '.' + o["create"].String(); // -> foo.abc
+ h.toDrop.insert(ns);
+ return;
+ }
+ else if( cmdname == "drop" ) {
+ string ns = s.db + '.' + first.valuestr();
+ h.collectionsToResync.insert(ns);
+ return;
+ }
+ else if( cmdname == "dropIndexes" || cmdname == "deleteIndexes" ) {
+                    /* TODO: this is bad. we simply do a full resync of the collection here, which could be very slow. */
+ log() << "replSet info rollback of dropIndexes is slow in this version of mongod" << rsLog;
+ string ns = s.db + '.' + first.valuestr();
+ h.collectionsToResync.insert(ns);
+ return;
+ }
+ else if( cmdname == "renameCollection" ) {
+ /* TODO: slow. */
+ log() << "replSet info rollback of renameCollection is slow in this version of mongod" << rsLog;
+ string from = first.valuestr();
+ string to = o["to"].String();
+ h.collectionsToResync.insert(from);
+ h.collectionsToResync.insert(to);
+ return;
+ }
+ else if( cmdname == "reIndex" ) {
+ return;
+ }
+ else if( cmdname == "dropDatabase" ) {
+                    log() << "replSet error rollback : can't rollback drop database; a full resync will be required" << rsLog;
+ log() << "replSet " << o.toString() << rsLog;
+ throw rsfatal();
+ }
+ else {
+ log() << "replSet error can't rollback this command yet: " << o.toString() << rsLog;
+ log() << "replSet cmdname=" << cmdname << rsLog;
+ throw rsfatal();
+ }
}
}
@@ -141,8 +193,6 @@ namespace mongo {
static void syncRollbackFindCommonPoint(DBClientConnection *them, HowToFixUp& h) {
static time_t last;
if( time(0)-last < 60 ) {
- // this could put a lot of load on someone else, don't repeat too often
- sleepsecs(10);
throw "findcommonpoint waiting a while before trying again";
}
last = time(0);
@@ -170,12 +220,14 @@ namespace mongo {
BSONObj theirObj = t->nextSafe();
OpTime theirTime = theirObj["ts"]._opTime();
- if( 1 ) {
+ {
long long diff = (long long) ourTime.getSecs() - ((long long) theirTime.getSecs());
/* diff could be positive, negative, or zero */
- log() << "replSet info syncRollback diff in end of log times : " << diff << " seconds" << rsLog;
+ log() << "replSet info rollback our last optime: " << ourTime.toStringPretty() << rsLog;
+ log() << "replSet info rollback their last optime: " << theirTime.toStringPretty() << rsLog;
+ log() << "replSet info rollback diff in end of log times: " << diff << " seconds" << rsLog;
if( diff > 3600 ) {
- log() << "replSet syncRollback too long a time period for a rollback." << rsLog;
+ log() << "replSet rollback too long a time period for a rollback." << rsLog;
throw "error not willing to roll back more than one hour of data";
}
}
@@ -197,16 +249,35 @@ namespace mongo {
refetch(h, ourObj);
+ if( !t->more() ) {
+ log() << "replSet rollback error RS100 reached beginning of remote oplog" << rsLog;
+ log() << "replSet them: " << them->toString() << " scanned: " << scanned << rsLog;
+ log() << "replSet theirTime: " << theirTime.toStringLong() << rsLog;
+ log() << "replSet ourTime: " << ourTime.toStringLong() << rsLog;
+ throw "RS100 reached beginning of remote oplog [2]";
+ }
theirObj = t->nextSafe();
theirTime = theirObj["ts"]._opTime();
u.advance();
- if( !u.ok() ) throw "reached beginning of local oplog";
+ if( !u.ok() ) {
+ log() << "replSet rollback error RS101 reached beginning of local oplog" << rsLog;
+ log() << "replSet them: " << them->toString() << " scanned: " << scanned << rsLog;
+ log() << "replSet theirTime: " << theirTime.toStringLong() << rsLog;
+ log() << "replSet ourTime: " << ourTime.toStringLong() << rsLog;
+ throw "RS101 reached beginning of local oplog [1]";
+ }
ourObj = u.current();
ourTime = ourObj["ts"]._opTime();
}
else if( theirTime > ourTime ) {
- /* todo: we could hit beginning of log here. exception thrown is ok but not descriptive, so fix up */
+ if( !t->more() ) {
+ log() << "replSet rollback error RS100 reached beginning of remote oplog" << rsLog;
+ log() << "replSet them: " << them->toString() << " scanned: " << scanned << rsLog;
+ log() << "replSet theirTime: " << theirTime.toStringLong() << rsLog;
+ log() << "replSet ourTime: " << ourTime.toStringLong() << rsLog;
+ throw "RS100 reached beginning of remote oplog [1]";
+ }
theirObj = t->nextSafe();
theirTime = theirObj["ts"]._opTime();
}
@@ -214,7 +285,13 @@ namespace mongo {
// theirTime < ourTime
refetch(h, ourObj);
u.advance();
- if( !u.ok() ) throw "reached beginning of local oplog";
+ if( !u.ok() ) {
+ log() << "replSet rollback error RS101 reached beginning of local oplog" << rsLog;
+ log() << "replSet them: " << them->toString() << " scanned: " << scanned << rsLog;
+ log() << "replSet theirTime: " << theirTime.toStringLong() << rsLog;
+ log() << "replSet ourTime: " << ourTime.toStringLong() << rsLog;
+ throw "RS101 reached beginning of local oplog [2]";
+ }
ourObj = u.current();
ourTime = ourObj["ts"]._opTime();
}
@@ -226,7 +303,19 @@ namespace mongo {
bson::bo goodVersionOfObject;
};
- void ReplSetImpl::syncFixUp(HowToFixUp& h, OplogReader& r) {
+ static void setMinValid(bo newMinValid) {
+ try {
+ log() << "replSet minvalid=" << newMinValid["ts"]._opTime().toStringLong() << rsLog;
+ }
+ catch(...) { }
+ {
+ Helpers::putSingleton("local.replset.minvalid", newMinValid);
+ Client::Context cx( "local." );
+ cx.db()->flushFiles(true);
+ }
+ }
+
+ void ReplSetImpl::syncFixUp(HowToFixUp& h, OplogReader& r) {
DBClientConnection *them = r.conn();
// fetch all first so we needn't handle interruption in a fancy way
@@ -237,6 +326,7 @@ namespace mongo {
bo newMinValid;
+ /* fetch all the goodVersions of each document from current primary */
DocID d;
unsigned long long n = 0;
try {
@@ -258,43 +348,98 @@ namespace mongo {
}
newMinValid = r.getLastOp(rsoplog);
if( newMinValid.isEmpty() ) {
- sethbmsg("syncRollback error newMinValid empty?");
+ sethbmsg("rollback error newMinValid empty?");
return;
}
}
catch(DBException& e) {
- sethbmsg(str::stream() << "syncRollback re-get objects: " << e.toString(),0);
- log() << "syncRollback couldn't re-get ns:" << d.ns << " _id:" << d._id << ' ' << n << '/' << h.toRefetch.size() << rsLog;
+ sethbmsg(str::stream() << "rollback re-get objects: " << e.toString(),0);
+ log() << "rollback couldn't re-get ns:" << d.ns << " _id:" << d._id << ' ' << n << '/' << h.toRefetch.size() << rsLog;
throw e;
}
- sethbmsg("syncRollback 3.5");
+ MemoryMappedFile::flushAll(true);
+
+ sethbmsg("rollback 3.5");
if( h.rbid != getRBID(r.conn()) ) {
// our source rolled back itself. so the data we received isn't necessarily consistent.
- sethbmsg("syncRollback rbid on source changed during rollback, cancelling this attempt");
+ sethbmsg("rollback rbid on source changed during rollback, cancelling this attempt");
return;
}
// update them
- sethbmsg(str::stream() << "syncRollback 4 n:" << goodVersions.size());
+ sethbmsg(str::stream() << "rollback 4 n:" << goodVersions.size());
bool warn = false;
assert( !h.commonPointOurDiskloc.isNull() );
- MemoryMappedFile::flushAll(true);
-
dbMutex.assertWriteLocked();
/* we have items we are writing that aren't from a point-in-time. thus best not to come online
until we get to that point in freshness. */
- try {
- log() << "replSet set minvalid=" << newMinValid["ts"]._opTime().toString() << rsLog;
+ setMinValid(newMinValid);
+
+ /** any full collection resyncs required? */
+ if( !h.collectionsToResync.empty() ) {
+ for( set<string>::iterator i = h.collectionsToResync.begin(); i != h.collectionsToResync.end(); i++ ) {
+ string ns = *i;
+ sethbmsg(str::stream() << "rollback 4.1 coll resync " << ns);
+ Client::Context c(*i, dbpath, 0, /*doauth*/false);
+ try {
+ bob res;
+ string errmsg;
+ dropCollection(ns, errmsg, res);
+ {
+ dbtemprelease r;
+ bool ok = copyCollectionFromRemote(them->getServerAddress(), ns, bo(), errmsg, false);
+ if( !ok ) {
+ log() << "replSet rollback error resyncing collection " << ns << ' ' << errmsg << rsLog;
+                            throw "rollback error resyncing collection [1]";
+ }
+ }
+ }
+ catch(...) {
+                    log() << "replSet rollback error resyncing collection " << ns << rsLog;
+                    throw "rollback error resyncing collection [2]";
+ }
+ }
+
+ /* we did more reading from primary, so check it again for a rollback (which would mess us up), and
+ make minValid newer.
+ */
+ sethbmsg("rollback 4.2");
+ {
+ string err;
+ try {
+ newMinValid = r.getLastOp(rsoplog);
+ if( newMinValid.isEmpty() ) {
+ err = "can't get minvalid from primary";
+ } else {
+ setMinValid(newMinValid);
+ }
+ }
+ catch(...) {
+ err = "can't get/set minvalid";
+ }
+ if( h.rbid != getRBID(r.conn()) ) {
+ // our source rolled back itself. so the data we received isn't necessarily consistent.
+ // however, we've now done writes. thus we have a problem.
+ err += "rbid at primary changed during resync/rollback";
+ }
+ if( !err.empty() ) {
+ log() << "replSet error rolling back : " << err << ". A full resync will be necessary." << rsLog;
+ /* todo: reset minvalid so that we are permanently in fatal state */
+ /* todo: don't be fatal, but rather, get all the data first. */
+ sethbmsg("rollback error");
+ throw rsfatal();
+ }
+ }
+ sethbmsg("rollback 4.3");
}
- catch(...){}
- Helpers::putSingleton("local.replset.minvalid", newMinValid);
- /** first drop collections to drop - that might make things faster below actually if there were subsequent inserts */
+ sethbmsg("rollback 4.6");
+        /** drop the collections marked for dropping before doing the individual fixups - that might actually make things faster below if there were subsequent inserts to roll back */
for( set<string>::iterator i = h.toDrop.begin(); i != h.toDrop.end(); i++ ) {
Client::Context c(*i, dbpath, 0, /*doauth*/false);
try {
@@ -308,6 +453,7 @@ namespace mongo {
}
}
+ sethbmsg("rollback 4.7");
Client::Context c(rsoplog, dbpath, 0, /*doauth*/false);
NamespaceDetails *oplogDetails = nsdetails(rsoplog);
uassert(13423, str::stream() << "replSet error in rollback can't find " << rsoplog, oplogDetails);
@@ -320,7 +466,12 @@ namespace mongo {
bo pattern = d._id.wrap(); // { _id : ... }
try {
assert( d.ns && *d.ns );
-
+ if( h.collectionsToResync.count(d.ns) ) {
+ /* we just synced this entire collection */
+ continue;
+ }
+
+ /* keep an archive of items rolled back */
shared_ptr<RemoveSaver>& rs = removeSavers[d.ns];
if ( ! rs )
rs.reset( new RemoveSaver( "rollback" , "" , d.ns ) );
@@ -343,7 +494,7 @@ namespace mongo {
long long start = Listener::getElapsedTimeMillis();
DiskLoc loc = Helpers::findOne(d.ns, pattern, false);
if( Listener::getElapsedTimeMillis() - start > 200 )
- log() << "replSet warning roll back slow no _id index for " << d.ns << rsLog;
+                        log() << "replSet warning roll back slow, no _id index for " << d.ns << " perhaps?" << rsLog;
//would be faster but requires index: DiskLoc loc = Helpers::findById(nsd, pattern);
if( !loc.isNull() ) {
try {
@@ -411,9 +562,9 @@ namespace mongo {
removeSavers.clear(); // this effectively closes all of them
- sethbmsg(str::stream() << "syncRollback 5 d:" << deletes << " u:" << updates);
+ sethbmsg(str::stream() << "rollback 5 d:" << deletes << " u:" << updates);
MemoryMappedFile::flushAll(true);
- sethbmsg("syncRollback 6");
+ sethbmsg("rollback 6");
// clean up oplog
log(2) << "replSet rollback truncate oplog after " << h.commonPoint.toStringPretty() << rsLog;
@@ -423,59 +574,99 @@ namespace mongo {
/* reset cached lastoptimewritten and h value */
loadLastOpTimeWritten();
- sethbmsg("syncRollback 7");
+ sethbmsg("rollback 7");
MemoryMappedFile::flushAll(true);
// done
if( warn )
sethbmsg("issues during syncRollback, see log");
else
- sethbmsg("syncRollback done");
+ sethbmsg("rollback done");
}
void ReplSetImpl::syncRollback(OplogReader&r) {
+ unsigned s = _syncRollback(r);
+ if( s )
+ sleepsecs(s);
+ }
+
+ unsigned ReplSetImpl::_syncRollback(OplogReader&r) {
assert( !lockedByMe() );
assert( !dbMutex.atLeastReadLocked() );
- sethbmsg("syncRollback 0");
+ sethbmsg("rollback 0");
writelocktry lk(rsoplog, 20000);
if( !lk.got() ) {
- sethbmsg("syncRollback couldn't get write lock in a reasonable time");
- sleepsecs(2);
- return;
+ sethbmsg("rollback couldn't get write lock in a reasonable time");
+ return 2;
+ }
+
+ if( box.getState().secondary() ) {
+            /* by doing this, we will not service reads (we return an error as we aren't in secondary state).
+               that perhaps is moot because of the write lock above, but that write lock probably gets deferred
+               or removed or yielded later anyway.
+
+ also, this is better for status reporting - we know what is happening.
+ */
+ box.change(MemberState::RS_ROLLBACK, _self);
}
HowToFixUp how;
- sethbmsg("syncRollback 1");
+ sethbmsg("rollback 1");
{
r.resetCursor();
/*DBClientConnection us(false, 0, 0);
string errmsg;
if( !us.connect(HostAndPort::me().toString(),errmsg) ) {
- sethbmsg("syncRollback connect to self failure" + errmsg);
+ sethbmsg("rollback connect to self failure" + errmsg);
return;
}*/
- sethbmsg("syncRollback 2 FindCommonPoint");
+ sethbmsg("rollback 2 FindCommonPoint");
try {
syncRollbackFindCommonPoint(r.conn(), how);
}
catch( const char *p ) {
- sethbmsg(string("syncRollback 2 error ") + p);
- sleepsecs(10);
- return;
+ sethbmsg(string("rollback 2 error ") + p);
+ return 10;
+ }
+ catch( rsfatal& ) {
+ _fatal();
+ return 2;
}
catch( DBException& e ) {
- sethbmsg(string("syncRollback 2 exception ") + e.toString() + "; sleeping 1 min");
+ sethbmsg(string("rollback 2 exception ") + e.toString() + "; sleeping 1 min");
+ dbtemprelease r;
sleepsecs(60);
throw;
}
}
- sethbmsg("replSet syncRollback 3 fixup");
+ sethbmsg("replSet rollback 3 fixup");
+
+ {
+ incRBID();
+ try {
+ syncFixUp(how, r);
+ }
+ catch( rsfatal& ) {
+ sethbmsg("rollback fixup error");
+ _fatal();
+ return 2;
+ }
+ catch(...) {
+ incRBID(); throw;
+ }
+ incRBID();
+
+ /* success - leave "ROLLBACK" state
+ can go to SECONDARY once minvalid is achieved
+ */
+ box.change(MemberState::RS_RECOVERING, _self);
+ }
- syncFixUp(how, r);
+ return 0;
}
}
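syncRollback() above now wraps _syncRollback(), which returns the number of seconds to sleep before the next attempt; the sleep therefore happens after the write lock and cursors held inside the worker have been released. A minimal sketch of that return-a-delay pattern, with made-up names:

#include <chrono>
#include <iostream>
#include <thread>

// pretend rollback step: fails twice (asking for a 1 second pause), then succeeds
unsigned tryRollbackOnce(int attempt) {
    if( attempt < 2 )
        return 1;       // caller should wait this many seconds and retry
    return 0;           // success
}

void rollbackWithRetry() {
    for( int attempt = 0; ; attempt++ ) {
        // in the pattern above, any locks are taken and released inside this call
        unsigned sleepSecs = tryRollbackOnce(attempt);
        if( sleepSecs == 0 )
            break;
        std::cout << "retrying rollback in " << sleepSecs << "s\n";
        std::this_thread::sleep_for(std::chrono::seconds(sleepSecs));
    }
    std::cout << "rollback done\n";
}

int main() { rollbackWithRetry(); }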
diff --git a/db/repl/rs_sync.cpp b/db/repl/rs_sync.cpp
index bece96c..9ea65cf 100644
--- a/db/repl/rs_sync.cpp
+++ b/db/repl/rs_sync.cpp
@@ -24,8 +24,11 @@ namespace mongo {
using namespace bson;
+ extern unsigned replSetForceInitialSyncFailure;
+
void startSyncThread() {
Client::initThread("rs_sync");
+ cc().iAmSyncThread();
theReplSet->syncThread();
cc().shutdown();
}
@@ -91,6 +94,7 @@ namespace mongo {
// todo : use exhaust
unsigned long long n = 0;
while( 1 ) {
+
if( !r.more() )
break;
BSONObj o = r.nextSafe(); /* note we might get "not master" at some point */
@@ -103,9 +107,14 @@ namespace mongo {
anymore. assumePrimary is in the db lock so we are safe as long as
we check after we locked above. */
const Member *p1 = box.getPrimary();
- if( p1 != primary ) {
- log() << "replSet primary was:" << primary->fullName() << " now:" <<
- (p1 != 0 ? p1->fullName() : "none") << rsLog;
+ if( p1 != primary || replSetForceInitialSyncFailure ) {
+ int f = replSetForceInitialSyncFailure;
+ if( f > 0 ) {
+ replSetForceInitialSyncFailure = f-1;
+ log() << "replSet test code invoked, replSetForceInitialSyncFailure" << rsLog;
+ }
+ log() << "replSet primary was:" << primary->fullName() << " now:" <<
+ (p1 != 0 ? p1->fullName() : "none") << rsLog;
throw DBException("primary changed",0);
}
@@ -131,6 +140,32 @@ namespace mongo {
return true;
}
+ /* should be in RECOVERING state on arrival here.
+ readlocks
+ @return true if transitioned to SECONDARY
+ */
+ bool ReplSetImpl::tryToGoLiveAsASecondary(OpTime& /*out*/ minvalid) {
+ bool golive = false;
+ {
+ readlock lk("local.replset.minvalid");
+ BSONObj mv;
+ if( Helpers::getSingleton("local.replset.minvalid", mv) ) {
+ minvalid = mv["ts"]._opTime();
+ if( minvalid <= lastOpTimeWritten ) {
+ golive=true;
+ }
+ }
+ else
+ golive = true; /* must have been the original member */
+ }
+ if( golive ) {
+ sethbmsg("");
+ changeState(MemberState::RS_SECONDARY);
+ }
+ return golive;
+ }
+
+ /* tail the primary's oplog. ok to return, will be re-called. */
void ReplSetImpl::syncTail() {
// todo : locking vis a vis the mgr...
@@ -147,14 +182,19 @@ namespace mongo {
{
BSONObj remoteOldestOp = r.findOne(rsoplog, Query());
OpTime ts = remoteOldestOp["ts"]._opTime();
- DEV log() << "remoteOldestOp: " << ts.toStringPretty() << endl;
- else log(3) << "remoteOldestOp: " << ts.toStringPretty() << endl;
+ DEV log() << "replSet remoteOldestOp: " << ts.toStringLong() << rsLog;
+ else log(3) << "replSet remoteOldestOp: " << ts.toStringLong() << rsLog;
+ DEV {
+ // debugging sync1.js...
+ log() << "replSet lastOpTimeWritten: " << lastOpTimeWritten.toStringLong() << rsLog;
+ log() << "replSet our state: " << state().toString() << rsLog;
+ }
if( lastOpTimeWritten < ts ) {
- log() << "replSet error too stale to catch up, at least from primary " << hn << rsLog;
- log() << "replSet our last optime : " << lastOpTimeWritten.toStringPretty() << rsLog;
- log() << "replSet oldest at " << hn << " : " << ts.toStringPretty() << rsLog;
+ log() << "replSet error RS102 too stale to catch up, at least from primary: " << hn << rsLog;
+ log() << "replSet our last optime : " << lastOpTimeWritten.toStringLong() << rsLog;
+ log() << "replSet oldest at " << hn << " : " << ts.toStringLong() << rsLog;
log() << "replSet See http://www.mongodb.org/display/DOCS/Resyncing+a+Very+Stale+Replica+Set+Member" << rsLog;
- sethbmsg("error too stale to catch up");
+ sethbmsg("error RS102 too stale to catch up");
sleepsecs(120);
return;
}
@@ -213,7 +253,13 @@ namespace mongo {
}
}
- while( 1 ) {
+        /* we have now checked whether we need to roll back, and either we didn't have to or we already did it. */
+ {
+ OpTime minvalid;
+ tryToGoLiveAsASecondary(minvalid);
+ }
+
+ while( 1 ) {
while( 1 ) {
if( !r.moreInCurrentBatch() ) {
/* we need to occasionally check some things. between
@@ -222,27 +268,13 @@ namespace mongo {
/* perhaps we should check this earlier? but not before the rollback checks. */
if( state().recovering() ) {
/* can we go to RS_SECONDARY state? we can if not too old and if minvalid achieved */
- bool golive = false;
OpTime minvalid;
- {
- readlock lk("local.replset.minvalid");
- BSONObj mv;
- if( Helpers::getSingleton("local.replset.minvalid", mv) ) {
- minvalid = mv["ts"]._opTime();
- if( minvalid <= lastOpTimeWritten ) {
- golive=true;
- }
- }
- else
- golive = true; /* must have been the original member */
- }
+ bool golive = ReplSetImpl::tryToGoLiveAsASecondary(minvalid);
if( golive ) {
- sethbmsg("");
- log() << "replSet SECONDARY" << rsLog;
- changeState(MemberState::RS_SECONDARY);
+ ;
}
else {
- sethbmsg(str::stream() << "still syncing, not yet to minValid optime " << minvalid.toString());
+ sethbmsg(str::stream() << "still syncing, not yet to minValid optime" << minvalid.toString());
}
/* todo: too stale capability */
@@ -270,12 +302,40 @@ namespace mongo {
syncApply(o);
_logOpObjRS(o); /* with repl sets we write the ops to our oplog too: */
}
+ int sd = myConfig().slaveDelay;
+ if( sd ) {
+ const OpTime ts = o["ts"]._opTime();
+ long long a = ts.getSecs();
+ long long b = time(0);
+ long long lag = b - a;
+ long long sleeptime = sd - lag;
+ if( sleeptime > 0 ) {
+ uassert(12000, "rs slaveDelay differential too big check clocks and systems", sleeptime < 0x40000000);
+ log() << "replSet temp slavedelay sleep:" << sleeptime << rsLog;
+ if( sleeptime < 60 ) {
+ sleepsecs((int) sleeptime);
+ }
+ else {
+ // sleep(hours) would prevent reconfigs from taking effect & such!
+ long long waitUntil = b + sleeptime;
+ while( 1 ) {
+ sleepsecs(6);
+ if( time(0) >= waitUntil )
+ break;
+ if( box.getPrimary() != primary )
+ break;
+ if( myConfig().slaveDelay != sd ) // reconf
+ break;
+ }
+ }
+ }
+ }
}
}
r.tailCheck();
if( !r.haveCursor() ) {
- log() << "replSet TEMP end syncTail pass with " << hn << rsLog;
- // TODO : reuse our cnonection to the primary.
+ log(1) << "replSet end syncTail pass with " << hn << rsLog;
+ // TODO : reuse our connection to the primary.
return;
}
if( box.getPrimary() != primary )
@@ -290,6 +350,10 @@ namespace mongo {
sleepsecs(1);
return;
}
+ if( sp.state.fatal() ) {
+ sleepsecs(5);
+ return;
+ }
/* later, we can sync from up secondaries if we want. tbd. */
if( sp.primary == 0 )
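The slaveDelay handling above computes how far behind the op already is, sleeps off only the remainder, and sleeps in short chunks so a reconfig or a primary change can cut a long delay short. A minimal sketch of that wait, with invented names and a 1-second chunk instead of 6:

#include <chrono>
#include <ctime>
#include <iostream>
#include <thread>

// returns true if the full delay was served, false if interrupted
bool slaveDelayWait(long long opTimestampSecs, long long delaySecs,
                    bool (*shouldAbort)()) {
    long long now = time(0);
    long long lag = now - opTimestampSecs;      // how old the op already is
    long long sleepSecs = delaySecs - lag;      // remaining delay to honour
    if( sleepSecs <= 0 ) return true;

    long long waitUntil = now + sleepSecs;
    while( time(0) < waitUntil ) {
        // short naps so a reconfig or a primary change is noticed quickly
        std::this_thread::sleep_for(std::chrono::seconds(1));
        if( shouldAbort() ) return false;
    }
    return true;
}

int main() {
    long long ts = time(0) - 1;                 // an op written one second ago
    bool ok = slaveDelayWait(ts, 3, [] { return false; });
    std::cout << (ok ? "delay served" : "interrupted") << "\n";
}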
diff --git a/db/security_commands.cpp b/db/security_commands.cpp
index af03294..7bf2813 100644
--- a/db/security_commands.cpp
+++ b/db/security_commands.cpp
@@ -49,9 +49,7 @@ namespace mongo {
class CmdGetNonce : public Command {
public:
virtual bool requiresAuth() { return false; }
- virtual bool logTheOp() {
- return false;
- }
+ virtual bool logTheOp() { return false; }
virtual bool slaveOk() const {
return true;
}
diff --git a/db/stats/snapshots.cpp b/db/stats/snapshots.cpp
index b439b03..3ce80ca 100644
--- a/db/stats/snapshots.cpp
+++ b/db/stats/snapshots.cpp
@@ -163,7 +163,7 @@ namespace mongo {
ss << usage.count;
ss << "</td><td>";
double per = 100 * ((double)usage.time)/elapsed;
- ss << setprecision(4) << fixed << per << "%";
+ ss << setprecision(1) << fixed << per << "%";
ss << "</td>";
}