summaryrefslogtreecommitdiff
path: root/db/repl/rs.h
diff options
context:
space:
mode:
Diffstat (limited to 'db/repl/rs.h')
-rw-r--r--db/repl/rs.h256
1 files changed, 220 insertions, 36 deletions
diff --git a/db/repl/rs.h b/db/repl/rs.h
index ea9aef1..61041a6 100644
--- a/db/repl/rs.h
+++ b/db/repl/rs.h
@@ -21,13 +21,26 @@
#include "../../util/concurrency/list.h"
#include "../../util/concurrency/value.h"
#include "../../util/concurrency/msg.h"
-#include "../../util/hostandport.h"
+#include "../../util/net/hostandport.h"
#include "../commands.h"
+#include "../oplogreader.h"
#include "rs_exception.h"
#include "rs_optime.h"
#include "rs_member.h"
#include "rs_config.h"
+/**
+ * Order of Events
+ *
+ * On startup, if the --replSet option is present, startReplSets is called.
+ * startReplSets forks off a new thread for replica set activities. It creates
+ * the global theReplSet variable and calls go() on it.
+ *
+ * theReplSet's constructor changes the replica set's state to RS_STARTUP,
+ * starts the replica set manager, and loads the config (if the replica set
+ * has been initialized).
+ */
+
namespace mongo {
struct HowToFixUp;
@@ -41,11 +54,15 @@ namespace mongo {
/* member of a replica set */
class Member : public List1<Member>::Base {
+ private:
+ ~Member(); // intentionally unimplemented as should never be called -- see List1<>::Base.
+ Member(const Member&);
public:
- Member(HostAndPort h, unsigned ord, const ReplSetConfig::MemberCfg *c, bool self);
+ Member(HostAndPort h, unsigned ord, ReplSetConfig::MemberCfg *c, bool self);
string fullName() const { return h().toString(); }
const ReplSetConfig::MemberCfg& config() const { return _config; }
+ ReplSetConfig::MemberCfg& configw() { return _config; }
const HeartbeatInfo& hbinfo() const { return _hbinfo; }
HeartbeatInfo& get_hbinfo() { return _hbinfo; }
string lhb() const { return _hbinfo.lastHeartbeatMsg; }
@@ -58,7 +75,7 @@ namespace mongo {
private:
friend class ReplSetImpl;
- const ReplSetConfig::MemberCfg _config;
+ ReplSetConfig::MemberCfg _config;
const HostAndPort _h;
HeartbeatInfo _hbinfo;
};
@@ -75,6 +92,7 @@ namespace mongo {
const Member* findOtherPrimary(bool& two);
void noteARemoteIsPrimary(const Member *);
+ void checkElectableSet();
virtual void starting();
public:
Manager(ReplSetImpl *rs);
@@ -83,6 +101,47 @@ namespace mongo {
void msgCheckNewState();
};
+ class GhostSync : public task::Server {
+ struct GhostSlave {
+ GhostSlave() : last(0), slave(0), init(false) {}
+ OplogReader reader;
+ OpTime last;
+ Member* slave;
+ bool init;
+ };
+ /**
+ * This is a cache of ghost slaves
+ */
+ typedef map<mongo::OID,GhostSlave> MAP;
+ MAP _ghostCache;
+ RWLock _lock; // protects _ghostCache
+ ReplSetImpl *rs;
+ virtual void starting();
+ public:
+ GhostSync(ReplSetImpl *_rs) : task::Server("rsGhostSync"), _lock("GhostSync"), rs(_rs) {}
+ ~GhostSync() {
+ log() << "~GhostSync() called" << rsLog;
+ }
+
+ /**
+ * Replica sets can sync in a hierarchical fashion, which throws off w
+ * calculation on the master. percolate() faux-syncs from an upstream
+ * node so that the primary will know what the slaves are up to.
+ *
+ * We can't just directly sync to the primary because it could be
+ * unreachable, e.g., S1--->S2--->S3--->P. S2 should ghost sync from S3
+ * and S3 can ghost sync from the primary.
+ *
+ * Say we have an S1--->S2--->P situation and this node is S2. rid
+ * would refer to S1. S2 would create a ghost slave of S1 and connect
+ * it to P (_currentSyncTarget). Then it would use this connection to
+ * pretend to be S1, replicating off of P.
+ */
+ void percolate(const BSONObj& rid, const OpTime& last);
+ void associateSlave(const BSONObj& rid, const int memberId);
+ void updateSlave(const mongo::OID& id, const OpTime& last);
+ };
+
struct Target;
class Consensus {
@@ -92,7 +151,8 @@ namespace mongo {
time_t when;
unsigned who;
};
- Atomic<LastYea> ly;
+ static mutex lyMutex;
+ Guarded<LastYea,lyMutex> ly;
unsigned yea(unsigned memberId); // throws VoteException
void electionFailed(unsigned meid);
void _electSelf();
@@ -117,7 +177,12 @@ namespace mongo {
void multiCommand(BSONObj cmd, list<Target>& L);
};
- /** most operations on a ReplSet object should be done while locked. that logic implemented here. */
+ /**
+ * Most operations on a ReplSet object should be done while locked; that
+ * logic is implemented here.
+ *
+ * Order of locking: lock the replica set, then take a rwlock.
+ */
class RSBase : boost::noncopyable {
public:
const unsigned magic;
@@ -133,6 +198,7 @@ namespace mongo {
log() << "replSet ~RSBase called" << rsLog;
}
+ public:
class lock {
RSBase& rsbase;
auto_ptr<scoped_lock> sl;
@@ -156,7 +222,6 @@ namespace mongo {
}
};
- public:
/* for asserts */
bool locked() const { return _locked != 0; }
@@ -178,13 +243,19 @@ namespace mongo {
const Member *primary;
};
const SP get() {
- scoped_lock lk(m);
+ rwlock lk(m, false);
return sp;
}
- MemberState getState() const { return sp.state; }
- const Member* getPrimary() const { return sp.primary; }
+ MemberState getState() const {
+ rwlock lk(m, false);
+ return sp.state;
+ }
+ const Member* getPrimary() const {
+ rwlock lk(m, false);
+ return sp.primary;
+ }
void change(MemberState s, const Member *self) {
- scoped_lock lk(m);
+ rwlock lk(m, true);
if( sp.state != s ) {
log() << "replSet " << s.toString() << rsLog;
}
@@ -198,24 +269,25 @@ namespace mongo {
}
}
void set(MemberState s, const Member *p) {
- scoped_lock lk(m);
- sp.state = s; sp.primary = p;
+ rwlock lk(m, true);
+ sp.state = s;
+ sp.primary = p;
}
void setSelfPrimary(const Member *self) { change(MemberState::RS_PRIMARY, self); }
void setOtherPrimary(const Member *mem) {
- scoped_lock lk(m);
+ rwlock lk(m, true);
assert( !sp.state.primary() );
sp.primary = mem;
}
void noteRemoteIsPrimary(const Member *remote) {
- scoped_lock lk(m);
+ rwlock lk(m, true);
if( !sp.state.secondary() && !sp.state.fatal() )
sp.state = MemberState::RS_RECOVERING;
sp.primary = remote;
}
StateBox() : m("StateBox") { }
private:
- mongo::mutex m;
+ RWLock m;
SP sp;
};
@@ -267,10 +339,17 @@ namespace mongo {
bool _freeze(int secs);
private:
void assumePrimary();
- void loadLastOpTimeWritten();
+ void loadLastOpTimeWritten(bool quiet=false);
void changeState(MemberState s);
+
+ /**
+ * Find the closest member (using ping time) with a higher latest optime.
+ */
const Member* getMemberToSyncTo();
- void _changeArbiterState();
+ Member* _currentSyncTarget;
+
+ // set of electable members' _ids
+ set<unsigned> _electableSet;
protected:
// "heartbeat message"
// sent in requestHeartbeat respond in field "hbm"
@@ -278,8 +357,54 @@ namespace mongo {
time_t _hbmsgTime; // when it was logged
public:
void sethbmsg(string s, int logLevel = 0);
+
+ /**
+ * Election with Priorities
+ *
+ * Each node (n) keeps a set of nodes that could be elected primary.
+ * Each node in this set:
+ *
+ * 1. can connect to a majority of the set
+ * 2. has a priority greater than 0
+ * 3. has an optime within 10 seconds of the most up-to-date node
+ * that n can reach
+ *
+ * If a node fails to meet one or more of these criteria, it is removed
+ * from the list. This list is updated whenever the node receives a
+ * heartbeat.
+ *
+ * When a node sends an "am I freshest?" query, the node receiving the
+ * query checks its electable list to make sure that no one else is
+ * electable AND higher priority. If this check passes, the node will
+ * return an "ok" response; if not, it will veto.
+ *
+ * If a node is primary and there is another node with higher priority
+ * on the electable list (i.e., it must be synced to within 10 seconds
+ * of the current primary), the node (or nodes) with connections to both
+ * the primary and the secondary with higher priority will issue
+ * replSetStepDown requests to the primary to allow the higher-priority
+ * node to take over.
+ */
+ void addToElectable(const unsigned m) { lock lk(this); _electableSet.insert(m); }
+ void rmFromElectable(const unsigned m) { lock lk(this); _electableSet.erase(m); }
+ bool iAmElectable() { lock lk(this); return _electableSet.find(_self->id()) != _electableSet.end(); }
+ bool isElectable(const unsigned id) { lock lk(this); return _electableSet.find(id) != _electableSet.end(); }
+ Member* getMostElectable();
protected:
- bool initFromConfig(ReplSetConfig& c, bool reconf=false); // true if ok; throws if config really bad; false if config doesn't include self
+ /**
+ * Load a new config as the replica set's main config.
+ *
+ * If there is a "simple" change (just adding a node), this shortcuts
+ * the config. Returns true if the config was changed. Returns false
+ * if the config doesn't include this node. Throws an exception if
+ * something goes very wrong.
+ *
+ * Behavior to note:
+ * - locks this
+ * - intentionally leaks the old _cfg and any old _members (if the
+ * change isn't strictly additive)
+ */
+ bool initFromConfig(ReplSetConfig& c, bool reconf=false);
void _fillIsMaster(BSONObjBuilder&);
void _fillIsMasterHost(const Member*, vector<string>&, vector<string>&, vector<string>&);
const ReplSetConfig& config() { return *_cfg; }
@@ -301,27 +426,48 @@ namespace mongo {
const vector<HostAndPort> *_seeds;
ReplSetConfig *_cfg;
- /** load our configuration from admin.replset. try seed machines too.
- @return true if ok; throws if config really bad; false if config doesn't include self
- */
+ /**
+ * Finds the configuration with the highest version number and attempts
+ * to load it.
+ */
bool _loadConfigFinish(vector<ReplSetConfig>& v);
+ /**
+ * Gather all possible configs (from command line seeds, our own config
+ * doc, and any hosts listed therein) and try to initiate from the most
+ * recent config we find.
+ */
void loadConfig();
list<HostAndPort> memberHostnames() const;
- const ReplSetConfig::MemberCfg& myConfig() const { return _self->config(); }
+ const ReplSetConfig::MemberCfg& myConfig() const { return _config; }
bool iAmArbiterOnly() const { return myConfig().arbiterOnly; }
- bool iAmPotentiallyHot() const { return myConfig().potentiallyHot(); }
+ bool iAmPotentiallyHot() const {
+ return myConfig().potentiallyHot() && // not an arbiter
+ elect.steppedDown <= time(0) && // not stepped down/frozen
+ state() == MemberState::RS_SECONDARY; // not stale
+ }
protected:
Member *_self;
bool _buildIndexes; // = _self->config().buildIndexes
void setSelfTo(Member *); // use this as it sets buildIndexes var
private:
- List1<Member> _members; /* all members of the set EXCEPT self. */
+ List1<Member> _members; // all members of the set EXCEPT _self.
+ ReplSetConfig::MemberCfg _config; // config of _self
+ unsigned _id; // _id of _self
+ int _maintenanceMode; // if we should stay in recovering state
public:
- unsigned selfId() const { return _self->id(); }
+ // this is called from within a writelock in logOpRS
+ unsigned selfId() const { return _id; }
Manager *mgr;
-
+ GhostSync *ghost;
+ /**
+ * This forces a secondary to go into recovering state and stay there
+ * until this is called again, passing in "false". Multiple threads can
+ * call this and it will leave maintenance mode once all of the callers
+ * have called it again, passing in false.
+ */
+ void setMaintenanceMode(const bool inc);
private:
Member* head() const { return _members.head(); }
public:
@@ -334,6 +480,7 @@ namespace mongo {
friend class CmdReplSetElect;
friend class Member;
friend class Manager;
+ friend class GhostSync;
friend class Consensus;
private:
@@ -352,6 +499,7 @@ namespace mongo {
bool _isStale(OplogReader& r, const string& hn);
public:
void syncThread();
+ const OpTime lastOtherOpTime() const;
};
class ReplSet : public ReplSetImpl {
@@ -365,7 +513,7 @@ namespace mongo {
bool freeze(int secs) { return _freeze(secs); }
string selfFullName() {
- lock lk(this);
+ assert( _self );
return _self->fullName();
}
@@ -385,12 +533,20 @@ namespace mongo {
void summarizeStatus(BSONObjBuilder& b) const { _summarizeStatus(b); }
void fillIsMaster(BSONObjBuilder& b) { _fillIsMaster(b); }
- /* we have a new config (reconfig) - apply it.
- @param comment write a no-op comment to the oplog about it. only makes sense if one is primary and initiating the reconf.
- */
+ /**
+ * We have a new config (reconfig) - apply it.
+ * @param comment write a no-op comment to the oplog about it. only
+ * makes sense if one is primary and initiating the reconf.
+ *
+ * The slaves are updated when they get a heartbeat indicating the new
+ * config. The comment is a no-op.
+ */
void haveNewConfig(ReplSetConfig& c, bool comment);
- /* if we delete old configs, this needs to assure locking. currently we don't so it is ok. */
+ /**
+ * Pointer assignment isn't necessarily atomic, so this needs to ensure
+ * locking, even though we don't delete old configs.
+ */
const ReplSetConfig& getConfig() { return config(); }
bool lockedByMe() { return RSBase::lockedByMe(); }
@@ -402,9 +558,10 @@ namespace mongo {
}
};
- /** base class for repl set commands. checks basic things such as in rs mode before the command
- does its real work
- */
+ /**
+ * Base class for repl set commands. Checks basic things, such as whether
+ * we're in rs mode, before the command does its real work.
+ */
class ReplSetCommand : public Command {
protected:
ReplSetCommand(const char * s, bool show=false) : Command(s, show) { }
@@ -413,26 +570,53 @@ namespace mongo {
virtual bool logTheOp() { return false; }
virtual LockType locktype() const { return NONE; }
virtual void help( stringstream &help ) const { help << "internal"; }
+
+ /**
+ * Some replica set commands call this and then call check(). This is
+ * intentional, as they might do things before theReplSet is initialized
+ * that still need to be checked for auth.
+ */
+ bool checkAuth(string& errmsg, BSONObjBuilder& result) {
+ if( !noauth && adminOnly() ) {
+ AuthenticationInfo *ai = cc().getAuthenticationInfo();
+ if (!ai->isAuthorizedForLock("admin", locktype())) {
+ errmsg = "replSet command unauthorized";
+ return false;
+ }
+ }
+ return true;
+ }
+
bool check(string& errmsg, BSONObjBuilder& result) {
if( !replSet ) {
errmsg = "not running with --replSet";
return false;
}
+
if( theReplSet == 0 ) {
result.append("startupStatus", ReplSet::startupStatus);
+ string s;
errmsg = ReplSet::startupStatusMsg.empty() ? "replset unknown error 2" : ReplSet::startupStatusMsg.get();
if( ReplSet::startupStatus == 3 )
result.append("info", "run rs.initiate(...) if not yet done for the set");
return false;
}
- return true;
+
+ return checkAuth(errmsg, result);
}
};
+ /**
+ * does local authentication
+ * directly authorizes against AuthenticationInfo
+ */
+ void replLocalAuth();
+
/** inlines ----------------- */
- inline Member::Member(HostAndPort h, unsigned ord, const ReplSetConfig::MemberCfg *c, bool self) :
+ inline Member::Member(HostAndPort h, unsigned ord, ReplSetConfig::MemberCfg *c, bool self) :
_config(*c), _h(h), _hbinfo(ord) {
+ assert(c);
if( self )
_hbinfo.health = 1.0;
}