summaryrefslogtreecommitdiff
path: root/s/d_writeback.cpp
diff options
context:
space:
mode:
Diffstat (limited to 's/d_writeback.cpp')
-rw-r--r--s/d_writeback.cpp80
1 files changed, 80 insertions, 0 deletions
diff --git a/s/d_writeback.cpp b/s/d_writeback.cpp
new file mode 100644
index 0000000..738d4d4
--- /dev/null
+++ b/s/d_writeback.cpp
@@ -0,0 +1,80 @@
+// d_writeback.cpp
+
+/**
+* Copyright (C) 2008 10gen Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "pch.h"
+
+#include "../db/commands.h"
+#include "../db/jsobj.h"
+#include "../db/dbmessage.h"
+#include "../db/query.h"
+
+#include "../client/connpool.h"
+
+#include "../util/queue.h"
+
+#include "shard.h"
+
+using namespace std;
+
+namespace mongo {
+
+ map< string , BlockingQueue<BSONObj>* > writebackQueue;
+ mongo::mutex writebackQueueLock("sharding:writebackQueueLock");
+
+ BlockingQueue<BSONObj>* getWritebackQueue( const string& remote ){
+ scoped_lock lk (writebackQueueLock );
+ BlockingQueue<BSONObj>*& q = writebackQueue[remote];
+ if ( ! q )
+ q = new BlockingQueue<BSONObj>();
+ return q;
+ }
+
+ void queueWriteBack( const string& remote , const BSONObj& o ){
+ getWritebackQueue( remote )->push( o );
+ }
+
+ // Note, this command will block until there is something to WriteBack
+ class WriteBackCommand : public Command {
+ public:
+ virtual LockType locktype() const { return NONE; }
+ virtual bool slaveOk() const { return false; }
+ virtual bool adminOnly() const { return true; }
+
+ WriteBackCommand() : Command( "writebacklisten" ){}
+
+ void help(stringstream& h) const { h<<"internal"; }
+
+ bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){
+
+ BSONElement e = cmdObj.firstElement();
+ if ( e.type() != jstOID ){
+ errmsg = "need oid as first value";
+ return 0;
+ }
+
+ const OID id = e.__oid();
+ BSONObj z = getWritebackQueue(id.str())->blockingPop();
+ log(1) << "WriteBackCommand got : " << z << endl;
+
+ result.append( "data" , z );
+
+ return true;
+ }
+ } writeBackCommand;
+
+}