diff options
author | Antonin Kral <a.kral@bobek.cz> | 2011-03-17 00:05:43 +0100 |
---|---|---|
committer | Antonin Kral <a.kral@bobek.cz> | 2011-03-17 00:05:43 +0100 |
commit | 582fc32574a3b158c81e49cb00e6ae59205e66ba (patch) | |
tree | ac64a3243e0d2121709f685695247052858115c8 /client/dbclient.cpp | |
parent | 2761bffa96595ac1698d86bbc2e95ebb0d4d6e93 (diff) | |
download | mongodb-582fc32574a3b158c81e49cb00e6ae59205e66ba.tar.gz |
Imported Upstream version 1.8.0
Diffstat (limited to 'client/dbclient.cpp')
-rw-r--r-- | client/dbclient.cpp | 587 |
1 files changed, 229 insertions, 358 deletions
diff --git a/client/dbclient.cpp b/client/dbclient.cpp index aa9b7ae..b4214ab 100644 --- a/client/dbclient.cpp +++ b/client/dbclient.cpp @@ -31,8 +31,41 @@ namespace mongo { + void ConnectionString::_fillServers( string s ) { + + { + string::size_type idx = s.find( '/' ); + if ( idx != string::npos ) { + _setName = s.substr( 0 , idx ); + s = s.substr( idx + 1 ); + _type = SET; + } + } + + string::size_type idx; + while ( ( idx = s.find( ',' ) ) != string::npos ) { + _servers.push_back( s.substr( 0 , idx ) ); + s = s.substr( idx + 1 ); + } + _servers.push_back( s ); + + } + + void ConnectionString::_finishInit() { + stringstream ss; + if ( _type == SET ) + ss << _setName << "/"; + for ( unsigned i=0; i<_servers.size(); i++ ) { + if ( i > 0 ) + ss << ","; + ss << _servers[i].toString(); + } + _string = ss.str(); + } + + DBClientBase* ConnectionString::connect( string& errmsg ) const { - switch ( _type ){ + switch ( _type ) { case MASTER: { DBClientConnection * c = new DBClientConnection(true); log(1) << "creating new connection to:" << _servers[0] << endl; @@ -42,11 +75,11 @@ namespace mongo { } return c; } - - case PAIR: + + case PAIR: case SET: { DBClientReplicaSet * set = new DBClientReplicaSet( _setName , _servers ); - if( ! set->connect() ){ + if( ! set->connect() ) { delete set; errmsg = "connect failed to set "; errmsg += toString(); @@ -54,7 +87,7 @@ namespace mongo { } return set; } - + case SYNC: { // TODO , don't copy list<HostAndPort> l; @@ -62,40 +95,58 @@ namespace mongo { l.push_back( _servers[i] ); return new SyncClusterConnection( l ); } - + case INVALID: throw UserException( 13421 , "trying to connect to invalid ConnectionString" ); break; } - + assert( 0 ); return 0; } - ConnectionString ConnectionString::parse( const string& host , string& errmsg ){ - + ConnectionString ConnectionString::parse( const string& host , string& errmsg ) { + string::size_type i = host.find( '/' ); - if ( i != string::npos ){ + if ( i != string::npos && i != 0) { // replica set return ConnectionString( SET , host.substr( i + 1 ) , host.substr( 0 , i ) ); } - int numCommas = DBClientBase::countCommas( host ); - - if( numCommas == 0 ) + int numCommas = str::count( host , ',' ); + + if( numCommas == 0 ) return ConnectionString( HostAndPort( host ) ); - - if ( numCommas == 1 ) + + if ( numCommas == 1 ) return ConnectionString( PAIR , host ); if ( numCommas == 2 ) return ConnectionString( SYNC , host ); - + errmsg = (string)"invalid hostname [" + host + "]"; return ConnectionString(); // INVALID } - Query& Query::where(const string &jscode, BSONObj scope) { + string ConnectionString::typeToString( ConnectionType type ) { + switch ( type ) { + case INVALID: + return "invalid"; + case MASTER: + return "master"; + case PAIR: + return "pair"; + case SET: + return "set"; + case SYNC: + return "sync"; + } + assert(0); + return ""; + } + + + Query& Query::where(const string &jscode, BSONObj scope) { /* use where() before sort() and hint() and explain(), else this will assert. */ assert( ! isComplex() ); BSONObjBuilder b; @@ -113,44 +164,44 @@ namespace mongo { obj = b.obj(); } - Query& Query::sort(const BSONObj& s) { + Query& Query::sort(const BSONObj& s) { appendComplex( "orderby", s ); - return *this; + return *this; } Query& Query::hint(BSONObj keyPattern) { appendComplex( "$hint", keyPattern ); - return *this; + return *this; } Query& Query::explain() { appendComplex( "$explain", true ); - return *this; + return *this; } - + Query& Query::snapshot() { appendComplex( "$snapshot", true ); - return *this; + return *this; } - + Query& Query::minKey( const BSONObj &val ) { appendComplex( "$min", val ); - return *this; + return *this; } Query& Query::maxKey( const BSONObj &val ) { appendComplex( "$max", val ); - return *this; + return *this; } - bool Query::isComplex( bool * hasDollar ) const{ - if ( obj.hasElement( "query" ) ){ + bool Query::isComplex( bool * hasDollar ) const { + if ( obj.hasElement( "query" ) ) { if ( hasDollar ) hasDollar[0] = false; return true; } - if ( obj.hasElement( "$query" ) ){ + if ( obj.hasElement( "$query" ) ) { if ( hasDollar ) hasDollar[0] = true; return true; @@ -158,12 +209,12 @@ namespace mongo { return false; } - + BSONObj Query::getFilter() const { bool hasDollar; if ( ! isComplex( &hasDollar ) ) return obj; - + return obj.getObjectField( hasDollar ? "$query" : "query" ); } BSONObj Query::getSort() const { @@ -182,8 +233,8 @@ namespace mongo { bool Query::isExplain() const { return isComplex() && obj.getBoolField( "$explain" ); } - - string Query::toString() const{ + + string Query::toString() const { return obj.toString(); } @@ -203,7 +254,7 @@ namespace mongo { } return _cachedAvailableOptions; } - + inline bool DBClientWithCommands::runCommand(const string &dbname, const BSONObj& cmd, BSONObj &info, int options) { string ns = dbname + ".$cmd"; info = findOne(ns, cmd, 0 , options); @@ -222,38 +273,50 @@ namespace mongo { return runCommand(dbname, b.done(), *info); } - unsigned long long DBClientWithCommands::count(const string &_ns, const BSONObj& query, int options) { - NamespaceString ns(_ns); - BSONObj cmd = BSON( "count" << ns.coll << "query" << query ); + unsigned long long DBClientWithCommands::count(const string &myns, const BSONObj& query, int options, int limit, int skip ) { + NamespaceString ns(myns); + BSONObj cmd = _countCmd( myns , query , options , limit , skip ); BSONObj res; if( !runCommand(ns.db.c_str(), cmd, res, options) ) uasserted(11010,string("count fails:") + res.toString()); return res["n"].numberLong(); } + BSONObj DBClientWithCommands::_countCmd(const string &myns, const BSONObj& query, int options, int limit, int skip ) { + NamespaceString ns(myns); + BSONObjBuilder b; + b.append( "count" , ns.coll ); + b.append( "query" , query ); + if ( limit ) + b.append( "limit" , limit ); + if ( skip ) + b.append( "skip" , skip ); + return b.obj(); + } + BSONObj getlasterrorcmdobj = fromjson("{getlasterror:1}"); - BSONObj DBClientWithCommands::getLastErrorDetailed() { + BSONObj DBClientWithCommands::getLastErrorDetailed() { BSONObj info; runCommand("admin", getlasterrorcmdobj, info); - return info; + return info; } - string DBClientWithCommands::getLastError() { + string DBClientWithCommands::getLastError() { BSONObj info = getLastErrorDetailed(); return getLastErrorString( info ); } - - string DBClientWithCommands::getLastErrorString( const BSONObj& info ){ + + string DBClientWithCommands::getLastErrorString( const BSONObj& info ) { BSONElement e = info["err"]; if( e.eoo() ) return ""; if( e.type() == Object ) return e.toString(); - return e.str(); + return e.str(); } BSONObj getpreverrorcmdobj = fromjson("{getpreverror:1}"); - BSONObj DBClientWithCommands::getPrevError() { + BSONObj DBClientWithCommands::getPrevError() { BSONObj info; runCommand("admin", getpreverrorcmdobj, info); return info; @@ -261,7 +324,7 @@ namespace mongo { BSONObj getnoncecmdobj = fromjson("{getnonce:1}"); - string DBClientWithCommands::createPasswordDigest( const string & username , const string & clearTextPassword ){ + string DBClientWithCommands::createPasswordDigest( const string & username , const string & clearTextPassword ) { md5digest d; { md5_state_t st; @@ -275,11 +338,9 @@ namespace mongo { } bool DBClientWithCommands::auth(const string &dbname, const string &username, const string &password_text, string& errmsg, bool digestPassword) { - //cout << "TEMP AUTH " << toString() << dbname << ' ' << username << ' ' << password_text << ' ' << digestPassword << endl; - - string password = password_text; - if( digestPassword ) - password = createPasswordDigest( username , password_text ); + string password = password_text; + if( digestPassword ) + password = createPasswordDigest( username , password_text ); BSONObj info; string nonce; @@ -310,8 +371,8 @@ namespace mongo { b << "key" << digestToString( d ); authCmd = b.done(); } - - if( runCommand(dbname, authCmd, info) ) + + if( runCommand(dbname, authCmd, info) ) return true; errmsg = info.toString(); @@ -322,7 +383,7 @@ namespace mongo { bool DBClientWithCommands::isMaster(bool& isMaster, BSONObj *info) { BSONObj o; - if ( info == 0 ) + if ( info == 0 ) info = &o; bool ok = runCommand("admin", ismastercmdobj, *info); isMaster = info->getField("ismaster").trueValue(); @@ -331,7 +392,7 @@ namespace mongo { bool DBClientWithCommands::createCollection(const string &ns, long long size, bool capped, int max, BSONObj *info) { BSONObj o; - if ( info == 0 ) info = &o; + if ( info == 0 ) info = &o; BSONObjBuilder b; string db = nsToDatabase(ns.c_str()); b.append("create", ns.c_str() + db.length() + 1); @@ -381,11 +442,11 @@ namespace mongo { return false; } - BSONObj DBClientWithCommands::mapreduce(const string &ns, const string &jsmapf, const string &jsreducef, BSONObj query, const string& outputcolname) { + BSONObj DBClientWithCommands::mapreduce(const string &ns, const string &jsmapf, const string &jsreducef, BSONObj query, const string& outputcolname) { BSONObjBuilder b; b.append("mapreduce", nsGetCollection(ns)); - b.appendCode("map", jsmapf.c_str()); - b.appendCode("reduce", jsreducef.c_str()); + b.appendCode("map", jsmapf); + b.appendCode("reduce", jsreducef); if( !query.isEmpty() ) b.append("query", query); if( !outputcolname.empty() ) @@ -397,7 +458,7 @@ namespace mongo { bool DBClientWithCommands::eval(const string &dbname, const string &jscode, BSONObj& info, BSONElement& retValue, BSONObj *args) { BSONObjBuilder b; - b.appendCode("$eval", jscode.c_str()); + b.appendCode("$eval", jscode); if ( args ) b.appendArray("args", *args); bool ok = runCommand(dbname, b.done(), info); @@ -412,27 +473,27 @@ namespace mongo { return eval(dbname, jscode, info, retValue); } - list<string> DBClientWithCommands::getDatabaseNames(){ + list<string> DBClientWithCommands::getDatabaseNames() { BSONObj info; uassert( 10005 , "listdatabases failed" , runCommand( "admin" , BSON( "listDatabases" << 1 ) , info ) ); uassert( 10006 , "listDatabases.databases not array" , info["databases"].type() == Array ); - + list<string> names; - + BSONObjIterator i( info["databases"].embeddedObjectUserCheck() ); - while ( i.more() ){ + while ( i.more() ) { names.push_back( i.next().embeddedObjectUserCheck()["name"].valuestr() ); } return names; } - list<string> DBClientWithCommands::getCollectionNames( const string& db ){ + list<string> DBClientWithCommands::getCollectionNames( const string& db ) { list<string> names; - + string ns = db + ".system.namespaces"; auto_ptr<DBClientCursor> c = query( ns.c_str() , BSONObj() ); - while ( c->more() ){ + while ( c->more() ) { string name = c->next()["name"].valuestr(); if ( name.find( "$" ) != string::npos ) continue; @@ -441,37 +502,37 @@ namespace mongo { return names; } - bool DBClientWithCommands::exists( const string& ns ){ + bool DBClientWithCommands::exists( const string& ns ) { list<string> names; - + string db = nsGetDB( ns ) + ".system.namespaces"; BSONObj q = BSON( "name" << ns ); - return count( db.c_str() , q ) != 0; + return count( db.c_str() , q, QueryOption_SlaveOk ) != 0; } /* --- dbclientconnection --- */ - bool DBClientConnection::auth(const string &dbname, const string &username, const string &password_text, string& errmsg, bool digestPassword) { - string password = password_text; - if( digestPassword ) - password = createPasswordDigest( username , password_text ); + bool DBClientConnection::auth(const string &dbname, const string &username, const string &password_text, string& errmsg, bool digestPassword) { + string password = password_text; + if( digestPassword ) + password = createPasswordDigest( username , password_text ); - if( autoReconnect ) { - /* note we remember the auth info before we attempt to auth -- if the connection is broken, we will - then have it for the next autoreconnect attempt. - */ - pair<string,string> p = pair<string,string>(username, password); - authCache[dbname] = p; - } + if( autoReconnect ) { + /* note we remember the auth info before we attempt to auth -- if the connection is broken, we will + then have it for the next autoreconnect attempt. + */ + pair<string,string> p = pair<string,string>(username, password); + authCache[dbname] = p; + } - return DBClientBase::auth(dbname, username, password.c_str(), errmsg, false); - } + return DBClientBase::auth(dbname, username, password.c_str(), errmsg, false); + } BSONObj DBClientInterface::findOne(const string &ns, const Query& query, const BSONObj *fieldsToReturn, int queryOptions) { auto_ptr<DBClientCursor> c = this->query(ns, query, 1, 0, fieldsToReturn, queryOptions); - uassert( 10276 , "DBClientBase::findOne: transport error", c.get() ); + uassert( 10276 , str::stream() << "DBClientBase::findOne: transport error: " << getServerAddress() << " query: " << query.toString(), c.get() ); if ( c->hasResultFlag( ResultFlag_ShardConfigStale ) ) throw StaleConfigException( ns , "findOne has stale config" ); @@ -482,20 +543,20 @@ namespace mongo { return c->nextSafe().copy(); } - bool DBClientConnection::connect(const HostAndPort& server, string& errmsg){ + bool DBClientConnection::connect(const HostAndPort& server, string& errmsg) { _server = server; _serverString = _server.toString(); return _connect( errmsg ); } - bool DBClientConnection::_connect( string& errmsg ){ + bool DBClientConnection::_connect( string& errmsg ) { _serverString = _server.toString(); // we keep around SockAddr for connection life -- maybe MessagingPort // requires that? server.reset(new SockAddr(_server.host().c_str(), _server.port())); - p.reset(new MessagingPort( _timeout, _logLevel )); + p.reset(new MessagingPort( _so_timeout, _logLevel )); - if (server->getAddr() == "0.0.0.0"){ + if (server->getAddr() == "0.0.0.0") { failed = true; return false; } @@ -513,35 +574,39 @@ namespace mongo { void DBClientConnection::_checkConnection() { if ( !failed ) return; - if ( lastReconnectTry && time(0)-lastReconnectTry < 2 ) - return; + if ( lastReconnectTry && time(0)-lastReconnectTry < 2 ) { + // we wait a little before reconnect attempt to avoid constant hammering. + // but we throw we don't want to try to use a connection in a bad state + throw SocketException(SocketException::FAILED_STATE); + } if ( !autoReconnect ) - return; + throw SocketException(SocketException::FAILED_STATE); lastReconnectTry = time(0); log(_logLevel) << "trying reconnect to " << _serverString << endl; string errmsg; failed = false; - if ( ! _connect(errmsg) ) { + if ( ! _connect(errmsg) ) { + failed = true; log(_logLevel) << "reconnect " << _serverString << " failed " << errmsg << endl; - return; - } + throw SocketException(SocketException::CONNECT_ERROR); + } - log(_logLevel) << "reconnect " << _serverString << " ok" << endl; - for( map< string, pair<string,string> >::iterator i = authCache.begin(); i != authCache.end(); i++ ) { - const char *dbname = i->first.c_str(); - const char *username = i->second.first.c_str(); - const char *password = i->second.second.c_str(); - if( !DBClientBase::auth(dbname, username, password, errmsg, false) ) - log(_logLevel) << "reconnect: auth failed db:" << dbname << " user:" << username << ' ' << errmsg << '\n'; - } + log(_logLevel) << "reconnect " << _serverString << " ok" << endl; + for( map< string, pair<string,string> >::iterator i = authCache.begin(); i != authCache.end(); i++ ) { + const char *dbname = i->first.c_str(); + const char *username = i->second.first.c_str(); + const char *password = i->second.second.c_str(); + if( !DBClientBase::auth(dbname, username, password, errmsg, false) ) + log(_logLevel) << "reconnect: auth failed db:" << dbname << " user:" << username << ' ' << errmsg << '\n'; + } } auto_ptr<DBClientCursor> DBClientBase::query(const string &ns, Query query, int nToReturn, - int nToSkip, const BSONObj *fieldsToReturn, int queryOptions , int batchSize ) { + int nToSkip, const BSONObj *fieldsToReturn, int queryOptions , int batchSize ) { auto_ptr<DBClientCursor> c( new DBClientCursor( this, - ns, query.obj, nToReturn, nToSkip, - fieldsToReturn, queryOptions , batchSize ) ); + ns, query.obj, nToReturn, nToSkip, + fieldsToReturn, queryOptions , batchSize ) ); if ( c->init() ) return c; return auto_ptr< DBClientCursor >( 0 ); @@ -562,14 +627,14 @@ namespace mongo { } boost::function<void(const BSONObj &)> _f; }; - + unsigned long long DBClientConnection::query( boost::function<void(const BSONObj&)> f, const string& ns, Query query, const BSONObj *fieldsToReturn, int queryOptions ) { DBClientFunConvertor fun; fun._f = f; boost::function<void(DBClientCursorBatchIterator &)> ptr( fun ); return DBClientConnection::query( ptr, ns, query, fieldsToReturn, queryOptions ); } - + unsigned long long DBClientConnection::query( boost::function<void(DBClientCursorBatchIterator &)> f, const string& ns, Query query, const BSONObj *fieldsToReturn, int queryOptions ) { // mask options queryOptions &= (int)( QueryOption_NoCursorTimeout | QueryOption_SlaveOk ); @@ -577,11 +642,11 @@ namespace mongo { bool doExhaust = ( availableOptions() & QueryOption_Exhaust ); if ( doExhaust ) { - queryOptions |= (int)QueryOption_Exhaust; + queryOptions |= (int)QueryOption_Exhaust; } auto_ptr<DBClientCursor> c( this->query(ns, query, 0, 0, fieldsToReturn, queryOptions) ); - massert( 13386, "socket error for mapping query", c.get() ); - + uassert( 13386, "socket error for mapping query", c.get() ); + if ( !doExhaust ) { while( c->more() ) { DBClientCursorBatchIterator i( *c ); @@ -591,21 +656,21 @@ namespace mongo { return n; } - try { - while( 1 ) { - while( c->moreInCurrentBatch() ) { + try { + while( 1 ) { + while( c->moreInCurrentBatch() ) { DBClientCursorBatchIterator i( *c ); f( i ); n += i.n(); } - if( c->getCursorId() == 0 ) + if( c->getCursorId() == 0 ) break; c->exhaustReceiveMore(); } } - catch(std::exception&) { + catch(std::exception&) { /* connection CANNOT be used anymore as more data may be on the way from the server. we have to reconnect. */ @@ -633,16 +698,16 @@ namespace mongo { void DBClientBase::insert( const string & ns , const vector< BSONObj > &v ) { Message toSend; - + BufBuilder b; int opts = 0; b.appendNum( opts ); b.appendStr( ns ); for( vector< BSONObj >::const_iterator i = v.begin(); i != v.end(); ++i ) i->appendSelfToBufBuilder( b ); - + toSend.setData( dbInsert, b.buf(), b.len() ); - + say( toSend ); } @@ -686,63 +751,63 @@ namespace mongo { say( toSend ); } - auto_ptr<DBClientCursor> DBClientWithCommands::getIndexes( const string &ns ){ + auto_ptr<DBClientCursor> DBClientWithCommands::getIndexes( const string &ns ) { return query( Namespace( ns.c_str() ).getSisterNS( "system.indexes" ).c_str() , BSON( "ns" << ns ) ); } - - void DBClientWithCommands::dropIndex( const string& ns , BSONObj keys ){ + + void DBClientWithCommands::dropIndex( const string& ns , BSONObj keys ) { dropIndex( ns , genIndexName( keys ) ); } - void DBClientWithCommands::dropIndex( const string& ns , const string& indexName ){ + void DBClientWithCommands::dropIndex( const string& ns , const string& indexName ) { BSONObj info; - if ( ! runCommand( nsToDatabase( ns.c_str() ) , - BSON( "deleteIndexes" << NamespaceString( ns ).coll << "index" << indexName ) , - info ) ){ + if ( ! runCommand( nsToDatabase( ns.c_str() ) , + BSON( "deleteIndexes" << NamespaceString( ns ).coll << "index" << indexName ) , + info ) ) { log(_logLevel) << "dropIndex failed: " << info << endl; uassert( 10007 , "dropIndex failed" , 0 ); } resetIndexCache(); } - - void DBClientWithCommands::dropIndexes( const string& ns ){ + + void DBClientWithCommands::dropIndexes( const string& ns ) { BSONObj info; - uassert( 10008 , "dropIndexes failed" , runCommand( nsToDatabase( ns.c_str() ) , - BSON( "deleteIndexes" << NamespaceString( ns ).coll << "index" << "*") , - info ) ); + uassert( 10008 , "dropIndexes failed" , runCommand( nsToDatabase( ns.c_str() ) , + BSON( "deleteIndexes" << NamespaceString( ns ).coll << "index" << "*") , + info ) ); resetIndexCache(); } - void DBClientWithCommands::reIndex( const string& ns ){ + void DBClientWithCommands::reIndex( const string& ns ) { list<BSONObj> all; auto_ptr<DBClientCursor> i = getIndexes( ns ); - while ( i->more() ){ + while ( i->more() ) { all.push_back( i->next().getOwned() ); } - + dropIndexes( ns ); - - for ( list<BSONObj>::iterator i=all.begin(); i!=all.end(); i++ ){ + + for ( list<BSONObj>::iterator i=all.begin(); i!=all.end(); i++ ) { BSONObj o = *i; insert( Namespace( ns.c_str() ).getSisterNS( "system.indexes" ).c_str() , o ); } - + } - - string DBClientWithCommands::genIndexName( const BSONObj& keys ){ + + string DBClientWithCommands::genIndexName( const BSONObj& keys ) { stringstream ss; - + bool first = 1; for ( BSONObjIterator i(keys); i.more(); ) { BSONElement f = i.next(); - + if ( first ) first = 0; else ss << "_"; - + ss << f.fieldName() << "_"; if( f.isNumber() ) ss << f.numberInt(); @@ -750,7 +815,7 @@ namespace mongo { return ss.str(); } - bool DBClientWithCommands::ensureIndex( const string &ns , BSONObj keys , bool unique, const string & name ) { + bool DBClientWithCommands::ensureIndex( const string &ns , BSONObj keys , bool unique, const string & name , bool cache ) { BSONObjBuilder toSave; toSave.append( "ns" , ns ); toSave.append( "key" , keys ); @@ -767,13 +832,15 @@ namespace mongo { toSave.append( "name" , nn ); cacheKey += nn; } - + if ( unique ) toSave.appendBool( "unique", unique ); if ( _seenIndexes.count( cacheKey ) ) return 0; - _seenIndexes.insert( cacheKey ); + + if ( cache ) + _seenIndexes.insert( cacheKey ); insert( Namespace( ns.c_str() ).getSisterNS( "system.indexes" ).c_str() , toSave.obj() ); return 1; @@ -808,9 +875,10 @@ namespace mongo { void DBClientConnection::say( Message &toSend ) { checkConnection(); - try { + try { port().say( toSend ); - } catch( SocketException & ) { + } + catch( SocketException & ) { failed = true; throw; } @@ -820,24 +888,25 @@ namespace mongo { port().piggyBack( toSend ); } - void DBClientConnection::recv( Message &m ) { + void DBClientConnection::recv( Message &m ) { port().recv(m); } - bool DBClientConnection::call( Message &toSend, Message &response, bool assertOk ) { - /* todo: this is very ugly messagingport::call returns an error code AND can throw - an exception. we should make it return void and just throw an exception anytime + bool DBClientConnection::call( Message &toSend, Message &response, bool assertOk , string * actualServer ) { + /* todo: this is very ugly messagingport::call returns an error code AND can throw + an exception. we should make it return void and just throw an exception anytime it fails */ - try { + try { if ( !port().call(toSend, response) ) { failed = true; if ( assertOk ) - uassert( 10278 , "dbclient error communicating with server", false); + uasserted( 10278 , str::stream() << "dbclient error communicating with server: " << getServerAddress() ); + return false; } } - catch( SocketException & ) { + catch( SocketException & ) { failed = true; throw; } @@ -858,222 +927,24 @@ namespace mongo { } } - void DBClientConnection::killCursor( long long cursorId ){ + void DBClientConnection::killCursor( long long cursorId ) { BufBuilder b; b.appendNum( (int)0 ); // reserved b.appendNum( (int)1 ); // number b.appendNum( cursorId ); - + Message m; m.setData( dbKillCursors , b.buf() , b.len() ); - sayPiggyBack( m ); + if ( _lazyKillCursor ) + sayPiggyBack( m ); + else + say(m); } - /* --- class dbclientpaired --- */ + AtomicUInt DBClientConnection::_numConnections; + bool DBClientConnection::_lazyKillCursor = true; - string DBClientReplicaSet::toString() { - return getServerAddress(); - } - - DBClientReplicaSet::DBClientReplicaSet( const string& name , const vector<HostAndPort>& servers ) - : _name( name ) , _currentMaster( 0 ), _servers( servers ){ - - for ( unsigned i=0; i<_servers.size(); i++ ) - _conns.push_back( new DBClientConnection( true , this ) ); - } - - DBClientReplicaSet::~DBClientReplicaSet(){ - for ( unsigned i=0; i<_conns.size(); i++ ) - delete _conns[i]; - _conns.clear(); - } - - string DBClientReplicaSet::getServerAddress() const { - StringBuilder ss; - if ( _name.size() ) - ss << _name << "/"; - - for ( unsigned i=0; i<_servers.size(); i++ ){ - if ( i > 0 ) - ss << ","; - ss << _servers[i].toString(); - } - return ss.str(); - } - - /* find which server, the left or right, is currently master mode */ - void DBClientReplicaSet::_checkMaster() { - - bool triedQuickCheck = false; - - log( _logLevel + 1) << "_checkMaster on: " << toString() << endl; - for ( int retry = 0; retry < 2; retry++ ) { - for ( unsigned i=0; i<_conns.size(); i++ ){ - DBClientConnection * c = _conns[i]; - try { - bool im; - BSONObj o; - c->isMaster(im, &o); - - if ( retry ) - log(_logLevel) << "checkmaster: " << c->toString() << ' ' << o << '\n'; - - string maybePrimary; - if ( o["hosts"].type() == Array ){ - if ( o["primary"].type() == String ) - maybePrimary = o["primary"].String(); - - BSONObjIterator hi(o["hosts"].Obj()); - while ( hi.more() ){ - string toCheck = hi.next().String(); - int found = -1; - for ( unsigned x=0; x<_servers.size(); x++ ){ - if ( toCheck == _servers[x].toString() ){ - found = x; - break; - } - } - - if ( found == -1 ){ - HostAndPort h( toCheck ); - _servers.push_back( h ); - _conns.push_back( new DBClientConnection( true, this ) ); - string temp; - _conns[ _conns.size() - 1 ]->connect( h , temp ); - log( _logLevel ) << "updated set to: " << toString() << endl; - } - - } - } - - if ( im ) { - _currentMaster = c; - return; - } - - if ( maybePrimary.size() && ! triedQuickCheck ){ - for ( unsigned x=0; x<_servers.size(); x++ ){ - if ( _servers[i].toString() != maybePrimary ) - continue; - triedQuickCheck = true; - _conns[x]->isMaster( im , &o ); - if ( im ){ - _currentMaster = _conns[x]; - return; - } - } - } - } - catch ( std::exception& e ) { - if ( retry ) - log(_logLevel) << "checkmaster: caught exception " << c->toString() << ' ' << e.what() << endl; - } - } - sleepsecs(1); - } - - uassert( 10009 , "checkmaster: no master found", false); - } - - DBClientConnection * DBClientReplicaSet::checkMaster() { - if ( _currentMaster ){ - // a master is selected. let's just make sure connection didn't die - if ( ! _currentMaster->isFailed() ) - return _currentMaster; - _currentMaster = 0; - } - - _checkMaster(); - assert( _currentMaster ); - return _currentMaster; - } - - DBClientConnection& DBClientReplicaSet::masterConn(){ - return *checkMaster(); - } - - DBClientConnection& DBClientReplicaSet::slaveConn(){ - DBClientConnection * m = checkMaster(); - assert( ! m->isFailed() ); - - DBClientConnection * failedSlave = 0; - - for ( unsigned i=0; i<_conns.size(); i++ ){ - if ( m == _conns[i] ) - continue; - failedSlave = _conns[i]; - if ( _conns[i]->isFailed() ) - continue; - return *_conns[i]; - } - - assert(failedSlave); - return *failedSlave; - } - - bool DBClientReplicaSet::connect(){ - string errmsg; - - bool anyGood = false; - for ( unsigned i=0; i<_conns.size(); i++ ){ - if ( _conns[i]->connect( _servers[i] , errmsg ) ) - anyGood = true; - } - - if ( ! anyGood ) - return false; - - try { - checkMaster(); - } - catch (AssertionException&) { - return false; - } - return true; - } - - bool DBClientReplicaSet::auth(const string &dbname, const string &username, const string &pwd, string& errmsg, bool digestPassword ) { - DBClientConnection * m = checkMaster(); - if( !m->auth(dbname, username, pwd, errmsg, digestPassword ) ) - return false; - - /* we try to authentiate with the other half of the pair -- even if down, that way the authInfo is cached. */ - for ( unsigned i=0; i<_conns.size(); i++ ){ - if ( _conns[i] == m ) - continue; - try { - string e; - _conns[i]->auth( dbname , username , pwd , e , digestPassword ); - } - catch ( AssertionException& ){ - } - } - - return true; - } - - auto_ptr<DBClientCursor> DBClientReplicaSet::query(const string &a, Query b, int c, int d, - const BSONObj *e, int f, int g){ - // TODO: if slave ok is set go to a slave - return checkMaster()->query(a,b,c,d,e,f,g); - } - - BSONObj DBClientReplicaSet::findOne(const string &a, const Query& b, const BSONObj *c, int d) { - return checkMaster()->findOne(a,b,c,d); - } - - bool DBClientReplicaSet::isMember( const DBConnector * conn ) const { - if ( conn == this ) - return true; - - for ( unsigned i=0; i<_conns.size(); i++ ) - if ( _conns[i]->isMember( conn ) ) - return true; - - return false; - } - bool serverAlive( const string &uri ) { DBClientConnection c( false, 0, 20 ); // potentially the connection to server could fail while we're checking if it's alive - so use timeouts @@ -1084,5 +955,5 @@ namespace mongo { return false; return true; } - + } // namespace mongo |