diff options
Diffstat (limited to 'db/repl/rs.cpp')
-rw-r--r-- | db/repl/rs.cpp | 270 |
1 files changed, 170 insertions, 100 deletions
diff --git a/db/repl/rs.cpp b/db/repl/rs.cpp index bbfb057..1fbbc10 100644 --- a/db/repl/rs.cpp +++ b/db/repl/rs.cpp @@ -16,7 +16,7 @@ #include "pch.h" #include "../cmdline.h" -#include "../../util/sock.h" +#include "../../util/net/sock.h" #include "../client.h" #include "../../client/dbclient.h" #include "../dbhelpers.h" @@ -24,14 +24,20 @@ #include "rs.h" #include "connections.h" #include "../repl.h" +#include "../instance.h" -namespace mongo { +using namespace std; +namespace mongo { + using namespace bson; bool replSet = false; ReplSet *theReplSet = 0; - extern string *discoveredSeed; + + bool isCurrentlyAReplSetPrimary() { + return theReplSet && theReplSet->isPrimary(); + } void ReplSetImpl::sethbmsg(string s, int logLevel) { static time_t lastLogged; @@ -57,21 +63,71 @@ namespace mongo { } void ReplSetImpl::assumePrimary() { + LOG(2) << "replSet assuming primary" << endl; assert( iAmPotentiallyHot() ); writelock lk("admin."); // so we are synchronized with _logOp() - box.setSelfPrimary(_self); - //log() << "replSet PRIMARY" << rsLog; // self (" << _self->id() << ") is now primary" << rsLog; + + // Make sure that new OpTimes are higher than existing ones even with clock skew + DBDirectClient c; + BSONObj lastOp = c.findOne( "local.oplog.rs", Query().sort(reverseNaturalObj), NULL, QueryOption_SlaveOk ); + if ( !lastOp.isEmpty() ) { + OpTime::setLast( lastOp[ "ts" ].date() ); + } + + changeState(MemberState::RS_PRIMARY); } void ReplSetImpl::changeState(MemberState s) { box.change(s, _self); } + void ReplSetImpl::setMaintenanceMode(const bool inc) { + lock lk(this); + + if (inc) { + log() << "replSet going into maintenance mode (" << _maintenanceMode << " other tasks)" << rsLog; + + _maintenanceMode++; + changeState(MemberState::RS_RECOVERING); + } + else { + _maintenanceMode--; + // no need to change state, syncTail will try to go live as a secondary soon + + log() << "leaving maintenance mode (" << _maintenanceMode << " other tasks)" << rsLog; + } + } + + Member* ReplSetImpl::getMostElectable() { + lock lk(this); + + Member *max = 0; + + for (set<unsigned>::iterator it = _electableSet.begin(); it != _electableSet.end(); it++) { + const Member *temp = findById(*it); + if (!temp) { + log() << "couldn't find member: " << *it << endl; + _electableSet.erase(*it); + continue; + } + if (!max || max->config().priority < temp->config().priority) { + max = (Member*)temp; + } + } + + return max; + } + const bool closeOnRelinquish = true; void ReplSetImpl::relinquish() { + LOG(2) << "replSet attempting to relinquish" << endl; if( box.getState().primary() ) { - log() << "replSet relinquishing primary state" << rsLog; - changeState(MemberState::RS_SECONDARY); - + { + writelock lk("admin."); // so we are synchronized with _logOp() + + log() << "replSet relinquishing primary state" << rsLog; + changeState(MemberState::RS_SECONDARY); + } + if( closeOnRelinquish ) { /* close sockets that were talking to us so they don't blithly send many writes that will fail with "not master" (of course client could check result code, but in case they are not) @@ -173,6 +229,8 @@ namespace mongo { } void ReplSetImpl::_fillIsMaster(BSONObjBuilder& b) { + lock lk(this); + const StateBox::SP sp = box.get(); bool isp = sp.state.primary(); b.append("setName", name()); @@ -203,9 +261,13 @@ namespace mongo { if( m ) b.append("primary", m->h().toString()); } + else { + b.append("primary", _self->fullName()); + } + if( myConfig().arbiterOnly ) b.append("arbiterOnly", true); - if( myConfig().priority == 0 ) + if( myConfig().priority == 0 && !myConfig().arbiterOnly) b.append("passive", true); if( myConfig().slaveDelay ) b.append("slaveDelay", myConfig().slaveDelay); @@ -213,6 +275,13 @@ namespace mongo { b.append("hidden", true); if( !myConfig().buildIndexes ) b.append("buildIndexes", false); + if( !myConfig().tags.empty() ) { + BSONObjBuilder a; + for( map<string,string>::const_iterator i = myConfig().tags.begin(); i != myConfig().tags.end(); i++ ) + a.append((*i).first, (*i).second); + b.append("tags", a.done()); + } + b.append("me", myConfig().h.toString()); } /** @param cfgString <setname>/<seedhost1>,<seedhost2> */ @@ -259,19 +328,22 @@ namespace mongo { } ReplSetImpl::ReplSetImpl(ReplSetCmdline& replSetCmdline) : elect(this), + _currentSyncTarget(0), + _hbmsgTime(0), _self(0), - mgr( new Manager(this) ) { + _maintenanceMode(0), + mgr( new Manager(this) ), + ghost( new GhostSync(this) ) { + _cfg = 0; memset(_hbmsg, 0, sizeof(_hbmsg)); - *_hbmsg = '.'; // temp...just to see + strcpy( _hbmsg , "initial startup" ); lastH = 0; changeState(MemberState::RS_STARTUP); _seeds = &replSetCmdline.seeds; - //for( vector<HostAndPort>::iterator i = seeds->begin(); i != seeds->end(); i++ ) - // addMemberIfMissing(*i); - log(1) << "replSet beginning startup..." << rsLog; + LOG(1) << "replSet beginning startup..." << rsLog; loadConfig(); @@ -282,7 +354,7 @@ namespace mongo { for( set<HostAndPort>::iterator i = replSetCmdline.seedSet.begin(); i != replSetCmdline.seedSet.end(); i++ ) { if( i->isSelf() ) { if( sss == 1 ) - log(1) << "replSet warning self is listed in the seed list and there are no other seeds listed did you intend that?" << rsLog; + LOG(1) << "replSet warning self is listed in the seed list and there are no other seeds listed did you intend that?" << rsLog; } else log() << "replSet warning command line seed " << i->toString() << " is not present in the current repl set config" << rsLog; @@ -291,14 +363,13 @@ namespace mongo { void newReplUp(); - void ReplSetImpl::loadLastOpTimeWritten() { - //assert( lastOpTimeWritten.isNull() ); + void ReplSetImpl::loadLastOpTimeWritten(bool quiet) { readlock lk(rsoplog); BSONObj o; if( Helpers::getLast(rsoplog, o) ) { lastH = o["h"].numberLong(); lastOpTimeWritten = o["ts"]._opTime(); - uassert(13290, "bad replSet oplog entry?", !lastOpTimeWritten.isNull()); + uassert(13290, "bad replSet oplog entry?", quiet || !lastOpTimeWritten.isNull()); } } @@ -326,7 +397,10 @@ namespace mongo { extern BSONObj *getLastErrorDefault; void ReplSetImpl::setSelfTo(Member *m) { + // already locked in initFromConfig _self = m; + _id = m->id(); + _config = m->config(); if( m ) _buildIndexes = m->config().buildIndexes; else _buildIndexes = true; } @@ -345,29 +419,32 @@ namespace mongo { getLastErrorDefault = new BSONObj( c.getLastErrorDefaults ); } - list<const ReplSetConfig::MemberCfg*> newOnes; + list<ReplSetConfig::MemberCfg*> newOnes; + // additive short-cuts the new config setup. If we are just adding a + // node/nodes and nothing else is changing, this is additive. If it's + // not a reconfig, we're not adding anything bool additive = reconf; { unsigned nfound = 0; int me = 0; for( vector<ReplSetConfig::MemberCfg>::iterator i = c.members.begin(); i != c.members.end(); i++ ) { - const ReplSetConfig::MemberCfg& m = *i; + + ReplSetConfig::MemberCfg& m = *i; if( m.h.isSelf() ) { - nfound++; me++; - if( !reconf || (_self && _self->id() == (unsigned) m._id) ) - ; - else { - log() << "replSet " << _self->id() << ' ' << m._id << rsLog; + } + + if( reconf ) { + if (m.h.isSelf() && (!_self || (int)_self->id() != m._id)) { + log() << "self doesn't match: " << m._id << rsLog; assert(false); } - } - else if( reconf ) { + const Member *old = findById(m._id); if( old ) { nfound++; assert( (int) old->id() == m._id ); - if( old->config() == m ) { + if( old->config() != m ) { additive = false; } } @@ -375,23 +452,21 @@ namespace mongo { newOnes.push_back(&m); } } - - // change timeout settings, if necessary - ScopedConn conn(m.h.toString()); - conn.setTimeout(c.ho.heartbeatTimeoutMillis/1000.0); } if( me == 0 ) { - // initial startup with fastsync - if (!reconf && replSettings.fastsync) { - return false; - } - // log() << "replSet config : " << _cfg->toString() << rsLog; + _members.orphanAll(); + // hbs must continue to pick up new config + // stop sync thread + box.set(MemberState::RS_STARTUP, 0); + + // go into holding pattern log() << "replSet error self not present in the repl set configuration:" << rsLog; log() << c.toString() << rsLog; - uasserted(13497, "replSet error self not present in the configuration"); + return false; } uassert( 13302, "replSet error self appears twice in the repl set configuration", me<=1 ); + // if we found different members that the original config, reload everything if( reconf && config().members.size() != nfound ) additive = false; } @@ -402,10 +477,11 @@ namespace mongo { _name = _cfg->_id; assert( !_name.empty() ); + // this is a shortcut for simple changes if( additive ) { log() << "replSet info : additive change to configuration" << rsLog; - for( list<const ReplSetConfig::MemberCfg*>::const_iterator i = newOnes.begin(); i != newOnes.end(); i++ ) { - const ReplSetConfig::MemberCfg* m = *i; + for( list<ReplSetConfig::MemberCfg*>::const_iterator i = newOnes.begin(); i != newOnes.end(); i++ ) { + ReplSetConfig::MemberCfg *m = *i; Member *mi = new Member(m->h, m->_id, m, false); /** we will indicate that new members are up() initially so that we don't relinquish our @@ -417,6 +493,11 @@ namespace mongo { _members.push(mi); startHealthTaskFor(mi); } + + // if we aren't creating new members, we may have to update the + // groups for the current ones + _cfg->updateMembers(_members); + return true; } @@ -433,21 +514,21 @@ namespace mongo { } forgetPrimary(); - bool iWasArbiterOnly = _self ? iAmArbiterOnly() : false; - setSelfTo(0); + // not setting _self to 0 as other threads use _self w/o locking + int me = 0; + + // For logging + string members = ""; + for( vector<ReplSetConfig::MemberCfg>::iterator i = _cfg->members.begin(); i != _cfg->members.end(); i++ ) { - const ReplSetConfig::MemberCfg& m = *i; + ReplSetConfig::MemberCfg& m = *i; Member *mi; + members += ( members == "" ? "" : ", " ) + m.h.toString(); if( m.h.isSelf() ) { - assert( _self == 0 ); + assert( me++ == 0 ); mi = new Member(m.h, m._id, &m, true); setSelfTo(mi); - // if the arbiter status changed - if (iWasArbiterOnly ^ iAmArbiterOnly()) { - _changeArbiterState(); - } - if( (int)mi->id() == oldPrimaryId ) box.setSelfPrimary(mi); } @@ -459,38 +540,12 @@ namespace mongo { box.setOtherPrimary(mi); } } - return true; - } - void startSyncThread(); - - void ReplSetImpl::_changeArbiterState() { - if (iAmArbiterOnly()) { - changeState(MemberState::RS_ARBITER); - - // if there is an oplog, free it - // not sure if this is necessary, maybe just leave the oplog and let - // the user delete it if they want the space? - writelock lk(rsoplog); - Client::Context c(rsoplog); - NamespaceDetails *d = nsdetails(rsoplog); - if (d) { - string errmsg; - bob res; - dropCollection(rsoplog, errmsg, res); - - // clear last op time to force initial sync (if the arbiter - // becomes a "normal" server again) - lastOpTimeWritten = OpTime(); - } + if( me == 0 ){ + log() << "replSet warning did not detect own host in full reconfig, members " << members << " config: " << c << rsLog; } - else { - changeState(MemberState::RS_RECOVERING); - // oplog will be allocated when sync begins - /* TODO : could this cause two sync threads to exist (race condition)? */ - boost::thread t(startSyncThread); - } + return true; } // Our own config must be the first one. @@ -514,7 +569,6 @@ namespace mongo { if( highest->version > myVersion && highest->version >= 0 ) { log() << "replSet got config version " << highest->version << " from a remote, saving locally" << rsLog; - writelock lk("admin."); highest->saveConfigLocally(BSONObj()); } return true; @@ -523,7 +577,7 @@ namespace mongo { void ReplSetImpl::loadConfig() { while( 1 ) { startupStatus = LOADINGCONFIG; - startupStatusMsg = "loading " + rsConfigNs + " config (LOADINGCONFIG)"; + startupStatusMsg.set("loading " + rsConfigNs + " config (LOADINGCONFIG)"); try { vector<ReplSetConfig> configs; try { @@ -531,7 +585,6 @@ namespace mongo { } catch(DBException& e) { log() << "replSet exception loading our local replset configuration object : " << e.toString() << rsLog; - throw; } for( vector<HostAndPort>::const_iterator i = _seeds->begin(); i != _seeds->end(); i++ ) { try { @@ -542,12 +595,25 @@ namespace mongo { } } - if( discoveredSeed ) { + if( replSettings.discoveredSeeds.size() > 0 ) { + for (set<string>::iterator i = replSettings.discoveredSeeds.begin(); i != replSettings.discoveredSeeds.end(); i++) { + try { + configs.push_back( ReplSetConfig(HostAndPort(*i)) ); + } + catch( DBException& ) { + log(1) << "replSet exception trying to load config from discovered seed " << *i << rsLog; + replSettings.discoveredSeeds.erase(*i); + } + } + } + + if (!replSettings.reconfig.isEmpty()) { try { - configs.push_back( ReplSetConfig(HostAndPort(*discoveredSeed)) ); + configs.push_back(ReplSetConfig(replSettings.reconfig, true)); } - catch( DBException& ) { - log(1) << "replSet exception trying to load config from discovered seed " << *discoveredSeed << rsLog; + catch( DBException& re) { + log() << "couldn't load reconfig: " << re.what() << endl; + replSettings.reconfig = BSONObj(); } } @@ -563,17 +629,17 @@ namespace mongo { if( nempty == (int) configs.size() ) { startupStatus = EMPTYCONFIG; - startupStatusMsg = "can't get " + rsConfigNs + " config from self or any seed (EMPTYCONFIG)"; + startupStatusMsg.set("can't get " + rsConfigNs + " config from self or any seed (EMPTYCONFIG)"); log() << "replSet can't get " << rsConfigNs << " config from self or any seed (EMPTYCONFIG)" << rsLog; static unsigned once; if( ++once == 1 ) log() << "replSet info you may need to run replSetInitiate -- rs.initiate() in the shell -- if that is not already done" << rsLog; if( _seeds->size() == 0 ) - log(1) << "replSet info no seed hosts were specified on the --replSet command line" << rsLog; + LOG(1) << "replSet info no seed hosts were specified on the --replSet command line" << rsLog; } else { startupStatus = EMPTYUNREACHABLE; - startupStatusMsg = "can't currently get " + rsConfigNs + " config from self or any seed (EMPTYUNREACHABLE)"; + startupStatusMsg.set("can't currently get " + rsConfigNs + " config from self or any seed (EMPTYUNREACHABLE)"); log() << "replSet can't get " << rsConfigNs << " config from self or any seed (yet)" << rsLog; } @@ -589,7 +655,7 @@ namespace mongo { } catch(DBException& e) { startupStatus = BADCONFIG; - startupStatusMsg = "replSet error loading set config (BADCONFIG)"; + startupStatusMsg.set("replSet error loading set config (BADCONFIG)"); log() << "replSet error loading configurations " << e.toString() << rsLog; log() << "replSet error replication will not start" << rsLog; sethbmsg("error loading set config"); @@ -598,27 +664,26 @@ namespace mongo { } break; } - startupStatusMsg = "? started"; + startupStatusMsg.set("? started"); startupStatus = STARTED; } void ReplSetImpl::_fatal() { - //lock l(this); box.set(MemberState::RS_FATAL, 0); - //sethbmsg("fatal error"); log() << "replSet error fatal, stopping replication" << rsLog; } void ReplSet::haveNewConfig(ReplSetConfig& newConfig, bool addComment) { - lock l(this); // convention is to lock replset before taking the db rwlock - writelock lk(""); bo comment; if( addComment ) comment = BSON( "msg" << "Reconfig set" << "version" << newConfig.version ); + newConfig.saveConfigLocally(comment); + try { - initFromConfig(newConfig, true); - log() << "replSet replSetReconfig new config saved locally" << rsLog; + if (initFromConfig(newConfig, true)) { + log() << "replSet replSetReconfig new config saved locally" << rsLog; + } } catch(DBException& e) { if( e.getCode() == 13497 /* removed from set */ ) { @@ -652,16 +717,14 @@ namespace mongo { terminates. */ void startReplSets(ReplSetCmdline *replSetCmdline) { - Client::initThread("startReplSets"); + Client::initThread("rsStart"); try { assert( theReplSet == 0 ); if( replSetCmdline == 0 ) { assert(!replSet); return; } - if( !noauth ) { - cc().getAuthenticationInfo()->authorize("local"); - } + replLocalAuth(); (theReplSet = new ReplSet(*replSetCmdline))->go(); } catch(std::exception& e) { @@ -672,6 +735,13 @@ namespace mongo { cc().shutdown(); } + void replLocalAuth() { + if ( noauth ) + return; + cc().getAuthenticationInfo()->authorize("local","_repl"); + } + + } namespace boost { |