summaryrefslogtreecommitdiff
path: root/db
diff options
context:
space:
mode:
Diffstat (limited to 'db')
-rw-r--r--db/cloner.cpp6
-rw-r--r--db/commands/isself.cpp6
-rw-r--r--db/commands/mr.cpp1
-rw-r--r--db/db.cpp6
-rw-r--r--db/dbcommands.cpp12
-rw-r--r--db/dbcommands_generic.cpp3
-rw-r--r--db/dbhelpers.cpp1
-rw-r--r--db/geo/2d.cpp5
-rw-r--r--db/instance.cpp9
-rw-r--r--db/jsobj.cpp15
-rw-r--r--db/oplog.cpp80
-rw-r--r--db/oplog.h8
-rw-r--r--db/ops/update.cpp3
-rw-r--r--db/queryutil.h2
-rw-r--r--db/record.cpp9
-rw-r--r--db/repl.cpp48
-rw-r--r--db/repl.h4
-rw-r--r--db/repl/connections.h46
-rw-r--r--db/repl/health.cpp5
-rw-r--r--db/repl/heartbeat.cpp229
-rw-r--r--db/repl/manager.cpp36
-rw-r--r--db/repl/rs.cpp1
-rw-r--r--db/repl/rs.h8
-rw-r--r--db/repl/rs_config.h16
-rw-r--r--db/repl/rs_initialsync.cpp5
-rw-r--r--db/repl/rs_member.h6
-rw-r--r--db/repl/rs_sync.cpp60
27 files changed, 456 insertions, 174 deletions
diff --git a/db/cloner.cpp b/db/cloner.cpp
index 8956133..f13ea52 100644
--- a/db/cloner.cpp
+++ b/db/cloner.cpp
@@ -83,6 +83,12 @@ namespace mongo {
BSONElement e = i.next();
if ( e.eoo() )
break;
+
+ // for now, skip the "v" field so that v:0 indexes will be upgraded to v:1
+ if ( string("v") == e.fieldName() ) {
+ continue;
+ }
+
if ( string("ns") == e.fieldName() ) {
uassert( 10024 , "bad ns field for index during dbcopy", e.type() == String);
const char *p = strchr(e.valuestr(), '.');
diff --git a/db/commands/isself.cpp b/db/commands/isself.cpp
index 5a868de..7b1cea4 100644
--- a/db/commands/isself.cpp
+++ b/db/commands/isself.cpp
@@ -4,6 +4,7 @@
#include "../../util/net/listen.h"
#include "../commands.h"
#include "../../client/dbclient.h"
+#include "../security.h"
#ifndef _WIN32
# ifndef __sunos__
@@ -211,6 +212,11 @@ namespace mongo {
return false;
}
+ if (!noauth && cmdLine.keyFile &&
+ !conn.auth("local", internalSecurity.user, internalSecurity.pwd, errmsg, false)) {
+ return false;
+ }
+
BSONObj out;
bool ok = conn.simpleCommand( "admin" , &out , "_isSelf" );
diff --git a/db/commands/mr.cpp b/db/commands/mr.cpp
index 56e9770..b79e62b 100644
--- a/db/commands/mr.cpp
+++ b/db/commands/mr.cpp
@@ -1119,6 +1119,7 @@ namespace mongo {
virtual LockType locktype() const { return NONE; }
bool run(const string& dbname , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool) {
+ ShardedConnectionInfo::addHook();
string shardedOutputCollection = cmdObj["shardedOutputCollection"].valuestrsafe();
string postProcessCollection = cmdObj["postProcessCollection"].valuestrsafe();
bool postProcessOnly = !(postProcessCollection.empty());
diff --git a/db/db.cpp b/db/db.cpp
index e6281d7..b1d1db8 100644
--- a/db/db.cpp
+++ b/db/db.cpp
@@ -708,6 +708,12 @@ int main(int argc, char* argv[]) {
else {
dbpath = "/data/db/";
}
+#ifdef _WIN32
+ if (dbpath.size() > 1 && dbpath[dbpath.size()-1] == '/') {
+ // size() check is for the unlikely possibility of --dbpath "/"
+ dbpath = dbpath.erase(dbpath.size()-1);
+ }
+#endif
if ( params.count("directoryperdb")) {
directoryperdb = true;
diff --git a/db/dbcommands.cpp b/db/dbcommands.cpp
index 31f4b7f..b2e6218 100644
--- a/db/dbcommands.cpp
+++ b/db/dbcommands.cpp
@@ -510,9 +510,11 @@ namespace mongo {
t.appendNumber( "mappedWithJournal" , m );
}
- if( v - m > 5000 ) {
+ int overhead = v - m - connTicketHolder.used();
+
+ if( overhead > 4000 ) {
t.append("note", "virtual minus mapped is large. could indicate a memory leak");
- log() << "warning: virtual size (" << v << "MB) - mapped size (" << m << "MB) is large. could indicate a memory leak" << endl;
+ log() << "warning: virtual size (" << v << "MB) - mapped size (" << m << "MB) is large (" << overhead << "MB). could indicate a memory leak" << endl;
}
t.done();
@@ -949,7 +951,7 @@ namespace mongo {
}
list<BSONObj> all;
- auto_ptr<DBClientCursor> i = db.getIndexes( toDeleteNs );
+ auto_ptr<DBClientCursor> i = db.query( dbname + ".system.indexes" , BSON( "ns" << toDeleteNs ) , 0 , 0 , 0 , QueryOption_SlaveOk );
BSONObjBuilder b;
while ( i->more() ) {
BSONObj o = i->next().removeField("v").getOwned();
@@ -1104,6 +1106,10 @@ namespace mongo {
BSONObj sort = BSON( "files_id" << 1 << "n" << 1 );
shared_ptr<Cursor> cursor = bestGuessCursor(ns.c_str(), query, sort);
+ if ( ! cursor ) {
+ errmsg = "need an index on { files_id : 1 , n : 1 }";
+ return false;
+ }
auto_ptr<ClientCursor> cc (new ClientCursor(QueryOption_NoCursorTimeout, cursor, ns.c_str()));
int n = 0;
diff --git a/db/dbcommands_generic.cpp b/db/dbcommands_generic.cpp
index 69b51c7..22cee22 100644
--- a/db/dbcommands_generic.cpp
+++ b/db/dbcommands_generic.cpp
@@ -51,7 +51,8 @@ namespace mongo {
public:
CmdBuildInfo() : Command( "buildInfo", true, "buildinfo" ) {}
virtual bool slaveOk() const { return true; }
- virtual bool adminOnly() const { return true; }
+ virtual bool adminOnly() const { return false; }
+ virtual bool requiresAuth() { return false; }
virtual LockType locktype() const { return NONE; }
virtual void help( stringstream &help ) const {
help << "get version #, etc.\n";
diff --git a/db/dbhelpers.cpp b/db/dbhelpers.cpp
index cc4fdba..33ac9b7 100644
--- a/db/dbhelpers.cpp
+++ b/db/dbhelpers.cpp
@@ -157,6 +157,7 @@ namespace mongo {
}
DiskLoc Helpers::findById(NamespaceDetails *d, BSONObj idquery) {
+ assert(d);
int idxNo = d->findIdIndex();
uassert(13430, "no _id index", idxNo>=0);
IndexDetails& i = d->idx( idxNo );
diff --git a/db/geo/2d.cpp b/db/geo/2d.cpp
index b873490..40df5e2 100644
--- a/db/geo/2d.cpp
+++ b/db/geo/2d.cpp
@@ -2647,7 +2647,10 @@ namespace mongo {
BSONObjBuilder bb( arr.subobjStart( BSONObjBuilder::numStr( x++ ) ) );
bb.append( "dis" , dis );
- if( includeLocs ) bb.append( "loc" , p._pt );
+ if( includeLocs ){
+ if( p._pt.couldBeArray() ) bb.append( "loc", BSONArray( p._pt ) );
+ else bb.append( "loc" , p._pt );
+ }
bb.append( "obj" , p._o );
bb.done();
}
diff --git a/db/instance.cpp b/db/instance.cpp
index 6727867..764571d 100644
--- a/db/instance.cpp
+++ b/db/instance.cpp
@@ -353,20 +353,19 @@ namespace mongo {
}
currentOp.ensureStarted();
currentOp.done();
- int ms = currentOp.totalTimeMillis();
+ debug.executionTime = currentOp.totalTimeMillis();
//DEV log = true;
- if ( log || ms > logThreshold ) {
- if( logLevel < 3 && op == dbGetMore && strstr(ns, ".oplog.") && ms < 4300 && !log ) {
+ if ( log || debug.executionTime > logThreshold ) {
+ if( logLevel < 3 && op == dbGetMore && strstr(ns, ".oplog.") && debug.executionTime < 4300 && !log ) {
/* it's normal for getMore on the oplog to be slow because of use of awaitdata flag. */
}
else {
- debug.executionTime = ms;
mongo::tlog() << debug << endl;
}
}
- if ( currentOp.shouldDBProfile( ms ) ) {
+ if ( currentOp.shouldDBProfile( debug.executionTime ) ) {
// performance profiling is on
if ( dbMutex.getState() < 0 ) {
mongo::log(1) << "note: not profiling because recursive read lock" << endl;
diff --git a/db/jsobj.cpp b/db/jsobj.cpp
index dcb7744..9644a87 100644
--- a/db/jsobj.cpp
+++ b/db/jsobj.cpp
@@ -753,6 +753,21 @@ namespace mongo {
return n;
}
+ bool BSONObj::couldBeArray() const {
+ BSONObjIterator i( *this );
+ int index = 0;
+ while( i.moreWithEOO() ){
+ BSONElement e = i.next();
+ if( e.eoo() ) break;
+
+ // TODO: If actually important, may be able to do int->char* much faster
+ if( strcmp( e.fieldName(), ((string)( str::stream() << index )).c_str() ) != 0 )
+ return false;
+ index++;
+ }
+ return true;
+ }
+
BSONObj BSONObj::clientReadable() const {
BSONObjBuilder b;
BSONObjIterator i( *this );
diff --git a/db/oplog.cpp b/db/oplog.cpp
index dc9db76..5c1671c 100644
--- a/db/oplog.cpp
+++ b/db/oplog.cpp
@@ -625,9 +625,47 @@ namespace mongo {
}
}
- void applyOperation_inlock(const BSONObj& op , bool fromRepl ) {
+ bool shouldRetry(const BSONObj& o, const string& hn) {
+ OplogReader missingObjReader;
+
+ // we don't have the object yet, which is possible on initial sync. get it.
+ log() << "replication info adding missing object" << endl; // rare enough we can log
+ uassert(15916, str::stream() << "Can no longer connect to initial sync source: " << hn, missingObjReader.connect(hn));
+
+ const char *ns = o.getStringField("ns");
+ // might be more than just _id in the update criteria
+ BSONObj query = BSONObjBuilder().append(o.getObjectField("o2")["_id"]).obj();
+ BSONObj missingObj;
+ try {
+ missingObj = missingObjReader.findOne(ns, query);
+ } catch(DBException& e) {
+ log() << "replication assertion fetching missing object: " << e.what() << endl;
+ throw;
+ }
+
+ if( missingObj.isEmpty() ) {
+ log() << "replication missing object not found on source. presumably deleted later in oplog" << endl;
+ log() << "replication o2: " << o.getObjectField("o2").toString() << endl;
+ log() << "replication o firstfield: " << o.getObjectField("o").firstElementFieldName() << endl;
+
+ return false;
+ }
+ else {
+ Client::Context ctx(ns);
+ DiskLoc d = theDataFileMgr.insert(ns, (void*) missingObj.objdata(), missingObj.objsize());
+ uassert(15917, "Got bad disk location when attempting to insert", !d.isNull());
+
+ return true;
+ }
+ }
+
+ /** @param fromRepl false if from ApplyOpsCmd
+       @return true if it was an update that should have happened and the document DNE. see replset initial sync code.
+ */
+ bool applyOperation_inlock(const BSONObj& op , bool fromRepl ) {
assertInWriteLock();
LOG(6) << "applying op: " << op << endl;
+ bool failedUpdate = false;
OpCounters * opCounters = fromRepl ? &replOpCounters : &globalOpCounters;
@@ -680,9 +718,45 @@ namespace mongo {
}
else if ( *opType == 'u' ) {
opCounters->gotUpdate();
+ // dm do we create this for a capped collection?
+ // - if not, updates would be slow
+ // - but if were by id would be slow on primary too so maybe ok
+ // - if on primary was by another key and there are other indexes, this could be very bad w/out an index
+ // - if do create, odd to have on secondary but not primary. also can cause secondary to block for
+ // quite a while on creation.
RARELY ensureHaveIdIndex(ns); // otherwise updates will be super slow
OpDebug debug;
- updateObjects(ns, o, op.getObjectField("o2"), /*upsert*/ fields[3].booleanSafe(), /*multi*/ false, /*logop*/ false , debug );
+ BSONObj updateCriteria = op.getObjectField("o2");
+ bool upsert = fields[3].booleanSafe();
+ UpdateResult ur = updateObjects(ns, o, updateCriteria, upsert, /*multi*/ false, /*logop*/ false , debug );
+ if( ur.num == 0 ) {
+ if( ur.mod ) {
+ if( updateCriteria.nFields() == 1 ) {
+ // was a simple { _id : ... } update criteria
+ failedUpdate = true;
+ // todo: probably should assert in these failedUpdate cases if not in initialSync
+ }
+ // need to check to see if it isn't present so we can set failedUpdate correctly.
+ // note that adds some overhead for this extra check in some cases, such as an updateCriteria
+ // of the form
+ // { _id:..., { x : {$size:...} }
+ // thus this is not ideal.
+ else if( nsdetails(ns) == NULL || Helpers::findById(nsdetails(ns), updateCriteria).isNull() ) {
+ failedUpdate = true;
+ }
+ else {
+ // it's present; zero objects were updated because of additional specifiers in the query for idempotence
+ }
+ }
+ else {
+ // this could happen benignly on an oplog duplicate replay of an upsert
+ // (because we are idempotent),
+                    // if a regular non-mod update fails the item is (presumably) missing.
+ if( !upsert ) {
+ failedUpdate = true;
+ }
+ }
+ }
}
else if ( *opType == 'd' ) {
opCounters->gotDelete();
@@ -703,7 +777,7 @@ namespace mongo {
else {
throw MsgAssertionException( 14825 , ErrorMsg("error in applyOperation : unknown opType ", *opType) );
}
-
+ return failedUpdate;
}
class ApplyOpsCmd : public Command {
diff --git a/db/oplog.h b/db/oplog.h
index 2f2b286..769dd75 100644
--- a/db/oplog.h
+++ b/db/oplog.h
@@ -129,6 +129,12 @@ namespace mongo {
* take an op and apply locally
* used for applying from an oplog
* @param fromRepl really from replication or for testing/internal/command/etc...
+ * Returns if the op was an update that could not be applied (true on failure)
*/
- void applyOperation_inlock(const BSONObj& op , bool fromRepl = true );
+ bool applyOperation_inlock(const BSONObj& op , bool fromRepl = true );
+
+ /**
+ * If applyOperation_inlock should be called again after an update fails.
+ */
+ bool shouldRetry(const BSONObj& op , const string& hn);
}
diff --git a/db/ops/update.cpp b/db/ops/update.cpp
index fd9798a..6a7aad4 100644
--- a/db/ops/update.cpp
+++ b/db/ops/update.cpp
@@ -1354,7 +1354,8 @@ namespace mongo {
logOp( "i", ns, no );
return UpdateResult( 0 , 0 , 1 , no );
}
- return UpdateResult( 0 , 0 , 0 );
+
+ return UpdateResult( 0 , isOperatorUpdate , 0 );
}
UpdateResult updateObjects(const char *ns, const BSONObj& updateobj, BSONObj patternOrig, bool upsert, bool multi, bool logop , OpDebug& debug ) {
diff --git a/db/queryutil.h b/db/queryutil.h
index 104cde2..5d86194 100644
--- a/db/queryutil.h
+++ b/db/queryutil.h
@@ -328,7 +328,7 @@ namespace mongo {
bool matchesElement( const BSONElement &e, int i, bool direction ) const;
bool matchesKey( const BSONObj &key ) const;
vector<FieldRange> _ranges;
- const IndexSpec &_indexSpec;
+ IndexSpec _indexSpec;
int _direction;
vector<BSONObj> _queries; // make sure mem owned
friend class FieldRangeVectorIterator;
diff --git a/db/record.cpp b/db/record.cpp
index 51dc520..a8a3e43 100644
--- a/db/record.cpp
+++ b/db/record.cpp
@@ -112,7 +112,8 @@ namespace mongo {
class Rolling {
public:
- Rolling() {
+ Rolling()
+ : _lock( "ps::Rolling" ){
_curSlice = 0;
_lastRotate = Listener::getElapsedTimeMillis();
}
@@ -126,8 +127,8 @@ namespace mongo {
bool access( size_t region , short offset , bool doHalf ) {
int regionHash = hash(region);
- scoped_spinlock lk( _lock );
-
+ SimpleMutex::scoped_lock lk( _lock );
+
static int rarely_count = 0;
if ( rarely_count++ % 2048 == 0 ) {
long long now = Listener::getElapsedTimeMillis();
@@ -174,7 +175,7 @@ namespace mongo {
long long _lastRotate;
Slice _slices[NumSlices];
- SpinLock _lock;
+ SimpleMutex _lock;
} rolling;
}
diff --git a/db/repl.cpp b/db/repl.cpp
index a18d725..5edf0c2 100644
--- a/db/repl.cpp
+++ b/db/repl.cpp
@@ -508,12 +508,12 @@ namespace mongo {
return;
}
-
+
DatabaseIgnorer ___databaseIgnorer;
-
+
void DatabaseIgnorer::doIgnoreUntilAfter( const string &db, const OpTime &futureOplogTime ) {
if ( futureOplogTime > _ignores[ db ] ) {
- _ignores[ db ] = futureOplogTime;
+ _ignores[ db ] = futureOplogTime;
}
}
@@ -533,28 +533,28 @@ namespace mongo {
bool ReplSource::handleDuplicateDbName( const BSONObj &op, const char *ns, const char *db ) {
if ( dbHolder.isLoaded( ns, dbpath ) ) {
// Database is already present.
- return true;
+ return true;
}
BSONElement ts = op.getField( "ts" );
if ( ( ts.type() == Date || ts.type() == Timestamp ) && ___databaseIgnorer.ignoreAt( db, ts.date() ) ) {
// Database is ignored due to a previous indication that it is
// missing from master after optime "ts".
- return false;
+ return false;
}
if ( Database::duplicateUncasedName( db, dbpath ).empty() ) {
// No duplicate database names are present.
return true;
}
-
+
OpTime lastTime;
bool dbOk = false;
{
dbtemprelease release;
-
+
// We always log an operation after executing it (never before), so
// a database list will always be valid as of an oplog entry generated
// before it was retrieved.
-
+
BSONObj last = oplogReader.findOne( this->ns().c_str(), Query().sort( BSON( "$natural" << -1 ) ) );
if ( !last.isEmpty() ) {
BSONElement ts = last.getField( "ts" );
@@ -568,34 +568,34 @@ namespace mongo {
BSONObjIterator i( info.getField( "databases" ).embeddedObject() );
while( i.more() ) {
BSONElement e = i.next();
-
+
const char * name = e.embeddedObject().getField( "name" ).valuestr();
if ( strcasecmp( name, db ) != 0 )
continue;
-
+
if ( strcmp( name, db ) == 0 ) {
// The db exists on master, still need to check that no conflicts exist there.
dbOk = true;
continue;
}
-
+
// The master has a db name that conflicts with the requested name.
dbOk = false;
break;
}
}
-
+
if ( !dbOk ) {
___databaseIgnorer.doIgnoreUntilAfter( db, lastTime );
incompleteCloneDbs.erase(db);
addDbNextPass.erase(db);
- return false;
+ return false;
}
-
+
// Check for duplicates again, since we released the lock above.
set< string > duplicates;
Database::duplicateUncasedName( db, dbpath, &duplicates );
-
+
// The database is present on the master and no conflicting databases
// are present on the master. Drop any local conflicts.
for( set< string >::const_iterator i = duplicates.begin(); i != duplicates.end(); ++i ) {
@@ -605,7 +605,7 @@ namespace mongo {
Client::Context ctx(*i);
dropDatabase(*i);
}
-
+
massert( 14034, "Duplicate database names present after attempting to delete duplicates",
Database::duplicateUncasedName( db, dbpath ).empty() );
return true;
@@ -613,7 +613,11 @@ namespace mongo {
void ReplSource::applyOperation(const BSONObj& op) {
try {
- applyOperation_inlock( op );
+ bool failedUpdate = applyOperation_inlock( op );
+ if (failedUpdate && shouldRetry(op, hostName)) {
+ failedUpdate = applyOperation_inlock( op );
+ uassert(15914, "Failure retrying initial sync update", ! failedUpdate );
+ }
}
catch ( UserException& e ) {
log() << "sync: caught user assertion " << e << " while applying op: " << op << endl;;
@@ -705,9 +709,9 @@ namespace mongo {
}
if ( !handleDuplicateDbName( op, ns, clientName ) ) {
- return;
+ return;
}
-
+
Client::Context ctx( ns );
ctx.getClient()->curop()->reset();
@@ -943,7 +947,7 @@ namespace mongo {
}
// otherwise, break out of loop so we can set to completed or clone more dbs
}
-
+
if( oplogReader.awaitCapable() && tailing )
okResultCode = 0; // don't sleep
syncedTo = nextOpTime;
@@ -1077,7 +1081,7 @@ namespace mongo {
BSONObj me;
{
-
+
dblock l;
// local.me is an identifier for a server for getLastError w:2+
if ( ! Helpers::getSingleton( "local.me" , me ) ||
@@ -1123,7 +1127,7 @@ namespace mongo {
}
return true;
}
-
+
bool OplogReader::connect(string hostName) {
if (conn() != 0) {
return true;
diff --git a/db/repl.h b/db/repl.h
index 9791f14..635265b 100644
--- a/db/repl.h
+++ b/db/repl.h
@@ -122,11 +122,11 @@ namespace mongo {
* @return true iff an op with the specified ns may be applied.
*/
bool handleDuplicateDbName( const BSONObj &op, const char *ns, const char *db );
-
+
public:
OplogReader oplogReader;
- static void applyOperation(const BSONObj& op);
+ void applyOperation(const BSONObj& op);
string hostName; // ip addr or hostname plus optionally, ":<port>"
string _sourceName; // a logical source name.
string sourceName() const { return _sourceName.empty() ? "main" : _sourceName; }
diff --git a/db/repl/connections.h b/db/repl/connections.h
index 78cfb30..61c581b 100644
--- a/db/repl/connections.h
+++ b/db/repl/connections.h
@@ -47,6 +47,10 @@ namespace mongo {
~ScopedConn() {
// conLock releases...
}
+ void reconnect() {
+ conn()->port().shutdown();
+ connect();
+ }
/* If we were to run a query and not exhaust the cursor, future use of the connection would be problematic.
So here what we do is wrapper known safe methods and not allow cursor-style queries at all. This makes
@@ -61,9 +65,6 @@ namespace mongo {
BSONObj findOne(const string &ns, const Query& q, const BSONObj *fieldsToReturn = 0, int queryOptions = 0) {
return conn()->findOne(ns, q, fieldsToReturn, queryOptions);
}
- void setTimeout(double to) {
- conn()->setSoTimeout(to);
- }
private:
auto_ptr<scoped_lock> connLock;
@@ -78,15 +79,36 @@ namespace mongo {
typedef map<string,ScopedConn::X*> M;
static M& _map;
DBClientConnection* conn() { return &x->cc; }
+ const string _hostport;
+
+ // we should already be locked...
+ bool connect() {
+ string err;
+ if (!x->cc.connect(_hostport, err)) {
+ log() << "couldn't connect to " << _hostport << ": " << err << rsLog;
+ return false;
+ }
+
+ // if we cannot authenticate against a member, then either its key file
+ // or our key file has to change. if our key file has to change, we'll
+ // be rebooting. if their file has to change, they'll be rebooted so the
+ // connection created above will go dead, reconnect, and reauth.
+ if (!noauth && !x->cc.auth("local", internalSecurity.user, internalSecurity.pwd, err, false)) {
+ log() << "could not authenticate against " << _hostport << ", " << err << rsLog;
+ return false;
+ }
+
+ return true;
+ }
};
- inline ScopedConn::ScopedConn(string hostport) {
+ inline ScopedConn::ScopedConn(string hostport) : _hostport(hostport) {
bool first = false;
{
scoped_lock lk(mapMutex);
- x = _map[hostport];
+ x = _map[_hostport];
if( x == 0 ) {
- x = _map[hostport] = new X();
+ x = _map[_hostport] = new X();
first = true;
connLock.reset( new scoped_lock(x->z) );
}
@@ -96,17 +118,7 @@ namespace mongo {
return;
}
- // we already locked above...
- string err;
- if (!x->cc.connect(hostport, err)) {
- log() << "couldn't connect to " << hostport << ": " << err << rsLog;
- return;
- }
-
- if (!noauth && !x->cc.auth("local", internalSecurity.user, internalSecurity.pwd, err, false)) {
- log() << "could not authenticate against " << conn()->toString() << ", " << err << rsLog;
- return;
- }
+ connect();
}
}
diff --git a/db/repl/health.cpp b/db/repl/health.cpp
index 711b457..7e5a39f 100644
--- a/db/repl/health.cpp
+++ b/db/repl/health.cpp
@@ -402,6 +402,11 @@ namespace mongo {
string s = m->lhb();
if( !s.empty() )
bb.append("errmsg", s);
+
+ if (m->hbinfo().authIssue) {
+ bb.append("authenticated", false);
+ }
+
v.push_back(bb.obj());
m = m->next();
}
diff --git a/db/repl/heartbeat.cpp b/db/repl/heartbeat.cpp
index 7d3f78c..138ba45 100644
--- a/db/repl/heartbeat.cpp
+++ b/db/repl/heartbeat.cpp
@@ -51,11 +51,14 @@ namespace mongo {
/* { replSetHeartbeat : <setname> } */
class CmdReplSetHeartbeat : public ReplSetCommand {
public:
- virtual bool adminOnly() const { return false; }
CmdReplSetHeartbeat() : ReplSetCommand("replSetHeartbeat") { }
virtual bool run(const string& , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool fromRepl) {
- if( replSetBlind )
+ if( replSetBlind ) {
+ if (theReplSet) {
+ errmsg = str::stream() << theReplSet->selfFullName() << " is blind";
+ }
return false;
+ }
/* we don't call ReplSetCommand::check() here because heartbeat
checks many things that are pre-initialization. */
@@ -99,8 +102,8 @@ namespace mongo {
if( !from.empty() ) {
replSettings.discoveredSeeds.insert(from);
}
- errmsg = "still initializing";
- return false;
+ result.append("hbmsg", "still initializing");
+ return true;
}
if( theReplSet->name() != cmdObj.getStringField("replSetHeartbeat") ) {
@@ -123,32 +126,54 @@ namespace mongo {
}
} cmdReplSetHeartbeat;
- /* throws dbexception */
- bool requestHeartbeat(string setName, string from, string memberFullName, BSONObj& result, int myCfgVersion, int& theirCfgVersion, bool checkEmpty) {
+ bool requestHeartbeat(string setName, string from, string memberFullName, BSONObj& result,
+ int myCfgVersion, int& theirCfgVersion, bool checkEmpty) {
if( replSetBlind ) {
- //sleepmillis( rand() );
return false;
}
- BSONObj cmd = BSON( "replSetHeartbeat" << setName << "v" << myCfgVersion << "pv" << 1 << "checkEmpty" << checkEmpty << "from" << from );
+ BSONObj cmd = BSON( "replSetHeartbeat" << setName <<
+ "v" << myCfgVersion <<
+ "pv" << 1 <<
+ "checkEmpty" << checkEmpty <<
+ "from" << from );
- // we might be talking to ourself - generally not a great idea to do outbound waiting calls in a write lock
- assert( !dbMutex.isWriteLocked() );
-
- // these are slow (multisecond to respond), so generally we don't want to be locked, at least not without
+ // generally not a great idea to do outbound waiting calls in a
+ // write lock. heartbeats can be slow (multisecond to respond), so
+ // generally we don't want to be locked, at least not without
// thinking carefully about it first.
- assert( theReplSet == 0 || !theReplSet->lockedByMe() );
+ uassert(15900, "can't heartbeat: too much lock",
+ !dbMutex.isWriteLocked() || theReplSet == 0 || !theReplSet->lockedByMe() );
ScopedConn conn(memberFullName);
return conn.runCommand("admin", cmd, result, 0);
}
- /* poll every other set member to check its status */
+ /**
+ * Poll every other set member to check its status.
+ *
+ * A detail about local machines and authentication: suppose we have 2
+ * members, A and B, on the same machine using different keyFiles. A is
+ * primary. If we're just starting the set, there are no admin users, so A
+ * and B can access each other because it's local access.
+ *
+ * Then we add a user to A. B cannot sync this user from A, because as soon
+     * as we add an admin user, A requires auth. However, A can still
+ * heartbeat B, because B *doesn't* have an admin user. So A can reach B
+ * but B cannot reach A.
+ *
+ * Once B is restarted with the correct keyFile, everything should work as
+ * expected.
+ */
class ReplSetHealthPollTask : public task::Task {
+ private:
HostAndPort h;
HeartbeatInfo m;
+ int tries;
+ const int threshold;
public:
- ReplSetHealthPollTask(const HostAndPort& hh, const HeartbeatInfo& mm) : h(hh), m(mm) { }
+ ReplSetHealthPollTask(const HostAndPort& hh, const HeartbeatInfo& mm)
+ : h(hh), m(mm), tries(0), threshold(15) { }
string name() const { return "rsHealthPoll"; }
void doWork() {
@@ -163,16 +188,7 @@ namespace mongo {
BSONObj info;
int theirConfigVersion = -10000;
- Timer timer;
-
- bool ok = requestHeartbeat(theReplSet->name(), theReplSet->selfFullName(), h.toString(), info, theReplSet->config().version, theirConfigVersion);
-
- mem.ping = (unsigned int)timer.millis();
-
- time_t before = timer.startTime() / 1000000;
- // we set this on any response - we don't get this far if
- // couldn't connect because exception is thrown
- time_t after = mem.lastHeartbeat = before + (mem.ping / 1000);
+ bool ok = _requestHeartbeat(mem, info, theirConfigVersion);
// weight new ping with old pings
// on the first ping, just use the ping value
@@ -180,68 +196,12 @@ namespace mongo {
mem.ping = (unsigned int)((old.ping * .8) + (mem.ping * .2));
}
- if ( info["time"].isNumber() ) {
- long long t = info["time"].numberLong();
- if( t > after )
- mem.skew = (int) (t - after);
- else if( t < before )
- mem.skew = (int) (t - before); // negative
- }
- else {
- // it won't be there if remote hasn't initialized yet
- if( info.hasElement("time") )
- warning() << "heatbeat.time isn't a number: " << info << endl;
- mem.skew = INT_MIN;
- }
-
- {
- be state = info["state"];
- if( state.ok() )
- mem.hbstate = MemberState(state.Int());
- }
if( ok ) {
- HeartbeatInfo::numPings++;
-
- if( mem.upSince == 0 ) {
- log() << "replSet info member " << h.toString() << " is up" << rsLog;
- mem.upSince = mem.lastHeartbeat;
- }
- mem.health = 1.0;
- mem.lastHeartbeatMsg = info["hbmsg"].String();
- if( info.hasElement("opTime") )
- mem.opTime = info["opTime"].Date();
-
- // see if this member is in the electable set
- if( info["e"].eoo() ) {
- // for backwards compatibility
- const Member *member = theReplSet->findById(mem.id());
- if (member && member->config().potentiallyHot()) {
- theReplSet->addToElectable(mem.id());
- }
- else {
- theReplSet->rmFromElectable(mem.id());
- }
- }
- // add this server to the electable set if it is within 10
- // seconds of the latest optime we know of
- else if( info["e"].trueValue() &&
- mem.opTime >= theReplSet->lastOpTimeWritten.getSecs() - 10) {
- unsigned lastOp = theReplSet->lastOtherOpTime().getSecs();
- if (lastOp > 0 && mem.opTime >= lastOp - 10) {
- theReplSet->addToElectable(mem.id());
- }
- }
- else {
- theReplSet->rmFromElectable(mem.id());
- }
-
- be cfg = info["config"];
- if( cfg.ok() ) {
- // received a new config
- boost::function<void()> f =
- boost::bind(&Manager::msgReceivedNewConfig, theReplSet->mgr, cfg.Obj().copy());
- theReplSet->mgr->send(f);
- }
+ up(info, mem);
+ }
+ else if (!info["errmsg"].eoo() &&
+ info["errmsg"].str() == "need to login") {
+ authIssue(mem);
}
else {
down(mem, info.getStringField("errmsg"));
@@ -271,7 +231,58 @@ namespace mongo {
}
private:
+ bool _requestHeartbeat(HeartbeatInfo& mem, BSONObj& info, int& theirConfigVersion) {
+ if (tries++ % threshold == (threshold - 1)) {
+ ScopedConn conn(h.toString());
+ conn.reconnect();
+ }
+
+ Timer timer;
+
+ bool ok = requestHeartbeat(theReplSet->name(), theReplSet->selfFullName(),
+ h.toString(), info, theReplSet->config().version, theirConfigVersion);
+
+ mem.ping = (unsigned int)timer.millis();
+
+ time_t before = timer.startTime() / 1000000;
+ // we set this on any response - we don't get this far if
+ // couldn't connect because exception is thrown
+ time_t after = mem.lastHeartbeat = before + (mem.ping / 1000);
+
+ if ( info["time"].isNumber() ) {
+ long long t = info["time"].numberLong();
+ if( t > after )
+ mem.skew = (int) (t - after);
+ else if( t < before )
+ mem.skew = (int) (t - before); // negative
+ }
+ else {
+ // it won't be there if remote hasn't initialized yet
+ if( info.hasElement("time") )
+ warning() << "heatbeat.time isn't a number: " << info << endl;
+ mem.skew = INT_MIN;
+ }
+
+ {
+ be state = info["state"];
+ if( state.ok() )
+ mem.hbstate = MemberState(state.Int());
+ }
+
+ return ok;
+ }
+
+ void authIssue(HeartbeatInfo& mem) {
+ mem.authIssue = true;
+ mem.hbstate = MemberState::RS_UNKNOWN;
+
+ // set health to 0 so that this doesn't count towards majority
+ mem.health = 0.0;
+ theReplSet->rmFromElectable(mem.id());
+ }
+
void down(HeartbeatInfo& mem, string msg) {
+ mem.authIssue = false;
mem.health = 0.0;
mem.ping = 0;
if( mem.upSince || mem.downSince == 0 ) {
@@ -283,6 +294,52 @@ namespace mongo {
mem.lastHeartbeatMsg = msg;
theReplSet->rmFromElectable(mem.id());
}
+
+ void up(const BSONObj& info, HeartbeatInfo& mem) {
+ HeartbeatInfo::numPings++;
+ mem.authIssue = false;
+
+ if( mem.upSince == 0 ) {
+ log() << "replSet member " << h.toString() << " is up" << rsLog;
+ mem.upSince = mem.lastHeartbeat;
+ }
+ mem.health = 1.0;
+ mem.lastHeartbeatMsg = info["hbmsg"].String();
+ if( info.hasElement("opTime") )
+ mem.opTime = info["opTime"].Date();
+
+ // see if this member is in the electable set
+ if( info["e"].eoo() ) {
+ // for backwards compatibility
+ const Member *member = theReplSet->findById(mem.id());
+ if (member && member->config().potentiallyHot()) {
+ theReplSet->addToElectable(mem.id());
+ }
+ else {
+ theReplSet->rmFromElectable(mem.id());
+ }
+ }
+ // add this server to the electable set if it is within 10
+ // seconds of the latest optime we know of
+ else if( info["e"].trueValue() &&
+ mem.opTime >= theReplSet->lastOpTimeWritten.getSecs() - 10) {
+ unsigned lastOp = theReplSet->lastOtherOpTime().getSecs();
+ if (lastOp > 0 && mem.opTime >= lastOp - 10) {
+ theReplSet->addToElectable(mem.id());
+ }
+ }
+ else {
+ theReplSet->rmFromElectable(mem.id());
+ }
+
+ be cfg = info["config"];
+ if( cfg.ok() ) {
+ // received a new config
+ boost::function<void()> f =
+ boost::bind(&Manager::msgReceivedNewConfig, theReplSet->mgr, cfg.Obj().copy());
+ theReplSet->mgr->send(f);
+ }
+ }
};
void ReplSetImpl::endOldHealthTasks() {
diff --git a/db/repl/manager.cpp b/db/repl/manager.cpp
index 3c4c0eb..c91adc3 100644
--- a/db/repl/manager.cpp
+++ b/db/repl/manager.cpp
@@ -119,6 +119,39 @@ namespace mongo {
}
}
+ void Manager::checkAuth() {
+ int down = 0, authIssue = 0, total = 0;
+
+ for( Member *m = rs->head(); m; m=m->next() ) {
+ total++;
+
+ // all authIssue servers will also be not up
+ if (!m->hbinfo().up()) {
+ down++;
+ if (m->hbinfo().authIssue) {
+ authIssue++;
+ }
+ }
+ }
+
+ // if all nodes are down or failed auth AND at least one failed
+ // auth, go into recovering. If all nodes are down, stay a
+ // secondary.
+ if (authIssue > 0 && down == total) {
+ log() << "replset error could not reach/authenticate against any members" << endl;
+
+ if (rs->box.getPrimary() == rs->_self) {
+ log() << "auth problems, relinquishing primary" << rsLog;
+ rs->relinquish();
+ }
+
+ rs->blockSync(true);
+ }
+ else {
+ rs->blockSync(false);
+ }
+ }
+
/** called as the health threads get new results */
void Manager::msgCheckNewState() {
{
@@ -130,7 +163,8 @@ namespace mongo {
if( busyWithElectSelf ) return;
checkElectableSet();
-
+ checkAuth();
+
const Member *p = rs->box.getPrimary();
if( p && p != rs->_self ) {
if( !p->hbinfo().up() ||
diff --git a/db/repl/rs.cpp b/db/repl/rs.cpp
index 1fbbc10..f827291 100644
--- a/db/repl/rs.cpp
+++ b/db/repl/rs.cpp
@@ -329,6 +329,7 @@ namespace mongo {
ReplSetImpl::ReplSetImpl(ReplSetCmdline& replSetCmdline) : elect(this),
_currentSyncTarget(0),
+ _blockSync(false),
_hbmsgTime(0),
_self(0),
_maintenanceMode(0),
diff --git a/db/repl/rs.h b/db/repl/rs.h
index 61041a6..2b3ea9b 100644
--- a/db/repl/rs.h
+++ b/db/repl/rs.h
@@ -93,6 +93,7 @@ namespace mongo {
void noteARemoteIsPrimary(const Member *);
void checkElectableSet();
+ void checkAuth();
virtual void starting();
public:
Manager(ReplSetImpl *rs);
@@ -348,6 +349,9 @@ namespace mongo {
const Member* getMemberToSyncTo();
Member* _currentSyncTarget;
+ bool _blockSync;
+ void blockSync(bool block);
+
// set of electable members' _ids
set<unsigned> _electableSet;
protected:
@@ -491,7 +495,7 @@ namespace mongo {
void _syncThread();
bool tryToGoLiveAsASecondary(OpTime&); // readlocks
void syncTail();
- void syncApply(const BSONObj &o);
+ bool syncApply(const BSONObj &o);
unsigned _syncRollback(OplogReader& r);
void syncRollback(OplogReader& r);
void syncFixUp(HowToFixUp& h, OplogReader& r);
@@ -577,7 +581,7 @@ namespace mongo {
* that still need to be checked for auth.
*/
bool checkAuth(string& errmsg, BSONObjBuilder& result) {
- if( !noauth && adminOnly() ) {
+ if( !noauth ) {
AuthenticationInfo *ai = cc().getAuthenticationInfo();
if (!ai->isAuthorizedForLock("admin", locktype())) {
errmsg = "replSet command unauthorized";
diff --git a/db/repl/rs_config.h b/db/repl/rs_config.h
index f69052a..b22b61e 100644
--- a/db/repl/rs_config.h
+++ b/db/repl/rs_config.h
@@ -80,6 +80,22 @@ namespace mongo {
}
}
bool operator==(const MemberCfg& r) const {
+ if (!tags.empty() || !r.tags.empty()) {
+ if (tags.size() != r.tags.size()) {
+ return false;
+ }
+
+ // if they are the same size and not equal, at least one
+ // element in A must be different in B
+ for (map<string,string>::const_iterator lit = tags.begin(); lit != tags.end(); lit++) {
+ map<string,string>::const_iterator rit = r.tags.find((*lit).first);
+
+ if (rit == r.tags.end() || (*lit).second != (*rit).second) {
+ return false;
+ }
+ }
+ }
+
return _id==r._id && votes == r.votes && h == r.h && priority == r.priority &&
arbiterOnly == r.arbiterOnly && slaveDelay == r.slaveDelay && hidden == r.hidden &&
buildIndexes == buildIndexes;
diff --git a/db/repl/rs_initialsync.cpp b/db/repl/rs_initialsync.cpp
index 101b03a..112d739 100644
--- a/db/repl/rs_initialsync.cpp
+++ b/db/repl/rs_initialsync.cpp
@@ -81,6 +81,7 @@ namespace mongo {
const Member* ReplSetImpl::getMemberToSyncTo() {
Member *closest = 0;
+ bool buildIndexes = true;
// wait for 2N pings before choosing a sync target
if (_cfg) {
@@ -90,11 +91,15 @@ namespace mongo {
OCCASIONALLY log() << "waiting for " << needMorePings << " pings from other members before syncing" << endl;
return NULL;
}
+
+ buildIndexes = myConfig().buildIndexes;
}
// find the member with the lowest ping time that has more data than me
for (Member *m = _members.head(); m; m = m->next()) {
if (m->hbinfo().up() &&
+ // make sure members with buildIndexes sync from other members w/indexes
+ (!buildIndexes || (buildIndexes && m->config().buildIndexes)) &&
(m->state() == MemberState::RS_PRIMARY ||
(m->state() == MemberState::RS_SECONDARY && m->hbinfo().opTime > lastOpTimeWritten)) &&
(!closest || m->hbinfo().ping < closest->hbinfo().ping)) {
diff --git a/db/repl/rs_member.h b/db/repl/rs_member.h
index d60bb52..38b6c9b 100644
--- a/db/repl/rs_member.h
+++ b/db/repl/rs_member.h
@@ -69,7 +69,8 @@ namespace mongo {
class HeartbeatInfo {
unsigned _id;
public:
- HeartbeatInfo() : _id(0xffffffff),hbstate(MemberState::RS_UNKNOWN),health(-1.0),downSince(0),skew(INT_MIN) { }
+ HeartbeatInfo() : _id(0xffffffff), hbstate(MemberState::RS_UNKNOWN), health(-1.0),
+ downSince(0), skew(INT_MIN), authIssue(false) { }
HeartbeatInfo(unsigned id);
unsigned id() const { return _id; }
MemberState hbstate;
@@ -80,6 +81,7 @@ namespace mongo {
DiagStr lastHeartbeatMsg;
OpTime opTime;
int skew;
+ bool authIssue;
unsigned int ping; // milliseconds
static unsigned int numPings;
@@ -94,7 +96,7 @@ namespace mongo {
bool changed(const HeartbeatInfo& old) const;
};
- inline HeartbeatInfo::HeartbeatInfo(unsigned id) : _id(id) {
+ inline HeartbeatInfo::HeartbeatInfo(unsigned id) : _id(id), authIssue(false) {
hbstate = MemberState::RS_UNKNOWN;
health = -1.0;
downSince = 0;
diff --git a/db/repl/rs_sync.cpp b/db/repl/rs_sync.cpp
index b29328b..8cd3e14 100644
--- a/db/repl/rs_sync.cpp
+++ b/db/repl/rs_sync.cpp
@@ -32,17 +32,19 @@ namespace mongo {
}
}
- /* apply the log op that is in param o */
- void ReplSetImpl::syncApply(const BSONObj &o) {
+ /* apply the log op that is in param o
+ @return bool failedUpdate
+ */
+ bool ReplSetImpl::syncApply(const BSONObj &o) {
const char *ns = o.getStringField("ns");
if ( *ns == '.' || *ns == 0 ) {
blank(o);
- return;
+ return false;
}
Client::Context ctx(ns);
ctx.getClient()->curop()->reset();
- applyOperation_inlock(o);
+ return applyOperation_inlock(o);
}
/* initial oplog application, during initial sync, after cloning.
@@ -57,6 +59,7 @@ namespace mongo {
const string hn = source->h().toString();
OplogReader r;
+
try {
if( !r.connect(hn) ) {
log() << "replSet initial sync error can't connect to " << hn << " to read " << rsoplog << rsLog;
@@ -113,12 +116,9 @@ namespace mongo {
if( !r.more() )
break;
BSONObj o = r.nextSafe(); /* note we might get "not master" at some point */
- {
- ts = o["ts"]._opTime();
+ ts = o["ts"]._opTime();
- /* if we have become primary, we dont' want to apply things from elsewhere
- anymore. assumePrimary is in the db lock so we are safe as long as
- we check after we locked above. */
+ {
if( (source->state() != MemberState::RS_PRIMARY &&
source->state() != MemberState::RS_SECONDARY) ||
replSetForceInitialSyncFailure ) {
@@ -133,9 +133,12 @@ namespace mongo {
throw DBException("primary changed",0);
}
- if( ts >= applyGTE ) {
- // optimes before we started copying need not be applied.
- syncApply(o);
+ if( ts >= applyGTE ) { // optimes before we started copying need not be applied.
+ bool failedUpdate = syncApply(o);
+ if( failedUpdate && shouldRetry(o, hn)) {
+ failedUpdate = syncApply(o);
+ uassert(15915, "replSet update still fails after adding missing object", !failedUpdate);
+ }
}
_logOpObjRS(o); /* with repl sets we write the ops to our oplog too */
}
@@ -149,7 +152,11 @@ namespace mongo {
start = now;
}
}
-
+
+ if ( ts > minValid ) {
+ break;
+ }
+
getDur().commitIfNeeded();
}
catch (DBException& e) {
@@ -157,7 +164,7 @@ namespace mongo {
if( e.getCode() == 11000 || e.getCode() == 11001 ) {
continue;
}
-
+
// handle cursor not found (just requery)
if( e.getCode() == 13127 ) {
r.resetCursor();
@@ -290,7 +297,7 @@ namespace mongo {
target = 0;
}
}
-
+
// no server found
if (target == 0) {
// if there is no one to sync from
@@ -298,7 +305,7 @@ namespace mongo {
tryToGoLiveAsASecondary(minvalid);
return;
}
-
+
r.tailingQueryGTE(rsoplog, lastOpTimeWritten);
// if target cut connections between connecting and querying (for
// example, because it stepped down) we might not have a cursor
@@ -408,7 +415,7 @@ namespace mongo {
if( !target->hbinfo().hbstate.readable() ) {
break;
}
-
+
if( myConfig().slaveDelay != sd ) // reconf
break;
}
@@ -429,7 +436,7 @@ namespace mongo {
}
syncApply(o);
- _logOpObjRS(o); // with repl sets we write the ops to our oplog too
+ _logOpObjRS(o); // with repl sets we write the ops to our oplog too
}
catch (DBException& e) {
sethbmsg(str::stream() << "syncTail: " << e.toString() << ", syncing: " << o);
@@ -444,7 +451,7 @@ namespace mongo {
// TODO : reuse our connection to the primary.
return;
}
-
+
if( !target->hbinfo().hbstate.readable() ) {
return;
}
@@ -458,7 +465,7 @@ namespace mongo {
sleepsecs(1);
return;
}
- if( sp.state.fatal() || sp.state.startup() ) {
+ if( _blockSync || sp.state.fatal() || sp.state.startup() ) {
sleepsecs(5);
return;
}
@@ -530,6 +537,15 @@ namespace mongo {
replLocalAuth();
}
+ void ReplSetImpl::blockSync(bool block) {
+ _blockSync = block;
+ if (_blockSync) {
+ // syncing is how we get into SECONDARY state, so we'll be stuck in
+ // RECOVERING until we unblock
+ changeState(MemberState::RS_RECOVERING);
+ }
+ }
+
void GhostSync::associateSlave(const BSONObj& id, const int memberId) {
const OID rid = id["_id"].OID();
rwlock lk( _lock , true );
@@ -556,10 +572,10 @@ namespace mongo {
OCCASIONALLY warning() << "couldn't update slave " << rid << " no entry" << rsLog;
return;
}
-
+
GhostSlave& slave = i->second;
if (!slave.init) {
- OCCASIONALLY log() << "couldn't update slave " << rid << " not init" << rsLog;
+ OCCASIONALLY log() << "couldn't update slave " << rid << " not init" << rsLog;
return;
}