diff options
Diffstat (limited to 's/d_writeback.cpp')
-rw-r--r-- | s/d_writeback.cpp | 80 |
1 files changed, 80 insertions, 0 deletions
diff --git a/s/d_writeback.cpp b/s/d_writeback.cpp new file mode 100644 index 0000000..738d4d4 --- /dev/null +++ b/s/d_writeback.cpp @@ -0,0 +1,80 @@ +// d_writeback.cpp + +/** +* Copyright (C) 2008 10gen Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "pch.h" + +#include "../db/commands.h" +#include "../db/jsobj.h" +#include "../db/dbmessage.h" +#include "../db/query.h" + +#include "../client/connpool.h" + +#include "../util/queue.h" + +#include "shard.h" + +using namespace std; + +namespace mongo { + + map< string , BlockingQueue<BSONObj>* > writebackQueue; + mongo::mutex writebackQueueLock("sharding:writebackQueueLock"); + + BlockingQueue<BSONObj>* getWritebackQueue( const string& remote ){ + scoped_lock lk (writebackQueueLock ); + BlockingQueue<BSONObj>*& q = writebackQueue[remote]; + if ( ! q ) + q = new BlockingQueue<BSONObj>(); + return q; + } + + void queueWriteBack( const string& remote , const BSONObj& o ){ + getWritebackQueue( remote )->push( o ); + } + + // Note, this command will block until there is something to WriteBack + class WriteBackCommand : public Command { + public: + virtual LockType locktype() const { return NONE; } + virtual bool slaveOk() const { return false; } + virtual bool adminOnly() const { return true; } + + WriteBackCommand() : Command( "writebacklisten" ){} + + void help(stringstream& h) const { h<<"internal"; } + + bool run(const string& , BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){ + + BSONElement e = cmdObj.firstElement(); + if ( e.type() != jstOID ){ + errmsg = "need oid as first value"; + return 0; + } + + const OID id = e.__oid(); + BSONObj z = getWritebackQueue(id.str())->blockingPop(); + log(1) << "WriteBackCommand got : " << z << endl; + + result.append( "data" , z ); + + return true; + } + } writeBackCommand; + +} |