diff options
Diffstat (limited to 's/d_writeback.cpp')
-rw-r--r-- | s/d_writeback.cpp | 97 |
1 files changed, 70 insertions, 27 deletions
diff --git a/s/d_writeback.cpp b/s/d_writeback.cpp index a18e5d5..401e0aa 100644 --- a/s/d_writeback.cpp +++ b/s/d_writeback.cpp @@ -19,62 +19,105 @@ #include "pch.h" #include "../db/commands.h" -#include "../db/jsobj.h" -#include "../db/dbmessage.h" -#include "../db/query.h" - -#include "../client/connpool.h" - #include "../util/queue.h" -#include "shard.h" +#include "d_writeback.h" using namespace std; namespace mongo { - map< string , BlockingQueue<BSONObj>* > writebackQueue; - mongo::mutex writebackQueueLock("sharding:writebackQueueLock"); + // ---------- WriteBackManager class ---------- + + // TODO init at mongod startup + WriteBackManager writeBackManager; + + WriteBackManager::WriteBackManager() : _writebackQueueLock("sharding:writebackQueueLock") { + } + + WriteBackManager::~WriteBackManager() { + } + + void WriteBackManager::queueWriteBack( const string& remote , const BSONObj& o ) { + getWritebackQueue( remote )->push( o ); + } - BlockingQueue<BSONObj>* getWritebackQueue( const string& remote ){ - scoped_lock lk (writebackQueueLock ); - BlockingQueue<BSONObj>*& q = writebackQueue[remote]; + BlockingQueue<BSONObj>* WriteBackManager::getWritebackQueue( const string& remote ) { + scoped_lock lk ( _writebackQueueLock ); + BlockingQueue<BSONObj>*& q = _writebackQueues[remote]; if ( ! q ) q = new BlockingQueue<BSONObj>(); return q; } - - void queueWriteBack( const string& remote , const BSONObj& o ){ - getWritebackQueue( remote )->push( o ); + + bool WriteBackManager::queuesEmpty() const { + scoped_lock lk( _writebackQueueLock ); + for ( WriteBackQueuesMap::const_iterator it = _writebackQueues.begin(); it != _writebackQueues.end(); ++it ) { + const BlockingQueue<BSONObj>* queue = it->second; + if (! queue->empty() ) { + return false; + } + } + return true; } + // ---------- admin commands ---------- + // Note, this command will block until there is something to WriteBack class WriteBackCommand : public Command { public: - virtual LockType locktype() const { return NONE; } + virtual LockType locktype() const { return NONE; } virtual bool slaveOk() const { return true; } virtual bool adminOnly() const { return true; } - - WriteBackCommand() : Command( "writebacklisten" ){} + + WriteBackCommand() : Command( "writebacklisten" ) {} void help(stringstream& h) const { h<<"internal"; } - bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){ + bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool) { BSONElement e = cmdObj.firstElement(); - if ( e.type() != jstOID ){ + if ( e.type() != jstOID ) { errmsg = "need oid as first value"; return 0; } - + + // get the command issuer's (a mongos) serverID const OID id = e.__oid(); - BSONObj z = getWritebackQueue(id.str())->blockingPop(); - log(1) << "WriteBackCommand got : " << z << endl; - - result.append( "data" , z ); - + + // the command issuer is blocked awaiting a response + // we want to do return at least at every 5 minutes so sockets don't timeout + BSONObj z; + if ( writeBackManager.getWritebackQueue(id.str())->blockingPop( z, 5 * 60 /* 5 minutes */ ) ) { + log(1) << "WriteBackCommand got : " << z << endl; + result.append( "data" , z ); + } + else { + result.appendBool( "noop" , true ); + } + return true; } } writeBackCommand; -} + class WriteBacksQueuedCommand : public Command { + public: + virtual LockType locktype() const { return NONE; } + virtual bool slaveOk() const { return true; } + virtual bool adminOnly() const { return true; } + + WriteBacksQueuedCommand() : Command( "writeBacksQueued" ) {} + + void help(stringstream& help) const { + help << "Returns whether there are operations in the writeback queue at the time the command was called. " + << "This is an internal comand"; + } + + bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool) { + result.appendBool( "hasOpsQueued" , ! writeBackManager.queuesEmpty() ); + return true; + } + + } writeBacksQueuedCommand; + +} // namespace mongo |