diff options
Diffstat (limited to 'client/connpool.cpp')
-rw-r--r-- | client/connpool.cpp | 171 |
1 files changed, 119 insertions, 52 deletions
diff --git a/client/connpool.cpp b/client/connpool.cpp index 5a08483..dae13f6 100644 --- a/client/connpool.cpp +++ b/client/connpool.cpp @@ -18,76 +18,99 @@ // _ todo: reconnect? -#include "stdafx.h" +#include "pch.h" #include "connpool.h" #include "../db/commands.h" #include "syncclusterconnection.h" +#include "../s/shard.h" namespace mongo { DBConnectionPool pool; + DBClientBase* DBConnectionPool::_get(const string& ident) { + scoped_lock L(_mutex); + + PoolForHost& p = _pools[ident]; + if ( p.pool.empty() ) + return 0; + + DBClientBase *c = p.pool.top(); + p.pool.pop(); + return c; + } + + DBClientBase* DBConnectionPool::_finishCreate( const string& host , DBClientBase* conn ){ + { + scoped_lock L(_mutex); + PoolForHost& p = _pools[host]; + p.created++; + } + + onCreate( conn ); + onHandedOut( conn ); + + return conn; + } + + DBClientBase* DBConnectionPool::get(const ConnectionString& url) { + DBClientBase * c = _get( url.toString() ); + if ( c ){ + onHandedOut( c ); + return c; + } + + string errmsg; + c = url.connect( errmsg ); + uassert( 13328 , (string)"dbconnectionpool: connect failed " + url.toString() + " : " + errmsg , c ); + + return _finishCreate( url.toString() , c ); + } + DBClientBase* DBConnectionPool::get(const string& host) { - scoped_lock L(poolMutex); + DBClientBase * c = _get( host ); + if ( c ){ + onHandedOut( c ); + return c; + } - PoolForHost *&p = pools[host]; - if ( p == 0 ) - p = new PoolForHost(); - if ( p->pool.empty() ) { - int numCommas = DBClientBase::countCommas( host ); - DBClientBase *c; - - if( numCommas == 0 ) { - DBClientConnection *cc = new DBClientConnection(true); - log(2) << "creating new connection for pool to:" << host << endl; - string errmsg; - if ( !cc->connect(host.c_str(), errmsg) ) { - delete cc; - uassert( 11002 , (string)"dbconnectionpool: connect failed " + host , false); - return 0; - } - c = cc; - onCreate( c ); - } - else if ( numCommas == 1 ) { - DBClientPaired *p = new DBClientPaired(); - if( !p->connect(host) ) { - delete p; - uassert( 11003 , (string)"dbconnectionpool: connect failed [2] " + host , false); - return 0; - } - c = p; - } - else if ( numCommas == 2 ) { - c = new SyncClusterConnection( host ); - } - else { - uassert( 13071 , (string)"invalid hostname [" + host + "]" , 0 ); + string errmsg; + ConnectionString cs = ConnectionString::parse( host , errmsg ); + uassert( 13071 , (string)"invalid hostname [" + host + "]" + errmsg , cs.isValid() ); + + c = cs.connect( errmsg ); + uassert( 11002 , (string)"dbconnectionpool: connect failed " + host + " : " + errmsg , c ); + return _finishCreate( host , c ); + } + + DBConnectionPool::~DBConnectionPool(){ + for ( map<string,PoolForHost>::iterator i = _pools.begin(); i != _pools.end(); i++ ){ + PoolForHost& p = i->second; + + while ( ! p.pool.empty() ){ + DBClientBase * c = p.pool.top(); + delete c; + p.pool.pop(); } - return c; } - DBClientBase *c = p->pool.top(); - p->pool.pop(); - onHandedOut( c ); - return c; } void DBConnectionPool::flush(){ - scoped_lock L(poolMutex); - for ( map<string,PoolForHost*>::iterator i = pools.begin(); i != pools.end(); i++ ){ - PoolForHost* p = i->second; + scoped_lock L(_mutex); + for ( map<string,PoolForHost>::iterator i = _pools.begin(); i != _pools.end(); i++ ){ + PoolForHost& p = i->second; vector<DBClientBase*> all; - while ( ! p->pool.empty() ){ - DBClientBase * c = p->pool.top(); - p->pool.pop(); + while ( ! p.pool.empty() ){ + DBClientBase * c = p.pool.top(); + p.pool.pop(); all.push_back( c ); bool res; c->isMaster( res ); } for ( vector<DBClientBase*>::iterator i=all.begin(); i != all.end(); i++ ){ - p->pool.push( *i ); + p.pool.push( *i ); } } } @@ -114,6 +137,26 @@ namespace mongo { } } + void DBConnectionPool::appendInfo( BSONObjBuilder& b ){ + scoped_lock lk( _mutex ); + BSONObjBuilder bb( b.subobjStart( "hosts" ) ); + for ( map<string,PoolForHost>::iterator i=_pools.begin(); i!=_pools.end(); ++i ){ + string s = i->first; + BSONObjBuilder temp( bb.subobjStart( s.c_str() ) ); + temp.append( "available" , (int)(i->second.pool.size()) ); + temp.appendNumber( "created" , i->second.created ); + temp.done(); + } + bb.done(); + } + + ScopedDbConnection * ScopedDbConnection::steal(){ + assert( _conn ); + ScopedDbConnection * n = new ScopedDbConnection( _host , _conn ); + _conn = 0; + return n; + } + ScopedDbConnection::~ScopedDbConnection() { if ( _conn ){ if ( ! _conn->isFailed() ) { @@ -124,20 +167,44 @@ namespace mongo { } } + ScopedDbConnection::ScopedDbConnection(const Shard& shard ) + : _host( shard.getConnString() ) , _conn( pool.get(_host) ){ + } + + ScopedDbConnection::ScopedDbConnection(const Shard* shard ) + : _host( shard->getConnString() ) , _conn( pool.get(_host) ){ + } + class PoolFlushCmd : public Command { public: - PoolFlushCmd() : Command( "connpoolsync" ){} - virtual LockType locktype(){ return NONE; } - virtual bool run(const char*, mongo::BSONObj&, std::string&, mongo::BSONObjBuilder& result, bool){ + PoolFlushCmd() : Command( "connPoolSync" , false , "connpoolsync" ){} + virtual void help( stringstream &help ) const { help<<"internal"; } + virtual LockType locktype() const { return NONE; } + virtual bool run(const string&, mongo::BSONObj&, std::string&, mongo::BSONObjBuilder& result, bool){ pool.flush(); - result << "ok" << 1; return true; } - virtual bool slaveOk(){ + virtual bool slaveOk() const { return true; } } poolFlushCmd; + class PoolStats : public Command { + public: + PoolStats() : Command( "connPoolStats" ){} + virtual void help( stringstream &help ) const { help<<"stats about connection pool"; } + virtual LockType locktype() const { return NONE; } + virtual bool run(const string&, mongo::BSONObj&, std::string&, mongo::BSONObjBuilder& result, bool){ + pool.appendInfo( result ); + return true; + } + virtual bool slaveOk() const { + return true; + } + + } poolStatsCmd; + + } // namespace mongo |