From 7645618fd3914cb8a20561625913c20d49504a49 Mon Sep 17 00:00:00 2001 From: Antonin Kral Date: Wed, 11 Aug 2010 12:38:57 +0200 Subject: Imported Upstream version 1.6.0 --- client/clientOnly.cpp | 18 +- client/connpool.cpp | 171 +++++++--- client/connpool.h | 88 ++++- client/constants.h | 26 ++ client/dbclient.cpp | 722 +++++++++++++++++++++------------------ client/dbclient.h | 543 ++++++++++++++++------------- client/dbclientcursor.cpp | 232 +++++++++++++ client/dbclientcursor.h | 204 +++++++++++ client/distlock.cpp | 225 ++++++++++++ client/distlock.h | 91 +++++ client/distlock_test.cpp | 80 +++++ client/examples/clientTest.cpp | 21 +- client/examples/tail.cpp | 35 +- client/gridfs.cpp | 51 +-- client/gridfs.h | 32 +- client/model.cpp | 53 ++- client/model.h | 7 +- client/parallel.cpp | 316 ++++++++++++++--- client/parallel.h | 176 ++++++++-- client/redef_macros.h | 55 +++ client/syncclusterconnection.cpp | 179 ++++++++-- client/syncclusterconnection.h | 84 +++-- client/undef_macros.h | 58 ++++ 23 files changed, 2618 insertions(+), 849 deletions(-) create mode 100644 client/constants.h create mode 100644 client/dbclientcursor.cpp create mode 100644 client/dbclientcursor.h create mode 100644 client/distlock.cpp create mode 100644 client/distlock.h create mode 100644 client/distlock_test.cpp create mode 100644 client/redef_macros.h create mode 100644 client/undef_macros.h (limited to 'client') diff --git a/client/clientOnly.cpp b/client/clientOnly.cpp index 566095a..6178257 100644 --- a/client/clientOnly.cpp +++ b/client/clientOnly.cpp @@ -15,10 +15,11 @@ * limitations under the License. */ -#include "stdafx.h" +#include "pch.h" #include "../client/dbclient.h" #include "../db/dbhelpers.h" #include "../db/cmdline.h" +#include "../s/shard.h" namespace mongo { @@ -57,10 +58,15 @@ namespace mongo { uassert( 10256 , "no createDirectClient in clientOnly" , 0 ); return 0; } -/* - auto_ptr Helpers::find( const char *ns , BSONObj query , bool requireIndex ){ - uassert( 10000 , "Helpers::find can't be used in client" , 0 ); - return auto_ptr(0); + + void Shard::getAllShards( vector& all ){ + assert(0); + } + + bool Shard::isAShard( const string& ident ){ + assert(0); + return false; } -*/ + + } diff --git a/client/connpool.cpp b/client/connpool.cpp index 5a08483..dae13f6 100644 --- a/client/connpool.cpp +++ b/client/connpool.cpp @@ -18,76 +18,99 @@ // _ todo: reconnect? 
-#include "stdafx.h" +#include "pch.h" #include "connpool.h" #include "../db/commands.h" #include "syncclusterconnection.h" +#include "../s/shard.h" namespace mongo { DBConnectionPool pool; + DBClientBase* DBConnectionPool::_get(const string& ident) { + scoped_lock L(_mutex); + + PoolForHost& p = _pools[ident]; + if ( p.pool.empty() ) + return 0; + + DBClientBase *c = p.pool.top(); + p.pool.pop(); + return c; + } + + DBClientBase* DBConnectionPool::_finishCreate( const string& host , DBClientBase* conn ){ + { + scoped_lock L(_mutex); + PoolForHost& p = _pools[host]; + p.created++; + } + + onCreate( conn ); + onHandedOut( conn ); + + return conn; + } + + DBClientBase* DBConnectionPool::get(const ConnectionString& url) { + DBClientBase * c = _get( url.toString() ); + if ( c ){ + onHandedOut( c ); + return c; + } + + string errmsg; + c = url.connect( errmsg ); + uassert( 13328 , (string)"dbconnectionpool: connect failed " + url.toString() + " : " + errmsg , c ); + + return _finishCreate( url.toString() , c ); + } + DBClientBase* DBConnectionPool::get(const string& host) { - scoped_lock L(poolMutex); + DBClientBase * c = _get( host ); + if ( c ){ + onHandedOut( c ); + return c; + } - PoolForHost *&p = pools[host]; - if ( p == 0 ) - p = new PoolForHost(); - if ( p->pool.empty() ) { - int numCommas = DBClientBase::countCommas( host ); - DBClientBase *c; - - if( numCommas == 0 ) { - DBClientConnection *cc = new DBClientConnection(true); - log(2) << "creating new connection for pool to:" << host << endl; - string errmsg; - if ( !cc->connect(host.c_str(), errmsg) ) { - delete cc; - uassert( 11002 , (string)"dbconnectionpool: connect failed " + host , false); - return 0; - } - c = cc; - onCreate( c ); - } - else if ( numCommas == 1 ) { - DBClientPaired *p = new DBClientPaired(); - if( !p->connect(host) ) { - delete p; - uassert( 11003 , (string)"dbconnectionpool: connect failed [2] " + host , false); - return 0; - } - c = p; - } - else if ( numCommas == 2 ) { - c = new SyncClusterConnection( host ); - } - else { - uassert( 13071 , (string)"invalid hostname [" + host + "]" , 0 ); + string errmsg; + ConnectionString cs = ConnectionString::parse( host , errmsg ); + uassert( 13071 , (string)"invalid hostname [" + host + "]" + errmsg , cs.isValid() ); + + c = cs.connect( errmsg ); + uassert( 11002 , (string)"dbconnectionpool: connect failed " + host + " : " + errmsg , c ); + return _finishCreate( host , c ); + } + + DBConnectionPool::~DBConnectionPool(){ + for ( map::iterator i = _pools.begin(); i != _pools.end(); i++ ){ + PoolForHost& p = i->second; + + while ( ! p.pool.empty() ){ + DBClientBase * c = p.pool.top(); + delete c; + p.pool.pop(); } - return c; } - DBClientBase *c = p->pool.top(); - p->pool.pop(); - onHandedOut( c ); - return c; } void DBConnectionPool::flush(){ - scoped_lock L(poolMutex); - for ( map::iterator i = pools.begin(); i != pools.end(); i++ ){ - PoolForHost* p = i->second; + scoped_lock L(_mutex); + for ( map::iterator i = _pools.begin(); i != _pools.end(); i++ ){ + PoolForHost& p = i->second; vector all; - while ( ! p->pool.empty() ){ - DBClientBase * c = p->pool.top(); - p->pool.pop(); + while ( ! 
p.pool.empty() ){ + DBClientBase * c = p.pool.top(); + p.pool.pop(); all.push_back( c ); bool res; c->isMaster( res ); } for ( vector::iterator i=all.begin(); i != all.end(); i++ ){ - p->pool.push( *i ); + p.pool.push( *i ); } } } @@ -114,6 +137,26 @@ namespace mongo { } } + void DBConnectionPool::appendInfo( BSONObjBuilder& b ){ + scoped_lock lk( _mutex ); + BSONObjBuilder bb( b.subobjStart( "hosts" ) ); + for ( map::iterator i=_pools.begin(); i!=_pools.end(); ++i ){ + string s = i->first; + BSONObjBuilder temp( bb.subobjStart( s.c_str() ) ); + temp.append( "available" , (int)(i->second.pool.size()) ); + temp.appendNumber( "created" , i->second.created ); + temp.done(); + } + bb.done(); + } + + ScopedDbConnection * ScopedDbConnection::steal(){ + assert( _conn ); + ScopedDbConnection * n = new ScopedDbConnection( _host , _conn ); + _conn = 0; + return n; + } + ScopedDbConnection::~ScopedDbConnection() { if ( _conn ){ if ( ! _conn->isFailed() ) { @@ -124,20 +167,44 @@ namespace mongo { } } + ScopedDbConnection::ScopedDbConnection(const Shard& shard ) + : _host( shard.getConnString() ) , _conn( pool.get(_host) ){ + } + + ScopedDbConnection::ScopedDbConnection(const Shard* shard ) + : _host( shard->getConnString() ) , _conn( pool.get(_host) ){ + } + class PoolFlushCmd : public Command { public: - PoolFlushCmd() : Command( "connpoolsync" ){} - virtual LockType locktype(){ return NONE; } - virtual bool run(const char*, mongo::BSONObj&, std::string&, mongo::BSONObjBuilder& result, bool){ + PoolFlushCmd() : Command( "connPoolSync" , false , "connpoolsync" ){} + virtual void help( stringstream &help ) const { help<<"internal"; } + virtual LockType locktype() const { return NONE; } + virtual bool run(const string&, mongo::BSONObj&, std::string&, mongo::BSONObjBuilder& result, bool){ pool.flush(); - result << "ok" << 1; return true; } - virtual bool slaveOk(){ + virtual bool slaveOk() const { return true; } } poolFlushCmd; + class PoolStats : public Command { + public: + PoolStats() : Command( "connPoolStats" ){} + virtual void help( stringstream &help ) const { help<<"stats about connection pool"; } + virtual LockType locktype() const { return NONE; } + virtual bool run(const string&, mongo::BSONObj&, std::string&, mongo::BSONObjBuilder& result, bool){ + pool.appendInfo( result ); + return true; + } + virtual bool slaveOk() const { + return true; + } + + } poolStatsCmd; + + } // namespace mongo diff --git a/client/connpool.h b/client/connpool.h index b44ff51..00570c5 100644 --- a/client/connpool.h +++ b/client/connpool.h @@ -19,20 +19,30 @@ #include #include "dbclient.h" +#include "redef_macros.h" namespace mongo { + class Shard; + struct PoolForHost { + PoolForHost() + : created(0){} + PoolForHost( const PoolForHost& other ){ + assert(other.pool.size() == 0); + created = other.created; + assert( created == 0 ); + } + std::stack pool; + long long created; }; class DBConnectionHook { public: virtual ~DBConnectionHook(){} - virtual void onCreate( DBClientBase * conn ){} virtual void onHandedOut( DBClientBase * conn ){} - }; /** Database connection pool. 
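For orientation, a minimal usage sketch of the pool interface introduced above (addHook, get, release and the ScopedDbConnection wrapper); the host string, the hook behaviour and the include paths are illustrative assumptions, not values taken from this patch.

// sketch: a counting hook plus basic pool usage, assuming a mongod on localhost:27017
#include "client/dbclient.h"
#include "client/connpool.h"

using namespace mongo;

class CountingHook : public DBConnectionHook {
public:
    CountingHook() : created(0), handedOut(0){}
    virtual void onCreate( DBClientBase * conn )    { created++; }    // new physical connection built
    virtual void onHandedOut( DBClientBase * conn ) { handedOut++; }  // connection leaves the pool
    int created, handedOut;
};

int main() {
    pool.addHook( new CountingHook() );               // 'pool' is the global DBConnectionPool

    DBClientBase * c = pool.get( "localhost:27017" ); // raw form
    c->findOne( "test.foo" , QUERY( "x" << 3 ) );
    pool.release( "localhost:27017" , c );            // hand it back instead of deleting

    ScopedDbConnection conn( "localhost:27017" );     // RAII wrapper around the same pool
    conn.conn().findOne( "test.foo" , QUERY( "x" << 3 ) );
    conn.done();                                      // return the connection to the pool
    return 0;
}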
@@ -51,33 +61,54 @@ namespace mongo { } */ class DBConnectionPool { - mongo::mutex poolMutex; - map pools; // servername -> pool + mongo::mutex _mutex; + map _pools; // servername -> pool list _hooks; + + DBClientBase* _get( const string& ident ); + DBClientBase* _finishCreate( const string& ident , DBClientBase* conn ); + + public: + DBConnectionPool() : _mutex("DBConnectionPool") { } + ~DBConnectionPool(); + + void onCreate( DBClientBase * conn ); void onHandedOut( DBClientBase * conn ); - public: + void flush(); + DBClientBase *get(const string& host); + DBClientBase *get(const ConnectionString& host); + void release(const string& host, DBClientBase *c) { if ( c->isFailed() ){ delete c; return; } - scoped_lock L(poolMutex); - pools[host]->pool.push(c); + scoped_lock L(_mutex); + _pools[host].pool.push(c); } void addHook( DBConnectionHook * hook ); + void appendInfo( BSONObjBuilder& b ); }; - + extern DBConnectionPool pool; + class AScopedConnection : boost::noncopyable { + public: + virtual ~AScopedConnection(){} + virtual DBClientBase* get() = 0; + virtual void done() = 0; + virtual string getHost() const = 0; + }; + /** Use to get a connection from the pool. On exceptions things clean up nicely. */ - class ScopedDbConnection { - const string host; + class ScopedDbConnection : public AScopedConnection { + const string _host; DBClientBase *_conn; public: /** get the associated connection object */ @@ -85,19 +116,42 @@ namespace mongo { uassert( 11004 , "did you call done already" , _conn ); return _conn; } - + /** get the associated connection object */ DBClientBase& conn() { uassert( 11005 , "did you call done already" , _conn ); return *_conn; } + /** get the associated connection object */ + DBClientBase* get() { + uassert( 13102 , "did you call done already" , _conn ); + return _conn; + } + + ScopedDbConnection() + : _host( "" ) , _conn(0) { + } + /** throws UserException if can't connect */ - ScopedDbConnection(const string& _host) : - host(_host), _conn( pool.get(_host) ) { - //cout << " for: " << _host << " got conn: " << _conn << endl; + ScopedDbConnection(const string& host) + : _host(host), _conn( pool.get(host) ) { + } + + ScopedDbConnection(const string& host, DBClientBase* conn ) + : _host( host ) , _conn( conn ){ + } + + ScopedDbConnection(const Shard& shard ); + ScopedDbConnection(const Shard* shard ); + + ScopedDbConnection(const ConnectionString& url ) + : _host(url.toString()), _conn( pool.get(url) ) { } + + string getHost() const { return _host; } + /** Force closure of the connection. You should call this if you leave it in a bad state. Destructor will do this too, but it is verbose. */ @@ -121,12 +175,16 @@ namespace mongo { kill(); else */ - pool.release(host, _conn); + pool.release(_host, _conn); _conn = 0; } + ScopedDbConnection * steal(); + ~ScopedDbConnection(); }; } // namespace mongo + +#include "undef_macros.h" diff --git a/client/constants.h b/client/constants.h new file mode 100644 index 0000000..66aa9b1 --- /dev/null +++ b/client/constants.h @@ -0,0 +1,26 @@ +// constants.h + +#pragma once + +namespace mongo { + + /* query results include a 32 result flag word consisting of these bits */ + enum ResultFlagType { + /* returned, with zero results, when getMore is called but the cursor id + is not valid at the server. */ + ResultFlag_CursorNotFound = 1, + + /* { $err : ... 
} is being returned */ + ResultFlag_ErrSet = 2, + + /* Have to update config from the server, usually $err is also set */ + ResultFlag_ShardConfigStale = 4, + + /* for backward compatability: this let's us know the server supports + the QueryOption_AwaitData option. if it doesn't, a repl slave client should sleep + a little between getMore's. + */ + ResultFlag_AwaitCapable = 8 + }; + +} diff --git a/client/dbclient.cpp b/client/dbclient.cpp index f617f7c..04b6147 100644 --- a/client/dbclient.cpp +++ b/client/dbclient.cpp @@ -15,22 +15,89 @@ * limitations under the License. */ -#include "stdafx.h" +#include "pch.h" #include "../db/pdfile.h" #include "dbclient.h" -#include "../util/builder.h" +#include "../bson/util/builder.h" #include "../db/jsobj.h" #include "../db/json.h" #include "../db/instance.h" #include "../util/md5.hpp" #include "../db/dbmessage.h" #include "../db/cmdline.h" +#include "connpool.h" +#include "../s/util.h" +#include "syncclusterconnection.h" namespace mongo { + DBClientBase* ConnectionString::connect( string& errmsg ) const { + switch ( _type ){ + case MASTER: { + DBClientConnection * c = new DBClientConnection(true); + log(1) << "creating new connection to:" << _servers[0] << endl; + if ( ! c->connect( _servers[0] , errmsg ) ) { + delete c; + return 0; + } + return c; + } + + case PAIR: + case SET: { + DBClientReplicaSet * set = new DBClientReplicaSet( _setName , _servers ); + if( ! set->connect() ){ + delete set; + errmsg = "connect failed to set "; + errmsg += toString(); + return 0; + } + return set; + } + + case SYNC: { + // TODO , don't copy + list l; + for ( unsigned i=0; i<_servers.size(); i++ ) + l.push_back( _servers[i] ); + return new SyncClusterConnection( l ); + } + + case INVALID: + throw UserException( 13421 , "trying to connect to invalid ConnectionString" ); + break; + } + + assert( 0 ); + return 0; + } + + ConnectionString ConnectionString::parse( const string& host , string& errmsg ){ + + string::size_type i = host.find( '/' ); + if ( i != string::npos ){ + // replica set + return ConnectionString( SET , host.substr( i + 1 ) , host.substr( 0 , i ) ); + } + + int numCommas = DBClientBase::countCommas( host ); + + if( numCommas == 0 ) + return ConnectionString( HostAndPort( host ) ); + + if ( numCommas == 1 ) + return ConnectionString( PAIR , host ); + + if ( numCommas == 2 ) + return ConnectionString( SYNC , host ); + + errmsg = (string)"invalid hostname [" + host + "]"; + return ConnectionString(); // INVALID + } + Query& Query::where(const string &jscode, BSONObj scope) { /* use where() before sort() and hint() and explain(), else this will assert. */ - assert( !obj.hasField("query") ); + assert( ! isComplex() ); BSONObjBuilder b; b.appendElements(obj); b.appendWhere(jscode, scope); @@ -39,7 +106,7 @@ namespace mongo { } void Query::makeComplex() { - if ( obj.hasElement( "query" ) ) + if ( isComplex() ) return; BSONObjBuilder b; b.append( "query", obj ); @@ -76,19 +143,36 @@ namespace mongo { return *this; } - bool Query::isComplex() const{ - return obj.hasElement( "query" ); + bool Query::isComplex( bool * hasDollar ) const{ + if ( obj.hasElement( "query" ) ){ + if ( hasDollar ) + hasDollar[0] = false; + return true; + } + + if ( obj.hasElement( "$query" ) ){ + if ( hasDollar ) + hasDollar[0] = true; + return true; + } + + return false; } BSONObj Query::getFilter() const { - if ( ! isComplex() ) + bool hasDollar; + if ( ! isComplex( &hasDollar ) ) return obj; - return obj.getObjectField( "query" ); + + return obj.getObjectField( hasDollar ? 
"$query" : "query" ); } BSONObj Query::getSort() const { if ( ! isComplex() ) return BSONObj(); - return obj.getObjectField( "orderby" ); + BSONObj ret = obj.getObjectField( "orderby" ); + if (ret.isEmpty()) + ret = obj.getObjectField( "$orderby" ); + return ret; } BSONObj Query::getHint() const { if ( ! isComplex() ) @@ -109,6 +193,17 @@ namespace mongo { return o["ok"].trueValue(); } + enum QueryOptions DBClientWithCommands::availableOptions() { + if ( !_haveCachedAvailableOptions ) { + BSONObj ret; + if ( runCommand( "admin", BSON( "availablequeryoptions" << 1 ), ret ) ) { + _cachedAvailableOptions = ( enum QueryOptions )( ret.getIntField( "options" ) ); + } + _haveCachedAvailableOptions = true; + } + return _cachedAvailableOptions; + } + inline bool DBClientWithCommands::runCommand(const string &dbname, const BSONObj& cmd, BSONObj &info, int options) { string ns = dbname + ".$cmd"; info = findOne(ns, cmd, 0 , options); @@ -133,7 +228,7 @@ namespace mongo { BSONObj res; if( !runCommand(ns.db.c_str(), cmd, res, options) ) uasserted(11010,string("count fails:") + res.toString()); - return res.getIntField("n"); + return res["n"].numberLong(); } BSONObj getlasterrorcmdobj = fromjson("{getlasterror:1}"); @@ -146,10 +241,14 @@ namespace mongo { string DBClientWithCommands::getLastError() { BSONObj info = getLastErrorDetailed(); + return getLastErrorString( info ); + } + + string DBClientWithCommands::getLastErrorString( const BSONObj& info ){ BSONElement e = info["err"]; if( e.eoo() ) return ""; if( e.type() == Object ) return e.toString(); - return e.str(); + return e.str(); } BSONObj getpreverrorcmdobj = fromjson("{getpreverror:1}"); @@ -223,13 +322,14 @@ namespace mongo { bool DBClientWithCommands::isMaster(bool& isMaster, BSONObj *info) { BSONObj o; - if ( info == 0 ) info = &o; + if ( info == 0 ) + info = &o; bool ok = runCommand("admin", ismastercmdobj, *info); - isMaster = (info->getIntField("ismaster") == 1); + isMaster = info->getField("ismaster").trueValue(); return ok; } - bool DBClientWithCommands::createCollection(const string &ns, unsigned size, bool capped, int max, BSONObj *info) { + bool DBClientWithCommands::createCollection(const string &ns, long long size, bool capped, int max, BSONObj *info) { BSONObj o; if ( info == 0 ) info = &o; BSONObjBuilder b; @@ -346,64 +446,9 @@ namespace mongo { string db = nsGetDB( ns ) + ".system.namespaces"; BSONObj q = BSON( "name" << ns ); - return count( db.c_str() , q ); - } - - - void testSort() { - DBClientConnection c; - string err; - if ( !c.connect("localhost", err) ) { - out() << "can't connect to server " << err << endl; - return; - } - - cout << "findOne returns:" << endl; - cout << c.findOne("test.foo", QUERY( "x" << 3 ) ).toString() << endl; - cout << c.findOne("test.foo", QUERY( "x" << 3 ).sort("name") ).toString() << endl; - - } - - /* TODO: unit tests should run this? 
*/ - void testDbEval() { - DBClientConnection c; - string err; - if ( !c.connect("localhost", err) ) { - out() << "can't connect to server " << err << endl; - return; - } - - if( !c.auth("dwight", "u", "p", err) ) { - out() << "can't authenticate " << err << endl; - return; - } - - BSONObj info; - BSONElement retValue; - BSONObjBuilder b; - b.append("0", 99); - BSONObj args = b.done(); - bool ok = c.eval("dwight", "function() { return args[0]; }", info, retValue, &args); - out() << "eval ok=" << ok << endl; - out() << "retvalue=" << retValue.toString() << endl; - out() << "info=" << info.toString() << endl; - - out() << endl; - - int x = 3; - assert( c.eval("dwight", "function() { return 3; }", x) ); - - out() << "***\n"; - - BSONObj foo = fromjson("{\"x\":7}"); - out() << foo.toString() << endl; - int res=0; - ok = c.eval("dwight", "function(parm1) { return parm1.x; }", foo, res); - out() << ok << " retval:" << res << endl; + return count( db.c_str() , q ) != 0; } - void testPaired(); - /* --- dbclientconnection --- */ bool DBClientConnection::auth(const string &dbname, const string &username, const string &password_text, string& errmsg, bool digestPassword) { @@ -422,48 +467,42 @@ namespace mongo { return DBClientBase::auth(dbname, username, password.c_str(), errmsg, false); } - BSONObj DBClientInterface::findOne(const string &ns, Query query, const BSONObj *fieldsToReturn, int queryOptions) { + BSONObj DBClientInterface::findOne(const string &ns, const Query& query, const BSONObj *fieldsToReturn, int queryOptions) { auto_ptr c = this->query(ns, query, 1, 0, fieldsToReturn, queryOptions); - massert( 10276 , "DBClientBase::findOne: transport error", c.get() ); + uassert( 10276 , "DBClientBase::findOne: transport error", c.get() ); + + if ( c->hasResultFlag( ResultFlag_ShardConfigStale ) ) + throw StaleConfigException( ns , "findOne has stale config" ); if ( !c->more() ) return BSONObj(); - return c->next().copy(); + return c->nextSafe().copy(); + } + + bool DBClientConnection::connect(const HostAndPort& server, string& errmsg){ + _server = server; + _serverString = _server.toString(); + return _connect( errmsg ); } - bool DBClientConnection::connect(const string &_serverAddress, string& errmsg) { - serverAddress = _serverAddress; + bool DBClientConnection::_connect( string& errmsg ){ + _serverString = _server.toString(); + // we keep around SockAddr for connection life -- maybe MessagingPort + // requires that? + server.reset(new SockAddr(_server.host().c_str(), _server.port())); + p.reset(new MessagingPort( _timeout, _logLevel )); - string ip; - int port; - size_t idx = serverAddress.find( ":" ); - if ( idx != string::npos ) { - port = strtol( serverAddress.substr( idx + 1 ).c_str(), 0, 10 ); - ip = serverAddress.substr( 0 , idx ); - ip = hostbyname(ip.c_str()); - } else { - port = CmdLine::DefaultDBPort; - ip = hostbyname( serverAddress.c_str() ); - } - if( ip.empty() ) { - stringstream ss; - ss << "client connect: couldn't parse/resolve hostname: " << _serverAddress; - errmsg = ss.str(); + if (server->getAddr() == "0.0.0.0"){ failed = true; return false; } - // we keep around SockAddr for connection life -- maybe MessagingPort - // requires that? 
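To make the connect path above concrete, a small hedged sketch of ConnectionString::parse and connect; the host specs in the comments and the example set name are placeholders, not values from this patch.

// sketch: resolving a host spec the same way DBConnectionPool::get(const string&) does
#include "client/dbclient.h"

using namespace mongo;

DBClientBase * connectByString( const string& hostSpec ){
    // per ConnectionString::parse above:
    //   "a"             -> MASTER  (single server)
    //   "a,b"           -> PAIR    (legacy replica pair)
    //   "a,b,c"         -> SYNC    (SyncClusterConnection)
    //   "name/a,b,..."  -> SET     (replica set)
    string errmsg;
    ConnectionString cs = ConnectionString::parse( hostSpec , errmsg );
    if ( ! cs.isValid() )
        return 0;                    // errmsg explains why parsing failed

    return cs.connect( errmsg );     // 0 on failure, errmsg set
}

// e.g. connectByString( "demoSet/alpha.example.net:27017,beta.example.net:27017" );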
- server = auto_ptr(new SockAddr(ip.c_str(), port)); - p = auto_ptr(new MessagingPort()); - if ( !p->connect(*server) ) { stringstream ss; - ss << "couldn't connect to server " << serverAddress << " " << ip << ":" << port; + ss << "couldn't connect to server " << _serverString << '}'; errmsg = ss.str(); failed = true; return false; @@ -480,22 +519,21 @@ namespace mongo { return; lastReconnectTry = time(0); - log() << "trying reconnect to " << serverAddress << endl; + log(_logLevel) << "trying reconnect to " << _serverString << endl; string errmsg; - string tmp = serverAddress; failed = false; - if ( !connect(tmp.c_str(), errmsg) ) { - log() << "reconnect " << serverAddress << " failed " << errmsg << endl; + if ( ! _connect(errmsg) ) { + log(_logLevel) << "reconnect " << _serverString << " failed " << errmsg << endl; return; } - log() << "reconnect " << serverAddress << " ok" << endl; + log(_logLevel) << "reconnect " << _serverString << " ok" << endl; for( map< string, pair >::iterator i = authCache.begin(); i != authCache.end(); i++ ) { const char *dbname = i->first.c_str(); const char *username = i->second.first.c_str(); const char *password = i->second.second.c_str(); if( !DBClientBase::auth(dbname, username, password, errmsg, false) ) - log() << "reconnect: auth failed db:" << dbname << " user:" << username << ' ' << errmsg << '\n'; + log(_logLevel) << "reconnect: auth failed db:" << dbname << " user:" << username << ' ' << errmsg << '\n'; } } @@ -516,13 +554,76 @@ namespace mongo { return auto_ptr< DBClientCursor >( 0 ); } + struct DBClientFunConvertor { + void operator()( DBClientCursorBatchIterator &i ) { + while( i.moreInCurrentBatch() ) { + _f( i.nextSafe() ); + } + } + boost::function _f; + }; + + unsigned long long DBClientConnection::query( boost::function f, const string& ns, Query query, const BSONObj *fieldsToReturn, int queryOptions ) { + DBClientFunConvertor fun; + fun._f = f; + boost::function ptr( fun ); + return DBClientConnection::query( ptr, ns, query, fieldsToReturn, queryOptions ); + } + + unsigned long long DBClientConnection::query( boost::function f, const string& ns, Query query, const BSONObj *fieldsToReturn, int queryOptions ) { + // mask options + queryOptions &= (int)( QueryOption_NoCursorTimeout | QueryOption_SlaveOk ); + unsigned long long n = 0; + + bool doExhaust = ( availableOptions() & QueryOption_Exhaust ); + if ( doExhaust ) { + queryOptions |= (int)QueryOption_Exhaust; + } + auto_ptr c( this->query(ns, query, 0, 0, fieldsToReturn, queryOptions) ); + massert( 13386, "socket error for mapping query", c.get() ); + + if ( !doExhaust ) { + while( c->more() ) { + DBClientCursorBatchIterator i( *c ); + f( i ); + n += i.n(); + } + return n; + } + + try { + while( 1 ) { + while( c->moreInCurrentBatch() ) { + DBClientCursorBatchIterator i( *c ); + f( i ); + n += i.n(); + } + + if( c->getCursorId() == 0 ) + break; + + c->exhaustReceiveMore(); + } + } + catch(std::exception&) { + /* connection CANNOT be used anymore as more data may be on the way from the server. + we have to reconnect. 
+ */ + failed = true; + p->shutdown(); + throw; + } + + return n; + } + void DBClientBase::insert( const string & ns , BSONObj obj ) { Message toSend; BufBuilder b; int opts = 0; - b.append( opts ); - b.append( ns ); + b.appendNum( opts ); + b.appendStr( ns ); obj.appendSelfToBufBuilder( b ); toSend.setData( dbInsert , b.buf() , b.len() ); @@ -535,8 +636,8 @@ namespace mongo { BufBuilder b; int opts = 0; - b.append( opts ); - b.append( ns ); + b.appendNum( opts ); + b.appendStr( ns ); for( vector< BSONObj >::const_iterator i = v.begin(); i != v.end(); ++i ) i->appendSelfToBufBuilder( b ); @@ -550,13 +651,13 @@ namespace mongo { BufBuilder b; int opts = 0; - b.append( opts ); - b.append( ns ); + b.appendNum( opts ); + b.appendStr( ns ); int flags = 0; if ( justOne ) - flags |= 1; - b.append( flags ); + flags |= RemoveOption_JustOne; + b.appendNum( flags ); obj.obj.appendSelfToBufBuilder( b ); @@ -568,13 +669,13 @@ namespace mongo { void DBClientBase::update( const string & ns , Query query , BSONObj obj , bool upsert , bool multi ) { BufBuilder b; - b.append( (int)0 ); // reserved - b.append( ns ); + b.appendNum( (int)0 ); // reserved + b.appendStr( ns ); int flags = 0; if ( upsert ) flags |= UpdateOption_Upsert; if ( multi ) flags |= UpdateOption_Multi; - b.append( flags ); + b.appendNum( flags ); query.obj.appendSelfToBufBuilder( b ); obj.appendSelfToBufBuilder( b ); @@ -599,7 +700,7 @@ namespace mongo { if ( ! runCommand( nsToDatabase( ns.c_str() ) , BSON( "deleteIndexes" << NamespaceString( ns ).coll << "index" << indexName ) , info ) ){ - log() << "dropIndex failed: " << info << endl; + log(_logLevel) << "dropIndex failed: " << info << endl; uassert( 10007 , "dropIndex failed" , 0 ); } resetIndexCache(); @@ -684,15 +785,21 @@ namespace mongo { /* -- DBClientCursor ---------------------------------------------- */ +#ifdef _DEBUG +#define CHECK_OBJECT( o , msg ) massert( 10337 , (string)"object not valid" + (msg) , (o).isValid() ) +#else +#define CHECK_OBJECT( o , msg ) +#endif + void assembleRequest( const string &ns, BSONObj query, int nToReturn, int nToSkip, const BSONObj *fieldsToReturn, int queryOptions, Message &toSend ) { CHECK_OBJECT( query , "assembleRequest query" ); // see query.h for the protocol we are using here. BufBuilder b; int opts = queryOptions; - b.append(opts); - b.append(ns.c_str()); - b.append(nToSkip); - b.append(nToReturn); + b.appendNum(opts); + b.appendStr(ns); + b.appendNum(nToSkip); + b.appendNum(nToReturn); query.appendSelfToBufBuilder(b); if ( fieldsToReturn ) fieldsToReturn->appendSelfToBufBuilder(b); @@ -713,6 +820,10 @@ namespace mongo { port().piggyBack( toSend ); } + void DBClientConnection::recv( Message &m ) { + port().recv(m); + } + bool DBClientConnection::call( Message &toSend, Message &response, bool assertOk ) { /* todo: this is very ugly messagingport::call returns an error code AND can throw an exception. we should make it return void and just throw an exception anytime @@ -722,7 +833,7 @@ namespace mongo { if ( !port().call(toSend, response) ) { failed = true; if ( assertOk ) - massert( 10278 , "dbclient error communicating with server", false); + uassert( 10278 , "dbclient error communicating with server", false); return false; } } @@ -736,178 +847,128 @@ namespace mongo { void DBClientConnection::checkResponse( const char *data, int nReturned ) { /* check for errors. 
the only one we really care about at this stage is "not master" */ - if ( clientPaired && nReturned ) { + if ( clientSet && nReturned ) { + assert(data); BSONObj o(data); BSONElement e = o.firstElement(); if ( strcmp(e.fieldName(), "$err") == 0 && e.type() == String && strncmp(e.valuestr(), "not master", 10) == 0 ) { - clientPaired->isntMaster(); + clientSet->isntMaster(); } } } - int DBClientCursor::nextBatchSize(){ - if ( nToReturn == 0 ) - return batchSize; - if ( batchSize == 0 ) - return nToReturn; + void DBClientConnection::killCursor( long long cursorId ){ + BufBuilder b; + b.appendNum( (int)0 ); // reserved + b.appendNum( (int)1 ); // number + b.appendNum( cursorId ); - return batchSize < nToReturn ? batchSize : nToReturn; - } - - bool DBClientCursor::init() { - Message toSend; - if ( !cursorId ) { - assembleRequest( ns, query, nextBatchSize() , nToSkip, fieldsToReturn, opts, toSend ); - } else { - BufBuilder b; - b.append( opts ); - b.append( ns.c_str() ); - b.append( nToReturn ); - b.append( cursorId ); - toSend.setData( dbGetMore, b.buf(), b.len() ); - } - if ( !connector->call( toSend, *m, false ) ) - return false; - if ( ! m->data ) - return false; - dataReceived(); - return true; + Message m; + m.setData( dbKillCursors , b.buf() , b.len() ); + + sayPiggyBack( m ); } - void DBClientCursor::requestMore() { - assert( cursorId && pos == nReturned ); - - if (haveLimit){ - nToReturn -= nReturned; - assert(nToReturn > 0); - } - BufBuilder b; - b.append(opts); - b.append(ns.c_str()); - b.append(nextBatchSize()); - b.append(cursorId); + /* --- class dbclientpaired --- */ - Message toSend; - toSend.setData(dbGetMore, b.buf(), b.len()); - auto_ptr response(new Message()); - connector->call( toSend, *response ); - - m = response; - dataReceived(); - } - - void DBClientCursor::dataReceived() { - QueryResult *qr = (QueryResult *) m->data; - resultFlags = qr->resultFlags(); - if ( qr->resultFlags() & QueryResult::ResultFlag_CursorNotFound ) { - // cursor id no longer valid at the server. - assert( qr->cursorId == 0 ); - cursorId = 0; // 0 indicates no longer valid (dead) - // TODO: should we throw a UserException here??? - } - if ( cursorId == 0 || ! ( opts & QueryOption_CursorTailable ) ) { - // only set initially: we don't want to kill it on end of data - // if it's a tailable cursor - cursorId = qr->cursorId; - } - nReturned = qr->nReturned; - pos = 0; - data = qr->data(); - - connector->checkResponse( data, nReturned ); - /* this assert would fire the way we currently work: - assert( nReturned || cursorId == 0 ); - */ + string DBClientReplicaSet::toString() { + return getServerAddress(); } - /** If true, safe to call next(). Requests more from server if necessary. 
*/ - bool DBClientCursor::more() { - if ( !_putBack.empty() ) - return true; + DBClientReplicaSet::DBClientReplicaSet( const string& name , const vector& servers ) + : _name( name ) , _currentMaster( 0 ), _servers( servers ){ - if (haveLimit && pos >= nToReturn) - return false; - - if ( pos < nReturned ) - return true; - - if ( cursorId == 0 ) - return false; - - requestMore(); - return pos < nReturned; + for ( unsigned i=0; i<_servers.size(); i++ ) + _conns.push_back( new DBClientConnection( true , this ) ); } - - BSONObj DBClientCursor::next() { - assert( more() ); - if ( !_putBack.empty() ) { - BSONObj ret = _putBack.top(); - _putBack.pop(); - return ret; - } - pos++; - BSONObj o(data); - data += o.objsize(); - return o; - } - - DBClientCursor::~DBClientCursor() { - DESTRUCTOR_GUARD ( - if ( cursorId && _ownCursor ) { - BufBuilder b; - b.append( (int)0 ); // reserved - b.append( (int)1 ); // number - b.append( cursorId ); - - Message m; - m.setData( dbKillCursors , b.buf() , b.len() ); - - connector->sayPiggyBack( m ); - } - ); + + DBClientReplicaSet::~DBClientReplicaSet(){ + for ( unsigned i=0; i<_conns.size(); i++ ) + delete _conns[i]; + _conns.clear(); } - - /* --- class dbclientpaired --- */ - - string DBClientPaired::toString() { - stringstream ss; - ss << "state: " << master << '\n'; - ss << "left: " << left.toStringLong() << '\n'; - ss << "right: " << right.toStringLong() << '\n'; + + string DBClientReplicaSet::getServerAddress() const { + StringBuilder ss; + if ( _name.size() ) + ss << _name << "/"; + + for ( unsigned i=0; i<_servers.size(); i++ ){ + if ( i > 0 ) + ss << ","; + ss << _servers[i].toString(); + } return ss.str(); } -#pragma warning(disable: 4355) - DBClientPaired::DBClientPaired() : - left(true, this), right(true, this) - { - master = NotSetL; - } -#pragma warning(default: 4355) - /* find which server, the left or right, is currently master mode */ - void DBClientPaired::_checkMaster() { + void DBClientReplicaSet::_checkMaster() { + + bool triedQuickCheck = false; + + log( _logLevel + 1) << "_checkMaster on: " << toString() << endl; for ( int retry = 0; retry < 2; retry++ ) { - int x = master; - for ( int pass = 0; pass < 2; pass++ ) { - DBClientConnection& c = x == 0 ? left : right; + for ( unsigned i=0; i<_conns.size(); i++ ){ + DBClientConnection * c = _conns[i]; try { bool im; BSONObj o; - c.isMaster(im, &o); + c->isMaster(im, &o); + if ( retry ) - log() << "checkmaster: " << c.toString() << ' ' << o.toString() << '\n'; + log(_logLevel) << "checkmaster: " << c->toString() << ' ' << o << '\n'; + + string maybePrimary; + if ( o["hosts"].type() == Array ){ + if ( o["primary"].type() == String ) + maybePrimary = o["primary"].String(); + + BSONObjIterator hi(o["hosts"].Obj()); + while ( hi.more() ){ + string toCheck = hi.next().String(); + int found = -1; + for ( unsigned x=0; x<_servers.size(); x++ ){ + if ( toCheck == _servers[x].toString() ){ + found = x; + break; + } + } + + if ( found == -1 ){ + HostAndPort h( toCheck ); + _servers.push_back( h ); + _conns.push_back( new DBClientConnection( true, this ) ); + string temp; + _conns[ _conns.size() - 1 ]->connect( h , temp ); + log( _logLevel ) << "updated set to: " << toString() << endl; + } + + } + } + if ( im ) { - master = (State) (x + 2); + _currentMaster = c; return; } + + if ( maybePrimary.size() && ! 
triedQuickCheck ){ + for ( unsigned x=0; x<_servers.size(); x++ ){ + if ( _servers[i].toString() != maybePrimary ) + continue; + triedQuickCheck = true; + _conns[x]->isMaster( im , &o ); + if ( im ){ + _currentMaster = _conns[x]; + return; + } + } + } } - catch (AssertionException&) { + catch ( std::exception& e ) { if ( retry ) - log() << "checkmaster: caught exception " << c.toString() << '\n'; + log(_logLevel) << "checkmaster: caught exception " << c->toString() << ' ' << e.what() << endl; } - x = x^1; } sleepsecs(1); } @@ -915,36 +976,54 @@ namespace mongo { uassert( 10009 , "checkmaster: no master found", false); } - inline DBClientConnection& DBClientPaired::checkMaster() { - if ( master > NotSetR ) { + DBClientConnection * DBClientReplicaSet::checkMaster() { + if ( _currentMaster ){ // a master is selected. let's just make sure connection didn't die - DBClientConnection& c = master == Left ? left : right; - if ( !c.isFailed() ) - return c; - // after a failure, on the next checkMaster, start with the other - // server -- presumably it took over. (not critical which we check first, - // just will make the failover slightly faster if we guess right) - master = master == Left ? NotSetR : NotSetL; + if ( ! _currentMaster->isFailed() ) + return _currentMaster; + _currentMaster = 0; } _checkMaster(); - assert( master > NotSetR ); - return master == Left ? left : right; + assert( _currentMaster ); + return _currentMaster; } - DBClientConnection& DBClientPaired::slaveConn(){ - DBClientConnection& m = checkMaster(); - assert( ! m.isFailed() ); - return master == Left ? right : left; + DBClientConnection& DBClientReplicaSet::masterConn(){ + return *checkMaster(); } - bool DBClientPaired::connect(const string &serverHostname1, const string &serverHostname2) { + DBClientConnection& DBClientReplicaSet::slaveConn(){ + DBClientConnection * m = checkMaster(); + assert( ! m->isFailed() ); + + DBClientConnection * failedSlave = 0; + + for ( unsigned i=0; i<_conns.size(); i++ ){ + if ( m == _conns[i] ) + continue; + failedSlave = _conns[i]; + if ( _conns[i]->isFailed() ) + continue; + return *_conns[i]; + } + + assert(failedSlave); + return *failedSlave; + } + + bool DBClientReplicaSet::connect(){ string errmsg; - bool l = left.connect(serverHostname1, errmsg); - bool r = right.connect(serverHostname2, errmsg); - master = l ? NotSetL : NotSetR; - if ( !l && !r ) // it would be ok to fall through, but checkMaster will then try an immediate reconnect which is slow + + bool anyGood = false; + for ( unsigned i=0; i<_conns.size(); i++ ){ + if ( _conns[i]->connect( _servers[i] , errmsg ) ) + anyGood = true; + } + + if ( ! 
anyGood ) return false; + try { checkMaster(); } @@ -954,61 +1033,44 @@ namespace mongo { return true; } - bool DBClientPaired::connect(string hostpairstring) { - size_t comma = hostpairstring.find( "," ); - uassert( 10010 , "bad hostpairstring", comma != string::npos); - return connect( hostpairstring.substr( 0 , comma ) , hostpairstring.substr( comma + 1 ) ); - } - - bool DBClientPaired::auth(const string &dbname, const string &username, const string &pwd, string& errmsg) { - DBClientConnection& m = checkMaster(); - if( !m.auth(dbname, username, pwd, errmsg) ) + bool DBClientReplicaSet::auth(const string &dbname, const string &username, const string &pwd, string& errmsg, bool digestPassword ) { + DBClientConnection * m = checkMaster(); + if( !m->auth(dbname, username, pwd, errmsg, digestPassword ) ) return false; + /* we try to authentiate with the other half of the pair -- even if down, that way the authInfo is cached. */ - string e; - try { - if( &m == &left ) - right.auth(dbname, username, pwd, e); - else - left.auth(dbname, username, pwd, e); - } - catch( AssertionException&) { - } + for ( unsigned i=0; i<_conns.size(); i++ ){ + if ( _conns[i] == m ) + continue; + try { + string e; + _conns[i]->auth( dbname , username , pwd , e , digestPassword ); + } + catch ( AssertionException& ){ + } + } + return true; } - auto_ptr DBClientPaired::query(const string &a, Query b, int c, int d, - const BSONObj *e, int f, int g) - { - return checkMaster().query(a,b,c,d,e,f,g); - } - - BSONObj DBClientPaired::findOne(const string &a, Query b, const BSONObj *c, int d) { - return checkMaster().findOne(a,b,c,d); - } - - void testPaired() { - DBClientPaired p; - log() << "connect returns " << p.connect("localhost:27017", "localhost:27018") << endl; - - //DBClientConnection p(true); - string errmsg; - // log() << "connect " << p.connect("localhost", errmsg) << endl; - log() << "auth " << p.auth("dwight", "u", "p", errmsg) << endl; - - while( 1 ) { - sleepsecs(3); - try { - log() << "findone returns " << p.findOne("dwight.foo", BSONObj()).toString() << endl; - sleepsecs(3); - BSONObj info; - bool im; - log() << "ismaster returns " << p.isMaster(im,&info) << " info: " << info.toString() << endl; - } - catch(...) { - cout << "caught exception" << endl; - } - } - } + auto_ptr DBClientReplicaSet::query(const string &a, Query b, int c, int d, + const BSONObj *e, int f, int g){ + // TODO: if slave ok is set go to a slave + return checkMaster()->query(a,b,c,d,e,f,g); + } + BSONObj DBClientReplicaSet::findOne(const string &a, const Query& b, const BSONObj *c, int d) { + return checkMaster()->findOne(a,b,c,d); + } + + bool serverAlive( const string &uri ) { + DBClientConnection c( false, 0, 20 ); // potentially the connection to server could fail while we're checking if it's alive - so use timeouts + string err; + if ( !c.connect( uri, err ) ) + return false; + if ( !c.simpleCommand( "admin", 0, "ping" ) ) + return false; + return true; + } + } // namespace mongo diff --git a/client/dbclient.h b/client/dbclient.h index a2fad8e..639d960 100644 --- a/client/dbclient.h +++ b/client/dbclient.h @@ -17,7 +17,7 @@ #pragma once -#include "../stdafx.h" +#include "../pch.h" #include "../util/message.h" #include "../db/jsobj.h" #include "../db/json.h" @@ -51,7 +51,7 @@ namespace mongo { // an extended period of time. 
QueryOption_OplogReplay = 1 << 3, - /** The server normally times out idle cursors after an inactivy period to prevent excess memory use + /** The server normally times out idle cursors after an inactivy period to prevent excess memory uses Set this option to prevent that. */ QueryOption_NoCursorTimeout = 1 << 4, @@ -59,7 +59,18 @@ namespace mongo { /** Use with QueryOption_CursorTailable. If we are at the end of the data, block for a while rather than returning no data. After a timeout period, we do return as normal. */ - QueryOption_AwaitData = 1 << 5 + QueryOption_AwaitData = 1 << 5, + + /** Stream the data down full blast in multiple "more" packages, on the assumption that the client + will fully read all data queried. Faster when you are pulling a lot of data and know you want to + pull it all down. Note: it is not allowed to not read all the data unless you close the connection. + + Use the query( boost::function f, ... ) version of the connection's query() + method, and it will take care of all the details for you. + */ + QueryOption_Exhaust = 1 << 6, + + QueryOption_AllSupported = QueryOption_CursorTailable | QueryOption_SlaveOk | QueryOption_OplogReplay | QueryOption_NoCursorTimeout | QueryOption_AwaitData | QueryOption_Exhaust }; @@ -69,10 +80,129 @@ namespace mongo { /** Update multiple documents (if multiple documents match query expression). (Default is update a single document and stop.) */ - UpdateOption_Multi = 1 << 1 + UpdateOption_Multi = 1 << 1, + + /** flag from mongo saying this update went everywhere */ + UpdateOption_Broadcast = 1 << 2 + }; + + enum RemoveOptions { + /** only delete one option */ + RemoveOption_JustOne = 1 << 0, + + /** flag from mongo saying this update went everywhere */ + RemoveOption_Broadcast = 1 << 1 + }; + + class DBClientBase; + + class ConnectionString { + public: + enum ConnectionType { INVALID , MASTER , PAIR , SET , SYNC }; + + ConnectionString( const HostAndPort& server ){ + _type = MASTER; + _servers.push_back( server ); + _finishInit(); + } + + ConnectionString( ConnectionType type , const vector& servers ) + : _type( type ) , _servers( servers ){ + _finishInit(); + } + + ConnectionString( ConnectionType type , const string& s , const string& setName = "" ){ + _type = type; + _setName = setName; + _fillServers( s ); + + switch ( _type ){ + case MASTER: + assert( _servers.size() == 1 ); + break; + case SET: + assert( _setName.size() ); + assert( _servers.size() >= 1 ); // 1 is ok since we can derive + break; + case PAIR: + assert( _servers.size() == 2 ); + break; + default: + assert( _servers.size() > 0 ); + } + + _finishInit(); + } + + ConnectionString( const string& s , ConnectionType favoredMultipleType ){ + _fillServers( s ); + if ( _servers.size() == 1 ){ + _type = MASTER; + } + else { + _type = favoredMultipleType; + assert( _type != MASTER ); + } + _finishInit(); + } + + bool isValid() const { return _type != INVALID; } + + string toString() const { + return _string; + } + + DBClientBase* connect( string& errmsg ) const; + + static ConnectionString parse( const string& url , string& errmsg ); + + private: + + ConnectionString(){ + _type = INVALID; + } + + void _fillServers( string s ){ + string::size_type idx; + while ( ( idx = s.find( ',' ) ) != string::npos ){ + _servers.push_back( s.substr( 0 , idx ) ); + s = s.substr( idx + 1 ); + } + _servers.push_back( s ); + } + + void _finishInit(){ + stringstream ss; + if ( _type == SET ) + ss << _setName << "/"; + for ( unsigned i=0; i<_servers.size(); i++ ){ + if ( i > 0 ) + ss << ","; 
+ ss << _servers[i].toString(); + } + _string = ss.str(); + } + + ConnectionType _type; + vector _servers; + string _string; + string _setName; + }; + + /** + * controls how much a clients cares about writes + * default is NORMAL + */ + enum WriteConcern { + W_NONE = 0 , // TODO: not every connection type fully supports this + W_NORMAL = 1 + // TODO SAFE = 2 }; class BSONObj; + class ScopedDbConnection; + class DBClientCursor; + class DBClientCursorBatchIterator; /** Represents a Mongo query expression. Typically one uses the QUERY(...) macro to construct a Query object. Examples: @@ -160,7 +290,7 @@ namespace mongo { /** * if this query has an orderby, hint, or some other field */ - bool isComplex() const; + bool isComplex( bool * hasDollar = 0 ) const; BSONObj getFilter() const; BSONObj getSort() const; @@ -195,146 +325,12 @@ namespace mongo { virtual bool call( Message &toSend, Message &response, bool assertOk=true ) = 0; virtual void say( Message &toSend ) = 0; virtual void sayPiggyBack( Message &toSend ) = 0; - virtual void checkResponse( const string &data, int nReturned ) {} - }; - - /** Queries return a cursor object */ - class DBClientCursor : boost::noncopyable { - friend class DBClientBase; - bool init(); - public: - /** If true, safe to call next(). Requests more from server if necessary. */ - bool more(); + virtual void checkResponse( const char* data, int nReturned ) {} - /** If true, there is more in our local buffers to be fetched via next(). Returns - false when a getMore request back to server would be required. You can use this - if you want to exhaust whatever data has been fetched to the client already but - then perhaps stop. - */ - bool moreInCurrentBatch() { return !_putBack.empty() || pos < nReturned; } - - /** next - @return next object in the result cursor. - on an error at the remote server, you will get back: - { $err: } - if you do not want to handle that yourself, call nextSafe(). - */ - BSONObj next(); - - /** - restore an object previously returned by next() to the cursor - */ - void putBack( const BSONObj &o ) { _putBack.push( o.getOwned() ); } - - /** throws AssertionException if get back { $err : ... } */ - BSONObj nextSafe() { - BSONObj o = next(); - BSONElement e = o.firstElement(); - assert( strcmp(e.fieldName(), "$err") != 0 ); - return o; - } - - /** - iterate the rest of the cursor and return the number if items - */ - int itcount(){ - int c = 0; - while ( more() ){ - next(); - c++; - } - return c; - } - - /** cursor no longer valid -- use with tailable cursors. - note you should only rely on this once more() returns false; - 'dead' may be preset yet some data still queued and locally - available from the dbclientcursor. 
- */ - bool isDead() const { - return cursorId == 0; - } - - bool tailable() const { - return (opts & QueryOption_CursorTailable) != 0; - } - - /** see QueryResult::ResultFlagType (db/dbmessage.h) for flag values - mostly these flags are for internal purposes - - ResultFlag_ErrSet is the possible exception to that - */ - bool hasResultFlag( int flag ){ - return (resultFlags & flag) != 0; - } - - DBClientCursor( DBConnector *_connector, const string &_ns, BSONObj _query, int _nToReturn, - int _nToSkip, const BSONObj *_fieldsToReturn, int queryOptions , int bs ) : - connector(_connector), - ns(_ns), - query(_query), - nToReturn(_nToReturn), - haveLimit( _nToReturn > 0 && !(queryOptions & QueryOption_CursorTailable)), - nToSkip(_nToSkip), - fieldsToReturn(_fieldsToReturn), - opts(queryOptions), - batchSize(bs), - m(new Message()), - cursorId(), - nReturned(), - pos(), - data(), - _ownCursor( true ) { - } - - DBClientCursor( DBConnector *_connector, const string &_ns, long long _cursorId, int _nToReturn, int options ) : - connector(_connector), - ns(_ns), - nToReturn( _nToReturn ), - haveLimit( _nToReturn > 0 && !(options & QueryOption_CursorTailable)), - opts( options ), - m(new Message()), - cursorId( _cursorId ), - nReturned(), - pos(), - data(), - _ownCursor( true ) { - } - - virtual ~DBClientCursor(); - - long long getCursorId() const { return cursorId; } - - /** by default we "own" the cursor and will send the server a KillCursor - message when ~DBClientCursor() is called. This function overrides that. - */ - void decouple() { _ownCursor = false; } - - private: - - int nextBatchSize(); - - DBConnector *connector; - string ns; - BSONObj query; - int nToReturn; - bool haveLimit; - int nToSkip; - const BSONObj *fieldsToReturn; - int opts; - int batchSize; - auto_ptr m; - stack< BSONObj > _putBack; - - int resultFlags; - long long cursorId; - int nReturned; - int pos; - const char *data; - void dataReceived(); - void requestMore(); - bool _ownCursor; // see decouple() + /* used by QueryOption_Exhaust. To use that your subclass must implement this. */ + virtual void recv( Message& m ) { assert(false); } }; - + /** The interface that any db connection should implement */ @@ -343,6 +339,7 @@ namespace mongo { virtual auto_ptr query(const string &ns, Query query, int nToReturn = 0, int nToSkip = 0, const BSONObj *fieldsToReturn = 0, int queryOptions = 0 , int batchSize = 0 ) = 0; + /** don't use this - called automatically by DBClientCursor for you */ virtual auto_ptr getMore( const string &ns, long long cursorId, int nToReturn = 0, int options = 0 ) = 0; virtual void insert( const string &ns, BSONObj obj ) = 0; @@ -359,7 +356,7 @@ namespace mongo { @return a single object that matches the query. if none do, then the object is empty @throws AssertionException */ - virtual BSONObj findOne(const string &ns, Query query, const BSONObj *fieldsToReturn = 0, int queryOptions = 0); + virtual BSONObj findOne(const string &ns, const Query& query, const BSONObj *fieldsToReturn = 0, int queryOptions = 0); }; @@ -371,33 +368,38 @@ namespace mongo { class DBClientWithCommands : public DBClientInterface { set _seenIndexes; public: + /** controls how chatty the client is about network errors & such. See log.h */ + int _logLevel; + + DBClientWithCommands() : _logLevel(0), _cachedAvailableOptions( (enum QueryOptions)0 ), _haveCachedAvailableOptions(false) { } - /** helper function. run a simple command where the command expression is simply - { command : 1 } + /** helper function. 
run a simple command where the command expression is simply + { command : 1 } @param info -- where to put result object. may be null if caller doesn't need that info @param command -- command name - @return true if the command returned "ok". - */ + @return true if the command returned "ok". + */ bool simpleCommand(const string &dbname, BSONObj *info, const string &command); /** Run a database command. Database commands are represented as BSON objects. Common database commands have prebuilt helper functions -- see below. If a helper is not available you can - directly call runCommand. + directly call runCommand. @param dbname database name. Use "admin" for global administrative commands. @param cmd the command object to execute. For example, { ismaster : 1 } @param info the result object the database returns. Typically has { ok : ..., errmsg : ... } fields set. - @return true if the command returned "ok". + @param options see enum QueryOptions - normally not needed to run a command + @return true if the command returned "ok". */ virtual bool runCommand(const string &dbname, const BSONObj& cmd, BSONObj &info, int options=0); /** Authorize access to a particular database. - Authentication is separate for each database on the server -- you may authenticate for any - number of databases on a single connection. - The "admin" database is special and once authenticated provides access to all databases on the - server. - @param digestPassword if password is plain text, set this to true. otherwise assumed to be pre-digested + Authentication is separate for each database on the server -- you may authenticate for any + number of databases on a single connection. + The "admin" database is special and once authenticated provides access to all databases on the + server. + @param digestPassword if password is plain text, set this to true. otherwise assumed to be pre-digested @return true if successful */ virtual bool auth(const string &dbname, const string &username, const string &pwd, string& errmsg, bool digestPassword = true); @@ -425,17 +427,17 @@ namespace mongo { If the collection already exists, no action occurs. - ns: fully qualified collection name - size: desired initial extent size for the collection. - Must be <= 1000000000 for normal collections. - For fixed size (capped) collections, this size is the total/max size of the - collection. - capped: if true, this is a fixed size collection (where old data rolls out). - max: maximum number of objects if capped (optional). + @param ns fully qualified collection name + @param size desired initial extent size for the collection. + Must be <= 1000000000 for normal collections. + For fixed size (capped) collections, this size is the total/max size of the + collection. + @param capped if true, this is a fixed size collection (where old data rolls out). + @param max maximum number of objects if capped (optional). returns true if successful. */ - bool createCollection(const string &ns, unsigned size = 0, bool capped = false, int max = 0, BSONObj *info = 0); + bool createCollection(const string &ns, long long size = 0, bool capped = false, int max = 0, BSONObj *info = 0); /** Get error result from the last operation on this connection. @return error message text, or empty string if no error. @@ -444,7 +446,9 @@ namespace mongo { /** Get error result from the last operation on this connection. @return full error object. 
*/ - BSONObj getLastErrorDetailed(); + virtual BSONObj getLastErrorDetailed(); + + static string getLastErrorString( const BSONObj& res ); /** Return the last error which has occurred, even if not the very last operation. @@ -595,6 +599,8 @@ namespace mongo { /** get a list of all the current databases + uses the { listDatabases : 1 } command. + throws on error */ list getDatabaseNames(); @@ -605,7 +611,6 @@ namespace mongo { bool exists( const string& ns ); - /** Create an index if it does not already exist. ensureIndex calls are remembered so it is safe/fast to call this function many times in your code. @@ -666,25 +671,39 @@ namespace mongo { protected: bool isOk(const BSONObj&); - + + enum QueryOptions availableOptions(); + + private: + enum QueryOptions _cachedAvailableOptions; + bool _haveCachedAvailableOptions; }; /** abstract class that implements the core db operations */ class DBClientBase : public DBClientWithCommands, public DBConnector { + protected: + WriteConcern _writeConcern; + public: + DBClientBase(){ + _writeConcern = W_NORMAL; + } + + WriteConcern getWriteConcern() const { return _writeConcern; } + void setWriteConcern( WriteConcern w ){ _writeConcern = w; } + /** send a query to the database. - ns: namespace to query, format is .[.]* - query: query to perform on the collection. this is a BSONObj (binary JSON) + @param ns namespace to query, format is .[.]* + @param query query to perform on the collection. this is a BSONObj (binary JSON) You may format as { query: { ... }, orderby: { ... } } to specify a sort order. - nToReturn: n to return. 0 = unlimited - nToSkip: start with the nth item - fieldsToReturn: - optional template of which fields to select. if unspecified, returns all fields - queryOptions: see options enum at top of this file + @param nToReturn n to return. 0 = unlimited + @param nToSkip start with the nth item + @param fieldsToReturn optional template of which fields to select. if unspecified, returns all fields + @param queryOptions see options enum at top of this file @return cursor. 
0 if error (connection failure) @throws AssertionException @@ -692,12 +711,13 @@ namespace mongo { virtual auto_ptr query(const string &ns, Query query, int nToReturn = 0, int nToSkip = 0, const BSONObj *fieldsToReturn = 0, int queryOptions = 0 , int batchSize = 0 ); - /** @param cursorId id of cursor to retrieve + /** don't use this - called automatically by DBClientCursor for you + @param cursorId id of cursor to retrieve @return an handle to a previously allocated cursor @throws AssertionException */ virtual auto_ptr getMore( const string &ns, long long cursorId, int nToReturn = 0, int options = 0 ); - + /** insert an object into the database */ @@ -717,11 +737,13 @@ namespace mongo { /** updates objects matching query */ - virtual void update( const string &ns , Query query , BSONObj obj , bool upsert = 0 , bool multi = 0 ); + virtual void update( const string &ns , Query query , BSONObj obj , bool upsert = false , bool multi = false ); virtual string getServerAddress() const = 0; virtual bool isFailed() const = 0; + + virtual void killCursor( long long cursorID ) = 0; static int countCommas( const string& s ){ int n = 0; @@ -730,9 +752,15 @@ namespace mongo { n++; return n; } - }; + + virtual bool callRead( Message& toSend , Message& response ) = 0; + // virtual bool callWrite( Message& toSend , Message& response ) = 0; // TODO: add this if needed + virtual void say( Message& toSend ) = 0; + + virtual ConnectionString::ConnectionType type() const = 0; + }; // DBClientBase - class DBClientPaired; + class DBClientReplicaSet; class ConnectException : public UserException { public: @@ -744,24 +772,31 @@ namespace mongo { This is the main entry point for talking to a simple Mongo setup */ class DBClientConnection : public DBClientBase { - DBClientPaired *clientPaired; - auto_ptr p; - auto_ptr server; + DBClientReplicaSet *clientSet; + boost::scoped_ptr p; + boost::scoped_ptr server; bool failed; // true if some sort of fatal error has ever happened bool autoReconnect; time_t lastReconnectTry; - string serverAddress; // remember for reconnects + HostAndPort _server; // remember for reconnects + string _serverString; + int _port; void _checkConnection(); void checkConnection() { if( failed ) _checkConnection(); } map< string, pair > authCache; + int _timeout; + + bool _connect( string& errmsg ); public: /** @param _autoReconnect if true, automatically reconnect on a connection failure - @param cp used by DBClientPaired. You do not need to specify this parameter + @param cp used by DBClientReplicaSet. You do not need to specify this parameter + @param timeout tcp timeout in seconds - this is for read/write, not connect. + Connect timeout is fixed, but short, at 5 seconds. */ - DBClientConnection(bool _autoReconnect=false,DBClientPaired* cp=0) : - clientPaired(cp), failed(false), autoReconnect(_autoReconnect), lastReconnectTry(0) { } + DBClientConnection(bool _autoReconnect=false, DBClientReplicaSet* cp=0, int timeout=0) : + clientSet(cp), failed(false), autoReconnect(_autoReconnect), lastReconnectTry(0), _timeout(timeout) { } /** Connect to a Mongo database server. @@ -769,10 +804,27 @@ namespace mongo { false was returned -- it will try to connect again. @param serverHostname host to connect to. can include port number ( 127.0.0.1 , 127.0.0.1:5555 ) + If you use IPv6 you must add a port number ( ::1:27017 ) @param errmsg any relevant error message will appended to the string + @deprecated please use HostAndPort @return false if fails to connect. 
*/ - virtual bool connect(const string &serverHostname, string& errmsg); + virtual bool connect(const char * hostname, string& errmsg){ + // TODO: remove this method + HostAndPort t( hostname ); + return connect( t , errmsg ); + } + + /** Connect to a Mongo database server. + + If autoReconnect is true, you can try to use the DBClientConnection even when + false was returned -- it will try to connect again. + + @param server server to connect to. + @param errmsg any relevant error message will appended to the string + @return false if fails to connect. + */ + virtual bool connect(const HostAndPort& server, string& errmsg); /** Connect to a Mongo database server. Exception throwing version. Throws a UserException if cannot connect. @@ -782,20 +834,26 @@ namespace mongo { @param serverHostname host to connect to. can include port number ( 127.0.0.1 , 127.0.0.1:5555 ) */ - void connect(string serverHostname) { + void connect(const string& serverHostname) { string errmsg; - if( !connect(serverHostname.c_str(), errmsg) ) + if( !connect(HostAndPort(serverHostname), errmsg) ) throw ConnectException(string("can't connect ") + errmsg); } virtual bool auth(const string &dbname, const string &username, const string &pwd, string& errmsg, bool digestPassword = true); - virtual auto_ptr query(const string &ns, Query query, int nToReturn = 0, int nToSkip = 0, + virtual auto_ptr query(const string &ns, Query query=Query(), int nToReturn = 0, int nToSkip = 0, const BSONObj *fieldsToReturn = 0, int queryOptions = 0 , int batchSize = 0 ) { checkConnection(); return DBClientBase::query( ns, query, nToReturn, nToSkip, fieldsToReturn, queryOptions , batchSize ); } + /** uses QueryOption_Exhaust + use DBClientCursorBatchIterator if you want to do items in large blocks, perhpas to avoid granular locking and such. + */ + unsigned long long query( boost::function f, const string& ns, Query query, const BSONObj *fieldsToReturn = 0, int queryOptions = 0); + unsigned long long query( boost::function f, const string& ns, Query query, const BSONObj *fieldsToReturn = 0, int queryOptions = 0); + /** @return true if this connection is currently in a failed state. When autoreconnect is on, a connection will transition back to an ok state after reconnecting. @@ -805,67 +863,75 @@ namespace mongo { } MessagingPort& port() { - return *p.get(); + return *p; } string toStringLong() const { stringstream ss; - ss << serverAddress; + ss << _serverString; if ( failed ) ss << " failed"; return ss.str(); } /** Returns the address of the server */ string toString() { - return serverAddress; + return _serverString; } string getServerAddress() const { - return serverAddress; + return _serverString; + } + + virtual void killCursor( long long cursorID ); + + virtual bool callRead( Message& toSend , Message& response ){ + return call( toSend , response ); } - virtual bool call( Message &toSend, Message &response, bool assertOk = true ); virtual void say( Message &toSend ); + virtual bool call( Message &toSend, Message &response, bool assertOk = true ); + + virtual ConnectionString::ConnectionType type() const { return ConnectionString::MASTER; } + protected: + friend class SyncClusterConnection; + virtual void recv( Message& m ); virtual void sayPiggyBack( Message &toSend ); virtual void checkResponse( const char *data, int nReturned ); }; - - /** Use this class to connect to a replica pair of servers. The class will manage - checking for which server in a replica pair is master, and do failover automatically. 
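A minimal sketch (not part of the patch) of the callback form of query() declared above, which the comment says runs with QueryOption_Exhaust; the callback, namespace and output are invented:

    #include <iostream>
    #include <boost/function.hpp>
    #include "client/dbclient.h"
    using namespace mongo;

    static void printDoc( const BSONObj& o ) {
        cout << o.toString() << endl;
    }

    void dumpCollection( DBClientConnection& c ) {
        // wrap the plain function so the BSONObj overload is selected unambiguously
        boost::function<void(const BSONObj&)> cb = printDoc;
        unsigned long long n = c.query( cb , "test.log" , Query() );
        cout << "streamed " << n << " documents" << endl;
    }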
- + + /** Use this class to connect to a replica set of servers. The class will manage + checking for which server in a replica set is master, and do failover automatically. + + This can also be used to connect to replica pairs since pairs are a subset of sets + On a failover situation, expect at least one operation to return an error (throw an exception) before the failover is complete. Operations are not retried. */ - class DBClientPaired : public DBClientBase { - DBClientConnection left,right; - enum State { - NotSetL=0, - NotSetR=1, - Left, Right - } master; + class DBClientReplicaSet : public DBClientBase { + string _name; + DBClientConnection * _currentMaster; + vector _servers; + vector _conns; + void _checkMaster(); - DBClientConnection& checkMaster(); + DBClientConnection * checkMaster(); public: - /** Call connect() after constructing. autoReconnect is always on for DBClientPaired connections. */ - DBClientPaired(); + /** Call connect() after constructing. autoReconnect is always on for DBClientReplicaSet connections. */ + DBClientReplicaSet( const string& name , const vector& servers ); + virtual ~DBClientReplicaSet(); - /** Returns false is neither member of the pair were reachable, or neither is + /** Returns false if nomember of the set were reachable, or neither is master, although, when false returned, you can still try to use this connection object, it will try reconnects. */ - bool connect(const string &serverHostname1, const string &serverHostname2); + bool connect(); - /** Connect to a server pair using a host pair string of the form - hostname[:port],hostname[:port] - */ - bool connect(string hostpairstring); - - /** Authorize. Authorizes both sides of the pair as needed. + /** Authorize. Authorizes all nodes as needed */ - bool auth(const string &dbname, const string &username, const string &pwd, string& errmsg); + virtual bool auth(const string &dbname, const string &username, const string &pwd, string& errmsg, bool digestPassword = true ); /** throws userassertion "no master found" */ virtual @@ -874,56 +940,69 @@ namespace mongo { /** throws userassertion "no master found" */ virtual - BSONObj findOne(const string &ns, Query query, const BSONObj *fieldsToReturn = 0, int queryOptions = 0); + BSONObj findOne(const string &ns, const Query& query, const BSONObj *fieldsToReturn = 0, int queryOptions = 0); /** insert */ virtual void insert( const string &ns , BSONObj obj ) { - checkMaster().insert(ns, obj); + checkMaster()->insert(ns, obj); } /** insert multiple objects. Note that single object insert is asynchronous, so this version is only nominally faster and not worth a special effort to try to use. */ virtual void insert( const string &ns, const vector< BSONObj >& v ) { - checkMaster().insert(ns, v); + checkMaster()->insert(ns, v); } /** remove */ virtual void remove( const string &ns , Query obj , bool justOne = 0 ) { - checkMaster().remove(ns, obj, justOne); + checkMaster()->remove(ns, obj, justOne); } /** update */ virtual void update( const string &ns , Query query , BSONObj obj , bool upsert = 0 , bool multi = 0 ) { - return checkMaster().update(ns, query, obj, upsert,multi); + return checkMaster()->update(ns, query, obj, upsert,multi); } + virtual void killCursor( long long cursorID ){ + checkMaster()->killCursor( cursorID ); + } + string toString(); /* this is the callback from our underlying connections to notify us that we got a "not master" error. */ void isntMaster() { - master = ( ( master == Left ) ? 
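A hedged sketch (not part of the patch) of connecting through the new DBClientReplicaSet; the set name, hosts and namespace are invented, and the element type of the server vector (HostAndPort) is inferred from the surrounding declarations:

    #include <iostream>
    #include "client/dbclient.h"
    using namespace mongo;

    void queryReplicaSet() {
        vector<HostAndPort> servers;
        servers.push_back( HostAndPort( "db1.example.com:27017" ) );
        servers.push_back( HostAndPort( "db2.example.com:27017" ) );

        DBClientReplicaSet rs( "rs0" , servers );
        if ( !rs.connect() )
            return;                     // no master reachable yet; it keeps trying to reconnect

        BSONObj person = rs.findOne( "test.people" , QUERY( "name" << "eliot" ) );
        cout << person.toString() << endl;
    }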
NotSetR : NotSetL ); + _currentMaster = 0; } - string getServerAddress() const { - return left.getServerAddress() + "," + right.getServerAddress(); - } - + string getServerAddress() const; + + DBClientConnection& masterConn(); DBClientConnection& slaveConn(); - /* TODO - not yet implemented. mongos may need these. */ - virtual bool call( Message &toSend, Message &response, bool assertOk=true ) { assert(false); return false; } - virtual void say( Message &toSend ) { assert(false); } + + virtual bool call( Message &toSend, Message &response, bool assertOk=true ) { return checkMaster()->call( toSend , response , assertOk ); } + virtual void say( Message &toSend ) { checkMaster()->say( toSend ); } + virtual bool callRead( Message& toSend , Message& response ){ return checkMaster()->callRead( toSend , response ); } + + virtual ConnectionString::ConnectionType type() const { return ConnectionString::SET; } + + protected: virtual void sayPiggyBack( Message &toSend ) { assert(false); } virtual void checkResponse( const char *data, int nReturned ) { assert(false); } bool isFailed() const { - // TODO: this really should check isFailed on current master as well - return master < Left; + return _currentMaster == 0 || _currentMaster->isFailed(); } }; + /** pings server to check if it's up + */ + bool serverAlive( const string &uri ); DBClientBase * createDirectClient(); } // namespace mongo + +#include "dbclientcursor.h" +#include "undef_macros.h" diff --git a/client/dbclientcursor.cpp b/client/dbclientcursor.cpp new file mode 100644 index 0000000..07771bb --- /dev/null +++ b/client/dbclientcursor.cpp @@ -0,0 +1,232 @@ +// dbclient.cpp - connect to a Mongo database as a database, from C++ + +/* Copyright 2009 10gen Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "pch.h" +#include "dbclient.h" +#include "../db/dbmessage.h" +#include "../db/cmdline.h" +#include "connpool.h" +#include "../s/shard.h" + +namespace mongo { + + void assembleRequest( const string &ns, BSONObj query, int nToReturn, int nToSkip, const BSONObj *fieldsToReturn, int queryOptions, Message &toSend ); + + int DBClientCursor::nextBatchSize(){ + + if ( nToReturn == 0 ) + return batchSize; + + if ( batchSize == 0 ) + return nToReturn; + + return batchSize < nToReturn ? 
batchSize : nToReturn; + } + + bool DBClientCursor::init() { + Message toSend; + if ( !cursorId ) { + assembleRequest( ns, query, nextBatchSize() , nToSkip, fieldsToReturn, opts, toSend ); + } else { + BufBuilder b; + b.appendNum( opts ); + b.appendStr( ns ); + b.appendNum( nToReturn ); + b.appendNum( cursorId ); + toSend.setData( dbGetMore, b.buf(), b.len() ); + } + if ( !connector->call( toSend, *m, false ) ) + return false; + if ( m->empty() ) + return false; + dataReceived(); + return true; + } + + void DBClientCursor::requestMore() { + assert( cursorId && pos == nReturned ); + + if (haveLimit){ + nToReturn -= nReturned; + assert(nToReturn > 0); + } + BufBuilder b; + b.appendNum(opts); + b.appendStr(ns); + b.appendNum(nextBatchSize()); + b.appendNum(cursorId); + + Message toSend; + toSend.setData(dbGetMore, b.buf(), b.len()); + auto_ptr response(new Message()); + + if ( connector ){ + connector->call( toSend, *response ); + m = response; + dataReceived(); + } + else { + assert( _scopedHost.size() ); + ScopedDbConnection conn( _scopedHost ); + conn->call( toSend , *response ); + connector = conn.get(); + m = response; + dataReceived(); + connector = 0; + conn.done(); + } + } + + /** with QueryOption_Exhaust, the server just blasts data at us (marked at end with cursorid==0). */ + void DBClientCursor::exhaustReceiveMore() { + assert( cursorId && pos == nReturned ); + assert( !haveLimit ); + auto_ptr response(new Message()); + assert( connector ); + connector->recv(*response); + m = response; + dataReceived(); + } + + void DBClientCursor::dataReceived() { + QueryResult *qr = (QueryResult *) m->singleData(); + resultFlags = qr->resultFlags(); + + if ( qr->resultFlags() & ResultFlag_CursorNotFound ) { + // cursor id no longer valid at the server. + assert( qr->cursorId == 0 ); + cursorId = 0; // 0 indicates no longer valid (dead) + if ( ! ( opts & QueryOption_CursorTailable ) ) + throw UserException( 13127 , "getMore: cursor didn't exist on server, possible restart or timeout?" ); + } + + if ( cursorId == 0 || ! ( opts & QueryOption_CursorTailable ) ) { + // only set initially: we don't want to kill it on end of data + // if it's a tailable cursor + cursorId = qr->cursorId; + } + + nReturned = qr->nReturned; + pos = 0; + data = qr->data(); + + connector->checkResponse( data, nReturned ); + /* this assert would fire the way we currently work: + assert( nReturned || cursorId == 0 ); + */ + } + + /** If true, safe to call next(). Requests more from server if necessary. 
*/ + bool DBClientCursor::more() { + _assertIfNull(); + + if ( !_putBack.empty() ) + return true; + + if (haveLimit && pos >= nToReturn) + return false; + + if ( pos < nReturned ) + return true; + + if ( cursorId == 0 ) + return false; + + requestMore(); + return pos < nReturned; + } + + BSONObj DBClientCursor::next() { + DEV _assertIfNull(); + if ( !_putBack.empty() ) { + BSONObj ret = _putBack.top(); + _putBack.pop(); + return ret; + } + + uassert(13422, "DBClientCursor next() called but more() is false", pos < nReturned); + + pos++; + BSONObj o(data); + data += o.objsize(); + /* todo would be good to make data null at end of batch for safety */ + return o; + } + + void DBClientCursor::peek(vector& v, int atMost) { + int m = atMost; + + /* + for( stack::iterator i = _putBack.begin(); i != _putBack.end(); i++ ) { + if( m == 0 ) + return; + v.push_back(*i); + m--; + n++; + } + */ + + int p = pos; + const char *d = data; + while( m && p < nReturned ) { + BSONObj o(d); + d += o.objsize(); + p++; + m--; + v.push_back(o); + } + } + + void DBClientCursor::attach( AScopedConnection * conn ){ + assert( _scopedHost.size() == 0 ); + assert( connector == conn->get() ); + _scopedHost = conn->getHost(); + conn->done(); + connector = 0; + } + + DBClientCursor::~DBClientCursor() { + if (!this) + return; + + DESTRUCTOR_GUARD ( + + if ( cursorId && _ownCursor ) { + BufBuilder b; + b.appendNum( (int)0 ); // reserved + b.appendNum( (int)1 ); // number + b.appendNum( cursorId ); + + Message m; + m.setData( dbKillCursors , b.buf() , b.len() ); + + if ( connector ){ + connector->sayPiggyBack( m ); + } + else { + assert( _scopedHost.size() ); + ScopedDbConnection conn( _scopedHost ); + conn->sayPiggyBack( m ); + conn.done(); + } + } + + ); + } + + +} // namespace mongo diff --git a/client/dbclientcursor.h b/client/dbclientcursor.h new file mode 100644 index 0000000..51cdc13 --- /dev/null +++ b/client/dbclientcursor.h @@ -0,0 +1,204 @@ +// file dbclientcursor.h + +/* Copyright 2009 10gen Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "../pch.h" +#include "../util/message.h" +#include "../db/jsobj.h" +#include "../db/json.h" +#include + +namespace mongo { + + class AScopedConnection; + + /** Queries return a cursor object */ + class DBClientCursor : boost::noncopyable { + public: + /** If true, safe to call next(). Requests more from server if necessary. */ + bool more(); + + /** If true, there is more in our local buffers to be fetched via next(). Returns + false when a getMore request back to server would be required. You can use this + if you want to exhaust whatever data has been fetched to the client already but + then perhaps stop. + */ + int objsLeftInBatch() const { _assertIfNull(); return _putBack.size() + nReturned - pos; } + bool moreInCurrentBatch() { return objsLeftInBatch() > 0; } + + /** next + @return next object in the result cursor. 
+ on an error at the remote server, you will get back: + { $err: } + if you do not want to handle that yourself, call nextSafe(). + */ + BSONObj next(); + + /** + restore an object previously returned by next() to the cursor + */ + void putBack( const BSONObj &o ) { _putBack.push( o.getOwned() ); } + + /** throws AssertionException if get back { $err : ... } */ + BSONObj nextSafe() { + BSONObj o = next(); + BSONElement e = o.firstElement(); + if( strcmp(e.fieldName(), "$err") == 0 ) { + if( logLevel >= 5 ) + log() << "nextSafe() error " << o.toString() << endl; + uassert(13106, "nextSafe(): " + o.toString(), false); + } + return o; + } + + /** peek ahead at items buffered for future next() calls. + never requests new data from the server. so peek only effective + with what is already buffered. + WARNING: no support for _putBack yet! + */ + void peek(vector&, int atMost); + + /** + iterate the rest of the cursor and return the number if items + */ + int itcount(){ + int c = 0; + while ( more() ){ + next(); + c++; + } + return c; + } + + /** cursor no longer valid -- use with tailable cursors. + note you should only rely on this once more() returns false; + 'dead' may be preset yet some data still queued and locally + available from the dbclientcursor. + */ + bool isDead() const { + return !this || cursorId == 0; + } + + bool tailable() const { + return (opts & QueryOption_CursorTailable) != 0; + } + + /** see ResultFlagType (constants.h) for flag values + mostly these flags are for internal purposes - + ResultFlag_ErrSet is the possible exception to that + */ + bool hasResultFlag( int flag ){ + _assertIfNull(); + return (resultFlags & flag) != 0; + } + + DBClientCursor( DBConnector *_connector, const string &_ns, BSONObj _query, int _nToReturn, + int _nToSkip, const BSONObj *_fieldsToReturn, int queryOptions , int bs ) : + connector(_connector), + ns(_ns), + query(_query), + nToReturn(_nToReturn), + haveLimit( _nToReturn > 0 && !(queryOptions & QueryOption_CursorTailable)), + nToSkip(_nToSkip), + fieldsToReturn(_fieldsToReturn), + opts(queryOptions), + batchSize(bs==1?2:bs), + m(new Message()), + cursorId(), + nReturned(), + pos(), + data(), + _ownCursor( true ){ + } + + DBClientCursor( DBConnector *_connector, const string &_ns, long long _cursorId, int _nToReturn, int options ) : + connector(_connector), + ns(_ns), + nToReturn( _nToReturn ), + haveLimit( _nToReturn > 0 && !(options & QueryOption_CursorTailable)), + opts( options ), + m(new Message()), + cursorId( _cursorId ), + nReturned(), + pos(), + data(), + _ownCursor( true ){ + } + + virtual ~DBClientCursor(); + + long long getCursorId() const { return cursorId; } + + /** by default we "own" the cursor and will send the server a KillCursor + message when ~DBClientCursor() is called. This function overrides that. 
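A minimal read-loop sketch (not part of the patch) showing the usual more()/nextSafe() pattern on the cursor class defined here; the connection setup and namespace are invented:

    #include <iostream>
    #include "client/dbclient.h"
    using namespace mongo;

    void printSorted( DBClientConnection& c ) {
        auto_ptr<DBClientCursor> cur = c.query( "test.people" , Query().sort( "name" ) );
        while ( cur->more() ) {
            // nextSafe() throws on a { $err : ... } reply instead of returning it as data
            BSONObj o = cur->nextSafe();
            cout << o.toString() << endl;
        }
        // the destructor sends killCursors for us unless decouple() was called
    }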
+ */ + void decouple() { _ownCursor = false; } + + void attach( AScopedConnection * conn ); + + private: + friend class DBClientBase; + friend class DBClientConnection; + bool init(); + int nextBatchSize(); + DBConnector *connector; + string ns; + BSONObj query; + int nToReturn; + bool haveLimit; + int nToSkip; + const BSONObj *fieldsToReturn; + int opts; + int batchSize; + auto_ptr m; + stack< BSONObj > _putBack; + int resultFlags; + long long cursorId; + int nReturned; + int pos; + const char *data; + void dataReceived(); + void requestMore(); + void exhaustReceiveMore(); // for exhaust + bool _ownCursor; // see decouple() + string _scopedHost; + + // Don't call from a virtual function + void _assertIfNull() const { uassert(13348, "connection died", this); } + }; + + /** iterate over objects in current batch only - will not cause a network call + */ + class DBClientCursorBatchIterator { + public: + DBClientCursorBatchIterator( DBClientCursor &c ) : _c( c ), _n() {} + bool moreInCurrentBatch() { return _c.moreInCurrentBatch(); } + BSONObj nextSafe() { + massert( 13383, "BatchIterator empty", moreInCurrentBatch() ); + ++_n; + return _c.nextSafe(); + } + int n() const { return _n; } + private: + DBClientCursor &_c; + int _n; + }; + +} // namespace mongo + +#include "undef_macros.h" diff --git a/client/distlock.cpp b/client/distlock.cpp new file mode 100644 index 0000000..c264597 --- /dev/null +++ b/client/distlock.cpp @@ -0,0 +1,225 @@ +// @file distlock.h + +/* Copyright 2009 10gen Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "pch.h" +#include "dbclient.h" +#include "distlock.h" + +namespace mongo { + + string lockPingNS = "config.lockpings"; + + ThreadLocalValue distLockIds(""); + + string getDistLockProcess(){ + static string s; + if ( s.empty() ){ + stringstream ss; + ss << getHostNameCached() << ":" << time(0) << ":" << rand(); + s = ss.str(); + } + return s; + } + + string getDistLockId(){ + string s = distLockIds.get(); + if ( s.empty() ){ + stringstream ss; + ss << getDistLockProcess() << ":" << getThreadName() << ":" << rand(); + s = ss.str(); + distLockIds.set( s ); + } + return s; + } + + void distLockPingThread( ConnectionString addr ){ + static int loops = 0; + while( ! 
inShutdown() ){ + try { + ScopedDbConnection conn( addr ); + + // do ping + conn->update( lockPingNS , + BSON( "_id" << getDistLockProcess() ) , + BSON( "$set" << BSON( "ping" << DATENOW ) ) , + true ); + + + // remove really old entries + BSONObjBuilder f; + f.appendDate( "$lt" , jsTime() - ( 4 * 86400 * 1000 ) ); + BSONObj r = BSON( "ping" << f.obj() ); + conn->remove( lockPingNS , r ); + + // create index so remove is fast even with a lot of servers + if ( loops++ == 0 ){ + conn->ensureIndex( lockPingNS , BSON( "ping" << 1 ) ); + } + + conn.done(); + } + catch ( std::exception& e ){ + log( LL_WARNING ) << "couldn't ping: " << e.what() << endl; + } + sleepsecs(30); + } + } + + + class DistributedLockPinger { + public: + DistributedLockPinger() + : _mutex( "DistributedLockPinger" ){ + } + + void got( const ConnectionString& conn ){ + string s = conn.toString(); + scoped_lock lk( _mutex ); + if ( _seen.count( s ) > 0 ) + return; + boost::thread t( boost::bind( &distLockPingThread , conn ) ); + _seen.insert( s ); + } + + set _seen; + mongo::mutex _mutex; + + } distLockPinger; + + DistributedLock::DistributedLock( const ConnectionString& conn , const string& name , unsigned takeoverMinutes ) + : _conn(conn),_name(name),_takeoverMinutes(takeoverMinutes){ + _id = BSON( "_id" << name ); + _ns = "config.locks"; + distLockPinger.got( conn ); + } + + + bool DistributedLock::lock_try( string why , BSONObj * other ){ + // check for recrusive + assert( getState() == 0 ); + + ScopedDbConnection conn( _conn ); + + BSONObjBuilder queryBuilder; + queryBuilder.appendElements( _id ); + queryBuilder.append( "state" , 0 ); + + { // make sure its there so we can use simple update logic below + BSONObj o = conn->findOne( _ns , _id ); + if ( o.isEmpty() ){ + try { + conn->insert( _ns , BSON( "_id" << _name << "state" << 0 << "who" << "" ) ); + } + catch ( UserException& ){ + } + } + else if ( o["state"].numberInt() > 0 ){ + BSONObj lastPing = conn->findOne( lockPingNS , o["process"].wrap( "_id" ) ); + if ( lastPing.isEmpty() ){ + // TODO: maybe this should clear, not sure yet + log() << "lastPing is empty! 
this could be bad: " << o << endl; + conn.done(); + return false; + } + + unsigned long long elapsed = jsTime() - lastPing["ping"].Date(); // in ms + elapsed = elapsed / ( 1000 * 60 ); // convert to minutes + + if ( elapsed <= _takeoverMinutes ){ + log(1) << "dist_lock lock failed because taken by: " << o << endl; + conn.done(); + return false; + } + + log() << "dist_lock forcefully taking over from: " << o << " elapsed minutes: " << elapsed << endl; + conn->update( _ns , _id , BSON( "$set" << BSON( "state" << 0 ) ) ); + } + else if ( o["ts"].type() ){ + queryBuilder.append( o["ts"] ); + } + } + + OID ts; + ts.init(); + + bool gotLock = false; + BSONObj now; + + BSONObj whatIWant = BSON( "$set" << BSON( "state" << 1 << + "who" << getDistLockId() << "process" << getDistLockProcess() << + "when" << DATENOW << "why" << why << "ts" << ts ) ); + try { + conn->update( _ns , queryBuilder.obj() , whatIWant ); + + BSONObj o = conn->getLastErrorDetailed(); + now = conn->findOne( _ns , _id ); + + if ( o["n"].numberInt() == 0 ){ + if ( other ) + *other = now; + gotLock = false; + } + else { + gotLock = true; + } + + } + catch ( UpdateNotTheSame& up ){ + // this means our update got through on some, but not others + + for ( unsigned i=0; ifindOne( _ns , _id ); + + if ( now.isEmpty() || now["ts"] < temp2["ts"] ){ + now = temp2.getOwned(); + } + + temp.done(); + } + + if ( now["ts"].OID() == ts ){ + gotLock = true; + conn->update( _ns , _id , whatIWant ); + } + else { + gotLock = false; + } + } + + conn.done(); + + log(1) << "dist_lock lock gotLock: " << gotLock << " now: " << now << endl; + + if ( ! gotLock ) + return false; + + _state.set( 1 ); + return true; + } + + void DistributedLock::unlock(){ + ScopedDbConnection conn( _conn ); + conn->update( _ns , _id, BSON( "$set" << BSON( "state" << 0 ) ) ); + log(1) << "dist_lock unlock: " << conn->findOne( _ns , _id ) << endl; + conn.done(); + + _state.set( 0 ); + } + + +} diff --git a/client/distlock.h b/client/distlock.h new file mode 100644 index 0000000..3a03390 --- /dev/null +++ b/client/distlock.h @@ -0,0 +1,91 @@ +// distlock.h + +/* Copyright 2009 10gen Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +/** + * distributed locking mechanism + */ + +#include "../pch.h" +#include "dbclient.h" +#include "connpool.h" +#include "redef_macros.h" +#include "syncclusterconnection.h" + +namespace mongo { + + class DistributedLock { + public: + + /** + * @param takeoverMinutes how long before we steal lock in minutes + */ + DistributedLock( const ConnectionString& conn , const string& name , unsigned takeoverMinutes = 10 ); + + int getState(){ + return _state.get(); + } + + bool isLocked(){ + return _state.get() != 0; + } + + bool lock_try( string why , BSONObj * other = 0 ); + void unlock(); + + private: + ConnectionString _conn; + string _name; + unsigned _takeoverMinutes; + + string _ns; + BSONObj _id; + + ThreadLocalValue _state; + }; + + class dist_lock_try { + public: + + dist_lock_try( DistributedLock * lock , string why ) + : _lock(lock){ + _got = _lock->lock_try( why , &_other ); + } + + ~dist_lock_try(){ + if ( _got ){ + _lock->unlock(); + } + } + + bool got() const { + return _got; + } + + BSONObj other() const { + return _other; + } + + private: + DistributedLock * _lock; + bool _got; + BSONObj _other; + + }; + +} + diff --git a/client/distlock_test.cpp b/client/distlock_test.cpp new file mode 100644 index 0000000..0879b6e --- /dev/null +++ b/client/distlock_test.cpp @@ -0,0 +1,80 @@ +// distlock_test.h + +/* Copyright 2009 10gen Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
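A hedged sketch (not part of the patch) of taking the lock through the RAII dist_lock_try helper declared above; the SYNC-cluster address and the lock name are invented:

    #include "client/distlock.h"
    using namespace mongo;

    void doExclusiveWork() {
        DistributedLock lk( ConnectionString( "cfg1.example.com,cfg2.example.com,cfg3.example.com" ,
                                              ConnectionString::SYNC ) ,
                            "balancer" );

        dist_lock_try attempt( &lk , "rebalancing chunks" );
        if ( !attempt.got() ) {
            // attempt.other() holds the lock document of whoever owns it right now
            return;
        }
        // ... work that must not run concurrently goes here ...
    }   // the lock is released when attempt goes out of scope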
+ */ + +#include "../pch.h" +#include "dbclient.h" +#include "distlock.h" +#include "../db/commands.h" + +namespace mongo { + + class TestDistLockWithSync : public Command { + public: + TestDistLockWithSync() : Command( "_testDistLockWithSyncCluster" ){} + virtual void help( stringstream& help ) const { + help << "should not be calling this directly" << endl; + } + + virtual bool slaveOk() const { return false; } + virtual bool adminOnly() const { return true; } + virtual LockType locktype() const { return NONE; } + + static void runThread(){ + for ( int i=0; i<1000; i++ ){ + if ( current->lock_try( "test" ) ){ + gotit++; + for ( int j=0; j<2000; j++ ){ + count++; + } + current->unlock(); + } + } + } + + bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){ + DistributedLock lk( ConnectionString( cmdObj["host"].String() , ConnectionString::SYNC ), "testdistlockwithsync" ); + current = &lk; + count = 0; + gotit = 0; + + vector > l; + for ( int i=0; i<4; i++ ){ + l.push_back( shared_ptr( new boost::thread( runThread ) ) ); + } + + for ( unsigned i=0; ijoin(); + + result.append( "count" , count ); + result.append( "gotit" , gotit ); + current = 0; + return count == gotit * 2000; + } + + static DistributedLock * current; + static int count; + static int gotit; + + } testDistLockWithSyncCmd; + + + DistributedLock * TestDistLockWithSync::current; + int TestDistLockWithSync::count; + int TestDistLockWithSync::gotit; + + +} diff --git a/client/examples/clientTest.cpp b/client/examples/clientTest.cpp index bbb82f6..83a556a 100644 --- a/client/examples/clientTest.cpp +++ b/client/examples/clientTest.cpp @@ -137,10 +137,14 @@ int main( int argc, const char **argv ) { assert( conn.getLastError() == "" ); // nonexistent index test - assert( conn.findOne(ns, Query("{name:\"eliot\"}").hint("{foo:1}")).hasElement("$err") ); - assert( conn.getLastError() == "bad hint" ); - conn.resetError(); - assert( conn.getLastError() == "" ); + bool asserted = false; + try { + conn.findOne(ns, Query("{name:\"eliot\"}").hint("{foo:1}")); + } + catch ( ... ){ + asserted = true; + } + assert( asserted ); //existing index assert( conn.findOne(ns, Query("{name:'eliot'}").hint("{name:1}")).hasElement("name") ); @@ -176,8 +180,9 @@ int main( int argc, const char **argv ) { } BSONObj found = conn.findOne( tsns , mongo::BSONObj() ); + cout << "old: " << out << "\nnew: " << found << endl; assert( ( oldTime < found["ts"].timestampTime() ) || - ( oldInc + 1 == found["ts"].timestampInc() ) ); + ( oldTime == found["ts"].timestampTime() && oldInc < found["ts"].timestampInc() ) ); } @@ -185,9 +190,9 @@ int main( int argc, const char **argv ) { assert( conn.getLastError().empty() ); BufBuilder b; - b.append( (int)0 ); // reserved - b.append( (int)-1 ); // invalid # of cursors triggers exception - b.append( (int)-1 ); // bogus cursor id + b.appendNum( (int)0 ); // reserved + b.appendNum( (int)-1 ); // invalid # of cursors triggers exception + b.appendNum( (int)-1 ); // bogus cursor id Message m; m.setData( dbKillCursors, b.buf(), b.len() ); diff --git a/client/examples/tail.cpp b/client/examples/tail.cpp index e844b32..3738b4f 100644 --- a/client/examples/tail.cpp +++ b/client/examples/tail.cpp @@ -22,34 +22,25 @@ using namespace mongo; -void foo() { } +void tail(DBClientBase& conn, const char *ns) { + BSONElement lastId = minKey.firstElement(); + Query query = Query(); -/* "tail" the specified namespace, outputting elements as they are added. 
- _id values must be inserted in increasing order for this to work. (Some other - field could also be used.) + auto_ptr c = + conn.query(ns, query, 0, 0, 0, QueryOption_CursorTailable); - Note: one could use a capped collection and $natural order to do something - similar, using sort({$natural:1}), and then not need to worry about - _id's being in order. -*/ -void tail(DBClientBase& conn, const char *ns) { - conn.ensureIndex(ns, fromjson("{_id:1}")); - BSONElement lastId; - Query query = Query().sort("_id"); while( 1 ) { - auto_ptr c = conn.query(ns, query, 0, 0, 0, Option_CursorTailable); - while( 1 ) { - if( !c->more() ) { - if( c->isDead() ) { - // we need to requery - break; - } - sleepsecs(1); + if( !c->more() ) { + if( c->isDead() ) { + break; // we need to requery + } + + // all data (so far) exhausted, wait for more + sleepsecs(1); + continue; } BSONObj o = c->next(); lastId = o["_id"]; cout << o.toString() << endl; - } - query = QUERY( "_id" << GT << lastId ).sort("_id"); } } diff --git a/client/gridfs.cpp b/client/gridfs.cpp index 892ec6e..b2ae478 100644 --- a/client/gridfs.cpp +++ b/client/gridfs.cpp @@ -15,7 +15,7 @@ * limitations under the License. */ -#include "../stdafx.h" +#include "pch.h" #include #include @@ -34,15 +34,15 @@ namespace mongo { const unsigned DEFAULT_CHUNK_SIZE = 256 * 1024; - Chunk::Chunk( BSONObj o ){ + GridFSChunk::GridFSChunk( BSONObj o ){ _data = o; } - Chunk::Chunk( BSONObj fileObject , int chunkNumber , const char * data , int len ){ + GridFSChunk::GridFSChunk( BSONObj fileObject , int chunkNumber , const char * data , int len ){ BSONObjBuilder b; b.appendAs( fileObject["_id"] , "files_id" ); b.append( "n" , chunkNumber ); - b.appendBinDataArray( "data" , data , len ); + b.appendBinData( "data" , len, BinDataGeneral, data ); _data = b.obj(); } @@ -50,7 +50,7 @@ namespace mongo { GridFS::GridFS( DBClientBase& client , const string& dbName , const string& prefix ) : _client( client ) , _dbName( dbName ) , _prefix( prefix ){ _filesNS = dbName + "." + prefix + ".files"; _chunksNS = dbName + "." 
+ prefix + ".chunks"; - + _chunkSize = DEFAULT_CHUNK_SIZE; client.ensureIndex( _filesNS , BSON( "filename" << 1 ) ); client.ensureIndex( _chunksNS , BSON( "files_id" << 1 << "n" << 1 ) ); @@ -60,6 +60,11 @@ namespace mongo { } + void GridFS::setChunkSize(unsigned int size) { + massert( 13296 , "invalid chunk size is specified", (size == 0)); + _chunkSize = size; + } + BSONObj GridFS::storeFile( const char* data , size_t length , const string& remoteName , const string& contentType){ massert( 10279 , "large files not yet implemented", length <= 0xffffffff); char const * const end = data + length; @@ -70,8 +75,8 @@ namespace mongo { int chunkNumber = 0; while (data < end){ - int chunkLen = MIN(DEFAULT_CHUNK_SIZE, (unsigned)(end-data)); - Chunk c(idObj, chunkNumber, data, chunkLen); + int chunkLen = MIN(_chunkSize, (unsigned)(end-data)); + GridFSChunk c(idObj, chunkNumber, data, chunkLen); _client.insert( _chunksNS.c_str() , c._data ); chunkNumber++; @@ -99,22 +104,24 @@ namespace mongo { int chunkNumber = 0; gridfs_offset length = 0; while (!feof(fd)){ - boost::scoped_arraybuf (new char[DEFAULT_CHUNK_SIZE]); - char* bufPos = buf.get(); + //boost::scoped_arraybuf (new char[_chunkSize+1]); + char * buf = new char[_chunkSize+1]; + char* bufPos = buf;//.get(); unsigned int chunkLen = 0; // how much in the chunk now - while(chunkLen != DEFAULT_CHUNK_SIZE && !feof(fd)){ - int readLen = fread(bufPos, 1, DEFAULT_CHUNK_SIZE - chunkLen, fd); + while(chunkLen != _chunkSize && !feof(fd)){ + int readLen = fread(bufPos, 1, _chunkSize - chunkLen, fd); chunkLen += readLen; bufPos += readLen; - assert(chunkLen <= DEFAULT_CHUNK_SIZE); + assert(chunkLen <= _chunkSize); } - Chunk c(idObj, chunkNumber, buf.get(), chunkLen); + GridFSChunk c(idObj, chunkNumber, buf, chunkLen); _client.insert( _chunksNS.c_str() , c._data ); length += chunkLen; chunkNumber++; + delete[] buf; } if (fd != stdin) @@ -125,7 +132,7 @@ namespace mongo { return insertFile((remoteName.empty() ? fileName : remoteName), id, length, contentType); } - BSONObj GridFS::insertFile(const string& name, const OID& id, unsigned length, const string& contentType){ + BSONObj GridFS::insertFile(const string& name, const OID& id, gridfs_offset length, const string& contentType){ BSONObj res; if ( ! _client.runCommand( _dbName.c_str() , BSON( "filemd5" << id << "root" << _prefix ) , res ) ) @@ -134,12 +141,17 @@ namespace mongo { BSONObjBuilder file; file << "_id" << id << "filename" << name - << "length" << (unsigned) length - << "chunkSize" << DEFAULT_CHUNK_SIZE + << "chunkSize" << _chunkSize << "uploadDate" << DATENOW << "md5" << res["md5"] ; + if (length < 1024*1024*1024){ // 2^30 + file << "length" << (int) length; + }else{ + file << "length" << (long long) length; + } + if (!contentType.empty()) file << "contentType" << contentType; @@ -190,7 +202,7 @@ namespace mongo { return meta_element.embeddedObject(); } - Chunk GridFile::getChunk( int n ){ + GridFSChunk GridFile::getChunk( int n ){ _exists(); BSONObjBuilder b; b.appendAs( _obj["_id"] , "files_id" ); @@ -198,7 +210,7 @@ namespace mongo { BSONObj o = _grid->_client.findOne( _grid->_chunksNS.c_str() , b.obj() ); uassert( 10014 , "chunk is empty!" , ! 
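A minimal sketch (not part of the patch) of storing a buffer with GridFS and streaming it back; the database, file name and payload are invented, and findFile()/exists() are assumed from the unchanged part of gridfs.h:

    #include <iostream>
    #include "client/dbclient.h"
    #include "client/gridfs.h"
    using namespace mongo;

    void gridfsRoundTrip( DBClientConnection& c ) {
        GridFS fs( c , "test" , "fs" );
        string payload = "hello gridfs";

        BSONObj fileObj = fs.storeFile( payload.c_str() , payload.size() ,
                                        "greeting.txt" , "text/plain" );
        cout << fileObj["length"].numberLong() << " bytes stored" << endl;

        GridFile gf = fs.findFile( "greeting.txt" );
        if ( gf.exists() )
            gf.write( cout );           // streams the chunks back out in order
    }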
o.isEmpty() ); - return Chunk(o); + return GridFSChunk(o); } gridfs_offset GridFile::write( ostream & out ){ @@ -207,7 +219,7 @@ namespace mongo { const int num = getNumChunks(); for ( int i=0; iupdate( getNS() , q , o ); + conn->update( getNS() , q , o , true ); } @@ -94,4 +123,16 @@ namespace mongo { throw UserException( 9003 , (string)"error on Model::save: " + errmsg ); } + BSONObj Model::toObject(){ + BSONObjBuilder b; + serialize( b ); + return b.obj(); + } + + void Model::append( const char * name , BSONObjBuilder& b ){ + BSONObjBuilder bb( b.subobjStart( name ) ); + serialize( bb ); + bb.done(); + } + } // namespace mongo diff --git a/client/model.h b/client/model.h index f3a63ad..108efc0 100644 --- a/client/model.h +++ b/client/model.h @@ -18,6 +18,7 @@ #pragma once #include "dbclient.h" +#include "redef_macros.h" namespace mongo { @@ -40,7 +41,9 @@ namespace mongo { virtual const char * getNS() = 0; virtual void serialize(BSONObjBuilder& to) = 0; virtual void unserialize(const BSONObj& from) = 0; - + virtual BSONObj toObject(); + virtual void append( const char * name , BSONObjBuilder& b ); + virtual string modelServer() = 0; /** Load a single object. @@ -55,3 +58,5 @@ namespace mongo { }; } // namespace mongo + +#include "undef_macros.h" diff --git a/client/parallel.cpp b/client/parallel.cpp index bd29013..eeadb89 100644 --- a/client/parallel.cpp +++ b/client/parallel.cpp @@ -16,12 +16,13 @@ */ -#include "stdafx.h" +#include "pch.h" #include "parallel.h" #include "connpool.h" #include "../db/queryutil.h" #include "../db/dbmessage.h" #include "../s/util.h" +#include "../s/shard.h" namespace mongo { @@ -31,8 +32,13 @@ namespace mongo { _ns = q.ns; _query = q.query.copy(); _options = q.queryOptions; - _fields = q.fields; + _fields = q.fields.copy(); + _batchSize = q.ntoreturn; + if ( _batchSize == 1 ) + _batchSize = 2; + _done = false; + _didInit = false; } ClusteredCursor::ClusteredCursor( const string& ns , const BSONObj& q , int options , const BSONObj& fields ){ @@ -40,37 +46,84 @@ namespace mongo { _query = q.getOwned(); _options = options; _fields = fields.getOwned(); + _batchSize = 0; + _done = false; + _didInit = false; } ClusteredCursor::~ClusteredCursor(){ _done = true; // just in case } + + void ClusteredCursor::init(){ + if ( _didInit ) + return; + _didInit = true; + _init(); + } - auto_ptr ClusteredCursor::query( const string& server , int num , BSONObj extra ){ + auto_ptr ClusteredCursor::query( const string& server , int num , BSONObj extra , int skipLeft ){ uassert( 10017 , "cursor already done" , ! _done ); + assert( _didInit ); BSONObj q = _query; if ( ! extra.isEmpty() ){ q = concatQuery( q , extra ); } - ScopedDbConnection conn( server ); - checkShardVersion( conn.conn() , _ns ); + ShardConnection conn( server , _ns ); + + if ( conn.setVersion() ){ + conn.done(); + throw StaleConfigException( _ns , "ClusteredCursor::query ShardConnection had to change" , true ); + } + + if ( logLevel >= 5 ){ + log(5) << "ClusteredCursor::query (" << type() << ") server:" << server + << " ns:" << _ns << " query:" << q << " num:" << num + << " _fields:" << _fields << " options: " << _options << endl; + } + + auto_ptr cursor = + conn->query( _ns , q , num , 0 , ( _fields.isEmpty() ? 0 : &_fields ) , _options , _batchSize == 0 ? 
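A hedged sketch (not part of the patch) of a Model subclass using the new toObject()/append() helpers; the class, collection and fields are invented:

    #include "client/model.h"
    using namespace mongo;

    class Person : public Model {
    public:
        string name;

        virtual const char * getNS()                    { return "test.people"; }
        virtual string modelServer()                    { return "localhost:27017"; }
        virtual void serialize( BSONObjBuilder& to )    { to.append( "name" , name ); }
        virtual void unserialize( const BSONObj& from ) { name = from["name"].String(); }
    };

    void embedPerson( BSONObjBuilder& b ) {
        Person p;
        p.name = "eliot";
        BSONObj flat = p.toObject();    // { name: "eliot" }
        p.append( "owner" , b );        // adds owner: { name: "eliot" } as a subobject
    }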
0 : _batchSize + skipLeft ); - log(5) << "ClusteredCursor::query server:" << server << " ns:" << _ns << " query:" << q << " num:" << num << " _fields:" << _fields << " options: " << _options << endl; - auto_ptr cursor = conn->query( _ns.c_str() , q , num , 0 , ( _fields.isEmpty() ? 0 : &_fields ) , _options ); - if ( cursor->hasResultFlag( QueryResult::ResultFlag_ShardConfigStale ) ) + assert( cursor.get() ); + + if ( cursor->hasResultFlag( ResultFlag_ShardConfigStale ) ){ + conn.done(); throw StaleConfigException( _ns , "ClusteredCursor::query" ); + } + + if ( cursor->hasResultFlag( ResultFlag_ErrSet ) ){ + conn.done(); + BSONObj o = cursor->next(); + throw UserException( o["code"].numberInt() , o["$err"].String() ); + } + + + cursor->attach( &conn ); conn.done(); return cursor; } + BSONObj ClusteredCursor::explain( const string& server , BSONObj extra ){ + BSONObj q = _query; + if ( ! extra.isEmpty() ){ + q = concatQuery( q , extra ); + } + + ShardConnection conn( server , _ns ); + BSONObj o = conn->findOne( _ns , Query( q ).explain() ); + conn.done(); + return o; + } + BSONObj ClusteredCursor::concatQuery( const BSONObj& query , const BSONObj& extraFilter ){ if ( ! query.hasField( "query" ) ) return _concatFilter( query , extraFilter ); - + BSONObjBuilder b; BSONObjIterator i( query ); while ( i.more() ){ @@ -94,6 +147,112 @@ namespace mongo { // TODO: should do some simplification here if possibl ideally } + BSONObj ClusteredCursor::explain(){ + BSONObjBuilder b; + b.append( "clusteredType" , type() ); + + long long nscanned = 0; + long long nscannedObjects = 0; + long long n = 0; + long long millis = 0; + double numExplains = 0; + + map > out; + { + _explain( out ); + + BSONObjBuilder x( b.subobjStart( "shards" ) ); + for ( map >::iterator i=out.begin(); i!=out.end(); ++i ){ + string shard = i->first; + list l = i->second; + BSONArrayBuilder y( x.subarrayStart( shard.c_str() ) ); + for ( list::iterator j=l.begin(); j!=l.end(); ++j ){ + BSONObj temp = *j; + y.append( temp ); + + nscanned += temp["nscanned"].numberLong(); + nscannedObjects += temp["nscannedObjects"].numberLong(); + n += temp["n"].numberLong(); + millis += temp["millis"].numberLong(); + numExplains++; + } + y.done(); + } + x.done(); + } + + b.appendNumber( "nscanned" , nscanned ); + b.appendNumber( "nscannedObjects" , nscannedObjects ); + b.appendNumber( "n" , n ); + b.appendNumber( "millisTotal" , millis ); + b.append( "millisAvg" , (int)((double)millis / numExplains ) ); + b.append( "numQueries" , (int)numExplains ); + b.append( "numShards" , (int)out.size() ); + + return b.obj(); + } + + // -------- FilteringClientCursor ----------- + FilteringClientCursor::FilteringClientCursor( const BSONObj filter ) + : _matcher( filter ) , _done( true ){ + } + + FilteringClientCursor::FilteringClientCursor( auto_ptr cursor , const BSONObj filter ) + : _matcher( filter ) , _cursor( cursor ) , _done( cursor.get() == 0 ){ + } + + FilteringClientCursor::~FilteringClientCursor(){ + } + + void FilteringClientCursor::reset( auto_ptr cursor ){ + _cursor = cursor; + _next = BSONObj(); + _done = _cursor.get() == 0; + } + + bool FilteringClientCursor::more(){ + if ( ! _next.isEmpty() ) + return true; + + if ( _done ) + return false; + + _advance(); + return ! _next.isEmpty(); + } + + BSONObj FilteringClientCursor::next(){ + assert( ! _next.isEmpty() ); + assert( ! 
_done ); + + BSONObj ret = _next; + _next = BSONObj(); + _advance(); + return ret; + } + + BSONObj FilteringClientCursor::peek(){ + if ( _next.isEmpty() ) + _advance(); + return _next; + } + + void FilteringClientCursor::_advance(){ + assert( _next.isEmpty() ); + if ( ! _cursor.get() || _done ) + return; + + while ( _cursor->more() ){ + _next = _cursor->next(); + if ( _matcher.matches( _next ) ){ + if ( ! _cursor->moreInCurrentBatch() ) + _next = _next.getOwned(); + return; + } + _next = BSONObj(); + } + _done = true; + } // -------- SerialServerClusteredCursor ----------- @@ -107,10 +266,21 @@ namespace mongo { sort( _servers.rbegin() , _servers.rend() ); _serverIndex = 0; + + _needToSkip = q.ntoskip; } bool SerialServerClusteredCursor::more(){ - if ( _current.get() && _current->more() ) + + // TODO: optimize this by sending on first query and then back counting + // tricky in case where 1st server doesn't have any after + // need it to send n skipped + while ( _needToSkip > 0 && _current.more() ){ + _current.next(); + _needToSkip--; + } + + if ( _current.more() ) return true; if ( _serverIndex >= _servers.size() ){ @@ -119,17 +289,21 @@ namespace mongo { ServerAndQuery& sq = _servers[_serverIndex++]; - _current = query( sq._server , 0 , sq._extra ); - if ( _current->more() ) - return true; - - // this sq has nothing, so keep looking + _current.reset( query( sq._server , 0 , sq._extra ) ); return more(); } BSONObj SerialServerClusteredCursor::next(){ uassert( 10018 , "no more items" , more() ); - return _current->next(); + return _current.next(); + } + + void SerialServerClusteredCursor::_explain( map< string,list >& out ){ + for ( unsigned i=0; i<_servers.size(); i++ ){ + ServerAndQuery& sq = _servers[i]; + list & l = out[sq._server]; + l.push_back( explain( sq._server , sq._extra ) ); + } } // -------- ParallelSortClusteredCursor ----------- @@ -138,7 +312,8 @@ namespace mongo { const BSONObj& sortKey ) : ClusteredCursor( q ) , _servers( servers ){ _sortKey = sortKey.getOwned(); - _init(); + _needToSkip = q.ntoskip; + _finishCons(); } ParallelSortClusteredCursor::ParallelSortClusteredCursor( const set& servers , const string& ns , @@ -146,85 +321,123 @@ namespace mongo { int options , const BSONObj& fields ) : ClusteredCursor( ns , q.obj , options , fields ) , _servers( servers ){ _sortKey = q.getSort().copy(); - _init(); + _needToSkip = 0; + _finishCons(); } - void ParallelSortClusteredCursor::_init(){ + void ParallelSortClusteredCursor::_finishCons(){ _numServers = _servers.size(); - _cursors = new auto_ptr[_numServers]; - _nexts = new BSONObj[_numServers]; + _cursors = 0; + + if ( ! _sortKey.isEmpty() && ! _fields.isEmpty() ){ + // we need to make sure the sort key is in the project + bool isNegative = false; + BSONObjBuilder b; + { + BSONObjIterator i( _fields ); + while ( i.more() ){ + BSONElement e = i.next(); + b.append( e ); + if ( ! e.trueValue() ) + isNegative = true; + } + } + + { + BSONObjIterator i( _sortKey ); + while ( i.more() ){ + BSONElement e = i.next(); + BSONElement f = _fields.getField( e.fieldName() ); + if ( isNegative ){ + uassert( 13431 , "have to have sort key in projection and removing it" , f.eoo() ); + } + else if ( f.eoo() ){ + // add to projection + b.append( e ); + } + } + } + + _fields = b.obj(); + } + } + + void ParallelSortClusteredCursor::_init(){ + assert( ! 
_cursors ); + _cursors = new FilteringClientCursor[_numServers]; // TODO: parellize int num = 0; - for ( set::iterator i = _servers.begin(); i!=_servers.end(); i++ ){ + for ( set::iterator i = _servers.begin(); i!=_servers.end(); ++i ){ const ServerAndQuery& sq = *i; - _cursors[num++] = query( sq._server , 0 , sq._extra ); + _cursors[num++].reset( query( sq._server , 0 , sq._extra , _needToSkip ) ); } } ParallelSortClusteredCursor::~ParallelSortClusteredCursor(){ delete [] _cursors; - delete [] _nexts; + _cursors = 0; } bool ParallelSortClusteredCursor::more(){ - for ( int i=0; i<_numServers; i++ ){ - if ( ! _nexts[i].isEmpty() ) - return true; - if ( _cursors[i].get() && _cursors[i]->more() ) + if ( _needToSkip > 0 ){ + int n = _needToSkip; + _needToSkip = 0; + + while ( n > 0 && more() ){ + BSONObj x = next(); + n--; + } + + _needToSkip = n; + } + + for ( int i=0; i<_numServers; i++ ){ + if ( _cursors[i].more() ) return true; } return false; } BSONObj ParallelSortClusteredCursor::next(){ - advance(); - BSONObj best = BSONObj(); int bestFrom = -1; for ( int i=0; i<_numServers; i++){ - if ( _nexts[i].isEmpty() ) + if ( ! _cursors[i].more() ) continue; + + BSONObj me = _cursors[i].peek(); if ( best.isEmpty() ){ - best = _nexts[i]; + best = me; bestFrom = i; continue; } - int comp = best.woSortOrder( _nexts[i] , _sortKey ); + int comp = best.woSortOrder( me , _sortKey , true ); if ( comp < 0 ) continue; - best = _nexts[i]; + best = me; bestFrom = i; } - + uassert( 10019 , "no more elements" , ! best.isEmpty() ); - _nexts[bestFrom] = BSONObj(); + _cursors[bestFrom].next(); return best; } - void ParallelSortClusteredCursor::advance(){ - for ( int i=0; i<_numServers; i++ ){ - - if ( ! _nexts[i].isEmpty() ){ - // already have a good object there - continue; - } - - if ( ! _cursors[i]->more() ){ - // cursor is dead, oh well - continue; - } - - _nexts[i] = _cursors[i]->next(); + void ParallelSortClusteredCursor::_explain( map< string,list >& out ){ + for ( set::iterator i=_servers.begin(); i!=_servers.end(); ++i ){ + const ServerAndQuery& sq = *i; + list & l = out[sq._server]; + l.push_back( explain( sq._server , sq._extra ) ); } - + } // ----------------- @@ -252,6 +465,7 @@ namespace mongo { ScopedDbConnection conn( res->_server ); res->_ok = conn->runCommand( res->_db , res->_cmd , res->_res ); res->_done = true; + conn.done(); } shared_ptr Future::spawnCommand( const string& server , const string& db , const BSONObj& cmd ){ diff --git a/client/parallel.h b/client/parallel.h index 88864ae..b60190a 100644 --- a/client/parallel.h +++ b/client/parallel.h @@ -16,15 +16,52 @@ */ /** - tools for wokring in parallel/sharded/clustered environment + tools for working in parallel/sharded/clustered environment */ -#include "../stdafx.h" +#include "../pch.h" #include "dbclient.h" +#include "redef_macros.h" #include "../db/dbmessage.h" +#include "../db/matcher.h" namespace mongo { + /** + * holder for a server address and a query to run + */ + class ServerAndQuery { + public: + ServerAndQuery( const string& server , BSONObj extra = BSONObj() , BSONObj orderObject = BSONObj() ) : + _server( server ) , _extra( extra.getOwned() ) , _orderObject( orderObject.getOwned() ){ + } + + bool operator<( const ServerAndQuery& other ) const{ + if ( ! 
_orderObject.isEmpty() ) + return _orderObject.woCompare( other._orderObject ) < 0; + + if ( _server < other._server ) + return true; + if ( other._server > _server ) + return false; + return _extra.woCompare( other._extra ) < 0; + } + + string toString() const { + StringBuilder ss; + ss << "server:" << _server << " _extra:" << _extra.toString() << " _orderObject:" << _orderObject.toString(); + return ss.str(); + } + + operator string() const { + return toString(); + } + + string _server; + BSONObj _extra; + BSONObj _orderObject; + }; + /** * this is a cursor that works over a set of servers * can be used in serial/paralellel as controlled by sub classes @@ -34,7 +71,10 @@ namespace mongo { ClusteredCursor( QueryMessage& q ); ClusteredCursor( const string& ns , const BSONObj& q , int options=0 , const BSONObj& fields=BSONObj() ); virtual ~ClusteredCursor(); - + + /** call before using */ + void init(); + virtual bool more() = 0; virtual BSONObj next() = 0; @@ -42,53 +82,105 @@ namespace mongo { virtual string type() const = 0; + virtual BSONObj explain(); + protected: - auto_ptr query( const string& server , int num = 0 , BSONObj extraFilter = BSONObj() ); + + virtual void _init() = 0; + auto_ptr query( const string& server , int num = 0 , BSONObj extraFilter = BSONObj() , int skipLeft = 0 ); + BSONObj explain( const string& server , BSONObj extraFilter = BSONObj() ); + static BSONObj _concatFilter( const BSONObj& filter , const BSONObj& extraFilter ); + virtual void _explain( map< string,list >& out ) = 0; + string _ns; BSONObj _query; int _options; BSONObj _fields; + int _batchSize; + + bool _didInit; bool _done; }; - /** - * holder for a server address and a query to run - */ - class ServerAndQuery { + class FilteringClientCursor { public: - ServerAndQuery( const string& server , BSONObj extra = BSONObj() , BSONObj orderObject = BSONObj() ) : - _server( server ) , _extra( extra.getOwned() ) , _orderObject( orderObject.getOwned() ){ + FilteringClientCursor( const BSONObj filter = BSONObj() ); + FilteringClientCursor( auto_ptr cursor , const BSONObj filter = BSONObj() ); + ~FilteringClientCursor(); + + void reset( auto_ptr cursor ); + + bool more(); + BSONObj next(); + + BSONObj peek(); + private: + void _advance(); + + Matcher _matcher; + auto_ptr _cursor; + + BSONObj _next; + bool _done; + }; + + + class Servers { + public: + Servers(){ + } + + void add( const ServerAndQuery& s ){ + add( s._server , s._extra ); + } + + void add( const string& server , const BSONObj& filter ){ + vector& mine = _filters[server]; + mine.push_back( filter.getOwned() ); } + + // TOOO: pick a less horrible name + class View { + View( const Servers* s ){ + for ( map >::const_iterator i=s->_filters.begin(); i!=s->_filters.end(); ++i ){ + _servers.push_back( i->first ); + _filters.push_back( i->second ); + } + } + public: + int size() const { + return _servers.size(); + } - bool operator<( const ServerAndQuery& other ) const{ - if ( ! 
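A hedged sketch (not part of the patch) of driving a ParallelSortClusteredCursor through the init()/more()/next() interface declared above; the shard hosts and namespace are invented, the set element type (ServerAndQuery) is inferred from these declarations, and since these cursors are normally driven by mongos internally the point is only to illustrate the call sequence:

    #include <iostream>
    #include <set>
    #include "client/parallel.h"
    using namespace mongo;

    void mergedScan() {
        set<ServerAndQuery> servers;
        servers.insert( ServerAndQuery( "shard1.example.com:27018" ) );
        servers.insert( ServerAndQuery( "shard2.example.com:27018" ) );

        ParallelSortClusteredCursor cur( servers , "test.people" ,
                                         Query().sort( BSON( "name" << 1 ) ) ,
                                         0 , BSONObj() );
        cur.init();                     // required before the first more()/next()
        while ( cur.more() )
            cout << cur.next().toString() << endl;   // documents arrive merged by the sort key
    }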
_orderObject.isEmpty() ) - return _orderObject.woCompare( other._orderObject ) < 0; + string getServer( int n ) const { + return _servers[n]; + } + + vector getFilter( int n ) const { + return _filters[ n ]; + } - if ( _server < other._server ) - return true; - if ( other._server > _server ) - return false; - return _extra.woCompare( other._extra ) < 0; - } + private: + vector _servers; + vector< vector > _filters; - string toString() const { - StringBuilder ss; - ss << "server:" << _server << " _extra:" << _extra << " _orderObject:" << _orderObject; - return ss.str(); - } + friend class Servers; + }; - operator string() const { - return toString(); + View view() const { + return View( this ); } + - string _server; - BSONObj _extra; - BSONObj _orderObject; + private: + map > _filters; + + friend class View; }; @@ -102,11 +194,18 @@ namespace mongo { virtual bool more(); virtual BSONObj next(); virtual string type() const { return "SerialServer"; } - private: + + protected: + virtual void _explain( map< string,list >& out ); + + void _init(){} + vector _servers; unsigned _serverIndex; - auto_ptr _current; + FilteringClientCursor _current; + + int _needToSkip; }; @@ -123,17 +222,18 @@ namespace mongo { virtual bool more(); virtual BSONObj next(); virtual string type() const { return "ParallelSort"; } - private: + protected: + void _finishCons(); void _init(); - - void advance(); + + virtual void _explain( map< string,list >& out ); int _numServers; set _servers; BSONObj _sortKey; - - auto_ptr * _cursors; - BSONObj * _nexts; + + FilteringClientCursor * _cursors; + int _needToSkip; }; /** @@ -193,3 +293,5 @@ namespace mongo { } + +#include "undef_macros.h" diff --git a/client/redef_macros.h b/client/redef_macros.h new file mode 100644 index 0000000..dd2e66f --- /dev/null +++ b/client/redef_macros.h @@ -0,0 +1,55 @@ +/** @file redef_macros.h - redefine macros from undef_macros.h */ + +/* Copyright 2009 10gen Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
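For orientation (not part of the patch), the include sandwich that dbclient.h, model.h and parallel.h adopt in this patch, applied to a hypothetical user-facing header named my_model.h:

    // my_model.h (invented) - keep mongo's un-prefixed macros scoped to this header
    #pragma once

    #include "client/dbclient.h"        // ends by including undef_macros.h
    #include "client/redef_macros.h"    // restore assert/uassert/massert and friends

    namespace myapp {
        class AuditRecord {
            // declarations that rely on the mongo macros go here
        };
    }

    #include "client/undef_macros.h"    // leave the global namespace clean for includers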
+ */
+
+// If you define a new global un-prefixed macro, please add it here and in undef_macros
+
+// #pragma once // this file is intended to be processed multiple times
+
+#if defined(MONGO_MACROS_CLEANED)
+
+// util/allocator.h
+#define malloc MONGO_malloc
+#define realloc MONGO_realloc
+
+// util/assert_util.h
+#define assert MONGO_assert
+#define dassert MONGO_dassert
+#define wassert MONGO_wassert
+#define massert MONGO_massert
+#define uassert MONGO_uassert
+#define BOOST_CHECK_EXCEPTION MONGO_BOOST_CHECK_EXCEPTION
+#define DESTRUCTOR_GUARD MONGO_DESTRUCTOR_GUARD
+
+// util/goodies.h
+#define PRINT MONGO_PRINT
+#define PRINTFL MONGO_PRINTFL
+#define asctime MONGO_asctime
+#define gmtime MONGO_gmtime
+#define localtime MONGO_localtime
+#define ctime MONGO_ctime
+
+// util/debug_util.h
+#define DEV MONGO_DEV
+#define DEBUGGING MONGO_DEBUGGING
+#define SOMETIMES MONGO_SOMETIMES
+#define OCCASIONALLY MONGO_OCCASIONALLY
+#define RARELY MONGO_RARELY
+#define ONCE MONGO_ONCE
+
+#undef MONGO_MACROS_CLEANED
+#endif
+
diff --git a/client/syncclusterconnection.cpp b/client/syncclusterconnection.cpp
index 0a8fc79..5324b6c 100644
--- a/client/syncclusterconnection.cpp
+++ b/client/syncclusterconnection.cpp
@@ -16,15 +16,29 @@
  */
 
-#include "stdafx.h"
+#include "pch.h"
 #include "syncclusterconnection.h"
 #include "../db/dbmessage.h"
 
 // error codes 8000-8009
 
 namespace mongo {
+
+    SyncClusterConnection::SyncClusterConnection( const list<HostAndPort> & L) : _mutex("SynClusterConnection") {
+        {
+            stringstream s;
+            int n=0;
+            for( list<HostAndPort>::const_iterator i = L.begin(); i != L.end(); i++ ) {
+                if( ++n > 1 ) s << ',';
+                s << i->toString();
+            }
+            _address = s.str();
+        }
+        for( list<HostAndPort>::const_iterator i = L.begin(); i != L.end(); i++ )
+            _connect( i->toString() );
+    }
 
-    SyncClusterConnection::SyncClusterConnection( string commaSeperated ){
+    SyncClusterConnection::SyncClusterConnection( string commaSeperated ) : _mutex("SyncClusterConnection") {
         _address = commaSeperated;
         string::size_type idx;
         while ( ( idx = commaSeperated.find( ',' ) ) != string::npos ){
@@ -36,7 +50,7 @@ namespace mongo {
         uassert( 8004 , "SyncClusterConnection needs 3 servers" , _conns.size() == 3 );
     }
 
-    SyncClusterConnection::SyncClusterConnection( string a , string b , string c ){
+    SyncClusterConnection::SyncClusterConnection( string a , string b , string c ) : _mutex("SyncClusterConnection") {
         _address = a + "," + b + "," + c;
         // connect to all even if not working
         _connect( a );
@@ -44,7 +58,7 @@ namespace mongo {
         _connect( c );
     }
 
-    SyncClusterConnection::SyncClusterConnection( SyncClusterConnection& prev ){
+    SyncClusterConnection::SyncClusterConnection( SyncClusterConnection& prev ) : _mutex("SyncClusterConnection") {
         assert(0);
     }
 
@@ -55,6 +69,7 @@ namespace mongo {
     }
 
     bool SyncClusterConnection::prepare( string& errmsg ){
+        _lastErrors.clear();
         return fsync( errmsg );
     }
 
@@ -79,7 +94,7 @@ namespace mongo {
     }
 
     void SyncClusterConnection::_checkLast(){
-        vector<BSONObj> all;
+        _lastErrors.clear();
         vector<string> errors;
 
         for ( size_t i=0; i<_conns.size(); i++ ){
@@ -95,17 +110,17 @@ namespace mongo {
             catch ( ... ){
                 err += "unknown failure";
            }
-            all.push_back( res );
+            _lastErrors.push_back( res.getOwned() );
             errors.push_back( err );
         }
-
-        assert( all.size() == errors.size() && all.size() == _conns.size() );
+
+        assert( _lastErrors.size() == errors.size() && _lastErrors.size() == _conns.size() );
 
         stringstream err;
         bool ok = true;
 
         for ( size_t i = 0; i<_conns.size(); i++ ){
-            BSONObj res = all[i];
+            BSONObj res = _lastErrors[i];
             if ( res["ok"].trueValue() && res["fsyncFiles"].numberInt() > 0 )
                 continue;
             ok = false;
@@ -117,35 +132,71 @@ namespace mongo {
         throw UserException( 8001 , (string)"SyncClusterConnection write op failed: " + err.str() );
     }
 
+    BSONObj SyncClusterConnection::getLastErrorDetailed(){
+        if ( _lastErrors.size() )
+            return _lastErrors[0];
+        return DBClientBase::getLastErrorDetailed();
+    }
+
     void SyncClusterConnection::_connect( string host ){
         log() << "SyncClusterConnection connecting to [" << host << "]" << endl;
         DBClientConnection * c = new DBClientConnection( true );
         string errmsg;
         if ( ! c->connect( host , errmsg ) )
             log() << "SyncClusterConnection connect fail to: " << host << " errmsg: " << errmsg << endl;
+        _connAddresses.push_back( host );
         _conns.push_back( c );
     }
 
-    auto_ptr<DBClientCursor> SyncClusterConnection::query(const string &ns, Query query, int nToReturn, int nToSkip,
-                                                          const BSONObj *fieldsToReturn, int queryOptions, int batchSize ){
+    bool SyncClusterConnection::callRead( Message& toSend , Message& response ){
+        // TODO: need to save state of which one to go back to somehow...
+        return _conns[0]->callRead( toSend , response );
+    }
 
+    BSONObj SyncClusterConnection::findOne(const string &ns, const Query& query, const BSONObj *fieldsToReturn, int queryOptions) {
+
         if ( ns.find( ".$cmd" ) != string::npos ){
             string cmdName = query.obj.firstElement().fieldName();
-            int lockType = 0;
-
-            map<string,int>::iterator i = _lockTypes.find( cmdName );
-            if ( i == _lockTypes.end() ){
-                BSONObj info;
-                uassert( 13053 , "help failed" , _commandOnActive( "admin" , BSON( cmdName << "1" << "help" << 1 ) , info ) );
-                lockType = info["lockType"].numberInt();
-                _lockTypes[cmdName] = lockType;
-            }
-            else {
-                lockType = i->second;
+            int lockType = _lockType( cmdName );
+
+            if ( lockType > 0 ){ // write $cmd
+                string errmsg;
+                if ( ! prepare( errmsg ) )
+                    throw UserException( 13104 , (string)"SyncClusterConnection::findOne prepare failed: " + errmsg );
+
+                vector<BSONObj> all;
+                for ( size_t i=0; i<_conns.size(); i++ ){
+                    all.push_back( _conns[i]->findOne( ns , query , 0 , queryOptions ).getOwned() );
+                }
+
+                _checkLast();
+
+                for ( size_t i=0; i<all.size(); i++ ){
+                    BSONObj res = all[i];
+                    if ( res["ok"].trueValue() )
+                        continue;
+                    stringstream ss;
+                    ss << "write $cmd failed on a node: " << res.jsonString() << " " << _conns[i]->toString();
+                    throw UserException( 13105 , ss.str() );
+                }
+
+                return all[0];
             }
-
-            uassert( 13054 , (string)"write $cmd not supported in SyncClusterConnection: " + cmdName , lockType <= 0 );
+        }
+
+        return DBClientBase::findOne( ns , query , fieldsToReturn , queryOptions );
+    }
+
+
+    auto_ptr<DBClientCursor> SyncClusterConnection::query(const string &ns, Query query, int nToReturn, int nToSkip,
+                                                          const BSONObj *fieldsToReturn, int queryOptions, int batchSize ){
+        _lastErrors.clear();
+        if ( ns.find( ".$cmd" ) != string::npos ){
+            string cmdName = query.obj.firstElement().fieldName();
+            int lockType = _lockType( cmdName );
+            uassert( 13054 , (string)"write $cmd not supported in SyncClusterConnection::query for:" + cmdName , lockType <= 0 );
         }
 
         return _queryOnActive( ns , query , nToReturn , nToSkip , fieldsToReturn , queryOptions , batchSize );
@@ -185,6 +236,10 @@ namespace mongo {
     }
 
     void SyncClusterConnection::insert( const string &ns, BSONObj obj ){
+
+        uassert( 13119 , (string)"SyncClusterConnection::insert obj has to have an _id: " + obj.jsonString() ,
+                 ns.find( ".system.indexes" ) != string::npos || obj["_id"].type() );
+
         string errmsg;
         if ( ! prepare( errmsg ) )
             throw UserException( 8003 , (string)"SyncClusterConnection::insert prepare failed: " + errmsg );
@@ -201,19 +256,52 @@ namespace mongo {
     }
 
     void SyncClusterConnection::remove( const string &ns , Query query, bool justOne ){
-        assert(0);
+        string errmsg;
+        if ( ! prepare( errmsg ) )
+            throw UserException( 8020 , (string)"SyncClusterConnection::remove prepare failed: " + errmsg );
+
+        for ( size_t i=0; i<_conns.size(); i++ ){
+            _conns[i]->remove( ns , query , justOne );
+        }
+
+        _checkLast();
     }
 
     void SyncClusterConnection::update( const string &ns , Query query , BSONObj obj , bool upsert , bool multi ){
-        string errmsg;
-        if ( ! prepare( errmsg ) )
-            throw UserException( 8005 , (string)"SyncClusterConnection::udpate prepare failed: " + errmsg );
+
+        if ( upsert ){
+            uassert( 13120 , "SyncClusterConnection::update upsert query needs _id" , query.obj["_id"].type() );
+        }
+
+        if ( _writeConcern ){
+            string errmsg;
+            if ( ! prepare( errmsg ) )
+                throw UserException( 8005 , (string)"SyncClusterConnection::udpate prepare failed: " + errmsg );
+        }
 
         for ( size_t i=0; i<_conns.size(); i++ ){
-            _conns[i]->update( ns , query , obj , upsert , multi );
+            try {
+                _conns[i]->update( ns , query , obj , upsert , multi );
+            }
+            catch ( std::exception& e ){
+                if ( _writeConcern )
+                    throw e;
+            }
         }
 
-        _checkLast();
+        if ( _writeConcern ){
+            _checkLast();
+            assert( _lastErrors.size() > 1 );
+
+            int a = _lastErrors[0]["n"].numberInt();
+            for ( unsigned i=1; i<_lastErrors.size(); i++ ){
+                int b = _lastErrors[i]["n"].numberInt();
+                if ( a == b )
+                    continue;
+
+                throw UpdateNotTheSame( 8017 , "update not consistent" , _connAddresses , _lastErrors );
+            }
+        }
     }
 
     string SyncClusterConnection::_toString() const {
@@ -244,12 +332,41 @@ namespace mongo {
     }
 
     void SyncClusterConnection::say( Message &toSend ){
-        assert(0);
+        string errmsg;
+        if ( ! prepare( errmsg ) )
+            throw UserException( 13397 , (string)"SyncClusterConnection::say prepare failed: " + errmsg );
+
+        for ( size_t i=0; i<_conns.size(); i++ ){
+            _conns[i]->say( toSend );
+        }
+
+        _checkLast();
     }
 
     void SyncClusterConnection::sayPiggyBack( Message &toSend ){
         assert(0);
     }
 
+    int SyncClusterConnection::_lockType( const string& name ){
+        {
+            scoped_lock lk(_mutex);
+            map<string,int>::iterator i = _lockTypes.find( name );
+            if ( i != _lockTypes.end() )
+                return i->second;
+        }
+
+        BSONObj info;
+        uassert( 13053 , "help failed" , _commandOnActive( "admin" , BSON( name << "1" << "help" << 1 ) , info ) );
+
+        int lockType = info["lockType"].numberInt();
+        scoped_lock lk(_mutex);
+        _lockTypes[name] = lockType;
+        return lockType;
+    }
+
+    void SyncClusterConnection::killCursor( long long cursorID ){
+        // should never need to do this
+        assert(0);
+    }
 }
diff --git a/client/syncclusterconnection.h b/client/syncclusterconnection.h
index e3411e1..d1115f7 100644
--- a/client/syncclusterconnection.h
+++ b/client/syncclusterconnection.h
@@ -1,4 +1,5 @@
-// syncclusterconnection.h
+// @file syncclusterconnection.h
+
 /*
  * Copyright 2010 10gen Inc.
  *
@@ -16,25 +17,36 @@
  */
 
-#include "../stdafx.h"
+#include "../pch.h"
 #include "dbclient.h"
+#include "redef_macros.h"
 
 namespace mongo {
 
     /**
-     * this is a connection to a cluster of servers that operate as one
-     * for super high durability
+     * This is a connection to a cluster of servers that operate as one
+     * for super high durability.
+     *
+     * Write operations are two-phase. First, all nodes are asked to fsync. If successful
+     * everywhere, the write is sent everywhere and then followed by an fsync. There is no
+     * rollback if a problem occurs during the second phase. Naturally, with all these fsyncs,
+     * these operations will be quite slow -- use sparingly.
+     *
+     * Read operations are sent to a single random node.
+     *
+     * The class checks if a command is read or write style, and sends to a single
+     * node if a read lock command and to all in two phases with a write style command.
      */
     class SyncClusterConnection : public DBClientBase {
     public:
         /**
-         * @param commaSeperated should be 3 hosts comma seperated
+         * @param commaSeparated should be 3 hosts comma separated
          */
-        SyncClusterConnection( string commaSeperated );
+        SyncClusterConnection( const list<HostAndPort> & );
+        SyncClusterConnection( string commaSeparated );
         SyncClusterConnection( string a , string b , string c );
         ~SyncClusterConnection();
-
         /**
          * @return true if all servers are up and ready for writes
         */
@@ -47,6 +59,8 @@ namespace mongo {
 
         // --- from DBClientInterface
 
+        virtual BSONObj findOne(const string &ns, const Query& query, const BSONObj *fieldsToReturn, int queryOptions);
+
         virtual auto_ptr<DBClientCursor> query(const string &ns, Query query, int nToReturn, int nToSkip,
                                                const BSONObj *fieldsToReturn, int queryOptions, int batchSize );
 
@@ -60,41 +74,65 @@ namespace mongo {
 
         virtual void update( const string &ns , Query query , BSONObj obj , bool upsert , bool multi );
 
-        virtual string toString(){
-            return _toString();
-        }
-
         virtual bool call( Message &toSend, Message &response, bool assertOk );
         virtual void say( Message &toSend );
         virtual void sayPiggyBack( Message &toSend );
+
+        virtual void killCursor( long long cursorID );
 
         virtual string getServerAddress() const { return _address; }
+        virtual bool isFailed() const { return false; }
+        virtual string toString() { return _toString(); }
 
-        virtual bool isFailed() const {
-            return false;
-        }
+        virtual BSONObj getLastErrorDetailed();
+
+        virtual bool callRead( Message& toSend , Message& response );
+
+        virtual ConnectionString::ConnectionType type() const { return ConnectionString::SYNC; }
 
     private:
-        SyncClusterConnection( SyncClusterConnection& prev );
-
-        string _toString() const;
-
+        string _toString() const;
         bool _commandOnActive(const string &dbname, const BSONObj& cmd, BSONObj &info, int options=0);
-
         auto_ptr<DBClientCursor> _queryOnActive(const string &ns, Query query, int nToReturn, int nToSkip,
                                                 const BSONObj *fieldsToReturn, int queryOptions, int batchSize );
-
-        bool _isReadOnly( const string& name );
-
+        int _lockType( const string& name );
         void _checkLast();
-
        void _connect( string host );
 
         string _address;
+        vector<string> _connAddresses;
         vector<DBClientConnection*> _conns;
         map<string,int> _lockTypes;
+        mongo::mutex _mutex;
+
+        vector<BSONObj> _lastErrors;
     };
 
+    class UpdateNotTheSame : public UserException {
+    public:
+        UpdateNotTheSame( int code , const string& msg , const vector<string>& addrs , const vector<BSONObj>& lastErrors )
+            : UserException( code , msg ) , _addrs( addrs ) , _lastErrors( lastErrors ){
+            assert( _addrs.size() == _lastErrors.size() );
+        }
+
+        virtual ~UpdateNotTheSame() throw() {
+        }
+
+        unsigned size() const {
+            return _addrs.size();
+        }
+
+        pair<string,BSONObj> operator[](unsigned i) const {
+            return make_pair( _addrs[i] , _lastErrors[i] );
+        }
+
+    private:
+
+        vector<string> _addrs;
+        vector<BSONObj> _lastErrors;
+    };
+
 };
+
+#include "undef_macros.h"
diff --git a/client/undef_macros.h b/client/undef_macros.h
new file mode 100644
index 0000000..cce8692
--- /dev/null
+++ b/client/undef_macros.h
@@ -0,0 +1,58 @@
+/** @file undef_macros.h - remove mongo-specific macros that might cause issues */
+
+/* Copyright 2009 10gen Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// If you define a new global un-prefixed macro, please add it here and in redef_macros
+
+// #pragma once // this file is intended to be processed multiple times
+
+
+/** MONGO_EXPOSE_MACROS - when defined, indicates that you are compiling a mongo program rather
+    than just using the C++ driver.
+*/
+#if !defined(MONGO_EXPOSE_MACROS) && !defined(MONGO_MACROS_CLEANED)
+
+// util/allocator.h
+#undef malloc
+#undef realloc
+
+// util/assert_util.h
+#undef assert
+#undef dassert
+#undef wassert
+#undef massert
+#undef uassert
+#undef BOOST_CHECK_EXCEPTION
+#undef DESTRUCTOR_GUARD
+
+// util/goodies.h
+#undef PRINT
+#undef PRINTFL
+#undef asctime
+#undef gmtime
+#undef localtime
+#undef ctime
+
+// util/debug_util.h
+#undef DEV
+#undef DEBUGGING
+#undef SOMETIMES
+#undef OCCASIONALLY
+#undef RARELY
+#undef ONCE
+
+#define MONGO_MACROS_CLEANED
+#endif
-- 
cgit v1.2.3
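
Illustrative note (not part of the patch): the sketch below shows how calling code might exercise the SyncClusterConnection API added above -- the three-host constructor, the _id requirements enforced by insert (13119) and upsert (13120), and the per-node report carried by UpdateNotTheSame (8017). The host names, namespace, and include path are invented for the example.

    // usage_sketch.cpp -- hypothetical example built against the header shown above.
    #include "client/syncclusterconnection.h"   // assumed include path
    #include <iostream>

    using namespace mongo;

    int main() {
        // three servers that must all fsync and acknowledge every write
        SyncClusterConnection conn( "cfg1:27019" , "cfg2:27019" , "cfg3:27019" );

        // insert() asserts (13119) that the object carries an _id
        conn.insert( "test.things" , BSON( "_id" << 1 << "name" << "example" ) );

        try {
            // upserts must identify the document by _id (13120); if the nodes later report
            // different "n" counts, UpdateNotTheSame (8017) is thrown
            conn.update( "test.things" , QUERY( "_id" << 1 ) ,
                         BSON( "$set" << BSON( "name" << "renamed" ) ) , true , false );
        }
        catch ( UpdateNotTheSame& e ) {
            // per-node addresses and getlasterror documents, as exposed by operator[]
            for ( unsigned i = 0; i < e.size(); i++ )
                std::cout << e[i].first << " -> " << e[i].second.jsonString() << std::endl;
        }
        return 0;
    }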
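Illustrative note (not part of the patch): undef_macros.h / redef_macros.h let driver headers use MongoDB's un-prefixed macros internally while handing names such as assert, malloc, and DEV back to the application. A minimal sketch of the effect, assuming the application is compiled without MONGO_EXPOSE_MACROS and that dbclient.h brackets itself with redef_macros.h / undef_macros.h the same way the headers in this patch do:

    // app.cpp -- hypothetical application file, illustrating the macro cleanup only.
    #include "client/dbclient.h"   // assumed to end by processing undef_macros.h
    #include <cassert>             // reinstates the standard assert after the driver's cleanup

    int main() {
        // MONGO_EXPOSE_MACROS is not defined here, so undef_macros.h removed the driver's
        // 'assert' override; the call below uses the ordinary <cassert> macro.
        assert( 1 + 1 == 2 );
        return 0;
    }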