summary refs log tree commit diff
path: root/s/request.cpp
diff options
context:
space:
mode:
Diffstat (limited to 's/request.cpp')
-rw-r--r-- s/request.cpp | 133
1 files changed, 106 insertions, 27 deletions
diff --git a/s/request.cpp b/s/request.cpp
index 02ada3c..e09c3bc 100644
--- a/s/request.cpp
+++ b/s/request.cpp
@@ -19,32 +19,48 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#include "stdafx.h"
+#include "pch.h"
#include "server.h"
+
#include "../db/commands.h"
#include "../db/dbmessage.h"
+#include "../db/stats/counters.h"
+
#include "../client/connpool.h"
#include "request.h"
#include "config.h"
#include "chunk.h"
+#include "stats.h"
+#include "cursors.h"
+#include "grid.h"
namespace mongo {
Request::Request( Message& m, AbstractMessagingPort* p ) :
- _m(m) , _d( m ) , _p(p){
+ _m(m) , _d( m ) , _p(p) , _didInit(false){
assert( _d.getns() );
- _id = _m.data->id;
+ _id = _m.header()->id;
- _clientId = p ? p->remotePort() << 16 : 0;
+ _clientId = p ? p->getClientId() : 0;
_clientInfo = ClientInfo::get( _clientId );
- _clientInfo->newRequest();
+ _clientInfo->newRequest( p );
+ }
+
+ void Request::init(){
+ if ( _didInit )
+ return;
+ _didInit = true;
reset();
}
-
+
void Request::reset( bool reload ){
+ if ( _m.operation() == dbKillCursors ){
+ return;
+ }
+
_config = grid.getDBConfig( getns() );
if ( reload )
uassert( 10192 , "db config reload failed!" , _config->reload() );
@@ -54,49 +70,61 @@ namespace mongo {
uassert( 10193 , (string)"no shard info for: " + getns() , _chunkManager );
}
else {
- _chunkManager = 0;
+ _chunkManager.reset();
}
- _m.data->id = _id;
+ _m.header()->id = _id;
}
- string Request::singleServerName(){
+ Shard Request::primaryShard() const {
+ assert( _didInit );
+
if ( _chunkManager ){
if ( _chunkManager->numChunks() > 1 )
- throw UserException( 8060 , "can't call singleServerName on a sharded collection" );
- return _chunkManager->findChunk( _chunkManager->getShardKey().globalMin() ).getShard();
+ throw UserException( 8060 , "can't call primaryShard on a sharded collection" );
+ return _chunkManager->findChunk( _chunkManager->getShardKey().globalMin() )->getShard();
}
- string s = _config->getShard( getns() );
- uassert( 10194 , "can't call singleServerName on a sharded collection!" , s.size() > 0 );
+ Shard s = _config->getShard( getns() );
+ uassert( 10194 , "can't call primaryShard on a sharded collection!" , s.ok() );
return s;
}
void Request::process( int attempt ){
-
- log(3) << "Request::process ns: " << getns() << " msg id:" << (int)(_m.data->id) << " attempt: " << attempt << endl;
-
- int op = _m.data->operation();
+ init();
+ int op = _m.operation();
assert( op > dbMsg );
+ if ( op == dbKillCursors ){
+ cursorCache.gotKillCursors( _m );
+ return;
+ }
+
+
+ log(3) << "Request::process ns: " << getns() << " msg id:" << (int)(_m.header()->id) << " attempt: " << attempt << endl;
+
Strategy * s = SINGLE;
+ _counter = &opsNonSharded;
_d.markSet();
-
+
if ( _chunkManager ){
s = SHARDED;
+ _counter = &opsSharded;
}
+ bool iscmd = false;
if ( op == dbQuery ) {
+ iscmd = isCommand();
try {
s->queryOp( *this );
}
catch ( StaleConfigException& staleConfig ){
log() << staleConfig.what() << " attempt: " << attempt << endl;
uassert( 10195 , "too many attempts to update config, failing" , attempt < 5 );
-
+ ShardConnection::checkMyConnectionVersions( getns() );
sleepsecs( attempt );
- reset( true );
+ reset( ! staleConfig.justConnection() );
_d.markReset();
process( attempt + 1 );
return;
@@ -108,8 +136,29 @@ namespace mongo {
else {
s->writeOp( op, *this );
}
+
+ globalOpCounters.gotOp( op , iscmd );
+ _counter->gotOp( op , iscmd );
}
+ bool Request::isCommand() const {
+ int x = _d.getQueryNToReturn();
+ return ( x == 1 || x == -1 ) && strstr( getns() , ".$cmd" );
+ }
+
+ void Request::gotInsert(){
+ globalOpCounters.gotInsert();
+ _counter->gotInsert();
+ }
+
+ void Request::reply( Message & response , const string& fromServer ){
+ assert( _didInit );
+ long long cursor =response.header()->getCursor();
+ if ( cursor ){
+ cursorCache.storeRef( fromServer , cursor );
+ }
+ _p->reply( _m , response , _id );
+ }
ClientInfo::ClientInfo( int clientId ) : _id( clientId ){
_cur = &_a;
@@ -118,18 +167,33 @@ namespace mongo {
}
ClientInfo::~ClientInfo(){
- scoped_lock lk( _clientsLock );
- ClientCache::iterator i = _clients.find( _id );
- if ( i != _clients.end() ){
- _clients.erase( i );
+ if ( _lastAccess ){
+ scoped_lock lk( _clientsLock );
+ ClientCache::iterator i = _clients.find( _id );
+ if ( i != _clients.end() ){
+ _clients.erase( i );
+ }
}
}
void ClientInfo::addShard( const string& shard ){
_cur->insert( shard );
+ _sinceLastGetError.insert( shard );
}
- void ClientInfo::newRequest(){
+ void ClientInfo::newRequest( AbstractMessagingPort* p ){
+
+ if ( p ){
+ string r = p->remote().toString();
+ if ( _remote == "" )
+ _remote = r;
+ else if ( _remote != r ){
+ stringstream ss;
+ ss << "remotes don't match old [" << _remote << "] new [" << r << "]";
+ throw UserException( 13134 , ss.str() );
+ }
+ }
+
_lastAccess = (int) time(0);
set<string> * temp = _cur;
@@ -168,8 +232,23 @@ namespace mongo {
return info;
}
- map<int,ClientInfo*> ClientInfo::_clients;
- mongo::mutex ClientInfo::_clientsLock;
+ void ClientInfo::disconnect( int clientId ){
+ if ( ! clientId )
+ return;
+
+ scoped_lock lk( _clientsLock );
+ ClientCache::iterator i = _clients.find( clientId );
+ if ( i == _clients.end() )
+ return;
+
+ ClientInfo* ci = i->second;
+ ci->disconnect();
+ delete ci;
+ _clients.erase( i );
+ }
+
+ ClientCache& ClientInfo::_clients = *(new ClientCache());
+ mongo::mutex ClientInfo::_clientsLock("_clientsLock");
boost::thread_specific_ptr<ClientInfo> ClientInfo::_tlInfo;
} // namespace mongo