summaryrefslogtreecommitdiff
path: root/client/connpool.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'client/connpool.cpp')
-rw-r--r--client/connpool.cpp171
1 files changed, 119 insertions, 52 deletions
diff --git a/client/connpool.cpp b/client/connpool.cpp
index 5a08483..dae13f6 100644
--- a/client/connpool.cpp
+++ b/client/connpool.cpp
@@ -18,76 +18,99 @@
// _ todo: reconnect?
-#include "stdafx.h"
+#include "pch.h"
#include "connpool.h"
#include "../db/commands.h"
#include "syncclusterconnection.h"
+#include "../s/shard.h"
namespace mongo {
DBConnectionPool pool;
+ DBClientBase* DBConnectionPool::_get(const string& ident) {
+ scoped_lock L(_mutex);
+
+ PoolForHost& p = _pools[ident];
+ if ( p.pool.empty() )
+ return 0;
+
+ DBClientBase *c = p.pool.top();
+ p.pool.pop();
+ return c;
+ }
+
+ DBClientBase* DBConnectionPool::_finishCreate( const string& host , DBClientBase* conn ){
+ {
+ scoped_lock L(_mutex);
+ PoolForHost& p = _pools[host];
+ p.created++;
+ }
+
+ onCreate( conn );
+ onHandedOut( conn );
+
+ return conn;
+ }
+
+ DBClientBase* DBConnectionPool::get(const ConnectionString& url) {
+ DBClientBase * c = _get( url.toString() );
+ if ( c ){
+ onHandedOut( c );
+ return c;
+ }
+
+ string errmsg;
+ c = url.connect( errmsg );
+ uassert( 13328 , (string)"dbconnectionpool: connect failed " + url.toString() + " : " + errmsg , c );
+
+ return _finishCreate( url.toString() , c );
+ }
+
DBClientBase* DBConnectionPool::get(const string& host) {
- scoped_lock L(poolMutex);
+ DBClientBase * c = _get( host );
+ if ( c ){
+ onHandedOut( c );
+ return c;
+ }
- PoolForHost *&p = pools[host];
- if ( p == 0 )
- p = new PoolForHost();
- if ( p->pool.empty() ) {
- int numCommas = DBClientBase::countCommas( host );
- DBClientBase *c;
-
- if( numCommas == 0 ) {
- DBClientConnection *cc = new DBClientConnection(true);
- log(2) << "creating new connection for pool to:" << host << endl;
- string errmsg;
- if ( !cc->connect(host.c_str(), errmsg) ) {
- delete cc;
- uassert( 11002 , (string)"dbconnectionpool: connect failed " + host , false);
- return 0;
- }
- c = cc;
- onCreate( c );
- }
- else if ( numCommas == 1 ) {
- DBClientPaired *p = new DBClientPaired();
- if( !p->connect(host) ) {
- delete p;
- uassert( 11003 , (string)"dbconnectionpool: connect failed [2] " + host , false);
- return 0;
- }
- c = p;
- }
- else if ( numCommas == 2 ) {
- c = new SyncClusterConnection( host );
- }
- else {
- uassert( 13071 , (string)"invalid hostname [" + host + "]" , 0 );
+ string errmsg;
+ ConnectionString cs = ConnectionString::parse( host , errmsg );
+ uassert( 13071 , (string)"invalid hostname [" + host + "]" + errmsg , cs.isValid() );
+
+ c = cs.connect( errmsg );
+ uassert( 11002 , (string)"dbconnectionpool: connect failed " + host + " : " + errmsg , c );
+ return _finishCreate( host , c );
+ }
+
+ DBConnectionPool::~DBConnectionPool(){
+ for ( map<string,PoolForHost>::iterator i = _pools.begin(); i != _pools.end(); i++ ){
+ PoolForHost& p = i->second;
+
+ while ( ! p.pool.empty() ){
+ DBClientBase * c = p.pool.top();
+ delete c;
+ p.pool.pop();
}
- return c;
}
- DBClientBase *c = p->pool.top();
- p->pool.pop();
- onHandedOut( c );
- return c;
}
void DBConnectionPool::flush(){
- scoped_lock L(poolMutex);
- for ( map<string,PoolForHost*>::iterator i = pools.begin(); i != pools.end(); i++ ){
- PoolForHost* p = i->second;
+ scoped_lock L(_mutex);
+ for ( map<string,PoolForHost>::iterator i = _pools.begin(); i != _pools.end(); i++ ){
+ PoolForHost& p = i->second;
vector<DBClientBase*> all;
- while ( ! p->pool.empty() ){
- DBClientBase * c = p->pool.top();
- p->pool.pop();
+ while ( ! p.pool.empty() ){
+ DBClientBase * c = p.pool.top();
+ p.pool.pop();
all.push_back( c );
bool res;
c->isMaster( res );
}
for ( vector<DBClientBase*>::iterator i=all.begin(); i != all.end(); i++ ){
- p->pool.push( *i );
+ p.pool.push( *i );
}
}
}
@@ -114,6 +137,26 @@ namespace mongo {
}
}
+ void DBConnectionPool::appendInfo( BSONObjBuilder& b ){
+ scoped_lock lk( _mutex );
+ BSONObjBuilder bb( b.subobjStart( "hosts" ) );
+ for ( map<string,PoolForHost>::iterator i=_pools.begin(); i!=_pools.end(); ++i ){
+ string s = i->first;
+ BSONObjBuilder temp( bb.subobjStart( s.c_str() ) );
+ temp.append( "available" , (int)(i->second.pool.size()) );
+ temp.appendNumber( "created" , i->second.created );
+ temp.done();
+ }
+ bb.done();
+ }
+
+ ScopedDbConnection * ScopedDbConnection::steal(){
+ assert( _conn );
+ ScopedDbConnection * n = new ScopedDbConnection( _host , _conn );
+ _conn = 0;
+ return n;
+ }
+
ScopedDbConnection::~ScopedDbConnection() {
if ( _conn ){
if ( ! _conn->isFailed() ) {
@@ -124,20 +167,44 @@ namespace mongo {
}
}
+ ScopedDbConnection::ScopedDbConnection(const Shard& shard )
+ : _host( shard.getConnString() ) , _conn( pool.get(_host) ){
+ }
+
+ ScopedDbConnection::ScopedDbConnection(const Shard* shard )
+ : _host( shard->getConnString() ) , _conn( pool.get(_host) ){
+ }
+
class PoolFlushCmd : public Command {
public:
- PoolFlushCmd() : Command( "connpoolsync" ){}
- virtual LockType locktype(){ return NONE; }
- virtual bool run(const char*, mongo::BSONObj&, std::string&, mongo::BSONObjBuilder& result, bool){
+ PoolFlushCmd() : Command( "connPoolSync" , false , "connpoolsync" ){}
+ virtual void help( stringstream &help ) const { help<<"internal"; }
+ virtual LockType locktype() const { return NONE; }
+ virtual bool run(const string&, mongo::BSONObj&, std::string&, mongo::BSONObjBuilder& result, bool){
pool.flush();
- result << "ok" << 1;
return true;
}
- virtual bool slaveOk(){
+ virtual bool slaveOk() const {
return true;
}
} poolFlushCmd;
+ class PoolStats : public Command {
+ public:
+ PoolStats() : Command( "connPoolStats" ){}
+ virtual void help( stringstream &help ) const { help<<"stats about connection pool"; }
+ virtual LockType locktype() const { return NONE; }
+ virtual bool run(const string&, mongo::BSONObj&, std::string&, mongo::BSONObjBuilder& result, bool){
+ pool.appendInfo( result );
+ return true;
+ }
+ virtual bool slaveOk() const {
+ return true;
+ }
+
+ } poolStatsCmd;
+
+
} // namespace mongo