summaryrefslogtreecommitdiff
path: root/client
diff options
context:
space:
mode:
Diffstat (limited to 'client')
-rw-r--r--client/clientOnly.cpp5
-rw-r--r--client/connpool.cpp223
-rw-r--r--client/connpool.h119
-rw-r--r--client/dbclient.cpp147
-rw-r--r--client/dbclient.h100
-rw-r--r--client/dbclient_rs.cpp335
-rw-r--r--client/dbclient_rs.h86
-rw-r--r--client/dbclientcursor.cpp123
-rw-r--r--client/dbclientcursor.h82
-rw-r--r--client/distlock.cpp921
-rw-r--r--client/distlock.h164
-rw-r--r--client/distlock_test.cpp394
-rw-r--r--client/examples/clientTest.cpp29
-rw-r--r--client/examples/httpClientTest.cpp33
-rw-r--r--client/examples/insert_demo.cpp47
-rw-r--r--client/examples/rs.cpp80
-rwxr-xr-xclient/examples/simple_client_demo.vcxproj92
-rwxr-xr-xclient/examples/simple_client_demo.vcxproj.filters21
-rw-r--r--client/examples/whereExample.cpp3
-rw-r--r--client/mongo_client_lib.cpp31
-rw-r--r--client/parallel.cpp369
-rw-r--r--client/parallel.h22
-rw-r--r--client/redef_macros.h5
-rw-r--r--client/simple_client_demo.cpp36
-rw-r--r--client/syncclusterconnection.cpp42
-rw-r--r--client/syncclusterconnection.h22
-rw-r--r--client/undef_macros.h2
27 files changed, 2920 insertions, 613 deletions
diff --git a/client/clientOnly.cpp b/client/clientOnly.cpp
index 5725e5f..11890c8 100644
--- a/client/clientOnly.cpp
+++ b/client/clientOnly.cpp
@@ -17,7 +17,6 @@
#include "pch.h"
#include "../client/dbclient.h"
-#include "../db/dbhelpers.h"
#include "../db/cmdline.h"
#include "../s/shard.h"
@@ -29,6 +28,10 @@ namespace mongo {
bool dbexitCalled = false;
+ void exitCleanly( ExitCode code ) {
+ dbexit( code );
+ }
+
void dbexit( ExitCode returnCode, const char *whyMsg , bool tryToGetLock ) {
dbexitCalled = true;
out() << "dbexit called" << endl;
diff --git a/client/connpool.cpp b/client/connpool.cpp
index 23d14da..2d7c37b 100644
--- a/client/connpool.cpp
+++ b/client/connpool.cpp
@@ -36,8 +36,9 @@ namespace mongo {
}
}
- void PoolForHost::done( DBClientBase * c ) {
+ void PoolForHost::done( DBConnectionPool * pool, DBClientBase * c ) {
if ( _pool.size() >= _maxPerHost ) {
+ pool->onDestory( c );
delete c;
}
else {
@@ -45,16 +46,24 @@ namespace mongo {
}
}
- DBClientBase * PoolForHost::get() {
+ DBClientBase * PoolForHost::get( DBConnectionPool * pool , double socketTimeout ) {
time_t now = time(0);
-
+
while ( ! _pool.empty() ) {
StoredConnection sc = _pool.top();
_pool.pop();
- if ( sc.ok( now ) )
- return sc.conn;
- delete sc.conn;
+
+ if ( ! sc.ok( now ) ) {
+ pool->onDestory( sc.conn );
+ delete sc.conn;
+ continue;
+ }
+
+ assert( sc.conn->getSoTimeout() == socketTimeout );
+
+ return sc.conn;
+
}
return NULL;
@@ -75,14 +84,34 @@ namespace mongo {
}
}
+ void PoolForHost::getStaleConnections( vector<DBClientBase*>& stale ) {
+ time_t now = time(0);
+
+ vector<StoredConnection> all;
+ while ( ! _pool.empty() ) {
+ StoredConnection c = _pool.top();
+ _pool.pop();
+
+ if ( c.ok( now ) )
+ all.push_back( c );
+ else
+ stale.push_back( c.conn );
+ }
+
+ for ( size_t i=0; i<all.size(); i++ ) {
+ _pool.push( all[i] );
+ }
+ }
+
+
PoolForHost::StoredConnection::StoredConnection( DBClientBase * c ) {
conn = c;
when = time(0);
}
bool PoolForHost::StoredConnection::ok( time_t now ) {
- // if connection has been idle for an hour, kill it
- return ( now - when ) < 3600;
+ // if connection has been idle for 30 minutes, kill it
+ return ( now - when ) < 1800;
}
void PoolForHost::createdOne( DBClientBase * base) {
@@ -97,16 +126,23 @@ namespace mongo {
DBConnectionPool pool;
- DBClientBase* DBConnectionPool::_get(const string& ident) {
+ DBConnectionPool::DBConnectionPool()
+ : _mutex("DBConnectionPool") ,
+ _name( "dbconnectionpool" ) ,
+ _hooks( new list<DBConnectionHook*>() ) {
+ }
+
+ DBClientBase* DBConnectionPool::_get(const string& ident , double socketTimeout ) {
+ assert( ! inShutdown() );
scoped_lock L(_mutex);
- PoolForHost& p = _pools[ident];
- return p.get();
+ PoolForHost& p = _pools[PoolKey(ident,socketTimeout)];
+ return p.get( this , socketTimeout );
}
- DBClientBase* DBConnectionPool::_finishCreate( const string& host , DBClientBase* conn ) {
+ DBClientBase* DBConnectionPool::_finishCreate( const string& host , double socketTimeout , DBClientBase* conn ) {
{
scoped_lock L(_mutex);
- PoolForHost& p = _pools[host];
+ PoolForHost& p = _pools[PoolKey(host,socketTimeout)];
p.createdOne( conn );
}
@@ -116,22 +152,22 @@ namespace mongo {
return conn;
}
- DBClientBase* DBConnectionPool::get(const ConnectionString& url) {
- DBClientBase * c = _get( url.toString() );
+ DBClientBase* DBConnectionPool::get(const ConnectionString& url, double socketTimeout) {
+ DBClientBase * c = _get( url.toString() , socketTimeout );
if ( c ) {
onHandedOut( c );
return c;
}
string errmsg;
- c = url.connect( errmsg );
+ c = url.connect( errmsg, socketTimeout );
uassert( 13328 , _name + ": connect failed " + url.toString() + " : " + errmsg , c );
- return _finishCreate( url.toString() , c );
+ return _finishCreate( url.toString() , socketTimeout , c );
}
- DBClientBase* DBConnectionPool::get(const string& host) {
- DBClientBase * c = _get( host );
+ DBClientBase* DBConnectionPool::get(const string& host, double socketTimeout) {
+ DBClientBase * c = _get( host , socketTimeout );
if ( c ) {
onHandedOut( c );
return c;
@@ -141,12 +177,23 @@ namespace mongo {
ConnectionString cs = ConnectionString::parse( host , errmsg );
uassert( 13071 , (string)"invalid hostname [" + host + "]" + errmsg , cs.isValid() );
- c = cs.connect( errmsg );
+ c = cs.connect( errmsg, socketTimeout );
if ( ! c )
throw SocketException( SocketException::CONNECT_ERROR , host , 11002 , str::stream() << _name << " error: " << errmsg );
- return _finishCreate( host , c );
+ return _finishCreate( host , socketTimeout , c );
+ }
+
+ void DBConnectionPool::release(const string& host, DBClientBase *c) {
+ if ( c->isFailed() ) {
+ onDestory( c );
+ delete c;
+ return;
+ }
+ scoped_lock L(_mutex);
+ _pools[PoolKey(host,c->getSoTimeout())].done(this,c);
}
+
DBConnectionPool::~DBConnectionPool() {
// connection closing is handled by ~PoolForHost
}
@@ -160,42 +207,55 @@ namespace mongo {
}
void DBConnectionPool::addHook( DBConnectionHook * hook ) {
- _hooks.push_back( hook );
+ _hooks->push_back( hook );
}
void DBConnectionPool::onCreate( DBClientBase * conn ) {
- if ( _hooks.size() == 0 )
+ if ( _hooks->size() == 0 )
return;
- for ( list<DBConnectionHook*>::iterator i = _hooks.begin(); i != _hooks.end(); i++ ) {
+ for ( list<DBConnectionHook*>::iterator i = _hooks->begin(); i != _hooks->end(); i++ ) {
(*i)->onCreate( conn );
}
}
void DBConnectionPool::onHandedOut( DBClientBase * conn ) {
- if ( _hooks.size() == 0 )
+ if ( _hooks->size() == 0 )
return;
- for ( list<DBConnectionHook*>::iterator i = _hooks.begin(); i != _hooks.end(); i++ ) {
+ for ( list<DBConnectionHook*>::iterator i = _hooks->begin(); i != _hooks->end(); i++ ) {
(*i)->onHandedOut( conn );
}
}
+ void DBConnectionPool::onDestory( DBClientBase * conn ) {
+ if ( _hooks->size() == 0 )
+ return;
+
+ for ( list<DBConnectionHook*>::iterator i = _hooks->begin(); i != _hooks->end(); i++ ) {
+ (*i)->onDestory( conn );
+ }
+ }
+
void DBConnectionPool::appendInfo( BSONObjBuilder& b ) {
- BSONObjBuilder bb( b.subobjStart( "hosts" ) );
+
int avail = 0;
long long created = 0;
map<ConnectionString::ConnectionType,long long> createdByType;
+ set<string> replicaSets;
+
+ BSONObjBuilder bb( b.subobjStart( "hosts" ) );
{
scoped_lock lk( _mutex );
for ( PoolMap::iterator i=_pools.begin(); i!=_pools.end(); ++i ) {
if ( i->second.numCreated() == 0 )
continue;
- string s = i->first;
+ string s = str::stream() << i->first.ident << "::" << i->first.timeout;
+
BSONObjBuilder temp( bb.subobjStart( s ) );
temp.append( "available" , i->second.numAvailable() );
temp.appendNumber( "created" , i->second.numCreated() );
@@ -206,9 +266,33 @@ namespace mongo {
long long& x = createdByType[i->second.type()];
x += i->second.numCreated();
+
+ {
+ string setName = i->first.ident;
+ if ( setName.find( "/" ) != string::npos ) {
+ setName = setName.substr( 0 , setName.find( "/" ) );
+ replicaSets.insert( setName );
+ }
+ }
}
}
bb.done();
+
+
+ BSONObjBuilder setBuilder( b.subobjStart( "replicaSets" ) );
+ for ( set<string>::iterator i=replicaSets.begin(); i!=replicaSets.end(); ++i ) {
+ string rs = *i;
+ ReplicaSetMonitorPtr m = ReplicaSetMonitor::get( rs );
+ if ( ! m ) {
+ warning() << "no monitor for set: " << rs << endl;
+ continue;
+ }
+
+ BSONObjBuilder temp( setBuilder.subobjStart( rs ) );
+ m->appendInfo( temp );
+ temp.done();
+ }
+ setBuilder.done();
{
BSONObjBuilder temp( bb.subobjStart( "createdByType" ) );
@@ -223,21 +307,82 @@ namespace mongo {
}
bool DBConnectionPool::serverNameCompare::operator()( const string& a , const string& b ) const{
- string ap = str::before( a , "/" );
- string bp = str::before( b , "/" );
+ const char* ap = a.c_str();
+ const char* bp = b.c_str();
+
+ while (true){
+ if (*ap == '\0' || *ap == '/'){
+ if (*bp == '\0' || *bp == '/')
+ return false; // equal strings
+ else
+ return true; // a is shorter
+ }
+
+ if (*bp == '\0' || *bp == '/')
+ return false; // b is shorter
+
+ if ( *ap < *bp)
+ return true;
+ else if (*ap > *bp)
+ return false;
+
+ ++ap;
+ ++bp;
+ }
+ assert(false);
+ }
+
+ bool DBConnectionPool::poolKeyCompare::operator()( const PoolKey& a , const PoolKey& b ) const {
+ if (DBConnectionPool::serverNameCompare()( a.ident , b.ident ))
+ return true;
- return ap < bp;
+ if (DBConnectionPool::serverNameCompare()( b.ident , a.ident ))
+ return false;
+
+ return a.timeout < b.timeout;
+ }
+
+
+ void DBConnectionPool::taskDoWork() {
+ vector<DBClientBase*> toDelete;
+
+ {
+ // we need to get the connections inside the lock
+ // but we can actually delete them outside
+ scoped_lock lk( _mutex );
+ for ( PoolMap::iterator i=_pools.begin(); i!=_pools.end(); ++i ) {
+ i->second.getStaleConnections( toDelete );
+ }
+ }
+
+ for ( size_t i=0; i<toDelete.size(); i++ ) {
+ try {
+ onDestory( toDelete[i] );
+ delete toDelete[i];
+ }
+ catch ( ... ) {
+ // we don't care if there was a socket error
+ }
+ }
}
// ------ ScopedDbConnection ------
ScopedDbConnection * ScopedDbConnection::steal() {
assert( _conn );
- ScopedDbConnection * n = new ScopedDbConnection( _host , _conn );
+ ScopedDbConnection * n = new ScopedDbConnection( _host , _conn, _socketTimeout );
_conn = 0;
return n;
}
+ void ScopedDbConnection::_setSocketTimeout(){
+ if( ! _conn ) return;
+ if( _conn->type() == ConnectionString::MASTER )
+ (( DBClientConnection* ) _conn)->setSoTimeout( _socketTimeout );
+ else if( _conn->type() == ConnectionString::SYNC )
+ (( SyncClusterConnection* ) _conn)->setAllSoTimeouts( _socketTimeout );
+ }
+
ScopedDbConnection::~ScopedDbConnection() {
if ( _conn ) {
if ( ! _conn->isFailed() ) {
@@ -248,12 +393,14 @@ namespace mongo {
}
}
- ScopedDbConnection::ScopedDbConnection(const Shard& shard )
- : _host( shard.getConnString() ) , _conn( pool.get(_host) ) {
+ ScopedDbConnection::ScopedDbConnection(const Shard& shard, double socketTimeout )
+ : _host( shard.getConnString() ) , _conn( pool.get(_host, socketTimeout) ), _socketTimeout( socketTimeout ) {
+ _setSocketTimeout();
}
- ScopedDbConnection::ScopedDbConnection(const Shard* shard )
- : _host( shard->getConnString() ) , _conn( pool.get(_host) ) {
+ ScopedDbConnection::ScopedDbConnection(const Shard* shard, double socketTimeout )
+ : _host( shard->getConnString() ) , _conn( pool.get(_host, socketTimeout) ), _socketTimeout( socketTimeout ) {
+ _setSocketTimeout();
}
@@ -262,7 +409,7 @@ namespace mongo {
PoolFlushCmd() : Command( "connPoolSync" , false , "connpoolsync" ) {}
virtual void help( stringstream &help ) const { help<<"internal"; }
virtual LockType locktype() const { return NONE; }
- virtual bool run(const string&, mongo::BSONObj&, std::string&, mongo::BSONObjBuilder& result, bool) {
+ virtual bool run(const string&, mongo::BSONObj&, int, std::string&, mongo::BSONObjBuilder& result, bool) {
pool.flush();
return true;
}
@@ -277,7 +424,7 @@ namespace mongo {
PoolStats() : Command( "connPoolStats" ) {}
virtual void help( stringstream &help ) const { help<<"stats about connection pool"; }
virtual LockType locktype() const { return NONE; }
- virtual bool run(const string&, mongo::BSONObj&, std::string&, mongo::BSONObjBuilder& result, bool) {
+ virtual bool run(const string&, mongo::BSONObj&, int, std::string&, mongo::BSONObjBuilder& result, bool) {
pool.appendInfo( result );
result.append( "numDBClientConnection" , DBClientConnection::getNumConnections() );
result.append( "numAScopedConnection" , AScopedConnection::getNumConnections() );
diff --git a/client/connpool.h b/client/connpool.h
index e7f59d6..a37dad7 100644
--- a/client/connpool.h
+++ b/client/connpool.h
@@ -21,9 +21,12 @@
#include "dbclient.h"
#include "redef_macros.h"
+#include "../util/background.h"
+
namespace mongo {
class Shard;
+ class DBConnectionPool;
/**
* not thread safe
@@ -44,7 +47,7 @@ namespace mongo {
int numAvailable() const { return (int)_pool.size(); }
- void createdOne( DBClientBase * base);
+ void createdOne( DBClientBase * base );
long long numCreated() const { return _created; }
ConnectionString::ConnectionType type() const { assert(_created); return _type; }
@@ -52,11 +55,13 @@ namespace mongo {
/**
* gets a connection or return NULL
*/
- DBClientBase * get();
+ DBClientBase * get( DBConnectionPool * pool , double socketTimeout );
- void done( DBClientBase * c );
+ void done( DBConnectionPool * pool , DBClientBase * c );
void flush();
+
+ void getStaleConnections( vector<DBClientBase*>& stale );
static void setMaxPerHost( unsigned max ) { _maxPerHost = max; }
static unsigned getMaxPerHost() { return _maxPerHost; }
@@ -72,6 +77,7 @@ namespace mongo {
};
std::stack<StoredConnection> _pool;
+
long long _created;
ConnectionString::ConnectionType _type;
@@ -83,6 +89,7 @@ namespace mongo {
virtual ~DBConnectionHook() {}
virtual void onCreate( DBClientBase * conn ) {}
virtual void onHandedOut( DBClientBase * conn ) {}
+ virtual void onDestory( DBClientBase * conn ) {}
};
/** Database connection pool.
@@ -100,29 +107,11 @@ namespace mongo {
c.conn()...
}
*/
- class DBConnectionPool {
+ class DBConnectionPool : public PeriodicTask {
public:
- /** compares server namees, but is smart about replica set names */
- struct serverNameCompare {
- bool operator()( const string& a , const string& b ) const;
- };
-
- private:
-
- mongo::mutex _mutex;
- typedef map<string,PoolForHost,serverNameCompare> PoolMap; // servername -> pool
- PoolMap _pools;
- list<DBConnectionHook*> _hooks;
- string _name;
-
- DBClientBase* _get( const string& ident );
-
- DBClientBase* _finishCreate( const string& ident , DBClientBase* conn );
-
- public:
- DBConnectionPool() : _mutex("DBConnectionPool") , _name( "dbconnectionpool" ) { }
+ DBConnectionPool();
~DBConnectionPool();
/** right now just controls some asserts. defaults to "dbconnectionpool" */
@@ -130,22 +119,54 @@ namespace mongo {
void onCreate( DBClientBase * conn );
void onHandedOut( DBClientBase * conn );
+ void onDestory( DBClientBase * conn );
void flush();
- DBClientBase *get(const string& host);
- DBClientBase *get(const ConnectionString& host);
+ DBClientBase *get(const string& host, double socketTimeout = 0);
+ DBClientBase *get(const ConnectionString& host, double socketTimeout = 0);
- void release(const string& host, DBClientBase *c) {
- if ( c->isFailed() ) {
- delete c;
- return;
- }
- scoped_lock L(_mutex);
- _pools[host].done(c);
- }
- void addHook( DBConnectionHook * hook );
+ void release(const string& host, DBClientBase *c);
+
+ void addHook( DBConnectionHook * hook ); // we take ownership
void appendInfo( BSONObjBuilder& b );
+
+ /** compares server namees, but is smart about replica set names */
+ struct serverNameCompare {
+ bool operator()( const string& a , const string& b ) const;
+ };
+
+ virtual string taskName() const { return "DBConnectionPool-cleaner"; }
+ virtual void taskDoWork();
+
+ private:
+ DBConnectionPool( DBConnectionPool& p );
+
+ DBClientBase* _get( const string& ident , double socketTimeout );
+
+ DBClientBase* _finishCreate( const string& ident , double socketTimeout, DBClientBase* conn );
+
+ struct PoolKey {
+ PoolKey( string i , double t ) : ident( i ) , timeout( t ) {}
+ string ident;
+ double timeout;
+ };
+
+ struct poolKeyCompare {
+ bool operator()( const PoolKey& a , const PoolKey& b ) const;
+ };
+
+ typedef map<PoolKey,PoolForHost,poolKeyCompare> PoolMap; // servername -> pool
+
+ mongo::mutex _mutex;
+ string _name;
+
+ PoolMap _pools;
+
+ // pointers owned by me, right now they leak on shutdown
+ // _hooks itself also leaks because it creates a shutdown race condition
+ list<DBConnectionHook*> * _hooks;
+
};
extern DBConnectionPool pool;
@@ -154,9 +175,15 @@ namespace mongo {
public:
AScopedConnection() { _numConnections++; }
virtual ~AScopedConnection() { _numConnections--; }
+
virtual DBClientBase* get() = 0;
virtual void done() = 0;
virtual string getHost() const = 0;
+
+ /**
+ * @return true iff this has a connection to the db
+ */
+ virtual bool ok() const = 0;
/**
* @return total number of current instances of AScopedConnection
@@ -176,19 +203,25 @@ namespace mongo {
/** the main constructor you want to use
throws UserException if can't connect
*/
- explicit ScopedDbConnection(const string& host) : _host(host), _conn( pool.get(host) ) {}
+ explicit ScopedDbConnection(const string& host, double socketTimeout = 0) : _host(host), _conn( pool.get(host, socketTimeout) ), _socketTimeout( socketTimeout ) {
+ _setSocketTimeout();
+ }
- ScopedDbConnection() : _host( "" ) , _conn(0) {}
+ ScopedDbConnection() : _host( "" ) , _conn(0), _socketTimeout( 0 ) {}
/* @param conn - bind to an existing connection */
- ScopedDbConnection(const string& host, DBClientBase* conn ) : _host( host ) , _conn( conn ) {}
+ ScopedDbConnection(const string& host, DBClientBase* conn, double socketTimeout = 0 ) : _host( host ) , _conn( conn ), _socketTimeout( socketTimeout ) {
+ _setSocketTimeout();
+ }
/** throws UserException if can't connect */
- explicit ScopedDbConnection(const ConnectionString& url ) : _host(url.toString()), _conn( pool.get(url) ) {}
+ explicit ScopedDbConnection(const ConnectionString& url, double socketTimeout = 0 ) : _host(url.toString()), _conn( pool.get(url, socketTimeout) ), _socketTimeout( socketTimeout ) {
+ _setSocketTimeout();
+ }
/** throws UserException if can't connect */
- explicit ScopedDbConnection(const Shard& shard );
- explicit ScopedDbConnection(const Shard* shard );
+ explicit ScopedDbConnection(const Shard& shard, double socketTimeout = 0 );
+ explicit ScopedDbConnection(const Shard* shard, double socketTimeout = 0 );
~ScopedDbConnection();
@@ -210,6 +243,8 @@ namespace mongo {
return _conn;
}
+ bool ok() const { return _conn > 0; }
+
string getHost() const { return _host; }
/** Force closure of the connection. You should call this if you leave it in
@@ -242,8 +277,12 @@ namespace mongo {
ScopedDbConnection * steal();
private:
+
+ void _setSocketTimeout();
+
const string _host;
DBClientBase *_conn;
+ const double _socketTimeout;
};
diff --git a/client/dbclient.cpp b/client/dbclient.cpp
index bb24199..dadf7e4 100644
--- a/client/dbclient.cpp
+++ b/client/dbclient.cpp
@@ -64,21 +64,23 @@ namespace mongo {
}
- DBClientBase* ConnectionString::connect( string& errmsg ) const {
+ DBClientBase* ConnectionString::connect( string& errmsg, double socketTimeout ) const {
switch ( _type ) {
case MASTER: {
DBClientConnection * c = new DBClientConnection(true);
+ c->setSoTimeout( socketTimeout );
log(1) << "creating new connection to:" << _servers[0] << endl;
if ( ! c->connect( _servers[0] , errmsg ) ) {
delete c;
return 0;
}
+ log(1) << "connected connection!" << endl;
return c;
}
case PAIR:
case SET: {
- DBClientReplicaSet * set = new DBClientReplicaSet( _setName , _servers );
+ DBClientReplicaSet * set = new DBClientReplicaSet( _setName , _servers , socketTimeout );
if( ! set->connect() ) {
delete set;
errmsg = "connect failed to set ";
@@ -93,7 +95,8 @@ namespace mongo {
list<HostAndPort> l;
for ( unsigned i=0; i<_servers.size(); i++ )
l.push_back( _servers[i] );
- return new SyncClusterConnection( l );
+ SyncClusterConnection* c = new SyncClusterConnection( l, socketTimeout );
+ return c;
}
case INVALID:
@@ -294,7 +297,7 @@ namespace mongo {
return b.obj();
}
- BSONObj getlasterrorcmdobj = fromjson("{getlasterror:1}");
+ const BSONObj getlasterrorcmdobj = fromjson("{getlasterror:1}");
BSONObj DBClientWithCommands::getLastErrorDetailed() {
BSONObj info;
@@ -314,7 +317,7 @@ namespace mongo {
return e.str();
}
- BSONObj getpreverrorcmdobj = fromjson("{getpreverror:1}");
+ const BSONObj getpreverrorcmdobj = fromjson("{getpreverror:1}");
BSONObj DBClientWithCommands::getPrevError() {
BSONObj info;
@@ -391,6 +394,7 @@ namespace mongo {
}
bool DBClientWithCommands::createCollection(const string &ns, long long size, bool capped, int max, BSONObj *info) {
+ assert(!capped||size);
BSONObj o;
if ( info == 0 ) info = &o;
BSONObjBuilder b;
@@ -529,19 +533,31 @@ namespace mongo {
return DBClientBase::auth(dbname, username, password.c_str(), errmsg, false);
}
- BSONObj DBClientInterface::findOne(const string &ns, const Query& query, const BSONObj *fieldsToReturn, int queryOptions) {
+ /** query N objects from the database into an array. makes sense mostly when you want a small number of results. if a huge number, use
+ query() and iterate the cursor.
+ */
+ void DBClientInterface::findN(vector<BSONObj>& out, const string& ns, Query query, int nToReturn, int nToSkip, const BSONObj *fieldsToReturn, int queryOptions) {
+ out.reserve(nToReturn);
+
auto_ptr<DBClientCursor> c =
- this->query(ns, query, 1, 0, fieldsToReturn, queryOptions);
+ this->query(ns, query, nToReturn, nToSkip, fieldsToReturn, queryOptions);
- uassert( 10276 , str::stream() << "DBClientBase::findOne: transport error: " << getServerAddress() << " query: " << query.toString(), c.get() );
+ uassert( 10276 , str::stream() << "DBClientBase::findN: transport error: " << getServerAddress() << " query: " << query.toString(), c.get() );
if ( c->hasResultFlag( ResultFlag_ShardConfigStale ) )
- throw StaleConfigException( ns , "findOne has stale config" );
+ throw StaleConfigException( ns , "findN stale config" );
- if ( !c->more() )
- return BSONObj();
+ for( int i = 0; i < nToReturn; i++ ) {
+ if ( !c->more() )
+ break;
+ out.push_back( c->nextSafe().copy() );
+ }
+ }
- return c->nextSafe().copy();
+ BSONObj DBClientInterface::findOne(const string &ns, const Query& query, const BSONObj *fieldsToReturn, int queryOptions) {
+ vector<BSONObj> v;
+ findN(v, ns, query, 1, 0, fieldsToReturn, queryOptions);
+ return v.empty() ? BSONObj() : v[0];
}
bool DBClientConnection::connect(const HostAndPort& server, string& errmsg) {
@@ -558,39 +574,50 @@ namespace mongo {
p.reset(new MessagingPort( _so_timeout, _logLevel ));
if (server->getAddr() == "0.0.0.0") {
- failed = true;
+ _failed = true;
return false;
}
+ // if( _so_timeout == 0 ){
+ // printStackTrace();
+ // log() << "Connecting to server " << _serverString << " timeout " << _so_timeout << endl;
+ // }
if ( !p->connect(*server) ) {
stringstream ss;
ss << "couldn't connect to server " << _serverString;
errmsg = ss.str();
- failed = true;
+ _failed = true;
return false;
}
+
+#ifdef MONGO_SSL
+ if ( cmdLine.sslOnNormalPorts ) {
+ p->secure( sslManager() );
+ }
+#endif
+
return true;
}
void DBClientConnection::_checkConnection() {
- if ( !failed )
+ if ( !_failed )
return;
if ( lastReconnectTry && time(0)-lastReconnectTry < 2 ) {
// we wait a little before reconnect attempt to avoid constant hammering.
// but we throw we don't want to try to use a connection in a bad state
- throw SocketException(SocketException::FAILED_STATE);
+ throw SocketException( SocketException::FAILED_STATE , toString() );
}
if ( !autoReconnect )
- throw SocketException(SocketException::FAILED_STATE);
+ throw SocketException( SocketException::FAILED_STATE , toString() );
lastReconnectTry = time(0);
log(_logLevel) << "trying reconnect to " << _serverString << endl;
string errmsg;
- failed = false;
+ _failed = false;
if ( ! _connect(errmsg) ) {
- failed = true;
+ _failed = true;
log(_logLevel) << "reconnect " << _serverString << " failed " << errmsg << endl;
- throw SocketException(SocketException::CONNECT_ERROR);
+ throw SocketException( SocketException::CONNECT_ERROR , toString() );
}
log(_logLevel) << "reconnect " << _serverString << " ok" << endl;
@@ -675,7 +702,7 @@ namespace mongo {
/* connection CANNOT be used anymore as more data may be on the way from the server.
we have to reconnect.
*/
- failed = true;
+ _failed = true;
p->shutdown();
throw;
}
@@ -683,12 +710,11 @@ namespace mongo {
return n;
}
- void DBClientBase::insert( const string & ns , BSONObj obj ) {
+ void DBClientBase::insert( const string & ns , BSONObj obj , int flags) {
Message toSend;
BufBuilder b;
- int opts = 0;
- b.appendNum( opts );
+ b.appendNum( flags );
b.appendStr( ns );
obj.appendSelfToBufBuilder( b );
@@ -697,12 +723,11 @@ namespace mongo {
say( toSend );
}
- void DBClientBase::insert( const string & ns , const vector< BSONObj > &v ) {
+ void DBClientBase::insert( const string & ns , const vector< BSONObj > &v , int flags) {
Message toSend;
BufBuilder b;
- int opts = 0;
- b.appendNum( opts );
+ b.appendNum( flags );
b.appendStr( ns );
for( vector< BSONObj >::const_iterator i = v.begin(); i != v.end(); ++i )
i->appendSelfToBufBuilder( b );
@@ -750,8 +775,12 @@ namespace mongo {
toSend.setData( dbUpdate , b.buf() , b.len() );
say( toSend );
+
+
}
+
+
auto_ptr<DBClientCursor> DBClientWithCommands::getIndexes( const string &ns ) {
return query( Namespace( ns.c_str() ).getSisterNS( "system.indexes" ).c_str() , BSON( "ns" << ns ) );
}
@@ -816,7 +845,7 @@ namespace mongo {
return ss.str();
}
- bool DBClientWithCommands::ensureIndex( const string &ns , BSONObj keys , bool unique, const string & name , bool cache ) {
+ bool DBClientWithCommands::ensureIndex( const string &ns , BSONObj keys , bool unique, const string & name , bool cache, bool background, int version ) {
BSONObjBuilder toSave;
toSave.append( "ns" , ns );
toSave.append( "key" , keys );
@@ -834,9 +863,15 @@ namespace mongo {
cacheKey += nn;
}
+ if( version >= 0 )
+ toSave.append("v", version);
+
if ( unique )
toSave.appendBool( "unique", unique );
+ if( background )
+ toSave.appendBool( "background", true );
+
if ( _seenIndexes.count( cacheKey ) )
return 0;
@@ -874,13 +909,13 @@ namespace mongo {
toSend.setData(dbQuery, b.buf(), b.len());
}
- void DBClientConnection::say( Message &toSend ) {
+ void DBClientConnection::say( Message &toSend, bool isRetry ) {
checkConnection();
try {
port().say( toSend );
}
catch( SocketException & ) {
- failed = true;
+ _failed = true;
throw;
}
}
@@ -889,8 +924,8 @@ namespace mongo {
port().piggyBack( toSend );
}
- void DBClientConnection::recv( Message &m ) {
- port().recv(m);
+ bool DBClientConnection::recv( Message &m ) {
+ return port().recv(m);
}
bool DBClientConnection::call( Message &toSend, Message &response, bool assertOk , string * actualServer ) {
@@ -900,7 +935,7 @@ namespace mongo {
*/
try {
if ( !port().call(toSend, response) ) {
- failed = true;
+ _failed = true;
if ( assertOk )
uasserted( 10278 , str::stream() << "dbclient error communicating with server: " << getServerAddress() );
@@ -908,21 +943,46 @@ namespace mongo {
}
}
catch( SocketException & ) {
- failed = true;
+ _failed = true;
throw;
}
return true;
}
- void DBClientConnection::checkResponse( const char *data, int nReturned ) {
+ BSONElement getErrField(const BSONObj& o) {
+ BSONElement first = o.firstElement();
+ if( strcmp(first.fieldName(), "$err") == 0 )
+ return first;
+
+ // temp - will be DEV only later
+ /*DEV*/
+ if( 1 ) {
+ BSONElement e = o["$err"];
+ if( !e.eoo() ) {
+ wassert(false);
+ }
+ return e;
+ }
+
+ return BSONElement();
+ }
+
+ bool hasErrField( const BSONObj& o ){
+ return ! getErrField( o ).eoo();
+ }
+
+ void DBClientConnection::checkResponse( const char *data, int nReturned, bool* retry, string* host ) {
/* check for errors. the only one we really care about at
* this stage is "not master"
*/
+ *retry = false;
+ *host = _serverString;
+
if ( clientSet && nReturned ) {
assert(data);
BSONObj o(data);
- BSONElement e = o["$err"];
+ BSONElement e = getErrField(o);
if ( e.type() == String && str::contains( e.valuestr() , "not master" ) ) {
clientSet->isntMaster();
}
@@ -930,7 +990,7 @@ namespace mongo {
}
void DBClientConnection::killCursor( long long cursorId ) {
- BufBuilder b;
+ StackBufBuilder b;
b.appendNum( (int)0 ); // reserved
b.appendNum( (int)1 ); // number
b.appendNum( cursorId );
@@ -944,6 +1004,19 @@ namespace mongo {
say(m);
}
+#ifdef MONGO_SSL
+ SSLManager* DBClientConnection::sslManager() {
+ if ( _sslManager )
+ return _sslManager;
+
+ SSLManager* s = new SSLManager(true);
+ _sslManager = s;
+ return s;
+ }
+
+ SSLManager* DBClientConnection::_sslManager = 0;
+#endif
+
AtomicUInt DBClientConnection::_numConnections;
bool DBClientConnection::_lazyKillCursor = true;
diff --git a/client/dbclient.h b/client/dbclient.h
index 9bc71fd..2b4bb85 100644
--- a/client/dbclient.h
+++ b/client/dbclient.h
@@ -1,4 +1,7 @@
-/** @file dbclient.h - connect to a Mongo database as a database, from C++ */
+/** @file dbclient.h
+
+ Core MongoDB C++ driver interfaces are defined here.
+*/
/* Copyright 2009 10gen Inc.
*
@@ -18,7 +21,8 @@
#pragma once
#include "../pch.h"
-#include "../util/message.h"
+#include "../util/net/message.h"
+#include "../util/net/message_port.h"
#include "../db/jsobj.h"
#include "../db/json.h"
#include <stack>
@@ -100,6 +104,15 @@ namespace mongo {
RemoveOption_Broadcast = 1 << 1
};
+
+ /**
+ * need to put in DbMesssage::ReservedOptions as well
+ */
+ enum InsertOptions {
+ /** With muli-insert keep processing inserts if one fails */
+ InsertOption_ContinueOnError = 1 << 0
+ };
+
class DBClientBase;
/**
@@ -174,7 +187,7 @@ namespace mongo {
string toString() const { return _string; }
- DBClientBase* connect( string& errmsg ) const;
+ DBClientBase* connect( string& errmsg, double socketTimeout = 0 ) const;
string getSetName() const { return _setName; }
@@ -296,7 +309,7 @@ namespace mongo {
Query& where(const string &jscode) { return where(jscode, BSONObj()); }
/**
- * if this query has an orderby, hint, or some other field
+ * @return true if this query has an orderby, hint, or some other field
*/
bool isComplex( bool * hasDollar = 0 ) const;
@@ -332,12 +345,15 @@ namespace mongo {
virtual ~DBConnector() {}
/** actualServer is set to the actual server where they call went if there was a choice (SlaveOk) */
virtual bool call( Message &toSend, Message &response, bool assertOk=true , string * actualServer = 0 ) = 0;
- virtual void say( Message &toSend ) = 0;
+ virtual void say( Message &toSend, bool isRetry = false ) = 0;
virtual void sayPiggyBack( Message &toSend ) = 0;
- virtual void checkResponse( const char* data, int nReturned ) {}
-
/* used by QueryOption_Exhaust. To use that your subclass must implement this. */
- virtual void recv( Message& m ) { assert(false); }
+ virtual bool recv( Message& m ) { assert(false); return false; }
+ // In general, for lazy queries, we'll need to say, recv, then checkResponse
+ virtual void checkResponse( const char* data, int nReturned, bool* retry = NULL, string* targetHost = NULL ) {
+ if( retry ) *retry = false; if( targetHost ) *targetHost = "";
+ }
+ virtual bool lazySupported() const = 0;
};
/**
@@ -348,12 +364,9 @@ namespace mongo {
virtual auto_ptr<DBClientCursor> query(const string &ns, Query query, int nToReturn = 0, int nToSkip = 0,
const BSONObj *fieldsToReturn = 0, int queryOptions = 0 , int batchSize = 0 ) = 0;
- /** don't use this - called automatically by DBClientCursor for you */
- virtual auto_ptr<DBClientCursor> getMore( const string &ns, long long cursorId, int nToReturn = 0, int options = 0 ) = 0;
-
- virtual void insert( const string &ns, BSONObj obj ) = 0;
+ virtual void insert( const string &ns, BSONObj obj , int flags=0) = 0;
- virtual void insert( const string &ns, const vector< BSONObj >& v ) = 0;
+ virtual void insert( const string &ns, const vector< BSONObj >& v , int flags=0) = 0;
virtual void remove( const string &ns , Query query, bool justOne = 0 ) = 0;
@@ -367,8 +380,15 @@ namespace mongo {
*/
virtual BSONObj findOne(const string &ns, const Query& query, const BSONObj *fieldsToReturn = 0, int queryOptions = 0);
+ /** query N objects from the database into an array. makes sense mostly when you want a small number of results. if a huge number, use
+ query() and iterate the cursor.
+ */
+ void findN(vector<BSONObj>& out, const string&ns, Query query, int nToReturn, int nToSkip = 0, const BSONObj *fieldsToReturn = 0, int queryOptions = 0);
+
virtual string getServerAddress() const = 0;
+ /** don't use this - called automatically by DBClientCursor for you */
+ virtual auto_ptr<DBClientCursor> getMore( const string &ns, long long cursorId, int nToReturn = 0, int options = 0 ) = 0;
};
/**
@@ -449,15 +469,19 @@ namespace mongo {
*/
bool createCollection(const string &ns, long long size = 0, bool capped = false, int max = 0, BSONObj *info = 0);
- /** Get error result from the last operation on this connection.
+ /** Get error result from the last write operation (insert/update/delete) on this connection.
@return error message text, or empty string if no error.
*/
string getLastError();
- /** Get error result from the last operation on this connection.
+
+ /** Get error result from the last write operation (insert/update/delete) on this connection.
@return full error object.
*/
virtual BSONObj getLastErrorDetailed();
+ /** Can be called with the returned value from getLastErrorDetailed to extract an error string.
+ If all you need is the string, just call getLastError() instead.
+ */
static string getLastErrorString( const BSONObj& res );
/** Return the last error which has occurred, even if not the very last operation.
@@ -640,13 +664,15 @@ namespace mongo {
@param ns collection to be indexed
@param keys the "key pattern" for the index. e.g., { name : 1 }
@param unique if true, indicates that key uniqueness should be enforced for this index
- @param name if not isn't specified, it will be created from the keys (recommended)
+ @param name if not specified, it will be created from the keys automatically (which is recommended)
@param cache if set to false, the index cache for the connection won't remember this call
+ @param background build index in the background (see mongodb docs/wiki for details)
+ @param v index version. leave at default value. (unit tests set this parameter.)
@return whether or not sent message to db.
should be true on first call, false on subsequent unless resetIndexCache was called
*/
virtual bool ensureIndex( const string &ns , BSONObj keys , bool unique = false, const string &name = "",
- bool cache = true );
+ bool cache = true, bool background = false, int v = -1 );
/**
clears the index cache, so the subsequent call to ensureIndex for any index will go to the server
@@ -748,12 +774,12 @@ namespace mongo {
/**
insert an object into the database
*/
- virtual void insert( const string &ns , BSONObj obj );
+ virtual void insert( const string &ns , BSONObj obj , int flags=0);
/**
insert a vector of objects into the database
*/
- virtual void insert( const string &ns, const vector< BSONObj >& v );
+ virtual void insert( const string &ns, const vector< BSONObj >& v , int flags=0);
/**
remove matching objects from the database
@@ -772,9 +798,10 @@ namespace mongo {
virtual bool callRead( Message& toSend , Message& response ) = 0;
// virtual bool callWrite( Message& toSend , Message& response ) = 0; // TODO: add this if needed
- virtual void say( Message& toSend ) = 0;
-
+
virtual ConnectionString::ConnectionType type() const = 0;
+
+ virtual double getSoTimeout() const = 0;
}; // DBClientBase
@@ -798,7 +825,7 @@ namespace mongo {
Connect timeout is fixed, but short, at 5 seconds.
*/
DBClientConnection(bool _autoReconnect=false, DBClientReplicaSet* cp=0, double so_timeout=0) :
- clientSet(cp), failed(false), autoReconnect(_autoReconnect), lastReconnectTry(0), _so_timeout(so_timeout) {
+ clientSet(cp), _failed(false), autoReconnect(_autoReconnect), lastReconnectTry(0), _so_timeout(so_timeout) {
_numConnections++;
}
@@ -869,14 +896,14 @@ namespace mongo {
@return true if this connection is currently in a failed state. When autoreconnect is on,
a connection will transition back to an ok state after reconnecting.
*/
- bool isFailed() const { return failed; }
+ bool isFailed() const { return _failed; }
- MessagingPort& port() { return *p; }
+ MessagingPort& port() { assert(p); return *p; }
string toStringLong() const {
stringstream ss;
ss << _serverString;
- if ( failed ) ss << " failed";
+ if ( _failed ) ss << " failed";
return ss.str();
}
@@ -887,11 +914,15 @@ namespace mongo {
virtual void killCursor( long long cursorID );
virtual bool callRead( Message& toSend , Message& response ) { return call( toSend , response ); }
- virtual void say( Message &toSend );
+ virtual void say( Message &toSend, bool isRetry = false );
+ virtual bool recv( Message& m );
+ virtual void checkResponse( const char *data, int nReturned, bool* retry = NULL, string* host = NULL );
virtual bool call( Message &toSend, Message &response, bool assertOk = true , string * actualServer = 0 );
virtual ConnectionString::ConnectionType type() const { return ConnectionString::MASTER; }
- virtual void checkResponse( const char *data, int nReturned );
void setSoTimeout(double to) { _so_timeout = to; }
+ double getSoTimeout() const { return _so_timeout; }
+
+ virtual bool lazySupported() const { return true; }
static int getNumConnections() {
return _numConnections;
@@ -899,16 +930,15 @@ namespace mongo {
static void setLazyKillCursor( bool lazy ) { _lazyKillCursor = lazy; }
static bool getLazyKillCursor() { return _lazyKillCursor; }
-
+
protected:
friend class SyncClusterConnection;
- virtual void recv( Message& m );
virtual void sayPiggyBack( Message &toSend );
DBClientReplicaSet *clientSet;
boost::scoped_ptr<MessagingPort> p;
boost::scoped_ptr<SockAddr> server;
- bool failed;
+ bool _failed;
const bool autoReconnect;
time_t lastReconnectTry;
HostAndPort _server; // remember for reconnects
@@ -916,7 +946,7 @@ namespace mongo {
void _checkConnection();
// throws SocketException if in failed state and not reconnecting or if waiting to reconnect
- void checkConnection() { if( failed ) _checkConnection(); }
+ void checkConnection() { if( _failed ) _checkConnection(); }
map< string, pair<string,string> > authCache;
double _so_timeout;
@@ -924,6 +954,11 @@ namespace mongo {
static AtomicUInt _numConnections;
static bool _lazyKillCursor; // lazy means we piggy back kill cursors on next op
+
+#ifdef MONGO_SSL
+ static SSLManager* sslManager();
+ static SSLManager* _sslManager;
+#endif
};
/** pings server to check if it's up
@@ -932,6 +967,9 @@ namespace mongo {
DBClientBase * createDirectClient();
+ BSONElement getErrField( const BSONObj& result );
+ bool hasErrField( const BSONObj& result );
+
} // namespace mongo
#include "dbclientcursor.h"
diff --git a/client/dbclient_rs.cpp b/client/dbclient_rs.cpp
index 37f6225..2cab1f7 100644
--- a/client/dbclient_rs.cpp
+++ b/client/dbclient_rs.cpp
@@ -54,9 +54,9 @@ namespace mongo {
void run() {
log() << "starting" << endl;
while ( ! inShutdown() ) {
- sleepsecs( 20 );
+ sleepsecs( 10 );
try {
- ReplicaSetMonitor::checkAll();
+ ReplicaSetMonitor::checkAll( true );
}
catch ( std::exception& e ) {
error() << "check failed: " << e.what() << endl;
@@ -99,17 +99,14 @@ namespace mongo {
}
_nodes.push_back( Node( servers[i] , conn.release() ) );
-
+
+ int myLoc = _nodes.size() - 1;
string maybePrimary;
- if (_checkConnection( _nodes[_nodes.size()-1].conn , maybePrimary, false)) {
- break;
- }
+ _checkConnection( _nodes[myLoc].conn.get() , maybePrimary, false, myLoc );
}
}
ReplicaSetMonitor::~ReplicaSetMonitor() {
- for ( unsigned i=0; i<_nodes.size(); i++ )
- delete _nodes[i].conn;
_nodes.clear();
_master = -1;
}
@@ -125,7 +122,16 @@ namespace mongo {
return m;
}
- void ReplicaSetMonitor::checkAll() {
+ ReplicaSetMonitorPtr ReplicaSetMonitor::get( const string& name ) {
+ scoped_lock lk( _setsLock );
+ map<string,ReplicaSetMonitorPtr>::const_iterator i = _sets.find( name );
+ if ( i == _sets.end() )
+ return ReplicaSetMonitorPtr();
+ return i->second;
+ }
+
+
+ void ReplicaSetMonitor::checkAll( bool checkAllSecondaries ) {
set<string> seen;
while ( true ) {
@@ -146,7 +152,7 @@ namespace mongo {
if ( ! m )
break;
- m->check();
+ m->check( checkAllSecondaries );
}
@@ -202,7 +208,7 @@ namespace mongo {
return _nodes[_master].addr;
}
- _check();
+ _check( false );
scoped_lock lk( _lock );
uassert( 10009 , str::stream() << "ReplicaSetMonitor no master found for set: " << _name , _master >= 0 );
@@ -210,34 +216,70 @@ namespace mongo {
}
HostAndPort ReplicaSetMonitor::getSlave( const HostAndPort& prev ) {
- // make sure its valid
- if ( prev.port() > 0 ) {
+        // make sure it's valid
+
+ bool wasFound = false;
+
+ // This is always true, since checked in port()
+ assert( prev.port() >= 0 );
+ if( prev.host().size() ){
scoped_lock lk( _lock );
for ( unsigned i=0; i<_nodes.size(); i++ ) {
if ( prev != _nodes[i].addr )
continue;
- if ( _nodes[i].ok )
+ wasFound = true;
+
+ if ( _nodes[i].okForSecondaryQueries() )
return prev;
+
break;
}
}
+ if( prev.host().size() ){
+ if( wasFound ){ LOG(1) << "slave '" << prev << "' is no longer ok to use" << endl; }
+ else{ LOG(1) << "slave '" << prev << "' was not found in the replica set" << endl; }
+ }
+ else LOG(1) << "slave '" << prev << "' is not initialized or invalid" << endl;
+
return getSlave();
}
HostAndPort ReplicaSetMonitor::getSlave() {
- scoped_lock lk( _lock );
- for ( unsigned i=0; i<_nodes.size(); i++ ) {
- _nextSlave = ( _nextSlave + 1 ) % _nodes.size();
- if ( _nextSlave == _master )
- continue;
- if ( _nodes[ _nextSlave ].ok )
- return _nodes[ _nextSlave ].addr;
+ LOG(2) << "selecting new slave from replica set " << getServerAddress() << endl;
+
+        // Logic is to retry three times for any secondary node; if we can't find any secondary,
+        // we'll take any "ok" node
+ // TODO: Could this query hidden nodes?
+ const int MAX = 3;
+ for ( int xxx=0; xxx<MAX; xxx++ ) {
+
+ {
+ scoped_lock lk( _lock );
+
+ unsigned i = 0;
+ for ( ; i<_nodes.size(); i++ ) {
+ _nextSlave = ( _nextSlave + 1 ) % _nodes.size();
+ if ( _nextSlave == _master ){
+ LOG(2) << "not selecting " << _nodes[_nextSlave] << " as it is the current master" << endl;
+ continue;
+ }
+ if ( _nodes[ _nextSlave ].okForSecondaryQueries() || ( _nodes[ _nextSlave ].ok && ( xxx + 1 ) >= MAX ) )
+ return _nodes[ _nextSlave ].addr;
+
+ LOG(2) << "not selecting " << _nodes[_nextSlave] << " as it is not ok to use" << endl;
+ }
+
+ }
+
+ check(false);
}
+
+ LOG(2) << "no suitable slave nodes found, returning default node " << _nodes[ 0 ] << endl;
- return _nodes[ 0 ].addr;
+ return _nodes[0].addr;
}
/**
@@ -266,7 +308,7 @@ namespace mongo {
string host = member["name"].String();
int m = -1;
- if ((m = _find(host)) <= 0) {
+ if ((m = _find(host)) < 0) {
continue;
}
@@ -309,16 +351,34 @@ namespace mongo {
- bool ReplicaSetMonitor::_checkConnection( DBClientConnection * c , string& maybePrimary , bool verbose ) {
+ bool ReplicaSetMonitor::_checkConnection( DBClientConnection * c , string& maybePrimary , bool verbose , int nodesOffset ) {
scoped_lock lk( _checkConnectionLock );
bool isMaster = false;
bool changed = false;
try {
+ Timer t;
BSONObj o;
c->isMaster(isMaster, &o);
+
+ if ( o["setName"].type() != String || o["setName"].String() != _name ) {
+ warning() << "node: " << c->getServerAddress() << " isn't a part of set: " << _name
+ << " ismaster: " << o << endl;
+ if ( nodesOffset >= 0 )
+ _nodes[nodesOffset].ok = false;
+ return false;
+ }
- log( ! verbose ) << "ReplicaSetMonitor::_checkConnection: " << c->toString() << ' ' << o << endl;
+ if ( nodesOffset >= 0 ) {
+ _nodes[nodesOffset].pingTimeMillis = t.millis();
+ _nodes[nodesOffset].hidden = o["hidden"].trueValue();
+ _nodes[nodesOffset].secondary = o["secondary"].trueValue();
+ _nodes[nodesOffset].ismaster = o["ismaster"].trueValue();
+
+ _nodes[nodesOffset].lastIsMaster = o.copy();
+ }
+ log( ! verbose ) << "ReplicaSetMonitor::_checkConnection: " << c->toString() << ' ' << o << endl;
+
// add other nodes
if ( o["hosts"].type() == Array ) {
if ( o["primary"].type() == String )
@@ -329,11 +389,14 @@ namespace mongo {
if (o.hasField("passives") && o["passives"].type() == Array) {
_checkHosts(o["passives"].Obj(), changed);
}
-
+
_checkStatus(c);
+
+
}
catch ( std::exception& e ) {
log( ! verbose ) << "ReplicaSetMonitor::_checkConnection: caught exception " << c->toString() << ' ' << e.what() << endl;
+ _nodes[nodesOffset].ok = false;
}
if ( changed && _hook )
@@ -342,24 +405,28 @@ namespace mongo {
return isMaster;
}
- void ReplicaSetMonitor::_check() {
+ void ReplicaSetMonitor::_check( bool checkAllSecondaries ) {
bool triedQuickCheck = false;
LOG(1) << "_check : " << getServerAddress() << endl;
+ int newMaster = -1;
+
for ( int retry = 0; retry < 2; retry++ ) {
for ( unsigned i=0; i<_nodes.size(); i++ ) {
- DBClientConnection * c;
+ shared_ptr<DBClientConnection> c;
{
scoped_lock lk( _lock );
c = _nodes[i].conn;
}
string maybePrimary;
- if ( _checkConnection( c , maybePrimary , retry ) ) {
+ if ( _checkConnection( c.get() , maybePrimary , retry , i ) ) {
_master = i;
- return;
+ newMaster = i;
+ if ( ! checkAllSecondaries )
+ return;
}
if ( ! triedQuickCheck && maybePrimary.size() ) {
@@ -367,36 +434,44 @@ namespace mongo {
if ( x >= 0 ) {
triedQuickCheck = true;
string dummy;
- DBClientConnection * testConn;
+ shared_ptr<DBClientConnection> testConn;
{
scoped_lock lk( _lock );
testConn = _nodes[x].conn;
}
- if ( _checkConnection( testConn , dummy , false ) ) {
+ if ( _checkConnection( testConn.get() , dummy , false , x ) ) {
_master = x;
- return;
+ newMaster = x;
+ if ( ! checkAllSecondaries )
+ return;
}
}
}
}
+
+ if ( newMaster >= 0 )
+ return;
+
sleepsecs(1);
}
}
- void ReplicaSetMonitor::check() {
+ void ReplicaSetMonitor::check( bool checkAllSecondaries ) {
// first see if the current master is fine
if ( _master >= 0 ) {
string temp;
- if ( _checkConnection( _nodes[_master].conn , temp , false ) ) {
- // current master is fine, so we're done
- return;
+ if ( _checkConnection( _nodes[_master].conn.get() , temp , false , _master ) ) {
+ if ( ! checkAllSecondaries ) {
+ // current master is fine, so we're done
+ return;
+ }
}
}
// we either have no master, or the current is dead
- _check();
+ _check( checkAllSecondaries );
}
int ReplicaSetMonitor::_find( const string& server ) const {
@@ -419,7 +494,26 @@ namespace mongo {
return i;
return -1;
}
-
+
+ void ReplicaSetMonitor::appendInfo( BSONObjBuilder& b ) const {
+ scoped_lock lk( _lock );
+ BSONArrayBuilder hosts( b.subarrayStart( "hosts" ) );
+ for ( unsigned i=0; i<_nodes.size(); i++ ) {
+ hosts.append( BSON( "addr" << _nodes[i].addr <<
+ // "lastIsMaster" << _nodes[i].lastIsMaster << // this is a potential race, so only used when debugging
+ "ok" << _nodes[i].ok <<
+ "ismaster" << _nodes[i].ismaster <<
+ "hidden" << _nodes[i].hidden <<
+ "secondary" << _nodes[i].secondary <<
+ "pingTimeMillis" << _nodes[i].pingTimeMillis ) );
+
+ }
+ hosts.done();
+
+ b.append( "master" , _master );
+ b.append( "nextSlave" , _nextSlave );
+ }
+
mongo::mutex ReplicaSetMonitor::_setsLock( "ReplicaSetMonitor" );
map<string,ReplicaSetMonitorPtr> ReplicaSetMonitor::_sets;
@@ -428,8 +522,9 @@ namespace mongo {
// ----- DBClientReplicaSet ---------
// --------------------------------
- DBClientReplicaSet::DBClientReplicaSet( const string& name , const vector<HostAndPort>& servers )
- : _monitor( ReplicaSetMonitor::get( name , servers ) ) {
+ DBClientReplicaSet::DBClientReplicaSet( const string& name , const vector<HostAndPort>& servers, double so_timeout )
+ : _monitor( ReplicaSetMonitor::get( name , servers ) ),
+ _so_timeout( so_timeout ) {
}
DBClientReplicaSet::~DBClientReplicaSet() {
@@ -446,7 +541,7 @@ namespace mongo {
}
_masterHost = _monitor->getMaster();
- _master.reset( new DBClientConnection( true , this ) );
+ _master.reset( new DBClientConnection( true , this , _so_timeout ) );
string errmsg;
if ( ! _master->connect( _masterHost , errmsg ) ) {
_monitor->notifyFailure( _masterHost );
@@ -464,12 +559,12 @@ namespace mongo {
return _slave.get();
_monitor->notifySlaveFailure( _slaveHost );
_slaveHost = _monitor->getSlave();
- }
+ }
else {
_slaveHost = h;
}
- _slave.reset( new DBClientConnection( true , this ) );
+ _slave.reset( new DBClientConnection( true , this , _so_timeout ) );
_slave->connect( _slaveHost );
_auth( _slave.get() );
return _slave.get();
@@ -522,12 +617,12 @@ namespace mongo {
// ------------- simple functions -----------------
- void DBClientReplicaSet::insert( const string &ns , BSONObj obj ) {
- checkMaster()->insert(ns, obj);
+ void DBClientReplicaSet::insert( const string &ns , BSONObj obj , int flags) {
+ checkMaster()->insert(ns, obj, flags);
}
- void DBClientReplicaSet::insert( const string &ns, const vector< BSONObj >& v ) {
- checkMaster()->insert(ns, v);
+ void DBClientReplicaSet::insert( const string &ns, const vector< BSONObj >& v , int flags) {
+ checkMaster()->insert(ns, v, flags);
}
void DBClientReplicaSet::remove( const string &ns , Query obj , bool justOne ) {
@@ -545,12 +640,12 @@ namespace mongo {
// we're ok sending to a slave
// we'll try 2 slaves before just using master
// checkSlave will try a different slave automatically after a failure
- for ( int i=0; i<2; i++ ) {
+ for ( int i=0; i<3; i++ ) {
try {
return checkSlaveQueryResult( checkSlave()->query(ns,query,nToReturn,nToSkip,fieldsToReturn,queryOptions,batchSize) );
}
catch ( DBException &e ) {
- log() << "can't query replica set slave " << i << " : " << _slaveHost << e.what() << endl;
+ LOG(1) << "can't query replica set slave " << i << " : " << _slaveHost << causedBy( e ) << endl;
}
}
}
@@ -563,12 +658,12 @@ namespace mongo {
// we're ok sending to a slave
// we'll try 2 slaves before just using master
// checkSlave will try a different slave automatically after a failure
- for ( int i=0; i<2; i++ ) {
+ for ( int i=0; i<3; i++ ) {
try {
return checkSlave()->findOne(ns,query,fieldsToReturn,queryOptions);
}
catch ( DBException &e ) {
- LOG(1) << "can't findone replica set slave " << i << " : " << _slaveHost << e.what() << endl;
+ LOG(1) << "can't findone replica set slave " << i << " : " << _slaveHost << causedBy( e ) << endl;
}
}
}
@@ -584,23 +679,22 @@ namespace mongo {
assert(0);
}
- auto_ptr<DBClientCursor> DBClientReplicaSet::checkSlaveQueryResult( auto_ptr<DBClientCursor> result ){
+ void DBClientReplicaSet::isntMaster() {
+ log() << "got not master for: " << _masterHost << endl;
+ _monitor->notifyFailure( _masterHost );
+ _master.reset();
+ }
- bool isError = result->hasResultFlag( ResultFlag_ErrSet );
+ auto_ptr<DBClientCursor> DBClientReplicaSet::checkSlaveQueryResult( auto_ptr<DBClientCursor> result ){
+ BSONObj error;
+ bool isError = result->peekError( &error );
if( ! isError ) return result;
- BSONObj error = result->peekOne();
-
- BSONElement code = error["code"];
- if( code.eoo() || ! code.isNumber() ){
- warning() << "no code for error from secondary host " << _slaveHost << ", error was " << error << endl;
- return result;
- }
-
// We only check for "not master or secondary" errors here
// If the error code here ever changes, we need to change this code also
- if( code.Int() == 13436 /* not master or secondary */ ){
+ BSONElement code = error["code"];
+ if( code.isNumber() && code.Int() == 13436 /* not master or secondary */ ){
isntSecondary();
throw DBException( str::stream() << "slave " << _slaveHost.toString() << " is no longer secondary", 14812 );
}
@@ -615,20 +709,123 @@ namespace mongo {
_slave.reset();
}
+ void DBClientReplicaSet::say( Message& toSend, bool isRetry ) {
- void DBClientReplicaSet::isntMaster() {
- log() << "got not master for: " << _masterHost << endl;
- _monitor->notifyFailure( _masterHost );
- _master.reset();
+ if( ! isRetry )
+ _lazyState = LazyState();
+
+ int lastOp = -1;
+ bool slaveOk = false;
+
+ if ( ( lastOp = toSend.operation() ) == dbQuery ) {
+ // TODO: might be possible to do this faster by changing api
+ DbMessage dm( toSend );
+ QueryMessage qm( dm );
+ if ( ( slaveOk = ( qm.queryOptions & QueryOption_SlaveOk ) ) ) {
+
+ for ( int i = _lazyState._retries; i < 3; i++ ) {
+ try {
+ DBClientConnection* slave = checkSlave();
+ slave->say( toSend );
+
+ _lazyState._lastOp = lastOp;
+ _lazyState._slaveOk = slaveOk;
+ _lazyState._retries = i;
+ _lazyState._lastClient = slave;
+ return;
+ }
+ catch ( DBException &e ) {
+ LOG(1) << "can't callLazy replica set slave " << i << " : " << _slaveHost << causedBy( e ) << endl;
+ }
+ }
+ }
+ }
+
+ DBClientConnection* master = checkMaster();
+ master->say( toSend );
+
+ _lazyState._lastOp = lastOp;
+ _lazyState._slaveOk = slaveOk;
+ _lazyState._retries = 3;
+ _lazyState._lastClient = master;
+ return;
+ }
+
+ bool DBClientReplicaSet::recv( Message& m ) {
+
+ assert( _lazyState._lastClient );
+
+ // TODO: It would be nice if we could easily wrap a conn error as a result error
+ try {
+ return _lazyState._lastClient->recv( m );
+ }
+ catch( DBException& e ){
+ log() << "could not receive data from " << _lazyState._lastClient << causedBy( e ) << endl;
+ return false;
+ }
+ }
+
+ void DBClientReplicaSet::checkResponse( const char* data, int nReturned, bool* retry, string* targetHost ){
+
+ // For now, do exactly as we did before, so as not to break things. In general though, we
+ // should fix this so checkResponse has a more consistent contract.
+ if( ! retry ){
+ if( _lazyState._lastClient )
+ return _lazyState._lastClient->checkResponse( data, nReturned );
+ else
+ return checkMaster()->checkResponse( data, nReturned );
+ }
+
+ *retry = false;
+ if( targetHost && _lazyState._lastClient ) *targetHost = _lazyState._lastClient->getServerAddress();
+ else if (targetHost) *targetHost = "";
+
+ if( ! _lazyState._lastClient ) return;
+ if( nReturned != 1 && nReturned != -1 ) return;
+
+ BSONObj dataObj;
+ if( nReturned == 1 ) dataObj = BSONObj( data );
+
+ // Check if we should retry here
+ if( _lazyState._lastOp == dbQuery && _lazyState._slaveOk ){
+
+ // Check the error code for a slave not secondary error
+ if( nReturned == -1 ||
+ ( hasErrField( dataObj ) && ! dataObj["code"].eoo() && dataObj["code"].Int() == 13436 ) ){
+
+ bool wasMaster = false;
+ if( _lazyState._lastClient == _slave.get() ){
+ isntSecondary();
+ }
+ else if( _lazyState._lastClient == _master.get() ){
+ wasMaster = true;
+ isntMaster();
+ }
+ else
+ warning() << "passed " << dataObj << " but last rs client " << _lazyState._lastClient->toString() << " is not master or secondary" << endl;
+
+ if( _lazyState._retries < 3 ){
+ _lazyState._retries++;
+ *retry = true;
+ }
+ else{
+ (void)wasMaster; // silence set-but-not-used warning
+ // assert( wasMaster );
+ // printStackTrace();
+ log() << "too many retries (" << _lazyState._retries << "), could not get data from replica set" << endl;
+ }
+ }
+ }
}
+
bool DBClientReplicaSet::call( Message &toSend, Message &response, bool assertOk , string * actualServer ) {
if ( toSend.operation() == dbQuery ) {
// TODO: might be possible to do this faster by changing api
DbMessage dm( toSend );
QueryMessage qm( dm );
if ( qm.queryOptions & QueryOption_SlaveOk ) {
- for ( int i=0; i<2; i++ ) {
+ for ( int i=0; i<3; i++ ) {
try {
DBClientConnection* s = checkSlave();
if ( actualServer )
@@ -636,7 +833,7 @@ namespace mongo {
return s->call( toSend , response , assertOk );
}
catch ( DBException &e ) {
- LOG(1) << "can't call replica set slave " << i << " : " << _slaveHost << e.what() << endl;
+ LOG(1) << "can't call replica set slave " << i << " : " << _slaveHost << causedBy( e ) << endl;
if ( actualServer )
*actualServer = "";
}
diff --git a/client/dbclient_rs.h b/client/dbclient_rs.h
index 548b46a..b6948a0 100644
--- a/client/dbclient_rs.h
+++ b/client/dbclient_rs.h
@@ -1,4 +1,4 @@
-/** @file dbclient_rs.h - connect to a Replica Set, from C++ */
+/** @file dbclient_rs.h Connect to a Replica Set, from C++ */
/* Copyright 2009 10gen Inc.
*
@@ -43,10 +43,16 @@ namespace mongo {
static ReplicaSetMonitorPtr get( const string& name , const vector<HostAndPort>& servers );
/**
+         * gets a cached Monitor by name, or returns an empty pointer if none exists
+ */
+ static ReplicaSetMonitorPtr get( const string& name );
+
+
+ /**
* checks all sets for current master and new secondaries
* usually only called from a BackgroundJob
*/
- static void checkAll();
+ static void checkAll( bool checkAllSecondaries );
/**
* this is called whenever the config of any repclia set changes
@@ -81,13 +87,15 @@ namespace mongo {
/**
* checks for current master and new secondaries
*/
- void check();
+ void check( bool checkAllSecondaries );
string getName() const { return _name; }
string getServerAddress() const;
bool contains( const string& server ) const;
+
+ void appendInfo( BSONObjBuilder& b ) const;
private:
/**
@@ -98,7 +106,7 @@ namespace mongo {
*/
ReplicaSetMonitor( const string& name , const vector<HostAndPort>& servers );
- void _check();
+ void _check( bool checkAllSecondaries );
/**
* Use replSetGetStatus command to make sure hosts in host list are up
@@ -119,9 +127,10 @@ namespace mongo {
* @param c the connection to check
* @param maybePrimary OUT
* @param verbose
+ * @param nodesOffset - offset into _nodes array, -1 for not in it
* @return if the connection is good
*/
- bool _checkConnection( DBClientConnection * c , string& maybePrimary , bool verbose );
+ bool _checkConnection( DBClientConnection * c , string& maybePrimary , bool verbose , int nodesOffset );
int _find( const string& server ) const ;
int _find_inlock( const string& server ) const ;
@@ -132,14 +141,44 @@ namespace mongo {
string _name;
struct Node {
- Node( const HostAndPort& a , DBClientConnection* c ) : addr( a ) , conn(c) , ok(true) {}
+ Node( const HostAndPort& a , DBClientConnection* c )
+ : addr( a ) , conn(c) , ok(true) ,
+ ismaster(false), secondary( false ) , hidden( false ) , pingTimeMillis(0) {
+ }
+
+ bool okForSecondaryQueries() const {
+ return ok && secondary && ! hidden;
+ }
+
+ BSONObj toBSON() const {
+ return BSON( "addr" << addr.toString() <<
+ "isMaster" << ismaster <<
+ "secondary" << secondary <<
+ "hidden" << hidden <<
+ "ok" << ok );
+ }
+
+ string toString() const {
+ return toBSON().toString();
+ }
+
HostAndPort addr;
- DBClientConnection* conn;
+ shared_ptr<DBClientConnection> conn;
// if this node is in a failure state
// used for slave routing
// this is too simple, should make it better
bool ok;
+
+ // as reported by ismaster
+ BSONObj lastIsMaster;
+
+ bool ismaster;
+ bool secondary;
+ bool hidden;
+
+ int pingTimeMillis;
+
};
/**
@@ -168,7 +207,7 @@ namespace mongo {
public:
/** Call connect() after constructing. autoReconnect is always on for DBClientReplicaSet connections. */
- DBClientReplicaSet( const string& name , const vector<HostAndPort>& servers );
+ DBClientReplicaSet( const string& name , const vector<HostAndPort>& servers, double so_timeout=0 );
virtual ~DBClientReplicaSet();
/** Returns false if nomember of the set were reachable, or neither is
@@ -191,11 +230,11 @@ namespace mongo {
/** throws userassertion "no master found" */
virtual BSONObj findOne(const string &ns, const Query& query, const BSONObj *fieldsToReturn = 0, int queryOptions = 0);
- virtual void insert( const string &ns , BSONObj obj );
+ virtual void insert( const string &ns , BSONObj obj , int flags=0);
/** insert multiple objects. Note that single object insert is asynchronous, so this version
is only nominally faster and not worth a special effort to try to use. */
- virtual void insert( const string &ns, const vector< BSONObj >& v );
+ virtual void insert( const string &ns, const vector< BSONObj >& v , int flags=0);
virtual void remove( const string &ns , Query obj , bool justOne = 0 );
@@ -210,11 +249,14 @@ namespace mongo {
// ---- callback pieces -------
- virtual void checkResponse( const char *data, int nReturned ) { checkMaster()->checkResponse( data , nReturned ); }
+ virtual void say( Message &toSend, bool isRetry = false );
+ virtual bool recv( Message &toRecv );
+ virtual void checkResponse( const char* data, int nReturned, bool* retry = NULL, string* targetHost = NULL );
/* this is the callback from our underlying connections to notify us that we got a "not master" error.
*/
void isntMaster();
+
/* this is used to indicate we got a "not master or secondary" error from a secondary.
*/
void isntSecondary();
@@ -225,16 +267,18 @@ namespace mongo {
// ----- informational ----
+ double getSoTimeout() const { return _so_timeout; }
+
string toString() { return getServerAddress(); }
string getServerAddress() const { return _monitor->getServerAddress(); }
virtual ConnectionString::ConnectionType type() const { return ConnectionString::SET; }
+ virtual bool lazySupported() const { return true; }
// ---- low level ------
virtual bool call( Message &toSend, Message &response, bool assertOk=true , string * actualServer = 0 );
- virtual void say( Message &toSend ) { checkMaster()->say( toSend ); }
virtual bool callRead( Message& toSend , Message& response ) { return checkMaster()->callRead( toSend , response ); }
@@ -258,6 +302,8 @@ namespace mongo {
HostAndPort _slaveHost;
scoped_ptr<DBClientConnection> _slave;
+
+ double _so_timeout;
/**
* for storing authentication info
@@ -277,6 +323,22 @@ namespace mongo {
// this could be a security issue, as the password is stored in memory
// not sure if/how we should handle
list<AuthInfo> _auths;
+
+ protected:
+
+ /**
+ * for storing (non-threadsafe) information between lazy calls
+ */
+ class LazyState {
+ public:
+ LazyState() : _lastClient( NULL ), _lastOp( -1 ), _slaveOk( false ), _retries( 0 ) {}
+ DBClientConnection* _lastClient;
+ int _lastOp;
+ bool _slaveOk;
+ int _retries;
+
+ } _lazyState;
+
};
diff --git a/client/dbclientcursor.cpp b/client/dbclientcursor.cpp
index 6c6afc0..5db360e 100644
--- a/client/dbclientcursor.cpp
+++ b/client/dbclientcursor.cpp
@@ -37,8 +37,7 @@ namespace mongo {
return batchSize < nToReturn ? batchSize : nToReturn;
}
- bool DBClientCursor::init() {
- Message toSend;
+ void DBClientCursor::_assembleInit( Message& toSend ) {
if ( !cursorId ) {
assembleRequest( ns, query, nextBatchSize() , nToSkip, fieldsToReturn, opts, toSend );
}
@@ -50,12 +49,18 @@ namespace mongo {
b.appendNum( cursorId );
toSend.setData( dbGetMore, b.buf(), b.len() );
}
- if ( !_client->call( toSend, *m, false ) ) {
+ }
+
+ bool DBClientCursor::init() {
+ Message toSend;
+ _assembleInit( toSend );
+
+ if ( !_client->call( toSend, *b.m, false ) ) {
// log msg temp?
log() << "DBClientCursor::init call() failed" << endl;
return false;
}
- if ( m->empty() ) {
+ if ( b.m->empty() ) {
// log msg temp?
log() << "DBClientCursor::init message from call() was empty" << endl;
return false;
@@ -63,12 +68,41 @@ namespace mongo {
dataReceived();
return true;
}
+
+ void DBClientCursor::initLazy( bool isRetry ) {
+ verify( 15875 , _client->lazySupported() );
+ Message toSend;
+ _assembleInit( toSend );
+ _client->say( toSend, isRetry );
+ }
+
+ bool DBClientCursor::initLazyFinish( bool& retry ) {
+
+ bool recvd = _client->recv( *b.m );
+
+ // If we get a bad response, return false
+ if ( ! recvd || b.m->empty() ) {
+
+ if( !recvd )
+ log() << "DBClientCursor::init lazy say() failed" << endl;
+ if( b.m->empty() )
+ log() << "DBClientCursor::init message from say() was empty" << endl;
+
+ _client->checkResponse( NULL, -1, &retry, &_lazyHost );
+
+ return false;
+
+ }
+
+ dataReceived( retry, _lazyHost );
+ return ! retry;
+ }
void DBClientCursor::requestMore() {
- assert( cursorId && pos == nReturned );
+ assert( cursorId && b.pos == b.nReturned );
if (haveLimit) {
- nToReturn -= nReturned;
+ nToReturn -= b.nReturned;
assert(nToReturn > 0);
}
BufBuilder b;
@@ -83,7 +117,7 @@ namespace mongo {
if ( _client ) {
_client->call( toSend, *response );
- m = response;
+ this->b.m = response;
dataReceived();
}
else {
@@ -91,7 +125,7 @@ namespace mongo {
ScopedDbConnection conn( _scopedHost );
conn->call( toSend , *response );
_client = conn.get();
- m = response;
+ this->b.m = response;
dataReceived();
_client = 0;
conn.done();
@@ -100,19 +134,24 @@ namespace mongo {
/** with QueryOption_Exhaust, the server just blasts data at us (marked at end with cursorid==0). */
void DBClientCursor::exhaustReceiveMore() {
- assert( cursorId && pos == nReturned );
+ assert( cursorId && b.pos == b.nReturned );
assert( !haveLimit );
auto_ptr<Message> response(new Message());
assert( _client );
_client->recv(*response);
- m = response;
+ b.m = response;
dataReceived();
}
- void DBClientCursor::dataReceived() {
- QueryResult *qr = (QueryResult *) m->singleData();
+ void DBClientCursor::dataReceived( bool& retry, string& host ) {
+
+ QueryResult *qr = (QueryResult *) b.m->singleData();
resultFlags = qr->resultFlags();
+ if ( qr->resultFlags() & ResultFlag_ErrSet ) {
+ wasError = true;
+ }
+
if ( qr->resultFlags() & ResultFlag_CursorNotFound ) {
// cursor id no longer valid at the server.
assert( qr->cursorId == 0 );
@@ -127,11 +166,12 @@ namespace mongo {
cursorId = qr->cursorId;
}
- nReturned = qr->nReturned;
- pos = 0;
- data = qr->data();
+ b.nReturned = qr->nReturned;
+ b.pos = 0;
+ b.data = qr->data();
+
+ _client->checkResponse( b.data, b.nReturned, &retry, &host ); // watches for "not master"
- _client->checkResponse( data, nReturned );
/* this assert would fire the way we currently work:
assert( nReturned || cursorId == 0 );
*/
@@ -144,17 +184,17 @@ namespace mongo {
if ( !_putBack.empty() )
return true;
- if (haveLimit && pos >= nToReturn)
+ if (haveLimit && b.pos >= nToReturn)
return false;
- if ( pos < nReturned )
+ if ( b.pos < b.nReturned )
return true;
if ( cursorId == 0 )
return false;
requestMore();
- return pos < nReturned;
+ return b.pos < b.nReturned;
}
BSONObj DBClientCursor::next() {
@@ -165,11 +205,11 @@ namespace mongo {
return ret;
}
- uassert(13422, "DBClientCursor next() called but more() is false", pos < nReturned);
+ uassert(13422, "DBClientCursor next() called but more() is false", b.pos < b.nReturned);
- pos++;
- BSONObj o(data);
- data += o.objsize();
+ b.pos++;
+ BSONObj o(b.data);
+ b.data += o.objsize();
/* todo would be good to make data null at end of batch for safety */
return o;
}
@@ -187,9 +227,9 @@ namespace mongo {
}
*/
- int p = pos;
- const char *d = data;
- while( m && p < nReturned ) {
+ int p = b.pos;
+ const char *d = b.data;
+ while( m && p < b.nReturned ) {
BSONObj o(d);
d += o.objsize();
p++;
@@ -198,6 +238,19 @@ namespace mongo {
}
}
+ bool DBClientCursor::peekError(BSONObj* error){
+ if( ! wasError ) return false;
+
+ vector<BSONObj> v;
+ peek(v, 1);
+
+ assert( v.size() == 1 );
+ assert( hasErrField( v[0] ) );
+
+ if( error ) *error = v[0].getOwned();
+ return true;
+ }
+
void DBClientCursor::attach( AScopedConnection * conn ) {
assert( _scopedHost.size() == 0 );
assert( conn );
@@ -205,14 +258,20 @@ namespace mongo {
if ( conn->get()->type() == ConnectionString::SET ||
conn->get()->type() == ConnectionString::SYNC ) {
- _scopedHost = _client->getServerAddress();
+ if( _lazyHost.size() > 0 )
+ _scopedHost = _lazyHost;
+ else if( _client )
+ _scopedHost = _client->getServerAddress();
+ else
+ massert(14821, "No client or lazy client specified, cannot store multi-host connection.", false);
}
else {
_scopedHost = conn->getHost();
}
-
+
conn->done();
_client = 0;
+ _lazyHost = "";
}
DBClientCursor::~DBClientCursor() {
@@ -221,12 +280,12 @@ namespace mongo {
DESTRUCTOR_GUARD (
- if ( cursorId && _ownCursor ) {
- BufBuilder b;
- b.appendNum( (int)0 ); // reserved
+ if ( cursorId && _ownCursor && ! inShutdown() ) {
+ BufBuilder b;
+ b.appendNum( (int)0 ); // reserved
b.appendNum( (int)1 ); // number
b.appendNum( cursorId );
-
+
Message m;
m.setData( dbKillCursors , b.buf() , b.len() );
diff --git a/client/dbclientcursor.h b/client/dbclientcursor.h
index d176b89..977bd30 100644
--- a/client/dbclientcursor.h
+++ b/client/dbclientcursor.h
@@ -18,7 +18,7 @@
#pragma once
#include "../pch.h"
-#include "../util/message.h"
+#include "../util/net/message.h"
#include "../db/jsobj.h"
#include "../db/json.h"
#include <stack>
@@ -52,7 +52,7 @@ namespace mongo {
if you want to exhaust whatever data has been fetched to the client already but
then perhaps stop.
*/
- int objsLeftInBatch() const { _assertIfNull(); return _putBack.size() + nReturned - pos; }
+ int objsLeftInBatch() const { _assertIfNull(); return _putBack.size() + b.nReturned - b.pos; }
bool moreInCurrentBatch() { return objsLeftInBatch() > 0; }
/** next
@@ -71,11 +71,11 @@ namespace mongo {
/** throws AssertionException if get back { $err : ... } */
BSONObj nextSafe() {
BSONObj o = next();
- BSONElement e = o.firstElement();
- if( strcmp(e.fieldName(), "$err") == 0 ) {
+ if( strcmp(o.firstElementFieldName(), "$err") == 0 ) {
+ string s = "nextSafe(): " + o.toString();
if( logLevel >= 5 )
- log() << "nextSafe() error " << o.toString() << endl;
- uassert(13106, "nextSafe(): " + o.toString(), false);
+ log() << s << endl;
+ uasserted(13106, s);
}
return o;
}
@@ -86,11 +86,11 @@ namespace mongo {
WARNING: no support for _putBack yet!
*/
void peek(vector<BSONObj>&, int atMost);
- BSONObj peekOne(){
- vector<BSONObj> v;
- peek( v, 1 );
- return v.size() > 0 ? v[0] : BSONObj();
- }
+
+ /**
+ * peek ahead and see if an error occurred, and get the error if so.
+ */
+ bool peekError(BSONObj* error = NULL);
/**
iterate the rest of the cursor and return the number if items
@@ -109,13 +109,9 @@ namespace mongo {
'dead' may be preset yet some data still queued and locally
available from the dbclientcursor.
*/
- bool isDead() const {
- return !this || cursorId == 0;
- }
+ bool isDead() const { return !this || cursorId == 0; }
- bool tailable() const {
- return (opts & QueryOption_CursorTailable) != 0;
- }
+ bool tailable() const { return (opts & QueryOption_CursorTailable) != 0; }
/** see ResultFlagType (constants.h) for flag values
mostly these flags are for internal purposes -
@@ -137,12 +133,9 @@ namespace mongo {
fieldsToReturn(_fieldsToReturn),
opts(queryOptions),
batchSize(bs==1?2:bs),
- m(new Message()),
cursorId(),
- nReturned(),
- pos(),
- data(),
- _ownCursor( true ) {
+ _ownCursor( true ),
+ wasError( false ) {
}
DBClientCursor( DBClientBase* client, const string &_ns, long long _cursorId, int _nToReturn, int options ) :
@@ -151,11 +144,7 @@ namespace mongo {
nToReturn( _nToReturn ),
haveLimit( _nToReturn > 0 && !(options & QueryOption_CursorTailable)),
opts( options ),
- m(new Message()),
- cursorId( _cursorId ),
- nReturned(),
- pos(),
- data(),
+ cursorId(_cursorId),
_ownCursor( true ) {
}
@@ -170,11 +159,31 @@ namespace mongo {
void attach( AScopedConnection * conn );
+ /**
+ * actually does the query
+ */
+ bool init();
+
+ void initLazy( bool isRetry = false );
+ bool initLazyFinish( bool& retry );
+
+ class Batch : boost::noncopyable {
+ friend class DBClientCursor;
+ auto_ptr<Message> m;
+ int nReturned;
+ int pos;
+ const char *data;
+ public:
+ Batch() : m( new Message() ), nReturned(), pos(), data() { }
+ };
+
private:
friend class DBClientBase;
friend class DBClientConnection;
- bool init();
+
int nextBatchSize();
+
+ Batch b;
DBClientBase* _client;
string ns;
BSONObj query;
@@ -184,18 +193,18 @@ namespace mongo {
const BSONObj *fieldsToReturn;
int opts;
int batchSize;
- auto_ptr<Message> m;
stack< BSONObj > _putBack;
int resultFlags;
long long cursorId;
- int nReturned;
- int pos;
- const char *data;
- void dataReceived();
- void requestMore();
- void exhaustReceiveMore(); // for exhaust
bool _ownCursor; // see decouple()
string _scopedHost;
+ string _lazyHost;
+ bool wasError;
+
+ void dataReceived() { bool retry; string lazyHost; dataReceived( retry, lazyHost ); }
+ void dataReceived( bool& retry, string& lazyHost );
+ void requestMore();
+ void exhaustReceiveMore(); // for exhaust
// Don't call from a virtual function
void _assertIfNull() const { uassert(13348, "connection died", this); }
@@ -203,6 +212,9 @@ namespace mongo {
// non-copyable , non-assignable
DBClientCursor( const DBClientCursor& );
DBClientCursor& operator=( const DBClientCursor& );
+
+ // init pieces
+ void _assembleInit( Message& toSend );
};
/** iterate over objects in current batch only - will not cause a network call
diff --git a/client/distlock.cpp b/client/distlock.cpp
index 9ec98ea..cb71159 100644
--- a/client/distlock.cpp
+++ b/client/distlock.cpp
@@ -21,8 +21,7 @@
namespace mongo {
- static string lockPingNS = "config.lockpings";
- static string locksNS = "config.locks";
+ LabeledLevel DistributedLock::logLvl( 1 );
ThreadLocalValue<string> distLockIds("");
@@ -36,7 +35,7 @@ namespace mongo {
static void initModule() {
// cache process string
stringstream ss;
- ss << getHostName() << ":" << time(0) << ":" << rand();
+ ss << getHostName() << ":" << cmdLine.port << ":" << time(0) << ":" << rand();
_cachedProcessString = new string( ss.str() );
}
@@ -59,116 +58,406 @@ namespace mongo {
return s;
}
- void _distLockPingThread( ConnectionString addr ) {
- setThreadName( "LockPinger" );
-
- log() << "creating dist lock ping thread for: " << addr << endl;
- static int loops = 0;
- while( ! inShutdown() ) {
+ class DistributedLockPinger {
+ public:
- string process = getDistLockProcess();
- log(4) << "dist_lock about to ping for: " << process << endl;
+ DistributedLockPinger()
+ : _mutex( "DistributedLockPinger" ) {
+ }
- try {
- ScopedDbConnection conn( addr );
-
- // refresh the entry corresponding to this process in the lockpings collection
- conn->update( lockPingNS ,
- BSON( "_id" << process ) ,
- BSON( "$set" << BSON( "ping" << DATENOW ) ) ,
- true );
- string err = conn->getLastError();
- if ( ! err.empty() ) {
- warning() << "dist_lock process: " << process << " pinging: " << addr << " failed: "
- << err << endl;
- conn.done();
- sleepsecs(30);
- continue;
- }
+ void _distLockPingThread( ConnectionString addr, string process, unsigned long long sleepTime ) {
+
+ setThreadName( "LockPinger" );
+
+ string pingId = pingThreadId( addr, process );
+
+ log( DistributedLock::logLvl - 1 ) << "creating distributed lock ping thread for " << addr
+ << " and process " << process
+ << " (sleeping for " << sleepTime << "ms)" << endl;
+
+ static int loops = 0;
+ while( ! inShutdown() && ! shouldKill( addr, process ) ) {
+
+ log( DistributedLock::logLvl + 2 ) << "distributed lock pinger '" << pingId << "' about to ping." << endl;
+
+ Date_t pingTime;
+
+ try {
+ ScopedDbConnection conn( addr );
+
+ pingTime = jsTime();
- // remove really old entries from the lockpings collection if they're not holding a lock
- // (this may happen if an instance of a process was taken down and no new instance came up to
- // replace it for a quite a while)
- // if the lock is taken, the take-over mechanism should handle the situation
- auto_ptr<DBClientCursor> c = conn->query( locksNS , BSONObj() );
- vector<string> pids;
- while ( c->more() ) {
- BSONObj lock = c->next();
- if ( ! lock["process"].eoo() ) {
- pids.push_back( lock["process"].valuestrsafe() );
+ // refresh the entry corresponding to this process in the lockpings collection
+ conn->update( DistributedLock::lockPingNS ,
+ BSON( "_id" << process ) ,
+ BSON( "$set" << BSON( "ping" << pingTime ) ) ,
+ true );
+
+ string err = conn->getLastError();
+ if ( ! err.empty() ) {
+ warning() << "pinging failed for distributed lock pinger '" << pingId << "'."
+ << causedBy( err ) << endl;
+ conn.done();
+
+ // Sleep for normal ping time
+ sleepmillis(sleepTime);
+ continue;
+ }
+
+ // remove really old entries from the lockpings collection if they're not holding a lock
+ // (this may happen if an instance of a process was taken down and no new instance came up to
+ // replace it for a quite a while)
+ // if the lock is taken, the take-over mechanism should handle the situation
+ auto_ptr<DBClientCursor> c = conn->query( DistributedLock::locksNS , BSONObj() );
+ set<string> pids;
+ while ( c->more() ) {
+ BSONObj lock = c->next();
+ if ( ! lock["process"].eoo() ) {
+ pids.insert( lock["process"].valuestrsafe() );
+ }
+ }
+
+ Date_t fourDays = pingTime - ( 4 * 86400 * 1000 ); // 4 days
+ conn->remove( DistributedLock::lockPingNS , BSON( "_id" << BSON( "$nin" << pids ) << "ping" << LT << fourDays ) );
+ err = conn->getLastError();
+ if ( ! err.empty() ) {
+ warning() << "ping cleanup for distributed lock pinger '" << pingId << " failed."
+ << causedBy( err ) << endl;
+ conn.done();
+
+ // Sleep for normal ping time
+ sleepmillis(sleepTime);
+ continue;
+ }
+
+ // create index so remove is fast even with a lot of servers
+ if ( loops++ == 0 ) {
+ conn->ensureIndex( DistributedLock::lockPingNS , BSON( "ping" << 1 ) );
+ }
+
+ log( DistributedLock::logLvl - ( loops % 10 == 0 ? 1 : 0 ) ) << "cluster " << addr << " pinged successfully at " << pingTime
+ << " by distributed lock pinger '" << pingId
+ << "', sleeping for " << sleepTime << "ms" << endl;
+
+ // Remove old locks, if possible
+ // Make sure no one else is adding to this list at the same time
+ scoped_lock lk( _mutex );
+
+ int numOldLocks = _oldLockOIDs.size();
+ if( numOldLocks > 0 )
+ log( DistributedLock::logLvl - 1 ) << "trying to delete " << _oldLockOIDs.size() << " old lock entries for process " << process << endl;
+
+ bool removed = false;
+ for( list<OID>::iterator i = _oldLockOIDs.begin(); i != _oldLockOIDs.end();
+ i = ( removed ? _oldLockOIDs.erase( i ) : ++i ) ) {
+ removed = false;
+ try {
+ // Got OID from lock with id, so we don't need to specify id again
+ conn->update( DistributedLock::locksNS ,
+ BSON( "ts" << *i ),
+ BSON( "$set" << BSON( "state" << 0 ) ) );
+
+ // Either the update went through or it didn't, either way we're done trying to
+ // unlock
+ log( DistributedLock::logLvl - 1 ) << "handled late remove of old distributed lock with ts " << *i << endl;
+ removed = true;
+ }
+ catch( UpdateNotTheSame& ) {
+ log( DistributedLock::logLvl - 1 ) << "partially removed old distributed lock with ts " << *i << endl;
+ removed = true;
+ }
+ catch ( std::exception& e) {
+ warning() << "could not remove old distributed lock with ts " << *i
+ << causedBy( e ) << endl;
+ }
+
+ }
+
+ if( numOldLocks > 0 && _oldLockOIDs.size() > 0 ){
+ log( DistributedLock::logLvl - 1 ) << "not all old lock entries could be removed for process " << process << endl;
}
- }
- Date_t fourDays = jsTime() - ( 4 * 86400 * 1000 ); // 4 days
- conn->remove( lockPingNS , BSON( "_id" << BSON( "$nin" << pids ) << "ping" << LT << fourDays ) );
- err = conn->getLastError();
- if ( ! err.empty() ) {
- warning() << "dist_lock cleanup request from process: " << process << " to: " << addr
- << " failed: " << err << endl;
conn.done();
- sleepsecs(30);
- continue;
- }
- // create index so remove is fast even with a lot of servers
- if ( loops++ == 0 ) {
- conn->ensureIndex( lockPingNS , BSON( "ping" << 1 ) );
+ }
+ catch ( std::exception& e ) {
+ warning() << "distributed lock pinger '" << pingId << "' detected an exception while pinging."
+ << causedBy( e ) << endl;
}
- conn.done();
+ sleepmillis(sleepTime);
+ }
+
+ warning() << "removing distributed lock ping thread '" << pingId << "'" << endl;
+
+
+ if( shouldKill( addr, process ) )
+ finishKill( addr, process );
+
+ }
+
+ void distLockPingThread( ConnectionString addr, long long clockSkew, string processId, unsigned long long sleepTime ) {
+ try {
+ jsTimeVirtualThreadSkew( clockSkew );
+ _distLockPingThread( addr, processId, sleepTime );
}
catch ( std::exception& e ) {
- warning() << "dist_lock exception during ping: " << e.what() << endl;
+ error() << "unexpected error while running distributed lock pinger for " << addr << ", process " << processId << causedBy( e ) << endl;
}
+ catch ( ... ) {
+ error() << "unknown error while running distributed lock pinger for " << addr << ", process " << processId << endl;
+ }
+ }
- log( loops % 10 == 0 ? 0 : 1) << "dist_lock pinged successfully for: " << process << endl;
- sleepsecs(30);
+ string pingThreadId( const ConnectionString& conn, const string& processId ) {
+ return conn.toString() + "/" + processId;
}
- }
- void distLockPingThread( ConnectionString addr ) {
- try {
- _distLockPingThread( addr );
+ string got( DistributedLock& lock, unsigned long long sleepTime ) {
+
+ // Make sure we don't start multiple threads for a process id
+ scoped_lock lk( _mutex );
+
+ const ConnectionString& conn = lock.getRemoteConnection();
+ const string& processId = lock.getProcessId();
+ string s = pingThreadId( conn, processId );
+
+ // Ignore if we already have a pinging thread for this process.
+ if ( _seen.count( s ) > 0 ) return "";
+
+ // Check our clock skew
+ try {
+ if( lock.isRemoteTimeSkewed() ) {
+ throw LockException( str::stream() << "clock skew of the cluster " << conn.toString() << " is too far out of bounds to allow distributed locking." , 13650 );
+ }
+ }
+ catch( LockException& e) {
+ throw LockException( str::stream() << "error checking clock skew of cluster " << conn.toString() << causedBy( e ) , 13651);
+ }
+
+ boost::thread t( boost::bind( &DistributedLockPinger::distLockPingThread, this, conn, getJSTimeVirtualThreadSkew(), processId, sleepTime) );
+
+ _seen.insert( s );
+
+ return s;
}
- catch ( std::exception& e ) {
- error() << "unexpected error in distLockPingThread: " << e.what() << endl;
+
+ void addUnlockOID( const OID& oid ) {
+ // Modifying the lock from some other thread
+ scoped_lock lk( _mutex );
+ _oldLockOIDs.push_back( oid );
}
- catch ( ... ) {
- error() << "unexpected unknown error in distLockPingThread" << endl;
+
+ bool willUnlockOID( const OID& oid ) {
+ scoped_lock lk( _mutex );
+ return find( _oldLockOIDs.begin(), _oldLockOIDs.end(), oid ) != _oldLockOIDs.end();
}
- }
+ void kill( const ConnectionString& conn, const string& processId ) {
+ // Make sure we're in a consistent state before other threads can see us
+ scoped_lock lk( _mutex );
- class DistributedLockPinger {
- public:
- DistributedLockPinger()
- : _mutex( "DistributedLockPinger" ) {
+ string pingId = pingThreadId( conn, processId );
+
+ assert( _seen.count( pingId ) > 0 );
+ _kill.insert( pingId );
+
+ }
+
+ bool shouldKill( const ConnectionString& conn, const string& processId ) {
+ return _kill.count( pingThreadId( conn, processId ) ) > 0;
}
- void got( const ConnectionString& conn ) {
- string s = conn.toString();
+ void finishKill( const ConnectionString& conn, const string& processId ) {
+ // Make sure we're in a consistent state before other threads can see us
scoped_lock lk( _mutex );
- if ( _seen.count( s ) > 0 )
- return;
- boost::thread t( boost::bind( &distLockPingThread , conn ) );
- _seen.insert( s );
+
+ string pingId = pingThreadId( conn, processId );
+
+ _kill.erase( pingId );
+ _seen.erase( pingId );
+
}
+ set<string> _kill;
set<string> _seen;
mongo::mutex _mutex;
+ list<OID> _oldLockOIDs;
} distLockPinger;
- DistributedLock::DistributedLock( const ConnectionString& conn , const string& name , unsigned takeoverMinutes )
- : _conn(conn),_name(name),_takeoverMinutes(takeoverMinutes) {
- _id = BSON( "_id" << name );
- _ns = "config.locks";
- distLockPinger.got( conn );
+
+ const string DistributedLock::lockPingNS = "config.lockpings";
+ const string DistributedLock::locksNS = "config.locks";
+
+ /**
+     * Create a new distributed lock, potentially with a custom sleep and takeover time. If a custom sleep time is
+     * specified, it is used as the interval between pings.
+ */
+ DistributedLock::DistributedLock( const ConnectionString& conn , const string& name , unsigned long long lockTimeout, bool asProcess )
+ : _conn(conn) , _name(name) , _id( BSON( "_id" << name ) ), _processId( asProcess ? getDistLockId() : getDistLockProcess() ),
+ _lockTimeout( lockTimeout == 0 ? LOCK_TIMEOUT : lockTimeout ), _maxClockSkew( _lockTimeout / LOCK_SKEW_FACTOR ), _maxNetSkew( _maxClockSkew ), _lockPing( _maxClockSkew ),
+ _mutex( "DistributedLock" )
+ {
+ log( logLvl - 1 ) << "created new distributed lock for " << name << " on " << conn
+ << " ( lock timeout : " << _lockTimeout
+ << ", ping interval : " << _lockPing << ", process : " << asProcess << " )" << endl;
+ }
+
+ Date_t DistributedLock::getRemoteTime() {
+ return DistributedLock::remoteTime( _conn, _maxNetSkew );
+ }
+
+ bool DistributedLock::isRemoteTimeSkewed() {
+ return !DistributedLock::checkSkew( _conn, NUM_LOCK_SKEW_CHECKS, _maxClockSkew, _maxNetSkew );
+ }
+
+ const ConnectionString& DistributedLock::getRemoteConnection() {
+ return _conn;
+ }
+
+ const string& DistributedLock::getProcessId() {
+ return _processId;
+ }
+
+ /**
+ * Returns the remote time as reported by the cluster or server. The maximum difference between the reported time
+ * and the actual time on the remote server (at the completion of the function) is the maxNetSkew
+ */
+ Date_t DistributedLock::remoteTime( const ConnectionString& cluster, unsigned long long maxNetSkew ) {
+
+ ConnectionString server( *cluster.getServers().begin() );
+ ScopedDbConnection conn( server );
+
+ BSONObj result;
+ long long delay;
+
+ try {
+ Date_t then = jsTime();
+ bool success = conn->runCommand( string("admin"), BSON( "serverStatus" << 1 ), result );
+ delay = jsTime() - then;
+
+ if( !success )
+ throw TimeNotFoundException( str::stream() << "could not get status from server "
+ << server.toString() << " in cluster " << cluster.toString()
+ << " to check time", 13647 );
+
+ // Make sure that our delay is not more than 2x our maximum network skew, since this is the max our remote
+ // time value can be off by if we assume a response in the middle of the delay.
+ if( delay > (long long) (maxNetSkew * 2) )
+ throw TimeNotFoundException( str::stream() << "server " << server.toString()
+ << " in cluster " << cluster.toString()
+ << " did not respond within max network delay of "
+ << maxNetSkew << "ms", 13648 );
+ }
+ catch(...) {
+ conn.done();
+ throw;
+ }
+
+ conn.done();
+
+ return result["localTime"].Date() - (delay / 2);
+
+ }
+
+ bool DistributedLock::checkSkew( const ConnectionString& cluster, unsigned skewChecks, unsigned long long maxClockSkew, unsigned long long maxNetSkew ) {
+
+ vector<HostAndPort> servers = cluster.getServers();
+
+ if(servers.size() < 1) return true;
+
+ vector<long long> avgSkews;
+
+ for(unsigned i = 0; i < skewChecks; i++) {
+
+ // Find the average skew for each server
+ unsigned s = 0;
+ for(vector<HostAndPort>::iterator si = servers.begin(); si != servers.end(); ++si,s++) {
+
+ if(i == 0) avgSkews.push_back(0);
+
+ // Could check if this is self, but shouldn't matter since local network connection should be fast.
+ ConnectionString server( *si );
+
+ vector<long long> skew;
+
+ BSONObj result;
+
+ Date_t remote = remoteTime( server, maxNetSkew );
+ Date_t local = jsTime();
+
+ // Remote time can be delayed by at most MAX_NET_SKEW
+
+ // Skew is how much time we'd have to add to local to get to remote
+ avgSkews[s] += (long long) (remote - local);
+
+ log( logLvl + 1 ) << "skew from remote server " << server << " found: " << (long long) (remote - local) << endl;
+
+ }
+ }
+
+ // Analyze skews
+
+ long long serverMaxSkew = 0;
+ long long serverMinSkew = 0;
+
+ for(unsigned s = 0; s < avgSkews.size(); s++) {
+
+ long long avgSkew = (avgSkews[s] /= skewChecks);
+
+ // Keep track of max and min skews
+ if(s == 0) {
+ serverMaxSkew = avgSkew;
+ serverMinSkew = avgSkew;
+ }
+ else {
+ if(avgSkew > serverMaxSkew)
+ serverMaxSkew = avgSkew;
+ if(avgSkew < serverMinSkew)
+ serverMinSkew = avgSkew;
+ }
+
+ }
+
+ long long totalSkew = serverMaxSkew - serverMinSkew;
+
+ // Make sure our max skew is not more than our pre-set limit
+ if(totalSkew > (long long) maxClockSkew) {
+ log( logLvl + 1 ) << "total clock skew of " << totalSkew << "ms for servers " << cluster << " is out of " << maxClockSkew << "ms bounds." << endl;
+ return false;
+ }
+
+ log( logLvl + 1 ) << "total clock skew of " << totalSkew << "ms for servers " << cluster << " is in " << maxClockSkew << "ms bounds." << endl;
+ return true;
+ }
+
+ // For use in testing, ping thread should run indefinitely in practice.
+ bool DistributedLock::killPinger( DistributedLock& lock ) {
+ if( lock._threadId == "") return false;
+
+ distLockPinger.kill( lock._conn, lock._processId );
+ return true;
}
+ // Semantics of this method are basically that if the lock cannot be acquired, returns false, can be retried.
+ // If the lock should not be tried again (some unexpected error) a LockException is thrown.
+ // If we are only trying to re-enter a currently held lock, reenter should be true.
+ // Note: reenter doesn't actually make this lock re-entrant in the normal sense, since it can still only
+ // be unlocked once, instead it is used to verify that the lock is already held.
+ bool DistributedLock::lock_try( const string& why , bool reenter, BSONObj * other ) {
+
+ // TODO: Start pinging only when we actually get the lock?
+ // If we don't have a thread pinger, make sure we shouldn't have one
+ if( _threadId == "" ){
+ scoped_lock lk( _mutex );
+ _threadId = distLockPinger.got( *this, _lockPing );
+ }
+
+ // This should always be true, if not, we are using the lock incorrectly.
+ assert( _name != "" );
- bool DistributedLock::lock_try( string why , BSONObj * other ) {
// write to dummy if 'other' is null
BSONObj dummyOther;
if ( other == NULL )
@@ -182,93 +471,240 @@ namespace mongo {
{
// make sure its there so we can use simple update logic below
- BSONObj o = conn->findOne( _ns , _id ).getOwned();
+ BSONObj o = conn->findOne( locksNS , _id ).getOwned();
+
+ // Case 1: No locks
if ( o.isEmpty() ) {
try {
- log(4) << "dist_lock inserting initial doc in " << _ns << " for lock " << _name << endl;
- conn->insert( _ns , BSON( "_id" << _name << "state" << 0 << "who" << "" ) );
+ log( logLvl ) << "inserting initial doc in " << locksNS << " for lock " << _name << endl;
+ conn->insert( locksNS , BSON( "_id" << _name << "state" << 0 << "who" << "" ) );
}
catch ( UserException& e ) {
- log() << "dist_lock could not insert initial doc: " << e << endl;
+ warning() << "could not insert initial doc for distributed lock " << _name << causedBy( e ) << endl;
}
}
-
+
+ // Case 2: A set lock that we might be able to force
else if ( o["state"].numberInt() > 0 ) {
+
+ string lockName = o["_id"].String() + string("/") + o["process"].String();
+
+ bool canReenter = reenter && o["process"].String() == _processId && ! distLockPinger.willUnlockOID( o["ts"].OID() ) && o["state"].numberInt() == 2;
+ if( reenter && ! canReenter ) {
+ log( logLvl - 1 ) << "not re-entering distributed lock " << lockName;
+ if( o["process"].String() != _processId ) log( logLvl - 1 ) << ", different process " << _processId << endl;
+ else if( o["state"].numberInt() == 2 ) log( logLvl - 1 ) << ", state not finalized" << endl;
+ else log( logLvl - 1 ) << ", ts " << o["ts"].OID() << " scheduled for late unlock" << endl;
+
+ // reset since we've been bounced by a previous lock not being where we thought it was,
+ // and should go through full forcing process if required.
+ // (in theory we should never see a ping here if used correctly)
+ *other = o; other->getOwned(); conn.done(); resetLastPing();
+ return false;
+ }
+
BSONObj lastPing = conn->findOne( lockPingNS , o["process"].wrap( "_id" ) );
if ( lastPing.isEmpty() ) {
- // if a lock is taken but there's no ping for it, we're in an inconsistent situation
- // if the lock holder (mongos or d) does not exist anymore, the lock could safely be removed
- // but we'd require analysis of the situation before a manual intervention
- error() << "config.locks: " << _name << " lock is taken by old process? "
- << "remove the following lock if the process is not active anymore: " << o << endl;
- *other = o;
- conn.done();
- return false;
+ log( logLvl ) << "empty ping found for process in lock '" << lockName << "'" << endl;
+                        // TODO: Using 0 as a "no time found" value will fail if dates roll over, but then, so will a lot.
+ lastPing = BSON( "_id" << o["process"].String() << "ping" << (Date_t) 0 );
}
- unsigned long long now = jsTime();
- unsigned long long pingTime = lastPing["ping"].Date();
-
- if ( now < pingTime ) {
- // clock skew
- warning() << "dist_lock has detected clock skew of " << ( pingTime - now ) << "ms" << endl;
- *other = o;
- conn.done();
- return false;
+ unsigned long long elapsed = 0;
+ unsigned long long takeover = _lockTimeout;
+
+ log( logLvl ) << "checking last ping for lock '" << lockName << "'" << " against process " << _lastPingCheck.get<0>() << " and ping " << _lastPingCheck.get<1>() << endl;
+
+ try {
+
+ Date_t remote = remoteTime( _conn );
+
+ // Timeout the elapsed time using comparisons of remote clock
+ // For non-finalized locks, timeout 15 minutes since last seen (ts)
+ // For finalized locks, timeout 15 minutes since last ping
+ bool recPingChange = o["state"].numberInt() == 2 && ( _lastPingCheck.get<0>() != lastPing["_id"].String() || _lastPingCheck.get<1>() != lastPing["ping"].Date() );
+ bool recTSChange = _lastPingCheck.get<3>() != o["ts"].OID();
+
+ if( recPingChange || recTSChange ) {
+ // If the ping has changed since we last checked, mark the current date and time
+ scoped_lock lk( _mutex );
+ _lastPingCheck = boost::tuple<string, Date_t, Date_t, OID>( lastPing["_id"].String().c_str(), lastPing["ping"].Date(), remote, o["ts"].OID() );
+ }
+ else {
+
+ // GOTCHA! Due to network issues, it is possible that the current time
+ // is less than the remote time. We *have* to check this here, otherwise
+ // we overflow and our lock breaks.
+ if(_lastPingCheck.get<2>() >= remote)
+ elapsed = 0;
+ else
+ elapsed = remote - _lastPingCheck.get<2>();
+ }
+
}
-
- unsigned long long elapsed = now - pingTime;
- elapsed = elapsed / ( 1000 * 60 ); // convert to minutes
-
- if ( elapsed > ( 60 * 24 * 365 * 100 ) /* 100 years */ ) {
- warning() << "distlock elapsed time seems impossible: " << lastPing << endl;
+ catch( LockException& e ) {
+
+ // Remote server cannot be found / is not responsive
+ warning() << "Could not get remote time from " << _conn << causedBy( e );
+ // If our config server is having issues, forget all the pings until we can see it again
+ resetLastPing();
+
}
-
- if ( elapsed <= _takeoverMinutes ) {
- log(1) << "dist_lock lock failed because taken by: " << o << " elapsed minutes: " << elapsed << endl;
- *other = o;
- conn.done();
+
+ if ( elapsed <= takeover && ! canReenter ) {
+ log( logLvl ) << "could not force lock '" << lockName << "' because elapsed time " << elapsed << " <= takeover time " << takeover << endl;
+ *other = o; other->getOwned(); conn.done();
return false;
}
-
- log() << "dist_lock forcefully taking over from: " << o << " elapsed minutes: " << elapsed << endl;
- conn->update( _ns , _id , BSON( "$set" << BSON( "state" << 0 ) ) );
- string err = conn->getLastError();
- if ( ! err.empty() ) {
- warning() << "dist_lock take over from: " << o << " failed: " << err << endl;
- *other = o.getOwned();
- other->getOwned();
- conn.done();
+ else if( elapsed > takeover && canReenter ) {
+ log( logLvl - 1 ) << "not re-entering distributed lock " << lockName << "' because elapsed time " << elapsed << " > takeover time " << takeover << endl;
+ *other = o; other->getOwned(); conn.done();
return false;
}
+ log( logLvl - 1 ) << ( canReenter ? "re-entering" : "forcing" ) << " lock '" << lockName << "' because "
+ << ( canReenter ? "re-entering is allowed, " : "" )
+ << "elapsed time " << elapsed << " > takeover time " << takeover << endl;
+
+ if( elapsed > takeover ) {
+
+                // Lock may be forced, reset our timer if succeeds or fails
+ // Ensures that another timeout must happen if something borks up here, and resets our pristine
+ // ping state if acquired.
+ resetLastPing();
+
+ try {
+
+ // Check the clock skew again. If we check this before we get a lock
+ // and after the lock times out, we can be pretty sure the time is
+ // increasing at the same rate on all servers and therefore our
+ // timeout is accurate
+ uassert( 14023, str::stream() << "remote time in cluster " << _conn.toString() << " is now skewed, cannot force lock.", !isRemoteTimeSkewed() );
+
+ // Make sure we break the lock with the correct "ts" (OID) value, otherwise
+ // we can overwrite a new lock inserted in the meantime.
+ conn->update( locksNS , BSON( "_id" << _id["_id"].String() << "state" << o["state"].numberInt() << "ts" << o["ts"] ),
+ BSON( "$set" << BSON( "state" << 0 ) ) );
+
+ BSONObj err = conn->getLastErrorDetailed();
+ string errMsg = DBClientWithCommands::getLastErrorString(err);
+
+ // TODO: Clean up all the extra code to exit this method, probably with a refactor
+ if ( !errMsg.empty() || !err["n"].type() || err["n"].numberInt() < 1 ) {
+ ( errMsg.empty() ? log( logLvl - 1 ) : warning() ) << "Could not force lock '" << lockName << "' "
+ << ( !errMsg.empty() ? causedBy(errMsg) : string("(another force won)") ) << endl;
+ *other = o; other->getOwned(); conn.done();
+ return false;
+ }
+
+ }
+ catch( UpdateNotTheSame& ) {
+ // Ok to continue since we know we forced at least one lock document, and all lock docs
+ // are required for a lock to be held.
+ warning() << "lock forcing " << lockName << " inconsistent" << endl;
+ }
+ catch( std::exception& e ) {
+ conn.done();
+ throw LockException( str::stream() << "exception forcing distributed lock "
+ << lockName << causedBy( e ), 13660);
+ }
+
+ }
+ else {
+
+ assert( canReenter );
+
+ // Lock may be re-entered, reset our timer if succeeds or fails
+ // Not strictly necessary, but helpful for small timeouts where thread scheduling is significant.
+ // This ensures that two attempts are still required for a force if not acquired, and resets our
+ // state if we are acquired.
+ resetLastPing();
+
+ // Test that the lock is held by trying to update the finalized state of the lock to the same state
+ // if it does not update or does not update on all servers, we can't re-enter.
+ try {
+
+ // Test the lock with the correct "ts" (OID) value
+ conn->update( locksNS , BSON( "_id" << _id["_id"].String() << "state" << 2 << "ts" << o["ts"] ),
+ BSON( "$set" << BSON( "state" << 2 ) ) );
+
+ BSONObj err = conn->getLastErrorDetailed();
+ string errMsg = DBClientWithCommands::getLastErrorString(err);
+
+ // TODO: Clean up all the extra code to exit this method, probably with a refactor
+ if ( ! errMsg.empty() || ! err["n"].type() || err["n"].numberInt() < 1 ) {
+ ( errMsg.empty() ? log( logLvl - 1 ) : warning() ) << "Could not re-enter lock '" << lockName << "' "
+ << ( !errMsg.empty() ? causedBy(errMsg) : string("(not sure lock is held)") )
+ << " gle: " << err
+ << endl;
+ *other = o; other->getOwned(); conn.done();
+ return false;
+ }
+
+ }
+ catch( UpdateNotTheSame& ) {
+ // NOT ok to continue since our lock isn't held by all servers, so isn't valid.
+ warning() << "inconsistent state re-entering lock, lock " << lockName << " not held" << endl;
+ *other = o; other->getOwned(); conn.done();
+ return false;
+ }
+ catch( std::exception& e ) {
+ conn.done();
+ throw LockException( str::stream() << "exception re-entering distributed lock "
+ << lockName << causedBy( e ), 13660);
+ }
+
+ log( logLvl - 1 ) << "re-entered distributed lock '" << lockName << "'" << endl;
+ *other = o; other->getOwned(); conn.done();
+ return true;
+
+ }
+
+ log( logLvl - 1 ) << "lock '" << lockName << "' successfully forced" << endl;
+
+ // We don't need the ts value in the query, since we will only ever replace locks with state=0.
}
+ // Case 3: We have an expired lock
else if ( o["ts"].type() ) {
queryBuilder.append( o["ts"] );
}
}
- OID ts;
- ts.init();
+ // Always reset our ping if we're trying to get a lock, since getting a lock implies the lock state is open
+ // and no locks need to be forced. If anything goes wrong, we don't want to remember an old lock.
+ resetLastPing();
bool gotLock = false;
- BSONObj now;
+ BSONObj currLock;
- BSONObj lockDetails = BSON( "state" << 1 << "who" << getDistLockId() << "process" << getDistLockProcess() <<
- "when" << DATENOW << "why" << why << "ts" << ts );
+ BSONObj lockDetails = BSON( "state" << 1 << "who" << getDistLockId() << "process" << _processId <<
+ "when" << jsTime() << "why" << why << "ts" << OID::gen() );
BSONObj whatIWant = BSON( "$set" << lockDetails );
+
+ BSONObj query = queryBuilder.obj();
+
+ string lockName = _name + string("/") + _processId;
+
try {
- log(4) << "dist_lock about to aquire lock: " << lockDetails << endl;
- conn->update( _ns , queryBuilder.obj() , whatIWant );
+ // Main codepath to acquire lock
+
+ log( logLvl ) << "about to acquire distributed lock '" << lockName << ":\n"
+ << lockDetails.jsonString(Strict, true) << "\n"
+ << query.jsonString(Strict, true) << endl;
+
+ conn->update( locksNS , query , whatIWant );
+
+ BSONObj err = conn->getLastErrorDetailed();
+ string errMsg = DBClientWithCommands::getLastErrorString(err);
- BSONObj o = conn->getLastErrorDetailed();
- now = conn->findOne( _ns , _id );
+ currLock = conn->findOne( locksNS , _id );
- if ( o["n"].numberInt() == 0 ) {
- *other = now;
+ if ( !errMsg.empty() || !err["n"].type() || err["n"].numberInt() < 1 ) {
+ ( errMsg.empty() ? log( logLvl - 1 ) : warning() ) << "could not acquire lock '" << lockName << "' "
+ << ( !errMsg.empty() ? causedBy( errMsg ) : string("(another update won)") ) << endl;
+ *other = currLock;
other->getOwned();
- log() << "dist_lock error trying to aquire lock: " << lockDetails << " error: " << o << endl;
gotLock = false;
}
else {
@@ -277,63 +713,234 @@ namespace mongo {
}
catch ( UpdateNotTheSame& up ) {
+
// this means our update got through on some, but not others
- log(4) << "dist_lock lock did not propagate properly" << endl;
+ warning() << "distributed lock '" << lockName << " did not propagate properly." << causedBy( up ) << endl;
+
+ // Overall protection derives from:
+ // All unlocking updates use the ts value when setting state to 0
+ // This ensures that during locking, we can override all smaller ts locks with
+ // our own safe ts value and not be unlocked afterward.
+ for ( unsigned i = 0; i < up.size(); i++ ) {
+
+ ScopedDbConnection indDB( up[i].first );
+ BSONObj indUpdate;
+
+ try {
+
+ indUpdate = indDB->findOne( locksNS , _id );
+
+ // If we override this lock in any way, grab and protect it.
+ // We assume/ensure that if a process does not have all lock documents, it is no longer
+ // holding the lock.
+ // Note - finalized locks may compete too, but we know they've won already if competing
+ // in this round. Cleanup of crashes during finalizing may take a few tries.
+ if( indUpdate["ts"] < lockDetails["ts"] || indUpdate["state"].numberInt() == 0 ) {
+
+ BSONObj grabQuery = BSON( "_id" << _id["_id"].String() << "ts" << indUpdate["ts"].OID() );
+
+ // Change ts so we won't be forced, state so we won't be relocked
+ BSONObj grabChanges = BSON( "ts" << lockDetails["ts"].OID() << "state" << 1 );
+
+ // Either our update will succeed, and we'll grab the lock, or it will fail b/c some other
+ // process grabbed the lock (which will change the ts), but the lock will be set until forcing
+ indDB->update( locksNS, grabQuery, BSON( "$set" << grabChanges ) );
+
+ indUpdate = indDB->findOne( locksNS, _id );
+
+ // Our lock should now be set until forcing.
+ assert( indUpdate["state"].numberInt() == 1 );
+
+ }
+ // else our lock is the same, in which case we're safe, or it's a bigger lock,
+ // in which case we won't need to protect anything since we won't have the lock.
+
+ }
+ catch( std::exception& e ) {
+ conn.done();
+ throw LockException( str::stream() << "distributed lock " << lockName
+ << " had errors communicating with individual server "
+                                                   << up[i].first << causedBy( e ), 13661 );
+ }
- for ( unsigned i=0; i<up.size(); i++ ) {
- ScopedDbConnection temp( up[i].first );
- BSONObj temp2 = temp->findOne( _ns , _id );
+ assert( !indUpdate.isEmpty() );
- if ( now.isEmpty() || now["ts"] < temp2["ts"] ) {
- now = temp2.getOwned();
+ // Find max TS value
+ if ( currLock.isEmpty() || currLock["ts"] < indUpdate["ts"] ) {
+ currLock = indUpdate.getOwned();
}
- temp.done();
+ indDB.done();
+
}
- if ( now["ts"].OID() == ts ) {
- log(4) << "dist_lock completed lock propagation" << endl;
+ // Locks on all servers are now set and safe until forcing
+
+ if ( currLock["ts"] == lockDetails["ts"] ) {
+ log( logLvl - 1 ) << "lock update won, completing lock propagation for '" << lockName << "'" << endl;
gotLock = true;
- conn->update( _ns , _id , whatIWant );
}
else {
- log() << "dist_lock error trying to complete propagation" << endl;
+ log( logLvl - 1 ) << "lock update lost, lock '" << lockName << "' not propagated." << endl;
+
+ // Register the lock for deletion, to speed up failover
+ // Not strictly necessary, but helpful
+ distLockPinger.addUnlockOID( lockDetails["ts"].OID() );
+
gotLock = false;
}
}
+ catch( std::exception& e ) {
+ conn.done();
+ throw LockException( str::stream() << "exception creating distributed lock "
+ << lockName << causedBy( e ), 13663 );
+ }
- conn.done();
+ // Complete lock propagation
+ if( gotLock ) {
+
+ // This is now safe, since we know that no new locks will be placed on top of the ones we've checked for at
+ // least 15 minutes. Sets the state = 2, so that future clients can determine that the lock is truly set.
+ // The invariant for rollbacks is that we will never force locks with state = 2 and active pings, since that
+ // indicates the lock is active, but this means the process creating/destroying them must explicitly poll
+ // when something goes wrong.
+ try {
+
+ BSONObjBuilder finalLockDetails;
+ BSONObjIterator bi( lockDetails );
+ while( bi.more() ) {
+ BSONElement el = bi.next();
+ if( (string) ( el.fieldName() ) == "state" )
+ finalLockDetails.append( "state", 2 );
+ else finalLockDetails.append( el );
+ }
+
+ conn->update( locksNS , _id , BSON( "$set" << finalLockDetails.obj() ) );
+
+ BSONObj err = conn->getLastErrorDetailed();
+ string errMsg = DBClientWithCommands::getLastErrorString(err);
+
+ currLock = conn->findOne( locksNS , _id );
- log(2) << "dist_lock lock gotLock: " << gotLock << " now: " << now << endl;
+ if ( !errMsg.empty() || !err["n"].type() || err["n"].numberInt() < 1 ) {
+ warning() << "could not finalize winning lock " << lockName
+ << ( !errMsg.empty() ? causedBy( errMsg ) : " (did not update lock) " ) << endl;
+ gotLock = false;
+ }
+ else {
+ // SUCCESS!
+ gotLock = true;
+ }
+
+ }
+ catch( std::exception& e ) {
+ conn.done();
+
+ // Register the bad final lock for deletion, in case it exists
+ distLockPinger.addUnlockOID( lockDetails["ts"].OID() );
+
+ throw LockException( str::stream() << "exception finalizing winning lock"
+ << causedBy( e ), 13662 );
+ }
+
+ }
+
+ *other = currLock;
+ other->getOwned();
+
+ // Log our lock results
+ if(gotLock)
+ log( logLvl - 1 ) << "distributed lock '" << lockName << "' acquired, ts : " << currLock["ts"].OID() << endl;
+ else
+ log( logLvl - 1 ) << "distributed lock '" << lockName << "' was not acquired." << endl;
+
+ conn.done();
return gotLock;
}
- void DistributedLock::unlock() {
+ // Unlock now takes an optional pointer to the lock, so you can be specific about which
+ // particular lock you want to unlock. This is required when the config server is down,
+ // and so cannot tell you what lock ts you should try later.
+ void DistributedLock::unlock( BSONObj* oldLockPtr ) {
+
+ assert( _name != "" );
+
+ string lockName = _name + string("/") + _processId;
+
const int maxAttempts = 3;
int attempted = 0;
+
+ BSONObj oldLock;
+ if( oldLockPtr ) oldLock = *oldLockPtr;
+
while ( ++attempted <= maxAttempts ) {
+ ScopedDbConnection conn( _conn );
+
try {
- ScopedDbConnection conn( _conn );
- conn->update( _ns , _id, BSON( "$set" << BSON( "state" << 0 ) ) );
- log(2) << "dist_lock unlock: " << conn->findOne( _ns , _id ) << endl;
- conn.done();
- return;
+ if( oldLock.isEmpty() )
+ oldLock = conn->findOne( locksNS, _id );
+
+ if( oldLock["state"].eoo() || oldLock["state"].numberInt() != 2 || oldLock["ts"].eoo() ) {
+ warning() << "cannot unlock invalid distributed lock " << oldLock << endl;
+ conn.done();
+ break;
+ }
+ // Use ts when updating lock, so that new locks can be sure they won't get trampled.
+ conn->update( locksNS ,
+ BSON( "_id" << _id["_id"].String() << "ts" << oldLock["ts"].OID() ),
+ BSON( "$set" << BSON( "state" << 0 ) ) );
+ // Check that the lock was actually unlocked... if not, try again
+ BSONObj err = conn->getLastErrorDetailed();
+ string errMsg = DBClientWithCommands::getLastErrorString(err);
+
+ if ( !errMsg.empty() || !err["n"].type() || err["n"].numberInt() < 1 ){
+ warning() << "distributed lock unlock update failed, retrying "
+ << ( errMsg.empty() ? causedBy( "( update not registered )" ) : causedBy( errMsg ) ) << endl;
+ conn.done();
+ continue;
+ }
+
+ log( logLvl - 1 ) << "distributed lock '" << lockName << "' unlocked. " << endl;
+ conn.done();
+ return;
+ }
+ catch( UpdateNotTheSame& ) {
+ log( logLvl - 1 ) << "distributed lock '" << lockName << "' unlocked (messily). " << endl;
+ conn.done();
+ break;
}
catch ( std::exception& e) {
- log( LL_WARNING ) << "dist_lock " << _name << " failed to contact config server in unlock attempt "
- << attempted << ": " << e.what() << endl;
+ warning() << "distributed lock '" << lockName << "' failed unlock attempt."
+ << causedBy( e ) << endl;
- sleepsecs(1 << attempted);
+ conn.done();
+ // TODO: If our lock timeout is small, sleeping this long may be unsafe.
+ if( attempted != maxAttempts) sleepsecs(1 << attempted);
}
}
- log( LL_WARNING ) << "dist_lock couldn't consumate unlock request. " << "Lock " << _name
- << " will be taken over after " << _takeoverMinutes << " minutes timeout" << endl;
+ if( attempted > maxAttempts && ! oldLock.isEmpty() && ! oldLock["ts"].eoo() ) {
+
+ log( logLvl - 1 ) << "could not unlock distributed lock with ts " << oldLock["ts"].OID()
+ << ", will attempt again later" << endl;
+
+ // We couldn't unlock the lock at all, so try again later in the pinging thread...
+ distLockPinger.addUnlockOID( oldLock["ts"].OID() );
+ }
+ else if( attempted > maxAttempts ) {
+ warning() << "could not unlock untracked distributed lock, a manual force may be required" << endl;
+ }
+
+ warning() << "distributed lock '" << lockName << "' couldn't consummate unlock request. "
+ << "lock may be taken over after " << ( _lockTimeout / (60 * 1000) )
+ << " minutes timeout." << endl;
}
+
+
}
diff --git a/client/distlock.h b/client/distlock.h
index 753a241..8985672 100644
--- a/client/distlock.h
+++ b/client/distlock.h
@@ -23,9 +23,42 @@
#include "redef_macros.h"
#include "syncclusterconnection.h"
+#define LOCK_TIMEOUT (15 * 60 * 1000)
+#define LOCK_SKEW_FACTOR (30)
+#define LOCK_PING (LOCK_TIMEOUT / LOCK_SKEW_FACTOR)
+#define MAX_LOCK_NET_SKEW (LOCK_TIMEOUT / LOCK_SKEW_FACTOR)
+#define MAX_LOCK_CLOCK_SKEW (LOCK_TIMEOUT / LOCK_SKEW_FACTOR)
+#define NUM_LOCK_SKEW_CHECKS (3)
+
+// The maximum clock skew we need to handle between config servers is
+// 2 * MAX_LOCK_NET_SKEW + MAX_LOCK_CLOCK_SKEW.
+
+// Net effect of *this* clock being slow is effectively a multiplier on the max net skew
+// and a linear increase or decrease of the max clock skew.
+
namespace mongo {
/**
+ * Exception class to encapsulate exceptions while managing distributed locks
+ */
+ class LockException : public DBException {
+ public:
+ LockException( const char * msg , int code ) : DBException( msg, code ) {}
+ LockException( const string& msg, int code ) : DBException( msg, code ) {}
+ virtual ~LockException() throw() { }
+ };
+
+ /**
+ * Indicates an error in retrieving time values from remote servers.
+ */
+ class TimeNotFoundException : public LockException {
+ public:
+ TimeNotFoundException( const char * msg , int code ) : LockException( msg, code ) {}
+ TimeNotFoundException( const string& msg, int code ) : LockException( msg, code ) {}
+ virtual ~TimeNotFoundException() throw() { }
+ };
+
+ /**
* The distributed lock is a configdb backed way of synchronizing system-wide tasks. A task must be identified by a
* unique name across the system (e.g., "balancer"). A lock is taken by writing a document in the configdb's locks
* collection with that name.
@@ -36,53 +69,155 @@ namespace mongo {
class DistributedLock {
public:
+ static LabeledLevel logLvl;
+
/**
* The constructor does not connect to the configdb yet and constructing does not mean the lock was acquired.
* Construction does trigger a lock "pinging" mechanism, though.
*
* @param conn address of config(s) server(s)
* @param name identifier for the lock
- * @param takeoverMinutes how long can the log go "unpinged" before a new attempt to lock steals it (in minutes)
+         * @param lockTimeout how long can the lock go "unpinged" before a new attempt to lock steals it (in ms).
+ * @param lockPing how long to wait between lock pings
+ * @param legacy use legacy logic
+ *
*/
- DistributedLock( const ConnectionString& conn , const string& name , unsigned takeoverMinutes = 15 );
+ DistributedLock( const ConnectionString& conn , const string& name , unsigned long long lockTimeout = 0, bool asProcess = false );
+ ~DistributedLock(){};
/**
- * Attempts to aquire 'this' lock, checking if it could or should be stolen from the previous holder. Please
+ * Attempts to acquire 'this' lock, checking if it could or should be stolen from the previous holder. Please
* consider using the dist_lock_try construct to acquire this lock in an exception safe way.
*
* @param why human readable description of why the lock is being taken (used to log)
- * @param other configdb's lock document that is currently holding the lock, if lock is taken
+         * @param reenter whether this is a lock re-entry or a new lock
+ * @param other configdb's lock document that is currently holding the lock, if lock is taken, or our own lock
+ * details if not
* @return true if it managed to grab the lock
*/
- bool lock_try( string why , BSONObj * other = 0 );
+ bool lock_try( const string& why , bool reenter = false, BSONObj * other = 0 );
/**
* Releases a previously taken lock.
*/
- void unlock();
+ void unlock( BSONObj* oldLockPtr = NULL );
+
+ Date_t getRemoteTime();
+
+ bool isRemoteTimeSkewed();
+
+ const string& getProcessId();
+
+ const ConnectionString& getRemoteConnection();
+
+ /**
+ * Check the skew between a cluster of servers
+ */
+ static bool checkSkew( const ConnectionString& cluster, unsigned skewChecks = NUM_LOCK_SKEW_CHECKS, unsigned long long maxClockSkew = MAX_LOCK_CLOCK_SKEW, unsigned long long maxNetSkew = MAX_LOCK_NET_SKEW );
+
+ /**
+ * Get the remote time from a server or cluster
+ */
+ static Date_t remoteTime( const ConnectionString& cluster, unsigned long long maxNetSkew = MAX_LOCK_NET_SKEW );
+
+ static bool killPinger( DistributedLock& lock );
+
+ /**
+ * Namespace for lock pings
+ */
+ static const string lockPingNS;
+
+ /**
+ * Namespace for locks
+ */
+ static const string locksNS;
+
+ const ConnectionString _conn;
+ const string _name;
+ const BSONObj _id;
+ const string _processId;
+
+ // Timeout for lock, usually LOCK_TIMEOUT
+ const unsigned long long _lockTimeout;
+ const unsigned long long _maxClockSkew;
+ const unsigned long long _maxNetSkew;
+ const unsigned long long _lockPing;
private:
- ConnectionString _conn;
- string _name;
- unsigned _takeoverMinutes;
- string _ns;
- BSONObj _id;
+ void resetLastPing(){
+ scoped_lock lk( _mutex );
+ _lastPingCheck = boost::tuple<string, Date_t, Date_t, OID>();
+ }
+
+ mongo::mutex _mutex;
+
+ // Data from last check of process with ping time
+ boost::tuple<string, Date_t, Date_t, OID> _lastPingCheck;
+ // May or may not exist, depending on startup
+ string _threadId;
+
};
class dist_lock_try {
public:
+
+ dist_lock_try() : _lock(NULL), _got(false) {}
+
+ dist_lock_try( const dist_lock_try& that ) : _lock(that._lock), _got(that._got), _other(that._other) {
+ _other.getOwned();
+
+ // Make sure the lock ownership passes to this object,
+ // so we only unlock once.
+ ((dist_lock_try&) that)._got = false;
+ ((dist_lock_try&) that)._lock = NULL;
+ ((dist_lock_try&) that)._other = BSONObj();
+ }
+
+ // Needed so we can handle lock exceptions in context of lock try.
+ dist_lock_try& operator=( const dist_lock_try& that ){
+
+ if( this == &that ) return *this;
+
+ _lock = that._lock;
+ _got = that._got;
+ _other = that._other;
+ _other.getOwned();
+ _why = that._why;
+
+ // Make sure the lock ownership passes to this object,
+ // so we only unlock once.
+ ((dist_lock_try&) that)._got = false;
+ ((dist_lock_try&) that)._lock = NULL;
+ ((dist_lock_try&) that)._other = BSONObj();
+
+ return *this;
+ }
+
dist_lock_try( DistributedLock * lock , string why )
- : _lock(lock) {
- _got = _lock->lock_try( why , &_other );
+ : _lock(lock), _why(why) {
+ _got = _lock->lock_try( why , false , &_other );
}
~dist_lock_try() {
if ( _got ) {
- _lock->unlock();
+ assert( ! _other.isEmpty() );
+ _lock->unlock( &_other );
}
}
+ bool reestablish(){
+ return retry();
+ }
+
+ bool retry() {
+ assert( _lock );
+ assert( _got );
+ assert( ! _other.isEmpty() );
+
+ return _got = _lock->lock_try( _why , true, &_other );
+ }
+
bool got() const { return _got; }
BSONObj other() const { return _other; }
@@ -90,6 +225,7 @@ namespace mongo {
DistributedLock * _lock;
bool _got;
BSONObj _other;
+ string _why;
};
}
diff --git a/client/distlock_test.cpp b/client/distlock_test.cpp
index 83d143f..42a1c48 100644
--- a/client/distlock_test.cpp
+++ b/client/distlock_test.cpp
@@ -15,85 +15,123 @@
* limitations under the License.
*/
+#include <iostream>
#include "../pch.h"
#include "dbclient.h"
#include "distlock.h"
#include "../db/commands.h"
+#include "../util/bson_util.h"
+
+// Modify some config options for the RNG, since they cause MSVC to fail
+#include <boost/config.hpp>
+
+#if defined(BOOST_MSVC) && defined(BOOST_NO_MEMBER_TEMPLATE_FRIENDS)
+#undef BOOST_NO_MEMBER_TEMPLATE_FRIENDS
+#define BOOST_RNG_HACK
+#endif
+
+// Well, sort-of cross-platform RNG
+#include <boost/random/mersenne_twister.hpp>
+
+#ifdef BOOST_RNG_HACK
+#define BOOST_NO_MEMBER_TEMPLATE_FRIENDS
+#undef BOOST_RNG_HACK
+#endif
+
+
+#include <boost/random/uniform_int.hpp>
+#include <boost/random/variate_generator.hpp>
+
+
+// TODO: Make a method in BSONObj if useful, don't modify for now
+#define string_field(obj, name, def) ( obj.hasField(name) ? obj[name].String() : def )
+#define number_field(obj, name, def) ( obj.hasField(name) ? obj[name].Number() : def )
namespace mongo {
- class TestDistLockWithSync : public Command {
+ class TestDistLockWithSync: public Command {
public:
- TestDistLockWithSync() : Command( "_testDistLockWithSyncCluster" ) {}
- virtual void help( stringstream& help ) const {
+ TestDistLockWithSync() :
+ Command("_testDistLockWithSyncCluster") {
+ }
+ virtual void help(stringstream& help) const {
help << "should not be calling this directly" << endl;
}
- virtual bool slaveOk() const { return false; }
- virtual bool adminOnly() const { return true; }
- virtual LockType locktype() const { return NONE; }
+ virtual bool slaveOk() const {
+ return false;
+ }
+ virtual bool adminOnly() const {
+ return true;
+ }
+ virtual LockType locktype() const {
+ return NONE;
+ }
static void runThread() {
- while ( keepGoing ) {
- if ( current->lock_try( "test" ) ) {
+ while (keepGoing) {
+ if (current->lock_try( "test" )) {
count++;
int before = count;
- sleepmillis( 3 );
+ sleepmillis(3);
int after = count;
-
- if ( after != before ) {
- error() << " before: " << before << " after: " << after << endl;
+
+ if (after != before) {
+ error() << " before: " << before << " after: " << after
+ << endl;
}
-
+
current->unlock();
}
}
}
-
- bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool) {
+
+ bool run(const string&, BSONObj& cmdObj, int, string& errmsg,
+ BSONObjBuilder& result, bool) {
Timer t;
- DistributedLock lk( ConnectionString( cmdObj["host"].String() , ConnectionString::SYNC ), "testdistlockwithsync" );
+ DistributedLock lk(ConnectionString(cmdObj["host"].String(),
+ ConnectionString::SYNC), "testdistlockwithsync", 0, 0);
current = &lk;
count = 0;
gotit = 0;
errors = 0;
keepGoing = true;
-
+
vector<shared_ptr<boost::thread> > l;
- for ( int i=0; i<4; i++ ) {
- l.push_back( shared_ptr<boost::thread>( new boost::thread( runThread ) ) );
+ for (int i = 0; i < 4; i++) {
+ l.push_back(
+ shared_ptr<boost::thread> (new boost::thread(runThread)));
}
-
+
int secs = 10;
- if ( cmdObj["secs"].isNumber() )
+ if (cmdObj["secs"].isNumber())
secs = cmdObj["secs"].numberInt();
- sleepsecs( secs );
+ sleepsecs(secs);
keepGoing = false;
- for ( unsigned i=0; i<l.size(); i++ )
+ for (unsigned i = 0; i < l.size(); i++)
l[i]->join();
current = 0;
- result.append( "count" , count );
- result.append( "gotit" , gotit );
- result.append( "errors" , errors );
- result.append( "timeMS" , t.millis() );
+ result.append("count", count);
+ result.append("gotit", gotit);
+ result.append("errors", errors);
+ result.append("timeMS", t.millis());
return errors == 0;
}
-
+
// variables for test
static DistributedLock * current;
static int gotit;
static int errors;
static AtomicUInt count;
-
+
static bool keepGoing;
} testDistLockWithSyncCmd;
-
DistributedLock * TestDistLockWithSync::current;
AtomicUInt TestDistLockWithSync::count;
int TestDistLockWithSync::gotit;
@@ -101,4 +139,300 @@ namespace mongo {
bool TestDistLockWithSync::keepGoing;
+
+ class TestDistLockWithSkew: public Command {
+ public:
+
+ static const int logLvl = 1;
+
+ TestDistLockWithSkew() :
+ Command("_testDistLockWithSkew") {
+ }
+ virtual void help(stringstream& help) const {
+ help << "should not be calling this directly" << endl;
+ }
+
+ virtual bool slaveOk() const {
+ return false;
+ }
+ virtual bool adminOnly() const {
+ return true;
+ }
+ virtual LockType locktype() const {
+ return NONE;
+ }
+
+ void runThread(ConnectionString& hostConn, unsigned threadId, unsigned seed,
+ BSONObj& cmdObj, BSONObjBuilder& result) {
+
+ stringstream ss;
+ ss << "thread-" << threadId;
+ setThreadName(ss.str().c_str());
+
+ // Lock name
+ string lockName = string_field(cmdObj, "lockName", this->name + "_lock");
+
+ // Range of clock skew in diff threads
+ int skewRange = (int) number_field(cmdObj, "skewRange", 1);
+
+ // How long to wait with the lock
+ int threadWait = (int) number_field(cmdObj, "threadWait", 30);
+ if(threadWait <= 0) threadWait = 1;
+
+ // Max amount of time (ms) a thread waits before checking the lock again
+ int threadSleep = (int) number_field(cmdObj, "threadSleep", 30);
+ if(threadSleep <= 0) threadSleep = 1;
+
+ // How long until the lock is forced in ms, only compared locally
+ unsigned long long takeoverMS = (unsigned long long) number_field(cmdObj, "takeoverMS", 0);
+
+ // Whether or not we should hang some threads
+ int hangThreads = (int) number_field(cmdObj, "hangThreads", 0);
+
+
+ boost::mt19937 gen((boost::mt19937::result_type) seed);
+
+ boost::variate_generator<boost::mt19937&, boost::uniform_int<> > randomSkew(gen, boost::uniform_int<>(0, skewRange));
+ boost::variate_generator<boost::mt19937&, boost::uniform_int<> > randomWait(gen, boost::uniform_int<>(1, threadWait));
+ boost::variate_generator<boost::mt19937&, boost::uniform_int<> > randomSleep(gen, boost::uniform_int<>(1, threadSleep));
+
+
+ int skew = 0;
+ if (!lock.get()) {
+
+ // Pick a skew, but the first two threads skew the whole range
+ if(threadId == 0)
+ skew = -skewRange / 2;
+ else if(threadId == 1)
+ skew = skewRange / 2;
+ else skew = randomSkew() - (skewRange / 2);
+
+ // Skew this thread
+ jsTimeVirtualThreadSkew( skew );
+
+ log() << "Initializing lock with skew of " << skew << " for thread " << threadId << endl;
+
+ lock.reset(new DistributedLock(hostConn, lockName, takeoverMS, true ));
+
+ log() << "Skewed time " << jsTime() << " for thread " << threadId << endl
+ << " max wait (with lock: " << threadWait << ", after lock: " << threadSleep << ")" << endl
+ << " takeover in " << takeoverMS << "(ms remote)" << endl;
+
+ }
+
+ DistributedLock* myLock = lock.get();
+
+ bool errors = false;
+ BSONObj lockObj;
+ while (keepGoing) {
+ try {
+
+ if (myLock->lock_try("Testing distributed lock with skew.", false, &lockObj )) {
+
+ log() << "**** Locked for thread " << threadId << " with ts " << lockObj["ts"] << endl;
+
+ if( count % 2 == 1 && ! myLock->lock_try( "Testing lock re-entry.", true ) ) {
+ errors = true;
+ log() << "**** !Could not re-enter lock already held" << endl;
+ break;
+ }
+
+ if( count % 3 == 1 && myLock->lock_try( "Testing lock non-re-entry.", false ) ) {
+ errors = true;
+ log() << "**** !Invalid lock re-entry" << endl;
+ break;
+ }
+
+ count++;
+ int before = count;
+ int sleep = randomWait();
+ sleepmillis(sleep);
+ int after = count;
+
+ if(after != before) {
+ errors = true;
+ log() << "**** !Bad increment while sleeping with lock for: " << sleep << "ms" << endl;
+ break;
+ }
+
+ // Unlock only half the time...
+ if(hangThreads == 0 || threadId % hangThreads != 0) {
+ log() << "**** Unlocking for thread " << threadId << " with ts " << lockObj["ts"] << endl;
+ myLock->unlock( &lockObj );
+ }
+ else {
+ log() << "**** Not unlocking for thread " << threadId << endl;
+ DistributedLock::killPinger( *myLock );
+ // We're simulating a crashed process...
+ break;
+ }
+ }
+
+ }
+ catch( LockException& e ) {
+ log() << "*** !Could not try distributed lock." << causedBy( e ) << endl;
+ break;
+ }
+
+ sleepmillis(randomSleep());
+ }
+
+ result << "errors" << errors
+ << "skew" << skew
+ << "takeover" << (long long) takeoverMS
+ << "localTimeout" << (takeoverMS > 0);
+
+ }
+
+ void test(ConnectionString& hostConn, string& lockName, unsigned seed) {
+ return;
+ }
+
+ bool run(const string&, BSONObj& cmdObj, int, string& errmsg,
+ BSONObjBuilder& result, bool) {
+
+ Timer t;
+
+ ConnectionString hostConn(cmdObj["host"].String(),
+ ConnectionString::SYNC);
+
+ unsigned seed = (unsigned) number_field(cmdObj, "seed", 0);
+ int numThreads = (int) number_field(cmdObj, "numThreads", 4);
+ int wait = (int) number_field(cmdObj, "wait", 10000);
+
+ log() << "Starting " << this->name << " with -" << endl
+ << " seed: " << seed << endl
+ << " numThreads: " << numThreads << endl
+ << " total wait: " << wait << endl << endl;
+
+ // Skew host clocks if needed
+ try {
+ skewClocks( hostConn, cmdObj );
+ }
+ catch( DBException e ) {
+ errmsg = str::stream() << "Clocks could not be skewed." << causedBy( e );
+ return false;
+ }
+
+ count = 0;
+ keepGoing = true;
+
+ vector<shared_ptr<boost::thread> > threads;
+ vector<shared_ptr<BSONObjBuilder> > results;
+ for (int i = 0; i < numThreads; i++) {
+ results.push_back(shared_ptr<BSONObjBuilder> (new BSONObjBuilder()));
+ threads.push_back(shared_ptr<boost::thread> (new boost::thread(
+ boost::bind(&TestDistLockWithSkew::runThread, this,
+ hostConn, (unsigned) i, seed + i, boost::ref(cmdObj),
+ boost::ref(*(results[i].get()))))));
+ }
+
+ sleepsecs(wait / 1000);
+ keepGoing = false;
+
+ bool errors = false;
+ for (unsigned i = 0; i < threads.size(); i++) {
+ threads[i]->join();
+ errors = errors || results[i].get()->obj()["errors"].Bool();
+ }
+
+ result.append("count", count);
+ result.append("errors", errors);
+ result.append("timeMS", t.millis());
+
+ return !errors;
+
+ }
+
+ /**
+ * Skews the clocks of a remote cluster by a particular amount, specified by
+ * the "skewHosts" element in a BSONObj.
+ */
+ static void skewClocks( ConnectionString& cluster, BSONObj& cmdObj ) {
+
+ vector<long long> skew;
+ if(cmdObj.hasField("skewHosts")) {
+ bsonArrToNumVector<long long>(cmdObj["skewHosts"], skew);
+ }
+ else {
+ log( logLvl ) << "No host clocks to skew." << endl;
+ return;
+ }
+
+ log( logLvl ) << "Skewing clocks of hosts " << cluster << endl;
+
+ unsigned s = 0;
+ for(vector<long long>::iterator i = skew.begin(); i != skew.end(); ++i,s++) {
+
+ ConnectionString server( cluster.getServers()[s] );
+ ScopedDbConnection conn( server );
+
+ BSONObj result;
+ try {
+ bool success = conn->runCommand( string("admin"), BSON( "_skewClockCommand" << 1 << "skew" << *i ), result );
+
+ uassert(13678, str::stream() << "Could not communicate with server " << server.toString() << " in cluster " << cluster.toString() << " to change skew by " << *i, success );
+
+ log( logLvl + 1 ) << " Skewed host " << server << " clock by " << *i << endl;
+ }
+ catch(...) {
+ conn.done();
+ throw;
+ }
+
+ conn.done();
+
+ }
+
+ }
+
+ // variables for test
+ thread_specific_ptr<DistributedLock> lock;
+ AtomicUInt count;
+ bool keepGoing;
+
+ } testDistLockWithSkewCmd;
+
+
+ /**
+ * Utility command to virtually skew the clock of a mongo server a particular amount.
+     * This skews the clock globally; per-thread skew is also possible.
+ */
+ class SkewClockCommand: public Command {
+ public:
+ SkewClockCommand() :
+ Command("_skewClockCommand") {
+ }
+ virtual void help(stringstream& help) const {
+ help << "should not be calling this directly" << endl;
+ }
+
+ virtual bool slaveOk() const {
+ return false;
+ }
+ virtual bool adminOnly() const {
+ return true;
+ }
+ virtual LockType locktype() const {
+ return NONE;
+ }
+
+ bool run(const string&, BSONObj& cmdObj, int, string& errmsg,
+ BSONObjBuilder& result, bool) {
+
+ long long skew = (long long) number_field(cmdObj, "skew", 0);
+
+ log() << "Adjusting jsTime() clock skew to " << skew << endl;
+
+ jsTimeVirtualSkew( skew );
+
+ log() << "JSTime adjusted, now is " << jsTime() << endl;
+
+ return true;
+
+ }
+
+ } testSkewClockCommand;
+
}
+
diff --git a/client/examples/clientTest.cpp b/client/examples/clientTest.cpp
index 96c014e..aaea6bd 100644
--- a/client/examples/clientTest.cpp
+++ b/client/examples/clientTest.cpp
@@ -246,5 +246,34 @@ int main( int argc, const char **argv ) {
//MONGO_PRINT(out);
}
+ {
+ // test timeouts
+
+ DBClientConnection conn( true , 0 , 2 );
+ if ( ! conn.connect( string( "127.0.0.1:" ) + port , errmsg ) ) {
+ cout << "couldn't connect : " << errmsg << endl;
+ throw -11;
+ }
+ conn.insert( "test.totest" , BSON( "x" << 1 ) );
+ BSONObj res;
+
+ bool gotError = false;
+ assert( conn.eval( "test" , "return db.totest.findOne().x" , res ) );
+ try {
+ conn.eval( "test" , "sleep(5000); return db.totest.findOne().x" , res );
+ }
+ catch ( std::exception& e ) {
+ gotError = true;
+ log() << e.what() << endl;
+ }
+ assert( gotError );
+ // sleep so the server isn't locked anymore
+ sleepsecs( 4 );
+
+ assert( conn.eval( "test" , "return db.totest.findOne().x" , res ) );
+
+
+ }
+
cout << "client test finished!" << endl;
}
diff --git a/client/examples/httpClientTest.cpp b/client/examples/httpClientTest.cpp
index 4fa5fd8..4055d44 100644
--- a/client/examples/httpClientTest.cpp
+++ b/client/examples/httpClientTest.cpp
@@ -18,10 +18,27 @@
#include <iostream>
#include "client/dbclient.h"
-#include "util/httpclient.h"
+#include "util/net/httpclient.h"
using namespace mongo;
+void play( string url ) {
+ cout << "[" << url << "]" << endl;
+
+ HttpClient c;
+ HttpClient::Result r;
+ MONGO_assert( c.get( url , &r ) == 200 );
+
+ HttpClient::Headers h = r.getHeaders();
+ MONGO_assert( h["Content-Type"].find( "text/html" ) == 0 );
+
+ cout << "\tHeaders" << endl;
+ for ( HttpClient::Headers::iterator i = h.begin() ; i != h.end(); ++i ) {
+ cout << "\t\t" << i->first << "\t" << i->second << endl;
+ }
+
+}
+
int main( int argc, const char **argv ) {
int port = 27017;
@@ -32,12 +49,10 @@ int main( int argc, const char **argv ) {
}
port += 1000;
- stringstream ss;
- ss << "http://localhost:" << port << "/";
- string url = ss.str();
-
- cout << "[" << url << "]" << endl;
-
- HttpClient c;
- MONGO_assert( c.get( url ) == 200 );
+ play( str::stream() << "http://localhost:" << port << "/" );
+
+#ifdef MONGO_SSL
+ play( "https://www.10gen.com/" );
+#endif
+
}
diff --git a/client/examples/insert_demo.cpp b/client/examples/insert_demo.cpp
new file mode 100644
index 0000000..14ac79e
--- /dev/null
+++ b/client/examples/insert_demo.cpp
@@ -0,0 +1,47 @@
+/*
+ C++ client program which inserts documents in a MongoDB database.
+
+ How to build and run:
+
+ Using mongo_client_lib.cpp:
+ g++ -I .. -I ../.. insert_demo.cpp ../mongo_client_lib.cpp -lboost_thread-mt -lboost_filesystem
+ ./a.out
+*/
+
+#include <iostream>
+#include "dbclient.h" // the mongo c++ driver
+
+using namespace std;
+using namespace mongo;
+using namespace bson;
+
+int main() {
+ try {
+ cout << "connecting to localhost..." << endl;
+ DBClientConnection c;
+ c.connect("localhost");
+ cout << "connected ok" << endl;
+
+ bo o = BSON( "hello" << "world" );
+
+ cout << "inserting..." << endl;
+
+ time_t start = time(0);
+ for( unsigned i = 0; i < 1000000; i++ ) {
+ c.insert("test.foo", o);
+ }
+
+ // wait until all operations applied
+ cout << "getlasterror returns: \"" << c.getLastError() << '"' << endl;
+
+ time_t done = time(0);
+ time_t dt = done-start;
+ cout << dt << " seconds " << 1000000/dt << " per second" << endl;
+ }
+ catch(DBException& e) {
+ cout << "caught DBException " << e.toString() << endl;
+ return 1;
+ }
+
+ return 0;
+}
diff --git a/client/examples/rs.cpp b/client/examples/rs.cpp
index 7813ec6..3307d87 100644
--- a/client/examples/rs.cpp
+++ b/client/examples/rs.cpp
@@ -21,11 +21,62 @@
#include "client/dbclient.h"
#include <iostream>
+#include <vector>
using namespace mongo;
using namespace std;
+void workerThread( string collName , bool print , DBClientReplicaSet * conn ) {
+
+ while ( true ) {
+ try {
+ conn->update( collName , BSONObj() , BSON( "$inc" << BSON( "x" << 1 ) ) , true );
+
+ BSONObj x = conn->findOne( collName , BSONObj() );
+
+ if ( print ) {
+ cout << x << endl;
+ }
+
+ BSONObj a = conn->slaveConn().findOne( collName , BSONObj() , 0 , QueryOption_SlaveOk );
+ BSONObj b = conn->findOne( collName , BSONObj() , 0 , QueryOption_SlaveOk );
+
+ if ( print ) {
+ cout << "\t A " << a << endl;
+ cout << "\t B " << b << endl;
+ }
+ }
+ catch ( std::exception& e ) {
+ cout << "ERROR: " << e.what() << endl;
+ }
+ sleepmillis( 10 );
+ }
+}
+
int main( int argc , const char ** argv ) {
+
+ unsigned nThreads = 1;
+ bool print = false;
+ bool testTimeout = false;
+
+ for ( int i=1; i<argc; i++ ) {
+ if ( mongoutils::str::equals( "--threads" , argv[i] ) ) {
+ nThreads = atoi( argv[++i] );
+ }
+ else if ( mongoutils::str::equals( "--print" , argv[i] ) ) {
+ print = true;
+ }
+ // Run a special mode to demonstrate the DBClientReplicaSet so_timeout option.
+ else if ( mongoutils::str::equals( "--testTimeout" , argv[i] ) ) {
+ testTimeout = true;
+ }
+ else {
+ cerr << "unknown option: " << argv[i] << endl;
+ return 1;
+ }
+
+ }
+
string errmsg;
ConnectionString cs = ConnectionString::parse( "foo/127.0.0.1" , errmsg );
if ( ! cs.isValid() ) {
@@ -33,7 +84,7 @@ int main( int argc , const char ** argv ) {
return 1;
}
- DBClientReplicaSet * conn = (DBClientReplicaSet*)cs.connect( errmsg );
+ DBClientReplicaSet * conn = dynamic_cast<DBClientReplicaSet*>(cs.connect( errmsg, testTimeout ? 10 : 0 ));
if ( ! conn ) {
cout << "error connecting: " << errmsg << endl;
return 2;
@@ -42,17 +93,26 @@ int main( int argc , const char ** argv ) {
string collName = "test.rs1";
conn->dropCollection( collName );
- while ( true ) {
+
+ if ( testTimeout ) {
+ conn->insert( collName, BSONObj() );
try {
- conn->update( collName , BSONObj() , BSON( "$inc" << BSON( "x" << 1 ) ) , true );
- cout << conn->findOne( collName , BSONObj() ) << endl;
- cout << "\t A" << conn->slaveConn().findOne( collName , BSONObj() , 0 , QueryOption_SlaveOk ) << endl;
- cout << "\t B " << conn->findOne( collName , BSONObj() , 0 , QueryOption_SlaveOk ) << endl;
- }
- catch ( std::exception& e ) {
- cout << "ERROR: " << e.what() << endl;
+ conn->count( collName, BSON( "$where" << "sleep(40000)" ) );
+ } catch( DBException& ) {
+ return 0;
}
- sleepsecs( 1 );
+ cout << "expected socket exception" << endl;
+ return 1;
+ }
+
+ vector<boost::shared_ptr<boost::thread> > threads;
+ for ( unsigned i=0; i<nThreads; i++ ) {
+ string errmsg;
+ threads.push_back( boost::shared_ptr<boost::thread>( new boost::thread( boost::bind( workerThread , collName , print , (DBClientReplicaSet*)cs.connect(errmsg) ) ) ) );
+ }
+
+ for ( unsigned i=0; i<threads.size(); i++ ) {
+ threads[i]->join();
}
}
diff --git a/client/examples/simple_client_demo.vcxproj b/client/examples/simple_client_demo.vcxproj
new file mode 100755
index 0000000..4658a42
--- /dev/null
+++ b/client/examples/simple_client_demo.vcxproj
@@ -0,0 +1,92 @@
+<?xml version="1.0" encoding="utf-8"?>
+<Project DefaultTargets="Build" ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+ <ItemGroup Label="ProjectConfigurations">
+ <ProjectConfiguration Include="Debug|Win32">
+ <Configuration>Debug</Configuration>
+ <Platform>Win32</Platform>
+ </ProjectConfiguration>
+ <ProjectConfiguration Include="Release|Win32">
+ <Configuration>Release</Configuration>
+ <Platform>Win32</Platform>
+ </ProjectConfiguration>
+ </ItemGroup>
+ <PropertyGroup Label="Globals">
+ <ProjectGuid>{89C30BC3-2874-4F2C-B4DA-EB04E9782236}</ProjectGuid>
+ <Keyword>Win32Proj</Keyword>
+ <RootNamespace>simple_client_demo</RootNamespace>
+ </PropertyGroup>
+ <Import Project="$(VCTargetsPath)\Microsoft.Cpp.Default.props" />
+ <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'" Label="Configuration">
+ <ConfigurationType>Application</ConfigurationType>
+ <UseDebugLibraries>true</UseDebugLibraries>
+ <CharacterSet>Unicode</CharacterSet>
+ </PropertyGroup>
+ <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'" Label="Configuration">
+ <ConfigurationType>Application</ConfigurationType>
+ <UseDebugLibraries>false</UseDebugLibraries>
+ <WholeProgramOptimization>true</WholeProgramOptimization>
+ <CharacterSet>Unicode</CharacterSet>
+ </PropertyGroup>
+ <Import Project="$(VCTargetsPath)\Microsoft.Cpp.props" />
+ <ImportGroup Label="ExtensionSettings">
+ </ImportGroup>
+ <ImportGroup Label="PropertySheets" Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">
+ <Import Project="$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props')" Label="LocalAppDataPlatform" />
+ </ImportGroup>
+ <ImportGroup Label="PropertySheets" Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">
+ <Import Project="$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props')" Label="LocalAppDataPlatform" />
+ </ImportGroup>
+ <PropertyGroup Label="UserMacros" />
+ <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">
+ <LinkIncremental>true</LinkIncremental>
+ <IncludePath>..\..;..\..\pcre-7.4;$(IncludePath)</IncludePath>
+ <LibraryPath>\boost\lib\vs2010_32;$(LibraryPath)</LibraryPath>
+ </PropertyGroup>
+ <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">
+ <LinkIncremental>false</LinkIncremental>
+ <IncludePath>..\..;..\..\pcre-7.4;$(IncludePath)</IncludePath>
+ <LibraryPath>\boost\lib\vs2010_32;$(LibraryPath)</LibraryPath>
+ </PropertyGroup>
+ <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">
+ <ClCompile>
+ <PrecompiledHeader>
+ </PrecompiledHeader>
+ <WarningLevel>Level3</WarningLevel>
+ <Optimization>Disabled</Optimization>
+ <PreprocessorDefinitions> _CRT_SECURE_NO_WARNINGS;_DEBUG;_CONSOLE;%(PreprocessorDefinitions)</PreprocessorDefinitions>
+ <AdditionalIncludeDirectories>c:\boost;\boost</AdditionalIncludeDirectories>
+ </ClCompile>
+ <Link>
+ <SubSystem>Console</SubSystem>
+ <GenerateDebugInformation>true</GenerateDebugInformation>
+ <AdditionalDependencies>ws2_32.lib;kernel32.lib;user32.lib;gdi32.lib;winspool.lib;comdlg32.lib;advapi32.lib;shell32.lib;ole32.lib;oleaut32.lib;uuid.lib;odbc32.lib;odbccp32.lib;%(AdditionalDependencies)</AdditionalDependencies>
+ </Link>
+ </ItemDefinitionGroup>
+ <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">
+ <ClCompile>
+ <WarningLevel>Level3</WarningLevel>
+ <PrecompiledHeader>
+ </PrecompiledHeader>
+ <Optimization>MaxSpeed</Optimization>
+ <FunctionLevelLinking>true</FunctionLevelLinking>
+ <IntrinsicFunctions>true</IntrinsicFunctions>
+ <PreprocessorDefinitions> _CRT_SECURE_NO_WARNINGS;NDEBUG;_CONSOLE;%(PreprocessorDefinitions)</PreprocessorDefinitions>
+ <RuntimeLibrary>MultiThreaded</RuntimeLibrary>
+ <AdditionalIncludeDirectories>c:\boost;\boost</AdditionalIncludeDirectories>
+ </ClCompile>
+ <Link>
+ <SubSystem>Console</SubSystem>
+ <GenerateDebugInformation>true</GenerateDebugInformation>
+ <EnableCOMDATFolding>true</EnableCOMDATFolding>
+ <OptimizeReferences>true</OptimizeReferences>
+ <AdditionalDependencies>ws2_32.lib;kernel32.lib;user32.lib;gdi32.lib;winspool.lib;comdlg32.lib;advapi32.lib;shell32.lib;ole32.lib;oleaut32.lib;uuid.lib;odbc32.lib;odbccp32.lib;%(AdditionalDependencies)</AdditionalDependencies>
+ </Link>
+ </ItemDefinitionGroup>
+ <ItemGroup>
+ <ClCompile Include="..\mongo_client_lib.cpp" />
+ <ClCompile Include="..\simple_client_demo.cpp" />
+ </ItemGroup>
+ <Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" />
+ <ImportGroup Label="ExtensionTargets">
+ </ImportGroup>
+</Project> \ No newline at end of file
diff --git a/client/examples/simple_client_demo.vcxproj.filters b/client/examples/simple_client_demo.vcxproj.filters
new file mode 100755
index 0000000..d6580c3
--- /dev/null
+++ b/client/examples/simple_client_demo.vcxproj.filters
@@ -0,0 +1,21 @@
+<?xml version="1.0" encoding="utf-8"?>
+<Project ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+ <ItemGroup>
+ <Filter Include="Source Files">
+ <UniqueIdentifier>{4FC737F1-C7A5-4376-A066-2A32D752A2FF}</UniqueIdentifier>
+ <Extensions>cpp;c;cc;cxx;def;odl;idl;hpj;bat;asm;asmx</Extensions>
+ </Filter>
+ <Filter Include="Header Files">
+ <UniqueIdentifier>{93995380-89BD-4b04-88EB-625FBE52EBFB}</UniqueIdentifier>
+ <Extensions>h;hpp;hxx;hm;inl;inc;xsd</Extensions>
+ </Filter>
+ </ItemGroup>
+ <ItemGroup>
+ <ClCompile Include="..\simple_client_demo.cpp">
+ <Filter>Source Files</Filter>
+ </ClCompile>
+ <ClCompile Include="..\mongo_client_lib.cpp">
+ <Filter>Source Files</Filter>
+ </ClCompile>
+ </ItemGroup>
+</Project> \ No newline at end of file
diff --git a/client/examples/whereExample.cpp b/client/examples/whereExample.cpp
index ce4174b..12b68d7 100644
--- a/client/examples/whereExample.cpp
+++ b/client/examples/whereExample.cpp
@@ -1,4 +1,5 @@
-// whereExample.cpp
+// @file whereExample.cpp
+// @see http://www.mongodb.org/display/DOCS/Server-side+Code+Execution
/* Copyright 2009 10gen Inc.
*
diff --git a/client/mongo_client_lib.cpp b/client/mongo_client_lib.cpp
index 69f801a..8100d71 100644
--- a/client/mongo_client_lib.cpp
+++ b/client/mongo_client_lib.cpp
@@ -4,13 +4,23 @@
Normally one includes dbclient.h, and links against libmongoclient.a, when connecting to MongoDB
from C++. However, if you have a situation where the pre-built library does not work, you can use
- this file instead to build all the necessary symbols. To do so, include client_lib.cpp in your
+ this file instead to build all the necessary symbols. To do so, include mongo_client_lib.cpp in your
project.
+ GCC
+ ---
For example, to build and run simple_client_demo.cpp with GCC and run it:
g++ -I .. simple_client_demo.cpp mongo_client_lib.cpp -lboost_thread-mt -lboost_filesystem
./a.out
+
+ Visual Studio (2010 tested)
+ ---------------------------
+ First, see client/examples/simple_client_demo.vcxproj.
+ - Be sure to include your boost include directory in your project as an Additional Include Directory.
+ - Define _CRT_SECURE_NO_WARNINGS to avoid warnings on use of strncpy and such by the MongoDB client code.
+ - Include the boost libraries directory.
+ - Linker.Input.Additional Dependencies - add ws2_32.lib for the Winsock library.
*/
/* Copyright 2009 10gen Inc.
@@ -28,23 +38,30 @@
* limitations under the License.
*/
+#if defined(_WIN32)
+// C4800 forcing value to bool 'true' or 'false' (performance warning)
+#pragma warning( disable : 4800 )
+#endif
+
#include "../util/md5main.cpp"
#define MONGO_EXPOSE_MACROS
#include "../pch.h"
#include "../util/assert_util.cpp"
-#include "../util/message.cpp"
+#include "../util/net/message.cpp"
#include "../util/util.cpp"
#include "../util/background.cpp"
#include "../util/base64.cpp"
-#include "../util/sock.cpp"
+#include "../util/net/sock.cpp"
#include "../util/log.cpp"
#include "../util/password.cpp"
+#include "../util/net/message_port.cpp"
#include "../util/concurrency/thread_pool.cpp"
#include "../util/concurrency/vars.cpp"
#include "../util/concurrency/task.cpp"
+#include "../util/concurrency/spin_lock.cpp"
#include "connpool.cpp"
#include "syncclusterconnection.cpp"
@@ -53,13 +70,19 @@
#include "gridfs.cpp"
#include "dbclientcursor.cpp"
+#include "../util/text.cpp"
+#include "dbclient_rs.cpp"
+#include "../bson/oid.cpp"
+
#include "../db/lasterror.cpp"
#include "../db/json.cpp"
#include "../db/jsobj.cpp"
-#include "../db/common.cpp"
+//#include "../db/common.cpp"
#include "../db/nonce.cpp"
#include "../db/commands.cpp"
+#include "../pch.cpp"
+
extern "C" {
#include "../util/md5.c"
}
diff --git a/client/parallel.cpp b/client/parallel.cpp
index c4905e3..76b0168 100644
--- a/client/parallel.cpp
+++ b/client/parallel.cpp
@@ -63,7 +63,20 @@ namespace mongo {
_init();
}
- auto_ptr<DBClientCursor> ClusteredCursor::query( const string& server , int num , BSONObj extra , int skipLeft ) {
+ void ClusteredCursor::_checkCursor( DBClientCursor * cursor ) {
+ assert( cursor );
+
+ if ( cursor->hasResultFlag( ResultFlag_ShardConfigStale ) ) {
+ throw StaleConfigException( _ns , "ClusteredCursor::query" );
+ }
+
+ if ( cursor->hasResultFlag( ResultFlag_ErrSet ) ) {
+ BSONObj o = cursor->next();
+ throw UserException( o["code"].numberInt() , o["$err"].String() );
+ }
+ }
+
+ auto_ptr<DBClientCursor> ClusteredCursor::query( const string& server , int num , BSONObj extra , int skipLeft , bool lazy ) {
uassert( 10017 , "cursor already done" , ! _done );
assert( _didInit );
@@ -80,12 +93,10 @@ namespace mongo {
throw StaleConfigException( _ns , "ClusteredCursor::query ShardConnection had to change" , true );
}
- if ( logLevel >= 5 ) {
- log(5) << "ClusteredCursor::query (" << type() << ") server:" << server
- << " ns:" << _ns << " query:" << q << " num:" << num
- << " _fields:" << _fields << " options: " << _options << endl;
- }
-
+ LOG(5) << "ClusteredCursor::query (" << type() << ") server:" << server
+ << " ns:" << _ns << " query:" << q << " num:" << num
+ << " _fields:" << _fields << " options: " << _options << endl;
+
auto_ptr<DBClientCursor> cursor =
conn->query( _ns , q , num , 0 , ( _fields.isEmpty() ? 0 : &_fields ) , _options , _batchSize == 0 ? 0 : _batchSize + skipLeft );
@@ -97,21 +108,9 @@ namespace mongo {
massert( 13633 , str::stream() << "error querying server: " << server , cursor.get() );
- if ( cursor->hasResultFlag( ResultFlag_ShardConfigStale ) ) {
- conn.done();
- throw StaleConfigException( _ns , "ClusteredCursor::query" );
- }
-
- if ( cursor->hasResultFlag( ResultFlag_ErrSet ) ) {
- conn.done();
- BSONObj o = cursor->next();
- throw UserException( o["code"].numberInt() , o["$err"].String() );
- }
-
-
- cursor->attach( &conn );
-
- conn.done();
+ cursor->attach( &conn ); // this calls done on conn
+ assert( ! conn.ok() );
+ _checkCursor( cursor.get() );
return cursor;
}
catch ( SocketException& e ) {
@@ -228,6 +227,11 @@ namespace mongo {
: _matcher( filter ) , _cursor( cursor ) , _done( cursor.get() == 0 ) {
}
+ FilteringClientCursor::FilteringClientCursor( DBClientCursor* cursor , const BSONObj filter )
+ : _matcher( filter ) , _cursor( cursor ) , _done( cursor == 0 ) {
+ }
+
+
FilteringClientCursor::~FilteringClientCursor() {
}
@@ -237,6 +241,13 @@ namespace mongo {
_done = _cursor.get() == 0;
}
+ void FilteringClientCursor::reset( DBClientCursor* cursor ) {
+ _cursor.reset( cursor );
+ _next = BSONObj();
+ _done = cursor == 0;
+ }
+
+
bool FilteringClientCursor::more() {
if ( ! _next.isEmpty() )
return true;
@@ -399,17 +410,245 @@ namespace mongo {
}
}
+ // TODO: Merge with futures API? We do a lot of error checking here that would be useful elsewhere.
void ParallelSortClusteredCursor::_init() {
+
+ // log() << "Starting parallel search..." << endl;
+
+ // make sure we're not already initialized
assert( ! _cursors );
_cursors = new FilteringClientCursor[_numServers];
- // TODO: parellize
- int num = 0;
- for ( set<ServerAndQuery>::iterator i = _servers.begin(); i!=_servers.end(); ++i ) {
- const ServerAndQuery& sq = *i;
- _cursors[num++].reset( query( sq._server , 0 , sq._extra , _needToSkip ) );
+ bool returnPartial = ( _options & QueryOption_PartialResults );
+
+ vector<ServerAndQuery> queries( _servers.begin(), _servers.end() );
+ set<int> retryQueries;
+ int finishedQueries = 0;
+
+ vector< shared_ptr<ShardConnection> > conns;
+ vector<string> servers;
+
+ // Since we may get all sorts of errors, record them all as they come and throw them later if necessary
+ vector<string> staleConfigExs;
+ vector<string> socketExs;
+ vector<string> otherExs;
+ bool allConfigStale = false;
+
+ int retries = -1;
+
+ // Loop through all the queries until we've finished or gotten a socket exception on all of them
+ // We break early for non-socket exceptions, and socket exceptions if we aren't returning partial results
+ do {
+ retries++;
+
+ bool firstPass = retryQueries.size() == 0;
+
+ if( ! firstPass ){
+ log() << "retrying " << ( returnPartial ? "(partial) " : "" ) << "parallel connection to ";
+ for( set<int>::iterator it = retryQueries.begin(); it != retryQueries.end(); ++it ){
+ log() << queries[*it]._server << ", ";
+ }
+ log() << finishedQueries << " finished queries." << endl;
+ }
+
+ size_t num = 0;
+ for ( vector<ServerAndQuery>::iterator it = queries.begin(); it != queries.end(); ++it ) {
+ size_t i = num++;
+
+ const ServerAndQuery& sq = *it;
+
+ // If we're not retrying this cursor on later passes, continue
+ if( ! firstPass && retryQueries.find( i ) == retryQueries.end() ) continue;
+
+ // log() << "Querying " << _query << " from " << _ns << " for " << sq._server << endl;
+
+ BSONObj q = _query;
+ if ( ! sq._extra.isEmpty() ) {
+ q = concatQuery( q , sq._extra );
+ }
+
+ string errLoc = " @ " + sq._server;
+
+ if( firstPass ){
+
+ // This may be the first time connecting to this shard, if so we can get an error here
+ try {
+ conns.push_back( shared_ptr<ShardConnection>( new ShardConnection( sq._server , _ns ) ) );
+ }
+ catch( std::exception& e ){
+ socketExs.push_back( e.what() + errLoc );
+ if( ! returnPartial ){
+ num--;
+ break;
+ }
+ conns.push_back( shared_ptr<ShardConnection>() );
+ continue;
+ }
+
+ servers.push_back( sq._server );
+ }
+
+ if ( conns[i]->setVersion() ) {
+ conns[i]->done();
+ staleConfigExs.push_back( StaleConfigException( _ns , "ClusteredCursor::query ShardConnection had to change" , true ).what() + errLoc );
+ break;
+ }
+
+ LOG(5) << "ParallelSortClusteredCursor::init server:" << sq._server << " ns:" << _ns
+ << " query:" << q << " _fields:" << _fields << " options: " << _options << endl;
+
+ if( ! _cursors[i].raw() )
+ _cursors[i].reset( new DBClientCursor( conns[i]->get() , _ns , q ,
+ 0 , // nToReturn
+ 0 , // nToSkip
+ _fields.isEmpty() ? 0 : &_fields , // fieldsToReturn
+ _options ,
+ _batchSize == 0 ? 0 : _batchSize + _needToSkip // batchSize
+ ) );
+
+ try{
+ _cursors[i].raw()->initLazy( ! firstPass );
+ }
+ catch( SocketException& e ){
+ socketExs.push_back( e.what() + errLoc );
+ _cursors[i].reset( NULL );
+ conns[i]->done();
+ if( ! returnPartial ) break;
+ }
+ catch( std::exception& e){
+ otherExs.push_back( e.what() + errLoc );
+ _cursors[i].reset( NULL );
+ conns[i]->done();
+ break;
+ }
+
+ }
+
+ // Go through all the potentially started cursors and finish initializing them or log any errors and
+ // potentially retry
+ // TODO: Better error classification would make this easier, errors are indicated in all sorts of ways
+ // here that we need to trap.
+ for ( size_t i = 0; i < num; i++ ) {
+
+ // log() << "Finishing query for " << cons[i].get()->getHost() << endl;
+ string errLoc = " @ " + queries[i]._server;
+
+ if( ! _cursors[i].raw() || ( ! firstPass && retryQueries.find( i ) == retryQueries.end() ) ){
+ if( conns[i] ) conns[i].get()->done();
+ continue;
+ }
+
+ assert( conns[i] );
+ retryQueries.erase( i );
+
+ bool retry = false;
+
+ try {
+
+ if( ! _cursors[i].raw()->initLazyFinish( retry ) ) {
+
+ warning() << "invalid result from " << conns[i]->getHost() << ( retry ? ", retrying" : "" ) << endl;
+ _cursors[i].reset( NULL );
+
+ if( ! retry ){
+ socketExs.push_back( str::stream() << "error querying server: " << servers[i] );
+ conns[i]->done();
+ }
+ else {
+ retryQueries.insert( i );
+ }
+
+ continue;
+ }
+ }
+ catch ( MsgAssertionException& e ){
+ socketExs.push_back( e.what() + errLoc );
+ _cursors[i].reset( NULL );
+ conns[i]->done();
+ continue;
+ }
+ catch ( SocketException& e ) {
+ socketExs.push_back( e.what() + errLoc );
+ _cursors[i].reset( NULL );
+ conns[i]->done();
+ continue;
+ }
+ catch( std::exception& e ){
+ otherExs.push_back( e.what() + errLoc );
+ _cursors[i].reset( NULL );
+ conns[i]->done();
+ continue;
+ }
+
+ try {
+ _cursors[i].raw()->attach( conns[i].get() ); // this calls done on conn
+ _checkCursor( _cursors[i].raw() );
+
+ finishedQueries++;
+ }
+ catch ( StaleConfigException& e ){
+
+ // Our stored configuration data is actually stale, we need to reload it
+ // when we throw our exception
+ allConfigStale = true;
+
+ staleConfigExs.push_back( e.what() + errLoc );
+ _cursors[i].reset( NULL );
+ conns[i]->done();
+ continue;
+ }
+ catch( std::exception& e ){
+ otherExs.push_back( e.what() + errLoc );
+ _cursors[i].reset( NULL );
+ conns[i]->done();
+ continue;
+ }
+ }
+
+ // Don't exceed our max retries, should not happen
+ assert( retries < 5 );
+ }
+ while( retryQueries.size() > 0 /* something to retry */ &&
+ ( socketExs.size() == 0 || returnPartial ) /* no conn issues */ &&
+ staleConfigExs.size() == 0 /* no config issues */ &&
+ otherExs.size() == 0 /* no other issues */);
+
+ // Assert that our conns are all closed!
+ for( vector< shared_ptr<ShardConnection> >::iterator i = conns.begin(); i < conns.end(); ++i ){
+ assert( ! (*i) || ! (*i)->ok() );
+ }
+
+ // Handle errors we got during initialization.
+ // If we're returning partial results, we can ignore socketExs, but nothing else
+ // Log a warning in any case, so we don't lose these messages
+ bool throwException = ( socketExs.size() > 0 && ! returnPartial ) || staleConfigExs.size() > 0 || otherExs.size() > 0;
+
+ if( socketExs.size() > 0 || staleConfigExs.size() > 0 || otherExs.size() > 0 ) {
+
+ vector<string> errMsgs;
+
+ errMsgs.insert( errMsgs.end(), staleConfigExs.begin(), staleConfigExs.end() );
+ errMsgs.insert( errMsgs.end(), otherExs.begin(), otherExs.end() );
+ errMsgs.insert( errMsgs.end(), socketExs.begin(), socketExs.end() );
+
+ stringstream errMsg;
+ errMsg << "could not initialize cursor across all shards because : ";
+ for( vector<string>::iterator i = errMsgs.begin(); i != errMsgs.end(); i++ ){
+ if( i != errMsgs.begin() ) errMsg << " :: and :: ";
+ errMsg << *i;
+ }
+
+ if( throwException && staleConfigExs.size() > 0 )
+ throw StaleConfigException( _ns , errMsg.str() , ! allConfigStale );
+ else if( throwException )
+ throw DBException( errMsg.str(), 14827 );
+ else
+ warning() << errMsg.str() << endl;
}
+ if( retries > 0 )
+ log() << "successfully finished parallel query after " << retries << " retries" << endl;
+
}
ParallelSortClusteredCursor::~ParallelSortClusteredCursor() {
@@ -451,6 +690,7 @@ namespace mongo {
if ( best.isEmpty() ) {
best = me;
bestFrom = i;
+ if( _sortKey.isEmpty() ) break;
continue;
}
@@ -481,49 +721,62 @@ namespace mongo {
// ---- Future -----
// -----------------
- Future::CommandResult::CommandResult( const string& server , const string& db , const BSONObj& cmd , DBClientBase * conn ) {
- _server = server;
- _db = db;
- _cmd = cmd;
- _conn = conn;
- _done = false;
- }
+ Future::CommandResult::CommandResult( const string& server , const string& db , const BSONObj& cmd , int options , DBClientBase * conn )
+ :_server(server) ,_db(db) , _options(options), _cmd(cmd) ,_conn(conn) ,_done(false)
+ {
+ try {
+ if ( ! _conn ){
+ _connHolder.reset( new ScopedDbConnection( _server ) );
+ _conn = _connHolder->get();
+ }
- bool Future::CommandResult::join() {
- _thr->join();
- assert( _done );
- return _ok;
+ if ( _conn->lazySupported() ) {
+ _cursor.reset( new DBClientCursor(_conn, _db + ".$cmd", _cmd, -1/*limit*/, 0, NULL, _options, 0));
+ _cursor->initLazy();
+ }
+ else {
+ _done = true; // we set _done first because even if there is an error we're done
+ _ok = _conn->runCommand( db , cmd , _res , options );
+ }
+ }
+ catch ( std::exception& e ) {
+ error() << "Future::spawnComand (part 1) exception: " << e.what() << endl;
+ _ok = false;
+ _done = true;
+ }
}
- void Future::commandThread(shared_ptr<CommandResult> res) {
- setThreadName( "future" );
+ bool Future::CommandResult::join() {
+ if (_done)
+ return _ok;
try {
- DBClientBase * conn = res->_conn;
-
- scoped_ptr<ScopedDbConnection> myconn;
- if ( ! conn ){
- myconn.reset( new ScopedDbConnection( res->_server ) );
- conn = myconn->get();
- }
-
- res->_ok = conn->runCommand( res->_db , res->_cmd , res->_res );
+ // TODO: Allow retries?
+ bool retry = false;
+ bool finished = _cursor->initLazyFinish( retry );
+
+ // Shouldn't need to communicate with server any more
+ if ( _connHolder )
+ _connHolder->done();
- if ( myconn )
- myconn->done();
+ uassert(14812, str::stream() << "Error running command on server: " << _server, finished);
+ massert(14813, "Command returned nothing", _cursor->more());
+
+ _res = _cursor->nextSafe();
+ _ok = _res["ok"].trueValue();
}
catch ( std::exception& e ) {
- error() << "Future::commandThread exception: " << e.what() << endl;
- res->_ok = false;
+ error() << "Future::spawnComand (part 2) exception: " << e.what() << endl;
+ _ok = false;
}
- res->_done = true;
- }
- shared_ptr<Future::CommandResult> Future::spawnCommand( const string& server , const string& db , const BSONObj& cmd , DBClientBase * conn ) {
- shared_ptr<Future::CommandResult> res (new Future::CommandResult( server , db , cmd , conn ));
- res->_thr.reset( new boost::thread( boost::bind(Future::commandThread, res) ) );
+ _done = true;
+ return _ok;
+ }
+ shared_ptr<Future::CommandResult> Future::spawnCommand( const string& server , const string& db , const BSONObj& cmd , int options , DBClientBase * conn ) {
+ shared_ptr<Future::CommandResult> res (new Future::CommandResult( server , db , cmd , options , conn ));
return res;
}
diff --git a/client/parallel.h b/client/parallel.h
index 0809376..869bff9 100644
--- a/client/parallel.h
+++ b/client/parallel.h
@@ -89,9 +89,15 @@ namespace mongo {
virtual void _init() = 0;
- auto_ptr<DBClientCursor> query( const string& server , int num = 0 , BSONObj extraFilter = BSONObj() , int skipLeft = 0 );
+ auto_ptr<DBClientCursor> query( const string& server , int num = 0 , BSONObj extraFilter = BSONObj() , int skipLeft = 0 , bool lazy=false );
BSONObj explain( const string& server , BSONObj extraFilter = BSONObj() );
+ /**
+ * checks the cursor for any errors
+ * will throw an exceptionif an error is encountered
+ */
+ void _checkCursor( DBClientCursor * cursor );
+
static BSONObj _concatFilter( const BSONObj& filter , const BSONObj& extraFilter );
virtual void _explain( map< string,list<BSONObj> >& out ) = 0;
@@ -111,15 +117,20 @@ namespace mongo {
class FilteringClientCursor {
public:
FilteringClientCursor( const BSONObj filter = BSONObj() );
+ FilteringClientCursor( DBClientCursor* cursor , const BSONObj filter = BSONObj() );
FilteringClientCursor( auto_ptr<DBClientCursor> cursor , const BSONObj filter = BSONObj() );
~FilteringClientCursor();
void reset( auto_ptr<DBClientCursor> cursor );
+ void reset( DBClientCursor* cursor );
bool more();
BSONObj next();
BSONObj peek();
+
+ DBClientCursor* raw() { return _cursor.get(); }
+
private:
void _advance();
@@ -269,14 +280,16 @@ namespace mongo {
private:
- CommandResult( const string& server , const string& db , const BSONObj& cmd , DBClientBase * conn );
+ CommandResult( const string& server , const string& db , const BSONObj& cmd , int options , DBClientBase * conn );
string _server;
string _db;
+ int _options;
BSONObj _cmd;
DBClientBase * _conn;
+ scoped_ptr<ScopedDbConnection> _connHolder; // used if not provided a connection
- scoped_ptr<boost::thread> _thr;
+ scoped_ptr<DBClientCursor> _cursor;
BSONObj _res;
bool _ok;
@@ -285,7 +298,6 @@ namespace mongo {
friend class Future;
};
- static void commandThread(shared_ptr<CommandResult> res);
/**
* @param server server name
@@ -293,7 +305,7 @@ namespace mongo {
* @param cmd cmd to exec
* @param conn optional connection to use. will use standard pooled if non-specified
*/
- static shared_ptr<CommandResult> spawnCommand( const string& server , const string& db , const BSONObj& cmd , DBClientBase * conn = 0 );
+ static shared_ptr<CommandResult> spawnCommand( const string& server , const string& db , const BSONObj& cmd , int options , DBClientBase * conn = 0 );
};
diff --git a/client/redef_macros.h b/client/redef_macros.h
index a4cb1c9..897912d 100644
--- a/client/redef_macros.h
+++ b/client/redef_macros.h
@@ -1,4 +1,7 @@
-/** @file redef_macros.h - redefine macros from undef_macros.h */
+/** @file redef_macros.h macros the implementation uses.
+
+ @see undef_macros.h undefines these after use to minimize name pollution.
+*/
/* Copyright 2009 10gen Inc.
*
diff --git a/client/simple_client_demo.cpp b/client/simple_client_demo.cpp
index fa2f4a8..f4278dd 100644
--- a/client/simple_client_demo.cpp
+++ b/client/simple_client_demo.cpp
@@ -21,15 +21,33 @@ using namespace mongo;
using namespace bson;
int main() {
- cout << "connecting to localhost..." << endl;
- DBClientConnection c;
- c.connect("localhost");
- cout << "connected ok" << endl;
- unsigned long long count = c.count("test.foo");
- cout << "count of exiting documents in collection test.foo : " << count << endl;
-
- bo o = BSON( "hello" << "world" );
- c.insert("test.foo", o);
+ try {
+ cout << "connecting to localhost..." << endl;
+ DBClientConnection c;
+ c.connect("localhost");
+ cout << "connected ok" << endl;
+ unsigned long long count = c.count("test.foo");
+ cout << "count of exiting documents in collection test.foo : " << count << endl;
+
+ bo o = BSON( "hello" << "world" );
+ c.insert("test.foo", o);
+
+ string e = c.getLastError();
+ if( !e.empty() ) {
+ cout << "insert #1 failed: " << e << endl;
+ }
+
+ // make an index with a unique key constraint
+ c.ensureIndex("test.foo", BSON("hello"<<1), /*unique*/true);
+
+ c.insert("test.foo", o); // will cause a dup key error on "hello" field
+ cout << "we expect a dup key error here:" << endl;
+ cout << " " << c.getLastErrorDetailed().toString() << endl;
+ }
+ catch(DBException& e) {
+ cout << "caught DBException " << e.toString() << endl;
+ return 1;
+ }
return 0;
}
diff --git a/client/syncclusterconnection.cpp b/client/syncclusterconnection.cpp
index 4fafdc1..34633d1 100644
--- a/client/syncclusterconnection.cpp
+++ b/client/syncclusterconnection.cpp
@@ -24,7 +24,7 @@
namespace mongo {
- SyncClusterConnection::SyncClusterConnection( const list<HostAndPort> & L) : _mutex("SynClusterConnection") {
+ SyncClusterConnection::SyncClusterConnection( const list<HostAndPort> & L, double socketTimeout) : _mutex("SyncClusterConnection"), _socketTimeout( socketTimeout ) {
{
stringstream s;
int n=0;
@@ -38,7 +38,7 @@ namespace mongo {
_connect( i->toString() );
}
- SyncClusterConnection::SyncClusterConnection( string commaSeperated ) : _mutex("SyncClusterConnection") {
+ SyncClusterConnection::SyncClusterConnection( string commaSeperated, double socketTimeout) : _mutex("SyncClusterConnection"), _socketTimeout( socketTimeout ) {
_address = commaSeperated;
string::size_type idx;
while ( ( idx = commaSeperated.find( ',' ) ) != string::npos ) {
@@ -50,7 +50,7 @@ namespace mongo {
uassert( 8004 , "SyncClusterConnection needs 3 servers" , _conns.size() == 3 );
}
- SyncClusterConnection::SyncClusterConnection( string a , string b , string c ) : _mutex("SyncClusterConnection") {
+ SyncClusterConnection::SyncClusterConnection( string a , string b , string c, double socketTimeout) : _mutex("SyncClusterConnection"), _socketTimeout( socketTimeout ) {
_address = a + "," + b + "," + c;
// connect to all even if not working
_connect( a );
@@ -58,7 +58,7 @@ namespace mongo {
_connect( c );
}
- SyncClusterConnection::SyncClusterConnection( SyncClusterConnection& prev ) : _mutex("SyncClusterConnection") {
+ SyncClusterConnection::SyncClusterConnection( SyncClusterConnection& prev, double socketTimeout) : _mutex("SyncClusterConnection"), _socketTimeout( socketTimeout ) {
assert(0);
}
@@ -79,7 +79,7 @@ namespace mongo {
for ( size_t i=0; i<_conns.size(); i++ ) {
BSONObj res;
try {
- if ( _conns[i]->simpleCommand( "admin" , 0 , "fsync" ) )
+ if ( _conns[i]->simpleCommand( "admin" , &res , "fsync" ) )
continue;
}
catch ( DBException& e ) {
@@ -144,6 +144,7 @@ namespace mongo {
void SyncClusterConnection::_connect( string host ) {
log() << "SyncClusterConnection connecting to [" << host << "]" << endl;
DBClientConnection * c = new DBClientConnection( true );
+ c->setSoTimeout( _socketTimeout );
string errmsg;
if ( ! c->connect( host , errmsg ) )
log() << "SyncClusterConnection connect fail to: " << host << " errmsg: " << errmsg << endl;
@@ -159,7 +160,7 @@ namespace mongo {
BSONObj SyncClusterConnection::findOne(const string &ns, const Query& query, const BSONObj *fieldsToReturn, int queryOptions) {
if ( ns.find( ".$cmd" ) != string::npos ) {
- string cmdName = query.obj.firstElement().fieldName();
+ string cmdName = query.obj.firstElementFieldName();
int lockType = _lockType( cmdName );
@@ -194,12 +195,22 @@ namespace mongo {
return DBClientBase::findOne( ns , query , fieldsToReturn , queryOptions );
}
+ bool SyncClusterConnection::auth(const string &dbname, const string &username, const string &password_text, string& errmsg, bool digestPassword) {
+        for (vector<DBClientConnection*>::iterator it = _conns.begin(); it != _conns.end(); it++) {
+ massert( 15848, "sync cluster of sync clusters?", (*it)->type() != ConnectionString::SYNC);
+
+ if (!(*it)->auth(dbname, username, password_text, errmsg, digestPassword)) {
+ return false;
+ }
+ }
+ return true;
+ }
auto_ptr<DBClientCursor> SyncClusterConnection::query(const string &ns, Query query, int nToReturn, int nToSkip,
const BSONObj *fieldsToReturn, int queryOptions, int batchSize ) {
_lastErrors.clear();
if ( ns.find( ".$cmd" ) != string::npos ) {
- string cmdName = query.obj.firstElement().fieldName();
+ string cmdName = query.obj.firstElementFieldName();
int lockType = _lockType( cmdName );
uassert( 13054 , (string)"write $cmd not supported in SyncClusterConnection::query for:" + cmdName , lockType <= 0 );
}
@@ -240,7 +251,7 @@ namespace mongo {
return c;
}
- void SyncClusterConnection::insert( const string &ns, BSONObj obj ) {
+ void SyncClusterConnection::insert( const string &ns, BSONObj obj , int flags) {
uassert( 13119 , (string)"SyncClusterConnection::insert obj has to have an _id: " + obj.jsonString() ,
ns.find( ".system.indexes" ) != string::npos || obj["_id"].type() );
@@ -250,13 +261,13 @@ namespace mongo {
throw UserException( 8003 , (string)"SyncClusterConnection::insert prepare failed: " + errmsg );
for ( size_t i=0; i<_conns.size(); i++ ) {
- _conns[i]->insert( ns , obj );
+ _conns[i]->insert( ns , obj , flags);
}
_checkLast();
}
- void SyncClusterConnection::insert( const string &ns, const vector< BSONObj >& v ) {
+ void SyncClusterConnection::insert( const string &ns, const vector< BSONObj >& v , int flags) {
uassert( 10023 , "SyncClusterConnection bulk insert not implemented" , 0);
}
@@ -284,7 +295,7 @@ namespace mongo {
throw UserException( 8005 , (string)"SyncClusterConnection::udpate prepare failed: " + errmsg );
}
- for ( size_t i=0; i<_conns.size(); i++ ) {
+ for ( size_t i = 0; i < _conns.size(); i++ ) {
try {
_conns[i]->update( ns , query , obj , upsert , multi );
}
@@ -347,7 +358,7 @@ namespace mongo {
throw UserException( 8008 , "all servers down!" );
}
- void SyncClusterConnection::say( Message &toSend ) {
+ void SyncClusterConnection::say( Message &toSend, bool isRetry ) {
string errmsg;
if ( ! prepare( errmsg ) )
throw UserException( 13397 , (string)"SyncClusterConnection::say prepare failed: " + errmsg );
@@ -386,4 +397,11 @@ namespace mongo {
assert(0);
}
+ void SyncClusterConnection::setAllSoTimeouts( double socketTimeout ){
+ _socketTimeout = socketTimeout;
+        for ( size_t i=0; i<_conns.size(); i++ ) {
+            if( _conns[i] ) _conns[i]->setSoTimeout( socketTimeout );
+        }
+ }
+
}
diff --git a/client/syncclusterconnection.h b/client/syncclusterconnection.h
index c946073..68dd338 100644
--- a/client/syncclusterconnection.h
+++ b/client/syncclusterconnection.h
@@ -43,9 +43,9 @@ namespace mongo {
/**
* @param commaSeparated should be 3 hosts comma separated
*/
- SyncClusterConnection( const list<HostAndPort> & );
- SyncClusterConnection( string commaSeparated );
- SyncClusterConnection( string a , string b , string c );
+ SyncClusterConnection( const list<HostAndPort> &, double socketTimeout = 0);
+ SyncClusterConnection( string commaSeparated, double socketTimeout = 0);
+ SyncClusterConnection( string a , string b , string c, double socketTimeout = 0 );
~SyncClusterConnection();
/**
@@ -67,16 +67,16 @@ namespace mongo {
virtual auto_ptr<DBClientCursor> getMore( const string &ns, long long cursorId, int nToReturn, int options );
- virtual void insert( const string &ns, BSONObj obj );
+ virtual void insert( const string &ns, BSONObj obj, int flags=0);
- virtual void insert( const string &ns, const vector< BSONObj >& v );
+ virtual void insert( const string &ns, const vector< BSONObj >& v, int flags=0);
virtual void remove( const string &ns , Query query, bool justOne );
virtual void update( const string &ns , Query query , BSONObj obj , bool upsert , bool multi );
virtual bool call( Message &toSend, Message &response, bool assertOk , string * actualServer );
- virtual void say( Message &toSend );
+ virtual void say( Message &toSend, bool isRetry = false );
virtual void sayPiggyBack( Message &toSend );
virtual void killCursor( long long cursorID );
@@ -91,8 +91,14 @@ namespace mongo {
virtual ConnectionString::ConnectionType type() const { return ConnectionString::SYNC; }
+ void setAllSoTimeouts( double socketTimeout );
+ double getSoTimeout() const { return _socketTimeout; }
+
+ virtual bool auth(const string &dbname, const string &username, const string &password_text, string& errmsg, bool digestPassword);
+
+ virtual bool lazySupported() const { return false; }
private:
- SyncClusterConnection( SyncClusterConnection& prev );
+ SyncClusterConnection( SyncClusterConnection& prev, double socketTimeout = 0 );
string _toString() const;
bool _commandOnActive(const string &dbname, const BSONObj& cmd, BSONObj &info, int options=0);
auto_ptr<DBClientCursor> _queryOnActive(const string &ns, Query query, int nToReturn, int nToSkip,
@@ -108,6 +114,8 @@ namespace mongo {
mongo::mutex _mutex;
vector<BSONObj> _lastErrors;
+
+ double _socketTimeout;
};
class UpdateNotTheSame : public UserException {
diff --git a/client/undef_macros.h b/client/undef_macros.h
index bc59a84..30ece61 100644
--- a/client/undef_macros.h
+++ b/client/undef_macros.h
@@ -1,4 +1,4 @@
-/** @file undef_macros.h - remove mongo-specific macros that might cause issues */
+/** @file undef_macros.h remove mongo implementation macros after using */
/* Copyright 2009 10gen Inc.
*