diff options
Diffstat (limited to 's/request.cpp')
-rw-r--r-- | s/request.cpp | 133 |
1 files changed, 106 insertions, 27 deletions
diff --git a/s/request.cpp b/s/request.cpp index 02ada3c..e09c3bc 100644 --- a/s/request.cpp +++ b/s/request.cpp @@ -19,32 +19,48 @@ * along with this program. If not, see <http://www.gnu.org/licenses/>. */ -#include "stdafx.h" +#include "pch.h" #include "server.h" + #include "../db/commands.h" #include "../db/dbmessage.h" +#include "../db/stats/counters.h" + #include "../client/connpool.h" #include "request.h" #include "config.h" #include "chunk.h" +#include "stats.h" +#include "cursors.h" +#include "grid.h" namespace mongo { Request::Request( Message& m, AbstractMessagingPort* p ) : - _m(m) , _d( m ) , _p(p){ + _m(m) , _d( m ) , _p(p) , _didInit(false){ assert( _d.getns() ); - _id = _m.data->id; + _id = _m.header()->id; - _clientId = p ? p->remotePort() << 16 : 0; + _clientId = p ? p->getClientId() : 0; _clientInfo = ClientInfo::get( _clientId ); - _clientInfo->newRequest(); + _clientInfo->newRequest( p ); + } + + void Request::init(){ + if ( _didInit ) + return; + _didInit = true; reset(); } - + void Request::reset( bool reload ){ + if ( _m.operation() == dbKillCursors ){ + return; + } + _config = grid.getDBConfig( getns() ); if ( reload ) uassert( 10192 , "db config reload failed!" , _config->reload() ); @@ -54,49 +70,61 @@ namespace mongo { uassert( 10193 , (string)"no shard info for: " + getns() , _chunkManager ); } else { - _chunkManager = 0; + _chunkManager.reset(); } - _m.data->id = _id; + _m.header()->id = _id; } - string Request::singleServerName(){ + Shard Request::primaryShard() const { + assert( _didInit ); + if ( _chunkManager ){ if ( _chunkManager->numChunks() > 1 ) - throw UserException( 8060 , "can't call singleServerName on a sharded collection" ); - return _chunkManager->findChunk( _chunkManager->getShardKey().globalMin() ).getShard(); + throw UserException( 8060 , "can't call primaryShard on a sharded collection" ); + return _chunkManager->findChunk( _chunkManager->getShardKey().globalMin() )->getShard(); } - string s = _config->getShard( getns() ); - uassert( 10194 , "can't call singleServerName on a sharded collection!" , s.size() > 0 ); + Shard s = _config->getShard( getns() ); + uassert( 10194 , "can't call primaryShard on a sharded collection!" , s.ok() ); return s; } void Request::process( int attempt ){ - - log(3) << "Request::process ns: " << getns() << " msg id:" << (int)(_m.data->id) << " attempt: " << attempt << endl; - - int op = _m.data->operation(); + init(); + int op = _m.operation(); assert( op > dbMsg ); + if ( op == dbKillCursors ){ + cursorCache.gotKillCursors( _m ); + return; + } + + + log(3) << "Request::process ns: " << getns() << " msg id:" << (int)(_m.header()->id) << " attempt: " << attempt << endl; + Strategy * s = SINGLE; + _counter = &opsNonSharded; _d.markSet(); - + if ( _chunkManager ){ s = SHARDED; + _counter = &opsSharded; } + bool iscmd = false; if ( op == dbQuery ) { + iscmd = isCommand(); try { s->queryOp( *this ); } catch ( StaleConfigException& staleConfig ){ log() << staleConfig.what() << " attempt: " << attempt << endl; uassert( 10195 , "too many attempts to update config, failing" , attempt < 5 ); - + ShardConnection::checkMyConnectionVersions( getns() ); sleepsecs( attempt ); - reset( true ); + reset( ! staleConfig.justConnection() ); _d.markReset(); process( attempt + 1 ); return; @@ -108,8 +136,29 @@ namespace mongo { else { s->writeOp( op, *this ); } + + globalOpCounters.gotOp( op , iscmd ); + _counter->gotOp( op , iscmd ); } + bool Request::isCommand() const { + int x = _d.getQueryNToReturn(); + return ( x == 1 || x == -1 ) && strstr( getns() , ".$cmd" ); + } + + void Request::gotInsert(){ + globalOpCounters.gotInsert(); + _counter->gotInsert(); + } + + void Request::reply( Message & response , const string& fromServer ){ + assert( _didInit ); + long long cursor =response.header()->getCursor(); + if ( cursor ){ + cursorCache.storeRef( fromServer , cursor ); + } + _p->reply( _m , response , _id ); + } ClientInfo::ClientInfo( int clientId ) : _id( clientId ){ _cur = &_a; @@ -118,18 +167,33 @@ namespace mongo { } ClientInfo::~ClientInfo(){ - scoped_lock lk( _clientsLock ); - ClientCache::iterator i = _clients.find( _id ); - if ( i != _clients.end() ){ - _clients.erase( i ); + if ( _lastAccess ){ + scoped_lock lk( _clientsLock ); + ClientCache::iterator i = _clients.find( _id ); + if ( i != _clients.end() ){ + _clients.erase( i ); + } } } void ClientInfo::addShard( const string& shard ){ _cur->insert( shard ); + _sinceLastGetError.insert( shard ); } - void ClientInfo::newRequest(){ + void ClientInfo::newRequest( AbstractMessagingPort* p ){ + + if ( p ){ + string r = p->remote().toString(); + if ( _remote == "" ) + _remote = r; + else if ( _remote != r ){ + stringstream ss; + ss << "remotes don't match old [" << _remote << "] new [" << r << "]"; + throw UserException( 13134 , ss.str() ); + } + } + _lastAccess = (int) time(0); set<string> * temp = _cur; @@ -168,8 +232,23 @@ namespace mongo { return info; } - map<int,ClientInfo*> ClientInfo::_clients; - mongo::mutex ClientInfo::_clientsLock; + void ClientInfo::disconnect( int clientId ){ + if ( ! clientId ) + return; + + scoped_lock lk( _clientsLock ); + ClientCache::iterator i = _clients.find( clientId ); + if ( i == _clients.end() ) + return; + + ClientInfo* ci = i->second; + ci->disconnect(); + delete ci; + _clients.erase( i ); + } + + ClientCache& ClientInfo::_clients = *(new ClientCache()); + mongo::mutex ClientInfo::_clientsLock("_clientsLock"); boost::thread_specific_ptr<ClientInfo> ClientInfo::_tlInfo; } // namespace mongo |