diff options
Diffstat (limited to 'db/client.cpp')
-rw-r--r-- | db/client.cpp | 396 |
1 files changed, 206 insertions, 190 deletions
diff --git a/db/client.cpp b/db/client.cpp index f9653f5..e4fd4b9 100644 --- a/db/client.cpp +++ b/db/client.cpp @@ -16,14 +16,14 @@ * along with this program. If not, see <http://www.gnu.org/licenses/>. */ -/* Client represents a connection to the database (the server-side) and corresponds +/* Client represents a connection to the database (the server-side) and corresponds to an open socket (or logical connection if pooling on sockets) from a client. */ #include "pch.h" #include "db.h" #include "client.h" -#include "curop.h" +#include "curop-inl.h" #include "json.h" #include "security.h" #include "commands.h" @@ -40,20 +40,31 @@ namespace mongo { set<Client*> Client::clients; // always be in clientsMutex when manipulating this boost::thread_specific_ptr<Client> currentClient; - Client::Client(const char *desc, MessagingPort *p) : - _context(0), - _shutdown(false), - _desc(desc), - _god(0), - _lastOp(0), - _mp(p) - { + /* each thread which does db operations has a Client object in TLS. + call this when your thread starts. + */ + Client& Client::initThread(const char *desc, MessagingPort *mp) { + assert( currentClient.get() == 0 ); + Client *c = new Client(desc, mp); + currentClient.reset(c); + mongo::lastError.initThread(); + return *c; + } + + Client::Client(const char *desc, MessagingPort *p) : + _context(0), + _shutdown(false), + _desc(desc), + _god(0), + _lastOp(0), + _mp(p) { + _connectionId = setThreadName(desc); _curOp = new CurOp( this ); scoped_lock bl(clientsMutex); clients.insert(this); } - Client::~Client() { + Client::~Client() { _god = 0; if ( _context ) @@ -62,90 +73,33 @@ namespace mongo { if ( ! _shutdown ) { error() << "Client::shutdown not called: " << _desc << endl; } - + scoped_lock bl(clientsMutex); if ( ! _shutdown ) clients.erase(this); delete _curOp; } - - void Client::_dropns( const string& ns ){ - Top::global.collectionDropped( ns ); - - dblock l; - Client::Context ctx( ns ); - if ( ! nsdetails( ns.c_str() ) ) - return; - - try { - string err; - BSONObjBuilder b; - dropCollection( ns , err , b ); - } - catch ( ... ){ - warning() << "error dropping temp collection: " << ns << endl; - } - - } - - void Client::_invalidateDB( const string& db ) { - assert( db.find( '.' ) == string::npos ); - - set<string>::iterator min = _tempCollections.lower_bound( db + "." ); - set<string>::iterator max = _tempCollections.lower_bound( db + "|" ); - - _tempCollections.erase( min , max ); - - } - - void Client::invalidateDB(const string& db) { - scoped_lock bl(clientsMutex); - for ( set<Client*>::iterator i = clients.begin(); i!=clients.end(); i++ ){ - Client* cli = *i; - cli->_invalidateDB(db); - } - } - void Client::invalidateNS( const string& ns ){ - scoped_lock bl(clientsMutex); - for ( set<Client*>::iterator i = clients.begin(); i!=clients.end(); i++ ){ - Client* cli = *i; - cli->_tempCollections.erase( ns ); - } - } - - - void Client::addTempCollection( const string& ns ) { - _tempCollections.insert( ns ); - } - - bool Client::shutdown(){ + bool Client::shutdown() { _shutdown = true; if ( inShutdown() ) return false; { scoped_lock bl(clientsMutex); clients.erase(this); - } - - bool didAnything = false; - - if ( _tempCollections.size() ){ - didAnything = true; - for ( set<string>::iterator i = _tempCollections.begin(); i!=_tempCollections.end(); i++ ){ - _dropns( *i ); + if ( isSyncThread() ) { + syncThread = 0; } - _tempCollections.clear(); } - - return didAnything; + + return false; } - BSONObj CurOp::_tooBig = fromjson("{\"$msg\":\"query not recording (too large)\"}"); + BSONObj CachedBSONObj::_tooBig = fromjson("{\"$msg\":\"query not recording (too large)\"}"); AtomicUInt CurOp::_nextOpNum; - + Client::Context::Context( string ns , Database * db, bool doauth ) - : _client( currentClient.get() ) , _oldContext( _client->_context ) , + : _client( currentClient.get() ) , _oldContext( _client->_context ) , _path( dbpath ) , _lock(0) , _justCreated(false) { assert( db && db->isOk() ); _ns = ns; @@ -155,20 +109,36 @@ namespace mongo { _auth(); } - void Client::Context::_finishInit( bool doauth ){ + Client::Context::Context(const string& ns, string path , mongolock * lock , bool doauth ) + : _client( currentClient.get() ) , _oldContext( _client->_context ) , + _path( path ) , _lock( lock ) , + _ns( ns ), _db(0) { + _finishInit( doauth ); + } + + /* this version saves the context but doesn't yet set the new one: */ + + Client::Context::Context() + : _client( currentClient.get() ) , _oldContext( _client->_context ), + _path( dbpath ) , _lock(0) , _justCreated(false), _db(0) { + _client->_context = this; + clear(); + } + + void Client::Context::_finishInit( bool doauth ) { int lockState = dbMutex.getState(); assert( lockState ); - + _db = dbHolder.get( _ns , _path ); - if ( _db ){ + if ( _db ) { _justCreated = false; } - else if ( dbMutex.getState() > 0 ){ + else if ( dbMutex.getState() > 0 ) { // already in a write lock _db = dbHolder.getOrCreate( _ns , _path , _justCreated ); assert( _db ); } - else if ( dbMutex.getState() < -1 ){ + else if ( dbMutex.getState() < -1 ) { // nested read lock :( assert( _lock ); _lock->releaseAndWriteLock(); @@ -181,50 +151,52 @@ namespace mongo { // to do that, we're going to unlock, then get a write lock // this is so that if this is the first query and its long doesn't block db // we just have to check that the db wasn't closed in the interim where we unlock - for ( int x=0; x<2; x++ ){ - { + for ( int x=0; x<2; x++ ) { + { dbtemprelease unlock; writelock lk( _ns ); dbHolder.getOrCreate( _ns , _path , _justCreated ); } - + _db = dbHolder.get( _ns , _path ); - + if ( _db ) break; - + log() << "db was closed on us right after we opened it: " << _ns << endl; } - + uassert( 13005 , "can't create db, keeps getting closed" , _db ); } - - _client->_context = this; - _client->_curOp->enter( this ); - if ( doauth ) - _auth( lockState ); - switch ( _client->_curOp->getOp() ){ + switch ( _client->_curOp->getOp() ) { case dbGetMore: // getMore's are special and should be handled else where case dbUpdate: // update & delete check shard version in instance.cpp, so don't check here as well - case dbDelete: + case dbDelete: break; default: { string errmsg; - if ( ! shardVersionOk( _ns , lockState > 0 , errmsg ) ){ - msgasserted( StaleConfigInContextCode , (string)"[" + _ns + "] shard version not ok in Client::Context: " + errmsg ); + if ( ! shardVersionOk( _ns , lockState > 0 , errmsg ) ) { + ostringstream os; + os << "[" << _ns << "] shard version not ok in Client::Context: " << errmsg; + msgassertedNoTrace( StaleConfigInContextCode , os.str().c_str() ); } } } + + _client->_context = this; + _client->_curOp->enter( this ); + if ( doauth ) + _auth( lockState ); } - - void Client::Context::_auth( int lockState ){ + + void Client::Context::_auth( int lockState ) { if ( _client->_ai.isAuthorizedForLock( _db->name , lockState ) ) return; // before we assert, do a little cleanup _client->_context = _oldContext; // note: _oldContext may be null - + stringstream ss; ss << "unauthorized db:" << _db->name << " lock type:" << lockState << " client:" << _client->clientAddress(); uasserted( 10057 , ss.str() ); @@ -236,9 +208,35 @@ namespace mongo { _client->_context = _oldContext; // note: _oldContext may be null } - string Client::clientAddress() const { + bool Client::Context::inDB( const string& db , const string& path ) const { + if ( _path != path ) + return false; + + if ( db == _ns ) + return true; + + string::size_type idx = _ns.find( db ); + if ( idx != 0 ) + return false; + + return _ns[db.size()] == '.'; + } + + void Client::appendLastOp( BSONObjBuilder& b ) const { + if( theReplSet ) { + b.append("lastOp" , (long long) _lastOp); + } + else { + OpTime lo(_lastOp); + if ( ! lo.isNull() ) + b.appendTimestamp( "lastOp" , lo.asDate() ); + } + } + + + string Client::clientAddress(bool includePort) const { if( _curOp ) - return _curOp->getRemoteString(false); + return _curOp->getRemoteString(includePort); return ""; } @@ -249,63 +247,75 @@ namespace mongo { return ss.str(); } - string sayClientState(){ + string sayClientState() { Client* c = currentClient.get(); if ( !c ) return "no client"; return c->toString(); } - - void curopWaitingForLock( int type ){ + + Client* curopWaitingForLock( int type ) { Client * c = currentClient.get(); assert( c ); CurOp * co = c->curop(); - if ( co ){ + if ( co ) { co->waitingForLock( type ); } + return c; } - void curopGotLock(){ - Client * c = currentClient.get(); + void curopGotLock(Client *c) { assert(c); CurOp * co = c->curop(); - if ( co ){ + if ( co ) co->gotLock(); - } } - CurOp::~CurOp(){ - if ( _wrapped ){ - scoped_lock bl(Client::clientsMutex); - _client->_curOp = _wrapped; + void KillCurrentOp::interruptJs( AtomicUInt *op ) { + if ( !globalScriptEngine ) + return; + if ( !op ) { + globalScriptEngine->interruptAll(); } - - _client = 0; + else { + globalScriptEngine->interrupt( *op ); + } + } + + void KillCurrentOp::killAll() { + _globalKill = true; + interruptJs( 0 ); } - BSONObj CurOp::query( bool threadSafe ) { - if( querySize() == 1 ) { - return _tooBig; + void KillCurrentOp::kill(AtomicUInt i) { + bool found = false; + { + scoped_lock l( Client::clientsMutex ); + for( set< Client* >::const_iterator j = Client::clients.begin(); !found && j != Client::clients.end(); ++j ) { + for( CurOp *k = ( *j )->curop(); !found && k; k = k->parent() ) { + if ( k->opNum() == i ) { + k->kill(); + for( CurOp *l = ( *j )->curop(); l != k; l = l->parent() ) { + l->kill(); + } + found = true; + } + } + } } - - if ( ! threadSafe ){ - BSONObj o(_queryBuf); - return o; + if ( found ) { + interruptJs( &i ); } - - int size = querySize(); - int before = checksum( _queryBuf , size ); - BSONObj a(_queryBuf); - BSONObj b = a.copy(); - int after = checksum( _queryBuf , size ); - - if ( before == after ) - return b; - - return BSON( "msg" << "query changed while capturing" ); } + CurOp::~CurOp() { + if ( _wrapped ) { + scoped_lock bl(Client::clientsMutex); + _client->_curOp = _wrapped; + } + _client = 0; + } - BSONObj CurOp::infoNoauth( int attempt ) { + BSONObj CurOp::infoNoauth() { BSONObjBuilder b; b.append("opid", _opNum); bool a = _active && _start; @@ -313,40 +323,16 @@ namespace mongo { if ( _lockType ) b.append("lockType" , _lockType > 0 ? "write" : "read" ); b.append("waitingForLock" , _waitingForLock ); - - if( a ){ + + if( a ) { b.append("secs_running", elapsedSeconds() ); } - + b.append( "op" , opToString( _op ) ); - + b.append("ns", _ns); - - { - int size = querySize(); - if ( size == 0 ){ - // do nothing - } - else if ( size == 1 ){ - b.append( "query" , _tooBig ); - } - else if ( attempt > 2 ){ - b.append( "query" , BSON( "err" << "can't get a clean object" ) ); - log( LL_WARNING ) << "CurOp changing too much to get reading" << endl; - - } - else { - int before = checksum( _queryBuf , size ); - b.appendObject( "query" , _queryBuf , size ); - int after = checksum( _queryBuf , size ); - - if ( after != before ){ - // this means something changed - // going to retry - return infoNoauth( attempt + 1 ); - } - } - } + + _query.append( b , "query" ); // b.append("inLock", ?? stringstream clientStr; @@ -355,9 +341,9 @@ namespace mongo { if ( _client ) b.append( "desc" , _client->desc() ); - - if ( ! _message.empty() ){ - if ( _progressMeter.isActive() ){ + + if ( ! _message.empty() ) { + if ( _progressMeter.isActive() ) { StringBuilder buf(128); buf << _message.toString() << " " << _progressMeter.toString(); b.append( "msg" , buf.str() ); @@ -370,7 +356,7 @@ namespace mongo { return b.obj(); } - void Client::gotHandshake( const BSONObj& o ){ + void Client::gotHandshake( const BSONObj& o ) { BSONObjIterator i(o); { @@ -378,7 +364,7 @@ namespace mongo { assert( id.type() ); _remoteId = id.wrap( "_id" ); } - + BSONObjBuilder b; while ( i.more() ) b.append( i.next() ); @@ -388,31 +374,31 @@ namespace mongo { class HandshakeCmd : public Command { public: void help(stringstream& h) const { h << "internal"; } - HandshakeCmd() : Command( "handshake" ){} - virtual LockType locktype() const { return NONE; } + HandshakeCmd() : Command( "handshake" ) {} + virtual LockType locktype() const { return NONE; } virtual bool slaveOk() const { return true; } virtual bool adminOnly() const { return false; } virtual bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) { Client& c = cc(); c.gotHandshake( cmdObj ); return 1; - } + } } handshakeCmd; class ClientListPlugin : public WebStatusPlugin { public: - ClientListPlugin() : WebStatusPlugin( "clients" , 20 ){} - virtual void init(){} - - virtual void run( stringstream& ss ){ + ClientListPlugin() : WebStatusPlugin( "clients" , 20 ) {} + virtual void init() {} + + virtual void run( stringstream& ss ) { using namespace mongoutils::html; ss << "\n<table border=1 cellpadding=2 cellspacing=0>"; ss << "<tr align='left'>" << th( a("", "Connections to the database, both internal and external.", "Client") ) << th( a("http://www.mongodb.org/display/DOCS/Viewing+and+Terminating+Current+Operation", "", "OpId") ) - << "<th>Active</th>" + << "<th>Active</th>" << "<th>LockType</th>" << "<th>Waiting</th>" << "<th>SecsRunning</th>" @@ -426,11 +412,11 @@ namespace mongo { << "</tr>\n"; { scoped_lock bl(Client::clientsMutex); - for( set<Client*>::iterator i = Client::clients.begin(); i != Client::clients.end(); i++ ) { + for( set<Client*>::iterator i = Client::clients.begin(); i != Client::clients.end(); i++ ) { Client *c = *i; CurOp& co = *(c->curop()); ss << "<tr><td>" << c->desc() << "</td>"; - + tablecell( ss , co.opNum() ); tablecell( ss , co.active() ); { @@ -447,8 +433,9 @@ namespace mongo { tablecell( ss , "" ); tablecell( ss , co.getOp() ); tablecell( ss , co.getNS() ); - if ( co.haveQuery() ) - tablecell( ss , co.query( true ) ); + if ( co.haveQuery() ) { + tablecell( ss , co.query() ); + } else tablecell( ss , "" ); tablecell( ss , co.getRemoteString() ); @@ -463,18 +450,18 @@ namespace mongo { ss << "</table>\n"; } - + } clientListPlugin; - int Client::recommendedYieldMicros( int * writers , int * readers ){ + int Client::recommendedYieldMicros( int * writers , int * readers ) { int num = 0; int w = 0; int r = 0; { scoped_lock bl(clientsMutex); - for ( set<Client*>::iterator i=clients.begin(); i!=clients.end(); ++i ){ + for ( set<Client*>::iterator i=clients.begin(); i!=clients.end(); ++i ) { Client* c = *i; - if ( c->curop()->isWaitingForLock() ){ + if ( c->curop()->isWaitingForLock() ) { num++; if ( c->curop()->getLockType() > 0 ) w++; @@ -483,15 +470,44 @@ namespace mongo { } } } - + if ( writers ) *writers = w; if ( readers ) *readers = r; - if ( num > 50 ) - num = 50; + int time = r * 100; + time += w * 500; + + time = min( time , 1000000 ); + + // there has been a kill request for this op - we should yield to allow the op to stop + // This function returns empty string if we aren't interrupted + if ( killCurrentOp.checkForInterruptNoAssert( false )[0] != '\0' ) { + return 100; + } + + return time; + } + + int Client::getActiveClientCount( int& writers, int& readers ) { + writers = 0; + readers = 0; + + scoped_lock bl(clientsMutex); + for ( set<Client*>::iterator i=clients.begin(); i!=clients.end(); ++i ) { + Client* c = *i; + if ( ! c->curop()->active() ) + continue; + + int l = c->curop()->getLockType(); + if ( l > 0 ) + writers++; + else if ( l < 0 ) + readers++; + + } - return num * 100; + return writers + readers; } } |