summaryrefslogtreecommitdiff
path: root/client/dbclient.cpp
diff options
context:
space:
mode:
authorAntonin Kral <a.kral@bobek.cz>2011-03-17 00:05:43 +0100
committerAntonin Kral <a.kral@bobek.cz>2011-03-17 00:05:43 +0100
commit582fc32574a3b158c81e49cb00e6ae59205e66ba (patch)
treeac64a3243e0d2121709f685695247052858115c8 /client/dbclient.cpp
parent2761bffa96595ac1698d86bbc2e95ebb0d4d6e93 (diff)
downloadmongodb-582fc32574a3b158c81e49cb00e6ae59205e66ba.tar.gz
Imported Upstream version 1.8.0
Diffstat (limited to 'client/dbclient.cpp')
-rw-r--r--client/dbclient.cpp587
1 files changed, 229 insertions, 358 deletions
diff --git a/client/dbclient.cpp b/client/dbclient.cpp
index aa9b7ae..b4214ab 100644
--- a/client/dbclient.cpp
+++ b/client/dbclient.cpp
@@ -31,8 +31,41 @@
namespace mongo {
+ void ConnectionString::_fillServers( string s ) {
+
+ {
+ string::size_type idx = s.find( '/' );
+ if ( idx != string::npos ) {
+ _setName = s.substr( 0 , idx );
+ s = s.substr( idx + 1 );
+ _type = SET;
+ }
+ }
+
+ string::size_type idx;
+ while ( ( idx = s.find( ',' ) ) != string::npos ) {
+ _servers.push_back( s.substr( 0 , idx ) );
+ s = s.substr( idx + 1 );
+ }
+ _servers.push_back( s );
+
+ }
+
+ void ConnectionString::_finishInit() {
+ stringstream ss;
+ if ( _type == SET )
+ ss << _setName << "/";
+ for ( unsigned i=0; i<_servers.size(); i++ ) {
+ if ( i > 0 )
+ ss << ",";
+ ss << _servers[i].toString();
+ }
+ _string = ss.str();
+ }
+
+
DBClientBase* ConnectionString::connect( string& errmsg ) const {
- switch ( _type ){
+ switch ( _type ) {
case MASTER: {
DBClientConnection * c = new DBClientConnection(true);
log(1) << "creating new connection to:" << _servers[0] << endl;
@@ -42,11 +75,11 @@ namespace mongo {
}
return c;
}
-
- case PAIR:
+
+ case PAIR:
case SET: {
DBClientReplicaSet * set = new DBClientReplicaSet( _setName , _servers );
- if( ! set->connect() ){
+ if( ! set->connect() ) {
delete set;
errmsg = "connect failed to set ";
errmsg += toString();
@@ -54,7 +87,7 @@ namespace mongo {
}
return set;
}
-
+
case SYNC: {
// TODO , don't copy
list<HostAndPort> l;
@@ -62,40 +95,58 @@ namespace mongo {
l.push_back( _servers[i] );
return new SyncClusterConnection( l );
}
-
+
case INVALID:
throw UserException( 13421 , "trying to connect to invalid ConnectionString" );
break;
}
-
+
assert( 0 );
return 0;
}
- ConnectionString ConnectionString::parse( const string& host , string& errmsg ){
-
+ ConnectionString ConnectionString::parse( const string& host , string& errmsg ) {
+
string::size_type i = host.find( '/' );
- if ( i != string::npos ){
+ if ( i != string::npos && i != 0) {
// replica set
return ConnectionString( SET , host.substr( i + 1 ) , host.substr( 0 , i ) );
}
- int numCommas = DBClientBase::countCommas( host );
-
- if( numCommas == 0 )
+ int numCommas = str::count( host , ',' );
+
+ if( numCommas == 0 )
return ConnectionString( HostAndPort( host ) );
-
- if ( numCommas == 1 )
+
+ if ( numCommas == 1 )
return ConnectionString( PAIR , host );
if ( numCommas == 2 )
return ConnectionString( SYNC , host );
-
+
errmsg = (string)"invalid hostname [" + host + "]";
return ConnectionString(); // INVALID
}
- Query& Query::where(const string &jscode, BSONObj scope) {
+ string ConnectionString::typeToString( ConnectionType type ) {
+ switch ( type ) {
+ case INVALID:
+ return "invalid";
+ case MASTER:
+ return "master";
+ case PAIR:
+ return "pair";
+ case SET:
+ return "set";
+ case SYNC:
+ return "sync";
+ }
+ assert(0);
+ return "";
+ }
+
+
+ Query& Query::where(const string &jscode, BSONObj scope) {
/* use where() before sort() and hint() and explain(), else this will assert. */
assert( ! isComplex() );
BSONObjBuilder b;
@@ -113,44 +164,44 @@ namespace mongo {
obj = b.obj();
}
- Query& Query::sort(const BSONObj& s) {
+ Query& Query::sort(const BSONObj& s) {
appendComplex( "orderby", s );
- return *this;
+ return *this;
}
Query& Query::hint(BSONObj keyPattern) {
appendComplex( "$hint", keyPattern );
- return *this;
+ return *this;
}
Query& Query::explain() {
appendComplex( "$explain", true );
- return *this;
+ return *this;
}
-
+
Query& Query::snapshot() {
appendComplex( "$snapshot", true );
- return *this;
+ return *this;
}
-
+
Query& Query::minKey( const BSONObj &val ) {
appendComplex( "$min", val );
- return *this;
+ return *this;
}
Query& Query::maxKey( const BSONObj &val ) {
appendComplex( "$max", val );
- return *this;
+ return *this;
}
- bool Query::isComplex( bool * hasDollar ) const{
- if ( obj.hasElement( "query" ) ){
+ bool Query::isComplex( bool * hasDollar ) const {
+ if ( obj.hasElement( "query" ) ) {
if ( hasDollar )
hasDollar[0] = false;
return true;
}
- if ( obj.hasElement( "$query" ) ){
+ if ( obj.hasElement( "$query" ) ) {
if ( hasDollar )
hasDollar[0] = true;
return true;
@@ -158,12 +209,12 @@ namespace mongo {
return false;
}
-
+
BSONObj Query::getFilter() const {
bool hasDollar;
if ( ! isComplex( &hasDollar ) )
return obj;
-
+
return obj.getObjectField( hasDollar ? "$query" : "query" );
}
BSONObj Query::getSort() const {
@@ -182,8 +233,8 @@ namespace mongo {
bool Query::isExplain() const {
return isComplex() && obj.getBoolField( "$explain" );
}
-
- string Query::toString() const{
+
+ string Query::toString() const {
return obj.toString();
}
@@ -203,7 +254,7 @@ namespace mongo {
}
return _cachedAvailableOptions;
}
-
+
inline bool DBClientWithCommands::runCommand(const string &dbname, const BSONObj& cmd, BSONObj &info, int options) {
string ns = dbname + ".$cmd";
info = findOne(ns, cmd, 0 , options);
@@ -222,38 +273,50 @@ namespace mongo {
return runCommand(dbname, b.done(), *info);
}
- unsigned long long DBClientWithCommands::count(const string &_ns, const BSONObj& query, int options) {
- NamespaceString ns(_ns);
- BSONObj cmd = BSON( "count" << ns.coll << "query" << query );
+ unsigned long long DBClientWithCommands::count(const string &myns, const BSONObj& query, int options, int limit, int skip ) {
+ NamespaceString ns(myns);
+ BSONObj cmd = _countCmd( myns , query , options , limit , skip );
BSONObj res;
if( !runCommand(ns.db.c_str(), cmd, res, options) )
uasserted(11010,string("count fails:") + res.toString());
return res["n"].numberLong();
}
+ BSONObj DBClientWithCommands::_countCmd(const string &myns, const BSONObj& query, int options, int limit, int skip ) {
+ NamespaceString ns(myns);
+ BSONObjBuilder b;
+ b.append( "count" , ns.coll );
+ b.append( "query" , query );
+ if ( limit )
+ b.append( "limit" , limit );
+ if ( skip )
+ b.append( "skip" , skip );
+ return b.obj();
+ }
+
BSONObj getlasterrorcmdobj = fromjson("{getlasterror:1}");
- BSONObj DBClientWithCommands::getLastErrorDetailed() {
+ BSONObj DBClientWithCommands::getLastErrorDetailed() {
BSONObj info;
runCommand("admin", getlasterrorcmdobj, info);
- return info;
+ return info;
}
- string DBClientWithCommands::getLastError() {
+ string DBClientWithCommands::getLastError() {
BSONObj info = getLastErrorDetailed();
return getLastErrorString( info );
}
-
- string DBClientWithCommands::getLastErrorString( const BSONObj& info ){
+
+ string DBClientWithCommands::getLastErrorString( const BSONObj& info ) {
BSONElement e = info["err"];
if( e.eoo() ) return "";
if( e.type() == Object ) return e.toString();
- return e.str();
+ return e.str();
}
BSONObj getpreverrorcmdobj = fromjson("{getpreverror:1}");
- BSONObj DBClientWithCommands::getPrevError() {
+ BSONObj DBClientWithCommands::getPrevError() {
BSONObj info;
runCommand("admin", getpreverrorcmdobj, info);
return info;
@@ -261,7 +324,7 @@ namespace mongo {
BSONObj getnoncecmdobj = fromjson("{getnonce:1}");
- string DBClientWithCommands::createPasswordDigest( const string & username , const string & clearTextPassword ){
+ string DBClientWithCommands::createPasswordDigest( const string & username , const string & clearTextPassword ) {
md5digest d;
{
md5_state_t st;
@@ -275,11 +338,9 @@ namespace mongo {
}
bool DBClientWithCommands::auth(const string &dbname, const string &username, const string &password_text, string& errmsg, bool digestPassword) {
- //cout << "TEMP AUTH " << toString() << dbname << ' ' << username << ' ' << password_text << ' ' << digestPassword << endl;
-
- string password = password_text;
- if( digestPassword )
- password = createPasswordDigest( username , password_text );
+ string password = password_text;
+ if( digestPassword )
+ password = createPasswordDigest( username , password_text );
BSONObj info;
string nonce;
@@ -310,8 +371,8 @@ namespace mongo {
b << "key" << digestToString( d );
authCmd = b.done();
}
-
- if( runCommand(dbname, authCmd, info) )
+
+ if( runCommand(dbname, authCmd, info) )
return true;
errmsg = info.toString();
@@ -322,7 +383,7 @@ namespace mongo {
bool DBClientWithCommands::isMaster(bool& isMaster, BSONObj *info) {
BSONObj o;
- if ( info == 0 )
+ if ( info == 0 )
info = &o;
bool ok = runCommand("admin", ismastercmdobj, *info);
isMaster = info->getField("ismaster").trueValue();
@@ -331,7 +392,7 @@ namespace mongo {
bool DBClientWithCommands::createCollection(const string &ns, long long size, bool capped, int max, BSONObj *info) {
BSONObj o;
- if ( info == 0 ) info = &o;
+ if ( info == 0 ) info = &o;
BSONObjBuilder b;
string db = nsToDatabase(ns.c_str());
b.append("create", ns.c_str() + db.length() + 1);
@@ -381,11 +442,11 @@ namespace mongo {
return false;
}
- BSONObj DBClientWithCommands::mapreduce(const string &ns, const string &jsmapf, const string &jsreducef, BSONObj query, const string& outputcolname) {
+ BSONObj DBClientWithCommands::mapreduce(const string &ns, const string &jsmapf, const string &jsreducef, BSONObj query, const string& outputcolname) {
BSONObjBuilder b;
b.append("mapreduce", nsGetCollection(ns));
- b.appendCode("map", jsmapf.c_str());
- b.appendCode("reduce", jsreducef.c_str());
+ b.appendCode("map", jsmapf);
+ b.appendCode("reduce", jsreducef);
if( !query.isEmpty() )
b.append("query", query);
if( !outputcolname.empty() )
@@ -397,7 +458,7 @@ namespace mongo {
bool DBClientWithCommands::eval(const string &dbname, const string &jscode, BSONObj& info, BSONElement& retValue, BSONObj *args) {
BSONObjBuilder b;
- b.appendCode("$eval", jscode.c_str());
+ b.appendCode("$eval", jscode);
if ( args )
b.appendArray("args", *args);
bool ok = runCommand(dbname, b.done(), info);
@@ -412,27 +473,27 @@ namespace mongo {
return eval(dbname, jscode, info, retValue);
}
- list<string> DBClientWithCommands::getDatabaseNames(){
+ list<string> DBClientWithCommands::getDatabaseNames() {
BSONObj info;
uassert( 10005 , "listdatabases failed" , runCommand( "admin" , BSON( "listDatabases" << 1 ) , info ) );
uassert( 10006 , "listDatabases.databases not array" , info["databases"].type() == Array );
-
+
list<string> names;
-
+
BSONObjIterator i( info["databases"].embeddedObjectUserCheck() );
- while ( i.more() ){
+ while ( i.more() ) {
names.push_back( i.next().embeddedObjectUserCheck()["name"].valuestr() );
}
return names;
}
- list<string> DBClientWithCommands::getCollectionNames( const string& db ){
+ list<string> DBClientWithCommands::getCollectionNames( const string& db ) {
list<string> names;
-
+
string ns = db + ".system.namespaces";
auto_ptr<DBClientCursor> c = query( ns.c_str() , BSONObj() );
- while ( c->more() ){
+ while ( c->more() ) {
string name = c->next()["name"].valuestr();
if ( name.find( "$" ) != string::npos )
continue;
@@ -441,37 +502,37 @@ namespace mongo {
return names;
}
- bool DBClientWithCommands::exists( const string& ns ){
+ bool DBClientWithCommands::exists( const string& ns ) {
list<string> names;
-
+
string db = nsGetDB( ns ) + ".system.namespaces";
BSONObj q = BSON( "name" << ns );
- return count( db.c_str() , q ) != 0;
+ return count( db.c_str() , q, QueryOption_SlaveOk ) != 0;
}
/* --- dbclientconnection --- */
- bool DBClientConnection::auth(const string &dbname, const string &username, const string &password_text, string& errmsg, bool digestPassword) {
- string password = password_text;
- if( digestPassword )
- password = createPasswordDigest( username , password_text );
+ bool DBClientConnection::auth(const string &dbname, const string &username, const string &password_text, string& errmsg, bool digestPassword) {
+ string password = password_text;
+ if( digestPassword )
+ password = createPasswordDigest( username , password_text );
- if( autoReconnect ) {
- /* note we remember the auth info before we attempt to auth -- if the connection is broken, we will
- then have it for the next autoreconnect attempt.
- */
- pair<string,string> p = pair<string,string>(username, password);
- authCache[dbname] = p;
- }
+ if( autoReconnect ) {
+ /* note we remember the auth info before we attempt to auth -- if the connection is broken, we will
+ then have it for the next autoreconnect attempt.
+ */
+ pair<string,string> p = pair<string,string>(username, password);
+ authCache[dbname] = p;
+ }
- return DBClientBase::auth(dbname, username, password.c_str(), errmsg, false);
- }
+ return DBClientBase::auth(dbname, username, password.c_str(), errmsg, false);
+ }
BSONObj DBClientInterface::findOne(const string &ns, const Query& query, const BSONObj *fieldsToReturn, int queryOptions) {
auto_ptr<DBClientCursor> c =
this->query(ns, query, 1, 0, fieldsToReturn, queryOptions);
- uassert( 10276 , "DBClientBase::findOne: transport error", c.get() );
+ uassert( 10276 , str::stream() << "DBClientBase::findOne: transport error: " << getServerAddress() << " query: " << query.toString(), c.get() );
if ( c->hasResultFlag( ResultFlag_ShardConfigStale ) )
throw StaleConfigException( ns , "findOne has stale config" );
@@ -482,20 +543,20 @@ namespace mongo {
return c->nextSafe().copy();
}
- bool DBClientConnection::connect(const HostAndPort& server, string& errmsg){
+ bool DBClientConnection::connect(const HostAndPort& server, string& errmsg) {
_server = server;
_serverString = _server.toString();
return _connect( errmsg );
}
- bool DBClientConnection::_connect( string& errmsg ){
+ bool DBClientConnection::_connect( string& errmsg ) {
_serverString = _server.toString();
// we keep around SockAddr for connection life -- maybe MessagingPort
// requires that?
server.reset(new SockAddr(_server.host().c_str(), _server.port()));
- p.reset(new MessagingPort( _timeout, _logLevel ));
+ p.reset(new MessagingPort( _so_timeout, _logLevel ));
- if (server->getAddr() == "0.0.0.0"){
+ if (server->getAddr() == "0.0.0.0") {
failed = true;
return false;
}
@@ -513,35 +574,39 @@ namespace mongo {
void DBClientConnection::_checkConnection() {
if ( !failed )
return;
- if ( lastReconnectTry && time(0)-lastReconnectTry < 2 )
- return;
+ if ( lastReconnectTry && time(0)-lastReconnectTry < 2 ) {
+ // we wait a little before reconnect attempt to avoid constant hammering.
+ // but we throw we don't want to try to use a connection in a bad state
+ throw SocketException(SocketException::FAILED_STATE);
+ }
if ( !autoReconnect )
- return;
+ throw SocketException(SocketException::FAILED_STATE);
lastReconnectTry = time(0);
log(_logLevel) << "trying reconnect to " << _serverString << endl;
string errmsg;
failed = false;
- if ( ! _connect(errmsg) ) {
+ if ( ! _connect(errmsg) ) {
+ failed = true;
log(_logLevel) << "reconnect " << _serverString << " failed " << errmsg << endl;
- return;
- }
+ throw SocketException(SocketException::CONNECT_ERROR);
+ }
- log(_logLevel) << "reconnect " << _serverString << " ok" << endl;
- for( map< string, pair<string,string> >::iterator i = authCache.begin(); i != authCache.end(); i++ ) {
- const char *dbname = i->first.c_str();
- const char *username = i->second.first.c_str();
- const char *password = i->second.second.c_str();
- if( !DBClientBase::auth(dbname, username, password, errmsg, false) )
- log(_logLevel) << "reconnect: auth failed db:" << dbname << " user:" << username << ' ' << errmsg << '\n';
- }
+ log(_logLevel) << "reconnect " << _serverString << " ok" << endl;
+ for( map< string, pair<string,string> >::iterator i = authCache.begin(); i != authCache.end(); i++ ) {
+ const char *dbname = i->first.c_str();
+ const char *username = i->second.first.c_str();
+ const char *password = i->second.second.c_str();
+ if( !DBClientBase::auth(dbname, username, password, errmsg, false) )
+ log(_logLevel) << "reconnect: auth failed db:" << dbname << " user:" << username << ' ' << errmsg << '\n';
+ }
}
auto_ptr<DBClientCursor> DBClientBase::query(const string &ns, Query query, int nToReturn,
- int nToSkip, const BSONObj *fieldsToReturn, int queryOptions , int batchSize ) {
+ int nToSkip, const BSONObj *fieldsToReturn, int queryOptions , int batchSize ) {
auto_ptr<DBClientCursor> c( new DBClientCursor( this,
- ns, query.obj, nToReturn, nToSkip,
- fieldsToReturn, queryOptions , batchSize ) );
+ ns, query.obj, nToReturn, nToSkip,
+ fieldsToReturn, queryOptions , batchSize ) );
if ( c->init() )
return c;
return auto_ptr< DBClientCursor >( 0 );
@@ -562,14 +627,14 @@ namespace mongo {
}
boost::function<void(const BSONObj &)> _f;
};
-
+
unsigned long long DBClientConnection::query( boost::function<void(const BSONObj&)> f, const string& ns, Query query, const BSONObj *fieldsToReturn, int queryOptions ) {
DBClientFunConvertor fun;
fun._f = f;
boost::function<void(DBClientCursorBatchIterator &)> ptr( fun );
return DBClientConnection::query( ptr, ns, query, fieldsToReturn, queryOptions );
}
-
+
unsigned long long DBClientConnection::query( boost::function<void(DBClientCursorBatchIterator &)> f, const string& ns, Query query, const BSONObj *fieldsToReturn, int queryOptions ) {
// mask options
queryOptions &= (int)( QueryOption_NoCursorTimeout | QueryOption_SlaveOk );
@@ -577,11 +642,11 @@ namespace mongo {
bool doExhaust = ( availableOptions() & QueryOption_Exhaust );
if ( doExhaust ) {
- queryOptions |= (int)QueryOption_Exhaust;
+ queryOptions |= (int)QueryOption_Exhaust;
}
auto_ptr<DBClientCursor> c( this->query(ns, query, 0, 0, fieldsToReturn, queryOptions) );
- massert( 13386, "socket error for mapping query", c.get() );
-
+ uassert( 13386, "socket error for mapping query", c.get() );
+
if ( !doExhaust ) {
while( c->more() ) {
DBClientCursorBatchIterator i( *c );
@@ -591,21 +656,21 @@ namespace mongo {
return n;
}
- try {
- while( 1 ) {
- while( c->moreInCurrentBatch() ) {
+ try {
+ while( 1 ) {
+ while( c->moreInCurrentBatch() ) {
DBClientCursorBatchIterator i( *c );
f( i );
n += i.n();
}
- if( c->getCursorId() == 0 )
+ if( c->getCursorId() == 0 )
break;
c->exhaustReceiveMore();
}
}
- catch(std::exception&) {
+ catch(std::exception&) {
/* connection CANNOT be used anymore as more data may be on the way from the server.
we have to reconnect.
*/
@@ -633,16 +698,16 @@ namespace mongo {
void DBClientBase::insert( const string & ns , const vector< BSONObj > &v ) {
Message toSend;
-
+
BufBuilder b;
int opts = 0;
b.appendNum( opts );
b.appendStr( ns );
for( vector< BSONObj >::const_iterator i = v.begin(); i != v.end(); ++i )
i->appendSelfToBufBuilder( b );
-
+
toSend.setData( dbInsert, b.buf(), b.len() );
-
+
say( toSend );
}
@@ -686,63 +751,63 @@ namespace mongo {
say( toSend );
}
- auto_ptr<DBClientCursor> DBClientWithCommands::getIndexes( const string &ns ){
+ auto_ptr<DBClientCursor> DBClientWithCommands::getIndexes( const string &ns ) {
return query( Namespace( ns.c_str() ).getSisterNS( "system.indexes" ).c_str() , BSON( "ns" << ns ) );
}
-
- void DBClientWithCommands::dropIndex( const string& ns , BSONObj keys ){
+
+ void DBClientWithCommands::dropIndex( const string& ns , BSONObj keys ) {
dropIndex( ns , genIndexName( keys ) );
}
- void DBClientWithCommands::dropIndex( const string& ns , const string& indexName ){
+ void DBClientWithCommands::dropIndex( const string& ns , const string& indexName ) {
BSONObj info;
- if ( ! runCommand( nsToDatabase( ns.c_str() ) ,
- BSON( "deleteIndexes" << NamespaceString( ns ).coll << "index" << indexName ) ,
- info ) ){
+ if ( ! runCommand( nsToDatabase( ns.c_str() ) ,
+ BSON( "deleteIndexes" << NamespaceString( ns ).coll << "index" << indexName ) ,
+ info ) ) {
log(_logLevel) << "dropIndex failed: " << info << endl;
uassert( 10007 , "dropIndex failed" , 0 );
}
resetIndexCache();
}
-
- void DBClientWithCommands::dropIndexes( const string& ns ){
+
+ void DBClientWithCommands::dropIndexes( const string& ns ) {
BSONObj info;
- uassert( 10008 , "dropIndexes failed" , runCommand( nsToDatabase( ns.c_str() ) ,
- BSON( "deleteIndexes" << NamespaceString( ns ).coll << "index" << "*") ,
- info ) );
+ uassert( 10008 , "dropIndexes failed" , runCommand( nsToDatabase( ns.c_str() ) ,
+ BSON( "deleteIndexes" << NamespaceString( ns ).coll << "index" << "*") ,
+ info ) );
resetIndexCache();
}
- void DBClientWithCommands::reIndex( const string& ns ){
+ void DBClientWithCommands::reIndex( const string& ns ) {
list<BSONObj> all;
auto_ptr<DBClientCursor> i = getIndexes( ns );
- while ( i->more() ){
+ while ( i->more() ) {
all.push_back( i->next().getOwned() );
}
-
+
dropIndexes( ns );
-
- for ( list<BSONObj>::iterator i=all.begin(); i!=all.end(); i++ ){
+
+ for ( list<BSONObj>::iterator i=all.begin(); i!=all.end(); i++ ) {
BSONObj o = *i;
insert( Namespace( ns.c_str() ).getSisterNS( "system.indexes" ).c_str() , o );
}
-
+
}
-
- string DBClientWithCommands::genIndexName( const BSONObj& keys ){
+
+ string DBClientWithCommands::genIndexName( const BSONObj& keys ) {
stringstream ss;
-
+
bool first = 1;
for ( BSONObjIterator i(keys); i.more(); ) {
BSONElement f = i.next();
-
+
if ( first )
first = 0;
else
ss << "_";
-
+
ss << f.fieldName() << "_";
if( f.isNumber() )
ss << f.numberInt();
@@ -750,7 +815,7 @@ namespace mongo {
return ss.str();
}
- bool DBClientWithCommands::ensureIndex( const string &ns , BSONObj keys , bool unique, const string & name ) {
+ bool DBClientWithCommands::ensureIndex( const string &ns , BSONObj keys , bool unique, const string & name , bool cache ) {
BSONObjBuilder toSave;
toSave.append( "ns" , ns );
toSave.append( "key" , keys );
@@ -767,13 +832,15 @@ namespace mongo {
toSave.append( "name" , nn );
cacheKey += nn;
}
-
+
if ( unique )
toSave.appendBool( "unique", unique );
if ( _seenIndexes.count( cacheKey ) )
return 0;
- _seenIndexes.insert( cacheKey );
+
+ if ( cache )
+ _seenIndexes.insert( cacheKey );
insert( Namespace( ns.c_str() ).getSisterNS( "system.indexes" ).c_str() , toSave.obj() );
return 1;
@@ -808,9 +875,10 @@ namespace mongo {
void DBClientConnection::say( Message &toSend ) {
checkConnection();
- try {
+ try {
port().say( toSend );
- } catch( SocketException & ) {
+ }
+ catch( SocketException & ) {
failed = true;
throw;
}
@@ -820,24 +888,25 @@ namespace mongo {
port().piggyBack( toSend );
}
- void DBClientConnection::recv( Message &m ) {
+ void DBClientConnection::recv( Message &m ) {
port().recv(m);
}
- bool DBClientConnection::call( Message &toSend, Message &response, bool assertOk ) {
- /* todo: this is very ugly messagingport::call returns an error code AND can throw
- an exception. we should make it return void and just throw an exception anytime
+ bool DBClientConnection::call( Message &toSend, Message &response, bool assertOk , string * actualServer ) {
+ /* todo: this is very ugly messagingport::call returns an error code AND can throw
+ an exception. we should make it return void and just throw an exception anytime
it fails
*/
- try {
+ try {
if ( !port().call(toSend, response) ) {
failed = true;
if ( assertOk )
- uassert( 10278 , "dbclient error communicating with server", false);
+ uasserted( 10278 , str::stream() << "dbclient error communicating with server: " << getServerAddress() );
+
return false;
}
}
- catch( SocketException & ) {
+ catch( SocketException & ) {
failed = true;
throw;
}
@@ -858,222 +927,24 @@ namespace mongo {
}
}
- void DBClientConnection::killCursor( long long cursorId ){
+ void DBClientConnection::killCursor( long long cursorId ) {
BufBuilder b;
b.appendNum( (int)0 ); // reserved
b.appendNum( (int)1 ); // number
b.appendNum( cursorId );
-
+
Message m;
m.setData( dbKillCursors , b.buf() , b.len() );
- sayPiggyBack( m );
+ if ( _lazyKillCursor )
+ sayPiggyBack( m );
+ else
+ say(m);
}
- /* --- class dbclientpaired --- */
+ AtomicUInt DBClientConnection::_numConnections;
+ bool DBClientConnection::_lazyKillCursor = true;
- string DBClientReplicaSet::toString() {
- return getServerAddress();
- }
-
- DBClientReplicaSet::DBClientReplicaSet( const string& name , const vector<HostAndPort>& servers )
- : _name( name ) , _currentMaster( 0 ), _servers( servers ){
-
- for ( unsigned i=0; i<_servers.size(); i++ )
- _conns.push_back( new DBClientConnection( true , this ) );
- }
-
- DBClientReplicaSet::~DBClientReplicaSet(){
- for ( unsigned i=0; i<_conns.size(); i++ )
- delete _conns[i];
- _conns.clear();
- }
-
- string DBClientReplicaSet::getServerAddress() const {
- StringBuilder ss;
- if ( _name.size() )
- ss << _name << "/";
-
- for ( unsigned i=0; i<_servers.size(); i++ ){
- if ( i > 0 )
- ss << ",";
- ss << _servers[i].toString();
- }
- return ss.str();
- }
-
- /* find which server, the left or right, is currently master mode */
- void DBClientReplicaSet::_checkMaster() {
-
- bool triedQuickCheck = false;
-
- log( _logLevel + 1) << "_checkMaster on: " << toString() << endl;
- for ( int retry = 0; retry < 2; retry++ ) {
- for ( unsigned i=0; i<_conns.size(); i++ ){
- DBClientConnection * c = _conns[i];
- try {
- bool im;
- BSONObj o;
- c->isMaster(im, &o);
-
- if ( retry )
- log(_logLevel) << "checkmaster: " << c->toString() << ' ' << o << '\n';
-
- string maybePrimary;
- if ( o["hosts"].type() == Array ){
- if ( o["primary"].type() == String )
- maybePrimary = o["primary"].String();
-
- BSONObjIterator hi(o["hosts"].Obj());
- while ( hi.more() ){
- string toCheck = hi.next().String();
- int found = -1;
- for ( unsigned x=0; x<_servers.size(); x++ ){
- if ( toCheck == _servers[x].toString() ){
- found = x;
- break;
- }
- }
-
- if ( found == -1 ){
- HostAndPort h( toCheck );
- _servers.push_back( h );
- _conns.push_back( new DBClientConnection( true, this ) );
- string temp;
- _conns[ _conns.size() - 1 ]->connect( h , temp );
- log( _logLevel ) << "updated set to: " << toString() << endl;
- }
-
- }
- }
-
- if ( im ) {
- _currentMaster = c;
- return;
- }
-
- if ( maybePrimary.size() && ! triedQuickCheck ){
- for ( unsigned x=0; x<_servers.size(); x++ ){
- if ( _servers[i].toString() != maybePrimary )
- continue;
- triedQuickCheck = true;
- _conns[x]->isMaster( im , &o );
- if ( im ){
- _currentMaster = _conns[x];
- return;
- }
- }
- }
- }
- catch ( std::exception& e ) {
- if ( retry )
- log(_logLevel) << "checkmaster: caught exception " << c->toString() << ' ' << e.what() << endl;
- }
- }
- sleepsecs(1);
- }
-
- uassert( 10009 , "checkmaster: no master found", false);
- }
-
- DBClientConnection * DBClientReplicaSet::checkMaster() {
- if ( _currentMaster ){
- // a master is selected. let's just make sure connection didn't die
- if ( ! _currentMaster->isFailed() )
- return _currentMaster;
- _currentMaster = 0;
- }
-
- _checkMaster();
- assert( _currentMaster );
- return _currentMaster;
- }
-
- DBClientConnection& DBClientReplicaSet::masterConn(){
- return *checkMaster();
- }
-
- DBClientConnection& DBClientReplicaSet::slaveConn(){
- DBClientConnection * m = checkMaster();
- assert( ! m->isFailed() );
-
- DBClientConnection * failedSlave = 0;
-
- for ( unsigned i=0; i<_conns.size(); i++ ){
- if ( m == _conns[i] )
- continue;
- failedSlave = _conns[i];
- if ( _conns[i]->isFailed() )
- continue;
- return *_conns[i];
- }
-
- assert(failedSlave);
- return *failedSlave;
- }
-
- bool DBClientReplicaSet::connect(){
- string errmsg;
-
- bool anyGood = false;
- for ( unsigned i=0; i<_conns.size(); i++ ){
- if ( _conns[i]->connect( _servers[i] , errmsg ) )
- anyGood = true;
- }
-
- if ( ! anyGood )
- return false;
-
- try {
- checkMaster();
- }
- catch (AssertionException&) {
- return false;
- }
- return true;
- }
-
- bool DBClientReplicaSet::auth(const string &dbname, const string &username, const string &pwd, string& errmsg, bool digestPassword ) {
- DBClientConnection * m = checkMaster();
- if( !m->auth(dbname, username, pwd, errmsg, digestPassword ) )
- return false;
-
- /* we try to authentiate with the other half of the pair -- even if down, that way the authInfo is cached. */
- for ( unsigned i=0; i<_conns.size(); i++ ){
- if ( _conns[i] == m )
- continue;
- try {
- string e;
- _conns[i]->auth( dbname , username , pwd , e , digestPassword );
- }
- catch ( AssertionException& ){
- }
- }
-
- return true;
- }
-
- auto_ptr<DBClientCursor> DBClientReplicaSet::query(const string &a, Query b, int c, int d,
- const BSONObj *e, int f, int g){
- // TODO: if slave ok is set go to a slave
- return checkMaster()->query(a,b,c,d,e,f,g);
- }
-
- BSONObj DBClientReplicaSet::findOne(const string &a, const Query& b, const BSONObj *c, int d) {
- return checkMaster()->findOne(a,b,c,d);
- }
-
- bool DBClientReplicaSet::isMember( const DBConnector * conn ) const {
- if ( conn == this )
- return true;
-
- for ( unsigned i=0; i<_conns.size(); i++ )
- if ( _conns[i]->isMember( conn ) )
- return true;
-
- return false;
- }
-
bool serverAlive( const string &uri ) {
DBClientConnection c( false, 0, 20 ); // potentially the connection to server could fail while we're checking if it's alive - so use timeouts
@@ -1084,5 +955,5 @@ namespace mongo {
return false;
return true;
}
-
+
} // namespace mongo