diff options
author | Antonin Kral <a.kral@bobek.cz> | 2010-08-11 12:38:57 +0200 |
---|---|---|
committer | Antonin Kral <a.kral@bobek.cz> | 2010-08-11 12:38:57 +0200 |
commit | 7645618fd3914cb8a20561625913c20d49504a49 (patch) | |
tree | 8370f846f58f6d71165b7a0e2eda04648584ec76 /s/d_logic.cpp | |
parent | 68c73c3c7608b4c87f07440dc3232801720b1168 (diff) | |
download | mongodb-7645618fd3914cb8a20561625913c20d49504a49.tar.gz |
Imported Upstream version 1.6.0
Diffstat (limited to 's/d_logic.cpp')
-rw-r--r-- | s/d_logic.cpp | 503 |
1 files changed, 34 insertions, 469 deletions
diff --git a/s/d_logic.cpp b/s/d_logic.cpp index 2a9cde3..ddf83e8 100644 --- a/s/d_logic.cpp +++ b/s/d_logic.cpp @@ -22,500 +22,57 @@ mostly around shard management and checking */ -#include "stdafx.h" +#include "pch.h" #include <map> #include <string> #include "../db/commands.h" #include "../db/jsobj.h" #include "../db/dbmessage.h" +#include "../db/query.h" #include "../client/connpool.h" #include "../util/queue.h" +#include "shard.h" +#include "d_logic.h" + using namespace std; namespace mongo { - - typedef map<string,unsigned long long> NSVersions; - - NSVersions globalVersions; - boost::thread_specific_ptr<NSVersions> clientShardVersions; - - string shardConfigServer; - - boost::thread_specific_ptr<OID> clientServerIds; - map< string , BlockingQueue<BSONObj>* > clientQueues; - - unsigned long long getVersion( BSONElement e , string& errmsg ){ - if ( e.eoo() ){ - errmsg = "no version"; - return 0; - } - - if ( e.isNumber() ) - return (unsigned long long)e.number(); - - if ( e.type() == Date || e.type() == Timestamp ) - return e._numberLong(); - - - errmsg = "version is not a numberic type"; - return 0; - } - - class MongodShardCommand : public Command { - public: - MongodShardCommand( const char * n ) : Command( n ){ - } - virtual bool slaveOk(){ - return false; - } - virtual bool adminOnly() { - return true; - } - }; - - class WriteBackCommand : public MongodShardCommand { - public: - virtual LockType locktype(){ return NONE; } - WriteBackCommand() : MongodShardCommand( "writebacklisten" ){} - bool run(const char *cmdns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){ - - BSONElement e = cmdObj.firstElement(); - if ( e.type() != jstOID ){ - errmsg = "need oid as first value"; - return 0; - } - - const OID id = e.__oid(); - - if ( ! clientQueues[id.str()] ) - clientQueues[id.str()] = new BlockingQueue<BSONObj>(); - - BSONObj z = clientQueues[id.str()]->blockingPop(); - log(1) << "WriteBackCommand got : " << z << endl; - - result.append( "data" , z ); - - return true; - } - } writeBackCommand; - - // setShardVersion( ns ) - - class SetShardVersion : public MongodShardCommand { - public: - SetShardVersion() : MongodShardCommand("setShardVersion"){} - - virtual void help( stringstream& help ) const { - help << " example: { setShardVersion : 'alleyinsider.foo' , version : 1 , configdb : '' } "; - } - - virtual LockType locktype(){ return WRITE; } // TODO: figure out how to make this not need to lock - - bool run(const char *cmdns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){ - - bool authoritative = cmdObj.getBoolField( "authoritative" ); - - string configdb = cmdObj["configdb"].valuestrsafe(); - { // configdb checking - if ( configdb.size() == 0 ){ - errmsg = "no configdb"; - return false; - } - - if ( shardConfigServer.size() == 0 ){ - if ( ! authoritative ){ - result.appendBool( "need_authoritative" , true ); - errmsg = "first setShardVersion"; - return false; - } - shardConfigServer = configdb; - } - else if ( shardConfigServer != configdb ){ - errmsg = "specified a different configdb!"; - return false; - } - } - - { // setting up ids - if ( cmdObj["serverID"].type() != jstOID ){ - // TODO: fix this - //errmsg = "need serverID to be an OID"; - //return 0; - } - else { - OID clientId = cmdObj["serverID"].__oid(); - if ( ! clientServerIds.get() ){ - string s = clientId.str(); - - OID * nid = new OID(); - nid->init( s ); - clientServerIds.reset( nid ); - - if ( ! clientQueues[s] ) - clientQueues[s] = new BlockingQueue<BSONObj>(); - } - else if ( clientId != *clientServerIds.get() ){ - errmsg = "server id has changed!"; - return 0; - } - } - } - - unsigned long long version = getVersion( cmdObj["version"] , errmsg ); - if ( errmsg.size() ){ - return false; - } - - NSVersions * versions = clientShardVersions.get(); - - if ( ! versions ){ - log(1) << "entering shard mode for connection" << endl; - versions = new NSVersions(); - clientShardVersions.reset( versions ); - } - - string ns = cmdObj["setShardVersion"].valuestrsafe(); - if ( ns.size() == 0 ){ - errmsg = "need to speciy fully namespace"; - return false; - } - - unsigned long long& oldVersion = (*versions)[ns]; - unsigned long long& globalVersion = globalVersions[ns]; - - if ( version == 0 && globalVersion == 0 ){ - // this connection is cleaning itself - oldVersion = 0; - return 1; - } - - if ( version == 0 && globalVersion > 0 ){ - if ( ! authoritative ){ - result.appendBool( "need_authoritative" , true ); - result.appendTimestamp( "globalVersion" , globalVersion ); - result.appendTimestamp( "oldVersion" , oldVersion ); - errmsg = "dropping needs to be authoritative"; - return 0; - } - log() << "wiping data for: " << ns << endl; - result.appendTimestamp( "beforeDrop" , globalVersion ); - // only setting global version on purpose - // need clients to re-find meta-data - globalVersion = 0; - oldVersion = 0; - return 1; - } - - if ( version < oldVersion ){ - errmsg = "you already have a newer version"; - result.appendTimestamp( "oldVersion" , oldVersion ); - result.appendTimestamp( "newVersion" , version ); - return false; - } - - if ( version < globalVersion ){ - errmsg = "going to older version for global"; - return false; - } - - if ( globalVersion == 0 && ! cmdObj.getBoolField( "authoritative" ) ){ - // need authoritative for first look - result.appendBool( "need_authoritative" , true ); - result.append( "ns" , ns ); - errmsg = "first time for this ns"; - return false; - } - - result.appendTimestamp( "oldVersion" , oldVersion ); - oldVersion = version; - globalVersion = version; - - result.append( "ok" , 1 ); - return 1; - } - - } setShardVersion; - - class GetShardVersion : public MongodShardCommand { - public: - GetShardVersion() : MongodShardCommand("getShardVersion"){} - - virtual void help( stringstream& help ) const { - help << " example: { getShardVersion : 'alleyinsider.foo' } "; - } - - virtual LockType locktype(){ return WRITE; } // TODO: figure out how to make this not need to lock - - bool run(const char *cmdns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){ - string ns = cmdObj["getShardVersion"].valuestrsafe(); - if ( ns.size() == 0 ){ - errmsg = "need to speciy fully namespace"; - return false; - } - - result.append( "configServer" , shardConfigServer.c_str() ); - - result.appendTimestamp( "global" , globalVersions[ns] ); - if ( clientShardVersions.get() ) - result.appendTimestamp( "mine" , (*clientShardVersions.get())[ns] ); - else - result.appendTimestamp( "mine" , 0 ); - - return true; - } - - } getShardVersion; - - class MoveShardStartCommand : public MongodShardCommand { - public: - MoveShardStartCommand() : MongodShardCommand( "movechunk.start" ){} - virtual void help( stringstream& help ) const { - help << "should not be calling this directly" << endl; - } - - virtual LockType locktype(){ return WRITE; } - - bool run(const char *cmdns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){ - // so i have to start clone, tell caller its ok to make change - // at this point the caller locks me, and updates config db - // then finish calls finish, and then deletes data when cursors are done - - string ns = cmdObj["movechunk.start"].valuestrsafe(); - string to = cmdObj["to"].valuestrsafe(); - string from = cmdObj["from"].valuestrsafe(); // my public address, a tad redundant, but safe - BSONObj filter = cmdObj.getObjectField( "filter" ); - - if ( ns.size() == 0 ){ - errmsg = "need to specify namespace in command"; - return false; - } - - if ( to.size() == 0 ){ - errmsg = "need to specify server to move shard to"; - return false; - } - if ( from.size() == 0 ){ - errmsg = "need to specify server to move shard from (redundat i know)"; - return false; - } - - if ( filter.isEmpty() ){ - errmsg = "need to specify a filter"; - return false; - } - - log() << "got movechunk.start: " << cmdObj << endl; - - - BSONObj res; - bool ok; - - { - dbtemprelease unlock; - - ScopedDbConnection conn( to ); - ok = conn->runCommand( "admin" , - BSON( "startCloneCollection" << ns << - "from" << from << - "query" << filter - ) , - res ); - conn.done(); - } - - log() << " movechunk.start res: " << res << endl; - - if ( ok ){ - result.append( res["finishToken"] ); - } - else { - errmsg = "startCloneCollection failed: "; - errmsg += res["errmsg"].valuestrsafe(); - } - return ok; - } - - } moveShardStartCmd; - - class MoveShardFinishCommand : public MongodShardCommand { - public: - MoveShardFinishCommand() : MongodShardCommand( "movechunk.finish" ){} - virtual void help( stringstream& help ) const { - help << "should not be calling this directly" << endl; - } - - virtual LockType locktype(){ return WRITE; } - - bool run(const char *cmdns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){ - // see MoveShardStartCommand::run - - string ns = cmdObj["movechunk.finish"].valuestrsafe(); - if ( ns.size() == 0 ){ - errmsg = "need ns as cmd value"; - return false; - } - - string to = cmdObj["to"].valuestrsafe(); - if ( to.size() == 0 ){ - errmsg = "need to specify server to move shard to"; - return false; - } - - - unsigned long long newVersion = getVersion( cmdObj["newVersion"] , errmsg ); - if ( newVersion == 0 ){ - errmsg = "have to specify new version number"; - return false; - } - - BSONObj finishToken = cmdObj.getObjectField( "finishToken" ); - if ( finishToken.isEmpty() ){ - errmsg = "need finishToken"; - return false; - } - - if ( ns != finishToken["collection"].valuestrsafe() ){ - errmsg = "namespaced don't match"; - return false; - } - - // now we're locked - globalVersions[ns] = newVersion; - NSVersions * versions = clientShardVersions.get(); - if ( ! versions ){ - versions = new NSVersions(); - clientShardVersions.reset( versions ); - } - (*versions)[ns] = newVersion; - - BSONObj res; - bool ok; - - { - dbtemprelease unlock; - - ScopedDbConnection conn( to ); - ok = conn->runCommand( "admin" , - BSON( "finishCloneCollection" << finishToken ) , - res ); - conn.done(); - } - - if ( ! ok ){ - // uh oh - errmsg = "finishCloneCollection failed!"; - result << "finishError" << res; - return false; - } - - // wait until cursors are clean - cout << "WARNING: deleting data before ensuring no more cursors TODO" << endl; - - dbtemprelease unlock; - DBDirectClient client; - BSONObj removeFilter = finishToken.getObjectField( "query" ); - client.remove( ns , removeFilter ); - - return true; - } - - } moveShardFinishCmd; - - bool haveLocalShardingInfo( const string& ns ){ - if ( shardConfigServer.empty() ) - return false; - - - unsigned long long version = globalVersions[ns]; - if ( version == 0 ) - return false; - - NSVersions * versions = clientShardVersions.get(); - if ( ! versions ) - return false; - - return true; - } - - /** - * @ return true if not in sharded mode - or if version for this client is ok - */ - bool shardVersionOk( const string& ns , string& errmsg ){ - if ( shardConfigServer.empty() ){ - return true; - } - - NSVersions::iterator i = globalVersions.find( ns ); - if ( i == globalVersions.end() ) - return true; - - NSVersions * versions = clientShardVersions.get(); - if ( ! versions ){ - // this means the client has nothing sharded - // so this allows direct connections to do whatever they want - // which i think is the correct behavior - return true; - } - - unsigned long long clientVersion = (*versions)[ns]; - unsigned long long version = i->second; - - if ( version == 0 && clientVersion > 0 ){ - stringstream ss; - ss << "version: " << version << " clientVersion: " << clientVersion; - errmsg = ss.str(); + bool handlePossibleShardedMessage( Message &m, DbResponse* dbresponse ){ + if ( ! shardingState.enabled() ) return false; - } - - if ( clientVersion >= version ) - return true; - - if ( clientVersion == 0 ){ - errmsg = "client in sharded mode, but doesn't have version set for this collection"; + int op = m.operation(); + if ( op < 2000 + || op >= 3000 + || op == dbGetMore // cursors are weird + ) return false; - } - - errmsg = (string)"your version is too old ns: " + ns; - return false; - } - - - bool handlePossibleShardedMessage( Message &m, DbResponse &dbresponse ){ - - if ( shardConfigServer.empty() ){ - return false; - } - - int op = m.data->operation(); - if ( op < 2000 || op >= 3000 ) - return false; - - const char *ns = m.data->_data + 4; + DbMessage d(m); + const char *ns = d.getns(); string errmsg; if ( shardVersionOk( ns , errmsg ) ){ return false; } - log() << "shardVersionOk failed ns:" << ns << " " << errmsg << endl; + log() << "shardVersionOk failed ns:(" << ns << ") op:(" << opToString(op) << ") " << errmsg << endl; if ( doesOpGetAResponse( op ) ){ + assert( dbresponse ); BufBuilder b( 32768 ); b.skip( sizeof( QueryResult ) ); { BSONObj obj = BSON( "$err" << errmsg ); - b.append( obj.objdata() , obj.objsize() ); + b.appendBuf( obj.objdata() , obj.objsize() ); } QueryResult *qr = (QueryResult*)b.buf(); - qr->_resultFlags() = QueryResult::ResultFlag_ErrSet | QueryResult::ResultFlag_ShardConfigStale; + qr->_resultFlags() = ResultFlag_ErrSet | ResultFlag_ShardConfigStale; qr->len = b.len(); qr->setOperation( opReply ); qr->cursorId = 0; @@ -526,22 +83,30 @@ namespace mongo { Message * resp = new Message(); resp->setData( qr , true ); - dbresponse.response = resp; - dbresponse.responseTo = m.data->id; + dbresponse->response = resp; + dbresponse->responseTo = m.header()->id; return true; } - OID * clientID = clientServerIds.get(); - massert( 10422 , "write with bad shard config and no server id!" , clientID ); + OID writebackID; + writebackID.init(); + lastError.getSafe()->writeback( writebackID ); + + const OID& clientID = ShardedConnectionInfo::get(false)->getID(); + massert( 10422 , "write with bad shard config and no server id!" , clientID.isSet() ); - log() << "got write with an old config - writing back" << endl; + log(1) << "got write with an old config - writing back ns: " << ns << endl; + if ( logLevel ) log(1) << debugString( m ) << endl; BSONObjBuilder b; b.appendBool( "writeBack" , true ); b.append( "ns" , ns ); - b.appendBinData( "msg" , m.data->len , bdtCustom , (char*)(m.data) ); - log() << "writing back msg with len: " << m.data->len << " op: " << m.data->_operation << endl; - clientQueues[clientID->str()]->push( b.obj() ); + b.append( "id" , writebackID ); + b.appendTimestamp( "version" , shardingState.getVersion( ns ) ); + b.appendTimestamp( "yourVersion" , ShardedConnectionInfo::get( true )->getVersion( ns ) ); + b.appendBinData( "msg" , m.header()->len , bdtCustom , (char*)(m.singleData()) ); + log(2) << "writing back msg with len: " << m.header()->len << " op: " << m.operation() << endl; + queueWriteBack( clientID.str() , b.obj() ); return true; } |