diff options
author | Antonin Kral <a.kral@bobek.cz> | 2010-01-31 08:32:52 +0100 |
---|---|---|
committer | Antonin Kral <a.kral@bobek.cz> | 2010-01-31 08:32:52 +0100 |
commit | 4eefaf421bfeddf040d96a3dafb12e09673423d7 (patch) | |
tree | cb2e5ccc7f98158894f977ff131949da36673591 /client | |
download | mongodb-4eefaf421bfeddf040d96a3dafb12e09673423d7.tar.gz |
Imported Upstream version 1.3.1
Diffstat (limited to 'client')
-rw-r--r-- | client/clientOnly.cpp | 57 | ||||
-rw-r--r-- | client/connpool.cpp | 122 | ||||
-rw-r--r-- | client/connpool.h | 135 | ||||
-rw-r--r-- | client/dbclient.cpp | 981 | ||||
-rw-r--r-- | client/dbclient.h | 894 | ||||
-rw-r--r-- | client/examples/authTest.cpp | 53 | ||||
-rw-r--r-- | client/examples/clientTest.cpp | 214 | ||||
-rw-r--r-- | client/examples/first.cpp | 85 | ||||
-rw-r--r-- | client/examples/second.cpp | 56 | ||||
-rw-r--r-- | client/examples/tail.cpp | 55 | ||||
-rw-r--r-- | client/examples/tutorial.cpp | 67 | ||||
-rw-r--r-- | client/examples/whereExample.cpp | 68 | ||||
-rw-r--r-- | client/gridfs.cpp | 233 | ||||
-rw-r--r-- | client/gridfs.h | 203 | ||||
-rw-r--r-- | client/model.cpp | 97 | ||||
-rw-r--r-- | client/model.h | 57 | ||||
-rw-r--r-- | client/parallel.cpp | 259 | ||||
-rw-r--r-- | client/parallel.h | 195 | ||||
-rw-r--r-- | client/syncclusterconnection.cpp | 165 | ||||
-rw-r--r-- | client/syncclusterconnection.h | 57 |
20 files changed, 4053 insertions, 0 deletions
diff --git a/client/clientOnly.cpp b/client/clientOnly.cpp new file mode 100644 index 0000000..f9fc570 --- /dev/null +++ b/client/clientOnly.cpp @@ -0,0 +1,57 @@ +// clientOnly.cpp + +/* Copyright 2009 10gen Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "stdafx.h" +#include "../client/dbclient.h" +#include "../db/dbhelpers.h" +#include "../db/cmdline.h" + +namespace mongo { + + CmdLine cmdLine; + + const char * curNs = "in client mode"; + + bool dbexitCalled = false; + + void dbexit( ExitCode returnCode, const char *whyMsg ) { + dbexitCalled = true; + out() << "dbexit called" << endl; + if ( whyMsg ) + out() << " b/c " << whyMsg << endl; + out() << "exiting" << endl; + ::exit( returnCode ); + } + + bool inShutdown(){ + return dbexitCalled; + } + + string getDbContext() { + return "in client only mode"; + } + + bool haveLocalShardingInfo( const string& ns ){ + return false; + } +/* + auto_ptr<CursorIterator> Helpers::find( const char *ns , BSONObj query , bool requireIndex ){ + uassert( 10000 , "Helpers::find can't be used in client" , 0 ); + return auto_ptr<CursorIterator>(0); + } +*/ +} diff --git a/client/connpool.cpp b/client/connpool.cpp new file mode 100644 index 0000000..b332bae --- /dev/null +++ b/client/connpool.cpp @@ -0,0 +1,122 @@ +/* connpool.cpp +*/ + +/* Copyright 2009 10gen Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// _ todo: reconnect? + +#include "stdafx.h" +#include "connpool.h" +#include "../db/commands.h" + +namespace mongo { + + DBConnectionPool pool; + + DBClientBase* DBConnectionPool::get(const string& host) { + boostlock L(poolMutex); + + PoolForHost *&p = pools[host]; + if ( p == 0 ) + p = new PoolForHost(); + if ( p->pool.empty() ) { + string errmsg; + DBClientBase *c; + if( host.find(',') == string::npos ) { + DBClientConnection *cc = new DBClientConnection(true); + log(2) << "creating new connection for pool to:" << host << endl; + if ( !cc->connect(host.c_str(), errmsg) ) { + delete cc; + uassert( 11002 , (string)"dbconnectionpool: connect failed " + host , false); + return 0; + } + c = cc; + onCreate( c ); + } + else { + DBClientPaired *p = new DBClientPaired(); + if( !p->connect(host) ) { + delete p; + uassert( 11003 , (string)"dbconnectionpool: connect failed [2] " + host , false); + return 0; + } + c = p; + } + return c; + } + DBClientBase *c = p->pool.top(); + p->pool.pop(); + onHandedOut( c ); + return c; + } + + void DBConnectionPool::flush(){ + boostlock L(poolMutex); + for ( map<string,PoolForHost*>::iterator i = pools.begin(); i != pools.end(); i++ ){ + PoolForHost* p = i->second; + + vector<DBClientBase*> all; + while ( ! p->pool.empty() ){ + DBClientBase * c = p->pool.top(); + p->pool.pop(); + all.push_back( c ); + bool res; + c->isMaster( res ); + } + + for ( vector<DBClientBase*>::iterator i=all.begin(); i != all.end(); i++ ){ + p->pool.push( *i ); + } + } + } + + void DBConnectionPool::addHook( DBConnectionHook * hook ){ + _hooks.push_back( hook ); + } + + void DBConnectionPool::onCreate( DBClientBase * conn ){ + if ( _hooks.size() == 0 ) + return; + + for ( list<DBConnectionHook*>::iterator i = _hooks.begin(); i != _hooks.end(); i++ ){ + (*i)->onCreate( conn ); + } + } + + void DBConnectionPool::onHandedOut( DBClientBase * conn ){ + if ( _hooks.size() == 0 ) + return; + + for ( list<DBConnectionHook*>::iterator i = _hooks.begin(); i != _hooks.end(); i++ ){ + (*i)->onHandedOut( conn ); + } + } + + class PoolFlushCmd : public Command { + public: + PoolFlushCmd() : Command( "connpoolsync" ){} + virtual bool run(const char*, mongo::BSONObj&, std::string&, mongo::BSONObjBuilder& result, bool){ + pool.flush(); + result << "ok" << 1; + return true; + } + virtual bool slaveOk(){ + return true; + } + + } poolFlushCmd; + +} // namespace mongo diff --git a/client/connpool.h b/client/connpool.h new file mode 100644 index 0000000..34ed498 --- /dev/null +++ b/client/connpool.h @@ -0,0 +1,135 @@ +/** @file connpool.h */ + +/* Copyright 2009 10gen Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <stack> +#include "dbclient.h" + +namespace mongo { + + struct PoolForHost { + std::stack<DBClientBase*> pool; + }; + + class DBConnectionHook { + public: + virtual ~DBConnectionHook(){} + + virtual void onCreate( DBClientBase * conn ){} + virtual void onHandedOut( DBClientBase * conn ){} + + }; + + /** Database connection pool. + + Generally, use ScopedDbConnection and do not call these directly. + + This class, so far, is suitable for use with unauthenticated connections. + Support for authenticated connections requires some adjustements: please + request... + + Usage: + + { + ScopedDbConnection c("myserver"); + c.conn()... + } + */ + class DBConnectionPool { + boost::mutex poolMutex; + map<string,PoolForHost*> pools; // servername -> pool + list<DBConnectionHook*> _hooks; + + void onCreate( DBClientBase * conn ); + void onHandedOut( DBClientBase * conn ); + public: + void flush(); + DBClientBase *get(const string& host); + void release(const string& host, DBClientBase *c) { + if ( c->isFailed() ) + return; + boostlock L(poolMutex); + pools[host]->pool.push(c); + } + void addHook( DBConnectionHook * hook ); + }; + + extern DBConnectionPool pool; + + /** Use to get a connection from the pool. On exceptions things + clean up nicely. + */ + class ScopedDbConnection { + const string host; + DBClientBase *_conn; + public: + /** get the associated connection object */ + DBClientBase* operator->(){ + uassert( 11004 , "did you call done already" , _conn ); + return _conn; + } + + /** get the associated connection object */ + DBClientBase& conn() { + uassert( 11005 , "did you call done already" , _conn ); + return *_conn; + } + + /** throws UserException if can't connect */ + ScopedDbConnection(const string& _host) : + host(_host), _conn( pool.get(_host) ) { + //cout << " for: " << _host << " got conn: " << _conn << endl; + } + + /** Force closure of the connection. You should call this if you leave it in + a bad state. Destructor will do this too, but it is verbose. + */ + void kill() { + delete _conn; + _conn = 0; + } + + /** Call this when you are done with the connection. + + If you do not call done() before this object goes out of scope, + we can't be sure we fully read all expected data of a reply on the socket. so + we don't try to reuse the connection in that situation. + */ + void done() { + if ( ! _conn ) + return; + + /* we could do this, but instead of assume one is using autoreconnect mode on the connection + if ( _conn->isFailed() ) + kill(); + else + */ + pool.release(host, _conn); + _conn = 0; + } + + ~ScopedDbConnection() { + if ( _conn && ! _conn->isFailed() ) { + /* see done() comments above for why we log this line */ + log() << "~ScopedDBConnection: _conn != null" << endl; + kill(); + } + } + }; + +} // namespace mongo diff --git a/client/dbclient.cpp b/client/dbclient.cpp new file mode 100644 index 0000000..165981d --- /dev/null +++ b/client/dbclient.cpp @@ -0,0 +1,981 @@ +// dbclient.cpp - connect to a Mongo database as a database, from C++ + +/* Copyright 2009 10gen Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "stdafx.h" +#include "../db/pdfile.h" +#include "dbclient.h" +#include "../util/builder.h" +#include "../db/jsobj.h" +#include "../db/json.h" +#include "../db/instance.h" +#include "../util/md5.hpp" +#include "../db/dbmessage.h" +#include "../db/cmdline.h" + +namespace mongo { + + Query& Query::where(const string &jscode, BSONObj scope) { + /* use where() before sort() and hint() and explain(), else this will assert. */ + assert( !obj.hasField("query") ); + BSONObjBuilder b; + b.appendElements(obj); + b.appendWhere(jscode, scope); + obj = b.obj(); + return *this; + } + + void Query::makeComplex() { + if ( obj.hasElement( "query" ) ) + return; + BSONObjBuilder b; + b.append( "query", obj ); + obj = b.obj(); + } + + Query& Query::sort(const BSONObj& s) { + appendComplex( "orderby", s ); + return *this; + } + + Query& Query::hint(BSONObj keyPattern) { + appendComplex( "$hint", keyPattern ); + return *this; + } + + Query& Query::explain() { + appendComplex( "$explain", true ); + return *this; + } + + Query& Query::snapshot() { + appendComplex( "$snapshot", true ); + return *this; + } + + Query& Query::minKey( const BSONObj &val ) { + appendComplex( "$min", val ); + return *this; + } + + Query& Query::maxKey( const BSONObj &val ) { + appendComplex( "$max", val ); + return *this; + } + + bool Query::isComplex() const{ + return obj.hasElement( "query" ); + } + + BSONObj Query::getFilter() const { + if ( ! isComplex() ) + return obj; + return obj.getObjectField( "query" ); + } + BSONObj Query::getSort() const { + if ( ! isComplex() ) + return BSONObj(); + return obj.getObjectField( "orderby" ); + } + BSONObj Query::getHint() const { + if ( ! isComplex() ) + return BSONObj(); + return obj.getObjectField( "$hint" ); + } + bool Query::isExplain() const { + return isComplex() && obj.getBoolField( "$explain" ); + } + + string Query::toString() const{ + return obj.toString(); + } + + /* --- dbclientcommands --- */ + + inline bool DBClientWithCommands::isOk(const BSONObj& o) { + return o.getIntField("ok") == 1; + } + + inline bool DBClientWithCommands::runCommand(const string &dbname, const BSONObj& cmd, BSONObj &info, int options) { + string ns = dbname + ".$cmd"; + info = findOne(ns, cmd, 0 , options); + return isOk(info); + } + + /* note - we build a bson obj here -- for something that is super common like getlasterror you + should have that object prebuilt as that would be faster. + */ + bool DBClientWithCommands::simpleCommand(const string &dbname, BSONObj *info, const string &command) { + BSONObj o; + if ( info == 0 ) + info = &o; + BSONObjBuilder b; + b.append(command, 1); + return runCommand(dbname, b.done(), *info); + } + + unsigned long long DBClientWithCommands::count(const string &_ns, const BSONObj& query, int options) { + NamespaceString ns(_ns); + BSONObj cmd = BSON( "count" << ns.coll << "query" << query ); + BSONObj res; + if( !runCommand(ns.db.c_str(), cmd, res, options) ) + uasserted(11010,string("count fails:") + res.toString()); + return res.getIntField("n"); + } + + BSONObj getlasterrorcmdobj = fromjson("{getlasterror:1}"); + + BSONObj DBClientWithCommands::getLastErrorDetailed() { + BSONObj info; + runCommand("admin", getlasterrorcmdobj, info); + return info; + } + + string DBClientWithCommands::getLastError() { + BSONObj info = getLastErrorDetailed(); + BSONElement e = info["err"]; + if( e.eoo() ) return ""; + if( e.type() == Object ) return e.toString(); + return e.str(); + } + + BSONObj getpreverrorcmdobj = fromjson("{getpreverror:1}"); + + BSONObj DBClientWithCommands::getPrevError() { + BSONObj info; + runCommand("admin", getpreverrorcmdobj, info); + return info; + } + + BSONObj getnoncecmdobj = fromjson("{getnonce:1}"); + + string DBClientWithCommands::createPasswordDigest( const string & username , const string & clearTextPassword ){ + md5digest d; + { + md5_state_t st; + md5_init(&st); + md5_append(&st, (const md5_byte_t *) username.data(), username.length()); + md5_append(&st, (const md5_byte_t *) ":mongo:", 7 ); + md5_append(&st, (const md5_byte_t *) clearTextPassword.data(), clearTextPassword.length()); + md5_finish(&st, d); + } + return digestToString( d ); + } + + bool DBClientWithCommands::auth(const string &dbname, const string &username, const string &password_text, string& errmsg, bool digestPassword) { + //cout << "TEMP AUTH " << toString() << dbname << ' ' << username << ' ' << password_text << ' ' << digestPassword << endl; + + string password = password_text; + if( digestPassword ) + password = createPasswordDigest( username , password_text ); + + BSONObj info; + string nonce; + if( !runCommand(dbname, getnoncecmdobj, info) ) { + errmsg = "getnonce fails - connection problem?"; + return false; + } + { + BSONElement e = info.getField("nonce"); + assert( e.type() == String ); + nonce = e.valuestr(); + } + + BSONObj authCmd; + BSONObjBuilder b; + { + + b << "authenticate" << 1 << "nonce" << nonce << "user" << username; + md5digest d; + { + md5_state_t st; + md5_init(&st); + md5_append(&st, (const md5_byte_t *) nonce.c_str(), nonce.size() ); + md5_append(&st, (const md5_byte_t *) username.data(), username.length()); + md5_append(&st, (const md5_byte_t *) password.c_str(), password.size() ); + md5_finish(&st, d); + } + b << "key" << digestToString( d ); + authCmd = b.done(); + } + + if( runCommand(dbname, authCmd, info) ) + return true; + + errmsg = info.toString(); + return false; + } + + BSONObj ismastercmdobj = fromjson("{\"ismaster\":1}"); + + bool DBClientWithCommands::isMaster(bool& isMaster, BSONObj *info) { + BSONObj o; + if ( info == 0 ) info = &o; + bool ok = runCommand("admin", ismastercmdobj, *info); + isMaster = (info->getIntField("ismaster") == 1); + return ok; + } + + bool DBClientWithCommands::createCollection(const string &ns, unsigned size, bool capped, int max, BSONObj *info) { + BSONObj o; + if ( info == 0 ) info = &o; + BSONObjBuilder b; + b.append("create", ns); + if ( size ) b.append("size", size); + if ( capped ) b.append("capped", true); + if ( max ) b.append("max", max); + string db = nsToDatabase(ns.c_str()); + return runCommand(db.c_str(), b.done(), *info); + } + + bool DBClientWithCommands::copyDatabase(const string &fromdb, const string &todb, const string &fromhost, BSONObj *info) { + BSONObj o; + if ( info == 0 ) info = &o; + BSONObjBuilder b; + b.append("copydb", 1); + b.append("fromhost", fromhost); + b.append("fromdb", fromdb); + b.append("todb", todb); + return runCommand("admin", b.done(), *info); + } + + bool DBClientWithCommands::setDbProfilingLevel(const string &dbname, ProfilingLevel level, BSONObj *info ) { + BSONObj o; + if ( info == 0 ) info = &o; + + if ( level ) { + // Create system.profile collection. If it already exists this does nothing. + // TODO: move this into the db instead of here so that all + // drivers don't have to do this. + string ns = dbname + ".system.profile"; + createCollection(ns.c_str(), 1024 * 1024, true, 0, info); + } + + BSONObjBuilder b; + b.append("profile", (int) level); + return runCommand(dbname, b.done(), *info); + } + + BSONObj getprofilingcmdobj = fromjson("{\"profile\":-1}"); + + bool DBClientWithCommands::getDbProfilingLevel(const string &dbname, ProfilingLevel& level, BSONObj *info) { + BSONObj o; + if ( info == 0 ) info = &o; + if ( runCommand(dbname, getprofilingcmdobj, *info) ) { + level = (ProfilingLevel) info->getIntField("was"); + return true; + } + return false; + } + + BSONObj DBClientWithCommands::mapreduce(const string &ns, const string &jsmapf, const string &jsreducef, BSONObj query, const string& outputcolname) { + BSONObjBuilder b; + b.append("mapreduce", nsGetCollection(ns)); + b.appendCode("map", jsmapf.c_str()); + b.appendCode("reduce", jsreducef.c_str()); + if( !query.isEmpty() ) + b.append("query", query); + if( !outputcolname.empty() ) + b.append("out", outputcolname); + BSONObj info; + runCommand(nsGetDB(ns), b.done(), info); + return info; + } + + bool DBClientWithCommands::eval(const string &dbname, const string &jscode, BSONObj& info, BSONElement& retValue, BSONObj *args) { + BSONObjBuilder b; + b.appendCode("$eval", jscode.c_str()); + if ( args ) + b.appendArray("args", *args); + bool ok = runCommand(dbname, b.done(), info); + if ( ok ) + retValue = info.getField("retval"); + return ok; + } + + bool DBClientWithCommands::eval(const string &dbname, const string &jscode) { + BSONObj info; + BSONElement retValue; + return eval(dbname, jscode, info, retValue); + } + + list<string> DBClientWithCommands::getDatabaseNames(){ + BSONObj info; + uassert( 10005 , "listdatabases failed" , runCommand( "admin" , BSON( "listDatabases" << 1 ) , info ) ); + uassert( 10006 , "listDatabases.databases not array" , info["databases"].type() == Array ); + + list<string> names; + + BSONObjIterator i( info["databases"].embeddedObjectUserCheck() ); + while ( i.more() ){ + names.push_back( i.next().embeddedObjectUserCheck()["name"].valuestr() ); + } + + return names; + } + + list<string> DBClientWithCommands::getCollectionNames( const string& db ){ + list<string> names; + + string ns = db + ".system.namespaces"; + auto_ptr<DBClientCursor> c = query( ns.c_str() , BSONObj() ); + while ( c->more() ){ + string name = c->next()["name"].valuestr(); + if ( name.find( "$" ) != string::npos ) + continue; + names.push_back( name ); + } + return names; + } + + bool DBClientWithCommands::exists( const string& ns ){ + list<string> names; + + string db = nsGetDB( ns ) + ".system.namespaces"; + BSONObj q = BSON( "name" << ns ); + return count( db.c_str() , q ); + } + + + void testSort() { + DBClientConnection c; + string err; + if ( !c.connect("localhost", err) ) { + out() << "can't connect to server " << err << endl; + return; + } + + cout << "findOne returns:" << endl; + cout << c.findOne("test.foo", QUERY( "x" << 3 ) ).toString() << endl; + cout << c.findOne("test.foo", QUERY( "x" << 3 ).sort("name") ).toString() << endl; + + } + + /* TODO: unit tests should run this? */ + void testDbEval() { + DBClientConnection c; + string err; + if ( !c.connect("localhost", err) ) { + out() << "can't connect to server " << err << endl; + return; + } + + if( !c.auth("dwight", "u", "p", err) ) { + out() << "can't authenticate " << err << endl; + return; + } + + BSONObj info; + BSONElement retValue; + BSONObjBuilder b; + b.append("0", 99); + BSONObj args = b.done(); + bool ok = c.eval("dwight", "function() { return args[0]; }", info, retValue, &args); + out() << "eval ok=" << ok << endl; + out() << "retvalue=" << retValue.toString() << endl; + out() << "info=" << info.toString() << endl; + + out() << endl; + + int x = 3; + assert( c.eval("dwight", "function() { return 3; }", x) ); + + out() << "***\n"; + + BSONObj foo = fromjson("{\"x\":7}"); + out() << foo.toString() << endl; + int res=0; + ok = c.eval("dwight", "function(parm1) { return parm1.x; }", foo, res); + out() << ok << " retval:" << res << endl; + } + + void testPaired(); + + /* --- dbclientconnection --- */ + + bool DBClientConnection::auth(const string &dbname, const string &username, const string &password_text, string& errmsg, bool digestPassword) { + string password = password_text; + if( digestPassword ) + password = createPasswordDigest( username , password_text ); + + if( autoReconnect ) { + /* note we remember the auth info before we attempt to auth -- if the connection is broken, we will + then have it for the next autoreconnect attempt. + */ + pair<string,string> p = pair<string,string>(username, password); + authCache[dbname] = p; + } + + return DBClientBase::auth(dbname, username, password.c_str(), errmsg, false); + } + + BSONObj DBClientInterface::findOne(const string &ns, Query query, const BSONObj *fieldsToReturn, int queryOptions) { + auto_ptr<DBClientCursor> c = + this->query(ns, query, 1, 0, fieldsToReturn, queryOptions); + + massert( 10276 , "DBClientBase::findOne: transport error", c.get() ); + + if ( !c->more() ) + return BSONObj(); + + return c->next().copy(); + } + + bool DBClientConnection::connect(const string &_serverAddress, string& errmsg) { + serverAddress = _serverAddress; + + string ip; + int port; + size_t idx = serverAddress.find( ":" ); + if ( idx != string::npos ) { + port = strtol( serverAddress.substr( idx + 1 ).c_str(), 0, 10 ); + ip = serverAddress.substr( 0 , idx ); + ip = hostbyname(ip.c_str()); + } else { + port = CmdLine::DefaultDBPort; + ip = hostbyname( serverAddress.c_str() ); + } + massert( 10277 , "Unable to parse hostname", !ip.empty() ); + + // we keep around SockAddr for connection life -- maybe MessagingPort + // requires that? + server = auto_ptr<SockAddr>(new SockAddr(ip.c_str(), port)); + p = auto_ptr<MessagingPort>(new MessagingPort()); + + if ( !p->connect(*server) ) { + stringstream ss; + ss << "couldn't connect to server " << serverAddress << " " << ip << ":" << port; + errmsg = ss.str(); + failed = true; + return false; + } + return true; + } + + void DBClientConnection::_checkConnection() { + if ( !failed ) + return; + if ( lastReconnectTry && time(0)-lastReconnectTry < 2 ) + return; + if ( !autoReconnect ) + return; + + lastReconnectTry = time(0); + log() << "trying reconnect to " << serverAddress << endl; + string errmsg; + string tmp = serverAddress; + failed = false; + if ( !connect(tmp.c_str(), errmsg) ) { + log() << "reconnect " << serverAddress << " failed " << errmsg << endl; + return; + } + + log() << "reconnect " << serverAddress << " ok" << endl; + for( map< string, pair<string,string> >::iterator i = authCache.begin(); i != authCache.end(); i++ ) { + const char *dbname = i->first.c_str(); + const char *username = i->second.first.c_str(); + const char *password = i->second.second.c_str(); + if( !DBClientBase::auth(dbname, username, password, errmsg, false) ) + log() << "reconnect: auth failed db:" << dbname << " user:" << username << ' ' << errmsg << '\n'; + } + } + + auto_ptr<DBClientCursor> DBClientBase::query(const string &ns, Query query, int nToReturn, + int nToSkip, const BSONObj *fieldsToReturn, int queryOptions) { + auto_ptr<DBClientCursor> c( new DBClientCursor( this, + ns, query.obj, nToReturn, nToSkip, + fieldsToReturn, queryOptions ) ); + if ( c->init() ) + return c; + return auto_ptr< DBClientCursor >( 0 ); + } + + auto_ptr<DBClientCursor> DBClientBase::getMore( const string &ns, long long cursorId, int nToReturn, int options ) { + auto_ptr<DBClientCursor> c( new DBClientCursor( this, ns, cursorId, nToReturn, options ) ); + if ( c->init() ) + return c; + return auto_ptr< DBClientCursor >( 0 ); + } + + void DBClientBase::insert( const string & ns , BSONObj obj ) { + Message toSend; + + BufBuilder b; + int opts = 0; + b.append( opts ); + b.append( ns ); + obj.appendSelfToBufBuilder( b ); + + toSend.setData( dbInsert , b.buf() , b.len() ); + + say( toSend ); + } + + void DBClientBase::insert( const string & ns , const vector< BSONObj > &v ) { + Message toSend; + + BufBuilder b; + int opts = 0; + b.append( opts ); + b.append( ns ); + for( vector< BSONObj >::const_iterator i = v.begin(); i != v.end(); ++i ) + i->appendSelfToBufBuilder( b ); + + toSend.setData( dbInsert, b.buf(), b.len() ); + + say( toSend ); + } + + void DBClientBase::remove( const string & ns , Query obj , bool justOne ) { + Message toSend; + + BufBuilder b; + int opts = 0; + b.append( opts ); + b.append( ns ); + + int flags = 0; + if ( justOne ) + flags |= 1; + b.append( flags ); + + obj.obj.appendSelfToBufBuilder( b ); + + toSend.setData( dbDelete , b.buf() , b.len() ); + + say( toSend ); + } + + void DBClientBase::update( const string & ns , Query query , BSONObj obj , bool upsert , bool multi ) { + + BufBuilder b; + b.append( (int)0 ); // reserverd + b.append( ns ); + + int flags = 0; + if ( upsert ) flags |= UpdateOption_Upsert; + if ( multi ) flags |= UpdateOption_Multi; + b.append( flags ); + + query.obj.appendSelfToBufBuilder( b ); + obj.appendSelfToBufBuilder( b ); + + Message toSend; + toSend.setData( dbUpdate , b.buf() , b.len() ); + + say( toSend ); + } + + auto_ptr<DBClientCursor> DBClientWithCommands::getIndexes( const string &ns ){ + return query( Namespace( ns.c_str() ).getSisterNS( "system.indexes" ).c_str() , BSON( "ns" << ns ) ); + } + + void DBClientWithCommands::dropIndex( const string& ns , BSONObj keys ){ + dropIndex( ns , genIndexName( keys ) ); + } + + + void DBClientWithCommands::dropIndex( const string& ns , const string& indexName ){ + BSONObj info; + if ( ! runCommand( nsToDatabase( ns.c_str() ) , + BSON( "deleteIndexes" << NamespaceString( ns ).coll << "index" << indexName ) , + info ) ){ + log() << "dropIndex failed: " << info << endl; + uassert( 10007 , "dropIndex failed" , 0 ); + } + resetIndexCache(); + } + + void DBClientWithCommands::dropIndexes( const string& ns ){ + BSONObj info; + uassert( 10008 , "dropIndexes failed" , runCommand( nsToDatabase( ns.c_str() ) , + BSON( "deleteIndexes" << NamespaceString( ns ).coll << "index" << "*") , + info ) ); + resetIndexCache(); + } + + void DBClientWithCommands::reIndex( const string& ns ){ + list<BSONObj> all; + auto_ptr<DBClientCursor> i = getIndexes( ns ); + while ( i->more() ){ + all.push_back( i->next().getOwned() ); + } + + dropIndexes( ns ); + + for ( list<BSONObj>::iterator i=all.begin(); i!=all.end(); i++ ){ + BSONObj o = *i; + insert( Namespace( ns.c_str() ).getSisterNS( "system.indexes" ).c_str() , o ); + } + + } + + + string DBClientWithCommands::genIndexName( const BSONObj& keys ){ + stringstream ss; + + bool first = 1; + for ( BSONObjIterator i(keys); i.more(); ) { + BSONElement f = i.next(); + + if ( first ) + first = 0; + else + ss << "_"; + + ss << f.fieldName() << "_"; + if( f.isNumber() ) + ss << f.numberInt(); + } + return ss.str(); + } + + bool DBClientWithCommands::ensureIndex( const string &ns , BSONObj keys , bool unique, const string & name ) { + BSONObjBuilder toSave; + toSave.append( "ns" , ns ); + toSave.append( "key" , keys ); + + string cacheKey(ns); + cacheKey += "--"; + + if ( name != "" ) { + toSave.append( "name" , name ); + cacheKey += name; + } + else { + string nn = genIndexName( keys ); + toSave.append( "name" , nn ); + cacheKey += nn; + } + + if ( unique ) + toSave.appendBool( "unique", unique ); + + if ( _seenIndexes.count( cacheKey ) ) + return 0; + _seenIndexes.insert( cacheKey ); + + insert( Namespace( ns.c_str() ).getSisterNS( "system.indexes" ).c_str() , toSave.obj() ); + return 1; + } + + void DBClientWithCommands::resetIndexCache() { + _seenIndexes.clear(); + } + + /* -- DBClientCursor ---------------------------------------------- */ + + void assembleRequest( const string &ns, BSONObj query, int nToReturn, int nToSkip, const BSONObj *fieldsToReturn, int queryOptions, Message &toSend ) { + CHECK_OBJECT( query , "assembleRequest query" ); + // see query.h for the protocol we are using here. + BufBuilder b; + int opts = queryOptions; + b.append(opts); + b.append(ns.c_str()); + b.append(nToSkip); + b.append(nToReturn); + query.appendSelfToBufBuilder(b); + if ( fieldsToReturn ) + fieldsToReturn->appendSelfToBufBuilder(b); + toSend.setData(dbQuery, b.buf(), b.len()); + } + + void DBClientConnection::say( Message &toSend ) { + checkConnection(); + try { + port().say( toSend ); + } catch( SocketException & ) { + failed = true; + throw; + } + } + + void DBClientConnection::sayPiggyBack( Message &toSend ) { + port().piggyBack( toSend ); + } + + bool DBClientConnection::call( Message &toSend, Message &response, bool assertOk ) { + /* todo: this is very ugly messagingport::call returns an error code AND can throw + an exception. we should make it return void and just throw an exception anytime + it fails + */ + try { + if ( !port().call(toSend, response) ) { + failed = true; + if ( assertOk ) + massert( 10278 , "dbclient error communicating with server", false); + return false; + } + } + catch( SocketException & ) { + failed = true; + throw; + } + return true; + } + + void DBClientConnection::checkResponse( const char *data, int nReturned ) { + /* check for errors. the only one we really care about at + this stage is "not master" */ + if ( clientPaired && nReturned ) { + BSONObj o(data); + BSONElement e = o.firstElement(); + if ( strcmp(e.fieldName(), "$err") == 0 && + e.type() == String && strncmp(e.valuestr(), "not master", 10) == 0 ) { + clientPaired->isntMaster(); + } + } + } + + bool DBClientCursor::init() { + Message toSend; + if ( !cursorId ) { + assembleRequest( ns, query, nToReturn, nToSkip, fieldsToReturn, opts, toSend ); + } else { + BufBuilder b; + b.append( opts ); + b.append( ns.c_str() ); + b.append( nToReturn ); + b.append( cursorId ); + toSend.setData( dbGetMore, b.buf(), b.len() ); + } + if ( !connector->call( toSend, *m, false ) ) + return false; + dataReceived(); + return true; + } + + void DBClientCursor::requestMore() { + assert( cursorId && pos == nReturned ); + + BufBuilder b; + b.append(opts); + b.append(ns.c_str()); + b.append(nToReturn); + b.append(cursorId); + + Message toSend; + toSend.setData(dbGetMore, b.buf(), b.len()); + auto_ptr<Message> response(new Message()); + connector->call( toSend, *response ); + + m = response; + dataReceived(); + } + + void DBClientCursor::dataReceived() { + QueryResult *qr = (QueryResult *) m->data; + resultFlags = qr->resultFlags(); + if ( qr->resultFlags() & QueryResult::ResultFlag_CursorNotFound ) { + // cursor id no longer valid at the server. + assert( qr->cursorId == 0 ); + cursorId = 0; // 0 indicates no longer valid (dead) + // TODO: should we throw a UserException here??? + } + if ( cursorId == 0 || ! ( opts & QueryOption_CursorTailable ) ) { + // only set initially: we don't want to kill it on end of data + // if it's a tailable cursor + cursorId = qr->cursorId; + } + nReturned = qr->nReturned; + pos = 0; + data = qr->data(); + + connector->checkResponse( data, nReturned ); + /* this assert would fire the way we currently work: + assert( nReturned || cursorId == 0 ); + */ + } + + /** If true, safe to call next(). Requests more from server if necessary. */ + bool DBClientCursor::more() { + if ( pos < nReturned ) + return true; + + if ( cursorId == 0 ) + return false; + + requestMore(); + return pos < nReturned; + } + + BSONObj DBClientCursor::next() { + assert( more() ); + pos++; + BSONObj o(data); + data += o.objsize(); + return o; + } + + DBClientCursor::~DBClientCursor() { + if ( cursorId && _ownCursor ) { + BufBuilder b; + b.append( (int)0 ); // reserved + b.append( (int)1 ); // number + b.append( cursorId ); + + Message m; + m.setData( dbKillCursors , b.buf() , b.len() ); + + connector->sayPiggyBack( m ); + } + + } + + /* --- class dbclientpaired --- */ + + string DBClientPaired::toString() { + stringstream ss; + ss << "state: " << master << '\n'; + ss << "left: " << left.toStringLong() << '\n'; + ss << "right: " << right.toStringLong() << '\n'; + return ss.str(); + } + +#pragma warning(disable: 4355) + DBClientPaired::DBClientPaired() : + left(true, this), right(true, this) + { + master = NotSetL; + } +#pragma warning(default: 4355) + + /* find which server, the left or right, is currently master mode */ + void DBClientPaired::_checkMaster() { + for ( int retry = 0; retry < 2; retry++ ) { + int x = master; + for ( int pass = 0; pass < 2; pass++ ) { + DBClientConnection& c = x == 0 ? left : right; + try { + bool im; + BSONObj o; + c.isMaster(im, &o); + if ( retry ) + log() << "checkmaster: " << c.toString() << ' ' << o.toString() << '\n'; + if ( im ) { + master = (State) (x + 2); + return; + } + } + catch (AssertionException&) { + if ( retry ) + log() << "checkmaster: caught exception " << c.toString() << '\n'; + } + x = x^1; + } + sleepsecs(1); + } + + uassert( 10009 , "checkmaster: no master found", false); + } + + inline DBClientConnection& DBClientPaired::checkMaster() { + if ( master > NotSetR ) { + // a master is selected. let's just make sure connection didn't die + DBClientConnection& c = master == Left ? left : right; + if ( !c.isFailed() ) + return c; + // after a failure, on the next checkMaster, start with the other + // server -- presumably it took over. (not critical which we check first, + // just will make the failover slightly faster if we guess right) + master = master == Left ? NotSetR : NotSetL; + } + + _checkMaster(); + assert( master > NotSetR ); + return master == Left ? left : right; + } + + DBClientConnection& DBClientPaired::slaveConn(){ + DBClientConnection& m = checkMaster(); + assert( ! m.isFailed() ); + return master == Left ? right : left; + } + + bool DBClientPaired::connect(const string &serverHostname1, const string &serverHostname2) { + string errmsg; + bool l = left.connect(serverHostname1, errmsg); + bool r = right.connect(serverHostname2, errmsg); + master = l ? NotSetL : NotSetR; + if ( !l && !r ) // it would be ok to fall through, but checkMaster will then try an immediate reconnect which is slow + return false; + try { + checkMaster(); + } + catch (AssertionException&) { + return false; + } + return true; + } + + bool DBClientPaired::connect(string hostpairstring) { + size_t comma = hostpairstring.find( "," ); + uassert( 10010 , "bad hostpairstring", comma != string::npos); + return connect( hostpairstring.substr( 0 , comma ) , hostpairstring.substr( comma + 1 ) ); + } + + bool DBClientPaired::auth(const string &dbname, const string &username, const string &pwd, string& errmsg) { + DBClientConnection& m = checkMaster(); + if( !m.auth(dbname, username, pwd, errmsg) ) + return false; + /* we try to authentiate with the other half of the pair -- even if down, that way the authInfo is cached. */ + string e; + try { + if( &m == &left ) + right.auth(dbname, username, pwd, e); + else + left.auth(dbname, username, pwd, e); + } + catch( AssertionException&) { + } + return true; + } + + auto_ptr<DBClientCursor> DBClientPaired::query(const string &a, Query b, int c, int d, + const BSONObj *e, int f) + { + return checkMaster().query(a,b,c,d,e,f); + } + + BSONObj DBClientPaired::findOne(const string &a, Query b, const BSONObj *c, int d) { + return checkMaster().findOne(a,b,c,d); + } + + void testPaired() { + DBClientPaired p; + log() << "connect returns " << p.connect("localhost:27017", "localhost:27018") << endl; + + //DBClientConnection p(true); + string errmsg; + // log() << "connect " << p.connect("localhost", errmsg) << endl; + log() << "auth " << p.auth("dwight", "u", "p", errmsg) << endl; + + while( 1 ) { + sleepsecs(3); + try { + log() << "findone returns " << p.findOne("dwight.foo", BSONObj()).toString() << endl; + sleepsecs(3); + BSONObj info; + bool im; + log() << "ismaster returns " << p.isMaster(im,&info) << " info: " << info.toString() << endl; + } + catch(...) { + cout << "caught exception" << endl; + } + } + } + +} // namespace mongo diff --git a/client/dbclient.h b/client/dbclient.h new file mode 100644 index 0000000..e3f1675 --- /dev/null +++ b/client/dbclient.h @@ -0,0 +1,894 @@ +/** @file dbclient.h - connect to a Mongo database as a database, from C++ */ + +/* Copyright 2009 10gen Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "../stdafx.h" +#include "../util/message.h" +#include "../db/jsobj.h" +#include "../db/json.h" + +namespace mongo { + + /** the query field 'options' can have these bits set: */ + enum QueryOptions { + /** Tailable means cursor is not closed when the last data is retrieved. rather, the cursor marks + the final object's position. you can resume using the cursor later, from where it was located, + if more data were received. Set on dbQuery and dbGetMore. + + like any "latent cursor", the cursor may become invalid at some point -- for example if that + final object it references were deleted. Thus, you should be prepared to requery if you get back + ResultFlag_CursorNotFound. + */ + QueryOption_CursorTailable = 1 << 1, + + /** allow query of replica slave. normally these return an error except for namespace "local". + */ + QueryOption_SlaveOk = 1 << 2, + + // findingStart mode is used to find the first operation of interest when + // we are scanning through a repl log. For efficiency in the common case, + // where the first operation of interest is closer to the tail than the head, + // we start from the tail of the log and work backwards until we find the + // first operation of interest. Then we scan forward from that first operation, + // actually returning results to the client. During the findingStart phase, + // we release the db mutex occasionally to avoid blocking the db process for + // an extended period of time. + QueryOption_OplogReplay = 1 << 3, + + /** The server normally times out idle cursors after an inactivy period to prevent excess memory use + Set this option to prevent that. + */ + QueryOption_NoCursorTimeout = 1 << 4, + + /** Use with QueryOption_CursorTailable. If we are at the end of the data, block for a while rather + than returning no data. After a timeout period, we do return as normal. + */ + QueryOption_AwaitData = 1 << 5 + + }; + + enum UpdateOptions { + /** Upsert - that is, insert the item if no matching item is found. */ + UpdateOption_Upsert = 1 << 0, + + /** Update multiple documents (if multiple documents match query expression). + (Default is update a single document and stop.) */ + UpdateOption_Multi = 1 << 1 + }; + + class BSONObj; + + /** Represents a Mongo query expression. Typically one uses the QUERY(...) macro to construct a Query object. + Examples: + QUERY( "age" << 33 << "school" << "UCLA" ).sort("name") + QUERY( "age" << GT << 30 << LT << 50 ) + */ + class Query { + public: + BSONObj obj; + Query() : obj(BSONObj()) { } + Query(const BSONObj& b) : obj(b) { } + Query(const string &json) : + obj(fromjson(json)) { } + Query(const char * json) : + obj(fromjson(json)) { } + + /** Add a sort (ORDER BY) criteria to the query expression. + @param sortPattern the sort order template. For example to order by name ascending, time descending: + { name : 1, ts : -1 } + i.e. + BSON( "name" << 1 << "ts" << -1 ) + or + fromjson(" name : 1, ts : -1 ") + */ + Query& sort(const BSONObj& sortPattern); + + /** Add a sort (ORDER BY) criteria to the query expression. + This version of sort() assumes you want to sort on a single field. + @param asc = 1 for ascending order + asc = -1 for descending order + */ + Query& sort(const string &field, int asc = 1) { sort( BSON( field << asc ) ); return *this; } + + /** Provide a hint to the query. + @param keyPattern Key pattern for the index to use. + Example: + hint("{ts:1}") + */ + Query& hint(BSONObj keyPattern); + Query& hint(const string &jsonKeyPatt) { return hint(fromjson(jsonKeyPatt)); } + + /** Provide min and/or max index limits for the query. + min <= x < max + */ + Query& minKey(const BSONObj &val); + /** + max is exclusive + */ + Query& maxKey(const BSONObj &val); + + /** Return explain information about execution of this query instead of the actual query results. + Normally it is easier to use the mongo shell to run db.find(...).explain(). + */ + Query& explain(); + + /** Use snapshot mode for the query. Snapshot mode assures no duplicates are returned, or objects missed, which were + present at both the start and end of the query's execution (if an object is new during the query, or deleted during + the query, it may or may not be returned, even with snapshot mode). + + Note that short query responses (less than 1MB) are always effectively snapshotted. + + Currently, snapshot mode may not be used with sorting or explicit hints. + */ + Query& snapshot(); + + /** Queries to the Mongo database support a $where parameter option which contains + a javascript function that is evaluated to see whether objects being queried match + its criteria. Use this helper to append such a function to a query object. + Your query may also contain other traditional Mongo query terms. + + @param jscode The javascript function to evaluate against each potential object + match. The function must return true for matched objects. Use the this + variable to inspect the current object. + @param scope SavedContext for the javascript object. List in a BSON object any + variables you would like defined when the jscode executes. One can think + of these as "bind variables". + + Examples: + conn.findOne("test.coll", Query("{a:3}").where("this.b == 2 || this.c == 3")); + Query badBalance = Query().where("this.debits - this.credits < 0"); + */ + Query& where(const string &jscode, BSONObj scope); + Query& where(const string &jscode) { return where(jscode, BSONObj()); } + + /** + * if this query has an orderby, hint, or some other field + */ + bool isComplex() const; + + BSONObj getFilter() const; + BSONObj getSort() const; + BSONObj getHint() const; + bool isExplain() const; + + string toString() const; + operator string() const { return toString(); } + private: + void makeComplex(); + template< class T > + void appendComplex( const char *fieldName, const T& val ) { + makeComplex(); + BSONObjBuilder b; + b.appendElements(obj); + b.append(fieldName, val); + obj = b.obj(); + } + }; + +/** Typically one uses the QUERY(...) macro to construct a Query object. + Example: QUERY( "age" << 33 << "school" << "UCLA" ) +*/ +#define QUERY(x) Query( BSON(x) ) + + /** + interface that handles communication with the db + */ + class DBConnector { + public: + virtual ~DBConnector() {} + virtual bool call( Message &toSend, Message &response, bool assertOk=true ) = 0; + virtual void say( Message &toSend ) = 0; + virtual void sayPiggyBack( Message &toSend ) = 0; + virtual void checkResponse( const string &data, int nReturned ) {} + }; + + /** Queries return a cursor object */ + class DBClientCursor : boost::noncopyable { + friend class DBClientBase; + bool init(); + public: + /** If true, safe to call next(). Requests more from server if necessary. */ + bool more(); + + /** next + @return next object in the result cursor. + on an error at the remote server, you will get back: + { $err: <string> } + if you do not want to handle that yourself, call nextSafe(). + */ + BSONObj next(); + + /** throws AssertionException if get back { $err : ... } */ + BSONObj nextSafe() { + BSONObj o = next(); + BSONElement e = o.firstElement(); + assert( strcmp(e.fieldName(), "$err") != 0 ); + return o; + } + + /** + iterate the rest of the cursor and return the number if items + */ + int itcount(){ + int c = 0; + while ( more() ){ + next(); + c++; + } + return c; + } + + /** cursor no longer valid -- use with tailable cursors. + note you should only rely on this once more() returns false; + 'dead' may be preset yet some data still queued and locally + available from the dbclientcursor. + */ + bool isDead() const { + return cursorId == 0; + } + + bool tailable() const { + return (opts & QueryOption_CursorTailable) != 0; + } + + bool hasResultFlag( int flag ){ + return (resultFlags & flag) != 0; + } + public: + DBClientCursor( DBConnector *_connector, const string &_ns, BSONObj _query, int _nToReturn, + int _nToSkip, const BSONObj *_fieldsToReturn, int queryOptions ) : + connector(_connector), + ns(_ns), + query(_query), + nToReturn(_nToReturn), + nToSkip(_nToSkip), + fieldsToReturn(_fieldsToReturn), + opts(queryOptions), + m(new Message()), + cursorId(), + nReturned(), + pos(), + data(), + _ownCursor( true ) { + } + + DBClientCursor( DBConnector *_connector, const string &_ns, long long _cursorId, int _nToReturn, int options ) : + connector(_connector), + ns(_ns), + nToReturn( _nToReturn ), + opts( options ), + m(new Message()), + cursorId( _cursorId ), + nReturned(), + pos(), + data(), + _ownCursor( true ) { + } + + virtual ~DBClientCursor(); + + long long getCursorId() const { return cursorId; } + + /** by default we "own" the cursor and will send the server a KillCursor + message when ~DBClientCursor() is called. This function overrides that. + */ + void decouple() { _ownCursor = false; } + + private: + DBConnector *connector; + string ns; + BSONObj query; + int nToReturn; + int nToSkip; + const BSONObj *fieldsToReturn; + int opts; + auto_ptr<Message> m; + + int resultFlags; + long long cursorId; + int nReturned; + int pos; + const char *data; + void dataReceived(); + void requestMore(); + bool _ownCursor; // see decouple() + }; + + /** + The interface that any db connection should implement + */ + class DBClientInterface : boost::noncopyable { + public: + virtual auto_ptr<DBClientCursor> query(const string &ns, Query query, int nToReturn = 0, int nToSkip = 0, + const BSONObj *fieldsToReturn = 0, int queryOptions = 0) = 0; + + virtual auto_ptr<DBClientCursor> getMore( const string &ns, long long cursorId, int nToReturn = 0, int options = 0 ) = 0; + + virtual void insert( const string &ns, BSONObj obj ) = 0; + + virtual void insert( const string &ns, const vector< BSONObj >& v ) = 0; + + virtual void remove( const string &ns , Query query, bool justOne = 0 ) = 0; + + virtual void update( const string &ns , Query query , BSONObj obj , bool upsert = 0 , bool multi = 0 ) = 0; + + virtual ~DBClientInterface() { } + + /** + @return a single object that matches the query. if none do, then the object is empty + @throws AssertionException + */ + virtual BSONObj findOne(const string &ns, Query query, const BSONObj *fieldsToReturn = 0, int queryOptions = 0); + + + }; + + /** + DB "commands" + Basically just invocations of connection.$cmd.findOne({...}); + */ + class DBClientWithCommands : public DBClientInterface { + bool isOk(const BSONObj&); + set<string> _seenIndexes; + public: + + /** helper function. run a simple command where the command expression is simply + { command : 1 } + @param info -- where to put result object. may be null if caller doesn't need that info + @param command -- command name + @return true if the command returned "ok". + */ + bool simpleCommand(const string &dbname, BSONObj *info, const string &command); + + /** Run a database command. Database commands are represented as BSON objects. Common database + commands have prebuilt helper functions -- see below. If a helper is not available you can + directly call runCommand. + + @param dbname database name. Use "admin" for global administrative commands. + @param cmd the command object to execute. For example, { ismaster : 1 } + @param info the result object the database returns. Typically has { ok : ..., errmsg : ... } fields + set. + @return true if the command returned "ok". + */ + bool runCommand(const string &dbname, const BSONObj& cmd, BSONObj &info, int options=0); + + /** Authorize access to a particular database. + Authentication is separate for each database on the server -- you may authenticate for any + number of databases on a single connection. + The "admin" database is special and once authenticated provides access to all databases on the + server. + @param digestPassword if password is plain text, set this to true. otherwise assumed to be pre-digested + @return true if successful + */ + virtual bool auth(const string &dbname, const string &username, const string &pwd, string& errmsg, bool digestPassword = true); + + /** count number of objects in collection ns that match the query criteria specified + throws UserAssertion if database returns an error + */ + unsigned long long count(const string &ns, const BSONObj& query = BSONObj(), int options=0 ); + + string createPasswordDigest( const string &username , const string &clearTextPassword ); + + /** returns true in isMaster parm if this db is the current master + of a replica pair. + + pass in info for more details e.g.: + { "ismaster" : 1.0 , "msg" : "not paired" , "ok" : 1.0 } + + returns true if command invoked successfully. + */ + virtual bool isMaster(bool& isMaster, BSONObj *info=0); + + /** + Create a new collection in the database. Normally, collection creation is automatic. You would + use this function if you wish to specify special options on creation. + + If the collection already exists, no action occurs. + + ns: fully qualified collection name + size: desired initial extent size for the collection. + Must be <= 1000000000 for normal collections. + For fixed size (capped) collections, this size is the total/max size of the + collection. + capped: if true, this is a fixed size collection (where old data rolls out). + max: maximum number of objects if capped (optional). + + returns true if successful. + */ + bool createCollection(const string &ns, unsigned size = 0, bool capped = false, int max = 0, BSONObj *info = 0); + + /** Get error result from the last operation on this connection. + @return error message text, or empty string if no error. + */ + string getLastError(); + /** Get error result from the last operation on this connection. + @return full error object. + */ + BSONObj getLastErrorDetailed(); + + /** Return the last error which has occurred, even if not the very last operation. + + @return { err : <error message>, nPrev : <how_many_ops_back_occurred>, ok : 1 } + + result.err will be null if no error has occurred. + */ + BSONObj getPrevError(); + + /** Reset the previous error state for this connection (accessed via getLastError and + getPrevError). Useful when performing several operations at once and then checking + for an error after attempting all operations. + */ + bool resetError() { return simpleCommand("admin", 0, "reseterror"); } + + /** Delete the specified collection. */ + virtual bool dropCollection( const string &ns ){ + string db = nsGetDB( ns ); + string coll = nsGetCollection( ns ); + uassert( 10011 , "no collection name", coll.size() ); + + BSONObj info; + + bool res = runCommand( db.c_str() , BSON( "drop" << coll ) , info ); + resetIndexCache(); + return res; + } + + /** Perform a repair and compaction of the specified database. May take a long time to run. Disk space + must be available equal to the size of the database while repairing. + */ + bool repairDatabase(const string &dbname, BSONObj *info = 0) { + return simpleCommand(dbname, info, "repairDatabase"); + } + + /** Copy database from one server or name to another server or name. + + Generally, you should dropDatabase() first as otherwise the copied information will MERGE + into whatever data is already present in this database. + + For security reasons this function only works when you are authorized to access the "admin" db. However, + if you have access to said db, you can copy any database from one place to another. + TODO: this needs enhancement to be more flexible in terms of security. + + This method provides a way to "rename" a database by copying it to a new db name and + location. The copy is "repaired" and compacted. + + fromdb database name from which to copy. + todb database name to copy to. + fromhost hostname of the database (and optionally, ":port") from which to + copy the data. copies from self if "". + + returns true if successful + */ + bool copyDatabase(const string &fromdb, const string &todb, const string &fromhost = "", BSONObj *info = 0); + + /** The Mongo database provides built-in performance profiling capabilities. Uset setDbProfilingLevel() + to enable. Profiling information is then written to the system.profiling collection, which one can + then query. + */ + enum ProfilingLevel { + ProfileOff = 0, + ProfileSlow = 1, // log very slow (>100ms) operations + ProfileAll = 2 + }; + bool setDbProfilingLevel(const string &dbname, ProfilingLevel level, BSONObj *info = 0); + bool getDbProfilingLevel(const string &dbname, ProfilingLevel& level, BSONObj *info = 0); + + /** Run a map/reduce job on the server. + + See http://www.mongodb.org/display/DOCS/MapReduce + + ns namespace (db+collection name) of input data + jsmapf javascript map function code + jsreducef javascript reduce function code. + query optional query filter for the input + output optional permanent output collection name. if not specified server will + generate a temporary collection and return its name. + + returns a result object which contains: + { result : <collection_name>,
+ numObjects : <number_of_objects_scanned>,
+ timeMillis : <job_time>,
+ ok : <1_if_ok>,
+ [, err : <errmsg_if_error>]
+ } + + For example one might call: + result.getField("ok").trueValue() + on the result to check if ok. + */ + BSONObj mapreduce(const string &ns, const string &jsmapf, const string &jsreducef, BSONObj query = BSONObj(), const string& output = ""); + + /** Run javascript code on the database server. + dbname database SavedContext in which the code runs. The javascript variable 'db' will be assigned + to this database when the function is invoked. + jscode source code for a javascript function. + info the command object which contains any information on the invocation result including + the return value and other information. If an error occurs running the jscode, error + information will be in info. (try "out() << info.toString()") + retValue return value from the jscode function. + args args to pass to the jscode function. when invoked, the 'args' variable will be defined + for use by the jscode. + + returns true if runs ok. + + See testDbEval() in dbclient.cpp for an example of usage. + */ + bool eval(const string &dbname, const string &jscode, BSONObj& info, BSONElement& retValue, BSONObj *args = 0); + + /** + + */ + bool validate( const string &ns , bool scandata=true ){ + BSONObj cmd = BSON( "validate" << nsGetCollection( ns ) << "scandata" << scandata ); + BSONObj info; + return runCommand( nsGetDB( ns ).c_str() , cmd , info ); + } + + /* The following helpers are simply more convenient forms of eval() for certain common cases */ + + /* invocation with no return value of interest -- with or without one simple parameter */ + bool eval(const string &dbname, const string &jscode); + template< class T > + bool eval(const string &dbname, const string &jscode, T parm1) { + BSONObj info; + BSONElement retValue; + BSONObjBuilder b; + b.append("0", parm1); + BSONObj args = b.done(); + return eval(dbname, jscode, info, retValue, &args); + } + + /** eval invocation with one parm to server and one numeric field (either int or double) returned */ + template< class T, class NumType > + bool eval(const string &dbname, const string &jscode, T parm1, NumType& ret) { + BSONObj info; + BSONElement retValue; + BSONObjBuilder b; + b.append("0", parm1); + BSONObj args = b.done(); + if ( !eval(dbname, jscode, info, retValue, &args) ) + return false; + ret = (NumType) retValue.number(); + return true; + } + + /** + get a list of all the current databases + */ + list<string> getDatabaseNames(); + + /** + get a list of all the current collections in db + */ + list<string> getCollectionNames( const string& db ); + + bool exists( const string& ns ); + + + /** Create an index if it does not already exist. + ensureIndex calls are remembered so it is safe/fast to call this function many + times in your code. + @param ns collection to be indexed + @param keys the "key pattern" for the index. e.g., { name : 1 } + @param unique if true, indicates that key uniqueness should be enforced for this index + @param name if not isn't specified, it will be created from the keys (recommended) + @return whether or not sent message to db. + should be true on first call, false on subsequent unless resetIndexCache was called + */ + virtual bool ensureIndex( const string &ns , BSONObj keys , bool unique = false, const string &name = "" ); + + /** + clears the index cache, so the subsequent call to ensureIndex for any index will go to the server + */ + virtual void resetIndexCache(); + + virtual auto_ptr<DBClientCursor> getIndexes( const string &ns ); + + virtual void dropIndex( const string& ns , BSONObj keys ); + virtual void dropIndex( const string& ns , const string& indexName ); + + /** + drops all indexes for the collection + */ + virtual void dropIndexes( const string& ns ); + + virtual void reIndex( const string& ns ); + + string genIndexName( const BSONObj& keys ); + + /** Erase / drop an entire database */ + virtual bool dropDatabase(const string &dbname, BSONObj *info = 0) { + bool ret = simpleCommand(dbname, info, "dropDatabase"); + resetIndexCache(); + return ret; + } + + virtual string toString() = 0; + + /** @return the database name portion of an ns string */ + string nsGetDB( const string &ns ){ + string::size_type pos = ns.find( "." ); + if ( pos == string::npos ) + return ns; + + return ns.substr( 0 , pos ); + } + + /** @return the collection name portion of an ns string */ + string nsGetCollection( const string &ns ){ + string::size_type pos = ns.find( "." ); + if ( pos == string::npos ) + return ""; + + return ns.substr( pos + 1 ); + } + + }; + + /** + abstract class that implements the core db operations + */ + class DBClientBase : public DBClientWithCommands, public DBConnector { + public: + /** send a query to the database. + ns: namespace to query, format is <dbname>.<collectname>[.<collectname>]* + query: query to perform on the collection. this is a BSONObj (binary JSON) + You may format as + { query: { ... }, orderby: { ... } } + to specify a sort order. + nToReturn: n to return. 0 = unlimited + nToSkip: start with the nth item + fieldsToReturn: + optional template of which fields to select. if unspecified, returns all fields + queryOptions: see options enum at top of this file + + @return cursor. 0 if error (connection failure) + @throws AssertionException + */ + virtual auto_ptr<DBClientCursor> query(const string &ns, Query query, int nToReturn = 0, int nToSkip = 0, + const BSONObj *fieldsToReturn = 0, int queryOptions = 0); + + /** @param cursorId id of cursor to retrieve + @return an handle to a previously allocated cursor + @throws AssertionException + */ + virtual auto_ptr<DBClientCursor> getMore( const string &ns, long long cursorId, int nToReturn = 0, int options = 0 ); + + /** + insert an object into the database + */ + virtual void insert( const string &ns , BSONObj obj ); + + /** + insert a vector of objects into the database + */ + virtual void insert( const string &ns, const vector< BSONObj >& v ); + + /** + remove matching objects from the database + @param justOne if this true, then once a single match is found will stop + */ + virtual void remove( const string &ns , Query q , bool justOne = 0 ); + + /** + updates objects matching query + */ + virtual void update( const string &ns , Query query , BSONObj obj , bool upsert = 0 , bool multi = 0 ); + + virtual string getServerAddress() const = 0; + + virtual bool isFailed() const = 0; + + }; + + class DBClientPaired; + + class ConnectException : public UserException { + public: + ConnectException(string msg) : UserException(9000,msg) { } + }; + + /** + A basic connection to the database. + This is the main entry point for talking to a simple Mongo setup + */ + class DBClientConnection : public DBClientBase { + DBClientPaired *clientPaired; + auto_ptr<MessagingPort> p; + auto_ptr<SockAddr> server; + bool failed; // true if some sort of fatal error has ever happened + bool autoReconnect; + time_t lastReconnectTry; + string serverAddress; // remember for reconnects + void _checkConnection(); + void checkConnection() { if( failed ) _checkConnection(); } + map< string, pair<string,string> > authCache; + public: + + /** + @param _autoReconnect if true, automatically reconnect on a connection failure + @param cp used by DBClientPaired. You do not need to specify this parameter + */ + DBClientConnection(bool _autoReconnect=false,DBClientPaired* cp=0) : + clientPaired(cp), failed(false), autoReconnect(_autoReconnect), lastReconnectTry(0) { } + + /** Connect to a Mongo database server. + + If autoReconnect is true, you can try to use the DBClientConnection even when + false was returned -- it will try to connect again. + + @param serverHostname host to connect to. can include port number ( 127.0.0.1 , 127.0.0.1:5555 ) + @param errmsg any relevant error message will appended to the string + @return false if fails to connect. + */ + virtual bool connect(const string &serverHostname, string& errmsg); + + /** Connect to a Mongo database server. Exception throwing version. + Throws a UserException if cannot connect. + + If autoReconnect is true, you can try to use the DBClientConnection even when + false was returned -- it will try to connect again. + + @param serverHostname host to connect to. can include port number ( 127.0.0.1 , 127.0.0.1:5555 ) + */ + void connect(string serverHostname) { + string errmsg; + if( !connect(serverHostname.c_str(), errmsg) ) + throw ConnectException(string("can't connect ") + errmsg); + } + + virtual bool auth(const string &dbname, const string &username, const string &pwd, string& errmsg, bool digestPassword = true); + + virtual auto_ptr<DBClientCursor> query(const string &ns, Query query, int nToReturn = 0, int nToSkip = 0, + const BSONObj *fieldsToReturn = 0, int queryOptions = 0) { + checkConnection(); + return DBClientBase::query( ns, query, nToReturn, nToSkip, fieldsToReturn, queryOptions ); + } + + /** + @return true if this connection is currently in a failed state. When autoreconnect is on, + a connection will transition back to an ok state after reconnecting. + */ + bool isFailed() const { + return failed; + } + + MessagingPort& port() { + return *p.get(); + } + + string toStringLong() const { + stringstream ss; + ss << serverAddress; + if ( failed ) ss << " failed"; + return ss.str(); + } + + /** Returns the address of the server */ + string toString() { + return serverAddress; + } + + string getServerAddress() const { + return serverAddress; + } + + protected: + virtual bool call( Message &toSend, Message &response, bool assertOk = true ); + virtual void say( Message &toSend ); + virtual void sayPiggyBack( Message &toSend ); + virtual void checkResponse( const char *data, int nReturned ); + }; + + /** Use this class to connect to a replica pair of servers. The class will manage + checking for which server in a replica pair is master, and do failover automatically. + + On a failover situation, expect at least one operation to return an error (throw + an exception) before the failover is complete. Operations are not retried. + */ + class DBClientPaired : public DBClientBase { + DBClientConnection left,right; + enum State { + NotSetL=0, + NotSetR=1, + Left, Right + } master; + + void _checkMaster(); + DBClientConnection& checkMaster(); + + public: + /** Call connect() after constructing. autoReconnect is always on for DBClientPaired connections. */ + DBClientPaired(); + + /** Returns false is neither member of the pair were reachable, or neither is + master, although, + when false returned, you can still try to use this connection object, it will + try reconnects. + */ + bool connect(const string &serverHostname1, const string &serverHostname2); + + /** Connect to a server pair using a host pair string of the form + hostname[:port],hostname[:port] + */ + bool connect(string hostpairstring); + + /** Authorize. Authorizes both sides of the pair as needed. + */ + bool auth(const string &dbname, const string &username, const string &pwd, string& errmsg); + + /** throws userassertion "no master found" */ + virtual + auto_ptr<DBClientCursor> query(const string &ns, Query query, int nToReturn = 0, int nToSkip = 0, + const BSONObj *fieldsToReturn = 0, int queryOptions = 0); + + /** throws userassertion "no master found" */ + virtual + BSONObj findOne(const string &ns, Query query, const BSONObj *fieldsToReturn = 0, int queryOptions = 0); + + /** insert */ + virtual void insert( const string &ns , BSONObj obj ) { + checkMaster().insert(ns, obj); + } + + /** insert multiple objects. Note that single object insert is asynchronous, so this version + is only nominally faster and not worth a special effort to try to use. */ + virtual void insert( const string &ns, const vector< BSONObj >& v ) { + checkMaster().insert(ns, v); + } + + /** remove */ + virtual void remove( const string &ns , Query obj , bool justOne = 0 ) { + checkMaster().remove(ns, obj, justOne); + } + + /** update */ + virtual void update( const string &ns , Query query , BSONObj obj , bool upsert = 0 , bool multi = 0 ) { + return checkMaster().update(ns, query, obj, upsert,multi); + } + + string toString(); + + /* this is the callback from our underlying connections to notify us that we got a "not master" error. + */ + void isntMaster() { + master = ( ( master == Left ) ? NotSetR : NotSetL ); + } + + string getServerAddress() const { + return left.getServerAddress() + "," + right.getServerAddress(); + } + + DBClientConnection& slaveConn(); + + /* TODO - not yet implemented. mongos may need these. */ + virtual bool call( Message &toSend, Message &response, bool assertOk=true ) { assert(false); return false; } + virtual void say( Message &toSend ) { assert(false); } + virtual void sayPiggyBack( Message &toSend ) { assert(false); } + virtual void checkResponse( const char *data, int nReturned ) { assert(false); } + + bool isFailed() const { + // TODO: this really should check isFailed on current master as well + return master > NotSetR; + } + }; + + + DBClientBase * createDirectClient(); + +} // namespace mongo diff --git a/client/examples/authTest.cpp b/client/examples/authTest.cpp new file mode 100644 index 0000000..77ce12d --- /dev/null +++ b/client/examples/authTest.cpp @@ -0,0 +1,53 @@ +// authTest.cpp + +/* Copyright 2009 10gen Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <iostream> + +#include "client/dbclient.h" + +using namespace mongo; + +int main( int argc, const char **argv ) { + + const char *port = "27017"; + if ( argc != 1 ) { + if ( argc != 3 ) + throw -12; + port = argv[ 2 ]; + } + + DBClientConnection conn; + string errmsg; + if ( ! conn.connect( string( "127.0.0.1:" ) + port , errmsg ) ) { + cout << "couldn't connect : " << errmsg << endl; + throw -11; + } + + { // clean up old data from any previous tests + conn.remove( "test.system.users" , BSONObj() ); + } + + conn.insert( "test.system.users" , BSON( "user" << "eliot" << "pwd" << conn.createPasswordDigest( "eliot" , "bar" ) ) ); + + errmsg.clear(); + bool ok = conn.auth( "test" , "eliot" , "bar" , errmsg ); + if ( ! ok ) + cout << errmsg << endl; + assert( ok ); + + assert( ! conn.auth( "test" , "eliot" , "bars" , errmsg ) ); +} diff --git a/client/examples/clientTest.cpp b/client/examples/clientTest.cpp new file mode 100644 index 0000000..bbb82f6 --- /dev/null +++ b/client/examples/clientTest.cpp @@ -0,0 +1,214 @@ +// clientTest.cpp + +/* Copyright 2009 10gen Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * a simple test for the c++ driver + */ + +#include <iostream> + +#include "client/dbclient.h" + +using namespace std; +using namespace mongo; + +int main( int argc, const char **argv ) { + + const char *port = "27017"; + if ( argc != 1 ) { + if ( argc != 3 ) + throw -12; + port = argv[ 2 ]; + } + + DBClientConnection conn; + string errmsg; + if ( ! conn.connect( string( "127.0.0.1:" ) + port , errmsg ) ) { + cout << "couldn't connect : " << errmsg << endl; + throw -11; + } + + const char * ns = "test.test1"; + + conn.dropCollection(ns); + + // clean up old data from any previous tests + conn.remove( ns, BSONObj() ); + assert( conn.findOne( ns , BSONObj() ).isEmpty() ); + + // test insert + conn.insert( ns ,BSON( "name" << "eliot" << "num" << 1 ) ); + assert( ! conn.findOne( ns , BSONObj() ).isEmpty() ); + + // test remove + conn.remove( ns, BSONObj() ); + assert( conn.findOne( ns , BSONObj() ).isEmpty() ); + + + // insert, findOne testing + conn.insert( ns , BSON( "name" << "eliot" << "num" << 1 ) ); + { + BSONObj res = conn.findOne( ns , BSONObj() ); + assert( strstr( res.getStringField( "name" ) , "eliot" ) ); + assert( ! strstr( res.getStringField( "name2" ) , "eliot" ) ); + assert( 1 == res.getIntField( "num" ) ); + } + + + // cursor + conn.insert( ns ,BSON( "name" << "sara" << "num" << 2 ) ); + { + auto_ptr<DBClientCursor> cursor = conn.query( ns , BSONObj() ); + int count = 0; + while ( cursor->more() ) { + count++; + BSONObj obj = cursor->next(); + } + assert( count == 2 ); + } + + { + auto_ptr<DBClientCursor> cursor = conn.query( ns , BSON( "num" << 1 ) ); + int count = 0; + while ( cursor->more() ) { + count++; + BSONObj obj = cursor->next(); + } + assert( count == 1 ); + } + + { + auto_ptr<DBClientCursor> cursor = conn.query( ns , BSON( "num" << 3 ) ); + int count = 0; + while ( cursor->more() ) { + count++; + BSONObj obj = cursor->next(); + } + assert( count == 0 ); + } + + // update + { + BSONObj res = conn.findOne( ns , BSONObjBuilder().append( "name" , "eliot" ).obj() ); + assert( ! strstr( res.getStringField( "name2" ) , "eliot" ) ); + + BSONObj after = BSONObjBuilder().appendElements( res ).append( "name2" , "h" ).obj(); + + conn.update( ns , BSONObjBuilder().append( "name" , "eliot2" ).obj() , after ); + res = conn.findOne( ns , BSONObjBuilder().append( "name" , "eliot" ).obj() ); + assert( ! strstr( res.getStringField( "name2" ) , "eliot" ) ); + assert( conn.findOne( ns , BSONObjBuilder().append( "name" , "eliot2" ).obj() ).isEmpty() ); + + conn.update( ns , BSONObjBuilder().append( "name" , "eliot" ).obj() , after ); + res = conn.findOne( ns , BSONObjBuilder().append( "name" , "eliot" ).obj() ); + assert( strstr( res.getStringField( "name" ) , "eliot" ) ); + assert( strstr( res.getStringField( "name2" ) , "h" ) ); + assert( conn.findOne( ns , BSONObjBuilder().append( "name" , "eliot2" ).obj() ).isEmpty() ); + + // upsert + conn.update( ns , BSONObjBuilder().append( "name" , "eliot2" ).obj() , after , 1 ); + assert( ! conn.findOne( ns , BSONObjBuilder().append( "name" , "eliot" ).obj() ).isEmpty() ); + + } + + { // ensure index + assert( conn.ensureIndex( ns , BSON( "name" << 1 ) ) ); + assert( ! conn.ensureIndex( ns , BSON( "name" << 1 ) ) ); + } + + { // hint related tests + assert( conn.findOne(ns, "{}")["name"].str() == "sara" ); + + assert( conn.findOne(ns, "{ name : 'eliot' }")["name"].str() == "eliot" ); + assert( conn.getLastError() == "" ); + + // nonexistent index test + assert( conn.findOne(ns, Query("{name:\"eliot\"}").hint("{foo:1}")).hasElement("$err") ); + assert( conn.getLastError() == "bad hint" ); + conn.resetError(); + assert( conn.getLastError() == "" ); + + //existing index + assert( conn.findOne(ns, Query("{name:'eliot'}").hint("{name:1}")).hasElement("name") ); + + // run validate + assert( conn.validate( ns ) ); + } + + { // timestamp test + + const char * tsns = "test.tstest1"; + conn.dropCollection( tsns ); + + { + mongo::BSONObjBuilder b; + b.appendTimestamp( "ts" ); + conn.insert( tsns , b.obj() ); + } + + mongo::BSONObj out = conn.findOne( tsns , mongo::BSONObj() ); + Date_t oldTime = out["ts"].timestampTime(); + unsigned int oldInc = out["ts"].timestampInc(); + + { + mongo::BSONObjBuilder b1; + b1.append( out["_id"] ); + + mongo::BSONObjBuilder b2; + b2.append( out["_id"] ); + b2.appendTimestamp( "ts" ); + + conn.update( tsns , b1.obj() , b2.obj() ); + } + + BSONObj found = conn.findOne( tsns , mongo::BSONObj() ); + assert( ( oldTime < found["ts"].timestampTime() ) || + ( oldInc + 1 == found["ts"].timestampInc() ) ); + + } + + { // check that killcursors doesn't affect last error + assert( conn.getLastError().empty() ); + + BufBuilder b; + b.append( (int)0 ); // reserved + b.append( (int)-1 ); // invalid # of cursors triggers exception + b.append( (int)-1 ); // bogus cursor id + + Message m; + m.setData( dbKillCursors, b.buf(), b.len() ); + + // say() is protected in DBClientConnection, so get superclass + static_cast< DBConnector* >( &conn )->say( m ); + + assert( conn.getLastError().empty() ); + } + + { + list<string> l = conn.getDatabaseNames(); + for ( list<string>::iterator i = l.begin(); i != l.end(); i++ ){ + cout << "db name : " << *i << endl; + } + + l = conn.getCollectionNames( "test" ); + for ( list<string>::iterator i = l.begin(); i != l.end(); i++ ){ + cout << "coll name : " << *i << endl; + } + } + + cout << "client test finished!" << endl; +} diff --git a/client/examples/first.cpp b/client/examples/first.cpp new file mode 100644 index 0000000..f3b654f --- /dev/null +++ b/client/examples/first.cpp @@ -0,0 +1,85 @@ +// first.cpp + +/* Copyright 2009 10gen Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * this is a good first example of how to use mongo from c++ + */ + +#include <iostream> + +#include "client/dbclient.h" + +using namespace std; + +void insert( mongo::DBClientConnection & conn , const char * name , int num ) { + mongo::BSONObjBuilder obj; + obj.append( "name" , name ); + obj.append( "num" , num ); + conn.insert( "test.people" , obj.obj() ); +} + +int main( int argc, const char **argv ) { + + const char *port = "27017"; + if ( argc != 1 ) { + if ( argc != 3 ) + throw -12; + port = argv[ 2 ]; + } + + mongo::DBClientConnection conn; + string errmsg; + if ( ! conn.connect( string( "127.0.0.1:" ) + port , errmsg ) ) { + cout << "couldn't connect : " << errmsg << endl; + throw -11; + } + + { // clean up old data from any previous tests + mongo::BSONObjBuilder query; + conn.remove( "test.people" , query.obj() ); + } + + insert( conn , "eliot" , 15 ); + insert( conn , "sara" , 23 ); + + { + mongo::BSONObjBuilder query; + auto_ptr<mongo::DBClientCursor> cursor = conn.query( "test.people" , query.obj() ); + cout << "using cursor" << endl; + while ( cursor->more() ) { + mongo::BSONObj obj = cursor->next(); + cout << "\t" << obj.jsonString() << endl; + } + + } + + { + mongo::BSONObjBuilder query; + query.append( "name" , "eliot" ); + mongo::BSONObj res = conn.findOne( "test.people" , query.obj() ); + cout << res.isEmpty() << "\t" << res.jsonString() << endl; + } + + { + mongo::BSONObjBuilder query; + query.append( "name" , "asd" ); + mongo::BSONObj res = conn.findOne( "test.people" , query.obj() ); + cout << res.isEmpty() << "\t" << res.jsonString() << endl; + } + + +} diff --git a/client/examples/second.cpp b/client/examples/second.cpp new file mode 100644 index 0000000..68eafaa --- /dev/null +++ b/client/examples/second.cpp @@ -0,0 +1,56 @@ +// second.cpp + +/* Copyright 2009 10gen Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <iostream> + +#include "client/dbclient.h" + +using namespace std; +using namespace mongo; + +int main( int argc, const char **argv ) { + + const char *port = "27017"; + if ( argc != 1 ) { + if ( argc != 3 ) + throw -12; + port = argv[ 2 ]; + } + + DBClientConnection conn; + string errmsg; + if ( ! conn.connect( string( "127.0.0.1:" ) + port , errmsg ) ) { + cout << "couldn't connect : " << errmsg << endl; + throw -11; + } + + const char * ns = "test.second"; + + conn.remove( ns , BSONObj() ); + + conn.insert( ns , BSON( "name" << "eliot" << "num" << 17 ) ); + conn.insert( ns , BSON( "name" << "sara" << "num" << 24 ) ); + + auto_ptr<DBClientCursor> cursor = conn.query( ns , BSONObj() ); + cout << "using cursor" << endl; + while ( cursor->more() ) { + BSONObj obj = cursor->next(); + cout << "\t" << obj.jsonString() << endl; + } + + conn.ensureIndex( ns , BSON( "name" << 1 << "num" << -1 ) ); +} diff --git a/client/examples/tail.cpp b/client/examples/tail.cpp new file mode 100644 index 0000000..e844b32 --- /dev/null +++ b/client/examples/tail.cpp @@ -0,0 +1,55 @@ +// tail.cpp + +/* Copyright 2009 10gen Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* example of using a tailable cursor */ + +#include "../../client/dbclient.h" +#include "../../util/goodies.h" + +using namespace mongo; + +void foo() { } + +/* "tail" the specified namespace, outputting elements as they are added. + _id values must be inserted in increasing order for this to work. (Some other + field could also be used.) + + Note: one could use a capped collection and $natural order to do something + similar, using sort({$natural:1}), and then not need to worry about + _id's being in order. +*/ +void tail(DBClientBase& conn, const char *ns) { + conn.ensureIndex(ns, fromjson("{_id:1}")); + BSONElement lastId; + Query query = Query().sort("_id"); + while( 1 ) { + auto_ptr<DBClientCursor> c = conn.query(ns, query, 0, 0, 0, Option_CursorTailable); + while( 1 ) { + if( !c->more() ) { + if( c->isDead() ) { + // we need to requery + break; + } + sleepsecs(1); + } + BSONObj o = c->next(); + lastId = o["_id"]; + cout << o.toString() << endl; + } + query = QUERY( "_id" << GT << lastId ).sort("_id"); + } +} diff --git a/client/examples/tutorial.cpp b/client/examples/tutorial.cpp new file mode 100644 index 0000000..28e1b27 --- /dev/null +++ b/client/examples/tutorial.cpp @@ -0,0 +1,67 @@ +//tutorial.cpp + +/* Copyright 2009 10gen Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <iostream> +#include "../../client/dbclient.h" + +// g++ tutorial.cpp -lmongoclient -lboost_thread -lboost_filesystem -o tutorial + +using namespace mongo; + +void printIfAge(DBClientConnection& c, int age) { + auto_ptr<DBClientCursor> cursor = c.query("tutorial.persons", QUERY( "age" << age ).sort("name") ); + while( cursor->more() ) { + BSONObj p = cursor->next(); + cout << p.getStringField("name") << endl; + } +} + +void run() { + DBClientConnection c; + c.connect("localhost"); //"192.168.58.1"); + cout << "connected ok" << endl; + BSONObj p = BSON( "name" << "Joe" << "age" << 33 ); + c.insert("tutorial.persons", p); + p = BSON( "name" << "Jane" << "age" << 40 ); + c.insert("tutorial.persons", p); + p = BSON( "name" << "Abe" << "age" << 33 ); + c.insert("tutorial.persons", p); + p = BSON( "name" << "Samantha" << "age" << 21 << "city" << "Los Angeles" << "state" << "CA" ); + c.insert("tutorial.persons", p); + + c.ensureIndex("tutorial.persons", fromjson("{age:1}")); + + cout << "count:" << c.count("tutorial.persons") << endl; + + auto_ptr<DBClientCursor> cursor = c.query("tutorial.persons", BSONObj()); + while( cursor->more() ) { + cout << cursor->next().toString() << endl; + } + + cout << "\nprintifage:\n"; + printIfAge(c, 33); +} + +int main() { + try { + run(); + } + catch( DBException &e ) { + cout << "caught " << e.what() << endl; + } + return 0; +} diff --git a/client/examples/whereExample.cpp b/client/examples/whereExample.cpp new file mode 100644 index 0000000..a26d921 --- /dev/null +++ b/client/examples/whereExample.cpp @@ -0,0 +1,68 @@ +// whereExample.cpp + +/* Copyright 2009 10gen Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <iostream> + +#include "client/dbclient.h" + +using namespace std; +using namespace mongo; + +int main( int argc, const char **argv ) { + + const char *port = "27017"; + if ( argc != 1 ) { + if ( argc != 3 ) + throw -12; + port = argv[ 2 ]; + } + + DBClientConnection conn; + string errmsg; + if ( ! conn.connect( string( "127.0.0.1:" ) + port , errmsg ) ) { + cout << "couldn't connect : " << errmsg << endl; + throw -11; + } + + const char * ns = "test.where"; + + conn.remove( ns , BSONObj() ); + + conn.insert( ns , BSON( "name" << "eliot" << "num" << 17 ) ); + conn.insert( ns , BSON( "name" << "sara" << "num" << 24 ) ); + + auto_ptr<DBClientCursor> cursor = conn.query( ns , BSONObj() ); + + while ( cursor->more() ) { + BSONObj obj = cursor->next(); + cout << "\t" << obj.jsonString() << endl; + } + + cout << "now using $where" << endl; + + Query q = Query("{}").where("this.name == name" , BSON( "name" << "sara" )); + + cursor = conn.query( ns , q ); + + int num = 0; + while ( cursor->more() ) { + BSONObj obj = cursor->next(); + cout << "\t" << obj.jsonString() << endl; + num++; + } + assert( num == 1 ); +} diff --git a/client/gridfs.cpp b/client/gridfs.cpp new file mode 100644 index 0000000..892ec6e --- /dev/null +++ b/client/gridfs.cpp @@ -0,0 +1,233 @@ +// gridfs.cpp + +/* Copyright 2009 10gen + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "../stdafx.h" +#include <fcntl.h> +#include <utility> + +#include "gridfs.h" +#include <boost/smart_ptr.hpp> + +#if defined(_WIN32) +#include <io.h> +#endif + +#ifndef MIN +#define MIN(a,b) ( (a) < (b) ? (a) : (b) ) +#endif + +namespace mongo { + + const unsigned DEFAULT_CHUNK_SIZE = 256 * 1024; + + Chunk::Chunk( BSONObj o ){ + _data = o; + } + + Chunk::Chunk( BSONObj fileObject , int chunkNumber , const char * data , int len ){ + BSONObjBuilder b; + b.appendAs( fileObject["_id"] , "files_id" ); + b.append( "n" , chunkNumber ); + b.appendBinDataArray( "data" , data , len ); + _data = b.obj(); + } + + + GridFS::GridFS( DBClientBase& client , const string& dbName , const string& prefix ) : _client( client ) , _dbName( dbName ) , _prefix( prefix ){ + _filesNS = dbName + "." + prefix + ".files"; + _chunksNS = dbName + "." + prefix + ".chunks"; + + + client.ensureIndex( _filesNS , BSON( "filename" << 1 ) ); + client.ensureIndex( _chunksNS , BSON( "files_id" << 1 << "n" << 1 ) ); + } + + GridFS::~GridFS(){ + + } + + BSONObj GridFS::storeFile( const char* data , size_t length , const string& remoteName , const string& contentType){ + massert( 10279 , "large files not yet implemented", length <= 0xffffffff); + char const * const end = data + length; + + OID id; + id.init(); + BSONObj idObj = BSON("_id" << id); + + int chunkNumber = 0; + while (data < end){ + int chunkLen = MIN(DEFAULT_CHUNK_SIZE, (unsigned)(end-data)); + Chunk c(idObj, chunkNumber, data, chunkLen); + _client.insert( _chunksNS.c_str() , c._data ); + + chunkNumber++; + data += chunkLen; + } + + return insertFile(remoteName, id, length, contentType); + } + + + BSONObj GridFS::storeFile( const string& fileName , const string& remoteName , const string& contentType){ + uassert( 10012 , "file doesn't exist" , fileName == "-" || boost::filesystem::exists( fileName ) ); + + FILE* fd; + if (fileName == "-") + fd = stdin; + else + fd = fopen( fileName.c_str() , "rb" ); + uassert( 10013 , "error opening file", fd); + + OID id; + id.init(); + BSONObj idObj = BSON("_id" << id); + + int chunkNumber = 0; + gridfs_offset length = 0; + while (!feof(fd)){ + boost::scoped_array<char>buf (new char[DEFAULT_CHUNK_SIZE]); + char* bufPos = buf.get(); + unsigned int chunkLen = 0; // how much in the chunk now + while(chunkLen != DEFAULT_CHUNK_SIZE && !feof(fd)){ + int readLen = fread(bufPos, 1, DEFAULT_CHUNK_SIZE - chunkLen, fd); + chunkLen += readLen; + bufPos += readLen; + + assert(chunkLen <= DEFAULT_CHUNK_SIZE); + } + + Chunk c(idObj, chunkNumber, buf.get(), chunkLen); + _client.insert( _chunksNS.c_str() , c._data ); + + length += chunkLen; + chunkNumber++; + } + + if (fd != stdin) + fclose( fd ); + + massert( 10280 , "large files not yet implemented", length <= 0xffffffff); + + return insertFile((remoteName.empty() ? fileName : remoteName), id, length, contentType); + } + + BSONObj GridFS::insertFile(const string& name, const OID& id, unsigned length, const string& contentType){ + + BSONObj res; + if ( ! _client.runCommand( _dbName.c_str() , BSON( "filemd5" << id << "root" << _prefix ) , res ) ) + throw UserException( 9008 , "filemd5 failed" ); + + BSONObjBuilder file; + file << "_id" << id + << "filename" << name + << "length" << (unsigned) length + << "chunkSize" << DEFAULT_CHUNK_SIZE + << "uploadDate" << DATENOW + << "md5" << res["md5"] + ; + + if (!contentType.empty()) + file << "contentType" << contentType; + + BSONObj ret = file.obj(); + _client.insert(_filesNS.c_str(), ret); + + return ret; + } + + void GridFS::removeFile( const string& fileName ){ + auto_ptr<DBClientCursor> files = _client.query( _filesNS , BSON( "filename" << fileName ) ); + while (files->more()){ + BSONObj file = files->next(); + BSONElement id = file["_id"]; + _client.remove( _filesNS.c_str() , BSON( "_id" << id ) ); + _client.remove( _chunksNS.c_str() , BSON( "files_id" << id ) ); + } + } + + GridFile::GridFile( GridFS * grid , BSONObj obj ){ + _grid = grid; + _obj = obj; + } + + GridFile GridFS::findFile( const string& fileName ){ + return findFile( BSON( "filename" << fileName ) ); + }; + + GridFile GridFS::findFile( BSONObj query ){ + query = BSON("query" << query << "orderby" << BSON("uploadDate" << -1)); + return GridFile( this , _client.findOne( _filesNS.c_str() , query ) ); + } + + auto_ptr<DBClientCursor> GridFS::list(){ + return _client.query( _filesNS.c_str() , BSONObj() ); + } + + auto_ptr<DBClientCursor> GridFS::list( BSONObj o ){ + return _client.query( _filesNS.c_str() , o ); + } + + BSONObj GridFile::getMetadata(){ + BSONElement meta_element = _obj["metadata"]; + if( meta_element.eoo() ){ + return BSONObj(); + } + + return meta_element.embeddedObject(); + } + + Chunk GridFile::getChunk( int n ){ + _exists(); + BSONObjBuilder b; + b.appendAs( _obj["_id"] , "files_id" ); + b.append( "n" , n ); + + BSONObj o = _grid->_client.findOne( _grid->_chunksNS.c_str() , b.obj() ); + uassert( 10014 , "chunk is empty!" , ! o.isEmpty() ); + return Chunk(o); + } + + gridfs_offset GridFile::write( ostream & out ){ + _exists(); + + const int num = getNumChunks(); + + for ( int i=0; i<num; i++ ){ + Chunk c = getChunk( i ); + + int len; + const char * data = c.data( len ); + out.write( data , len ); + } + + return getContentLength(); + } + + gridfs_offset GridFile::write( const string& where ){ + if (where == "-"){ + return write( cout ); + } else { + ofstream out(where.c_str() , ios::out | ios::binary ); + return write( out ); + } + } + + void GridFile::_exists(){ + uassert( 10015 , "doesn't exists" , exists() ); + } + +} diff --git a/client/gridfs.h b/client/gridfs.h new file mode 100644 index 0000000..3165d5f --- /dev/null +++ b/client/gridfs.h @@ -0,0 +1,203 @@ +/** @file gridfs.h */ + +/* Copyright 2009 10gen Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "dbclient.h" + +namespace mongo { + + typedef unsigned long long gridfs_offset; + + class GridFS; + class GridFile; + + class Chunk { + public: + Chunk( BSONObj data ); + Chunk( BSONObj fileId , int chunkNumber , const char * data , int len ); + + int len(){ + int len; + const char * data = _data["data"].binData( len ); + int * foo = (int*)data; + assert( len - 4 == foo[0] ); + return len - 4; + } + + const char * data( int & len ){ + const char * data = _data["data"].binData( len ); + int * foo = (int*)data; + assert( len - 4 == foo[0] ); + + len = len - 4; + return data + 4; + } + + private: + BSONObj _data; + friend class GridFS; + }; + + + /** + this is the main entry point into the mongo grid fs + */ + class GridFS{ + public: + /** + * @param client - db connection + * @param dbName - root database name + * @param prefix - if you want your data somewhere besides <dbname>.fs + */ + GridFS( DBClientBase& client , const string& dbName , const string& prefix="fs" ); + ~GridFS(); + + /** + * puts the file reference by fileName into the db + * @param fileName local filename relative to process + * @param remoteName optional filename to use for file stored in GridFS + * (default is to use fileName parameter) + * @param contentType optional MIME type for this object. + * (default is to omit) + * @return the file object + */ + BSONObj storeFile( const string& fileName , const string& remoteName="" , const string& contentType=""); + + /** + * puts the file represented by data into the db + * @param data pointer to buffer to store in GridFS + * @param length length of buffer + * @param remoteName optional filename to use for file stored in GridFS + * (default is to use fileName parameter) + * @param contentType optional MIME type for this object. + * (default is to omit) + * @return the file object + */ + BSONObj storeFile( const char* data , size_t length , const string& remoteName , const string& contentType=""); + /** + * removes file referenced by fileName from the db + * @param fileName filename (in GridFS) of the file to remove + * @return the file object + */ + void removeFile( const string& fileName ); + + /** + * returns a file object matching the query + */ + GridFile findFile( BSONObj query ); + + /** + * equiv to findFile( { filename : filename } ) + */ + GridFile findFile( const string& fileName ); + + /** + * convenience method to get all the files + */ + auto_ptr<DBClientCursor> list(); + + /** + * convenience method to get all the files with a filter + */ + auto_ptr<DBClientCursor> list( BSONObj query ); + + private: + DBClientBase& _client; + string _dbName; + string _prefix; + string _filesNS; + string _chunksNS; + + // insert fileobject. All chunks must be in DB. + BSONObj insertFile(const string& name, const OID& id, unsigned length, const string& contentType); + + friend class GridFile; + }; + + /** + wrapper for a file stored in the Mongo database + */ + class GridFile { + public: + /** + * @return whether or not this file exists + * findFile will always return a GriFile, so need to check this + */ + bool exists(){ + return ! _obj.isEmpty(); + } + + string getFilename(){ + return _obj["filename"].str(); + } + + int getChunkSize(){ + return (int)(_obj["chunkSize"].number()); + } + + gridfs_offset getContentLength(){ + return (gridfs_offset)(_obj["length"].number()); + } + + string getContentType(){ + return _obj["contentType"].valuestr(); + } + + Date_t getUploadDate(){ + return _obj["uploadDate"].date(); + } + + string getMD5(){ + return _obj["md5"].str(); + } + + BSONElement getFileField( const string& name ){ + return _obj[name]; + } + + BSONObj getMetadata(); + + int getNumChunks(){ + return (int) ceil( (double)getContentLength() / (double)getChunkSize() ); + } + + Chunk getChunk( int n ); + + /** + write the file to the output stream + */ + gridfs_offset write( ostream & out ); + + /** + write the file to this filename + */ + gridfs_offset write( const string& where ); + + private: + GridFile( GridFS * grid , BSONObj obj ); + + void _exists(); + + GridFS * _grid; + BSONObj _obj; + + friend class GridFS; + }; +} + + diff --git a/client/model.cpp b/client/model.cpp new file mode 100644 index 0000000..3978105 --- /dev/null +++ b/client/model.cpp @@ -0,0 +1,97 @@ +// model.cpp + +/* Copyright 2009 10gen + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "stdafx.h" +#include "model.h" +#include "connpool.h" + +namespace mongo { + + bool Model::load(BSONObj& query){ + ScopedDbConnection conn( modelServer() ); + + BSONObj b = conn->findOne(getNS(), query); + conn.done(); + + if ( b.isEmpty() ) + return false; + + unserialize(b); + _id = b["_id"].wrap().getOwned(); + return true; + } + + void Model::remove( bool safe ){ + uassert( 10016 , "_id isn't set - needed for remove()" , _id["_id"].type() ); + + ScopedDbConnection conn( modelServer() ); + conn->remove( getNS() , _id ); + + string errmsg = ""; + if ( safe ) + errmsg = conn->getLastError(); + + conn.done(); + + if ( safe && errmsg.size() ) + throw UserException( 9002 , (string)"error on Model::remove: " + errmsg ); + } + + void Model::save( bool safe ){ + ScopedDbConnection conn( modelServer() ); + + BSONObjBuilder b; + serialize( b ); + + if ( _id.isEmpty() ){ + OID oid; + oid.init(); + b.appendOID( "_id" , &oid ); + + BSONObj o = b.obj(); + conn->insert( getNS() , o ); + _id = o["_id"].wrap().getOwned(); + + log(4) << "inserted new model " << getNS() << " " << o << endl; + } + else { + BSONElement id = _id["_id"]; + b.append( id ); + + BSONObjBuilder qb; + qb.append( id ); + + BSONObj q = qb.obj(); + BSONObj o = b.obj(); + + log(4) << "updated old model" << getNS() << " " << q << " " << o << endl; + + conn->update( getNS() , q , o ); + + } + + string errmsg = ""; + if ( safe ) + errmsg = conn->getLastError(); + + conn.done(); + + if ( safe && errmsg.size() ) + throw UserException( 9003 , (string)"error on Model::save: " + errmsg ); + } + +} // namespace mongo diff --git a/client/model.h b/client/model.h new file mode 100644 index 0000000..f3a63ad --- /dev/null +++ b/client/model.h @@ -0,0 +1,57 @@ +/** @file model.h */ + +/* Copyright 2009 10gen + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "dbclient.h" + +namespace mongo { + + /** Model is a base class for defining objects which are serializable to the Mongo + database via the database driver. + + Definition + Your serializable class should inherit from Model and implement the abstract methods + below. + + Loading + To load, first construct an (empty) object. Then call load(). Do not load an object + more than once. + */ + class Model { + public: + Model() { } + virtual ~Model() { } + + virtual const char * getNS() = 0; + virtual void serialize(BSONObjBuilder& to) = 0; + virtual void unserialize(const BSONObj& from) = 0; + + virtual string modelServer() = 0; + + /** Load a single object. + @return true if successful. + */ + virtual bool load(BSONObj& query); + virtual void save( bool safe=false ); + virtual void remove( bool safe=false ); + + protected: + BSONObj _id; + }; + +} // namespace mongo diff --git a/client/parallel.cpp b/client/parallel.cpp new file mode 100644 index 0000000..449f436 --- /dev/null +++ b/client/parallel.cpp @@ -0,0 +1,259 @@ +// parallel.cpp + +#include "stdafx.h" +#include "parallel.h" +#include "connpool.h" +#include "../db/queryutil.h" +#include "../db/dbmessage.h" +#include "../s/util.h" + +namespace mongo { + + // -------- ClusteredCursor ----------- + + ClusteredCursor::ClusteredCursor( QueryMessage& q ){ + _ns = q.ns; + _query = q.query.copy(); + _options = q.queryOptions; + if ( q.fields.get() ) + _fields = q.fields->getSpec(); + _done = false; + } + + ClusteredCursor::ClusteredCursor( const string& ns , const BSONObj& q , int options , const BSONObj& fields ){ + _ns = ns; + _query = q.getOwned(); + _options = options; + _fields = fields.getOwned(); + _done = false; + } + + ClusteredCursor::~ClusteredCursor(){ + _done = true; // just in case + } + + auto_ptr<DBClientCursor> ClusteredCursor::query( const string& server , int num , BSONObj extra ){ + uassert( 10017 , "cursor already done" , ! _done ); + + BSONObj q = _query; + if ( ! extra.isEmpty() ){ + q = concatQuery( q , extra ); + } + + ScopedDbConnection conn( server ); + checkShardVersion( conn.conn() , _ns ); + + log(5) << "ClusteredCursor::query server:" << server << " ns:" << _ns << " query:" << q << " num:" << num << " _fields:" << _fields << " options: " << _options << endl; + auto_ptr<DBClientCursor> cursor = conn->query( _ns.c_str() , q , num , 0 , ( _fields.isEmpty() ? 0 : &_fields ) , _options ); + if ( cursor->hasResultFlag( QueryResult::ResultFlag_ShardConfigStale ) ) + throw StaleConfigException( _ns , "ClusteredCursor::query" ); + + conn.done(); + return cursor; + } + + BSONObj ClusteredCursor::concatQuery( const BSONObj& query , const BSONObj& extraFilter ){ + if ( ! query.hasField( "query" ) ) + return _concatFilter( query , extraFilter ); + + BSONObjBuilder b; + BSONObjIterator i( query ); + while ( i.more() ){ + BSONElement e = i.next(); + + if ( strcmp( e.fieldName() , "query" ) ){ + b.append( e ); + continue; + } + + b.append( "query" , _concatFilter( e.embeddedObjectUserCheck() , extraFilter ) ); + } + return b.obj(); + } + + BSONObj ClusteredCursor::_concatFilter( const BSONObj& filter , const BSONObj& extra ){ + BSONObjBuilder b; + b.appendElements( filter ); + b.appendElements( extra ); + return b.obj(); + // TODO: should do some simplification here if possibl ideally + } + + + // -------- SerialServerClusteredCursor ----------- + + SerialServerClusteredCursor::SerialServerClusteredCursor( set<ServerAndQuery> servers , QueryMessage& q , int sortOrder) : ClusteredCursor( q ){ + for ( set<ServerAndQuery>::iterator i = servers.begin(); i!=servers.end(); i++ ) + _servers.push_back( *i ); + + if ( sortOrder > 0 ) + sort( _servers.begin() , _servers.end() ); + else if ( sortOrder < 0 ) + sort( _servers.rbegin() , _servers.rend() ); + + _serverIndex = 0; + } + + bool SerialServerClusteredCursor::more(){ + if ( _current.get() && _current->more() ) + return true; + + if ( _serverIndex >= _servers.size() ){ + return false; + } + + ServerAndQuery& sq = _servers[_serverIndex++]; + + _current = query( sq._server , 0 , sq._extra ); + if ( _current->more() ) + return true; + + // this sq has nothing, so keep looking + return more(); + } + + BSONObj SerialServerClusteredCursor::next(){ + uassert( 10018 , "no more items" , more() ); + return _current->next(); + } + + // -------- ParallelSortClusteredCursor ----------- + + ParallelSortClusteredCursor::ParallelSortClusteredCursor( set<ServerAndQuery> servers , QueryMessage& q , + const BSONObj& sortKey ) + : ClusteredCursor( q ) , _servers( servers ){ + _sortKey = sortKey.getOwned(); + _init(); + } + + ParallelSortClusteredCursor::ParallelSortClusteredCursor( set<ServerAndQuery> servers , const string& ns , + const Query& q , + int options , const BSONObj& fields ) + : ClusteredCursor( ns , q.obj , options , fields ) , _servers( servers ){ + _sortKey = q.getSort().copy(); + _init(); + } + + void ParallelSortClusteredCursor::_init(){ + _numServers = _servers.size(); + _cursors = new auto_ptr<DBClientCursor>[_numServers]; + _nexts = new BSONObj[_numServers]; + + // TODO: parellize + int num = 0; + for ( set<ServerAndQuery>::iterator i = _servers.begin(); i!=_servers.end(); i++ ){ + const ServerAndQuery& sq = *i; + _cursors[num++] = query( sq._server , 0 , sq._extra ); + } + + } + + ParallelSortClusteredCursor::~ParallelSortClusteredCursor(){ + delete [] _cursors; + delete [] _nexts; + } + + bool ParallelSortClusteredCursor::more(){ + for ( int i=0; i<_numServers; i++ ){ + if ( ! _nexts[i].isEmpty() ) + return true; + + if ( _cursors[i].get() && _cursors[i]->more() ) + return true; + } + return false; + } + + BSONObj ParallelSortClusteredCursor::next(){ + advance(); + + BSONObj best = BSONObj(); + int bestFrom = -1; + + for ( int i=0; i<_numServers; i++){ + if ( _nexts[i].isEmpty() ) + continue; + + if ( best.isEmpty() ){ + best = _nexts[i]; + bestFrom = i; + continue; + } + + int comp = best.woSortOrder( _nexts[i] , _sortKey ); + if ( comp < 0 ) + continue; + + best = _nexts[i]; + bestFrom = i; + } + + uassert( 10019 , "no more elements" , ! best.isEmpty() ); + _nexts[bestFrom] = BSONObj(); + + return best; + } + + void ParallelSortClusteredCursor::advance(){ + for ( int i=0; i<_numServers; i++ ){ + + if ( ! _nexts[i].isEmpty() ){ + // already have a good object there + continue; + } + + if ( ! _cursors[i]->more() ){ + // cursor is dead, oh well + continue; + } + + _nexts[i] = _cursors[i]->next(); + } + + } + + // ----------------- + // ---- Future ----- + // ----------------- + + Future::CommandResult::CommandResult( const string& server , const string& db , const BSONObj& cmd ){ + _server = server; + _db = db; + _cmd = cmd; + _done = false; + } + + bool Future::CommandResult::join(){ + while ( ! _done ) + sleepmicros( 50 ); + return _ok; + } + + void Future::commandThread(){ + assert( _grab ); + shared_ptr<CommandResult> res = *_grab; + _grab = 0; + + ScopedDbConnection conn( res->_server ); + res->_ok = conn->runCommand( res->_db , res->_cmd , res->_res ); + res->_done = true; + } + + shared_ptr<Future::CommandResult> Future::spawnCommand( const string& server , const string& db , const BSONObj& cmd ){ + shared_ptr<Future::CommandResult> res; + res.reset( new Future::CommandResult( server , db , cmd ) ); + + _grab = &res; + + boost::thread thr( Future::commandThread ); + + while ( _grab ) + sleepmicros(2); + + return res; + } + + shared_ptr<Future::CommandResult> * Future::_grab; + + +} diff --git a/client/parallel.h b/client/parallel.h new file mode 100644 index 0000000..5a22624 --- /dev/null +++ b/client/parallel.h @@ -0,0 +1,195 @@ +// parallel.h + +/* Copyright 2009 10gen Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + tools for wokring in parallel/sharded/clustered environment + */ + +#include "../stdafx.h" +#include "dbclient.h" +#include "../db/dbmessage.h" + +namespace mongo { + + /** + * this is a cursor that works over a set of servers + * can be used in serial/paralellel as controlled by sub classes + */ + class ClusteredCursor { + public: + ClusteredCursor( QueryMessage& q ); + ClusteredCursor( const string& ns , const BSONObj& q , int options=0 , const BSONObj& fields=BSONObj() ); + virtual ~ClusteredCursor(); + + virtual bool more() = 0; + virtual BSONObj next() = 0; + + static BSONObj concatQuery( const BSONObj& query , const BSONObj& extraFilter ); + + virtual string type() const = 0; + + protected: + auto_ptr<DBClientCursor> query( const string& server , int num = 0 , BSONObj extraFilter = BSONObj() ); + + static BSONObj _concatFilter( const BSONObj& filter , const BSONObj& extraFilter ); + + string _ns; + BSONObj _query; + int _options; + BSONObj _fields; + + bool _done; + }; + + + /** + * holder for a server address and a query to run + */ + class ServerAndQuery { + public: + ServerAndQuery( const string& server , BSONObj extra = BSONObj() , BSONObj orderObject = BSONObj() ) : + _server( server ) , _extra( extra.getOwned() ) , _orderObject( orderObject.getOwned() ){ + } + + bool operator<( const ServerAndQuery& other ) const{ + if ( ! _orderObject.isEmpty() ) + return _orderObject.woCompare( other._orderObject ) < 0; + + if ( _server < other._server ) + return true; + if ( other._server > _server ) + return false; + return _extra.woCompare( other._extra ) < 0; + } + + string toString() const { + StringBuilder ss; + ss << "server:" << _server << " _extra:" << _extra << " _orderObject:" << _orderObject; + return ss.str(); + } + + operator string() const { + return toString(); + } + + string _server; + BSONObj _extra; + BSONObj _orderObject; + }; + + + /** + * runs a query in serial across any number of servers + * returns all results from 1 server, then the next, etc... + */ + class SerialServerClusteredCursor : public ClusteredCursor { + public: + SerialServerClusteredCursor( set<ServerAndQuery> servers , QueryMessage& q , int sortOrder=0); + virtual bool more(); + virtual BSONObj next(); + virtual string type() const { return "SerialServer"; } + private: + vector<ServerAndQuery> _servers; + unsigned _serverIndex; + + auto_ptr<DBClientCursor> _current; + }; + + + /** + * runs a query in parellel across N servers + * sots + */ + class ParallelSortClusteredCursor : public ClusteredCursor { + public: + ParallelSortClusteredCursor( set<ServerAndQuery> servers , QueryMessage& q , const BSONObj& sortKey ); + ParallelSortClusteredCursor( set<ServerAndQuery> servers , const string& ns , + const Query& q , int options=0, const BSONObj& fields=BSONObj() ); + virtual ~ParallelSortClusteredCursor(); + virtual bool more(); + virtual BSONObj next(); + virtual string type() const { return "ParallelSort"; } + private: + void _init(); + + void advance(); + + int _numServers; + set<ServerAndQuery> _servers; + BSONObj _sortKey; + + auto_ptr<DBClientCursor> * _cursors; + BSONObj * _nexts; + }; + + /** + * tools for doing asynchronous operations + * right now uses underlying sync network ops and uses another thread + * should be changed to use non-blocking io + */ + class Future { + public: + class CommandResult { + public: + + string getServer() const { return _server; } + + bool isDone() const { return _done; } + + bool ok() const { + assert( _done ); + return _ok; + } + + BSONObj result() const { + assert( _done ); + return _res; + } + + /** + blocks until command is done + returns ok() + */ + bool join(); + + private: + + CommandResult( const string& server , const string& db , const BSONObj& cmd ); + + string _server; + string _db; + BSONObj _cmd; + + boost::thread _thr; + + BSONObj _res; + bool _done; + bool _ok; + + friend class Future; + }; + + static void commandThread(); + + static shared_ptr<CommandResult> spawnCommand( const string& server , const string& db , const BSONObj& cmd ); + + private: + static shared_ptr<CommandResult> * _grab; + }; + + +} diff --git a/client/syncclusterconnection.cpp b/client/syncclusterconnection.cpp new file mode 100644 index 0000000..b942709 --- /dev/null +++ b/client/syncclusterconnection.cpp @@ -0,0 +1,165 @@ +// syncclusterconnection.cpp + +#include "stdafx.h" +#include "syncclusterconnection.h" + +// error codes 8000-8009 + +namespace mongo { + + SyncCluterConnection::SyncCluterConnection( string commaSeperated ){ + string::size_type idx; + while ( ( idx = commaSeperated.find( ',' ) ) != string::npos ){ + string h = commaSeperated.substr( 0 , idx ); + commaSeperated = commaSeperated.substr( idx + 1 ); + _connect( h ); + } + _connect( commaSeperated ); + uassert( 8004 , "SyncCluterConnection needs 3 servers" , _conns.size() == 3 ); + } + + SyncCluterConnection::SyncCluterConnection( string a , string b , string c ){ + // connect to all even if not working + _connect( a ); + _connect( b ); + _connect( c ); + } + + SyncCluterConnection::~SyncCluterConnection(){ + for ( size_t i=0; i<_conns.size(); i++ ) + delete _conns[i]; + _conns.clear(); + } + + bool SyncCluterConnection::prepare( string& errmsg ){ + return fsync( errmsg ); + } + + bool SyncCluterConnection::fsync( string& errmsg ){ + bool ok = true; + errmsg = ""; + for ( size_t i=0; i<_conns.size(); i++ ){ + BSONObj res; + try { + if ( _conns[i]->simpleCommand( "admin" , 0 , "fsync" ) ) + continue; + } + catch ( std::exception& e ){ + errmsg += e.what(); + } + catch ( ... ){ + } + ok = false; + errmsg += _conns[i]->toString() + ":" + res.toString(); + } + return ok; + } + + void SyncCluterConnection::_checkLast(){ + vector<BSONObj> all; + vector<string> errors; + + for ( size_t i=0; i<_conns.size(); i++ ){ + BSONObj res; + string err; + try { + if ( ! _conns[i]->runCommand( "admin" , BSON( "getlasterror" << 1 << "fsync" << 1 ) , res ) ) + err = "cmd failed: "; + } + catch ( std::exception& e ){ + err += e.what(); + } + catch ( ... ){ + err += "unknown failure"; + } + all.push_back( res ); + errors.push_back( err ); + } + + assert( all.size() == errors.size() && all.size() == _conns.size() ); + + stringstream err; + bool ok = true; + + for ( size_t i = 0; i<_conns.size(); i++ ){ + BSONObj res = all[i]; + if ( res["ok"].trueValue() && res["fsyncFiles"].numberInt() > 0 ) + continue; + ok = false; + err << _conns[i]->toString() << ": " << res << " " << errors[i]; + } + + if ( ok ) + return; + throw UserException( 8001 , (string)"SyncCluterConnection write op failed: " + err.str() ); + } + + void SyncCluterConnection::_connect( string host ){ + log() << "SyncCluterConnection connecting to: " << host << endl; + DBClientConnection * c = new DBClientConnection( true ); + string errmsg; + if ( ! c->connect( host , errmsg ) ) + log() << "SyncCluterConnection connect fail to: " << host << " errmsg: " << errmsg << endl; + _conns.push_back( c ); + } + + auto_ptr<DBClientCursor> SyncCluterConnection::query(const string &ns, Query query, int nToReturn, int nToSkip, + const BSONObj *fieldsToReturn, int queryOptions){ + + uassert( 10021 , "$cmd not support yet in SyncCluterConnection::query" , ns.find( "$cmd" ) == string::npos ); + + for ( size_t i=0; i<_conns.size(); i++ ){ + try { + auto_ptr<DBClientCursor> cursor = + _conns[i]->query( ns , query , nToReturn , nToSkip , fieldsToReturn , queryOptions ); + if ( cursor.get() ) + return cursor; + log() << "query failed to: " << _conns[i]->toString() << " no data" << endl; + } + catch ( ... ){ + log() << "query failed to: " << _conns[i]->toString() << " exception" << endl; + } + } + throw UserException( 8002 , "all servers down!" ); + } + + auto_ptr<DBClientCursor> SyncCluterConnection::getMore( const string &ns, long long cursorId, int nToReturn, int options ){ + uassert( 10022 , "SyncCluterConnection::getMore not supported yet" , 0); + auto_ptr<DBClientCursor> c; + return c; + } + + void SyncCluterConnection::insert( const string &ns, BSONObj obj ){ + string errmsg; + if ( ! prepare( errmsg ) ) + throw UserException( 8003 , (string)"SyncCluterConnection::insert prepare failed: " + errmsg ); + + for ( size_t i=0; i<_conns.size(); i++ ){ + _conns[i]->insert( ns , obj ); + } + + _checkLast(); + } + + void SyncCluterConnection::insert( const string &ns, const vector< BSONObj >& v ){ + uassert( 10023 , "SyncCluterConnection bulk insert not implemented" , 0); + } + + void SyncCluterConnection::remove( const string &ns , Query query, bool justOne ){ assert(0); } + + void SyncCluterConnection::update( const string &ns , Query query , BSONObj obj , bool upsert , bool multi ){ assert(0); } + + string SyncCluterConnection::toString(){ + stringstream ss; + ss << "SyncCluterConnection ["; + for ( size_t i=0; i<_conns.size(); i++ ){ + if ( i > 0 ) + ss << ","; + ss << _conns[i]->toString(); + } + ss << "]"; + return ss.str(); + } + + +} diff --git a/client/syncclusterconnection.h b/client/syncclusterconnection.h new file mode 100644 index 0000000..c14a9bb --- /dev/null +++ b/client/syncclusterconnection.h @@ -0,0 +1,57 @@ +// syncclusterconnection.h + +#include "../stdafx.h" +#include "dbclient.h" + +namespace mongo { + + /** + * this is a connection to a cluster of servers that operate as one + * for super high durability + */ + class SyncCluterConnection : public DBClientWithCommands { + public: + /** + * @param commaSeperated should be 3 hosts comma seperated + */ + SyncCluterConnection( string commaSeperated ); + SyncCluterConnection( string a , string b , string c ); + ~SyncCluterConnection(); + + + /** + * @return true if all servers are up and ready for writes + */ + bool prepare( string& errmsg ); + + /** + * runs fsync on all servers + */ + bool fsync( string& errmsg ); + + // --- from DBClientInterface + + virtual auto_ptr<DBClientCursor> query(const string &ns, Query query, int nToReturn, int nToSkip, + const BSONObj *fieldsToReturn, int queryOptions); + + virtual auto_ptr<DBClientCursor> getMore( const string &ns, long long cursorId, int nToReturn, int options ); + + virtual void insert( const string &ns, BSONObj obj ); + + virtual void insert( const string &ns, const vector< BSONObj >& v ); + + virtual void remove( const string &ns , Query query, bool justOne ); + + virtual void update( const string &ns , Query query , BSONObj obj , bool upsert , bool multi ); + + virtual string toString(); + private: + + void _checkLast(); + + void _connect( string host ); + vector<DBClientConnection*> _conns; + }; + + +}; |