diff options
Diffstat (limited to 's/shardconnection.cpp')
-rw-r--r-- | s/shardconnection.cpp | 173 |
1 files changed, 85 insertions, 88 deletions
diff --git a/s/shardconnection.cpp b/s/shardconnection.cpp index 694693b..d05f5b1 100644 --- a/s/shardconnection.cpp +++ b/s/shardconnection.cpp @@ -23,7 +23,24 @@ #include <set> namespace mongo { - + + // The code in shardconnection may run not only in mongos context. When elsewhere, chunk shard versioning + // is disabled. To enable chunk shard versioning, provide the check/resetShardVerionCB's below + // + // TODO: better encapsulate this mechanism. + + bool defaultCheckShardVersion( DBClientBase & conn , const string& ns , bool authoritative , int tryNumber ) { + // no-op in mongod + return false; + } + + void defaultResetShardVersion( DBClientBase * conn ) { + // no-op in mongod + } + + boost::function4<bool, DBClientBase&, const string&, bool, int> checkShardVersionCB = defaultCheckShardVersion; + boost::function1<void, DBClientBase*> resetShardVersionCB = defaultResetShardVersion; + /** * holds all the actual db connections for a client to various servers * 1 pre thread, so don't have to worry about thread safety @@ -31,39 +48,22 @@ namespace mongo { class ClientConnections : boost::noncopyable { public: struct Status : boost::noncopyable { - Status() : created(0), avail(0){} + Status() : created(0), avail(0) {} - long long created; + long long created; DBClientBase* avail; }; - Nullstream& debug( Status * s = 0 , const string& addr = "" ){ - static int ll = 9; + ClientConnections() {} - if ( logLevel < ll ) - return nullstream; - Nullstream& l = log(ll); - - l << "ClientConnections DEBUG " << this << " "; - if ( s ){ - l << "s: " << s << " addr: " << addr << " "; - } - return l; - } - - ClientConnections() : _mutex("ClientConnections") { - debug() << " NEW " << endl; - } - - ~ClientConnections(){ - debug() << " KILLING " << endl; - for ( map<string,Status*>::iterator i=_hosts.begin(); i!=_hosts.end(); ++i ){ + ~ClientConnections() { + for ( HostMap::iterator i=_hosts.begin(); i!=_hosts.end(); ++i ) { string addr = i->first; Status* ss = i->second; assert( ss ); - if ( ss->avail ){ - /* if we're shutting down, don't want to initiate release mechanism as it is slow, + if ( ss->avail ) { + /* if we're shutting down, don't want to initiate release mechanism as it is slow, and isn't needed since all connections will be closed anyway */ if ( inShutdown() ) delete ss->avail; @@ -75,49 +75,41 @@ namespace mongo { } _hosts.clear(); } - - DBClientBase * get( const string& addr , const string& ns ){ + + DBClientBase * get( const string& addr , const string& ns ) { _check( ns ); - scoped_lock lk( _mutex ); + Status* &s = _hosts[addr]; if ( ! s ) s = new Status(); - - debug( s , addr ) << "WANT ONE pool avail: " << s->avail << endl; - - if ( s->avail ){ + + if ( s->avail ) { DBClientBase* c = s->avail; s->avail = 0; - debug( s , addr ) << "GOT " << c << endl; pool.onHandedOut( c ); return c; } - debug() << "CREATING NEW CONNECTION" << endl; s->created++; return pool.get( addr ); } - - void done( const string& addr , DBClientBase* conn ){ - scoped_lock lk( _mutex ); + + void done( const string& addr , DBClientBase* conn ) { Status* s = _hosts[addr]; assert( s ); - if ( s->avail ){ - debug( s , addr ) << "DONE WITH TEMP" << endl; + if ( s->avail ) { release( addr , conn ); return; } s->avail = conn; - debug( s , addr ) << "PUSHING: " << conn << endl; } - - void sync(){ - scoped_lock lk( _mutex ); - for ( map<string,Status*>::iterator i=_hosts.begin(); i!=_hosts.end(); ++i ){ + + void sync() { + for ( HostMap::iterator i=_hosts.begin(); i!=_hosts.end(); ++i ) { string addr = i->first; Status* ss = i->second; - if ( ss->avail ){ + if ( ss->avail ) { ss->avail->getLastError(); release( addr , ss->avail ); ss->avail = 0; @@ -127,63 +119,67 @@ namespace mongo { _hosts.clear(); } - void checkVersions( const string& ns ){ + void checkVersions( const string& ns ) { vector<Shard> all; Shard::getAllShards( all ); - scoped_lock lk( _mutex ); - for ( unsigned i=0; i<all.size(); i++ ){ + for ( unsigned i=0; i<all.size(); i++ ) { Status* &s = _hosts[all[i].getConnString()]; if ( ! s ) s = new Status(); } - for ( map<string,Status*>::iterator i=_hosts.begin(); i!=_hosts.end(); ++i ){ - if ( ! Shard::isAShard( i->first ) ) + for ( HostMap::iterator i=_hosts.begin(); i!=_hosts.end(); ++i ) { + if ( ! Shard::isAShardNode( i->first ) ) continue; Status* ss = i->second; assert( ss ); if ( ! ss->avail ) ss->avail = pool.get( i->first ); - checkShardVersion( *ss->avail , ns ); + checkShardVersionCB( *ss->avail , ns , false , 1 ); } } - void release( const string& addr , DBClientBase * conn ){ - resetShardVersion( conn ); + void release( const string& addr , DBClientBase * conn ) { + resetShardVersionCB( conn ); BSONObj res; - + try { - if ( conn->simpleCommand( "admin" , &res , "unsetSharding" ) ){ + if ( conn->simpleCommand( "admin" , &res , "unsetSharding" ) ) { pool.release( addr , conn ); } else { - log(LL_ERROR) << " couldn't unset sharding :( " << res << endl; + error() << "unset sharding failed : " << res << endl; delete conn; } } - catch ( std::exception& e ){ - log(LL_ERROR) << "couldn't unsert sharding : " << e.what() << endl; + catch ( SocketException& e ) { + // server down or something + LOG(1) << "socket exception trying to unset sharding: " << e.toString() << endl; + delete conn; + } + catch ( std::exception& e ) { + error() << "couldn't unset sharding : " << e.what() << endl; delete conn; } } - - void _check( const string& ns ){ + + void _check( const string& ns ) { if ( ns.size() == 0 || _seenNS.count( ns ) ) return; _seenNS.insert( ns ); checkVersions( ns ); } - - map<string,Status*> _hosts; - mongo::mutex _mutex; + + typedef map<string,Status*,DBConnectionPool::serverNameCompare> HostMap; + HostMap _hosts; set<string> _seenNS; // ----- - + static thread_specific_ptr<ClientConnections> _perThread; - static ClientConnections* get(){ + static ClientConnections* threadInstance() { ClientConnections* cc = _perThread.get(); - if ( ! cc ){ + if ( ! cc ) { cc = new ClientConnections(); _perThread.reset( cc ); } @@ -202,57 +198,58 @@ namespace mongo { : _addr( s.getConnString() ) , _ns( ns ) { _init(); } - + ShardConnection::ShardConnection( const string& addr , const string& ns ) : _addr( addr ) , _ns( ns ) { _init(); } - - void ShardConnection::_init(){ + + void ShardConnection::_init() { assert( _addr.size() ); - _conn = ClientConnections::get()->get( _addr , _ns ); + _conn = ClientConnections::threadInstance()->get( _addr , _ns ); _finishedInit = false; } - void ShardConnection::_finishInit(){ + void ShardConnection::_finishInit() { if ( _finishedInit ) return; _finishedInit = true; - - if ( _ns.size() ){ - _setVersion = checkShardVersion( *_conn , _ns ); + + if ( _ns.size() ) { + _setVersion = checkShardVersionCB( *_conn , _ns , false , 1 ); } else { _setVersion = false; } - + } - void ShardConnection::done(){ - if ( _conn ){ - ClientConnections::get()->done( _addr , _conn ); + void ShardConnection::done() { + if ( _conn ) { + ClientConnections::threadInstance()->done( _addr , _conn ); _conn = 0; _finishedInit = true; } } - void ShardConnection::kill(){ - if ( _conn ){ + void ShardConnection::kill() { + if ( _conn ) { + resetShardVersionCB( _conn ); delete _conn; _conn = 0; _finishedInit = true; } } - void ShardConnection::sync(){ - ClientConnections::get()->sync(); + void ShardConnection::sync() { + ClientConnections::threadInstance()->sync(); } - bool ShardConnection::runCommand( const string& db , const BSONObj& cmd , BSONObj& res ){ + bool ShardConnection::runCommand( const string& db , const BSONObj& cmd , BSONObj& res ) { assert( _conn ); bool ok = _conn->runCommand( db , cmd , res ); - if ( ! ok ){ - if ( res["code"].numberInt() == StaleConfigInContextCode ){ + if ( ! ok ) { + if ( res["code"].numberInt() == StaleConfigInContextCode ) { string big = res["errmsg"].String(); string ns,raw; massert( 13409 , (string)"can't parse ns from: " + big , StaleConfigException::parse( big , ns , raw ) ); @@ -263,12 +260,12 @@ namespace mongo { return ok; } - void ShardConnection::checkMyConnectionVersions( const string & ns ){ - ClientConnections::get()->checkVersions( ns ); + void ShardConnection::checkMyConnectionVersions( const string & ns ) { + ClientConnections::threadInstance()->checkVersions( ns ); } ShardConnection::~ShardConnection() { - if ( _conn ){ + if ( _conn ) { if ( ! _conn->isFailed() ) { /* see done() comments above for why we log this line */ log() << "~ScopedDBConnection: _conn != null" << endl; |