author    | Antonin Kral <a.kral@bobek.cz> | 2011-03-17 00:05:43 +0100
committer | Antonin Kral <a.kral@bobek.cz> | 2011-03-17 00:05:43 +0100
commit    | 582fc32574a3b158c81e49cb00e6ae59205e66ba (patch)
tree      | ac64a3243e0d2121709f685695247052858115c8 /client
parent    | 2761bffa96595ac1698d86bbc2e95ebb0d4d6e93 (diff)
download  | mongodb-582fc32574a3b158c81e49cb00e6ae59205e66ba.tar.gz
Imported Upstream version 1.8.0
Diffstat (limited to 'client')
35 files changed, 2748 insertions, 1512 deletions
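Most of this import touches the C++ client's connection handling: connpool.{h,cpp} gains a PoolForHost with idle-connection expiry and a per-host cap, and dbclient.{h,cpp} moves ConnectionString parsing out of the header and adds replica-set (SET) support. The headers in the diff below document the intended calling pattern (ConnectionString::parse() followed by ScopedDbConnection); a minimal sketch of that pattern, assuming the 1.8.0 client headers, is shown here. The host names and namespace are hypothetical and error handling is condensed.

```cpp
// Sketch of the client API documented in the connpool.h / dbclient.h hunks below.
// Hypothetical hosts and namespace; assumes the 1.8.0 client headers from this tree.
#include <iostream>
#include <string>

#include "client/dbclient.h"
#include "client/connpool.h"

int main() {
    std::string errmsg;

    // "rs0/a.example.com:27017,b.example.com:27017" parses as a SET (replica set)
    // connection string, per the new ConnectionString::_fillServers() logic.
    mongo::ConnectionString cs =
        mongo::ConnectionString::parse( "rs0/a.example.com:27017,b.example.com:27017" , errmsg );
    if ( ! cs.isValid() ) {
        std::cout << "bad connection string: " << errmsg << std::endl;
        return 1;
    }

    // Pooled connection: done() hands it back to the pool; if done() is never
    // called, ~ScopedDbConnection kill()s it rather than reuse a half-read socket.
    mongo::ScopedDbConnection conn( cs );
    unsigned long long n = conn->count( "test.foo" );
    std::cout << "count: " << n << std::endl;
    conn.done();
    return 0;
}
```

Even after done(), the new PoolForHost may discard a connection later: StoredConnection::ok() drops anything idle for more than an hour, and done() deletes the connection outright once the per-host cap (default 50, see PoolForHost::_maxPerHost) is reached.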
diff --git a/client/clientOnly.cpp b/client/clientOnly.cpp index 6178257..726c3a9 100644 --- a/client/clientOnly.cpp +++ b/client/clientOnly.cpp @@ -29,7 +29,7 @@ namespace mongo { bool dbexitCalled = false; - void dbexit( ExitCode returnCode, const char *whyMsg ) { + void dbexit( ExitCode returnCode, const char *whyMsg , bool tryToGetLock ) { dbexitCalled = true; out() << "dbexit called" << endl; if ( whyMsg ) @@ -37,12 +37,12 @@ namespace mongo { out() << "exiting" << endl; ::exit( returnCode ); } - - bool inShutdown(){ + + bool inShutdown() { return dbexitCalled; } - void setupSignals(){ + void setupSignals() { // maybe should do SIGPIPE here, not sure } @@ -50,20 +50,20 @@ namespace mongo { return "in client only mode"; } - bool haveLocalShardingInfo( const string& ns ){ + bool haveLocalShardingInfo( const string& ns ) { return false; } - DBClientBase * createDirectClient(){ + DBClientBase * createDirectClient() { uassert( 10256 , "no createDirectClient in clientOnly" , 0 ); return 0; } - void Shard::getAllShards( vector<Shard>& all ){ + void Shard::getAllShards( vector<Shard>& all ) { assert(0); } - bool Shard::isAShard( const string& ident ){ + bool Shard::isAShardNode( const string& ident ) { assert(0); return false; } diff --git a/client/connpool.cpp b/client/connpool.cpp index dae13f6..a521699 100644 --- a/client/connpool.cpp +++ b/client/connpool.cpp @@ -26,162 +26,240 @@ namespace mongo { + // ------ PoolForHost ------ + + PoolForHost::~PoolForHost() { + while ( ! _pool.empty() ) { + StoredConnection sc = _pool.top(); + delete sc.conn; + _pool.pop(); + } + } + + void PoolForHost::done( DBClientBase * c ) { + if ( _pool.size() >= _maxPerHost ) { + delete c; + } + else { + _pool.push(c); + } + } + + DBClientBase * PoolForHost::get() { + + time_t now = time(0); + + while ( ! _pool.empty() ) { + StoredConnection sc = _pool.top(); + _pool.pop(); + if ( sc.ok( now ) ) + return sc.conn; + delete sc.conn; + } + + return NULL; + } + + void PoolForHost::flush() { + vector<StoredConnection> all; + while ( ! 
_pool.empty() ) { + StoredConnection c = _pool.top(); + _pool.pop(); + all.push_back( c ); + bool res; + c.conn->isMaster( res ); + } + + for ( vector<StoredConnection>::iterator i=all.begin(); i != all.end(); ++i ) { + _pool.push( *i ); + } + } + + PoolForHost::StoredConnection::StoredConnection( DBClientBase * c ) { + conn = c; + when = time(0); + } + + bool PoolForHost::StoredConnection::ok( time_t now ) { + // if connection has been idle for an hour, kill it + return ( now - when ) < 3600; + } + + void PoolForHost::createdOne( DBClientBase * base) { + if ( _created == 0 ) + _type = base->type(); + _created++; + } + + unsigned PoolForHost::_maxPerHost = 50; + + // ------ DBConnectionPool ------ + DBConnectionPool pool; - + DBClientBase* DBConnectionPool::_get(const string& ident) { scoped_lock L(_mutex); - PoolForHost& p = _pools[ident]; - if ( p.pool.empty() ) - return 0; - - DBClientBase *c = p.pool.top(); - p.pool.pop(); - return c; + return p.get(); } - DBClientBase* DBConnectionPool::_finishCreate( const string& host , DBClientBase* conn ){ + DBClientBase* DBConnectionPool::_finishCreate( const string& host , DBClientBase* conn ) { { scoped_lock L(_mutex); PoolForHost& p = _pools[host]; - p.created++; + p.createdOne( conn ); } onCreate( conn ); onHandedOut( conn ); - + return conn; } DBClientBase* DBConnectionPool::get(const ConnectionString& url) { DBClientBase * c = _get( url.toString() ); - if ( c ){ + if ( c ) { onHandedOut( c ); return c; } - + string errmsg; c = url.connect( errmsg ); - uassert( 13328 , (string)"dbconnectionpool: connect failed " + url.toString() + " : " + errmsg , c ); - + uassert( 13328 , _name + ": connect failed " + url.toString() + " : " + errmsg , c ); + return _finishCreate( url.toString() , c ); } - + DBClientBase* DBConnectionPool::get(const string& host) { DBClientBase * c = _get( host ); - if ( c ){ + if ( c ) { onHandedOut( c ); return c; } - + string errmsg; ConnectionString cs = ConnectionString::parse( host , errmsg ); uassert( 13071 , (string)"invalid hostname [" + host + "]" + errmsg , cs.isValid() ); - + c = cs.connect( errmsg ); - uassert( 11002 , (string)"dbconnectionpool: connect failed " + host + " : " + errmsg , c ); + if ( ! c ) + throw SocketException( SocketException::CONNECT_ERROR , host , 11002 , str::stream() << _name << " error: " << errmsg ); return _finishCreate( host , c ); } - DBConnectionPool::~DBConnectionPool(){ - for ( map<string,PoolForHost>::iterator i = _pools.begin(); i != _pools.end(); i++ ){ - PoolForHost& p = i->second; - - while ( ! p.pool.empty() ){ - DBClientBase * c = p.pool.top(); - delete c; - p.pool.pop(); - } - } + DBConnectionPool::~DBConnectionPool() { + // connection closing is handled by ~PoolForHost } - void DBConnectionPool::flush(){ + void DBConnectionPool::flush() { scoped_lock L(_mutex); - for ( map<string,PoolForHost>::iterator i = _pools.begin(); i != _pools.end(); i++ ){ + for ( PoolMap::iterator i = _pools.begin(); i != _pools.end(); i++ ) { PoolForHost& p = i->second; - - vector<DBClientBase*> all; - while ( ! 
p.pool.empty() ){ - DBClientBase * c = p.pool.top(); - p.pool.pop(); - all.push_back( c ); - bool res; - c->isMaster( res ); - } - - for ( vector<DBClientBase*>::iterator i=all.begin(); i != all.end(); i++ ){ - p.pool.push( *i ); - } + p.flush(); } } - void DBConnectionPool::addHook( DBConnectionHook * hook ){ + void DBConnectionPool::addHook( DBConnectionHook * hook ) { _hooks.push_back( hook ); } - void DBConnectionPool::onCreate( DBClientBase * conn ){ + void DBConnectionPool::onCreate( DBClientBase * conn ) { if ( _hooks.size() == 0 ) return; - - for ( list<DBConnectionHook*>::iterator i = _hooks.begin(); i != _hooks.end(); i++ ){ + + for ( list<DBConnectionHook*>::iterator i = _hooks.begin(); i != _hooks.end(); i++ ) { (*i)->onCreate( conn ); } } - void DBConnectionPool::onHandedOut( DBClientBase * conn ){ + void DBConnectionPool::onHandedOut( DBClientBase * conn ) { if ( _hooks.size() == 0 ) return; - - for ( list<DBConnectionHook*>::iterator i = _hooks.begin(); i != _hooks.end(); i++ ){ + + for ( list<DBConnectionHook*>::iterator i = _hooks.begin(); i != _hooks.end(); i++ ) { (*i)->onHandedOut( conn ); } } - void DBConnectionPool::appendInfo( BSONObjBuilder& b ){ - scoped_lock lk( _mutex ); + void DBConnectionPool::appendInfo( BSONObjBuilder& b ) { BSONObjBuilder bb( b.subobjStart( "hosts" ) ); - for ( map<string,PoolForHost>::iterator i=_pools.begin(); i!=_pools.end(); ++i ){ - string s = i->first; - BSONObjBuilder temp( bb.subobjStart( s.c_str() ) ); - temp.append( "available" , (int)(i->second.pool.size()) ); - temp.appendNumber( "created" , i->second.created ); - temp.done(); + int avail = 0; + long long created = 0; + + + map<ConnectionString::ConnectionType,long long> createdByType; + + { + scoped_lock lk( _mutex ); + for ( PoolMap::iterator i=_pools.begin(); i!=_pools.end(); ++i ) { + string s = i->first; + BSONObjBuilder temp( bb.subobjStart( s ) ); + temp.append( "available" , i->second.numAvailable() ); + temp.appendNumber( "created" , i->second.numCreated() ); + temp.done(); + + avail += i->second.numAvailable(); + created += i->second.numCreated(); + + long long& x = createdByType[i->second.type()]; + x += i->second.numCreated(); + } } bb.done(); + + { + BSONObjBuilder temp( bb.subobjStart( "createdByType" ) ); + for ( map<ConnectionString::ConnectionType,long long>::iterator i=createdByType.begin(); i!=createdByType.end(); ++i ) { + temp.appendNumber( ConnectionString::typeToString( i->first ) , i->second ); + } + temp.done(); + } + + b.append( "totalAvailable" , avail ); + b.appendNumber( "totalCreated" , created ); } - ScopedDbConnection * ScopedDbConnection::steal(){ + bool DBConnectionPool::serverNameCompare::operator()( const string& a , const string& b ) const{ + string ap = str::before( a , "/" ); + string bp = str::before( b , "/" ); + + return ap < bp; + } + + // ------ ScopedDbConnection ------ + + ScopedDbConnection * ScopedDbConnection::steal() { assert( _conn ); ScopedDbConnection * n = new ScopedDbConnection( _host , _conn ); _conn = 0; return n; } - + ScopedDbConnection::~ScopedDbConnection() { - if ( _conn ){ + if ( _conn ) { if ( ! 
_conn->isFailed() ) { /* see done() comments above for why we log this line */ - log() << "~ScopedDBConnection: _conn != null" << endl; + log() << "~ScopedDbConnection: _conn != null" << endl; } kill(); } } ScopedDbConnection::ScopedDbConnection(const Shard& shard ) - : _host( shard.getConnString() ) , _conn( pool.get(_host) ){ + : _host( shard.getConnString() ) , _conn( pool.get(_host) ) { } - + ScopedDbConnection::ScopedDbConnection(const Shard* shard ) - : _host( shard->getConnString() ) , _conn( pool.get(_host) ){ + : _host( shard->getConnString() ) , _conn( pool.get(_host) ) { } class PoolFlushCmd : public Command { public: - PoolFlushCmd() : Command( "connPoolSync" , false , "connpoolsync" ){} + PoolFlushCmd() : Command( "connPoolSync" , false , "connpoolsync" ) {} virtual void help( stringstream &help ) const { help<<"internal"; } virtual LockType locktype() const { return NONE; } - virtual bool run(const string&, mongo::BSONObj&, std::string&, mongo::BSONObjBuilder& result, bool){ + virtual bool run(const string&, mongo::BSONObj&, std::string&, mongo::BSONObjBuilder& result, bool) { pool.flush(); return true; } @@ -193,11 +271,13 @@ namespace mongo { class PoolStats : public Command { public: - PoolStats() : Command( "connPoolStats" ){} + PoolStats() : Command( "connPoolStats" ) {} virtual void help( stringstream &help ) const { help<<"stats about connection pool"; } virtual LockType locktype() const { return NONE; } - virtual bool run(const string&, mongo::BSONObj&, std::string&, mongo::BSONObjBuilder& result, bool){ + virtual bool run(const string&, mongo::BSONObj&, std::string&, mongo::BSONObjBuilder& result, bool) { pool.appendInfo( result ); + result.append( "numDBClientConnection" , DBClientConnection::getNumConnections() ); + result.append( "numAScopedConnection" , AScopedConnection::getNumConnections() ); return true; } virtual bool slaveOk() const { @@ -206,5 +286,6 @@ namespace mongo { } poolStatsCmd; + AtomicUInt AScopedConnection::_numConnections; } // namespace mongo diff --git a/client/connpool.h b/client/connpool.h index 00570c5..e7f59d6 100644 --- a/client/connpool.h +++ b/client/connpool.h @@ -24,55 +24,109 @@ namespace mongo { class Shard; - - struct PoolForHost { + + /** + * not thread safe + * thread safety is handled by DBConnectionPool + */ + class PoolForHost { + public: PoolForHost() - : created(0){} - PoolForHost( const PoolForHost& other ){ - assert(other.pool.size() == 0); - created = other.created; - assert( created == 0 ); + : _created(0) {} + + PoolForHost( const PoolForHost& other ) { + assert(other._pool.size() == 0); + _created = other._created; + assert( _created == 0 ); } - - std::stack<DBClientBase*> pool; - long long created; + + ~PoolForHost(); + + int numAvailable() const { return (int)_pool.size(); } + + void createdOne( DBClientBase * base); + long long numCreated() const { return _created; } + + ConnectionString::ConnectionType type() const { assert(_created); return _type; } + + /** + * gets a connection or return NULL + */ + DBClientBase * get(); + + void done( DBClientBase * c ); + + void flush(); + + static void setMaxPerHost( unsigned max ) { _maxPerHost = max; } + static unsigned getMaxPerHost() { return _maxPerHost; } + private: + + struct StoredConnection { + StoredConnection( DBClientBase * c ); + + bool ok( time_t now ); + + DBClientBase* conn; + time_t when; + }; + + std::stack<StoredConnection> _pool; + long long _created; + ConnectionString::ConnectionType _type; + + static unsigned _maxPerHost; }; - + class DBConnectionHook { 
public: - virtual ~DBConnectionHook(){} - virtual void onCreate( DBClientBase * conn ){} - virtual void onHandedOut( DBClientBase * conn ){} + virtual ~DBConnectionHook() {} + virtual void onCreate( DBClientBase * conn ) {} + virtual void onHandedOut( DBClientBase * conn ) {} }; /** Database connection pool. Generally, use ScopedDbConnection and do not call these directly. - This class, so far, is suitable for use with unauthenticated connections. - Support for authenticated connections requires some adjustements: please + This class, so far, is suitable for use with unauthenticated connections. + Support for authenticated connections requires some adjustements: please request... Usage: - + { ScopedDbConnection c("myserver"); c.conn()... } */ class DBConnectionPool { + + public: + + /** compares server namees, but is smart about replica set names */ + struct serverNameCompare { + bool operator()( const string& a , const string& b ) const; + }; + + private: + mongo::mutex _mutex; - map<string,PoolForHost> _pools; // servername -> pool + typedef map<string,PoolForHost,serverNameCompare> PoolMap; // servername -> pool + PoolMap _pools; list<DBConnectionHook*> _hooks; + string _name; DBClientBase* _get( const string& ident ); - + DBClientBase* _finishCreate( const string& ident , DBClientBase* conn ); - public: - DBConnectionPool() : _mutex("DBConnectionPool") { } + public: + DBConnectionPool() : _mutex("DBConnectionPool") , _name( "dbconnectionpool" ) { } ~DBConnectionPool(); + /** right now just controls some asserts. defaults to "dbconnectionpool" */ + void setName( const string& name ) { _name = name; } void onCreate( DBClientBase * conn ); void onHandedOut( DBClientBase * conn ); @@ -83,72 +137,78 @@ namespace mongo { DBClientBase *get(const ConnectionString& host); void release(const string& host, DBClientBase *c) { - if ( c->isFailed() ){ + if ( c->isFailed() ) { delete c; return; } scoped_lock L(_mutex); - _pools[host].pool.push(c); + _pools[host].done(c); } void addHook( DBConnectionHook * hook ); void appendInfo( BSONObjBuilder& b ); }; - + extern DBConnectionPool pool; class AScopedConnection : boost::noncopyable { public: - virtual ~AScopedConnection(){} + AScopedConnection() { _numConnections++; } + virtual ~AScopedConnection() { _numConnections--; } virtual DBClientBase* get() = 0; virtual void done() = 0; virtual string getHost() const = 0; + + /** + * @return total number of current instances of AScopedConnection + */ + static int getNumConnections() { return _numConnections; } + + private: + static AtomicUInt _numConnections; }; /** Use to get a connection from the pool. On exceptions things - clean up nicely. + clean up nicely (i.e. the socket gets closed automatically when the + scopeddbconnection goes out of scope). 
*/ class ScopedDbConnection : public AScopedConnection { - const string _host; - DBClientBase *_conn; public: + /** the main constructor you want to use + throws UserException if can't connect + */ + explicit ScopedDbConnection(const string& host) : _host(host), _conn( pool.get(host) ) {} + + ScopedDbConnection() : _host( "" ) , _conn(0) {} + + /* @param conn - bind to an existing connection */ + ScopedDbConnection(const string& host, DBClientBase* conn ) : _host( host ) , _conn( conn ) {} + + /** throws UserException if can't connect */ + explicit ScopedDbConnection(const ConnectionString& url ) : _host(url.toString()), _conn( pool.get(url) ) {} + + /** throws UserException if can't connect */ + explicit ScopedDbConnection(const Shard& shard ); + explicit ScopedDbConnection(const Shard* shard ); + + ~ScopedDbConnection(); + /** get the associated connection object */ - DBClientBase* operator->(){ - uassert( 11004 , "did you call done already" , _conn ); - return _conn; + DBClientBase* operator->() { + uassert( 11004 , "connection was returned to the pool already" , _conn ); + return _conn; } - + /** get the associated connection object */ DBClientBase& conn() { - uassert( 11005 , "did you call done already" , _conn ); + uassert( 11005 , "connection was returned to the pool already" , _conn ); return *_conn; } /** get the associated connection object */ DBClientBase* get() { - uassert( 13102 , "did you call done already" , _conn ); + uassert( 13102 , "connection was returned to the pool already" , _conn ); return _conn; } - - ScopedDbConnection() - : _host( "" ) , _conn(0) { - } - - /** throws UserException if can't connect */ - ScopedDbConnection(const string& host) - : _host(host), _conn( pool.get(host) ) { - } - - ScopedDbConnection(const string& host, DBClientBase* conn ) - : _host( host ) , _conn( conn ){ - } - - ScopedDbConnection(const Shard& shard ); - ScopedDbConnection(const Shard* shard ); - - ScopedDbConnection(const ConnectionString& url ) - : _host(url.toString()), _conn( pool.get(url) ) { - } - string getHost() const { return _host; } @@ -161,8 +221,8 @@ namespace mongo { } /** Call this when you are done with the connection. - - If you do not call done() before this object goes out of scope, + + If you do not call done() before this object goes out of scope, we can't be sure we fully read all expected data of a reply on the socket. so we don't try to reuse the connection in that situation. */ @@ -170,7 +230,7 @@ namespace mongo { if ( ! _conn ) return; - /* we could do this, but instead of assume one is using autoreconnect mode on the connection + /* we could do this, but instead of assume one is using autoreconnect mode on the connection if ( _conn->isFailed() ) kill(); else @@ -178,10 +238,12 @@ namespace mongo { pool.release(_host, _conn); _conn = 0; } - + ScopedDbConnection * steal(); - ~ScopedDbConnection(); + private: + const string _host; + DBClientBase *_conn; }; diff --git a/client/constants.h b/client/constants.h index 66aa9b1..54f3fd2 100644 --- a/client/constants.h +++ b/client/constants.h @@ -2,22 +2,22 @@ #pragma once -namespace mongo { +namespace mongo { /* query results include a 32 result flag word consisting of these bits */ enum ResultFlagType { - /* returned, with zero results, when getMore is called but the cursor id + /* returned, with zero results, when getMore is called but the cursor id is not valid at the server. */ - ResultFlag_CursorNotFound = 1, - + ResultFlag_CursorNotFound = 1, + /* { $err : ... 
} is being returned */ - ResultFlag_ErrSet = 2, - + ResultFlag_ErrSet = 2, + /* Have to update config from the server, usually $err is also set */ - ResultFlag_ShardConfigStale = 4, - - /* for backward compatability: this let's us know the server supports - the QueryOption_AwaitData option. if it doesn't, a repl slave client should sleep + ResultFlag_ShardConfigStale = 4, + + /* for backward compatability: this let's us know the server supports + the QueryOption_AwaitData option. if it doesn't, a repl slave client should sleep a little between getMore's. */ ResultFlag_AwaitCapable = 8 diff --git a/client/dbclient.cpp b/client/dbclient.cpp index aa9b7ae..b4214ab 100644 --- a/client/dbclient.cpp +++ b/client/dbclient.cpp @@ -31,8 +31,41 @@ namespace mongo { + void ConnectionString::_fillServers( string s ) { + + { + string::size_type idx = s.find( '/' ); + if ( idx != string::npos ) { + _setName = s.substr( 0 , idx ); + s = s.substr( idx + 1 ); + _type = SET; + } + } + + string::size_type idx; + while ( ( idx = s.find( ',' ) ) != string::npos ) { + _servers.push_back( s.substr( 0 , idx ) ); + s = s.substr( idx + 1 ); + } + _servers.push_back( s ); + + } + + void ConnectionString::_finishInit() { + stringstream ss; + if ( _type == SET ) + ss << _setName << "/"; + for ( unsigned i=0; i<_servers.size(); i++ ) { + if ( i > 0 ) + ss << ","; + ss << _servers[i].toString(); + } + _string = ss.str(); + } + + DBClientBase* ConnectionString::connect( string& errmsg ) const { - switch ( _type ){ + switch ( _type ) { case MASTER: { DBClientConnection * c = new DBClientConnection(true); log(1) << "creating new connection to:" << _servers[0] << endl; @@ -42,11 +75,11 @@ namespace mongo { } return c; } - - case PAIR: + + case PAIR: case SET: { DBClientReplicaSet * set = new DBClientReplicaSet( _setName , _servers ); - if( ! set->connect() ){ + if( ! 
set->connect() ) { delete set; errmsg = "connect failed to set "; errmsg += toString(); @@ -54,7 +87,7 @@ namespace mongo { } return set; } - + case SYNC: { // TODO , don't copy list<HostAndPort> l; @@ -62,40 +95,58 @@ namespace mongo { l.push_back( _servers[i] ); return new SyncClusterConnection( l ); } - + case INVALID: throw UserException( 13421 , "trying to connect to invalid ConnectionString" ); break; } - + assert( 0 ); return 0; } - ConnectionString ConnectionString::parse( const string& host , string& errmsg ){ - + ConnectionString ConnectionString::parse( const string& host , string& errmsg ) { + string::size_type i = host.find( '/' ); - if ( i != string::npos ){ + if ( i != string::npos && i != 0) { // replica set return ConnectionString( SET , host.substr( i + 1 ) , host.substr( 0 , i ) ); } - int numCommas = DBClientBase::countCommas( host ); - - if( numCommas == 0 ) + int numCommas = str::count( host , ',' ); + + if( numCommas == 0 ) return ConnectionString( HostAndPort( host ) ); - - if ( numCommas == 1 ) + + if ( numCommas == 1 ) return ConnectionString( PAIR , host ); if ( numCommas == 2 ) return ConnectionString( SYNC , host ); - + errmsg = (string)"invalid hostname [" + host + "]"; return ConnectionString(); // INVALID } - Query& Query::where(const string &jscode, BSONObj scope) { + string ConnectionString::typeToString( ConnectionType type ) { + switch ( type ) { + case INVALID: + return "invalid"; + case MASTER: + return "master"; + case PAIR: + return "pair"; + case SET: + return "set"; + case SYNC: + return "sync"; + } + assert(0); + return ""; + } + + + Query& Query::where(const string &jscode, BSONObj scope) { /* use where() before sort() and hint() and explain(), else this will assert. */ assert( ! isComplex() ); BSONObjBuilder b; @@ -113,44 +164,44 @@ namespace mongo { obj = b.obj(); } - Query& Query::sort(const BSONObj& s) { + Query& Query::sort(const BSONObj& s) { appendComplex( "orderby", s ); - return *this; + return *this; } Query& Query::hint(BSONObj keyPattern) { appendComplex( "$hint", keyPattern ); - return *this; + return *this; } Query& Query::explain() { appendComplex( "$explain", true ); - return *this; + return *this; } - + Query& Query::snapshot() { appendComplex( "$snapshot", true ); - return *this; + return *this; } - + Query& Query::minKey( const BSONObj &val ) { appendComplex( "$min", val ); - return *this; + return *this; } Query& Query::maxKey( const BSONObj &val ) { appendComplex( "$max", val ); - return *this; + return *this; } - bool Query::isComplex( bool * hasDollar ) const{ - if ( obj.hasElement( "query" ) ){ + bool Query::isComplex( bool * hasDollar ) const { + if ( obj.hasElement( "query" ) ) { if ( hasDollar ) hasDollar[0] = false; return true; } - if ( obj.hasElement( "$query" ) ){ + if ( obj.hasElement( "$query" ) ) { if ( hasDollar ) hasDollar[0] = true; return true; @@ -158,12 +209,12 @@ namespace mongo { return false; } - + BSONObj Query::getFilter() const { bool hasDollar; if ( ! isComplex( &hasDollar ) ) return obj; - + return obj.getObjectField( hasDollar ? 
"$query" : "query" ); } BSONObj Query::getSort() const { @@ -182,8 +233,8 @@ namespace mongo { bool Query::isExplain() const { return isComplex() && obj.getBoolField( "$explain" ); } - - string Query::toString() const{ + + string Query::toString() const { return obj.toString(); } @@ -203,7 +254,7 @@ namespace mongo { } return _cachedAvailableOptions; } - + inline bool DBClientWithCommands::runCommand(const string &dbname, const BSONObj& cmd, BSONObj &info, int options) { string ns = dbname + ".$cmd"; info = findOne(ns, cmd, 0 , options); @@ -222,38 +273,50 @@ namespace mongo { return runCommand(dbname, b.done(), *info); } - unsigned long long DBClientWithCommands::count(const string &_ns, const BSONObj& query, int options) { - NamespaceString ns(_ns); - BSONObj cmd = BSON( "count" << ns.coll << "query" << query ); + unsigned long long DBClientWithCommands::count(const string &myns, const BSONObj& query, int options, int limit, int skip ) { + NamespaceString ns(myns); + BSONObj cmd = _countCmd( myns , query , options , limit , skip ); BSONObj res; if( !runCommand(ns.db.c_str(), cmd, res, options) ) uasserted(11010,string("count fails:") + res.toString()); return res["n"].numberLong(); } + BSONObj DBClientWithCommands::_countCmd(const string &myns, const BSONObj& query, int options, int limit, int skip ) { + NamespaceString ns(myns); + BSONObjBuilder b; + b.append( "count" , ns.coll ); + b.append( "query" , query ); + if ( limit ) + b.append( "limit" , limit ); + if ( skip ) + b.append( "skip" , skip ); + return b.obj(); + } + BSONObj getlasterrorcmdobj = fromjson("{getlasterror:1}"); - BSONObj DBClientWithCommands::getLastErrorDetailed() { + BSONObj DBClientWithCommands::getLastErrorDetailed() { BSONObj info; runCommand("admin", getlasterrorcmdobj, info); - return info; + return info; } - string DBClientWithCommands::getLastError() { + string DBClientWithCommands::getLastError() { BSONObj info = getLastErrorDetailed(); return getLastErrorString( info ); } - - string DBClientWithCommands::getLastErrorString( const BSONObj& info ){ + + string DBClientWithCommands::getLastErrorString( const BSONObj& info ) { BSONElement e = info["err"]; if( e.eoo() ) return ""; if( e.type() == Object ) return e.toString(); - return e.str(); + return e.str(); } BSONObj getpreverrorcmdobj = fromjson("{getpreverror:1}"); - BSONObj DBClientWithCommands::getPrevError() { + BSONObj DBClientWithCommands::getPrevError() { BSONObj info; runCommand("admin", getpreverrorcmdobj, info); return info; @@ -261,7 +324,7 @@ namespace mongo { BSONObj getnoncecmdobj = fromjson("{getnonce:1}"); - string DBClientWithCommands::createPasswordDigest( const string & username , const string & clearTextPassword ){ + string DBClientWithCommands::createPasswordDigest( const string & username , const string & clearTextPassword ) { md5digest d; { md5_state_t st; @@ -275,11 +338,9 @@ namespace mongo { } bool DBClientWithCommands::auth(const string &dbname, const string &username, const string &password_text, string& errmsg, bool digestPassword) { - //cout << "TEMP AUTH " << toString() << dbname << ' ' << username << ' ' << password_text << ' ' << digestPassword << endl; - - string password = password_text; - if( digestPassword ) - password = createPasswordDigest( username , password_text ); + string password = password_text; + if( digestPassword ) + password = createPasswordDigest( username , password_text ); BSONObj info; string nonce; @@ -310,8 +371,8 @@ namespace mongo { b << "key" << digestToString( d ); authCmd = b.done(); } - - if( 
runCommand(dbname, authCmd, info) ) + + if( runCommand(dbname, authCmd, info) ) return true; errmsg = info.toString(); @@ -322,7 +383,7 @@ namespace mongo { bool DBClientWithCommands::isMaster(bool& isMaster, BSONObj *info) { BSONObj o; - if ( info == 0 ) + if ( info == 0 ) info = &o; bool ok = runCommand("admin", ismastercmdobj, *info); isMaster = info->getField("ismaster").trueValue(); @@ -331,7 +392,7 @@ namespace mongo { bool DBClientWithCommands::createCollection(const string &ns, long long size, bool capped, int max, BSONObj *info) { BSONObj o; - if ( info == 0 ) info = &o; + if ( info == 0 ) info = &o; BSONObjBuilder b; string db = nsToDatabase(ns.c_str()); b.append("create", ns.c_str() + db.length() + 1); @@ -381,11 +442,11 @@ namespace mongo { return false; } - BSONObj DBClientWithCommands::mapreduce(const string &ns, const string &jsmapf, const string &jsreducef, BSONObj query, const string& outputcolname) { + BSONObj DBClientWithCommands::mapreduce(const string &ns, const string &jsmapf, const string &jsreducef, BSONObj query, const string& outputcolname) { BSONObjBuilder b; b.append("mapreduce", nsGetCollection(ns)); - b.appendCode("map", jsmapf.c_str()); - b.appendCode("reduce", jsreducef.c_str()); + b.appendCode("map", jsmapf); + b.appendCode("reduce", jsreducef); if( !query.isEmpty() ) b.append("query", query); if( !outputcolname.empty() ) @@ -397,7 +458,7 @@ namespace mongo { bool DBClientWithCommands::eval(const string &dbname, const string &jscode, BSONObj& info, BSONElement& retValue, BSONObj *args) { BSONObjBuilder b; - b.appendCode("$eval", jscode.c_str()); + b.appendCode("$eval", jscode); if ( args ) b.appendArray("args", *args); bool ok = runCommand(dbname, b.done(), info); @@ -412,27 +473,27 @@ namespace mongo { return eval(dbname, jscode, info, retValue); } - list<string> DBClientWithCommands::getDatabaseNames(){ + list<string> DBClientWithCommands::getDatabaseNames() { BSONObj info; uassert( 10005 , "listdatabases failed" , runCommand( "admin" , BSON( "listDatabases" << 1 ) , info ) ); uassert( 10006 , "listDatabases.databases not array" , info["databases"].type() == Array ); - + list<string> names; - + BSONObjIterator i( info["databases"].embeddedObjectUserCheck() ); - while ( i.more() ){ + while ( i.more() ) { names.push_back( i.next().embeddedObjectUserCheck()["name"].valuestr() ); } return names; } - list<string> DBClientWithCommands::getCollectionNames( const string& db ){ + list<string> DBClientWithCommands::getCollectionNames( const string& db ) { list<string> names; - + string ns = db + ".system.namespaces"; auto_ptr<DBClientCursor> c = query( ns.c_str() , BSONObj() ); - while ( c->more() ){ + while ( c->more() ) { string name = c->next()["name"].valuestr(); if ( name.find( "$" ) != string::npos ) continue; @@ -441,37 +502,37 @@ namespace mongo { return names; } - bool DBClientWithCommands::exists( const string& ns ){ + bool DBClientWithCommands::exists( const string& ns ) { list<string> names; - + string db = nsGetDB( ns ) + ".system.namespaces"; BSONObj q = BSON( "name" << ns ); - return count( db.c_str() , q ) != 0; + return count( db.c_str() , q, QueryOption_SlaveOk ) != 0; } /* --- dbclientconnection --- */ - bool DBClientConnection::auth(const string &dbname, const string &username, const string &password_text, string& errmsg, bool digestPassword) { - string password = password_text; - if( digestPassword ) - password = createPasswordDigest( username , password_text ); + bool DBClientConnection::auth(const string &dbname, const string &username, const 
string &password_text, string& errmsg, bool digestPassword) { + string password = password_text; + if( digestPassword ) + password = createPasswordDigest( username , password_text ); - if( autoReconnect ) { - /* note we remember the auth info before we attempt to auth -- if the connection is broken, we will - then have it for the next autoreconnect attempt. - */ - pair<string,string> p = pair<string,string>(username, password); - authCache[dbname] = p; - } + if( autoReconnect ) { + /* note we remember the auth info before we attempt to auth -- if the connection is broken, we will + then have it for the next autoreconnect attempt. + */ + pair<string,string> p = pair<string,string>(username, password); + authCache[dbname] = p; + } - return DBClientBase::auth(dbname, username, password.c_str(), errmsg, false); - } + return DBClientBase::auth(dbname, username, password.c_str(), errmsg, false); + } BSONObj DBClientInterface::findOne(const string &ns, const Query& query, const BSONObj *fieldsToReturn, int queryOptions) { auto_ptr<DBClientCursor> c = this->query(ns, query, 1, 0, fieldsToReturn, queryOptions); - uassert( 10276 , "DBClientBase::findOne: transport error", c.get() ); + uassert( 10276 , str::stream() << "DBClientBase::findOne: transport error: " << getServerAddress() << " query: " << query.toString(), c.get() ); if ( c->hasResultFlag( ResultFlag_ShardConfigStale ) ) throw StaleConfigException( ns , "findOne has stale config" ); @@ -482,20 +543,20 @@ namespace mongo { return c->nextSafe().copy(); } - bool DBClientConnection::connect(const HostAndPort& server, string& errmsg){ + bool DBClientConnection::connect(const HostAndPort& server, string& errmsg) { _server = server; _serverString = _server.toString(); return _connect( errmsg ); } - bool DBClientConnection::_connect( string& errmsg ){ + bool DBClientConnection::_connect( string& errmsg ) { _serverString = _server.toString(); // we keep around SockAddr for connection life -- maybe MessagingPort // requires that? server.reset(new SockAddr(_server.host().c_str(), _server.port())); - p.reset(new MessagingPort( _timeout, _logLevel )); + p.reset(new MessagingPort( _so_timeout, _logLevel )); - if (server->getAddr() == "0.0.0.0"){ + if (server->getAddr() == "0.0.0.0") { failed = true; return false; } @@ -513,35 +574,39 @@ namespace mongo { void DBClientConnection::_checkConnection() { if ( !failed ) return; - if ( lastReconnectTry && time(0)-lastReconnectTry < 2 ) - return; + if ( lastReconnectTry && time(0)-lastReconnectTry < 2 ) { + // we wait a little before reconnect attempt to avoid constant hammering. + // but we throw we don't want to try to use a connection in a bad state + throw SocketException(SocketException::FAILED_STATE); + } if ( !autoReconnect ) - return; + throw SocketException(SocketException::FAILED_STATE); lastReconnectTry = time(0); log(_logLevel) << "trying reconnect to " << _serverString << endl; string errmsg; failed = false; - if ( ! _connect(errmsg) ) { + if ( ! 
_connect(errmsg) ) { + failed = true; log(_logLevel) << "reconnect " << _serverString << " failed " << errmsg << endl; - return; - } + throw SocketException(SocketException::CONNECT_ERROR); + } - log(_logLevel) << "reconnect " << _serverString << " ok" << endl; - for( map< string, pair<string,string> >::iterator i = authCache.begin(); i != authCache.end(); i++ ) { - const char *dbname = i->first.c_str(); - const char *username = i->second.first.c_str(); - const char *password = i->second.second.c_str(); - if( !DBClientBase::auth(dbname, username, password, errmsg, false) ) - log(_logLevel) << "reconnect: auth failed db:" << dbname << " user:" << username << ' ' << errmsg << '\n'; - } + log(_logLevel) << "reconnect " << _serverString << " ok" << endl; + for( map< string, pair<string,string> >::iterator i = authCache.begin(); i != authCache.end(); i++ ) { + const char *dbname = i->first.c_str(); + const char *username = i->second.first.c_str(); + const char *password = i->second.second.c_str(); + if( !DBClientBase::auth(dbname, username, password, errmsg, false) ) + log(_logLevel) << "reconnect: auth failed db:" << dbname << " user:" << username << ' ' << errmsg << '\n'; + } } auto_ptr<DBClientCursor> DBClientBase::query(const string &ns, Query query, int nToReturn, - int nToSkip, const BSONObj *fieldsToReturn, int queryOptions , int batchSize ) { + int nToSkip, const BSONObj *fieldsToReturn, int queryOptions , int batchSize ) { auto_ptr<DBClientCursor> c( new DBClientCursor( this, - ns, query.obj, nToReturn, nToSkip, - fieldsToReturn, queryOptions , batchSize ) ); + ns, query.obj, nToReturn, nToSkip, + fieldsToReturn, queryOptions , batchSize ) ); if ( c->init() ) return c; return auto_ptr< DBClientCursor >( 0 ); @@ -562,14 +627,14 @@ namespace mongo { } boost::function<void(const BSONObj &)> _f; }; - + unsigned long long DBClientConnection::query( boost::function<void(const BSONObj&)> f, const string& ns, Query query, const BSONObj *fieldsToReturn, int queryOptions ) { DBClientFunConvertor fun; fun._f = f; boost::function<void(DBClientCursorBatchIterator &)> ptr( fun ); return DBClientConnection::query( ptr, ns, query, fieldsToReturn, queryOptions ); } - + unsigned long long DBClientConnection::query( boost::function<void(DBClientCursorBatchIterator &)> f, const string& ns, Query query, const BSONObj *fieldsToReturn, int queryOptions ) { // mask options queryOptions &= (int)( QueryOption_NoCursorTimeout | QueryOption_SlaveOk ); @@ -577,11 +642,11 @@ namespace mongo { bool doExhaust = ( availableOptions() & QueryOption_Exhaust ); if ( doExhaust ) { - queryOptions |= (int)QueryOption_Exhaust; + queryOptions |= (int)QueryOption_Exhaust; } auto_ptr<DBClientCursor> c( this->query(ns, query, 0, 0, fieldsToReturn, queryOptions) ); - massert( 13386, "socket error for mapping query", c.get() ); - + uassert( 13386, "socket error for mapping query", c.get() ); + if ( !doExhaust ) { while( c->more() ) { DBClientCursorBatchIterator i( *c ); @@ -591,21 +656,21 @@ namespace mongo { return n; } - try { - while( 1 ) { - while( c->moreInCurrentBatch() ) { + try { + while( 1 ) { + while( c->moreInCurrentBatch() ) { DBClientCursorBatchIterator i( *c ); f( i ); n += i.n(); } - if( c->getCursorId() == 0 ) + if( c->getCursorId() == 0 ) break; c->exhaustReceiveMore(); } } - catch(std::exception&) { + catch(std::exception&) { /* connection CANNOT be used anymore as more data may be on the way from the server. we have to reconnect. 
*/ @@ -633,16 +698,16 @@ namespace mongo { void DBClientBase::insert( const string & ns , const vector< BSONObj > &v ) { Message toSend; - + BufBuilder b; int opts = 0; b.appendNum( opts ); b.appendStr( ns ); for( vector< BSONObj >::const_iterator i = v.begin(); i != v.end(); ++i ) i->appendSelfToBufBuilder( b ); - + toSend.setData( dbInsert, b.buf(), b.len() ); - + say( toSend ); } @@ -686,63 +751,63 @@ namespace mongo { say( toSend ); } - auto_ptr<DBClientCursor> DBClientWithCommands::getIndexes( const string &ns ){ + auto_ptr<DBClientCursor> DBClientWithCommands::getIndexes( const string &ns ) { return query( Namespace( ns.c_str() ).getSisterNS( "system.indexes" ).c_str() , BSON( "ns" << ns ) ); } - - void DBClientWithCommands::dropIndex( const string& ns , BSONObj keys ){ + + void DBClientWithCommands::dropIndex( const string& ns , BSONObj keys ) { dropIndex( ns , genIndexName( keys ) ); } - void DBClientWithCommands::dropIndex( const string& ns , const string& indexName ){ + void DBClientWithCommands::dropIndex( const string& ns , const string& indexName ) { BSONObj info; - if ( ! runCommand( nsToDatabase( ns.c_str() ) , - BSON( "deleteIndexes" << NamespaceString( ns ).coll << "index" << indexName ) , - info ) ){ + if ( ! runCommand( nsToDatabase( ns.c_str() ) , + BSON( "deleteIndexes" << NamespaceString( ns ).coll << "index" << indexName ) , + info ) ) { log(_logLevel) << "dropIndex failed: " << info << endl; uassert( 10007 , "dropIndex failed" , 0 ); } resetIndexCache(); } - - void DBClientWithCommands::dropIndexes( const string& ns ){ + + void DBClientWithCommands::dropIndexes( const string& ns ) { BSONObj info; - uassert( 10008 , "dropIndexes failed" , runCommand( nsToDatabase( ns.c_str() ) , - BSON( "deleteIndexes" << NamespaceString( ns ).coll << "index" << "*") , - info ) ); + uassert( 10008 , "dropIndexes failed" , runCommand( nsToDatabase( ns.c_str() ) , + BSON( "deleteIndexes" << NamespaceString( ns ).coll << "index" << "*") , + info ) ); resetIndexCache(); } - void DBClientWithCommands::reIndex( const string& ns ){ + void DBClientWithCommands::reIndex( const string& ns ) { list<BSONObj> all; auto_ptr<DBClientCursor> i = getIndexes( ns ); - while ( i->more() ){ + while ( i->more() ) { all.push_back( i->next().getOwned() ); } - + dropIndexes( ns ); - - for ( list<BSONObj>::iterator i=all.begin(); i!=all.end(); i++ ){ + + for ( list<BSONObj>::iterator i=all.begin(); i!=all.end(); i++ ) { BSONObj o = *i; insert( Namespace( ns.c_str() ).getSisterNS( "system.indexes" ).c_str() , o ); } - + } - - string DBClientWithCommands::genIndexName( const BSONObj& keys ){ + + string DBClientWithCommands::genIndexName( const BSONObj& keys ) { stringstream ss; - + bool first = 1; for ( BSONObjIterator i(keys); i.more(); ) { BSONElement f = i.next(); - + if ( first ) first = 0; else ss << "_"; - + ss << f.fieldName() << "_"; if( f.isNumber() ) ss << f.numberInt(); @@ -750,7 +815,7 @@ namespace mongo { return ss.str(); } - bool DBClientWithCommands::ensureIndex( const string &ns , BSONObj keys , bool unique, const string & name ) { + bool DBClientWithCommands::ensureIndex( const string &ns , BSONObj keys , bool unique, const string & name , bool cache ) { BSONObjBuilder toSave; toSave.append( "ns" , ns ); toSave.append( "key" , keys ); @@ -767,13 +832,15 @@ namespace mongo { toSave.append( "name" , nn ); cacheKey += nn; } - + if ( unique ) toSave.appendBool( "unique", unique ); if ( _seenIndexes.count( cacheKey ) ) return 0; - _seenIndexes.insert( cacheKey ); + + if ( cache ) + 
_seenIndexes.insert( cacheKey ); insert( Namespace( ns.c_str() ).getSisterNS( "system.indexes" ).c_str() , toSave.obj() ); return 1; @@ -808,9 +875,10 @@ namespace mongo { void DBClientConnection::say( Message &toSend ) { checkConnection(); - try { + try { port().say( toSend ); - } catch( SocketException & ) { + } + catch( SocketException & ) { failed = true; throw; } @@ -820,24 +888,25 @@ namespace mongo { port().piggyBack( toSend ); } - void DBClientConnection::recv( Message &m ) { + void DBClientConnection::recv( Message &m ) { port().recv(m); } - bool DBClientConnection::call( Message &toSend, Message &response, bool assertOk ) { - /* todo: this is very ugly messagingport::call returns an error code AND can throw - an exception. we should make it return void and just throw an exception anytime + bool DBClientConnection::call( Message &toSend, Message &response, bool assertOk , string * actualServer ) { + /* todo: this is very ugly messagingport::call returns an error code AND can throw + an exception. we should make it return void and just throw an exception anytime it fails */ - try { + try { if ( !port().call(toSend, response) ) { failed = true; if ( assertOk ) - uassert( 10278 , "dbclient error communicating with server", false); + uasserted( 10278 , str::stream() << "dbclient error communicating with server: " << getServerAddress() ); + return false; } } - catch( SocketException & ) { + catch( SocketException & ) { failed = true; throw; } @@ -858,222 +927,24 @@ namespace mongo { } } - void DBClientConnection::killCursor( long long cursorId ){ + void DBClientConnection::killCursor( long long cursorId ) { BufBuilder b; b.appendNum( (int)0 ); // reserved b.appendNum( (int)1 ); // number b.appendNum( cursorId ); - + Message m; m.setData( dbKillCursors , b.buf() , b.len() ); - sayPiggyBack( m ); + if ( _lazyKillCursor ) + sayPiggyBack( m ); + else + say(m); } - /* --- class dbclientpaired --- */ + AtomicUInt DBClientConnection::_numConnections; + bool DBClientConnection::_lazyKillCursor = true; - string DBClientReplicaSet::toString() { - return getServerAddress(); - } - - DBClientReplicaSet::DBClientReplicaSet( const string& name , const vector<HostAndPort>& servers ) - : _name( name ) , _currentMaster( 0 ), _servers( servers ){ - - for ( unsigned i=0; i<_servers.size(); i++ ) - _conns.push_back( new DBClientConnection( true , this ) ); - } - - DBClientReplicaSet::~DBClientReplicaSet(){ - for ( unsigned i=0; i<_conns.size(); i++ ) - delete _conns[i]; - _conns.clear(); - } - - string DBClientReplicaSet::getServerAddress() const { - StringBuilder ss; - if ( _name.size() ) - ss << _name << "/"; - - for ( unsigned i=0; i<_servers.size(); i++ ){ - if ( i > 0 ) - ss << ","; - ss << _servers[i].toString(); - } - return ss.str(); - } - - /* find which server, the left or right, is currently master mode */ - void DBClientReplicaSet::_checkMaster() { - - bool triedQuickCheck = false; - - log( _logLevel + 1) << "_checkMaster on: " << toString() << endl; - for ( int retry = 0; retry < 2; retry++ ) { - for ( unsigned i=0; i<_conns.size(); i++ ){ - DBClientConnection * c = _conns[i]; - try { - bool im; - BSONObj o; - c->isMaster(im, &o); - - if ( retry ) - log(_logLevel) << "checkmaster: " << c->toString() << ' ' << o << '\n'; - - string maybePrimary; - if ( o["hosts"].type() == Array ){ - if ( o["primary"].type() == String ) - maybePrimary = o["primary"].String(); - - BSONObjIterator hi(o["hosts"].Obj()); - while ( hi.more() ){ - string toCheck = hi.next().String(); - int found = -1; - for ( 
unsigned x=0; x<_servers.size(); x++ ){ - if ( toCheck == _servers[x].toString() ){ - found = x; - break; - } - } - - if ( found == -1 ){ - HostAndPort h( toCheck ); - _servers.push_back( h ); - _conns.push_back( new DBClientConnection( true, this ) ); - string temp; - _conns[ _conns.size() - 1 ]->connect( h , temp ); - log( _logLevel ) << "updated set to: " << toString() << endl; - } - - } - } - - if ( im ) { - _currentMaster = c; - return; - } - - if ( maybePrimary.size() && ! triedQuickCheck ){ - for ( unsigned x=0; x<_servers.size(); x++ ){ - if ( _servers[i].toString() != maybePrimary ) - continue; - triedQuickCheck = true; - _conns[x]->isMaster( im , &o ); - if ( im ){ - _currentMaster = _conns[x]; - return; - } - } - } - } - catch ( std::exception& e ) { - if ( retry ) - log(_logLevel) << "checkmaster: caught exception " << c->toString() << ' ' << e.what() << endl; - } - } - sleepsecs(1); - } - - uassert( 10009 , "checkmaster: no master found", false); - } - - DBClientConnection * DBClientReplicaSet::checkMaster() { - if ( _currentMaster ){ - // a master is selected. let's just make sure connection didn't die - if ( ! _currentMaster->isFailed() ) - return _currentMaster; - _currentMaster = 0; - } - - _checkMaster(); - assert( _currentMaster ); - return _currentMaster; - } - - DBClientConnection& DBClientReplicaSet::masterConn(){ - return *checkMaster(); - } - - DBClientConnection& DBClientReplicaSet::slaveConn(){ - DBClientConnection * m = checkMaster(); - assert( ! m->isFailed() ); - - DBClientConnection * failedSlave = 0; - - for ( unsigned i=0; i<_conns.size(); i++ ){ - if ( m == _conns[i] ) - continue; - failedSlave = _conns[i]; - if ( _conns[i]->isFailed() ) - continue; - return *_conns[i]; - } - - assert(failedSlave); - return *failedSlave; - } - - bool DBClientReplicaSet::connect(){ - string errmsg; - - bool anyGood = false; - for ( unsigned i=0; i<_conns.size(); i++ ){ - if ( _conns[i]->connect( _servers[i] , errmsg ) ) - anyGood = true; - } - - if ( ! anyGood ) - return false; - - try { - checkMaster(); - } - catch (AssertionException&) { - return false; - } - return true; - } - - bool DBClientReplicaSet::auth(const string &dbname, const string &username, const string &pwd, string& errmsg, bool digestPassword ) { - DBClientConnection * m = checkMaster(); - if( !m->auth(dbname, username, pwd, errmsg, digestPassword ) ) - return false; - - /* we try to authentiate with the other half of the pair -- even if down, that way the authInfo is cached. 
*/ - for ( unsigned i=0; i<_conns.size(); i++ ){ - if ( _conns[i] == m ) - continue; - try { - string e; - _conns[i]->auth( dbname , username , pwd , e , digestPassword ); - } - catch ( AssertionException& ){ - } - } - - return true; - } - - auto_ptr<DBClientCursor> DBClientReplicaSet::query(const string &a, Query b, int c, int d, - const BSONObj *e, int f, int g){ - // TODO: if slave ok is set go to a slave - return checkMaster()->query(a,b,c,d,e,f,g); - } - - BSONObj DBClientReplicaSet::findOne(const string &a, const Query& b, const BSONObj *c, int d) { - return checkMaster()->findOne(a,b,c,d); - } - - bool DBClientReplicaSet::isMember( const DBConnector * conn ) const { - if ( conn == this ) - return true; - - for ( unsigned i=0; i<_conns.size(); i++ ) - if ( _conns[i]->isMember( conn ) ) - return true; - - return false; - } - bool serverAlive( const string &uri ) { DBClientConnection c( false, 0, 20 ); // potentially the connection to server could fail while we're checking if it's alive - so use timeouts @@ -1084,5 +955,5 @@ namespace mongo { return false; return true; } - + } // namespace mongo diff --git a/client/dbclient.h b/client/dbclient.h index 9448055..9cb6571 100644 --- a/client/dbclient.h +++ b/client/dbclient.h @@ -40,7 +40,7 @@ namespace mongo { /** allow query of replica slave. normally these return an error except for namespace "local". */ QueryOption_SlaveOk = 1 << 2, - + // findingStart mode is used to find the first operation of interest when // we are scanning through a repl log. For efficiency in the common case, // where the first operation of interest is closer to the tail than the head, @@ -52,25 +52,31 @@ namespace mongo { QueryOption_OplogReplay = 1 << 3, /** The server normally times out idle cursors after an inactivy period to prevent excess memory uses - Set this option to prevent that. + Set this option to prevent that. */ QueryOption_NoCursorTimeout = 1 << 4, - /** Use with QueryOption_CursorTailable. If we are at the end of the data, block for a while rather + /** Use with QueryOption_CursorTailable. If we are at the end of the data, block for a while rather than returning no data. After a timeout period, we do return as normal. */ QueryOption_AwaitData = 1 << 5, - /** Stream the data down full blast in multiple "more" packages, on the assumption that the client - will fully read all data queried. Faster when you are pulling a lot of data and know you want to + /** Stream the data down full blast in multiple "more" packages, on the assumption that the client + will fully read all data queried. Faster when you are pulling a lot of data and know you want to pull it all down. Note: it is not allowed to not read all the data unless you close the connection. - Use the query( boost::function<void(const BSONObj&)> f, ... ) version of the connection's query() + Use the query( boost::function<void(const BSONObj&)> f, ... ) version of the connection's query() method, and it will take care of all the details for you. 
*/ QueryOption_Exhaust = 1 << 6, - - QueryOption_AllSupported = QueryOption_CursorTailable | QueryOption_SlaveOk | QueryOption_OplogReplay | QueryOption_NoCursorTimeout | QueryOption_AwaitData | QueryOption_Exhaust + + /** When sharded, this means its ok to return partial results + Usually we will fail a query if all required shards aren't up + If this is set, it'll be a partial result set + */ + QueryOption_PartialResults = 1 << 7 , + + QueryOption_AllSupported = QueryOption_CursorTailable | QueryOption_SlaveOk | QueryOption_OplogReplay | QueryOption_NoCursorTimeout | QueryOption_AwaitData | QueryOption_Exhaust | QueryOption_PartialResults }; @@ -78,7 +84,7 @@ namespace mongo { /** Upsert - that is, insert the item if no matching item is found. */ UpdateOption_Upsert = 1 << 0, - /** Update multiple documents (if multiple documents match query expression). + /** Update multiple documents (if multiple documents match query expression). (Default is update a single document and stop.) */ UpdateOption_Multi = 1 << 1, @@ -96,28 +102,40 @@ namespace mongo { class DBClientBase; + /** + * ConnectionString handles parsing different ways to connect to mongo and determining method + * samples: + * server + * server:port + * foo/server:port,server:port SET + * server,server,server SYNC + * + * tyipcal use + * string errmsg, + * ConnectionString cs = ConnectionString::parse( url , errmsg ); + * if ( ! cs.isValid() ) throw "bad: " + errmsg; + * DBClientBase * conn = cs.connect( errmsg ); + */ class ConnectionString { public: enum ConnectionType { INVALID , MASTER , PAIR , SET , SYNC }; - - ConnectionString( const HostAndPort& server ){ + + ConnectionString() { + _type = INVALID; + } + + ConnectionString( const HostAndPort& server ) { _type = MASTER; _servers.push_back( server ); _finishInit(); } - // TODO Delete if nobody is using - //ConnectionString( ConnectionType type , const vector<HostAndPort>& servers ) - // : _type( type ) , _servers( servers ){ - // _finishInit(); - //} - - ConnectionString( ConnectionType type , const string& s , const string& setName = "" ){ + ConnectionString( ConnectionType type , const string& s , const string& setName = "" ) { _type = type; _setName = setName; _fillServers( s ); - - switch ( _type ){ + + switch ( _type ) { case MASTER: assert( _servers.size() == 1 ); break; @@ -131,73 +149,54 @@ namespace mongo { default: assert( _servers.size() > 0 ); } - + _finishInit(); } - ConnectionString( const string& s , ConnectionType favoredMultipleType ){ + ConnectionString( const string& s , ConnectionType favoredMultipleType ) { + _type = INVALID; + _fillServers( s ); - if ( _servers.size() == 1 ){ + if ( _type != INVALID ) { + // set already + } + else if ( _servers.size() == 1 ) { _type = MASTER; } else { _type = favoredMultipleType; - assert( _type != MASTER ); + assert( _type == SET || _type == SYNC ); } _finishInit(); } bool isValid() const { return _type != INVALID; } - - string toString() const { - return _string; - } + + string toString() const { return _string; } DBClientBase* connect( string& errmsg ) const; - static ConnectionString parse( const string& url , string& errmsg ); - - string getSetName() const{ - return _setName; - } + string getSetName() const { return _setName; } - vector<HostAndPort> getServers() const { - return _servers; - } + vector<HostAndPort> getServers() const { return _servers; } + ConnectionType type() const { return _type; } + + static ConnectionString parse( const string& url , string& errmsg ); + + static string typeToString( 
ConnectionType type ); + private: - ConnectionString(){ - _type = INVALID; - } - - void _fillServers( string s ){ - string::size_type idx; - while ( ( idx = s.find( ',' ) ) != string::npos ){ - _servers.push_back( s.substr( 0 , idx ) ); - s = s.substr( idx + 1 ); - } - _servers.push_back( s ); - } - - void _finishInit(){ - stringstream ss; - if ( _type == SET ) - ss << _setName << "/"; - for ( unsigned i=0; i<_servers.size(); i++ ){ - if ( i > 0 ) - ss << ","; - ss << _servers[i].toString(); - } - _string = ss.str(); - } + void _fillServers( string s ); + void _finishInit(); ConnectionType _type; vector<HostAndPort> _servers; string _string; string _setName; }; - + /** * controls how much a clients cares about writes * default is NORMAL @@ -213,7 +212,7 @@ namespace mongo { class DBClientCursor; class DBClientCursorBatchIterator; - /** Represents a Mongo query expression. Typically one uses the QUERY(...) macro to construct a Query object. + /** Represents a Mongo query expression. Typically one uses the QUERY(...) macro to construct a Query object. Examples: QUERY( "age" << 33 << "school" << "UCLA" ).sort("name") QUERY( "age" << GT << 30 << LT << 50 ) @@ -223,22 +222,22 @@ namespace mongo { BSONObj obj; Query() : obj(BSONObj()) { } Query(const BSONObj& b) : obj(b) { } - Query(const string &json) : + Query(const string &json) : obj(fromjson(json)) { } - Query(const char * json) : + Query(const char * json) : obj(fromjson(json)) { } - /** Add a sort (ORDER BY) criteria to the query expression. + /** Add a sort (ORDER BY) criteria to the query expression. @param sortPattern the sort order template. For example to order by name ascending, time descending: { name : 1, ts : -1 } i.e. BSON( "name" << 1 << "ts" << -1 ) - or + or fromjson(" name : 1, ts : -1 ") */ Query& sort(const BSONObj& sortPattern); - /** Add a sort (ORDER BY) criteria to the query expression. + /** Add a sort (ORDER BY) criteria to the query expression. This version of sort() assumes you want to sort on a single field. @param asc = 1 for ascending order asc = -1 for descending order @@ -267,8 +266,8 @@ namespace mongo { */ Query& explain(); - /** Use snapshot mode for the query. Snapshot mode assures no duplicates are returned, or objects missed, which were - present at both the start and end of the query's execution (if an object is new during the query, or deleted during + /** Use snapshot mode for the query. Snapshot mode assures no duplicates are returned, or objects missed, which were + present at both the start and end of the query's execution (if an object is new during the query, or deleted during the query, it may or may not be returned, even with snapshot mode). Note that short query responses (less than 1MB) are always effectively snapshotted. @@ -277,16 +276,16 @@ namespace mongo { */ Query& snapshot(); - /** Queries to the Mongo database support a $where parameter option which contains - a javascript function that is evaluated to see whether objects being queried match - its criteria. Use this helper to append such a function to a query object. + /** Queries to the Mongo database support a $where parameter option which contains + a javascript function that is evaluated to see whether objects being queried match + its criteria. Use this helper to append such a function to a query object. Your query may also contain other traditional Mongo query terms. - @param jscode The javascript function to evaluate against each potential object - match. The function must return true for matched objects. 
Use the this + @param jscode The javascript function to evaluate against each potential object + match. The function must return true for matched objects. Use the this variable to inspect the current object. - @param scope SavedContext for the javascript object. List in a BSON object any - variables you would like defined when the jscode executes. One can think + @param scope SavedContext for the javascript object. List in a BSON object any + variables you would like defined when the jscode executes. One can think of these as "bind variables". Examples: @@ -300,12 +299,12 @@ namespace mongo { * if this query has an orderby, hint, or some other field */ bool isComplex( bool * hasDollar = 0 ) const; - + BSONObj getFilter() const; BSONObj getSort() const; BSONObj getHint() const; bool isExplain() const; - + string toString() const; operator string() const { return toString(); } private: @@ -316,13 +315,13 @@ namespace mongo { BSONObjBuilder b; b.appendElements(obj); b.append(fieldName, val); - obj = b.obj(); + obj = b.obj(); } }; - -/** Typically one uses the QUERY(...) macro to construct a Query object. - Example: QUERY( "age" << 33 << "school" << "UCLA" ) -*/ + + /** Typically one uses the QUERY(...) macro to construct a Query object. + Example: QUERY( "age" << 33 << "school" << "UCLA" ) + */ #define QUERY(x) mongo::Query( BSON(x) ) /** @@ -331,15 +330,14 @@ namespace mongo { class DBConnector { public: virtual ~DBConnector() {} - virtual bool call( Message &toSend, Message &response, bool assertOk=true ) = 0; + /** actualServer is set to the actual server where they call went if there was a choice (SlaveOk) */ + virtual bool call( Message &toSend, Message &response, bool assertOk=true , string * actualServer = 0 ) = 0; virtual void say( Message &toSend ) = 0; virtual void sayPiggyBack( Message &toSend ) = 0; virtual void checkResponse( const char* data, int nReturned ) {} /* used by QueryOption_Exhaust. To use that your subclass must implement this. */ virtual void recv( Message& m ) { assert(false); } - - virtual string getServerAddress() const = 0; }; /** @@ -352,9 +350,9 @@ namespace mongo { /** don't use this - called automatically by DBClientCursor for you */ virtual auto_ptr<DBClientCursor> getMore( const string &ns, long long cursorId, int nToReturn = 0, int options = 0 ) = 0; - + virtual void insert( const string &ns, BSONObj obj ) = 0; - + virtual void insert( const string &ns, const vector< BSONObj >& v ) = 0; virtual void remove( const string &ns , Query query, bool justOne = 0 ) = 0; @@ -369,6 +367,7 @@ namespace mongo { */ virtual BSONObj findOne(const string &ns, const Query& query, const BSONObj *fieldsToReturn = 0, int queryOptions = 0); + virtual string getServerAddress() const = 0; }; @@ -397,18 +396,18 @@ namespace mongo { directly call runCommand. @param dbname database name. Use "admin" for global administrative commands. - @param cmd the command object to execute. For example, { ismaster : 1 } - @param info the result object the database returns. Typically has { ok : ..., errmsg : ... } fields - set. + @param cmd the command object to execute. For example, { ismaster : 1 } + @param info the result object the database returns. Typically has { ok : ..., errmsg : ... } fields + set. @param options see enum QueryOptions - normally not needed to run a command @return true if the command returned "ok". */ virtual bool runCommand(const string &dbname, const BSONObj& cmd, BSONObj &info, int options=0); /** Authorize access to a particular database. 
- Authentication is separate for each database on the server -- you may authenticate for any + Authentication is separate for each database on the server -- you may authenticate for any number of databases on a single connection. - The "admin" database is special and once authenticated provides access to all databases on the + The "admin" database is special and once authenticated provides access to all databases on the server. @param digestPassword if password is plain text, set this to true. otherwise assumed to be pre-digested @return true if successful @@ -418,7 +417,7 @@ namespace mongo { /** count number of objects in collection ns that match the query criteria specified throws UserAssertion if database returns an error */ - unsigned long long count(const string &ns, const BSONObj& query = BSONObj(), int options=0 ); + virtual unsigned long long count(const string &ns, const BSONObj& query = BSONObj(), int options=0, int limit=0, int skip=0 ); string createPasswordDigest( const string &username , const string &clearTextPassword ); @@ -450,14 +449,14 @@ namespace mongo { */ bool createCollection(const string &ns, long long size = 0, bool capped = false, int max = 0, BSONObj *info = 0); - /** Get error result from the last operation on this connection. + /** Get error result from the last operation on this connection. @return error message text, or empty string if no error. */ string getLastError(); - /** Get error result from the last operation on this connection. - @return full error object. - */ - virtual BSONObj getLastErrorDetailed(); + /** Get error result from the last operation on this connection. + @return full error object. + */ + virtual BSONObj getLastErrorDetailed(); static string getLastErrorString( const BSONObj& res ); @@ -466,23 +465,23 @@ namespace mongo { @return { err : <error message>, nPrev : <how_many_ops_back_occurred>, ok : 1 } result.err will be null if no error has occurred. - */ + */ BSONObj getPrevError(); - /** Reset the previous error state for this connection (accessed via getLastError and - getPrevError). Useful when performing several operations at once and then checking + /** Reset the previous error state for this connection (accessed via getLastError and + getPrevError). Useful when performing several operations at once and then checking for an error after attempting all operations. */ bool resetError() { return simpleCommand("admin", 0, "reseterror"); } - /** Delete the specified collection. */ - virtual bool dropCollection( const string &ns ){ + /** Delete the specified collection. */ + virtual bool dropCollection( const string &ns ) { string db = nsGetDB( ns ); string coll = nsGetCollection( ns ); uassert( 10011 , "no collection name", coll.size() ); BSONObj info; - + bool res = runCommand( db.c_str() , BSON( "drop" << coll ) , info ); resetIndexCache(); return res; @@ -494,7 +493,7 @@ namespace mongo { bool repairDatabase(const string &dbname, BSONObj *info = 0) { return simpleCommand(dbname, info, "repairDatabase"); } - + /** Copy database from one server or name to another server or name. Generally, you should dropDatabase() first as otherwise the copied information will MERGE @@ -524,23 +523,23 @@ namespace mongo { ProfileOff = 0, ProfileSlow = 1, // log very slow (>100ms) operations ProfileAll = 2 - + }; bool setDbProfilingLevel(const string &dbname, ProfilingLevel level, BSONObj *info = 0); bool getDbProfilingLevel(const string &dbname, ProfilingLevel& level, BSONObj *info = 0); - /** Run a map/reduce job on the server. 
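count() now takes limit and skip arguments in addition to options; a brief sketch (namespace and filter are placeholders, `conn` is an established connection):

    // total matching documents
    unsigned long long total = conn.count( "test.people" , BSON( "age" << GT << 21 ) );

    // same filter, but count at most 100 matches after skipping the first 50
    unsigned long long page  = conn.count( "test.people" , BSON( "age" << GT << 21 ) ,
                                           0 /*options*/ , 100 /*limit*/ , 50 /*skip*/ );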
+ /** Run a map/reduce job on the server. See http://www.mongodb.org/display/DOCS/MapReduce ns namespace (db+collection name) of input data - jsmapf javascript map function code - jsreducef javascript reduce function code. + jsmapf javascript map function code + jsreducef javascript reduce function code. query optional query filter for the input - output optional permanent output collection name. if not specified server will + output optional permanent output collection name. if not specified server will generate a temporary collection and return its name. - returns a result object which contains: + returns a result object which contains: { result : <collection_name>, numObjects : <number_of_objects_scanned>, timeMillis : <job_time>, @@ -548,8 +547,8 @@ namespace mongo { [, err : <errmsg_if_error>] } - For example one might call: - result.getField("ok").trueValue() + For example one might call: + result.getField("ok").trueValue() on the result to check if ok. */ BSONObj mapreduce(const string &ns, const string &jsmapf, const string &jsreducef, BSONObj query = BSONObj(), const string& output = ""); @@ -560,7 +559,7 @@ namespace mongo { jscode source code for a javascript function. info the command object which contains any information on the invocation result including the return value and other information. If an error occurs running the jscode, error - information will be in info. (try "out() << info.toString()") + information will be in info. (try "out() << info.toString()") retValue return value from the jscode function. args args to pass to the jscode function. when invoked, the 'args' variable will be defined for use by the jscode. @@ -571,10 +570,10 @@ namespace mongo { */ bool eval(const string &dbname, const string &jscode, BSONObj& info, BSONElement& retValue, BSONObj *args = 0); - /** - + /** validate a collection, checking for errors and reporting back statistics. + this operation is slow and blocking. */ - bool validate( const string &ns , bool scandata=true ){ + bool validate( const string &ns , bool scandata=true ) { BSONObj cmd = BSON( "validate" << nsGetCollection( ns ) << "scandata" << scandata ); BSONObj info; return runCommand( nsGetDB( ns ).c_str() , cmd , info ); @@ -607,7 +606,7 @@ namespace mongo { ret = (NumType) retValue.number(); return true; } - + /** get a list of all the current databases uses the { listDatabases : 1 } command. @@ -623,16 +622,18 @@ namespace mongo { bool exists( const string& ns ); /** Create an index if it does not already exist. - ensureIndex calls are remembered so it is safe/fast to call this function many + ensureIndex calls are remembered so it is safe/fast to call this function many times in your code. @param ns collection to be indexed @param keys the "key pattern" for the index. e.g., { name : 1 } @param unique if true, indicates that key uniqueness should be enforced for this index @param name if not isn't specified, it will be created from the keys (recommended) + @param cache if set to false, the index cache for the connection won't remember this call @return whether or not sent message to db. 
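The mapreduce() wrapper above returns the command's result document; a sketch that checks it exactly as the comment suggests (the map/reduce bodies and collection names are illustrative only, `conn` is an established connection):

    BSONObj res = conn.mapreduce(
        "test.events" ,
        "function() { emit( this.user , 1 ); }" ,                         // jsmapf
        "function( k , vals ) { var n = 0; "
        "  vals.forEach( function(v){ n += v; } ); return n; }" ,         // jsreducef
        BSON( "type" << "click" ) );                                      // optional query filter

    // check success the way the comment above recommends
    if ( res.getField( "ok" ).trueValue() )
        cout << "results in collection: " << res["result"].str() << endl;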
should be true on first call, false on subsequent unless resetIndexCache was called */ - virtual bool ensureIndex( const string &ns , BSONObj keys , bool unique = false, const string &name = "" ); + virtual bool ensureIndex( const string &ns , BSONObj keys , bool unique = false, const string &name = "", + bool cache = true ); /** clears the index cache, so the subsequent call to ensureIndex for any index will go to the server @@ -640,17 +641,17 @@ namespace mongo { virtual void resetIndexCache(); virtual auto_ptr<DBClientCursor> getIndexes( const string &ns ); - + virtual void dropIndex( const string& ns , BSONObj keys ); virtual void dropIndex( const string& ns , const string& indexName ); - + /** drops all indexes for the collection */ virtual void dropIndexes( const string& ns ); virtual void reIndex( const string& ns ); - + string genIndexName( const BSONObj& keys ); /** Erase / drop an entire database */ @@ -663,33 +664,35 @@ namespace mongo { virtual string toString() = 0; /** @return the database name portion of an ns string */ - string nsGetDB( const string &ns ){ + string nsGetDB( const string &ns ) { string::size_type pos = ns.find( "." ); if ( pos == string::npos ) return ns; - + return ns.substr( 0 , pos ); } - + /** @return the collection name portion of an ns string */ - string nsGetCollection( const string &ns ){ + string nsGetCollection( const string &ns ) { string::size_type pos = ns.find( "." ); if ( pos == string::npos ) return ""; - return ns.substr( pos + 1 ); + return ns.substr( pos + 1 ); } protected: bool isOk(const BSONObj&); - + + BSONObj _countCmd(const string &ns, const BSONObj& query, int options, int limit, int skip ); + enum QueryOptions availableOptions(); - + private: enum QueryOptions _cachedAvailableOptions; bool _haveCachedAvailableOptions; }; - + /** abstract class that implements the core db operations */ @@ -698,20 +701,20 @@ namespace mongo { WriteConcern _writeConcern; public: - DBClientBase(){ + DBClientBase() { _writeConcern = W_NORMAL; } - + WriteConcern getWriteConcern() const { return _writeConcern; } - void setWriteConcern( WriteConcern w ){ _writeConcern = w; } - + void setWriteConcern( WriteConcern w ) { _writeConcern = w; } + /** send a query to the database. @param ns namespace to query, format is <dbname>.<collectname>[.<collectname>]* @param query query to perform on the collection. this is a BSONObj (binary JSON) You may format as { query: { ... }, orderby: { ... } } to specify a sort order. - @param nToReturn n to return. 0 = unlimited + @param nToReturn n to return (i.e., limit). 0 = unlimited @param nToSkip start with the nth item @param fieldsToReturn optional template of which fields to select. 
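ensureIndex() gains a cache parameter in this version; a sketch of how the per-connection index cache behaves (namespace is a placeholder, `conn` is an established connection):

    // first call sends the index spec to the server and is remembered by the connection
    conn.ensureIndex( "test.people" , BSON( "name" << 1 ) );

    // an identical second call is suppressed by the per-connection index cache
    conn.ensureIndex( "test.people" , BSON( "name" << 1 ) );

    // pass cache = false so this particular call is not remembered
    conn.ensureIndex( "test.people" , BSON( "name" << 1 ) ,
                      /*unique=*/false , /*name=*/"" , /*cache=*/false );

    // or clear the cache so the next ensureIndex goes to the server again
    conn.resetIndexCache();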
if unspecified, returns all fields @param queryOptions see options enum at top of this file @@ -744,23 +747,15 @@ namespace mongo { @param justOne if this true, then once a single match is found will stop */ virtual void remove( const string &ns , Query q , bool justOne = 0 ); - + /** updates objects matching query */ virtual void update( const string &ns , Query query , BSONObj obj , bool upsert = false , bool multi = false ); - + virtual bool isFailed() const = 0; - - virtual void killCursor( long long cursorID ) = 0; - static int countCommas( const string& s ){ - int n = 0; - for ( unsigned i=0; i<s.size(); i++ ) - if ( s[i] == ',' ) - n++; - return n; - } + virtual void killCursor( long long cursorID ) = 0; virtual bool callRead( Message& toSend , Message& response ) = 0; // virtual bool callWrite( Message& toSend , Message& response ) = 0; // TODO: add this if needed @@ -768,47 +763,35 @@ namespace mongo { virtual ConnectionString::ConnectionType type() const = 0; - /** @return true if conn is either equal to or contained in this connection */ - virtual bool isMember( const DBConnector * conn ) const = 0; }; // DBClientBase - + class DBClientReplicaSet; - - class ConnectException : public UserException { + + class ConnectException : public UserException { public: ConnectException(string msg) : UserException(9000,msg) { } }; - /** - A basic connection to the database. + /** + A basic connection to the database. This is the main entry point for talking to a simple Mongo setup */ class DBClientConnection : public DBClientBase { - DBClientReplicaSet *clientSet; - boost::scoped_ptr<MessagingPort> p; - boost::scoped_ptr<SockAddr> server; - bool failed; // true if some sort of fatal error has ever happened - bool autoReconnect; - time_t lastReconnectTry; - HostAndPort _server; // remember for reconnects - string _serverString; - int _port; - void _checkConnection(); - void checkConnection() { if( failed ) _checkConnection(); } - map< string, pair<string,string> > authCache; - double _timeout; - - bool _connect( string& errmsg ); public: - /** @param _autoReconnect if true, automatically reconnect on a connection failure @param cp used by DBClientReplicaSet. You do not need to specify this parameter - @param timeout tcp timeout in seconds - this is for read/write, not connect. + @param timeout tcp timeout in seconds - this is for read/write, not connect. Connect timeout is fixed, but short, at 5 seconds. */ - DBClientConnection(bool _autoReconnect=false, DBClientReplicaSet* cp=0, double timeout=0) : - clientSet(cp), failed(false), autoReconnect(_autoReconnect), lastReconnectTry(0), _timeout(timeout) { } + DBClientConnection(bool _autoReconnect=false, DBClientReplicaSet* cp=0, double so_timeout=0) : + clientSet(cp), failed(false), autoReconnect(_autoReconnect), lastReconnectTry(0), _so_timeout(so_timeout) { + _numConnections++; + } + + virtual ~DBClientConnection() { + _numConnections--; + } /** Connect to a Mongo database server. @@ -821,14 +804,14 @@ namespace mongo { @deprecated please use HostAndPort @return false if fails to connect. */ - virtual bool connect(const char * hostname, string& errmsg){ + virtual bool connect(const char * hostname, string& errmsg) { // TODO: remove this method HostAndPort t( hostname ); return connect( t , errmsg ); } /** Connect to a Mongo database server. - + If autoReconnect is true, you can try to use the DBClientConnection even when false was returned -- it will try to connect again. 
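A connection sketch using the DBClientConnection constructor shown above, with auto-reconnect enabled and a 5 second socket timeout (the host address and collection are placeholders):

    DBClientConnection c( /*autoReconnect=*/true , /*cp=*/0 , /*so_timeout=*/5.0 );

    string errmsg;
    if ( ! c.connect( "localhost:27017" , errmsg ) ) {
        cout << "couldn't connect: " << errmsg << endl;
    }
    else {
        // with autoReconnect on, later operations retry the connection after a transient failure
        c.insert( "test.people" , BSON( "name" << "Joe" << "age" << 33 ) );
    }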
@@ -846,9 +829,9 @@ namespace mongo { @param serverHostname host to connect to. can include port number ( 127.0.0.1 , 127.0.0.1:5555 ) */ - void connect(const string& serverHostname) { + void connect(const string& serverHostname) { string errmsg; - if( !connect(HostAndPort(serverHostname), errmsg) ) + if( !connect(HostAndPort(serverHostname), errmsg) ) throw ConnectException(string("can't connect ") + errmsg); } @@ -860,23 +843,22 @@ namespace mongo { return DBClientBase::query( ns, query, nToReturn, nToSkip, fieldsToReturn, queryOptions , batchSize ); } - /** uses QueryOption_Exhaust - use DBClientCursorBatchIterator if you want to do items in large blocks, perhpas to avoid granular locking and such. + /** Uses QueryOption_Exhaust + Exhaust mode sends back all data queries as fast as possible, with no back-and-for for OP_GETMORE. If you are certain + you will exhaust the query, it could be useful. + + Use DBClientCursorBatchIterator version if you want to do items in large blocks, perhaps to avoid granular locking and such. */ unsigned long long query( boost::function<void(const BSONObj&)> f, const string& ns, Query query, const BSONObj *fieldsToReturn = 0, int queryOptions = 0); unsigned long long query( boost::function<void(DBClientCursorBatchIterator&)> f, const string& ns, Query query, const BSONObj *fieldsToReturn = 0, int queryOptions = 0); /** - @return true if this connection is currently in a failed state. When autoreconnect is on, + @return true if this connection is currently in a failed state. When autoreconnect is on, a connection will transition back to an ok state after reconnecting. */ - bool isFailed() const { - return failed; - } + bool isFailed() const { return failed; } - MessagingPort& port() { - return *p; - } + MessagingPort& port() { return *p; } string toStringLong() const { stringstream ss; @@ -886,143 +868,59 @@ namespace mongo { } /** Returns the address of the server */ - string toString() { - return _serverString; - } - - string getServerAddress() const { - return _serverString; - } - - virtual void killCursor( long long cursorID ); + string toString() { return _serverString; } - virtual bool callRead( Message& toSend , Message& response ){ - return call( toSend , response ); - } + string getServerAddress() const { return _serverString; } + virtual void killCursor( long long cursorID ); + virtual bool callRead( Message& toSend , Message& response ) { return call( toSend , response ); } virtual void say( Message &toSend ); - virtual bool call( Message &toSend, Message &response, bool assertOk = true ); - - virtual ConnectionString::ConnectionType type() const { return ConnectionString::MASTER; } - - virtual bool isMember( const DBConnector * conn ) const { return this == conn; }; - + virtual bool call( Message &toSend, Message &response, bool assertOk = true , string * actualServer = 0 ); + virtual ConnectionString::ConnectionType type() const { return ConnectionString::MASTER; } virtual void checkResponse( const char *data, int nReturned ); + void setSoTimeout(double to) { _so_timeout = to; } + + static int getNumConnections() { + return _numConnections; + } + + static void setLazyKillCursor( bool lazy ) { _lazyKillCursor = lazy; } + static bool getLazyKillCursor() { return _lazyKillCursor; } protected: friend class SyncClusterConnection; virtual void recv( Message& m ); virtual void sayPiggyBack( Message &toSend ); - }; - - /** Use this class to connect to a replica set of servers. 
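The callback overloads of query() above run in exhaust mode, so the server streams every batch without waiting for getMore round trips; only use them when you will consume the whole result. A minimal sketch with a free function as the visitor (collection and filter are placeholders, `c` is a connected DBClientConnection):

    void printDoc( const BSONObj& o ) {
        cout << o.toString() << endl;
    }

    // construct the boost::function explicitly to select the per-document overload
    boost::function<void(const BSONObj&)> visitor = printDoc;
    unsigned long long n = c.query( visitor , "test.people" , QUERY( "age" << GT << 21 ) );
    cout << "streamed " << n << " documents" << endl;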
The class will manage - checking for which server in a replica set is master, and do failover automatically. - - This can also be used to connect to replica pairs since pairs are a subset of sets - - On a failover situation, expect at least one operation to return an error (throw - an exception) before the failover is complete. Operations are not retried. - */ - class DBClientReplicaSet : public DBClientBase { - string _name; - DBClientConnection * _currentMaster; - vector<HostAndPort> _servers; - vector<DBClientConnection*> _conns; - - - void _checkMaster(); - DBClientConnection * checkMaster(); - - public: - /** Call connect() after constructing. autoReconnect is always on for DBClientReplicaSet connections. */ - DBClientReplicaSet( const string& name , const vector<HostAndPort>& servers ); - virtual ~DBClientReplicaSet(); - - /** Returns false if nomember of the set were reachable, or neither is - master, although, - when false returned, you can still try to use this connection object, it will - try reconnects. - */ - bool connect(); - - /** Authorize. Authorizes all nodes as needed - */ - virtual bool auth(const string &dbname, const string &username, const string &pwd, string& errmsg, bool digestPassword = true ); - - /** throws userassertion "no master found" */ - virtual - auto_ptr<DBClientCursor> query(const string &ns, Query query, int nToReturn = 0, int nToSkip = 0, - const BSONObj *fieldsToReturn = 0, int queryOptions = 0 , int batchSize = 0 ); - - /** throws userassertion "no master found" */ - virtual - BSONObj findOne(const string &ns, const Query& query, const BSONObj *fieldsToReturn = 0, int queryOptions = 0); - - /** insert */ - virtual void insert( const string &ns , BSONObj obj ) { - checkMaster()->insert(ns, obj); - } - - /** insert multiple objects. Note that single object insert is asynchronous, so this version - is only nominally faster and not worth a special effort to try to use. */ - virtual void insert( const string &ns, const vector< BSONObj >& v ) { - checkMaster()->insert(ns, v); - } - - /** remove */ - virtual void remove( const string &ns , Query obj , bool justOne = 0 ) { - checkMaster()->remove(ns, obj, justOne); - } - - /** update */ - virtual void update( const string &ns , Query query , BSONObj obj , bool upsert = 0 , bool multi = 0 ) { - return checkMaster()->update(ns, query, obj, upsert,multi); - } - - virtual void killCursor( long long cursorID ){ - checkMaster()->killCursor( cursorID ); - } - - string toString(); - - /* this is the callback from our underlying connections to notify us that we got a "not master" error. 
- */ - void isntMaster() { - _currentMaster = 0; - } - - string getServerAddress() const; - - DBClientConnection& masterConn(); - DBClientConnection& slaveConn(); - - - virtual bool call( Message &toSend, Message &response, bool assertOk=true ) { return checkMaster()->call( toSend , response , assertOk ); } - virtual void say( Message &toSend ) { checkMaster()->say( toSend ); } - virtual bool callRead( Message& toSend , Message& response ){ return checkMaster()->callRead( toSend , response ); } - - virtual ConnectionString::ConnectionType type() const { return ConnectionString::SET; } + DBClientReplicaSet *clientSet; + boost::scoped_ptr<MessagingPort> p; + boost::scoped_ptr<SockAddr> server; + bool failed; + const bool autoReconnect; + time_t lastReconnectTry; + HostAndPort _server; // remember for reconnects + string _serverString; + void _checkConnection(); - virtual bool isMember( const DBConnector * conn ) const; + // throws SocketException if in failed state and not reconnecting or if waiting to reconnect + void checkConnection() { if( failed ) _checkConnection(); } - virtual void checkResponse( const char *data, int nReturned ) { checkMaster()->checkResponse( data , nReturned ); } + map< string, pair<string,string> > authCache; + double _so_timeout; + bool _connect( string& errmsg ); - protected: - virtual void sayPiggyBack( Message &toSend ) { checkMaster()->say( toSend ); } - - bool isFailed() const { - return _currentMaster == 0 || _currentMaster->isFailed(); - } + static AtomicUInt _numConnections; + static bool _lazyKillCursor; // lazy means we piggy back kill cursors on next op }; - + /** pings server to check if it's up */ bool serverAlive( const string &uri ); DBClientBase * createDirectClient(); - + } // namespace mongo #include "dbclientcursor.h" +#include "dbclient_rs.h" #include "undef_macros.h" diff --git a/client/dbclient_rs.cpp b/client/dbclient_rs.cpp new file mode 100644 index 0000000..fd8ecec --- /dev/null +++ b/client/dbclient_rs.cpp @@ -0,0 +1,594 @@ +// dbclient.cpp - connect to a Mongo database as a database, from C++ + +/* Copyright 2009 10gen Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "pch.h" +#include "dbclient.h" +#include "../bson/util/builder.h" +#include "../db/jsobj.h" +#include "../db/json.h" +#include "../db/dbmessage.h" +#include "connpool.h" +#include "dbclient_rs.h" +#include "../util/background.h" + +namespace mongo { + + // -------------------------------- + // ----- ReplicaSetMonitor --------- + // -------------------------------- + + // global background job responsible for checking every X amount of time + class ReplicaSetMonitorWatcher : public BackgroundJob { + public: + ReplicaSetMonitorWatcher() : _safego("ReplicaSetMonitorWatcher::_safego") , _started(false) {} + + virtual string name() const { return "ReplicaSetMonitorWatcher"; } + + void safeGo() { + // check outside of lock for speed + if ( _started ) + return; + + scoped_lock lk( _safego ); + if ( _started ) + return; + _started = true; + + go(); + } + protected: + void run() { + while ( ! inShutdown() ) { + sleepsecs( 20 ); + try { + ReplicaSetMonitor::checkAll(); + } + catch ( std::exception& e ) { + error() << "ReplicaSetMonitorWatcher: check failed: " << e.what() << endl; + } + } + } + + mongo::mutex _safego; + bool _started; + + } replicaSetMonitorWatcher; + + + ReplicaSetMonitor::ReplicaSetMonitor( const string& name , const vector<HostAndPort>& servers ) + : _lock( "ReplicaSetMonitor instance" ) , _checkConnectionLock( "ReplicaSetMonitor check connection lock" ), _name( name ) , _master(-1) { + + uassert( 13642 , "need at least 1 node for a replica set" , servers.size() > 0 ); + + if ( _name.size() == 0 ) { + warning() << "replica set name empty, first node: " << servers[0] << endl; + } + + string errmsg; + + for ( unsigned i=0; i<servers.size(); i++ ) { + auto_ptr<DBClientConnection> conn( new DBClientConnection( true , 0, 5.0 ) ); + if (!conn->connect( servers[i] , errmsg ) ) { + log(1) << "error connecting to seed " << servers[i] << ": " << errmsg << endl; + // skip seeds that don't work + continue; + } + + _nodes.push_back( Node( servers[i] , conn.release() ) ); + + string maybePrimary; + if (_checkConnection( _nodes[_nodes.size()-1].conn , maybePrimary, false)) { + break; + } + } + } + + ReplicaSetMonitor::~ReplicaSetMonitor() { + for ( unsigned i=0; i<_nodes.size(); i++ ) + delete _nodes[i].conn; + _nodes.clear(); + _master = -1; + } + + ReplicaSetMonitorPtr ReplicaSetMonitor::get( const string& name , const vector<HostAndPort>& servers ) { + scoped_lock lk( _setsLock ); + ReplicaSetMonitorPtr& m = _sets[name]; + if ( ! m ) + m.reset( new ReplicaSetMonitor( name , servers ) ); + + replicaSetMonitorWatcher.safeGo(); + + return m; + } + + void ReplicaSetMonitor::checkAll() { + set<string> seen; + + while ( true ) { + ReplicaSetMonitorPtr m; + { + for ( map<string,ReplicaSetMonitorPtr>::iterator i=_sets.begin(); i!=_sets.end(); ++i ) { + string name = i->first; + if ( seen.count( name ) ) + continue; + LOG(1) << "checking replica set: " << name << endl; + seen.insert( name ); + m = i->second; + break; + } + } + + if ( ! 
m ) + break; + + m->check(); + } + + + } + + void ReplicaSetMonitor::setConfigChangeHook( ConfigChangeHook hook ) { + massert( 13610 , "ConfigChangeHook already specified" , _hook == 0 ); + _hook = hook; + } + + string ReplicaSetMonitor::getServerAddress() const { + StringBuilder ss; + if ( _name.size() ) + ss << _name << "/"; + + { + scoped_lock lk( _lock ); + for ( unsigned i=0; i<_nodes.size(); i++ ) { + if ( i > 0 ) + ss << ","; + ss << _nodes[i].addr.toString(); + } + } + return ss.str(); + } + + bool ReplicaSetMonitor::contains( const string& server ) const { + scoped_lock lk( _lock ); + for ( unsigned i=0; i<_nodes.size(); i++ ) { + if ( _nodes[i].addr == server ) + return true; + } + return false; + } + + + void ReplicaSetMonitor::notifyFailure( const HostAndPort& server ) { + scoped_lock lk( _lock ); + if ( _master >= 0 && _master < (int)_nodes.size() ) { + if ( server == _nodes[_master].addr ) + _master = -1; + } + } + + + + HostAndPort ReplicaSetMonitor::getMaster() { + { + scoped_lock lk( _lock ); + if ( _master >= 0 && _nodes[_master].ok ) + return _nodes[_master].addr; + } + + _check(); + + scoped_lock lk( _lock ); + uassert( 10009 , str::stream() << "ReplicaSetMonitor no master found for set: " << _name , _master >= 0 ); + return _nodes[_master].addr; + } + + HostAndPort ReplicaSetMonitor::getSlave( const HostAndPort& prev ) { + // make sure its valid + if ( prev.port() > 0 ) { + scoped_lock lk( _lock ); + for ( unsigned i=0; i<_nodes.size(); i++ ) { + if ( prev != _nodes[i].addr ) + continue; + + if ( _nodes[i].ok ) + return prev; + break; + } + } + + return getSlave(); + } + + HostAndPort ReplicaSetMonitor::getSlave() { + int x = rand() % _nodes.size(); + { + scoped_lock lk( _lock ); + for ( unsigned i=0; i<_nodes.size(); i++ ) { + int p = ( i + x ) % _nodes.size(); + if ( p == _master ) + continue; + if ( _nodes[p].ok ) + return _nodes[p].addr; + } + } + + return _nodes[0].addr; + } + + /** + * notify the monitor that server has faild + */ + void ReplicaSetMonitor::notifySlaveFailure( const HostAndPort& server ) { + int x = _find( server ); + if ( x >= 0 ) { + scoped_lock lk( _lock ); + _nodes[x].ok = false; + } + } + + void ReplicaSetMonitor::_checkStatus(DBClientConnection *conn) { + BSONObj status; + + if (!conn->runCommand("admin", BSON("replSetGetStatus" << 1), status) || + !status.hasField("members") || + status["members"].type() != Array) { + return; + } + + BSONObjIterator hi(status["members"].Obj()); + while (hi.more()) { + BSONObj member = hi.next().Obj(); + string host = member["name"].String(); + + int m = -1; + if ((m = _find(host)) <= 0) { + continue; + } + + double state = member["state"].Number(); + if (member["health"].Number() == 1 && (state == 1 || state == 2)) { + scoped_lock lk( _lock ); + _nodes[m].ok = true; + } + else { + scoped_lock lk( _lock ); + _nodes[m].ok = false; + } + } + } + + void ReplicaSetMonitor::_checkHosts( const BSONObj& hostList, bool& changed ) { + BSONObjIterator hi(hostList); + while ( hi.more() ) { + string toCheck = hi.next().String(); + + if ( _find( toCheck ) >= 0 ) + continue; + + HostAndPort h( toCheck ); + DBClientConnection * newConn = new DBClientConnection( true, 0, 5.0 ); + string temp; + newConn->connect( h , temp ); + { + scoped_lock lk( _lock ); + _nodes.push_back( Node( h , newConn ) ); + } + log() << "updated set (" << _name << ") to: " << getServerAddress() << endl; + changed = true; + } + } + + + + bool ReplicaSetMonitor::_checkConnection( DBClientConnection * c , string& maybePrimary , bool verbose ) { + 
scoped_lock lk( _checkConnectionLock ); + bool isMaster = false; + bool changed = false; + try { + BSONObj o; + c->isMaster(isMaster, &o); + + log( ! verbose ) << "ReplicaSetMonitor::_checkConnection: " << c->toString() << ' ' << o << '\n'; + + // add other nodes + string maybePrimary; + if ( o["hosts"].type() == Array ) { + if ( o["primary"].type() == String ) + maybePrimary = o["primary"].String(); + + _checkHosts(o["hosts"].Obj(), changed); + } + if (o.hasField("passives") && o["passives"].type() == Array) { + _checkHosts(o["passives"].Obj(), changed); + } + + _checkStatus(c); + } + catch ( std::exception& e ) { + log( ! verbose ) << "ReplicaSetMonitor::_checkConnection: caught exception " << c->toString() << ' ' << e.what() << endl; + } + + if ( changed && _hook ) + _hook( this ); + + return isMaster; + } + + void ReplicaSetMonitor::_check() { + + bool triedQuickCheck = false; + + LOG(1) << "_check : " << getServerAddress() << endl; + + for ( int retry = 0; retry < 2; retry++ ) { + for ( unsigned i=0; i<_nodes.size(); i++ ) { + DBClientConnection * c; + { + scoped_lock lk( _lock ); + c = _nodes[i].conn; + } + + string maybePrimary; + if ( _checkConnection( c , maybePrimary , retry ) ) { + _master = i; + return; + } + + if ( ! triedQuickCheck && maybePrimary.size() ) { + int x = _find( maybePrimary ); + if ( x >= 0 ) { + triedQuickCheck = true; + string dummy; + DBClientConnection * testConn; + { + scoped_lock lk( _lock ); + testConn = _nodes[x].conn; + } + if ( _checkConnection( testConn , dummy , false ) ) { + _master = x; + return; + } + } + } + + } + sleepsecs(1); + } + + } + + void ReplicaSetMonitor::check() { + // first see if the current master is fine + if ( _master >= 0 ) { + string temp; + if ( _checkConnection( _nodes[_master].conn , temp , false ) ) { + // current master is fine, so we're done + return; + } + } + + // we either have no master, or the current is dead + _check(); + } + + int ReplicaSetMonitor::_find( const string& server ) const { + scoped_lock lk( _lock ); + for ( unsigned i=0; i<_nodes.size(); i++ ) + if ( _nodes[i].addr == server ) + return i; + return -1; + } + + int ReplicaSetMonitor::_find( const HostAndPort& server ) const { + scoped_lock lk( _lock ); + for ( unsigned i=0; i<_nodes.size(); i++ ) + if ( _nodes[i].addr == server ) + return i; + return -1; + } + + + mongo::mutex ReplicaSetMonitor::_setsLock( "ReplicaSetMonitor" ); + map<string,ReplicaSetMonitorPtr> ReplicaSetMonitor::_sets; + ReplicaSetMonitor::ConfigChangeHook ReplicaSetMonitor::_hook; + // -------------------------------- + // ----- DBClientReplicaSet --------- + // -------------------------------- + + DBClientReplicaSet::DBClientReplicaSet( const string& name , const vector<HostAndPort>& servers ) + : _monitor( ReplicaSetMonitor::get( name , servers ) ) { + } + + DBClientReplicaSet::~DBClientReplicaSet() { + } + + DBClientConnection * DBClientReplicaSet::checkMaster() { + HostAndPort h = _monitor->getMaster(); + + if ( h == _masterHost ) { + // a master is selected. let's just make sure connection didn't die + if ( ! _master->isFailed() ) + return _master.get(); + _monitor->notifyFailure( _masterHost ); + } + + _masterHost = _monitor->getMaster(); + _master.reset( new DBClientConnection( true ) ); + string errmsg; + if ( ! 
_master->connect( _masterHost , errmsg ) ) { + _monitor->notifyFailure( _masterHost ); + uasserted( 13639 , str::stream() << "can't connect to new replica set master [" << _masterHost.toString() << "] err: " << errmsg ); + } + _auth( _master.get() ); + return _master.get(); + } + + DBClientConnection * DBClientReplicaSet::checkSlave() { + HostAndPort h = _monitor->getSlave( _slaveHost ); + + if ( h == _slaveHost ) { + if ( ! _slave->isFailed() ) + return _slave.get(); + _monitor->notifySlaveFailure( _slaveHost ); + } + + _slaveHost = _monitor->getSlave(); + _slave.reset( new DBClientConnection( true ) ); + _slave->connect( _slaveHost ); + _auth( _slave.get() ); + return _slave.get(); + } + + + void DBClientReplicaSet::_auth( DBClientConnection * conn ) { + for ( list<AuthInfo>::iterator i=_auths.begin(); i!=_auths.end(); ++i ) { + const AuthInfo& a = *i; + string errmsg; + if ( ! conn->auth( a.dbname , a.username , a.pwd , errmsg, a.digestPassword ) ) + warning() << "cached auth failed for set: " << _monitor->getName() << " db: " << a.dbname << " user: " << a.username << endl; + + } + + } + + DBClientConnection& DBClientReplicaSet::masterConn() { + return *checkMaster(); + } + + DBClientConnection& DBClientReplicaSet::slaveConn() { + return *checkSlave(); + } + + bool DBClientReplicaSet::connect() { + try { + checkMaster(); + } + catch (AssertionException&) { + if (_master && _monitor) { + _monitor->notifyFailure(_masterHost); + } + return false; + } + return true; + } + + bool DBClientReplicaSet::auth(const string &dbname, const string &username, const string &pwd, string& errmsg, bool digestPassword ) { + DBClientConnection * m = checkMaster(); + + // first make sure it actually works + if( ! m->auth(dbname, username, pwd, errmsg, digestPassword ) ) + return false; + + // now that it does, we should save so that for a new node we can auth + _auths.push_back( AuthInfo( dbname , username , pwd , digestPassword ) ); + return true; + } + + // ------------- simple functions ----------------- + + void DBClientReplicaSet::insert( const string &ns , BSONObj obj ) { + checkMaster()->insert(ns, obj); + } + + void DBClientReplicaSet::insert( const string &ns, const vector< BSONObj >& v ) { + checkMaster()->insert(ns, v); + } + + void DBClientReplicaSet::remove( const string &ns , Query obj , bool justOne ) { + checkMaster()->remove(ns, obj, justOne); + } + + void DBClientReplicaSet::update( const string &ns , Query query , BSONObj obj , bool upsert , bool multi ) { + return checkMaster()->update(ns, query, obj, upsert,multi); + } + + auto_ptr<DBClientCursor> DBClientReplicaSet::query(const string &ns, Query query, int nToReturn, int nToSkip, + const BSONObj *fieldsToReturn, int queryOptions, int batchSize) { + + if ( queryOptions & QueryOption_SlaveOk ) { + // we're ok sending to a slave + // we'll try 2 slaves before just using master + // checkSlave will try a different slave automatically after a failure + for ( int i=0; i<2; i++ ) { + try { + return checkSlave()->query(ns,query,nToReturn,nToSkip,fieldsToReturn,queryOptions,batchSize); + } + catch ( DBException & ) { + LOG(1) << "can't query replica set slave: " << _slaveHost << endl; + } + } + } + + return checkMaster()->query(ns,query,nToReturn,nToSkip,fieldsToReturn,queryOptions,batchSize); + } + + BSONObj DBClientReplicaSet::findOne(const string &ns, const Query& query, const BSONObj *fieldsToReturn, int queryOptions) { + if ( queryOptions & QueryOption_SlaveOk ) { + // we're ok sending to a slave + // we'll try 2 slaves before just using 
master + // checkSlave will try a different slave automatically after a failure + for ( int i=0; i<2; i++ ) { + try { + return checkSlave()->findOne(ns,query,fieldsToReturn,queryOptions); + } + catch ( DBException & ) { + LOG(1) << "can't query replica set slave: " << _slaveHost << endl; + } + } + } + + return checkMaster()->findOne(ns,query,fieldsToReturn,queryOptions); + } + + void DBClientReplicaSet::killCursor( long long cursorID ) { + // we should neve call killCursor on a replica set conncetion + // since we don't know which server it belongs to + // can't assume master because of slave ok + // and can have a cursor survive a master change + assert(0); + } + + + bool DBClientReplicaSet::call( Message &toSend, Message &response, bool assertOk , string * actualServer ) { + if ( toSend.operation() == dbQuery ) { + // TODO: might be possible to do this faster by changing api + DbMessage dm( toSend ); + QueryMessage qm( dm ); + if ( qm.queryOptions & QueryOption_SlaveOk ) { + for ( int i=0; i<2; i++ ) { + try { + DBClientConnection* s = checkSlave(); + if ( actualServer ) + *actualServer = s->getServerAddress(); + return s->call( toSend , response , assertOk ); + } + catch ( DBException & ) { + log(1) << "can't query replica set slave: " << _slaveHost << endl; + if ( actualServer ) + *actualServer = ""; + } + } + } + } + + DBClientConnection* m = checkMaster(); + if ( actualServer ) + *actualServer = m->getServerAddress(); + return m->call( toSend , response , assertOk ); + } + +} diff --git a/client/dbclient_rs.h b/client/dbclient_rs.h new file mode 100644 index 0000000..43bf561 --- /dev/null +++ b/client/dbclient_rs.h @@ -0,0 +1,276 @@ +/** @file dbclient_rs.h - connect to a Replica Set, from C++ */ + +/* Copyright 2009 10gen Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#pragma once + +#include "../pch.h" +#include "dbclient.h" + +namespace mongo { + + class ReplicaSetMonitor; + typedef shared_ptr<ReplicaSetMonitor> ReplicaSetMonitorPtr; + + /** + * manages state about a replica set for client + * keeps tabs on whose master and what slaves are up + * can hand a slave to someone for SLAVE_OK + * one instace per process per replica set + * TODO: we might be able to use a regular Node * to avoid _lock + */ + class ReplicaSetMonitor { + public: + + typedef boost::function1<void,const ReplicaSetMonitor*> ConfigChangeHook; + + /** + * gets a cached Monitor per name or will create if doesn't exist + */ + static ReplicaSetMonitorPtr get( const string& name , const vector<HostAndPort>& servers ); + + /** + * checks all sets for current master and new secondaries + * usually only called from a BackgroundJob + */ + static void checkAll(); + + /** + * this is called whenever the config of any repclia set changes + * currently only 1 globally + * asserts if one already exists + * ownership passes to ReplicaSetMonitor and the hook will actually never be deleted + */ + static void setConfigChangeHook( ConfigChangeHook hook ); + + ~ReplicaSetMonitor(); + + /** @return HostAndPort or throws an exception */ + HostAndPort getMaster(); + + /** + * notify the monitor that server has faild + */ + void notifyFailure( const HostAndPort& server ); + + /** @return prev if its still ok, and if not returns a random slave that is ok for reads */ + HostAndPort getSlave( const HostAndPort& prev ); + + /** @return a random slave that is ok for reads */ + HostAndPort getSlave(); + + + /** + * notify the monitor that server has faild + */ + void notifySlaveFailure( const HostAndPort& server ); + + /** + * checks for current master and new secondaries + */ + void check(); + + string getName() const { return _name; } + + string getServerAddress() const; + + bool contains( const string& server ) const; + + private: + /** + * This populates a list of hosts from the list of seeds (discarding the + * seed list). + * @param name set name + * @param servers seeds + */ + ReplicaSetMonitor( const string& name , const vector<HostAndPort>& servers ); + + void _check(); + + /** + * Use replSetGetStatus command to make sure hosts in host list are up + * and readable. Sets Node::ok appropriately. + */ + void _checkStatus(DBClientConnection *conn); + + /** + * Add array of hosts to host list. Doesn't do anything if hosts are + * already in host list. + * @param hostList the list of hosts to add + * @param changed if new hosts were added + */ + void _checkHosts(const BSONObj& hostList, bool& changed); + + /** + * Updates host list. + * @param c the connection to check + * @param maybePrimary OUT + * @param verbose + * @return if the connection is good + */ + bool _checkConnection( DBClientConnection * c , string& maybePrimary , bool verbose ); + + int _find( const string& server ) const ; + int _find( const HostAndPort& server ) const ; + + mutable mongo::mutex _lock; // protects _nodes + mutable mongo::mutex _checkConnectionLock; + + string _name; + struct Node { + Node( const HostAndPort& a , DBClientConnection* c ) : addr( a ) , conn(c) , ok(true) {} + HostAndPort addr; + DBClientConnection* conn; + + // if this node is in a failure state + // used for slave routing + // this is too simple, should make it better + bool ok; + }; + + /** + * Host list. + */ + vector<Node> _nodes; + + int _master; // which node is the current master. 
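Taken together, the ReplicaSetMonitor interface above is used roughly as follows (set name and seed hosts are placeholders; get() also starts the background watcher that re-checks every 20 seconds):

    vector<HostAndPort> seeds;
    seeds.push_back( HostAndPort( "rs-a.example.com:27017" ) );
    seeds.push_back( HostAndPort( "rs-b.example.com:27017" ) );

    // one cached monitor per set name per process
    ReplicaSetMonitorPtr rsm = ReplicaSetMonitor::get( "myset" , seeds );

    HostAndPort primary = rsm->getMaster();      // asserts "no master found" if none is reachable
    HostAndPort reader  = rsm->getSlave();       // a random node currently ok for reads
    cout << rsm->getServerAddress() << endl;     // "myset/host1,host2,..."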
-1 means no master is known + + + static mongo::mutex _setsLock; // protects _sets + static map<string,ReplicaSetMonitorPtr> _sets; // set name to Monitor + + static ConfigChangeHook _hook; + }; + + /** Use this class to connect to a replica set of servers. The class will manage + checking for which server in a replica set is master, and do failover automatically. + + This can also be used to connect to replica pairs since pairs are a subset of sets + + On a failover situation, expect at least one operation to return an error (throw + an exception) before the failover is complete. Operations are not retried. + */ + class DBClientReplicaSet : public DBClientBase { + + public: + /** Call connect() after constructing. autoReconnect is always on for DBClientReplicaSet connections. */ + DBClientReplicaSet( const string& name , const vector<HostAndPort>& servers ); + virtual ~DBClientReplicaSet(); + + /** Returns false if nomember of the set were reachable, or neither is + * master, although, + * when false returned, you can still try to use this connection object, it will + * try reconnects. + */ + bool connect(); + + /** Authorize. Authorizes all nodes as needed + */ + virtual bool auth(const string &dbname, const string &username, const string &pwd, string& errmsg, bool digestPassword = true ); + + // ----------- simple functions -------------- + + /** throws userassertion "no master found" */ + virtual auto_ptr<DBClientCursor> query(const string &ns, Query query, int nToReturn = 0, int nToSkip = 0, + const BSONObj *fieldsToReturn = 0, int queryOptions = 0 , int batchSize = 0 ); + + /** throws userassertion "no master found" */ + virtual BSONObj findOne(const string &ns, const Query& query, const BSONObj *fieldsToReturn = 0, int queryOptions = 0); + + virtual void insert( const string &ns , BSONObj obj ); + + /** insert multiple objects. Note that single object insert is asynchronous, so this version + is only nominally faster and not worth a special effort to try to use. */ + virtual void insert( const string &ns, const vector< BSONObj >& v ); + + virtual void remove( const string &ns , Query obj , bool justOne = 0 ); + + virtual void update( const string &ns , Query query , BSONObj obj , bool upsert = 0 , bool multi = 0 ); + + virtual void killCursor( long long cursorID ); + + // ---- access raw connections ---- + + DBClientConnection& masterConn(); + DBClientConnection& slaveConn(); + + // ---- callback pieces ------- + + virtual void checkResponse( const char *data, int nReturned ) { checkMaster()->checkResponse( data , nReturned ); } + + /* this is the callback from our underlying connections to notify us that we got a "not master" error. + */ + void isntMaster() { _master.reset(); } + + // ----- status ------ + + virtual bool isFailed() const { return ! 
_master || _master->isFailed(); } + + // ----- informational ---- + + string toString() { return getServerAddress(); } + + string getServerAddress() const { return _monitor->getServerAddress(); } + + virtual ConnectionString::ConnectionType type() const { return ConnectionString::SET; } + + // ---- low level ------ + + virtual bool call( Message &toSend, Message &response, bool assertOk=true , string * actualServer = 0 ); + virtual void say( Message &toSend ) { checkMaster()->say( toSend ); } + virtual bool callRead( Message& toSend , Message& response ) { return checkMaster()->callRead( toSend , response ); } + + + protected: + virtual void sayPiggyBack( Message &toSend ) { checkMaster()->say( toSend ); } + + private: + + DBClientConnection * checkMaster(); + DBClientConnection * checkSlave(); + + void _auth( DBClientConnection * conn ); + + ReplicaSetMonitorPtr _monitor; + + HostAndPort _masterHost; + scoped_ptr<DBClientConnection> _master; + + HostAndPort _slaveHost; + scoped_ptr<DBClientConnection> _slave; + + /** + * for storing authentication info + * fields are exactly for DBClientConnection::auth + */ + struct AuthInfo { + AuthInfo( string d , string u , string p , bool di ) + : dbname( d ) , username( u ) , pwd( p ) , digestPassword( di ) {} + string dbname; + string username; + string pwd; + bool digestPassword; + }; + + // we need to store so that when we connect to a new node on failure + // we can re-auth + // this could be a security issue, as the password is stored in memory + // not sure if/how we should handle + list<AuthInfo> _auths; + }; + + +} diff --git a/client/dbclientcursor.cpp b/client/dbclientcursor.cpp index 5f9db43..6c6afc0 100644 --- a/client/dbclientcursor.cpp +++ b/client/dbclientcursor.cpp @@ -26,14 +26,14 @@ namespace mongo { void assembleRequest( const string &ns, BSONObj query, int nToReturn, int nToSkip, const BSONObj *fieldsToReturn, int queryOptions, Message &toSend ); - int DBClientCursor::nextBatchSize(){ + int DBClientCursor::nextBatchSize() { if ( nToReturn == 0 ) return batchSize; if ( batchSize == 0 ) return nToReturn; - + return batchSize < nToReturn ? batchSize : nToReturn; } @@ -41,7 +41,8 @@ namespace mongo { Message toSend; if ( !cursorId ) { assembleRequest( ns, query, nextBatchSize() , nToSkip, fieldsToReturn, opts, toSend ); - } else { + } + else { BufBuilder b; b.appendNum( opts ); b.appendStr( ns ); @@ -49,10 +50,16 @@ namespace mongo { b.appendNum( cursorId ); toSend.setData( dbGetMore, b.buf(), b.len() ); } - if ( !connector->call( toSend, *m, false ) ) + if ( !_client->call( toSend, *m, false ) ) { + // log msg temp? + log() << "DBClientCursor::init call() failed" << endl; return false; - if ( m->empty() ) + } + if ( m->empty() ) { + // log msg temp? 
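nextBatchSize() above decides how many documents each getMore asks for; a sketch of how the two knobs interact when issuing a query (namespace is a placeholder, `conn` is an established connection):

    // nToReturn (overall limit) = 1000, batchSize (per round trip) = 100:
    // nextBatchSize() returns the smaller value, so each batch asks for at most 100 docs
    auto_ptr<DBClientCursor> cur = conn.query( "test.people" , Query() ,
                                               /*nToReturn=*/1000 , /*nToSkip=*/0 ,
                                               /*fieldsToReturn=*/0 , /*queryOptions=*/0 ,
                                               /*batchSize=*/100 );

    // nToReturn == 0 means unlimited, so only batchSize shapes the batches;
    // batchSize == 0 falls back to asking for nToReturn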
+ log() << "DBClientCursor::init message from call() was empty" << endl; return false; + } dataReceived(); return true; } @@ -60,7 +67,7 @@ namespace mongo { void DBClientCursor::requestMore() { assert( cursorId && pos == nReturned ); - if (haveLimit){ + if (haveLimit) { nToReturn -= nReturned; assert(nToReturn > 0); } @@ -69,13 +76,13 @@ namespace mongo { b.appendStr(ns); b.appendNum(nextBatchSize()); b.appendNum(cursorId); - + Message toSend; toSend.setData(dbGetMore, b.buf(), b.len()); auto_ptr<Message> response(new Message()); - - if ( connector ){ - connector->call( toSend, *response ); + + if ( _client ) { + _client->call( toSend, *response ); m = response; dataReceived(); } @@ -83,10 +90,10 @@ namespace mongo { assert( _scopedHost.size() ); ScopedDbConnection conn( _scopedHost ); conn->call( toSend , *response ); - connector = conn.get(); + _client = conn.get(); m = response; dataReceived(); - connector = 0; + _client = 0; conn.done(); } } @@ -96,8 +103,8 @@ namespace mongo { assert( cursorId && pos == nReturned ); assert( !haveLimit ); auto_ptr<Message> response(new Message()); - assert( connector ); - connector->recv(*response); + assert( _client ); + _client->recv(*response); m = response; dataReceived(); } @@ -105,7 +112,7 @@ namespace mongo { void DBClientCursor::dataReceived() { QueryResult *qr = (QueryResult *) m->singleData(); resultFlags = qr->resultFlags(); - + if ( qr->resultFlags() & ResultFlag_CursorNotFound ) { // cursor id no longer valid at the server. assert( qr->cursorId == 0 ); @@ -113,7 +120,7 @@ namespace mongo { if ( ! ( opts & QueryOption_CursorTailable ) ) throw UserException( 13127 , "getMore: cursor didn't exist on server, possible restart or timeout?" ); } - + if ( cursorId == 0 || ! ( opts & QueryOption_CursorTailable ) ) { // only set initially: we don't want to kill it on end of data // if it's a tailable cursor @@ -124,7 +131,7 @@ namespace mongo { pos = 0; data = qr->data(); - connector->checkResponse( data, nReturned ); + _client->checkResponse( data, nReturned ); /* this assert would fire the way we currently work: assert( nReturned || cursorId == 0 ); */ @@ -136,7 +143,7 @@ namespace mongo { if ( !_putBack.empty() ) return true; - + if (haveLimit && pos >= nToReturn) return false; @@ -171,7 +178,7 @@ namespace mongo { int m = atMost; /* - for( stack<BSONObj>::iterator i = _putBack.begin(); i != _putBack.end(); i++ ) { + for( stack<BSONObj>::iterator i = _putBack.begin(); i != _putBack.end(); i++ ) { if( m == 0 ) return; v.push_back(*i); @@ -190,13 +197,22 @@ namespace mongo { v.push_back(o); } } - - void DBClientCursor::attach( AScopedConnection * conn ){ + + void DBClientCursor::attach( AScopedConnection * conn ) { assert( _scopedHost.size() == 0 ); - assert( conn->get()->isMember( connector ) ); - _scopedHost = conn->getHost(); + assert( conn ); + assert( conn->get() ); + + if ( conn->get()->type() == ConnectionString::SET || + conn->get()->type() == ConnectionString::SYNC ) { + _scopedHost = _client->getServerAddress(); + } + else { + _scopedHost = conn->getHost(); + } + conn->done(); - connector = 0; + _client = 0; } DBClientCursor::~DBClientCursor() { @@ -205,28 +221,28 @@ namespace mongo { DESTRUCTOR_GUARD ( - if ( cursorId && _ownCursor ) { - BufBuilder b; - b.appendNum( (int)0 ); // reserved - b.appendNum( (int)1 ); // number - b.appendNum( cursorId ); - - Message m; - m.setData( dbKillCursors , b.buf() , b.len() ); - - if ( connector ){ - connector->sayPiggyBack( m ); - } - else { - assert( _scopedHost.size() ); - ScopedDbConnection conn( 
_scopedHost ); - conn->sayPiggyBack( m ); - conn.done(); - } + if ( cursorId && _ownCursor ) { + BufBuilder b; + b.appendNum( (int)0 ); // reserved + b.appendNum( (int)1 ); // number + b.appendNum( cursorId ); + + Message m; + m.setData( dbKillCursors , b.buf() , b.len() ); + + if ( _client ) { + _client->sayPiggyBack( m ); + } + else { + assert( _scopedHost.size() ); + ScopedDbConnection conn( _scopedHost ); + conn->sayPiggyBack( m ); + conn.done(); } + } ); } - + } // namespace mongo diff --git a/client/dbclientcursor.h b/client/dbclientcursor.h index 51cdc13..5d795f4 100644 --- a/client/dbclientcursor.h +++ b/client/dbclientcursor.h @@ -1,4 +1,4 @@ -// file dbclientcursor.h +// file dbclientcursor.h /* Copyright 2009 10gen Inc. * @@ -24,41 +24,55 @@ #include <stack> namespace mongo { - + class AScopedConnection; - - /** Queries return a cursor object */ - class DBClientCursor : boost::noncopyable { + + /** for mock purposes only -- do not create variants of DBClientCursor, nor hang code here */ + class DBClientCursorInterface { public: - /** If true, safe to call next(). Requests more from server if necessary. */ + virtual ~DBClientCursorInterface() {} + + virtual bool more() = 0; + virtual BSONObj next() = 0; + + // TODO bring more of the DBClientCursor interface to here + + protected: + DBClientCursorInterface() {} + }; + + /** Queries return a cursor object */ + class DBClientCursor : public DBClientCursorInterface { + public: + /** If true, safe to call next(). Requests more from server if necessary. */ bool more(); - /** If true, there is more in our local buffers to be fetched via next(). Returns - false when a getMore request back to server would be required. You can use this - if you want to exhaust whatever data has been fetched to the client already but + /** If true, there is more in our local buffers to be fetched via next(). Returns + false when a getMore request back to server would be required. You can use this + if you want to exhaust whatever data has been fetched to the client already but then perhaps stop. */ int objsLeftInBatch() const { _assertIfNull(); return _putBack.size() + nReturned - pos; } bool moreInCurrentBatch() { return objsLeftInBatch() > 0; } /** next - @return next object in the result cursor. + @return next object in the result cursor. on an error at the remote server, you will get back: { $err: <string> } if you do not want to handle that yourself, call nextSafe(). */ BSONObj next(); - - /** + + /** restore an object previously returned by next() to the cursor */ void putBack( const BSONObj &o ) { _putBack.push( o.getOwned() ); } - /** throws AssertionException if get back { $err : ... } */ + /** throws AssertionException if get back { $err : ... } */ BSONObj nextSafe() { BSONObj o = next(); BSONElement e = o.firstElement(); - if( strcmp(e.fieldName(), "$err") == 0 ) { + if( strcmp(e.fieldName(), "$err") == 0 ) { if( logLevel >= 5 ) log() << "nextSafe() error " << o.toString() << endl; uassert(13106, "nextSafe(): " + o.toString(), false); @@ -67,7 +81,7 @@ namespace mongo { } /** peek ahead at items buffered for future next() calls. - never requests new data from the server. so peek only effective + never requests new data from the server. so peek only effective with what is already buffered. WARNING: no support for _putBack yet! 
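The more()/nextSafe() pair above is the standard iteration pattern; a short sketch (namespace and filter are placeholders, `conn` is an established connection):

    auto_ptr<DBClientCursor> cur = conn.query( "test.people" , QUERY( "age" << GT << 21 ) );
    while ( cur->more() ) {
        // nextSafe() asserts if the server handed back { $err : ... } instead of a document
        BSONObj doc = cur->nextSafe();
        cout << doc["name"].str() << endl;
    }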
*/ @@ -76,9 +90,9 @@ namespace mongo { /** iterate the rest of the cursor and return the number if items */ - int itcount(){ + int itcount() { int c = 0; - while ( more() ){ + while ( more() ) { next(); c++; } @@ -97,48 +111,48 @@ namespace mongo { bool tailable() const { return (opts & QueryOption_CursorTailable) != 0; } - - /** see ResultFlagType (constants.h) for flag values - mostly these flags are for internal purposes - + + /** see ResultFlagType (constants.h) for flag values + mostly these flags are for internal purposes - ResultFlag_ErrSet is the possible exception to that */ - bool hasResultFlag( int flag ){ + bool hasResultFlag( int flag ) { _assertIfNull(); return (resultFlags & flag) != 0; } - DBClientCursor( DBConnector *_connector, const string &_ns, BSONObj _query, int _nToReturn, + DBClientCursor( DBClientBase* client, const string &_ns, BSONObj _query, int _nToReturn, int _nToSkip, const BSONObj *_fieldsToReturn, int queryOptions , int bs ) : - connector(_connector), - ns(_ns), - query(_query), - nToReturn(_nToReturn), - haveLimit( _nToReturn > 0 && !(queryOptions & QueryOption_CursorTailable)), - nToSkip(_nToSkip), - fieldsToReturn(_fieldsToReturn), - opts(queryOptions), - batchSize(bs==1?2:bs), - m(new Message()), - cursorId(), - nReturned(), - pos(), - data(), - _ownCursor( true ){ + _client(client), + ns(_ns), + query(_query), + nToReturn(_nToReturn), + haveLimit( _nToReturn > 0 && !(queryOptions & QueryOption_CursorTailable)), + nToSkip(_nToSkip), + fieldsToReturn(_fieldsToReturn), + opts(queryOptions), + batchSize(bs==1?2:bs), + m(new Message()), + cursorId(), + nReturned(), + pos(), + data(), + _ownCursor( true ) { + } + + DBClientCursor( DBClientBase* client, const string &_ns, long long _cursorId, int _nToReturn, int options ) : + _client(client), + ns(_ns), + nToReturn( _nToReturn ), + haveLimit( _nToReturn > 0 && !(options & QueryOption_CursorTailable)), + opts( options ), + m(new Message()), + cursorId( _cursorId ), + nReturned(), + pos(), + data(), + _ownCursor( true ) { } - - DBClientCursor( DBConnector *_connector, const string &_ns, long long _cursorId, int _nToReturn, int options ) : - connector(_connector), - ns(_ns), - nToReturn( _nToReturn ), - haveLimit( _nToReturn > 0 && !(options & QueryOption_CursorTailable)), - opts( options ), - m(new Message()), - cursorId( _cursorId ), - nReturned(), - pos(), - data(), - _ownCursor( true ){ - } virtual ~DBClientCursor(); @@ -148,15 +162,15 @@ namespace mongo { message when ~DBClientCursor() is called. This function overrides that. 
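itcount() above drains the cursor client-side; unlike the count() command it transfers every matching document over the network, which is mainly useful for tests and small result sets (namespace and filter are placeholders, `conn` is an established connection):

    // iterate the whole result and return how many objects were seen
    auto_ptr<DBClientCursor> cur = conn.query( "test.people" , QUERY( "age" << GT << 21 ) );
    int n = cur->itcount();
    cout << n << " matching documents fetched" << endl;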
*/ void decouple() { _ownCursor = false; } - + void attach( AScopedConnection * conn ); - + private: friend class DBClientBase; friend class DBClientConnection; - bool init(); + bool init(); int nextBatchSize(); - DBConnector *connector; + DBClientBase* _client; string ns; BSONObj query; int nToReturn; @@ -180,8 +194,12 @@ namespace mongo { // Don't call from a virtual function void _assertIfNull() const { uassert(13348, "connection died", this); } + + // non-copyable , non-assignable + DBClientCursor( const DBClientCursor& ); + DBClientCursor& operator=( const DBClientCursor& ); }; - + /** iterate over objects in current batch only - will not cause a network call */ class DBClientCursorBatchIterator { @@ -198,7 +216,7 @@ namespace mongo { DBClientCursor &_c; int _n; }; - + } // namespace mongo #include "undef_macros.h" diff --git a/client/dbclientmockcursor.h b/client/dbclientmockcursor.h new file mode 100644 index 0000000..8d85ff5 --- /dev/null +++ b/client/dbclientmockcursor.h @@ -0,0 +1,40 @@ +//@file dbclientmockcursor.h + +/* Copyright 2010 10gen Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "dbclientcursor.h" + +namespace mongo { + + class DBClientMockCursor : public DBClientCursorInterface { + public: + DBClientMockCursor( const BSONArray& mockCollection ) : _iter( mockCollection ) {} + virtual ~DBClientMockCursor() {} + + bool more() { return _iter.more(); } + BSONObj next() { return _iter.next().Obj(); } + + private: + BSONObjIterator _iter; + + // non-copyable , non-assignable + DBClientMockCursor( const DBClientMockCursor& ); + DBClientMockCursor& operator=( const DBClientMockCursor& ); + }; + +} // namespace mongo diff --git a/client/distlock.cpp b/client/distlock.cpp index 05e54c0..9ec98ea 100644 --- a/client/distlock.cpp +++ b/client/distlock.cpp @@ -21,23 +21,36 @@ namespace mongo { - string lockPingNS = "config.lockpings"; + static string lockPingNS = "config.lockpings"; + static string locksNS = "config.locks"; ThreadLocalValue<string> distLockIds(""); - - string getDistLockProcess(){ - static string s; - if ( s.empty() ){ - stringstream ss; - ss << getHostNameCached() << ":" << time(0) << ":" << rand(); - s = ss.str(); - } - return s; + + /* ================== + * Module initialization + */ + + boost::once_flag _init = BOOST_ONCE_INIT; + static string* _cachedProcessString = NULL; + + static void initModule() { + // cache process string + stringstream ss; + ss << getHostName() << ":" << time(0) << ":" << rand(); + _cachedProcessString = new string( ss.str() ); } - string getDistLockId(){ + /* =================== */ + + string getDistLockProcess() { + boost::call_once( initModule, _init ); + assert( _cachedProcessString ); + return *_cachedProcessString; + } + + string getDistLockId() { string s = distLockIds.get(); - if ( s.empty() ){ + if ( s.empty() ) { stringstream ss; ss << getDistLockProcess() << ":" << getThreadName() << ":" << rand(); s = ss.str(); @@ -45,50 +58,95 @@ namespace mongo { } return s; } - - void 
distLockPingThread( ConnectionString addr ){ + + void _distLockPingThread( ConnectionString addr ) { setThreadName( "LockPinger" ); + + log() << "creating dist lock ping thread for: " << addr << endl; static int loops = 0; - while( ! inShutdown() ){ + while( ! inShutdown() ) { + + string process = getDistLockProcess(); + log(4) << "dist_lock about to ping for: " << process << endl; + try { ScopedDbConnection conn( addr ); - - // do ping - conn->update( lockPingNS , - BSON( "_id" << getDistLockProcess() ) , + + // refresh the entry corresponding to this process in the lockpings collection + conn->update( lockPingNS , + BSON( "_id" << process ) , BSON( "$set" << BSON( "ping" << DATENOW ) ) , true ); - - - // remove really old entries - BSONObjBuilder f; - f.appendDate( "$lt" , jsTime() - ( 4 * 86400 * 1000 ) ); - BSONObj r = BSON( "ping" << f.obj() ); - conn->remove( lockPingNS , r ); - + string err = conn->getLastError(); + if ( ! err.empty() ) { + warning() << "dist_lock process: " << process << " pinging: " << addr << " failed: " + << err << endl; + conn.done(); + sleepsecs(30); + continue; + } + + // remove really old entries from the lockpings collection if they're not holding a lock + // (this may happen if an instance of a process was taken down and no new instance came up to + // replace it for a quite a while) + // if the lock is taken, the take-over mechanism should handle the situation + auto_ptr<DBClientCursor> c = conn->query( locksNS , BSONObj() ); + vector<string> pids; + while ( c->more() ) { + BSONObj lock = c->next(); + if ( ! lock["process"].eoo() ) { + pids.push_back( lock["process"].valuestrsafe() ); + } + } + + Date_t fourDays = jsTime() - ( 4 * 86400 * 1000 ); // 4 days + conn->remove( lockPingNS , BSON( "_id" << BSON( "$nin" << pids ) << "ping" << LT << fourDays ) ); + err = conn->getLastError(); + if ( ! err.empty() ) { + warning() << "dist_lock cleanup request from process: " << process << " to: " << addr + << " failed: " << err << endl; + conn.done(); + sleepsecs(30); + continue; + } + // create index so remove is fast even with a lot of servers - if ( loops++ == 0 ){ + if ( loops++ == 0 ) { conn->ensureIndex( lockPingNS , BSON( "ping" << 1 ) ); } - + conn.done(); } - catch ( std::exception& e ){ - log( LL_WARNING ) << "couldn't ping: " << e.what() << endl; + catch ( std::exception& e ) { + warning() << "dist_lock exception during ping: " << e.what() << endl; } + + log( loops % 10 == 0 ? 0 : 1) << "dist_lock pinged successfully for: " << process << endl; sleepsecs(30); } } - - + + void distLockPingThread( ConnectionString addr ) { + try { + _distLockPingThread( addr ); + } + catch ( std::exception& e ) { + error() << "unexpected error in distLockPingThread: " << e.what() << endl; + } + catch ( ... 
) { + error() << "unexpected unknown error in distLockPingThread" << endl; + } + } + + class DistributedLockPinger { public: DistributedLockPinger() - : _mutex( "DistributedLockPinger" ){ + : _mutex( "DistributedLockPinger" ) { } - - void got( const ConnectionString& conn ){ + + void got( const ConnectionString& conn ) { string s = conn.toString(); scoped_lock lk( _mutex ); if ( _seen.count( s ) > 0 ) @@ -96,80 +154,121 @@ namespace mongo { boost::thread t( boost::bind( &distLockPingThread , conn ) ); _seen.insert( s ); } - + set<string> _seen; mongo::mutex _mutex; - + } distLockPinger; - + DistributedLock::DistributedLock( const ConnectionString& conn , const string& name , unsigned takeoverMinutes ) - : _conn(conn),_name(name),_takeoverMinutes(takeoverMinutes){ + : _conn(conn),_name(name),_takeoverMinutes(takeoverMinutes) { _id = BSON( "_id" << name ); _ns = "config.locks"; distLockPinger.got( conn ); } - - bool DistributedLock::lock_try( string why , BSONObj * other ){ + + bool DistributedLock::lock_try( string why , BSONObj * other ) { + // write to dummy if 'other' is null + BSONObj dummyOther; + if ( other == NULL ) + other = &dummyOther; + ScopedDbConnection conn( _conn ); - + BSONObjBuilder queryBuilder; queryBuilder.appendElements( _id ); - queryBuilder.append( "state" , 0 ); + queryBuilder.append( "state" , 0 ); - { // make sure its there so we can use simple update logic below - BSONObj o = conn->findOne( _ns , _id ); - if ( o.isEmpty() ){ + { + // make sure its there so we can use simple update logic below + BSONObj o = conn->findOne( _ns , _id ).getOwned(); + if ( o.isEmpty() ) { try { + log(4) << "dist_lock inserting initial doc in " << _ns << " for lock " << _name << endl; conn->insert( _ns , BSON( "_id" << _name << "state" << 0 << "who" << "" ) ); } - catch ( UserException& ){ + catch ( UserException& e ) { + log() << "dist_lock could not insert initial doc: " << e << endl; } } - else if ( o["state"].numberInt() > 0 ){ + + else if ( o["state"].numberInt() > 0 ) { BSONObj lastPing = conn->findOne( lockPingNS , o["process"].wrap( "_id" ) ); - if ( lastPing.isEmpty() ){ - // TODO: maybe this should clear, not sure yet - log() << "lastPing is empty! this could be bad: " << o << endl; + if ( lastPing.isEmpty() ) { + // if a lock is taken but there's no ping for it, we're in an inconsistent situation + // if the lock holder (mongos or d) does not exist anymore, the lock could safely be removed + // but we'd require analysis of the situation before a manual intervention + error() << "config.locks: " << _name << " lock is taken by old process? 
" + << "remove the following lock if the process is not active anymore: " << o << endl; + *other = o; conn.done(); return false; } - unsigned long long elapsed = jsTime() - lastPing["ping"].Date(); // in ms - elapsed = elapsed / ( 1000 * 60 ); // convert to minutes - - if ( elapsed <= _takeoverMinutes ){ - log(1) << "dist_lock lock failed because taken by: " << o << endl; + unsigned long long now = jsTime(); + unsigned long long pingTime = lastPing["ping"].Date(); + + if ( now < pingTime ) { + // clock skew + warning() << "dist_lock has detected clock skew of " << ( pingTime - now ) << "ms" << endl; + *other = o; conn.done(); return false; } + unsigned long long elapsed = now - pingTime; + elapsed = elapsed / ( 1000 * 60 ); // convert to minutes + + if ( elapsed > ( 60 * 24 * 365 * 100 ) /* 100 years */ ) { + warning() << "distlock elapsed time seems impossible: " << lastPing << endl; + } + + if ( elapsed <= _takeoverMinutes ) { + log(1) << "dist_lock lock failed because taken by: " << o << " elapsed minutes: " << elapsed << endl; + *other = o; + conn.done(); + return false; + } + log() << "dist_lock forcefully taking over from: " << o << " elapsed minutes: " << elapsed << endl; conn->update( _ns , _id , BSON( "$set" << BSON( "state" << 0 ) ) ); + string err = conn->getLastError(); + if ( ! err.empty() ) { + warning() << "dist_lock take over from: " << o << " failed: " << err << endl; + *other = o.getOwned(); + other->getOwned(); + conn.done(); + return false; + } + } - else if ( o["ts"].type() ){ + else if ( o["ts"].type() ) { queryBuilder.append( o["ts"] ); } } - + OID ts; ts.init(); bool gotLock = false; BSONObj now; - - BSONObj whatIWant = BSON( "$set" << BSON( "state" << 1 << - "who" << getDistLockId() << "process" << getDistLockProcess() << - "when" << DATENOW << "why" << why << "ts" << ts ) ); + + BSONObj lockDetails = BSON( "state" << 1 << "who" << getDistLockId() << "process" << getDistLockProcess() << + "when" << DATENOW << "why" << why << "ts" << ts ); + BSONObj whatIWant = BSON( "$set" << lockDetails ); try { + log(4) << "dist_lock about to aquire lock: " << lockDetails << endl; + conn->update( _ns , queryBuilder.obj() , whatIWant ); - + BSONObj o = conn->getLastErrorDetailed(); now = conn->findOne( _ns , _id ); - - if ( o["n"].numberInt() == 0 ){ - if ( other ) - *other = now; + + if ( o["n"].numberInt() == 0 ) { + *other = now; + other->getOwned(); + log() << "dist_lock error trying to aquire lock: " << lockDetails << " error: " << o << endl; gotLock = false; } else { @@ -177,40 +276,40 @@ namespace mongo { } } - catch ( UpdateNotTheSame& up ){ + catch ( UpdateNotTheSame& up ) { // this means our update got through on some, but not others + log(4) << "dist_lock lock did not propagate properly" << endl; - for ( unsigned i=0; i<up.size(); i++ ){ + for ( unsigned i=0; i<up.size(); i++ ) { ScopedDbConnection temp( up[i].first ); BSONObj temp2 = temp->findOne( _ns , _id ); - if ( now.isEmpty() || now["ts"] < temp2["ts"] ){ + if ( now.isEmpty() || now["ts"] < temp2["ts"] ) { now = temp2.getOwned(); } temp.done(); } - if ( now["ts"].OID() == ts ){ + if ( now["ts"].OID() == ts ) { + log(4) << "dist_lock completed lock propagation" << endl; gotLock = true; conn->update( _ns , _id , whatIWant ); } else { + log() << "dist_lock error trying to complete propagation" << endl; gotLock = false; } } - + conn.done(); - - log(1) << "dist_lock lock gotLock: " << gotLock << " now: " << now << endl; - if ( ! 
gotLock ) - return false; - - return true; + log(2) << "dist_lock lock gotLock: " << gotLock << " now: " << now << endl; + + return gotLock; } - void DistributedLock::unlock(){ + void DistributedLock::unlock() { const int maxAttempts = 3; int attempted = 0; while ( ++attempted <= maxAttempts ) { @@ -218,22 +317,23 @@ namespace mongo { try { ScopedDbConnection conn( _conn ); conn->update( _ns , _id, BSON( "$set" << BSON( "state" << 0 ) ) ); - log(1) << "dist_lock unlock: " << conn->findOne( _ns , _id ) << endl; + log(2) << "dist_lock unlock: " << conn->findOne( _ns , _id ) << endl; conn.done(); return; - - } catch ( std::exception& e) { - log( LL_WARNING ) << "dist_lock " << _name << " failed to contact config server in unlock attempt " + + } + catch ( std::exception& e) { + log( LL_WARNING ) << "dist_lock " << _name << " failed to contact config server in unlock attempt " << attempted << ": " << e.what() << endl; sleepsecs(1 << attempted); } } - log( LL_WARNING ) << "dist_lock couldn't consumate unlock request. " << "Lock " << _name - << " will be taken over after " << _takeoverMinutes << " minutes timeout" << endl; + log( LL_WARNING ) << "dist_lock couldn't consumate unlock request. " << "Lock " << _name + << " will be taken over after " << _takeoverMinutes << " minutes timeout" << endl; } } diff --git a/client/distlock.h b/client/distlock.h index 8a77338..753a241 100644 --- a/client/distlock.h +++ b/client/distlock.h @@ -15,10 +15,7 @@ * limitations under the License. */ - -/** - * distributed locking mechanism - */ +#pragma once #include "../pch.h" #include "dbclient.h" @@ -28,53 +25,71 @@ namespace mongo { + /** + * The distributed lock is a configdb backed way of synchronizing system-wide tasks. A task must be identified by a + * unique name across the system (e.g., "balancer"). A lock is taken by writing a document in the configdb's locks + * collection with that name. + * + * To be maintained, each taken lock needs to be revalidaded ("pinged") within a pre-established amount of time. This + * class does this maintenance automatically once a DistributedLock object was constructed. + */ class DistributedLock { public: /** - * @param takeoverMinutes how long before we steal lock in minutes + * The constructor does not connect to the configdb yet and constructing does not mean the lock was acquired. + * Construction does trigger a lock "pinging" mechanism, though. + * + * @param conn address of config(s) server(s) + * @param name identifier for the lock + * @param takeoverMinutes how long can the log go "unpinged" before a new attempt to lock steals it (in minutes) */ - DistributedLock( const ConnectionString& conn , const string& name , unsigned takeoverMinutes = 10 ); + DistributedLock( const ConnectionString& conn , const string& name , unsigned takeoverMinutes = 15 ); + /** + * Attempts to aquire 'this' lock, checking if it could or should be stolen from the previous holder. Please + * consider using the dist_lock_try construct to acquire this lock in an exception safe way. + * + * @param why human readable description of why the lock is being taken (used to log) + * @param other configdb's lock document that is currently holding the lock, if lock is taken + * @return true if it managed to grab the lock + */ bool lock_try( string why , BSONObj * other = 0 ); + + /** + * Releases a previously taken lock. 
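         *
         * A hedged usage sketch of the exception-safe dist_lock_try wrapper recommended above
         * (names hypothetical; configServers stands for a ConnectionString to the config server(s)):
         *
         *     DistributedLock balancerLock( configServers , "balancer" );
         *     dist_lock_try lk( &balancerLock , "doing balancing round" );
         *     if ( lk.got() ) {
         *         // ... protected work; ~dist_lock_try() calls unlock() automatically ...
         *     }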
+ */ void unlock(); private: ConnectionString _conn; string _name; unsigned _takeoverMinutes; - + string _ns; BSONObj _id; }; - + class dist_lock_try { public: - dist_lock_try( DistributedLock * lock , string why ) - : _lock(lock){ + : _lock(lock) { _got = _lock->lock_try( why , &_other ); } - ~dist_lock_try(){ - if ( _got ){ + ~dist_lock_try() { + if ( _got ) { _lock->unlock(); } } - bool got() const { - return _got; - } + bool got() const { return _got; } + BSONObj other() const { return _other; } - BSONObj other() const { - return _other; - } - private: DistributedLock * _lock; bool _got; BSONObj _other; - }; } diff --git a/client/distlock_test.cpp b/client/distlock_test.cpp index 0879b6e..83d143f 100644 --- a/client/distlock_test.cpp +++ b/client/distlock_test.cpp @@ -21,60 +21,84 @@ #include "../db/commands.h" namespace mongo { - + class TestDistLockWithSync : public Command { public: - TestDistLockWithSync() : Command( "_testDistLockWithSyncCluster" ){} + TestDistLockWithSync() : Command( "_testDistLockWithSyncCluster" ) {} virtual void help( stringstream& help ) const { help << "should not be calling this directly" << endl; } - + virtual bool slaveOk() const { return false; } virtual bool adminOnly() const { return true; } - virtual LockType locktype() const { return NONE; } + virtual LockType locktype() const { return NONE; } - static void runThread(){ - for ( int i=0; i<1000; i++ ){ - if ( current->lock_try( "test" ) ){ - gotit++; - for ( int j=0; j<2000; j++ ){ - count++; + static void runThread() { + while ( keepGoing ) { + if ( current->lock_try( "test" ) ) { + count++; + int before = count; + sleepmillis( 3 ); + int after = count; + + if ( after != before ) { + error() << " before: " << before << " after: " << after << endl; } + current->unlock(); } } } - - bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){ + + bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool) { + Timer t; DistributedLock lk( ConnectionString( cmdObj["host"].String() , ConnectionString::SYNC ), "testdistlockwithsync" ); current = &lk; count = 0; gotit = 0; + errors = 0; + keepGoing = true; vector<shared_ptr<boost::thread> > l; - for ( int i=0; i<4; i++ ){ + for ( int i=0; i<4; i++ ) { l.push_back( shared_ptr<boost::thread>( new boost::thread( runThread ) ) ); } + int secs = 10; + if ( cmdObj["secs"].isNumber() ) + secs = cmdObj["secs"].numberInt(); + sleepsecs( secs ); + keepGoing = false; + for ( unsigned i=0; i<l.size(); i++ ) l[i]->join(); + current = 0; + result.append( "count" , count ); result.append( "gotit" , gotit ); - current = 0; - return count == gotit * 2000; + result.append( "errors" , errors ); + result.append( "timeMS" , t.millis() ); + + return errors == 0; } + // variables for test static DistributedLock * current; - static int count; static int gotit; + static int errors; + static AtomicUInt count; + + static bool keepGoing; } testDistLockWithSyncCmd; DistributedLock * TestDistLockWithSync::current; - int TestDistLockWithSync::count; + AtomicUInt TestDistLockWithSync::count; int TestDistLockWithSync::gotit; + int TestDistLockWithSync::errors; + bool TestDistLockWithSync::keepGoing; } diff --git a/client/examples/authTest.cpp b/client/examples/authTest.cpp index 77ce12d..71cdd39 100644 --- a/client/examples/authTest.cpp +++ b/client/examples/authTest.cpp @@ -22,7 +22,7 @@ using namespace mongo; int main( int argc, const char **argv ) { - + const char *port = "27017"; if ( argc != 1 ) { if ( argc != 3 ) @@ -37,17 
+37,18 @@ int main( int argc, const char **argv ) { throw -11; } - { // clean up old data from any previous tests + { + // clean up old data from any previous tests conn.remove( "test.system.users" , BSONObj() ); } conn.insert( "test.system.users" , BSON( "user" << "eliot" << "pwd" << conn.createPasswordDigest( "eliot" , "bar" ) ) ); - + errmsg.clear(); bool ok = conn.auth( "test" , "eliot" , "bar" , errmsg ); if ( ! ok ) cout << errmsg << endl; - assert( ok ); + MONGO_assert( ok ); - assert( ! conn.auth( "test" , "eliot" , "bars" , errmsg ) ); + MONGO_assert( ! conn.auth( "test" , "eliot" , "bars" , errmsg ) ); } diff --git a/client/examples/clientTest.cpp b/client/examples/clientTest.cpp index 83a556a..bd4432e 100644 --- a/client/examples/clientTest.cpp +++ b/client/examples/clientTest.cpp @@ -19,9 +19,14 @@ * a simple test for the c++ driver */ +// this header should be first to ensure that it includes cleanly in any context +#include "client/dbclient.h" + #include <iostream> -#include "client/dbclient.h" +#ifndef assert +# define assert(x) MONGO_assert(x) +#endif using namespace std; using namespace mongo; @@ -125,12 +130,14 @@ int main( int argc, const char **argv ) { } - { // ensure index + { + // ensure index assert( conn.ensureIndex( ns , BSON( "name" << 1 ) ) ); assert( ! conn.ensureIndex( ns , BSON( "name" << 1 ) ) ); } - { // hint related tests + { + // hint related tests assert( conn.findOne(ns, "{}")["name"].str() == "sara" ); assert( conn.findOne(ns, "{ name : 'eliot' }")["name"].str() == "eliot" ); @@ -141,7 +148,7 @@ int main( int argc, const char **argv ) { try { conn.findOne(ns, Query("{name:\"eliot\"}").hint("{foo:1}")); } - catch ( ... ){ + catch ( ... ) { asserted = true; } assert( asserted ); @@ -153,7 +160,8 @@ int main( int argc, const char **argv ) { assert( conn.validate( ns ) ); } - { // timestamp test + { + // timestamp test const char * tsns = "test.tstest1"; conn.dropCollection( tsns ); @@ -185,32 +193,33 @@ int main( int argc, const char **argv ) { ( oldTime == found["ts"].timestampTime() && oldInc < found["ts"].timestampInc() ) ); } - - { // check that killcursors doesn't affect last error + + { + // check that killcursors doesn't affect last error assert( conn.getLastError().empty() ); - + BufBuilder b; b.appendNum( (int)0 ); // reserved b.appendNum( (int)-1 ); // invalid # of cursors triggers exception b.appendNum( (int)-1 ); // bogus cursor id - + Message m; m.setData( dbKillCursors, b.buf(), b.len() ); - + // say() is protected in DBClientConnection, so get superclass static_cast< DBConnector* >( &conn )->say( m ); - + assert( conn.getLastError().empty() ); } { list<string> l = conn.getDatabaseNames(); - for ( list<string>::iterator i = l.begin(); i != l.end(); i++ ){ + for ( list<string>::iterator i = l.begin(); i != l.end(); i++ ) { cout << "db name : " << *i << endl; } l = conn.getCollectionNames( "test" ); - for ( list<string>::iterator i = l.begin(); i != l.end(); i++ ){ + for ( list<string>::iterator i = l.begin(); i != l.end(); i++ ) { cout << "coll name : " << *i << endl; } } diff --git a/client/examples/first.cpp b/client/examples/first.cpp index f3b654f..ab5efb3 100644 --- a/client/examples/first.cpp +++ b/client/examples/first.cpp @@ -40,7 +40,7 @@ int main( int argc, const char **argv ) { throw -12; port = argv[ 2 ]; } - + mongo::DBClientConnection conn; string errmsg; if ( ! 
conn.connect( string( "127.0.0.1:" ) + port , errmsg ) ) { @@ -48,14 +48,15 @@ int main( int argc, const char **argv ) { throw -11; } - { // clean up old data from any previous tests + { + // clean up old data from any previous tests mongo::BSONObjBuilder query; conn.remove( "test.people" , query.obj() ); } insert( conn , "eliot" , 15 ); insert( conn , "sara" , 23 ); - + { mongo::BSONObjBuilder query; auto_ptr<mongo::DBClientCursor> cursor = conn.query( "test.people" , query.obj() ); @@ -66,14 +67,14 @@ int main( int argc, const char **argv ) { } } - + { mongo::BSONObjBuilder query; query.append( "name" , "eliot" ); mongo::BSONObj res = conn.findOne( "test.people" , query.obj() ); cout << res.isEmpty() << "\t" << res.jsonString() << endl; } - + { mongo::BSONObjBuilder query; query.append( "name" , "asd" ); diff --git a/client/examples/httpClientTest.cpp b/client/examples/httpClientTest.cpp index 5d6c429..4fa5fd8 100644 --- a/client/examples/httpClientTest.cpp +++ b/client/examples/httpClientTest.cpp @@ -23,7 +23,7 @@ using namespace mongo; int main( int argc, const char **argv ) { - + int port = 27017; if ( argc != 1 ) { if ( argc != 3 ) @@ -31,13 +31,13 @@ int main( int argc, const char **argv ) { port = atoi( argv[ 2 ] ); } port += 1000; - + stringstream ss; ss << "http://localhost:" << port << "/"; string url = ss.str(); - + cout << "[" << url << "]" << endl; HttpClient c; - assert( c.get( url ) == 200 ); + MONGO_assert( c.get( url ) == 200 ); } diff --git a/client/examples/rs.cpp b/client/examples/rs.cpp new file mode 100644 index 0000000..7813ec6 --- /dev/null +++ b/client/examples/rs.cpp @@ -0,0 +1,58 @@ +// rs.cpp + +/* Copyright 2009 10gen Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * example of using replica sets from c++ + */ + +#include "client/dbclient.h" +#include <iostream> + +using namespace mongo; +using namespace std; + +int main( int argc , const char ** argv ) { + string errmsg; + ConnectionString cs = ConnectionString::parse( "foo/127.0.0.1" , errmsg ); + if ( ! cs.isValid() ) { + cout << "error parsing url: " << errmsg << endl; + return 1; + } + + DBClientReplicaSet * conn = (DBClientReplicaSet*)cs.connect( errmsg ); + if ( ! 
conn ) { + cout << "error connecting: " << errmsg << endl; + return 2; + } + + string collName = "test.rs1"; + + conn->dropCollection( collName ); + while ( true ) { + try { + conn->update( collName , BSONObj() , BSON( "$inc" << BSON( "x" << 1 ) ) , true ); + cout << conn->findOne( collName , BSONObj() ) << endl; + cout << "\t A" << conn->slaveConn().findOne( collName , BSONObj() , 0 , QueryOption_SlaveOk ) << endl; + cout << "\t B " << conn->findOne( collName , BSONObj() , 0 , QueryOption_SlaveOk ) << endl; + } + catch ( std::exception& e ) { + cout << "ERROR: " << e.what() << endl; + } + sleepsecs( 1 ); + } + +} diff --git a/client/examples/second.cpp b/client/examples/second.cpp index 68eafaa..6cc2111 100644 --- a/client/examples/second.cpp +++ b/client/examples/second.cpp @@ -23,7 +23,7 @@ using namespace std; using namespace mongo; int main( int argc, const char **argv ) { - + const char *port = "27017"; if ( argc != 1 ) { if ( argc != 3 ) diff --git a/client/examples/tail.cpp b/client/examples/tail.cpp index 3738b4f..90e62d2 100644 --- a/client/examples/tail.cpp +++ b/client/examples/tail.cpp @@ -23,24 +23,24 @@ using namespace mongo; void tail(DBClientBase& conn, const char *ns) { - BSONElement lastId = minKey.firstElement(); - Query query = Query(); - - auto_ptr<DBClientCursor> c = - conn.query(ns, query, 0, 0, 0, QueryOption_CursorTailable); - - while( 1 ) { - if( !c->more() ) { - if( c->isDead() ) { - break; // we need to requery - } - - // all data (so far) exhausted, wait for more - sleepsecs(1); - continue; - } - BSONObj o = c->next(); - lastId = o["_id"]; - cout << o.toString() << endl; - } + BSONElement lastId = minKey.firstElement(); + Query query = Query(); + + auto_ptr<DBClientCursor> c = + conn.query(ns, query, 0, 0, 0, QueryOption_CursorTailable); + + while( 1 ) { + if( !c->more() ) { + if( c->isDead() ) { + break; // we need to requery + } + + // all data (so far) exhausted, wait for more + sleepsecs(1); + continue; + } + BSONObj o = c->next(); + lastId = o["_id"]; + cout << o.toString() << endl; + } } diff --git a/client/examples/tutorial.cpp b/client/examples/tutorial.cpp index 28e1b27..3cdf359 100644 --- a/client/examples/tutorial.cpp +++ b/client/examples/tutorial.cpp @@ -23,45 +23,45 @@ using namespace mongo; void printIfAge(DBClientConnection& c, int age) { - auto_ptr<DBClientCursor> cursor = c.query("tutorial.persons", QUERY( "age" << age ).sort("name") ); - while( cursor->more() ) { - BSONObj p = cursor->next(); - cout << p.getStringField("name") << endl; - } + auto_ptr<DBClientCursor> cursor = c.query("tutorial.persons", QUERY( "age" << age ).sort("name") ); + while( cursor->more() ) { + BSONObj p = cursor->next(); + cout << p.getStringField("name") << endl; + } } void run() { - DBClientConnection c; - c.connect("localhost"); //"192.168.58.1"); - cout << "connected ok" << endl; - BSONObj p = BSON( "name" << "Joe" << "age" << 33 ); - c.insert("tutorial.persons", p); - p = BSON( "name" << "Jane" << "age" << 40 ); - c.insert("tutorial.persons", p); - p = BSON( "name" << "Abe" << "age" << 33 ); - c.insert("tutorial.persons", p); - p = BSON( "name" << "Samantha" << "age" << 21 << "city" << "Los Angeles" << "state" << "CA" ); - c.insert("tutorial.persons", p); + DBClientConnection c; + c.connect("localhost"); //"192.168.58.1"); + cout << "connected ok" << endl; + BSONObj p = BSON( "name" << "Joe" << "age" << 33 ); + c.insert("tutorial.persons", p); + p = BSON( "name" << "Jane" << "age" << 40 ); + c.insert("tutorial.persons", p); + p = BSON( "name" << "Abe" << "age" 
<< 33 ); + c.insert("tutorial.persons", p); + p = BSON( "name" << "Samantha" << "age" << 21 << "city" << "Los Angeles" << "state" << "CA" ); + c.insert("tutorial.persons", p); - c.ensureIndex("tutorial.persons", fromjson("{age:1}")); + c.ensureIndex("tutorial.persons", fromjson("{age:1}")); - cout << "count:" << c.count("tutorial.persons") << endl; + cout << "count:" << c.count("tutorial.persons") << endl; - auto_ptr<DBClientCursor> cursor = c.query("tutorial.persons", BSONObj()); - while( cursor->more() ) { - cout << cursor->next().toString() << endl; - } + auto_ptr<DBClientCursor> cursor = c.query("tutorial.persons", BSONObj()); + while( cursor->more() ) { + cout << cursor->next().toString() << endl; + } - cout << "\nprintifage:\n"; - printIfAge(c, 33); + cout << "\nprintifage:\n"; + printIfAge(c, 33); } -int main() { - try { - run(); - } - catch( DBException &e ) { - cout << "caught " << e.what() << endl; - } - return 0; +int main() { + try { + run(); + } + catch( DBException &e ) { + cout << "caught " << e.what() << endl; + } + return 0; } diff --git a/client/examples/whereExample.cpp b/client/examples/whereExample.cpp index a26d921..ce4174b 100644 --- a/client/examples/whereExample.cpp +++ b/client/examples/whereExample.cpp @@ -23,7 +23,7 @@ using namespace std; using namespace mongo; int main( int argc, const char **argv ) { - + const char *port = "27017"; if ( argc != 1 ) { if ( argc != 3 ) @@ -36,7 +36,7 @@ int main( int argc, const char **argv ) { if ( ! conn.connect( string( "127.0.0.1:" ) + port , errmsg ) ) { cout << "couldn't connect : " << errmsg << endl; throw -11; - } + } const char * ns = "test.where"; @@ -44,9 +44,9 @@ int main( int argc, const char **argv ) { conn.insert( ns , BSON( "name" << "eliot" << "num" << 17 ) ); conn.insert( ns , BSON( "name" << "sara" << "num" << 24 ) ); - + auto_ptr<DBClientCursor> cursor = conn.query( ns , BSONObj() ); - + while ( cursor->more() ) { BSONObj obj = cursor->next(); cout << "\t" << obj.jsonString() << endl; @@ -64,5 +64,5 @@ int main( int argc, const char **argv ) { cout << "\t" << obj.jsonString() << endl; num++; } - assert( num == 1 ); + MONGO_assert( num == 1 ); } diff --git a/client/gridfs.cpp b/client/gridfs.cpp index d740c76..233724a 100644 --- a/client/gridfs.cpp +++ b/client/gridfs.cpp @@ -34,11 +34,11 @@ namespace mongo { const unsigned DEFAULT_CHUNK_SIZE = 256 * 1024; - GridFSChunk::GridFSChunk( BSONObj o ){ + GridFSChunk::GridFSChunk( BSONObj o ) { _data = o; } - GridFSChunk::GridFSChunk( BSONObj fileObject , int chunkNumber , const char * data , int len ){ + GridFSChunk::GridFSChunk( BSONObj fileObject , int chunkNumber , const char * data , int len ) { BSONObjBuilder b; b.appendAs( fileObject["_id"] , "files_id" ); b.append( "n" , chunkNumber ); @@ -47,7 +47,7 @@ namespace mongo { } - GridFS::GridFS( DBClientBase& client , const string& dbName , const string& prefix ) : _client( client ) , _dbName( dbName ) , _prefix( prefix ){ + GridFS::GridFS( DBClientBase& client , const string& dbName , const string& prefix ) : _client( client ) , _dbName( dbName ) , _prefix( prefix ) { _filesNS = dbName + "." + prefix + ".files"; _chunksNS = dbName + "." 
+ prefix + ".chunks"; _chunkSize = DEFAULT_CHUNK_SIZE; @@ -56,7 +56,7 @@ namespace mongo { client.ensureIndex( _chunksNS , BSON( "files_id" << 1 << "n" << 1 ) ); } - GridFS::~GridFS(){ + GridFS::~GridFS() { } @@ -65,7 +65,7 @@ namespace mongo { _chunkSize = size; } - BSONObj GridFS::storeFile( const char* data , size_t length , const string& remoteName , const string& contentType){ + BSONObj GridFS::storeFile( const char* data , size_t length , const string& remoteName , const string& contentType) { char const * const end = data + length; OID id; @@ -73,7 +73,7 @@ namespace mongo { BSONObj idObj = BSON("_id" << id); int chunkNumber = 0; - while (data < end){ + while (data < end) { int chunkLen = MIN(_chunkSize, (unsigned)(end-data)); GridFSChunk c(idObj, chunkNumber, data, chunkLen); _client.insert( _chunksNS.c_str() , c._data ); @@ -86,7 +86,7 @@ namespace mongo { } - BSONObj GridFS::storeFile( const string& fileName , const string& remoteName , const string& contentType){ + BSONObj GridFS::storeFile( const string& fileName , const string& remoteName , const string& contentType) { uassert( 10012 , "file doesn't exist" , fileName == "-" || boost::filesystem::exists( fileName ) ); FILE* fd; @@ -102,12 +102,12 @@ namespace mongo { int chunkNumber = 0; gridfs_offset length = 0; - while (!feof(fd)){ + while (!feof(fd)) { //boost::scoped_array<char>buf (new char[_chunkSize+1]); char * buf = new char[_chunkSize+1]; char* bufPos = buf;//.get(); unsigned int chunkLen = 0; // how much in the chunk now - while(chunkLen != _chunkSize && !feof(fd)){ + while(chunkLen != _chunkSize && !feof(fd)) { int readLen = fread(bufPos, 1, _chunkSize - chunkLen, fd); chunkLen += readLen; bufPos += readLen; @@ -125,11 +125,11 @@ namespace mongo { if (fd != stdin) fclose( fd ); - + return insertFile((remoteName.empty() ? fileName : remoteName), id, length, contentType); } - BSONObj GridFS::insertFile(const string& name, const OID& id, gridfs_offset length, const string& contentType){ + BSONObj GridFS::insertFile(const string& name, const OID& id, gridfs_offset length, const string& contentType) { BSONObj res; if ( ! 
_client.runCommand( _dbName.c_str() , BSON( "filemd5" << id << "root" << _prefix ) , res ) ) @@ -143,9 +143,10 @@ namespace mongo { << "md5" << res["md5"] ; - if (length < 1024*1024*1024){ // 2^30 + if (length < 1024*1024*1024) { // 2^30 file << "length" << (int) length; - }else{ + } + else { file << "length" << (long long) length; } @@ -158,9 +159,9 @@ namespace mongo { return ret; } - void GridFS::removeFile( const string& fileName ){ + void GridFS::removeFile( const string& fileName ) { auto_ptr<DBClientCursor> files = _client.query( _filesNS , BSON( "filename" << fileName ) ); - while (files->more()){ + while (files->more()) { BSONObj file = files->next(); BSONElement id = file["_id"]; _client.remove( _filesNS.c_str() , BSON( "_id" << id ) ); @@ -168,38 +169,38 @@ namespace mongo { } } - GridFile::GridFile( GridFS * grid , BSONObj obj ){ + GridFile::GridFile( GridFS * grid , BSONObj obj ) { _grid = grid; _obj = obj; } - GridFile GridFS::findFile( const string& fileName ){ + GridFile GridFS::findFile( const string& fileName ) { return findFile( BSON( "filename" << fileName ) ); }; - GridFile GridFS::findFile( BSONObj query ){ + GridFile GridFS::findFile( BSONObj query ) { query = BSON("query" << query << "orderby" << BSON("uploadDate" << -1)); return GridFile( this , _client.findOne( _filesNS.c_str() , query ) ); } - auto_ptr<DBClientCursor> GridFS::list(){ + auto_ptr<DBClientCursor> GridFS::list() { return _client.query( _filesNS.c_str() , BSONObj() ); } - auto_ptr<DBClientCursor> GridFS::list( BSONObj o ){ + auto_ptr<DBClientCursor> GridFS::list( BSONObj o ) { return _client.query( _filesNS.c_str() , o ); } - BSONObj GridFile::getMetadata(){ + BSONObj GridFile::getMetadata() { BSONElement meta_element = _obj["metadata"]; - if( meta_element.eoo() ){ + if( meta_element.eoo() ) { return BSONObj(); } return meta_element.embeddedObject(); } - GridFSChunk GridFile::getChunk( int n ){ + GridFSChunk GridFile::getChunk( int n ) { _exists(); BSONObjBuilder b; b.appendAs( _obj["_id"] , "files_id" ); @@ -210,12 +211,12 @@ namespace mongo { return GridFSChunk(o); } - gridfs_offset GridFile::write( ostream & out ){ + gridfs_offset GridFile::write( ostream & out ) { _exists(); const int num = getNumChunks(); - for ( int i=0; i<num; i++ ){ + for ( int i=0; i<num; i++ ) { GridFSChunk c = getChunk( i ); int len; @@ -226,17 +227,18 @@ namespace mongo { return getContentLength(); } - gridfs_offset GridFile::write( const string& where ){ - if (where == "-"){ + gridfs_offset GridFile::write( const string& where ) { + if (where == "-") { return write( cout ); - } else { + } + else { ofstream out(where.c_str() , ios::out | ios::binary ); uassert(13325, "couldn't open file: " + where, out.is_open() ); return write( out ); } } - void GridFile::_exists(){ + void GridFile::_exists() { uassert( 10015 , "doesn't exists" , exists() ); } diff --git a/client/gridfs.h b/client/gridfs.h index 1c55f79..b52cf75 100644 --- a/client/gridfs.h +++ b/client/gridfs.h @@ -32,13 +32,13 @@ namespace mongo { GridFSChunk( BSONObj data ); GridFSChunk( BSONObj fileId , int chunkNumber , const char * data , int len ); - int len(){ + int len() { int len; _data["data"].binDataClean( len ); return len; } - const char * data( int & len ){ + const char * data( int & len ) { return _data["data"].binDataClean( len ); } @@ -49,9 +49,10 @@ namespace mongo { /** - this is the main entry point into the mongo grid fs + GridFS is for storing large file-style objects in MongoDB. 
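      A hedged usage sketch (file names hypothetical; 'conn' is an already connected DBClientBase):

          GridFS gfs( conn , "test" , "fs" );
          gfs.storeFile( "/tmp/report.pdf" , "report.pdf" , "application/pdf" );
          GridFile f = gfs.findFile( "report.pdf" );
          if ( f.exists() )
              f.write( cout );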
+ @see http://www.mongodb.org/display/DOCS/GridFS+Specification */ - class GridFS{ + class GridFS { public: /** * @param client - db connection @@ -88,6 +89,7 @@ namespace mongo { * @return the file object */ BSONObj storeFile( const char* data , size_t length , const string& remoteName , const string& contentType=""); + /** * removes file referenced by fileName from the db * @param fileName filename (in GridFS) of the file to remove @@ -138,41 +140,41 @@ namespace mongo { * @return whether or not this file exists * findFile will always return a GriFile, so need to check this */ - bool exists(){ + bool exists() { return ! _obj.isEmpty(); } - string getFilename(){ + string getFilename() { return _obj["filename"].str(); } - int getChunkSize(){ + int getChunkSize() { return (int)(_obj["chunkSize"].number()); } - gridfs_offset getContentLength(){ + gridfs_offset getContentLength() { return (gridfs_offset)(_obj["length"].number()); } - string getContentType(){ + string getContentType() { return _obj["contentType"].valuestr(); } - Date_t getUploadDate(){ + Date_t getUploadDate() { return _obj["uploadDate"].date(); } - string getMD5(){ + string getMD5() { return _obj["md5"].str(); } - BSONElement getFileField( const string& name ){ + BSONElement getFileField( const string& name ) { return _obj[name]; } BSONObj getMetadata(); - int getNumChunks(){ + int getNumChunks() { return (int) ceil( (double)getContentLength() / (double)getChunkSize() ); } diff --git a/client/model.cpp b/client/model.cpp index 7861b91..bd10a3c 100644 --- a/client/model.cpp +++ b/client/model.cpp @@ -21,23 +21,23 @@ namespace mongo { - bool Model::load(BSONObj& query){ + bool Model::load(BSONObj& query) { ScopedDbConnection conn( modelServer() ); BSONObj b = conn->findOne(getNS(), query); conn.done(); - + if ( b.isEmpty() ) return false; - + unserialize(b); _id = b["_id"].wrap().getOwned(); return true; } - void Model::remove( bool safe ){ + void Model::remove( bool safe ) { uassert( 10016 , "_id isn't set - needed for remove()" , _id["_id"].type() ); - + ScopedDbConnection conn( modelServer() ); conn->remove( getNS() , _id ); @@ -46,34 +46,34 @@ namespace mongo { errmsg = conn->getLastError(); conn.done(); - + if ( safe && errmsg.size() ) throw UserException( 9002 , (string)"error on Model::remove: " + errmsg ); } - void Model::save( bool safe ){ + void Model::save( bool safe ) { ScopedDbConnection conn( modelServer() ); BSONObjBuilder b; serialize( b ); - + BSONElement myId; { BSONObjIterator i = b.iterator(); - while ( i.more() ){ + while ( i.more() ) { BSONElement e = i.next(); - if ( strcmp( e.fieldName() , "_id" ) == 0 ){ + if ( strcmp( e.fieldName() , "_id" ) == 0 ) { myId = e; break; } } } - if ( myId.type() ){ - if ( _id.isEmpty() ){ + if ( myId.type() ) { + if ( _id.isEmpty() ) { _id = myId.wrap(); } - else if ( myId.woCompare( _id.firstElement() ) ){ + else if ( myId.woCompare( _id.firstElement() ) ) { stringstream ss; ss << "_id from serialize and stored differ: "; ss << '[' << myId << "] != "; @@ -82,11 +82,11 @@ namespace mongo { } } - if ( _id.isEmpty() ){ + if ( _id.isEmpty() ) { OID oid; oid.init(); b.appendOID( "_id" , &oid ); - + BSONObj o = b.obj(); conn->insert( getNS() , o ); _id = o["_id"].wrap().getOwned(); @@ -94,25 +94,25 @@ namespace mongo { log(4) << "inserted new model " << getNS() << " " << o << endl; } else { - if ( myId.eoo() ){ + if ( myId.eoo() ) { myId = _id["_id"]; b.append( myId ); } - + assert( ! 
myId.eoo() ); BSONObjBuilder qb; qb.append( myId ); - + BSONObj q = qb.obj(); BSONObj o = b.obj(); log(4) << "updated model" << getNS() << " " << q << " " << o << endl; conn->update( getNS() , q , o , true ); - + } - + string errmsg = ""; if ( safe ) errmsg = conn->getLastError(); @@ -123,13 +123,13 @@ namespace mongo { throw UserException( 9003 , (string)"error on Model::save: " + errmsg ); } - BSONObj Model::toObject(){ + BSONObj Model::toObject() { BSONObjBuilder b; serialize( b ); return b.obj(); } - void Model::append( const char * name , BSONObjBuilder& b ){ + void Model::append( const char * name , BSONObjBuilder& b ) { BSONObjBuilder bb( b.subobjStart( name ) ); serialize( bb ); bb.done(); diff --git a/client/model.h b/client/model.h index 108efc0..7dd3143 100644 --- a/client/model.h +++ b/client/model.h @@ -43,16 +43,16 @@ namespace mongo { virtual void unserialize(const BSONObj& from) = 0; virtual BSONObj toObject(); virtual void append( const char * name , BSONObjBuilder& b ); - + virtual string modelServer() = 0; - - /** Load a single object. + + /** Load a single object. @return true if successful. */ virtual bool load(BSONObj& query); virtual void save( bool safe=false ); virtual void remove( bool safe=false ); - + protected: BSONObj _id; }; diff --git a/client/mongo_client_lib.cpp b/client/mongo_client_lib.cpp new file mode 100644 index 0000000..69f801a --- /dev/null +++ b/client/mongo_client_lib.cpp @@ -0,0 +1,66 @@ +/* @file client_lib.cpp + + MongoDB C++ Driver + + Normally one includes dbclient.h, and links against libmongoclient.a, when connecting to MongoDB + from C++. However, if you have a situation where the pre-built library does not work, you can use + this file instead to build all the necessary symbols. To do so, include client_lib.cpp in your + project. + + For example, to build and run simple_client_demo.cpp with GCC and run it: + + g++ -I .. simple_client_demo.cpp mongo_client_lib.cpp -lboost_thread-mt -lboost_filesystem + ./a.out +*/ + +/* Copyright 2009 10gen Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "../util/md5main.cpp" + +#define MONGO_EXPOSE_MACROS +#include "../pch.h" + +#include "../util/assert_util.cpp" +#include "../util/message.cpp" +#include "../util/util.cpp" +#include "../util/background.cpp" +#include "../util/base64.cpp" +#include "../util/sock.cpp" +#include "../util/log.cpp" +#include "../util/password.cpp" + +#include "../util/concurrency/thread_pool.cpp" +#include "../util/concurrency/vars.cpp" +#include "../util/concurrency/task.cpp" + +#include "connpool.cpp" +#include "syncclusterconnection.cpp" +#include "dbclient.cpp" +#include "clientOnly.cpp" +#include "gridfs.cpp" +#include "dbclientcursor.cpp" + +#include "../db/lasterror.cpp" +#include "../db/json.cpp" +#include "../db/jsobj.cpp" +#include "../db/common.cpp" +#include "../db/nonce.cpp" +#include "../db/commands.cpp" + +extern "C" { +#include "../util/md5.c" +} + diff --git a/client/parallel.cpp b/client/parallel.cpp index 92d1b04..c4905e3 100644 --- a/client/parallel.cpp +++ b/client/parallel.cpp @@ -25,10 +25,10 @@ #include "../s/shard.h" namespace mongo { - + // -------- ClusteredCursor ----------- - - ClusteredCursor::ClusteredCursor( QueryMessage& q ){ + + ClusteredCursor::ClusteredCursor( QueryMessage& q ) { _ns = q.ns; _query = q.query.copy(); _options = q.queryOptions; @@ -41,7 +41,7 @@ namespace mongo { _didInit = false; } - ClusteredCursor::ClusteredCursor( const string& ns , const BSONObj& q , int options , const BSONObj& fields ){ + ClusteredCursor::ClusteredCursor( const string& ns , const BSONObj& q , int options , const BSONObj& fields ) { _ns = ns; _query = q.getOwned(); _options = options; @@ -52,94 +52,112 @@ namespace mongo { _didInit = false; } - ClusteredCursor::~ClusteredCursor(){ + ClusteredCursor::~ClusteredCursor() { _done = true; // just in case } - void ClusteredCursor::init(){ + void ClusteredCursor::init() { if ( _didInit ) return; _didInit = true; _init(); } - - auto_ptr<DBClientCursor> ClusteredCursor::query( const string& server , int num , BSONObj extra , int skipLeft ){ + + auto_ptr<DBClientCursor> ClusteredCursor::query( const string& server , int num , BSONObj extra , int skipLeft ) { uassert( 10017 , "cursor already done" , ! _done ); assert( _didInit ); - + BSONObj q = _query; - if ( ! extra.isEmpty() ){ + if ( ! extra.isEmpty() ) { q = concatQuery( q , extra ); } - ShardConnection conn( server , _ns ); - - if ( conn.setVersion() ){ - conn.done(); - throw StaleConfigException( _ns , "ClusteredCursor::query ShardConnection had to change" , true ); - } - - if ( logLevel >= 5 ){ - log(5) << "ClusteredCursor::query (" << type() << ") server:" << server - << " ns:" << _ns << " query:" << q << " num:" << num - << " _fields:" << _fields << " options: " << _options << endl; - } - - auto_ptr<DBClientCursor> cursor = - conn->query( _ns , q , num , 0 , ( _fields.isEmpty() ? 0 : &_fields ) , _options , _batchSize == 0 ? 0 : _batchSize + skipLeft ); - - assert( cursor.get() ); - - if ( cursor->hasResultFlag( ResultFlag_ShardConfigStale ) ){ + try { + ShardConnection conn( server , _ns ); + + if ( conn.setVersion() ) { + conn.done(); + throw StaleConfigException( _ns , "ClusteredCursor::query ShardConnection had to change" , true ); + } + + if ( logLevel >= 5 ) { + log(5) << "ClusteredCursor::query (" << type() << ") server:" << server + << " ns:" << _ns << " query:" << q << " num:" << num + << " _fields:" << _fields << " options: " << _options << endl; + } + + auto_ptr<DBClientCursor> cursor = + conn->query( _ns , q , num , 0 , ( _fields.isEmpty() ? 
0 : &_fields ) , _options , _batchSize == 0 ? 0 : _batchSize + skipLeft ); + + if ( ! cursor.get() && _options & QueryOption_PartialResults ) { + _done = true; + conn.done(); + return cursor; + } + + massert( 13633 , str::stream() << "error querying server: " << server , cursor.get() ); + + if ( cursor->hasResultFlag( ResultFlag_ShardConfigStale ) ) { + conn.done(); + throw StaleConfigException( _ns , "ClusteredCursor::query" ); + } + + if ( cursor->hasResultFlag( ResultFlag_ErrSet ) ) { + conn.done(); + BSONObj o = cursor->next(); + throw UserException( o["code"].numberInt() , o["$err"].String() ); + } + + + cursor->attach( &conn ); + conn.done(); - throw StaleConfigException( _ns , "ClusteredCursor::query" ); + return cursor; } - - if ( cursor->hasResultFlag( ResultFlag_ErrSet ) ){ - conn.done(); - BSONObj o = cursor->next(); - throw UserException( o["code"].numberInt() , o["$err"].String() ); + catch ( SocketException& e ) { + if ( ! ( _options & QueryOption_PartialResults ) ) + throw e; + _done = true; + return auto_ptr<DBClientCursor>(); } - - - cursor->attach( &conn ); - - conn.done(); - return cursor; } - BSONObj ClusteredCursor::explain( const string& server , BSONObj extra ){ + BSONObj ClusteredCursor::explain( const string& server , BSONObj extra ) { BSONObj q = _query; - if ( ! extra.isEmpty() ){ + if ( ! extra.isEmpty() ) { q = concatQuery( q , extra ); } + BSONObj o; + ShardConnection conn( server , _ns ); - BSONObj o = conn->findOne( _ns , Query( q ).explain() ); + auto_ptr<DBClientCursor> cursor = conn->query( _ns , Query( q ).explain() , abs( _batchSize ) * -1 , 0 , _fields.isEmpty() ? 0 : &_fields ); + if ( cursor.get() && cursor->more() ) + o = cursor->next().getOwned(); conn.done(); return o; } - BSONObj ClusteredCursor::concatQuery( const BSONObj& query , const BSONObj& extraFilter ){ + BSONObj ClusteredCursor::concatQuery( const BSONObj& query , const BSONObj& extraFilter ) { if ( ! query.hasField( "query" ) ) return _concatFilter( query , extraFilter ); BSONObjBuilder b; BSONObjIterator i( query ); - while ( i.more() ){ + while ( i.more() ) { BSONElement e = i.next(); - if ( strcmp( e.fieldName() , "query" ) ){ + if ( strcmp( e.fieldName() , "query" ) ) { b.append( e ); continue; } - + b.append( "query" , _concatFilter( e.embeddedObjectUserCheck() , extraFilter ) ); } return b.obj(); } - - BSONObj ClusteredCursor::_concatFilter( const BSONObj& filter , const BSONObj& extra ){ + + BSONObj ClusteredCursor::_concatFilter( const BSONObj& filter , const BSONObj& extra ) { BSONObjBuilder b; b.appendElements( filter ); b.appendElements( extra ); @@ -147,32 +165,41 @@ namespace mongo { // TODO: should do some simplification here if possibl ideally } - BSONObj ClusteredCursor::explain(){ + BSONObj ClusteredCursor::explain() { + // Note: by default we filter out allPlans and oldPlan in the shell's + // explain() function. If you add any recursive structures, make sure to + // edit the JS to make sure everything gets filtered. 
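        // Descriptive note on the aggregation below: each shard's explain output is collected
        // under "shards"; every numeric field whose name starts with 'n' (n, nscanned,
        // nscannedObjects, ...) is summed across shards; timing is reported as millisTotal,
        // millisAvg and numQueries.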
+ BSONObjBuilder b; b.append( "clusteredType" , type() ); - long long nscanned = 0; - long long nscannedObjects = 0; - long long n = 0; long long millis = 0; double numExplains = 0; - + + map<string,long long> counters; + map<string,list<BSONObj> > out; { _explain( out ); - + BSONObjBuilder x( b.subobjStart( "shards" ) ); - for ( map<string,list<BSONObj> >::iterator i=out.begin(); i!=out.end(); ++i ){ + for ( map<string,list<BSONObj> >::iterator i=out.begin(); i!=out.end(); ++i ) { string shard = i->first; list<BSONObj> l = i->second; - BSONArrayBuilder y( x.subarrayStart( shard.c_str() ) ); - for ( list<BSONObj>::iterator j=l.begin(); j!=l.end(); ++j ){ + BSONArrayBuilder y( x.subarrayStart( shard ) ); + for ( list<BSONObj>::iterator j=l.begin(); j!=l.end(); ++j ) { BSONObj temp = *j; y.append( temp ); - nscanned += temp["nscanned"].numberLong(); - nscannedObjects += temp["nscannedObjects"].numberLong(); - n += temp["n"].numberLong(); + BSONObjIterator k( temp ); + while ( k.more() ) { + BSONElement z = k.next(); + if ( z.fieldName()[0] != 'n' ) + continue; + long long& c = counters[z.fieldName()]; + c += z.numberLong(); + } + millis += temp["millis"].numberLong(); numExplains++; } @@ -181,9 +208,9 @@ namespace mongo { x.done(); } - b.appendNumber( "nscanned" , nscanned ); - b.appendNumber( "nscannedObjects" , nscannedObjects ); - b.appendNumber( "n" , n ); + for ( map<string,long long>::iterator i=counters.begin(); i!=counters.end(); ++i ) + b.appendNumber( i->first , i->second ); + b.appendNumber( "millisTotal" , millis ); b.append( "millisAvg" , (int)((double)millis / numExplains ) ); b.append( "numQueries" , (int)numExplains ); @@ -191,37 +218,37 @@ namespace mongo { return b.obj(); } - + // -------- FilteringClientCursor ----------- FilteringClientCursor::FilteringClientCursor( const BSONObj filter ) - : _matcher( filter ) , _done( true ){ + : _matcher( filter ) , _done( true ) { } FilteringClientCursor::FilteringClientCursor( auto_ptr<DBClientCursor> cursor , const BSONObj filter ) - : _matcher( filter ) , _cursor( cursor ) , _done( cursor.get() == 0 ){ + : _matcher( filter ) , _cursor( cursor ) , _done( cursor.get() == 0 ) { } - - FilteringClientCursor::~FilteringClientCursor(){ + + FilteringClientCursor::~FilteringClientCursor() { } - - void FilteringClientCursor::reset( auto_ptr<DBClientCursor> cursor ){ + + void FilteringClientCursor::reset( auto_ptr<DBClientCursor> cursor ) { _cursor = cursor; _next = BSONObj(); _done = _cursor.get() == 0; } - bool FilteringClientCursor::more(){ + bool FilteringClientCursor::more() { if ( ! _next.isEmpty() ) return true; - + if ( _done ) return false; - + _advance(); return ! _next.isEmpty(); } - - BSONObj FilteringClientCursor::next(){ + + BSONObj FilteringClientCursor::next() { assert( ! _next.isEmpty() ); assert( ! _done ); @@ -231,20 +258,20 @@ namespace mongo { return ret; } - BSONObj FilteringClientCursor::peek(){ + BSONObj FilteringClientCursor::peek() { if ( _next.isEmpty() ) _advance(); return _next; } - - void FilteringClientCursor::_advance(){ + + void FilteringClientCursor::_advance() { assert( _next.isEmpty() ); if ( ! _cursor.get() || _done ) return; - - while ( _cursor->more() ){ + + while ( _cursor->more() ) { _next = _cursor->next(); - if ( _matcher.matches( _next ) ){ + if ( _matcher.matches( _next ) ) { if ( ! 
_cursor->moreInCurrentBatch() ) _next = _next.getOwned(); return; @@ -253,53 +280,53 @@ namespace mongo { } _done = true; } - + // -------- SerialServerClusteredCursor ----------- - - SerialServerClusteredCursor::SerialServerClusteredCursor( const set<ServerAndQuery>& servers , QueryMessage& q , int sortOrder) : ClusteredCursor( q ){ + + SerialServerClusteredCursor::SerialServerClusteredCursor( const set<ServerAndQuery>& servers , QueryMessage& q , int sortOrder) : ClusteredCursor( q ) { for ( set<ServerAndQuery>::const_iterator i = servers.begin(); i!=servers.end(); i++ ) _servers.push_back( *i ); - + if ( sortOrder > 0 ) sort( _servers.begin() , _servers.end() ); else if ( sortOrder < 0 ) sort( _servers.rbegin() , _servers.rend() ); - + _serverIndex = 0; _needToSkip = q.ntoskip; } - - bool SerialServerClusteredCursor::more(){ - + + bool SerialServerClusteredCursor::more() { + // TODO: optimize this by sending on first query and then back counting // tricky in case where 1st server doesn't have any after // need it to send n skipped - while ( _needToSkip > 0 && _current.more() ){ + while ( _needToSkip > 0 && _current.more() ) { _current.next(); _needToSkip--; } - + if ( _current.more() ) return true; - - if ( _serverIndex >= _servers.size() ){ + + if ( _serverIndex >= _servers.size() ) { return false; } - + ServerAndQuery& sq = _servers[_serverIndex++]; _current.reset( query( sq._server , 0 , sq._extra ) ); return more(); } - - BSONObj SerialServerClusteredCursor::next(){ + + BSONObj SerialServerClusteredCursor::next() { uassert( 10018 , "no more items" , more() ); return _current.next(); } - void SerialServerClusteredCursor::_explain( map< string,list<BSONObj> >& out ){ - for ( unsigned i=0; i<_servers.size(); i++ ){ + void SerialServerClusteredCursor::_explain( map< string,list<BSONObj> >& out ) { + for ( unsigned i=0; i<_servers.size(); i++ ) { ServerAndQuery& sq = _servers[i]; list<BSONObj> & l = out[sq._server]; l.push_back( explain( sq._server , sq._extra ) ); @@ -307,132 +334,142 @@ namespace mongo { } // -------- ParallelSortClusteredCursor ----------- - - ParallelSortClusteredCursor::ParallelSortClusteredCursor( const set<ServerAndQuery>& servers , QueryMessage& q , - const BSONObj& sortKey ) - : ClusteredCursor( q ) , _servers( servers ){ + + ParallelSortClusteredCursor::ParallelSortClusteredCursor( const set<ServerAndQuery>& servers , QueryMessage& q , + const BSONObj& sortKey ) + : ClusteredCursor( q ) , _servers( servers ) { _sortKey = sortKey.getOwned(); _needToSkip = q.ntoskip; _finishCons(); } - ParallelSortClusteredCursor::ParallelSortClusteredCursor( const set<ServerAndQuery>& servers , const string& ns , - const Query& q , - int options , const BSONObj& fields ) - : ClusteredCursor( ns , q.obj , options , fields ) , _servers( servers ){ + ParallelSortClusteredCursor::ParallelSortClusteredCursor( const set<ServerAndQuery>& servers , const string& ns , + const Query& q , + int options , const BSONObj& fields ) + : ClusteredCursor( ns , q.obj , options , fields ) , _servers( servers ) { _sortKey = q.getSort().copy(); _needToSkip = 0; _finishCons(); } - void ParallelSortClusteredCursor::_finishCons(){ + void ParallelSortClusteredCursor::_finishCons() { _numServers = _servers.size(); _cursors = 0; - if ( ! _sortKey.isEmpty() && ! _fields.isEmpty() ){ - // we need to make sure the sort key is in the project - bool isNegative = false; + if ( ! _sortKey.isEmpty() && ! 
_fields.isEmpty() ) { + // we need to make sure the sort key is in the projection + + set<string> sortKeyFields; + _sortKey.getFieldNames(sortKeyFields); + BSONObjBuilder b; + bool isNegative = false; { BSONObjIterator i( _fields ); - while ( i.more() ){ + while ( i.more() ) { BSONElement e = i.next(); b.append( e ); - if ( ! e.trueValue() ) - isNegative = true; - } - } - - { - BSONObjIterator i( _sortKey ); - while ( i.more() ){ - BSONElement e = i.next(); - BSONElement f = _fields.getField( e.fieldName() ); - if ( isNegative ){ - uassert( 13431 , "have to have sort key in projection and removing it" , f.eoo() ); + + string fieldName = e.fieldName(); + + // exact field + bool found = sortKeyFields.erase(fieldName); + + // subfields + set<string>::const_iterator begin = sortKeyFields.lower_bound(fieldName + ".\x00"); + set<string>::const_iterator end = sortKeyFields.lower_bound(fieldName + ".\xFF"); + sortKeyFields.erase(begin, end); + + if ( ! e.trueValue() ) { + uassert( 13431 , "have to have sort key in projection and removing it" , !found && begin == end ); } - else if ( f.eoo() ){ - // add to projection - b.append( e ); + else if (!e.isABSONObj()) { + isNegative = true; } } } - + + if (isNegative) { + for (set<string>::const_iterator it(sortKeyFields.begin()), end(sortKeyFields.end()); it != end; ++it) { + b.append(*it, 1); + } + } + _fields = b.obj(); } } - - void ParallelSortClusteredCursor::_init(){ + + void ParallelSortClusteredCursor::_init() { assert( ! _cursors ); _cursors = new FilteringClientCursor[_numServers]; - + // TODO: parellize int num = 0; - for ( set<ServerAndQuery>::iterator i = _servers.begin(); i!=_servers.end(); ++i ){ + for ( set<ServerAndQuery>::iterator i = _servers.begin(); i!=_servers.end(); ++i ) { const ServerAndQuery& sq = *i; _cursors[num++].reset( query( sq._server , 0 , sq._extra , _needToSkip ) ); } - + } - - ParallelSortClusteredCursor::~ParallelSortClusteredCursor(){ + + ParallelSortClusteredCursor::~ParallelSortClusteredCursor() { delete [] _cursors; _cursors = 0; } - bool ParallelSortClusteredCursor::more(){ + bool ParallelSortClusteredCursor::more() { - if ( _needToSkip > 0 ){ + if ( _needToSkip > 0 ) { int n = _needToSkip; _needToSkip = 0; - while ( n > 0 && more() ){ + while ( n > 0 && more() ) { BSONObj x = next(); n--; } _needToSkip = n; } - - for ( int i=0; i<_numServers; i++ ){ + + for ( int i=0; i<_numServers; i++ ) { if ( _cursors[i].more() ) return true; } return false; } - - BSONObj ParallelSortClusteredCursor::next(){ + + BSONObj ParallelSortClusteredCursor::next() { BSONObj best = BSONObj(); int bestFrom = -1; - - for ( int i=0; i<_numServers; i++){ + + for ( int i=0; i<_numServers; i++) { if ( ! _cursors[i].more() ) continue; - + BSONObj me = _cursors[i].peek(); - if ( best.isEmpty() ){ + if ( best.isEmpty() ) { best = me; bestFrom = i; continue; } - + int comp = best.woSortOrder( me , _sortKey , true ); if ( comp < 0 ) continue; - + best = me; bestFrom = i; } - + uassert( 10019 , "no more elements" , ! 
best.isEmpty() ); _cursors[bestFrom].next(); - + return best; } - void ParallelSortClusteredCursor::_explain( map< string,list<BSONObj> >& out ){ - for ( set<ServerAndQuery>::iterator i=_servers.begin(); i!=_servers.end(); ++i ){ + void ParallelSortClusteredCursor::_explain( map< string,list<BSONObj> >& out ) { + for ( set<ServerAndQuery>::iterator i=_servers.begin(); i!=_servers.end(); ++i ) { const ServerAndQuery& sq = *i; list<BSONObj> & l = out[sq._server]; l.push_back( explain( sq._server , sq._extra ) ); @@ -444,39 +481,50 @@ namespace mongo { // ---- Future ----- // ----------------- - Future::CommandResult::CommandResult( const string& server , const string& db , const BSONObj& cmd ){ + Future::CommandResult::CommandResult( const string& server , const string& db , const BSONObj& cmd , DBClientBase * conn ) { _server = server; _db = db; _cmd = cmd; + _conn = conn; _done = false; } - bool Future::CommandResult::join(){ + bool Future::CommandResult::join() { _thr->join(); assert( _done ); return _ok; } - void Future::commandThread( shared_ptr<CommandResult> res ){ + void Future::commandThread(shared_ptr<CommandResult> res) { setThreadName( "future" ); try { - ScopedDbConnection conn( res->_server ); + DBClientBase * conn = res->_conn; + + scoped_ptr<ScopedDbConnection> myconn; + if ( ! conn ){ + myconn.reset( new ScopedDbConnection( res->_server ) ); + conn = myconn->get(); + } + res->_ok = conn->runCommand( res->_db , res->_cmd , res->_res ); - conn.done(); + + if ( myconn ) + myconn->done(); + } - catch ( std::exception& e ){ + catch ( std::exception& e ) { error() << "Future::commandThread exception: " << e.what() << endl; res->_ok = false; } res->_done = true; } - shared_ptr<Future::CommandResult> Future::spawnCommand( const string& server , const string& db , const BSONObj& cmd ){ - shared_ptr<Future::CommandResult> res( new Future::CommandResult( server , db , cmd ) ); - res->_thr.reset( new boost::thread( boost::bind( Future::commandThread , res ) ) ); + shared_ptr<Future::CommandResult> Future::spawnCommand( const string& server , const string& db , const BSONObj& cmd , DBClientBase * conn ) { + shared_ptr<Future::CommandResult> res (new Future::CommandResult( server , db , cmd , conn )); + res->_thr.reset( new boost::thread( boost::bind(Future::commandThread, res) ) ); + return res; } - - + } diff --git a/client/parallel.h b/client/parallel.h index 603cfe7..0809376 100644 --- a/client/parallel.h +++ b/client/parallel.h @@ -24,6 +24,7 @@ #include "redef_macros.h" #include "../db/dbmessage.h" #include "../db/matcher.h" +#include "../util/concurrency/mvar.h" namespace mongo { @@ -32,14 +33,14 @@ namespace mongo { */ class ServerAndQuery { public: - ServerAndQuery( const string& server , BSONObj extra = BSONObj() , BSONObj orderObject = BSONObj() ) : - _server( server ) , _extra( extra.getOwned() ) , _orderObject( orderObject.getOwned() ){ + ServerAndQuery( const string& server , BSONObj extra = BSONObj() , BSONObj orderObject = BSONObj() ) : + _server( server ) , _extra( extra.getOwned() ) , _orderObject( orderObject.getOwned() ) { } - bool operator<( const ServerAndQuery& other ) const{ + bool operator<( const ServerAndQuery& other ) const { if ( ! 
_orderObject.isEmpty() ) return _orderObject.woCompare( other._orderObject ) < 0; - + if ( _server < other._server ) return true; if ( other._server > _server ) @@ -71,28 +72,28 @@ namespace mongo { ClusteredCursor( QueryMessage& q ); ClusteredCursor( const string& ns , const BSONObj& q , int options=0 , const BSONObj& fields=BSONObj() ); virtual ~ClusteredCursor(); - + /** call before using */ void init(); - + virtual bool more() = 0; virtual BSONObj next() = 0; - + static BSONObj concatQuery( const BSONObj& query , const BSONObj& extraFilter ); - + virtual string type() const = 0; virtual BSONObj explain(); protected: - + virtual void _init() = 0; auto_ptr<DBClientCursor> query( const string& server , int num = 0 , BSONObj extraFilter = BSONObj() , int skipLeft = 0 ); BSONObj explain( const string& server , BSONObj extraFilter = BSONObj() ); - + static BSONObj _concatFilter( const BSONObj& filter , const BSONObj& extraFilter ); - + virtual void _explain( map< string,list<BSONObj> >& out ) = 0; string _ns; @@ -112,19 +113,19 @@ namespace mongo { FilteringClientCursor( const BSONObj filter = BSONObj() ); FilteringClientCursor( auto_ptr<DBClientCursor> cursor , const BSONObj filter = BSONObj() ); ~FilteringClientCursor(); - + void reset( auto_ptr<DBClientCursor> cursor ); - + bool more(); BSONObj next(); - + BSONObj peek(); private: void _advance(); - + Matcher _matcher; auto_ptr<DBClientCursor> _cursor; - + BSONObj _next; bool _done; }; @@ -132,22 +133,22 @@ namespace mongo { class Servers { public: - Servers(){ + Servers() { } - - void add( const ServerAndQuery& s ){ + + void add( const ServerAndQuery& s ) { add( s._server , s._extra ); } - - void add( const string& server , const BSONObj& filter ){ + + void add( const string& server , const BSONObj& filter ) { vector<BSONObj>& mine = _filters[server]; mine.push_back( filter.getOwned() ); } - + // TOOO: pick a less horrible name class View { - View( const Servers* s ){ - for ( map<string, vector<BSONObj> >::const_iterator i=s->_filters.begin(); i!=s->_filters.end(); ++i ){ + View( const Servers* s ) { + for ( map<string, vector<BSONObj> >::const_iterator i=s->_filters.begin(); i!=s->_filters.end(); ++i ) { _servers.push_back( i->first ); _filters.push_back( i->second ); } @@ -164,7 +165,7 @@ namespace mongo { vector<BSONObj> getFilter( int n ) const { return _filters[ n ]; } - + private: vector<string> _servers; vector< vector<BSONObj> > _filters; @@ -175,7 +176,7 @@ namespace mongo { View view() const { return View( this ); } - + private: map<string, vector<BSONObj> > _filters; @@ -198,13 +199,13 @@ namespace mongo { protected: virtual void _explain( map< string,list<BSONObj> >& out ); - void _init(){} + void _init() {} vector<ServerAndQuery> _servers; unsigned _serverIndex; - + FilteringClientCursor _current; - + int _needToSkip; }; @@ -212,11 +213,11 @@ namespace mongo { /** * runs a query in parellel across N servers * sots - */ + */ class ParallelSortClusteredCursor : public ClusteredCursor { public: ParallelSortClusteredCursor( const set<ServerAndQuery>& servers , QueryMessage& q , const BSONObj& sortKey ); - ParallelSortClusteredCursor( const set<ServerAndQuery>& servers , const string& ns , + ParallelSortClusteredCursor( const set<ServerAndQuery>& servers , const string& ns , const Query& q , int options=0, const BSONObj& fields=BSONObj() ); virtual ~ParallelSortClusteredCursor(); virtual bool more(); @@ -231,7 +232,7 @@ namespace mongo { int _numServers; set<ServerAndQuery> _servers; BSONObj _sortKey; - + FilteringClientCursor * 
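
// ---------------------------------------------------------------------------
// [editor's note] Illustrative sketch only -- not part of the upstream diff.
// FilteringClientCursor, declared in parallel.h above, is a one-element
// lookahead wrapper: it pulls the next document ahead of time, drops anything
// the matcher rejects, and serves the cached value from peek()/next().
// Ints plus a predicate stand in for BSONObj plus Matcher; names are
// hypothetical.
#include <cstddef>
#include <functional>
#include <vector>

class FilteringCursorSketch {
public:
    FilteringCursorSketch(std::vector<int> docs, std::function<bool(int)> matches)
        : _docs(docs), _matches(matches), _pos(0), _hasNext(false), _next(0) {}

    bool more() { advance(); return _hasNext; }
    int  peek() { advance(); return _next; }
    int  next() { advance(); _hasNext = false; return _next; }  // callers check more() first

private:
    void advance() {                        // mirrors _advance(): refill the buffer
        while (!_hasNext && _pos < _docs.size()) {
            int candidate = _docs[_pos++];
            if (_matches(candidate)) { _next = candidate; _hasNext = true; }
        }
    }
    std::vector<int> _docs;
    std::function<bool(int)> _matches;
    std::size_t _pos;
    bool _hasNext;
    int  _next;
};
// ---------------------------------------------------------------------------
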
_cursors; int _needToSkip; }; @@ -245,11 +246,11 @@ namespace mongo { public: class CommandResult { public: - + string getServer() const { return _server; } bool isDone() const { return _done; } - + bool ok() const { assert( _done ); return _ok; @@ -265,30 +266,37 @@ namespace mongo { returns ok() */ bool join(); - + private: - - CommandResult( const string& server , const string& db , const BSONObj& cmd ); - + + CommandResult( const string& server , const string& db , const BSONObj& cmd , DBClientBase * conn ); + string _server; string _db; BSONObj _cmd; + DBClientBase * _conn; scoped_ptr<boost::thread> _thr; - + BSONObj _res; - bool _done; bool _ok; - + bool _done; + friend class Future; }; + + static void commandThread(shared_ptr<CommandResult> res); - static void commandThread( shared_ptr<CommandResult> res ); - - static shared_ptr<CommandResult> spawnCommand( const string& server , const string& db , const BSONObj& cmd ); + /** + * @param server server name + * @param db db name + * @param cmd cmd to exec + * @param conn optional connection to use. will use standard pooled if non-specified + */ + static shared_ptr<CommandResult> spawnCommand( const string& server , const string& db , const BSONObj& cmd , DBClientBase * conn = 0 ); }; - + } #include "undef_macros.h" diff --git a/client/redef_macros.h b/client/redef_macros.h index dd2e66f..a4cb1c9 100644 --- a/client/redef_macros.h +++ b/client/redef_macros.h @@ -50,6 +50,9 @@ #define RARELY MONGO_RARELY #define ONCE MONGO_ONCE +// util/log.h +#define LOG MONGO_LOG + #undef MONGO_MACROS_CLEANED #endif diff --git a/client/simple_client_demo.cpp b/client/simple_client_demo.cpp new file mode 100644 index 0000000..fa2f4a8 --- /dev/null +++ b/client/simple_client_demo.cpp @@ -0,0 +1,36 @@ +/* simple_client_demo.cpp + + See also : http://www.mongodb.org/pages/viewpage.action?pageId=133415 + + How to build and run: + + (1) Using the mongoclient: + g++ simple_client_demo.cpp -lmongoclient -lboost_thread-mt -lboost_filesystem -lboost_program_options + ./a.out + + (2) using client_lib.cpp: + g++ -I .. simple_client_demo.cpp mongo_client_lib.cpp -lboost_thread-mt -lboost_filesystem + ./a.out +*/ + +#include <iostream> +#include "dbclient.h" // the mongo c++ driver + +using namespace std; +using namespace mongo; +using namespace bson; + +int main() { + cout << "connecting to localhost..." 
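
// ---------------------------------------------------------------------------
// [editor's note] Illustrative sketch only -- not part of the upstream diff.
// Future::spawnCommand(), declared above and implemented earlier, runs one
// command per background thread, optionally reusing a connection the caller
// already holds, and join() blocks for the outcome.  std::thread stands in
// for boost::thread and a std::function for the real runCommand call; every
// name here is hypothetical.
#include <functional>
#include <memory>
#include <string>
#include <thread>

struct CommandResultSketch {
    std::string server;
    std::string db;
    bool ok;
    bool done;
    std::thread thr;

    bool join() { thr.join(); return ok; }      // wait for the worker, report success
};

typedef std::function<bool(const std::string& server, const std::string& db)> RunCommandFn;

std::shared_ptr<CommandResultSketch>
spawnCommandSketch(const std::string& server, const std::string& db, RunCommandFn run) {
    std::shared_ptr<CommandResultSketch> res(new CommandResultSketch());
    res->server = server;
    res->db = db;
    res->ok = false;
    res->done = false;
    res->thr = std::thread([res, run]() {       // worker: run the command, record outcome
        try        { res->ok = run(res->server, res->db); }
        catch (...) { res->ok = false; }
        res->done = true;
    });
    return res;
}
// ---------------------------------------------------------------------------
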
<< endl; + DBClientConnection c; + c.connect("localhost"); + cout << "connected ok" << endl; + unsigned long long count = c.count("test.foo"); + cout << "count of exiting documents in collection test.foo : " << count << endl; + + bo o = BSON( "hello" << "world" ); + c.insert("test.foo", o); + + return 0; +} + diff --git a/client/syncclusterconnection.cpp b/client/syncclusterconnection.cpp index 99f6067..4fafdc1 100644 --- a/client/syncclusterconnection.cpp +++ b/client/syncclusterconnection.cpp @@ -37,11 +37,11 @@ namespace mongo { for( list<HostAndPort>::const_iterator i = L.begin(); i != L.end(); i++ ) _connect( i->toString() ); } - + SyncClusterConnection::SyncClusterConnection( string commaSeperated ) : _mutex("SyncClusterConnection") { _address = commaSeperated; string::size_type idx; - while ( ( idx = commaSeperated.find( ',' ) ) != string::npos ){ + while ( ( idx = commaSeperated.find( ',' ) ) != string::npos ) { string h = commaSeperated.substr( 0 , idx ); commaSeperated = commaSeperated.substr( idx + 1 ); _connect( h ); @@ -50,7 +50,7 @@ namespace mongo { uassert( 8004 , "SyncClusterConnection needs 3 servers" , _conns.size() == 3 ); } - SyncClusterConnection::SyncClusterConnection( string a , string b , string c ) : _mutex("SyncClusterConnection") { + SyncClusterConnection::SyncClusterConnection( string a , string b , string c ) : _mutex("SyncClusterConnection") { _address = a + "," + b + "," + c; // connect to all even if not working _connect( a ); @@ -62,52 +62,55 @@ namespace mongo { assert(0); } - SyncClusterConnection::~SyncClusterConnection(){ + SyncClusterConnection::~SyncClusterConnection() { for ( size_t i=0; i<_conns.size(); i++ ) delete _conns[i]; _conns.clear(); } - bool SyncClusterConnection::prepare( string& errmsg ){ + bool SyncClusterConnection::prepare( string& errmsg ) { _lastErrors.clear(); return fsync( errmsg ); } - - bool SyncClusterConnection::fsync( string& errmsg ){ + + bool SyncClusterConnection::fsync( string& errmsg ) { bool ok = true; errmsg = ""; - for ( size_t i=0; i<_conns.size(); i++ ){ + for ( size_t i=0; i<_conns.size(); i++ ) { BSONObj res; try { if ( _conns[i]->simpleCommand( "admin" , 0 , "fsync" ) ) continue; } - catch ( std::exception& e ){ + catch ( DBException& e ) { + errmsg += e.toString(); + } + catch ( std::exception& e ) { errmsg += e.what(); } - catch ( ... ){ + catch ( ... ) { } ok = false; - errmsg += _conns[i]->toString() + ":" + res.toString(); + errmsg += " " + _conns[i]->toString() + ":" + res.toString(); } return ok; } - void SyncClusterConnection::_checkLast(){ + void SyncClusterConnection::_checkLast() { _lastErrors.clear(); vector<string> errors; - for ( size_t i=0; i<_conns.size(); i++ ){ + for ( size_t i=0; i<_conns.size(); i++ ) { BSONObj res; string err; try { if ( ! _conns[i]->runCommand( "admin" , BSON( "getlasterror" << 1 << "fsync" << 1 ) , res ) ) err = "cmd failed: "; } - catch ( std::exception& e ){ + catch ( std::exception& e ) { err += e.what(); } - catch ( ... ){ + catch ( ... 
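
// ---------------------------------------------------------------------------
// [editor's note] Illustrative sketch only -- not part of the upstream diff.
// The SyncClusterConnection constructor above splits a comma-separated
// address string on ',' and connects to each host, then insists on exactly
// three members.  The sketch below keeps only the string handling; the real
// class opens a DBClientConnection per host instead of collecting names.
#include <stdexcept>
#include <string>
#include <vector>

std::vector<std::string> splitSyncClusterAddressSketch(std::string commaSeparated) {
    std::vector<std::string> hosts;
    std::string::size_type idx;
    while ((idx = commaSeparated.find(',')) != std::string::npos) {
        hosts.push_back(commaSeparated.substr(0, idx));
        commaSeparated = commaSeparated.substr(idx + 1);
    }
    hosts.push_back(commaSeparated);            // last (or only) host
    if (hosts.size() != 3)
        throw std::runtime_error("SyncClusterConnection needs 3 servers");
    return hosts;
}
// ---------------------------------------------------------------------------
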
) { err += "unknown failure"; } _lastErrors.push_back( res.getOwned() ); @@ -115,13 +118,13 @@ namespace mongo { } assert( _lastErrors.size() == errors.size() && _lastErrors.size() == _conns.size() ); - + stringstream err; bool ok = true; - - for ( size_t i = 0; i<_conns.size(); i++ ){ + + for ( size_t i = 0; i<_conns.size(); i++ ) { BSONObj res = _lastErrors[i]; - if ( res["ok"].trueValue() && res["fsyncFiles"].numberInt() > 0 ) + if ( res["ok"].trueValue() && (res["fsyncFiles"].numberInt() > 0 || res.hasElement("waited"))) continue; ok = false; err << _conns[i]->toString() << ": " << res << " " << errors[i]; @@ -132,13 +135,13 @@ namespace mongo { throw UserException( 8001 , (string)"SyncClusterConnection write op failed: " + err.str() ); } - BSONObj SyncClusterConnection::getLastErrorDetailed(){ + BSONObj SyncClusterConnection::getLastErrorDetailed() { if ( _lastErrors.size() ) return _lastErrors[0]; return DBClientBase::getLastErrorDetailed(); } - void SyncClusterConnection::_connect( string host ){ + void SyncClusterConnection::_connect( string host ) { log() << "SyncClusterConnection connecting to [" << host << "]" << endl; DBClientConnection * c = new DBClientConnection( true ); string errmsg; @@ -148,40 +151,42 @@ namespace mongo { _conns.push_back( c ); } - bool SyncClusterConnection::callRead( Message& toSend , Message& response ){ + bool SyncClusterConnection::callRead( Message& toSend , Message& response ) { // TODO: need to save state of which one to go back to somehow... return _conns[0]->callRead( toSend , response ); } BSONObj SyncClusterConnection::findOne(const string &ns, const Query& query, const BSONObj *fieldsToReturn, int queryOptions) { - - if ( ns.find( ".$cmd" ) != string::npos ){ + + if ( ns.find( ".$cmd" ) != string::npos ) { string cmdName = query.obj.firstElement().fieldName(); int lockType = _lockType( cmdName ); - if ( lockType > 0 ){ // write $cmd + if ( lockType > 0 ) { // write $cmd string errmsg; if ( ! 
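
// ---------------------------------------------------------------------------
// [editor's note] Illustrative sketch only -- not part of the upstream diff.
// _checkLast() above collects a getlasterror reply from every node and fails
// the whole write unless each node reports ok plus evidence of an fsync
// ("fsyncFiles" > 0 or a "waited" field).  A small struct stands in for the
// BSONObj reply; names are hypothetical.
#include <cstddef>
#include <sstream>
#include <stdexcept>
#include <string>
#include <vector>

struct GleReplySketch {
    std::string node;
    bool ok;
    int  fsyncFiles;
    bool hasWaited;
};

void checkLastSketch(const std::vector<GleReplySketch>& replies) {
    std::ostringstream err;
    bool allGood = true;
    for (std::size_t i = 0; i < replies.size(); ++i) {
        const GleReplySketch& r = replies[i];
        if (r.ok && (r.fsyncFiles > 0 || r.hasWaited))
            continue;                           // this node confirmed the write
        allGood = false;
        err << r.node << " did not confirm; ";
    }
    if (!allGood)
        throw std::runtime_error("SyncClusterConnection write op failed: " + err.str());
}
// ---------------------------------------------------------------------------
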
prepare( errmsg ) ) throw UserException( 13104 , (string)"SyncClusterConnection::findOne prepare failed: " + errmsg ); - + vector<BSONObj> all; - for ( size_t i=0; i<_conns.size(); i++ ){ + for ( size_t i=0; i<_conns.size(); i++ ) { all.push_back( _conns[i]->findOne( ns , query , 0 , queryOptions ).getOwned() ); } - + _checkLast(); - for ( size_t i=0; i<all.size(); i++ ){ + for ( size_t i=0; i<all.size(); i++ ) { BSONObj temp = all[i]; if ( isOk( temp ) ) continue; stringstream ss; - ss << "write $cmd failed on a shard: " << temp.jsonString(); + ss << "write $cmd failed on a node: " << temp.jsonString(); ss << " " << _conns[i]->toString(); + ss << " ns: " << ns; + ss << " cmd: " << query.toString(); throw UserException( 13105 , ss.str() ); } - + return all[0]; } } @@ -191,9 +196,9 @@ namespace mongo { auto_ptr<DBClientCursor> SyncClusterConnection::query(const string &ns, Query query, int nToReturn, int nToSkip, - const BSONObj *fieldsToReturn, int queryOptions, int batchSize ){ + const BSONObj *fieldsToReturn, int queryOptions, int batchSize ) { _lastErrors.clear(); - if ( ns.find( ".$cmd" ) != string::npos ){ + if ( ns.find( ".$cmd" ) != string::npos ) { string cmdName = query.obj.firstElement().fieldName(); int lockType = _lockType( cmdName ); uassert( 13054 , (string)"write $cmd not supported in SyncClusterConnection::query for:" + cmdName , lockType <= 0 ); @@ -202,7 +207,7 @@ namespace mongo { return _queryOnActive( ns , query , nToReturn , nToSkip , fieldsToReturn , queryOptions , batchSize ); } - bool SyncClusterConnection::_commandOnActive(const string &dbname, const BSONObj& cmd, BSONObj &info, int options ){ + bool SyncClusterConnection::_commandOnActive(const string &dbname, const BSONObj& cmd, BSONObj &info, int options ) { auto_ptr<DBClientCursor> cursor = _queryOnActive( dbname + ".$cmd" , cmd , 1 , 0 , 0 , options , 0 ); if ( cursor->more() ) info = cursor->next().copy(); @@ -210,153 +215,164 @@ namespace mongo { info = BSONObj(); return isOk( info ); } - + auto_ptr<DBClientCursor> SyncClusterConnection::_queryOnActive(const string &ns, Query query, int nToReturn, int nToSkip, - const BSONObj *fieldsToReturn, int queryOptions, int batchSize ){ - - for ( size_t i=0; i<_conns.size(); i++ ){ + const BSONObj *fieldsToReturn, int queryOptions, int batchSize ) { + + for ( size_t i=0; i<_conns.size(); i++ ) { try { - auto_ptr<DBClientCursor> cursor = + auto_ptr<DBClientCursor> cursor = _conns[i]->query( ns , query , nToReturn , nToSkip , fieldsToReturn , queryOptions , batchSize ); if ( cursor.get() ) return cursor; log() << "query failed to: " << _conns[i]->toString() << " no data" << endl; } - catch ( ... ){ + catch ( ... ) { log() << "query failed to: " << _conns[i]->toString() << " exception" << endl; } } throw UserException( 8002 , "all servers down!" 
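
// ---------------------------------------------------------------------------
// [editor's note] Illustrative sketch only -- not part of the upstream diff.
// The read path in _queryOnActive() above tries each node in order, returns
// the first answer it gets, and only gives up ("all servers down!") once
// every node has failed.  A vector of callables stands in for the per-node
// query call; names are hypothetical.
#include <cstddef>
#include <functional>
#include <stdexcept>
#include <string>
#include <vector>

typedef std::function<bool(std::string& result)> NodeQueryFn;

std::string queryOnActiveSketch(const std::vector<NodeQueryFn>& nodes) {
    for (std::size_t i = 0; i < nodes.size(); ++i) {
        try {
            std::string result;
            if (nodes[i](result))
                return result;                  // first node that answers wins
            // this node returned no data, try the next one
        }
        catch (...) {
            // this node threw, try the next one
        }
    }
    throw std::runtime_error("all servers down!");
}
// ---------------------------------------------------------------------------
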
); } - - auto_ptr<DBClientCursor> SyncClusterConnection::getMore( const string &ns, long long cursorId, int nToReturn, int options ){ - uassert( 10022 , "SyncClusterConnection::getMore not supported yet" , 0); + + auto_ptr<DBClientCursor> SyncClusterConnection::getMore( const string &ns, long long cursorId, int nToReturn, int options ) { + uassert( 10022 , "SyncClusterConnection::getMore not supported yet" , 0); auto_ptr<DBClientCursor> c; return c; } - - void SyncClusterConnection::insert( const string &ns, BSONObj obj ){ - uassert( 13119 , (string)"SyncClusterConnection::insert obj has to have an _id: " + obj.jsonString() , + void SyncClusterConnection::insert( const string &ns, BSONObj obj ) { + + uassert( 13119 , (string)"SyncClusterConnection::insert obj has to have an _id: " + obj.jsonString() , ns.find( ".system.indexes" ) != string::npos || obj["_id"].type() ); - + string errmsg; if ( ! prepare( errmsg ) ) throw UserException( 8003 , (string)"SyncClusterConnection::insert prepare failed: " + errmsg ); - for ( size_t i=0; i<_conns.size(); i++ ){ + for ( size_t i=0; i<_conns.size(); i++ ) { _conns[i]->insert( ns , obj ); } - + _checkLast(); } - - void SyncClusterConnection::insert( const string &ns, const vector< BSONObj >& v ){ - uassert( 10023 , "SyncClusterConnection bulk insert not implemented" , 0); + + void SyncClusterConnection::insert( const string &ns, const vector< BSONObj >& v ) { + uassert( 10023 , "SyncClusterConnection bulk insert not implemented" , 0); } - void SyncClusterConnection::remove( const string &ns , Query query, bool justOne ){ + void SyncClusterConnection::remove( const string &ns , Query query, bool justOne ) { string errmsg; if ( ! prepare( errmsg ) ) throw UserException( 8020 , (string)"SyncClusterConnection::remove prepare failed: " + errmsg ); - - for ( size_t i=0; i<_conns.size(); i++ ){ + + for ( size_t i=0; i<_conns.size(); i++ ) { _conns[i]->remove( ns , query , justOne ); } - + _checkLast(); } - void SyncClusterConnection::update( const string &ns , Query query , BSONObj obj , bool upsert , bool multi ){ + void SyncClusterConnection::update( const string &ns , Query query , BSONObj obj , bool upsert , bool multi ) { - if ( upsert ){ + if ( upsert ) { uassert( 13120 , "SyncClusterConnection::update upsert query needs _id" , query.obj["_id"].type() ); } - if ( _writeConcern ){ + if ( _writeConcern ) { string errmsg; if ( ! 
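
// ---------------------------------------------------------------------------
// [editor's note] Illustrative sketch only -- not part of the upstream diff.
// insert() and remove() above follow a two-phase write: every node must first
// acknowledge an fsync (prepare), then the write is applied on every node,
// then getlasterror is checked everywhere.  The callbacks below are
// hypothetical stand-ins for the per-connection calls.
#include <cstddef>
#include <functional>
#include <stdexcept>
#include <vector>

struct SyncNodeSketch {
    std::function<bool()> fsync;    // phase 1: flush and confirm readiness
    std::function<void()> write;    // phase 2: apply the actual write
    std::function<bool()> gleOk;    // phase 3: getlasterror confirms it stuck
};

void twoPhaseWriteSketch(std::vector<SyncNodeSketch>& nodes) {
    for (std::size_t i = 0; i < nodes.size(); ++i)
        if (!nodes[i].fsync())
            throw std::runtime_error("prepare failed: node not ready");
    for (std::size_t i = 0; i < nodes.size(); ++i)
        nodes[i].write();           // note: no rollback if a later node fails
    for (std::size_t i = 0; i < nodes.size(); ++i)
        if (!nodes[i].gleOk())
            throw std::runtime_error("write op failed on a node");
}
// ---------------------------------------------------------------------------
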
prepare( errmsg ) ) throw UserException( 8005 , (string)"SyncClusterConnection::udpate prepare failed: " + errmsg ); } - for ( size_t i=0; i<_conns.size(); i++ ){ + for ( size_t i=0; i<_conns.size(); i++ ) { try { _conns[i]->update( ns , query , obj , upsert , multi ); } - catch ( std::exception& e ){ + catch ( std::exception& e ) { if ( _writeConcern ) throw e; } } - - if ( _writeConcern ){ + + if ( _writeConcern ) { _checkLast(); assert( _lastErrors.size() > 1 ); - + int a = _lastErrors[0]["n"].numberInt(); - for ( unsigned i=1; i<_lastErrors.size(); i++ ){ + for ( unsigned i=1; i<_lastErrors.size(); i++ ) { int b = _lastErrors[i]["n"].numberInt(); if ( a == b ) continue; - - throw UpdateNotTheSame( 8017 , "update not consistent" , _connAddresses , _lastErrors ); + + throw UpdateNotTheSame( 8017 , + str::stream() + << "update not consistent " + << " ns: " << ns + << " query: " << query.toString() + << " update: " << obj + << " gle1: " << _lastErrors[0] + << " gle2: " << _lastErrors[i] , + _connAddresses , _lastErrors ); } } } - string SyncClusterConnection::_toString() const { + string SyncClusterConnection::_toString() const { stringstream ss; ss << "SyncClusterConnection [" << _address << "]"; return ss.str(); } - bool SyncClusterConnection::call( Message &toSend, Message &response, bool assertOk ){ - uassert( 8006 , "SyncClusterConnection::call can only be used directly for dbQuery" , + bool SyncClusterConnection::call( Message &toSend, Message &response, bool assertOk , string * actualServer ) { + uassert( 8006 , "SyncClusterConnection::call can only be used directly for dbQuery" , toSend.operation() == dbQuery ); - + DbMessage d( toSend ); uassert( 8007 , "SyncClusterConnection::call can't handle $cmd" , strstr( d.getns(), "$cmd" ) == 0 ); - for ( size_t i=0; i<_conns.size(); i++ ){ + for ( size_t i=0; i<_conns.size(); i++ ) { try { bool ok = _conns[i]->call( toSend , response , assertOk ); - if ( ok ) + if ( ok ) { + if ( actualServer ) + *actualServer = _connAddresses[i]; return ok; + } log() << "call failed to: " << _conns[i]->toString() << " no data" << endl; } - catch ( ... ){ + catch ( ... ) { log() << "call failed to: " << _conns[i]->toString() << " exception" << endl; } } throw UserException( 8008 , "all servers down!" ); } - - void SyncClusterConnection::say( Message &toSend ){ + + void SyncClusterConnection::say( Message &toSend ) { string errmsg; if ( ! 
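
// ---------------------------------------------------------------------------
// [editor's note] Illustrative sketch only -- not part of the upstream diff.
// After an update, the code above compares the "n" (documents touched) value
// from every node's getlasterror reply and raises UpdateNotTheSame when the
// nodes disagree.  Plain ints stand in for the BSONObj replies; names are
// hypothetical.
#include <cstddef>
#include <sstream>
#include <stdexcept>
#include <vector>

void checkUpdateCountsSketch(const std::vector<int>& nPerNode) {
    if (nPerNode.empty())
        return;
    int expected = nPerNode[0];
    for (std::size_t i = 1; i < nPerNode.size(); ++i) {
        if (nPerNode[i] == expected)
            continue;
        std::ostringstream ss;
        ss << "update not consistent: node 0 touched " << expected
           << " documents, node " << i << " touched " << nPerNode[i];
        throw std::runtime_error(ss.str());     // equivalent of UpdateNotTheSame
    }
}
// ---------------------------------------------------------------------------
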
prepare( errmsg ) ) throw UserException( 13397 , (string)"SyncClusterConnection::say prepare failed: " + errmsg ); - for ( size_t i=0; i<_conns.size(); i++ ){ + for ( size_t i=0; i<_conns.size(); i++ ) { _conns[i]->say( toSend ); } - + _checkLast(); } - - void SyncClusterConnection::sayPiggyBack( Message &toSend ){ + + void SyncClusterConnection::sayPiggyBack( Message &toSend ) { assert(0); } - int SyncClusterConnection::_lockType( const string& name ){ + int SyncClusterConnection::_lockType( const string& name ) { { scoped_lock lk(_mutex); map<string,int>::iterator i = _lockTypes.find( name ); if ( i != _lockTypes.end() ) return i->second; } - + BSONObj info; - uassert( 13053 , "help failed" , _commandOnActive( "admin" , BSON( name << "1" << "help" << 1 ) , info ) ); + uassert( 13053 , str::stream() << "help failed: " << info , _commandOnActive( "admin" , BSON( name << "1" << "help" << 1 ) , info ) ); int lockType = info["lockType"].numberInt(); @@ -365,20 +381,9 @@ namespace mongo { return lockType; } - void SyncClusterConnection::killCursor( long long cursorID ){ + void SyncClusterConnection::killCursor( long long cursorID ) { // should never need to do this assert(0); } - bool SyncClusterConnection::isMember( const DBConnector * conn ) const { - if ( conn == this ) - return true; - - for ( unsigned i=0; i<_conns.size(); i++ ) - if ( _conns[i]->isMember( conn ) ) - return true; - - return false; - } - } diff --git a/client/syncclusterconnection.h b/client/syncclusterconnection.h index 4292e3d..c946073 100644 --- a/client/syncclusterconnection.h +++ b/client/syncclusterconnection.h @@ -16,6 +16,7 @@ * limitations under the License. */ +#pragma once #include "../pch.h" #include "dbclient.h" @@ -26,15 +27,15 @@ namespace mongo { /** * This is a connection to a cluster of servers that operate as one * for super high durability. - * + * * Write operations are two-phase. First, all nodes are asked to fsync. If successful - * everywhere, the write is sent everywhere and then followed by an fsync. There is no - * rollback if a problem occurs during the second phase. Naturally, with all these fsyncs, + * everywhere, the write is sent everywhere and then followed by an fsync. There is no + * rollback if a problem occurs during the second phase. Naturally, with all these fsyncs, * these operations will be quite slow -- use sparingly. - * + * * Read operations are sent to a single random node. - * - * The class checks if a command is read or write style, and sends to a single + * + * The class checks if a command is read or write style, and sends to a single * node if a read lock command and to all in two phases with a write style command. 
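
// ---------------------------------------------------------------------------
// [editor's note] Illustrative sketch only -- not part of the upstream diff.
// _lockType() above decides whether a command is read- or write-style: the
// first time a command name is seen, the server is asked (via the "help"
// option) which lock the command takes, and the answer is memoized under a
// mutex so later routing decisions are cheap.  std::mutex stands in for
// mongo::mutex and the callback for the real admin command; names are
// hypothetical.
#include <functional>
#include <map>
#include <mutex>
#include <string>

class LockTypeCacheSketch {
public:
    explicit LockTypeCacheSketch(std::function<int(const std::string&)> askServer)
        : _askServer(askServer) {}

    int lockType(const std::string& cmdName) {
        {
            std::lock_guard<std::mutex> lk(_mutex);
            std::map<std::string, int>::iterator it = _types.find(cmdName);
            if (it != _types.end())
                return it->second;              // cached answer
        }
        int t = _askServer(cmdName);            // ask the server outside the lock
        std::lock_guard<std::mutex> lk(_mutex);
        _types[cmdName] = t;
        return t;
    }

private:
    std::function<int(const std::string&)> _askServer;
    std::map<std::string, int> _types;
    std::mutex _mutex;
};
// ---------------------------------------------------------------------------
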
*/ class SyncClusterConnection : public DBClientBase { @@ -46,7 +47,7 @@ namespace mongo { SyncClusterConnection( string commaSeparated ); SyncClusterConnection( string a , string b , string c ); ~SyncClusterConnection(); - + /** * @return true if all servers are up and ready for writes */ @@ -65,36 +66,34 @@ namespace mongo { const BSONObj *fieldsToReturn, int queryOptions, int batchSize ); virtual auto_ptr<DBClientCursor> getMore( const string &ns, long long cursorId, int nToReturn, int options ); - + virtual void insert( const string &ns, BSONObj obj ); - + virtual void insert( const string &ns, const vector< BSONObj >& v ); virtual void remove( const string &ns , Query query, bool justOne ); virtual void update( const string &ns , Query query , BSONObj obj , bool upsert , bool multi ); - virtual bool call( Message &toSend, Message &response, bool assertOk ); + virtual bool call( Message &toSend, Message &response, bool assertOk , string * actualServer ); virtual void say( Message &toSend ); virtual void sayPiggyBack( Message &toSend ); virtual void killCursor( long long cursorID ); - + virtual string getServerAddress() const { return _address; } virtual bool isFailed() const { return false; } virtual string toString() { return _toString(); } - virtual BSONObj getLastErrorDetailed(); + virtual BSONObj getLastErrorDetailed(); virtual bool callRead( Message& toSend , Message& response ); - virtual ConnectionString::ConnectionType type() const { return ConnectionString::SYNC; } - - virtual bool isMember( const DBConnector * conn ) const; + virtual ConnectionString::ConnectionType type() const { return ConnectionString::SYNC; } private: SyncClusterConnection( SyncClusterConnection& prev ); - string _toString() const; + string _toString() const; bool _commandOnActive(const string &dbname, const BSONObj& cmd, BSONObj &info, int options=0); auto_ptr<DBClientCursor> _queryOnActive(const string &ns, Query query, int nToReturn, int nToSkip, const BSONObj *fieldsToReturn, int queryOptions, int batchSize ); @@ -107,17 +106,17 @@ namespace mongo { vector<DBClientConnection*> _conns; map<string,int> _lockTypes; mongo::mutex _mutex; - + vector<BSONObj> _lastErrors; }; - + class UpdateNotTheSame : public UserException { public: UpdateNotTheSame( int code , const string& msg , const vector<string>& addrs , const vector<BSONObj>& lastErrors ) - : UserException( code , msg ) , _addrs( addrs ) , _lastErrors( lastErrors ){ + : UserException( code , msg ) , _addrs( addrs ) , _lastErrors( lastErrors ) { assert( _addrs.size() == _lastErrors.size() ); } - + virtual ~UpdateNotTheSame() throw() { } @@ -134,7 +133,7 @@ namespace mongo { vector<string> _addrs; vector<BSONObj> _lastErrors; }; - + }; #include "undef_macros.h" diff --git a/client/undef_macros.h b/client/undef_macros.h index cce8692..bc59a84 100644 --- a/client/undef_macros.h +++ b/client/undef_macros.h @@ -54,5 +54,8 @@ #undef RARELY #undef ONCE +// util/log.h +#undef LOG + #define MONGO_MACROS_CLEANED #endif |
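
// ---------------------------------------------------------------------------
// [editor's note] Illustrative sketch only -- not part of the upstream diff.
// The SyncClusterConnection class declared above presents the same interface
// as a single client while fanning writes out to every underlying node and
// sending reads to just one.  The Client interface below is a hypothetical
// stand-in for DBClientBase; two-phase write details are shown earlier.
#include <cstddef>
#include <string>
#include <vector>

class ClientSketch {
public:
    virtual ~ClientSketch() {}
    virtual void insert(const std::string& ns, const std::string& doc) = 0;
    virtual std::string findOne(const std::string& ns, const std::string& query) = 0;
};

class SyncClusterSketch : public ClientSketch {
public:
    explicit SyncClusterSketch(const std::vector<ClientSketch*>& nodes) : _nodes(nodes) {}

    // write-style operations go to every node
    virtual void insert(const std::string& ns, const std::string& doc) {
        for (std::size_t i = 0; i < _nodes.size(); ++i)
            _nodes[i]->insert(ns, doc);
    }
    // read-style operations go to a single node
    virtual std::string findOne(const std::string& ns, const std::string& query) {
        return _nodes[0]->findOne(ns, query);
    }

private:
    std::vector<ClientSketch*> _nodes;   // not owned in this sketch
};
// ---------------------------------------------------------------------------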