summaryrefslogtreecommitdiff
path: root/client/parallel.h
diff options
context:
space:
mode:
Diffstat (limited to 'client/parallel.h')
-rw-r--r--client/parallel.h176
1 files changed, 139 insertions, 37 deletions
diff --git a/client/parallel.h b/client/parallel.h
index 88864ae..b60190a 100644
--- a/client/parallel.h
+++ b/client/parallel.h
@@ -16,16 +16,53 @@
*/
/**
- tools for wokring in parallel/sharded/clustered environment
+ tools for working in parallel/sharded/clustered environment
*/
-#include "../stdafx.h"
+#include "../pch.h"
#include "dbclient.h"
+#include "redef_macros.h"
#include "../db/dbmessage.h"
+#include "../db/matcher.h"
namespace mongo {
/**
+ * holder for a server address and a query to run
+ */
+ class ServerAndQuery {
+ public:
+ ServerAndQuery( const string& server , BSONObj extra = BSONObj() , BSONObj orderObject = BSONObj() ) :
+ _server( server ) , _extra( extra.getOwned() ) , _orderObject( orderObject.getOwned() ){
+ }
+
+ bool operator<( const ServerAndQuery& other ) const{
+ if ( ! _orderObject.isEmpty() )
+ return _orderObject.woCompare( other._orderObject ) < 0;
+
+ if ( _server < other._server )
+ return true;
+ if ( other._server > _server )
+ return false;
+ return _extra.woCompare( other._extra ) < 0;
+ }
+
+ string toString() const {
+ StringBuilder ss;
+ ss << "server:" << _server << " _extra:" << _extra.toString() << " _orderObject:" << _orderObject.toString();
+ return ss.str();
+ }
+
+ operator string() const {
+ return toString();
+ }
+
+ string _server;
+ BSONObj _extra;
+ BSONObj _orderObject;
+ };
+
+ /**
* this is a cursor that works over a set of servers
* can be used in serial/paralellel as controlled by sub classes
*/
@@ -34,7 +71,10 @@ namespace mongo {
ClusteredCursor( QueryMessage& q );
ClusteredCursor( const string& ns , const BSONObj& q , int options=0 , const BSONObj& fields=BSONObj() );
virtual ~ClusteredCursor();
-
+
+ /** call before using */
+ void init();
+
virtual bool more() = 0;
virtual BSONObj next() = 0;
@@ -42,53 +82,105 @@ namespace mongo {
virtual string type() const = 0;
+ virtual BSONObj explain();
+
protected:
- auto_ptr<DBClientCursor> query( const string& server , int num = 0 , BSONObj extraFilter = BSONObj() );
+
+ virtual void _init() = 0;
+ auto_ptr<DBClientCursor> query( const string& server , int num = 0 , BSONObj extraFilter = BSONObj() , int skipLeft = 0 );
+ BSONObj explain( const string& server , BSONObj extraFilter = BSONObj() );
+
static BSONObj _concatFilter( const BSONObj& filter , const BSONObj& extraFilter );
+ virtual void _explain( map< string,list<BSONObj> >& out ) = 0;
+
string _ns;
BSONObj _query;
int _options;
BSONObj _fields;
+ int _batchSize;
+
+ bool _didInit;
bool _done;
};
- /**
- * holder for a server address and a query to run
- */
- class ServerAndQuery {
+ class FilteringClientCursor {
public:
- ServerAndQuery( const string& server , BSONObj extra = BSONObj() , BSONObj orderObject = BSONObj() ) :
- _server( server ) , _extra( extra.getOwned() ) , _orderObject( orderObject.getOwned() ){
+ FilteringClientCursor( const BSONObj filter = BSONObj() );
+ FilteringClientCursor( auto_ptr<DBClientCursor> cursor , const BSONObj filter = BSONObj() );
+ ~FilteringClientCursor();
+
+ void reset( auto_ptr<DBClientCursor> cursor );
+
+ bool more();
+ BSONObj next();
+
+ BSONObj peek();
+ private:
+ void _advance();
+
+ Matcher _matcher;
+ auto_ptr<DBClientCursor> _cursor;
+
+ BSONObj _next;
+ bool _done;
+ };
+
+
+ class Servers {
+ public:
+ Servers(){
+ }
+
+ void add( const ServerAndQuery& s ){
+ add( s._server , s._extra );
+ }
+
+ void add( const string& server , const BSONObj& filter ){
+ vector<BSONObj>& mine = _filters[server];
+ mine.push_back( filter.getOwned() );
}
+
+ // TOOO: pick a less horrible name
+ class View {
+ View( const Servers* s ){
+ for ( map<string, vector<BSONObj> >::const_iterator i=s->_filters.begin(); i!=s->_filters.end(); ++i ){
+ _servers.push_back( i->first );
+ _filters.push_back( i->second );
+ }
+ }
+ public:
+ int size() const {
+ return _servers.size();
+ }
- bool operator<( const ServerAndQuery& other ) const{
- if ( ! _orderObject.isEmpty() )
- return _orderObject.woCompare( other._orderObject ) < 0;
+ string getServer( int n ) const {
+ return _servers[n];
+ }
+
+ vector<BSONObj> getFilter( int n ) const {
+ return _filters[ n ];
+ }
- if ( _server < other._server )
- return true;
- if ( other._server > _server )
- return false;
- return _extra.woCompare( other._extra ) < 0;
- }
+ private:
+ vector<string> _servers;
+ vector< vector<BSONObj> > _filters;
- string toString() const {
- StringBuilder ss;
- ss << "server:" << _server << " _extra:" << _extra << " _orderObject:" << _orderObject;
- return ss.str();
- }
+ friend class Servers;
+ };
- operator string() const {
- return toString();
+ View view() const {
+ return View( this );
}
+
- string _server;
- BSONObj _extra;
- BSONObj _orderObject;
+ private:
+ map<string, vector<BSONObj> > _filters;
+
+ friend class View;
};
@@ -102,11 +194,18 @@ namespace mongo {
virtual bool more();
virtual BSONObj next();
virtual string type() const { return "SerialServer"; }
- private:
+
+ protected:
+ virtual void _explain( map< string,list<BSONObj> >& out );
+
+ void _init(){}
+
vector<ServerAndQuery> _servers;
unsigned _serverIndex;
- auto_ptr<DBClientCursor> _current;
+ FilteringClientCursor _current;
+
+ int _needToSkip;
};
@@ -123,17 +222,18 @@ namespace mongo {
virtual bool more();
virtual BSONObj next();
virtual string type() const { return "ParallelSort"; }
- private:
+ protected:
+ void _finishCons();
void _init();
-
- void advance();
+
+ virtual void _explain( map< string,list<BSONObj> >& out );
int _numServers;
set<ServerAndQuery> _servers;
BSONObj _sortKey;
-
- auto_ptr<DBClientCursor> * _cursors;
- BSONObj * _nexts;
+
+ FilteringClientCursor * _cursors;
+ int _needToSkip;
};
/**
@@ -193,3 +293,5 @@ namespace mongo {
}
+
+#include "undef_macros.h"