summaryrefslogtreecommitdiff
path: root/s/shardconnection.cpp
diff options
context:
space:
mode:
Diffstat (limited to 's/shardconnection.cpp')
-rw-r--r--s/shardconnection.cpp173
1 files changed, 85 insertions, 88 deletions
diff --git a/s/shardconnection.cpp b/s/shardconnection.cpp
index 694693b..d05f5b1 100644
--- a/s/shardconnection.cpp
+++ b/s/shardconnection.cpp
@@ -23,7 +23,24 @@
#include <set>
namespace mongo {
-
+
+ // The code in shardconnection may run not only in mongos context. When elsewhere, chunk shard versioning
+ // is disabled. To enable chunk shard versioning, provide the check/resetShardVerionCB's below
+ //
+ // TODO: better encapsulate this mechanism.
+
+ bool defaultCheckShardVersion( DBClientBase & conn , const string& ns , bool authoritative , int tryNumber ) {
+ // no-op in mongod
+ return false;
+ }
+
+ void defaultResetShardVersion( DBClientBase * conn ) {
+ // no-op in mongod
+ }
+
+ boost::function4<bool, DBClientBase&, const string&, bool, int> checkShardVersionCB = defaultCheckShardVersion;
+ boost::function1<void, DBClientBase*> resetShardVersionCB = defaultResetShardVersion;
+
/**
* holds all the actual db connections for a client to various servers
* 1 pre thread, so don't have to worry about thread safety
@@ -31,39 +48,22 @@ namespace mongo {
class ClientConnections : boost::noncopyable {
public:
struct Status : boost::noncopyable {
- Status() : created(0), avail(0){}
+ Status() : created(0), avail(0) {}
- long long created;
+ long long created;
DBClientBase* avail;
};
- Nullstream& debug( Status * s = 0 , const string& addr = "" ){
- static int ll = 9;
+ ClientConnections() {}
- if ( logLevel < ll )
- return nullstream;
- Nullstream& l = log(ll);
-
- l << "ClientConnections DEBUG " << this << " ";
- if ( s ){
- l << "s: " << s << " addr: " << addr << " ";
- }
- return l;
- }
-
- ClientConnections() : _mutex("ClientConnections") {
- debug() << " NEW " << endl;
- }
-
- ~ClientConnections(){
- debug() << " KILLING " << endl;
- for ( map<string,Status*>::iterator i=_hosts.begin(); i!=_hosts.end(); ++i ){
+ ~ClientConnections() {
+ for ( HostMap::iterator i=_hosts.begin(); i!=_hosts.end(); ++i ) {
string addr = i->first;
Status* ss = i->second;
assert( ss );
- if ( ss->avail ){
- /* if we're shutting down, don't want to initiate release mechanism as it is slow,
+ if ( ss->avail ) {
+ /* if we're shutting down, don't want to initiate release mechanism as it is slow,
and isn't needed since all connections will be closed anyway */
if ( inShutdown() )
delete ss->avail;
@@ -75,49 +75,41 @@ namespace mongo {
}
_hosts.clear();
}
-
- DBClientBase * get( const string& addr , const string& ns ){
+
+ DBClientBase * get( const string& addr , const string& ns ) {
_check( ns );
- scoped_lock lk( _mutex );
+
Status* &s = _hosts[addr];
if ( ! s )
s = new Status();
-
- debug( s , addr ) << "WANT ONE pool avail: " << s->avail << endl;
-
- if ( s->avail ){
+
+ if ( s->avail ) {
DBClientBase* c = s->avail;
s->avail = 0;
- debug( s , addr ) << "GOT " << c << endl;
pool.onHandedOut( c );
return c;
}
- debug() << "CREATING NEW CONNECTION" << endl;
s->created++;
return pool.get( addr );
}
-
- void done( const string& addr , DBClientBase* conn ){
- scoped_lock lk( _mutex );
+
+ void done( const string& addr , DBClientBase* conn ) {
Status* s = _hosts[addr];
assert( s );
- if ( s->avail ){
- debug( s , addr ) << "DONE WITH TEMP" << endl;
+ if ( s->avail ) {
release( addr , conn );
return;
}
s->avail = conn;
- debug( s , addr ) << "PUSHING: " << conn << endl;
}
-
- void sync(){
- scoped_lock lk( _mutex );
- for ( map<string,Status*>::iterator i=_hosts.begin(); i!=_hosts.end(); ++i ){
+
+ void sync() {
+ for ( HostMap::iterator i=_hosts.begin(); i!=_hosts.end(); ++i ) {
string addr = i->first;
Status* ss = i->second;
- if ( ss->avail ){
+ if ( ss->avail ) {
ss->avail->getLastError();
release( addr , ss->avail );
ss->avail = 0;
@@ -127,63 +119,67 @@ namespace mongo {
_hosts.clear();
}
- void checkVersions( const string& ns ){
+ void checkVersions( const string& ns ) {
vector<Shard> all;
Shard::getAllShards( all );
- scoped_lock lk( _mutex );
- for ( unsigned i=0; i<all.size(); i++ ){
+ for ( unsigned i=0; i<all.size(); i++ ) {
Status* &s = _hosts[all[i].getConnString()];
if ( ! s )
s = new Status();
}
- for ( map<string,Status*>::iterator i=_hosts.begin(); i!=_hosts.end(); ++i ){
- if ( ! Shard::isAShard( i->first ) )
+ for ( HostMap::iterator i=_hosts.begin(); i!=_hosts.end(); ++i ) {
+ if ( ! Shard::isAShardNode( i->first ) )
continue;
Status* ss = i->second;
assert( ss );
if ( ! ss->avail )
ss->avail = pool.get( i->first );
- checkShardVersion( *ss->avail , ns );
+ checkShardVersionCB( *ss->avail , ns , false , 1 );
}
}
- void release( const string& addr , DBClientBase * conn ){
- resetShardVersion( conn );
+ void release( const string& addr , DBClientBase * conn ) {
+ resetShardVersionCB( conn );
BSONObj res;
-
+
try {
- if ( conn->simpleCommand( "admin" , &res , "unsetSharding" ) ){
+ if ( conn->simpleCommand( "admin" , &res , "unsetSharding" ) ) {
pool.release( addr , conn );
}
else {
- log(LL_ERROR) << " couldn't unset sharding :( " << res << endl;
+ error() << "unset sharding failed : " << res << endl;
delete conn;
}
}
- catch ( std::exception& e ){
- log(LL_ERROR) << "couldn't unsert sharding : " << e.what() << endl;
+ catch ( SocketException& e ) {
+ // server down or something
+ LOG(1) << "socket exception trying to unset sharding: " << e.toString() << endl;
+ delete conn;
+ }
+ catch ( std::exception& e ) {
+ error() << "couldn't unset sharding : " << e.what() << endl;
delete conn;
}
}
-
- void _check( const string& ns ){
+
+ void _check( const string& ns ) {
if ( ns.size() == 0 || _seenNS.count( ns ) )
return;
_seenNS.insert( ns );
checkVersions( ns );
}
-
- map<string,Status*> _hosts;
- mongo::mutex _mutex;
+
+ typedef map<string,Status*,DBConnectionPool::serverNameCompare> HostMap;
+ HostMap _hosts;
set<string> _seenNS;
// -----
-
+
static thread_specific_ptr<ClientConnections> _perThread;
- static ClientConnections* get(){
+ static ClientConnections* threadInstance() {
ClientConnections* cc = _perThread.get();
- if ( ! cc ){
+ if ( ! cc ) {
cc = new ClientConnections();
_perThread.reset( cc );
}
@@ -202,57 +198,58 @@ namespace mongo {
: _addr( s.getConnString() ) , _ns( ns ) {
_init();
}
-
+
ShardConnection::ShardConnection( const string& addr , const string& ns )
: _addr( addr ) , _ns( ns ) {
_init();
}
-
- void ShardConnection::_init(){
+
+ void ShardConnection::_init() {
assert( _addr.size() );
- _conn = ClientConnections::get()->get( _addr , _ns );
+ _conn = ClientConnections::threadInstance()->get( _addr , _ns );
_finishedInit = false;
}
- void ShardConnection::_finishInit(){
+ void ShardConnection::_finishInit() {
if ( _finishedInit )
return;
_finishedInit = true;
-
- if ( _ns.size() ){
- _setVersion = checkShardVersion( *_conn , _ns );
+
+ if ( _ns.size() ) {
+ _setVersion = checkShardVersionCB( *_conn , _ns , false , 1 );
}
else {
_setVersion = false;
}
-
+
}
- void ShardConnection::done(){
- if ( _conn ){
- ClientConnections::get()->done( _addr , _conn );
+ void ShardConnection::done() {
+ if ( _conn ) {
+ ClientConnections::threadInstance()->done( _addr , _conn );
_conn = 0;
_finishedInit = true;
}
}
- void ShardConnection::kill(){
- if ( _conn ){
+ void ShardConnection::kill() {
+ if ( _conn ) {
+ resetShardVersionCB( _conn );
delete _conn;
_conn = 0;
_finishedInit = true;
}
}
- void ShardConnection::sync(){
- ClientConnections::get()->sync();
+ void ShardConnection::sync() {
+ ClientConnections::threadInstance()->sync();
}
- bool ShardConnection::runCommand( const string& db , const BSONObj& cmd , BSONObj& res ){
+ bool ShardConnection::runCommand( const string& db , const BSONObj& cmd , BSONObj& res ) {
assert( _conn );
bool ok = _conn->runCommand( db , cmd , res );
- if ( ! ok ){
- if ( res["code"].numberInt() == StaleConfigInContextCode ){
+ if ( ! ok ) {
+ if ( res["code"].numberInt() == StaleConfigInContextCode ) {
string big = res["errmsg"].String();
string ns,raw;
massert( 13409 , (string)"can't parse ns from: " + big , StaleConfigException::parse( big , ns , raw ) );
@@ -263,12 +260,12 @@ namespace mongo {
return ok;
}
- void ShardConnection::checkMyConnectionVersions( const string & ns ){
- ClientConnections::get()->checkVersions( ns );
+ void ShardConnection::checkMyConnectionVersions( const string & ns ) {
+ ClientConnections::threadInstance()->checkVersions( ns );
}
ShardConnection::~ShardConnection() {
- if ( _conn ){
+ if ( _conn ) {
if ( ! _conn->isFailed() ) {
/* see done() comments above for why we log this line */
log() << "~ScopedDBConnection: _conn != null" << endl;