Diffstat (limited to 'client')
27 files changed, 2920 insertions, 613 deletions
diff --git a/client/clientOnly.cpp b/client/clientOnly.cpp
index 5725e5f..11890c8 100644
--- a/client/clientOnly.cpp
+++ b/client/clientOnly.cpp
@@ -17,7 +17,6 @@
 #include "pch.h"
 
 #include "../client/dbclient.h"
-#include "../db/dbhelpers.h"
 #include "../db/cmdline.h"
 #include "../s/shard.h"
 
@@ -29,6 +28,10 @@ namespace mongo {
 
     bool dbexitCalled = false;
 
+    void exitCleanly( ExitCode code ) {
+        dbexit( code );
+    }
+
     void dbexit( ExitCode returnCode, const char *whyMsg , bool tryToGetLock ) {
         dbexitCalled = true;
         out() << "dbexit called" << endl;
diff --git a/client/connpool.cpp b/client/connpool.cpp
index 23d14da..2d7c37b 100644
--- a/client/connpool.cpp
+++ b/client/connpool.cpp
@@ -36,8 +36,9 @@ namespace mongo {
         }
     }
 
-    void PoolForHost::done( DBClientBase * c ) {
+    void PoolForHost::done( DBConnectionPool * pool, DBClientBase * c ) {
         if ( _pool.size() >= _maxPerHost ) {
+            pool->onDestory( c );
             delete c;
         }
         else {
@@ -45,16 +46,24 @@ namespace mongo {
         }
     }
 
-    DBClientBase * PoolForHost::get() {
+    DBClientBase * PoolForHost::get( DBConnectionPool * pool , double socketTimeout ) {
         time_t now = time(0);
-
+        
         while ( ! _pool.empty() ) {
             StoredConnection sc = _pool.top();
             _pool.pop();
-            if ( sc.ok( now ) )
-                return sc.conn;
-            delete sc.conn;
+
+            if ( ! sc.ok( now ) ) {
+                pool->onDestory( sc.conn );
+                delete sc.conn;
+                continue;
+            }
+
+            assert( sc.conn->getSoTimeout() == socketTimeout );
+
+            return sc.conn;
+
         }
 
         return NULL;
@@ -75,14 +84,34 @@ namespace mongo {
         }
     }
 
+    void PoolForHost::getStaleConnections( vector<DBClientBase*>& stale ) {
+        time_t now = time(0);
+
+        vector<StoredConnection> all;
+        while ( ! _pool.empty() ) {
+            StoredConnection c = _pool.top();
+            _pool.pop();
+
+            if ( c.ok( now ) )
+                all.push_back( c );
+            else
+                stale.push_back( c.conn );
+        }
+
+        for ( size_t i=0; i<all.size(); i++ ) {
+            _pool.push( all[i] );
+        }
+    }
+
+
     PoolForHost::StoredConnection::StoredConnection( DBClientBase * c ) {
         conn = c;
         when = time(0);
     }
 
     bool PoolForHost::StoredConnection::ok( time_t now ) {
-        // if connection has been idle for an hour, kill it
-        return ( now - when ) < 3600;
+        // if connection has been idle for 30 minutes, kill it
+        return ( now - when ) < 1800;
     }
 
     void PoolForHost::createdOne( DBClientBase * base) {
@@ -97,16 +126,23 @@ namespace mongo {
 
     DBConnectionPool pool;
 
-    DBClientBase* DBConnectionPool::_get(const string& ident) {
+    DBConnectionPool::DBConnectionPool()
+        : _mutex("DBConnectionPool") ,
+          _name( "dbconnectionpool" ) ,
+          _hooks( new list<DBConnectionHook*>() ) {
+    }
+
+    DBClientBase* DBConnectionPool::_get(const string& ident , double socketTimeout ) {
+        assert( ! inShutdown() );
         scoped_lock L(_mutex);
-        PoolForHost& p = _pools[ident];
-        return p.get();
+        PoolForHost& p = _pools[PoolKey(ident,socketTimeout)];
+        return p.get( this , socketTimeout );
     }
 
-    DBClientBase* DBConnectionPool::_finishCreate( const string& host , DBClientBase* conn ) {
+    DBClientBase* DBConnectionPool::_finishCreate( const string& host , double socketTimeout , DBClientBase* conn ) {
         {
             scoped_lock L(_mutex);
-            PoolForHost& p = _pools[host];
+            PoolForHost& p = _pools[PoolKey(host,socketTimeout)];
             p.createdOne( conn );
         }
 
@@ -116,22 +152,22 @@ namespace mongo {
         return conn;
     }
 
-    DBClientBase* DBConnectionPool::get(const ConnectionString& url) {
-        DBClientBase * c = _get( url.toString() );
+    DBClientBase* DBConnectionPool::get(const ConnectionString& url, double socketTimeout) {
+        DBClientBase * c = _get( url.toString() , socketTimeout );
         if ( c ) {
             onHandedOut( c );
             return c;
         }
 
         string errmsg;
-        c = url.connect( errmsg );
+        c = url.connect( errmsg, socketTimeout );
         uassert( 13328 , _name + ": connect failed " + url.toString() + " : " + errmsg , c );
 
-        return _finishCreate( url.toString() , c );
+        return _finishCreate( url.toString() , socketTimeout , c );
     }
 
-    DBClientBase* DBConnectionPool::get(const string& host) {
-        DBClientBase * c = _get( host );
+    DBClientBase* DBConnectionPool::get(const string& host, double socketTimeout) {
+        DBClientBase * c = _get( host , socketTimeout );
         if ( c ) {
             onHandedOut( c );
             return c;
@@ -141,12 +177,23 @@ namespace mongo {
        ConnectionString cs = ConnectionString::parse( host , errmsg );
        uassert( 13071 , (string)"invalid hostname [" + host + "]" + errmsg , cs.isValid() );

-        c = cs.connect( errmsg );
+        c = cs.connect( errmsg, socketTimeout );
        if ( ! c )
            throw SocketException( SocketException::CONNECT_ERROR , host , 11002 , str::stream() << _name << " error: " << errmsg );
-        return _finishCreate( host , c );
+        return _finishCreate( host , socketTimeout , c );
+    }
+
+    void DBConnectionPool::release(const string& host, DBClientBase *c) {
+        if ( c->isFailed() ) {
+            onDestory( c );
+            delete c;
+            return;
+        }
+        scoped_lock L(_mutex);
+        _pools[PoolKey(host,c->getSoTimeout())].done(this,c);
     }
+

     DBConnectionPool::~DBConnectionPool() {
         // connection closing is handled by ~PoolForHost
     }
@@ -160,42 +207,55 @@ namespace mongo {
     }
 
     void DBConnectionPool::addHook( DBConnectionHook * hook ) {
-        _hooks.push_back( hook );
+        _hooks->push_back( hook );
     }
 
     void DBConnectionPool::onCreate( DBClientBase * conn ) {
-        if ( _hooks.size() == 0 )
+        if ( _hooks->size() == 0 )
             return;
 
-        for ( list<DBConnectionHook*>::iterator i = _hooks.begin(); i != _hooks.end(); i++ ) {
+        for ( list<DBConnectionHook*>::iterator i = _hooks->begin(); i != _hooks->end(); i++ ) {
             (*i)->onCreate( conn );
         }
     }
 
     void DBConnectionPool::onHandedOut( DBClientBase * conn ) {
-        if ( _hooks.size() == 0 )
+        if ( _hooks->size() == 0 )
             return;
 
-        for ( list<DBConnectionHook*>::iterator i = _hooks.begin(); i != _hooks.end(); i++ ) {
+        for ( list<DBConnectionHook*>::iterator i = _hooks->begin(); i != _hooks->end(); i++ ) {
             (*i)->onHandedOut( conn );
         }
     }
 
+    void DBConnectionPool::onDestory( DBClientBase * conn ) {
+        if ( _hooks->size() == 0 )
+            return;
+
+        for ( list<DBConnectionHook*>::iterator i = _hooks->begin(); i != _hooks->end(); i++ ) {
+            (*i)->onDestory( conn );
+        }
+    }
+
     void DBConnectionPool::appendInfo( BSONObjBuilder& b ) {
-        BSONObjBuilder bb( b.subobjStart( "hosts" ) );
+
         int avail = 0;
         long long created = 0;
         map<ConnectionString::ConnectionType,long long> createdByType;
+        set<string> replicaSets;
+
+        BSONObjBuilder bb( b.subobjStart( "hosts" ) );
         {
             scoped_lock lk( _mutex );
             for ( PoolMap::iterator i=_pools.begin(); i!=_pools.end(); ++i ) {
                 if ( i->second.numCreated() == 0 )
                     continue;
 
-                string s = i->first;
+                string s = str::stream() << i->first.ident << "::" << i->first.timeout;
+
                 BSONObjBuilder temp( bb.subobjStart( s ) );
                 temp.append( "available" , i->second.numAvailable() );
                 temp.appendNumber( "created" , i->second.numCreated() );
@@ -206,9 +266,33 @@ namespace mongo {
 
                 long long& x = createdByType[i->second.type()];
                 x += i->second.numCreated();
+
+                {
+                    string setName = i->first.ident;
+                    if ( setName.find( "/" ) != string::npos ) {
+                        setName = setName.substr( 0 , setName.find( "/" ) );
+                        replicaSets.insert( setName );
+                    }
+                }
             }
         }
         bb.done();
+
+
+        BSONObjBuilder setBuilder( b.subobjStart( "replicaSets" ) );
+        for ( set<string>::iterator i=replicaSets.begin(); i!=replicaSets.end(); ++i ) {
+            string rs = *i;
+            ReplicaSetMonitorPtr m = ReplicaSetMonitor::get( rs );
+            if ( ! m ) {
+                warning() << "no monitor for set: " << rs << endl;
+                continue;
+            }
+
+            BSONObjBuilder temp( setBuilder.subobjStart( rs ) );
+            m->appendInfo( temp );
+            temp.done();
+        }
+        setBuilder.done();
 
         {
             BSONObjBuilder temp( bb.subobjStart( "createdByType" ) );
@@ -223,21 +307,82 @@ namespace mongo {
     }
 
     bool DBConnectionPool::serverNameCompare::operator()( const string& a , const string& b ) const{
-        string ap = str::before( a , "/" );
-        string bp = str::before( b , "/" );
+        const char* ap = a.c_str();
+        const char* bp = b.c_str();
+        
+        while (true){
+            if (*ap == '\0' || *ap == '/'){
+                if (*bp == '\0' || *bp == '/')
+                    return false; // equal strings
+                else
+                    return true; // a is shorter
+            }
+
+            if (*bp == '\0' || *bp == '/')
+                return false; // b is shorter
+
+            if ( *ap < *bp)
+                return true;
+            else if (*ap > *bp)
+                return false;
+
+            ++ap;
+            ++bp;
+        }
+        assert(false);
+    }
+
+    bool DBConnectionPool::poolKeyCompare::operator()( const PoolKey& a , const PoolKey& b ) const {
+        if (DBConnectionPool::serverNameCompare()( a.ident , b.ident ))
+            return true;
 
-        return ap < bp;
+        if (DBConnectionPool::serverNameCompare()( b.ident , a.ident ))
+            return false;
+
+        return a.timeout < b.timeout;
+    }
+
+
+    void DBConnectionPool::taskDoWork() {
+        vector<DBClientBase*> toDelete;
+
+        {
+            // we need to get the connections inside the lock
+            // but we can actually delete them outside
+            scoped_lock lk( _mutex );
+            for ( PoolMap::iterator i=_pools.begin(); i!=_pools.end(); ++i ) {
+                i->second.getStaleConnections( toDelete );
+            }
+        }
+
+        for ( size_t i=0; i<toDelete.size(); i++ ) {
+            try {
+                onDestory( toDelete[i] );
+                delete toDelete[i];
+            }
+            catch ( ... ) {
+                // we don't care if there was a socket error
+            }
+        }
     }
 
     // ------ ScopedDbConnection ------
 
     ScopedDbConnection * ScopedDbConnection::steal() {
         assert( _conn );
-        ScopedDbConnection * n = new ScopedDbConnection( _host , _conn );
+        ScopedDbConnection * n = new ScopedDbConnection( _host , _conn, _socketTimeout );
         _conn = 0;
         return n;
     }
 
+    void ScopedDbConnection::_setSocketTimeout(){
+        if( ! _conn ) return;
+        if( _conn->type() == ConnectionString::MASTER )
+            (( DBClientConnection* ) _conn)->setSoTimeout( _socketTimeout );
+        else if( _conn->type() == ConnectionString::SYNC )
+            (( SyncClusterConnection* ) _conn)->setAllSoTimeouts( _socketTimeout );
+    }
+
     ScopedDbConnection::~ScopedDbConnection() {
         if ( _conn ) {
             if ( ! _conn->isFailed() ) {
@@ -248,12 +393,14 @@ namespace mongo {
         }
     }
 
-    ScopedDbConnection::ScopedDbConnection(const Shard& shard )
-        : _host( shard.getConnString() ) , _conn( pool.get(_host) ) {
+    ScopedDbConnection::ScopedDbConnection(const Shard& shard, double socketTimeout )
+        : _host( shard.getConnString() ) , _conn( pool.get(_host, socketTimeout) ), _socketTimeout( socketTimeout ) {
+        _setSocketTimeout();
     }
 
-    ScopedDbConnection::ScopedDbConnection(const Shard* shard )
-        : _host( shard->getConnString() ) , _conn( pool.get(_host) ) {
+    ScopedDbConnection::ScopedDbConnection(const Shard* shard, double socketTimeout )
+        : _host( shard->getConnString() ) , _conn( pool.get(_host, socketTimeout) ), _socketTimeout( socketTimeout ) {
+        _setSocketTimeout();
     }
 
@@ -262,7 +409,7 @@ namespace mongo {
         PoolFlushCmd() : Command( "connPoolSync" , false , "connpoolsync" ) {}
         virtual void help( stringstream &help ) const { help<<"internal"; }
         virtual LockType locktype() const { return NONE; }
-        virtual bool run(const string&, mongo::BSONObj&, std::string&, mongo::BSONObjBuilder& result, bool) {
+        virtual bool run(const string&, mongo::BSONObj&, int, std::string&, mongo::BSONObjBuilder& result, bool) {
             pool.flush();
             return true;
         }
@@ -277,7 +424,7 @@ namespace mongo {
         PoolStats() : Command( "connPoolStats" ) {}
         virtual void help( stringstream &help ) const { help<<"stats about connection pool"; }
         virtual LockType locktype() const { return NONE; }
-        virtual bool run(const string&, mongo::BSONObj&, std::string&, mongo::BSONObjBuilder& result, bool) {
+        virtual bool run(const string&, mongo::BSONObj&, int, std::string&, mongo::BSONObjBuilder& result, bool) {
             pool.appendInfo( result );
             result.append( "numDBClientConnection" , DBClientConnection::getNumConnections() );
             result.append( "numAScopedConnection" , AScopedConnection::getNumConnections() );
diff --git a/client/connpool.h b/client/connpool.h
index e7f59d6..a37dad7 100644
--- a/client/connpool.h
+++ b/client/connpool.h
@@ -21,9 +21,12 @@
 #include "dbclient.h"
 #include "redef_macros.h"
 
+#include "../util/background.h"
+
 namespace mongo {
 
     class Shard;
+    class DBConnectionPool;
 
     /**
      * not thread safe
@@ -44,7 +47,7 @@ namespace mongo {
 
         int numAvailable() const { return (int)_pool.size(); }
 
-        void createdOne( DBClientBase * base);
+        void createdOne( DBClientBase * base );
         long long numCreated() const { return _created; }
 
         ConnectionString::ConnectionType type() const { assert(_created); return _type; }
@@ -52,11 +55,13 @@ namespace mongo {
         /**
          * gets a connection or return NULL
         */
-        DBClientBase * get();
+        DBClientBase * get( DBConnectionPool * pool , double socketTimeout );
 
-        void done( DBClientBase * c );
+        void done( DBConnectionPool * pool , DBClientBase * c );
 
         void flush();
+
+        void getStaleConnections( vector<DBClientBase*>& stale );
 
         static void setMaxPerHost( unsigned max ) { _maxPerHost = max; }
         static unsigned getMaxPerHost() { return _maxPerHost; }
@@ -72,6 +77,7 @@ namespace mongo {
         };
 
         std::stack<StoredConnection> _pool;
+
         long long _created;
         ConnectionString::ConnectionType _type;
 
@@ -83,6 +89,7 @@ namespace mongo {
         virtual ~DBConnectionHook() {}
         virtual void onCreate( DBClientBase * conn ) {}
         virtual void onHandedOut( DBClientBase * conn ) {}
+        virtual void onDestory( DBClientBase * conn ) {}
     };
 
     /** Database connection pool.
@@ -100,29 +107,11 @@ namespace mongo {
           c.conn()...
        }
 */
-    class DBConnectionPool {
+    class DBConnectionPool : public PeriodicTask {
     public:
-        /** compares server namees, but is smart about replica set names */
-        struct serverNameCompare {
-            bool operator()( const string& a , const string& b ) const;
-        };
-
-    private:
-
-        mongo::mutex _mutex;
-        typedef map<string,PoolForHost,serverNameCompare> PoolMap; // servername -> pool
-        PoolMap _pools;
-        list<DBConnectionHook*> _hooks;
-        string _name;
-
-        DBClientBase* _get( const string& ident );
-
-        DBClientBase* _finishCreate( const string& ident , DBClientBase* conn );
-
-    public:
-        DBConnectionPool() : _mutex("DBConnectionPool") , _name( "dbconnectionpool" ) { }
+        DBConnectionPool();
         ~DBConnectionPool();
 
         /** right now just controls some asserts.  defaults to "dbconnectionpool" */
@@ -130,22 +119,54 @@ namespace mongo {
 
         void onCreate( DBClientBase * conn );
         void onHandedOut( DBClientBase * conn );
+        void onDestory( DBClientBase * conn );
 
         void flush();
 
-        DBClientBase *get(const string& host);
-        DBClientBase *get(const ConnectionString& host);
+        DBClientBase *get(const string& host, double socketTimeout = 0);
+        DBClientBase *get(const ConnectionString& host, double socketTimeout = 0);
 
-        void release(const string& host, DBClientBase *c) {
-            if ( c->isFailed() ) {
-                delete c;
-                return;
-            }
-            scoped_lock L(_mutex);
-            _pools[host].done(c);
-        }
-        void addHook( DBConnectionHook * hook );
+        void release(const string& host, DBClientBase *c);
+
+        void addHook( DBConnectionHook * hook ); // we take ownership
 
         void appendInfo( BSONObjBuilder& b );
+
+        /** compares server namees, but is smart about replica set names */
+        struct serverNameCompare {
+            bool operator()( const string& a , const string& b ) const;
+        };
+
+        virtual string taskName() const { return "DBConnectionPool-cleaner"; }
+        virtual void taskDoWork();
+
+    private:
+        DBConnectionPool( DBConnectionPool& p );
+
+        DBClientBase* _get( const string& ident , double socketTimeout );
+
+        DBClientBase* _finishCreate( const string& ident , double socketTimeout, DBClientBase* conn );
+
+        struct PoolKey {
+            PoolKey( string i , double t ) : ident( i ) , timeout( t ) {}
+            string ident;
+            double timeout;
+        };
+
+        struct poolKeyCompare {
+            bool operator()( const PoolKey& a , const PoolKey& b ) const;
+        };
+
+        typedef map<PoolKey,PoolForHost,poolKeyCompare> PoolMap; // servername -> pool
+
+        mongo::mutex _mutex;
+        string _name;
+
+        PoolMap _pools;
+
+        // pointers owned by me, right now they leak on shutdown
+        // _hooks itself also leaks because it creates a shutdown race condition
+        list<DBConnectionHook*> * _hooks;
+
     };
 
     extern DBConnectionPool pool;
 
@@ -154,9 +175,15 @@ namespace mongo {
     public:
         AScopedConnection() { _numConnections++; }
         virtual ~AScopedConnection() { _numConnections--; }
+        virtual DBClientBase* get() = 0;
         virtual void done() = 0;
         virtual string getHost() const = 0;
+
+        /**
+         * @return true iff this has a connection to the db
+         */
+        virtual bool ok() const = 0;
 
         /**
          * @return total number of current instances of AScopedConnection
@@ -176,19 +203,25 @@ namespace mongo {
 
         /** the main constructor you want to use
             throws UserException if can't connect */
-        explicit ScopedDbConnection(const string& host) : _host(host), _conn( pool.get(host) ) {}
+        explicit ScopedDbConnection(const string& host, double socketTimeout = 0) : _host(host), _conn( pool.get(host, socketTimeout) ), _socketTimeout( socketTimeout ) {
+            _setSocketTimeout();
+        }
 
-        ScopedDbConnection() : _host( "" ) , _conn(0) {}
+        ScopedDbConnection() : _host( "" ) , _conn(0), _socketTimeout( 0 ) {}
 
         /* @param conn - bind to an existing connection */
-        ScopedDbConnection(const string& host, DBClientBase* conn ) : _host( host ) , _conn( conn ) {}
+        ScopedDbConnection(const string& host, DBClientBase* conn, double socketTimeout = 0 ) : _host( host ) , _conn( conn ), _socketTimeout( socketTimeout ) {
+            _setSocketTimeout();
+        }
 
         /** throws UserException if can't connect */
-        explicit ScopedDbConnection(const ConnectionString& url ) : _host(url.toString()), _conn( pool.get(url) ) {}
+        explicit ScopedDbConnection(const ConnectionString& url, double socketTimeout = 0 ) : _host(url.toString()), _conn( pool.get(url, socketTimeout) ), _socketTimeout( socketTimeout ) {
+            _setSocketTimeout();
+        }
 
         /** throws UserException if can't connect */
-        explicit ScopedDbConnection(const Shard& shard );
-        explicit ScopedDbConnection(const Shard* shard );
+        explicit ScopedDbConnection(const Shard& shard, double socketTimeout = 0 );
+        explicit ScopedDbConnection(const Shard* shard, double socketTimeout = 0 );
 
         ~ScopedDbConnection();
 
@@ -210,6 +243,8 @@ namespace mongo {
             return _conn;
         }
 
+        bool ok() const { return _conn > 0; }
+
         string getHost() const { return _host; }
 
         /** Force closure of the connection.  You should call this if you leave it in
@@ -242,8 +277,12 @@ namespace mongo {
         ScopedDbConnection * steal();
 
     private:
+
+        void _setSocketTimeout();
+
         const string _host;
         DBClientBase *_conn;
+        const double _socketTimeout;
 
     };
 
diff --git a/client/dbclient.cpp b/client/dbclient.cpp
index bb24199..dadf7e4 100644
--- a/client/dbclient.cpp
+++ b/client/dbclient.cpp
@@ -64,21 +64,23 @@ namespace mongo {
 
     }
 
-    DBClientBase* ConnectionString::connect( string& errmsg ) const {
+    DBClientBase* ConnectionString::connect( string& errmsg, double socketTimeout ) const {
         switch ( _type ) {
         case MASTER: {
             DBClientConnection * c = new DBClientConnection(true);
+            c->setSoTimeout( socketTimeout );
             log(1) << "creating new connection to:" << _servers[0] << endl;
             if ( ! c->connect( _servers[0] , errmsg ) ) {
                 delete c;
                 return 0;
             }
+            log(1) << "connected connection!" << endl;
             return c;
         }
 
         case PAIR:
         case SET: {
-            DBClientReplicaSet * set = new DBClientReplicaSet( _setName , _servers );
+            DBClientReplicaSet * set = new DBClientReplicaSet( _setName , _servers , socketTimeout );
             if( ! set->connect() ) {
                 delete set;
                 errmsg = "connect failed to set ";
@@ -93,7 +95,8 @@ namespace mongo {
             list<HostAndPort> l;
             for ( unsigned i=0; i<_servers.size(); i++ )
                 l.push_back( _servers[i] );
-            return new SyncClusterConnection( l );
+            SyncClusterConnection* c = new SyncClusterConnection( l, socketTimeout );
+            return c;
         }
 
         case INVALID:
@@ -294,7 +297,7 @@ namespace mongo {
         return b.obj();
     }
 
-    BSONObj getlasterrorcmdobj = fromjson("{getlasterror:1}");
+    const BSONObj getlasterrorcmdobj = fromjson("{getlasterror:1}");
 
     BSONObj DBClientWithCommands::getLastErrorDetailed() {
         BSONObj info;
@@ -314,7 +317,7 @@ namespace mongo {
         return e.str();
     }
 
-    BSONObj getpreverrorcmdobj = fromjson("{getpreverror:1}");
+    const BSONObj getpreverrorcmdobj = fromjson("{getpreverror:1}");
 
     BSONObj DBClientWithCommands::getPrevError() {
         BSONObj info;
@@ -391,6 +394,7 @@ namespace mongo {
     }
 
     bool DBClientWithCommands::createCollection(const string &ns, long long size, bool capped, int max, BSONObj *info) {
+        assert(!capped||size);
         BSONObj o;
         if ( info == 0 )    info = &o;
         BSONObjBuilder b;
@@ -529,19 +533,31 @@ namespace mongo {
         return DBClientBase::auth(dbname, username, password.c_str(), errmsg, false);
     }
 
-    BSONObj DBClientInterface::findOne(const string &ns, const Query& query, const BSONObj *fieldsToReturn, int queryOptions) {
+    /** query N objects from the database into an array.  makes sense mostly when you want a small number of results.  if a huge number, use
+        query() and iterate the cursor.
+    */
+    void DBClientInterface::findN(vector<BSONObj>& out, const string& ns, Query query, int nToReturn, int nToSkip, const BSONObj *fieldsToReturn, int queryOptions) {
+        out.reserve(nToReturn);
+
         auto_ptr<DBClientCursor> c =
-            this->query(ns, query, 1, 0, fieldsToReturn, queryOptions);
+            this->query(ns, query, nToReturn, nToSkip, fieldsToReturn, queryOptions);
 
-        uassert( 10276 ,  str::stream() << "DBClientBase::findOne: transport error: " << getServerAddress() << " query: " << query.toString(), c.get() );
+        uassert( 10276 ,  str::stream() << "DBClientBase::findN: transport error: " << getServerAddress() << " query: " << query.toString(), c.get() );
 
         if ( c->hasResultFlag( ResultFlag_ShardConfigStale ) )
-            throw StaleConfigException( ns , "findOne has stale config" );
+            throw StaleConfigException( ns , "findN stale config" );
 
-        if ( !c->more() )
-            return BSONObj();
+        for( int i = 0; i < nToReturn; i++ ) {
+            if ( !c->more() )
+                break;
+            out.push_back( c->nextSafe().copy() );
+        }
+    }
 
-        return c->nextSafe().copy();
+    BSONObj DBClientInterface::findOne(const string &ns, const Query& query, const BSONObj *fieldsToReturn, int queryOptions) {
+        vector<BSONObj> v;
+        findN(v, ns, query, 1, 0, fieldsToReturn, queryOptions);
+        return v.empty() ? BSONObj() : v[0];
     }
 
     bool DBClientConnection::connect(const HostAndPort& server, string& errmsg) {
@@ -558,39 +574,50 @@ namespace mongo {
         p.reset(new MessagingPort( _so_timeout, _logLevel ));
 
         if (server->getAddr() == "0.0.0.0") {
-            failed = true;
+            _failed = true;
             return false;
         }
 
+        // if( _so_timeout == 0 ){
+        //    printStackTrace();
+        //    log() << "Connecting to server " << _serverString << " timeout " << _so_timeout << endl;
+        // }
         if ( !p->connect(*server) ) {
             stringstream ss;
             ss << "couldn't connect to server " << _serverString;
             errmsg = ss.str();
-            failed = true;
+            _failed = true;
             return false;
         }
+
+#ifdef MONGO_SSL
+        if ( cmdLine.sslOnNormalPorts ) {
+            p->secure( sslManager() );
+        }
+#endif
+
         return true;
     }
 
     void DBClientConnection::_checkConnection() {
-        if ( !failed )
+        if ( !_failed )
             return;
 
         if ( lastReconnectTry && time(0)-lastReconnectTry < 2 ) {
             // we wait a little before reconnect attempt to avoid constant hammering.
             // but we throw we don't want to try to use a connection in a bad state
-            throw SocketException(SocketException::FAILED_STATE);
+            throw SocketException( SocketException::FAILED_STATE , toString() );
         }
 
         if ( !autoReconnect )
-            throw SocketException(SocketException::FAILED_STATE);
+            throw SocketException( SocketException::FAILED_STATE , toString() );
 
         lastReconnectTry = time(0);
         log(_logLevel) << "trying reconnect to " << _serverString << endl;
         string errmsg;
-        failed = false;
+        _failed = false;
         if ( ! _connect(errmsg) ) {
-            failed = true;
+            _failed = true;
             log(_logLevel) << "reconnect " << _serverString << " failed " << errmsg << endl;
-            throw SocketException(SocketException::CONNECT_ERROR);
+            throw SocketException( SocketException::CONNECT_ERROR , toString() );
         }
 
         log(_logLevel) << "reconnect " << _serverString << " ok" << endl;
@@ -675,7 +702,7 @@ namespace mongo {
             /* connection CANNOT be used anymore as more data may be on the way from the server.
                we have to reconnect.
            */
-            failed = true;
+            _failed = true;
             p->shutdown();
             throw;
         }
@@ -683,12 +710,11 @@ namespace mongo {
         return n;
     }
 
-    void DBClientBase::insert( const string & ns , BSONObj obj ) {
+    void DBClientBase::insert( const string & ns , BSONObj obj , int flags) {
         Message toSend;
 
         BufBuilder b;
-        int opts = 0;
-        b.appendNum( opts );
+        b.appendNum( flags );
         b.appendStr( ns );
         obj.appendSelfToBufBuilder( b );
 
         say( toSend );
     }
 
-    void DBClientBase::insert( const string & ns , const vector< BSONObj > &v ) {
+    void DBClientBase::insert( const string & ns , const vector< BSONObj > &v , int flags) {
         Message toSend;
 
         BufBuilder b;
-        int opts = 0;
-        b.appendNum( opts );
+        b.appendNum( flags );
         b.appendStr( ns );
         for( vector< BSONObj >::const_iterator i = v.begin(); i != v.end(); ++i )
             i->appendSelfToBufBuilder( b );
@@ -750,8 +775,12 @@ namespace mongo {
         toSend.setData( dbUpdate , b.buf() , b.len() );
 
         say( toSend );
+
+
     }
 
+
+
     auto_ptr<DBClientCursor> DBClientWithCommands::getIndexes( const string &ns ) {
         return query( Namespace( ns.c_str() ).getSisterNS( "system.indexes" ).c_str() , BSON( "ns" << ns ) );
     }
@@ -816,7 +845,7 @@ namespace mongo {
         return ss.str();
     }
 
-    bool DBClientWithCommands::ensureIndex( const string &ns , BSONObj keys , bool unique, const string & name , bool cache ) {
+    bool DBClientWithCommands::ensureIndex( const string &ns , BSONObj keys , bool unique, const string & name , bool cache, bool background, int version ) {
         BSONObjBuilder toSave;
         toSave.append( "ns" , ns );
         toSave.append( "key" , keys );
@@ -834,9 +863,15 @@ namespace mongo {
             cacheKey += nn;
         }
 
+        if( version >= 0 )
+            toSave.append("v", version);
+
         if ( unique )
             toSave.appendBool( "unique", unique );
 
+        if( background )
+            toSave.appendBool( "background", true );
+
         if ( _seenIndexes.count( cacheKey ) )
             return 0;
 
@@ -874,13 +909,13 @@ namespace mongo {
         toSend.setData(dbQuery, b.buf(), b.len());
     }
 
-    void DBClientConnection::say( Message &toSend ) {
+    void DBClientConnection::say( Message &toSend, bool isRetry ) {
         checkConnection();
         try {
             port().say( toSend );
         }
         catch( SocketException & ) {
-            failed = true;
+            _failed = true;
             throw;
         }
     }
@@ -889,8 +924,8 @@ namespace mongo {
         port().piggyBack( toSend );
     }
 
-    void DBClientConnection::recv( Message &m ) {
-        port().recv(m);
+    bool DBClientConnection::recv( Message &m ) {
+        return port().recv(m);
     }
 
     bool DBClientConnection::call( Message &toSend, Message &response, bool assertOk , string * actualServer ) {
@@ -900,7 +935,7 @@ namespace mongo {
         */
         try {
             if ( !port().call(toSend, response) ) {
-                failed = true;
+                _failed = true;
                 if ( assertOk )
                     uasserted( 10278 , str::stream() << "dbclient error communicating with server: " << getServerAddress() );
 
@@ -908,21 +943,46 @@ namespace mongo {
             }
         }
         catch( SocketException & ) {
-            failed = true;
+            _failed = true;
             throw;
         }
         return true;
     }
 
-    void DBClientConnection::checkResponse( const char *data, int nReturned ) {
+    BSONElement getErrField(const BSONObj& o) {
+        BSONElement first = o.firstElement();
+        if( strcmp(first.fieldName(), "$err") == 0 )
+            return first;
+
+        // temp - will be DEV only later
+        /*DEV*/
+        if( 1 ) {
+            BSONElement e = o["$err"];
+            if( !e.eoo() ) {
+                wassert(false);
+            }
+            return e;
+        }
+
+        return BSONElement();
+    }
+
+    bool hasErrField( const BSONObj& o ){
+        return ! getErrField( o ).eoo();
+    }
+
+    void DBClientConnection::checkResponse( const char *data, int nReturned, bool* retry, string* host ) {
        /* check for errors.  the only one we really care about at
         * this stage is "not master"
        */
+
+        *retry = false;
+        *host = _serverString;
+
         if ( clientSet && nReturned ) {
             assert(data);
             BSONObj o(data);
-            BSONElement e = o["$err"];
+            BSONElement e = getErrField(o);
             if ( e.type() == String && str::contains( e.valuestr() , "not master" ) ) {
                 clientSet->isntMaster();
             }
@@ -930,7 +990,7 @@ namespace mongo {
     }
 
     void DBClientConnection::killCursor( long long cursorId ) {
-        BufBuilder b;
+        StackBufBuilder b;
         b.appendNum( (int)0 ); // reserved
         b.appendNum( (int)1 ); // number
         b.appendNum( cursorId );
@@ -944,6 +1004,19 @@ namespace mongo {
             say(m);
     }
 
+#ifdef MONGO_SSL
+    SSLManager* DBClientConnection::sslManager() {
+        if ( _sslManager )
+            return _sslManager;
+
+        SSLManager* s = new SSLManager(true);
+        _sslManager = s;
+        return s;
+    }
+
+    SSLManager* DBClientConnection::_sslManager = 0;
+#endif
+
     AtomicUInt DBClientConnection::_numConnections;
     bool DBClientConnection::_lazyKillCursor = true;
diff --git a/client/dbclient.h b/client/dbclient.h
index 9bc71fd..2b4bb85 100644
--- a/client/dbclient.h
+++ b/client/dbclient.h
@@ -1,4 +1,7 @@
-/** @file dbclient.h - connect to a Mongo database as a database, from C++ */
+/** @file dbclient.h
+
+    Core MongoDB C++ driver interfaces are defined here.
+*/
 
 /*    Copyright 2009 10gen Inc.
  *
@@ -18,7 +21,8 @@
 #pragma once
 
 #include "../pch.h"
-#include "../util/message.h"
+#include "../util/net/message.h"
+#include "../util/net/message_port.h"
 #include "../db/jsobj.h"
 #include "../db/json.h"
 #include <stack>
@@ -100,6 +104,15 @@ namespace mongo {
         RemoveOption_Broadcast = 1 << 1
     };
 
+
+    /**
+     * need to put in DbMesssage::ReservedOptions as well
+     */
+    enum InsertOptions {
+        /** With muli-insert keep processing inserts if one fails */
+        InsertOption_ContinueOnError = 1 << 0
+    };
+
     class DBClientBase;
 
     /**
@@ -174,7 +187,7 @@ namespace mongo {
 
         string toString() const { return _string; }
 
-        DBClientBase* connect( string& errmsg ) const;
+        DBClientBase* connect( string& errmsg, double socketTimeout = 0 ) const;
 
         string getSetName() const { return _setName; }
 
@@ -296,7 +309,7 @@ namespace mongo {
         Query& where(const string &jscode) { return where(jscode, BSONObj()); }
 
         /**
-         * if this query has an orderby, hint, or some other field
+         * @return true if this query has an orderby, hint, or some other field
         */
         bool isComplex( bool * hasDollar = 0 ) const;
 
@@ -332,12 +345,15 @@ namespace mongo {
         virtual ~DBConnector() {}
         /** actualServer is set to the actual server where they call went if there was a choice (SlaveOk) */
         virtual bool call( Message &toSend, Message &response, bool assertOk=true , string * actualServer = 0 ) = 0;
-        virtual void say( Message &toSend ) = 0;
+        virtual void say( Message &toSend, bool isRetry = false ) = 0;
         virtual void sayPiggyBack( Message &toSend ) = 0;
-        virtual void checkResponse( const char* data, int nReturned ) {}
-
         /* used by QueryOption_Exhaust.  To use that your subclass must implement this. */
-        virtual void recv( Message& m ) { assert(false); }
+        virtual bool recv( Message& m ) { assert(false); return false; }
+        // In general, for lazy queries, we'll need to say, recv, then checkResponse
+        virtual void checkResponse( const char* data, int nReturned, bool* retry = NULL, string* targetHost = NULL ) {
+            if( retry ) *retry = false; if( targetHost ) *targetHost = "";
+        }
+        virtual bool lazySupported() const = 0;
     };
 
     /**
@@ -348,12 +364,9 @@ namespace mongo {
         virtual auto_ptr<DBClientCursor> query(const string &ns, Query query, int nToReturn = 0, int nToSkip = 0,
                                                const BSONObj *fieldsToReturn = 0, int queryOptions = 0 , int batchSize = 0 ) = 0;
 
-        /** don't use this - called automatically by DBClientCursor for you */
-        virtual auto_ptr<DBClientCursor> getMore( const string &ns, long long cursorId, int nToReturn = 0, int options = 0 ) = 0;
-
-        virtual void insert( const string &ns, BSONObj obj ) = 0;
+        virtual void insert( const string &ns, BSONObj obj , int flags=0) = 0;
 
-        virtual void insert( const string &ns, const vector< BSONObj >& v ) = 0;
+        virtual void insert( const string &ns, const vector< BSONObj >& v , int flags=0) = 0;
 
         virtual void remove( const string &ns , Query query, bool justOne = 0 ) = 0;
 
@@ -367,8 +380,15 @@ namespace mongo {
         */
         virtual BSONObj findOne(const string &ns, const Query& query, const BSONObj *fieldsToReturn = 0, int queryOptions = 0);
 
+        /** query N objects from the database into an array.  makes sense mostly when you want a small number of results.  if a huge number, use
+            query() and iterate the cursor.
+        */
+        void findN(vector<BSONObj>& out, const string&ns, Query query, int nToReturn, int nToSkip = 0, const BSONObj *fieldsToReturn = 0, int queryOptions = 0);
+
         virtual string getServerAddress() const = 0;
 
+        /** don't use this - called automatically by DBClientCursor for you */
+        virtual auto_ptr<DBClientCursor> getMore( const string &ns, long long cursorId, int nToReturn = 0, int options = 0 ) = 0;
     };
 
     /**
@@ -449,15 +469,19 @@ namespace mongo {
         */
         bool createCollection(const string &ns, long long size = 0, bool capped = false, int max = 0, BSONObj *info = 0);
 
-        /** Get error result from the last operation on this connection.
+        /** Get error result from the last write operation (insert/update/delete) on this connection.
            @return error message text, or empty string if no error.
         */
         string getLastError();
-        /** Get error result from the last operation on this connection.
+
+        /** Get error result from the last write operation (insert/update/delete) on this connection.
            @return full error object.
         */
         virtual BSONObj getLastErrorDetailed();
 
+        /** Can be called with the returned value from getLastErrorDetailed to extract an error string.
+            If all you need is the string, just call getLastError() instead.
+        */
         static string getLastErrorString( const BSONObj& res );
 
         /** Return the last error which has occurred, even if not the very last operation.
@@ -640,13 +664,15 @@ namespace mongo {
           @param ns collection to be indexed
           @param keys the "key pattern" for the index.  e.g., { name : 1 }
          @param unique if true, indicates that key uniqueness should be enforced for this index
          @param name if not specified, it will be created from the keys automatically (which is recommended)
           @param cache if set to false, the index cache for the connection won't remember this call
+          @param background build index in the background (see mongodb docs/wiki for details)
+          @param v index version.  leave at default value. (unit tests set this parameter.)
 
          @return whether or not sent message to db.
            should be true on first call, false on subsequent unless resetIndexCache was called
         */
         virtual bool ensureIndex( const string &ns , BSONObj keys , bool unique = false, const string &name = "",
-                                  bool cache = true );
+                                  bool cache = true, bool background = false, int v = -1 );
 
         /** clears the index cache, so the subsequent call to ensureIndex for any index will go to the server
@@ -748,12 +774,12 @@ namespace mongo {
 
         /**
            insert an object into the database
         */
-        virtual void insert( const string &ns , BSONObj obj );
+        virtual void insert( const string &ns , BSONObj obj , int flags=0);
 
         /**
           insert a vector of objects into the database
         */
-        virtual void insert( const string &ns, const vector< BSONObj >& v );
+        virtual void insert( const string &ns, const vector< BSONObj >& v , int flags=0);
 
         /**
            remove matching objects from the database
@@ -772,9 +798,10 @@ namespace mongo {
         virtual bool callRead( Message& toSend , Message& response ) = 0;
         // virtual bool callWrite( Message& toSend , Message& response ) = 0; // TODO: add this if needed
-        virtual void say( Message& toSend ) = 0;
-
+        virtual ConnectionString::ConnectionType type() const = 0;
+
+        virtual double getSoTimeout() const = 0;
     }; // DBClientBase
 
@@ -798,7 +825,7 @@ namespace mongo {
           Connect timeout is fixed, but short, at 5 seconds.
         */
         DBClientConnection(bool _autoReconnect=false, DBClientReplicaSet* cp=0, double so_timeout=0) :
-            clientSet(cp), failed(false), autoReconnect(_autoReconnect), lastReconnectTry(0), _so_timeout(so_timeout) {
+            clientSet(cp), _failed(false), autoReconnect(_autoReconnect), lastReconnectTry(0), _so_timeout(so_timeout) {
             _numConnections++;
         }
 
@@ -869,14 +896,14 @@ namespace mongo {
           @return true if this connection is currently in a failed state.  When autoreconnect is on,
                   a connection will transition back to an ok state after reconnecting.
        */
-        bool isFailed() const { return failed; }
+        bool isFailed() const { return _failed; }
 
-        MessagingPort& port() { return *p; }
+        MessagingPort& port() { assert(p); return *p; }
 
         string toStringLong() const {
             stringstream ss;
             ss << _serverString;
-            if ( failed ) ss << " failed";
+            if ( _failed ) ss << " failed";
             return ss.str();
         }
 
@@ -887,11 +914,15 @@ namespace mongo {
         virtual void killCursor( long long cursorID );
 
         virtual bool callRead( Message& toSend , Message& response ) { return call( toSend , response ); }
-        virtual void say( Message &toSend );
+        virtual void say( Message &toSend, bool isRetry = false );
+        virtual bool recv( Message& m );
+        virtual void checkResponse( const char *data, int nReturned, bool* retry = NULL, string* host = NULL );
         virtual bool call( Message &toSend, Message &response, bool assertOk = true , string * actualServer = 0 );
         virtual ConnectionString::ConnectionType type() const { return ConnectionString::MASTER; }
-        virtual void checkResponse( const char *data, int nReturned );
         void setSoTimeout(double to) { _so_timeout = to; }
+        double getSoTimeout() const { return _so_timeout; }
+
+        virtual bool lazySupported() const { return true; }
 
         static int getNumConnections() {
             return _numConnections;
@@ -899,16 +930,15 @@ namespace mongo {
 
         static void setLazyKillCursor( bool lazy ) { _lazyKillCursor = lazy; }
         static bool getLazyKillCursor() { return _lazyKillCursor; }
-
+        
     protected:
         friend class SyncClusterConnection;
-        virtual void recv( Message& m );
         virtual void sayPiggyBack( Message &toSend );
 
         DBClientReplicaSet *clientSet;
         boost::scoped_ptr<MessagingPort> p;
         boost::scoped_ptr<SockAddr> server;
-        bool failed;
+        bool _failed;
         const bool autoReconnect;
         time_t lastReconnectTry;
         HostAndPort _server; // remember for reconnects
@@ -916,7 +946,7 @@ namespace mongo {
         void _checkConnection();
 
         // throws SocketException if in failed state and not reconnecting or if waiting to reconnect
-        void checkConnection() { if( failed ) _checkConnection(); }
+        void checkConnection() { if( _failed ) _checkConnection(); }
 
         map< string, pair<string,string> > authCache;
         double _so_timeout;
@@ -924,6 +954,11 @@ namespace mongo {
 
         static AtomicUInt _numConnections;
         static bool _lazyKillCursor; // lazy means we piggy back kill cursors on next op
+
+#ifdef MONGO_SSL
+        static SSLManager* sslManager();
+        static SSLManager* _sslManager;
+#endif
     };
 
     /** pings server to check if it's up
     */
 
     DBClientBase * createDirectClient();
 
+    BSONElement getErrField( const BSONObj& result );
+    bool hasErrField( const BSONObj& result );
+
 } // namespace mongo
 
 #include "dbclientcursor.h"
diff --git a/client/dbclient_rs.cpp b/client/dbclient_rs.cpp
index 37f6225..2cab1f7 100644
--- a/client/dbclient_rs.cpp
+++ b/client/dbclient_rs.cpp
@@ -54,9 +54,9 @@ namespace mongo {
         void run() {
             log() << "starting" << endl;
             while ( ! inShutdown() ) {
-                sleepsecs( 20 );
+                sleepsecs( 10 );
                 try {
-                    ReplicaSetMonitor::checkAll();
+                    ReplicaSetMonitor::checkAll( true );
                 }
                 catch ( std::exception& e ) {
                     error() << "check failed: " << e.what() << endl;
@@ -99,17 +99,14 @@ namespace mongo {
             }
 
             _nodes.push_back( Node( servers[i] , conn.release() ) );
-
+            
+            int myLoc = _nodes.size() - 1;
             string maybePrimary;
-            if (_checkConnection( _nodes[_nodes.size()-1].conn , maybePrimary, false)) {
-                break;
-            }
+            _checkConnection( _nodes[myLoc].conn.get() , maybePrimary, false, myLoc );
         }
     }
 
     ReplicaSetMonitor::~ReplicaSetMonitor() {
-        for ( unsigned i=0; i<_nodes.size(); i++ )
-            delete _nodes[i].conn;
         _nodes.clear();
         _master = -1;
     }
@@ -125,7 +122,16 @@ namespace mongo {
         return m;
     }
 
-    void ReplicaSetMonitor::checkAll() {
+    ReplicaSetMonitorPtr ReplicaSetMonitor::get( const string& name ) {
+        scoped_lock lk( _setsLock );
+        map<string,ReplicaSetMonitorPtr>::const_iterator i = _sets.find( name );
+        if ( i == _sets.end() )
+            return ReplicaSetMonitorPtr();
+        return i->second;
+    }
+
+
+    void ReplicaSetMonitor::checkAll( bool checkAllSecondaries ) {
         set<string> seen;
 
         while ( true ) {
@@ -146,7 +152,7 @@ namespace mongo {
             if ( ! m )
                 break;
 
-            m->check();
+            m->check( checkAllSecondaries );
         }
 
@@ -202,7 +208,7 @@ namespace mongo {
             return _nodes[_master].addr;
         }
 
-        _check();
+        _check( false );
 
         scoped_lock lk( _lock );
         uassert( 10009 , str::stream() << "ReplicaSetMonitor no master found for set: " << _name , _master >= 0 );
@@ -210,34 +216,70 @@ namespace mongo {
     }
 
     HostAndPort ReplicaSetMonitor::getSlave( const HostAndPort& prev ) {
-        // make sure its valid
-        if ( prev.port() > 0 ) {
+        // make sure its valid
+
+        bool wasFound = false;
+
+        // This is always true, since checked in port()
+        assert( prev.port() >= 0 );
+        if( prev.host().size() ){
             scoped_lock lk( _lock );
             for ( unsigned i=0; i<_nodes.size(); i++ ) {
                 if ( prev != _nodes[i].addr )
                     continue;
 
-                if ( _nodes[i].ok )
+                wasFound = true;
+
+                if ( _nodes[i].okForSecondaryQueries() )
                     return prev;
+
+                break;
             }
         }
 
+        if( prev.host().size() ){
+            if( wasFound ){ LOG(1) << "slave '" << prev << "' is no longer ok to use" << endl; }
+            else{ LOG(1) << "slave '" << prev << "' was not found in the replica set" << endl; }
+        }
+        else LOG(1) << "slave '" << prev << "' is not initialized or invalid" << endl;
+
         return getSlave();
     }
 
     HostAndPort ReplicaSetMonitor::getSlave() {
-        scoped_lock lk( _lock );
-        for ( unsigned i=0; i<_nodes.size(); i++ ) {
-            _nextSlave = ( _nextSlave + 1 ) % _nodes.size();
-            if ( _nextSlave == _master )
-                continue;
-            if ( _nodes[ _nextSlave ].ok )
-                return _nodes[ _nextSlave ].addr;
+        LOG(2) << "selecting new slave from replica set " << getServerAddress() << endl;
+
+        // Logic is to retry three times for any secondary node, if we can't find any secondary, we'll take
+        // any "ok" node
+        // TODO: Could this query hidden nodes?
+        const int MAX = 3;
+        for ( int xxx=0; xxx<MAX; xxx++ ) {
+
+            {
+                scoped_lock lk( _lock );
+
+                unsigned i = 0;
+                for ( ; i<_nodes.size(); i++ ) {
+                    _nextSlave = ( _nextSlave + 1 ) % _nodes.size();
+                    if ( _nextSlave == _master ){
+                        LOG(2) << "not selecting " << _nodes[_nextSlave] << " as it is the current master" << endl;
+                        continue;
+                    }
+                    if ( _nodes[ _nextSlave ].okForSecondaryQueries() || ( _nodes[ _nextSlave ].ok && ( xxx + 1 ) >= MAX ) )
+                        return _nodes[ _nextSlave ].addr;
+
+                    LOG(2) << "not selecting " << _nodes[_nextSlave] << " as it is not ok to use" << endl;
+                }
+
+            }
+
+            check(false);
         }
+
+        LOG(2) << "no suitable slave nodes found, returning default node " << _nodes[ 0 ] << endl;
 
-        return _nodes[ 0 ].addr;
+        return _nodes[0].addr;
     }
 
     /**
@@ -266,7 +308,7 @@ namespace mongo {
             string host = member["name"].String();
 
             int m = -1;
-            if ((m = _find(host)) <= 0) {
+            if ((m = _find(host)) < 0) {
                 continue;
             }
 
@@ -309,16 +351,34 @@ namespace mongo {
 
-    bool ReplicaSetMonitor::_checkConnection( DBClientConnection * c , string& maybePrimary , bool verbose ) {
+    bool ReplicaSetMonitor::_checkConnection( DBClientConnection * c , string& maybePrimary , bool verbose , int nodesOffset ) {
         scoped_lock lk( _checkConnectionLock );
         bool isMaster = false;
         bool changed = false;
         try {
+            Timer t;
             BSONObj o;
             c->isMaster(isMaster, &o);
+
+            if ( o["setName"].type() != String || o["setName"].String() != _name ) {
+                warning() << "node: " << c->getServerAddress() << " isn't a part of set: " << _name
+                          << " ismaster: " << o << endl;
+                if ( nodesOffset >= 0 )
+                    _nodes[nodesOffset].ok = false;
+                return false;
+            }
 
-            log( ! verbose ) << "ReplicaSetMonitor::_checkConnection: " << c->toString() << ' ' << o << endl;
+            if ( nodesOffset >= 0 ) {
+                _nodes[nodesOffset].pingTimeMillis = t.millis();
+                _nodes[nodesOffset].hidden = o["hidden"].trueValue();
+                _nodes[nodesOffset].secondary = o["secondary"].trueValue();
+                _nodes[nodesOffset].ismaster = o["ismaster"].trueValue();
+
+                _nodes[nodesOffset].lastIsMaster = o.copy();
+            }
+
+            log( ! verbose ) << "ReplicaSetMonitor::_checkConnection: " << c->toString() << ' ' << o << endl;
+
             // add other nodes
             if ( o["hosts"].type() == Array ) {
                 if ( o["primary"].type() == String )
@@ -329,11 +389,14 @@ namespace mongo {
             if (o.hasField("passives") && o["passives"].type() == Array) {
                 _checkHosts(o["passives"].Obj(), changed);
             }
-
+            
             _checkStatus(c);
+
+
         }
         catch ( std::exception& e ) {
             log( ! verbose ) << "ReplicaSetMonitor::_checkConnection: caught exception " << c->toString() << ' ' << e.what() << endl;
+            _nodes[nodesOffset].ok = false;
         }
 
         if ( changed && _hook )
@@ -342,24 +405,28 @@ namespace mongo {
         return isMaster;
     }
 
-    void ReplicaSetMonitor::_check() {
+    void ReplicaSetMonitor::_check( bool checkAllSecondaries ) {
 
         bool triedQuickCheck = false;
 
         LOG(1) <<  "_check : " << getServerAddress() << endl;
 
+        int newMaster = -1;
+
         for ( int retry = 0; retry < 2; retry++ ) {
             for ( unsigned i=0; i<_nodes.size(); i++ ) {
-                DBClientConnection * c;
+                shared_ptr<DBClientConnection> c;
                 {
                     scoped_lock lk( _lock );
                     c = _nodes[i].conn;
                 }
 
                 string maybePrimary;
-                if ( _checkConnection( c , maybePrimary , retry ) ) {
+                if ( _checkConnection( c.get() , maybePrimary , retry , i ) ) {
                     _master = i;
-                    return;
+                    newMaster = i;
+                    if ( ! checkAllSecondaries )
+                        return;
                 }
 
                 if ( ! triedQuickCheck && maybePrimary.size() ) {
@@ -367,36 +434,44 @@ namespace mongo {
                     if ( x >= 0 ) {
                         triedQuickCheck = true;
                         string dummy;
-                        DBClientConnection * testConn;
+                        shared_ptr<DBClientConnection> testConn;
                         {
                             scoped_lock lk( _lock );
                             testConn = _nodes[x].conn;
                         }
-                        if ( _checkConnection( testConn , dummy , false ) ) {
+                        if ( _checkConnection( testConn.get() , dummy , false , x ) ) {
                             _master = x;
-                            return;
+                            newMaster = x;
+                            if ( ! checkAllSecondaries )
+                                return;
                         }
                     }
                 }
 
             }
 
+            if ( newMaster >= 0 )
+                return;
+
             sleepsecs(1);
         }
 
     }
 
-    void ReplicaSetMonitor::check() {
+    void ReplicaSetMonitor::check( bool checkAllSecondaries ) {
         // first see if the current master is fine
         if ( _master >= 0 ) {
             string temp;
-            if ( _checkConnection( _nodes[_master].conn , temp , false ) ) {
-                // current master is fine, so we're done
-                return;
+            if ( _checkConnection( _nodes[_master].conn.get() , temp , false , _master ) ) {
+                if ( ! checkAllSecondaries ) {
+                    // current master is fine, so we're done
+                    return;
+                }
             }
         }
 
         // we either have no master, or the current is dead
-        _check();
+        _check( checkAllSecondaries );
     }
 
     int ReplicaSetMonitor::_find( const string& server ) const {
@@ -419,7 +494,26 @@ namespace mongo {
                 return i;
         return -1;
     }
-
+    
+    void ReplicaSetMonitor::appendInfo( BSONObjBuilder& b ) const {
+        scoped_lock lk( _lock );
+        BSONArrayBuilder hosts( b.subarrayStart( "hosts" ) );
+        for ( unsigned i=0; i<_nodes.size(); i++ ) {
+            hosts.append( BSON( "addr" << _nodes[i].addr <<
+                                // "lastIsMaster" << _nodes[i].lastIsMaster << // this is a potential race, so only used when debugging
+                                "ok" << _nodes[i].ok <<
+                                "ismaster" << _nodes[i].ismaster <<
+                                "hidden" << _nodes[i].hidden <<
+                                "secondary" << _nodes[i].secondary <<
+                                "pingTimeMillis" << _nodes[i].pingTimeMillis ) );
+
+        }
+        hosts.done();
+
+        b.append( "master" , _master );
+        b.append( "nextSlave" , _nextSlave );
+    }
+    
     mongo::mutex ReplicaSetMonitor::_setsLock( "ReplicaSetMonitor" );
     map<string,ReplicaSetMonitorPtr> ReplicaSetMonitor::_sets;
 
@@ -428,8 +522,9 @@ namespace mongo {
     // ----- DBClientReplicaSet ---------
     // --------------------------------
 
-    DBClientReplicaSet::DBClientReplicaSet( const string& name , const vector<HostAndPort>& servers )
-        : _monitor( ReplicaSetMonitor::get( name , servers ) ) {
+    DBClientReplicaSet::DBClientReplicaSet( const string& name , const vector<HostAndPort>& servers, double so_timeout )
+        : _monitor( ReplicaSetMonitor::get( name , servers ) ),
+          _so_timeout( so_timeout ) {
     }
 
     DBClientReplicaSet::~DBClientReplicaSet() {
@@ -446,7 +541,7 @@ namespace mongo {
         }
 
         _masterHost = _monitor->getMaster();
-        _master.reset( new DBClientConnection( true , this ) );
+        _master.reset( new DBClientConnection( true , this , _so_timeout ) );
         string errmsg;
         if ( ! _master->connect( _masterHost , errmsg ) ) {
             _monitor->notifyFailure( _masterHost );
@@ -464,12 +559,12 @@ namespace mongo {
                 return _slave.get();
             _monitor->notifySlaveFailure( _slaveHost );
             _slaveHost = _monitor->getSlave();
-        }
+        } 
         else {
             _slaveHost = h;
         }
 
-        _slave.reset( new DBClientConnection( true , this ) );
+        _slave.reset( new DBClientConnection( true , this , _so_timeout ) );
         _slave->connect( _slaveHost );
         _auth( _slave.get() );
         return _slave.get();
@@ -522,12 +617,12 @@ namespace mongo {
 
     // ------------- simple functions -----------------
 
-    void DBClientReplicaSet::insert( const string &ns , BSONObj obj ) {
-        checkMaster()->insert(ns, obj);
+    void DBClientReplicaSet::insert( const string &ns , BSONObj obj , int flags) {
+        checkMaster()->insert(ns, obj, flags);
     }
 
-    void DBClientReplicaSet::insert( const string &ns, const vector< BSONObj >& v ) {
-        checkMaster()->insert(ns, v);
+    void DBClientReplicaSet::insert( const string &ns, const vector< BSONObj >& v , int flags) {
+        checkMaster()->insert(ns, v, flags);
     }
 
     void DBClientReplicaSet::remove( const string &ns , Query obj , bool justOne ) {
@@ -545,12 +640,12 @@ namespace mongo {
             // we're ok sending to a slave
             // we'll try 2 slaves before just using master
             // checkSlave will try a different slave automatically after a failure
-            for ( int i=0; i<2; i++ ) {
+            for ( int i=0; i<3; i++ ) {
                 try {
                     return checkSlaveQueryResult( checkSlave()->query(ns,query,nToReturn,nToSkip,fieldsToReturn,queryOptions,batchSize) );
                 }
                 catch ( DBException &e ) {
-                    log() << "can't query replica set slave " << i << " : " << _slaveHost << e.what() << endl;
+                    LOG(1) << "can't query replica set slave " << i << " : " << _slaveHost << causedBy( e ) << endl;
                 }
             }
         }
@@ -563,12 +658,12 @@ namespace mongo {
             // we're ok sending to a slave
             // we'll try 2 slaves before just using master
             // checkSlave will try a different slave automatically after a failure
-            for ( int i=0; i<2; i++ ) {
+            for ( int i=0; i<3; i++ ) {
                 try {
                     return checkSlave()->findOne(ns,query,fieldsToReturn,queryOptions);
                 }
                 catch ( DBException &e ) {
-                    LOG(1) << "can't findone replica set slave " << i << " : " << _slaveHost << e.what() << endl;
+                    LOG(1) << "can't findone replica set slave " << i << " : " << _slaveHost << causedBy( e ) << endl;
                 }
             }
         }
@@ -584,23 +679,22 @@ namespace mongo {
         assert(0);
     }
 
-    auto_ptr<DBClientCursor> DBClientReplicaSet::checkSlaveQueryResult( auto_ptr<DBClientCursor> result ){
+    void DBClientReplicaSet::isntMaster() {
+        log() << "got not master for: " << _masterHost << endl;
+        _monitor->notifyFailure( _masterHost );
+        _master.reset();
+    }
 
-        bool isError = result->hasResultFlag( ResultFlag_ErrSet );
+    auto_ptr<DBClientCursor> DBClientReplicaSet::checkSlaveQueryResult( auto_ptr<DBClientCursor> result ){
+
+        BSONObj error;
+        bool isError = result->peekError( &error );
         if( ! isError ) return result;
 
-        BSONObj error = result->peekOne();
-
-        BSONElement code = error["code"];
-        if( code.eoo() || ! code.isNumber() ){
-            warning() << "no code for error from secondary host " << _slaveHost << ", error was " << error << endl;
-            return result;
-        }
-
         // We only check for "not master or secondary" errors here
         // If the error code here ever changes, we need to change this code also
-        if( code.Int() == 13436 /* not master or secondary */ ){
+        BSONElement code = error["code"];
+        if( code.isNumber() && code.Int() == 13436 /* not master or secondary */ ){
             isntSecondary();
             throw DBException( str::stream() << "slave " << _slaveHost.toString() << " is no longer secondary", 14812 );
         }
@@ -615,20 +709,123 @@ namespace mongo {
         _slave.reset();
     }
 
+    void DBClientReplicaSet::say( Message& toSend, bool isRetry ) {
 
-    void DBClientReplicaSet::isntMaster() {
-        log() << "got not master for: " << _masterHost << endl;
-        _monitor->notifyFailure( _masterHost );
-        _master.reset();
+        if( ! isRetry )
+            _lazyState = LazyState();
+
+        int lastOp = -1;
+        bool slaveOk = false;
+
+        if ( ( lastOp = toSend.operation() ) == dbQuery ) {
+            // TODO: might be possible to do this faster by changing api
+            DbMessage dm( toSend );
+            QueryMessage qm( dm );
+            if ( ( slaveOk = ( qm.queryOptions & QueryOption_SlaveOk ) ) ) {
+
+                for ( int i = _lazyState._retries; i < 3; i++ ) {
+                    try {
+                        DBClientConnection* slave = checkSlave();
+                        slave->say( toSend );
+
+                        _lazyState._lastOp = lastOp;
+                        _lazyState._slaveOk = slaveOk;
+                        _lazyState._retries = i;
+                        _lazyState._lastClient = slave;
+                        return;
+                    }
+                    catch ( DBException &e ) {
+                        LOG(1) << "can't callLazy replica set slave " << i << " : " << _slaveHost << causedBy( e ) << endl;
+                    }
+                }
+            }
+        }
+
+        DBClientConnection* master = checkMaster();
+        master->say( toSend );
+
+        _lazyState._lastOp = lastOp;
+        _lazyState._slaveOk = slaveOk;
+        _lazyState._retries = 3;
+        _lazyState._lastClient = master;
+        return;
+    }
+
+    bool DBClientReplicaSet::recv( Message& m ) {
+
+        assert( _lazyState._lastClient );
+
+        // TODO: It would be nice if we could easily wrap a conn error as a result error
+        try {
+            return _lazyState._lastClient->recv( m );
+        }
+        catch( DBException& e ){
+            log() << "could not receive data from " << _lazyState._lastClient << causedBy( e ) << endl;
+            return false;
+        }
+    }
+
+    void DBClientReplicaSet::checkResponse( const char* data, int nReturned, bool* retry, string* targetHost ){
+
+        // For now, do exactly as we did before, so as not to break things.  In general though, we
+        // should fix this so checkResponse has a more consistent contract.
+        if( ! retry ){
+            if( _lazyState._lastClient )
+                return _lazyState._lastClient->checkResponse( data, nReturned );
+            else
+                return checkMaster()->checkResponse( data, nReturned );
+        }
+
+        *retry = false;
+        if( targetHost && _lazyState._lastClient ) *targetHost = _lazyState._lastClient->getServerAddress();
+        else if (targetHost) *targetHost = "";
+
+        if( ! _lazyState._lastClient ) return;
+        if( nReturned != 1 && nReturned != -1 ) return;
+
+        BSONObj dataObj;
+        if( nReturned == 1 ) dataObj = BSONObj( data );
+
+        // Check if we should retry here
+        if( _lazyState._lastOp == dbQuery && _lazyState._slaveOk ){
+
+            // Check the error code for a slave not secondary error
+            if( nReturned == -1 ||
+                ( hasErrField( dataObj ) && ! dataObj["code"].eoo() && dataObj["code"].Int() == 13436 ) ){
+
+                bool wasMaster = false;
+                if( _lazyState._lastClient == _slave.get() ){
+                    isntSecondary();
+                }
+                else if( _lazyState._lastClient == _master.get() ){
+                    wasMaster = true;
+                    isntMaster();
+                }
+                else
+                    warning() << "passed " << dataObj << " but last rs client " << _lazyState._lastClient->toString() << " is not master or secondary" << endl;
+
+                if( _lazyState._retries < 3 ){
+                    _lazyState._retries++;
+                    *retry = true;
+                }
+                else{
+                    (void)wasMaster; // silence set-but-not-used warning
+                    // assert( wasMaster );
+                    // printStackTrace();
+                    log() << "too many retries (" << _lazyState._retries << "), could not get data from replica set" << endl;
+                }
+            }
+        }
     }
 
+
     bool DBClientReplicaSet::call( Message &toSend, Message &response, bool assertOk , string * actualServer ) {
         if ( toSend.operation() == dbQuery ) {
             // TODO: might be possible to do this faster by changing api
             DbMessage dm( toSend );
             QueryMessage qm( dm );
             if ( qm.queryOptions & QueryOption_SlaveOk ) {
-                for ( int i=0; i<2; i++ ) {
+                for ( int i=0; i<3; i++ ) {
                     try {
                         DBClientConnection* s = checkSlave();
                         if ( actualServer )
@@ -636,7 +833,7 @@ namespace mongo {
                         return s->call( toSend , response , assertOk );
                     }
                     catch ( DBException &e ) {
-                        LOG(1) << "can't call replica set slave " << i << " : " << _slaveHost << e.what() << endl;
+                        LOG(1) << "can't call replica set slave " << i << " : " << _slaveHost << causedBy( e ) << endl;
                         if ( actualServer )
                             *actualServer = "";
                     }
diff --git a/client/dbclient_rs.h b/client/dbclient_rs.h
index 548b46a..b6948a0 100644
--- a/client/dbclient_rs.h
+++ b/client/dbclient_rs.h
@@ -1,4 +1,4 @@
-/** @file dbclient_rs.h - connect to a Replica Set, from C++ */
+/** @file dbclient_rs.h Connect to a Replica Set, from C++ */
 
 /*    Copyright 2009 10gen Inc.
  *
@@ -43,10 +43,16 @@ namespace mongo {
         static ReplicaSetMonitorPtr get( const string& name , const vector<HostAndPort>& servers );
 
         /**
+         * gets a cached Monitor per name or will return none if it doesn't exist
+         */
+        static ReplicaSetMonitorPtr get( const string& name );
+
+
+        /**
         * checks all sets for current master and new secondaries
         * usually only called from a BackgroundJob
         */
-        static void checkAll();
+        static void checkAll( bool checkAllSecondaries );
 
         /**
         * this is called whenever the config of any repclia set changes
@@ -81,13 +87,15 @@ namespace mongo {
         /**
         * checks for current master and new secondaries
        */
-        void check();
+        void check( bool checkAllSecondaries );
 
         string getName() const { return _name; }
 
         string getServerAddress() const;
 
         bool contains( const string& server ) const;
+
+        void appendInfo( BSONObjBuilder& b ) const;
 
     private:
         /**
@@ -98,7 +106,7 @@ namespace mongo {
        */
         ReplicaSetMonitor( const string& name , const vector<HostAndPort>& servers );
 
-        void _check();
+        void _check( bool checkAllSecondaries );
 
         /**
         * Use replSetGetStatus command to make sure hosts in host list are up
@@ -119,9 +127,10 @@ namespace mongo {
         * @param c the connection to check
         * @param maybePrimary OUT
         * @param verbose
+        * @param nodesOffset - offset into _nodes array, -1 for not in it
         * @return if the connection is good
         */
-        bool _checkConnection( DBClientConnection * c , string& maybePrimary , bool verbose );
+        bool _checkConnection( DBClientConnection * c , string& maybePrimary , bool verbose , int nodesOffset );
 
         int _find( const string& server ) const ;
         int _find_inlock( const string& server ) const ;
@@ -132,14 +141,44 @@ namespace mongo {
         string _name;
 
         struct Node {
-            Node( const HostAndPort& a , DBClientConnection* c ) : addr( a ) , conn(c) , ok(true) {}
+            Node( const HostAndPort& a , DBClientConnection* c )
+                : addr( a ) , conn(c) , ok(true) ,
+                  ismaster(false), secondary( false ) , hidden( false ) , pingTimeMillis(0) {
+            }
+
+            bool okForSecondaryQueries() const {
+                return ok && secondary && ! hidden;
+            }
+
+            BSONObj toBSON() const {
+                return BSON( "addr" << addr.toString() <<
+                             "isMaster" << ismaster <<
+                             "secondary" << secondary <<
+                             "hidden" << hidden <<
+                             "ok" << ok );
+            }
+
+            string toString() const {
+                return toBSON().toString();
+            }
+
             HostAndPort addr;
-            DBClientConnection* conn;
+            shared_ptr<DBClientConnection> conn;
 
             // if this node is in a failure state
             // used for slave routing
             // this is too simple, should make it better
             bool ok;
+
+            // as reported by ismaster
+            BSONObj lastIsMaster;
+
+            bool ismaster;
+            bool secondary;
+            bool hidden;
+
+            int pingTimeMillis;
+
         };
 
         /**
@@ -168,7 +207,7 @@ namespace mongo {
     public:
         /** Call connect() after constructing. autoReconnect is always on for DBClientReplicaSet connections. */
-        DBClientReplicaSet( const string& name , const vector<HostAndPort>& servers );
+        DBClientReplicaSet( const string& name , const vector<HostAndPort>& servers, double so_timeout=0 );
         virtual ~DBClientReplicaSet();
 
         /** Returns false if nomember of the set were reachable, or neither is
@@ -191,11 +230,11 @@ namespace mongo {
         /** throws userassertion "no master found" */
         virtual BSONObj findOne(const string &ns, const Query& query, const BSONObj *fieldsToReturn = 0, int queryOptions = 0);
 
-        virtual void insert( const string &ns , BSONObj obj );
+        virtual void insert( const string &ns , BSONObj obj , int flags=0);
 
         /** insert multiple objects.  Note that single object insert is asynchronous, so this version
            is only nominally faster and not worth a special effort to try to use.  */
-        virtual void insert( const string &ns, const vector< BSONObj >& v );
+        virtual void insert( const string &ns, const vector< BSONObj >& v , int flags=0);
 
         virtual void remove( const string &ns , Query obj , bool justOne = 0 );
 
@@ -210,11 +249,14 @@ namespace mongo {
 
         // ---- callback pieces -------
 
-        virtual void checkResponse( const char *data, int nReturned ) { checkMaster()->checkResponse( data , nReturned ); }
+        virtual void say( Message &toSend, bool isRetry = false );
+        virtual bool recv( Message &toRecv );
+        virtual void checkResponse( const char* data, int nReturned, bool* retry = NULL, string* targetHost = NULL );
 
         /* this is the callback from our underlying connections to notify us that we got a "not master" error.
         */
         void isntMaster();
+
         /* this is used to indicate we got a "not master or secondary" error from a secondary.
         */
         void isntSecondary();
@@ -225,16 +267,18 @@ namespace mongo {
 
         // ----- informational ----
 
+        double getSoTimeout() const { return _so_timeout; }
+
         string toString() { return getServerAddress(); }
 
         string getServerAddress() const { return _monitor->getServerAddress(); }
 
         virtual ConnectionString::ConnectionType type() const { return ConnectionString::SET; }
+        virtual bool lazySupported() const { return true; }
 
         // ---- low level ------
 
         virtual bool call( Message &toSend, Message &response, bool assertOk=true , string * actualServer = 0 );
-        virtual void say( Message &toSend ) { checkMaster()->say( toSend ); }
         virtual bool callRead( Message& toSend , Message& response ) { return checkMaster()->callRead( toSend , response ); }
 
@@ -258,6 +302,8 @@ namespace mongo {
 
         HostAndPort _slaveHost;
         scoped_ptr<DBClientConnection> _slave;
+
+        double _so_timeout;
 
         /**
         * for storing authentication info
@@ -277,6 +323,22 @@ namespace mongo {
         // this could be a security issue, as the password is stored in memory
         // not sure if/how we should handle
         list<AuthInfo> _auths;
+
+    protected:
+
+        /**
+         * for storing (non-threadsafe) information between lazy calls
+         */
+        class LazyState {
+        public:
+            LazyState() : _lastClient( NULL ), _lastOp( -1 ), _slaveOk( false ), _retries( 0 ) {}
+            DBClientConnection* _lastClient;
+            int _lastOp;
+            bool _slaveOk;
+            int _retries;
+
+        } _lazyState;
+
     };
 
diff --git a/client/dbclientcursor.cpp b/client/dbclientcursor.cpp
index 6c6afc0..5db360e 100644
--- a/client/dbclientcursor.cpp
+++ b/client/dbclientcursor.cpp
@@ -37,8 +37,7 @@ namespace mongo {
         return batchSize < nToReturn ? batchSize : nToReturn;
     }
 
-    bool DBClientCursor::init() {
-        Message toSend;
+    void DBClientCursor::_assembleInit( Message& toSend ) {
         if ( !cursorId ) {
             assembleRequest( ns, query, nextBatchSize() , nToSkip, fieldsToReturn, opts, toSend );
         }
@@ -50,12 +49,18 @@ namespace mongo {
             b.appendNum( cursorId );
             toSend.setData( dbGetMore, b.buf(), b.len() );
         }
-        if ( !_client->call( toSend, *m, false ) ) {
+    }
+
+    bool DBClientCursor::init() {
+        Message toSend;
+        _assembleInit( toSend );
+
+        if ( !_client->call( toSend, *b.m, false ) ) {
             // log msg temp?
log() << "DBClientCursor::init message from call() was empty" << endl; return false; @@ -63,12 +68,41 @@ namespace mongo { dataReceived(); return true; } + + void DBClientCursor::initLazy( bool isRetry ) { + verify( 15875 , _client->lazySupported() ); + Message toSend; + _assembleInit( toSend ); + _client->say( toSend, isRetry ); + } + + bool DBClientCursor::initLazyFinish( bool& retry ) { + + bool recvd = _client->recv( *b.m ); + + // If we get a bad response, return false + if ( ! recvd || b.m->empty() ) { + + if( !recvd ) + log() << "DBClientCursor::init lazy say() failed" << endl; + if( b.m->empty() ) + log() << "DBClientCursor::init message from say() was empty" << endl; + + _client->checkResponse( NULL, -1, &retry, &_lazyHost ); + + return false; + + } + + dataReceived( retry, _lazyHost ); + return ! retry; + } void DBClientCursor::requestMore() { - assert( cursorId && pos == nReturned ); + assert( cursorId && b.pos == b.nReturned ); if (haveLimit) { - nToReturn -= nReturned; + nToReturn -= b.nReturned; assert(nToReturn > 0); } BufBuilder b; @@ -83,7 +117,7 @@ namespace mongo { if ( _client ) { _client->call( toSend, *response ); - m = response; + this->b.m = response; dataReceived(); } else { @@ -91,7 +125,7 @@ namespace mongo { ScopedDbConnection conn( _scopedHost ); conn->call( toSend , *response ); _client = conn.get(); - m = response; + this->b.m = response; dataReceived(); _client = 0; conn.done(); @@ -100,19 +134,24 @@ namespace mongo { /** with QueryOption_Exhaust, the server just blasts data at us (marked at end with cursorid==0). */ void DBClientCursor::exhaustReceiveMore() { - assert( cursorId && pos == nReturned ); + assert( cursorId && b.pos == b.nReturned ); assert( !haveLimit ); auto_ptr<Message> response(new Message()); assert( _client ); _client->recv(*response); - m = response; + b.m = response; dataReceived(); } - void DBClientCursor::dataReceived() { - QueryResult *qr = (QueryResult *) m->singleData(); + void DBClientCursor::dataReceived( bool& retry, string& host ) { + + QueryResult *qr = (QueryResult *) b.m->singleData(); resultFlags = qr->resultFlags(); + if ( qr->resultFlags() & ResultFlag_ErrSet ) { + wasError = true; + } + if ( qr->resultFlags() & ResultFlag_CursorNotFound ) { // cursor id no longer valid at the server. 
assert( qr->cursorId == 0 ); @@ -127,11 +166,12 @@ namespace mongo { cursorId = qr->cursorId; } - nReturned = qr->nReturned; - pos = 0; - data = qr->data(); + b.nReturned = qr->nReturned; + b.pos = 0; + b.data = qr->data(); + + _client->checkResponse( b.data, b.nReturned, &retry, &host ); // watches for "not master" - _client->checkResponse( data, nReturned ); /* this assert would fire the way we currently work: assert( nReturned || cursorId == 0 ); */ @@ -144,17 +184,17 @@ namespace mongo { if ( !_putBack.empty() ) return true; - if (haveLimit && pos >= nToReturn) + if (haveLimit && b.pos >= nToReturn) return false; - if ( pos < nReturned ) + if ( b.pos < b.nReturned ) return true; if ( cursorId == 0 ) return false; requestMore(); - return pos < nReturned; + return b.pos < b.nReturned; } BSONObj DBClientCursor::next() { @@ -165,11 +205,11 @@ namespace mongo { return ret; } - uassert(13422, "DBClientCursor next() called but more() is false", pos < nReturned); + uassert(13422, "DBClientCursor next() called but more() is false", b.pos < b.nReturned); - pos++; - BSONObj o(data); - data += o.objsize(); + b.pos++; + BSONObj o(b.data); + b.data += o.objsize(); /* todo would be good to make data null at end of batch for safety */ return o; } @@ -187,9 +227,9 @@ namespace mongo { } */ - int p = pos; - const char *d = data; - while( m && p < nReturned ) { + int p = b.pos; + const char *d = b.data; + while( m && p < b.nReturned ) { BSONObj o(d); d += o.objsize(); p++; @@ -198,6 +238,19 @@ namespace mongo { } } + bool DBClientCursor::peekError(BSONObj* error){ + if( ! wasError ) return false; + + vector<BSONObj> v; + peek(v, 1); + + assert( v.size() == 1 ); + assert( hasErrField( v[0] ) ); + + if( error ) *error = v[0].getOwned(); + return true; + } + void DBClientCursor::attach( AScopedConnection * conn ) { assert( _scopedHost.size() == 0 ); assert( conn ); @@ -205,14 +258,20 @@ namespace mongo { if ( conn->get()->type() == ConnectionString::SET || conn->get()->type() == ConnectionString::SYNC ) { - _scopedHost = _client->getServerAddress(); + if( _lazyHost.size() > 0 ) + _scopedHost = _lazyHost; + else if( _client ) + _scopedHost = _client->getServerAddress(); + else + massert(14821, "No client or lazy client specified, cannot store multi-host connection.", false); } else { _scopedHost = conn->getHost(); } - + conn->done(); _client = 0; + _lazyHost = ""; } DBClientCursor::~DBClientCursor() { @@ -221,12 +280,12 @@ namespace mongo { DESTRUCTOR_GUARD ( - if ( cursorId && _ownCursor ) { - BufBuilder b; - b.appendNum( (int)0 ); // reserved + if ( cursorId && _ownCursor && ! inShutdown() ) { + BufBuilder b; + b.appendNum( (int)0 ); // reserved b.appendNum( (int)1 ); // number b.appendNum( cursorId ); - + Message m; m.setData( dbKillCursors , b.buf() , b.len() ); diff --git a/client/dbclientcursor.h b/client/dbclientcursor.h index d176b89..977bd30 100644 --- a/client/dbclientcursor.h +++ b/client/dbclientcursor.h @@ -18,7 +18,7 @@ #pragma once #include "../pch.h" -#include "../util/message.h" +#include "../util/net/message.h" #include "../db/jsobj.h" #include "../db/json.h" #include <stack> @@ -52,7 +52,7 @@ namespace mongo { if you want to exhaust whatever data has been fetched to the client already but then perhaps stop. 
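
       For example, buffered results can be drained without another network
       round trip (an illustrative sketch only -- 'c' is an open cursor and
       handleLocally() is a placeholder for caller code):

           while ( c->moreInCurrentBatch() )
               handleLocally( c->next() );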
*/ - int objsLeftInBatch() const { _assertIfNull(); return _putBack.size() + nReturned - pos; } + int objsLeftInBatch() const { _assertIfNull(); return _putBack.size() + b.nReturned - b.pos; } bool moreInCurrentBatch() { return objsLeftInBatch() > 0; } /** next @@ -71,11 +71,11 @@ namespace mongo { /** throws AssertionException if get back { $err : ... } */ BSONObj nextSafe() { BSONObj o = next(); - BSONElement e = o.firstElement(); - if( strcmp(e.fieldName(), "$err") == 0 ) { + if( strcmp(o.firstElementFieldName(), "$err") == 0 ) { + string s = "nextSafe(): " + o.toString(); if( logLevel >= 5 ) - log() << "nextSafe() error " << o.toString() << endl; - uassert(13106, "nextSafe(): " + o.toString(), false); + log() << s << endl; + uasserted(13106, s); } return o; } @@ -86,11 +86,11 @@ namespace mongo { WARNING: no support for _putBack yet! */ void peek(vector<BSONObj>&, int atMost); - BSONObj peekOne(){ - vector<BSONObj> v; - peek( v, 1 ); - return v.size() > 0 ? v[0] : BSONObj(); - } + + /** + * peek ahead and see if an error occurred, and get the error if so. + */ + bool peekError(BSONObj* error = NULL); /** iterate the rest of the cursor and return the number if items @@ -109,13 +109,9 @@ namespace mongo { 'dead' may be preset yet some data still queued and locally available from the dbclientcursor. */ - bool isDead() const { - return !this || cursorId == 0; - } + bool isDead() const { return !this || cursorId == 0; } - bool tailable() const { - return (opts & QueryOption_CursorTailable) != 0; - } + bool tailable() const { return (opts & QueryOption_CursorTailable) != 0; } /** see ResultFlagType (constants.h) for flag values mostly these flags are for internal purposes - @@ -137,12 +133,9 @@ namespace mongo { fieldsToReturn(_fieldsToReturn), opts(queryOptions), batchSize(bs==1?2:bs), - m(new Message()), cursorId(), - nReturned(), - pos(), - data(), - _ownCursor( true ) { + _ownCursor( true ), + wasError( false ) { } DBClientCursor( DBClientBase* client, const string &_ns, long long _cursorId, int _nToReturn, int options ) : @@ -151,11 +144,7 @@ namespace mongo { nToReturn( _nToReturn ), haveLimit( _nToReturn > 0 && !(options & QueryOption_CursorTailable)), opts( options ), - m(new Message()), - cursorId( _cursorId ), - nReturned(), - pos(), - data(), + cursorId(_cursorId), _ownCursor( true ) { } @@ -170,11 +159,31 @@ namespace mongo { void attach( AScopedConnection * conn ); + /** + * actually does the query + */ + bool init(); + + void initLazy( bool isRetry = false ); + bool initLazyFinish( bool& retry ); + + class Batch : boost::noncopyable { + friend class DBClientCursor; + auto_ptr<Message> m; + int nReturned; + int pos; + const char *data; + public: + Batch() : m( new Message() ), nReturned(), pos(), data() { } + }; + private: friend class DBClientBase; friend class DBClientConnection; - bool init(); + int nextBatchSize(); + + Batch b; DBClientBase* _client; string ns; BSONObj query; @@ -184,18 +193,18 @@ namespace mongo { const BSONObj *fieldsToReturn; int opts; int batchSize; - auto_ptr<Message> m; stack< BSONObj > _putBack; int resultFlags; long long cursorId; - int nReturned; - int pos; - const char *data; - void dataReceived(); - void requestMore(); - void exhaustReceiveMore(); // for exhaust bool _ownCursor; // see decouple() string _scopedHost; + string _lazyHost; + bool wasError; + + void dataReceived() { bool retry; string lazyHost; dataReceived( retry, lazyHost ); } + void dataReceived( bool& retry, string& lazyHost ); + void requestMore(); + void exhaustReceiveMore(); // 
for exhaust // Don't call from a virtual function void _assertIfNull() const { uassert(13348, "connection died", this); } @@ -203,6 +212,9 @@ namespace mongo { // non-copyable , non-assignable DBClientCursor( const DBClientCursor& ); DBClientCursor& operator=( const DBClientCursor& ); + + // init pieces + void _assembleInit( Message& toSend ); }; /** iterate over objects in current batch only - will not cause a network call diff --git a/client/distlock.cpp b/client/distlock.cpp index 9ec98ea..cb71159 100644 --- a/client/distlock.cpp +++ b/client/distlock.cpp @@ -21,8 +21,7 @@ namespace mongo { - static string lockPingNS = "config.lockpings"; - static string locksNS = "config.locks"; + LabeledLevel DistributedLock::logLvl( 1 ); ThreadLocalValue<string> distLockIds(""); @@ -36,7 +35,7 @@ namespace mongo { static void initModule() { // cache process string stringstream ss; - ss << getHostName() << ":" << time(0) << ":" << rand(); + ss << getHostName() << ":" << cmdLine.port << ":" << time(0) << ":" << rand(); _cachedProcessString = new string( ss.str() ); } @@ -59,116 +58,406 @@ namespace mongo { return s; } - void _distLockPingThread( ConnectionString addr ) { - setThreadName( "LockPinger" ); - - log() << "creating dist lock ping thread for: " << addr << endl; - static int loops = 0; - while( ! inShutdown() ) { + class DistributedLockPinger { + public: - string process = getDistLockProcess(); - log(4) << "dist_lock about to ping for: " << process << endl; + DistributedLockPinger() + : _mutex( "DistributedLockPinger" ) { + } - try { - ScopedDbConnection conn( addr ); - - // refresh the entry corresponding to this process in the lockpings collection - conn->update( lockPingNS , - BSON( "_id" << process ) , - BSON( "$set" << BSON( "ping" << DATENOW ) ) , - true ); - string err = conn->getLastError(); - if ( ! err.empty() ) { - warning() << "dist_lock process: " << process << " pinging: " << addr << " failed: " - << err << endl; - conn.done(); - sleepsecs(30); - continue; - } + void _distLockPingThread( ConnectionString addr, string process, unsigned long long sleepTime ) { + + setThreadName( "LockPinger" ); + + string pingId = pingThreadId( addr, process ); + + log( DistributedLock::logLvl - 1 ) << "creating distributed lock ping thread for " << addr + << " and process " << process + << " (sleeping for " << sleepTime << "ms)" << endl; + + static int loops = 0; + while( ! inShutdown() && ! shouldKill( addr, process ) ) { + + log( DistributedLock::logLvl + 2 ) << "distributed lock pinger '" << pingId << "' about to ping." << endl; + + Date_t pingTime; + + try { + ScopedDbConnection conn( addr ); + + pingTime = jsTime(); - // remove really old entries from the lockpings collection if they're not holding a lock - // (this may happen if an instance of a process was taken down and no new instance came up to - // replace it for a quite a while) - // if the lock is taken, the take-over mechanism should handle the situation - auto_ptr<DBClientCursor> c = conn->query( locksNS , BSONObj() ); - vector<string> pids; - while ( c->more() ) { - BSONObj lock = c->next(); - if ( ! lock["process"].eoo() ) { - pids.push_back( lock["process"].valuestrsafe() ); + // refresh the entry corresponding to this process in the lockpings collection + conn->update( DistributedLock::lockPingNS , + BSON( "_id" << process ) , + BSON( "$set" << BSON( "ping" << pingTime ) ) , + true ); + + string err = conn->getLastError(); + if ( ! err.empty() ) { + warning() << "pinging failed for distributed lock pinger '" << pingId << "'." 
+ << causedBy( err ) << endl; + conn.done(); + + // Sleep for normal ping time + sleepmillis(sleepTime); + continue; + } + + // remove really old entries from the lockpings collection if they're not holding a lock + // (this may happen if an instance of a process was taken down and no new instance came up to + // replace it for a quite a while) + // if the lock is taken, the take-over mechanism should handle the situation + auto_ptr<DBClientCursor> c = conn->query( DistributedLock::locksNS , BSONObj() ); + set<string> pids; + while ( c->more() ) { + BSONObj lock = c->next(); + if ( ! lock["process"].eoo() ) { + pids.insert( lock["process"].valuestrsafe() ); + } + } + + Date_t fourDays = pingTime - ( 4 * 86400 * 1000 ); // 4 days + conn->remove( DistributedLock::lockPingNS , BSON( "_id" << BSON( "$nin" << pids ) << "ping" << LT << fourDays ) ); + err = conn->getLastError(); + if ( ! err.empty() ) { + warning() << "ping cleanup for distributed lock pinger '" << pingId << " failed." + << causedBy( err ) << endl; + conn.done(); + + // Sleep for normal ping time + sleepmillis(sleepTime); + continue; + } + + // create index so remove is fast even with a lot of servers + if ( loops++ == 0 ) { + conn->ensureIndex( DistributedLock::lockPingNS , BSON( "ping" << 1 ) ); + } + + log( DistributedLock::logLvl - ( loops % 10 == 0 ? 1 : 0 ) ) << "cluster " << addr << " pinged successfully at " << pingTime + << " by distributed lock pinger '" << pingId + << "', sleeping for " << sleepTime << "ms" << endl; + + // Remove old locks, if possible + // Make sure no one else is adding to this list at the same time + scoped_lock lk( _mutex ); + + int numOldLocks = _oldLockOIDs.size(); + if( numOldLocks > 0 ) + log( DistributedLock::logLvl - 1 ) << "trying to delete " << _oldLockOIDs.size() << " old lock entries for process " << process << endl; + + bool removed = false; + for( list<OID>::iterator i = _oldLockOIDs.begin(); i != _oldLockOIDs.end(); + i = ( removed ? _oldLockOIDs.erase( i ) : ++i ) ) { + removed = false; + try { + // Got OID from lock with id, so we don't need to specify id again + conn->update( DistributedLock::locksNS , + BSON( "ts" << *i ), + BSON( "$set" << BSON( "state" << 0 ) ) ); + + // Either the update went through or it didn't, either way we're done trying to + // unlock + log( DistributedLock::logLvl - 1 ) << "handled late remove of old distributed lock with ts " << *i << endl; + removed = true; + } + catch( UpdateNotTheSame& ) { + log( DistributedLock::logLvl - 1 ) << "partially removed old distributed lock with ts " << *i << endl; + removed = true; + } + catch ( std::exception& e) { + warning() << "could not remove old distributed lock with ts " << *i + << causedBy( e ) << endl; + } + + } + + if( numOldLocks > 0 && _oldLockOIDs.size() > 0 ){ + log( DistributedLock::logLvl - 1 ) << "not all old lock entries could be removed for process " << process << endl; } - } - Date_t fourDays = jsTime() - ( 4 * 86400 * 1000 ); // 4 days - conn->remove( lockPingNS , BSON( "_id" << BSON( "$nin" << pids ) << "ping" << LT << fourDays ) ); - err = conn->getLastError(); - if ( ! 
err.empty() ) { - warning() << "dist_lock cleanup request from process: " << process << " to: " << addr - << " failed: " << err << endl; conn.done(); - sleepsecs(30); - continue; - } - // create index so remove is fast even with a lot of servers - if ( loops++ == 0 ) { - conn->ensureIndex( lockPingNS , BSON( "ping" << 1 ) ); + } + catch ( std::exception& e ) { + warning() << "distributed lock pinger '" << pingId << "' detected an exception while pinging." + << causedBy( e ) << endl; } - conn.done(); + sleepmillis(sleepTime); + } + + warning() << "removing distributed lock ping thread '" << pingId << "'" << endl; + + + if( shouldKill( addr, process ) ) + finishKill( addr, process ); + + } + + void distLockPingThread( ConnectionString addr, long long clockSkew, string processId, unsigned long long sleepTime ) { + try { + jsTimeVirtualThreadSkew( clockSkew ); + _distLockPingThread( addr, processId, sleepTime ); } catch ( std::exception& e ) { - warning() << "dist_lock exception during ping: " << e.what() << endl; + error() << "unexpected error while running distributed lock pinger for " << addr << ", process " << processId << causedBy( e ) << endl; } + catch ( ... ) { + error() << "unknown error while running distributed lock pinger for " << addr << ", process " << processId << endl; + } + } - log( loops % 10 == 0 ? 0 : 1) << "dist_lock pinged successfully for: " << process << endl; - sleepsecs(30); + string pingThreadId( const ConnectionString& conn, const string& processId ) { + return conn.toString() + "/" + processId; } - } - void distLockPingThread( ConnectionString addr ) { - try { - _distLockPingThread( addr ); + string got( DistributedLock& lock, unsigned long long sleepTime ) { + + // Make sure we don't start multiple threads for a process id + scoped_lock lk( _mutex ); + + const ConnectionString& conn = lock.getRemoteConnection(); + const string& processId = lock.getProcessId(); + string s = pingThreadId( conn, processId ); + + // Ignore if we already have a pinging thread for this process. + if ( _seen.count( s ) > 0 ) return ""; + + // Check our clock skew + try { + if( lock.isRemoteTimeSkewed() ) { + throw LockException( str::stream() << "clock skew of the cluster " << conn.toString() << " is too far out of bounds to allow distributed locking." , 13650 ); + } + } + catch( LockException& e) { + throw LockException( str::stream() << "error checking clock skew of cluster " << conn.toString() << causedBy( e ) , 13651); + } + + boost::thread t( boost::bind( &DistributedLockPinger::distLockPingThread, this, conn, getJSTimeVirtualThreadSkew(), processId, sleepTime) ); + + _seen.insert( s ); + + return s; } - catch ( std::exception& e ) { - error() << "unexpected error in distLockPingThread: " << e.what() << endl; + + void addUnlockOID( const OID& oid ) { + // Modifying the lock from some other thread + scoped_lock lk( _mutex ); + _oldLockOIDs.push_back( oid ); } - catch ( ... 
) { - error() << "unexpected unknown error in distLockPingThread" << endl; + + bool willUnlockOID( const OID& oid ) { + scoped_lock lk( _mutex ); + return find( _oldLockOIDs.begin(), _oldLockOIDs.end(), oid ) != _oldLockOIDs.end(); } - } + void kill( const ConnectionString& conn, const string& processId ) { + // Make sure we're in a consistent state before other threads can see us + scoped_lock lk( _mutex ); - class DistributedLockPinger { - public: - DistributedLockPinger() - : _mutex( "DistributedLockPinger" ) { + string pingId = pingThreadId( conn, processId ); + + assert( _seen.count( pingId ) > 0 ); + _kill.insert( pingId ); + + } + + bool shouldKill( const ConnectionString& conn, const string& processId ) { + return _kill.count( pingThreadId( conn, processId ) ) > 0; } - void got( const ConnectionString& conn ) { - string s = conn.toString(); + void finishKill( const ConnectionString& conn, const string& processId ) { + // Make sure we're in a consistent state before other threads can see us scoped_lock lk( _mutex ); - if ( _seen.count( s ) > 0 ) - return; - boost::thread t( boost::bind( &distLockPingThread , conn ) ); - _seen.insert( s ); + + string pingId = pingThreadId( conn, processId ); + + _kill.erase( pingId ); + _seen.erase( pingId ); + } + set<string> _kill; set<string> _seen; mongo::mutex _mutex; + list<OID> _oldLockOIDs; } distLockPinger; - DistributedLock::DistributedLock( const ConnectionString& conn , const string& name , unsigned takeoverMinutes ) - : _conn(conn),_name(name),_takeoverMinutes(takeoverMinutes) { - _id = BSON( "_id" << name ); - _ns = "config.locks"; - distLockPinger.got( conn ); + + const string DistributedLock::lockPingNS = "config.lockpings"; + const string DistributedLock::locksNS = "config.locks"; + + /** + * Create a new distributed lock, potentially with a custom sleep and takeover time. If a custom sleep time is + * specified (time between pings) + */ + DistributedLock::DistributedLock( const ConnectionString& conn , const string& name , unsigned long long lockTimeout, bool asProcess ) + : _conn(conn) , _name(name) , _id( BSON( "_id" << name ) ), _processId( asProcess ? getDistLockId() : getDistLockProcess() ), + _lockTimeout( lockTimeout == 0 ? LOCK_TIMEOUT : lockTimeout ), _maxClockSkew( _lockTimeout / LOCK_SKEW_FACTOR ), _maxNetSkew( _maxClockSkew ), _lockPing( _maxClockSkew ), + _mutex( "DistributedLock" ) + { + log( logLvl - 1 ) << "created new distributed lock for " << name << " on " << conn + << " ( lock timeout : " << _lockTimeout + << ", ping interval : " << _lockPing << ", process : " << asProcess << " )" << endl; + } + + Date_t DistributedLock::getRemoteTime() { + return DistributedLock::remoteTime( _conn, _maxNetSkew ); + } + + bool DistributedLock::isRemoteTimeSkewed() { + return !DistributedLock::checkSkew( _conn, NUM_LOCK_SKEW_CHECKS, _maxClockSkew, _maxNetSkew ); + } + + const ConnectionString& DistributedLock::getRemoteConnection() { + return _conn; + } + + const string& DistributedLock::getProcessId() { + return _processId; + } + + /** + * Returns the remote time as reported by the cluster or server. 
The maximum difference between the reported time + * and the actual time on the remote server (at the completion of the function) is the maxNetSkew + */ + Date_t DistributedLock::remoteTime( const ConnectionString& cluster, unsigned long long maxNetSkew ) { + + ConnectionString server( *cluster.getServers().begin() ); + ScopedDbConnection conn( server ); + + BSONObj result; + long long delay; + + try { + Date_t then = jsTime(); + bool success = conn->runCommand( string("admin"), BSON( "serverStatus" << 1 ), result ); + delay = jsTime() - then; + + if( !success ) + throw TimeNotFoundException( str::stream() << "could not get status from server " + << server.toString() << " in cluster " << cluster.toString() + << " to check time", 13647 ); + + // Make sure that our delay is not more than 2x our maximum network skew, since this is the max our remote + // time value can be off by if we assume a response in the middle of the delay. + if( delay > (long long) (maxNetSkew * 2) ) + throw TimeNotFoundException( str::stream() << "server " << server.toString() + << " in cluster " << cluster.toString() + << " did not respond within max network delay of " + << maxNetSkew << "ms", 13648 ); + } + catch(...) { + conn.done(); + throw; + } + + conn.done(); + + return result["localTime"].Date() - (delay / 2); + + } + + bool DistributedLock::checkSkew( const ConnectionString& cluster, unsigned skewChecks, unsigned long long maxClockSkew, unsigned long long maxNetSkew ) { + + vector<HostAndPort> servers = cluster.getServers(); + + if(servers.size() < 1) return true; + + vector<long long> avgSkews; + + for(unsigned i = 0; i < skewChecks; i++) { + + // Find the average skew for each server + unsigned s = 0; + for(vector<HostAndPort>::iterator si = servers.begin(); si != servers.end(); ++si,s++) { + + if(i == 0) avgSkews.push_back(0); + + // Could check if this is self, but shouldn't matter since local network connection should be fast. + ConnectionString server( *si ); + + vector<long long> skew; + + BSONObj result; + + Date_t remote = remoteTime( server, maxNetSkew ); + Date_t local = jsTime(); + + // Remote time can be delayed by at most MAX_NET_SKEW + + // Skew is how much time we'd have to add to local to get to remote + avgSkews[s] += (long long) (remote - local); + + log( logLvl + 1 ) << "skew from remote server " << server << " found: " << (long long) (remote - local) << endl; + + } + } + + // Analyze skews + + long long serverMaxSkew = 0; + long long serverMinSkew = 0; + + for(unsigned s = 0; s < avgSkews.size(); s++) { + + long long avgSkew = (avgSkews[s] /= skewChecks); + + // Keep track of max and min skews + if(s == 0) { + serverMaxSkew = avgSkew; + serverMinSkew = avgSkew; + } + else { + if(avgSkew > serverMaxSkew) + serverMaxSkew = avgSkew; + if(avgSkew < serverMinSkew) + serverMinSkew = avgSkew; + } + + } + + long long totalSkew = serverMaxSkew - serverMinSkew; + + // Make sure our max skew is not more than our pre-set limit + if(totalSkew > (long long) maxClockSkew) { + log( logLvl + 1 ) << "total clock skew of " << totalSkew << "ms for servers " << cluster << " is out of " << maxClockSkew << "ms bounds." << endl; + return false; + } + + log( logLvl + 1 ) << "total clock skew of " << totalSkew << "ms for servers " << cluster << " is in " << maxClockSkew << "ms bounds." << endl; + return true; + } + + // For use in testing, ping thread should run indefinitely in practice. 
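
    // The skew guard above is meant to be consulted before relying on lock
    // timeouts; an illustrative caller-side sketch (the cluster address and
    // errmsg handling are placeholders, not part of this interface):
    //
    //   ConnectionString configServers = ConnectionString::parse( "cfg1,cfg2,cfg3" , errmsg );
    //   if ( ! DistributedLock::checkSkew( configServers ) ) {
    //       // clocks are too skewed -- forcing a timed-out lock would be unsafe
    //   }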
+ bool DistributedLock::killPinger( DistributedLock& lock ) {
+ if( lock._threadId == "" ) return false;
+
+ distLockPinger.kill( lock._conn, lock._processId );
+ return true;
}

+ // Semantics of this method: if the lock cannot be acquired, it returns false and the call can be retried.
+ // If the lock should not be tried again (some unexpected error), a LockException is thrown.
+ // If we are only trying to re-enter a currently held lock, reenter should be true.
+ // Note: reenter doesn't actually make this lock re-entrant in the normal sense, since it can still only
+ // be unlocked once; instead, it is used to verify that the lock is already held.
+ bool DistributedLock::lock_try( const string& why , bool reenter, BSONObj * other ) {
+
+ // TODO: Start pinging only when we actually get the lock?
+ // If we don't have a thread pinger, make sure we shouldn't have one
+ if( _threadId == "" ){
+ scoped_lock lk( _mutex );
+ _threadId = distLockPinger.got( *this, _lockPing );
+ }
+
+ // This should always be true; if not, we are using the lock incorrectly.
+ assert( _name != "" );
- bool DistributedLock::lock_try( string why , BSONObj * other ) { // write to dummy if 'other' is null BSONObj dummyOther; if ( other == NULL ) @@ -182,93 +471,240 @@ { // make sure it's there so we can use simple update logic below
- BSONObj o = conn->findOne( _ns , _id ).getOwned();
+ BSONObj o = conn->findOne( locksNS , _id ).getOwned();
+
+ // Case 1: No locks
if ( o.isEmpty() ) { try {
- log(4) << "dist_lock inserting initial doc in " << _ns << " for lock " << _name << endl;
- conn->insert( _ns , BSON( "_id" << _name << "state" << 0 << "who" << "" ) );
+ log( logLvl ) << "inserting initial doc in " << locksNS << " for lock " << _name << endl;
+ conn->insert( locksNS , BSON( "_id" << _name << "state" << 0 << "who" << "" ) );
}
catch ( UserException& e ) {
- log() << "dist_lock could not insert initial doc: " << e << endl;
+ warning() << "could not insert initial doc for distributed lock " << _name << causedBy( e ) << endl;
}
}
-
+
+ // Case 2: A set lock that we might be able to force
else if ( o["state"].numberInt() > 0 ) {
+
+ string lockName = o["_id"].String() + string("/") + o["process"].String();
+
+ bool canReenter = reenter && o["process"].String() == _processId && ! distLockPinger.willUnlockOID( o["ts"].OID() ) && o["state"].numberInt() == 2;
+ if( reenter && ! canReenter ) {
+ log( logLvl - 1 ) << "not re-entering distributed lock " << lockName;
+ if( o["process"].String() != _processId ) log( logLvl - 1 ) << ", different process " << _processId << endl;
+ else if( o["state"].numberInt() != 2 ) log( logLvl - 1 ) << ", state not finalized" << endl;
+ else log( logLvl - 1 ) << ", ts " << o["ts"].OID() << " scheduled for late unlock" << endl;
+
+ // reset since we've been bounced by a previous lock not being where we thought it was,
+ // and should go through the full forcing process if required.
+ // (in theory we should never see a ping here if used correctly)
+ *other = o; other->getOwned(); conn.done(); resetLastPing();
+ return false;
+ }
+
BSONObj lastPing = conn->findOne( lockPingNS , o["process"].wrap( "_id" ) );
if ( lastPing.isEmpty() ) {
- // if a lock is taken but there's no ping for it, we're in an inconsistent situation
- // if the lock holder (mongos or d) does not exist anymore, the lock could safely be removed
- // but we'd require analysis of the situation before a manual intervention
- error() << "config.locks: " << _name << " lock is taken by old process?
" - << "remove the following lock if the process is not active anymore: " << o << endl; - *other = o; - conn.done(); - return false; + log( logLvl ) << "empty ping found for process in lock '" << lockName << "'" << endl; + // TODO: Using 0 as a "no time found" value Will fail if dates roll over, but then, so will a lot. + lastPing = BSON( "_id" << o["process"].String() << "ping" << (Date_t) 0 ); } - unsigned long long now = jsTime(); - unsigned long long pingTime = lastPing["ping"].Date(); - - if ( now < pingTime ) { - // clock skew - warning() << "dist_lock has detected clock skew of " << ( pingTime - now ) << "ms" << endl; - *other = o; - conn.done(); - return false; + unsigned long long elapsed = 0; + unsigned long long takeover = _lockTimeout; + + log( logLvl ) << "checking last ping for lock '" << lockName << "'" << " against process " << _lastPingCheck.get<0>() << " and ping " << _lastPingCheck.get<1>() << endl; + + try { + + Date_t remote = remoteTime( _conn ); + + // Timeout the elapsed time using comparisons of remote clock + // For non-finalized locks, timeout 15 minutes since last seen (ts) + // For finalized locks, timeout 15 minutes since last ping + bool recPingChange = o["state"].numberInt() == 2 && ( _lastPingCheck.get<0>() != lastPing["_id"].String() || _lastPingCheck.get<1>() != lastPing["ping"].Date() ); + bool recTSChange = _lastPingCheck.get<3>() != o["ts"].OID(); + + if( recPingChange || recTSChange ) { + // If the ping has changed since we last checked, mark the current date and time + scoped_lock lk( _mutex ); + _lastPingCheck = boost::tuple<string, Date_t, Date_t, OID>( lastPing["_id"].String().c_str(), lastPing["ping"].Date(), remote, o["ts"].OID() ); + } + else { + + // GOTCHA! Due to network issues, it is possible that the current time + // is less than the remote time. We *have* to check this here, otherwise + // we overflow and our lock breaks. + if(_lastPingCheck.get<2>() >= remote) + elapsed = 0; + else + elapsed = remote - _lastPingCheck.get<2>(); + } + } - - unsigned long long elapsed = now - pingTime; - elapsed = elapsed / ( 1000 * 60 ); // convert to minutes - - if ( elapsed > ( 60 * 24 * 365 * 100 ) /* 100 years */ ) { - warning() << "distlock elapsed time seems impossible: " << lastPing << endl; + catch( LockException& e ) { + + // Remote server cannot be found / is not responsive + warning() << "Could not get remote time from " << _conn << causedBy( e ); + // If our config server is having issues, forget all the pings until we can see it again + resetLastPing(); + } - - if ( elapsed <= _takeoverMinutes ) { - log(1) << "dist_lock lock failed because taken by: " << o << " elapsed minutes: " << elapsed << endl; - *other = o; - conn.done(); + + if ( elapsed <= takeover && ! canReenter ) { + log( logLvl ) << "could not force lock '" << lockName << "' because elapsed time " << elapsed << " <= takeover time " << takeover << endl; + *other = o; other->getOwned(); conn.done(); return false; } - - log() << "dist_lock forcefully taking over from: " << o << " elapsed minutes: " << elapsed << endl; - conn->update( _ns , _id , BSON( "$set" << BSON( "state" << 0 ) ) ); - string err = conn->getLastError(); - if ( ! 
err.empty() ) { - warning() << "dist_lock take over from: " << o << " failed: " << err << endl; - *other = o.getOwned(); - other->getOwned(); - conn.done(); + else if( elapsed > takeover && canReenter ) { + log( logLvl - 1 ) << "not re-entering distributed lock " << lockName << "' because elapsed time " << elapsed << " > takeover time " << takeover << endl; + *other = o; other->getOwned(); conn.done(); return false; } + log( logLvl - 1 ) << ( canReenter ? "re-entering" : "forcing" ) << " lock '" << lockName << "' because " + << ( canReenter ? "re-entering is allowed, " : "" ) + << "elapsed time " << elapsed << " > takeover time " << takeover << endl; + + if( elapsed > takeover ) { + + // Lock may forced, reset our timer if succeeds or fails + // Ensures that another timeout must happen if something borks up here, and resets our pristine + // ping state if acquired. + resetLastPing(); + + try { + + // Check the clock skew again. If we check this before we get a lock + // and after the lock times out, we can be pretty sure the time is + // increasing at the same rate on all servers and therefore our + // timeout is accurate + uassert( 14023, str::stream() << "remote time in cluster " << _conn.toString() << " is now skewed, cannot force lock.", !isRemoteTimeSkewed() ); + + // Make sure we break the lock with the correct "ts" (OID) value, otherwise + // we can overwrite a new lock inserted in the meantime. + conn->update( locksNS , BSON( "_id" << _id["_id"].String() << "state" << o["state"].numberInt() << "ts" << o["ts"] ), + BSON( "$set" << BSON( "state" << 0 ) ) ); + + BSONObj err = conn->getLastErrorDetailed(); + string errMsg = DBClientWithCommands::getLastErrorString(err); + + // TODO: Clean up all the extra code to exit this method, probably with a refactor + if ( !errMsg.empty() || !err["n"].type() || err["n"].numberInt() < 1 ) { + ( errMsg.empty() ? log( logLvl - 1 ) : warning() ) << "Could not force lock '" << lockName << "' " + << ( !errMsg.empty() ? causedBy(errMsg) : string("(another force won)") ) << endl; + *other = o; other->getOwned(); conn.done(); + return false; + } + + } + catch( UpdateNotTheSame& ) { + // Ok to continue since we know we forced at least one lock document, and all lock docs + // are required for a lock to be held. + warning() << "lock forcing " << lockName << " inconsistent" << endl; + } + catch( std::exception& e ) { + conn.done(); + throw LockException( str::stream() << "exception forcing distributed lock " + << lockName << causedBy( e ), 13660); + } + + } + else { + + assert( canReenter ); + + // Lock may be re-entered, reset our timer if succeeds or fails + // Not strictly necessary, but helpful for small timeouts where thread scheduling is significant. + // This ensures that two attempts are still required for a force if not acquired, and resets our + // state if we are acquired. + resetLastPing(); + + // Test that the lock is held by trying to update the finalized state of the lock to the same state + // if it does not update or does not update on all servers, we can't re-enter. + try { + + // Test the lock with the correct "ts" (OID) value + conn->update( locksNS , BSON( "_id" << _id["_id"].String() << "state" << 2 << "ts" << o["ts"] ), + BSON( "$set" << BSON( "state" << 2 ) ) ); + + BSONObj err = conn->getLastErrorDetailed(); + string errMsg = DBClientWithCommands::getLastErrorString(err); + + // TODO: Clean up all the extra code to exit this method, probably with a refactor + if ( ! errMsg.empty() || ! 
err["n"].type() || err["n"].numberInt() < 1 ) { + ( errMsg.empty() ? log( logLvl - 1 ) : warning() ) << "Could not re-enter lock '" << lockName << "' " + << ( !errMsg.empty() ? causedBy(errMsg) : string("(not sure lock is held)") ) + << " gle: " << err + << endl; + *other = o; other->getOwned(); conn.done(); + return false; + } + + } + catch( UpdateNotTheSame& ) { + // NOT ok to continue since our lock isn't held by all servers, so isn't valid. + warning() << "inconsistent state re-entering lock, lock " << lockName << " not held" << endl; + *other = o; other->getOwned(); conn.done(); + return false; + } + catch( std::exception& e ) { + conn.done(); + throw LockException( str::stream() << "exception re-entering distributed lock " + << lockName << causedBy( e ), 13660); + } + + log( logLvl - 1 ) << "re-entered distributed lock '" << lockName << "'" << endl; + *other = o; other->getOwned(); conn.done(); + return true; + + } + + log( logLvl - 1 ) << "lock '" << lockName << "' successfully forced" << endl; + + // We don't need the ts value in the query, since we will only ever replace locks with state=0. } + // Case 3: We have an expired lock else if ( o["ts"].type() ) { queryBuilder.append( o["ts"] ); } } - OID ts; - ts.init(); + // Always reset our ping if we're trying to get a lock, since getting a lock implies the lock state is open + // and no locks need to be forced. If anything goes wrong, we don't want to remember an old lock. + resetLastPing(); bool gotLock = false; - BSONObj now; + BSONObj currLock; - BSONObj lockDetails = BSON( "state" << 1 << "who" << getDistLockId() << "process" << getDistLockProcess() << - "when" << DATENOW << "why" << why << "ts" << ts ); + BSONObj lockDetails = BSON( "state" << 1 << "who" << getDistLockId() << "process" << _processId << + "when" << jsTime() << "why" << why << "ts" << OID::gen() ); BSONObj whatIWant = BSON( "$set" << lockDetails ); + + BSONObj query = queryBuilder.obj(); + + string lockName = _name + string("/") + _processId; + try { - log(4) << "dist_lock about to aquire lock: " << lockDetails << endl; - conn->update( _ns , queryBuilder.obj() , whatIWant ); + // Main codepath to acquire lock + + log( logLvl ) << "about to acquire distributed lock '" << lockName << ":\n" + << lockDetails.jsonString(Strict, true) << "\n" + << query.jsonString(Strict, true) << endl; + + conn->update( locksNS , query , whatIWant ); + + BSONObj err = conn->getLastErrorDetailed(); + string errMsg = DBClientWithCommands::getLastErrorString(err); - BSONObj o = conn->getLastErrorDetailed(); - now = conn->findOne( _ns , _id ); + currLock = conn->findOne( locksNS , _id ); - if ( o["n"].numberInt() == 0 ) { - *other = now; + if ( !errMsg.empty() || !err["n"].type() || err["n"].numberInt() < 1 ) { + ( errMsg.empty() ? log( logLvl - 1 ) : warning() ) << "could not acquire lock '" << lockName << "' " + << ( !errMsg.empty() ? causedBy( errMsg ) : string("(another update won)") ) << endl; + *other = currLock; other->getOwned(); - log() << "dist_lock error trying to aquire lock: " << lockDetails << " error: " << o << endl; gotLock = false; } else { @@ -277,63 +713,234 @@ namespace mongo { } catch ( UpdateNotTheSame& up ) { + // this means our update got through on some, but not others - log(4) << "dist_lock lock did not propagate properly" << endl; + warning() << "distributed lock '" << lockName << " did not propagate properly." 
<< causedBy( up ) << endl; + + // Overall protection derives from: + // All unlocking updates use the ts value when setting state to 0 + // This ensures that during locking, we can override all smaller ts locks with + // our own safe ts value and not be unlocked afterward. + for ( unsigned i = 0; i < up.size(); i++ ) { + + ScopedDbConnection indDB( up[i].first ); + BSONObj indUpdate; + + try { + + indUpdate = indDB->findOne( locksNS , _id ); + + // If we override this lock in any way, grab and protect it. + // We assume/ensure that if a process does not have all lock documents, it is no longer + // holding the lock. + // Note - finalized locks may compete too, but we know they've won already if competing + // in this round. Cleanup of crashes during finalizing may take a few tries. + if( indUpdate["ts"] < lockDetails["ts"] || indUpdate["state"].numberInt() == 0 ) { + + BSONObj grabQuery = BSON( "_id" << _id["_id"].String() << "ts" << indUpdate["ts"].OID() ); + + // Change ts so we won't be forced, state so we won't be relocked + BSONObj grabChanges = BSON( "ts" << lockDetails["ts"].OID() << "state" << 1 ); + + // Either our update will succeed, and we'll grab the lock, or it will fail b/c some other + // process grabbed the lock (which will change the ts), but the lock will be set until forcing + indDB->update( locksNS, grabQuery, BSON( "$set" << grabChanges ) ); + + indUpdate = indDB->findOne( locksNS, _id ); + + // Our lock should now be set until forcing. + assert( indUpdate["state"].numberInt() == 1 ); + + } + // else our lock is the same, in which case we're safe, or it's a bigger lock, + // in which case we won't need to protect anything since we won't have the lock. + + } + catch( std::exception& e ) { + conn.done(); + throw LockException( str::stream() << "distributed lock " << lockName + << " had errors communicating with individual server " + << up[1].first << causedBy( e ), 13661 ); + } - for ( unsigned i=0; i<up.size(); i++ ) { - ScopedDbConnection temp( up[i].first ); - BSONObj temp2 = temp->findOne( _ns , _id ); + assert( !indUpdate.isEmpty() ); - if ( now.isEmpty() || now["ts"] < temp2["ts"] ) { - now = temp2.getOwned(); + // Find max TS value + if ( currLock.isEmpty() || currLock["ts"] < indUpdate["ts"] ) { + currLock = indUpdate.getOwned(); } - temp.done(); + indDB.done(); + } - if ( now["ts"].OID() == ts ) { - log(4) << "dist_lock completed lock propagation" << endl; + // Locks on all servers are now set and safe until forcing + + if ( currLock["ts"] == lockDetails["ts"] ) { + log( logLvl - 1 ) << "lock update won, completing lock propagation for '" << lockName << "'" << endl; gotLock = true; - conn->update( _ns , _id , whatIWant ); } else { - log() << "dist_lock error trying to complete propagation" << endl; + log( logLvl - 1 ) << "lock update lost, lock '" << lockName << "' not propagated." << endl; + + // Register the lock for deletion, to speed up failover + // Not strictly necessary, but helpful + distLockPinger.addUnlockOID( lockDetails["ts"].OID() ); + gotLock = false; } } + catch( std::exception& e ) { + conn.done(); + throw LockException( str::stream() << "exception creating distributed lock " + << lockName << causedBy( e ), 13663 ); + } - conn.done(); + // Complete lock propagation + if( gotLock ) { + + // This is now safe, since we know that no new locks will be placed on top of the ones we've checked for at + // least 15 minutes. Sets the state = 2, so that future clients can determine that the lock is truly set. 
+ // The invariant for rollbacks is that we will never force locks with state = 2 and active pings, since that + // indicates the lock is active, but this means the process creating/destroying them must explicitly poll + // when something goes wrong. + try { + + BSONObjBuilder finalLockDetails; + BSONObjIterator bi( lockDetails ); + while( bi.more() ) { + BSONElement el = bi.next(); + if( (string) ( el.fieldName() ) == "state" ) + finalLockDetails.append( "state", 2 ); + else finalLockDetails.append( el ); + } + + conn->update( locksNS , _id , BSON( "$set" << finalLockDetails.obj() ) ); + + BSONObj err = conn->getLastErrorDetailed(); + string errMsg = DBClientWithCommands::getLastErrorString(err); + + currLock = conn->findOne( locksNS , _id ); - log(2) << "dist_lock lock gotLock: " << gotLock << " now: " << now << endl; + if ( !errMsg.empty() || !err["n"].type() || err["n"].numberInt() < 1 ) { + warning() << "could not finalize winning lock " << lockName + << ( !errMsg.empty() ? causedBy( errMsg ) : " (did not update lock) " ) << endl; + gotLock = false; + } + else { + // SUCCESS! + gotLock = true; + } + + } + catch( std::exception& e ) { + conn.done(); + + // Register the bad final lock for deletion, in case it exists + distLockPinger.addUnlockOID( lockDetails["ts"].OID() ); + + throw LockException( str::stream() << "exception finalizing winning lock" + << causedBy( e ), 13662 ); + } + + } + + *other = currLock; + other->getOwned(); + + // Log our lock results + if(gotLock) + log( logLvl - 1 ) << "distributed lock '" << lockName << "' acquired, ts : " << currLock["ts"].OID() << endl; + else + log( logLvl - 1 ) << "distributed lock '" << lockName << "' was not acquired." << endl; + + conn.done(); return gotLock; } - void DistributedLock::unlock() { + // Unlock now takes an optional pointer to the lock, so you can be specific about which + // particular lock you want to unlock. This is required when the config server is down, + // and so cannot tell you what lock ts you should try later. + void DistributedLock::unlock( BSONObj* oldLockPtr ) { + + assert( _name != "" ); + + string lockName = _name + string("/") + _processId; + const int maxAttempts = 3; int attempted = 0; + + BSONObj oldLock; + if( oldLockPtr ) oldLock = *oldLockPtr; + while ( ++attempted <= maxAttempts ) { + ScopedDbConnection conn( _conn ); + try { - ScopedDbConnection conn( _conn ); - conn->update( _ns , _id, BSON( "$set" << BSON( "state" << 0 ) ) ); - log(2) << "dist_lock unlock: " << conn->findOne( _ns , _id ) << endl; - conn.done(); - return; + if( oldLock.isEmpty() ) + oldLock = conn->findOne( locksNS, _id ); + + if( oldLock["state"].eoo() || oldLock["state"].numberInt() != 2 || oldLock["ts"].eoo() ) { + warning() << "cannot unlock invalid distributed lock " << oldLock << endl; + conn.done(); + break; + } + // Use ts when updating lock, so that new locks can be sure they won't get trampled. + conn->update( locksNS , + BSON( "_id" << _id["_id"].String() << "ts" << oldLock["ts"].OID() ), + BSON( "$set" << BSON( "state" << 0 ) ) ); + // Check that the lock was actually unlocked... if not, try again + BSONObj err = conn->getLastErrorDetailed(); + string errMsg = DBClientWithCommands::getLastErrorString(err); + + if ( !errMsg.empty() || !err["n"].type() || err["n"].numberInt() < 1 ){ + warning() << "distributed lock unlock update failed, retrying " + << ( errMsg.empty() ? 
causedBy( "( update not registered )" ) : causedBy( errMsg ) ) << endl; + conn.done(); + continue; + } + + log( logLvl - 1 ) << "distributed lock '" << lockName << "' unlocked. " << endl; + conn.done(); + return; + } + catch( UpdateNotTheSame& ) { + log( logLvl - 1 ) << "distributed lock '" << lockName << "' unlocked (messily). " << endl; + conn.done(); + break; } catch ( std::exception& e) { - log( LL_WARNING ) << "dist_lock " << _name << " failed to contact config server in unlock attempt " - << attempted << ": " << e.what() << endl; + warning() << "distributed lock '" << lockName << "' failed unlock attempt." + << causedBy( e ) << endl; - sleepsecs(1 << attempted); + conn.done(); + // TODO: If our lock timeout is small, sleeping this long may be unsafe. + if( attempted != maxAttempts) sleepsecs(1 << attempted); } } - log( LL_WARNING ) << "dist_lock couldn't consumate unlock request. " << "Lock " << _name - << " will be taken over after " << _takeoverMinutes << " minutes timeout" << endl; + if( attempted > maxAttempts && ! oldLock.isEmpty() && ! oldLock["ts"].eoo() ) { + + log( logLvl - 1 ) << "could not unlock distributed lock with ts " << oldLock["ts"].OID() + << ", will attempt again later" << endl; + + // We couldn't unlock the lock at all, so try again later in the pinging thread... + distLockPinger.addUnlockOID( oldLock["ts"].OID() ); + } + else if( attempted > maxAttempts ) { + warning() << "could not unlock untracked distributed lock, a manual force may be required" << endl; + } + + warning() << "distributed lock '" << lockName << "' couldn't consummate unlock request. " + << "lock may be taken over after " << ( _lockTimeout / (60 * 1000) ) + << " minutes timeout." << endl; } + + } diff --git a/client/distlock.h b/client/distlock.h index 753a241..8985672 100644 --- a/client/distlock.h +++ b/client/distlock.h @@ -23,9 +23,42 @@ #include "redef_macros.h" #include "syncclusterconnection.h" +#define LOCK_TIMEOUT (15 * 60 * 1000) +#define LOCK_SKEW_FACTOR (30) +#define LOCK_PING (LOCK_TIMEOUT / LOCK_SKEW_FACTOR) +#define MAX_LOCK_NET_SKEW (LOCK_TIMEOUT / LOCK_SKEW_FACTOR) +#define MAX_LOCK_CLOCK_SKEW (LOCK_TIMEOUT / LOCK_SKEW_FACTOR) +#define NUM_LOCK_SKEW_CHECKS (3) + +// The maximum clock skew we need to handle between config servers is +// 2 * MAX_LOCK_NET_SKEW + MAX_LOCK_CLOCK_SKEW. + +// Net effect of *this* clock being slow is effectively a multiplier on the max net skew +// and a linear increase or decrease of the max clock skew. + namespace mongo { /** + * Exception class to encapsulate exceptions while managing distributed locks + */ + class LockException : public DBException { + public: + LockException( const char * msg , int code ) : DBException( msg, code ) {} + LockException( const string& msg, int code ) : DBException( msg, code ) {} + virtual ~LockException() throw() { } + }; + + /** + * Indicates an error in retrieving time values from remote servers. + */ + class TimeNotFoundException : public LockException { + public: + TimeNotFoundException( const char * msg , int code ) : LockException( msg, code ) {} + TimeNotFoundException( const string& msg, int code ) : LockException( msg, code ) {} + virtual ~TimeNotFoundException() throw() { } + }; + + /** * The distributed lock is a configdb backed way of synchronizing system-wide tasks. A task must be identified by a * unique name across the system (e.g., "balancer"). A lock is taken by writing a document in the configdb's locks * collection with that name. 
@@ -36,53 +69,155 @@ namespace mongo { class DistributedLock { public: + static LabeledLevel logLvl; + /** * The constructor does not connect to the configdb yet and constructing does not mean the lock was acquired. * Construction does trigger a lock "pinging" mechanism, though. * * @param conn address of config(s) server(s) * @param name identifier for the lock
- * @param takeoverMinutes how long can the log go "unpinged" before a new attempt to lock steals it (in minutes)
+ * @param lockTimeout how long the lock can go "unpinged" before a new attempt to lock steals it (in ms)
+ * @param asProcess whether the lock holder should be identified by a unique per-instance id rather than the shared per-process id
+ *
*/
- DistributedLock( const ConnectionString& conn , const string& name , unsigned takeoverMinutes = 15 );
+ DistributedLock( const ConnectionString& conn , const string& name , unsigned long long lockTimeout = 0, bool asProcess = false );
+ ~DistributedLock(){};
/**
- * Attempts to aquire 'this' lock, checking if it could or should be stolen from the previous holder. Please
+ * Attempts to acquire 'this' lock, checking if it could or should be stolen from the previous holder. Please
* consider using the dist_lock_try construct to acquire this lock in an exception safe way.
*
* @param why human readable description of why the lock is being taken (used to log)
- * @param other configdb's lock document that is currently holding the lock, if lock is taken
+ * @param reenter whether this is a lock re-entry or a new lock
+ * @param other configdb's lock document that is currently holding the lock, if lock is taken, or our own lock
+ * details if not
* @return true if it managed to grab the lock */
- bool lock_try( string why , BSONObj * other = 0 );
+ bool lock_try( const string& why , bool reenter = false, BSONObj * other = 0 );
/** * Releases a previously taken lock.
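 *
 * An illustrative sketch (the lock object and reason string are
 * placeholders): pass back the lock document obtained from lock_try() so
 * that exactly that acquisition is released:
 *
 *     BSONObj lockDoc;
 *     if ( myLock.lock_try( "migrating chunk" , false , &lockDoc ) ) {
 *         // ... critical section ...
 *         myLock.unlock( &lockDoc );
 *     }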
*/ - void unlock(); + void unlock( BSONObj* oldLockPtr = NULL ); + + Date_t getRemoteTime(); + + bool isRemoteTimeSkewed(); + + const string& getProcessId(); + + const ConnectionString& getRemoteConnection(); + + /** + * Check the skew between a cluster of servers + */ + static bool checkSkew( const ConnectionString& cluster, unsigned skewChecks = NUM_LOCK_SKEW_CHECKS, unsigned long long maxClockSkew = MAX_LOCK_CLOCK_SKEW, unsigned long long maxNetSkew = MAX_LOCK_NET_SKEW ); + + /** + * Get the remote time from a server or cluster + */ + static Date_t remoteTime( const ConnectionString& cluster, unsigned long long maxNetSkew = MAX_LOCK_NET_SKEW ); + + static bool killPinger( DistributedLock& lock ); + + /** + * Namespace for lock pings + */ + static const string lockPingNS; + + /** + * Namespace for locks + */ + static const string locksNS; + + const ConnectionString _conn; + const string _name; + const BSONObj _id; + const string _processId; + + // Timeout for lock, usually LOCK_TIMEOUT + const unsigned long long _lockTimeout; + const unsigned long long _maxClockSkew; + const unsigned long long _maxNetSkew; + const unsigned long long _lockPing; private: - ConnectionString _conn; - string _name; - unsigned _takeoverMinutes; - string _ns; - BSONObj _id; + void resetLastPing(){ + scoped_lock lk( _mutex ); + _lastPingCheck = boost::tuple<string, Date_t, Date_t, OID>(); + } + + mongo::mutex _mutex; + + // Data from last check of process with ping time + boost::tuple<string, Date_t, Date_t, OID> _lastPingCheck; + // May or may not exist, depending on startup + string _threadId; + }; class dist_lock_try { public: + + dist_lock_try() : _lock(NULL), _got(false) {} + + dist_lock_try( const dist_lock_try& that ) : _lock(that._lock), _got(that._got), _other(that._other) { + _other.getOwned(); + + // Make sure the lock ownership passes to this object, + // so we only unlock once. + ((dist_lock_try&) that)._got = false; + ((dist_lock_try&) that)._lock = NULL; + ((dist_lock_try&) that)._other = BSONObj(); + } + + // Needed so we can handle lock exceptions in context of lock try. + dist_lock_try& operator=( const dist_lock_try& that ){ + + if( this == &that ) return *this; + + _lock = that._lock; + _got = that._got; + _other = that._other; + _other.getOwned(); + _why = that._why; + + // Make sure the lock ownership passes to this object, + // so we only unlock once. + ((dist_lock_try&) that)._got = false; + ((dist_lock_try&) that)._lock = NULL; + ((dist_lock_try&) that)._other = BSONObj(); + + return *this; + } + dist_lock_try( DistributedLock * lock , string why ) - : _lock(lock) { - _got = _lock->lock_try( why , &_other ); + : _lock(lock), _why(why) { + _got = _lock->lock_try( why , false , &_other ); } ~dist_lock_try() { if ( _got ) { - _lock->unlock(); + assert( ! _other.isEmpty() ); + _lock->unlock( &_other ); } } + bool reestablish(){ + return retry(); + } + + bool retry() { + assert( _lock ); + assert( _got ); + assert( ! _other.isEmpty() ); + + return _got = _lock->lock_try( _why , true, &_other ); + } + bool got() const { return _got; } BSONObj other() const { return _other; } @@ -90,6 +225,7 @@ namespace mongo { DistributedLock * _lock; bool _got; BSONObj _other; + string _why; }; } diff --git a/client/distlock_test.cpp b/client/distlock_test.cpp index 83d143f..42a1c48 100644 --- a/client/distlock_test.cpp +++ b/client/distlock_test.cpp @@ -15,85 +15,123 @@ * limitations under the License. 
diff --git a/client/distlock_test.cpp b/client/distlock_test.cpp index 83d143f..42a1c48 100644 --- a/client/distlock_test.cpp +++ b/client/distlock_test.cpp @@ -15,85 +15,123 @@ * limitations under the License. */ +#include <iostream> #include "../pch.h" #include "dbclient.h" #include "distlock.h" #include "../db/commands.h" +#include "../util/bson_util.h" + +// Modify some config options for the RNG, since they cause MSVC to fail +#include <boost/config.hpp> + +#if defined(BOOST_MSVC) && defined(BOOST_NO_MEMBER_TEMPLATE_FRIENDS) +#undef BOOST_NO_MEMBER_TEMPLATE_FRIENDS +#define BOOST_RNG_HACK +#endif + +// Well, sort-of cross-platform RNG +#include <boost/random/mersenne_twister.hpp> + +#ifdef BOOST_RNG_HACK +#define BOOST_NO_MEMBER_TEMPLATE_FRIENDS +#undef BOOST_RNG_HACK +#endif + + +#include <boost/random/uniform_int.hpp> +#include <boost/random/variate_generator.hpp> + + +// TODO: Make a method in BSONObj if useful, don't modify for now +#define string_field(obj, name, def) ( obj.hasField(name) ? obj[name].String() : def ) +#define number_field(obj, name, def) ( obj.hasField(name) ? obj[name].Number() : def ) namespace mongo { - class TestDistLockWithSync : public Command { + class TestDistLockWithSync: public Command { public: - TestDistLockWithSync() : Command( "_testDistLockWithSyncCluster" ) {} - virtual void help( stringstream& help ) const { + TestDistLockWithSync() : + Command("_testDistLockWithSyncCluster") { + } + virtual void help(stringstream& help) const { help << "should not be calling this directly" << endl; } - virtual bool slaveOk() const { return false; } - virtual bool adminOnly() const { return true; } - virtual LockType locktype() const { return NONE; } + virtual bool slaveOk() const { + return false; + } + virtual bool adminOnly() const { + return true; + } + virtual LockType locktype() const { + return NONE; + } static void runThread() { - while ( keepGoing ) { - if ( current->lock_try( "test" ) ) { + while (keepGoing) { + if (current->lock_try( "test" )) { count++; int before = count; - sleepmillis( 3 ); + sleepmillis(3); int after = count; - - if ( after != before ) { - error() << " before: " << before << " after: " << after << endl; + + if (after != before) { + error() << " before: " << before << " after: " << after + << endl; } - + current->unlock(); } } } - - bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool) { + + bool run(const string&, BSONObj& cmdObj, int, string& errmsg, + BSONObjBuilder& result, bool) { Timer t; - DistributedLock lk( ConnectionString( cmdObj["host"].String() , ConnectionString::SYNC ), "testdistlockwithsync" ); + DistributedLock lk(ConnectionString(cmdObj["host"].String(), + ConnectionString::SYNC), "testdistlockwithsync", 0, 0); current = &lk; count = 0; gotit = 0; errors = 0; keepGoing = true; + vector<shared_ptr<boost::thread> > l; - for ( int i=0; i<4; i++ ) { - l.push_back( shared_ptr<boost::thread>( new boost::thread( runThread ) ) ); + for (int i = 0; i < 4; i++) { + l.push_back( + shared_ptr<boost::thread> (new boost::thread(runThread))); } - + int secs = 10; - if ( cmdObj["secs"].isNumber() ) + if (cmdObj["secs"].isNumber()) secs = cmdObj["secs"].numberInt(); - sleepsecs( secs ); + sleepsecs(secs); keepGoing = false; - for ( unsigned i=0; i<l.size(); i++ ) + for (unsigned i = 0; i < l.size(); i++) l[i]->join(); current = 0; - result.append( "count" , count ); - result.append( "gotit" , gotit ); - result.append( "errors" , errors ); - result.append( "timeMS" , t.millis() ); + result.append("count", count); + result.append("gotit", gotit); + result.append("errors", errors); + result.append("timeMS", t.millis()); return errors == 0; } - + // variables for test static DistributedLock * current; 
static int gotit; static int errors; static AtomicUInt count; - + static bool keepGoing; } testDistLockWithSyncCmd; - DistributedLock * TestDistLockWithSync::current; AtomicUInt TestDistLockWithSync::count; int TestDistLockWithSync::gotit; @@ -101,4 +139,300 @@ namespace mongo { bool TestDistLockWithSync::keepGoing; + + class TestDistLockWithSkew: public Command { + public: + + static const int logLvl = 1; + + TestDistLockWithSkew() : + Command("_testDistLockWithSkew") { + } + virtual void help(stringstream& help) const { + help << "should not be calling this directly" << endl; + } + + virtual bool slaveOk() const { + return false; + } + virtual bool adminOnly() const { + return true; + } + virtual LockType locktype() const { + return NONE; + } + + void runThread(ConnectionString& hostConn, unsigned threadId, unsigned seed, + BSONObj& cmdObj, BSONObjBuilder& result) { + + stringstream ss; + ss << "thread-" << threadId; + setThreadName(ss.str().c_str()); + + // Lock name + string lockName = string_field(cmdObj, "lockName", this->name + "_lock"); + + // Range of clock skew in diff threads + int skewRange = (int) number_field(cmdObj, "skewRange", 1); + + // How long to wait with the lock + int threadWait = (int) number_field(cmdObj, "threadWait", 30); + if(threadWait <= 0) threadWait = 1; + + // Max amount of time (ms) a thread waits before checking the lock again + int threadSleep = (int) number_field(cmdObj, "threadSleep", 30); + if(threadSleep <= 0) threadSleep = 1; + + // How long until the lock is forced in ms, only compared locally + unsigned long long takeoverMS = (unsigned long long) number_field(cmdObj, "takeoverMS", 0); + + // Whether or not we should hang some threads + int hangThreads = (int) number_field(cmdObj, "hangThreads", 0); + + + boost::mt19937 gen((boost::mt19937::result_type) seed); + + boost::variate_generator<boost::mt19937&, boost::uniform_int<> > randomSkew(gen, boost::uniform_int<>(0, skewRange)); + boost::variate_generator<boost::mt19937&, boost::uniform_int<> > randomWait(gen, boost::uniform_int<>(1, threadWait)); + boost::variate_generator<boost::mt19937&, boost::uniform_int<> > randomSleep(gen, boost::uniform_int<>(1, threadSleep)); + + + int skew = 0; + if (!lock.get()) { + + // Pick a skew, but the first two threads skew the whole range + if(threadId == 0) + skew = -skewRange / 2; + else if(threadId == 1) + skew = skewRange / 2; + else skew = randomSkew() - (skewRange / 2); + + // Skew this thread + jsTimeVirtualThreadSkew( skew ); + + log() << "Initializing lock with skew of " << skew << " for thread " << threadId << endl; + + lock.reset(new DistributedLock(hostConn, lockName, takeoverMS, true )); + + log() << "Skewed time " << jsTime() << " for thread " << threadId << endl + << " max wait (with lock: " << threadWait << ", after lock: " << threadSleep << ")" << endl + << " takeover in " << takeoverMS << "(ms remote)" << endl; + + } + + DistributedLock* myLock = lock.get(); + + bool errors = false; + BSONObj lockObj; + while (keepGoing) { + try { + + if (myLock->lock_try("Testing distributed lock with skew.", false, &lockObj )) { + + log() << "**** Locked for thread " << threadId << " with ts " << lockObj["ts"] << endl; + + if( count % 2 == 1 && ! 
myLock->lock_try( "Testing lock re-entry.", true ) ) { + errors = true; + log() << "**** !Could not re-enter lock already held" << endl; + break; + } + + if( count % 3 == 1 && myLock->lock_try( "Testing lock non-re-entry.", false ) ) { + errors = true; + log() << "**** !Invalid lock re-entry" << endl; + break; + } + + count++; + int before = count; + int sleep = randomWait(); + sleepmillis(sleep); + int after = count; + + if(after != before) { + errors = true; + log() << "**** !Bad increment while sleeping with lock for: " << sleep << "ms" << endl; + break; + } + + // Unlock only half the time... + if(hangThreads == 0 || threadId % hangThreads != 0) { + log() << "**** Unlocking for thread " << threadId << " with ts " << lockObj["ts"] << endl; + myLock->unlock( &lockObj ); + } + else { + log() << "**** Not unlocking for thread " << threadId << endl; + DistributedLock::killPinger( *myLock ); + // We're simulating a crashed process... + break; + } + } + + } + catch( LockException& e ) { + log() << "*** !Could not try distributed lock." << causedBy( e ) << endl; + break; + } + + sleepmillis(randomSleep()); + } + + result << "errors" << errors + << "skew" << skew + << "takeover" << (long long) takeoverMS + << "localTimeout" << (takeoverMS > 0); + + } + + void test(ConnectionString& hostConn, string& lockName, unsigned seed) { + return; + } + + bool run(const string&, BSONObj& cmdObj, int, string& errmsg, + BSONObjBuilder& result, bool) { + + Timer t; + + ConnectionString hostConn(cmdObj["host"].String(), + ConnectionString::SYNC); + + unsigned seed = (unsigned) number_field(cmdObj, "seed", 0); + int numThreads = (int) number_field(cmdObj, "numThreads", 4); + int wait = (int) number_field(cmdObj, "wait", 10000); + + log() << "Starting " << this->name << " with -" << endl + << " seed: " << seed << endl + << " numThreads: " << numThreads << endl + << " total wait: " << wait << endl << endl; + + // Skew host clocks if needed + try { + skewClocks( hostConn, cmdObj ); + } + catch( DBException e ) { + errmsg = str::stream() << "Clocks could not be skewed." << causedBy( e ); + return false; + } + + count = 0; + keepGoing = true; + + vector<shared_ptr<boost::thread> > threads; + vector<shared_ptr<BSONObjBuilder> > results; + for (int i = 0; i < numThreads; i++) { + results.push_back(shared_ptr<BSONObjBuilder> (new BSONObjBuilder())); + threads.push_back(shared_ptr<boost::thread> (new boost::thread( + boost::bind(&TestDistLockWithSkew::runThread, this, + hostConn, (unsigned) i, seed + i, boost::ref(cmdObj), + boost::ref(*(results[i].get())))))); + } + + sleepsecs(wait / 1000); + keepGoing = false; + + bool errors = false; + for (unsigned i = 0; i < threads.size(); i++) { + threads[i]->join(); + errors = errors || results[i].get()->obj()["errors"].Bool(); + } + + result.append("count", count); + result.append("errors", errors); + result.append("timeMS", t.millis()); + + return !errors; + + } + + /** + * Skews the clocks of a remote cluster by a particular amount, specified by + * the "skewHosts" element in a BSONObj. + */ + static void skewClocks( ConnectionString& cluster, BSONObj& cmdObj ) { + + vector<long long> skew; + if(cmdObj.hasField("skewHosts")) { + bsonArrToNumVector<long long>(cmdObj["skewHosts"], skew); + } + else { + log( logLvl ) << "No host clocks to skew." 
<< endl; + return; + } + + log( logLvl ) << "Skewing clocks of hosts " << cluster << endl; + + unsigned s = 0; + for(vector<long long>::iterator i = skew.begin(); i != skew.end(); ++i,s++) { + + ConnectionString server( cluster.getServers()[s] ); + ScopedDbConnection conn( server ); + + BSONObj result; + try { + bool success = conn->runCommand( string("admin"), BSON( "_skewClockCommand" << 1 << "skew" << *i ), result ); + + uassert(13678, str::stream() << "Could not communicate with server " << server.toString() << " in cluster " << cluster.toString() << " to change skew by " << *i, success ); + + log( logLvl + 1 ) << " Skewed host " << server << " clock by " << *i << endl; + } + catch(...) { + conn.done(); + throw; + } + + conn.done(); + + } + + } + + // variables for test + thread_specific_ptr<DistributedLock> lock; + AtomicUInt count; + bool keepGoing; + + } testDistLockWithSkewCmd; + + + /** + * Utility command to virtually skew the clock of a mongo server a particular amount. + * This skews the clock globally, per-thread skew is also possible. + */ + class SkewClockCommand: public Command { + public: + SkewClockCommand() : + Command("_skewClockCommand") { + } + virtual void help(stringstream& help) const { + help << "should not be calling this directly" << endl; + } + + virtual bool slaveOk() const { + return false; + } + virtual bool adminOnly() const { + return true; + } + virtual LockType locktype() const { + return NONE; + } + + bool run(const string&, BSONObj& cmdObj, int, string& errmsg, + BSONObjBuilder& result, bool) { + + long long skew = (long long) number_field(cmdObj, "skew", 0); + + log() << "Adjusting jsTime() clock skew to " << skew << endl; + + jsTimeVirtualSkew( skew ); + + log() << "JSTime adjusted, now is " << jsTime() << endl; + + return true; + + } + + } testSkewClockCommand; + } + diff --git a/client/examples/clientTest.cpp b/client/examples/clientTest.cpp index 96c014e..aaea6bd 100644 --- a/client/examples/clientTest.cpp +++ b/client/examples/clientTest.cpp @@ -246,5 +246,34 @@ int main( int argc, const char **argv ) { //MONGO_PRINT(out); } + { + // test timeouts + + DBClientConnection conn( true , 0 , 2 ); + if ( ! conn.connect( string( "127.0.0.1:" ) + port , errmsg ) ) { + cout << "couldn't connect : " << errmsg << endl; + throw -11; + } + conn.insert( "test.totest" , BSON( "x" << 1 ) ); + BSONObj res; + + bool gotError = false; + assert( conn.eval( "test" , "return db.totest.findOne().x" , res ) ); + try { + conn.eval( "test" , "sleep(5000); return db.totest.findOne().x" , res ); + } + catch ( std::exception& e ) { + gotError = true; + log() << e.what() << endl; + } + assert( gotError ); + // sleep so the server isn't locked anymore + sleepsecs( 4 ); + + assert( conn.eval( "test" , "return db.totest.findOne().x" , res ) ); + + + } + cout << "client test finished!" 
<< endl; } diff --git a/client/examples/httpClientTest.cpp b/client/examples/httpClientTest.cpp index 4fa5fd8..4055d44 100644 --- a/client/examples/httpClientTest.cpp +++ b/client/examples/httpClientTest.cpp @@ -18,10 +18,27 @@ #include <iostream> #include "client/dbclient.h" -#include "util/httpclient.h" +#include "util/net/httpclient.h" using namespace mongo; +void play( string url ) { + cout << "[" << url << "]" << endl; + + HttpClient c; + HttpClient::Result r; + MONGO_assert( c.get( url , &r ) == 200 ); + + HttpClient::Headers h = r.getHeaders(); + MONGO_assert( h["Content-Type"].find( "text/html" ) == 0 ); + + cout << "\tHeaders" << endl; + for ( HttpClient::Headers::iterator i = h.begin() ; i != h.end(); ++i ) { + cout << "\t\t" << i->first << "\t" << i->second << endl; + } + +} + int main( int argc, const char **argv ) { int port = 27017; @@ -32,12 +49,10 @@ int main( int argc, const char **argv ) { } port += 1000; - stringstream ss; - ss << "http://localhost:" << port << "/"; - string url = ss.str(); - - cout << "[" << url << "]" << endl; - - HttpClient c; - MONGO_assert( c.get( url ) == 200 ); + play( str::stream() << "http://localhost:" << port << "/" ); + +#ifdef MONGO_SSL + play( "https://www.10gen.com/" ); +#endif + } diff --git a/client/examples/insert_demo.cpp b/client/examples/insert_demo.cpp new file mode 100644 index 0000000..14ac79e --- /dev/null +++ b/client/examples/insert_demo.cpp @@ -0,0 +1,47 @@ +/* + C++ client program which inserts documents in a MongoDB database. + + How to build and run: + + Using mongo_client_lib.cpp: + g++ -I .. -I ../.. insert_demo.cpp ../mongo_client_lib.cpp -lboost_thread-mt -lboost_filesystem + ./a.out +*/ + +#include <iostream> +#include "dbclient.h" // the mongo c++ driver + +using namespace std; +using namespace mongo; +using namespace bson; + +int main() { + try { + cout << "connecting to localhost..." << endl; + DBClientConnection c; + c.connect("localhost"); + cout << "connected ok" << endl; + + bo o = BSON( "hello" << "world" ); + + cout << "inserting..." 
<< endl; + + time_t start = time(0); + for( unsigned i = 0; i < 1000000; i++ ) { + c.insert("test.foo", o); + } + + // wait until all operations applied + cout << "getlasterror returns: \"" << c.getLastError() << '"' << endl; + + time_t done = time(0); + time_t dt = done-start; + cout << dt << " seconds " << 1000000/dt << " per second" << endl; + } + catch(DBException& e) { + cout << "caught DBException " << e.toString() << endl; + return 1; + } + + return 0; +}
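insert_demo.cpp above issues a million fire-and-forget inserts and confirms them with a single getLastError() round-trip at the end. A sketch of a variant that confirms every batch instead, so a failure surfaces after at most batchSize unacknowledged writes; the batch size and namespace are illustrative:

    #include <iostream>
    #include "dbclient.h"

    using namespace std;
    using namespace mongo;

    // Insert n documents, confirming each batch with getLastError() so a
    // failure is noticed after at most batchSize unacknowledged writes.
    bool insertChecked( DBClientConnection& c , unsigned n , unsigned batchSize ) {
        for ( unsigned i = 0; i < n; i++ ) {
            c.insert( "test.foo" , BSON( "i" << (int)i ) );
            if ( i % batchSize == batchSize - 1 ) {
                string err = c.getLastError();
                if ( ! err.empty() ) {
                    cout << "insert failed near i=" << i << " : " << err << endl;
                    return false;
                }
            }
        }
        return c.getLastError().empty(); // confirm the final partial batch
    }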
diff --git a/client/examples/rs.cpp b/client/examples/rs.cpp index 7813ec6..3307d87 100644 --- a/client/examples/rs.cpp +++ b/client/examples/rs.cpp @@ -21,11 +21,62 @@ #include "client/dbclient.h" #include <iostream> +#include <vector> using namespace mongo; using namespace std; +void workerThread( string collName , bool print , DBClientReplicaSet * conn ) { + + while ( true ) { + try { + conn->update( collName , BSONObj() , BSON( "$inc" << BSON( "x" << 1 ) ) , true ); + + BSONObj x = conn->findOne( collName , BSONObj() ); + + if ( print ) { + cout << x << endl; + } + + BSONObj a = conn->slaveConn().findOne( collName , BSONObj() , 0 , QueryOption_SlaveOk ); + BSONObj b = conn->findOne( collName , BSONObj() , 0 , QueryOption_SlaveOk ); + + if ( print ) { + cout << "\t A " << a << endl; + cout << "\t B " << b << endl; + } + } + catch ( std::exception& e ) { + cout << "ERROR: " << e.what() << endl; + } + sleepmillis( 10 ); + } +} + int main( int argc , const char ** argv ) { + + unsigned nThreads = 1; + bool print = false; + bool testTimeout = false; + + for ( int i=1; i<argc; i++ ) { + if ( mongoutils::str::equals( "--threads" , argv[i] ) ) { + nThreads = atoi( argv[++i] ); + } + else if ( mongoutils::str::equals( "--print" , argv[i] ) ) { + print = true; + } + // Run a special mode to demonstrate the DBClientReplicaSet so_timeout option. + else if ( mongoutils::str::equals( "--testTimeout" , argv[i] ) ) { + testTimeout = true; + } + else { + cerr << "unknown option: " << argv[i] << endl; + return 1; + } + + } + string errmsg; ConnectionString cs = ConnectionString::parse( "foo/127.0.0.1" , errmsg ); if ( ! cs.isValid() ) { @@ -33,7 +84,7 @@ int main( int argc , const char ** argv ) { return 1; } - DBClientReplicaSet * conn = (DBClientReplicaSet*)cs.connect( errmsg ); + DBClientReplicaSet * conn = dynamic_cast<DBClientReplicaSet*>(cs.connect( errmsg, testTimeout ? 10 : 0 )); if ( ! conn ) { cout << "error connecting: " << errmsg << endl; return 2; @@ -42,17 +93,26 @@ string collName = "test.rs1"; conn->dropCollection( collName ); - while ( true ) { + + if ( testTimeout ) { + conn->insert( collName, BSONObj() ); try { - conn->update( collName , BSONObj() , BSON( "$inc" << BSON( "x" << 1 ) ) , true ); - cout << conn->findOne( collName , BSONObj() ) << endl; - cout << "\t A" << conn->slaveConn().findOne( collName , BSONObj() , 0 , QueryOption_SlaveOk ) << endl; - cout << "\t B " << conn->findOne( collName , BSONObj() , 0 , QueryOption_SlaveOk ) << endl; - } - catch ( std::exception& e ) { - cout << "ERROR: " << e.what() << endl; + conn->count( collName, BSON( "$where" << "sleep(40000)" ) ); + } catch( DBException& ) { + return 0; } - sleepsecs( 1 ); + cout << "expected socket exception" << endl; + return 1; + } + + vector<boost::shared_ptr<boost::thread> > threads; + for ( unsigned i=0; i<nThreads; i++ ) { + string errmsg; + threads.push_back( boost::shared_ptr<boost::thread>( new boost::thread( boost::bind( workerThread , collName , print , (DBClientReplicaSet*)cs.connect(errmsg) ) ) ) ); + } + + for ( unsigned i=0; i<threads.size(); i++ ) { + threads[i]->join(); } } diff --git a/client/examples/simple_client_demo.vcxproj b/client/examples/simple_client_demo.vcxproj new file mode 100755 index 0000000..4658a42 --- /dev/null +++ b/client/examples/simple_client_demo.vcxproj @@ -0,0 +1,92 @@ +<?xml version="1.0" encoding="utf-8"?>
+<Project DefaultTargets="Build" ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+  <ItemGroup Label="ProjectConfigurations">
+    <ProjectConfiguration Include="Debug|Win32">
+      <Configuration>Debug</Configuration>
+      <Platform>Win32</Platform>
+    </ProjectConfiguration>
+    <ProjectConfiguration Include="Release|Win32">
+      <Configuration>Release</Configuration>
+      <Platform>Win32</Platform>
+    </ProjectConfiguration>
+  </ItemGroup>
+  <PropertyGroup Label="Globals">
+    <ProjectGuid>{89C30BC3-2874-4F2C-B4DA-EB04E9782236}</ProjectGuid>
+    <Keyword>Win32Proj</Keyword>
+    <RootNamespace>simple_client_demo</RootNamespace>
+  </PropertyGroup>
+  <Import Project="$(VCTargetsPath)\Microsoft.Cpp.Default.props" />
+  <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'" Label="Configuration">
+    <ConfigurationType>Application</ConfigurationType>
+    <UseDebugLibraries>true</UseDebugLibraries>
+    <CharacterSet>Unicode</CharacterSet>
+  </PropertyGroup>
+  <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'" Label="Configuration">
+    <ConfigurationType>Application</ConfigurationType>
+    <UseDebugLibraries>false</UseDebugLibraries>
+    <WholeProgramOptimization>true</WholeProgramOptimization>
+    <CharacterSet>Unicode</CharacterSet>
+  </PropertyGroup>
+  <Import Project="$(VCTargetsPath)\Microsoft.Cpp.props" />
+  <ImportGroup Label="ExtensionSettings">
+  </ImportGroup>
+  <ImportGroup Label="PropertySheets" Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">
+    <Import Project="$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props')" Label="LocalAppDataPlatform" />
+  </ImportGroup>
+  <ImportGroup Label="PropertySheets" Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">
+    <Import Project="$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props')" Label="LocalAppDataPlatform" />
+  </ImportGroup>
+  <PropertyGroup Label="UserMacros" />
+  <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">
+    <LinkIncremental>true</LinkIncremental>
+    <IncludePath>..\..;..\..\pcre-7.4;$(IncludePath)</IncludePath>
+    <LibraryPath>\boost\lib\vs2010_32;$(LibraryPath)</LibraryPath>
+  </PropertyGroup>
+  <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">
+    <LinkIncremental>false</LinkIncremental>
+    <IncludePath>..\..;..\..\pcre-7.4;$(IncludePath)</IncludePath>
+    <LibraryPath>\boost\lib\vs2010_32;$(LibraryPath)</LibraryPath>
+  </PropertyGroup>
+  <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">
+    <ClCompile>
+      <PrecompiledHeader>
+      </PrecompiledHeader>
+      <WarningLevel>Level3</WarningLevel>
+      <Optimization>Disabled</Optimization>
+      <PreprocessorDefinitions> _CRT_SECURE_NO_WARNINGS;_DEBUG;_CONSOLE;%(PreprocessorDefinitions)</PreprocessorDefinitions>
+      <AdditionalIncludeDirectories>c:\boost;\boost</AdditionalIncludeDirectories>
+    </ClCompile>
+    <Link>
+      <SubSystem>Console</SubSystem>
+      <GenerateDebugInformation>true</GenerateDebugInformation>
+      <AdditionalDependencies>ws2_32.lib;kernel32.lib;user32.lib;gdi32.lib;winspool.lib;comdlg32.lib;advapi32.lib;shell32.lib;ole32.lib;oleaut32.lib;uuid.lib;odbc32.lib;odbccp32.lib;%(AdditionalDependencies)</AdditionalDependencies>
+    </Link>
+  </ItemDefinitionGroup>
+  <ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">
+    <ClCompile>
+      <WarningLevel>Level3</WarningLevel>
+      <PrecompiledHeader>
+      </PrecompiledHeader>
+      <Optimization>MaxSpeed</Optimization>
+      <FunctionLevelLinking>true</FunctionLevelLinking>
+      <IntrinsicFunctions>true</IntrinsicFunctions>
+      <PreprocessorDefinitions> _CRT_SECURE_NO_WARNINGS;NDEBUG;_CONSOLE;%(PreprocessorDefinitions)</PreprocessorDefinitions>
+      <RuntimeLibrary>MultiThreaded</RuntimeLibrary>
+      <AdditionalIncludeDirectories>c:\boost;\boost</AdditionalIncludeDirectories>
+    </ClCompile>
+    <Link>
+      <SubSystem>Console</SubSystem>
+      <GenerateDebugInformation>true</GenerateDebugInformation>
+      <EnableCOMDATFolding>true</EnableCOMDATFolding>
+      <OptimizeReferences>true</OptimizeReferences>
+      <AdditionalDependencies>ws2_32.lib;kernel32.lib;user32.lib;gdi32.lib;winspool.lib;comdlg32.lib;advapi32.lib;shell32.lib;ole32.lib;oleaut32.lib;uuid.lib;odbc32.lib;odbccp32.lib;%(AdditionalDependencies)</AdditionalDependencies>
+    </Link>
+  </ItemDefinitionGroup>
+  <ItemGroup>
+    <ClCompile Include="..\mongo_client_lib.cpp" />
+    <ClCompile Include="..\simple_client_demo.cpp" />
+  </ItemGroup>
+  <Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" />
+  <ImportGroup Label="ExtensionTargets">
+  </ImportGroup>
+</Project>
\ No newline at end of file diff --git a/client/examples/simple_client_demo.vcxproj.filters b/client/examples/simple_client_demo.vcxproj.filters new file mode 100755 index 0000000..d6580c3 --- /dev/null +++ b/client/examples/simple_client_demo.vcxproj.filters @@ -0,0 +1,21 @@ +<?xml version="1.0" encoding="utf-8"?>
+<Project ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+  <ItemGroup>
+    <Filter Include="Source Files">
+      <UniqueIdentifier>{4FC737F1-C7A5-4376-A066-2A32D752A2FF}</UniqueIdentifier>
+      <Extensions>cpp;c;cc;cxx;def;odl;idl;hpj;bat;asm;asmx</Extensions>
+    </Filter>
+    <Filter Include="Header Files">
+      <UniqueIdentifier>{93995380-89BD-4b04-88EB-625FBE52EBFB}</UniqueIdentifier>
+      <Extensions>h;hpp;hxx;hm;inl;inc;xsd</Extensions>
+    </Filter>
+  </ItemGroup>
+  <ItemGroup>
+    <ClCompile Include="..\simple_client_demo.cpp">
+      <Filter>Source Files</Filter>
+    </ClCompile>
+    <ClCompile Include="..\mongo_client_lib.cpp">
+      <Filter>Source Files</Filter>
+    </ClCompile>
+  </ItemGroup>
+</Project>
\ No newline at end of file diff --git a/client/examples/whereExample.cpp b/client/examples/whereExample.cpp index ce4174b..12b68d7 100644 --- a/client/examples/whereExample.cpp +++ b/client/examples/whereExample.cpp @@ -1,4 +1,5 @@ -// whereExample.cpp +// @file whereExample.cpp +// @see http://www.mongodb.org/display/DOCS/Server-side+Code+Execution /* Copyright 2009 10gen Inc. * diff --git a/client/mongo_client_lib.cpp b/client/mongo_client_lib.cpp index 69f801a..8100d71 100644 --- a/client/mongo_client_lib.cpp +++ b/client/mongo_client_lib.cpp @@ -4,13 +4,23 @@ Normally one includes dbclient.h, and links against libmongoclient.a, when connecting to MongoDB from C++. However, if you have a situation where the pre-built library does not work, you can use - this file instead to build all the necessary symbols. To do so, include client_lib.cpp in your + this file instead to build all the necessary symbols. To do so, include mongo_client_lib.cpp in your project. + GCC + --- For example, to build and run simple_client_demo.cpp with GCC and run it: g++ -I .. simple_client_demo.cpp mongo_client_lib.cpp -lboost_thread-mt -lboost_filesystem ./a.out + + Visual Studio (2010 tested) + --------------------------- + First, see client/examples/simple_client_demo.vcxproj. + - Be sure to include your boost include directory in your project as an Additional Include Directory. + - Define _CRT_SECURE_NO_WARNINGS to avoid warnings on use of strncpy and such by the MongoDB client code. + - Include the boost libraries directory. + - Linker.Input.Additional Dependencies - add ws2_32.lib for the Winsock library. */ /* Copyright 2009 10gen Inc. @@ -28,23 +38,30 @@ * limitations under the License. */ +#if defined(_WIN32) +// C4800 forcing value to bool 'true' or 'false' (performance warning) +#pragma warning( disable : 4800 ) +#endif + #include "../util/md5main.cpp" #define MONGO_EXPOSE_MACROS #include "../pch.h" #include "../util/assert_util.cpp" -#include "../util/message.cpp" +#include "../util/net/message.cpp" #include "../util/util.cpp" #include "../util/background.cpp" #include "../util/base64.cpp" -#include "../util/sock.cpp" +#include "../util/net/sock.cpp" #include "../util/log.cpp" #include "../util/password.cpp" +#include "../util/net/message_port.cpp" #include "../util/concurrency/thread_pool.cpp" #include "../util/concurrency/vars.cpp" #include "../util/concurrency/task.cpp" +#include "../util/concurrency/spin_lock.cpp" #include "connpool.cpp" #include "syncclusterconnection.cpp" @@ -53,13 +70,19 @@ #include "gridfs.cpp" #include "dbclientcursor.cpp" +#include "../util/text.cpp" +#include "dbclient_rs.cpp" +#include "../bson/oid.cpp" + #include "../db/lasterror.cpp" #include "../db/json.cpp" #include "../db/jsobj.cpp" -#include "../db/common.cpp" +//#include "../db/common.cpp" #include "../db/nonce.cpp" #include "../db/commands.cpp" +#include "../pch.cpp" + extern "C" { #include "../util/md5.c" } diff --git a/client/parallel.cpp b/client/parallel.cpp index c4905e3..76b0168 100644 --- a/client/parallel.cpp +++ b/client/parallel.cpp @@ -63,7 +63,20 @@ namespace mongo { _init(); } - auto_ptr<DBClientCursor> ClusteredCursor::query( const string& server , int num , BSONObj extra , int skipLeft ) { + void ClusteredCursor::_checkCursor( DBClientCursor * cursor ) { + assert( cursor ); + + if ( cursor->hasResultFlag( ResultFlag_ShardConfigStale ) ) { + throw StaleConfigException( _ns , "ClusteredCursor::query" ); + } + + if ( cursor->hasResultFlag( ResultFlag_ErrSet ) ) { + BSONObj o = cursor->next(); + throw 
UserException( o["code"].numberInt() , o["$err"].String() ); + } + } + + auto_ptr<DBClientCursor> ClusteredCursor::query( const string& server , int num , BSONObj extra , int skipLeft , bool lazy ) { uassert( 10017 , "cursor already done" , ! _done ); assert( _didInit ); @@ -80,12 +93,10 @@ namespace mongo { throw StaleConfigException( _ns , "ClusteredCursor::query ShardConnection had to change" , true ); } - if ( logLevel >= 5 ) { - log(5) << "ClusteredCursor::query (" << type() << ") server:" << server - << " ns:" << _ns << " query:" << q << " num:" << num - << " _fields:" << _fields << " options: " << _options << endl; - } - + LOG(5) << "ClusteredCursor::query (" << type() << ") server:" << server + << " ns:" << _ns << " query:" << q << " num:" << num + << " _fields:" << _fields << " options: " << _options << endl; + auto_ptr<DBClientCursor> cursor = conn->query( _ns , q , num , 0 , ( _fields.isEmpty() ? 0 : &_fields ) , _options , _batchSize == 0 ? 0 : _batchSize + skipLeft ); @@ -97,21 +108,9 @@ namespace mongo { massert( 13633 , str::stream() << "error querying server: " << server , cursor.get() ); - if ( cursor->hasResultFlag( ResultFlag_ShardConfigStale ) ) { - conn.done(); - throw StaleConfigException( _ns , "ClusteredCursor::query" ); - } - - if ( cursor->hasResultFlag( ResultFlag_ErrSet ) ) { - conn.done(); - BSONObj o = cursor->next(); - throw UserException( o["code"].numberInt() , o["$err"].String() ); - } - - - cursor->attach( &conn ); - - conn.done(); + cursor->attach( &conn ); // this calls done on conn + assert( ! conn.ok() ); + _checkCursor( cursor.get() ); return cursor; } catch ( SocketException& e ) { @@ -228,6 +227,11 @@ namespace mongo { : _matcher( filter ) , _cursor( cursor ) , _done( cursor.get() == 0 ) { } + FilteringClientCursor::FilteringClientCursor( DBClientCursor* cursor , const BSONObj filter ) + : _matcher( filter ) , _cursor( cursor ) , _done( cursor == 0 ) { + } + + FilteringClientCursor::~FilteringClientCursor() { } @@ -237,6 +241,13 @@ namespace mongo { _done = _cursor.get() == 0; } + void FilteringClientCursor::reset( DBClientCursor* cursor ) { + _cursor.reset( cursor ); + _next = BSONObj(); + _done = cursor == 0; + } + + bool FilteringClientCursor::more() { if ( ! _next.isEmpty() ) return true; @@ -399,17 +410,245 @@ namespace mongo { } } + // TODO: Merge with futures API? We do a lot of error checking here that would be useful elsewhere. void ParallelSortClusteredCursor::_init() { + + // log() << "Starting parallel search..." << endl; + + // make sure we're not already initialized assert( ! 
_cursors ); _cursors = new FilteringClientCursor[_numServers]; - // TODO: parellize - int num = 0; - for ( set<ServerAndQuery>::iterator i = _servers.begin(); i!=_servers.end(); ++i ) { - const ServerAndQuery& sq = *i; - _cursors[num++].reset( query( sq._server , 0 , sq._extra , _needToSkip ) ); + bool returnPartial = ( _options & QueryOption_PartialResults ); + + vector<ServerAndQuery> queries( _servers.begin(), _servers.end() ); + set<int> retryQueries; + int finishedQueries = 0; + + vector< shared_ptr<ShardConnection> > conns; + vector<string> servers; + + // Since we may get all sorts of errors, record them all as they come and throw them later if necessary + vector<string> staleConfigExs; + vector<string> socketExs; + vector<string> otherExs; + bool allConfigStale = false; + + int retries = -1; + + // Loop through all the queries until we've finished or gotten a socket exception on all of them + // We break early for non-socket exceptions, and socket exceptions if we aren't returning partial results + do { + retries++; + + bool firstPass = retryQueries.size() == 0; + + if( ! firstPass ){ + log() << "retrying " << ( returnPartial ? "(partial) " : "" ) << "parallel connection to "; + for( set<int>::iterator it = retryQueries.begin(); it != retryQueries.end(); ++it ){ + log() << queries[*it]._server << ", "; + } + log() << finishedQueries << " finished queries." << endl; + } + + size_t num = 0; + for ( vector<ServerAndQuery>::iterator it = queries.begin(); it != queries.end(); ++it ) { + size_t i = num++; + + const ServerAndQuery& sq = *it; + + // If we're not retrying this cursor on later passes, continue + if( ! firstPass && retryQueries.find( i ) == retryQueries.end() ) continue; + + // log() << "Querying " << _query << " from " << _ns << " for " << sq._server << endl; + + BSONObj q = _query; + if ( ! sq._extra.isEmpty() ) { + q = concatQuery( q , sq._extra ); + } + + string errLoc = " @ " + sq._server; + + if( firstPass ){ + + // This may be the first time connecting to this shard, if so we can get an error here + try { + conns.push_back( shared_ptr<ShardConnection>( new ShardConnection( sq._server , _ns ) ) ); + } + catch( std::exception& e ){ + socketExs.push_back( e.what() + errLoc ); + if( ! returnPartial ){ + num--; + break; + } + conns.push_back( shared_ptr<ShardConnection>() ); + continue; + } + + servers.push_back( sq._server ); + } + + if ( conns[i]->setVersion() ) { + conns[i]->done(); + staleConfigExs.push_back( StaleConfigException( _ns , "ClusteredCursor::query ShardConnection had to change" , true ).what() + errLoc ); + break; + } + + LOG(5) << "ParallelSortClusteredCursor::init server:" << sq._server << " ns:" << _ns + << " query:" << q << " _fields:" << _fields << " options: " << _options << endl; + + if( ! _cursors[i].raw() ) + _cursors[i].reset( new DBClientCursor( conns[i]->get() , _ns , q , + 0 , // nToReturn + 0 , // nToSkip + _fields.isEmpty() ? 0 : &_fields , // fieldsToReturn + _options , + _batchSize == 0 ? 0 : _batchSize + _needToSkip // batchSize + ) ); + + try{ + _cursors[i].raw()->initLazy( ! firstPass ); + } + catch( SocketException& e ){ + socketExs.push_back( e.what() + errLoc ); + _cursors[i].reset( NULL ); + conns[i]->done(); + if( ! 
returnPartial ) break; + } + catch( std::exception& e){ + otherExs.push_back( e.what() + errLoc ); + _cursors[i].reset( NULL ); + conns[i]->done(); + break; + } + + } + + // Go through all the potentially started cursors and finish initializing them or log any errors and + // potentially retry + // TODO: Better error classification would make this easier, errors are indicated in all sorts of ways + // here that we need to trap. + for ( size_t i = 0; i < num; i++ ) { + + // log() << "Finishing query for " << cons[i].get()->getHost() << endl; + string errLoc = " @ " + queries[i]._server; + + if( ! _cursors[i].raw() || ( ! firstPass && retryQueries.find( i ) == retryQueries.end() ) ){ + if( conns[i] ) conns[i].get()->done(); + continue; + } + + assert( conns[i] ); + retryQueries.erase( i ); + + bool retry = false; + + try { + + if( ! _cursors[i].raw()->initLazyFinish( retry ) ) { + + warning() << "invalid result from " << conns[i]->getHost() << ( retry ? ", retrying" : "" ) << endl; + _cursors[i].reset( NULL ); + + if( ! retry ){ + socketExs.push_back( str::stream() << "error querying server: " << servers[i] ); + conns[i]->done(); + } + else { + retryQueries.insert( i ); + } + + continue; + } + } + catch ( MsgAssertionException& e ){ + socketExs.push_back( e.what() + errLoc ); + _cursors[i].reset( NULL ); + conns[i]->done(); + continue; + } + catch ( SocketException& e ) { + socketExs.push_back( e.what() + errLoc ); + _cursors[i].reset( NULL ); + conns[i]->done(); + continue; + } + catch( std::exception& e ){ + otherExs.push_back( e.what() + errLoc ); + _cursors[i].reset( NULL ); + conns[i]->done(); + continue; + } + + try { + _cursors[i].raw()->attach( conns[i].get() ); // this calls done on conn + _checkCursor( _cursors[i].raw() ); + + finishedQueries++; + } + catch ( StaleConfigException& e ){ + + // Our stored configuration data is actually stale, we need to reload it + // when we throw our exception + allConfigStale = true; + + staleConfigExs.push_back( e.what() + errLoc ); + _cursors[i].reset( NULL ); + conns[i]->done(); + continue; + } + catch( std::exception& e ){ + otherExs.push_back( e.what() + errLoc ); + _cursors[i].reset( NULL ); + conns[i]->done(); + continue; + } + } + + // Don't exceed our max retries, should not happen + assert( retries < 5 ); + } + while( retryQueries.size() > 0 /* something to retry */ && + ( socketExs.size() == 0 || returnPartial ) /* no conn issues */ && + staleConfigExs.size() == 0 /* no config issues */ && + otherExs.size() == 0 /* no other issues */); + + // Assert that our conns are all closed! + for( vector< shared_ptr<ShardConnection> >::iterator i = conns.begin(); i < conns.end(); ++i ){ + assert( ! (*i) || ! (*i)->ok() ); + } + + // Handle errors we got during initialization. + // If we're returning partial results, we can ignore socketExs, but nothing else + // Log a warning in any case, so we don't lose these messages + bool throwException = ( socketExs.size() > 0 && ! 
returnPartial ) || staleConfigExs.size() > 0 || otherExs.size() > 0; + + if( socketExs.size() > 0 || staleConfigExs.size() > 0 || otherExs.size() > 0 ) { + + vector<string> errMsgs; + + errMsgs.insert( errMsgs.end(), staleConfigExs.begin(), staleConfigExs.end() ); + errMsgs.insert( errMsgs.end(), otherExs.begin(), otherExs.end() ); + errMsgs.insert( errMsgs.end(), socketExs.begin(), socketExs.end() ); + + stringstream errMsg; + errMsg << "could not initialize cursor across all shards because : "; + for( vector<string>::iterator i = errMsgs.begin(); i != errMsgs.end(); i++ ){ + if( i != errMsgs.begin() ) errMsg << " :: and :: "; + errMsg << *i; + } + + if( throwException && staleConfigExs.size() > 0 ) + throw StaleConfigException( _ns , errMsg.str() , ! allConfigStale ); + else if( throwException ) + throw DBException( errMsg.str(), 14827 ); + else + warning() << errMsg.str() << endl; } + if( retries > 0 ) + log() << "successfully finished parallel query after " << retries << " retries" << endl; + } ParallelSortClusteredCursor::~ParallelSortClusteredCursor() { @@ -451,6 +690,7 @@ namespace mongo { if ( best.isEmpty() ) { best = me; bestFrom = i; + if( _sortKey.isEmpty() ) break; continue; } @@ -481,49 +721,62 @@ namespace mongo { // ---- Future ----- // ----------------- - Future::CommandResult::CommandResult( const string& server , const string& db , const BSONObj& cmd , DBClientBase * conn ) { - _server = server; - _db = db; - _cmd = cmd; - _conn = conn; - _done = false; - } + Future::CommandResult::CommandResult( const string& server , const string& db , const BSONObj& cmd , int options , DBClientBase * conn ) + :_server(server) ,_db(db) , _options(options), _cmd(cmd) ,_conn(conn) ,_done(false) + { + try { + if ( ! _conn ){ + _connHolder.reset( new ScopedDbConnection( _server ) ); + _conn = _connHolder->get(); + } - bool Future::CommandResult::join() { - _thr->join(); - assert( _done ); - return _ok; + if ( _conn->lazySupported() ) { + _cursor.reset( new DBClientCursor(_conn, _db + ".$cmd", _cmd, -1/*limit*/, 0, NULL, _options, 0)); + _cursor->initLazy(); + } + else { + _done = true; // we set _done first because even if there is an error we're done + _ok = _conn->runCommand( db , cmd , _res , options ); + } + } + catch ( std::exception& e ) { + error() << "Future::spawnCommand (part 1) exception: " << e.what() << endl; + _ok = false; + _done = true; + } } - void Future::commandThread(shared_ptr<CommandResult> res) { - setThreadName( "future" ); + bool Future::CommandResult::join() { + if (_done) + return _ok; try { - DBClientBase * conn = res->_conn; - - scoped_ptr<ScopedDbConnection> myconn; - if ( ! conn ){ - myconn.reset( new ScopedDbConnection( res->_server ) ); - conn = myconn->get(); - } - - res->_ok = conn->runCommand( res->_db , res->_cmd , res->_res ); + // TODO: Allow retries? 
+ bool retry = false; + bool finished = _cursor->initLazyFinish( retry ); + + // Shouldn't need to communicate with server any more + if ( _connHolder ) + _connHolder->done(); - if ( myconn ) - myconn->done(); + uassert(14812, str::stream() << "Error running command on server: " << _server, finished); + massert(14813, "Command returned nothing", _cursor->more()); + + _res = _cursor->nextSafe(); + _ok = _res["ok"].trueValue(); } catch ( std::exception& e ) { - error() << "Future::commandThread exception: " << e.what() << endl; - res->_ok = false; + error() << "Future::spawnCommand (part 2) exception: " << e.what() << endl; + _ok = false; } - res->_done = true; - } - shared_ptr<Future::CommandResult> Future::spawnCommand( const string& server , const string& db , const BSONObj& cmd , DBClientBase * conn ) { - shared_ptr<Future::CommandResult> res (new Future::CommandResult( server , db , cmd , conn )); - res->_thr.reset( new boost::thread( boost::bind(Future::commandThread, res) ) ); + _done = true; + return _ok; + } + shared_ptr<Future::CommandResult> Future::spawnCommand( const string& server , const string& db , const BSONObj& cmd , int options , DBClientBase * conn ) { + shared_ptr<Future::CommandResult> res (new Future::CommandResult( server , db , cmd , options , conn )); return res; }
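With the thread-per-command implementation removed above, spawnCommand now starts the command lazily on a pooled connection at construction time, and join() completes the round-trip. A sketch of fanning one command out to several servers concurrently; the server list and command are illustrative, not from this changeset:

    #include "client/parallel.h"

    using namespace std;
    using namespace mongo;

    bool runOnAll( const vector<string>& servers , const BSONObj& cmd ) {
        vector< shared_ptr<Future::CommandResult> > futures;

        // Issue the command to every server first; with lazy initialization
        // nothing blocks here, so the commands run concurrently.
        for ( size_t i = 0; i < servers.size(); i++ )
            futures.push_back( Future::spawnCommand( servers[i] , "admin" , cmd , 0 ) );

        // Then collect the results; join() finishes each lazy round-trip.
        bool ok = true;
        for ( size_t i = 0; i < futures.size(); i++ ) {
            if ( ! futures[i]->join() ) {
                warning() << "command failed on " << servers[i] << endl;
                ok = false;
            }
        }
        return ok;
    }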
diff --git a/client/parallel.h b/client/parallel.h index 0809376..869bff9 100644 --- a/client/parallel.h +++ b/client/parallel.h @@ -89,9 +89,15 @@ namespace mongo { virtual void _init() = 0; - auto_ptr<DBClientCursor> query( const string& server , int num = 0 , BSONObj extraFilter = BSONObj() , int skipLeft = 0 ); + auto_ptr<DBClientCursor> query( const string& server , int num = 0 , BSONObj extraFilter = BSONObj() , int skipLeft = 0 , bool lazy=false ); BSONObj explain( const string& server , BSONObj extraFilter = BSONObj() ); + /** + * checks the cursor for any errors + * will throw an exception if an error is encountered + */ + void _checkCursor( DBClientCursor * cursor ); + static BSONObj _concatFilter( const BSONObj& filter , const BSONObj& extraFilter ); virtual void _explain( map< string,list<BSONObj> >& out ) = 0; @@ -111,15 +117,20 @@ namespace mongo { class FilteringClientCursor { public: FilteringClientCursor( const BSONObj filter = BSONObj() ); + FilteringClientCursor( DBClientCursor* cursor , const BSONObj filter = BSONObj() ); FilteringClientCursor( auto_ptr<DBClientCursor> cursor , const BSONObj filter = BSONObj() ); ~FilteringClientCursor(); void reset( auto_ptr<DBClientCursor> cursor ); + void reset( DBClientCursor* cursor ); bool more(); BSONObj next(); BSONObj peek(); + + DBClientCursor* raw() { return _cursor.get(); } + private: void _advance(); @@ -269,14 +280,16 @@ namespace mongo { private: - CommandResult( const string& server , const string& db , const BSONObj& cmd , DBClientBase * conn ); + CommandResult( const string& server , const string& db , const BSONObj& cmd , int options , DBClientBase * conn ); string _server; string _db; + int _options; BSONObj _cmd; DBClientBase * _conn; + scoped_ptr<ScopedDbConnection> _connHolder; // used if not provided a connection - scoped_ptr<boost::thread> _thr; + scoped_ptr<DBClientCursor> _cursor; BSONObj _res; bool _ok; @@ -285,7 +298,6 @@ namespace mongo { friend class Future; }; - static void commandThread(shared_ptr<CommandResult> res); /** * @param server server name * @param db db name * @param cmd cmd to exec * @param conn optional connection to use. will use standard pooled if non-specified */ - static shared_ptr<CommandResult> spawnCommand( const string& server , const string& db , const BSONObj& cmd , DBClientBase * conn = 0 ); + static shared_ptr<CommandResult> spawnCommand( const string& server , const string& db , const BSONObj& cmd , int options , DBClientBase * conn = 0 ); }; diff --git a/client/redef_macros.h b/client/redef_macros.h index a4cb1c9..897912d 100644 --- a/client/redef_macros.h +++ b/client/redef_macros.h @@ -1,4 +1,7 @@ -/** @file redef_macros.h - redefine macros from undef_macros.h */ +/** @file redef_macros.h macros the implementation uses. + + @see undef_macros.h undefines these after use to minimize name pollution. +*/ /* Copyright 2009 10gen Inc. * diff --git a/client/simple_client_demo.cpp b/client/simple_client_demo.cpp index fa2f4a8..f4278dd 100644 --- a/client/simple_client_demo.cpp +++ b/client/simple_client_demo.cpp @@ -21,15 +21,33 @@ using namespace mongo; using namespace bson; int main() { - cout << "connecting to localhost..." << endl; - DBClientConnection c; - c.connect("localhost"); - cout << "connected ok" << endl; - unsigned long long count = c.count("test.foo"); - cout << "count of exiting documents in collection test.foo : " << count << endl; - - bo o = BSON( "hello" << "world" ); - c.insert("test.foo", o); + try { + cout << "connecting to localhost..." << endl; + DBClientConnection c; + c.connect("localhost"); + cout << "connected ok" << endl; + unsigned long long count = c.count("test.foo"); + cout << "count of existing documents in collection test.foo : " << count << endl; + + bo o = BSON( "hello" << "world" ); + c.insert("test.foo", o); + + string e = c.getLastError(); + if( !e.empty() ) { + cout << "insert #1 failed: " << e << endl; + } + + // make an index with a unique key constraint + c.ensureIndex("test.foo", BSON("hello"<<1), /*unique*/true); + + c.insert("test.foo", o); // will cause a dup key error on "hello" field + cout << "we expect a dup key error here:" << endl; + cout << " " << c.getLastErrorDetailed().toString() << endl; + } + catch(DBException& e) { + cout << "caught DBException " << e.toString() << endl; + return 1; + } return 0; } diff --git a/client/syncclusterconnection.cpp b/client/syncclusterconnection.cpp index 4fafdc1..34633d1 100644 --- a/client/syncclusterconnection.cpp +++ b/client/syncclusterconnection.cpp @@ -24,7 +24,7 @@ namespace mongo { - SyncClusterConnection::SyncClusterConnection( const list<HostAndPort> & L) : _mutex("SynClusterConnection") { + SyncClusterConnection::SyncClusterConnection( const list<HostAndPort> & L, double socketTimeout) : _mutex("SyncClusterConnection"), _socketTimeout( socketTimeout ) { { stringstream s; int n=0; @@ -38,7 +38,7 @@ namespace mongo { _connect( i->toString() ); } - SyncClusterConnection::SyncClusterConnection( string commaSeperated ) : _mutex("SyncClusterConnection") { + SyncClusterConnection::SyncClusterConnection( string commaSeperated, double socketTimeout) : _mutex("SyncClusterConnection"), _socketTimeout( socketTimeout ) { _address = commaSeperated; string::size_type idx; while ( ( idx = commaSeperated.find( ',' ) ) != string::npos ) { @@ -50,7 +50,7 @@ namespace mongo { uassert( 8004 , "SyncClusterConnection needs 3 servers" , _conns.size() == 3 ); } - SyncClusterConnection::SyncClusterConnection( string a , string b , string c ) : _mutex("SyncClusterConnection") { + SyncClusterConnection::SyncClusterConnection( string a , string b , string c, double socketTimeout) : _mutex("SyncClusterConnection"), 
_socketTimeout( socketTimeout ) { _address = a + "," + b + "," + c; // connect to all even if not working _connect( a ); @@ -58,7 +58,7 @@ namespace mongo { _connect( c ); } - SyncClusterConnection::SyncClusterConnection( SyncClusterConnection& prev ) : _mutex("SyncClusterConnection") { + SyncClusterConnection::SyncClusterConnection( SyncClusterConnection& prev, double socketTimeout) : _mutex("SyncClusterConnection"), _socketTimeout( socketTimeout ) { assert(0); } @@ -79,7 +79,7 @@ namespace mongo { for ( size_t i=0; i<_conns.size(); i++ ) { BSONObj res; try { - if ( _conns[i]->simpleCommand( "admin" , 0 , "fsync" ) ) + if ( _conns[i]->simpleCommand( "admin" , &res , "fsync" ) ) continue; } catch ( DBException& e ) { @@ -144,6 +144,7 @@ namespace mongo { void SyncClusterConnection::_connect( string host ) { log() << "SyncClusterConnection connecting to [" << host << "]" << endl; DBClientConnection * c = new DBClientConnection( true ); + c->setSoTimeout( _socketTimeout ); string errmsg; if ( ! c->connect( host , errmsg ) ) log() << "SyncClusterConnection connect fail to: " << host << " errmsg: " << errmsg << endl; @@ -159,7 +160,7 @@ namespace mongo { BSONObj SyncClusterConnection::findOne(const string &ns, const Query& query, const BSONObj *fieldsToReturn, int queryOptions) { if ( ns.find( ".$cmd" ) != string::npos ) { - string cmdName = query.obj.firstElement().fieldName(); + string cmdName = query.obj.firstElementFieldName(); int lockType = _lockType( cmdName ); @@ -194,12 +195,22 @@ namespace mongo { return DBClientBase::findOne( ns , query , fieldsToReturn , queryOptions ); } + bool SyncClusterConnection::auth(const string &dbname, const string &username, const string &password_text, string& errmsg, bool digestPassword) { + for (vector<DBClientConnection*>::iterator it = _conns.begin(); it < _conns.end(); it++) { + massert( 15848, "sync cluster of sync clusters?", (*it)->type() != ConnectionString::SYNC); + + if (!(*it)->auth(dbname, username, password_text, errmsg, digestPassword)) { + return false; + } + } + return true; + } auto_ptr<DBClientCursor> SyncClusterConnection::query(const string &ns, Query query, int nToReturn, int nToSkip, const BSONObj *fieldsToReturn, int queryOptions, int batchSize ) { _lastErrors.clear(); if ( ns.find( ".$cmd" ) != string::npos ) { - string cmdName = query.obj.firstElement().fieldName(); + string cmdName = query.obj.firstElementFieldName(); int lockType = _lockType( cmdName ); uassert( 13054 , (string)"write $cmd not supported in SyncClusterConnection::query for:" + cmdName , lockType <= 0 ); } @@ -240,7 +251,7 @@ namespace mongo { return c; } - void SyncClusterConnection::insert( const string &ns, BSONObj obj ) { + void SyncClusterConnection::insert( const string &ns, BSONObj obj , int flags) { uassert( 13119 , (string)"SyncClusterConnection::insert obj has to have an _id: " + obj.jsonString() , ns.find( ".system.indexes" ) != string::npos || obj["_id"].type() ); @@ -250,13 +261,13 @@ namespace mongo { throw UserException( 8003 , (string)"SyncClusterConnection::insert prepare failed: " + errmsg ); for ( size_t i=0; i<_conns.size(); i++ ) { - _conns[i]->insert( ns , obj ); + _conns[i]->insert( ns , obj , flags); } _checkLast(); } - void SyncClusterConnection::insert( const string &ns, const vector< BSONObj >& v ) { + void SyncClusterConnection::insert( const string &ns, const vector< BSONObj >& v , int flags) { uassert( 10023 , "SyncClusterConnection bulk insert not implemented" , 0); } @@ -284,7 +295,7 @@ namespace mongo { throw UserException( 8005 
, (string)"SyncClusterConnection::udpate prepare failed: " + errmsg ); } - for ( size_t i=0; i<_conns.size(); i++ ) { + for ( size_t i = 0; i < _conns.size(); i++ ) { try { _conns[i]->update( ns , query , obj , upsert , multi ); } @@ -347,7 +358,7 @@ namespace mongo { throw UserException( 8008 , "all servers down!" ); } - void SyncClusterConnection::say( Message &toSend ) { + void SyncClusterConnection::say( Message &toSend, bool isRetry ) { string errmsg; if ( ! prepare( errmsg ) ) throw UserException( 13397 , (string)"SyncClusterConnection::say prepare failed: " + errmsg ); @@ -386,4 +397,11 @@ namespace mongo { assert(0); } + void SyncClusterConnection::setAllSoTimeouts( double socketTimeout ){ + _socketTimeout = socketTimeout; + for ( size_t i=0; i<_conns.size(); i++ ) + + if( _conns[i] ) _conns[i]->setSoTimeout( socketTimeout ); + } + } diff --git a/client/syncclusterconnection.h b/client/syncclusterconnection.h index c946073..68dd338 100644 --- a/client/syncclusterconnection.h +++ b/client/syncclusterconnection.h @@ -43,9 +43,9 @@ namespace mongo { /** * @param commaSeparated should be 3 hosts comma separated */ - SyncClusterConnection( const list<HostAndPort> & ); - SyncClusterConnection( string commaSeparated ); - SyncClusterConnection( string a , string b , string c ); + SyncClusterConnection( const list<HostAndPort> &, double socketTimeout = 0); + SyncClusterConnection( string commaSeparated, double socketTimeout = 0); + SyncClusterConnection( string a , string b , string c, double socketTimeout = 0 ); ~SyncClusterConnection(); /** @@ -67,16 +67,16 @@ namespace mongo { virtual auto_ptr<DBClientCursor> getMore( const string &ns, long long cursorId, int nToReturn, int options ); - virtual void insert( const string &ns, BSONObj obj ); + virtual void insert( const string &ns, BSONObj obj, int flags=0); - virtual void insert( const string &ns, const vector< BSONObj >& v ); + virtual void insert( const string &ns, const vector< BSONObj >& v, int flags=0); virtual void remove( const string &ns , Query query, bool justOne ); virtual void update( const string &ns , Query query , BSONObj obj , bool upsert , bool multi ); virtual bool call( Message &toSend, Message &response, bool assertOk , string * actualServer ); - virtual void say( Message &toSend ); + virtual void say( Message &toSend, bool isRetry = false ); virtual void sayPiggyBack( Message &toSend ); virtual void killCursor( long long cursorID ); @@ -91,8 +91,14 @@ namespace mongo { virtual ConnectionString::ConnectionType type() const { return ConnectionString::SYNC; } + void setAllSoTimeouts( double socketTimeout ); + double getSoTimeout() const { return _socketTimeout; } + + virtual bool auth(const string &dbname, const string &username, const string &password_text, string& errmsg, bool digestPassword); + + virtual bool lazySupported() const { return false; } private: - SyncClusterConnection( SyncClusterConnection& prev ); + SyncClusterConnection( SyncClusterConnection& prev, double socketTimeout = 0 ); string _toString() const; bool _commandOnActive(const string &dbname, const BSONObj& cmd, BSONObj &info, int options=0); auto_ptr<DBClientCursor> _queryOnActive(const string &ns, Query query, int nToReturn, int nToSkip, @@ -108,6 +114,8 @@ namespace mongo { mongo::mutex _mutex; vector<BSONObj> _lastErrors; + + double _socketTimeout; }; class UpdateNotTheSame : public UserException { diff --git a/client/undef_macros.h b/client/undef_macros.h index bc59a84..30ece61 100644 --- a/client/undef_macros.h +++ b/client/undef_macros.h @@ 
diff --git a/client/undef_macros.h b/client/undef_macros.h index bc59a84..30ece61 100644 --- a/client/undef_macros.h +++ b/client/undef_macros.h @@ -1,4 +1,4 @@ -/** @file undef_macros.h - remove mongo-specific macros that might cause issues */ +/** @file undef_macros.h remove mongo implementation macros after use */ /* Copyright 2009 10gen Inc. *