diff options
author | Antonin Kral <a.kral@bobek.cz> | 2010-08-11 12:38:57 +0200 |
---|---|---|
committer | Antonin Kral <a.kral@bobek.cz> | 2010-08-11 12:38:57 +0200 |
commit | 7645618fd3914cb8a20561625913c20d49504a49 (patch) | |
tree | 8370f846f58f6d71165b7a0e2eda04648584ec76 /db/client.cpp | |
parent | 68c73c3c7608b4c87f07440dc3232801720b1168 (diff) | |
download | mongodb-7645618fd3914cb8a20561625913c20d49504a49.tar.gz |
Imported Upstream version 1.6.0
Diffstat (limited to 'db/client.cpp')
-rw-r--r-- | db/client.cpp | 258 |
1 files changed, 222 insertions, 36 deletions
diff --git a/db/client.cpp b/db/client.cpp index a2fe568..65c467a 100644 --- a/db/client.cpp +++ b/db/client.cpp @@ -20,16 +20,22 @@ to an open socket (or logical connection if pooling on sockets) from a client. */ -#include "stdafx.h" +#include "pch.h" #include "db.h" #include "client.h" #include "curop.h" #include "json.h" #include "security.h" +#include "commands.h" +#include "instance.h" +#include "../s/d_logic.h" +#include "dbwebserver.h" +#include "../util/mongoutils/html.h" +#include "../util/mongoutils/checksum.h" namespace mongo { - mongo::mutex Client::clientsMutex; + mongo::mutex Client::clientsMutex("clientsMutex"); set<Client*> Client::clients; // always be in clientsMutex when manipulating this boost::thread_specific_ptr<Client> currentClient; @@ -37,7 +43,8 @@ namespace mongo { _context(0), _shutdown(false), _desc(desc), - _god(0) + _god(0), + _lastOp(0) { _curOp = new CurOp( this ); scoped_lock bl(clientsMutex); @@ -54,6 +61,56 @@ namespace mongo { cout << "ERROR: Client::shutdown not called: " << _desc << endl; } + void Client::_dropns( const string& ns ){ + Top::global.collectionDropped( ns ); + + dblock l; + Client::Context ctx( ns ); + if ( ! nsdetails( ns.c_str() ) ) + return; + + try { + string err; + BSONObjBuilder b; + dropCollection( ns , err , b ); + } + catch ( ... ){ + log() << "error dropping temp collection: " << ns << endl; + } + + } + + void Client::_invalidateDB( const string& db ) { + assert( db.find( '.' ) == string::npos ); + + set<string>::iterator min = _tempCollections.lower_bound( db + "." ); + set<string>::iterator max = _tempCollections.lower_bound( db + "|" ); + + _tempCollections.erase( min , max ); + + } + + void Client::invalidateDB(const string& db) { + scoped_lock bl(clientsMutex); + for ( set<Client*>::iterator i = clients.begin(); i!=clients.end(); i++ ){ + Client* cli = *i; + cli->_invalidateDB(db); + } + } + + void Client::invalidateNS( const string& ns ){ + scoped_lock bl(clientsMutex); + for ( set<Client*>::iterator i = clients.begin(); i!=clients.end(); i++ ){ + Client* cli = *i; + cli->_tempCollections.erase( ns ); + } + } + + + void Client::addTempCollection( const string& ns ) { + _tempCollections.insert( ns ); + } + bool Client::shutdown(){ _shutdown = true; if ( inShutdown() ) @@ -67,22 +124,8 @@ namespace mongo { if ( _tempCollections.size() ){ didAnything = true; - for ( list<string>::iterator i = _tempCollections.begin(); i!=_tempCollections.end(); i++ ){ - string ns = *i; - Top::global.collectionDropped( ns ); - - dblock l; - Client::Context ctx( ns ); - if ( ! nsdetails( ns.c_str() ) ) - continue; - try { - string err; - BSONObjBuilder b; - dropCollection( ns , err , b ); - } - catch ( ... ){ - log() << "error dropping temp collection: " << ns << endl; - } + for ( set<string>::iterator i = _tempCollections.begin(); i!=_tempCollections.end(); i++ ){ + _dropns( *i ); } _tempCollections.clear(); } @@ -152,8 +195,15 @@ namespace mongo { _client->_curOp->enter( this ); if ( doauth ) _auth( lockState ); - } + if ( _client->_curOp->getOp() != dbGetMore ){ // getMore's are special and should be handled else where + string errmsg; + if ( ! shardVersionOk( _ns , errmsg ) ){ + msgasserted( StaleConfigInContextCode , (string)"[" + _ns + "] shard version not ok in Client::Context: " + errmsg ); + } + } + } + void Client::Context::_auth( int lockState ){ if ( _client->_ai.isAuthorizedForLock( _db->name , lockState ) ) return; @@ -162,8 +212,8 @@ namespace mongo { _client->_context = _oldContext; // note: _oldContext may be null stringstream ss; - ss << "unauthorized for db [" << _db->name << "] lock type: " << lockState << endl; - massert( 10057 , ss.str() , 0 ); + ss << "unauthorized db:" << _db->name << " lock type:" << lockState << " client:" << _client->clientAddress(); + uasserted( 10057 , ss.str() ); } Client::Context::~Context() { @@ -172,6 +222,12 @@ namespace mongo { _client->_context = _oldContext; // note: _oldContext may be null } + string Client::clientAddress() const { + if( _curOp ) + return _curOp->getRemoteString(false); + return ""; + } + string Client::toString() const { stringstream ss; if ( _curOp ) @@ -203,7 +259,7 @@ namespace mongo { } } - BSONObj CurOp::infoNoauth() { + BSONObj CurOp::infoNoauth( int attempt ) { BSONObjBuilder b; b.append("opid", _opNum); bool a = _active && _start; @@ -220,12 +276,35 @@ namespace mongo { b.append("ns", _ns); - if( haveQuery() ) { - b.append("query", query()); + { + int size = querySize(); + if ( size == 0 ){ + // do nothing + } + else if ( size == 1 ){ + b.append( "query" , _tooBig ); + } + else if ( attempt > 2 ){ + b.append( "query" , BSON( "err" << "can't get a clean object" ) ); + log( LL_WARNING ) << "CurOp changing too much to get reading" << endl; + + } + else { + int before = checksum( _queryBuf , size ); + b.appendObject( "query" , _queryBuf , size ); + int after = checksum( _queryBuf , size ); + + if ( after != before ){ + // this means something changed + // going to retry + return infoNoauth( attempt + 1 ); + } + } } + // b.append("inLock", ?? stringstream clientStr; - clientStr << inet_ntoa( _remote.sin_addr ) << ":" << ntohs( _remote.sin_port ); + clientStr << _remote.toString(); b.append("client", clientStr.str()); if ( _client ) @@ -234,32 +313,139 @@ namespace mongo { if ( ! _message.empty() ){ if ( _progressMeter.isActive() ){ StringBuilder buf(128); - buf << _message << " " << _progressMeter.toString(); + buf << _message.toString() << " " << _progressMeter.toString(); b.append( "msg" , buf.str() ); } else { - b.append( "msg" , _message ); + b.append( "msg" , _message.toString() ); } } return b.obj(); } - int Client::recommendedYieldMicros(){ + void Client::gotHandshake( const BSONObj& o ){ + BSONObjIterator i(o); + + { + BSONElement id = i.next(); + assert( id.type() ); + _remoteId = id.wrap( "_id" ); + } + + BSONObjBuilder b; + while ( i.more() ) + b.append( i.next() ); + _handshake = b.obj(); + } + + class HandshakeCmd : public Command { + public: + void help(stringstream& h) const { h << "internal"; } + HandshakeCmd() : Command( "handshake" ){} + virtual LockType locktype() const { return NONE; } + virtual bool slaveOk() const { return true; } + virtual bool adminOnly() const { return false; } + virtual bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) { + Client& c = cc(); + c.gotHandshake( cmdObj ); + return 1; + } + + } handshakeCmd; + + class ClientListPlugin : public WebStatusPlugin { + public: + ClientListPlugin() : WebStatusPlugin( "clients" , 20 ){} + virtual void init(){} + + virtual void run( stringstream& ss ){ + using namespace mongoutils::html; + + ss << "\n<table border=1 cellpadding=2 cellspacing=0>"; + ss << "<tr align='left'>" + << th( a("", "Connections to the database, both internal and external.", "Client") ) + << th( a("http://www.mongodb.org/display/DOCS/Viewing+and+Terminating+Current+Operation", "", "OpId") ) + << "<th>Active</th>" + << "<th>LockType</th>" + << "<th>Waiting</th>" + << "<th>SecsRunning</th>" + << "<th>Op</th>" + << th( a("http://www.mongodb.org/display/DOCS/Developer+FAQ#DeveloperFAQ-What%27sa%22namespace%22%3F", "", "Namespace") ) + << "<th>Query</th>" + << "<th>client</th>" + << "<th>msg</th>" + << "<th>progress</th>" + + << "</tr>\n"; + { + scoped_lock bl(Client::clientsMutex); + for( set<Client*>::iterator i = Client::clients.begin(); i != Client::clients.end(); i++ ) { + Client *c = *i; + CurOp& co = *(c->curop()); + ss << "<tr><td>" << c->desc() << "</td>"; + + tablecell( ss , co.opNum() ); + tablecell( ss , co.active() ); + { + int lt = co.getLockType(); + if( lt == -1 ) tablecell(ss, "R"); + else if( lt == 1 ) tablecell(ss, "W"); + else + tablecell( ss , lt); + } + tablecell( ss , co.isWaitingForLock() ); + if ( co.active() ) + tablecell( ss , co.elapsedSeconds() ); + else + tablecell( ss , "" ); + tablecell( ss , co.getOp() ); + tablecell( ss , co.getNS() ); + if ( co.haveQuery() ) + tablecell( ss , co.query() ); + else + tablecell( ss , "" ); + tablecell( ss , co.getRemoteString() ); + + tablecell( ss , co.getMessage() ); + tablecell( ss , co.getProgressMeter().toString() ); + + + ss << "</tr>\n"; + } + } + ss << "</table>\n"; + + } + + } clientListPlugin; + + int Client::recommendedYieldMicros( int * writers , int * readers ){ int num = 0; + int w = 0; + int r = 0; { scoped_lock bl(clientsMutex); - num = clients.size(); + for ( set<Client*>::iterator i=clients.begin(); i!=clients.end(); ++i ){ + Client* c = *i; + if ( c->curop()->isWaitingForLock() ){ + num++; + if ( c->curop()->getLockType() > 0 ) + w++; + else + r++; + } + } } - if ( --num <= 0 ) // -- is for myself - return 0; - + if ( writers ) + *writers = w; + if ( readers ) + *readers = r; + if ( num > 50 ) num = 50; - num *= 100; - return num; + return num * 100; } - } |