summaryrefslogtreecommitdiff
path: root/client/dbclient.cpp
diff options
context:
space:
mode:
authorAntonin Kral <a.kral@bobek.cz>2010-08-11 12:38:57 +0200
committerAntonin Kral <a.kral@bobek.cz>2010-08-11 12:38:57 +0200
commit7645618fd3914cb8a20561625913c20d49504a49 (patch)
tree8370f846f58f6d71165b7a0e2eda04648584ec76 /client/dbclient.cpp
parent68c73c3c7608b4c87f07440dc3232801720b1168 (diff)
downloadmongodb-7645618fd3914cb8a20561625913c20d49504a49.tar.gz
Imported Upstream version 1.6.0
Diffstat (limited to 'client/dbclient.cpp')
-rw-r--r--client/dbclient.cpp722
1 files changed, 392 insertions, 330 deletions
diff --git a/client/dbclient.cpp b/client/dbclient.cpp
index f617f7c..04b6147 100644
--- a/client/dbclient.cpp
+++ b/client/dbclient.cpp
@@ -15,22 +15,89 @@
* limitations under the License.
*/
-#include "stdafx.h"
+#include "pch.h"
#include "../db/pdfile.h"
#include "dbclient.h"
-#include "../util/builder.h"
+#include "../bson/util/builder.h"
#include "../db/jsobj.h"
#include "../db/json.h"
#include "../db/instance.h"
#include "../util/md5.hpp"
#include "../db/dbmessage.h"
#include "../db/cmdline.h"
+#include "connpool.h"
+#include "../s/util.h"
+#include "syncclusterconnection.h"
namespace mongo {
+ DBClientBase* ConnectionString::connect( string& errmsg ) const {
+ switch ( _type ){
+ case MASTER: {
+ DBClientConnection * c = new DBClientConnection(true);
+ log(1) << "creating new connection to:" << _servers[0] << endl;
+ if ( ! c->connect( _servers[0] , errmsg ) ) {
+ delete c;
+ return 0;
+ }
+ return c;
+ }
+
+ case PAIR:
+ case SET: {
+ DBClientReplicaSet * set = new DBClientReplicaSet( _setName , _servers );
+ if( ! set->connect() ){
+ delete set;
+ errmsg = "connect failed to set ";
+ errmsg += toString();
+ return 0;
+ }
+ return set;
+ }
+
+ case SYNC: {
+ // TODO , don't copy
+ list<HostAndPort> l;
+ for ( unsigned i=0; i<_servers.size(); i++ )
+ l.push_back( _servers[i] );
+ return new SyncClusterConnection( l );
+ }
+
+ case INVALID:
+ throw UserException( 13421 , "trying to connect to invalid ConnectionString" );
+ break;
+ }
+
+ assert( 0 );
+ return 0;
+ }
+
+ ConnectionString ConnectionString::parse( const string& host , string& errmsg ){
+
+ string::size_type i = host.find( '/' );
+ if ( i != string::npos ){
+ // replica set
+ return ConnectionString( SET , host.substr( i + 1 ) , host.substr( 0 , i ) );
+ }
+
+ int numCommas = DBClientBase::countCommas( host );
+
+ if( numCommas == 0 )
+ return ConnectionString( HostAndPort( host ) );
+
+ if ( numCommas == 1 )
+ return ConnectionString( PAIR , host );
+
+ if ( numCommas == 2 )
+ return ConnectionString( SYNC , host );
+
+ errmsg = (string)"invalid hostname [" + host + "]";
+ return ConnectionString(); // INVALID
+ }
+
Query& Query::where(const string &jscode, BSONObj scope) {
/* use where() before sort() and hint() and explain(), else this will assert. */
- assert( !obj.hasField("query") );
+ assert( ! isComplex() );
BSONObjBuilder b;
b.appendElements(obj);
b.appendWhere(jscode, scope);
@@ -39,7 +106,7 @@ namespace mongo {
}
void Query::makeComplex() {
- if ( obj.hasElement( "query" ) )
+ if ( isComplex() )
return;
BSONObjBuilder b;
b.append( "query", obj );
@@ -76,19 +143,36 @@ namespace mongo {
return *this;
}
- bool Query::isComplex() const{
- return obj.hasElement( "query" );
+ bool Query::isComplex( bool * hasDollar ) const{
+ if ( obj.hasElement( "query" ) ){
+ if ( hasDollar )
+ hasDollar[0] = false;
+ return true;
+ }
+
+ if ( obj.hasElement( "$query" ) ){
+ if ( hasDollar )
+ hasDollar[0] = true;
+ return true;
+ }
+
+ return false;
}
BSONObj Query::getFilter() const {
- if ( ! isComplex() )
+ bool hasDollar;
+ if ( ! isComplex( &hasDollar ) )
return obj;
- return obj.getObjectField( "query" );
+
+ return obj.getObjectField( hasDollar ? "$query" : "query" );
}
BSONObj Query::getSort() const {
if ( ! isComplex() )
return BSONObj();
- return obj.getObjectField( "orderby" );
+ BSONObj ret = obj.getObjectField( "orderby" );
+ if (ret.isEmpty())
+ ret = obj.getObjectField( "$orderby" );
+ return ret;
}
BSONObj Query::getHint() const {
if ( ! isComplex() )
@@ -109,6 +193,17 @@ namespace mongo {
return o["ok"].trueValue();
}
+ enum QueryOptions DBClientWithCommands::availableOptions() {
+ if ( !_haveCachedAvailableOptions ) {
+ BSONObj ret;
+ if ( runCommand( "admin", BSON( "availablequeryoptions" << 1 ), ret ) ) {
+ _cachedAvailableOptions = ( enum QueryOptions )( ret.getIntField( "options" ) );
+ }
+ _haveCachedAvailableOptions = true;
+ }
+ return _cachedAvailableOptions;
+ }
+
inline bool DBClientWithCommands::runCommand(const string &dbname, const BSONObj& cmd, BSONObj &info, int options) {
string ns = dbname + ".$cmd";
info = findOne(ns, cmd, 0 , options);
@@ -133,7 +228,7 @@ namespace mongo {
BSONObj res;
if( !runCommand(ns.db.c_str(), cmd, res, options) )
uasserted(11010,string("count fails:") + res.toString());
- return res.getIntField("n");
+ return res["n"].numberLong();
}
BSONObj getlasterrorcmdobj = fromjson("{getlasterror:1}");
@@ -146,10 +241,14 @@ namespace mongo {
string DBClientWithCommands::getLastError() {
BSONObj info = getLastErrorDetailed();
+ return getLastErrorString( info );
+ }
+
+ string DBClientWithCommands::getLastErrorString( const BSONObj& info ){
BSONElement e = info["err"];
if( e.eoo() ) return "";
if( e.type() == Object ) return e.toString();
- return e.str();
+ return e.str();
}
BSONObj getpreverrorcmdobj = fromjson("{getpreverror:1}");
@@ -223,13 +322,14 @@ namespace mongo {
bool DBClientWithCommands::isMaster(bool& isMaster, BSONObj *info) {
BSONObj o;
- if ( info == 0 ) info = &o;
+ if ( info == 0 )
+ info = &o;
bool ok = runCommand("admin", ismastercmdobj, *info);
- isMaster = (info->getIntField("ismaster") == 1);
+ isMaster = info->getField("ismaster").trueValue();
return ok;
}
- bool DBClientWithCommands::createCollection(const string &ns, unsigned size, bool capped, int max, BSONObj *info) {
+ bool DBClientWithCommands::createCollection(const string &ns, long long size, bool capped, int max, BSONObj *info) {
BSONObj o;
if ( info == 0 ) info = &o;
BSONObjBuilder b;
@@ -346,64 +446,9 @@ namespace mongo {
string db = nsGetDB( ns ) + ".system.namespaces";
BSONObj q = BSON( "name" << ns );
- return count( db.c_str() , q );
- }
-
-
- void testSort() {
- DBClientConnection c;
- string err;
- if ( !c.connect("localhost", err) ) {
- out() << "can't connect to server " << err << endl;
- return;
- }
-
- cout << "findOne returns:" << endl;
- cout << c.findOne("test.foo", QUERY( "x" << 3 ) ).toString() << endl;
- cout << c.findOne("test.foo", QUERY( "x" << 3 ).sort("name") ).toString() << endl;
-
- }
-
- /* TODO: unit tests should run this? */
- void testDbEval() {
- DBClientConnection c;
- string err;
- if ( !c.connect("localhost", err) ) {
- out() << "can't connect to server " << err << endl;
- return;
- }
-
- if( !c.auth("dwight", "u", "p", err) ) {
- out() << "can't authenticate " << err << endl;
- return;
- }
-
- BSONObj info;
- BSONElement retValue;
- BSONObjBuilder b;
- b.append("0", 99);
- BSONObj args = b.done();
- bool ok = c.eval("dwight", "function() { return args[0]; }", info, retValue, &args);
- out() << "eval ok=" << ok << endl;
- out() << "retvalue=" << retValue.toString() << endl;
- out() << "info=" << info.toString() << endl;
-
- out() << endl;
-
- int x = 3;
- assert( c.eval("dwight", "function() { return 3; }", x) );
-
- out() << "***\n";
-
- BSONObj foo = fromjson("{\"x\":7}");
- out() << foo.toString() << endl;
- int res=0;
- ok = c.eval("dwight", "function(parm1) { return parm1.x; }", foo, res);
- out() << ok << " retval:" << res << endl;
+ return count( db.c_str() , q ) != 0;
}
- void testPaired();
-
/* --- dbclientconnection --- */
bool DBClientConnection::auth(const string &dbname, const string &username, const string &password_text, string& errmsg, bool digestPassword) {
@@ -422,48 +467,42 @@ namespace mongo {
return DBClientBase::auth(dbname, username, password.c_str(), errmsg, false);
}
- BSONObj DBClientInterface::findOne(const string &ns, Query query, const BSONObj *fieldsToReturn, int queryOptions) {
+ BSONObj DBClientInterface::findOne(const string &ns, const Query& query, const BSONObj *fieldsToReturn, int queryOptions) {
auto_ptr<DBClientCursor> c =
this->query(ns, query, 1, 0, fieldsToReturn, queryOptions);
- massert( 10276 , "DBClientBase::findOne: transport error", c.get() );
+ uassert( 10276 , "DBClientBase::findOne: transport error", c.get() );
+
+ if ( c->hasResultFlag( ResultFlag_ShardConfigStale ) )
+ throw StaleConfigException( ns , "findOne has stale config" );
if ( !c->more() )
return BSONObj();
- return c->next().copy();
+ return c->nextSafe().copy();
+ }
+
+ bool DBClientConnection::connect(const HostAndPort& server, string& errmsg){
+ _server = server;
+ _serverString = _server.toString();
+ return _connect( errmsg );
}
- bool DBClientConnection::connect(const string &_serverAddress, string& errmsg) {
- serverAddress = _serverAddress;
+ bool DBClientConnection::_connect( string& errmsg ){
+ _serverString = _server.toString();
+ // we keep around SockAddr for connection life -- maybe MessagingPort
+ // requires that?
+ server.reset(new SockAddr(_server.host().c_str(), _server.port()));
+ p.reset(new MessagingPort( _timeout, _logLevel ));
- string ip;
- int port;
- size_t idx = serverAddress.find( ":" );
- if ( idx != string::npos ) {
- port = strtol( serverAddress.substr( idx + 1 ).c_str(), 0, 10 );
- ip = serverAddress.substr( 0 , idx );
- ip = hostbyname(ip.c_str());
- } else {
- port = CmdLine::DefaultDBPort;
- ip = hostbyname( serverAddress.c_str() );
- }
- if( ip.empty() ) {
- stringstream ss;
- ss << "client connect: couldn't parse/resolve hostname: " << _serverAddress;
- errmsg = ss.str();
+ if (server->getAddr() == "0.0.0.0"){
failed = true;
return false;
}
- // we keep around SockAddr for connection life -- maybe MessagingPort
- // requires that?
- server = auto_ptr<SockAddr>(new SockAddr(ip.c_str(), port));
- p = auto_ptr<MessagingPort>(new MessagingPort());
-
if ( !p->connect(*server) ) {
stringstream ss;
- ss << "couldn't connect to server " << serverAddress << " " << ip << ":" << port;
+ ss << "couldn't connect to server " << _serverString << '}';
errmsg = ss.str();
failed = true;
return false;
@@ -480,22 +519,21 @@ namespace mongo {
return;
lastReconnectTry = time(0);
- log() << "trying reconnect to " << serverAddress << endl;
+ log(_logLevel) << "trying reconnect to " << _serverString << endl;
string errmsg;
- string tmp = serverAddress;
failed = false;
- if ( !connect(tmp.c_str(), errmsg) ) {
- log() << "reconnect " << serverAddress << " failed " << errmsg << endl;
+ if ( ! _connect(errmsg) ) {
+ log(_logLevel) << "reconnect " << _serverString << " failed " << errmsg << endl;
return;
}
- log() << "reconnect " << serverAddress << " ok" << endl;
+ log(_logLevel) << "reconnect " << _serverString << " ok" << endl;
for( map< string, pair<string,string> >::iterator i = authCache.begin(); i != authCache.end(); i++ ) {
const char *dbname = i->first.c_str();
const char *username = i->second.first.c_str();
const char *password = i->second.second.c_str();
if( !DBClientBase::auth(dbname, username, password, errmsg, false) )
- log() << "reconnect: auth failed db:" << dbname << " user:" << username << ' ' << errmsg << '\n';
+ log(_logLevel) << "reconnect: auth failed db:" << dbname << " user:" << username << ' ' << errmsg << '\n';
}
}
@@ -516,13 +554,76 @@ namespace mongo {
return auto_ptr< DBClientCursor >( 0 );
}
+ struct DBClientFunConvertor {
+ void operator()( DBClientCursorBatchIterator &i ) {
+ while( i.moreInCurrentBatch() ) {
+ _f( i.nextSafe() );
+ }
+ }
+ boost::function<void(const BSONObj &)> _f;
+ };
+
+ unsigned long long DBClientConnection::query( boost::function<void(const BSONObj&)> f, const string& ns, Query query, const BSONObj *fieldsToReturn, int queryOptions ) {
+ DBClientFunConvertor fun;
+ fun._f = f;
+ boost::function<void(DBClientCursorBatchIterator &)> ptr( fun );
+ return DBClientConnection::query( ptr, ns, query, fieldsToReturn, queryOptions );
+ }
+
+ unsigned long long DBClientConnection::query( boost::function<void(DBClientCursorBatchIterator &)> f, const string& ns, Query query, const BSONObj *fieldsToReturn, int queryOptions ) {
+ // mask options
+ queryOptions &= (int)( QueryOption_NoCursorTimeout | QueryOption_SlaveOk );
+ unsigned long long n = 0;
+
+ bool doExhaust = ( availableOptions() & QueryOption_Exhaust );
+ if ( doExhaust ) {
+ queryOptions |= (int)QueryOption_Exhaust;
+ }
+ auto_ptr<DBClientCursor> c( this->query(ns, query, 0, 0, fieldsToReturn, queryOptions) );
+ massert( 13386, "socket error for mapping query", c.get() );
+
+ if ( !doExhaust ) {
+ while( c->more() ) {
+ DBClientCursorBatchIterator i( *c );
+ f( i );
+ n += i.n();
+ }
+ return n;
+ }
+
+ try {
+ while( 1 ) {
+ while( c->moreInCurrentBatch() ) {
+ DBClientCursorBatchIterator i( *c );
+ f( i );
+ n += i.n();
+ }
+
+ if( c->getCursorId() == 0 )
+ break;
+
+ c->exhaustReceiveMore();
+ }
+ }
+ catch(std::exception&) {
+ /* connection CANNOT be used anymore as more data may be on the way from the server.
+ we have to reconnect.
+ */
+ failed = true;
+ p->shutdown();
+ throw;
+ }
+
+ return n;
+ }
+
void DBClientBase::insert( const string & ns , BSONObj obj ) {
Message toSend;
BufBuilder b;
int opts = 0;
- b.append( opts );
- b.append( ns );
+ b.appendNum( opts );
+ b.appendStr( ns );
obj.appendSelfToBufBuilder( b );
toSend.setData( dbInsert , b.buf() , b.len() );
@@ -535,8 +636,8 @@ namespace mongo {
BufBuilder b;
int opts = 0;
- b.append( opts );
- b.append( ns );
+ b.appendNum( opts );
+ b.appendStr( ns );
for( vector< BSONObj >::const_iterator i = v.begin(); i != v.end(); ++i )
i->appendSelfToBufBuilder( b );
@@ -550,13 +651,13 @@ namespace mongo {
BufBuilder b;
int opts = 0;
- b.append( opts );
- b.append( ns );
+ b.appendNum( opts );
+ b.appendStr( ns );
int flags = 0;
if ( justOne )
- flags |= 1;
- b.append( flags );
+ flags |= RemoveOption_JustOne;
+ b.appendNum( flags );
obj.obj.appendSelfToBufBuilder( b );
@@ -568,13 +669,13 @@ namespace mongo {
void DBClientBase::update( const string & ns , Query query , BSONObj obj , bool upsert , bool multi ) {
BufBuilder b;
- b.append( (int)0 ); // reserved
- b.append( ns );
+ b.appendNum( (int)0 ); // reserved
+ b.appendStr( ns );
int flags = 0;
if ( upsert ) flags |= UpdateOption_Upsert;
if ( multi ) flags |= UpdateOption_Multi;
- b.append( flags );
+ b.appendNum( flags );
query.obj.appendSelfToBufBuilder( b );
obj.appendSelfToBufBuilder( b );
@@ -599,7 +700,7 @@ namespace mongo {
if ( ! runCommand( nsToDatabase( ns.c_str() ) ,
BSON( "deleteIndexes" << NamespaceString( ns ).coll << "index" << indexName ) ,
info ) ){
- log() << "dropIndex failed: " << info << endl;
+ log(_logLevel) << "dropIndex failed: " << info << endl;
uassert( 10007 , "dropIndex failed" , 0 );
}
resetIndexCache();
@@ -684,15 +785,21 @@ namespace mongo {
/* -- DBClientCursor ---------------------------------------------- */
+#ifdef _DEBUG
+#define CHECK_OBJECT( o , msg ) massert( 10337 , (string)"object not valid" + (msg) , (o).isValid() )
+#else
+#define CHECK_OBJECT( o , msg )
+#endif
+
void assembleRequest( const string &ns, BSONObj query, int nToReturn, int nToSkip, const BSONObj *fieldsToReturn, int queryOptions, Message &toSend ) {
CHECK_OBJECT( query , "assembleRequest query" );
// see query.h for the protocol we are using here.
BufBuilder b;
int opts = queryOptions;
- b.append(opts);
- b.append(ns.c_str());
- b.append(nToSkip);
- b.append(nToReturn);
+ b.appendNum(opts);
+ b.appendStr(ns);
+ b.appendNum(nToSkip);
+ b.appendNum(nToReturn);
query.appendSelfToBufBuilder(b);
if ( fieldsToReturn )
fieldsToReturn->appendSelfToBufBuilder(b);
@@ -713,6 +820,10 @@ namespace mongo {
port().piggyBack( toSend );
}
+ void DBClientConnection::recv( Message &m ) {
+ port().recv(m);
+ }
+
bool DBClientConnection::call( Message &toSend, Message &response, bool assertOk ) {
/* todo: this is very ugly messagingport::call returns an error code AND can throw
an exception. we should make it return void and just throw an exception anytime
@@ -722,7 +833,7 @@ namespace mongo {
if ( !port().call(toSend, response) ) {
failed = true;
if ( assertOk )
- massert( 10278 , "dbclient error communicating with server", false);
+ uassert( 10278 , "dbclient error communicating with server", false);
return false;
}
}
@@ -736,178 +847,128 @@ namespace mongo {
void DBClientConnection::checkResponse( const char *data, int nReturned ) {
/* check for errors. the only one we really care about at
this stage is "not master" */
- if ( clientPaired && nReturned ) {
+ if ( clientSet && nReturned ) {
+ assert(data);
BSONObj o(data);
BSONElement e = o.firstElement();
if ( strcmp(e.fieldName(), "$err") == 0 &&
e.type() == String && strncmp(e.valuestr(), "not master", 10) == 0 ) {
- clientPaired->isntMaster();
+ clientSet->isntMaster();
}
}
}
- int DBClientCursor::nextBatchSize(){
- if ( nToReturn == 0 )
- return batchSize;
- if ( batchSize == 0 )
- return nToReturn;
+ void DBClientConnection::killCursor( long long cursorId ){
+ BufBuilder b;
+ b.appendNum( (int)0 ); // reserved
+ b.appendNum( (int)1 ); // number
+ b.appendNum( cursorId );
- return batchSize < nToReturn ? batchSize : nToReturn;
- }
-
- bool DBClientCursor::init() {
- Message toSend;
- if ( !cursorId ) {
- assembleRequest( ns, query, nextBatchSize() , nToSkip, fieldsToReturn, opts, toSend );
- } else {
- BufBuilder b;
- b.append( opts );
- b.append( ns.c_str() );
- b.append( nToReturn );
- b.append( cursorId );
- toSend.setData( dbGetMore, b.buf(), b.len() );
- }
- if ( !connector->call( toSend, *m, false ) )
- return false;
- if ( ! m->data )
- return false;
- dataReceived();
- return true;
+ Message m;
+ m.setData( dbKillCursors , b.buf() , b.len() );
+
+ sayPiggyBack( m );
}
- void DBClientCursor::requestMore() {
- assert( cursorId && pos == nReturned );
-
- if (haveLimit){
- nToReturn -= nReturned;
- assert(nToReturn > 0);
- }
- BufBuilder b;
- b.append(opts);
- b.append(ns.c_str());
- b.append(nextBatchSize());
- b.append(cursorId);
+ /* --- class dbclientpaired --- */
- Message toSend;
- toSend.setData(dbGetMore, b.buf(), b.len());
- auto_ptr<Message> response(new Message());
- connector->call( toSend, *response );
-
- m = response;
- dataReceived();
- }
-
- void DBClientCursor::dataReceived() {
- QueryResult *qr = (QueryResult *) m->data;
- resultFlags = qr->resultFlags();
- if ( qr->resultFlags() & QueryResult::ResultFlag_CursorNotFound ) {
- // cursor id no longer valid at the server.
- assert( qr->cursorId == 0 );
- cursorId = 0; // 0 indicates no longer valid (dead)
- // TODO: should we throw a UserException here???
- }
- if ( cursorId == 0 || ! ( opts & QueryOption_CursorTailable ) ) {
- // only set initially: we don't want to kill it on end of data
- // if it's a tailable cursor
- cursorId = qr->cursorId;
- }
- nReturned = qr->nReturned;
- pos = 0;
- data = qr->data();
-
- connector->checkResponse( data, nReturned );
- /* this assert would fire the way we currently work:
- assert( nReturned || cursorId == 0 );
- */
+ string DBClientReplicaSet::toString() {
+ return getServerAddress();
}
- /** If true, safe to call next(). Requests more from server if necessary. */
- bool DBClientCursor::more() {
- if ( !_putBack.empty() )
- return true;
+ DBClientReplicaSet::DBClientReplicaSet( const string& name , const vector<HostAndPort>& servers )
+ : _name( name ) , _currentMaster( 0 ), _servers( servers ){
- if (haveLimit && pos >= nToReturn)
- return false;
-
- if ( pos < nReturned )
- return true;
-
- if ( cursorId == 0 )
- return false;
-
- requestMore();
- return pos < nReturned;
+ for ( unsigned i=0; i<_servers.size(); i++ )
+ _conns.push_back( new DBClientConnection( true , this ) );
}
-
- BSONObj DBClientCursor::next() {
- assert( more() );
- if ( !_putBack.empty() ) {
- BSONObj ret = _putBack.top();
- _putBack.pop();
- return ret;
- }
- pos++;
- BSONObj o(data);
- data += o.objsize();
- return o;
- }
-
- DBClientCursor::~DBClientCursor() {
- DESTRUCTOR_GUARD (
- if ( cursorId && _ownCursor ) {
- BufBuilder b;
- b.append( (int)0 ); // reserved
- b.append( (int)1 ); // number
- b.append( cursorId );
-
- Message m;
- m.setData( dbKillCursors , b.buf() , b.len() );
-
- connector->sayPiggyBack( m );
- }
- );
+
+ DBClientReplicaSet::~DBClientReplicaSet(){
+ for ( unsigned i=0; i<_conns.size(); i++ )
+ delete _conns[i];
+ _conns.clear();
}
-
- /* --- class dbclientpaired --- */
-
- string DBClientPaired::toString() {
- stringstream ss;
- ss << "state: " << master << '\n';
- ss << "left: " << left.toStringLong() << '\n';
- ss << "right: " << right.toStringLong() << '\n';
+
+ string DBClientReplicaSet::getServerAddress() const {
+ StringBuilder ss;
+ if ( _name.size() )
+ ss << _name << "/";
+
+ for ( unsigned i=0; i<_servers.size(); i++ ){
+ if ( i > 0 )
+ ss << ",";
+ ss << _servers[i].toString();
+ }
return ss.str();
}
-#pragma warning(disable: 4355)
- DBClientPaired::DBClientPaired() :
- left(true, this), right(true, this)
- {
- master = NotSetL;
- }
-#pragma warning(default: 4355)
-
/* find which server, the left or right, is currently master mode */
- void DBClientPaired::_checkMaster() {
+ void DBClientReplicaSet::_checkMaster() {
+
+ bool triedQuickCheck = false;
+
+ log( _logLevel + 1) << "_checkMaster on: " << toString() << endl;
for ( int retry = 0; retry < 2; retry++ ) {
- int x = master;
- for ( int pass = 0; pass < 2; pass++ ) {
- DBClientConnection& c = x == 0 ? left : right;
+ for ( unsigned i=0; i<_conns.size(); i++ ){
+ DBClientConnection * c = _conns[i];
try {
bool im;
BSONObj o;
- c.isMaster(im, &o);
+ c->isMaster(im, &o);
+
if ( retry )
- log() << "checkmaster: " << c.toString() << ' ' << o.toString() << '\n';
+ log(_logLevel) << "checkmaster: " << c->toString() << ' ' << o << '\n';
+
+ string maybePrimary;
+ if ( o["hosts"].type() == Array ){
+ if ( o["primary"].type() == String )
+ maybePrimary = o["primary"].String();
+
+ BSONObjIterator hi(o["hosts"].Obj());
+ while ( hi.more() ){
+ string toCheck = hi.next().String();
+ int found = -1;
+ for ( unsigned x=0; x<_servers.size(); x++ ){
+ if ( toCheck == _servers[x].toString() ){
+ found = x;
+ break;
+ }
+ }
+
+ if ( found == -1 ){
+ HostAndPort h( toCheck );
+ _servers.push_back( h );
+ _conns.push_back( new DBClientConnection( true, this ) );
+ string temp;
+ _conns[ _conns.size() - 1 ]->connect( h , temp );
+ log( _logLevel ) << "updated set to: " << toString() << endl;
+ }
+
+ }
+ }
+
if ( im ) {
- master = (State) (x + 2);
+ _currentMaster = c;
return;
}
+
+ if ( maybePrimary.size() && ! triedQuickCheck ){
+ for ( unsigned x=0; x<_servers.size(); x++ ){
+ if ( _servers[i].toString() != maybePrimary )
+ continue;
+ triedQuickCheck = true;
+ _conns[x]->isMaster( im , &o );
+ if ( im ){
+ _currentMaster = _conns[x];
+ return;
+ }
+ }
+ }
}
- catch (AssertionException&) {
+ catch ( std::exception& e ) {
if ( retry )
- log() << "checkmaster: caught exception " << c.toString() << '\n';
+ log(_logLevel) << "checkmaster: caught exception " << c->toString() << ' ' << e.what() << endl;
}
- x = x^1;
}
sleepsecs(1);
}
@@ -915,36 +976,54 @@ namespace mongo {
uassert( 10009 , "checkmaster: no master found", false);
}
- inline DBClientConnection& DBClientPaired::checkMaster() {
- if ( master > NotSetR ) {
+ DBClientConnection * DBClientReplicaSet::checkMaster() {
+ if ( _currentMaster ){
// a master is selected. let's just make sure connection didn't die
- DBClientConnection& c = master == Left ? left : right;
- if ( !c.isFailed() )
- return c;
- // after a failure, on the next checkMaster, start with the other
- // server -- presumably it took over. (not critical which we check first,
- // just will make the failover slightly faster if we guess right)
- master = master == Left ? NotSetR : NotSetL;
+ if ( ! _currentMaster->isFailed() )
+ return _currentMaster;
+ _currentMaster = 0;
}
_checkMaster();
- assert( master > NotSetR );
- return master == Left ? left : right;
+ assert( _currentMaster );
+ return _currentMaster;
}
- DBClientConnection& DBClientPaired::slaveConn(){
- DBClientConnection& m = checkMaster();
- assert( ! m.isFailed() );
- return master == Left ? right : left;
+ DBClientConnection& DBClientReplicaSet::masterConn(){
+ return *checkMaster();
}
- bool DBClientPaired::connect(const string &serverHostname1, const string &serverHostname2) {
+ DBClientConnection& DBClientReplicaSet::slaveConn(){
+ DBClientConnection * m = checkMaster();
+ assert( ! m->isFailed() );
+
+ DBClientConnection * failedSlave = 0;
+
+ for ( unsigned i=0; i<_conns.size(); i++ ){
+ if ( m == _conns[i] )
+ continue;
+ failedSlave = _conns[i];
+ if ( _conns[i]->isFailed() )
+ continue;
+ return *_conns[i];
+ }
+
+ assert(failedSlave);
+ return *failedSlave;
+ }
+
+ bool DBClientReplicaSet::connect(){
string errmsg;
- bool l = left.connect(serverHostname1, errmsg);
- bool r = right.connect(serverHostname2, errmsg);
- master = l ? NotSetL : NotSetR;
- if ( !l && !r ) // it would be ok to fall through, but checkMaster will then try an immediate reconnect which is slow
+
+ bool anyGood = false;
+ for ( unsigned i=0; i<_conns.size(); i++ ){
+ if ( _conns[i]->connect( _servers[i] , errmsg ) )
+ anyGood = true;
+ }
+
+ if ( ! anyGood )
return false;
+
try {
checkMaster();
}
@@ -954,61 +1033,44 @@ namespace mongo {
return true;
}
- bool DBClientPaired::connect(string hostpairstring) {
- size_t comma = hostpairstring.find( "," );
- uassert( 10010 , "bad hostpairstring", comma != string::npos);
- return connect( hostpairstring.substr( 0 , comma ) , hostpairstring.substr( comma + 1 ) );
- }
-
- bool DBClientPaired::auth(const string &dbname, const string &username, const string &pwd, string& errmsg) {
- DBClientConnection& m = checkMaster();
- if( !m.auth(dbname, username, pwd, errmsg) )
+ bool DBClientReplicaSet::auth(const string &dbname, const string &username, const string &pwd, string& errmsg, bool digestPassword ) {
+ DBClientConnection * m = checkMaster();
+ if( !m->auth(dbname, username, pwd, errmsg, digestPassword ) )
return false;
+
/* we try to authentiate with the other half of the pair -- even if down, that way the authInfo is cached. */
- string e;
- try {
- if( &m == &left )
- right.auth(dbname, username, pwd, e);
- else
- left.auth(dbname, username, pwd, e);
- }
- catch( AssertionException&) {
- }
+ for ( unsigned i=0; i<_conns.size(); i++ ){
+ if ( _conns[i] == m )
+ continue;
+ try {
+ string e;
+ _conns[i]->auth( dbname , username , pwd , e , digestPassword );
+ }
+ catch ( AssertionException& ){
+ }
+ }
+
return true;
}
- auto_ptr<DBClientCursor> DBClientPaired::query(const string &a, Query b, int c, int d,
- const BSONObj *e, int f, int g)
- {
- return checkMaster().query(a,b,c,d,e,f,g);
- }
-
- BSONObj DBClientPaired::findOne(const string &a, Query b, const BSONObj *c, int d) {
- return checkMaster().findOne(a,b,c,d);
- }
-
- void testPaired() {
- DBClientPaired p;
- log() << "connect returns " << p.connect("localhost:27017", "localhost:27018") << endl;
-
- //DBClientConnection p(true);
- string errmsg;
- // log() << "connect " << p.connect("localhost", errmsg) << endl;
- log() << "auth " << p.auth("dwight", "u", "p", errmsg) << endl;
-
- while( 1 ) {
- sleepsecs(3);
- try {
- log() << "findone returns " << p.findOne("dwight.foo", BSONObj()).toString() << endl;
- sleepsecs(3);
- BSONObj info;
- bool im;
- log() << "ismaster returns " << p.isMaster(im,&info) << " info: " << info.toString() << endl;
- }
- catch(...) {
- cout << "caught exception" << endl;
- }
- }
- }
+ auto_ptr<DBClientCursor> DBClientReplicaSet::query(const string &a, Query b, int c, int d,
+ const BSONObj *e, int f, int g){
+ // TODO: if slave ok is set go to a slave
+ return checkMaster()->query(a,b,c,d,e,f,g);
+ }
+ BSONObj DBClientReplicaSet::findOne(const string &a, const Query& b, const BSONObj *c, int d) {
+ return checkMaster()->findOne(a,b,c,d);
+ }
+
+ bool serverAlive( const string &uri ) {
+ DBClientConnection c( false, 0, 20 ); // potentially the connection to server could fail while we're checking if it's alive - so use timeouts
+ string err;
+ if ( !c.connect( uri, err ) )
+ return false;
+ if ( !c.simpleCommand( "admin", 0, "ping" ) )
+ return false;
+ return true;
+ }
+
} // namespace mongo