diff options
Diffstat (limited to 'client/parallel.h')
-rw-r--r-- | client/parallel.h | 98 |
1 files changed, 53 insertions, 45 deletions
diff --git a/client/parallel.h b/client/parallel.h index 603cfe7..0809376 100644 --- a/client/parallel.h +++ b/client/parallel.h @@ -24,6 +24,7 @@ #include "redef_macros.h" #include "../db/dbmessage.h" #include "../db/matcher.h" +#include "../util/concurrency/mvar.h" namespace mongo { @@ -32,14 +33,14 @@ namespace mongo { */ class ServerAndQuery { public: - ServerAndQuery( const string& server , BSONObj extra = BSONObj() , BSONObj orderObject = BSONObj() ) : - _server( server ) , _extra( extra.getOwned() ) , _orderObject( orderObject.getOwned() ){ + ServerAndQuery( const string& server , BSONObj extra = BSONObj() , BSONObj orderObject = BSONObj() ) : + _server( server ) , _extra( extra.getOwned() ) , _orderObject( orderObject.getOwned() ) { } - bool operator<( const ServerAndQuery& other ) const{ + bool operator<( const ServerAndQuery& other ) const { if ( ! _orderObject.isEmpty() ) return _orderObject.woCompare( other._orderObject ) < 0; - + if ( _server < other._server ) return true; if ( other._server > _server ) @@ -71,28 +72,28 @@ namespace mongo { ClusteredCursor( QueryMessage& q ); ClusteredCursor( const string& ns , const BSONObj& q , int options=0 , const BSONObj& fields=BSONObj() ); virtual ~ClusteredCursor(); - + /** call before using */ void init(); - + virtual bool more() = 0; virtual BSONObj next() = 0; - + static BSONObj concatQuery( const BSONObj& query , const BSONObj& extraFilter ); - + virtual string type() const = 0; virtual BSONObj explain(); protected: - + virtual void _init() = 0; auto_ptr<DBClientCursor> query( const string& server , int num = 0 , BSONObj extraFilter = BSONObj() , int skipLeft = 0 ); BSONObj explain( const string& server , BSONObj extraFilter = BSONObj() ); - + static BSONObj _concatFilter( const BSONObj& filter , const BSONObj& extraFilter ); - + virtual void _explain( map< string,list<BSONObj> >& out ) = 0; string _ns; @@ -112,19 +113,19 @@ namespace mongo { FilteringClientCursor( const BSONObj filter = BSONObj() ); FilteringClientCursor( auto_ptr<DBClientCursor> cursor , const BSONObj filter = BSONObj() ); ~FilteringClientCursor(); - + void reset( auto_ptr<DBClientCursor> cursor ); - + bool more(); BSONObj next(); - + BSONObj peek(); private: void _advance(); - + Matcher _matcher; auto_ptr<DBClientCursor> _cursor; - + BSONObj _next; bool _done; }; @@ -132,22 +133,22 @@ namespace mongo { class Servers { public: - Servers(){ + Servers() { } - - void add( const ServerAndQuery& s ){ + + void add( const ServerAndQuery& s ) { add( s._server , s._extra ); } - - void add( const string& server , const BSONObj& filter ){ + + void add( const string& server , const BSONObj& filter ) { vector<BSONObj>& mine = _filters[server]; mine.push_back( filter.getOwned() ); } - + // TOOO: pick a less horrible name class View { - View( const Servers* s ){ - for ( map<string, vector<BSONObj> >::const_iterator i=s->_filters.begin(); i!=s->_filters.end(); ++i ){ + View( const Servers* s ) { + for ( map<string, vector<BSONObj> >::const_iterator i=s->_filters.begin(); i!=s->_filters.end(); ++i ) { _servers.push_back( i->first ); _filters.push_back( i->second ); } @@ -164,7 +165,7 @@ namespace mongo { vector<BSONObj> getFilter( int n ) const { return _filters[ n ]; } - + private: vector<string> _servers; vector< vector<BSONObj> > _filters; @@ -175,7 +176,7 @@ namespace mongo { View view() const { return View( this ); } - + private: map<string, vector<BSONObj> > _filters; @@ -198,13 +199,13 @@ namespace mongo { protected: virtual void _explain( map< string,list<BSONObj> >& out ); - void _init(){} + void _init() {} vector<ServerAndQuery> _servers; unsigned _serverIndex; - + FilteringClientCursor _current; - + int _needToSkip; }; @@ -212,11 +213,11 @@ namespace mongo { /** * runs a query in parellel across N servers * sots - */ + */ class ParallelSortClusteredCursor : public ClusteredCursor { public: ParallelSortClusteredCursor( const set<ServerAndQuery>& servers , QueryMessage& q , const BSONObj& sortKey ); - ParallelSortClusteredCursor( const set<ServerAndQuery>& servers , const string& ns , + ParallelSortClusteredCursor( const set<ServerAndQuery>& servers , const string& ns , const Query& q , int options=0, const BSONObj& fields=BSONObj() ); virtual ~ParallelSortClusteredCursor(); virtual bool more(); @@ -231,7 +232,7 @@ namespace mongo { int _numServers; set<ServerAndQuery> _servers; BSONObj _sortKey; - + FilteringClientCursor * _cursors; int _needToSkip; }; @@ -245,11 +246,11 @@ namespace mongo { public: class CommandResult { public: - + string getServer() const { return _server; } bool isDone() const { return _done; } - + bool ok() const { assert( _done ); return _ok; @@ -265,30 +266,37 @@ namespace mongo { returns ok() */ bool join(); - + private: - - CommandResult( const string& server , const string& db , const BSONObj& cmd ); - + + CommandResult( const string& server , const string& db , const BSONObj& cmd , DBClientBase * conn ); + string _server; string _db; BSONObj _cmd; + DBClientBase * _conn; scoped_ptr<boost::thread> _thr; - + BSONObj _res; - bool _done; bool _ok; - + bool _done; + friend class Future; }; + + static void commandThread(shared_ptr<CommandResult> res); - static void commandThread( shared_ptr<CommandResult> res ); - - static shared_ptr<CommandResult> spawnCommand( const string& server , const string& db , const BSONObj& cmd ); + /** + * @param server server name + * @param db db name + * @param cmd cmd to exec + * @param conn optional connection to use. will use standard pooled if non-specified + */ + static shared_ptr<CommandResult> spawnCommand( const string& server , const string& db , const BSONObj& cmd , DBClientBase * conn = 0 ); }; - + } #include "undef_macros.h" |