summaryrefslogtreecommitdiff
path: root/db/repl/rs.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'db/repl/rs.cpp')
-rw-r--r--db/repl/rs.cpp282
1 files changed, 193 insertions, 89 deletions
diff --git a/db/repl/rs.cpp b/db/repl/rs.cpp
index 1c0444a..90ed9f4 100644
--- a/db/repl/rs.cpp
+++ b/db/repl/rs.cpp
@@ -20,9 +20,12 @@
#include "../client.h"
#include "../../client/dbclient.h"
#include "../dbhelpers.h"
+#include "../../s/d_logic.h"
#include "rs.h"
+#include "connections.h"
+#include "../repl.h"
-namespace mongo {
+namespace mongo {
using namespace bson;
@@ -30,18 +33,18 @@ namespace mongo {
ReplSet *theReplSet = 0;
extern string *discoveredSeed;
- void ReplSetImpl::sethbmsg(string s, int logLevel) {
+ void ReplSetImpl::sethbmsg(string s, int logLevel) {
static time_t lastLogged;
_hbmsgTime = time(0);
- if( s == _hbmsg ) {
+ if( s == _hbmsg ) {
// unchanged
if( _hbmsgTime - lastLogged < 60 )
return;
}
unsigned sz = s.size();
- if( sz >= 256 )
+ if( sz >= 256 )
memcpy(_hbmsg, s.c_str(), 255);
else {
_hbmsg[sz] = 0;
@@ -53,7 +56,7 @@ namespace mongo {
}
}
- void ReplSetImpl::assumePrimary() {
+ void ReplSetImpl::assumePrimary() {
assert( iAmPotentiallyHot() );
writelock lk("admin."); // so we are synchronized with _logOp()
box.setSelfPrimary(_self);
@@ -62,17 +65,26 @@ namespace mongo {
void ReplSetImpl::changeState(MemberState s) { box.change(s, _self); }
- void ReplSetImpl::relinquish() {
+ const bool closeOnRelinquish = true;
+
+ void ReplSetImpl::relinquish() {
if( box.getState().primary() ) {
log() << "replSet relinquishing primary state" << rsLog;
- changeState(MemberState::RS_RECOVERING);
-
- /* close sockets that were talking to us */
- /*log() << "replSet closing sockets after reqlinquishing primary" << rsLog;
- MessagingPort::closeAllSockets(1);*/
+ changeState(MemberState::RS_SECONDARY);
+
+ if( closeOnRelinquish ) {
+ /* close sockets that were talking to us so they don't blithly send many writes that will fail
+ with "not master" (of course client could check result code, but in case they are not)
+ */
+ log() << "replSet closing client sockets after reqlinquishing primary" << rsLog;
+ MessagingPort::closeAllSockets(1);
+ }
+
+ // now that all connections were closed, strip this mongod from all sharding details
+ // if and when it gets promoted to a primary again, only then it should reload the sharding state
+ // the rationale here is that this mongod won't bring stale state when it regains primaryhood
+ shardingState.resetShardingState();
- // todo: >
- //changeState(MemberState::RS_SECONDARY);
}
else if( box.getState().startup2() ) {
// ? add comment
@@ -81,26 +93,48 @@ namespace mongo {
}
/* look freshly for who is primary - includes relinquishing ourself. */
- void ReplSetImpl::forgetPrimary() {
- if( box.getState().primary() )
+ void ReplSetImpl::forgetPrimary() {
+ if( box.getState().primary() )
relinquish();
else {
box.setOtherPrimary(0);
}
}
- bool ReplSetImpl::_stepDown() {
+ // for the replSetStepDown command
+ bool ReplSetImpl::_stepDown(int secs) {
lock lk(this);
- if( box.getState().primary() ) {
- changeState(MemberState::RS_RECOVERING);
- elect.steppedDown = time(0) + 60;
- log() << "replSet info stepped down as primary" << rsLog;
+ if( box.getState().primary() ) {
+ elect.steppedDown = time(0) + secs;
+ log() << "replSet info stepping down as primary secs=" << secs << rsLog;
+ relinquish();
return true;
}
return false;
}
- void ReplSetImpl::msgUpdateHBInfo(HeartbeatInfo h) {
+ bool ReplSetImpl::_freeze(int secs) {
+ lock lk(this);
+ /* note if we are primary we remain primary but won't try to elect ourself again until
+ this time period expires.
+ */
+ if( secs == 0 ) {
+ elect.steppedDown = 0;
+ log() << "replSet info 'unfreezing'" << rsLog;
+ }
+ else {
+ if( !box.getState().primary() ) {
+ elect.steppedDown = time(0) + secs;
+ log() << "replSet info 'freezing' for " << secs << " seconds" << rsLog;
+ }
+ else {
+ log() << "replSet info received freeze command but we are primary" << rsLog;
+ }
+ }
+ return true;
+ }
+
+ void ReplSetImpl::msgUpdateHBInfo(HeartbeatInfo h) {
for( Member *m = _members.head(); m; m=m->next() ) {
if( m->id() == h.id() ) {
m->_hbinfo = h;
@@ -109,7 +143,7 @@ namespace mongo {
}
}
- list<HostAndPort> ReplSetImpl::memberHostnames() const {
+ list<HostAndPort> ReplSetImpl::memberHostnames() const {
list<HostAndPort> L;
L.push_back(_self->h());
for( Member *m = _members.head(); m; m = m->next() )
@@ -118,6 +152,7 @@ namespace mongo {
}
void ReplSetImpl::_fillIsMasterHost(const Member *m, vector<string>& hosts, vector<string>& passives, vector<string>& arbiters) {
+ assert( m );
if( m->config().hidden )
return;
@@ -126,8 +161,9 @@ namespace mongo {
}
else if( !m->config().arbiterOnly ) {
if( m->config().slaveDelay ) {
- /* hmmm - we don't list these as they are stale. */
- } else {
+ /* hmmm - we don't list these as they are stale. */
+ }
+ else {
passives.push_back(m->h().toString());
}
}
@@ -147,6 +183,7 @@ namespace mongo {
_fillIsMasterHost(_self, hosts, passives, arbiters);
for( Member *m = _members.head(); m; m = m->next() ) {
+ assert( m );
_fillIsMasterHost(m, hosts, passives, arbiters);
}
@@ -161,23 +198,27 @@ namespace mongo {
}
}
- if( !isp ) {
+ if( !isp ) {
const Member *m = sp.primary;
if( m )
b.append("primary", m->h().toString());
}
if( myConfig().arbiterOnly )
b.append("arbiterOnly", true);
+ if( myConfig().priority == 0 )
+ b.append("passive", true);
if( myConfig().slaveDelay )
b.append("slaveDelay", myConfig().slaveDelay);
if( myConfig().hidden )
b.append("hidden", true);
+ if( !myConfig().buildIndexes )
+ b.append("buildIndexes", false);
}
/** @param cfgString <setname>/<seedhost1>,<seedhost2> */
- void parseReplsetCmdLine(string cfgString, string& setname, vector<HostAndPort>& seeds, set<HostAndPort>& seedSet ) {
- const char *p = cfgString.c_str();
+ void parseReplsetCmdLine(string cfgString, string& setname, vector<HostAndPort>& seeds, set<HostAndPort>& seedSet ) {
+ const char *p = cfgString.c_str();
const char *slash = strchr(p, '/');
if( slash )
setname = string(p, slash-p);
@@ -207,7 +248,8 @@ namespace mongo {
//uassert(13101, "can't use localhost in replset host list", !m.isLocalHost());
if( m.isSelf() ) {
log(1) << "replSet ignoring seed " << m.toString() << " (=self)" << rsLog;
- } else
+ }
+ else
seeds.push_back(m);
if( *comma == 0 )
break;
@@ -216,10 +258,9 @@ namespace mongo {
}
}
- ReplSetImpl::ReplSetImpl(ReplSetCmdline& replSetCmdline) : elect(this),
- _self(0),
- mgr( new Manager(this) )
- {
+ ReplSetImpl::ReplSetImpl(ReplSetCmdline& replSetCmdline) : elect(this),
+ _self(0),
+ mgr( new Manager(this) ) {
_cfg = 0;
memset(_hbmsg, 0, sizeof(_hbmsg));
*_hbmsg = '.'; // temp...just to see
@@ -240,20 +281,21 @@ namespace mongo {
}
for( set<HostAndPort>::iterator i = replSetCmdline.seedSet.begin(); i != replSetCmdline.seedSet.end(); i++ ) {
if( i->isSelf() ) {
- if( sss == 1 )
+ if( sss == 1 )
log(1) << "replSet warning self is listed in the seed list and there are no other seeds listed did you intend that?" << rsLog;
- } else
+ }
+ else
log() << "replSet warning command line seed " << i->toString() << " is not present in the current repl set config" << rsLog;
}
}
void newReplUp();
- void ReplSetImpl::loadLastOpTimeWritten() {
+ void ReplSetImpl::loadLastOpTimeWritten() {
//assert( lastOpTimeWritten.isNull() );
readlock lk(rsoplog);
BSONObj o;
- if( Helpers::getLast(rsoplog, o) ) {
+ if( Helpers::getLast(rsoplog, o) ) {
lastH = o["h"].numberLong();
lastOpTimeWritten = o["ts"]._opTime();
uassert(13290, "bad replSet oplog entry?", !lastOpTimeWritten.isNull());
@@ -261,11 +303,11 @@ namespace mongo {
}
/* call after constructing to start - returns fairly quickly after launching its threads */
- void ReplSetImpl::_go() {
- try {
+ void ReplSetImpl::_go() {
+ try {
loadLastOpTimeWritten();
}
- catch(std::exception& e) {
+ catch(std::exception& e) {
log() << "replSet error fatal couldn't query the local " << rsoplog << " collection. Terminating mongod after 30 seconds." << rsLog;
log() << e.what() << rsLog;
sleepsecs(30);
@@ -283,11 +325,17 @@ namespace mongo {
extern BSONObj *getLastErrorDefault;
+ void ReplSetImpl::setSelfTo(Member *m) {
+ _self = m;
+ if( m ) _buildIndexes = m->config().buildIndexes;
+ else _buildIndexes = true;
+ }
+
/** @param reconf true if this is a reconfiguration and not an initial load of the configuration.
@return true if ok; throws if config really bad; false if config doesn't include self
*/
bool ReplSetImpl::initFromConfig(ReplSetConfig& c, bool reconf) {
- /* NOTE: haveNewConfig() writes the new config to disk before we get here. So
+ /* NOTE: haveNewConfig() writes the new config to disk before we get here. So
we cannot error out at this point, except fatally. Check errors earlier.
*/
lock lk(this);
@@ -302,25 +350,24 @@ namespace mongo {
{
unsigned nfound = 0;
int me = 0;
- for( vector<ReplSetConfig::MemberCfg>::iterator i = c.members.begin(); i != c.members.end(); i++ ) {
+ for( vector<ReplSetConfig::MemberCfg>::iterator i = c.members.begin(); i != c.members.end(); i++ ) {
const ReplSetConfig::MemberCfg& m = *i;
if( m.h.isSelf() ) {
nfound++;
me++;
-
if( !reconf || (_self && _self->id() == (unsigned) m._id) )
;
- else {
+ else {
log() << "replSet " << _self->id() << ' ' << m._id << rsLog;
assert(false);
}
}
- else if( reconf ) {
+ else if( reconf ) {
const Member *old = findById(m._id);
- if( old ) {
+ if( old ) {
nfound++;
assert( (int) old->id() == m._id );
- if( old->config() == m ) {
+ if( old->config() == m ) {
additive = false;
}
}
@@ -328,16 +375,24 @@ namespace mongo {
newOnes.push_back(&m);
}
}
+
+ // change timeout settings, if necessary
+ ScopedConn conn(m.h.toString());
+ conn.setTimeout(c.ho.heartbeatTimeoutMillis/1000.0);
}
if( me == 0 ) {
+ // initial startup with fastsync
+ if (!reconf && replSettings.fastsync) {
+ return false;
+ }
// log() << "replSet config : " << _cfg->toString() << rsLog;
- log() << "replSet error can't find self in the repl set configuration:" << rsLog;
+ log() << "replSet error self not present in the repl set configuration:" << rsLog;
log() << c.toString() << rsLog;
- assert(false);
+ uasserted(13497, "replSet error self not present in the configuration");
}
uassert( 13302, "replSet error self appears twice in the repl set configuration", me<=1 );
- if( reconf && config().members.size() != nfound )
+ if( reconf && config().members.size() != nfound )
additive = false;
}
@@ -347,14 +402,14 @@ namespace mongo {
_name = _cfg->_id;
assert( !_name.empty() );
- if( additive ) {
+ if( additive ) {
log() << "replSet info : additive change to configuration" << rsLog;
for( list<const ReplSetConfig::MemberCfg*>::const_iterator i = newOnes.begin(); i != newOnes.end(); i++ ) {
const ReplSetConfig::MemberCfg* m = *i;
Member *mi = new Member(m->h, m->_id, m, false);
- /** we will indicate that new members are up() initially so that we don't relinquish our
- primary state because we can't (transiently) see a majority. they should be up as we
+ /** we will indicate that new members are up() initially so that we don't relinquish our
+ primary state because we can't (transiently) see a majority. they should be up as we
check that new members are up before getting here on reconfig anyway.
*/
mi->get_hbinfo().health = 0.1;
@@ -373,20 +428,30 @@ namespace mongo {
int oldPrimaryId = -1;
{
const Member *p = box.getPrimary();
- if( p )
+ if( p )
oldPrimaryId = p->id();
}
forgetPrimary();
- _self = 0;
- for( vector<ReplSetConfig::MemberCfg>::iterator i = _cfg->members.begin(); i != _cfg->members.end(); i++ ) {
+
+ bool iWasArbiterOnly = _self ? iAmArbiterOnly() : false;
+ setSelfTo(0);
+ for( vector<ReplSetConfig::MemberCfg>::iterator i = _cfg->members.begin(); i != _cfg->members.end(); i++ ) {
const ReplSetConfig::MemberCfg& m = *i;
Member *mi;
if( m.h.isSelf() ) {
assert( _self == 0 );
- mi = _self = new Member(m.h, m._id, &m, true);
+ mi = new Member(m.h, m._id, &m, true);
+ setSelfTo(mi);
+
+ // if the arbiter status changed
+ if (iWasArbiterOnly ^ iAmArbiterOnly()) {
+ _changeArbiterState();
+ }
+
if( (int)mi->id() == oldPrimaryId )
box.setSelfPrimary(mi);
- } else {
+ }
+ else {
mi = new Member(m.h, m._id, &m, false);
_members.push(mi);
startHealthTaskFor(mi);
@@ -397,26 +462,57 @@ namespace mongo {
return true;
}
+ void startSyncThread();
+
+ void ReplSetImpl::_changeArbiterState() {
+ if (iAmArbiterOnly()) {
+ changeState(MemberState::RS_ARBITER);
+
+ // if there is an oplog, free it
+ // not sure if this is necessary, maybe just leave the oplog and let
+ // the user delete it if they want the space?
+ writelock lk(rsoplog);
+ Client::Context c(rsoplog);
+ NamespaceDetails *d = nsdetails(rsoplog);
+ if (d) {
+ string errmsg;
+ bob res;
+ dropCollection(rsoplog, errmsg, res);
+
+ // clear last op time to force initial sync (if the arbiter
+ // becomes a "normal" server again)
+ lastOpTimeWritten = OpTime();
+ }
+ }
+ else {
+ changeState(MemberState::RS_RECOVERING);
+
+ // oplog will be allocated when sync begins
+ /* TODO : could this cause two sync threads to exist (race condition)? */
+ boost::thread t(startSyncThread);
+ }
+ }
+
// Our own config must be the first one.
- bool ReplSetImpl::_loadConfigFinish(vector<ReplSetConfig>& cfgs) {
+ bool ReplSetImpl::_loadConfigFinish(vector<ReplSetConfig>& cfgs) {
int v = -1;
ReplSetConfig *highest = 0;
int myVersion = -2000;
int n = 0;
- for( vector<ReplSetConfig>::iterator i = cfgs.begin(); i != cfgs.end(); i++ ) {
+ for( vector<ReplSetConfig>::iterator i = cfgs.begin(); i != cfgs.end(); i++ ) {
ReplSetConfig& cfg = *i;
if( ++n == 1 ) myVersion = cfg.version;
- if( cfg.ok() && cfg.version > v ) {
+ if( cfg.ok() && cfg.version > v ) {
highest = &cfg;
v = cfg.version;
}
}
assert( highest );
- if( !initFromConfig(*highest) )
+ if( !initFromConfig(*highest) )
return false;
- if( highest->version > myVersion && highest->version >= 0 ) {
+ if( highest->version > myVersion && highest->version >= 0 ) {
log() << "replSet got config version " << highest->version << " from a remote, saving locally" << rsLog;
writelock lk("admin.");
highest->saveConfigLocally(BSONObj());
@@ -430,7 +526,7 @@ namespace mongo {
startupStatusMsg = "loading " + rsConfigNs + " config (LOADINGCONFIG)";
try {
vector<ReplSetConfig> configs;
- try {
+ try {
configs.push_back( ReplSetConfig(HostAndPort::me()) );
}
catch(DBException& e) {
@@ -438,26 +534,26 @@ namespace mongo {
throw;
}
for( vector<HostAndPort>::const_iterator i = _seeds->begin(); i != _seeds->end(); i++ ) {
- try {
+ try {
configs.push_back( ReplSetConfig(*i) );
}
- catch( DBException& e ) {
+ catch( DBException& e ) {
log() << "replSet exception trying to load config from " << *i << " : " << e.toString() << rsLog;
}
}
- if( discoveredSeed ) {
+ if( discoveredSeed ) {
try {
configs.push_back( ReplSetConfig(HostAndPort(*discoveredSeed)) );
}
- catch( DBException& ) {
+ catch( DBException& ) {
log(1) << "replSet exception trying to load config from discovered seed " << *discoveredSeed << rsLog;
}
}
int nok = 0;
int nempty = 0;
- for( vector<ReplSetConfig>::iterator i = configs.begin(); i != configs.end(); i++ ) {
+ for( vector<ReplSetConfig>::iterator i = configs.begin(); i != configs.end(); i++ ) {
if( i->ok() )
nok++;
if( i->empty() )
@@ -469,7 +565,9 @@ namespace mongo {
startupStatus = EMPTYCONFIG;
startupStatusMsg = "can't get " + rsConfigNs + " config from self or any seed (EMPTYCONFIG)";
log() << "replSet can't get " << rsConfigNs << " config from self or any seed (EMPTYCONFIG)" << rsLog;
- log(1) << "replSet have you ran replSetInitiate yet?" << rsLog;
+ static unsigned once;
+ if( ++once == 1 )
+ log() << "replSet info you may need to run replSetInitiate -- rs.initiate() in the shell -- if that is not already done" << rsLog;
if( _seeds->size() == 0 )
log(1) << "replSet info no seed hosts were specified on the --replSet command line" << rsLog;
}
@@ -483,13 +581,13 @@ namespace mongo {
continue;
}
- if( !_loadConfigFinish(configs) ) {
+ if( !_loadConfigFinish(configs) ) {
log() << "replSet info Couldn't load config yet. Sleeping 20sec and will try again." << rsLog;
sleepsecs(20);
continue;
}
}
- catch(DBException& e) {
+ catch(DBException& e) {
startupStatus = BADCONFIG;
startupStatusMsg = "replSet error loading set config (BADCONFIG)";
log() << "replSet error loading configurations " << e.toString() << rsLog;
@@ -504,30 +602,34 @@ namespace mongo {
startupStatus = STARTED;
}
- void ReplSetImpl::_fatal()
- {
+ void ReplSetImpl::_fatal() {
//lock l(this);
box.set(MemberState::RS_FATAL, 0);
//sethbmsg("fatal error");
- log() << "replSet error fatal, stopping replication" << rsLog;
+ log() << "replSet error fatal, stopping replication" << rsLog;
}
- void ReplSet::haveNewConfig(ReplSetConfig& newConfig, bool addComment) {
+ void ReplSet::haveNewConfig(ReplSetConfig& newConfig, bool addComment) {
lock l(this); // convention is to lock replset before taking the db rwlock
writelock lk("");
bo comment;
if( addComment )
comment = BSON( "msg" << "Reconfig set" << "version" << newConfig.version );
newConfig.saveConfigLocally(comment);
- try {
+ try {
initFromConfig(newConfig, true);
log() << "replSet replSetReconfig new config saved locally" << rsLog;
}
- catch(DBException& e) {
+ catch(DBException& e) {
+ if( e.getCode() == 13497 /* removed from set */ ) {
+ cc().shutdown();
+ dbexit( EXIT_CLEAN , "removed from replica set" ); // never returns
+ assert(0);
+ }
log() << "replSet error unexpected exception in haveNewConfig() : " << e.toString() << rsLog;
_fatal();
}
- catch(...) {
+ catch(...) {
log() << "replSet error unexpected exception in haveNewConfig()" << rsLog;
_fatal();
}
@@ -538,30 +640,33 @@ namespace mongo {
ReplSetConfig c(o);
if( c.version > rs->config().version )
theReplSet->haveNewConfig(c, false);
- else {
- log() << "replSet info msgReceivedNewConfig but version isn't higher " <<
- c.version << ' ' << rs->config().version << rsLog;
+ else {
+ log() << "replSet info msgReceivedNewConfig but version isn't higher " <<
+ c.version << ' ' << rs->config().version << rsLog;
}
}
- /* forked as a thread during startup
- it can run quite a while looking for config. but once found,
+ /* forked as a thread during startup
+ it can run quite a while looking for config. but once found,
a separate thread takes over as ReplSetImpl::Manager, and this thread
terminates.
*/
void startReplSets(ReplSetCmdline *replSetCmdline) {
Client::initThread("startReplSets");
- try {
+ try {
assert( theReplSet == 0 );
if( replSetCmdline == 0 ) {
assert(!replSet);
return;
}
+ if( !noauth ) {
+ cc().getAuthenticationInfo()->authorize("local");
+ }
(theReplSet = new ReplSet(*replSetCmdline))->go();
}
- catch(std::exception& e) {
+ catch(std::exception& e) {
log() << "replSet caught exception in startReplSets thread: " << e.what() << rsLog;
- if( theReplSet )
+ if( theReplSet )
theReplSet->fatal();
}
cc().shutdown();
@@ -569,10 +674,9 @@ namespace mongo {
}
-namespace boost {
+namespace boost {
- void assertion_failed(char const * expr, char const * function, char const * file, long line)
- {
+ void assertion_failed(char const * expr, char const * function, char const * file, long line) {
mongo::log() << "boost assertion failure " << expr << ' ' << function << ' ' << file << ' ' << line << endl;
}