diff options
Diffstat (limited to 'client/parallel.h')
-rw-r--r-- | client/parallel.h | 176 |
1 files changed, 139 insertions, 37 deletions
diff --git a/client/parallel.h b/client/parallel.h index 88864ae..b60190a 100644 --- a/client/parallel.h +++ b/client/parallel.h @@ -16,16 +16,53 @@ */ /** - tools for wokring in parallel/sharded/clustered environment + tools for working in parallel/sharded/clustered environment */ -#include "../stdafx.h" +#include "../pch.h" #include "dbclient.h" +#include "redef_macros.h" #include "../db/dbmessage.h" +#include "../db/matcher.h" namespace mongo { /** + * holder for a server address and a query to run + */ + class ServerAndQuery { + public: + ServerAndQuery( const string& server , BSONObj extra = BSONObj() , BSONObj orderObject = BSONObj() ) : + _server( server ) , _extra( extra.getOwned() ) , _orderObject( orderObject.getOwned() ){ + } + + bool operator<( const ServerAndQuery& other ) const{ + if ( ! _orderObject.isEmpty() ) + return _orderObject.woCompare( other._orderObject ) < 0; + + if ( _server < other._server ) + return true; + if ( other._server > _server ) + return false; + return _extra.woCompare( other._extra ) < 0; + } + + string toString() const { + StringBuilder ss; + ss << "server:" << _server << " _extra:" << _extra.toString() << " _orderObject:" << _orderObject.toString(); + return ss.str(); + } + + operator string() const { + return toString(); + } + + string _server; + BSONObj _extra; + BSONObj _orderObject; + }; + + /** * this is a cursor that works over a set of servers * can be used in serial/paralellel as controlled by sub classes */ @@ -34,7 +71,10 @@ namespace mongo { ClusteredCursor( QueryMessage& q ); ClusteredCursor( const string& ns , const BSONObj& q , int options=0 , const BSONObj& fields=BSONObj() ); virtual ~ClusteredCursor(); - + + /** call before using */ + void init(); + virtual bool more() = 0; virtual BSONObj next() = 0; @@ -42,53 +82,105 @@ namespace mongo { virtual string type() const = 0; + virtual BSONObj explain(); + protected: - auto_ptr<DBClientCursor> query( const string& server , int num = 0 , BSONObj extraFilter = BSONObj() ); + + virtual void _init() = 0; + auto_ptr<DBClientCursor> query( const string& server , int num = 0 , BSONObj extraFilter = BSONObj() , int skipLeft = 0 ); + BSONObj explain( const string& server , BSONObj extraFilter = BSONObj() ); + static BSONObj _concatFilter( const BSONObj& filter , const BSONObj& extraFilter ); + virtual void _explain( map< string,list<BSONObj> >& out ) = 0; + string _ns; BSONObj _query; int _options; BSONObj _fields; + int _batchSize; + + bool _didInit; bool _done; }; - /** - * holder for a server address and a query to run - */ - class ServerAndQuery { + class FilteringClientCursor { public: - ServerAndQuery( const string& server , BSONObj extra = BSONObj() , BSONObj orderObject = BSONObj() ) : - _server( server ) , _extra( extra.getOwned() ) , _orderObject( orderObject.getOwned() ){ + FilteringClientCursor( const BSONObj filter = BSONObj() ); + FilteringClientCursor( auto_ptr<DBClientCursor> cursor , const BSONObj filter = BSONObj() ); + ~FilteringClientCursor(); + + void reset( auto_ptr<DBClientCursor> cursor ); + + bool more(); + BSONObj next(); + + BSONObj peek(); + private: + void _advance(); + + Matcher _matcher; + auto_ptr<DBClientCursor> _cursor; + + BSONObj _next; + bool _done; + }; + + + class Servers { + public: + Servers(){ + } + + void add( const ServerAndQuery& s ){ + add( s._server , s._extra ); + } + + void add( const string& server , const BSONObj& filter ){ + vector<BSONObj>& mine = _filters[server]; + mine.push_back( filter.getOwned() ); } + + // TOOO: pick a less horrible name + class View { + View( const Servers* s ){ + for ( map<string, vector<BSONObj> >::const_iterator i=s->_filters.begin(); i!=s->_filters.end(); ++i ){ + _servers.push_back( i->first ); + _filters.push_back( i->second ); + } + } + public: + int size() const { + return _servers.size(); + } - bool operator<( const ServerAndQuery& other ) const{ - if ( ! _orderObject.isEmpty() ) - return _orderObject.woCompare( other._orderObject ) < 0; + string getServer( int n ) const { + return _servers[n]; + } + + vector<BSONObj> getFilter( int n ) const { + return _filters[ n ]; + } - if ( _server < other._server ) - return true; - if ( other._server > _server ) - return false; - return _extra.woCompare( other._extra ) < 0; - } + private: + vector<string> _servers; + vector< vector<BSONObj> > _filters; - string toString() const { - StringBuilder ss; - ss << "server:" << _server << " _extra:" << _extra << " _orderObject:" << _orderObject; - return ss.str(); - } + friend class Servers; + }; - operator string() const { - return toString(); + View view() const { + return View( this ); } + - string _server; - BSONObj _extra; - BSONObj _orderObject; + private: + map<string, vector<BSONObj> > _filters; + + friend class View; }; @@ -102,11 +194,18 @@ namespace mongo { virtual bool more(); virtual BSONObj next(); virtual string type() const { return "SerialServer"; } - private: + + protected: + virtual void _explain( map< string,list<BSONObj> >& out ); + + void _init(){} + vector<ServerAndQuery> _servers; unsigned _serverIndex; - auto_ptr<DBClientCursor> _current; + FilteringClientCursor _current; + + int _needToSkip; }; @@ -123,17 +222,18 @@ namespace mongo { virtual bool more(); virtual BSONObj next(); virtual string type() const { return "ParallelSort"; } - private: + protected: + void _finishCons(); void _init(); - - void advance(); + + virtual void _explain( map< string,list<BSONObj> >& out ); int _numServers; set<ServerAndQuery> _servers; BSONObj _sortKey; - - auto_ptr<DBClientCursor> * _cursors; - BSONObj * _nexts; + + FilteringClientCursor * _cursors; + int _needToSkip; }; /** @@ -193,3 +293,5 @@ namespace mongo { } + +#include "undef_macros.h" |