summaryrefslogtreecommitdiff
path: root/s/d_writeback.cpp
diff options
context:
space:
mode:
authorAntonin Kral <a.kral@bobek.cz>2011-03-17 00:05:43 +0100
committerAntonin Kral <a.kral@bobek.cz>2011-03-17 00:05:43 +0100
commit582fc32574a3b158c81e49cb00e6ae59205e66ba (patch)
treeac64a3243e0d2121709f685695247052858115c8 /s/d_writeback.cpp
parent2761bffa96595ac1698d86bbc2e95ebb0d4d6e93 (diff)
downloadmongodb-582fc32574a3b158c81e49cb00e6ae59205e66ba.tar.gz
Imported Upstream version 1.8.0
Diffstat (limited to 's/d_writeback.cpp')
-rw-r--r--s/d_writeback.cpp97
1 files changed, 70 insertions, 27 deletions
diff --git a/s/d_writeback.cpp b/s/d_writeback.cpp
index a18e5d5..401e0aa 100644
--- a/s/d_writeback.cpp
+++ b/s/d_writeback.cpp
@@ -19,62 +19,105 @@
#include "pch.h"
#include "../db/commands.h"
-#include "../db/jsobj.h"
-#include "../db/dbmessage.h"
-#include "../db/query.h"
-
-#include "../client/connpool.h"
-
#include "../util/queue.h"
-#include "shard.h"
+#include "d_writeback.h"
using namespace std;
namespace mongo {
- map< string , BlockingQueue<BSONObj>* > writebackQueue;
- mongo::mutex writebackQueueLock("sharding:writebackQueueLock");
+ // ---------- WriteBackManager class ----------
+
+ // TODO init at mongod startup
+ WriteBackManager writeBackManager;
+
+ WriteBackManager::WriteBackManager() : _writebackQueueLock("sharding:writebackQueueLock") {
+ }
+
+ WriteBackManager::~WriteBackManager() {
+ }
+
+ void WriteBackManager::queueWriteBack( const string& remote , const BSONObj& o ) {
+ getWritebackQueue( remote )->push( o );
+ }
- BlockingQueue<BSONObj>* getWritebackQueue( const string& remote ){
- scoped_lock lk (writebackQueueLock );
- BlockingQueue<BSONObj>*& q = writebackQueue[remote];
+ BlockingQueue<BSONObj>* WriteBackManager::getWritebackQueue( const string& remote ) {
+ scoped_lock lk ( _writebackQueueLock );
+ BlockingQueue<BSONObj>*& q = _writebackQueues[remote];
if ( ! q )
q = new BlockingQueue<BSONObj>();
return q;
}
-
- void queueWriteBack( const string& remote , const BSONObj& o ){
- getWritebackQueue( remote )->push( o );
+
+ bool WriteBackManager::queuesEmpty() const {
+ scoped_lock lk( _writebackQueueLock );
+ for ( WriteBackQueuesMap::const_iterator it = _writebackQueues.begin(); it != _writebackQueues.end(); ++it ) {
+ const BlockingQueue<BSONObj>* queue = it->second;
+ if (! queue->empty() ) {
+ return false;
+ }
+ }
+ return true;
}
+ // ---------- admin commands ----------
+
// Note, this command will block until there is something to WriteBack
class WriteBackCommand : public Command {
public:
- virtual LockType locktype() const { return NONE; }
+ virtual LockType locktype() const { return NONE; }
virtual bool slaveOk() const { return true; }
virtual bool adminOnly() const { return true; }
-
- WriteBackCommand() : Command( "writebacklisten" ){}
+
+ WriteBackCommand() : Command( "writebacklisten" ) {}
void help(stringstream& h) const { h<<"internal"; }
- bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){
+ bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool) {
BSONElement e = cmdObj.firstElement();
- if ( e.type() != jstOID ){
+ if ( e.type() != jstOID ) {
errmsg = "need oid as first value";
return 0;
}
-
+
+ // get the command issuer's (a mongos) serverID
const OID id = e.__oid();
- BSONObj z = getWritebackQueue(id.str())->blockingPop();
- log(1) << "WriteBackCommand got : " << z << endl;
-
- result.append( "data" , z );
-
+
+ // the command issuer is blocked awaiting a response
+ // we want to do return at least at every 5 minutes so sockets don't timeout
+ BSONObj z;
+ if ( writeBackManager.getWritebackQueue(id.str())->blockingPop( z, 5 * 60 /* 5 minutes */ ) ) {
+ log(1) << "WriteBackCommand got : " << z << endl;
+ result.append( "data" , z );
+ }
+ else {
+ result.appendBool( "noop" , true );
+ }
+
return true;
}
} writeBackCommand;
-}
+ class WriteBacksQueuedCommand : public Command {
+ public:
+ virtual LockType locktype() const { return NONE; }
+ virtual bool slaveOk() const { return true; }
+ virtual bool adminOnly() const { return true; }
+
+ WriteBacksQueuedCommand() : Command( "writeBacksQueued" ) {}
+
+ void help(stringstream& help) const {
+ help << "Returns whether there are operations in the writeback queue at the time the command was called. "
+ << "This is an internal comand";
+ }
+
+ bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool) {
+ result.appendBool( "hasOpsQueued" , ! writeBackManager.queuesEmpty() );
+ return true;
+ }
+
+ } writeBacksQueuedCommand;
+
+} // namespace mongo