diff options
Diffstat (limited to 'client')
-rw-r--r-- | client/clientOnly.cpp | 4 | ||||
-rw-r--r-- | client/connpool.cpp | 3 | ||||
-rw-r--r-- | client/dbclient.cpp | 7 | ||||
-rw-r--r-- | client/dbclient.h | 19 | ||||
-rw-r--r-- | client/dbclient_rs.cpp | 56 | ||||
-rw-r--r-- | client/dbclient_rs.h | 3 | ||||
-rw-r--r-- | client/examples/clientTest.cpp | 22 |
7 files changed, 85 insertions, 29 deletions
diff --git a/client/clientOnly.cpp b/client/clientOnly.cpp index 726c3a9..5725e5f 100644 --- a/client/clientOnly.cpp +++ b/client/clientOnly.cpp @@ -68,5 +68,9 @@ namespace mongo { return false; } + string prettyHostName() { + assert(0); + return ""; + } } diff --git a/client/connpool.cpp b/client/connpool.cpp index a521699..23d14da 100644 --- a/client/connpool.cpp +++ b/client/connpool.cpp @@ -192,6 +192,9 @@ namespace mongo { { scoped_lock lk( _mutex ); for ( PoolMap::iterator i=_pools.begin(); i!=_pools.end(); ++i ) { + if ( i->second.numCreated() == 0 ) + continue; + string s = i->first; BSONObjBuilder temp( bb.subobjStart( s ) ); temp.append( "available" , i->second.numAvailable() ); diff --git a/client/dbclient.cpp b/client/dbclient.cpp index a68b1af..bb24199 100644 --- a/client/dbclient.cpp +++ b/client/dbclient.cpp @@ -442,15 +442,16 @@ namespace mongo { return false; } - BSONObj DBClientWithCommands::mapreduce(const string &ns, const string &jsmapf, const string &jsreducef, BSONObj query, const string& outputcolname) { + DBClientWithCommands::MROutput DBClientWithCommands::MRInline (BSON("inline" << 1)); + + BSONObj DBClientWithCommands::mapreduce(const string &ns, const string &jsmapf, const string &jsreducef, BSONObj query, MROutput output) { BSONObjBuilder b; b.append("mapreduce", nsGetCollection(ns)); b.appendCode("map", jsmapf); b.appendCode("reduce", jsreducef); if( !query.isEmpty() ) b.append("query", query); - if( !outputcolname.empty() ) - b.append("out", outputcolname); + b.append("out", output.out); BSONObj info; runCommand(nsGetDB(ns), b.done(), info); return info; diff --git a/client/dbclient.h b/client/dbclient.h index 9cb6571..9bc71fd 100644 --- a/client/dbclient.h +++ b/client/dbclient.h @@ -528,6 +528,19 @@ namespace mongo { bool setDbProfilingLevel(const string &dbname, ProfilingLevel level, BSONObj *info = 0); bool getDbProfilingLevel(const string &dbname, ProfilingLevel& level, BSONObj *info = 0); + + /** This implicitly converts from char*, string, and BSONObj to be an argument to mapreduce + You shouldn't need to explicitly construct this + */ + struct MROutput { + MROutput(const char* collection) : out(BSON("replace" << collection)) {} + MROutput(const string& collection) : out(BSON("replace" << collection)) {} + MROutput(const BSONObj& obj) : out(obj) {} + + BSONObj out; + }; + static MROutput MRInline; + /** Run a map/reduce job on the server. See http://www.mongodb.org/display/DOCS/MapReduce @@ -536,8 +549,8 @@ namespace mongo { jsmapf javascript map function code jsreducef javascript reduce function code. query optional query filter for the input - output optional permanent output collection name. if not specified server will - generate a temporary collection and return its name. + output either a string collection name or an object representing output type + if not specified uses inline output type returns a result object which contains: { result : <collection_name>, @@ -551,7 +564,7 @@ namespace mongo { result.getField("ok").trueValue() on the result to check if ok. */ - BSONObj mapreduce(const string &ns, const string &jsmapf, const string &jsreducef, BSONObj query = BSONObj(), const string& output = ""); + BSONObj mapreduce(const string &ns, const string &jsmapf, const string &jsreducef, BSONObj query = BSONObj(), MROutput output = MRInline); /** Run javascript code on the database server. dbname database SavedContext in which the code runs. The javascript variable 'db' will be assigned diff --git a/client/dbclient_rs.cpp b/client/dbclient_rs.cpp index b6ce776..ae01da3 100644 --- a/client/dbclient_rs.cpp +++ b/client/dbclient_rs.cpp @@ -74,7 +74,7 @@ namespace mongo { ReplicaSetMonitor::ReplicaSetMonitor( const string& name , const vector<HostAndPort>& servers ) - : _lock( "ReplicaSetMonitor instance" ) , _checkConnectionLock( "ReplicaSetMonitor check connection lock" ), _name( name ) , _master(-1) { + : _lock( "ReplicaSetMonitor instance" ) , _checkConnectionLock( "ReplicaSetMonitor check connection lock" ), _name( name ) , _master(-1), _nextSlave(0) { uassert( 13642 , "need at least 1 node for a replica set" , servers.size() > 0 ); @@ -85,6 +85,12 @@ namespace mongo { string errmsg; for ( unsigned i=0; i<servers.size(); i++ ) { + + bool haveAlready = false; + for ( unsigned n = 0; n < _nodes.size() && ! haveAlready; n++ ) + haveAlready = ( _nodes[n].addr == servers[i] ); + if( haveAlready ) continue; + auto_ptr<DBClientConnection> conn( new DBClientConnection( true , 0, 5.0 ) ); if (!conn->connect( servers[i] , errmsg ) ) { log(1) << "error connecting to seed " << servers[i] << ": " << errmsg << endl; @@ -221,19 +227,17 @@ namespace mongo { } HostAndPort ReplicaSetMonitor::getSlave() { - int x = rand() % _nodes.size(); - { - scoped_lock lk( _lock ); - for ( unsigned i=0; i<_nodes.size(); i++ ) { - int p = ( i + x ) % _nodes.size(); - if ( p == _master ) - continue; - if ( _nodes[p].ok ) - return _nodes[p].addr; - } + + scoped_lock lk( _lock ); + for ( unsigned i=0; i<_nodes.size(); i++ ) { + _nextSlave = ( _nextSlave + 1 ) % _nodes.size(); + if ( _nextSlave == _master ) + continue; + if ( _nodes[ _nextSlave ].ok ) + return _nodes[ _nextSlave ].addr; } - return _nodes[0].addr; + return _nodes[ 0 ].addr; } /** @@ -292,6 +296,10 @@ namespace mongo { newConn->connect( h , temp ); { scoped_lock lk( _lock ); + if ( _find_inlock( toCheck ) >= 0 ) { + // we need this check inside the lock so there isn't thread contention on adding to vector + continue; + } _nodes.push_back( Node( h , newConn ) ); } log() << "updated set (" << _name << ") to: " << getServerAddress() << endl; @@ -309,10 +317,9 @@ namespace mongo { BSONObj o; c->isMaster(isMaster, &o); - log( ! verbose ) << "ReplicaSetMonitor::_checkConnection: " << c->toString() << ' ' << o << '\n'; + log( ! verbose ) << "ReplicaSetMonitor::_checkConnection: " << c->toString() << ' ' << o << endl; // add other nodes - string maybePrimary; if ( o["hosts"].type() == Array ) { if ( o["primary"].type() == String ) maybePrimary = o["primary"].String(); @@ -394,12 +401,17 @@ namespace mongo { int ReplicaSetMonitor::_find( const string& server ) const { scoped_lock lk( _lock ); + return _find_inlock( server ); + } + + int ReplicaSetMonitor::_find_inlock( const string& server ) const { for ( unsigned i=0; i<_nodes.size(); i++ ) if ( _nodes[i].addr == server ) return i; return -1; } + int ReplicaSetMonitor::_find( const HostAndPort& server ) const { scoped_lock lk( _lock ); for ( unsigned i=0; i<_nodes.size(); i++ ) @@ -426,7 +438,7 @@ namespace mongo { DBClientConnection * DBClientReplicaSet::checkMaster() { HostAndPort h = _monitor->getMaster(); - if ( h == _masterHost ) { + if ( h == _masterHost && _master ) { // a master is selected. let's just make sure connection didn't die if ( ! _master->isFailed() ) return _master.get(); @@ -447,7 +459,7 @@ namespace mongo { DBClientConnection * DBClientReplicaSet::checkSlave() { HostAndPort h = _monitor->getSlave( _slaveHost ); - if ( h == _slaveHost ) { + if ( h == _slaveHost && _slave ) { if ( ! _slave->isFailed() ) return _slave.get(); _monitor->notifySlaveFailure( _slaveHost ); @@ -534,8 +546,8 @@ namespace mongo { try { return checkSlave()->query(ns,query,nToReturn,nToSkip,fieldsToReturn,queryOptions,batchSize); } - catch ( DBException & ) { - LOG(1) << "can't query replica set slave: " << _slaveHost << endl; + catch ( DBException &e ) { + log() << "can't query replica set slave " << i << " : " << _slaveHost << e.what() << endl; } } } @@ -552,8 +564,8 @@ namespace mongo { try { return checkSlave()->findOne(ns,query,fieldsToReturn,queryOptions); } - catch ( DBException & ) { - LOG(1) << "can't query replica set slave: " << _slaveHost << endl; + catch ( DBException &e ) { + LOG(1) << "can't findone replica set slave " << i << " : " << _slaveHost << e.what() << endl; } } } @@ -588,8 +600,8 @@ namespace mongo { *actualServer = s->getServerAddress(); return s->call( toSend , response , assertOk ); } - catch ( DBException & ) { - log(1) << "can't query replica set slave: " << _slaveHost << endl; + catch ( DBException &e ) { + LOG(1) << "can't call replica set slave " << i << " : " << _slaveHost << e.what() << endl; if ( actualServer ) *actualServer = ""; } diff --git a/client/dbclient_rs.h b/client/dbclient_rs.h index fca6e6e..e942d7b 100644 --- a/client/dbclient_rs.h +++ b/client/dbclient_rs.h @@ -124,6 +124,7 @@ namespace mongo { bool _checkConnection( DBClientConnection * c , string& maybePrimary , bool verbose ); int _find( const string& server ) const ; + int _find_inlock( const string& server ) const ; int _find( const HostAndPort& server ) const ; mutable mongo::mutex _lock; // protects _nodes @@ -147,7 +148,7 @@ namespace mongo { vector<Node> _nodes; int _master; // which node is the current master. -1 means no master is known - + int _nextSlave; // which node is the current slave static mongo::mutex _setsLock; // protects _sets static map<string,ReplicaSetMonitorPtr> _sets; // set name to Monitor diff --git a/client/examples/clientTest.cpp b/client/examples/clientTest.cpp index bd4432e..96c014e 100644 --- a/client/examples/clientTest.cpp +++ b/client/examples/clientTest.cpp @@ -224,5 +224,27 @@ int main( int argc, const char **argv ) { } } + { + //Map Reduce (this mostly just tests that it compiles with all output types) + const string ns = "test.mr"; + conn.insert(ns, BSON("a" << 1)); + conn.insert(ns, BSON("a" << 1)); + + const char* map = "function() { emit(this.a, 1); }"; + const char* reduce = "function(key, values) { return Array.sum(values); }"; + + const string outcoll = ns + ".out"; + + BSONObj out; + out = conn.mapreduce(ns, map, reduce, BSONObj()); // default to inline + //MONGO_PRINT(out); + out = conn.mapreduce(ns, map, reduce, BSONObj(), outcoll); + //MONGO_PRINT(out); + out = conn.mapreduce(ns, map, reduce, BSONObj(), outcoll.c_str()); + //MONGO_PRINT(out); + out = conn.mapreduce(ns, map, reduce, BSONObj(), BSON("reduce" << outcoll)); + //MONGO_PRINT(out); + } + cout << "client test finished!" << endl; } |