diff options
Diffstat (limited to 'db/repl/rs.h')
-rw-r--r-- | db/repl/rs.h | 256 |
1 files changed, 220 insertions, 36 deletions
diff --git a/db/repl/rs.h b/db/repl/rs.h index ea9aef1..61041a6 100644 --- a/db/repl/rs.h +++ b/db/repl/rs.h @@ -21,13 +21,26 @@ #include "../../util/concurrency/list.h" #include "../../util/concurrency/value.h" #include "../../util/concurrency/msg.h" -#include "../../util/hostandport.h" +#include "../../util/net/hostandport.h" #include "../commands.h" +#include "../oplogreader.h" #include "rs_exception.h" #include "rs_optime.h" #include "rs_member.h" #include "rs_config.h" +/** + * Order of Events + * + * On startup, if the --replSet option is present, startReplSets is called. + * startReplSets forks off a new thread for replica set activities. It creates + * the global theReplSet variable and calls go() on it. + * + * theReplSet's constructor changes the replica set's state to RS_STARTUP, + * starts the replica set manager, and loads the config (if the replica set + * has been initialized). + */ + namespace mongo { struct HowToFixUp; @@ -41,11 +54,15 @@ namespace mongo { /* member of a replica set */ class Member : public List1<Member>::Base { + private: + ~Member(); // intentionally unimplemented as should never be called -- see List1<>::Base. + Member(const Member&); public: - Member(HostAndPort h, unsigned ord, const ReplSetConfig::MemberCfg *c, bool self); + Member(HostAndPort h, unsigned ord, ReplSetConfig::MemberCfg *c, bool self); string fullName() const { return h().toString(); } const ReplSetConfig::MemberCfg& config() const { return _config; } + ReplSetConfig::MemberCfg& configw() { return _config; } const HeartbeatInfo& hbinfo() const { return _hbinfo; } HeartbeatInfo& get_hbinfo() { return _hbinfo; } string lhb() const { return _hbinfo.lastHeartbeatMsg; } @@ -58,7 +75,7 @@ namespace mongo { private: friend class ReplSetImpl; - const ReplSetConfig::MemberCfg _config; + ReplSetConfig::MemberCfg _config; const HostAndPort _h; HeartbeatInfo _hbinfo; }; @@ -75,6 +92,7 @@ namespace mongo { const Member* findOtherPrimary(bool& two); void noteARemoteIsPrimary(const Member *); + void checkElectableSet(); virtual void starting(); public: Manager(ReplSetImpl *rs); @@ -83,6 +101,47 @@ namespace mongo { void msgCheckNewState(); }; + class GhostSync : public task::Server { + struct GhostSlave { + GhostSlave() : last(0), slave(0), init(false) {} + OplogReader reader; + OpTime last; + Member* slave; + bool init; + }; + /** + * This is a cache of ghost slaves + */ + typedef map<mongo::OID,GhostSlave> MAP; + MAP _ghostCache; + RWLock _lock; // protects _ghostCache + ReplSetImpl *rs; + virtual void starting(); + public: + GhostSync(ReplSetImpl *_rs) : task::Server("rsGhostSync"), _lock("GhostSync"), rs(_rs) {} + ~GhostSync() { + log() << "~GhostSync() called" << rsLog; + } + + /** + * Replica sets can sync in a hierarchical fashion, which throws off w + * calculation on the master. percolate() faux-syncs from an upstream + * node so that the primary will know what the slaves are up to. + * + * We can't just directly sync to the primary because it could be + * unreachable, e.g., S1--->S2--->S3--->P. S2 should ghost sync from S3 + * and S3 can ghost sync from the primary. + * + * Say we have an S1--->S2--->P situation and this node is S2. rid + * would refer to S1. S2 would create a ghost slave of S1 and connect + * it to P (_currentSyncTarget). Then it would use this connection to + * pretend to be S1, replicating off of P. + */ + void percolate(const BSONObj& rid, const OpTime& last); + void associateSlave(const BSONObj& rid, const int memberId); + void updateSlave(const mongo::OID& id, const OpTime& last); + }; + struct Target; class Consensus { @@ -92,7 +151,8 @@ namespace mongo { time_t when; unsigned who; }; - Atomic<LastYea> ly; + static mutex lyMutex; + Guarded<LastYea,lyMutex> ly; unsigned yea(unsigned memberId); // throws VoteException void electionFailed(unsigned meid); void _electSelf(); @@ -117,7 +177,12 @@ namespace mongo { void multiCommand(BSONObj cmd, list<Target>& L); }; - /** most operations on a ReplSet object should be done while locked. that logic implemented here. */ + /** + * most operations on a ReplSet object should be done while locked. that + * logic implemented here. + * + * Order of locking: lock the replica set, then take a rwlock. + */ class RSBase : boost::noncopyable { public: const unsigned magic; @@ -133,6 +198,7 @@ namespace mongo { log() << "replSet ~RSBase called" << rsLog; } + public: class lock { RSBase& rsbase; auto_ptr<scoped_lock> sl; @@ -156,7 +222,6 @@ namespace mongo { } }; - public: /* for asserts */ bool locked() const { return _locked != 0; } @@ -178,13 +243,19 @@ namespace mongo { const Member *primary; }; const SP get() { - scoped_lock lk(m); + rwlock lk(m, false); return sp; } - MemberState getState() const { return sp.state; } - const Member* getPrimary() const { return sp.primary; } + MemberState getState() const { + rwlock lk(m, false); + return sp.state; + } + const Member* getPrimary() const { + rwlock lk(m, false); + return sp.primary; + } void change(MemberState s, const Member *self) { - scoped_lock lk(m); + rwlock lk(m, true); if( sp.state != s ) { log() << "replSet " << s.toString() << rsLog; } @@ -198,24 +269,25 @@ namespace mongo { } } void set(MemberState s, const Member *p) { - scoped_lock lk(m); - sp.state = s; sp.primary = p; + rwlock lk(m, true); + sp.state = s; + sp.primary = p; } void setSelfPrimary(const Member *self) { change(MemberState::RS_PRIMARY, self); } void setOtherPrimary(const Member *mem) { - scoped_lock lk(m); + rwlock lk(m, true); assert( !sp.state.primary() ); sp.primary = mem; } void noteRemoteIsPrimary(const Member *remote) { - scoped_lock lk(m); + rwlock lk(m, true); if( !sp.state.secondary() && !sp.state.fatal() ) sp.state = MemberState::RS_RECOVERING; sp.primary = remote; } StateBox() : m("StateBox") { } private: - mongo::mutex m; + RWLock m; SP sp; }; @@ -267,10 +339,17 @@ namespace mongo { bool _freeze(int secs); private: void assumePrimary(); - void loadLastOpTimeWritten(); + void loadLastOpTimeWritten(bool quiet=false); void changeState(MemberState s); + + /** + * Find the closest member (using ping time) with a higher latest optime. + */ const Member* getMemberToSyncTo(); - void _changeArbiterState(); + Member* _currentSyncTarget; + + // set of electable members' _ids + set<unsigned> _electableSet; protected: // "heartbeat message" // sent in requestHeartbeat respond in field "hbm" @@ -278,8 +357,54 @@ namespace mongo { time_t _hbmsgTime; // when it was logged public: void sethbmsg(string s, int logLevel = 0); + + /** + * Election with Priorities + * + * Each node (n) keeps a set of nodes that could be elected primary. + * Each node in this set: + * + * 1. can connect to a majority of the set + * 2. has a priority greater than 0 + * 3. has an optime within 10 seconds of the most up-to-date node + * that n can reach + * + * If a node fails to meet one or more of these criteria, it is removed + * from the list. This list is updated whenever the node receives a + * heartbeat. + * + * When a node sends an "am I freshest?" query, the node receiving the + * query checks their electable list to make sure that no one else is + * electable AND higher priority. If this check passes, the node will + * return an "ok" response, if not, it will veto. + * + * If a node is primary and there is another node with higher priority + * on the electable list (i.e., it must be synced to within 10 seconds + * of the current primary), the node (or nodes) with connections to both + * the primary and the secondary with higher priority will issue + * replSetStepDown requests to the primary to allow the higher-priority + * node to take over. + */ + void addToElectable(const unsigned m) { lock lk(this); _electableSet.insert(m); } + void rmFromElectable(const unsigned m) { lock lk(this); _electableSet.erase(m); } + bool iAmElectable() { lock lk(this); return _electableSet.find(_self->id()) != _electableSet.end(); } + bool isElectable(const unsigned id) { lock lk(this); return _electableSet.find(id) != _electableSet.end(); } + Member* getMostElectable(); protected: - bool initFromConfig(ReplSetConfig& c, bool reconf=false); // true if ok; throws if config really bad; false if config doesn't include self + /** + * Load a new config as the replica set's main config. + * + * If there is a "simple" change (just adding a node), this shortcuts + * the config. Returns true if the config was changed. Returns false + * if the config doesn't include a this node. Throws an exception if + * something goes very wrong. + * + * Behavior to note: + * - locks this + * - intentionally leaks the old _cfg and any old _members (if the + * change isn't strictly additive) + */ + bool initFromConfig(ReplSetConfig& c, bool reconf=false); void _fillIsMaster(BSONObjBuilder&); void _fillIsMasterHost(const Member*, vector<string>&, vector<string>&, vector<string>&); const ReplSetConfig& config() { return *_cfg; } @@ -301,27 +426,48 @@ namespace mongo { const vector<HostAndPort> *_seeds; ReplSetConfig *_cfg; - /** load our configuration from admin.replset. try seed machines too. - @return true if ok; throws if config really bad; false if config doesn't include self - */ + /** + * Finds the configuration with the highest version number and attempts + * load it. + */ bool _loadConfigFinish(vector<ReplSetConfig>& v); + /** + * Gather all possible configs (from command line seeds, our own config + * doc, and any hosts listed therein) and try to initiate from the most + * recent config we find. + */ void loadConfig(); list<HostAndPort> memberHostnames() const; - const ReplSetConfig::MemberCfg& myConfig() const { return _self->config(); } + const ReplSetConfig::MemberCfg& myConfig() const { return _config; } bool iAmArbiterOnly() const { return myConfig().arbiterOnly; } - bool iAmPotentiallyHot() const { return myConfig().potentiallyHot(); } + bool iAmPotentiallyHot() const { + return myConfig().potentiallyHot() && // not an arbiter + elect.steppedDown <= time(0) && // not stepped down/frozen + state() == MemberState::RS_SECONDARY; // not stale + } protected: Member *_self; bool _buildIndexes; // = _self->config().buildIndexes void setSelfTo(Member *); // use this as it sets buildIndexes var private: - List1<Member> _members; /* all members of the set EXCEPT self. */ + List1<Member> _members; // all members of the set EXCEPT _self. + ReplSetConfig::MemberCfg _config; // config of _self + unsigned _id; // _id of _self + int _maintenanceMode; // if we should stay in recovering state public: - unsigned selfId() const { return _self->id(); } + // this is called from within a writelock in logOpRS + unsigned selfId() const { return _id; } Manager *mgr; - + GhostSync *ghost; + /** + * This forces a secondary to go into recovering state and stay there + * until this is called again, passing in "false". Multiple threads can + * call this and it will leave maintenance mode once all of the callers + * have called it again, passing in false. + */ + void setMaintenanceMode(const bool inc); private: Member* head() const { return _members.head(); } public: @@ -334,6 +480,7 @@ namespace mongo { friend class CmdReplSetElect; friend class Member; friend class Manager; + friend class GhostSync; friend class Consensus; private: @@ -352,6 +499,7 @@ namespace mongo { bool _isStale(OplogReader& r, const string& hn); public: void syncThread(); + const OpTime lastOtherOpTime() const; }; class ReplSet : public ReplSetImpl { @@ -365,7 +513,7 @@ namespace mongo { bool freeze(int secs) { return _freeze(secs); } string selfFullName() { - lock lk(this); + assert( _self ); return _self->fullName(); } @@ -385,12 +533,20 @@ namespace mongo { void summarizeStatus(BSONObjBuilder& b) const { _summarizeStatus(b); } void fillIsMaster(BSONObjBuilder& b) { _fillIsMaster(b); } - /* we have a new config (reconfig) - apply it. - @param comment write a no-op comment to the oplog about it. only makes sense if one is primary and initiating the reconf. - */ + /** + * We have a new config (reconfig) - apply it. + * @param comment write a no-op comment to the oplog about it. only + * makes sense if one is primary and initiating the reconf. + * + * The slaves are updated when they get a heartbeat indicating the new + * config. The comment is a no-op. + */ void haveNewConfig(ReplSetConfig& c, bool comment); - /* if we delete old configs, this needs to assure locking. currently we don't so it is ok. */ + /** + * Pointer assignment isn't necessarily atomic, so this needs to assure + * locking, even though we don't delete old configs. + */ const ReplSetConfig& getConfig() { return config(); } bool lockedByMe() { return RSBase::lockedByMe(); } @@ -402,9 +558,10 @@ namespace mongo { } }; - /** base class for repl set commands. checks basic things such as in rs mode before the command - does its real work - */ + /** + * Base class for repl set commands. Checks basic things such if we're in + * rs mode before the command does its real work. + */ class ReplSetCommand : public Command { protected: ReplSetCommand(const char * s, bool show=false) : Command(s, show) { } @@ -413,26 +570,53 @@ namespace mongo { virtual bool logTheOp() { return false; } virtual LockType locktype() const { return NONE; } virtual void help( stringstream &help ) const { help << "internal"; } + + /** + * Some replica set commands call this and then call check(). This is + * intentional, as they might do things before theReplSet is initialized + * that still need to be checked for auth. + */ + bool checkAuth(string& errmsg, BSONObjBuilder& result) { + if( !noauth && adminOnly() ) { + AuthenticationInfo *ai = cc().getAuthenticationInfo(); + if (!ai->isAuthorizedForLock("admin", locktype())) { + errmsg = "replSet command unauthorized"; + return false; + } + } + return true; + } + bool check(string& errmsg, BSONObjBuilder& result) { if( !replSet ) { errmsg = "not running with --replSet"; return false; } + if( theReplSet == 0 ) { result.append("startupStatus", ReplSet::startupStatus); + string s; errmsg = ReplSet::startupStatusMsg.empty() ? "replset unknown error 2" : ReplSet::startupStatusMsg.get(); if( ReplSet::startupStatus == 3 ) result.append("info", "run rs.initiate(...) if not yet done for the set"); return false; } - return true; + + return checkAuth(errmsg, result); } }; + /** + * does local authentication + * directly authorizes against AuthenticationInfo + */ + void replLocalAuth(); + /** inlines ----------------- */ - inline Member::Member(HostAndPort h, unsigned ord, const ReplSetConfig::MemberCfg *c, bool self) : + inline Member::Member(HostAndPort h, unsigned ord, ReplSetConfig::MemberCfg *c, bool self) : _config(*c), _h(h), _hbinfo(ord) { + assert(c); if( self ) _hbinfo.health = 1.0; } |