summaryrefslogtreecommitdiff
path: root/db/repl
diff options
context:
space:
mode:
Diffstat (limited to 'db/repl')
-rw-r--r--db/repl/connections.h25
-rw-r--r--db/repl/consensus.cpp15
-rw-r--r--db/repl/health.cpp6
-rw-r--r--db/repl/heartbeat.cpp2
-rw-r--r--db/repl/multicmd.h2
-rw-r--r--db/repl/rs_config.cpp34
-rw-r--r--db/repl/rs_member.h6
-rw-r--r--db/repl/rs_sync.cpp9
8 files changed, 76 insertions, 23 deletions
diff --git a/db/repl/connections.h b/db/repl/connections.h
index 95defe4..cdf2fad 100644
--- a/db/repl/connections.h
+++ b/db/repl/connections.h
@@ -44,19 +44,36 @@ namespace mongo {
/** throws assertions if connect failure etc. */
ScopedConn(string hostport);
~ScopedConn();
- DBClientConnection* operator->();
+
+ /* If we were to run a query and not exhaust the cursor, future use of the connection would be problematic.
+ So here what we do is wrapper known safe methods and not allow cursor-style queries at all. This makes
+ ScopedConn limited in functionality but very safe. More non-cursor wrappers can be added here if needed.
+ */
+
+ bool runCommand(const string &dbname, const BSONObj& cmd, BSONObj &info, int options=0) {
+ return conn()->runCommand(dbname, cmd, info, options);
+ }
+ unsigned long long count(const string &ns) {
+ return conn()->count(ns);
+ }
+ BSONObj findOne(const string &ns, const Query& q, const BSONObj *fieldsToReturn = 0, int queryOptions = 0) {
+ return conn()->findOne(ns, q, fieldsToReturn, queryOptions);
+ }
+
private:
auto_ptr<scoped_lock> connLock;
static mutex mapMutex;
struct X {
mutex z;
DBClientConnection cc;
- X() : z("X"), cc(/*reconnect*/true, 0, /*timeout*/10) {
+ X() : z("X"), cc(/*reconnect*/ true, 0,
+ /*timeout*/ theReplSet ? theReplSet->config().ho.heartbeatTimeoutMillis/1000.0 : 10.0) {
cc._logLevel = 2;
}
} *x;
typedef map<string,ScopedConn::X*> M;
static M& _map;
+ DBClientConnection* conn() { return &x->cc; }
};
inline ScopedConn::ScopedConn(string hostport) {
@@ -84,8 +101,8 @@ namespace mongo {
// conLock releases...
}
- inline DBClientConnection* ScopedConn::operator->() {
+ /*inline DBClientConnection* ScopedConn::operator->() {
return &x->cc;
- }
+ }*/
}
diff --git a/db/repl/consensus.cpp b/db/repl/consensus.cpp
index 4044538..1519c26 100644
--- a/db/repl/consensus.cpp
+++ b/db/repl/consensus.cpp
@@ -134,6 +134,9 @@ namespace mongo {
OID round = cmd["round"].OID();
int myver = rs.config().version;
+ const Member* primary = rs.box.getPrimary();
+ const Member* hopeful = rs.findById(whoid);
+
int vote = 0;
if( set != rs.name() ) {
log() << "replSet error received an elect request for '" << set << "' but our set name is '" << rs.name() << "'" << rsLog;
@@ -147,6 +150,16 @@ namespace mongo {
log() << "replSet info got stale version # during election" << rsLog;
vote = -10000;
}
+ else if( !hopeful ) {
+ log() << "couldn't find member with id " << whoid << rsLog;
+ vote = -10000;
+ }
+ else if( primary && primary->hbinfo().opTime > hopeful->hbinfo().opTime ) {
+ // other members might be aware of more up-to-date nodes
+ log() << hopeful->fullName() << " is trying to elect itself but " <<
+ primary->fullName() << " is already primary and more up-to-date" << rsLog;
+ vote = -10000;
+ }
else {
try {
vote = yea(whoid);
@@ -165,7 +178,7 @@ namespace mongo {
void ReplSetImpl::_getTargets(list<Target>& L, int& configVersion) {
configVersion = config().version;
for( Member *m = head(); m; m=m->next() )
- if( m->hbinfo().up() )
+ if( m->hbinfo().maybeUp() )
L.push_back( Target(m->fullName()) );
}
diff --git a/db/repl/health.cpp b/db/repl/health.cpp
index 72396fe..c75221c 100644
--- a/db/repl/health.cpp
+++ b/db/repl/health.cpp
@@ -19,6 +19,7 @@
#include "health.h"
#include "../../util/background.h"
#include "../../client/dbclient.h"
+#include "../../client/connpool.h"
#include "../commands.h"
#include "../../util/concurrency/value.h"
#include "../../util/concurrency/task.h"
@@ -186,7 +187,7 @@ namespace mongo {
//const bo fields = BSON( "o" << false << "o2" << false );
const bo fields;
- ScopedConn conn(m->fullName());
+ ScopedDbConnection conn(m->fullName());
auto_ptr<DBClientCursor> c = conn->query(rsoplog, Query().sort("$natural",1), 20, 0, &fields);
if( c.get() == 0 ) {
@@ -245,8 +246,6 @@ namespace mongo {
ss << _table();
ss << p(time_t_to_String_short(time(0)) + " current time");
- //ss << "</pre>\n";
-
if( !otEnd.isNull() ) {
ss << "<p>Log length in time: ";
unsigned d = otEnd.getSecs() - otFirst.getSecs();
@@ -259,6 +258,7 @@ namespace mongo {
ss << "</p>\n";
}
+ conn.done();
}
void ReplSetImpl::_summarizeAsHtml(stringstream& s) const {
diff --git a/db/repl/heartbeat.cpp b/db/repl/heartbeat.cpp
index 4f28897..b39fad7 100644
--- a/db/repl/heartbeat.cpp
+++ b/db/repl/heartbeat.cpp
@@ -134,7 +134,7 @@ namespace mongo {
assert( theReplSet == 0 || !theReplSet->lockedByMe() );
ScopedConn conn(memberFullName);
- return conn->runCommand("admin", cmd, result);
+ return conn.runCommand("admin", cmd, result, 0);
}
/* poll every other set member to check its status */
diff --git a/db/repl/multicmd.h b/db/repl/multicmd.h
index 61c9b5f..9eb9a17 100644
--- a/db/repl/multicmd.h
+++ b/db/repl/multicmd.h
@@ -43,7 +43,7 @@ namespace mongo {
void run() {
try {
ScopedConn c(d.toHost);
- d.ok = c->runCommand("admin", cmd, d.result);
+ d.ok = c.runCommand("admin", cmd, d.result);
}
catch(DBException&) {
DEV log() << "dev caught dbexception on multiCommand " << d.toHost << rsLog;
diff --git a/db/repl/rs_config.cpp b/db/repl/rs_config.cpp
index 85c9a46..371507d 100644
--- a/db/repl/rs_config.cpp
+++ b/db/repl/rs_config.cpp
@@ -302,9 +302,8 @@ namespace mongo {
clear();
int level = 2;
DEV level = 0;
- //log(0) << "replSet load config from: " << h.toString() << rsLog;
- auto_ptr<DBClientCursor> c;
+ BSONObj cfg;
int v = -5;
try {
if( h.isSelf() ) {
@@ -337,13 +336,28 @@ namespace mongo {
}
v = -4;
- ScopedConn conn(h.toString());
- v = -3;
- c = conn->query(rsConfigNs);
- if( c.get() == 0 ) {
- version = v; return;
+ unsigned long long count = 0;
+ try {
+ ScopedConn conn(h.toString());
+ v = -3;
+ cfg = conn.findOne(rsConfigNs, Query()).getOwned();
+ count = conn.count(rsConfigNs);
+ }
+ catch ( DBException& e) {
+ if ( !h.isSelf() ) {
+ throw;
+ }
+
+ // on startup, socket is not listening yet
+ DBDirectClient cli;
+ cfg = cli.findOne( rsConfigNs, Query() ).getOwned();
+ count = cli.count(rsConfigNs);
}
- if( !c->more() ) {
+
+ if( count > 1 )
+ uasserted(13109, str::stream() << "multiple rows in " << rsConfigNs << " not supported host: " << h.toString());
+
+ if( cfg.isEmpty() ) {
version = EMPTYCONFIG;
return;
}
@@ -355,9 +369,7 @@ namespace mongo {
return;
}
- BSONObj o = c->nextSafe();
- uassert(13109, "multiple rows in " + rsConfigNs + " not supported", !c->more());
- from(o);
+ from(cfg);
checkRsConfig();
_ok = true;
log(level) << "replSet load config ok from " << (h.isSelf() ? "self" : h.toString()) << rsLog;
diff --git a/db/repl/rs_member.h b/db/repl/rs_member.h
index 6a797b5..099cb22 100644
--- a/db/repl/rs_member.h
+++ b/db/repl/rs_member.h
@@ -67,7 +67,6 @@ namespace mongo {
public:
HeartbeatInfo() : _id(0xffffffff),hbstate(MemberState::RS_UNKNOWN),health(-1.0),downSince(0),skew(INT_MIN) { }
HeartbeatInfo(unsigned id);
- bool up() const { return health > 0; }
unsigned id() const { return _id; }
MemberState hbstate;
double health;
@@ -78,6 +77,11 @@ namespace mongo {
OpTime opTime;
int skew;
+ bool up() const { return health > 0; }
+
+ /** health is set to -1 on startup. that means we haven't even checked yet. 0 means we checked and it failed. */
+ bool maybeUp() const { return health != 0; }
+
long long timeDown() const; // ms
/* true if changed in a way of interest to the repl set manager. */
diff --git a/db/repl/rs_sync.cpp b/db/repl/rs_sync.cpp
index 9ea65cf..9de3f60 100644
--- a/db/repl/rs_sync.cpp
+++ b/db/repl/rs_sync.cpp
@@ -70,7 +70,14 @@ namespace mongo {
return false;
}
- r.query(rsoplog, bo());
+ {
+ BSONObjBuilder q;
+ q.appendDate("$gte", applyGTE.asDate());
+ BSONObjBuilder query;
+ query.append("ts", q.done());
+ BSONObj queryObj = query.done();
+ r.query(rsoplog, queryObj);
+ }
assert( r.haveCursor() );
/* we lock outside the loop to avoid the overhead of locking on every operation. server isn't usable yet anyway! */