summaryrefslogtreecommitdiff
path: root/client
diff options
context:
space:
mode:
authorAntonin Kral <a.kral@bobek.cz>2010-01-31 08:32:52 +0100
committerAntonin Kral <a.kral@bobek.cz>2010-01-31 08:32:52 +0100
commit4eefaf421bfeddf040d96a3dafb12e09673423d7 (patch)
treecb2e5ccc7f98158894f977ff131949da36673591 /client
downloadmongodb-4eefaf421bfeddf040d96a3dafb12e09673423d7.tar.gz
Imported Upstream version 1.3.1
Diffstat (limited to 'client')
-rw-r--r--client/clientOnly.cpp57
-rw-r--r--client/connpool.cpp122
-rw-r--r--client/connpool.h135
-rw-r--r--client/dbclient.cpp981
-rw-r--r--client/dbclient.h894
-rw-r--r--client/examples/authTest.cpp53
-rw-r--r--client/examples/clientTest.cpp214
-rw-r--r--client/examples/first.cpp85
-rw-r--r--client/examples/second.cpp56
-rw-r--r--client/examples/tail.cpp55
-rw-r--r--client/examples/tutorial.cpp67
-rw-r--r--client/examples/whereExample.cpp68
-rw-r--r--client/gridfs.cpp233
-rw-r--r--client/gridfs.h203
-rw-r--r--client/model.cpp97
-rw-r--r--client/model.h57
-rw-r--r--client/parallel.cpp259
-rw-r--r--client/parallel.h195
-rw-r--r--client/syncclusterconnection.cpp165
-rw-r--r--client/syncclusterconnection.h57
20 files changed, 4053 insertions, 0 deletions
diff --git a/client/clientOnly.cpp b/client/clientOnly.cpp
new file mode 100644
index 0000000..f9fc570
--- /dev/null
+++ b/client/clientOnly.cpp
@@ -0,0 +1,57 @@
+// clientOnly.cpp
+
+/* Copyright 2009 10gen Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "stdafx.h"
+#include "../client/dbclient.h"
+#include "../db/dbhelpers.h"
+#include "../db/cmdline.h"
+
+namespace mongo {
+
+ CmdLine cmdLine;
+
+ const char * curNs = "in client mode";
+
+ bool dbexitCalled = false;
+
+ void dbexit( ExitCode returnCode, const char *whyMsg ) {
+ dbexitCalled = true;
+ out() << "dbexit called" << endl;
+ if ( whyMsg )
+ out() << " b/c " << whyMsg << endl;
+ out() << "exiting" << endl;
+ ::exit( returnCode );
+ }
+
+ bool inShutdown(){
+ return dbexitCalled;
+ }
+
+ string getDbContext() {
+ return "in client only mode";
+ }
+
+ bool haveLocalShardingInfo( const string& ns ){
+ return false;
+ }
+/*
+ auto_ptr<CursorIterator> Helpers::find( const char *ns , BSONObj query , bool requireIndex ){
+ uassert( 10000 , "Helpers::find can't be used in client" , 0 );
+ return auto_ptr<CursorIterator>(0);
+ }
+*/
+}
diff --git a/client/connpool.cpp b/client/connpool.cpp
new file mode 100644
index 0000000..b332bae
--- /dev/null
+++ b/client/connpool.cpp
@@ -0,0 +1,122 @@
+/* connpool.cpp
+*/
+
+/* Copyright 2009 10gen Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// _ todo: reconnect?
+
+#include "stdafx.h"
+#include "connpool.h"
+#include "../db/commands.h"
+
+namespace mongo {
+
+ DBConnectionPool pool;
+
+ DBClientBase* DBConnectionPool::get(const string& host) {
+ boostlock L(poolMutex);
+
+ PoolForHost *&p = pools[host];
+ if ( p == 0 )
+ p = new PoolForHost();
+ if ( p->pool.empty() ) {
+ string errmsg;
+ DBClientBase *c;
+ if( host.find(',') == string::npos ) {
+ DBClientConnection *cc = new DBClientConnection(true);
+ log(2) << "creating new connection for pool to:" << host << endl;
+ if ( !cc->connect(host.c_str(), errmsg) ) {
+ delete cc;
+ uassert( 11002 , (string)"dbconnectionpool: connect failed " + host , false);
+ return 0;
+ }
+ c = cc;
+ onCreate( c );
+ }
+ else {
+ DBClientPaired *p = new DBClientPaired();
+ if( !p->connect(host) ) {
+ delete p;
+ uassert( 11003 , (string)"dbconnectionpool: connect failed [2] " + host , false);
+ return 0;
+ }
+ c = p;
+ }
+ return c;
+ }
+ DBClientBase *c = p->pool.top();
+ p->pool.pop();
+ onHandedOut( c );
+ return c;
+ }
+
+ void DBConnectionPool::flush(){
+ boostlock L(poolMutex);
+ for ( map<string,PoolForHost*>::iterator i = pools.begin(); i != pools.end(); i++ ){
+ PoolForHost* p = i->second;
+
+ vector<DBClientBase*> all;
+ while ( ! p->pool.empty() ){
+ DBClientBase * c = p->pool.top();
+ p->pool.pop();
+ all.push_back( c );
+ bool res;
+ c->isMaster( res );
+ }
+
+ for ( vector<DBClientBase*>::iterator i=all.begin(); i != all.end(); i++ ){
+ p->pool.push( *i );
+ }
+ }
+ }
+
+ void DBConnectionPool::addHook( DBConnectionHook * hook ){
+ _hooks.push_back( hook );
+ }
+
+ void DBConnectionPool::onCreate( DBClientBase * conn ){
+ if ( _hooks.size() == 0 )
+ return;
+
+ for ( list<DBConnectionHook*>::iterator i = _hooks.begin(); i != _hooks.end(); i++ ){
+ (*i)->onCreate( conn );
+ }
+ }
+
+ void DBConnectionPool::onHandedOut( DBClientBase * conn ){
+ if ( _hooks.size() == 0 )
+ return;
+
+ for ( list<DBConnectionHook*>::iterator i = _hooks.begin(); i != _hooks.end(); i++ ){
+ (*i)->onHandedOut( conn );
+ }
+ }
+
+ class PoolFlushCmd : public Command {
+ public:
+ PoolFlushCmd() : Command( "connpoolsync" ){}
+ virtual bool run(const char*, mongo::BSONObj&, std::string&, mongo::BSONObjBuilder& result, bool){
+ pool.flush();
+ result << "ok" << 1;
+ return true;
+ }
+ virtual bool slaveOk(){
+ return true;
+ }
+
+ } poolFlushCmd;
+
+} // namespace mongo
diff --git a/client/connpool.h b/client/connpool.h
new file mode 100644
index 0000000..34ed498
--- /dev/null
+++ b/client/connpool.h
@@ -0,0 +1,135 @@
+/** @file connpool.h */
+
+/* Copyright 2009 10gen Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <stack>
+#include "dbclient.h"
+
+namespace mongo {
+
+ struct PoolForHost {
+ std::stack<DBClientBase*> pool;
+ };
+
+ class DBConnectionHook {
+ public:
+ virtual ~DBConnectionHook(){}
+
+ virtual void onCreate( DBClientBase * conn ){}
+ virtual void onHandedOut( DBClientBase * conn ){}
+
+ };
+
+ /** Database connection pool.
+
+ Generally, use ScopedDbConnection and do not call these directly.
+
+ This class, so far, is suitable for use with unauthenticated connections.
+ Support for authenticated connections requires some adjustements: please
+ request...
+
+ Usage:
+
+ {
+ ScopedDbConnection c("myserver");
+ c.conn()...
+ }
+ */
+ class DBConnectionPool {
+ boost::mutex poolMutex;
+ map<string,PoolForHost*> pools; // servername -> pool
+ list<DBConnectionHook*> _hooks;
+
+ void onCreate( DBClientBase * conn );
+ void onHandedOut( DBClientBase * conn );
+ public:
+ void flush();
+ DBClientBase *get(const string& host);
+ void release(const string& host, DBClientBase *c) {
+ if ( c->isFailed() )
+ return;
+ boostlock L(poolMutex);
+ pools[host]->pool.push(c);
+ }
+ void addHook( DBConnectionHook * hook );
+ };
+
+ extern DBConnectionPool pool;
+
+ /** Use to get a connection from the pool. On exceptions things
+ clean up nicely.
+ */
+ class ScopedDbConnection {
+ const string host;
+ DBClientBase *_conn;
+ public:
+ /** get the associated connection object */
+ DBClientBase* operator->(){
+ uassert( 11004 , "did you call done already" , _conn );
+ return _conn;
+ }
+
+ /** get the associated connection object */
+ DBClientBase& conn() {
+ uassert( 11005 , "did you call done already" , _conn );
+ return *_conn;
+ }
+
+ /** throws UserException if can't connect */
+ ScopedDbConnection(const string& _host) :
+ host(_host), _conn( pool.get(_host) ) {
+ //cout << " for: " << _host << " got conn: " << _conn << endl;
+ }
+
+ /** Force closure of the connection. You should call this if you leave it in
+ a bad state. Destructor will do this too, but it is verbose.
+ */
+ void kill() {
+ delete _conn;
+ _conn = 0;
+ }
+
+ /** Call this when you are done with the connection.
+
+ If you do not call done() before this object goes out of scope,
+ we can't be sure we fully read all expected data of a reply on the socket. so
+ we don't try to reuse the connection in that situation.
+ */
+ void done() {
+ if ( ! _conn )
+ return;
+
+ /* we could do this, but instead of assume one is using autoreconnect mode on the connection
+ if ( _conn->isFailed() )
+ kill();
+ else
+ */
+ pool.release(host, _conn);
+ _conn = 0;
+ }
+
+ ~ScopedDbConnection() {
+ if ( _conn && ! _conn->isFailed() ) {
+ /* see done() comments above for why we log this line */
+ log() << "~ScopedDBConnection: _conn != null" << endl;
+ kill();
+ }
+ }
+ };
+
+} // namespace mongo
diff --git a/client/dbclient.cpp b/client/dbclient.cpp
new file mode 100644
index 0000000..165981d
--- /dev/null
+++ b/client/dbclient.cpp
@@ -0,0 +1,981 @@
+// dbclient.cpp - connect to a Mongo database as a database, from C++
+
+/* Copyright 2009 10gen Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "stdafx.h"
+#include "../db/pdfile.h"
+#include "dbclient.h"
+#include "../util/builder.h"
+#include "../db/jsobj.h"
+#include "../db/json.h"
+#include "../db/instance.h"
+#include "../util/md5.hpp"
+#include "../db/dbmessage.h"
+#include "../db/cmdline.h"
+
+namespace mongo {
+
+ Query& Query::where(const string &jscode, BSONObj scope) {
+ /* use where() before sort() and hint() and explain(), else this will assert. */
+ assert( !obj.hasField("query") );
+ BSONObjBuilder b;
+ b.appendElements(obj);
+ b.appendWhere(jscode, scope);
+ obj = b.obj();
+ return *this;
+ }
+
+ void Query::makeComplex() {
+ if ( obj.hasElement( "query" ) )
+ return;
+ BSONObjBuilder b;
+ b.append( "query", obj );
+ obj = b.obj();
+ }
+
+ Query& Query::sort(const BSONObj& s) {
+ appendComplex( "orderby", s );
+ return *this;
+ }
+
+ Query& Query::hint(BSONObj keyPattern) {
+ appendComplex( "$hint", keyPattern );
+ return *this;
+ }
+
+ Query& Query::explain() {
+ appendComplex( "$explain", true );
+ return *this;
+ }
+
+ Query& Query::snapshot() {
+ appendComplex( "$snapshot", true );
+ return *this;
+ }
+
+ Query& Query::minKey( const BSONObj &val ) {
+ appendComplex( "$min", val );
+ return *this;
+ }
+
+ Query& Query::maxKey( const BSONObj &val ) {
+ appendComplex( "$max", val );
+ return *this;
+ }
+
+ bool Query::isComplex() const{
+ return obj.hasElement( "query" );
+ }
+
+ BSONObj Query::getFilter() const {
+ if ( ! isComplex() )
+ return obj;
+ return obj.getObjectField( "query" );
+ }
+ BSONObj Query::getSort() const {
+ if ( ! isComplex() )
+ return BSONObj();
+ return obj.getObjectField( "orderby" );
+ }
+ BSONObj Query::getHint() const {
+ if ( ! isComplex() )
+ return BSONObj();
+ return obj.getObjectField( "$hint" );
+ }
+ bool Query::isExplain() const {
+ return isComplex() && obj.getBoolField( "$explain" );
+ }
+
+ string Query::toString() const{
+ return obj.toString();
+ }
+
+ /* --- dbclientcommands --- */
+
+ inline bool DBClientWithCommands::isOk(const BSONObj& o) {
+ return o.getIntField("ok") == 1;
+ }
+
+ inline bool DBClientWithCommands::runCommand(const string &dbname, const BSONObj& cmd, BSONObj &info, int options) {
+ string ns = dbname + ".$cmd";
+ info = findOne(ns, cmd, 0 , options);
+ return isOk(info);
+ }
+
+ /* note - we build a bson obj here -- for something that is super common like getlasterror you
+ should have that object prebuilt as that would be faster.
+ */
+ bool DBClientWithCommands::simpleCommand(const string &dbname, BSONObj *info, const string &command) {
+ BSONObj o;
+ if ( info == 0 )
+ info = &o;
+ BSONObjBuilder b;
+ b.append(command, 1);
+ return runCommand(dbname, b.done(), *info);
+ }
+
+ unsigned long long DBClientWithCommands::count(const string &_ns, const BSONObj& query, int options) {
+ NamespaceString ns(_ns);
+ BSONObj cmd = BSON( "count" << ns.coll << "query" << query );
+ BSONObj res;
+ if( !runCommand(ns.db.c_str(), cmd, res, options) )
+ uasserted(11010,string("count fails:") + res.toString());
+ return res.getIntField("n");
+ }
+
+ BSONObj getlasterrorcmdobj = fromjson("{getlasterror:1}");
+
+ BSONObj DBClientWithCommands::getLastErrorDetailed() {
+ BSONObj info;
+ runCommand("admin", getlasterrorcmdobj, info);
+ return info;
+ }
+
+ string DBClientWithCommands::getLastError() {
+ BSONObj info = getLastErrorDetailed();
+ BSONElement e = info["err"];
+ if( e.eoo() ) return "";
+ if( e.type() == Object ) return e.toString();
+ return e.str();
+ }
+
+ BSONObj getpreverrorcmdobj = fromjson("{getpreverror:1}");
+
+ BSONObj DBClientWithCommands::getPrevError() {
+ BSONObj info;
+ runCommand("admin", getpreverrorcmdobj, info);
+ return info;
+ }
+
+ BSONObj getnoncecmdobj = fromjson("{getnonce:1}");
+
+ string DBClientWithCommands::createPasswordDigest( const string & username , const string & clearTextPassword ){
+ md5digest d;
+ {
+ md5_state_t st;
+ md5_init(&st);
+ md5_append(&st, (const md5_byte_t *) username.data(), username.length());
+ md5_append(&st, (const md5_byte_t *) ":mongo:", 7 );
+ md5_append(&st, (const md5_byte_t *) clearTextPassword.data(), clearTextPassword.length());
+ md5_finish(&st, d);
+ }
+ return digestToString( d );
+ }
+
+ bool DBClientWithCommands::auth(const string &dbname, const string &username, const string &password_text, string& errmsg, bool digestPassword) {
+ //cout << "TEMP AUTH " << toString() << dbname << ' ' << username << ' ' << password_text << ' ' << digestPassword << endl;
+
+ string password = password_text;
+ if( digestPassword )
+ password = createPasswordDigest( username , password_text );
+
+ BSONObj info;
+ string nonce;
+ if( !runCommand(dbname, getnoncecmdobj, info) ) {
+ errmsg = "getnonce fails - connection problem?";
+ return false;
+ }
+ {
+ BSONElement e = info.getField("nonce");
+ assert( e.type() == String );
+ nonce = e.valuestr();
+ }
+
+ BSONObj authCmd;
+ BSONObjBuilder b;
+ {
+
+ b << "authenticate" << 1 << "nonce" << nonce << "user" << username;
+ md5digest d;
+ {
+ md5_state_t st;
+ md5_init(&st);
+ md5_append(&st, (const md5_byte_t *) nonce.c_str(), nonce.size() );
+ md5_append(&st, (const md5_byte_t *) username.data(), username.length());
+ md5_append(&st, (const md5_byte_t *) password.c_str(), password.size() );
+ md5_finish(&st, d);
+ }
+ b << "key" << digestToString( d );
+ authCmd = b.done();
+ }
+
+ if( runCommand(dbname, authCmd, info) )
+ return true;
+
+ errmsg = info.toString();
+ return false;
+ }
+
+ BSONObj ismastercmdobj = fromjson("{\"ismaster\":1}");
+
+ bool DBClientWithCommands::isMaster(bool& isMaster, BSONObj *info) {
+ BSONObj o;
+ if ( info == 0 ) info = &o;
+ bool ok = runCommand("admin", ismastercmdobj, *info);
+ isMaster = (info->getIntField("ismaster") == 1);
+ return ok;
+ }
+
+ bool DBClientWithCommands::createCollection(const string &ns, unsigned size, bool capped, int max, BSONObj *info) {
+ BSONObj o;
+ if ( info == 0 ) info = &o;
+ BSONObjBuilder b;
+ b.append("create", ns);
+ if ( size ) b.append("size", size);
+ if ( capped ) b.append("capped", true);
+ if ( max ) b.append("max", max);
+ string db = nsToDatabase(ns.c_str());
+ return runCommand(db.c_str(), b.done(), *info);
+ }
+
+ bool DBClientWithCommands::copyDatabase(const string &fromdb, const string &todb, const string &fromhost, BSONObj *info) {
+ BSONObj o;
+ if ( info == 0 ) info = &o;
+ BSONObjBuilder b;
+ b.append("copydb", 1);
+ b.append("fromhost", fromhost);
+ b.append("fromdb", fromdb);
+ b.append("todb", todb);
+ return runCommand("admin", b.done(), *info);
+ }
+
+ bool DBClientWithCommands::setDbProfilingLevel(const string &dbname, ProfilingLevel level, BSONObj *info ) {
+ BSONObj o;
+ if ( info == 0 ) info = &o;
+
+ if ( level ) {
+ // Create system.profile collection. If it already exists this does nothing.
+ // TODO: move this into the db instead of here so that all
+ // drivers don't have to do this.
+ string ns = dbname + ".system.profile";
+ createCollection(ns.c_str(), 1024 * 1024, true, 0, info);
+ }
+
+ BSONObjBuilder b;
+ b.append("profile", (int) level);
+ return runCommand(dbname, b.done(), *info);
+ }
+
+ BSONObj getprofilingcmdobj = fromjson("{\"profile\":-1}");
+
+ bool DBClientWithCommands::getDbProfilingLevel(const string &dbname, ProfilingLevel& level, BSONObj *info) {
+ BSONObj o;
+ if ( info == 0 ) info = &o;
+ if ( runCommand(dbname, getprofilingcmdobj, *info) ) {
+ level = (ProfilingLevel) info->getIntField("was");
+ return true;
+ }
+ return false;
+ }
+
+ BSONObj DBClientWithCommands::mapreduce(const string &ns, const string &jsmapf, const string &jsreducef, BSONObj query, const string& outputcolname) {
+ BSONObjBuilder b;
+ b.append("mapreduce", nsGetCollection(ns));
+ b.appendCode("map", jsmapf.c_str());
+ b.appendCode("reduce", jsreducef.c_str());
+ if( !query.isEmpty() )
+ b.append("query", query);
+ if( !outputcolname.empty() )
+ b.append("out", outputcolname);
+ BSONObj info;
+ runCommand(nsGetDB(ns), b.done(), info);
+ return info;
+ }
+
+ bool DBClientWithCommands::eval(const string &dbname, const string &jscode, BSONObj& info, BSONElement& retValue, BSONObj *args) {
+ BSONObjBuilder b;
+ b.appendCode("$eval", jscode.c_str());
+ if ( args )
+ b.appendArray("args", *args);
+ bool ok = runCommand(dbname, b.done(), info);
+ if ( ok )
+ retValue = info.getField("retval");
+ return ok;
+ }
+
+ bool DBClientWithCommands::eval(const string &dbname, const string &jscode) {
+ BSONObj info;
+ BSONElement retValue;
+ return eval(dbname, jscode, info, retValue);
+ }
+
+ list<string> DBClientWithCommands::getDatabaseNames(){
+ BSONObj info;
+ uassert( 10005 , "listdatabases failed" , runCommand( "admin" , BSON( "listDatabases" << 1 ) , info ) );
+ uassert( 10006 , "listDatabases.databases not array" , info["databases"].type() == Array );
+
+ list<string> names;
+
+ BSONObjIterator i( info["databases"].embeddedObjectUserCheck() );
+ while ( i.more() ){
+ names.push_back( i.next().embeddedObjectUserCheck()["name"].valuestr() );
+ }
+
+ return names;
+ }
+
+ list<string> DBClientWithCommands::getCollectionNames( const string& db ){
+ list<string> names;
+
+ string ns = db + ".system.namespaces";
+ auto_ptr<DBClientCursor> c = query( ns.c_str() , BSONObj() );
+ while ( c->more() ){
+ string name = c->next()["name"].valuestr();
+ if ( name.find( "$" ) != string::npos )
+ continue;
+ names.push_back( name );
+ }
+ return names;
+ }
+
+ bool DBClientWithCommands::exists( const string& ns ){
+ list<string> names;
+
+ string db = nsGetDB( ns ) + ".system.namespaces";
+ BSONObj q = BSON( "name" << ns );
+ return count( db.c_str() , q );
+ }
+
+
+ void testSort() {
+ DBClientConnection c;
+ string err;
+ if ( !c.connect("localhost", err) ) {
+ out() << "can't connect to server " << err << endl;
+ return;
+ }
+
+ cout << "findOne returns:" << endl;
+ cout << c.findOne("test.foo", QUERY( "x" << 3 ) ).toString() << endl;
+ cout << c.findOne("test.foo", QUERY( "x" << 3 ).sort("name") ).toString() << endl;
+
+ }
+
+ /* TODO: unit tests should run this? */
+ void testDbEval() {
+ DBClientConnection c;
+ string err;
+ if ( !c.connect("localhost", err) ) {
+ out() << "can't connect to server " << err << endl;
+ return;
+ }
+
+ if( !c.auth("dwight", "u", "p", err) ) {
+ out() << "can't authenticate " << err << endl;
+ return;
+ }
+
+ BSONObj info;
+ BSONElement retValue;
+ BSONObjBuilder b;
+ b.append("0", 99);
+ BSONObj args = b.done();
+ bool ok = c.eval("dwight", "function() { return args[0]; }", info, retValue, &args);
+ out() << "eval ok=" << ok << endl;
+ out() << "retvalue=" << retValue.toString() << endl;
+ out() << "info=" << info.toString() << endl;
+
+ out() << endl;
+
+ int x = 3;
+ assert( c.eval("dwight", "function() { return 3; }", x) );
+
+ out() << "***\n";
+
+ BSONObj foo = fromjson("{\"x\":7}");
+ out() << foo.toString() << endl;
+ int res=0;
+ ok = c.eval("dwight", "function(parm1) { return parm1.x; }", foo, res);
+ out() << ok << " retval:" << res << endl;
+ }
+
+ void testPaired();
+
+ /* --- dbclientconnection --- */
+
+ bool DBClientConnection::auth(const string &dbname, const string &username, const string &password_text, string& errmsg, bool digestPassword) {
+ string password = password_text;
+ if( digestPassword )
+ password = createPasswordDigest( username , password_text );
+
+ if( autoReconnect ) {
+ /* note we remember the auth info before we attempt to auth -- if the connection is broken, we will
+ then have it for the next autoreconnect attempt.
+ */
+ pair<string,string> p = pair<string,string>(username, password);
+ authCache[dbname] = p;
+ }
+
+ return DBClientBase::auth(dbname, username, password.c_str(), errmsg, false);
+ }
+
+ BSONObj DBClientInterface::findOne(const string &ns, Query query, const BSONObj *fieldsToReturn, int queryOptions) {
+ auto_ptr<DBClientCursor> c =
+ this->query(ns, query, 1, 0, fieldsToReturn, queryOptions);
+
+ massert( 10276 , "DBClientBase::findOne: transport error", c.get() );
+
+ if ( !c->more() )
+ return BSONObj();
+
+ return c->next().copy();
+ }
+
+ bool DBClientConnection::connect(const string &_serverAddress, string& errmsg) {
+ serverAddress = _serverAddress;
+
+ string ip;
+ int port;
+ size_t idx = serverAddress.find( ":" );
+ if ( idx != string::npos ) {
+ port = strtol( serverAddress.substr( idx + 1 ).c_str(), 0, 10 );
+ ip = serverAddress.substr( 0 , idx );
+ ip = hostbyname(ip.c_str());
+ } else {
+ port = CmdLine::DefaultDBPort;
+ ip = hostbyname( serverAddress.c_str() );
+ }
+ massert( 10277 , "Unable to parse hostname", !ip.empty() );
+
+ // we keep around SockAddr for connection life -- maybe MessagingPort
+ // requires that?
+ server = auto_ptr<SockAddr>(new SockAddr(ip.c_str(), port));
+ p = auto_ptr<MessagingPort>(new MessagingPort());
+
+ if ( !p->connect(*server) ) {
+ stringstream ss;
+ ss << "couldn't connect to server " << serverAddress << " " << ip << ":" << port;
+ errmsg = ss.str();
+ failed = true;
+ return false;
+ }
+ return true;
+ }
+
+ void DBClientConnection::_checkConnection() {
+ if ( !failed )
+ return;
+ if ( lastReconnectTry && time(0)-lastReconnectTry < 2 )
+ return;
+ if ( !autoReconnect )
+ return;
+
+ lastReconnectTry = time(0);
+ log() << "trying reconnect to " << serverAddress << endl;
+ string errmsg;
+ string tmp = serverAddress;
+ failed = false;
+ if ( !connect(tmp.c_str(), errmsg) ) {
+ log() << "reconnect " << serverAddress << " failed " << errmsg << endl;
+ return;
+ }
+
+ log() << "reconnect " << serverAddress << " ok" << endl;
+ for( map< string, pair<string,string> >::iterator i = authCache.begin(); i != authCache.end(); i++ ) {
+ const char *dbname = i->first.c_str();
+ const char *username = i->second.first.c_str();
+ const char *password = i->second.second.c_str();
+ if( !DBClientBase::auth(dbname, username, password, errmsg, false) )
+ log() << "reconnect: auth failed db:" << dbname << " user:" << username << ' ' << errmsg << '\n';
+ }
+ }
+
+ auto_ptr<DBClientCursor> DBClientBase::query(const string &ns, Query query, int nToReturn,
+ int nToSkip, const BSONObj *fieldsToReturn, int queryOptions) {
+ auto_ptr<DBClientCursor> c( new DBClientCursor( this,
+ ns, query.obj, nToReturn, nToSkip,
+ fieldsToReturn, queryOptions ) );
+ if ( c->init() )
+ return c;
+ return auto_ptr< DBClientCursor >( 0 );
+ }
+
+ auto_ptr<DBClientCursor> DBClientBase::getMore( const string &ns, long long cursorId, int nToReturn, int options ) {
+ auto_ptr<DBClientCursor> c( new DBClientCursor( this, ns, cursorId, nToReturn, options ) );
+ if ( c->init() )
+ return c;
+ return auto_ptr< DBClientCursor >( 0 );
+ }
+
+ void DBClientBase::insert( const string & ns , BSONObj obj ) {
+ Message toSend;
+
+ BufBuilder b;
+ int opts = 0;
+ b.append( opts );
+ b.append( ns );
+ obj.appendSelfToBufBuilder( b );
+
+ toSend.setData( dbInsert , b.buf() , b.len() );
+
+ say( toSend );
+ }
+
+ void DBClientBase::insert( const string & ns , const vector< BSONObj > &v ) {
+ Message toSend;
+
+ BufBuilder b;
+ int opts = 0;
+ b.append( opts );
+ b.append( ns );
+ for( vector< BSONObj >::const_iterator i = v.begin(); i != v.end(); ++i )
+ i->appendSelfToBufBuilder( b );
+
+ toSend.setData( dbInsert, b.buf(), b.len() );
+
+ say( toSend );
+ }
+
+ void DBClientBase::remove( const string & ns , Query obj , bool justOne ) {
+ Message toSend;
+
+ BufBuilder b;
+ int opts = 0;
+ b.append( opts );
+ b.append( ns );
+
+ int flags = 0;
+ if ( justOne )
+ flags |= 1;
+ b.append( flags );
+
+ obj.obj.appendSelfToBufBuilder( b );
+
+ toSend.setData( dbDelete , b.buf() , b.len() );
+
+ say( toSend );
+ }
+
+ void DBClientBase::update( const string & ns , Query query , BSONObj obj , bool upsert , bool multi ) {
+
+ BufBuilder b;
+ b.append( (int)0 ); // reserverd
+ b.append( ns );
+
+ int flags = 0;
+ if ( upsert ) flags |= UpdateOption_Upsert;
+ if ( multi ) flags |= UpdateOption_Multi;
+ b.append( flags );
+
+ query.obj.appendSelfToBufBuilder( b );
+ obj.appendSelfToBufBuilder( b );
+
+ Message toSend;
+ toSend.setData( dbUpdate , b.buf() , b.len() );
+
+ say( toSend );
+ }
+
+ auto_ptr<DBClientCursor> DBClientWithCommands::getIndexes( const string &ns ){
+ return query( Namespace( ns.c_str() ).getSisterNS( "system.indexes" ).c_str() , BSON( "ns" << ns ) );
+ }
+
+ void DBClientWithCommands::dropIndex( const string& ns , BSONObj keys ){
+ dropIndex( ns , genIndexName( keys ) );
+ }
+
+
+ void DBClientWithCommands::dropIndex( const string& ns , const string& indexName ){
+ BSONObj info;
+ if ( ! runCommand( nsToDatabase( ns.c_str() ) ,
+ BSON( "deleteIndexes" << NamespaceString( ns ).coll << "index" << indexName ) ,
+ info ) ){
+ log() << "dropIndex failed: " << info << endl;
+ uassert( 10007 , "dropIndex failed" , 0 );
+ }
+ resetIndexCache();
+ }
+
+ void DBClientWithCommands::dropIndexes( const string& ns ){
+ BSONObj info;
+ uassert( 10008 , "dropIndexes failed" , runCommand( nsToDatabase( ns.c_str() ) ,
+ BSON( "deleteIndexes" << NamespaceString( ns ).coll << "index" << "*") ,
+ info ) );
+ resetIndexCache();
+ }
+
+ void DBClientWithCommands::reIndex( const string& ns ){
+ list<BSONObj> all;
+ auto_ptr<DBClientCursor> i = getIndexes( ns );
+ while ( i->more() ){
+ all.push_back( i->next().getOwned() );
+ }
+
+ dropIndexes( ns );
+
+ for ( list<BSONObj>::iterator i=all.begin(); i!=all.end(); i++ ){
+ BSONObj o = *i;
+ insert( Namespace( ns.c_str() ).getSisterNS( "system.indexes" ).c_str() , o );
+ }
+
+ }
+
+
+ string DBClientWithCommands::genIndexName( const BSONObj& keys ){
+ stringstream ss;
+
+ bool first = 1;
+ for ( BSONObjIterator i(keys); i.more(); ) {
+ BSONElement f = i.next();
+
+ if ( first )
+ first = 0;
+ else
+ ss << "_";
+
+ ss << f.fieldName() << "_";
+ if( f.isNumber() )
+ ss << f.numberInt();
+ }
+ return ss.str();
+ }
+
+ bool DBClientWithCommands::ensureIndex( const string &ns , BSONObj keys , bool unique, const string & name ) {
+ BSONObjBuilder toSave;
+ toSave.append( "ns" , ns );
+ toSave.append( "key" , keys );
+
+ string cacheKey(ns);
+ cacheKey += "--";
+
+ if ( name != "" ) {
+ toSave.append( "name" , name );
+ cacheKey += name;
+ }
+ else {
+ string nn = genIndexName( keys );
+ toSave.append( "name" , nn );
+ cacheKey += nn;
+ }
+
+ if ( unique )
+ toSave.appendBool( "unique", unique );
+
+ if ( _seenIndexes.count( cacheKey ) )
+ return 0;
+ _seenIndexes.insert( cacheKey );
+
+ insert( Namespace( ns.c_str() ).getSisterNS( "system.indexes" ).c_str() , toSave.obj() );
+ return 1;
+ }
+
+ void DBClientWithCommands::resetIndexCache() {
+ _seenIndexes.clear();
+ }
+
+ /* -- DBClientCursor ---------------------------------------------- */
+
+ void assembleRequest( const string &ns, BSONObj query, int nToReturn, int nToSkip, const BSONObj *fieldsToReturn, int queryOptions, Message &toSend ) {
+ CHECK_OBJECT( query , "assembleRequest query" );
+ // see query.h for the protocol we are using here.
+ BufBuilder b;
+ int opts = queryOptions;
+ b.append(opts);
+ b.append(ns.c_str());
+ b.append(nToSkip);
+ b.append(nToReturn);
+ query.appendSelfToBufBuilder(b);
+ if ( fieldsToReturn )
+ fieldsToReturn->appendSelfToBufBuilder(b);
+ toSend.setData(dbQuery, b.buf(), b.len());
+ }
+
+ void DBClientConnection::say( Message &toSend ) {
+ checkConnection();
+ try {
+ port().say( toSend );
+ } catch( SocketException & ) {
+ failed = true;
+ throw;
+ }
+ }
+
+ void DBClientConnection::sayPiggyBack( Message &toSend ) {
+ port().piggyBack( toSend );
+ }
+
+ bool DBClientConnection::call( Message &toSend, Message &response, bool assertOk ) {
+ /* todo: this is very ugly messagingport::call returns an error code AND can throw
+ an exception. we should make it return void and just throw an exception anytime
+ it fails
+ */
+ try {
+ if ( !port().call(toSend, response) ) {
+ failed = true;
+ if ( assertOk )
+ massert( 10278 , "dbclient error communicating with server", false);
+ return false;
+ }
+ }
+ catch( SocketException & ) {
+ failed = true;
+ throw;
+ }
+ return true;
+ }
+
+ void DBClientConnection::checkResponse( const char *data, int nReturned ) {
+ /* check for errors. the only one we really care about at
+ this stage is "not master" */
+ if ( clientPaired && nReturned ) {
+ BSONObj o(data);
+ BSONElement e = o.firstElement();
+ if ( strcmp(e.fieldName(), "$err") == 0 &&
+ e.type() == String && strncmp(e.valuestr(), "not master", 10) == 0 ) {
+ clientPaired->isntMaster();
+ }
+ }
+ }
+
+ bool DBClientCursor::init() {
+ Message toSend;
+ if ( !cursorId ) {
+ assembleRequest( ns, query, nToReturn, nToSkip, fieldsToReturn, opts, toSend );
+ } else {
+ BufBuilder b;
+ b.append( opts );
+ b.append( ns.c_str() );
+ b.append( nToReturn );
+ b.append( cursorId );
+ toSend.setData( dbGetMore, b.buf(), b.len() );
+ }
+ if ( !connector->call( toSend, *m, false ) )
+ return false;
+ dataReceived();
+ return true;
+ }
+
+ void DBClientCursor::requestMore() {
+ assert( cursorId && pos == nReturned );
+
+ BufBuilder b;
+ b.append(opts);
+ b.append(ns.c_str());
+ b.append(nToReturn);
+ b.append(cursorId);
+
+ Message toSend;
+ toSend.setData(dbGetMore, b.buf(), b.len());
+ auto_ptr<Message> response(new Message());
+ connector->call( toSend, *response );
+
+ m = response;
+ dataReceived();
+ }
+
+ void DBClientCursor::dataReceived() {
+ QueryResult *qr = (QueryResult *) m->data;
+ resultFlags = qr->resultFlags();
+ if ( qr->resultFlags() & QueryResult::ResultFlag_CursorNotFound ) {
+ // cursor id no longer valid at the server.
+ assert( qr->cursorId == 0 );
+ cursorId = 0; // 0 indicates no longer valid (dead)
+ // TODO: should we throw a UserException here???
+ }
+ if ( cursorId == 0 || ! ( opts & QueryOption_CursorTailable ) ) {
+ // only set initially: we don't want to kill it on end of data
+ // if it's a tailable cursor
+ cursorId = qr->cursorId;
+ }
+ nReturned = qr->nReturned;
+ pos = 0;
+ data = qr->data();
+
+ connector->checkResponse( data, nReturned );
+ /* this assert would fire the way we currently work:
+ assert( nReturned || cursorId == 0 );
+ */
+ }
+
+ /** If true, safe to call next(). Requests more from server if necessary. */
+ bool DBClientCursor::more() {
+ if ( pos < nReturned )
+ return true;
+
+ if ( cursorId == 0 )
+ return false;
+
+ requestMore();
+ return pos < nReturned;
+ }
+
+ BSONObj DBClientCursor::next() {
+ assert( more() );
+ pos++;
+ BSONObj o(data);
+ data += o.objsize();
+ return o;
+ }
+
+ DBClientCursor::~DBClientCursor() {
+ if ( cursorId && _ownCursor ) {
+ BufBuilder b;
+ b.append( (int)0 ); // reserved
+ b.append( (int)1 ); // number
+ b.append( cursorId );
+
+ Message m;
+ m.setData( dbKillCursors , b.buf() , b.len() );
+
+ connector->sayPiggyBack( m );
+ }
+
+ }
+
+ /* --- class dbclientpaired --- */
+
+ string DBClientPaired::toString() {
+ stringstream ss;
+ ss << "state: " << master << '\n';
+ ss << "left: " << left.toStringLong() << '\n';
+ ss << "right: " << right.toStringLong() << '\n';
+ return ss.str();
+ }
+
+#pragma warning(disable: 4355)
+ DBClientPaired::DBClientPaired() :
+ left(true, this), right(true, this)
+ {
+ master = NotSetL;
+ }
+#pragma warning(default: 4355)
+
+ /* find which server, the left or right, is currently master mode */
+ void DBClientPaired::_checkMaster() {
+ for ( int retry = 0; retry < 2; retry++ ) {
+ int x = master;
+ for ( int pass = 0; pass < 2; pass++ ) {
+ DBClientConnection& c = x == 0 ? left : right;
+ try {
+ bool im;
+ BSONObj o;
+ c.isMaster(im, &o);
+ if ( retry )
+ log() << "checkmaster: " << c.toString() << ' ' << o.toString() << '\n';
+ if ( im ) {
+ master = (State) (x + 2);
+ return;
+ }
+ }
+ catch (AssertionException&) {
+ if ( retry )
+ log() << "checkmaster: caught exception " << c.toString() << '\n';
+ }
+ x = x^1;
+ }
+ sleepsecs(1);
+ }
+
+ uassert( 10009 , "checkmaster: no master found", false);
+ }
+
+ inline DBClientConnection& DBClientPaired::checkMaster() {
+ if ( master > NotSetR ) {
+ // a master is selected. let's just make sure connection didn't die
+ DBClientConnection& c = master == Left ? left : right;
+ if ( !c.isFailed() )
+ return c;
+ // after a failure, on the next checkMaster, start with the other
+ // server -- presumably it took over. (not critical which we check first,
+ // just will make the failover slightly faster if we guess right)
+ master = master == Left ? NotSetR : NotSetL;
+ }
+
+ _checkMaster();
+ assert( master > NotSetR );
+ return master == Left ? left : right;
+ }
+
+ DBClientConnection& DBClientPaired::slaveConn(){
+ DBClientConnection& m = checkMaster();
+ assert( ! m.isFailed() );
+ return master == Left ? right : left;
+ }
+
+ bool DBClientPaired::connect(const string &serverHostname1, const string &serverHostname2) {
+ string errmsg;
+ bool l = left.connect(serverHostname1, errmsg);
+ bool r = right.connect(serverHostname2, errmsg);
+ master = l ? NotSetL : NotSetR;
+ if ( !l && !r ) // it would be ok to fall through, but checkMaster will then try an immediate reconnect which is slow
+ return false;
+ try {
+ checkMaster();
+ }
+ catch (AssertionException&) {
+ return false;
+ }
+ return true;
+ }
+
+ bool DBClientPaired::connect(string hostpairstring) {
+ size_t comma = hostpairstring.find( "," );
+ uassert( 10010 , "bad hostpairstring", comma != string::npos);
+ return connect( hostpairstring.substr( 0 , comma ) , hostpairstring.substr( comma + 1 ) );
+ }
+
+ bool DBClientPaired::auth(const string &dbname, const string &username, const string &pwd, string& errmsg) {
+ DBClientConnection& m = checkMaster();
+ if( !m.auth(dbname, username, pwd, errmsg) )
+ return false;
+ /* we try to authentiate with the other half of the pair -- even if down, that way the authInfo is cached. */
+ string e;
+ try {
+ if( &m == &left )
+ right.auth(dbname, username, pwd, e);
+ else
+ left.auth(dbname, username, pwd, e);
+ }
+ catch( AssertionException&) {
+ }
+ return true;
+ }
+
+ auto_ptr<DBClientCursor> DBClientPaired::query(const string &a, Query b, int c, int d,
+ const BSONObj *e, int f)
+ {
+ return checkMaster().query(a,b,c,d,e,f);
+ }
+
+ BSONObj DBClientPaired::findOne(const string &a, Query b, const BSONObj *c, int d) {
+ return checkMaster().findOne(a,b,c,d);
+ }
+
+ void testPaired() {
+ DBClientPaired p;
+ log() << "connect returns " << p.connect("localhost:27017", "localhost:27018") << endl;
+
+ //DBClientConnection p(true);
+ string errmsg;
+ // log() << "connect " << p.connect("localhost", errmsg) << endl;
+ log() << "auth " << p.auth("dwight", "u", "p", errmsg) << endl;
+
+ while( 1 ) {
+ sleepsecs(3);
+ try {
+ log() << "findone returns " << p.findOne("dwight.foo", BSONObj()).toString() << endl;
+ sleepsecs(3);
+ BSONObj info;
+ bool im;
+ log() << "ismaster returns " << p.isMaster(im,&info) << " info: " << info.toString() << endl;
+ }
+ catch(...) {
+ cout << "caught exception" << endl;
+ }
+ }
+ }
+
+} // namespace mongo
diff --git a/client/dbclient.h b/client/dbclient.h
new file mode 100644
index 0000000..e3f1675
--- /dev/null
+++ b/client/dbclient.h
@@ -0,0 +1,894 @@
+/** @file dbclient.h - connect to a Mongo database as a database, from C++ */
+
+/* Copyright 2009 10gen Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "../stdafx.h"
+#include "../util/message.h"
+#include "../db/jsobj.h"
+#include "../db/json.h"
+
+namespace mongo {
+
+ /** the query field 'options' can have these bits set: */
+ enum QueryOptions {
+ /** Tailable means cursor is not closed when the last data is retrieved. rather, the cursor marks
+ the final object's position. you can resume using the cursor later, from where it was located,
+ if more data were received. Set on dbQuery and dbGetMore.
+
+ like any "latent cursor", the cursor may become invalid at some point -- for example if that
+ final object it references were deleted. Thus, you should be prepared to requery if you get back
+ ResultFlag_CursorNotFound.
+ */
+ QueryOption_CursorTailable = 1 << 1,
+
+ /** allow query of replica slave. normally these return an error except for namespace "local".
+ */
+ QueryOption_SlaveOk = 1 << 2,
+
+ // findingStart mode is used to find the first operation of interest when
+ // we are scanning through a repl log. For efficiency in the common case,
+ // where the first operation of interest is closer to the tail than the head,
+ // we start from the tail of the log and work backwards until we find the
+ // first operation of interest. Then we scan forward from that first operation,
+ // actually returning results to the client. During the findingStart phase,
+ // we release the db mutex occasionally to avoid blocking the db process for
+ // an extended period of time.
+ QueryOption_OplogReplay = 1 << 3,
+
+ /** The server normally times out idle cursors after an inactivy period to prevent excess memory use
+ Set this option to prevent that.
+ */
+ QueryOption_NoCursorTimeout = 1 << 4,
+
+ /** Use with QueryOption_CursorTailable. If we are at the end of the data, block for a while rather
+ than returning no data. After a timeout period, we do return as normal.
+ */
+ QueryOption_AwaitData = 1 << 5
+
+ };
+
+ enum UpdateOptions {
+ /** Upsert - that is, insert the item if no matching item is found. */
+ UpdateOption_Upsert = 1 << 0,
+
+ /** Update multiple documents (if multiple documents match query expression).
+ (Default is update a single document and stop.) */
+ UpdateOption_Multi = 1 << 1
+ };
+
+ class BSONObj;
+
+ /** Represents a Mongo query expression. Typically one uses the QUERY(...) macro to construct a Query object.
+ Examples:
+ QUERY( "age" << 33 << "school" << "UCLA" ).sort("name")
+ QUERY( "age" << GT << 30 << LT << 50 )
+ */
+ class Query {
+ public:
+ BSONObj obj;
+ Query() : obj(BSONObj()) { }
+ Query(const BSONObj& b) : obj(b) { }
+ Query(const string &json) :
+ obj(fromjson(json)) { }
+ Query(const char * json) :
+ obj(fromjson(json)) { }
+
+ /** Add a sort (ORDER BY) criteria to the query expression.
+ @param sortPattern the sort order template. For example to order by name ascending, time descending:
+ { name : 1, ts : -1 }
+ i.e.
+ BSON( "name" << 1 << "ts" << -1 )
+ or
+ fromjson(" name : 1, ts : -1 ")
+ */
+ Query& sort(const BSONObj& sortPattern);
+
+ /** Add a sort (ORDER BY) criteria to the query expression.
+ This version of sort() assumes you want to sort on a single field.
+ @param asc = 1 for ascending order
+ asc = -1 for descending order
+ */
+ Query& sort(const string &field, int asc = 1) { sort( BSON( field << asc ) ); return *this; }
+
+ /** Provide a hint to the query.
+ @param keyPattern Key pattern for the index to use.
+ Example:
+ hint("{ts:1}")
+ */
+ Query& hint(BSONObj keyPattern);
+ Query& hint(const string &jsonKeyPatt) { return hint(fromjson(jsonKeyPatt)); }
+
+ /** Provide min and/or max index limits for the query.
+ min <= x < max
+ */
+ Query& minKey(const BSONObj &val);
+ /**
+ max is exclusive
+ */
+ Query& maxKey(const BSONObj &val);
+
+ /** Return explain information about execution of this query instead of the actual query results.
+ Normally it is easier to use the mongo shell to run db.find(...).explain().
+ */
+ Query& explain();
+
+ /** Use snapshot mode for the query. Snapshot mode assures no duplicates are returned, or objects missed, which were
+ present at both the start and end of the query's execution (if an object is new during the query, or deleted during
+ the query, it may or may not be returned, even with snapshot mode).
+
+ Note that short query responses (less than 1MB) are always effectively snapshotted.
+
+ Currently, snapshot mode may not be used with sorting or explicit hints.
+ */
+ Query& snapshot();
+
+ /** Queries to the Mongo database support a $where parameter option which contains
+ a javascript function that is evaluated to see whether objects being queried match
+ its criteria. Use this helper to append such a function to a query object.
+ Your query may also contain other traditional Mongo query terms.
+
+ @param jscode The javascript function to evaluate against each potential object
+ match. The function must return true for matched objects. Use the this
+ variable to inspect the current object.
+ @param scope SavedContext for the javascript object. List in a BSON object any
+ variables you would like defined when the jscode executes. One can think
+ of these as "bind variables".
+
+ Examples:
+ conn.findOne("test.coll", Query("{a:3}").where("this.b == 2 || this.c == 3"));
+ Query badBalance = Query().where("this.debits - this.credits < 0");
+ */
+ Query& where(const string &jscode, BSONObj scope);
+ Query& where(const string &jscode) { return where(jscode, BSONObj()); }
+
+ /**
+ * if this query has an orderby, hint, or some other field
+ */
+ bool isComplex() const;
+
+ BSONObj getFilter() const;
+ BSONObj getSort() const;
+ BSONObj getHint() const;
+ bool isExplain() const;
+
+ string toString() const;
+ operator string() const { return toString(); }
+ private:
+ void makeComplex();
+ template< class T >
+ void appendComplex( const char *fieldName, const T& val ) {
+ makeComplex();
+ BSONObjBuilder b;
+ b.appendElements(obj);
+ b.append(fieldName, val);
+ obj = b.obj();
+ }
+ };
+
+/** Typically one uses the QUERY(...) macro to construct a Query object.
+ Example: QUERY( "age" << 33 << "school" << "UCLA" )
+*/
+#define QUERY(x) Query( BSON(x) )
+
+ /**
+ interface that handles communication with the db
+ */
+ class DBConnector {
+ public:
+ virtual ~DBConnector() {}
+ virtual bool call( Message &toSend, Message &response, bool assertOk=true ) = 0;
+ virtual void say( Message &toSend ) = 0;
+ virtual void sayPiggyBack( Message &toSend ) = 0;
+ virtual void checkResponse( const string &data, int nReturned ) {}
+ };
+
+ /** Queries return a cursor object */
+ class DBClientCursor : boost::noncopyable {
+ friend class DBClientBase;
+ bool init();
+ public:
+ /** If true, safe to call next(). Requests more from server if necessary. */
+ bool more();
+
+ /** next
+ @return next object in the result cursor.
+ on an error at the remote server, you will get back:
+ { $err: <string> }
+ if you do not want to handle that yourself, call nextSafe().
+ */
+ BSONObj next();
+
+ /** throws AssertionException if get back { $err : ... } */
+ BSONObj nextSafe() {
+ BSONObj o = next();
+ BSONElement e = o.firstElement();
+ assert( strcmp(e.fieldName(), "$err") != 0 );
+ return o;
+ }
+
+ /**
+ iterate the rest of the cursor and return the number if items
+ */
+ int itcount(){
+ int c = 0;
+ while ( more() ){
+ next();
+ c++;
+ }
+ return c;
+ }
+
+ /** cursor no longer valid -- use with tailable cursors.
+ note you should only rely on this once more() returns false;
+ 'dead' may be preset yet some data still queued and locally
+ available from the dbclientcursor.
+ */
+ bool isDead() const {
+ return cursorId == 0;
+ }
+
+ bool tailable() const {
+ return (opts & QueryOption_CursorTailable) != 0;
+ }
+
+ bool hasResultFlag( int flag ){
+ return (resultFlags & flag) != 0;
+ }
+ public:
+ DBClientCursor( DBConnector *_connector, const string &_ns, BSONObj _query, int _nToReturn,
+ int _nToSkip, const BSONObj *_fieldsToReturn, int queryOptions ) :
+ connector(_connector),
+ ns(_ns),
+ query(_query),
+ nToReturn(_nToReturn),
+ nToSkip(_nToSkip),
+ fieldsToReturn(_fieldsToReturn),
+ opts(queryOptions),
+ m(new Message()),
+ cursorId(),
+ nReturned(),
+ pos(),
+ data(),
+ _ownCursor( true ) {
+ }
+
+ DBClientCursor( DBConnector *_connector, const string &_ns, long long _cursorId, int _nToReturn, int options ) :
+ connector(_connector),
+ ns(_ns),
+ nToReturn( _nToReturn ),
+ opts( options ),
+ m(new Message()),
+ cursorId( _cursorId ),
+ nReturned(),
+ pos(),
+ data(),
+ _ownCursor( true ) {
+ }
+
+ virtual ~DBClientCursor();
+
+ long long getCursorId() const { return cursorId; }
+
+ /** by default we "own" the cursor and will send the server a KillCursor
+ message when ~DBClientCursor() is called. This function overrides that.
+ */
+ void decouple() { _ownCursor = false; }
+
+ private:
+ DBConnector *connector;
+ string ns;
+ BSONObj query;
+ int nToReturn;
+ int nToSkip;
+ const BSONObj *fieldsToReturn;
+ int opts;
+ auto_ptr<Message> m;
+
+ int resultFlags;
+ long long cursorId;
+ int nReturned;
+ int pos;
+ const char *data;
+ void dataReceived();
+ void requestMore();
+ bool _ownCursor; // see decouple()
+ };
+
+ /**
+ The interface that any db connection should implement
+ */
+ class DBClientInterface : boost::noncopyable {
+ public:
+ virtual auto_ptr<DBClientCursor> query(const string &ns, Query query, int nToReturn = 0, int nToSkip = 0,
+ const BSONObj *fieldsToReturn = 0, int queryOptions = 0) = 0;
+
+ virtual auto_ptr<DBClientCursor> getMore( const string &ns, long long cursorId, int nToReturn = 0, int options = 0 ) = 0;
+
+ virtual void insert( const string &ns, BSONObj obj ) = 0;
+
+ virtual void insert( const string &ns, const vector< BSONObj >& v ) = 0;
+
+ virtual void remove( const string &ns , Query query, bool justOne = 0 ) = 0;
+
+ virtual void update( const string &ns , Query query , BSONObj obj , bool upsert = 0 , bool multi = 0 ) = 0;
+
+ virtual ~DBClientInterface() { }
+
+ /**
+ @return a single object that matches the query. if none do, then the object is empty
+ @throws AssertionException
+ */
+ virtual BSONObj findOne(const string &ns, Query query, const BSONObj *fieldsToReturn = 0, int queryOptions = 0);
+
+
+ };
+
+ /**
+ DB "commands"
+ Basically just invocations of connection.$cmd.findOne({...});
+ */
+ class DBClientWithCommands : public DBClientInterface {
+ bool isOk(const BSONObj&);
+ set<string> _seenIndexes;
+ public:
+
+ /** helper function. run a simple command where the command expression is simply
+ { command : 1 }
+ @param info -- where to put result object. may be null if caller doesn't need that info
+ @param command -- command name
+ @return true if the command returned "ok".
+ */
+ bool simpleCommand(const string &dbname, BSONObj *info, const string &command);
+
+ /** Run a database command. Database commands are represented as BSON objects. Common database
+ commands have prebuilt helper functions -- see below. If a helper is not available you can
+ directly call runCommand.
+
+ @param dbname database name. Use "admin" for global administrative commands.
+ @param cmd the command object to execute. For example, { ismaster : 1 }
+ @param info the result object the database returns. Typically has { ok : ..., errmsg : ... } fields
+ set.
+ @return true if the command returned "ok".
+ */
+ bool runCommand(const string &dbname, const BSONObj& cmd, BSONObj &info, int options=0);
+
+ /** Authorize access to a particular database.
+ Authentication is separate for each database on the server -- you may authenticate for any
+ number of databases on a single connection.
+ The "admin" database is special and once authenticated provides access to all databases on the
+ server.
+ @param digestPassword if password is plain text, set this to true. otherwise assumed to be pre-digested
+ @return true if successful
+ */
+ virtual bool auth(const string &dbname, const string &username, const string &pwd, string& errmsg, bool digestPassword = true);
+
+ /** count number of objects in collection ns that match the query criteria specified
+ throws UserAssertion if database returns an error
+ */
+ unsigned long long count(const string &ns, const BSONObj& query = BSONObj(), int options=0 );
+
+ string createPasswordDigest( const string &username , const string &clearTextPassword );
+
+ /** returns true in isMaster parm if this db is the current master
+ of a replica pair.
+
+ pass in info for more details e.g.:
+ { "ismaster" : 1.0 , "msg" : "not paired" , "ok" : 1.0 }
+
+ returns true if command invoked successfully.
+ */
+ virtual bool isMaster(bool& isMaster, BSONObj *info=0);
+
+ /**
+ Create a new collection in the database. Normally, collection creation is automatic. You would
+ use this function if you wish to specify special options on creation.
+
+ If the collection already exists, no action occurs.
+
+ ns: fully qualified collection name
+ size: desired initial extent size for the collection.
+ Must be <= 1000000000 for normal collections.
+ For fixed size (capped) collections, this size is the total/max size of the
+ collection.
+ capped: if true, this is a fixed size collection (where old data rolls out).
+ max: maximum number of objects if capped (optional).
+
+ returns true if successful.
+ */
+ bool createCollection(const string &ns, unsigned size = 0, bool capped = false, int max = 0, BSONObj *info = 0);
+
+ /** Get error result from the last operation on this connection.
+ @return error message text, or empty string if no error.
+ */
+ string getLastError();
+ /** Get error result from the last operation on this connection.
+ @return full error object.
+ */
+ BSONObj getLastErrorDetailed();
+
+ /** Return the last error which has occurred, even if not the very last operation.
+
+ @return { err : <error message>, nPrev : <how_many_ops_back_occurred>, ok : 1 }
+
+ result.err will be null if no error has occurred.
+ */
+ BSONObj getPrevError();
+
+ /** Reset the previous error state for this connection (accessed via getLastError and
+ getPrevError). Useful when performing several operations at once and then checking
+ for an error after attempting all operations.
+ */
+ bool resetError() { return simpleCommand("admin", 0, "reseterror"); }
+
+ /** Delete the specified collection. */
+ virtual bool dropCollection( const string &ns ){
+ string db = nsGetDB( ns );
+ string coll = nsGetCollection( ns );
+ uassert( 10011 , "no collection name", coll.size() );
+
+ BSONObj info;
+
+ bool res = runCommand( db.c_str() , BSON( "drop" << coll ) , info );
+ resetIndexCache();
+ return res;
+ }
+
+ /** Perform a repair and compaction of the specified database. May take a long time to run. Disk space
+ must be available equal to the size of the database while repairing.
+ */
+ bool repairDatabase(const string &dbname, BSONObj *info = 0) {
+ return simpleCommand(dbname, info, "repairDatabase");
+ }
+
+ /** Copy database from one server or name to another server or name.
+
+ Generally, you should dropDatabase() first as otherwise the copied information will MERGE
+ into whatever data is already present in this database.
+
+ For security reasons this function only works when you are authorized to access the "admin" db. However,
+ if you have access to said db, you can copy any database from one place to another.
+ TODO: this needs enhancement to be more flexible in terms of security.
+
+ This method provides a way to "rename" a database by copying it to a new db name and
+ location. The copy is "repaired" and compacted.
+
+ fromdb database name from which to copy.
+ todb database name to copy to.
+ fromhost hostname of the database (and optionally, ":port") from which to
+ copy the data. copies from self if "".
+
+ returns true if successful
+ */
+ bool copyDatabase(const string &fromdb, const string &todb, const string &fromhost = "", BSONObj *info = 0);
+
+ /** The Mongo database provides built-in performance profiling capabilities. Uset setDbProfilingLevel()
+ to enable. Profiling information is then written to the system.profiling collection, which one can
+ then query.
+ */
+ enum ProfilingLevel {
+ ProfileOff = 0,
+ ProfileSlow = 1, // log very slow (>100ms) operations
+ ProfileAll = 2
+ };
+ bool setDbProfilingLevel(const string &dbname, ProfilingLevel level, BSONObj *info = 0);
+ bool getDbProfilingLevel(const string &dbname, ProfilingLevel& level, BSONObj *info = 0);
+
+ /** Run a map/reduce job on the server.
+
+ See http://www.mongodb.org/display/DOCS/MapReduce
+
+ ns namespace (db+collection name) of input data
+ jsmapf javascript map function code
+ jsreducef javascript reduce function code.
+ query optional query filter for the input
+ output optional permanent output collection name. if not specified server will
+ generate a temporary collection and return its name.
+
+ returns a result object which contains:
+ { result : <collection_name>,
+ numObjects : <number_of_objects_scanned>,
+ timeMillis : <job_time>,
+ ok : <1_if_ok>,
+ [, err : <errmsg_if_error>]
+ }
+
+ For example one might call:
+ result.getField("ok").trueValue()
+ on the result to check if ok.
+ */
+ BSONObj mapreduce(const string &ns, const string &jsmapf, const string &jsreducef, BSONObj query = BSONObj(), const string& output = "");
+
+ /** Run javascript code on the database server.
+ dbname database SavedContext in which the code runs. The javascript variable 'db' will be assigned
+ to this database when the function is invoked.
+ jscode source code for a javascript function.
+ info the command object which contains any information on the invocation result including
+ the return value and other information. If an error occurs running the jscode, error
+ information will be in info. (try "out() << info.toString()")
+ retValue return value from the jscode function.
+ args args to pass to the jscode function. when invoked, the 'args' variable will be defined
+ for use by the jscode.
+
+ returns true if runs ok.
+
+ See testDbEval() in dbclient.cpp for an example of usage.
+ */
+ bool eval(const string &dbname, const string &jscode, BSONObj& info, BSONElement& retValue, BSONObj *args = 0);
+
+ /**
+
+ */
+ bool validate( const string &ns , bool scandata=true ){
+ BSONObj cmd = BSON( "validate" << nsGetCollection( ns ) << "scandata" << scandata );
+ BSONObj info;
+ return runCommand( nsGetDB( ns ).c_str() , cmd , info );
+ }
+
+ /* The following helpers are simply more convenient forms of eval() for certain common cases */
+
+ /* invocation with no return value of interest -- with or without one simple parameter */
+ bool eval(const string &dbname, const string &jscode);
+ template< class T >
+ bool eval(const string &dbname, const string &jscode, T parm1) {
+ BSONObj info;
+ BSONElement retValue;
+ BSONObjBuilder b;
+ b.append("0", parm1);
+ BSONObj args = b.done();
+ return eval(dbname, jscode, info, retValue, &args);
+ }
+
+ /** eval invocation with one parm to server and one numeric field (either int or double) returned */
+ template< class T, class NumType >
+ bool eval(const string &dbname, const string &jscode, T parm1, NumType& ret) {
+ BSONObj info;
+ BSONElement retValue;
+ BSONObjBuilder b;
+ b.append("0", parm1);
+ BSONObj args = b.done();
+ if ( !eval(dbname, jscode, info, retValue, &args) )
+ return false;
+ ret = (NumType) retValue.number();
+ return true;
+ }
+
+ /**
+ get a list of all the current databases
+ */
+ list<string> getDatabaseNames();
+
+ /**
+ get a list of all the current collections in db
+ */
+ list<string> getCollectionNames( const string& db );
+
+ bool exists( const string& ns );
+
+
+ /** Create an index if it does not already exist.
+ ensureIndex calls are remembered so it is safe/fast to call this function many
+ times in your code.
+ @param ns collection to be indexed
+ @param keys the "key pattern" for the index. e.g., { name : 1 }
+ @param unique if true, indicates that key uniqueness should be enforced for this index
+ @param name if not isn't specified, it will be created from the keys (recommended)
+ @return whether or not sent message to db.
+ should be true on first call, false on subsequent unless resetIndexCache was called
+ */
+ virtual bool ensureIndex( const string &ns , BSONObj keys , bool unique = false, const string &name = "" );
+
+ /**
+ clears the index cache, so the subsequent call to ensureIndex for any index will go to the server
+ */
+ virtual void resetIndexCache();
+
+ virtual auto_ptr<DBClientCursor> getIndexes( const string &ns );
+
+ virtual void dropIndex( const string& ns , BSONObj keys );
+ virtual void dropIndex( const string& ns , const string& indexName );
+
+ /**
+ drops all indexes for the collection
+ */
+ virtual void dropIndexes( const string& ns );
+
+ virtual void reIndex( const string& ns );
+
+ string genIndexName( const BSONObj& keys );
+
+ /** Erase / drop an entire database */
+ virtual bool dropDatabase(const string &dbname, BSONObj *info = 0) {
+ bool ret = simpleCommand(dbname, info, "dropDatabase");
+ resetIndexCache();
+ return ret;
+ }
+
+ virtual string toString() = 0;
+
+ /** @return the database name portion of an ns string */
+ string nsGetDB( const string &ns ){
+ string::size_type pos = ns.find( "." );
+ if ( pos == string::npos )
+ return ns;
+
+ return ns.substr( 0 , pos );
+ }
+
+ /** @return the collection name portion of an ns string */
+ string nsGetCollection( const string &ns ){
+ string::size_type pos = ns.find( "." );
+ if ( pos == string::npos )
+ return "";
+
+ return ns.substr( pos + 1 );
+ }
+
+ };
+
+ /**
+ abstract class that implements the core db operations
+ */
+ class DBClientBase : public DBClientWithCommands, public DBConnector {
+ public:
+ /** send a query to the database.
+ ns: namespace to query, format is <dbname>.<collectname>[.<collectname>]*
+ query: query to perform on the collection. this is a BSONObj (binary JSON)
+ You may format as
+ { query: { ... }, orderby: { ... } }
+ to specify a sort order.
+ nToReturn: n to return. 0 = unlimited
+ nToSkip: start with the nth item
+ fieldsToReturn:
+ optional template of which fields to select. if unspecified, returns all fields
+ queryOptions: see options enum at top of this file
+
+ @return cursor. 0 if error (connection failure)
+ @throws AssertionException
+ */
+ virtual auto_ptr<DBClientCursor> query(const string &ns, Query query, int nToReturn = 0, int nToSkip = 0,
+ const BSONObj *fieldsToReturn = 0, int queryOptions = 0);
+
+ /** @param cursorId id of cursor to retrieve
+ @return an handle to a previously allocated cursor
+ @throws AssertionException
+ */
+ virtual auto_ptr<DBClientCursor> getMore( const string &ns, long long cursorId, int nToReturn = 0, int options = 0 );
+
+ /**
+ insert an object into the database
+ */
+ virtual void insert( const string &ns , BSONObj obj );
+
+ /**
+ insert a vector of objects into the database
+ */
+ virtual void insert( const string &ns, const vector< BSONObj >& v );
+
+ /**
+ remove matching objects from the database
+ @param justOne if this true, then once a single match is found will stop
+ */
+ virtual void remove( const string &ns , Query q , bool justOne = 0 );
+
+ /**
+ updates objects matching query
+ */
+ virtual void update( const string &ns , Query query , BSONObj obj , bool upsert = 0 , bool multi = 0 );
+
+ virtual string getServerAddress() const = 0;
+
+ virtual bool isFailed() const = 0;
+
+ };
+
+ class DBClientPaired;
+
+ class ConnectException : public UserException {
+ public:
+ ConnectException(string msg) : UserException(9000,msg) { }
+ };
+
+ /**
+ A basic connection to the database.
+ This is the main entry point for talking to a simple Mongo setup
+ */
+ class DBClientConnection : public DBClientBase {
+ DBClientPaired *clientPaired;
+ auto_ptr<MessagingPort> p;
+ auto_ptr<SockAddr> server;
+ bool failed; // true if some sort of fatal error has ever happened
+ bool autoReconnect;
+ time_t lastReconnectTry;
+ string serverAddress; // remember for reconnects
+ void _checkConnection();
+ void checkConnection() { if( failed ) _checkConnection(); }
+ map< string, pair<string,string> > authCache;
+ public:
+
+ /**
+ @param _autoReconnect if true, automatically reconnect on a connection failure
+ @param cp used by DBClientPaired. You do not need to specify this parameter
+ */
+ DBClientConnection(bool _autoReconnect=false,DBClientPaired* cp=0) :
+ clientPaired(cp), failed(false), autoReconnect(_autoReconnect), lastReconnectTry(0) { }
+
+ /** Connect to a Mongo database server.
+
+ If autoReconnect is true, you can try to use the DBClientConnection even when
+ false was returned -- it will try to connect again.
+
+ @param serverHostname host to connect to. can include port number ( 127.0.0.1 , 127.0.0.1:5555 )
+ @param errmsg any relevant error message will appended to the string
+ @return false if fails to connect.
+ */
+ virtual bool connect(const string &serverHostname, string& errmsg);
+
+ /** Connect to a Mongo database server. Exception throwing version.
+ Throws a UserException if cannot connect.
+
+ If autoReconnect is true, you can try to use the DBClientConnection even when
+ false was returned -- it will try to connect again.
+
+ @param serverHostname host to connect to. can include port number ( 127.0.0.1 , 127.0.0.1:5555 )
+ */
+ void connect(string serverHostname) {
+ string errmsg;
+ if( !connect(serverHostname.c_str(), errmsg) )
+ throw ConnectException(string("can't connect ") + errmsg);
+ }
+
+ virtual bool auth(const string &dbname, const string &username, const string &pwd, string& errmsg, bool digestPassword = true);
+
+ virtual auto_ptr<DBClientCursor> query(const string &ns, Query query, int nToReturn = 0, int nToSkip = 0,
+ const BSONObj *fieldsToReturn = 0, int queryOptions = 0) {
+ checkConnection();
+ return DBClientBase::query( ns, query, nToReturn, nToSkip, fieldsToReturn, queryOptions );
+ }
+
+ /**
+ @return true if this connection is currently in a failed state. When autoreconnect is on,
+ a connection will transition back to an ok state after reconnecting.
+ */
+ bool isFailed() const {
+ return failed;
+ }
+
+ MessagingPort& port() {
+ return *p.get();
+ }
+
+ string toStringLong() const {
+ stringstream ss;
+ ss << serverAddress;
+ if ( failed ) ss << " failed";
+ return ss.str();
+ }
+
+ /** Returns the address of the server */
+ string toString() {
+ return serverAddress;
+ }
+
+ string getServerAddress() const {
+ return serverAddress;
+ }
+
+ protected:
+ virtual bool call( Message &toSend, Message &response, bool assertOk = true );
+ virtual void say( Message &toSend );
+ virtual void sayPiggyBack( Message &toSend );
+ virtual void checkResponse( const char *data, int nReturned );
+ };
+
+ /** Use this class to connect to a replica pair of servers. The class will manage
+ checking for which server in a replica pair is master, and do failover automatically.
+
+ On a failover situation, expect at least one operation to return an error (throw
+ an exception) before the failover is complete. Operations are not retried.
+ */
+ class DBClientPaired : public DBClientBase {
+ DBClientConnection left,right;
+ enum State {
+ NotSetL=0,
+ NotSetR=1,
+ Left, Right
+ } master;
+
+ void _checkMaster();
+ DBClientConnection& checkMaster();
+
+ public:
+ /** Call connect() after constructing. autoReconnect is always on for DBClientPaired connections. */
+ DBClientPaired();
+
+ /** Returns false is neither member of the pair were reachable, or neither is
+ master, although,
+ when false returned, you can still try to use this connection object, it will
+ try reconnects.
+ */
+ bool connect(const string &serverHostname1, const string &serverHostname2);
+
+ /** Connect to a server pair using a host pair string of the form
+ hostname[:port],hostname[:port]
+ */
+ bool connect(string hostpairstring);
+
+ /** Authorize. Authorizes both sides of the pair as needed.
+ */
+ bool auth(const string &dbname, const string &username, const string &pwd, string& errmsg);
+
+ /** throws userassertion "no master found" */
+ virtual
+ auto_ptr<DBClientCursor> query(const string &ns, Query query, int nToReturn = 0, int nToSkip = 0,
+ const BSONObj *fieldsToReturn = 0, int queryOptions = 0);
+
+ /** throws userassertion "no master found" */
+ virtual
+ BSONObj findOne(const string &ns, Query query, const BSONObj *fieldsToReturn = 0, int queryOptions = 0);
+
+ /** insert */
+ virtual void insert( const string &ns , BSONObj obj ) {
+ checkMaster().insert(ns, obj);
+ }
+
+ /** insert multiple objects. Note that single object insert is asynchronous, so this version
+ is only nominally faster and not worth a special effort to try to use. */
+ virtual void insert( const string &ns, const vector< BSONObj >& v ) {
+ checkMaster().insert(ns, v);
+ }
+
+ /** remove */
+ virtual void remove( const string &ns , Query obj , bool justOne = 0 ) {
+ checkMaster().remove(ns, obj, justOne);
+ }
+
+ /** update */
+ virtual void update( const string &ns , Query query , BSONObj obj , bool upsert = 0 , bool multi = 0 ) {
+ return checkMaster().update(ns, query, obj, upsert,multi);
+ }
+
+ string toString();
+
+ /* this is the callback from our underlying connections to notify us that we got a "not master" error.
+ */
+ void isntMaster() {
+ master = ( ( master == Left ) ? NotSetR : NotSetL );
+ }
+
+ string getServerAddress() const {
+ return left.getServerAddress() + "," + right.getServerAddress();
+ }
+
+ DBClientConnection& slaveConn();
+
+ /* TODO - not yet implemented. mongos may need these. */
+ virtual bool call( Message &toSend, Message &response, bool assertOk=true ) { assert(false); return false; }
+ virtual void say( Message &toSend ) { assert(false); }
+ virtual void sayPiggyBack( Message &toSend ) { assert(false); }
+ virtual void checkResponse( const char *data, int nReturned ) { assert(false); }
+
+ bool isFailed() const {
+ // TODO: this really should check isFailed on current master as well
+ return master > NotSetR;
+ }
+ };
+
+
+ DBClientBase * createDirectClient();
+
+} // namespace mongo
diff --git a/client/examples/authTest.cpp b/client/examples/authTest.cpp
new file mode 100644
index 0000000..77ce12d
--- /dev/null
+++ b/client/examples/authTest.cpp
@@ -0,0 +1,53 @@
+// authTest.cpp
+
+/* Copyright 2009 10gen Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <iostream>
+
+#include "client/dbclient.h"
+
+using namespace mongo;
+
+int main( int argc, const char **argv ) {
+
+ const char *port = "27017";
+ if ( argc != 1 ) {
+ if ( argc != 3 )
+ throw -12;
+ port = argv[ 2 ];
+ }
+
+ DBClientConnection conn;
+ string errmsg;
+ if ( ! conn.connect( string( "127.0.0.1:" ) + port , errmsg ) ) {
+ cout << "couldn't connect : " << errmsg << endl;
+ throw -11;
+ }
+
+ { // clean up old data from any previous tests
+ conn.remove( "test.system.users" , BSONObj() );
+ }
+
+ conn.insert( "test.system.users" , BSON( "user" << "eliot" << "pwd" << conn.createPasswordDigest( "eliot" , "bar" ) ) );
+
+ errmsg.clear();
+ bool ok = conn.auth( "test" , "eliot" , "bar" , errmsg );
+ if ( ! ok )
+ cout << errmsg << endl;
+ assert( ok );
+
+ assert( ! conn.auth( "test" , "eliot" , "bars" , errmsg ) );
+}
diff --git a/client/examples/clientTest.cpp b/client/examples/clientTest.cpp
new file mode 100644
index 0000000..bbb82f6
--- /dev/null
+++ b/client/examples/clientTest.cpp
@@ -0,0 +1,214 @@
+// clientTest.cpp
+
+/* Copyright 2009 10gen Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * a simple test for the c++ driver
+ */
+
+#include <iostream>
+
+#include "client/dbclient.h"
+
+using namespace std;
+using namespace mongo;
+
+int main( int argc, const char **argv ) {
+
+ const char *port = "27017";
+ if ( argc != 1 ) {
+ if ( argc != 3 )
+ throw -12;
+ port = argv[ 2 ];
+ }
+
+ DBClientConnection conn;
+ string errmsg;
+ if ( ! conn.connect( string( "127.0.0.1:" ) + port , errmsg ) ) {
+ cout << "couldn't connect : " << errmsg << endl;
+ throw -11;
+ }
+
+ const char * ns = "test.test1";
+
+ conn.dropCollection(ns);
+
+ // clean up old data from any previous tests
+ conn.remove( ns, BSONObj() );
+ assert( conn.findOne( ns , BSONObj() ).isEmpty() );
+
+ // test insert
+ conn.insert( ns ,BSON( "name" << "eliot" << "num" << 1 ) );
+ assert( ! conn.findOne( ns , BSONObj() ).isEmpty() );
+
+ // test remove
+ conn.remove( ns, BSONObj() );
+ assert( conn.findOne( ns , BSONObj() ).isEmpty() );
+
+
+ // insert, findOne testing
+ conn.insert( ns , BSON( "name" << "eliot" << "num" << 1 ) );
+ {
+ BSONObj res = conn.findOne( ns , BSONObj() );
+ assert( strstr( res.getStringField( "name" ) , "eliot" ) );
+ assert( ! strstr( res.getStringField( "name2" ) , "eliot" ) );
+ assert( 1 == res.getIntField( "num" ) );
+ }
+
+
+ // cursor
+ conn.insert( ns ,BSON( "name" << "sara" << "num" << 2 ) );
+ {
+ auto_ptr<DBClientCursor> cursor = conn.query( ns , BSONObj() );
+ int count = 0;
+ while ( cursor->more() ) {
+ count++;
+ BSONObj obj = cursor->next();
+ }
+ assert( count == 2 );
+ }
+
+ {
+ auto_ptr<DBClientCursor> cursor = conn.query( ns , BSON( "num" << 1 ) );
+ int count = 0;
+ while ( cursor->more() ) {
+ count++;
+ BSONObj obj = cursor->next();
+ }
+ assert( count == 1 );
+ }
+
+ {
+ auto_ptr<DBClientCursor> cursor = conn.query( ns , BSON( "num" << 3 ) );
+ int count = 0;
+ while ( cursor->more() ) {
+ count++;
+ BSONObj obj = cursor->next();
+ }
+ assert( count == 0 );
+ }
+
+ // update
+ {
+ BSONObj res = conn.findOne( ns , BSONObjBuilder().append( "name" , "eliot" ).obj() );
+ assert( ! strstr( res.getStringField( "name2" ) , "eliot" ) );
+
+ BSONObj after = BSONObjBuilder().appendElements( res ).append( "name2" , "h" ).obj();
+
+ conn.update( ns , BSONObjBuilder().append( "name" , "eliot2" ).obj() , after );
+ res = conn.findOne( ns , BSONObjBuilder().append( "name" , "eliot" ).obj() );
+ assert( ! strstr( res.getStringField( "name2" ) , "eliot" ) );
+ assert( conn.findOne( ns , BSONObjBuilder().append( "name" , "eliot2" ).obj() ).isEmpty() );
+
+ conn.update( ns , BSONObjBuilder().append( "name" , "eliot" ).obj() , after );
+ res = conn.findOne( ns , BSONObjBuilder().append( "name" , "eliot" ).obj() );
+ assert( strstr( res.getStringField( "name" ) , "eliot" ) );
+ assert( strstr( res.getStringField( "name2" ) , "h" ) );
+ assert( conn.findOne( ns , BSONObjBuilder().append( "name" , "eliot2" ).obj() ).isEmpty() );
+
+ // upsert
+ conn.update( ns , BSONObjBuilder().append( "name" , "eliot2" ).obj() , after , 1 );
+ assert( ! conn.findOne( ns , BSONObjBuilder().append( "name" , "eliot" ).obj() ).isEmpty() );
+
+ }
+
+ { // ensure index
+ assert( conn.ensureIndex( ns , BSON( "name" << 1 ) ) );
+ assert( ! conn.ensureIndex( ns , BSON( "name" << 1 ) ) );
+ }
+
+ { // hint related tests
+ assert( conn.findOne(ns, "{}")["name"].str() == "sara" );
+
+ assert( conn.findOne(ns, "{ name : 'eliot' }")["name"].str() == "eliot" );
+ assert( conn.getLastError() == "" );
+
+ // nonexistent index test
+ assert( conn.findOne(ns, Query("{name:\"eliot\"}").hint("{foo:1}")).hasElement("$err") );
+ assert( conn.getLastError() == "bad hint" );
+ conn.resetError();
+ assert( conn.getLastError() == "" );
+
+ //existing index
+ assert( conn.findOne(ns, Query("{name:'eliot'}").hint("{name:1}")).hasElement("name") );
+
+ // run validate
+ assert( conn.validate( ns ) );
+ }
+
+ { // timestamp test
+
+ const char * tsns = "test.tstest1";
+ conn.dropCollection( tsns );
+
+ {
+ mongo::BSONObjBuilder b;
+ b.appendTimestamp( "ts" );
+ conn.insert( tsns , b.obj() );
+ }
+
+ mongo::BSONObj out = conn.findOne( tsns , mongo::BSONObj() );
+ Date_t oldTime = out["ts"].timestampTime();
+ unsigned int oldInc = out["ts"].timestampInc();
+
+ {
+ mongo::BSONObjBuilder b1;
+ b1.append( out["_id"] );
+
+ mongo::BSONObjBuilder b2;
+ b2.append( out["_id"] );
+ b2.appendTimestamp( "ts" );
+
+ conn.update( tsns , b1.obj() , b2.obj() );
+ }
+
+ BSONObj found = conn.findOne( tsns , mongo::BSONObj() );
+ assert( ( oldTime < found["ts"].timestampTime() ) ||
+ ( oldInc + 1 == found["ts"].timestampInc() ) );
+
+ }
+
+ { // check that killcursors doesn't affect last error
+ assert( conn.getLastError().empty() );
+
+ BufBuilder b;
+ b.append( (int)0 ); // reserved
+ b.append( (int)-1 ); // invalid # of cursors triggers exception
+ b.append( (int)-1 ); // bogus cursor id
+
+ Message m;
+ m.setData( dbKillCursors, b.buf(), b.len() );
+
+ // say() is protected in DBClientConnection, so get superclass
+ static_cast< DBConnector* >( &conn )->say( m );
+
+ assert( conn.getLastError().empty() );
+ }
+
+ {
+ list<string> l = conn.getDatabaseNames();
+ for ( list<string>::iterator i = l.begin(); i != l.end(); i++ ){
+ cout << "db name : " << *i << endl;
+ }
+
+ l = conn.getCollectionNames( "test" );
+ for ( list<string>::iterator i = l.begin(); i != l.end(); i++ ){
+ cout << "coll name : " << *i << endl;
+ }
+ }
+
+ cout << "client test finished!" << endl;
+}
diff --git a/client/examples/first.cpp b/client/examples/first.cpp
new file mode 100644
index 0000000..f3b654f
--- /dev/null
+++ b/client/examples/first.cpp
@@ -0,0 +1,85 @@
+// first.cpp
+
+/* Copyright 2009 10gen Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * this is a good first example of how to use mongo from c++
+ */
+
+#include <iostream>
+
+#include "client/dbclient.h"
+
+using namespace std;
+
+void insert( mongo::DBClientConnection & conn , const char * name , int num ) {
+ mongo::BSONObjBuilder obj;
+ obj.append( "name" , name );
+ obj.append( "num" , num );
+ conn.insert( "test.people" , obj.obj() );
+}
+
+int main( int argc, const char **argv ) {
+
+ const char *port = "27017";
+ if ( argc != 1 ) {
+ if ( argc != 3 )
+ throw -12;
+ port = argv[ 2 ];
+ }
+
+ mongo::DBClientConnection conn;
+ string errmsg;
+ if ( ! conn.connect( string( "127.0.0.1:" ) + port , errmsg ) ) {
+ cout << "couldn't connect : " << errmsg << endl;
+ throw -11;
+ }
+
+ { // clean up old data from any previous tests
+ mongo::BSONObjBuilder query;
+ conn.remove( "test.people" , query.obj() );
+ }
+
+ insert( conn , "eliot" , 15 );
+ insert( conn , "sara" , 23 );
+
+ {
+ mongo::BSONObjBuilder query;
+ auto_ptr<mongo::DBClientCursor> cursor = conn.query( "test.people" , query.obj() );
+ cout << "using cursor" << endl;
+ while ( cursor->more() ) {
+ mongo::BSONObj obj = cursor->next();
+ cout << "\t" << obj.jsonString() << endl;
+ }
+
+ }
+
+ {
+ mongo::BSONObjBuilder query;
+ query.append( "name" , "eliot" );
+ mongo::BSONObj res = conn.findOne( "test.people" , query.obj() );
+ cout << res.isEmpty() << "\t" << res.jsonString() << endl;
+ }
+
+ {
+ mongo::BSONObjBuilder query;
+ query.append( "name" , "asd" );
+ mongo::BSONObj res = conn.findOne( "test.people" , query.obj() );
+ cout << res.isEmpty() << "\t" << res.jsonString() << endl;
+ }
+
+
+}
diff --git a/client/examples/second.cpp b/client/examples/second.cpp
new file mode 100644
index 0000000..68eafaa
--- /dev/null
+++ b/client/examples/second.cpp
@@ -0,0 +1,56 @@
+// second.cpp
+
+/* Copyright 2009 10gen Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <iostream>
+
+#include "client/dbclient.h"
+
+using namespace std;
+using namespace mongo;
+
+int main( int argc, const char **argv ) {
+
+ const char *port = "27017";
+ if ( argc != 1 ) {
+ if ( argc != 3 )
+ throw -12;
+ port = argv[ 2 ];
+ }
+
+ DBClientConnection conn;
+ string errmsg;
+ if ( ! conn.connect( string( "127.0.0.1:" ) + port , errmsg ) ) {
+ cout << "couldn't connect : " << errmsg << endl;
+ throw -11;
+ }
+
+ const char * ns = "test.second";
+
+ conn.remove( ns , BSONObj() );
+
+ conn.insert( ns , BSON( "name" << "eliot" << "num" << 17 ) );
+ conn.insert( ns , BSON( "name" << "sara" << "num" << 24 ) );
+
+ auto_ptr<DBClientCursor> cursor = conn.query( ns , BSONObj() );
+ cout << "using cursor" << endl;
+ while ( cursor->more() ) {
+ BSONObj obj = cursor->next();
+ cout << "\t" << obj.jsonString() << endl;
+ }
+
+ conn.ensureIndex( ns , BSON( "name" << 1 << "num" << -1 ) );
+}
diff --git a/client/examples/tail.cpp b/client/examples/tail.cpp
new file mode 100644
index 0000000..e844b32
--- /dev/null
+++ b/client/examples/tail.cpp
@@ -0,0 +1,55 @@
+// tail.cpp
+
+/* Copyright 2009 10gen Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/* example of using a tailable cursor */
+
+#include "../../client/dbclient.h"
+#include "../../util/goodies.h"
+
+using namespace mongo;
+
+void foo() { }
+
+/* "tail" the specified namespace, outputting elements as they are added.
+ _id values must be inserted in increasing order for this to work. (Some other
+ field could also be used.)
+
+ Note: one could use a capped collection and $natural order to do something
+ similar, using sort({$natural:1}), and then not need to worry about
+ _id's being in order.
+*/
+void tail(DBClientBase& conn, const char *ns) {
+ conn.ensureIndex(ns, fromjson("{_id:1}"));
+ BSONElement lastId;
+ Query query = Query().sort("_id");
+ while( 1 ) {
+ auto_ptr<DBClientCursor> c = conn.query(ns, query, 0, 0, 0, Option_CursorTailable);
+ while( 1 ) {
+ if( !c->more() ) {
+ if( c->isDead() ) {
+ // we need to requery
+ break;
+ }
+ sleepsecs(1);
+ }
+ BSONObj o = c->next();
+ lastId = o["_id"];
+ cout << o.toString() << endl;
+ }
+ query = QUERY( "_id" << GT << lastId ).sort("_id");
+ }
+}
diff --git a/client/examples/tutorial.cpp b/client/examples/tutorial.cpp
new file mode 100644
index 0000000..28e1b27
--- /dev/null
+++ b/client/examples/tutorial.cpp
@@ -0,0 +1,67 @@
+//tutorial.cpp
+
+/* Copyright 2009 10gen Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <iostream>
+#include "../../client/dbclient.h"
+
+// g++ tutorial.cpp -lmongoclient -lboost_thread -lboost_filesystem -o tutorial
+
+using namespace mongo;
+
+void printIfAge(DBClientConnection& c, int age) {
+ auto_ptr<DBClientCursor> cursor = c.query("tutorial.persons", QUERY( "age" << age ).sort("name") );
+ while( cursor->more() ) {
+ BSONObj p = cursor->next();
+ cout << p.getStringField("name") << endl;
+ }
+}
+
+void run() {
+ DBClientConnection c;
+ c.connect("localhost"); //"192.168.58.1");
+ cout << "connected ok" << endl;
+ BSONObj p = BSON( "name" << "Joe" << "age" << 33 );
+ c.insert("tutorial.persons", p);
+ p = BSON( "name" << "Jane" << "age" << 40 );
+ c.insert("tutorial.persons", p);
+ p = BSON( "name" << "Abe" << "age" << 33 );
+ c.insert("tutorial.persons", p);
+ p = BSON( "name" << "Samantha" << "age" << 21 << "city" << "Los Angeles" << "state" << "CA" );
+ c.insert("tutorial.persons", p);
+
+ c.ensureIndex("tutorial.persons", fromjson("{age:1}"));
+
+ cout << "count:" << c.count("tutorial.persons") << endl;
+
+ auto_ptr<DBClientCursor> cursor = c.query("tutorial.persons", BSONObj());
+ while( cursor->more() ) {
+ cout << cursor->next().toString() << endl;
+ }
+
+ cout << "\nprintifage:\n";
+ printIfAge(c, 33);
+}
+
+int main() {
+ try {
+ run();
+ }
+ catch( DBException &e ) {
+ cout << "caught " << e.what() << endl;
+ }
+ return 0;
+}
diff --git a/client/examples/whereExample.cpp b/client/examples/whereExample.cpp
new file mode 100644
index 0000000..a26d921
--- /dev/null
+++ b/client/examples/whereExample.cpp
@@ -0,0 +1,68 @@
+// whereExample.cpp
+
+/* Copyright 2009 10gen Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <iostream>
+
+#include "client/dbclient.h"
+
+using namespace std;
+using namespace mongo;
+
+int main( int argc, const char **argv ) {
+
+ const char *port = "27017";
+ if ( argc != 1 ) {
+ if ( argc != 3 )
+ throw -12;
+ port = argv[ 2 ];
+ }
+
+ DBClientConnection conn;
+ string errmsg;
+ if ( ! conn.connect( string( "127.0.0.1:" ) + port , errmsg ) ) {
+ cout << "couldn't connect : " << errmsg << endl;
+ throw -11;
+ }
+
+ const char * ns = "test.where";
+
+ conn.remove( ns , BSONObj() );
+
+ conn.insert( ns , BSON( "name" << "eliot" << "num" << 17 ) );
+ conn.insert( ns , BSON( "name" << "sara" << "num" << 24 ) );
+
+ auto_ptr<DBClientCursor> cursor = conn.query( ns , BSONObj() );
+
+ while ( cursor->more() ) {
+ BSONObj obj = cursor->next();
+ cout << "\t" << obj.jsonString() << endl;
+ }
+
+ cout << "now using $where" << endl;
+
+ Query q = Query("{}").where("this.name == name" , BSON( "name" << "sara" ));
+
+ cursor = conn.query( ns , q );
+
+ int num = 0;
+ while ( cursor->more() ) {
+ BSONObj obj = cursor->next();
+ cout << "\t" << obj.jsonString() << endl;
+ num++;
+ }
+ assert( num == 1 );
+}
diff --git a/client/gridfs.cpp b/client/gridfs.cpp
new file mode 100644
index 0000000..892ec6e
--- /dev/null
+++ b/client/gridfs.cpp
@@ -0,0 +1,233 @@
+// gridfs.cpp
+
+/* Copyright 2009 10gen
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "../stdafx.h"
+#include <fcntl.h>
+#include <utility>
+
+#include "gridfs.h"
+#include <boost/smart_ptr.hpp>
+
+#if defined(_WIN32)
+#include <io.h>
+#endif
+
+#ifndef MIN
+#define MIN(a,b) ( (a) < (b) ? (a) : (b) )
+#endif
+
+namespace mongo {
+
+ const unsigned DEFAULT_CHUNK_SIZE = 256 * 1024;
+
+ Chunk::Chunk( BSONObj o ){
+ _data = o;
+ }
+
+ Chunk::Chunk( BSONObj fileObject , int chunkNumber , const char * data , int len ){
+ BSONObjBuilder b;
+ b.appendAs( fileObject["_id"] , "files_id" );
+ b.append( "n" , chunkNumber );
+ b.appendBinDataArray( "data" , data , len );
+ _data = b.obj();
+ }
+
+
+ GridFS::GridFS( DBClientBase& client , const string& dbName , const string& prefix ) : _client( client ) , _dbName( dbName ) , _prefix( prefix ){
+ _filesNS = dbName + "." + prefix + ".files";
+ _chunksNS = dbName + "." + prefix + ".chunks";
+
+
+ client.ensureIndex( _filesNS , BSON( "filename" << 1 ) );
+ client.ensureIndex( _chunksNS , BSON( "files_id" << 1 << "n" << 1 ) );
+ }
+
+ GridFS::~GridFS(){
+
+ }
+
+ BSONObj GridFS::storeFile( const char* data , size_t length , const string& remoteName , const string& contentType){
+ massert( 10279 , "large files not yet implemented", length <= 0xffffffff);
+ char const * const end = data + length;
+
+ OID id;
+ id.init();
+ BSONObj idObj = BSON("_id" << id);
+
+ int chunkNumber = 0;
+ while (data < end){
+ int chunkLen = MIN(DEFAULT_CHUNK_SIZE, (unsigned)(end-data));
+ Chunk c(idObj, chunkNumber, data, chunkLen);
+ _client.insert( _chunksNS.c_str() , c._data );
+
+ chunkNumber++;
+ data += chunkLen;
+ }
+
+ return insertFile(remoteName, id, length, contentType);
+ }
+
+
+ BSONObj GridFS::storeFile( const string& fileName , const string& remoteName , const string& contentType){
+ uassert( 10012 , "file doesn't exist" , fileName == "-" || boost::filesystem::exists( fileName ) );
+
+ FILE* fd;
+ if (fileName == "-")
+ fd = stdin;
+ else
+ fd = fopen( fileName.c_str() , "rb" );
+ uassert( 10013 , "error opening file", fd);
+
+ OID id;
+ id.init();
+ BSONObj idObj = BSON("_id" << id);
+
+ int chunkNumber = 0;
+ gridfs_offset length = 0;
+ while (!feof(fd)){
+ boost::scoped_array<char>buf (new char[DEFAULT_CHUNK_SIZE]);
+ char* bufPos = buf.get();
+ unsigned int chunkLen = 0; // how much in the chunk now
+ while(chunkLen != DEFAULT_CHUNK_SIZE && !feof(fd)){
+ int readLen = fread(bufPos, 1, DEFAULT_CHUNK_SIZE - chunkLen, fd);
+ chunkLen += readLen;
+ bufPos += readLen;
+
+ assert(chunkLen <= DEFAULT_CHUNK_SIZE);
+ }
+
+ Chunk c(idObj, chunkNumber, buf.get(), chunkLen);
+ _client.insert( _chunksNS.c_str() , c._data );
+
+ length += chunkLen;
+ chunkNumber++;
+ }
+
+ if (fd != stdin)
+ fclose( fd );
+
+ massert( 10280 , "large files not yet implemented", length <= 0xffffffff);
+
+ return insertFile((remoteName.empty() ? fileName : remoteName), id, length, contentType);
+ }
+
+ BSONObj GridFS::insertFile(const string& name, const OID& id, unsigned length, const string& contentType){
+
+ BSONObj res;
+ if ( ! _client.runCommand( _dbName.c_str() , BSON( "filemd5" << id << "root" << _prefix ) , res ) )
+ throw UserException( 9008 , "filemd5 failed" );
+
+ BSONObjBuilder file;
+ file << "_id" << id
+ << "filename" << name
+ << "length" << (unsigned) length
+ << "chunkSize" << DEFAULT_CHUNK_SIZE
+ << "uploadDate" << DATENOW
+ << "md5" << res["md5"]
+ ;
+
+ if (!contentType.empty())
+ file << "contentType" << contentType;
+
+ BSONObj ret = file.obj();
+ _client.insert(_filesNS.c_str(), ret);
+
+ return ret;
+ }
+
+ void GridFS::removeFile( const string& fileName ){
+ auto_ptr<DBClientCursor> files = _client.query( _filesNS , BSON( "filename" << fileName ) );
+ while (files->more()){
+ BSONObj file = files->next();
+ BSONElement id = file["_id"];
+ _client.remove( _filesNS.c_str() , BSON( "_id" << id ) );
+ _client.remove( _chunksNS.c_str() , BSON( "files_id" << id ) );
+ }
+ }
+
+ GridFile::GridFile( GridFS * grid , BSONObj obj ){
+ _grid = grid;
+ _obj = obj;
+ }
+
+ GridFile GridFS::findFile( const string& fileName ){
+ return findFile( BSON( "filename" << fileName ) );
+ };
+
+ GridFile GridFS::findFile( BSONObj query ){
+ query = BSON("query" << query << "orderby" << BSON("uploadDate" << -1));
+ return GridFile( this , _client.findOne( _filesNS.c_str() , query ) );
+ }
+
+ auto_ptr<DBClientCursor> GridFS::list(){
+ return _client.query( _filesNS.c_str() , BSONObj() );
+ }
+
+ auto_ptr<DBClientCursor> GridFS::list( BSONObj o ){
+ return _client.query( _filesNS.c_str() , o );
+ }
+
+ BSONObj GridFile::getMetadata(){
+ BSONElement meta_element = _obj["metadata"];
+ if( meta_element.eoo() ){
+ return BSONObj();
+ }
+
+ return meta_element.embeddedObject();
+ }
+
+ Chunk GridFile::getChunk( int n ){
+ _exists();
+ BSONObjBuilder b;
+ b.appendAs( _obj["_id"] , "files_id" );
+ b.append( "n" , n );
+
+ BSONObj o = _grid->_client.findOne( _grid->_chunksNS.c_str() , b.obj() );
+ uassert( 10014 , "chunk is empty!" , ! o.isEmpty() );
+ return Chunk(o);
+ }
+
+ gridfs_offset GridFile::write( ostream & out ){
+ _exists();
+
+ const int num = getNumChunks();
+
+ for ( int i=0; i<num; i++ ){
+ Chunk c = getChunk( i );
+
+ int len;
+ const char * data = c.data( len );
+ out.write( data , len );
+ }
+
+ return getContentLength();
+ }
+
+ gridfs_offset GridFile::write( const string& where ){
+ if (where == "-"){
+ return write( cout );
+ } else {
+ ofstream out(where.c_str() , ios::out | ios::binary );
+ return write( out );
+ }
+ }
+
+ void GridFile::_exists(){
+ uassert( 10015 , "doesn't exists" , exists() );
+ }
+
+}
diff --git a/client/gridfs.h b/client/gridfs.h
new file mode 100644
index 0000000..3165d5f
--- /dev/null
+++ b/client/gridfs.h
@@ -0,0 +1,203 @@
+/** @file gridfs.h */
+
+/* Copyright 2009 10gen Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "dbclient.h"
+
+namespace mongo {
+
+ typedef unsigned long long gridfs_offset;
+
+ class GridFS;
+ class GridFile;
+
+ class Chunk {
+ public:
+ Chunk( BSONObj data );
+ Chunk( BSONObj fileId , int chunkNumber , const char * data , int len );
+
+ int len(){
+ int len;
+ const char * data = _data["data"].binData( len );
+ int * foo = (int*)data;
+ assert( len - 4 == foo[0] );
+ return len - 4;
+ }
+
+ const char * data( int & len ){
+ const char * data = _data["data"].binData( len );
+ int * foo = (int*)data;
+ assert( len - 4 == foo[0] );
+
+ len = len - 4;
+ return data + 4;
+ }
+
+ private:
+ BSONObj _data;
+ friend class GridFS;
+ };
+
+
+ /**
+ this is the main entry point into the mongo grid fs
+ */
+ class GridFS{
+ public:
+ /**
+ * @param client - db connection
+ * @param dbName - root database name
+ * @param prefix - if you want your data somewhere besides <dbname>.fs
+ */
+ GridFS( DBClientBase& client , const string& dbName , const string& prefix="fs" );
+ ~GridFS();
+
+ /**
+ * puts the file reference by fileName into the db
+ * @param fileName local filename relative to process
+ * @param remoteName optional filename to use for file stored in GridFS
+ * (default is to use fileName parameter)
+ * @param contentType optional MIME type for this object.
+ * (default is to omit)
+ * @return the file object
+ */
+ BSONObj storeFile( const string& fileName , const string& remoteName="" , const string& contentType="");
+
+ /**
+ * puts the file represented by data into the db
+ * @param data pointer to buffer to store in GridFS
+ * @param length length of buffer
+ * @param remoteName optional filename to use for file stored in GridFS
+ * (default is to use fileName parameter)
+ * @param contentType optional MIME type for this object.
+ * (default is to omit)
+ * @return the file object
+ */
+ BSONObj storeFile( const char* data , size_t length , const string& remoteName , const string& contentType="");
+ /**
+ * removes file referenced by fileName from the db
+ * @param fileName filename (in GridFS) of the file to remove
+ * @return the file object
+ */
+ void removeFile( const string& fileName );
+
+ /**
+ * returns a file object matching the query
+ */
+ GridFile findFile( BSONObj query );
+
+ /**
+ * equiv to findFile( { filename : filename } )
+ */
+ GridFile findFile( const string& fileName );
+
+ /**
+ * convenience method to get all the files
+ */
+ auto_ptr<DBClientCursor> list();
+
+ /**
+ * convenience method to get all the files with a filter
+ */
+ auto_ptr<DBClientCursor> list( BSONObj query );
+
+ private:
+ DBClientBase& _client;
+ string _dbName;
+ string _prefix;
+ string _filesNS;
+ string _chunksNS;
+
+ // insert fileobject. All chunks must be in DB.
+ BSONObj insertFile(const string& name, const OID& id, unsigned length, const string& contentType);
+
+ friend class GridFile;
+ };
+
+ /**
+ wrapper for a file stored in the Mongo database
+ */
+ class GridFile {
+ public:
+ /**
+ * @return whether or not this file exists
+ * findFile will always return a GriFile, so need to check this
+ */
+ bool exists(){
+ return ! _obj.isEmpty();
+ }
+
+ string getFilename(){
+ return _obj["filename"].str();
+ }
+
+ int getChunkSize(){
+ return (int)(_obj["chunkSize"].number());
+ }
+
+ gridfs_offset getContentLength(){
+ return (gridfs_offset)(_obj["length"].number());
+ }
+
+ string getContentType(){
+ return _obj["contentType"].valuestr();
+ }
+
+ Date_t getUploadDate(){
+ return _obj["uploadDate"].date();
+ }
+
+ string getMD5(){
+ return _obj["md5"].str();
+ }
+
+ BSONElement getFileField( const string& name ){
+ return _obj[name];
+ }
+
+ BSONObj getMetadata();
+
+ int getNumChunks(){
+ return (int) ceil( (double)getContentLength() / (double)getChunkSize() );
+ }
+
+ Chunk getChunk( int n );
+
+ /**
+ write the file to the output stream
+ */
+ gridfs_offset write( ostream & out );
+
+ /**
+ write the file to this filename
+ */
+ gridfs_offset write( const string& where );
+
+ private:
+ GridFile( GridFS * grid , BSONObj obj );
+
+ void _exists();
+
+ GridFS * _grid;
+ BSONObj _obj;
+
+ friend class GridFS;
+ };
+}
+
+
diff --git a/client/model.cpp b/client/model.cpp
new file mode 100644
index 0000000..3978105
--- /dev/null
+++ b/client/model.cpp
@@ -0,0 +1,97 @@
+// model.cpp
+
+/* Copyright 2009 10gen
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "stdafx.h"
+#include "model.h"
+#include "connpool.h"
+
+namespace mongo {
+
+ bool Model::load(BSONObj& query){
+ ScopedDbConnection conn( modelServer() );
+
+ BSONObj b = conn->findOne(getNS(), query);
+ conn.done();
+
+ if ( b.isEmpty() )
+ return false;
+
+ unserialize(b);
+ _id = b["_id"].wrap().getOwned();
+ return true;
+ }
+
+ void Model::remove( bool safe ){
+ uassert( 10016 , "_id isn't set - needed for remove()" , _id["_id"].type() );
+
+ ScopedDbConnection conn( modelServer() );
+ conn->remove( getNS() , _id );
+
+ string errmsg = "";
+ if ( safe )
+ errmsg = conn->getLastError();
+
+ conn.done();
+
+ if ( safe && errmsg.size() )
+ throw UserException( 9002 , (string)"error on Model::remove: " + errmsg );
+ }
+
+ void Model::save( bool safe ){
+ ScopedDbConnection conn( modelServer() );
+
+ BSONObjBuilder b;
+ serialize( b );
+
+ if ( _id.isEmpty() ){
+ OID oid;
+ oid.init();
+ b.appendOID( "_id" , &oid );
+
+ BSONObj o = b.obj();
+ conn->insert( getNS() , o );
+ _id = o["_id"].wrap().getOwned();
+
+ log(4) << "inserted new model " << getNS() << " " << o << endl;
+ }
+ else {
+ BSONElement id = _id["_id"];
+ b.append( id );
+
+ BSONObjBuilder qb;
+ qb.append( id );
+
+ BSONObj q = qb.obj();
+ BSONObj o = b.obj();
+
+ log(4) << "updated old model" << getNS() << " " << q << " " << o << endl;
+
+ conn->update( getNS() , q , o );
+
+ }
+
+ string errmsg = "";
+ if ( safe )
+ errmsg = conn->getLastError();
+
+ conn.done();
+
+ if ( safe && errmsg.size() )
+ throw UserException( 9003 , (string)"error on Model::save: " + errmsg );
+ }
+
+} // namespace mongo
diff --git a/client/model.h b/client/model.h
new file mode 100644
index 0000000..f3a63ad
--- /dev/null
+++ b/client/model.h
@@ -0,0 +1,57 @@
+/** @file model.h */
+
+/* Copyright 2009 10gen
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "dbclient.h"
+
+namespace mongo {
+
+ /** Model is a base class for defining objects which are serializable to the Mongo
+ database via the database driver.
+
+ Definition
+ Your serializable class should inherit from Model and implement the abstract methods
+ below.
+
+ Loading
+ To load, first construct an (empty) object. Then call load(). Do not load an object
+ more than once.
+ */
+ class Model {
+ public:
+ Model() { }
+ virtual ~Model() { }
+
+ virtual const char * getNS() = 0;
+ virtual void serialize(BSONObjBuilder& to) = 0;
+ virtual void unserialize(const BSONObj& from) = 0;
+
+ virtual string modelServer() = 0;
+
+ /** Load a single object.
+ @return true if successful.
+ */
+ virtual bool load(BSONObj& query);
+ virtual void save( bool safe=false );
+ virtual void remove( bool safe=false );
+
+ protected:
+ BSONObj _id;
+ };
+
+} // namespace mongo
diff --git a/client/parallel.cpp b/client/parallel.cpp
new file mode 100644
index 0000000..449f436
--- /dev/null
+++ b/client/parallel.cpp
@@ -0,0 +1,259 @@
+// parallel.cpp
+
+#include "stdafx.h"
+#include "parallel.h"
+#include "connpool.h"
+#include "../db/queryutil.h"
+#include "../db/dbmessage.h"
+#include "../s/util.h"
+
+namespace mongo {
+
+ // -------- ClusteredCursor -----------
+
+ ClusteredCursor::ClusteredCursor( QueryMessage& q ){
+ _ns = q.ns;
+ _query = q.query.copy();
+ _options = q.queryOptions;
+ if ( q.fields.get() )
+ _fields = q.fields->getSpec();
+ _done = false;
+ }
+
+ ClusteredCursor::ClusteredCursor( const string& ns , const BSONObj& q , int options , const BSONObj& fields ){
+ _ns = ns;
+ _query = q.getOwned();
+ _options = options;
+ _fields = fields.getOwned();
+ _done = false;
+ }
+
+ ClusteredCursor::~ClusteredCursor(){
+ _done = true; // just in case
+ }
+
+ auto_ptr<DBClientCursor> ClusteredCursor::query( const string& server , int num , BSONObj extra ){
+ uassert( 10017 , "cursor already done" , ! _done );
+
+ BSONObj q = _query;
+ if ( ! extra.isEmpty() ){
+ q = concatQuery( q , extra );
+ }
+
+ ScopedDbConnection conn( server );
+ checkShardVersion( conn.conn() , _ns );
+
+ log(5) << "ClusteredCursor::query server:" << server << " ns:" << _ns << " query:" << q << " num:" << num << " _fields:" << _fields << " options: " << _options << endl;
+ auto_ptr<DBClientCursor> cursor = conn->query( _ns.c_str() , q , num , 0 , ( _fields.isEmpty() ? 0 : &_fields ) , _options );
+ if ( cursor->hasResultFlag( QueryResult::ResultFlag_ShardConfigStale ) )
+ throw StaleConfigException( _ns , "ClusteredCursor::query" );
+
+ conn.done();
+ return cursor;
+ }
+
+ BSONObj ClusteredCursor::concatQuery( const BSONObj& query , const BSONObj& extraFilter ){
+ if ( ! query.hasField( "query" ) )
+ return _concatFilter( query , extraFilter );
+
+ BSONObjBuilder b;
+ BSONObjIterator i( query );
+ while ( i.more() ){
+ BSONElement e = i.next();
+
+ if ( strcmp( e.fieldName() , "query" ) ){
+ b.append( e );
+ continue;
+ }
+
+ b.append( "query" , _concatFilter( e.embeddedObjectUserCheck() , extraFilter ) );
+ }
+ return b.obj();
+ }
+
+ BSONObj ClusteredCursor::_concatFilter( const BSONObj& filter , const BSONObj& extra ){
+ BSONObjBuilder b;
+ b.appendElements( filter );
+ b.appendElements( extra );
+ return b.obj();
+ // TODO: should do some simplification here if possibl ideally
+ }
+
+
+ // -------- SerialServerClusteredCursor -----------
+
+ SerialServerClusteredCursor::SerialServerClusteredCursor( set<ServerAndQuery> servers , QueryMessage& q , int sortOrder) : ClusteredCursor( q ){
+ for ( set<ServerAndQuery>::iterator i = servers.begin(); i!=servers.end(); i++ )
+ _servers.push_back( *i );
+
+ if ( sortOrder > 0 )
+ sort( _servers.begin() , _servers.end() );
+ else if ( sortOrder < 0 )
+ sort( _servers.rbegin() , _servers.rend() );
+
+ _serverIndex = 0;
+ }
+
+ bool SerialServerClusteredCursor::more(){
+ if ( _current.get() && _current->more() )
+ return true;
+
+ if ( _serverIndex >= _servers.size() ){
+ return false;
+ }
+
+ ServerAndQuery& sq = _servers[_serverIndex++];
+
+ _current = query( sq._server , 0 , sq._extra );
+ if ( _current->more() )
+ return true;
+
+ // this sq has nothing, so keep looking
+ return more();
+ }
+
+ BSONObj SerialServerClusteredCursor::next(){
+ uassert( 10018 , "no more items" , more() );
+ return _current->next();
+ }
+
+ // -------- ParallelSortClusteredCursor -----------
+
+ ParallelSortClusteredCursor::ParallelSortClusteredCursor( set<ServerAndQuery> servers , QueryMessage& q ,
+ const BSONObj& sortKey )
+ : ClusteredCursor( q ) , _servers( servers ){
+ _sortKey = sortKey.getOwned();
+ _init();
+ }
+
+ ParallelSortClusteredCursor::ParallelSortClusteredCursor( set<ServerAndQuery> servers , const string& ns ,
+ const Query& q ,
+ int options , const BSONObj& fields )
+ : ClusteredCursor( ns , q.obj , options , fields ) , _servers( servers ){
+ _sortKey = q.getSort().copy();
+ _init();
+ }
+
+ void ParallelSortClusteredCursor::_init(){
+ _numServers = _servers.size();
+ _cursors = new auto_ptr<DBClientCursor>[_numServers];
+ _nexts = new BSONObj[_numServers];
+
+ // TODO: parellize
+ int num = 0;
+ for ( set<ServerAndQuery>::iterator i = _servers.begin(); i!=_servers.end(); i++ ){
+ const ServerAndQuery& sq = *i;
+ _cursors[num++] = query( sq._server , 0 , sq._extra );
+ }
+
+ }
+
+ ParallelSortClusteredCursor::~ParallelSortClusteredCursor(){
+ delete [] _cursors;
+ delete [] _nexts;
+ }
+
+ bool ParallelSortClusteredCursor::more(){
+ for ( int i=0; i<_numServers; i++ ){
+ if ( ! _nexts[i].isEmpty() )
+ return true;
+
+ if ( _cursors[i].get() && _cursors[i]->more() )
+ return true;
+ }
+ return false;
+ }
+
+ BSONObj ParallelSortClusteredCursor::next(){
+ advance();
+
+ BSONObj best = BSONObj();
+ int bestFrom = -1;
+
+ for ( int i=0; i<_numServers; i++){
+ if ( _nexts[i].isEmpty() )
+ continue;
+
+ if ( best.isEmpty() ){
+ best = _nexts[i];
+ bestFrom = i;
+ continue;
+ }
+
+ int comp = best.woSortOrder( _nexts[i] , _sortKey );
+ if ( comp < 0 )
+ continue;
+
+ best = _nexts[i];
+ bestFrom = i;
+ }
+
+ uassert( 10019 , "no more elements" , ! best.isEmpty() );
+ _nexts[bestFrom] = BSONObj();
+
+ return best;
+ }
+
+ void ParallelSortClusteredCursor::advance(){
+ for ( int i=0; i<_numServers; i++ ){
+
+ if ( ! _nexts[i].isEmpty() ){
+ // already have a good object there
+ continue;
+ }
+
+ if ( ! _cursors[i]->more() ){
+ // cursor is dead, oh well
+ continue;
+ }
+
+ _nexts[i] = _cursors[i]->next();
+ }
+
+ }
+
+ // -----------------
+ // ---- Future -----
+ // -----------------
+
+ Future::CommandResult::CommandResult( const string& server , const string& db , const BSONObj& cmd ){
+ _server = server;
+ _db = db;
+ _cmd = cmd;
+ _done = false;
+ }
+
+ bool Future::CommandResult::join(){
+ while ( ! _done )
+ sleepmicros( 50 );
+ return _ok;
+ }
+
+ void Future::commandThread(){
+ assert( _grab );
+ shared_ptr<CommandResult> res = *_grab;
+ _grab = 0;
+
+ ScopedDbConnection conn( res->_server );
+ res->_ok = conn->runCommand( res->_db , res->_cmd , res->_res );
+ res->_done = true;
+ }
+
+ shared_ptr<Future::CommandResult> Future::spawnCommand( const string& server , const string& db , const BSONObj& cmd ){
+ shared_ptr<Future::CommandResult> res;
+ res.reset( new Future::CommandResult( server , db , cmd ) );
+
+ _grab = &res;
+
+ boost::thread thr( Future::commandThread );
+
+ while ( _grab )
+ sleepmicros(2);
+
+ return res;
+ }
+
+ shared_ptr<Future::CommandResult> * Future::_grab;
+
+
+}
diff --git a/client/parallel.h b/client/parallel.h
new file mode 100644
index 0000000..5a22624
--- /dev/null
+++ b/client/parallel.h
@@ -0,0 +1,195 @@
+// parallel.h
+
+/* Copyright 2009 10gen Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ tools for wokring in parallel/sharded/clustered environment
+ */
+
+#include "../stdafx.h"
+#include "dbclient.h"
+#include "../db/dbmessage.h"
+
+namespace mongo {
+
+ /**
+ * this is a cursor that works over a set of servers
+ * can be used in serial/paralellel as controlled by sub classes
+ */
+ class ClusteredCursor {
+ public:
+ ClusteredCursor( QueryMessage& q );
+ ClusteredCursor( const string& ns , const BSONObj& q , int options=0 , const BSONObj& fields=BSONObj() );
+ virtual ~ClusteredCursor();
+
+ virtual bool more() = 0;
+ virtual BSONObj next() = 0;
+
+ static BSONObj concatQuery( const BSONObj& query , const BSONObj& extraFilter );
+
+ virtual string type() const = 0;
+
+ protected:
+ auto_ptr<DBClientCursor> query( const string& server , int num = 0 , BSONObj extraFilter = BSONObj() );
+
+ static BSONObj _concatFilter( const BSONObj& filter , const BSONObj& extraFilter );
+
+ string _ns;
+ BSONObj _query;
+ int _options;
+ BSONObj _fields;
+
+ bool _done;
+ };
+
+
+ /**
+ * holder for a server address and a query to run
+ */
+ class ServerAndQuery {
+ public:
+ ServerAndQuery( const string& server , BSONObj extra = BSONObj() , BSONObj orderObject = BSONObj() ) :
+ _server( server ) , _extra( extra.getOwned() ) , _orderObject( orderObject.getOwned() ){
+ }
+
+ bool operator<( const ServerAndQuery& other ) const{
+ if ( ! _orderObject.isEmpty() )
+ return _orderObject.woCompare( other._orderObject ) < 0;
+
+ if ( _server < other._server )
+ return true;
+ if ( other._server > _server )
+ return false;
+ return _extra.woCompare( other._extra ) < 0;
+ }
+
+ string toString() const {
+ StringBuilder ss;
+ ss << "server:" << _server << " _extra:" << _extra << " _orderObject:" << _orderObject;
+ return ss.str();
+ }
+
+ operator string() const {
+ return toString();
+ }
+
+ string _server;
+ BSONObj _extra;
+ BSONObj _orderObject;
+ };
+
+
+ /**
+ * runs a query in serial across any number of servers
+ * returns all results from 1 server, then the next, etc...
+ */
+ class SerialServerClusteredCursor : public ClusteredCursor {
+ public:
+ SerialServerClusteredCursor( set<ServerAndQuery> servers , QueryMessage& q , int sortOrder=0);
+ virtual bool more();
+ virtual BSONObj next();
+ virtual string type() const { return "SerialServer"; }
+ private:
+ vector<ServerAndQuery> _servers;
+ unsigned _serverIndex;
+
+ auto_ptr<DBClientCursor> _current;
+ };
+
+
+ /**
+ * runs a query in parellel across N servers
+ * sots
+ */
+ class ParallelSortClusteredCursor : public ClusteredCursor {
+ public:
+ ParallelSortClusteredCursor( set<ServerAndQuery> servers , QueryMessage& q , const BSONObj& sortKey );
+ ParallelSortClusteredCursor( set<ServerAndQuery> servers , const string& ns ,
+ const Query& q , int options=0, const BSONObj& fields=BSONObj() );
+ virtual ~ParallelSortClusteredCursor();
+ virtual bool more();
+ virtual BSONObj next();
+ virtual string type() const { return "ParallelSort"; }
+ private:
+ void _init();
+
+ void advance();
+
+ int _numServers;
+ set<ServerAndQuery> _servers;
+ BSONObj _sortKey;
+
+ auto_ptr<DBClientCursor> * _cursors;
+ BSONObj * _nexts;
+ };
+
+ /**
+ * tools for doing asynchronous operations
+ * right now uses underlying sync network ops and uses another thread
+ * should be changed to use non-blocking io
+ */
+ class Future {
+ public:
+ class CommandResult {
+ public:
+
+ string getServer() const { return _server; }
+
+ bool isDone() const { return _done; }
+
+ bool ok() const {
+ assert( _done );
+ return _ok;
+ }
+
+ BSONObj result() const {
+ assert( _done );
+ return _res;
+ }
+
+ /**
+ blocks until command is done
+ returns ok()
+ */
+ bool join();
+
+ private:
+
+ CommandResult( const string& server , const string& db , const BSONObj& cmd );
+
+ string _server;
+ string _db;
+ BSONObj _cmd;
+
+ boost::thread _thr;
+
+ BSONObj _res;
+ bool _done;
+ bool _ok;
+
+ friend class Future;
+ };
+
+ static void commandThread();
+
+ static shared_ptr<CommandResult> spawnCommand( const string& server , const string& db , const BSONObj& cmd );
+
+ private:
+ static shared_ptr<CommandResult> * _grab;
+ };
+
+
+}
diff --git a/client/syncclusterconnection.cpp b/client/syncclusterconnection.cpp
new file mode 100644
index 0000000..b942709
--- /dev/null
+++ b/client/syncclusterconnection.cpp
@@ -0,0 +1,165 @@
+// syncclusterconnection.cpp
+
+#include "stdafx.h"
+#include "syncclusterconnection.h"
+
+// error codes 8000-8009
+
+namespace mongo {
+
+ SyncCluterConnection::SyncCluterConnection( string commaSeperated ){
+ string::size_type idx;
+ while ( ( idx = commaSeperated.find( ',' ) ) != string::npos ){
+ string h = commaSeperated.substr( 0 , idx );
+ commaSeperated = commaSeperated.substr( idx + 1 );
+ _connect( h );
+ }
+ _connect( commaSeperated );
+ uassert( 8004 , "SyncCluterConnection needs 3 servers" , _conns.size() == 3 );
+ }
+
+ SyncCluterConnection::SyncCluterConnection( string a , string b , string c ){
+ // connect to all even if not working
+ _connect( a );
+ _connect( b );
+ _connect( c );
+ }
+
+ SyncCluterConnection::~SyncCluterConnection(){
+ for ( size_t i=0; i<_conns.size(); i++ )
+ delete _conns[i];
+ _conns.clear();
+ }
+
+ bool SyncCluterConnection::prepare( string& errmsg ){
+ return fsync( errmsg );
+ }
+
+ bool SyncCluterConnection::fsync( string& errmsg ){
+ bool ok = true;
+ errmsg = "";
+ for ( size_t i=0; i<_conns.size(); i++ ){
+ BSONObj res;
+ try {
+ if ( _conns[i]->simpleCommand( "admin" , 0 , "fsync" ) )
+ continue;
+ }
+ catch ( std::exception& e ){
+ errmsg += e.what();
+ }
+ catch ( ... ){
+ }
+ ok = false;
+ errmsg += _conns[i]->toString() + ":" + res.toString();
+ }
+ return ok;
+ }
+
+ void SyncCluterConnection::_checkLast(){
+ vector<BSONObj> all;
+ vector<string> errors;
+
+ for ( size_t i=0; i<_conns.size(); i++ ){
+ BSONObj res;
+ string err;
+ try {
+ if ( ! _conns[i]->runCommand( "admin" , BSON( "getlasterror" << 1 << "fsync" << 1 ) , res ) )
+ err = "cmd failed: ";
+ }
+ catch ( std::exception& e ){
+ err += e.what();
+ }
+ catch ( ... ){
+ err += "unknown failure";
+ }
+ all.push_back( res );
+ errors.push_back( err );
+ }
+
+ assert( all.size() == errors.size() && all.size() == _conns.size() );
+
+ stringstream err;
+ bool ok = true;
+
+ for ( size_t i = 0; i<_conns.size(); i++ ){
+ BSONObj res = all[i];
+ if ( res["ok"].trueValue() && res["fsyncFiles"].numberInt() > 0 )
+ continue;
+ ok = false;
+ err << _conns[i]->toString() << ": " << res << " " << errors[i];
+ }
+
+ if ( ok )
+ return;
+ throw UserException( 8001 , (string)"SyncCluterConnection write op failed: " + err.str() );
+ }
+
+ void SyncCluterConnection::_connect( string host ){
+ log() << "SyncCluterConnection connecting to: " << host << endl;
+ DBClientConnection * c = new DBClientConnection( true );
+ string errmsg;
+ if ( ! c->connect( host , errmsg ) )
+ log() << "SyncCluterConnection connect fail to: " << host << " errmsg: " << errmsg << endl;
+ _conns.push_back( c );
+ }
+
+ auto_ptr<DBClientCursor> SyncCluterConnection::query(const string &ns, Query query, int nToReturn, int nToSkip,
+ const BSONObj *fieldsToReturn, int queryOptions){
+
+ uassert( 10021 , "$cmd not support yet in SyncCluterConnection::query" , ns.find( "$cmd" ) == string::npos );
+
+ for ( size_t i=0; i<_conns.size(); i++ ){
+ try {
+ auto_ptr<DBClientCursor> cursor =
+ _conns[i]->query( ns , query , nToReturn , nToSkip , fieldsToReturn , queryOptions );
+ if ( cursor.get() )
+ return cursor;
+ log() << "query failed to: " << _conns[i]->toString() << " no data" << endl;
+ }
+ catch ( ... ){
+ log() << "query failed to: " << _conns[i]->toString() << " exception" << endl;
+ }
+ }
+ throw UserException( 8002 , "all servers down!" );
+ }
+
+ auto_ptr<DBClientCursor> SyncCluterConnection::getMore( const string &ns, long long cursorId, int nToReturn, int options ){
+ uassert( 10022 , "SyncCluterConnection::getMore not supported yet" , 0);
+ auto_ptr<DBClientCursor> c;
+ return c;
+ }
+
+ void SyncCluterConnection::insert( const string &ns, BSONObj obj ){
+ string errmsg;
+ if ( ! prepare( errmsg ) )
+ throw UserException( 8003 , (string)"SyncCluterConnection::insert prepare failed: " + errmsg );
+
+ for ( size_t i=0; i<_conns.size(); i++ ){
+ _conns[i]->insert( ns , obj );
+ }
+
+ _checkLast();
+ }
+
+ void SyncCluterConnection::insert( const string &ns, const vector< BSONObj >& v ){
+ uassert( 10023 , "SyncCluterConnection bulk insert not implemented" , 0);
+ }
+
+ void SyncCluterConnection::remove( const string &ns , Query query, bool justOne ){ assert(0); }
+
+ void SyncCluterConnection::update( const string &ns , Query query , BSONObj obj , bool upsert , bool multi ){ assert(0); }
+
+ string SyncCluterConnection::toString(){
+ stringstream ss;
+ ss << "SyncCluterConnection [";
+ for ( size_t i=0; i<_conns.size(); i++ ){
+ if ( i > 0 )
+ ss << ",";
+ ss << _conns[i]->toString();
+ }
+ ss << "]";
+ return ss.str();
+ }
+
+
+}
diff --git a/client/syncclusterconnection.h b/client/syncclusterconnection.h
new file mode 100644
index 0000000..c14a9bb
--- /dev/null
+++ b/client/syncclusterconnection.h
@@ -0,0 +1,57 @@
+// syncclusterconnection.h
+
+#include "../stdafx.h"
+#include "dbclient.h"
+
+namespace mongo {
+
+ /**
+ * this is a connection to a cluster of servers that operate as one
+ * for super high durability
+ */
+ class SyncCluterConnection : public DBClientWithCommands {
+ public:
+ /**
+ * @param commaSeperated should be 3 hosts comma seperated
+ */
+ SyncCluterConnection( string commaSeperated );
+ SyncCluterConnection( string a , string b , string c );
+ ~SyncCluterConnection();
+
+
+ /**
+ * @return true if all servers are up and ready for writes
+ */
+ bool prepare( string& errmsg );
+
+ /**
+ * runs fsync on all servers
+ */
+ bool fsync( string& errmsg );
+
+ // --- from DBClientInterface
+
+ virtual auto_ptr<DBClientCursor> query(const string &ns, Query query, int nToReturn, int nToSkip,
+ const BSONObj *fieldsToReturn, int queryOptions);
+
+ virtual auto_ptr<DBClientCursor> getMore( const string &ns, long long cursorId, int nToReturn, int options );
+
+ virtual void insert( const string &ns, BSONObj obj );
+
+ virtual void insert( const string &ns, const vector< BSONObj >& v );
+
+ virtual void remove( const string &ns , Query query, bool justOne );
+
+ virtual void update( const string &ns , Query query , BSONObj obj , bool upsert , bool multi );
+
+ virtual string toString();
+ private:
+
+ void _checkLast();
+
+ void _connect( string host );
+ vector<DBClientConnection*> _conns;
+ };
+
+
+};