author     Antonin Kral <a.kral@bobek.cz>    2010-08-11 12:38:57 +0200
committer  Antonin Kral <a.kral@bobek.cz>    2010-08-11 12:38:57 +0200
commit     7645618fd3914cb8a20561625913c20d49504a49 (patch)
tree       8370f846f58f6d71165b7a0e2eda04648584ec76 /client
parent     68c73c3c7608b4c87f07440dc3232801720b1168 (diff)
download   mongodb-7645618fd3914cb8a20561625913c20d49504a49.tar.gz
Imported Upstream version 1.6.0
Diffstat (limited to 'client')
-rw-r--r--  client/clientOnly.cpp              |  18
-rw-r--r--  client/connpool.cpp                | 171
-rw-r--r--  client/connpool.h                  |  88
-rw-r--r--  client/constants.h                 |  26
-rw-r--r--  client/dbclient.cpp                | 722
-rw-r--r--  client/dbclient.h                  | 543
-rw-r--r--  client/dbclientcursor.cpp          | 232
-rw-r--r--  client/dbclientcursor.h            | 204
-rw-r--r--  client/distlock.cpp                | 225
-rw-r--r--  client/distlock.h                  |  91
-rw-r--r--  client/distlock_test.cpp           |  80
-rw-r--r--  client/examples/clientTest.cpp     |  21
-rw-r--r--  client/examples/tail.cpp           |  35
-rw-r--r--  client/gridfs.cpp                  |  51
-rw-r--r--  client/gridfs.h                    |  32
-rw-r--r--  client/model.cpp                   |  53
-rw-r--r--  client/model.h                     |   7
-rw-r--r--  client/parallel.cpp                | 316
-rw-r--r--  client/parallel.h                  | 176
-rw-r--r--  client/redef_macros.h              |  55
-rw-r--r--  client/syncclusterconnection.cpp   | 179
-rw-r--r--  client/syncclusterconnection.h     |  84
-rw-r--r--  client/undef_macros.h              |  58
23 files changed, 2618 insertions, 849 deletions
diff --git a/client/clientOnly.cpp b/client/clientOnly.cpp
index 566095a..6178257 100644
--- a/client/clientOnly.cpp
+++ b/client/clientOnly.cpp
@@ -15,10 +15,11 @@
* limitations under the License.
*/
-#include "stdafx.h"
+#include "pch.h"
#include "../client/dbclient.h"
#include "../db/dbhelpers.h"
#include "../db/cmdline.h"
+#include "../s/shard.h"
namespace mongo {
@@ -57,10 +58,15 @@ namespace mongo {
uassert( 10256 , "no createDirectClient in clientOnly" , 0 );
return 0;
}
-/*
- auto_ptr<CursorIterator> Helpers::find( const char *ns , BSONObj query , bool requireIndex ){
- uassert( 10000 , "Helpers::find can't be used in client" , 0 );
- return auto_ptr<CursorIterator>(0);
+
+ void Shard::getAllShards( vector<Shard>& all ){
+ assert(0);
+ }
+
+ bool Shard::isAShard( const string& ident ){
+ assert(0);
+ return false;
}
-*/
+
+
}
diff --git a/client/connpool.cpp b/client/connpool.cpp
index 5a08483..dae13f6 100644
--- a/client/connpool.cpp
+++ b/client/connpool.cpp
@@ -18,76 +18,99 @@
// _ todo: reconnect?
-#include "stdafx.h"
+#include "pch.h"
#include "connpool.h"
#include "../db/commands.h"
#include "syncclusterconnection.h"
+#include "../s/shard.h"
namespace mongo {
DBConnectionPool pool;
+ DBClientBase* DBConnectionPool::_get(const string& ident) {
+ scoped_lock L(_mutex);
+
+ PoolForHost& p = _pools[ident];
+ if ( p.pool.empty() )
+ return 0;
+
+ DBClientBase *c = p.pool.top();
+ p.pool.pop();
+ return c;
+ }
+
+ DBClientBase* DBConnectionPool::_finishCreate( const string& host , DBClientBase* conn ){
+ {
+ scoped_lock L(_mutex);
+ PoolForHost& p = _pools[host];
+ p.created++;
+ }
+
+ onCreate( conn );
+ onHandedOut( conn );
+
+ return conn;
+ }
+
+ DBClientBase* DBConnectionPool::get(const ConnectionString& url) {
+ DBClientBase * c = _get( url.toString() );
+ if ( c ){
+ onHandedOut( c );
+ return c;
+ }
+
+ string errmsg;
+ c = url.connect( errmsg );
+ uassert( 13328 , (string)"dbconnectionpool: connect failed " + url.toString() + " : " + errmsg , c );
+
+ return _finishCreate( url.toString() , c );
+ }
+
DBClientBase* DBConnectionPool::get(const string& host) {
- scoped_lock L(poolMutex);
+ DBClientBase * c = _get( host );
+ if ( c ){
+ onHandedOut( c );
+ return c;
+ }
- PoolForHost *&p = pools[host];
- if ( p == 0 )
- p = new PoolForHost();
- if ( p->pool.empty() ) {
- int numCommas = DBClientBase::countCommas( host );
- DBClientBase *c;
-
- if( numCommas == 0 ) {
- DBClientConnection *cc = new DBClientConnection(true);
- log(2) << "creating new connection for pool to:" << host << endl;
- string errmsg;
- if ( !cc->connect(host.c_str(), errmsg) ) {
- delete cc;
- uassert( 11002 , (string)"dbconnectionpool: connect failed " + host , false);
- return 0;
- }
- c = cc;
- onCreate( c );
- }
- else if ( numCommas == 1 ) {
- DBClientPaired *p = new DBClientPaired();
- if( !p->connect(host) ) {
- delete p;
- uassert( 11003 , (string)"dbconnectionpool: connect failed [2] " + host , false);
- return 0;
- }
- c = p;
- }
- else if ( numCommas == 2 ) {
- c = new SyncClusterConnection( host );
- }
- else {
- uassert( 13071 , (string)"invalid hostname [" + host + "]" , 0 );
+ string errmsg;
+ ConnectionString cs = ConnectionString::parse( host , errmsg );
+ uassert( 13071 , (string)"invalid hostname [" + host + "]" + errmsg , cs.isValid() );
+
+ c = cs.connect( errmsg );
+ uassert( 11002 , (string)"dbconnectionpool: connect failed " + host + " : " + errmsg , c );
+ return _finishCreate( host , c );
+ }
+
+ DBConnectionPool::~DBConnectionPool(){
+ for ( map<string,PoolForHost>::iterator i = _pools.begin(); i != _pools.end(); i++ ){
+ PoolForHost& p = i->second;
+
+ while ( ! p.pool.empty() ){
+ DBClientBase * c = p.pool.top();
+ delete c;
+ p.pool.pop();
}
- return c;
}
- DBClientBase *c = p->pool.top();
- p->pool.pop();
- onHandedOut( c );
- return c;
}
void DBConnectionPool::flush(){
- scoped_lock L(poolMutex);
- for ( map<string,PoolForHost*>::iterator i = pools.begin(); i != pools.end(); i++ ){
- PoolForHost* p = i->second;
+ scoped_lock L(_mutex);
+ for ( map<string,PoolForHost>::iterator i = _pools.begin(); i != _pools.end(); i++ ){
+ PoolForHost& p = i->second;
vector<DBClientBase*> all;
- while ( ! p->pool.empty() ){
- DBClientBase * c = p->pool.top();
- p->pool.pop();
+ while ( ! p.pool.empty() ){
+ DBClientBase * c = p.pool.top();
+ p.pool.pop();
all.push_back( c );
bool res;
c->isMaster( res );
}
for ( vector<DBClientBase*>::iterator i=all.begin(); i != all.end(); i++ ){
- p->pool.push( *i );
+ p.pool.push( *i );
}
}
}
@@ -114,6 +137,26 @@ namespace mongo {
}
}
+ void DBConnectionPool::appendInfo( BSONObjBuilder& b ){
+ scoped_lock lk( _mutex );
+ BSONObjBuilder bb( b.subobjStart( "hosts" ) );
+ for ( map<string,PoolForHost>::iterator i=_pools.begin(); i!=_pools.end(); ++i ){
+ string s = i->first;
+ BSONObjBuilder temp( bb.subobjStart( s.c_str() ) );
+ temp.append( "available" , (int)(i->second.pool.size()) );
+ temp.appendNumber( "created" , i->second.created );
+ temp.done();
+ }
+ bb.done();
+ }
+
+ ScopedDbConnection * ScopedDbConnection::steal(){
+ assert( _conn );
+ ScopedDbConnection * n = new ScopedDbConnection( _host , _conn );
+ _conn = 0;
+ return n;
+ }
+
ScopedDbConnection::~ScopedDbConnection() {
if ( _conn ){
if ( ! _conn->isFailed() ) {
@@ -124,20 +167,44 @@ namespace mongo {
}
}
+ ScopedDbConnection::ScopedDbConnection(const Shard& shard )
+ : _host( shard.getConnString() ) , _conn( pool.get(_host) ){
+ }
+
+ ScopedDbConnection::ScopedDbConnection(const Shard* shard )
+ : _host( shard->getConnString() ) , _conn( pool.get(_host) ){
+ }
+
class PoolFlushCmd : public Command {
public:
- PoolFlushCmd() : Command( "connpoolsync" ){}
- virtual LockType locktype(){ return NONE; }
- virtual bool run(const char*, mongo::BSONObj&, std::string&, mongo::BSONObjBuilder& result, bool){
+ PoolFlushCmd() : Command( "connPoolSync" , false , "connpoolsync" ){}
+ virtual void help( stringstream &help ) const { help<<"internal"; }
+ virtual LockType locktype() const { return NONE; }
+ virtual bool run(const string&, mongo::BSONObj&, std::string&, mongo::BSONObjBuilder& result, bool){
pool.flush();
- result << "ok" << 1;
return true;
}
- virtual bool slaveOk(){
+ virtual bool slaveOk() const {
return true;
}
} poolFlushCmd;
+ class PoolStats : public Command {
+ public:
+ PoolStats() : Command( "connPoolStats" ){}
+ virtual void help( stringstream &help ) const { help<<"stats about connection pool"; }
+ virtual LockType locktype() const { return NONE; }
+ virtual bool run(const string&, mongo::BSONObj&, std::string&, mongo::BSONObjBuilder& result, bool){
+ pool.appendInfo( result );
+ return true;
+ }
+ virtual bool slaveOk() const {
+ return true;
+ }
+
+ } poolStatsCmd;
+
+
} // namespace mongo
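
The hunks above add a connPoolStats command backed by DBConnectionPool::appendInfo(), which reports the available and created counters per host. A minimal sketch of invoking it from client code (the "localhost" address is a placeholder and error handling is kept to the bare minimum):

    #include "client/dbclient.h"

    using namespace mongo;

    void printPoolStats() {
        DBClientConnection c;
        string errmsg;
        if ( !c.connect( "localhost" , errmsg ) ) {
            cout << "connect failed: " << errmsg << endl;
            return;
        }
        BSONObj info;
        // connPoolStats fills info with { hosts : { "<host>" : { available : ..., created : ... } }, ... }
        if ( c.runCommand( "admin" , BSON( "connPoolStats" << 1 ) , info ) )
            cout << info.jsonString() << endl;
    }
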
diff --git a/client/connpool.h b/client/connpool.h
index b44ff51..00570c5 100644
--- a/client/connpool.h
+++ b/client/connpool.h
@@ -19,20 +19,30 @@
#include <stack>
#include "dbclient.h"
+#include "redef_macros.h"
namespace mongo {
+ class Shard;
+
struct PoolForHost {
+ PoolForHost()
+ : created(0){}
+ PoolForHost( const PoolForHost& other ){
+ assert(other.pool.size() == 0);
+ created = other.created;
+ assert( created == 0 );
+ }
+
std::stack<DBClientBase*> pool;
+ long long created;
};
class DBConnectionHook {
public:
virtual ~DBConnectionHook(){}
-
virtual void onCreate( DBClientBase * conn ){}
virtual void onHandedOut( DBClientBase * conn ){}
-
};
/** Database connection pool.
@@ -51,33 +61,54 @@ namespace mongo {
}
*/
class DBConnectionPool {
- mongo::mutex poolMutex;
- map<string,PoolForHost*> pools; // servername -> pool
+ mongo::mutex _mutex;
+ map<string,PoolForHost> _pools; // servername -> pool
list<DBConnectionHook*> _hooks;
+
+ DBClientBase* _get( const string& ident );
+ DBClientBase* _finishCreate( const string& ident , DBClientBase* conn );
+
+ public:
+ DBConnectionPool() : _mutex("DBConnectionPool") { }
+ ~DBConnectionPool();
+
+
void onCreate( DBClientBase * conn );
void onHandedOut( DBClientBase * conn );
- public:
+
void flush();
+
DBClientBase *get(const string& host);
+ DBClientBase *get(const ConnectionString& host);
+
void release(const string& host, DBClientBase *c) {
if ( c->isFailed() ){
delete c;
return;
}
- scoped_lock L(poolMutex);
- pools[host]->pool.push(c);
+ scoped_lock L(_mutex);
+ _pools[host].pool.push(c);
}
void addHook( DBConnectionHook * hook );
+ void appendInfo( BSONObjBuilder& b );
};
-
+
extern DBConnectionPool pool;
+ class AScopedConnection : boost::noncopyable {
+ public:
+ virtual ~AScopedConnection(){}
+ virtual DBClientBase* get() = 0;
+ virtual void done() = 0;
+ virtual string getHost() const = 0;
+ };
+
/** Use to get a connection from the pool. On exceptions things
clean up nicely.
*/
- class ScopedDbConnection {
- const string host;
+ class ScopedDbConnection : public AScopedConnection {
+ const string _host;
DBClientBase *_conn;
public:
/** get the associated connection object */
@@ -85,19 +116,42 @@ namespace mongo {
uassert( 11004 , "did you call done already" , _conn );
return _conn;
}
-
+
/** get the associated connection object */
DBClientBase& conn() {
uassert( 11005 , "did you call done already" , _conn );
return *_conn;
}
+ /** get the associated connection object */
+ DBClientBase* get() {
+ uassert( 13102 , "did you call done already" , _conn );
+ return _conn;
+ }
+
+ ScopedDbConnection()
+ : _host( "" ) , _conn(0) {
+ }
+
/** throws UserException if can't connect */
- ScopedDbConnection(const string& _host) :
- host(_host), _conn( pool.get(_host) ) {
- //cout << " for: " << _host << " got conn: " << _conn << endl;
+ ScopedDbConnection(const string& host)
+ : _host(host), _conn( pool.get(host) ) {
+ }
+
+ ScopedDbConnection(const string& host, DBClientBase* conn )
+ : _host( host ) , _conn( conn ){
+ }
+
+ ScopedDbConnection(const Shard& shard );
+ ScopedDbConnection(const Shard* shard );
+
+ ScopedDbConnection(const ConnectionString& url )
+ : _host(url.toString()), _conn( pool.get(url) ) {
}
+
+ string getHost() const { return _host; }
+
/** Force closure of the connection. You should call this if you leave it in
a bad state. Destructor will do this too, but it is verbose.
*/
@@ -121,12 +175,16 @@ namespace mongo {
kill();
else
*/
- pool.release(host, _conn);
+ pool.release(_host, _conn);
_conn = 0;
}
+ ScopedDbConnection * steal();
+
~ScopedDbConnection();
};
} // namespace mongo
+
+#include "undef_macros.h"
diff --git a/client/constants.h b/client/constants.h
new file mode 100644
index 0000000..66aa9b1
--- /dev/null
+++ b/client/constants.h
@@ -0,0 +1,26 @@
+// constants.h
+
+#pragma once
+
+namespace mongo {
+
+ /* query results include a 32 bit result flag word consisting of these bits */
+ enum ResultFlagType {
+ /* returned, with zero results, when getMore is called but the cursor id
+ is not valid at the server. */
+ ResultFlag_CursorNotFound = 1,
+
+ /* { $err : ... } is being returned */
+ ResultFlag_ErrSet = 2,
+
+ /* Have to update config from the server, usually $err is also set */
+ ResultFlag_ShardConfigStale = 4,
+
+ /* for backward compatability: this let's us know the server supports
+ the QueryOption_AwaitData option. if it doesn't, a repl slave client should sleep
+ a little between getMore's.
+ */
+ ResultFlag_AwaitCapable = 8
+ };
+
+}
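
These flags form a plain bit mask over the reply's flag word, which is how DBClientCursor::hasResultFlag() tests them. A small illustrative check (the resultFlags parameter is a hypothetical stand-in for the word taken from a query reply):

    #include <iostream>
    #include "client/constants.h"

    void reportFlags( int resultFlags ) {
        if ( resultFlags & mongo::ResultFlag_CursorNotFound )
            std::cout << "cursor id no longer valid at the server" << std::endl;
        if ( resultFlags & mongo::ResultFlag_ShardConfigStale )
            std::cout << "shard config stale; refresh config before retrying" << std::endl;
    }
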
diff --git a/client/dbclient.cpp b/client/dbclient.cpp
index f617f7c..04b6147 100644
--- a/client/dbclient.cpp
+++ b/client/dbclient.cpp
@@ -15,22 +15,89 @@
* limitations under the License.
*/
-#include "stdafx.h"
+#include "pch.h"
#include "../db/pdfile.h"
#include "dbclient.h"
-#include "../util/builder.h"
+#include "../bson/util/builder.h"
#include "../db/jsobj.h"
#include "../db/json.h"
#include "../db/instance.h"
#include "../util/md5.hpp"
#include "../db/dbmessage.h"
#include "../db/cmdline.h"
+#include "connpool.h"
+#include "../s/util.h"
+#include "syncclusterconnection.h"
namespace mongo {
+ DBClientBase* ConnectionString::connect( string& errmsg ) const {
+ switch ( _type ){
+ case MASTER: {
+ DBClientConnection * c = new DBClientConnection(true);
+ log(1) << "creating new connection to:" << _servers[0] << endl;
+ if ( ! c->connect( _servers[0] , errmsg ) ) {
+ delete c;
+ return 0;
+ }
+ return c;
+ }
+
+ case PAIR:
+ case SET: {
+ DBClientReplicaSet * set = new DBClientReplicaSet( _setName , _servers );
+ if( ! set->connect() ){
+ delete set;
+ errmsg = "connect failed to set ";
+ errmsg += toString();
+ return 0;
+ }
+ return set;
+ }
+
+ case SYNC: {
+ // TODO , don't copy
+ list<HostAndPort> l;
+ for ( unsigned i=0; i<_servers.size(); i++ )
+ l.push_back( _servers[i] );
+ return new SyncClusterConnection( l );
+ }
+
+ case INVALID:
+ throw UserException( 13421 , "trying to connect to invalid ConnectionString" );
+ break;
+ }
+
+ assert( 0 );
+ return 0;
+ }
+
+ ConnectionString ConnectionString::parse( const string& host , string& errmsg ){
+
+ string::size_type i = host.find( '/' );
+ if ( i != string::npos ){
+ // replica set
+ return ConnectionString( SET , host.substr( i + 1 ) , host.substr( 0 , i ) );
+ }
+
+ int numCommas = DBClientBase::countCommas( host );
+
+ if( numCommas == 0 )
+ return ConnectionString( HostAndPort( host ) );
+
+ if ( numCommas == 1 )
+ return ConnectionString( PAIR , host );
+
+ if ( numCommas == 2 )
+ return ConnectionString( SYNC , host );
+
+ errmsg = (string)"invalid hostname [" + host + "]";
+ return ConnectionString(); // INVALID
+ }
+
Query& Query::where(const string &jscode, BSONObj scope) {
/* use where() before sort() and hint() and explain(), else this will assert. */
- assert( !obj.hasField("query") );
+ assert( ! isComplex() );
BSONObjBuilder b;
b.appendElements(obj);
b.appendWhere(jscode, scope);
@@ -39,7 +106,7 @@ namespace mongo {
}
void Query::makeComplex() {
- if ( obj.hasElement( "query" ) )
+ if ( isComplex() )
return;
BSONObjBuilder b;
b.append( "query", obj );
@@ -76,19 +143,36 @@ namespace mongo {
return *this;
}
- bool Query::isComplex() const{
- return obj.hasElement( "query" );
+ bool Query::isComplex( bool * hasDollar ) const{
+ if ( obj.hasElement( "query" ) ){
+ if ( hasDollar )
+ hasDollar[0] = false;
+ return true;
+ }
+
+ if ( obj.hasElement( "$query" ) ){
+ if ( hasDollar )
+ hasDollar[0] = true;
+ return true;
+ }
+
+ return false;
}
BSONObj Query::getFilter() const {
- if ( ! isComplex() )
+ bool hasDollar;
+ if ( ! isComplex( &hasDollar ) )
return obj;
- return obj.getObjectField( "query" );
+
+ return obj.getObjectField( hasDollar ? "$query" : "query" );
}
BSONObj Query::getSort() const {
if ( ! isComplex() )
return BSONObj();
- return obj.getObjectField( "orderby" );
+ BSONObj ret = obj.getObjectField( "orderby" );
+ if (ret.isEmpty())
+ ret = obj.getObjectField( "$orderby" );
+ return ret;
}
BSONObj Query::getHint() const {
if ( ! isComplex() )
@@ -109,6 +193,17 @@ namespace mongo {
return o["ok"].trueValue();
}
+ enum QueryOptions DBClientWithCommands::availableOptions() {
+ if ( !_haveCachedAvailableOptions ) {
+ BSONObj ret;
+ if ( runCommand( "admin", BSON( "availablequeryoptions" << 1 ), ret ) ) {
+ _cachedAvailableOptions = ( enum QueryOptions )( ret.getIntField( "options" ) );
+ }
+ _haveCachedAvailableOptions = true;
+ }
+ return _cachedAvailableOptions;
+ }
+
inline bool DBClientWithCommands::runCommand(const string &dbname, const BSONObj& cmd, BSONObj &info, int options) {
string ns = dbname + ".$cmd";
info = findOne(ns, cmd, 0 , options);
@@ -133,7 +228,7 @@ namespace mongo {
BSONObj res;
if( !runCommand(ns.db.c_str(), cmd, res, options) )
uasserted(11010,string("count fails:") + res.toString());
- return res.getIntField("n");
+ return res["n"].numberLong();
}
BSONObj getlasterrorcmdobj = fromjson("{getlasterror:1}");
@@ -146,10 +241,14 @@ namespace mongo {
string DBClientWithCommands::getLastError() {
BSONObj info = getLastErrorDetailed();
+ return getLastErrorString( info );
+ }
+
+ string DBClientWithCommands::getLastErrorString( const BSONObj& info ){
BSONElement e = info["err"];
if( e.eoo() ) return "";
if( e.type() == Object ) return e.toString();
- return e.str();
+ return e.str();
}
BSONObj getpreverrorcmdobj = fromjson("{getpreverror:1}");
@@ -223,13 +322,14 @@ namespace mongo {
bool DBClientWithCommands::isMaster(bool& isMaster, BSONObj *info) {
BSONObj o;
- if ( info == 0 ) info = &o;
+ if ( info == 0 )
+ info = &o;
bool ok = runCommand("admin", ismastercmdobj, *info);
- isMaster = (info->getIntField("ismaster") == 1);
+ isMaster = info->getField("ismaster").trueValue();
return ok;
}
- bool DBClientWithCommands::createCollection(const string &ns, unsigned size, bool capped, int max, BSONObj *info) {
+ bool DBClientWithCommands::createCollection(const string &ns, long long size, bool capped, int max, BSONObj *info) {
BSONObj o;
if ( info == 0 ) info = &o;
BSONObjBuilder b;
@@ -346,64 +446,9 @@ namespace mongo {
string db = nsGetDB( ns ) + ".system.namespaces";
BSONObj q = BSON( "name" << ns );
- return count( db.c_str() , q );
- }
-
-
- void testSort() {
- DBClientConnection c;
- string err;
- if ( !c.connect("localhost", err) ) {
- out() << "can't connect to server " << err << endl;
- return;
- }
-
- cout << "findOne returns:" << endl;
- cout << c.findOne("test.foo", QUERY( "x" << 3 ) ).toString() << endl;
- cout << c.findOne("test.foo", QUERY( "x" << 3 ).sort("name") ).toString() << endl;
-
- }
-
- /* TODO: unit tests should run this? */
- void testDbEval() {
- DBClientConnection c;
- string err;
- if ( !c.connect("localhost", err) ) {
- out() << "can't connect to server " << err << endl;
- return;
- }
-
- if( !c.auth("dwight", "u", "p", err) ) {
- out() << "can't authenticate " << err << endl;
- return;
- }
-
- BSONObj info;
- BSONElement retValue;
- BSONObjBuilder b;
- b.append("0", 99);
- BSONObj args = b.done();
- bool ok = c.eval("dwight", "function() { return args[0]; }", info, retValue, &args);
- out() << "eval ok=" << ok << endl;
- out() << "retvalue=" << retValue.toString() << endl;
- out() << "info=" << info.toString() << endl;
-
- out() << endl;
-
- int x = 3;
- assert( c.eval("dwight", "function() { return 3; }", x) );
-
- out() << "***\n";
-
- BSONObj foo = fromjson("{\"x\":7}");
- out() << foo.toString() << endl;
- int res=0;
- ok = c.eval("dwight", "function(parm1) { return parm1.x; }", foo, res);
- out() << ok << " retval:" << res << endl;
+ return count( db.c_str() , q ) != 0;
}
- void testPaired();
-
/* --- dbclientconnection --- */
bool DBClientConnection::auth(const string &dbname, const string &username, const string &password_text, string& errmsg, bool digestPassword) {
@@ -422,48 +467,42 @@ namespace mongo {
return DBClientBase::auth(dbname, username, password.c_str(), errmsg, false);
}
- BSONObj DBClientInterface::findOne(const string &ns, Query query, const BSONObj *fieldsToReturn, int queryOptions) {
+ BSONObj DBClientInterface::findOne(const string &ns, const Query& query, const BSONObj *fieldsToReturn, int queryOptions) {
auto_ptr<DBClientCursor> c =
this->query(ns, query, 1, 0, fieldsToReturn, queryOptions);
- massert( 10276 , "DBClientBase::findOne: transport error", c.get() );
+ uassert( 10276 , "DBClientBase::findOne: transport error", c.get() );
+
+ if ( c->hasResultFlag( ResultFlag_ShardConfigStale ) )
+ throw StaleConfigException( ns , "findOne has stale config" );
if ( !c->more() )
return BSONObj();
- return c->next().copy();
+ return c->nextSafe().copy();
+ }
+
+ bool DBClientConnection::connect(const HostAndPort& server, string& errmsg){
+ _server = server;
+ _serverString = _server.toString();
+ return _connect( errmsg );
}
- bool DBClientConnection::connect(const string &_serverAddress, string& errmsg) {
- serverAddress = _serverAddress;
+ bool DBClientConnection::_connect( string& errmsg ){
+ _serverString = _server.toString();
+ // we keep around SockAddr for connection life -- maybe MessagingPort
+ // requires that?
+ server.reset(new SockAddr(_server.host().c_str(), _server.port()));
+ p.reset(new MessagingPort( _timeout, _logLevel ));
- string ip;
- int port;
- size_t idx = serverAddress.find( ":" );
- if ( idx != string::npos ) {
- port = strtol( serverAddress.substr( idx + 1 ).c_str(), 0, 10 );
- ip = serverAddress.substr( 0 , idx );
- ip = hostbyname(ip.c_str());
- } else {
- port = CmdLine::DefaultDBPort;
- ip = hostbyname( serverAddress.c_str() );
- }
- if( ip.empty() ) {
- stringstream ss;
- ss << "client connect: couldn't parse/resolve hostname: " << _serverAddress;
- errmsg = ss.str();
+ if (server->getAddr() == "0.0.0.0"){
failed = true;
return false;
}
- // we keep around SockAddr for connection life -- maybe MessagingPort
- // requires that?
- server = auto_ptr<SockAddr>(new SockAddr(ip.c_str(), port));
- p = auto_ptr<MessagingPort>(new MessagingPort());
-
if ( !p->connect(*server) ) {
stringstream ss;
- ss << "couldn't connect to server " << serverAddress << " " << ip << ":" << port;
+ ss << "couldn't connect to server " << _serverString << '}';
errmsg = ss.str();
failed = true;
return false;
@@ -480,22 +519,21 @@ namespace mongo {
return;
lastReconnectTry = time(0);
- log() << "trying reconnect to " << serverAddress << endl;
+ log(_logLevel) << "trying reconnect to " << _serverString << endl;
string errmsg;
- string tmp = serverAddress;
failed = false;
- if ( !connect(tmp.c_str(), errmsg) ) {
- log() << "reconnect " << serverAddress << " failed " << errmsg << endl;
+ if ( ! _connect(errmsg) ) {
+ log(_logLevel) << "reconnect " << _serverString << " failed " << errmsg << endl;
return;
}
- log() << "reconnect " << serverAddress << " ok" << endl;
+ log(_logLevel) << "reconnect " << _serverString << " ok" << endl;
for( map< string, pair<string,string> >::iterator i = authCache.begin(); i != authCache.end(); i++ ) {
const char *dbname = i->first.c_str();
const char *username = i->second.first.c_str();
const char *password = i->second.second.c_str();
if( !DBClientBase::auth(dbname, username, password, errmsg, false) )
- log() << "reconnect: auth failed db:" << dbname << " user:" << username << ' ' << errmsg << '\n';
+ log(_logLevel) << "reconnect: auth failed db:" << dbname << " user:" << username << ' ' << errmsg << '\n';
}
}
@@ -516,13 +554,76 @@ namespace mongo {
return auto_ptr< DBClientCursor >( 0 );
}
+ struct DBClientFunConvertor {
+ void operator()( DBClientCursorBatchIterator &i ) {
+ while( i.moreInCurrentBatch() ) {
+ _f( i.nextSafe() );
+ }
+ }
+ boost::function<void(const BSONObj &)> _f;
+ };
+
+ unsigned long long DBClientConnection::query( boost::function<void(const BSONObj&)> f, const string& ns, Query query, const BSONObj *fieldsToReturn, int queryOptions ) {
+ DBClientFunConvertor fun;
+ fun._f = f;
+ boost::function<void(DBClientCursorBatchIterator &)> ptr( fun );
+ return DBClientConnection::query( ptr, ns, query, fieldsToReturn, queryOptions );
+ }
+
+ unsigned long long DBClientConnection::query( boost::function<void(DBClientCursorBatchIterator &)> f, const string& ns, Query query, const BSONObj *fieldsToReturn, int queryOptions ) {
+ // mask options
+ queryOptions &= (int)( QueryOption_NoCursorTimeout | QueryOption_SlaveOk );
+ unsigned long long n = 0;
+
+ bool doExhaust = ( availableOptions() & QueryOption_Exhaust );
+ if ( doExhaust ) {
+ queryOptions |= (int)QueryOption_Exhaust;
+ }
+ auto_ptr<DBClientCursor> c( this->query(ns, query, 0, 0, fieldsToReturn, queryOptions) );
+ massert( 13386, "socket error for mapping query", c.get() );
+
+ if ( !doExhaust ) {
+ while( c->more() ) {
+ DBClientCursorBatchIterator i( *c );
+ f( i );
+ n += i.n();
+ }
+ return n;
+ }
+
+ try {
+ while( 1 ) {
+ while( c->moreInCurrentBatch() ) {
+ DBClientCursorBatchIterator i( *c );
+ f( i );
+ n += i.n();
+ }
+
+ if( c->getCursorId() == 0 )
+ break;
+
+ c->exhaustReceiveMore();
+ }
+ }
+ catch(std::exception&) {
+ /* connection CANNOT be used anymore as more data may be on the way from the server.
+ we have to reconnect.
+ */
+ failed = true;
+ p->shutdown();
+ throw;
+ }
+
+ return n;
+ }
+
void DBClientBase::insert( const string & ns , BSONObj obj ) {
Message toSend;
BufBuilder b;
int opts = 0;
- b.append( opts );
- b.append( ns );
+ b.appendNum( opts );
+ b.appendStr( ns );
obj.appendSelfToBufBuilder( b );
toSend.setData( dbInsert , b.buf() , b.len() );
@@ -535,8 +636,8 @@ namespace mongo {
BufBuilder b;
int opts = 0;
- b.append( opts );
- b.append( ns );
+ b.appendNum( opts );
+ b.appendStr( ns );
for( vector< BSONObj >::const_iterator i = v.begin(); i != v.end(); ++i )
i->appendSelfToBufBuilder( b );
@@ -550,13 +651,13 @@ namespace mongo {
BufBuilder b;
int opts = 0;
- b.append( opts );
- b.append( ns );
+ b.appendNum( opts );
+ b.appendStr( ns );
int flags = 0;
if ( justOne )
- flags |= 1;
- b.append( flags );
+ flags |= RemoveOption_JustOne;
+ b.appendNum( flags );
obj.obj.appendSelfToBufBuilder( b );
@@ -568,13 +669,13 @@ namespace mongo {
void DBClientBase::update( const string & ns , Query query , BSONObj obj , bool upsert , bool multi ) {
BufBuilder b;
- b.append( (int)0 ); // reserved
- b.append( ns );
+ b.appendNum( (int)0 ); // reserved
+ b.appendStr( ns );
int flags = 0;
if ( upsert ) flags |= UpdateOption_Upsert;
if ( multi ) flags |= UpdateOption_Multi;
- b.append( flags );
+ b.appendNum( flags );
query.obj.appendSelfToBufBuilder( b );
obj.appendSelfToBufBuilder( b );
@@ -599,7 +700,7 @@ namespace mongo {
if ( ! runCommand( nsToDatabase( ns.c_str() ) ,
BSON( "deleteIndexes" << NamespaceString( ns ).coll << "index" << indexName ) ,
info ) ){
- log() << "dropIndex failed: " << info << endl;
+ log(_logLevel) << "dropIndex failed: " << info << endl;
uassert( 10007 , "dropIndex failed" , 0 );
}
resetIndexCache();
@@ -684,15 +785,21 @@ namespace mongo {
/* -- DBClientCursor ---------------------------------------------- */
+#ifdef _DEBUG
+#define CHECK_OBJECT( o , msg ) massert( 10337 , (string)"object not valid" + (msg) , (o).isValid() )
+#else
+#define CHECK_OBJECT( o , msg )
+#endif
+
void assembleRequest( const string &ns, BSONObj query, int nToReturn, int nToSkip, const BSONObj *fieldsToReturn, int queryOptions, Message &toSend ) {
CHECK_OBJECT( query , "assembleRequest query" );
// see query.h for the protocol we are using here.
BufBuilder b;
int opts = queryOptions;
- b.append(opts);
- b.append(ns.c_str());
- b.append(nToSkip);
- b.append(nToReturn);
+ b.appendNum(opts);
+ b.appendStr(ns);
+ b.appendNum(nToSkip);
+ b.appendNum(nToReturn);
query.appendSelfToBufBuilder(b);
if ( fieldsToReturn )
fieldsToReturn->appendSelfToBufBuilder(b);
@@ -713,6 +820,10 @@ namespace mongo {
port().piggyBack( toSend );
}
+ void DBClientConnection::recv( Message &m ) {
+ port().recv(m);
+ }
+
bool DBClientConnection::call( Message &toSend, Message &response, bool assertOk ) {
/* todo: this is very ugly messagingport::call returns an error code AND can throw
an exception. we should make it return void and just throw an exception anytime
@@ -722,7 +833,7 @@ namespace mongo {
if ( !port().call(toSend, response) ) {
failed = true;
if ( assertOk )
- massert( 10278 , "dbclient error communicating with server", false);
+ uassert( 10278 , "dbclient error communicating with server", false);
return false;
}
}
@@ -736,178 +847,128 @@ namespace mongo {
void DBClientConnection::checkResponse( const char *data, int nReturned ) {
/* check for errors. the only one we really care about at
this stage is "not master" */
- if ( clientPaired && nReturned ) {
+ if ( clientSet && nReturned ) {
+ assert(data);
BSONObj o(data);
BSONElement e = o.firstElement();
if ( strcmp(e.fieldName(), "$err") == 0 &&
e.type() == String && strncmp(e.valuestr(), "not master", 10) == 0 ) {
- clientPaired->isntMaster();
+ clientSet->isntMaster();
}
}
}
- int DBClientCursor::nextBatchSize(){
- if ( nToReturn == 0 )
- return batchSize;
- if ( batchSize == 0 )
- return nToReturn;
+ void DBClientConnection::killCursor( long long cursorId ){
+ BufBuilder b;
+ b.appendNum( (int)0 ); // reserved
+ b.appendNum( (int)1 ); // number
+ b.appendNum( cursorId );
- return batchSize < nToReturn ? batchSize : nToReturn;
- }
-
- bool DBClientCursor::init() {
- Message toSend;
- if ( !cursorId ) {
- assembleRequest( ns, query, nextBatchSize() , nToSkip, fieldsToReturn, opts, toSend );
- } else {
- BufBuilder b;
- b.append( opts );
- b.append( ns.c_str() );
- b.append( nToReturn );
- b.append( cursorId );
- toSend.setData( dbGetMore, b.buf(), b.len() );
- }
- if ( !connector->call( toSend, *m, false ) )
- return false;
- if ( ! m->data )
- return false;
- dataReceived();
- return true;
+ Message m;
+ m.setData( dbKillCursors , b.buf() , b.len() );
+
+ sayPiggyBack( m );
}
- void DBClientCursor::requestMore() {
- assert( cursorId && pos == nReturned );
-
- if (haveLimit){
- nToReturn -= nReturned;
- assert(nToReturn > 0);
- }
- BufBuilder b;
- b.append(opts);
- b.append(ns.c_str());
- b.append(nextBatchSize());
- b.append(cursorId);
+ /* --- class dbclientpaired --- */
- Message toSend;
- toSend.setData(dbGetMore, b.buf(), b.len());
- auto_ptr<Message> response(new Message());
- connector->call( toSend, *response );
-
- m = response;
- dataReceived();
- }
-
- void DBClientCursor::dataReceived() {
- QueryResult *qr = (QueryResult *) m->data;
- resultFlags = qr->resultFlags();
- if ( qr->resultFlags() & QueryResult::ResultFlag_CursorNotFound ) {
- // cursor id no longer valid at the server.
- assert( qr->cursorId == 0 );
- cursorId = 0; // 0 indicates no longer valid (dead)
- // TODO: should we throw a UserException here???
- }
- if ( cursorId == 0 || ! ( opts & QueryOption_CursorTailable ) ) {
- // only set initially: we don't want to kill it on end of data
- // if it's a tailable cursor
- cursorId = qr->cursorId;
- }
- nReturned = qr->nReturned;
- pos = 0;
- data = qr->data();
-
- connector->checkResponse( data, nReturned );
- /* this assert would fire the way we currently work:
- assert( nReturned || cursorId == 0 );
- */
+ string DBClientReplicaSet::toString() {
+ return getServerAddress();
}
- /** If true, safe to call next(). Requests more from server if necessary. */
- bool DBClientCursor::more() {
- if ( !_putBack.empty() )
- return true;
+ DBClientReplicaSet::DBClientReplicaSet( const string& name , const vector<HostAndPort>& servers )
+ : _name( name ) , _currentMaster( 0 ), _servers( servers ){
- if (haveLimit && pos >= nToReturn)
- return false;
-
- if ( pos < nReturned )
- return true;
-
- if ( cursorId == 0 )
- return false;
-
- requestMore();
- return pos < nReturned;
+ for ( unsigned i=0; i<_servers.size(); i++ )
+ _conns.push_back( new DBClientConnection( true , this ) );
}
-
- BSONObj DBClientCursor::next() {
- assert( more() );
- if ( !_putBack.empty() ) {
- BSONObj ret = _putBack.top();
- _putBack.pop();
- return ret;
- }
- pos++;
- BSONObj o(data);
- data += o.objsize();
- return o;
- }
-
- DBClientCursor::~DBClientCursor() {
- DESTRUCTOR_GUARD (
- if ( cursorId && _ownCursor ) {
- BufBuilder b;
- b.append( (int)0 ); // reserved
- b.append( (int)1 ); // number
- b.append( cursorId );
-
- Message m;
- m.setData( dbKillCursors , b.buf() , b.len() );
-
- connector->sayPiggyBack( m );
- }
- );
+
+ DBClientReplicaSet::~DBClientReplicaSet(){
+ for ( unsigned i=0; i<_conns.size(); i++ )
+ delete _conns[i];
+ _conns.clear();
}
-
- /* --- class dbclientpaired --- */
-
- string DBClientPaired::toString() {
- stringstream ss;
- ss << "state: " << master << '\n';
- ss << "left: " << left.toStringLong() << '\n';
- ss << "right: " << right.toStringLong() << '\n';
+
+ string DBClientReplicaSet::getServerAddress() const {
+ StringBuilder ss;
+ if ( _name.size() )
+ ss << _name << "/";
+
+ for ( unsigned i=0; i<_servers.size(); i++ ){
+ if ( i > 0 )
+ ss << ",";
+ ss << _servers[i].toString();
+ }
return ss.str();
}
-#pragma warning(disable: 4355)
- DBClientPaired::DBClientPaired() :
- left(true, this), right(true, this)
- {
- master = NotSetL;
- }
-#pragma warning(default: 4355)
-
/* find which server, the left or right, is currently master mode */
- void DBClientPaired::_checkMaster() {
+ void DBClientReplicaSet::_checkMaster() {
+
+ bool triedQuickCheck = false;
+
+ log( _logLevel + 1) << "_checkMaster on: " << toString() << endl;
for ( int retry = 0; retry < 2; retry++ ) {
- int x = master;
- for ( int pass = 0; pass < 2; pass++ ) {
- DBClientConnection& c = x == 0 ? left : right;
+ for ( unsigned i=0; i<_conns.size(); i++ ){
+ DBClientConnection * c = _conns[i];
try {
bool im;
BSONObj o;
- c.isMaster(im, &o);
+ c->isMaster(im, &o);
+
if ( retry )
- log() << "checkmaster: " << c.toString() << ' ' << o.toString() << '\n';
+ log(_logLevel) << "checkmaster: " << c->toString() << ' ' << o << '\n';
+
+ string maybePrimary;
+ if ( o["hosts"].type() == Array ){
+ if ( o["primary"].type() == String )
+ maybePrimary = o["primary"].String();
+
+ BSONObjIterator hi(o["hosts"].Obj());
+ while ( hi.more() ){
+ string toCheck = hi.next().String();
+ int found = -1;
+ for ( unsigned x=0; x<_servers.size(); x++ ){
+ if ( toCheck == _servers[x].toString() ){
+ found = x;
+ break;
+ }
+ }
+
+ if ( found == -1 ){
+ HostAndPort h( toCheck );
+ _servers.push_back( h );
+ _conns.push_back( new DBClientConnection( true, this ) );
+ string temp;
+ _conns[ _conns.size() - 1 ]->connect( h , temp );
+ log( _logLevel ) << "updated set to: " << toString() << endl;
+ }
+
+ }
+ }
+
if ( im ) {
- master = (State) (x + 2);
+ _currentMaster = c;
return;
}
+
+ if ( maybePrimary.size() && ! triedQuickCheck ){
+ for ( unsigned x=0; x<_servers.size(); x++ ){
+ if ( _servers[i].toString() != maybePrimary )
+ continue;
+ triedQuickCheck = true;
+ _conns[x]->isMaster( im , &o );
+ if ( im ){
+ _currentMaster = _conns[x];
+ return;
+ }
+ }
+ }
}
- catch (AssertionException&) {
+ catch ( std::exception& e ) {
if ( retry )
- log() << "checkmaster: caught exception " << c.toString() << '\n';
+ log(_logLevel) << "checkmaster: caught exception " << c->toString() << ' ' << e.what() << endl;
}
- x = x^1;
}
sleepsecs(1);
}
@@ -915,36 +976,54 @@ namespace mongo {
uassert( 10009 , "checkmaster: no master found", false);
}
- inline DBClientConnection& DBClientPaired::checkMaster() {
- if ( master > NotSetR ) {
+ DBClientConnection * DBClientReplicaSet::checkMaster() {
+ if ( _currentMaster ){
// a master is selected. let's just make sure connection didn't die
- DBClientConnection& c = master == Left ? left : right;
- if ( !c.isFailed() )
- return c;
- // after a failure, on the next checkMaster, start with the other
- // server -- presumably it took over. (not critical which we check first,
- // just will make the failover slightly faster if we guess right)
- master = master == Left ? NotSetR : NotSetL;
+ if ( ! _currentMaster->isFailed() )
+ return _currentMaster;
+ _currentMaster = 0;
}
_checkMaster();
- assert( master > NotSetR );
- return master == Left ? left : right;
+ assert( _currentMaster );
+ return _currentMaster;
}
- DBClientConnection& DBClientPaired::slaveConn(){
- DBClientConnection& m = checkMaster();
- assert( ! m.isFailed() );
- return master == Left ? right : left;
+ DBClientConnection& DBClientReplicaSet::masterConn(){
+ return *checkMaster();
}
- bool DBClientPaired::connect(const string &serverHostname1, const string &serverHostname2) {
+ DBClientConnection& DBClientReplicaSet::slaveConn(){
+ DBClientConnection * m = checkMaster();
+ assert( ! m->isFailed() );
+
+ DBClientConnection * failedSlave = 0;
+
+ for ( unsigned i=0; i<_conns.size(); i++ ){
+ if ( m == _conns[i] )
+ continue;
+ failedSlave = _conns[i];
+ if ( _conns[i]->isFailed() )
+ continue;
+ return *_conns[i];
+ }
+
+ assert(failedSlave);
+ return *failedSlave;
+ }
+
+ bool DBClientReplicaSet::connect(){
string errmsg;
- bool l = left.connect(serverHostname1, errmsg);
- bool r = right.connect(serverHostname2, errmsg);
- master = l ? NotSetL : NotSetR;
- if ( !l && !r ) // it would be ok to fall through, but checkMaster will then try an immediate reconnect which is slow
+
+ bool anyGood = false;
+ for ( unsigned i=0; i<_conns.size(); i++ ){
+ if ( _conns[i]->connect( _servers[i] , errmsg ) )
+ anyGood = true;
+ }
+
+ if ( ! anyGood )
return false;
+
try {
checkMaster();
}
@@ -954,61 +1033,44 @@ namespace mongo {
return true;
}
- bool DBClientPaired::connect(string hostpairstring) {
- size_t comma = hostpairstring.find( "," );
- uassert( 10010 , "bad hostpairstring", comma != string::npos);
- return connect( hostpairstring.substr( 0 , comma ) , hostpairstring.substr( comma + 1 ) );
- }
-
- bool DBClientPaired::auth(const string &dbname, const string &username, const string &pwd, string& errmsg) {
- DBClientConnection& m = checkMaster();
- if( !m.auth(dbname, username, pwd, errmsg) )
+ bool DBClientReplicaSet::auth(const string &dbname, const string &username, const string &pwd, string& errmsg, bool digestPassword ) {
+ DBClientConnection * m = checkMaster();
+ if( !m->auth(dbname, username, pwd, errmsg, digestPassword ) )
return false;
+
/* we try to authentiate with the other half of the pair -- even if down, that way the authInfo is cached. */
- string e;
- try {
- if( &m == &left )
- right.auth(dbname, username, pwd, e);
- else
- left.auth(dbname, username, pwd, e);
- }
- catch( AssertionException&) {
- }
+ for ( unsigned i=0; i<_conns.size(); i++ ){
+ if ( _conns[i] == m )
+ continue;
+ try {
+ string e;
+ _conns[i]->auth( dbname , username , pwd , e , digestPassword );
+ }
+ catch ( AssertionException& ){
+ }
+ }
+
return true;
}
- auto_ptr<DBClientCursor> DBClientPaired::query(const string &a, Query b, int c, int d,
- const BSONObj *e, int f, int g)
- {
- return checkMaster().query(a,b,c,d,e,f,g);
- }
-
- BSONObj DBClientPaired::findOne(const string &a, Query b, const BSONObj *c, int d) {
- return checkMaster().findOne(a,b,c,d);
- }
-
- void testPaired() {
- DBClientPaired p;
- log() << "connect returns " << p.connect("localhost:27017", "localhost:27018") << endl;
-
- //DBClientConnection p(true);
- string errmsg;
- // log() << "connect " << p.connect("localhost", errmsg) << endl;
- log() << "auth " << p.auth("dwight", "u", "p", errmsg) << endl;
-
- while( 1 ) {
- sleepsecs(3);
- try {
- log() << "findone returns " << p.findOne("dwight.foo", BSONObj()).toString() << endl;
- sleepsecs(3);
- BSONObj info;
- bool im;
- log() << "ismaster returns " << p.isMaster(im,&info) << " info: " << info.toString() << endl;
- }
- catch(...) {
- cout << "caught exception" << endl;
- }
- }
- }
+ auto_ptr<DBClientCursor> DBClientReplicaSet::query(const string &a, Query b, int c, int d,
+ const BSONObj *e, int f, int g){
+ // TODO: if slave ok is set go to a slave
+ return checkMaster()->query(a,b,c,d,e,f,g);
+ }
+ BSONObj DBClientReplicaSet::findOne(const string &a, const Query& b, const BSONObj *c, int d) {
+ return checkMaster()->findOne(a,b,c,d);
+ }
+
+ bool serverAlive( const string &uri ) {
+ DBClientConnection c( false, 0, 20 ); // potentially the connection to server could fail while we're checking if it's alive - so use timeouts
+ string err;
+ if ( !c.connect( uri, err ) )
+ return false;
+ if ( !c.simpleCommand( "admin", 0, "ping" ) )
+ return false;
+ return true;
+ }
+
} // namespace mongo
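
ConnectionString::parse(), added above, picks the connection type from the shape of the host string: "setName/host1,host2,..." becomes a replica SET, a single host is MASTER, one comma means PAIR, and two commas mean a SYNC cluster; connect() then builds the matching client object. A sketch of the parse-then-connect flow the pool now uses (the host strings are placeholders):

    #include "client/dbclient.h"

    using namespace mongo;

    DBClientBase* connectByString( const string& host ) {
        string errmsg;
        ConnectionString cs = ConnectionString::parse( host , errmsg );   // e.g. "rs0/a:27017,b:27017"
        if ( !cs.isValid() ) {
            cout << "bad connection string: " << errmsg << endl;
            return 0;
        }
        // Returns a DBClientConnection, DBClientReplicaSet, or SyncClusterConnection; 0 on failure.
        DBClientBase* conn = cs.connect( errmsg );
        if ( !conn )
            cout << "connect failed: " << errmsg << endl;
        return conn;   // caller owns the returned connection
    }
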
diff --git a/client/dbclient.h b/client/dbclient.h
index a2fad8e..639d960 100644
--- a/client/dbclient.h
+++ b/client/dbclient.h
@@ -17,7 +17,7 @@
#pragma once
-#include "../stdafx.h"
+#include "../pch.h"
#include "../util/message.h"
#include "../db/jsobj.h"
#include "../db/json.h"
@@ -51,7 +51,7 @@ namespace mongo {
// an extended period of time.
QueryOption_OplogReplay = 1 << 3,
- /** The server normally times out idle cursors after an inactivy period to prevent excess memory use
+ /** The server normally times out idle cursors after an inactivity period to prevent excess memory use
Set this option to prevent that.
*/
QueryOption_NoCursorTimeout = 1 << 4,
@@ -59,7 +59,18 @@ namespace mongo {
/** Use with QueryOption_CursorTailable. If we are at the end of the data, block for a while rather
than returning no data. After a timeout period, we do return as normal.
*/
- QueryOption_AwaitData = 1 << 5
+ QueryOption_AwaitData = 1 << 5,
+
+ /** Stream the data down full blast in multiple "more" packages, on the assumption that the client
+ will fully read all data queried. Faster when you are pulling a lot of data and know you want to
+ pull it all down. Note: it is not allowed to not read all the data unless you close the connection.
+
+ Use the query( boost::function<void(const BSONObj&)> f, ... ) version of the connection's query()
+ method, and it will take care of all the details for you.
+ */
+ QueryOption_Exhaust = 1 << 6,
+
+ QueryOption_AllSupported = QueryOption_CursorTailable | QueryOption_SlaveOk | QueryOption_OplogReplay | QueryOption_NoCursorTimeout | QueryOption_AwaitData | QueryOption_Exhaust
};
@@ -69,10 +80,129 @@ namespace mongo {
/** Update multiple documents (if multiple documents match query expression).
(Default is update a single document and stop.) */
- UpdateOption_Multi = 1 << 1
+ UpdateOption_Multi = 1 << 1,
+
+ /** flag from mongo saying this update went everywhere */
+ UpdateOption_Broadcast = 1 << 2
+ };
+
+ enum RemoveOptions {
+ /** only delete one option */
+ RemoveOption_JustOne = 1 << 0,
+
+ /** flag from mongo saying this update went everywhere */
+ RemoveOption_Broadcast = 1 << 1
+ };
+
+ class DBClientBase;
+
+ class ConnectionString {
+ public:
+ enum ConnectionType { INVALID , MASTER , PAIR , SET , SYNC };
+
+ ConnectionString( const HostAndPort& server ){
+ _type = MASTER;
+ _servers.push_back( server );
+ _finishInit();
+ }
+
+ ConnectionString( ConnectionType type , const vector<HostAndPort>& servers )
+ : _type( type ) , _servers( servers ){
+ _finishInit();
+ }
+
+ ConnectionString( ConnectionType type , const string& s , const string& setName = "" ){
+ _type = type;
+ _setName = setName;
+ _fillServers( s );
+
+ switch ( _type ){
+ case MASTER:
+ assert( _servers.size() == 1 );
+ break;
+ case SET:
+ assert( _setName.size() );
+ assert( _servers.size() >= 1 ); // 1 is ok since we can derive
+ break;
+ case PAIR:
+ assert( _servers.size() == 2 );
+ break;
+ default:
+ assert( _servers.size() > 0 );
+ }
+
+ _finishInit();
+ }
+
+ ConnectionString( const string& s , ConnectionType favoredMultipleType ){
+ _fillServers( s );
+ if ( _servers.size() == 1 ){
+ _type = MASTER;
+ }
+ else {
+ _type = favoredMultipleType;
+ assert( _type != MASTER );
+ }
+ _finishInit();
+ }
+
+ bool isValid() const { return _type != INVALID; }
+
+ string toString() const {
+ return _string;
+ }
+
+ DBClientBase* connect( string& errmsg ) const;
+
+ static ConnectionString parse( const string& url , string& errmsg );
+
+ private:
+
+ ConnectionString(){
+ _type = INVALID;
+ }
+
+ void _fillServers( string s ){
+ string::size_type idx;
+ while ( ( idx = s.find( ',' ) ) != string::npos ){
+ _servers.push_back( s.substr( 0 , idx ) );
+ s = s.substr( idx + 1 );
+ }
+ _servers.push_back( s );
+ }
+
+ void _finishInit(){
+ stringstream ss;
+ if ( _type == SET )
+ ss << _setName << "/";
+ for ( unsigned i=0; i<_servers.size(); i++ ){
+ if ( i > 0 )
+ ss << ",";
+ ss << _servers[i].toString();
+ }
+ _string = ss.str();
+ }
+
+ ConnectionType _type;
+ vector<HostAndPort> _servers;
+ string _string;
+ string _setName;
+ };
+
+ /**
+ * controls how much a clients cares about writes
+ * default is NORMAL
+ */
+ enum WriteConcern {
+ W_NONE = 0 , // TODO: not every connection type fully supports this
+ W_NORMAL = 1
+ // TODO SAFE = 2
};
class BSONObj;
+ class ScopedDbConnection;
+ class DBClientCursor;
+ class DBClientCursorBatchIterator;
/** Represents a Mongo query expression. Typically one uses the QUERY(...) macro to construct a Query object.
Examples:
@@ -160,7 +290,7 @@ namespace mongo {
/**
* if this query has an orderby, hint, or some other field
*/
- bool isComplex() const;
+ bool isComplex( bool * hasDollar = 0 ) const;
BSONObj getFilter() const;
BSONObj getSort() const;
@@ -195,146 +325,12 @@ namespace mongo {
virtual bool call( Message &toSend, Message &response, bool assertOk=true ) = 0;
virtual void say( Message &toSend ) = 0;
virtual void sayPiggyBack( Message &toSend ) = 0;
- virtual void checkResponse( const string &data, int nReturned ) {}
- };
-
- /** Queries return a cursor object */
- class DBClientCursor : boost::noncopyable {
- friend class DBClientBase;
- bool init();
- public:
- /** If true, safe to call next(). Requests more from server if necessary. */
- bool more();
+ virtual void checkResponse( const char* data, int nReturned ) {}
- /** If true, there is more in our local buffers to be fetched via next(). Returns
- false when a getMore request back to server would be required. You can use this
- if you want to exhaust whatever data has been fetched to the client already but
- then perhaps stop.
- */
- bool moreInCurrentBatch() { return !_putBack.empty() || pos < nReturned; }
-
- /** next
- @return next object in the result cursor.
- on an error at the remote server, you will get back:
- { $err: <string> }
- if you do not want to handle that yourself, call nextSafe().
- */
- BSONObj next();
-
- /**
- restore an object previously returned by next() to the cursor
- */
- void putBack( const BSONObj &o ) { _putBack.push( o.getOwned() ); }
-
- /** throws AssertionException if get back { $err : ... } */
- BSONObj nextSafe() {
- BSONObj o = next();
- BSONElement e = o.firstElement();
- assert( strcmp(e.fieldName(), "$err") != 0 );
- return o;
- }
-
- /**
- iterate the rest of the cursor and return the number if items
- */
- int itcount(){
- int c = 0;
- while ( more() ){
- next();
- c++;
- }
- return c;
- }
-
- /** cursor no longer valid -- use with tailable cursors.
- note you should only rely on this once more() returns false;
- 'dead' may be preset yet some data still queued and locally
- available from the dbclientcursor.
- */
- bool isDead() const {
- return cursorId == 0;
- }
-
- bool tailable() const {
- return (opts & QueryOption_CursorTailable) != 0;
- }
-
- /** see QueryResult::ResultFlagType (db/dbmessage.h) for flag values
- mostly these flags are for internal purposes -
- ResultFlag_ErrSet is the possible exception to that
- */
- bool hasResultFlag( int flag ){
- return (resultFlags & flag) != 0;
- }
-
- DBClientCursor( DBConnector *_connector, const string &_ns, BSONObj _query, int _nToReturn,
- int _nToSkip, const BSONObj *_fieldsToReturn, int queryOptions , int bs ) :
- connector(_connector),
- ns(_ns),
- query(_query),
- nToReturn(_nToReturn),
- haveLimit( _nToReturn > 0 && !(queryOptions & QueryOption_CursorTailable)),
- nToSkip(_nToSkip),
- fieldsToReturn(_fieldsToReturn),
- opts(queryOptions),
- batchSize(bs),
- m(new Message()),
- cursorId(),
- nReturned(),
- pos(),
- data(),
- _ownCursor( true ) {
- }
-
- DBClientCursor( DBConnector *_connector, const string &_ns, long long _cursorId, int _nToReturn, int options ) :
- connector(_connector),
- ns(_ns),
- nToReturn( _nToReturn ),
- haveLimit( _nToReturn > 0 && !(options & QueryOption_CursorTailable)),
- opts( options ),
- m(new Message()),
- cursorId( _cursorId ),
- nReturned(),
- pos(),
- data(),
- _ownCursor( true ) {
- }
-
- virtual ~DBClientCursor();
-
- long long getCursorId() const { return cursorId; }
-
- /** by default we "own" the cursor and will send the server a KillCursor
- message when ~DBClientCursor() is called. This function overrides that.
- */
- void decouple() { _ownCursor = false; }
-
- private:
-
- int nextBatchSize();
-
- DBConnector *connector;
- string ns;
- BSONObj query;
- int nToReturn;
- bool haveLimit;
- int nToSkip;
- const BSONObj *fieldsToReturn;
- int opts;
- int batchSize;
- auto_ptr<Message> m;
- stack< BSONObj > _putBack;
-
- int resultFlags;
- long long cursorId;
- int nReturned;
- int pos;
- const char *data;
- void dataReceived();
- void requestMore();
- bool _ownCursor; // see decouple()
+ /* used by QueryOption_Exhaust. To use that your subclass must implement this. */
+ virtual void recv( Message& m ) { assert(false); }
};
-
+
/**
The interface that any db connection should implement
*/
@@ -343,6 +339,7 @@ namespace mongo {
virtual auto_ptr<DBClientCursor> query(const string &ns, Query query, int nToReturn = 0, int nToSkip = 0,
const BSONObj *fieldsToReturn = 0, int queryOptions = 0 , int batchSize = 0 ) = 0;
+ /** don't use this - called automatically by DBClientCursor for you */
virtual auto_ptr<DBClientCursor> getMore( const string &ns, long long cursorId, int nToReturn = 0, int options = 0 ) = 0;
virtual void insert( const string &ns, BSONObj obj ) = 0;
@@ -359,7 +356,7 @@ namespace mongo {
@return a single object that matches the query. if none do, then the object is empty
@throws AssertionException
*/
- virtual BSONObj findOne(const string &ns, Query query, const BSONObj *fieldsToReturn = 0, int queryOptions = 0);
+ virtual BSONObj findOne(const string &ns, const Query& query, const BSONObj *fieldsToReturn = 0, int queryOptions = 0);
};
@@ -371,33 +368,38 @@ namespace mongo {
class DBClientWithCommands : public DBClientInterface {
set<string> _seenIndexes;
public:
+ /** controls how chatty the client is about network errors & such. See log.h */
+ int _logLevel;
+
+ DBClientWithCommands() : _logLevel(0), _cachedAvailableOptions( (enum QueryOptions)0 ), _haveCachedAvailableOptions(false) { }
- /** helper function. run a simple command where the command expression is simply
- { command : 1 }
+ /** helper function. run a simple command where the command expression is simply
+ { command : 1 }
@param info -- where to put result object. may be null if caller doesn't need that info
@param command -- command name
- @return true if the command returned "ok".
- */
+ @return true if the command returned "ok".
+ */
bool simpleCommand(const string &dbname, BSONObj *info, const string &command);
/** Run a database command. Database commands are represented as BSON objects. Common database
commands have prebuilt helper functions -- see below. If a helper is not available you can
- directly call runCommand.
+ directly call runCommand.
@param dbname database name. Use "admin" for global administrative commands.
@param cmd the command object to execute. For example, { ismaster : 1 }
@param info the result object the database returns. Typically has { ok : ..., errmsg : ... } fields
set.
- @return true if the command returned "ok".
+ @param options see enum QueryOptions - normally not needed to run a command
+ @return true if the command returned "ok".
*/
virtual bool runCommand(const string &dbname, const BSONObj& cmd, BSONObj &info, int options=0);
/** Authorize access to a particular database.
- Authentication is separate for each database on the server -- you may authenticate for any
- number of databases on a single connection.
- The "admin" database is special and once authenticated provides access to all databases on the
- server.
- @param digestPassword if password is plain text, set this to true. otherwise assumed to be pre-digested
+ Authentication is separate for each database on the server -- you may authenticate for any
+ number of databases on a single connection.
+ The "admin" database is special and once authenticated provides access to all databases on the
+ server.
+ @param digestPassword if password is plain text, set this to true. otherwise assumed to be pre-digested
@return true if successful
*/
virtual bool auth(const string &dbname, const string &username, const string &pwd, string& errmsg, bool digestPassword = true);
@@ -425,17 +427,17 @@ namespace mongo {
If the collection already exists, no action occurs.
- ns: fully qualified collection name
- size: desired initial extent size for the collection.
- Must be <= 1000000000 for normal collections.
- For fixed size (capped) collections, this size is the total/max size of the
- collection.
- capped: if true, this is a fixed size collection (where old data rolls out).
- max: maximum number of objects if capped (optional).
+ @param ns fully qualified collection name
+ @param size desired initial extent size for the collection.
+ Must be <= 1000000000 for normal collections.
+ For fixed size (capped) collections, this size is the total/max size of the
+ collection.
+ @param capped if true, this is a fixed size collection (where old data rolls out).
+ @param max maximum number of objects if capped (optional).
returns true if successful.
*/
- bool createCollection(const string &ns, unsigned size = 0, bool capped = false, int max = 0, BSONObj *info = 0);
+ bool createCollection(const string &ns, long long size = 0, bool capped = false, int max = 0, BSONObj *info = 0);
/** Get error result from the last operation on this connection.
@return error message text, or empty string if no error.
@@ -444,7 +446,9 @@ namespace mongo {
/** Get error result from the last operation on this connection.
@return full error object.
*/
- BSONObj getLastErrorDetailed();
+ virtual BSONObj getLastErrorDetailed();
+
+ static string getLastErrorString( const BSONObj& res );
/** Return the last error which has occurred, even if not the very last operation.
@@ -595,6 +599,8 @@ namespace mongo {
/**
get a list of all the current databases
+ uses the { listDatabases : 1 } command.
+ throws on error
*/
list<string> getDatabaseNames();
@@ -605,7 +611,6 @@ namespace mongo {
bool exists( const string& ns );
-
/** Create an index if it does not already exist.
ensureIndex calls are remembered so it is safe/fast to call this function many
times in your code.
@@ -666,25 +671,39 @@ namespace mongo {
protected:
bool isOk(const BSONObj&);
-
+
+ enum QueryOptions availableOptions();
+
+ private:
+ enum QueryOptions _cachedAvailableOptions;
+ bool _haveCachedAvailableOptions;
};
/**
abstract class that implements the core db operations
*/
class DBClientBase : public DBClientWithCommands, public DBConnector {
+ protected:
+ WriteConcern _writeConcern;
+
public:
+ DBClientBase(){
+ _writeConcern = W_NORMAL;
+ }
+
+ WriteConcern getWriteConcern() const { return _writeConcern; }
+ void setWriteConcern( WriteConcern w ){ _writeConcern = w; }
+
/** send a query to the database.
- ns: namespace to query, format is <dbname>.<collectname>[.<collectname>]*
- query: query to perform on the collection. this is a BSONObj (binary JSON)
+ @param ns namespace to query, format is <dbname>.<collectname>[.<collectname>]*
+ @param query query to perform on the collection. this is a BSONObj (binary JSON)
You may format as
{ query: { ... }, orderby: { ... } }
to specify a sort order.
- nToReturn: n to return. 0 = unlimited
- nToSkip: start with the nth item
- fieldsToReturn:
- optional template of which fields to select. if unspecified, returns all fields
- queryOptions: see options enum at top of this file
+ @param nToReturn n to return. 0 = unlimited
+ @param nToSkip start with the nth item
+ @param fieldsToReturn optional template of which fields to select. if unspecified, returns all fields
+ @param queryOptions see options enum at top of this file
@return cursor. 0 if error (connection failure)
@throws AssertionException
@@ -692,12 +711,13 @@ namespace mongo {
virtual auto_ptr<DBClientCursor> query(const string &ns, Query query, int nToReturn = 0, int nToSkip = 0,
const BSONObj *fieldsToReturn = 0, int queryOptions = 0 , int batchSize = 0 );
- /** @param cursorId id of cursor to retrieve
+ /** don't use this - called automatically by DBClientCursor for you
+ @param cursorId id of cursor to retrieve
@return an handle to a previously allocated cursor
@throws AssertionException
*/
virtual auto_ptr<DBClientCursor> getMore( const string &ns, long long cursorId, int nToReturn = 0, int options = 0 );
-
+
/**
insert an object into the database
*/
@@ -717,11 +737,13 @@ namespace mongo {
/**
updates objects matching query
*/
- virtual void update( const string &ns , Query query , BSONObj obj , bool upsert = 0 , bool multi = 0 );
+ virtual void update( const string &ns , Query query , BSONObj obj , bool upsert = false , bool multi = false );
virtual string getServerAddress() const = 0;
virtual bool isFailed() const = 0;
+
+ virtual void killCursor( long long cursorID ) = 0;
static int countCommas( const string& s ){
int n = 0;
@@ -730,9 +752,15 @@ namespace mongo {
n++;
return n;
}
- };
+
+ virtual bool callRead( Message& toSend , Message& response ) = 0;
+ // virtual bool callWrite( Message& toSend , Message& response ) = 0; // TODO: add this if needed
+ virtual void say( Message& toSend ) = 0;
+
+ virtual ConnectionString::ConnectionType type() const = 0;
+ }; // DBClientBase
- class DBClientPaired;
+ class DBClientReplicaSet;
class ConnectException : public UserException {
public:
@@ -744,24 +772,31 @@ namespace mongo {
This is the main entry point for talking to a simple Mongo setup
*/
class DBClientConnection : public DBClientBase {
- DBClientPaired *clientPaired;
- auto_ptr<MessagingPort> p;
- auto_ptr<SockAddr> server;
+ DBClientReplicaSet *clientSet;
+ boost::scoped_ptr<MessagingPort> p;
+ boost::scoped_ptr<SockAddr> server;
bool failed; // true if some sort of fatal error has ever happened
bool autoReconnect;
time_t lastReconnectTry;
- string serverAddress; // remember for reconnects
+ HostAndPort _server; // remember for reconnects
+ string _serverString;
+ int _port;
void _checkConnection();
void checkConnection() { if( failed ) _checkConnection(); }
map< string, pair<string,string> > authCache;
+ int _timeout;
+
+ bool _connect( string& errmsg );
public:
/**
@param _autoReconnect if true, automatically reconnect on a connection failure
- @param cp used by DBClientPaired. You do not need to specify this parameter
+ @param cp used by DBClientReplicaSet. You do not need to specify this parameter
+ @param timeout tcp timeout in seconds - this is for read/write, not connect.
+ Connect timeout is fixed, but short, at 5 seconds.
*/
- DBClientConnection(bool _autoReconnect=false,DBClientPaired* cp=0) :
- clientPaired(cp), failed(false), autoReconnect(_autoReconnect), lastReconnectTry(0) { }
+ DBClientConnection(bool _autoReconnect=false, DBClientReplicaSet* cp=0, int timeout=0) :
+ clientSet(cp), failed(false), autoReconnect(_autoReconnect), lastReconnectTry(0), _timeout(timeout) { }
/** Connect to a Mongo database server.
@@ -769,10 +804,27 @@ namespace mongo {
false was returned -- it will try to connect again.
@param serverHostname host to connect to. can include port number ( 127.0.0.1 , 127.0.0.1:5555 )
+ If you use IPv6 you must add a port number ( ::1:27017 )
@param errmsg any relevant error message will appended to the string
+ @deprecated please use HostAndPort
@return false if fails to connect.
*/
- virtual bool connect(const string &serverHostname, string& errmsg);
+ virtual bool connect(const char * hostname, string& errmsg){
+ // TODO: remove this method
+ HostAndPort t( hostname );
+ return connect( t , errmsg );
+ }
+
+ /** Connect to a Mongo database server.
+
+ If autoReconnect is true, you can try to use the DBClientConnection even when
+ false was returned -- it will try to connect again.
+
+ @param server server to connect to.
+       @param errmsg any relevant error message will be appended to the string
+ @return false if fails to connect.
+ */
+ virtual bool connect(const HostAndPort& server, string& errmsg);
/** Connect to a Mongo database server. Exception throwing version.
Throws a UserException if cannot connect.
@@ -782,20 +834,26 @@ namespace mongo {
@param serverHostname host to connect to. can include port number ( 127.0.0.1 , 127.0.0.1:5555 )
*/
- void connect(string serverHostname) {
+ void connect(const string& serverHostname) {
string errmsg;
- if( !connect(serverHostname.c_str(), errmsg) )
+ if( !connect(HostAndPort(serverHostname), errmsg) )
throw ConnectException(string("can't connect ") + errmsg);
}
virtual bool auth(const string &dbname, const string &username, const string &pwd, string& errmsg, bool digestPassword = true);
- virtual auto_ptr<DBClientCursor> query(const string &ns, Query query, int nToReturn = 0, int nToSkip = 0,
+ virtual auto_ptr<DBClientCursor> query(const string &ns, Query query=Query(), int nToReturn = 0, int nToSkip = 0,
const BSONObj *fieldsToReturn = 0, int queryOptions = 0 , int batchSize = 0 ) {
checkConnection();
return DBClientBase::query( ns, query, nToReturn, nToSkip, fieldsToReturn, queryOptions , batchSize );
}
+ /** uses QueryOption_Exhaust
+            use DBClientCursorBatchIterator if you want to process items in large blocks, perhaps to avoid granular locking and such.
+ */
+ unsigned long long query( boost::function<void(const BSONObj&)> f, const string& ns, Query query, const BSONObj *fieldsToReturn = 0, int queryOptions = 0);
+ unsigned long long query( boost::function<void(DBClientCursorBatchIterator&)> f, const string& ns, Query query, const BSONObj *fieldsToReturn = 0, int queryOptions = 0);
+
/**
@return true if this connection is currently in a failed state. When autoreconnect is on,
a connection will transition back to an ok state after reconnecting.
@@ -805,67 +863,75 @@ namespace mongo {
}
MessagingPort& port() {
- return *p.get();
+ return *p;
}
string toStringLong() const {
stringstream ss;
- ss << serverAddress;
+ ss << _serverString;
if ( failed ) ss << " failed";
return ss.str();
}
/** Returns the address of the server */
string toString() {
- return serverAddress;
+ return _serverString;
}
string getServerAddress() const {
- return serverAddress;
+ return _serverString;
+ }
+
+ virtual void killCursor( long long cursorID );
+
+ virtual bool callRead( Message& toSend , Message& response ){
+ return call( toSend , response );
}
- virtual bool call( Message &toSend, Message &response, bool assertOk = true );
virtual void say( Message &toSend );
+ virtual bool call( Message &toSend, Message &response, bool assertOk = true );
+
+ virtual ConnectionString::ConnectionType type() const { return ConnectionString::MASTER; }
+ protected:
+ friend class SyncClusterConnection;
+ virtual void recv( Message& m );
virtual void sayPiggyBack( Message &toSend );
virtual void checkResponse( const char *data, int nReturned );
};
-
- /** Use this class to connect to a replica pair of servers. The class will manage
- checking for which server in a replica pair is master, and do failover automatically.
-
+
+ /** Use this class to connect to a replica set of servers. The class will manage
+ checking for which server in a replica set is master, and do failover automatically.
+
+ This can also be used to connect to replica pairs since pairs are a subset of sets
+
On a failover situation, expect at least one operation to return an error (throw
an exception) before the failover is complete. Operations are not retried.
*/
- class DBClientPaired : public DBClientBase {
- DBClientConnection left,right;
- enum State {
- NotSetL=0,
- NotSetR=1,
- Left, Right
- } master;
+ class DBClientReplicaSet : public DBClientBase {
+ string _name;
+ DBClientConnection * _currentMaster;
+ vector<HostAndPort> _servers;
+ vector<DBClientConnection*> _conns;
+
void _checkMaster();
- DBClientConnection& checkMaster();
+ DBClientConnection * checkMaster();
public:
- /** Call connect() after constructing. autoReconnect is always on for DBClientPaired connections. */
- DBClientPaired();
+ /** Call connect() after constructing. autoReconnect is always on for DBClientReplicaSet connections. */
+ DBClientReplicaSet( const string& name , const vector<HostAndPort>& servers );
+ virtual ~DBClientReplicaSet();
- /** Returns false is neither member of the pair were reachable, or neither is
+        /** Returns false if no member of the set was reachable, or none is
master, although,
when false returned, you can still try to use this connection object, it will
try reconnects.
*/
- bool connect(const string &serverHostname1, const string &serverHostname2);
+ bool connect();
- /** Connect to a server pair using a host pair string of the form
- hostname[:port],hostname[:port]
- */
- bool connect(string hostpairstring);
-
- /** Authorize. Authorizes both sides of the pair as needed.
+ /** Authorize. Authorizes all nodes as needed
*/
- bool auth(const string &dbname, const string &username, const string &pwd, string& errmsg);
+ virtual bool auth(const string &dbname, const string &username, const string &pwd, string& errmsg, bool digestPassword = true );
/** throws userassertion "no master found" */
virtual
@@ -874,56 +940,69 @@ namespace mongo {
/** throws userassertion "no master found" */
virtual
- BSONObj findOne(const string &ns, Query query, const BSONObj *fieldsToReturn = 0, int queryOptions = 0);
+ BSONObj findOne(const string &ns, const Query& query, const BSONObj *fieldsToReturn = 0, int queryOptions = 0);
/** insert */
virtual void insert( const string &ns , BSONObj obj ) {
- checkMaster().insert(ns, obj);
+ checkMaster()->insert(ns, obj);
}
/** insert multiple objects. Note that single object insert is asynchronous, so this version
is only nominally faster and not worth a special effort to try to use. */
virtual void insert( const string &ns, const vector< BSONObj >& v ) {
- checkMaster().insert(ns, v);
+ checkMaster()->insert(ns, v);
}
/** remove */
virtual void remove( const string &ns , Query obj , bool justOne = 0 ) {
- checkMaster().remove(ns, obj, justOne);
+ checkMaster()->remove(ns, obj, justOne);
}
/** update */
virtual void update( const string &ns , Query query , BSONObj obj , bool upsert = 0 , bool multi = 0 ) {
- return checkMaster().update(ns, query, obj, upsert,multi);
+ return checkMaster()->update(ns, query, obj, upsert,multi);
}
+ virtual void killCursor( long long cursorID ){
+ checkMaster()->killCursor( cursorID );
+ }
+
string toString();
/* this is the callback from our underlying connections to notify us that we got a "not master" error.
*/
void isntMaster() {
- master = ( ( master == Left ) ? NotSetR : NotSetL );
+ _currentMaster = 0;
}
- string getServerAddress() const {
- return left.getServerAddress() + "," + right.getServerAddress();
- }
-
+ string getServerAddress() const;
+
+ DBClientConnection& masterConn();
DBClientConnection& slaveConn();
- /* TODO - not yet implemented. mongos may need these. */
- virtual bool call( Message &toSend, Message &response, bool assertOk=true ) { assert(false); return false; }
- virtual void say( Message &toSend ) { assert(false); }
+
+ virtual bool call( Message &toSend, Message &response, bool assertOk=true ) { return checkMaster()->call( toSend , response , assertOk ); }
+ virtual void say( Message &toSend ) { checkMaster()->say( toSend ); }
+ virtual bool callRead( Message& toSend , Message& response ){ return checkMaster()->callRead( toSend , response ); }
+
+ virtual ConnectionString::ConnectionType type() const { return ConnectionString::SET; }
+
+ protected:
virtual void sayPiggyBack( Message &toSend ) { assert(false); }
virtual void checkResponse( const char *data, int nReturned ) { assert(false); }
bool isFailed() const {
- // TODO: this really should check isFailed on current master as well
- return master < Left;
+ return _currentMaster == 0 || _currentMaster->isFailed();
}
};
+ /** pings server to check if it's up
+ */
+ bool serverAlive( const string &uri );
DBClientBase * createDirectClient();
} // namespace mongo
+
+#include "dbclientcursor.h"
+#include "undef_macros.h"
diff --git a/client/dbclientcursor.cpp b/client/dbclientcursor.cpp
new file mode 100644
index 0000000..07771bb
--- /dev/null
+++ b/client/dbclientcursor.cpp
@@ -0,0 +1,232 @@
+// dbclientcursor.cpp - connect to a Mongo database as a database, from C++
+
+/* Copyright 2009 10gen Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "pch.h"
+#include "dbclient.h"
+#include "../db/dbmessage.h"
+#include "../db/cmdline.h"
+#include "connpool.h"
+#include "../s/shard.h"
+
+namespace mongo {
+
+ void assembleRequest( const string &ns, BSONObj query, int nToReturn, int nToSkip, const BSONObj *fieldsToReturn, int queryOptions, Message &toSend );
+
+ int DBClientCursor::nextBatchSize(){
+
+ if ( nToReturn == 0 )
+ return batchSize;
+
+ if ( batchSize == 0 )
+ return nToReturn;
+
+ return batchSize < nToReturn ? batchSize : nToReturn;
+ }
+
+ bool DBClientCursor::init() {
+ Message toSend;
+ if ( !cursorId ) {
+ assembleRequest( ns, query, nextBatchSize() , nToSkip, fieldsToReturn, opts, toSend );
+ } else {
+ BufBuilder b;
+ b.appendNum( opts );
+ b.appendStr( ns );
+ b.appendNum( nToReturn );
+ b.appendNum( cursorId );
+ toSend.setData( dbGetMore, b.buf(), b.len() );
+ }
+ if ( !connector->call( toSend, *m, false ) )
+ return false;
+ if ( m->empty() )
+ return false;
+ dataReceived();
+ return true;
+ }
+
+ void DBClientCursor::requestMore() {
+ assert( cursorId && pos == nReturned );
+
+ if (haveLimit){
+ nToReturn -= nReturned;
+ assert(nToReturn > 0);
+ }
+ BufBuilder b;
+ b.appendNum(opts);
+ b.appendStr(ns);
+ b.appendNum(nextBatchSize());
+ b.appendNum(cursorId);
+
+ Message toSend;
+ toSend.setData(dbGetMore, b.buf(), b.len());
+ auto_ptr<Message> response(new Message());
+
+ if ( connector ){
+ connector->call( toSend, *response );
+ m = response;
+ dataReceived();
+ }
+ else {
+ assert( _scopedHost.size() );
+ ScopedDbConnection conn( _scopedHost );
+ conn->call( toSend , *response );
+ connector = conn.get();
+ m = response;
+ dataReceived();
+ connector = 0;
+ conn.done();
+ }
+ }
+
+ /** with QueryOption_Exhaust, the server just blasts data at us (marked at end with cursorid==0). */
+ void DBClientCursor::exhaustReceiveMore() {
+ assert( cursorId && pos == nReturned );
+ assert( !haveLimit );
+ auto_ptr<Message> response(new Message());
+ assert( connector );
+ connector->recv(*response);
+ m = response;
+ dataReceived();
+ }
+
+ void DBClientCursor::dataReceived() {
+ QueryResult *qr = (QueryResult *) m->singleData();
+ resultFlags = qr->resultFlags();
+
+ if ( qr->resultFlags() & ResultFlag_CursorNotFound ) {
+ // cursor id no longer valid at the server.
+ assert( qr->cursorId == 0 );
+ cursorId = 0; // 0 indicates no longer valid (dead)
+ if ( ! ( opts & QueryOption_CursorTailable ) )
+ throw UserException( 13127 , "getMore: cursor didn't exist on server, possible restart or timeout?" );
+ }
+
+ if ( cursorId == 0 || ! ( opts & QueryOption_CursorTailable ) ) {
+ // only set initially: we don't want to kill it on end of data
+ // if it's a tailable cursor
+ cursorId = qr->cursorId;
+ }
+
+ nReturned = qr->nReturned;
+ pos = 0;
+ data = qr->data();
+
+ connector->checkResponse( data, nReturned );
+ /* this assert would fire the way we currently work:
+ assert( nReturned || cursorId == 0 );
+ */
+ }
+
+ /** If true, safe to call next(). Requests more from server if necessary. */
+ bool DBClientCursor::more() {
+ _assertIfNull();
+
+ if ( !_putBack.empty() )
+ return true;
+
+ if (haveLimit && pos >= nToReturn)
+ return false;
+
+ if ( pos < nReturned )
+ return true;
+
+ if ( cursorId == 0 )
+ return false;
+
+ requestMore();
+ return pos < nReturned;
+ }
+
+ BSONObj DBClientCursor::next() {
+ DEV _assertIfNull();
+ if ( !_putBack.empty() ) {
+ BSONObj ret = _putBack.top();
+ _putBack.pop();
+ return ret;
+ }
+
+ uassert(13422, "DBClientCursor next() called but more() is false", pos < nReturned);
+
+ pos++;
+ BSONObj o(data);
+ data += o.objsize();
+ /* todo would be good to make data null at end of batch for safety */
+ return o;
+ }
+
+ void DBClientCursor::peek(vector<BSONObj>& v, int atMost) {
+ int m = atMost;
+
+ /*
+ for( stack<BSONObj>::iterator i = _putBack.begin(); i != _putBack.end(); i++ ) {
+ if( m == 0 )
+ return;
+ v.push_back(*i);
+ m--;
+ n++;
+ }
+ */
+
+ int p = pos;
+ const char *d = data;
+ while( m && p < nReturned ) {
+ BSONObj o(d);
+ d += o.objsize();
+ p++;
+ m--;
+ v.push_back(o);
+ }
+ }
+
+ void DBClientCursor::attach( AScopedConnection * conn ){
+ assert( _scopedHost.size() == 0 );
+ assert( connector == conn->get() );
+ _scopedHost = conn->getHost();
+ conn->done();
+ connector = 0;
+ }
+
+ DBClientCursor::~DBClientCursor() {
+ if (!this)
+ return;
+
+ DESTRUCTOR_GUARD (
+
+ if ( cursorId && _ownCursor ) {
+ BufBuilder b;
+ b.appendNum( (int)0 ); // reserved
+ b.appendNum( (int)1 ); // number
+ b.appendNum( cursorId );
+
+ Message m;
+ m.setData( dbKillCursors , b.buf() , b.len() );
+
+ if ( connector ){
+ connector->sayPiggyBack( m );
+ }
+ else {
+ assert( _scopedHost.size() );
+ ScopedDbConnection conn( _scopedHost );
+ conn->sayPiggyBack( m );
+ conn.done();
+ }
+ }
+
+ );
+ }
+
+
+} // namespace mongo
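
A small sketch of the peek() call implemented above, which looks at objects already buffered on the client without consuming them; this is not part of the patch and assumes c is a live auto_ptr<DBClientCursor> obtained from an earlier query.

std::vector<mongo::BSONObj> ahead;
c->peek( ahead, 5 );                     // at most 5 objects, current batch only, no network call
for ( size_t i = 0; i < ahead.size(); i++ )
    std::cout << "buffered: " << ahead[i].toString() << std::endl;
// the peeked objects are still returned by later next()/nextSafe() calls
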
diff --git a/client/dbclientcursor.h b/client/dbclientcursor.h
new file mode 100644
index 0000000..51cdc13
--- /dev/null
+++ b/client/dbclientcursor.h
@@ -0,0 +1,204 @@
+// file dbclientcursor.h
+
+/* Copyright 2009 10gen Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "../pch.h"
+#include "../util/message.h"
+#include "../db/jsobj.h"
+#include "../db/json.h"
+#include <stack>
+
+namespace mongo {
+
+ class AScopedConnection;
+
+ /** Queries return a cursor object */
+ class DBClientCursor : boost::noncopyable {
+ public:
+ /** If true, safe to call next(). Requests more from server if necessary. */
+ bool more();
+
+ /** If true, there is more in our local buffers to be fetched via next(). Returns
+ false when a getMore request back to server would be required. You can use this
+ if you want to exhaust whatever data has been fetched to the client already but
+ then perhaps stop.
+ */
+ int objsLeftInBatch() const { _assertIfNull(); return _putBack.size() + nReturned - pos; }
+ bool moreInCurrentBatch() { return objsLeftInBatch() > 0; }
+
+ /** next
+ @return next object in the result cursor.
+ on an error at the remote server, you will get back:
+ { $err: <string> }
+ if you do not want to handle that yourself, call nextSafe().
+ */
+ BSONObj next();
+
+ /**
+ restore an object previously returned by next() to the cursor
+ */
+ void putBack( const BSONObj &o ) { _putBack.push( o.getOwned() ); }
+
+ /** throws AssertionException if get back { $err : ... } */
+ BSONObj nextSafe() {
+ BSONObj o = next();
+ BSONElement e = o.firstElement();
+ if( strcmp(e.fieldName(), "$err") == 0 ) {
+ if( logLevel >= 5 )
+ log() << "nextSafe() error " << o.toString() << endl;
+ uassert(13106, "nextSafe(): " + o.toString(), false);
+ }
+ return o;
+ }
+
+ /** peek ahead at items buffered for future next() calls.
+        never requests new data from the server, so peek is only effective
+        on what is already buffered.
+ WARNING: no support for _putBack yet!
+ */
+ void peek(vector<BSONObj>&, int atMost);
+
+ /**
+           iterate the rest of the cursor and return the number of items
+ */
+ int itcount(){
+ int c = 0;
+ while ( more() ){
+ next();
+ c++;
+ }
+ return c;
+ }
+
+ /** cursor no longer valid -- use with tailable cursors.
+ note you should only rely on this once more() returns false;
+        'dead' may already be set while some data is still queued and locally
+ available from the dbclientcursor.
+ */
+ bool isDead() const {
+ return !this || cursorId == 0;
+ }
+
+ bool tailable() const {
+ return (opts & QueryOption_CursorTailable) != 0;
+ }
+
+ /** see ResultFlagType (constants.h) for flag values
+ mostly these flags are for internal purposes -
+ ResultFlag_ErrSet is the possible exception to that
+ */
+ bool hasResultFlag( int flag ){
+ _assertIfNull();
+ return (resultFlags & flag) != 0;
+ }
+
+ DBClientCursor( DBConnector *_connector, const string &_ns, BSONObj _query, int _nToReturn,
+ int _nToSkip, const BSONObj *_fieldsToReturn, int queryOptions , int bs ) :
+ connector(_connector),
+ ns(_ns),
+ query(_query),
+ nToReturn(_nToReturn),
+ haveLimit( _nToReturn > 0 && !(queryOptions & QueryOption_CursorTailable)),
+ nToSkip(_nToSkip),
+ fieldsToReturn(_fieldsToReturn),
+ opts(queryOptions),
+ batchSize(bs==1?2:bs),
+ m(new Message()),
+ cursorId(),
+ nReturned(),
+ pos(),
+ data(),
+ _ownCursor( true ){
+ }
+
+ DBClientCursor( DBConnector *_connector, const string &_ns, long long _cursorId, int _nToReturn, int options ) :
+ connector(_connector),
+ ns(_ns),
+ nToReturn( _nToReturn ),
+ haveLimit( _nToReturn > 0 && !(options & QueryOption_CursorTailable)),
+ opts( options ),
+ m(new Message()),
+ cursorId( _cursorId ),
+ nReturned(),
+ pos(),
+ data(),
+ _ownCursor( true ){
+ }
+
+ virtual ~DBClientCursor();
+
+ long long getCursorId() const { return cursorId; }
+
+ /** by default we "own" the cursor and will send the server a KillCursor
+ message when ~DBClientCursor() is called. This function overrides that.
+ */
+ void decouple() { _ownCursor = false; }
+
+ void attach( AScopedConnection * conn );
+
+ private:
+ friend class DBClientBase;
+ friend class DBClientConnection;
+ bool init();
+ int nextBatchSize();
+ DBConnector *connector;
+ string ns;
+ BSONObj query;
+ int nToReturn;
+ bool haveLimit;
+ int nToSkip;
+ const BSONObj *fieldsToReturn;
+ int opts;
+ int batchSize;
+ auto_ptr<Message> m;
+ stack< BSONObj > _putBack;
+ int resultFlags;
+ long long cursorId;
+ int nReturned;
+ int pos;
+ const char *data;
+ void dataReceived();
+ void requestMore();
+ void exhaustReceiveMore(); // for exhaust
+ bool _ownCursor; // see decouple()
+ string _scopedHost;
+
+ // Don't call from a virtual function
+ void _assertIfNull() const { uassert(13348, "connection died", this); }
+ };
+
+ /** iterate over objects in current batch only - will not cause a network call
+ */
+ class DBClientCursorBatchIterator {
+ public:
+ DBClientCursorBatchIterator( DBClientCursor &c ) : _c( c ), _n() {}
+ bool moreInCurrentBatch() { return _c.moreInCurrentBatch(); }
+ BSONObj nextSafe() {
+ massert( 13383, "BatchIterator empty", moreInCurrentBatch() );
+ ++_n;
+ return _c.nextSafe();
+ }
+ int n() const { return _n; }
+ private:
+ DBClientCursor &_c;
+ int _n;
+ };
+
+} // namespace mongo
+
+#include "undef_macros.h"
diff --git a/client/distlock.cpp b/client/distlock.cpp
new file mode 100644
index 0000000..c264597
--- /dev/null
+++ b/client/distlock.cpp
@@ -0,0 +1,225 @@
+// @file distlock.cpp
+
+/* Copyright 2009 10gen Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "pch.h"
+#include "dbclient.h"
+#include "distlock.h"
+
+namespace mongo {
+
+ string lockPingNS = "config.lockpings";
+
+ ThreadLocalValue<string> distLockIds("");
+
+ string getDistLockProcess(){
+ static string s;
+ if ( s.empty() ){
+ stringstream ss;
+ ss << getHostNameCached() << ":" << time(0) << ":" << rand();
+ s = ss.str();
+ }
+ return s;
+ }
+
+ string getDistLockId(){
+ string s = distLockIds.get();
+ if ( s.empty() ){
+ stringstream ss;
+ ss << getDistLockProcess() << ":" << getThreadName() << ":" << rand();
+ s = ss.str();
+ distLockIds.set( s );
+ }
+ return s;
+ }
+
+ void distLockPingThread( ConnectionString addr ){
+ static int loops = 0;
+ while( ! inShutdown() ){
+ try {
+ ScopedDbConnection conn( addr );
+
+ // do ping
+ conn->update( lockPingNS ,
+ BSON( "_id" << getDistLockProcess() ) ,
+ BSON( "$set" << BSON( "ping" << DATENOW ) ) ,
+ true );
+
+
+ // remove really old entries
+ BSONObjBuilder f;
+ f.appendDate( "$lt" , jsTime() - ( 4 * 86400 * 1000 ) );
+ BSONObj r = BSON( "ping" << f.obj() );
+ conn->remove( lockPingNS , r );
+
+ // create index so remove is fast even with a lot of servers
+ if ( loops++ == 0 ){
+ conn->ensureIndex( lockPingNS , BSON( "ping" << 1 ) );
+ }
+
+ conn.done();
+ }
+ catch ( std::exception& e ){
+ log( LL_WARNING ) << "couldn't ping: " << e.what() << endl;
+ }
+ sleepsecs(30);
+ }
+ }
+
+
+ class DistributedLockPinger {
+ public:
+ DistributedLockPinger()
+ : _mutex( "DistributedLockPinger" ){
+ }
+
+ void got( const ConnectionString& conn ){
+ string s = conn.toString();
+ scoped_lock lk( _mutex );
+ if ( _seen.count( s ) > 0 )
+ return;
+ boost::thread t( boost::bind( &distLockPingThread , conn ) );
+ _seen.insert( s );
+ }
+
+ set<string> _seen;
+ mongo::mutex _mutex;
+
+ } distLockPinger;
+
+ DistributedLock::DistributedLock( const ConnectionString& conn , const string& name , unsigned takeoverMinutes )
+ : _conn(conn),_name(name),_takeoverMinutes(takeoverMinutes){
+ _id = BSON( "_id" << name );
+ _ns = "config.locks";
+ distLockPinger.got( conn );
+ }
+
+
+ bool DistributedLock::lock_try( string why , BSONObj * other ){
+        // check for a recursive lock attempt
+ assert( getState() == 0 );
+
+ ScopedDbConnection conn( _conn );
+
+ BSONObjBuilder queryBuilder;
+ queryBuilder.appendElements( _id );
+ queryBuilder.append( "state" , 0 );
+
+        { // make sure it's there so we can use simple update logic below
+ BSONObj o = conn->findOne( _ns , _id );
+ if ( o.isEmpty() ){
+ try {
+ conn->insert( _ns , BSON( "_id" << _name << "state" << 0 << "who" << "" ) );
+ }
+ catch ( UserException& ){
+ }
+ }
+ else if ( o["state"].numberInt() > 0 ){
+ BSONObj lastPing = conn->findOne( lockPingNS , o["process"].wrap( "_id" ) );
+ if ( lastPing.isEmpty() ){
+ // TODO: maybe this should clear, not sure yet
+ log() << "lastPing is empty! this could be bad: " << o << endl;
+ conn.done();
+ return false;
+ }
+
+ unsigned long long elapsed = jsTime() - lastPing["ping"].Date(); // in ms
+ elapsed = elapsed / ( 1000 * 60 ); // convert to minutes
+
+ if ( elapsed <= _takeoverMinutes ){
+ log(1) << "dist_lock lock failed because taken by: " << o << endl;
+ conn.done();
+ return false;
+ }
+
+ log() << "dist_lock forcefully taking over from: " << o << " elapsed minutes: " << elapsed << endl;
+ conn->update( _ns , _id , BSON( "$set" << BSON( "state" << 0 ) ) );
+ }
+ else if ( o["ts"].type() ){
+ queryBuilder.append( o["ts"] );
+ }
+ }
+
+ OID ts;
+ ts.init();
+
+ bool gotLock = false;
+ BSONObj now;
+
+ BSONObj whatIWant = BSON( "$set" << BSON( "state" << 1 <<
+ "who" << getDistLockId() << "process" << getDistLockProcess() <<
+ "when" << DATENOW << "why" << why << "ts" << ts ) );
+ try {
+ conn->update( _ns , queryBuilder.obj() , whatIWant );
+
+ BSONObj o = conn->getLastErrorDetailed();
+ now = conn->findOne( _ns , _id );
+
+ if ( o["n"].numberInt() == 0 ){
+ if ( other )
+ *other = now;
+ gotLock = false;
+ }
+ else {
+ gotLock = true;
+ }
+
+ }
+ catch ( UpdateNotTheSame& up ){
+ // this means our update got through on some, but not others
+
+ for ( unsigned i=0; i<up.size(); i++ ){
+ ScopedDbConnection temp( up[i].first );
+ BSONObj temp2 = temp->findOne( _ns , _id );
+
+ if ( now.isEmpty() || now["ts"] < temp2["ts"] ){
+ now = temp2.getOwned();
+ }
+
+ temp.done();
+ }
+
+ if ( now["ts"].OID() == ts ){
+ gotLock = true;
+ conn->update( _ns , _id , whatIWant );
+ }
+ else {
+ gotLock = false;
+ }
+ }
+
+ conn.done();
+
+ log(1) << "dist_lock lock gotLock: " << gotLock << " now: " << now << endl;
+
+ if ( ! gotLock )
+ return false;
+
+ _state.set( 1 );
+ return true;
+ }
+
+ void DistributedLock::unlock(){
+ ScopedDbConnection conn( _conn );
+ conn->update( _ns , _id, BSON( "$set" << BSON( "state" << 0 ) ) );
+ log(1) << "dist_lock unlock: " << conn->findOne( _ns , _id ) << endl;
+ conn.done();
+
+ _state.set( 0 );
+ }
+
+
+}
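
A sketch of calling lock_try()/unlock() directly, as implemented above; not part of the patch. The connection string, lock name, and "why" string are illustrative, and on failure the out-parameter receives the current lock document.

void maintenance( const mongo::ConnectionString& configServers ) {
    mongo::DistributedLock lk( configServers, "example-maintenance-lock" );
    mongo::BSONObj holder;
    if ( ! lk.lock_try( "manual maintenance", &holder ) ) {
        std::cout << "lock held elsewhere: " << holder.toString() << std::endl;
        return;
    }
    // ... critical section ...
    lk.unlock();
}
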
diff --git a/client/distlock.h b/client/distlock.h
new file mode 100644
index 0000000..3a03390
--- /dev/null
+++ b/client/distlock.h
@@ -0,0 +1,91 @@
+// distlock.h
+
+/* Copyright 2009 10gen Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+/**
+ * distributed locking mechanism
+ */
+
+#include "../pch.h"
+#include "dbclient.h"
+#include "connpool.h"
+#include "redef_macros.h"
+#include "syncclusterconnection.h"
+
+namespace mongo {
+
+ class DistributedLock {
+ public:
+
+ /**
+         * @param takeoverMinutes how long (in minutes) before a stale lock can be taken over
+ */
+ DistributedLock( const ConnectionString& conn , const string& name , unsigned takeoverMinutes = 10 );
+
+ int getState(){
+ return _state.get();
+ }
+
+ bool isLocked(){
+ return _state.get() != 0;
+ }
+
+ bool lock_try( string why , BSONObj * other = 0 );
+ void unlock();
+
+ private:
+ ConnectionString _conn;
+ string _name;
+ unsigned _takeoverMinutes;
+
+ string _ns;
+ BSONObj _id;
+
+ ThreadLocalValue<int> _state;
+ };
+
+ class dist_lock_try {
+ public:
+
+ dist_lock_try( DistributedLock * lock , string why )
+ : _lock(lock){
+ _got = _lock->lock_try( why , &_other );
+ }
+
+ ~dist_lock_try(){
+ if ( _got ){
+ _lock->unlock();
+ }
+ }
+
+ bool got() const {
+ return _got;
+ }
+
+ BSONObj other() const {
+ return _other;
+ }
+
+ private:
+ DistributedLock * _lock;
+ bool _got;
+ BSONObj _other;
+
+ };
+
+}
+
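
A sketch of the RAII form via dist_lock_try declared above, which releases the lock automatically when the guard leaves scope; not part of the patch, and the lock and "why" string are illustrative.

void moveChunk( mongo::DistributedLock& balancerLock ) {
    mongo::dist_lock_try guard( &balancerLock, "moving a chunk" );
    if ( ! guard.got() ) {
        std::cout << "busy, currently held by: " << guard.other().toString() << std::endl;
        return;
    }
    // ... protected work; the destructor calls unlock()
}
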
diff --git a/client/distlock_test.cpp b/client/distlock_test.cpp
new file mode 100644
index 0000000..0879b6e
--- /dev/null
+++ b/client/distlock_test.cpp
@@ -0,0 +1,80 @@
+// distlock_test.cpp
+
+/* Copyright 2009 10gen Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "../pch.h"
+#include "dbclient.h"
+#include "distlock.h"
+#include "../db/commands.h"
+
+namespace mongo {
+
+ class TestDistLockWithSync : public Command {
+ public:
+ TestDistLockWithSync() : Command( "_testDistLockWithSyncCluster" ){}
+ virtual void help( stringstream& help ) const {
+ help << "should not be calling this directly" << endl;
+ }
+
+ virtual bool slaveOk() const { return false; }
+ virtual bool adminOnly() const { return true; }
+ virtual LockType locktype() const { return NONE; }
+
+ static void runThread(){
+ for ( int i=0; i<1000; i++ ){
+ if ( current->lock_try( "test" ) ){
+ gotit++;
+ for ( int j=0; j<2000; j++ ){
+ count++;
+ }
+ current->unlock();
+ }
+ }
+ }
+
+ bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){
+ DistributedLock lk( ConnectionString( cmdObj["host"].String() , ConnectionString::SYNC ), "testdistlockwithsync" );
+ current = &lk;
+ count = 0;
+ gotit = 0;
+
+ vector<shared_ptr<boost::thread> > l;
+ for ( int i=0; i<4; i++ ){
+ l.push_back( shared_ptr<boost::thread>( new boost::thread( runThread ) ) );
+ }
+
+ for ( unsigned i=0; i<l.size(); i++ )
+ l[i]->join();
+
+ result.append( "count" , count );
+ result.append( "gotit" , gotit );
+ current = 0;
+ return count == gotit * 2000;
+ }
+
+ static DistributedLock * current;
+ static int count;
+ static int gotit;
+
+ } testDistLockWithSyncCmd;
+
+
+ DistributedLock * TestDistLockWithSync::current;
+ int TestDistLockWithSync::count;
+ int TestDistLockWithSync::gotit;
+
+
+}
diff --git a/client/examples/clientTest.cpp b/client/examples/clientTest.cpp
index bbb82f6..83a556a 100644
--- a/client/examples/clientTest.cpp
+++ b/client/examples/clientTest.cpp
@@ -137,10 +137,14 @@ int main( int argc, const char **argv ) {
assert( conn.getLastError() == "" );
// nonexistent index test
- assert( conn.findOne(ns, Query("{name:\"eliot\"}").hint("{foo:1}")).hasElement("$err") );
- assert( conn.getLastError() == "bad hint" );
- conn.resetError();
- assert( conn.getLastError() == "" );
+ bool asserted = false;
+ try {
+ conn.findOne(ns, Query("{name:\"eliot\"}").hint("{foo:1}"));
+ }
+ catch ( ... ){
+ asserted = true;
+ }
+ assert( asserted );
//existing index
assert( conn.findOne(ns, Query("{name:'eliot'}").hint("{name:1}")).hasElement("name") );
@@ -176,8 +180,9 @@ int main( int argc, const char **argv ) {
}
BSONObj found = conn.findOne( tsns , mongo::BSONObj() );
+ cout << "old: " << out << "\nnew: " << found << endl;
assert( ( oldTime < found["ts"].timestampTime() ) ||
- ( oldInc + 1 == found["ts"].timestampInc() ) );
+ ( oldTime == found["ts"].timestampTime() && oldInc < found["ts"].timestampInc() ) );
}
@@ -185,9 +190,9 @@ int main( int argc, const char **argv ) {
assert( conn.getLastError().empty() );
BufBuilder b;
- b.append( (int)0 ); // reserved
- b.append( (int)-1 ); // invalid # of cursors triggers exception
- b.append( (int)-1 ); // bogus cursor id
+ b.appendNum( (int)0 ); // reserved
+ b.appendNum( (int)-1 ); // invalid # of cursors triggers exception
+ b.appendNum( (int)-1 ); // bogus cursor id
Message m;
m.setData( dbKillCursors, b.buf(), b.len() );
diff --git a/client/examples/tail.cpp b/client/examples/tail.cpp
index e844b32..3738b4f 100644
--- a/client/examples/tail.cpp
+++ b/client/examples/tail.cpp
@@ -22,34 +22,25 @@
using namespace mongo;
-void foo() { }
+void tail(DBClientBase& conn, const char *ns) {
+ BSONElement lastId = minKey.firstElement();
+ Query query = Query();
-/* "tail" the specified namespace, outputting elements as they are added.
- _id values must be inserted in increasing order for this to work. (Some other
- field could also be used.)
+ auto_ptr<DBClientCursor> c =
+ conn.query(ns, query, 0, 0, 0, QueryOption_CursorTailable);
- Note: one could use a capped collection and $natural order to do something
- similar, using sort({$natural:1}), and then not need to worry about
- _id's being in order.
-*/
-void tail(DBClientBase& conn, const char *ns) {
- conn.ensureIndex(ns, fromjson("{_id:1}"));
- BSONElement lastId;
- Query query = Query().sort("_id");
while( 1 ) {
- auto_ptr<DBClientCursor> c = conn.query(ns, query, 0, 0, 0, Option_CursorTailable);
- while( 1 ) {
- if( !c->more() ) {
- if( c->isDead() ) {
- // we need to requery
- break;
- }
- sleepsecs(1);
+ if( !c->more() ) {
+ if( c->isDead() ) {
+ break; // we need to requery
+ }
+
+ // all data (so far) exhausted, wait for more
+ sleepsecs(1);
+ continue;
}
BSONObj o = c->next();
lastId = o["_id"];
cout << o.toString() << endl;
- }
- query = QUERY( "_id" << GT << lastId ).sort("_id");
}
}
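
Tailable cursors such as the one above only make sense against a capped collection. A minimal sketch of preparing one with the widened createCollection() signature before calling tail(); not part of the patch, and it assumes conn is a connected DBClientBase with an illustrative namespace and sizes.

const std::string ns = "test.events";
if ( ! conn.exists( ns ) ) {
    mongo::BSONObj info;
    // 50 MB capped collection, capped at 100000 documents
    conn.createCollection( ns, 50 * 1024 * 1024, /* capped */ true, /* max */ 100000, &info );
}
tail( conn, ns.c_str() );
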
diff --git a/client/gridfs.cpp b/client/gridfs.cpp
index 892ec6e..b2ae478 100644
--- a/client/gridfs.cpp
+++ b/client/gridfs.cpp
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-#include "../stdafx.h"
+#include "pch.h"
#include <fcntl.h>
#include <utility>
@@ -34,15 +34,15 @@ namespace mongo {
const unsigned DEFAULT_CHUNK_SIZE = 256 * 1024;
- Chunk::Chunk( BSONObj o ){
+ GridFSChunk::GridFSChunk( BSONObj o ){
_data = o;
}
- Chunk::Chunk( BSONObj fileObject , int chunkNumber , const char * data , int len ){
+ GridFSChunk::GridFSChunk( BSONObj fileObject , int chunkNumber , const char * data , int len ){
BSONObjBuilder b;
b.appendAs( fileObject["_id"] , "files_id" );
b.append( "n" , chunkNumber );
- b.appendBinDataArray( "data" , data , len );
+ b.appendBinData( "data" , len, BinDataGeneral, data );
_data = b.obj();
}
@@ -50,7 +50,7 @@ namespace mongo {
GridFS::GridFS( DBClientBase& client , const string& dbName , const string& prefix ) : _client( client ) , _dbName( dbName ) , _prefix( prefix ){
_filesNS = dbName + "." + prefix + ".files";
_chunksNS = dbName + "." + prefix + ".chunks";
-
+ _chunkSize = DEFAULT_CHUNK_SIZE;
client.ensureIndex( _filesNS , BSON( "filename" << 1 ) );
client.ensureIndex( _chunksNS , BSON( "files_id" << 1 << "n" << 1 ) );
@@ -60,6 +60,11 @@ namespace mongo {
}
+ void GridFS::setChunkSize(unsigned int size) {
+        massert( 13296 , "invalid chunk size is specified", (size != 0));
+ _chunkSize = size;
+ }
+
BSONObj GridFS::storeFile( const char* data , size_t length , const string& remoteName , const string& contentType){
massert( 10279 , "large files not yet implemented", length <= 0xffffffff);
char const * const end = data + length;
@@ -70,8 +75,8 @@ namespace mongo {
int chunkNumber = 0;
while (data < end){
- int chunkLen = MIN(DEFAULT_CHUNK_SIZE, (unsigned)(end-data));
- Chunk c(idObj, chunkNumber, data, chunkLen);
+ int chunkLen = MIN(_chunkSize, (unsigned)(end-data));
+ GridFSChunk c(idObj, chunkNumber, data, chunkLen);
_client.insert( _chunksNS.c_str() , c._data );
chunkNumber++;
@@ -99,22 +104,24 @@ namespace mongo {
int chunkNumber = 0;
gridfs_offset length = 0;
while (!feof(fd)){
- boost::scoped_array<char>buf (new char[DEFAULT_CHUNK_SIZE]);
- char* bufPos = buf.get();
+ //boost::scoped_array<char>buf (new char[_chunkSize+1]);
+ char * buf = new char[_chunkSize+1];
+ char* bufPos = buf;//.get();
unsigned int chunkLen = 0; // how much in the chunk now
- while(chunkLen != DEFAULT_CHUNK_SIZE && !feof(fd)){
- int readLen = fread(bufPos, 1, DEFAULT_CHUNK_SIZE - chunkLen, fd);
+ while(chunkLen != _chunkSize && !feof(fd)){
+ int readLen = fread(bufPos, 1, _chunkSize - chunkLen, fd);
chunkLen += readLen;
bufPos += readLen;
- assert(chunkLen <= DEFAULT_CHUNK_SIZE);
+ assert(chunkLen <= _chunkSize);
}
- Chunk c(idObj, chunkNumber, buf.get(), chunkLen);
+ GridFSChunk c(idObj, chunkNumber, buf, chunkLen);
_client.insert( _chunksNS.c_str() , c._data );
length += chunkLen;
chunkNumber++;
+ delete[] buf;
}
if (fd != stdin)
@@ -125,7 +132,7 @@ namespace mongo {
return insertFile((remoteName.empty() ? fileName : remoteName), id, length, contentType);
}
- BSONObj GridFS::insertFile(const string& name, const OID& id, unsigned length, const string& contentType){
+ BSONObj GridFS::insertFile(const string& name, const OID& id, gridfs_offset length, const string& contentType){
BSONObj res;
if ( ! _client.runCommand( _dbName.c_str() , BSON( "filemd5" << id << "root" << _prefix ) , res ) )
@@ -134,12 +141,17 @@ namespace mongo {
BSONObjBuilder file;
file << "_id" << id
<< "filename" << name
- << "length" << (unsigned) length
- << "chunkSize" << DEFAULT_CHUNK_SIZE
+ << "chunkSize" << _chunkSize
<< "uploadDate" << DATENOW
<< "md5" << res["md5"]
;
+ if (length < 1024*1024*1024){ // 2^30
+ file << "length" << (int) length;
+ }else{
+ file << "length" << (long long) length;
+ }
+
if (!contentType.empty())
file << "contentType" << contentType;
@@ -190,7 +202,7 @@ namespace mongo {
return meta_element.embeddedObject();
}
- Chunk GridFile::getChunk( int n ){
+ GridFSChunk GridFile::getChunk( int n ){
_exists();
BSONObjBuilder b;
b.appendAs( _obj["_id"] , "files_id" );
@@ -198,7 +210,7 @@ namespace mongo {
BSONObj o = _grid->_client.findOne( _grid->_chunksNS.c_str() , b.obj() );
uassert( 10014 , "chunk is empty!" , ! o.isEmpty() );
- return Chunk(o);
+ return GridFSChunk(o);
}
gridfs_offset GridFile::write( ostream & out ){
@@ -207,7 +219,7 @@ namespace mongo {
const int num = getNumChunks();
for ( int i=0; i<num; i++ ){
- Chunk c = getChunk( i );
+ GridFSChunk c = getChunk( i );
int len;
const char * data = c.data( len );
@@ -222,6 +234,7 @@ namespace mongo {
return write( cout );
} else {
ofstream out(where.c_str() , ios::out | ios::binary );
+ uassert(13325, "couldn't open file: " + where, out.is_open() );
return write( out );
}
}
diff --git a/client/gridfs.h b/client/gridfs.h
index 3165d5f..1c55f79 100644
--- a/client/gridfs.h
+++ b/client/gridfs.h
@@ -18,6 +18,7 @@
#pragma once
#include "dbclient.h"
+#include "redef_macros.h"
namespace mongo {
@@ -26,26 +27,19 @@ namespace mongo {
class GridFS;
class GridFile;
- class Chunk {
+ class GridFSChunk {
public:
- Chunk( BSONObj data );
- Chunk( BSONObj fileId , int chunkNumber , const char * data , int len );
+ GridFSChunk( BSONObj data );
+ GridFSChunk( BSONObj fileId , int chunkNumber , const char * data , int len );
int len(){
int len;
- const char * data = _data["data"].binData( len );
- int * foo = (int*)data;
- assert( len - 4 == foo[0] );
- return len - 4;
+ _data["data"].binDataClean( len );
+ return len;
}
const char * data( int & len ){
- const char * data = _data["data"].binData( len );
- int * foo = (int*)data;
- assert( len - 4 == foo[0] );
-
- len = len - 4;
- return data + 4;
+ return _data["data"].binDataClean( len );
}
private:
@@ -68,6 +62,11 @@ namespace mongo {
~GridFS();
/**
+         * @param size chunk size, in bytes, used for subsequently stored files
+ */
+ void setChunkSize(unsigned int size);
+
+ /**
* puts the file reference by fileName into the db
* @param fileName local filename relative to process
* @param remoteName optional filename to use for file stored in GridFS
@@ -122,9 +121,10 @@ namespace mongo {
string _prefix;
string _filesNS;
string _chunksNS;
+ unsigned int _chunkSize;
// insert fileobject. All chunks must be in DB.
- BSONObj insertFile(const string& name, const OID& id, unsigned length, const string& contentType);
+ BSONObj insertFile(const string& name, const OID& id, gridfs_offset length, const string& contentType);
friend class GridFile;
};
@@ -176,7 +176,7 @@ namespace mongo {
return (int) ceil( (double)getContentLength() / (double)getChunkSize() );
}
- Chunk getChunk( int n );
+ GridFSChunk getChunk( int n );
/**
write the file to the output stream
@@ -200,4 +200,4 @@ namespace mongo {
};
}
-
+#include "undef_macros.h"
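
A sketch of storing a file through the renamed GridFSChunk machinery and reading it back; not part of the patch. It assumes a connected DBClientBase named conn and the existing GridFS::findFile()/GridFile helpers from this header; the database name and paths are illustrative.

mongo::GridFS fs( conn, "test" );                         // default "fs" collection prefix
mongo::BSONObj fileObj = fs.storeFile( "/tmp/backup.tar", "backup.tar" );
std::cout << "stored: " << fileObj.toString() << std::endl;

mongo::GridFile gf = fs.findFile( "backup.tar" );
if ( gf.exists() ) {
    std::ofstream out( "/tmp/restored.tar", std::ios::out | std::ios::binary );
    gf.write( out );                                      // writes each GridFSChunk in order
}
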
diff --git a/client/model.cpp b/client/model.cpp
index 3978105..7861b91 100644
--- a/client/model.cpp
+++ b/client/model.cpp
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-#include "stdafx.h"
+#include "pch.h"
#include "model.h"
#include "connpool.h"
@@ -57,6 +57,31 @@ namespace mongo {
BSONObjBuilder b;
serialize( b );
+ BSONElement myId;
+ {
+ BSONObjIterator i = b.iterator();
+ while ( i.more() ){
+ BSONElement e = i.next();
+ if ( strcmp( e.fieldName() , "_id" ) == 0 ){
+ myId = e;
+ break;
+ }
+ }
+ }
+
+ if ( myId.type() ){
+ if ( _id.isEmpty() ){
+ _id = myId.wrap();
+ }
+ else if ( myId.woCompare( _id.firstElement() ) ){
+ stringstream ss;
+ ss << "_id from serialize and stored differ: ";
+ ss << '[' << myId << "] != ";
+ ss << '[' << _id.firstElement() << ']';
+ throw UserException( 13121 , ss.str() );
+ }
+ }
+
if ( _id.isEmpty() ){
OID oid;
oid.init();
@@ -69,18 +94,22 @@ namespace mongo {
log(4) << "inserted new model " << getNS() << " " << o << endl;
}
else {
- BSONElement id = _id["_id"];
- b.append( id );
+ if ( myId.eoo() ){
+ myId = _id["_id"];
+ b.append( myId );
+ }
+
+ assert( ! myId.eoo() );
BSONObjBuilder qb;
- qb.append( id );
+ qb.append( myId );
BSONObj q = qb.obj();
BSONObj o = b.obj();
- log(4) << "updated old model" << getNS() << " " << q << " " << o << endl;
+ log(4) << "updated model" << getNS() << " " << q << " " << o << endl;
- conn->update( getNS() , q , o );
+ conn->update( getNS() , q , o , true );
}
@@ -94,4 +123,16 @@ namespace mongo {
throw UserException( 9003 , (string)"error on Model::save: " + errmsg );
}
+ BSONObj Model::toObject(){
+ BSONObjBuilder b;
+ serialize( b );
+ return b.obj();
+ }
+
+ void Model::append( const char * name , BSONObjBuilder& b ){
+ BSONObjBuilder bb( b.subobjStart( name ) );
+ serialize( bb );
+ bb.done();
+ }
+
} // namespace mongo
diff --git a/client/model.h b/client/model.h
index f3a63ad..108efc0 100644
--- a/client/model.h
+++ b/client/model.h
@@ -18,6 +18,7 @@
#pragma once
#include "dbclient.h"
+#include "redef_macros.h"
namespace mongo {
@@ -40,7 +41,9 @@ namespace mongo {
virtual const char * getNS() = 0;
virtual void serialize(BSONObjBuilder& to) = 0;
virtual void unserialize(const BSONObj& from) = 0;
-
+ virtual BSONObj toObject();
+ virtual void append( const char * name , BSONObjBuilder& b );
+
virtual string modelServer() = 0;
/** Load a single object.
@@ -55,3 +58,5 @@ namespace mongo {
};
} // namespace mongo
+
+#include "undef_macros.h"
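
A sketch of a Model subclass exercising the new toObject() helper and the upsert behaviour of save(); not part of the patch, and the class, namespace, and server address are illustrative.

#include "client/model.h"
#include <iostream>

class Person : public mongo::Model {
public:
    std::string name;
    int age;

    virtual const char * getNS() { return "test.people"; }
    virtual std::string modelServer() { return "localhost:27017"; }

    virtual void serialize( mongo::BSONObjBuilder& to ) {
        to << "name" << name << "age" << age;
    }
    virtual void unserialize( const mongo::BSONObj& from ) {
        name = from["name"].String();
        age = from["age"].numberInt();
    }
};

void example() {
    Person p;
    p.name = "eliot";
    p.age = 30;
    p.save();                               // inserts, or updates/upserts once _id is set
    mongo::BSONObj asBson = p.toObject();   // standalone copy of serialize() output
    std::cout << asBson.toString() << std::endl;
}
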
diff --git a/client/parallel.cpp b/client/parallel.cpp
index bd29013..eeadb89 100644
--- a/client/parallel.cpp
+++ b/client/parallel.cpp
@@ -16,12 +16,13 @@
*/
-#include "stdafx.h"
+#include "pch.h"
#include "parallel.h"
#include "connpool.h"
#include "../db/queryutil.h"
#include "../db/dbmessage.h"
#include "../s/util.h"
+#include "../s/shard.h"
namespace mongo {
@@ -31,8 +32,13 @@ namespace mongo {
_ns = q.ns;
_query = q.query.copy();
_options = q.queryOptions;
- _fields = q.fields;
+ _fields = q.fields.copy();
+ _batchSize = q.ntoreturn;
+ if ( _batchSize == 1 )
+ _batchSize = 2;
+
_done = false;
+ _didInit = false;
}
ClusteredCursor::ClusteredCursor( const string& ns , const BSONObj& q , int options , const BSONObj& fields ){
@@ -40,37 +46,84 @@ namespace mongo {
_query = q.getOwned();
_options = options;
_fields = fields.getOwned();
+ _batchSize = 0;
+
_done = false;
+ _didInit = false;
}
ClusteredCursor::~ClusteredCursor(){
_done = true; // just in case
}
+
+ void ClusteredCursor::init(){
+ if ( _didInit )
+ return;
+ _didInit = true;
+ _init();
+ }
- auto_ptr<DBClientCursor> ClusteredCursor::query( const string& server , int num , BSONObj extra ){
+ auto_ptr<DBClientCursor> ClusteredCursor::query( const string& server , int num , BSONObj extra , int skipLeft ){
uassert( 10017 , "cursor already done" , ! _done );
+ assert( _didInit );
BSONObj q = _query;
if ( ! extra.isEmpty() ){
q = concatQuery( q , extra );
}
- ScopedDbConnection conn( server );
- checkShardVersion( conn.conn() , _ns );
+ ShardConnection conn( server , _ns );
+
+ if ( conn.setVersion() ){
+ conn.done();
+ throw StaleConfigException( _ns , "ClusteredCursor::query ShardConnection had to change" , true );
+ }
+
+ if ( logLevel >= 5 ){
+ log(5) << "ClusteredCursor::query (" << type() << ") server:" << server
+ << " ns:" << _ns << " query:" << q << " num:" << num
+ << " _fields:" << _fields << " options: " << _options << endl;
+ }
+
+ auto_ptr<DBClientCursor> cursor =
+ conn->query( _ns , q , num , 0 , ( _fields.isEmpty() ? 0 : &_fields ) , _options , _batchSize == 0 ? 0 : _batchSize + skipLeft );
- log(5) << "ClusteredCursor::query server:" << server << " ns:" << _ns << " query:" << q << " num:" << num << " _fields:" << _fields << " options: " << _options << endl;
- auto_ptr<DBClientCursor> cursor = conn->query( _ns.c_str() , q , num , 0 , ( _fields.isEmpty() ? 0 : &_fields ) , _options );
- if ( cursor->hasResultFlag( QueryResult::ResultFlag_ShardConfigStale ) )
+ assert( cursor.get() );
+
+ if ( cursor->hasResultFlag( ResultFlag_ShardConfigStale ) ){
+ conn.done();
throw StaleConfigException( _ns , "ClusteredCursor::query" );
+ }
+
+ if ( cursor->hasResultFlag( ResultFlag_ErrSet ) ){
+ conn.done();
+ BSONObj o = cursor->next();
+ throw UserException( o["code"].numberInt() , o["$err"].String() );
+ }
+
+
+ cursor->attach( &conn );
conn.done();
return cursor;
}
+ BSONObj ClusteredCursor::explain( const string& server , BSONObj extra ){
+ BSONObj q = _query;
+ if ( ! extra.isEmpty() ){
+ q = concatQuery( q , extra );
+ }
+
+ ShardConnection conn( server , _ns );
+ BSONObj o = conn->findOne( _ns , Query( q ).explain() );
+ conn.done();
+ return o;
+ }
+
BSONObj ClusteredCursor::concatQuery( const BSONObj& query , const BSONObj& extraFilter ){
if ( ! query.hasField( "query" ) )
return _concatFilter( query , extraFilter );
-
+
BSONObjBuilder b;
BSONObjIterator i( query );
while ( i.more() ){
@@ -94,6 +147,112 @@ namespace mongo {
// TODO: should do some simplification here if possibl ideally
}
+ BSONObj ClusteredCursor::explain(){
+ BSONObjBuilder b;
+ b.append( "clusteredType" , type() );
+
+ long long nscanned = 0;
+ long long nscannedObjects = 0;
+ long long n = 0;
+ long long millis = 0;
+ double numExplains = 0;
+
+ map<string,list<BSONObj> > out;
+ {
+ _explain( out );
+
+ BSONObjBuilder x( b.subobjStart( "shards" ) );
+ for ( map<string,list<BSONObj> >::iterator i=out.begin(); i!=out.end(); ++i ){
+ string shard = i->first;
+ list<BSONObj> l = i->second;
+ BSONArrayBuilder y( x.subarrayStart( shard.c_str() ) );
+ for ( list<BSONObj>::iterator j=l.begin(); j!=l.end(); ++j ){
+ BSONObj temp = *j;
+ y.append( temp );
+
+ nscanned += temp["nscanned"].numberLong();
+ nscannedObjects += temp["nscannedObjects"].numberLong();
+ n += temp["n"].numberLong();
+ millis += temp["millis"].numberLong();
+ numExplains++;
+ }
+ y.done();
+ }
+ x.done();
+ }
+
+ b.appendNumber( "nscanned" , nscanned );
+ b.appendNumber( "nscannedObjects" , nscannedObjects );
+ b.appendNumber( "n" , n );
+ b.appendNumber( "millisTotal" , millis );
+ b.append( "millisAvg" , (int)((double)millis / numExplains ) );
+ b.append( "numQueries" , (int)numExplains );
+ b.append( "numShards" , (int)out.size() );
+
+ return b.obj();
+ }
+
+ // -------- FilteringClientCursor -----------
+ FilteringClientCursor::FilteringClientCursor( const BSONObj filter )
+ : _matcher( filter ) , _done( true ){
+ }
+
+ FilteringClientCursor::FilteringClientCursor( auto_ptr<DBClientCursor> cursor , const BSONObj filter )
+ : _matcher( filter ) , _cursor( cursor ) , _done( cursor.get() == 0 ){
+ }
+
+ FilteringClientCursor::~FilteringClientCursor(){
+ }
+
+ void FilteringClientCursor::reset( auto_ptr<DBClientCursor> cursor ){
+ _cursor = cursor;
+ _next = BSONObj();
+ _done = _cursor.get() == 0;
+ }
+
+ bool FilteringClientCursor::more(){
+ if ( ! _next.isEmpty() )
+ return true;
+
+ if ( _done )
+ return false;
+
+ _advance();
+ return ! _next.isEmpty();
+ }
+
+ BSONObj FilteringClientCursor::next(){
+ assert( ! _next.isEmpty() );
+ assert( ! _done );
+
+ BSONObj ret = _next;
+ _next = BSONObj();
+ _advance();
+ return ret;
+ }
+
+ BSONObj FilteringClientCursor::peek(){
+ if ( _next.isEmpty() )
+ _advance();
+ return _next;
+ }
+
+ void FilteringClientCursor::_advance(){
+ assert( _next.isEmpty() );
+ if ( ! _cursor.get() || _done )
+ return;
+
+ while ( _cursor->more() ){
+ _next = _cursor->next();
+ if ( _matcher.matches( _next ) ){
+ if ( ! _cursor->moreInCurrentBatch() )
+ _next = _next.getOwned();
+ return;
+ }
+ _next = BSONObj();
+ }
+ _done = true;
+ }
// -------- SerialServerClusteredCursor -----------
@@ -107,10 +266,21 @@ namespace mongo {
sort( _servers.rbegin() , _servers.rend() );
_serverIndex = 0;
+
+ _needToSkip = q.ntoskip;
}
bool SerialServerClusteredCursor::more(){
- if ( _current.get() && _current->more() )
+
+ // TODO: optimize this by sending on first query and then back counting
+ // tricky in case where 1st server doesn't have any after
+ // need it to send n skipped
+ while ( _needToSkip > 0 && _current.more() ){
+ _current.next();
+ _needToSkip--;
+ }
+
+ if ( _current.more() )
return true;
if ( _serverIndex >= _servers.size() ){
@@ -119,17 +289,21 @@ namespace mongo {
ServerAndQuery& sq = _servers[_serverIndex++];
- _current = query( sq._server , 0 , sq._extra );
- if ( _current->more() )
- return true;
-
- // this sq has nothing, so keep looking
+ _current.reset( query( sq._server , 0 , sq._extra ) );
return more();
}
BSONObj SerialServerClusteredCursor::next(){
uassert( 10018 , "no more items" , more() );
- return _current->next();
+ return _current.next();
+ }
+
+ void SerialServerClusteredCursor::_explain( map< string,list<BSONObj> >& out ){
+ for ( unsigned i=0; i<_servers.size(); i++ ){
+ ServerAndQuery& sq = _servers[i];
+ list<BSONObj> & l = out[sq._server];
+ l.push_back( explain( sq._server , sq._extra ) );
+ }
}
// -------- ParallelSortClusteredCursor -----------
@@ -138,7 +312,8 @@ namespace mongo {
const BSONObj& sortKey )
: ClusteredCursor( q ) , _servers( servers ){
_sortKey = sortKey.getOwned();
- _init();
+ _needToSkip = q.ntoskip;
+ _finishCons();
}
ParallelSortClusteredCursor::ParallelSortClusteredCursor( const set<ServerAndQuery>& servers , const string& ns ,
@@ -146,85 +321,123 @@ namespace mongo {
int options , const BSONObj& fields )
: ClusteredCursor( ns , q.obj , options , fields ) , _servers( servers ){
_sortKey = q.getSort().copy();
- _init();
+ _needToSkip = 0;
+ _finishCons();
}
- void ParallelSortClusteredCursor::_init(){
+ void ParallelSortClusteredCursor::_finishCons(){
_numServers = _servers.size();
- _cursors = new auto_ptr<DBClientCursor>[_numServers];
- _nexts = new BSONObj[_numServers];
+ _cursors = 0;
+
+ if ( ! _sortKey.isEmpty() && ! _fields.isEmpty() ){
+ // we need to make sure the sort key is in the project
+ bool isNegative = false;
+ BSONObjBuilder b;
+ {
+ BSONObjIterator i( _fields );
+ while ( i.more() ){
+ BSONElement e = i.next();
+ b.append( e );
+ if ( ! e.trueValue() )
+ isNegative = true;
+ }
+ }
+
+ {
+ BSONObjIterator i( _sortKey );
+ while ( i.more() ){
+ BSONElement e = i.next();
+ BSONElement f = _fields.getField( e.fieldName() );
+ if ( isNegative ){
+ uassert( 13431 , "have to have sort key in projection and removing it" , f.eoo() );
+ }
+ else if ( f.eoo() ){
+ // add to projection
+ b.append( e );
+ }
+ }
+ }
+
+ _fields = b.obj();
+ }
+ }
+
+ void ParallelSortClusteredCursor::_init(){
+ assert( ! _cursors );
+ _cursors = new FilteringClientCursor[_numServers];
// TODO: parellize
int num = 0;
- for ( set<ServerAndQuery>::iterator i = _servers.begin(); i!=_servers.end(); i++ ){
+ for ( set<ServerAndQuery>::iterator i = _servers.begin(); i!=_servers.end(); ++i ){
const ServerAndQuery& sq = *i;
- _cursors[num++] = query( sq._server , 0 , sq._extra );
+ _cursors[num++].reset( query( sq._server , 0 , sq._extra , _needToSkip ) );
}
}
ParallelSortClusteredCursor::~ParallelSortClusteredCursor(){
delete [] _cursors;
- delete [] _nexts;
+ _cursors = 0;
}
bool ParallelSortClusteredCursor::more(){
- for ( int i=0; i<_numServers; i++ ){
- if ( ! _nexts[i].isEmpty() )
- return true;
- if ( _cursors[i].get() && _cursors[i]->more() )
+ if ( _needToSkip > 0 ){
+ int n = _needToSkip;
+ _needToSkip = 0;
+
+ while ( n > 0 && more() ){
+ BSONObj x = next();
+ n--;
+ }
+
+ _needToSkip = n;
+ }
+
+ for ( int i=0; i<_numServers; i++ ){
+ if ( _cursors[i].more() )
return true;
}
return false;
}
BSONObj ParallelSortClusteredCursor::next(){
- advance();
-
BSONObj best = BSONObj();
int bestFrom = -1;
for ( int i=0; i<_numServers; i++){
- if ( _nexts[i].isEmpty() )
+ if ( ! _cursors[i].more() )
continue;
+
+ BSONObj me = _cursors[i].peek();
if ( best.isEmpty() ){
- best = _nexts[i];
+ best = me;
bestFrom = i;
continue;
}
- int comp = best.woSortOrder( _nexts[i] , _sortKey );
+ int comp = best.woSortOrder( me , _sortKey , true );
if ( comp < 0 )
continue;
- best = _nexts[i];
+ best = me;
bestFrom = i;
}
-
+
uassert( 10019 , "no more elements" , ! best.isEmpty() );
- _nexts[bestFrom] = BSONObj();
+ _cursors[bestFrom].next();
return best;
}
- void ParallelSortClusteredCursor::advance(){
- for ( int i=0; i<_numServers; i++ ){
-
- if ( ! _nexts[i].isEmpty() ){
- // already have a good object there
- continue;
- }
-
- if ( ! _cursors[i]->more() ){
- // cursor is dead, oh well
- continue;
- }
-
- _nexts[i] = _cursors[i]->next();
+ void ParallelSortClusteredCursor::_explain( map< string,list<BSONObj> >& out ){
+ for ( set<ServerAndQuery>::iterator i=_servers.begin(); i!=_servers.end(); ++i ){
+ const ServerAndQuery& sq = *i;
+ list<BSONObj> & l = out[sq._server];
+ l.push_back( explain( sq._server , sq._extra ) );
}
-
+
}
// -----------------
@@ -252,6 +465,7 @@ namespace mongo {
ScopedDbConnection conn( res->_server );
res->_ok = conn->runCommand( res->_db , res->_cmd , res->_res );
res->_done = true;
+ conn.done();
}
shared_ptr<Future::CommandResult> Future::spawnCommand( const string& server , const string& db , const BSONObj& cmd ){
diff --git a/client/parallel.h b/client/parallel.h
index 88864ae..b60190a 100644
--- a/client/parallel.h
+++ b/client/parallel.h
@@ -16,16 +16,53 @@
*/
/**
- tools for wokring in parallel/sharded/clustered environment
+ tools for working in parallel/sharded/clustered environment
*/
-#include "../stdafx.h"
+#include "../pch.h"
#include "dbclient.h"
+#include "redef_macros.h"
#include "../db/dbmessage.h"
+#include "../db/matcher.h"
namespace mongo {
/**
+ * holder for a server address and a query to run
+ */
+ class ServerAndQuery {
+ public:
+ ServerAndQuery( const string& server , BSONObj extra = BSONObj() , BSONObj orderObject = BSONObj() ) :
+ _server( server ) , _extra( extra.getOwned() ) , _orderObject( orderObject.getOwned() ){
+ }
+
+ bool operator<( const ServerAndQuery& other ) const{
+ if ( ! _orderObject.isEmpty() )
+ return _orderObject.woCompare( other._orderObject ) < 0;
+
+ if ( _server < other._server )
+ return true;
+ if ( other._server < _server )
+ return false;
+ return _extra.woCompare( other._extra ) < 0;
+ }
+
+ string toString() const {
+ StringBuilder ss;
+ ss << "server:" << _server << " _extra:" << _extra.toString() << " _orderObject:" << _orderObject.toString();
+ return ss.str();
+ }
+
+ operator string() const {
+ return toString();
+ }
+
+ string _server;
+ BSONObj _extra;
+ BSONObj _orderObject;
+ };
+
+ /**
* this is a cursor that works over a set of servers
* can be used serially or in parallel, as controlled by subclasses
*/
@@ -34,7 +71,10 @@ namespace mongo {
ClusteredCursor( QueryMessage& q );
ClusteredCursor( const string& ns , const BSONObj& q , int options=0 , const BSONObj& fields=BSONObj() );
virtual ~ClusteredCursor();
-
+
+ /** call before using */
+ void init();
+
virtual bool more() = 0;
virtual BSONObj next() = 0;
@@ -42,53 +82,105 @@ namespace mongo {
virtual string type() const = 0;
+ virtual BSONObj explain();
+
protected:
- auto_ptr<DBClientCursor> query( const string& server , int num = 0 , BSONObj extraFilter = BSONObj() );
+
+ virtual void _init() = 0;
+ auto_ptr<DBClientCursor> query( const string& server , int num = 0 , BSONObj extraFilter = BSONObj() , int skipLeft = 0 );
+ BSONObj explain( const string& server , BSONObj extraFilter = BSONObj() );
+
static BSONObj _concatFilter( const BSONObj& filter , const BSONObj& extraFilter );
+ virtual void _explain( map< string,list<BSONObj> >& out ) = 0;
+
string _ns;
BSONObj _query;
int _options;
BSONObj _fields;
+ int _batchSize;
+
+ bool _didInit;
bool _done;
};
- /**
- * holder for a server address and a query to run
- */
- class ServerAndQuery {
+ class FilteringClientCursor {
public:
- ServerAndQuery( const string& server , BSONObj extra = BSONObj() , BSONObj orderObject = BSONObj() ) :
- _server( server ) , _extra( extra.getOwned() ) , _orderObject( orderObject.getOwned() ){
+ FilteringClientCursor( const BSONObj filter = BSONObj() );
+ FilteringClientCursor( auto_ptr<DBClientCursor> cursor , const BSONObj filter = BSONObj() );
+ ~FilteringClientCursor();
+
+ void reset( auto_ptr<DBClientCursor> cursor );
+
+ bool more();
+ BSONObj next();
+
+ BSONObj peek();
+ private:
+ void _advance();
+
+ Matcher _matcher;
+ auto_ptr<DBClientCursor> _cursor;
+
+ BSONObj _next;
+ bool _done;
+ };
+
+
+ class Servers {
+ public:
+ Servers(){
+ }
+
+ void add( const ServerAndQuery& s ){
+ add( s._server , s._extra );
+ }
+
+ void add( const string& server , const BSONObj& filter ){
+ vector<BSONObj>& mine = _filters[server];
+ mine.push_back( filter.getOwned() );
}
+
+ // TODO: pick a less horrible name
+ class View {
+ View( const Servers* s ){
+ for ( map<string, vector<BSONObj> >::const_iterator i=s->_filters.begin(); i!=s->_filters.end(); ++i ){
+ _servers.push_back( i->first );
+ _filters.push_back( i->second );
+ }
+ }
+ public:
+ int size() const {
+ return _servers.size();
+ }
- bool operator<( const ServerAndQuery& other ) const{
- if ( ! _orderObject.isEmpty() )
- return _orderObject.woCompare( other._orderObject ) < 0;
+ string getServer( int n ) const {
+ return _servers[n];
+ }
+
+ vector<BSONObj> getFilter( int n ) const {
+ return _filters[ n ];
+ }
- if ( _server < other._server )
- return true;
- if ( other._server > _server )
- return false;
- return _extra.woCompare( other._extra ) < 0;
- }
+ private:
+ vector<string> _servers;
+ vector< vector<BSONObj> > _filters;
- string toString() const {
- StringBuilder ss;
- ss << "server:" << _server << " _extra:" << _extra << " _orderObject:" << _orderObject;
- return ss.str();
- }
+ friend class Servers;
+ };
- operator string() const {
- return toString();
+ View view() const {
+ return View( this );
}
+
- string _server;
- BSONObj _extra;
- BSONObj _orderObject;
+ private:
+ map<string, vector<BSONObj> > _filters;
+
+ friend class View;
};
@@ -102,11 +194,18 @@ namespace mongo {
virtual bool more();
virtual BSONObj next();
virtual string type() const { return "SerialServer"; }
- private:
+
+ protected:
+ virtual void _explain( map< string,list<BSONObj> >& out );
+
+ void _init(){}
+
vector<ServerAndQuery> _servers;
unsigned _serverIndex;
- auto_ptr<DBClientCursor> _current;
+ FilteringClientCursor _current;
+
+ int _needToSkip;
};
@@ -123,17 +222,18 @@ namespace mongo {
virtual bool more();
virtual BSONObj next();
virtual string type() const { return "ParallelSort"; }
- private:
+ protected:
+ void _finishCons();
void _init();
-
- void advance();
+
+ virtual void _explain( map< string,list<BSONObj> >& out );
int _numServers;
set<ServerAndQuery> _servers;
BSONObj _sortKey;
-
- auto_ptr<DBClientCursor> * _cursors;
- BSONObj * _nexts;
+
+ FilteringClientCursor * _cursors;
+ int _needToSkip;
};
/**
@@ -193,3 +293,5 @@ namespace mongo {
}
+
+#include "undef_macros.h"
diff --git a/client/redef_macros.h b/client/redef_macros.h
new file mode 100644
index 0000000..dd2e66f
--- /dev/null
+++ b/client/redef_macros.h
@@ -0,0 +1,55 @@
+/** @file redef_macros.h - redefine macros from undef_macros.h */
+
+/* Copyright 2009 10gen Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// If you define a new global un-prefixed macro, please add it here and in undef_macros
+
+// #pragma once // this file is intended to be processed multiple times
+
+#if defined(MONGO_MACROS_CLEANED)
+
+// util/allocator.h
+#define malloc MONGO_malloc
+#define realloc MONGO_realloc
+
+// util/assert_util.h
+#define assert MONGO_assert
+#define dassert MONGO_dassert
+#define wassert MONGO_wassert
+#define massert MONGO_massert
+#define uassert MONGO_uassert
+#define BOOST_CHECK_EXCEPTION MONGO_BOOST_CHECK_EXCEPTION
+#define DESTRUCTOR_GUARD MONGO_DESTRUCTOR_GUARD
+
+// util/goodies.h
+#define PRINT MONGO_PRINT
+#define PRINTFL MONGO_PRINTFL
+#define asctime MONGO_asctime
+#define gmtime MONGO_gmtime
+#define localtime MONGO_localtime
+#define ctime MONGO_ctime
+
+// util/debug_util.h
+#define DEV MONGO_DEV
+#define DEBUGGING MONGO_DEBUGGING
+#define SOMETIMES MONGO_SOMETIMES
+#define OCCASIONALLY MONGO_OCCASIONALLY
+#define RARELY MONGO_RARELY
+#define ONCE MONGO_ONCE
+
+#undef MONGO_MACROS_CLEANED
+#endif
+
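
redef_macros.h only restores the short names when undef_macros.h has previously cleaned them (the MONGO_MACROS_CLEANED guard above). That is why the driver headers touched by this patch include redef_macros.h near the top and undef_macros.h at the very bottom. A sketch of a hypothetical new client header following the same convention (SomeFeature is illustrative, not part of the patch):

    // somefeature.h -- sketch only
    #include "../pch.h"
    #include "dbclient.h"
    #include "redef_macros.h"        // bring back assert/uassert/etc. for this header

    namespace mongo {
        class SomeFeature {
        public:
            SomeFeature() : _x(0){}
            void check() const { assert( _x >= 0 ); }   // expands to MONGO_assert here
            int _x;
        };
    }

    #include "undef_macros.h"        // hand the plain names back to the application
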
diff --git a/client/syncclusterconnection.cpp b/client/syncclusterconnection.cpp
index 0a8fc79..5324b6c 100644
--- a/client/syncclusterconnection.cpp
+++ b/client/syncclusterconnection.cpp
@@ -16,15 +16,29 @@
*/
-#include "stdafx.h"
+#include "pch.h"
#include "syncclusterconnection.h"
#include "../db/dbmessage.h"
// error codes 8000-8009
namespace mongo {
+
+ SyncClusterConnection::SyncClusterConnection( const list<HostAndPort> & L) : _mutex("SyncClusterConnection") {
+ {
+ stringstream s;
+ int n=0;
+ for( list<HostAndPort>::const_iterator i = L.begin(); i != L.end(); i++ ) {
+ if( ++n > 1 ) s << ',';
+ s << i->toString();
+ }
+ _address = s.str();
+ }
+ for( list<HostAndPort>::const_iterator i = L.begin(); i != L.end(); i++ )
+ _connect( i->toString() );
+ }
- SyncClusterConnection::SyncClusterConnection( string commaSeperated ){
+ SyncClusterConnection::SyncClusterConnection( string commaSeperated ) : _mutex("SyncClusterConnection") {
_address = commaSeperated;
string::size_type idx;
while ( ( idx = commaSeperated.find( ',' ) ) != string::npos ){
@@ -36,7 +50,7 @@ namespace mongo {
uassert( 8004 , "SyncClusterConnection needs 3 servers" , _conns.size() == 3 );
}
- SyncClusterConnection::SyncClusterConnection( string a , string b , string c ){
+ SyncClusterConnection::SyncClusterConnection( string a , string b , string c ) : _mutex("SyncClusterConnection") {
_address = a + "," + b + "," + c;
// connect to all even if not working
_connect( a );
@@ -44,7 +58,7 @@ namespace mongo {
_connect( c );
}
- SyncClusterConnection::SyncClusterConnection( SyncClusterConnection& prev ){
+ SyncClusterConnection::SyncClusterConnection( SyncClusterConnection& prev ) : _mutex("SyncClusterConnection") {
assert(0);
}
@@ -55,6 +69,7 @@ namespace mongo {
}
bool SyncClusterConnection::prepare( string& errmsg ){
+ _lastErrors.clear();
return fsync( errmsg );
}
@@ -79,7 +94,7 @@ namespace mongo {
}
void SyncClusterConnection::_checkLast(){
- vector<BSONObj> all;
+ _lastErrors.clear();
vector<string> errors;
for ( size_t i=0; i<_conns.size(); i++ ){
@@ -95,17 +110,17 @@ namespace mongo {
catch ( ... ){
err += "unknown failure";
}
- all.push_back( res );
+ _lastErrors.push_back( res.getOwned() );
errors.push_back( err );
}
-
- assert( all.size() == errors.size() && all.size() == _conns.size() );
+
+ assert( _lastErrors.size() == errors.size() && _lastErrors.size() == _conns.size() );
stringstream err;
bool ok = true;
for ( size_t i = 0; i<_conns.size(); i++ ){
- BSONObj res = all[i];
+ BSONObj res = _lastErrors[i];
if ( res["ok"].trueValue() && res["fsyncFiles"].numberInt() > 0 )
continue;
ok = false;
@@ -117,35 +132,71 @@ namespace mongo {
throw UserException( 8001 , (string)"SyncClusterConnection write op failed: " + err.str() );
}
+ BSONObj SyncClusterConnection::getLastErrorDetailed(){
+ if ( _lastErrors.size() )
+ return _lastErrors[0];
+ return DBClientBase::getLastErrorDetailed();
+ }
+
void SyncClusterConnection::_connect( string host ){
log() << "SyncClusterConnection connecting to [" << host << "]" << endl;
DBClientConnection * c = new DBClientConnection( true );
string errmsg;
if ( ! c->connect( host , errmsg ) )
log() << "SyncClusterConnection connect fail to: " << host << " errmsg: " << errmsg << endl;
+ _connAddresses.push_back( host );
_conns.push_back( c );
}
- auto_ptr<DBClientCursor> SyncClusterConnection::query(const string &ns, Query query, int nToReturn, int nToSkip,
- const BSONObj *fieldsToReturn, int queryOptions, int batchSize ){
+ bool SyncClusterConnection::callRead( Message& toSend , Message& response ){
+ // TODO: need to save state of which one to go back to somehow...
+ return _conns[0]->callRead( toSend , response );
+ }
+ BSONObj SyncClusterConnection::findOne(const string &ns, const Query& query, const BSONObj *fieldsToReturn, int queryOptions) {
+
if ( ns.find( ".$cmd" ) != string::npos ){
string cmdName = query.obj.firstElement().fieldName();
- int lockType = 0;
-
- map<string,int>::iterator i = _lockTypes.find( cmdName );
- if ( i == _lockTypes.end() ){
- BSONObj info;
- uassert( 13053 , "help failed" , _commandOnActive( "admin" , BSON( cmdName << "1" << "help" << 1 ) , info ) );
- lockType = info["lockType"].numberInt();
- _lockTypes[cmdName] = lockType;
- }
- else {
- lockType = i->second;
+ int lockType = _lockType( cmdName );
+
+ if ( lockType > 0 ){ // write $cmd
+ string errmsg;
+ if ( ! prepare( errmsg ) )
+ throw UserException( 13104 , (string)"SyncClusterConnection::findOne prepare failed: " + errmsg );
+
+ vector<BSONObj> all;
+ for ( size_t i=0; i<_conns.size(); i++ ){
+ all.push_back( _conns[i]->findOne( ns , query , 0 , queryOptions ).getOwned() );
+ }
+
+ _checkLast();
+
+ for ( size_t i=0; i<all.size(); i++ ){
+ BSONObj temp = all[i];
+ if ( isOk( temp ) )
+ continue;
+ stringstream ss;
+ ss << "write $cmd failed on a shard: " << temp.jsonString();
+ ss << " " << _conns[i]->toString();
+ throw UserException( 13105 , ss.str() );
+ }
+
+ return all[0];
}
-
- uassert( 13054 , (string)"write $cmd not supported in SyncClusterConnection: " + cmdName , lockType <= 0 );
+ }
+
+ return DBClientBase::findOne( ns , query , fieldsToReturn , queryOptions );
+ }
+
+
+ auto_ptr<DBClientCursor> SyncClusterConnection::query(const string &ns, Query query, int nToReturn, int nToSkip,
+ const BSONObj *fieldsToReturn, int queryOptions, int batchSize ){
+ _lastErrors.clear();
+ if ( ns.find( ".$cmd" ) != string::npos ){
+ string cmdName = query.obj.firstElement().fieldName();
+ int lockType = _lockType( cmdName );
+ uassert( 13054 , (string)"write $cmd not supported in SyncClusterConnection::query for:" + cmdName , lockType <= 0 );
}
return _queryOnActive( ns , query , nToReturn , nToSkip , fieldsToReturn , queryOptions , batchSize );
@@ -185,6 +236,10 @@ namespace mongo {
}
void SyncClusterConnection::insert( const string &ns, BSONObj obj ){
+
+ uassert( 13119 , (string)"SyncClusterConnection::insert obj has to have an _id: " + obj.jsonString() ,
+ ns.find( ".system.indexes" ) != string::npos || obj["_id"].type() );
+
string errmsg;
if ( ! prepare( errmsg ) )
throw UserException( 8003 , (string)"SyncClusterConnection::insert prepare failed: " + errmsg );
@@ -201,19 +256,52 @@ namespace mongo {
}
void SyncClusterConnection::remove( const string &ns , Query query, bool justOne ){
- assert(0);
+ string errmsg;
+ if ( ! prepare( errmsg ) )
+ throw UserException( 8020 , (string)"SyncClusterConnection::remove prepare failed: " + errmsg );
+
+ for ( size_t i=0; i<_conns.size(); i++ ){
+ _conns[i]->remove( ns , query , justOne );
+ }
+
+ _checkLast();
}
void SyncClusterConnection::update( const string &ns , Query query , BSONObj obj , bool upsert , bool multi ){
- string errmsg;
- if ( ! prepare( errmsg ) )
- throw UserException( 8005 , (string)"SyncClusterConnection::udpate prepare failed: " + errmsg );
+
+ if ( upsert ){
+ uassert( 13120 , "SyncClusterConnection::update upsert query needs _id" , query.obj["_id"].type() );
+ }
+
+ if ( _writeConcern ){
+ string errmsg;
+ if ( ! prepare( errmsg ) )
+ throw UserException( 8005 , (string)"SyncClusterConnection::update prepare failed: " + errmsg );
+ }
for ( size_t i=0; i<_conns.size(); i++ ){
- _conns[i]->update( ns , query , obj , upsert , multi );
+ try {
+ _conns[i]->update( ns , query , obj , upsert , multi );
+ }
+ catch ( std::exception& e ){
+ if ( _writeConcern )
+ throw;
+ }
}
- _checkLast();
+ if ( _writeConcern ){
+ _checkLast();
+ assert( _lastErrors.size() > 1 );
+
+ int a = _lastErrors[0]["n"].numberInt();
+ for ( unsigned i=1; i<_lastErrors.size(); i++ ){
+ int b = _lastErrors[i]["n"].numberInt();
+ if ( a == b )
+ continue;
+
+ throw UpdateNotTheSame( 8017 , "update not consistent" , _connAddresses , _lastErrors );
+ }
+ }
}
string SyncClusterConnection::_toString() const {
@@ -244,12 +332,41 @@ namespace mongo {
}
void SyncClusterConnection::say( Message &toSend ){
- assert(0);
+ string errmsg;
+ if ( ! prepare( errmsg ) )
+ throw UserException( 13397 , (string)"SyncClusterConnection::say prepare failed: " + errmsg );
+
+ for ( size_t i=0; i<_conns.size(); i++ ){
+ _conns[i]->say( toSend );
+ }
+
+ _checkLast();
}
void SyncClusterConnection::sayPiggyBack( Message &toSend ){
assert(0);
}
+ int SyncClusterConnection::_lockType( const string& name ){
+ {
+ scoped_lock lk(_mutex);
+ map<string,int>::iterator i = _lockTypes.find( name );
+ if ( i != _lockTypes.end() )
+ return i->second;
+ }
+
+ BSONObj info;
+ uassert( 13053 , "help failed" , _commandOnActive( "admin" , BSON( name << "1" << "help" << 1 ) , info ) );
+
+ int lockType = info["lockType"].numberInt();
+ scoped_lock lk(_mutex);
+ _lockTypes[name] = lockType;
+ return lockType;
+ }
+
+ void SyncClusterConnection::killCursor( long long cursorID ){
+ // should never need to do this
+ assert(0);
+ }
}
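
The insert(), remove(), update() and say() implementations above share one two-phase shape: prepare() fsyncs every node (clearing _lastErrors), the operation is then sent to each connection, and _checkLast() compares the per-node getlasterror documents. The sketch below restates that shape with plain driver calls; it is illustrative only -- the error codes are placeholders and the success checks are simplified relative to the patch's _checkLast():

    // Sketch only: the two-phase write pattern used by SyncClusterConnection above.
    #include "client/dbclient.h"
    using namespace mongo;
    using namespace std;

    void twoPhaseInsert( vector<DBClientConnection*>& conns , const string& ns , const BSONObj& doc ) {
        // phase 1: every node must acknowledge an fsync before any data changes
        for ( size_t i = 0; i < conns.size(); i++ ) {
            BSONObj res;
            if ( ! conns[i]->simpleCommand( "admin" , &res , "fsync" ) )
                throw UserException( 0 /* placeholder code */ , "phase 1 fsync failed" );
        }

        // phase 2: send the write everywhere, then verify every node's getlasterror
        for ( size_t i = 0; i < conns.size(); i++ )
            conns[i]->insert( ns , doc );

        for ( size_t i = 0; i < conns.size(); i++ ) {
            BSONObj err = conns[i]->getLastErrorDetailed();
            if ( ! err["err"].isNull() )
                throw UserException( 0 /* placeholder code */ , "phase 2 write failed on a node" );
        }
    }
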
diff --git a/client/syncclusterconnection.h b/client/syncclusterconnection.h
index e3411e1..d1115f7 100644
--- a/client/syncclusterconnection.h
+++ b/client/syncclusterconnection.h
@@ -1,4 +1,5 @@
-// syncclusterconnection.h
+// @file syncclusterconnection.h
+
/*
* Copyright 2010 10gen Inc.
*
@@ -16,25 +17,36 @@
*/
-#include "../stdafx.h"
+#include "../pch.h"
#include "dbclient.h"
+#include "redef_macros.h"
namespace mongo {
/**
- * this is a connection to a cluster of servers that operate as one
- * for super high durability
+ * This is a connection to a cluster of servers that operate as one
+ * for super high durability.
+ *
+ * Write operations are two-phase. First, all nodes are asked to fsync. If successful
+ * everywhere, the write is sent everywhere and then followed by an fsync. There is no
+ * rollback if a problem occurs during the second phase. Naturally, with all these fsyncs,
+ * these operations will be quite slow -- use sparingly.
+ *
+ * Read operations are sent to a single random node.
+ *
+ * The class checks whether a command is read- or write-style: read-style commands
+ * are sent to a single random node, while write-style commands go to all nodes in two phases.
*/
class SyncClusterConnection : public DBClientBase {
public:
/**
- * @param commaSeperated should be 3 hosts comma seperated
+ * @param commaSeparated should be 3 hosts comma separated
*/
- SyncClusterConnection( string commaSeperated );
+ SyncClusterConnection( const list<HostAndPort> & );
+ SyncClusterConnection( string commaSeparated );
SyncClusterConnection( string a , string b , string c );
~SyncClusterConnection();
-
/**
* @return true if all servers are up and ready for writes
*/
@@ -47,6 +59,8 @@ namespace mongo {
// --- from DBClientInterface
+ virtual BSONObj findOne(const string &ns, const Query& query, const BSONObj *fieldsToReturn, int queryOptions);
+
virtual auto_ptr<DBClientCursor> query(const string &ns, Query query, int nToReturn, int nToSkip,
const BSONObj *fieldsToReturn, int queryOptions, int batchSize );
@@ -60,41 +74,65 @@ namespace mongo {
virtual void update( const string &ns , Query query , BSONObj obj , bool upsert , bool multi );
- virtual string toString(){
- return _toString();
- }
-
virtual bool call( Message &toSend, Message &response, bool assertOk );
virtual void say( Message &toSend );
virtual void sayPiggyBack( Message &toSend );
+
+ virtual void killCursor( long long cursorID );
virtual string getServerAddress() const { return _address; }
+ virtual bool isFailed() const { return false; }
+ virtual string toString() { return _toString(); }
- virtual bool isFailed() const {
- return false;
- }
+ virtual BSONObj getLastErrorDetailed();
+
+ virtual bool callRead( Message& toSend , Message& response );
+
+ virtual ConnectionString::ConnectionType type() const { return ConnectionString::SYNC; }
private:
-
SyncClusterConnection( SyncClusterConnection& prev );
-
- string _toString() const;
-
+ string _toString() const;
bool _commandOnActive(const string &dbname, const BSONObj& cmd, BSONObj &info, int options=0);
-
auto_ptr<DBClientCursor> _queryOnActive(const string &ns, Query query, int nToReturn, int nToSkip,
const BSONObj *fieldsToReturn, int queryOptions, int batchSize );
-
- bool _isReadOnly( const string& name );
-
+ int _lockType( const string& name );
void _checkLast();
-
void _connect( string host );
string _address;
+ vector<string> _connAddresses;
vector<DBClientConnection*> _conns;
map<string,int> _lockTypes;
+ mongo::mutex _mutex;
+
+ vector<BSONObj> _lastErrors;
};
+ class UpdateNotTheSame : public UserException {
+ public:
+ UpdateNotTheSame( int code , const string& msg , const vector<string>& addrs , const vector<BSONObj>& lastErrors )
+ : UserException( code , msg ) , _addrs( addrs ) , _lastErrors( lastErrors ){
+ assert( _addrs.size() == _lastErrors.size() );
+ }
+
+ virtual ~UpdateNotTheSame() throw() {
+ }
+
+ unsigned size() const {
+ return _addrs.size();
+ }
+ pair<string,BSONObj> operator[](unsigned i) const {
+ return make_pair( _addrs[i] , _lastErrors[i] );
+ }
+
+ private:
+
+ vector<string> _addrs;
+ vector<BSONObj> _lastErrors;
+ };
+
};
+
+#include "undef_macros.h"
diff --git a/client/undef_macros.h b/client/undef_macros.h
new file mode 100644
index 0000000..cce8692
--- /dev/null
+++ b/client/undef_macros.h
@@ -0,0 +1,58 @@
+/** @file undef_macros.h - remove mongo-specific macros that might cause issues */
+
+/* Copyright 2009 10gen Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// If you define a new global un-prefixed macro, please add it here and in redef_macros
+
+// #pragma once // this file is intended to be processed multiple times
+
+
+/** MONGO_EXPOSE_MACROS - when defined, indicates that you are compiling a mongo program rather
+ than just using the C++ driver.
+*/
+#if !defined(MONGO_EXPOSE_MACROS) && !defined(MONGO_MACROS_CLEANED)
+
+// util/allocator.h
+#undef malloc
+#undef realloc
+
+// util/assert_util.h
+#undef assert
+#undef dassert
+#undef wassert
+#undef massert
+#undef uassert
+#undef BOOST_CHECK_EXCEPTION
+#undef DESTRUCTOR_GUARD
+
+// util/goodies.h
+#undef PRINT
+#undef PRINTFL
+#undef asctime
+#undef gmtime
+#undef localtime
+#undef ctime
+
+// util/debug_util.h
+#undef DEV
+#undef DEBUGGING
+#undef SOMETIMES
+#undef OCCASIONALLY
+#undef RARELY
+#undef ONCE
+
+#define MONGO_MACROS_CLEANED
+#endif
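
Because every driver-facing header now ends with undef_macros.h, an application that uses the C++ driver without defining MONGO_EXPOSE_MACROS keeps the ordinary meanings of assert, malloc and the other names listed above. A minimal illustrative sketch of such an application:

    // Sketch only: driver-using program built without MONGO_EXPOSE_MACROS.
    #include <cassert>
    #include <stdlib.h>
    #include "client/dbclient.h"

    int main() {
        assert( 1 + 1 == 2 );       // plain <cassert> assert, not MONGO_assert
        void * p = malloc( 16 );    // plain malloc, not MONGO_malloc
        free( p );
        return 0;
    }
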