author | Antonin Kral <a.kral@bobek.cz> | 2011-09-14 17:08:06 +0200
committer | Antonin Kral <a.kral@bobek.cz> | 2011-09-14 17:08:06 +0200
commit | 5d342a758c6095b4d30aba0750b54f13b8916f51 (patch)
tree | 762e9aa84781f5e3b96db2c02d356c29cf0217c0 /client/distlock.cpp
parent | cbe2d992e9cd1ea66af9fa91df006106775d3073 (diff)
download | mongodb-5d342a758c6095b4d30aba0750b54f13b8916f51.tar.gz
Imported Upstream version 2.0.0
Diffstat (limited to 'client/distlock.cpp')
-rw-r--r-- | client/distlock.cpp | 921
1 file changed, 764 insertions, 157 deletions
diff --git a/client/distlock.cpp b/client/distlock.cpp index 9ec98ea..cb71159 100644 --- a/client/distlock.cpp +++ b/client/distlock.cpp @@ -21,8 +21,7 @@ namespace mongo { - static string lockPingNS = "config.lockpings"; - static string locksNS = "config.locks"; + LabeledLevel DistributedLock::logLvl( 1 ); ThreadLocalValue<string> distLockIds(""); @@ -36,7 +35,7 @@ namespace mongo { static void initModule() { // cache process string stringstream ss; - ss << getHostName() << ":" << time(0) << ":" << rand(); + ss << getHostName() << ":" << cmdLine.port << ":" << time(0) << ":" << rand(); _cachedProcessString = new string( ss.str() ); } @@ -59,116 +58,406 @@ namespace mongo { return s; } - void _distLockPingThread( ConnectionString addr ) { - setThreadName( "LockPinger" ); - - log() << "creating dist lock ping thread for: " << addr << endl; - static int loops = 0; - while( ! inShutdown() ) { + class DistributedLockPinger { + public: - string process = getDistLockProcess(); - log(4) << "dist_lock about to ping for: " << process << endl; + DistributedLockPinger() + : _mutex( "DistributedLockPinger" ) { + } - try { - ScopedDbConnection conn( addr ); - - // refresh the entry corresponding to this process in the lockpings collection - conn->update( lockPingNS , - BSON( "_id" << process ) , - BSON( "$set" << BSON( "ping" << DATENOW ) ) , - true ); - string err = conn->getLastError(); - if ( ! err.empty() ) { - warning() << "dist_lock process: " << process << " pinging: " << addr << " failed: " - << err << endl; - conn.done(); - sleepsecs(30); - continue; - } + void _distLockPingThread( ConnectionString addr, string process, unsigned long long sleepTime ) { + + setThreadName( "LockPinger" ); + + string pingId = pingThreadId( addr, process ); + + log( DistributedLock::logLvl - 1 ) << "creating distributed lock ping thread for " << addr + << " and process " << process + << " (sleeping for " << sleepTime << "ms)" << endl; + + static int loops = 0; + while( ! inShutdown() && ! shouldKill( addr, process ) ) { + + log( DistributedLock::logLvl + 2 ) << "distributed lock pinger '" << pingId << "' about to ping." << endl; + + Date_t pingTime; + + try { + ScopedDbConnection conn( addr ); + + pingTime = jsTime(); - // remove really old entries from the lockpings collection if they're not holding a lock - // (this may happen if an instance of a process was taken down and no new instance came up to - // replace it for a quite a while) - // if the lock is taken, the take-over mechanism should handle the situation - auto_ptr<DBClientCursor> c = conn->query( locksNS , BSONObj() ); - vector<string> pids; - while ( c->more() ) { - BSONObj lock = c->next(); - if ( ! lock["process"].eoo() ) { - pids.push_back( lock["process"].valuestrsafe() ); + // refresh the entry corresponding to this process in the lockpings collection + conn->update( DistributedLock::lockPingNS , + BSON( "_id" << process ) , + BSON( "$set" << BSON( "ping" << pingTime ) ) , + true ); + + string err = conn->getLastError(); + if ( ! err.empty() ) { + warning() << "pinging failed for distributed lock pinger '" << pingId << "'." 
+ << causedBy( err ) << endl; + conn.done(); + + // Sleep for normal ping time + sleepmillis(sleepTime); + continue; + } + + // remove really old entries from the lockpings collection if they're not holding a lock + // (this may happen if an instance of a process was taken down and no new instance came up to + // replace it for a quite a while) + // if the lock is taken, the take-over mechanism should handle the situation + auto_ptr<DBClientCursor> c = conn->query( DistributedLock::locksNS , BSONObj() ); + set<string> pids; + while ( c->more() ) { + BSONObj lock = c->next(); + if ( ! lock["process"].eoo() ) { + pids.insert( lock["process"].valuestrsafe() ); + } + } + + Date_t fourDays = pingTime - ( 4 * 86400 * 1000 ); // 4 days + conn->remove( DistributedLock::lockPingNS , BSON( "_id" << BSON( "$nin" << pids ) << "ping" << LT << fourDays ) ); + err = conn->getLastError(); + if ( ! err.empty() ) { + warning() << "ping cleanup for distributed lock pinger '" << pingId << " failed." + << causedBy( err ) << endl; + conn.done(); + + // Sleep for normal ping time + sleepmillis(sleepTime); + continue; + } + + // create index so remove is fast even with a lot of servers + if ( loops++ == 0 ) { + conn->ensureIndex( DistributedLock::lockPingNS , BSON( "ping" << 1 ) ); + } + + log( DistributedLock::logLvl - ( loops % 10 == 0 ? 1 : 0 ) ) << "cluster " << addr << " pinged successfully at " << pingTime + << " by distributed lock pinger '" << pingId + << "', sleeping for " << sleepTime << "ms" << endl; + + // Remove old locks, if possible + // Make sure no one else is adding to this list at the same time + scoped_lock lk( _mutex ); + + int numOldLocks = _oldLockOIDs.size(); + if( numOldLocks > 0 ) + log( DistributedLock::logLvl - 1 ) << "trying to delete " << _oldLockOIDs.size() << " old lock entries for process " << process << endl; + + bool removed = false; + for( list<OID>::iterator i = _oldLockOIDs.begin(); i != _oldLockOIDs.end(); + i = ( removed ? _oldLockOIDs.erase( i ) : ++i ) ) { + removed = false; + try { + // Got OID from lock with id, so we don't need to specify id again + conn->update( DistributedLock::locksNS , + BSON( "ts" << *i ), + BSON( "$set" << BSON( "state" << 0 ) ) ); + + // Either the update went through or it didn't, either way we're done trying to + // unlock + log( DistributedLock::logLvl - 1 ) << "handled late remove of old distributed lock with ts " << *i << endl; + removed = true; + } + catch( UpdateNotTheSame& ) { + log( DistributedLock::logLvl - 1 ) << "partially removed old distributed lock with ts " << *i << endl; + removed = true; + } + catch ( std::exception& e) { + warning() << "could not remove old distributed lock with ts " << *i + << causedBy( e ) << endl; + } + + } + + if( numOldLocks > 0 && _oldLockOIDs.size() > 0 ){ + log( DistributedLock::logLvl - 1 ) << "not all old lock entries could be removed for process " << process << endl; } - } - Date_t fourDays = jsTime() - ( 4 * 86400 * 1000 ); // 4 days - conn->remove( lockPingNS , BSON( "_id" << BSON( "$nin" << pids ) << "ping" << LT << fourDays ) ); - err = conn->getLastError(); - if ( ! 
err.empty() ) { - warning() << "dist_lock cleanup request from process: " << process << " to: " << addr - << " failed: " << err << endl; conn.done(); - sleepsecs(30); - continue; - } - // create index so remove is fast even with a lot of servers - if ( loops++ == 0 ) { - conn->ensureIndex( lockPingNS , BSON( "ping" << 1 ) ); + } + catch ( std::exception& e ) { + warning() << "distributed lock pinger '" << pingId << "' detected an exception while pinging." + << causedBy( e ) << endl; } - conn.done(); + sleepmillis(sleepTime); + } + + warning() << "removing distributed lock ping thread '" << pingId << "'" << endl; + + + if( shouldKill( addr, process ) ) + finishKill( addr, process ); + + } + + void distLockPingThread( ConnectionString addr, long long clockSkew, string processId, unsigned long long sleepTime ) { + try { + jsTimeVirtualThreadSkew( clockSkew ); + _distLockPingThread( addr, processId, sleepTime ); } catch ( std::exception& e ) { - warning() << "dist_lock exception during ping: " << e.what() << endl; + error() << "unexpected error while running distributed lock pinger for " << addr << ", process " << processId << causedBy( e ) << endl; } + catch ( ... ) { + error() << "unknown error while running distributed lock pinger for " << addr << ", process " << processId << endl; + } + } - log( loops % 10 == 0 ? 0 : 1) << "dist_lock pinged successfully for: " << process << endl; - sleepsecs(30); + string pingThreadId( const ConnectionString& conn, const string& processId ) { + return conn.toString() + "/" + processId; } - } - void distLockPingThread( ConnectionString addr ) { - try { - _distLockPingThread( addr ); + string got( DistributedLock& lock, unsigned long long sleepTime ) { + + // Make sure we don't start multiple threads for a process id + scoped_lock lk( _mutex ); + + const ConnectionString& conn = lock.getRemoteConnection(); + const string& processId = lock.getProcessId(); + string s = pingThreadId( conn, processId ); + + // Ignore if we already have a pinging thread for this process. + if ( _seen.count( s ) > 0 ) return ""; + + // Check our clock skew + try { + if( lock.isRemoteTimeSkewed() ) { + throw LockException( str::stream() << "clock skew of the cluster " << conn.toString() << " is too far out of bounds to allow distributed locking." , 13650 ); + } + } + catch( LockException& e) { + throw LockException( str::stream() << "error checking clock skew of cluster " << conn.toString() << causedBy( e ) , 13651); + } + + boost::thread t( boost::bind( &DistributedLockPinger::distLockPingThread, this, conn, getJSTimeVirtualThreadSkew(), processId, sleepTime) ); + + _seen.insert( s ); + + return s; } - catch ( std::exception& e ) { - error() << "unexpected error in distLockPingThread: " << e.what() << endl; + + void addUnlockOID( const OID& oid ) { + // Modifying the lock from some other thread + scoped_lock lk( _mutex ); + _oldLockOIDs.push_back( oid ); } - catch ( ... 
) { - error() << "unexpected unknown error in distLockPingThread" << endl; + + bool willUnlockOID( const OID& oid ) { + scoped_lock lk( _mutex ); + return find( _oldLockOIDs.begin(), _oldLockOIDs.end(), oid ) != _oldLockOIDs.end(); } - } + void kill( const ConnectionString& conn, const string& processId ) { + // Make sure we're in a consistent state before other threads can see us + scoped_lock lk( _mutex ); - class DistributedLockPinger { - public: - DistributedLockPinger() - : _mutex( "DistributedLockPinger" ) { + string pingId = pingThreadId( conn, processId ); + + assert( _seen.count( pingId ) > 0 ); + _kill.insert( pingId ); + + } + + bool shouldKill( const ConnectionString& conn, const string& processId ) { + return _kill.count( pingThreadId( conn, processId ) ) > 0; } - void got( const ConnectionString& conn ) { - string s = conn.toString(); + void finishKill( const ConnectionString& conn, const string& processId ) { + // Make sure we're in a consistent state before other threads can see us scoped_lock lk( _mutex ); - if ( _seen.count( s ) > 0 ) - return; - boost::thread t( boost::bind( &distLockPingThread , conn ) ); - _seen.insert( s ); + + string pingId = pingThreadId( conn, processId ); + + _kill.erase( pingId ); + _seen.erase( pingId ); + } + set<string> _kill; set<string> _seen; mongo::mutex _mutex; + list<OID> _oldLockOIDs; } distLockPinger; - DistributedLock::DistributedLock( const ConnectionString& conn , const string& name , unsigned takeoverMinutes ) - : _conn(conn),_name(name),_takeoverMinutes(takeoverMinutes) { - _id = BSON( "_id" << name ); - _ns = "config.locks"; - distLockPinger.got( conn ); + + const string DistributedLock::lockPingNS = "config.lockpings"; + const string DistributedLock::locksNS = "config.locks"; + + /** + * Create a new distributed lock, potentially with a custom sleep and takeover time. If a custom sleep time is + * specified (time between pings) + */ + DistributedLock::DistributedLock( const ConnectionString& conn , const string& name , unsigned long long lockTimeout, bool asProcess ) + : _conn(conn) , _name(name) , _id( BSON( "_id" << name ) ), _processId( asProcess ? getDistLockId() : getDistLockProcess() ), + _lockTimeout( lockTimeout == 0 ? LOCK_TIMEOUT : lockTimeout ), _maxClockSkew( _lockTimeout / LOCK_SKEW_FACTOR ), _maxNetSkew( _maxClockSkew ), _lockPing( _maxClockSkew ), + _mutex( "DistributedLock" ) + { + log( logLvl - 1 ) << "created new distributed lock for " << name << " on " << conn + << " ( lock timeout : " << _lockTimeout + << ", ping interval : " << _lockPing << ", process : " << asProcess << " )" << endl; + } + + Date_t DistributedLock::getRemoteTime() { + return DistributedLock::remoteTime( _conn, _maxNetSkew ); + } + + bool DistributedLock::isRemoteTimeSkewed() { + return !DistributedLock::checkSkew( _conn, NUM_LOCK_SKEW_CHECKS, _maxClockSkew, _maxNetSkew ); + } + + const ConnectionString& DistributedLock::getRemoteConnection() { + return _conn; + } + + const string& DistributedLock::getProcessId() { + return _processId; + } + + /** + * Returns the remote time as reported by the cluster or server. 
The maximum difference between the reported time + * and the actual time on the remote server (at the completion of the function) is the maxNetSkew + */ + Date_t DistributedLock::remoteTime( const ConnectionString& cluster, unsigned long long maxNetSkew ) { + + ConnectionString server( *cluster.getServers().begin() ); + ScopedDbConnection conn( server ); + + BSONObj result; + long long delay; + + try { + Date_t then = jsTime(); + bool success = conn->runCommand( string("admin"), BSON( "serverStatus" << 1 ), result ); + delay = jsTime() - then; + + if( !success ) + throw TimeNotFoundException( str::stream() << "could not get status from server " + << server.toString() << " in cluster " << cluster.toString() + << " to check time", 13647 ); + + // Make sure that our delay is not more than 2x our maximum network skew, since this is the max our remote + // time value can be off by if we assume a response in the middle of the delay. + if( delay > (long long) (maxNetSkew * 2) ) + throw TimeNotFoundException( str::stream() << "server " << server.toString() + << " in cluster " << cluster.toString() + << " did not respond within max network delay of " + << maxNetSkew << "ms", 13648 ); + } + catch(...) { + conn.done(); + throw; + } + + conn.done(); + + return result["localTime"].Date() - (delay / 2); + + } + + bool DistributedLock::checkSkew( const ConnectionString& cluster, unsigned skewChecks, unsigned long long maxClockSkew, unsigned long long maxNetSkew ) { + + vector<HostAndPort> servers = cluster.getServers(); + + if(servers.size() < 1) return true; + + vector<long long> avgSkews; + + for(unsigned i = 0; i < skewChecks; i++) { + + // Find the average skew for each server + unsigned s = 0; + for(vector<HostAndPort>::iterator si = servers.begin(); si != servers.end(); ++si,s++) { + + if(i == 0) avgSkews.push_back(0); + + // Could check if this is self, but shouldn't matter since local network connection should be fast. + ConnectionString server( *si ); + + vector<long long> skew; + + BSONObj result; + + Date_t remote = remoteTime( server, maxNetSkew ); + Date_t local = jsTime(); + + // Remote time can be delayed by at most MAX_NET_SKEW + + // Skew is how much time we'd have to add to local to get to remote + avgSkews[s] += (long long) (remote - local); + + log( logLvl + 1 ) << "skew from remote server " << server << " found: " << (long long) (remote - local) << endl; + + } + } + + // Analyze skews + + long long serverMaxSkew = 0; + long long serverMinSkew = 0; + + for(unsigned s = 0; s < avgSkews.size(); s++) { + + long long avgSkew = (avgSkews[s] /= skewChecks); + + // Keep track of max and min skews + if(s == 0) { + serverMaxSkew = avgSkew; + serverMinSkew = avgSkew; + } + else { + if(avgSkew > serverMaxSkew) + serverMaxSkew = avgSkew; + if(avgSkew < serverMinSkew) + serverMinSkew = avgSkew; + } + + } + + long long totalSkew = serverMaxSkew - serverMinSkew; + + // Make sure our max skew is not more than our pre-set limit + if(totalSkew > (long long) maxClockSkew) { + log( logLvl + 1 ) << "total clock skew of " << totalSkew << "ms for servers " << cluster << " is out of " << maxClockSkew << "ms bounds." << endl; + return false; + } + + log( logLvl + 1 ) << "total clock skew of " << totalSkew << "ms for servers " << cluster << " is in " << maxClockSkew << "ms bounds." << endl; + return true; + } + + // For use in testing, ping thread should run indefinitely in practice. 
+ bool DistributedLock::killPinger( DistributedLock& lock ) { + if( lock._threadId == "") return false; + + distLockPinger.kill( lock._conn, lock._processId ); + return true; } + // Semantics of this method are basically that if the lock cannot be acquired, returns false, can be retried. + // If the lock should not be tried again (some unexpected error) a LockException is thrown. + // If we are only trying to re-enter a currently held lock, reenter should be true. + // Note: reenter doesn't actually make this lock re-entrant in the normal sense, since it can still only + // be unlocked once, instead it is used to verify that the lock is already held. + bool DistributedLock::lock_try( const string& why , bool reenter, BSONObj * other ) { + + // TODO: Start pinging only when we actually get the lock? + // If we don't have a thread pinger, make sure we shouldn't have one + if( _threadId == "" ){ + scoped_lock lk( _mutex ); + _threadId = distLockPinger.got( *this, _lockPing ); + } + + // This should always be true, if not, we are using the lock incorrectly. + assert( _name != "" ); - bool DistributedLock::lock_try( string why , BSONObj * other ) { // write to dummy if 'other' is null BSONObj dummyOther; if ( other == NULL ) @@ -182,93 +471,240 @@ namespace mongo { { // make sure its there so we can use simple update logic below - BSONObj o = conn->findOne( _ns , _id ).getOwned(); + BSONObj o = conn->findOne( locksNS , _id ).getOwned(); + + // Case 1: No locks if ( o.isEmpty() ) { try { - log(4) << "dist_lock inserting initial doc in " << _ns << " for lock " << _name << endl; - conn->insert( _ns , BSON( "_id" << _name << "state" << 0 << "who" << "" ) ); + log( logLvl ) << "inserting initial doc in " << locksNS << " for lock " << _name << endl; + conn->insert( locksNS , BSON( "_id" << _name << "state" << 0 << "who" << "" ) ); } catch ( UserException& e ) { - log() << "dist_lock could not insert initial doc: " << e << endl; + warning() << "could not insert initial doc for distributed lock " << _name << causedBy( e ) << endl; } } - + + // Case 2: A set lock that we might be able to force else if ( o["state"].numberInt() > 0 ) { + + string lockName = o["_id"].String() + string("/") + o["process"].String(); + + bool canReenter = reenter && o["process"].String() == _processId && ! distLockPinger.willUnlockOID( o["ts"].OID() ) && o["state"].numberInt() == 2; + if( reenter && ! canReenter ) { + log( logLvl - 1 ) << "not re-entering distributed lock " << lockName; + if( o["process"].String() != _processId ) log( logLvl - 1 ) << ", different process " << _processId << endl; + else if( o["state"].numberInt() == 2 ) log( logLvl - 1 ) << ", state not finalized" << endl; + else log( logLvl - 1 ) << ", ts " << o["ts"].OID() << " scheduled for late unlock" << endl; + + // reset since we've been bounced by a previous lock not being where we thought it was, + // and should go through full forcing process if required. + // (in theory we should never see a ping here if used correctly) + *other = o; other->getOwned(); conn.done(); resetLastPing(); + return false; + } + BSONObj lastPing = conn->findOne( lockPingNS , o["process"].wrap( "_id" ) ); if ( lastPing.isEmpty() ) { - // if a lock is taken but there's no ping for it, we're in an inconsistent situation - // if the lock holder (mongos or d) does not exist anymore, the lock could safely be removed - // but we'd require analysis of the situation before a manual intervention - error() << "config.locks: " << _name << " lock is taken by old process? 
" - << "remove the following lock if the process is not active anymore: " << o << endl; - *other = o; - conn.done(); - return false; + log( logLvl ) << "empty ping found for process in lock '" << lockName << "'" << endl; + // TODO: Using 0 as a "no time found" value Will fail if dates roll over, but then, so will a lot. + lastPing = BSON( "_id" << o["process"].String() << "ping" << (Date_t) 0 ); } - unsigned long long now = jsTime(); - unsigned long long pingTime = lastPing["ping"].Date(); - - if ( now < pingTime ) { - // clock skew - warning() << "dist_lock has detected clock skew of " << ( pingTime - now ) << "ms" << endl; - *other = o; - conn.done(); - return false; + unsigned long long elapsed = 0; + unsigned long long takeover = _lockTimeout; + + log( logLvl ) << "checking last ping for lock '" << lockName << "'" << " against process " << _lastPingCheck.get<0>() << " and ping " << _lastPingCheck.get<1>() << endl; + + try { + + Date_t remote = remoteTime( _conn ); + + // Timeout the elapsed time using comparisons of remote clock + // For non-finalized locks, timeout 15 minutes since last seen (ts) + // For finalized locks, timeout 15 minutes since last ping + bool recPingChange = o["state"].numberInt() == 2 && ( _lastPingCheck.get<0>() != lastPing["_id"].String() || _lastPingCheck.get<1>() != lastPing["ping"].Date() ); + bool recTSChange = _lastPingCheck.get<3>() != o["ts"].OID(); + + if( recPingChange || recTSChange ) { + // If the ping has changed since we last checked, mark the current date and time + scoped_lock lk( _mutex ); + _lastPingCheck = boost::tuple<string, Date_t, Date_t, OID>( lastPing["_id"].String().c_str(), lastPing["ping"].Date(), remote, o["ts"].OID() ); + } + else { + + // GOTCHA! Due to network issues, it is possible that the current time + // is less than the remote time. We *have* to check this here, otherwise + // we overflow and our lock breaks. + if(_lastPingCheck.get<2>() >= remote) + elapsed = 0; + else + elapsed = remote - _lastPingCheck.get<2>(); + } + } - - unsigned long long elapsed = now - pingTime; - elapsed = elapsed / ( 1000 * 60 ); // convert to minutes - - if ( elapsed > ( 60 * 24 * 365 * 100 ) /* 100 years */ ) { - warning() << "distlock elapsed time seems impossible: " << lastPing << endl; + catch( LockException& e ) { + + // Remote server cannot be found / is not responsive + warning() << "Could not get remote time from " << _conn << causedBy( e ); + // If our config server is having issues, forget all the pings until we can see it again + resetLastPing(); + } - - if ( elapsed <= _takeoverMinutes ) { - log(1) << "dist_lock lock failed because taken by: " << o << " elapsed minutes: " << elapsed << endl; - *other = o; - conn.done(); + + if ( elapsed <= takeover && ! canReenter ) { + log( logLvl ) << "could not force lock '" << lockName << "' because elapsed time " << elapsed << " <= takeover time " << takeover << endl; + *other = o; other->getOwned(); conn.done(); return false; } - - log() << "dist_lock forcefully taking over from: " << o << " elapsed minutes: " << elapsed << endl; - conn->update( _ns , _id , BSON( "$set" << BSON( "state" << 0 ) ) ); - string err = conn->getLastError(); - if ( ! 
err.empty() ) { - warning() << "dist_lock take over from: " << o << " failed: " << err << endl; - *other = o.getOwned(); - other->getOwned(); - conn.done(); + else if( elapsed > takeover && canReenter ) { + log( logLvl - 1 ) << "not re-entering distributed lock " << lockName << "' because elapsed time " << elapsed << " > takeover time " << takeover << endl; + *other = o; other->getOwned(); conn.done(); return false; } + log( logLvl - 1 ) << ( canReenter ? "re-entering" : "forcing" ) << " lock '" << lockName << "' because " + << ( canReenter ? "re-entering is allowed, " : "" ) + << "elapsed time " << elapsed << " > takeover time " << takeover << endl; + + if( elapsed > takeover ) { + + // Lock may forced, reset our timer if succeeds or fails + // Ensures that another timeout must happen if something borks up here, and resets our pristine + // ping state if acquired. + resetLastPing(); + + try { + + // Check the clock skew again. If we check this before we get a lock + // and after the lock times out, we can be pretty sure the time is + // increasing at the same rate on all servers and therefore our + // timeout is accurate + uassert( 14023, str::stream() << "remote time in cluster " << _conn.toString() << " is now skewed, cannot force lock.", !isRemoteTimeSkewed() ); + + // Make sure we break the lock with the correct "ts" (OID) value, otherwise + // we can overwrite a new lock inserted in the meantime. + conn->update( locksNS , BSON( "_id" << _id["_id"].String() << "state" << o["state"].numberInt() << "ts" << o["ts"] ), + BSON( "$set" << BSON( "state" << 0 ) ) ); + + BSONObj err = conn->getLastErrorDetailed(); + string errMsg = DBClientWithCommands::getLastErrorString(err); + + // TODO: Clean up all the extra code to exit this method, probably with a refactor + if ( !errMsg.empty() || !err["n"].type() || err["n"].numberInt() < 1 ) { + ( errMsg.empty() ? log( logLvl - 1 ) : warning() ) << "Could not force lock '" << lockName << "' " + << ( !errMsg.empty() ? causedBy(errMsg) : string("(another force won)") ) << endl; + *other = o; other->getOwned(); conn.done(); + return false; + } + + } + catch( UpdateNotTheSame& ) { + // Ok to continue since we know we forced at least one lock document, and all lock docs + // are required for a lock to be held. + warning() << "lock forcing " << lockName << " inconsistent" << endl; + } + catch( std::exception& e ) { + conn.done(); + throw LockException( str::stream() << "exception forcing distributed lock " + << lockName << causedBy( e ), 13660); + } + + } + else { + + assert( canReenter ); + + // Lock may be re-entered, reset our timer if succeeds or fails + // Not strictly necessary, but helpful for small timeouts where thread scheduling is significant. + // This ensures that two attempts are still required for a force if not acquired, and resets our + // state if we are acquired. + resetLastPing(); + + // Test that the lock is held by trying to update the finalized state of the lock to the same state + // if it does not update or does not update on all servers, we can't re-enter. + try { + + // Test the lock with the correct "ts" (OID) value + conn->update( locksNS , BSON( "_id" << _id["_id"].String() << "state" << 2 << "ts" << o["ts"] ), + BSON( "$set" << BSON( "state" << 2 ) ) ); + + BSONObj err = conn->getLastErrorDetailed(); + string errMsg = DBClientWithCommands::getLastErrorString(err); + + // TODO: Clean up all the extra code to exit this method, probably with a refactor + if ( ! errMsg.empty() || ! 
err["n"].type() || err["n"].numberInt() < 1 ) { + ( errMsg.empty() ? log( logLvl - 1 ) : warning() ) << "Could not re-enter lock '" << lockName << "' " + << ( !errMsg.empty() ? causedBy(errMsg) : string("(not sure lock is held)") ) + << " gle: " << err + << endl; + *other = o; other->getOwned(); conn.done(); + return false; + } + + } + catch( UpdateNotTheSame& ) { + // NOT ok to continue since our lock isn't held by all servers, so isn't valid. + warning() << "inconsistent state re-entering lock, lock " << lockName << " not held" << endl; + *other = o; other->getOwned(); conn.done(); + return false; + } + catch( std::exception& e ) { + conn.done(); + throw LockException( str::stream() << "exception re-entering distributed lock " + << lockName << causedBy( e ), 13660); + } + + log( logLvl - 1 ) << "re-entered distributed lock '" << lockName << "'" << endl; + *other = o; other->getOwned(); conn.done(); + return true; + + } + + log( logLvl - 1 ) << "lock '" << lockName << "' successfully forced" << endl; + + // We don't need the ts value in the query, since we will only ever replace locks with state=0. } + // Case 3: We have an expired lock else if ( o["ts"].type() ) { queryBuilder.append( o["ts"] ); } } - OID ts; - ts.init(); + // Always reset our ping if we're trying to get a lock, since getting a lock implies the lock state is open + // and no locks need to be forced. If anything goes wrong, we don't want to remember an old lock. + resetLastPing(); bool gotLock = false; - BSONObj now; + BSONObj currLock; - BSONObj lockDetails = BSON( "state" << 1 << "who" << getDistLockId() << "process" << getDistLockProcess() << - "when" << DATENOW << "why" << why << "ts" << ts ); + BSONObj lockDetails = BSON( "state" << 1 << "who" << getDistLockId() << "process" << _processId << + "when" << jsTime() << "why" << why << "ts" << OID::gen() ); BSONObj whatIWant = BSON( "$set" << lockDetails ); + + BSONObj query = queryBuilder.obj(); + + string lockName = _name + string("/") + _processId; + try { - log(4) << "dist_lock about to aquire lock: " << lockDetails << endl; - conn->update( _ns , queryBuilder.obj() , whatIWant ); + // Main codepath to acquire lock + + log( logLvl ) << "about to acquire distributed lock '" << lockName << ":\n" + << lockDetails.jsonString(Strict, true) << "\n" + << query.jsonString(Strict, true) << endl; + + conn->update( locksNS , query , whatIWant ); + + BSONObj err = conn->getLastErrorDetailed(); + string errMsg = DBClientWithCommands::getLastErrorString(err); - BSONObj o = conn->getLastErrorDetailed(); - now = conn->findOne( _ns , _id ); + currLock = conn->findOne( locksNS , _id ); - if ( o["n"].numberInt() == 0 ) { - *other = now; + if ( !errMsg.empty() || !err["n"].type() || err["n"].numberInt() < 1 ) { + ( errMsg.empty() ? log( logLvl - 1 ) : warning() ) << "could not acquire lock '" << lockName << "' " + << ( !errMsg.empty() ? causedBy( errMsg ) : string("(another update won)") ) << endl; + *other = currLock; other->getOwned(); - log() << "dist_lock error trying to aquire lock: " << lockDetails << " error: " << o << endl; gotLock = false; } else { @@ -277,63 +713,234 @@ namespace mongo { } catch ( UpdateNotTheSame& up ) { + // this means our update got through on some, but not others - log(4) << "dist_lock lock did not propagate properly" << endl; + warning() << "distributed lock '" << lockName << " did not propagate properly." 
<< causedBy( up ) << endl; + + // Overall protection derives from: + // All unlocking updates use the ts value when setting state to 0 + // This ensures that during locking, we can override all smaller ts locks with + // our own safe ts value and not be unlocked afterward. + for ( unsigned i = 0; i < up.size(); i++ ) { + + ScopedDbConnection indDB( up[i].first ); + BSONObj indUpdate; + + try { + + indUpdate = indDB->findOne( locksNS , _id ); + + // If we override this lock in any way, grab and protect it. + // We assume/ensure that if a process does not have all lock documents, it is no longer + // holding the lock. + // Note - finalized locks may compete too, but we know they've won already if competing + // in this round. Cleanup of crashes during finalizing may take a few tries. + if( indUpdate["ts"] < lockDetails["ts"] || indUpdate["state"].numberInt() == 0 ) { + + BSONObj grabQuery = BSON( "_id" << _id["_id"].String() << "ts" << indUpdate["ts"].OID() ); + + // Change ts so we won't be forced, state so we won't be relocked + BSONObj grabChanges = BSON( "ts" << lockDetails["ts"].OID() << "state" << 1 ); + + // Either our update will succeed, and we'll grab the lock, or it will fail b/c some other + // process grabbed the lock (which will change the ts), but the lock will be set until forcing + indDB->update( locksNS, grabQuery, BSON( "$set" << grabChanges ) ); + + indUpdate = indDB->findOne( locksNS, _id ); + + // Our lock should now be set until forcing. + assert( indUpdate["state"].numberInt() == 1 ); + + } + // else our lock is the same, in which case we're safe, or it's a bigger lock, + // in which case we won't need to protect anything since we won't have the lock. + + } + catch( std::exception& e ) { + conn.done(); + throw LockException( str::stream() << "distributed lock " << lockName + << " had errors communicating with individual server " + << up[1].first << causedBy( e ), 13661 ); + } - for ( unsigned i=0; i<up.size(); i++ ) { - ScopedDbConnection temp( up[i].first ); - BSONObj temp2 = temp->findOne( _ns , _id ); + assert( !indUpdate.isEmpty() ); - if ( now.isEmpty() || now["ts"] < temp2["ts"] ) { - now = temp2.getOwned(); + // Find max TS value + if ( currLock.isEmpty() || currLock["ts"] < indUpdate["ts"] ) { + currLock = indUpdate.getOwned(); } - temp.done(); + indDB.done(); + } - if ( now["ts"].OID() == ts ) { - log(4) << "dist_lock completed lock propagation" << endl; + // Locks on all servers are now set and safe until forcing + + if ( currLock["ts"] == lockDetails["ts"] ) { + log( logLvl - 1 ) << "lock update won, completing lock propagation for '" << lockName << "'" << endl; gotLock = true; - conn->update( _ns , _id , whatIWant ); } else { - log() << "dist_lock error trying to complete propagation" << endl; + log( logLvl - 1 ) << "lock update lost, lock '" << lockName << "' not propagated." << endl; + + // Register the lock for deletion, to speed up failover + // Not strictly necessary, but helpful + distLockPinger.addUnlockOID( lockDetails["ts"].OID() ); + gotLock = false; } } + catch( std::exception& e ) { + conn.done(); + throw LockException( str::stream() << "exception creating distributed lock " + << lockName << causedBy( e ), 13663 ); + } - conn.done(); + // Complete lock propagation + if( gotLock ) { + + // This is now safe, since we know that no new locks will be placed on top of the ones we've checked for at + // least 15 minutes. Sets the state = 2, so that future clients can determine that the lock is truly set. 
+ // The invariant for rollbacks is that we will never force locks with state = 2 and active pings, since that + // indicates the lock is active, but this means the process creating/destroying them must explicitly poll + // when something goes wrong. + try { + + BSONObjBuilder finalLockDetails; + BSONObjIterator bi( lockDetails ); + while( bi.more() ) { + BSONElement el = bi.next(); + if( (string) ( el.fieldName() ) == "state" ) + finalLockDetails.append( "state", 2 ); + else finalLockDetails.append( el ); + } + + conn->update( locksNS , _id , BSON( "$set" << finalLockDetails.obj() ) ); + + BSONObj err = conn->getLastErrorDetailed(); + string errMsg = DBClientWithCommands::getLastErrorString(err); + + currLock = conn->findOne( locksNS , _id ); - log(2) << "dist_lock lock gotLock: " << gotLock << " now: " << now << endl; + if ( !errMsg.empty() || !err["n"].type() || err["n"].numberInt() < 1 ) { + warning() << "could not finalize winning lock " << lockName + << ( !errMsg.empty() ? causedBy( errMsg ) : " (did not update lock) " ) << endl; + gotLock = false; + } + else { + // SUCCESS! + gotLock = true; + } + + } + catch( std::exception& e ) { + conn.done(); + + // Register the bad final lock for deletion, in case it exists + distLockPinger.addUnlockOID( lockDetails["ts"].OID() ); + + throw LockException( str::stream() << "exception finalizing winning lock" + << causedBy( e ), 13662 ); + } + + } + + *other = currLock; + other->getOwned(); + + // Log our lock results + if(gotLock) + log( logLvl - 1 ) << "distributed lock '" << lockName << "' acquired, ts : " << currLock["ts"].OID() << endl; + else + log( logLvl - 1 ) << "distributed lock '" << lockName << "' was not acquired." << endl; + + conn.done(); return gotLock; } - void DistributedLock::unlock() { + // Unlock now takes an optional pointer to the lock, so you can be specific about which + // particular lock you want to unlock. This is required when the config server is down, + // and so cannot tell you what lock ts you should try later. + void DistributedLock::unlock( BSONObj* oldLockPtr ) { + + assert( _name != "" ); + + string lockName = _name + string("/") + _processId; + const int maxAttempts = 3; int attempted = 0; + + BSONObj oldLock; + if( oldLockPtr ) oldLock = *oldLockPtr; + while ( ++attempted <= maxAttempts ) { + ScopedDbConnection conn( _conn ); + try { - ScopedDbConnection conn( _conn ); - conn->update( _ns , _id, BSON( "$set" << BSON( "state" << 0 ) ) ); - log(2) << "dist_lock unlock: " << conn->findOne( _ns , _id ) << endl; - conn.done(); - return; + if( oldLock.isEmpty() ) + oldLock = conn->findOne( locksNS, _id ); + + if( oldLock["state"].eoo() || oldLock["state"].numberInt() != 2 || oldLock["ts"].eoo() ) { + warning() << "cannot unlock invalid distributed lock " << oldLock << endl; + conn.done(); + break; + } + // Use ts when updating lock, so that new locks can be sure they won't get trampled. + conn->update( locksNS , + BSON( "_id" << _id["_id"].String() << "ts" << oldLock["ts"].OID() ), + BSON( "$set" << BSON( "state" << 0 ) ) ); + // Check that the lock was actually unlocked... if not, try again + BSONObj err = conn->getLastErrorDetailed(); + string errMsg = DBClientWithCommands::getLastErrorString(err); + + if ( !errMsg.empty() || !err["n"].type() || err["n"].numberInt() < 1 ){ + warning() << "distributed lock unlock update failed, retrying " + << ( errMsg.empty() ? 
causedBy( "( update not registered )" ) : causedBy( errMsg ) ) << endl; + conn.done(); + continue; + } + + log( logLvl - 1 ) << "distributed lock '" << lockName << "' unlocked. " << endl; + conn.done(); + return; + } + catch( UpdateNotTheSame& ) { + log( logLvl - 1 ) << "distributed lock '" << lockName << "' unlocked (messily). " << endl; + conn.done(); + break; } catch ( std::exception& e) { - log( LL_WARNING ) << "dist_lock " << _name << " failed to contact config server in unlock attempt " - << attempted << ": " << e.what() << endl; + warning() << "distributed lock '" << lockName << "' failed unlock attempt." + << causedBy( e ) << endl; - sleepsecs(1 << attempted); + conn.done(); + // TODO: If our lock timeout is small, sleeping this long may be unsafe. + if( attempted != maxAttempts) sleepsecs(1 << attempted); } } - log( LL_WARNING ) << "dist_lock couldn't consumate unlock request. " << "Lock " << _name - << " will be taken over after " << _takeoverMinutes << " minutes timeout" << endl; + if( attempted > maxAttempts && ! oldLock.isEmpty() && ! oldLock["ts"].eoo() ) { + + log( logLvl - 1 ) << "could not unlock distributed lock with ts " << oldLock["ts"].OID() + << ", will attempt again later" << endl; + + // We couldn't unlock the lock at all, so try again later in the pinging thread... + distLockPinger.addUnlockOID( oldLock["ts"].OID() ); + } + else if( attempted > maxAttempts ) { + warning() << "could not unlock untracked distributed lock, a manual force may be required" << endl; + } + + warning() << "distributed lock '" << lockName << "' couldn't consummate unlock request. " + << "lock may be taken over after " << ( _lockTimeout / (60 * 1000) ) + << " minutes timeout." << endl; } + + } |