diff options
Diffstat (limited to 'db/repl')
-rw-r--r-- | db/repl/connections.h | 25 | ||||
-rw-r--r-- | db/repl/consensus.cpp | 15 | ||||
-rw-r--r-- | db/repl/health.cpp | 6 | ||||
-rw-r--r-- | db/repl/heartbeat.cpp | 2 | ||||
-rw-r--r-- | db/repl/multicmd.h | 2 | ||||
-rw-r--r-- | db/repl/rs_config.cpp | 34 | ||||
-rw-r--r-- | db/repl/rs_member.h | 6 | ||||
-rw-r--r-- | db/repl/rs_sync.cpp | 9 |
8 files changed, 76 insertions, 23 deletions
diff --git a/db/repl/connections.h b/db/repl/connections.h index 95defe4..cdf2fad 100644 --- a/db/repl/connections.h +++ b/db/repl/connections.h @@ -44,19 +44,36 @@ namespace mongo { /** throws assertions if connect failure etc. */ ScopedConn(string hostport); ~ScopedConn(); - DBClientConnection* operator->(); + + /* If we were to run a query and not exhaust the cursor, future use of the connection would be problematic. + So here what we do is wrapper known safe methods and not allow cursor-style queries at all. This makes + ScopedConn limited in functionality but very safe. More non-cursor wrappers can be added here if needed. + */ + + bool runCommand(const string &dbname, const BSONObj& cmd, BSONObj &info, int options=0) { + return conn()->runCommand(dbname, cmd, info, options); + } + unsigned long long count(const string &ns) { + return conn()->count(ns); + } + BSONObj findOne(const string &ns, const Query& q, const BSONObj *fieldsToReturn = 0, int queryOptions = 0) { + return conn()->findOne(ns, q, fieldsToReturn, queryOptions); + } + private: auto_ptr<scoped_lock> connLock; static mutex mapMutex; struct X { mutex z; DBClientConnection cc; - X() : z("X"), cc(/*reconnect*/true, 0, /*timeout*/10) { + X() : z("X"), cc(/*reconnect*/ true, 0, + /*timeout*/ theReplSet ? theReplSet->config().ho.heartbeatTimeoutMillis/1000.0 : 10.0) { cc._logLevel = 2; } } *x; typedef map<string,ScopedConn::X*> M; static M& _map; + DBClientConnection* conn() { return &x->cc; } }; inline ScopedConn::ScopedConn(string hostport) { @@ -84,8 +101,8 @@ namespace mongo { // conLock releases... } - inline DBClientConnection* ScopedConn::operator->() { + /*inline DBClientConnection* ScopedConn::operator->() { return &x->cc; - } + }*/ } diff --git a/db/repl/consensus.cpp b/db/repl/consensus.cpp index 4044538..1519c26 100644 --- a/db/repl/consensus.cpp +++ b/db/repl/consensus.cpp @@ -134,6 +134,9 @@ namespace mongo { OID round = cmd["round"].OID(); int myver = rs.config().version; + const Member* primary = rs.box.getPrimary(); + const Member* hopeful = rs.findById(whoid); + int vote = 0; if( set != rs.name() ) { log() << "replSet error received an elect request for '" << set << "' but our set name is '" << rs.name() << "'" << rsLog; @@ -147,6 +150,16 @@ namespace mongo { log() << "replSet info got stale version # during election" << rsLog; vote = -10000; } + else if( !hopeful ) { + log() << "couldn't find member with id " << whoid << rsLog; + vote = -10000; + } + else if( primary && primary->hbinfo().opTime > hopeful->hbinfo().opTime ) { + // other members might be aware of more up-to-date nodes + log() << hopeful->fullName() << " is trying to elect itself but " << + primary->fullName() << " is already primary and more up-to-date" << rsLog; + vote = -10000; + } else { try { vote = yea(whoid); @@ -165,7 +178,7 @@ namespace mongo { void ReplSetImpl::_getTargets(list<Target>& L, int& configVersion) { configVersion = config().version; for( Member *m = head(); m; m=m->next() ) - if( m->hbinfo().up() ) + if( m->hbinfo().maybeUp() ) L.push_back( Target(m->fullName()) ); } diff --git a/db/repl/health.cpp b/db/repl/health.cpp index 72396fe..c75221c 100644 --- a/db/repl/health.cpp +++ b/db/repl/health.cpp @@ -19,6 +19,7 @@ #include "health.h" #include "../../util/background.h" #include "../../client/dbclient.h" +#include "../../client/connpool.h" #include "../commands.h" #include "../../util/concurrency/value.h" #include "../../util/concurrency/task.h" @@ -186,7 +187,7 @@ namespace mongo { //const bo fields = BSON( "o" << false << "o2" << false ); const bo fields; - ScopedConn conn(m->fullName()); + ScopedDbConnection conn(m->fullName()); auto_ptr<DBClientCursor> c = conn->query(rsoplog, Query().sort("$natural",1), 20, 0, &fields); if( c.get() == 0 ) { @@ -245,8 +246,6 @@ namespace mongo { ss << _table(); ss << p(time_t_to_String_short(time(0)) + " current time"); - //ss << "</pre>\n"; - if( !otEnd.isNull() ) { ss << "<p>Log length in time: "; unsigned d = otEnd.getSecs() - otFirst.getSecs(); @@ -259,6 +258,7 @@ namespace mongo { ss << "</p>\n"; } + conn.done(); } void ReplSetImpl::_summarizeAsHtml(stringstream& s) const { diff --git a/db/repl/heartbeat.cpp b/db/repl/heartbeat.cpp index 4f28897..b39fad7 100644 --- a/db/repl/heartbeat.cpp +++ b/db/repl/heartbeat.cpp @@ -134,7 +134,7 @@ namespace mongo { assert( theReplSet == 0 || !theReplSet->lockedByMe() ); ScopedConn conn(memberFullName); - return conn->runCommand("admin", cmd, result); + return conn.runCommand("admin", cmd, result, 0); } /* poll every other set member to check its status */ diff --git a/db/repl/multicmd.h b/db/repl/multicmd.h index 61c9b5f..9eb9a17 100644 --- a/db/repl/multicmd.h +++ b/db/repl/multicmd.h @@ -43,7 +43,7 @@ namespace mongo { void run() { try { ScopedConn c(d.toHost); - d.ok = c->runCommand("admin", cmd, d.result); + d.ok = c.runCommand("admin", cmd, d.result); } catch(DBException&) { DEV log() << "dev caught dbexception on multiCommand " << d.toHost << rsLog; diff --git a/db/repl/rs_config.cpp b/db/repl/rs_config.cpp index 85c9a46..371507d 100644 --- a/db/repl/rs_config.cpp +++ b/db/repl/rs_config.cpp @@ -302,9 +302,8 @@ namespace mongo { clear(); int level = 2; DEV level = 0; - //log(0) << "replSet load config from: " << h.toString() << rsLog; - auto_ptr<DBClientCursor> c; + BSONObj cfg; int v = -5; try { if( h.isSelf() ) { @@ -337,13 +336,28 @@ namespace mongo { } v = -4; - ScopedConn conn(h.toString()); - v = -3; - c = conn->query(rsConfigNs); - if( c.get() == 0 ) { - version = v; return; + unsigned long long count = 0; + try { + ScopedConn conn(h.toString()); + v = -3; + cfg = conn.findOne(rsConfigNs, Query()).getOwned(); + count = conn.count(rsConfigNs); + } + catch ( DBException& e) { + if ( !h.isSelf() ) { + throw; + } + + // on startup, socket is not listening yet + DBDirectClient cli; + cfg = cli.findOne( rsConfigNs, Query() ).getOwned(); + count = cli.count(rsConfigNs); } - if( !c->more() ) { + + if( count > 1 ) + uasserted(13109, str::stream() << "multiple rows in " << rsConfigNs << " not supported host: " << h.toString()); + + if( cfg.isEmpty() ) { version = EMPTYCONFIG; return; } @@ -355,9 +369,7 @@ namespace mongo { return; } - BSONObj o = c->nextSafe(); - uassert(13109, "multiple rows in " + rsConfigNs + " not supported", !c->more()); - from(o); + from(cfg); checkRsConfig(); _ok = true; log(level) << "replSet load config ok from " << (h.isSelf() ? "self" : h.toString()) << rsLog; diff --git a/db/repl/rs_member.h b/db/repl/rs_member.h index 6a797b5..099cb22 100644 --- a/db/repl/rs_member.h +++ b/db/repl/rs_member.h @@ -67,7 +67,6 @@ namespace mongo { public: HeartbeatInfo() : _id(0xffffffff),hbstate(MemberState::RS_UNKNOWN),health(-1.0),downSince(0),skew(INT_MIN) { } HeartbeatInfo(unsigned id); - bool up() const { return health > 0; } unsigned id() const { return _id; } MemberState hbstate; double health; @@ -78,6 +77,11 @@ namespace mongo { OpTime opTime; int skew; + bool up() const { return health > 0; } + + /** health is set to -1 on startup. that means we haven't even checked yet. 0 means we checked and it failed. */ + bool maybeUp() const { return health != 0; } + long long timeDown() const; // ms /* true if changed in a way of interest to the repl set manager. */ diff --git a/db/repl/rs_sync.cpp b/db/repl/rs_sync.cpp index 9ea65cf..9de3f60 100644 --- a/db/repl/rs_sync.cpp +++ b/db/repl/rs_sync.cpp @@ -70,7 +70,14 @@ namespace mongo { return false; } - r.query(rsoplog, bo()); + { + BSONObjBuilder q; + q.appendDate("$gte", applyGTE.asDate()); + BSONObjBuilder query; + query.append("ts", q.done()); + BSONObj queryObj = query.done(); + r.query(rsoplog, queryObj); + } assert( r.haveCursor() ); /* we lock outside the loop to avoid the overhead of locking on every operation. server isn't usable yet anyway! */ |