path: root/db/repl/rs.h
diff options
Diffstat (limited to 'db/repl/rs.h')
1 files changed, 415 insertions, 0 deletions
diff --git a/db/repl/rs.h b/db/repl/rs.h
new file mode 100644
index 0000000..17a070c
--- /dev/null
+++ b/db/repl/rs.h
@@ -0,0 +1,415 @@
+// /db/repl/rs.h
+* Copyright (C) 2008 10gen Inc.
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* GNU Affero General Public License for more details.
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <>.
+#pragma once
+#include "../../util/concurrency/list.h"
+#include "../../util/concurrency/value.h"
+#include "../../util/concurrency/msg.h"
+#include "../../util/hostandport.h"
+#include "../commands.h"
+#include "rs_exception.h"
+#include "rs_optime.h"
+#include "rs_member.h"
+#include "rs_config.h"
+namespace mongo {
+ struct HowToFixUp;
+ struct Target;
+ class DBClientConnection;
+ class ReplSetImpl;
+ class OplogReader;
+ extern bool replSet; // true if using repl sets
+ extern class ReplSet *theReplSet; // null until initialized
+ extern Tee *rsLog;
+ /* member of a replica set */
+ class Member : public List1<Member>::Base {
+ public:
+ Member(HostAndPort h, unsigned ord, const ReplSetConfig::MemberCfg *c, bool self);
+ string fullName() const { return h().toString(); }
+ const ReplSetConfig::MemberCfg& config() const { return *_config; }
+ const HeartbeatInfo& hbinfo() const { return _hbinfo; }
+ string lhb() { return _hbinfo.lastHeartbeatMsg; }
+ MemberState state() const { return _hbinfo.hbstate; }
+ const HostAndPort& h() const { return _h; }
+ unsigned id() const { return; }
+ bool potentiallyHot() const { return _config->potentiallyHot(); } // not arbiter, not priority 0
+ void summarizeMember(stringstream& s) const;
+ friend class ReplSetImpl;
+ private:
+ const ReplSetConfig::MemberCfg *_config; /* todo: when this changes??? */
+ HostAndPort _h;
+ HeartbeatInfo _hbinfo;
+ };
+ class Manager : public task::Server {
+ ReplSetImpl *rs;
+ bool busyWithElectSelf;
+ int _primary;
+ const Member* findOtherPrimary();
+ void noteARemoteIsPrimary(const Member *);
+ virtual void starting();
+ public:
+ Manager(ReplSetImpl *rs);
+ ~Manager();
+ void msgReceivedNewConfig(BSONObj);
+ void msgCheckNewState();
+ };
+ struct Target;
+ class Consensus {
+ ReplSetImpl &rs;
+ struct LastYea {
+ LastYea() : when(0), who(0xffffffff) { }
+ time_t when;
+ unsigned who;
+ };
+ Atomic<LastYea> ly;
+ unsigned yea(unsigned memberId); // throws VoteException
+ void electionFailed(unsigned meid);
+ void _electSelf();
+ bool weAreFreshest(bool& allUp, int& nTies);
+ bool sleptLast; // slept last elect() pass
+ public:
+ Consensus(ReplSetImpl *t) : rs(*t) {
+ sleptLast = false;
+ steppedDown = 0;
+ }
+ /* if we've stepped down, this is when we are allowed to try to elect ourself again.
+ todo: handle possible weirdnesses at clock skews etc.
+ */
+ time_t steppedDown;
+ int totalVotes() const;
+ bool aMajoritySeemsToBeUp() const;
+ void electSelf();
+ void electCmdReceived(BSONObj, BSONObjBuilder*);
+ void multiCommand(BSONObj cmd, list<Target>& L);
+ };
+ /** most operations on a ReplSet object should be done while locked. that logic implemented here. */
+ class RSBase : boost::noncopyable {
+ public:
+ const unsigned magic;
+ void assertValid() { assert( magic == 0x12345677 ); }
+ private:
+ mutex m;
+ int _locked;
+ ThreadLocalValue<bool> _lockedByMe;
+ protected:
+ RSBase() : magic(0x12345677), m("RSBase"), _locked(0) { }
+ ~RSBase() {
+ log() << "~RSBase should never be called?" << rsLog;
+ assert(false);
+ }
+ class lock {
+ RSBase& rsbase;
+ auto_ptr<scoped_lock> sl;
+ public:
+ lock(RSBase* b) : rsbase(*b) {
+ if( rsbase._lockedByMe.get() )
+ return; // recursive is ok...
+ sl.reset( new scoped_lock(rsbase.m) );
+ DEV assert(rsbase._locked == 0);
+ rsbase._locked++;
+ rsbase._lockedByMe.set(true);
+ }
+ ~lock() {
+ if( sl.get() ) {
+ assert( rsbase._lockedByMe.get() );
+ DEV assert(rsbase._locked == 1);
+ rsbase._lockedByMe.set(false);
+ rsbase._locked--;
+ }
+ }
+ };
+ public:
+ /* for asserts */
+ bool locked() const { return _locked != 0; }
+ /* if true, is locked, and was locked by this thread. note if false, it could be in the lock or not for another
+ just for asserts & such so we can make the contracts clear on who locks what when.
+ we don't use these locks that frequently, so the little bit of overhead is fine.
+ */
+ bool lockedByMe() { return _lockedByMe.get(); }
+ };
+ class ReplSetHealthPollTask;
+ /* safe container for our state that keeps member pointer and state variables always aligned */
+ class StateBox : boost::noncopyable {
+ public:
+ struct SP { // SP is like pair<MemberState,const Member *> but nicer
+ SP() : state(MemberState::RS_STARTUP), primary(0) { }
+ MemberState state;
+ const Member *primary;
+ };
+ const SP get() {
+ scoped_lock lk(m);
+ return sp;
+ }
+ MemberState getState() const { return sp.state; }
+ const Member* getPrimary() const { return sp.primary; }
+ void change(MemberState s, const Member *self) {
+ scoped_lock lk(m);
+ sp.state = s;
+ if( s.primary() ) {
+ sp.primary = self;
+ }
+ else {
+ if( self == sp.primary )
+ sp.primary = 0;
+ }
+ }
+ void set(MemberState s, const Member *p) {
+ scoped_lock lk(m);
+ sp.state = s; sp.primary = p;
+ }
+ void setSelfPrimary(const Member *self) { change(MemberState::RS_PRIMARY, self); }
+ void setOtherPrimary(const Member *mem) {
+ scoped_lock lk(m);
+ assert( !sp.state.primary() );
+ sp.primary = mem;
+ }
+ StateBox() : m("StateBox") { }
+ private:
+ mutex m;
+ SP sp;
+ };
+ void parseReplsetCmdLine(string cfgString, string& setname, vector<HostAndPort>& seeds, set<HostAndPort>& seedSet );
+ /** Parameter given to the --replSet command line option (parsed).
+ Syntax is "<setname>/<seedhost1>,<seedhost2>"
+ where setname is a name and seedhost is "<host>[:<port>]" */
+ class ReplSetCmdline {
+ public:
+ ReplSetCmdline(string cfgString) { parseReplsetCmdLine(cfgString, setname, seeds, seedSet); }
+ string setname;
+ vector<HostAndPort> seeds;
+ set<HostAndPort> seedSet;
+ };
+ /* information about the entire repl set, such as the various servers in the set, and their state */
+ /* note: We currently do not free mem when the set goes away - it is assumed the replset is a
+ singleton and long lived.
+ */
+ class ReplSetImpl : protected RSBase {
+ public:
+ /** info on our state if the replset isn't yet "up". for example, if we are pre-initiation. */
+ enum StartupStatus {
+ };
+ static StartupStatus startupStatus;
+ static string startupStatusMsg;
+ static string stateAsStr(MemberState state);
+ static string stateAsHtml(MemberState state);
+ /* todo thread */
+ void msgUpdateHBInfo(HeartbeatInfo);
+ StateBox box;
+ OpTime lastOpTimeWritten;
+ long long lastH; // hash we use to make sure we are reading the right flow of ops and aren't on an out-of-date "fork"
+ private:
+ set<ReplSetHealthPollTask*> healthTasks;
+ void endOldHealthTasks();
+ void startHealthTaskFor(Member *m);
+ private:
+ Consensus elect;
+ bool ok() const { return !box.getState().fatal(); }
+ void relinquish();
+ void forgetPrimary();
+ protected:
+ bool _stepDown();
+ private:
+ void assumePrimary();
+ void loadLastOpTimeWritten();
+ void changeState(MemberState s);
+ protected:
+ // "heartbeat message"
+ // sent in requestHeartbeat respond in field "hbm"
+ char _hbmsg[256]; // we change this unlocked, thus not an stl::string
+ public:
+ void sethbmsg(string s, int logLevel = 0);
+ protected:
+ bool initFromConfig(ReplSetConfig& c); // true if ok; throws if config really bad; false if config doesn't include self
+ void _fillIsMaster(BSONObjBuilder&);
+ void _fillIsMasterHost(const Member*, vector<string>&, vector<string>&, vector<string>&);
+ const ReplSetConfig& config() { return *_cfg; }
+ string name() const { return _name; } /* @return replica set's logical name */
+ MemberState state() const { return box.getState(); }
+ void _fatal();
+ void _getOplogDiagsAsHtml(unsigned server_id, stringstream& ss) const;
+ void _summarizeAsHtml(stringstream&) const;
+ void _summarizeStatus(BSONObjBuilder&) const; // for replSetGetStatus command
+ /* throws exception if a problem initializing. */
+ ReplSetImpl(ReplSetCmdline&);
+ /* call afer constructing to start - returns fairly quickly after launching its threads */
+ void _go();
+ private:
+ string _name;
+ const vector<HostAndPort> *_seeds;
+ ReplSetConfig *_cfg;
+ /** load our configuration from admin.replset. try seed machines too.
+ @return true if ok; throws if config really bad; false if config doesn't include self
+ */
+ bool _loadConfigFinish(vector<ReplSetConfig>& v);
+ void loadConfig();
+ list<HostAndPort> memberHostnames() const;
+ const ReplSetConfig::MemberCfg& myConfig() const { return _self->config(); }
+ bool iAmArbiterOnly() const { return myConfig().arbiterOnly; }
+ bool iAmPotentiallyHot() const { return myConfig().potentiallyHot(); }
+ protected:
+ Member *_self;
+ private:
+ List1<Member> _members; /* all members of the set EXCEPT self. */
+ public:
+ unsigned selfId() const { return _self->id(); }
+ Manager *mgr;
+ private:
+ Member* head() const { return _members.head(); }
+ public:
+ const Member* findById(unsigned id) const;
+ private:
+ void _getTargets(list<Target>&, int &configVersion);
+ void getTargets(list<Target>&, int &configVersion);
+ void startThreads();
+ friend class FeedbackThread;
+ friend class CmdReplSetElect;
+ friend class Member;
+ friend class Manager;
+ friend class Consensus;
+ private:
+ /* pulling data from primary related - see rs_sync.cpp */
+ bool initialSyncOplogApplication(string hn, const Member *primary, OpTime applyGTE, OpTime minValid);
+ void _syncDoInitialSync();
+ void syncDoInitialSync();
+ void _syncThread();
+ void syncTail();
+ void syncApply(const BSONObj &o);
+ void syncRollback(OplogReader& r);
+ void syncFixUp(HowToFixUp& h, OplogReader& r);
+ public:
+ void syncThread();
+ };
+ class ReplSet : public ReplSetImpl {
+ public:
+ ReplSet(ReplSetCmdline& replSetCmdline) : ReplSetImpl(replSetCmdline) { }
+ bool stepDown() { return _stepDown(); }
+ string selfFullName() {
+ lock lk(this);
+ return _self->fullName();
+ }
+ /* call after constructing to start - returns fairly quickly after la[unching its threads */
+ void go() { _go(); }
+ void fatal() { _fatal(); }
+ bool isPrimary();
+ bool isSecondary();
+ MemberState state() const { return ReplSetImpl::state(); }
+ string name() const { return ReplSetImpl::name(); }
+ const ReplSetConfig& config() { return ReplSetImpl::config(); }
+ void getOplogDiagsAsHtml(unsigned server_id, stringstream& ss) const { _getOplogDiagsAsHtml(server_id,ss); }
+ void summarizeAsHtml(stringstream& ss) const { _summarizeAsHtml(ss); }
+ void summarizeStatus(BSONObjBuilder& b) const { _summarizeStatus(b); }
+ void fillIsMaster(BSONObjBuilder& b) { _fillIsMaster(b); }
+ /* we have a new config (reconfig) - apply it.
+ @param comment write a no-op comment to the oplog about it. only makes sense if one is primary and initiating the reconf.
+ */
+ void haveNewConfig(ReplSetConfig& c, bool comment);
+ /* if we delete old configs, this needs to assure locking. currently we don't so it is ok. */
+ const ReplSetConfig& getConfig() { return config(); }
+ bool lockedByMe() { return RSBase::lockedByMe(); }
+ // heartbeat msg to send to others; descriptive diagnostic info
+ string hbmsg() const { return _hbmsg; }
+ };
+ /** base class for repl set commands. checks basic things such as in rs mode before the command
+ does its real work
+ */
+ class ReplSetCommand : public Command {
+ protected:
+ ReplSetCommand(const char * s, bool show=false) : Command(s, show) { }
+ virtual bool slaveOk() const { return true; }
+ virtual bool adminOnly() const { return true; }
+ virtual bool logTheOp() { return false; }
+ virtual LockType locktype() const { return NONE; }
+ virtual void help( stringstream &help ) const { help << "internal"; }
+ bool check(string& errmsg, BSONObjBuilder& result) {
+ if( !replSet ) {
+ errmsg = "not running with --replSet";
+ return false;
+ }
+ if( theReplSet == 0 ) {
+ result.append("startupStatus", ReplSet::startupStatus);
+ errmsg = ReplSet::startupStatusMsg.empty() ? "replset unknown error 2" : ReplSet::startupStatusMsg;
+ return false;
+ }
+ return true;
+ }
+ };
+ /** inlines ----------------- */
+ inline Member::Member(HostAndPort h, unsigned ord, const ReplSetConfig::MemberCfg *c, bool self) :
+ _config(c), _h(h), _hbinfo(ord) {
+ if( self ) {
+ = 1.0;
+ }
+ }
+ inline bool ReplSet::isPrimary() {
+ /* todo replset */
+ return box.getState().primary();
+ }
+ inline bool ReplSet::isSecondary() {
+ return box.getState().secondary();
+ }