Diffstat (limited to 'db')
-rw-r--r-- | db/cloner.cpp | 6
-rw-r--r-- | db/commands/isself.cpp | 6
-rw-r--r-- | db/commands/mr.cpp | 1
-rw-r--r-- | db/db.cpp | 6
-rw-r--r-- | db/dbcommands.cpp | 12
-rw-r--r-- | db/dbcommands_generic.cpp | 3
-rw-r--r-- | db/dbhelpers.cpp | 1
-rw-r--r-- | db/geo/2d.cpp | 5
-rw-r--r-- | db/instance.cpp | 9
-rw-r--r-- | db/jsobj.cpp | 15
-rw-r--r-- | db/oplog.cpp | 80
-rw-r--r-- | db/oplog.h | 8
-rw-r--r-- | db/ops/update.cpp | 3
-rw-r--r-- | db/queryutil.h | 2
-rw-r--r-- | db/record.cpp | 9
-rw-r--r-- | db/repl.cpp | 48
-rw-r--r-- | db/repl.h | 4
-rw-r--r-- | db/repl/connections.h | 46
-rw-r--r-- | db/repl/health.cpp | 5
-rw-r--r-- | db/repl/heartbeat.cpp | 229
-rw-r--r-- | db/repl/manager.cpp | 36
-rw-r--r-- | db/repl/rs.cpp | 1
-rw-r--r-- | db/repl/rs.h | 8
-rw-r--r-- | db/repl/rs_config.h | 16
-rw-r--r-- | db/repl/rs_initialsync.cpp | 5
-rw-r--r-- | db/repl/rs_member.h | 6
-rw-r--r-- | db/repl/rs_sync.cpp | 60
27 files changed, 456 insertions, 174 deletions
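
Note: the central behavioral change in the hunks below is in db/oplog.cpp, db/repl.cpp, and db/repl/rs_sync.cpp: applyOperation_inlock() now returns true when an update op could not be applied because the target document does not exist locally, and during initial sync the caller uses shouldRetry() to fetch the missing document from the sync source, insert it, and apply the op once more; if the source no longer has the document, the op is skipped on the assumption that a later delete in the oplog accounts for it. The standalone C++ sketch below illustrates that retry pattern only. Op, SyncSource, LocalStore, applyUpdate, and fetchMissing are simplified placeholder names used for illustration, not MongoDB types or APIs.

    #include <iostream>
    #include <map>
    #include <optional>
    #include <stdexcept>
    #include <string>

    // Placeholder types standing in for an oplog entry and the sync source.
    struct Op { std::string ns; std::string id; std::string doc; };

    struct SyncSource {
        std::map<std::string, std::string> data;  // _id -> document
        std::optional<std::string> findOne(const std::string& id) const {
            auto it = data.find(id);
            if (it == data.end()) return std::nullopt;
            return it->second;
        }
    };

    struct LocalStore {
        std::map<std::string, std::string> data;
        // Returns true when the update failed because the target document is missing.
        bool applyUpdate(const Op& op) {
            auto it = data.find(op.id);
            if (it == data.end()) return true;   // failedUpdate
            it->second = op.doc;
            return false;
        }
    };

    // Sketch of the shouldRetry() idea: fetch the missing document from the sync
    // source and insert it locally; if the source no longer has it, skip the op,
    // since a later delete in the oplog presumably removed it.
    bool fetchMissing(LocalStore& local, const SyncSource& source, const Op& op) {
        std::optional<std::string> missing = source.findOne(op.id);
        if (!missing) {
            std::cout << "missing object not found on source; presumably deleted later\n";
            return false;
        }
        local.data[op.id] = *missing;
        return true;
    }

    int main() {
        SyncSource source{{{"42", "{x:1}"}}};
        LocalStore local;                        // initial sync: document not cloned yet
        Op update{"test.c", "42", "{x:2}"};

        bool failedUpdate = local.applyUpdate(update);
        if (failedUpdate && fetchMissing(local, source, update)) {
            failedUpdate = local.applyUpdate(update);
            if (failedUpdate)
                throw std::runtime_error("update still fails after adding missing object");
        }
        std::cout << "doc 42 is now " << local.data["42"] << "\n";
    }
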
diff --git a/db/cloner.cpp b/db/cloner.cpp index 8956133..f13ea52 100644 --- a/db/cloner.cpp +++ b/db/cloner.cpp @@ -83,6 +83,12 @@ namespace mongo { BSONElement e = i.next(); if ( e.eoo() ) break; + + // for now, skip the "v" field so that v:0 indexes will be upgraded to v:1 + if ( string("v") == e.fieldName() ) { + continue; + } + if ( string("ns") == e.fieldName() ) { uassert( 10024 , "bad ns field for index during dbcopy", e.type() == String); const char *p = strchr(e.valuestr(), '.'); diff --git a/db/commands/isself.cpp b/db/commands/isself.cpp index 5a868de..7b1cea4 100644 --- a/db/commands/isself.cpp +++ b/db/commands/isself.cpp @@ -4,6 +4,7 @@ #include "../../util/net/listen.h" #include "../commands.h" #include "../../client/dbclient.h" +#include "../security.h" #ifndef _WIN32 # ifndef __sunos__ @@ -211,6 +212,11 @@ namespace mongo { return false; } + if (!noauth && cmdLine.keyFile && + !conn.auth("local", internalSecurity.user, internalSecurity.pwd, errmsg, false)) { + return false; + } + BSONObj out; bool ok = conn.simpleCommand( "admin" , &out , "_isSelf" ); diff --git a/db/commands/mr.cpp b/db/commands/mr.cpp index 56e9770..b79e62b 100644 --- a/db/commands/mr.cpp +++ b/db/commands/mr.cpp @@ -1119,6 +1119,7 @@ namespace mongo { virtual LockType locktype() const { return NONE; } bool run(const string& dbname , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool) { + ShardedConnectionInfo::addHook(); string shardedOutputCollection = cmdObj["shardedOutputCollection"].valuestrsafe(); string postProcessCollection = cmdObj["postProcessCollection"].valuestrsafe(); bool postProcessOnly = !(postProcessCollection.empty()); @@ -708,6 +708,12 @@ int main(int argc, char* argv[]) { else { dbpath = "/data/db/"; } +#ifdef _WIN32 + if (dbpath.size() > 1 && dbpath[dbpath.size()-1] == '/') { + // size() check is for the unlikely possibility of --dbpath "/" + dbpath = dbpath.erase(dbpath.size()-1); + } +#endif if ( params.count("directoryperdb")) { directoryperdb = true; diff --git a/db/dbcommands.cpp b/db/dbcommands.cpp index 31f4b7f..b2e6218 100644 --- a/db/dbcommands.cpp +++ b/db/dbcommands.cpp @@ -510,9 +510,11 @@ namespace mongo { t.appendNumber( "mappedWithJournal" , m ); } - if( v - m > 5000 ) { + int overhead = v - m - connTicketHolder.used(); + + if( overhead > 4000 ) { t.append("note", "virtual minus mapped is large. could indicate a memory leak"); - log() << "warning: virtual size (" << v << "MB) - mapped size (" << m << "MB) is large. could indicate a memory leak" << endl; + log() << "warning: virtual size (" << v << "MB) - mapped size (" << m << "MB) is large (" << overhead << "MB). could indicate a memory leak" << endl; } t.done(); @@ -949,7 +951,7 @@ namespace mongo { } list<BSONObj> all; - auto_ptr<DBClientCursor> i = db.getIndexes( toDeleteNs ); + auto_ptr<DBClientCursor> i = db.query( dbname + ".system.indexes" , BSON( "ns" << toDeleteNs ) , 0 , 0 , 0 , QueryOption_SlaveOk ); BSONObjBuilder b; while ( i->more() ) { BSONObj o = i->next().removeField("v").getOwned(); @@ -1104,6 +1106,10 @@ namespace mongo { BSONObj sort = BSON( "files_id" << 1 << "n" << 1 ); shared_ptr<Cursor> cursor = bestGuessCursor(ns.c_str(), query, sort); + if ( ! 
cursor ) { + errmsg = "need an index on { files_id : 1 , n : 1 }"; + return false; + } auto_ptr<ClientCursor> cc (new ClientCursor(QueryOption_NoCursorTimeout, cursor, ns.c_str())); int n = 0; diff --git a/db/dbcommands_generic.cpp b/db/dbcommands_generic.cpp index 69b51c7..22cee22 100644 --- a/db/dbcommands_generic.cpp +++ b/db/dbcommands_generic.cpp @@ -51,7 +51,8 @@ namespace mongo { public: CmdBuildInfo() : Command( "buildInfo", true, "buildinfo" ) {} virtual bool slaveOk() const { return true; } - virtual bool adminOnly() const { return true; } + virtual bool adminOnly() const { return false; } + virtual bool requiresAuth() { return false; } virtual LockType locktype() const { return NONE; } virtual void help( stringstream &help ) const { help << "get version #, etc.\n"; diff --git a/db/dbhelpers.cpp b/db/dbhelpers.cpp index cc4fdba..33ac9b7 100644 --- a/db/dbhelpers.cpp +++ b/db/dbhelpers.cpp @@ -157,6 +157,7 @@ namespace mongo { } DiskLoc Helpers::findById(NamespaceDetails *d, BSONObj idquery) { + assert(d); int idxNo = d->findIdIndex(); uassert(13430, "no _id index", idxNo>=0); IndexDetails& i = d->idx( idxNo ); diff --git a/db/geo/2d.cpp b/db/geo/2d.cpp index b873490..40df5e2 100644 --- a/db/geo/2d.cpp +++ b/db/geo/2d.cpp @@ -2647,7 +2647,10 @@ namespace mongo { BSONObjBuilder bb( arr.subobjStart( BSONObjBuilder::numStr( x++ ) ) ); bb.append( "dis" , dis ); - if( includeLocs ) bb.append( "loc" , p._pt ); + if( includeLocs ){ + if( p._pt.couldBeArray() ) bb.append( "loc", BSONArray( p._pt ) ); + else bb.append( "loc" , p._pt ); + } bb.append( "obj" , p._o ); bb.done(); } diff --git a/db/instance.cpp b/db/instance.cpp index 6727867..764571d 100644 --- a/db/instance.cpp +++ b/db/instance.cpp @@ -353,20 +353,19 @@ namespace mongo { } currentOp.ensureStarted(); currentOp.done(); - int ms = currentOp.totalTimeMillis(); + debug.executionTime = currentOp.totalTimeMillis(); //DEV log = true; - if ( log || ms > logThreshold ) { - if( logLevel < 3 && op == dbGetMore && strstr(ns, ".oplog.") && ms < 4300 && !log ) { + if ( log || debug.executionTime > logThreshold ) { + if( logLevel < 3 && op == dbGetMore && strstr(ns, ".oplog.") && debug.executionTime < 4300 && !log ) { /* it's normal for getMore on the oplog to be slow because of use of awaitdata flag. 
*/ } else { - debug.executionTime = ms; mongo::tlog() << debug << endl; } } - if ( currentOp.shouldDBProfile( ms ) ) { + if ( currentOp.shouldDBProfile( debug.executionTime ) ) { // performance profiling is on if ( dbMutex.getState() < 0 ) { mongo::log(1) << "note: not profiling because recursive read lock" << endl; diff --git a/db/jsobj.cpp b/db/jsobj.cpp index dcb7744..9644a87 100644 --- a/db/jsobj.cpp +++ b/db/jsobj.cpp @@ -753,6 +753,21 @@ namespace mongo { return n; } + bool BSONObj::couldBeArray() const { + BSONObjIterator i( *this ); + int index = 0; + while( i.moreWithEOO() ){ + BSONElement e = i.next(); + if( e.eoo() ) break; + + // TODO: If actually important, may be able to do int->char* much faster + if( strcmp( e.fieldName(), ((string)( str::stream() << index )).c_str() ) != 0 ) + return false; + index++; + } + return true; + } + BSONObj BSONObj::clientReadable() const { BSONObjBuilder b; BSONObjIterator i( *this ); diff --git a/db/oplog.cpp b/db/oplog.cpp index dc9db76..5c1671c 100644 --- a/db/oplog.cpp +++ b/db/oplog.cpp @@ -625,9 +625,47 @@ namespace mongo { } } - void applyOperation_inlock(const BSONObj& op , bool fromRepl ) { + bool shouldRetry(const BSONObj& o, const string& hn) { + OplogReader missingObjReader; + + // we don't have the object yet, which is possible on initial sync. get it. + log() << "replication info adding missing object" << endl; // rare enough we can log + uassert(15916, str::stream() << "Can no longer connect to initial sync source: " << hn, missingObjReader.connect(hn)); + + const char *ns = o.getStringField("ns"); + // might be more than just _id in the update criteria + BSONObj query = BSONObjBuilder().append(o.getObjectField("o2")["_id"]).obj(); + BSONObj missingObj; + try { + missingObj = missingObjReader.findOne(ns, query); + } catch(DBException& e) { + log() << "replication assertion fetching missing object: " << e.what() << endl; + throw; + } + + if( missingObj.isEmpty() ) { + log() << "replication missing object not found on source. presumably deleted later in oplog" << endl; + log() << "replication o2: " << o.getObjectField("o2").toString() << endl; + log() << "replication o firstfield: " << o.getObjectField("o").firstElementFieldName() << endl; + + return false; + } + else { + Client::Context ctx(ns); + DiskLoc d = theDataFileMgr.insert(ns, (void*) missingObj.objdata(), missingObj.objsize()); + uassert(15917, "Got bad disk location when attempting to insert", !d.isNull()); + + return true; + } + } + + /** @param fromRepl false if from ApplyOpsCmd + @return true if was and update should have happened and the document DNE. see replset initial sync code. + */ + bool applyOperation_inlock(const BSONObj& op , bool fromRepl ) { assertInWriteLock(); LOG(6) << "applying op: " << op << endl; + bool failedUpdate = false; OpCounters * opCounters = fromRepl ? &replOpCounters : &globalOpCounters; @@ -680,9 +718,45 @@ namespace mongo { } else if ( *opType == 'u' ) { opCounters->gotUpdate(); + // dm do we create this for a capped collection? + // - if not, updates would be slow + // - but if were by id would be slow on primary too so maybe ok + // - if on primary was by another key and there are other indexes, this could be very bad w/out an index + // - if do create, odd to have on secondary but not primary. also can cause secondary to block for + // quite a while on creation. 
RARELY ensureHaveIdIndex(ns); // otherwise updates will be super slow OpDebug debug; - updateObjects(ns, o, op.getObjectField("o2"), /*upsert*/ fields[3].booleanSafe(), /*multi*/ false, /*logop*/ false , debug ); + BSONObj updateCriteria = op.getObjectField("o2"); + bool upsert = fields[3].booleanSafe(); + UpdateResult ur = updateObjects(ns, o, updateCriteria, upsert, /*multi*/ false, /*logop*/ false , debug ); + if( ur.num == 0 ) { + if( ur.mod ) { + if( updateCriteria.nFields() == 1 ) { + // was a simple { _id : ... } update criteria + failedUpdate = true; + // todo: probably should assert in these failedUpdate cases if not in initialSync + } + // need to check to see if it isn't present so we can set failedUpdate correctly. + // note that adds some overhead for this extra check in some cases, such as an updateCriteria + // of the form + // { _id:..., { x : {$size:...} } + // thus this is not ideal. + else if( nsdetails(ns) == NULL || Helpers::findById(nsdetails(ns), updateCriteria).isNull() ) { + failedUpdate = true; + } + else { + // it's present; zero objects were updated because of additional specifiers in the query for idempotence + } + } + else { + // this could happen benignly on an oplog duplicate replay of an upsert + // (because we are idempotent), + // if an regular non-mod update fails the item is (presumably) missing. + if( !upsert ) { + failedUpdate = true; + } + } + } } else if ( *opType == 'd' ) { opCounters->gotDelete(); @@ -703,7 +777,7 @@ namespace mongo { else { throw MsgAssertionException( 14825 , ErrorMsg("error in applyOperation : unknown opType ", *opType) ); } - + return failedUpdate; } class ApplyOpsCmd : public Command { @@ -129,6 +129,12 @@ namespace mongo { * take an op and apply locally * used for applying from an oplog * @param fromRepl really from replication or for testing/internal/command/etc... + * Returns if the op was an update that could not be applied (true on failure) */ - void applyOperation_inlock(const BSONObj& op , bool fromRepl = true ); + bool applyOperation_inlock(const BSONObj& op , bool fromRepl = true ); + + /** + * If applyOperation_inlock should be called again after an update fails. 
+ */ + bool shouldRetry(const BSONObj& op , const string& hn); } diff --git a/db/ops/update.cpp b/db/ops/update.cpp index fd9798a..6a7aad4 100644 --- a/db/ops/update.cpp +++ b/db/ops/update.cpp @@ -1354,7 +1354,8 @@ namespace mongo { logOp( "i", ns, no ); return UpdateResult( 0 , 0 , 1 , no ); } - return UpdateResult( 0 , 0 , 0 ); + + return UpdateResult( 0 , isOperatorUpdate , 0 ); } UpdateResult updateObjects(const char *ns, const BSONObj& updateobj, BSONObj patternOrig, bool upsert, bool multi, bool logop , OpDebug& debug ) { diff --git a/db/queryutil.h b/db/queryutil.h index 104cde2..5d86194 100644 --- a/db/queryutil.h +++ b/db/queryutil.h @@ -328,7 +328,7 @@ namespace mongo { bool matchesElement( const BSONElement &e, int i, bool direction ) const; bool matchesKey( const BSONObj &key ) const; vector<FieldRange> _ranges; - const IndexSpec &_indexSpec; + IndexSpec _indexSpec; int _direction; vector<BSONObj> _queries; // make sure mem owned friend class FieldRangeVectorIterator; diff --git a/db/record.cpp b/db/record.cpp index 51dc520..a8a3e43 100644 --- a/db/record.cpp +++ b/db/record.cpp @@ -112,7 +112,8 @@ namespace mongo { class Rolling { public: - Rolling() { + Rolling() + : _lock( "ps::Rolling" ){ _curSlice = 0; _lastRotate = Listener::getElapsedTimeMillis(); } @@ -126,8 +127,8 @@ namespace mongo { bool access( size_t region , short offset , bool doHalf ) { int regionHash = hash(region); - scoped_spinlock lk( _lock ); - + SimpleMutex::scoped_lock lk( _lock ); + static int rarely_count = 0; if ( rarely_count++ % 2048 == 0 ) { long long now = Listener::getElapsedTimeMillis(); @@ -174,7 +175,7 @@ namespace mongo { long long _lastRotate; Slice _slices[NumSlices]; - SpinLock _lock; + SimpleMutex _lock; } rolling; } diff --git a/db/repl.cpp b/db/repl.cpp index a18d725..5edf0c2 100644 --- a/db/repl.cpp +++ b/db/repl.cpp @@ -508,12 +508,12 @@ namespace mongo { return; } - + DatabaseIgnorer ___databaseIgnorer; - + void DatabaseIgnorer::doIgnoreUntilAfter( const string &db, const OpTime &futureOplogTime ) { if ( futureOplogTime > _ignores[ db ] ) { - _ignores[ db ] = futureOplogTime; + _ignores[ db ] = futureOplogTime; } } @@ -533,28 +533,28 @@ namespace mongo { bool ReplSource::handleDuplicateDbName( const BSONObj &op, const char *ns, const char *db ) { if ( dbHolder.isLoaded( ns, dbpath ) ) { // Database is already present. - return true; + return true; } BSONElement ts = op.getField( "ts" ); if ( ( ts.type() == Date || ts.type() == Timestamp ) && ___databaseIgnorer.ignoreAt( db, ts.date() ) ) { // Database is ignored due to a previous indication that it is // missing from master after optime "ts". - return false; + return false; } if ( Database::duplicateUncasedName( db, dbpath ).empty() ) { // No duplicate database names are present. return true; } - + OpTime lastTime; bool dbOk = false; { dbtemprelease release; - + // We always log an operation after executing it (never before), so // a database list will always be valid as of an oplog entry generated // before it was retrieved. 
- + BSONObj last = oplogReader.findOne( this->ns().c_str(), Query().sort( BSON( "$natural" << -1 ) ) ); if ( !last.isEmpty() ) { BSONElement ts = last.getField( "ts" ); @@ -568,34 +568,34 @@ namespace mongo { BSONObjIterator i( info.getField( "databases" ).embeddedObject() ); while( i.more() ) { BSONElement e = i.next(); - + const char * name = e.embeddedObject().getField( "name" ).valuestr(); if ( strcasecmp( name, db ) != 0 ) continue; - + if ( strcmp( name, db ) == 0 ) { // The db exists on master, still need to check that no conflicts exist there. dbOk = true; continue; } - + // The master has a db name that conflicts with the requested name. dbOk = false; break; } } - + if ( !dbOk ) { ___databaseIgnorer.doIgnoreUntilAfter( db, lastTime ); incompleteCloneDbs.erase(db); addDbNextPass.erase(db); - return false; + return false; } - + // Check for duplicates again, since we released the lock above. set< string > duplicates; Database::duplicateUncasedName( db, dbpath, &duplicates ); - + // The database is present on the master and no conflicting databases // are present on the master. Drop any local conflicts. for( set< string >::const_iterator i = duplicates.begin(); i != duplicates.end(); ++i ) { @@ -605,7 +605,7 @@ namespace mongo { Client::Context ctx(*i); dropDatabase(*i); } - + massert( 14034, "Duplicate database names present after attempting to delete duplicates", Database::duplicateUncasedName( db, dbpath ).empty() ); return true; @@ -613,7 +613,11 @@ namespace mongo { void ReplSource::applyOperation(const BSONObj& op) { try { - applyOperation_inlock( op ); + bool failedUpdate = applyOperation_inlock( op ); + if (failedUpdate && shouldRetry(op, hostName)) { + failedUpdate = applyOperation_inlock( op ); + uassert(15914, "Failure retrying initial sync update", ! failedUpdate ); + } } catch ( UserException& e ) { log() << "sync: caught user assertion " << e << " while applying op: " << op << endl;; @@ -705,9 +709,9 @@ namespace mongo { } if ( !handleDuplicateDbName( op, ns, clientName ) ) { - return; + return; } - + Client::Context ctx( ns ); ctx.getClient()->curop()->reset(); @@ -943,7 +947,7 @@ namespace mongo { } // otherwise, break out of loop so we can set to completed or clone more dbs } - + if( oplogReader.awaitCapable() && tailing ) okResultCode = 0; // don't sleep syncedTo = nextOpTime; @@ -1077,7 +1081,7 @@ namespace mongo { BSONObj me; { - + dblock l; // local.me is an identifier for a server for getLastError w:2+ if ( ! Helpers::getSingleton( "local.me" , me ) || @@ -1123,7 +1127,7 @@ namespace mongo { } return true; } - + bool OplogReader::connect(string hostName) { if (conn() != 0) { return true; @@ -122,11 +122,11 @@ namespace mongo { * @return true iff an op with the specified ns may be applied. */ bool handleDuplicateDbName( const BSONObj &op, const char *ns, const char *db ); - + public: OplogReader oplogReader; - static void applyOperation(const BSONObj& op); + void applyOperation(const BSONObj& op); string hostName; // ip addr or hostname plus optionally, ":<port>" string _sourceName; // a logical source name. string sourceName() const { return _sourceName.empty() ? "main" : _sourceName; } diff --git a/db/repl/connections.h b/db/repl/connections.h index 78cfb30..61c581b 100644 --- a/db/repl/connections.h +++ b/db/repl/connections.h @@ -47,6 +47,10 @@ namespace mongo { ~ScopedConn() { // conLock releases... 
} + void reconnect() { + conn()->port().shutdown(); + connect(); + } /* If we were to run a query and not exhaust the cursor, future use of the connection would be problematic. So here what we do is wrapper known safe methods and not allow cursor-style queries at all. This makes @@ -61,9 +65,6 @@ namespace mongo { BSONObj findOne(const string &ns, const Query& q, const BSONObj *fieldsToReturn = 0, int queryOptions = 0) { return conn()->findOne(ns, q, fieldsToReturn, queryOptions); } - void setTimeout(double to) { - conn()->setSoTimeout(to); - } private: auto_ptr<scoped_lock> connLock; @@ -78,15 +79,36 @@ namespace mongo { typedef map<string,ScopedConn::X*> M; static M& _map; DBClientConnection* conn() { return &x->cc; } + const string _hostport; + + // we should already be locked... + bool connect() { + string err; + if (!x->cc.connect(_hostport, err)) { + log() << "couldn't connect to " << _hostport << ": " << err << rsLog; + return false; + } + + // if we cannot authenticate against a member, then either its key file + // or our key file has to change. if our key file has to change, we'll + // be rebooting. if their file has to change, they'll be rebooted so the + // connection created above will go dead, reconnect, and reauth. + if (!noauth && !x->cc.auth("local", internalSecurity.user, internalSecurity.pwd, err, false)) { + log() << "could not authenticate against " << _hostport << ", " << err << rsLog; + return false; + } + + return true; + } }; - inline ScopedConn::ScopedConn(string hostport) { + inline ScopedConn::ScopedConn(string hostport) : _hostport(hostport) { bool first = false; { scoped_lock lk(mapMutex); - x = _map[hostport]; + x = _map[_hostport]; if( x == 0 ) { - x = _map[hostport] = new X(); + x = _map[_hostport] = new X(); first = true; connLock.reset( new scoped_lock(x->z) ); } @@ -96,17 +118,7 @@ namespace mongo { return; } - // we already locked above... - string err; - if (!x->cc.connect(hostport, err)) { - log() << "couldn't connect to " << hostport << ": " << err << rsLog; - return; - } - - if (!noauth && !x->cc.auth("local", internalSecurity.user, internalSecurity.pwd, err, false)) { - log() << "could not authenticate against " << conn()->toString() << ", " << err << rsLog; - return; - } + connect(); } } diff --git a/db/repl/health.cpp b/db/repl/health.cpp index 711b457..7e5a39f 100644 --- a/db/repl/health.cpp +++ b/db/repl/health.cpp @@ -402,6 +402,11 @@ namespace mongo { string s = m->lhb(); if( !s.empty() ) bb.append("errmsg", s); + + if (m->hbinfo().authIssue) { + bb.append("authenticated", false); + } + v.push_back(bb.obj()); m = m->next(); } diff --git a/db/repl/heartbeat.cpp b/db/repl/heartbeat.cpp index 7d3f78c..138ba45 100644 --- a/db/repl/heartbeat.cpp +++ b/db/repl/heartbeat.cpp @@ -51,11 +51,14 @@ namespace mongo { /* { replSetHeartbeat : <setname> } */ class CmdReplSetHeartbeat : public ReplSetCommand { public: - virtual bool adminOnly() const { return false; } CmdReplSetHeartbeat() : ReplSetCommand("replSetHeartbeat") { } virtual bool run(const string& , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool fromRepl) { - if( replSetBlind ) + if( replSetBlind ) { + if (theReplSet) { + errmsg = str::stream() << theReplSet->selfFullName() << " is blind"; + } return false; + } /* we don't call ReplSetCommand::check() here because heartbeat checks many things that are pre-initialization. 
*/ @@ -99,8 +102,8 @@ namespace mongo { if( !from.empty() ) { replSettings.discoveredSeeds.insert(from); } - errmsg = "still initializing"; - return false; + result.append("hbmsg", "still initializing"); + return true; } if( theReplSet->name() != cmdObj.getStringField("replSetHeartbeat") ) { @@ -123,32 +126,54 @@ namespace mongo { } } cmdReplSetHeartbeat; - /* throws dbexception */ - bool requestHeartbeat(string setName, string from, string memberFullName, BSONObj& result, int myCfgVersion, int& theirCfgVersion, bool checkEmpty) { + bool requestHeartbeat(string setName, string from, string memberFullName, BSONObj& result, + int myCfgVersion, int& theirCfgVersion, bool checkEmpty) { if( replSetBlind ) { - //sleepmillis( rand() ); return false; } - BSONObj cmd = BSON( "replSetHeartbeat" << setName << "v" << myCfgVersion << "pv" << 1 << "checkEmpty" << checkEmpty << "from" << from ); + BSONObj cmd = BSON( "replSetHeartbeat" << setName << + "v" << myCfgVersion << + "pv" << 1 << + "checkEmpty" << checkEmpty << + "from" << from ); - // we might be talking to ourself - generally not a great idea to do outbound waiting calls in a write lock - assert( !dbMutex.isWriteLocked() ); - - // these are slow (multisecond to respond), so generally we don't want to be locked, at least not without + // generally not a great idea to do outbound waiting calls in a + // write lock. heartbeats can be slow (multisecond to respond), so + // generally we don't want to be locked, at least not without // thinking acarefully about it first. - assert( theReplSet == 0 || !theReplSet->lockedByMe() ); + uassert(15900, "can't heartbeat: too much lock", + !dbMutex.isWriteLocked() || theReplSet == 0 || !theReplSet->lockedByMe() ); ScopedConn conn(memberFullName); return conn.runCommand("admin", cmd, result, 0); } - /* poll every other set member to check its status */ + /** + * Poll every other set member to check its status. + * + * A detail about local machines and authentication: suppose we have 2 + * members, A and B, on the same machine using different keyFiles. A is + * primary. If we're just starting the set, there are no admin users, so A + * and B can access each other because it's local access. + * + * Then we add a user to A. B cannot sync this user from A, because as soon + * as we add a an admin user, A requires auth. However, A can still + * heartbeat B, because B *doesn't* have an admin user. So A can reach B + * but B cannot reach A. + * + * Once B is restarted with the correct keyFile, everything should work as + * expected. 
+ */ class ReplSetHealthPollTask : public task::Task { + private: HostAndPort h; HeartbeatInfo m; + int tries; + const int threshold; public: - ReplSetHealthPollTask(const HostAndPort& hh, const HeartbeatInfo& mm) : h(hh), m(mm) { } + ReplSetHealthPollTask(const HostAndPort& hh, const HeartbeatInfo& mm) + : h(hh), m(mm), tries(0), threshold(15) { } string name() const { return "rsHealthPoll"; } void doWork() { @@ -163,16 +188,7 @@ namespace mongo { BSONObj info; int theirConfigVersion = -10000; - Timer timer; - - bool ok = requestHeartbeat(theReplSet->name(), theReplSet->selfFullName(), h.toString(), info, theReplSet->config().version, theirConfigVersion); - - mem.ping = (unsigned int)timer.millis(); - - time_t before = timer.startTime() / 1000000; - // we set this on any response - we don't get this far if - // couldn't connect because exception is thrown - time_t after = mem.lastHeartbeat = before + (mem.ping / 1000); + bool ok = _requestHeartbeat(mem, info, theirConfigVersion); // weight new ping with old pings // on the first ping, just use the ping value @@ -180,68 +196,12 @@ namespace mongo { mem.ping = (unsigned int)((old.ping * .8) + (mem.ping * .2)); } - if ( info["time"].isNumber() ) { - long long t = info["time"].numberLong(); - if( t > after ) - mem.skew = (int) (t - after); - else if( t < before ) - mem.skew = (int) (t - before); // negative - } - else { - // it won't be there if remote hasn't initialized yet - if( info.hasElement("time") ) - warning() << "heatbeat.time isn't a number: " << info << endl; - mem.skew = INT_MIN; - } - - { - be state = info["state"]; - if( state.ok() ) - mem.hbstate = MemberState(state.Int()); - } if( ok ) { - HeartbeatInfo::numPings++; - - if( mem.upSince == 0 ) { - log() << "replSet info member " << h.toString() << " is up" << rsLog; - mem.upSince = mem.lastHeartbeat; - } - mem.health = 1.0; - mem.lastHeartbeatMsg = info["hbmsg"].String(); - if( info.hasElement("opTime") ) - mem.opTime = info["opTime"].Date(); - - // see if this member is in the electable set - if( info["e"].eoo() ) { - // for backwards compatibility - const Member *member = theReplSet->findById(mem.id()); - if (member && member->config().potentiallyHot()) { - theReplSet->addToElectable(mem.id()); - } - else { - theReplSet->rmFromElectable(mem.id()); - } - } - // add this server to the electable set if it is within 10 - // seconds of the latest optime we know of - else if( info["e"].trueValue() && - mem.opTime >= theReplSet->lastOpTimeWritten.getSecs() - 10) { - unsigned lastOp = theReplSet->lastOtherOpTime().getSecs(); - if (lastOp > 0 && mem.opTime >= lastOp - 10) { - theReplSet->addToElectable(mem.id()); - } - } - else { - theReplSet->rmFromElectable(mem.id()); - } - - be cfg = info["config"]; - if( cfg.ok() ) { - // received a new config - boost::function<void()> f = - boost::bind(&Manager::msgReceivedNewConfig, theReplSet->mgr, cfg.Obj().copy()); - theReplSet->mgr->send(f); - } + up(info, mem); + } + else if (!info["errmsg"].eoo() && + info["errmsg"].str() == "need to login") { + authIssue(mem); } else { down(mem, info.getStringField("errmsg")); @@ -271,7 +231,58 @@ namespace mongo { } private: + bool _requestHeartbeat(HeartbeatInfo& mem, BSONObj& info, int& theirConfigVersion) { + if (tries++ % threshold == (threshold - 1)) { + ScopedConn conn(h.toString()); + conn.reconnect(); + } + + Timer timer; + + bool ok = requestHeartbeat(theReplSet->name(), theReplSet->selfFullName(), + h.toString(), info, theReplSet->config().version, theirConfigVersion); + + mem.ping = (unsigned 
int)timer.millis(); + + time_t before = timer.startTime() / 1000000; + // we set this on any response - we don't get this far if + // couldn't connect because exception is thrown + time_t after = mem.lastHeartbeat = before + (mem.ping / 1000); + + if ( info["time"].isNumber() ) { + long long t = info["time"].numberLong(); + if( t > after ) + mem.skew = (int) (t - after); + else if( t < before ) + mem.skew = (int) (t - before); // negative + } + else { + // it won't be there if remote hasn't initialized yet + if( info.hasElement("time") ) + warning() << "heatbeat.time isn't a number: " << info << endl; + mem.skew = INT_MIN; + } + + { + be state = info["state"]; + if( state.ok() ) + mem.hbstate = MemberState(state.Int()); + } + + return ok; + } + + void authIssue(HeartbeatInfo& mem) { + mem.authIssue = true; + mem.hbstate = MemberState::RS_UNKNOWN; + + // set health to 0 so that this doesn't count towards majority + mem.health = 0.0; + theReplSet->rmFromElectable(mem.id()); + } + void down(HeartbeatInfo& mem, string msg) { + mem.authIssue = false; mem.health = 0.0; mem.ping = 0; if( mem.upSince || mem.downSince == 0 ) { @@ -283,6 +294,52 @@ namespace mongo { mem.lastHeartbeatMsg = msg; theReplSet->rmFromElectable(mem.id()); } + + void up(const BSONObj& info, HeartbeatInfo& mem) { + HeartbeatInfo::numPings++; + mem.authIssue = false; + + if( mem.upSince == 0 ) { + log() << "replSet member " << h.toString() << " is up" << rsLog; + mem.upSince = mem.lastHeartbeat; + } + mem.health = 1.0; + mem.lastHeartbeatMsg = info["hbmsg"].String(); + if( info.hasElement("opTime") ) + mem.opTime = info["opTime"].Date(); + + // see if this member is in the electable set + if( info["e"].eoo() ) { + // for backwards compatibility + const Member *member = theReplSet->findById(mem.id()); + if (member && member->config().potentiallyHot()) { + theReplSet->addToElectable(mem.id()); + } + else { + theReplSet->rmFromElectable(mem.id()); + } + } + // add this server to the electable set if it is within 10 + // seconds of the latest optime we know of + else if( info["e"].trueValue() && + mem.opTime >= theReplSet->lastOpTimeWritten.getSecs() - 10) { + unsigned lastOp = theReplSet->lastOtherOpTime().getSecs(); + if (lastOp > 0 && mem.opTime >= lastOp - 10) { + theReplSet->addToElectable(mem.id()); + } + } + else { + theReplSet->rmFromElectable(mem.id()); + } + + be cfg = info["config"]; + if( cfg.ok() ) { + // received a new config + boost::function<void()> f = + boost::bind(&Manager::msgReceivedNewConfig, theReplSet->mgr, cfg.Obj().copy()); + theReplSet->mgr->send(f); + } + } }; void ReplSetImpl::endOldHealthTasks() { diff --git a/db/repl/manager.cpp b/db/repl/manager.cpp index 3c4c0eb..c91adc3 100644 --- a/db/repl/manager.cpp +++ b/db/repl/manager.cpp @@ -119,6 +119,39 @@ namespace mongo { } } + void Manager::checkAuth() { + int down = 0, authIssue = 0, total = 0; + + for( Member *m = rs->head(); m; m=m->next() ) { + total++; + + // all authIssue servers will also be not up + if (!m->hbinfo().up()) { + down++; + if (m->hbinfo().authIssue) { + authIssue++; + } + } + } + + // if all nodes are down or failed auth AND at least one failed + // auth, go into recovering. If all nodes are down, stay a + // secondary. 
+ if (authIssue > 0 && down == total) { + log() << "replset error could not reach/authenticate against any members" << endl; + + if (rs->box.getPrimary() == rs->_self) { + log() << "auth problems, relinquishing primary" << rsLog; + rs->relinquish(); + } + + rs->blockSync(true); + } + else { + rs->blockSync(false); + } + } + /** called as the health threads get new results */ void Manager::msgCheckNewState() { { @@ -130,7 +163,8 @@ namespace mongo { if( busyWithElectSelf ) return; checkElectableSet(); - + checkAuth(); + const Member *p = rs->box.getPrimary(); if( p && p != rs->_self ) { if( !p->hbinfo().up() || diff --git a/db/repl/rs.cpp b/db/repl/rs.cpp index 1fbbc10..f827291 100644 --- a/db/repl/rs.cpp +++ b/db/repl/rs.cpp @@ -329,6 +329,7 @@ namespace mongo { ReplSetImpl::ReplSetImpl(ReplSetCmdline& replSetCmdline) : elect(this), _currentSyncTarget(0), + _blockSync(false), _hbmsgTime(0), _self(0), _maintenanceMode(0), diff --git a/db/repl/rs.h b/db/repl/rs.h index 61041a6..2b3ea9b 100644 --- a/db/repl/rs.h +++ b/db/repl/rs.h @@ -93,6 +93,7 @@ namespace mongo { void noteARemoteIsPrimary(const Member *); void checkElectableSet(); + void checkAuth(); virtual void starting(); public: Manager(ReplSetImpl *rs); @@ -348,6 +349,9 @@ namespace mongo { const Member* getMemberToSyncTo(); Member* _currentSyncTarget; + bool _blockSync; + void blockSync(bool block); + // set of electable members' _ids set<unsigned> _electableSet; protected: @@ -491,7 +495,7 @@ namespace mongo { void _syncThread(); bool tryToGoLiveAsASecondary(OpTime&); // readlocks void syncTail(); - void syncApply(const BSONObj &o); + bool syncApply(const BSONObj &o); unsigned _syncRollback(OplogReader& r); void syncRollback(OplogReader& r); void syncFixUp(HowToFixUp& h, OplogReader& r); @@ -577,7 +581,7 @@ namespace mongo { * that still need to be checked for auth. 
*/ bool checkAuth(string& errmsg, BSONObjBuilder& result) { - if( !noauth && adminOnly() ) { + if( !noauth ) { AuthenticationInfo *ai = cc().getAuthenticationInfo(); if (!ai->isAuthorizedForLock("admin", locktype())) { errmsg = "replSet command unauthorized"; diff --git a/db/repl/rs_config.h b/db/repl/rs_config.h index f69052a..b22b61e 100644 --- a/db/repl/rs_config.h +++ b/db/repl/rs_config.h @@ -80,6 +80,22 @@ namespace mongo { } } bool operator==(const MemberCfg& r) const { + if (!tags.empty() || !r.tags.empty()) { + if (tags.size() != r.tags.size()) { + return false; + } + + // if they are the same size and not equal, at least one + // element in A must be different in B + for (map<string,string>::const_iterator lit = tags.begin(); lit != tags.end(); lit++) { + map<string,string>::const_iterator rit = r.tags.find((*lit).first); + + if (rit == r.tags.end() || (*lit).second != (*rit).second) { + return false; + } + } + } + return _id==r._id && votes == r.votes && h == r.h && priority == r.priority && arbiterOnly == r.arbiterOnly && slaveDelay == r.slaveDelay && hidden == r.hidden && buildIndexes == buildIndexes; diff --git a/db/repl/rs_initialsync.cpp b/db/repl/rs_initialsync.cpp index 101b03a..112d739 100644 --- a/db/repl/rs_initialsync.cpp +++ b/db/repl/rs_initialsync.cpp @@ -81,6 +81,7 @@ namespace mongo { const Member* ReplSetImpl::getMemberToSyncTo() { Member *closest = 0; + bool buildIndexes = true; // wait for 2N pings before choosing a sync target if (_cfg) { @@ -90,11 +91,15 @@ namespace mongo { OCCASIONALLY log() << "waiting for " << needMorePings << " pings from other members before syncing" << endl; return NULL; } + + buildIndexes = myConfig().buildIndexes; } // find the member with the lowest ping time that has more data than me for (Member *m = _members.head(); m; m = m->next()) { if (m->hbinfo().up() && + // make sure members with buildIndexes sync from other members w/indexes + (!buildIndexes || (buildIndexes && m->config().buildIndexes)) && (m->state() == MemberState::RS_PRIMARY || (m->state() == MemberState::RS_SECONDARY && m->hbinfo().opTime > lastOpTimeWritten)) && (!closest || m->hbinfo().ping < closest->hbinfo().ping)) { diff --git a/db/repl/rs_member.h b/db/repl/rs_member.h index d60bb52..38b6c9b 100644 --- a/db/repl/rs_member.h +++ b/db/repl/rs_member.h @@ -69,7 +69,8 @@ namespace mongo { class HeartbeatInfo { unsigned _id; public: - HeartbeatInfo() : _id(0xffffffff),hbstate(MemberState::RS_UNKNOWN),health(-1.0),downSince(0),skew(INT_MIN) { } + HeartbeatInfo() : _id(0xffffffff), hbstate(MemberState::RS_UNKNOWN), health(-1.0), + downSince(0), skew(INT_MIN), authIssue(false) { } HeartbeatInfo(unsigned id); unsigned id() const { return _id; } MemberState hbstate; @@ -80,6 +81,7 @@ namespace mongo { DiagStr lastHeartbeatMsg; OpTime opTime; int skew; + bool authIssue; unsigned int ping; // milliseconds static unsigned int numPings; @@ -94,7 +96,7 @@ namespace mongo { bool changed(const HeartbeatInfo& old) const; }; - inline HeartbeatInfo::HeartbeatInfo(unsigned id) : _id(id) { + inline HeartbeatInfo::HeartbeatInfo(unsigned id) : _id(id), authIssue(false) { hbstate = MemberState::RS_UNKNOWN; health = -1.0; downSince = 0; diff --git a/db/repl/rs_sync.cpp b/db/repl/rs_sync.cpp index b29328b..8cd3e14 100644 --- a/db/repl/rs_sync.cpp +++ b/db/repl/rs_sync.cpp @@ -32,17 +32,19 @@ namespace mongo { } } - /* apply the log op that is in param o */ - void ReplSetImpl::syncApply(const BSONObj &o) { + /* apply the log op that is in param o + @return bool failedUpdate + */ + bool 
ReplSetImpl::syncApply(const BSONObj &o) { const char *ns = o.getStringField("ns"); if ( *ns == '.' || *ns == 0 ) { blank(o); - return; + return false; } Client::Context ctx(ns); ctx.getClient()->curop()->reset(); - applyOperation_inlock(o); + return applyOperation_inlock(o); } /* initial oplog application, during initial sync, after cloning. @@ -57,6 +59,7 @@ namespace mongo { const string hn = source->h().toString(); OplogReader r; + try { if( !r.connect(hn) ) { log() << "replSet initial sync error can't connect to " << hn << " to read " << rsoplog << rsLog; @@ -113,12 +116,9 @@ namespace mongo { if( !r.more() ) break; BSONObj o = r.nextSafe(); /* note we might get "not master" at some point */ - { - ts = o["ts"]._opTime(); + ts = o["ts"]._opTime(); - /* if we have become primary, we dont' want to apply things from elsewhere - anymore. assumePrimary is in the db lock so we are safe as long as - we check after we locked above. */ + { if( (source->state() != MemberState::RS_PRIMARY && source->state() != MemberState::RS_SECONDARY) || replSetForceInitialSyncFailure ) { @@ -133,9 +133,12 @@ namespace mongo { throw DBException("primary changed",0); } - if( ts >= applyGTE ) { - // optimes before we started copying need not be applied. - syncApply(o); + if( ts >= applyGTE ) { // optimes before we started copying need not be applied. + bool failedUpdate = syncApply(o); + if( failedUpdate && shouldRetry(o, hn)) { + failedUpdate = syncApply(o); + uassert(15915, "replSet update still fails after adding missing object", !failedUpdate); + } } _logOpObjRS(o); /* with repl sets we write the ops to our oplog too */ } @@ -149,7 +152,11 @@ namespace mongo { start = now; } } - + + if ( ts > minValid ) { + break; + } + getDur().commitIfNeeded(); } catch (DBException& e) { @@ -157,7 +164,7 @@ namespace mongo { if( e.getCode() == 11000 || e.getCode() == 11001 ) { continue; } - + // handle cursor not found (just requery) if( e.getCode() == 13127 ) { r.resetCursor(); @@ -290,7 +297,7 @@ namespace mongo { target = 0; } } - + // no server found if (target == 0) { // if there is no one to sync from @@ -298,7 +305,7 @@ namespace mongo { tryToGoLiveAsASecondary(minvalid); return; } - + r.tailingQueryGTE(rsoplog, lastOpTimeWritten); // if target cut connections between connecting and querying (for // example, because it stepped down) we might not have a cursor @@ -408,7 +415,7 @@ namespace mongo { if( !target->hbinfo().hbstate.readable() ) { break; } - + if( myConfig().slaveDelay != sd ) // reconf break; } @@ -429,7 +436,7 @@ namespace mongo { } syncApply(o); - _logOpObjRS(o); // with repl sets we write the ops to our oplog too + _logOpObjRS(o); // with repl sets we write the ops to our oplog too } catch (DBException& e) { sethbmsg(str::stream() << "syncTail: " << e.toString() << ", syncing: " << o); @@ -444,7 +451,7 @@ namespace mongo { // TODO : reuse our connection to the primary. 
return; } - + if( !target->hbinfo().hbstate.readable() ) { return; } @@ -458,7 +465,7 @@ namespace mongo { sleepsecs(1); return; } - if( sp.state.fatal() || sp.state.startup() ) { + if( _blockSync || sp.state.fatal() || sp.state.startup() ) { sleepsecs(5); return; } @@ -530,6 +537,15 @@ namespace mongo { replLocalAuth(); } + void ReplSetImpl::blockSync(bool block) { + _blockSync = block; + if (_blockSync) { + // syncing is how we get into SECONDARY state, so we'll be stuck in + // RECOVERING until we unblock + changeState(MemberState::RS_RECOVERING); + } + } + void GhostSync::associateSlave(const BSONObj& id, const int memberId) { const OID rid = id["_id"].OID(); rwlock lk( _lock , true ); @@ -556,10 +572,10 @@ namespace mongo { OCCASIONALLY warning() << "couldn't update slave " << rid << " no entry" << rsLog; return; } - + GhostSlave& slave = i->second; if (!slave.init) { - OCCASIONALLY log() << "couldn't update slave " << rid << " not init" << rsLog; + OCCASIONALLY log() << "couldn't update slave " << rid << " not init" << rsLog; return; } |
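
For context on the heartbeat changes above (db/repl/heartbeat.cpp, db/repl/manager.cpp, db/repl/rs_member.h): a heartbeat that fails with "need to login" now marks the member with authIssue rather than a plain down, and Manager::checkAuth() blocks syncing (and relinquishes primary) only when every member is unreachable and at least one of them reported an auth issue; if members are merely down, the node stays a secondary. A minimal standalone sketch of that decision logic follows. MemberHealth and shouldBlockSync are hypothetical names used for illustration, not the real HeartbeatInfo or Manager interfaces.

    #include <iostream>
    #include <vector>

    // Placeholder per-member heartbeat state; the real fields live in HeartbeatInfo.
    struct MemberHealth {
        bool up;
        bool authIssue;   // heartbeat answered "need to login"
    };

    // Block syncing only when every member is unreachable AND at least one of
    // them failed authentication; if they are merely down, stay put.
    bool shouldBlockSync(const std::vector<MemberHealth>& members) {
        int down = 0, authIssue = 0, total = 0;
        for (const MemberHealth& m : members) {
            total++;
            if (!m.up) {
                down++;
                if (m.authIssue) authIssue++;
            }
        }
        return authIssue > 0 && down == total;
    }

    int main() {
        std::vector<MemberHealth> allDownOneAuth = {{false, true}, {false, false}};
        std::vector<MemberHealth> justDown       = {{false, false}, {false, false}};
        std::vector<MemberHealth> oneStillUp     = {{true, false}, {false, true}};

        std::cout << shouldBlockSync(allDownOneAuth) << "\n";  // 1 -> go to RECOVERING
        std::cout << shouldBlockSync(justDown) << "\n";        // 0 -> stay SECONDARY
        std::cout << shouldBlockSync(oneStillUp) << "\n";      // 0
    }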