summaryrefslogtreecommitdiff
path: root/client/parallel.h
diff options
context:
space:
mode:
Diffstat (limited to 'client/parallel.h')
-rw-r--r--client/parallel.h98
1 files changed, 53 insertions, 45 deletions
diff --git a/client/parallel.h b/client/parallel.h
index 603cfe7..0809376 100644
--- a/client/parallel.h
+++ b/client/parallel.h
@@ -24,6 +24,7 @@
#include "redef_macros.h"
#include "../db/dbmessage.h"
#include "../db/matcher.h"
+#include "../util/concurrency/mvar.h"
namespace mongo {
@@ -32,14 +33,14 @@ namespace mongo {
*/
class ServerAndQuery {
public:
- ServerAndQuery( const string& server , BSONObj extra = BSONObj() , BSONObj orderObject = BSONObj() ) :
- _server( server ) , _extra( extra.getOwned() ) , _orderObject( orderObject.getOwned() ){
+ ServerAndQuery( const string& server , BSONObj extra = BSONObj() , BSONObj orderObject = BSONObj() ) :
+ _server( server ) , _extra( extra.getOwned() ) , _orderObject( orderObject.getOwned() ) {
}
- bool operator<( const ServerAndQuery& other ) const{
+ bool operator<( const ServerAndQuery& other ) const {
if ( ! _orderObject.isEmpty() )
return _orderObject.woCompare( other._orderObject ) < 0;
-
+
if ( _server < other._server )
return true;
if ( other._server > _server )
@@ -71,28 +72,28 @@ namespace mongo {
ClusteredCursor( QueryMessage& q );
ClusteredCursor( const string& ns , const BSONObj& q , int options=0 , const BSONObj& fields=BSONObj() );
virtual ~ClusteredCursor();
-
+
/** call before using */
void init();
-
+
virtual bool more() = 0;
virtual BSONObj next() = 0;
-
+
static BSONObj concatQuery( const BSONObj& query , const BSONObj& extraFilter );
-
+
virtual string type() const = 0;
virtual BSONObj explain();
protected:
-
+
virtual void _init() = 0;
auto_ptr<DBClientCursor> query( const string& server , int num = 0 , BSONObj extraFilter = BSONObj() , int skipLeft = 0 );
BSONObj explain( const string& server , BSONObj extraFilter = BSONObj() );
-
+
static BSONObj _concatFilter( const BSONObj& filter , const BSONObj& extraFilter );
-
+
virtual void _explain( map< string,list<BSONObj> >& out ) = 0;
string _ns;
@@ -112,19 +113,19 @@ namespace mongo {
FilteringClientCursor( const BSONObj filter = BSONObj() );
FilteringClientCursor( auto_ptr<DBClientCursor> cursor , const BSONObj filter = BSONObj() );
~FilteringClientCursor();
-
+
void reset( auto_ptr<DBClientCursor> cursor );
-
+
bool more();
BSONObj next();
-
+
BSONObj peek();
private:
void _advance();
-
+
Matcher _matcher;
auto_ptr<DBClientCursor> _cursor;
-
+
BSONObj _next;
bool _done;
};
@@ -132,22 +133,22 @@ namespace mongo {
class Servers {
public:
- Servers(){
+ Servers() {
}
-
- void add( const ServerAndQuery& s ){
+
+ void add( const ServerAndQuery& s ) {
add( s._server , s._extra );
}
-
- void add( const string& server , const BSONObj& filter ){
+
+ void add( const string& server , const BSONObj& filter ) {
vector<BSONObj>& mine = _filters[server];
mine.push_back( filter.getOwned() );
}
-
+
// TOOO: pick a less horrible name
class View {
- View( const Servers* s ){
- for ( map<string, vector<BSONObj> >::const_iterator i=s->_filters.begin(); i!=s->_filters.end(); ++i ){
+ View( const Servers* s ) {
+ for ( map<string, vector<BSONObj> >::const_iterator i=s->_filters.begin(); i!=s->_filters.end(); ++i ) {
_servers.push_back( i->first );
_filters.push_back( i->second );
}
@@ -164,7 +165,7 @@ namespace mongo {
vector<BSONObj> getFilter( int n ) const {
return _filters[ n ];
}
-
+
private:
vector<string> _servers;
vector< vector<BSONObj> > _filters;
@@ -175,7 +176,7 @@ namespace mongo {
View view() const {
return View( this );
}
-
+
private:
map<string, vector<BSONObj> > _filters;
@@ -198,13 +199,13 @@ namespace mongo {
protected:
virtual void _explain( map< string,list<BSONObj> >& out );
- void _init(){}
+ void _init() {}
vector<ServerAndQuery> _servers;
unsigned _serverIndex;
-
+
FilteringClientCursor _current;
-
+
int _needToSkip;
};
@@ -212,11 +213,11 @@ namespace mongo {
/**
* runs a query in parellel across N servers
* sots
- */
+ */
class ParallelSortClusteredCursor : public ClusteredCursor {
public:
ParallelSortClusteredCursor( const set<ServerAndQuery>& servers , QueryMessage& q , const BSONObj& sortKey );
- ParallelSortClusteredCursor( const set<ServerAndQuery>& servers , const string& ns ,
+ ParallelSortClusteredCursor( const set<ServerAndQuery>& servers , const string& ns ,
const Query& q , int options=0, const BSONObj& fields=BSONObj() );
virtual ~ParallelSortClusteredCursor();
virtual bool more();
@@ -231,7 +232,7 @@ namespace mongo {
int _numServers;
set<ServerAndQuery> _servers;
BSONObj _sortKey;
-
+
FilteringClientCursor * _cursors;
int _needToSkip;
};
@@ -245,11 +246,11 @@ namespace mongo {
public:
class CommandResult {
public:
-
+
string getServer() const { return _server; }
bool isDone() const { return _done; }
-
+
bool ok() const {
assert( _done );
return _ok;
@@ -265,30 +266,37 @@ namespace mongo {
returns ok()
*/
bool join();
-
+
private:
-
- CommandResult( const string& server , const string& db , const BSONObj& cmd );
-
+
+ CommandResult( const string& server , const string& db , const BSONObj& cmd , DBClientBase * conn );
+
string _server;
string _db;
BSONObj _cmd;
+ DBClientBase * _conn;
scoped_ptr<boost::thread> _thr;
-
+
BSONObj _res;
- bool _done;
bool _ok;
-
+ bool _done;
+
friend class Future;
};
+
+ static void commandThread(shared_ptr<CommandResult> res);
- static void commandThread( shared_ptr<CommandResult> res );
-
- static shared_ptr<CommandResult> spawnCommand( const string& server , const string& db , const BSONObj& cmd );
+ /**
+ * @param server server name
+ * @param db db name
+ * @param cmd cmd to exec
+ * @param conn optional connection to use. will use standard pooled if non-specified
+ */
+ static shared_ptr<CommandResult> spawnCommand( const string& server , const string& db , const BSONObj& cmd , DBClientBase * conn = 0 );
};
-
+
}
#include "undef_macros.h"