diff options
author | Antonin Kral <a.kral@bobek.cz> | 2010-08-11 12:38:57 +0200 |
---|---|---|
committer | Antonin Kral <a.kral@bobek.cz> | 2010-08-11 12:38:57 +0200 |
commit | 7645618fd3914cb8a20561625913c20d49504a49 (patch) | |
tree | 8370f846f58f6d71165b7a0e2eda04648584ec76 /client/dbclient.cpp | |
parent | 68c73c3c7608b4c87f07440dc3232801720b1168 (diff) | |
download | mongodb-7645618fd3914cb8a20561625913c20d49504a49.tar.gz |
Imported Upstream version 1.6.0
Diffstat (limited to 'client/dbclient.cpp')
-rw-r--r-- | client/dbclient.cpp | 722 |
1 files changed, 392 insertions, 330 deletions
diff --git a/client/dbclient.cpp b/client/dbclient.cpp index f617f7c..04b6147 100644 --- a/client/dbclient.cpp +++ b/client/dbclient.cpp @@ -15,22 +15,89 @@ * limitations under the License. */ -#include "stdafx.h" +#include "pch.h" #include "../db/pdfile.h" #include "dbclient.h" -#include "../util/builder.h" +#include "../bson/util/builder.h" #include "../db/jsobj.h" #include "../db/json.h" #include "../db/instance.h" #include "../util/md5.hpp" #include "../db/dbmessage.h" #include "../db/cmdline.h" +#include "connpool.h" +#include "../s/util.h" +#include "syncclusterconnection.h" namespace mongo { + DBClientBase* ConnectionString::connect( string& errmsg ) const { + switch ( _type ){ + case MASTER: { + DBClientConnection * c = new DBClientConnection(true); + log(1) << "creating new connection to:" << _servers[0] << endl; + if ( ! c->connect( _servers[0] , errmsg ) ) { + delete c; + return 0; + } + return c; + } + + case PAIR: + case SET: { + DBClientReplicaSet * set = new DBClientReplicaSet( _setName , _servers ); + if( ! set->connect() ){ + delete set; + errmsg = "connect failed to set "; + errmsg += toString(); + return 0; + } + return set; + } + + case SYNC: { + // TODO , don't copy + list<HostAndPort> l; + for ( unsigned i=0; i<_servers.size(); i++ ) + l.push_back( _servers[i] ); + return new SyncClusterConnection( l ); + } + + case INVALID: + throw UserException( 13421 , "trying to connect to invalid ConnectionString" ); + break; + } + + assert( 0 ); + return 0; + } + + ConnectionString ConnectionString::parse( const string& host , string& errmsg ){ + + string::size_type i = host.find( '/' ); + if ( i != string::npos ){ + // replica set + return ConnectionString( SET , host.substr( i + 1 ) , host.substr( 0 , i ) ); + } + + int numCommas = DBClientBase::countCommas( host ); + + if( numCommas == 0 ) + return ConnectionString( HostAndPort( host ) ); + + if ( numCommas == 1 ) + return ConnectionString( PAIR , host ); + + if ( numCommas == 2 ) + return ConnectionString( SYNC , host ); + + errmsg = (string)"invalid hostname [" + host + "]"; + return ConnectionString(); // INVALID + } + Query& Query::where(const string &jscode, BSONObj scope) { /* use where() before sort() and hint() and explain(), else this will assert. */ - assert( !obj.hasField("query") ); + assert( ! isComplex() ); BSONObjBuilder b; b.appendElements(obj); b.appendWhere(jscode, scope); @@ -39,7 +106,7 @@ namespace mongo { } void Query::makeComplex() { - if ( obj.hasElement( "query" ) ) + if ( isComplex() ) return; BSONObjBuilder b; b.append( "query", obj ); @@ -76,19 +143,36 @@ namespace mongo { return *this; } - bool Query::isComplex() const{ - return obj.hasElement( "query" ); + bool Query::isComplex( bool * hasDollar ) const{ + if ( obj.hasElement( "query" ) ){ + if ( hasDollar ) + hasDollar[0] = false; + return true; + } + + if ( obj.hasElement( "$query" ) ){ + if ( hasDollar ) + hasDollar[0] = true; + return true; + } + + return false; } BSONObj Query::getFilter() const { - if ( ! isComplex() ) + bool hasDollar; + if ( ! isComplex( &hasDollar ) ) return obj; - return obj.getObjectField( "query" ); + + return obj.getObjectField( hasDollar ? "$query" : "query" ); } BSONObj Query::getSort() const { if ( ! isComplex() ) return BSONObj(); - return obj.getObjectField( "orderby" ); + BSONObj ret = obj.getObjectField( "orderby" ); + if (ret.isEmpty()) + ret = obj.getObjectField( "$orderby" ); + return ret; } BSONObj Query::getHint() const { if ( ! isComplex() ) @@ -109,6 +193,17 @@ namespace mongo { return o["ok"].trueValue(); } + enum QueryOptions DBClientWithCommands::availableOptions() { + if ( !_haveCachedAvailableOptions ) { + BSONObj ret; + if ( runCommand( "admin", BSON( "availablequeryoptions" << 1 ), ret ) ) { + _cachedAvailableOptions = ( enum QueryOptions )( ret.getIntField( "options" ) ); + } + _haveCachedAvailableOptions = true; + } + return _cachedAvailableOptions; + } + inline bool DBClientWithCommands::runCommand(const string &dbname, const BSONObj& cmd, BSONObj &info, int options) { string ns = dbname + ".$cmd"; info = findOne(ns, cmd, 0 , options); @@ -133,7 +228,7 @@ namespace mongo { BSONObj res; if( !runCommand(ns.db.c_str(), cmd, res, options) ) uasserted(11010,string("count fails:") + res.toString()); - return res.getIntField("n"); + return res["n"].numberLong(); } BSONObj getlasterrorcmdobj = fromjson("{getlasterror:1}"); @@ -146,10 +241,14 @@ namespace mongo { string DBClientWithCommands::getLastError() { BSONObj info = getLastErrorDetailed(); + return getLastErrorString( info ); + } + + string DBClientWithCommands::getLastErrorString( const BSONObj& info ){ BSONElement e = info["err"]; if( e.eoo() ) return ""; if( e.type() == Object ) return e.toString(); - return e.str(); + return e.str(); } BSONObj getpreverrorcmdobj = fromjson("{getpreverror:1}"); @@ -223,13 +322,14 @@ namespace mongo { bool DBClientWithCommands::isMaster(bool& isMaster, BSONObj *info) { BSONObj o; - if ( info == 0 ) info = &o; + if ( info == 0 ) + info = &o; bool ok = runCommand("admin", ismastercmdobj, *info); - isMaster = (info->getIntField("ismaster") == 1); + isMaster = info->getField("ismaster").trueValue(); return ok; } - bool DBClientWithCommands::createCollection(const string &ns, unsigned size, bool capped, int max, BSONObj *info) { + bool DBClientWithCommands::createCollection(const string &ns, long long size, bool capped, int max, BSONObj *info) { BSONObj o; if ( info == 0 ) info = &o; BSONObjBuilder b; @@ -346,64 +446,9 @@ namespace mongo { string db = nsGetDB( ns ) + ".system.namespaces"; BSONObj q = BSON( "name" << ns ); - return count( db.c_str() , q ); - } - - - void testSort() { - DBClientConnection c; - string err; - if ( !c.connect("localhost", err) ) { - out() << "can't connect to server " << err << endl; - return; - } - - cout << "findOne returns:" << endl; - cout << c.findOne("test.foo", QUERY( "x" << 3 ) ).toString() << endl; - cout << c.findOne("test.foo", QUERY( "x" << 3 ).sort("name") ).toString() << endl; - - } - - /* TODO: unit tests should run this? */ - void testDbEval() { - DBClientConnection c; - string err; - if ( !c.connect("localhost", err) ) { - out() << "can't connect to server " << err << endl; - return; - } - - if( !c.auth("dwight", "u", "p", err) ) { - out() << "can't authenticate " << err << endl; - return; - } - - BSONObj info; - BSONElement retValue; - BSONObjBuilder b; - b.append("0", 99); - BSONObj args = b.done(); - bool ok = c.eval("dwight", "function() { return args[0]; }", info, retValue, &args); - out() << "eval ok=" << ok << endl; - out() << "retvalue=" << retValue.toString() << endl; - out() << "info=" << info.toString() << endl; - - out() << endl; - - int x = 3; - assert( c.eval("dwight", "function() { return 3; }", x) ); - - out() << "***\n"; - - BSONObj foo = fromjson("{\"x\":7}"); - out() << foo.toString() << endl; - int res=0; - ok = c.eval("dwight", "function(parm1) { return parm1.x; }", foo, res); - out() << ok << " retval:" << res << endl; + return count( db.c_str() , q ) != 0; } - void testPaired(); - /* --- dbclientconnection --- */ bool DBClientConnection::auth(const string &dbname, const string &username, const string &password_text, string& errmsg, bool digestPassword) { @@ -422,48 +467,42 @@ namespace mongo { return DBClientBase::auth(dbname, username, password.c_str(), errmsg, false); } - BSONObj DBClientInterface::findOne(const string &ns, Query query, const BSONObj *fieldsToReturn, int queryOptions) { + BSONObj DBClientInterface::findOne(const string &ns, const Query& query, const BSONObj *fieldsToReturn, int queryOptions) { auto_ptr<DBClientCursor> c = this->query(ns, query, 1, 0, fieldsToReturn, queryOptions); - massert( 10276 , "DBClientBase::findOne: transport error", c.get() ); + uassert( 10276 , "DBClientBase::findOne: transport error", c.get() ); + + if ( c->hasResultFlag( ResultFlag_ShardConfigStale ) ) + throw StaleConfigException( ns , "findOne has stale config" ); if ( !c->more() ) return BSONObj(); - return c->next().copy(); + return c->nextSafe().copy(); + } + + bool DBClientConnection::connect(const HostAndPort& server, string& errmsg){ + _server = server; + _serverString = _server.toString(); + return _connect( errmsg ); } - bool DBClientConnection::connect(const string &_serverAddress, string& errmsg) { - serverAddress = _serverAddress; + bool DBClientConnection::_connect( string& errmsg ){ + _serverString = _server.toString(); + // we keep around SockAddr for connection life -- maybe MessagingPort + // requires that? + server.reset(new SockAddr(_server.host().c_str(), _server.port())); + p.reset(new MessagingPort( _timeout, _logLevel )); - string ip; - int port; - size_t idx = serverAddress.find( ":" ); - if ( idx != string::npos ) { - port = strtol( serverAddress.substr( idx + 1 ).c_str(), 0, 10 ); - ip = serverAddress.substr( 0 , idx ); - ip = hostbyname(ip.c_str()); - } else { - port = CmdLine::DefaultDBPort; - ip = hostbyname( serverAddress.c_str() ); - } - if( ip.empty() ) { - stringstream ss; - ss << "client connect: couldn't parse/resolve hostname: " << _serverAddress; - errmsg = ss.str(); + if (server->getAddr() == "0.0.0.0"){ failed = true; return false; } - // we keep around SockAddr for connection life -- maybe MessagingPort - // requires that? - server = auto_ptr<SockAddr>(new SockAddr(ip.c_str(), port)); - p = auto_ptr<MessagingPort>(new MessagingPort()); - if ( !p->connect(*server) ) { stringstream ss; - ss << "couldn't connect to server " << serverAddress << " " << ip << ":" << port; + ss << "couldn't connect to server " << _serverString << '}'; errmsg = ss.str(); failed = true; return false; @@ -480,22 +519,21 @@ namespace mongo { return; lastReconnectTry = time(0); - log() << "trying reconnect to " << serverAddress << endl; + log(_logLevel) << "trying reconnect to " << _serverString << endl; string errmsg; - string tmp = serverAddress; failed = false; - if ( !connect(tmp.c_str(), errmsg) ) { - log() << "reconnect " << serverAddress << " failed " << errmsg << endl; + if ( ! _connect(errmsg) ) { + log(_logLevel) << "reconnect " << _serverString << " failed " << errmsg << endl; return; } - log() << "reconnect " << serverAddress << " ok" << endl; + log(_logLevel) << "reconnect " << _serverString << " ok" << endl; for( map< string, pair<string,string> >::iterator i = authCache.begin(); i != authCache.end(); i++ ) { const char *dbname = i->first.c_str(); const char *username = i->second.first.c_str(); const char *password = i->second.second.c_str(); if( !DBClientBase::auth(dbname, username, password, errmsg, false) ) - log() << "reconnect: auth failed db:" << dbname << " user:" << username << ' ' << errmsg << '\n'; + log(_logLevel) << "reconnect: auth failed db:" << dbname << " user:" << username << ' ' << errmsg << '\n'; } } @@ -516,13 +554,76 @@ namespace mongo { return auto_ptr< DBClientCursor >( 0 ); } + struct DBClientFunConvertor { + void operator()( DBClientCursorBatchIterator &i ) { + while( i.moreInCurrentBatch() ) { + _f( i.nextSafe() ); + } + } + boost::function<void(const BSONObj &)> _f; + }; + + unsigned long long DBClientConnection::query( boost::function<void(const BSONObj&)> f, const string& ns, Query query, const BSONObj *fieldsToReturn, int queryOptions ) { + DBClientFunConvertor fun; + fun._f = f; + boost::function<void(DBClientCursorBatchIterator &)> ptr( fun ); + return DBClientConnection::query( ptr, ns, query, fieldsToReturn, queryOptions ); + } + + unsigned long long DBClientConnection::query( boost::function<void(DBClientCursorBatchIterator &)> f, const string& ns, Query query, const BSONObj *fieldsToReturn, int queryOptions ) { + // mask options + queryOptions &= (int)( QueryOption_NoCursorTimeout | QueryOption_SlaveOk ); + unsigned long long n = 0; + + bool doExhaust = ( availableOptions() & QueryOption_Exhaust ); + if ( doExhaust ) { + queryOptions |= (int)QueryOption_Exhaust; + } + auto_ptr<DBClientCursor> c( this->query(ns, query, 0, 0, fieldsToReturn, queryOptions) ); + massert( 13386, "socket error for mapping query", c.get() ); + + if ( !doExhaust ) { + while( c->more() ) { + DBClientCursorBatchIterator i( *c ); + f( i ); + n += i.n(); + } + return n; + } + + try { + while( 1 ) { + while( c->moreInCurrentBatch() ) { + DBClientCursorBatchIterator i( *c ); + f( i ); + n += i.n(); + } + + if( c->getCursorId() == 0 ) + break; + + c->exhaustReceiveMore(); + } + } + catch(std::exception&) { + /* connection CANNOT be used anymore as more data may be on the way from the server. + we have to reconnect. + */ + failed = true; + p->shutdown(); + throw; + } + + return n; + } + void DBClientBase::insert( const string & ns , BSONObj obj ) { Message toSend; BufBuilder b; int opts = 0; - b.append( opts ); - b.append( ns ); + b.appendNum( opts ); + b.appendStr( ns ); obj.appendSelfToBufBuilder( b ); toSend.setData( dbInsert , b.buf() , b.len() ); @@ -535,8 +636,8 @@ namespace mongo { BufBuilder b; int opts = 0; - b.append( opts ); - b.append( ns ); + b.appendNum( opts ); + b.appendStr( ns ); for( vector< BSONObj >::const_iterator i = v.begin(); i != v.end(); ++i ) i->appendSelfToBufBuilder( b ); @@ -550,13 +651,13 @@ namespace mongo { BufBuilder b; int opts = 0; - b.append( opts ); - b.append( ns ); + b.appendNum( opts ); + b.appendStr( ns ); int flags = 0; if ( justOne ) - flags |= 1; - b.append( flags ); + flags |= RemoveOption_JustOne; + b.appendNum( flags ); obj.obj.appendSelfToBufBuilder( b ); @@ -568,13 +669,13 @@ namespace mongo { void DBClientBase::update( const string & ns , Query query , BSONObj obj , bool upsert , bool multi ) { BufBuilder b; - b.append( (int)0 ); // reserved - b.append( ns ); + b.appendNum( (int)0 ); // reserved + b.appendStr( ns ); int flags = 0; if ( upsert ) flags |= UpdateOption_Upsert; if ( multi ) flags |= UpdateOption_Multi; - b.append( flags ); + b.appendNum( flags ); query.obj.appendSelfToBufBuilder( b ); obj.appendSelfToBufBuilder( b ); @@ -599,7 +700,7 @@ namespace mongo { if ( ! runCommand( nsToDatabase( ns.c_str() ) , BSON( "deleteIndexes" << NamespaceString( ns ).coll << "index" << indexName ) , info ) ){ - log() << "dropIndex failed: " << info << endl; + log(_logLevel) << "dropIndex failed: " << info << endl; uassert( 10007 , "dropIndex failed" , 0 ); } resetIndexCache(); @@ -684,15 +785,21 @@ namespace mongo { /* -- DBClientCursor ---------------------------------------------- */ +#ifdef _DEBUG +#define CHECK_OBJECT( o , msg ) massert( 10337 , (string)"object not valid" + (msg) , (o).isValid() ) +#else +#define CHECK_OBJECT( o , msg ) +#endif + void assembleRequest( const string &ns, BSONObj query, int nToReturn, int nToSkip, const BSONObj *fieldsToReturn, int queryOptions, Message &toSend ) { CHECK_OBJECT( query , "assembleRequest query" ); // see query.h for the protocol we are using here. BufBuilder b; int opts = queryOptions; - b.append(opts); - b.append(ns.c_str()); - b.append(nToSkip); - b.append(nToReturn); + b.appendNum(opts); + b.appendStr(ns); + b.appendNum(nToSkip); + b.appendNum(nToReturn); query.appendSelfToBufBuilder(b); if ( fieldsToReturn ) fieldsToReturn->appendSelfToBufBuilder(b); @@ -713,6 +820,10 @@ namespace mongo { port().piggyBack( toSend ); } + void DBClientConnection::recv( Message &m ) { + port().recv(m); + } + bool DBClientConnection::call( Message &toSend, Message &response, bool assertOk ) { /* todo: this is very ugly messagingport::call returns an error code AND can throw an exception. we should make it return void and just throw an exception anytime @@ -722,7 +833,7 @@ namespace mongo { if ( !port().call(toSend, response) ) { failed = true; if ( assertOk ) - massert( 10278 , "dbclient error communicating with server", false); + uassert( 10278 , "dbclient error communicating with server", false); return false; } } @@ -736,178 +847,128 @@ namespace mongo { void DBClientConnection::checkResponse( const char *data, int nReturned ) { /* check for errors. the only one we really care about at this stage is "not master" */ - if ( clientPaired && nReturned ) { + if ( clientSet && nReturned ) { + assert(data); BSONObj o(data); BSONElement e = o.firstElement(); if ( strcmp(e.fieldName(), "$err") == 0 && e.type() == String && strncmp(e.valuestr(), "not master", 10) == 0 ) { - clientPaired->isntMaster(); + clientSet->isntMaster(); } } } - int DBClientCursor::nextBatchSize(){ - if ( nToReturn == 0 ) - return batchSize; - if ( batchSize == 0 ) - return nToReturn; + void DBClientConnection::killCursor( long long cursorId ){ + BufBuilder b; + b.appendNum( (int)0 ); // reserved + b.appendNum( (int)1 ); // number + b.appendNum( cursorId ); - return batchSize < nToReturn ? batchSize : nToReturn; - } - - bool DBClientCursor::init() { - Message toSend; - if ( !cursorId ) { - assembleRequest( ns, query, nextBatchSize() , nToSkip, fieldsToReturn, opts, toSend ); - } else { - BufBuilder b; - b.append( opts ); - b.append( ns.c_str() ); - b.append( nToReturn ); - b.append( cursorId ); - toSend.setData( dbGetMore, b.buf(), b.len() ); - } - if ( !connector->call( toSend, *m, false ) ) - return false; - if ( ! m->data ) - return false; - dataReceived(); - return true; + Message m; + m.setData( dbKillCursors , b.buf() , b.len() ); + + sayPiggyBack( m ); } - void DBClientCursor::requestMore() { - assert( cursorId && pos == nReturned ); - - if (haveLimit){ - nToReturn -= nReturned; - assert(nToReturn > 0); - } - BufBuilder b; - b.append(opts); - b.append(ns.c_str()); - b.append(nextBatchSize()); - b.append(cursorId); + /* --- class dbclientpaired --- */ - Message toSend; - toSend.setData(dbGetMore, b.buf(), b.len()); - auto_ptr<Message> response(new Message()); - connector->call( toSend, *response ); - - m = response; - dataReceived(); - } - - void DBClientCursor::dataReceived() { - QueryResult *qr = (QueryResult *) m->data; - resultFlags = qr->resultFlags(); - if ( qr->resultFlags() & QueryResult::ResultFlag_CursorNotFound ) { - // cursor id no longer valid at the server. - assert( qr->cursorId == 0 ); - cursorId = 0; // 0 indicates no longer valid (dead) - // TODO: should we throw a UserException here??? - } - if ( cursorId == 0 || ! ( opts & QueryOption_CursorTailable ) ) { - // only set initially: we don't want to kill it on end of data - // if it's a tailable cursor - cursorId = qr->cursorId; - } - nReturned = qr->nReturned; - pos = 0; - data = qr->data(); - - connector->checkResponse( data, nReturned ); - /* this assert would fire the way we currently work: - assert( nReturned || cursorId == 0 ); - */ + string DBClientReplicaSet::toString() { + return getServerAddress(); } - /** If true, safe to call next(). Requests more from server if necessary. */ - bool DBClientCursor::more() { - if ( !_putBack.empty() ) - return true; + DBClientReplicaSet::DBClientReplicaSet( const string& name , const vector<HostAndPort>& servers ) + : _name( name ) , _currentMaster( 0 ), _servers( servers ){ - if (haveLimit && pos >= nToReturn) - return false; - - if ( pos < nReturned ) - return true; - - if ( cursorId == 0 ) - return false; - - requestMore(); - return pos < nReturned; + for ( unsigned i=0; i<_servers.size(); i++ ) + _conns.push_back( new DBClientConnection( true , this ) ); } - - BSONObj DBClientCursor::next() { - assert( more() ); - if ( !_putBack.empty() ) { - BSONObj ret = _putBack.top(); - _putBack.pop(); - return ret; - } - pos++; - BSONObj o(data); - data += o.objsize(); - return o; - } - - DBClientCursor::~DBClientCursor() { - DESTRUCTOR_GUARD ( - if ( cursorId && _ownCursor ) { - BufBuilder b; - b.append( (int)0 ); // reserved - b.append( (int)1 ); // number - b.append( cursorId ); - - Message m; - m.setData( dbKillCursors , b.buf() , b.len() ); - - connector->sayPiggyBack( m ); - } - ); + + DBClientReplicaSet::~DBClientReplicaSet(){ + for ( unsigned i=0; i<_conns.size(); i++ ) + delete _conns[i]; + _conns.clear(); } - - /* --- class dbclientpaired --- */ - - string DBClientPaired::toString() { - stringstream ss; - ss << "state: " << master << '\n'; - ss << "left: " << left.toStringLong() << '\n'; - ss << "right: " << right.toStringLong() << '\n'; + + string DBClientReplicaSet::getServerAddress() const { + StringBuilder ss; + if ( _name.size() ) + ss << _name << "/"; + + for ( unsigned i=0; i<_servers.size(); i++ ){ + if ( i > 0 ) + ss << ","; + ss << _servers[i].toString(); + } return ss.str(); } -#pragma warning(disable: 4355) - DBClientPaired::DBClientPaired() : - left(true, this), right(true, this) - { - master = NotSetL; - } -#pragma warning(default: 4355) - /* find which server, the left or right, is currently master mode */ - void DBClientPaired::_checkMaster() { + void DBClientReplicaSet::_checkMaster() { + + bool triedQuickCheck = false; + + log( _logLevel + 1) << "_checkMaster on: " << toString() << endl; for ( int retry = 0; retry < 2; retry++ ) { - int x = master; - for ( int pass = 0; pass < 2; pass++ ) { - DBClientConnection& c = x == 0 ? left : right; + for ( unsigned i=0; i<_conns.size(); i++ ){ + DBClientConnection * c = _conns[i]; try { bool im; BSONObj o; - c.isMaster(im, &o); + c->isMaster(im, &o); + if ( retry ) - log() << "checkmaster: " << c.toString() << ' ' << o.toString() << '\n'; + log(_logLevel) << "checkmaster: " << c->toString() << ' ' << o << '\n'; + + string maybePrimary; + if ( o["hosts"].type() == Array ){ + if ( o["primary"].type() == String ) + maybePrimary = o["primary"].String(); + + BSONObjIterator hi(o["hosts"].Obj()); + while ( hi.more() ){ + string toCheck = hi.next().String(); + int found = -1; + for ( unsigned x=0; x<_servers.size(); x++ ){ + if ( toCheck == _servers[x].toString() ){ + found = x; + break; + } + } + + if ( found == -1 ){ + HostAndPort h( toCheck ); + _servers.push_back( h ); + _conns.push_back( new DBClientConnection( true, this ) ); + string temp; + _conns[ _conns.size() - 1 ]->connect( h , temp ); + log( _logLevel ) << "updated set to: " << toString() << endl; + } + + } + } + if ( im ) { - master = (State) (x + 2); + _currentMaster = c; return; } + + if ( maybePrimary.size() && ! triedQuickCheck ){ + for ( unsigned x=0; x<_servers.size(); x++ ){ + if ( _servers[i].toString() != maybePrimary ) + continue; + triedQuickCheck = true; + _conns[x]->isMaster( im , &o ); + if ( im ){ + _currentMaster = _conns[x]; + return; + } + } + } } - catch (AssertionException&) { + catch ( std::exception& e ) { if ( retry ) - log() << "checkmaster: caught exception " << c.toString() << '\n'; + log(_logLevel) << "checkmaster: caught exception " << c->toString() << ' ' << e.what() << endl; } - x = x^1; } sleepsecs(1); } @@ -915,36 +976,54 @@ namespace mongo { uassert( 10009 , "checkmaster: no master found", false); } - inline DBClientConnection& DBClientPaired::checkMaster() { - if ( master > NotSetR ) { + DBClientConnection * DBClientReplicaSet::checkMaster() { + if ( _currentMaster ){ // a master is selected. let's just make sure connection didn't die - DBClientConnection& c = master == Left ? left : right; - if ( !c.isFailed() ) - return c; - // after a failure, on the next checkMaster, start with the other - // server -- presumably it took over. (not critical which we check first, - // just will make the failover slightly faster if we guess right) - master = master == Left ? NotSetR : NotSetL; + if ( ! _currentMaster->isFailed() ) + return _currentMaster; + _currentMaster = 0; } _checkMaster(); - assert( master > NotSetR ); - return master == Left ? left : right; + assert( _currentMaster ); + return _currentMaster; } - DBClientConnection& DBClientPaired::slaveConn(){ - DBClientConnection& m = checkMaster(); - assert( ! m.isFailed() ); - return master == Left ? right : left; + DBClientConnection& DBClientReplicaSet::masterConn(){ + return *checkMaster(); } - bool DBClientPaired::connect(const string &serverHostname1, const string &serverHostname2) { + DBClientConnection& DBClientReplicaSet::slaveConn(){ + DBClientConnection * m = checkMaster(); + assert( ! m->isFailed() ); + + DBClientConnection * failedSlave = 0; + + for ( unsigned i=0; i<_conns.size(); i++ ){ + if ( m == _conns[i] ) + continue; + failedSlave = _conns[i]; + if ( _conns[i]->isFailed() ) + continue; + return *_conns[i]; + } + + assert(failedSlave); + return *failedSlave; + } + + bool DBClientReplicaSet::connect(){ string errmsg; - bool l = left.connect(serverHostname1, errmsg); - bool r = right.connect(serverHostname2, errmsg); - master = l ? NotSetL : NotSetR; - if ( !l && !r ) // it would be ok to fall through, but checkMaster will then try an immediate reconnect which is slow + + bool anyGood = false; + for ( unsigned i=0; i<_conns.size(); i++ ){ + if ( _conns[i]->connect( _servers[i] , errmsg ) ) + anyGood = true; + } + + if ( ! anyGood ) return false; + try { checkMaster(); } @@ -954,61 +1033,44 @@ namespace mongo { return true; } - bool DBClientPaired::connect(string hostpairstring) { - size_t comma = hostpairstring.find( "," ); - uassert( 10010 , "bad hostpairstring", comma != string::npos); - return connect( hostpairstring.substr( 0 , comma ) , hostpairstring.substr( comma + 1 ) ); - } - - bool DBClientPaired::auth(const string &dbname, const string &username, const string &pwd, string& errmsg) { - DBClientConnection& m = checkMaster(); - if( !m.auth(dbname, username, pwd, errmsg) ) + bool DBClientReplicaSet::auth(const string &dbname, const string &username, const string &pwd, string& errmsg, bool digestPassword ) { + DBClientConnection * m = checkMaster(); + if( !m->auth(dbname, username, pwd, errmsg, digestPassword ) ) return false; + /* we try to authentiate with the other half of the pair -- even if down, that way the authInfo is cached. */ - string e; - try { - if( &m == &left ) - right.auth(dbname, username, pwd, e); - else - left.auth(dbname, username, pwd, e); - } - catch( AssertionException&) { - } + for ( unsigned i=0; i<_conns.size(); i++ ){ + if ( _conns[i] == m ) + continue; + try { + string e; + _conns[i]->auth( dbname , username , pwd , e , digestPassword ); + } + catch ( AssertionException& ){ + } + } + return true; } - auto_ptr<DBClientCursor> DBClientPaired::query(const string &a, Query b, int c, int d, - const BSONObj *e, int f, int g) - { - return checkMaster().query(a,b,c,d,e,f,g); - } - - BSONObj DBClientPaired::findOne(const string &a, Query b, const BSONObj *c, int d) { - return checkMaster().findOne(a,b,c,d); - } - - void testPaired() { - DBClientPaired p; - log() << "connect returns " << p.connect("localhost:27017", "localhost:27018") << endl; - - //DBClientConnection p(true); - string errmsg; - // log() << "connect " << p.connect("localhost", errmsg) << endl; - log() << "auth " << p.auth("dwight", "u", "p", errmsg) << endl; - - while( 1 ) { - sleepsecs(3); - try { - log() << "findone returns " << p.findOne("dwight.foo", BSONObj()).toString() << endl; - sleepsecs(3); - BSONObj info; - bool im; - log() << "ismaster returns " << p.isMaster(im,&info) << " info: " << info.toString() << endl; - } - catch(...) { - cout << "caught exception" << endl; - } - } - } + auto_ptr<DBClientCursor> DBClientReplicaSet::query(const string &a, Query b, int c, int d, + const BSONObj *e, int f, int g){ + // TODO: if slave ok is set go to a slave + return checkMaster()->query(a,b,c,d,e,f,g); + } + BSONObj DBClientReplicaSet::findOne(const string &a, const Query& b, const BSONObj *c, int d) { + return checkMaster()->findOne(a,b,c,d); + } + + bool serverAlive( const string &uri ) { + DBClientConnection c( false, 0, 20 ); // potentially the connection to server could fail while we're checking if it's alive - so use timeouts + string err; + if ( !c.connect( uri, err ) ) + return false; + if ( !c.simpleCommand( "admin", 0, "ping" ) ) + return false; + return true; + } + } // namespace mongo |