author    Antonin Kral <a.kral@bobek.cz>  2011-09-14 17:08:06 +0200
committer Antonin Kral <a.kral@bobek.cz>  2011-09-14 17:08:06 +0200
commit    5d342a758c6095b4d30aba0750b54f13b8916f51 (patch)
tree      762e9aa84781f5e3b96db2c02d356c29cf0217c0 /client/distlock.cpp
parent    cbe2d992e9cd1ea66af9fa91df006106775d3073 (diff)
Imported Upstream version 2.0.0
Diffstat (limited to 'client/distlock.cpp')
-rw-r--r--  client/distlock.cpp | 921
1 file changed, 764 insertions(+), 157 deletions(-)
diff --git a/client/distlock.cpp b/client/distlock.cpp
index 9ec98ea..cb71159 100644
--- a/client/distlock.cpp
+++ b/client/distlock.cpp
@@ -21,8 +21,7 @@
namespace mongo {
- static string lockPingNS = "config.lockpings";
- static string locksNS = "config.locks";
+ LabeledLevel DistributedLock::logLvl( 1 );
ThreadLocalValue<string> distLockIds("");
@@ -36,7 +35,7 @@ namespace mongo {
static void initModule() {
// cache process string
stringstream ss;
- ss << getHostName() << ":" << time(0) << ":" << rand();
+ ss << getHostName() << ":" << cmdLine.port << ":" << time(0) << ":" << rand();
_cachedProcessString = new string( ss.str() );
}
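The process string uniquely identifies this server instance in the config.lockpings and config.locks collections; including cmdLine.port means two servers on the same host can no longer collide. A minimal sketch of the resulting format, with hypothetical values:

    // e.g. "db1.example.com:27017:1316012886:58467"
    //       <hostname>:<port>:<startup unix time>:<rand() value>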
@@ -59,116 +58,406 @@ namespace mongo {
return s;
}
- void _distLockPingThread( ConnectionString addr ) {
- setThreadName( "LockPinger" );
-
- log() << "creating dist lock ping thread for: " << addr << endl;
- static int loops = 0;
- while( ! inShutdown() ) {
+ class DistributedLockPinger {
+ public:
- string process = getDistLockProcess();
- log(4) << "dist_lock about to ping for: " << process << endl;
+ DistributedLockPinger()
+ : _mutex( "DistributedLockPinger" ) {
+ }
- try {
- ScopedDbConnection conn( addr );
-
- // refresh the entry corresponding to this process in the lockpings collection
- conn->update( lockPingNS ,
- BSON( "_id" << process ) ,
- BSON( "$set" << BSON( "ping" << DATENOW ) ) ,
- true );
- string err = conn->getLastError();
- if ( ! err.empty() ) {
- warning() << "dist_lock process: " << process << " pinging: " << addr << " failed: "
- << err << endl;
- conn.done();
- sleepsecs(30);
- continue;
- }
+ void _distLockPingThread( ConnectionString addr, string process, unsigned long long sleepTime ) {
+
+ setThreadName( "LockPinger" );
+
+ string pingId = pingThreadId( addr, process );
+
+ log( DistributedLock::logLvl - 1 ) << "creating distributed lock ping thread for " << addr
+ << " and process " << process
+ << " (sleeping for " << sleepTime << "ms)" << endl;
+
+ static int loops = 0;
+ while( ! inShutdown() && ! shouldKill( addr, process ) ) {
+
+ log( DistributedLock::logLvl + 2 ) << "distributed lock pinger '" << pingId << "' about to ping." << endl;
+
+ Date_t pingTime;
+
+ try {
+ ScopedDbConnection conn( addr );
+
+ pingTime = jsTime();
- // remove really old entries from the lockpings collection if they're not holding a lock
- // (this may happen if an instance of a process was taken down and no new instance came up to
- // replace it for a quite a while)
- // if the lock is taken, the take-over mechanism should handle the situation
- auto_ptr<DBClientCursor> c = conn->query( locksNS , BSONObj() );
- vector<string> pids;
- while ( c->more() ) {
- BSONObj lock = c->next();
- if ( ! lock["process"].eoo() ) {
- pids.push_back( lock["process"].valuestrsafe() );
+ // refresh the entry corresponding to this process in the lockpings collection
+ conn->update( DistributedLock::lockPingNS ,
+ BSON( "_id" << process ) ,
+ BSON( "$set" << BSON( "ping" << pingTime ) ) ,
+ true );
+
+ string err = conn->getLastError();
+ if ( ! err.empty() ) {
+ warning() << "pinging failed for distributed lock pinger '" << pingId << "'."
+ << causedBy( err ) << endl;
+ conn.done();
+
+ // Sleep for normal ping time
+ sleepmillis(sleepTime);
+ continue;
+ }
+
+ // remove really old entries from the lockpings collection if they're not holding a lock
+ // (this may happen if an instance of a process was taken down and no new instance came up to
+ // replace it for quite a while)
+ // if the lock is taken, the take-over mechanism should handle the situation
+ auto_ptr<DBClientCursor> c = conn->query( DistributedLock::locksNS , BSONObj() );
+ set<string> pids;
+ while ( c->more() ) {
+ BSONObj lock = c->next();
+ if ( ! lock["process"].eoo() ) {
+ pids.insert( lock["process"].valuestrsafe() );
+ }
+ }
+
+ Date_t fourDays = pingTime - ( 4 * 86400 * 1000 ); // 4 days
+ conn->remove( DistributedLock::lockPingNS , BSON( "_id" << BSON( "$nin" << pids ) << "ping" << LT << fourDays ) );
+ err = conn->getLastError();
+ if ( ! err.empty() ) {
+ warning() << "ping cleanup for distributed lock pinger '" << pingId << " failed."
+ << causedBy( err ) << endl;
+ conn.done();
+
+ // Sleep for normal ping time
+ sleepmillis(sleepTime);
+ continue;
+ }
+
+ // create index so remove is fast even with a lot of servers
+ if ( loops++ == 0 ) {
+ conn->ensureIndex( DistributedLock::lockPingNS , BSON( "ping" << 1 ) );
+ }
+
+ log( DistributedLock::logLvl - ( loops % 10 == 0 ? 1 : 0 ) ) << "cluster " << addr << " pinged successfully at " << pingTime
+ << " by distributed lock pinger '" << pingId
+ << "', sleeping for " << sleepTime << "ms" << endl;
+
+ // Remove old locks, if possible
+ // Make sure no one else is adding to this list at the same time
+ scoped_lock lk( _mutex );
+
+ int numOldLocks = _oldLockOIDs.size();
+ if( numOldLocks > 0 )
+ log( DistributedLock::logLvl - 1 ) << "trying to delete " << _oldLockOIDs.size() << " old lock entries for process " << process << endl;
+
+ bool removed = false;
+ for( list<OID>::iterator i = _oldLockOIDs.begin(); i != _oldLockOIDs.end();
+ i = ( removed ? _oldLockOIDs.erase( i ) : ++i ) ) {
+ removed = false;
+ try {
+ // Got OID from lock with id, so we don't need to specify id again
+ conn->update( DistributedLock::locksNS ,
+ BSON( "ts" << *i ),
+ BSON( "$set" << BSON( "state" << 0 ) ) );
+
+ // Either the update went through or it didn't, either way we're done trying to
+ // unlock
+ log( DistributedLock::logLvl - 1 ) << "handled late remove of old distributed lock with ts " << *i << endl;
+ removed = true;
+ }
+ catch( UpdateNotTheSame& ) {
+ log( DistributedLock::logLvl - 1 ) << "partially removed old distributed lock with ts " << *i << endl;
+ removed = true;
+ }
+ catch ( std::exception& e) {
+ warning() << "could not remove old distributed lock with ts " << *i
+ << causedBy( e ) << endl;
+ }
+
+ }
+
+ if( numOldLocks > 0 && _oldLockOIDs.size() > 0 ){
+ log( DistributedLock::logLvl - 1 ) << "not all old lock entries could be removed for process " << process << endl;
}
- }
- Date_t fourDays = jsTime() - ( 4 * 86400 * 1000 ); // 4 days
- conn->remove( lockPingNS , BSON( "_id" << BSON( "$nin" << pids ) << "ping" << LT << fourDays ) );
- err = conn->getLastError();
- if ( ! err.empty() ) {
- warning() << "dist_lock cleanup request from process: " << process << " to: " << addr
- << " failed: " << err << endl;
conn.done();
- sleepsecs(30);
- continue;
- }
- // create index so remove is fast even with a lot of servers
- if ( loops++ == 0 ) {
- conn->ensureIndex( lockPingNS , BSON( "ping" << 1 ) );
+ }
+ catch ( std::exception& e ) {
+ warning() << "distributed lock pinger '" << pingId << "' detected an exception while pinging."
+ << causedBy( e ) << endl;
}
- conn.done();
+ sleepmillis(sleepTime);
+ }
+
+ warning() << "removing distributed lock ping thread '" << pingId << "'" << endl;
+
+
+ if( shouldKill( addr, process ) )
+ finishKill( addr, process );
+
+ }
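Stopping a pinger is a two-step handshake: kill() (defined below) marks the ping id in _kill, the thread notices via shouldKill() at the top of its loop and exits, then finishKill() erases the id from both _kill and _seen so a fresh pinger for the same cluster and process can be started later. A minimal sketch of the sequence, assuming a DistributedLock instance named lock:

    // request that the pinger for this cluster/process exit
    distLockPinger.kill( lock.getRemoteConnection(), lock.getProcessId() );
    // the pinger thread then observes shouldKill() == true, leaves its loop,
    // and calls finishKill() to clear its _kill and _seen entries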
+
+ void distLockPingThread( ConnectionString addr, long long clockSkew, string processId, unsigned long long sleepTime ) {
+ try {
+ jsTimeVirtualThreadSkew( clockSkew );
+ _distLockPingThread( addr, processId, sleepTime );
}
catch ( std::exception& e ) {
- warning() << "dist_lock exception during ping: " << e.what() << endl;
+ error() << "unexpected error while running distributed lock pinger for " << addr << ", process " << processId << causedBy( e ) << endl;
}
+ catch ( ... ) {
+ error() << "unknown error while running distributed lock pinger for " << addr << ", process " << processId << endl;
+ }
+ }
- log( loops % 10 == 0 ? 0 : 1) << "dist_lock pinged successfully for: " << process << endl;
- sleepsecs(30);
+ string pingThreadId( const ConnectionString& conn, const string& processId ) {
+ return conn.toString() + "/" + processId;
}
- }
- void distLockPingThread( ConnectionString addr ) {
- try {
- _distLockPingThread( addr );
+ string got( DistributedLock& lock, unsigned long long sleepTime ) {
+
+ // Make sure we don't start multiple threads for a process id
+ scoped_lock lk( _mutex );
+
+ const ConnectionString& conn = lock.getRemoteConnection();
+ const string& processId = lock.getProcessId();
+ string s = pingThreadId( conn, processId );
+
+ // Ignore if we already have a pinging thread for this process.
+ if ( _seen.count( s ) > 0 ) return "";
+
+ // Check our clock skew
+ try {
+ if( lock.isRemoteTimeSkewed() ) {
+ throw LockException( str::stream() << "clock skew of the cluster " << conn.toString() << " is too far out of bounds to allow distributed locking." , 13650 );
+ }
+ }
+ catch( LockException& e) {
+ throw LockException( str::stream() << "error checking clock skew of cluster " << conn.toString() << causedBy( e ) , 13651);
+ }
+
+ boost::thread t( boost::bind( &DistributedLockPinger::distLockPingThread, this, conn, getJSTimeVirtualThreadSkew(), processId, sleepTime) );
+
+ _seen.insert( s );
+
+ return s;
}
- catch ( std::exception& e ) {
- error() << "unexpected error in distLockPingThread: " << e.what() << endl;
+
+ void addUnlockOID( const OID& oid ) {
+ // The list may be modified from threads other than the pinger, so take the mutex
+ scoped_lock lk( _mutex );
+ _oldLockOIDs.push_back( oid );
}
- catch ( ... ) {
- error() << "unexpected unknown error in distLockPingThread" << endl;
+
+ bool willUnlockOID( const OID& oid ) {
+ scoped_lock lk( _mutex );
+ return find( _oldLockOIDs.begin(), _oldLockOIDs.end(), oid ) != _oldLockOIDs.end();
}
- }
+ void kill( const ConnectionString& conn, const string& processId ) {
+ // Make sure we're in a consistent state before other threads can see us
+ scoped_lock lk( _mutex );
- class DistributedLockPinger {
- public:
- DistributedLockPinger()
- : _mutex( "DistributedLockPinger" ) {
+ string pingId = pingThreadId( conn, processId );
+
+ assert( _seen.count( pingId ) > 0 );
+ _kill.insert( pingId );
+
+ }
+
+ bool shouldKill( const ConnectionString& conn, const string& processId ) {
+ return _kill.count( pingThreadId( conn, processId ) ) > 0;
}
- void got( const ConnectionString& conn ) {
- string s = conn.toString();
+ void finishKill( const ConnectionString& conn, const string& processId ) {
+ // Make sure we're in a consistent state before other threads can see us
scoped_lock lk( _mutex );
- if ( _seen.count( s ) > 0 )
- return;
- boost::thread t( boost::bind( &distLockPingThread , conn ) );
- _seen.insert( s );
+
+ string pingId = pingThreadId( conn, processId );
+
+ _kill.erase( pingId );
+ _seen.erase( pingId );
+
}
+ set<string> _kill;
set<string> _seen;
mongo::mutex _mutex;
+ list<OID> _oldLockOIDs;
} distLockPinger;
- DistributedLock::DistributedLock( const ConnectionString& conn , const string& name , unsigned takeoverMinutes )
- : _conn(conn),_name(name),_takeoverMinutes(takeoverMinutes) {
- _id = BSON( "_id" << name );
- _ns = "config.locks";
- distLockPinger.got( conn );
+
+ const string DistributedLock::lockPingNS = "config.lockpings";
+ const string DistributedLock::locksNS = "config.locks";
+
+ /**
+ * Create a new distributed lock, potentially with a custom timeout (takeover time). If no timeout is
+ * specified, the default LOCK_TIMEOUT is used; the ping interval and maximum tolerated clock skew
+ * are derived from the timeout.
+ */
+ DistributedLock::DistributedLock( const ConnectionString& conn , const string& name , unsigned long long lockTimeout, bool asProcess )
+ : _conn(conn) , _name(name) , _id( BSON( "_id" << name ) ), _processId( asProcess ? getDistLockId() : getDistLockProcess() ),
+ _lockTimeout( lockTimeout == 0 ? LOCK_TIMEOUT : lockTimeout ), _maxClockSkew( _lockTimeout / LOCK_SKEW_FACTOR ), _maxNetSkew( _maxClockSkew ), _lockPing( _maxClockSkew ),
+ _mutex( "DistributedLock" )
+ {
+ log( logLvl - 1 ) << "created new distributed lock for " << name << " on " << conn
+ << " ( lock timeout : " << _lockTimeout
+ << ", ping interval : " << _lockPing << ", process : " << asProcess << " )" << endl;
+ }
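All of the lock's timing parameters derive from the single timeout value. A sketch of the relationships, assuming the default LOCK_TIMEOUT of 15 minutes; the value of LOCK_SKEW_FACTOR is defined in distlock.h and is an assumption here:

    unsigned long long lockTimeout  = 15 * 60 * 1000;                  // 900000 ms (assumed default)
    unsigned long long maxClockSkew = lockTimeout / LOCK_SKEW_FACTOR;  // max tolerated cluster skew
    unsigned long long maxNetSkew   = maxClockSkew;                    // max tolerated network delay
    unsigned long long lockPing     = maxClockSkew;                    // ping interval (ms)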
+
+ Date_t DistributedLock::getRemoteTime() {
+ return DistributedLock::remoteTime( _conn, _maxNetSkew );
+ }
+
+ bool DistributedLock::isRemoteTimeSkewed() {
+ return !DistributedLock::checkSkew( _conn, NUM_LOCK_SKEW_CHECKS, _maxClockSkew, _maxNetSkew );
+ }
+
+ const ConnectionString& DistributedLock::getRemoteConnection() {
+ return _conn;
+ }
+
+ const string& DistributedLock::getProcessId() {
+ return _processId;
+ }
+
+ /**
+ * Returns the remote time as reported by the cluster or server. The maximum difference between the reported time
+ * and the actual time on the remote server (at the completion of the function) is the maxNetSkew
+ */
+ Date_t DistributedLock::remoteTime( const ConnectionString& cluster, unsigned long long maxNetSkew ) {
+
+ ConnectionString server( *cluster.getServers().begin() );
+ ScopedDbConnection conn( server );
+
+ BSONObj result;
+ long long delay;
+
+ try {
+ Date_t then = jsTime();
+ bool success = conn->runCommand( string("admin"), BSON( "serverStatus" << 1 ), result );
+ delay = jsTime() - then;
+
+ if( !success )
+ throw TimeNotFoundException( str::stream() << "could not get status from server "
+ << server.toString() << " in cluster " << cluster.toString()
+ << " to check time", 13647 );
+
+ // Make sure that our delay is not more than 2x our maximum network skew, since this is the max our remote
+ // time value can be off by if we assume a response in the middle of the delay.
+ if( delay > (long long) (maxNetSkew * 2) )
+ throw TimeNotFoundException( str::stream() << "server " << server.toString()
+ << " in cluster " << cluster.toString()
+ << " did not respond within max network delay of "
+ << maxNetSkew << "ms", 13648 );
+ }
+ catch(...) {
+ conn.done();
+ throw;
+ }
+
+ conn.done();
+
+ return result["localTime"].Date() - (delay / 2);
+
+ }
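The delay/2 correction assumes the server sampled its localTime at the midpoint of the round trip, so the estimate can be off by at most half the delay; that bound is why calls slower than 2 * maxNetSkew are rejected above. A worked sketch with hypothetical numbers:

    // request sent at local t = 0 ms, reply received at t = 40 ms  ->  delay = 40 ms
    // server reported localTime = R, assumed sampled at local t = 20 ms (the midpoint)
    // returned estimate: R - delay/2, i.e. the remote clock aligned to local t = 0
    // worst case: the sample was taken at t = 0 or t = 40, so the error is <= delay/2 = 20 ms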
+
+ bool DistributedLock::checkSkew( const ConnectionString& cluster, unsigned skewChecks, unsigned long long maxClockSkew, unsigned long long maxNetSkew ) {
+
+ vector<HostAndPort> servers = cluster.getServers();
+
+ if(servers.size() < 1) return true;
+
+ vector<long long> avgSkews;
+
+ for(unsigned i = 0; i < skewChecks; i++) {
+
+ // Find the average skew for each server
+ unsigned s = 0;
+ for(vector<HostAndPort>::iterator si = servers.begin(); si != servers.end(); ++si,s++) {
+
+ if(i == 0) avgSkews.push_back(0);
+
+ // Could check if this is self, but shouldn't matter since local network connection should be fast.
+ ConnectionString server( *si );
+
+ vector<long long> skew;
+
+ BSONObj result;
+
+ Date_t remote = remoteTime( server, maxNetSkew );
+ Date_t local = jsTime();
+
+ // Remote time can be delayed by at most MAX_NET_SKEW
+
+ // Skew is how much time we'd have to add to local to get to remote
+ avgSkews[s] += (long long) (remote - local);
+
+ log( logLvl + 1 ) << "skew from remote server " << server << " found: " << (long long) (remote - local) << endl;
+
+ }
+ }
+
+ // Analyze skews
+
+ long long serverMaxSkew = 0;
+ long long serverMinSkew = 0;
+
+ for(unsigned s = 0; s < avgSkews.size(); s++) {
+
+ long long avgSkew = (avgSkews[s] /= skewChecks);
+
+ // Keep track of max and min skews
+ if(s == 0) {
+ serverMaxSkew = avgSkew;
+ serverMinSkew = avgSkew;
+ }
+ else {
+ if(avgSkew > serverMaxSkew)
+ serverMaxSkew = avgSkew;
+ if(avgSkew < serverMinSkew)
+ serverMinSkew = avgSkew;
+ }
+
+ }
+
+ long long totalSkew = serverMaxSkew - serverMinSkew;
+
+ // Make sure our max skew is not more than our pre-set limit
+ if(totalSkew > (long long) maxClockSkew) {
+ log( logLvl + 1 ) << "total clock skew of " << totalSkew << "ms for servers " << cluster << " is out of " << maxClockSkew << "ms bounds." << endl;
+ return false;
+ }
+
+ log( logLvl + 1 ) << "total clock skew of " << totalSkew << "ms for servers " << cluster << " is in " << maxClockSkew << "ms bounds." << endl;
+ return true;
+ }
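Note that checkSkew() measures servers against each other, not against the local clock, so a constant offset shared by every config server is harmless; only the spread between the fastest and slowest server counts. A worked sketch with hypothetical averages:

    // average skews for three config servers: +10 ms, -5 ms, +2 ms
    // serverMaxSkew = 10, serverMinSkew = -5  ->  totalSkew = 15 ms
    // locking is allowed only while totalSkew <= maxClockSkew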
+
+ // For use in testing; in practice, the ping thread should run indefinitely.
+ bool DistributedLock::killPinger( DistributedLock& lock ) {
+ if( lock._threadId == "") return false;
+
+ distLockPinger.kill( lock._conn, lock._processId );
+ return true;
}
+ // Semantics: if the lock cannot be acquired, this returns false and the call may be retried.
+ // If the lock should not be tried again (some unexpected error), a LockException is thrown.
+ // If we are only trying to re-enter a currently held lock, reenter should be true.
+ // Note: reenter doesn't actually make this lock re-entrant in the normal sense, since it can still only
+ // be unlocked once; instead, it is used to verify that the lock is already held.
+ bool DistributedLock::lock_try( const string& why , bool reenter, BSONObj * other ) {
+
+ // TODO: Start pinging only when we actually get the lock?
+ // If we don't have a pinger thread yet, start one ( got() returns "" if one already exists )
+ if( _threadId == "" ){
+ scoped_lock lk( _mutex );
+ _threadId = distLockPinger.got( *this, _lockPing );
+ }
+
+ // This should always be true; if not, we are using the lock incorrectly.
+ assert( _name != "" );
- bool DistributedLock::lock_try( string why , BSONObj * other ) {
// write to dummy if 'other' is null
BSONObj dummyOther;
if ( other == NULL )
@@ -182,93 +471,240 @@ namespace mongo {
{
// make sure its there so we can use simple update logic below
- BSONObj o = conn->findOne( _ns , _id ).getOwned();
+ BSONObj o = conn->findOne( locksNS , _id ).getOwned();
+
+ // Case 1: No locks
if ( o.isEmpty() ) {
try {
- log(4) << "dist_lock inserting initial doc in " << _ns << " for lock " << _name << endl;
- conn->insert( _ns , BSON( "_id" << _name << "state" << 0 << "who" << "" ) );
+ log( logLvl ) << "inserting initial doc in " << locksNS << " for lock " << _name << endl;
+ conn->insert( locksNS , BSON( "_id" << _name << "state" << 0 << "who" << "" ) );
}
catch ( UserException& e ) {
- log() << "dist_lock could not insert initial doc: " << e << endl;
+ warning() << "could not insert initial doc for distributed lock " << _name << causedBy( e ) << endl;
}
}
-
+
+ // Case 2: A set lock that we might be able to force
else if ( o["state"].numberInt() > 0 ) {
+
+ string lockName = o["_id"].String() + string("/") + o["process"].String();
+
+ bool canReenter = reenter && o["process"].String() == _processId && ! distLockPinger.willUnlockOID( o["ts"].OID() ) && o["state"].numberInt() == 2;
+ if( reenter && ! canReenter ) {
+ log( logLvl - 1 ) << "not re-entering distributed lock " << lockName;
+ if( o["process"].String() != _processId ) log( logLvl - 1 ) << ", different process " << _processId << endl;
+ else if( o["state"].numberInt() == 2 ) log( logLvl - 1 ) << ", state not finalized" << endl;
+ else log( logLvl - 1 ) << ", ts " << o["ts"].OID() << " scheduled for late unlock" << endl;
+
+ // reset since we've been bounced by a previous lock not being where we thought it was,
+ // and should go through full forcing process if required.
+ // (in theory we should never see a ping here if used correctly)
+ *other = o.getOwned(); conn.done(); resetLastPing();
+ return false;
+ }
+
BSONObj lastPing = conn->findOne( lockPingNS , o["process"].wrap( "_id" ) );
if ( lastPing.isEmpty() ) {
- // if a lock is taken but there's no ping for it, we're in an inconsistent situation
- // if the lock holder (mongos or d) does not exist anymore, the lock could safely be removed
- // but we'd require analysis of the situation before a manual intervention
- error() << "config.locks: " << _name << " lock is taken by old process? "
- << "remove the following lock if the process is not active anymore: " << o << endl;
- *other = o;
- conn.done();
- return false;
+ log( logLvl ) << "empty ping found for process in lock '" << lockName << "'" << endl;
+ // TODO: Using 0 as a "no time found" value will fail if dates roll over, but then, so will a lot.
+ lastPing = BSON( "_id" << o["process"].String() << "ping" << (Date_t) 0 );
}
- unsigned long long now = jsTime();
- unsigned long long pingTime = lastPing["ping"].Date();
-
- if ( now < pingTime ) {
- // clock skew
- warning() << "dist_lock has detected clock skew of " << ( pingTime - now ) << "ms" << endl;
- *other = o;
- conn.done();
- return false;
+ unsigned long long elapsed = 0;
+ unsigned long long takeover = _lockTimeout;
+
+ log( logLvl ) << "checking last ping for lock '" << lockName << "'" << " against process " << _lastPingCheck.get<0>() << " and ping " << _lastPingCheck.get<1>() << endl;
+
+ try {
+
+ Date_t remote = remoteTime( _conn );
+
+ // Determine the elapsed time using comparisons of the remote clock:
+ // for non-finalized locks, time out 15 minutes after the lock was last seen (ts)
+ // for finalized locks, time out 15 minutes after the last ping
+ bool recPingChange = o["state"].numberInt() == 2 && ( _lastPingCheck.get<0>() != lastPing["_id"].String() || _lastPingCheck.get<1>() != lastPing["ping"].Date() );
+ bool recTSChange = _lastPingCheck.get<3>() != o["ts"].OID();
+
+ if( recPingChange || recTSChange ) {
+ // If the ping has changed since we last checked, mark the current date and time
+ scoped_lock lk( _mutex );
+ _lastPingCheck = boost::tuple<string, Date_t, Date_t, OID>( lastPing["_id"].String().c_str(), lastPing["ping"].Date(), remote, o["ts"].OID() );
+ }
+ else {
+
+ // GOTCHA! Due to network issues, it is possible that the remote time now reads
+ // earlier than the remote time we recorded at the last ping change. We *have* to
+ // check this here, otherwise the unsigned subtraction overflows and our lock breaks.
+ if(_lastPingCheck.get<2>() >= remote)
+ elapsed = 0;
+ else
+ elapsed = remote - _lastPingCheck.get<2>();
+ }
+
}
-
- unsigned long long elapsed = now - pingTime;
- elapsed = elapsed / ( 1000 * 60 ); // convert to minutes
-
- if ( elapsed > ( 60 * 24 * 365 * 100 ) /* 100 years */ ) {
- warning() << "distlock elapsed time seems impossible: " << lastPing << endl;
+ catch( LockException& e ) {
+
+ // Remote server cannot be found / is not responsive
+ warning() << "Could not get remote time from " << _conn << causedBy( e );
+ // If our config server is having issues, forget all the pings until we can see it again
+ resetLastPing();
+
}
-
- if ( elapsed <= _takeoverMinutes ) {
- log(1) << "dist_lock lock failed because taken by: " << o << " elapsed minutes: " << elapsed << endl;
- *other = o;
- conn.done();
+
+ if ( elapsed <= takeover && ! canReenter ) {
+ log( logLvl ) << "could not force lock '" << lockName << "' because elapsed time " << elapsed << " <= takeover time " << takeover << endl;
+ *other = o.getOwned(); conn.done();
return false;
}
-
- log() << "dist_lock forcefully taking over from: " << o << " elapsed minutes: " << elapsed << endl;
- conn->update( _ns , _id , BSON( "$set" << BSON( "state" << 0 ) ) );
- string err = conn->getLastError();
- if ( ! err.empty() ) {
- warning() << "dist_lock take over from: " << o << " failed: " << err << endl;
- *other = o.getOwned();
- other->getOwned();
- conn.done();
+ else if( elapsed > takeover && canReenter ) {
+ log( logLvl - 1 ) << "not re-entering distributed lock " << lockName << "' because elapsed time " << elapsed << " > takeover time " << takeover << endl;
+ *other = o.getOwned(); conn.done();
return false;
}
+ log( logLvl - 1 ) << ( canReenter ? "re-entering" : "forcing" ) << " lock '" << lockName << "' because "
+ << ( canReenter ? "re-entering is allowed, " : "" )
+ << "elapsed time " << elapsed << " > takeover time " << takeover << endl;
+
+ if( elapsed > takeover ) {
+
+ // Lock may be forced; reset our timer whether the force succeeds or fails.
+ // This ensures that another full timeout must elapse if something borks up here, and resets our
+ // pristine ping state if the lock is acquired.
+ resetLastPing();
+
+ try {
+
+ // Check the clock skew again. If we check this before we get a lock
+ // and after the lock times out, we can be pretty sure the time is
+ // increasing at the same rate on all servers and therefore our
+ // timeout is accurate
+ uassert( 14023, str::stream() << "remote time in cluster " << _conn.toString() << " is now skewed, cannot force lock.", !isRemoteTimeSkewed() );
+
+ // Make sure we break the lock with the correct "ts" (OID) value, otherwise
+ // we can overwrite a new lock inserted in the meantime.
+ conn->update( locksNS , BSON( "_id" << _id["_id"].String() << "state" << o["state"].numberInt() << "ts" << o["ts"] ),
+ BSON( "$set" << BSON( "state" << 0 ) ) );
+
+ BSONObj err = conn->getLastErrorDetailed();
+ string errMsg = DBClientWithCommands::getLastErrorString(err);
+
+ // TODO: Clean up all the extra code to exit this method, probably with a refactor
+ if ( !errMsg.empty() || !err["n"].type() || err["n"].numberInt() < 1 ) {
+ ( errMsg.empty() ? log( logLvl - 1 ) : warning() ) << "Could not force lock '" << lockName << "' "
+ << ( !errMsg.empty() ? causedBy(errMsg) : string("(another force won)") ) << endl;
+ *other = o.getOwned(); conn.done();
+ return false;
+ }
+
+ }
+ catch( UpdateNotTheSame& ) {
+ // Ok to continue since we know we forced at least one lock document, and all lock docs
+ // are required for a lock to be held.
+ warning() << "lock forcing " << lockName << " inconsistent" << endl;
+ }
+ catch( std::exception& e ) {
+ conn.done();
+ throw LockException( str::stream() << "exception forcing distributed lock "
+ << lockName << causedBy( e ), 13660);
+ }
+
+ }
+ else {
+
+ assert( canReenter );
+
+ // Lock may be re-entered, reset our timer if succeeds or fails
+ // Not strictly necessary, but helpful for small timeouts where thread scheduling is significant.
+ // This ensures that two attempts are still required for a force if not acquired, and resets our
+ // state if we are acquired.
+ resetLastPing();
+
+ // Test that the lock is held by trying to update the finalized state of the lock to the same state.
+ // If the update does not go through, or does not go through on all servers, we can't re-enter.
+ try {
+
+ // Test the lock with the correct "ts" (OID) value
+ conn->update( locksNS , BSON( "_id" << _id["_id"].String() << "state" << 2 << "ts" << o["ts"] ),
+ BSON( "$set" << BSON( "state" << 2 ) ) );
+
+ BSONObj err = conn->getLastErrorDetailed();
+ string errMsg = DBClientWithCommands::getLastErrorString(err);
+
+ // TODO: Clean up all the extra code to exit this method, probably with a refactor
+ if ( ! errMsg.empty() || ! err["n"].type() || err["n"].numberInt() < 1 ) {
+ ( errMsg.empty() ? log( logLvl - 1 ) : warning() ) << "Could not re-enter lock '" << lockName << "' "
+ << ( !errMsg.empty() ? causedBy(errMsg) : string("(not sure lock is held)") )
+ << " gle: " << err
+ << endl;
+ *other = o.getOwned(); conn.done();
+ return false;
+ }
+
+ }
+ catch( UpdateNotTheSame& ) {
+ // NOT ok to continue since our lock isn't held by all servers, so isn't valid.
+ warning() << "inconsistent state re-entering lock, lock " << lockName << " not held" << endl;
+ *other = o.getOwned(); conn.done();
+ return false;
+ }
+ catch( std::exception& e ) {
+ conn.done();
+ throw LockException( str::stream() << "exception re-entering distributed lock "
+ << lockName << causedBy( e ), 13660);
+ }
+
+ log( logLvl - 1 ) << "re-entered distributed lock '" << lockName << "'" << endl;
+ *other = o.getOwned(); conn.done();
+ return true;
+
+ }
+
+ log( logLvl - 1 ) << "lock '" << lockName << "' successfully forced" << endl;
+
+ // We don't need the ts value in the query, since we will only ever replace locks with state=0.
}
+ // Case 3: We have an expired lock
else if ( o["ts"].type() ) {
queryBuilder.append( o["ts"] );
}
}
- OID ts;
- ts.init();
+ // Always reset our ping if we're trying to get a lock, since getting a lock implies the lock state is open
+ // and no locks need to be forced. If anything goes wrong, we don't want to remember an old lock.
+ resetLastPing();
bool gotLock = false;
- BSONObj now;
+ BSONObj currLock;
- BSONObj lockDetails = BSON( "state" << 1 << "who" << getDistLockId() << "process" << getDistLockProcess() <<
- "when" << DATENOW << "why" << why << "ts" << ts );
+ BSONObj lockDetails = BSON( "state" << 1 << "who" << getDistLockId() << "process" << _processId <<
+ "when" << jsTime() << "why" << why << "ts" << OID::gen() );
BSONObj whatIWant = BSON( "$set" << lockDetails );
+
+ BSONObj query = queryBuilder.obj();
+
+ string lockName = _name + string("/") + _processId;
+
try {
- log(4) << "dist_lock about to aquire lock: " << lockDetails << endl;
- conn->update( _ns , queryBuilder.obj() , whatIWant );
+ // Main codepath to acquire lock
+
+ log( logLvl ) << "about to acquire distributed lock '" << lockName << ":\n"
+ << lockDetails.jsonString(Strict, true) << "\n"
+ << query.jsonString(Strict, true) << endl;
+
+ conn->update( locksNS , query , whatIWant );
+
+ BSONObj err = conn->getLastErrorDetailed();
+ string errMsg = DBClientWithCommands::getLastErrorString(err);
- BSONObj o = conn->getLastErrorDetailed();
- now = conn->findOne( _ns , _id );
+ currLock = conn->findOne( locksNS , _id );
- if ( o["n"].numberInt() == 0 ) {
- *other = now;
+ if ( !errMsg.empty() || !err["n"].type() || err["n"].numberInt() < 1 ) {
+ ( errMsg.empty() ? log( logLvl - 1 ) : warning() ) << "could not acquire lock '" << lockName << "' "
+ << ( !errMsg.empty() ? causedBy( errMsg ) : string("(another update won)") ) << endl;
+ *other = currLock;
other->getOwned();
- log() << "dist_lock error trying to aquire lock: " << lockDetails << " error: " << o << endl;
gotLock = false;
}
else {
@@ -277,63 +713,234 @@ namespace mongo {
}
catch ( UpdateNotTheSame& up ) {
+
// this means our update got through on some, but not others
- log(4) << "dist_lock lock did not propagate properly" << endl;
+ warning() << "distributed lock '" << lockName << " did not propagate properly." << causedBy( up ) << endl;
+
+ // Overall protection derives from:
+ // All unlocking updates use the ts value when setting state to 0
+ // This ensures that during locking, we can override all smaller ts locks with
+ // our own safe ts value and not be unlocked afterward.
+ for ( unsigned i = 0; i < up.size(); i++ ) {
+
+ ScopedDbConnection indDB( up[i].first );
+ BSONObj indUpdate;
+
+ try {
+
+ indUpdate = indDB->findOne( locksNS , _id );
+
+ // If we override this lock in any way, grab and protect it.
+ // We assume/ensure that if a process does not have all lock documents, it is no longer
+ // holding the lock.
+ // Note - finalized locks may compete too, but we know they've won already if competing
+ // in this round. Cleanup of crashes during finalizing may take a few tries.
+ if( indUpdate["ts"] < lockDetails["ts"] || indUpdate["state"].numberInt() == 0 ) {
+
+ BSONObj grabQuery = BSON( "_id" << _id["_id"].String() << "ts" << indUpdate["ts"].OID() );
+
+ // Change ts so we won't be forced, state so we won't be relocked
+ BSONObj grabChanges = BSON( "ts" << lockDetails["ts"].OID() << "state" << 1 );
+
+ // Either our update will succeed, and we'll grab the lock, or it will fail b/c some other
+ // process grabbed the lock (which will change the ts), but the lock will be set until forcing
+ indDB->update( locksNS, grabQuery, BSON( "$set" << grabChanges ) );
+
+ indUpdate = indDB->findOne( locksNS, _id );
+
+ // Our lock should now be set until forcing.
+ assert( indUpdate["state"].numberInt() == 1 );
+
+ }
+ // else our lock is the same, in which case we're safe, or it's a bigger lock,
+ // in which case we won't need to protect anything since we won't have the lock.
+
+ }
+ catch( std::exception& e ) {
+ conn.done();
+ throw LockException( str::stream() << "distributed lock " << lockName
+ << " had errors communicating with individual server "
+ << up[i].first << causedBy( e ), 13661 );
+ }
- for ( unsigned i=0; i<up.size(); i++ ) {
- ScopedDbConnection temp( up[i].first );
- BSONObj temp2 = temp->findOne( _ns , _id );
+ assert( !indUpdate.isEmpty() );
- if ( now.isEmpty() || now["ts"] < temp2["ts"] ) {
- now = temp2.getOwned();
+ // Find max TS value
+ if ( currLock.isEmpty() || currLock["ts"] < indUpdate["ts"] ) {
+ currLock = indUpdate.getOwned();
}
- temp.done();
+ indDB.done();
+
}
- if ( now["ts"].OID() == ts ) {
- log(4) << "dist_lock completed lock propagation" << endl;
+ // Locks on all servers are now set and safe until forcing
+
+ if ( currLock["ts"] == lockDetails["ts"] ) {
+ log( logLvl - 1 ) << "lock update won, completing lock propagation for '" << lockName << "'" << endl;
gotLock = true;
- conn->update( _ns , _id , whatIWant );
}
else {
- log() << "dist_lock error trying to complete propagation" << endl;
+ log( logLvl - 1 ) << "lock update lost, lock '" << lockName << "' not propagated." << endl;
+
+ // Register the lock for deletion, to speed up failover
+ // Not strictly necessary, but helpful
+ distLockPinger.addUnlockOID( lockDetails["ts"].OID() );
+
gotLock = false;
}
}
+ catch( std::exception& e ) {
+ conn.done();
+ throw LockException( str::stream() << "exception creating distributed lock "
+ << lockName << causedBy( e ), 13663 );
+ }
- conn.done();
+ // Complete lock propagation
+ if( gotLock ) {
+
+ // This is now safe, since we know that no new locks will be placed on top of the ones we've checked for at
+ // least 15 minutes. Sets the state = 2, so that future clients can determine that the lock is truly set.
+ // The invariant for rollbacks is that we will never force locks with state = 2 and active pings, since that
+ // indicates the lock is active, but this means the process creating/destroying them must explicitly poll
+ // when something goes wrong.
+ try {
+
+ BSONObjBuilder finalLockDetails;
+ BSONObjIterator bi( lockDetails );
+ while( bi.more() ) {
+ BSONElement el = bi.next();
+ if( (string) ( el.fieldName() ) == "state" )
+ finalLockDetails.append( "state", 2 );
+ else finalLockDetails.append( el );
+ }
+
+ conn->update( locksNS , _id , BSON( "$set" << finalLockDetails.obj() ) );
+
+ BSONObj err = conn->getLastErrorDetailed();
+ string errMsg = DBClientWithCommands::getLastErrorString(err);
+
+ currLock = conn->findOne( locksNS , _id );
- log(2) << "dist_lock lock gotLock: " << gotLock << " now: " << now << endl;
+ if ( !errMsg.empty() || !err["n"].type() || err["n"].numberInt() < 1 ) {
+ warning() << "could not finalize winning lock " << lockName
+ << ( !errMsg.empty() ? causedBy( errMsg ) : " (did not update lock) " ) << endl;
+ gotLock = false;
+ }
+ else {
+ // SUCCESS!
+ gotLock = true;
+ }
+
+ }
+ catch( std::exception& e ) {
+ conn.done();
+
+ // Register the bad final lock for deletion, in case it exists
+ distLockPinger.addUnlockOID( lockDetails["ts"].OID() );
+
+ throw LockException( str::stream() << "exception finalizing winning lock"
+ << causedBy( e ), 13662 );
+ }
+
+ }
+
+ *other = currLock.getOwned();
+
+ // Log our lock results
+ if(gotLock)
+ log( logLvl - 1 ) << "distributed lock '" << lockName << "' acquired, ts : " << currLock["ts"].OID() << endl;
+ else
+ log( logLvl - 1 ) << "distributed lock '" << lockName << "' was not acquired." << endl;
+
+ conn.done();
return gotLock;
}
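A minimal usage sketch of the acquire/release pair; the host names, lock name, and "why" message are hypothetical, and error handling is elided:

    string errMsg;
    // parse a list of config hosts (hypothetical names)
    ConnectionString configCS = ConnectionString::parse(
        "cfg1.example.com,cfg2.example.com,cfg3.example.com", errMsg );

    DistributedLock balancerLock( configCS, "balancer", 0, false ); // 0 -> default LOCK_TIMEOUT

    BSONObj otherLock;
    if ( balancerLock.lock_try( "doing balance round", false, &otherLock ) ) {
        // ... work while holding the lock ...
        balancerLock.unlock( &otherLock );  // pass the lock doc so unlock can match its ts
    }
    else {
        // otherLock now holds the competing lock document, useful for diagnostics
    }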
- void DistributedLock::unlock() {
+ // unlock() now takes an optional pointer to the old lock document, so you can be specific about
+ // which particular lock you want to unlock. This is required when the config server is down and
+ // therefore cannot tell you which lock ts you should try to unlock.
+ void DistributedLock::unlock( BSONObj* oldLockPtr ) {
+
+ assert( _name != "" );
+
+ string lockName = _name + string("/") + _processId;
+
const int maxAttempts = 3;
int attempted = 0;
+
+ BSONObj oldLock;
+ if( oldLockPtr ) oldLock = *oldLockPtr;
+
while ( ++attempted <= maxAttempts ) {
+ ScopedDbConnection conn( _conn );
+
try {
- ScopedDbConnection conn( _conn );
- conn->update( _ns , _id, BSON( "$set" << BSON( "state" << 0 ) ) );
- log(2) << "dist_lock unlock: " << conn->findOne( _ns , _id ) << endl;
- conn.done();
- return;
+ if( oldLock.isEmpty() )
+ oldLock = conn->findOne( locksNS, _id );
+
+ if( oldLock["state"].eoo() || oldLock["state"].numberInt() != 2 || oldLock["ts"].eoo() ) {
+ warning() << "cannot unlock invalid distributed lock " << oldLock << endl;
+ conn.done();
+ break;
+ }
+ // Use ts when updating lock, so that new locks can be sure they won't get trampled.
+ conn->update( locksNS ,
+ BSON( "_id" << _id["_id"].String() << "ts" << oldLock["ts"].OID() ),
+ BSON( "$set" << BSON( "state" << 0 ) ) );
+ // Check that the lock was actually unlocked... if not, try again
+ BSONObj err = conn->getLastErrorDetailed();
+ string errMsg = DBClientWithCommands::getLastErrorString(err);
+
+ if ( !errMsg.empty() || !err["n"].type() || err["n"].numberInt() < 1 ){
+ warning() << "distributed lock unlock update failed, retrying "
+ << ( errMsg.empty() ? causedBy( "( update not registered )" ) : causedBy( errMsg ) ) << endl;
+ conn.done();
+ continue;
+ }
+
+ log( logLvl - 1 ) << "distributed lock '" << lockName << "' unlocked. " << endl;
+ conn.done();
+ return;
+ }
+ catch( UpdateNotTheSame& ) {
+ log( logLvl - 1 ) << "distributed lock '" << lockName << "' unlocked (messily). " << endl;
+ conn.done();
+ break;
}
catch ( std::exception& e) {
- log( LL_WARNING ) << "dist_lock " << _name << " failed to contact config server in unlock attempt "
- << attempted << ": " << e.what() << endl;
+ warning() << "distributed lock '" << lockName << "' failed unlock attempt."
+ << causedBy( e ) << endl;
- sleepsecs(1 << attempted);
+ conn.done();
+ // TODO: If our lock timeout is small, sleeping this long may be unsafe.
+ if( attempted != maxAttempts) sleepsecs(1 << attempted);
}
}
- log( LL_WARNING ) << "dist_lock couldn't consumate unlock request. " << "Lock " << _name
- << " will be taken over after " << _takeoverMinutes << " minutes timeout" << endl;
+ if( attempted > maxAttempts && ! oldLock.isEmpty() && ! oldLock["ts"].eoo() ) {
+
+ log( logLvl - 1 ) << "could not unlock distributed lock with ts " << oldLock["ts"].OID()
+ << ", will attempt again later" << endl;
+
+ // We couldn't unlock the lock at all, so try again later in the pinging thread...
+ distLockPinger.addUnlockOID( oldLock["ts"].OID() );
+ }
+ else if( attempted > maxAttempts ) {
+ warning() << "could not unlock untracked distributed lock, a manual force may be required" << endl;
+ }
+
+ warning() << "distributed lock '" << lockName << "' couldn't consummate unlock request. "
+ << "lock may be taken over after " << ( _lockTimeout / (60 * 1000) )
+ << " minutes timeout." << endl;
}
+
+
}
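When unlock() gives up, the abandoned lock's ts is queued via addUnlockOID() and the pinger loop retries setting its state to 0 on a later pass. The queue also guards re-entry: willUnlockOID() appears in lock_try's canReenter test, so a lock whose ts is scheduled for late unlock cannot be re-entered even though its document may still read state 2. The guard, restated for reference:

    // simplified from lock_try above
    bool canReenter = reenter
                      && o["process"].String() == _processId
                      && ! distLockPinger.willUnlockOID( o["ts"].OID() )  // ts not queued for late unlock
                      && o["state"].numberInt() == 2;                     // lock was finalized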