summaryrefslogtreecommitdiff
path: root/s/d_logic.cpp
diff options
context:
space:
mode:
authorAntonin Kral <a.kral@bobek.cz>2010-08-11 12:38:57 +0200
committerAntonin Kral <a.kral@bobek.cz>2010-08-11 12:38:57 +0200
commit7645618fd3914cb8a20561625913c20d49504a49 (patch)
tree8370f846f58f6d71165b7a0e2eda04648584ec76 /s/d_logic.cpp
parent68c73c3c7608b4c87f07440dc3232801720b1168 (diff)
downloadmongodb-7645618fd3914cb8a20561625913c20d49504a49.tar.gz
Imported Upstream version 1.6.0
Diffstat (limited to 's/d_logic.cpp')
-rw-r--r--s/d_logic.cpp503
1 files changed, 34 insertions, 469 deletions
diff --git a/s/d_logic.cpp b/s/d_logic.cpp
index 2a9cde3..ddf83e8 100644
--- a/s/d_logic.cpp
+++ b/s/d_logic.cpp
@@ -22,500 +22,57 @@
mostly around shard management and checking
*/
-#include "stdafx.h"
+#include "pch.h"
#include <map>
#include <string>
#include "../db/commands.h"
#include "../db/jsobj.h"
#include "../db/dbmessage.h"
+#include "../db/query.h"
#include "../client/connpool.h"
#include "../util/queue.h"
+#include "shard.h"
+#include "d_logic.h"
+
using namespace std;
namespace mongo {
-
- typedef map<string,unsigned long long> NSVersions;
-
- NSVersions globalVersions;
- boost::thread_specific_ptr<NSVersions> clientShardVersions;
-
- string shardConfigServer;
-
- boost::thread_specific_ptr<OID> clientServerIds;
- map< string , BlockingQueue<BSONObj>* > clientQueues;
-
- unsigned long long getVersion( BSONElement e , string& errmsg ){
- if ( e.eoo() ){
- errmsg = "no version";
- return 0;
- }
-
- if ( e.isNumber() )
- return (unsigned long long)e.number();
-
- if ( e.type() == Date || e.type() == Timestamp )
- return e._numberLong();
-
-
- errmsg = "version is not a numberic type";
- return 0;
- }
-
- class MongodShardCommand : public Command {
- public:
- MongodShardCommand( const char * n ) : Command( n ){
- }
- virtual bool slaveOk(){
- return false;
- }
- virtual bool adminOnly() {
- return true;
- }
- };
-
- class WriteBackCommand : public MongodShardCommand {
- public:
- virtual LockType locktype(){ return NONE; }
- WriteBackCommand() : MongodShardCommand( "writebacklisten" ){}
- bool run(const char *cmdns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){
-
- BSONElement e = cmdObj.firstElement();
- if ( e.type() != jstOID ){
- errmsg = "need oid as first value";
- return 0;
- }
-
- const OID id = e.__oid();
-
- if ( ! clientQueues[id.str()] )
- clientQueues[id.str()] = new BlockingQueue<BSONObj>();
-
- BSONObj z = clientQueues[id.str()]->blockingPop();
- log(1) << "WriteBackCommand got : " << z << endl;
-
- result.append( "data" , z );
-
- return true;
- }
- } writeBackCommand;
-
- // setShardVersion( ns )
-
- class SetShardVersion : public MongodShardCommand {
- public:
- SetShardVersion() : MongodShardCommand("setShardVersion"){}
-
- virtual void help( stringstream& help ) const {
- help << " example: { setShardVersion : 'alleyinsider.foo' , version : 1 , configdb : '' } ";
- }
-
- virtual LockType locktype(){ return WRITE; } // TODO: figure out how to make this not need to lock
-
- bool run(const char *cmdns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){
-
- bool authoritative = cmdObj.getBoolField( "authoritative" );
-
- string configdb = cmdObj["configdb"].valuestrsafe();
- { // configdb checking
- if ( configdb.size() == 0 ){
- errmsg = "no configdb";
- return false;
- }
-
- if ( shardConfigServer.size() == 0 ){
- if ( ! authoritative ){
- result.appendBool( "need_authoritative" , true );
- errmsg = "first setShardVersion";
- return false;
- }
- shardConfigServer = configdb;
- }
- else if ( shardConfigServer != configdb ){
- errmsg = "specified a different configdb!";
- return false;
- }
- }
-
- { // setting up ids
- if ( cmdObj["serverID"].type() != jstOID ){
- // TODO: fix this
- //errmsg = "need serverID to be an OID";
- //return 0;
- }
- else {
- OID clientId = cmdObj["serverID"].__oid();
- if ( ! clientServerIds.get() ){
- string s = clientId.str();
-
- OID * nid = new OID();
- nid->init( s );
- clientServerIds.reset( nid );
-
- if ( ! clientQueues[s] )
- clientQueues[s] = new BlockingQueue<BSONObj>();
- }
- else if ( clientId != *clientServerIds.get() ){
- errmsg = "server id has changed!";
- return 0;
- }
- }
- }
-
- unsigned long long version = getVersion( cmdObj["version"] , errmsg );
- if ( errmsg.size() ){
- return false;
- }
-
- NSVersions * versions = clientShardVersions.get();
-
- if ( ! versions ){
- log(1) << "entering shard mode for connection" << endl;
- versions = new NSVersions();
- clientShardVersions.reset( versions );
- }
-
- string ns = cmdObj["setShardVersion"].valuestrsafe();
- if ( ns.size() == 0 ){
- errmsg = "need to speciy fully namespace";
- return false;
- }
-
- unsigned long long& oldVersion = (*versions)[ns];
- unsigned long long& globalVersion = globalVersions[ns];
-
- if ( version == 0 && globalVersion == 0 ){
- // this connection is cleaning itself
- oldVersion = 0;
- return 1;
- }
-
- if ( version == 0 && globalVersion > 0 ){
- if ( ! authoritative ){
- result.appendBool( "need_authoritative" , true );
- result.appendTimestamp( "globalVersion" , globalVersion );
- result.appendTimestamp( "oldVersion" , oldVersion );
- errmsg = "dropping needs to be authoritative";
- return 0;
- }
- log() << "wiping data for: " << ns << endl;
- result.appendTimestamp( "beforeDrop" , globalVersion );
- // only setting global version on purpose
- // need clients to re-find meta-data
- globalVersion = 0;
- oldVersion = 0;
- return 1;
- }
-
- if ( version < oldVersion ){
- errmsg = "you already have a newer version";
- result.appendTimestamp( "oldVersion" , oldVersion );
- result.appendTimestamp( "newVersion" , version );
- return false;
- }
-
- if ( version < globalVersion ){
- errmsg = "going to older version for global";
- return false;
- }
-
- if ( globalVersion == 0 && ! cmdObj.getBoolField( "authoritative" ) ){
- // need authoritative for first look
- result.appendBool( "need_authoritative" , true );
- result.append( "ns" , ns );
- errmsg = "first time for this ns";
- return false;
- }
-
- result.appendTimestamp( "oldVersion" , oldVersion );
- oldVersion = version;
- globalVersion = version;
-
- result.append( "ok" , 1 );
- return 1;
- }
-
- } setShardVersion;
-
- class GetShardVersion : public MongodShardCommand {
- public:
- GetShardVersion() : MongodShardCommand("getShardVersion"){}
-
- virtual void help( stringstream& help ) const {
- help << " example: { getShardVersion : 'alleyinsider.foo' } ";
- }
-
- virtual LockType locktype(){ return WRITE; } // TODO: figure out how to make this not need to lock
-
- bool run(const char *cmdns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){
- string ns = cmdObj["getShardVersion"].valuestrsafe();
- if ( ns.size() == 0 ){
- errmsg = "need to speciy fully namespace";
- return false;
- }
-
- result.append( "configServer" , shardConfigServer.c_str() );
-
- result.appendTimestamp( "global" , globalVersions[ns] );
- if ( clientShardVersions.get() )
- result.appendTimestamp( "mine" , (*clientShardVersions.get())[ns] );
- else
- result.appendTimestamp( "mine" , 0 );
-
- return true;
- }
-
- } getShardVersion;
-
- class MoveShardStartCommand : public MongodShardCommand {
- public:
- MoveShardStartCommand() : MongodShardCommand( "movechunk.start" ){}
- virtual void help( stringstream& help ) const {
- help << "should not be calling this directly" << endl;
- }
-
- virtual LockType locktype(){ return WRITE; }
-
- bool run(const char *cmdns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){
- // so i have to start clone, tell caller its ok to make change
- // at this point the caller locks me, and updates config db
- // then finish calls finish, and then deletes data when cursors are done
-
- string ns = cmdObj["movechunk.start"].valuestrsafe();
- string to = cmdObj["to"].valuestrsafe();
- string from = cmdObj["from"].valuestrsafe(); // my public address, a tad redundant, but safe
- BSONObj filter = cmdObj.getObjectField( "filter" );
-
- if ( ns.size() == 0 ){
- errmsg = "need to specify namespace in command";
- return false;
- }
-
- if ( to.size() == 0 ){
- errmsg = "need to specify server to move shard to";
- return false;
- }
- if ( from.size() == 0 ){
- errmsg = "need to specify server to move shard from (redundat i know)";
- return false;
- }
-
- if ( filter.isEmpty() ){
- errmsg = "need to specify a filter";
- return false;
- }
-
- log() << "got movechunk.start: " << cmdObj << endl;
-
-
- BSONObj res;
- bool ok;
-
- {
- dbtemprelease unlock;
-
- ScopedDbConnection conn( to );
- ok = conn->runCommand( "admin" ,
- BSON( "startCloneCollection" << ns <<
- "from" << from <<
- "query" << filter
- ) ,
- res );
- conn.done();
- }
-
- log() << " movechunk.start res: " << res << endl;
-
- if ( ok ){
- result.append( res["finishToken"] );
- }
- else {
- errmsg = "startCloneCollection failed: ";
- errmsg += res["errmsg"].valuestrsafe();
- }
- return ok;
- }
-
- } moveShardStartCmd;
-
- class MoveShardFinishCommand : public MongodShardCommand {
- public:
- MoveShardFinishCommand() : MongodShardCommand( "movechunk.finish" ){}
- virtual void help( stringstream& help ) const {
- help << "should not be calling this directly" << endl;
- }
-
- virtual LockType locktype(){ return WRITE; }
-
- bool run(const char *cmdns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){
- // see MoveShardStartCommand::run
-
- string ns = cmdObj["movechunk.finish"].valuestrsafe();
- if ( ns.size() == 0 ){
- errmsg = "need ns as cmd value";
- return false;
- }
-
- string to = cmdObj["to"].valuestrsafe();
- if ( to.size() == 0 ){
- errmsg = "need to specify server to move shard to";
- return false;
- }
-
-
- unsigned long long newVersion = getVersion( cmdObj["newVersion"] , errmsg );
- if ( newVersion == 0 ){
- errmsg = "have to specify new version number";
- return false;
- }
-
- BSONObj finishToken = cmdObj.getObjectField( "finishToken" );
- if ( finishToken.isEmpty() ){
- errmsg = "need finishToken";
- return false;
- }
-
- if ( ns != finishToken["collection"].valuestrsafe() ){
- errmsg = "namespaced don't match";
- return false;
- }
-
- // now we're locked
- globalVersions[ns] = newVersion;
- NSVersions * versions = clientShardVersions.get();
- if ( ! versions ){
- versions = new NSVersions();
- clientShardVersions.reset( versions );
- }
- (*versions)[ns] = newVersion;
-
- BSONObj res;
- bool ok;
-
- {
- dbtemprelease unlock;
-
- ScopedDbConnection conn( to );
- ok = conn->runCommand( "admin" ,
- BSON( "finishCloneCollection" << finishToken ) ,
- res );
- conn.done();
- }
-
- if ( ! ok ){
- // uh oh
- errmsg = "finishCloneCollection failed!";
- result << "finishError" << res;
- return false;
- }
-
- // wait until cursors are clean
- cout << "WARNING: deleting data before ensuring no more cursors TODO" << endl;
-
- dbtemprelease unlock;
- DBDirectClient client;
- BSONObj removeFilter = finishToken.getObjectField( "query" );
- client.remove( ns , removeFilter );
-
- return true;
- }
-
- } moveShardFinishCmd;
-
- bool haveLocalShardingInfo( const string& ns ){
- if ( shardConfigServer.empty() )
- return false;
-
-
- unsigned long long version = globalVersions[ns];
- if ( version == 0 )
- return false;
-
- NSVersions * versions = clientShardVersions.get();
- if ( ! versions )
- return false;
-
- return true;
- }
-
- /**
- * @ return true if not in sharded mode
- or if version for this client is ok
- */
- bool shardVersionOk( const string& ns , string& errmsg ){
- if ( shardConfigServer.empty() ){
- return true;
- }
-
- NSVersions::iterator i = globalVersions.find( ns );
- if ( i == globalVersions.end() )
- return true;
-
- NSVersions * versions = clientShardVersions.get();
- if ( ! versions ){
- // this means the client has nothing sharded
- // so this allows direct connections to do whatever they want
- // which i think is the correct behavior
- return true;
- }
-
- unsigned long long clientVersion = (*versions)[ns];
- unsigned long long version = i->second;
-
- if ( version == 0 && clientVersion > 0 ){
- stringstream ss;
- ss << "version: " << version << " clientVersion: " << clientVersion;
- errmsg = ss.str();
+ bool handlePossibleShardedMessage( Message &m, DbResponse* dbresponse ){
+ if ( ! shardingState.enabled() )
return false;
- }
-
- if ( clientVersion >= version )
- return true;
-
- if ( clientVersion == 0 ){
- errmsg = "client in sharded mode, but doesn't have version set for this collection";
+ int op = m.operation();
+ if ( op < 2000
+ || op >= 3000
+ || op == dbGetMore // cursors are weird
+ )
return false;
- }
-
- errmsg = (string)"your version is too old ns: " + ns;
- return false;
- }
-
-
- bool handlePossibleShardedMessage( Message &m, DbResponse &dbresponse ){
-
- if ( shardConfigServer.empty() ){
- return false;
- }
-
- int op = m.data->operation();
- if ( op < 2000 || op >= 3000 )
- return false;
-
- const char *ns = m.data->_data + 4;
+ DbMessage d(m);
+ const char *ns = d.getns();
string errmsg;
if ( shardVersionOk( ns , errmsg ) ){
return false;
}
- log() << "shardVersionOk failed ns:" << ns << " " << errmsg << endl;
+ log() << "shardVersionOk failed ns:(" << ns << ") op:(" << opToString(op) << ") " << errmsg << endl;
if ( doesOpGetAResponse( op ) ){
+ assert( dbresponse );
BufBuilder b( 32768 );
b.skip( sizeof( QueryResult ) );
{
BSONObj obj = BSON( "$err" << errmsg );
- b.append( obj.objdata() , obj.objsize() );
+ b.appendBuf( obj.objdata() , obj.objsize() );
}
QueryResult *qr = (QueryResult*)b.buf();
- qr->_resultFlags() = QueryResult::ResultFlag_ErrSet | QueryResult::ResultFlag_ShardConfigStale;
+ qr->_resultFlags() = ResultFlag_ErrSet | ResultFlag_ShardConfigStale;
qr->len = b.len();
qr->setOperation( opReply );
qr->cursorId = 0;
@@ -526,22 +83,30 @@ namespace mongo {
Message * resp = new Message();
resp->setData( qr , true );
- dbresponse.response = resp;
- dbresponse.responseTo = m.data->id;
+ dbresponse->response = resp;
+ dbresponse->responseTo = m.header()->id;
return true;
}
- OID * clientID = clientServerIds.get();
- massert( 10422 , "write with bad shard config and no server id!" , clientID );
+ OID writebackID;
+ writebackID.init();
+ lastError.getSafe()->writeback( writebackID );
+
+ const OID& clientID = ShardedConnectionInfo::get(false)->getID();
+ massert( 10422 , "write with bad shard config and no server id!" , clientID.isSet() );
- log() << "got write with an old config - writing back" << endl;
+ log(1) << "got write with an old config - writing back ns: " << ns << endl;
+ if ( logLevel ) log(1) << debugString( m ) << endl;
BSONObjBuilder b;
b.appendBool( "writeBack" , true );
b.append( "ns" , ns );
- b.appendBinData( "msg" , m.data->len , bdtCustom , (char*)(m.data) );
- log() << "writing back msg with len: " << m.data->len << " op: " << m.data->_operation << endl;
- clientQueues[clientID->str()]->push( b.obj() );
+ b.append( "id" , writebackID );
+ b.appendTimestamp( "version" , shardingState.getVersion( ns ) );
+ b.appendTimestamp( "yourVersion" , ShardedConnectionInfo::get( true )->getVersion( ns ) );
+ b.appendBinData( "msg" , m.header()->len , bdtCustom , (char*)(m.singleData()) );
+ log(2) << "writing back msg with len: " << m.header()->len << " op: " << m.operation() << endl;
+ queueWriteBack( clientID.str() , b.obj() );
return true;
}